Files
ewoooc/tests/test_ai_automation_smoke_service.py
ogt 1e97aef3c4
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
feat(ai): add scheduled automation health summary
2026-07-02 14:37:23 +08:00

398 lines
15 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import json
def test_event_router_smoke_reports_queued_deliveries(tmp_path, monkeypatch):
    """Event-router check is expected to warn when deliveries are queued.

    Seeds a two-line failed-delivery queue file and records one queued
    dispatch metric, then checks that both counts appear in the details.
    """
    from services import ai_automation_metrics as metrics
    from services import ai_automation_smoke_service as smoke
    from services import event_router

    # Point the router's queue path at a temp file holding two entries.
    pending_file = tmp_path / "failed_deliveries.jsonl"
    pending_file.write_text(
        '{"event_key":"a"}\n'
        '{"event_key":"b"}\n',
        encoding="utf-8",
    )
    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(pending_file))

    # NOTE(review): reset_for_tests presumably clears earlier metrics so the
    # total asserted below reflects only this single dispatch - confirm.
    metrics.reset_for_tests()
    dispatch = {
        "tier": "L1",
        "event_type": "crawler_timeout",
        "delivered": False,
        "queued": True,
        "latency_ms": 12,
    }
    metrics.record_event_router_dispatch(**dispatch)

    outcome = smoke._event_router_check()

    assert outcome["status"] == "warning"
    details = outcome["details"]
    assert details["queued_deliveries"] == 2
    assert details["dispatch_metric_total"] == 1
    assert details["dispatch_sync"] is True
def test_collect_ai_automation_smoke_uses_worst_status(monkeypatch):
    """Overall status is expected to be the most severe of the stubbed checks."""
    from services import ai_automation_smoke_service as smoke

    def canned(name, status, text):
        # Zero-argument stand-in; smoke._check is looked up when the stub runs.
        return lambda: smoke._check(name, status, text)

    # Seven stubbed checks: five ok, one warning, one critical.
    stubs = (
        ("_event_router_check", "event", "ok", "ok"),
        ("_gemini_egress_check", "gemini", "ok", "ok"),
        ("_autoheal_check", "autoheal", "warning", "warn"),
        ("_nemotron_check", "nemotron", "ok", "ok"),
        ("_embedding_queue_check", "embedding", "critical", "boom"),
        ("_elephant_hitl_check", "elephant", "ok", "ok"),
        ("_pchome_controlled_apply_drift_monitor_check", "pchome", "ok", "ok"),
    )
    for attr, name, status, text in stubs:
        monkeypatch.setattr(smoke, attr, canned(name, status, text))

    report = smoke.collect_ai_automation_smoke(record_history=False)

    assert report["status"] == "critical"
    assert report["summary"] == {"ok": 5, "warning": 1, "critical": 1, "total": 7}
def test_pchome_controlled_apply_drift_monitor_reports_verified_zero_drift(monkeypatch):
    """Drift monitor check is expected to be ok for a verified, zero-drift run.

    The engine factory and both backlog package builders are replaced with
    stubs; the test also checks that the stub engine gets disposed.
    """
    from services import ai_automation_smoke_service as smoke
    from services import pchome_mapping_backlog_service as backlog

    class StubEngine:
        # Flipped by dispose(); asserted at the end of the test.
        disposed = False

        def dispose(self):
            self.disposed = True

    def fake_receipt_replay(**_kwargs):
        # A fresh payload is built on every call.
        return {
            "result": "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAYED",
            "summary": {
                "target_selector_count": 4,
                "post_apply_readback_pass_count": 4,
                "executor_receipt_hash_match_count": 1,
            },
        }

    def fake_drift_verifier(**_kwargs):
        # A fresh payload is built on every call.
        return {
            "result": "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_DRIFT_VERIFIED",
            "summary": {
                "target_selector_count": 4,
                "post_apply_readback_pass_count": 4,
                "drift_count": 0,
                "drift_verified_count": 1,
                "drift_verifier_artifact_materialized_count": 1,
                "drift_verifier_artifact_hash_match_count": 1,
                "writes_database_count": 0,
            },
        }

    fake_engine = StubEngine()
    monkeypatch.setattr(
        smoke,
        "_create_pchome_drift_monitor_engine",
        lambda _path: fake_engine,
    )
    monkeypatch.setattr(
        backlog,
        "build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay_package",
        fake_receipt_replay,
    )
    monkeypatch.setattr(
        backlog,
        "build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_drift_verifier_package",
        fake_drift_verifier,
    )

    outcome = smoke._pchome_controlled_apply_drift_monitor_check()

    assert outcome["status"] == "ok"
    assert outcome["details"]["selector_count"] == 4
    assert outcome["details"]["drift_count"] == 0
    assert outcome["details"]["writes_database"] is False
    assert outcome["details"]["materialize_artifacts"] is False
    assert fake_engine.disposed is True
