Files
awoooi/scripts/security/telegram-notification-egress-owner-response-acceptance.py
Your Name b191f8e9fe
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 2m6s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped
fix(telegram): close ops direct sender gaps
2026-07-02 19:32:36 +08:00

404 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""建立 Telegram 通知出口 owner response 驗收帳本。
此帳本把 Telegram 通知出口 owner request 草稿與 migration plan 草稿轉成
reviewer 可驗收的候選項。它不送 Telegram、不呼叫 Bot API、不讀 secret
也不修改 workflow、script、API sender、runtime config 或 production。
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from typing import Any
TAIPEI = timezone(timedelta(hours=8))
OWNER_REQUEST_SNAPSHOT = Path("docs/security/telegram-notification-egress-owner-request-draft.snapshot.json")
MIGRATION_PLAN_SNAPSHOT = Path("docs/security/telegram-notification-egress-migration-plan-draft.snapshot.json")
MESSAGE_READABILITY_GUARD_SNAPSHOT = Path("docs/security/telegram-alert-readability-guard.snapshot.json")
ACCEPTANCE_FIELDS = [
"acceptance_candidate_id",
"source_request_draft_id",
"source_migration_candidate_id",
"source_path",
"surface_kind",
"direct_call_count",
"proposed_wave",
"proposed_target",
"owner_response_ref",
"owner_role_or_team",
"decision",
"decision_reason",
"affected_scope",
"redacted_evidence_refs",
"message_shape_contract_ref",
"message_readability_guard_ref",
"redaction_contract_ref",
"formatter_convergence_decision",
"gateway_or_alertmanager_target",
"break_glass_fallback_decision",
"delivery_receipt_ref",
"dedup_or_fingerprint_plan",
"fallback_or_degraded_mode",
"migration_or_exception_reason",
"maintenance_window",
"rollback_owner",
"postcheck_evidence_ref",
"no_secret_value_attestation",
"no_raw_payload_attestation",
"no_false_green_attestation",
"reviewer_outcome",
"followup_owner",
"not_authorization",
]
REVIEWER_CHECKS = [
"source_owner_request_current",
"source_migration_plan_current",
"owner_identity_present",
"decision_reason_present",
"affected_scope_matches_source",
"redacted_refs_only",
"no_secret_or_token_value",
"no_raw_message_payload",
"message_shape_contract_present",
"message_readability_guard_present",
"redaction_contract_present",
"formatter_convergence_explicit",
"gateway_or_alertmanager_target_valid",
"break_glass_fallback_explicit",
"delivery_receipt_metadata_only",
"dedup_or_fingerprint_present",
"maintenance_window_present",
"rollback_owner_present",
"postcheck_evidence_present",
"no_false_green_attested",
"migration_authorization_separate",
"counts_transition_safe",
"runtime_gate_stays_zero",
]
OUTCOME_LANES = [
"waiting_owner_response",
"quarantine_secret_or_raw_payload",
"reject_execution_request",
"request_owner_route_supplement",
"request_formatter_convergence_supplement",
"request_redaction_or_receipt_supplement",
"request_maintenance_or_rollback_supplement",
"ready_for_migration_review",
"owner_review_only_update",
"waiting_runtime_gate",
]
FORBIDDEN_PAYLOADS = [
"bot_token_value",
"chat_secret_value",
"secret_hash",
"partial_token",
"masked_token",
"authorization_header",
"raw_message_payload",
"raw_workflow_log",
"raw_action_log",
"raw_screenshot_with_secret",
"internal_work_window_transcript",
"private_namespace",
"unredacted_internal_path",
"unredacted_private_ip",
]
BLOCKED_ACTIONS = [
"mark_owner_response_received_without_record",
"mark_owner_response_accepted_without_reviewer_record",
"send_telegram",
"call_bot_api",
"modify_workflow",
"modify_ops_script",
"refactor_api_sender",
"dispatch_workflow",
"trigger_cd",
"deploy_production",
"change_chat_route",
"change_bot_token",
"rotate_secret",
"read_secret_store",
"collect_secret_value",
"collect_secret_hash",
"collect_partial_token",
"collect_chat_id_secret",
"store_raw_message_payload",
"store_unredacted_log",
"store_internal_work_window_transcript",
"accept_cd_success_as_delivery_receipt",
"accept_route_200_as_notification_delivery",
"accept_ui_visible_as_notification_acceptance",
"accept_telegram_sent_without_delivery_receipt",
"skip_formatter_convergence",
"skip_redaction_contract",
"skip_dedup_or_fingerprint_review",
"skip_break_glass_fallback_review",
"authorize_migration",
"authorize_workflow_modification",
"authorize_script_modification",
"authorize_api_sender_refactor",
"open_runtime_gate",
"add_action_button",
]
def git_short_sha(root: Path) -> str:
try:
result = subprocess.run(
["git", "rev-parse", "--short", "HEAD"],
cwd=root,
check=True,
capture_output=True,
text=True,
)
return result.stdout.strip()
except Exception:
return "unknown"
def load_json(path: Path) -> dict[str, Any]:
return json.loads(path.read_text(encoding="utf-8"))
def build_candidate(request: dict[str, Any], migration: dict[str, Any]) -> dict[str, Any]:
return {
"acceptance_candidate_id": f"telegram_notification_egress_owner_response_acceptance:{request['source_path']}",
"status": "waiting_owner_response",
"source_request_draft_id": request["request_draft_id"],
"source_migration_candidate_id": migration["migration_candidate_id"],
"source_path": request["source_path"],
"surface_kind": request["surface_kind"],
"direct_call_count": request["direct_call_count"],
"line_refs": request["line_refs"],
"line_hash_refs": request["line_hash_refs"],
"proposed_wave": migration["proposed_wave"],
"proposed_target": migration["proposed_target"],
"proposed_change_summary": migration["proposed_change_summary"],
"owner_response_ref": None,
"owner_role_or_team": "pending_owner_response",
"decision": "pending_owner_response",
"decision_reason": "pending_owner_response",
"affected_scope": "pending_owner_response",
"redacted_evidence_refs": [],
"message_shape_contract_ref": None,
"message_readability_guard_ref": MESSAGE_READABILITY_GUARD_SNAPSHOT.as_posix(),
"redaction_contract_ref": None,
"formatter_convergence_decision": "pending_owner_response",
"gateway_or_alertmanager_target": "pending_owner_response",
"break_glass_fallback_decision": "pending_owner_response",
"delivery_receipt_ref": None,
"dedup_or_fingerprint_plan": "pending_owner_response",
"fallback_or_degraded_mode": "pending_owner_response",
"migration_or_exception_reason": "pending_owner_response",
"maintenance_window": "pending_owner_response",
"rollback_owner": "pending_owner_response",
"postcheck_evidence_ref": None,
"no_secret_value_attestation": "pending_owner_response",
"no_raw_payload_attestation": "pending_owner_response",
"no_false_green_attestation": "pending_owner_response",
"reviewer_outcome": "waiting_owner_response",
"followup_owner": "pending_owner_response",
"acceptance_fields": ACCEPTANCE_FIELDS,
"required_owner_fields": request["required_owner_fields"],
"reviewer_checks": REVIEWER_CHECKS,
"outcome_lanes": OUTCOME_LANES,
"forbidden_payloads": FORBIDDEN_PAYLOADS,
"blocked_actions": BLOCKED_ACTIONS,
"not_authorization": True,
"request_sent": False,
"recipient_confirmed": False,
"audit_event_emitted": False,
"owner_response_received": False,
"owner_response_accepted": False,
"owner_response_rejected": False,
"owner_response_quarantined": False,
"supplement_requested": False,
"formatter_convergence_accepted": False,
"redaction_contract_accepted": False,
"delivery_receipt_accepted": False,
"break_glass_fallback_accepted": False,
"maintenance_window_accepted": False,
"rollback_owner_accepted": False,
"postcheck_evidence_accepted": False,
"dedup_or_fingerprint_accepted": False,
"no_false_green_accepted": False,
"direct_bot_api_migration_authorized": False,
"workflow_modification_authorized": False,
"script_modification_authorized": False,
"api_sender_refactor_authorized": False,
"telegram_send_authorized": False,
"bot_api_call_authorized": False,
"workflow_dispatch_authorized": False,
"production_deploy_authorized": False,
"secret_value_collection_allowed": False,
"raw_payload_storage_allowed": False,
"production_write_authorized": False,
"runtime_gate": False,
"action_buttons_allowed": False,
}
def build_report(root: Path, generated_at: str | None = None) -> dict[str, Any]:
generated = generated_at or datetime.now(TAIPEI).isoformat(timespec="seconds")
owner_request = load_json(root / OWNER_REQUEST_SNAPSHOT)
migration_plan = load_json(root / MIGRATION_PLAN_SNAPSHOT)
migration_by_request_id = {
item["source_request_draft_id"]: item for item in migration_plan["migration_candidates"]
}
candidates = [
build_candidate(request, migration_by_request_id[request["request_draft_id"]])
for request in owner_request["request_drafts"]
]
workflow = [item for item in candidates if item["surface_kind"] == "gitea_workflow_direct_bot_api"]
ops = [item for item in candidates if item["surface_kind"] == "ops_script_direct_bot_api"]
api = [item for item in candidates if item["surface_kind"] == "api_direct_bot_api"]
required_owner_field_count = (
len(owner_request["request_drafts"][0]["required_owner_fields"])
if owner_request["request_drafts"]
else 0
)
return {
"schema_version": "telegram_notification_egress_owner_response_acceptance_v1",
"generated_at": generated,
"git_commit": git_short_sha(root),
"status": (
"owner_response_acceptance_ledger_clear_no_direct_candidates"
if not candidates
else "owner_response_acceptance_ledger_ready_no_runtime_action"
),
"mode": "metadata_only_no_secret_value_no_telegram_send_no_workflow_script_api_change",
"source_owner_request_snapshot": OWNER_REQUEST_SNAPSHOT.as_posix(),
"source_owner_request_schema_version": owner_request["schema_version"],
"source_owner_request_status": owner_request["status"],
"source_migration_plan_snapshot": MIGRATION_PLAN_SNAPSHOT.as_posix(),
"source_migration_plan_schema_version": migration_plan["schema_version"],
"source_migration_plan_status": migration_plan["status"],
"message_readability_guard_snapshot": MESSAGE_READABILITY_GUARD_SNAPSHOT.as_posix(),
"summary": {
"source_request_draft_count": owner_request["summary"]["request_draft_count"],
"source_migration_candidate_count": migration_plan["summary"]["migration_candidate_count"],
"source_direct_bot_api_call_count": owner_request["summary"]["source_direct_bot_api_call_count"],
"acceptance_candidate_count": len(candidates),
"workflow_acceptance_candidate_count": len(workflow),
"ops_script_acceptance_candidate_count": len(ops),
"api_direct_acceptance_candidate_count": len(api),
"acceptance_field_count": len(ACCEPTANCE_FIELDS),
"required_owner_field_count": required_owner_field_count,
"reviewer_check_count": len(REVIEWER_CHECKS),
"outcome_lane_count": len(OUTCOME_LANES),
"forbidden_payload_count": len(FORBIDDEN_PAYLOADS),
"blocked_action_count": len(BLOCKED_ACTIONS),
"request_sent_count": 0,
"recipient_confirmed_count": 0,
"audit_event_emitted_count": 0,
"owner_response_received_count": 0,
"owner_response_accepted_count": 0,
"owner_response_rejected_count": 0,
"owner_response_quarantined_count": 0,
"supplement_requested_count": 0,
"formatter_convergence_accepted_count": 0,
"redaction_contract_accepted_count": 0,
"delivery_receipt_accepted_count": 0,
"break_glass_fallback_accepted_count": 0,
"maintenance_window_accepted_count": 0,
"rollback_owner_accepted_count": 0,
"postcheck_evidence_accepted_count": 0,
"dedup_or_fingerprint_accepted_count": 0,
"no_false_green_accepted_count": 0,
"direct_bot_api_migration_authorized_count": 0,
"workflow_modification_authorized_count": 0,
"script_modification_authorized_count": 0,
"api_sender_refactor_authorized_count": 0,
"telegram_send_authorized_count": 0,
"bot_api_call_authorized_count": 0,
"workflow_dispatch_authorized_count": 0,
"production_deploy_authorized_count": 0,
"secret_value_collection_allowed_count": 0,
"raw_payload_storage_allowed_count": 0,
"production_write_authorized_count": 0,
"runtime_gate_count": 0,
"action_button_count": 0,
},
"execution_boundaries": {
"runtime_execution_authorized": False,
"owner_response_mark_received_authorized": False,
"owner_response_mark_accepted_authorized": False,
"direct_bot_api_migration_authorized": False,
"workflow_modification_authorized": False,
"script_modification_authorized": False,
"api_sender_refactor_authorized": False,
"telegram_send_authorized": False,
"bot_api_call_authorized": False,
"workflow_dispatch_authorized": False,
"production_deploy_authorized": False,
"secret_value_collection_allowed": False,
"raw_payload_storage_allowed": False,
"production_write_authorized": False,
"action_buttons_allowed": False,
"not_authorization": True,
},
"acceptance_candidates": candidates,
"operator_interpretation": [
"此帳本只是 reviewer 驗收模板owner response received / accepted 仍維持 0。",
"每個 direct egress candidate 都必須引用 Telegram 告警可讀性 guardmigration review 不得繞過卡片化、脫敏與 runtime_write_gate=0。",
"CD success、route 200、UI 可見或 Telegram sent 狀態本身都不是 delivery receipt。",
"workflow、script 與 API sender 收斂仍需獨立 runtime approval 與 change evidence。",
],
}
def validate(root: Path) -> None:
report = build_report(root)
summary = report["summary"]
if summary["acceptance_candidate_count"] != summary["source_request_draft_count"]:
raise SystemExit("BLOCKED telegram egress owner response acceptance: candidate/request count mismatch")
if summary["acceptance_candidate_count"] != summary["source_migration_candidate_count"]:
raise SystemExit("BLOCKED telegram egress owner response acceptance: candidate/migration count mismatch")
if summary["runtime_gate_count"] != 0:
raise SystemExit("BLOCKED telegram egress owner response acceptance: runtime gate must stay 0")
def main() -> None:
parser = argparse.ArgumentParser(description="建立 Telegram 通知出口 owner response 驗收帳本")
parser.add_argument("--root", default=".", help="repository root")
parser.add_argument("--output", help="write JSON snapshot")
parser.add_argument("--generated-at", help="fixed generated_at timestamp")
args = parser.parse_args()
root = Path(args.root).resolve()
report = build_report(root, args.generated_at)
payload = json.dumps(report, ensure_ascii=False, indent=2) + "\n"
if args.output:
Path(args.output).write_text(payload, encoding="utf-8")
else:
sys.stdout.write(payload)
print(
"TELEGRAM_NOTIFICATION_EGRESS_OWNER_RESPONSE_ACCEPTANCE_OK "
f"candidates={report['summary']['acceptance_candidate_count']} "
f"workflow={report['summary']['workflow_acceptance_candidate_count']} "
f"ops={report['summary']['ops_script_acceptance_candidate_count']} "
f"api={report['summary']['api_direct_acceptance_candidate_count']} "
f"accepted={report['summary']['owner_response_accepted_count']} "
f"runtime_gate={report['summary']['runtime_gate_count']}",
file=sys.stderr,
)
if __name__ == "__main__":
main()