fix(agents): batch monitoring live receipt gaps
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Successful in 58s
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / build-and-deploy (push) Has been cancelled

This commit is contained in:
Your Name
2026-07-03 10:31:07 +08:00
parent 647d811631
commit fabe9d5cb6
5 changed files with 277 additions and 21 deletions

View File

@@ -322,6 +322,9 @@ def build_telegram_alert_monitoring_coverage_readback(
1 for stage in alert_receipt_pipeline if stage["ready"]
),
"ai_controlled_gap_queue_count": len(ai_controlled_gap_queue),
"ai_controlled_live_receipt_batch_count": _int(
monitoring_asset_rollups.get("live_receipt_work_batch_count")
),
"required_tag_dimension_count": len(_REQUIRED_TAG_DIMENSIONS),
"monitoring_asset_config_kind_count": len(
monitoring_asset_rollups["by_config_kind"]
@@ -333,6 +336,9 @@ def build_telegram_alert_monitoring_coverage_readback(
},
"required_tag_dimensions": _required_tag_dimensions(),
"monitoring_asset_rollups": monitoring_asset_rollups,
"ai_controlled_live_receipt_batches": _list_of_dicts(
monitoring_asset_rollups.get("live_receipt_work_batches")
),
"runtime_event_type_counts_7d": runtime_event_type_counts,
"alert_receipt_pipeline": alert_receipt_pipeline,
"work_item_progress": work_item_progress,
@@ -555,6 +561,7 @@ def _monitoring_asset_rollups(
and not bool(item.get("post_incident_readback_accepted"))
and not bool(item.get("live_evidence_received"))
][:12]
live_receipt_work_batches = _monitoring_live_receipt_work_batches(inventory_source)
return {
"surface_count": len(surfaces) or len(inventory_source),
"readback_candidate_count": len(readback_candidates),
@@ -575,6 +582,8 @@ def _monitoring_asset_rollups(
"by_reviewer_outcome": _count_by_key(inventory_source, "reviewer_outcome"),
"by_expected_scope_prefix": _count_by_expected_scope_prefix(inventory_source),
"live_receipt_gap_samples": live_receipt_gap_samples,
"live_receipt_work_batch_count": len(live_receipt_work_batches),
"live_receipt_work_batches": live_receipt_work_batches,
}
@@ -609,6 +618,170 @@ def _monitoring_gap_sample(item: Mapping[str, Any]) -> dict[str, Any]:
}
def _monitoring_live_receipt_work_batches(
inventory_source: list[dict[str, Any]],
) -> list[dict[str, Any]]:
gap_items = [
item
for item in inventory_source
if bool(item.get("requires_live_evidence"))
and not bool(item.get("post_incident_readback_accepted"))
and not bool(item.get("live_evidence_received"))
]
grouped: dict[str, list[dict[str, Any]]] = {}
for item in gap_items:
grouped.setdefault(_monitoring_receipt_domain(item), []).append(item)
batches: list[dict[str, Any]] = []
for index, (domain, items) in enumerate(sorted(grouped.items()), start=1):
missing_fields = sorted(
{
field
for item in items
for field in _monitoring_gap_sample(item)["missing_receipt_fields"]
}
)
source_paths = sorted(
{
str(item.get("repo_source_path") or item.get("source_path") or "")
for item in items
if item.get("repo_source_path") or item.get("source_path")
}
)
write_capable_count = sum(
1 for item in items if bool(item.get("write_capable_surface"))
)
batches.append({
"batch_id": f"tg_live_receipt_{index:02d}_{domain}",
"domain": domain,
"priority": "P0" if write_capable_count or index <= 6 else "P1",
"surface_count": len(items),
"write_capable_surface_count": write_capable_count,
"requires_live_evidence_count": sum(
1 for item in items if bool(item.get("requires_live_evidence"))
),
"missing_receipt_fields": missing_fields,
"tag_hints": _monitoring_batch_tag_hints(domain, items),
"target_selector": {
"surface_ids": [str(item.get("surface_id") or "") for item in items],
"config_kinds": sorted(
{str(item.get("config_kind") or "unknown") for item in items}
),
"control_tiers": sorted(
{str(item.get("control_tier") or "unknown") for item in items}
),
"expected_scope_prefixes": sorted(
{
str(item.get("expected_scope") or "unknown").split("_", 1)[0]
for item in items
}
),
"repo_source_paths": source_paths[:12],
},
"controlled_next_action": (
"ingest_metadata_only_live_receipt_batch_then_verify_gap_delta"
),
"post_verifier": "/api/v1/agents/telegram-alert-monitoring-coverage-readback",
"sample_surfaces": [_monitoring_gap_sample(item) for item in items[:4]],
})
return batches
def _monitoring_receipt_domain(item: Mapping[str, Any]) -> str:
config_kind = str(item.get("config_kind") or "").lower()
expected_scope = str(item.get("expected_scope") or "").lower()
source_path = str(
item.get("repo_source_path") or item.get("source_path") or ""
).lower()
haystack = f"{config_kind} {expected_scope} {source_path}"
for domain, needles in (
("telegram", ("telegram",)),
("alertmanager", ("alertmanager", "webhook_receiver")),
("grafana", ("grafana",)),
("signoz", ("signoz",)),
("sentry", ("sentry",)),
("langfuse", ("langfuse",)),
("otel", ("otel", "opentelemetry")),
("prometheus", ("prometheus", "scrape")),
("notification", ("notification", "router", "policy")),
("runtime_service", ("runtime", "compose", "service", "host", "k3s")),
("smoke", ("smoke", "health", "readiness")),
("drift_guard", ("drift", "recurrence")),
("ai_agent", ("ai_", "agent", "openclaw")),
):
if any(needle in haystack for needle in needles):
return domain
return "monitoring_general"
def _monitoring_batch_tag_hints(
domain: str,
items: list[dict[str, Any]],
) -> dict[str, Any]:
config_kinds = sorted({str(item.get("config_kind") or "unknown") for item in items})
expected_scope_prefixes = sorted(
{str(item.get("expected_scope") or "unknown").split("_", 1)[0] for item in items}
)
return {
"project_id": DEFAULT_PROJECT_ID,
"product_id": _monitoring_product_hint(domain, config_kinds),
"website_id": "awoooi.wooo.work",
"service_name": _monitoring_service_hint(domain),
"package_name": _monitoring_package_hint(domain),
"tool_id": domain,
"log_source": "monitoring_live_receipt_metadata",
"alertname": "monitoring_surface_live_receipt_gap",
"playbook_id": f"playbook://awoooi/monitoring-live-receipt/{domain}",
"rag_context_ref": f"rag://awoooi/monitoring/{domain}",
"mcp_evidence_ref": f"mcp://awoooi/monitoring/{domain}",
"schedule_id": f"schedule://awoooi/monitoring-live-receipt/{domain}",
"config_kinds": config_kinds,
"expected_scope_prefixes": expected_scope_prefixes,
}
def _monitoring_product_hint(domain: str, config_kinds: list[str]) -> str:
if domain in {"telegram", "notification", "alertmanager"}:
return "awooop"
if any("langfuse" in kind for kind in config_kinds):
return "ai-observability"
if domain in {"sentry", "signoz", "grafana", "prometheus", "otel"}:
return "awoooi-observability"
return "awoooi"
def _monitoring_service_hint(domain: str) -> str:
return {
"telegram": "telegram-gateway",
"notification": "notification-router",
"alertmanager": "alertmanager-webhook",
"prometheus": "prometheus",
"grafana": "grafana",
"signoz": "signoz",
"sentry": "sentry",
"langfuse": "langfuse",
"otel": "otel-collector",
"runtime_service": "runtime-service-monitoring",
"smoke": "smoke-verifier",
"drift_guard": "recurrence-guard",
"ai_agent": "ai-agent-loop",
}.get(domain, "monitoring")
def _monitoring_package_hint(domain: str) -> str:
return {
"prometheus": "prometheus-config",
"grafana": "grafana-dashboard",
"signoz": "signoz-rules",
"sentry": "sentry-runtime",
"langfuse": "langfuse-runtime",
"otel": "otel-collector-config",
"telegram": "telegram-alert-delivery",
"notification": "notification-policy",
"alertmanager": "alertmanager-receiver",
}.get(domain, "monitoring-receipt")
def _alert_receipt_pipeline(
*,
monitoring_surface_count: int,
@@ -730,8 +903,43 @@ def _ai_controlled_gap_queue(
monitoring_asset_rollups: Mapping[str, Any],
) -> list[dict[str, Any]]:
gap_samples = _list_of_dicts(monitoring_asset_rollups.get("live_receipt_gap_samples"))
work_batches = _list_of_dicts(monitoring_asset_rollups.get("live_receipt_work_batches"))
queue: list[dict[str, Any]] = []
for index, blocker in enumerate(active_blockers, start=1):
if blocker.startswith("monitoring_live_receipt_gap") and work_batches:
for batch_index, batch in enumerate(work_batches, start=1):
queue.append({
"work_item_id": f"CIR-P0-TG-001-LR-{batch_index:02d}",
"parent_work_item_id": "CIR-P0-TG-001",
"priority": str(batch.get("priority") or "P1"),
"blocker": blocker,
"status": "queued_ai_controlled_apply",
"owner_review_required_for_low_medium_high": False,
"critical_break_glass_required": True,
"batch_id": str(batch.get("batch_id") or ""),
"domain": str(batch.get("domain") or "monitoring_general"),
"surface_count": _int(batch.get("surface_count")),
"write_capable_surface_count": _int(
batch.get("write_capable_surface_count")
),
"tag_hints": _dict(batch.get("tag_hints")),
"target_selector": _dict(batch.get("target_selector")),
"controlled_next_action": str(
batch.get("controlled_next_action")
or "ingest_metadata_only_live_receipt_batch_then_verify_gap_delta"
),
"post_verifier": (
"/api/v1/agents/telegram-alert-monitoring-coverage-readback"
),
"sample_surfaces": _list_of_dicts(batch.get("sample_surfaces")),
"operation_boundaries": {
"requires_secret": False,
"requires_runtime_send": False,
"stores_raw_payload": False,
"destructive_action": False,
},
})
continue
queue.append({
"work_item_id": f"CIR-P0-TG-001-{index:02d}",
"parent_work_item_id": "CIR-P0-TG-001",

View File

@@ -205,7 +205,8 @@ def test_telegram_alert_monitoring_coverage_readback_surfaces_live_gaps():
assert summary["runtime_event_type_with_receipts_count"] == 4
assert summary["alert_receipt_pipeline_stage_count"] == 7
assert summary["alert_receipt_pipeline_ready_count"] == 6
assert summary["ai_controlled_gap_queue_count"] == 1
assert summary["ai_controlled_gap_queue_count"] == 2
assert summary["ai_controlled_live_receipt_batch_count"] == 2
assert summary["required_tag_dimension_count"] == 12
assert summary["monitoring_asset_config_kind_count"] == 2
assert summary["telegram_monitoring_audit_completion_percent"] == 88.9
@@ -224,6 +225,14 @@ def test_telegram_alert_monitoring_coverage_readback_surfaces_live_gaps():
assert asset_rollups["live_receipt_gap_samples"][0]["surface_id"] == (
"prometheus_k8s_base_config"
)
assert asset_rollups["live_receipt_work_batch_count"] == 2
batches = {batch["domain"]: batch for batch in payload["ai_controlled_live_receipt_batches"]}
assert set(batches) == {"prometheus", "telegram"}
assert batches["prometheus"]["tag_hints"]["project_id"] == "awoooi"
assert batches["prometheus"]["tag_hints"]["product_id"] == "awoooi-observability"
assert batches["prometheus"]["tag_hints"]["service_name"] == "prometheus"
assert batches["telegram"]["tag_hints"]["product_id"] == "awooop"
assert batches["telegram"]["target_selector"]["config_kinds"] == ["telegram_gateway"]
pipeline = {stage["stage_id"]: stage for stage in payload["alert_receipt_pipeline"]}
assert pipeline["monitoring_inventory_live_receipts"]["ready"] is False
@@ -235,8 +244,12 @@ def test_telegram_alert_monitoring_coverage_readback_surfaces_live_gaps():
assert progress["work_item_id"] == "CIR-P0-TG-001"
assert progress["completed_check_count"] == 8
assert progress["total_check_count"] == 9
assert payload["ai_controlled_gap_queue"][0]["work_item_id"] == (
"CIR-P0-TG-001-LR-01"
)
assert payload["ai_controlled_gap_queue"][0]["domain"] == "prometheus"
assert payload["ai_controlled_gap_queue"][0]["controlled_next_action"] == (
"classify_monitoring_surfaces_and_ingest_metadata_live_receipts"
"ingest_metadata_only_live_receipt_batch_then_verify_gap_delta"
)
matrix = {item["surface_id"]: item for item in payload["coverage_matrix"]}

View File

@@ -11981,6 +11981,7 @@
"blockers": "Active blockers",
"queue": "AI controlled queue",
"pipeline": "Receipt pipeline",
"surfaces": "{count} surfaces",
"noBlockers": "No active blocker"
},
"recent": "近 24h {count}",

View File

@@ -11981,6 +11981,7 @@
"blockers": "Active blockers",
"queue": "AI controlled queue",
"pipeline": "收據流水線",
"surfaces": "{count} surfaces",
"noBlockers": "無 active blocker"
},
"recent": "近 24h {count}",

View File

@@ -317,6 +317,7 @@ type TelegramMonitoringCoveragePayload = {
alert_receipt_pipeline_stage_count?: number | null;
alert_receipt_pipeline_ready_count?: number | null;
ai_controlled_gap_queue_count?: number | null;
ai_controlled_live_receipt_batch_count?: number | null;
required_tag_dimension_count?: number | null;
monitoring_asset_config_kind_count?: number | null;
telegram_monitoring_audit_completion_percent?: number | null;
@@ -335,7 +336,19 @@ type TelegramMonitoringCoveragePayload = {
config_kind?: string | null;
status?: string | null;
}> | null;
live_receipt_work_batch_count?: number | null;
} | null;
ai_controlled_live_receipt_batches?: Array<{
batch_id?: string | null;
domain?: string | null;
priority?: string | null;
surface_count?: number | null;
tag_hints?: {
product_id?: string | null;
service_name?: string | null;
tool_id?: string | null;
} | null;
}> | null;
runtime_event_type_counts_7d?: Record<string, number | null | undefined> | null;
alert_receipt_pipeline?: Array<{
stage_id?: string | null;
@@ -359,6 +372,14 @@ type TelegramMonitoringCoveragePayload = {
blocker?: string | null;
status?: string | null;
controlled_next_action?: string | null;
batch_id?: string | null;
domain?: string | null;
surface_count?: number | null;
tag_hints?: {
product_id?: string | null;
service_name?: string | null;
tool_id?: string | null;
} | null;
}> | null;
operation_boundaries?: {
telegram_send_performed?: boolean | null;
@@ -1649,25 +1670,37 @@ export function AutonomousRuntimeReceiptPanel({
</span>
</div>
<div className="mt-3 grid gap-2">
{telegramCoverageGapQueue.length > 0 ? (
telegramCoverageGapQueue.slice(0, 4).map((item) => (
<div
key={item.work_item_id ?? item.blocker ?? "gap"}
className="grid gap-1 border border-[#eee9dd] px-2 py-1.5"
>
<div className="flex min-w-0 items-center justify-between gap-2">
<span className="min-w-0 truncate text-xs font-semibold text-[#141413]">
{item.controlled_next_action ?? item.blocker ?? "--"}
</span>
<span className="font-mono text-xs text-[#77736a]">
{item.priority ?? "--"}
</span>
</div>
<span className="truncate font-mono text-xs text-[#77736a]">
{item.work_item_id ?? item.status ?? "--"}
</span>
</div>
))
{telegramCoverageGapQueue.length > 0 ? (
telegramCoverageGapQueue.slice(0, 4).map((item) => (
<div
key={item.work_item_id ?? item.blocker ?? "gap"}
className="grid gap-1 border border-[#eee9dd] px-2 py-1.5"
>
<div className="flex min-w-0 items-center justify-between gap-2">
<span className="min-w-0 truncate text-xs font-semibold text-[#141413]">
{item.domain ?? item.controlled_next_action ?? item.blocker ?? "--"}
</span>
<span className="font-mono text-xs text-[#77736a]">
{item.priority ?? "--"}
</span>
</div>
<div className="flex min-w-0 flex-wrap items-center gap-x-2 gap-y-1 font-mono text-xs text-[#77736a]">
<span className="truncate">{item.work_item_id ?? item.batch_id ?? item.status ?? "--"}</span>
{typeof item.surface_count === "number" ? (
<span>
{t("monitoringCoverage.surfaces", { count: item.surface_count })}
</span>
) : null}
</div>
<span className="truncate text-xs text-[#5f5b52]">
{[
item.tag_hints?.product_id,
item.tag_hints?.service_name,
item.tag_hints?.tool_id,
].filter(Boolean).join(" / ") || item.controlled_next_action || "--"}
</span>
</div>
))
) : (
<div className="border border-[#eee9dd] px-2 py-2 text-xs font-semibold text-[#17602a]">
{t("monitoringCoverage.noBlockers")}