fix: gate elephant price decisions to HITL
All checks were successful
CD Pipeline / deploy (push) Successful in 2m11s
All checks were successful
CD Pipeline / deploy (push) Successful in 2m11s
This commit is contained in:
@@ -402,7 +402,7 @@ YOUTUBE_API_KEY = os.getenv('YOUTUBE_API_KEY', '')
|
||||
# ==========================================
|
||||
# 系統版本與路徑
|
||||
# ==========================================
|
||||
SYSTEM_VERSION = "V10.623"
|
||||
SYSTEM_VERSION = "V10.624"
|
||||
LOG_FILE_PATH = os.path.join(BASE_DIR, 'logs/system.log')
|
||||
public_url = PUBLIC_URL # 用於模板顯示
|
||||
|
||||
|
||||
@@ -2,7 +2,7 @@
|
||||
|
||||
> **最後更新**: 2026-06-16 (台北時間)
|
||||
> **狀態**: 🟢 四 AI Agent 自動化閉環已落地;LLM 路由紅線升級為 Ollama-first 三主機級聯;PChome 後台業績匯入韌性已補強;產品定位正名為「PChome 業績成長自動化作戰系統」;外部市場來源正規化層、自動同步、作戰清單與價格參考表優先讀取、CSV 備援預檢、前台操作入口、高可見頁面繁中化守門與比價/作戰 UI 工作台化已建立
|
||||
> **適用版本**: V10.623
|
||||
> **適用版本**: V10.624
|
||||
|
||||
---
|
||||
|
||||
@@ -197,6 +197,7 @@ SQL漏斗(~300筆)
|
||||
- ElephantAlpha 價格類 trigger 的 HITL / 決策 prefetch 必須先使用觸發 SQL 與 `competitor_prices` / `price_records` 的 DB 實證生成 SKU、MOMO / PChome 價差與建議 action lines;完整 Hermes LLM prefetch 預設關閉(`ELEPHANT_ALPHA_HERMES_LLM_PREFETCH_ENABLED=false`),避免 5s timeout 後落入無實證摘要或雲端備援。若無 DB 實證,只記錄 suppressed telemetry / cooldown,不發 Telegram 空告警。
|
||||
- ElephantAlpha `price_drop_alert` / `market_opportunity` trigger 不得對整張 `price_records` 做全表最新價聚合;必須先篩最近有效 `identity_v2` PChome 候選,再用 per-SKU `JOIN LATERAL` 讀最新 MOMO 價格,並把 `match_score`、`tags`、`match_diagnostic_json` 帶入 evidence。
|
||||
- ElephantAlpha 協調器收到非純 JSON、fenced JSON 或混文字 JSON 時,必須先做容錯抽取;仍無法解析時,只能使用 DB/Hermes 實證生成保守 HITL fallback。fallback 不得放入 OpenClaw `generate_*` 類舊策略步驟,也不得暗示已自動調價。
|
||||
- V10.624 起 ElephantAlpha 價格類 trigger 即使信心度達自主門檻,也只能發送 HITL 價格覆核通知;必須跳過 orchestrator `execution_plan` 內的 Hermes/NemoTron/OpenClaw 長任務 step。這是價格決策護欄:避免 60 秒 execution timeout 卡住 scheduler,也避免把價格策略誤描述為已自動執行。
|
||||
- ElephantAlpha 執行器若遇到舊版 OpenClaw strategy 類步驟(含 `generate_market_strategy` / `generate_dynamic_pricing_strategy` / `generate_resource_optimization_strategy`),只能記錄為 advisory skipped,不得觸發 circuit breaker,也不得轉成實際排程、外部呼叫或價格行動。
|
||||
- `resource_optimization` 不再交給 LLM 生成「預期效益 / 已執行」敘事,顯示名稱統一為「資源壓力治理」。此 trigger 必須先由程式量測 `action_plans` backlog、P1/P2 數、pending_review、逾時項目與 CPU load;只有 CPU 達門檻、P1/P2 積壓或逾時積壓才發 Telegram「資源壓力告警」。單純 queue 大但 CPU 正常只記錄 telemetry,不派發 Hermes/NemoTron、不宣稱 48 小時效益;Telegram 段落使用「系統處置紀錄」而非泛稱「已執行」,避免暗示 AI 已完成未經驗證的外部動作。
|
||||
- `resource_optimization` 的 Telegram 必須包含 `decision_envelope` 區塊,標明 `source_agent=elephant_alpha`、資料品質、量測證據、`can_auto_execute=false` 與 deterministic trace;此路徑不呼叫 Gemini、不呼叫 Hermes/NemoTron,也不得把 queue backlog 翻譯成主機資源耗盡。
|
||||
|
||||
@@ -303,3 +303,10 @@
|
||||
- `/price_comparison` 的結果區新增決策摘要,先顯示「需檢查售價或活動 / 可主推曝光 / 觀察賣點」三類數字與建議,再往下看明細表。
|
||||
- `/ai_intelligence` 第一屏新增今日任務摘要,直接顯示今日任務、可立即處理、待補比價與最新業績日;資料來自現有 PChome growth API stats。
|
||||
- 測試守門新增 `priceDecisionGrid`、`price-workflow-strip`、`price-result-summary-grid`、`growth-executive-strip` 與 `renderGrowthExecutiveSummary`,避免頁面退回只有文字說明的狀態。
|
||||
|
||||
## 27. 2026-06-18 V10.624 ElephantAlpha 價格決策只進 HITL
|
||||
|
||||
- 部署後觀察到 `price_drop_alert` 進入 ElephantAlpha execution plan 後可能卡在 Hermes/NemoTron step,最後以 60 秒 timeout 污染 scheduler log。
|
||||
- V10.624 將價格類 trigger 的高信心路徑改為「有實證就發 L3 HITL 價格覆核通知」,不再執行 orchestrator `execution_plan`,避免長任務 timeout 與自動調價誤解。
|
||||
- 新增 `price_decision_review` 決策信封,固定標示 `can_auto_execute=false`、`requires_hitl=true`、`execution_plan skipped`;通知只呈現 DB/Hermes 具體價差實證。
|
||||
- 測試新增高信心價格決策不執行長任務 step 的守門,避免未來又把價格告警回退成自主執行。
|
||||
|
||||
@@ -716,6 +716,18 @@ class ElephantAlphaAutonomousEngine:
|
||||
return
|
||||
|
||||
if decision.confidence >= (0.85 if trigger.trigger_type in {"price_drop_alert", "market_opportunity"} else self.confidence_threshold):
|
||||
if trigger.trigger_type in _PRICE_RELATED_TRIGGERS:
|
||||
# 價格類決策即使信心高,也只進 HITL 覆核通知;不得執行
|
||||
# orchestrator 給出的 Hermes/NemoTron/OpenClaw 長任務 step。
|
||||
# 這避免 scheduler 被 60s execution timeout 卡住,也避免自動調價。
|
||||
await self._notify_telegram_executed(decision, trigger)
|
||||
self._store_escalation(trigger.trigger_type)
|
||||
self._log.info(
|
||||
"Price decision queued for HITL review; execution plan skipped: %s",
|
||||
trigger.trigger_type,
|
||||
)
|
||||
self._circuit_reset()
|
||||
return
|
||||
try:
|
||||
await self._run_with_timeout(
|
||||
self._execute_decision,
|
||||
@@ -1770,6 +1782,10 @@ class ElephantAlphaAutonomousEngine:
|
||||
decision: StrategicDecision,
|
||||
trigger: AutonomousTrigger,
|
||||
) -> None:
|
||||
if trigger.trigger_type in _PRICE_RELATED_TRIGGERS:
|
||||
await self._notify_price_decision_review(decision, trigger)
|
||||
return
|
||||
|
||||
if trigger.trigger_type == "resource_optimization":
|
||||
self._log.warning(
|
||||
"Suppressed legacy autonomous execution Telegram template for resource_optimization; "
|
||||
@@ -1800,6 +1816,123 @@ class ElephantAlphaAutonomousEngine:
|
||||
except Exception as e:
|
||||
self._log.error("Telegram audit failed (non-blocking): %s", e)
|
||||
|
||||
@staticmethod
|
||||
def _build_price_review_decision_envelope(
|
||||
decision: StrategicDecision,
|
||||
trigger: AutonomousTrigger,
|
||||
*,
|
||||
concrete_actions: Optional[List[str]] = None,
|
||||
) -> Dict[str, Any]:
|
||||
concrete_actions = [str(item).strip() for item in (concrete_actions or []) if str(item).strip()]
|
||||
trigger_label = _zh_trigger(trigger.trigger_type)
|
||||
evidence = [
|
||||
{
|
||||
"type": "confidence",
|
||||
"metric": "decision_confidence",
|
||||
"value": f"{float(decision.confidence or 0):.2f}",
|
||||
"basis": "ElephantAlpha high-confidence price signal; HITL still required",
|
||||
"confidence": float(decision.confidence or 0),
|
||||
},
|
||||
{
|
||||
"type": "trigger",
|
||||
"metric": "trigger_type",
|
||||
"value": trigger.trigger_type,
|
||||
"basis": trigger_label,
|
||||
},
|
||||
]
|
||||
for idx, action in enumerate(concrete_actions[:3], start=1):
|
||||
evidence.append({
|
||||
"type": "price_evidence",
|
||||
"metric": f"candidate_{idx}",
|
||||
"value": action[:120],
|
||||
"basis": "DB/Hermes concrete price evidence",
|
||||
})
|
||||
|
||||
return {
|
||||
"decision_id": f"ea_price_review_{trigger.trigger_type}_{int(datetime.now().timestamp())}",
|
||||
"decision_type": "price_decision_review",
|
||||
"source_agent": "elephant_alpha",
|
||||
"severity": "P2",
|
||||
"confidence": float(decision.confidence or 0),
|
||||
"analysis": "已找到價格比對實證,轉人工覆核;未批准前不執行調價或外部修復。",
|
||||
"subject": {
|
||||
"sku": trigger.trigger_type,
|
||||
"name": f"Elephant Alpha · {trigger_label}",
|
||||
},
|
||||
"evidence": evidence,
|
||||
"recommended_action": {
|
||||
"action": "review_price_or_promo",
|
||||
"owner": "ops",
|
||||
"requires_hitl": True,
|
||||
},
|
||||
"expected_impact": {
|
||||
"risk_reduction": "prevent unverified automated price action while preserving actionable evidence",
|
||||
},
|
||||
"guardrails": {
|
||||
"can_auto_execute": False,
|
||||
"blocked_reason": "price decisions require HITL; execution_plan skipped",
|
||||
"data_quality": "complete" if concrete_actions else "missing",
|
||||
},
|
||||
"trace": {
|
||||
"model": "deterministic_price_hitl_gate",
|
||||
"provider": "elephant_alpha",
|
||||
"trigger": trigger.trigger_type,
|
||||
},
|
||||
}
|
||||
|
||||
async def _notify_price_decision_review(
|
||||
self,
|
||||
decision: StrategicDecision,
|
||||
trigger: AutonomousTrigger,
|
||||
) -> None:
|
||||
concrete_actions = (
|
||||
self._get_prefetched_concrete_actions(trigger)
|
||||
or self._get_trigger_db_concrete_actions(trigger)
|
||||
)
|
||||
if not concrete_actions:
|
||||
self._record_suppressed_escalation(decision, trigger, "no_concrete_price_evidence")
|
||||
return
|
||||
|
||||
try:
|
||||
from services.telegram_templates import triaged_alert, _send_telegram_raw
|
||||
|
||||
decision_envelope = self._build_price_review_decision_envelope(
|
||||
decision,
|
||||
trigger,
|
||||
concrete_actions=concrete_actions,
|
||||
)
|
||||
msg, keyboard = triaged_alert(
|
||||
base_event={
|
||||
"event_type": "ea_price_review",
|
||||
"title": f"🐘 EA 價格覆核 · {_zh_trigger(trigger.trigger_type)}",
|
||||
"summary": (
|
||||
f"找到 {len(concrete_actions)} 筆價格比對實證,已轉人工覆核;"
|
||||
"未批准前不自動調價。"
|
||||
),
|
||||
"id": decision_envelope.get("decision_id"),
|
||||
"decision_envelope": decision_envelope,
|
||||
},
|
||||
tier_label="🐘 Elephant Alpha · L3 HITL",
|
||||
ai_summary=(
|
||||
f"已保留 {len(concrete_actions)} 筆 DB/Hermes 價格比對實證;"
|
||||
"本通知只要求人工覆核,不執行外部修復或調價。"
|
||||
),
|
||||
ai_cause=(
|
||||
f"觸發類型:{_zh_trigger(trigger.trigger_type)} | "
|
||||
f"信心度:{decision.confidence:.2f} | "
|
||||
"高信心價格訊號仍需 HITL"
|
||||
),
|
||||
ai_actions=concrete_actions,
|
||||
)
|
||||
await self._run_with_timeout(_send_telegram_raw, msg, timeout=10, reply_markup=keyboard)
|
||||
self._log.info(
|
||||
"Price HITL review Telegram sent: %s concrete=%d",
|
||||
trigger.trigger_type,
|
||||
len(concrete_actions),
|
||||
)
|
||||
except Exception as e:
|
||||
self._log.error("Price HITL review Telegram failed (non-blocking): %s", e)
|
||||
|
||||
@staticmethod
|
||||
def _get_prefetched_concrete_actions(trigger: AutonomousTrigger) -> Optional[List[str]]:
|
||||
actions = (trigger.conditions or {}).get("_prefetched_hermes_threats")
|
||||
|
||||
@@ -198,6 +198,93 @@ def test_execute_autonomous_decision_uses_db_evidence_without_hermes_prefetch(mo
|
||||
assert notified == ["price_drop_alert"]
|
||||
|
||||
|
||||
def test_high_confidence_price_decision_skips_execution_plan_and_goes_hitl(monkeypatch):
|
||||
import services.elephant_alpha_autonomous_engine as engine_module
|
||||
from services.elephant_alpha_autonomous_engine import (
|
||||
AutonomousTrigger,
|
||||
ElephantAlphaAutonomousEngine,
|
||||
)
|
||||
from services.elephant_alpha_orchestrator import StrategicDecision
|
||||
|
||||
engine = ElephantAlphaAutonomousEngine()
|
||||
notified = []
|
||||
stored = []
|
||||
|
||||
async def _capture_context(context):
|
||||
return StrategicDecision(
|
||||
priority="high",
|
||||
agents_required=["hermes", "nemotron"],
|
||||
reasoning="已有 DB 價格比對實證,轉人工覆核。",
|
||||
expected_outcome="人工確認價格策略",
|
||||
confidence=0.96,
|
||||
execution_plan=[
|
||||
{"agent": "hermes", "action": "analyze_price_competition"},
|
||||
{"agent": "nemotron", "action": "dispatch_alert", "parameters": {"threats": []}},
|
||||
],
|
||||
resource_requirements={},
|
||||
)
|
||||
|
||||
async def _execution_should_not_run(decision):
|
||||
raise AssertionError("price decisions must not execute long-running autonomous steps")
|
||||
|
||||
async def _capture_notify(decision, trigger):
|
||||
notified.append((trigger.trigger_type, len(decision.execution_plan)))
|
||||
|
||||
monkeypatch.setattr(engine_module.elephant_orchestrator, "analyze_and_coordinate", _capture_context)
|
||||
monkeypatch.setattr(engine, "_execute_decision", _execution_should_not_run)
|
||||
monkeypatch.setattr(engine, "_notify_telegram_executed", _capture_notify)
|
||||
monkeypatch.setattr(engine, "_store_escalation", lambda trigger_type: stored.append(trigger_type))
|
||||
|
||||
trigger = AutonomousTrigger(
|
||||
trigger_type="price_drop_alert",
|
||||
conditions={"_db_evidence_actions": ["[SKU-1] DB 實證價差"]},
|
||||
threshold=0.8,
|
||||
enabled=True,
|
||||
)
|
||||
|
||||
asyncio.run(engine._execute_autonomous_decision(trigger))
|
||||
|
||||
assert notified == [("price_drop_alert", 2)]
|
||||
assert stored == ["price_drop_alert"]
|
||||
|
||||
|
||||
def test_price_review_decision_envelope_blocks_auto_execution():
|
||||
from services.elephant_alpha_autonomous_engine import (
|
||||
AutonomousTrigger,
|
||||
ElephantAlphaAutonomousEngine,
|
||||
)
|
||||
from services.elephant_alpha_orchestrator import StrategicDecision
|
||||
|
||||
decision = StrategicDecision(
|
||||
priority="high",
|
||||
agents_required=["elephant_alpha"],
|
||||
reasoning="已有價格比對實證。",
|
||||
expected_outcome="人工確認價格策略。",
|
||||
confidence=0.96,
|
||||
execution_plan=[{"agent": "hermes", "action": "analyze_price_competition"}],
|
||||
resource_requirements={},
|
||||
)
|
||||
trigger = AutonomousTrigger(
|
||||
trigger_type="price_drop_alert",
|
||||
conditions={},
|
||||
threshold=0.8,
|
||||
enabled=True,
|
||||
)
|
||||
|
||||
envelope = ElephantAlphaAutonomousEngine._build_price_review_decision_envelope(
|
||||
decision,
|
||||
trigger,
|
||||
concrete_actions=["[SKU-1] MOMO $1,200 vs PChome $990"],
|
||||
)
|
||||
|
||||
assert envelope["decision_type"] == "price_decision_review"
|
||||
assert envelope["source_agent"] == "elephant_alpha"
|
||||
assert envelope["recommended_action"]["requires_hitl"] is True
|
||||
assert envelope["guardrails"]["can_auto_execute"] is False
|
||||
assert envelope["guardrails"]["blocked_reason"] == "price decisions require HITL; execution_plan skipped"
|
||||
assert envelope["guardrails"]["data_quality"] == "complete"
|
||||
|
||||
|
||||
def test_price_trigger_queries_use_lateral_latest_price_lookup(monkeypatch):
|
||||
import services.elephant_alpha_autonomous_engine as engine_module
|
||||
from services.elephant_alpha_autonomous_engine import (
|
||||
|
||||
Reference in New Issue
Block a user