Files
ewoooc/services/market_intel/candidate_preview.py
ogt 08d9e3fe7d
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
清除市場情報 P3 相容人工語意
2026-07-01 18:24:51 +08:00

92 lines
3.4 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""市場情報候選連結 preview 聚合。
只整理本次 diagnostics 結果供AI 例外決策,不建立 campaign/product不寫 DB。
"""
from services.market_intel.ai_controlled_service_compat import compatibility_flag
BAND_RANK = {
"high": 3,
"medium": 2,
"low": 1,
}
def _band_allowed(candidate, min_band):
if not min_band or min_band == "all":
return True
candidate_rank = BAND_RANK.get(candidate.get("confidence_band"), 0)
min_rank = BAND_RANK.get(min_band, 0)
return candidate_rank >= min_rank
def build_candidate_preview_from_discovery(discovery_result, *, min_band="all", limit=50):
"""把 AI-controlled discovery diagnostics 整理成AI 例外決策 preview。"""
candidates = []
run_statuses = []
for run in discovery_result.get("runs", []):
platform_code = run.get("platform_code")
run_statuses.append({
"platform_code": platform_code,
"status": run.get("status"),
"sources_planned": run.get("sources_planned", 0),
"sources_fetched": run.get("sources_fetched", 0),
"errors": run.get("errors", 0),
})
for source_result in run.get("results", []):
diagnostics = source_result.get("diagnostics") or {}
for candidate in diagnostics.get("campaign_link_candidates", []):
if not _band_allowed(candidate, min_band):
continue
candidates.append({
"platform_code": platform_code,
"source_key": source_result.get("source_key"),
"source_name": source_result.get("name"),
"source_url": source_result.get("url"),
"source_status": source_result.get("status"),
"page_title": diagnostics.get("title"),
"page_hash": diagnostics.get("page_hash"),
"href": candidate.get("href"),
"text": candidate.get("text"),
"is_same_host": candidate.get("is_same_host"),
"score": candidate.get("score", 0),
"generic_score": candidate.get("generic_score", 0),
"platform_score": candidate.get("platform_score", 0),
"confidence_band": candidate.get("confidence_band"),
"confidence_reason": candidate.get("confidence_reason"),
})
candidates = sorted(
candidates,
key=lambda item: (
BAND_RANK.get(item.get("confidence_band"), 0),
item.get("score", 0),
item.get("platform_score", 0),
),
reverse=True,
)
return {
"platform_code": discovery_result.get("platform_code", "all"),
"fetch_requested": bool(discovery_result.get("fetch_requested")),
compatibility_flag("fetch_allowed"): bool(
discovery_result.get(compatibility_flag("fetch_allowed"))
),
"mcp_fetch_gate": discovery_result.get("mcp_fetch_gate"),
"mcp_fetch_gate_open": bool(
(discovery_result.get("mcp_fetch_gate") or {}).get(
compatibility_flag("fetch_gate_open")
)
),
"min_band": min_band or "all",
"limit": limit,
"candidate_count": len(candidates),
"candidates": candidates[:limit],
"run_statuses": run_statuses,
"database_write_allowed": False,
"scheduler_attached": False,
}