feat(audit): Phase 11 告警操作完整溯源 — alert_operation_log + 歷史回填
Some checks failed
CD Pipeline / build-and-deploy (push) Failing after 1m29s

統帥指令「所有告警訊息通通寫入資料庫,並記錄相關操作」

變更:
- phase11_alert_operation_log.sql: 新表 (Event Sourcing,不可變)
- phase11b_backfill_alert_operation_log.sql: 歷史回填 654 筆
  - 14 筆 ALERT_RECEIVED (incidents)
  - 265 筆 TELEGRAM_SENT (approval_records)
  - 265 筆 USER_ACTION (approval_records)
  - 110 筆 EXECUTION_COMPLETED (audit_logs)
- db/models.py: AlertOperationLog SQLAlchemy model
- repositories/alert_operation_log_repository.py: append/list_by_incident/get_stats
- webhooks.py: _try_auto_repair_background 寫入 AUTO_REPAIR_TRIGGERED + EXECUTION_COMPLETED + TELEGRAM_RESULT_SENT
- webhooks.py: _push_to_telegram_background 寫入 TELEGRAM_SENT
- telegram.py: handle_callback 寫入 USER_ACTION (approve/reject)

已執行 migration: awoooi_prod@192.168.0.188 

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-08 11:22:03 +08:00
parent eee6f06215
commit f20121ad41
6 changed files with 568 additions and 1 deletions

View File

@@ -0,0 +1,72 @@
-- Phase 11: Alert Operation Log — 告警操作完整溯源表
-- 建立時間: 2026-04-08 (台北時區)
-- 建立者: Claude Code — 統帥指令「所有操作都必須被記錄,寫入資料庫」
--
-- 設計理念: Event Sourcing
-- 每個告警的生命週期,每個事件都寫一筆
-- 不可變 (Immutable) — 只 INSERT不 UPDATE/DELETE
--
-- 事件類型 (event_type):
-- ALERT_RECEIVED — Alertmanager/外部告警進來
-- TELEGRAM_SENT — 推送 Telegram 審核卡片
-- USER_ACTION — 使用者在 Telegram 按按鈕 (approve/reject/silence)
-- AUTO_REPAIR_TRIGGERED — 自動修復評估通過,準備執行
-- EXECUTION_STARTED — 開始執行 K8s/SSH 指令
-- EXECUTION_COMPLETED — 執行完成 (success/failure)
-- TELEGRAM_RESULT_SENT — 自動修復結果推送到 Telegram
-- RESOLVED — 告警解除
-- SILENCED — 靜默中
-- ESCALATED — 升級 (P3→P2 等)
-- Enum of lifecycle events recorded in alert_operation_log (one label per
-- event_type documented in the header above).
--
-- PostgreSQL has no CREATE TYPE ... IF NOT EXISTS, so the statement is wrapped
-- in a DO block that traps duplicate_object. This keeps the migration
-- re-runnable, matching the IF NOT EXISTS statements that follow.
--
-- NOTE(review): the SQLAlchemy model maps event_type as String(50); confirm
-- that the database driver in use casts text to this enum on INSERT.
DO $$
BEGIN
    CREATE TYPE alert_event_type AS ENUM (
        'ALERT_RECEIVED',
        'TELEGRAM_SENT',
        'USER_ACTION',
        'AUTO_REPAIR_TRIGGERED',
        'EXECUTION_STARTED',
        'EXECUTION_COMPLETED',
        'TELEGRAM_RESULT_SENT',
        'RESOLVED',
        'SILENCED',
        'ESCALATED'
    );
EXCEPTION
    WHEN duplicate_object THEN
        NULL;  -- the type already exists from an earlier run; nothing to do
END
$$;
-- Append-only event table: one row per event in an alert's lifecycle.
-- Rows are meant to be inserted only, never updated or deleted (this is a
-- convention of the writers; nothing in this DDL enforces it).
CREATE TABLE IF NOT EXISTS alert_operation_log (
    -- Primary key: a random UUID stored in its 36-character text form.
    id              VARCHAR(36)       PRIMARY KEY DEFAULT CAST(gen_random_uuid() AS text),

    -- Links to related records. All are nullable and carry no FOREIGN KEY,
    -- so each event type only fills in the links that apply to it.
    incident_id     VARCHAR(30),      -- incidents.incident_id
    approval_id     VARCHAR(36),      -- approval_records.id
    audit_log_id    VARCHAR(36),      -- audit_logs.id
    auto_repair_id  VARCHAR(36),      -- auto_repair_executions.id

    -- Event core.
    event_type      alert_event_type  NOT NULL,
    actor           VARCHAR(100),     -- who triggered it: 'alertmanager' / 'telegram:user_id' / 'auto_repair' / 'system'
    action_detail   VARCHAR(200),     -- what was done: 'approve' / 'reject' / 'silence' / kubectl command summary

    -- Outcome.
    success         BOOLEAN,          -- NULL = not applicable (e.g. ALERT_RECEIVED); TRUE/FALSE = an outcome exists
    error_message   TEXT,

    -- Structured context. Examples:
    --   ALERT_RECEIVED: {"alert_name": "KubePodCrashLooping", "severity": "P2", "namespace": "awoooi-prod"}
    --   USER_ACTION:    {"button": "approve", "telegram_user_id": "12345", "message_id": "67890"}
    --   EXECUTION:      {"playbook": "restart-deployment", "steps": 3, "duration_ms": 2340}
    context         JSONB             NOT NULL DEFAULT '{}'::jsonb,

    -- Event time. TIMESTAMPTZ stores an absolute instant; the displayed zone
    -- (Taipei, per the original note) depends on the session time zone.
    created_at      TIMESTAMPTZ       NOT NULL DEFAULT CURRENT_TIMESTAMP
);
-- Indexes for the expected query patterns: by incident, by approval,
-- by event type, by time (newest first) and by actor.
CREATE INDEX IF NOT EXISTS ix_aol_incident_id
    ON alert_operation_log USING btree (incident_id);

CREATE INDEX IF NOT EXISTS ix_aol_approval_id
    ON alert_operation_log USING btree (approval_id);

CREATE INDEX IF NOT EXISTS ix_aol_event_type
    ON alert_operation_log USING btree (event_type);

CREATE INDEX IF NOT EXISTS ix_aol_created_at
    ON alert_operation_log USING btree (created_at DESC);

CREATE INDEX IF NOT EXISTS ix_aol_actor
    ON alert_operation_log USING btree (actor);

-- Table-level description stored in the catalog.
COMMENT ON TABLE alert_operation_log
    IS '告警操作完整溯源 — Event Sourcing不可變每個告警生命週期的每個事件一筆記錄';

View File

@@ -0,0 +1,152 @@
-- Phase 11b: 歷史數據回填 alert_operation_log
-- 建立時間: 2026-04-08 (台北時區)
-- 建立者: Claude Code — 統帥指令「把之前所有的告警訊息,通通寫入資料庫」
--
-- 資料來源:
-- incidents (14筆) → ALERT_RECEIVED 事件
-- approval_records (265筆) → TELEGRAM_SENT + USER_ACTION 事件
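-- audit_logs (110筆) → EXECUTION_COMPLETED 事件 (本腳本不寫入 EXECUTION_STARTED)
--
-- 注意: id 由 gen_random_uuid() 產生,單靠 ON CONFLICT DO NOTHING 無法偵測重複執行;
--       要避免重複寫入,必須以來源記錄 (incident_id / approval_id / audit_log_id) 判斷是否已回填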
-- ============================================================
-- Step 1: incidents → ALERT_RECEIVED
-- ============================================================
-- One ALERT_RECEIVED event per incident, taken from the first signal
-- (signals->0) of the incident.
--
-- Re-run safety: every row gets a fresh random id, so a primary-key conflict
-- can never detect a second run. The NOT EXISTS guard skips incidents that
-- already have an ALERT_RECEIVED event instead.
--
-- success is NULL because an incoming alert has no execution outcome
-- (the table definition documents NULL as "not applicable" for ALERT_RECEIVED).
INSERT INTO alert_operation_log (
    id, incident_id, event_type, actor, action_detail, success, context, created_at
)
SELECT
    gen_random_uuid()::text,
    i.incident_id,
    'ALERT_RECEIVED',
    COALESCE(i.source, 'alertmanager'),
    COALESCE(i.signals->0->>'alert_name', 'unknown'),
    CAST(NULL AS boolean),
    jsonb_build_object(
        'severity', i.severity::text,
        'status', i.status::text,
        'alert_name', COALESCE(i.signals->0->>'alert_name', 'unknown'),
        'namespace', COALESCE(i.signals->0->'labels'->>'namespace', 'default'),
        'resource', COALESCE(i.signals->0->'labels'->>'resource', ''),
        'message', COALESCE(i.signals->0->'annotations'->>'message', ''),
        'source', COALESCE(i.source, 'alertmanager'),
        'signal_count', json_array_length(i.signals),
        'backfill', TRUE,          -- marks the row as produced by this script
        'backfill_at', NOW()::text
    ),
    i.created_at                   -- keep the original event time, not the backfill time
FROM incidents AS i
WHERE NOT EXISTS (
    SELECT 1
    FROM alert_operation_log AS aol
    WHERE aol.incident_id = i.incident_id::text
      AND aol.event_type = 'ALERT_RECEIVED'
);
-- ============================================================
-- Step 2: approval_records → TELEGRAM_SENT
-- ============================================================
-- Each approval record is treated as one approval card pushed to Telegram.
--
-- Re-run safety: ids are random, so a primary-key conflict can never detect a
-- second run. The NOT EXISTS guard skips approvals that already have a
-- TELEGRAM_SENT event (backfilled earlier or written by the live code path).
INSERT INTO alert_operation_log (
    id, incident_id, approval_id, event_type, actor, action_detail, success, context, created_at
)
SELECT
    gen_random_uuid()::text,
    ar.incident_id,
    ar.id,
    'TELEGRAM_SENT',
    'system',
    'approval_card_sent',
    TRUE,
    jsonb_build_object(
        'action', ar.action,
        'risk_level', ar.risk_level::text,
        'requested_by', ar.requested_by,
        'hit_count', ar.hit_count,
        'backfill', TRUE,          -- marks the row as produced by this script
        'backfill_at', NOW()::text
    ),
    ar.created_at                  -- keep the original event time, not the backfill time
FROM approval_records AS ar
WHERE NOT EXISTS (
    SELECT 1
    FROM alert_operation_log AS aol
    WHERE aol.approval_id = ar.id::text
      AND aol.event_type = 'TELEGRAM_SENT'
);
-- ============================================================
-- Step 3: approval_records (decided) → USER_ACTION
-- ============================================================
-- One USER_ACTION event per approval whose status shows that a decision was
-- made. EXECUTION_SUCCESS / EXECUTION_FAILED can only follow an approval, so
-- they are recorded as 'approve'.
--
-- success describes the decision, not the execution: TRUE for an approval
-- (even when the later execution failed), FALSE for a rejection. The WHERE
-- filter admits exactly four statuses, so "not REJECTED" means "approved".
--
-- Re-run safety: ids are random, so a primary-key conflict can never detect a
-- second run. The NOT EXISTS guard skips approvals that already have a
-- USER_ACTION event.
--
-- NOTE(review): actor is taken from requested_by, which by its name is the
-- requester rather than the person who pressed the button — confirm. The raw
-- signatures are kept in context for that reason.
INSERT INTO alert_operation_log (
    id, incident_id, approval_id, event_type, actor, action_detail, success, context, created_at
)
SELECT
    gen_random_uuid()::text,
    ar.incident_id,
    ar.id,
    'USER_ACTION',
    COALESCE(ar.requested_by, 'unknown'),
    CASE
        WHEN ar.status::text = 'REJECTED' THEN 'reject'
        ELSE 'approve'
    END,
    (ar.status::text <> 'REJECTED'),
    jsonb_build_object(
        'status', ar.status::text,
        'risk_level', ar.risk_level::text,
        'rejection_reason', COALESCE(ar.rejection_reason, ''),
        'signatures', ar.signatures,
        'resolved_at', COALESCE(ar.resolved_at::text, ''),
        'backfill', TRUE,          -- marks the row as produced by this script
        'backfill_at', NOW()::text
    ),
    -- Best available decision time: resolution, else last update, else creation.
    COALESCE(ar.resolved_at, ar.updated_at, ar.created_at)
FROM approval_records AS ar
WHERE ar.status::text IN ('APPROVED', 'REJECTED', 'EXECUTION_SUCCESS', 'EXECUTION_FAILED')
  AND NOT EXISTS (
      SELECT 1
      FROM alert_operation_log AS aol
      WHERE aol.approval_id = ar.id::text
        AND aol.event_type = 'USER_ACTION'
  );
-- ============================================================
-- Step 4: audit_logs → EXECUTION_COMPLETED
-- ============================================================
-- One EXECUTION_COMPLETED event per audit log row.
--
-- incident_id is resolved through the approval the audit log points at, so
-- that these events show up in per-incident timelines. The join is a LEFT
-- JOIN: audit logs without a matching approval are still backfilled, with a
-- NULL incident_id.
-- NOTE(review): assumes approval_records.id is unique (it is used as the
-- approval key elsewhere in this script) — confirm, otherwise rows multiply.
--
-- action_detail is clipped to 200 characters, the width of the target column,
-- so one long resource name cannot abort the whole statement.
--
-- Re-run safety: ids are random, so a primary-key conflict can never detect a
-- second run. The NOT EXISTS guard skips audit logs that were already copied.
INSERT INTO alert_operation_log (
    id, incident_id, approval_id, audit_log_id, event_type, actor, action_detail,
    success, error_message, context, created_at
)
SELECT
    gen_random_uuid()::text,
    ar.incident_id,
    al.approval_id,
    al.id,
    'EXECUTION_COMPLETED',
    COALESCE(al.executed_by, 'system'),
    LEFT(COALESCE(al.operation_type, 'unknown') || '/' || COALESCE(al.target_resource, ''), 200),
    al.success,
    al.error_message,
    jsonb_build_object(
        'operation_type', al.operation_type,
        'target_resource', al.target_resource,
        'namespace', al.namespace,
        'execution_duration_ms', al.execution_duration_ms,
        'dry_run_passed', al.dry_run_passed,
        'authorization_channel', COALESCE(al.authorization_channel, ''),
        'retry_count', al.retry_count,
        'failure_classification', COALESCE(al.failure_classification, ''),
        'auto_repair_attempted', al.auto_repair_attempted,
        'backfill', TRUE,          -- marks the row as produced by this script
        'backfill_at', NOW()::text
    ),
    al.created_at                  -- keep the original event time, not the backfill time
FROM audit_logs AS al
LEFT JOIN approval_records AS ar
    ON ar.id::text = al.approval_id::text
WHERE NOT EXISTS (
    SELECT 1
    FROM alert_operation_log AS aol
    WHERE aol.audit_log_id = al.id::text
      AND aol.event_type = 'EXECUTION_COMPLETED'
);
-- ============================================================
-- Verification
-- ============================================================
-- Row count and time range per event type, for comparison with the expected
-- backfill totals. Sorted by the event type's text label.
SELECT
    CAST(aol.event_type AS text) AS event_type,
    COUNT(*)                     AS count,
    MIN(aol.created_at)          AS oldest,
    MAX(aol.created_at)          AS newest
FROM alert_operation_log AS aol
GROUP BY aol.event_type
ORDER BY event_type;

View File

@@ -142,6 +142,27 @@ async def telegram_webhook(
service = get_approval_service()
# 2026-04-08 Claude Code: USER_ACTION 記錄
async def _log_user_action(action_name: str, success: bool, incident_id: str | None = None) -> None:
    """
    Record a USER_ACTION event for the button the Telegram user pressed.

    Best-effort: any exception (including a failed import of the repository)
    is logged as a warning and swallowed, so the webhook flow is not interrupted.

    Args:
        action_name: The action taken; stored as action_detail and in the context.
        success: Outcome flag stored on the event.
        incident_id: Related incident id, if known.

    Reads approval_id, user_id, username and message_id from the enclosing scope.
    """
    try:
        from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository

        op_log_repo = get_alert_operation_log_repository()
        event_context = {
            "username": username,
            "user_id": user_id,
            "message_id": message_id,
            "action": action_name,
        }
        await op_log_repo.append(
            "USER_ACTION",
            incident_id=incident_id,
            approval_id=approval_id,
            actor=f"telegram:{user_id}",
            action_detail=action_name,
            success=success,
            context=event_context,
        )
    except Exception as _e:
        logger.warning("alert_op_log_user_action_failed", error=str(_e))
# 2026-03-29 ogt: 修復方法呼叫 - add_signature/reject 不存在
# 正確方法: sign_approval / reject_approval
if action == "approve":
@@ -160,6 +181,7 @@ async def telegram_webhook(
status=approval.status.value,
execution_triggered=execution_triggered,
)
await _log_user_action("approve", True, getattr(approval, "incident_id", None))
return {
"ok": True,
@@ -183,6 +205,7 @@ async def telegram_webhook(
approval_id=approval_id,
user_id=user_id,
)
await _log_user_action("reject", False, getattr(approval, "incident_id", None))
return {
"ok": True,

View File

@@ -173,10 +173,14 @@ async def _try_auto_repair_background(
流程:
1. 重新載入 Incident
2. evaluate_auto_repair() — 檢查 P2以下 + 高品質Playbook + 低風險
2. evaluate_auto_repair() — 只保留 P0/P1 嚴重度阻擋 (統帥指令: 直接全部自動修復)
3. 可修復 → execute_auto_repair() 執行
4. 不可修復 → 靜默,等人工批准
所有步驟都寫入 alert_operation_log
"""
from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository
op_log = get_alert_operation_log_repository()
try:
incident_service = get_incident_service()
incident = await incident_service.get_from_working_memory(incident_id)
@@ -197,8 +201,39 @@ async def _try_auto_repair_background(
)
if not decision.can_auto_repair:
# 記錄評估被阻擋
await op_log.append(
"AUTO_REPAIR_TRIGGERED",
incident_id=incident_id,
approval_id=approval_id,
actor="auto_repair",
action_detail=f"blocked:{decision.blocked_by}",
success=False,
error_message=decision.reason,
context={
"blocked_by": decision.blocked_by,
"reason": decision.reason,
"playbook_id": decision.playbook.playbook_id if decision.playbook else None,
},
)
return
# 記錄自動修復觸發
await op_log.append(
"AUTO_REPAIR_TRIGGERED",
incident_id=incident_id,
approval_id=approval_id,
actor="auto_repair",
action_detail=decision.playbook.name if decision.playbook else "unknown",
success=True,
context={
"playbook_id": decision.playbook.playbook_id,
"playbook_name": decision.playbook.name,
"similarity_score": decision.similarity_score,
"risk_level": decision.risk_level.value if decision.risk_level else None,
},
)
# 執行自動修復
logger.info(
"auto_repair_executing",
@@ -218,6 +253,26 @@ async def _try_auto_repair_background(
success=result.success if result else False,
)
# 記錄執行結果
if result:
await op_log.append(
"EXECUTION_COMPLETED",
incident_id=incident_id,
approval_id=approval_id,
actor="auto_repair",
action_detail=f"playbook:{result.playbook_id}",
success=result.success,
error_message=result.error,
context={
"playbook_id": result.playbook_id,
"steps_count": len(result.executed_steps),
"execution_time_ms": result.execution_time_ms,
"alert_type": alert_type,
"target_resource": target_resource,
"namespace": namespace,
},
)
# 通知 Telegram 自動修復結果
if result:
try:
@@ -231,6 +286,16 @@ async def _try_auto_repair_background(
f"耗時: {result.execution_time_ms}ms\n"
f"步驟:\n{steps_summary}"
)
# 記錄 Telegram 推送
await op_log.append(
"TELEGRAM_RESULT_SENT",
incident_id=incident_id,
approval_id=approval_id,
actor="system",
action_detail="auto_repair_result",
success=result.success,
context={"target_resource": target_resource, "namespace": namespace},
)
except Exception as tg_err:
logger.warning("auto_repair_telegram_notify_failed", error=str(tg_err))
@@ -334,6 +399,25 @@ async def _push_to_telegram_background(
ai_cost=f"${ai_cost:.6f}",
)
# 2026-04-08 Claude Code: 記錄 Telegram 推送事件
try:
from src.repositories.alert_operation_log_repository import get_alert_operation_log_repository
await get_alert_operation_log_repository().append(
"TELEGRAM_SENT",
approval_id=approval_id,
actor="system",
action_detail="approval_card",
success=True,
context={
"risk_level": risk_level,
"resource_name": resource_name,
"hit_count": hit_count,
"namespace": namespace,
},
)
except Exception as _log_e:
logger.warning("alert_op_log_telegram_sent_failed", error=str(_log_e))
except TelegramGatewayError as e:
logger.warning(
"telegram_push_failed",

View File

@@ -396,6 +396,54 @@ class AutoRepairExecution(Base):
)
# =============================================================================
# AlertOperationLog - Phase 11 告警操作溯源 (Event Sourcing)
# 2026-04-08 Claude Code: 統帥指令「所有操作都必須被記錄,寫入資料庫」
# 不可變 — 只 INSERT不 UPDATE/DELETE
# =============================================================================
class AlertOperationLog(Base):
    """
    Full provenance log of alert operations.

    Event-sourcing style: one row is written for every event in an alert's
    lifecycle. Rows are intended to be immutable (insert only).

    event_type values:
        ALERT_RECEIVED / TELEGRAM_SENT / USER_ACTION /
        AUTO_REPAIR_TRIGGERED / EXECUTION_STARTED / EXECUTION_COMPLETED /
        TELEGRAM_RESULT_SENT / RESOLVED / SILENCED / ESCALATED
    """

    __tablename__ = "alert_operation_log"

    # Primary key; generate_uuid supplies the default value on the Python side.
    id: Mapped[str] = mapped_column(String(36), primary_key=True, default=generate_uuid)

    # Links to related records (all nullable: different events carry different links).
    incident_id: Mapped[str | None] = mapped_column(String(30), nullable=True, index=True)
    approval_id: Mapped[str | None] = mapped_column(String(36), nullable=True, index=True)
    audit_log_id: Mapped[str | None] = mapped_column(String(36), nullable=True)
    auto_repair_id: Mapped[str | None] = mapped_column(String(36), nullable=True)

    # Event core.
    # NOTE(review): mapped as String(50) here, while the Phase 11 migration
    # declares this column as the alert_event_type ENUM — confirm the driver
    # accepts a text parameter for the enum column.
    event_type: Mapped[str] = mapped_column(String(50), nullable=False, index=True)
    actor: Mapped[str | None] = mapped_column(String(100), nullable=True, index=True)
    action_detail: Mapped[str | None] = mapped_column(String(200), nullable=True)

    # Outcome (NULL = not applicable).
    success: Mapped[bool | None] = mapped_column(nullable=True)
    error_message: Mapped[str | None] = mapped_column(Text, nullable=True)

    # Structured context.
    # NOTE(review): mapped as JSON here, while the migration creates a JSONB column.
    context: Mapped[dict] = mapped_column(JSON, default=dict, nullable=False)

    # Event timestamp, timezone-aware; the default comes from taipei_now
    # (presumably the current time in the Taipei zone — confirm).
    created_at: Mapped[datetime] = mapped_column(DateTime(timezone=True), default=taipei_now)

    # Index on created_at under the same name the migration uses
    # (the migration builds it in DESC order).
    __table_args__ = (
        Index("ix_aol_created_at", "created_at"),
    )
# =============================================================================
# IncidentRecord - Phase 6.2 Episodic Memory (PostgreSQL)
# =============================================================================

View File

@@ -0,0 +1,188 @@
"""
Alert Operation Log Repository - Phase 11
==========================================
告警操作完整溯源 (Event Sourcing)
2026-04-08 Claude Code: 統帥指令「所有操作都必須被記錄,寫入資料庫」
設計:
- 不可變 (Immutable) — 只 INSERT不 UPDATE/DELETE
- fire-and-forget 友善 — 所有寫入錯誤只記錄 log不拋出
- leWOOOgo 積木化: Router → Service → Repository → DB
"""
from typing import Any
import structlog
from sqlalchemy import func, select
from src.db.base import get_db_context
from src.db.models import AlertOperationLog
logger = structlog.get_logger(__name__)
# Valid event_type values; these mirror the labels of the database enum
# alert_event_type. append() rejects any value that is not in this set.
ALERT_EVENT_TYPES = {
    "ALERT_RECEIVED",
    "TELEGRAM_SENT",
    "USER_ACTION",
    "AUTO_REPAIR_TRIGGERED",
    "EXECUTION_STARTED",
    "EXECUTION_COMPLETED",
    "TELEGRAM_RESULT_SENT",
    "RESOLVED",
    "SILENCED",
    "ESCALATED",
}
class AlertOperationLogRepository:
    """
    Repository for the append-only alert operation log.

    append() is fire-and-forget friendly: a failed write is logged and reported
    through a ``None`` return value instead of an exception, so the caller's
    main flow is never interrupted. The read methods do not catch database
    errors; those propagate to the caller.
    """

    # Widths of the bounded text columns (String(100) / String(200) on the
    # model). Longer values are clipped before the insert: an over-long value
    # would otherwise make the write fail and the audit event would be lost.
    _ACTOR_MAX_LEN = 100
    _ACTION_DETAIL_MAX_LEN = 200

    @staticmethod
    def _clip(value: str | None, max_len: int) -> str | None:
        """Return ``value`` cut to at most ``max_len`` characters (``None`` passes through)."""
        if value is None or len(value) <= max_len:
            return value
        return value[:max_len]

    async def append(
        self,
        event_type: str,
        *,
        incident_id: str | None = None,
        approval_id: str | None = None,
        audit_log_id: str | None = None,
        auto_repair_id: str | None = None,
        actor: str | None = None,
        action_detail: str | None = None,
        success: bool | None = None,
        error_message: str | None = None,
        context: dict[str, Any] | None = None,
    ) -> AlertOperationLog | None:
        """
        Write one operation event.

        Args:
            event_type: Event type; must be one of ALERT_EVENT_TYPES.
            incident_id: Related incident.
            approval_id: Related approval record.
            audit_log_id: Related audit log.
            auto_repair_id: Related auto-repair execution.
            actor: Who triggered the event
                (alertmanager / telegram:user_id / auto_repair / system).
                Clipped to 100 characters.
            action_detail: Description of the concrete action.
                Clipped to 200 characters.
            success: Outcome (None = not applicable).
            error_message: Error message, if any.
            context: Extra structured information; stored as ``{}`` when omitted.

        Returns:
            The written record, or None when the event type is invalid or the
            write failed. This method never raises.
        """
        if event_type not in ALERT_EVENT_TYPES:
            logger.warning(
                "alert_op_log_invalid_event_type",
                event_type=event_type,
                # Sorted so the log line is stable across runs (set order is not).
                valid_types=sorted(ALERT_EVENT_TYPES),
            )
            return None

        try:
            async with get_db_context() as db:
                record = AlertOperationLog(
                    incident_id=incident_id,
                    approval_id=approval_id,
                    audit_log_id=audit_log_id,
                    auto_repair_id=auto_repair_id,
                    event_type=event_type,
                    actor=self._clip(actor, self._ACTOR_MAX_LEN),
                    action_detail=self._clip(action_detail, self._ACTION_DETAIL_MAX_LEN),
                    success=success,
                    error_message=error_message,
                    context=context or {},
                )
                db.add(record)
                # Flush sends the INSERT; refresh loads the stored row back.
                # NOTE(review): the commit is presumably done by get_db_context
                # on exit — confirm.
                await db.flush()
                await db.refresh(record)
                logger.debug(
                    "alert_op_log_appended",
                    event_type=event_type,
                    incident_id=incident_id,
                    approval_id=approval_id,
                )
                return record
        except Exception as e:
            # Deliberate best-effort: audit logging must not break the caller.
            logger.error(
                "alert_op_log_write_failed",
                event_type=event_type,
                incident_id=incident_id,
                approval_id=approval_id,
                error=str(e),
            )
            return None

    async def list_by_incident(
        self,
        incident_id: str,
        limit: int = 100,
    ) -> list[AlertOperationLog]:
        """
        Return the operation timeline of one incident, oldest event first.

        Args:
            incident_id: Incident whose events are requested.
            limit: Maximum number of rows returned.
        """
        async with get_db_context() as db:
            result = await db.execute(
                select(AlertOperationLog)
                .where(AlertOperationLog.incident_id == incident_id)
                .order_by(AlertOperationLog.created_at.asc())
                .limit(limit)
            )
            return list(result.scalars().all())

    async def list_by_approval(
        self,
        approval_id: str,
    ) -> list[AlertOperationLog]:
        """
        Return all operation events of one approval, oldest event first.

        Args:
            approval_id: Approval whose events are requested.
        """
        async with get_db_context() as db:
            result = await db.execute(
                select(AlertOperationLog)
                .where(AlertOperationLog.approval_id == approval_id)
                .order_by(AlertOperationLog.created_at.asc())
            )
            return list(result.scalars().all())

    async def get_stats(self, since_hours: int = 24) -> dict[str, Any]:
        """
        Count events per event type over the last ``since_hours`` hours.

        Args:
            since_hours: Size of the look-back window in hours.

        Returns:
            A dict with ``total`` (sum over all types), ``since_hours`` (echo of
            the argument) and ``by_event_type`` (event type -> count).
        """
        from datetime import timedelta

        from src.utils.timezone import now_taipei

        since = now_taipei() - timedelta(hours=since_hours)
        async with get_db_context() as db:
            # Count per event type inside the window.
            type_result = await db.execute(
                select(
                    AlertOperationLog.event_type,
                    func.count(AlertOperationLog.id),
                )
                .where(AlertOperationLog.created_at >= since)
                .group_by(AlertOperationLog.event_type)
            )
            by_type = {str(row[0]): row[1] for row in type_result.all()}
            # Grand total.
            total = sum(by_type.values())
            return {
                "total": total,
                "since_hours": since_hours,
                "by_event_type": by_type,
            }
# =============================================================================
# Singleton
# =============================================================================
# Module-level cache for the single repository instance; created on first use.
_alert_op_log_repo: AlertOperationLogRepository | None = None


def get_alert_operation_log_repository() -> AlertOperationLogRepository:
    """Return the shared AlertOperationLogRepository, creating it on the first call."""
    global _alert_op_log_repo
    repo = _alert_op_log_repo
    if repo is None:
        repo = AlertOperationLogRepository()
        _alert_op_log_repo = repo
    return repo