398 lines
15 KiB
Python
398 lines
15 KiB
Python
import json
|
||
|
||
|
||
def test_event_router_smoke_reports_queued_deliveries(tmp_path, monkeypatch):
    """Event-router smoke check should warn when deliveries sit in the queue.

    The queue file is seeded with two entries and one queued, undelivered
    dispatch is recorded in the metrics before the check is invoked.
    """
    from services import ai_automation_metrics as metrics
    from services import ai_automation_smoke_service as smoke
    from services import event_router

    # Point the router at a temporary queue file holding two JSONL entries.
    failed_queue = tmp_path / "failed_deliveries.jsonl"
    queued_lines = ('{"event_key":"a"}\n', '{"event_key":"b"}\n')
    failed_queue.write_text("".join(queued_lines), encoding="utf-8")
    monkeypatch.setattr(event_router, "_QUEUE_PATH", str(failed_queue))

    # Start from clean metrics, then record a single dispatch that was queued.
    metrics.reset_for_tests()
    dispatch_fields = {
        "tier": "L1",
        "event_type": "crawler_timeout",
        "delivered": False,
        "queued": True,
        "latency_ms": 12,
    }
    metrics.record_event_router_dispatch(**dispatch_fields)

    outcome = smoke._event_router_check()

    assert outcome["status"] == "warning"
    details = outcome["details"]
    assert details["queued_deliveries"] == 2
    assert details["dispatch_metric_total"] == 1
    assert details["dispatch_sync"] is True
|
||
|
||
|
||
def test_collect_ai_automation_smoke_uses_worst_status(monkeypatch):
    """The aggregated smoke status should be the worst status among the checks.

    All seven individual checks are stubbed: five report ``ok``, one
    ``warning`` and one ``critical``.
    """
    from services import ai_automation_smoke_service as smoke

    def stub(*check_args):
        # Build a zero-argument stand-in that resolves smoke._check when called.
        return lambda: smoke._check(*check_args)

    stubbed_checks = (
        ("_event_router_check", ("event", "ok", "ok")),
        ("_gemini_egress_check", ("gemini", "ok", "ok")),
        ("_autoheal_check", ("autoheal", "warning", "warn")),
        ("_nemotron_check", ("nemotron", "ok", "ok")),
        ("_embedding_queue_check", ("embedding", "critical", "boom")),
        ("_elephant_hitl_check", ("elephant", "ok", "ok")),
        ("_pchome_controlled_apply_drift_monitor_check", ("pchome", "ok", "ok")),
    )
    for attr_name, check_args in stubbed_checks:
        monkeypatch.setattr(smoke, attr_name, stub(*check_args))

    report = smoke.collect_ai_automation_smoke(record_history=False)

    expected_summary = {"ok": 5, "warning": 1, "critical": 1, "total": 7}
    assert report["status"] == "critical"
    assert report["summary"] == expected_summary