def test_collect_ai_automation_smoke_persists_recent_history(tmp_path, monkeypatch):
    """Three recorded runs with a patched limit of 2 should leave two history lines."""
    from services import ai_automation_smoke_service as smoke

    log_file = tmp_path / "smoke_history.jsonl"
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(log_file))
    monkeypatch.setattr(smoke, "_HISTORY_LIMIT", 2)

    def canned_ok(name):
        # Zero-argument stand-in that always reports an "ok" check.
        return lambda: smoke._check(name, "ok", "ok")

    healthy_checks = (
        ("_event_router_check", "event"),
        ("_gemini_egress_check", "gemini"),
        ("_autoheal_check", "autoheal"),
        ("_nemotron_check", "nemotron"),
        ("_embedding_queue_check", "embedding"),
        ("_elephant_hitl_check", "elephant"),
        ("_pchome_controlled_apply_drift_monitor_check", "pchome"),
    )
    for attr, name in healthy_checks:
        monkeypatch.setattr(smoke, attr, canned_ok(name))

    # Run the collector three times in a row against the same history file.
    first, second, third = (
        smoke.collect_ai_automation_smoke(history_limit=5),
        smoke.collect_ai_automation_smoke(history_limit=5),
        smoke.collect_ai_automation_smoke(history_limit=5),
    )

    assert first["status"] == "ok"
    assert second["history"]["counts"]["ok"] == 2
    assert third["history"]["counts"]["ok"] == 2

    stored_lines = log_file.read_text(encoding="utf-8").strip().splitlines()
    assert len(stored_lines) == 2
def test_smoke_history_export_and_clear(tmp_path, monkeypatch):
    """Export should return both stored rows; clear should remove the file."""
    from services import ai_automation_smoke_service as smoke

    # Pre-populate the history file with one ok row and one warning row.
    seeded_rows = (
        '{"generated_at":"2026-04-29T01:00:00","status":"ok"}\n'
        '{"generated_at":"2026-04-29T02:00:00","status":"warning"}\n'
    )
    log_file = tmp_path / "smoke_history.jsonl"
    log_file.write_text(seeded_rows, encoding="utf-8")
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(log_file))

    # Export first, then clear, so the export still sees the seeded rows.
    exported = smoke.export_smoke_history_jsonl()
    wiped = smoke.clear_smoke_history()

    assert exported["count"] == 2
    assert '"status":"warning"' in exported["content"]
    assert wiped["cleared"] == 2
    assert not log_file.exists()
def test_smoke_history_daily_summary():
    """History rows should be bucketed per calendar day with per-status counts."""
    from services import ai_automation_smoke_service as smoke

    # One row on 2026-04-28 and two rows on 2026-04-29.
    rows = [
        {"generated_at": "2026-04-28T23:00:00", "status": "ok"},
        {"generated_at": "2026-04-29T01:00:00", "status": "warning"},
        {"generated_at": "2026-04-29T02:00:00", "status": "critical"},
    ]
    expected_daily = [
        {"date": "2026-04-28", "ok": 1, "warning": 0, "critical": 0, "total": 1},
        {"date": "2026-04-29", "ok": 0, "warning": 1, "critical": 1, "total": 2},
    ]

    digest = smoke._history_summary(rows)

    assert digest["daily"] == expected_daily
