# -*- coding: utf-8 -*-
"""
Assistant revenue/cost analysis task.

Purpose:
    Analyse the revenue produced by, and the cost of, each assistant at
    "date + assistant" granularity.

Data sources:
    - dwd_assistant_service_log: assistant service ledger (revenue)
    - dws_assistant_salary_calc: salary calculation (cost)

Target table:
    billiards_dws.dws_assistant_finance_analysis

Update strategy:
    - Frequency: daily
    - Idempotency: delete-before-insert (by date)

Author: ETL team
Created: 2026-02-01
"""

from __future__ import annotations

from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple

from .base_dws_task import BaseDwsTask, CourseType, TaskContext


class AssistantFinanceTask(BaseDwsTask):
    """Assistant revenue/cost analysis task.

    Produces one row per (site, date, assistant) containing the day's
    revenue, an averaged daily salary cost, and the resulting gross
    profit and gross margin.
    """

    def get_task_code(self) -> str:
        """Return the task code identifying this task."""
        return "DWS_ASSISTANT_FINANCE"

    def get_target_table(self) -> str:
        """Return the name of the table this task writes to."""
        return "dws_assistant_finance_analysis"

    def get_primary_keys(self) -> List[str]:
        """Return the columns that uniquely identify a target row."""
        return ["site_id", "stat_date", "assistant_id"]

    def extract(self, context: TaskContext) -> Dict[str, Any]:
        """Read daily revenue and monthly salary rows for the task window.

        Args:
            context: Task context; ``window_start`` / ``window_end`` bound
                the date range (inclusive) and ``store_id`` is the site.

        Returns:
            A dict with keys ``daily_revenue``, ``monthly_salary``,
            ``start_date``, ``end_date`` and ``site_id``.
        """
        # The window bounds may be datetimes or plain dates; only datetimes
        # expose a ``date()`` method, so normalise both to dates.
        start_date = (
            context.window_start.date()
            if hasattr(context.window_start, 'date')
            else context.window_start
        )
        end_date = (
            context.window_end.date()
            if hasattr(context.window_end, 'date')
            else context.window_end
        )
        site_id = context.store_id

        # Per-assistant daily revenue.
        daily_revenue = self._extract_daily_revenue(site_id, start_date, end_date)

        # Monthly salary (used to derive the average daily cost).
        monthly_salary = self._extract_monthly_salary(site_id, start_date, end_date)

        # Load configuration.
        self.load_config_cache()

        return {
            'daily_revenue': daily_revenue,
            'monthly_salary': monthly_salary,
            'start_date': start_date,
            'end_date': end_date,
            'site_id': site_id,
        }

    def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
        """Combine revenue and salary rows into target-table records.

        For every daily revenue row the matching month's salary is looked
        up and spread evenly over that month's work days to obtain
        ``cost_daily``; ``gross_profit`` is revenue minus that cost and
        ``gross_margin`` is profit over revenue (0 when revenue is not
        positive).

        Args:
            extracted: The dict returned by :meth:`extract`.
            context: Task context (not used by this method).

        Returns:
            One record dict per input revenue row.
        """
        daily_revenue = extracted['daily_revenue']
        monthly_salary = extracted['monthly_salary']
        site_id = extracted['site_id']

        # Index salary rows by (assistant_id, salary_month) for O(1) lookup.
        salary_index = {}
        for sal in monthly_salary:
            asst_id = sal.get('assistant_id')
            month = sal.get('salary_month')
            if asst_id and month:
                salary_index[(asst_id, month)] = sal

        results = []
        for rev in daily_revenue:
            assistant_id = rev.get('assistant_id')
            stat_date = rev.get('stat_date')

            # Salary rows are keyed by the first day of the month.
            month_start = stat_date.replace(day=1) if isinstance(stat_date, date) else None
            salary = salary_index.get((assistant_id, month_start), {})

            # Average daily cost; ``or 1`` guards against division by zero
            # when work_days is missing or 0.
            gross_salary = self.safe_decimal(salary.get('gross_salary', 0))
            work_days = self.safe_int(salary.get('work_days', 1)) or 1
            cost_daily = gross_salary / Decimal(str(work_days))

            revenue_total = self.safe_decimal(rev.get('revenue_total', 0))
            gross_profit = revenue_total - cost_daily
            gross_margin = gross_profit / revenue_total if revenue_total > 0 else Decimal('0')

            record = {
                'site_id': site_id,
                'tenant_id': self.config.get("app.tenant_id", site_id),
                'stat_date': stat_date,
                'assistant_id': assistant_id,
                'assistant_nickname': rev.get('assistant_nickname'),
                'revenue_total': revenue_total,
                'revenue_base': self.safe_decimal(rev.get('revenue_base', 0)),
                'revenue_bonus': self.safe_decimal(rev.get('revenue_bonus', 0)),
                'cost_daily': cost_daily,
                'gross_profit': gross_profit,
                'gross_margin': gross_margin,
                'service_count': self.safe_int(rev.get('service_count', 0)),
                'service_hours': self.safe_decimal(rev.get('service_hours', 0)),
                'unique_customers': self.safe_int(rev.get('unique_customers', 0)),
            }
            results.append(record)

        return results

    def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
        """Replace the window's rows in the target table with ``transformed``.

        Args:
            transformed: Records produced by :meth:`transform`.
            context: Task context, passed to ``delete_existing_data``.

        Returns:
            A dict with a ``counts`` summary and, when rows were written,
            an ``extra`` entry holding the delete result.
        """
        # NOTE(review): with nothing to insert, existing rows for the window
        # are left in place (no delete is issued) — confirm this is intended.
        if not transformed:
            return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}

        deleted = self.delete_existing_data(context, date_col="stat_date")
        inserted = self.bulk_insert(transformed)

        return {
            "counts": {
                "fetched": len(transformed),
                "inserted": inserted,
                "updated": 0,
                "skipped": 0,
                "errors": 0,
            },
            "extra": {"deleted": deleted},
        }

    def _extract_daily_revenue(self, site_id: int, start_date: date, end_date: date) -> List[Dict[str, Any]]:
        """Aggregate the service log per day and assistant.

        Args:
            site_id: Site whose service log is read.
            start_date: First day of the window (inclusive).
            end_date: Last day of the window (inclusive).

        Returns:
            One dict per (day, assistant) with service counts, hours,
            revenue totals and distinct customer count; empty list when
            the query returns nothing.
        """
        # skill_id of the base course; every other skill_id counts as bonus.
        # NOTE(review): rows with NULL skill_id match neither CASE branch, so
        # they are in revenue_total but in neither revenue_base nor
        # revenue_bonus — confirm whether skill_id can be NULL.
        BASE_SKILL_ID = 2791903611396869

        sql = """
            SELECT
                DATE(start_use_time) AS stat_date,
                site_assistant_id AS assistant_id,
                MAX(nickname) AS assistant_nickname,
                COUNT(*) AS service_count,
                SUM(income_seconds) / 3600.0 AS service_hours,
                SUM(ledger_amount) AS revenue_total,
                SUM(CASE WHEN skill_id = %s THEN ledger_amount ELSE 0 END) AS revenue_base,
                SUM(CASE WHEN skill_id != %s THEN ledger_amount ELSE 0 END) AS revenue_bonus,
                COUNT(DISTINCT tenant_member_id) AS unique_customers
            FROM billiards_dwd.dwd_assistant_service_log
            WHERE site_id = %s
              AND DATE(start_use_time) >= %s
              AND DATE(start_use_time) <= %s
            GROUP BY DATE(start_use_time), site_assistant_id
        """
        rows = self.db.query(sql, (BASE_SKILL_ID, BASE_SKILL_ID, site_id, start_date, end_date))
        return [dict(row) for row in rows] if rows else []

    def _extract_monthly_salary(self, site_id: int, start_date: date, end_date: date) -> List[Dict[str, Any]]:
        """Fetch monthly salary rows and attach each month's work-day count.

        Args:
            site_id: Site whose salary rows are read.
            start_date: First day of the window (inclusive).
            end_date: Last day of the window (inclusive).

        Returns:
            One dict per salary row with an added ``work_days`` key: the
            number of distinct ``stat_date`` values found for that
            assistant and month in ``dws_assistant_daily_detail``, or 20
            when none is found.
        """
        # Months touched by the window, each represented by its first day.
        month_start = start_date.replace(day=1)
        month_end = end_date.replace(day=1)

        sql = """
            SELECT
                assistant_id,
                salary_month,
                gross_salary,
                effective_hours
            FROM billiards_dws.dws_assistant_salary_calc
            WHERE site_id = %s
              AND salary_month >= %s
              AND salary_month <= %s
        """
        rows = self.db.query(sql, (site_id, month_start, month_end))

        # Work days must be counted over the WHOLE calendar months, not just
        # the task window: the salary being divided is a full-month figure,
        # so counting only window days (e.g. 1 for a daily run) would charge
        # the entire monthly salary to a single day.
        # Adding 32 days to the first of a month always lands in the next
        # month; snapping to day 1 gives an exclusive upper bound.
        next_month_start = (month_end + timedelta(days=32)).replace(day=1)

        work_days_sql = """
            SELECT
                assistant_id,
                DATE_TRUNC('month', stat_date)::DATE AS month,
                COUNT(DISTINCT stat_date) AS work_days
            FROM billiards_dws.dws_assistant_daily_detail
            WHERE site_id = %s
              AND stat_date >= %s
              AND stat_date < %s
            GROUP BY assistant_id, DATE_TRUNC('month', stat_date)
        """
        work_days_rows = self.db.query(work_days_sql, (site_id, month_start, next_month_start))
        work_days_index = {
            (r['assistant_id'], r['month']): r['work_days']
            for r in (work_days_rows or [])
        }

        results = []
        for row in (rows or []):
            row_dict = dict(row)
            asst_id = row_dict.get('assistant_id')
            month = row_dict.get('salary_month')
            # Fall back to 20 work days when no daily detail exists.
            row_dict['work_days'] = work_days_index.get((asst_id, month), 20)
            results.append(row_dict)

        return results


__all__ = ['AssistantFinanceTask']