|
||
|
||
|
||
def test_pchome_controlled_apply_drift_monitor_reports_verified_zero_drift(monkeypatch):
    """The PChome drift monitor check should be ``ok`` for a verified, drift-free result.

    The engine factory and both backlog package builders are replaced with
    fakes; the test also verifies that the engine handed to the check is
    disposed afterwards.
    """
    from services import ai_automation_smoke_service as smoke
    from services import pchome_mapping_backlog_service as backlog

    class FakeEngine:
        """Engine stand-in that only remembers whether dispose() was called."""

        def __init__(self):
            self.disposed = False

        def dispose(self):
            self.disposed = True

    fake_engine = FakeEngine()

    def fake_engine_factory(_path):
        return fake_engine

    def fake_receipt_replay_package(**_kwargs):
        return {
            "result": "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAYED",
            "summary": {
                "target_selector_count": 4,
                "post_apply_readback_pass_count": 4,
                "executor_receipt_hash_match_count": 1,
            },
        }

    def fake_drift_verifier_package(**_kwargs):
        return {
            "result": "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_DRIFT_VERIFIED",
            "summary": {
                "target_selector_count": 4,
                "post_apply_readback_pass_count": 4,
                "drift_count": 0,
                "drift_verified_count": 1,
                "drift_verifier_artifact_materialized_count": 1,
                "drift_verifier_artifact_hash_match_count": 1,
                "writes_database_count": 0,
            },
        }

    monkeypatch.setattr(smoke, "_create_pchome_drift_monitor_engine", fake_engine_factory)
    monkeypatch.setattr(
        backlog,
        "build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_receipt_replay_package",
        fake_receipt_replay_package,
    )
    monkeypatch.setattr(
        backlog,
        "build_pchome_direct_mapping_retry_candidate_exception_controlled_apply_drift_verifier_package",
        fake_drift_verifier_package,
    )

    outcome = smoke._pchome_controlled_apply_drift_monitor_check()

    assert outcome["status"] == "ok"
    details = outcome["details"]
    assert details["selector_count"] == 4
    assert details["drift_count"] == 0
    assert details["writes_database"] is False
    assert details["materialize_artifacts"] is False
    assert fake_engine.disposed is True
|
||
|
||
|
||
def test_collect_ai_automation_smoke_persists_recent_history(tmp_path, monkeypatch):
    """Smoke runs should be appended to the history file, keeping only recent entries.

    ``_HISTORY_LIMIT`` is patched to 2 and the collector is run three times
    with every check stubbed to ``ok``; only two history lines must remain.
    """
    from services import ai_automation_smoke_service as smoke

    history_file = tmp_path / "smoke_history.jsonl"
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(history_file))
    monkeypatch.setattr(smoke, "_HISTORY_LIMIT", 2)

    def ok_stub(check_name):
        # Build a zero-argument stand-in that resolves smoke._check when called.
        return lambda: smoke._check(check_name, "ok", "ok")

    stubbed_checks = (
        ("_event_router_check", "event"),
        ("_gemini_egress_check", "gemini"),
        ("_autoheal_check", "autoheal"),
        ("_nemotron_check", "nemotron"),
        ("_embedding_queue_check", "embedding"),
        ("_elephant_hitl_check", "elephant"),
        ("_pchome_controlled_apply_drift_monitor_check", "pchome"),
    )
    for attr_name, check_name in stubbed_checks:
        monkeypatch.setattr(smoke, attr_name, ok_stub(check_name))

    # Three consecutive runs, each recording into the same history file.
    first, second, third = [
        smoke.collect_ai_automation_smoke(history_limit=5) for _ in range(3)
    ]

    assert first["status"] == "ok"
    assert second["history"]["counts"]["ok"] == 2
    assert third["history"]["counts"]["ok"] == 2
    persisted_lines = history_file.read_text(encoding="utf-8").strip().splitlines()
    assert len(persisted_lines) == 2
|
||
|
||
|
||
def test_smoke_history_export_and_clear(tmp_path, monkeypatch):
    """Exporting returns the stored JSONL entries and clearing removes the history file."""
    from services import ai_automation_smoke_service as smoke

    # Seed the history file with one "ok" and one "warning" entry.
    seeded_entries = (
        '{"generated_at":"2026-04-29T01:00:00","status":"ok"}\n',
        '{"generated_at":"2026-04-29T02:00:00","status":"warning"}\n',
    )
    history_file = tmp_path / "smoke_history.jsonl"
    history_file.write_text("".join(seeded_entries), encoding="utf-8")
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(history_file))

    # Export first, then clear, so the export still sees both entries.
    exported = smoke.export_smoke_history_jsonl()
    clear_result = smoke.clear_smoke_history()

    assert exported["count"] == 2
    assert '"status":"warning"' in exported["content"]
    assert clear_result["cleared"] == 2
    assert not history_file.exists()
