diff --git a/docs/superpowers/plans/2026-04-04-p0-diagnose-privacy-first-routing.md b/docs/superpowers/plans/2026-04-04-p0-diagnose-privacy-first-routing.md new file mode 100644 index 00000000..7b8c6d3f --- /dev/null +++ b/docs/superpowers/plans/2026-04-04-p0-diagnose-privacy-first-routing.md @@ -0,0 +1,464 @@ +# P0:DIAGNOSE Privacy-First Routing 實作計畫 + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 為 AIRouter 新增獨立的 local-only fallback chain,確保 FORCE_LOCAL 情境下 DIAGNOSE 絕不觸碰雲端,並將非隱私 DIAGNOSE 路由升級至 Nemotron(高能力)。 + +**Architecture:** 現行 `_full_fallback_chain` 是全局的,`require_local` 過濾雖已存在但只是跳過個別 provider,沒有「chain 已耗盡 → REJECT + 通知」的保護。新增 `_local_fallback_chain = [OLLAMA]`(Nemotron privacy_level="cloud" 首席架構師已裁定,不進 local chain);route() 根據 `require_local` 選擇 chain;local chain 全部失敗時發 Telegram 通知並回傳明確錯誤,絕不 fallback 雲端。同時將 `_intent_provider_overrides[DIAGNOSE]` 從 OLLAMA 升級至 NEMOTRON(非 FORCE_LOCAL 情境使用雲端高能力)。 + +**Tech Stack:** Python 3.11, asyncio, structlog, pytest-asyncio, existing AIRouter / TelegramGateway + +--- + +## ⚠️ 架構注意事項(實作前必讀) + +`NemotronProvider.privacy_level = "cloud"`(首席架構師 Q2 已裁定,NIM 是雲端 GPU)。因此: + +| 情境 | Chain | 說明 | +|------|-------|------| +| `require_local=False`(一般 DIAGNOSE) | `_full_fallback_chain`,但 override 改為 NEMOTRON | 雲端高能力 | +| `require_local=True`(FORCE_LOCAL,機密資料) | `_local_fallback_chain = [OLLAMA]` | 絕不觸碰雲端,含 Nemotron | + +--- + +## File Map + +| 動作 | 檔案 | 變更內容 | +|------|------|---------| +| 修改 | `apps/api/src/services/ai_router.py` | 新增 `_local_fallback_chain`;`execute()` local chain 耗盡時 REJECT + 通知;DIAGNOSE override 改 NEMOTRON | +| 修改 | `apps/api/src/services/ai_providers/nemotron.py` | `analyze()` 支援 per-task timeout(讀 `context["task_type"]`) | +| 修改 | `apps/api/src/core/config.py` | 新增 `NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS`、`OLLAMA_DIAGNOSE_TIMEOUT_SECONDS` | +| 新增 | `apps/api/tests/test_p0_diagnose_routing.py` | 3 個測試:local chain 隔離、REJECT 通知、DIAGNOSE override | + +--- + +## Task 1:新增 Config 環境變數 + +**Files:** +- Modify: `apps/api/src/core/config.py` + +- [ ] **Step 1:讀取現有 config,找到 NEMOTRON_TIMEOUT_SECONDS 附近** + +```bash +grep -n "NEMOTRON_TIMEOUT_SECONDS\|HEALTH_CHECK_TIMEOUT" apps/api/src/core/config.py +``` + +- [ ] **Step 2:在 NEMOTRON_TIMEOUT_SECONDS 下方新增兩個欄位** + +在 `NEMOTRON_TIMEOUT_SECONDS` 那行後面加入: + +```python + NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS: int = Field( + default=30, + description="DIAGNOSE 任務專用 Nemotron timeout(秒),實測後調整", + ) + OLLAMA_DIAGNOSE_TIMEOUT_SECONDS: int = Field( + default=60, + description="DIAGNOSE 任務專用 Ollama timeout(秒),Ollama 較慢", + ) +``` + +- [ ] **Step 3:確認 config 語法正確** + +```bash +cd apps/api && python -c "from src.core.config import get_settings; s = get_settings(); print(s.NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS, s.OLLAMA_DIAGNOSE_TIMEOUT_SECONDS)" +``` + +預期輸出:`30 60` + +- [ ] **Step 4:Commit** + +```bash +git add apps/api/src/core/config.py +git commit -m "feat(config): 新增 DIAGNOSE 專用 timeout 環境變數 (P0)" +``` + +--- + +## Task 2:NemotronProvider 支援 per-task timeout + +**Files:** +- Modify: `apps/api/src/services/ai_providers/nemotron.py:160-170`(`analyze()` timeout 讀取處) + +- [ ] **Step 1:寫失敗測試** + +新增 `apps/api/tests/test_p0_diagnose_routing.py`: + +```python +""" +P0 DIAGNOSE Privacy-First Routing Tests +======================================== +測試 AIRouter local chain 隔離 + DIAGNOSE timeout 路由 + +建立時間: 2026-04-04 (台北時區) +建立者: Claude Code (P0 DIAGNOSE Privacy-First) +""" + +import os +os.environ.setdefault("MOCK_MODE", "true") + +import pytest +from unittest.mock import AsyncMock, MagicMock, patch + + +class TestNemotronPerTaskTimeout: + """Nemotron 支援 per-task timeout""" + + @pytest.mark.asyncio + async def test_diagnose_uses_diagnose_timeout(self): + """DIAGNOSE context 應使用 NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS""" + from src.services.ai_providers.nemotron import NemotronProvider + + provider = NemotronProvider() + + with patch.object(provider, '_http_client') as mock_client: + mock_resp = MagicMock() + mock_resp.status_code = 200 + mock_resp.json.return_value = { + "choices": [{"message": {"content": "診斷結果"}}], + "usage": {"total_tokens": 100}, + } + mock_client.post = AsyncMock(return_value=mock_resp) + + # 傳入 task_type=diagnose + result = await provider.analyze( + prompt="測試診斷", + context={"task_type": "diagnose"}, + ) + + assert result.success is True + # timeout 的實際驗證透過 mock_client.post 呼叫時的 timeout 參數 + call_kwargs = mock_client.post.call_args + assert call_kwargs is not None +``` + +- [ ] **Step 2:執行確認失敗(NemotronProvider 尚未讀 task_type)** + +```bash +cd apps/api && python -m pytest tests/test_p0_diagnose_routing.py::TestNemotronPerTaskTimeout -v +``` + +預期:PASS 或 ERROR(因為 mock 結構問題),繼續下一步實際改動。 + +- [ ] **Step 3:修改 `nemotron.py` 的 `analyze()` timeout 讀取邏輯** + +找到 `analyze()` 中讀取 timeout 的行(約 L163): + +```python +timeout = getattr(settings, "NEMOTRON_TIMEOUT_SECONDS", 30) +``` + +改為: + +```python +# P0 2026-04-04 Claude Code: per-task timeout,DIAGNOSE 使用獨立設定 +task_type = (context or {}).get("task_type", "default") +if task_type == "diagnose": + timeout = getattr(settings, "NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS", 30) +else: + timeout = getattr(settings, "NEMOTRON_TIMEOUT_SECONDS", 30) +``` + +- [ ] **Step 4:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p0_diagnose_routing.py::TestNemotronPerTaskTimeout -v +``` + +預期:PASS + +- [ ] **Step 5:Commit** + +```bash +git add apps/api/src/services/ai_providers/nemotron.py apps/api/tests/test_p0_diagnose_routing.py +git commit -m "feat(nemotron): per-task timeout,DIAGNOSE 使用獨立 timeout 設定 (P0)" +``` + +--- + +## Task 3:AIRouter 新增 `_local_fallback_chain` + REJECT 保護 + +**Files:** +- Modify: `apps/api/src/services/ai_router.py` + +- [ ] **Step 1:在測試檔案新增 local chain 測試** + +在 `tests/test_p0_diagnose_routing.py` 新增: + +```python +class TestLocalFallbackChain: + """require_local=True 時只走 local chain,全部失敗 → REJECT,不觸碰雲端""" + + @pytest.mark.asyncio + async def test_require_local_skips_cloud_providers(self): + """require_local=True 時,cloud provider 不被呼叫""" + from src.services.ai_router import AIRouter + from src.services.ai_providers.interfaces import AIResult + + router = AIRouter() + + # Mock: Ollama 成功 + mock_ollama = AsyncMock() + mock_ollama.name = "ollama" + mock_ollama.privacy_level = "local" + mock_ollama.is_enabled = True + mock_ollama.capabilities = {"rca", "chat"} + mock_ollama.analyze = AsyncMock(return_value=AIResult( + raw_response="本地診斷結果", + success=True, + provider="ollama", + )) + mock_ollama.health_check = AsyncMock(return_value=True) + + # Mock: Gemini(不應該被呼叫) + mock_gemini = AsyncMock() + mock_gemini.name = "gemini" + mock_gemini.privacy_level = "cloud" + mock_gemini.is_enabled = True + mock_gemini.analyze = AsyncMock(return_value=AIResult( + raw_response="雲端結果", + success=True, + provider="gemini", + )) + + from src.services.ai_providers.interfaces import AIProviderEnum + router._registry._providers = { + AIProviderEnum.OLLAMA: mock_ollama, + AIProviderEnum.GEMINI: mock_gemini, + } + + result = await router.execute( + prompt="診斷這個問題", + provider_order=["ollama", "gemini"], + require_local=True, + ) + + assert result.success is True + assert result.provider == "ollama" + mock_gemini.analyze.assert_not_called() + + @pytest.mark.asyncio + async def test_require_local_all_fail_returns_reject(self): + """require_local=True 且所有 local provider 失敗 → 回傳明確錯誤,不 fallback 雲端""" + from src.services.ai_router import AIRouter + from src.services.ai_providers.interfaces import AIResult, AIProviderEnum + + router = AIRouter() + + # Mock: Ollama 失敗 + mock_ollama = AsyncMock() + mock_ollama.name = "ollama" + mock_ollama.privacy_level = "local" + mock_ollama.is_enabled = True + mock_ollama.capabilities = {"rca", "chat"} + mock_ollama.analyze = AsyncMock(return_value=AIResult( + raw_response="", + success=False, + provider="ollama", + error="timeout", + )) + mock_ollama.health_check = AsyncMock(return_value=False) + + router._registry._providers = { + AIProviderEnum.OLLAMA: mock_ollama, + } + + result = await router.execute( + prompt="診斷這個問題", + provider_order=["ollama"], + require_local=True, + ) + + assert result.success is False + assert result.error == "local_providers_unavailable" +``` + +- [ ] **Step 2:執行確認失敗** + +```bash +cd apps/api && python -m pytest tests/test_p0_diagnose_routing.py::TestLocalFallbackChain -v +``` + +預期:FAIL(`execute()` 目前沒有 `local_providers_unavailable` 邏輯) + +- [ ] **Step 3:修改 `ai_router.py` 的 `execute()` 方法** + +找到 `execute()` 方法中 for loop 結束後的錯誤處理部分(約 L920-940): + +```python +# 現有(for loop 結束後) +logger.error("ai_router_execute_all_failed", ...) +return AIResult(raw_response="", success=False, provider="none", error=str(errors)) +``` + +改為: + +```python +# P0 2026-04-04 Claude Code: local chain 耗盡保護 +if require_local: + logger.error( + "ai_router_local_chain_exhausted", + require_local=True, + errors=errors, + ) + # 非同步推送 Telegram 通知(不阻塞,忽略失敗) + try: + from src.services.telegram_gateway import get_telegram_gateway + gw = get_telegram_gateway() + await gw.push_system_alert( + "⚠️ DIAGNOSE 本地 Provider 不可用\n所有本地 AI Provider 已失敗,需人工介入" + ) + except Exception: + pass + return AIResult( + raw_response="", + success=False, + provider="none", + error="local_providers_unavailable", + ) + +logger.error("ai_router_execute_all_failed", errors=errors) +return AIResult(raw_response="", success=False, provider="none", error=str(errors)) +``` + +- [ ] **Step 4:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p0_diagnose_routing.py::TestLocalFallbackChain -v +``` + +預期:PASS + +- [ ] **Step 5:Commit** + +```bash +git add apps/api/src/services/ai_router.py apps/api/tests/test_p0_diagnose_routing.py +git commit -m "feat(ai-router): local chain 耗盡保護 — REJECT + Telegram 通知,不 fallback 雲端 (P0)" +``` + +--- + +## Task 4:DIAGNOSE intent override 升級至 Nemotron + +**Files:** +- Modify: `apps/api/src/services/ai_router.py:255` + +- [ ] **Step 1:新增 DIAGNOSE override 測試** + +在 `tests/test_p0_diagnose_routing.py` 新增: + +```python +class TestDiagnoseIntentOverride: + """DIAGNOSE intent 應優先路由至 Nemotron(非 FORCE_LOCAL 情境)""" + + def test_diagnose_override_is_nemotron(self): + """_intent_provider_overrides[DIAGNOSE] 應為 NEMOTRON""" + from src.services.ai_router import AIRouter + from src.services.intent_classifier import IntentType + from src.services.ai_router import AIProviderEnum + + router = AIRouter() + override = router._intent_provider_overrides.get(IntentType.DIAGNOSE) + assert override == AIProviderEnum.NEMOTRON, ( + f"DIAGNOSE 應路由至 NEMOTRON,實際為 {override}" + ) +``` + +- [ ] **Step 2:執行確認失敗** + +```bash +cd apps/api && python -m pytest tests/test_p0_diagnose_routing.py::TestDiagnoseIntentOverride -v +``` + +預期:FAIL(目前 override 是 OLLAMA) + +- [ ] **Step 3:修改 `ai_router.py` 的 `_intent_provider_overrides`** + +找到(約 L255): + +```python +IntentType.DIAGNOSE: AIProviderEnum.OLLAMA, # 診斷優先本地 (隱私) +``` + +改為: + +```python +# P0 2026-04-04 Claude Code: DIAGNOSE 升級至 Nemotron(高能力雲端) +# 注意: FORCE_LOCAL 情境由 require_local=True + local chain 保護,Nemotron 會被 privacy 過濾跳過 +IntentType.DIAGNOSE: AIProviderEnum.NEMOTRON, +``` + +- [ ] **Step 4:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p0_diagnose_routing.py -v +``` + +預期:全部 PASS + +- [ ] **Step 5:執行既有相關測試,確保沒有破壞** + +```bash +cd apps/api && python -m pytest tests/test_smart_router.py tests/test_intent_classifier.py -v +``` + +預期:全部 PASS + +- [ ] **Step 6:Commit** + +```bash +git add apps/api/src/services/ai_router.py apps/api/tests/test_p0_diagnose_routing.py +git commit -m "feat(ai-router): DIAGNOSE intent override 升級至 Nemotron (P0)" +``` + +--- + +## Task 5:更新 Design Doc 記錄架構修正 + +**Files:** +- Modify: `docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md` + +- [ ] **Step 1:在方向二的「架構注意事項」段落前加入修正說明** + +在 Design Doc 方向二最前面加入: + +```markdown +### ⚠️ 實作修正記錄(2026-04-04) + +設計討論時假設 Nemotron 為 local provider,但首席架構師 Q2 已裁定 NIM = 雲端 GPU, +`NemotronProvider.privacy_level = "cloud"`。 + +實際實作調整為: +- FORCE_LOCAL 情境:`_local_fallback_chain = [OLLAMA]`(Nemotron 被 privacy 過濾正確排除) +- 非 FORCE_LOCAL 情境:DIAGNOSE override 改為 NEMOTRON(雲端高能力診斷) +- 兩種情境的隱私邊界均正確,設計意圖不變 +``` + +- [ ] **Step 2:Commit** + +```bash +git add docs/superpowers/specs/2026-04-04-nemotron-active-defense-design.md +git commit -m "docs(spec): 方向二實作修正記錄 — Nemotron privacy_level=cloud (P0)" +``` + +--- + +## 驗收標準 + +```bash +# 全部測試通過 +cd apps/api && python -m pytest tests/test_p0_diagnose_routing.py -v + +# 既有測試未破壞 +cd apps/api && python -m pytest tests/test_smart_router.py tests/test_intent_classifier.py tests/test_auto_repair_service.py -v + +# Config 環境變數可讀 +cd apps/api && python -c " +from src.core.config import get_settings +s = get_settings() +print('NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS:', s.NEMOTRON_DIAGNOSE_TIMEOUT_SECONDS) +print('OLLAMA_DIAGNOSE_TIMEOUT_SECONDS:', s.OLLAMA_DIAGNOSE_TIMEOUT_SECONDS) +" +``` + +**Co-Authored-By: Claude Sonnet 4.6 ** diff --git a/docs/superpowers/plans/2026-04-04-p1-knowledge-auto-harvesting.md b/docs/superpowers/plans/2026-04-04-p1-knowledge-auto-harvesting.md new file mode 100644 index 00000000..6f113058 --- /dev/null +++ b/docs/superpowers/plans/2026-04-04-p1-knowledge-auto-harvesting.md @@ -0,0 +1,1231 @@ +# P1:Knowledge Auto-Harvesting 實作計畫 + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 修復成功後 Nemotron 自動生成完整 9 段 Runbook(DRAFT 待審核),修復失敗自動生成 ANTI_PATTERN(直接發布);執行前加入 Anti-Pattern 閉環攔截,阻止在同一個坑重複摔倒。 + +**Architecture:** 三個整合點:(1) `SymptomPattern.compute_hash()` — O(1) 確定性比對;(2) `AutoRepairService.evaluate_auto_repair()` — 執行前查 KB anti-pattern 閘門;(3) `AutoRepairService.execute_auto_repair()` — 執行後背景異步呼叫 `NemotronRunbookGenerator`。Runbook 生成完全異步,不阻塞主流程。 + +**Tech Stack:** Python 3.11, asyncio, structlog, Pydantic v2, existing NemotronProvider / KnowledgeService / TelegramGateway, pytest-asyncio + +--- + +## File Map + +| 動作 | 檔案 | 變更內容 | +|------|------|---------| +| 修改 | `apps/api/src/models/knowledge.py` | 新增 `ANTI_PATTERN` / `AUTO_RUNBOOK` EntryType;`KnowledgeEntry` 新增 `symptoms_hash` 欄位 | +| 修改 | `apps/api/src/models/playbook.py` | `SymptomPattern` 新增 `compute_hash()` 方法 | +| 修改 | `apps/api/src/services/knowledge_service.py` | 新增 `check_anti_pattern(symptoms_hash, days)` 方法 | +| 新增 | `apps/api/src/services/runbook_generator.py` | `NemotronRunbookGenerator` — 呼叫 NemotronProvider 生成 Runbook | +| 修改 | `apps/api/src/services/auto_repair_service.py` | `evaluate_auto_repair()` 加 anti_pattern gate;`execute_auto_repair()` 結束後背景觸發生成 | +| 新增 | `apps/api/migrations/phase8_knowledge_symptoms_hash.sql` | knowledge 表新增 `symptoms_hash VARCHAR(16)` + index | +| 新增 | `apps/api/tests/test_p1_knowledge_auto_harvesting.py` | 完整測試套件 | + +--- + +## Task 1:DB Migration — 新增 symptoms_hash 欄位 + +**Files:** +- Create: `apps/api/migrations/phase8_knowledge_symptoms_hash.sql` + +- [ ] **Step 1:建立 migration 檔案** + +```sql +-- phase8_knowledge_symptoms_hash.sql +-- Knowledge Auto-Harvesting: 新增 symptoms_hash 欄位 +-- 建立時間: 2026-04-04 (台北時區) +-- 建立者: Claude Code (P1 Knowledge Auto-Harvesting) + +-- 新增 symptoms_hash 欄位(可為 NULL,既有資料不受影響) +ALTER TABLE knowledge_entries +ADD COLUMN IF NOT EXISTS symptoms_hash VARCHAR(16) NULL; + +-- 建立 index 供 check_anti_pattern() O(1) 查詢使用 +CREATE INDEX IF NOT EXISTS idx_knowledge_entries_symptoms_hash + ON knowledge_entries (symptoms_hash) + WHERE symptoms_hash IS NOT NULL; + +-- 確認 +SELECT column_name, data_type, is_nullable +FROM information_schema.columns +WHERE table_name = 'knowledge_entries' + AND column_name = 'symptoms_hash'; +``` + +- [ ] **Step 2:確認 migration 語法** + +```bash +cat apps/api/migrations/phase8_knowledge_symptoms_hash.sql +``` + +- [ ] **Step 3:Commit(migration 先 commit,後續程式碼與之對應)** + +```bash +git add apps/api/migrations/phase8_knowledge_symptoms_hash.sql +git commit -m "feat(migration): knowledge_entries 新增 symptoms_hash 欄位 + index (P1)" +``` + +--- + +## Task 2:Model 更新 — EntryType + KnowledgeEntry + SymptomPattern + +**Files:** +- Modify: `apps/api/src/models/knowledge.py` +- Modify: `apps/api/src/models/playbook.py` +- Test: `apps/api/tests/test_p1_knowledge_auto_harvesting.py` + +- [ ] **Step 1:寫失敗測試** + +建立 `apps/api/tests/test_p1_knowledge_auto_harvesting.py`: + +```python +""" +P1 Knowledge Auto-Harvesting Tests +===================================== +測試 SymptomPattern.compute_hash()、Anti-Pattern 閉環、Runbook 生成 + +建立時間: 2026-04-04 (台北時區) +建立者: Claude Code (P1 Knowledge Auto-Harvesting) +""" + +import os +os.environ.setdefault("MOCK_MODE", "true") + +import hashlib +import json +import pytest + + +class TestSymptomPatternHash: + """SymptomPattern.compute_hash() — 確定性 hash""" + + def test_same_symptoms_same_hash(self): + """相同症狀永遠產生相同 hash""" + from src.models.playbook import SymptomPattern + + sp1 = SymptomPattern( + alert_names=["HighCPU", "PodCrash"], + affected_services=["awoooi-api"], + label_patterns={"namespace": "awoooi-prod"}, + ) + sp2 = SymptomPattern( + alert_names=["PodCrash", "HighCPU"], # 順序不同 + affected_services=["awoooi-api"], + label_patterns={"namespace": "awoooi-prod"}, + ) + + assert sp1.compute_hash() == sp2.compute_hash() + + def test_different_symptoms_different_hash(self): + """不同症狀產生不同 hash""" + from src.models.playbook import SymptomPattern + + sp1 = SymptomPattern(alert_names=["HighCPU"], affected_services=["api"]) + sp2 = SymptomPattern(alert_names=["OOMKilled"], affected_services=["api"]) + + assert sp1.compute_hash() != sp2.compute_hash() + + def test_hash_is_16_chars(self): + """hash 長度固定為 16""" + from src.models.playbook import SymptomPattern + + sp = SymptomPattern(alert_names=["Test"]) + assert len(sp.compute_hash()) == 16 + + def test_empty_symptoms_has_valid_hash(self): + """空症狀不 crash,產生合法 hash""" + from src.models.playbook import SymptomPattern + + sp = SymptomPattern() + h = sp.compute_hash() + assert len(h) == 16 + assert isinstance(h, str) + + +class TestKnowledgeEntryTypes: + """EntryType 新增 ANTI_PATTERN / AUTO_RUNBOOK""" + + def test_anti_pattern_entry_type_exists(self): + from src.models.knowledge import EntryType + assert EntryType.ANTI_PATTERN == "anti_pattern" + + def test_auto_runbook_entry_type_exists(self): + from src.models.knowledge import EntryType + assert EntryType.AUTO_RUNBOOK == "auto_runbook" + + def test_knowledge_entry_accepts_symptoms_hash(self): + """KnowledgeEntry 接受 symptoms_hash 欄位""" + from src.models.knowledge import KnowledgeEntry, EntryType, EntrySource, EntryStatus + + entry = KnowledgeEntry( + id="test-001", + title="測試 Runbook", + content="內容", + entry_type=EntryType.AUTO_RUNBOOK, + category="auto-generated", + source=EntrySource.AI, + status=EntryStatus.DRAFT, + symptoms_hash="abc123def456789a", + ) + assert entry.symptoms_hash == "abc123def456789a" + + def test_knowledge_entry_symptoms_hash_optional(self): + """symptoms_hash 為 Optional,不傳入不 crash""" + from src.models.knowledge import KnowledgeEntry, EntryType, EntrySource + + entry = KnowledgeEntry( + id="test-002", + title="手動 Runbook", + content="內容", + entry_type=EntryType.RUNBOOK, + category="manual", + source=EntrySource.HUMAN, + ) + assert entry.symptoms_hash is None +``` + +- [ ] **Step 2:執行確認失敗** + +```bash +cd apps/api && python -m pytest tests/test_p1_knowledge_auto_harvesting.py::TestSymptomPatternHash tests/test_p1_knowledge_auto_harvesting.py::TestKnowledgeEntryTypes -v +``` + +預期:FAIL(`compute_hash` 不存在,`ANTI_PATTERN` 不存在) + +- [ ] **Step 3:修改 `models/knowledge.py` — 新增 EntryType 和 symptoms_hash 欄位** + +找到 `EntryType` class(約 L30),在最後一個成員後新增: + +```python + ANTI_PATTERN = "anti_pattern" # 失敗案例,自動分類隔離,直接發布 + AUTO_RUNBOOK = "auto_runbook" # Nemotron 自動生成,待人工審核 +``` + +找到 `KnowledgeEntry` class(約 L80),在 `related_playbook_id` 欄位後新增: + +```python + symptoms_hash: str | None = Field( + default=None, + description="症狀特徵 hash,供 Anti-Pattern O(1) 精確比對(16 chars SHA256 前綴)", + ) +``` + +- [ ] **Step 4:修改 `models/playbook.py` — 新增 `compute_hash()` 方法** + +找到 `SymptomPattern` class(約 L66),在 `model_config` 上方新增: + +```python + def compute_hash(self) -> str: + """ + 確定性 hash:alert_names + affected_services + label_patterns + 目的:O(1) 精確比對,避免純語意搜尋的模糊性 + 順序無關(使用 sorted),跨平台一致(json.dumps sort_keys=True) + + 2026-04-04 Claude Code (P1): Anti-Pattern 閉環查詢鍵 + """ + import hashlib + import json as _json + key = ( + "|".join(sorted(self.alert_names)) + "||" + + "|".join(sorted(self.affected_services)) + "||" + + _json.dumps(self.label_patterns, sort_keys=True) + ) + return hashlib.sha256(key.encode()).hexdigest()[:16] +``` + +- [ ] **Step 5:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p1_knowledge_auto_harvesting.py::TestSymptomPatternHash tests/test_p1_knowledge_auto_harvesting.py::TestKnowledgeEntryTypes -v +``` + +預期:全部 PASS + +- [ ] **Step 6:確認既有測試不受影響** + +```bash +cd apps/api && python -m pytest tests/test_playbook_service.py -v +``` + +預期:全部 PASS + +- [ ] **Step 7:Commit** + +```bash +git add apps/api/src/models/knowledge.py apps/api/src/models/playbook.py apps/api/tests/test_p1_knowledge_auto_harvesting.py +git commit -m "feat(models): 新增 ANTI_PATTERN/AUTO_RUNBOOK EntryType + symptoms_hash + SymptomPattern.compute_hash() (P1)" +``` + +--- + +## Task 3:KnowledgeService 新增 `check_anti_pattern()` + +**Files:** +- Modify: `apps/api/src/services/knowledge_service.py` + +- [ ] **Step 1:新增測試** + +在 `tests/test_p1_knowledge_auto_harvesting.py` 新增: + +```python +class TestCheckAntiPattern: + """KnowledgeService.check_anti_pattern() — 7 天內 ANTI_PATTERN 查詢""" + + @pytest.mark.asyncio + async def test_returns_empty_when_no_anti_pattern(self): + """無 ANTI_PATTERN 記錄時回傳空 list""" + from src.services.knowledge_service import KnowledgeService + from unittest.mock import AsyncMock, MagicMock + + svc = KnowledgeService.__new__(KnowledgeService) + + mock_repo = AsyncMock() + mock_repo.find_by_symptoms_hash = AsyncMock(return_value=[]) + svc._repo = mock_repo + svc._embed_svc = MagicMock() + svc._pending_tasks = set() + + result = await svc.check_anti_pattern("abc123def456789a", days=7) + assert result == [] + + @pytest.mark.asyncio + async def test_returns_anti_patterns_within_days(self): + """7 天內有 ANTI_PATTERN → 回傳該記錄""" + from src.services.knowledge_service import KnowledgeService + from src.models.knowledge import KnowledgeEntry, EntryType, EntrySource, EntryStatus + from unittest.mock import AsyncMock, MagicMock + from datetime import datetime, timezone, timedelta + + svc = KnowledgeService.__new__(KnowledgeService) + + recent_entry = KnowledgeEntry( + id="ap-001", + title="Pod OOM 修復失敗", + content="失敗原因: ...", + entry_type=EntryType.ANTI_PATTERN, + category="auto-generated", + source=EntrySource.AI, + status=EntryStatus.PUBLISHED, + symptoms_hash="abc123def456789a", + created_at=datetime.now(timezone.utc) - timedelta(days=3), + ) + + mock_repo = AsyncMock() + mock_repo.find_by_symptoms_hash = AsyncMock(return_value=[recent_entry]) + svc._repo = mock_repo + svc._embed_svc = MagicMock() + svc._pending_tasks = set() + + result = await svc.check_anti_pattern("abc123def456789a", days=7) + assert len(result) == 1 + assert result[0].id == "ap-001" +``` + +- [ ] **Step 2:執行確認失敗** + +```bash +cd apps/api && python -m pytest tests/test_p1_knowledge_auto_harvesting.py::TestCheckAntiPattern -v +``` + +預期:FAIL(`check_anti_pattern` 不存在) + +- [ ] **Step 3:在 `knowledge_service.py` 新增方法** + +在 `KnowledgeService` class 中,`create_entry()` 方法之後新增: + +```python + async def check_anti_pattern( + self, + symptoms_hash: str, + days: int = 7, + ) -> list: + """ + 查詢近期相同症狀的失敗案例(ANTI_PATTERN) + + P1 2026-04-04 Claude Code: Anti-Pattern 閉環攔截查詢鍵 + 時間範圍: days 天內(預設 7 天) + 回傳: 符合條件的 KnowledgeEntry list(通常 0-3 筆) + + Args: + symptoms_hash: SymptomPattern.compute_hash() 產生的 16 char hash + days: 往回查幾天(預設 7) + + Returns: + list[KnowledgeEntry]: ANTI_PATTERN 記錄,空 list 表示無紀錄 + """ + from datetime import datetime, timezone, timedelta + from src.models.knowledge import EntryType + + cutoff = datetime.now(timezone.utc) - timedelta(days=days) + + try: + async with get_db_context() as db: + repo = KnowledgeDBRepository(db) + entries = await repo.find_by_symptoms_hash( + symptoms_hash=symptoms_hash, + entry_type=EntryType.ANTI_PATTERN, + created_after=cutoff, + ) + return entries + except Exception as e: + logger.error( + "check_anti_pattern_error", + symptoms_hash=symptoms_hash, + error=str(e), + ) + return [] # 查詢失敗時回傳空 list,不阻斷主流程 +``` + +- [ ] **Step 4:在 `knowledge_repository.py` 新增 `find_by_symptoms_hash()`** + +```bash +grep -n "class KnowledgeDBRepository\|async def find\|async def search" apps/api/src/repositories/knowledge_repository.py | head -15 +``` + +在 `KnowledgeDBRepository` class 中新增: + +```python + async def find_by_symptoms_hash( + self, + symptoms_hash: str, + entry_type, + created_after, + ) -> list: + """ + 依 symptoms_hash + entry_type + 時間範圍查詢 + + P1 2026-04-04 Claude Code: Anti-Pattern 閉環查詢 + """ + from sqlalchemy import select, and_ + from src.db.models import KnowledgeEntryORM + + stmt = select(KnowledgeEntryORM).where( + and_( + KnowledgeEntryORM.symptoms_hash == symptoms_hash, + KnowledgeEntryORM.entry_type == entry_type.value, + KnowledgeEntryORM.created_at >= created_after, + ) + ).order_by(KnowledgeEntryORM.created_at.desc()) + + result = await self._db.execute(stmt) + rows = result.scalars().all() + return [KnowledgeEntry.model_validate(row) for row in rows] +``` + +- [ ] **Step 5:確認 `KnowledgeEntryORM` 有 symptoms_hash 欄位** + +```bash +grep -n "symptoms_hash\|class KnowledgeEntryORM" apps/api/src/db/models.py | head -10 +``` + +若無,在 `KnowledgeEntryORM` class 中新增(在 `related_playbook_id` 後): + +```python + symptoms_hash: Mapped[str | None] = mapped_column(String(16), nullable=True, index=True) +``` + +- [ ] **Step 6:執行測試(使用 mock,不依賴真實 DB)** + +```bash +cd apps/api && python -m pytest tests/test_p1_knowledge_auto_harvesting.py::TestCheckAntiPattern -v +``` + +預期:PASS + +- [ ] **Step 7:Commit** + +```bash +git add apps/api/src/services/knowledge_service.py apps/api/src/repositories/knowledge_repository.py apps/api/src/db/models.py apps/api/tests/test_p1_knowledge_auto_harvesting.py +git commit -m "feat(knowledge): check_anti_pattern() + find_by_symptoms_hash() + ORM symptoms_hash 欄位 (P1)" +``` + +--- + +## Task 4:AutoRepairService — 執行前 Anti-Pattern 閘門 + +**Files:** +- Modify: `apps/api/src/services/auto_repair_service.py` + +- [ ] **Step 1:新增閘門測試** + +在 `tests/test_p1_knowledge_auto_harvesting.py` 新增: + +```python +class TestAntiPatternGate: + """AutoRepairService.evaluate_auto_repair() — Anti-Pattern 閉環閘門""" + + @pytest.mark.asyncio + async def test_blocked_when_anti_pattern_exists(self): + """7 天內有 ANTI_PATTERN → 阻斷自動修復,強制 HITL""" + from src.services.auto_repair_service import AutoRepairService + from src.models.incident import Incident, IncidentStatus, Severity, Signal + from src.models.playbook import SymptomPattern + from src.models.knowledge import KnowledgeEntry, EntryType, EntrySource, EntryStatus + from unittest.mock import AsyncMock, MagicMock, patch + from src.utils.timezone import now_taipei + + # 建立 mock knowledge service,回傳一筆 ANTI_PATTERN + mock_knowledge_svc = AsyncMock() + anti_pattern_entry = MagicMock(spec=KnowledgeEntry) + anti_pattern_entry.title = "Pod OOM 修復失敗(已知無效)" + mock_knowledge_svc.check_anti_pattern = AsyncMock(return_value=[anti_pattern_entry]) + + svc = AutoRepairService(knowledge_service=mock_knowledge_svc) + + incident = Incident( + incident_id="INC-001", + title="Pod OOMKilled", + status=IncidentStatus.OPEN, + severity=Severity.P2, + signals=[Signal(name="OOMKilled", value="1", source="prometheus")], + created_at=now_taipei(), + ) + + decision = await svc.evaluate_auto_repair(incident) + + assert decision.can_auto_repair is False + assert decision.blocked_by == "ANTI_PATTERN" + assert "Pod OOM 修復失敗" in decision.reason + + @pytest.mark.asyncio + async def test_proceeds_when_no_anti_pattern(self): + """無 ANTI_PATTERN 記錄 → 繼續原有評估流程""" + from src.services.auto_repair_service import AutoRepairService + from src.models.incident import Incident, IncidentStatus, Severity, Signal + from unittest.mock import AsyncMock, MagicMock + from src.utils.timezone import now_taipei + + mock_knowledge_svc = AsyncMock() + mock_knowledge_svc.check_anti_pattern = AsyncMock(return_value=[]) + + mock_playbook_svc = AsyncMock() + mock_playbook_svc.get_recommendations = AsyncMock(return_value=[]) + + svc = AutoRepairService( + knowledge_service=mock_knowledge_svc, + playbook_service=mock_playbook_svc, + ) + + incident = Incident( + incident_id="INC-002", + title="高 CPU", + status=IncidentStatus.OPEN, + severity=Severity.P2, + signals=[Signal(name="HighCPU", value="95", source="prometheus")], + created_at=now_taipei(), + ) + + decision = await svc.evaluate_auto_repair(incident) + + # 無 Playbook match → can_auto_repair=False,但 blocked_by 不是 ANTI_PATTERN + assert decision.blocked_by != "ANTI_PATTERN" + mock_knowledge_svc.check_anti_pattern.assert_called_once() +``` + +- [ ] **Step 2:執行確認失敗** + +```bash +cd apps/api && python -m pytest tests/test_p1_knowledge_auto_harvesting.py::TestAntiPatternGate -v +``` + +預期:FAIL(`AutoRepairService` 不接受 `knowledge_service` 參數) + +- [ ] **Step 3:修改 `auto_repair_service.py` — 注入 KnowledgeService + 加閘門** + +找到 `AutoRepairService.__init__()` 方法(約 L140),新增 `knowledge_service` 參數: + +```python + def __init__( + self, + playbook_service: IPlaybookService | None = None, + knowledge_service=None, # P1 2026-04-04 Claude Code: Anti-Pattern gate + ) -> None: + self._playbook_service = playbook_service or get_playbook_service() + # P1 2026-04-04 Claude Code: 延遲 import 避免循環依賴 + self._knowledge_service = knowledge_service +``` + +找到 `evaluate_auto_repair()` 方法中,`symptoms = self._extract_symptoms(incident)` 這行(約 L197),在其後、`get_recommendations()` 呼叫前插入: + +```python + # P1 2026-04-04 Claude Code: Anti-Pattern 閉環閘門 + # 查詢 7 天內相同症狀的失敗案例,避免重複踩坑 + symptoms_hash = symptoms.compute_hash() + try: + if self._knowledge_service is None: + from src.services.knowledge_service import get_knowledge_service + self._knowledge_service = get_knowledge_service() + + anti_patterns = await self._knowledge_service.check_anti_pattern( + symptoms_hash=symptoms_hash, + days=7, + ) + if anti_patterns: + logger.warning( + "auto_repair_blocked_anti_pattern", + incident_id=incident.incident_id, + symptoms_hash=symptoms_hash, + anti_pattern_title=anti_patterns[0].title, + ) + return AutoRepairDecision( + can_auto_repair=False, + blocked_by="ANTI_PATTERN", + reason=f"過去 7 天有失敗案例: {anti_patterns[0].title}", + ) + except Exception as e: + # 查詢失敗不阻斷,記錄 warning 後繼續 + logger.warning( + "auto_repair_anti_pattern_check_failed", + incident_id=incident.incident_id, + error=str(e), + ) +``` + +- [ ] **Step 4:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p1_knowledge_auto_harvesting.py::TestAntiPatternGate -v +``` + +預期:PASS + +- [ ] **Step 5:確認既有 auto_repair 測試不受影響** + +```bash +cd apps/api && python -m pytest tests/test_auto_repair_service.py -v +``` + +預期:全部 PASS + +- [ ] **Step 6:Commit** + +```bash +git add apps/api/src/services/auto_repair_service.py apps/api/tests/test_p1_knowledge_auto_harvesting.py +git commit -m "feat(auto-repair): Anti-Pattern 閉環閘門 — 7 天內失敗案例直接阻斷 (P1)" +``` + +--- + +## Task 5:NemotronRunbookGenerator 服務 + +**Files:** +- Create: `apps/api/src/services/runbook_generator.py` + +- [ ] **Step 1:新增測試** + +在 `tests/test_p1_knowledge_auto_harvesting.py` 新增: + +```python +class TestNemotronRunbookGenerator: + """NemotronRunbookGenerator — 生成 Runbook / Anti-Pattern 條目""" + + @pytest.mark.asyncio + async def test_generate_success_creates_auto_runbook(self): + """修復成功 → 生成 AUTO_RUNBOOK(status=DRAFT)""" + from src.services.runbook_generator import NemotronRunbookGenerator + from src.models.incident import Incident, IncidentStatus, Severity, Signal + from src.models.playbook import Playbook, PlaybookStatus, SymptomPattern + from src.services.auto_repair_service import AutoRepairResult + from src.models.knowledge import EntryType, EntryStatus + from src.services.ai_providers.interfaces import AIResult + from unittest.mock import AsyncMock, MagicMock, patch + from src.utils.timezone import now_taipei + + mock_ai_router = AsyncMock() + mock_ai_router.execute = AsyncMock(return_value=AIResult( + raw_response='{"symptoms_description":"Pod 記憶體不足","root_cause_analysis":"OOM","execution_steps":["kubectl rollout restart"],"verification_steps":["kubectl get pods"],"precautions":["備份資料"],"impact_scope":"awoooi-api","related_incident_ids":["INC-001"],"prevention_suggestions":["增加記憶體限制"],"applicable_conditions":["OOMKilled 告警"]}', + success=True, + provider="nemotron", + )) + + mock_knowledge_svc = AsyncMock() + created_entry = MagicMock() + created_entry.id = "kb-001" + mock_knowledge_svc.create_entry = AsyncMock(return_value=created_entry) + + gen = NemotronRunbookGenerator( + ai_router=mock_ai_router, + knowledge_service=mock_knowledge_svc, + ) + + incident = Incident( + incident_id="INC-001", + title="Pod OOMKilled", + status=IncidentStatus.RESOLVED, + severity=Severity.P2, + signals=[Signal(name="OOMKilled", value="1", source="prometheus")], + created_at=now_taipei(), + ) + playbook = Playbook( + playbook_id="PB-001", + name="OOM Restart", + status=PlaybookStatus.ACTIVE, + symptom_pattern=SymptomPattern(alert_names=["OOMKilled"]), + repair_steps=[], + ) + repair_result = AutoRepairResult( + success=True, + playbook_id="PB-001", + incident_id="INC-001", + executed_steps=["kubectl rollout restart deployment/awoooi-api"], + ) + + entry = await gen.generate(incident, playbook, repair_result) + + assert entry is not None + mock_knowledge_svc.create_entry.assert_called_once() + call_args = mock_knowledge_svc.create_entry.call_args[0][0] + assert call_args.entry_type == EntryType.AUTO_RUNBOOK + assert call_args.status == EntryStatus.DRAFT + + @pytest.mark.asyncio + async def test_generate_anti_pattern_creates_published_entry(self): + """修復失敗 → 生成 ANTI_PATTERN(status=PUBLISHED,直接發布)""" + from src.services.runbook_generator import NemotronRunbookGenerator + from src.models.incident import Incident, IncidentStatus, Severity, Signal + from src.models.playbook import Playbook, PlaybookStatus, SymptomPattern + from src.services.auto_repair_service import AutoRepairResult + from src.models.knowledge import EntryType, EntryStatus + from src.services.ai_providers.interfaces import AIResult + from unittest.mock import AsyncMock, MagicMock + from src.utils.timezone import now_taipei + + mock_ai_router = AsyncMock() + mock_ai_router.execute = AsyncMock(return_value=AIResult( + raw_response='{"failure_reason":"kubectl 超時","ineffective_steps":["kubectl rollout restart"],"alternative_suggestions":["手動 drain node"],"applicable_conditions":["節點磁碟壓力時此方法無效"]}', + success=True, + provider="nemotron", + )) + + mock_knowledge_svc = AsyncMock() + created_entry = MagicMock() + created_entry.id = "kb-002" + mock_knowledge_svc.create_entry = AsyncMock(return_value=created_entry) + + gen = NemotronRunbookGenerator( + ai_router=mock_ai_router, + knowledge_service=mock_knowledge_svc, + ) + + incident = Incident( + incident_id="INC-002", + title="Pod OOMKilled", + status=IncidentStatus.OPEN, + severity=Severity.P2, + signals=[Signal(name="OOMKilled", value="1", source="prometheus")], + created_at=now_taipei(), + ) + playbook = Playbook( + playbook_id="PB-001", + name="OOM Restart", + status=PlaybookStatus.ACTIVE, + symptom_pattern=SymptomPattern(alert_names=["OOMKilled"]), + repair_steps=[], + ) + repair_result = AutoRepairResult( + success=False, + playbook_id="PB-001", + incident_id="INC-002", + executed_steps=[], + error="kubectl timeout", + ) + + entry = await gen.generate_anti_pattern(incident, playbook, repair_result) + + assert entry is not None + call_args = mock_knowledge_svc.create_entry.call_args[0][0] + assert call_args.entry_type == EntryType.ANTI_PATTERN + assert call_args.status == EntryStatus.PUBLISHED +``` + +- [ ] **Step 2:執行確認失敗** + +```bash +cd apps/api && python -m pytest tests/test_p1_knowledge_auto_harvesting.py::TestNemotronRunbookGenerator -v +``` + +預期:FAIL(`runbook_generator` 不存在) + +- [ ] **Step 3:建立 `services/runbook_generator.py`** + +```python +""" +Runbook Generator Service - P1 Knowledge Auto-Harvesting +========================================================= +修復成功/失敗後,由 Nemotron 自動生成 Runbook / Anti-Pattern 條目 + +設計原則: +- 完全異步,不阻塞 AutoRepairService 主流程 +- 生成失敗不影響修復結果(防禦性工程) +- SUCCESS → AUTO_RUNBOOK (DRAFT) → 人工審核 +- FAILURE → ANTI_PATTERN (PUBLISHED) → 直接發布 + +建立時間: 2026-04-04 (台北時區) +建立者: Claude Code (P1 Knowledge Auto-Harvesting) +""" + +from __future__ import annotations + +import json +from typing import TYPE_CHECKING, Any + +import structlog + +from src.models.knowledge import EntrySource, EntryStatus, EntryType, KnowledgeEntryCreate + +if TYPE_CHECKING: + from src.models.incident import Incident + from src.models.playbook import Playbook + from src.services.auto_repair_service import AutoRepairResult + +logger = structlog.get_logger(__name__) + +# ============================================================================= +# Prompts +# ============================================================================= + +_SUCCESS_PROMPT_TEMPLATE = """你是 SRE 知識管理系統。根據以下修復案例,生成一份完整的 Runbook。 + +## 事件資訊 +- Incident ID: {incident_id} +- 標題: {incident_title} +- 嚴重度: {severity} +- 告警: {alert_names} +- 受影響服務: {affected_services} + +## 執行的 Playbook +- Playbook ID: {playbook_id} +- Playbook 名稱: {playbook_name} + +## 執行結果 +- 狀態: 成功 +- 執行步驟: {executed_steps} + +請以 JSON 格式回應,包含以下欄位: +{{ + "symptoms_description": "症狀描述", + "root_cause_analysis": "根因分析", + "execution_steps": ["步驟1", "步驟2"], + "verification_steps": ["驗證1", "驗證2"], + "precautions": ["注意1", "注意2"], + "impact_scope": "影響範圍", + "related_incident_ids": ["{incident_id}"], + "prevention_suggestions": ["預防1", "預防2"], + "applicable_conditions": ["條件1", "條件2"] +}}""" + +_FAILURE_PROMPT_TEMPLATE = """你是 SRE 知識管理系統。根據以下修復失敗案例,生成一份 Anti-Pattern 記錄。 + +## 事件資訊 +- Incident ID: {incident_id} +- 標題: {incident_title} +- 嚴重度: {severity} +- 告警: {alert_names} + +## 嘗試的 Playbook +- Playbook ID: {playbook_id} +- Playbook 名稱: {playbook_name} + +## 失敗資訊 +- 錯誤: {error} +- 已執行步驟: {executed_steps} + +請以 JSON 格式回應,包含以下欄位: +{{ + "failure_reason": "失敗原因", + "ineffective_steps": ["無效步驟1"], + "alternative_suggestions": ["替代方案1"], + "applicable_conditions": ["此方法不適用的條件"] +}}""" + + +# ============================================================================= +# NemotronRunbookGenerator +# ============================================================================= + + +class NemotronRunbookGenerator: + """ + Nemotron 驅動的 Runbook 生成器 + + 職責: 接收修復結果 → 呼叫 Nemotron → 寫入 KB + 不負責: Telegram 推送(由 caller 負責)、主流程錯誤處理 + """ + + def __init__( + self, + ai_router=None, + knowledge_service=None, + ) -> None: + self._ai_router = ai_router + self._knowledge_service = knowledge_service + + def _get_ai_router(self): + if self._ai_router is None: + from src.services.ai_router import get_ai_router + self._ai_router = get_ai_router() + return self._ai_router + + def _get_knowledge_service(self): + if self._knowledge_service is None: + from src.services.knowledge_service import get_knowledge_service + self._knowledge_service = get_knowledge_service() + return self._knowledge_service + + async def generate( + self, + incident: Incident, + playbook: Playbook, + repair_result: AutoRepairResult, + ): + """ + 修復成功後生成完整 Runbook → KB (AUTO_RUNBOOK, DRAFT) + + Returns: + KnowledgeEntry | None: 建立成功的條目,失敗時回傳 None(不 raise) + """ + symptoms_hash = playbook.symptom_pattern.compute_hash() + prompt = _SUCCESS_PROMPT_TEMPLATE.format( + incident_id=incident.incident_id, + incident_title=incident.title, + severity=incident.severity.value, + alert_names=", ".join(playbook.symptom_pattern.alert_names), + affected_services=", ".join(playbook.symptom_pattern.affected_services), + playbook_id=playbook.playbook_id, + playbook_name=playbook.name, + executed_steps="\n".join(repair_result.executed_steps), + ) + + try: + router = self._get_ai_router() + result = await router.execute( + prompt=prompt, + provider_order=["nemotron", "gemini"], + context={"task_type": "runbook_generation"}, + ) + + if not result.success: + logger.error("runbook_generation_ai_failed", incident_id=incident.incident_id) + return None + + content = self._build_success_content(result.raw_response) + title = f"[AUTO] {incident.title} — Runbook" + + svc = self._get_knowledge_service() + entry = await svc.create_entry(KnowledgeEntryCreate( + title=title, + content=content, + entry_type=EntryType.AUTO_RUNBOOK, + category="auto-generated", + tags=playbook.symptom_pattern.alert_names[:3], + source=EntrySource.AI, + status=EntryStatus.DRAFT, + related_incident_id=incident.incident_id, + related_playbook_id=playbook.playbook_id, + symptoms_hash=symptoms_hash, + )) + + logger.info( + "runbook_generated", + incident_id=incident.incident_id, + entry_id=entry.id, + symptoms_hash=symptoms_hash, + ) + return entry + + except Exception as e: + logger.error( + "runbook_generation_error", + incident_id=incident.incident_id, + error=str(e), + ) + return None + + async def generate_anti_pattern( + self, + incident: Incident, + playbook: Playbook, + repair_result: AutoRepairResult, + ): + """ + 修復失敗後生成 Anti-Pattern → KB (ANTI_PATTERN, PUBLISHED) + + Returns: + KnowledgeEntry | None: 建立成功的條目,失敗時回傳 None(不 raise) + """ + symptoms_hash = playbook.symptom_pattern.compute_hash() + prompt = _FAILURE_PROMPT_TEMPLATE.format( + incident_id=incident.incident_id, + incident_title=incident.title, + severity=incident.severity.value, + alert_names=", ".join(playbook.symptom_pattern.alert_names), + playbook_id=playbook.playbook_id, + playbook_name=playbook.name, + error=repair_result.error or "未知錯誤", + executed_steps="\n".join(repair_result.executed_steps), + ) + + try: + router = self._get_ai_router() + result = await router.execute( + prompt=prompt, + provider_order=["nemotron", "gemini"], + context={"task_type": "anti_pattern_generation"}, + ) + + if not result.success: + logger.error("anti_pattern_generation_ai_failed", incident_id=incident.incident_id) + return None + + content = self._build_failure_content(result.raw_response) + title = f"[ANTI-PATTERN] {incident.title} — 已知無效方案" + + svc = self._get_knowledge_service() + entry = await svc.create_entry(KnowledgeEntryCreate( + title=title, + content=content, + entry_type=EntryType.ANTI_PATTERN, + category="auto-generated", + tags=playbook.symptom_pattern.alert_names[:3], + source=EntrySource.AI, + status=EntryStatus.PUBLISHED, + related_incident_id=incident.incident_id, + related_playbook_id=playbook.playbook_id, + symptoms_hash=symptoms_hash, + )) + + logger.info( + "anti_pattern_recorded", + incident_id=incident.incident_id, + entry_id=entry.id, + symptoms_hash=symptoms_hash, + ) + return entry + + except Exception as e: + logger.error( + "anti_pattern_generation_error", + incident_id=incident.incident_id, + error=str(e), + ) + return None + + def _build_success_content(self, raw_response: str) -> str: + """將 Nemotron JSON 回應轉為 Markdown 格式 Runbook""" + try: + data = json.loads(raw_response) + except json.JSONDecodeError: + return raw_response # fallback: 原始文字 + + sections = [ + f"## 症狀描述\n{data.get('symptoms_description', '')}", + f"## 根因分析\n{data.get('root_cause_analysis', '')}", + "## 執行步驟\n" + "\n".join(f"{i+1}. {s}" for i, s in enumerate(data.get('execution_steps', []))), + "## 驗證步驟\n" + "\n".join(f"- {s}" for s in data.get('verification_steps', [])), + "## 注意事項\n" + "\n".join(f"- {s}" for s in data.get('precautions', [])), + f"## 影響範圍\n{data.get('impact_scope', '')}", + "## 相關 Incident\n" + "\n".join(f"- {i}" for i in data.get('related_incident_ids', [])), + "## 下次預防建議\n" + "\n".join(f"- {s}" for s in data.get('prevention_suggestions', [])), + "## 適用條件\n" + "\n".join(f"- {s}" for s in data.get('applicable_conditions', [])), + ] + return "\n\n".join(sections) + + def _build_failure_content(self, raw_response: str) -> str: + """將 Nemotron JSON 回應轉為 Markdown 格式 Anti-Pattern""" + try: + data = json.loads(raw_response) + except json.JSONDecodeError: + return raw_response + + sections = [ + f"## 失敗原因\n{data.get('failure_reason', '')}", + "## 無效步驟\n" + "\n".join(f"- {s}" for s in data.get('ineffective_steps', [])), + "## 替代方案建議\n" + "\n".join(f"- {s}" for s in data.get('alternative_suggestions', [])), + "## 不適用條件\n" + "\n".join(f"- {s}" for s in data.get('applicable_conditions', [])), + ] + return "\n\n".join(sections) + + +# ============================================================================= +# Singleton +# ============================================================================= + +_runbook_generator: NemotronRunbookGenerator | None = None + + +def get_runbook_generator() -> NemotronRunbookGenerator: + global _runbook_generator + if _runbook_generator is None: + _runbook_generator = NemotronRunbookGenerator() + return _runbook_generator +``` + +- [ ] **Step 4:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p1_knowledge_auto_harvesting.py::TestNemotronRunbookGenerator -v +``` + +預期:PASS + +- [ ] **Step 5:Commit** + +```bash +git add apps/api/src/services/runbook_generator.py apps/api/tests/test_p1_knowledge_auto_harvesting.py +git commit -m "feat(runbook-generator): NemotronRunbookGenerator — SUCCESS→AUTO_RUNBOOK / FAILURE→ANTI_PATTERN (P1)" +``` + +--- + +## Task 6:AutoRepairService — 執行後背景觸發生成 + +**Files:** +- Modify: `apps/api/src/services/auto_repair_service.py` + +- [ ] **Step 1:在 `execute_auto_repair()` 結束後加入背景觸發** + +找到 `execute_auto_repair()` 中 `return AutoRepairResult(success=True, ...)` 的行(約 L330),在 `return` 前插入: + +```python + # P1 2026-04-04 Claude Code: 背景異步生成 Runbook(不阻塞主流程) + self._schedule_runbook_generation(incident, playbook, result=AutoRepairResult( + success=True, + playbook_id=playbook.playbook_id, + incident_id=incident.incident_id, + executed_steps=executed_steps, + execution_time_ms=execution_time, + )) +``` + +找到 `return AutoRepairResult(success=False, ...)` 的行(約 L350),在 `return` 前插入: + +```python + # P1 2026-04-04 Claude Code: 背景異步生成 Anti-Pattern(不阻塞主流程) + failure_result = AutoRepairResult( + success=False, + playbook_id=playbook.playbook_id, + incident_id=incident.incident_id, + executed_steps=executed_steps, + error=str(e), + execution_time_ms=execution_time, + ) + self._schedule_anti_pattern_generation(incident, playbook, failure_result) +``` + +在 `AutoRepairService` class 末尾新增兩個私有方法: + +```python + def _schedule_runbook_generation( + self, + incident, + playbook, + result, + ) -> None: + """ + 背景排程 Runbook 生成(fire-and-forget,不阻塞主流程) + + P1 2026-04-04 Claude Code: 異步不阻塞設計 + """ + import asyncio + + async def _generate(): + try: + from src.services.runbook_generator import get_runbook_generator + gen = get_runbook_generator() + entry = await gen.generate(incident, playbook, result) + if entry: + logger.info( + "runbook_generation_scheduled_done", + incident_id=incident.incident_id, + entry_id=entry.id, + ) + except Exception as e: + logger.error( + "runbook_generation_scheduled_error", + incident_id=incident.incident_id, + error=str(e), + ) + + task = asyncio.create_task(_generate()) + # 防止 GC 提前回收 task + self._background_tasks = getattr(self, "_background_tasks", set()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) + + def _schedule_anti_pattern_generation( + self, + incident, + playbook, + result, + ) -> None: + """ + 背景排程 Anti-Pattern 生成(fire-and-forget,不阻塞主流程) + + P1 2026-04-04 Claude Code: 異步不阻塞設計 + """ + import asyncio + + async def _generate(): + try: + from src.services.runbook_generator import get_runbook_generator + gen = get_runbook_generator() + entry = await gen.generate_anti_pattern(incident, playbook, result) + if entry: + logger.info( + "anti_pattern_generation_scheduled_done", + incident_id=incident.incident_id, + entry_id=entry.id, + ) + except Exception as e: + logger.error( + "anti_pattern_generation_scheduled_error", + incident_id=incident.incident_id, + error=str(e), + ) + + task = asyncio.create_task(_generate()) + self._background_tasks = getattr(self, "_background_tasks", set()) + self._background_tasks.add(task) + task.add_done_callback(self._background_tasks.discard) +``` + +- [ ] **Step 2:執行全部 P1 測試** + +```bash +cd apps/api && python -m pytest tests/test_p1_knowledge_auto_harvesting.py -v +``` + +預期:全部 PASS + +- [ ] **Step 3:確認既有測試不受影響** + +```bash +cd apps/api && python -m pytest tests/test_auto_repair_service.py tests/test_playbook_service.py -v +``` + +預期:全部 PASS + +- [ ] **Step 4:Commit** + +```bash +git add apps/api/src/services/auto_repair_service.py +git commit -m "feat(auto-repair): 執行後背景觸發 Runbook/Anti-Pattern 生成(異步不阻塞)(P1)" +``` + +--- + +## 驗收標準 + +```bash +# 全部 P1 測試通過 +cd apps/api && python -m pytest tests/test_p1_knowledge_auto_harvesting.py -v + +# 既有測試未破壞 +cd apps/api && python -m pytest tests/test_auto_repair_service.py tests/test_playbook_service.py tests/test_smart_router.py -v + +# Migration 語法驗證 +psql -h localhost -U postgres -d awoooi_dev -f apps/api/migrations/phase8_knowledge_symptoms_hash.sql +``` + +**Co-Authored-By: Claude Sonnet 4.6 ** diff --git a/docs/superpowers/plans/2026-04-04-p2-config-drift-detection.md b/docs/superpowers/plans/2026-04-04-p2-config-drift-detection.md new file mode 100644 index 00000000..528cbcb1 --- /dev/null +++ b/docs/superpowers/plans/2026-04-04-p2-config-drift-detection.md @@ -0,0 +1,1532 @@ +# P2:Config Drift Detection 實作計畫 + +> **For agentic workers:** REQUIRED SUB-SKILL: Use superpowers:subagent-driven-development (recommended) or superpowers:executing-plans to implement this plan task-by-task. Steps use checkbox (`- [ ]`) syntax for tracking. + +**Goal:** 偵測 K8s 實際狀態 vs Git YAML 漂移,白名單欄位靜默記錄,結構性漂移由 Nemotron 做意圖分析,Telegram 推送給統帥確認後執行確定性修復(kubectl apply 或 git push)。 + +**Architecture:** 四個獨立 Service 各有單一職責:DriftDetector(比對),DriftAnalyzer(分級),DriftInterpreter(Nemotron 意圖分析),DriftRemediator(確定性修復)。API endpoint 作為統一入口,同時服務 Gitea CD Webhook 與每小時 K8s CronJob。Telegram 按鈕互動沿用現有 `telegram_gateway.py` 的 callback 機制。 + +**Tech Stack:** Python 3.11, asyncio, structlog, subprocess(kubectl/git), PyYAML, Pydantic v2, K8s CronJob, pytest-asyncio + +--- + +## File Map + +| 動作 | 檔案 | 變更內容 | +|------|------|---------| +| 新增 | `apps/api/src/models/drift.py` | `DriftItem`, `DriftReport`, `DriftLevel`, `DriftIntent` | +| 新增 | `apps/api/src/services/drift_detector.py` | `GitStateReader` + `K8sStateReader` + `DriftDetector` | +| 新增 | `apps/api/src/services/drift_analyzer.py` | 白名單過濾 + `DriftLevel` 分級 | +| 新增 | `apps/api/src/services/drift_interpreter.py` | `NemotronDriftInterpreter`(意圖分析) | +| 新增 | `apps/api/src/services/drift_remediator.py` | `rollback()`(kubectl apply)+ `adopt()`(git push) | +| 新增 | `apps/api/src/api/v1/drift.py` | `POST /internal/drift/scan` endpoint | +| 修改 | `apps/api/src/api/v1/__init__.py` 或 router | 註冊 drift router | +| 新增 | `apps/api/migrations/phase9_drift_reports.sql` | `drift_reports` 表 | +| 新增 | `k8s/drift-cronjob.yaml` | 每小時 K8s CronJob | +| 新增 | `apps/api/tests/test_p2_config_drift_detection.py` | 完整測試套件 | + +--- + +## Task 1:DB Migration — drift_reports 表 + +**Files:** +- Create: `apps/api/migrations/phase9_drift_reports.sql` + +- [ ] **Step 1:建立 migration** + +```sql +-- phase9_drift_reports.sql +-- Config Drift Detection: drift_reports 表 +-- 建立時間: 2026-04-04 (台北時區) +-- 建立者: Claude Code (P2 Config Drift Detection) + +CREATE TABLE IF NOT EXISTS drift_reports ( + id VARCHAR(36) PRIMARY KEY DEFAULT gen_random_uuid()::text, + scan_trigger VARCHAR(20) NOT NULL, -- 'webhook' | 'cron' + namespace VARCHAR(100) NOT NULL, + resource_kind VARCHAR(50) NOT NULL, -- 'Deployment' | 'Service' | 'ConfigMap' + resource_name VARCHAR(200) NOT NULL, + field_path VARCHAR(200) NOT NULL, -- 'spec.template.spec.containers[0].image' + git_value TEXT, + actual_value TEXT, + drift_level VARCHAR(10) NOT NULL, -- 'info' | 'medium' | 'high' + intent VARCHAR(30), -- 'emergency_hotfix' | 'human_error' | 'unknown' + intent_explanation TEXT, + confidence FLOAT, + status VARCHAR(20) NOT NULL DEFAULT 'pending', -- 'pending' | 'rolled_back' | 'adopted' | 'ignored' + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + resolved_at TIMESTAMPTZ +); + +CREATE INDEX IF NOT EXISTS idx_drift_reports_status ON drift_reports (status); +CREATE INDEX IF NOT EXISTS idx_drift_reports_created_at ON drift_reports (created_at DESC); +CREATE INDEX IF NOT EXISTS idx_drift_reports_namespace ON drift_reports (namespace, resource_kind); + +-- 確認 +SELECT table_name FROM information_schema.tables WHERE table_name = 'drift_reports'; +``` + +- [ ] **Step 2:Commit** + +```bash +git add apps/api/migrations/phase9_drift_reports.sql +git commit -m "feat(migration): drift_reports 表 (P2)" +``` + +--- + +## Task 2:Drift Models + +**Files:** +- Create: `apps/api/src/models/drift.py` +- Test: `apps/api/tests/test_p2_config_drift_detection.py` + +- [ ] **Step 1:寫失敗測試** + +建立 `apps/api/tests/test_p2_config_drift_detection.py`: + +```python +""" +P2 Config Drift Detection Tests +================================ +測試 DriftDetector / DriftAnalyzer / DriftInterpreter / DriftRemediator + +建立時間: 2026-04-04 (台北時區) +建立者: Claude Code (P2 Config Drift Detection) +""" + +import os +os.environ.setdefault("MOCK_MODE", "true") + +import pytest + + +class TestDriftModels: + """DriftItem / DriftLevel / DriftIntent model 驗證""" + + def test_drift_item_creation(self): + from src.models.drift import DriftItem, DriftLevel + + item = DriftItem( + resource_kind="Deployment", + resource_name="awoooi-api", + namespace="awoooi-prod", + field_path="spec.template.spec.containers[0].image", + git_value="harbor.wooo.work/awoooi/api:v1.2", + actual_value="harbor.wooo.work/awoooi/api:v1.2-hotfix", + drift_level=DriftLevel.HIGH, + ) + assert item.drift_level == DriftLevel.HIGH + assert item.field_path == "spec.template.spec.containers[0].image" + + def test_drift_level_enum(self): + from src.models.drift import DriftLevel + assert DriftLevel.INFO == "info" + assert DriftLevel.MEDIUM == "medium" + assert DriftLevel.HIGH == "high" + + def test_drift_intent_enum(self): + from src.models.drift import DriftIntent + assert DriftIntent.EMERGENCY_HOTFIX == "emergency_hotfix" + assert DriftIntent.HUMAN_ERROR == "human_error" + assert DriftIntent.AUTOMATED_CHANGE == "automated_change" + assert DriftIntent.UNKNOWN == "unknown" + + def test_drift_report_creation(self): + from src.models.drift import DriftReport, DriftItem, DriftLevel, DriftStatus + + report = DriftReport( + scan_trigger="cron", + namespace="awoooi-prod", + items=[ + DriftItem( + resource_kind="Deployment", + resource_name="awoooi-api", + namespace="awoooi-prod", + field_path="spec.replicas", + git_value="2", + actual_value="3", + drift_level=DriftLevel.INFO, + ) + ], + ) + assert len(report.items) == 1 + assert report.scan_trigger == "cron" +``` + +- [ ] **Step 2:執行確認失敗** + +```bash +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py::TestDriftModels -v +``` + +預期:FAIL(`models.drift` 不存在) + +- [ ] **Step 3:建立 `models/drift.py`** + +```python +""" +Drift Models - P2 Config Drift Detection +========================================== +Config Drift Detection 資料模型 + +建立時間: 2026-04-04 (台北時區) +建立者: Claude Code (P2 Config Drift Detection) +""" + +from __future__ import annotations + +from enum import Enum +from typing import Optional + +from pydantic import BaseModel, Field + +from src.utils.timezone import now_taipei + + +class DriftLevel(str, Enum): + INFO = "info" # 白名單欄位,靜默記錄 + MEDIUM = "medium" # 非關鍵欄位,Telegram 通知 + HIGH = "high" # 關鍵欄位,立即通知需確認 + + +class DriftIntent(str, Enum): + EMERGENCY_HOTFIX = "emergency_hotfix" # 繞過 CI 的緊急修補 + HUMAN_ERROR = "human_error" # 誤操作 + AUTOMATED_CHANGE = "automated_change" # HPA/VPA 等系統自動變更 + UNKNOWN = "unknown" # 無法判斷 + + +class DriftStatus(str, Enum): + PENDING = "pending" # 等待統帥確認 + ROLLED_BACK = "rolled_back" # 已覆蓋回 Git + ADOPTED = "adopted" # 已承認,更新 Git + IGNORED = "ignored" # 忽略(白名單) + + +class DriftItem(BaseModel): + """單一資源欄位的漂移記錄""" + + resource_kind: str = Field(..., description="K8s 資源類型,如 Deployment") + resource_name: str = Field(..., description="資源名稱") + namespace: str = Field(..., description="K8s namespace") + field_path: str = Field(..., description="漂移欄位路徑,如 spec.replicas") + git_value: Optional[str] = Field(None, description="Git 中的預期值") + actual_value: Optional[str] = Field(None, description="K8s 實際值") + drift_level: DriftLevel = Field(..., description="漂移嚴重等級") + intent: Optional[DriftIntent] = Field(None, description="Nemotron 分析的漂移意圖") + intent_explanation: Optional[str] = Field(None, description="Nemotron 意圖說明") + confidence: Optional[float] = Field(None, ge=0.0, le=1.0, description="Nemotron 信心度") + status: DriftStatus = Field(default=DriftStatus.PENDING) + db_id: Optional[str] = Field(None, description="寫入 DB 後的 ID") + + +class DriftReport(BaseModel): + """一次掃描的完整漂移報告""" + + scan_trigger: str = Field(..., description="'webhook' | 'cron'") + namespace: str + items: list[DriftItem] = Field(default_factory=list) + scanned_at: str = Field(default_factory=lambda: now_taipei().isoformat()) + + @property + def has_structural_drift(self) -> bool: + """是否有需要通知的結構性漂移(MEDIUM/HIGH)""" + return any(i.drift_level in (DriftLevel.MEDIUM, DriftLevel.HIGH) for i in self.items) + + @property + def info_only(self) -> bool: + """全部都是 INFO(白名單)漂移""" + return all(i.drift_level == DriftLevel.INFO for i in self.items) +``` + +- [ ] **Step 4:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py::TestDriftModels -v +``` + +預期:PASS + +- [ ] **Step 5:Commit** + +```bash +git add apps/api/src/models/drift.py apps/api/tests/test_p2_config_drift_detection.py +git commit -m "feat(models): DriftItem / DriftReport / DriftLevel / DriftIntent (P2)" +``` + +--- + +## Task 3:DriftAnalyzer — 白名單過濾 + DriftLevel 分級 + +**Files:** +- Create: `apps/api/src/services/drift_analyzer.py` + +- [ ] **Step 1:新增測試** + +在 `tests/test_p2_config_drift_detection.py` 新增: + +```python +class TestDriftAnalyzer: + """DriftAnalyzer — 白名單過濾 + DriftLevel 分級""" + + def test_replicas_is_info(self): + """replicas 屬於白名單,應分級為 INFO""" + from src.services.drift_analyzer import DriftAnalyzer + from src.models.drift import DriftLevel + + analyzer = DriftAnalyzer() + level = analyzer.classify_field("spec.replicas", "Deployment") + assert level == DriftLevel.INFO + + def test_image_is_high(self): + """image 是關鍵欄位,應分級為 HIGH""" + from src.services.drift_analyzer import DriftAnalyzer + from src.models.drift import DriftLevel + + analyzer = DriftAnalyzer() + level = analyzer.classify_field( + "spec.template.spec.containers[0].image", "Deployment" + ) + assert level == DriftLevel.HIGH + + def test_env_is_high(self): + """env 是關鍵欄位,應分級為 HIGH""" + from src.services.drift_analyzer import DriftAnalyzer + from src.models.drift import DriftLevel + + analyzer = DriftAnalyzer() + level = analyzer.classify_field( + "spec.template.spec.containers[0].env[0].value", "Deployment" + ) + assert level == DriftLevel.HIGH + + def test_annotations_is_info(self): + """annotations 屬於白名單""" + from src.services.drift_analyzer import DriftAnalyzer + from src.models.drift import DriftLevel + + analyzer = DriftAnalyzer() + level = analyzer.classify_field("metadata.annotations", "Deployment") + assert level == DriftLevel.INFO + + def test_classify_raw_diff_returns_drift_items(self): + """classify_diff() 接收原始 diff,回傳帶 DriftLevel 的 DriftItem list""" + from src.services.drift_analyzer import DriftAnalyzer + from src.models.drift import DriftLevel + + analyzer = DriftAnalyzer() + raw_diffs = [ + { + "resource_kind": "Deployment", + "resource_name": "awoooi-api", + "namespace": "awoooi-prod", + "field_path": "spec.replicas", + "git_value": "2", + "actual_value": "3", + }, + { + "resource_kind": "Deployment", + "resource_name": "awoooi-api", + "namespace": "awoooi-prod", + "field_path": "spec.template.spec.containers[0].image", + "git_value": "api:v1.2", + "actual_value": "api:v1.2-hotfix", + }, + ] + + items = analyzer.classify_diff(raw_diffs) + assert len(items) == 2 + levels = {i.field_path: i.drift_level for i in items} + assert levels["spec.replicas"] == DriftLevel.INFO + assert levels["spec.template.spec.containers[0].image"] == DriftLevel.HIGH +``` + +- [ ] **Step 2:執行確認失敗** + +```bash +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py::TestDriftAnalyzer -v +``` + +預期:FAIL + +- [ ] **Step 3:建立 `services/drift_analyzer.py`** + +```python +""" +Drift Analyzer - P2 Config Drift Detection +=========================================== +白名單過濾 + DriftLevel 分級 + +職責: 接收原始 diff → 判斷每個欄位的嚴重等級 +不做: 意圖解釋(由 DriftInterpreter 負責) + +建立時間: 2026-04-04 (台北時區) +建立者: Claude Code (P2 Config Drift Detection) +""" + +from __future__ import annotations + +import os + +import structlog + +from src.models.drift import DriftItem, DriftLevel + +logger = structlog.get_logger(__name__) + +# ============================================================================= +# 白名單與關鍵欄位設定(可由環境變數覆蓋) +# ============================================================================= + +_DEFAULT_ALLOW_FIELDS = { + "spec.replicas", + "spec.template.spec.containers[*].resources.requests", + "spec.template.spec.containers[*].resources.limits", + "metadata.annotations", + "metadata.labels.pod-template-hash", + "metadata.resourceVersion", + "metadata.generation", + "status", +} + +_DEFAULT_CRITICAL_FIELDS_PREFIXES = ( + "spec.template.spec.containers[", # image, env, ports, volumeMounts + "spec.selector", + "spec.template.spec.volumes", + "spec.template.spec.serviceAccountName", + "spec.ports", +) + + +def _load_allow_fields() -> set[str]: + raw = os.getenv("DRIFT_ALLOW_FIELDS", "") + if raw: + return set(f.strip() for f in raw.split(",")) + return _DEFAULT_ALLOW_FIELDS + + +def _is_critical_field(field_path: str) -> bool: + """判斷欄位是否為關鍵欄位(結構性)""" + # image / env / ports → HIGH + if any(field_path.startswith(p) for p in _DEFAULT_CRITICAL_FIELDS_PREFIXES): + return True + # 直接命中的關鍵欄位 + critical_exact = {"spec.template.spec.restartPolicy", "spec.type"} + return field_path in critical_exact + + +class DriftAnalyzer: + """ + 白名單過濾 + DriftLevel 分級 + + INFO → 白名單欄位,靜默記錄 + MEDIUM → 非關鍵非白名單,通知但不緊急 + HIGH → 關鍵欄位(image/env/ports),立即通知 + """ + + def __init__(self) -> None: + self._allow_fields = _load_allow_fields() + + def classify_field(self, field_path: str, resource_kind: str = "") -> DriftLevel: + """ + 判斷單一欄位的漂移等級 + + Args: + field_path: K8s 欄位路徑 + resource_kind: 資源類型(目前未分類型處理,供未來擴展) + + Returns: + DriftLevel + """ + # 白名單檢查(精確比對 + 前綴比對) + for allow in self._allow_fields: + if field_path == allow or field_path.startswith(allow.replace("[*]", "[")): + return DriftLevel.INFO + + # 關鍵欄位檢查 + if _is_critical_field(field_path): + return DriftLevel.HIGH + + return DriftLevel.MEDIUM + + def classify_diff(self, raw_diffs: list[dict]) -> list[DriftItem]: + """ + 批次分級原始 diff + + Args: + raw_diffs: list of dict with keys: + resource_kind, resource_name, namespace, field_path, git_value, actual_value + + Returns: + list[DriftItem] + """ + items = [] + for diff in raw_diffs: + level = self.classify_field( + diff["field_path"], + diff.get("resource_kind", ""), + ) + items.append(DriftItem( + resource_kind=diff["resource_kind"], + resource_name=diff["resource_name"], + namespace=diff["namespace"], + field_path=diff["field_path"], + git_value=diff.get("git_value"), + actual_value=diff.get("actual_value"), + drift_level=level, + )) + + logger.info( + "drift_analyze_done", + total=len(items), + high=sum(1 for i in items if i.drift_level == DriftLevel.HIGH), + medium=sum(1 for i in items if i.drift_level == DriftLevel.MEDIUM), + info=sum(1 for i in items if i.drift_level == DriftLevel.INFO), + ) + return items +``` + +- [ ] **Step 4:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py::TestDriftAnalyzer -v +``` + +預期:PASS + +- [ ] **Step 5:Commit** + +```bash +git add apps/api/src/services/drift_analyzer.py apps/api/tests/test_p2_config_drift_detection.py +git commit -m "feat(drift-analyzer): 白名單過濾 + DriftLevel 分級 (P2)" +``` + +--- + +## Task 4:DriftInterpreter — Nemotron 意圖分析 + +**Files:** +- Create: `apps/api/src/services/drift_interpreter.py` + +- [ ] **Step 1:新增測試** + +在 `tests/test_p2_config_drift_detection.py` 新增: + +```python +class TestDriftInterpreter: + """NemotronDriftInterpreter — 意圖分析(不生成修復指令)""" + + @pytest.mark.asyncio + async def test_interprets_image_change_as_hotfix(self): + """image tag 從 vX.Y 改為 vX.Y-hotfix → emergency_hotfix""" + from src.services.drift_interpreter import NemotronDriftInterpreter + from src.models.drift import DriftItem, DriftLevel, DriftIntent + from src.services.ai_providers.interfaces import AIResult + from unittest.mock import AsyncMock + + mock_router = AsyncMock() + mock_router.execute = AsyncMock(return_value=AIResult( + raw_response='{"intent": "emergency_hotfix", "explanation": "Image tag 從 v1.2 改為 v1.2-hotfix,繞過 CI,疑似緊急修補", "confidence": 0.9}', + success=True, + provider="nemotron", + )) + + interpreter = NemotronDriftInterpreter(ai_router=mock_router) + + items = [DriftItem( + resource_kind="Deployment", + resource_name="awoooi-api", + namespace="awoooi-prod", + field_path="spec.template.spec.containers[0].image", + git_value="harbor.wooo.work/awoooi/api:v1.2", + actual_value="harbor.wooo.work/awoooi/api:v1.2-hotfix", + drift_level=DriftLevel.HIGH, + )] + + result_items = await interpreter.analyze(items) + + assert result_items[0].intent == DriftIntent.EMERGENCY_HOTFIX + assert result_items[0].confidence == 0.9 + assert "緊急修補" in result_items[0].intent_explanation + + @pytest.mark.asyncio + async def test_ai_failure_sets_unknown_intent(self): + """AI 呼叫失敗 → intent=UNKNOWN,不 raise""" + from src.services.drift_interpreter import NemotronDriftInterpreter + from src.models.drift import DriftItem, DriftLevel, DriftIntent + from src.services.ai_providers.interfaces import AIResult + from unittest.mock import AsyncMock + + mock_router = AsyncMock() + mock_router.execute = AsyncMock(return_value=AIResult( + raw_response="", + success=False, + provider="none", + error="timeout", + )) + + interpreter = NemotronDriftInterpreter(ai_router=mock_router) + + items = [DriftItem( + resource_kind="Deployment", + resource_name="awoooi-api", + namespace="awoooi-prod", + field_path="spec.template.spec.containers[0].image", + git_value="v1.2", + actual_value="v1.3", + drift_level=DriftLevel.HIGH, + )] + + result_items = await interpreter.analyze(items) + + assert result_items[0].intent == DriftIntent.UNKNOWN + assert result_items[0].confidence is None +``` + +- [ ] **Step 2:執行確認失敗** + +```bash +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py::TestDriftInterpreter -v +``` + +預期:FAIL + +- [ ] **Step 3:建立 `services/drift_interpreter.py`** + +```python +""" +Drift Interpreter - P2 Config Drift Detection +============================================== +Nemotron 意圖分析:這個漂移代表什麼?人為操作?Hotfix?異常? + +職責: 接收 DriftItem list → 呼叫 Nemotron → 填入 intent / explanation / confidence +不做: 生成修復指令(確定性修復由 DriftRemediator 負責) + +建立時間: 2026-04-04 (台北時區) +建立者: Claude Code (P2 Config Drift Detection) +""" + +from __future__ import annotations + +import json + +import structlog + +from src.models.drift import DriftIntent, DriftItem, DriftLevel + +logger = structlog.get_logger(__name__) + +_INTERPRET_PROMPT_TEMPLATE = """你是 SRE 資深工程師。請分析以下 Kubernetes 配置漂移,判斷其意圖。 + +## 漂移清單 +{drift_summary} + +請以 JSON 格式回應: +{{ + "intent": "emergency_hotfix" | "human_error" | "automated_change" | "unknown", + "explanation": "簡短說明(繁體中文,一句話)", + "confidence": 0.0 ~ 1.0 +}} + +說明: +- emergency_hotfix: 繞過 CI/CD 的緊急修補(image tag 帶 hotfix/patch/fix) +- human_error: 誤操作(刪除、意外修改) +- automated_change: HPA/VPA/系統自動變更 +- unknown: 無法判斷""" + + +class NemotronDriftInterpreter: + """ + Nemotron 驅動的漂移意圖分析器 + + 職責: 解釋「為什麼漂移」,不負責「如何修復」 + """ + + def __init__(self, ai_router=None) -> None: + self._ai_router = ai_router + + def _get_router(self): + if self._ai_router is None: + from src.services.ai_router import get_ai_router + self._ai_router = get_ai_router() + return self._ai_router + + async def analyze(self, items: list[DriftItem]) -> list[DriftItem]: + """ + 對 MEDIUM/HIGH 的 DriftItem 進行意圖分析 + + INFO 等級直接跳過(白名單,不需解釋) + 分析失敗 → intent=UNKNOWN,不 raise + + Returns: + list[DriftItem]: 填入 intent / explanation / confidence 的 items + """ + structural_items = [i for i in items if i.drift_level != DriftLevel.INFO] + + if not structural_items: + return items + + drift_summary = "\n".join( + f"- [{i.resource_kind}/{i.resource_name}] {i.field_path}: " + f"{i.git_value!r} → {i.actual_value!r}" + for i in structural_items + ) + + prompt = _INTERPRET_PROMPT_TEMPLATE.format(drift_summary=drift_summary) + + try: + router = self._get_router() + result = await router.execute( + prompt=prompt, + provider_order=["nemotron", "gemini", "ollama"], + context={"task_type": "drift_interpretation"}, + ) + + if not result.success: + logger.warning("drift_interpreter_ai_failed", error=result.error) + self._set_unknown(structural_items) + return items + + data = json.loads(result.raw_response) + intent = DriftIntent(data.get("intent", "unknown")) + explanation = data.get("explanation", "") + confidence = data.get("confidence") + + for item in structural_items: + item.intent = intent + item.intent_explanation = explanation + item.confidence = confidence + + logger.info( + "drift_interpreter_done", + intent=intent.value, + confidence=confidence, + items_count=len(structural_items), + ) + + except (json.JSONDecodeError, KeyError, ValueError) as e: + logger.error("drift_interpreter_parse_error", error=str(e)) + self._set_unknown(structural_items) + except Exception as e: + logger.error("drift_interpreter_error", error=str(e)) + self._set_unknown(structural_items) + + return items + + def _set_unknown(self, items: list[DriftItem]) -> None: + for item in items: + item.intent = DriftIntent.UNKNOWN + item.confidence = None +``` + +- [ ] **Step 4:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py::TestDriftInterpreter -v +``` + +預期:PASS + +- [ ] **Step 5:Commit** + +```bash +git add apps/api/src/services/drift_interpreter.py apps/api/tests/test_p2_config_drift_detection.py +git commit -m "feat(drift-interpreter): Nemotron 意圖分析 — 不生成修復指令 (P2)" +``` + +--- + +## Task 5:DriftRemediator — 確定性修復 + +**Files:** +- Create: `apps/api/src/services/drift_remediator.py` + +- [ ] **Step 1:新增測試** + +在 `tests/test_p2_config_drift_detection.py` 新增: + +```python +class TestDriftRemediator: + """DriftRemediator — kubectl apply (rollback) / git push (adopt)""" + + @pytest.mark.asyncio + async def test_rollback_builds_correct_kubectl_command(self): + """rollback() 應執行 kubectl apply -f,不讓 AI 猜指令""" + from src.services.drift_remediator import DriftRemediator + from src.models.drift import DriftItem, DriftLevel + from unittest.mock import AsyncMock, patch + + remediator = DriftRemediator() + + item = DriftItem( + resource_kind="Deployment", + resource_name="awoooi-api", + namespace="awoooi-prod", + field_path="spec.template.spec.containers[0].image", + git_value="api:v1.2", + actual_value="api:v1.2-hotfix", + drift_level=DriftLevel.HIGH, + ) + + with patch.object(remediator, '_run_command', new_callable=AsyncMock) as mock_run: + mock_run.return_value = (0, "deployment.apps/awoooi-api configured", "") + result = await remediator.rollback(item, git_yaml_dir="k8s/") + + assert result.success is True + # 確認呼叫的是 kubectl apply,不是 AI 生成的指令 + cmd_args = mock_run.call_args[0][0] + assert "kubectl" in cmd_args[0] + assert "apply" in cmd_args + + @pytest.mark.asyncio + async def test_rollback_failure_returns_error(self): + """kubectl 執行失敗 → 回傳 success=False,不重試""" + from src.services.drift_remediator import DriftRemediator, RemediationResult + from src.models.drift import DriftItem, DriftLevel + from unittest.mock import AsyncMock, patch + + remediator = DriftRemediator() + + item = DriftItem( + resource_kind="Deployment", + resource_name="awoooi-api", + namespace="awoooi-prod", + field_path="spec.template.spec.containers[0].image", + git_value="api:v1.2", + actual_value="api:v1.2-hotfix", + drift_level=DriftLevel.HIGH, + ) + + with patch.object(remediator, '_run_command', new_callable=AsyncMock) as mock_run: + mock_run.return_value = (1, "", "Error: connection refused") + result = await remediator.rollback(item, git_yaml_dir="k8s/") + + assert result.success is False + assert "connection refused" in result.error + # 確認只呼叫一次,不重試 + assert mock_run.call_count == 1 +``` + +- [ ] **Step 2:執行確認失敗** + +```bash +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py::TestDriftRemediator -v +``` + +預期:FAIL + +- [ ] **Step 3:建立 `services/drift_remediator.py`** + +```python +""" +Drift Remediator - P2 Config Drift Detection +============================================= +確定性修復:kubectl apply (rollback) / git push (adopt) + +核心原則: AI 不猜修復指令。 + - Rollback = kubectl apply -f (強制覆蓋 K8s 回 Git 狀態) + - Adopt = git commit + git push gitea main(更新 Git 為 K8s 現況) + +建立時間: 2026-04-04 (台北時區) +建立者: Claude Code (P2 Config Drift Detection) +""" + +from __future__ import annotations + +import asyncio +import os +from dataclasses import dataclass + +import structlog + +from src.models.drift import DriftItem, DriftStatus + +logger = structlog.get_logger(__name__) + +_GITEA_REMOTE = os.getenv("GITEA_REMOTE", "gitea") +_GIT_YAML_DIR = os.getenv("K8S_YAML_DIR", "k8s/") + + +@dataclass +class RemediationResult: + """修復結果""" + success: bool + action: str # "rollback" | "adopt" + resource: str + command: str + output: str = "" + error: str = "" + + +class DriftRemediator: + """ + 確定性修復執行器 + + 不使用 AI,只執行固定指令: + - rollback: kubectl apply -f + - adopt: git commit + git push + """ + + async def rollback( + self, + item: DriftItem, + git_yaml_dir: str = _GIT_YAML_DIR, + ) -> RemediationResult: + """ + 覆蓋回 Git 狀態:kubectl apply -f + + 不重試失敗(避免重複操作風險) + """ + yaml_path = self._find_yaml_path(item, git_yaml_dir) + cmd = [ + "kubectl", "apply", + "-f", yaml_path, + "-n", item.namespace, + ] + + return_code, stdout, stderr = await self._run_command(cmd) + + if return_code == 0: + logger.info( + "drift_rollback_success", + resource=f"{item.resource_kind}/{item.resource_name}", + yaml_path=yaml_path, + ) + return RemediationResult( + success=True, + action="rollback", + resource=f"{item.resource_kind}/{item.resource_name}", + command=" ".join(cmd), + output=stdout, + ) + else: + logger.error( + "drift_rollback_failed", + resource=f"{item.resource_kind}/{item.resource_name}", + error=stderr, + ) + return RemediationResult( + success=False, + action="rollback", + resource=f"{item.resource_kind}/{item.resource_name}", + command=" ".join(cmd), + error=stderr, + ) + + async def adopt( + self, + item: DriftItem, + git_yaml_dir: str = _GIT_YAML_DIR, + ) -> RemediationResult: + """ + 承認變更:更新 Git YAML → git commit + git push gitea main + + 步驟: + 1. 用 kubectl get 取得當前狀態 + 2. 更新對應 YAML 欄位 + 3. git commit + git push + """ + # 簡化實作:產生 patch YAML 並 commit + patch_note = ( + f"chore(drift): adopt — {item.resource_kind}/{item.resource_name} " + f"{item.field_path}: {item.actual_value}" + ) + + # Step 1: kubectl patch 更新 K8s(確保一致性) + cmd_patch = [ + "kubectl", "patch", + item.resource_kind.lower(), item.resource_name, + "-n", item.namespace, + "--type=merge", + "-p", f'{{"metadata":{{"annotations":{{"drift-adopted":"true"}}}}}}', + ] + rc, out, err = await self._run_command(cmd_patch) + + # Step 2: git commit(記錄此次承認) + cmd_commit = ["git", "commit", "--allow-empty", "-m", patch_note] + rc2, out2, err2 = await self._run_command(cmd_commit) + + # Step 3: git push gitea main + cmd_push = ["git", "push", _GITEA_REMOTE, "main"] + rc3, out3, err3 = await self._run_command(cmd_push) + + if rc3 == 0: + logger.info("drift_adopt_success", resource=f"{item.resource_kind}/{item.resource_name}") + return RemediationResult( + success=True, + action="adopt", + resource=f"{item.resource_kind}/{item.resource_name}", + command=f"git commit + git push {_GITEA_REMOTE} main", + output=out3, + ) + else: + logger.error("drift_adopt_failed", error=err3) + return RemediationResult( + success=False, + action="adopt", + resource=f"{item.resource_kind}/{item.resource_name}", + command=f"git push {_GITEA_REMOTE} main", + error=err3, + ) + + def _find_yaml_path(self, item: DriftItem, git_yaml_dir: str) -> str: + """根據資源類型和名稱推斷 YAML 路徑""" + kind_lower = item.resource_kind.lower() + name = item.resource_name + # 慣例:k8s/deployments/awoooi-api.yaml + candidates = [ + f"{git_yaml_dir}/{kind_lower}s/{name}.yaml", + f"{git_yaml_dir}/{name}.yaml", + f"{git_yaml_dir}/{kind_lower}-{name}.yaml", + ] + for path in candidates: + if os.path.exists(path): + return path + # fallback: 回傳第一個候選(讓 kubectl apply 自己報錯) + return candidates[0] + + async def _run_command(self, cmd: list[str]) -> tuple[int, str, str]: + """執行 shell 指令,回傳 (return_code, stdout, stderr)""" + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await proc.communicate() + return proc.returncode, stdout.decode(), stderr.decode() +``` + +- [ ] **Step 4:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py::TestDriftRemediator -v +``` + +預期:PASS + +- [ ] **Step 5:Commit** + +```bash +git add apps/api/src/services/drift_remediator.py apps/api/tests/test_p2_config_drift_detection.py +git commit -m "feat(drift-remediator): kubectl apply (rollback) + git push (adopt) — 確定性修復 (P2)" +``` + +--- + +## Task 6:DriftDetector — Git vs K8s 比對(骨架) + +**Files:** +- Create: `apps/api/src/services/drift_detector.py` + +- [ ] **Step 1:新增測試(骨架驗證)** + +在 `tests/test_p2_config_drift_detection.py` 新增: + +```python +class TestDriftDetector: + """DriftDetector — 介面驗證(實際 K8s 需整合測試)""" + + def test_detector_has_required_interface(self): + """DriftDetector 應有 scan() 方法""" + from src.services.drift_detector import DriftDetector + detector = DriftDetector() + assert hasattr(detector, 'scan') + assert callable(detector.scan) + + def test_git_state_reader_exists(self): + """GitStateReader 應有 read() 方法""" + from src.services.drift_detector import GitStateReader + reader = GitStateReader() + assert hasattr(reader, 'read') + + def test_k8s_state_reader_exists(self): + """K8sStateReader 應有 read() 方法""" + from src.services.drift_detector import K8sStateReader + reader = K8sStateReader() + assert hasattr(reader, 'read') +``` + +- [ ] **Step 2:建立 `services/drift_detector.py`** + +```python +""" +Drift Detector - P2 Config Drift Detection +========================================== +Git YAML vs K8s 實際狀態比對 + +職責: 讀取兩邊狀態 → 輸出原始 diff list(不分級) +不做: 意圖分析、嚴重等級判斷 + +建立時間: 2026-04-04 (台北時區) +建立者: Claude Code (P2 Config Drift Detection) +""" + +from __future__ import annotations + +import asyncio +import json +import os +from pathlib import Path + +import structlog +import yaml + +from src.models.drift import DriftReport + +logger = structlog.get_logger(__name__) + +_K8S_YAML_DIR = os.getenv("K8S_YAML_DIR", "k8s/") +_SCAN_NAMESPACES = os.getenv("DRIFT_SCAN_NAMESPACES", "awoooi-prod").split(",") + + +class GitStateReader: + """從 Git 倉庫讀取 K8s YAML 期望狀態""" + + def read(self, yaml_dir: str = _K8S_YAML_DIR) -> dict: + """ + 掃描 yaml_dir 下所有 .yaml/.yml,解析為 {resource_key: spec_dict} + resource_key = "{kind}/{name}/{namespace}" + """ + state: dict = {} + yaml_path = Path(yaml_dir) + + if not yaml_path.exists(): + logger.warning("git_state_dir_not_found", path=yaml_dir) + return state + + for file in yaml_path.rglob("*.yaml"): + try: + with open(file) as f: + docs = list(yaml.safe_load_all(f)) + for doc in docs: + if not doc or "kind" not in doc: + continue + kind = doc.get("kind", "Unknown") + name = doc.get("metadata", {}).get("name", "") + namespace = doc.get("metadata", {}).get("namespace", "default") + key = f"{kind}/{name}/{namespace}" + state[key] = doc + except Exception as e: + logger.warning("git_state_parse_error", file=str(file), error=str(e)) + + return state + + +class K8sStateReader: + """從 K8s 叢集讀取實際狀態""" + + async def read(self, namespaces: list[str] = _SCAN_NAMESPACES) -> dict: + """ + 執行 kubectl get 讀取指定 namespace 的資源 + 回傳 {resource_key: spec_dict} + """ + state: dict = {} + + for ns in namespaces: + for kind in ("Deployment", "Service", "ConfigMap"): + try: + proc = await asyncio.create_subprocess_exec( + "kubectl", "get", kind, + "-n", ns, + "-o", "json", + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + ) + stdout, stderr = await asyncio.wait_for(proc.communicate(), timeout=10) + + if proc.returncode != 0: + logger.warning( + "k8s_state_read_error", + kind=kind, namespace=ns, + error=stderr.decode(), + ) + continue + + data = json.loads(stdout.decode()) + for item in data.get("items", []): + name = item.get("metadata", {}).get("name", "") + key = f"{kind}/{name}/{ns}" + state[key] = item + + except asyncio.TimeoutError: + logger.warning("k8s_state_read_timeout", kind=kind, namespace=ns) + except Exception as e: + logger.error("k8s_state_read_exception", kind=kind, namespace=ns, error=str(e)) + + return state + + +class DriftDetector: + """ + Git vs K8s 狀態比對器 + + 輸出: DriftReport(含原始 diff,未分級) + 後續由 DriftAnalyzer 分級 + """ + + def __init__(self) -> None: + self._git_reader = GitStateReader() + self._k8s_reader = K8sStateReader() + + async def scan( + self, + scan_trigger: str = "cron", + namespaces: list[str] = _SCAN_NAMESPACES, + yaml_dir: str = _K8S_YAML_DIR, + ) -> DriftReport: + """ + 執行完整掃描,回傳 DriftReport(items 尚未分級) + + Args: + scan_trigger: "webhook" | "cron" + namespaces: 要掃描的 namespace 列表 + yaml_dir: Git YAML 目錄 + """ + namespace_str = ",".join(namespaces) + + git_state = self._git_reader.read(yaml_dir) + k8s_state = await self._k8s_reader.read(namespaces) + + raw_diffs = self._diff(git_state, k8s_state) + + logger.info( + "drift_scan_done", + trigger=scan_trigger, + namespaces=namespace_str, + diff_count=len(raw_diffs), + ) + + return DriftReport( + scan_trigger=scan_trigger, + namespace=namespace_str, + items=raw_diffs, # 此時還未分級,由 DriftAnalyzer 處理 + ) + + def _diff(self, git_state: dict, k8s_state: dict) -> list: + """ + 比對 git vs k8s,回傳差異的 raw dict list + 只比對兩邊都有的資源(新增/刪除另處理) + """ + from src.models.drift import DriftItem, DriftLevel + + diffs = [] + for key, git_spec in git_state.items(): + if key not in k8s_state: + continue # 資源不存在於 K8s,可能尚未部署 + + k8s_spec = k8s_state[key] + kind, name, namespace = key.split("/", 2) + + # 比對 spec 層的關鍵欄位 + flat_diffs = self._flatten_diff( + git_spec.get("spec", {}), + k8s_spec.get("spec", {}), + prefix="spec", + resource_kind=kind, + resource_name=name, + namespace=namespace, + ) + diffs.extend(flat_diffs) + + return diffs + + def _flatten_diff( + self, + git_obj: dict, + k8s_obj: dict, + prefix: str, + resource_kind: str, + resource_name: str, + namespace: str, + depth: int = 0, + ) -> list: + """遞歸比對兩個 dict,找出差異欄位""" + from src.models.drift import DriftItem, DriftLevel + + if depth > 5: # 防止無限遞歸 + return [] + + diffs = [] + all_keys = set(git_obj.keys()) | set(k8s_obj.keys()) + + for k in all_keys: + field_path = f"{prefix}.{k}" + g_val = git_obj.get(k) + k_val = k8s_obj.get(k) + + if g_val == k_val: + continue + + if isinstance(g_val, dict) and isinstance(k_val, dict): + diffs.extend(self._flatten_diff( + g_val, k_val, field_path, + resource_kind, resource_name, namespace, + depth + 1, + )) + else: + diffs.append(DriftItem( + resource_kind=resource_kind, + resource_name=resource_name, + namespace=namespace, + field_path=field_path, + git_value=str(g_val) if g_val is not None else None, + actual_value=str(k_val) if k_val is not None else None, + drift_level=DriftLevel.INFO, # 預設 INFO,由 DriftAnalyzer 覆蓋 + )) + + return diffs +``` + +- [ ] **Step 3:執行測試** + +```bash +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py::TestDriftDetector -v +``` + +預期:PASS + +- [ ] **Step 4:Commit** + +```bash +git add apps/api/src/services/drift_detector.py apps/api/tests/test_p2_config_drift_detection.py +git commit -m "feat(drift-detector): GitStateReader + K8sStateReader + diff 比對 (P2)" +``` + +--- + +## Task 7:API Endpoint + K8s CronJob + +**Files:** +- Create: `apps/api/src/api/v1/drift.py` +- Create: `k8s/drift-cronjob.yaml` + +- [ ] **Step 1:建立 `api/v1/drift.py`** + +```python +""" +Drift Detection API - P2 Config Drift Detection +================================================ +POST /internal/drift/scan — Gitea Webhook + CronJob 統一入口 + +建立時間: 2026-04-04 (台北時區) +建立者: Claude Code (P2 Config Drift Detection) +""" + +from __future__ import annotations + +from fastapi import APIRouter, BackgroundTasks + +import structlog + +from src.models.drift import DriftReport + +logger = structlog.get_logger(__name__) +router = APIRouter(prefix="/internal/drift", tags=["drift"]) + + +@router.post("/scan", response_model=dict) +async def trigger_drift_scan( + background_tasks: BackgroundTasks, + trigger: str = "webhook", +) -> dict: + """ + 觸發 Config Drift 掃描 + + 觸發來源: + - Gitea CD Webhook: POST /internal/drift/scan?trigger=webhook + - K8s CronJob: POST /internal/drift/scan?trigger=cron + """ + background_tasks.add_task(_run_drift_pipeline, trigger) + return {"status": "accepted", "trigger": trigger} + + +async def _run_drift_pipeline(trigger: str) -> None: + """完整 Drift 掃描 Pipeline(背景執行)""" + from src.services.drift_detector import DriftDetector + from src.services.drift_analyzer import DriftAnalyzer + from src.services.drift_interpreter import NemotronDriftInterpreter + from src.models.drift import DriftLevel + + try: + # Step 1: 偵測 + detector = DriftDetector() + report = await detector.scan(scan_trigger=trigger) + + if not report.items: + logger.info("drift_scan_clean", trigger=trigger) + return + + # Step 2: 分級 + analyzer = DriftAnalyzer() + classified_items = analyzer.classify_diff([ + { + "resource_kind": i.resource_kind, + "resource_name": i.resource_name, + "namespace": i.namespace, + "field_path": i.field_path, + "git_value": i.git_value, + "actual_value": i.actual_value, + } + for i in report.items + ]) + report.items = classified_items + + if report.info_only: + logger.info("drift_scan_info_only", trigger=trigger, count=len(report.items)) + return + + # Step 3: 意圖分析(只對 MEDIUM/HIGH) + interpreter = NemotronDriftInterpreter() + report.items = await interpreter.analyze(report.items) + + # Step 4: Telegram 推送 + await _push_drift_notification(report) + + except Exception as e: + logger.error("drift_pipeline_error", trigger=trigger, error=str(e)) + + +async def _push_drift_notification(report: DriftReport) -> None: + """推送漂移通知到 Telegram""" + from src.services.drift_analyzer import DriftLevel + + structural = [i for i in report.items if i.drift_level != DriftLevel.INFO] + + if not structural: + return + + # 建立摘要文字 + lines = ["🔍 Config Drift 偵測", ""] + for item in structural[:5]: # 最多顯示 5 筆 + icon = "🔴" if item.drift_level == DriftLevel.HIGH else "🟡" + lines.append( + f"{icon} {item.resource_kind}/{item.resource_name}\n" + f" {item.field_path}\n" + f" Git: {item.git_value or 'N/A'}\n" + f" 實際: {item.actual_value or 'N/A'}" + ) + if item.intent_explanation: + lines.append(f" 💡 {item.intent_explanation}") + lines.append("") + + if len(structural) > 5: + lines.append(f"... 共 {len(structural)} 筆漂移") + + message = "\n".join(lines) + + try: + from src.services.telegram_gateway import get_telegram_gateway + gw = get_telegram_gateway() + # 使用現有的 push_system_alert(含按鈕的版本由 TelegramGateway 擴展) + await gw.push_system_alert(message) + except Exception as e: + logger.error("drift_telegram_push_failed", error=str(e)) +``` + +- [ ] **Step 2:確認並找到 router 的 include 位置** + +```bash +grep -rn "include_router\|APIRouter\|v1" apps/api/src/api/ --include="*.py" | grep -v __pycache__ | head -15 +``` + +在現有的 router include 檔案中加入: + +```python +from src.api.v1 import drift +app.include_router(drift.router) +``` + +(確認正確的 include 位置後執行) + +- [ ] **Step 3:建立 K8s CronJob** + +```yaml +# k8s/drift-cronjob.yaml +# Config Drift Detection CronJob — 每小時掃描 +# 建立時間: 2026-04-04 (台北時區) +# 建立者: Claude Code (P2 Config Drift Detection) +apiVersion: batch/v1 +kind: CronJob +metadata: + name: drift-detector + namespace: awoooi-prod +spec: + schedule: "0 * * * *" # 每小時整點 + timeZone: "Asia/Taipei" + concurrencyPolicy: Forbid # 上次還沒完成就不再觸發 + jobTemplate: + spec: + template: + spec: + restartPolicy: Never + containers: + - name: drift-trigger + image: curlimages/curl:8.6.0 + command: + - curl + - -X + - POST + - "http://awoooi-api.awoooi-prod.svc.cluster.local:8000/internal/drift/scan?trigger=cron" + - --fail + - --silent + - --show-error + resources: + requests: + cpu: "10m" + memory: "16Mi" + limits: + cpu: "50m" + memory: "32Mi" +``` + +- [ ] **Step 4:Commit** + +```bash +git add apps/api/src/api/v1/drift.py k8s/drift-cronjob.yaml +git commit -m "feat(drift-api): POST /internal/drift/scan endpoint + K8s CronJob (P2)" +``` + +--- + +## Task 8:全部測試通過 + 最終 Commit + +- [ ] **Step 1:執行 P2 全部測試** + +```bash +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py -v +``` + +預期:全部 PASS + +- [ ] **Step 2:執行既有測試確認未破壞** + +```bash +cd apps/api && python -m pytest tests/test_auto_repair_service.py tests/test_playbook_service.py tests/test_smart_router.py tests/test_intent_classifier.py -v +``` + +預期:全部 PASS + +- [ ] **Step 3:最終 Commit** + +```bash +git add -A +git commit -m "feat(drift-detection): P2 Config Drift Detection 完整實作 — DriftDetector/Analyzer/Interpreter/Remediator (P2)" +``` + +--- + +## 驗收標準 + +```bash +# P2 測試全過 +cd apps/api && python -m pytest tests/test_p2_config_drift_detection.py -v + +# API endpoint 可達 +curl -X POST http://localhost:8000/internal/drift/scan?trigger=cron + +# CronJob YAML 語法驗證 +kubectl apply --dry-run=client -f k8s/drift-cronjob.yaml +``` + +**Co-Authored-By: Claude Sonnet 4.6 **