# -*- coding: utf-8 -*-
"""Product master-data (PRODUCTS) ETL task."""

import json

from .base_task import BaseTask, TaskContext
from loaders.dimensions.product import ProductLoader
from models.parsers import TypeParser


class ProductsTask(BaseTask):
    """ETL task for the product dimension.

    Pulls the tenant goods list from ``/TenantGoods/QueryTenantGoods``,
    flattens each raw record into a product row, and upserts the rows
    through :class:`ProductLoader`.
    """

    def get_task_code(self) -> str:
        """Return the code that identifies this task."""
        return "PRODUCTS"

    def extract(self, context: TaskContext) -> dict:
        """Fetch every page of product records for the context's store.

        Args:
            context: Task context; ``context.store_id`` is sent as ``siteId``.

        Returns:
            ``{"records": records}`` where ``records`` is what
            ``self.api.get_paginated`` collected from the ``tenantGoodsList``
            key under ``data``.
        """
        params = self._merge_common_params({"siteId": context.store_id})
        records, _ = self.api.get_paginated(
            endpoint="/TenantGoods/QueryTenantGoods",
            params=params,
            page_size=self.config.get("api.page_size", 200),
            data_path=("data",),
            list_key="tenantGoodsList",
        )
        return {"records": records}

    def transform(self, extracted: dict, context: TaskContext) -> dict:
        """Parse raw API records into product rows.

        Args:
            extracted: Output of :meth:`extract`; its ``"records"`` entry may be
                missing or ``None``, both treated as "no records".
            context: Task context supplying ``store_id`` for every row.

        Returns:
            ``{"records": rows, "fetched": n_raw, "skipped": n_unparseable}``.
        """
        # `or []` also covers an explicit None, which would break len() below.
        raw_records = extracted.get("records") or []
        parsed = []
        skipped = 0
        for raw in raw_records:
            row = self._parse_product(raw, context.store_id)
            if row:
                parsed.append(row)
            else:
                skipped += 1
        return {
            "records": parsed,
            "fetched": len(raw_records),
            "skipped": skipped,
        }

    def load(self, transformed: dict, context: TaskContext) -> dict:
        """Upsert the parsed rows and report counts.

        Args:
            transformed: Output of :meth:`transform`.
            context: Task context supplying ``store_id`` for the loader.

        Returns:
            Counters: ``fetched``, ``inserted``, ``updated``, ``skipped`` (rows
            skipped during transform plus rows the loader skipped), and
            ``errors`` (always 0 here).
        """
        loader = ProductLoader(self.db)
        inserted, updated, loader_skipped = loader.upsert_products(
            transformed["records"], context.store_id
        )
        return {
            "fetched": transformed["fetched"],
            "inserted": inserted,
            "updated": updated,
            "skipped": transformed["skipped"] + loader_skipped,
            "errors": 0,
        }

    @staticmethod
    def _first_present(raw: dict, *keys: str):
        """Return the first value under ``keys`` that is neither None nor "".

        Unlike an ``a or b`` chain, legitimate falsy values such as ``0`` are
        kept instead of being replaced by the fallback key's value.
        """
        for key in keys:
            value = raw.get(key)
            if value is not None and value != "":
                return value
        return None

    @staticmethod
    def _parse_flag(value) -> bool | None:
        """Convert a loosely-typed flag to bool, keeping None as "unknown".

        Strings ``""``, ``"0"`` and ``"false"`` (case-insensitive, stripped)
        are False; plain ``bool("0")`` would wrongly return True.
        """
        if value is None:
            return None
        if isinstance(value, str):
            return value.strip().lower() not in ("", "0", "false")
        return bool(value)

    def _parse_product(self, raw: dict, store_id: int) -> dict | None:
        """Flatten one raw API record into a product row.

        Args:
            raw: One entry of the ``tenantGoodsList`` payload.
            store_id: Store the row belongs to.

        Returns:
            The row dict, or None when no usable product id is found or
            parsing raises (the failure is logged as a warning).
        """
        try:
            # Identifier candidates in priority order; a 0/None/"" id skips the row.
            product_id = TypeParser.parse_int(
                raw.get("siteGoodsId") or raw.get("tenantGoodsId") or raw.get("productId")
            )
            if not product_id:
                return None
            supplier_raw = raw.get("supplierId")
            return {
                "store_id": store_id,
                "product_id": product_id,
                "site_product_id": TypeParser.parse_int(raw.get("siteGoodsId")),
                "product_name": raw.get("goodsName") or raw.get("productName"),
                "category_id": TypeParser.parse_int(
                    raw.get("tenantGoodsCategoryId") or raw.get("goodsCategoryId")
                ),
                "category_name": raw.get("categoryName"),
                "second_category_id": TypeParser.parse_int(raw.get("goodsCategorySecondId")),
                "unit": raw.get("goodsUnit"),
                "cost_price": TypeParser.parse_decimal(raw.get("costPrice")),
                # A price of 0 is valid (e.g. free items) and must not fall through.
                "sale_price": TypeParser.parse_decimal(
                    self._first_present(raw, "goodsPrice", "salePrice")
                ),
                "allow_discount": None,
                # A state code of 0 is kept rather than replaced by "status".
                "status": self._first_present(raw, "goodsState", "status"),
                # A falsy supplierId (missing, 0, "") is stored as no supplier.
                "supplier_id": TypeParser.parse_int(supplier_raw) if supplier_raw else None,
                "barcode": raw.get("barcode"),
                "is_combo": self._parse_flag(raw.get("isCombo")),
                "created_time": TypeParser.parse_timestamp(raw.get("createTime"), self.tz),
                "updated_time": TypeParser.parse_timestamp(raw.get("updateTime"), self.tz),
                "raw_data": json.dumps(raw, ensure_ascii=False),
            }
        except Exception as exc:
            # Best-effort per-record parsing: log and skip instead of aborting the batch.
            self.logger.warning("解析商品记录失败: %s, 原始数据: %s", exc, raw)
            return None