Compare commits: b1f64c4bac ... cbd16a39ba (2 commits)

| SHA1 |
| --- |
| cbd16a39ba |
| 92f219b575 |
@@ -22,6 +22,7 @@ STORE_ID=2790685415443269
# Path configuration
EXPORT_ROOT=r"D:\LLZQ\DB\export",
LOG_ROOT=r"D:\LLZQ\DB\logs",
PGCLIENTENCODING=utf8

# ETL configuration
OVERLAP_SECONDS=120 # extra seconds to "rewind" the window start, guarding against records missed at the boundary

@@ -36,8 +36,6 @@ LOG_UNKNOWN_FIELDS=true
HASH_ALGO=sha1
STRICT_NUMERIC=true
ROUND_MONEY_SCALE=2
<<<<<<< HEAD
=======

# Test/offline mode
TEST_MODE=ONLINE
@@ -46,4 +44,3 @@ TEST_JSON_TEMP_DIR=/tmp/etl_billiards_json_tmp

# Test database (optional: if set, unit tests connect to this DSN)
TEST_DB_DSN=
>>>>>>> main

etl_billiards/LLZQ-test-1.sql (new file, 5815 lines)
File diff suppressed because it is too large
@@ -219,6 +219,42 @@ python scripts/run_tests.py --preset offline_realdb
python scripts/run_tests.py --list-presets  # list or customize scripts/test_presets.py
```

#### 3.3.2 Scripted test combinations (`run_tests.py` / `test_presets.py`)

- `scripts/run_tests.py` is the unified pytest entry point: it adds the project root to `sys.path` automatically and accepts `--suite online/offline/integration`, `--tests` (custom paths), `--mode`, `--db-dsn`, `--json-archive`, `--json-temp`, `--keyword/-k`, `--pytest-args`, and `--env KEY=VALUE`, so options can be combined freely like building blocks;
- `--preset foo` reads the configuration from `PRESETS["foo"]` in `scripts/test_presets.py` and layers it onto the current command; `--list-presets` and `--dry-run` let you review presets or merely print the resulting command;
- running `python scripts/test_presets.py` directly executes the presets listed in `AUTO_RUN_PRESETS` in order; passing `--preset x --dry-run` only prints the corresponding command.

`test_presets.py` acts as a "command repository". Each preset is a dictionary; the common fields are explained below:

| Field | Purpose |
| ---- | ---- |
| `suite` | Reuse the built-in suites of `run_tests.py` (online/offline/integration; multiple allowed) |
| `tests` | Append arbitrary pytest paths, e.g. `tests/unit/test_config.py` |
| `mode` | Override `TEST_MODE` (ONLINE / OFFLINE) |
| `db_dsn` | Override `TEST_DB_DSN` to connect to a real test database |
| `json_archive` / `json_temp` | Configure the offline JSON archive and temp directories |
| `keyword` | Maps to `pytest -k` for keyword filtering |
| `pytest_args` | Extra pytest arguments, e.g. `-vv --maxfail=1` |
| `env` | Additional environment variables, e.g. `["STORE_ID=123"]` |
| `preset_meta` | Descriptive text documenting the scenario |

Example: the `offline_realdb` preset sets `TEST_MODE=OFFLINE`, points the archive directory at `tests/testdata_json`, and connects to the test database via `db_dsn`. Run `python scripts/run_tests.py --preset offline_realdb` or `python scripts/test_presets.py --preset offline_realdb` to reuse this combination, keeping local runs, CI, and production replay scripts consistent.
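Since a preset is just a plain dictionary, a hypothetical new entry (the name `orders_smoke` and all values here are illustrative, not part of the repository) would look like this:

```python
# scripts/test_presets.py -- hypothetical "orders_smoke" preset (illustrative values)
PRESETS["orders_smoke"] = {
    "suite": ["online"],                     # reuse the built-in online suite
    "tests": ["tests/unit/test_config.py"],  # plus one extra pytest path
    "mode": "ONLINE",                        # overrides TEST_MODE
    "keyword": "ORDERS",                     # becomes pytest -k ORDERS
    "pytest_args": "-vv --maxfail=1",
    "env": ["STORE_ID=123"],
    "preset_meta": "Smoke-test the order task with verbose output",
}
```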
#### 3.3.3 Quick database connectivity check

`python scripts/test_db_connection.py` provides the most lightweight PostgreSQL connectivity check: it uses `TEST_DB_DSN` by default (or accepts `--dsn`), attempts to connect, and executes `SELECT 1 AS ok` (customizable via `--query`). Typical usage:

```bash
# Read TEST_DB_DSN from .env / environment variables
python scripts/test_db_connection.py

# Specify a DSN ad hoc and check the task configuration table
python scripts/test_db_connection.py --dsn postgresql://user:pwd@host:5432/LLZQ-test --query "SELECT count(*) FROM etl_admin.etl_task"
```

A return code of 0 means both the connection and the query succeeded; on a non-zero code, consult the database part of Chapter 8 "Troubleshooting" (network, firewall, account permissions, etc.) to locate the problem before running the full ETL.

---

## 4. Project structure and file descriptions
@@ -452,6 +488,19 @@ etl_billiards/

- When an API request fails, it is retried per the configuration; once the retry limit is exceeded, the error is recorded and the task is aborted.
- All errors are written to the logs and the run-tracking table for post-hoc investigation.

### 5.6 Two-phase ODS + DWD strategy (new)

To support backfill/replay and the upcoming DWD wide-table build, the project adds a `billiards_ods` schema plus a dedicated set of ODS tasks/loaders:

- **ODS tables**: `billiards_ods.ods_order_settle`, `ods_table_use_detail`, `ods_assistant_ledger`, `ods_assistant_abolish`, `ods_goods_ledger`, `ods_payment`, `ods_refund`, `ods_coupon_verify`, `ods_member`, `ods_member_card`, `ods_package_coupon`, `ods_inventory_stock`, `ods_inventory_change`. Each record stores `store_id + source primary key + payload JSON + fetched_at + source_endpoint`, among other fields.
- **Generic loader**: `loaders/ods/generic.py::GenericODSLoader` wraps the `INSERT ... ON CONFLICT ...` and batch-write logic; callers only supply the column names and conflict-key columns (see the sketch below).
- **ODS tasks**: `tasks/ods_tasks.py` defines a set of tasks via `OdsTaskSpec` (`ODS_ORDER_SETTLE`, `ODS_PAYMENT`, `ODS_ASSISTANT_LEDGER`, and so on), auto-registered in the `TaskRegistry`; run them directly with `python -m cli.main --tasks ODS_ORDER_SETTLE,ODS_PAYMENT`.
- **Two-phase pipeline**:
  1. Phase 1 (ODS): call the API or read archived offline JSON and write the raw records into the ODS tables, preserving pagination, fetch time, source file, and other metadata.
  2. Phase 2 (DWD/DIM): the order, payment, coupon, and other fact tasks will switch to reading payloads from ODS, parsing/validating them, and writing to the `billiards.fact_*` and `dim_*` tables, avoiding repeated pulls from the upstream API.

> The new unit test `tests/unit/test_ods_tasks.py` covers the load paths of `ODS_ORDER_SETTLE` and `ODS_PAYMENT` and can serve as a template for extending other ODS tasks.
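A minimal sketch of wiring up such a loader, using a column list abridged from the `ods_payment` DDL later in this diff (`db_ops` stands for an already-constructed `DatabaseOperations` instance):

```python
from loaders.ods.generic import GenericODSLoader

# Assumes db_ops is a DatabaseOperations bound to an open connection.
payment_ods = GenericODSLoader(
    db_ops,
    table_name="billiards_ods.ods_payment",
    columns=["store_id", "pay_id", "relate_type", "relate_id",
             "source_endpoint", "fetched_at", "payload"],
    conflict_columns=["store_id", "pay_id"],
)

# payload dicts are serialized to JSON and fetched_at is auto-filled
# by the loader's _normalize_row; the values below are illustrative.
inserted, updated, _ = payment_ods.upsert_rows([
    {"store_id": 2790685415443269, "pay_id": 1001,
     "source_endpoint": "/pay/list", "payload": {"payId": 1001}},
])
```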
---

## 6. Migration guide (from legacy scripts to the current project)
@@ -54,42 +54,76 @@ class APIClient:
        resp.raise_for_status()
        return resp.json()

    def iter_paginated(
        self,
        endpoint: str,
        params: dict | None,
        page_size: int = 200,
        page_field: str = "pageIndex",
        size_field: str = "pageSize",
        data_path: tuple = ("data",),
        list_key: str | None = None,
    ):
        """Paginated iterator: fetch page by page and yield (page_no, records, request_params, raw_response)."""
        base_params = dict(params or {})
        page = 1

        while True:
            page_params = dict(base_params)
            page_params[page_field] = page
            page_params[size_field] = page_size

            payload = self.get(endpoint, page_params)
            records = self._extract_list(payload, data_path, list_key)

            yield page, records, page_params, payload

            if len(records) < page_size:
                break

            if len(records) == 0:
                break

            page += 1

    def get_paginated(self, endpoint: str, params: dict, page_size: int = 200,
                      page_field: str = "pageIndex", size_field: str = "pageSize",
                      data_path: tuple = ("data",), list_key: str = None) -> tuple:
        """Fetch paginated data"""
        """Fetch paginated data and collect all records into a single list."""
        records, pages_meta = [], []
        page = 1

        while True:
            p = dict(params)
            p[page_field] = page
            p[size_field] = page_size

            obj = self.get(endpoint, p)

            # Resolve the data path
            cur = obj
            for k in data_path:
                if isinstance(cur, dict) and k in cur:
                    cur = cur[k]

            if list_key:
                cur = (cur or {}).get(list_key, [])

            if not isinstance(cur, list):
                cur = []

            records.extend(cur)

            if len(cur) == 0:
                break

            pages_meta.append({"page": page, "request": p, "response": obj})

            if len(cur) < page_size:
                break

            page += 1


        for page_no, page_records, request_params, response in self.iter_paginated(
            endpoint=endpoint,
            params=params,
            page_size=page_size,
            page_field=page_field,
            size_field=size_field,
            data_path=data_path,
            list_key=list_key,
        ):
            records.extend(page_records)
            pages_meta.append(
                {"page": page_no, "request": request_params, "response": response}
            )

        return records, pages_meta

    @staticmethod
    def _extract_list(payload: dict, data_path: tuple, list_key: str | None):
        """Helper: extract the list structure according to data_path/list_key."""
        cur = payload
        for key in data_path:
            if isinstance(cur, dict):
                cur = cur.get(key)
            else:
                cur = None
            if cur is None:
                break

        if list_key and isinstance(cur, dict):
            cur = cur.get(list_key)

        if not isinstance(cur, list):
            cur = []

        return cur
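For illustration, a consumer of `iter_paginated` might archive each raw page while accumulating records; the endpoint and params below are placeholders, not a documented API route:

```python
# Hypothetical consumer; "/order/list" and the params are placeholders.
client = APIClient(...)  # constructed per the project's configuration

all_records = []
for page_no, records, request_params, raw in client.iter_paginated(
    endpoint="/order/list",
    params={"storeId": 2790685415443269},
    page_size=200,
):
    all_records.extend(records)
    # raw and request_params could be persisted to an ODS table for replay.
```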
@@ -7,6 +7,7 @@ class DatabaseOperations:
    """Batch database operations wrapper"""

    def __init__(self, connection):
        self._connection = connection
        self.conn = connection.conn

    def batch_execute(self, sql: str, rows: list, page_size: int = 1000):
@@ -75,3 +76,24 @@ class DatabaseOperations:
        if isinstance(rec, dict):
            return bool(rec.get("inserted"))
        return False

    # --- pass-through helpers -------------------------------------------------
    def commit(self):
        """Commit the transaction (delegates to the underlying connection)"""
        self._connection.commit()

    def rollback(self):
        """Roll back the transaction (delegates to the underlying connection)"""
        self._connection.rollback()

    def query(self, sql: str, args=None):
        """Run a query and return the results"""
        return self._connection.query(sql, args)

    def execute(self, sql: str, args=None):
        """Execute arbitrary SQL"""
        self._connection.execute(sql, args)

    def cursor(self):
        """Expose the raw cursor for special operations"""
        return self.conn.cursor()
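A typical caller pairs these pass-throughs with explicit error handling; a minimal sketch (the statement and loader call are illustrative):

```python
# db is a DatabaseOperations instance wrapping an open connection.
try:
    db.execute("SET search_path TO billiards")  # illustrative statement
    # ... loader.upsert_*(...) calls go here ...
    db.commit()
except Exception:
    db.rollback()
    raise
```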
@@ -1,31 +1,50 @@
# -*- coding: utf-8 -*-
"""Payment record loader"""
"""Payment fact-table loader"""
from ..base_loader import BaseLoader


class PaymentLoader(BaseLoader):
    """Payment record loader"""
    """Payment data loader"""

    def upsert_payments(self, records: list, store_id: int) -> tuple:
        """Load payment records"""
        """Load payment data"""
        if not records:
            return (0, 0, 0)

        sql = """
            INSERT INTO billiards.fact_payment (
                store_id, pay_id, order_id, pay_time, pay_amount,
                pay_type, pay_status, remark, raw_data
                store_id, pay_id, site_id, tenant_id,
                order_settle_id, order_trade_no,
                relate_type, relate_id,
                create_time, pay_time,
                pay_amount, fee_amount, discount_amount,
                payment_method, online_pay_channel, pay_terminal,
                pay_status, raw_data
            )
            VALUES (
                %(store_id)s, %(pay_id)s, %(order_id)s, %(pay_time)s, %(pay_amount)s,
                %(pay_type)s, %(pay_status)s, %(remark)s, %(raw_data)s
                %(store_id)s, %(pay_id)s, %(site_id)s, %(tenant_id)s,
                %(order_settle_id)s, %(order_trade_no)s,
                %(relate_type)s, %(relate_id)s,
                %(create_time)s, %(pay_time)s,
                %(pay_amount)s, %(fee_amount)s, %(discount_amount)s,
                %(payment_method)s, %(online_pay_channel)s, %(pay_terminal)s,
                %(pay_status)s, %(raw_data)s
            )
            ON CONFLICT (store_id, pay_id) DO UPDATE SET
                order_id = EXCLUDED.order_id,
                order_settle_id = EXCLUDED.order_settle_id,
                order_trade_no = EXCLUDED.order_trade_no,
                relate_type = EXCLUDED.relate_type,
                relate_id = EXCLUDED.relate_id,
                site_id = EXCLUDED.site_id,
                tenant_id = EXCLUDED.tenant_id,
                create_time = EXCLUDED.create_time,
                pay_time = EXCLUDED.pay_time,
                pay_amount = EXCLUDED.pay_amount,
                pay_type = EXCLUDED.pay_type,
                fee_amount = EXCLUDED.fee_amount,
                discount_amount = EXCLUDED.discount_amount,
                payment_method = EXCLUDED.payment_method,
                online_pay_channel = EXCLUDED.online_pay_channel,
                pay_terminal = EXCLUDED.pay_terminal,
                pay_status = EXCLUDED.pay_status,
                remark = EXCLUDED.remark,
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
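The `RETURNING (xmax = 0) AS inserted` trick lets the caller distinguish fresh inserts from conflict-updates: on PostgreSQL, `xmax` is 0 for a newly inserted row version. A sketch of how a batch helper such as `batch_upsert_with_returning` (referenced by `GenericODSLoader` later in this diff) could tally the flags, assuming a psycopg2-style cursor:

```python
def tally_upsert_flags(cur, sql, rows):
    """Execute an upsert per row and count inserted vs. updated rows.

    Sketch only: the real helper batches statements; psycopg2's
    executemany() does not expose RETURNING rows, hence the loop.
    """
    inserted = updated = 0
    for row in rows:
        cur.execute(sql, row)
        (was_insert,) = cur.fetchone()  # the (xmax = 0) boolean
        if was_insert:
            inserted += 1
        else:
            updated += 1
    return inserted, updated
```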
etl_billiards/loaders/facts/ticket.py (new file, 188 lines)
@@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-
"""Ticket detail loader"""
from ..base_loader import BaseLoader
import json


class TicketLoader(BaseLoader):
    """
    Loader for parsing Ticket Detail JSON and populating DWD fact tables.
    Handles:
    - fact_order (Header)
    - fact_order_goods (Items)
    - fact_table_usage (Items)
    - fact_assistant_service (Items)
    """

    def process_tickets(self, tickets: list, store_id: int) -> tuple:
        """
        Process a batch of ticket JSONs.
        Returns (inserted_count, error_count)
        """
        inserted_count = 0
        error_count = 0

        # Prepare batch lists
        orders = []
        goods_list = []
        table_usages = []
        assistant_services = []

        for ticket in tickets:
            try:
                # 1. Parse Header (fact_order)
                root_data = ticket.get("data", {}).get("data", {})
                if not root_data:
                    continue

                order_settle_id = root_data.get("orderSettleId")
                if not order_settle_id:
                    continue

                orders.append({
                    "store_id": store_id,
                    "order_settle_id": order_settle_id,
                    "order_trade_no": 0,
                    "order_no": str(root_data.get("orderSettleNumber", "")),
                    "member_id": 0,
                    "pay_time": root_data.get("payTime"),
                    "total_amount": root_data.get("consumeMoney", 0),
                    "pay_amount": root_data.get("actualPayment", 0),
                    "discount_amount": root_data.get("memberOfferAmount", 0),
                    "coupon_amount": root_data.get("couponAmount", 0),
                    "status": "PAID",
                    "cashier_name": root_data.get("cashierName", ""),
                    "remark": root_data.get("orderRemark", ""),
                    "raw_data": json.dumps(ticket, ensure_ascii=False)
                })

                # 2. Parse Items (orderItem list)
                order_items = root_data.get("orderItem", [])
                for item in order_items:
                    order_trade_no = item.get("siteOrderId")

                    # 2.1 Table Ledger
                    table_ledger = item.get("tableLedger")
                    if table_ledger:
                        table_usages.append({
                            "store_id": store_id,
                            "order_ledger_id": table_ledger.get("orderTableLedgerId"),
                            "order_settle_id": order_settle_id,
                            "table_id": table_ledger.get("siteTableId"),
                            "table_name": table_ledger.get("tableName"),
                            "start_time": table_ledger.get("chargeStartTime"),
                            "end_time": table_ledger.get("chargeEndTime"),
                            "duration_minutes": table_ledger.get("useDuration", 0),
                            "total_amount": table_ledger.get("consumptionAmount", 0),
                            "pay_amount": table_ledger.get("consumptionAmount", 0) - table_ledger.get("memberDiscountAmount", 0)
                        })

                    # 2.2 Goods Ledgers
                    goods_ledgers = item.get("goodsLedgers", [])
                    for g in goods_ledgers:
                        goods_list.append({
                            "store_id": store_id,
                            "order_goods_id": g.get("orderGoodsLedgerId"),
                            "order_settle_id": order_settle_id,
                            "order_trade_no": order_trade_no,
                            "goods_id": g.get("siteGoodsId"),
                            "goods_name": g.get("goodsName"),
                            "quantity": g.get("goodsCount", 0),
                            "unit_price": g.get("goodsPrice", 0),
                            "total_amount": g.get("ledgerAmount", 0),
                            "pay_amount": g.get("realGoodsMoney", 0)
                        })

                    # 2.3 Assistant Services
                    assistant_ledgers = item.get("assistantPlayWith", [])
                    for a in assistant_ledgers:
                        assistant_services.append({
                            "store_id": store_id,
                            "ledger_id": a.get("orderAssistantLedgerId"),
                            "order_settle_id": order_settle_id,
                            "assistant_id": a.get("assistantId"),
                            "assistant_name": a.get("ledgerName"),
                            "service_type": a.get("skillName", "Play"),
                            "start_time": a.get("ledgerStartTime"),
                            "end_time": a.get("ledgerEndTime"),
                            "duration_minutes": int(a.get("ledgerCount", 0) / 60) if a.get("ledgerCount") else 0,
                            "total_amount": a.get("ledgerAmount", 0),
                            "pay_amount": a.get("ledgerAmount", 0)
                        })

                inserted_count += 1

            except Exception as e:
                self.logger.error(f"Error parsing ticket: {e}", exc_info=True)
                error_count += 1

        # 3. Batch Insert/Upsert
        if orders:
            self._upsert_orders(orders)
        if goods_list:
            self._upsert_goods(goods_list)
        if table_usages:
            self._upsert_table_usages(table_usages)
        if assistant_services:
            self._upsert_assistant_services(assistant_services)

        return inserted_count, error_count

    def _upsert_orders(self, rows):
        sql = """
            INSERT INTO billiards.fact_order (
                store_id, order_settle_id, order_trade_no, order_no, member_id,
                pay_time, total_amount, pay_amount, discount_amount, coupon_amount,
                status, cashier_name, remark, raw_data
            ) VALUES (
                %(store_id)s, %(order_settle_id)s, %(order_trade_no)s, %(order_no)s, %(member_id)s,
                %(pay_time)s, %(total_amount)s, %(pay_amount)s, %(discount_amount)s, %(coupon_amount)s,
                %(status)s, %(cashier_name)s, %(remark)s, %(raw_data)s
            )
            ON CONFLICT (store_id, order_settle_id) DO UPDATE SET
                pay_time = EXCLUDED.pay_time,
                pay_amount = EXCLUDED.pay_amount,
                updated_at = now()
        """
        self.db.batch_execute(sql, rows)

    def _upsert_goods(self, rows):
        sql = """
            INSERT INTO billiards.fact_order_goods (
                store_id, order_goods_id, order_settle_id, order_trade_no,
                goods_id, goods_name, quantity, unit_price, total_amount, pay_amount
            ) VALUES (
                %(store_id)s, %(order_goods_id)s, %(order_settle_id)s, %(order_trade_no)s,
                %(goods_id)s, %(goods_name)s, %(quantity)s, %(unit_price)s, %(total_amount)s, %(pay_amount)s
            )
            ON CONFLICT (store_id, order_goods_id) DO UPDATE SET
                pay_amount = EXCLUDED.pay_amount
        """
        self.db.batch_execute(sql, rows)

    def _upsert_table_usages(self, rows):
        sql = """
            INSERT INTO billiards.fact_table_usage (
                store_id, order_ledger_id, order_settle_id, table_id, table_name,
                start_time, end_time, duration_minutes, total_amount, pay_amount
            ) VALUES (
                %(store_id)s, %(order_ledger_id)s, %(order_settle_id)s, %(table_id)s, %(table_name)s,
                %(start_time)s, %(end_time)s, %(duration_minutes)s, %(total_amount)s, %(pay_amount)s
            )
            ON CONFLICT (store_id, order_ledger_id) DO UPDATE SET
                pay_amount = EXCLUDED.pay_amount
        """
        self.db.batch_execute(sql, rows)

    def _upsert_assistant_services(self, rows):
        sql = """
            INSERT INTO billiards.fact_assistant_service (
                store_id, ledger_id, order_settle_id, assistant_id, assistant_name,
                service_type, start_time, end_time, duration_minutes, total_amount, pay_amount
            ) VALUES (
                %(store_id)s, %(ledger_id)s, %(order_settle_id)s, %(assistant_id)s, %(assistant_name)s,
                %(service_type)s, %(start_time)s, %(end_time)s, %(duration_minutes)s, %(total_amount)s, %(pay_amount)s
            )
            ON CONFLICT (store_id, ledger_id) DO UPDATE SET
                pay_amount = EXCLUDED.pay_amount
        """
        self.db.batch_execute(sql, rows)
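A minimal driver for this loader; the JSON shape mirrors the double "data" nesting the parser expects, and all values are illustrative:

```python
# Illustrative ticket payload: note the double "data" nesting.
ticket = {
    "data": {"data": {
        "orderSettleId": 9001,
        "orderSettleNumber": "S20251119-001",
        "actualPayment": 128.0,
        "orderItem": [],
    }}
}

loader = TicketLoader(db_ops)  # db_ops: a DatabaseOperations instance
ok, failed = loader.process_tickets([ticket], store_id=2790685415443269)
```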
etl_billiards/loaders/ods/__init__.py (new file, 6 lines)
@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""ODS loader helpers."""

from .generic import GenericODSLoader

__all__ = ["GenericODSLoader"]

etl_billiards/loaders/ods/generic.py (new file, 67 lines)
@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
"""Generic ODS loader that keeps raw payload + primary keys."""
from __future__ import annotations

import json
from datetime import datetime, timezone
from typing import Iterable, Sequence

from ..base_loader import BaseLoader


class GenericODSLoader(BaseLoader):
    """Insert/update helper for ODS tables that share the same pattern."""

    def __init__(
        self,
        db_ops,
        table_name: str,
        columns: Sequence[str],
        conflict_columns: Sequence[str],
    ):
        super().__init__(db_ops)
        if not conflict_columns:
            raise ValueError("conflict_columns must not be empty for ODS loader")
        self.table_name = table_name
        self.columns = list(columns)
        self.conflict_columns = list(conflict_columns)
        self._sql = self._build_sql()

    def upsert_rows(self, rows: Iterable[dict]) -> tuple[int, int, int]:
        """Insert/update the provided iterable of dictionaries."""
        rows = list(rows)
        if not rows:
            return (0, 0, 0)

        normalized = [self._normalize_row(row) for row in rows]
        inserted, updated = self.db.batch_upsert_with_returning(
            self._sql, normalized, page_size=self._batch_size()
        )
        return inserted, updated, 0

    def _build_sql(self) -> str:
        col_list = ", ".join(self.columns)
        placeholders = ", ".join(f"%({col})s" for col in self.columns)
        conflict_clause = ", ".join(self.conflict_columns)
        update_columns = [c for c in self.columns if c not in self.conflict_columns]
        set_clause = ", ".join(f"{col} = EXCLUDED.{col}" for col in update_columns)
        return (
            f"INSERT INTO {self.table_name} ({col_list}) "
            f"VALUES ({placeholders}) "
            f"ON CONFLICT ({conflict_clause}) DO UPDATE SET {set_clause} "
            f"RETURNING (xmax = 0) AS inserted"
        )

    def _normalize_row(self, row: dict) -> dict:
        normalized = {}
        for col in self.columns:
            value = row.get(col)
            if col == "payload" and value is not None and not isinstance(value, str):
                normalized[col] = json.dumps(value, ensure_ascii=False)
            else:
                normalized[col] = value

        if "fetched_at" in normalized and normalized["fetched_at"] is None:
            normalized["fetched_at"] = datetime.now(timezone.utc)

        return normalized
@@ -14,6 +14,11 @@ from tasks.topups_task import TopupsTask
from tasks.table_discount_task import TableDiscountTask
from tasks.assistant_abolish_task import AssistantAbolishTask
from tasks.ledger_task import LedgerTask
from tasks.ods_tasks import ODS_TASK_CLASSES
from tasks.ticket_dwd_task import TicketDwdTask
from tasks.manual_ingest_task import ManualIngestTask
from tasks.payments_dwd_task import PaymentsDwdTask
from tasks.members_dwd_task import MembersDwdTask

class TaskRegistry:
    """Task registration and factory"""
@@ -55,3 +60,9 @@ default_registry.register("TOPUPS", TopupsTask)
default_registry.register("TABLE_DISCOUNT", TableDiscountTask)
default_registry.register("ASSISTANT_ABOLISH", AssistantAbolishTask)
default_registry.register("LEDGER", LedgerTask)
default_registry.register("TICKET_DWD", TicketDwdTask)
default_registry.register("MANUAL_INGEST", ManualIngestTask)
default_registry.register("PAYMENTS_DWD", PaymentsDwdTask)
default_registry.register("MEMBERS_DWD", MembersDwdTask)
for code, task_cls in ODS_TASK_CLASSES.items():
    default_registry.register(code, task_cls)
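Extending the registry follows the same pattern as the built-in registrations. A hypothetical task class (module, name, and body are illustrative; only `get_task_code`/`execute` are grounded in the task classes shown in this diff):

```python
# tasks/my_export_task.py -- hypothetical example
from tasks.base_task import BaseTask

class MyExportTask(BaseTask):
    def get_task_code(self) -> str:
        return "MY_EXPORT"

    def execute(self) -> dict:
        # real work goes here
        return {"status": "success"}

# In the registry module:
# default_registry.register("MY_EXPORT", MyExportTask)
# after which: python -m cli.main --tasks MY_EXPORT
```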
etl_billiards/schema_v2.sql (new file, 663 lines)
@@ -0,0 +1,663 @@
-- -*- coding: utf-8 -*-
-- Feiqiu-ETL schema (JSON-first alignment)
-- Updated: 2025-11-19 (Refactored for Manual Import & AI-Friendly DWS)

CREATE SCHEMA IF NOT EXISTS billiards;
CREATE SCHEMA IF NOT EXISTS billiards_ods;
CREATE SCHEMA IF NOT EXISTS billiards_dws;
CREATE SCHEMA IF NOT EXISTS etl_admin;

COMMENT ON SCHEMA billiards IS 'Store business data schema holding the dimension/fact layers (aligned with the JSON fields)';
COMMENT ON SCHEMA billiards_ods IS 'Raw data layer (ODS) storing original JSON; supports API fetches and manual imports';
COMMENT ON SCHEMA billiards_dws IS 'Data summary layer (DWS) with wide views oriented toward AI analysis';
COMMENT ON SCHEMA etl_admin IS 'Schema for ETL scheduling, cursors, and run records';

-- =========================
-- 1. Billiards ODS tables
-- =========================

-- 1.1 Order & Settlement ODS
-- Corresponds to /order/list or manual export
CREATE TABLE IF NOT EXISTS billiards_ods.ods_order_settle (
    store_id bigint NOT NULL,
    order_settle_id bigint NOT NULL,
    order_trade_no bigint,
    page_no integer, -- Nullable for manual import
    page_size integer, -- Nullable for manual import
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, order_settle_id)
);

COMMENT ON TABLE billiards_ods.ods_order_settle IS 'Order/settlement ODS (raw JSON from /order/list and the ticket endpoint)';

-- 1.2 Ticket Detail ODS (NEW)
-- Corresponds to "小票详情.json" - Contains full nested details
CREATE TABLE IF NOT EXISTS billiards_ods.ods_ticket_detail (
    store_id bigint NOT NULL,
    order_settle_id bigint NOT NULL,
    source_file varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, order_settle_id)
);

COMMENT ON TABLE billiards_ods.ods_ticket_detail IS 'Ticket detail ODS (full JSON including table-fee, goods, and assistant line items)';

-- 1.3 Table Usage ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_table_use_detail (
    store_id bigint NOT NULL,
    ledger_id bigint NOT NULL,
    order_trade_no bigint,
    order_settle_id bigint,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, ledger_id)
);

-- 1.4 Assistant Ledger ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_assistant_ledger (
    store_id bigint NOT NULL,
    ledger_id bigint NOT NULL,
    order_trade_no bigint,
    order_settle_id bigint,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, ledger_id)
);

-- 1.5 Assistant Abolish ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_assistant_abolish (
    store_id bigint NOT NULL,
    abolish_id bigint NOT NULL,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, abolish_id)
);

-- 1.6 Goods Ledger ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_goods_ledger (
    store_id bigint NOT NULL,
    order_goods_id bigint NOT NULL,
    order_trade_no bigint,
    order_settle_id bigint,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, order_goods_id)
);

-- 1.7 Payment ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_payment (
    store_id bigint NOT NULL,
    pay_id bigint NOT NULL,
    relate_type varchar(50),
    relate_id bigint,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, pay_id)
);

-- 1.8 Refund ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_refund (
    store_id bigint NOT NULL,
    refund_id bigint NOT NULL,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, refund_id)
);

-- 1.9 Coupon Verify ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_coupon_verify (
    store_id bigint NOT NULL,
    coupon_id bigint NOT NULL,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, coupon_id)
);

-- 1.10 Member ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_member (
    store_id bigint NOT NULL,
    member_id bigint NOT NULL,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, member_id)
);

-- 1.11 Member Card ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_member_card (
    store_id bigint NOT NULL,
    card_id bigint NOT NULL,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, card_id)
);

-- 1.12 Package Coupon ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_package_coupon (
    store_id bigint NOT NULL,
    package_id bigint NOT NULL,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, package_id)
);

-- 1.13 Inventory Stock ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_inventory_stock (
    store_id bigint NOT NULL,
    site_goods_id bigint NOT NULL,
    snapshot_key varchar(100) NOT NULL DEFAULT 'default',
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, site_goods_id, snapshot_key)
);

-- 1.14 Inventory Change ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_inventory_change (
    store_id bigint NOT NULL,
    change_id bigint NOT NULL,
    page_no integer,
    source_file varchar(255),
    source_endpoint varchar(255),
    fetched_at timestamptz NOT NULL DEFAULT now(),
    payload jsonb NOT NULL,
    PRIMARY KEY (store_id, change_id)
);

-- =========================
-- 2. Billiards Dimension Tables
-- =========================

CREATE TABLE IF NOT EXISTS billiards.dim_store (
    store_id bigint PRIMARY KEY,
    store_name varchar(200),
    tenant_id bigint,
    region_code varchar(30),
    address varchar(500),
    contact_name varchar(100),
    contact_phone varchar(30),
    created_time timestamptz,
    updated_time timestamptz,
    remark text,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb
);

CREATE TABLE IF NOT EXISTS billiards.dim_assistant (
    store_id bigint NOT NULL,
    assistant_id bigint NOT NULL,
    assistant_no varchar(64),
    nickname varchar(100),
    real_name varchar(100),
    gender varchar(20),
    mobile varchar(30),
    level varchar(50),
    team_id bigint,
    team_name varchar(100),
    assistant_status varchar(30),
    work_status varchar(30),
    entry_time timestamptz,
    resign_time timestamptz,
    start_time timestamptz,
    end_time timestamptz,
    create_time timestamptz,
    update_time timestamptz,
    system_role_id bigint,
    online_status varchar(30),
    allow_cx integer,
    charge_way varchar(30),
    pd_unit_price numeric(14,2),
    cx_unit_price numeric(14,2),
    is_guaranteed integer,
    is_team_leader integer,
    serial_number varchar(64),
    show_sort integer,
    is_delete integer,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    updated_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (store_id, assistant_id)
);

CREATE TABLE IF NOT EXISTS billiards.dim_member (
    store_id bigint NOT NULL,
    member_id bigint NOT NULL,
    member_name varchar(100),
    phone varchar(30),
    balance numeric(18,4),
    status varchar(30),
    register_time timestamptz,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    updated_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (store_id, member_id)
);

CREATE TABLE IF NOT EXISTS billiards.dim_package_coupon (
    store_id bigint NOT NULL,
    package_id bigint NOT NULL,
    package_code varchar(100),
    package_name varchar(200),
    table_area_id bigint,
    table_area_name varchar(100),
    selling_price numeric(14,2),
    duration_seconds integer,
    start_time timestamptz,
    end_time timestamptz,
    type varchar(50),
    is_enabled integer,
    is_delete integer,
    usable_count integer,
    creator_name varchar(100),
    date_type varchar(50),
    group_type varchar(50),
    coupon_money numeric(14,2),
    area_tag_type varchar(50),
    system_group_type varchar(50),
    card_type_ids text,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    updated_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (store_id, package_id)
);

CREATE TABLE IF NOT EXISTS billiards.dim_product (
    store_id bigint NOT NULL,
    product_id bigint NOT NULL,
    site_product_id bigint,
    product_name varchar(200) NOT NULL,
    category_id bigint,
    category_name varchar(100),
    second_category_id bigint,
    unit varchar(20),
    cost_price numeric(14,4),
    sale_price numeric(14,4),
    allow_discount boolean,
    status varchar(30),
    supplier_id bigint,
    barcode varchar(128),
    is_combo boolean,
    created_time timestamptz,
    updated_time timestamptz,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    updated_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (store_id, product_id)
);

CREATE TABLE IF NOT EXISTS billiards.dim_product_price_scd (
    product_scd_id bigserial PRIMARY KEY,
    store_id bigint NOT NULL,
    product_id bigint NOT NULL,
    product_name varchar(200),
    category_id bigint,
    category_name varchar(100),
    second_category_id bigint,
    cost_price numeric(14,4),
    sale_price numeric(14,4),
    allow_discount boolean,
    status varchar(30),
    valid_from timestamptz NOT NULL DEFAULT now(),
    valid_to timestamptz,
    is_current boolean NOT NULL DEFAULT true,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    CONSTRAINT fk_dpps_product FOREIGN KEY (store_id, product_id)
        REFERENCES billiards.dim_product(store_id, product_id) ON DELETE CASCADE,
    CONSTRAINT ck_dpps_range CHECK (
        valid_from < COALESCE(valid_to, '9999-12-31 00:00:00+00'::timestamptz)
    )
);

-- Create partial unique index for current records only
CREATE UNIQUE INDEX uq_dpps_current ON billiards.dim_product_price_scd (store_id, product_id) WHERE is_current;

CREATE VIEW billiards.dim_product_price_current AS
SELECT product_scd_id,
       store_id,
       product_id,
       product_name,
       category_id,
       category_name,
       second_category_id,
       cost_price,
       sale_price,
       allow_discount,
       status,
       valid_from,
       valid_to,
       raw_data
FROM billiards.dim_product_price_scd
WHERE is_current;

CREATE TABLE IF NOT EXISTS billiards.dim_table (
    store_id bigint NOT NULL,
    table_id bigint NOT NULL,
    site_id bigint,
    area_id bigint,
    area_name varchar(100),
    table_name varchar(100) NOT NULL,
    table_price numeric(14,4),
    table_status varchar(30),
    table_status_name varchar(50),
    light_status integer,
    is_rest_area integer,
    show_status integer,
    virtual_table integer,
    charge_free integer,
    only_allow_groupon integer,
    is_online_reservation integer,
    created_time timestamptz,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    updated_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (store_id, table_id)
);

-- =========================
-- 3. Billiards Fact Tables
-- =========================

-- 3.1 Order Fact (Header)
CREATE TABLE IF NOT EXISTS billiards.fact_order (
    store_id bigint NOT NULL,
    order_settle_id bigint NOT NULL, -- Settle ID is the main key for payment
    order_trade_no bigint, -- Can be one of many if merged, but usually 1-1 main
    order_no varchar(100),
    member_id bigint,
    pay_time timestamptz,
    total_amount numeric(14,4), -- Original price
    pay_amount numeric(14,4), -- Actual paid
    discount_amount numeric(14,4),
    coupon_amount numeric(14,4),
    status varchar(50),
    cashier_name varchar(100),
    remark text,
    raw_data jsonb,
    created_at timestamptz DEFAULT now(),
    PRIMARY KEY (store_id, order_settle_id)
);

-- 3.2 Order Items (Goods)
CREATE TABLE IF NOT EXISTS billiards.fact_order_goods (
    store_id bigint NOT NULL,
    order_goods_id bigint NOT NULL, -- orderGoodsLedgerId
    order_settle_id bigint NOT NULL,
    order_trade_no bigint,
    goods_id bigint,
    goods_name varchar(200),
    quantity numeric(10,2),
    unit_price numeric(14,4),
    total_amount numeric(14,4), -- quantity * price
    pay_amount numeric(14,4), -- After discount
    created_at timestamptz DEFAULT now(),
    PRIMARY KEY (store_id, order_goods_id)
);

-- 3.3 Table Usage Fact
CREATE TABLE IF NOT EXISTS billiards.fact_table_usage (
    store_id bigint NOT NULL,
    order_ledger_id bigint NOT NULL, -- orderTableLedgerId
    order_settle_id bigint NOT NULL,
    table_id bigint,
    table_name varchar(100),
    start_time timestamptz,
    end_time timestamptz,
    duration_minutes integer,
    total_amount numeric(14,4),
    pay_amount numeric(14,4),
    created_at timestamptz DEFAULT now(),
    PRIMARY KEY (store_id, order_ledger_id)
);

-- 3.4 Assistant Service Fact
CREATE TABLE IF NOT EXISTS billiards.fact_assistant_service (
    store_id bigint NOT NULL,
    ledger_id bigint NOT NULL, -- orderAssistantLedgerId
    order_settle_id bigint NOT NULL,
    assistant_id bigint,
    assistant_name varchar(100),
    service_type varchar(50), -- e.g., "Play with"
    start_time timestamptz,
    end_time timestamptz,
    duration_minutes integer,
    total_amount numeric(14,4),
    pay_amount numeric(14,4),
    created_at timestamptz DEFAULT now(),
    PRIMARY KEY (store_id, ledger_id)
);

-- 3.5 Payment Fact
CREATE TABLE IF NOT EXISTS billiards.fact_payment (
    store_id bigint NOT NULL,
    pay_id bigint NOT NULL,
    site_id bigint,
    tenant_id bigint,
    order_settle_id bigint,
    order_trade_no bigint,
    relate_type varchar(50),
    relate_id bigint,
    create_time timestamptz,
    pay_time timestamptz,
    pay_amount numeric(14,4),
    fee_amount numeric(14,4),
    discount_amount numeric(14,4),
    payment_method varchar(50), -- e.g., 'WeChat', 'Cash', 'Balance'
    online_pay_channel varchar(50),
    pay_terminal varchar(30),
    pay_status varchar(30),
    raw_data jsonb,
    updated_at timestamptz DEFAULT now(),
    PRIMARY KEY (store_id, pay_id)
);

-- 3.6 Legacy/Other Facts (Preserved)
CREATE TABLE IF NOT EXISTS billiards.fact_assistant_abolish (
    store_id bigint NOT NULL,
    abolish_id bigint NOT NULL,
    table_id bigint,
    table_name varchar(100),
    table_area_id bigint,
    table_area varchar(100),
    assistant_no varchar(64),
    assistant_name varchar(100),
    charge_minutes integer,
    abolish_amount numeric(14,4),
    create_time timestamptz,
    trash_reason text,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    updated_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (store_id, abolish_id)
);

CREATE TABLE IF NOT EXISTS billiards.fact_assistant_ledger (
    store_id bigint NOT NULL,
    ledger_id bigint NOT NULL,
    assistant_no varchar(64),
    assistant_name varchar(100),
    nickname varchar(100),
    level_name varchar(50),
    table_name varchar(100),
    ledger_unit_price numeric(14,4),
    ledger_count numeric(14,4),
    ledger_amount numeric(14,4),
    projected_income numeric(14,4),
    service_money numeric(14,4),
    member_discount_amount numeric(14,4),
    manual_discount_amount numeric(14,4),
    coupon_deduct_money numeric(14,4),
    order_trade_no bigint,
    order_settle_id bigint,
    operator_id bigint,
    operator_name varchar(100),
    assistant_team_id bigint,
    assistant_level varchar(50),
    site_table_id bigint,
    order_assistant_id bigint,
    site_assistant_id bigint,
    user_id bigint,
    ledger_start_time timestamptz,
    ledger_end_time timestamptz,
    start_use_time timestamptz,
    last_use_time timestamptz,
    income_seconds integer,
    real_use_seconds integer,
    is_trash integer,
    trash_reason text,
    is_confirm integer,
    ledger_status varchar(30),
    create_time timestamptz,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    updated_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (store_id, ledger_id)
);

CREATE TABLE IF NOT EXISTS billiards.fact_inventory_change (
    store_id bigint NOT NULL,
    change_id bigint NOT NULL,
    site_goods_id bigint,
    stock_type varchar(50),
    goods_name varchar(200),
    change_time timestamptz,
    start_qty numeric(18,4),
    end_qty numeric(18,4),
    change_qty numeric(18,4),
    unit varchar(20),
    price numeric(14,4),
    operator_name varchar(100),
    remark text,
    goods_category_id bigint,
    goods_second_category_id bigint,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    updated_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (store_id, change_id)
);

CREATE TABLE IF NOT EXISTS billiards.fact_refund (
    store_id bigint NOT NULL,
    refund_id bigint NOT NULL,
    site_id bigint,
    tenant_id bigint,
    pay_amount numeric(14,4),
    pay_status varchar(30),
    pay_time timestamptz,
    create_time timestamptz,
    relate_type varchar(50),
    relate_id bigint,
    payment_method varchar(50),
    refund_amount numeric(14,4),
    refund_reason text,
    raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
    updated_at timestamptz NOT NULL DEFAULT now(),
    PRIMARY KEY (store_id, refund_id)
);

-- =========================
-- 4. DWS Layer (Data Warehouse Summary)
-- Views for Analysis & AI
-- =========================

-- 4.1 Sales Detail View (The "AI-Friendly" Wide Table)
-- Unifies Goods, Table Fees, and Assistant Services into a single stream of "Sales Items"
CREATE OR REPLACE VIEW billiards_dws.dws_sales_detail AS
SELECT
    'GOODS' as item_type,
    g.store_id,
    g.order_settle_id,
    o.pay_time,
    g.goods_name as item_name,
    g.quantity,
    g.pay_amount as amount,
    o.cashier_name
FROM billiards.fact_order_goods g
JOIN billiards.fact_order o ON g.store_id = o.store_id AND g.order_settle_id = o.order_settle_id

UNION ALL

SELECT
    'TABLE' as item_type,
    t.store_id,
    t.order_settle_id,
    o.pay_time,
    t.table_name || ' (' || t.duration_minutes || ' mins)' as item_name,
    1 as quantity,
    t.pay_amount as amount,
    o.cashier_name
FROM billiards.fact_table_usage t
JOIN billiards.fact_order o ON t.store_id = o.store_id AND t.order_settle_id = o.order_settle_id

UNION ALL

SELECT
    'ASSISTANT' as item_type,
    a.store_id,
    a.order_settle_id,
    o.pay_time,
    a.assistant_name || ' (' || a.duration_minutes || ' mins)' as item_name,
    1 as quantity,
    a.pay_amount as amount,
    o.cashier_name
FROM billiards.fact_assistant_service a
JOIN billiards.fact_order o ON a.store_id = o.store_id AND a.order_settle_id = o.order_settle_id;

-- 4.2 Daily Revenue View
CREATE OR REPLACE VIEW billiards_dws.dws_daily_revenue AS
SELECT
    store_id,
    DATE(pay_time) as report_date,
    COUNT(DISTINCT order_settle_id) as total_orders,
    SUM(amount) as total_revenue,
    SUM(amount) FILTER (WHERE item_type = 'GOODS') as goods_revenue,
    SUM(amount) FILTER (WHERE item_type = 'TABLE') as table_revenue,
    SUM(amount) FILTER (WHERE item_type = 'ASSISTANT') as assistant_revenue
FROM billiards_dws.dws_sales_detail
GROUP BY store_id, DATE(pay_time);

-- 4.3 Order Detail Wide View (For detailed inspection)
CREATE OR REPLACE VIEW billiards_dws.dws_order_detail AS
SELECT
    o.store_id,
    o.order_settle_id,
    o.order_no,
    o.pay_time,
    o.total_amount,
    o.pay_amount,
    o.cashier_name,
    -- Payment pivot (approximate, assumes simple mapping)
    COALESCE(SUM(p.pay_amount) FILTER (WHERE p.payment_method = '1'), 0) AS pay_cash,
    COALESCE(SUM(p.pay_amount) FILTER (WHERE p.payment_method = '2'), 0) AS pay_balance,
    COALESCE(SUM(p.pay_amount) FILTER (WHERE p.payment_method = '4'), 0) AS pay_wechat,
    -- Content summary
    (SELECT string_agg(goods_name || 'x' || quantity, '; ') FROM billiards.fact_order_goods WHERE order_settle_id = o.order_settle_id) AS goods_summary,
    (SELECT string_agg(assistant_name, '; ') FROM billiards.fact_assistant_service WHERE order_settle_id = o.order_settle_id) AS assistant_summary
FROM billiards.fact_order o
LEFT JOIN billiards.fact_payment p ON o.order_settle_id = p.order_settle_id
GROUP BY o.store_id, o.order_settle_id, o.order_no, o.pay_time, o.total_amount, o.pay_amount, o.cashier_name;
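To sanity-check the DWS layer after a load, the daily revenue view can be queried directly; a small sketch using psycopg2 (the DSN is a placeholder, in this project `TEST_DB_DSN` serves the same purpose):

```python
import psycopg2

# Placeholder DSN; store_id matches the STORE_ID from the .env example.
conn = psycopg2.connect("postgresql://user:pwd@host:5432/LLZQ-test")
with conn.cursor() as cur:
    cur.execute(
        "SELECT report_date, total_orders, total_revenue "
        "FROM billiards_dws.dws_daily_revenue "
        "WHERE store_id = %s ORDER BY report_date DESC LIMIT 7",
        (2790685415443269,),
    )
    for report_date, orders, revenue in cur.fetchall():
        print(report_date, orders, revenue)
conn.close()
```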
etl_billiards/scripts/test_db_connection.py (new file, 63 lines)
@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
"""Quick utility for validating PostgreSQL connectivity."""
from __future__ import annotations

import argparse
import os
import sys

PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

from database.connection import DatabaseConnection


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="PostgreSQL connectivity smoke test")
    parser.add_argument("--dsn", help="Override TEST_DB_DSN / env value")
    parser.add_argument(
        "--query",
        default="SELECT 1 AS ok",
        help="Custom SQL to run after connection (default: SELECT 1 AS ok)",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=5,
        help="connect_timeout seconds passed to psycopg2 (default: 5)",
    )
    return parser.parse_args()


def main() -> int:
    args = parse_args()
    dsn = args.dsn or os.environ.get("TEST_DB_DSN")
    if not dsn:
        print("❌ No DSN provided; pass --dsn or set TEST_DB_DSN", file=sys.stderr)
        return 2

    print(f"Attempting connection: {dsn}")
    try:
        conn = DatabaseConnection(dsn, connect_timeout=args.timeout)
    except Exception as exc:  # pragma: no cover - diagnostic output
        print("❌ Connection failed:", exc, file=sys.stderr)
        return 1

    try:
        result = conn.query(args.query)
        print("✅ Connected; query result:")
        for row in result:
            print(row)
        conn.close()
        return 0
    except Exception as exc:  # pragma: no cover - diagnostic output
        print("⚠️ Connected, but the query failed:", exc, file=sys.stderr)
        try:
            conn.close()
        finally:
            return 3


if __name__ == "__main__":
    raise SystemExit(main())
@@ -56,6 +56,7 @@ from typing import List
RUN_TESTS_SCRIPT = os.path.join(os.path.dirname(__file__), "run_tests.py")

# Presets that run automatically by default (order is customizable)

AUTO_RUN_PRESETS = ["offline_realdb"]

PRESETS = {
@@ -66,6 +67,15 @@ PRESETS = {
        "pytest_args": "-vv",
        "preset_meta": "Online mode: run only the order task with verbose logging",
    },

    "dbrun": {
        "suite": ["integration"],
        # "mode": "OFFLINE",
        # "keyword": "ORDERS",
        # "pytest_args": "-vv",
        "preset_meta": "Online mode: run only the order task with verbose logging",
    },

    "offline_realdb": {
        "suite": ["offline"],
        "mode": "OFFLINE",
etl_billiards/tasks/base_dwd_task.py (new file, 79 lines)
@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
"""Base class for DWD tasks"""
import json
from typing import Any, Dict, Iterator, List, Optional, Tuple
from datetime import datetime

from .base_task import BaseTask
from models.parsers import TypeParser


class BaseDwdTask(BaseTask):
    """
    Base class for DWD-layer tasks.
    Reads data from the ODS tables so subclasses can clean it and write to the fact/dimension tables.
    """

    def _get_ods_cursor(self, task_code: str) -> datetime:
        """
        Return the point in time (fetched_at) up to which ODS data was last processed.
        Simplified for now; this should really be read from the etl_cursor table.
        Currently we rely on BaseTask's time-window logic, or subclasses manage it themselves.
        """
        # TODO: hook up the real CursorManager
        # For now return nothing and let subclasses use _get_time_window
        return None

    def iter_ods_rows(
        self,
        table_name: str,
        columns: List[str],
        start_time: datetime,
        end_time: datetime,
        time_col: str = "fetched_at",
        batch_size: int = 1000
    ) -> Iterator[List[Dict[str, Any]]]:
        """
        Iterate over ODS rows in batches.

        Args:
            table_name: ODS table name
            columns: columns to select (must include payload)
            start_time: window start (inclusive)
            end_time: window end (inclusive)
            time_col: time filter column, default fetched_at
            batch_size: batch size
        """
        offset = 0
        cols_str = ", ".join(columns)

        while True:
            sql = f"""
                SELECT {cols_str}
                FROM {table_name}
                WHERE {time_col} >= %s AND {time_col} <= %s
                ORDER BY {time_col} ASC
                LIMIT %s OFFSET %s
            """

            rows = self.db.fetch_all(sql, (start_time, end_time, batch_size, offset))

            if not rows:
                break

            yield [dict(row) for row in rows]

            if len(rows) < batch_size:
                break

            offset += batch_size

    def parse_payload(self, row: Dict[str, Any]) -> Dict[str, Any]:
        """
        Parse the payload JSON of an ODS row.
        """
        payload = row.get("payload")
        if isinstance(payload, str):
            return json.loads(payload)
        elif isinstance(payload, dict):
            return payload
        return {}
etl_billiards/tasks/manual_ingest_task.py (new file, 176 lines)
@@ -0,0 +1,176 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
import os
|
||||
import json
|
||||
from datetime import datetime
|
||||
from .base_task import BaseTask
|
||||
from loaders.ods.generic import GenericODSLoader
|
||||
|
||||
class ManualIngestTask(BaseTask):
|
||||
"""
|
||||
Task to ingest manually fetched JSON files from a directory into ODS tables.
|
||||
"""
|
||||
|
||||
FILE_MAPPING = {
|
||||
"小票详情": "billiards_ods.ods_ticket_detail",
|
||||
"结账记录": "billiards_ods.ods_order_settle",
|
||||
"支付记录": "billiards_ods.ods_payment",
|
||||
"助教流水": "billiards_ods.ods_assistant_ledger",
|
||||
"助教废除": "billiards_ods.ods_assistant_abolish",
|
||||
"商品档案": "billiards_ods.ods_goods_ledger", # Note: This might be dim_product source, but mapping to ledger for now if it's sales
|
||||
"库存变化": "billiards_ods.ods_inventory_change",
|
||||
"会员档案": "billiards_ods.ods_member",
|
||||
"充值记录": "billiards_ods.ods_member_card", # Approx
|
||||
"团购套餐": "billiards_ods.ods_package_coupon",
|
||||
"库存汇总": "billiards_ods.ods_inventory_stock"
|
||||
}
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "MANUAL_INGEST"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("Starting Manual Ingest Task")
|
||||
|
||||
# Configurable directory, default to tests/testdata_json for now
|
||||
data_dir = self.config.get("manual.data_dir", r"c:\dev\LLTQ\ETL\feiqiu-ETL\etl_billiards\tests\testdata_json")
|
||||
|
||||
if not os.path.exists(data_dir):
|
||||
self.logger.error(f"Data directory not found: {data_dir}")
|
||||
return {"status": "error", "message": "Directory not found"}
|
||||
|
||||
total_files = 0
|
||||
total_rows = 0
|
||||
|
||||
for filename in os.listdir(data_dir):
|
||||
if not filename.endswith(".json"):
|
||||
continue
|
||||
|
||||
# Determine target table
|
||||
target_table = None
|
||||
for key, table in self.FILE_MAPPING.items():
|
||||
if key in filename:
|
||||
target_table = table
|
||||
break
|
||||
|
||||
if not target_table:
|
||||
self.logger.warning(f"No mapping found for file: {filename}, skipping.")
|
||||
continue
|
||||
|
||||
self.logger.info(f"Ingesting {filename} into {target_table}")
|
||||
|
||||
try:
|
||||
with open(os.path.join(data_dir, filename), 'r', encoding='utf-8') as f:
|
||||
data = json.load(f)
|
||||
|
||||
if not isinstance(data, list):
|
||||
data = [data]
|
||||
|
||||
# Prepare rows for GenericODSLoader
|
||||
# We need to adapt the data to what GenericODSLoader expects (or update it)
|
||||
# GenericODSLoader expects dicts. It handles normalization.
|
||||
# But we need to ensure the primary keys are present in the payload or extracted.
|
||||
# The GenericODSLoader might need configuration for PK extraction if it's not standard.
|
||||
# For now, let's assume the payload IS the row, and we wrap it.
|
||||
|
||||
# Actually, GenericODSLoader.upsert_rows expects the raw API result list.
|
||||
# It calls _normalize_row.
|
||||
# We need to make sure _normalize_row works for these files.
|
||||
# Most files have 'id' or similar.
|
||||
|
||||
# Let's instantiate a loader for this table
|
||||
# We need to know the PK for the table.
|
||||
# This is usually defined in ODS_TASK_CLASSES but here we are dynamic.
|
||||
# We might need a simpler loader or reuse GenericODSLoader with specific PK config.
|
||||
|
||||
# For simplicity, let's use a custom ingestion here that mimics GenericODSLoader but is file-aware.
|
||||
rows_to_insert = []
|
||||
for item in data:
|
||||
# Extract Store ID (usually in siteProfile or data root)
|
||||
store_id = self._extract_store_id(item) or self.config.get("app.store_id")
|
||||
|
||||
# Extract PK (id, orderSettleId, etc.)
|
||||
pk_val = self._extract_pk(item, target_table)
|
||||
|
||||
if not pk_val:
|
||||
# Try to find 'id' in the item
|
||||
pk_val = item.get("id")
|
||||
|
||||
if not pk_val:
|
||||
# Special case for Ticket Detail
|
||||
if "ods_ticket_detail" in target_table:
|
||||
pk_val = item.get("orderSettleId")
|
||||
|
||||
if not pk_val:
|
||||
continue
|
||||
|
||||
row = {
|
||||
"store_id": store_id,
|
||||
"payload": json.dumps(item, ensure_ascii=False),
|
||||
"source_file": filename,
|
||||
"fetched_at": datetime.now()
|
||||
}
|
||||
|
||||
# Add specific PK column
|
||||
pk_col = self._get_pk_column(target_table)
|
||||
row[pk_col] = pk_val
|
||||
|
||||
rows_to_insert.append(row)
|
||||
|
||||
if rows_to_insert:
|
||||
self._bulk_insert(target_table, rows_to_insert)
|
||||
total_rows += len(rows_to_insert)
|
||||
total_files += 1
|
||||
|
||||
except Exception as e:
|
||||
self.logger.error(f"Error processing {filename}: {e}", exc_info=True)
|
||||
|
||||
return {"status": "success", "files_processed": total_files, "rows_inserted": total_rows}
|
||||
|
||||
    def _extract_store_id(self, item):
        # Try common paths
        if "store_id" in item:
            return item["store_id"]
        if "siteProfile" in item and "id" in item["siteProfile"]:
            return item["siteProfile"]["id"]
        if "data" in item and "data" in item["data"] and "siteId" in item["data"]["data"]:
            return item["data"]["data"]["siteId"]
        return None

    def _extract_pk(self, item, table):
        # Helper to find PK based on table
        if "ods_order_settle" in table:
            # Check for nested structure in some files
            if "settleList" in item and "settleList" in item["settleList"]:
                return item["settleList"]["settleList"].get("id")
            return item.get("id")
        return item.get("id")

    def _get_pk_column(self, table):
        if "ods_ticket_detail" in table: return "order_settle_id"
        if "ods_order_settle" in table: return "order_settle_id"
        if "ods_payment" in table: return "pay_id"
        # Check member_card before member: "ods_member" is a substring of "ods_member_card"
        if "ods_member_card" in table: return "card_id"
        if "ods_member" in table: return "member_id"
        if "ods_assistant_ledger" in table: return "ledger_id"
        if "ods_goods_ledger" in table: return "order_goods_id"
        if "ods_inventory_change" in table: return "change_id"
        if "ods_assistant_abolish" in table: return "abolish_id"
        if "ods_coupon_verify" in table: return "coupon_id"
        if "ods_package_coupon" in table: return "package_id"
        return "id"  # Fallback

    def _bulk_insert(self, table, rows):
        if not rows:
            return

        keys = list(rows[0].keys())
        cols = ", ".join(keys)
        vals = ", ".join([f"%({k})s" for k in keys])

        # Determine PK col for conflict
        pk_col = self._get_pk_column(table)

        sql = f"""
            INSERT INTO {table} ({cols})
            VALUES ({vals})
            ON CONFLICT (store_id, {pk_col}) DO UPDATE SET
                payload = EXCLUDED.payload,
                fetched_at = EXCLUDED.fetched_at,
                source_file = EXCLUDED.source_file;
        """
        self.db.batch_execute(sql, rows)
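For reference, a short sketch of the statement `_bulk_insert` renders; the table and row values below are illustrative, not taken from a real run:

```python
# Sketch: the upsert SQL _bulk_insert builds for billiards_ods.ods_payment.
rows = [{"store_id": 1, "pay_id": 901, "payload": "{}",
         "source_file": "pay.json", "fetched_at": "2025-11-10 03:59:23"}]
keys = list(rows[0].keys())
cols = ", ".join(keys)
vals = ", ".join([f"%({k})s" for k in keys])
sql = f"""
    INSERT INTO billiards_ods.ods_payment ({cols})
    VALUES ({vals})
    ON CONFLICT (store_id, pay_id) DO UPDATE SET
        payload = EXCLUDED.payload,
        fetched_at = EXCLUDED.fetched_at,
        source_file = EXCLUDED.source_file;
"""
print(sql)  # re-ingesting the same file only refreshes payload/fetched_at/source_file
```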
89
etl_billiards/tasks/members_dwd_task.py
Normal file
@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
from .base_dwd_task import BaseDwdTask
from loaders.dimensions.member import MemberLoader
from models.parsers import TypeParser
import json

class MembersDwdTask(BaseDwdTask):
    """
    DWD Task: Process Member Records from ODS to Dimension Table
    Source: billiards_ods.ods_member
    Target: billiards.dim_member
    """

    def get_task_code(self) -> str:
        return "MEMBERS_DWD"

    def execute(self) -> dict:
        self.logger.info(f"Starting {self.get_task_code()} task")

        window_start, window_end, _ = self._get_time_window()
        self.logger.info(f"Processing window: {window_start} to {window_end}")

        loader = MemberLoader(self.db)
        store_id = self.config.get("app.store_id")

        total_inserted = 0
        total_updated = 0
        total_errors = 0

        # Iterate ODS Data
        batches = self.iter_ods_rows(
            table_name="billiards_ods.ods_member",
            columns=["store_id", "member_id", "payload", "fetched_at"],
            start_time=window_start,
            end_time=window_end
        )

        for batch in batches:
            if not batch:
                continue

            parsed_rows = []
            for row in batch:
                payload = self.parse_payload(row)
                if not payload:
                    continue

                parsed = self._parse_member(payload, store_id)
                if parsed:
                    parsed_rows.append(parsed)

            if parsed_rows:
                inserted, updated, skipped = loader.upsert_members(parsed_rows, store_id)
                total_inserted += inserted
                total_updated += updated

        self.db.commit()

        self.logger.info(f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Updated: {total_updated}")

        return {
            "status": "success",
            "inserted": total_inserted,
            "updated": total_updated,
            "window_start": window_start.isoformat(),
            "window_end": window_end.isoformat()
        }

    def _parse_member(self, raw: dict, store_id: int) -> dict:
        """Parse ODS payload into Dim structure"""
        try:
            # Handle both API structure (camelCase) and manual structure
            member_id = raw.get("id") or raw.get("memberId")
            if not member_id:
                return None

            return {
                "store_id": store_id,
                "member_id": member_id,
                "member_name": raw.get("name") or raw.get("memberName"),
                "phone": raw.get("phone") or raw.get("mobile"),
                "balance": raw.get("balance", 0),
                "status": str(raw.get("status", "NORMAL")),
                "register_time": raw.get("createTime") or raw.get("registerTime"),
                "raw_data": json.dumps(raw, ensure_ascii=False)
            }
        except Exception as e:
            self.logger.warning(f"Error parsing member: {e}")
            return None
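A quick standalone illustration of the camelCase/snake_case fallbacks `_parse_member` relies on (values are made up for the example):

```python
# The ODS payload may carry API-style camelCase keys or archive-style keys;
# each field falls back through its known aliases.
raw = {"memberId": 42, "memberName": "Alice", "mobile": "13800000000"}

member_id = raw.get("id") or raw.get("memberId")   # -> 42
name = raw.get("name") or raw.get("memberName")    # -> "Alice"
phone = raw.get("phone") or raw.get("mobile")      # -> "13800000000"

assert (member_id, name, phone) == (42, "Alice", "13800000000")
```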
400
etl_billiards/tasks/ods_tasks.py
Normal file
@@ -0,0 +1,400 @@
# -*- coding: utf-8 -*-
"""ODS ingestion tasks."""
from __future__ import annotations

from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type

from loaders.ods import GenericODSLoader
from models.parsers import TypeParser
from .base_task import BaseTask


ColumnTransform = Callable[[Any], Any]


@dataclass(frozen=True)
class ColumnSpec:
    """Mapping between DB column and source JSON field."""

    column: str
    sources: Tuple[str, ...] = ()
    required: bool = False
    default: Any = None
    transform: ColumnTransform | None = None


@dataclass(frozen=True)
class OdsTaskSpec:
    """Definition of a single ODS ingestion task."""

    code: str
    class_name: str
    table_name: str
    endpoint: str
    data_path: Tuple[str, ...] = ("data",)
    list_key: str | None = None
    pk_columns: Tuple[ColumnSpec, ...] = ()
    extra_columns: Tuple[ColumnSpec, ...] = ()
    include_page_size: bool = False
    include_page_no: bool = True
    include_source_file: bool = True
    include_source_endpoint: bool = True
    requires_window: bool = True
    description: str = ""
    extra_params: Dict[str, Any] = field(default_factory=dict)


class BaseOdsTask(BaseTask):
    """Shared functionality for ODS ingestion tasks."""

    SPEC: OdsTaskSpec

    def get_task_code(self) -> str:
        return self.SPEC.code

    def execute(self) -> dict:
        spec = self.SPEC
        self.logger.info("开始执行 %s (ODS)", spec.code)

        store_id = TypeParser.parse_int(self.config.get("app.store_id"))
        if not store_id:
            raise ValueError("app.store_id 未配置,无法执行 ODS 任务")

        page_size = self.config.get("api.page_size", 200)
        params = self._build_params(spec, store_id)
        columns = self._resolve_columns(spec)
        conflict_columns = ["store_id"] + [col.column for col in spec.pk_columns]
        loader = GenericODSLoader(
            self.db,
            spec.table_name,
            columns,
            conflict_columns,
        )

        counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
        source_file = self._resolve_source_file_hint(spec)

        try:
            for page_no, page_records, _, _ in self.api.iter_paginated(
                endpoint=spec.endpoint,
                params=params,
                page_size=page_size,
                data_path=spec.data_path,
                list_key=spec.list_key,
            ):
                rows: List[dict] = []
                for raw in page_records:
                    row = self._build_row(
                        spec=spec,
                        store_id=store_id,
                        record=raw,
                        page_no=page_no if spec.include_page_no else None,
                        page_size_value=len(page_records)
                        if spec.include_page_size
                        else None,
                        source_file=source_file,
                    )
                    if row is None:
                        counts["skipped"] += 1
                        continue
                    rows.append(row)

                inserted, updated, _ = loader.upsert_rows(rows)
                counts["inserted"] += inserted
                counts["updated"] += updated
                counts["fetched"] += len(page_records)

            self.db.commit()
            self.logger.info("%s ODS 任务完成: %s", spec.code, counts)
            return self._build_result("SUCCESS", counts)

        except Exception:
            self.db.rollback()
            counts["errors"] += 1
            self.logger.error("%s ODS 任务失败", spec.code, exc_info=True)
            raise
    def _build_params(self, spec: OdsTaskSpec, store_id: int) -> dict:
        params: dict[str, Any] = {"storeId": store_id}
        params.update(spec.extra_params)
        if spec.requires_window:
            window_start, window_end, _ = self._get_time_window()
            params["startTime"] = TypeParser.format_timestamp(window_start, self.tz)
            params["endTime"] = TypeParser.format_timestamp(window_end, self.tz)
        return params

    def _resolve_columns(self, spec: OdsTaskSpec) -> List[str]:
        columns: List[str] = ["store_id"]
        seen = set(columns)
        for col_spec in list(spec.pk_columns) + list(spec.extra_columns):
            if col_spec.column not in seen:
                columns.append(col_spec.column)
                seen.add(col_spec.column)

        if spec.include_page_no and "page_no" not in seen:
            columns.append("page_no")
            seen.add("page_no")

        if spec.include_page_size and "page_size" not in seen:
            columns.append("page_size")
            seen.add("page_size")

        if spec.include_source_file and "source_file" not in seen:
            columns.append("source_file")
            seen.add("source_file")

        if spec.include_source_endpoint and "source_endpoint" not in seen:
            columns.append("source_endpoint")
            seen.add("source_endpoint")

        if "fetched_at" not in seen:
            columns.append("fetched_at")
            seen.add("fetched_at")
        if "payload" not in seen:
            columns.append("payload")

        return columns

    def _build_row(
        self,
        spec: OdsTaskSpec,
        store_id: int,
        record: dict,
        page_no: int | None,
        page_size_value: int | None,
        source_file: str | None,
    ) -> dict | None:
        row: dict[str, Any] = {"store_id": store_id}

        for col_spec in spec.pk_columns + spec.extra_columns:
            value = self._extract_value(record, col_spec)
            if value is None and col_spec.required:
                self.logger.warning(
                    "%s 缺少必填字段 %s,原始记录: %s",
                    spec.code,
                    col_spec.column,
                    record,
                )
                return None
            row[col_spec.column] = value

        if spec.include_page_no:
            row["page_no"] = page_no
        if spec.include_page_size:
            row["page_size"] = page_size_value
        if spec.include_source_file:
            row["source_file"] = source_file
        if spec.include_source_endpoint:
            row["source_endpoint"] = spec.endpoint

        row["fetched_at"] = datetime.now(self.tz)
        row["payload"] = record
        return row

    def _extract_value(self, record: dict, spec: ColumnSpec):
        value = None
        for key in spec.sources:
            value = self._dig(record, key)
            if value is not None:
                break
        if value is None and spec.default is not None:
            value = spec.default
        if value is not None and spec.transform:
            value = spec.transform(value)
        return value

    @staticmethod
    def _dig(record: Any, path: str | None):
        if not path:
            return None
        current = record
        for part in path.split("."):
            if isinstance(current, dict):
                current = current.get(part)
            else:
                return None
        return current

    def _resolve_source_file_hint(self, spec: OdsTaskSpec) -> str | None:
        resolver = getattr(self.api, "get_source_hint", None)
        if callable(resolver):
            return resolver(spec.endpoint)
        return None


def _int_col(name: str, *sources: str, required: bool = False) -> ColumnSpec:
    return ColumnSpec(
        column=name,
        sources=sources,
        required=required,
        transform=TypeParser.parse_int,
    )
ODS_TASK_SPECS: Tuple[OdsTaskSpec, ...] = (
    OdsTaskSpec(
        code="ODS_ORDER_SETTLE",
        class_name="OdsOrderSettleTask",
        table_name="billiards_ods.ods_order_settle",
        endpoint="/order/list",
        data_path=("data",),
        pk_columns=(_int_col("order_settle_id", "orderSettleId", "order_settle_id", "id", required=True),),
        extra_columns=(_int_col("order_trade_no", "orderTradeNo", "order_trade_no"),),
        include_page_size=True,
        description="订单/结算 ODS 原始记录",
    ),
    OdsTaskSpec(
        code="ODS_TABLE_USE",
        class_name="OdsTableUseTask",
        table_name="billiards_ods.ods_table_use_detail",
        endpoint="/Table/UseDetailList",
        data_path=("data", "siteTableUseDetailsList"),
        pk_columns=(_int_col("ledger_id", "id", required=True),),
        extra_columns=(
            _int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
            _int_col("order_settle_id", "order_settle_id", "orderSettleId"),
        ),
        description="台费/开台流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_ASSISTANT_LEDGER",
        class_name="OdsAssistantLedgerTask",
        table_name="billiards_ods.ods_assistant_ledger",
        endpoint="/Assistant/LedgerList",
        data_path=("data", "orderAssistantDetails"),
        pk_columns=(_int_col("ledger_id", "id", required=True),),
        extra_columns=(
            _int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
            _int_col("order_settle_id", "order_settle_id", "orderSettleId"),
        ),
        description="助教流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_ASSISTANT_ABOLISH",
        class_name="OdsAssistantAbolishTask",
        table_name="billiards_ods.ods_assistant_abolish",
        endpoint="/Assistant/AbolishList",
        data_path=("data", "abolitionAssistants"),
        pk_columns=(_int_col("abolish_id", "id", required=True),),
        description="助教作废记录 ODS",
    ),
    OdsTaskSpec(
        code="ODS_GOODS_LEDGER",
        class_name="OdsGoodsLedgerTask",
        table_name="billiards_ods.ods_goods_ledger",
        endpoint="/Order/GoodsLedgerList",
        data_path=("data", "orderGoodsLedgers"),
        pk_columns=(_int_col("order_goods_id", "orderGoodsId", "id", required=True),),
        extra_columns=(
            _int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
            _int_col("order_settle_id", "order_settle_id", "orderSettleId"),
        ),
        description="商品销售流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_PAYMENT",
        class_name="OdsPaymentTask",
        table_name="billiards_ods.ods_payment",
        endpoint="/pay/records",
        data_path=("data",),
        pk_columns=(_int_col("pay_id", "payId", "id", required=True),),
        extra_columns=(
            ColumnSpec(column="relate_type", sources=("relate_type", "relateType")),
            _int_col("relate_id", "relate_id", "relateId"),
        ),
        include_page_size=False,
        description="支付流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_REFUND",
        class_name="OdsRefundTask",
        table_name="billiards_ods.ods_refund",
        endpoint="/Pay/RefundList",
        data_path=(),
        pk_columns=(_int_col("refund_id", "id", required=True),),
        extra_columns=(
            ColumnSpec(column="relate_type", sources=("relate_type", "relateType")),
            _int_col("relate_id", "relate_id", "relateId"),
        ),
        description="退款流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_COUPON_VERIFY",
        class_name="OdsCouponVerifyTask",
        table_name="billiards_ods.ods_coupon_verify",
        endpoint="/Coupon/UsageList",
        data_path=(),
        pk_columns=(_int_col("coupon_id", "id", "couponId", required=True),),
        description="平台验券/团购流水 ODS",
    ),
    OdsTaskSpec(
        code="ODS_MEMBER",
        class_name="OdsMemberTask",
        table_name="billiards_ods.ods_member",
        endpoint="/MemberProfile/GetTenantMemberList",
        data_path=("data",),
        pk_columns=(_int_col("member_id", "memberId", required=True),),
        requires_window=False,
        description="会员档案 ODS",
    ),
    OdsTaskSpec(
        code="ODS_MEMBER_CARD",
        class_name="OdsMemberCardTask",
        table_name="billiards_ods.ods_member_card",
        endpoint="/MemberCard/List",
        data_path=("data", "tenantMemberCards"),
        pk_columns=(_int_col("card_id", "tenantMemberCardId", "cardId", required=True),),
        requires_window=False,
        description="会员卡/储值卡 ODS",
    ),
    OdsTaskSpec(
        code="ODS_PACKAGE",
        class_name="OdsPackageTask",
        table_name="billiards_ods.ods_package_coupon",
        endpoint="/Package/List",
        data_path=("data", "packageCouponList"),
        pk_columns=(_int_col("package_id", "id", "packageId", required=True),),
        requires_window=False,
        description="团购/套餐定义 ODS",
    ),
    OdsTaskSpec(
        code="ODS_INVENTORY_STOCK",
        class_name="OdsInventoryStockTask",
        table_name="billiards_ods.ods_inventory_stock",
        endpoint="/Inventory/StockSummary",
        data_path=(),
        pk_columns=(
            _int_col("site_goods_id", "siteGoodsId", required=True),
            ColumnSpec(column="snapshot_key", default="default", required=True),
        ),
        requires_window=False,
        description="库存汇总 ODS",
    ),
    OdsTaskSpec(
        code="ODS_INVENTORY_CHANGE",
        class_name="OdsInventoryChangeTask",
        table_name="billiards_ods.ods_inventory_change",
        endpoint="/Inventory/ChangeList",
        data_path=("data", "queryDeliveryRecordsList"),
        pk_columns=(_int_col("change_id", "siteGoodsStockId", "id", required=True),),
        description="库存变动 ODS",
    ),
)


def _build_task_class(spec: OdsTaskSpec) -> Type[BaseOdsTask]:
    attrs = {
        "SPEC": spec,
        "__doc__": spec.description or f"ODS ingestion task {spec.code}",
        "__module__": __name__,
    }
    return type(spec.class_name, (BaseOdsTask,), attrs)


ODS_TASK_CLASSES: Dict[str, Type[BaseOdsTask]] = {
    spec.code: _build_task_class(spec) for spec in ODS_TASK_SPECS
}

__all__ = ["ODS_TASK_CLASSES", "ODS_TASK_SPECS", "BaseOdsTask"]
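A minimal sketch of how the generated task classes can be driven. The `config` / `db_ops` / `api` wiring is assumed to follow the shape used in tests/unit/test_ods_tasks.py and is not shown here:

```python
import logging
from tasks.ods_tasks import ODS_TASK_CLASSES, ODS_TASK_SPECS

# Inspect what the spec table generated.
for spec in ODS_TASK_SPECS:
    print(f"{spec.code}: {spec.endpoint} -> {spec.table_name}")

# Running one task requires the usual wiring (see the unit tests):
# task = ODS_TASK_CLASSES["ODS_PAYMENT"](config, db_ops, api, logging.getLogger("ods"))
# result = task.execute()  # -> {"status": "SUCCESS", "counts": {...}, ...}
```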
124
etl_billiards/tasks/payments_dwd_task.py
Normal file
@@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
from .base_dwd_task import BaseDwdTask
from loaders.facts.payment import PaymentLoader
from models.parsers import TypeParser
import json

class PaymentsDwdTask(BaseDwdTask):
    """
    DWD Task: Process Payment Records from ODS to Fact Table
    Source: billiards_ods.ods_payment
    Target: billiards.fact_payment
    """

    def get_task_code(self) -> str:
        return "PAYMENTS_DWD"

    def execute(self) -> dict:
        self.logger.info(f"Starting {self.get_task_code()} task")

        window_start, window_end, _ = self._get_time_window()
        self.logger.info(f"Processing window: {window_start} to {window_end}")

        loader = PaymentLoader(self.db)
        store_id = self.config.get("app.store_id")

        total_inserted = 0
        total_errors = 0

        # Iterate ODS Data
        batches = self.iter_ods_rows(
            table_name="billiards_ods.ods_payment",
            columns=["store_id", "pay_id", "payload", "fetched_at"],
            start_time=window_start,
            end_time=window_end
        )

        for batch in batches:
            if not batch:
                continue

            parsed_rows = []
            for row in batch:
                payload = self.parse_payload(row)
                if not payload:
                    continue

                parsed = self._parse_payment(payload, store_id)
                if parsed:
                    parsed_rows.append(parsed)

            if parsed_rows:
                inserted, errors = loader.upsert_payments(parsed_rows, store_id)
                total_inserted += inserted
                total_errors += errors

        self.db.commit()

        self.logger.info(f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Errors: {total_errors}")

        return {
            "status": "success",
            "inserted": total_inserted,
            "errors": total_errors,
            "window_start": window_start.isoformat(),
            "window_end": window_end.isoformat()
        }

    def _parse_payment(self, raw: dict, store_id: int) -> dict:
        """Parse ODS payload into Fact structure"""
        try:
            pay_id = TypeParser.parse_int(raw.get("payId") or raw.get("id"))
            if not pay_id:
                return None

            relate_type = str(raw.get("relateType") or raw.get("relate_type") or "")
            relate_id = TypeParser.parse_int(raw.get("relateId") or raw.get("relate_id"))

            # Attempt to populate settlement / trade identifiers
            order_settle_id = TypeParser.parse_int(
                raw.get("orderSettleId") or raw.get("order_settle_id")
            )
            order_trade_no = TypeParser.parse_int(
                raw.get("orderTradeNo") or raw.get("order_trade_no")
            )

            if relate_type in {"1", "SETTLE", "ORDER"}:
                order_settle_id = order_settle_id or relate_id

            return {
                "store_id": store_id,
                "pay_id": pay_id,
                "order_settle_id": order_settle_id,
                "order_trade_no": order_trade_no,
                "relate_type": relate_type,
                "relate_id": relate_id,
                "site_id": TypeParser.parse_int(
                    raw.get("siteId") or raw.get("site_id") or store_id
                ),
                "tenant_id": TypeParser.parse_int(raw.get("tenantId") or raw.get("tenant_id")),
                "create_time": TypeParser.parse_timestamp(
                    raw.get("createTime") or raw.get("create_time"), self.tz
                ),
                "pay_time": TypeParser.parse_timestamp(raw.get("payTime"), self.tz),
                "pay_amount": TypeParser.parse_decimal(raw.get("payAmount")),
                "fee_amount": TypeParser.parse_decimal(
                    raw.get("feeAmount")
                    or raw.get("serviceFee")
                    or raw.get("channelFee")
                    or raw.get("fee_amount")
                ),
                "discount_amount": TypeParser.parse_decimal(
                    raw.get("discountAmount")
                    or raw.get("couponAmount")
                    or raw.get("discount_amount")
                ),
                "payment_method": str(raw.get("paymentMethod") or raw.get("payment_method") or ""),
                "online_pay_channel": raw.get("onlinePayChannel") or raw.get("online_pay_channel"),
                "pay_terminal": raw.get("payTerminal") or raw.get("pay_terminal"),
                "pay_status": str(raw.get("payStatus") or raw.get("pay_status") or ""),
                "raw_data": json.dumps(raw, ensure_ascii=False)
            }
        except Exception as e:
            self.logger.warning(f"Error parsing payment: {e}")
            return None
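The relate_type handling above is the subtle part: when a payment relates to a settlement or order, `relate_id` doubles as the settle id if the explicit field is missing. A standalone sketch of just that fallback:

```python
def settle_id_for(raw: dict):
    # Mirrors the fallback in PaymentsDwdTask._parse_payment (illustrative values).
    relate_type = str(raw.get("relateType") or raw.get("relate_type") or "")
    relate_id = raw.get("relateId") or raw.get("relate_id")
    order_settle_id = raw.get("orderSettleId") or raw.get("order_settle_id")
    if relate_type in {"1", "SETTLE", "ORDER"}:
        order_settle_id = order_settle_id or relate_id
    return order_settle_id

assert settle_id_for({"relateType": "ORDER", "relateId": 123}) == 123
assert settle_id_for({"relateType": "REFUND", "relateId": 5}) is None
```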
@@ -62,16 +62,42 @@ class PaymentsTask(BaseTask):

    def _parse_payment(self, raw: dict) -> dict:
        """Parse a payment record."""
        try:
            store_id = self.config.get("app.store_id")
            return {
                "store_id": store_id,
                "pay_id": TypeParser.parse_int(raw.get("payId") or raw.get("id")),
                "order_id": TypeParser.parse_int(raw.get("orderId")),
                "order_settle_id": TypeParser.parse_int(
                    raw.get("orderSettleId") or raw.get("order_settle_id")
                ),
                "order_trade_no": TypeParser.parse_int(
                    raw.get("orderTradeNo") or raw.get("order_trade_no")
                ),
                "relate_type": raw.get("relateType") or raw.get("relate_type"),
                "relate_id": TypeParser.parse_int(raw.get("relateId") or raw.get("relate_id")),
                "site_id": TypeParser.parse_int(raw.get("siteId") or raw.get("site_id") or store_id),
                "tenant_id": TypeParser.parse_int(raw.get("tenantId") or raw.get("tenant_id")),
                "pay_time": TypeParser.parse_timestamp(raw.get("payTime"), self.tz),
                "create_time": TypeParser.parse_timestamp(
                    raw.get("createTime") or raw.get("create_time"), self.tz
                ),
                "pay_amount": TypeParser.parse_decimal(raw.get("payAmount")),
                "fee_amount": TypeParser.parse_decimal(
                    raw.get("feeAmount")
                    or raw.get("serviceFee")
                    or raw.get("channelFee")
                    or raw.get("fee_amount")
                ),
                "discount_amount": TypeParser.parse_decimal(
                    raw.get("discountAmount") or raw.get("couponAmount") or raw.get("discount_amount")
                ),
                "pay_type": raw.get("payType"),
                "payment_method": raw.get("paymentMethod") or raw.get("payment_method"),
                "online_pay_channel": raw.get("onlinePayChannel") or raw.get("online_pay_channel"),
                "pay_status": raw.get("payStatus"),
                "pay_terminal": raw.get("payTerminal") or raw.get("pay_terminal"),
                "remark": raw.get("remark"),
                "raw_data": json.dumps(raw, ensure_ascii=False),
            }
        except Exception as e:
            self.logger.warning(f"解析支付记录失败: {e}, 原始数据: {raw}")
69
etl_billiards/tasks/ticket_dwd_task.py
Normal file
@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
from .base_dwd_task import BaseDwdTask
from loaders.facts.ticket import TicketLoader

class TicketDwdTask(BaseDwdTask):
    """
    DWD Task: Process Ticket Details from ODS to Fact Tables
    Source: billiards_ods.ods_ticket_detail
    Targets:
    - billiards.fact_order
    - billiards.fact_order_goods
    - billiards.fact_table_usage
    - billiards.fact_assistant_service
    """

    def get_task_code(self) -> str:
        return "TICKET_DWD"

    def execute(self) -> dict:
        self.logger.info(f"Starting {self.get_task_code()} task")

        # 1. Get Time Window (Incremental Load)
        window_start, window_end, _ = self._get_time_window()
        self.logger.info(f"Processing window: {window_start} to {window_end}")

        # 2. Initialize Loader
        loader = TicketLoader(self.db)
        store_id = self.config.get("app.store_id")

        total_inserted = 0
        total_errors = 0

        # 3. Iterate ODS Data
        # We query ods_ticket_detail based on fetched_at
        batches = self.iter_ods_rows(
            table_name="billiards_ods.ods_ticket_detail",
            columns=["store_id", "order_settle_id", "payload", "fetched_at"],
            start_time=window_start,
            end_time=window_end
        )

        for batch in batches:
            if not batch:
                continue

            # Extract payloads
            tickets = []
            for row in batch:
                payload = self.parse_payload(row)
                if payload:
                    tickets.append(payload)

            # Process Batch
            inserted, errors = loader.process_tickets(tickets, store_id)
            total_inserted += inserted
            total_errors += errors

        # 4. Commit
        self.db.commit()

        self.logger.info(f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Errors: {total_errors}")

        return {
            "status": "success",
            "inserted": total_inserted,
            "errors": total_errors,
            "window_start": window_start.isoformat(),
            "window_end": window_end.isoformat()
        }
144
etl_billiards/tests/testdata_json/summary.txt
Normal file
@@ -0,0 +1,144 @@
==========================================================================================
20251110_034959_助教流水.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'orderAssistantDetails']
list orderAssistantDetails len 100, elem type dict, keys ['assistantNo', 'nickname', 'levelName', 'assistantName', 'tableName', 'siteProfile', 'skillName', 'id', 'order_trade_no', 'site_id']
==========================================================================================
20251110_035004_助教废除.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'abolitionAssistants']
list abolitionAssistants len 15, elem type dict, keys ['siteProfile', 'createTime', 'id', 'siteId', 'tableAreaId', 'tableId', 'tableArea', 'tableName', 'assistantOn', 'assistantName']
==========================================================================================
20251110_035011_台费流水.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'siteTableUseDetailsList']
list siteTableUseDetailsList len 100, elem type dict, keys ['siteProfile', 'id', 'order_trade_no', 'site_id', 'tenant_id', 'member_id', 'operator_id', 'operator_name', 'order_settle_id', 'ledger_unit_price']
==========================================================================================
20251110_035904_小票详情.json
root list len 193
sample keys ['orderSettleId', 'data']
data keys ['data', 'code']
dict data keys ['tenantId', 'siteId', 'orderSettleId', 'orderSettleNumber', 'assistantManualDiscount', 'siteName', 'tenantName', 'siteAddress', 'siteBusinessTel', 'ticketRemark']
==========================================================================================
20251110_035908_台费打折.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'taiFeeAdjustInfos']
list taiFeeAdjustInfos len 100, elem type dict, keys ['tableProfile', 'siteProfile', 'id', 'adjust_type', 'applicant_id', 'applicant_name', 'create_time', 'is_delete', 'ledger_amount', 'ledger_count']
==========================================================================================
20251110_035916_结账记录.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'settleList']
list settleList len 100, elem type dict, keys ['siteProfile', 'settleList']
==========================================================================================
20251110_035923_支付记录.json
root list len 200
sample keys ['siteProfile', 'create_time', 'pay_amount', 'pay_status', 'pay_time', 'online_pay_channel', 'relate_type', 'relate_id', 'site_id', 'id', 'payment_method']
dict siteProfile keys ['id', 'org_id', 'shop_name', 'avatar', 'business_tel', 'full_address', 'address', 'longitude', 'latitude', 'tenant_site_region_id']
==========================================================================================
20251110_035929_退款记录.json
root list len 11
sample keys ['tenantName', 'siteProfile', 'id', 'site_id', 'tenant_id', 'pay_sn', 'pay_amount', 'pay_status', 'pay_time', 'create_time', 'relate_type', 'relate_id', 'is_revoke', 'is_delete', 'online_pay_channel', 'payment_method', 'balance_frozen_amount', 'card_frozen_amount', 'member_id', 'member_card_id']
dict siteProfile keys ['id', 'org_id', 'shop_name', 'avatar', 'business_tel', 'full_address', 'address', 'longitude', 'latitude', 'tenant_site_region_id']
==========================================================================================
20251110_035934_平台验券记录.json
root list len 200
sample keys ['siteProfile', 'id', 'tenant_id', 'site_id', 'sale_price', 'coupon_code', 'coupon_channel', 'site_order_id', 'coupon_free_time', 'use_status', 'create_time', 'is_delete', 'coupon_name', 'coupon_cover', 'coupon_remark', 'channel_deal_id', 'group_package_id', 'consume_time', 'groupon_type', 'coupon_money']
dict siteProfile keys ['id', 'org_id', 'shop_name', 'avatar', 'business_tel', 'full_address', 'address', 'longitude', 'latitude', 'tenant_site_region_id']
==========================================================================================
20251110_035941_商品档案.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'tenantGoodsList']
list tenantGoodsList len 100, elem type dict, keys ['categoryName', 'isInSite', 'commodityCode', 'id', 'tenant_id', 'goods_name', 'goods_cover', 'goods_state', 'goods_category_id', 'unit']
==========================================================================================
20251110_035948_门店销售记录.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'orderGoodsLedgers']
list orderGoodsLedgers len 100, elem type dict, keys ['siteId', 'siteName', 'orderGoodsId', 'openSalesman', 'id', 'order_trade_no', 'site_id', 'tenant_id', 'operator_id', 'operator_name']
==========================================================================================
20251110_043159_库存变化记录1.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'queryDeliveryRecordsList']
list queryDeliveryRecordsList len 100, elem type dict, keys ['siteGoodsStockId', 'siteGoodsId', 'siteId', 'tenantId', 'stockType', 'goodsName', 'createTime', 'startNum', 'endNum', 'changeNum']
==========================================================================================
20251110_043204_库存变化记录2.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'goodsCategoryList']
list goodsCategoryList len 9, elem type dict, keys ['id', 'tenant_id', 'category_name', 'alias_name', 'pid', 'business_name', 'tenant_goods_business_id', 'open_salesman', 'categoryBoxes', 'sort']
==========================================================================================
20251110_043209_会员档案.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'tenantMemberInfos']
list tenantMemberInfos len 100, elem type dict, keys ['id', 'create_time', 'member_card_grade_code', 'mobile', 'nickname', 'register_site_id', 'site_name', 'member_card_grade_name', 'system_member_id', 'tenant_id']
==========================================================================================
20251110_043217_余额变更记录.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'tenantMemberCardLogs']
list tenantMemberCardLogs len 100, elem type dict, keys ['memberCardTypeName', 'paySiteName', 'registerSiteName', 'memberName', 'memberMobile', 'id', 'account_data', 'after', 'before', 'card_type_id']
==========================================================================================
20251110_043223_储值卡列表.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'totalOther', 'tenantMemberCards']
list tenantMemberCards len 100, elem type dict, keys ['site_name', 'member_name', 'member_mobile', 'member_card_type_name', 'table_service_discount', 'assistant_service_discount', 'coupon_discount', 'goods_service_discount', 'is_allow_give', 'able_cross_site']
==========================================================================================
20251110_043231_充值记录.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'settleList']
list settleList len 74, elem type dict, keys ['siteProfile', 'settleList']
==========================================================================================
20251110_043237_助教账号1.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'assistantInfos']
list assistantInfos len 50, elem type dict, keys ['job_num', 'shop_name', 'group_id', 'group_name', 'staff_profile_id', 'ding_talk_synced', 'entry_type', 'team_name', 'entry_sign_status', 'resign_sign_status']
==========================================================================================
20251110_043243_助教账号2.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'assistantInfos']
list assistantInfos len 50, elem type dict, keys ['job_num', 'shop_name', 'group_id', 'group_name', 'staff_profile_id', 'ding_talk_synced', 'entry_type', 'team_name', 'entry_sign_status', 'resign_sign_status']
==========================================================================================
20251110_043250_台桌列表.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'siteTables']
list siteTables len 71, elem type dict, keys ['id', 'audit_status', 'charge_free', 'self_table', 'create_time', 'is_rest_area', 'light_status', 'show_status', 'site_id', 'site_table_area_id']
==========================================================================================
20251110_043255_团购套餐.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'packageCouponList']
list packageCouponList len 17, elem type dict, keys ['site_name', 'effective_status', 'id', 'site_id', 'tenant_id', 'package_name', 'table_area_id', 'table_area_name', 'selling_price', 'duration']
==========================================================================================
20251110_043302_团购套餐流水.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'couponAmountSum', 'siteTableUseDetailsList']
list siteTableUseDetailsList len 100, elem type dict, keys ['tableName', 'tableAreaName', 'siteName', 'goodsOptionPrice', 'id', 'order_trade_no', 'table_id', 'site_id', 'tenant_id', 'operator_id']
==========================================================================================
20251110_043308_库存汇总.json
root list len 161
sample keys ['siteGoodsId', 'goodsName', 'goodsUnit', 'goodsCategoryId', 'goodsCategorySecondId', 'rangeStartStock', 'rangeEndStock', 'rangeIn', 'rangeOut', 'rangeInventory', 'rangeSale', 'rangeSaleMoney', 'currentStock', 'categoryName']
==========================================================================================
20251110_051132_门店商品档案1.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'orderGoodsList']
list orderGoodsList len 100, elem type dict, keys ['siteName', 'oneCategoryName', 'twoCategoryName', 'id', 'tenant_goods_id', 'site_id', 'tenant_id', 'goods_name', 'goods_cover', 'goods_state']
==========================================================================================
20251110_051138_门店商品档案2.json
root list len 2
sample keys ['data', 'code']
data keys ['goodsStockA', 'goodsStockB', 'goodsSaleNum', 'stockSumMoney']
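These structure summaries are what the `data_path` entries in ODS_TASK_SPECS encode; for example, the assistant-ledger archive nests its list under `data.orderAssistantDetails`. A small standalone sketch of the lookup (payload abbreviated from the first entry above):

```python
# How a summary entry maps to an OdsTaskSpec data_path.
payload = {"code": 0, "data": {"total": 2, "orderAssistantDetails": [{"id": 1}, {"id": 2}]}}

def dig(obj, path):
    # Walk the nested dicts along the path; return [] if no list is found.
    for part in path:
        obj = obj.get(part) if isinstance(obj, dict) else None
    return obj if isinstance(obj, list) else []

records = dig(payload, ("data", "orderAssistantDetails"))
assert [r["id"] for r in records] == [1, 2]
```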
@@ -135,16 +135,41 @@ class FakeDBOperations:

    def __init__(self):
        self.upserts: List[Dict] = []
        self.executes: List[Dict] = []
        self.commits = 0
        self.rollbacks = 0
        self.conn = FakeConnection()

    def batch_upsert_with_returning(self, sql: str, rows: List[Dict], page_size: int = 1000):
        self.upserts.append(
            {
                "sql": sql.strip(),
                "count": len(rows),
                "page_size": page_size,
                "rows": [dict(row) for row in rows],
            }
        )
        return len(rows), 0

    def batch_execute(self, sql: str, rows: List[Dict], page_size: int = 1000):
        self.executes.append(
            {
                "sql": sql.strip(),
                "count": len(rows),
                "page_size": page_size,
                "rows": [dict(row) for row in rows],
            }
        )

    def execute(self, sql: str, params=None):
        self.executes.append({"sql": sql.strip(), "params": params})

    def query(self, sql: str, params=None):
        self.executes.append({"sql": sql.strip(), "params": params, "type": "query"})
        return []

    def cursor(self):
        return self.conn.cursor()

    def commit(self):
        self.commits += 1
@@ -161,22 +186,53 @@ class FakeAPIClient:
        self.calls: List[Dict] = []

    # pylint: disable=unused-argument
    def iter_paginated(
        self,
        endpoint: str,
        params=None,
        page_size: int = 200,
        page_field: str = "pageIndex",
        size_field: str = "pageSize",
        data_path: Tuple[str, ...] = (),
        list_key: str | None = None,
    ):
        self.calls.append({"endpoint": endpoint, "params": params})
        if endpoint not in self.data_map:
            raise AssertionError(f"Missing fixture for endpoint {endpoint}")

        records = list(self.data_map[endpoint])
        yield 1, records, dict(params or {}), {"data": records}

    def get_paginated(self, endpoint: str, params=None, **kwargs):
        records = []
        pages = []
        for page_no, page_records, req, resp in self.iter_paginated(endpoint, params, **kwargs):
            records.extend(page_records)
            pages.append({"page": page_no, "request": req, "response": resp})
        return records, pages

    def get_source_hint(self, endpoint: str) -> str | None:
        return None


class OfflineAPIClient:
    """Offline-mode API client: reads the archived JSON for an endpoint, applies data_path, and replays the list data."""

    def __init__(self, file_map: Dict[str, Path]):
        self.file_map = {k: Path(v) for k, v in file_map.items()}
        self.calls: List[Dict] = []

    # pylint: disable=unused-argument
    def iter_paginated(
        self,
        endpoint: str,
        params=None,
        page_size: int = 200,
        page_field: str = "pageIndex",
        size_field: str = "pageSize",
        data_path: Tuple[str, ...] = (),
        list_key: str | None = None,
    ):
        self.calls.append({"endpoint": endpoint, "params": params})
        if endpoint not in self.file_map:
            raise AssertionError(f"Missing archive for endpoint {endpoint}")
@@ -188,17 +244,42 @@ class OfflineAPIClient:

        for key in data_path:
            if isinstance(data, dict):
                data = data.get(key, [])
            else:
                data = []
                break

        if list_key and isinstance(data, dict):
            data = data.get(list_key, [])

        if not isinstance(data, list):
            data = []

        total = len(data)
        start = 0
        page = 1
        while start < total or (start == 0 and total == 0):
            chunk = data[start : start + page_size]
            if not chunk and total != 0:
                break
            yield page, list(chunk), dict(params or {}), payload
            if len(chunk) < page_size:
                break
            start += page_size
            page += 1

    def get_paginated(self, endpoint: str, params=None, **kwargs):
        records = []
        pages = []
        for page_no, page_records, req, resp in self.iter_paginated(endpoint, params, **kwargs):
            records.extend(page_records)
            pages.append({"page": page_no, "request": req, "response": resp})
        return records, pages

    def get_source_hint(self, endpoint: str) -> str | None:
        if endpoint not in self.file_map:
            return None
        return str(self.file_map[endpoint])


class RealDBOperationsAdapter:
    """Adapter for a real PostgreSQL connection, providing tasks with batch_upsert and transaction support."""

    def __init__(self, dsn: str):
74
etl_billiards/tests/unit/test_ods_tasks.py
Normal file
@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
"""Unit tests for the new ODS ingestion tasks."""
import logging
import sys
from pathlib import Path

# Ensure project root is resolvable when running tests in isolation
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
    sys.path.insert(0, str(PROJECT_ROOT))

from tasks.ods_tasks import ODS_TASK_CLASSES
from .task_test_utils import create_test_config, get_db_operations, FakeAPIClient


def _build_config(tmp_path):
    archive_dir = tmp_path / "archive"
    temp_dir = tmp_path / "temp"
    return create_test_config("ONLINE", archive_dir, temp_dir)


def test_ods_order_settle_ingest(tmp_path):
    """Ensure ODS_ORDER_SETTLE task writes raw payload + metadata."""
    config = _build_config(tmp_path)
    sample = [
        {
            "orderSettleId": 701,
            "orderTradeNo": 8001,
            "anyField": "value",
        }
    ]
    api = FakeAPIClient({"/order/list": sample})
    task_cls = ODS_TASK_CLASSES["ODS_ORDER_SETTLE"]

    with get_db_operations() as db_ops:
        task = task_cls(config, db_ops, api, logging.getLogger("test_ods_order"))
        result = task.execute()

    assert result["status"] == "SUCCESS"
    assert result["counts"]["fetched"] == 1
    assert db_ops.commits == 1
    row = db_ops.upserts[0]["rows"][0]
    assert row["order_settle_id"] == 701
    assert row["order_trade_no"] == 8001
    assert row["source_endpoint"] == "/order/list"
    assert '"orderSettleId": 701' in row["payload"]


def test_ods_payment_ingest(tmp_path):
    """Ensure ODS_PAYMENT task stores relate fields and payload."""
    config = _build_config(tmp_path)
    sample = [
        {
            "payId": 901,
            "relateType": "ORDER",
            "relateId": 123,
            "payAmount": "100.00",
        }
    ]
    api = FakeAPIClient({"/pay/records": sample})
    task_cls = ODS_TASK_CLASSES["ODS_PAYMENT"]

    with get_db_operations() as db_ops:
        task = task_cls(config, db_ops, api, logging.getLogger("test_ods_payment"))
        result = task.execute()

    assert result["status"] == "SUCCESS"
    assert result["counts"]["fetched"] == 1
    assert db_ops.commits == 1
    row = db_ops.upserts[0]["rows"][0]
    assert row["pay_id"] == 901
    assert row["relate_type"] == "ORDER"
    assert row["relate_id"] == 123
    assert '"payId": 901' in row["payload"]
1
schema_v2.sql
Normal file
@@ -0,0 +1 @@