|
||
|
||
|
||
def test_smoke_history_daily_summary():
    """``_history_summary`` should bucket entries per calendar day with per-status counts."""
    from services import ai_automation_smoke_service as smoke

    # One entry on 2026-04-28 and two on 2026-04-29, each with a different status.
    history_entries = [
        {"generated_at": "2026-04-28T23:00:00", "status": "ok"},
        {"generated_at": "2026-04-29T01:00:00", "status": "warning"},
        {"generated_at": "2026-04-29T02:00:00", "status": "critical"},
    ]
    expected_daily = [
        {"date": "2026-04-28", "ok": 1, "warning": 0, "critical": 0, "total": 1},
        {"date": "2026-04-29", "ok": 0, "warning": 1, "critical": 1, "total": 2},
    ]

    computed = smoke._history_summary(history_entries)

    assert computed["daily"] == expected_daily
|
||
|
||
|
||
def test_scheduled_automation_health_summary_reads_history_without_side_effects(tmp_path, monkeypatch):
    """The scheduled health summary should be built from stored history as a read-only report.

    A single ``ok`` smoke record containing the PChome drift-monitor check is
    written to a temporary history file before the summary is built.
    """
    from datetime import datetime
    from services import ai_automation_smoke_service as smoke

    pchome_check = {
        "name": "PChome 受控落地 drift monitor",
        "status": "ok",
        "summary": "PChome controlled apply drift 已驗證 4/4,目前 0 drift",
        "details": {
            "result": "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_DRIFT_VERIFIED",
            "source_receipt_replay_result": "DIRECT_MAPPING_RETRY_EXCEPTION_CONTROLLED_APPLY_RECEIPT_REPLAYED",
            "selector_count": 4,
            "readback_pass_count": 4,
            "drift_count": 0,
        },
    }
    history_record = {
        "generated_at": datetime.now().isoformat(timespec="seconds"),
        "status": "ok",
        "summary": {"ok": 7, "warning": 0, "critical": 0, "total": 7},
        "checks": [pchome_check],
    }
    history_file = tmp_path / "smoke_history.jsonl"
    history_file.write_text(
        json.dumps(history_record, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(history_file))

    report = smoke.build_scheduled_automation_health_summary(
        history_limit=5,
        freshness_max_age_hours=9999,
    )

    # Pick out the family entry that corresponds to the PChome drift monitor.
    pchome_family = next(
        filter(
            lambda family: family["key"] == "pchome_controlled_apply_drift_monitor",
            report["families"],
        )
    )
    assert report["policy"] == "read_only_ai_automation_scheduled_health_summary"
    assert report["status"] == "ok"

    totals = report["summary"]
    assert totals["total"] == 4
    assert totals["primary_human_gate_count"] == 0
    assert totals["writes_database_count"] == 0

    assert pchome_family["status"] == "ok"
    assert pchome_family["details"]["drift_count"] == 0

    assert report["scheduled_outputs"]["telegram_send_in_preview"] is False
    assert report["scheduled_outputs"]["writes_database"] is False
    assert report["automation_policy"]["primary_flow"] == "ai_controlled"
    assert report["safety"]["writes_database"] is False
|
||
|
||
|
||
def test_scheduled_automation_health_summary_can_use_current_smoke_without_recording_history(tmp_path, monkeypatch):
    """A supplied ``current_smoke`` payload drives the summary and no history file is created.

    The payload reports a critical PChome drift, and the summary is expected
    to carry that status and list the drift-verifier machine action.
    """
    from services import ai_automation_smoke_service as smoke

    # The history path is redirected to a file that does not exist yet.
    history_file = tmp_path / "smoke_history.jsonl"
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(history_file))

    drifting_check = {
        "name": "PChome 受控落地 drift monitor",
        "status": "critical",
        "summary": "PChome controlled apply 偵測到 1 筆 drift",
        "details": {"drift_count": 1, "selector_count": 4, "readback_pass_count": 3},
    }
    smoke_payload = {
        "generated_at": "2026-07-02T12:00:00",
        "status": "critical",
        "summary": {"ok": 6, "warning": 0, "critical": 1, "total": 7},
        "checks": [drifting_check],
    }

    report = smoke.build_scheduled_automation_health_summary(
        current_smoke=smoke_payload,
        freshness_max_age_hours=9999,
    )

    assert report["status"] == "critical"
    machine_actions = report["next_machine_actions"]
    assert "run_pchome_drift_verifier_and_recovery_package" in machine_actions
    assert report["safety"]["include_current_smoke"] is True
    assert not history_file.exists()
