feat(security): 鎖住 Telegram 通知出口新增旁路
Some checks failed
Code Review / ai-code-review (push) Successful in 13s
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled

This commit is contained in:
Your Name
2026-06-19 00:22:17 +08:00
parent 4d0150e178
commit 9ebab2db6e
13 changed files with 3809 additions and 1 deletions

View File

@@ -74,6 +74,8 @@ REQUIRED_CONTROL_DOCS = [
"docs/security/SECURITY-ASSET-CONTROL-LEDGER.md",
"docs/security/AI-PROVIDER-OWNER-RESPONSE-ACCEPTANCE.md",
"docs/security/AGENT-BOUNTY-OWNER-REQUEST-DRAFT.md",
"docs/security/TELEGRAM-NOTIFICATION-EGRESS-NO-NEW-BYPASS-GUARD.md",
"docs/security/TELEGRAM-NOTIFICATION-EGRESS-OWNER-RESPONSE-ACCEPTANCE.md",
"docs/security/SECURITY-SUPPLY-CHAIN-CONTRACT-MANIFEST.md",
]

View File

@@ -231,12 +231,18 @@ def validate(root: Path) -> None:
telegram_notification_egress_inventory = load_json(
security_dir / "telegram-notification-egress-inventory.snapshot.json"
)
telegram_notification_egress_no_new_bypass_guard = load_json(
security_dir / "telegram-notification-egress-no-new-bypass-guard.snapshot.json"
)
telegram_notification_egress_owner_request_draft = load_json(
security_dir / "telegram-notification-egress-owner-request-draft.snapshot.json"
)
telegram_notification_egress_migration_plan_draft = load_json(
security_dir / "telegram-notification-egress-migration-plan-draft.snapshot.json"
)
telegram_notification_egress_owner_response_acceptance = load_json(
security_dir / "telegram-notification-egress-owner-response-acceptance.snapshot.json"
)
public_runtime_config_change_evidence_acceptance = load_json(
security_dir / "public-runtime-config-change-evidence-acceptance.snapshot.json"
)
@@ -21894,6 +21900,238 @@ def validate(root: Path) -> None:
f"telegram_notification_egress_migration_plan_draft.{item['migration_candidate_id']}.{false_key}",
item[false_key],
)
assert_equal(
"telegram_notification_egress_no_new_bypass_guard.schema",
telegram_notification_egress_no_new_bypass_guard["schema_version"],
"telegram_notification_egress_no_new_bypass_guard_v1",
)
assert_equal(
"telegram_notification_egress_no_new_bypass_guard.status",
telegram_notification_egress_no_new_bypass_guard["status"],
"pass_no_new_bypass",
)
assert_equal(
"telegram_notification_egress_no_new_bypass_guard.mode",
telegram_notification_egress_no_new_bypass_guard["mode"],
"repo_source_scan_no_secret_value_no_telegram_send",
)
expected_telegram_egress_no_new_bypass_summary = {
"source_direct_bot_api_call_count": 18,
"source_direct_bot_api_file_count": 11,
"baseline_signature_count": 18,
"current_direct_bot_api_call_count": 18,
"current_direct_bot_api_file_count": 11,
"guarded_method_count": 9,
"sendMessage_call_count": 18,
"sendDocument_call_count": 0,
"sendPhoto_call_count": 0,
"sendMediaGroup_call_count": 0,
"editMessageText_call_count": 0,
"other_guarded_method_call_count": 0,
"new_bypass_count": 0,
"new_bypass_file_count": 0,
"removed_baseline_call_count": 0,
"runtime_gate_count": 0,
"action_button_count": 0,
}
for key, expected in expected_telegram_egress_no_new_bypass_summary.items():
assert_equal(
f"telegram_notification_egress_no_new_bypass_guard.summary.{key}",
telegram_notification_egress_no_new_bypass_guard["summary"][key],
expected,
)
assert_equal(
"telegram_notification_egress_no_new_bypass_guard.current_paths",
[item["path"] for item in telegram_notification_egress_no_new_bypass_guard["current_direct_bot_api_calls"]],
[
".gitea/workflows/cd-dev.yaml",
".gitea/workflows/cd-dev.yaml",
".gitea/workflows/cd-dev.yaml",
".gitea/workflows/cd.yaml",
".gitea/workflows/cd.yaml",
".gitea/workflows/cd.yaml",
".gitea/workflows/cd.yaml",
".gitea/workflows/cd.yaml",
".gitea/workflows/code-review.yaml",
".gitea/workflows/code-review.yaml",
".gitea/workflows/deploy-alerts.yaml",
".gitea/workflows/e2e-health.yaml",
".gitea/workflows/run-migration.yml",
"apps/api/src/services/channel_hub.py",
"scripts/ops/backup-from-110.sh",
"scripts/ops/docker-health-monitor.sh",
"scripts/ops/dr-drill.sh",
"scripts/ops/pg-backup.sh",
],
)
assert_equal(
"telegram_notification_egress_no_new_bypass_guard.new_bypass_findings",
telegram_notification_egress_no_new_bypass_guard["new_bypass_findings"],
[],
)
assert_equal(
"telegram_notification_egress_no_new_bypass_guard.removed_baseline_signatures",
telegram_notification_egress_no_new_bypass_guard["removed_baseline_signatures"],
[],
)
for key, value in telegram_notification_egress_no_new_bypass_guard["execution_boundaries"].items():
if key == "not_authorization":
assert_true(f"telegram_notification_egress_no_new_bypass_guard.execution_boundaries.{key}", value)
else:
assert_false(f"telegram_notification_egress_no_new_bypass_guard.execution_boundaries.{key}", value)
assert_equal(
"telegram_notification_egress_owner_response_acceptance.schema",
telegram_notification_egress_owner_response_acceptance["schema_version"],
"telegram_notification_egress_owner_response_acceptance_v1",
)
assert_equal(
"telegram_notification_egress_owner_response_acceptance.status",
telegram_notification_egress_owner_response_acceptance["status"],
"owner_response_acceptance_ledger_ready_no_runtime_action",
)
assert_equal(
"telegram_notification_egress_owner_response_acceptance.mode",
telegram_notification_egress_owner_response_acceptance["mode"],
"metadata_only_no_secret_value_no_telegram_send_no_workflow_script_api_change",
)
expected_telegram_egress_owner_response_acceptance_summary = {
"source_request_draft_count": 11,
"source_migration_candidate_count": 11,
"source_direct_bot_api_call_count": 18,
"acceptance_candidate_count": 11,
"workflow_acceptance_candidate_count": 6,
"ops_script_acceptance_candidate_count": 4,
"api_direct_acceptance_candidate_count": 1,
"acceptance_field_count": 32,
"required_owner_field_count": 19,
"reviewer_check_count": 22,
"outcome_lane_count": 10,
"forbidden_payload_count": 14,
"blocked_action_count": 35,
"request_sent_count": 0,
"recipient_confirmed_count": 0,
"audit_event_emitted_count": 0,
"owner_response_received_count": 0,
"owner_response_accepted_count": 0,
"owner_response_rejected_count": 0,
"owner_response_quarantined_count": 0,
"supplement_requested_count": 0,
"formatter_convergence_accepted_count": 0,
"redaction_contract_accepted_count": 0,
"delivery_receipt_accepted_count": 0,
"break_glass_fallback_accepted_count": 0,
"maintenance_window_accepted_count": 0,
"rollback_owner_accepted_count": 0,
"postcheck_evidence_accepted_count": 0,
"dedup_or_fingerprint_accepted_count": 0,
"no_false_green_accepted_count": 0,
"direct_bot_api_migration_authorized_count": 0,
"workflow_modification_authorized_count": 0,
"script_modification_authorized_count": 0,
"api_sender_refactor_authorized_count": 0,
"telegram_send_authorized_count": 0,
"bot_api_call_authorized_count": 0,
"workflow_dispatch_authorized_count": 0,
"production_deploy_authorized_count": 0,
"secret_value_collection_allowed_count": 0,
"raw_payload_storage_allowed_count": 0,
"production_write_authorized_count": 0,
"runtime_gate_count": 0,
"action_button_count": 0,
}
for key, expected in expected_telegram_egress_owner_response_acceptance_summary.items():
assert_equal(
f"telegram_notification_egress_owner_response_acceptance.summary.{key}",
telegram_notification_egress_owner_response_acceptance["summary"][key],
expected,
)
assert_equal(
"telegram_notification_egress_owner_response_acceptance.source_paths",
[item["source_path"] for item in telegram_notification_egress_owner_response_acceptance["acceptance_candidates"]],
expected_telegram_egress_request_paths,
)
for key, value in telegram_notification_egress_owner_response_acceptance["execution_boundaries"].items():
if key == "not_authorization":
assert_true(
f"telegram_notification_egress_owner_response_acceptance.execution_boundaries.{key}",
value,
)
else:
assert_false(
f"telegram_notification_egress_owner_response_acceptance.execution_boundaries.{key}",
value,
)
for item in telegram_notification_egress_owner_response_acceptance["acceptance_candidates"]:
assert_equal(
f"telegram_notification_egress_owner_response_acceptance.{item['acceptance_candidate_id']}.acceptance_fields",
len(item["acceptance_fields"]),
32,
)
assert_equal(
f"telegram_notification_egress_owner_response_acceptance.{item['acceptance_candidate_id']}.required_owner_fields",
len(item["required_owner_fields"]),
19,
)
assert_equal(
f"telegram_notification_egress_owner_response_acceptance.{item['acceptance_candidate_id']}.reviewer_checks",
len(item["reviewer_checks"]),
22,
)
assert_equal(
f"telegram_notification_egress_owner_response_acceptance.{item['acceptance_candidate_id']}.outcome_lanes",
len(item["outcome_lanes"]),
10,
)
assert_equal(
f"telegram_notification_egress_owner_response_acceptance.{item['acceptance_candidate_id']}.forbidden_payloads",
len(item["forbidden_payloads"]),
14,
)
assert_equal(
f"telegram_notification_egress_owner_response_acceptance.{item['acceptance_candidate_id']}.blocked_actions",
len(item["blocked_actions"]),
35,
)
assert_true(
f"telegram_notification_egress_owner_response_acceptance.{item['acceptance_candidate_id']}.not_authorization",
item["not_authorization"],
)
for false_key in [
"request_sent",
"recipient_confirmed",
"audit_event_emitted",
"owner_response_received",
"owner_response_accepted",
"owner_response_rejected",
"owner_response_quarantined",
"supplement_requested",
"formatter_convergence_accepted",
"redaction_contract_accepted",
"delivery_receipt_accepted",
"break_glass_fallback_accepted",
"maintenance_window_accepted",
"rollback_owner_accepted",
"postcheck_evidence_accepted",
"dedup_or_fingerprint_accepted",
"no_false_green_accepted",
"direct_bot_api_migration_authorized",
"workflow_modification_authorized",
"script_modification_authorized",
"api_sender_refactor_authorized",
"telegram_send_authorized",
"bot_api_call_authorized",
"workflow_dispatch_authorized",
"production_deploy_authorized",
"secret_value_collection_allowed",
"raw_payload_storage_allowed",
"production_write_authorized",
"runtime_gate",
"action_buttons_allowed",
]:
assert_false(
f"telegram_notification_egress_owner_response_acceptance.{item['acceptance_candidate_id']}.{false_key}",
item[false_key],
)
assert_equal(
"public_runtime_config_change_evidence_acceptance.schema",
public_runtime_config_change_evidence_acceptance["schema_version"],

View File

@@ -0,0 +1,282 @@
#!/usr/bin/env python3
"""檢查 Telegram 通知出口不可新增未登記 direct Bot API 旁路。
本 guard 只掃描 repo 原始碼與 committed snapshot不讀 secret、不呼叫
Telegram、不修改 workflow / script / API sender。既有 direct send 仍是待
owner response 的基線;任何新增或變形的 direct Bot API endpoint 都必須先
進 inventory / owner request / migration plan而不是直接合併。
"""
from __future__ import annotations
import argparse
import json
import re
import subprocess
import sys
from collections import Counter
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
TAIPEI = timezone(timedelta(hours=8))
SOURCE_SNAPSHOT = Path("docs/security/telegram-notification-egress-inventory.snapshot.json")
SCAN_ROOTS = (
Path(".gitea/workflows"),
Path("scripts/ops"),
Path("scripts/ci"),
Path("apps/api/src"),
)
SCAN_SUFFIXES = {".py", ".sh", ".js", ".yml", ".yaml"}
GUARDED_BOT_METHODS = (
"sendMessage",
"sendDocument",
"sendPhoto",
"sendMediaGroup",
"editMessageText",
"sendAnimation",
"sendVideo",
"sendAudio",
"sendVoice",
)
BOT_ENDPOINT_RE = re.compile(
r"api\.telegram\.org/bot.*?/(?P<method>"
+ "|".join(re.escape(method) for method in GUARDED_BOT_METHODS)
+ r")\b",
re.IGNORECASE,
)
SECRET_INTERPOLATION_RE = re.compile(r"\$\{\{\s*secrets\.[^}]+\}\}")
BOT_TOKEN_URL_RE = re.compile(
r"api\.telegram\.org/bot.*?/(?P<method>"
+ "|".join(re.escape(method) for method in GUARDED_BOT_METHODS)
+ r")\b",
re.IGNORECASE,
)
def git_short_sha(root: Path) -> str:
try:
result = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
cwd=root,
check=True,
capture_output=True,
text=True,
)
return result.stdout.strip()
except Exception:
return "unknown"
def iter_scannable_files(root: Path) -> list[Path]:
files: list[Path] = []
for scan_root in SCAN_ROOTS:
absolute_root = root / scan_root
if not absolute_root.exists():
continue
for path in absolute_root.rglob("*"):
if path.is_file() and path.suffix in SCAN_SUFFIXES:
files.append(path)
return sorted(files)
def sanitize_excerpt(line: str) -> str:
excerpt = line.strip()
excerpt = SECRET_INTERPOLATION_RE.sub("${{ secrets.<redacted> }}", excerpt)
excerpt = BOT_TOKEN_URL_RE.sub(
lambda match: f"api.telegram.org/bot<redacted>/{match.group('method')}",
excerpt,
)
return excerpt[:180]
def signature(path: str, method: str, sanitized_excerpt: str) -> str:
return f"{path}::{method.lower()}::{sanitized_excerpt}"
def load_source_snapshot(root: Path) -> dict[str, Any]:
snapshot_path = root / SOURCE_SNAPSHOT
return json.loads(snapshot_path.read_text(encoding="utf-8"))
def build_baseline(source_snapshot: dict[str, Any]) -> Counter[str]:
baseline: Counter[str] = Counter()
for item in source_snapshot.get("direct_bot_api_calls", []):
excerpt = item.get("sanitized_excerpt", "")
match = BOT_ENDPOINT_RE.search(excerpt)
method = match.group("method") if match else "sendMessage"
baseline[signature(item["path"], method, excerpt)] += 1
return baseline
def scan_current_direct_endpoints(root: Path) -> list[dict[str, Any]]:
findings: list[dict[str, Any]] = []
for path in iter_scannable_files(root):
relative_path = path.relative_to(root).as_posix()
text = path.read_text(encoding="utf-8", errors="replace")
for line_number, line in enumerate(text.splitlines(), start=1):
for match in BOT_ENDPOINT_RE.finditer(line):
method = match.group("method")
sanitized = sanitize_excerpt(line)
findings.append(
{
"path": relative_path,
"line": line_number,
"method": method,
"sanitized_excerpt": sanitized,
"signature": signature(relative_path, method, sanitized),
}
)
return findings
def method_counts(findings: list[dict[str, Any]]) -> dict[str, int]:
counts = {method: 0 for method in GUARDED_BOT_METHODS}
for item in findings:
for method in GUARDED_BOT_METHODS:
if item["method"].lower() == method.lower():
counts[method] += 1
break
return counts
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
generated = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
source_snapshot = load_source_snapshot(root)
baseline = build_baseline(source_snapshot)
current_findings = scan_current_direct_endpoints(root)
remaining_baseline = baseline.copy()
new_bypass_findings: list[dict[str, Any]] = []
for item in current_findings:
item_signature = item["signature"]
if remaining_baseline[item_signature] > 0:
remaining_baseline[item_signature] -= 1
continue
new_bypass_findings.append(item)
removed_baseline_signatures = [
{"signature": item_signature, "removed_count": count}
for item_signature, count in sorted(remaining_baseline.items())
if count > 0
]
current_files = sorted({item["path"] for item in current_findings})
new_bypass_files = sorted({item["path"] for item in new_bypass_findings})
counts_by_method = method_counts(current_findings)
source_summary = source_snapshot["summary"]
return {
"schema_version": "telegram_notification_egress_no_new_bypass_guard_v1",
"generated_at": generated,
"git_commit": git_short_sha(root),
"status": "pass_no_new_bypass" if not new_bypass_findings else "blocked_new_bypass_detected",
"mode": "repo_source_scan_no_secret_value_no_telegram_send",
"source_snapshot": SOURCE_SNAPSHOT.as_posix(),
"guarded_roots": [path.as_posix() for path in SCAN_ROOTS],
"guarded_bot_methods": list(GUARDED_BOT_METHODS),
"summary": {
"source_direct_bot_api_call_count": source_summary["direct_bot_api_call_count"],
"source_direct_bot_api_file_count": source_summary["direct_bot_api_file_count"],
"baseline_signature_count": sum(baseline.values()),
"current_direct_bot_api_call_count": len(current_findings),
"current_direct_bot_api_file_count": len(current_files),
"guarded_method_count": len(GUARDED_BOT_METHODS),
"sendMessage_call_count": counts_by_method["sendMessage"],
"sendDocument_call_count": counts_by_method["sendDocument"],
"sendPhoto_call_count": counts_by_method["sendPhoto"],
"sendMediaGroup_call_count": counts_by_method["sendMediaGroup"],
"editMessageText_call_count": counts_by_method["editMessageText"],
"other_guarded_method_call_count": sum(
count
for method, count in counts_by_method.items()
if method
not in {
"sendMessage",
"sendDocument",
"sendPhoto",
"sendMediaGroup",
"editMessageText",
}
),
"new_bypass_count": len(new_bypass_findings),
"new_bypass_file_count": len(new_bypass_files),
"removed_baseline_call_count": sum(item["removed_count"] for item in removed_baseline_signatures),
"runtime_gate_count": 0,
"action_button_count": 0,
},
"execution_boundaries": {
"runtime_execution_authorized": False,
"telegram_send_authorized": False,
"bot_api_call_authorized": False,
"workflow_modification_authorized": False,
"script_modification_authorized": False,
"api_sender_refactor_authorized": False,
"secret_value_collection_allowed": False,
"secret_hash_collection_allowed": False,
"partial_token_collection_allowed": False,
"chat_route_change_authorized": False,
"bot_token_change_authorized": False,
"raw_payload_storage_allowed": False,
"production_write_authorized": False,
"action_buttons_allowed": False,
"not_authorization": True,
},
"current_direct_bot_api_calls": current_findings,
"new_bypass_findings": new_bypass_findings,
"removed_baseline_signatures": removed_baseline_signatures,
"operator_interpretation": [
"new_bypass_count 維持 0 才代表沒有新增未登記 Telegram Bot API 直送旁路。",
"既有 18 個 sendMessage 旁路仍是待 owner response 的基線,不代表已批准或已收斂。",
"sendDocument / sendPhoto / sendMediaGroup 等附件型出口若出現在 repo source會被視為新增旁路並阻擋。",
"本 guard 只讀 repo source 與 committed snapshot不送 Telegram、不讀 Bot token、不修改 workflow / script / API sender。",
],
}
def validate(root: Path) -> None:
report = build_report(root)
errors: list[str] = []
if report["summary"]["new_bypass_count"]:
for item in report["new_bypass_findings"]:
errors.append(
f"{item['path']}:{item['line']}: 新增未登記 Telegram Bot API 旁路 {item['method']}"
)
if errors:
raise SystemExit(
"BLOCKED telegram notification egress no-new-bypass guard:\n"
+ "\n".join(f"- {error}" for error in errors)
)
def main() -> None:
parser = argparse.ArgumentParser(description="檢查 Telegram 通知出口不可新增未登記 direct Bot API 旁路")
parser.add_argument("--root", default=".", help="repository root")
parser.add_argument("--output", help="寫出 JSON 報告")
parser.add_argument("--generated-at", help="固定 generated_at 時間,供 committed snapshot 使用")
args = parser.parse_args()
root = Path(args.root).resolve()
report = build_report(root, args.generated_at)
if args.output:
output = Path(args.output)
output.parent.mkdir(parents=True, exist_ok=True)
output.write_text(json.dumps(report, ensure_ascii=False, indent=2, sort_keys=True) + "\n", encoding="utf-8")
validate(root)
summary = report["summary"]
print(
"TELEGRAM_NOTIFICATION_EGRESS_NO_NEW_BYPASS_GUARD_OK "
f"current={summary['current_direct_bot_api_call_count']} "
f"baseline={summary['baseline_signature_count']} "
f"new={summary['new_bypass_count']} "
f"sendDocument={summary['sendDocument_call_count']} "
f"runtime_gate={summary['runtime_gate_count']}"
)
if __name__ == "__main__":
sys.exit(main())

