313 lines
12 KiB
Python
313 lines
12 KiB
Python
# -*- coding: utf-8 -*-
|
|
"""Smoke test scripts for WBI/NCI/Intimacy index tasks."""
|
|
import logging
|
|
import os
|
|
import sys
|
|
from typing import Dict, List
|
|
|
|
# Project root is two directory levels above this file; put it at the front
# of sys.path so the project-local imports below can be resolved.
ROOT = os.path.dirname(
    os.path.dirname(
        os.path.abspath(__file__)
    )
)
if ROOT not in sys.path:
    sys.path.insert(0, ROOT)
|
|
|
|
from config.settings import AppConfig
|
|
from database.connection import DatabaseConnection
|
|
from database.operations import DatabaseOperations
|
|
from tasks.dws.index import IntimacyIndexTask, NewconvIndexTask, WinbackIndexTask
|
|
|
|
|
|
# Root logging setup for the script: INFO level, pipe-separated fields.
logging.basicConfig(
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
    level=logging.INFO,
)

# Logger shared by every function in this script.
logger = logging.getLogger("test_index_tasks")
|
|
|
|
|
|
def _make_db() -> tuple[AppConfig, DatabaseConnection, DatabaseOperations]:
    """Build the config / connection / operations objects used by the checks.

    Returns:
        A ``(config, connection, operations)`` triple. The connection is
        built from the ``db.dsn`` entry of the loaded configuration; the
        callers in this file close it when they are done.
    """
    app_config = AppConfig.load()
    dsn = app_config.config["db"]["dsn"]
    connection = DatabaseConnection(dsn)
    operations = DatabaseOperations(connection)
    return app_config, connection, operations
|
|
|
|
|
|
def _dict_rows(rows) -> List[Dict]:
    """Convert query result rows into a list of plain dicts.

    Args:
        rows: Iterable of rows accepted by ``dict()``, or a falsy value
            (``None``, empty list) meaning "no rows".

    Returns:
        One ``dict`` per input row; an empty list when *rows* is falsy.
    """
    if not rows:
        return []
    return list(map(dict, rows))
|
|
|
|
|
|
def _fmt(value, digits: int = 2) -> str:
    """Render *value* as a short string for log output.

    Args:
        value: The value to render. ``None`` becomes ``"-"``; ``int``,
            ``float`` and ``decimal.Decimal`` values are rendered in
            fixed-point notation; anything else goes through ``str()``.
        digits: Number of digits after the decimal point for numeric values.

    Returns:
        The formatted string.
    """
    # Function-local import so this helper does not depend on the file's
    # top-level import block.
    from decimal import Decimal

    if value is None:
        return "-"
    # Decimal must be accepted here, otherwise ``digits`` is silently ignored
    # for it and the value is printed at whatever precision it carries.
    # NOTE(review): the statistics queries in this file cast to ``numeric``;
    # presumably the DB driver hands those back as Decimal - confirm.
    if isinstance(value, (int, float, Decimal)):
        return f"{value:.{digits}f}"
    return str(value)
|
|
|
|
|
|
def _check_required_tables() -> None:
    """Fail fast when one of the tables this script reads is absent.

    Looks the table names up in ``information_schema.tables`` for the
    ``billiards_dws`` schema and always closes the connection afterwards.

    Raises:
        RuntimeError: If at least one expected table was not found; the
            message lists the missing names in sorted order.
    """
    _, connection, operations = _make_db()
    try:
        lookup_sql = """
            SELECT table_name
            FROM information_schema.tables
            WHERE table_schema = 'billiards_dws'
              AND table_name IN (
                  'cfg_index_parameters',
                  'dws_member_winback_index',
                  'dws_member_newconv_index',
                  'dws_member_assistant_intimacy'
              )
        """
        found = {row["table_name"] for row in _dict_rows(operations.query(lookup_sql))}
        expected = {
            "cfg_index_parameters",
            "dws_member_winback_index",
            "dws_member_newconv_index",
            "dws_member_assistant_intimacy",
        }
        absent = sorted(name for name in expected if name not in found)
        if not absent:
            return
        raise RuntimeError(f"Missing required tables: {', '.join(absent)}")
    finally:
        connection.close()
