fix(sweeper): 修正 decision key 格式 BUG (decision:INC-* → sweeper_done:INC-*)
Some checks failed
CD Pipeline / build-and-deploy (push) Has been cancelled

根本原因:
  decision token 實際 key 格式為 decision:DEC-{HEX12}
  sweeper 錯誤地查詢 decision:{incident_id} (永遠 = 0)
  → 每 90s 將 186 個 incident 全部列為「未分析」
  → 觸發大量重複 AI 分析請求 (雖 get_or_create_decision 有去重保護)

修正方式:
  改用 sweeper_done:{incident_id} 輕量標記 (TTL 1h)
  分析完成後才設標記,確保失敗的 incident 下輪仍會重試
  get_or_create_decision 內部已有 COMPLETED/READY 去重,雙重保護

2026-04-16 Claude Sonnet 4.6 Asia/Taipei

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-16 01:11:54 +08:00
parent bb7441ec8a
commit 20b3fefca7

View File

@@ -14,7 +14,12 @@ Incident Analysis Sweeper — 自動觸發 INVESTIGATING 事件 AI 分析
Semaphore(3) — 避免並發壓垮 OPENCLAW_NEMO/Ollama
每批最多 5 個 incident避免啟動雪崩
2026-04-16 Claude Sonnet 4.6 Asia/Taipei
Key 格式說明:
decision token 儲存為 decision:DEC-{HEX},內部 incident_id 欄位對應 INC-*。
使用 sweeper_done:{incident_id} 輕量標記避免重複掃描。
get_or_create_decision() 本身已有 COMPLETED/READY 去重,雙重保護。
2026-04-16 Claude Sonnet 4.6 Asia/Taipei — 修正 key 格式 BUG
"""
from __future__ import annotations
@@ -29,6 +34,8 @@ logger = structlog.get_logger(__name__)
_SWEEP_INTERVAL_SEC = 90 # 每 90 秒掃一次
_MAX_BATCH = 5 # 每批最多 5 個
_SEMAPHORE_LIMIT = 3 # 最多 3 個並發 AI 分析
_DONE_MARKER_PREFIX = "sweeper_done:" # 輕量標記:已觸發過分析
_DONE_MARKER_TTL = 3600 # 1 小時 TTL後續由 get_or_create 去重
async def run_incident_analysis_sweeper() -> None:
@@ -52,6 +59,9 @@ async def _sweep_once(sem: asyncio.Semaphore) -> None:
"""
執行一次掃描:找出沒有 decision token 的 INVESTIGATING incidents
在背景觸發 AI 分析。
Decision token key 格式: decision:DEC-{HEX12} (非 decision:INC-*)
使用 sweeper_done:{incident_id} 輕量標記避免重複觸發。
"""
from src.services.decision_manager import get_decision_manager
from src.services.incident_service import get_incident_service
@@ -71,11 +81,11 @@ async def _sweep_once(sem: asyncio.Semaphore) -> None:
if not incidents:
return
# 找出沒有 decision token 的
# 找出尚未觸發過分析的 (用輕量標記,不掃描 decision:DEC-* 全集)
unanalyzed = []
for incident in incidents:
token_key = f"decision:{incident.incident_id}"
if not await redis.exists(token_key):
done_key = f"{_DONE_MARKER_PREFIX}{incident.incident_id}"
if not await redis.exists(done_key):
unanalyzed.append(incident)
if not unanalyzed:
@@ -94,6 +104,9 @@ async def _sweep_once(sem: asyncio.Semaphore) -> None:
try:
timeout = 120.0 if incident.severity in (Severity.P0, Severity.P1) else 180.0
await dm.get_or_create_decision(incident=incident, timeout_sec=timeout)
# 設 done 標記,避免下次掃描重複觸發
done_key = f"{_DONE_MARKER_PREFIX}{incident.incident_id}"
await redis.set(done_key, "1", ex=_DONE_MARKER_TTL)
logger.info("sweeper_analysis_done", incident_id=incident.incident_id)
except Exception as e:
logger.warning(