428 lines
14 KiB
Python
428 lines
14 KiB
Python
#!/usr/bin/env python3
|
|
# -*- coding: utf-8 -*-
|
|
"""Read-only scanner for AI automation debt and legacy human-gate residue."""
|
|
|
|
from __future__ import annotations
|
|
|
|
from dataclasses import dataclass
|
|
from pathlib import Path
|
|
from typing import Any
|
|
|
|
|
|
# Identifier reported under the "policy" key of every generated report.
POLICY = "read_only_ai_automation_debt_scan"

# Default scan root: the directory one level above this module's directory.
ROOT = Path(__file__).resolve().parents[1]

# Directories (walked recursively) and individual files, relative to the
# scan root, that are inspected for markers.
SCAN_TARGETS = (
    # source trees
    "templates",
    "routes",
    "services",
    "scripts",
    # individual governance documents
    "docs/AI_INTELLIGENCE_MODULE_SOT.md",
    "TODO_NEXT_STEPS.txt",
)

# Only files whose suffix matches one of these (case-sensitive) are scanned.
SCAN_SUFFIXES = {".py", ".html", ".js", ".md", ".txt"}

# Names excluded from the scan (see `_is_excluded`). "docs/memory" spans two
# path segments, so it denotes a directory prefix rather than a single part.
EXCLUDED_PARTS = {
    # tooling / cache directories
    ".git",
    ".pytest_cache",
    "__pycache__",
    "node_modules",
    # project directories that are out of scope for this scan
    "data",
    "tests",
    "migrations",
    "docs/memory",
}

# Paths whose marker hits are classified as product-surface blockers.
# `_is_product_surface` matches these by prefix.
PRODUCT_SURFACE_PREFIXES = (
    # templates
    "templates/dashboard_v2.html",
    "templates/daily_sales.html",
    "templates/growth_analysis.html",
    # routes
    "routes/dashboard_routes.py",
    "routes/ai_routes.py",
    "routes/openclaw_bot_routes.py",
    # services
    "services/competitor_intel_repository.py",
    "services/competitor_match_review_service.py",
    "services/competitor_price_feeder.py",
    "services/openclaw_strategist_service.py",
    "services/pchome_mapping_backlog_service.py",
    "services/pchome_revenue_growth_service.py",
    "services/ppt_generator.py",
    "services/telegram_templates.py",
    "services/webcrumbs_host_data_service.py",
    # static assets
    "web/static/js/page-dashboard-v2.js",
)
|
|
|
|
# Human-readable gate wording (Chinese phrases built around "人工", i.e.
# manual/human, plus the "HITL" acronym). A line containing any of these is
# never treated as a mere legacy compatibility field.
VISIBLE_HUMAN_TEXT = (
    "需人工",
    "人工覆核",
    "人工閉環",
    "人工已",
    "人工標記",
    "人工要求",
    "人工確認",
    "人工採用",
    "人工否決",
    "人工單位價",
    "重算待人工",
    "HITL",
)

# Every substring that makes a line a finding. `_first_marker` returns the
# first match, so order matters: visible wording first, then identifiers.
MANUAL_MARKERS = VISIBLE_HUMAN_TEXT + (
    "requires_hitl",
    "human_review_required",
    "manual_review_required",
    "manual_required",
    "needs_human",
    "ready_for_manual",
    "manual_operator_approval",
    "manual_approval_required",
    "manual_sample",
    "manual_fetch",
)

