Compare commits
7 Commits
84e80841cd
...
821d302243
| Author | SHA1 | Date | |
|---|---|---|---|
| 821d302243 | |||
| 9a1df70a23 | |||
| 5bb5a8a568 | |||
| c3749474c6 | |||
| 7f87421678 | |||
| 13d853c3f5 | |||
| 56fac9d3c0 |
46
etl_billiards/.env
Normal file
46
etl_billiards/.env
Normal file
@@ -0,0 +1,46 @@
|
||||
# 数据库配置
|
||||
PG_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ
|
||||
# PG_HOST=localhost
|
||||
# PG_PORT=5432
|
||||
# PG_NAME=LLZQ
|
||||
# PG_USER=local-Python
|
||||
# PG_PASSWORD=your_password_here
|
||||
|
||||
# API配置(抓取时,非球的一些配置)
|
||||
API_BASE=https://api.example.com # 非球URL前缀
|
||||
API_TOKEN=your_token_here # 登录Token
|
||||
# API_TIMEOUT=20
|
||||
# API_PAGE_SIZE=200
|
||||
# API_RETRY_MAX=3
|
||||
|
||||
# 应用配置
|
||||
STORE_ID=2790685415443269
|
||||
# TIMEZONE=Asia/Taipei
|
||||
# SCHEMA_OLTP=billiards
|
||||
# SCHEMA_ETL=etl_admin
|
||||
|
||||
# 路径配置
|
||||
EXPORT_ROOT=r"D:\LLZQ\DB\export",
|
||||
LOG_ROOT=r"D:\LLZQ\DB\logs",
|
||||
|
||||
# ETL配置
|
||||
OVERLAP_SECONDS=120 # 为了防止边界遗漏,会往前“回拨”一点的冗余秒数
|
||||
WINDOW_BUSY_MIN=30
|
||||
WINDOW_IDLE_MIN=180
|
||||
IDLE_START=04:00
|
||||
IDLE_END=16:00
|
||||
ALLOW_EMPTY_ADVANCE=true
|
||||
|
||||
# 清洗配置
|
||||
LOG_UNKNOWN_FIELDS=true
|
||||
HASH_ALGO=sha1
|
||||
STRICT_NUMERIC=true
|
||||
ROUND_MONEY_SCALE=2
|
||||
|
||||
# 测试/离线模式
|
||||
TEST_MODE=OFFLINE
|
||||
TEST_JSON_ARCHIVE_DIR=tests/testdata_json #指定离线模式(OFFLINE)要读取的 JSON 归档目录。测试或回放任务时,会从这个目录中找到各个任务预先保存的 API 响应文件,直接做 Transform + Load,不再访问真实接口。
|
||||
TEST_JSON_TEMP_DIR=/tmp/etl_billiards_json_tmp #指定测试运行时临时生成或复制 JSON 文件的目录。在线/离线联动测试会把输出、中间文件等写到这个路径,既避免污染真实导出目录,也让 CI 可以在一次运行中隔离不同任务产生的临时数据。
|
||||
|
||||
# 测试数据库(留空则单元测试使用伪库)
|
||||
TEST_DB_DSN=postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test
|
||||
@@ -36,3 +36,14 @@ LOG_UNKNOWN_FIELDS=true
|
||||
HASH_ALGO=sha1
|
||||
STRICT_NUMERIC=true
|
||||
ROUND_MONEY_SCALE=2
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
|
||||
# 测试/离线模式
|
||||
TEST_MODE=ONLINE
|
||||
TEST_JSON_ARCHIVE_DIR=tests/testdata_json
|
||||
TEST_JSON_TEMP_DIR=/tmp/etl_billiards_json_tmp
|
||||
|
||||
# 测试数据库(可选:若设置则单元测试连入此 DSN)
|
||||
TEST_DB_DSN=
|
||||
>>>>>>> main
|
||||
|
||||
@@ -191,6 +191,37 @@ pytest --cov=. --cov-report=html
|
||||
- `tests/unit/test_parsers.py` – 解析器单元测试
|
||||
- `tests/integration/test_database.py` – 数据库集成测试
|
||||
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
#### 3.3.1 测试模式(ONLINE / OFFLINE)
|
||||
|
||||
- `TEST_MODE=ONLINE`(默认)时,测试会模拟实时 API,完整执行 E/T/L。
|
||||
- `TEST_MODE=OFFLINE` 时,测试改为从 `TEST_JSON_ARCHIVE_DIR` 指定的归档 JSON 中读取数据,仅做 Transform + Load,适合验证本地归档数据是否仍可回放。
|
||||
- `TEST_JSON_ARCHIVE_DIR`:离线 JSON 归档目录(示例:`tests/testdata_json` 或 CI 产出的快照)。
|
||||
- `TEST_JSON_TEMP_DIR`:测试生成的临时 JSON 输出目录,便于隔离每次运行的数据。
|
||||
- `TEST_DB_DSN`:可选,若设置则单元测试会连接到此 PostgreSQL DSN,实打实执行写库;留空时测试使用内存伪库,避免依赖数据库。
|
||||
|
||||
示例命令:
|
||||
|
||||
```bash
|
||||
# 在线模式覆盖所有任务
|
||||
TEST_MODE=ONLINE pytest tests/unit/test_etl_tasks_online.py
|
||||
|
||||
# 离线模式使用归档 JSON 覆盖所有任务
|
||||
TEST_MODE=OFFLINE TEST_JSON_ARCHIVE_DIR=tests/testdata_json pytest tests/unit/test_etl_tasks_offline.py
|
||||
|
||||
# 使用脚本按需组合参数(示例:在线 + 仅订单用例)
|
||||
python scripts/run_tests.py --suite online --mode ONLINE --keyword ORDERS
|
||||
|
||||
# 使用脚本连接真实测试库并回放离线模式
|
||||
python scripts/run_tests.py --suite offline --mode OFFLINE --db-dsn postgresql://user:pwd@localhost:5432/testdb
|
||||
|
||||
# 使用“指令仓库”中的预置命令
|
||||
python scripts/run_tests.py --preset offline_realdb
|
||||
python scripts/run_tests.py --list-presets # 查看或自定义 scripts/test_presets.py
|
||||
```
|
||||
|
||||
>>>>>>> main
|
||||
---
|
||||
|
||||
## 4. 项目结构与文件说明
|
||||
@@ -277,6 +308,11 @@ etl_billiards/
|
||||
│ │ ├── __init__.py
|
||||
│ │ ├── test_config.py
|
||||
│ │ └── test_parsers.py
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
│ ├── testdata_json/ # 清洗入库用的测试Json文件
|
||||
│ │ └── XX.json
|
||||
>>>>>>> main
|
||||
│ └── integration/ # 集成测试
|
||||
│ ├── __init__.py
|
||||
│ └── test_database.py
|
||||
|
||||
@@ -76,6 +76,17 @@ DEFAULTS = {
|
||||
"redact_keys": ["token", "password", "Authorization"],
|
||||
"echo_token_in_logs": False,
|
||||
},
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
"testing": {
|
||||
# ONLINE: 正常实时ETL;OFFLINE: 读取归档JSON做T/L
|
||||
"mode": "OFFLINE",
|
||||
# 离线归档JSON所在目录
|
||||
"json_archive_dir": "",
|
||||
# 测试运行时用于生成/复制临时JSON的目录
|
||||
"temp_json_dir": "",
|
||||
},
|
||||
>>>>>>> main
|
||||
}
|
||||
|
||||
# 任务代码常量
|
||||
|
||||
@@ -24,6 +24,12 @@ ENV_MAP = {
|
||||
"OVERLAP_SECONDS": ("run.overlap_seconds",),
|
||||
"WINDOW_BUSY_MIN": ("run.window_minutes.default_busy",),
|
||||
"WINDOW_IDLE_MIN": ("run.window_minutes.default_idle",),
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
"TEST_MODE": ("testing.mode",),
|
||||
"TEST_JSON_ARCHIVE_DIR": ("testing.json_archive_dir",),
|
||||
"TEST_JSON_TEMP_DIR": ("testing.temp_json_dir",),
|
||||
>>>>>>> main
|
||||
}
|
||||
|
||||
def _deep_set(d, dotted_keys, value):
|
||||
|
||||
114
etl_billiards/loaders/dimensions/assistant.py
Normal file
114
etl_billiards/loaders/dimensions/assistant.py
Normal file
@@ -0,0 +1,114 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""助教维度加载器"""
|
||||
|
||||
from ..base_loader import BaseLoader
|
||||
|
||||
|
||||
class AssistantLoader(BaseLoader):
|
||||
"""写入 dim_assistant"""
|
||||
|
||||
def upsert_assistants(self, records: list) -> tuple:
|
||||
if not records:
|
||||
return (0, 0, 0)
|
||||
|
||||
sql = """
|
||||
INSERT INTO billiards.dim_assistant (
|
||||
store_id,
|
||||
assistant_id,
|
||||
assistant_no,
|
||||
nickname,
|
||||
real_name,
|
||||
gender,
|
||||
mobile,
|
||||
level,
|
||||
team_id,
|
||||
team_name,
|
||||
assistant_status,
|
||||
work_status,
|
||||
entry_time,
|
||||
resign_time,
|
||||
start_time,
|
||||
end_time,
|
||||
create_time,
|
||||
update_time,
|
||||
system_role_id,
|
||||
online_status,
|
||||
allow_cx,
|
||||
charge_way,
|
||||
pd_unit_price,
|
||||
cx_unit_price,
|
||||
is_guaranteed,
|
||||
is_team_leader,
|
||||
serial_number,
|
||||
show_sort,
|
||||
is_delete,
|
||||
raw_data
|
||||
)
|
||||
VALUES (
|
||||
%(store_id)s,
|
||||
%(assistant_id)s,
|
||||
%(assistant_no)s,
|
||||
%(nickname)s,
|
||||
%(real_name)s,
|
||||
%(gender)s,
|
||||
%(mobile)s,
|
||||
%(level)s,
|
||||
%(team_id)s,
|
||||
%(team_name)s,
|
||||
%(assistant_status)s,
|
||||
%(work_status)s,
|
||||
%(entry_time)s,
|
||||
%(resign_time)s,
|
||||
%(start_time)s,
|
||||
%(end_time)s,
|
||||
%(create_time)s,
|
||||
%(update_time)s,
|
||||
%(system_role_id)s,
|
||||
%(online_status)s,
|
||||
%(allow_cx)s,
|
||||
%(charge_way)s,
|
||||
%(pd_unit_price)s,
|
||||
%(cx_unit_price)s,
|
||||
%(is_guaranteed)s,
|
||||
%(is_team_leader)s,
|
||||
%(serial_number)s,
|
||||
%(show_sort)s,
|
||||
%(is_delete)s,
|
||||
%(raw_data)s
|
||||
)
|
||||
ON CONFLICT (store_id, assistant_id) DO UPDATE SET
|
||||
assistant_no = EXCLUDED.assistant_no,
|
||||
nickname = EXCLUDED.nickname,
|
||||
real_name = EXCLUDED.real_name,
|
||||
gender = EXCLUDED.gender,
|
||||
mobile = EXCLUDED.mobile,
|
||||
level = EXCLUDED.level,
|
||||
team_id = EXCLUDED.team_id,
|
||||
team_name = EXCLUDED.team_name,
|
||||
assistant_status= EXCLUDED.assistant_status,
|
||||
work_status = EXCLUDED.work_status,
|
||||
entry_time = EXCLUDED.entry_time,
|
||||
resign_time = EXCLUDED.resign_time,
|
||||
start_time = EXCLUDED.start_time,
|
||||
end_time = EXCLUDED.end_time,
|
||||
update_time = COALESCE(EXCLUDED.update_time, now()),
|
||||
system_role_id = EXCLUDED.system_role_id,
|
||||
online_status = EXCLUDED.online_status,
|
||||
allow_cx = EXCLUDED.allow_cx,
|
||||
charge_way = EXCLUDED.charge_way,
|
||||
pd_unit_price = EXCLUDED.pd_unit_price,
|
||||
cx_unit_price = EXCLUDED.cx_unit_price,
|
||||
is_guaranteed = EXCLUDED.is_guaranteed,
|
||||
is_team_leader = EXCLUDED.is_team_leader,
|
||||
serial_number = EXCLUDED.serial_number,
|
||||
show_sort = EXCLUDED.show_sort,
|
||||
is_delete = EXCLUDED.is_delete,
|
||||
raw_data = EXCLUDED.raw_data,
|
||||
updated_at = now()
|
||||
RETURNING (xmax = 0) AS inserted
|
||||
"""
|
||||
|
||||
inserted, updated = self.db.batch_upsert_with_returning(
|
||||
sql, records, page_size=self._batch_size()
|
||||
)
|
||||
return (inserted, updated, 0)
|
||||
91
etl_billiards/loaders/dimensions/package.py
Normal file
91
etl_billiards/loaders/dimensions/package.py
Normal file
@@ -0,0 +1,91 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""团购/套餐定义加载器"""
|
||||
|
||||
from ..base_loader import BaseLoader
|
||||
|
||||
|
||||
class PackageDefinitionLoader(BaseLoader):
|
||||
"""写入 dim_package_coupon"""
|
||||
|
||||
def upsert_packages(self, records: list) -> tuple:
|
||||
if not records:
|
||||
return (0, 0, 0)
|
||||
|
||||
sql = """
|
||||
INSERT INTO billiards.dim_package_coupon (
|
||||
store_id,
|
||||
package_id,
|
||||
package_code,
|
||||
package_name,
|
||||
table_area_id,
|
||||
table_area_name,
|
||||
selling_price,
|
||||
duration_seconds,
|
||||
start_time,
|
||||
end_time,
|
||||
type,
|
||||
is_enabled,
|
||||
is_delete,
|
||||
usable_count,
|
||||
creator_name,
|
||||
date_type,
|
||||
group_type,
|
||||
coupon_money,
|
||||
area_tag_type,
|
||||
system_group_type,
|
||||
card_type_ids,
|
||||
raw_data
|
||||
)
|
||||
VALUES (
|
||||
%(store_id)s,
|
||||
%(package_id)s,
|
||||
%(package_code)s,
|
||||
%(package_name)s,
|
||||
%(table_area_id)s,
|
||||
%(table_area_name)s,
|
||||
%(selling_price)s,
|
||||
%(duration_seconds)s,
|
||||
%(start_time)s,
|
||||
%(end_time)s,
|
||||
%(type)s,
|
||||
%(is_enabled)s,
|
||||
%(is_delete)s,
|
||||
%(usable_count)s,
|
||||
%(creator_name)s,
|
||||
%(date_type)s,
|
||||
%(group_type)s,
|
||||
%(coupon_money)s,
|
||||
%(area_tag_type)s,
|
||||
%(system_group_type)s,
|
||||
%(card_type_ids)s,
|
||||
%(raw_data)s
|
||||
)
|
||||
ON CONFLICT (store_id, package_id) DO UPDATE SET
|
||||
package_code = EXCLUDED.package_code,
|
||||
package_name = EXCLUDED.package_name,
|
||||
table_area_id = EXCLUDED.table_area_id,
|
||||
table_area_name = EXCLUDED.table_area_name,
|
||||
selling_price = EXCLUDED.selling_price,
|
||||
duration_seconds = EXCLUDED.duration_seconds,
|
||||
start_time = EXCLUDED.start_time,
|
||||
end_time = EXCLUDED.end_time,
|
||||
type = EXCLUDED.type,
|
||||
is_enabled = EXCLUDED.is_enabled,
|
||||
is_delete = EXCLUDED.is_delete,
|
||||
usable_count = EXCLUDED.usable_count,
|
||||
creator_name = EXCLUDED.creator_name,
|
||||
date_type = EXCLUDED.date_type,
|
||||
group_type = EXCLUDED.group_type,
|
||||
coupon_money = EXCLUDED.coupon_money,
|
||||
area_tag_type = EXCLUDED.area_tag_type,
|
||||
system_group_type = EXCLUDED.system_group_type,
|
||||
card_type_ids = EXCLUDED.card_type_ids,
|
||||
raw_data = EXCLUDED.raw_data,
|
||||
updated_at = now()
|
||||
RETURNING (xmax = 0) AS inserted
|
||||
"""
|
||||
|
||||
inserted, updated = self.db.batch_upsert_with_returning(
|
||||
sql, records, page_size=self._batch_size()
|
||||
)
|
||||
return (inserted, updated, 0)
|
||||
@@ -0,0 +1,80 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""台桌维度加载器"""
|
||||
|
||||
from ..base_loader import BaseLoader
|
||||
|
||||
|
||||
class TableLoader(BaseLoader):
|
||||
"""将台桌档案写入 dim_table"""
|
||||
|
||||
def upsert_tables(self, records: list) -> tuple:
|
||||
"""批量写入台桌档案"""
|
||||
if not records:
|
||||
return (0, 0, 0)
|
||||
|
||||
sql = """
|
||||
INSERT INTO billiards.dim_table (
|
||||
store_id,
|
||||
table_id,
|
||||
site_id,
|
||||
area_id,
|
||||
area_name,
|
||||
table_name,
|
||||
table_price,
|
||||
table_status,
|
||||
table_status_name,
|
||||
light_status,
|
||||
is_rest_area,
|
||||
show_status,
|
||||
virtual_table,
|
||||
charge_free,
|
||||
only_allow_groupon,
|
||||
is_online_reservation,
|
||||
created_time,
|
||||
raw_data
|
||||
)
|
||||
VALUES (
|
||||
%(store_id)s,
|
||||
%(table_id)s,
|
||||
%(site_id)s,
|
||||
%(area_id)s,
|
||||
%(area_name)s,
|
||||
%(table_name)s,
|
||||
%(table_price)s,
|
||||
%(table_status)s,
|
||||
%(table_status_name)s,
|
||||
%(light_status)s,
|
||||
%(is_rest_area)s,
|
||||
%(show_status)s,
|
||||
%(virtual_table)s,
|
||||
%(charge_free)s,
|
||||
%(only_allow_groupon)s,
|
||||
%(is_online_reservation)s,
|
||||
%(created_time)s,
|
||||
%(raw_data)s
|
||||
)
|
||||
ON CONFLICT (store_id, table_id) DO UPDATE SET
|
||||
site_id = EXCLUDED.site_id,
|
||||
area_id = EXCLUDED.area_id,
|
||||
area_name = EXCLUDED.area_name,
|
||||
table_name = EXCLUDED.table_name,
|
||||
table_price = EXCLUDED.table_price,
|
||||
table_status = EXCLUDED.table_status,
|
||||
table_status_name = EXCLUDED.table_status_name,
|
||||
light_status = EXCLUDED.light_status,
|
||||
is_rest_area = EXCLUDED.is_rest_area,
|
||||
show_status = EXCLUDED.show_status,
|
||||
virtual_table = EXCLUDED.virtual_table,
|
||||
charge_free = EXCLUDED.charge_free,
|
||||
only_allow_groupon = EXCLUDED.only_allow_groupon,
|
||||
is_online_reservation = EXCLUDED.is_online_reservation,
|
||||
created_time = COALESCE(EXCLUDED.created_time, dim_table.created_time),
|
||||
raw_data = EXCLUDED.raw_data,
|
||||
updated_at = now()
|
||||
RETURNING (xmax = 0) AS inserted
|
||||
"""
|
||||
|
||||
inserted, updated = self.db.batch_upsert_with_returning(
|
||||
sql, records, page_size=self._batch_size()
|
||||
)
|
||||
return (inserted, updated, 0)
|
||||
|
||||
64
etl_billiards/loaders/facts/assistant_abolish.py
Normal file
64
etl_billiards/loaders/facts/assistant_abolish.py
Normal file
@@ -0,0 +1,64 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""助教作废事实表"""
|
||||
|
||||
from ..base_loader import BaseLoader
|
||||
|
||||
|
||||
class AssistantAbolishLoader(BaseLoader):
|
||||
"""写入 fact_assistant_abolish"""
|
||||
|
||||
def upsert_records(self, records: list) -> tuple:
|
||||
if not records:
|
||||
return (0, 0, 0)
|
||||
|
||||
sql = """
|
||||
INSERT INTO billiards.fact_assistant_abolish (
|
||||
store_id,
|
||||
abolish_id,
|
||||
table_id,
|
||||
table_name,
|
||||
table_area_id,
|
||||
table_area,
|
||||
assistant_no,
|
||||
assistant_name,
|
||||
charge_minutes,
|
||||
abolish_amount,
|
||||
create_time,
|
||||
trash_reason,
|
||||
raw_data
|
||||
)
|
||||
VALUES (
|
||||
%(store_id)s,
|
||||
%(abolish_id)s,
|
||||
%(table_id)s,
|
||||
%(table_name)s,
|
||||
%(table_area_id)s,
|
||||
%(table_area)s,
|
||||
%(assistant_no)s,
|
||||
%(assistant_name)s,
|
||||
%(charge_minutes)s,
|
||||
%(abolish_amount)s,
|
||||
%(create_time)s,
|
||||
%(trash_reason)s,
|
||||
%(raw_data)s
|
||||
)
|
||||
ON CONFLICT (store_id, abolish_id) DO UPDATE SET
|
||||
table_id = EXCLUDED.table_id,
|
||||
table_name = EXCLUDED.table_name,
|
||||
table_area_id = EXCLUDED.table_area_id,
|
||||
table_area = EXCLUDED.table_area,
|
||||
assistant_no = EXCLUDED.assistant_no,
|
||||
assistant_name = EXCLUDED.assistant_name,
|
||||
charge_minutes = EXCLUDED.charge_minutes,
|
||||
abolish_amount = EXCLUDED.abolish_amount,
|
||||
create_time = EXCLUDED.create_time,
|
||||
trash_reason = EXCLUDED.trash_reason,
|
||||
raw_data = EXCLUDED.raw_data,
|
||||
updated_at = now()
|
||||
RETURNING (xmax = 0) AS inserted
|
||||
"""
|
||||
|
||||
inserted, updated = self.db.batch_upsert_with_returning(
|
||||
sql, records, page_size=self._batch_size()
|
||||
)
|
||||
return (inserted, updated, 0)
|
||||
136
etl_billiards/loaders/facts/assistant_ledger.py
Normal file
136
etl_billiards/loaders/facts/assistant_ledger.py
Normal file
@@ -0,0 +1,136 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""助教流水事实表"""
|
||||
|
||||
from ..base_loader import BaseLoader
|
||||
|
||||
|
||||
class AssistantLedgerLoader(BaseLoader):
|
||||
"""写入 fact_assistant_ledger"""
|
||||
|
||||
def upsert_ledgers(self, records: list) -> tuple:
|
||||
if not records:
|
||||
return (0, 0, 0)
|
||||
|
||||
sql = """
|
||||
INSERT INTO billiards.fact_assistant_ledger (
|
||||
store_id,
|
||||
ledger_id,
|
||||
assistant_no,
|
||||
assistant_name,
|
||||
nickname,
|
||||
level_name,
|
||||
table_name,
|
||||
ledger_unit_price,
|
||||
ledger_count,
|
||||
ledger_amount,
|
||||
projected_income,
|
||||
service_money,
|
||||
member_discount_amount,
|
||||
manual_discount_amount,
|
||||
coupon_deduct_money,
|
||||
order_trade_no,
|
||||
order_settle_id,
|
||||
operator_id,
|
||||
operator_name,
|
||||
assistant_team_id,
|
||||
assistant_level,
|
||||
site_table_id,
|
||||
order_assistant_id,
|
||||
site_assistant_id,
|
||||
user_id,
|
||||
ledger_start_time,
|
||||
ledger_end_time,
|
||||
start_use_time,
|
||||
last_use_time,
|
||||
income_seconds,
|
||||
real_use_seconds,
|
||||
is_trash,
|
||||
trash_reason,
|
||||
is_confirm,
|
||||
ledger_status,
|
||||
create_time,
|
||||
raw_data
|
||||
)
|
||||
VALUES (
|
||||
%(store_id)s,
|
||||
%(ledger_id)s,
|
||||
%(assistant_no)s,
|
||||
%(assistant_name)s,
|
||||
%(nickname)s,
|
||||
%(level_name)s,
|
||||
%(table_name)s,
|
||||
%(ledger_unit_price)s,
|
||||
%(ledger_count)s,
|
||||
%(ledger_amount)s,
|
||||
%(projected_income)s,
|
||||
%(service_money)s,
|
||||
%(member_discount_amount)s,
|
||||
%(manual_discount_amount)s,
|
||||
%(coupon_deduct_money)s,
|
||||
%(order_trade_no)s,
|
||||
%(order_settle_id)s,
|
||||
%(operator_id)s,
|
||||
%(operator_name)s,
|
||||
%(assistant_team_id)s,
|
||||
%(assistant_level)s,
|
||||
%(site_table_id)s,
|
||||
%(order_assistant_id)s,
|
||||
%(site_assistant_id)s,
|
||||
%(user_id)s,
|
||||
%(ledger_start_time)s,
|
||||
%(ledger_end_time)s,
|
||||
%(start_use_time)s,
|
||||
%(last_use_time)s,
|
||||
%(income_seconds)s,
|
||||
%(real_use_seconds)s,
|
||||
%(is_trash)s,
|
||||
%(trash_reason)s,
|
||||
%(is_confirm)s,
|
||||
%(ledger_status)s,
|
||||
%(create_time)s,
|
||||
%(raw_data)s
|
||||
)
|
||||
ON CONFLICT (store_id, ledger_id) DO UPDATE SET
|
||||
assistant_no = EXCLUDED.assistant_no,
|
||||
assistant_name = EXCLUDED.assistant_name,
|
||||
nickname = EXCLUDED.nickname,
|
||||
level_name = EXCLUDED.level_name,
|
||||
table_name = EXCLUDED.table_name,
|
||||
ledger_unit_price = EXCLUDED.ledger_unit_price,
|
||||
ledger_count = EXCLUDED.ledger_count,
|
||||
ledger_amount = EXCLUDED.ledger_amount,
|
||||
projected_income = EXCLUDED.projected_income,
|
||||
service_money = EXCLUDED.service_money,
|
||||
member_discount_amount = EXCLUDED.member_discount_amount,
|
||||
manual_discount_amount = EXCLUDED.manual_discount_amount,
|
||||
coupon_deduct_money = EXCLUDED.coupon_deduct_money,
|
||||
order_trade_no = EXCLUDED.order_trade_no,
|
||||
order_settle_id = EXCLUDED.order_settle_id,
|
||||
operator_id = EXCLUDED.operator_id,
|
||||
operator_name = EXCLUDED.operator_name,
|
||||
assistant_team_id = EXCLUDED.assistant_team_id,
|
||||
assistant_level = EXCLUDED.assistant_level,
|
||||
site_table_id = EXCLUDED.site_table_id,
|
||||
order_assistant_id = EXCLUDED.order_assistant_id,
|
||||
site_assistant_id = EXCLUDED.site_assistant_id,
|
||||
user_id = EXCLUDED.user_id,
|
||||
ledger_start_time = EXCLUDED.ledger_start_time,
|
||||
ledger_end_time = EXCLUDED.ledger_end_time,
|
||||
start_use_time = EXCLUDED.start_use_time,
|
||||
last_use_time = EXCLUDED.last_use_time,
|
||||
income_seconds = EXCLUDED.income_seconds,
|
||||
real_use_seconds = EXCLUDED.real_use_seconds,
|
||||
is_trash = EXCLUDED.is_trash,
|
||||
trash_reason = EXCLUDED.trash_reason,
|
||||
is_confirm = EXCLUDED.is_confirm,
|
||||
ledger_status = EXCLUDED.ledger_status,
|
||||
create_time = EXCLUDED.create_time,
|
||||
raw_data = EXCLUDED.raw_data,
|
||||
updated_at = now()
|
||||
RETURNING (xmax = 0) AS inserted
|
||||
"""
|
||||
|
||||
inserted, updated = self.db.batch_upsert_with_returning(
|
||||
sql, records, page_size=self._batch_size()
|
||||
)
|
||||
return (inserted, updated, 0)
|
||||
91
etl_billiards/loaders/facts/coupon_usage.py
Normal file
91
etl_billiards/loaders/facts/coupon_usage.py
Normal file
@@ -0,0 +1,91 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""券核销事实表"""
|
||||
|
||||
from ..base_loader import BaseLoader
|
||||
|
||||
|
||||
class CouponUsageLoader(BaseLoader):
|
||||
"""写入 fact_coupon_usage"""
|
||||
|
||||
def upsert_coupon_usage(self, records: list) -> tuple:
|
||||
if not records:
|
||||
return (0, 0, 0)
|
||||
|
||||
sql = """
|
||||
INSERT INTO billiards.fact_coupon_usage (
|
||||
store_id,
|
||||
usage_id,
|
||||
coupon_code,
|
||||
coupon_channel,
|
||||
coupon_name,
|
||||
sale_price,
|
||||
coupon_money,
|
||||
coupon_free_time,
|
||||
use_status,
|
||||
create_time,
|
||||
consume_time,
|
||||
operator_id,
|
||||
operator_name,
|
||||
table_id,
|
||||
site_order_id,
|
||||
group_package_id,
|
||||
coupon_remark,
|
||||
deal_id,
|
||||
certificate_id,
|
||||
verify_id,
|
||||
is_delete,
|
||||
raw_data
|
||||
)
|
||||
VALUES (
|
||||
%(store_id)s,
|
||||
%(usage_id)s,
|
||||
%(coupon_code)s,
|
||||
%(coupon_channel)s,
|
||||
%(coupon_name)s,
|
||||
%(sale_price)s,
|
||||
%(coupon_money)s,
|
||||
%(coupon_free_time)s,
|
||||
%(use_status)s,
|
||||
%(create_time)s,
|
||||
%(consume_time)s,
|
||||
%(operator_id)s,
|
||||
%(operator_name)s,
|
||||
%(table_id)s,
|
||||
%(site_order_id)s,
|
||||
%(group_package_id)s,
|
||||
%(coupon_remark)s,
|
||||
%(deal_id)s,
|
||||
%(certificate_id)s,
|
||||
%(verify_id)s,
|
||||
%(is_delete)s,
|
||||
%(raw_data)s
|
||||
)
|
||||
ON CONFLICT (store_id, usage_id) DO UPDATE SET
|
||||
coupon_code = EXCLUDED.coupon_code,
|
||||
coupon_channel = EXCLUDED.coupon_channel,
|
||||
coupon_name = EXCLUDED.coupon_name,
|
||||
sale_price = EXCLUDED.sale_price,
|
||||
coupon_money = EXCLUDED.coupon_money,
|
||||
coupon_free_time = EXCLUDED.coupon_free_time,
|
||||
use_status = EXCLUDED.use_status,
|
||||
create_time = EXCLUDED.create_time,
|
||||
consume_time = EXCLUDED.consume_time,
|
||||
operator_id = EXCLUDED.operator_id,
|
||||
operator_name = EXCLUDED.operator_name,
|
||||
table_id = EXCLUDED.table_id,
|
||||
site_order_id = EXCLUDED.site_order_id,
|
||||
group_package_id = EXCLUDED.group_package_id,
|
||||
coupon_remark = EXCLUDED.coupon_remark,
|
||||
deal_id = EXCLUDED.deal_id,
|
||||
certificate_id = EXCLUDED.certificate_id,
|
||||
verify_id = EXCLUDED.verify_id,
|
||||
is_delete = EXCLUDED.is_delete,
|
||||
raw_data = EXCLUDED.raw_data,
|
||||
updated_at = now()
|
||||
RETURNING (xmax = 0) AS inserted
|
||||
"""
|
||||
|
||||
inserted, updated = self.db.batch_upsert_with_returning(
|
||||
sql, records, page_size=self._batch_size()
|
||||
)
|
||||
return (inserted, updated, 0)
|
||||
73
etl_billiards/loaders/facts/inventory_change.py
Normal file
73
etl_billiards/loaders/facts/inventory_change.py
Normal file
@@ -0,0 +1,73 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""库存变动事实表"""
|
||||
|
||||
from ..base_loader import BaseLoader
|
||||
|
||||
|
||||
class InventoryChangeLoader(BaseLoader):
|
||||
"""写入 fact_inventory_change"""
|
||||
|
||||
def upsert_changes(self, records: list) -> tuple:
|
||||
if not records:
|
||||
return (0, 0, 0)
|
||||
|
||||
sql = """
|
||||
INSERT INTO billiards.fact_inventory_change (
|
||||
store_id,
|
||||
change_id,
|
||||
site_goods_id,
|
||||
stock_type,
|
||||
goods_name,
|
||||
change_time,
|
||||
start_qty,
|
||||
end_qty,
|
||||
change_qty,
|
||||
unit,
|
||||
price,
|
||||
operator_name,
|
||||
remark,
|
||||
goods_category_id,
|
||||
goods_second_category_id,
|
||||
raw_data
|
||||
)
|
||||
VALUES (
|
||||
%(store_id)s,
|
||||
%(change_id)s,
|
||||
%(site_goods_id)s,
|
||||
%(stock_type)s,
|
||||
%(goods_name)s,
|
||||
%(change_time)s,
|
||||
%(start_qty)s,
|
||||
%(end_qty)s,
|
||||
%(change_qty)s,
|
||||
%(unit)s,
|
||||
%(price)s,
|
||||
%(operator_name)s,
|
||||
%(remark)s,
|
||||
%(goods_category_id)s,
|
||||
%(goods_second_category_id)s,
|
||||
%(raw_data)s
|
||||
)
|
||||
ON CONFLICT (store_id, change_id) DO UPDATE SET
|
||||
site_goods_id = EXCLUDED.site_goods_id,
|
||||
stock_type = EXCLUDED.stock_type,
|
||||
goods_name = EXCLUDED.goods_name,
|
||||
change_time = EXCLUDED.change_time,
|
||||
start_qty = EXCLUDED.start_qty,
|
||||
end_qty = EXCLUDED.end_qty,
|
||||
change_qty = EXCLUDED.change_qty,
|
||||
unit = EXCLUDED.unit,
|
||||
price = EXCLUDED.price,
|
||||
operator_name = EXCLUDED.operator_name,
|
||||
remark = EXCLUDED.remark,
|
||||
goods_category_id = EXCLUDED.goods_category_id,
|
||||
goods_second_category_id = EXCLUDED.goods_second_category_id,
|
||||
raw_data = EXCLUDED.raw_data,
|
||||
updated_at = now()
|
||||
RETURNING (xmax = 0) AS inserted
|
||||
"""
|
||||
|
||||
inserted, updated = self.db.batch_upsert_with_returning(
|
||||
sql, records, page_size=self._batch_size()
|
||||
)
|
||||
return (inserted, updated, 0)
|
||||
88
etl_billiards/loaders/facts/refund.py
Normal file
88
etl_billiards/loaders/facts/refund.py
Normal file
@@ -0,0 +1,88 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""退款事实表加载器"""
|
||||
|
||||
from ..base_loader import BaseLoader
|
||||
|
||||
|
||||
class RefundLoader(BaseLoader):
|
||||
"""写入 fact_refund"""
|
||||
|
||||
def upsert_refunds(self, records: list) -> tuple:
|
||||
if not records:
|
||||
return (0, 0, 0)
|
||||
|
||||
sql = """
|
||||
INSERT INTO billiards.fact_refund (
|
||||
store_id,
|
||||
refund_id,
|
||||
site_id,
|
||||
tenant_id,
|
||||
pay_amount,
|
||||
pay_status,
|
||||
pay_time,
|
||||
create_time,
|
||||
relate_type,
|
||||
relate_id,
|
||||
payment_method,
|
||||
refund_amount,
|
||||
action_type,
|
||||
pay_terminal,
|
||||
operator_id,
|
||||
channel_pay_no,
|
||||
channel_fee,
|
||||
is_delete,
|
||||
member_id,
|
||||
member_card_id,
|
||||
raw_data
|
||||
)
|
||||
VALUES (
|
||||
%(store_id)s,
|
||||
%(refund_id)s,
|
||||
%(site_id)s,
|
||||
%(tenant_id)s,
|
||||
%(pay_amount)s,
|
||||
%(pay_status)s,
|
||||
%(pay_time)s,
|
||||
%(create_time)s,
|
||||
%(relate_type)s,
|
||||
%(relate_id)s,
|
||||
%(payment_method)s,
|
||||
%(refund_amount)s,
|
||||
%(action_type)s,
|
||||
%(pay_terminal)s,
|
||||
%(operator_id)s,
|
||||
%(channel_pay_no)s,
|
||||
%(channel_fee)s,
|
||||
%(is_delete)s,
|
||||
%(member_id)s,
|
||||
%(member_card_id)s,
|
||||
%(raw_data)s
|
||||
)
|
||||
ON CONFLICT (store_id, refund_id) DO UPDATE SET
|
||||
site_id = EXCLUDED.site_id,
|
||||
tenant_id = EXCLUDED.tenant_id,
|
||||
pay_amount = EXCLUDED.pay_amount,
|
||||
pay_status = EXCLUDED.pay_status,
|
||||
pay_time = EXCLUDED.pay_time,
|
||||
create_time = EXCLUDED.create_time,
|
||||
relate_type = EXCLUDED.relate_type,
|
||||
relate_id = EXCLUDED.relate_id,
|
||||
payment_method = EXCLUDED.payment_method,
|
||||
refund_amount = EXCLUDED.refund_amount,
|
||||
action_type = EXCLUDED.action_type,
|
||||
pay_terminal = EXCLUDED.pay_terminal,
|
||||
operator_id = EXCLUDED.operator_id,
|
||||
channel_pay_no = EXCLUDED.channel_pay_no,
|
||||
channel_fee = EXCLUDED.channel_fee,
|
||||
is_delete = EXCLUDED.is_delete,
|
||||
member_id = EXCLUDED.member_id,
|
||||
member_card_id = EXCLUDED.member_card_id,
|
||||
raw_data = EXCLUDED.raw_data,
|
||||
updated_at = now()
|
||||
RETURNING (xmax = 0) AS inserted
|
||||
"""
|
||||
|
||||
inserted, updated = self.db.batch_upsert_with_returning(
|
||||
sql, records, page_size=self._batch_size()
|
||||
)
|
||||
return (inserted, updated, 0)
|
||||
82
etl_billiards/loaders/facts/table_discount.py
Normal file
82
etl_billiards/loaders/facts/table_discount.py
Normal file
@@ -0,0 +1,82 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""台费打折事实表"""
|
||||
|
||||
from ..base_loader import BaseLoader
|
||||
|
||||
|
||||
class TableDiscountLoader(BaseLoader):
|
||||
"""写入 fact_table_discount"""
|
||||
|
||||
def upsert_discounts(self, records: list) -> tuple:
|
||||
if not records:
|
||||
return (0, 0, 0)
|
||||
|
||||
sql = """
|
||||
INSERT INTO billiards.fact_table_discount (
|
||||
store_id,
|
||||
discount_id,
|
||||
adjust_type,
|
||||
applicant_id,
|
||||
applicant_name,
|
||||
operator_id,
|
||||
operator_name,
|
||||
ledger_amount,
|
||||
ledger_count,
|
||||
ledger_name,
|
||||
ledger_status,
|
||||
order_settle_id,
|
||||
order_trade_no,
|
||||
site_table_id,
|
||||
table_area_id,
|
||||
table_area_name,
|
||||
create_time,
|
||||
is_delete,
|
||||
raw_data
|
||||
)
|
||||
VALUES (
|
||||
%(store_id)s,
|
||||
%(discount_id)s,
|
||||
%(adjust_type)s,
|
||||
%(applicant_id)s,
|
||||
%(applicant_name)s,
|
||||
%(operator_id)s,
|
||||
%(operator_name)s,
|
||||
%(ledger_amount)s,
|
||||
%(ledger_count)s,
|
||||
%(ledger_name)s,
|
||||
%(ledger_status)s,
|
||||
%(order_settle_id)s,
|
||||
%(order_trade_no)s,
|
||||
%(site_table_id)s,
|
||||
%(table_area_id)s,
|
||||
%(table_area_name)s,
|
||||
%(create_time)s,
|
||||
%(is_delete)s,
|
||||
%(raw_data)s
|
||||
)
|
||||
ON CONFLICT (store_id, discount_id) DO UPDATE SET
|
||||
adjust_type = EXCLUDED.adjust_type,
|
||||
applicant_id = EXCLUDED.applicant_id,
|
||||
applicant_name = EXCLUDED.applicant_name,
|
||||
operator_id = EXCLUDED.operator_id,
|
||||
operator_name = EXCLUDED.operator_name,
|
||||
ledger_amount = EXCLUDED.ledger_amount,
|
||||
ledger_count = EXCLUDED.ledger_count,
|
||||
ledger_name = EXCLUDED.ledger_name,
|
||||
ledger_status = EXCLUDED.ledger_status,
|
||||
order_settle_id = EXCLUDED.order_settle_id,
|
||||
order_trade_no = EXCLUDED.order_trade_no,
|
||||
site_table_id = EXCLUDED.site_table_id,
|
||||
table_area_id = EXCLUDED.table_area_id,
|
||||
table_area_name = EXCLUDED.table_area_name,
|
||||
create_time = EXCLUDED.create_time,
|
||||
is_delete = EXCLUDED.is_delete,
|
||||
raw_data = EXCLUDED.raw_data,
|
||||
updated_at = now()
|
||||
RETURNING (xmax = 0) AS inserted
|
||||
"""
|
||||
|
||||
inserted, updated = self.db.batch_upsert_with_returning(
|
||||
sql, records, page_size=self._batch_size()
|
||||
)
|
||||
return (inserted, updated, 0)
|
||||
118
etl_billiards/loaders/facts/topup.py
Normal file
118
etl_billiards/loaders/facts/topup.py
Normal file
@@ -0,0 +1,118 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""充值记录事实表"""
|
||||
|
||||
from ..base_loader import BaseLoader
|
||||
|
||||
|
||||
class TopupLoader(BaseLoader):
|
||||
"""写入 fact_topup"""
|
||||
|
||||
def upsert_topups(self, records: list) -> tuple:
|
||||
if not records:
|
||||
return (0, 0, 0)
|
||||
|
||||
sql = """
|
||||
INSERT INTO billiards.fact_topup (
|
||||
store_id,
|
||||
topup_id,
|
||||
member_id,
|
||||
member_name,
|
||||
member_phone,
|
||||
card_id,
|
||||
card_type_name,
|
||||
pay_amount,
|
||||
consume_money,
|
||||
settle_status,
|
||||
settle_type,
|
||||
settle_name,
|
||||
settle_relate_id,
|
||||
pay_time,
|
||||
create_time,
|
||||
operator_id,
|
||||
operator_name,
|
||||
payment_method,
|
||||
refund_amount,
|
||||
cash_amount,
|
||||
card_amount,
|
||||
balance_amount,
|
||||
online_amount,
|
||||
rounding_amount,
|
||||
adjust_amount,
|
||||
goods_money,
|
||||
table_charge_money,
|
||||
service_money,
|
||||
coupon_amount,
|
||||
order_remark,
|
||||
raw_data
|
||||
)
|
||||
VALUES (
|
||||
%(store_id)s,
|
||||
%(topup_id)s,
|
||||
%(member_id)s,
|
||||
%(member_name)s,
|
||||
%(member_phone)s,
|
||||
%(card_id)s,
|
||||
%(card_type_name)s,
|
||||
%(pay_amount)s,
|
||||
%(consume_money)s,
|
||||
%(settle_status)s,
|
||||
%(settle_type)s,
|
||||
%(settle_name)s,
|
||||
%(settle_relate_id)s,
|
||||
%(pay_time)s,
|
||||
%(create_time)s,
|
||||
%(operator_id)s,
|
||||
%(operator_name)s,
|
||||
%(payment_method)s,
|
||||
%(refund_amount)s,
|
||||
%(cash_amount)s,
|
||||
%(card_amount)s,
|
||||
%(balance_amount)s,
|
||||
%(online_amount)s,
|
||||
%(rounding_amount)s,
|
||||
%(adjust_amount)s,
|
||||
%(goods_money)s,
|
||||
%(table_charge_money)s,
|
||||
%(service_money)s,
|
||||
%(coupon_amount)s,
|
||||
%(order_remark)s,
|
||||
%(raw_data)s
|
||||
)
|
||||
ON CONFLICT (store_id, topup_id) DO UPDATE SET
|
||||
member_id = EXCLUDED.member_id,
|
||||
member_name = EXCLUDED.member_name,
|
||||
member_phone = EXCLUDED.member_phone,
|
||||
card_id = EXCLUDED.card_id,
|
||||
card_type_name = EXCLUDED.card_type_name,
|
||||
pay_amount = EXCLUDED.pay_amount,
|
||||
consume_money = EXCLUDED.consume_money,
|
||||
settle_status = EXCLUDED.settle_status,
|
||||
settle_type = EXCLUDED.settle_type,
|
||||
settle_name = EXCLUDED.settle_name,
|
||||
settle_relate_id = EXCLUDED.settle_relate_id,
|
||||
pay_time = EXCLUDED.pay_time,
|
||||
create_time = EXCLUDED.create_time,
|
||||
operator_id = EXCLUDED.operator_id,
|
||||
operator_name = EXCLUDED.operator_name,
|
||||
payment_method = EXCLUDED.payment_method,
|
||||
refund_amount = EXCLUDED.refund_amount,
|
||||
cash_amount = EXCLUDED.cash_amount,
|
||||
card_amount = EXCLUDED.card_amount,
|
||||
balance_amount = EXCLUDED.balance_amount,
|
||||
online_amount = EXCLUDED.online_amount,
|
||||
rounding_amount = EXCLUDED.rounding_amount,
|
||||
adjust_amount = EXCLUDED.adjust_amount,
|
||||
goods_money = EXCLUDED.goods_money,
|
||||
table_charge_money = EXCLUDED.table_charge_money,
|
||||
service_money = EXCLUDED.service_money,
|
||||
coupon_amount = EXCLUDED.coupon_amount,
|
||||
order_remark = EXCLUDED.order_remark,
|
||||
raw_data = EXCLUDED.raw_data,
|
||||
updated_at = now()
|
||||
RETURNING (xmax = 0) AS inserted
|
||||
"""
|
||||
|
||||
inserted, updated = self.db.batch_upsert_with_returning(
|
||||
sql, records, page_size=self._batch_size()
|
||||
)
|
||||
return (inserted, updated, 0)
|
||||
@@ -3,6 +3,20 @@
|
||||
from tasks.orders_task import OrdersTask
|
||||
from tasks.payments_task import PaymentsTask
|
||||
from tasks.members_task import MembersTask
|
||||
<<<<<<< HEAD
|
||||
=======
|
||||
from tasks.products_task import ProductsTask
|
||||
from tasks.tables_task import TablesTask
|
||||
from tasks.assistants_task import AssistantsTask
|
||||
from tasks.packages_task import PackagesDefTask
|
||||
from tasks.refunds_task import RefundsTask
|
||||
from tasks.coupon_usage_task import CouponUsageTask
|
||||
from tasks.inventory_change_task import InventoryChangeTask
|
||||
from tasks.topups_task import TopupsTask
|
||||
from tasks.table_discount_task import TableDiscountTask
|
||||
from tasks.assistant_abolish_task import AssistantAbolishTask
|
||||
from tasks.ledger_task import LedgerTask
|
||||
>>>>>>> main
|
||||
|
||||
class TaskRegistry:
|
||||
"""任务注册和工厂"""
|
||||
@@ -30,7 +44,24 @@ class TaskRegistry:
|
||||
|
||||
# 默认注册表
|
||||
default_registry = TaskRegistry()
|
||||
<<<<<<< HEAD
|
||||
default_registry.register("ORDERS", OrdersTask)
|
||||
default_registry.register("PAYMENTS", PaymentsTask)
|
||||
default_registry.register("MEMBERS", MembersTask)
|
||||
# 可以继续注册其他任务...
|
||||
=======
|
||||
default_registry.register("PRODUCTS", ProductsTask)
|
||||
default_registry.register("TABLES", TablesTask)
|
||||
default_registry.register("MEMBERS", MembersTask)
|
||||
default_registry.register("ASSISTANTS", AssistantsTask)
|
||||
default_registry.register("PACKAGES_DEF", PackagesDefTask)
|
||||
default_registry.register("ORDERS", OrdersTask)
|
||||
default_registry.register("PAYMENTS", PaymentsTask)
|
||||
default_registry.register("REFUNDS", RefundsTask)
|
||||
default_registry.register("COUPON_USAGE", CouponUsageTask)
|
||||
default_registry.register("INVENTORY_CHANGE", InventoryChangeTask)
|
||||
default_registry.register("TOPUPS", TopupsTask)
|
||||
default_registry.register("TABLE_DISCOUNT", TableDiscountTask)
|
||||
default_registry.register("ASSISTANT_ABOLISH", AssistantAbolishTask)
|
||||
default_registry.register("LEDGER", LedgerTask)
|
||||
>>>>>>> main
|
||||
|
||||
196
etl_billiards/scripts/run_tests.py
Normal file
196
etl_billiards/scripts/run_tests.py
Normal file
@@ -0,0 +1,196 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
灵活的测试执行脚本,可像搭积木一样组合不同参数或预置命令(模式/数据库/归档路径等),
|
||||
直接运行本文件即可触发 pytest。
|
||||
|
||||
示例:
|
||||
python scripts/run_tests.py --suite online --mode ONLINE --keyword ORDERS
|
||||
python scripts/run_tests.py --preset offline_realdb
|
||||
python scripts/run_tests.py --suite online offline --db-dsn ... --json-archive tmp/archives
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import importlib.util
|
||||
import os
|
||||
import shlex
|
||||
import sys
|
||||
from typing import Dict, List
|
||||
|
||||
import pytest
|
||||
|
||||
PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
|
||||
|
||||
SUITE_MAP: Dict[str, str] = {
|
||||
"online": "tests/unit/test_etl_tasks_online.py",
|
||||
"offline": "tests/unit/test_etl_tasks_offline.py",
|
||||
"integration": "tests/integration/test_database.py",
|
||||
}
|
||||
|
||||
PRESETS: Dict[str, Dict] = {}
|
||||
|
||||
|
||||
def _load_presets():
|
||||
preset_path = os.path.join(os.path.dirname(__file__), "test_presets.py")
|
||||
if not os.path.exists(preset_path):
|
||||
return
|
||||
spec = importlib.util.spec_from_file_location("test_presets", preset_path)
|
||||
if not spec or not spec.loader:
|
||||
return
|
||||
module = importlib.util.module_from_spec(spec)
|
||||
spec.loader.exec_module(module) # type: ignore[attr-defined]
|
||||
presets = getattr(module, "PRESETS", {})
|
||||
if isinstance(presets, dict):
|
||||
PRESETS.update(presets)
|
||||
|
||||
|
||||
_load_presets()
|
||||
|
||||
def parse_args() -> argparse.Namespace:
|
||||
parser = argparse.ArgumentParser(description="ETL 测试执行器(支持参数化调配)")
|
||||
parser.add_argument(
|
||||
"--suite",
|
||||
choices=sorted(SUITE_MAP.keys()),
|
||||
nargs="+",
|
||||
help="预置测试套件,可多选(默认全部 online/offline)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--tests",
|
||||
nargs="+",
|
||||
help="自定义测试路径(可与 --suite 混用),例如 tests/unit/test_config.py",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--mode",
|
||||
choices=["ONLINE", "OFFLINE"],
|
||||
help="覆盖 TEST_MODE(默认沿用 .env / 环境变量)",
|
||||
)
|
||||
parser.add_argument("--db-dsn", help="设置 TEST_DB_DSN,连接真实数据库进行测试")
|
||||
parser.add_argument("--json-archive", help="设置 TEST_JSON_ARCHIVE_DIR(离线档案目录)")
|
||||
parser.add_argument("--json-temp", help="设置 TEST_JSON_TEMP_DIR(临时 JSON 路径)")
|
||||
parser.add_argument(
|
||||
"--keyword",
|
||||
"-k",
|
||||
help="pytest -k 关键字过滤(例如 ORDERS,只运行包含该字符串的用例)",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--pytest-args",
|
||||
help="附加 pytest 参数,格式与命令行一致(例如 \"-vv --maxfail=1\")",
|
||||
)
|
||||
parser.add_argument(
|
||||
"--env",
|
||||
action="append",
|
||||
metavar="KEY=VALUE",
|
||||
help="自定义环境变量,可重复传入,例如 --env STORE_ID=123",
|
||||
)
|
||||
parser.add_argument("--preset", choices=sorted(PRESETS.keys()) if PRESETS else None, nargs="+",
|
||||
help="从 scripts/test_presets.py 中选择一个或多个组合命令")
|
||||
parser.add_argument("--list-presets", action="store_true", help="列出可用预置命令后退出")
|
||||
parser.add_argument("--dry-run", action="store_true", help="仅打印将要执行的命令与环境,不真正运行 pytest")
|
||||
return parser.parse_args()
|
||||
|
||||
|
||||
def apply_presets_to_args(args: argparse.Namespace):
|
||||
if not args.preset:
|
||||
return
|
||||
for name in args.preset:
|
||||
preset = PRESETS.get(name, {})
|
||||
if not preset:
|
||||
continue
|
||||
for key, value in preset.items():
|
||||
if key in ("suite", "tests"):
|
||||
if not value:
|
||||
continue
|
||||
existing = getattr(args, key)
|
||||
if existing is None:
|
||||
setattr(args, key, list(value))
|
||||
else:
|
||||
existing.extend(value)
|
||||
elif key == "env":
|
||||
args.env = (args.env or []) + list(value)
|
||||
elif key == "pytest_args":
|
||||
args.pytest_args = " ".join(filter(None, [value, args.pytest_args or ""]))
|
||||
elif key == "keyword":
|
||||
if args.keyword is None:
|
||||
args.keyword = value
|
||||
else:
|
||||
if getattr(args, key, None) is None:
|
||||
setattr(args, key, value)
|
||||
|
||||
|
||||
def apply_env(args: argparse.Namespace) -> Dict[str, str]:
|
||||
env_updates = {}
|
||||
if args.mode:
|
||||
env_updates["TEST_MODE"] = args.mode
|
||||
if args.db_dsn:
|
||||
env_updates["TEST_DB_DSN"] = args.db_dsn
|
||||
if args.json_archive:
|
||||
env_updates["TEST_JSON_ARCHIVE_DIR"] = args.json_archive
|
||||
if args.json_temp:
|
||||
env_updates["TEST_JSON_TEMP_DIR"] = args.json_temp
|
||||
if args.env:
|
||||
for item in args.env:
|
||||
if "=" not in item:
|
||||
raise SystemExit(f"--env 参数格式错误: {item!r},应为 KEY=VALUE")
|
||||
key, value = item.split("=", 1)
|
||||
env_updates[key.strip()] = value.strip()
|
||||
|
||||
for key, value in env_updates.items():
|
||||
os.environ[key] = value
|
||||
return env_updates
|
||||
|
||||
|
||||
def build_pytest_args(args: argparse.Namespace) -> List[str]:
|
||||
targets: List[str] = []
|
||||
if args.suite:
|
||||
for suite in args.suite:
|
||||
targets.append(SUITE_MAP[suite])
|
||||
if args.tests:
|
||||
targets.extend(args.tests)
|
||||
if not targets:
|
||||
# 默认跑 online + offline 套件
|
||||
targets = [SUITE_MAP["online"], SUITE_MAP["offline"]]
|
||||
|
||||
pytest_args: List[str] = targets
|
||||
if args.keyword:
|
||||
pytest_args += ["-k", args.keyword]
|
||||
if args.pytest_args:
|
||||
pytest_args += shlex.split(args.pytest_args)
|
||||
return pytest_args
|
||||
|
||||
|
||||
def main() -> int:
|
||||
os.chdir(PROJECT_ROOT)
|
||||
args = parse_args()
|
||||
if args.list_presets:
|
||||
print("可用预置命令:")
|
||||
if not PRESETS:
|
||||
print("(暂无,可编辑 scripts/test_presets.py 添加)")
|
||||
else:
|
||||
for name in sorted(PRESETS):
|
||||
print(f"- {name}")
|
||||
return 0
|
||||
|
||||
apply_presets_to_args(args)
|
||||
env_updates = apply_env(args)
|
||||
pytest_args = build_pytest_args(args)
|
||||
|
||||
print("=== 环境变量覆盖 ===")
|
||||
if env_updates:
|
||||
for k, v in env_updates.items():
|
||||
print(f"{k}={v}")
|
||||
else:
|
||||
print("(无覆盖,沿用系统默认)")
|
||||
print("\n=== Pytest 参数 ===")
|
||||
print(" ".join(pytest_args))
|
||||
print()
|
||||
|
||||
if args.dry_run:
|
||||
print("Dry-run 模式,未真正执行 pytest")
|
||||
return 0
|
||||
|
||||
exit_code = pytest.main(pytest_args)
|
||||
return int(exit_code)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
sys.exit(main())
|
||||
166
etl_billiards/scripts/test_presets.py
Normal file
166
etl_billiards/scripts/test_presets.py
Normal file
@@ -0,0 +1,166 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""测试命令“仓库”,集中维护 run_tests.py 的预置组合。
|
||||
|
||||
支持的参数键(可在 PRESETS 中自由组合):
|
||||
|
||||
1. suite
|
||||
- 类型:列表
|
||||
- 作用:引用 run_tests 中的预置套件,值可为 online / offline / integration 等。
|
||||
- 用法:["online"] 仅跑在线模式;["online","offline"] 同时跑两套;["integration"] 跑数据库集成测试。
|
||||
|
||||
2. tests
|
||||
- 类型:列表
|
||||
- 作用:传入任意 pytest 目标路径,适合补充临时/自定义测试文件。
|
||||
- 用法:["tests/unit/test_config.py","tests/unit/test_parsers.py"]。
|
||||
|
||||
3. mode
|
||||
- 类型:字符串
|
||||
- 取值:ONLINE 或 OFFLINE。
|
||||
- 作用:覆盖 TEST_MODE;ONLINE 走 API 全流程,OFFLINE 读取 JSON 归档执行 T+L。
|
||||
|
||||
4. db_dsn
|
||||
- 类型:字符串
|
||||
- 作用:设置 TEST_DB_DSN,指定真实 PostgreSQL 连接;缺省时测试引擎使用伪 DB,仅记录写入不触库。
|
||||
- 示例:postgresql://user:pwd@localhost:5432/testdb。
|
||||
|
||||
5. json_archive / json_temp
|
||||
- 类型:字符串
|
||||
- 作用:离线模式的 JSON 归档目录 / 临时输出目录。
|
||||
- 说明:不设置时沿用 .env 或默认值;仅在 OFFLINE 模式需要关注。
|
||||
|
||||
6. keyword
|
||||
- 类型:字符串
|
||||
- 作用:等价 pytest -k,用于筛选测试名/节点。
|
||||
- 示例:"ORDERS" 可只运行包含该关键字的测试函数。
|
||||
|
||||
7. pytest_args
|
||||
- 类型:字符串
|
||||
- 作用:附加 pytest 命令行参数。
|
||||
- 示例:"-vv --maxfail=1 --disable-warnings"。
|
||||
|
||||
8. env
|
||||
- 类型:列表
|
||||
- 作用:追加环境变量,形如 ["STORE_ID=123","API_TOKEN=xxx"],会在 run_tests 内透传给 os.environ。
|
||||
|
||||
9. preset_meta
|
||||
- 类型:字符串
|
||||
- 作用:纯注释信息,便于描述该预置组合的用途,不会传递给 run_tests。
|
||||
|
||||
运行方式建议直接 F5(或 `python scripts/test_presets.py`),脚本将读取 AUTO_RUN_PRESETS 中的配置依次执行。
|
||||
如需临时指定其它预置,可传入 `--preset xxx`;`--list` 用于查看所有参数说明和预置详情。
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import argparse
|
||||
import os
|
||||
import subprocess
|
||||
import sys
|
||||
from typing import List
|
||||
|
||||
RUN_TESTS_SCRIPT = os.path.join(os.path.dirname(__file__), "run_tests.py")
|
||||
|
||||
AUTO_RUN_PRESETS = ["online_orders"]
|
||||
|
||||
# PRESETS = {
|
||||
# "online_orders": {
|
||||
# "suite": ["online"],
|
||||
# "mode": "ONLINE",
|
||||
# "keyword": "ORDERS",
|
||||
# "pytest_args": "-vv",
|
||||
# "preset_meta": "在线模式,仅跑订单任务,输出更详细日志",
|
||||
# },
|
||||
# "offline_realdb": {
|
||||
# "suite": ["offline"],
|
||||
# "mode": "OFFLINE",
|
||||
# "db_dsn": "postgresql://user:pwd@localhost:5432/testdb",
|
||||
# "json_archive": "tests/testdata_json",
|
||||
# "preset_meta": "离线模式 + 真实测试库,用预置 JSON 回放全量任务",
|
||||
# },
|
||||
# "integration_db": {
|
||||
# "suite": ["integration"],
|
||||
# "db_dsn": "postgresql://user:pwd@localhost:5432/testdb",
|
||||
# "preset_meta": "仅跑数据库连接/操作相关的集成测试",
|
||||
# },
|
||||
# }
|
||||
|
||||
PRESETS = {
|
||||
"offline_realdb": {
|
||||
"suite": ["offline"],
|
||||
"mode": "OFFLINE",
|
||||
"db_dsn": "postgresql://local-Python:Neo-local-1991125@100.64.0.4:5432/LLZQ-test",
|
||||
"json_archive": "tests/testdata_json",
|
||||
"preset_meta": "离线模式 + 真实测试库,用预置 JSON 回放全量任务",
|
||||
},
|
||||
}
|
||||
|
||||
|
||||
|
||||
def print_parameter_help():
|
||||
print("可用参数键说明:")
|
||||
print(" suite -> 预置测试套件列表,如 ['online','offline']")
|
||||
print(" tests -> 自定义测试文件路径列表")
|
||||
print(" mode -> TEST_MODE,ONLINE / OFFLINE")
|
||||
print(" db_dsn -> TEST_DB_DSN,连接真实 PostgreSQL")
|
||||
print(" json_archive -> TEST_JSON_ARCHIVE_DIR,离线 JSON 目录")
|
||||
print(" json_temp -> TEST_JSON_TEMP_DIR,离线临时目录")
|
||||
print(" keyword -> pytest -k 过滤关键字")
|
||||
print(" pytest_args -> 额外 pytest 参数(单个字符串)")
|
||||
print(" env -> 附加环境变量,形如 ['KEY=VALUE']")
|
||||
print(" preset_meta -> 注释说明,不会传给 run_tests")
|
||||
print()
|
||||
|
||||
|
||||
def print_presets():
|
||||
if not PRESETS:
|
||||
print("当前没有定义任何预置命令,可自行在 PRESETS 中添加。")
|
||||
return
|
||||
for idx, (name, payload) in enumerate(PRESETS.items(), start=1):
|
||||
comment = payload.get("preset_meta", "")
|
||||
print(f"{idx}. {name}")
|
||||
if comment:
|
||||
print(f" 说明: {comment}")
|
||||
for key, value in payload.items():
|
||||
if key == "preset_meta":
|
||||
continue
|
||||
print(f" {key}: {value}")
|
||||
print()
|
||||
|
||||
|
||||
def run_presets(preset_names: List[str], dry_run: bool):
|
||||
cmds = []
|
||||
for name in preset_names:
|
||||
cmd = [sys.executable, RUN_TESTS_SCRIPT, "--preset", name]
|
||||
cmds.append(cmd)
|
||||
|
||||
for cmd in cmds:
|
||||
printable = " ".join(cmd)
|
||||
if dry_run:
|
||||
print(f"[Dry-Run] {printable}")
|
||||
else:
|
||||
print(f"\n>>> 执行: {printable}")
|
||||
subprocess.run(cmd, check=False)
|
||||
|
||||
|
||||
def main():
|
||||
parser = argparse.ArgumentParser(description="测试预置仓库(在此集中配置并运行测试组合)")
|
||||
parser.add_argument("--preset", choices=sorted(PRESETS.keys()), nargs="+", help="直接指定要运行的预置命令")
|
||||
parser.add_argument("--list", action="store_true", help="仅列出参数键和所有预置命令")
|
||||
parser.add_argument("--dry-run", action="store_true", help="仅打印将要执行的命令,而不真正运行")
|
||||
args = parser.parse_args()
|
||||
|
||||
if args.list:
|
||||
print_parameter_help()
|
||||
print_presets()
|
||||
return
|
||||
|
||||
if args.preset:
|
||||
target = args.preset
|
||||
else:
|
||||
target = AUTO_RUN_PRESETS or list(PRESETS.keys())
|
||||
|
||||
run_presets(target, dry_run=args.dry_run)
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
83
etl_billiards/tasks/assistant_abolish_task.py
Normal file
83
etl_billiards/tasks/assistant_abolish_task.py
Normal file
@@ -0,0 +1,83 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""助教作废任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.assistant_abolish import AssistantAbolishLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class AssistantAbolishTask(BaseTask):
|
||||
"""同步助教作废记录"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "ASSISTANT_ABOLISH"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 ASSISTANT_ABOLISH 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Assistant/AbolishList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "abolitionAssistants"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_record(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = AssistantAbolishLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_records(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"ASSISTANT_ABOLISH 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("ASSISTANT_ABOLISH 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_record(self, raw: dict) -> dict | None:
|
||||
abolish_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not abolish_id:
|
||||
self.logger.warning("跳过缺少 id 的助教作废记录: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"abolish_id": abolish_id,
|
||||
"table_id": TypeParser.parse_int(raw.get("tableId")),
|
||||
"table_name": raw.get("tableName"),
|
||||
"table_area_id": TypeParser.parse_int(raw.get("tableAreaId")),
|
||||
"table_area": raw.get("tableArea"),
|
||||
"assistant_no": raw.get("assistantOn"),
|
||||
"assistant_name": raw.get("assistantName"),
|
||||
"charge_minutes": TypeParser.parse_int(raw.get("pdChargeMinutes")),
|
||||
"abolish_amount": TypeParser.parse_decimal(
|
||||
raw.get("assistantAbolishAmount")
|
||||
),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("createTime") or raw.get("create_time"), self.tz
|
||||
),
|
||||
"trash_reason": raw.get("trashReason"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
103
etl_billiards/tasks/assistants_task.py
Normal file
103
etl_billiards/tasks/assistants_task.py
Normal file
@@ -0,0 +1,103 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""助教账号任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.dimensions.assistant import AssistantLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class AssistantsTask(BaseTask):
|
||||
"""同步助教账号资料"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "ASSISTANTS"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 ASSISTANTS 任务")
|
||||
params = {"storeId": self.config.get("app.store_id")}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Assistant/List",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "assistantInfos"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_assistant(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = AssistantLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_assistants(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"ASSISTANTS 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("ASSISTANTS 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_assistant(self, raw: dict) -> dict | None:
|
||||
assistant_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not assistant_id:
|
||||
self.logger.warning("跳过缺少 id 的助教数据: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"assistant_id": assistant_id,
|
||||
"assistant_no": raw.get("assistant_no") or raw.get("assistantNo"),
|
||||
"nickname": raw.get("nickname"),
|
||||
"real_name": raw.get("real_name") or raw.get("realName"),
|
||||
"gender": raw.get("gender"),
|
||||
"mobile": raw.get("mobile"),
|
||||
"level": raw.get("level"),
|
||||
"team_id": TypeParser.parse_int(raw.get("team_id") or raw.get("teamId")),
|
||||
"team_name": raw.get("team_name"),
|
||||
"assistant_status": raw.get("assistant_status"),
|
||||
"work_status": raw.get("work_status"),
|
||||
"entry_time": TypeParser.parse_timestamp(
|
||||
raw.get("entry_time") or raw.get("entryTime"), self.tz
|
||||
),
|
||||
"resign_time": TypeParser.parse_timestamp(
|
||||
raw.get("resign_time") or raw.get("resignTime"), self.tz
|
||||
),
|
||||
"start_time": TypeParser.parse_timestamp(
|
||||
raw.get("start_time") or raw.get("startTime"), self.tz
|
||||
),
|
||||
"end_time": TypeParser.parse_timestamp(
|
||||
raw.get("end_time") or raw.get("endTime"), self.tz
|
||||
),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"update_time": TypeParser.parse_timestamp(
|
||||
raw.get("update_time") or raw.get("updateTime"), self.tz
|
||||
),
|
||||
"system_role_id": raw.get("system_role_id"),
|
||||
"online_status": raw.get("online_status"),
|
||||
"allow_cx": raw.get("allow_cx"),
|
||||
"charge_way": raw.get("charge_way"),
|
||||
"pd_unit_price": TypeParser.parse_decimal(raw.get("pd_unit_price")),
|
||||
"cx_unit_price": TypeParser.parse_decimal(raw.get("cx_unit_price")),
|
||||
"is_guaranteed": raw.get("is_guaranteed"),
|
||||
"is_team_leader": raw.get("is_team_leader"),
|
||||
"serial_number": raw.get("serial_number"),
|
||||
"show_sort": raw.get("show_sort"),
|
||||
"is_delete": raw.get("is_delete"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
92
etl_billiards/tasks/coupon_usage_task.py
Normal file
92
etl_billiards/tasks/coupon_usage_task.py
Normal file
@@ -0,0 +1,92 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""平台券核销任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.coupon_usage import CouponUsageLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class CouponUsageTask(BaseTask):
|
||||
"""同步平台券验证/核销记录"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "COUPON_USAGE"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 COUPON_USAGE 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Coupon/UsageList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=(),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_usage(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = CouponUsageLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_coupon_usage(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"COUPON_USAGE 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("COUPON_USAGE 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_usage(self, raw: dict) -> dict | None:
|
||||
usage_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not usage_id:
|
||||
self.logger.warning("跳过缺少 id 的券核销记录: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"usage_id": usage_id,
|
||||
"coupon_code": raw.get("coupon_code"),
|
||||
"coupon_channel": raw.get("coupon_channel"),
|
||||
"coupon_name": raw.get("coupon_name"),
|
||||
"sale_price": TypeParser.parse_decimal(raw.get("sale_price")),
|
||||
"coupon_money": TypeParser.parse_decimal(raw.get("coupon_money")),
|
||||
"coupon_free_time": TypeParser.parse_int(raw.get("coupon_free_time")),
|
||||
"use_status": raw.get("use_status"),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"consume_time": TypeParser.parse_timestamp(
|
||||
raw.get("consume_time") or raw.get("consumeTime"), self.tz
|
||||
),
|
||||
"operator_id": TypeParser.parse_int(raw.get("operator_id")),
|
||||
"operator_name": raw.get("operator_name"),
|
||||
"table_id": TypeParser.parse_int(raw.get("table_id")),
|
||||
"site_order_id": TypeParser.parse_int(raw.get("site_order_id")),
|
||||
"group_package_id": TypeParser.parse_int(raw.get("group_package_id")),
|
||||
"coupon_remark": raw.get("coupon_remark"),
|
||||
"deal_id": raw.get("deal_id"),
|
||||
"certificate_id": raw.get("certificate_id"),
|
||||
"verify_id": raw.get("verify_id"),
|
||||
"is_delete": raw.get("is_delete"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
90
etl_billiards/tasks/inventory_change_task.py
Normal file
90
etl_billiards/tasks/inventory_change_task.py
Normal file
@@ -0,0 +1,90 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""库存变更任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.inventory_change import InventoryChangeLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class InventoryChangeTask(BaseTask):
|
||||
"""同步库存变化记录"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "INVENTORY_CHANGE"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 INVENTORY_CHANGE 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Inventory/ChangeList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "queryDeliveryRecordsList"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_change(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = InventoryChangeLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_changes(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"INVENTORY_CHANGE 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("INVENTORY_CHANGE 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_change(self, raw: dict) -> dict | None:
|
||||
change_id = TypeParser.parse_int(
|
||||
raw.get("siteGoodsStockId") or raw.get("site_goods_stock_id")
|
||||
)
|
||||
if not change_id:
|
||||
self.logger.warning("跳过缺少变动 id 的库存记录: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"change_id": change_id,
|
||||
"site_goods_id": TypeParser.parse_int(
|
||||
raw.get("siteGoodsId") or raw.get("site_goods_id")
|
||||
),
|
||||
"stock_type": raw.get("stockType") or raw.get("stock_type"),
|
||||
"goods_name": raw.get("goodsName"),
|
||||
"change_time": TypeParser.parse_timestamp(
|
||||
raw.get("createTime") or raw.get("create_time"), self.tz
|
||||
),
|
||||
"start_qty": TypeParser.parse_int(raw.get("startNum")),
|
||||
"end_qty": TypeParser.parse_int(raw.get("endNum")),
|
||||
"change_qty": TypeParser.parse_int(raw.get("changeNum")),
|
||||
"unit": raw.get("unit"),
|
||||
"price": TypeParser.parse_decimal(raw.get("price")),
|
||||
"operator_name": raw.get("operatorName"),
|
||||
"remark": raw.get("remark"),
|
||||
"goods_category_id": TypeParser.parse_int(raw.get("goodsCategoryId")),
|
||||
"goods_second_category_id": TypeParser.parse_int(
|
||||
raw.get("goodsSecondCategoryId")
|
||||
),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
119
etl_billiards/tasks/ledger_task.py
Normal file
119
etl_billiards/tasks/ledger_task.py
Normal file
@@ -0,0 +1,119 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""助教流水任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.assistant_ledger import AssistantLedgerLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class LedgerTask(BaseTask):
|
||||
"""同步助教服务台账"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "LEDGER"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 LEDGER 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Assistant/LedgerList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "orderAssistantDetails"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_ledger(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = AssistantLedgerLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_ledgers(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"LEDGER 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("LEDGER 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_ledger(self, raw: dict) -> dict | None:
|
||||
ledger_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not ledger_id:
|
||||
self.logger.warning("跳过缺少 id 的助教流水: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"ledger_id": ledger_id,
|
||||
"assistant_no": raw.get("assistantNo"),
|
||||
"assistant_name": raw.get("assistantName"),
|
||||
"nickname": raw.get("nickname"),
|
||||
"level_name": raw.get("levelName"),
|
||||
"table_name": raw.get("tableName"),
|
||||
"ledger_unit_price": TypeParser.parse_decimal(raw.get("ledger_unit_price")),
|
||||
"ledger_count": TypeParser.parse_int(raw.get("ledger_count")),
|
||||
"ledger_amount": TypeParser.parse_decimal(raw.get("ledger_amount")),
|
||||
"projected_income": TypeParser.parse_decimal(raw.get("projected_income")),
|
||||
"service_money": TypeParser.parse_decimal(raw.get("service_money")),
|
||||
"member_discount_amount": TypeParser.parse_decimal(
|
||||
raw.get("member_discount_amount")
|
||||
),
|
||||
"manual_discount_amount": TypeParser.parse_decimal(
|
||||
raw.get("manual_discount_amount")
|
||||
),
|
||||
"coupon_deduct_money": TypeParser.parse_decimal(
|
||||
raw.get("coupon_deduct_money")
|
||||
),
|
||||
"order_trade_no": TypeParser.parse_int(raw.get("order_trade_no")),
|
||||
"order_settle_id": TypeParser.parse_int(raw.get("order_settle_id")),
|
||||
"operator_id": TypeParser.parse_int(raw.get("operator_id")),
|
||||
"operator_name": raw.get("operator_name"),
|
||||
"assistant_team_id": TypeParser.parse_int(raw.get("assistant_team_id")),
|
||||
"assistant_level": raw.get("assistant_level"),
|
||||
"site_table_id": TypeParser.parse_int(raw.get("site_table_id")),
|
||||
"order_assistant_id": TypeParser.parse_int(raw.get("order_assistant_id")),
|
||||
"site_assistant_id": TypeParser.parse_int(raw.get("site_assistant_id")),
|
||||
"user_id": TypeParser.parse_int(raw.get("user_id")),
|
||||
"ledger_start_time": TypeParser.parse_timestamp(
|
||||
raw.get("ledger_start_time"), self.tz
|
||||
),
|
||||
"ledger_end_time": TypeParser.parse_timestamp(
|
||||
raw.get("ledger_end_time"), self.tz
|
||||
),
|
||||
"start_use_time": TypeParser.parse_timestamp(
|
||||
raw.get("start_use_time"), self.tz
|
||||
),
|
||||
"last_use_time": TypeParser.parse_timestamp(
|
||||
raw.get("last_use_time"), self.tz
|
||||
),
|
||||
"income_seconds": TypeParser.parse_int(raw.get("income_seconds")),
|
||||
"real_use_seconds": TypeParser.parse_int(raw.get("real_use_seconds")),
|
||||
"is_trash": raw.get("is_trash"),
|
||||
"trash_reason": raw.get("trash_reason"),
|
||||
"is_confirm": raw.get("is_confirm"),
|
||||
"ledger_status": raw.get("ledger_status"),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
91
etl_billiards/tasks/packages_task.py
Normal file
91
etl_billiards/tasks/packages_task.py
Normal file
@@ -0,0 +1,91 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""团购/套餐定义任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.dimensions.package import PackageDefinitionLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class PackagesDefTask(BaseTask):
|
||||
"""同步团购套餐定义"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "PACKAGES_DEF"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 PACKAGES_DEF 任务")
|
||||
params = {"storeId": self.config.get("app.store_id")}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Package/List",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "packageCouponList"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_package(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = PackageDefinitionLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_packages(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"PACKAGES_DEF 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("PACKAGES_DEF 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_package(self, raw: dict) -> dict | None:
|
||||
package_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not package_id:
|
||||
self.logger.warning("跳过缺少 id 的套餐数据: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"package_id": package_id,
|
||||
"package_code": raw.get("package_id") or raw.get("packageId"),
|
||||
"package_name": raw.get("package_name"),
|
||||
"table_area_id": raw.get("table_area_id"),
|
||||
"table_area_name": raw.get("table_area_name"),
|
||||
"selling_price": TypeParser.parse_decimal(
|
||||
raw.get("selling_price") or raw.get("sellingPrice")
|
||||
),
|
||||
"duration_seconds": TypeParser.parse_int(raw.get("duration")),
|
||||
"start_time": TypeParser.parse_timestamp(
|
||||
raw.get("start_time") or raw.get("startTime"), self.tz
|
||||
),
|
||||
"end_time": TypeParser.parse_timestamp(
|
||||
raw.get("end_time") or raw.get("endTime"), self.tz
|
||||
),
|
||||
"type": raw.get("type"),
|
||||
"is_enabled": raw.get("is_enabled"),
|
||||
"is_delete": raw.get("is_delete"),
|
||||
"usable_count": TypeParser.parse_int(raw.get("usable_count")),
|
||||
"creator_name": raw.get("creator_name"),
|
||||
"date_type": raw.get("date_type"),
|
||||
"group_type": raw.get("group_type"),
|
||||
"coupon_money": TypeParser.parse_decimal(
|
||||
raw.get("coupon_money") or raw.get("couponMoney")
|
||||
),
|
||||
"area_tag_type": raw.get("area_tag_type"),
|
||||
"system_group_type": raw.get("system_group_type"),
|
||||
"card_type_ids": raw.get("card_type_ids"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
91
etl_billiards/tasks/refunds_task.py
Normal file
91
etl_billiards/tasks/refunds_task.py
Normal file
@@ -0,0 +1,91 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""退款记录任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.refund import RefundLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class RefundsTask(BaseTask):
|
||||
"""同步支付退款流水"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "REFUNDS"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 REFUNDS 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Pay/RefundList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=(),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_refund(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = RefundLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_refunds(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"REFUNDS 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("REFUNDS 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_refund(self, raw: dict) -> dict | None:
|
||||
refund_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not refund_id:
|
||||
self.logger.warning("跳过缺少 id 的退款记录: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"refund_id": refund_id,
|
||||
"site_id": TypeParser.parse_int(raw.get("site_id") or raw.get("siteId")),
|
||||
"tenant_id": TypeParser.parse_int(raw.get("tenant_id") or raw.get("tenantId")),
|
||||
"pay_amount": TypeParser.parse_decimal(raw.get("pay_amount")),
|
||||
"pay_status": raw.get("pay_status"),
|
||||
"pay_time": TypeParser.parse_timestamp(
|
||||
raw.get("pay_time") or raw.get("payTime"), self.tz
|
||||
),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"relate_type": raw.get("relate_type"),
|
||||
"relate_id": TypeParser.parse_int(raw.get("relate_id")),
|
||||
"payment_method": raw.get("payment_method"),
|
||||
"refund_amount": TypeParser.parse_decimal(raw.get("refund_amount")),
|
||||
"action_type": raw.get("action_type"),
|
||||
"pay_terminal": raw.get("pay_terminal"),
|
||||
"operator_id": TypeParser.parse_int(raw.get("operator_id")),
|
||||
"channel_pay_no": raw.get("channel_pay_no"),
|
||||
"channel_fee": TypeParser.parse_decimal(raw.get("channel_fee")),
|
||||
"is_delete": raw.get("is_delete"),
|
||||
"member_id": TypeParser.parse_int(raw.get("member_id")),
|
||||
"member_card_id": TypeParser.parse_int(raw.get("member_card_id")),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
92
etl_billiards/tasks/table_discount_task.py
Normal file
92
etl_billiards/tasks/table_discount_task.py
Normal file
@@ -0,0 +1,92 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""台费折扣任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.table_discount import TableDiscountLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class TableDiscountTask(BaseTask):
|
||||
"""同步台费折扣/调价记录"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "TABLE_DISCOUNT"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 TABLE_DISCOUNT 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Table/AdjustList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "taiFeeAdjustInfos"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_discount(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = TableDiscountLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_discounts(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"TABLE_DISCOUNT 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("TABLE_DISCOUNT 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_discount(self, raw: dict) -> dict | None:
|
||||
discount_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not discount_id:
|
||||
self.logger.warning("跳过缺少 id 的台费折扣记录: %s", raw)
|
||||
return None
|
||||
|
||||
table_profile = raw.get("tableProfile") or {}
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"discount_id": discount_id,
|
||||
"adjust_type": raw.get("adjust_type") or raw.get("adjustType"),
|
||||
"applicant_id": TypeParser.parse_int(raw.get("applicant_id")),
|
||||
"applicant_name": raw.get("applicant_name"),
|
||||
"operator_id": TypeParser.parse_int(raw.get("operator_id")),
|
||||
"operator_name": raw.get("operator_name"),
|
||||
"ledger_amount": TypeParser.parse_decimal(raw.get("ledger_amount")),
|
||||
"ledger_count": TypeParser.parse_int(raw.get("ledger_count")),
|
||||
"ledger_name": raw.get("ledger_name"),
|
||||
"ledger_status": raw.get("ledger_status"),
|
||||
"order_settle_id": TypeParser.parse_int(raw.get("order_settle_id")),
|
||||
"order_trade_no": TypeParser.parse_int(raw.get("order_trade_no")),
|
||||
"site_table_id": TypeParser.parse_int(
|
||||
raw.get("site_table_id") or table_profile.get("id")
|
||||
),
|
||||
"table_area_id": TypeParser.parse_int(
|
||||
raw.get("tableAreaId") or table_profile.get("site_table_area_id")
|
||||
),
|
||||
"table_area_name": table_profile.get("site_table_area_name"),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"is_delete": raw.get("is_delete"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
@@ -1,4 +1,92 @@
|
||||
<<<<<<< HEAD
|
||||
class TablesTask(BaseTask):
|
||||
def get_task_code(self) -> str: # 返回 "TABLES"
|
||||
def execute(self) -> dict: # 拉取 /Table/GetSiteTables
|
||||
def _parse_table(self, raw: dict) -> dict | None:
|
||||
def _parse_table(self, raw: dict) -> dict | None:
|
||||
=======
|
||||
# -*- coding: utf-8 -*-
|
||||
"""台桌档案任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.dimensions.table import TableLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class TablesTask(BaseTask):
|
||||
"""同步门店台桌列表"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "TABLES"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 TABLES 任务")
|
||||
params = {"storeId": self.config.get("app.store_id")}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Table/GetSiteTables",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "siteTables"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_table(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = TableLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_tables(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"TABLES 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("TABLES 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_table(self, raw: dict) -> dict | None:
|
||||
table_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not table_id:
|
||||
self.logger.warning("跳过缺少 table_id 的台桌记录: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"table_id": table_id,
|
||||
"site_id": TypeParser.parse_int(raw.get("site_id") or raw.get("siteId")),
|
||||
"area_id": TypeParser.parse_int(
|
||||
raw.get("site_table_area_id") or raw.get("siteTableAreaId")
|
||||
),
|
||||
"area_name": raw.get("areaName") or raw.get("site_table_area_name"),
|
||||
"table_name": raw.get("table_name") or raw.get("tableName"),
|
||||
"table_price": TypeParser.parse_decimal(
|
||||
raw.get("table_price") or raw.get("tablePrice")
|
||||
),
|
||||
"table_status": raw.get("table_status") or raw.get("tableStatus"),
|
||||
"table_status_name": raw.get("tableStatusName"),
|
||||
"light_status": raw.get("light_status"),
|
||||
"is_rest_area": raw.get("is_rest_area"),
|
||||
"show_status": raw.get("show_status"),
|
||||
"virtual_table": raw.get("virtual_table"),
|
||||
"charge_free": raw.get("charge_free"),
|
||||
"only_allow_groupon": raw.get("only_allow_groupon"),
|
||||
"is_online_reservation": raw.get("is_online_reservation"),
|
||||
"created_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
>>>>>>> main
|
||||
|
||||
102
etl_billiards/tasks/topups_task.py
Normal file
102
etl_billiards/tasks/topups_task.py
Normal file
@@ -0,0 +1,102 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""充值记录任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.topup import TopupLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class TopupsTask(BaseTask):
|
||||
"""同步储值充值结算记录"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "TOPUPS"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 TOPUPS 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Topup/SettleList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "settleList"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_topup(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = TopupLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_topups(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"TOPUPS 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("TOPUPS 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_topup(self, raw: dict) -> dict | None:
|
||||
node = raw.get("settleList") if isinstance(raw.get("settleList"), dict) else raw
|
||||
topup_id = TypeParser.parse_int(node.get("id"))
|
||||
if not topup_id:
|
||||
self.logger.warning("跳过缺少 id 的充值结算: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"topup_id": topup_id,
|
||||
"member_id": TypeParser.parse_int(node.get("memberId")),
|
||||
"member_name": node.get("memberName"),
|
||||
"member_phone": node.get("memberPhone"),
|
||||
"card_id": TypeParser.parse_int(node.get("tenantMemberCardId")),
|
||||
"card_type_name": node.get("memberCardTypeName"),
|
||||
"pay_amount": TypeParser.parse_decimal(node.get("payAmount")),
|
||||
"consume_money": TypeParser.parse_decimal(node.get("consumeMoney")),
|
||||
"settle_status": node.get("settleStatus"),
|
||||
"settle_type": node.get("settleType"),
|
||||
"settle_name": node.get("settleName"),
|
||||
"settle_relate_id": TypeParser.parse_int(node.get("settleRelateId")),
|
||||
"pay_time": TypeParser.parse_timestamp(
|
||||
node.get("payTime") or node.get("pay_time"), self.tz
|
||||
),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
node.get("createTime") or node.get("create_time"), self.tz
|
||||
),
|
||||
"operator_id": TypeParser.parse_int(node.get("operatorId")),
|
||||
"operator_name": node.get("operatorName"),
|
||||
"payment_method": node.get("paymentMethod"),
|
||||
"refund_amount": TypeParser.parse_decimal(node.get("refundAmount")),
|
||||
"cash_amount": TypeParser.parse_decimal(node.get("cashAmount")),
|
||||
"card_amount": TypeParser.parse_decimal(node.get("cardAmount")),
|
||||
"balance_amount": TypeParser.parse_decimal(node.get("balanceAmount")),
|
||||
"online_amount": TypeParser.parse_decimal(node.get("onlineAmount")),
|
||||
"rounding_amount": TypeParser.parse_decimal(node.get("roundingAmount")),
|
||||
"adjust_amount": TypeParser.parse_decimal(node.get("adjustAmount")),
|
||||
"goods_money": TypeParser.parse_decimal(node.get("goodsMoney")),
|
||||
"table_charge_money": TypeParser.parse_decimal(node.get("tableChargeMoney")),
|
||||
"service_money": TypeParser.parse_decimal(node.get("serviceMoney")),
|
||||
"coupon_amount": TypeParser.parse_decimal(node.get("couponAmount")),
|
||||
"order_remark": node.get("orderRemark"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
638
etl_billiards/tests/unit/task_test_utils.py
Normal file
638
etl_billiards/tests/unit/task_test_utils.py
Normal file
@@ -0,0 +1,638 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""ETL 任务测试的共用辅助模块,涵盖在线/离线模式所需的伪造数据、客户端与配置等工具函数。"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
from contextlib import contextmanager
|
||||
from dataclasses import dataclass
|
||||
from pathlib import Path
|
||||
from typing import Dict, List, Sequence, Tuple, Type
|
||||
|
||||
from config.settings import AppConfig
|
||||
from database.connection import DatabaseConnection
|
||||
from database.operations import DatabaseOperations as PgDBOperations
|
||||
from tasks.assistant_abolish_task import AssistantAbolishTask
|
||||
from tasks.assistants_task import AssistantsTask
|
||||
from tasks.coupon_usage_task import CouponUsageTask
|
||||
from tasks.inventory_change_task import InventoryChangeTask
|
||||
from tasks.ledger_task import LedgerTask
|
||||
from tasks.members_task import MembersTask
|
||||
from tasks.orders_task import OrdersTask
|
||||
from tasks.packages_task import PackagesDefTask
|
||||
from tasks.payments_task import PaymentsTask
|
||||
from tasks.products_task import ProductsTask
|
||||
from tasks.refunds_task import RefundsTask
|
||||
from tasks.table_discount_task import TableDiscountTask
|
||||
from tasks.tables_task import TablesTask
|
||||
from tasks.topups_task import TopupsTask
|
||||
|
||||
DEFAULT_STORE_ID = 2790685415443269
|
||||
BASE_TS = "2025-01-01 10:00:00"
|
||||
END_TS = "2025-01-01 12:00:00"
|
||||
|
||||
|
||||
@dataclass(frozen=True)
|
||||
class TaskSpec:
|
||||
"""描述单个任务在测试中如何被驱动的元数据,包含任务代码、API 路径、数据路径与样例记录。"""
|
||||
|
||||
code: str
|
||||
task_cls: Type
|
||||
endpoint: str
|
||||
data_path: Tuple[str, ...]
|
||||
sample_records: List[Dict]
|
||||
|
||||
@property
|
||||
def archive_filename(self) -> str:
|
||||
return endpoint_to_filename(self.endpoint)
|
||||
|
||||
|
||||
def endpoint_to_filename(endpoint: str) -> str:
|
||||
"""根据 API endpoint 生成稳定可复用的文件名,便于离线模式在目录中直接定位归档 JSON。"""
|
||||
normalized = endpoint.strip("/").replace("/", "__").replace(" ", "_").lower()
|
||||
return f"{normalized or 'root'}.json"
|
||||
|
||||
|
||||
def wrap_records(records: List[Dict], data_path: Sequence[str]):
|
||||
"""按照 data_path 逐层包裹记录列表,使其结构与真实 API 返回体一致,方便离线回放。"""
|
||||
payload = records
|
||||
for key in reversed(data_path):
|
||||
payload = {key: payload}
|
||||
return payload
|
||||
|
||||
|
||||
def create_test_config(mode: str, archive_dir: Path, temp_dir: Path) -> AppConfig:
|
||||
"""构建一份适合测试的 AppConfig,自动填充存储、日志、归档目录等参数并保证目录存在。"""
|
||||
archive_dir = Path(archive_dir)
|
||||
temp_dir = Path(temp_dir)
|
||||
archive_dir.mkdir(parents=True, exist_ok=True)
|
||||
temp_dir.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
overrides = {
|
||||
"app": {"store_id": DEFAULT_STORE_ID, "timezone": "Asia/Taipei"},
|
||||
"db": {"dsn": "postgresql://user:pass@localhost:5432/etl_billiards_test"},
|
||||
"api": {
|
||||
"base_url": "https://api.example.com",
|
||||
"token": "test-token",
|
||||
"timeout_sec": 3,
|
||||
"page_size": 50,
|
||||
},
|
||||
"testing": {
|
||||
"mode": mode,
|
||||
"json_archive_dir": str(archive_dir),
|
||||
"temp_json_dir": str(temp_dir),
|
||||
},
|
||||
"io": {
|
||||
"export_root": str(temp_dir / "export"),
|
||||
"log_root": str(temp_dir / "logs"),
|
||||
},
|
||||
}
|
||||
return AppConfig.load(overrides)
|
||||
|
||||
|
||||
def dump_offline_payload(spec: TaskSpec, archive_dir: Path) -> Path:
|
||||
"""将 TaskSpec 的样例数据写入指定归档目录,供离线测试回放使用,并返回生成文件的完整路径。"""
|
||||
archive_dir = Path(archive_dir)
|
||||
payload = wrap_records(spec.sample_records, spec.data_path)
|
||||
file_path = archive_dir / spec.archive_filename
|
||||
with file_path.open("w", encoding="utf-8") as fp:
|
||||
json.dump(payload, fp, ensure_ascii=False)
|
||||
return file_path
|
||||
|
||||
|
||||
class FakeCursor:
|
||||
"""极简游标桩对象,记录 SQL/参数并支持上下文管理,供 FakeDBOperations 与 SCD2Handler 使用。"""
|
||||
|
||||
def __init__(self, recorder: List[Dict]):
|
||||
self.recorder = recorder
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def execute(self, sql: str, params=None):
|
||||
self.recorder.append({"sql": sql.strip(), "params": params})
|
||||
|
||||
def fetchone(self):
|
||||
return None
|
||||
|
||||
def __enter__(self):
|
||||
return self
|
||||
|
||||
def __exit__(self, exc_type, exc, tb):
|
||||
return False
|
||||
|
||||
|
||||
class FakeConnection:
|
||||
"""仿 psycopg 连接对象,仅满足 SCD2Handler 对 cursor 的最小需求,并缓存执行过的语句。"""
|
||||
|
||||
def __init__(self):
|
||||
self.statements: List[Dict] = []
|
||||
|
||||
def cursor(self):
|
||||
return FakeCursor(self.statements)
|
||||
|
||||
|
||||
class FakeDBOperations:
|
||||
"""拦截并记录批量 upsert/事务操作,避免触碰真实数据库,同时提供 commit/rollback 计数。"""
|
||||
|
||||
def __init__(self):
|
||||
self.upserts: List[Dict] = []
|
||||
self.commits = 0
|
||||
self.rollbacks = 0
|
||||
self.conn = FakeConnection()
|
||||
|
||||
def batch_upsert_with_returning(self, sql: str, rows: List[Dict], page_size: int = 1000):
|
||||
self.upserts.append({"sql": sql.strip(), "count": len(rows), "page_size": page_size})
|
||||
return len(rows), 0
|
||||
|
||||
def batch_execute(self, sql: str, rows: List[Dict], page_size: int = 1000):
|
||||
self.upserts.append({"sql": sql.strip(), "count": len(rows), "page_size": page_size})
|
||||
|
||||
def commit(self):
|
||||
self.commits += 1
|
||||
|
||||
def rollback(self):
|
||||
self.rollbacks += 1
|
||||
|
||||
|
||||
class FakeAPIClient:
|
||||
"""在线模式使用的伪 API Client,直接返回预置的内存数据并记录调用,以确保任务参数正确传递。"""
|
||||
|
||||
def __init__(self, data_map: Dict[str, List[Dict]]):
|
||||
self.data_map = data_map
|
||||
self.calls: List[Dict] = []
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def get_paginated(self, endpoint: str, params=None, **kwargs):
|
||||
self.calls.append({"endpoint": endpoint, "params": params})
|
||||
if endpoint not in self.data_map:
|
||||
raise AssertionError(f"Missing fixture for endpoint {endpoint}")
|
||||
return list(self.data_map[endpoint]), [{"page": 1, "size": len(self.data_map[endpoint])}]
|
||||
|
||||
|
||||
class OfflineAPIClient:
|
||||
"""离线模式专用 API Client,根据 endpoint 读取归档 JSON、套用 data_path 并回放列表数据。"""
|
||||
|
||||
def __init__(self, file_map: Dict[str, Path]):
|
||||
self.file_map = {k: Path(v) for k, v in file_map.items()}
|
||||
self.calls: List[Dict] = []
|
||||
|
||||
# pylint: disable=unused-argument
|
||||
def get_paginated(self, endpoint: str, params=None, page_size: int = 200, data_path: Tuple[str, ...] = (), **kwargs):
|
||||
self.calls.append({"endpoint": endpoint, "params": params})
|
||||
if endpoint not in self.file_map:
|
||||
raise AssertionError(f"Missing archive for endpoint {endpoint}")
|
||||
|
||||
with self.file_map[endpoint].open("r", encoding="utf-8") as fp:
|
||||
payload = json.load(fp)
|
||||
|
||||
data = payload
|
||||
for key in data_path:
|
||||
if isinstance(data, dict):
|
||||
data = data.get(key, [])
|
||||
else:
|
||||
data = []
|
||||
break
|
||||
|
||||
if not isinstance(data, list):
|
||||
data = []
|
||||
|
||||
return data, [{"page": 1, "mode": "offline"}]
|
||||
|
||||
|
||||
class RealDBOperationsAdapter:
|
||||
"""连接真实 PostgreSQL 的适配器,为任务提供 batch_upsert + 事务能力。"""
|
||||
|
||||
def __init__(self, dsn: str):
|
||||
self._conn = DatabaseConnection(dsn)
|
||||
self._ops = PgDBOperations(self._conn)
|
||||
# SCD2Handler 会访问 db.conn.cursor(),因此暴露底层连接
|
||||
self.conn = self._conn.conn
|
||||
|
||||
def batch_upsert_with_returning(self, sql: str, rows: List[Dict], page_size: int = 1000):
|
||||
return self._ops.batch_upsert_with_returning(sql, rows, page_size=page_size)
|
||||
|
||||
def batch_execute(self, sql: str, rows: List[Dict], page_size: int = 1000):
|
||||
return self._ops.batch_execute(sql, rows, page_size=page_size)
|
||||
|
||||
def commit(self):
|
||||
self._conn.commit()
|
||||
|
||||
def rollback(self):
|
||||
self._conn.rollback()
|
||||
|
||||
def close(self):
|
||||
self._conn.close()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def get_db_operations():
|
||||
"""
|
||||
测试专用的 DB 操作上下文:
|
||||
- 若设置 TEST_DB_DSN,则连接真实 PostgreSQL;
|
||||
- 否则回退到 FakeDBOperations(内存桩)。
|
||||
"""
|
||||
dsn = os.environ.get("TEST_DB_DSN")
|
||||
if dsn:
|
||||
adapter = RealDBOperationsAdapter(dsn)
|
||||
try:
|
||||
yield adapter
|
||||
finally:
|
||||
adapter.close()
|
||||
else:
|
||||
fake = FakeDBOperations()
|
||||
yield fake
|
||||
|
||||
|
||||
TASK_SPECS: List[TaskSpec] = [
|
||||
TaskSpec(
|
||||
code="PRODUCTS",
|
||||
task_cls=ProductsTask,
|
||||
endpoint="/TenantGoods/QueryTenantGoods",
|
||||
data_path=("data",),
|
||||
sample_records=[
|
||||
{
|
||||
"siteGoodsId": 101,
|
||||
"tenantGoodsId": 101,
|
||||
"goodsName": "测试球杆",
|
||||
"goodsCategoryId": 201,
|
||||
"categoryName": "器材",
|
||||
"goodsCategorySecondId": 202,
|
||||
"goodsUnit": "支",
|
||||
"costPrice": "100.00",
|
||||
"goodsPrice": "150.00",
|
||||
"goodsState": "ON",
|
||||
"supplierId": 20,
|
||||
"barcode": "PRD001",
|
||||
"isCombo": False,
|
||||
"createTime": BASE_TS,
|
||||
"updateTime": END_TS,
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="TABLES",
|
||||
task_cls=TablesTask,
|
||||
endpoint="/Table/GetSiteTables",
|
||||
data_path=("data", "siteTables"),
|
||||
sample_records=[
|
||||
{
|
||||
"id": 301,
|
||||
"site_id": 30,
|
||||
"site_table_area_id": 40,
|
||||
"areaName": "大厅",
|
||||
"table_name": "1号桌",
|
||||
"table_price": "50.00",
|
||||
"table_status": "FREE",
|
||||
"tableStatusName": "空闲",
|
||||
"light_status": "OFF",
|
||||
"is_rest_area": False,
|
||||
"show_status": True,
|
||||
"virtual_table": False,
|
||||
"charge_free": False,
|
||||
"only_allow_groupon": False,
|
||||
"is_online_reservation": True,
|
||||
"createTime": BASE_TS,
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="MEMBERS",
|
||||
task_cls=MembersTask,
|
||||
endpoint="/MemberProfile/GetTenantMemberList",
|
||||
data_path=("data",),
|
||||
sample_records=[
|
||||
{
|
||||
"memberId": 401,
|
||||
"memberName": "张三",
|
||||
"phone": "13800000000",
|
||||
"balance": "88.88",
|
||||
"status": "ACTIVE",
|
||||
"registerTime": BASE_TS,
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="ASSISTANTS",
|
||||
task_cls=AssistantsTask,
|
||||
endpoint="/Assistant/List",
|
||||
data_path=("data", "assistantInfos"),
|
||||
sample_records=[
|
||||
{
|
||||
"id": 501,
|
||||
"assistant_no": "AS001",
|
||||
"nickname": "小李",
|
||||
"real_name": "李雷",
|
||||
"gender": "M",
|
||||
"mobile": "13900000000",
|
||||
"level": "A",
|
||||
"team_id": 10,
|
||||
"team_name": "先锋队",
|
||||
"assistant_status": "ON",
|
||||
"work_status": "BUSY",
|
||||
"entry_time": BASE_TS,
|
||||
"resign_time": END_TS,
|
||||
"start_time": BASE_TS,
|
||||
"end_time": END_TS,
|
||||
"create_time": BASE_TS,
|
||||
"update_time": END_TS,
|
||||
"system_role_id": 1,
|
||||
"online_status": "ONLINE",
|
||||
"allow_cx": True,
|
||||
"charge_way": "TIME",
|
||||
"pd_unit_price": "30.00",
|
||||
"cx_unit_price": "20.00",
|
||||
"is_guaranteed": True,
|
||||
"is_team_leader": False,
|
||||
"serial_number": "SN001",
|
||||
"show_sort": 1,
|
||||
"is_delete": False,
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="PACKAGES_DEF",
|
||||
task_cls=PackagesDefTask,
|
||||
endpoint="/Package/List",
|
||||
data_path=("data", "packageCouponList"),
|
||||
sample_records=[
|
||||
{
|
||||
"id": 601,
|
||||
"package_id": "PKG001",
|
||||
"package_name": "白天特惠",
|
||||
"table_area_id": 70,
|
||||
"table_area_name": "大厅",
|
||||
"selling_price": "199.00",
|
||||
"duration": 120,
|
||||
"start_time": BASE_TS,
|
||||
"end_time": END_TS,
|
||||
"type": "Groupon",
|
||||
"is_enabled": True,
|
||||
"is_delete": False,
|
||||
"usable_count": 3,
|
||||
"creator_name": "系统",
|
||||
"date_type": "WEEKDAY",
|
||||
"group_type": "DINE_IN",
|
||||
"coupon_money": "30.00",
|
||||
"area_tag_type": "VIP",
|
||||
"system_group_type": "BASIC",
|
||||
"card_type_ids": "1,2,3",
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="ORDERS",
|
||||
task_cls=OrdersTask,
|
||||
endpoint="/order/list",
|
||||
data_path=("data",),
|
||||
sample_records=[
|
||||
{
|
||||
"orderId": 701,
|
||||
"orderNo": "ORD001",
|
||||
"memberId": 401,
|
||||
"tableId": 301,
|
||||
"orderTime": BASE_TS,
|
||||
"endTime": END_TS,
|
||||
"totalAmount": "300.00",
|
||||
"discountAmount": "20.00",
|
||||
"finalAmount": "280.00",
|
||||
"payStatus": "PAID",
|
||||
"orderStatus": "CLOSED",
|
||||
"remark": "测试订单",
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="PAYMENTS",
|
||||
task_cls=PaymentsTask,
|
||||
endpoint="/pay/records",
|
||||
data_path=("data",),
|
||||
sample_records=[
|
||||
{
|
||||
"payId": 801,
|
||||
"orderId": 701,
|
||||
"payTime": END_TS,
|
||||
"payAmount": "280.00",
|
||||
"payType": "CARD",
|
||||
"payStatus": "SUCCESS",
|
||||
"remark": "测试支付",
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="REFUNDS",
|
||||
task_cls=RefundsTask,
|
||||
endpoint="/Pay/RefundList",
|
||||
data_path=(),
|
||||
sample_records=[
|
||||
{
|
||||
"id": 901,
|
||||
"site_id": 1,
|
||||
"tenant_id": 2,
|
||||
"pay_amount": "100.00",
|
||||
"pay_status": "SUCCESS",
|
||||
"pay_time": END_TS,
|
||||
"create_time": END_TS,
|
||||
"relate_type": "ORDER",
|
||||
"relate_id": 701,
|
||||
"payment_method": "CARD",
|
||||
"refund_amount": "20.00",
|
||||
"action_type": "PARTIAL",
|
||||
"pay_terminal": "POS",
|
||||
"operator_id": 11,
|
||||
"channel_pay_no": "CH001",
|
||||
"channel_fee": "1.00",
|
||||
"is_delete": False,
|
||||
"member_id": 401,
|
||||
"member_card_id": 501,
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="COUPON_USAGE",
|
||||
task_cls=CouponUsageTask,
|
||||
endpoint="/Coupon/UsageList",
|
||||
data_path=(),
|
||||
sample_records=[
|
||||
{
|
||||
"id": 1001,
|
||||
"coupon_code": "CP001",
|
||||
"coupon_channel": "MEITUAN",
|
||||
"coupon_name": "双人券",
|
||||
"sale_price": "50.00",
|
||||
"coupon_money": "30.00",
|
||||
"coupon_free_time": 60,
|
||||
"use_status": "USED",
|
||||
"create_time": BASE_TS,
|
||||
"consume_time": END_TS,
|
||||
"operator_id": 11,
|
||||
"operator_name": "操作员",
|
||||
"table_id": 301,
|
||||
"site_order_id": 701,
|
||||
"group_package_id": 601,
|
||||
"coupon_remark": "备注",
|
||||
"deal_id": "DEAL001",
|
||||
"certificate_id": "CERT001",
|
||||
"verify_id": "VERIFY001",
|
||||
"is_delete": False,
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="INVENTORY_CHANGE",
|
||||
task_cls=InventoryChangeTask,
|
||||
endpoint="/Inventory/ChangeList",
|
||||
data_path=("data", "queryDeliveryRecordsList"),
|
||||
sample_records=[
|
||||
{
|
||||
"siteGoodsStockId": 1101,
|
||||
"siteGoodsId": 101,
|
||||
"stockType": "OUT",
|
||||
"goodsName": "测试球杆",
|
||||
"createTime": END_TS,
|
||||
"startNum": 10,
|
||||
"endNum": 8,
|
||||
"changeNum": -2,
|
||||
"unit": "支",
|
||||
"price": "120.00",
|
||||
"operatorName": "仓管",
|
||||
"remark": "测试出库",
|
||||
"goodsCategoryId": 201,
|
||||
"goodsSecondCategoryId": 202,
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="TOPUPS",
|
||||
task_cls=TopupsTask,
|
||||
endpoint="/Topup/SettleList",
|
||||
data_path=("data", "settleList"),
|
||||
sample_records=[
|
||||
{
|
||||
"id": 1201,
|
||||
"memberId": 401,
|
||||
"memberName": "张三",
|
||||
"memberPhone": "13800000000",
|
||||
"tenantMemberCardId": 1301,
|
||||
"memberCardTypeName": "金卡",
|
||||
"payAmount": "500.00",
|
||||
"consumeMoney": "100.00",
|
||||
"settleStatus": "DONE",
|
||||
"settleType": "AUTO",
|
||||
"settleName": "日结",
|
||||
"settleRelateId": 1501,
|
||||
"payTime": BASE_TS,
|
||||
"createTime": END_TS,
|
||||
"operatorId": 11,
|
||||
"operatorName": "收银员",
|
||||
"paymentMethod": "CASH",
|
||||
"refundAmount": "0",
|
||||
"cashAmount": "500.00",
|
||||
"cardAmount": "0",
|
||||
"balanceAmount": "0",
|
||||
"onlineAmount": "0",
|
||||
"roundingAmount": "0",
|
||||
"adjustAmount": "0",
|
||||
"goodsMoney": "0",
|
||||
"tableChargeMoney": "0",
|
||||
"serviceMoney": "0",
|
||||
"couponAmount": "0",
|
||||
"orderRemark": "首次充值",
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="TABLE_DISCOUNT",
|
||||
task_cls=TableDiscountTask,
|
||||
endpoint="/Table/AdjustList",
|
||||
data_path=("data", "taiFeeAdjustInfos"),
|
||||
sample_records=[
|
||||
{
|
||||
"id": 1301,
|
||||
"adjust_type": "DISCOUNT",
|
||||
"applicant_id": 11,
|
||||
"applicant_name": "店长",
|
||||
"operator_id": 22,
|
||||
"operator_name": "值班",
|
||||
"ledger_amount": "50.00",
|
||||
"ledger_count": 2,
|
||||
"ledger_name": "调价",
|
||||
"ledger_status": "APPROVED",
|
||||
"order_settle_id": 7010,
|
||||
"order_trade_no": 8001,
|
||||
"site_table_id": 301,
|
||||
"create_time": END_TS,
|
||||
"is_delete": False,
|
||||
"tableProfile": {
|
||||
"id": 301,
|
||||
"site_table_area_id": 40,
|
||||
"site_table_area_name": "大厅",
|
||||
},
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="ASSISTANT_ABOLISH",
|
||||
task_cls=AssistantAbolishTask,
|
||||
endpoint="/Assistant/AbolishList",
|
||||
data_path=("data", "abolitionAssistants"),
|
||||
sample_records=[
|
||||
{
|
||||
"id": 1401,
|
||||
"tableId": 301,
|
||||
"tableName": "1号桌",
|
||||
"tableAreaId": 40,
|
||||
"tableArea": "大厅",
|
||||
"assistantOn": "AS001",
|
||||
"assistantName": "小李",
|
||||
"pdChargeMinutes": 30,
|
||||
"assistantAbolishAmount": "15.00",
|
||||
"createTime": END_TS,
|
||||
"trashReason": "测试",
|
||||
}
|
||||
],
|
||||
),
|
||||
TaskSpec(
|
||||
code="LEDGER",
|
||||
task_cls=LedgerTask,
|
||||
endpoint="/Assistant/LedgerList",
|
||||
data_path=("data", "orderAssistantDetails"),
|
||||
sample_records=[
|
||||
{
|
||||
"id": 1501,
|
||||
"assistantNo": "AS001",
|
||||
"assistantName": "小李",
|
||||
"nickname": "李",
|
||||
"levelName": "L1",
|
||||
"tableName": "1号桌",
|
||||
"ledger_unit_price": "30.00",
|
||||
"ledger_count": 2,
|
||||
"ledger_amount": "60.00",
|
||||
"projected_income": "80.00",
|
||||
"service_money": "5.00",
|
||||
"member_discount_amount": "2.00",
|
||||
"manual_discount_amount": "1.00",
|
||||
"coupon_deduct_money": "3.00",
|
||||
"order_trade_no": 8001,
|
||||
"order_settle_id": 7010,
|
||||
"operator_id": 22,
|
||||
"operator_name": "值班",
|
||||
"assistant_team_id": 10,
|
||||
"assistant_level": "A",
|
||||
"site_table_id": 301,
|
||||
"order_assistant_id": 1601,
|
||||
"site_assistant_id": 501,
|
||||
"user_id": 5010,
|
||||
"ledger_start_time": BASE_TS,
|
||||
"ledger_end_time": END_TS,
|
||||
"start_use_time": BASE_TS,
|
||||
"last_use_time": END_TS,
|
||||
"income_seconds": 3600,
|
||||
"real_use_seconds": 3300,
|
||||
"is_trash": False,
|
||||
"trash_reason": "",
|
||||
"is_confirm": True,
|
||||
"ledger_status": "CLOSED",
|
||||
"create_time": END_TS,
|
||||
}
|
||||
],
|
||||
),
|
||||
]
|
||||
39
etl_billiards/tests/unit/test_etl_tasks_offline.py
Normal file
39
etl_billiards/tests/unit/test_etl_tasks_offline.py
Normal file
@@ -0,0 +1,39 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""离线模式任务测试,通过回放归档 JSON 来验证 T+L 链路可用。"""
|
||||
import logging
|
||||
from pathlib import Path
|
||||
|
||||
import pytest
|
||||
|
||||
from .task_test_utils import (
|
||||
TASK_SPECS,
|
||||
OfflineAPIClient,
|
||||
create_test_config,
|
||||
dump_offline_payload,
|
||||
get_db_operations,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("spec", TASK_SPECS, ids=lambda spec: spec.code)
|
||||
def test_task_offline_mode(spec, tmp_path):
|
||||
"""确保每个任务都能读取归档 JSON 并完成 Transform + Load 操作。"""
|
||||
archive_dir = tmp_path / "archive"
|
||||
temp_dir = tmp_path / "tmp"
|
||||
archive_dir.mkdir()
|
||||
temp_dir.mkdir()
|
||||
|
||||
file_path = dump_offline_payload(spec, archive_dir)
|
||||
config = create_test_config("OFFLINE", archive_dir, temp_dir)
|
||||
offline_api = OfflineAPIClient({spec.endpoint: Path(file_path)})
|
||||
logger = logging.getLogger(f"test_offline_{spec.code.lower()}")
|
||||
|
||||
with get_db_operations() as db_ops:
|
||||
task = spec.task_cls(config, db_ops, offline_api, logger)
|
||||
result = task.execute()
|
||||
|
||||
assert result["status"] == "SUCCESS"
|
||||
assert result["counts"]["fetched"] == len(spec.sample_records)
|
||||
assert result["counts"]["inserted"] == len(spec.sample_records)
|
||||
if hasattr(db_ops, "commits"):
|
||||
assert db_ops.commits == 1
|
||||
assert db_ops.rollbacks == 0
|
||||
32
etl_billiards/tests/unit/test_etl_tasks_online.py
Normal file
32
etl_billiards/tests/unit/test_etl_tasks_online.py
Normal file
@@ -0,0 +1,32 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""在线模式下的端到端任务测试,验证所有任务在模拟 API 下能顺利执行。"""
|
||||
import logging
|
||||
import pytest
|
||||
|
||||
from .task_test_utils import (
|
||||
TASK_SPECS,
|
||||
FakeAPIClient,
|
||||
create_test_config,
|
||||
get_db_operations,
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("spec", TASK_SPECS, ids=lambda spec: spec.code)
|
||||
def test_task_online_mode(spec, tmp_path):
|
||||
"""针对每个 TaskSpec 验证:模拟 API 数据下依旧能完整跑完 ETL,并正确统计。"""
|
||||
archive_dir = tmp_path / "archive"
|
||||
temp_dir = tmp_path / "tmp"
|
||||
config = create_test_config("ONLINE", archive_dir, temp_dir)
|
||||
fake_api = FakeAPIClient({spec.endpoint: spec.sample_records})
|
||||
logger = logging.getLogger(f"test_online_{spec.code.lower()}")
|
||||
|
||||
with get_db_operations() as db_ops:
|
||||
task = spec.task_cls(config, db_ops, fake_api, logger)
|
||||
result = task.execute()
|
||||
|
||||
assert result["status"] == "SUCCESS"
|
||||
assert result["counts"]["fetched"] == len(spec.sample_records)
|
||||
assert result["counts"]["inserted"] == len(spec.sample_records)
|
||||
if hasattr(db_ops, "commits"):
|
||||
assert db_ops.commits == 1
|
||||
assert db_ops.rollbacks == 0
|
||||
Reference in New Issue
Block a user