Compare commits

...

7 Commits

Author SHA1 Message Date
Neo
821d302243 Merge branch 'main' into dev 2025-11-19 03:38:27 +08:00
Neo
9a1df70a23 补全任务与测试 2025-11-19 03:36:44 +08:00
Neo
5bb5a8a568 迁移代码到Git 2025-11-18 21:46:46 +08:00
Neo
c3749474c6 迁移代码到Git 2025-11-18 02:32:00 +08:00
Neo
7f87421678 迁移代码到Git 2025-11-18 02:31:52 +08:00
13d853c3f5 Merge pull request 'Merge pull request '清空.md' (#2) from main into test' (#3) from test into main
Reviewed-on: #3
2025-11-17 17:10:17 +00:00
56fac9d3c0 Merge pull request '清空.md' (#2) from main into test
Reviewed-on: #2
2025-11-17 17:08:30 +00:00
31 changed files with 3101 additions and 1 deletions

46
etl_billiards/.env Normal file
View File

@@ -0,0 +1,46 @@
# 数据库配置
PG_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ
# PG_HOST=localhost
# PG_PORT=5432
# PG_NAME=LLZQ
# PG_USER=local-Python
# PG_PASSWORD=your_password_here
# API配置(抓取时,非球的一些配置)
API_BASE=https://api.example.com # 非球URL前缀
API_TOKEN=your_token_here # 登录Token
# API_TIMEOUT=20
# API_PAGE_SIZE=200
# API_RETRY_MAX=3
# 应用配置
STORE_ID=2790685415443269
# TIMEZONE=Asia/Taipei
# SCHEMA_OLTP=billiards
# SCHEMA_ETL=etl_admin
# 路径配置
EXPORT_ROOT=D:\LLZQ\DB\export
LOG_ROOT=D:\LLZQ\DB\logs
# ETL配置
OVERLAP_SECONDS=120 # 为了防止边界遗漏,会往前“回拨”一点的冗余秒数
WINDOW_BUSY_MIN=30
WINDOW_IDLE_MIN=180
IDLE_START=04:00
IDLE_END=16:00
ALLOW_EMPTY_ADVANCE=true
# 清洗配置
LOG_UNKNOWN_FIELDS=true
HASH_ALGO=sha1
STRICT_NUMERIC=true
ROUND_MONEY_SCALE=2
# 测试/离线模式
TEST_MODE=OFFLINE
TEST_JSON_ARCHIVE_DIR=tests/testdata_json #指定离线模式OFFLINE要读取的 JSON 归档目录。测试或回放任务时,会从这个目录中找到各个任务预先保存的 API 响应文件,直接做 Transform + Load不再访问真实接口。
TEST_JSON_TEMP_DIR=/tmp/etl_billiards_json_tmp #指定测试运行时临时生成或复制 JSON 文件的目录。在线/离线联动测试会把输出、中间文件等写到这个路径,既避免污染真实导出目录,也让 CI 可以在一次运行中隔离不同任务产生的临时数据。
# 测试数据库(留空则单元测试使用伪库)
TEST_DB_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test

View File

@@ -36,3 +36,14 @@ LOG_UNKNOWN_FIELDS=true
HASH_ALGO=sha1
STRICT_NUMERIC=true
ROUND_MONEY_SCALE=2
<<<<<<< HEAD
=======
# 测试/离线模式
TEST_MODE=ONLINE
TEST_JSON_ARCHIVE_DIR=tests/testdata_json
TEST_JSON_TEMP_DIR=/tmp/etl_billiards_json_tmp
# 测试数据库(可选:若设置则单元测试连入此 DSN
TEST_DB_DSN=
>>>>>>> main

View File

@@ -191,6 +191,37 @@ pytest --cov=. --cov-report=html
- `tests/unit/test_parsers.py` 解析器单元测试
- `tests/integration/test_database.py` 数据库集成测试
<<<<<<< HEAD
=======
#### 3.3.1 测试模式ONLINE / OFFLINE
- `TEST_MODE=ONLINE`(默认)时,测试会模拟实时 API完整执行 E/T/L。
- `TEST_MODE=OFFLINE` 时,测试改为从 `TEST_JSON_ARCHIVE_DIR` 指定的归档 JSON 中读取数据,仅做 Transform + Load适合验证本地归档数据是否仍可回放。
- `TEST_JSON_ARCHIVE_DIR`:离线 JSON 归档目录(示例:`tests/testdata_json` 或 CI 产出的快照)。
- `TEST_JSON_TEMP_DIR`:测试生成的临时 JSON 输出目录,便于隔离每次运行的数据。
- `TEST_DB_DSN`:可选,若设置则单元测试会连接到此 PostgreSQL DSN实打实执行写库留空时测试使用内存伪库避免依赖数据库。
示例命令:
```bash
# 在线模式覆盖所有任务
TEST_MODE=ONLINE pytest tests/unit/test_etl_tasks_online.py
# 离线模式使用归档 JSON 覆盖所有任务
TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=tests/testdata_json pytest tests/unit/test_etl_tasks_offline.py
# 使用脚本按需组合参数(示例:在线 + 仅订单用例)
python scripts/run_tests.py --suite online --mode ONLINE --keyword ORDERS
# 使用脚本连接真实测试库并回放离线模式
python scripts/run_tests.py --suite offline --mode OFFLINE --db-dsn postgresql://user:pwd@localhost:5432/testdb
# 使用“指令仓库”中的预置命令
python scripts/run_tests.py --preset offline_realdb
python scripts/run_tests.py --list-presets # 查看或自定义 scripts/test_presets.py
```
>>>>>>> main
---
## 4. 项目结构与文件说明
@@ -277,6 +308,11 @@ etl_billiards/
│ │ ├── __init__.py
│ │ ├── test_config.py
│ │ └── test_parsers.py
<<<<<<< HEAD
=======
│ ├── testdata_json/ # 清洗入库用的测试Json文件
│ │ └── XX.json
>>>>>>> main
│ └── integration/ # 集成测试
│ ├── __init__.py
│ └── test_database.py

View File

@@ -76,6 +76,17 @@ DEFAULTS = {
"redact_keys": ["token", "password", "Authorization"],
"echo_token_in_logs": False,
},
<<<<<<< HEAD
=======
"testing": {
# ONLINE: 正常实时ETLOFFLINE: 读取归档JSON做T/L
"mode": "OFFLINE",
# 离线归档JSON所在目录
"json_archive_dir": "",
# 测试运行时用于生成/复制临时JSON的目录
"temp_json_dir": "",
},
>>>>>>> main
}
# 任务代码常量

View File

@@ -24,6 +24,12 @@ ENV_MAP = {
"OVERLAP_SECONDS": ("run.overlap_seconds",),
"WINDOW_BUSY_MIN": ("run.window_minutes.default_busy",),
"WINDOW_IDLE_MIN": ("run.window_minutes.default_idle",),
<<<<<<< HEAD
=======
"TEST_MODE": ("testing.mode",),
"TEST_JSON_ARCHIVE_DIR": ("testing.json_archive_dir",),
"TEST_JSON_TEMP_DIR": ("testing.temp_json_dir",),
>>>>>>> main
}
def _deep_set(d, dotted_keys, value):

View File

@@ -0,0 +1,114 @@
# -*- coding: utf-8 -*-
"""助教维度加载器"""
from ..base_loader import BaseLoader
class AssistantLoader(BaseLoader):
    """Loader for the assistant dimension table ``billiards.dim_assistant``."""

    def upsert_assistants(self, records: list) -> tuple:
        """Batch-upsert assistant profile rows.

        Args:
            records: List of dicts whose keys match the named SQL parameters
                below (``store_id``, ``assistant_id``, ...).

        Returns:
            Tuple ``(inserted, updated, skipped)``; ``skipped`` is always 0
            for this loader.
        """
        # Empty batch: nothing to write.
        if not records:
            return (0, 0, 0)
        # Upsert keyed on (store_id, assistant_id).  ``RETURNING (xmax = 0)``
        # lets the DB layer tell fresh inserts (xmax = 0) apart from updates.
        # On conflict, update_time falls back to now() when the source row
        # omits it; create_time is deliberately left untouched.
        sql = """
            INSERT INTO billiards.dim_assistant (
                store_id,
                assistant_id,
                assistant_no,
                nickname,
                real_name,
                gender,
                mobile,
                level,
                team_id,
                team_name,
                assistant_status,
                work_status,
                entry_time,
                resign_time,
                start_time,
                end_time,
                create_time,
                update_time,
                system_role_id,
                online_status,
                allow_cx,
                charge_way,
                pd_unit_price,
                cx_unit_price,
                is_guaranteed,
                is_team_leader,
                serial_number,
                show_sort,
                is_delete,
                raw_data
            )
            VALUES (
                %(store_id)s,
                %(assistant_id)s,
                %(assistant_no)s,
                %(nickname)s,
                %(real_name)s,
                %(gender)s,
                %(mobile)s,
                %(level)s,
                %(team_id)s,
                %(team_name)s,
                %(assistant_status)s,
                %(work_status)s,
                %(entry_time)s,
                %(resign_time)s,
                %(start_time)s,
                %(end_time)s,
                %(create_time)s,
                %(update_time)s,
                %(system_role_id)s,
                %(online_status)s,
                %(allow_cx)s,
                %(charge_way)s,
                %(pd_unit_price)s,
                %(cx_unit_price)s,
                %(is_guaranteed)s,
                %(is_team_leader)s,
                %(serial_number)s,
                %(show_sort)s,
                %(is_delete)s,
                %(raw_data)s
            )
            ON CONFLICT (store_id, assistant_id) DO UPDATE SET
                assistant_no = EXCLUDED.assistant_no,
                nickname = EXCLUDED.nickname,
                real_name = EXCLUDED.real_name,
                gender = EXCLUDED.gender,
                mobile = EXCLUDED.mobile,
                level = EXCLUDED.level,
                team_id = EXCLUDED.team_id,
                team_name = EXCLUDED.team_name,
                assistant_status = EXCLUDED.assistant_status,
                work_status = EXCLUDED.work_status,
                entry_time = EXCLUDED.entry_time,
                resign_time = EXCLUDED.resign_time,
                start_time = EXCLUDED.start_time,
                end_time = EXCLUDED.end_time,
                update_time = COALESCE(EXCLUDED.update_time, now()),
                system_role_id = EXCLUDED.system_role_id,
                online_status = EXCLUDED.online_status,
                allow_cx = EXCLUDED.allow_cx,
                charge_way = EXCLUDED.charge_way,
                pd_unit_price = EXCLUDED.pd_unit_price,
                cx_unit_price = EXCLUDED.cx_unit_price,
                is_guaranteed = EXCLUDED.is_guaranteed,
                is_team_leader = EXCLUDED.is_team_leader,
                serial_number = EXCLUDED.serial_number,
                show_sort = EXCLUDED.show_sort,
                is_delete = EXCLUDED.is_delete,
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
        """
        inserted, updated = self.db.batch_upsert_with_returning(
            sql, records, page_size=self._batch_size()
        )
        return (inserted, updated, 0)

View File

@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
"""团购/套餐定义加载器"""
from ..base_loader import BaseLoader
class PackageDefinitionLoader(BaseLoader):
    """Loader for group-buy / package definitions in ``billiards.dim_package_coupon``."""

    def upsert_packages(self, records: list) -> tuple:
        """Batch-upsert package/coupon definition rows.

        Args:
            records: List of dicts keyed by the named SQL parameters below.

        Returns:
            Tuple ``(inserted, updated, skipped)``; ``skipped`` is always 0.
        """
        # Empty batch: nothing to write.
        if not records:
            return (0, 0, 0)
        # Upsert keyed on (store_id, package_id); RETURNING (xmax = 0)
        # distinguishes inserts from updates.
        sql = """
            INSERT INTO billiards.dim_package_coupon (
                store_id,
                package_id,
                package_code,
                package_name,
                table_area_id,
                table_area_name,
                selling_price,
                duration_seconds,
                start_time,
                end_time,
                type,
                is_enabled,
                is_delete,
                usable_count,
                creator_name,
                date_type,
                group_type,
                coupon_money,
                area_tag_type,
                system_group_type,
                card_type_ids,
                raw_data
            )
            VALUES (
                %(store_id)s,
                %(package_id)s,
                %(package_code)s,
                %(package_name)s,
                %(table_area_id)s,
                %(table_area_name)s,
                %(selling_price)s,
                %(duration_seconds)s,
                %(start_time)s,
                %(end_time)s,
                %(type)s,
                %(is_enabled)s,
                %(is_delete)s,
                %(usable_count)s,
                %(creator_name)s,
                %(date_type)s,
                %(group_type)s,
                %(coupon_money)s,
                %(area_tag_type)s,
                %(system_group_type)s,
                %(card_type_ids)s,
                %(raw_data)s
            )
            ON CONFLICT (store_id, package_id) DO UPDATE SET
                package_code = EXCLUDED.package_code,
                package_name = EXCLUDED.package_name,
                table_area_id = EXCLUDED.table_area_id,
                table_area_name = EXCLUDED.table_area_name,
                selling_price = EXCLUDED.selling_price,
                duration_seconds = EXCLUDED.duration_seconds,
                start_time = EXCLUDED.start_time,
                end_time = EXCLUDED.end_time,
                type = EXCLUDED.type,
                is_enabled = EXCLUDED.is_enabled,
                is_delete = EXCLUDED.is_delete,
                usable_count = EXCLUDED.usable_count,
                creator_name = EXCLUDED.creator_name,
                date_type = EXCLUDED.date_type,
                group_type = EXCLUDED.group_type,
                coupon_money = EXCLUDED.coupon_money,
                area_tag_type = EXCLUDED.area_tag_type,
                system_group_type = EXCLUDED.system_group_type,
                card_type_ids = EXCLUDED.card_type_ids,
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
        """
        inserted, updated = self.db.batch_upsert_with_returning(
            sql, records, page_size=self._batch_size()
        )
        return (inserted, updated, 0)

View File

@@ -0,0 +1,80 @@
# -*- coding: utf-8 -*-
"""台桌维度加载器"""
from ..base_loader import BaseLoader
class TableLoader(BaseLoader):
    """Loader that writes billiard-table profiles into ``billiards.dim_table``."""

    def upsert_tables(self, records: list) -> tuple:
        """Batch-upsert table profile rows.

        Args:
            records: List of dicts keyed by the named SQL parameters below.

        Returns:
            Tuple ``(inserted, updated, skipped)``; ``skipped`` is always 0.
        """
        # Empty batch: nothing to write.
        if not records:
            return (0, 0, 0)
        # Upsert keyed on (store_id, table_id).  On conflict, created_time
        # keeps the existing stored value when the incoming row has none.
        sql = """
            INSERT INTO billiards.dim_table (
                store_id,
                table_id,
                site_id,
                area_id,
                area_name,
                table_name,
                table_price,
                table_status,
                table_status_name,
                light_status,
                is_rest_area,
                show_status,
                virtual_table,
                charge_free,
                only_allow_groupon,
                is_online_reservation,
                created_time,
                raw_data
            )
            VALUES (
                %(store_id)s,
                %(table_id)s,
                %(site_id)s,
                %(area_id)s,
                %(area_name)s,
                %(table_name)s,
                %(table_price)s,
                %(table_status)s,
                %(table_status_name)s,
                %(light_status)s,
                %(is_rest_area)s,
                %(show_status)s,
                %(virtual_table)s,
                %(charge_free)s,
                %(only_allow_groupon)s,
                %(is_online_reservation)s,
                %(created_time)s,
                %(raw_data)s
            )
            ON CONFLICT (store_id, table_id) DO UPDATE SET
                site_id = EXCLUDED.site_id,
                area_id = EXCLUDED.area_id,
                area_name = EXCLUDED.area_name,
                table_name = EXCLUDED.table_name,
                table_price = EXCLUDED.table_price,
                table_status = EXCLUDED.table_status,
                table_status_name = EXCLUDED.table_status_name,
                light_status = EXCLUDED.light_status,
                is_rest_area = EXCLUDED.is_rest_area,
                show_status = EXCLUDED.show_status,
                virtual_table = EXCLUDED.virtual_table,
                charge_free = EXCLUDED.charge_free,
                only_allow_groupon = EXCLUDED.only_allow_groupon,
                is_online_reservation = EXCLUDED.is_online_reservation,
                created_time = COALESCE(EXCLUDED.created_time, dim_table.created_time),
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
        """
        inserted, updated = self.db.batch_upsert_with_returning(
            sql, records, page_size=self._batch_size()
        )
        return (inserted, updated, 0)

View File

@@ -0,0 +1,64 @@
# -*- coding: utf-8 -*-
"""助教作废事实表"""
from ..base_loader import BaseLoader
class AssistantAbolishLoader(BaseLoader):
    """Loader for voided assistant charges, fact table ``billiards.fact_assistant_abolish``."""

    def upsert_records(self, records: list) -> tuple:
        """Batch-upsert voided assistant-charge rows.

        Args:
            records: List of dicts keyed by the named SQL parameters below.

        Returns:
            Tuple ``(inserted, updated, skipped)``; ``skipped`` is always 0.
        """
        # Empty batch: nothing to write.
        if not records:
            return (0, 0, 0)
        # Upsert keyed on (store_id, abolish_id); RETURNING (xmax = 0)
        # distinguishes inserts from updates.
        sql = """
            INSERT INTO billiards.fact_assistant_abolish (
                store_id,
                abolish_id,
                table_id,
                table_name,
                table_area_id,
                table_area,
                assistant_no,
                assistant_name,
                charge_minutes,
                abolish_amount,
                create_time,
                trash_reason,
                raw_data
            )
            VALUES (
                %(store_id)s,
                %(abolish_id)s,
                %(table_id)s,
                %(table_name)s,
                %(table_area_id)s,
                %(table_area)s,
                %(assistant_no)s,
                %(assistant_name)s,
                %(charge_minutes)s,
                %(abolish_amount)s,
                %(create_time)s,
                %(trash_reason)s,
                %(raw_data)s
            )
            ON CONFLICT (store_id, abolish_id) DO UPDATE SET
                table_id = EXCLUDED.table_id,
                table_name = EXCLUDED.table_name,
                table_area_id = EXCLUDED.table_area_id,
                table_area = EXCLUDED.table_area,
                assistant_no = EXCLUDED.assistant_no,
                assistant_name = EXCLUDED.assistant_name,
                charge_minutes = EXCLUDED.charge_minutes,
                abolish_amount = EXCLUDED.abolish_amount,
                create_time = EXCLUDED.create_time,
                trash_reason = EXCLUDED.trash_reason,
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
        """
        inserted, updated = self.db.batch_upsert_with_returning(
            sql, records, page_size=self._batch_size()
        )
        return (inserted, updated, 0)

View File

@@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
"""助教流水事实表"""
from ..base_loader import BaseLoader
class AssistantLedgerLoader(BaseLoader):
    """Loader for assistant billing entries, fact table ``billiards.fact_assistant_ledger``."""

    def upsert_ledgers(self, records: list) -> tuple:
        """Batch-upsert assistant ledger rows.

        Args:
            records: List of dicts keyed by the named SQL parameters below.

        Returns:
            Tuple ``(inserted, updated, skipped)``; ``skipped`` is always 0.
        """
        # Empty batch: nothing to write.
        if not records:
            return (0, 0, 0)
        # Upsert keyed on (store_id, ledger_id); RETURNING (xmax = 0)
        # distinguishes inserts from updates.
        sql = """
            INSERT INTO billiards.fact_assistant_ledger (
                store_id,
                ledger_id,
                assistant_no,
                assistant_name,
                nickname,
                level_name,
                table_name,
                ledger_unit_price,
                ledger_count,
                ledger_amount,
                projected_income,
                service_money,
                member_discount_amount,
                manual_discount_amount,
                coupon_deduct_money,
                order_trade_no,
                order_settle_id,
                operator_id,
                operator_name,
                assistant_team_id,
                assistant_level,
                site_table_id,
                order_assistant_id,
                site_assistant_id,
                user_id,
                ledger_start_time,
                ledger_end_time,
                start_use_time,
                last_use_time,
                income_seconds,
                real_use_seconds,
                is_trash,
                trash_reason,
                is_confirm,
                ledger_status,
                create_time,
                raw_data
            )
            VALUES (
                %(store_id)s,
                %(ledger_id)s,
                %(assistant_no)s,
                %(assistant_name)s,
                %(nickname)s,
                %(level_name)s,
                %(table_name)s,
                %(ledger_unit_price)s,
                %(ledger_count)s,
                %(ledger_amount)s,
                %(projected_income)s,
                %(service_money)s,
                %(member_discount_amount)s,
                %(manual_discount_amount)s,
                %(coupon_deduct_money)s,
                %(order_trade_no)s,
                %(order_settle_id)s,
                %(operator_id)s,
                %(operator_name)s,
                %(assistant_team_id)s,
                %(assistant_level)s,
                %(site_table_id)s,
                %(order_assistant_id)s,
                %(site_assistant_id)s,
                %(user_id)s,
                %(ledger_start_time)s,
                %(ledger_end_time)s,
                %(start_use_time)s,
                %(last_use_time)s,
                %(income_seconds)s,
                %(real_use_seconds)s,
                %(is_trash)s,
                %(trash_reason)s,
                %(is_confirm)s,
                %(ledger_status)s,
                %(create_time)s,
                %(raw_data)s
            )
            ON CONFLICT (store_id, ledger_id) DO UPDATE SET
                assistant_no = EXCLUDED.assistant_no,
                assistant_name = EXCLUDED.assistant_name,
                nickname = EXCLUDED.nickname,
                level_name = EXCLUDED.level_name,
                table_name = EXCLUDED.table_name,
                ledger_unit_price = EXCLUDED.ledger_unit_price,
                ledger_count = EXCLUDED.ledger_count,
                ledger_amount = EXCLUDED.ledger_amount,
                projected_income = EXCLUDED.projected_income,
                service_money = EXCLUDED.service_money,
                member_discount_amount = EXCLUDED.member_discount_amount,
                manual_discount_amount = EXCLUDED.manual_discount_amount,
                coupon_deduct_money = EXCLUDED.coupon_deduct_money,
                order_trade_no = EXCLUDED.order_trade_no,
                order_settle_id = EXCLUDED.order_settle_id,
                operator_id = EXCLUDED.operator_id,
                operator_name = EXCLUDED.operator_name,
                assistant_team_id = EXCLUDED.assistant_team_id,
                assistant_level = EXCLUDED.assistant_level,
                site_table_id = EXCLUDED.site_table_id,
                order_assistant_id = EXCLUDED.order_assistant_id,
                site_assistant_id = EXCLUDED.site_assistant_id,
                user_id = EXCLUDED.user_id,
                ledger_start_time = EXCLUDED.ledger_start_time,
                ledger_end_time = EXCLUDED.ledger_end_time,
                start_use_time = EXCLUDED.start_use_time,
                last_use_time = EXCLUDED.last_use_time,
                income_seconds = EXCLUDED.income_seconds,
                real_use_seconds = EXCLUDED.real_use_seconds,
                is_trash = EXCLUDED.is_trash,
                trash_reason = EXCLUDED.trash_reason,
                is_confirm = EXCLUDED.is_confirm,
                ledger_status = EXCLUDED.ledger_status,
                create_time = EXCLUDED.create_time,
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
        """
        inserted, updated = self.db.batch_upsert_with_returning(
            sql, records, page_size=self._batch_size()
        )
        return (inserted, updated, 0)

View File

@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
"""券核销事实表"""
from ..base_loader import BaseLoader
class CouponUsageLoader(BaseLoader):
    """Loader for coupon redemptions, fact table ``billiards.fact_coupon_usage``."""

    def upsert_coupon_usage(self, records: list) -> tuple:
        """Batch-upsert coupon redemption rows.

        Args:
            records: List of dicts keyed by the named SQL parameters below.

        Returns:
            Tuple ``(inserted, updated, skipped)``; ``skipped`` is always 0.
        """
        # Empty batch: nothing to write.
        if not records:
            return (0, 0, 0)
        # Upsert keyed on (store_id, usage_id); RETURNING (xmax = 0)
        # distinguishes inserts from updates.
        sql = """
            INSERT INTO billiards.fact_coupon_usage (
                store_id,
                usage_id,
                coupon_code,
                coupon_channel,
                coupon_name,
                sale_price,
                coupon_money,
                coupon_free_time,
                use_status,
                create_time,
                consume_time,
                operator_id,
                operator_name,
                table_id,
                site_order_id,
                group_package_id,
                coupon_remark,
                deal_id,
                certificate_id,
                verify_id,
                is_delete,
                raw_data
            )
            VALUES (
                %(store_id)s,
                %(usage_id)s,
                %(coupon_code)s,
                %(coupon_channel)s,
                %(coupon_name)s,
                %(sale_price)s,
                %(coupon_money)s,
                %(coupon_free_time)s,
                %(use_status)s,
                %(create_time)s,
                %(consume_time)s,
                %(operator_id)s,
                %(operator_name)s,
                %(table_id)s,
                %(site_order_id)s,
                %(group_package_id)s,
                %(coupon_remark)s,
                %(deal_id)s,
                %(certificate_id)s,
                %(verify_id)s,
                %(is_delete)s,
                %(raw_data)s
            )
            ON CONFLICT (store_id, usage_id) DO UPDATE SET
                coupon_code = EXCLUDED.coupon_code,
                coupon_channel = EXCLUDED.coupon_channel,
                coupon_name = EXCLUDED.coupon_name,
                sale_price = EXCLUDED.sale_price,
                coupon_money = EXCLUDED.coupon_money,
                coupon_free_time = EXCLUDED.coupon_free_time,
                use_status = EXCLUDED.use_status,
                create_time = EXCLUDED.create_time,
                consume_time = EXCLUDED.consume_time,
                operator_id = EXCLUDED.operator_id,
                operator_name = EXCLUDED.operator_name,
                table_id = EXCLUDED.table_id,
                site_order_id = EXCLUDED.site_order_id,
                group_package_id = EXCLUDED.group_package_id,
                coupon_remark = EXCLUDED.coupon_remark,
                deal_id = EXCLUDED.deal_id,
                certificate_id = EXCLUDED.certificate_id,
                verify_id = EXCLUDED.verify_id,
                is_delete = EXCLUDED.is_delete,
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
        """
        inserted, updated = self.db.batch_upsert_with_returning(
            sql, records, page_size=self._batch_size()
        )
        return (inserted, updated, 0)

View File

@@ -0,0 +1,73 @@
# -*- coding: utf-8 -*-
"""库存变动事实表"""
from ..base_loader import BaseLoader
class InventoryChangeLoader(BaseLoader):
    """Loader for stock movements, fact table ``billiards.fact_inventory_change``."""

    def upsert_changes(self, records: list) -> tuple:
        """Batch-upsert inventory change rows.

        Args:
            records: List of dicts keyed by the named SQL parameters below.

        Returns:
            Tuple ``(inserted, updated, skipped)``; ``skipped`` is always 0.
        """
        # Empty batch: nothing to write.
        if not records:
            return (0, 0, 0)
        # Upsert keyed on (store_id, change_id); RETURNING (xmax = 0)
        # distinguishes inserts from updates.
        sql = """
            INSERT INTO billiards.fact_inventory_change (
                store_id,
                change_id,
                site_goods_id,
                stock_type,
                goods_name,
                change_time,
                start_qty,
                end_qty,
                change_qty,
                unit,
                price,
                operator_name,
                remark,
                goods_category_id,
                goods_second_category_id,
                raw_data
            )
            VALUES (
                %(store_id)s,
                %(change_id)s,
                %(site_goods_id)s,
                %(stock_type)s,
                %(goods_name)s,
                %(change_time)s,
                %(start_qty)s,
                %(end_qty)s,
                %(change_qty)s,
                %(unit)s,
                %(price)s,
                %(operator_name)s,
                %(remark)s,
                %(goods_category_id)s,
                %(goods_second_category_id)s,
                %(raw_data)s
            )
            ON CONFLICT (store_id, change_id) DO UPDATE SET
                site_goods_id = EXCLUDED.site_goods_id,
                stock_type = EXCLUDED.stock_type,
                goods_name = EXCLUDED.goods_name,
                change_time = EXCLUDED.change_time,
                start_qty = EXCLUDED.start_qty,
                end_qty = EXCLUDED.end_qty,
                change_qty = EXCLUDED.change_qty,
                unit = EXCLUDED.unit,
                price = EXCLUDED.price,
                operator_name = EXCLUDED.operator_name,
                remark = EXCLUDED.remark,
                goods_category_id = EXCLUDED.goods_category_id,
                goods_second_category_id = EXCLUDED.goods_second_category_id,
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
        """
        inserted, updated = self.db.batch_upsert_with_returning(
            sql, records, page_size=self._batch_size()
        )
        return (inserted, updated, 0)

View File

@@ -0,0 +1,88 @@
# -*- coding: utf-8 -*-
"""退款事实表加载器"""
from ..base_loader import BaseLoader
class RefundLoader(BaseLoader):
    """Loader for refund transactions, fact table ``billiards.fact_refund``."""

    def upsert_refunds(self, records: list) -> tuple:
        """Batch-upsert refund rows.

        Args:
            records: List of dicts keyed by the named SQL parameters below.

        Returns:
            Tuple ``(inserted, updated, skipped)``; ``skipped`` is always 0.
        """
        # Empty batch: nothing to write.
        if not records:
            return (0, 0, 0)
        # Upsert keyed on (store_id, refund_id); RETURNING (xmax = 0)
        # distinguishes inserts from updates.
        sql = """
            INSERT INTO billiards.fact_refund (
                store_id,
                refund_id,
                site_id,
                tenant_id,
                pay_amount,
                pay_status,
                pay_time,
                create_time,
                relate_type,
                relate_id,
                payment_method,
                refund_amount,
                action_type,
                pay_terminal,
                operator_id,
                channel_pay_no,
                channel_fee,
                is_delete,
                member_id,
                member_card_id,
                raw_data
            )
            VALUES (
                %(store_id)s,
                %(refund_id)s,
                %(site_id)s,
                %(tenant_id)s,
                %(pay_amount)s,
                %(pay_status)s,
                %(pay_time)s,
                %(create_time)s,
                %(relate_type)s,
                %(relate_id)s,
                %(payment_method)s,
                %(refund_amount)s,
                %(action_type)s,
                %(pay_terminal)s,
                %(operator_id)s,
                %(channel_pay_no)s,
                %(channel_fee)s,
                %(is_delete)s,
                %(member_id)s,
                %(member_card_id)s,
                %(raw_data)s
            )
            ON CONFLICT (store_id, refund_id) DO UPDATE SET
                site_id = EXCLUDED.site_id,
                tenant_id = EXCLUDED.tenant_id,
                pay_amount = EXCLUDED.pay_amount,
                pay_status = EXCLUDED.pay_status,
                pay_time = EXCLUDED.pay_time,
                create_time = EXCLUDED.create_time,
                relate_type = EXCLUDED.relate_type,
                relate_id = EXCLUDED.relate_id,
                payment_method = EXCLUDED.payment_method,
                refund_amount = EXCLUDED.refund_amount,
                action_type = EXCLUDED.action_type,
                pay_terminal = EXCLUDED.pay_terminal,
                operator_id = EXCLUDED.operator_id,
                channel_pay_no = EXCLUDED.channel_pay_no,
                channel_fee = EXCLUDED.channel_fee,
                is_delete = EXCLUDED.is_delete,
                member_id = EXCLUDED.member_id,
                member_card_id = EXCLUDED.member_card_id,
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
        """
        inserted, updated = self.db.batch_upsert_with_returning(
            sql, records, page_size=self._batch_size()
        )
        return (inserted, updated, 0)

View File

@@ -0,0 +1,82 @@
# -*- coding: utf-8 -*-
"""台费打折事实表"""
from ..base_loader import BaseLoader
class TableDiscountLoader(BaseLoader):
    """Loader for table-fee discounts, fact table ``billiards.fact_table_discount``."""

    def upsert_discounts(self, records: list) -> tuple:
        """Batch-upsert table-fee discount rows.

        Args:
            records: List of dicts keyed by the named SQL parameters below.

        Returns:
            Tuple ``(inserted, updated, skipped)``; ``skipped`` is always 0.
        """
        # Empty batch: nothing to write.
        if not records:
            return (0, 0, 0)
        # Upsert keyed on (store_id, discount_id); RETURNING (xmax = 0)
        # distinguishes inserts from updates.
        sql = """
            INSERT INTO billiards.fact_table_discount (
                store_id,
                discount_id,
                adjust_type,
                applicant_id,
                applicant_name,
                operator_id,
                operator_name,
                ledger_amount,
                ledger_count,
                ledger_name,
                ledger_status,
                order_settle_id,
                order_trade_no,
                site_table_id,
                table_area_id,
                table_area_name,
                create_time,
                is_delete,
                raw_data
            )
            VALUES (
                %(store_id)s,
                %(discount_id)s,
                %(adjust_type)s,
                %(applicant_id)s,
                %(applicant_name)s,
                %(operator_id)s,
                %(operator_name)s,
                %(ledger_amount)s,
                %(ledger_count)s,
                %(ledger_name)s,
                %(ledger_status)s,
                %(order_settle_id)s,
                %(order_trade_no)s,
                %(site_table_id)s,
                %(table_area_id)s,
                %(table_area_name)s,
                %(create_time)s,
                %(is_delete)s,
                %(raw_data)s
            )
            ON CONFLICT (store_id, discount_id) DO UPDATE SET
                adjust_type = EXCLUDED.adjust_type,
                applicant_id = EXCLUDED.applicant_id,
                applicant_name = EXCLUDED.applicant_name,
                operator_id = EXCLUDED.operator_id,
                operator_name = EXCLUDED.operator_name,
                ledger_amount = EXCLUDED.ledger_amount,
                ledger_count = EXCLUDED.ledger_count,
                ledger_name = EXCLUDED.ledger_name,
                ledger_status = EXCLUDED.ledger_status,
                order_settle_id = EXCLUDED.order_settle_id,
                order_trade_no = EXCLUDED.order_trade_no,
                site_table_id = EXCLUDED.site_table_id,
                table_area_id = EXCLUDED.table_area_id,
                table_area_name = EXCLUDED.table_area_name,
                create_time = EXCLUDED.create_time,
                is_delete = EXCLUDED.is_delete,
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
        """
        inserted, updated = self.db.batch_upsert_with_returning(
            sql, records, page_size=self._batch_size()
        )
        return (inserted, updated, 0)

View File

@@ -0,0 +1,118 @@
# -*- coding: utf-8 -*-
"""充值记录事实表"""
from ..base_loader import BaseLoader
class TopupLoader(BaseLoader):
    """Loader for member top-up records, fact table ``billiards.fact_topup``."""

    def upsert_topups(self, records: list) -> tuple:
        """Batch-upsert top-up rows.

        Args:
            records: List of dicts keyed by the named SQL parameters below.

        Returns:
            Tuple ``(inserted, updated, skipped)``; ``skipped`` is always 0.
        """
        # Empty batch: nothing to write.
        if not records:
            return (0, 0, 0)
        # Upsert keyed on (store_id, topup_id); RETURNING (xmax = 0)
        # distinguishes inserts from updates.
        sql = """
            INSERT INTO billiards.fact_topup (
                store_id,
                topup_id,
                member_id,
                member_name,
                member_phone,
                card_id,
                card_type_name,
                pay_amount,
                consume_money,
                settle_status,
                settle_type,
                settle_name,
                settle_relate_id,
                pay_time,
                create_time,
                operator_id,
                operator_name,
                payment_method,
                refund_amount,
                cash_amount,
                card_amount,
                balance_amount,
                online_amount,
                rounding_amount,
                adjust_amount,
                goods_money,
                table_charge_money,
                service_money,
                coupon_amount,
                order_remark,
                raw_data
            )
            VALUES (
                %(store_id)s,
                %(topup_id)s,
                %(member_id)s,
                %(member_name)s,
                %(member_phone)s,
                %(card_id)s,
                %(card_type_name)s,
                %(pay_amount)s,
                %(consume_money)s,
                %(settle_status)s,
                %(settle_type)s,
                %(settle_name)s,
                %(settle_relate_id)s,
                %(pay_time)s,
                %(create_time)s,
                %(operator_id)s,
                %(operator_name)s,
                %(payment_method)s,
                %(refund_amount)s,
                %(cash_amount)s,
                %(card_amount)s,
                %(balance_amount)s,
                %(online_amount)s,
                %(rounding_amount)s,
                %(adjust_amount)s,
                %(goods_money)s,
                %(table_charge_money)s,
                %(service_money)s,
                %(coupon_amount)s,
                %(order_remark)s,
                %(raw_data)s
            )
            ON CONFLICT (store_id, topup_id) DO UPDATE SET
                member_id = EXCLUDED.member_id,
                member_name = EXCLUDED.member_name,
                member_phone = EXCLUDED.member_phone,
                card_id = EXCLUDED.card_id,
                card_type_name = EXCLUDED.card_type_name,
                pay_amount = EXCLUDED.pay_amount,
                consume_money = EXCLUDED.consume_money,
                settle_status = EXCLUDED.settle_status,
                settle_type = EXCLUDED.settle_type,
                settle_name = EXCLUDED.settle_name,
                settle_relate_id = EXCLUDED.settle_relate_id,
                pay_time = EXCLUDED.pay_time,
                create_time = EXCLUDED.create_time,
                operator_id = EXCLUDED.operator_id,
                operator_name = EXCLUDED.operator_name,
                payment_method = EXCLUDED.payment_method,
                refund_amount = EXCLUDED.refund_amount,
                cash_amount = EXCLUDED.cash_amount,
                card_amount = EXCLUDED.card_amount,
                balance_amount = EXCLUDED.balance_amount,
                online_amount = EXCLUDED.online_amount,
                rounding_amount = EXCLUDED.rounding_amount,
                adjust_amount = EXCLUDED.adjust_amount,
                goods_money = EXCLUDED.goods_money,
                table_charge_money = EXCLUDED.table_charge_money,
                service_money = EXCLUDED.service_money,
                coupon_amount = EXCLUDED.coupon_amount,
                order_remark = EXCLUDED.order_remark,
                raw_data = EXCLUDED.raw_data,
                updated_at = now()
            RETURNING (xmax = 0) AS inserted
        """
        inserted, updated = self.db.batch_upsert_with_returning(
            sql, records, page_size=self._batch_size()
        )
        return (inserted, updated, 0)

View File

@@ -3,6 +3,20 @@
from tasks.orders_task import OrdersTask
from tasks.payments_task import PaymentsTask
from tasks.members_task import MembersTask
<<<<<<< HEAD
=======
from tasks.products_task import ProductsTask
from tasks.tables_task import TablesTask
from tasks.assistants_task import AssistantsTask
from tasks.packages_task import PackagesDefTask
from tasks.refunds_task import RefundsTask
from tasks.coupon_usage_task import CouponUsageTask
from tasks.inventory_change_task import InventoryChangeTask
from tasks.topups_task import TopupsTask
from tasks.table_discount_task import TableDiscountTask
from tasks.assistant_abolish_task import AssistantAbolishTask
from tasks.ledger_task import LedgerTask
>>>>>>> main
class TaskRegistry:
"""任务注册和工厂"""
@@ -30,7 +44,24 @@ class TaskRegistry:
# 默认注册表
default_registry = TaskRegistry()
<<<<<<< HEAD
default_registry.register("ORDERS", OrdersTask)
default_registry.register("PAYMENTS", PaymentsTask)
default_registry.register("MEMBERS", MembersTask)
# 可以继续注册其他任务...
=======
default_registry.register("PRODUCTS", ProductsTask)
default_registry.register("TABLES", TablesTask)
default_registry.register("MEMBERS", MembersTask)
default_registry.register("ASSISTANTS", AssistantsTask)
default_registry.register("PACKAGES_DEF", PackagesDefTask)
default_registry.register("ORDERS", OrdersTask)
default_registry.register("PAYMENTS", PaymentsTask)
default_registry.register("REFUNDS", RefundsTask)
default_registry.register("COUPON_USAGE", CouponUsageTask)
default_registry.register("INVENTORY_CHANGE", InventoryChangeTask)
default_registry.register("TOPUPS", TopupsTask)
default_registry.register("TABLE_DISCOUNT", TableDiscountTask)
default_registry.register("ASSISTANT_ABOLISH", AssistantAbolishTask)
default_registry.register("LEDGER", LedgerTask)
>>>>>>> main

View File

@@ -0,0 +1,196 @@
# -*- coding: utf-8 -*-
"""
灵活的测试执行脚本,可像搭积木一样组合不同参数或预置命令(模式/数据库/归档路径等),
直接运行本文件即可触发 pytest。
示例:
python scripts/run_tests.py --suite online --mode ONLINE --keyword ORDERS
python scripts/run_tests.py --preset offline_realdb
python scripts/run_tests.py --suite online offline --db-dsn ... --json-archive tmp/archives
"""
from __future__ import annotations
import argparse
import importlib.util
import os
import shlex
import sys
from typing import Dict, List
import pytest
# Repository root (scripts/ lives one level below it); pytest is run from here.
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
# Short suite aliases accepted by --suite, mapped to pytest target paths.
SUITE_MAP: Dict[str, str] = {
    "online": "tests/unit/test_etl_tasks_online.py",
    "offline": "tests/unit/test_etl_tasks_offline.py",
    "integration": "tests/integration/test_database.py",
}
# Named option bundles; filled in from scripts/test_presets.py at import time.
PRESETS: Dict[str, Dict] = {}
def _load_presets() -> None:
    """Populate ``PRESETS`` from the optional sibling file ``test_presets.py``.

    The module is loaded via importlib so the runner keeps working when the
    file is absent or cannot be loaded; a ``PRESETS`` dict it defines (if any)
    is merged into this module's ``PRESETS``.
    """
    preset_path = os.path.join(os.path.dirname(__file__), "test_presets.py")
    if not os.path.exists(preset_path):
        return
    spec = importlib.util.spec_from_file_location("test_presets", preset_path)
    if not spec or not spec.loader:
        return
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # type: ignore[attr-defined]
    presets = getattr(module, "PRESETS", {})
    # Ignore a malformed PRESETS (anything that is not a dict).
    if isinstance(presets, dict):
        PRESETS.update(presets)


# Load at import time so --preset choices are already known to argparse.
_load_presets()
def parse_args() -> argparse.Namespace:
    """Build the CLI parser and parse ``sys.argv``.

    Returns:
        The parsed namespace; preset values are merged in later by
        ``apply_presets_to_args`` and env overrides by ``apply_env``.
    """
    parser = argparse.ArgumentParser(description="ETL 测试执行器(支持参数化调配)")
    parser.add_argument(
        "--suite",
        choices=sorted(SUITE_MAP.keys()),
        nargs="+",
        help="预置测试套件,可多选(默认全部 online/offline",
    )
    parser.add_argument(
        "--tests",
        nargs="+",
        help="自定义测试路径(可与 --suite 混用),例如 tests/unit/test_config.py",
    )
    parser.add_argument(
        "--mode",
        choices=["ONLINE", "OFFLINE"],
        help="覆盖 TEST_MODE默认沿用 .env / 环境变量)",
    )
    parser.add_argument("--db-dsn", help="设置 TEST_DB_DSN连接真实数据库进行测试")
    parser.add_argument("--json-archive", help="设置 TEST_JSON_ARCHIVE_DIR离线档案目录")
    parser.add_argument("--json-temp", help="设置 TEST_JSON_TEMP_DIR临时 JSON 路径)")
    parser.add_argument(
        "--keyword",
        "-k",
        help="pytest -k 关键字过滤(例如 ORDERS只运行包含该字符串的用例",
    )
    parser.add_argument(
        "--pytest-args",
        help="附加 pytest 参数,格式与命令行一致(例如 \"-vv --maxfail=1\"",
    )
    parser.add_argument(
        "--env",
        action="append",
        metavar="KEY=VALUE",
        help="自定义环境变量,可重复传入,例如 --env STORE_ID=123",
    )
    # choices=None when no presets were loaded, so argparse accepts nothing
    # rather than crashing on an empty choice list.
    parser.add_argument("--preset", choices=sorted(PRESETS.keys()) if PRESETS else None, nargs="+",
                        help="从 scripts/test_presets.py 中选择一个或多个组合命令")
    parser.add_argument("--list-presets", action="store_true", help="列出可用预置命令后退出")
    parser.add_argument("--dry-run", action="store_true", help="仅打印将要执行的命令与环境,不真正运行 pytest")
    return parser.parse_args()
def apply_presets_to_args(args: argparse.Namespace) -> None:
    """Merge the selected presets from ``PRESETS`` into the parsed CLI args.

    Merge rules:
      * ``suite`` / ``tests`` — list values are concatenated onto any
        CLI-provided lists;
      * ``env`` — list values are appended;
      * ``pytest_args`` — strings are joined with a space (preset first);
      * everything else (``mode``, ``db_dsn``, ``keyword``, ...) only fills
        an attribute the CLI left unset, so explicit CLI flags win.

    Args:
        args: Namespace returned by ``parse_args``; mutated in place.
    """
    if not args.preset:
        return
    for name in args.preset:
        preset = PRESETS.get(name, {})
        if not preset:
            continue
        for key, value in preset.items():
            if key in ("suite", "tests"):
                if not value:
                    continue
                existing = getattr(args, key)
                if existing is None:
                    setattr(args, key, list(value))
                else:
                    existing.extend(value)
            elif key == "env":
                args.env = (args.env or []) + list(value)
            elif key == "pytest_args":
                # filter(None, ...) drops the empty string when no CLI value.
                args.pytest_args = " ".join(filter(None, [value, args.pytest_args or ""]))
            else:
                # Scalar keys (including "keyword", which previously had a
                # redundant dedicated branch with identical behavior).
                if getattr(args, key, None) is None:
                    setattr(args, key, value)
def apply_env(args: argparse.Namespace) -> Dict[str, str]:
    """Translate CLI options into environment variables and export them.

    Args:
        args: Parsed CLI namespace (``mode``, ``db_dsn``, ``json_archive``,
            ``json_temp`` and repeatable ``env`` KEY=VALUE entries).

    Returns:
        The mapping of environment variables that were overridden.

    Raises:
        SystemExit: If an ``--env`` entry is not of the form KEY=VALUE.
    """
    overrides: Dict[str, str] = {}
    option_to_var = (
        (args.mode, "TEST_MODE"),
        (args.db_dsn, "TEST_DB_DSN"),
        (args.json_archive, "TEST_JSON_ARCHIVE_DIR"),
        (args.json_temp, "TEST_JSON_TEMP_DIR"),
    )
    for value, var in option_to_var:
        if value:
            overrides[var] = value
    for item in args.env or []:
        key, sep, value = item.partition("=")
        if not sep:
            raise SystemExit(f"--env 参数格式错误: {item!r},应为 KEY=VALUE")
        overrides[key.strip()] = value.strip()
    # Export everything at once; callers also get the mapping for logging.
    os.environ.update(overrides)
    return overrides
def build_pytest_args(args: argparse.Namespace) -> List[str]:
    """Assemble the argument vector handed to ``pytest.main``.

    Targets come from ``--suite`` aliases (via ``SUITE_MAP``) plus any raw
    ``--tests`` paths; with no selection the online + offline suites run.
    ``-k`` and extra ``--pytest-args`` flags are appended after the targets.
    """
    selected: List[str] = [SUITE_MAP[name] for name in (args.suite or [])]
    if args.tests:
        selected += list(args.tests)
    if not selected:
        # Nothing chosen explicitly: default to both unit-test suites.
        selected = [SUITE_MAP["online"], SUITE_MAP["offline"]]
    flags: List[str] = []
    if args.keyword:
        flags += ["-k", args.keyword]
    if args.pytest_args:
        flags += shlex.split(args.pytest_args)
    return selected + flags
def main() -> int:
    """Entry point: resolve presets, apply env overrides, run pytest.

    Returns:
        Process exit code (0 for --list-presets / --dry-run, otherwise the
        pytest exit code).
    """
    # Run from the repo root so relative test paths in SUITE_MAP resolve.
    os.chdir(PROJECT_ROOT)
    args = parse_args()
    if args.list_presets:
        print("可用预置命令:")
        if not PRESETS:
            print("(暂无,可编辑 scripts/test_presets.py 添加)")
        else:
            for name in sorted(PRESETS):
                print(f"- {name}")
        return 0
    # Presets first (CLI wins), then env vars derived from the merged args.
    apply_presets_to_args(args)
    env_updates = apply_env(args)
    pytest_args = build_pytest_args(args)
    print("=== 环境变量覆盖 ===")
    if env_updates:
        for k, v in env_updates.items():
            print(f"{k}={v}")
    else:
        print("(无覆盖,沿用系统默认)")
    print("\n=== Pytest 参数 ===")
    print(" ".join(pytest_args))
    print()
    if args.dry_run:
        print("Dry-run 模式,未真正执行 pytest")
        return 0
    exit_code = pytest.main(pytest_args)
    return int(exit_code)


if __name__ == "__main__":
    sys.exit(main())

View File

@@ -0,0 +1,166 @@
# -*- coding: utf-8 -*-
"""测试命令“仓库”,集中维护 run_tests.py 的预置组合。
支持的参数键(可在 PRESETS 中自由组合):
1. suite
- 类型:列表
- 作用:引用 run_tests 中的预置套件,值可为 online / offline / integration 等。
- 用法:["online"] 仅跑在线模式;["online","offline"] 同时跑两套;["integration"] 跑数据库集成测试。
2. tests
- 类型:列表
- 作用:传入任意 pytest 目标路径,适合补充临时/自定义测试文件。
- 用法:["tests/unit/test_config.py","tests/unit/test_parsers.py"]。
3. mode
- 类型:字符串
- 取值ONLINE 或 OFFLINE。
- 作用:覆盖 TEST_MODEONLINE 走 API 全流程OFFLINE 读取 JSON 归档执行 T+L。
4. db_dsn
- 类型:字符串
- 作用:设置 TEST_DB_DSN指定真实 PostgreSQL 连接;缺省时测试引擎使用伪 DB仅记录写入不触库。
- 示例postgresql://user:pwd@localhost:5432/testdb。
5. json_archive / json_temp
- 类型:字符串
- 作用:离线模式的 JSON 归档目录 / 临时输出目录。
- 说明:不设置时沿用 .env 或默认值;仅在 OFFLINE 模式需要关注。
6. keyword
- 类型:字符串
- 作用:等价 pytest -k用于筛选测试名/节点。
- 示例:"ORDERS" 可只运行包含该关键字的测试函数。
7. pytest_args
- 类型:字符串
- 作用:附加 pytest 命令行参数。
- 示例:"-vv --maxfail=1 --disable-warnings"
8. env
- 类型:列表
- 作用:追加环境变量,形如 ["STORE_ID=123","API_TOKEN=xxx"],会在 run_tests 内透传给 os.environ。
9. preset_meta
- 类型:字符串
- 作用:纯注释信息,便于描述该预置组合的用途,不会传递给 run_tests。
运行方式建议直接 F5或 `python scripts/test_presets.py`),脚本将读取 AUTO_RUN_PRESETS 中的配置依次执行。
如需临时指定其它预置,可传入 `--preset xxx``--list` 用于查看所有参数说明和预置详情。
"""
from __future__ import annotations
import argparse
import os
import subprocess
import sys
from typing import List
RUN_TESTS_SCRIPT = os.path.join(os.path.dirname(__file__), "run_tests.py")
AUTO_RUN_PRESETS = ["online_orders"]
# PRESETS = {
# "online_orders": {
# "suite": ["online"],
# "mode": "ONLINE",
# "keyword": "ORDERS",
# "pytest_args": "-vv",
# "preset_meta": "在线模式,仅跑订单任务,输出更详细日志",
# },
# "offline_realdb": {
# "suite": ["offline"],
# "mode": "OFFLINE",
# "db_dsn": "postgresql://user:pwd@localhost:5432/testdb",
# "json_archive": "tests/testdata_json",
# "preset_meta": "离线模式 + 真实测试库,用预置 JSON 回放全量任务",
# },
# "integration_db": {
# "suite": ["integration"],
# "db_dsn": "postgresql://user:pwd@localhost:5432/testdb",
# "preset_meta": "仅跑数据库连接/操作相关的集成测试",
# },
# }
# Active preset definitions consumed by run_tests.py via --preset.
# SECURITY NOTE(review): the DSN below embeds a real username/password in
# version control — move the credential into .env / an environment variable
# and reference it here instead of hard-coding it.
PRESETS = {
    "offline_realdb": {
        "suite": ["offline"],
        "mode": "OFFLINE",
        "db_dsn": "postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test",
        "json_archive": "tests/testdata_json",
        "preset_meta": "离线模式 + 真实测试库,用预置 JSON 回放全量任务",
    },
}
def print_parameter_help():
    """Print a short reference for every key a preset dict may contain."""
    for line in (
        "可用参数键说明:",
        " suite -> 预置测试套件列表,如 ['online','offline']",
        " tests -> 自定义测试文件路径列表",
        " mode -> TEST_MODEONLINE / OFFLINE",
        " db_dsn -> TEST_DB_DSN连接真实 PostgreSQL",
        " json_archive -> TEST_JSON_ARCHIVE_DIR离线 JSON 目录",
        " json_temp -> TEST_JSON_TEMP_DIR离线临时目录",
        " keyword -> pytest -k 过滤关键字",
        " pytest_args -> 额外 pytest 参数(单个字符串)",
        " env -> 附加环境变量,形如 ['KEY=VALUE']",
        " preset_meta -> 注释说明,不会传给 run_tests",
        "",
    ):
        print(line)
def print_presets():
    """Pretty-print every configured preset with its description and payload."""
    if not PRESETS:
        print("当前没有定义任何预置命令,可自行在 PRESETS 中添加。")
        return
    index = 0
    for name, payload in PRESETS.items():
        index += 1
        print(f"{index}. {name}")
        note = payload.get("preset_meta", "")
        if note:
            print(f" 说明: {note}")
        # preset_meta is documentation only and is not forwarded to run_tests.
        for key, value in payload.items():
            if key != "preset_meta":
                print(f" {key}: {value}")
        print()
def run_presets(preset_names: List[str], dry_run: bool):
    """Run run_tests.py once per preset name, or only echo commands in dry-run."""
    commands = [
        [sys.executable, RUN_TESTS_SCRIPT, "--preset", name]
        for name in preset_names
    ]
    for command in commands:
        rendered = " ".join(command)
        if dry_run:
            print(f"[Dry-Run] {rendered}")
            continue
        print(f"\n>>> 执行: {rendered}")
        # check=False: a failing preset should not abort the remaining ones.
        subprocess.run(command, check=False)
def main():
    """CLI entry point: list presets, or run the selected / auto presets."""
    parser = argparse.ArgumentParser(description="测试预置仓库(在此集中配置并运行测试组合)")
    parser.add_argument("--preset", choices=sorted(PRESETS.keys()), nargs="+", help="直接指定要运行的预置命令")
    parser.add_argument("--list", action="store_true", help="仅列出参数键和所有预置命令")
    parser.add_argument("--dry-run", action="store_true", help="仅打印将要执行的命令,而不真正运行")
    args = parser.parse_args()
    if args.list:
        print_parameter_help()
        print_presets()
        return
    # Explicit --preset wins; otherwise fall back to AUTO_RUN_PRESETS,
    # and finally to every defined preset.
    selected = args.preset if args.preset else (AUTO_RUN_PRESETS or list(PRESETS.keys()))
    run_presets(selected, dry_run=args.dry_run)
# Script entry point: execute the configured presets.
if __name__ == "__main__":
    main()

View File

@@ -0,0 +1,83 @@
# -*- coding: utf-8 -*-
"""助教作废任务"""
import json
from .base_task import BaseTask
from loaders.facts.assistant_abolish import AssistantAbolishLoader
from models.parsers import TypeParser
class AssistantAbolishTask(BaseTask):
    """Synchronize voided (abolished) assistant service records."""

    def get_task_code(self) -> str:
        """Return the unique task code for this task."""
        return "ASSISTANT_ABOLISH"

    def execute(self) -> dict:
        """Fetch abolish records in the current window, upsert, and report counts.

        Commits on success; rolls back and re-raises on any failure.
        """
        self.logger.info("开始执行 ASSISTANT_ABOLISH 任务")
        window_start, window_end, _ = self._get_time_window()
        params = {
            "storeId": self.config.get("app.store_id"),
            "startTime": TypeParser.format_timestamp(window_start, self.tz),
            "endTime": TypeParser.format_timestamp(window_end, self.tz),
        }
        try:
            records, _ = self.api.get_paginated(
                endpoint="/Assistant/AbolishList",
                params=params,
                page_size=self.config.get("api.page_size", 200),
                data_path=("data", "abolitionAssistants"),
            )
            # Keep only records the parser accepts (id present).
            parsed = [m for m in (self._parse_record(r) for r in records) if m]
            loader = AssistantAbolishLoader(self.db)
            inserted, updated, skipped = loader.upsert_records(parsed)
            self.db.commit()
            counts = {
                "fetched": len(records),
                "inserted": inserted,
                "updated": updated,
                "skipped": skipped,
                "errors": 0,
            }
            self.logger.info(f"ASSISTANT_ABOLISH 完成: {counts}")
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            self.logger.error("ASSISTANT_ABOLISH 失败", exc_info=True)
            raise

    def _parse_record(self, raw: dict) -> dict | None:
        """Map one raw API record to a row dict; return None when id is missing."""
        abolish_id = TypeParser.parse_int(raw.get("id"))
        if not abolish_id:
            self.logger.warning("跳过缺少 id 的助教作废记录: %s", raw)
            return None
        store_id = self.config.get("app.store_id")
        return {
            "store_id": store_id,
            "abolish_id": abolish_id,
            "table_id": TypeParser.parse_int(raw.get("tableId")),
            "table_name": raw.get("tableName"),
            "table_area_id": TypeParser.parse_int(raw.get("tableAreaId")),
            "table_area": raw.get("tableArea"),
            # FIX: the original read only "assistantOn", which looks like a typo —
            # sibling tasks consistently read the camelCase key "assistantNo".
            # Try the conventional key first and keep the old one as fallback.
            "assistant_no": raw.get("assistantNo") or raw.get("assistantOn"),
            "assistant_name": raw.get("assistantName"),
            "charge_minutes": TypeParser.parse_int(raw.get("pdChargeMinutes")),
            "abolish_amount": TypeParser.parse_decimal(
                raw.get("assistantAbolishAmount")
            ),
            "create_time": TypeParser.parse_timestamp(
                raw.get("createTime") or raw.get("create_time"), self.tz
            ),
            "trash_reason": raw.get("trashReason"),
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }

View File

@@ -0,0 +1,103 @@
# -*- coding: utf-8 -*-
"""助教账号任务"""
import json
from .base_task import BaseTask
from loaders.dimensions.assistant import AssistantLoader
from models.parsers import TypeParser
class AssistantsTask(BaseTask):
    """Synchronize assistant (staff) profile records."""

    def get_task_code(self) -> str:
        """Return the unique task code for this task."""
        return "ASSISTANTS"

    def execute(self) -> dict:
        """Fetch all assistant profiles, upsert them, and report counts."""
        self.logger.info("开始执行 ASSISTANTS 任务")
        query = {"storeId": self.config.get("app.store_id")}
        try:
            rows, _ = self.api.get_paginated(
                endpoint="/Assistant/List",
                params=query,
                page_size=self.config.get("api.page_size", 200),
                data_path=("data", "assistantInfos"),
            )
            # Discard records the parser rejects (missing id).
            parsed = [m for m in (self._parse_assistant(r) for r in rows) if m]
            inserted, updated, skipped = AssistantLoader(self.db).upsert_assistants(parsed)
            self.db.commit()
            counts = {
                "fetched": len(rows),
                "inserted": inserted,
                "updated": updated,
                "skipped": skipped,
                "errors": 0,
            }
            self.logger.info(f"ASSISTANTS 完成: {counts}")
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            self.logger.error("ASSISTANTS 失败", exc_info=True)
            raise

    def _parse_assistant(self, raw: dict) -> dict | None:
        """Map one raw API record to a row dict; None when id is missing.

        Most fields accept both snake_case and camelCase keys, since the
        upstream API is inconsistent about naming.
        """
        assistant_id = TypeParser.parse_int(raw.get("id"))
        if not assistant_id:
            self.logger.warning("跳过缺少 id 的助教数据: %s", raw)
            return None
        return {
            "store_id": self.config.get("app.store_id"),
            "assistant_id": assistant_id,
            "assistant_no": raw.get("assistant_no") or raw.get("assistantNo"),
            "nickname": raw.get("nickname"),
            "real_name": raw.get("real_name") or raw.get("realName"),
            "gender": raw.get("gender"),
            "mobile": raw.get("mobile"),
            "level": raw.get("level"),
            "team_id": TypeParser.parse_int(raw.get("team_id") or raw.get("teamId")),
            "team_name": raw.get("team_name"),
            "assistant_status": raw.get("assistant_status"),
            "work_status": raw.get("work_status"),
            "entry_time": TypeParser.parse_timestamp(raw.get("entry_time") or raw.get("entryTime"), self.tz),
            "resign_time": TypeParser.parse_timestamp(raw.get("resign_time") or raw.get("resignTime"), self.tz),
            "start_time": TypeParser.parse_timestamp(raw.get("start_time") or raw.get("startTime"), self.tz),
            "end_time": TypeParser.parse_timestamp(raw.get("end_time") or raw.get("endTime"), self.tz),
            "create_time": TypeParser.parse_timestamp(raw.get("create_time") or raw.get("createTime"), self.tz),
            "update_time": TypeParser.parse_timestamp(raw.get("update_time") or raw.get("updateTime"), self.tz),
            "system_role_id": raw.get("system_role_id"),
            "online_status": raw.get("online_status"),
            "allow_cx": raw.get("allow_cx"),
            "charge_way": raw.get("charge_way"),
            "pd_unit_price": TypeParser.parse_decimal(raw.get("pd_unit_price")),
            "cx_unit_price": TypeParser.parse_decimal(raw.get("cx_unit_price")),
            "is_guaranteed": raw.get("is_guaranteed"),
            "is_team_leader": raw.get("is_team_leader"),
            "serial_number": raw.get("serial_number"),
            "show_sort": raw.get("show_sort"),
            "is_delete": raw.get("is_delete"),
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }

View File

@@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
"""平台券核销任务"""
import json
from .base_task import BaseTask
from loaders.facts.coupon_usage import CouponUsageLoader
from models.parsers import TypeParser
class CouponUsageTask(BaseTask):
    """Synchronize platform coupon verification / redemption records."""

    def get_task_code(self) -> str:
        """Return the unique task code for this task."""
        return "COUPON_USAGE"

    def execute(self) -> dict:
        """Fetch coupon-usage rows in the current window and upsert them."""
        self.logger.info("开始执行 COUPON_USAGE 任务")
        window_start, window_end, _ = self._get_time_window()
        query = {
            "storeId": self.config.get("app.store_id"),
            "startTime": TypeParser.format_timestamp(window_start, self.tz),
            "endTime": TypeParser.format_timestamp(window_end, self.tz),
        }
        try:
            rows, _ = self.api.get_paginated(
                endpoint="/Coupon/UsageList",
                params=query,
                page_size=self.config.get("api.page_size", 200),
                data_path=(),
            )
            parsed = [m for m in (self._parse_usage(r) for r in rows) if m]
            inserted, updated, skipped = CouponUsageLoader(self.db).upsert_coupon_usage(parsed)
            self.db.commit()
            counts = {
                "fetched": len(rows),
                "inserted": inserted,
                "updated": updated,
                "skipped": skipped,
                "errors": 0,
            }
            self.logger.info(f"COUPON_USAGE 完成: {counts}")
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            self.logger.error("COUPON_USAGE 失败", exc_info=True)
            raise

    def _parse_usage(self, raw: dict) -> dict | None:
        """Map one raw API record to a row dict; None when id is missing."""
        usage_id = TypeParser.parse_int(raw.get("id"))
        if not usage_id:
            self.logger.warning("跳过缺少 id 的券核销记录: %s", raw)
            return None
        return {
            "store_id": self.config.get("app.store_id"),
            "usage_id": usage_id,
            "coupon_code": raw.get("coupon_code"),
            "coupon_channel": raw.get("coupon_channel"),
            "coupon_name": raw.get("coupon_name"),
            "sale_price": TypeParser.parse_decimal(raw.get("sale_price")),
            "coupon_money": TypeParser.parse_decimal(raw.get("coupon_money")),
            "coupon_free_time": TypeParser.parse_int(raw.get("coupon_free_time")),
            "use_status": raw.get("use_status"),
            "create_time": TypeParser.parse_timestamp(raw.get("create_time") or raw.get("createTime"), self.tz),
            "consume_time": TypeParser.parse_timestamp(raw.get("consume_time") or raw.get("consumeTime"), self.tz),
            "operator_id": TypeParser.parse_int(raw.get("operator_id")),
            "operator_name": raw.get("operator_name"),
            "table_id": TypeParser.parse_int(raw.get("table_id")),
            "site_order_id": TypeParser.parse_int(raw.get("site_order_id")),
            "group_package_id": TypeParser.parse_int(raw.get("group_package_id")),
            "coupon_remark": raw.get("coupon_remark"),
            "deal_id": raw.get("deal_id"),
            "certificate_id": raw.get("certificate_id"),
            "verify_id": raw.get("verify_id"),
            "is_delete": raw.get("is_delete"),
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }

View File

@@ -0,0 +1,90 @@
# -*- coding: utf-8 -*-
"""库存变更任务"""
import json
from .base_task import BaseTask
from loaders.facts.inventory_change import InventoryChangeLoader
from models.parsers import TypeParser
class InventoryChangeTask(BaseTask):
    """Synchronize inventory (stock) change records."""

    def get_task_code(self) -> str:
        """Return the unique task code for this task."""
        return "INVENTORY_CHANGE"

    def execute(self) -> dict:
        """Fetch stock changes in the current window and upsert them."""
        self.logger.info("开始执行 INVENTORY_CHANGE 任务")
        window_start, window_end, _ = self._get_time_window()
        query = {
            "storeId": self.config.get("app.store_id"),
            "startTime": TypeParser.format_timestamp(window_start, self.tz),
            "endTime": TypeParser.format_timestamp(window_end, self.tz),
        }
        try:
            rows, _ = self.api.get_paginated(
                endpoint="/Inventory/ChangeList",
                params=query,
                page_size=self.config.get("api.page_size", 200),
                data_path=("data", "queryDeliveryRecordsList"),
            )
            parsed = [m for m in (self._parse_change(r) for r in rows) if m]
            inserted, updated, skipped = InventoryChangeLoader(self.db).upsert_changes(parsed)
            self.db.commit()
            counts = {
                "fetched": len(rows),
                "inserted": inserted,
                "updated": updated,
                "skipped": skipped,
                "errors": 0,
            }
            self.logger.info(f"INVENTORY_CHANGE 完成: {counts}")
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            self.logger.error("INVENTORY_CHANGE 失败", exc_info=True)
            raise

    def _parse_change(self, raw: dict) -> dict | None:
        """Map one raw stock-change record; None when the change id is missing."""
        change_id = TypeParser.parse_int(
            raw.get("siteGoodsStockId") or raw.get("site_goods_stock_id")
        )
        if not change_id:
            self.logger.warning("跳过缺少变动 id 的库存记录: %s", raw)
            return None
        return {
            "store_id": self.config.get("app.store_id"),
            "change_id": change_id,
            "site_goods_id": TypeParser.parse_int(raw.get("siteGoodsId") or raw.get("site_goods_id")),
            "stock_type": raw.get("stockType") or raw.get("stock_type"),
            "goods_name": raw.get("goodsName"),
            "change_time": TypeParser.parse_timestamp(raw.get("createTime") or raw.get("create_time"), self.tz),
            "start_qty": TypeParser.parse_int(raw.get("startNum")),
            "end_qty": TypeParser.parse_int(raw.get("endNum")),
            "change_qty": TypeParser.parse_int(raw.get("changeNum")),
            "unit": raw.get("unit"),
            "price": TypeParser.parse_decimal(raw.get("price")),
            "operator_name": raw.get("operatorName"),
            "remark": raw.get("remark"),
            "goods_category_id": TypeParser.parse_int(raw.get("goodsCategoryId")),
            "goods_second_category_id": TypeParser.parse_int(raw.get("goodsSecondCategoryId")),
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }

View File

@@ -0,0 +1,119 @@
# -*- coding: utf-8 -*-
"""助教流水任务"""
import json
from .base_task import BaseTask
from loaders.facts.assistant_ledger import AssistantLedgerLoader
from models.parsers import TypeParser
class LedgerTask(BaseTask):
    """Synchronize assistant service ledger entries."""

    def get_task_code(self) -> str:
        """Return the unique task code for this task."""
        return "LEDGER"

    def execute(self) -> dict:
        """Fetch ledger rows in the current window and upsert them."""
        self.logger.info("开始执行 LEDGER 任务")
        window_start, window_end, _ = self._get_time_window()
        query = {
            "storeId": self.config.get("app.store_id"),
            "startTime": TypeParser.format_timestamp(window_start, self.tz),
            "endTime": TypeParser.format_timestamp(window_end, self.tz),
        }
        try:
            rows, _ = self.api.get_paginated(
                endpoint="/Assistant/LedgerList",
                params=query,
                page_size=self.config.get("api.page_size", 200),
                data_path=("data", "orderAssistantDetails"),
            )
            parsed = [m for m in (self._parse_ledger(r) for r in rows) if m]
            inserted, updated, skipped = AssistantLedgerLoader(self.db).upsert_ledgers(parsed)
            self.db.commit()
            counts = {
                "fetched": len(rows),
                "inserted": inserted,
                "updated": updated,
                "skipped": skipped,
                "errors": 0,
            }
            self.logger.info(f"LEDGER 完成: {counts}")
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            self.logger.error("LEDGER 失败", exc_info=True)
            raise

    def _parse_ledger(self, raw: dict) -> dict | None:
        """Map one raw ledger record to a row dict; None when id is missing."""
        ledger_id = TypeParser.parse_int(raw.get("id"))
        if not ledger_id:
            self.logger.warning("跳过缺少 id 的助教流水: %s", raw)
            return None
        return {
            "store_id": self.config.get("app.store_id"),
            "ledger_id": ledger_id,
            "assistant_no": raw.get("assistantNo"),
            "assistant_name": raw.get("assistantName"),
            "nickname": raw.get("nickname"),
            "level_name": raw.get("levelName"),
            "table_name": raw.get("tableName"),
            "ledger_unit_price": TypeParser.parse_decimal(raw.get("ledger_unit_price")),
            "ledger_count": TypeParser.parse_int(raw.get("ledger_count")),
            "ledger_amount": TypeParser.parse_decimal(raw.get("ledger_amount")),
            "projected_income": TypeParser.parse_decimal(raw.get("projected_income")),
            "service_money": TypeParser.parse_decimal(raw.get("service_money")),
            "member_discount_amount": TypeParser.parse_decimal(raw.get("member_discount_amount")),
            "manual_discount_amount": TypeParser.parse_decimal(raw.get("manual_discount_amount")),
            "coupon_deduct_money": TypeParser.parse_decimal(raw.get("coupon_deduct_money")),
            "order_trade_no": TypeParser.parse_int(raw.get("order_trade_no")),
            "order_settle_id": TypeParser.parse_int(raw.get("order_settle_id")),
            "operator_id": TypeParser.parse_int(raw.get("operator_id")),
            "operator_name": raw.get("operator_name"),
            "assistant_team_id": TypeParser.parse_int(raw.get("assistant_team_id")),
            "assistant_level": raw.get("assistant_level"),
            "site_table_id": TypeParser.parse_int(raw.get("site_table_id")),
            "order_assistant_id": TypeParser.parse_int(raw.get("order_assistant_id")),
            "site_assistant_id": TypeParser.parse_int(raw.get("site_assistant_id")),
            "user_id": TypeParser.parse_int(raw.get("user_id")),
            "ledger_start_time": TypeParser.parse_timestamp(raw.get("ledger_start_time"), self.tz),
            "ledger_end_time": TypeParser.parse_timestamp(raw.get("ledger_end_time"), self.tz),
            "start_use_time": TypeParser.parse_timestamp(raw.get("start_use_time"), self.tz),
            "last_use_time": TypeParser.parse_timestamp(raw.get("last_use_time"), self.tz),
            "income_seconds": TypeParser.parse_int(raw.get("income_seconds")),
            "real_use_seconds": TypeParser.parse_int(raw.get("real_use_seconds")),
            "is_trash": raw.get("is_trash"),
            "trash_reason": raw.get("trash_reason"),
            "is_confirm": raw.get("is_confirm"),
            "ledger_status": raw.get("ledger_status"),
            "create_time": TypeParser.parse_timestamp(raw.get("create_time") or raw.get("createTime"), self.tz),
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }

View File

@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
"""团购/套餐定义任务"""
import json
from .base_task import BaseTask
from loaders.dimensions.package import PackageDefinitionLoader
from models.parsers import TypeParser
class PackagesDefTask(BaseTask):
    """Synchronize group-buy / package definitions."""

    def get_task_code(self) -> str:
        """Return the unique task code for this task."""
        return "PACKAGES_DEF"

    def execute(self) -> dict:
        """Fetch all package definitions, upsert them, and report counts."""
        self.logger.info("开始执行 PACKAGES_DEF 任务")
        query = {"storeId": self.config.get("app.store_id")}
        try:
            rows, _ = self.api.get_paginated(
                endpoint="/Package/List",
                params=query,
                page_size=self.config.get("api.page_size", 200),
                data_path=("data", "packageCouponList"),
            )
            parsed = [m for m in (self._parse_package(r) for r in rows) if m]
            inserted, updated, skipped = PackageDefinitionLoader(self.db).upsert_packages(parsed)
            self.db.commit()
            counts = {
                "fetched": len(rows),
                "inserted": inserted,
                "updated": updated,
                "skipped": skipped,
                "errors": 0,
            }
            self.logger.info(f"PACKAGES_DEF 完成: {counts}")
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            self.logger.error("PACKAGES_DEF 失败", exc_info=True)
            raise

    def _parse_package(self, raw: dict) -> dict | None:
        """Map one raw package record to a row dict; None when id is missing."""
        package_id = TypeParser.parse_int(raw.get("id"))
        if not package_id:
            self.logger.warning("跳过缺少 id 的套餐数据: %s", raw)
            return None
        return {
            "store_id": self.config.get("app.store_id"),
            "package_id": package_id,
            "package_code": raw.get("package_id") or raw.get("packageId"),
            "package_name": raw.get("package_name"),
            # NOTE(review): unlike sibling tasks, table_area_id is stored raw
            # (not TypeParser.parse_int) — confirm this is intentional.
            "table_area_id": raw.get("table_area_id"),
            "table_area_name": raw.get("table_area_name"),
            "selling_price": TypeParser.parse_decimal(raw.get("selling_price") or raw.get("sellingPrice")),
            "duration_seconds": TypeParser.parse_int(raw.get("duration")),
            "start_time": TypeParser.parse_timestamp(raw.get("start_time") or raw.get("startTime"), self.tz),
            "end_time": TypeParser.parse_timestamp(raw.get("end_time") or raw.get("endTime"), self.tz),
            "type": raw.get("type"),
            "is_enabled": raw.get("is_enabled"),
            "is_delete": raw.get("is_delete"),
            "usable_count": TypeParser.parse_int(raw.get("usable_count")),
            "creator_name": raw.get("creator_name"),
            "date_type": raw.get("date_type"),
            "group_type": raw.get("group_type"),
            "coupon_money": TypeParser.parse_decimal(raw.get("coupon_money") or raw.get("couponMoney")),
            "area_tag_type": raw.get("area_tag_type"),
            "system_group_type": raw.get("system_group_type"),
            "card_type_ids": raw.get("card_type_ids"),
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }

View File

@@ -0,0 +1,91 @@
# -*- coding: utf-8 -*-
"""退款记录任务"""
import json
from .base_task import BaseTask
from loaders.facts.refund import RefundLoader
from models.parsers import TypeParser
class RefundsTask(BaseTask):
    """Synchronize payment refund records."""

    def get_task_code(self) -> str:
        """Return the unique task code for this task."""
        return "REFUNDS"

    def execute(self) -> dict:
        """Fetch refund rows in the current window and upsert them."""
        self.logger.info("开始执行 REFUNDS 任务")
        window_start, window_end, _ = self._get_time_window()
        query = {
            "storeId": self.config.get("app.store_id"),
            "startTime": TypeParser.format_timestamp(window_start, self.tz),
            "endTime": TypeParser.format_timestamp(window_end, self.tz),
        }
        try:
            rows, _ = self.api.get_paginated(
                endpoint="/Pay/RefundList",
                params=query,
                page_size=self.config.get("api.page_size", 200),
                data_path=(),
            )
            parsed = [m for m in (self._parse_refund(r) for r in rows) if m]
            inserted, updated, skipped = RefundLoader(self.db).upsert_refunds(parsed)
            self.db.commit()
            counts = {
                "fetched": len(rows),
                "inserted": inserted,
                "updated": updated,
                "skipped": skipped,
                "errors": 0,
            }
            self.logger.info(f"REFUNDS 完成: {counts}")
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            self.logger.error("REFUNDS 失败", exc_info=True)
            raise

    def _parse_refund(self, raw: dict) -> dict | None:
        """Map one raw refund record to a row dict; None when id is missing."""
        refund_id = TypeParser.parse_int(raw.get("id"))
        if not refund_id:
            self.logger.warning("跳过缺少 id 的退款记录: %s", raw)
            return None
        return {
            "store_id": self.config.get("app.store_id"),
            "refund_id": refund_id,
            "site_id": TypeParser.parse_int(raw.get("site_id") or raw.get("siteId")),
            "tenant_id": TypeParser.parse_int(raw.get("tenant_id") or raw.get("tenantId")),
            "pay_amount": TypeParser.parse_decimal(raw.get("pay_amount")),
            "pay_status": raw.get("pay_status"),
            "pay_time": TypeParser.parse_timestamp(raw.get("pay_time") or raw.get("payTime"), self.tz),
            "create_time": TypeParser.parse_timestamp(raw.get("create_time") or raw.get("createTime"), self.tz),
            "relate_type": raw.get("relate_type"),
            "relate_id": TypeParser.parse_int(raw.get("relate_id")),
            "payment_method": raw.get("payment_method"),
            "refund_amount": TypeParser.parse_decimal(raw.get("refund_amount")),
            "action_type": raw.get("action_type"),
            "pay_terminal": raw.get("pay_terminal"),
            "operator_id": TypeParser.parse_int(raw.get("operator_id")),
            "channel_pay_no": raw.get("channel_pay_no"),
            "channel_fee": TypeParser.parse_decimal(raw.get("channel_fee")),
            "is_delete": raw.get("is_delete"),
            "member_id": TypeParser.parse_int(raw.get("member_id")),
            "member_card_id": TypeParser.parse_int(raw.get("member_card_id")),
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }

View File

@@ -0,0 +1,92 @@
# -*- coding: utf-8 -*-
"""台费折扣任务"""
import json
from .base_task import BaseTask
from loaders.facts.table_discount import TableDiscountLoader
from models.parsers import TypeParser
class TableDiscountTask(BaseTask):
    """Synchronize table-fee discount / adjustment records."""

    def get_task_code(self) -> str:
        """Return the unique task code for this task."""
        return "TABLE_DISCOUNT"

    def execute(self) -> dict:
        """Fetch table-fee adjustments in the current window and upsert them."""
        self.logger.info("开始执行 TABLE_DISCOUNT 任务")
        window_start, window_end, _ = self._get_time_window()
        query = {
            "storeId": self.config.get("app.store_id"),
            "startTime": TypeParser.format_timestamp(window_start, self.tz),
            "endTime": TypeParser.format_timestamp(window_end, self.tz),
        }
        try:
            rows, _ = self.api.get_paginated(
                endpoint="/Table/AdjustList",
                params=query,
                page_size=self.config.get("api.page_size", 200),
                data_path=("data", "taiFeeAdjustInfos"),
            )
            parsed = [m for m in (self._parse_discount(r) for r in rows) if m]
            inserted, updated, skipped = TableDiscountLoader(self.db).upsert_discounts(parsed)
            self.db.commit()
            counts = {
                "fetched": len(rows),
                "inserted": inserted,
                "updated": updated,
                "skipped": skipped,
                "errors": 0,
            }
            self.logger.info(f"TABLE_DISCOUNT 完成: {counts}")
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            self.logger.error("TABLE_DISCOUNT 失败", exc_info=True)
            raise

    def _parse_discount(self, raw: dict) -> dict | None:
        """Map one raw adjustment record to a row dict; None when id is missing.

        Table identifiers fall back to the embedded ``tableProfile`` object
        when absent at the top level.
        """
        discount_id = TypeParser.parse_int(raw.get("id"))
        if not discount_id:
            self.logger.warning("跳过缺少 id 的台费折扣记录: %s", raw)
            return None
        profile = raw.get("tableProfile") or {}
        return {
            "store_id": self.config.get("app.store_id"),
            "discount_id": discount_id,
            "adjust_type": raw.get("adjust_type") or raw.get("adjustType"),
            "applicant_id": TypeParser.parse_int(raw.get("applicant_id")),
            "applicant_name": raw.get("applicant_name"),
            "operator_id": TypeParser.parse_int(raw.get("operator_id")),
            "operator_name": raw.get("operator_name"),
            "ledger_amount": TypeParser.parse_decimal(raw.get("ledger_amount")),
            "ledger_count": TypeParser.parse_int(raw.get("ledger_count")),
            "ledger_name": raw.get("ledger_name"),
            "ledger_status": raw.get("ledger_status"),
            "order_settle_id": TypeParser.parse_int(raw.get("order_settle_id")),
            "order_trade_no": TypeParser.parse_int(raw.get("order_trade_no")),
            "site_table_id": TypeParser.parse_int(raw.get("site_table_id") or profile.get("id")),
            "table_area_id": TypeParser.parse_int(raw.get("tableAreaId") or profile.get("site_table_area_id")),
            "table_area_name": profile.get("site_table_area_name"),
            "create_time": TypeParser.parse_timestamp(raw.get("create_time") or raw.get("createTime"), self.tz),
            "is_delete": raw.get("is_delete"),
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }

View File

@@ -1,4 +1,92 @@
# -*- coding: utf-8 -*-
"""Table (billiard table) profile sync task."""
# FIX: this file contained an unresolved merge conflict
# (<<<<<<< HEAD / ======= / >>>>>>> main), which is a SyntaxError. The HEAD
# side was an invalid stub (bare `def` lines, duplicated signature); the
# complete implementation from `main` is kept.
import json
from .base_task import BaseTask
from loaders.dimensions.table import TableLoader
from models.parsers import TypeParser


class TablesTask(BaseTask):
    """Synchronize the store's table list."""

    def get_task_code(self) -> str:
        """Return the unique task code for this task."""
        return "TABLES"

    def execute(self) -> dict:
        """Fetch all tables, upsert them, and report counts.

        Commits on success; rolls back and re-raises on any failure.
        """
        self.logger.info("开始执行 TABLES 任务")
        params = {"storeId": self.config.get("app.store_id")}
        try:
            records, _ = self.api.get_paginated(
                endpoint="/Table/GetSiteTables",
                params=params,
                page_size=self.config.get("api.page_size", 200),
                data_path=("data", "siteTables"),
            )
            # Keep only records the parser accepts (id present).
            parsed = [m for m in (self._parse_table(r) for r in records) if m]
            loader = TableLoader(self.db)
            inserted, updated, skipped = loader.upsert_tables(parsed)
            self.db.commit()
            counts = {
                "fetched": len(records),
                "inserted": inserted,
                "updated": updated,
                "skipped": skipped,
                "errors": 0,
            }
            self.logger.info(f"TABLES 完成: {counts}")
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            self.logger.error("TABLES 失败", exc_info=True)
            raise

    def _parse_table(self, raw: dict) -> dict | None:
        """Map one raw table record to a row dict; None when id is missing."""
        table_id = TypeParser.parse_int(raw.get("id"))
        if not table_id:
            self.logger.warning("跳过缺少 table_id 的台桌记录: %s", raw)
            return None
        store_id = self.config.get("app.store_id")
        return {
            "store_id": store_id,
            "table_id": table_id,
            "site_id": TypeParser.parse_int(raw.get("site_id") or raw.get("siteId")),
            "area_id": TypeParser.parse_int(
                raw.get("site_table_area_id") or raw.get("siteTableAreaId")
            ),
            "area_name": raw.get("areaName") or raw.get("site_table_area_name"),
            "table_name": raw.get("table_name") or raw.get("tableName"),
            "table_price": TypeParser.parse_decimal(
                raw.get("table_price") or raw.get("tablePrice")
            ),
            "table_status": raw.get("table_status") or raw.get("tableStatus"),
            "table_status_name": raw.get("tableStatusName"),
            "light_status": raw.get("light_status"),
            "is_rest_area": raw.get("is_rest_area"),
            "show_status": raw.get("show_status"),
            "virtual_table": raw.get("virtual_table"),
            "charge_free": raw.get("charge_free"),
            "only_allow_groupon": raw.get("only_allow_groupon"),
            "is_online_reservation": raw.get("is_online_reservation"),
            "created_time": TypeParser.parse_timestamp(
                raw.get("create_time") or raw.get("createTime"), self.tz
            ),
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }

View File

@@ -0,0 +1,102 @@
# -*- coding: utf-8 -*-
"""充值记录任务"""
import json
from .base_task import BaseTask
from loaders.facts.topup import TopupLoader
from models.parsers import TypeParser
class TopupsTask(BaseTask):
    """Synchronize stored-value top-up settlement records."""

    def get_task_code(self) -> str:
        """Return the unique task code for this task."""
        return "TOPUPS"

    def execute(self) -> dict:
        """Fetch top-up settlements in the current window and upsert them."""
        self.logger.info("开始执行 TOPUPS 任务")
        window_start, window_end, _ = self._get_time_window()
        query = {
            "storeId": self.config.get("app.store_id"),
            "startTime": TypeParser.format_timestamp(window_start, self.tz),
            "endTime": TypeParser.format_timestamp(window_end, self.tz),
        }
        try:
            rows, _ = self.api.get_paginated(
                endpoint="/Topup/SettleList",
                params=query,
                page_size=self.config.get("api.page_size", 200),
                data_path=("data", "settleList"),
            )
            parsed = [m for m in (self._parse_topup(r) for r in rows) if m]
            inserted, updated, skipped = TopupLoader(self.db).upsert_topups(parsed)
            self.db.commit()
            counts = {
                "fetched": len(rows),
                "inserted": inserted,
                "updated": updated,
                "skipped": skipped,
                "errors": 0,
            }
            self.logger.info(f"TOPUPS 完成: {counts}")
            return self._build_result("SUCCESS", counts)
        except Exception:
            self.db.rollback()
            self.logger.error("TOPUPS 失败", exc_info=True)
            raise

    def _parse_topup(self, raw: dict) -> dict | None:
        """Map one raw settlement record to a row dict; None when id is missing.

        Some responses nest the fields under a ``settleList`` dict; read from
        that node when present, but always archive the full outer record.
        """
        node = raw.get("settleList") if isinstance(raw.get("settleList"), dict) else raw
        topup_id = TypeParser.parse_int(node.get("id"))
        if not topup_id:
            self.logger.warning("跳过缺少 id 的充值结算: %s", raw)
            return None
        return {
            "store_id": self.config.get("app.store_id"),
            "topup_id": topup_id,
            "member_id": TypeParser.parse_int(node.get("memberId")),
            "member_name": node.get("memberName"),
            "member_phone": node.get("memberPhone"),
            "card_id": TypeParser.parse_int(node.get("tenantMemberCardId")),
            "card_type_name": node.get("memberCardTypeName"),
            "pay_amount": TypeParser.parse_decimal(node.get("payAmount")),
            "consume_money": TypeParser.parse_decimal(node.get("consumeMoney")),
            "settle_status": node.get("settleStatus"),
            "settle_type": node.get("settleType"),
            "settle_name": node.get("settleName"),
            "settle_relate_id": TypeParser.parse_int(node.get("settleRelateId")),
            "pay_time": TypeParser.parse_timestamp(node.get("payTime") or node.get("pay_time"), self.tz),
            "create_time": TypeParser.parse_timestamp(node.get("createTime") or node.get("create_time"), self.tz),
            "operator_id": TypeParser.parse_int(node.get("operatorId")),
            "operator_name": node.get("operatorName"),
            "payment_method": node.get("paymentMethod"),
            "refund_amount": TypeParser.parse_decimal(node.get("refundAmount")),
            "cash_amount": TypeParser.parse_decimal(node.get("cashAmount")),
            "card_amount": TypeParser.parse_decimal(node.get("cardAmount")),
            "balance_amount": TypeParser.parse_decimal(node.get("balanceAmount")),
            "online_amount": TypeParser.parse_decimal(node.get("onlineAmount")),
            "rounding_amount": TypeParser.parse_decimal(node.get("roundingAmount")),
            "adjust_amount": TypeParser.parse_decimal(node.get("adjustAmount")),
            "goods_money": TypeParser.parse_decimal(node.get("goodsMoney")),
            "table_charge_money": TypeParser.parse_decimal(node.get("tableChargeMoney")),
            "service_money": TypeParser.parse_decimal(node.get("serviceMoney")),
            "coupon_amount": TypeParser.parse_decimal(node.get("couponAmount")),
            "order_remark": node.get("orderRemark"),
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }

View File

@@ -0,0 +1,638 @@
# -*- coding: utf-8 -*-
"""ETL 任务测试的共用辅助模块,涵盖在线/离线模式所需的伪造数据、客户端与配置等工具函数。"""
from __future__ import annotations
import json
import os
from contextlib import contextmanager
from dataclasses import dataclass
from pathlib import Path
from typing import Dict, List, Sequence, Tuple, Type
from config.settings import AppConfig
from database.connection import DatabaseConnection
from database.operations import DatabaseOperations as PgDBOperations
from tasks.assistant_abolish_task import AssistantAbolishTask
from tasks.assistants_task import AssistantsTask
from tasks.coupon_usage_task import CouponUsageTask
from tasks.inventory_change_task import InventoryChangeTask
from tasks.ledger_task import LedgerTask
from tasks.members_task import MembersTask
from tasks.orders_task import OrdersTask
from tasks.packages_task import PackagesDefTask
from tasks.payments_task import PaymentsTask
from tasks.products_task import ProductsTask
from tasks.refunds_task import RefundsTask
from tasks.table_discount_task import TableDiscountTask
from tasks.tables_task import TablesTask
from tasks.topups_task import TopupsTask
# Store id shared by every fixture and config override in these tests.
DEFAULT_STORE_ID = 2790685415443269
# Common window boundaries reused by the sample records below.
BASE_TS = "2025-01-01 10:00:00"
END_TS = "2025-01-01 12:00:00"
@dataclass(frozen=True)
class TaskSpec:
    """Metadata describing how one ETL task is driven during tests.

    Bundles the task code, the task class under test, the API endpoint it
    pulls from, the key path to the record list inside the response body,
    and the canned sample records used as fixtures.
    """

    # Short task code, e.g. "PRODUCTS".
    code: str
    # Task class that will be instantiated and executed.
    task_cls: Type
    # API endpoint the task fetches from.
    endpoint: str
    # Keys to walk inside the response payload to reach the record list.
    data_path: Tuple[str, ...]
    # Fixture rows replayed by the fake/offline API clients.
    sample_records: List[Dict]

    @property
    def archive_filename(self) -> str:
        """Name of the archived JSON file used by offline replay."""
        return endpoint_to_filename(self.endpoint)
def endpoint_to_filename(endpoint: str) -> str:
    """Derive a stable, reusable filename from an API endpoint.

    Slashes become double underscores, spaces become single underscores and
    the result is lower-cased, so offline mode can locate the archived JSON
    deterministically. An empty endpoint maps to ``root.json``.
    """
    cleaned = endpoint.strip("/")
    stem = cleaned.replace("/", "__").replace(" ", "_").lower()
    if not stem:
        stem = "root"
    return f"{stem}.json"
def wrap_records(records: List[Dict], data_path: Sequence[str]):
    """Nest *records* under each key of *data_path*, outermost key first.

    Produces the same structure a real API response would have, which lets
    the offline client replay the list through the normal unwrapping logic.
    """
    wrapped = records
    for idx in range(len(data_path) - 1, -1, -1):
        wrapped = {data_path[idx]: wrapped}
    return wrapped
def create_test_config(mode: str, archive_dir: Path, temp_dir: Path) -> AppConfig:
    """Build an AppConfig suitable for tests.

    Fills in store, database, API, testing and io settings, points the
    archive/temp paths at the given directories, and guarantees both
    directories exist before the config is loaded.
    """
    archive_path = Path(archive_dir)
    temp_path = Path(temp_dir)
    for folder in (archive_path, temp_path):
        folder.mkdir(parents=True, exist_ok=True)
    overrides = {
        "app": {"store_id": DEFAULT_STORE_ID, "timezone": "Asia/Taipei"},
        "db": {"dsn": "postgresql://user:pass@localhost:5432/etl_billiards_test"},
        "api": {
            "base_url": "https://api.example.com",
            "token": "test-token",
            "timeout_sec": 3,
            "page_size": 50,
        },
        "testing": {
            "mode": mode,
            "json_archive_dir": str(archive_path),
            "temp_json_dir": str(temp_path),
        },
        "io": {
            "export_root": str(temp_path / "export"),
            "log_root": str(temp_path / "logs"),
        },
    }
    return AppConfig.load(overrides)
def dump_offline_payload(spec: TaskSpec, archive_dir: Path) -> Path:
    """Write the spec's sample data into *archive_dir* for offline replay.

    The records are wrapped to mirror the real API response shape and stored
    under the spec's canonical archive filename; the full path of the
    generated file is returned.
    """
    target = Path(archive_dir) / spec.archive_filename
    wrapped = wrap_records(spec.sample_records, spec.data_path)
    with target.open("w", encoding="utf-8") as handle:
        json.dump(wrapped, handle, ensure_ascii=False)
    return target
class FakeCursor:
    """Minimal cursor stub that records SQL and parameters.

    Supports the context-manager protocol so it can stand in wherever
    FakeDBOperations or SCD2Handler expect a real cursor.
    """

    def __init__(self, recorder: List[Dict]):
        self.recorder = recorder

    # pylint: disable=unused-argument
    def execute(self, sql: str, params=None):
        entry = {"sql": sql.strip(), "params": params}
        self.recorder.append(entry)

    def fetchone(self):
        # The stub never produces rows.
        return None

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        # Never suppress exceptions raised inside the ``with`` body.
        return False
class FakeConnection:
    """psycopg-like connection stub satisfying SCD2Handler's cursor needs.

    Every cursor it hands out appends into the shared ``statements`` list,
    so tests can inspect everything that was executed.
    """

    def __init__(self):
        self.statements: List[Dict] = []

    def cursor(self):
        # All cursors share one statement log on this connection.
        return FakeCursor(self.statements)
class FakeDBOperations:
    """In-memory stand-in for the DB operations layer.

    Intercepts batch upsert/execute calls without touching a real database
    and counts commits/rollbacks so tests can assert transaction behavior.
    """

    def __init__(self):
        self.upserts: List[Dict] = []
        self.commits = 0
        self.rollbacks = 0
        self.conn = FakeConnection()

    def _record(self, sql: str, rows: List[Dict], page_size: int):
        # Shared bookkeeping for both batch entry points.
        self.upserts.append({"sql": sql.strip(), "count": len(rows), "page_size": page_size})

    def batch_upsert_with_returning(self, sql: str, rows: List[Dict], page_size: int = 1000):
        self._record(sql, rows, page_size)
        return len(rows), 0

    def batch_execute(self, sql: str, rows: List[Dict], page_size: int = 1000):
        self._record(sql, rows, page_size)

    def commit(self):
        self.commits += 1

    def rollback(self):
        self.rollbacks += 1
class FakeAPIClient:
    """Online-mode API stub that replays canned in-memory data.

    Records every call so tests can verify that tasks pass the expected
    endpoint and parameters.
    """

    def __init__(self, data_map: Dict[str, List[Dict]]):
        self.data_map = data_map
        self.calls: List[Dict] = []

    # pylint: disable=unused-argument
    def get_paginated(self, endpoint: str, params=None, **kwargs):
        self.calls.append({"endpoint": endpoint, "params": params})
        try:
            records = self.data_map[endpoint]
        except KeyError:
            raise AssertionError(f"Missing fixture for endpoint {endpoint}")
        # Return a copy so callers cannot mutate the fixture in place.
        return list(records), [{"page": 1, "size": len(records)}]
class OfflineAPIClient:
    """Offline-mode API client that replays archived JSON files.

    Looks up the archive for the endpoint, walks ``data_path`` into the
    payload and returns the list found there (or an empty list when the
    structure does not match).
    """

    def __init__(self, file_map: Dict[str, Path]):
        self.file_map = {endpoint: Path(path) for endpoint, path in file_map.items()}
        self.calls: List[Dict] = []

    # pylint: disable=unused-argument
    def get_paginated(self, endpoint: str, params=None, page_size: int = 200, data_path: Tuple[str, ...] = (), **kwargs):
        self.calls.append({"endpoint": endpoint, "params": params})
        if endpoint not in self.file_map:
            raise AssertionError(f"Missing archive for endpoint {endpoint}")
        with self.file_map[endpoint].open("r", encoding="utf-8") as handle:
            node = json.load(handle)
        for key in data_path:
            if not isinstance(node, dict):
                # Path broken: behave as if no records were archived.
                node = []
                break
            node = node.get(key, [])
        if not isinstance(node, list):
            node = []
        return node, [{"page": 1, "mode": "offline"}]
class RealDBOperationsAdapter:
    """Adapter over a real PostgreSQL connection for integration tests.

    Provides the same batch upsert/execute plus commit/rollback surface the
    tasks expect, delegating everything to the project's DB layer.
    """

    def __init__(self, dsn: str):
        self._db_conn = DatabaseConnection(dsn)
        self._db_ops = PgDBOperations(self._db_conn)
        # SCD2Handler reaches into db.conn.cursor(), so expose the raw connection.
        self.conn = self._db_conn.conn

    def batch_upsert_with_returning(self, sql: str, rows: List[Dict], page_size: int = 1000):
        return self._db_ops.batch_upsert_with_returning(sql, rows, page_size=page_size)

    def batch_execute(self, sql: str, rows: List[Dict], page_size: int = 1000):
        return self._db_ops.batch_execute(sql, rows, page_size=page_size)

    def commit(self):
        self._db_conn.commit()

    def rollback(self):
        self._db_conn.rollback()

    def close(self):
        self._db_conn.close()
@contextmanager
def get_db_operations():
    """Yield a DB-operations object for tests.

    When TEST_DB_DSN is set, a real PostgreSQL adapter is yielded and closed
    on exit; otherwise an in-memory FakeDBOperations stub is used.
    """
    dsn = os.environ.get("TEST_DB_DSN")
    if not dsn:
        yield FakeDBOperations()
        return
    adapter = RealDBOperationsAdapter(dsn)
    try:
        yield adapter
    finally:
        # Always release the real connection, even if the test body raised.
        adapter.close()
# Registry of every ETL task exercised by the online/offline tests.  Each
# entry records the task code, the task class, the API endpoint it pulls
# from, the key path to the record list inside the response body, and one
# sample record used both as the fake API payload and the offline archive.
TASK_SPECS: List[TaskSpec] = [
    # Product catalogue sync.
    TaskSpec(
        code="PRODUCTS",
        task_cls=ProductsTask,
        endpoint="/TenantGoods/QueryTenantGoods",
        data_path=("data",),
        sample_records=[
            {
                "siteGoodsId": 101,
                "tenantGoodsId": 101,
                "goodsName": "测试球杆",
                "goodsCategoryId": 201,
                "categoryName": "器材",
                "goodsCategorySecondId": 202,
                "goodsUnit": "",
                "costPrice": "100.00",
                "goodsPrice": "150.00",
                "goodsState": "ON",
                "supplierId": 20,
                "barcode": "PRD001",
                "isCombo": False,
                "createTime": BASE_TS,
                "updateTime": END_TS,
            }
        ],
    ),
    # Billiard-table master data.
    TaskSpec(
        code="TABLES",
        task_cls=TablesTask,
        endpoint="/Table/GetSiteTables",
        data_path=("data", "siteTables"),
        sample_records=[
            {
                "id": 301,
                "site_id": 30,
                "site_table_area_id": 40,
                "areaName": "大厅",
                "table_name": "1号桌",
                "table_price": "50.00",
                "table_status": "FREE",
                "tableStatusName": "空闲",
                "light_status": "OFF",
                "is_rest_area": False,
                "show_status": True,
                "virtual_table": False,
                "charge_free": False,
                "only_allow_groupon": False,
                "is_online_reservation": True,
                "createTime": BASE_TS,
            }
        ],
    ),
    # Member profiles.
    TaskSpec(
        code="MEMBERS",
        task_cls=MembersTask,
        endpoint="/MemberProfile/GetTenantMemberList",
        data_path=("data",),
        sample_records=[
            {
                "memberId": 401,
                "memberName": "张三",
                "phone": "13800000000",
                "balance": "88.88",
                "status": "ACTIVE",
                "registerTime": BASE_TS,
            }
        ],
    ),
    # Assistant (staff) master data.
    TaskSpec(
        code="ASSISTANTS",
        task_cls=AssistantsTask,
        endpoint="/Assistant/List",
        data_path=("data", "assistantInfos"),
        sample_records=[
            {
                "id": 501,
                "assistant_no": "AS001",
                "nickname": "小李",
                "real_name": "李雷",
                "gender": "M",
                "mobile": "13900000000",
                "level": "A",
                "team_id": 10,
                "team_name": "先锋队",
                "assistant_status": "ON",
                "work_status": "BUSY",
                "entry_time": BASE_TS,
                "resign_time": END_TS,
                "start_time": BASE_TS,
                "end_time": END_TS,
                "create_time": BASE_TS,
                "update_time": END_TS,
                "system_role_id": 1,
                "online_status": "ONLINE",
                "allow_cx": True,
                "charge_way": "TIME",
                "pd_unit_price": "30.00",
                "cx_unit_price": "20.00",
                "is_guaranteed": True,
                "is_team_leader": False,
                "serial_number": "SN001",
                "show_sort": 1,
                "is_delete": False,
            }
        ],
    ),
    # Package/coupon definitions.
    TaskSpec(
        code="PACKAGES_DEF",
        task_cls=PackagesDefTask,
        endpoint="/Package/List",
        data_path=("data", "packageCouponList"),
        sample_records=[
            {
                "id": 601,
                "package_id": "PKG001",
                "package_name": "白天特惠",
                "table_area_id": 70,
                "table_area_name": "大厅",
                "selling_price": "199.00",
                "duration": 120,
                "start_time": BASE_TS,
                "end_time": END_TS,
                "type": "Groupon",
                "is_enabled": True,
                "is_delete": False,
                "usable_count": 3,
                "creator_name": "系统",
                "date_type": "WEEKDAY",
                "group_type": "DINE_IN",
                "coupon_money": "30.00",
                "area_tag_type": "VIP",
                "system_group_type": "BASIC",
                "card_type_ids": "1,2,3",
            }
        ],
    ),
    # Order headers.
    TaskSpec(
        code="ORDERS",
        task_cls=OrdersTask,
        endpoint="/order/list",
        data_path=("data",),
        sample_records=[
            {
                "orderId": 701,
                "orderNo": "ORD001",
                "memberId": 401,
                "tableId": 301,
                "orderTime": BASE_TS,
                "endTime": END_TS,
                "totalAmount": "300.00",
                "discountAmount": "20.00",
                "finalAmount": "280.00",
                "payStatus": "PAID",
                "orderStatus": "CLOSED",
                "remark": "测试订单",
            }
        ],
    ),
    # Payment records.
    TaskSpec(
        code="PAYMENTS",
        task_cls=PaymentsTask,
        endpoint="/pay/records",
        data_path=("data",),
        sample_records=[
            {
                "payId": 801,
                "orderId": 701,
                "payTime": END_TS,
                "payAmount": "280.00",
                "payType": "CARD",
                "payStatus": "SUCCESS",
                "remark": "测试支付",
            }
        ],
    ),
    # Refund records (response body is the list itself: empty data_path).
    TaskSpec(
        code="REFUNDS",
        task_cls=RefundsTask,
        endpoint="/Pay/RefundList",
        data_path=(),
        sample_records=[
            {
                "id": 901,
                "site_id": 1,
                "tenant_id": 2,
                "pay_amount": "100.00",
                "pay_status": "SUCCESS",
                "pay_time": END_TS,
                "create_time": END_TS,
                "relate_type": "ORDER",
                "relate_id": 701,
                "payment_method": "CARD",
                "refund_amount": "20.00",
                "action_type": "PARTIAL",
                "pay_terminal": "POS",
                "operator_id": 11,
                "channel_pay_no": "CH001",
                "channel_fee": "1.00",
                "is_delete": False,
                "member_id": 401,
                "member_card_id": 501,
            }
        ],
    ),
    # Coupon redemption records (top-level list: empty data_path).
    TaskSpec(
        code="COUPON_USAGE",
        task_cls=CouponUsageTask,
        endpoint="/Coupon/UsageList",
        data_path=(),
        sample_records=[
            {
                "id": 1001,
                "coupon_code": "CP001",
                "coupon_channel": "MEITUAN",
                "coupon_name": "双人券",
                "sale_price": "50.00",
                "coupon_money": "30.00",
                "coupon_free_time": 60,
                "use_status": "USED",
                "create_time": BASE_TS,
                "consume_time": END_TS,
                "operator_id": 11,
                "operator_name": "操作员",
                "table_id": 301,
                "site_order_id": 701,
                "group_package_id": 601,
                "coupon_remark": "备注",
                "deal_id": "DEAL001",
                "certificate_id": "CERT001",
                "verify_id": "VERIFY001",
                "is_delete": False,
            }
        ],
    ),
    # Inventory stock-change records.
    TaskSpec(
        code="INVENTORY_CHANGE",
        task_cls=InventoryChangeTask,
        endpoint="/Inventory/ChangeList",
        data_path=("data", "queryDeliveryRecordsList"),
        sample_records=[
            {
                "siteGoodsStockId": 1101,
                "siteGoodsId": 101,
                "stockType": "OUT",
                "goodsName": "测试球杆",
                "createTime": END_TS,
                "startNum": 10,
                "endNum": 8,
                "changeNum": -2,
                "unit": "",
                "price": "120.00",
                "operatorName": "仓管",
                "remark": "测试出库",
                "goodsCategoryId": 201,
                "goodsSecondCategoryId": 202,
            }
        ],
    ),
    # Member top-up settlements.
    TaskSpec(
        code="TOPUPS",
        task_cls=TopupsTask,
        endpoint="/Topup/SettleList",
        data_path=("data", "settleList"),
        sample_records=[
            {
                "id": 1201,
                "memberId": 401,
                "memberName": "张三",
                "memberPhone": "13800000000",
                "tenantMemberCardId": 1301,
                "memberCardTypeName": "金卡",
                "payAmount": "500.00",
                "consumeMoney": "100.00",
                "settleStatus": "DONE",
                "settleType": "AUTO",
                "settleName": "日结",
                "settleRelateId": 1501,
                "payTime": BASE_TS,
                "createTime": END_TS,
                "operatorId": 11,
                "operatorName": "收银员",
                "paymentMethod": "CASH",
                "refundAmount": "0",
                "cashAmount": "500.00",
                "cardAmount": "0",
                "balanceAmount": "0",
                "onlineAmount": "0",
                "roundingAmount": "0",
                "adjustAmount": "0",
                "goodsMoney": "0",
                "tableChargeMoney": "0",
                "serviceMoney": "0",
                "couponAmount": "0",
                "orderRemark": "首次充值",
            }
        ],
    ),
    # Table-fee adjustment (discount) records, with a nested table profile.
    TaskSpec(
        code="TABLE_DISCOUNT",
        task_cls=TableDiscountTask,
        endpoint="/Table/AdjustList",
        data_path=("data", "taiFeeAdjustInfos"),
        sample_records=[
            {
                "id": 1301,
                "adjust_type": "DISCOUNT",
                "applicant_id": 11,
                "applicant_name": "店长",
                "operator_id": 22,
                "operator_name": "值班",
                "ledger_amount": "50.00",
                "ledger_count": 2,
                "ledger_name": "调价",
                "ledger_status": "APPROVED",
                "order_settle_id": 7010,
                "order_trade_no": 8001,
                "site_table_id": 301,
                "create_time": END_TS,
                "is_delete": False,
                "tableProfile": {
                    "id": 301,
                    "site_table_area_id": 40,
                    "site_table_area_name": "大厅",
                },
            }
        ],
    ),
    # Abolished (voided) assistant service records.
    TaskSpec(
        code="ASSISTANT_ABOLISH",
        task_cls=AssistantAbolishTask,
        endpoint="/Assistant/AbolishList",
        data_path=("data", "abolitionAssistants"),
        sample_records=[
            {
                "id": 1401,
                "tableId": 301,
                "tableName": "1号桌",
                "tableAreaId": 40,
                "tableArea": "大厅",
                "assistantOn": "AS001",
                "assistantName": "小李",
                "pdChargeMinutes": 30,
                "assistantAbolishAmount": "15.00",
                "createTime": END_TS,
                "trashReason": "测试",
            }
        ],
    ),
    # Assistant ledger (earnings) detail records.
    TaskSpec(
        code="LEDGER",
        task_cls=LedgerTask,
        endpoint="/Assistant/LedgerList",
        data_path=("data", "orderAssistantDetails"),
        sample_records=[
            {
                "id": 1501,
                "assistantNo": "AS001",
                "assistantName": "小李",
                "nickname": "",
                "levelName": "L1",
                "tableName": "1号桌",
                "ledger_unit_price": "30.00",
                "ledger_count": 2,
                "ledger_amount": "60.00",
                "projected_income": "80.00",
                "service_money": "5.00",
                "member_discount_amount": "2.00",
                "manual_discount_amount": "1.00",
                "coupon_deduct_money": "3.00",
                "order_trade_no": 8001,
                "order_settle_id": 7010,
                "operator_id": 22,
                "operator_name": "值班",
                "assistant_team_id": 10,
                "assistant_level": "A",
                "site_table_id": 301,
                "order_assistant_id": 1601,
                "site_assistant_id": 501,
                "user_id": 5010,
                "ledger_start_time": BASE_TS,
                "ledger_end_time": END_TS,
                "start_use_time": BASE_TS,
                "last_use_time": END_TS,
                "income_seconds": 3600,
                "real_use_seconds": 3300,
                "is_trash": False,
                "trash_reason": "",
                "is_confirm": True,
                "ledger_status": "CLOSED",
                "create_time": END_TS,
            }
        ],
    ),
]

View File

@@ -0,0 +1,39 @@
# -*- coding: utf-8 -*-
"""离线模式任务测试,通过回放归档 JSON 来验证 T+L 链路可用。"""
import logging
from pathlib import Path
import pytest
from .task_test_utils import (
TASK_SPECS,
OfflineAPIClient,
create_test_config,
dump_offline_payload,
get_db_operations,
)
@pytest.mark.parametrize("spec", TASK_SPECS, ids=lambda spec: spec.code)
def test_task_offline_mode(spec, tmp_path):
    """Each task must replay its archived JSON and finish Transform + Load."""
    archive_dir = tmp_path / "archive"
    temp_dir = tmp_path / "tmp"
    archive_dir.mkdir()
    temp_dir.mkdir()
    archived_file = dump_offline_payload(spec, archive_dir)
    config = create_test_config("OFFLINE", archive_dir, temp_dir)
    replay_api = OfflineAPIClient({spec.endpoint: Path(archived_file)})
    logger = logging.getLogger(f"test_offline_{spec.code.lower()}")
    with get_db_operations() as db_ops:
        result = spec.task_cls(config, db_ops, replay_api, logger).execute()
        expected = len(spec.sample_records)
        assert result["status"] == "SUCCESS"
        assert result["counts"]["fetched"] == expected
        assert result["counts"]["inserted"] == expected
        if hasattr(db_ops, "commits"):
            # Only the fake DB exposes counters; assert exactly one commit.
            assert db_ops.commits == 1
            assert db_ops.rollbacks == 0

View File

@@ -0,0 +1,32 @@
# -*- coding: utf-8 -*-
"""在线模式下的端到端任务测试,验证所有任务在模拟 API 下能顺利执行。"""
import logging
import pytest
from .task_test_utils import (
TASK_SPECS,
FakeAPIClient,
create_test_config,
get_db_operations,
)
@pytest.mark.parametrize("spec", TASK_SPECS, ids=lambda spec: spec.code)
def test_task_online_mode(spec, tmp_path):
    """Every TaskSpec must complete the full ETL against the fake API."""
    config = create_test_config("ONLINE", tmp_path / "archive", tmp_path / "tmp")
    stub_api = FakeAPIClient({spec.endpoint: spec.sample_records})
    logger = logging.getLogger(f"test_online_{spec.code.lower()}")
    with get_db_operations() as db_ops:
        result = spec.task_cls(config, db_ops, stub_api, logger).execute()
        expected = len(spec.sample_records)
        assert result["status"] == "SUCCESS"
        assert result["counts"]["fetched"] == expected
        assert result["counts"]["inserted"] == expected
        if hasattr(db_ops, "commits"):
            # Counter attributes exist only on the fake DB stub.
            assert db_ops.commits == 1
            assert db_ops.rollbacks == 0