def test_scheduled_automation_health_summary_reads_history_without_side_effects(tmp_path, monkeypatch):
    """Health summary built from a single stored ok record should be read-only.

    Seeds one history row containing a PChome drift-monitor check and
    verifies the policy, status, family details, and safety flags.
    """
    from datetime import datetime
    from services import ai_automation_smoke_service as smoke

    # The lone history record is stamped with the current local time.
    pchome_check = {
        "name": "PChome 受控落地 drift monitor",
        "status": "ok",
        "summary": "PChome controlled apply drift 已驗證 4/4目前 0 drift",
        "details": {
            "result": "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_DRIFT_VERIFIED",
            "source_receipt_replay_result": "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAYED",
            "selector_count": 4,
            "readback_pass_count": 4,
            "drift_count": 0,
        },
    }
    record = {
        "generated_at": datetime.now().isoformat(timespec="seconds"),
        "status": "ok",
        "summary": {"ok": 7, "warning": 0, "critical": 0, "total": 7},
        "checks": [pchome_check],
    }
    log_file = tmp_path / "smoke_history.jsonl"
    log_file.write_text(
        json.dumps(record, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(log_file))

    digest = smoke.build_scheduled_automation_health_summary(
        history_limit=5,
        freshness_max_age_hours=9999,
    )
    # Locate the PChome family entry; raises StopIteration if it is missing.
    pchome_family = next(
        family
        for family in digest["families"]
        if family["key"] == "pchome_controlled_apply_drift_monitor"
    )

    assert digest["policy"] == "read_only_ai_automation_scheduled_health_summary"
    assert digest["status"] == "ok"
    assert digest["summary"]["total"] == 4
    assert digest["summary"]["primary_human_gate_count"] == 0
    assert digest["summary"]["writes_database_count"] == 0
    assert pchome_family["status"] == "ok"
    assert pchome_family["details"]["drift_count"] == 0
    assert digest["scheduled_outputs"]["telegram_send_in_preview"] is False
    assert digest["scheduled_outputs"]["writes_database"] is False
    assert digest["automation_policy"]["primary_flow"] == "ai_controlled"
    assert digest["safety"]["writes_database"] is False
def test_scheduled_automation_health_summary_can_use_current_smoke_without_recording_history(tmp_path, monkeypatch):
    """A supplied critical smoke result should drive the summary and write no history."""
    from services import ai_automation_smoke_service as smoke

    # The history path points at a file that is never created by the test.
    log_file = tmp_path / "smoke_history.jsonl"
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(log_file))

    drifting_check = {
        "name": "PChome 受控落地 drift monitor",
        "status": "critical",
        "summary": "PChome controlled apply 偵測到 1 筆 drift",
        "details": {"drift_count": 1, "selector_count": 4, "readback_pass_count": 3},
    }
    current_smoke = {
        "generated_at": "2026-07-02T12:00:00",
        "status": "critical",
        "summary": {"ok": 6, "warning": 0, "critical": 1, "total": 7},
        "checks": [drifting_check],
    }

    digest = smoke.build_scheduled_automation_health_summary(
        current_smoke=current_smoke,
        freshness_max_age_hours=9999,
    )

    assert digest["status"] == "critical"
    assert "run_pchome_drift_verifier_and_recovery_package" in digest["next_machine_actions"]
    assert digest["safety"]["include_current_smoke"] is True
    assert not log_file.exists()
def test_scheduled_automation_health_summary_route_returns_compact_payload(tmp_path, monkeypatch):
    """Route handler should return the read-only summary payload as JSON.

    The handler is invoked through its ``__wrapped__`` attribute inside a
    Flask test request context.
    """
    from datetime import datetime
    from flask import Flask
    from routes import system_public_routes as routes
    from services import ai_automation_smoke_service as smoke

    # Seed one ok history row stamped with the current local time.
    record = {
        "generated_at": datetime.now().isoformat(timespec="seconds"),
        "status": "ok",
        "summary": {"ok": 7, "warning": 0, "critical": 0, "total": 7},
        "checks": [],
    }
    log_file = tmp_path / "smoke_history.jsonl"
    log_file.write_text(
        json.dumps(record, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(log_file))

    request_url = (
        "/api/ai-automation/scheduled-health-summary?history_limit=5&freshness_max_age_hours=9999"
    )
    app = Flask(__name__)
    with app.test_request_context(request_url):
        response = routes.ai_automation_scheduled_health_summary_api.__wrapped__()
        payload = response.get_json()

    assert payload["policy"] == "read_only_ai_automation_scheduled_health_summary"
    assert payload["scheduled_outputs"]["health_summary_endpoint"] == (
        "/api/ai-automation/scheduled-health-summary"
    )
    assert payload["safety"]["writes_database"] is False
def test_gemini_egress_check_ok_when_no_calls(monkeypatch):
    """With zero recorded calls the Gemini egress check should be ok and close its session."""
    from services import ai_automation_smoke_service as smoke

    class EmptyResult:
        """Query result stub reporting no usage at all."""

        def fetchone(self):
            return {"calls": 0, "tokens": 0, "cost_usd": 0, "last_called": None}

        def fetchall(self):
            return []

    class StubSession:
        """Session stub that records whether close() was called."""

        closed = False

        def execute(self, *_args, **_kwargs):
            return EmptyResult()

        def close(self):
            self.closed = True

    fake_session = StubSession()
    monkeypatch.setattr(smoke, "get_session", lambda: fake_session)
    # Make sure the env var is unset; the test expects hard_disabled True then.
    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)

    outcome = smoke._gemini_egress_check()

    assert outcome["status"] == "ok"
    assert outcome["details"]["calls"] == 0
    assert outcome["details"]["hard_disabled"] is True
    assert fake_session.closed is True
