Files
feiqiu-ETL/etl_billiards/tasks/dws/assistant_finance_task.py
2026-02-04 21:39:01 +08:00

200 lines
7.6 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
"""
助教收支分析任务
功能说明:
"日期+助教"为粒度,分析助教产出的收入和成本
数据来源:
- dwd_assistant_service_log: 助教服务流水(收入)
- dws_assistant_salary_calc: 工资计算(成本)
目标表:
billiards_dws.dws_assistant_finance_analysis
更新策略:
- 更新频率:每日更新
- 幂等方式delete-before-insert按日期
作者ETL团队
创建日期2026-02-01
"""
from __future__ import annotations
from datetime import date, datetime, timedelta
from decimal import Decimal
from typing import Any, Dict, List, Optional, Tuple
from .base_dws_task import BaseDwsTask, CourseType, TaskContext
class AssistantFinanceTask(BaseDwsTask):
"""
助教收支分析任务
"""
def get_task_code(self) -> str:
return "DWS_ASSISTANT_FINANCE"
def get_target_table(self) -> str:
return "dws_assistant_finance_analysis"
def get_primary_keys(self) -> List[str]:
return ["site_id", "stat_date", "assistant_id"]
def extract(self, context: TaskContext) -> Dict[str, Any]:
start_date = context.window_start.date() if hasattr(context.window_start, 'date') else context.window_start
end_date = context.window_end.date() if hasattr(context.window_end, 'date') else context.window_end
site_id = context.store_id
# 获取助教日度收入
daily_revenue = self._extract_daily_revenue(site_id, start_date, end_date)
# 获取月度工资(用于计算日均成本)
monthly_salary = self._extract_monthly_salary(site_id, start_date, end_date)
# 加载配置
self.load_config_cache()
return {
'daily_revenue': daily_revenue,
'monthly_salary': monthly_salary,
'start_date': start_date,
'end_date': end_date,
'site_id': site_id
}
def transform(self, extracted: Dict[str, Any], context: TaskContext) -> List[Dict[str, Any]]:
daily_revenue = extracted['daily_revenue']
monthly_salary = extracted['monthly_salary']
site_id = extracted['site_id']
# 构建月度工资索引
salary_index = {}
for sal in monthly_salary:
asst_id = sal.get('assistant_id')
month = sal.get('salary_month')
if asst_id and month:
salary_index[(asst_id, month)] = sal
results = []
for rev in daily_revenue:
assistant_id = rev.get('assistant_id')
stat_date = rev.get('stat_date')
# 获取对应月份的工资
month_start = stat_date.replace(day=1) if isinstance(stat_date, date) else None
salary = salary_index.get((assistant_id, month_start), {})
# 计算日均成本
gross_salary = self.safe_decimal(salary.get('gross_salary', 0))
work_days = self.safe_int(salary.get('work_days', 1)) or 1
cost_daily = gross_salary / Decimal(str(work_days))
revenue_total = self.safe_decimal(rev.get('revenue_total', 0))
gross_profit = revenue_total - cost_daily
gross_margin = gross_profit / revenue_total if revenue_total > 0 else Decimal('0')
record = {
'site_id': site_id,
'tenant_id': self.config.get("app.tenant_id", site_id),
'stat_date': stat_date,
'assistant_id': assistant_id,
'assistant_nickname': rev.get('assistant_nickname'),
'revenue_total': revenue_total,
'revenue_base': self.safe_decimal(rev.get('revenue_base', 0)),
'revenue_bonus': self.safe_decimal(rev.get('revenue_bonus', 0)),
'cost_daily': cost_daily,
'gross_profit': gross_profit,
'gross_margin': gross_margin,
'service_count': self.safe_int(rev.get('service_count', 0)),
'service_hours': self.safe_decimal(rev.get('service_hours', 0)),
'unique_customers': self.safe_int(rev.get('unique_customers', 0)),
}
results.append(record)
return results
def load(self, transformed: List[Dict[str, Any]], context: TaskContext) -> Dict:
if not transformed:
return {"counts": {"fetched": 0, "inserted": 0, "updated": 0, "skipped": 0, "errors": 0}}
deleted = self.delete_existing_data(context, date_col="stat_date")
inserted = self.bulk_insert(transformed)
return {
"counts": {"fetched": len(transformed), "inserted": inserted, "updated": 0, "skipped": 0, "errors": 0},
"extra": {"deleted": deleted}
}
def _extract_daily_revenue(self, site_id: int, start_date: date, end_date: date) -> List[Dict[str, Any]]:
# 基础课skill_id
BASE_SKILL_ID = 2791903611396869
sql = """
SELECT
DATE(start_use_time) AS stat_date,
site_assistant_id AS assistant_id,
MAX(nickname) AS assistant_nickname,
COUNT(*) AS service_count,
SUM(income_seconds) / 3600.0 AS service_hours,
SUM(ledger_amount) AS revenue_total,
SUM(CASE WHEN skill_id = %s THEN ledger_amount ELSE 0 END) AS revenue_base,
SUM(CASE WHEN skill_id != %s THEN ledger_amount ELSE 0 END) AS revenue_bonus,
COUNT(DISTINCT tenant_member_id) AS unique_customers
FROM billiards_dwd.dwd_assistant_service_log
WHERE site_id = %s
AND DATE(start_use_time) >= %s
AND DATE(start_use_time) <= %s
GROUP BY DATE(start_use_time), site_assistant_id
"""
rows = self.db.query(sql, (BASE_SKILL_ID, BASE_SKILL_ID, site_id, start_date, end_date))
return [dict(row) for row in rows] if rows else []
def _extract_monthly_salary(self, site_id: int, start_date: date, end_date: date) -> List[Dict[str, Any]]:
# 获取涉及的月份
month_start = start_date.replace(day=1)
month_end = end_date.replace(day=1)
sql = """
SELECT
assistant_id,
salary_month,
gross_salary,
effective_hours
FROM billiards_dws.dws_assistant_salary_calc
WHERE site_id = %s
AND salary_month >= %s
AND salary_month <= %s
"""
rows = self.db.query(sql, (site_id, month_start, month_end))
# 获取每月工作天数
work_days_sql = """
SELECT
assistant_id,
DATE_TRUNC('month', stat_date)::DATE AS month,
COUNT(DISTINCT stat_date) AS work_days
FROM billiards_dws.dws_assistant_daily_detail
WHERE site_id = %s
AND stat_date >= %s
AND stat_date <= %s
GROUP BY assistant_id, DATE_TRUNC('month', stat_date)
"""
work_days_rows = self.db.query(work_days_sql, (site_id, start_date, end_date))
work_days_index = {(r['assistant_id'], r['month']): r['work_days'] for r in (work_days_rows or [])}
results = []
for row in (rows or []):
row_dict = dict(row)
asst_id = row_dict.get('assistant_id')
month = row_dict.get('salary_month')
row_dict['work_days'] = work_days_index.get((asst_id, month), 20)
results.append(row_dict)
return results
__all__ = ['AssistantFinanceTask']