fix(heartbeat): 修復多 replica 重複發送系統報告 bug

根因:RedisLock 在 async with 結束後立即 release,
兩個 pod 對齊同一 slot 但 offset 不同,第一個 pod
發完釋放鎖後 ~10s,第二個 pod 剛好 wake 並搶到空鎖
→ 同一個 30min slot 發出兩條相同報告。

修復:改用 slot-based key (heartbeat:slot:{slot_id})
SET NX EX interval_seconds,不主動 release,讓 TTL
自然過期。整個 30min slot 只有第一個搶到的 pod 能發。

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
OG T
2026-04-15 13:17:10 +08:00
parent f1cbf6db7d
commit cae9833e5d

View File

@@ -5189,45 +5189,56 @@ class TelegramGateway:
- 並行探測所有服務HeartbeatReportService
- 一條彙整報告發到 SRE_GROUP_CHAT_ID不散發
- 沉默告警整合進報告 warnings不額外多發
2026-04-15 ogt: 修復多 replica 重複發送 bug
- 舊做法RedisLock 在 async with 結束後立即 release
同 slot 另一 pod 若 10s 後 wake 可再次搶到鎖 → 重複發送
- 新做法slot-based keyheartbeat:slot:{slot_id}
SET NX EX interval_seconds讓 key 自然過期;
不主動 release整個 slot 週期只有一個 pod 能寫入
"""
try:
if not self._initialized:
await self.initialize()
from src.core.redis_client import RedisLock
from src.core.redis_client import get_redis
from src.services.heartbeat_report_service import (
HeartbeatReportService,
report_to_telegram_html,
)
# 分散式鎖:同一心跳週期只有一個 replica 發報告
# timeout=25*60 確保下一次心跳前鎖一定釋放(心跳間隔 30min
# blocking_timeout=0: 鎖被佔用時立刻跳過,不排隊等待(避免多 replica 輪流發)
try:
async with RedisLock("heartbeat:leader", timeout=25 * 60, blocking_timeout=0):
report = await HeartbeatReportService().collect()
text = report_to_telegram_html(report)
# Slot-based 去重:每個 30min slot 只有一個搶到的 replica 發
# key 自然過期TTL = interval_seconds不主動 release
interval_seconds = 30 * 60
slot_id = int(datetime.now(UTC).timestamp() / interval_seconds)
slot_key = f"heartbeat:slot:{slot_id}"
# 只發到 SRE 戰情室群組
if settings.SRE_GROUP_CHAT_ID:
await self.send_to_group(text=text)
else:
# SRE_GROUP_CHAT_ID 未注入時fallback 到個人頻道並加警告
fallback = (
"⚠️ <b>SRE_GROUP_CHAT_ID 未設定</b>,心跳報告暫發到個人頻道\n\n"
+ text
)
await self.send_notification(fallback)
redis_client = get_redis()
acquired = await redis_client.set(slot_key, "1", nx=True, ex=interval_seconds)
if not acquired:
logger.debug("heartbeat_skipped_slot_taken", slot_id=slot_id)
return True
self._last_message_time = datetime.now(UTC)
logger.info(
"telegram_heartbeat_sent",
warnings=len(report.warnings),
has_sre_group=bool(settings.SRE_GROUP_CHAT_ID),
)
except RuntimeError:
# 另一個 replica 持有鎖,本次跳過
logger.debug("heartbeat_skipped_lock_taken")
report = await HeartbeatReportService().collect()
text = report_to_telegram_html(report)
# 只發到 SRE 戰情室群組
if settings.SRE_GROUP_CHAT_ID:
await self.send_to_group(text=text)
else:
# SRE_GROUP_CHAT_ID 未注入時fallback 到個人頻道並加警告
fallback = (
"⚠️ <b>SRE_GROUP_CHAT_ID 未設定</b>,心跳報告暫發到個人頻道\n\n"
+ text
)
await self.send_notification(fallback)
self._last_message_time = datetime.now(UTC)
logger.info(
"telegram_heartbeat_sent",
warnings=len(report.warnings),
has_sre_group=bool(settings.SRE_GROUP_CHAT_ID),
)
return True