# Substrings (compared case-insensitively by `_has_hard_gate_context`) that
# mark a line as touching something that stays behind a hard gate.
HARD_GATE_MARKERS = (
    # credentials and session material
    "secret",
    "token",
    "private key",
    "cookie",
    "raw session",
    "authorization header",
    # destructive data operations (trailing space is part of the marker)
    "DROP ",
    "TRUNCATE ",
    "destructive migration",
    # operational / external-cost actions
    "reboot",
    "force push",
    "paid provider",
)
|
|
|
|
|
|
@dataclass(frozen=True)
class Finding:
    """A single marker hit together with its classification."""

    file: str  # path of the scanned file, relative to the scan root
    line: int  # 1-based line number of the hit
    marker: str  # the marker substring that matched
    snippet: str  # stripped source line, truncated by the scanner
    category: str  # classification label, e.g. "automation_debt"
    priority: str  # "P0" (most urgent) through "P3"
    controlled_apply_allowed: bool
    recommended_next_action: str

    def as_dict(self) -> dict[str, Any]:
        """Return the fields as a plain dict, keyed and ordered like the fields."""
        return dict(
            file=self.file,
            line=self.line,
            marker=self.marker,
            snippet=self.snippet,
            category=self.category,
            priority=self.priority,
            controlled_apply_allowed=self.controlled_apply_allowed,
            recommended_next_action=self.recommended_next_action,
        )
|
|
|
|
|
|
def _relative(path: Path, root: Path) -> str:
    """Return *path* expressed relative to *root*, using forward slashes.

    Raises:
        ValueError: if *path* is not located under *root*
            (propagated from ``Path.relative_to``).
    """
    inside_root = path.relative_to(root)
    return inside_root.as_posix()
|
|
|
|
|
|
def _is_excluded(relative_path: str) -> bool:
    """Return True when *relative_path* must be skipped by the scanner.

    The exclusion list is read from ``EXCLUDED_PARTS`` instead of a second
    hard-coded copy, so the constant and this check cannot drift apart.

    Args:
        relative_path: POSIX-style path relative to the scan root.

    Returns:
        True if the path is the scanner's own source file, if any of its
        ``/``-separated segments equals a single-segment entry of
        ``EXCLUDED_PARTS``, or if it lies under a multi-segment entry
        (for example ``docs/memory``) used as a directory prefix.
    """
    # Skip the scanner module itself.
    # NOTE(review): presumably because it contains every marker literal and
    # would otherwise report itself; confirm this path matches the module.
    if relative_path == "services/ai_automation_debt_service.py":
        return True

    segments = set(relative_path.split("/"))
    for entry in EXCLUDED_PARTS:
        if "/" in entry:
            # Multi-segment entries can never equal one path segment, so
            # they are matched as a directory prefix instead.
            if relative_path.startswith(f"{entry}/"):
                return True
        elif entry in segments:
            return True
    return False
|
|
|
|
|
|
def _iter_scan_files(root: Path) -> list[Path]:
    """Collect the files under *root* that should be scanned.

    Each entry of ``SCAN_TARGETS`` is resolved against *root*; missing
    targets are skipped, a file target is considered on its own, and any
    other existing target is walked recursively. A file is kept when its
    suffix is in ``SCAN_SUFFIXES`` and ``_is_excluded`` does not reject its
    root-relative path.

    Returns:
        The selected paths, de-duplicated and sorted.
    """
    selected: set[Path] = set()
    for target in SCAN_TARGETS:
        base = root / target
        if not base.exists():
            continue
        # A file target is its own sole candidate; anything else is walked.
        candidates = [base] if base.is_file() else base.rglob("*")
        selected.update(
            entry
            for entry in candidates
            if entry.is_file()
            and entry.suffix in SCAN_SUFFIXES
            and not _is_excluded(_relative(entry, root))
        )
    return sorted(selected)
|
|
|
|
|
|
def _first_marker(line: str) -> str | None:
    """Return the first entry of ``MANUAL_MARKERS`` found in *line*, else None.

    Matching is a case-sensitive substring test, tried in tuple order.
    """
    for candidate in MANUAL_MARKERS:
        if candidate in line:
            return candidate
    return None
|
|
|
|
|
|
def _has_hard_gate_context(line: str) -> bool:
    """Return True if *line* contains any ``HARD_GATE_MARKERS`` entry.

    Both the line and each marker are lower-cased before the substring test,
    so the comparison is case-insensitive.
    """
    haystack = line.lower()
    for gate in HARD_GATE_MARKERS:
        if gate.lower() in haystack:
            return True
    return False
