# -*- coding: utf-8 -*-
"""Member ETL task."""

import json

from .base_task import BaseTask, TaskContext
from loaders.dimensions.member import MemberLoader
from models.parsers import TypeParser


class MembersTask(BaseTask):
    """ETL task that fetches member records, parses them and upserts them.

    NOTE(review): ``self.api``, ``self.config``, ``self.db``, ``self.tz``,
    ``self.logger`` and ``self._merge_common_params`` are not defined in this
    file; presumably they are provided by ``BaseTask`` -- confirm there.
    """

    def get_task_code(self) -> str:
        """Return the code identifying this task (``"MEMBERS"``)."""
        return "MEMBERS"

    def extract(self, context: TaskContext) -> dict:
        """Fetch the member list for the store in ``context``.

        Args:
            context: Task context; its ``store_id`` is sent as ``siteId``.

        Returns:
            ``{"records": <records returned by the paginated API call>}``.
        """
        params = self._merge_common_params({"siteId": context.store_id})
        # ``get_paginated`` returns a pair; only the first element (the
        # records) is used here, the second is deliberately discarded.
        records, _ = self.api.get_paginated(
            endpoint="/MemberProfile/GetTenantMemberList",
            params=params,
            page_size=self.config.get("api.page_size", 200),
            data_path=("data",),
            list_key="tenantMemberInfos",
        )
        return {"records": records}

    def transform(self, extracted: dict, context: TaskContext) -> dict:
        """Parse the raw records, counting those that could not be parsed.

        Args:
            extracted: Result of :meth:`extract`; its ``"records"`` entry may
                be missing or ``None``, in which case it is treated as empty.
            context: Task context; its ``store_id`` is stamped on every row.

        Returns:
            A dict with ``"records"`` (parsed rows), ``"fetched"`` (number of
            raw records seen) and ``"skipped"`` (raw records that produced no
            parsed row).
        """
        # ``or []`` (rather than a ``.get`` default) also covers a key that is
        # present but ``None``, which would otherwise fail in the loop below.
        raw_records = extracted.get("records") or []

        parsed = []
        skipped = 0
        for raw in raw_records:
            parsed_row = self._parse_member(raw, context.store_id)
            if parsed_row:
                parsed.append(parsed_row)
            else:
                skipped += 1

        return {
            "records": parsed,
            "fetched": len(raw_records),
            "skipped": skipped,
        }

    def load(self, transformed: dict, context: TaskContext) -> dict:
        """Upsert the parsed rows and return the run counters.

        Args:
            transformed: Result of :meth:`transform`.
            context: Task context; its ``store_id`` is passed to the loader.

        Returns:
            A dict with ``"fetched"``, ``"inserted"``, ``"updated"``,
            ``"skipped"`` (transform skips plus loader skips) and ``"errors"``
            (always ``0`` here).
        """
        loader = MemberLoader(self.db)
        inserted, updated, loader_skipped = loader.upsert_members(
            transformed["records"], context.store_id
        )
        return {
            "fetched": transformed["fetched"],
            "inserted": inserted,
            "updated": updated,
            "skipped": transformed["skipped"] + loader_skipped,
            "errors": 0,
        }

    def _parse_member(self, raw: dict, store_id: int) -> dict | None:
        """Parse one raw member record into a row dict.

        Args:
            raw: Raw member record as returned by the API.
            store_id: Store identifier copied onto the row.

        Returns:
            The parsed row, or ``None`` when ``memberId`` parses to a falsy
            value or when any exception is raised while parsing (the failure
            is logged as a warning, not re-raised).
        """
        try:
            member_id = TypeParser.parse_int(raw.get("memberId"))
            # A falsy id (e.g. missing/unparseable, or 0) means the record
            # cannot be keyed, so it is dropped.
            if not member_id:
                return None
            return {
                "store_id": store_id,
                "member_id": member_id,
                "member_name": raw.get("memberName"),
                "phone": raw.get("phone"),
                "balance": TypeParser.parse_decimal(raw.get("balance")),
                "status": raw.get("status"),
                "register_time": TypeParser.parse_timestamp(
                    raw.get("registerTime"), self.tz
                ),
                # Keep the untouched payload alongside the parsed columns.
                "raw_data": json.dumps(raw, ensure_ascii=False),
            }
        except Exception as exc:
            # Best-effort per record: one bad record must not abort the batch.
            self.logger.warning("解析会员记录失败: %s, 原始数据: %s", exc, raw)
            return None