From f20121ad41c24142baf8d9717bdac33489631ec0 Mon Sep 17 00:00:00 2001 From: OG T Date: Wed, 8 Apr 2026 11:22:03 +0800 Subject: [PATCH] =?UTF-8?q?feat(audit):=20Phase=2011=20=E5=91=8A=E8=AD=A6?= =?UTF-8?q?=E6=93=8D=E4=BD=9C=E5=AE=8C=E6=95=B4=E6=BA=AF=E6=BA=90=20?= =?UTF-8?q?=E2=80=94=20alert=5Foperation=5Flog=20+=20=E6=AD=B7=E5=8F=B2?= =?UTF-8?q?=E5=9B=9E=E5=A1=AB?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 統帥指令「所有告警訊息通通寫入資料庫,並記錄相關操作」 變更: - phase11_alert_operation_log.sql: 新表 (Event Sourcing,不可變) - phase11b_backfill_alert_operation_log.sql: 歷史回填 654 筆 - 14 筆 ALERT_RECEIVED (incidents) - 265 筆 TELEGRAM_SENT (approval_records) - 265 筆 USER_ACTION (approval_records) - 110 筆 EXECUTION_COMPLETED (audit_logs) - db/models.py: AlertOperationLog SQLAlchemy model - repositories/alert_operation_log_repository.py: append/list_by_incident/get_stats - webhooks.py: _try_auto_repair_background 寫入 AUTO_REPAIR_TRIGGERED + EXECUTION_COMPLETED + TELEGRAM_RESULT_SENT - webhooks.py: _push_to_telegram_background 寫入 TELEGRAM_SENT - telegram.py: handle_callback 寫入 USER_ACTION (approve/reject) 已執行 migration: awoooi_prod@192.168.0.188 ✅ Co-Authored-By: Claude Opus 4.6 (1M context) --- .../phase11_alert_operation_log.sql | 72 +++++++ .../phase11b_backfill_alert_operation_log.sql | 152 ++++++++++++++ apps/api/src/api/v1/telegram.py | 23 +++ apps/api/src/api/v1/webhooks.py | 86 +++++++- apps/api/src/db/models.py | 48 +++++ .../alert_operation_log_repository.py | 188 ++++++++++++++++++ 6 files changed, 568 insertions(+), 1 deletion(-) create mode 100644 apps/api/migrations/phase11_alert_operation_log.sql create mode 100644 apps/api/migrations/phase11b_backfill_alert_operation_log.sql create mode 100644 apps/api/src/repositories/alert_operation_log_repository.py diff --git a/apps/api/migrations/phase11_alert_operation_log.sql b/apps/api/migrations/phase11_alert_operation_log.sql new file mode 100644 index 00000000..9338682b --- /dev/null +++ b/apps/api/migrations/phase11_alert_operation_log.sql @@ -0,0 +1,72 @@ +-- Phase 11: Alert Operation Log — 告警操作完整溯源表 +-- 建立時間: 2026-04-08 (台北時區) +-- 建立者: Claude Code — 統帥指令「所有操作都必須被記錄,寫入資料庫」 +-- +-- 設計理念: Event Sourcing +-- 每個告警的生命週期,每個事件都寫一筆 +-- 不可變 (Immutable) — 只 INSERT,不 UPDATE/DELETE +-- +-- 事件類型 (event_type): +-- ALERT_RECEIVED — Alertmanager/外部告警進來 +-- TELEGRAM_SENT — 推送 Telegram 審核卡片 +-- USER_ACTION — 使用者在 Telegram 按按鈕 (approve/reject/silence) +-- AUTO_REPAIR_TRIGGERED — 自動修復評估通過,準備執行 +-- EXECUTION_STARTED — 開始執行 K8s/SSH 指令 +-- EXECUTION_COMPLETED — 執行完成 (success/failure) +-- TELEGRAM_RESULT_SENT — 自動修復結果推送到 Telegram +-- RESOLVED — 告警解除 +-- SILENCED — 靜默中 +-- ESCALATED — 升級 (P3→P2 等) + +CREATE TYPE alert_event_type AS ENUM ( + 'ALERT_RECEIVED', + 'TELEGRAM_SENT', + 'USER_ACTION', + 'AUTO_REPAIR_TRIGGERED', + 'EXECUTION_STARTED', + 'EXECUTION_COMPLETED', + 'TELEGRAM_RESULT_SENT', + 'RESOLVED', + 'SILENCED', + 'ESCALATED' +); + +CREATE TABLE IF NOT EXISTS alert_operation_log ( + -- 主鍵 (不可變) + id VARCHAR(36) PRIMARY KEY DEFAULT gen_random_uuid()::text, + + -- 關聯 (所有欄位允許 NULL,避免不同事件強制關聯) + incident_id VARCHAR(30), -- incidents.incident_id + approval_id VARCHAR(36), -- approval_records.id + audit_log_id VARCHAR(36), -- audit_logs.id + auto_repair_id VARCHAR(36), -- auto_repair_executions.id + + -- 事件核心 + event_type alert_event_type NOT NULL, + actor VARCHAR(100), -- 誰觸發: 'alertmanager' / 'telegram:user_id' / 'auto_repair' / 'system' + action_detail VARCHAR(200), -- 具體動作: 'approve' / 'reject' / 'silence' / kubectl 指令摘要 + + -- 執行結果 + success BOOLEAN, -- NULL=不適用 (如 ALERT_RECEIVED), TRUE/FALSE=有執行結果 + error_message TEXT, + + -- 上下文 (結構化存儲) + context JSONB NOT NULL DEFAULT '{}', + -- 範例: + -- ALERT_RECEIVED: {"alert_name": "KubePodCrashLooping", "severity": "P2", "namespace": "awoooi-prod"} + -- USER_ACTION: {"button": "approve", "telegram_user_id": "12345", "message_id": "67890"} + -- EXECUTION: {"playbook": "restart-deployment", "steps": 3, "duration_ms": 2340} + + -- 時間戳 (台北時區,不可變) + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +-- 索引 (查詢模式: 按 incident / 按時間 / 按事件類型) +CREATE INDEX IF NOT EXISTS ix_aol_incident_id ON alert_operation_log (incident_id); +CREATE INDEX IF NOT EXISTS ix_aol_approval_id ON alert_operation_log (approval_id); +CREATE INDEX IF NOT EXISTS ix_aol_event_type ON alert_operation_log (event_type); +CREATE INDEX IF NOT EXISTS ix_aol_created_at ON alert_operation_log (created_at DESC); +CREATE INDEX IF NOT EXISTS ix_aol_actor ON alert_operation_log (actor); + +COMMENT ON TABLE alert_operation_log IS +'告警操作完整溯源 — Event Sourcing,不可變,每個告警生命週期的每個事件一筆記錄'; diff --git a/apps/api/migrations/phase11b_backfill_alert_operation_log.sql b/apps/api/migrations/phase11b_backfill_alert_operation_log.sql new file mode 100644 index 00000000..dbcceb15 --- /dev/null +++ b/apps/api/migrations/phase11b_backfill_alert_operation_log.sql @@ -0,0 +1,152 @@ +-- Phase 11b: 歷史數據回填 alert_operation_log +-- 建立時間: 2026-04-08 (台北時區) +-- 建立者: Claude Code — 統帥指令「把之前所有的告警訊息,通通寫入資料庫」 +-- +-- 資料來源: +-- incidents (14筆) → ALERT_RECEIVED 事件 +-- approval_records (265筆) → TELEGRAM_SENT + USER_ACTION 事件 +-- audit_logs (110筆) → EXECUTION_STARTED + EXECUTION_COMPLETED 事件 +-- +-- 注意: 使用 ON CONFLICT DO NOTHING 避免重複執行 + +-- ============================================================ +-- Step 1: incidents → ALERT_RECEIVED +-- ============================================================ +INSERT INTO alert_operation_log ( + id, incident_id, event_type, actor, action_detail, success, context, created_at +) +SELECT + gen_random_uuid()::text, + incident_id, + 'ALERT_RECEIVED', + COALESCE(source, 'alertmanager'), + COALESCE( + signals->0->>'alert_name', + 'unknown' + ), + TRUE, + jsonb_build_object( + 'severity', severity::text, + 'status', status::text, + 'alert_name', COALESCE(signals->0->>'alert_name', 'unknown'), + 'namespace', COALESCE(signals->0->'labels'->>'namespace', 'default'), + 'resource', COALESCE(signals->0->'labels'->>'resource', ''), + 'message', COALESCE(signals->0->'annotations'->>'message', ''), + 'source', COALESCE(source, 'alertmanager'), + 'signal_count', json_array_length(signals), + 'backfill', TRUE, + 'backfill_at', NOW()::text + ), + created_at +FROM incidents +ON CONFLICT DO NOTHING; + +-- ============================================================ +-- Step 2: approval_records → TELEGRAM_SENT (每筆 approval 代表推送了一次卡片) +-- ============================================================ +INSERT INTO alert_operation_log ( + id, incident_id, approval_id, event_type, actor, action_detail, success, context, created_at +) +SELECT + gen_random_uuid()::text, + incident_id, + id, + 'TELEGRAM_SENT', + 'system', + 'approval_card_sent', + TRUE, + jsonb_build_object( + 'action', action, + 'risk_level', risk_level::text, + 'requested_by', requested_by, + 'hit_count', hit_count, + 'backfill', TRUE, + 'backfill_at', NOW()::text + ), + created_at +FROM approval_records +ON CONFLICT DO NOTHING; + +-- ============================================================ +-- Step 3: approval_records (APPROVED/REJECTED) → USER_ACTION +-- ============================================================ +INSERT INTO alert_operation_log ( + id, incident_id, approval_id, event_type, actor, action_detail, success, context, created_at +) +SELECT + gen_random_uuid()::text, + incident_id, + id, + 'USER_ACTION', + COALESCE(requested_by, 'unknown'), + CASE status::text + WHEN 'APPROVED' THEN 'approve' + WHEN 'REJECTED' THEN 'reject' + WHEN 'EXECUTION_SUCCESS' THEN 'approve' + WHEN 'EXECUTION_FAILED' THEN 'approve' + ELSE status::text + END, + CASE status::text + WHEN 'APPROVED' THEN TRUE + WHEN 'EXECUTION_SUCCESS' THEN TRUE + WHEN 'REJECTED' THEN FALSE + WHEN 'EXECUTION_FAILED' THEN TRUE -- 批准了但執行失敗 + ELSE NULL + END, + jsonb_build_object( + 'status', status::text, + 'risk_level', risk_level::text, + 'rejection_reason', COALESCE(rejection_reason, ''), + 'signatures', signatures, + 'resolved_at', COALESCE(resolved_at::text, ''), + 'backfill', TRUE, + 'backfill_at', NOW()::text + ), + COALESCE(resolved_at, updated_at, created_at) +FROM approval_records +WHERE status::text IN ('APPROVED', 'REJECTED', 'EXECUTION_SUCCESS', 'EXECUTION_FAILED') +ON CONFLICT DO NOTHING; + +-- ============================================================ +-- Step 4: audit_logs → EXECUTION_COMPLETED +-- ============================================================ +INSERT INTO alert_operation_log ( + id, approval_id, audit_log_id, event_type, actor, action_detail, success, error_message, context, created_at +) +SELECT + gen_random_uuid()::text, + approval_id, + id, + 'EXECUTION_COMPLETED', + COALESCE(executed_by, 'system'), + COALESCE(operation_type, 'unknown') || '/' || COALESCE(target_resource, ''), + success, + error_message, + jsonb_build_object( + 'operation_type', operation_type, + 'target_resource', target_resource, + 'namespace', namespace, + 'execution_duration_ms', execution_duration_ms, + 'dry_run_passed', dry_run_passed, + 'authorization_channel', COALESCE(authorization_channel, ''), + 'retry_count', retry_count, + 'failure_classification', COALESCE(failure_classification, ''), + 'auto_repair_attempted', auto_repair_attempted, + 'backfill', TRUE, + 'backfill_at', NOW()::text + ), + created_at +FROM audit_logs +ON CONFLICT DO NOTHING; + +-- ============================================================ +-- 驗證結果 +-- ============================================================ +SELECT + event_type::text, + COUNT(*) as count, + MIN(created_at) as oldest, + MAX(created_at) as newest +FROM alert_operation_log +GROUP BY event_type +ORDER BY event_type; diff --git a/apps/api/src/api/v1/telegram.py b/apps/api/src/api/v1/telegram.py index e85932b2..4b107654 100644 --- a/apps/api/src/api/v1/telegram.py +++ b/apps/api/src/api/v1/telegram.py @@ -142,6 +142,27 @@ async def telegram_webhook( service = get_approval_service() + # 2026-04-08 Claude Code: USER_ACTION 記錄 + async def _log_user_action(action_name: str, success: bool, incident_id: str | None = None) -> None: + try: + from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository + await get_alert_operation_log_repository().append( + "USER_ACTION", + incident_id=incident_id, + approval_id=approval_id, + actor=f"telegram:{user_id}", + action_detail=action_name, + success=success, + context={ + "username": username, + "user_id": user_id, + "message_id": message_id, + "action": action_name, + }, + ) + except Exception as _e: + logger.warning("alert_op_log_user_action_failed", error=str(_e)) + # 2026-03-29 ogt: 修復方法呼叫 - add_signature/reject 不存在 # 正確方法: sign_approval / reject_approval if action == "approve": @@ -160,6 +181,7 @@ async def telegram_webhook( status=approval.status.value, execution_triggered=execution_triggered, ) + await _log_user_action("approve", True, getattr(approval, "incident_id", None)) return { "ok": True, @@ -183,6 +205,7 @@ async def telegram_webhook( approval_id=approval_id, user_id=user_id, ) + await _log_user_action("reject", False, getattr(approval, "incident_id", None)) return { "ok": True, diff --git a/apps/api/src/api/v1/webhooks.py b/apps/api/src/api/v1/webhooks.py index 16c71efb..9b45072b 100644 --- a/apps/api/src/api/v1/webhooks.py +++ b/apps/api/src/api/v1/webhooks.py @@ -173,10 +173,14 @@ async def _try_auto_repair_background( 流程: 1. 重新載入 Incident - 2. evaluate_auto_repair() — 檢查 P2以下 + 高品質Playbook + 低風險 + 2. evaluate_auto_repair() — 只保留 P0/P1 嚴重度阻擋 (統帥指令: 直接全部自動修復) 3. 可修復 → execute_auto_repair() 執行 4. 不可修復 → 靜默,等人工批准 + 所有步驟都寫入 alert_operation_log """ + from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository + op_log = get_alert_operation_log_repository() + try: incident_service = get_incident_service() incident = await incident_service.get_from_working_memory(incident_id) @@ -197,8 +201,39 @@ async def _try_auto_repair_background( ) if not decision.can_auto_repair: + # 記錄評估被阻擋 + await op_log.append( + "AUTO_REPAIR_TRIGGERED", + incident_id=incident_id, + approval_id=approval_id, + actor="auto_repair", + action_detail=f"blocked:{decision.blocked_by}", + success=False, + error_message=decision.reason, + context={ + "blocked_by": decision.blocked_by, + "reason": decision.reason, + "playbook_id": decision.playbook.playbook_id if decision.playbook else None, + }, + ) return + # 記錄自動修復觸發 + await op_log.append( + "AUTO_REPAIR_TRIGGERED", + incident_id=incident_id, + approval_id=approval_id, + actor="auto_repair", + action_detail=decision.playbook.name if decision.playbook else "unknown", + success=True, + context={ + "playbook_id": decision.playbook.playbook_id, + "playbook_name": decision.playbook.name, + "similarity_score": decision.similarity_score, + "risk_level": decision.risk_level.value if decision.risk_level else None, + }, + ) + # 執行自動修復 logger.info( "auto_repair_executing", @@ -218,6 +253,26 @@ async def _try_auto_repair_background( success=result.success if result else False, ) + # 記錄執行結果 + if result: + await op_log.append( + "EXECUTION_COMPLETED", + incident_id=incident_id, + approval_id=approval_id, + actor="auto_repair", + action_detail=f"playbook:{result.playbook_id}", + success=result.success, + error_message=result.error, + context={ + "playbook_id": result.playbook_id, + "steps_count": len(result.executed_steps), + "execution_time_ms": result.execution_time_ms, + "alert_type": alert_type, + "target_resource": target_resource, + "namespace": namespace, + }, + ) + # 通知 Telegram 自動修復結果 if result: try: @@ -231,6 +286,16 @@ async def _try_auto_repair_background( f"耗時: {result.execution_time_ms}ms\n" f"步驟:\n{steps_summary}" ) + # 記錄 Telegram 推送 + await op_log.append( + "TELEGRAM_RESULT_SENT", + incident_id=incident_id, + approval_id=approval_id, + actor="system", + action_detail="auto_repair_result", + success=result.success, + context={"target_resource": target_resource, "namespace": namespace}, + ) except Exception as tg_err: logger.warning("auto_repair_telegram_notify_failed", error=str(tg_err)) @@ -334,6 +399,25 @@ async def _push_to_telegram_background( ai_cost=f"${ai_cost:.6f}", ) + # 2026-04-08 Claude Code: 記錄 Telegram 推送事件 + try: + from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository + await get_alert_operation_log_repository().append( + "TELEGRAM_SENT", + approval_id=approval_id, + actor="system", + action_detail="approval_card", + success=True, + context={ + "risk_level": risk_level, + "resource_name": resource_name, + "hit_count": hit_count, + "namespace": namespace, + }, + ) + except Exception as _log_e: + logger.warning("alert_op_log_telegram_sent_failed", error=str(_log_e)) + except TelegramGatewayError as e: logger.warning( "telegram_push_failed", diff --git a/apps/api/src/db/models.py b/apps/api/src/db/models.py index 31ade469..2b891865 100644 --- a/apps/api/src/db/models.py +++ b/apps/api/src/db/models.py @@ -396,6 +396,54 @@ class AutoRepairExecution(Base): ) +# ============================================================================= +# AlertOperationLog - Phase 11 告警操作溯源 (Event Sourcing) +# 2026-04-08 Claude Code: 統帥指令「所有操作都必須被記錄,寫入資料庫」 +# 不可變 — 只 INSERT,不 UPDATE/DELETE +# ============================================================================= + +class AlertOperationLog(Base): + """ + 告警操作完整溯源 + + Event Sourcing 模式:每個告警生命週期的每個事件都寫一筆。 + 不可變 (Immutable)。 + + event_type 值: + ALERT_RECEIVED / TELEGRAM_SENT / USER_ACTION / + AUTO_REPAIR_TRIGGERED / EXECUTION_STARTED / EXECUTION_COMPLETED / + TELEGRAM_RESULT_SENT / RESOLVED / SILENCED / ESCALATED + """ + __tablename__ = "alert_operation_log" + + id: Mapped[str] = mapped_column(String(36), primary_key=True, default=generate_uuid) + + # 關聯 (允許 NULL,不同事件有不同關聯) + incident_id: Mapped[str | None] = mapped_column(String(30), nullable=True, index=True) + approval_id: Mapped[str | None] = mapped_column(String(36), nullable=True, index=True) + audit_log_id: Mapped[str | None] = mapped_column(String(36), nullable=True) + auto_repair_id: Mapped[str | None] = mapped_column(String(36), nullable=True) + + # 事件核心 + event_type: Mapped[str] = mapped_column(String(50), nullable=False, index=True) + actor: Mapped[str | None] = mapped_column(String(100), nullable=True, index=True) + action_detail: Mapped[str | None] = mapped_column(String(200), nullable=True) + + # 執行結果 (NULL = 不適用) + success: Mapped[bool | None] = mapped_column(nullable=True) + error_message: Mapped[str | None] = mapped_column(Text, nullable=True) + + # 結構化上下文 + context: Mapped[dict] = mapped_column(JSON, default=dict, nullable=False) + + # 時間戳 (台北時區,不可變) + created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now) + + __table_args__ = ( + Index("ix_aol_created_at", "created_at"), + ) + + # ============================================================================= # IncidentRecord - Phase 6.2 Episodic Memory (PostgreSQL) # ============================================================================= diff --git a/apps/api/src/repositories/alert_operation_log_repository.py b/apps/api/src/repositories/alert_operation_log_repository.py new file mode 100644 index 00000000..c6a94bdf --- /dev/null +++ b/apps/api/src/repositories/alert_operation_log_repository.py @@ -0,0 +1,188 @@ +""" +Alert Operation Log Repository - Phase 11 +========================================== +告警操作完整溯源 (Event Sourcing) + +2026-04-08 Claude Code: 統帥指令「所有操作都必須被記錄,寫入資料庫」 + +設計: +- 不可變 (Immutable) — 只 INSERT,不 UPDATE/DELETE +- fire-and-forget 友善 — 所有寫入錯誤只記錄 log,不拋出 +- leWOOOgo 積木化: Router → Service → Repository → DB +""" + +from typing import Any + +import structlog +from sqlalchemy import func, select + +from src.db.base import get_db_context +from src.db.models import AlertOperationLog + +logger = structlog.get_logger(__name__) + +# 合法的 event_type 值 (對應 DB ENUM) +ALERT_EVENT_TYPES = { + "ALERT_RECEIVED", + "TELEGRAM_SENT", + "USER_ACTION", + "AUTO_REPAIR_TRIGGERED", + "EXECUTION_STARTED", + "EXECUTION_COMPLETED", + "TELEGRAM_RESULT_SENT", + "RESOLVED", + "SILENCED", + "ESCALATED", +} + + +class AlertOperationLogRepository: + """ + 告警操作日誌 Repository + + 所有 write 方法設計為 fire-and-forget 友善: + 失敗時只記錄 warning log,不中斷主流程。 + """ + + async def append( + self, + event_type: str, + *, + incident_id: str | None = None, + approval_id: str | None = None, + audit_log_id: str | None = None, + auto_repair_id: str | None = None, + actor: str | None = None, + action_detail: str | None = None, + success: bool | None = None, + error_message: str | None = None, + context: dict[str, Any] | None = None, + ) -> AlertOperationLog | None: + """ + 寫入一筆操作事件 + + Args: + event_type: 事件類型 (見 ALERT_EVENT_TYPES) + incident_id: 關聯 incident + approval_id: 關聯 approval_record + audit_log_id: 關聯 audit_log + auto_repair_id: 關聯 auto_repair_execution + actor: 觸發者 (alertmanager / telegram:user_id / auto_repair / system) + action_detail: 具體動作描述 + success: 結果 (None=不適用) + error_message: 錯誤訊息 + context: 額外結構化資訊 + + Returns: + 寫入的記錄,失敗時返回 None + """ + if event_type not in ALERT_EVENT_TYPES: + logger.warning( + "alert_op_log_invalid_event_type", + event_type=event_type, + valid_types=list(ALERT_EVENT_TYPES), + ) + return None + + try: + async with get_db_context() as db: + record = AlertOperationLog( + incident_id=incident_id, + approval_id=approval_id, + audit_log_id=audit_log_id, + auto_repair_id=auto_repair_id, + event_type=event_type, + actor=actor, + action_detail=action_detail, + success=success, + error_message=error_message, + context=context or {}, + ) + db.add(record) + await db.flush() + await db.refresh(record) + logger.debug( + "alert_op_log_appended", + event_type=event_type, + incident_id=incident_id, + approval_id=approval_id, + ) + return record + except Exception as e: + logger.error( + "alert_op_log_write_failed", + event_type=event_type, + incident_id=incident_id, + error=str(e), + ) + return None + + async def list_by_incident( + self, + incident_id: str, + limit: int = 100, + ) -> list[AlertOperationLog]: + """查詢某 incident 的完整操作時間軸""" + async with get_db_context() as db: + result = await db.execute( + select(AlertOperationLog) + .where(AlertOperationLog.incident_id == incident_id) + .order_by(AlertOperationLog.created_at.asc()) + .limit(limit) + ) + return list(result.scalars().all()) + + async def list_by_approval( + self, + approval_id: str, + ) -> list[AlertOperationLog]: + """查詢某 approval 的操作記錄""" + async with get_db_context() as db: + result = await db.execute( + select(AlertOperationLog) + .where(AlertOperationLog.approval_id == approval_id) + .order_by(AlertOperationLog.created_at.asc()) + ) + return list(result.scalars().all()) + + async def get_stats(self, since_hours: int = 24) -> dict[str, Any]: + """統計最近 N 小時的事件分佈""" + from datetime import timedelta + from src.utils.timezone import now_taipei + + since = now_taipei() - timedelta(hours=since_hours) + async with get_db_context() as db: + # 按事件類型統計 + type_result = await db.execute( + select( + AlertOperationLog.event_type, + func.count(AlertOperationLog.id), + ) + .where(AlertOperationLog.created_at >= since) + .group_by(AlertOperationLog.event_type) + ) + by_type = {str(row[0]): row[1] for row in type_result.all()} + + # 總計 + total = sum(by_type.values()) + + return { + "total": total, + "since_hours": since_hours, + "by_event_type": by_type, + } + + +# ============================================================================= +# Singleton +# ============================================================================= + +_alert_op_log_repo: AlertOperationLogRepository | None = None + + +def get_alert_operation_log_repository() -> AlertOperationLogRepository: + """取得 AlertOperationLogRepository 實例 (Singleton)""" + global _alert_op_log_repo + if _alert_op_log_repo is None: + _alert_op_log_repo = AlertOperationLogRepository() + return _alert_op_log_repo