|
|
|
|
|
|
def test_winback_index() -> Dict:
    """Run the WBI task and log a summary of its output table.

    When the task reports ``status == "success"``, aggregate statistics and
    the five rows with the highest ``display_score`` are read from
    ``billiards_dws.dws_member_winback_index`` and logged. The database
    connection is closed in all cases.

    Returns:
        The value returned by ``WinbackIndexTask.execute``.
    """
    banner = "=" * 80
    logger.info(banner)
    logger.info("Run WBI task")
    logger.info(banner)

    config, db_conn, db = _make_db()
    try:
        result = WinbackIndexTask(config, db, None, logger).execute(None)
        logger.info("WBI result: %s", result)

        # Nothing more to report unless the task succeeded.
        if result.get("status") != "success":
            return result

        summary_sql = """
            SELECT
                COUNT(*) AS total_count,
                ROUND(AVG(display_score)::numeric, 2) AS avg_display,
                ROUND(MIN(display_score)::numeric, 2) AS min_display,
                ROUND(MAX(display_score)::numeric, 2) AS max_display,
                ROUND(AVG(raw_score)::numeric, 4) AS avg_raw,
                ROUND(AVG(overdue_old)::numeric, 4) AS avg_overdue,
                ROUND(AVG(drop_old)::numeric, 4) AS avg_drop,
                ROUND(AVG(recharge_old)::numeric, 4) AS avg_recharge,
                ROUND(AVG(value_old)::numeric, 4) AS avg_value,
                ROUND(AVG(t_v)::numeric, 2) AS avg_t_v
            FROM billiards_dws.dws_member_winback_index
        """
        summary_rows = _dict_rows(db.query(summary_sql))
        if summary_rows:
            summary = summary_rows[0]
            # Display scores are shown with 2 decimals, components with 4.
            display_parts = [
                _fmt(summary.get(key))
                for key in ("avg_display", "min_display", "max_display")
            ]
            component_parts = [
                _fmt(summary.get(key), 4)
                for key in ("avg_raw", "avg_overdue", "avg_drop", "avg_recharge", "avg_value")
            ]
            logger.info(
                "WBI stats | total=%s, display(avg/min/max)=%s/%s/%s, raw_avg=%s, overdue=%s, drop=%s, recharge=%s, value=%s, t_v=%s",
                summary.get("total_count"),
                *display_parts,
                *component_parts,
                _fmt(summary.get("avg_t_v"), 2),
            )

        top_sql = """
            SELECT member_id, display_score, raw_score, t_v, visits_14d, sv_balance
            FROM billiards_dws.dws_member_winback_index
            ORDER BY display_score DESC NULLS LAST
            LIMIT 5
        """
        top_rows = _dict_rows(db.query(top_sql))
        for rank, row in enumerate(top_rows, start=1):
            logger.info(
                "WBI TOP%d | member=%s, display=%s, raw=%s, t_v=%s, visits_14d=%s, sv_balance=%s",
                rank,
                row.get("member_id"),
                _fmt(row.get("display_score")),
                _fmt(row.get("raw_score"), 4),
                _fmt(row.get("t_v"), 2),
                _fmt(row.get("visits_14d"), 0),
                _fmt(row.get("sv_balance"), 2),
            )

        return result
    finally:
        db_conn.close()
