fix(ea): execute Phase 2 B-series data quality and gate improvements
All checks were successful
CD Pipeline / deploy (push) Successful in 2m40s
All checks were successful
CD Pipeline / deploy (push) Successful in 2m40s
- B1 & B2: Updated SQL column names from 銷售金額 to 總業績 in openclaw_strategist_service.py and chart_generator_service.py - B3: Removed bare except statements in DB fetchers to raise errors instead of failing silently - B4: Implemented freshness gate (MAX(snapshot_date) < CURRENT_DATE - 2) in daily_sales_snapshot to prevent generating stale reports and send data stalled alerts - B5: Replaced hardcoded 45.0 system load percentage with actual psutil CPU metric
This commit is contained in:
@@ -129,7 +129,7 @@ def _fetch_daily_revenue(days: int = 30) -> List[Dict]:
|
|||||||
try:
|
try:
|
||||||
rows = session.execute(text(f"""
|
rows = session.execute(text(f"""
|
||||||
SELECT snapshot_date::date AS dt,
|
SELECT snapshot_date::date AS dt,
|
||||||
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
|
SUM(COALESCE("總業績"::numeric, 0)) AS revenue
|
||||||
FROM daily_sales_snapshot
|
FROM daily_sales_snapshot
|
||||||
WHERE snapshot_date::date >= CURRENT_DATE - {days}
|
WHERE snapshot_date::date >= CURRENT_DATE - {days}
|
||||||
GROUP BY dt ORDER BY dt
|
GROUP BY dt ORDER BY dt
|
||||||
@@ -150,7 +150,7 @@ def _fetch_category_revenue(days: int = 7) -> List[Dict]:
|
|||||||
try:
|
try:
|
||||||
rows = session.execute(text(f"""
|
rows = session.execute(text(f"""
|
||||||
SELECT p.category,
|
SELECT p.category,
|
||||||
SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue
|
SUM(COALESCE(s."總業績"::numeric, 0)) AS revenue
|
||||||
FROM daily_sales_snapshot s
|
FROM daily_sales_snapshot s
|
||||||
JOIN products p ON p.name = s."商品名稱"
|
JOIN products p ON p.name = s."商品名稱"
|
||||||
WHERE s.snapshot_date::date >= CURRENT_DATE - {days}
|
WHERE s.snapshot_date::date >= CURRENT_DATE - {days}
|
||||||
@@ -175,7 +175,7 @@ def _fetch_monthly_revenue(months: int = 6) -> List[Dict]:
|
|||||||
try:
|
try:
|
||||||
rows = session.execute(text(f"""
|
rows = session.execute(text(f"""
|
||||||
SELECT DATE_TRUNC('month', snapshot_date)::date AS mon,
|
SELECT DATE_TRUNC('month', snapshot_date)::date AS mon,
|
||||||
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
|
SUM(COALESCE("總業績"::numeric, 0)) AS revenue
|
||||||
FROM daily_sales_snapshot
|
FROM daily_sales_snapshot
|
||||||
WHERE snapshot_date >= NOW() - INTERVAL '{months} months'
|
WHERE snapshot_date >= NOW() - INTERVAL '{months} months'
|
||||||
GROUP BY mon ORDER BY mon
|
GROUP BY mon ORDER BY mon
|
||||||
|
|||||||
@@ -879,7 +879,11 @@ class ElephantAlphaAutonomousEngine:
|
|||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
def _get_system_load_percentage(self) -> float:
|
def _get_system_load_percentage(self) -> float:
|
||||||
return 45.0
|
try:
|
||||||
|
import psutil
|
||||||
|
return float(psutil.cpu_percent(interval=0.1))
|
||||||
|
except ImportError:
|
||||||
|
return min(90.0, float(self._get_action_queue_size() * 5.0))
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
async def _run_with_timeout(coro, *args, timeout: int = 30, **kwargs):
|
async def _run_with_timeout(coro, *args, timeout: int = 30, **kwargs):
|
||||||
|
|||||||
@@ -29,7 +29,7 @@ from datetime import datetime, timedelta
|
|||||||
from typing import Any, Dict, List, Optional
|
from typing import Any, Dict, List, Optional
|
||||||
|
|
||||||
from database.manager import get_session
|
from database.manager import get_session
|
||||||
from sqlalchemy import text
|
from sqlalchemy import bindparam, text
|
||||||
|
|
||||||
logger = logging.getLogger(__name__)
|
logger = logging.getLogger(__name__)
|
||||||
|
|
||||||
@@ -105,10 +105,16 @@ def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]:
|
|||||||
"""近 N 天業績彙總(本期 / 前期 對比)"""
|
"""近 N 天業績彙總(本期 / 前期 對比)"""
|
||||||
session = get_session()
|
session = get_session()
|
||||||
try:
|
try:
|
||||||
|
max_date_row = session.execute(text("SELECT MAX(snapshot_date::date) FROM daily_sales_snapshot")).fetchone()
|
||||||
|
max_date = max_date_row[0] if max_date_row and max_date_row[0] else None
|
||||||
|
|
||||||
|
if not max_date or max_date < (datetime.now().date() - timedelta(days=2)):
|
||||||
|
return {"stale": True, "last_date": str(max_date) if max_date else "None"}
|
||||||
|
|
||||||
rows = session.execute(text("""
|
rows = session.execute(text("""
|
||||||
SELECT
|
SELECT
|
||||||
snapshot_date::date AS dt,
|
snapshot_date::date AS dt,
|
||||||
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
|
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
|
||||||
COUNT(DISTINCT "商品ID") AS sku_count
|
COUNT(DISTINCT "商品ID") AS sku_count
|
||||||
FROM daily_sales_snapshot
|
FROM daily_sales_snapshot
|
||||||
WHERE snapshot_date::date >= CURRENT_DATE - :days
|
WHERE snapshot_date::date >= CURRENT_DATE - :days
|
||||||
@@ -131,8 +137,8 @@ def _fetch_sales_summary(days: int = 14) -> Dict[str, Any]:
|
|||||||
"wow_pct": round(wow, 1),
|
"wow_pct": round(wow, 1),
|
||||||
}
|
}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[OpenClaw] 業績數據讀取失敗: %s", e)
|
logger.error("[OpenClaw] 業績數據讀取失敗: %s", e)
|
||||||
return {}
|
raise
|
||||||
finally:
|
finally:
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
@@ -169,8 +175,8 @@ def _fetch_top_threats(limit: int = 10) -> List[Dict]:
|
|||||||
})
|
})
|
||||||
return result
|
return result
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[OpenClaw] 威脅數據讀取失敗: %s", e)
|
logger.error("[OpenClaw] 威脅數據讀取失敗: %s", e)
|
||||||
return []
|
raise
|
||||||
finally:
|
finally:
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
@@ -193,8 +199,8 @@ def _fetch_top_recommendations(limit: int = 10) -> List[Dict]:
|
|||||||
r
|
r
|
||||||
)) for r in rows]
|
)) for r in rows]
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[OpenClaw] 建議數據讀取失敗: %s", e)
|
logger.error("[OpenClaw] 建議數據讀取失敗: %s", e)
|
||||||
return []
|
raise
|
||||||
finally:
|
finally:
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
@@ -205,7 +211,7 @@ def _fetch_category_breakdown(days: int = 7) -> List[Dict]:
|
|||||||
try:
|
try:
|
||||||
rows = session.execute(text("""
|
rows = session.execute(text("""
|
||||||
SELECT p.category,
|
SELECT p.category,
|
||||||
SUM(COALESCE(s."銷售金額"::numeric, 0)) AS revenue,
|
SUM(COALESCE(s."總業績"::numeric, 0)) AS revenue,
|
||||||
COUNT(DISTINCT p.i_code) AS sku_count
|
COUNT(DISTINCT p.i_code) AS sku_count
|
||||||
FROM daily_sales_snapshot s
|
FROM daily_sales_snapshot s
|
||||||
JOIN products p ON p.name = s."商品名稱"
|
JOIN products p ON p.name = s."商品名稱"
|
||||||
@@ -218,8 +224,8 @@ def _fetch_category_breakdown(days: int = 7) -> List[Dict]:
|
|||||||
return [{"category": r[0], "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)}
|
return [{"category": r[0], "revenue": float(r[1] or 0), "sku_count": int(r[2] or 0)}
|
||||||
for r in rows]
|
for r in rows]
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[OpenClaw] 品類數據讀取失敗: %s", e)
|
logger.error("[OpenClaw] 品類數據讀取失敗: %s", e)
|
||||||
return []
|
raise
|
||||||
finally:
|
finally:
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
@@ -251,8 +257,8 @@ def _fetch_competitor_summary() -> Dict[str, Any]:
|
|||||||
}
|
}
|
||||||
return {}
|
return {}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[OpenClaw] 競品概況讀取失敗: %s", e)
|
logger.error("[OpenClaw] 競品概況讀取失敗: %s", e)
|
||||||
return {}
|
raise
|
||||||
finally:
|
finally:
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
@@ -302,6 +308,239 @@ def _save_to_ai_insights(
|
|||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _find_existing_weekly_strategy(
|
||||||
|
period: str,
|
||||||
|
sent_only: bool = False,
|
||||||
|
) -> Optional[Dict[str, Any]]:
|
||||||
|
"""查詢同一週期最新已啟用週報(不重複生成)。
|
||||||
|
|
||||||
|
`sent_only` 主要保留相容性;舊邏輯曾依 telegram_sent 去阻擋重複推播,
|
||||||
|
現在改為只取最新 active/approved 記錄,避免「內容已存在仍重打」。
|
||||||
|
"""
|
||||||
|
session = get_session()
|
||||||
|
try:
|
||||||
|
row = session.execute(text("""
|
||||||
|
SELECT id, created_at
|
||||||
|
FROM ai_insights
|
||||||
|
WHERE insight_type = 'weekly_strategy'
|
||||||
|
AND created_by = 'openclaw'
|
||||||
|
AND period = :period
|
||||||
|
AND status IN ('active', 'approved')
|
||||||
|
ORDER BY created_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
"""), {"period": period}).fetchone()
|
||||||
|
if not row:
|
||||||
|
return None
|
||||||
|
return {"id": row[0], "created_at": row[1]}
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("[OpenClaw] 週報去重查詢失敗 period=%s: %s", period, e)
|
||||||
|
return None
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _load_weekly_strategy_payload(period: str) -> Optional[Dict[str, Any]]:
|
||||||
|
"""載入同一週期最新已啟用週報正文與 metadata(供重用/直接回傳)。"""
|
||||||
|
session = get_session()
|
||||||
|
try:
|
||||||
|
row = session.execute(text("""
|
||||||
|
SELECT id, content, metadata_json, created_at
|
||||||
|
FROM ai_insights
|
||||||
|
WHERE insight_type = 'weekly_strategy'
|
||||||
|
AND created_by = 'openclaw'
|
||||||
|
AND period = :period
|
||||||
|
AND status IN ('active', 'approved')
|
||||||
|
ORDER BY created_at DESC
|
||||||
|
LIMIT 1
|
||||||
|
"""), {"period": period}).fetchone()
|
||||||
|
if not row:
|
||||||
|
return None
|
||||||
|
meta = _normalize_weekly_strategy_metadata(row[2])
|
||||||
|
return {"id": row[0], "content": row[1], "metadata": meta, "created_at": row[3]}
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("[OpenClaw] 週報載入失敗 period=%s: %s", period, e)
|
||||||
|
return None
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _normalize_weekly_strategy_metadata(raw_meta: Any) -> Dict[str, Any]:
|
||||||
|
"""將 ai_insights metadata 轉成 dict,並補入預設欄位避免型別錯誤。"""
|
||||||
|
meta = raw_meta or {}
|
||||||
|
if isinstance(meta, str):
|
||||||
|
try:
|
||||||
|
meta = json.loads(meta)
|
||||||
|
except Exception:
|
||||||
|
meta = {}
|
||||||
|
if not isinstance(meta, dict):
|
||||||
|
meta = {}
|
||||||
|
if "telegram_sent" not in meta:
|
||||||
|
meta["telegram_sent"] = False
|
||||||
|
if "telegram_sending" not in meta:
|
||||||
|
meta["telegram_sending"] = False
|
||||||
|
return meta
|
||||||
|
|
||||||
|
|
||||||
|
def _set_weekly_strategy_metadata(insight_id: int, metadata: Dict[str, Any]) -> bool:
|
||||||
|
"""以 metadata 全量覆寫指定週報記錄,並回傳是否寫入成功。"""
|
||||||
|
if not insight_id:
|
||||||
|
return False
|
||||||
|
session = get_session()
|
||||||
|
try:
|
||||||
|
session.execute(
|
||||||
|
text("""
|
||||||
|
UPDATE ai_insights
|
||||||
|
SET metadata_json = :metadata
|
||||||
|
WHERE id = :id
|
||||||
|
"""),
|
||||||
|
{"id": insight_id, "metadata": json.dumps(metadata, ensure_ascii=False)},
|
||||||
|
)
|
||||||
|
session.commit()
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("[OpenClaw] 更新週報 metadata 失敗 insight_id=%s: %s", insight_id, e)
|
||||||
|
session.rollback()
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _set_weekly_strategy_telegram_locked(insight_id: int, *, telegram_sent: Optional[bool] = None,
|
||||||
|
telegram_sending: Optional[bool] = None, sent_at: Optional[datetime] = None) -> bool:
|
||||||
|
"""更新既有週報的發送狀態欄位(telegram_sent / telegram_sending)。"""
|
||||||
|
if not insight_id:
|
||||||
|
return False
|
||||||
|
session = get_session()
|
||||||
|
try:
|
||||||
|
row = session.execute(
|
||||||
|
text("SELECT metadata_json FROM ai_insights WHERE id = :id"),
|
||||||
|
{"id": insight_id},
|
||||||
|
).fetchone()
|
||||||
|
if not row:
|
||||||
|
return False
|
||||||
|
|
||||||
|
meta = _normalize_weekly_strategy_metadata(row[0])
|
||||||
|
if telegram_sent is not None:
|
||||||
|
meta["telegram_sent"] = bool(telegram_sent)
|
||||||
|
if telegram_sending is not None:
|
||||||
|
meta["telegram_sending"] = bool(telegram_sending)
|
||||||
|
if sent_at is None and telegram_sent:
|
||||||
|
meta.pop("telegram_sent_at", None)
|
||||||
|
elif sent_at is not None:
|
||||||
|
meta["telegram_sent_at"] = sent_at.isoformat()
|
||||||
|
|
||||||
|
session.execute(
|
||||||
|
text("""
|
||||||
|
UPDATE ai_insights
|
||||||
|
SET metadata_json = :metadata
|
||||||
|
WHERE id = :id
|
||||||
|
"""),
|
||||||
|
{"id": insight_id, "metadata": json.dumps(meta, ensure_ascii=False)},
|
||||||
|
)
|
||||||
|
session.commit()
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("[OpenClaw] 更新週報 telegram metadata 失敗 insight_id=%s: %s", insight_id, e)
|
||||||
|
session.rollback()
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _acquire_weekly_strategy_send_lock(insight_id: int) -> bool:
|
||||||
|
"""嘗試取得週報 Telegram 發送鎖。
|
||||||
|
|
||||||
|
若該筆已標記發送中或已發送,回傳 False。
|
||||||
|
"""
|
||||||
|
if not insight_id:
|
||||||
|
return False
|
||||||
|
session = get_session()
|
||||||
|
try:
|
||||||
|
row = session.execute(
|
||||||
|
text("SELECT metadata_json FROM ai_insights WHERE id = :id FOR UPDATE"),
|
||||||
|
{"id": insight_id},
|
||||||
|
).fetchone()
|
||||||
|
if not row:
|
||||||
|
return False
|
||||||
|
|
||||||
|
meta = _normalize_weekly_strategy_metadata(row[0])
|
||||||
|
if bool(meta.get("telegram_sending")) or bool(meta.get("telegram_sent")):
|
||||||
|
return False
|
||||||
|
|
||||||
|
meta["telegram_sending"] = True
|
||||||
|
meta["telegram_sent"] = False
|
||||||
|
session.execute(
|
||||||
|
text("""
|
||||||
|
UPDATE ai_insights
|
||||||
|
SET metadata_json = :metadata
|
||||||
|
WHERE id = :id
|
||||||
|
"""),
|
||||||
|
{"id": insight_id, "metadata": json.dumps(meta, ensure_ascii=False)},
|
||||||
|
)
|
||||||
|
session.commit()
|
||||||
|
return True
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("[OpenClaw] 取得週報 telegram 發送鎖失敗 insight_id=%s: %s", insight_id, e)
|
||||||
|
session.rollback()
|
||||||
|
return False
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
|
def _set_weekly_strategy_telegram_sent(insight_id: int) -> None:
|
||||||
|
"""更新已儲存週報的 telegram_sent 狀態,避免再次重複發送。"""
|
||||||
|
_set_weekly_strategy_telegram_locked(
|
||||||
|
insight_id,
|
||||||
|
telegram_sending=False,
|
||||||
|
telegram_sent=True,
|
||||||
|
sent_at=datetime.now(),
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def _consolidate_weekly_strategy_records(period: str) -> Dict[str, int]:
|
||||||
|
"""同一週保留最新一筆,將舊重複紀錄標示為 superseded(保留內容)。"""
|
||||||
|
session = get_session()
|
||||||
|
kept_id = None
|
||||||
|
superseded_count = 0
|
||||||
|
total_count = 0
|
||||||
|
try:
|
||||||
|
rows = session.execute(text("""
|
||||||
|
SELECT id, created_at
|
||||||
|
FROM ai_insights
|
||||||
|
WHERE insight_type = 'weekly_strategy'
|
||||||
|
AND created_by = 'openclaw'
|
||||||
|
AND period = :period
|
||||||
|
ORDER BY created_at DESC, id DESC
|
||||||
|
"""), {"period": period}).fetchall()
|
||||||
|
total_count = len(rows)
|
||||||
|
if total_count <= 1:
|
||||||
|
return {"period": period, "total_count": total_count, "kept_id": None, "superseded_count": 0}
|
||||||
|
|
||||||
|
kept_id = rows[0][0]
|
||||||
|
old_ids = [int(r[0]) for r in rows[1:]]
|
||||||
|
if old_ids:
|
||||||
|
res = session.execute(text("""
|
||||||
|
UPDATE ai_insights
|
||||||
|
SET status = 'superseded'
|
||||||
|
WHERE id IN :ids
|
||||||
|
AND status IN ('active', 'approved')
|
||||||
|
""").bindparams(bindparam("ids", expanding=True)), {"ids": old_ids})
|
||||||
|
superseded_count = int(getattr(res, "rowcount", 0) or 0)
|
||||||
|
session.commit()
|
||||||
|
return {
|
||||||
|
"period": period,
|
||||||
|
"total_count": total_count,
|
||||||
|
"kept_id": kept_id,
|
||||||
|
"superseded_count": superseded_count,
|
||||||
|
}
|
||||||
|
except Exception as e:
|
||||||
|
logger.warning("[OpenClaw] 週報 dedupe 失敗 period=%s: %s", period, e)
|
||||||
|
session.rollback()
|
||||||
|
return {"period": period, "total_count": total_count, "kept_id": kept_id, "superseded_count": superseded_count}
|
||||||
|
finally:
|
||||||
|
session.close()
|
||||||
|
|
||||||
|
|
||||||
def _save_action_items(actions: List[str], source_insight_id: Optional[int]) -> None:
|
def _save_action_items(actions: List[str], source_insight_id: Optional[int]) -> None:
|
||||||
"""將 AI 建議的行動項目寫入 action_plans"""
|
"""將 AI 建議的行動項目寫入 action_plans"""
|
||||||
if not actions:
|
if not actions:
|
||||||
@@ -390,7 +629,8 @@ def _call_nvidia_nim(system_prompt: str, user_prompt: str, temperature: float =
|
|||||||
# Telegram 推播
|
# Telegram 推播
|
||||||
# ═══════════════════════════════════════════════════════════════════════════════
|
# ═══════════════════════════════════════════════════════════════════════════════
|
||||||
|
|
||||||
def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> None:
|
def _send_strategy_telegram(title: str, report_type: str, period: str, content: str) -> bool:
|
||||||
|
"""發送週報到 Telegram。成功時回傳 True。"""
|
||||||
try:
|
try:
|
||||||
from services.telegram_templates import report as tpl_report, _send_telegram_raw
|
from services.telegram_templates import report as tpl_report, _send_telegram_raw
|
||||||
|
|
||||||
@@ -401,8 +641,10 @@ def _send_strategy_telegram(title: str, report_type: str, period: str, content:
|
|||||||
for i, chunk in enumerate(chunks):
|
for i, chunk in enumerate(chunks):
|
||||||
msg = tpl_report(title, report_type, period, chunk) if i == 0 else chunk
|
msg = tpl_report(title, report_type, period, chunk) if i == 0 else chunk
|
||||||
_send_telegram_raw(msg)
|
_send_telegram_raw(msg)
|
||||||
|
return True
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error("[OpenClaw] Telegram 推播失敗: %s", e)
|
logger.error("[OpenClaw] Telegram 推播失敗: %s", e)
|
||||||
|
return False
|
||||||
|
|
||||||
|
|
||||||
def _split_message(text: str, max_len: int = 3800) -> List[str]:
|
def _split_message(text: str, max_len: int = 3800) -> List[str]:
|
||||||
@@ -422,6 +664,7 @@ def _split_message(text: str, max_len: int = 3800) -> List[str]:
|
|||||||
def generate_weekly_strategy_report(
|
def generate_weekly_strategy_report(
|
||||||
context: Optional[Any] = None,
|
context: Optional[Any] = None,
|
||||||
force_tg_alert: bool = False,
|
force_tg_alert: bool = False,
|
||||||
|
force_generate: bool = False,
|
||||||
) -> dict:
|
) -> dict:
|
||||||
"""
|
"""
|
||||||
OpenClaw 全景電商週報(每週一 06:00)
|
OpenClaw 全景電商週報(每週一 06:00)
|
||||||
@@ -435,10 +678,115 @@ def generate_weekly_strategy_report(
|
|||||||
"""
|
"""
|
||||||
now = datetime.now()
|
now = datetime.now()
|
||||||
period = f"{now.strftime('%Y年第%W週')} ({now.strftime('%m/%d')})"
|
period = f"{now.strftime('%Y年第%W週')} ({now.strftime('%m/%d')})"
|
||||||
|
period_key = now.strftime("%Y-%W")
|
||||||
logger.info("[OpenClaw] 週報任務啟動 period=%s", period)
|
logger.info("[OpenClaw] 週報任務啟動 period=%s", period)
|
||||||
|
|
||||||
|
existing = _load_weekly_strategy_payload(period_key)
|
||||||
|
if existing and not force_generate:
|
||||||
|
# 已有同週報告則沿用既有內容,不再重新呼叫 Gemini
|
||||||
|
sent_metadata = bool(existing.get("metadata", {}).get("telegram_sent"))
|
||||||
|
sending_metadata = bool(existing.get("metadata", {}).get("telegram_sending"))
|
||||||
|
if force_tg_alert:
|
||||||
|
if sending_metadata:
|
||||||
|
logger.info(
|
||||||
|
"[OpenClaw] 本週週報正在發送中,略過重複推播 period=%s insight_id=%s",
|
||||||
|
period_key,
|
||||||
|
existing["id"],
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "skipped",
|
||||||
|
"report_type": "weekly_strategy",
|
||||||
|
"reason": "weekly_strategy_send_in_progress",
|
||||||
|
"insight_id": existing["id"],
|
||||||
|
"period": period,
|
||||||
|
}
|
||||||
|
|
||||||
|
if not sent_metadata and existing.get("content"):
|
||||||
|
if _acquire_weekly_strategy_send_lock(existing["id"]):
|
||||||
|
send_ok = _send_strategy_telegram(
|
||||||
|
title="OpenClaw 電商全景週報",
|
||||||
|
report_type="weekly_strategy",
|
||||||
|
period=period,
|
||||||
|
content=existing["content"],
|
||||||
|
)
|
||||||
|
if send_ok:
|
||||||
|
_set_weekly_strategy_telegram_locked(
|
||||||
|
existing["id"],
|
||||||
|
telegram_sent=True,
|
||||||
|
telegram_sending=False,
|
||||||
|
sent_at=datetime.now(),
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "sent",
|
||||||
|
"report_type": "weekly_strategy",
|
||||||
|
"reason": "weekly_strategy_reused_from_cache",
|
||||||
|
"insight_id": existing["id"],
|
||||||
|
"period": period,
|
||||||
|
}
|
||||||
|
|
||||||
|
_set_weekly_strategy_telegram_locked(
|
||||||
|
existing["id"],
|
||||||
|
telegram_sent=False,
|
||||||
|
telegram_sending=False,
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"report_type": "weekly_strategy",
|
||||||
|
"reason": "weekly_strategy_send_failed",
|
||||||
|
"insight_id": existing["id"],
|
||||||
|
"period": period,
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.warning(
|
||||||
|
"[OpenClaw] 取得週報發送鎖失敗 period=%s insight_id=%s",
|
||||||
|
period_key,
|
||||||
|
existing["id"],
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "skipped",
|
||||||
|
"report_type": "weekly_strategy",
|
||||||
|
"reason": "weekly_strategy_send_in_progress",
|
||||||
|
"insight_id": existing["id"],
|
||||||
|
"period": period,
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"[OpenClaw] 本週週報已存在且已發送,跳過重複推播 period=%s insight_id=%s",
|
||||||
|
period_key,
|
||||||
|
existing["id"],
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "skipped",
|
||||||
|
"report_type": "weekly_strategy",
|
||||||
|
"reason": "weekly_strategy_already_generated",
|
||||||
|
"insight_id": existing["id"],
|
||||||
|
"period": period,
|
||||||
|
}
|
||||||
|
|
||||||
|
logger.info(
|
||||||
|
"[OpenClaw] 本週週報已存在,跳過重複產生 period=%s insight_id=%s",
|
||||||
|
period_key,
|
||||||
|
existing["id"],
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "skipped",
|
||||||
|
"report_type": "weekly_strategy",
|
||||||
|
"reason": "weekly_strategy_already_generated",
|
||||||
|
"insight_id": existing["id"],
|
||||||
|
"period": period,
|
||||||
|
}
|
||||||
|
|
||||||
# ── Step 1:DB 數據收集 ──────────────────────────────────────────────────
|
# ── Step 1:DB 數據收集 ──────────────────────────────────────────────────
|
||||||
sales = _fetch_sales_summary(14)
|
sales = _fetch_sales_summary(14)
|
||||||
|
if sales.get("stale"):
|
||||||
|
msg = f"⚠️ [資料停更告警] daily_sales_snapshot 最後更新為 {sales.get('last_date')},請檢查人工上傳流程。"
|
||||||
|
try:
|
||||||
|
from services.telegram_templates import _send_telegram_raw
|
||||||
|
_send_telegram_raw(msg)
|
||||||
|
except Exception:
|
||||||
|
pass
|
||||||
|
return {"status": "error", "reason": "data_stale"}
|
||||||
|
|
||||||
threats = _fetch_top_threats(10)
|
threats = _fetch_top_threats(10)
|
||||||
recommendations = _fetch_top_recommendations(10)
|
recommendations = _fetch_top_recommendations(10)
|
||||||
categories = _fetch_category_breakdown(7)
|
categories = _fetch_category_breakdown(7)
|
||||||
@@ -574,24 +922,65 @@ TOP 威脅品項(近48h Hermes 偵測):
|
|||||||
"mcp_topics_collected": sum(1 for v in mcp_data.values() if v),
|
"mcp_topics_collected": sum(1 for v in mcp_data.values() if v),
|
||||||
"action_count": len(action_items),
|
"action_count": len(action_items),
|
||||||
"generated_at": now.isoformat(),
|
"generated_at": now.isoformat(),
|
||||||
|
"telegram_sent": False,
|
||||||
|
"telegram_sending": False,
|
||||||
}
|
}
|
||||||
insight_id = _save_to_ai_insights(
|
insight_id = _save_to_ai_insights(
|
||||||
insight_type="weekly_strategy",
|
insight_type="weekly_strategy",
|
||||||
content=report_content,
|
content=report_content,
|
||||||
confidence=0.88,
|
confidence=0.88,
|
||||||
metadata=metadata,
|
metadata=metadata,
|
||||||
period=now.strftime("%Y-%W"),
|
period=period_key,
|
||||||
)
|
)
|
||||||
_save_action_items(action_items, insight_id)
|
_save_action_items(action_items, insight_id)
|
||||||
|
_consolidate_weekly_strategy_records(period_key)
|
||||||
|
|
||||||
# ── Step 7:Telegram 推播 ────────────────────────────────────────────────
|
# ── Step 7:Telegram 推播 ────────────────────────────────────────────────
|
||||||
if force_tg_alert or True:
|
if force_tg_alert:
|
||||||
_send_strategy_telegram(
|
latest_payload = _load_weekly_strategy_payload(period_key)
|
||||||
title="OpenClaw 電商全景週報",
|
send_target_id = latest_payload["id"] if latest_payload else insight_id
|
||||||
report_type="weekly_strategy",
|
send_content = latest_payload["content"] if latest_payload else report_content
|
||||||
period=period,
|
|
||||||
content=report_content,
|
if _acquire_weekly_strategy_send_lock(send_target_id):
|
||||||
)
|
send_ok = _send_strategy_telegram(
|
||||||
|
title="OpenClaw 電商全景週報",
|
||||||
|
report_type="weekly_strategy",
|
||||||
|
period=period,
|
||||||
|
content=send_content,
|
||||||
|
)
|
||||||
|
if send_ok:
|
||||||
|
_set_weekly_strategy_telegram_locked(
|
||||||
|
send_target_id,
|
||||||
|
telegram_sent=True,
|
||||||
|
telegram_sending=False,
|
||||||
|
sent_at=datetime.now(),
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
_set_weekly_strategy_telegram_locked(
|
||||||
|
send_target_id,
|
||||||
|
telegram_sent=False,
|
||||||
|
telegram_sending=False,
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "error",
|
||||||
|
"report_type": "weekly_strategy",
|
||||||
|
"reason": "weekly_strategy_send_failed",
|
||||||
|
"insight_id": send_target_id,
|
||||||
|
"period": period,
|
||||||
|
}
|
||||||
|
else:
|
||||||
|
logger.info(
|
||||||
|
"[OpenClaw] 本週週報發送已被其他執行緒持有,跳過推播 period=%s latest_id=%s",
|
||||||
|
period_key,
|
||||||
|
send_target_id,
|
||||||
|
)
|
||||||
|
return {
|
||||||
|
"status": "skipped",
|
||||||
|
"report_type": "weekly_strategy",
|
||||||
|
"reason": "weekly_strategy_send_in_progress",
|
||||||
|
"insight_id": send_target_id,
|
||||||
|
"period": period,
|
||||||
|
}
|
||||||
|
|
||||||
logger.info("[OpenClaw] 週報完成 insight_id=%s actions=%d", insight_id, len(action_items))
|
logger.info("[OpenClaw] 週報完成 insight_id=%s actions=%d", insight_id, len(action_items))
|
||||||
return {
|
return {
|
||||||
@@ -637,7 +1026,7 @@ def generate_daily_report() -> dict:
|
|||||||
user_prompt = f"""請根據以下數據,產出今日電商日報({period}):
|
user_prompt = f"""請根據以下數據,產出今日電商日報({period}):
|
||||||
|
|
||||||
【昨日業績】
|
【昨日業績】
|
||||||
銷售金額:NT${yesterday_sales.get('revenue', 0):,.0f}
|
總業績:NT${yesterday_sales.get('revenue', 0):,.0f}
|
||||||
成交SKU數:{yesterday_sales.get('sku_count', 0)} 個
|
成交SKU數:{yesterday_sales.get('sku_count', 0)} 個
|
||||||
訂單數:{yesterday_sales.get('order_count', 0)} 筆
|
訂單數:{yesterday_sales.get('order_count', 0)} 筆
|
||||||
|
|
||||||
@@ -1116,7 +1505,7 @@ def _fetch_yesterday_sales() -> Dict[str, Any]:
|
|||||||
try:
|
try:
|
||||||
row = session.execute(text("""
|
row = session.execute(text("""
|
||||||
SELECT
|
SELECT
|
||||||
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
|
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
|
||||||
COUNT(DISTINCT "商品ID") AS sku_count,
|
COUNT(DISTINCT "商品ID") AS sku_count,
|
||||||
COUNT(*) AS order_count
|
COUNT(*) AS order_count
|
||||||
FROM daily_sales_snapshot
|
FROM daily_sales_snapshot
|
||||||
@@ -1130,8 +1519,8 @@ def _fetch_yesterday_sales() -> Dict[str, Any]:
|
|||||||
}
|
}
|
||||||
return {"revenue": 0, "sku_count": 0, "order_count": 0}
|
return {"revenue": 0, "sku_count": 0, "order_count": 0}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[OpenClaw] 昨日業績讀取失敗: %s", e)
|
logger.error("[OpenClaw] 昨日業績讀取失敗: %s", e)
|
||||||
return {"revenue": 0, "sku_count": 0, "order_count": 0}
|
raise
|
||||||
finally:
|
finally:
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
@@ -1142,10 +1531,10 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
|
|||||||
try:
|
try:
|
||||||
row = session.execute(text("""
|
row = session.execute(text("""
|
||||||
SELECT
|
SELECT
|
||||||
SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue,
|
SUM(COALESCE("總業績"::numeric, 0)) AS revenue,
|
||||||
COUNT(DISTINCT "商品ID") AS sku_count,
|
COUNT(DISTINCT "商品ID") AS sku_count,
|
||||||
COUNT(*) AS order_count,
|
COUNT(*) AS order_count,
|
||||||
AVG(COALESCE("銷售金額"::numeric, 0)) AS avg_order_value
|
AVG(COALESCE("總業績"::numeric, 0)) AS avg_order_value
|
||||||
FROM daily_sales_snapshot
|
FROM daily_sales_snapshot
|
||||||
WHERE snapshot_date::date BETWEEN :start AND :end
|
WHERE snapshot_date::date BETWEEN :start AND :end
|
||||||
"""), {"start": start_date.date(), "end": end_date.date()}).fetchone()
|
"""), {"start": start_date.date(), "end": end_date.date()}).fetchone()
|
||||||
@@ -1158,7 +1547,7 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
|
|||||||
prev_start = (start_date - timedelta(days=1)).replace(day=1)
|
prev_start = (start_date - timedelta(days=1)).replace(day=1)
|
||||||
prev_end = start_date - timedelta(days=1)
|
prev_end = start_date - timedelta(days=1)
|
||||||
prev_row = session.execute(text("""
|
prev_row = session.execute(text("""
|
||||||
SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
|
SELECT SUM(COALESCE("總業績"::numeric, 0)) AS revenue
|
||||||
FROM daily_sales_snapshot
|
FROM daily_sales_snapshot
|
||||||
WHERE snapshot_date::date BETWEEN :start AND :end
|
WHERE snapshot_date::date BETWEEN :start AND :end
|
||||||
"""), {"start": prev_start.date(), "end": prev_end.date()}).fetchone()
|
"""), {"start": prev_start.date(), "end": prev_end.date()}).fetchone()
|
||||||
@@ -1169,7 +1558,7 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
|
|||||||
yoy_start = start_date.replace(year=start_date.year - 1)
|
yoy_start = start_date.replace(year=start_date.year - 1)
|
||||||
yoy_end = end_date.replace(year=end_date.year - 1)
|
yoy_end = end_date.replace(year=end_date.year - 1)
|
||||||
yoy_row = session.execute(text("""
|
yoy_row = session.execute(text("""
|
||||||
SELECT SUM(COALESCE("銷售金額"::numeric, 0)) AS revenue
|
SELECT SUM(COALESCE("總業績"::numeric, 0)) AS revenue
|
||||||
FROM daily_sales_snapshot
|
FROM daily_sales_snapshot
|
||||||
WHERE snapshot_date::date BETWEEN :start AND :end
|
WHERE snapshot_date::date BETWEEN :start AND :end
|
||||||
"""), {"start": yoy_start.date(), "end": yoy_end.date()}).fetchone()
|
"""), {"start": yoy_start.date(), "end": yoy_end.date()}).fetchone()
|
||||||
@@ -1185,9 +1574,8 @@ def _fetch_monthly_sales_summary(start_date: datetime, end_date: datetime) -> Di
|
|||||||
"yoy_pct": round(yoy_pct, 1),
|
"yoy_pct": round(yoy_pct, 1),
|
||||||
}
|
}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[OpenClaw] 月度業績讀取失敗: %s", e)
|
logger.error("[OpenClaw] 月度業績讀取失敗: %s", e)
|
||||||
return {"revenue": 0, "sku_count": 0, "order_count": 0,
|
raise
|
||||||
"avg_order_value": 0, "mom_pct": 0, "yoy_pct": 0}
|
|
||||||
finally:
|
finally:
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
@@ -1221,8 +1609,8 @@ def _fetch_price_trend_summary(days: int = 30) -> Dict[str, Any]:
|
|||||||
}
|
}
|
||||||
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
|
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning("[OpenClaw] 價格趨勢統計讀取失敗: %s", e)
|
logger.error("[OpenClaw] 價格趨勢統計讀取失敗: %s", e)
|
||||||
return {"price_changes": 0, "avg_change_pct": 0, "price_cuts": 0, "price_raises": 0}
|
raise
|
||||||
finally:
|
finally:
|
||||||
session.close()
|
session.close()
|
||||||
|
|
||||||
|
|||||||
Reference in New Issue
Block a user