diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index 0c1abb7c..619b3ccc 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -5189,45 +5189,56 @@ class TelegramGateway: - 並行探測所有服務(HeartbeatReportService) - 一條彙整報告發到 SRE_GROUP_CHAT_ID,不散發 - 沉默告警整合進報告 warnings,不額外多發 + + 2026-04-15 ogt: 修復多 replica 重複發送 bug + - 舊做法:RedisLock 在 async with 結束後立即 release, + 同 slot 另一 pod 若 10s 後 wake 可再次搶到鎖 → 重複發送 + - 新做法:slot-based key(heartbeat:slot:{slot_id}), + SET NX EX interval_seconds,讓 key 自然過期; + 不主動 release,整個 slot 週期只有一個 pod 能寫入 """ try: if not self._initialized: await self.initialize() - from src.core.redis_client import RedisLock + from src.core.redis_client import get_redis from src.services.heartbeat_report_service import ( HeartbeatReportService, report_to_telegram_html, ) - # 分散式鎖:同一心跳週期只有一個 replica 發報告 - # timeout=25*60 確保下一次心跳前鎖一定釋放(心跳間隔 30min) - # blocking_timeout=0: 鎖被佔用時立刻跳過,不排隊等待(避免多 replica 輪流發) - try: - async with RedisLock("heartbeat:leader", timeout=25 * 60, blocking_timeout=0): - report = await HeartbeatReportService().collect() - text = report_to_telegram_html(report) + # Slot-based 去重:每個 30min slot 只有第一個搶到的 replica 發送 + # key 自然過期(TTL = interval_seconds),不主動 release + interval_seconds = 30 * 60 + slot_id = int(datetime.now(UTC).timestamp() / interval_seconds) + slot_key = f"heartbeat:slot:{slot_id}" - # 只發到 SRE 戰情室群組 - if settings.SRE_GROUP_CHAT_ID: - await self.send_to_group(text=text) - else: - # SRE_GROUP_CHAT_ID 未注入時,fallback 到個人頻道並加警告 - fallback = ( - "⚠️ SRE_GROUP_CHAT_ID 未設定,心跳報告暫發到個人頻道\n\n" - + text - ) - await self.send_notification(fallback) + redis_client = get_redis() + acquired = await redis_client.set(slot_key, "1", nx=True, ex=interval_seconds) + if not acquired: + logger.debug("heartbeat_skipped_slot_taken", slot_id=slot_id) + return True - self._last_message_time = datetime.now(UTC) - logger.info( - "telegram_heartbeat_sent", - warnings=len(report.warnings), - has_sre_group=bool(settings.SRE_GROUP_CHAT_ID), - ) - except RuntimeError: - # 另一個 replica 持有鎖,本次跳過 - logger.debug("heartbeat_skipped_lock_taken") + report = await HeartbeatReportService().collect() + text = report_to_telegram_html(report) + + # 只發到 SRE 戰情室群組 + if settings.SRE_GROUP_CHAT_ID: + await self.send_to_group(text=text) + else: + # SRE_GROUP_CHAT_ID 未注入時,fallback 到個人頻道並加警告 + fallback = ( + "⚠️ SRE_GROUP_CHAT_ID 未設定,心跳報告暫發到個人頻道\n\n" + + text + ) + await self.send_notification(fallback) + + self._last_message_time = datetime.now(UTC) + logger.info( + "telegram_heartbeat_sent", + warnings=len(report.warnings), + has_sre_group=bool(settings.SRE_GROUP_CHAT_ID), + ) return True