# -*- coding: utf-8 -*-
"""Assistant ledger task."""
import json

from .base_task import BaseTask, TaskContext
from loaders.facts.assistant_ledger import AssistantLedgerLoader
from models.parsers import TypeParser


class LedgerTask(BaseTask):
    """Synchronise the assistant service ledger.

    The task runs in three steps: ``extract`` pulls raw rows from the
    paginated API, ``transform`` maps each row onto the ledger column names,
    and ``load`` hands the mapped rows to ``AssistantLedgerLoader``.
    """

    def get_task_code(self) -> str:
        """Return the code that identifies this task."""
        return "LEDGER"

    def extract(self, context: TaskContext) -> dict:
        """Fetch the raw ledger rows for the context's store and time window.

        Args:
            context: Supplies ``store_id``, ``window_start`` and
                ``window_end`` for the request parameters.

        Returns:
            A dict with one key, ``"records"``, holding the first element of
            the pair returned by ``self.api.get_paginated``.
        """
        window_query = {
            "siteId": context.store_id,
            "startTime": TypeParser.format_timestamp(context.window_start, self.tz),
            "endTime": TypeParser.format_timestamp(context.window_end, self.tz),
        }
        params = self._merge_common_params(window_query)

        fetch_pages = self.api.get_paginated
        # Only the record list is used; the second element is discarded.
        records, _unused = fetch_pages(
            endpoint="/AssistantPerformance/GetOrderAssistantDetails",
            params=params,
            page_size=self.config.get("api.page_size", 200),
            data_path=("data",),
            list_key="orderAssistantDetails",
        )
        return {"records": records}

    def transform(self, extracted: dict, context: TaskContext) -> dict:
        """Map every extracted row and count the ones that were rejected.

        Args:
            extracted: Output of ``extract``; a missing ``"records"`` key is
                treated as an empty list.
            context: Supplies the ``store_id`` stamped onto every row.

        Returns:
            A dict with ``"records"`` (the successfully mapped rows),
            ``"fetched"`` (number of raw rows) and ``"skipped"`` (number of
            rows for which ``_parse_ledger`` returned a falsy value).
        """
        raw_rows = extracted.get("records", [])
        candidates = [
            self._parse_ledger(raw_row, context.store_id) for raw_row in raw_rows
        ]
        accepted = [row for row in candidates if row]
        return {
            "records": accepted,
            "fetched": len(extracted.get("records", [])),
            # Every candidate that was not accepted counts as skipped.
            "skipped": len(candidates) - len(accepted),
        }

    def load(self, transformed: dict, context: TaskContext) -> dict:
        """Upsert the mapped rows and report the run counters.

        Args:
            transformed: Output of ``transform``.
            context: Not used by this step.

        Returns:
            A dict with ``"fetched"``, ``"inserted"``, ``"updated"``,
            ``"skipped"`` (transform skips plus loader skips) and
            ``"errors"``, which is always ``0`` here.
        """
        ledger_loader = AssistantLedgerLoader(self.db)
        counts = ledger_loader.upsert_ledgers(transformed["records"])
        inserted, updated, loader_skipped = counts

        summary = {
            "fetched": transformed["fetched"],
            "inserted": inserted,
            "updated": updated,
        }
        summary["skipped"] = transformed["skipped"] + loader_skipped
        summary["errors"] = 0
        return summary

    def _parse_ledger(self, raw: dict, store_id: int) -> dict | None:
        """Convert one raw API row into a ledger row dict.

        Args:
            raw: A single row from the API response.
            store_id: Store identifier copied onto the row.

        Returns:
            The mapped row, or ``None`` (after logging a warning) when the
            row's ``"id"`` parses to a falsy value.
        """
        ledger_id = TypeParser.parse_int(raw.get("id"))
        if not ledger_id:
            self.logger.warning("跳过缺少助教流水ID的记录: %s", raw)
            return None

        # Small accessors that read one key from ``raw`` and coerce it.
        def text(key):
            return raw.get(key)

        def integer(key):
            return TypeParser.parse_int(raw.get(key))

        def money(key):
            return TypeParser.parse_decimal(raw.get(key))

        def moment(key):
            return TypeParser.parse_timestamp(raw.get(key), self.tz)

        return {
            "store_id": store_id,
            "ledger_id": ledger_id,
            "assistant_no": text("assistantNo"),
            "assistant_name": text("assistantName"),
            "nickname": text("nickname"),
            "level_name": text("levelName"),
            "table_name": text("tableName"),
            "ledger_unit_price": money("ledger_unit_price"),
            "ledger_count": integer("ledger_count"),
            "ledger_amount": money("ledger_amount"),
            "projected_income": money("projected_income"),
            "service_money": money("service_money"),
            "member_discount_amount": money("member_discount_amount"),
            "manual_discount_amount": money("manual_discount_amount"),
            "coupon_deduct_money": money("coupon_deduct_money"),
            "order_trade_no": integer("order_trade_no"),
            "order_settle_id": integer("order_settle_id"),
            "operator_id": integer("operator_id"),
            "operator_name": text("operator_name"),
            "assistant_team_id": integer("assistant_team_id"),
            "assistant_level": text("assistant_level"),
            "site_table_id": integer("site_table_id"),
            "order_assistant_id": integer("order_assistant_id"),
            "site_assistant_id": integer("site_assistant_id"),
            "user_id": integer("user_id"),
            "ledger_start_time": moment("ledger_start_time"),
            "ledger_end_time": moment("ledger_end_time"),
            "start_use_time": moment("start_use_time"),
            "last_use_time": moment("last_use_time"),
            "income_seconds": integer("income_seconds"),
            "real_use_seconds": integer("real_use_seconds"),
            "is_trash": text("is_trash"),
            "trash_reason": text("trash_reason"),
            "is_confirm": text("is_confirm"),
            "ledger_status": text("ledger_status"),
            # The creation time is read from either spelling of the key.
            "create_time": TypeParser.parse_timestamp(
                raw.get("create_time") or raw.get("createTime"), self.tz
            ),
            # Keep the untouched payload, with non-ASCII text left readable.
            "raw_data": json.dumps(raw, ensure_ascii=False),
        }