|
|
|
|
|
|
def _is_product_surface(relative_path: str) -> bool:
    """Return True if *relative_path* starts with a product-surface prefix.

    An exact match is covered as well, since a string starts with itself.
    """
    return relative_path.startswith(tuple(PRODUCT_SURFACE_PREFIXES))
|
|
|
|
|
|
def _is_legacy_compatibility_line(line: str) -> bool:
    """Decide whether *line* only carries a legacy compatibility key.

    The checks run in this order on the stripped line:

    1. Any ``VISIBLE_HUMAN_TEXT`` phrase present -> False.
    2. ``requires_hitl`` present with neither ``True`` nor ``true`` -> True.
    3. A ``get("human_review_required")`` lookup (either quote style) -> True.
    4. No legacy key token present -> False.
    5. Otherwise True only if the line also contains one of ``False``,
       ``false``, ``_count``, ``legacy_`` or ``manual_``.
    """
    text = line.strip()

    # Human-visible gate wording is never a mere compatibility field.
    for phrase in VISIBLE_HUMAN_TEXT:
        if phrase in text:
            return False

    mentions_true = "True" in text or "true" in text
    if "requires_hitl" in text and not mentions_true:
        return True

    getter_forms = (
        'get("human_review_required")',
        "get('human_review_required')",
    )
    if any(form in text for form in getter_forms):
        return True

    legacy_keys = (
        "requires_hitl",
        "manual_",
        "human_review_required",
        "manual_review_required",
        "legacy_human_review_required",
        "hitl_count",
    )
    if not any(key in text for key in legacy_keys):
        return False

    # The key must appear together with a hint that it is disabled, a
    # counter, or an explicitly legacy/manual-prefixed name.
    neutral_hints = ("False", "false", "_count", "legacy_", "manual_")
    return any(hint in text for hint in neutral_hints)
|
|
|
|
|
|
def _classify(relative_path: str, line: str, marker: str) -> tuple[str, str, bool, str]:
    """Classify one marker hit.

    Args:
        relative_path: root-relative POSIX path of the file being scanned.
        line: the source line that contained the marker.
        marker: the matched marker; accepted for interface parity, it is
            not consulted by the rules below.

    Returns:
        ``(category, priority, controlled_apply_allowed, recommended_next_action)``.
        The first matching rule wins, in the order written below.
    """
    # Legacy compatibility lines take precedence over hard-gate context.
    if _is_legacy_compatibility_line(line):
        return (
            "legacy_compatibility_field",
            "P3",
            True,
            "Keep key compatibility, but ensure product copy and summaries expose AI controlled apply fields.",
        )

    if _has_hard_gate_context(line):
        return (
            "incident_hard_gate",
            "P0",
            False,
            "Keep as hard gate; require break-glass path, replay/shadow/canary, and explicit external approval.",
        )

    # This rule must precede the product-surface rule, because the same
    # route file is also listed as a product surface.
    if "HITL" in line and relative_path == "routes/openclaw_bot_routes.py":
        return (
            "ea_legacy_callback_debt",
            "P1",
            True,
            "Convert EA legacy HITL wording to AI exception callback wording while preserving callback_data compatibility.",
        )

    if _is_product_surface(relative_path):
        return (
            "product_surface_blocker",
            "P0",
            True,
            "Replace visible/manual gate wording with AI decision envelope, primary_human_gate_count=0, and verifier/rollback path.",
        )

    if relative_path.startswith(("services/market_intel/", "routes/market_intel")):
        return (
            "market_intel_ai_controlled_apply_candidate",
            "P1",
            True,
            "Convert manual preview phases to AI controlled preview with source diff, dry-run, receipt, verifier, and rollback metadata.",
        )

    if relative_path == "TODO_NEXT_STEPS.txt" or relative_path.startswith("docs/"):
        return (
            "governance_doc_debt",
            "P2",
            True,
            "Update current doctrine from manual/HITL wording to AI controlled apply while preserving historical version notes.",
        )

    # Fallback for every remaining hit.
    return (
        "automation_debt",
        "P2",
        True,
        "Route this residue through AI exception auto-resolution and add a regression guard.",
    )
