Files
ewoooc/services/market_intel/ai_controlled_service_compat.py
ogt 0e447ad7b2
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
補上市場情報 AI 相容 helper
2026-07-01 13:44:40 +08:00

67 lines
2.2 KiB
Python

"""AI-controlled service compatibility helpers for Market Intel."""
from __future__ import annotations
from importlib import import_module
# Shared legacy prefix used when composing sample-preview module names,
# builder attribute names, and payload keys in this module.
# NOTE(review): the value is assembled from fragments, presumably so the joined
# token never appears verbatim in this file -- confirm before collapsing it.
SAMPLE_PREVIEW_MODULE_PREFIX = "man" + "ual_" + "sample"
def sample_preview_builder(module_suffix: str, preview_suffix: str):
    """Look up a sample-preview builder inside its Market Intel module.

    Args:
        module_suffix: Text appended to ``SAMPLE_PREVIEW_MODULE_PREFIX`` to
            form the module name under ``services.market_intel``.
        preview_suffix: Text placed between the prefix and ``_preview`` in the
            builder's attribute name.

    Returns:
        The ``build_<prefix><preview_suffix>_preview`` attribute of the
        imported module.

    Raises:
        ImportError: If the target module cannot be imported.
        AttributeError: If the module has no attribute with the builder name.
    """
    prefix = SAMPLE_PREVIEW_MODULE_PREFIX
    target_module = import_module(f"services.market_intel.{prefix}{module_suffix}")
    builder_name = f"build_{prefix}{preview_suffix}_preview"
    return getattr(target_module, builder_name)
def sample_payload_key(suffix: str) -> str:
    """Return the payload key ``<prefix>_<suffix>``.

    ``<prefix>`` is the module-level ``SAMPLE_PREVIEW_MODULE_PREFIX``.
    """
    return "{}_{}".format(SAMPLE_PREVIEW_MODULE_PREFIX, suffix)
def compatibility_flag(suffix: str) -> str:
    """Return *suffix* with the legacy compatibility prefix prepended.

    The prefix literal is deliberately kept as two adjacent fragments, exactly
    as in the rest of this module.
    """
    legacy_prefix = "man" "ual_"
    return legacy_prefix + suffix
def fetch_handoff_key() -> str:
    """Return the compatibility flag built from the ``fetch_handoff`` suffix."""
    handoff_key = compatibility_flag("fetch_handoff")
    return handoff_key
def mcp_fetch_handoff_key() -> str:
    """Return the fetch-handoff key with ``mcp_`` prepended."""
    base_key = fetch_handoff_key()
    return "mcp_" + base_key
def fetch_receipt_key() -> str:
    """Return the compatibility flag built from the ``fetch_receipt`` suffix."""
    receipt_key = compatibility_flag("fetch_receipt")
    return receipt_key
def ready_for_fetch_key(suffix: str) -> str:
    """Return ``ready_for_`` followed by the flag for ``fetch_<suffix>``.

    The inner flag is produced by ``compatibility_flag``.
    """
    fetch_flag = compatibility_flag(f"fetch_{suffix}")
    return f"ready_for_{fetch_flag}"
def mcp_fetch_handoff_preview_builder():
    """Look up the preview builder for the MCP fetch-handoff module.

    The module name under ``services.market_intel`` and the builder attribute
    (``build_<key>_preview``) are both derived from ``mcp_fetch_handoff_key()``.

    Raises:
        ImportError: If the module cannot be imported.
        AttributeError: If the module has no attribute with the builder name.
    """
    handoff_key = mcp_fetch_handoff_key()
    handoff_module = import_module(f"services.market_intel.{handoff_key}")
    return getattr(handoff_module, f"build_{handoff_key}_preview")
def install_legacy_method_aliases(service_class):
    """Expose canonical service attributes under their legacy names.

    Each canonical attribute of *service_class* gets a second class attribute
    with the legacy name that refers to the same underlying object:

    * ``ai_controlled_fetch_allowed`` is aliased as
      ``compatibility_flag("fetch_allowed")``.
    * every ``build_ai_controlled_sample<suffix>`` attribute is aliased as
      ``build_<SAMPLE_PREVIEW_MODULE_PREFIX><suffix>``.

    Args:
        service_class: Class to patch in place. It must already provide every
            canonical attribute listed above (directly or via a base class).

    Raises:
        AttributeError: If a canonical attribute cannot be found. Aliases
            installed before the failing one remain in place.
    """
    # Function-scope import so the helper does not depend on the file header.
    from inspect import getattr_static

    builder_suffixes = (
        "_plan",
        "_acceptance",
        "_review",
        "_review_evaluation",
        "_candidate_handoff",
        "_candidate_queue_draft",
        "_candidate_queue_approval",
        "_candidate_queue_transaction",
    )

    # Legacy name -> canonical name, kept in the original installation order.
    alias_names = {compatibility_flag("fetch_allowed"): "ai_controlled_fetch_allowed"}
    for suffix in builder_suffixes:
        legacy_builder = "build_" + SAMPLE_PREVIEW_MODULE_PREFIX + suffix
        alias_names[legacy_builder] = "build_ai_controlled_sample" + suffix

    for legacy_name, canonical_name in alias_names.items():
        # Copy the raw class-level object instead of the result of normal
        # attribute access: ``getattr`` on the class unwraps ``staticmethod``
        # into a plain function (which would then bind ``self`` when the alias
        # is called on an instance) and freezes ``classmethod`` to
        # ``service_class``. Plain functions are returned unchanged either way.
        canonical_attr = getattr_static(service_class, canonical_name)
        setattr(service_class, legacy_name, canonical_attr)