|
|
|
|
|
|
def test_newconv_index() -> Dict:
    """Run the NCI task and log a summary of its output table.

    When the task reports ``status == "success"``, aggregate statistics and
    the five rows with the highest ``display_score`` are read from
    ``billiards_dws.dws_member_newconv_index`` and logged. The database
    connection is closed in all cases.

    Returns:
        The value returned by ``NewconvIndexTask.execute``.
    """
    banner = "=" * 80
    logger.info(banner)
    logger.info("Run NCI task")
    logger.info(banner)

    config, db_conn, db = _make_db()
    try:
        result = NewconvIndexTask(config, db, None, logger).execute(None)
        logger.info("NCI result: %s", result)

        # Nothing more to report unless the task succeeded.
        if result.get("status") != "success":
            return result

        summary_sql = """
            SELECT
                COUNT(*) AS total_count,
                ROUND(AVG(display_score)::numeric, 2) AS avg_display,
                ROUND(MIN(display_score)::numeric, 2) AS min_display,
                ROUND(MAX(display_score)::numeric, 2) AS max_display,
                ROUND(AVG(display_score_welcome)::numeric, 2) AS avg_display_welcome,
                ROUND(AVG(display_score_convert)::numeric, 2) AS avg_display_convert,
                ROUND(AVG(raw_score)::numeric, 4) AS avg_raw,
                ROUND(AVG(raw_score_welcome)::numeric, 4) AS avg_raw_welcome,
                ROUND(AVG(raw_score_convert)::numeric, 4) AS avg_raw_convert,
                ROUND(AVG(need_new)::numeric, 4) AS avg_need,
                ROUND(AVG(salvage_new)::numeric, 4) AS avg_salvage,
                ROUND(AVG(recharge_new)::numeric, 4) AS avg_recharge,
                ROUND(AVG(value_new)::numeric, 4) AS avg_value,
                ROUND(AVG(welcome_new)::numeric, 4) AS avg_welcome,
                ROUND(AVG(t_v)::numeric, 2) AS avg_t_v
            FROM billiards_dws.dws_member_newconv_index
        """
        summary_rows = _dict_rows(db.query(summary_sql))
        if summary_rows:
            summary = summary_rows[0]
            # Display scores are shown with 2 decimals, raw scores and
            # components with 4.
            display_parts = [
                _fmt(summary.get(key))
                for key in (
                    "avg_display",
                    "min_display",
                    "max_display",
                    "avg_display_welcome",
                    "avg_display_convert",
                )
            ]
            raw_parts = [
                _fmt(summary.get(key), 4)
                for key in ("avg_raw", "avg_raw_welcome", "avg_raw_convert")
            ]
            logger.info(
                "NCI stats | total=%s, display(avg/min/max)=%s/%s/%s, display_welcome=%s, display_convert=%s, raw_avg=%s, raw_welcome=%s, raw_convert=%s",
                summary.get("total_count"),
                *display_parts,
                *raw_parts,
            )

            component_parts = [
                _fmt(summary.get(key), 4)
                for key in ("avg_need", "avg_salvage", "avg_recharge", "avg_value", "avg_welcome")
            ]
            logger.info(
                "NCI components | need=%s, salvage=%s, recharge=%s, value=%s, welcome=%s, t_v=%s",
                *component_parts,
                _fmt(summary.get("avg_t_v"), 2),
            )

        top_sql = """
            SELECT member_id, display_score, display_score_welcome, display_score_convert,
                   raw_score, raw_score_welcome, raw_score_convert, t_v, visits_14d
            FROM billiards_dws.dws_member_newconv_index
            ORDER BY display_score DESC NULLS LAST
            LIMIT 5
        """
        top_rows = _dict_rows(db.query(top_sql))
        for rank, row in enumerate(top_rows, start=1):
            logger.info(
                "NCI TOP%d | member=%s, nci=%s (welcome=%s, convert=%s), raw=%s (w=%s,c=%s), t_v=%s, visits_14d=%s",
                rank,
                row.get("member_id"),
                _fmt(row.get("display_score")),
                _fmt(row.get("display_score_welcome")),
                _fmt(row.get("display_score_convert")),
                _fmt(row.get("raw_score"), 4),
                _fmt(row.get("raw_score_welcome"), 4),
                _fmt(row.get("raw_score_convert"), 4),
                _fmt(row.get("t_v"), 2),
                _fmt(row.get("visits_14d"), 0),
            )

        return result
    finally:
        db_conn.close()
