Merge branch 'main' into dev
This commit is contained in:
83
etl_billiards/tasks/assistant_abolish_task.py
Normal file
83
etl_billiards/tasks/assistant_abolish_task.py
Normal file
@@ -0,0 +1,83 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""助教作废任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.assistant_abolish import AssistantAbolishLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class AssistantAbolishTask(BaseTask):
|
||||
"""同步助教作废记录"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "ASSISTANT_ABOLISH"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 ASSISTANT_ABOLISH 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Assistant/AbolishList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "abolitionAssistants"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_record(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = AssistantAbolishLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_records(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"ASSISTANT_ABOLISH 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("ASSISTANT_ABOLISH 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_record(self, raw: dict) -> dict | None:
|
||||
abolish_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not abolish_id:
|
||||
self.logger.warning("跳过缺少 id 的助教作废记录: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"abolish_id": abolish_id,
|
||||
"table_id": TypeParser.parse_int(raw.get("tableId")),
|
||||
"table_name": raw.get("tableName"),
|
||||
"table_area_id": TypeParser.parse_int(raw.get("tableAreaId")),
|
||||
"table_area": raw.get("tableArea"),
|
||||
"assistant_no": raw.get("assistantOn"),
|
||||
"assistant_name": raw.get("assistantName"),
|
||||
"charge_minutes": TypeParser.parse_int(raw.get("pdChargeMinutes")),
|
||||
"abolish_amount": TypeParser.parse_decimal(
|
||||
raw.get("assistantAbolishAmount")
|
||||
),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("createTime") or raw.get("create_time"), self.tz
|
||||
),
|
||||
"trash_reason": raw.get("trashReason"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
103
etl_billiards/tasks/assistants_task.py
Normal file
103
etl_billiards/tasks/assistants_task.py
Normal file
@@ -0,0 +1,103 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""助教账号任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.dimensions.assistant import AssistantLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class AssistantsTask(BaseTask):
|
||||
"""同步助教账号资料"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "ASSISTANTS"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 ASSISTANTS 任务")
|
||||
params = {"storeId": self.config.get("app.store_id")}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Assistant/List",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "assistantInfos"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_assistant(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = AssistantLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_assistants(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"ASSISTANTS 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("ASSISTANTS 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_assistant(self, raw: dict) -> dict | None:
|
||||
assistant_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not assistant_id:
|
||||
self.logger.warning("跳过缺少 id 的助教数据: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"assistant_id": assistant_id,
|
||||
"assistant_no": raw.get("assistant_no") or raw.get("assistantNo"),
|
||||
"nickname": raw.get("nickname"),
|
||||
"real_name": raw.get("real_name") or raw.get("realName"),
|
||||
"gender": raw.get("gender"),
|
||||
"mobile": raw.get("mobile"),
|
||||
"level": raw.get("level"),
|
||||
"team_id": TypeParser.parse_int(raw.get("team_id") or raw.get("teamId")),
|
||||
"team_name": raw.get("team_name"),
|
||||
"assistant_status": raw.get("assistant_status"),
|
||||
"work_status": raw.get("work_status"),
|
||||
"entry_time": TypeParser.parse_timestamp(
|
||||
raw.get("entry_time") or raw.get("entryTime"), self.tz
|
||||
),
|
||||
"resign_time": TypeParser.parse_timestamp(
|
||||
raw.get("resign_time") or raw.get("resignTime"), self.tz
|
||||
),
|
||||
"start_time": TypeParser.parse_timestamp(
|
||||
raw.get("start_time") or raw.get("startTime"), self.tz
|
||||
),
|
||||
"end_time": TypeParser.parse_timestamp(
|
||||
raw.get("end_time") or raw.get("endTime"), self.tz
|
||||
),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"update_time": TypeParser.parse_timestamp(
|
||||
raw.get("update_time") or raw.get("updateTime"), self.tz
|
||||
),
|
||||
"system_role_id": raw.get("system_role_id"),
|
||||
"online_status": raw.get("online_status"),
|
||||
"allow_cx": raw.get("allow_cx"),
|
||||
"charge_way": raw.get("charge_way"),
|
||||
"pd_unit_price": TypeParser.parse_decimal(raw.get("pd_unit_price")),
|
||||
"cx_unit_price": TypeParser.parse_decimal(raw.get("cx_unit_price")),
|
||||
"is_guaranteed": raw.get("is_guaranteed"),
|
||||
"is_team_leader": raw.get("is_team_leader"),
|
||||
"serial_number": raw.get("serial_number"),
|
||||
"show_sort": raw.get("show_sort"),
|
||||
"is_delete": raw.get("is_delete"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
92
etl_billiards/tasks/coupon_usage_task.py
Normal file
92
etl_billiards/tasks/coupon_usage_task.py
Normal file
@@ -0,0 +1,92 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""平台券核销任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.coupon_usage import CouponUsageLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class CouponUsageTask(BaseTask):
|
||||
"""同步平台券验证/核销记录"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "COUPON_USAGE"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 COUPON_USAGE 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Coupon/UsageList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=(),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_usage(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = CouponUsageLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_coupon_usage(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"COUPON_USAGE 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("COUPON_USAGE 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_usage(self, raw: dict) -> dict | None:
|
||||
usage_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not usage_id:
|
||||
self.logger.warning("跳过缺少 id 的券核销记录: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"usage_id": usage_id,
|
||||
"coupon_code": raw.get("coupon_code"),
|
||||
"coupon_channel": raw.get("coupon_channel"),
|
||||
"coupon_name": raw.get("coupon_name"),
|
||||
"sale_price": TypeParser.parse_decimal(raw.get("sale_price")),
|
||||
"coupon_money": TypeParser.parse_decimal(raw.get("coupon_money")),
|
||||
"coupon_free_time": TypeParser.parse_int(raw.get("coupon_free_time")),
|
||||
"use_status": raw.get("use_status"),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"consume_time": TypeParser.parse_timestamp(
|
||||
raw.get("consume_time") or raw.get("consumeTime"), self.tz
|
||||
),
|
||||
"operator_id": TypeParser.parse_int(raw.get("operator_id")),
|
||||
"operator_name": raw.get("operator_name"),
|
||||
"table_id": TypeParser.parse_int(raw.get("table_id")),
|
||||
"site_order_id": TypeParser.parse_int(raw.get("site_order_id")),
|
||||
"group_package_id": TypeParser.parse_int(raw.get("group_package_id")),
|
||||
"coupon_remark": raw.get("coupon_remark"),
|
||||
"deal_id": raw.get("deal_id"),
|
||||
"certificate_id": raw.get("certificate_id"),
|
||||
"verify_id": raw.get("verify_id"),
|
||||
"is_delete": raw.get("is_delete"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
90
etl_billiards/tasks/inventory_change_task.py
Normal file
90
etl_billiards/tasks/inventory_change_task.py
Normal file
@@ -0,0 +1,90 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""库存变更任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.inventory_change import InventoryChangeLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class InventoryChangeTask(BaseTask):
|
||||
"""同步库存变化记录"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "INVENTORY_CHANGE"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 INVENTORY_CHANGE 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Inventory/ChangeList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "queryDeliveryRecordsList"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_change(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = InventoryChangeLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_changes(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"INVENTORY_CHANGE 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("INVENTORY_CHANGE 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_change(self, raw: dict) -> dict | None:
|
||||
change_id = TypeParser.parse_int(
|
||||
raw.get("siteGoodsStockId") or raw.get("site_goods_stock_id")
|
||||
)
|
||||
if not change_id:
|
||||
self.logger.warning("跳过缺少变动 id 的库存记录: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"change_id": change_id,
|
||||
"site_goods_id": TypeParser.parse_int(
|
||||
raw.get("siteGoodsId") or raw.get("site_goods_id")
|
||||
),
|
||||
"stock_type": raw.get("stockType") or raw.get("stock_type"),
|
||||
"goods_name": raw.get("goodsName"),
|
||||
"change_time": TypeParser.parse_timestamp(
|
||||
raw.get("createTime") or raw.get("create_time"), self.tz
|
||||
),
|
||||
"start_qty": TypeParser.parse_int(raw.get("startNum")),
|
||||
"end_qty": TypeParser.parse_int(raw.get("endNum")),
|
||||
"change_qty": TypeParser.parse_int(raw.get("changeNum")),
|
||||
"unit": raw.get("unit"),
|
||||
"price": TypeParser.parse_decimal(raw.get("price")),
|
||||
"operator_name": raw.get("operatorName"),
|
||||
"remark": raw.get("remark"),
|
||||
"goods_category_id": TypeParser.parse_int(raw.get("goodsCategoryId")),
|
||||
"goods_second_category_id": TypeParser.parse_int(
|
||||
raw.get("goodsSecondCategoryId")
|
||||
),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
119
etl_billiards/tasks/ledger_task.py
Normal file
119
etl_billiards/tasks/ledger_task.py
Normal file
@@ -0,0 +1,119 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""助教流水任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.assistant_ledger import AssistantLedgerLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class LedgerTask(BaseTask):
|
||||
"""同步助教服务台账"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "LEDGER"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 LEDGER 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Assistant/LedgerList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "orderAssistantDetails"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_ledger(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = AssistantLedgerLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_ledgers(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"LEDGER 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("LEDGER 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_ledger(self, raw: dict) -> dict | None:
|
||||
ledger_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not ledger_id:
|
||||
self.logger.warning("跳过缺少 id 的助教流水: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"ledger_id": ledger_id,
|
||||
"assistant_no": raw.get("assistantNo"),
|
||||
"assistant_name": raw.get("assistantName"),
|
||||
"nickname": raw.get("nickname"),
|
||||
"level_name": raw.get("levelName"),
|
||||
"table_name": raw.get("tableName"),
|
||||
"ledger_unit_price": TypeParser.parse_decimal(raw.get("ledger_unit_price")),
|
||||
"ledger_count": TypeParser.parse_int(raw.get("ledger_count")),
|
||||
"ledger_amount": TypeParser.parse_decimal(raw.get("ledger_amount")),
|
||||
"projected_income": TypeParser.parse_decimal(raw.get("projected_income")),
|
||||
"service_money": TypeParser.parse_decimal(raw.get("service_money")),
|
||||
"member_discount_amount": TypeParser.parse_decimal(
|
||||
raw.get("member_discount_amount")
|
||||
),
|
||||
"manual_discount_amount": TypeParser.parse_decimal(
|
||||
raw.get("manual_discount_amount")
|
||||
),
|
||||
"coupon_deduct_money": TypeParser.parse_decimal(
|
||||
raw.get("coupon_deduct_money")
|
||||
),
|
||||
"order_trade_no": TypeParser.parse_int(raw.get("order_trade_no")),
|
||||
"order_settle_id": TypeParser.parse_int(raw.get("order_settle_id")),
|
||||
"operator_id": TypeParser.parse_int(raw.get("operator_id")),
|
||||
"operator_name": raw.get("operator_name"),
|
||||
"assistant_team_id": TypeParser.parse_int(raw.get("assistant_team_id")),
|
||||
"assistant_level": raw.get("assistant_level"),
|
||||
"site_table_id": TypeParser.parse_int(raw.get("site_table_id")),
|
||||
"order_assistant_id": TypeParser.parse_int(raw.get("order_assistant_id")),
|
||||
"site_assistant_id": TypeParser.parse_int(raw.get("site_assistant_id")),
|
||||
"user_id": TypeParser.parse_int(raw.get("user_id")),
|
||||
"ledger_start_time": TypeParser.parse_timestamp(
|
||||
raw.get("ledger_start_time"), self.tz
|
||||
),
|
||||
"ledger_end_time": TypeParser.parse_timestamp(
|
||||
raw.get("ledger_end_time"), self.tz
|
||||
),
|
||||
"start_use_time": TypeParser.parse_timestamp(
|
||||
raw.get("start_use_time"), self.tz
|
||||
),
|
||||
"last_use_time": TypeParser.parse_timestamp(
|
||||
raw.get("last_use_time"), self.tz
|
||||
),
|
||||
"income_seconds": TypeParser.parse_int(raw.get("income_seconds")),
|
||||
"real_use_seconds": TypeParser.parse_int(raw.get("real_use_seconds")),
|
||||
"is_trash": raw.get("is_trash"),
|
||||
"trash_reason": raw.get("trash_reason"),
|
||||
"is_confirm": raw.get("is_confirm"),
|
||||
"ledger_status": raw.get("ledger_status"),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
91
etl_billiards/tasks/packages_task.py
Normal file
91
etl_billiards/tasks/packages_task.py
Normal file
@@ -0,0 +1,91 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""团购/套餐定义任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.dimensions.package import PackageDefinitionLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class PackagesDefTask(BaseTask):
|
||||
"""同步团购套餐定义"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "PACKAGES_DEF"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 PACKAGES_DEF 任务")
|
||||
params = {"storeId": self.config.get("app.store_id")}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Package/List",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "packageCouponList"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_package(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = PackageDefinitionLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_packages(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"PACKAGES_DEF 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("PACKAGES_DEF 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_package(self, raw: dict) -> dict | None:
|
||||
package_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not package_id:
|
||||
self.logger.warning("跳过缺少 id 的套餐数据: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"package_id": package_id,
|
||||
"package_code": raw.get("package_id") or raw.get("packageId"),
|
||||
"package_name": raw.get("package_name"),
|
||||
"table_area_id": raw.get("table_area_id"),
|
||||
"table_area_name": raw.get("table_area_name"),
|
||||
"selling_price": TypeParser.parse_decimal(
|
||||
raw.get("selling_price") or raw.get("sellingPrice")
|
||||
),
|
||||
"duration_seconds": TypeParser.parse_int(raw.get("duration")),
|
||||
"start_time": TypeParser.parse_timestamp(
|
||||
raw.get("start_time") or raw.get("startTime"), self.tz
|
||||
),
|
||||
"end_time": TypeParser.parse_timestamp(
|
||||
raw.get("end_time") or raw.get("endTime"), self.tz
|
||||
),
|
||||
"type": raw.get("type"),
|
||||
"is_enabled": raw.get("is_enabled"),
|
||||
"is_delete": raw.get("is_delete"),
|
||||
"usable_count": TypeParser.parse_int(raw.get("usable_count")),
|
||||
"creator_name": raw.get("creator_name"),
|
||||
"date_type": raw.get("date_type"),
|
||||
"group_type": raw.get("group_type"),
|
||||
"coupon_money": TypeParser.parse_decimal(
|
||||
raw.get("coupon_money") or raw.get("couponMoney")
|
||||
),
|
||||
"area_tag_type": raw.get("area_tag_type"),
|
||||
"system_group_type": raw.get("system_group_type"),
|
||||
"card_type_ids": raw.get("card_type_ids"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
91
etl_billiards/tasks/refunds_task.py
Normal file
91
etl_billiards/tasks/refunds_task.py
Normal file
@@ -0,0 +1,91 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""退款记录任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.refund import RefundLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class RefundsTask(BaseTask):
|
||||
"""同步支付退款流水"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "REFUNDS"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 REFUNDS 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Pay/RefundList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=(),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_refund(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = RefundLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_refunds(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"REFUNDS 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("REFUNDS 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_refund(self, raw: dict) -> dict | None:
|
||||
refund_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not refund_id:
|
||||
self.logger.warning("跳过缺少 id 的退款记录: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"refund_id": refund_id,
|
||||
"site_id": TypeParser.parse_int(raw.get("site_id") or raw.get("siteId")),
|
||||
"tenant_id": TypeParser.parse_int(raw.get("tenant_id") or raw.get("tenantId")),
|
||||
"pay_amount": TypeParser.parse_decimal(raw.get("pay_amount")),
|
||||
"pay_status": raw.get("pay_status"),
|
||||
"pay_time": TypeParser.parse_timestamp(
|
||||
raw.get("pay_time") or raw.get("payTime"), self.tz
|
||||
),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"relate_type": raw.get("relate_type"),
|
||||
"relate_id": TypeParser.parse_int(raw.get("relate_id")),
|
||||
"payment_method": raw.get("payment_method"),
|
||||
"refund_amount": TypeParser.parse_decimal(raw.get("refund_amount")),
|
||||
"action_type": raw.get("action_type"),
|
||||
"pay_terminal": raw.get("pay_terminal"),
|
||||
"operator_id": TypeParser.parse_int(raw.get("operator_id")),
|
||||
"channel_pay_no": raw.get("channel_pay_no"),
|
||||
"channel_fee": TypeParser.parse_decimal(raw.get("channel_fee")),
|
||||
"is_delete": raw.get("is_delete"),
|
||||
"member_id": TypeParser.parse_int(raw.get("member_id")),
|
||||
"member_card_id": TypeParser.parse_int(raw.get("member_card_id")),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
92
etl_billiards/tasks/table_discount_task.py
Normal file
92
etl_billiards/tasks/table_discount_task.py
Normal file
@@ -0,0 +1,92 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""台费折扣任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.table_discount import TableDiscountLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class TableDiscountTask(BaseTask):
|
||||
"""同步台费折扣/调价记录"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "TABLE_DISCOUNT"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 TABLE_DISCOUNT 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Table/AdjustList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "taiFeeAdjustInfos"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_discount(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = TableDiscountLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_discounts(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"TABLE_DISCOUNT 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("TABLE_DISCOUNT 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_discount(self, raw: dict) -> dict | None:
|
||||
discount_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not discount_id:
|
||||
self.logger.warning("跳过缺少 id 的台费折扣记录: %s", raw)
|
||||
return None
|
||||
|
||||
table_profile = raw.get("tableProfile") or {}
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"discount_id": discount_id,
|
||||
"adjust_type": raw.get("adjust_type") or raw.get("adjustType"),
|
||||
"applicant_id": TypeParser.parse_int(raw.get("applicant_id")),
|
||||
"applicant_name": raw.get("applicant_name"),
|
||||
"operator_id": TypeParser.parse_int(raw.get("operator_id")),
|
||||
"operator_name": raw.get("operator_name"),
|
||||
"ledger_amount": TypeParser.parse_decimal(raw.get("ledger_amount")),
|
||||
"ledger_count": TypeParser.parse_int(raw.get("ledger_count")),
|
||||
"ledger_name": raw.get("ledger_name"),
|
||||
"ledger_status": raw.get("ledger_status"),
|
||||
"order_settle_id": TypeParser.parse_int(raw.get("order_settle_id")),
|
||||
"order_trade_no": TypeParser.parse_int(raw.get("order_trade_no")),
|
||||
"site_table_id": TypeParser.parse_int(
|
||||
raw.get("site_table_id") or table_profile.get("id")
|
||||
),
|
||||
"table_area_id": TypeParser.parse_int(
|
||||
raw.get("tableAreaId") or table_profile.get("site_table_area_id")
|
||||
),
|
||||
"table_area_name": table_profile.get("site_table_area_name"),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"is_delete": raw.get("is_delete"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
@@ -1,4 +1,92 @@
|
||||
<<<<<<< HEAD
|
||||
class TablesTask(BaseTask):
|
||||
def get_task_code(self) -> str: # 返回 "TABLES"
|
||||
def execute(self) -> dict: # 拉取 /Table/GetSiteTables
|
||||
def _parse_table(self, raw: dict) -> dict | None:
|
||||
def _parse_table(self, raw: dict) -> dict | None:
|
||||
=======
|
||||
# -*- coding: utf-8 -*-
|
||||
"""台桌档案任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.dimensions.table import TableLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class TablesTask(BaseTask):
|
||||
"""同步门店台桌列表"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "TABLES"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 TABLES 任务")
|
||||
params = {"storeId": self.config.get("app.store_id")}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Table/GetSiteTables",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "siteTables"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_table(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = TableLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_tables(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"TABLES 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("TABLES 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_table(self, raw: dict) -> dict | None:
|
||||
table_id = TypeParser.parse_int(raw.get("id"))
|
||||
if not table_id:
|
||||
self.logger.warning("跳过缺少 table_id 的台桌记录: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"table_id": table_id,
|
||||
"site_id": TypeParser.parse_int(raw.get("site_id") or raw.get("siteId")),
|
||||
"area_id": TypeParser.parse_int(
|
||||
raw.get("site_table_area_id") or raw.get("siteTableAreaId")
|
||||
),
|
||||
"area_name": raw.get("areaName") or raw.get("site_table_area_name"),
|
||||
"table_name": raw.get("table_name") or raw.get("tableName"),
|
||||
"table_price": TypeParser.parse_decimal(
|
||||
raw.get("table_price") or raw.get("tablePrice")
|
||||
),
|
||||
"table_status": raw.get("table_status") or raw.get("tableStatus"),
|
||||
"table_status_name": raw.get("tableStatusName"),
|
||||
"light_status": raw.get("light_status"),
|
||||
"is_rest_area": raw.get("is_rest_area"),
|
||||
"show_status": raw.get("show_status"),
|
||||
"virtual_table": raw.get("virtual_table"),
|
||||
"charge_free": raw.get("charge_free"),
|
||||
"only_allow_groupon": raw.get("only_allow_groupon"),
|
||||
"is_online_reservation": raw.get("is_online_reservation"),
|
||||
"created_time": TypeParser.parse_timestamp(
|
||||
raw.get("create_time") or raw.get("createTime"), self.tz
|
||||
),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
>>>>>>> main
|
||||
|
||||
102
etl_billiards/tasks/topups_task.py
Normal file
102
etl_billiards/tasks/topups_task.py
Normal file
@@ -0,0 +1,102 @@
|
||||
# -*- coding: utf-8 -*-
|
||||
"""充值记录任务"""
|
||||
|
||||
import json
|
||||
|
||||
from .base_task import BaseTask
|
||||
from loaders.facts.topup import TopupLoader
|
||||
from models.parsers import TypeParser
|
||||
|
||||
|
||||
class TopupsTask(BaseTask):
|
||||
"""同步储值充值结算记录"""
|
||||
|
||||
def get_task_code(self) -> str:
|
||||
return "TOPUPS"
|
||||
|
||||
def execute(self) -> dict:
|
||||
self.logger.info("开始执行 TOPUPS 任务")
|
||||
window_start, window_end, _ = self._get_time_window()
|
||||
params = {
|
||||
"storeId": self.config.get("app.store_id"),
|
||||
"startTime": TypeParser.format_timestamp(window_start, self.tz),
|
||||
"endTime": TypeParser.format_timestamp(window_end, self.tz),
|
||||
}
|
||||
|
||||
try:
|
||||
records, _ = self.api.get_paginated(
|
||||
endpoint="/Topup/SettleList",
|
||||
params=params,
|
||||
page_size=self.config.get("api.page_size", 200),
|
||||
data_path=("data", "settleList"),
|
||||
)
|
||||
|
||||
parsed = []
|
||||
for raw in records:
|
||||
mapped = self._parse_topup(raw)
|
||||
if mapped:
|
||||
parsed.append(mapped)
|
||||
|
||||
loader = TopupLoader(self.db)
|
||||
inserted, updated, skipped = loader.upsert_topups(parsed)
|
||||
|
||||
self.db.commit()
|
||||
counts = {
|
||||
"fetched": len(records),
|
||||
"inserted": inserted,
|
||||
"updated": updated,
|
||||
"skipped": skipped,
|
||||
"errors": 0,
|
||||
}
|
||||
self.logger.info(f"TOPUPS 完成: {counts}")
|
||||
return self._build_result("SUCCESS", counts)
|
||||
except Exception:
|
||||
self.db.rollback()
|
||||
self.logger.error("TOPUPS 失败", exc_info=True)
|
||||
raise
|
||||
|
||||
def _parse_topup(self, raw: dict) -> dict | None:
|
||||
node = raw.get("settleList") if isinstance(raw.get("settleList"), dict) else raw
|
||||
topup_id = TypeParser.parse_int(node.get("id"))
|
||||
if not topup_id:
|
||||
self.logger.warning("跳过缺少 id 的充值结算: %s", raw)
|
||||
return None
|
||||
|
||||
store_id = self.config.get("app.store_id")
|
||||
return {
|
||||
"store_id": store_id,
|
||||
"topup_id": topup_id,
|
||||
"member_id": TypeParser.parse_int(node.get("memberId")),
|
||||
"member_name": node.get("memberName"),
|
||||
"member_phone": node.get("memberPhone"),
|
||||
"card_id": TypeParser.parse_int(node.get("tenantMemberCardId")),
|
||||
"card_type_name": node.get("memberCardTypeName"),
|
||||
"pay_amount": TypeParser.parse_decimal(node.get("payAmount")),
|
||||
"consume_money": TypeParser.parse_decimal(node.get("consumeMoney")),
|
||||
"settle_status": node.get("settleStatus"),
|
||||
"settle_type": node.get("settleType"),
|
||||
"settle_name": node.get("settleName"),
|
||||
"settle_relate_id": TypeParser.parse_int(node.get("settleRelateId")),
|
||||
"pay_time": TypeParser.parse_timestamp(
|
||||
node.get("payTime") or node.get("pay_time"), self.tz
|
||||
),
|
||||
"create_time": TypeParser.parse_timestamp(
|
||||
node.get("createTime") or node.get("create_time"), self.tz
|
||||
),
|
||||
"operator_id": TypeParser.parse_int(node.get("operatorId")),
|
||||
"operator_name": node.get("operatorName"),
|
||||
"payment_method": node.get("paymentMethod"),
|
||||
"refund_amount": TypeParser.parse_decimal(node.get("refundAmount")),
|
||||
"cash_amount": TypeParser.parse_decimal(node.get("cashAmount")),
|
||||
"card_amount": TypeParser.parse_decimal(node.get("cardAmount")),
|
||||
"balance_amount": TypeParser.parse_decimal(node.get("balanceAmount")),
|
||||
"online_amount": TypeParser.parse_decimal(node.get("onlineAmount")),
|
||||
"rounding_amount": TypeParser.parse_decimal(node.get("roundingAmount")),
|
||||
"adjust_amount": TypeParser.parse_decimal(node.get("adjustAmount")),
|
||||
"goods_money": TypeParser.parse_decimal(node.get("goodsMoney")),
|
||||
"table_charge_money": TypeParser.parse_decimal(node.get("tableChargeMoney")),
|
||||
"service_money": TypeParser.parse_decimal(node.get("serviceMoney")),
|
||||
"coupon_amount": TypeParser.parse_decimal(node.get("couponAmount")),
|
||||
"order_remark": node.get("orderRemark"),
|
||||
"raw_data": json.dumps(raw, ensure_ascii=False),
|
||||
}
|
||||
Reference in New Issue
Block a user