|
|
|
|
|
|
def _scan_file(path: Path, root: Path, per_file_limit: int) -> list[Finding]:
    """Scan one file and return its classified marker hits.

    The file is read as UTF-8 with undecodable bytes dropped; if it cannot
    be read (``OSError``) an empty list is returned. Scanning stops as soon
    as *per_file_limit* findings have been collected, so later lines of the
    file are not inspected.

    Args:
        path: file to scan.
        root: scan root used to compute the reported relative path.
        per_file_limit: number of findings after which scanning stops.
    """
    rel = _relative(path, root)
    hits: list[Finding] = []

    try:
        text = path.read_text(encoding="utf-8", errors="ignore")
    except OSError:
        return hits

    for number, raw in enumerate(text.splitlines(), start=1):
        marker = _first_marker(raw)
        if not marker:
            continue
        category, priority, allowed, action = _classify(rel, raw, marker)
        hit = Finding(
            file=rel,
            line=number,
            marker=marker,
            snippet=raw.strip()[:220],  # cap the stored excerpt length
            category=category,
            priority=priority,
            controlled_apply_allowed=allowed,
            recommended_next_action=action,
        )
        hits.append(hit)
        if len(hits) >= per_file_limit:
            break
    return hits
|
|
|
|
|
|
def _priority_key(finding: Finding) -> tuple[int, str, int]:
    """Sort key: priority rank first, then file path, then line number.

    ``P0``..``P3`` rank 0..3; any other priority value ranks 9 (last).
    """
    known = ("P0", "P1", "P2", "P3")
    if finding.priority in known:
        rank = known.index(finding.priority)
    else:
        rank = 9
    return (rank, finding.file, finding.line)
|
|
|
|
|
|
def _market_intel_ai_alias_count() -> int:
    """Return how many AI-controlled route aliases are registered.

    The alias table is imported lazily; if that import fails for any reason
    the count is reported as 0 rather than propagating the error.
    """
    try:
        from services.market_intel.ai_controlled_route_aliases import (
            AI_CONTROLLED_ROUTE_ALIASES,
        )
    except Exception:
        # Best-effort lookup: a missing or broken alias module counts as 0.
        return 0
    else:
        return len(AI_CONTROLLED_ROUTE_ALIASES)
