Files
ewoooc/services/openclaw_strategist_service.py
OoO 0c2e9bbced
All checks were successful
CD Pipeline / deploy (push) Successful in 1m13s
串接 AI 洞察向量化與漏通知入口
2026-04-29 23:05:46 +08:00

1301 lines
52 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
services/openclaw_strategist_service.py
OpenClaw 戰略分析師Gemini 2.5 Flash
完整電商情報分析管線:
DB 爬蟲數據 + MCP 外部情報 → Gemini 深度分析 → ai_insights 持久化 → Telegram 推播
提供:
generate_weekly_strategy_report() — 週報(每週一 06:00
generate_meta_analysis_report() — AI 系統效能自我審視(每 6 小時)
分析維度:
1. 業績趨勢MoM / WoW
2. 競品價格比對
3. 定價策略建議
4. 行銷活動洞察
5. 季節性 / 節日機會
6. TOP 威脅 / 機會品項
7. 具體行動清單48h 優先事項)
"""
import json
import logging
import os
import requests
from datetime import datetime, timedelta
from typing import Any, Dict, List, Optional
from database.manager import get_session
from sqlalchemy import text
logger = logging.getLogger(__name__)
GEMINI_API_KEY = os.getenv("GEMINI_API_KEY", "")
STRATEGY_MODEL = os.getenv("OPENCLAW_MODEL", "gemini-2.5-flash")
NVIDIA_API_KEY = os.getenv("NVIDIA_API_KEY", "")
NVIDIA_NIM_URL = "https://integrate.api.nvidia.com/v1/chat/completions"
NVIDIA_FALLBACK_MODEL = "meta/llama-3.3-70b-instruct"
TAIPEI_TZ_OFFSET = 8 # UTC+8
__all__ = [
"generate_daily_report",
"generate_weekly_strategy_report",
"generate_monthly_report",
"generate_meta_analysis_report",
"generate_strategy_response",
]
# ═══════════════════════════════════════════════════════════════════════════════
# Telegram NLP 互動入口(輕量查詢,不走完整報告管線)
# ═══════════════════════════════════════════════════════════════════════════════
def generate_strategy_response(query: str, context: Optional[Dict[str, Any]] = None) -> str:
"""給 Telegram NLP 使用的輕量策略回覆。
Contract:
query: 使用者自然語言訊息(繁體中文)
context: 可選,{"intent": str, "user_id": int, ...}
Returns:
繁體中文回覆字串。GEMINI_API_KEY 未設或呼叫失敗時,回降級訊息
(永遠回字串,不拋例外,由呼叫端顯示於 Telegram
"""
q = (query or "").strip()
if not q:
return "請輸入您的問題,例如:本週業績趨勢、競品價差分析、產出週報 PPT。"
system_prompt = (
"你是 MOMO Pro 電商情報策略師「OpenClaw」。以繁體中文台灣用語回覆使用者。"
"嚴禁簡體字,嚴禁空洞套話。若使用者要求的資料需即時查詢,"
"請告知使用者相關可用指令(例如 /daily、/weekly、/threats"
"回覆長度控制在 500 字內,可用 Markdown 條列。"
)
user_prompt = f"使用者問題:{q}\n上下文:{json.dumps(context or {}, ensure_ascii=False)}"
# 優先 Gemini無 key 或失敗時自動備援 NVIDIA NIM
text_reply = None
if GEMINI_API_KEY:
try:
text_reply = _call_gemini(system_prompt, user_prompt, temperature=0.5)
except Exception as e:
logger.warning("[OpenClaw] Gemini 呼叫失敗,備援 NVIDIA NIM%s", e)
if not text_reply and NVIDIA_API_KEY:
try:
text_reply = _call_nvidia_nim(system_prompt, user_prompt)
except Exception as e:
logger.error("[OpenClaw] NVIDIA NIM 備援也失敗:%s", e)
if not text_reply:
return (
"策略師暫時無法回覆Gemini 與 NVIDIA NIM 均離線)。\n"
"請改用:/daily、/weekly、/threats 取得結構化報告。"
)
return text_reply
# ═══════════════════════════════════════════════════════════════════════════════
# DB 數據讀取層
# ═══════════════════════════════════════════════════════════════════════════════
def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]:
"""近 N 天業績彙總(本期 / 前期 對比)"""
session = get_session()
try:
rows = session.execute(text("""
SELECT
snapshot_date::date AS dt,
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count
FROM daily_sales_snapshot
WHERE snapshot_date::date >= CURRENT_DATE - :days
GROUP BY dt
ORDER BY dt DESC
"""), {"days": days}).fetchall()
data = [{"date": str(r[0]), "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)}
for r in rows]
mid = len(data) // 2
curr_rev = sum(d["revenue"] for d in data[:mid]) if mid else 0
prev_rev = sum(d["revenue"] for d in data[mid:]) if mid else 0
wow = ((curr_rev - prev_rev) / prev_rev * 100) if prev_rev else 0
return {
"daily": data[:7],
"current_7d_revenue": curr_rev,
"prev_7d_revenue": prev_rev,
"wow_pct": round(wow, 1),
}
except Exception as e:
logger.warning("[OpenClaw] 業績數據讀取失敗: %s", e)
return {}
finally:
session.close()
def _fetch_top_threats(limit: int = 10) -> List[Dict]:
"""最新 TOP N 競價威脅(來自 Hermes 分析)"""
session = get_session()
try:
rows = session.execute(text("""
SELECT product_sku, content, confidence, metadata_json, created_at
FROM ai_insights
WHERE insight_type = 'price_alert'
AND status = 'approved'
AND created_at >= NOW() - INTERVAL '48 hours'
ORDER BY confidence DESC
LIMIT :lim
"""), {"lim": limit}).fetchall()
result = []
for r in rows:
meta = {}
try:
meta = json.loads(r[3]) if r[3] else {}
except Exception:
pass
result.append({
"sku": r[0],
"summary": (r[1] or "")[:200],
"confidence": float(r[2] or 0),
"gap_pct": meta.get("gap_pct", 0),
"sales_delta": meta.get("sales_7d_delta_pct", 0),
"momo_price": meta.get("momo_price"),
"pchome_price": meta.get("pchome_price"),
})
return result
except Exception as e:
logger.warning("[OpenClaw] 威脅數據讀取失敗: %s", e)
return []
finally:
session.close()
def _fetch_top_recommendations(limit: int = 10) -> List[Dict]:
"""最新定價建議"""
session = get_session()
try:
rows = session.execute(text("""
SELECT sku, name, reason, strategy, confidence,
momo_price, pchome_price, gap_pct, sales_7d_delta
FROM ai_price_recommendations
WHERE status = 'pending'
AND created_at >= NOW() - INTERVAL '48 hours'
ORDER BY confidence DESC
LIMIT :lim
"""), {"lim": limit}).fetchall()
return [dict(zip(
["sku","name","reason","strategy","confidence","momo_price","pchome_price","gap_pct","sales_delta"],
r
)) for r in rows]
except Exception as e:
logger.warning("[OpenClaw] 建議數據讀取失敗: %s", e)
return []
finally:
session.close()
def _fetch_category_breakdown(days: int = 7) -> List[Dict]:
"""品類業績分佈"""
session = get_session()
try:
rows = session.execute(text("""
SELECT p.category,
SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue,
COUNT(DISTINCT p.i_code) AS sku_count
FROM daily_sales_snapshot s
JOIN products p ON p.name = s."商品名稱"
WHERE s.snapshot_date::date >= CURRENT_DATE - :days
AND p.status = 'ACTIVE'
GROUP BY p.category
ORDER BY revenue DESC
LIMIT 10
"""), {"days": days}).fetchall()
return [{"category": r[0], "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)}
for r in rows]
except Exception as e:
logger.warning("[OpenClaw] 品類數據讀取失敗: %s", e)
return []
finally:
session.close()
def _fetch_competitor_summary() -> Dict[str, Any]:
"""競品價格整體概況"""
session = get_session()
try:
row = session.execute(text("""
SELECT
COUNT(*) AS total,
AVG((cp.price - pr.price) / pr.price * 100) AS avg_gap_pct,
SUM(CASE WHEN cp.price < pr.price * 0.9 THEN 1 ELSE 0 END) AS undercut_count,
SUM(CASE WHEN cp.price > pr.price * 1.1 THEN 1 ELSE 0 END) AS premium_count
FROM competitor_prices cp
JOIN products p ON p.i_code = cp.sku
JOIN (
SELECT DISTINCT ON (product_id) product_id, price
FROM price_records ORDER BY product_id, timestamp DESC
) pr ON pr.product_id = p.id
WHERE cp.expires_at > NOW()
""")).fetchone()
if row and row[0]:
return {
"total_skus": int(row[0]),
"avg_gap_pct": round(float(row[1] or 0), 1),
"undercut_count": int(row[2] or 0),
"premium_count": int(row[3] or 0),
}
return {}
except Exception as e:
logger.warning("[OpenClaw] 競品概況讀取失敗: %s", e)
return {}
finally:
session.close()
# ═══════════════════════════════════════════════════════════════════════════════
# DB 寫入層
# ═══════════════════════════════════════════════════════════════════════════════
def _save_to_ai_insights(
insight_type: str,
content: str,
confidence: float,
metadata: Dict[str, Any],
period: Optional[str] = None,
) -> Optional[int]:
"""將分析結果持久化到 ai_insights"""
session = get_session()
try:
row = session.execute(text("""
INSERT INTO ai_insights
(insight_type, content, confidence, created_by, status,
metadata_json, period, created_at)
VALUES (:type, :content, :conf, 'openclaw', 'active', :meta, :period, NOW())
RETURNING id
"""), {
"type": insight_type,
"content": content[:8000],
"conf": confidence,
"meta": json.dumps(metadata, ensure_ascii=False),
"period": period or datetime.now().strftime("%Y-%m-%d"),
}).fetchone()
session.commit()
insight_id = row[0] if row else None
if insight_id:
try:
from services.openclaw_learning_service import enqueue_insight_embedding
enqueue_insight_embedding(insight_id, insight_type, content[:8000], period or datetime.now().strftime("%Y-%m-%d"))
except Exception as embed_err:
logger.warning("[OpenClaw] embedding queue enqueue failed id=%s: %s", insight_id, embed_err)
logger.info("[OpenClaw] ai_insights 寫入成功 id=%s type=%s", insight_id, insight_type)
return insight_id
except Exception as e:
logger.error("[OpenClaw] ai_insights 寫入失敗: %s", e)
session.rollback()
return None
finally:
session.close()
def _save_action_items(actions: List[str], source_insight_id: Optional[int]) -> None:
"""將 AI 建議的行動項目寫入 action_plans"""
if not actions:
return
session = get_session()
try:
for i, action in enumerate(actions[:10]):
session.execute(text("""
INSERT INTO action_plans
(action_type, description, status, priority, metadata_json, created_at)
VALUES ('openclaw_recommendation', :desc, 'pending', :priority, :meta, NOW())
"""), {
"desc": action[:500],
"priority": i + 1,
"meta": json.dumps({"source_insight_id": source_insight_id, "created_by": "openclaw"}),
})
session.commit()
logger.info("[OpenClaw] action_plans 寫入 %d", len(actions[:10]))
except Exception as e:
logger.warning("[OpenClaw] action_plans 寫入失敗: %s", e)
session.rollback()
finally:
session.close()
# ═══════════════════════════════════════════════════════════════════════════════
# Gemini 呼叫層
# ═══════════════════════════════════════════════════════════════════════════════
def _call_gemini(system_prompt: str, user_prompt: str, temperature: float = 0.4) -> Optional[str]:
"""呼叫 Gemini回傳文字失敗回傳 None"""
if not GEMINI_API_KEY:
logger.warning("[OpenClaw] GEMINI_API_KEY 未設定")
return None
try:
import google.generativeai as genai
genai.configure(api_key=GEMINI_API_KEY)
model = genai.GenerativeModel(
model_name=STRATEGY_MODEL,
generation_config=genai.types.GenerationConfig(
temperature=temperature,
max_output_tokens=4096,
),
system_instruction=system_prompt,
)
response = model.generate_content(
user_prompt,
request_options={"timeout": 180},
)
return response.text or ""
except Exception as e:
logger.error("[OpenClaw] Gemini 呼叫失敗: %s", e)
return None
def _call_nvidia_nim(system_prompt: str, user_prompt: str, temperature: float = 0.5) -> Optional[str]:
"""Gemini 離線時備援 NVIDIA NIM回傳文字失敗回傳 None"""
if not NVIDIA_API_KEY:
return None
try:
resp = requests.post(
NVIDIA_NIM_URL,
headers={
"Authorization": f"Bearer {NVIDIA_API_KEY}",
"Content-Type": "application/json",
},
json={
"model": NVIDIA_FALLBACK_MODEL,
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt},
],
"temperature": temperature,
"max_tokens": 1024,
},
timeout=60,
)
resp.raise_for_status()
return resp.json()["choices"][0]["message"]["content"]
except Exception as e:
logger.error("[OpenClaw] NVIDIA NIM 呼叫失敗: %s", e)
return None
# ═══════════════════════════════════════════════════════════════════════════════
# Telegram 推播
# ═══════════════════════════════════════════════════════════════════════════════
def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> None:
try:
from services.telegram_templates import report as tpl_report, _send_telegram_raw
# Telegram 訊息長度限制 4096分段發送
header = tpl_report(title, report_type, period, "")
chunks = _split_message(content, max_len=3800 - len(header))
for i, chunk in enumerate(chunks):
msg = tpl_report(title, report_type, period, chunk) if i == 0 else chunk
_send_telegram_raw(msg)
except Exception as e:
logger.error("[OpenClaw] Telegram 推播失敗: %s", e)
def _split_message(text: str, max_len: int = 3800) -> List[str]:
if len(text) <= max_len:
return [text]
chunks = []
while text:
chunks.append(text[:max_len])
text = text[max_len:]
return chunks
# ═══════════════════════════════════════════════════════════════════════════════
# 主要公開函式
# ═══════════════════════════════════════════════════════════════════════════════
def generate_weekly_strategy_report(
context: Optional[Any] = None,
force_tg_alert: bool = False,
) -> dict:
"""
OpenClaw 全景電商週報(每週一 06:00
流程:
1. 讀取 DB業績 / 競品 / 威脅 / 建議 / 品類
2. MCP 收集:外部市場趨勢 / 節日 / 競品動態
3. Gemini 2.5 Flash 深度分析
4. 持久化 → ai_insights + action_plans
5. Telegram 推播
"""
now = datetime.now()
period = f"{now.strftime('%Y年第%W週')} ({now.strftime('%m/%d')})"
logger.info("[OpenClaw] 週報任務啟動 period=%s", period)
# ── Step 1DB 數據收集 ──────────────────────────────────────────────────
sales = _fetch_sales_summary(14)
threats = _fetch_top_threats(10)
recommendations = _fetch_top_recommendations(10)
categories = _fetch_category_breakdown(7)
competitor_summary = _fetch_competitor_summary()
# ── Step 2MCP 外部情報 ─────────────────────────────────────────────────
mcp_data: Dict[str, str] = {}
try:
from services.mcp_collector_service import mcp_collector
mcp_data = mcp_collector.collect_all()
holiday_ctx = mcp_collector.get_holiday_context()
seasonal_ctx = mcp_collector.get_seasonal_context()
except Exception as e:
logger.warning("[OpenClaw] MCP 收集失敗(非阻塞): %s", e)
holiday_ctx = ""
seasonal_ctx = ""
# ── Step 3組建 Gemini Prompt ───────────────────────────────────────────
system_prompt = """你是 OpenClaw一位台灣頂尖電商戰略分析師專精於 momo 購物平台。
你的任務是根據真實業績數據、競品情報、外部市場趨勢,產出一份具體可執行的週報。
語言規定:
- 所有輸出必須使用繁體中文(台灣用語)
- 數字格式:金額用 NT$ 標示百分比保留1位小數
- 語氣:專業但不失親切,適合匯報給電商運營主管
分析原則:
- 每個洞察必須有數據支撐,禁止憑空推測
- 建議必須具體(時間、對象、行動、預期效益)
- 優先關注「可在 48 小時內執行」的行動項目"""
db_section = f"""
【DB 即時數據】
業績概況:
本週營收NT${sales.get('current_7d_revenue', 0):,.0f}
前週營收NT${sales.get('prev_7d_revenue', 0):,.0f}
週成長率:{sales.get('wow_pct', 0):+.1f}%
競品比對概況:
監控SKU總數{competitor_summary.get('total_skus', 0)}
平均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}%
被競品削價數:{competitor_summary.get('undercut_count', 0)}
我方具優勢數:{competitor_summary.get('premium_count', 0)}
TOP 威脅品項近48h Hermes 偵測):
{_format_threats(threats)}
待處理定價建議:
{_format_recommendations(recommendations)}
品類業績分佈(本週):
{_format_categories(categories)}
"""
mcp_section = f"""
【MCP 外部情報】
市場趨勢:
{mcp_data.get('market_trends', '(未取得)')[:600]}
競品動態:
{mcp_data.get('competitor_intel', '(未取得)')[:500]}
消費者情緒:
{mcp_data.get('consumer_sentiment', '(未取得)')[:400]}
定價策略參考:
{mcp_data.get('pricing_strategy', '(未取得)')[:400]}
節日行事曆:
{holiday_ctx}
{mcp_data.get('holiday_calendar', '')[:300]}
季節性洞察:
{seasonal_ctx}
{mcp_data.get('seasonal_insights', '')[:300]}
"""
user_prompt = f"""請根據以下數據,產出本週電商全景戰略週報:
{db_section}
{mcp_section}
請按以下結構輸出(每節使用 HTML <b> 標題,內容精簡扼要):
<b>📊 本週業績總結</b>
(關鍵指標 + WoW 變化 + 異常警示)
<b>🏆 TOP 機會品項</b>
具備提價或強推空間的品項3-5個含具體建議
<b>⚠️ TOP 威脅品項</b>
最需緊急處理的競品削價風險3-5個含建議行動
<b>💰 本週定價策略建議</b>
(整體定價方向 + 品類重點調整 + 心理定價應用)
<b>📢 行銷活動洞察</b>
(節日/季節機會 + 推薦活動形式 + 投放時機)
<b>📦 品類熱度分析</b>
(成長品類 vs 衰退品類 + 庫存備貨建議)
<b>🔮 市場競爭洞察</b>
(競品最新動態 + 平台策略差異 + 我方應對)
<b>🎯 48小時優先行動清單</b>
5-8條具體可執行任務格式[優先度] 行動說明 → 預期效益)
<b>📈 下週展望</b>
(風險提示 + 機會預告 + 需人工決策事項)
重要:語言必須是繁體中文,數據必須引用上方提供的實際數字。
"""
# ── Step 4Gemini 生成 ───────────────────────────────────────────────────
logger.info("[OpenClaw] 呼叫 Gemini %s 生成週報...", STRATEGY_MODEL)
report_content = _call_gemini(system_prompt, user_prompt, temperature=0.35)
if not report_content:
logger.error("[OpenClaw] Gemini 週報生成失敗")
return {"status": "error", "report_type": "weekly_strategy", "error": "Gemini 呼叫失敗"}
# ── Step 5解析行動清單 ─────────────────────────────────────────────────
action_items = _extract_action_items(report_content)
# ── Step 6持久化 DB ────────────────────────────────────────────────────
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"wow_pct": sales.get("wow_pct", 0),
"threat_count": len(threats),
"recommendation_count": len(recommendations),
"mcp_topics_collected": sum(1 for v in mcp_data.values() if v),
"action_count": len(action_items),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="weekly_strategy",
content=report_content,
confidence=0.88,
metadata=metadata,
period=now.strftime("%Y-%W"),
)
_save_action_items(action_items, insight_id)
# ── Step 7Telegram 推播 ────────────────────────────────────────────────
if force_tg_alert or True:
_send_strategy_telegram(
title="OpenClaw 電商全景週報",
report_type="weekly_strategy",
period=period,
content=report_content,
)
logger.info("[OpenClaw] 週報完成 insight_id=%s actions=%d", insight_id, len(action_items))
return {
"status": "ok",
"report_type": "weekly_strategy",
"insight_id": insight_id,
"period": period,
"action_count": len(action_items),
"summary": report_content[:300],
}
def generate_daily_report() -> dict:
"""
OpenClaw 電商日報(每日 09:00
流程:
1. 讀取昨日業績快照 + TOP 競品威脅 + 定價建議
2. Gemini 快速日報分析(溫度 0.3,精簡版)
3. 生成圖表近7日營收趨勢 + 競品價差柱圖
4. 持久化 ai_insightstype='daily_report'
5. Telegram 圖文推播
"""
now = datetime.now()
yesterday = now - timedelta(days=1)
period = yesterday.strftime("%Y年%m月%d")
logger.info("[OpenClaw] 日報任務啟動 period=%s", period)
# ── Step 1DB 數據收集 ──────────────────────────────────────────────────
sales = _fetch_sales_summary(7)
threats = _fetch_top_threats(5)
recommendations = _fetch_top_recommendations(5)
competitor_summary = _fetch_competitor_summary()
# 昨日單日業績
yesterday_sales = _fetch_yesterday_sales()
# ── Step 2組建 Gemini Prompt ───────────────────────────────────────────
system_prompt = """你是 OpenClaw 日報分析師,負責每日電商業績快報。
語言:繁體中文(台灣用語)。風格:精簡、數字導向、可執行。
每個洞察必須有數字支撐,禁止空泛描述。"""
user_prompt = f"""請根據以下數據,產出今日電商日報({period}
【昨日業績】
銷售金額NT${yesterday_sales.get('revenue', 0):,.0f}
成交SKU數{yesterday_sales.get('sku_count', 0)}
訂單數:{yesterday_sales.get('order_count', 0)}
【近7日趨勢】
本週累計NT${sales.get('current_7d_revenue', 0):,.0f}
前週同期NT${sales.get('prev_7d_revenue', 0):,.0f}
WoW變化{sales.get('wow_pct', 0):+.1f}%
【競品警示近24h Hermes偵測
{_format_threats(threats)}
【待處理定價建議TOP 5
{_format_recommendations(recommendations)}
【競品整體概況】
監控SKU{competitor_summary.get('total_skus', 0)}
被削價風險:{competitor_summary.get('undercut_count', 0)}價差超過10%
平均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}%
請按以下結構輸出(使用 HTML <b> 標題):
<b>📅 {period} 電商日報</b>
<b>📊 昨日業績快報</b>
(昨日關鍵數字 + 與近期均值比較 + 異常點說明)
<b>⚠️ 今日最高優先威脅</b>
(最緊急的 1-3 個競品削價威脅,含具體行動建議)
<b>💰 今日定價行動建議</b>
1-3 條今日應執行的調價動作格式SKU → 建議行動 → 預期效果)
<b>🎯 今日 3 件事</b>
(最重要的 3 件可執行任務24h內完成
語言繁體中文全文200字以內精準扼要。
"""
# ── Step 3Gemini 生成 ───────────────────────────────────────────────────
logger.info("[OpenClaw] 呼叫 Gemini 生成日報...")
report_content = _call_gemini(system_prompt, user_prompt, temperature=0.3)
if not report_content:
logger.error("[OpenClaw] 日報 Gemini 呼叫失敗")
return {"status": "error", "report_type": "daily_report", "error": "Gemini 呼叫失敗"}
# ── Step 4生成圖表 ─────────────────────────────────────────────────────
charts = []
try:
from services.chart_generator_service import (
revenue_trend_chart,
price_gap_bar_chart,
)
rev_chart = revenue_trend_chart(7, "近7日")
if rev_chart:
charts.append(("revenue_7d.png", rev_chart, "📈 近7日營收趨勢"))
if threats:
gap_chart = price_gap_bar_chart(threats, "競品價差警示TOP 5")
if gap_chart:
charts.append(("price_gap.png", gap_chart, "⚠️ 競品價差分析"))
except Exception as e:
logger.warning("[OpenClaw] 日報圖表生成失敗(非阻塞): %s", e)
# ── Step 5持久化 DB ────────────────────────────────────────────────────
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"yesterday_revenue": yesterday_sales.get("revenue", 0),
"wow_pct": sales.get("wow_pct", 0),
"threat_count": len(threats),
"chart_count": len(charts),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="daily_report",
content=report_content,
confidence=0.85,
metadata=metadata,
period=yesterday.strftime("%Y-%m-%d"),
)
action_items = _extract_action_items_daily(report_content)
_save_action_items(action_items, insight_id)
# ── Step 6Telegram 推播(圖文)────────────────────────────────────────
try:
from services.telegram_templates import (
daily_report_header,
send_report_with_charts,
_get_chat_ids,
)
header = daily_report_header(
date_str=period,
revenue=yesterday_sales.get("revenue", 0),
wow=sales.get("wow_pct", 0),
threat_count=len(threats),
opportunity_count=competitor_summary.get("premium_count", 0),
)
full_msg = header + "\n\n" + report_content
if charts:
send_report_with_charts(full_msg, charts, _get_chat_ids())
else:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(full_msg)
except Exception as e:
logger.error("[OpenClaw] 日報 Telegram 推播失敗: %s", e)
logger.info("[OpenClaw] 日報完成 insight_id=%s charts=%d", insight_id, len(charts))
return {
"status": "ok",
"report_type": "daily_report",
"insight_id": insight_id,
"period": period,
"chart_count": len(charts),
"action_count": len(action_items),
}
def generate_monthly_report() -> dict:
"""
OpenClaw 電商月報每月1日 07:00
流程:
1. 讀取上月完整業績 + 品類分佈 + 競品趨勢
2. MCP 收集月度外部情報
3. Gemini 深度月度分析(完整版)
4. 生成圖表:月度概覽 + 品類橫條 + 價格熱圖
5. 持久化 ai_insightstype='monthly_report'
6. Telegram 圖文推播
"""
now = datetime.now()
# 上個月
first_of_this_month = now.replace(day=1)
last_month_end = first_of_this_month - timedelta(days=1)
last_month_start = last_month_end.replace(day=1)
period = last_month_end.strftime("%Y年%m月")
logger.info("[OpenClaw] 月報任務啟動 period=%s", period)
# ── Step 1DB 數據收集(上月完整數據)─────────────────────────────────
days_in_month = (first_of_this_month - last_month_start).days
sales = _fetch_monthly_sales_summary(last_month_start, last_month_end)
categories = _fetch_category_breakdown(days_in_month)
threats = _fetch_top_threats(10)
competitor_summary = _fetch_competitor_summary()
price_trend_data = _fetch_price_trend_summary(days_in_month)
# ── Step 2MCP 外部情報(月度版)───────────────────────────────────────
mcp_data: Dict[str, str] = {}
try:
from services.mcp_collector_service import mcp_collector
mcp_data = mcp_collector.collect_all()
holiday_ctx = mcp_collector.get_holiday_context()
seasonal_ctx = mcp_collector.get_seasonal_context()
except Exception as e:
logger.warning("[OpenClaw] 月報 MCP 收集失敗(非阻塞): %s", e)
holiday_ctx = ""
seasonal_ctx = ""
# ── Step 3組建 Gemini Prompt ───────────────────────────────────────────
system_prompt = """你是 OpenClaw 月報首席分析師,負責 momo 平台電商月度深度報告。
語言繁體中文台灣用語。格式HTML標題 + 條列式數據。
每個洞察必須有月度數字支撐,並與上月/去年同期比較。
重點:月度趨勢、品類策略、定價最佳化、下月行動計畫。"""
db_section = f"""
{period} 業績總覽】
月營收NT${sales.get('revenue', 0):,.0f}
MoM 變化:{sales.get('mom_pct', 0):+.1f}%
YoY 變化:{sales.get('yoy_pct', 0):+.1f}%
活躍SKU數{sales.get('sku_count', 0)}
平均客單價NT${sales.get('avg_order_value', 0):,.0f}
【品類業績分佈TOP 10
{_format_categories(categories)}
【競品整體概況】
監控SKU{competitor_summary.get('total_skus', 0)}
月均價差:{competitor_summary.get('avg_gap_pct', 0):+.1f}%
被削價風險SKU{competitor_summary.get('undercut_count', 0)}
【價格變動概況】
本月調價次數:{price_trend_data.get('price_changes', 0)}
平均調幅:{price_trend_data.get('avg_change_pct', 0):+.1f}%
主動降價SKU{price_trend_data.get('price_cuts', 0)}
主動提價SKU{price_trend_data.get('price_raises', 0)}
"""
mcp_section = f"""
【MCP 外部情報(月度)】
市場趨勢:
{mcp_data.get('market_trends', '(未取得)')[:600]}
競品動態:
{mcp_data.get('competitor_intel', '(未取得)')[:500]}
下月節日行事曆:
{holiday_ctx}
{mcp_data.get('holiday_calendar', '')[:400]}
季節性洞察:
{seasonal_ctx}
{mcp_data.get('seasonal_insights', '')[:400]}
"""
user_prompt = f"""請根據以下數據,產出 {period} 電商月度策略報告:
{db_section}
{mcp_section}
請按以下結構輸出(使用 HTML <b> 標題,詳細分析):
<b>📅 {period} 電商月度報告</b>
<b>📊 月度業績總結</b>
(月營收 + MoM/YoY 變化 + 超出/低於預期分析 + 關鍵驅動因素)
<b>🏆 本月品類贏家 vs 輸家</b>
成長最快3個品類 vs 衰退最嚴重3個品類含原因分析
<b>💰 本月定價策略回顧</b>
(調價效果評估 + 最佳定價案例 + 失誤案例 + 改進建議)
<b>⚔️ 競品月度分析</b>
(主要競爭對手動態 + 我方優劣勢 + 市場份額評估)
<b>📢 行銷效益評估</b>
(本月活動效果 + ROI 估算 + 最佳行銷時機分析)
<b>🔮 下月趨勢預測</b>
(季節性機會 + 節日活動規劃 + 風險預警 + 庫存建議)
<b>🎯 下月優先行動計畫</b>
8-10條具體可執行任務含時間節點和負責方向
格式:[週次/日期] 行動說明 → 預期效益)
<b>📈 Q{((now.month-1)//3)+1} 策略展望</b>
(季度目標設定 + 關鍵里程碑 + 需人工決策事項)
語言:繁體中文,數據必須引用上方提供的實際數字。
"""
# ── Step 4Gemini 生成 ───────────────────────────────────────────────────
logger.info("[OpenClaw] 呼叫 Gemini 生成月報...")
report_content = _call_gemini(system_prompt, user_prompt, temperature=0.35)
if not report_content:
logger.error("[OpenClaw] 月報 Gemini 呼叫失敗")
return {"status": "error", "report_type": "monthly_report", "error": "Gemini 呼叫失敗"}
# ── Step 5生成圖表 ─────────────────────────────────────────────────────
charts = []
try:
from services.chart_generator_service import (
monthly_overview_chart,
category_revenue_chart,
price_history_heatmap,
)
overview_chart = monthly_overview_chart(6)
if overview_chart:
charts.append(("monthly_overview.png", overview_chart, f"📊 近6個月營收趨勢"))
cat_chart = category_revenue_chart(days_in_month, period)
if cat_chart:
charts.append(("category_revenue.png", cat_chart, "🏆 品類業績分佈"))
heatmap = price_history_heatmap(days_in_month)
if heatmap:
charts.append(("price_heatmap.png", heatmap, "🔥 品類價格熱圖"))
except Exception as e:
logger.warning("[OpenClaw] 月報圖表生成失敗(非阻塞): %s", e)
# ── Step 6持久化 DB ────────────────────────────────────────────────────
action_items = _extract_action_items(report_content)
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"monthly_revenue": sales.get("revenue", 0),
"mom_pct": sales.get("mom_pct", 0),
"yoy_pct": sales.get("yoy_pct", 0),
"category_count": len(categories),
"chart_count": len(charts),
"mcp_topics_collected": sum(1 for v in mcp_data.values() if v),
"action_count": len(action_items),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="monthly_report",
content=report_content,
confidence=0.90,
metadata=metadata,
period=last_month_end.strftime("%Y-%m"),
)
_save_action_items(action_items, insight_id)
# ── Step 7Telegram 推播(圖文)────────────────────────────────────────
try:
from services.telegram_templates import (
monthly_report_header,
send_report_with_charts,
_get_chat_ids,
)
top3 = [c.get("category", "N/A") for c in categories[:3]] or ["N/A"]
header = monthly_report_header(
month_str=period,
revenue=sales.get("revenue", 0),
mom=sales.get("mom_pct", 0),
yoy=sales.get("yoy_pct", 0),
top3_categories=top3,
)
full_msg = header + "\n\n" + report_content
if charts:
send_report_with_charts(full_msg, charts, _get_chat_ids())
else:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(full_msg)
except Exception as e:
logger.error("[OpenClaw] 月報 Telegram 推播失敗: %s", e)
logger.info("[OpenClaw] 月報完成 insight_id=%s charts=%d actions=%d",
insight_id, len(charts), len(action_items))
return {
"status": "ok",
"report_type": "monthly_report",
"insight_id": insight_id,
"period": period,
"chart_count": len(charts),
"action_count": len(action_items),
}
def generate_meta_analysis_report() -> str:
"""
AI 系統效能自我審視(每 6 小時 run_openclaw_meta_analysis_task 呼叫)
分析 ai_insights 近期累積資料,評估:
- 各 Agent 預測準確率
- 價格建議執行率
- 告警品質與誤報率
- 系統盲區與改進方向
結果持久化至 ai_insightstype='meta_analysis'),並推播 Telegram。
"""
now = datetime.now()
period = now.strftime("%Y-%m-%d %H:00")
logger.info("[OpenClaw] Meta-Analysis 任務啟動 %s", period)
# ── 讀取近期 ai_insights 摘要 ────────────────────────────────────────────
session = get_session()
try:
stats = session.execute(text("""
SELECT
insight_type,
created_by,
COUNT(*) AS total,
AVG(confidence) AS avg_conf,
SUM(CASE WHEN status='active' THEN 1 ELSE 0 END) AS active_cnt,
SUM(CASE WHEN status='relearn' THEN 1 ELSE 0 END) AS relearn_cnt,
MAX(created_at) AS latest
FROM ai_insights
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY insight_type, created_by
ORDER BY total DESC
""")).fetchall()
action_stats = session.execute(text("""
SELECT status, COUNT(*) AS cnt
FROM action_plans
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY status
""")).fetchall()
reco_stats = session.execute(text("""
SELECT status, COUNT(*) AS cnt, AVG(confidence) AS avg_conf
FROM ai_price_recommendations
WHERE created_at >= NOW() - INTERVAL '24 hours'
GROUP BY status
""")).fetchall()
except Exception as e:
logger.warning("[OpenClaw] Meta 數據讀取失敗: %s", e)
stats, action_stats, reco_stats = [], [], []
finally:
session.close()
# ── 組建 Prompt ───────────────────────────────────────────────────────────
system_prompt = """你是 OpenClaw 自我審視模組,負責分析 AI 多智能體系統的近期表現。
請用繁體中文,以電商 AI 系統架構師的視角撰寫分析報告,語氣客觀、聚焦問題與改進。"""
stats_text = "\n".join([
f" {r[0]} ({r[1]}): 共{r[2]}筆, 平均信心{r[3]:.2f}, 活躍{r[4]}, 重學{r[5]}"
for r in stats
]) or " (無近期數據)"
action_text = "\n".join([
f" {r[0]}: {r[1]}" for r in action_stats
]) or " (無近期數據)"
reco_text = "\n".join([
f" {r[0]}: {r[1]} 筆, 平均信心{r[2]:.2f}" for r in reco_stats
]) or " (無近期數據)"
user_prompt = f"""請分析以下 AI 系統近 24 小時運作數據,產出自我審視報告:
【ai_insights 產出統計】
{stats_text}
【action_plans 執行狀況】
{action_text}
【ai_price_recommendations 建議狀況】
{reco_text}
【分析時間】{period}
請按以下結構輸出:
<b>🤖 AI 系統效能自我審視報告</b>
時間:{period}
<b>📊 各 Agent 產出統計</b>
(逐一評估 Hermes/NemoTron/OpenClaw/ElephantAlpha 的輸出品質)
<b>⚠️ 偵測到的系統問題</b>
(誤報、漏報、重學事件分析)
<b>💡 盲區與改進建議</b>
(哪些場景 AI 表現不足?建議優化方向)
<b>✅ 本週期亮點</b>
(表現良好的分析案例)
<b>🔧 技術債與優化優先順序</b>
1-3 項具體技術改進建議)
語言繁體中文200字以內精簡扼要。
"""
# ── Gemini 生成 ──────────────────────────────────────────────────────────
report_content = _call_gemini(system_prompt, user_prompt, temperature=0.3)
if not report_content:
logger.error("[OpenClaw] Meta-Analysis Gemini 呼叫失敗")
return "Meta-Analysis 生成失敗)"
# ── 持久化 DB ─────────────────────────────────────────────────────────────
metadata = {
"period": period,
"model": STRATEGY_MODEL,
"insight_types_analyzed": len(stats),
"generated_at": now.isoformat(),
}
insight_id = _save_to_ai_insights(
insight_type="meta_analysis",
content=report_content,
confidence=0.85,
metadata=metadata,
period=now.strftime("%Y-%m-%d"),
)
# ── Telegram 推播 ────────────────────────────────────────────────────────
try:
from services.telegram_templates import _send_telegram_raw
_send_telegram_raw(report_content)
except Exception as e:
logger.error("[OpenClaw] Meta-Analysis Telegram 推播失敗: %s", e)
logger.info("[OpenClaw] Meta-Analysis 完成 insight_id=%s", insight_id)
return report_content
# ═══════════════════════════════════════════════════════════════════════════════
# 輔助格式化函式
# ═══════════════════════════════════════════════════════════════════════════════
def _fetch_yesterday_sales() -> Dict[str, Any]:
"""昨日單日業績"""
session = get_session()
try:
row = session.execute(text("""
SELECT
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count,
COUNT(*) AS order_count
FROM daily_sales_snapshot
WHERE snapshot_date::date = CURRENT_DATE - 1
""")).fetchone()
if row:
return {
"revenue": float(row[0] or 0),
"sku_count": int(row[1] or 0),
"order_count": int(row[2] or 0),
}
return {"revenue": 0, "sku_count": 0, "order_count": 0}
except Exception as e:
logger.warning("[OpenClaw] 昨日業績讀取失敗: %s", e)
return {"revenue": 0, "sku_count": 0, "order_count": 0}
finally:
session.close()
def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Dict[str, Any]:
"""上月業績彙總,含 MoM / YoY 比較"""
session = get_session()
try:
row = session.execute(text("""
SELECT
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
COUNT(DISTINCT "商品ID") AS sku_count,
COUNT(*) AS order_count,
AVG(COALESCE("銷售金額"::numeric, 0)) AS avg_order_value
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": start_date.date(), "end": end_date.date()}).fetchone()
revenue = float(row[0] or 0) if row else 0
sku_count = int(row[1] or 0) if row else 0
avg_order_value = float(row[3] or 0) if row else 0
# 上上月MoM
prev_start = (start_date - timedelta(days=1)).replace(day=1)
prev_end = start_date - timedelta(days=1)
prev_row = session.execute(text("""
SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": prev_start.date(), "end": prev_end.date()}).fetchone()
prev_revenue = float(prev_row[0] or 0) if prev_row else 0
mom_pct = ((revenue - prev_revenue) / prev_revenue * 100) if prev_revenue else 0
# 去年同月YoY
yoy_start = start_date.replace(year=start_date.year - 1)
yoy_end = end_date.replace(year=end_date.year - 1)
yoy_row = session.execute(text("""
SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
FROM daily_sales_snapshot
WHERE snapshot_date::date BETWEEN :start AND :end
"""), {"start": yoy_start.date(), "end": yoy_end.date()}).fetchone()
yoy_revenue = float(yoy_row[0] or 0) if yoy_row else 0
yoy_pct = ((revenue - yoy_revenue) / yoy_revenue * 100) if yoy_revenue else 0
return {
"revenue": revenue,
"sku_count": sku_count,
"order_count": int(row[2] or 0) if row else 0,
"avg_order_value": avg_order_value,
"mom_pct": round(mom_pct, 1),
"yoy_pct": round(yoy_pct, 1),
}
except Exception as e:
logger.warning("[OpenClaw] 月度業績讀取失敗: %s", e)
return {"revenue": 0, "sku_count": 0, "order_count": 0,
"avg_order_value": 0, "mom_pct": 0, "yoy_pct": 0}
finally:
session.close()
def _fetch_price_trend_summary(days: int = 30) -> Dict[str, Any]:
"""近N天價格異動統計"""
session = get_session()
try:
row = session.execute(text("""
SELECT
COUNT(*) AS total_changes,
AVG(ABS(pr2.price - pr1.price) / pr1.price * 100) AS avg_change_pct,
SUM(CASE WHEN pr2.price < pr1.price THEN 1 ELSE 0 END) AS price_cuts,
SUM(CASE WHEN pr2.price > pr1.price THEN 1 ELSE 0 END) AS price_raises
FROM price_records pr2
JOIN price_records pr1 ON pr1.product_id = pr2.product_id
AND pr1.timestamp = (
SELECT MAX(timestamp) FROM price_records
WHERE product_id = pr2.product_id
AND timestamp < pr2.timestamp - INTERVAL '1 day'
)
WHERE pr2.timestamp >= NOW() - :days * INTERVAL '1 day'
AND ABS(pr2.price - pr1.price) / pr1.price > 0.005
"""), {"days": days}).fetchone()
if row and row[0]:
return {
"price_changes": int(row[0]),
"avg_change_pct": round(float(row[1] or 0), 1),
"price_cuts": int(row[2] or 0),
"price_raises": int(row[3] or 0),
}
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
except Exception as e:
logger.warning("[OpenClaw] 價格趨勢統計讀取失敗: %s", e)
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
finally:
session.close()
def _extract_action_items_daily(report_text: str) -> List[str]:
"""從日報文字中解析「今日3件事」行動項目"""
lines = report_text.split("\n")
items = []
in_action_section = False
for line in lines:
if "今日" in line and ("3件" in line or "行動" in line or "優先" in line):
in_action_section = True
continue
if in_action_section:
stripped = line.strip()
if stripped.startswith(("", "-", "1.", "2.", "3.", "", "", "")):
items.append(stripped.lstrip("•-①②③").strip().lstrip("123.").strip())
elif stripped.startswith("<b>") and items:
break
return items[:5]
def _format_threats(threats: List[Dict]) -> str:
if not threats:
return " (無近期競價威脅)"
lines = []
for t in threats[:5]:
lines.append(
f" • SKU {t['sku']}:價差 {t.get('gap_pct', 0):+.1f}%"
f"業績週變化 {t.get('sales_delta', 0):+.1f}%"
f"信心 {t.get('confidence', 0):.2f}"
)
return "\n".join(lines)
def _format_recommendations(recs: List[Dict]) -> str:
if not recs:
return " (無待處理定價建議)"
lines = []
for r in recs[:5]:
lines.append(
f"{r.get('name', r.get('sku', ''))[:30]}{r.get('strategy', '')}"
f"信心 {r.get('confidence', 0):.2f}"
)
return "\n".join(lines)
def _format_categories(cats: List[Dict]) -> str:
if not cats:
return " (無品類數據)"
lines = []
for c in cats[:5]:
lines.append(
f"{c.get('category', '未分類')}"
f"NT${c.get('revenue', 0):,.0f}{c.get('sku_count', 0)} 個 SKU"
)
return "\n".join(lines)
def _extract_action_items(report_text: str) -> List[str]:
"""從報告文字中解析行動清單48小時優先行動"""
lines = report_text.split("\n")
items = []
in_action_section = False
for line in lines:
if "48小時" in line or "優先行動" in line:
in_action_section = True
continue
if in_action_section:
stripped = line.strip()
if stripped.startswith("") or stripped.startswith("-") or stripped.startswith("["):
items.append(stripped.lstrip("•-").strip())
elif stripped.startswith("<b>") and items:
break
return items[:8]