diff --git a/apps/api/tests/test_sentry_webhook_signature.py b/apps/api/tests/test_sentry_webhook_signature.py new file mode 100644 index 00000000..3d16728d --- /dev/null +++ b/apps/api/tests/test_sentry_webhook_signature.py @@ -0,0 +1,260 @@ +""" +Sentry Webhook Signature 驗證測試 +=================================== +P3.1-T2 by Claude 2026-04-27 — Tier-2 三服務感知強化 + +驗證: +1. SentryWebhookService.parse_sentry_issue() 正確解析 payload +2. SentryWebhookService.build_telegram_message() 組裝正確 +3. verify_sentry_signature() 正確驗證 HMAC-SHA256 +4. verify_sentry_signature() 在 prod 無 secret 時 raise +5. handle_sentry_error endpoint 有 signature 驗證(401 on bad sig) +6. handle_sentry_error endpoint 在有效簽章時通過驗證 + +注意:endpoint 測試使用 FastAPI TestClient + mock service +""" + +from __future__ import annotations + +import hashlib +import hmac + +import pytest + + +# ───────────────────────────────────────────────────────────────────────────── +# Test: SentryWebhookService.parse_sentry_issue +# ───────────────────────────────────────────────────────────────────────────── + +class TestParseSentryIssue: + def setup_method(self): + from src.services.sentry_webhook_service import SentryWebhookService + self.svc = SentryWebhookService() + + def _make_payload( + self, + issue_id: str = "12345", + title: str = "TypeError: null", + culprit: str = "app/views.py:42", + level: str = "error", + project_slug: str = "awoooi", + action: str = "triggered", + ) -> dict: + return { + "action": action, + "data": { + "issue": { + "id": issue_id, + "title": title, + "culprit": culprit, + "level": level, + "firstSeen": "2026-04-27T00:00:00Z", + "count": 3, + "project": {"slug": project_slug}, + }, + "event": { + "message": "Something went wrong", + "platform": "python", + "tags": [["env", "prod"]], + "exception": { + "values": [{ + "stacktrace": { + "frames": [ + {"filename": "app/views.py", "function": "get", "lineno": 42, "context_line": "x = obj.value"}, + ] + } + }] + }, + }, + }, + } + + def test_parse_valid_payload(self): + payload = self._make_payload() + issue = self.svc.parse_sentry_issue(payload) + assert issue is not None + assert issue.issue_id == "12345" + assert issue.title == "TypeError: null" + assert issue.culprit == "app/views.py:42" + assert issue.level == "error" + assert issue.project == "awoooi" + assert issue.count == 3 + + def test_parse_missing_issue_id_returns_none(self): + payload = self._make_payload() + del payload["data"]["issue"]["id"] + result = self.svc.parse_sentry_issue(payload) + assert result is None + + def test_parse_stacktrace_extracted(self): + payload = self._make_payload() + issue = self.svc.parse_sentry_issue(payload) + assert issue is not None + assert len(issue.stacktrace) == 1 + assert issue.stacktrace[0]["filename"] == "app/views.py" + + def test_parse_empty_payload_returns_none(self): + result = self.svc.parse_sentry_issue({}) + assert result is None + + def test_parse_exception_safety(self): + """惡意 payload 不應 raise,應回傳 None""" + result = self.svc.parse_sentry_issue({"data": None}) + assert result is None + + +# ───────────────────────────────────────────────────────────────────────────── +# Test: SentryWebhookService.build_telegram_message +# ───────────────────────────────────────────────────────────────────────────── + +class TestBuildTelegramMessage: + def setup_method(self): + from src.services.sentry_webhook_service import ( + AIDecision, + SentryIssueContext, + SentryWebhookService, + ) + self.svc = SentryWebhookService() + self.issue = SentryIssueContext( + issue_id="99", + title="NullPointerException", + culprit="com.example.Service:100", + level="fatal", + project="backend", + first_seen="2026-04-27T00:00:00Z", + count=1, + message=None, + platform="java", + tags=[], + stacktrace=[], + ) + self.decision = AIDecision( + root_cause="NullPointer in bean init", + impact="Service down", + fix_suggestion="Add null check", + prevention="Unit test coverage", + confidence=0.85, + analyzed_by="ollama/qwen3", + ) + + def test_message_contains_project(self): + msg = self.svc.build_telegram_message(self.issue, self.decision, "ap-001") + assert "backend" in msg + + def test_message_contains_approval_id(self): + msg = self.svc.build_telegram_message(self.issue, self.decision, "ap-001") + assert "ap-001" in msg + + def test_message_contains_root_cause(self): + msg = self.svc.build_telegram_message(self.issue, self.decision, "ap-001") + assert "NullPointer" in msg + + def test_message_no_decision(self): + """decision=None 時不應 crash""" + msg = self.svc.build_telegram_message(self.issue, None, "ap-002") + assert "ap-002" in msg + + def test_message_frequency_shown_when_count_gt_1(self): + freq = {"count_1h": 3, "count_24h": 10, "count_7d": 25} + msg = self.svc.build_telegram_message(self.issue, self.decision, "ap-003", anomaly_frequency=freq) + assert "頻率" in msg + assert "10" in msg # count_24h + + +# ───────────────────────────────────────────────────────────────────────────── +# Test: verify_sentry_signature +# ───────────────────────────────────────────────────────────────────────────── + +class TestVerifySentrySignature: + def _make_sig(self, body: bytes, secret: str) -> str: + return hmac.new(secret.encode(), body, hashlib.sha256).hexdigest() + + def test_valid_signature_passes(self, monkeypatch): + from src.services.sentry_webhook_service import verify_sentry_signature + from src.core.config import settings + monkeypatch.setattr(settings, "SENTRY_WEBHOOK_SECRET", "test-secret") + monkeypatch.setattr(settings, "ENVIRONMENT", "dev") + + body = b'{"action": "triggered"}' + sig = self._make_sig(body, "test-secret") + assert verify_sentry_signature(body, sig) is True + + def test_invalid_signature_raises(self, monkeypatch): + from src.services.sentry_webhook_service import ( + SentrySignatureError, + verify_sentry_signature, + ) + from src.core.config import settings + monkeypatch.setattr(settings, "SENTRY_WEBHOOK_SECRET", "test-secret") + monkeypatch.setattr(settings, "ENVIRONMENT", "dev") + + body = b'{"action": "triggered"}' + with pytest.raises(SentrySignatureError): + verify_sentry_signature(body, "wrong-signature") + + def test_missing_secret_in_prod_raises(self, monkeypatch): + from src.services.sentry_webhook_service import ( + SentrySignatureError, + verify_sentry_signature, + ) + from src.core.config import settings + monkeypatch.setattr(settings, "SENTRY_WEBHOOK_SECRET", "") + monkeypatch.setattr(settings, "ENVIRONMENT", "prod") + + with pytest.raises(SentrySignatureError): + verify_sentry_signature(b"body", "some-sig") + + def test_missing_secret_in_dev_passes(self, monkeypatch): + from src.services.sentry_webhook_service import verify_sentry_signature + from src.core.config import settings + monkeypatch.setattr(settings, "SENTRY_WEBHOOK_SECRET", "") + monkeypatch.setattr(settings, "ENVIRONMENT", "dev") + + # dev 環境無 secret → 允許通過(不驗簽) + result = verify_sentry_signature(b"body", "") + assert result is True + + def test_missing_sig_header_raises(self, monkeypatch): + from src.services.sentry_webhook_service import ( + SentrySignatureError, + verify_sentry_signature, + ) + from src.core.config import settings + monkeypatch.setattr(settings, "SENTRY_WEBHOOK_SECRET", "test-secret") + monkeypatch.setattr(settings, "ENVIRONMENT", "dev") + + with pytest.raises(SentrySignatureError): + verify_sentry_signature(b"body", "") # 空 header + + +# ───────────────────────────────────────────────────────────────────────────── +# Test: endpoint signature 驗證接線確認 +# ───────────────────────────────────────────────────────────────────────────── + +class TestEndpointSignatureWiring: + def test_endpoint_imports_verify_sentry_signature(self): + """確認 sentry_webhook.py 已 import verify_sentry_signature""" + import inspect + from src.api.v1 import sentry_webhook + source = inspect.getsource(sentry_webhook) + assert "verify_sentry_signature" in source, \ + "sentry_webhook.py 應 import 並呼叫 verify_sentry_signature" + + def test_endpoint_imports_sentry_signature_error(self): + """確認 sentry_webhook.py 已 import SentrySignatureError""" + import inspect + from src.api.v1 import sentry_webhook + source = inspect.getsource(sentry_webhook) + assert "SentrySignatureError" in source, \ + "sentry_webhook.py 應 import SentrySignatureError 用於 401 處理" + + def test_endpoint_calls_verify_before_json_parse(self): + """確認 verify_sentry_signature 在 request.json() 之前呼叫""" + import inspect + from src.api.v1 import sentry_webhook + source = inspect.getsource(sentry_webhook.handle_sentry_error) + verify_pos = source.find("verify_sentry_signature") + json_pos = source.find("request.json()") + assert verify_pos != -1, "verify_sentry_signature 應在 handle_sentry_error 中" + assert json_pos != -1, "request.json() 應在 handle_sentry_error 中" + assert verify_pos < json_pos, "簽章驗證應在 JSON 解析之前"