Files
ewoooc/services/ai_automation_debt_service.py
ogt 71a9ca4f3d
Some checks failed
CD Pipeline / deploy (push) Has been cancelled
Add PChome AI controlled dry-run closeout chain
2026-07-01 13:22:16 +08:00

428 lines
14 KiB
Python

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Read-only scanner for AI automation debt and legacy human-gate residue."""
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
from typing import Any
# Policy identifier echoed back in every report built by this module.
POLICY = "read_only_ai_automation_debt_scan"

# Default scan root: the parent of the directory that holds this file.
ROOT = Path(__file__).resolve().parents[1]

# Directories (walked recursively) and individual files, relative to the
# scan root, that are inspected for markers.
SCAN_TARGETS = (
    "templates",
    "routes",
    "services",
    "scripts",
    "docs/AI_INTELLIGENCE_MODULE_SOT.md",
    "TODO_NEXT_STEPS.txt",
)

# Only files with one of these suffixes are read.
SCAN_SUFFIXES = {".py", ".html", ".js", ".md", ".txt"}

# Path components (and one multi-segment prefix, "docs/memory") that are
# meant to be skipped by the scan.
EXCLUDED_PARTS = {
    ".git",
    ".pytest_cache",
    "__pycache__",
    "node_modules",
    "data",
    "tests",
    "migrations",
    "docs/memory",
}

# Relative paths treated as product surface; a marker found under one of
# these prefixes is classified as a product-surface blocker.
PRODUCT_SURFACE_PREFIXES = (
    "templates/dashboard_v2.html",
    "templates/daily_sales.html",
    "templates/growth_analysis.html",
    "routes/dashboard_routes.py",
    "routes/ai_routes.py",
    "routes/openclaw_bot_routes.py",
    "services/competitor_intel_repository.py",
    "services/competitor_match_review_service.py",
    "services/competitor_price_feeder.py",
    "services/openclaw_strategist_service.py",
    "services/pchome_mapping_backlog_service.py",
    "services/pchome_revenue_growth_service.py",
    "services/ppt_generator.py",
    "services/telegram_templates.py",
    "services/webcrumbs_host_data_service.py",
    "web/static/js/page-dashboard-v2.js",
)
# Human-visible wording (Chinese copy plus the "HITL" acronym). A line that
# contains any of these is never treated as a legacy compatibility field.
VISIBLE_HUMAN_TEXT = (
    "需人工",
    "人工覆核",
    "人工閉環",
    "人工已",
    "人工標記",
    "人工要求",
    "人工確認",
    "人工採用",
    "人工否決",
    "人工單位價",
    "重算待人工",
    "HITL",
)

# Every substring that flags a line as a finding. Order matters: the first
# entry found in a line is the one reported. The visible wording comes first,
# followed by machine-facing field and flag names.
MANUAL_MARKERS = VISIBLE_HUMAN_TEXT + (
    "requires_hitl",
    "human_review_required",
    "manual_review_required",
    "manual_required",
    "needs_human",
    "ready_for_manual",
    "manual_operator_approval",
    "manual_approval_required",
    "manual_sample",
    "manual_fetch",
)

# Substrings (compared case-insensitively by the scanner) that mark a line as
# touching a hard gate. "DROP " and "TRUNCATE " keep their trailing space.
HARD_GATE_MARKERS = (
    "secret",
    "token",
    "private key",
    "cookie",
    "raw session",
    "authorization header",
    "DROP ",
    "TRUNCATE ",
    "destructive migration",
    "reboot",
    "force push",
    "paid provider",
)
@dataclass(frozen=True)
class Finding:
    """Immutable record of one scanned line that contains a manual-gate marker."""

    file: str  # POSIX-style path of the scanned file, relative to the scan root
    line: int  # 1-based line number within that file
    marker: str  # the marker substring that matched
    snippet: str  # the matching line, stripped and truncated by the scanner
    category: str  # classification label assigned by the scanner
    priority: str  # "P0" .. "P3"
    controlled_apply_allowed: bool  # False for hard-gate findings
    recommended_next_action: str  # human-readable follow-up for this category

    def as_dict(self) -> dict[str, Any]:
        """Return the finding as a plain dict keyed by field name.

        Keys appear in field declaration order.
        """
        return {name: getattr(self, name) for name in self.__dataclass_fields__}
def _relative(path: Path, root: Path) -> str:
return path.relative_to(root).as_posix()
def _is_excluded(relative_path: str) -> bool:
    """Return True when *relative_path* must be skipped by the scan.

    Exclusions are driven by the module-level ``EXCLUDED_PARTS`` constant so
    the list lives in one place:

    * an entry without ``/`` excludes any path having that exact component;
    * an entry containing ``/`` (e.g. ``docs/memory``) excludes every path
      underneath that directory prefix.

    Args:
        relative_path: POSIX-style path relative to the scan root.
    """
    # The scanner's own source contains every marker; never report it.
    if relative_path == "services/ai_automation_debt_service.py":
        return True
    components = set(relative_path.split("/"))
    for entry in EXCLUDED_PARTS:
        if "/" in entry:
            # Multi-segment entries can never equal a single path component,
            # so they are matched as a directory prefix instead.
            if relative_path.startswith(f"{entry}/"):
                return True
        elif entry in components:
            return True
    return False
def _iter_scan_files(root: Path) -> list[Path]:
    """Collect the files under *root* that should be scanned.

    Each entry of ``SCAN_TARGETS`` is resolved against *root*. Missing targets
    are skipped, a file target is considered on its own, and any other target
    is walked recursively. A candidate is kept when it is a regular file, its
    suffix is in ``SCAN_SUFFIXES`` and ``_is_excluded`` does not reject its
    root-relative path.

    Returns:
        The selected paths, de-duplicated and sorted.
    """
    selected: set[Path] = set()
    for target in SCAN_TARGETS:
        base = root / target
        if not base.exists():
            continue
        candidates = [base] if base.is_file() else base.rglob("*")
        for candidate in candidates:
            if (
                candidate.is_file()
                and candidate.suffix in SCAN_SUFFIXES
                and not _is_excluded(_relative(candidate, root))
            ):
                selected.add(candidate)
    return sorted(selected)
def _first_marker(line: str) -> str | None:
    """Return the first entry of ``MANUAL_MARKERS`` contained in *line*.

    "First" refers to the order of ``MANUAL_MARKERS``, not to the position of
    the match within the line. Returns None when no marker is present.
    """
    for marker in MANUAL_MARKERS:
        if marker in line:
            return marker
    return None
def _has_hard_gate_context(line: str) -> bool:
    """Return True when *line* contains any ``HARD_GATE_MARKERS`` entry.

    The comparison is a case-insensitive substring match.
    """
    lowered = line.lower()
    for marker in HARD_GATE_MARKERS:
        if marker.lower() in lowered:
            return True
    return False
def _is_product_surface(relative_path: str) -> bool:
    """Return True when *relative_path* starts with a product-surface prefix.

    An exact match counts too, since a string is a prefix of itself.
    """
    # str.startswith accepts a tuple and succeeds if any element matches.
    return relative_path.startswith(PRODUCT_SURFACE_PREFIXES)
def _is_legacy_compatibility_line(line: str) -> bool:
    """Decide whether *line* only keeps a legacy field name for compatibility.

    Rules, applied in order to the stripped line:

    1. Any ``VISIBLE_HUMAN_TEXT`` entry present -> not legacy (False).
    2. ``requires_hitl`` present without ``True``/``true`` -> legacy (True).
    3. A ``get("human_review_required")`` lookup (either quote style) -> True.
    4. None of the compatibility field tokens present -> False.
    5. Otherwise True only if the line also contains one of ``False``,
       ``false``, ``_count``, ``legacy_`` or ``manual_``.
    """
    text = line.strip()

    def mentions(*needles: str) -> bool:
        # True when at least one needle occurs in the stripped line.
        return any(needle in text for needle in needles)

    if mentions(*VISIBLE_HUMAN_TEXT):
        return False
    if "requires_hitl" in text and not mentions("True", "true"):
        return True
    if mentions('get("human_review_required")', "get('human_review_required')"):
        return True
    if not mentions(
        "requires_hitl",
        "manual_",
        "human_review_required",
        "manual_review_required",
        "legacy_human_review_required",
        "hitl_count",
    ):
        return False
    # NOTE(review): "manual_" is both a trigger token above and a qualifier
    # here, so any line containing "manual_" that reaches this point is
    # reported as legacy. Confirm that this is intended.
    return mentions("False", "false", "_count", "legacy_", "manual_")
def _classify(relative_path: str, line: str, marker: str) -> tuple[str, str, bool, str]:
    """Classify one matching line.

    Args:
        relative_path: POSIX-style path of the file, relative to the scan root.
        line: the full source line that contained a marker.
        marker: the matched marker; accepted for signature stability but not
            consulted by the current rules.

    Returns:
        ``(category, priority, controlled_apply_allowed, recommended_next_action)``.
        Rules are evaluated top to bottom and the first match wins.
    """
    # Legacy compatibility wins over hard-gate context; a hard gate is only
    # reported for lines that are not legacy compatibility lines.
    if _is_legacy_compatibility_line(line):
        return (
            "legacy_compatibility_field",
            "P3",
            True,
            "Keep key compatibility, but ensure product copy and summaries expose AI controlled apply fields.",
        )
    if _has_hard_gate_context(line):
        return (
            "incident_hard_gate",
            "P0",
            False,
            "Keep as hard gate; require break-glass path, replay/shadow/canary, and explicit external approval.",
        )

    in_openclaw_routes = relative_path == "routes/openclaw_bot_routes.py"
    if in_openclaw_routes and "HITL" in line:
        return (
            "ea_legacy_callback_debt",
            "P1",
            True,
            "Convert EA legacy HITL wording to AI exception callback wording while preserving callback_data compatibility.",
        )

    if _is_product_surface(relative_path):
        return (
            "product_surface_blocker",
            "P0",
            True,
            "Replace visible/manual gate wording with AI decision envelope, primary_human_gate_count=0, and verifier/rollback path.",
        )

    if relative_path.startswith(("services/market_intel/", "routes/market_intel")):
        return (
            "market_intel_ai_controlled_apply_candidate",
            "P1",
            True,
            "Convert manual preview phases to AI controlled preview with source diff, dry-run, receipt, verifier, and rollback metadata.",
        )

    is_governance_doc = relative_path.startswith("docs/") or relative_path == "TODO_NEXT_STEPS.txt"
    if is_governance_doc:
        return (
            "governance_doc_debt",
            "P2",
            True,
            "Update current doctrine from manual/HITL wording to AI controlled apply while preserving historical version notes.",
        )

    # Fallback for everything not covered by a more specific rule.
    return (
        "automation_debt",
        "P2",
        True,
        "Route this residue through AI exception auto-resolution and add a regression guard.",
    )
def _scan_file(path: Path, root: Path, per_file_limit: int) -> list[Finding]:
    """Scan one file and return its highest-priority findings.

    Every line containing a manual-gate marker is classified. The per-file
    cap is applied only after all lines have been seen, keeping the
    ``per_file_limit`` findings that sort first under ``_priority_key``
    (priority rank, then line number). Capping in line order instead would
    let a file whose first hits are low-priority legacy fields hide a later
    P0 blocker and make the report claim the product surface is clear.

    Args:
        path: file to read.
        root: scan root used to compute the reported relative path.
        per_file_limit: maximum number of findings returned for this file;
            values below 1 are treated as 1.

    Returns:
        Up to ``per_file_limit`` findings ordered by priority, then line.
        An empty list when the file cannot be read.
    """
    relative_path = _relative(path, root)
    try:
        # Undecodable bytes are dropped so a stray binary blob cannot abort the scan.
        lines = path.read_text(encoding="utf-8", errors="ignore").splitlines()
    except OSError:
        return []

    findings: list[Finding] = []
    for line_number, line in enumerate(lines, start=1):
        marker = _first_marker(line)
        if not marker:
            continue
        category, priority, allowed, action = _classify(relative_path, line, marker)
        findings.append(
            Finding(
                file=relative_path,
                line=line_number,
                marker=marker,
                snippet=line.strip()[:220],
                category=category,
                priority=priority,
                controlled_apply_allowed=allowed,
                recommended_next_action=action,
            )
        )

    # Prioritise before truncating so the cap can never drop the most severe hits.
    findings.sort(key=_priority_key)
    return findings[: max(1, per_file_limit)]
def _priority_key(finding: Finding) -> tuple[int, str, int]:
rank = {"P0": 0, "P1": 1, "P2": 2, "P3": 3}.get(finding.priority, 9)
return rank, finding.file, finding.line
def _market_intel_ai_alias_count() -> int:
try:
from services.market_intel.ai_controlled_route_aliases import (
AI_CONTROLLED_ROUTE_ALIASES,
)
except Exception:
return 0
return len(AI_CONTROLLED_ROUTE_ALIASES)
# Alias count at or above which the market-intel alias layer is reported complete.
_ALIAS_LAYER_COMPLETE_THRESHOLD = 90


def _market_intel_alias_lane(alias_count: int) -> tuple[str, str]:
    """Return the ``(status, next_action)`` pair for the market-intel work-order lane."""
    if alias_count >= _ALIAS_LAYER_COMPLETE_THRESHOLD:
        return (
            "review_report_alias_layer_complete",
            "Migrate internal legacy names behind AI exception aliases while preserving "
            "compatibility routes and receipts.",
        )
    if alias_count:
        return (
            "alias_layer_started",
            "Expand AI controlled route aliases into the remaining review/report routes, "
            "then migrate internal legacy names behind compatibility constants.",
        )
    return (
        "ready_for_source_refactor",
        "Add AI controlled canonical route aliases before migrating internal legacy "
        "names behind compatibility constants.",
    )


def build_ai_automation_debt_report(
    *,
    root: Path | str | None = None,
    max_findings: int = 120,
    per_file_limit: int = 8,
) -> dict[str, Any]:
    """Build a read-only, machine-actionable AI automation debt inventory.

    Args:
        root: directory to scan; defaults to the module-level ``ROOT``.
        max_findings: cap on the findings returned in the report, clamped to
            10..500. A falsy value falls back to 120.
        per_file_limit: cap on findings collected per file, clamped to 1..40.
            A falsy value falls back to 8.

    Returns:
        A dict with ``policy``, ``success``, ``result``, a ``summary`` of
        counts, the prioritised ``findings`` (at most ``max_findings``), a
        four-lane ``next_work_order`` and a static ``safety`` declaration.
        Summary counts cover every collected finding, not only the returned
        slice.
    """
    scan_root = Path(root) if root is not None else ROOT
    max_findings = max(10, min(int(max_findings or 120), 500))
    per_file_limit = max(1, min(int(per_file_limit or 8), 40))

    files = _iter_scan_files(scan_root)
    findings: list[Finding] = []
    for path in files:
        findings.extend(_scan_file(path, scan_root, per_file_limit=per_file_limit))
    findings.sort(key=_priority_key)
    visible_findings = [finding.as_dict() for finding in findings][:max_findings]

    # Tally over all findings so the summary is not affected by max_findings.
    category_counts: dict[str, int] = {}
    priority_counts: dict[str, int] = {}
    for finding in findings:
        category_counts[finding.category] = category_counts.get(finding.category, 0) + 1
        priority_counts[finding.priority] = priority_counts.get(finding.priority, 0) + 1

    product_surface_blocker_count = category_counts.get("product_surface_blocker", 0)
    legacy_field_count = category_counts.get("legacy_compatibility_field", 0)
    hard_gate_count = category_counts.get("incident_hard_gate", 0)
    surface_clear = product_surface_blocker_count == 0
    # Hard gates are already excluded by controlled_apply_allowed=False;
    # legacy compatibility fields are excluded explicitly.
    controlled_apply_candidate_count = sum(
        1
        for finding in findings
        if finding.controlled_apply_allowed and finding.category != "legacy_compatibility_field"
    )

    market_intel_ai_alias_count = _market_intel_ai_alias_count()
    market_intel_alias_status, market_intel_next_action = _market_intel_alias_lane(
        market_intel_ai_alias_count
    )

    return {
        "policy": POLICY,
        "success": True,
        "result": "PRODUCT_SURFACE_CLEAR" if surface_clear else "PRODUCT_SURFACE_BLOCKED",
        "summary": {
            "scanned_file_count": len(files),
            "finding_count": len(findings),
            "returned_finding_count": len(visible_findings),
            "product_surface_blocker_count": product_surface_blocker_count,
            "controlled_apply_candidate_count": controlled_apply_candidate_count,
            "incident_hard_gate_count": hard_gate_count,
            "legacy_compatibility_field_count": legacy_field_count,
            "primary_human_gate_count": product_surface_blocker_count,
            "ai_controlled_apply_ready": surface_clear,
            "market_intel_ai_controlled_alias_count": market_intel_ai_alias_count,
            "category_counts": category_counts,
            "priority_counts": priority_counts,
        },
        "findings": visible_findings,
        "next_work_order": [
            {
                "priority": "P0",
                "lane": "product_surface",
                "status": "clear" if surface_clear else "needs_ai_copy_fix",
                "target_count": product_surface_blocker_count,
                "next_action": "Keep dashboard/daily/growth/OpenClaw/Webcrumbs product copy locked to AI decision envelope wording.",
            },
            {
                "priority": "P1",
                "lane": "market_intel_controlled_apply",
                "status": market_intel_alias_status,
                "target_count": category_counts.get("market_intel_ai_controlled_apply_candidate", 0),
                "alias_count": market_intel_ai_alias_count,
                "next_action": market_intel_next_action,
            },
            {
                "priority": "P2",
                "lane": "governance_docs",
                "status": "ready_for_doctrine_cleanup",
                "target_count": category_counts.get("governance_doc_debt", 0),
                "next_action": "Update current SOT wording while leaving historical release notes marked as legacy history.",
            },
            {
                "priority": "P3",
                "lane": "legacy_compatibility_aliases",
                "status": "needs_alias_migration" if legacy_field_count else "clear",
                "target_count": legacy_field_count,
                "next_action": "Move remaining legacy manual/human-review API keys behind AI exception aliases while preserving backward compatibility.",
            },
        ],
        "safety": {
            "read_only": True,
            "writes_database": False,
            "executes_network": False,
            "uses_llm": False,
            "scans_raw_sessions": False,
            "github_used": False,
        },
    }