def test_gemini_egress_check_critical_when_hard_disabled_has_calls(monkeypatch):
    """Recorded Gemini calls with the env var unset should yield a critical result."""
    from services import ai_automation_smoke_service as smoke

    class SummaryResult:
        """Stub for the first query: aggregate usage totals."""

        def fetchone(self):
            return {
                "calls": 2,
                "tokens": 1234,
                "cost_usd": 0.012345,
                "last_called": "2026-05-21 07:00:00+00",
            }

    class TopCallersResult:
        """Stub for every later query: per-caller breakdown rows."""

        def fetchall(self):
            return [{
                "caller": "code_review_openclaw_gemini",
                "model": "gemini-2.5-flash",
                "calls": 2,
                "tokens": 1234,
                "cost_usd": 0.012345,
                "last_called": "2026-05-21 07:00:00+00",
            }]

    class StubSession:
        """Session stub that picks its result by execute() call order."""

        def __init__(self):
            self.calls = 0

        def execute(self, *_args, **_kwargs):
            self.calls += 1
            if self.calls == 1:
                return SummaryResult()
            return TopCallersResult()

        def close(self):
            pass

    # Each get_session() call hands out a brand-new stub session.
    monkeypatch.setattr(smoke, "get_session", lambda: StubSession())
    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)

    outcome = smoke._gemini_egress_check()

    assert outcome["status"] == "critical"
    assert "Hard-disabled" in outcome["summary"]
    assert outcome["details"]["calls"] == 2
    assert outcome["details"]["top_callers"][0]["caller"] == "code_review_openclaw_gemini"
def test_nemotron_smoke_detects_current_dispatcher_fallback(monkeypatch):
    """Nemotron check should warn and report the NemotronDispatcher fallback.

    The dispatcher module has its NemotronDispatcherService attribute removed
    (if present) and its NIM_API_KEY blanked before the check runs.
    """
    from services import ai_automation_smoke_service as smoke
    import services.nemoton_dispatcher_service as nemotron

    monkeypatch.delattr(nemotron, "NemotronDispatcherService", raising=False)
    monkeypatch.setattr(nemotron, "NIM_API_KEY", "")

    outcome = smoke._nemotron_check()

    assert outcome["status"] == "warning"
    details = outcome["details"]
    assert details["fallback_ready"] is True
    assert details["dispatcher_class"] == "NemotronDispatcher"
def test_build_smoke_daily_summary_message_escapes_history(tmp_path, monkeypatch):
    """Daily summary message must not contain a raw ``<script>`` check name."""
    from services import ai_automation_smoke_service as smoke

    # One warning row whose check name is markup that must not appear verbatim.
    seeded_row = (
        '{"generated_at":"2026-04-29T01:00:00","status":"warning",'
        '"summary":{"ok":0,"warning":1,"critical":0,"total":1},'
        '"checks":[{"name":"<script>","status":"warning","summary":"bad"}]}\n'
    )
    log_file = tmp_path / "smoke_history.jsonl"
    log_file.write_text(seeded_row, encoding="utf-8")
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(log_file))

    text = smoke.build_smoke_daily_summary_message()

    assert "AI 自動化 Smoke 每日摘要" in text
    assert "WARNING" in text
    assert "最近異常檢查" in text
    assert "<script>" not in text
def test_send_smoke_daily_summary_uses_telegram_result(monkeypatch):
    """Sender should report "sent" and pass through the Telegram result's chat ids."""
    from services import ai_automation_smoke_service as smoke

    def fake_send(message, chat_ids=None):
        # Successful send result; echoes the chat ids, defaulting to [123].
        return {
            "ok": True,
            "sent": 1,
            "failed": 0,
            "chat_ids": chat_ids or [123],
            "errors": [],
        }

    monkeypatch.setattr(smoke, "build_smoke_daily_summary_message", lambda: "hello")
    monkeypatch.setattr(
        "services.telegram_templates.send_telegram_with_result",
        fake_send,
    )

    outcome = smoke.send_smoke_daily_summary(chat_ids=[999])

    assert outcome["status"] == "sent"
    assert outcome["telegram"]["chat_ids"] == [999]