View File

@@ -0,0 +1,388 @@
#!/usr/bin/env python3
"""建立 Telegram 通知出口 owner response 驗收帳本。
此帳本把 Telegram 通知出口 owner request 草稿與 migration plan 草稿轉成
reviewer 可驗收的候選項。它不送 Telegram、不呼叫 Bot API、不讀 secret
也不修改 workflow、script、API sender、runtime config 或 production。
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
TAIPEI = timezone(timedelta(hours=8))
OWNER_REQUEST_SNAPSHOT = Path("docs/security/telegram-notification-egress-owner-request-draft.snapshot.json")
MIGRATION_PLAN_SNAPSHOT = Path("docs/security/telegram-notification-egress-migration-plan-draft.snapshot.json")
ACCEPTANCE_FIELDS = [
"acceptance_candidate_id",
"source_request_draft_id",
"source_migration_candidate_id",
"source_path",
"surface_kind",
"direct_call_count",
"proposed_wave",
"proposed_target",
"owner_response_ref",
"owner_role_or_team",
"decision",
"decision_reason",
"affected_scope",
"redacted_evidence_refs",
"message_shape_contract_ref",
"redaction_contract_ref",
"formatter_convergence_decision",
"gateway_or_alertmanager_target",
"break_glass_fallback_decision",
"delivery_receipt_ref",
"dedup_or_fingerprint_plan",
"fallback_or_degraded_mode",
"migration_or_exception_reason",
"maintenance_window",
"rollback_owner",
"postcheck_evidence_ref",
"no_secret_value_attestation",
"no_raw_payload_attestation",
"no_false_green_attestation",
"reviewer_outcome",
"followup_owner",
"not_authorization",
]
REVIEWER_CHECKS = [
"source_owner_request_current",
"source_migration_plan_current",
"owner_identity_present",
"decision_reason_present",
"affected_scope_matches_source",
"redacted_refs_only",
"no_secret_or_token_value",
"no_raw_message_payload",
"message_shape_contract_present",
"redaction_contract_present",
"formatter_convergence_explicit",
"gateway_or_alertmanager_target_valid",
"break_glass_fallback_explicit",
"delivery_receipt_metadata_only",
"dedup_or_fingerprint_present",
"maintenance_window_present",
"rollback_owner_present",
"postcheck_evidence_present",
"no_false_green_attested",
"migration_authorization_separate",
"counts_transition_safe",
"runtime_gate_stays_zero",
]
OUTCOME_LANES = [
"waiting_owner_response",
"quarantine_secret_or_raw_payload",
"reject_execution_request",
"request_owner_route_supplement",
"request_formatter_convergence_supplement",
"request_redaction_or_receipt_supplement",
"request_maintenance_or_rollback_supplement",
"ready_for_migration_review",
"owner_review_only_update",
"waiting_runtime_gate",
]
FORBIDDEN_PAYLOADS = [
"bot_token_value",
"chat_secret_value",
"secret_hash",
"partial_token",
"masked_token",
"authorization_header",
"raw_message_payload",
"raw_workflow_log",
"raw_action_log",
"raw_screenshot_with_secret",
"internal_work_window_transcript",
"private_namespace",
"unredacted_internal_path",
"unredacted_private_ip",
]
BLOCKED_ACTIONS = [
"mark_owner_response_received_without_record",
"mark_owner_response_accepted_without_reviewer_record",
"send_telegram",
"call_bot_api",
"modify_workflow",
"modify_ops_script",
"refactor_api_sender",
"dispatch_workflow",
"trigger_cd",
"deploy_production",
"change_chat_route",
"change_bot_token",
"rotate_secret",
"read_secret_store",
"collect_secret_value",
"collect_secret_hash",
"collect_partial_token",
"collect_chat_id_secret",
"store_raw_message_payload",
"store_unredacted_log",
"store_internal_work_window_transcript",
"accept_cd_success_as_delivery_receipt",
"accept_route_200_as_notification_delivery",
"accept_ui_visible_as_notification_acceptance",
"accept_telegram_sent_without_delivery_receipt",
"skip_formatter_convergence",
"skip_redaction_contract",
"skip_dedup_or_fingerprint_review",
"skip_break_glass_fallback_review",
"authorize_migration",
"authorize_workflow_modification",
"authorize_script_modification",
"authorize_api_sender_refactor",
"open_runtime_gate",
"add_action_button",
]
def git_short_sha(root: Path) -> str:
try:
result = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
cwd=root,
check=True,
capture_output=True,
text=True,
)
return result.stdout.strip()
except Exception:
return "unknown"
def load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def build_candidate(request: dict[str, Any], migration: dict[str, Any]) -> dict[str, Any]:
return {
"acceptance_candidate_id": f"telegram_notification_egress_owner_response_acceptance:{request['source_path']}",
"status": "waiting_owner_response",
"source_request_draft_id": request["request_draft_id"],
"source_migration_candidate_id": migration["migration_candidate_id"],
"source_path": request["source_path"],
"surface_kind": request["surface_kind"],
"direct_call_count": request["direct_call_count"],
"line_refs": request["line_refs"],
"line_hash_refs": request["line_hash_refs"],
"proposed_wave": migration["proposed_wave"],
"proposed_target": migration["proposed_target"],
"proposed_change_summary": migration["proposed_change_summary"],
"owner_response_ref": None,
"owner_role_or_team": "pending_owner_response",
"decision": "pending_owner_response",
"decision_reason": "pending_owner_response",
"affected_scope": "pending_owner_response",
"redacted_evidence_refs": [],
"message_shape_contract_ref": None,
"redaction_contract_ref": None,
"formatter_convergence_decision": "pending_owner_response",
"gateway_or_alertmanager_target": "pending_owner_response",
"break_glass_fallback_decision": "pending_owner_response",
"delivery_receipt_ref": None,
"dedup_or_fingerprint_plan": "pending_owner_response",
"fallback_or_degraded_mode": "pending_owner_response",
"migration_or_exception_reason": "pending_owner_response",
"maintenance_window": "pending_owner_response",
"rollback_owner": "pending_owner_response",
"postcheck_evidence_ref": None,
"no_secret_value_attestation": "pending_owner_response",
"no_raw_payload_attestation": "pending_owner_response",
"no_false_green_attestation": "pending_owner_response",
"reviewer_outcome": "waiting_owner_response",
"followup_owner": "pending_owner_response",
"acceptance_fields": ACCEPTANCE_FIELDS,
"required_owner_fields": request["required_owner_fields"],
"reviewer_checks": REVIEWER_CHECKS,
"outcome_lanes": OUTCOME_LANES,
"forbidden_payloads": FORBIDDEN_PAYLOADS,
"blocked_actions": BLOCKED_ACTIONS,
"not_authorization": True,
"request_sent": False,
"recipient_confirmed": False,
"audit_event_emitted": False,
"owner_response_received": False,
"owner_response_accepted": False,
"owner_response_rejected": False,
"owner_response_quarantined": False,
"supplement_requested": False,
"formatter_convergence_accepted": False,
"redaction_contract_accepted": False,
"delivery_receipt_accepted": False,
"break_glass_fallback_accepted": False,
"maintenance_window_accepted": False,
"rollback_owner_accepted": False,
"postcheck_evidence_accepted": False,
"dedup_or_fingerprint_accepted": False,
"no_false_green_accepted": False,
"direct_bot_api_migration_authorized": False,
"workflow_modification_authorized": False,
"script_modification_authorized": False,
"api_sender_refactor_authorized": False,
"telegram_send_authorized": False,
"bot_api_call_authorized": False,
"workflow_dispatch_authorized": False,
"production_deploy_authorized": False,
"secret_value_collection_allowed": False,
"raw_payload_storage_allowed": False,
"production_write_authorized": False,
"runtime_gate": False,
"action_buttons_allowed": False,
}
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
generated = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
owner_request = load_json(root / OWNER_REQUEST_SNAPSHOT)
migration_plan = load_json(root / MIGRATION_PLAN_SNAPSHOT)
migration_by_request_id = {
item["source_request_draft_id"]: item for item in migration_plan["migration_candidates"]
}
candidates = [
build_candidate(request, migration_by_request_id[request["request_draft_id"]])
for request in owner_request["request_drafts"]
]
workflow = [item for item in candidates if item["surface_kind"] == "gitea_workflow_direct_bot_api"]
ops = [item for item in candidates if item["surface_kind"] == "ops_script_direct_bot_api"]
api = [item for item in candidates if item["surface_kind"] == "api_direct_bot_api"]
return {
"schema_version": "telegram_notification_egress_owner_response_acceptance_v1",
"generated_at": generated,
"git_commit": git_short_sha(root),
"status": "owner_response_acceptance_ledger_ready_no_runtime_action",
"mode": "metadata_only_no_secret_value_no_telegram_send_no_workflow_script_api_change",
"source_owner_request_snapshot": OWNER_REQUEST_SNAPSHOT.as_posix(),
"source_owner_request_schema_version": owner_request["schema_version"],
"source_owner_request_status": owner_request["status"],
"source_migration_plan_snapshot": MIGRATION_PLAN_SNAPSHOT.as_posix(),
"source_migration_plan_schema_version": migration_plan["schema_version"],
"source_migration_plan_status": migration_plan["status"],
"summary": {
"source_request_draft_count": owner_request["summary"]["request_draft_count"],
"source_migration_candidate_count": migration_plan["summary"]["migration_candidate_count"],
"source_direct_bot_api_call_count": owner_request["summary"]["source_direct_bot_api_call_count"],
"acceptance_candidate_count": len(candidates),
"workflow_acceptance_candidate_count": len(workflow),
"ops_script_acceptance_candidate_count": len(ops),
"api_direct_acceptance_candidate_count": len(api),
"acceptance_field_count": len(ACCEPTANCE_FIELDS),
"required_owner_field_count": len(owner_request["request_drafts"][0]["required_owner_fields"]),
"reviewer_check_count": len(REVIEWER_CHECKS),
"outcome_lane_count": len(OUTCOME_LANES),
"forbidden_payload_count": len(FORBIDDEN_PAYLOADS),
"blocked_action_count": len(BLOCKED_ACTIONS),
"request_sent_count": 0,
"recipient_confirmed_count": 0,
"audit_event_emitted_count": 0,
"owner_response_received_count": 0,
"owner_response_accepted_count": 0,
"owner_response_rejected_count": 0,
"owner_response_quarantined_count": 0,
"supplement_requested_count": 0,
"formatter_convergence_accepted_count": 0,
"redaction_contract_accepted_count": 0,
"delivery_receipt_accepted_count": 0,
"break_glass_fallback_accepted_count": 0,
"maintenance_window_accepted_count": 0,
"rollback_owner_accepted_count": 0,
"postcheck_evidence_accepted_count": 0,
"dedup_or_fingerprint_accepted_count": 0,
"no_false_green_accepted_count": 0,
"direct_bot_api_migration_authorized_count": 0,
"workflow_modification_authorized_count": 0,
"script_modification_authorized_count": 0,
"api_sender_refactor_authorized_count": 0,
"telegram_send_authorized_count": 0,
"bot_api_call_authorized_count": 0,
"workflow_dispatch_authorized_count": 0,
"production_deploy_authorized_count": 0,
"secret_value_collection_allowed_count": 0,
"raw_payload_storage_allowed_count": 0,
"production_write_authorized_count": 0,
"runtime_gate_count": 0,
"action_button_count": 0,
},
"execution_boundaries": {
"runtime_execution_authorized": False,
"owner_response_mark_received_authorized": False,
"owner_response_mark_accepted_authorized": False,
"direct_bot_api_migration_authorized": False,
"workflow_modification_authorized": False,
"script_modification_authorized": False,
"api_sender_refactor_authorized": False,
"telegram_send_authorized": False,
"bot_api_call_authorized": False,
"workflow_dispatch_authorized": False,
"production_deploy_authorized": False,
"secret_value_collection_allowed": False,
"raw_payload_storage_allowed": False,
"production_write_authorized": False,
"action_buttons_allowed": False,
"not_authorization": True,
},
"acceptance_candidates": candidates,
"operator_interpretation": [
"此帳本只是 reviewer 驗收模板owner response received / accepted 仍維持 0。",
"CD success、route 200、UI 可見或 Telegram sent 狀態本身都不是 delivery receipt。",
"workflow、script 與 API sender 收斂仍需獨立 runtime approval 與 change evidence。",
],
}
def validate(root: Path) -> None:
report = build_report(root)
summary = report["summary"]
if summary["acceptance_candidate_count"] != summary["source_request_draft_count"]:
raise SystemExit("BLOCKED telegram egress owner response acceptance: candidate/request count mismatch")
if summary["acceptance_candidate_count"] != summary["source_migration_candidate_count"]:
raise SystemExit("BLOCKED telegram egress owner response acceptance: candidate/migration count mismatch")
if summary["runtime_gate_count"] != 0:
raise SystemExit("BLOCKED telegram egress owner response acceptance: runtime gate must stay 0")
def main() -> None:
parser = argparse.ArgumentParser(description="建立 Telegram 通知出口 owner response 驗收帳本")
parser.add_argument("--root", default=".", help="repository root")
parser.add_argument("--output", help="write JSON snapshot")
parser.add_argument("--generated-at", help="fixed generated_at timestamp")
args = parser.parse_args()
root = Path(args.root).resolve()
report = build_report(root, args.generated_at)
payload = json.dumps(report, ensure_ascii=False, indent=2) + "\n"
if args.output:
Path(args.output).write_text(payload, encoding="utf-8")
else:
sys.stdout.write(payload)
print(
"TELEGRAM_NOTIFICATION_EGRESS_OWNER_RESPONSE_ACCEPTANCE_OK "
f"candidates={report['summary']['acceptance_candidate_count']} "
f"workflow={report['summary']['workflow_acceptance_candidate_count']} "
f"ops={report['summary']['ops_script_acceptance_candidate_count']} "
f"api={report['summary']['api_direct_acceptance_candidate_count']} "
f"accepted={report['summary']['owner_response_accepted_count']} "
f"runtime_gate={report['summary']['runtime_gate_count']}",
file=sys.stderr,
)
if __name__ == "__main__":
main()