117 lines
4.5 KiB
Python
117 lines
4.5 KiB
Python
import json
|
||
import logging
|
||
from typing import Any, Dict, Optional
|
||
|
||
from sqlalchemy import text as sql_text
|
||
from database.manager import get_session
|
||
from services.hermes_analyst_service import HermesAnalystService
|
||
from services.nemoton_dispatcher_service import NemotronDispatcher
|
||
|
||
# Module-level logger, named after this module; used below for the
# orchestrator's context load/save warnings.
sys_log = logging.getLogger(__name__)
|
||
|
||
# SQLAlchemy's text() has to be imported from sqlalchemy (it is bound to
# ``sql_text`` in this module); going through this helper avoids an F821
# (undefined name) lint error at the call sites.
def _make_text(sql: str):
    """Wrap a raw SQL string in a SQLAlchemy textual clause.

    Args:
        sql: SQL source, optionally containing ``:name`` bind parameters.

    Returns:
        Whatever ``sql_text(sql)`` produces, ready to pass to
        ``session.execute``.
    """
    clause = sql_text(sql)
    return clause
|
||
|
||
class AIOrchestrator:
    """Coordinate the L1 (Hermes) and L2 (NemoTron) agent flows for one event.

    Each handler follows the same steps:
      1) load the stored ``agent_context`` value for the given session_id
      2) merge that context into the incoming event
      3) call the matching agent
      4) write the resulting analysis back as the new context

    Choosing between L1 and L2 is left to the caller, which invokes
    ``handle_l1`` or ``handle_l2`` directly.
    """

    # Values written by _save_context for every stored snapshot.
    _CONTEXT_KEY = "latest"
    _CONTEXT_TTL_MINUTES = 1440  # 24h

    def __init__(self):
        """Create the Hermes analyst and NemoTron dispatcher instances."""
        self.hermes = HermesAnalystService()
        self.nemotron = NemotronDispatcher()

    async def handle_l1(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
        """L1: Hermes analysis (translation and suggestions).

        Args:
            event: Incoming event; it is shallow-copied, never mutated.
            session_id: Session whose stored "hermes" context is loaded
                before the call and overwritten after it.

        Returns:
            A dict with ``summary``, ``probable_cause`` and ``actions``.
            When Hermes returns nothing usable, a fixed fallback asking
            for manual confirmation is returned instead.
        """
        ctx = await self._load_context(session_id, "hermes")
        enriched = self._merge_context(event, ctx)
        result = await self.hermes._batch_analyze([enriched], pchome_prices={})
        analysis = self._build_l1_analysis(result)
        await self._save_context(session_id, "hermes", analysis)
        return analysis

    async def handle_l2(self, event: Dict[str, Any], session_id: str) -> Dict[str, Any]:
        """L2: NemoTron planning + review gate.

        Args:
            event: Incoming event; ``event["payload"]["sku"]`` is echoed
                into the returned plan when present.
            session_id: Session whose stored "nemotron" context is loaded
                before the call and overwritten after it.

        Returns:
            A dict with a ``plan`` sub-dict (type, sku, actions_taken,
            summary) and an empty top-level ``actions_taken`` list.
        """
        ctx = await self._load_context(session_id, "nemotron")
        enriched = self._merge_context(event, ctx)
        plan = await self.nemotron.dispatch([enriched], hermes_stats=None)
        analysis = self._build_l2_analysis(enriched, plan)
        await self._save_context(session_id, "nemotron", analysis)
        return analysis

    # ── Internal helpers ────────────────────────────────────────

    @staticmethod
    def _build_l1_analysis(result: Any) -> Dict[str, Any]:
        """Shape the first Hermes result into the L1 analysis dict."""
        first = result[0] if result else None
        if not first:
            return {"summary": "資訊不足", "probable_cause": "", "actions": ["請人工確認"]}
        action = first.get("recommended_action", "")
        return {
            "summary": first.get("risk", "UNKNOWN"),
            # NOTE(review): probable_cause mirrors recommended_action, as in
            # the original code — confirm whether Hermes exposes a separate
            # cause field that should be used here.
            "probable_cause": action,
            "actions": [action],
        }

    @staticmethod
    def _build_l2_analysis(enriched: Dict[str, Any], plan: Any) -> Dict[str, Any]:
        """Shape the dispatcher reply into the L2 analysis dict."""
        # Tolerate a dispatcher that returns None (or any non-dict) and an
        # event whose "payload" is missing or None, instead of raising.
        dispatched = plan.get("dispatched", 0) if isinstance(plan, dict) else 0
        payload = enriched.get("payload")
        sku = payload.get("sku", "") if isinstance(payload, dict) else ""
        return {
            "plan": {
                "type": "price_adjust",
                "sku": sku,
                "actions_taken": dispatched,
                # Counter word fixed: the original literal used "筄", a typo
                # for "筆".
                "summary": f"已提交 {dispatched} 筆審核建議",
            },
            "actions_taken": [],
        }

    @staticmethod
    def _decode_context(raw: Any) -> Dict[str, Any]:
        """Turn a stored ``context_val`` column value into a Python object.

        Empty/None values yield ``{}``. Text or bytes are parsed as JSON.
        Anything else is assumed to have been decoded by the DB driver
        already (e.g. a JSON-typed column) and is returned unchanged.

        Raises:
            ValueError: if a text value is not valid JSON
                (``json.JSONDecodeError`` is a subclass).
        """
        if not raw:
            return {}
        if isinstance(raw, (str, bytes, bytearray)):
            return json.loads(raw)
        return raw

    async def _load_context(self, session_id: str, agent: str) -> Dict[str, Any]:
        """Best-effort load of the newest stored context for session/agent.

        Any failure (including failing to obtain a session) is logged as a
        warning and reported as an empty context; it never propagates.

        NOTE(review): the session calls below are not awaited, so they
        appear to be synchronous and would block the event loop — confirm.
        NOTE(review): the query filters on neither context_key nor
        ttl_minutes, although _save_context writes both — confirm intent.
        """
        session = None
        try:
            # Acquired inside the try so a connection failure is handled
            # like every other load error.
            session = get_session()
            sql = _make_text("""
                SELECT context_val FROM agent_context
                WHERE session_id = :sid AND agent_name = :ag
                ORDER BY created_at DESC LIMIT 1
            """)
            row = session.execute(sql, {"sid": session_id, "ag": agent}).fetchone()
            return self._decode_context(row[0]) if row else {}
        except Exception as e:
            sys_log.warning("[Orchestrator] 載入 context 失敗: %s", e)
            return {}
        finally:
            if session is not None:
                session.close()

    async def _save_context(self, session_id: str, agent: str, data: Dict[str, Any]) -> None:
        """Best-effort upsert of ``data`` as the context for session/agent.

        The row is keyed by (session_id, agent_name, context_key); an
        existing row has its context_val and updated_at overwritten.
        Any failure is rolled back and logged as a warning; it never
        propagates.
        """
        session = None
        try:
            # Acquired inside the try so a connection failure is handled
            # like every other save error.
            session = get_session()
            session.execute(
                _make_text("""
                    INSERT INTO agent_context
                        (session_id, agent_name, context_key, context_val, created_at, ttl_minutes)
                    VALUES
                        (:sid, :ag, :ck, :cv, NOW(), :ttl)
                    ON CONFLICT (session_id, agent_name, context_key)
                    DO UPDATE SET context_val = :cv, updated_at = NOW()
                """),
                {
                    "sid": session_id,
                    "ag": agent,
                    "ck": self._CONTEXT_KEY,
                    "cv": json.dumps(data, ensure_ascii=False),
                    "ttl": self._CONTEXT_TTL_MINUTES,
                },
            )
            session.commit()
        except Exception as e:
            if session is not None:
                session.rollback()
            sys_log.warning("[Orchestrator] 寫入 context 失敗: %s", e)
        finally:
            if session is not None:
                session.close()

    def _merge_context(self, event: Dict[str, Any], ctx: Dict[str, Any]) -> Dict[str, Any]:
        """Simple merge: keep the event's keys and attach ctx as extra info.

        Returns a shallow copy of ``event``. A non-empty ``ctx`` is stored
        under the ``"_ctx"`` key (replacing any ``"_ctx"`` already present
        in the event); an empty ctx adds nothing.
        """
        merged = dict(event)
        if ctx:
            merged["_ctx"] = ctx
        return merged
|