Compare commits

...

2 Commits

Author SHA1 Message Date
Neo
cbd16a39ba Incremental update 2025-11-20 01:27:33 +08:00
Neo
92f219b575 Incremental update 2025-11-20 01:27:04 +08:00
25 changed files with 8268 additions and 60 deletions

View File

@@ -22,6 +22,7 @@ STORE_ID=2790685415443269
# Path configuration
EXPORT_ROOT=r"D:\LLZQ\DB\export",
LOG_ROOT=r"D:\LLZQ\DB\logs",
PGCLIENTENCODING=utf8
# ETL configuration
OVERLAP_SECONDS=120 # redundancy in seconds: the window start is "rewound" slightly to avoid missing boundary records

View File

@@ -36,8 +36,6 @@ LOG_UNKNOWN_FIELDS=true
HASH_ALGO=sha1
STRICT_NUMERIC=true
ROUND_MONEY_SCALE=2
<<<<<<< HEAD
=======
# Test / offline mode
TEST_MODE=ONLINE
@@ -46,4 +44,3 @@ TEST_JSON_TEMP_DIR=/tmp/etl_billiards_json_tmp
# Test database (optional: if set, unit tests connect to this DSN)
TEST_DB_DSN=
>>>>>>> main

File diff suppressed because it is too large

View File

@@ -219,6 +219,42 @@ python scripts/run_tests.py --preset offline_realdb
python scripts/run_tests.py --list-presets # list presets; see or customize scripts/test_presets.py
```
#### 3.3.2 Scripted test combinations (`run_tests.py` / `test_presets.py`)
- `scripts/run_tests.py` is the unified pytest entry point: it automatically adds the project root to `sys.path` and offers `--suite online/offline/integration`, `--tests` (custom paths), `--mode`, `--db-dsn`, `--json-archive`, `--json-temp`, `--keyword/-k`, `--pytest-args`, `--env KEY=VALUE`, and more, which can be combined freely like building blocks;
- `--preset foo` reads the `PRESETS["foo"]` entry in `scripts/test_presets.py` and layers it onto the current command; `--list-presets` and `--dry-run` let you review a preset or just print the resulting command;
- running `python scripts/test_presets.py` directly executes the presets listed in `AUTO_RUN_PRESETS` in order; passing `--preset x --dry-run` only prints the corresponding command.
`test_presets.py` acts as a "command repository". Each preset is a dictionary; the common fields are:
| Field | Purpose |
| ---- | ---- |
| `suite` | Reuses the built-in `run_tests.py` suites (online/offline/integration); multiple values allowed |
| `tests` | Appends arbitrary pytest paths, e.g. `tests/unit/test_config.py` |
| `mode` | Overrides `TEST_MODE` (ONLINE / OFFLINE) |
| `db_dsn` | Overrides `TEST_DB_DSN` to connect to a real test database |
| `json_archive` / `json_temp` | Configure the offline JSON archive and temp directories |
| `keyword` | Maps to `pytest -k` for keyword filtering |
| `pytest_args` | Extra pytest arguments, e.g. `-vv --maxfail=1` |
| `env` | A list of extra environment variables, e.g. `["STORE_ID=123"]` |
| `preset_meta` | Descriptive text documenting the scenario |
Example: the `offline_realdb` preset sets `TEST_MODE=OFFLINE`, points the archive directory at `tests/testdata_json`, and connects to the test database via `db_dsn`. Run `python scripts/run_tests.py --preset offline_realdb` or `python scripts/test_presets.py --preset offline_realdb` to reuse the combination and keep local runs, CI, and production replay scripts consistent.
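For illustration, a hypothetical preset exercising these fields could look like this in `scripts/test_presets.py` (the name `offline_smoke` and every value are made up):
```python
# Hypothetical preset; adjust paths, keyword, and env to your setup.
PRESETS["offline_smoke"] = {
    "suite": ["offline"],                    # built-in offline suite
    "tests": ["tests/unit/test_config.py"],  # one extra pytest path
    "mode": "OFFLINE",                       # overrides TEST_MODE
    "json_archive": "tests/testdata_json",   # offline JSON archive directory
    "keyword": "ORDERS",                     # becomes pytest -k ORDERS
    "pytest_args": "-vv --maxfail=1",
    "env": ["STORE_ID=123"],
    "preset_meta": "Offline smoke run over archived JSON",
}
```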
#### 3.3.3 Quick database connectivity check
`python scripts/test_db_connection.py` provides the most lightweight PostgreSQL connectivity check: it defaults to `TEST_DB_DSN` (or a DSN passed via `--dsn`), connects, and runs `SELECT 1 AS ok` (customizable via `--query`). Typical usage:
```bash
# Read TEST_DB_DSN from .env / environment variables
python scripts/test_db_connection.py
# Specify a DSN ad hoc and check the task-configuration table
python scripts/test_db_connection.py --dsn postgresql://user:pwd@host:5432/LLZQ-test --query "SELECT count(*) FROM etl_admin.etl_task"
```
A return code of 0 means the connection and query both succeeded; on a non-zero code, consult the database part of Chapter 8 "Troubleshooting" (network, firewall, account permissions, etc.) to localize the problem before running the full ETL.
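Because the script is exit-code driven, it can gate a longer run from a shell; a minimal sketch (the task list is illustrative):
```bash
# Abort early if the database is unreachable; otherwise start the ETL run.
python scripts/test_db_connection.py || exit 1
python -m cli.main --tasks ODS_ORDER_SETTLE,ODS_PAYMENT
```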
---
## 4. Project structure and file descriptions
@@ -452,6 +488,19 @@ etl_billiards/
- Failed API requests are retried per the configuration; once the retry limit is exceeded, the error is logged and the task is terminated.
- All errors are recorded in the logs and the run-tracking table for later investigation.
### 5.6 ODS + DWD dual-stage strategy (new)
To support backfill/replay and the later construction of DWD wide tables, the project adds a `billiards_ods` schema plus a dedicated set of ODS tasks/loaders:
- **ODS tables**: `billiards_ods.ods_order_settle`, `ods_table_use_detail`, `ods_assistant_ledger`, `ods_assistant_abolish`, `ods_goods_ledger`, `ods_payment`, `ods_refund`, `ods_coupon_verify`, `ods_member`, `ods_member_card`, `ods_package_coupon`, `ods_inventory_stock`, `ods_inventory_change`. Every record stores `store_id + source primary key + payload JSON + fetched_at + source_endpoint`, and related metadata.
- **Generic loader**: `loaders/ods/generic.py::GenericODSLoader` centralizes the `INSERT ... ON CONFLICT ...` and batch-write logic; callers only supply the column names and primary-key columns.
- **ODS tasks**: `tasks/ods_tasks.py` defines a group of tasks via `OdsTaskSpec` (`ODS_ORDER_SETTLE`, `ODS_PAYMENT`, `ODS_ASSISTANT_LEDGER`, and so on), auto-registered in the `TaskRegistry`; they can be run directly with `python -m cli.main --tasks ODS_ORDER_SETTLE,ODS_PAYMENT`.
- **Dual-stage pipeline** (sketched below):
1. Stage 1 (ODS): call the API / read offline archived JSON and write the raw records into the ODS tables, preserving pagination, fetch time, source file, and other metadata.
2. Stage 2 (DWD/DIM): downstream fact tasks for orders, payments, coupons, etc. will switch to reading payloads from ODS and, after parsing/validation, write to the `billiards.fact_*` and `dim_*` tables, avoiding repeated pulls from the upstream API.
> The new unit test `tests/unit/test_ods_tasks.py` covers the load paths of `ODS_ORDER_SETTLE` and `ODS_PAYMENT` and can serve as a template for extending other ODS tasks.
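A minimal sketch of the two stages as separate CLI invocations (the task codes are the ones registered above; `ODS_MEMBER` is included only as an example):
```bash
# Stage 1 (ODS): land raw JSON plus fetch metadata.
python -m cli.main --tasks ODS_ORDER_SETTLE,ODS_PAYMENT,ODS_MEMBER
# Stage 2 (DWD/DIM): replay the stored payloads into fact/dimension tables.
python -m cli.main --tasks PAYMENTS_DWD,MEMBERS_DWD
```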
---
## 6. Migration guide (from legacy scripts to the current project)

View File

@@ -54,42 +54,76 @@ class APIClient:
resp.raise_for_status()
return resp.json()
def iter_paginated(
self,
endpoint: str,
params: dict | None,
page_size: int = 200,
page_field: str = "pageIndex",
size_field: str = "pageSize",
data_path: tuple = ("data",),
list_key: str | None = None,
):
"""分页迭代器:逐页拉取数据并产出 (page_no, records, request_params, raw_response)。"""
base_params = dict(params or {})
page = 1
while True:
page_params = dict(base_params)
page_params[page_field] = page
page_params[size_field] = page_size
payload = self.get(endpoint, page_params)
records = self._extract_list(payload, data_path, list_key)
yield page, records, page_params, payload
if len(records) < page_size: # an empty or short page signals the last page
break
page += 1
def get_paginated(self, endpoint: str, params: dict, page_size: int = 200,
page_field: str = "pageIndex", size_field: str = "pageSize",
data_path: tuple = ("data",), list_key: str = None) -> tuple:
"""分页获取数据"""
"""分页获取数据并将所有记录汇总在一个列表中。"""
records, pages_meta = [], []
page = 1
while True:
p = dict(params)
p[page_field] = page
p[size_field] = page_size
obj = self.get(endpoint, p)
# Resolve the data path
cur = obj
for k in data_path:
if isinstance(cur, dict) and k in cur:
cur = cur[k]
if list_key:
cur = (cur or {}).get(list_key, [])
if not isinstance(cur, list):
cur = []
records.extend(cur)
if len(cur) == 0:
break
pages_meta.append({"page": page, "request": p, "response": obj})
if len(cur) < page_size:
break
page += 1
for page_no, page_records, request_params, response in self.iter_paginated(
endpoint=endpoint,
params=params,
page_size=page_size,
page_field=page_field,
size_field=size_field,
data_path=data_path,
list_key=list_key,
):
records.extend(page_records)
pages_meta.append(
{"page": page_no, "request": request_params, "response": response}
)
return records, pages_meta
@staticmethod
def _extract_list(payload: dict, data_path: tuple, list_key: str | None):
"""辅助函数:根据 data_path/list_key 提取列表结构。"""
cur = payload
for key in data_path:
if isinstance(cur, dict):
cur = cur.get(key)
else:
cur = None
if cur is None:
break
if list_key and isinstance(cur, dict):
cur = cur.get(list_key)
if not isinstance(cur, list):
cur = []
return cur
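A minimal usage sketch for the new `iter_paginated` generator, assuming `client` is an already-configured `APIClient` (the endpoint and parameters are illustrative):
```python
def dump_pages(client, store_id: int) -> None:
    """Stream pages from /pay/records without accumulating them in memory."""
    for page_no, records, request_params, raw in client.iter_paginated(
        endpoint="/pay/records",      # endpoint also used by the ODS specs below
        params={"storeId": store_id},
        page_size=200,
    ):
        print(f"page {page_no}: {len(records)} records")
```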

View File

@@ -7,6 +7,7 @@ class DatabaseOperations:
"""数据库批量操作封装"""
def __init__(self, connection):
self._connection = connection
self.conn = connection.conn
def batch_execute(self, sql: str, rows: list, page_size: int = 1000):
@@ -75,3 +76,24 @@ class DatabaseOperations:
if isinstance(rec, dict):
return bool(rec.get("inserted"))
return False
# --- pass-through helpers -------------------------------------------------
def commit(self):
"""Commit the transaction (delegated to the underlying connection)."""
self._connection.commit()
def rollback(self):
"""Roll back the transaction (delegated to the underlying connection)."""
self._connection.rollback()
def query(self, sql: str, args=None):
"""Run a query and return the results."""
return self._connection.query(sql, args)
def execute(self, sql: str, args=None):
"""Execute arbitrary SQL."""
self._connection.execute(sql, args)
def cursor(self):
"""Expose the raw cursor for special operations."""
return self.conn.cursor()
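With these pass-through helpers, callers can treat `DatabaseOperations` as the single transactional surface; a minimal sketch, assuming `ops` is a constructed instance:
```python
def run_batch_atomically(ops, sql: str, rows: list) -> None:
    """Apply one batch and commit, rolling back on any failure."""
    try:
        ops.batch_execute(sql, rows)
        ops.commit()
    except Exception:
        ops.rollback()
        raise
```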

View File

@@ -1,31 +1,50 @@
# -*- coding: utf-8 -*-
"""支付记录加载器"""
"""支付事实表加载器"""
from ..base_loader import BaseLoader
class PaymentLoader(BaseLoader):
"""支付记录加载器"""
"""支付数据加载器"""
def upsert_payments(self, records: list, store_id: int) -> tuple:
"""加载支付记录"""
"""加载支付数据"""
if not records:
return (0, 0, 0)
sql = """
INSERT INTO billiards.fact_payment (
store_id, pay_id, order_id, pay_time, pay_amount,
pay_type, pay_status, remark, raw_data
store_id, pay_id, site_id, tenant_id,
order_settle_id, order_trade_no,
relate_type, relate_id,
create_time, pay_time,
pay_amount, fee_amount, discount_amount,
payment_method, online_pay_channel, pay_terminal,
pay_status, raw_data
)
VALUES (
%(store_id)s, %(pay_id)s, %(order_id)s, %(pay_time)s, %(pay_amount)s,
%(pay_type)s, %(pay_status)s, %(remark)s, %(raw_data)s
%(store_id)s, %(pay_id)s, %(site_id)s, %(tenant_id)s,
%(order_settle_id)s, %(order_trade_no)s,
%(relate_type)s, %(relate_id)s,
%(create_time)s, %(pay_time)s,
%(pay_amount)s, %(fee_amount)s, %(discount_amount)s,
%(payment_method)s, %(online_pay_channel)s, %(pay_terminal)s,
%(pay_status)s, %(raw_data)s
)
ON CONFLICT (store_id, pay_id) DO UPDATE SET
order_id = EXCLUDED.order_id,
order_settle_id = EXCLUDED.order_settle_id,
order_trade_no = EXCLUDED.order_trade_no,
relate_type = EXCLUDED.relate_type,
relate_id = EXCLUDED.relate_id,
site_id = EXCLUDED.site_id,
tenant_id = EXCLUDED.tenant_id,
create_time = EXCLUDED.create_time,
pay_time = EXCLUDED.pay_time,
pay_amount = EXCLUDED.pay_amount,
pay_type = EXCLUDED.pay_type,
fee_amount = EXCLUDED.fee_amount,
discount_amount = EXCLUDED.discount_amount,
payment_method = EXCLUDED.payment_method,
online_pay_channel = EXCLUDED.online_pay_channel,
pay_terminal = EXCLUDED.pay_terminal,
pay_status = EXCLUDED.pay_status,
remark = EXCLUDED.remark,
raw_data = EXCLUDED.raw_data,
updated_at = now()
RETURNING (xmax = 0) AS inserted

View File

@@ -0,0 +1,188 @@
# -*- coding: utf-8 -*-
"""小票详情加载器"""
from ..base_loader import BaseLoader
import json
class TicketLoader(BaseLoader):
"""
Loader for parsing Ticket Detail JSON and populating DWD fact tables.
Handles:
- fact_order (Header)
- fact_order_goods (Items)
- fact_table_usage (Items)
- fact_assistant_service (Items)
"""
def process_tickets(self, tickets: list, store_id: int) -> tuple:
"""
Process a batch of ticket JSONs.
Returns (inserted_count, error_count)
"""
inserted_count = 0
error_count = 0
# Prepare batch lists
orders = []
goods_list = []
table_usages = []
assistant_services = []
for ticket in tickets:
try:
# 1. Parse Header (fact_order)
root_data = ticket.get("data", {}).get("data", {})
if not root_data:
continue
order_settle_id = root_data.get("orderSettleId")
if not order_settle_id:
continue
orders.append({
"store_id": store_id,
"order_settle_id": order_settle_id,
"order_trade_no": 0,
"order_no": str(root_data.get("orderSettleNumber", "")),
"member_id": 0,
"pay_time": root_data.get("payTime"),
"total_amount": root_data.get("consumeMoney", 0),
"pay_amount": root_data.get("actualPayment", 0),
"discount_amount": root_data.get("memberOfferAmount", 0),
"coupon_amount": root_data.get("couponAmount", 0),
"status": "PAID",
"cashier_name": root_data.get("cashierName", ""),
"remark": root_data.get("orderRemark", ""),
"raw_data": json.dumps(ticket, ensure_ascii=False)
})
# 2. Parse Items (orderItem list)
order_items = root_data.get("orderItem", [])
for item in order_items:
order_trade_no = item.get("siteOrderId")
# 2.1 Table Ledger
table_ledger = item.get("tableLedger")
if table_ledger:
table_usages.append({
"store_id": store_id,
"order_ledger_id": table_ledger.get("orderTableLedgerId"),
"order_settle_id": order_settle_id,
"table_id": table_ledger.get("siteTableId"),
"table_name": table_ledger.get("tableName"),
"start_time": table_ledger.get("chargeStartTime"),
"end_time": table_ledger.get("chargeEndTime"),
"duration_minutes": table_ledger.get("useDuration", 0),
"total_amount": table_ledger.get("consumptionAmount", 0),
"pay_amount": table_ledger.get("consumptionAmount", 0) - table_ledger.get("memberDiscountAmount", 0)
})
# 2.2 Goods Ledgers
goods_ledgers = item.get("goodsLedgers", [])
for g in goods_ledgers:
goods_list.append({
"store_id": store_id,
"order_goods_id": g.get("orderGoodsLedgerId"),
"order_settle_id": order_settle_id,
"order_trade_no": order_trade_no,
"goods_id": g.get("siteGoodsId"),
"goods_name": g.get("goodsName"),
"quantity": g.get("goodsCount", 0),
"unit_price": g.get("goodsPrice", 0),
"total_amount": g.get("ledgerAmount", 0),
"pay_amount": g.get("realGoodsMoney", 0)
})
# 2.3 Assistant Services
assistant_ledgers = item.get("assistantPlayWith", [])
for a in assistant_ledgers:
assistant_services.append({
"store_id": store_id,
"ledger_id": a.get("orderAssistantLedgerId"),
"order_settle_id": order_settle_id,
"assistant_id": a.get("assistantId"),
"assistant_name": a.get("ledgerName"),
"service_type": a.get("skillName", "Play"),
"start_time": a.get("ledgerStartTime"),
"end_time": a.get("ledgerEndTime"),
"duration_minutes": int(a.get("ledgerCount", 0) / 60) if a.get("ledgerCount") else 0,
"total_amount": a.get("ledgerAmount", 0),
"pay_amount": a.get("ledgerAmount", 0)
})
inserted_count += 1
except Exception as e:
self.logger.error(f"Error parsing ticket: {e}", exc_info=True)
error_count += 1
# 3. Batch Insert/Upsert
if orders:
self._upsert_orders(orders)
if goods_list:
self._upsert_goods(goods_list)
if table_usages:
self._upsert_table_usages(table_usages)
if assistant_services:
self._upsert_assistant_services(assistant_services)
return inserted_count, error_count
def _upsert_orders(self, rows):
sql = """
INSERT INTO billiards.fact_order (
store_id, order_settle_id, order_trade_no, order_no, member_id,
pay_time, total_amount, pay_amount, discount_amount, coupon_amount,
status, cashier_name, remark, raw_data
) VALUES (
%(store_id)s, %(order_settle_id)s, %(order_trade_no)s, %(order_no)s, %(member_id)s,
%(pay_time)s, %(total_amount)s, %(pay_amount)s, %(discount_amount)s, %(coupon_amount)s,
%(status)s, %(cashier_name)s, %(remark)s, %(raw_data)s
)
ON CONFLICT (store_id, order_settle_id) DO UPDATE SET
pay_time = EXCLUDED.pay_time,
pay_amount = EXCLUDED.pay_amount,
updated_at = now()
"""
self.db.batch_execute(sql, rows)
def _upsert_goods(self, rows):
sql = """
INSERT INTO billiards.fact_order_goods (
store_id, order_goods_id, order_settle_id, order_trade_no,
goods_id, goods_name, quantity, unit_price, total_amount, pay_amount
) VALUES (
%(store_id)s, %(order_goods_id)s, %(order_settle_id)s, %(order_trade_no)s,
%(goods_id)s, %(goods_name)s, %(quantity)s, %(unit_price)s, %(total_amount)s, %(pay_amount)s
)
ON CONFLICT (store_id, order_goods_id) DO UPDATE SET
pay_amount = EXCLUDED.pay_amount
"""
self.db.batch_execute(sql, rows)
def _upsert_table_usages(self, rows):
sql = """
INSERT INTO billiards.fact_table_usage (
store_id, order_ledger_id, order_settle_id, table_id, table_name,
start_time, end_time, duration_minutes, total_amount, pay_amount
) VALUES (
%(store_id)s, %(order_ledger_id)s, %(order_settle_id)s, %(table_id)s, %(table_name)s,
%(start_time)s, %(end_time)s, %(duration_minutes)s, %(total_amount)s, %(pay_amount)s
)
ON CONFLICT (store_id, order_ledger_id) DO UPDATE SET
pay_amount = EXCLUDED.pay_amount
"""
self.db.batch_execute(sql, rows)
def _upsert_assistant_services(self, rows):
sql = """
INSERT INTO billiards.fact_assistant_service (
store_id, ledger_id, order_settle_id, assistant_id, assistant_name,
service_type, start_time, end_time, duration_minutes, total_amount, pay_amount
) VALUES (
%(store_id)s, %(ledger_id)s, %(order_settle_id)s, %(assistant_id)s, %(assistant_name)s,
%(service_type)s, %(start_time)s, %(end_time)s, %(duration_minutes)s, %(total_amount)s, %(pay_amount)s
)
ON CONFLICT (store_id, ledger_id) DO UPDATE SET
pay_amount = EXCLUDED.pay_amount
"""
self.db.batch_execute(sql, rows)
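A sketch of replaying an archived ticket-detail file through this loader, assuming `db_ops` is a `DatabaseOperations`-style wrapper as above (the path handling is illustrative):
```python
import json

def load_tickets_from_file(db_ops, path: str, store_id: int) -> tuple:
    """Read one archived ticket-detail JSON file and fan it out to the fact tables."""
    with open(path, "r", encoding="utf-8") as f:
        data = json.load(f)
    tickets = data if isinstance(data, list) else [data]
    loader = TicketLoader(db_ops)
    inserted, errors = loader.process_tickets(tickets, store_id)
    db_ops.commit()
    return inserted, errors
```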

View File

@@ -0,0 +1,6 @@
# -*- coding: utf-8 -*-
"""ODS loader helpers."""
from .generic import GenericODSLoader
__all__ = ["GenericODSLoader"]

View File

@@ -0,0 +1,67 @@
# -*- coding: utf-8 -*-
"""Generic ODS loader that keeps raw payload + primary keys."""
from __future__ import annotations
import json
from datetime import datetime, timezone
from typing import Iterable, Sequence
from ..base_loader import BaseLoader
class GenericODSLoader(BaseLoader):
"""Insert/update helper for ODS tables that share the same pattern."""
def __init__(
self,
db_ops,
table_name: str,
columns: Sequence[str],
conflict_columns: Sequence[str],
):
super().__init__(db_ops)
if not conflict_columns:
raise ValueError("conflict_columns must not be empty for ODS loader")
self.table_name = table_name
self.columns = list(columns)
self.conflict_columns = list(conflict_columns)
self._sql = self._build_sql()
def upsert_rows(self, rows: Iterable[dict]) -> tuple[int, int, int]:
"""Insert/update the provided iterable of dictionaries."""
rows = list(rows)
if not rows:
return (0, 0, 0)
normalized = [self._normalize_row(row) for row in rows]
inserted, updated = self.db.batch_upsert_with_returning(
self._sql, normalized, page_size=self._batch_size()
)
return inserted, updated, 0
def _build_sql(self) -> str:
col_list = ", ".join(self.columns)
placeholders = ", ".join(f"%({col})s" for col in self.columns)
conflict_clause = ", ".join(self.conflict_columns)
update_columns = [c for c in self.columns if c not in self.conflict_columns]
set_clause = ", ".join(f"{col} = EXCLUDED.{col}" for col in update_columns)
return (
f"INSERT INTO {self.table_name} ({col_list}) "
f"VALUES ({placeholders}) "
f"ON CONFLICT ({conflict_clause}) DO UPDATE SET {set_clause} "
f"RETURNING (xmax = 0) AS inserted"
)
def _normalize_row(self, row: dict) -> dict:
normalized = {}
for col in self.columns:
value = row.get(col)
if col == "payload" and value is not None and not isinstance(value, str):
normalized[col] = json.dumps(value, ensure_ascii=False)
else:
normalized[col] = value
if "fetched_at" in normalized and normalized["fetched_at"] is None:
normalized["fetched_at"] = datetime.now(timezone.utc)
return normalized
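A usage sketch for the generic loader against `billiards_ods.ods_payment` (the column subset mirrors the schema later in this diff; `db_ops` is assumed):
```python
from datetime import datetime, timezone

def land_payment_records(db_ops, raw_records, store_id):
    """Upsert raw payment records into billiards_ods.ods_payment."""
    loader = GenericODSLoader(
        db_ops,
        table_name="billiards_ods.ods_payment",
        columns=["store_id", "pay_id", "source_endpoint", "fetched_at", "payload"],
        conflict_columns=["store_id", "pay_id"],
    )
    rows = [
        {
            "store_id": store_id,
            "pay_id": rec.get("payId") or rec.get("id"),
            "source_endpoint": "/pay/records",
            "fetched_at": datetime.now(timezone.utc),
            "payload": rec,  # dicts are JSON-serialized by _normalize_row
        }
        for rec in raw_records
    ]
    return loader.upsert_rows(rows)  # (inserted, updated, skipped)
```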

View File

@@ -14,6 +14,11 @@ from tasks.topups_task import TopupsTask
from tasks.table_discount_task import TableDiscountTask
from tasks.assistant_abolish_task import AssistantAbolishTask
from tasks.ledger_task import LedgerTask
from tasks.ods_tasks import ODS_TASK_CLASSES
from tasks.ticket_dwd_task import TicketDwdTask
from tasks.manual_ingest_task import ManualIngestTask
from tasks.payments_dwd_task import PaymentsDwdTask
from tasks.members_dwd_task import MembersDwdTask
class TaskRegistry:
"""任务注册和工厂"""
@@ -55,3 +60,9 @@ default_registry.register("TOPUPS", TopupsTask)
default_registry.register("TABLE_DISCOUNT", TableDiscountTask)
default_registry.register("ASSISTANT_ABOLISH", AssistantAbolishTask)
default_registry.register("LEDGER", LedgerTask)
default_registry.register("TICKET_DWD", TicketDwdTask)
default_registry.register("MANUAL_INGEST", ManualIngestTask)
default_registry.register("PAYMENTS_DWD", PaymentsDwdTask)
default_registry.register("MEMBERS_DWD", MembersDwdTask)
for code, task_cls in ODS_TASK_CLASSES.items():
default_registry.register(code, task_cls)
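Registering further tasks follows the same pattern; a hypothetical example (the class and task code are made up):
```python
from tasks.base_task import BaseTask  # same base the built-in tasks use

class MyCleanupTask(BaseTask):
    """Hypothetical task used only to illustrate registration."""

    def get_task_code(self) -> str:
        return "MY_CLEANUP"

    def execute(self) -> dict:
        return {"status": "success"}

default_registry.register("MY_CLEANUP", MyCleanupTask)
# Then run it like the built-ins: python -m cli.main --tasks MY_CLEANUP
```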

etl_billiards/schema_v2.sql (new file, 663 lines)
View File

@@ -0,0 +1,663 @@
-- -*- coding: utf-8 -*-
-- Feiqiu-ETL schema (JSON-first alignment)
-- Updated: 2025-11-19 (Refactored for Manual Import & AI-Friendly DWS)
CREATE SCHEMA IF NOT EXISTS billiards;
CREATE SCHEMA IF NOT EXISTS billiards_ods;
CREATE SCHEMA IF NOT EXISTS billiards_dws;
CREATE SCHEMA IF NOT EXISTS etl_admin;
COMMENT ON SCHEMA billiards IS 'Store business-data schema: dimension/fact layers aligned with the JSON fields';
COMMENT ON SCHEMA billiards_ods IS 'Raw data layer (ODS): stores raw JSON; supports API fetching and manual import';
COMMENT ON SCHEMA billiards_dws IS 'Data summary layer (DWS): wide views for AI-oriented analysis';
COMMENT ON SCHEMA etl_admin IS 'Schema for ETL scheduling, cursors, and run records';
-- =========================
-- 1. Billiards ODS tables
-- =========================
-- 1.1 Order & Settlement ODS
-- Corresponds to /order/list or manual export
CREATE TABLE IF NOT EXISTS billiards_ods.ods_order_settle (
store_id bigint NOT NULL,
order_settle_id bigint NOT NULL,
order_trade_no bigint,
page_no integer, -- Nullable for manual import
page_size integer, -- Nullable for manual import
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, order_settle_id)
);
COMMENT ON TABLE billiards_ods.ods_order_settle IS 'Order/settlement ODS: raw JSON from /order/list and the ticket endpoint';
-- 1.2 Ticket Detail ODS (NEW)
-- Corresponds to "小票详情.json" - Contains full nested details
CREATE TABLE IF NOT EXISTS billiards_ods.ods_ticket_detail (
store_id bigint NOT NULL,
order_settle_id bigint NOT NULL,
source_file varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, order_settle_id)
);
COMMENT ON TABLE billiards_ods.ods_ticket_detail IS 'Ticket detail ODS: complete JSON including table-fee, goods, and assistant line items';
-- 1.3 Table Usage ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_table_use_detail (
store_id bigint NOT NULL,
ledger_id bigint NOT NULL,
order_trade_no bigint,
order_settle_id bigint,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, ledger_id)
);
-- 1.4 Assistant Ledger ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_assistant_ledger (
store_id bigint NOT NULL,
ledger_id bigint NOT NULL,
order_trade_no bigint,
order_settle_id bigint,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, ledger_id)
);
-- 1.5 Assistant Abolish ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_assistant_abolish (
store_id bigint NOT NULL,
abolish_id bigint NOT NULL,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, abolish_id)
);
-- 1.6 Goods Ledger ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_goods_ledger (
store_id bigint NOT NULL,
order_goods_id bigint NOT NULL,
order_trade_no bigint,
order_settle_id bigint,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, order_goods_id)
);
-- 1.7 Payment ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_payment (
store_id bigint NOT NULL,
pay_id bigint NOT NULL,
relate_type varchar(50),
relate_id bigint,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, pay_id)
);
-- 1.8 Refund ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_refund (
store_id bigint NOT NULL,
refund_id bigint NOT NULL,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, refund_id)
);
-- 1.9 Coupon Verify ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_coupon_verify (
store_id bigint NOT NULL,
coupon_id bigint NOT NULL,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, coupon_id)
);
-- 1.10 Member ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_member (
store_id bigint NOT NULL,
member_id bigint NOT NULL,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, member_id)
);
-- 1.11 Member Card ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_member_card (
store_id bigint NOT NULL,
card_id bigint NOT NULL,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, card_id)
);
-- 1.12 Package Coupon ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_package_coupon (
store_id bigint NOT NULL,
package_id bigint NOT NULL,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, package_id)
);
-- 1.13 Inventory Stock ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_inventory_stock (
store_id bigint NOT NULL,
site_goods_id bigint NOT NULL,
snapshot_key varchar(100) NOT NULL DEFAULT 'default',
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, site_goods_id, snapshot_key)
);
-- 1.14 Inventory Change ODS
CREATE TABLE IF NOT EXISTS billiards_ods.ods_inventory_change (
store_id bigint NOT NULL,
change_id bigint NOT NULL,
page_no integer,
source_file varchar(255),
source_endpoint varchar(255),
fetched_at timestamptz NOT NULL DEFAULT now(),
payload jsonb NOT NULL,
PRIMARY KEY (store_id, change_id)
);
-- =========================
-- 2. Billiards Dimension Tables
-- =========================
CREATE TABLE IF NOT EXISTS billiards.dim_store (
store_id bigint PRIMARY KEY,
store_name varchar(200),
tenant_id bigint,
region_code varchar(30),
address varchar(500),
contact_name varchar(100),
contact_phone varchar(30),
created_time timestamptz,
updated_time timestamptz,
remark text,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb
);
CREATE TABLE IF NOT EXISTS billiards.dim_assistant (
store_id bigint NOT NULL,
assistant_id bigint NOT NULL,
assistant_no varchar(64),
nickname varchar(100),
real_name varchar(100),
gender varchar(20),
mobile varchar(30),
level varchar(50),
team_id bigint,
team_name varchar(100),
assistant_status varchar(30),
work_status varchar(30),
entry_time timestamptz,
resign_time timestamptz,
start_time timestamptz,
end_time timestamptz,
create_time timestamptz,
update_time timestamptz,
system_role_id bigint,
online_status varchar(30),
allow_cx integer,
charge_way varchar(30),
pd_unit_price numeric(14,2),
cx_unit_price numeric(14,2),
is_guaranteed integer,
is_team_leader integer,
serial_number varchar(64),
show_sort integer,
is_delete integer,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (store_id, assistant_id)
);
CREATE TABLE IF NOT EXISTS billiards.dim_member (
store_id bigint NOT NULL,
member_id bigint NOT NULL,
member_name varchar(100),
phone varchar(30),
balance numeric(18,4),
status varchar(30),
register_time timestamptz,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (store_id, member_id)
);
CREATE TABLE IF NOT EXISTS billiards.dim_package_coupon (
store_id bigint NOT NULL,
package_id bigint NOT NULL,
package_code varchar(100),
package_name varchar(200),
table_area_id bigint,
table_area_name varchar(100),
selling_price numeric(14,2),
duration_seconds integer,
start_time timestamptz,
end_time timestamptz,
type varchar(50),
is_enabled integer,
is_delete integer,
usable_count integer,
creator_name varchar(100),
date_type varchar(50),
group_type varchar(50),
coupon_money numeric(14,2),
area_tag_type varchar(50),
system_group_type varchar(50),
card_type_ids text,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (store_id, package_id)
);
CREATE TABLE IF NOT EXISTS billiards.dim_product (
store_id bigint NOT NULL,
product_id bigint NOT NULL,
site_product_id bigint,
product_name varchar(200) NOT NULL,
category_id bigint,
category_name varchar(100),
second_category_id bigint,
unit varchar(20),
cost_price numeric(14,4),
sale_price numeric(14,4),
allow_discount boolean,
status varchar(30),
supplier_id bigint,
barcode varchar(128),
is_combo boolean,
created_time timestamptz,
updated_time timestamptz,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (store_id, product_id)
);
CREATE TABLE IF NOT EXISTS billiards.dim_product_price_scd (
product_scd_id bigserial PRIMARY KEY,
store_id bigint NOT NULL,
product_id bigint NOT NULL,
product_name varchar(200),
category_id bigint,
category_name varchar(100),
second_category_id bigint,
cost_price numeric(14,4),
sale_price numeric(14,4),
allow_discount boolean,
status varchar(30),
valid_from timestamptz NOT NULL DEFAULT now(),
valid_to timestamptz,
is_current boolean NOT NULL DEFAULT true,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
CONSTRAINT fk_dpps_product FOREIGN KEY (store_id, product_id)
REFERENCES billiards.dim_product(store_id, product_id) ON DELETE CASCADE,
CONSTRAINT ck_dpps_range CHECK (
valid_from < COALESCE(valid_to, '9999-12-31 00:00:00+00'::timestamptz)
)
);
-- Create partial unique index for current records only
CREATE UNIQUE INDEX IF NOT EXISTS uq_dpps_current ON billiards.dim_product_price_scd (store_id, product_id) WHERE is_current;
CREATE OR REPLACE VIEW billiards.dim_product_price_current AS
SELECT product_scd_id,
store_id,
product_id,
product_name,
category_id,
category_name,
second_category_id,
cost_price,
sale_price,
allow_discount,
status,
valid_from,
valid_to,
raw_data
FROM billiards.dim_product_price_scd
WHERE is_current;
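-- Illustrative sketch (not part of the schema): under this SCD-2 layout a price change
-- closes the current row and inserts a new version; the partial unique index above
-- keeps at most one is_current row per (store_id, product_id). Values are examples.
-- UPDATE billiards.dim_product_price_scd
--    SET valid_to = now(), is_current = false
--  WHERE store_id = 1 AND product_id = 42 AND is_current;
-- INSERT INTO billiards.dim_product_price_scd
--        (store_id, product_id, product_name, sale_price, valid_from, is_current)
-- VALUES (1, 42, 'Demo product', 58.00, now(), true);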
CREATE TABLE IF NOT EXISTS billiards.dim_table (
store_id bigint NOT NULL,
table_id bigint NOT NULL,
site_id bigint,
area_id bigint,
area_name varchar(100),
table_name varchar(100) NOT NULL,
table_price numeric(14,4),
table_status varchar(30),
table_status_name varchar(50),
light_status integer,
is_rest_area integer,
show_status integer,
virtual_table integer,
charge_free integer,
only_allow_groupon integer,
is_online_reservation integer,
created_time timestamptz,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (store_id, table_id)
);
-- =========================
-- 3. Billiards Fact Tables
-- =========================
-- 3.1 Order Fact (Header)
CREATE TABLE IF NOT EXISTS billiards.fact_order (
store_id bigint NOT NULL,
order_settle_id bigint NOT NULL, -- Settle ID is the main key for payment
order_trade_no bigint, -- may be one of several when orders are merged, but usually 1:1 with the main order
order_no varchar(100),
member_id bigint,
pay_time timestamptz,
total_amount numeric(14,4), -- Original price
pay_amount numeric(14,4), -- Actual paid
discount_amount numeric(14,4),
coupon_amount numeric(14,4),
status varchar(50),
cashier_name varchar(100),
remark text,
raw_data jsonb,
created_at timestamptz DEFAULT now(),
PRIMARY KEY (store_id, order_settle_id)
);
-- 3.2 Order Items (Goods)
CREATE TABLE IF NOT EXISTS billiards.fact_order_goods (
store_id bigint NOT NULL,
order_goods_id bigint NOT NULL, -- orderGoodsLedgerId
order_settle_id bigint NOT NULL,
order_trade_no bigint,
goods_id bigint,
goods_name varchar(200),
quantity numeric(10,2),
unit_price numeric(14,4),
total_amount numeric(14,4), -- quantity * price
pay_amount numeric(14,4), -- After discount
created_at timestamptz DEFAULT now(),
PRIMARY KEY (store_id, order_goods_id)
);
-- 3.3 Table Usage Fact
CREATE TABLE IF NOT EXISTS billiards.fact_table_usage (
store_id bigint NOT NULL,
order_ledger_id bigint NOT NULL, -- orderTableLedgerId
order_settle_id bigint NOT NULL,
table_id bigint,
table_name varchar(100),
start_time timestamptz,
end_time timestamptz,
duration_minutes integer,
total_amount numeric(14,4),
pay_amount numeric(14,4),
created_at timestamptz DEFAULT now(),
PRIMARY KEY (store_id, order_ledger_id)
);
-- 3.4 Assistant Service Fact
CREATE TABLE IF NOT EXISTS billiards.fact_assistant_service (
store_id bigint NOT NULL,
ledger_id bigint NOT NULL, -- orderAssistantLedgerId
order_settle_id bigint NOT NULL,
assistant_id bigint,
assistant_name varchar(100),
service_type varchar(50), -- e.g., "Play with"
start_time timestamptz,
end_time timestamptz,
duration_minutes integer,
total_amount numeric(14,4),
pay_amount numeric(14,4),
created_at timestamptz DEFAULT now(),
PRIMARY KEY (store_id, ledger_id)
);
-- 3.5 Payment Fact
CREATE TABLE IF NOT EXISTS billiards.fact_payment (
store_id bigint NOT NULL,
pay_id bigint NOT NULL,
site_id bigint,
tenant_id bigint,
order_settle_id bigint,
order_trade_no bigint,
relate_type varchar(50),
relate_id bigint,
create_time timestamptz,
pay_time timestamptz,
pay_amount numeric(14,4),
fee_amount numeric(14,4),
discount_amount numeric(14,4),
payment_method varchar(50), -- e.g., 'WeChat', 'Cash', 'Balance'
online_pay_channel varchar(50),
pay_terminal varchar(30),
pay_status varchar(30),
raw_data jsonb,
updated_at timestamptz DEFAULT now(),
PRIMARY KEY (store_id, pay_id)
);
-- 3.6 Legacy/Other Facts (Preserved)
CREATE TABLE IF NOT EXISTS billiards.fact_assistant_abolish (
store_id bigint NOT NULL,
abolish_id bigint NOT NULL,
table_id bigint,
table_name varchar(100),
table_area_id bigint,
table_area varchar(100),
assistant_no varchar(64),
assistant_name varchar(100),
charge_minutes integer,
abolish_amount numeric(14,4),
create_time timestamptz,
trash_reason text,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (store_id, abolish_id)
);
CREATE TABLE IF NOT EXISTS billiards.fact_assistant_ledger (
store_id bigint NOT NULL,
ledger_id bigint NOT NULL,
assistant_no varchar(64),
assistant_name varchar(100),
nickname varchar(100),
level_name varchar(50),
table_name varchar(100),
ledger_unit_price numeric(14,4),
ledger_count numeric(14,4),
ledger_amount numeric(14,4),
projected_income numeric(14,4),
service_money numeric(14,4),
member_discount_amount numeric(14,4),
manual_discount_amount numeric(14,4),
coupon_deduct_money numeric(14,4),
order_trade_no bigint,
order_settle_id bigint,
operator_id bigint,
operator_name varchar(100),
assistant_team_id bigint,
assistant_level varchar(50),
site_table_id bigint,
order_assistant_id bigint,
site_assistant_id bigint,
user_id bigint,
ledger_start_time timestamptz,
ledger_end_time timestamptz,
start_use_time timestamptz,
last_use_time timestamptz,
income_seconds integer,
real_use_seconds integer,
is_trash integer,
trash_reason text,
is_confirm integer,
ledger_status varchar(30),
create_time timestamptz,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (store_id, ledger_id)
);
CREATE TABLE IF NOT EXISTS billiards.fact_inventory_change (
store_id bigint NOT NULL,
change_id bigint NOT NULL,
site_goods_id bigint,
stock_type varchar(50),
goods_name varchar(200),
change_time timestamptz,
start_qty numeric(18,4),
end_qty numeric(18,4),
change_qty numeric(18,4),
unit varchar(20),
price numeric(14,4),
operator_name varchar(100),
remark text,
goods_category_id bigint,
goods_second_category_id bigint,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (store_id, change_id)
);
CREATE TABLE IF NOT EXISTS billiards.fact_refund (
store_id bigint NOT NULL,
refund_id bigint NOT NULL,
site_id bigint,
tenant_id bigint,
pay_amount numeric(14,4),
pay_status varchar(30),
pay_time timestamptz,
create_time timestamptz,
relate_type varchar(50),
relate_id bigint,
payment_method varchar(50),
refund_amount numeric(14,4),
refund_reason text,
raw_data jsonb NOT NULL DEFAULT '{}'::jsonb,
updated_at timestamptz NOT NULL DEFAULT now(),
PRIMARY KEY (store_id, refund_id)
);
-- =========================
-- 4. DWS Layer (Data Warehouse Summary)
-- Views for Analysis & AI
-- =========================
-- 4.1 Sales Detail View (The "AI-Friendly" Wide Table)
-- Unifies Goods, Table Fees, and Assistant Services into a single stream of "Sales Items"
CREATE OR REPLACE VIEW billiards_dws.dws_sales_detail AS
SELECT
'GOODS' as item_type,
g.store_id,
g.order_settle_id,
o.pay_time,
g.goods_name as item_name,
g.quantity,
g.pay_amount as amount,
o.cashier_name
FROM billiards.fact_order_goods g
JOIN billiards.fact_order o ON g.store_id = o.store_id AND g.order_settle_id = o.order_settle_id
UNION ALL
SELECT
'TABLE' as item_type,
t.store_id,
t.order_settle_id,
o.pay_time,
t.table_name || ' (' || t.duration_minutes || ' mins)' as item_name,
1 as quantity,
t.pay_amount as amount,
o.cashier_name
FROM billiards.fact_table_usage t
JOIN billiards.fact_order o ON t.store_id = o.store_id AND t.order_settle_id = o.order_settle_id
UNION ALL
SELECT
'ASSISTANT' as item_type,
a.store_id,
a.order_settle_id,
o.pay_time,
a.assistant_name || ' (' || a.duration_minutes || ' mins)' as item_name,
1 as quantity,
a.pay_amount as amount,
o.cashier_name
FROM billiards.fact_assistant_service a
JOIN billiards.fact_order o ON a.store_id = o.store_id AND a.order_settle_id = o.order_settle_id;
-- 4.2 Daily Revenue View
CREATE OR REPLACE VIEW billiards_dws.dws_daily_revenue AS
SELECT
store_id,
DATE(pay_time) as report_date,
COUNT(DISTINCT order_settle_id) as total_orders,
SUM(amount) as total_revenue,
SUM(amount) FILTER (WHERE item_type = 'GOODS') as goods_revenue,
SUM(amount) FILTER (WHERE item_type = 'TABLE') as table_revenue,
SUM(amount) FILTER (WHERE item_type = 'ASSISTANT') as assistant_revenue
FROM billiards_dws.dws_sales_detail
GROUP BY store_id, DATE(pay_time);
-- 4.3 Order Detail Wide View (For detailed inspection)
CREATE OR REPLACE VIEW billiards_dws.dws_order_detail AS
SELECT
o.store_id,
o.order_settle_id,
o.order_no,
o.pay_time,
o.total_amount,
o.pay_amount,
o.cashier_name,
-- Payment pivot (approximate, assumes simple mapping)
COALESCE(SUM(p.pay_amount) FILTER (WHERE p.payment_method = '1'), 0) AS pay_cash,
COALESCE(SUM(p.pay_amount) FILTER (WHERE p.payment_method = '2'), 0) AS pay_balance,
COALESCE(SUM(p.pay_amount) FILTER (WHERE p.payment_method = '4'), 0) AS pay_wechat,
-- Content summary
(SELECT string_agg(goods_name || 'x' || quantity, '; ') FROM billiards.fact_order_goods g WHERE g.store_id = o.store_id AND g.order_settle_id = o.order_settle_id) AS goods_summary,
(SELECT string_agg(assistant_name, '; ') FROM billiards.fact_assistant_service a WHERE a.store_id = o.store_id AND a.order_settle_id = o.order_settle_id) AS assistant_summary
FROM billiards.fact_order o
LEFT JOIN billiards.fact_payment p ON p.store_id = o.store_id AND p.order_settle_id = o.order_settle_id
GROUP BY o.store_id, o.order_settle_id, o.order_no, o.pay_time, o.total_amount, o.pay_amount, o.cashier_name;
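-- Illustrative sample queries against the DWS views (the store_id and date are examples):
-- SELECT * FROM billiards_dws.dws_daily_revenue
--  WHERE store_id = 1 AND report_date = DATE '2025-11-19';
-- SELECT item_type, item_name, amount
--   FROM billiards_dws.dws_sales_detail
--  WHERE store_id = 1 ORDER BY pay_time DESC LIMIT 20;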

View File

@@ -0,0 +1,63 @@
# -*- coding: utf-8 -*-
"""Quick utility for validating PostgreSQL connectivity."""
from __future__ import annotations
import argparse
import os
import sys
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if PROJECT_ROOT not in sys.path:
sys.path.insert(0, PROJECT_ROOT)
from database.connection import DatabaseConnection
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="PostgreSQL connectivity smoke test")
parser.add_argument("--dsn", help="Override TEST_DB_DSN / env value")
parser.add_argument(
"--query",
default="SELECT 1 AS ok",
help="Custom SQL to run after connection (default: SELECT 1 AS ok)",
)
parser.add_argument(
"--timeout",
type=int,
default=5,
help="connect_timeout seconds passed to psycopg2 (default: 5)",
)
return parser.parse_args()
def main() -> int:
args = parse_args()
dsn = args.dsn or os.environ.get("TEST_DB_DSN")
if not dsn:
print("❌ 未提供 DSN请通过 --dsn 或 TEST_DB_DSN 指定连接串", file=sys.stderr)
return 2
print(f"尝试连接: {dsn}")
try:
conn = DatabaseConnection(dsn, connect_timeout=args.timeout)
except Exception as exc: # pragma: no cover - diagnostic output
print("❌ 连接失败:", exc, file=sys.stderr)
return 1
try:
result = conn.query(args.query)
print("✅ 连接成功,查询结果:")
for row in result:
print(row)
conn.close()
return 0
except Exception as exc: # pragma: no cover - diagnostic output
print("⚠️ Connected, but the query failed:", exc, file=sys.stderr)
try:
conn.close()
except Exception:
pass # a failed close must not mask the query failure
return 3
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -56,6 +56,7 @@ from typing import List
RUN_TESTS_SCRIPT = os.path.join(os.path.dirname(__file__), "run_tests.py")
# Presets auto-run by default (order can be customized)
AUTO_RUN_PRESETS = ["offline_realdb"]
PRESETS = {
@@ -66,6 +67,15 @@ PRESETS = {
"pytest_args": "-vv",
"preset_meta": "在线模式,仅跑订单任务并输出详细日志",
},
"dbrun": {
"suite": ["integration"],
# "mode": "OFFLINE",
# "keyword": "ORDERS",
# "pytest_args": "-vv",
"preset_meta": "在线模式,仅跑订单任务并输出详细日志",
},
"offline_realdb": {
"suite": ["offline"],
"mode": "OFFLINE",

View File

@@ -0,0 +1,79 @@
# -*- coding: utf-8 -*-
"""DWD任务基类"""
import json
from typing import Any, Dict, Iterator, List, Optional, Tuple
from datetime import datetime
from .base_task import BaseTask
from models.parsers import TypeParser
class BaseDwdTask(BaseTask):
"""
DWD-layer task base class.
Reads rows from ODS tables for subclasses to clean and write into fact/dimension tables.
"""
def _get_ods_cursor(self, task_code: str) -> Optional[datetime]:
"""
Return the fetched_at watermark of the last-processed ODS data.
Simplified for now; this should eventually read from the etl_cursor table.
Until then we rely on BaseTask's time-window logic, or subclasses manage it themselves.
"""
# TODO: wire up the real CursorManager
# For now return None; subclasses obtain a window via _get_time_window
return None
def iter_ods_rows(
self,
table_name: str,
columns: List[str],
start_time: datetime,
end_time: datetime,
time_col: str = "fetched_at",
batch_size: int = 1000
) -> Iterator[List[Dict[str, Any]]]:
"""
分批迭代读取 ODS 表数据
Args:
table_name: ODS 表名
columns: 需要查询的字段列表 (必须包含 payload)
start_time: 开始时间 (包含)
end_time: 结束时间 (包含)
time_col: 时间过滤字段,默认 fetched_at
batch_size: 批次大小
"""
offset = 0
cols_str = ", ".join(columns)
while True:
sql = f"""
SELECT {cols_str}
FROM {table_name}
WHERE {time_col} >= %s AND {time_col} <= %s
ORDER BY {time_col} ASC
LIMIT %s OFFSET %s
"""
rows = self.db.fetch_all(sql, (start_time, end_time, batch_size, offset))
if not rows:
break
yield [dict(row) for row in rows]
if len(rows) < batch_size:
break
offset += batch_size
def parse_payload(self, row: Dict[str, Any]) -> Dict[str, Any]:
"""
解析 ODS 行中的 payload JSON
"""
payload = row.get("payload")
if isinstance(payload, str):
return json.loads(payload)
elif isinstance(payload, dict):
return payload
return {}
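A minimal hypothetical subclass showing the intended read-parse-load loop (the table, columns, and task code are illustrative):
```python
class CouponVerifyDwdTask(BaseDwdTask):
    """Hypothetical DWD task: replay ODS coupon rows into a downstream table."""

    def get_task_code(self) -> str:
        return "COUPON_VERIFY_DWD"

    def execute(self) -> dict:
        window_start, window_end, _ = self._get_time_window()
        processed = 0
        for batch in self.iter_ods_rows(
            table_name="billiards_ods.ods_coupon_verify",
            columns=["store_id", "coupon_id", "payload", "fetched_at"],
            start_time=window_start,
            end_time=window_end,
        ):
            for row in batch:
                payload = self.parse_payload(row)
                processed += bool(payload)  # a real task would parse and upsert here
        self.db.commit()
        return {"status": "success", "processed": processed}
```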

View File

@@ -0,0 +1,176 @@
# -*- coding: utf-8 -*-
import os
import json
from datetime import datetime
from .base_task import BaseTask
from loaders.ods.generic import GenericODSLoader
class ManualIngestTask(BaseTask):
"""
Task to ingest manually fetched JSON files from a directory into ODS tables.
"""
FILE_MAPPING = {
"小票详情": "billiards_ods.ods_ticket_detail",
"结账记录": "billiards_ods.ods_order_settle",
"支付记录": "billiards_ods.ods_payment",
"助教流水": "billiards_ods.ods_assistant_ledger",
"助教废除": "billiards_ods.ods_assistant_abolish",
"商品档案": "billiards_ods.ods_goods_ledger", # Note: This might be dim_product source, but mapping to ledger for now if it's sales
"库存变化": "billiards_ods.ods_inventory_change",
"会员档案": "billiards_ods.ods_member",
"充值记录": "billiards_ods.ods_member_card", # Approx
"团购套餐": "billiards_ods.ods_package_coupon",
"库存汇总": "billiards_ods.ods_inventory_stock"
}
def get_task_code(self) -> str:
return "MANUAL_INGEST"
def execute(self) -> dict:
self.logger.info("Starting Manual Ingest Task")
# Configurable directory, default to tests/testdata_json for now
data_dir = self.config.get("manual.data_dir", r"c:\dev\LLTQ\ETL\feiqiu-ETL\etl_billiards\tests\testdata_json")
if not os.path.exists(data_dir):
self.logger.error(f"Data directory not found: {data_dir}")
return {"status": "error", "message": "Directory not found"}
total_files = 0
total_rows = 0
for filename in os.listdir(data_dir):
if not filename.endswith(".json"):
continue
# Determine target table
target_table = None
for key, table in self.FILE_MAPPING.items():
if key in filename:
target_table = table
break
if not target_table:
self.logger.warning(f"No mapping found for file: {filename}, skipping.")
continue
self.logger.info(f"Ingesting {filename} into {target_table}")
try:
with open(os.path.join(data_dir, filename), 'r', encoding='utf-8') as f:
data = json.load(f)
if not isinstance(data, list):
data = [data]
# Build rows for the ODS upsert ourselves: GenericODSLoader expects dicts keyed
# by column name with the primary key already resolved, but this task maps files
# to tables dynamically, so we extract store_id and the table-specific PK here
# and bulk-insert with a per-table conflict clause below.
rows_to_insert = []
for item in data:
# Extract Store ID (usually in siteProfile or data root)
store_id = self._extract_store_id(item) or self.config.get("app.store_id")
# Extract PK (id, orderSettleId, etc.)
pk_val = self._extract_pk(item, target_table)
if not pk_val:
# Try to find 'id' in the item
pk_val = item.get("id")
if not pk_val:
# Special case for Ticket Detail
if "ods_ticket_detail" in target_table:
pk_val = item.get("orderSettleId")
if not pk_val:
continue
row = {
"store_id": store_id,
"payload": json.dumps(item, ensure_ascii=False),
"source_file": filename,
"fetched_at": datetime.now()
}
# Add specific PK column
pk_col = self._get_pk_column(target_table)
row[pk_col] = pk_val
rows_to_insert.append(row)
if rows_to_insert:
self._bulk_insert(target_table, rows_to_insert)
total_rows += len(rows_to_insert)
total_files += 1
except Exception as e:
self.logger.error(f"Error processing {filename}: {e}", exc_info=True)
return {"status": "success", "files_processed": total_files, "rows_inserted": total_rows}
def _extract_store_id(self, item):
# Try common paths
if "store_id" in item: return item["store_id"]
if "siteProfile" in item and "id" in item["siteProfile"]: return item["siteProfile"]["id"]
if "data" in item and "data" in item["data"] and "siteId" in item["data"]["data"]: return item["data"]["data"]["siteId"]
return None
def _extract_pk(self, item, table):
# Helper to find PK based on table
if "ods_order_settle" in table:
# Check for nested structure in some files
if "settleList" in item and "settleList" in item["settleList"]:
return item["settleList"]["settleList"].get("id")
return item.get("id")
return item.get("id")
def _get_pk_column(self, table):
if "ods_ticket_detail" in table: return "order_settle_id"
if "ods_order_settle" in table: return "order_settle_id"
if "ods_payment" in table: return "pay_id"
if "ods_member" in table: return "member_id"
if "ods_assistant_ledger" in table: return "ledger_id"
if "ods_goods_ledger" in table: return "order_goods_id"
if "ods_inventory_change" in table: return "change_id"
if "ods_assistant_abolish" in table: return "abolish_id"
if "ods_coupon_verify" in table: return "coupon_id"
if "ods_member_card" in table: return "card_id"
if "ods_package_coupon" in table: return "package_id"
return "id" # Fallback
def _bulk_insert(self, table, rows):
if not rows: return
keys = list(rows[0].keys())
cols = ", ".join(keys)
vals = ", ".join([f"%({k})s" for k in keys])
# Determine PK col for conflict
pk_col = self._get_pk_column(table)
sql = f"""
INSERT INTO {table} ({cols})
VALUES ({vals})
ON CONFLICT (store_id, {pk_col}) DO UPDATE SET
payload = EXCLUDED.payload,
fetched_at = EXCLUDED.fetched_at,
source_file = EXCLUDED.source_file;
"""
self.db.batch_execute(sql, rows)
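Running the ingest follows the same CLI pattern as the other registered tasks:
```bash
python -m cli.main --tasks MANUAL_INGEST
```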

View File

@@ -0,0 +1,89 @@
# -*- coding: utf-8 -*-
from .base_dwd_task import BaseDwdTask
from loaders.dimensions.member import MemberLoader
from models.parsers import TypeParser
import json
class MembersDwdTask(BaseDwdTask):
"""
DWD Task: Process Member Records from ODS to Dimension Table
Source: billiards_ods.ods_member
Target: billiards.dim_member
"""
def get_task_code(self) -> str:
return "MEMBERS_DWD"
def execute(self) -> dict:
self.logger.info(f"Starting {self.get_task_code()} task")
window_start, window_end, _ = self._get_time_window()
self.logger.info(f"Processing window: {window_start} to {window_end}")
loader = MemberLoader(self.db)
store_id = self.config.get("app.store_id")
total_inserted = 0
total_updated = 0
total_errors = 0
# Iterate ODS Data
batches = self.iter_ods_rows(
table_name="billiards_ods.ods_member",
columns=["store_id", "member_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
)
for batch in batches:
if not batch:
continue
parsed_rows = []
for row in batch:
payload = self.parse_payload(row)
if not payload:
continue
parsed = self._parse_member(payload, store_id)
if parsed:
parsed_rows.append(parsed)
if parsed_rows:
inserted, updated, skipped = loader.upsert_members(parsed_rows, store_id)
total_inserted += inserted
total_updated += updated
self.db.commit()
self.logger.info(f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Updated: {total_updated}")
return {
"status": "success",
"inserted": total_inserted,
"updated": total_updated,
"window_start": window_start.isoformat(),
"window_end": window_end.isoformat()
}
def _parse_member(self, raw: dict, store_id: int) -> dict:
"""Parse ODS payload into Dim structure"""
try:
# Handle both API structure (camelCase) and manual structure
member_id = raw.get("id") or raw.get("memberId")
if not member_id:
return None
return {
"store_id": store_id,
"member_id": member_id,
"member_name": raw.get("name") or raw.get("memberName"),
"phone": raw.get("phone") or raw.get("mobile"),
"balance": raw.get("balance", 0),
"status": str(raw.get("status", "NORMAL")),
"register_time": raw.get("createTime") or raw.get("registerTime"),
"raw_data": json.dumps(raw, ensure_ascii=False)
}
except Exception as e:
self.logger.warning(f"Error parsing member: {e}")
return None

View File

@@ -0,0 +1,400 @@
# -*- coding: utf-8 -*-
"""ODS ingestion tasks."""
from __future__ import annotations
from dataclasses import dataclass, field
from datetime import datetime
from typing import Any, Callable, Dict, Iterable, List, Sequence, Tuple, Type
from loaders.ods import GenericODSLoader
from models.parsers import TypeParser
from .base_task import BaseTask
ColumnTransform = Callable[[Any], Any]
@dataclass(frozen=True)
class ColumnSpec:
"""Mapping between DB column and source JSON field."""
column: str
sources: Tuple[str, ...] = ()
required: bool = False
default: Any = None
transform: ColumnTransform | None = None
@dataclass(frozen=True)
class OdsTaskSpec:
"""Definition of a single ODS ingestion task."""
code: str
class_name: str
table_name: str
endpoint: str
data_path: Tuple[str, ...] = ("data",)
list_key: str | None = None
pk_columns: Tuple[ColumnSpec, ...] = ()
extra_columns: Tuple[ColumnSpec, ...] = ()
include_page_size: bool = False
include_page_no: bool = True
include_source_file: bool = True
include_source_endpoint: bool = True
requires_window: bool = True
description: str = ""
extra_params: Dict[str, Any] = field(default_factory=dict)
class BaseOdsTask(BaseTask):
"""Shared functionality for ODS ingestion tasks."""
SPEC: OdsTaskSpec
def get_task_code(self) -> str:
return self.SPEC.code
def execute(self) -> dict:
spec = self.SPEC
self.logger.info("开始执行 %s (ODS)", spec.code)
store_id = TypeParser.parse_int(self.config.get("app.store_id"))
if not store_id:
raise ValueError("app.store_id 未配置,无法执行 ODS 任务")
page_size = self.config.get("api.page_size", 200)
params = self._build_params(spec, store_id)
columns = self._resolve_columns(spec)
conflict_columns = ["store_id"] + [col.column for col in spec.pk_columns]
loader = GenericODSLoader(
self.db,
spec.table_name,
columns,
conflict_columns,
)
counts = {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}
source_file = self._resolve_source_file_hint(spec)
try:
for page_no, page_records, _, _ in self.api.iter_paginated(
endpoint=spec.endpoint,
params=params,
page_size=page_size,
data_path=spec.data_path,
list_key=spec.list_key,
):
rows: List[dict] = []
for raw in page_records:
row = self._build_row(
spec=spec,
store_id=store_id,
record=raw,
page_no=page_no if spec.include_page_no else None,
page_size_value=len(page_records)
if spec.include_page_size
else None,
source_file=source_file,
)
if row is None:
counts["skipped"] += 1
continue
rows.append(row)
inserted, updated, _ = loader.upsert_rows(rows)
counts["inserted"] += inserted
counts["updated"] += updated
counts["fetched"] += len(page_records)
self.db.commit()
self.logger.info("%s ODS 任务完成: %s", spec.code, counts)
return self._build_result("SUCCESS", counts)
except Exception:
self.db.rollback()
counts["errors"] += 1
self.logger.error("%s ODS 任务失败", spec.code, exc_info=True)
raise
def _build_params(self, spec: OdsTaskSpec, store_id: int) -> dict:
params: dict[str, Any] = {"storeId": store_id}
params.update(spec.extra_params)
if spec.requires_window:
window_start, window_end, _ = self._get_time_window()
params["startTime"] = TypeParser.format_timestamp(window_start, self.tz)
params["endTime"] = TypeParser.format_timestamp(window_end, self.tz)
return params
def _resolve_columns(self, spec: OdsTaskSpec) -> List[str]:
columns: List[str] = ["store_id"]
seen = set(columns)
for col_spec in list(spec.pk_columns) + list(spec.extra_columns):
if col_spec.column not in seen:
columns.append(col_spec.column)
seen.add(col_spec.column)
if spec.include_page_no and "page_no" not in seen:
columns.append("page_no")
seen.add("page_no")
if spec.include_page_size and "page_size" not in seen:
columns.append("page_size")
seen.add("page_size")
if spec.include_source_file and "source_file" not in seen:
columns.append("source_file")
seen.add("source_file")
if spec.include_source_endpoint and "source_endpoint" not in seen:
columns.append("source_endpoint")
seen.add("source_endpoint")
if "fetched_at" not in seen:
columns.append("fetched_at")
seen.add("fetched_at")
if "payload" not in seen:
columns.append("payload")
return columns
def _build_row(
self,
spec: OdsTaskSpec,
store_id: int,
record: dict,
page_no: int | None,
page_size_value: int | None,
source_file: str | None,
) -> dict | None:
row: dict[str, Any] = {"store_id": store_id}
for col_spec in spec.pk_columns + spec.extra_columns:
value = self._extract_value(record, col_spec)
if value is None and col_spec.required:
self.logger.warning(
"%s 缺少必填字段 %s,原始记录: %s",
spec.code,
col_spec.column,
record,
)
return None
row[col_spec.column] = value
if spec.include_page_no:
row["page_no"] = page_no
if spec.include_page_size:
row["page_size"] = page_size_value
if spec.include_source_file:
row["source_file"] = source_file
if spec.include_source_endpoint:
row["source_endpoint"] = spec.endpoint
row["fetched_at"] = datetime.now(self.tz)
row["payload"] = record
return row
def _extract_value(self, record: dict, spec: ColumnSpec):
value = None
for key in spec.sources:
value = self._dig(record, key)
if value is not None:
break
if value is None and spec.default is not None:
value = spec.default
if value is not None and spec.transform:
value = spec.transform(value)
return value
@staticmethod
def _dig(record: Any, path: str | None):
if not path:
return None
current = record
for part in path.split("."):
if isinstance(current, dict):
current = current.get(part)
else:
return None
return current
def _resolve_source_file_hint(self, spec: OdsTaskSpec) -> str | None:
resolver = getattr(self.api, "get_source_hint", None)
if callable(resolver):
return resolver(spec.endpoint)
return None
def _int_col(name: str, *sources: str, required: bool = False) -> ColumnSpec:
return ColumnSpec(
column=name,
sources=sources,
required=required,
transform=TypeParser.parse_int,
)
ODS_TASK_SPECS: Tuple[OdsTaskSpec, ...] = (
OdsTaskSpec(
code="ODS_ORDER_SETTLE",
class_name="OdsOrderSettleTask",
table_name="billiards_ods.ods_order_settle",
endpoint="/order/list",
data_path=("data",),
pk_columns=(_int_col("order_settle_id", "orderSettleId", "order_settle_id", "id", required=True),),
extra_columns=(_int_col("order_trade_no", "orderTradeNo", "order_trade_no"),),
include_page_size=True,
description="订单/结算 ODS 原始记录",
),
OdsTaskSpec(
code="ODS_TABLE_USE",
class_name="OdsTableUseTask",
table_name="billiards_ods.ods_table_use_detail",
endpoint="/Table/UseDetailList",
data_path=("data", "siteTableUseDetailsList"),
pk_columns=(_int_col("ledger_id", "id", required=True),),
extra_columns=(
_int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
_int_col("order_settle_id", "order_settle_id", "orderSettleId"),
),
description="台费/开台流水 ODS",
),
OdsTaskSpec(
code="ODS_ASSISTANT_LEDGER",
class_name="OdsAssistantLedgerTask",
table_name="billiards_ods.ods_assistant_ledger",
endpoint="/Assistant/LedgerList",
data_path=("data", "orderAssistantDetails"),
pk_columns=(_int_col("ledger_id", "id", required=True),),
extra_columns=(
_int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
_int_col("order_settle_id", "order_settle_id", "orderSettleId"),
),
description="助教流水 ODS",
),
OdsTaskSpec(
code="ODS_ASSISTANT_ABOLISH",
class_name="OdsAssistantAbolishTask",
table_name="billiards_ods.ods_assistant_abolish",
endpoint="/Assistant/AbolishList",
data_path=("data", "abolitionAssistants"),
pk_columns=(_int_col("abolish_id", "id", required=True),),
description="助教作废记录 ODS",
),
OdsTaskSpec(
code="ODS_GOODS_LEDGER",
class_name="OdsGoodsLedgerTask",
table_name="billiards_ods.ods_goods_ledger",
endpoint="/Order/GoodsLedgerList",
data_path=("data", "orderGoodsLedgers"),
pk_columns=(_int_col("order_goods_id", "orderGoodsId", "id", required=True),),
extra_columns=(
_int_col("order_trade_no", "order_trade_no", "orderTradeNo"),
_int_col("order_settle_id", "order_settle_id", "orderSettleId"),
),
description="商品销售流水 ODS",
),
OdsTaskSpec(
code="ODS_PAYMENT",
class_name="OdsPaymentTask",
table_name="billiards_ods.ods_payment",
endpoint="/pay/records",
data_path=("data",),
pk_columns=(_int_col("pay_id", "payId", "id", required=True),),
extra_columns=(
ColumnSpec(column="relate_type", sources=("relate_type", "relateType")),
_int_col("relate_id", "relate_id", "relateId"),
),
include_page_size=False,
description="支付流水 ODS",
),
OdsTaskSpec(
code="ODS_REFUND",
class_name="OdsRefundTask",
table_name="billiards_ods.ods_refund",
endpoint="/Pay/RefundList",
data_path=(),
pk_columns=(_int_col("refund_id", "id", required=True),),
extra_columns=(
ColumnSpec(column="relate_type", sources=("relate_type", "relateType")),
_int_col("relate_id", "relate_id", "relateId"),
),
description="退款流水 ODS",
),
OdsTaskSpec(
code="ODS_COUPON_VERIFY",
class_name="OdsCouponVerifyTask",
table_name="billiards_ods.ods_coupon_verify",
endpoint="/Coupon/UsageList",
data_path=(),
pk_columns=(_int_col("coupon_id", "id", "couponId", required=True),),
description="平台验券/团购流水 ODS",
),
OdsTaskSpec(
code="ODS_MEMBER",
class_name="OdsMemberTask",
table_name="billiards_ods.ods_member",
endpoint="/MemberProfile/GetTenantMemberList",
data_path=("data",),
pk_columns=(_int_col("member_id", "memberId", required=True),),
requires_window=False,
description="会员档案 ODS",
),
OdsTaskSpec(
code="ODS_MEMBER_CARD",
class_name="OdsMemberCardTask",
table_name="billiards_ods.ods_member_card",
endpoint="/MemberCard/List",
data_path=("data", "tenantMemberCards"),
pk_columns=(_int_col("card_id", "tenantMemberCardId", "cardId", required=True),),
requires_window=False,
description="会员卡/储值卡 ODS",
),
OdsTaskSpec(
code="ODS_PACKAGE",
class_name="OdsPackageTask",
table_name="billiards_ods.ods_package_coupon",
endpoint="/Package/List",
data_path=("data", "packageCouponList"),
pk_columns=(_int_col("package_id", "id", "packageId", required=True),),
requires_window=False,
description="团购/套餐定义 ODS",
),
OdsTaskSpec(
code="ODS_INVENTORY_STOCK",
class_name="OdsInventoryStockTask",
table_name="billiards_ods.ods_inventory_stock",
endpoint="/Inventory/StockSummary",
data_path=(),
pk_columns=(
_int_col("site_goods_id", "siteGoodsId", required=True),
ColumnSpec(column="snapshot_key", default="default", required=True),
),
requires_window=False,
description="库存汇总 ODS",
),
OdsTaskSpec(
code="ODS_INVENTORY_CHANGE",
class_name="OdsInventoryChangeTask",
table_name="billiards_ods.ods_inventory_change",
endpoint="/Inventory/ChangeList",
data_path=("data", "queryDeliveryRecordsList"),
pk_columns=(_int_col("change_id", "siteGoodsStockId", "id", required=True),),
description="库存变动 ODS",
),
)
def _build_task_class(spec: OdsTaskSpec) -> Type[BaseOdsTask]:
"""Create a concrete BaseOdsTask subclass bound to the given spec."""
attrs = {
"SPEC": spec,
"__doc__": spec.description or f"ODS ingestion task {spec.code}",
"__module__": __name__,
}
return type(spec.class_name, (BaseOdsTask,), attrs)
ODS_TASK_CLASSES: Dict[str, Type[BaseOdsTask]] = {
spec.code: _build_task_class(spec) for spec in ODS_TASK_SPECS
}
__all__ = ["ODS_TASK_CLASSES", "ODS_TASK_SPECS", "BaseOdsTask"]
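# Usage sketch (illustrative: `config`, `db_ops`, and `api` stand in for the
# objects the task framework normally supplies, mirroring the unit tests):
#
#   import logging
#   from tasks.ods_tasks import ODS_TASK_CLASSES
#
#   task_cls = ODS_TASK_CLASSES["ODS_PAYMENT"]
#   task = task_cls(config, db_ops, api, logging.getLogger("ods_payment"))
#   result = task.execute()  # e.g. {"status": "SUCCESS", "counts": {"fetched": ...}}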

View File

@@ -0,0 +1,124 @@
# -*- coding: utf-8 -*-
from .base_dwd_task import BaseDwdTask
from loaders.facts.payment import PaymentLoader
from models.parsers import TypeParser
import json
class PaymentsDwdTask(BaseDwdTask):
"""
DWD Task: Process Payment Records from ODS to Fact Table
Source: billiards_ods.ods_payment
Target: billiards.fact_payment
"""
def get_task_code(self) -> str:
return "PAYMENTS_DWD"
def execute(self) -> dict:
self.logger.info(f"Starting {self.get_task_code()} task")
window_start, window_end, _ = self._get_time_window()
self.logger.info(f"Processing window: {window_start} to {window_end}")
loader = PaymentLoader(self.db)
store_id = self.config.get("app.store_id")
total_inserted = 0
total_errors = 0
# Iterate ODS Data
batches = self.iter_ods_rows(
table_name="billiards_ods.ods_payment",
columns=["store_id", "pay_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
)
for batch in batches:
if not batch:
continue
parsed_rows = []
for row in batch:
payload = self.parse_payload(row)
if not payload:
continue
parsed = self._parse_payment(payload, store_id)
if parsed:
parsed_rows.append(parsed)
if parsed_rows:
inserted, errors = loader.upsert_payments(parsed_rows, store_id)
total_inserted += inserted
total_errors += errors
self.db.commit()
self.logger.info(f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Errors: {total_errors}")
return {
"status": "success",
"inserted": total_inserted,
"errors": total_errors,
"window_start": window_start.isoformat(),
"window_end": window_end.isoformat()
}
def _parse_payment(self, raw: dict, store_id: int) -> dict | None:
"""Parse ODS payload into Fact structure"""
try:
pay_id = TypeParser.parse_int(raw.get("payId") or raw.get("id"))
if not pay_id:
return None
relate_type = str(raw.get("relateType") or raw.get("relate_type") or "")
relate_id = TypeParser.parse_int(raw.get("relateId") or raw.get("relate_id"))
# Attempt to populate settlement / trade identifiers
order_settle_id = TypeParser.parse_int(
raw.get("orderSettleId") or raw.get("order_settle_id")
)
order_trade_no = TypeParser.parse_int(
raw.get("orderTradeNo") or raw.get("order_trade_no")
)
if relate_type in {"1", "SETTLE", "ORDER"}:
order_settle_id = order_settle_id or relate_id
return {
"store_id": store_id,
"pay_id": pay_id,
"order_settle_id": order_settle_id,
"order_trade_no": order_trade_no,
"relate_type": relate_type,
"relate_id": relate_id,
"site_id": TypeParser.parse_int(
raw.get("siteId") or raw.get("site_id") or store_id
),
"tenant_id": TypeParser.parse_int(raw.get("tenantId") or raw.get("tenant_id")),
"create_time": TypeParser.parse_timestamp(
raw.get("createTime") or raw.get("create_time"), self.tz
),
"pay_time": TypeParser.parse_timestamp(raw.get("payTime"), self.tz),
"pay_amount": TypeParser.parse_decimal(raw.get("payAmount")),
"fee_amount": TypeParser.parse_decimal(
raw.get("feeAmount")
or raw.get("serviceFee")
or raw.get("channelFee")
or raw.get("fee_amount")
),
"discount_amount": TypeParser.parse_decimal(
raw.get("discountAmount")
or raw.get("couponAmount")
or raw.get("discount_amount")
),
"payment_method": str(raw.get("paymentMethod") or raw.get("payment_method") or ""),
"online_pay_channel": raw.get("onlinePayChannel") or raw.get("online_pay_channel"),
"pay_terminal": raw.get("payTerminal") or raw.get("pay_terminal"),
"pay_status": str(raw.get("payStatus") or raw.get("pay_status") or ""),
"raw_data": json.dumps(raw, ensure_ascii=False)
}
except Exception as e:
self.logger.warning(f"Error parsing payment: {e}")
return None
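# Minimal sketch of the batched ODS reader this task depends on (an assumption
# about BaseDwdTask.iter_ods_rows, which is defined elsewhere): filter the
# window on fetched_at and yield fixed-size batches of row dicts.
#
#   def iter_ods_rows(self, table_name, columns, start_time, end_time, batch_size=1000):
#       sql = (
#           f"SELECT {', '.join(columns)} FROM {table_name} "
#           "WHERE fetched_at >= %(start)s AND fetched_at < %(end)s ORDER BY fetched_at"
#       )
#       rows = self.db.query(sql, {"start": start_time, "end": end_time})
#       for i in range(0, len(rows), batch_size):
#           yield rows[i : i + batch_size]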

View File

@@ -62,16 +62,42 @@ class PaymentsTask(BaseTask):
def _parse_payment(self, raw: dict) -> dict | None:
"""Parse a payment record; returns None on failure."""
try:
store_id = self.config.get("app.store_id")
return {
"store_id": self.config.get("app.store_id"),
"pay_id": TypeParser.parse_int(raw.get("payId")),
"store_id": store_id,
"pay_id": TypeParser.parse_int(raw.get("payId") or raw.get("id")),
"order_id": TypeParser.parse_int(raw.get("orderId")),
"order_settle_id": TypeParser.parse_int(
raw.get("orderSettleId") or raw.get("order_settle_id")
),
"order_trade_no": TypeParser.parse_int(
raw.get("orderTradeNo") or raw.get("order_trade_no")
),
"relate_type": raw.get("relateType") or raw.get("relate_type"),
"relate_id": TypeParser.parse_int(raw.get("relateId") or raw.get("relate_id")),
"site_id": TypeParser.parse_int(raw.get("siteId") or raw.get("site_id") or store_id),
"tenant_id": TypeParser.parse_int(raw.get("tenantId") or raw.get("tenant_id")),
"pay_time": TypeParser.parse_timestamp(raw.get("payTime"), self.tz),
"create_time": TypeParser.parse_timestamp(
raw.get("createTime") or raw.get("create_time"), self.tz
),
"pay_amount": TypeParser.parse_decimal(raw.get("payAmount")),
"fee_amount": TypeParser.parse_decimal(
raw.get("feeAmount")
or raw.get("serviceFee")
or raw.get("channelFee")
or raw.get("fee_amount")
),
"discount_amount": TypeParser.parse_decimal(
raw.get("discountAmount") or raw.get("couponAmount") or raw.get("discount_amount")
),
"pay_type": raw.get("payType"),
"payment_method": raw.get("paymentMethod") or raw.get("payment_method"),
"online_pay_channel": raw.get("onlinePayChannel") or raw.get("online_pay_channel"),
"pay_status": raw.get("payStatus"),
"pay_terminal": raw.get("payTerminal") or raw.get("pay_terminal"),
"remark": raw.get("remark"),
"raw_data": json.dumps(raw, ensure_ascii=False)
"raw_data": json.dumps(raw, ensure_ascii=False),
}
except Exception as e:
self.logger.warning(f"解析支付记录失败: {e}, 原始数据: {raw}")
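# Note: the `raw.get(a) or raw.get(b)` fallback chains above treat all falsy
# values (0, "", None) as missing, so a legitimate zero amount falls through to
# the next key. If zeros must survive, a presence-based helper is safer
# (illustrative sketch, not part of this module):
#
#   def _first_present(raw: dict, *keys):
#       for key in keys:
#           if key in raw and raw[key] is not None:
#               return raw[key]
#       return None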

View File

@@ -0,0 +1,69 @@
# -*- coding: utf-8 -*-
from .base_dwd_task import BaseDwdTask
from loaders.facts.ticket import TicketLoader
class TicketDwdTask(BaseDwdTask):
"""
DWD Task: Process Ticket Details from ODS to Fact Tables
Source: billiards_ods.ods_ticket_detail
Targets:
- billiards.fact_order
- billiards.fact_order_goods
- billiards.fact_table_usage
- billiards.fact_assistant_service
"""
def get_task_code(self) -> str:
return "TICKET_DWD"
def execute(self) -> dict:
self.logger.info(f"Starting {self.get_task_code()} task")
# 1. Get Time Window (Incremental Load)
window_start, window_end, _ = self._get_time_window()
self.logger.info(f"Processing window: {window_start} to {window_end}")
# 2. Initialize Loader
loader = TicketLoader(self.db)
store_id = self.config.get("app.store_id")
total_inserted = 0
total_errors = 0
# 3. Iterate ODS Data
# We query ods_ticket_detail based on fetched_at
batches = self.iter_ods_rows(
table_name="billiards_ods.ods_ticket_detail",
columns=["store_id", "order_settle_id", "payload", "fetched_at"],
start_time=window_start,
end_time=window_end
)
for batch in batches:
if not batch:
continue
# Extract payloads
tickets = []
for row in batch:
payload = self.parse_payload(row)
if payload:
tickets.append(payload)
# Process Batch
inserted, errors = loader.process_tickets(tickets, store_id)
total_inserted += inserted
total_errors += errors
# 4. Commit
self.db.commit()
self.logger.info(f"Task {self.get_task_code()} completed. Inserted: {total_inserted}, Errors: {total_errors}")
return {
"status": "success",
"inserted": total_inserted,
"errors": total_errors,
"window_start": window_start.isoformat(),
"window_end": window_end.isoformat()
}
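# Assuming TICKET_DWD and PAYMENTS_DWD are registered in the TaskRegistry the
# same way the ODS task codes are, they should be runnable via the usual CLI
# entry point (illustrative command):
#
#   python -m cli.main --tasks TICKET_DWD,PAYMENTS_DWD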

View File

@@ -0,0 +1,144 @@
==========================================================================================
20251110_034959_助教流水.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'orderAssistantDetails']
list orderAssistantDetails len 100, elem type dict, keys ['assistantNo', 'nickname', 'levelName', 'assistantName', 'tableName', 'siteProfile', 'skillName', 'id', 'order_trade_no', 'site_id']
==========================================================================================
20251110_035004_助教废除.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'abolitionAssistants']
list abolitionAssistants len 15, elem type dict, keys ['siteProfile', 'createTime', 'id', 'siteId', 'tableAreaId', 'tableId', 'tableArea', 'tableName', 'assistantOn', 'assistantName']
==========================================================================================
20251110_035011_台费流水.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'siteTableUseDetailsList']
list siteTableUseDetailsList len 100, elem type dict, keys ['siteProfile', 'id', 'order_trade_no', 'site_id', 'tenant_id', 'member_id', 'operator_id', 'operator_name', 'order_settle_id', 'ledger_unit_price']
==========================================================================================
20251110_035904_小票详情.json
root list len 193
sample keys ['orderSettleId', 'data']
data keys ['data', 'code']
dict data keys ['tenantId', 'siteId', 'orderSettleId', 'orderSettleNumber', 'assistantManualDiscount', 'siteName', 'tenantName', 'siteAddress', 'siteBusinessTel', 'ticketRemark']
==========================================================================================
20251110_035908_台费打折.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'taiFeeAdjustInfos']
list taiFeeAdjustInfos len 100, elem type dict, keys ['tableProfile', 'siteProfile', 'id', 'adjust_type', 'applicant_id', 'applicant_name', 'create_time', 'is_delete', 'ledger_amount', 'ledger_count']
==========================================================================================
20251110_035916_结账记录.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'settleList']
list settleList len 100, elem type dict, keys ['siteProfile', 'settleList']
==========================================================================================
20251110_035923_支付记录.json
root list len 200
sample keys ['siteProfile', 'create_time', 'pay_amount', 'pay_status', 'pay_time', 'online_pay_channel', 'relate_type', 'relate_id', 'site_id', 'id', 'payment_method']
dict siteProfile keys ['id', 'org_id', 'shop_name', 'avatar', 'business_tel', 'full_address', 'address', 'longitude', 'latitude', 'tenant_site_region_id']
==========================================================================================
20251110_035929_退款记录.json
root list len 11
sample keys ['tenantName', 'siteProfile', 'id', 'site_id', 'tenant_id', 'pay_sn', 'pay_amount', 'pay_status', 'pay_time', 'create_time', 'relate_type', 'relate_id', 'is_revoke', 'is_delete', 'online_pay_channel', 'payment_method', 'balance_frozen_amount', 'card_frozen_amount', 'member_id', 'member_card_id']
dict siteProfile keys ['id', 'org_id', 'shop_name', 'avatar', 'business_tel', 'full_address', 'address', 'longitude', 'latitude', 'tenant_site_region_id']
==========================================================================================
20251110_035934_平台验券记录.json
root list len 200
sample keys ['siteProfile', 'id', 'tenant_id', 'site_id', 'sale_price', 'coupon_code', 'coupon_channel', 'site_order_id', 'coupon_free_time', 'use_status', 'create_time', 'is_delete', 'coupon_name', 'coupon_cover', 'coupon_remark', 'channel_deal_id', 'group_package_id', 'consume_time', 'groupon_type', 'coupon_money']
dict siteProfile keys ['id', 'org_id', 'shop_name', 'avatar', 'business_tel', 'full_address', 'address', 'longitude', 'latitude', 'tenant_site_region_id']
==========================================================================================
20251110_035941_商品档案.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'tenantGoodsList']
list tenantGoodsList len 100, elem type dict, keys ['categoryName', 'isInSite', 'commodityCode', 'id', 'tenant_id', 'goods_name', 'goods_cover', 'goods_state', 'goods_category_id', 'unit']
==========================================================================================
20251110_035948_门店销售记录.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'orderGoodsLedgers']
list orderGoodsLedgers len 100, elem type dict, keys ['siteId', 'siteName', 'orderGoodsId', 'openSalesman', 'id', 'order_trade_no', 'site_id', 'tenant_id', 'operator_id', 'operator_name']
==========================================================================================
20251110_043159_库存变化记录1.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'queryDeliveryRecordsList']
list queryDeliveryRecordsList len 100, elem type dict, keys ['siteGoodsStockId', 'siteGoodsId', 'siteId', 'tenantId', 'stockType', 'goodsName', 'createTime', 'startNum', 'endNum', 'changeNum']
==========================================================================================
20251110_043204_库存变化记录2.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'goodsCategoryList']
list goodsCategoryList len 9, elem type dict, keys ['id', 'tenant_id', 'category_name', 'alias_name', 'pid', 'business_name', 'tenant_goods_business_id', 'open_salesman', 'categoryBoxes', 'sort']
==========================================================================================
20251110_043209_会员档案.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'tenantMemberInfos']
list tenantMemberInfos len 100, elem type dict, keys ['id', 'create_time', 'member_card_grade_code', 'mobile', 'nickname', 'register_site_id', 'site_name', 'member_card_grade_name', 'system_member_id', 'tenant_id']
==========================================================================================
20251110_043217_余额变更记录.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'tenantMemberCardLogs']
list tenantMemberCardLogs len 100, elem type dict, keys ['memberCardTypeName', 'paySiteName', 'registerSiteName', 'memberName', 'memberMobile', 'id', 'account_data', 'after', 'before', 'card_type_id']
==========================================================================================
20251110_043223_储值卡列表.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'totalOther', 'tenantMemberCards']
list tenantMemberCards len 100, elem type dict, keys ['site_name', 'member_name', 'member_mobile', 'member_card_type_name', 'table_service_discount', 'assistant_service_discount', 'coupon_discount', 'goods_service_discount', 'is_allow_give', 'able_cross_site']
==========================================================================================
20251110_043231_充值记录.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'settleList']
list settleList len 74, elem type dict, keys ['siteProfile', 'settleList']
==========================================================================================
20251110_043237_助教账号1.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'assistantInfos']
list assistantInfos len 50, elem type dict, keys ['job_num', 'shop_name', 'group_id', 'group_name', 'staff_profile_id', 'ding_talk_synced', 'entry_type', 'team_name', 'entry_sign_status', 'resign_sign_status']
==========================================================================================
20251110_043243_助教账号2.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'assistantInfos']
list assistantInfos len 50, elem type dict, keys ['job_num', 'shop_name', 'group_id', 'group_name', 'staff_profile_id', 'ding_talk_synced', 'entry_type', 'team_name', 'entry_sign_status', 'resign_sign_status']
==========================================================================================
20251110_043250_台桌列表.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'siteTables']
list siteTables len 71, elem type dict, keys ['id', 'audit_status', 'charge_free', 'self_table', 'create_time', 'is_rest_area', 'light_status', 'show_status', 'site_id', 'site_table_area_id']
==========================================================================================
20251110_043255_团购套餐.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'packageCouponList']
list packageCouponList len 17, elem type dict, keys ['site_name', 'effective_status', 'id', 'site_id', 'tenant_id', 'package_name', 'table_area_id', 'table_area_name', 'selling_price', 'duration']
==========================================================================================
20251110_043302_团购套餐流水.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'couponAmountSum', 'siteTableUseDetailsList']
list siteTableUseDetailsList len 100, elem type dict, keys ['tableName', 'tableAreaName', 'siteName', 'goodsOptionPrice', 'id', 'order_trade_no', 'table_id', 'site_id', 'tenant_id', 'operator_id']
==========================================================================================
20251110_043308_库存汇总.json
root list len 161
sample keys ['siteGoodsId', 'goodsName', 'goodsUnit', 'goodsCategoryId', 'goodsCategorySecondId', 'rangeStartStock', 'rangeEndStock', 'rangeIn', 'rangeOut', 'rangeInventory', 'rangeSale', 'rangeSaleMoney', 'currentStock', 'categoryName']
==========================================================================================
20251110_051132_门店商品档案1.json
root list len 2
sample keys ['data', 'code']
data keys ['total', 'orderGoodsList']
list orderGoodsList len 100, elem type dict, keys ['siteName', 'oneCategoryName', 'twoCategoryName', 'id', 'tenant_goods_id', 'site_id', 'tenant_id', 'goods_name', 'goods_cover', 'goods_state']
==========================================================================================
20251110_051138_门店商品档案2.json
root list len 2
sample keys ['data', 'code']
data keys ['goodsStockA', 'goodsStockB', 'goodsSaleNum', 'stockSumMoney']
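A structure dump in this format can be reproduced with a small inspector script. The sketch below is illustrative only (the archive directory, truncation limits, and helper name are assumptions, not the script this log actually came from):

import json
from pathlib import Path

def describe(path: Path) -> None:
    """Print a short structural summary of one archived JSON response."""
    print("=" * 90)
    print(path.name)
    root = json.loads(path.read_text(encoding="utf-8"))
    if isinstance(root, list):
        print(f"root list len {len(root)}")
        root = root[0] if root else None
    if not isinstance(root, dict):
        return
    print(f"sample keys {list(root)[:20]}")
    data = root.get("data")
    if isinstance(data, dict):
        print(f"data keys {list(data)[:10]}")
        for key, value in data.items():
            if isinstance(value, list) and value and isinstance(value[0], dict):
                print(f"list {key} len {len(value)}, elem type dict, keys {list(value[0])[:10]}")

for json_file in sorted(Path("tests/testdata_json").glob("*.json")):
    describe(json_file)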

View File

@@ -135,16 +135,41 @@ class FakeDBOperations:
def __init__(self):
self.upserts: List[Dict] = []
self.executes: List[Dict] = []
self.commits = 0
self.rollbacks = 0
self.conn = FakeConnection()
def batch_upsert_with_returning(self, sql: str, rows: List[Dict], page_size: int = 1000):
self.upserts.append({"sql": sql.strip(), "count": len(rows), "page_size": page_size})
self.upserts.append(
{
"sql": sql.strip(),
"count": len(rows),
"page_size": page_size,
"rows": [dict(row) for row in rows],
}
)
return len(rows), 0
def batch_execute(self, sql: str, rows: List[Dict], page_size: int = 1000):
self.upserts.append({"sql": sql.strip(), "count": len(rows), "page_size": page_size})
self.executes.append(
{
"sql": sql.strip(),
"count": len(rows),
"page_size": page_size,
"rows": [dict(row) for row in rows],
}
)
def execute(self, sql: str, params=None):
self.executes.append({"sql": sql.strip(), "params": params})
def query(self, sql: str, params=None):
self.executes.append({"sql": sql.strip(), "params": params, "type": "query"})
return []
def cursor(self):
return self.conn.cursor()
def commit(self):
self.commits += 1
@@ -161,22 +186,53 @@ class FakeAPIClient:
self.calls: List[Dict] = []
# pylint: disable=unused-argument
def get_paginated(self, endpoint: str, params=None, **kwargs):
def iter_paginated(
self,
endpoint: str,
params=None,
page_size: int = 200,
page_field: str = "pageIndex",
size_field: str = "pageSize",
data_path: Tuple[str, ...] = (),
list_key: str | None = None,
):
self.calls.append({"endpoint": endpoint, "params": params})
if endpoint not in self.data_map:
raise AssertionError(f"Missing fixture for endpoint {endpoint}")
return list(self.data_map[endpoint]), [{"page": 1, "size": len(self.data_map[endpoint])}]
records = list(self.data_map[endpoint])
yield 1, records, dict(params or {}), {"data": records}
def get_paginated(self, endpoint: str, params=None, **kwargs):
records = []
pages = []
for page_no, page_records, req, resp in self.iter_paginated(endpoint, params, **kwargs):
records.extend(page_records)
pages.append({"page": page_no, "request": req, "response": resp})
return records, pages
def get_source_hint(self, endpoint: str) -> str | None:
return None
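# Contract shared by both fake clients: iter_paginated yields
# (page_no, records, request_params, raw_response) per page, so tasks can
# stream results page by page. Usage sketch:
#
#   api = FakeAPIClient({"/pay/records": [{"payId": 1}]})
#   for page_no, records, req, resp in api.iter_paginated("/pay/records"):
#       assert page_no == 1 and records == [{"payId": 1}]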
class OfflineAPIClient:
"""离线模式专用 API Client根据 endpoint 读取归档 JSON、套 data_path 并回放列表数据。"""
"""离线模式专用 API Client根据 endpoint 读取归档 JSON、套 data_path 并回放列表数据。"""
def __init__(self, file_map: Dict[str, Path]):
self.file_map = {k: Path(v) for k, v in file_map.items()}
self.calls: List[Dict] = []
# pylint: disable=unused-argument
def get_paginated(self, endpoint: str, params=None, page_size: int = 200, data_path: Tuple[str, ...] = (), **kwargs):
def iter_paginated(
self,
endpoint: str,
params=None,
page_size: int = 200,
page_field: str = "pageIndex",
size_field: str = "pageSize",
data_path: Tuple[str, ...] = (),
list_key: str | None = None,
):
self.calls.append({"endpoint": endpoint, "params": params})
if endpoint not in self.file_map:
raise AssertionError(f"Missing archive for endpoint {endpoint}")
@@ -188,17 +244,42 @@ class OfflineAPIClient:
for key in data_path:
if isinstance(data, dict):
data = data.get(key, [])
else:
data = []
break
if list_key and isinstance(data, dict):
data = data.get(list_key, [])
if not isinstance(data, list):
data = []
return data, [{"page": 1, "mode": "offline"}]
total = len(data)
start = 0
page = 1
while start < total or (start == 0 and total == 0):
chunk = data[start : start + page_size]
if not chunk and total != 0:
break
yield page, list(chunk), dict(params or {}), payload
if len(chunk) < page_size:
break
start += page_size
page += 1
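# Note: an empty archive still yields exactly one empty page (page 1),
# mirroring how an online client reports an empty result set.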
def get_paginated(self, endpoint: str, params=None, **kwargs):
records = []
pages = []
for page_no, page_records, req, resp in self.iter_paginated(endpoint, params, **kwargs):
records.extend(page_records)
pages.append({"page": page_no, "request": req, "response": resp})
return records, pages
def get_source_hint(self, endpoint: str) -> str | None:
if endpoint not in self.file_map:
return None
return str(self.file_map[endpoint])
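# Offline replay sketch (the endpoint-to-archive pairing here is illustrative):
#
#   from pathlib import Path
#   api = OfflineAPIClient({
#       "/pay/records": Path("tests/testdata_json/20251110_035923_支付记录.json"),
#   })
#   records, pages = api.get_paginated("/pay/records")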
class RealDBOperationsAdapter:
"""连接真实 PostgreSQL 的适配器,为任务提供 batch_upsert + 事务能力。"""
def __init__(self, dsn: str):

View File

@@ -0,0 +1,74 @@
# -*- coding: utf-8 -*-
"""Unit tests for the new ODS ingestion tasks."""
import logging
import sys
from pathlib import Path
# Ensure project root is resolvable when running tests in isolation
PROJECT_ROOT = Path(__file__).resolve().parents[2]
if str(PROJECT_ROOT) not in sys.path:
sys.path.insert(0, str(PROJECT_ROOT))
from tasks.ods_tasks import ODS_TASK_CLASSES
from .task_test_utils import create_test_config, get_db_operations, FakeAPIClient
def _build_config(tmp_path):
archive_dir = tmp_path / "archive"
temp_dir = tmp_path / "temp"
return create_test_config("ONLINE", archive_dir, temp_dir)
def test_ods_order_settle_ingest(tmp_path):
"""Ensure ODS_ORDER_SETTLE task writes raw payload + metadata."""
config = _build_config(tmp_path)
sample = [
{
"orderSettleId": 701,
"orderTradeNo": 8001,
"anyField": "value",
}
]
api = FakeAPIClient({"/order/list": sample})
task_cls = ODS_TASK_CLASSES["ODS_ORDER_SETTLE"]
with get_db_operations() as db_ops:
task = task_cls(config, db_ops, api, logging.getLogger("test_ods_order"))
result = task.execute()
assert result["status"] == "SUCCESS"
assert result["counts"]["fetched"] == 1
assert db_ops.commits == 1
row = db_ops.upserts[0]["rows"][0]
assert row["order_settle_id"] == 701
assert row["order_trade_no"] == 8001
assert row["source_endpoint"] == "/order/list"
assert '"orderSettleId": 701' in row["payload"]
def test_ods_payment_ingest(tmp_path):
"""Ensure ODS_PAYMENT task stores relate fields and payload."""
config = _build_config(tmp_path)
sample = [
{
"payId": 901,
"relateType": "ORDER",
"relateId": 123,
"payAmount": "100.00",
}
]
api = FakeAPIClient({"/pay/records": sample})
task_cls = ODS_TASK_CLASSES["ODS_PAYMENT"]
with get_db_operations() as db_ops:
task = task_cls(config, db_ops, api, logging.getLogger("test_ods_payment"))
result = task.execute()
assert result["status"] == "SUCCESS"
assert result["counts"]["fetched"] == 1
assert db_ops.commits == 1
row = db_ops.upserts[0]["rows"][0]
assert row["pay_id"] == 901
assert row["relate_type"] == "ORDER"
assert row["relate_id"] == 123
assert '"payId": 901' in row["payload"]

schema_v2.sql Normal file
View File

@@ -0,0 +1 @@