From 61496af2c5668167307aba79be7a05f29915d420 Mon Sep 17 00:00:00 2001 From: ogt Date: Mon, 20 Apr 2026 05:34:21 +0800 Subject: [PATCH] fix: stop runaway EA Telegram spam (cooldown + API key detection + dedup) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Root cause: OPENROUTER_API_KEY not set → fallback confidence=0.60 → always below threshold → _escalate_to_human() every 60s loop → infinite Telegram messages, all meaningless. Three-layer fix: 1. API Key detection: if fallback_decision triggered (reasoning contains "Elephant Alpha unavailable"), silently skip — no Telegram, no cost, update last_triggered to prevent infinite retry 2. Per-trigger cooldown in _check_triggers(): price_drop_alert 30min / market_opportunity 60min / threat_escalation 15min / resource_optimization 60min 3. Escalation dedup in _escalate_to_human(): _last_escalated[] tracks last Telegram send time per trigger type; suppresses within cooldown Valid HITL escalations (when EA is actually online) still work correctly. Co-Authored-By: Claude Sonnet 4.6 --- services/elephant_alpha_autonomous_engine.py | 63 +++++++++++++++++++- 1 file changed, 60 insertions(+), 3 deletions(-) diff --git a/services/elephant_alpha_autonomous_engine.py b/services/elephant_alpha_autonomous_engine.py index 38a54c4..b21c208 100644 --- a/services/elephant_alpha_autonomous_engine.py +++ b/services/elephant_alpha_autonomous_engine.py @@ -81,6 +81,15 @@ class ElephantAlphaAutonomousEngine: ADR-013: resource_optimization → auto_heal_service """ + # 各 trigger 的 escalation cooldown(分鐘) + ESCALATION_COOLDOWN: Dict[str, int] = { + "price_drop_alert": 30, # 同一類型 30 分鐘只發一次 + "market_opportunity": 60, + "threat_escalation": 15, + "resource_optimization": 60, + } + DEFAULT_COOLDOWN_MIN = 30 + def __init__(self): self.decision_history: List[DecisionOutcome] = [] # 最近 100 筆快取;持久化到 DB self.triggers: List[AutonomousTrigger] = [] @@ -95,6 +104,9 @@ class ElephantAlphaAutonomousEngine: self.max_hourly_cost_usd = 5.0 # $5/hr 硬上限 self._cost_per_ea_call = 0.002 # ~100B 模型每次約 $0.002 (1K tokens) + # Escalation dedup:記錄每個 trigger 最後一次 escalate 時間 + self._last_escalated: Dict[str, datetime] = {} + self._initialize_triggers() def _initialize_triggers(self): @@ -151,6 +163,18 @@ class ElephantAlphaAutonomousEngine: for trigger in self.triggers: if not trigger.enabled: continue + # Cooldown guard:同一 trigger 在 cooldown 期間內不重複執行 + cooldown_min = self.ESCALATION_COOLDOWN.get( + trigger.trigger_type, self.DEFAULT_COOLDOWN_MIN + ) + if trigger.last_triggered: + elapsed = (datetime.now() - trigger.last_triggered).total_seconds() / 60 + if elapsed < cooldown_min: + logger.debug( + f"[ElephantAlpha] Trigger {trigger.trigger_type} in cooldown " + f"({elapsed:.1f}/{cooldown_min} min), skip" + ) + continue if await self._evaluate_trigger(trigger): await self._execute_autonomous_decision(trigger) @@ -254,10 +278,23 @@ class ElephantAlphaAutonomousEngine: # ADR-013: resource_optimization → AIOps autoheal loop if trigger.trigger_type == "resource_optimization": await self._handle_resource_via_autoheal(trigger) + trigger.last_triggered = datetime.now() return context = await self._build_trigger_context(trigger) decision = await elephant_orchestrator.analyze_and_coordinate(context) + + # ── API Key 未設定時 fallback decision 的偵測 ────────────── + # fallback_decision 的 reasoning 固定字串,偵測到就靜默跳過, + # 不發 Telegram、不計費、只 log warning。 + if "Elephant Alpha unavailable" in (decision.reasoning or ""): + logger.warning( + f"[ElephantAlpha] API Key 未設定,trigger={trigger.trigger_type} " + "跳過本次決策(不發 Telegram)。請在 momo-scheduler 環境變數注入 OPENROUTER_API_KEY。" + ) + trigger.last_triggered = datetime.now() # 仍更新以避免無限重試 + return + self.hourly_cost_usd += self._cost_per_ea_call # 護欄 1: price 行動信心閾值 0.85 @@ -275,7 +312,8 @@ class ElephantAlphaAutonomousEngine: # W2-B: ADR-012 §⑤ — 執行完畢後強制發 Telegram audit trail await self._notify_telegram_executed(decision, trigger) else: - # W2-A: ADR-012 §⑤ — 升級人工 + 強制發 Telegram + # W2-A: ADR-012 §⑤ — 升級人工 + Telegram(有 dedup 保護) + trigger.last_triggered = datetime.now() await self._escalate_to_human(decision, trigger) async def _handle_resource_via_autoheal(self, trigger: AutonomousTrigger): @@ -534,13 +572,24 @@ class ElephantAlphaAutonomousEngine: W2-A: ADR-012 §⑤ — 信心不足時雙寫: 1. ai_insights (DB) 2. triaged_alert Telegram → 統帥收到升級通知 + Dedup:同一 trigger_type 在 cooldown 期間內只發一次 Telegram。 """ logger.warning( f"[ElephantAlpha] Escalating to human: {trigger.trigger_type} " f"confidence={decision.confidence:.2f}" ) - # 1. DB write + # Dedup check:同一 trigger 在 cooldown 期間內只寫 DB + 發一次 Telegram + cooldown_min = self.ESCALATION_COOLDOWN.get( + trigger.trigger_type, self.DEFAULT_COOLDOWN_MIN + ) + last_esc = self._last_escalated.get(trigger.trigger_type) + telegram_allowed = ( + last_esc is None + or (datetime.now() - last_esc).total_seconds() / 60 >= cooldown_min + ) + + # 1. DB write(每次都記錄,不 dedup) session = get_session() try: session.execute(text(""" @@ -569,7 +618,15 @@ class ElephantAlphaAutonomousEngine: finally: session.close() - # 2. Telegram (ADR-012 §⑤ 強制) + # 2. Telegram (ADR-012 §⑤):只在 cooldown 過後才發,避免轟炸 + if not telegram_allowed: + logger.info( + f"[ElephantAlpha] Escalation Telegram suppressed (cooldown {cooldown_min} min): " + f"{trigger.trigger_type}" + ) + return + + self._last_escalated[trigger.trigger_type] = datetime.now() try: from services.telegram_templates import triaged_alert, _send_telegram_raw msg, keyboard = triaged_alert(