|
||
|
||
|
||
def test_scheduled_automation_health_summary_route_returns_compact_payload(tmp_path, monkeypatch):
    """The scheduled-health-summary route should return the read-only summary as JSON.

    The route function is invoked through its ``__wrapped__`` attribute inside
    a Flask test request context, with one ``ok`` record in the history file.
    """
    from datetime import datetime
    from flask import Flask
    from routes import system_public_routes as routes
    from services import ai_automation_smoke_service as smoke

    history_record = {
        "generated_at": datetime.now().isoformat(timespec="seconds"),
        "status": "ok",
        "summary": {"ok": 7, "warning": 0, "critical": 0, "total": 7},
        "checks": [],
    }
    history_file = tmp_path / "smoke_history.jsonl"
    history_file.write_text(
        json.dumps(history_record, ensure_ascii=False) + "\n",
        encoding="utf-8",
    )
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(history_file))

    request_url = (
        "/api/ai-automation/scheduled-health-summary?history_limit=5&freshness_max_age_hours=9999"
    )
    app = Flask(__name__)
    with app.test_request_context(request_url):
        response = routes.ai_automation_scheduled_health_summary_api.__wrapped__()

    body = response.get_json()
    assert body["policy"] == "read_only_ai_automation_scheduled_health_summary"
    endpoint = body["scheduled_outputs"]["health_summary_endpoint"]
    assert endpoint == "/api/ai-automation/scheduled-health-summary"
    assert body["safety"]["writes_database"] is False
|
||
|
||
|
||
def test_gemini_egress_check_ok_when_no_calls(monkeypatch):
    """The Gemini egress check should be ``ok`` when the fake session reports zero calls.

    ``GEMINI_API_HARD_DISABLED`` is removed from the environment; the check is
    still expected to report ``hard_disabled`` as True and to close the session.
    """
    from services import ai_automation_smoke_service as smoke

    class FakeResult:
        """Query result stand-in: an all-zero usage row and no top callers."""

        def fetchone(self):
            return {"calls": 0, "tokens": 0, "cost_usd": 0, "last_called": None}

        def fetchall(self):
            return []

    class FakeSession:
        """Session stand-in that records whether close() was called."""

        def __init__(self):
            self.closed = False

        def execute(self, *_args, **_kwargs):
            return FakeResult()

        def close(self):
            self.closed = True

    fake_session = FakeSession()

    def provide_session():
        return fake_session

    monkeypatch.setattr(smoke, "get_session", provide_session)
    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)

    outcome = smoke._gemini_egress_check()

    assert outcome["status"] == "ok"
    details = outcome["details"]
    assert details["calls"] == 0
    assert details["hard_disabled"] is True
    assert fake_session.closed is True