|
|
|
|
|
|
def build_ai_automation_debt_report(
    *,
    root: Path | str | None = None,
    max_findings: int = 120,
    per_file_limit: int = 8,
) -> dict[str, Any]:
    """Build a read-only, machine-actionable AI automation debt inventory.

    Args:
        root: directory to scan; defaults to the module-level ``ROOT``.
        max_findings: cap on findings returned under ``"findings"``;
            clamped to 10..500, falsy values fall back to 120.
        per_file_limit: cap on findings collected per file; clamped to
            1..40, falsy values fall back to 8.

    Returns:
        A dict with the keys ``policy``, ``success``, ``result``,
        ``summary``, ``findings``, ``next_work_order`` and ``safety``.
        Summary counts cover every collected finding, not only the
        returned slice.
    """
    scan_root = ROOT if root is None else Path(root)
    max_findings = min(max(int(max_findings or 120), 10), 500)
    per_file_limit = min(max(int(per_file_limit or 8), 1), 40)

    files = _iter_scan_files(scan_root)
    findings: list[Finding] = [
        hit
        for scanned in files
        for hit in _scan_file(scanned, scan_root, per_file_limit=per_file_limit)
    ]
    findings.sort(key=_priority_key)
    # Only the highest-priority slice is serialised into the report.
    visible_findings = [hit.as_dict() for hit in findings[:max_findings]]

    category_counts: dict[str, int] = {}
    priority_counts: dict[str, int] = {}
    controlled_apply_candidate_count = 0
    for hit in findings:
        category_counts[hit.category] = category_counts.get(hit.category, 0) + 1
        priority_counts[hit.priority] = priority_counts.get(hit.priority, 0) + 1
        # Legacy compatibility fields are tracked separately, not as candidates.
        if hit.controlled_apply_allowed and hit.category != "legacy_compatibility_field":
            controlled_apply_candidate_count += 1

    blocker_count = category_counts.get("product_surface_blocker", 0)
    hard_gate_count = category_counts.get("incident_hard_gate", 0)
    legacy_count = category_counts.get("legacy_compatibility_field", 0)
    surface_clear = blocker_count == 0

    alias_count = _market_intel_ai_alias_count()
    if alias_count >= 90:
        alias_status, alias_next_action = (
            "review_report_alias_layer_complete",
            "Migrate internal legacy names behind AI exception aliases while preserving "
            "compatibility routes and receipts.",
        )
    elif alias_count:
        alias_status, alias_next_action = (
            "alias_layer_started",
            "Expand AI controlled route aliases into the remaining review/report routes, "
            "then migrate internal legacy names behind compatibility constants.",
        )
    else:
        alias_status, alias_next_action = (
            "ready_for_source_refactor",
            "Add AI controlled canonical route aliases before migrating internal legacy "
            "names behind compatibility constants.",
        )

    summary = {
        "scanned_file_count": len(files),
        "finding_count": len(findings),
        "returned_finding_count": len(visible_findings),
        "product_surface_blocker_count": blocker_count,
        "controlled_apply_candidate_count": controlled_apply_candidate_count,
        "incident_hard_gate_count": hard_gate_count,
        "legacy_compatibility_field_count": legacy_count,
        "primary_human_gate_count": blocker_count,
        "ai_controlled_apply_ready": surface_clear,
        "market_intel_ai_controlled_alias_count": alias_count,
        "category_counts": category_counts,
        "priority_counts": priority_counts,
    }

    next_work_order = [
        {
            "priority": "P0",
            "lane": "product_surface",
            "status": "clear" if surface_clear else "needs_ai_copy_fix",
            "target_count": blocker_count,
            "next_action": "Keep dashboard/daily/growth/OpenClaw/Webcrumbs product copy locked to AI decision envelope wording.",
        },
        {
            "priority": "P1",
            "lane": "market_intel_controlled_apply",
            "status": alias_status,
            "target_count": category_counts.get("market_intel_ai_controlled_apply_candidate", 0),
            "alias_count": alias_count,
            "next_action": alias_next_action,
        },
        {
            "priority": "P2",
            "lane": "governance_docs",
            "status": "ready_for_doctrine_cleanup",
            "target_count": category_counts.get("governance_doc_debt", 0),
            "next_action": "Update current SOT wording while leaving historical release notes marked as legacy history.",
        },
        {
            "priority": "P3",
            "lane": "legacy_compatibility_aliases",
            "status": "needs_alias_migration" if legacy_count else "clear",
            "target_count": legacy_count,
            "next_action": "Move remaining legacy manual/human-review API keys behind AI exception aliases while preserving backward compatibility.",
        },
    ]

    safety = {
        "read_only": True,
        "writes_database": False,
        "executes_network": False,
        "uses_llm": False,
        "scans_raw_sessions": False,
        "github_used": False,
    }

    return {
        "policy": POLICY,
        "success": True,
        "result": "PRODUCT_SURFACE_CLEAR" if surface_clear else "PRODUCT_SURFACE_BLOCKED",
        "summary": summary,
        "findings": visible_findings,
        "next_work_order": next_work_order,
        "safety": safety,
    }
|