From 1e97aef3c44c5f6cb6010e21be332a45a26c801f Mon Sep 17 00:00:00 2001 From: ogt Date: Thu, 2 Jul 2026 14:37:23 +0800 Subject: [PATCH] feat(ai): add scheduled automation health summary --- docs/AI_INTELLIGENCE_MODULE_SOT.md | 1 + .../pchome_ai_automation_priority_backlog.md | 1 + routes/system_public_routes.py | 18 ++ services/ai_automation_smoke_service.py | 212 ++++++++++++++++++ tests/test_ai_automation_smoke_service.py | 116 ++++++++++ 5 files changed, 348 insertions(+) diff --git a/docs/AI_INTELLIGENCE_MODULE_SOT.md b/docs/AI_INTELLIGENCE_MODULE_SOT.md index 7ca555e..1fd0bcb 100644 --- a/docs/AI_INTELLIGENCE_MODULE_SOT.md +++ b/docs/AI_INTELLIGENCE_MODULE_SOT.md @@ -92,6 +92,7 @@ - 2026-07-02 起 PChome dashboard 第一視窗可見文案必須使用繁中營運語言;`retention`、`Compact`、`Artifact`、`DB writes`、`prune` 等工程詞只能留在 API / artifact / 測試證據層,不得出現在第一視窗。產品面用「異動」、「最新回讀」、「證據留存」、「資料寫入」、「清理建議」呈現同一批 AI 自動化 truth。 - 2026-07-02 起 PChome AI automation dashboard 必須符合外部 benchmark guardrails:參考 Grafana / Datadog / New Relic / Atlassian Statuspage 的狀態分層、下一步優先、證據按需與 golden signals 做法;第一視窗必須輸出「已自動落地、已驗證、異動狀態、下一步」,且 `tests/test_pchome_dashboard_benchmark_guardrails.py` 必須鎖住這些要求。 - 2026-07-02 起 PChome safe mapping lane expansion 必須先從 direct mapping candidate decision lane 開始;`/api/ai/pchome-growth/mapping-backlog/direct-mapping-candidate-decision-lane-closeout-package` 會把 candidate decision package 收斂成 lane receipt、receipt replay、drift verifier 與 product readiness,輸出 `primary_human_gate_count=0`、`drift_count`、`next_machine_action` 與 hash evidence。此 endpoint 預設不執行搜尋、不開 DB、不寫 DB、不持久化候選,只在 `execute_search=1` 時走 controlled read-only candidate search。 +- 2026-07-02 起 AI automation scheduled health summary 必須提供 machine-readable endpoint;`/api/ai-automation/scheduled-health-summary` 會只讀 smoke history,並可選擇 `include_current_smoke=1` 執行不寫 history 的 current smoke,收斂 AI smoke、PChome drift monitor、history freshness、daily summary delivery readiness 四個 family,輸出 `primary_human_gate_count=0`、`writes_database_count=0`、`next_machine_actions` 與 scheduled output endpoints。此 endpoint 不寄 Telegram、不寫 DB、不改排程,只提供排程/監控可消費的健康摘要。 - V10.644 起 `/ai_intelligence` 的商品明細列不得只用句子描述比價;每列必須顯示 PChome 價格、MOMO 參考價、差距、可信度四格價格證據,並保留下一步按鈕。單位價候選需顯示單位價與單位,候選待確認或缺資料則以「待補 / 候選待確認」呈現,不得捏造價格。 - V10.645 起 `/ai_intelligence` 的商品明細分流切換後,必須顯示「這類商品怎麼處理」的行動摘要,包含件數、近 7 天業績、平均可信度、最大價差、代表商品與主按鈕;使用者不得只能看到商品列表而不知道下一步。 - V10.646 起 `/ai_intelligence` 的商品明細必須提供搜尋與排序;搜尋至少涵蓋商品、分類、商品編號與 MOMO 候選資訊,排序至少支援優先級、近 7 天業績、價差、下滑幅度與可信度。搜尋/排序後的行動摘要與明細列表必須使用同一批結果。 diff --git a/docs/guides/pchome_ai_automation_priority_backlog.md b/docs/guides/pchome_ai_automation_priority_backlog.md index d3b677b..4784bd9 100644 --- a/docs/guides/pchome_ai_automation_priority_backlog.md +++ b/docs/guides/pchome_ai_automation_priority_backlog.md @@ -221,6 +221,7 @@ | P1.2 | UI wording guard for no raw engineering terms | 已完成 | focused wording guard test | P2.1 benchmark guardrails | | P2.1 | External benchmark encoded into requirements | 已完成 | benchmark guide + focused guard test + first-viewport status | P3.1 safe lane expansion | | P3.1 | Extend receipt / replay / drift pattern to more lanes | 已完成 | direct mapping candidate decision lane closeout route + focused tests | P3.2 scheduled automation health summaries | +| P3.2 | Scheduled automation health summaries | 已完成 | `/api/ai-automation/scheduled-health-summary` + smoke service focused tests | P3.3 rollback evidence packages | ## 後續回報格式 diff --git a/routes/system_public_routes.py b/routes/system_public_routes.py index 7608d1d..b1edea3 100644 --- a/routes/system_public_routes.py +++ b/routes/system_public_routes.py @@ -511,6 +511,24 @@ def ai_automation_smoke_api(): return jsonify(collect_ai_automation_smoke()) +@system_public_bp.route('/api/ai-automation/scheduled-health-summary') +@login_required +def ai_automation_scheduled_health_summary_api(): + """Read-only scheduled health summary for AI automation checks.""" + from services.ai_automation_smoke_service import build_scheduled_automation_health_summary + + include_current = str(request.args.get('include_current_smoke') or '').strip().lower() in {'1', 'true', 'yes'} + history_limit = request.args.get('history_limit', 200, type=int) + freshness_hours = request.args.get('freshness_max_age_hours', 26, type=int) + return jsonify( + build_scheduled_automation_health_summary( + history_limit=max(1, min(history_limit or 200, 500)), + include_current_smoke=include_current, + freshness_max_age_hours=max(1, min(freshness_hours or 26, 168)), + ) + ) + + @system_public_bp.route('/api/ai-automation/smoke/history/export') @login_required def ai_automation_smoke_history_export(): diff --git a/services/ai_automation_smoke_service.py b/services/ai_automation_smoke_service.py index 53d5603..ea16355 100644 --- a/services/ai_automation_smoke_service.py +++ b/services/ai_automation_smoke_service.py @@ -20,6 +20,7 @@ from database.manager import get_session STATUS_RANK = {"ok": 0, "warning": 1, "critical": 2} +SCHEDULED_AUTOMATION_HEALTH_SUMMARY_POLICY = "read_only_ai_automation_scheduled_health_summary" _HISTORY_PATH = os.getenv( "MOMO_AI_AUTOMATION_SMOKE_HISTORY", os.path.join(os.path.dirname(os.path.dirname(__file__)), "data", "ai_automation_smoke_history.jsonl"), @@ -120,6 +121,217 @@ def _history_summary(records: List[Dict[str, Any]]) -> Dict[str, Any]: } +def _parse_history_timestamp(value: Any) -> datetime | None: + if not value: + return None + text_value = str(value).strip() + if not text_value: + return None + try: + return datetime.fromisoformat(text_value.replace("Z", "+00:00")) + except ValueError: + return None + + +def _history_freshness_family( + latest: Dict[str, Any] | None, + *, + max_age_hours: int, +) -> Dict[str, Any]: + if not latest: + return { + "key": "smoke_history_freshness", + "label": "Smoke history freshness", + "status": "warning", + "summary": "No scheduled smoke history is available yet.", + "next_machine_action": "run_ai_automation_smoke_once", + "details": {"history_record_count": 0, "writes_database": False}, + } + generated_at = _parse_history_timestamp(latest.get("generated_at")) + if generated_at is None: + return { + "key": "smoke_history_freshness", + "label": "Smoke history freshness", + "status": "warning", + "summary": "Latest scheduled smoke history timestamp cannot be parsed.", + "next_machine_action": "refresh_smoke_history_timestamp", + "details": { + "generated_at": latest.get("generated_at"), + "writes_database": False, + }, + } + + now = datetime.now(generated_at.tzinfo) if generated_at.tzinfo else datetime.now() + age_hours = max((now - generated_at).total_seconds() / 3600, 0) + fresh = age_hours <= max_age_hours + return { + "key": "smoke_history_freshness", + "label": "Smoke history freshness", + "status": "ok" if fresh else "warning", + "summary": ( + f"Latest smoke history is {age_hours:.1f}h old." + if fresh + else f"Latest smoke history is stale at {age_hours:.1f}h old." + ), + "next_machine_action": "keep_scheduled_smoke_cadence" if fresh else "run_ai_automation_smoke_once", + "details": { + "generated_at": latest.get("generated_at"), + "age_hours": round(age_hours, 2), + "max_age_hours": max_age_hours, + "writes_database": False, + }, + } + + +def _find_check(result: Dict[str, Any], name: str) -> Dict[str, Any]: + for item in result.get("checks", []) or []: + if item.get("name") == name: + return item + return {} + + +def build_scheduled_automation_health_summary( + *, + history_limit: int = 200, + include_current_smoke: bool = False, + current_smoke: Dict[str, Any] | None = None, + freshness_max_age_hours: int = 26, +) -> Dict[str, Any]: + """Build a machine-readable scheduled health summary without sending notifications.""" + records = _load_history(limit=history_limit) + history = _history_summary(records) + latest_history = history.get("latest") or {} + current = current_smoke + if current is None and include_current_smoke: + current = collect_ai_automation_smoke(record_history=False, history_limit=history_limit) + source_result = current or latest_history or {} + source_kind = "current_smoke" if current else "latest_history" + pchome_drift = _find_check(source_result, "PChome 受控落地 drift monitor") + pchome_details = pchome_drift.get("details") or {} + smoke_status = source_result.get("status") or ("warning" if latest_history else "warning") + freshness_family = _history_freshness_family( + latest_history, + max_age_hours=max(1, int(freshness_max_age_hours or 26)), + ) + daily_rows = history.get("daily") or [] + history_count = len(records) + families = [ + { + "key": "ai_automation_smoke", + "label": "AI automation smoke", + "status": smoke_status, + "summary": ( + f"Smoke source={source_kind}; " + f"checks={(source_result.get('summary') or {}).get('total', 0)}" + ), + "next_machine_action": ( + "keep_monitoring" + if smoke_status == "ok" + else "inspect_warning_or_critical_checks" + ), + "details": { + "source_kind": source_kind, + "generated_at": source_result.get("generated_at"), + "summary": source_result.get("summary") or {}, + "writes_database": False, + }, + }, + { + "key": "pchome_controlled_apply_drift_monitor", + "label": "PChome controlled apply drift monitor", + "status": pchome_drift.get("status") or "warning", + "summary": pchome_drift.get("summary") or "PChome drift monitor has no latest check.", + "next_machine_action": ( + "keep_monitoring_drift" + if (pchome_drift.get("status") == "ok" and int(pchome_details.get("drift_count") or 0) == 0) + else "run_pchome_drift_verifier_and_recovery_package" + ), + "details": { + "result": pchome_details.get("result"), + "source_receipt_replay_result": pchome_details.get("source_receipt_replay_result"), + "selector_count": int(pchome_details.get("selector_count") or 0), + "readback_pass_count": int(pchome_details.get("readback_pass_count") or 0), + "drift_count": int(pchome_details.get("drift_count") or 0), + "writes_database": False, + }, + }, + freshness_family, + { + "key": "daily_summary_delivery", + "label": "Daily summary delivery readiness", + "status": "ok" if history_count else "warning", + "summary": ( + f"{len(daily_rows)} daily summary buckets available." + if history_count + else "No history is available for a daily summary yet." + ), + "next_machine_action": ( + "send_daily_summary_from_history" + if history_count + else "collect_smoke_before_daily_summary" + ), + "details": { + "history_record_count": history_count, + "daily_bucket_count": len(daily_rows), + "send_endpoint": "/api/ai-automation/smoke/daily-summary/send", + "writes_database": False, + }, + }, + ] + worst = max(families, key=lambda item: STATUS_RANK.get(item["status"], 2))["status"] + summary = { + "ok": sum(1 for item in families if item["status"] == "ok"), + "warning": sum(1 for item in families if item["status"] == "warning"), + "critical": sum(1 for item in families if item["status"] == "critical"), + "total": len(families), + "history_record_count": history_count, + "primary_human_gate_count": 0, + "writes_database_count": 0, + } + problem_families = [ + item for item in families if item.get("status") in {"warning", "critical"} + ] + return { + "policy": SCHEDULED_AUTOMATION_HEALTH_SUMMARY_POLICY, + "status": worst, + "version": SYSTEM_VERSION, + "generated_at": datetime.now().isoformat(timespec="seconds"), + "summary": summary, + "families": families, + "history": { + "counts": history.get("counts") or {}, + "daily": daily_rows, + "latest": latest_history, + }, + "scheduled_outputs": { + "health_summary_endpoint": "/api/ai-automation/scheduled-health-summary", + "smoke_endpoint": "/api/ai-automation/smoke", + "daily_summary_send_endpoint": "/api/ai-automation/smoke/daily-summary/send", + "telegram_send_in_preview": False, + "writes_database": False, + }, + "automation_policy": { + "primary_flow": "ai_controlled", + "manual_review_mode": "exception_only", + "primary_human_gate_count": 0, + "machine_verifiable_evidence": True, + }, + "next_machine_actions": [ + item.get("next_machine_action") + for item in (problem_families or families[:1]) + if item.get("next_machine_action") + ], + "safety": { + "read_only_summary": True, + "include_current_smoke": bool(include_current_smoke or current_smoke), + "sends_telegram": False, + "writes_database": False, + "writes_database_count": 0, + "requires_production_version_truth": True, + }, + } + + def _daily_summary(records: List[Dict[str, Any]]) -> List[Dict[str, Any]]: by_day: Dict[str, Dict[str, int]] = {} for record in records: diff --git a/tests/test_ai_automation_smoke_service.py b/tests/test_ai_automation_smoke_service.py index 9c93695..8d29178 100644 --- a/tests/test_ai_automation_smoke_service.py +++ b/tests/test_ai_automation_smoke_service.py @@ -1,3 +1,6 @@ +import json + + def test_event_router_smoke_reports_queued_deliveries(tmp_path, monkeypatch): from services import ai_automation_metrics as metrics from services import ai_automation_smoke_service as smoke @@ -150,6 +153,119 @@ def test_smoke_history_daily_summary(): ] +def test_scheduled_automation_health_summary_reads_history_without_side_effects(tmp_path, monkeypatch): + from datetime import datetime + from services import ai_automation_smoke_service as smoke + + history_path = tmp_path / "smoke_history.jsonl" + history_path.write_text( + json.dumps({ + "generated_at": datetime.now().isoformat(timespec="seconds"), + "status": "ok", + "summary": {"ok": 7, "warning": 0, "critical": 0, "total": 7}, + "checks": [ + { + "name": "PChome 受控落地 drift monitor", + "status": "ok", + "summary": "PChome controlled apply drift 已驗證 4/4,目前 0 drift", + "details": { + "result": "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_DRIFT_VERIFIED", + "source_receipt_replay_result": "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAYED", + "selector_count": 4, + "readback_pass_count": 4, + "drift_count": 0, + }, + } + ], + }, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + monkeypatch.setattr(smoke, "_HISTORY_PATH", str(history_path)) + + summary = smoke.build_scheduled_automation_health_summary( + history_limit=5, + freshness_max_age_hours=9999, + ) + + pchome_family = next( + item for item in summary["families"] + if item["key"] == "pchome_controlled_apply_drift_monitor" + ) + assert summary["policy"] == "read_only_ai_automation_scheduled_health_summary" + assert summary["status"] == "ok" + assert summary["summary"]["total"] == 4 + assert summary["summary"]["primary_human_gate_count"] == 0 + assert summary["summary"]["writes_database_count"] == 0 + assert pchome_family["status"] == "ok" + assert pchome_family["details"]["drift_count"] == 0 + assert summary["scheduled_outputs"]["telegram_send_in_preview"] is False + assert summary["scheduled_outputs"]["writes_database"] is False + assert summary["automation_policy"]["primary_flow"] == "ai_controlled" + assert summary["safety"]["writes_database"] is False + + +def test_scheduled_automation_health_summary_can_use_current_smoke_without_recording_history(tmp_path, monkeypatch): + from services import ai_automation_smoke_service as smoke + + history_path = tmp_path / "smoke_history.jsonl" + monkeypatch.setattr(smoke, "_HISTORY_PATH", str(history_path)) + current_smoke = { + "generated_at": "2026-07-02T12:00:00", + "status": "critical", + "summary": {"ok": 6, "warning": 0, "critical": 1, "total": 7}, + "checks": [ + { + "name": "PChome 受控落地 drift monitor", + "status": "critical", + "summary": "PChome controlled apply 偵測到 1 筆 drift", + "details": {"drift_count": 1, "selector_count": 4, "readback_pass_count": 3}, + } + ], + } + + summary = smoke.build_scheduled_automation_health_summary( + current_smoke=current_smoke, + freshness_max_age_hours=9999, + ) + + assert summary["status"] == "critical" + assert "run_pchome_drift_verifier_and_recovery_package" in summary["next_machine_actions"] + assert summary["safety"]["include_current_smoke"] is True + assert not history_path.exists() + + +def test_scheduled_automation_health_summary_route_returns_compact_payload(tmp_path, monkeypatch): + from datetime import datetime + from flask import Flask + from routes import system_public_routes as routes + from services import ai_automation_smoke_service as smoke + + history_path = tmp_path / "smoke_history.jsonl" + history_path.write_text( + json.dumps({ + "generated_at": datetime.now().isoformat(timespec="seconds"), + "status": "ok", + "summary": {"ok": 7, "warning": 0, "critical": 0, "total": 7}, + "checks": [], + }, ensure_ascii=False) + "\n", + encoding="utf-8", + ) + monkeypatch.setattr(smoke, "_HISTORY_PATH", str(history_path)) + + app = Flask(__name__) + with app.test_request_context( + "/api/ai-automation/scheduled-health-summary?history_limit=5&freshness_max_age_hours=9999" + ): + response = routes.ai_automation_scheduled_health_summary_api.__wrapped__() + + payload = response.get_json() + assert payload["policy"] == "read_only_ai_automation_scheduled_health_summary" + assert payload["scheduled_outputs"]["health_summary_endpoint"] == ( + "/api/ai-automation/scheduled-health-summary" + ) + assert payload["safety"]["writes_database"] is False + + def test_gemini_egress_check_ok_when_no_calls(monkeypatch): from services import ai_automation_smoke_service as smoke