|
||
|
||
|
||
def test_gemini_egress_check_critical_when_hard_disabled_has_calls(monkeypatch):
    """The Gemini egress check should be ``critical`` when calls are recorded.

    The fake session answers its first ``execute`` with a usage summary of two
    calls and every later ``execute`` with the top-caller rows.
    """
    from services import ai_automation_smoke_service as smoke

    class FakeSummaryResult:
        """Result of the first query: aggregated usage figures."""

        def fetchone(self):
            return {
                "calls": 2,
                "tokens": 1234,
                "cost_usd": 0.012345,
                "last_called": "2026-05-21 07:00:00+00",
            }

    class FakeTopResult:
        """Result of later queries: a single top-caller row."""

        def fetchall(self):
            top_caller = {
                "caller": "code_review_openclaw_gemini",
                "model": "gemini-2.5-flash",
                "calls": 2,
                "tokens": 1234,
                "cost_usd": 0.012345,
                "last_called": "2026-05-21 07:00:00+00",
            }
            return [top_caller]

    class FakeSession:
        """Session stand-in that picks its result by how often execute() ran."""

        def __init__(self):
            self.calls = 0

        def execute(self, *_args, **_kwargs):
            self.calls += 1
            if self.calls == 1:
                return FakeSummaryResult()
            return FakeTopResult()

        def close(self):
            pass

    # Calling the class yields a fresh session for every get_session() call.
    monkeypatch.setattr(smoke, "get_session", FakeSession)
    monkeypatch.delenv("GEMINI_API_HARD_DISABLED", raising=False)

    outcome = smoke._gemini_egress_check()

    assert outcome["status"] == "critical"
    assert "Hard-disabled" in outcome["summary"]
    details = outcome["details"]
    assert details["calls"] == 2
    assert details["top_callers"][0]["caller"] == "code_review_openclaw_gemini"
|
||
|
||
|
||
def test_nemotron_smoke_detects_current_dispatcher_fallback(monkeypatch):
    """The Nemotron check should warn and report the fallback dispatcher class.

    ``NemotronDispatcherService`` is removed from the dispatcher module (if it
    exists) and ``NIM_API_KEY`` is blanked before the check runs.
    """
    from services import ai_automation_smoke_service as smoke
    import services.nemoton_dispatcher_service as nemotron

    monkeypatch.delattr(nemotron, "NemotronDispatcherService", raising=False)
    monkeypatch.setattr(nemotron, "NIM_API_KEY", "")

    outcome = smoke._nemotron_check()

    assert outcome["status"] == "warning"
    details = outcome["details"]
    assert details["fallback_ready"] is True
    assert details["dispatcher_class"] == "NemotronDispatcher"
|
||
|
||
|
||
def test_build_smoke_daily_summary_message_escapes_history(tmp_path, monkeypatch):
    """The daily summary message must not contain a raw ``<script>`` taken from history.

    The history file holds one warning record whose check name is ``<script>``.
    """
    from services import ai_automation_smoke_service as smoke

    warning_record = (
        '{"generated_at":"2026-04-29T01:00:00","status":"warning",'
        '"summary":{"ok":0,"warning":1,"critical":0,"total":1},'
        '"checks":[{"name":"<script>","status":"warning","summary":"bad"}]}\n'
    )
    history_file = tmp_path / "smoke_history.jsonl"
    history_file.write_text(warning_record, encoding="utf-8")
    monkeypatch.setattr(smoke, "_HISTORY_PATH", str(history_file))

    rendered = smoke.build_smoke_daily_summary_message()

    # Fragments that have to appear in the rendered message.
    for fragment in ("AI 自動化 Smoke 每日摘要", "WARNING", "最近異常檢查"):
        assert fragment in rendered
    assert "<script>" not in rendered
|
||
|
||
|
||
def test_send_smoke_daily_summary_uses_telegram_result(monkeypatch):
    """``send_smoke_daily_summary`` should report ``sent`` and expose the Telegram result.

    Both the message builder and the Telegram sender are replaced with fakes;
    the fake sender echoes back the chat ids it was given.
    """
    from services import ai_automation_smoke_service as smoke

    def fake_send(message, chat_ids=None):
        # Successful single delivery; falls back to chat id 123 when none is given.
        return {
            "ok": True,
            "sent": 1,
            "failed": 0,
            "chat_ids": chat_ids or [123],
            "errors": [],
        }

    monkeypatch.setattr(smoke, "build_smoke_daily_summary_message", lambda: "hello")
    monkeypatch.setattr(
        "services.telegram_templates.send_telegram_with_result",
        fake_send,
    )

    outcome = smoke.send_smoke_daily_summary(chat_ids=[999])

    assert outcome["status"] == "sent"
    assert outcome["telegram"]["chat_ids"] == [999]
|