From cae9833e5de6b6b940f27015245857f5513c62e6 Mon Sep 17 00:00:00 2001 From: OG T Date: Wed, 15 Apr 2026 13:17:10 +0800 Subject: [PATCH] =?UTF-8?q?fix(heartbeat):=20=E4=BF=AE=E5=BE=A9=E5=A4=9A?= =?UTF-8?q?=20replica=20=E9=87=8D=E8=A4=87=E7=99=BC=E9=80=81=E7=B3=BB?= =?UTF-8?q?=E7=B5=B1=E5=A0=B1=E5=91=8A=20bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 根因:RedisLock 在 async with 結束後立即 release, 兩個 pod 對齊同一 slot 但 offset 不同,第一個 pod 發完釋放鎖後 ~10s,第二個 pod 剛好 wake 並搶到空鎖 → 同一個 30min slot 發出兩條相同報告。 修復:改用 slot-based key (heartbeat:slot:{slot_id}) SET NX EX interval_seconds,不主動 release,讓 TTL 自然過期。整個 30min slot 只有第一個搶到的 pod 能發。 Co-Authored-By: Claude Sonnet 4.6 --- apps/api/src/services/telegram_gateway.py | 65 +++++++++++++---------- 1 file changed, 38 insertions(+), 27 deletions(-) diff --git a/apps/api/src/services/telegram_gateway.py b/apps/api/src/services/telegram_gateway.py index 0c1abb7c..619b3ccc 100644 --- a/apps/api/src/services/telegram_gateway.py +++ b/apps/api/src/services/telegram_gateway.py @@ -5189,45 +5189,56 @@ class TelegramGateway: - 並行探測所有服務(HeartbeatReportService) - 一條彙整報告發到 SRE_GROUP_CHAT_ID,不散發 - 沉默告警整合進報告 warnings,不額外多發 + + 2026-04-15 ogt: 修復多 replica 重複發送 bug + - 舊做法:RedisLock 在 async with 結束後立即 release, + 同 slot 另一 pod 若 10s 後 wake 可再次搶到鎖 → 重複發送 + - 新做法:slot-based key(heartbeat:slot:{slot_id}), + SET NX EX interval_seconds,讓 key 自然過期; + 不主動 release,整個 slot 週期只有一個 pod 能寫入 """ try: if not self._initialized: await self.initialize() - from src.core.redis_client import RedisLock + from src.core.redis_client import get_redis from src.services.heartbeat_report_service import ( HeartbeatReportService, report_to_telegram_html, ) - # 分散式鎖:同一心跳週期只有一個 replica 發報告 - # timeout=25*60 確保下一次心跳前鎖一定釋放(心跳間隔 30min) - # blocking_timeout=0: 鎖被佔用時立刻跳過,不排隊等待(避免多 replica 輪流發) - try: - async with RedisLock("heartbeat:leader", timeout=25 * 60, blocking_timeout=0): - report = await HeartbeatReportService().collect() - text = report_to_telegram_html(report) + # Slot-based 去重:每個 30min slot 只有第一個搶到的 replica 發送 + # key 自然過期(TTL = interval_seconds),不主動 release + interval_seconds = 30 * 60 + slot_id = int(datetime.now(UTC).timestamp() / interval_seconds) + slot_key = f"heartbeat:slot:{slot_id}" - # 只發到 SRE 戰情室群組 - if settings.SRE_GROUP_CHAT_ID: - await self.send_to_group(text=text) - else: - # SRE_GROUP_CHAT_ID 未注入時,fallback 到個人頻道並加警告 - fallback = ( - "⚠️ SRE_GROUP_CHAT_ID 未設定,心跳報告暫發到個人頻道\n\n" - + text - ) - await self.send_notification(fallback) + redis_client = get_redis() + acquired = await redis_client.set(slot_key, "1", nx=True, ex=interval_seconds) + if not acquired: + logger.debug("heartbeat_skipped_slot_taken", slot_id=slot_id) + return True - self._last_message_time = datetime.now(UTC) - logger.info( - "telegram_heartbeat_sent", - warnings=len(report.warnings), - has_sre_group=bool(settings.SRE_GROUP_CHAT_ID), - ) - except RuntimeError: - # 另一個 replica 持有鎖,本次跳過 - logger.debug("heartbeat_skipped_lock_taken") + report = await HeartbeatReportService().collect() + text = report_to_telegram_html(report) + + # 只發到 SRE 戰情室群組 + if settings.SRE_GROUP_CHAT_ID: + await self.send_to_group(text=text) + else: + # SRE_GROUP_CHAT_ID 未注入時,fallback 到個人頻道並加警告 + fallback = ( + "⚠️ SRE_GROUP_CHAT_ID 未設定,心跳報告暫發到個人頻道\n\n" + + text + ) + await self.send_notification(fallback) + + self._last_message_time = datetime.now(UTC) + logger.info( + "telegram_heartbeat_sent", + warnings=len(report.warnings), + has_sre_group=bool(settings.SRE_GROUP_CHAT_ID), + ) return True