|
|
|
|
|
|
def test_intimacy_index() -> Dict:
    """Run the Intimacy task and log a summary of its output table.

    When the task reports ``status == "success"``, aggregate statistics and
    the five rows with the highest ``display_score`` are read from
    ``billiards_dws.dws_member_assistant_intimacy`` and logged. The database
    connection is closed in all cases.

    Returns:
        The value returned by ``IntimacyIndexTask.execute``.
    """
    banner = "=" * 80
    logger.info(banner)
    logger.info("Run Intimacy task")
    logger.info(banner)

    config, db_conn, db = _make_db()
    try:
        result = IntimacyIndexTask(config, db, None, logger).execute(None)
        logger.info("Intimacy result: %s", result)

        # Nothing more to report unless the task succeeded.
        if result.get("status") != "success":
            return result

        summary_sql = """
            SELECT
                COUNT(*) AS total_count,
                COUNT(DISTINCT member_id) AS unique_members,
                COUNT(DISTINCT assistant_id) AS unique_assistants,
                ROUND(AVG(display_score)::numeric, 2) AS avg_display,
                ROUND(MIN(display_score)::numeric, 2) AS min_display,
                ROUND(MAX(display_score)::numeric, 2) AS max_display,
                ROUND(AVG(raw_score)::numeric, 4) AS avg_raw,
                ROUND(AVG(score_frequency)::numeric, 4) AS avg_frequency,
                ROUND(AVG(score_recency)::numeric, 4) AS avg_recency,
                ROUND(AVG(score_recharge)::numeric, 4) AS avg_recharge,
                ROUND(AVG(score_duration)::numeric, 4) AS avg_duration,
                ROUND(AVG(burst_multiplier)::numeric, 4) AS avg_burst
            FROM billiards_dws.dws_member_assistant_intimacy
        """
        summary_rows = _dict_rows(db.query(summary_sql))
        if summary_rows:
            summary = summary_rows[0]
            # Counts are logged as returned; display scores use 2 decimals,
            # raw score and components use 4.
            count_parts = [
                summary.get(key)
                for key in ("total_count", "unique_members", "unique_assistants")
            ]
            display_parts = [
                _fmt(summary.get(key))
                for key in ("avg_display", "min_display", "max_display")
            ]
            logger.info(
                "Intimacy stats | total=%s, members=%s, assistants=%s, display(avg/min/max)=%s/%s/%s, raw_avg=%s",
                *count_parts,
                *display_parts,
                _fmt(summary.get("avg_raw"), 4),
            )

            component_parts = [
                _fmt(summary.get(key), 4)
                for key in (
                    "avg_frequency",
                    "avg_recency",
                    "avg_recharge",
                    "avg_duration",
                    "avg_burst",
                )
            ]
            logger.info(
                "Intimacy components | freq=%s, recency=%s, recharge=%s, duration=%s, burst=%s",
                *component_parts,
            )

        top_sql = """
            SELECT member_id, assistant_id, display_score, raw_score,
                   session_count, attributed_recharge_amount
            FROM billiards_dws.dws_member_assistant_intimacy
            ORDER BY display_score DESC NULLS LAST
            LIMIT 5
        """
        top_rows = _dict_rows(db.query(top_sql))
        for rank, row in enumerate(top_rows, start=1):
            logger.info(
                "Intimacy TOP%d | member=%s assistant=%s display=%s raw=%s sessions=%s recharge=%s",
                rank,
                row.get("member_id"),
                row.get("assistant_id"),
                _fmt(row.get("display_score")),
                _fmt(row.get("raw_score"), 4),
                _fmt(row.get("session_count"), 0),
                _fmt(row.get("attributed_recharge_amount"), 2),
            )

        return result
    finally:
        db_conn.close()
|
|
|
|
|
|
def main() -> None:
    """Check the required tables, run the three index tasks, log their statuses.

    The tasks run in the order WBI, NCI, Intimacy; the final log line reports
    the ``status`` entry of each task's result.
    """
    _check_required_tables()

    wbi_result = test_winback_index()
    nci_result = test_newconv_index()
    intimacy_result = test_intimacy_index()

    separator = "=" * 80
    logger.info(separator)
    logger.info("Test complete")
    logger.info(
        "WBI=%s, NCI=%s, INTIMACY=%s",
        wbi_result.get("status"),
        nci_result.get("status"),
        intimacy_result.get("status"),
    )
    logger.info(separator)
|
|
|
|
|
|
# Script entry point: run the smoke tests only when executed directly,
# not when this module is imported.
if __name__ == "__main__":
    main()
|