140 lines
6.8 KiB
Python
140 lines
6.8 KiB
Python
from flask import Flask
|
|
|
|
|
|
def _client():
    """Return a Flask test client wired up for the market-intel route tests.

    A fresh app is created, both market-intel blueprints are registered on
    it, and the client's session is marked with ``logged_in = True`` before
    the client is handed back.
    """
    from routes.market_intel_routes import market_intel_bp
    from routes.market_intel_review_routes import market_intel_review_bp

    flask_app = Flask(__name__)
    # A secret key is required for the session cookie written below.
    flask_app.secret_key = "test-secret"
    for blueprint in (market_intel_bp, market_intel_review_bp):
        flask_app.register_blueprint(blueprint)

    test_client = flask_app.test_client()
    with test_client.session_transaction() as flask_session:
        flask_session["logged_in"] = True
    return test_client
|
|
|
|
|
|
def test_market_intel_ai_controlled_alias_report_is_primary_and_read_only():
    """Check the alias report's headline fields and its canonical route list.

    The report must name the ``ai_controlled`` family as primary, mark the
    legacy family as compatibility-only, declare that it does not write to
    the database, and list a known set of canonical paths.
    """
    from services.market_intel.ai_controlled_route_aliases import (
        build_ai_controlled_route_alias_report,
    )

    alias_report = build_ai_controlled_route_alias_report()

    # Exact-match fields, checked in a fixed order.
    expected_fields = (
        ("policy", "read_only_market_intel_ai_controlled_route_aliases"),
        ("result", "AI_CONTROLLED_CANONICAL_ROUTES_READY"),
        ("primary_route_family", "/api/market_intel/ai_controlled"),
        ("legacy_route_family_status", "compatibility_only"),
    )
    for field_name, expected_value in expected_fields:
        assert alias_report[field_name] == expected_value

    assert alias_report["canonical_count"] >= 90
    assert alias_report["safety"]["writes_database"] is False

    required_paths = {
        "/api/market_intel/ai_controlled/sample_plan",
        "/api/market_intel/ai_controlled/review/candidate_queue_writer_status",
        "/api/market_intel/ai_controlled/review/candidate_queue_review_inventory",
        "/api/market_intel/ai_controlled/review/candidate_queue_review_decision_writer_run_closeout",
        "/api/market_intel/ai_controlled/mcp_fetch_target_review",
        "/api/market_intel/ai_controlled/mcp_fetch_run_package",
        "/api/market_intel/ai_controlled/mcp_fetch_candidate_queue_writer_review_decision_approval",
        "/api/market_intel/ai_controlled/review/candidate_queue_review_ai_summary_preflight",
        "/api/market_intel/ai_controlled/review/candidate_queue_review_ai_summary_persistence_telegram_dispatch_run_package",
        "/api/market_intel/ai_controlled/review/candidate_queue_review_ai_summary_persistence_telegram_dispatch_report_input",
    }
    advertised_paths = {route["canonical_path"] for route in alias_report["routes"]}
    assert required_paths.issubset(advertised_paths)
