From 9a1df70a23a5a6dabe74e175ab38d8f43223d292 Mon Sep 17 00:00:00 2001 From: Neo Date: Wed, 19 Nov 2025 03:36:44 +0800 Subject: [PATCH] =?UTF-8?q?=E8=A1=A5=E5=85=A8=E4=BB=BB=E5=8A=A1=E4=B8=8E?= =?UTF-8?q?=E6=B5=8B=E8=AF=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- etl_billiards/.env | 10 +- etl_billiards/.env.example | 8 + etl_billiards/README.md | 28 + etl_billiards/config/defaults.py | 8 + etl_billiards/config/env_parser.py | 3 + etl_billiards/loaders/dimensions/assistant.py | 114 ++++ etl_billiards/loaders/dimensions/package.py | 91 +++ etl_billiards/loaders/dimensions/table.py | 80 +++ .../loaders/facts/assistant_abolish.py | 64 ++ .../loaders/facts/assistant_ledger.py | 136 ++++ etl_billiards/loaders/facts/coupon_usage.py | 91 +++ .../loaders/facts/inventory_change.py | 73 ++ etl_billiards/loaders/facts/refund.py | 88 +++ etl_billiards/loaders/facts/table_discount.py | 82 +++ etl_billiards/loaders/facts/topup.py | 118 ++++ etl_billiards/orchestration/task_registry.py | 25 +- etl_billiards/scripts/run_tests.py | 196 ++++++ etl_billiards/scripts/test_presets.py | 166 +++++ etl_billiards/tasks/assistant_abolish_task.py | 83 +++ etl_billiards/tasks/assistants_task.py | 103 +++ etl_billiards/tasks/coupon_usage_task.py | 92 +++ etl_billiards/tasks/inventory_change_task.py | 90 +++ etl_billiards/tasks/ledger_task.py | 119 ++++ etl_billiards/tasks/packages_task.py | 91 +++ etl_billiards/tasks/refunds_task.py | 91 +++ etl_billiards/tasks/table_discount_task.py | 92 +++ etl_billiards/tasks/tables_task.py | 87 ++- etl_billiards/tasks/topups_task.py | 102 +++ etl_billiards/tests/unit/task_test_utils.py | 638 ++++++++++++++++++ .../tests/unit/test_etl_tasks_offline.py | 39 ++ .../tests/unit/test_etl_tasks_online.py | 32 + 31 files changed, 3034 insertions(+), 6 deletions(-) create mode 100644 etl_billiards/loaders/dimensions/assistant.py create mode 100644 etl_billiards/loaders/dimensions/package.py create mode 100644 etl_billiards/loaders/facts/assistant_abolish.py create mode 100644 etl_billiards/loaders/facts/assistant_ledger.py create mode 100644 etl_billiards/loaders/facts/coupon_usage.py create mode 100644 etl_billiards/loaders/facts/inventory_change.py create mode 100644 etl_billiards/loaders/facts/refund.py create mode 100644 etl_billiards/loaders/facts/table_discount.py create mode 100644 etl_billiards/loaders/facts/topup.py create mode 100644 etl_billiards/scripts/run_tests.py create mode 100644 etl_billiards/scripts/test_presets.py create mode 100644 etl_billiards/tasks/assistant_abolish_task.py create mode 100644 etl_billiards/tasks/assistants_task.py create mode 100644 etl_billiards/tasks/coupon_usage_task.py create mode 100644 etl_billiards/tasks/inventory_change_task.py create mode 100644 etl_billiards/tasks/ledger_task.py create mode 100644 etl_billiards/tasks/packages_task.py create mode 100644 etl_billiards/tasks/refunds_task.py create mode 100644 etl_billiards/tasks/table_discount_task.py create mode 100644 etl_billiards/tasks/topups_task.py create mode 100644 etl_billiards/tests/unit/task_test_utils.py create mode 100644 etl_billiards/tests/unit/test_etl_tasks_offline.py create mode 100644 etl_billiards/tests/unit/test_etl_tasks_online.py diff --git a/etl_billiards/.env b/etl_billiards/.env index 7cd7dc2..bad1af6 100644 --- a/etl_billiards/.env +++ b/etl_billiards/.env @@ -1,5 +1,5 @@ # 数据库配置 -PG_DSN=postgresql://local-Python:Neo-local-1991125@localhost:5432/LLZQ +PG_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ # PG_HOST=localhost # PG_PORT=5432 # PG_NAME=LLZQ @@ -36,3 +36,11 @@ LOG_UNKNOWN_FIELDS=true HASH_ALGO=sha1 STRICT_NUMERIC=true ROUND_MONEY_SCALE=2 + +# 测试/离线模式 +TEST_MODE=OFFLINE +TEST_JSON_ARCHIVE_DIR=tests/testdata_json #指定离线模式(OFFLINE)要读取的 JSON 归档目录。测试或回放任务时,会从这个目录中找到各个任务预先保存的 API 响应文件,直接做 Transform + Load,不再访问真实接口。 +TEST_JSON_TEMP_DIR=/tmp/etl_billiards_json_tmp #指定测试运行时临时生成或复制 JSON 文件的目录。在线/离线联动测试会把输出、中间文件等写到这个路径,既避免污染真实导出目录,也让 CI 可以在一次运行中隔离不同任务产生的临时数据。 + +# 测试数据库(留空则单元测试使用伪库) +TEST_DB_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test diff --git a/etl_billiards/.env.example b/etl_billiards/.env.example index 794814e..cb8218f 100644 --- a/etl_billiards/.env.example +++ b/etl_billiards/.env.example @@ -36,3 +36,11 @@ LOG_UNKNOWN_FIELDS=true HASH_ALGO=sha1 STRICT_NUMERIC=true ROUND_MONEY_SCALE=2 + +# 测试/离线模式 +TEST_MODE=ONLINE +TEST_JSON_ARCHIVE_DIR=tests/testdata_json +TEST_JSON_TEMP_DIR=/tmp/etl_billiards_json_tmp + +# 测试数据库(可选:若设置则单元测试连入此 DSN) +TEST_DB_DSN= diff --git a/etl_billiards/README.md b/etl_billiards/README.md index a27852d..47f35e1 100644 --- a/etl_billiards/README.md +++ b/etl_billiards/README.md @@ -191,6 +191,34 @@ pytest --cov=. --cov-report=html - `tests/unit/test_parsers.py` – 解析器单元测试 - `tests/integration/test_database.py` – 数据库集成测试 +#### 3.3.1 测试模式(ONLINE / OFFLINE) + +- `TEST_MODE=ONLINE`(默认)时,测试会模拟实时 API,完整执行 E/T/L。 +- `TEST_MODE=OFFLINE` 时,测试改为从 `TEST_JSON_ARCHIVE_DIR` 指定的归档 JSON 中读取数据,仅做 Transform + Load,适合验证本地归档数据是否仍可回放。 +- `TEST_JSON_ARCHIVE_DIR`:离线 JSON 归档目录(示例:`tests/testdata_json` 或 CI 产出的快照)。 +- `TEST_JSON_TEMP_DIR`:测试生成的临时 JSON 输出目录,便于隔离每次运行的数据。 +- `TEST_DB_DSN`:可选,若设置则单元测试会连接到此 PostgreSQL DSN,实打实执行写库;留空时测试使用内存伪库,避免依赖数据库。 + +示例命令: + +```bash +# 在线模式覆盖所有任务 +TEST_MODE=ONLINE pytest tests/unit/test_etl_tasks_online.py + +# 离线模式使用归档 JSON 覆盖所有任务 +TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=tests/testdata_json pytest tests/unit/test_etl_tasks_offline.py + +# 使用脚本按需组合参数(示例:在线 + 仅订单用例) +python scripts/run_tests.py --suite online --mode ONLINE --keyword ORDERS + +# 使用脚本连接真实测试库并回放离线模式 +python scripts/run_tests.py --suite offline --mode OFFLINE --db-dsn postgresql://user:pwd@localhost:5432/testdb + +# 使用“指令仓库”中的预置命令 +python scripts/run_tests.py --preset offline_realdb +python scripts/run_tests.py --list-presets # 查看或自定义 scripts/test_presets.py +``` + --- ## 4. 项目结构与文件说明 diff --git a/etl_billiards/config/defaults.py b/etl_billiards/config/defaults.py index 5f2d312..872f826 100644 --- a/etl_billiards/config/defaults.py +++ b/etl_billiards/config/defaults.py @@ -76,6 +76,14 @@ DEFAULTS = { "redact_keys": ["token", "password", "Authorization"], "echo_token_in_logs": False, }, + "testing": { + # ONLINE: 正常实时ETL;OFFLINE: 读取归档JSON做T/L + "mode": "OFFLINE", + # 离线归档JSON所在目录 + "json_archive_dir": "", + # 测试运行时用于生成/复制临时JSON的目录 + "temp_json_dir": "", + }, } # 任务代码常量 diff --git a/etl_billiards/config/env_parser.py b/etl_billiards/config/env_parser.py index b29f5c8..431e08c 100644 --- a/etl_billiards/config/env_parser.py +++ b/etl_billiards/config/env_parser.py @@ -24,6 +24,9 @@ ENV_MAP = { "OVERLAP_SECONDS": ("run.overlap_seconds",), "WINDOW_BUSY_MIN": ("run.window_minutes.default_busy",), "WINDOW_IDLE_MIN": ("run.window_minutes.default_idle",), + "TEST_MODE": ("testing.mode",), + "TEST_JSON_ARCHIVE_DIR": ("testing.json_archive_dir",), + "TEST_JSON_TEMP_DIR": ("testing.temp_json_dir",), } def _deep_set(d, dotted_keys, value): diff --git a/etl_billiards/loaders/dimensions/assistant.py b/etl_billiards/loaders/dimensions/assistant.py new file mode 100644 index 0000000..40a1c1e --- /dev/null +++ b/etl_billiards/loaders/dimensions/assistant.py @@ -0,0 +1,114 @@ +# -*- coding: utf-8 -*- +"""助教维度加载器""" + +from ..base_loader import BaseLoader + + +class AssistantLoader(BaseLoader): + """写入 dim_assistant""" + + def upsert_assistants(self, records: list) -> tuple: + if not records: + return (0, 0, 0) + + sql = """ + INSERT INTO billiards.dim_assistant ( + store_id, + assistant_id, + assistant_no, + nickname, + real_name, + gender, + mobile, + level, + team_id, + team_name, + assistant_status, + work_status, + entry_time, + resign_time, + start_time, + end_time, + create_time, + update_time, + system_role_id, + online_status, + allow_cx, + charge_way, + pd_unit_price, + cx_unit_price, + is_guaranteed, + is_team_leader, + serial_number, + show_sort, + is_delete, + raw_data + ) + VALUES ( + %(store_id)s, + %(assistant_id)s, + %(assistant_no)s, + %(nickname)s, + %(real_name)s, + %(gender)s, + %(mobile)s, + %(level)s, + %(team_id)s, + %(team_name)s, + %(assistant_status)s, + %(work_status)s, + %(entry_time)s, + %(resign_time)s, + %(start_time)s, + %(end_time)s, + %(create_time)s, + %(update_time)s, + %(system_role_id)s, + %(online_status)s, + %(allow_cx)s, + %(charge_way)s, + %(pd_unit_price)s, + %(cx_unit_price)s, + %(is_guaranteed)s, + %(is_team_leader)s, + %(serial_number)s, + %(show_sort)s, + %(is_delete)s, + %(raw_data)s + ) + ON CONFLICT (store_id, assistant_id) DO UPDATE SET + assistant_no = EXCLUDED.assistant_no, + nickname = EXCLUDED.nickname, + real_name = EXCLUDED.real_name, + gender = EXCLUDED.gender, + mobile = EXCLUDED.mobile, + level = EXCLUDED.level, + team_id = EXCLUDED.team_id, + team_name = EXCLUDED.team_name, + assistant_status= EXCLUDED.assistant_status, + work_status = EXCLUDED.work_status, + entry_time = EXCLUDED.entry_time, + resign_time = EXCLUDED.resign_time, + start_time = EXCLUDED.start_time, + end_time = EXCLUDED.end_time, + update_time = COALESCE(EXCLUDED.update_time, now()), + system_role_id = EXCLUDED.system_role_id, + online_status = EXCLUDED.online_status, + allow_cx = EXCLUDED.allow_cx, + charge_way = EXCLUDED.charge_way, + pd_unit_price = EXCLUDED.pd_unit_price, + cx_unit_price = EXCLUDED.cx_unit_price, + is_guaranteed = EXCLUDED.is_guaranteed, + is_team_leader = EXCLUDED.is_team_leader, + serial_number = EXCLUDED.serial_number, + show_sort = EXCLUDED.show_sort, + is_delete = EXCLUDED.is_delete, + raw_data = EXCLUDED.raw_data, + updated_at = now() + RETURNING (xmax = 0) AS inserted + """ + + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) diff --git a/etl_billiards/loaders/dimensions/package.py b/etl_billiards/loaders/dimensions/package.py new file mode 100644 index 0000000..bad8aa7 --- /dev/null +++ b/etl_billiards/loaders/dimensions/package.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +"""团购/套餐定义加载器""" + +from ..base_loader import BaseLoader + + +class PackageDefinitionLoader(BaseLoader): + """写入 dim_package_coupon""" + + def upsert_packages(self, records: list) -> tuple: + if not records: + return (0, 0, 0) + + sql = """ + INSERT INTO billiards.dim_package_coupon ( + store_id, + package_id, + package_code, + package_name, + table_area_id, + table_area_name, + selling_price, + duration_seconds, + start_time, + end_time, + type, + is_enabled, + is_delete, + usable_count, + creator_name, + date_type, + group_type, + coupon_money, + area_tag_type, + system_group_type, + card_type_ids, + raw_data + ) + VALUES ( + %(store_id)s, + %(package_id)s, + %(package_code)s, + %(package_name)s, + %(table_area_id)s, + %(table_area_name)s, + %(selling_price)s, + %(duration_seconds)s, + %(start_time)s, + %(end_time)s, + %(type)s, + %(is_enabled)s, + %(is_delete)s, + %(usable_count)s, + %(creator_name)s, + %(date_type)s, + %(group_type)s, + %(coupon_money)s, + %(area_tag_type)s, + %(system_group_type)s, + %(card_type_ids)s, + %(raw_data)s + ) + ON CONFLICT (store_id, package_id) DO UPDATE SET + package_code = EXCLUDED.package_code, + package_name = EXCLUDED.package_name, + table_area_id = EXCLUDED.table_area_id, + table_area_name = EXCLUDED.table_area_name, + selling_price = EXCLUDED.selling_price, + duration_seconds = EXCLUDED.duration_seconds, + start_time = EXCLUDED.start_time, + end_time = EXCLUDED.end_time, + type = EXCLUDED.type, + is_enabled = EXCLUDED.is_enabled, + is_delete = EXCLUDED.is_delete, + usable_count = EXCLUDED.usable_count, + creator_name = EXCLUDED.creator_name, + date_type = EXCLUDED.date_type, + group_type = EXCLUDED.group_type, + coupon_money = EXCLUDED.coupon_money, + area_tag_type = EXCLUDED.area_tag_type, + system_group_type = EXCLUDED.system_group_type, + card_type_ids = EXCLUDED.card_type_ids, + raw_data = EXCLUDED.raw_data, + updated_at = now() + RETURNING (xmax = 0) AS inserted + """ + + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) diff --git a/etl_billiards/loaders/dimensions/table.py b/etl_billiards/loaders/dimensions/table.py index e69de29..eab02d6 100644 --- a/etl_billiards/loaders/dimensions/table.py +++ b/etl_billiards/loaders/dimensions/table.py @@ -0,0 +1,80 @@ +# -*- coding: utf-8 -*- +"""台桌维度加载器""" + +from ..base_loader import BaseLoader + + +class TableLoader(BaseLoader): + """将台桌档案写入 dim_table""" + + def upsert_tables(self, records: list) -> tuple: + """批量写入台桌档案""" + if not records: + return (0, 0, 0) + + sql = """ + INSERT INTO billiards.dim_table ( + store_id, + table_id, + site_id, + area_id, + area_name, + table_name, + table_price, + table_status, + table_status_name, + light_status, + is_rest_area, + show_status, + virtual_table, + charge_free, + only_allow_groupon, + is_online_reservation, + created_time, + raw_data + ) + VALUES ( + %(store_id)s, + %(table_id)s, + %(site_id)s, + %(area_id)s, + %(area_name)s, + %(table_name)s, + %(table_price)s, + %(table_status)s, + %(table_status_name)s, + %(light_status)s, + %(is_rest_area)s, + %(show_status)s, + %(virtual_table)s, + %(charge_free)s, + %(only_allow_groupon)s, + %(is_online_reservation)s, + %(created_time)s, + %(raw_data)s + ) + ON CONFLICT (store_id, table_id) DO UPDATE SET + site_id = EXCLUDED.site_id, + area_id = EXCLUDED.area_id, + area_name = EXCLUDED.area_name, + table_name = EXCLUDED.table_name, + table_price = EXCLUDED.table_price, + table_status = EXCLUDED.table_status, + table_status_name = EXCLUDED.table_status_name, + light_status = EXCLUDED.light_status, + is_rest_area = EXCLUDED.is_rest_area, + show_status = EXCLUDED.show_status, + virtual_table = EXCLUDED.virtual_table, + charge_free = EXCLUDED.charge_free, + only_allow_groupon = EXCLUDED.only_allow_groupon, + is_online_reservation = EXCLUDED.is_online_reservation, + created_time = COALESCE(EXCLUDED.created_time, dim_table.created_time), + raw_data = EXCLUDED.raw_data, + updated_at = now() + RETURNING (xmax = 0) AS inserted + """ + + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) diff --git a/etl_billiards/loaders/facts/assistant_abolish.py b/etl_billiards/loaders/facts/assistant_abolish.py new file mode 100644 index 0000000..1324720 --- /dev/null +++ b/etl_billiards/loaders/facts/assistant_abolish.py @@ -0,0 +1,64 @@ +# -*- coding: utf-8 -*- +"""助教作废事实表""" + +from ..base_loader import BaseLoader + + +class AssistantAbolishLoader(BaseLoader): + """写入 fact_assistant_abolish""" + + def upsert_records(self, records: list) -> tuple: + if not records: + return (0, 0, 0) + + sql = """ + INSERT INTO billiards.fact_assistant_abolish ( + store_id, + abolish_id, + table_id, + table_name, + table_area_id, + table_area, + assistant_no, + assistant_name, + charge_minutes, + abolish_amount, + create_time, + trash_reason, + raw_data + ) + VALUES ( + %(store_id)s, + %(abolish_id)s, + %(table_id)s, + %(table_name)s, + %(table_area_id)s, + %(table_area)s, + %(assistant_no)s, + %(assistant_name)s, + %(charge_minutes)s, + %(abolish_amount)s, + %(create_time)s, + %(trash_reason)s, + %(raw_data)s + ) + ON CONFLICT (store_id, abolish_id) DO UPDATE SET + table_id = EXCLUDED.table_id, + table_name = EXCLUDED.table_name, + table_area_id = EXCLUDED.table_area_id, + table_area = EXCLUDED.table_area, + assistant_no = EXCLUDED.assistant_no, + assistant_name = EXCLUDED.assistant_name, + charge_minutes = EXCLUDED.charge_minutes, + abolish_amount = EXCLUDED.abolish_amount, + create_time = EXCLUDED.create_time, + trash_reason = EXCLUDED.trash_reason, + raw_data = EXCLUDED.raw_data, + updated_at = now() + RETURNING (xmax = 0) AS inserted + """ + + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) diff --git a/etl_billiards/loaders/facts/assistant_ledger.py b/etl_billiards/loaders/facts/assistant_ledger.py new file mode 100644 index 0000000..4ebbaff --- /dev/null +++ b/etl_billiards/loaders/facts/assistant_ledger.py @@ -0,0 +1,136 @@ +# -*- coding: utf-8 -*- +"""助教流水事实表""" + +from ..base_loader import BaseLoader + + +class AssistantLedgerLoader(BaseLoader): + """写入 fact_assistant_ledger""" + + def upsert_ledgers(self, records: list) -> tuple: + if not records: + return (0, 0, 0) + + sql = """ + INSERT INTO billiards.fact_assistant_ledger ( + store_id, + ledger_id, + assistant_no, + assistant_name, + nickname, + level_name, + table_name, + ledger_unit_price, + ledger_count, + ledger_amount, + projected_income, + service_money, + member_discount_amount, + manual_discount_amount, + coupon_deduct_money, + order_trade_no, + order_settle_id, + operator_id, + operator_name, + assistant_team_id, + assistant_level, + site_table_id, + order_assistant_id, + site_assistant_id, + user_id, + ledger_start_time, + ledger_end_time, + start_use_time, + last_use_time, + income_seconds, + real_use_seconds, + is_trash, + trash_reason, + is_confirm, + ledger_status, + create_time, + raw_data + ) + VALUES ( + %(store_id)s, + %(ledger_id)s, + %(assistant_no)s, + %(assistant_name)s, + %(nickname)s, + %(level_name)s, + %(table_name)s, + %(ledger_unit_price)s, + %(ledger_count)s, + %(ledger_amount)s, + %(projected_income)s, + %(service_money)s, + %(member_discount_amount)s, + %(manual_discount_amount)s, + %(coupon_deduct_money)s, + %(order_trade_no)s, + %(order_settle_id)s, + %(operator_id)s, + %(operator_name)s, + %(assistant_team_id)s, + %(assistant_level)s, + %(site_table_id)s, + %(order_assistant_id)s, + %(site_assistant_id)s, + %(user_id)s, + %(ledger_start_time)s, + %(ledger_end_time)s, + %(start_use_time)s, + %(last_use_time)s, + %(income_seconds)s, + %(real_use_seconds)s, + %(is_trash)s, + %(trash_reason)s, + %(is_confirm)s, + %(ledger_status)s, + %(create_time)s, + %(raw_data)s + ) + ON CONFLICT (store_id, ledger_id) DO UPDATE SET + assistant_no = EXCLUDED.assistant_no, + assistant_name = EXCLUDED.assistant_name, + nickname = EXCLUDED.nickname, + level_name = EXCLUDED.level_name, + table_name = EXCLUDED.table_name, + ledger_unit_price = EXCLUDED.ledger_unit_price, + ledger_count = EXCLUDED.ledger_count, + ledger_amount = EXCLUDED.ledger_amount, + projected_income = EXCLUDED.projected_income, + service_money = EXCLUDED.service_money, + member_discount_amount = EXCLUDED.member_discount_amount, + manual_discount_amount = EXCLUDED.manual_discount_amount, + coupon_deduct_money = EXCLUDED.coupon_deduct_money, + order_trade_no = EXCLUDED.order_trade_no, + order_settle_id = EXCLUDED.order_settle_id, + operator_id = EXCLUDED.operator_id, + operator_name = EXCLUDED.operator_name, + assistant_team_id = EXCLUDED.assistant_team_id, + assistant_level = EXCLUDED.assistant_level, + site_table_id = EXCLUDED.site_table_id, + order_assistant_id = EXCLUDED.order_assistant_id, + site_assistant_id = EXCLUDED.site_assistant_id, + user_id = EXCLUDED.user_id, + ledger_start_time = EXCLUDED.ledger_start_time, + ledger_end_time = EXCLUDED.ledger_end_time, + start_use_time = EXCLUDED.start_use_time, + last_use_time = EXCLUDED.last_use_time, + income_seconds = EXCLUDED.income_seconds, + real_use_seconds = EXCLUDED.real_use_seconds, + is_trash = EXCLUDED.is_trash, + trash_reason = EXCLUDED.trash_reason, + is_confirm = EXCLUDED.is_confirm, + ledger_status = EXCLUDED.ledger_status, + create_time = EXCLUDED.create_time, + raw_data = EXCLUDED.raw_data, + updated_at = now() + RETURNING (xmax = 0) AS inserted + """ + + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) diff --git a/etl_billiards/loaders/facts/coupon_usage.py b/etl_billiards/loaders/facts/coupon_usage.py new file mode 100644 index 0000000..8f683db --- /dev/null +++ b/etl_billiards/loaders/facts/coupon_usage.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +"""券核销事实表""" + +from ..base_loader import BaseLoader + + +class CouponUsageLoader(BaseLoader): + """写入 fact_coupon_usage""" + + def upsert_coupon_usage(self, records: list) -> tuple: + if not records: + return (0, 0, 0) + + sql = """ + INSERT INTO billiards.fact_coupon_usage ( + store_id, + usage_id, + coupon_code, + coupon_channel, + coupon_name, + sale_price, + coupon_money, + coupon_free_time, + use_status, + create_time, + consume_time, + operator_id, + operator_name, + table_id, + site_order_id, + group_package_id, + coupon_remark, + deal_id, + certificate_id, + verify_id, + is_delete, + raw_data + ) + VALUES ( + %(store_id)s, + %(usage_id)s, + %(coupon_code)s, + %(coupon_channel)s, + %(coupon_name)s, + %(sale_price)s, + %(coupon_money)s, + %(coupon_free_time)s, + %(use_status)s, + %(create_time)s, + %(consume_time)s, + %(operator_id)s, + %(operator_name)s, + %(table_id)s, + %(site_order_id)s, + %(group_package_id)s, + %(coupon_remark)s, + %(deal_id)s, + %(certificate_id)s, + %(verify_id)s, + %(is_delete)s, + %(raw_data)s + ) + ON CONFLICT (store_id, usage_id) DO UPDATE SET + coupon_code = EXCLUDED.coupon_code, + coupon_channel = EXCLUDED.coupon_channel, + coupon_name = EXCLUDED.coupon_name, + sale_price = EXCLUDED.sale_price, + coupon_money = EXCLUDED.coupon_money, + coupon_free_time = EXCLUDED.coupon_free_time, + use_status = EXCLUDED.use_status, + create_time = EXCLUDED.create_time, + consume_time = EXCLUDED.consume_time, + operator_id = EXCLUDED.operator_id, + operator_name = EXCLUDED.operator_name, + table_id = EXCLUDED.table_id, + site_order_id = EXCLUDED.site_order_id, + group_package_id = EXCLUDED.group_package_id, + coupon_remark = EXCLUDED.coupon_remark, + deal_id = EXCLUDED.deal_id, + certificate_id = EXCLUDED.certificate_id, + verify_id = EXCLUDED.verify_id, + is_delete = EXCLUDED.is_delete, + raw_data = EXCLUDED.raw_data, + updated_at = now() + RETURNING (xmax = 0) AS inserted + """ + + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) diff --git a/etl_billiards/loaders/facts/inventory_change.py b/etl_billiards/loaders/facts/inventory_change.py new file mode 100644 index 0000000..e20b655 --- /dev/null +++ b/etl_billiards/loaders/facts/inventory_change.py @@ -0,0 +1,73 @@ +# -*- coding: utf-8 -*- +"""库存变动事实表""" + +from ..base_loader import BaseLoader + + +class InventoryChangeLoader(BaseLoader): + """写入 fact_inventory_change""" + + def upsert_changes(self, records: list) -> tuple: + if not records: + return (0, 0, 0) + + sql = """ + INSERT INTO billiards.fact_inventory_change ( + store_id, + change_id, + site_goods_id, + stock_type, + goods_name, + change_time, + start_qty, + end_qty, + change_qty, + unit, + price, + operator_name, + remark, + goods_category_id, + goods_second_category_id, + raw_data + ) + VALUES ( + %(store_id)s, + %(change_id)s, + %(site_goods_id)s, + %(stock_type)s, + %(goods_name)s, + %(change_time)s, + %(start_qty)s, + %(end_qty)s, + %(change_qty)s, + %(unit)s, + %(price)s, + %(operator_name)s, + %(remark)s, + %(goods_category_id)s, + %(goods_second_category_id)s, + %(raw_data)s + ) + ON CONFLICT (store_id, change_id) DO UPDATE SET + site_goods_id = EXCLUDED.site_goods_id, + stock_type = EXCLUDED.stock_type, + goods_name = EXCLUDED.goods_name, + change_time = EXCLUDED.change_time, + start_qty = EXCLUDED.start_qty, + end_qty = EXCLUDED.end_qty, + change_qty = EXCLUDED.change_qty, + unit = EXCLUDED.unit, + price = EXCLUDED.price, + operator_name = EXCLUDED.operator_name, + remark = EXCLUDED.remark, + goods_category_id = EXCLUDED.goods_category_id, + goods_second_category_id = EXCLUDED.goods_second_category_id, + raw_data = EXCLUDED.raw_data, + updated_at = now() + RETURNING (xmax = 0) AS inserted + """ + + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) diff --git a/etl_billiards/loaders/facts/refund.py b/etl_billiards/loaders/facts/refund.py new file mode 100644 index 0000000..a9abc8a --- /dev/null +++ b/etl_billiards/loaders/facts/refund.py @@ -0,0 +1,88 @@ +# -*- coding: utf-8 -*- +"""退款事实表加载器""" + +from ..base_loader import BaseLoader + + +class RefundLoader(BaseLoader): + """写入 fact_refund""" + + def upsert_refunds(self, records: list) -> tuple: + if not records: + return (0, 0, 0) + + sql = """ + INSERT INTO billiards.fact_refund ( + store_id, + refund_id, + site_id, + tenant_id, + pay_amount, + pay_status, + pay_time, + create_time, + relate_type, + relate_id, + payment_method, + refund_amount, + action_type, + pay_terminal, + operator_id, + channel_pay_no, + channel_fee, + is_delete, + member_id, + member_card_id, + raw_data + ) + VALUES ( + %(store_id)s, + %(refund_id)s, + %(site_id)s, + %(tenant_id)s, + %(pay_amount)s, + %(pay_status)s, + %(pay_time)s, + %(create_time)s, + %(relate_type)s, + %(relate_id)s, + %(payment_method)s, + %(refund_amount)s, + %(action_type)s, + %(pay_terminal)s, + %(operator_id)s, + %(channel_pay_no)s, + %(channel_fee)s, + %(is_delete)s, + %(member_id)s, + %(member_card_id)s, + %(raw_data)s + ) + ON CONFLICT (store_id, refund_id) DO UPDATE SET + site_id = EXCLUDED.site_id, + tenant_id = EXCLUDED.tenant_id, + pay_amount = EXCLUDED.pay_amount, + pay_status = EXCLUDED.pay_status, + pay_time = EXCLUDED.pay_time, + create_time = EXCLUDED.create_time, + relate_type = EXCLUDED.relate_type, + relate_id = EXCLUDED.relate_id, + payment_method = EXCLUDED.payment_method, + refund_amount = EXCLUDED.refund_amount, + action_type = EXCLUDED.action_type, + pay_terminal = EXCLUDED.pay_terminal, + operator_id = EXCLUDED.operator_id, + channel_pay_no = EXCLUDED.channel_pay_no, + channel_fee = EXCLUDED.channel_fee, + is_delete = EXCLUDED.is_delete, + member_id = EXCLUDED.member_id, + member_card_id = EXCLUDED.member_card_id, + raw_data = EXCLUDED.raw_data, + updated_at = now() + RETURNING (xmax = 0) AS inserted + """ + + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) diff --git a/etl_billiards/loaders/facts/table_discount.py b/etl_billiards/loaders/facts/table_discount.py new file mode 100644 index 0000000..0ecdddb --- /dev/null +++ b/etl_billiards/loaders/facts/table_discount.py @@ -0,0 +1,82 @@ +# -*- coding: utf-8 -*- +"""台费打折事实表""" + +from ..base_loader import BaseLoader + + +class TableDiscountLoader(BaseLoader): + """写入 fact_table_discount""" + + def upsert_discounts(self, records: list) -> tuple: + if not records: + return (0, 0, 0) + + sql = """ + INSERT INTO billiards.fact_table_discount ( + store_id, + discount_id, + adjust_type, + applicant_id, + applicant_name, + operator_id, + operator_name, + ledger_amount, + ledger_count, + ledger_name, + ledger_status, + order_settle_id, + order_trade_no, + site_table_id, + table_area_id, + table_area_name, + create_time, + is_delete, + raw_data + ) + VALUES ( + %(store_id)s, + %(discount_id)s, + %(adjust_type)s, + %(applicant_id)s, + %(applicant_name)s, + %(operator_id)s, + %(operator_name)s, + %(ledger_amount)s, + %(ledger_count)s, + %(ledger_name)s, + %(ledger_status)s, + %(order_settle_id)s, + %(order_trade_no)s, + %(site_table_id)s, + %(table_area_id)s, + %(table_area_name)s, + %(create_time)s, + %(is_delete)s, + %(raw_data)s + ) + ON CONFLICT (store_id, discount_id) DO UPDATE SET + adjust_type = EXCLUDED.adjust_type, + applicant_id = EXCLUDED.applicant_id, + applicant_name = EXCLUDED.applicant_name, + operator_id = EXCLUDED.operator_id, + operator_name = EXCLUDED.operator_name, + ledger_amount = EXCLUDED.ledger_amount, + ledger_count = EXCLUDED.ledger_count, + ledger_name = EXCLUDED.ledger_name, + ledger_status = EXCLUDED.ledger_status, + order_settle_id = EXCLUDED.order_settle_id, + order_trade_no = EXCLUDED.order_trade_no, + site_table_id = EXCLUDED.site_table_id, + table_area_id = EXCLUDED.table_area_id, + table_area_name = EXCLUDED.table_area_name, + create_time = EXCLUDED.create_time, + is_delete = EXCLUDED.is_delete, + raw_data = EXCLUDED.raw_data, + updated_at = now() + RETURNING (xmax = 0) AS inserted + """ + + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) diff --git a/etl_billiards/loaders/facts/topup.py b/etl_billiards/loaders/facts/topup.py new file mode 100644 index 0000000..f7e614f --- /dev/null +++ b/etl_billiards/loaders/facts/topup.py @@ -0,0 +1,118 @@ +# -*- coding: utf-8 -*- +"""充值记录事实表""" + +from ..base_loader import BaseLoader + + +class TopupLoader(BaseLoader): + """写入 fact_topup""" + + def upsert_topups(self, records: list) -> tuple: + if not records: + return (0, 0, 0) + + sql = """ + INSERT INTO billiards.fact_topup ( + store_id, + topup_id, + member_id, + member_name, + member_phone, + card_id, + card_type_name, + pay_amount, + consume_money, + settle_status, + settle_type, + settle_name, + settle_relate_id, + pay_time, + create_time, + operator_id, + operator_name, + payment_method, + refund_amount, + cash_amount, + card_amount, + balance_amount, + online_amount, + rounding_amount, + adjust_amount, + goods_money, + table_charge_money, + service_money, + coupon_amount, + order_remark, + raw_data + ) + VALUES ( + %(store_id)s, + %(topup_id)s, + %(member_id)s, + %(member_name)s, + %(member_phone)s, + %(card_id)s, + %(card_type_name)s, + %(pay_amount)s, + %(consume_money)s, + %(settle_status)s, + %(settle_type)s, + %(settle_name)s, + %(settle_relate_id)s, + %(pay_time)s, + %(create_time)s, + %(operator_id)s, + %(operator_name)s, + %(payment_method)s, + %(refund_amount)s, + %(cash_amount)s, + %(card_amount)s, + %(balance_amount)s, + %(online_amount)s, + %(rounding_amount)s, + %(adjust_amount)s, + %(goods_money)s, + %(table_charge_money)s, + %(service_money)s, + %(coupon_amount)s, + %(order_remark)s, + %(raw_data)s + ) + ON CONFLICT (store_id, topup_id) DO UPDATE SET + member_id = EXCLUDED.member_id, + member_name = EXCLUDED.member_name, + member_phone = EXCLUDED.member_phone, + card_id = EXCLUDED.card_id, + card_type_name = EXCLUDED.card_type_name, + pay_amount = EXCLUDED.pay_amount, + consume_money = EXCLUDED.consume_money, + settle_status = EXCLUDED.settle_status, + settle_type = EXCLUDED.settle_type, + settle_name = EXCLUDED.settle_name, + settle_relate_id = EXCLUDED.settle_relate_id, + pay_time = EXCLUDED.pay_time, + create_time = EXCLUDED.create_time, + operator_id = EXCLUDED.operator_id, + operator_name = EXCLUDED.operator_name, + payment_method = EXCLUDED.payment_method, + refund_amount = EXCLUDED.refund_amount, + cash_amount = EXCLUDED.cash_amount, + card_amount = EXCLUDED.card_amount, + balance_amount = EXCLUDED.balance_amount, + online_amount = EXCLUDED.online_amount, + rounding_amount = EXCLUDED.rounding_amount, + adjust_amount = EXCLUDED.adjust_amount, + goods_money = EXCLUDED.goods_money, + table_charge_money = EXCLUDED.table_charge_money, + service_money = EXCLUDED.service_money, + coupon_amount = EXCLUDED.coupon_amount, + order_remark = EXCLUDED.order_remark, + raw_data = EXCLUDED.raw_data, + updated_at = now() + RETURNING (xmax = 0) AS inserted + """ + + inserted, updated = self.db.batch_upsert_with_returning( + sql, records, page_size=self._batch_size() + ) + return (inserted, updated, 0) diff --git a/etl_billiards/orchestration/task_registry.py b/etl_billiards/orchestration/task_registry.py index dd72d87..a6fcbd4 100644 --- a/etl_billiards/orchestration/task_registry.py +++ b/etl_billiards/orchestration/task_registry.py @@ -3,6 +3,17 @@ from tasks.orders_task import OrdersTask from tasks.payments_task import PaymentsTask from tasks.members_task import MembersTask +from tasks.products_task import ProductsTask +from tasks.tables_task import TablesTask +from tasks.assistants_task import AssistantsTask +from tasks.packages_task import PackagesDefTask +from tasks.refunds_task import RefundsTask +from tasks.coupon_usage_task import CouponUsageTask +from tasks.inventory_change_task import InventoryChangeTask +from tasks.topups_task import TopupsTask +from tasks.table_discount_task import TableDiscountTask +from tasks.assistant_abolish_task import AssistantAbolishTask +from tasks.ledger_task import LedgerTask class TaskRegistry: """任务注册和工厂""" @@ -30,7 +41,17 @@ class TaskRegistry: # 默认注册表 default_registry = TaskRegistry() +default_registry.register("PRODUCTS", ProductsTask) +default_registry.register("TABLES", TablesTask) +default_registry.register("MEMBERS", MembersTask) +default_registry.register("ASSISTANTS", AssistantsTask) +default_registry.register("PACKAGES_DEF", PackagesDefTask) default_registry.register("ORDERS", OrdersTask) default_registry.register("PAYMENTS", PaymentsTask) -default_registry.register("MEMBERS", MembersTask) -# 可以继续注册其他任务... +default_registry.register("REFUNDS", RefundsTask) +default_registry.register("COUPON_USAGE", CouponUsageTask) +default_registry.register("INVENTORY_CHANGE", InventoryChangeTask) +default_registry.register("TOPUPS", TopupsTask) +default_registry.register("TABLE_DISCOUNT", TableDiscountTask) +default_registry.register("ASSISTANT_ABOLISH", AssistantAbolishTask) +default_registry.register("LEDGER", LedgerTask) diff --git a/etl_billiards/scripts/run_tests.py b/etl_billiards/scripts/run_tests.py new file mode 100644 index 0000000..1c6301d --- /dev/null +++ b/etl_billiards/scripts/run_tests.py @@ -0,0 +1,196 @@ +# -*- coding: utf-8 -*- +""" +灵活的测试执行脚本,可像搭积木一样组合不同参数或预置命令(模式/数据库/归档路径等), +直接运行本文件即可触发 pytest。 + +示例: + python scripts/run_tests.py --suite online --mode ONLINE --keyword ORDERS + python scripts/run_tests.py --preset offline_realdb + python scripts/run_tests.py --suite online offline --db-dsn ... --json-archive tmp/archives +""" +from __future__ import annotations + +import argparse +import importlib.util +import os +import shlex +import sys +from typing import Dict, List + +import pytest + +PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), "..")) + +SUITE_MAP: Dict[str, str] = { + "online": "tests/unit/test_etl_tasks_online.py", + "offline": "tests/unit/test_etl_tasks_offline.py", + "integration": "tests/integration/test_database.py", +} + +PRESETS: Dict[str, Dict] = {} + + +def _load_presets(): + preset_path = os.path.join(os.path.dirname(__file__), "test_presets.py") + if not os.path.exists(preset_path): + return + spec = importlib.util.spec_from_file_location("test_presets", preset_path) + if not spec or not spec.loader: + return + module = importlib.util.module_from_spec(spec) + spec.loader.exec_module(module) # type: ignore[attr-defined] + presets = getattr(module, "PRESETS", {}) + if isinstance(presets, dict): + PRESETS.update(presets) + + +_load_presets() + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="ETL 测试执行器(支持参数化调配)") + parser.add_argument( + "--suite", + choices=sorted(SUITE_MAP.keys()), + nargs="+", + help="预置测试套件,可多选(默认全部 online/offline)", + ) + parser.add_argument( + "--tests", + nargs="+", + help="自定义测试路径(可与 --suite 混用),例如 tests/unit/test_config.py", + ) + parser.add_argument( + "--mode", + choices=["ONLINE", "OFFLINE"], + help="覆盖 TEST_MODE(默认沿用 .env / 环境变量)", + ) + parser.add_argument("--db-dsn", help="设置 TEST_DB_DSN,连接真实数据库进行测试") + parser.add_argument("--json-archive", help="设置 TEST_JSON_ARCHIVE_DIR(离线档案目录)") + parser.add_argument("--json-temp", help="设置 TEST_JSON_TEMP_DIR(临时 JSON 路径)") + parser.add_argument( + "--keyword", + "-k", + help="pytest -k 关键字过滤(例如 ORDERS,只运行包含该字符串的用例)", + ) + parser.add_argument( + "--pytest-args", + help="附加 pytest 参数,格式与命令行一致(例如 \"-vv --maxfail=1\")", + ) + parser.add_argument( + "--env", + action="append", + metavar="KEY=VALUE", + help="自定义环境变量,可重复传入,例如 --env STORE_ID=123", + ) + parser.add_argument("--preset", choices=sorted(PRESETS.keys()) if PRESETS else None, nargs="+", + help="从 scripts/test_presets.py 中选择一个或多个组合命令") + parser.add_argument("--list-presets", action="store_true", help="列出可用预置命令后退出") + parser.add_argument("--dry-run", action="store_true", help="仅打印将要执行的命令与环境,不真正运行 pytest") + return parser.parse_args() + + +def apply_presets_to_args(args: argparse.Namespace): + if not args.preset: + return + for name in args.preset: + preset = PRESETS.get(name, {}) + if not preset: + continue + for key, value in preset.items(): + if key in ("suite", "tests"): + if not value: + continue + existing = getattr(args, key) + if existing is None: + setattr(args, key, list(value)) + else: + existing.extend(value) + elif key == "env": + args.env = (args.env or []) + list(value) + elif key == "pytest_args": + args.pytest_args = " ".join(filter(None, [value, args.pytest_args or ""])) + elif key == "keyword": + if args.keyword is None: + args.keyword = value + else: + if getattr(args, key, None) is None: + setattr(args, key, value) + + +def apply_env(args: argparse.Namespace) -> Dict[str, str]: + env_updates = {} + if args.mode: + env_updates["TEST_MODE"] = args.mode + if args.db_dsn: + env_updates["TEST_DB_DSN"] = args.db_dsn + if args.json_archive: + env_updates["TEST_JSON_ARCHIVE_DIR"] = args.json_archive + if args.json_temp: + env_updates["TEST_JSON_TEMP_DIR"] = args.json_temp + if args.env: + for item in args.env: + if "=" not in item: + raise SystemExit(f"--env 参数格式错误: {item!r},应为 KEY=VALUE") + key, value = item.split("=", 1) + env_updates[key.strip()] = value.strip() + + for key, value in env_updates.items(): + os.environ[key] = value + return env_updates + + +def build_pytest_args(args: argparse.Namespace) -> List[str]: + targets: List[str] = [] + if args.suite: + for suite in args.suite: + targets.append(SUITE_MAP[suite]) + if args.tests: + targets.extend(args.tests) + if not targets: + # 默认跑 online + offline 套件 + targets = [SUITE_MAP["online"], SUITE_MAP["offline"]] + + pytest_args: List[str] = targets + if args.keyword: + pytest_args += ["-k", args.keyword] + if args.pytest_args: + pytest_args += shlex.split(args.pytest_args) + return pytest_args + + +def main() -> int: + os.chdir(PROJECT_ROOT) + args = parse_args() + if args.list_presets: + print("可用预置命令:") + if not PRESETS: + print("(暂无,可编辑 scripts/test_presets.py 添加)") + else: + for name in sorted(PRESETS): + print(f"- {name}") + return 0 + + apply_presets_to_args(args) + env_updates = apply_env(args) + pytest_args = build_pytest_args(args) + + print("=== 环境变量覆盖 ===") + if env_updates: + for k, v in env_updates.items(): + print(f"{k}={v}") + else: + print("(无覆盖,沿用系统默认)") + print("\n=== Pytest 参数 ===") + print(" ".join(pytest_args)) + print() + + if args.dry_run: + print("Dry-run 模式,未真正执行 pytest") + return 0 + + exit_code = pytest.main(pytest_args) + return int(exit_code) + + +if __name__ == "__main__": + sys.exit(main()) diff --git a/etl_billiards/scripts/test_presets.py b/etl_billiards/scripts/test_presets.py new file mode 100644 index 0000000..952e5bc --- /dev/null +++ b/etl_billiards/scripts/test_presets.py @@ -0,0 +1,166 @@ +# -*- coding: utf-8 -*- +"""测试命令“仓库”,集中维护 run_tests.py 的预置组合。 + +支持的参数键(可在 PRESETS 中自由组合): + +1. suite + - 类型:列表 + - 作用:引用 run_tests 中的预置套件,值可为 online / offline / integration 等。 + - 用法:["online"] 仅跑在线模式;["online","offline"] 同时跑两套;["integration"] 跑数据库集成测试。 + +2. tests + - 类型:列表 + - 作用:传入任意 pytest 目标路径,适合补充临时/自定义测试文件。 + - 用法:["tests/unit/test_config.py","tests/unit/test_parsers.py"]。 + +3. mode + - 类型:字符串 + - 取值:ONLINE 或 OFFLINE。 + - 作用:覆盖 TEST_MODE;ONLINE 走 API 全流程,OFFLINE 读取 JSON 归档执行 T+L。 + +4. db_dsn + - 类型:字符串 + - 作用:设置 TEST_DB_DSN,指定真实 PostgreSQL 连接;缺省时测试引擎使用伪 DB,仅记录写入不触库。 + - 示例:postgresql://user:pwd@localhost:5432/testdb。 + +5. json_archive / json_temp + - 类型:字符串 + - 作用:离线模式的 JSON 归档目录 / 临时输出目录。 + - 说明:不设置时沿用 .env 或默认值;仅在 OFFLINE 模式需要关注。 + +6. keyword + - 类型:字符串 + - 作用:等价 pytest -k,用于筛选测试名/节点。 + - 示例:"ORDERS" 可只运行包含该关键字的测试函数。 + +7. pytest_args + - 类型:字符串 + - 作用:附加 pytest 命令行参数。 + - 示例:"-vv --maxfail=1 --disable-warnings"。 + +8. env + - 类型:列表 + - 作用:追加环境变量,形如 ["STORE_ID=123","API_TOKEN=xxx"],会在 run_tests 内透传给 os.environ。 + +9. preset_meta + - 类型:字符串 + - 作用:纯注释信息,便于描述该预置组合的用途,不会传递给 run_tests。 + +运行方式建议直接 F5(或 `python scripts/test_presets.py`),脚本将读取 AUTO_RUN_PRESETS 中的配置依次执行。 +如需临时指定其它预置,可传入 `--preset xxx`;`--list` 用于查看所有参数说明和预置详情。 +""" + +from __future__ import annotations + +import argparse +import os +import subprocess +import sys +from typing import List + +RUN_TESTS_SCRIPT = os.path.join(os.path.dirname(__file__), "run_tests.py") + +AUTO_RUN_PRESETS = ["online_orders"] + +# PRESETS = { +# "online_orders": { +# "suite": ["online"], +# "mode": "ONLINE", +# "keyword": "ORDERS", +# "pytest_args": "-vv", +# "preset_meta": "在线模式,仅跑订单任务,输出更详细日志", +# }, +# "offline_realdb": { +# "suite": ["offline"], +# "mode": "OFFLINE", +# "db_dsn": "postgresql://user:pwd@localhost:5432/testdb", +# "json_archive": "tests/testdata_json", +# "preset_meta": "离线模式 + 真实测试库,用预置 JSON 回放全量任务", +# }, +# "integration_db": { +# "suite": ["integration"], +# "db_dsn": "postgresql://user:pwd@localhost:5432/testdb", +# "preset_meta": "仅跑数据库连接/操作相关的集成测试", +# }, +# } + +PRESETS = { + "offline_realdb": { + "suite": ["offline"], + "mode": "OFFLINE", + "db_dsn": "postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test", + "json_archive": "tests/testdata_json", + "preset_meta": "离线模式 + 真实测试库,用预置 JSON 回放全量任务", + }, +} + + + +def print_parameter_help(): + print("可用参数键说明:") + print(" suite -> 预置测试套件列表,如 ['online','offline']") + print(" tests -> 自定义测试文件路径列表") + print(" mode -> TEST_MODE,ONLINE / OFFLINE") + print(" db_dsn -> TEST_DB_DSN,连接真实 PostgreSQL") + print(" json_archive -> TEST_JSON_ARCHIVE_DIR,离线 JSON 目录") + print(" json_temp -> TEST_JSON_TEMP_DIR,离线临时目录") + print(" keyword -> pytest -k 过滤关键字") + print(" pytest_args -> 额外 pytest 参数(单个字符串)") + print(" env -> 附加环境变量,形如 ['KEY=VALUE']") + print(" preset_meta -> 注释说明,不会传给 run_tests") + print() + + +def print_presets(): + if not PRESETS: + print("当前没有定义任何预置命令,可自行在 PRESETS 中添加。") + return + for idx, (name, payload) in enumerate(PRESETS.items(), start=1): + comment = payload.get("preset_meta", "") + print(f"{idx}. {name}") + if comment: + print(f" 说明: {comment}") + for key, value in payload.items(): + if key == "preset_meta": + continue + print(f" {key}: {value}") + print() + + +def run_presets(preset_names: List[str], dry_run: bool): + cmds = [] + for name in preset_names: + cmd = [sys.executable, RUN_TESTS_SCRIPT, "--preset", name] + cmds.append(cmd) + + for cmd in cmds: + printable = " ".join(cmd) + if dry_run: + print(f"[Dry-Run] {printable}") + else: + print(f"\n>>> 执行: {printable}") + subprocess.run(cmd, check=False) + + +def main(): + parser = argparse.ArgumentParser(description="测试预置仓库(在此集中配置并运行测试组合)") + parser.add_argument("--preset", choices=sorted(PRESETS.keys()), nargs="+", help="直接指定要运行的预置命令") + parser.add_argument("--list", action="store_true", help="仅列出参数键和所有预置命令") + parser.add_argument("--dry-run", action="store_true", help="仅打印将要执行的命令,而不真正运行") + args = parser.parse_args() + + if args.list: + print_parameter_help() + print_presets() + return + + if args.preset: + target = args.preset + else: + target = AUTO_RUN_PRESETS or list(PRESETS.keys()) + + run_presets(target, dry_run=args.dry_run) + + +if __name__ == "__main__": + main() diff --git a/etl_billiards/tasks/assistant_abolish_task.py b/etl_billiards/tasks/assistant_abolish_task.py new file mode 100644 index 0000000..b364e32 --- /dev/null +++ b/etl_billiards/tasks/assistant_abolish_task.py @@ -0,0 +1,83 @@ +# -*- coding: utf-8 -*- +"""助教作废任务""" + +import json + +from .base_task import BaseTask +from loaders.facts.assistant_abolish import AssistantAbolishLoader +from models.parsers import TypeParser + + +class AssistantAbolishTask(BaseTask): + """同步助教作废记录""" + + def get_task_code(self) -> str: + return "ASSISTANT_ABOLISH" + + def execute(self) -> dict: + self.logger.info("开始执行 ASSISTANT_ABOLISH 任务") + window_start, window_end, _ = self._get_time_window() + params = { + "storeId": self.config.get("app.store_id"), + "startTime": TypeParser.format_timestamp(window_start, self.tz), + "endTime": TypeParser.format_timestamp(window_end, self.tz), + } + + try: + records, _ = self.api.get_paginated( + endpoint="/Assistant/AbolishList", + params=params, + page_size=self.config.get("api.page_size", 200), + data_path=("data", "abolitionAssistants"), + ) + + parsed = [] + for raw in records: + mapped = self._parse_record(raw) + if mapped: + parsed.append(mapped) + + loader = AssistantAbolishLoader(self.db) + inserted, updated, skipped = loader.upsert_records(parsed) + + self.db.commit() + counts = { + "fetched": len(records), + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "errors": 0, + } + self.logger.info(f"ASSISTANT_ABOLISH 完成: {counts}") + return self._build_result("SUCCESS", counts) + except Exception: + self.db.rollback() + self.logger.error("ASSISTANT_ABOLISH 失败", exc_info=True) + raise + + def _parse_record(self, raw: dict) -> dict | None: + abolish_id = TypeParser.parse_int(raw.get("id")) + if not abolish_id: + self.logger.warning("跳过缺少 id 的助教作废记录: %s", raw) + return None + + store_id = self.config.get("app.store_id") + return { + "store_id": store_id, + "abolish_id": abolish_id, + "table_id": TypeParser.parse_int(raw.get("tableId")), + "table_name": raw.get("tableName"), + "table_area_id": TypeParser.parse_int(raw.get("tableAreaId")), + "table_area": raw.get("tableArea"), + "assistant_no": raw.get("assistantOn"), + "assistant_name": raw.get("assistantName"), + "charge_minutes": TypeParser.parse_int(raw.get("pdChargeMinutes")), + "abolish_amount": TypeParser.parse_decimal( + raw.get("assistantAbolishAmount") + ), + "create_time": TypeParser.parse_timestamp( + raw.get("createTime") or raw.get("create_time"), self.tz + ), + "trash_reason": raw.get("trashReason"), + "raw_data": json.dumps(raw, ensure_ascii=False), + } diff --git a/etl_billiards/tasks/assistants_task.py b/etl_billiards/tasks/assistants_task.py new file mode 100644 index 0000000..a5794ea --- /dev/null +++ b/etl_billiards/tasks/assistants_task.py @@ -0,0 +1,103 @@ +# -*- coding: utf-8 -*- +"""助教账号任务""" + +import json + +from .base_task import BaseTask +from loaders.dimensions.assistant import AssistantLoader +from models.parsers import TypeParser + + +class AssistantsTask(BaseTask): + """同步助教账号资料""" + + def get_task_code(self) -> str: + return "ASSISTANTS" + + def execute(self) -> dict: + self.logger.info("开始执行 ASSISTANTS 任务") + params = {"storeId": self.config.get("app.store_id")} + + try: + records, _ = self.api.get_paginated( + endpoint="/Assistant/List", + params=params, + page_size=self.config.get("api.page_size", 200), + data_path=("data", "assistantInfos"), + ) + + parsed = [] + for raw in records: + mapped = self._parse_assistant(raw) + if mapped: + parsed.append(mapped) + + loader = AssistantLoader(self.db) + inserted, updated, skipped = loader.upsert_assistants(parsed) + + self.db.commit() + counts = { + "fetched": len(records), + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "errors": 0, + } + self.logger.info(f"ASSISTANTS 完成: {counts}") + return self._build_result("SUCCESS", counts) + except Exception: + self.db.rollback() + self.logger.error("ASSISTANTS 失败", exc_info=True) + raise + + def _parse_assistant(self, raw: dict) -> dict | None: + assistant_id = TypeParser.parse_int(raw.get("id")) + if not assistant_id: + self.logger.warning("跳过缺少 id 的助教数据: %s", raw) + return None + + store_id = self.config.get("app.store_id") + return { + "store_id": store_id, + "assistant_id": assistant_id, + "assistant_no": raw.get("assistant_no") or raw.get("assistantNo"), + "nickname": raw.get("nickname"), + "real_name": raw.get("real_name") or raw.get("realName"), + "gender": raw.get("gender"), + "mobile": raw.get("mobile"), + "level": raw.get("level"), + "team_id": TypeParser.parse_int(raw.get("team_id") or raw.get("teamId")), + "team_name": raw.get("team_name"), + "assistant_status": raw.get("assistant_status"), + "work_status": raw.get("work_status"), + "entry_time": TypeParser.parse_timestamp( + raw.get("entry_time") or raw.get("entryTime"), self.tz + ), + "resign_time": TypeParser.parse_timestamp( + raw.get("resign_time") or raw.get("resignTime"), self.tz + ), + "start_time": TypeParser.parse_timestamp( + raw.get("start_time") or raw.get("startTime"), self.tz + ), + "end_time": TypeParser.parse_timestamp( + raw.get("end_time") or raw.get("endTime"), self.tz + ), + "create_time": TypeParser.parse_timestamp( + raw.get("create_time") or raw.get("createTime"), self.tz + ), + "update_time": TypeParser.parse_timestamp( + raw.get("update_time") or raw.get("updateTime"), self.tz + ), + "system_role_id": raw.get("system_role_id"), + "online_status": raw.get("online_status"), + "allow_cx": raw.get("allow_cx"), + "charge_way": raw.get("charge_way"), + "pd_unit_price": TypeParser.parse_decimal(raw.get("pd_unit_price")), + "cx_unit_price": TypeParser.parse_decimal(raw.get("cx_unit_price")), + "is_guaranteed": raw.get("is_guaranteed"), + "is_team_leader": raw.get("is_team_leader"), + "serial_number": raw.get("serial_number"), + "show_sort": raw.get("show_sort"), + "is_delete": raw.get("is_delete"), + "raw_data": json.dumps(raw, ensure_ascii=False), + } diff --git a/etl_billiards/tasks/coupon_usage_task.py b/etl_billiards/tasks/coupon_usage_task.py new file mode 100644 index 0000000..91453fe --- /dev/null +++ b/etl_billiards/tasks/coupon_usage_task.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +"""平台券核销任务""" + +import json + +from .base_task import BaseTask +from loaders.facts.coupon_usage import CouponUsageLoader +from models.parsers import TypeParser + + +class CouponUsageTask(BaseTask): + """同步平台券验证/核销记录""" + + def get_task_code(self) -> str: + return "COUPON_USAGE" + + def execute(self) -> dict: + self.logger.info("开始执行 COUPON_USAGE 任务") + window_start, window_end, _ = self._get_time_window() + params = { + "storeId": self.config.get("app.store_id"), + "startTime": TypeParser.format_timestamp(window_start, self.tz), + "endTime": TypeParser.format_timestamp(window_end, self.tz), + } + + try: + records, _ = self.api.get_paginated( + endpoint="/Coupon/UsageList", + params=params, + page_size=self.config.get("api.page_size", 200), + data_path=(), + ) + + parsed = [] + for raw in records: + mapped = self._parse_usage(raw) + if mapped: + parsed.append(mapped) + + loader = CouponUsageLoader(self.db) + inserted, updated, skipped = loader.upsert_coupon_usage(parsed) + + self.db.commit() + counts = { + "fetched": len(records), + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "errors": 0, + } + self.logger.info(f"COUPON_USAGE 完成: {counts}") + return self._build_result("SUCCESS", counts) + except Exception: + self.db.rollback() + self.logger.error("COUPON_USAGE 失败", exc_info=True) + raise + + def _parse_usage(self, raw: dict) -> dict | None: + usage_id = TypeParser.parse_int(raw.get("id")) + if not usage_id: + self.logger.warning("跳过缺少 id 的券核销记录: %s", raw) + return None + + store_id = self.config.get("app.store_id") + return { + "store_id": store_id, + "usage_id": usage_id, + "coupon_code": raw.get("coupon_code"), + "coupon_channel": raw.get("coupon_channel"), + "coupon_name": raw.get("coupon_name"), + "sale_price": TypeParser.parse_decimal(raw.get("sale_price")), + "coupon_money": TypeParser.parse_decimal(raw.get("coupon_money")), + "coupon_free_time": TypeParser.parse_int(raw.get("coupon_free_time")), + "use_status": raw.get("use_status"), + "create_time": TypeParser.parse_timestamp( + raw.get("create_time") or raw.get("createTime"), self.tz + ), + "consume_time": TypeParser.parse_timestamp( + raw.get("consume_time") or raw.get("consumeTime"), self.tz + ), + "operator_id": TypeParser.parse_int(raw.get("operator_id")), + "operator_name": raw.get("operator_name"), + "table_id": TypeParser.parse_int(raw.get("table_id")), + "site_order_id": TypeParser.parse_int(raw.get("site_order_id")), + "group_package_id": TypeParser.parse_int(raw.get("group_package_id")), + "coupon_remark": raw.get("coupon_remark"), + "deal_id": raw.get("deal_id"), + "certificate_id": raw.get("certificate_id"), + "verify_id": raw.get("verify_id"), + "is_delete": raw.get("is_delete"), + "raw_data": json.dumps(raw, ensure_ascii=False), + } diff --git a/etl_billiards/tasks/inventory_change_task.py b/etl_billiards/tasks/inventory_change_task.py new file mode 100644 index 0000000..99e10eb --- /dev/null +++ b/etl_billiards/tasks/inventory_change_task.py @@ -0,0 +1,90 @@ +# -*- coding: utf-8 -*- +"""库存变更任务""" + +import json + +from .base_task import BaseTask +from loaders.facts.inventory_change import InventoryChangeLoader +from models.parsers import TypeParser + + +class InventoryChangeTask(BaseTask): + """同步库存变化记录""" + + def get_task_code(self) -> str: + return "INVENTORY_CHANGE" + + def execute(self) -> dict: + self.logger.info("开始执行 INVENTORY_CHANGE 任务") + window_start, window_end, _ = self._get_time_window() + params = { + "storeId": self.config.get("app.store_id"), + "startTime": TypeParser.format_timestamp(window_start, self.tz), + "endTime": TypeParser.format_timestamp(window_end, self.tz), + } + + try: + records, _ = self.api.get_paginated( + endpoint="/Inventory/ChangeList", + params=params, + page_size=self.config.get("api.page_size", 200), + data_path=("data", "queryDeliveryRecordsList"), + ) + + parsed = [] + for raw in records: + mapped = self._parse_change(raw) + if mapped: + parsed.append(mapped) + + loader = InventoryChangeLoader(self.db) + inserted, updated, skipped = loader.upsert_changes(parsed) + + self.db.commit() + counts = { + "fetched": len(records), + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "errors": 0, + } + self.logger.info(f"INVENTORY_CHANGE 完成: {counts}") + return self._build_result("SUCCESS", counts) + except Exception: + self.db.rollback() + self.logger.error("INVENTORY_CHANGE 失败", exc_info=True) + raise + + def _parse_change(self, raw: dict) -> dict | None: + change_id = TypeParser.parse_int( + raw.get("siteGoodsStockId") or raw.get("site_goods_stock_id") + ) + if not change_id: + self.logger.warning("跳过缺少变动 id 的库存记录: %s", raw) + return None + + store_id = self.config.get("app.store_id") + return { + "store_id": store_id, + "change_id": change_id, + "site_goods_id": TypeParser.parse_int( + raw.get("siteGoodsId") or raw.get("site_goods_id") + ), + "stock_type": raw.get("stockType") or raw.get("stock_type"), + "goods_name": raw.get("goodsName"), + "change_time": TypeParser.parse_timestamp( + raw.get("createTime") or raw.get("create_time"), self.tz + ), + "start_qty": TypeParser.parse_int(raw.get("startNum")), + "end_qty": TypeParser.parse_int(raw.get("endNum")), + "change_qty": TypeParser.parse_int(raw.get("changeNum")), + "unit": raw.get("unit"), + "price": TypeParser.parse_decimal(raw.get("price")), + "operator_name": raw.get("operatorName"), + "remark": raw.get("remark"), + "goods_category_id": TypeParser.parse_int(raw.get("goodsCategoryId")), + "goods_second_category_id": TypeParser.parse_int( + raw.get("goodsSecondCategoryId") + ), + "raw_data": json.dumps(raw, ensure_ascii=False), + } diff --git a/etl_billiards/tasks/ledger_task.py b/etl_billiards/tasks/ledger_task.py new file mode 100644 index 0000000..bd8f9fa --- /dev/null +++ b/etl_billiards/tasks/ledger_task.py @@ -0,0 +1,119 @@ +# -*- coding: utf-8 -*- +"""助教流水任务""" + +import json + +from .base_task import BaseTask +from loaders.facts.assistant_ledger import AssistantLedgerLoader +from models.parsers import TypeParser + + +class LedgerTask(BaseTask): + """同步助教服务台账""" + + def get_task_code(self) -> str: + return "LEDGER" + + def execute(self) -> dict: + self.logger.info("开始执行 LEDGER 任务") + window_start, window_end, _ = self._get_time_window() + params = { + "storeId": self.config.get("app.store_id"), + "startTime": TypeParser.format_timestamp(window_start, self.tz), + "endTime": TypeParser.format_timestamp(window_end, self.tz), + } + + try: + records, _ = self.api.get_paginated( + endpoint="/Assistant/LedgerList", + params=params, + page_size=self.config.get("api.page_size", 200), + data_path=("data", "orderAssistantDetails"), + ) + + parsed = [] + for raw in records: + mapped = self._parse_ledger(raw) + if mapped: + parsed.append(mapped) + + loader = AssistantLedgerLoader(self.db) + inserted, updated, skipped = loader.upsert_ledgers(parsed) + + self.db.commit() + counts = { + "fetched": len(records), + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "errors": 0, + } + self.logger.info(f"LEDGER 完成: {counts}") + return self._build_result("SUCCESS", counts) + except Exception: + self.db.rollback() + self.logger.error("LEDGER 失败", exc_info=True) + raise + + def _parse_ledger(self, raw: dict) -> dict | None: + ledger_id = TypeParser.parse_int(raw.get("id")) + if not ledger_id: + self.logger.warning("跳过缺少 id 的助教流水: %s", raw) + return None + + store_id = self.config.get("app.store_id") + return { + "store_id": store_id, + "ledger_id": ledger_id, + "assistant_no": raw.get("assistantNo"), + "assistant_name": raw.get("assistantName"), + "nickname": raw.get("nickname"), + "level_name": raw.get("levelName"), + "table_name": raw.get("tableName"), + "ledger_unit_price": TypeParser.parse_decimal(raw.get("ledger_unit_price")), + "ledger_count": TypeParser.parse_int(raw.get("ledger_count")), + "ledger_amount": TypeParser.parse_decimal(raw.get("ledger_amount")), + "projected_income": TypeParser.parse_decimal(raw.get("projected_income")), + "service_money": TypeParser.parse_decimal(raw.get("service_money")), + "member_discount_amount": TypeParser.parse_decimal( + raw.get("member_discount_amount") + ), + "manual_discount_amount": TypeParser.parse_decimal( + raw.get("manual_discount_amount") + ), + "coupon_deduct_money": TypeParser.parse_decimal( + raw.get("coupon_deduct_money") + ), + "order_trade_no": TypeParser.parse_int(raw.get("order_trade_no")), + "order_settle_id": TypeParser.parse_int(raw.get("order_settle_id")), + "operator_id": TypeParser.parse_int(raw.get("operator_id")), + "operator_name": raw.get("operator_name"), + "assistant_team_id": TypeParser.parse_int(raw.get("assistant_team_id")), + "assistant_level": raw.get("assistant_level"), + "site_table_id": TypeParser.parse_int(raw.get("site_table_id")), + "order_assistant_id": TypeParser.parse_int(raw.get("order_assistant_id")), + "site_assistant_id": TypeParser.parse_int(raw.get("site_assistant_id")), + "user_id": TypeParser.parse_int(raw.get("user_id")), + "ledger_start_time": TypeParser.parse_timestamp( + raw.get("ledger_start_time"), self.tz + ), + "ledger_end_time": TypeParser.parse_timestamp( + raw.get("ledger_end_time"), self.tz + ), + "start_use_time": TypeParser.parse_timestamp( + raw.get("start_use_time"), self.tz + ), + "last_use_time": TypeParser.parse_timestamp( + raw.get("last_use_time"), self.tz + ), + "income_seconds": TypeParser.parse_int(raw.get("income_seconds")), + "real_use_seconds": TypeParser.parse_int(raw.get("real_use_seconds")), + "is_trash": raw.get("is_trash"), + "trash_reason": raw.get("trash_reason"), + "is_confirm": raw.get("is_confirm"), + "ledger_status": raw.get("ledger_status"), + "create_time": TypeParser.parse_timestamp( + raw.get("create_time") or raw.get("createTime"), self.tz + ), + "raw_data": json.dumps(raw, ensure_ascii=False), + } diff --git a/etl_billiards/tasks/packages_task.py b/etl_billiards/tasks/packages_task.py new file mode 100644 index 0000000..8a98a4e --- /dev/null +++ b/etl_billiards/tasks/packages_task.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +"""团购/套餐定义任务""" + +import json + +from .base_task import BaseTask +from loaders.dimensions.package import PackageDefinitionLoader +from models.parsers import TypeParser + + +class PackagesDefTask(BaseTask): + """同步团购套餐定义""" + + def get_task_code(self) -> str: + return "PACKAGES_DEF" + + def execute(self) -> dict: + self.logger.info("开始执行 PACKAGES_DEF 任务") + params = {"storeId": self.config.get("app.store_id")} + + try: + records, _ = self.api.get_paginated( + endpoint="/Package/List", + params=params, + page_size=self.config.get("api.page_size", 200), + data_path=("data", "packageCouponList"), + ) + + parsed = [] + for raw in records: + mapped = self._parse_package(raw) + if mapped: + parsed.append(mapped) + + loader = PackageDefinitionLoader(self.db) + inserted, updated, skipped = loader.upsert_packages(parsed) + + self.db.commit() + counts = { + "fetched": len(records), + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "errors": 0, + } + self.logger.info(f"PACKAGES_DEF 完成: {counts}") + return self._build_result("SUCCESS", counts) + except Exception: + self.db.rollback() + self.logger.error("PACKAGES_DEF 失败", exc_info=True) + raise + + def _parse_package(self, raw: dict) -> dict | None: + package_id = TypeParser.parse_int(raw.get("id")) + if not package_id: + self.logger.warning("跳过缺少 id 的套餐数据: %s", raw) + return None + + store_id = self.config.get("app.store_id") + return { + "store_id": store_id, + "package_id": package_id, + "package_code": raw.get("package_id") or raw.get("packageId"), + "package_name": raw.get("package_name"), + "table_area_id": raw.get("table_area_id"), + "table_area_name": raw.get("table_area_name"), + "selling_price": TypeParser.parse_decimal( + raw.get("selling_price") or raw.get("sellingPrice") + ), + "duration_seconds": TypeParser.parse_int(raw.get("duration")), + "start_time": TypeParser.parse_timestamp( + raw.get("start_time") or raw.get("startTime"), self.tz + ), + "end_time": TypeParser.parse_timestamp( + raw.get("end_time") or raw.get("endTime"), self.tz + ), + "type": raw.get("type"), + "is_enabled": raw.get("is_enabled"), + "is_delete": raw.get("is_delete"), + "usable_count": TypeParser.parse_int(raw.get("usable_count")), + "creator_name": raw.get("creator_name"), + "date_type": raw.get("date_type"), + "group_type": raw.get("group_type"), + "coupon_money": TypeParser.parse_decimal( + raw.get("coupon_money") or raw.get("couponMoney") + ), + "area_tag_type": raw.get("area_tag_type"), + "system_group_type": raw.get("system_group_type"), + "card_type_ids": raw.get("card_type_ids"), + "raw_data": json.dumps(raw, ensure_ascii=False), + } diff --git a/etl_billiards/tasks/refunds_task.py b/etl_billiards/tasks/refunds_task.py new file mode 100644 index 0000000..06cd8c7 --- /dev/null +++ b/etl_billiards/tasks/refunds_task.py @@ -0,0 +1,91 @@ +# -*- coding: utf-8 -*- +"""退款记录任务""" + +import json + +from .base_task import BaseTask +from loaders.facts.refund import RefundLoader +from models.parsers import TypeParser + + +class RefundsTask(BaseTask): + """同步支付退款流水""" + + def get_task_code(self) -> str: + return "REFUNDS" + + def execute(self) -> dict: + self.logger.info("开始执行 REFUNDS 任务") + window_start, window_end, _ = self._get_time_window() + params = { + "storeId": self.config.get("app.store_id"), + "startTime": TypeParser.format_timestamp(window_start, self.tz), + "endTime": TypeParser.format_timestamp(window_end, self.tz), + } + + try: + records, _ = self.api.get_paginated( + endpoint="/Pay/RefundList", + params=params, + page_size=self.config.get("api.page_size", 200), + data_path=(), + ) + + parsed = [] + for raw in records: + mapped = self._parse_refund(raw) + if mapped: + parsed.append(mapped) + + loader = RefundLoader(self.db) + inserted, updated, skipped = loader.upsert_refunds(parsed) + + self.db.commit() + counts = { + "fetched": len(records), + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "errors": 0, + } + self.logger.info(f"REFUNDS 完成: {counts}") + return self._build_result("SUCCESS", counts) + except Exception: + self.db.rollback() + self.logger.error("REFUNDS 失败", exc_info=True) + raise + + def _parse_refund(self, raw: dict) -> dict | None: + refund_id = TypeParser.parse_int(raw.get("id")) + if not refund_id: + self.logger.warning("跳过缺少 id 的退款记录: %s", raw) + return None + + store_id = self.config.get("app.store_id") + return { + "store_id": store_id, + "refund_id": refund_id, + "site_id": TypeParser.parse_int(raw.get("site_id") or raw.get("siteId")), + "tenant_id": TypeParser.parse_int(raw.get("tenant_id") or raw.get("tenantId")), + "pay_amount": TypeParser.parse_decimal(raw.get("pay_amount")), + "pay_status": raw.get("pay_status"), + "pay_time": TypeParser.parse_timestamp( + raw.get("pay_time") or raw.get("payTime"), self.tz + ), + "create_time": TypeParser.parse_timestamp( + raw.get("create_time") or raw.get("createTime"), self.tz + ), + "relate_type": raw.get("relate_type"), + "relate_id": TypeParser.parse_int(raw.get("relate_id")), + "payment_method": raw.get("payment_method"), + "refund_amount": TypeParser.parse_decimal(raw.get("refund_amount")), + "action_type": raw.get("action_type"), + "pay_terminal": raw.get("pay_terminal"), + "operator_id": TypeParser.parse_int(raw.get("operator_id")), + "channel_pay_no": raw.get("channel_pay_no"), + "channel_fee": TypeParser.parse_decimal(raw.get("channel_fee")), + "is_delete": raw.get("is_delete"), + "member_id": TypeParser.parse_int(raw.get("member_id")), + "member_card_id": TypeParser.parse_int(raw.get("member_card_id")), + "raw_data": json.dumps(raw, ensure_ascii=False), + } diff --git a/etl_billiards/tasks/table_discount_task.py b/etl_billiards/tasks/table_discount_task.py new file mode 100644 index 0000000..4d17399 --- /dev/null +++ b/etl_billiards/tasks/table_discount_task.py @@ -0,0 +1,92 @@ +# -*- coding: utf-8 -*- +"""台费折扣任务""" + +import json + +from .base_task import BaseTask +from loaders.facts.table_discount import TableDiscountLoader +from models.parsers import TypeParser + + +class TableDiscountTask(BaseTask): + """同步台费折扣/调价记录""" + + def get_task_code(self) -> str: + return "TABLE_DISCOUNT" + + def execute(self) -> dict: + self.logger.info("开始执行 TABLE_DISCOUNT 任务") + window_start, window_end, _ = self._get_time_window() + params = { + "storeId": self.config.get("app.store_id"), + "startTime": TypeParser.format_timestamp(window_start, self.tz), + "endTime": TypeParser.format_timestamp(window_end, self.tz), + } + + try: + records, _ = self.api.get_paginated( + endpoint="/Table/AdjustList", + params=params, + page_size=self.config.get("api.page_size", 200), + data_path=("data", "taiFeeAdjustInfos"), + ) + + parsed = [] + for raw in records: + mapped = self._parse_discount(raw) + if mapped: + parsed.append(mapped) + + loader = TableDiscountLoader(self.db) + inserted, updated, skipped = loader.upsert_discounts(parsed) + + self.db.commit() + counts = { + "fetched": len(records), + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "errors": 0, + } + self.logger.info(f"TABLE_DISCOUNT 完成: {counts}") + return self._build_result("SUCCESS", counts) + except Exception: + self.db.rollback() + self.logger.error("TABLE_DISCOUNT 失败", exc_info=True) + raise + + def _parse_discount(self, raw: dict) -> dict | None: + discount_id = TypeParser.parse_int(raw.get("id")) + if not discount_id: + self.logger.warning("跳过缺少 id 的台费折扣记录: %s", raw) + return None + + table_profile = raw.get("tableProfile") or {} + store_id = self.config.get("app.store_id") + return { + "store_id": store_id, + "discount_id": discount_id, + "adjust_type": raw.get("adjust_type") or raw.get("adjustType"), + "applicant_id": TypeParser.parse_int(raw.get("applicant_id")), + "applicant_name": raw.get("applicant_name"), + "operator_id": TypeParser.parse_int(raw.get("operator_id")), + "operator_name": raw.get("operator_name"), + "ledger_amount": TypeParser.parse_decimal(raw.get("ledger_amount")), + "ledger_count": TypeParser.parse_int(raw.get("ledger_count")), + "ledger_name": raw.get("ledger_name"), + "ledger_status": raw.get("ledger_status"), + "order_settle_id": TypeParser.parse_int(raw.get("order_settle_id")), + "order_trade_no": TypeParser.parse_int(raw.get("order_trade_no")), + "site_table_id": TypeParser.parse_int( + raw.get("site_table_id") or table_profile.get("id") + ), + "table_area_id": TypeParser.parse_int( + raw.get("tableAreaId") or table_profile.get("site_table_area_id") + ), + "table_area_name": table_profile.get("site_table_area_name"), + "create_time": TypeParser.parse_timestamp( + raw.get("create_time") or raw.get("createTime"), self.tz + ), + "is_delete": raw.get("is_delete"), + "raw_data": json.dumps(raw, ensure_ascii=False), + } diff --git a/etl_billiards/tasks/tables_task.py b/etl_billiards/tasks/tables_task.py index 93e925a..acca631 100644 --- a/etl_billiards/tasks/tables_task.py +++ b/etl_billiards/tasks/tables_task.py @@ -1,4 +1,85 @@ +# -*- coding: utf-8 -*- +"""台桌档案任务""" + +import json + +from .base_task import BaseTask +from loaders.dimensions.table import TableLoader +from models.parsers import TypeParser + + class TablesTask(BaseTask): - def get_task_code(self) -> str: # 返回 "TABLES" - def execute(self) -> dict: # 拉取 /Table/GetSiteTables - def _parse_table(self, raw: dict) -> dict | None: \ No newline at end of file + """同步门店台桌列表""" + + def get_task_code(self) -> str: + return "TABLES" + + def execute(self) -> dict: + self.logger.info("开始执行 TABLES 任务") + params = {"storeId": self.config.get("app.store_id")} + + try: + records, _ = self.api.get_paginated( + endpoint="/Table/GetSiteTables", + params=params, + page_size=self.config.get("api.page_size", 200), + data_path=("data", "siteTables"), + ) + + parsed = [] + for raw in records: + mapped = self._parse_table(raw) + if mapped: + parsed.append(mapped) + + loader = TableLoader(self.db) + inserted, updated, skipped = loader.upsert_tables(parsed) + + self.db.commit() + counts = { + "fetched": len(records), + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "errors": 0, + } + self.logger.info(f"TABLES 完成: {counts}") + return self._build_result("SUCCESS", counts) + except Exception: + self.db.rollback() + self.logger.error("TABLES 失败", exc_info=True) + raise + + def _parse_table(self, raw: dict) -> dict | None: + table_id = TypeParser.parse_int(raw.get("id")) + if not table_id: + self.logger.warning("跳过缺少 table_id 的台桌记录: %s", raw) + return None + + store_id = self.config.get("app.store_id") + return { + "store_id": store_id, + "table_id": table_id, + "site_id": TypeParser.parse_int(raw.get("site_id") or raw.get("siteId")), + "area_id": TypeParser.parse_int( + raw.get("site_table_area_id") or raw.get("siteTableAreaId") + ), + "area_name": raw.get("areaName") or raw.get("site_table_area_name"), + "table_name": raw.get("table_name") or raw.get("tableName"), + "table_price": TypeParser.parse_decimal( + raw.get("table_price") or raw.get("tablePrice") + ), + "table_status": raw.get("table_status") or raw.get("tableStatus"), + "table_status_name": raw.get("tableStatusName"), + "light_status": raw.get("light_status"), + "is_rest_area": raw.get("is_rest_area"), + "show_status": raw.get("show_status"), + "virtual_table": raw.get("virtual_table"), + "charge_free": raw.get("charge_free"), + "only_allow_groupon": raw.get("only_allow_groupon"), + "is_online_reservation": raw.get("is_online_reservation"), + "created_time": TypeParser.parse_timestamp( + raw.get("create_time") or raw.get("createTime"), self.tz + ), + "raw_data": json.dumps(raw, ensure_ascii=False), + } diff --git a/etl_billiards/tasks/topups_task.py b/etl_billiards/tasks/topups_task.py new file mode 100644 index 0000000..26ea0ad --- /dev/null +++ b/etl_billiards/tasks/topups_task.py @@ -0,0 +1,102 @@ +# -*- coding: utf-8 -*- +"""充值记录任务""" + +import json + +from .base_task import BaseTask +from loaders.facts.topup import TopupLoader +from models.parsers import TypeParser + + +class TopupsTask(BaseTask): + """同步储值充值结算记录""" + + def get_task_code(self) -> str: + return "TOPUPS" + + def execute(self) -> dict: + self.logger.info("开始执行 TOPUPS 任务") + window_start, window_end, _ = self._get_time_window() + params = { + "storeId": self.config.get("app.store_id"), + "startTime": TypeParser.format_timestamp(window_start, self.tz), + "endTime": TypeParser.format_timestamp(window_end, self.tz), + } + + try: + records, _ = self.api.get_paginated( + endpoint="/Topup/SettleList", + params=params, + page_size=self.config.get("api.page_size", 200), + data_path=("data", "settleList"), + ) + + parsed = [] + for raw in records: + mapped = self._parse_topup(raw) + if mapped: + parsed.append(mapped) + + loader = TopupLoader(self.db) + inserted, updated, skipped = loader.upsert_topups(parsed) + + self.db.commit() + counts = { + "fetched": len(records), + "inserted": inserted, + "updated": updated, + "skipped": skipped, + "errors": 0, + } + self.logger.info(f"TOPUPS 完成: {counts}") + return self._build_result("SUCCESS", counts) + except Exception: + self.db.rollback() + self.logger.error("TOPUPS 失败", exc_info=True) + raise + + def _parse_topup(self, raw: dict) -> dict | None: + node = raw.get("settleList") if isinstance(raw.get("settleList"), dict) else raw + topup_id = TypeParser.parse_int(node.get("id")) + if not topup_id: + self.logger.warning("跳过缺少 id 的充值结算: %s", raw) + return None + + store_id = self.config.get("app.store_id") + return { + "store_id": store_id, + "topup_id": topup_id, + "member_id": TypeParser.parse_int(node.get("memberId")), + "member_name": node.get("memberName"), + "member_phone": node.get("memberPhone"), + "card_id": TypeParser.parse_int(node.get("tenantMemberCardId")), + "card_type_name": node.get("memberCardTypeName"), + "pay_amount": TypeParser.parse_decimal(node.get("payAmount")), + "consume_money": TypeParser.parse_decimal(node.get("consumeMoney")), + "settle_status": node.get("settleStatus"), + "settle_type": node.get("settleType"), + "settle_name": node.get("settleName"), + "settle_relate_id": TypeParser.parse_int(node.get("settleRelateId")), + "pay_time": TypeParser.parse_timestamp( + node.get("payTime") or node.get("pay_time"), self.tz + ), + "create_time": TypeParser.parse_timestamp( + node.get("createTime") or node.get("create_time"), self.tz + ), + "operator_id": TypeParser.parse_int(node.get("operatorId")), + "operator_name": node.get("operatorName"), + "payment_method": node.get("paymentMethod"), + "refund_amount": TypeParser.parse_decimal(node.get("refundAmount")), + "cash_amount": TypeParser.parse_decimal(node.get("cashAmount")), + "card_amount": TypeParser.parse_decimal(node.get("cardAmount")), + "balance_amount": TypeParser.parse_decimal(node.get("balanceAmount")), + "online_amount": TypeParser.parse_decimal(node.get("onlineAmount")), + "rounding_amount": TypeParser.parse_decimal(node.get("roundingAmount")), + "adjust_amount": TypeParser.parse_decimal(node.get("adjustAmount")), + "goods_money": TypeParser.parse_decimal(node.get("goodsMoney")), + "table_charge_money": TypeParser.parse_decimal(node.get("tableChargeMoney")), + "service_money": TypeParser.parse_decimal(node.get("serviceMoney")), + "coupon_amount": TypeParser.parse_decimal(node.get("couponAmount")), + "order_remark": node.get("orderRemark"), + "raw_data": json.dumps(raw, ensure_ascii=False), + } diff --git a/etl_billiards/tests/unit/task_test_utils.py b/etl_billiards/tests/unit/task_test_utils.py new file mode 100644 index 0000000..7d7e7ac --- /dev/null +++ b/etl_billiards/tests/unit/task_test_utils.py @@ -0,0 +1,638 @@ +# -*- coding: utf-8 -*- +"""ETL 任务测试的共用辅助模块,涵盖在线/离线模式所需的伪造数据、客户端与配置等工具函数。""" +from __future__ import annotations + +import json +import os +from contextlib import contextmanager +from dataclasses import dataclass +from pathlib import Path +from typing import Dict, List, Sequence, Tuple, Type + +from config.settings import AppConfig +from database.connection import DatabaseConnection +from database.operations import DatabaseOperations as PgDBOperations +from tasks.assistant_abolish_task import AssistantAbolishTask +from tasks.assistants_task import AssistantsTask +from tasks.coupon_usage_task import CouponUsageTask +from tasks.inventory_change_task import InventoryChangeTask +from tasks.ledger_task import LedgerTask +from tasks.members_task import MembersTask +from tasks.orders_task import OrdersTask +from tasks.packages_task import PackagesDefTask +from tasks.payments_task import PaymentsTask +from tasks.products_task import ProductsTask +from tasks.refunds_task import RefundsTask +from tasks.table_discount_task import TableDiscountTask +from tasks.tables_task import TablesTask +from tasks.topups_task import TopupsTask + +DEFAULT_STORE_ID = 2790685415443269 +BASE_TS = "2025-01-01 10:00:00" +END_TS = "2025-01-01 12:00:00" + + +@dataclass(frozen=True) +class TaskSpec: + """描述单个任务在测试中如何被驱动的元数据,包含任务代码、API 路径、数据路径与样例记录。""" + + code: str + task_cls: Type + endpoint: str + data_path: Tuple[str, ...] + sample_records: List[Dict] + + @property + def archive_filename(self) -> str: + return endpoint_to_filename(self.endpoint) + + +def endpoint_to_filename(endpoint: str) -> str: + """根据 API endpoint 生成稳定可复用的文件名,便于离线模式在目录中直接定位归档 JSON。""" + normalized = endpoint.strip("/").replace("/", "__").replace(" ", "_").lower() + return f"{normalized or 'root'}.json" + + +def wrap_records(records: List[Dict], data_path: Sequence[str]): + """按照 data_path 逐层包裹记录列表,使其结构与真实 API 返回体一致,方便离线回放。""" + payload = records + for key in reversed(data_path): + payload = {key: payload} + return payload + + +def create_test_config(mode: str, archive_dir: Path, temp_dir: Path) -> AppConfig: + """构建一份适合测试的 AppConfig,自动填充存储、日志、归档目录等参数并保证目录存在。""" + archive_dir = Path(archive_dir) + temp_dir = Path(temp_dir) + archive_dir.mkdir(parents=True, exist_ok=True) + temp_dir.mkdir(parents=True, exist_ok=True) + + overrides = { + "app": {"store_id": DEFAULT_STORE_ID, "timezone": "Asia/Taipei"}, + "db": {"dsn": "postgresql://user:pass@localhost:5432/etl_billiards_test"}, + "api": { + "base_url": "https://api.example.com", + "token": "test-token", + "timeout_sec": 3, + "page_size": 50, + }, + "testing": { + "mode": mode, + "json_archive_dir": str(archive_dir), + "temp_json_dir": str(temp_dir), + }, + "io": { + "export_root": str(temp_dir / "export"), + "log_root": str(temp_dir / "logs"), + }, + } + return AppConfig.load(overrides) + + +def dump_offline_payload(spec: TaskSpec, archive_dir: Path) -> Path: + """将 TaskSpec 的样例数据写入指定归档目录,供离线测试回放使用,并返回生成文件的完整路径。""" + archive_dir = Path(archive_dir) + payload = wrap_records(spec.sample_records, spec.data_path) + file_path = archive_dir / spec.archive_filename + with file_path.open("w", encoding="utf-8") as fp: + json.dump(payload, fp, ensure_ascii=False) + return file_path + + +class FakeCursor: + """极简游标桩对象,记录 SQL/参数并支持上下文管理,供 FakeDBOperations 与 SCD2Handler 使用。""" + + def __init__(self, recorder: List[Dict]): + self.recorder = recorder + + # pylint: disable=unused-argument + def execute(self, sql: str, params=None): + self.recorder.append({"sql": sql.strip(), "params": params}) + + def fetchone(self): + return None + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +class FakeConnection: + """仿 psycopg 连接对象,仅满足 SCD2Handler 对 cursor 的最小需求,并缓存执行过的语句。""" + + def __init__(self): + self.statements: List[Dict] = [] + + def cursor(self): + return FakeCursor(self.statements) + + +class FakeDBOperations: + """拦截并记录批量 upsert/事务操作,避免触碰真实数据库,同时提供 commit/rollback 计数。""" + + def __init__(self): + self.upserts: List[Dict] = [] + self.commits = 0 + self.rollbacks = 0 + self.conn = FakeConnection() + + def batch_upsert_with_returning(self, sql: str, rows: List[Dict], page_size: int = 1000): + self.upserts.append({"sql": sql.strip(), "count": len(rows), "page_size": page_size}) + return len(rows), 0 + + def batch_execute(self, sql: str, rows: List[Dict], page_size: int = 1000): + self.upserts.append({"sql": sql.strip(), "count": len(rows), "page_size": page_size}) + + def commit(self): + self.commits += 1 + + def rollback(self): + self.rollbacks += 1 + + +class FakeAPIClient: + """在线模式使用的伪 API Client,直接返回预置的内存数据并记录调用,以确保任务参数正确传递。""" + + def __init__(self, data_map: Dict[str, List[Dict]]): + self.data_map = data_map + self.calls: List[Dict] = [] + + # pylint: disable=unused-argument + def get_paginated(self, endpoint: str, params=None, **kwargs): + self.calls.append({"endpoint": endpoint, "params": params}) + if endpoint not in self.data_map: + raise AssertionError(f"Missing fixture for endpoint {endpoint}") + return list(self.data_map[endpoint]), [{"page": 1, "size": len(self.data_map[endpoint])}] + + +class OfflineAPIClient: + """离线模式专用 API Client,根据 endpoint 读取归档 JSON、套用 data_path 并回放列表数据。""" + + def __init__(self, file_map: Dict[str, Path]): + self.file_map = {k: Path(v) for k, v in file_map.items()} + self.calls: List[Dict] = [] + + # pylint: disable=unused-argument + def get_paginated(self, endpoint: str, params=None, page_size: int = 200, data_path: Tuple[str, ...] = (), **kwargs): + self.calls.append({"endpoint": endpoint, "params": params}) + if endpoint not in self.file_map: + raise AssertionError(f"Missing archive for endpoint {endpoint}") + + with self.file_map[endpoint].open("r", encoding="utf-8") as fp: + payload = json.load(fp) + + data = payload + for key in data_path: + if isinstance(data, dict): + data = data.get(key, []) + else: + data = [] + break + + if not isinstance(data, list): + data = [] + + return data, [{"page": 1, "mode": "offline"}] + + +class RealDBOperationsAdapter: + """连接真实 PostgreSQL 的适配器,为任务提供 batch_upsert + 事务能力。""" + + def __init__(self, dsn: str): + self._conn = DatabaseConnection(dsn) + self._ops = PgDBOperations(self._conn) + # SCD2Handler 会访问 db.conn.cursor(),因此暴露底层连接 + self.conn = self._conn.conn + + def batch_upsert_with_returning(self, sql: str, rows: List[Dict], page_size: int = 1000): + return self._ops.batch_upsert_with_returning(sql, rows, page_size=page_size) + + def batch_execute(self, sql: str, rows: List[Dict], page_size: int = 1000): + return self._ops.batch_execute(sql, rows, page_size=page_size) + + def commit(self): + self._conn.commit() + + def rollback(self): + self._conn.rollback() + + def close(self): + self._conn.close() + + +@contextmanager +def get_db_operations(): + """ + 测试专用的 DB 操作上下文: + - 若设置 TEST_DB_DSN,则连接真实 PostgreSQL; + - 否则回退到 FakeDBOperations(内存桩)。 + """ + dsn = os.environ.get("TEST_DB_DSN") + if dsn: + adapter = RealDBOperationsAdapter(dsn) + try: + yield adapter + finally: + adapter.close() + else: + fake = FakeDBOperations() + yield fake + + +TASK_SPECS: List[TaskSpec] = [ + TaskSpec( + code="PRODUCTS", + task_cls=ProductsTask, + endpoint="/TenantGoods/QueryTenantGoods", + data_path=("data",), + sample_records=[ + { + "siteGoodsId": 101, + "tenantGoodsId": 101, + "goodsName": "测试球杆", + "goodsCategoryId": 201, + "categoryName": "器材", + "goodsCategorySecondId": 202, + "goodsUnit": "支", + "costPrice": "100.00", + "goodsPrice": "150.00", + "goodsState": "ON", + "supplierId": 20, + "barcode": "PRD001", + "isCombo": False, + "createTime": BASE_TS, + "updateTime": END_TS, + } + ], + ), + TaskSpec( + code="TABLES", + task_cls=TablesTask, + endpoint="/Table/GetSiteTables", + data_path=("data", "siteTables"), + sample_records=[ + { + "id": 301, + "site_id": 30, + "site_table_area_id": 40, + "areaName": "大厅", + "table_name": "1号桌", + "table_price": "50.00", + "table_status": "FREE", + "tableStatusName": "空闲", + "light_status": "OFF", + "is_rest_area": False, + "show_status": True, + "virtual_table": False, + "charge_free": False, + "only_allow_groupon": False, + "is_online_reservation": True, + "createTime": BASE_TS, + } + ], + ), + TaskSpec( + code="MEMBERS", + task_cls=MembersTask, + endpoint="/MemberProfile/GetTenantMemberList", + data_path=("data",), + sample_records=[ + { + "memberId": 401, + "memberName": "张三", + "phone": "13800000000", + "balance": "88.88", + "status": "ACTIVE", + "registerTime": BASE_TS, + } + ], + ), + TaskSpec( + code="ASSISTANTS", + task_cls=AssistantsTask, + endpoint="/Assistant/List", + data_path=("data", "assistantInfos"), + sample_records=[ + { + "id": 501, + "assistant_no": "AS001", + "nickname": "小李", + "real_name": "李雷", + "gender": "M", + "mobile": "13900000000", + "level": "A", + "team_id": 10, + "team_name": "先锋队", + "assistant_status": "ON", + "work_status": "BUSY", + "entry_time": BASE_TS, + "resign_time": END_TS, + "start_time": BASE_TS, + "end_time": END_TS, + "create_time": BASE_TS, + "update_time": END_TS, + "system_role_id": 1, + "online_status": "ONLINE", + "allow_cx": True, + "charge_way": "TIME", + "pd_unit_price": "30.00", + "cx_unit_price": "20.00", + "is_guaranteed": True, + "is_team_leader": False, + "serial_number": "SN001", + "show_sort": 1, + "is_delete": False, + } + ], + ), + TaskSpec( + code="PACKAGES_DEF", + task_cls=PackagesDefTask, + endpoint="/Package/List", + data_path=("data", "packageCouponList"), + sample_records=[ + { + "id": 601, + "package_id": "PKG001", + "package_name": "白天特惠", + "table_area_id": 70, + "table_area_name": "大厅", + "selling_price": "199.00", + "duration": 120, + "start_time": BASE_TS, + "end_time": END_TS, + "type": "Groupon", + "is_enabled": True, + "is_delete": False, + "usable_count": 3, + "creator_name": "系统", + "date_type": "WEEKDAY", + "group_type": "DINE_IN", + "coupon_money": "30.00", + "area_tag_type": "VIP", + "system_group_type": "BASIC", + "card_type_ids": "1,2,3", + } + ], + ), + TaskSpec( + code="ORDERS", + task_cls=OrdersTask, + endpoint="/order/list", + data_path=("data",), + sample_records=[ + { + "orderId": 701, + "orderNo": "ORD001", + "memberId": 401, + "tableId": 301, + "orderTime": BASE_TS, + "endTime": END_TS, + "totalAmount": "300.00", + "discountAmount": "20.00", + "finalAmount": "280.00", + "payStatus": "PAID", + "orderStatus": "CLOSED", + "remark": "测试订单", + } + ], + ), + TaskSpec( + code="PAYMENTS", + task_cls=PaymentsTask, + endpoint="/pay/records", + data_path=("data",), + sample_records=[ + { + "payId": 801, + "orderId": 701, + "payTime": END_TS, + "payAmount": "280.00", + "payType": "CARD", + "payStatus": "SUCCESS", + "remark": "测试支付", + } + ], + ), + TaskSpec( + code="REFUNDS", + task_cls=RefundsTask, + endpoint="/Pay/RefundList", + data_path=(), + sample_records=[ + { + "id": 901, + "site_id": 1, + "tenant_id": 2, + "pay_amount": "100.00", + "pay_status": "SUCCESS", + "pay_time": END_TS, + "create_time": END_TS, + "relate_type": "ORDER", + "relate_id": 701, + "payment_method": "CARD", + "refund_amount": "20.00", + "action_type": "PARTIAL", + "pay_terminal": "POS", + "operator_id": 11, + "channel_pay_no": "CH001", + "channel_fee": "1.00", + "is_delete": False, + "member_id": 401, + "member_card_id": 501, + } + ], + ), + TaskSpec( + code="COUPON_USAGE", + task_cls=CouponUsageTask, + endpoint="/Coupon/UsageList", + data_path=(), + sample_records=[ + { + "id": 1001, + "coupon_code": "CP001", + "coupon_channel": "MEITUAN", + "coupon_name": "双人券", + "sale_price": "50.00", + "coupon_money": "30.00", + "coupon_free_time": 60, + "use_status": "USED", + "create_time": BASE_TS, + "consume_time": END_TS, + "operator_id": 11, + "operator_name": "操作员", + "table_id": 301, + "site_order_id": 701, + "group_package_id": 601, + "coupon_remark": "备注", + "deal_id": "DEAL001", + "certificate_id": "CERT001", + "verify_id": "VERIFY001", + "is_delete": False, + } + ], + ), + TaskSpec( + code="INVENTORY_CHANGE", + task_cls=InventoryChangeTask, + endpoint="/Inventory/ChangeList", + data_path=("data", "queryDeliveryRecordsList"), + sample_records=[ + { + "siteGoodsStockId": 1101, + "siteGoodsId": 101, + "stockType": "OUT", + "goodsName": "测试球杆", + "createTime": END_TS, + "startNum": 10, + "endNum": 8, + "changeNum": -2, + "unit": "支", + "price": "120.00", + "operatorName": "仓管", + "remark": "测试出库", + "goodsCategoryId": 201, + "goodsSecondCategoryId": 202, + } + ], + ), + TaskSpec( + code="TOPUPS", + task_cls=TopupsTask, + endpoint="/Topup/SettleList", + data_path=("data", "settleList"), + sample_records=[ + { + "id": 1201, + "memberId": 401, + "memberName": "张三", + "memberPhone": "13800000000", + "tenantMemberCardId": 1301, + "memberCardTypeName": "金卡", + "payAmount": "500.00", + "consumeMoney": "100.00", + "settleStatus": "DONE", + "settleType": "AUTO", + "settleName": "日结", + "settleRelateId": 1501, + "payTime": BASE_TS, + "createTime": END_TS, + "operatorId": 11, + "operatorName": "收银员", + "paymentMethod": "CASH", + "refundAmount": "0", + "cashAmount": "500.00", + "cardAmount": "0", + "balanceAmount": "0", + "onlineAmount": "0", + "roundingAmount": "0", + "adjustAmount": "0", + "goodsMoney": "0", + "tableChargeMoney": "0", + "serviceMoney": "0", + "couponAmount": "0", + "orderRemark": "首次充值", + } + ], + ), + TaskSpec( + code="TABLE_DISCOUNT", + task_cls=TableDiscountTask, + endpoint="/Table/AdjustList", + data_path=("data", "taiFeeAdjustInfos"), + sample_records=[ + { + "id": 1301, + "adjust_type": "DISCOUNT", + "applicant_id": 11, + "applicant_name": "店长", + "operator_id": 22, + "operator_name": "值班", + "ledger_amount": "50.00", + "ledger_count": 2, + "ledger_name": "调价", + "ledger_status": "APPROVED", + "order_settle_id": 7010, + "order_trade_no": 8001, + "site_table_id": 301, + "create_time": END_TS, + "is_delete": False, + "tableProfile": { + "id": 301, + "site_table_area_id": 40, + "site_table_area_name": "大厅", + }, + } + ], + ), + TaskSpec( + code="ASSISTANT_ABOLISH", + task_cls=AssistantAbolishTask, + endpoint="/Assistant/AbolishList", + data_path=("data", "abolitionAssistants"), + sample_records=[ + { + "id": 1401, + "tableId": 301, + "tableName": "1号桌", + "tableAreaId": 40, + "tableArea": "大厅", + "assistantOn": "AS001", + "assistantName": "小李", + "pdChargeMinutes": 30, + "assistantAbolishAmount": "15.00", + "createTime": END_TS, + "trashReason": "测试", + } + ], + ), + TaskSpec( + code="LEDGER", + task_cls=LedgerTask, + endpoint="/Assistant/LedgerList", + data_path=("data", "orderAssistantDetails"), + sample_records=[ + { + "id": 1501, + "assistantNo": "AS001", + "assistantName": "小李", + "nickname": "李", + "levelName": "L1", + "tableName": "1号桌", + "ledger_unit_price": "30.00", + "ledger_count": 2, + "ledger_amount": "60.00", + "projected_income": "80.00", + "service_money": "5.00", + "member_discount_amount": "2.00", + "manual_discount_amount": "1.00", + "coupon_deduct_money": "3.00", + "order_trade_no": 8001, + "order_settle_id": 7010, + "operator_id": 22, + "operator_name": "值班", + "assistant_team_id": 10, + "assistant_level": "A", + "site_table_id": 301, + "order_assistant_id": 1601, + "site_assistant_id": 501, + "user_id": 5010, + "ledger_start_time": BASE_TS, + "ledger_end_time": END_TS, + "start_use_time": BASE_TS, + "last_use_time": END_TS, + "income_seconds": 3600, + "real_use_seconds": 3300, + "is_trash": False, + "trash_reason": "", + "is_confirm": True, + "ledger_status": "CLOSED", + "create_time": END_TS, + } + ], + ), +] diff --git a/etl_billiards/tests/unit/test_etl_tasks_offline.py b/etl_billiards/tests/unit/test_etl_tasks_offline.py new file mode 100644 index 0000000..a627807 --- /dev/null +++ b/etl_billiards/tests/unit/test_etl_tasks_offline.py @@ -0,0 +1,39 @@ +# -*- coding: utf-8 -*- +"""离线模式任务测试,通过回放归档 JSON 来验证 T+L 链路可用。""" +import logging +from pathlib import Path + +import pytest + +from .task_test_utils import ( + TASK_SPECS, + OfflineAPIClient, + create_test_config, + dump_offline_payload, + get_db_operations, +) + + +@pytest.mark.parametrize("spec", TASK_SPECS, ids=lambda spec: spec.code) +def test_task_offline_mode(spec, tmp_path): + """确保每个任务都能读取归档 JSON 并完成 Transform + Load 操作。""" + archive_dir = tmp_path / "archive" + temp_dir = tmp_path / "tmp" + archive_dir.mkdir() + temp_dir.mkdir() + + file_path = dump_offline_payload(spec, archive_dir) + config = create_test_config("OFFLINE", archive_dir, temp_dir) + offline_api = OfflineAPIClient({spec.endpoint: Path(file_path)}) + logger = logging.getLogger(f"test_offline_{spec.code.lower()}") + + with get_db_operations() as db_ops: + task = spec.task_cls(config, db_ops, offline_api, logger) + result = task.execute() + + assert result["status"] == "SUCCESS" + assert result["counts"]["fetched"] == len(spec.sample_records) + assert result["counts"]["inserted"] == len(spec.sample_records) + if hasattr(db_ops, "commits"): + assert db_ops.commits == 1 + assert db_ops.rollbacks == 0 diff --git a/etl_billiards/tests/unit/test_etl_tasks_online.py b/etl_billiards/tests/unit/test_etl_tasks_online.py new file mode 100644 index 0000000..83a9b42 --- /dev/null +++ b/etl_billiards/tests/unit/test_etl_tasks_online.py @@ -0,0 +1,32 @@ +# -*- coding: utf-8 -*- +"""在线模式下的端到端任务测试,验证所有任务在模拟 API 下能顺利执行。""" +import logging +import pytest + +from .task_test_utils import ( + TASK_SPECS, + FakeAPIClient, + create_test_config, + get_db_operations, +) + + +@pytest.mark.parametrize("spec", TASK_SPECS, ids=lambda spec: spec.code) +def test_task_online_mode(spec, tmp_path): + """针对每个 TaskSpec 验证:模拟 API 数据下依旧能完整跑完 ETL,并正确统计。""" + archive_dir = tmp_path / "archive" + temp_dir = tmp_path / "tmp" + config = create_test_config("ONLINE", archive_dir, temp_dir) + fake_api = FakeAPIClient({spec.endpoint: spec.sample_records}) + logger = logging.getLogger(f"test_online_{spec.code.lower()}") + + with get_db_operations() as db_ops: + task = spec.task_cls(config, db_ops, fake_api, logger) + result = task.execute() + + assert result["status"] == "SUCCESS" + assert result["counts"]["fetched"] == len(spec.sample_records) + assert result["counts"]["inserted"] == len(spec.sample_records) + if hasattr(db_ops, "commits"): + assert db_ops.commits == 1 + assert db_ops.rollbacks == 0