|
|
|
|
|
|
def test_market_intel_ai_controlled_alias_routes_are_registered():
    """Exercise the ``ai_controlled`` alias routes through the test client.

    The GET aliases are expected to answer 200. The POST aliases are called
    without a body and are expected to answer 400 while reporting that no
    database write was executed. Finally, the URL map must contain both the
    ``ai_controlled`` and the ``manual_sample`` endpoint names for the
    review routes listed below.
    """
    http = _client()

    # Every request is issued before the first assertion, in a fixed order.
    aliases = http.get("/api/market_intel/ai_controlled/route_aliases")
    sample_plan = http.get("/api/market_intel/ai_controlled/sample_plan")
    sample_review = http.get("/api/market_intel/ai_controlled/sample_review")
    writer_status = http.post(
        "/api/market_intel/ai_controlled/review/candidate_queue_writer_status"
    )
    review_decision = http.post(
        "/api/market_intel/ai_controlled/review/candidate_queue_review_decision"
    )
    target_review = http.get(
        "/api/market_intel/ai_controlled/mcp_fetch_target_review"
    )
    run_package = http.get(
        "/api/market_intel/ai_controlled/mcp_fetch_run_package"
    )
    summary_preflight = http.post(
        "/api/market_intel/ai_controlled/review/candidate_queue_review_ai_summary_preflight"
    )
    dispatch_run_package = http.post(
        "/api/market_intel/ai_controlled/review/"
        "candidate_queue_review_ai_summary_persistence_telegram_dispatch_run_package"
    )
    dispatch_report_input = http.post(
        "/api/market_intel/ai_controlled/review/"
        "candidate_queue_review_ai_summary_persistence_telegram_dispatch_report_input"
    )

    # Read-only GET aliases.
    assert aliases.status_code == 200
    assert aliases.get_json()["legacy_route_family_status"] == "compatibility_only"
    assert sample_plan.status_code == 200
    assert sample_plan.get_json()["mode"] == "manual_sample_fetch_plan_preview"
    assert sample_review.status_code == 200
    assert sample_review.get_json()["candidate_import_allowed"] is False

    # Writer status POST: rejected, and the payload says nothing was written.
    assert writer_status.status_code == 400
    writer_payload = writer_status.get_json()
    assert writer_payload["database_write_executed"] is False
    assert writer_payload["safety_contract"]["refuses_api_execution"] is True

    # Review decision POST: rejected, no database write and no network call.
    assert review_decision.status_code == 400
    decision_payload = review_decision.get_json()
    assert decision_payload["database_write_executed"] is False
    assert decision_payload["external_network_executed"] is False

    # MCP fetch previews: the key must be present, hence plain indexing.
    for preview in (target_review, run_package):
        assert preview.status_code == 200
        assert preview.get_json()["database_write_executed"] is False

    # Remaining review POSTs: rejected; the flag is read with ``.get``.
    for refused in (summary_preflight, dispatch_run_package, dispatch_report_input):
        assert refused.status_code == 400
        assert refused.get_json().get("database_write_executed") is False

    required_endpoints = {
        "market_intel_review.market_intel_ai_controlled_review_candidate_queue_review_decision",
        "market_intel_review.market_intel_manual_sample_candidate_queue_review_decision",
        "market_intel_review.market_intel_ai_controlled_review_candidate_queue_review_ai_summary_preflight",
        "market_intel_review.market_intel_manual_sample_candidate_queue_review_ai_summary_preflight",
        "market_intel_review.market_intel_ai_controlled_review_candidate_queue_review_ai_summary_persistence_telegram_dispatch_report_input",
        "market_intel_review.market_intel_manual_sample_candidate_queue_review_ai_summary_persistence_telegram_dispatch_report_input",
    }
    registered_endpoints = {
        rule.endpoint for rule in http.application.url_map.iter_rules()
    }
    assert required_endpoints.issubset(registered_endpoints)
|
|
|
|
|
|
def test_market_intel_deployment_readiness_exposes_canonical_ai_smoke_targets():
    """Check that deployment readiness lists the canonical AI smoke targets.

    Each expected ``ai_controlled`` path must appear in
    ``canonical_ai_smoke_targets``, and the first entry of
    ``ai_controlled_route_aliases`` must carry the primary/compatibility
    status pair.
    """
    from services.market_intel import MarketIntelService

    readiness = MarketIntelService().build_deployment_readiness()

    smoke_targets = readiness["canonical_ai_smoke_targets"]
    expected_targets = (
        "/api/market_intel/ai_controlled/sample_plan",
        "/api/market_intel/ai_controlled/review/candidate_queue_writer_status",
        "/api/market_intel/ai_controlled/review/candidate_queue_review_decision",
        "/api/market_intel/ai_controlled/mcp_fetch_run_package",
        "/api/market_intel/ai_controlled/review/candidate_queue_review_ai_summary_preflight",
        "/api/market_intel/ai_controlled/review/"
        "candidate_queue_review_ai_summary_persistence_telegram_dispatch_report_input",
    )
    # Membership is tested one path at a time so the first missing path fails.
    for target in expected_targets:
        assert target in smoke_targets

    first_alias = readiness["ai_controlled_route_aliases"][0]
    assert first_alias["canonical_status"] == "primary_ai_controlled"
    assert first_alias["legacy_status"] == "compatibility_only"
|