diff --git a/apps/api/src/services/telegram_alert_monitoring_coverage_readback.py b/apps/api/src/services/telegram_alert_monitoring_coverage_readback.py index 0bc72abde..19594da21 100644 --- a/apps/api/src/services/telegram_alert_monitoring_coverage_readback.py +++ b/apps/api/src/services/telegram_alert_monitoring_coverage_readback.py @@ -322,6 +322,9 @@ def build_telegram_alert_monitoring_coverage_readback( 1 for stage in alert_receipt_pipeline if stage["ready"] ), "ai_controlled_gap_queue_count": len(ai_controlled_gap_queue), + "ai_controlled_live_receipt_batch_count": _int( + monitoring_asset_rollups.get("live_receipt_work_batch_count") + ), "required_tag_dimension_count": len(_REQUIRED_TAG_DIMENSIONS), "monitoring_asset_config_kind_count": len( monitoring_asset_rollups["by_config_kind"] @@ -333,6 +336,9 @@ def build_telegram_alert_monitoring_coverage_readback( }, "required_tag_dimensions": _required_tag_dimensions(), "monitoring_asset_rollups": monitoring_asset_rollups, + "ai_controlled_live_receipt_batches": _list_of_dicts( + monitoring_asset_rollups.get("live_receipt_work_batches") + ), "runtime_event_type_counts_7d": runtime_event_type_counts, "alert_receipt_pipeline": alert_receipt_pipeline, "work_item_progress": work_item_progress, @@ -555,6 +561,7 @@ def _monitoring_asset_rollups( and not bool(item.get("post_incident_readback_accepted")) and not bool(item.get("live_evidence_received")) ][:12] + live_receipt_work_batches = _monitoring_live_receipt_work_batches(inventory_source) return { "surface_count": len(surfaces) or len(inventory_source), "readback_candidate_count": len(readback_candidates), @@ -575,6 +582,8 @@ def _monitoring_asset_rollups( "by_reviewer_outcome": _count_by_key(inventory_source, "reviewer_outcome"), "by_expected_scope_prefix": _count_by_expected_scope_prefix(inventory_source), "live_receipt_gap_samples": live_receipt_gap_samples, + "live_receipt_work_batch_count": len(live_receipt_work_batches), + "live_receipt_work_batches": live_receipt_work_batches, } @@ -609,6 +618,170 @@ def _monitoring_gap_sample(item: Mapping[str, Any]) -> dict[str, Any]: } +def _monitoring_live_receipt_work_batches( + inventory_source: list[dict[str, Any]], +) -> list[dict[str, Any]]: + gap_items = [ + item + for item in inventory_source + if bool(item.get("requires_live_evidence")) + and not bool(item.get("post_incident_readback_accepted")) + and not bool(item.get("live_evidence_received")) + ] + grouped: dict[str, list[dict[str, Any]]] = {} + for item in gap_items: + grouped.setdefault(_monitoring_receipt_domain(item), []).append(item) + + batches: list[dict[str, Any]] = [] + for index, (domain, items) in enumerate(sorted(grouped.items()), start=1): + missing_fields = sorted( + { + field + for item in items + for field in _monitoring_gap_sample(item)["missing_receipt_fields"] + } + ) + source_paths = sorted( + { + str(item.get("repo_source_path") or item.get("source_path") or "") + for item in items + if item.get("repo_source_path") or item.get("source_path") + } + ) + write_capable_count = sum( + 1 for item in items if bool(item.get("write_capable_surface")) + ) + batches.append({ + "batch_id": f"tg_live_receipt_{index:02d}_{domain}", + "domain": domain, + "priority": "P0" if write_capable_count or index <= 6 else "P1", + "surface_count": len(items), + "write_capable_surface_count": write_capable_count, + "requires_live_evidence_count": sum( + 1 for item in items if bool(item.get("requires_live_evidence")) + ), + "missing_receipt_fields": missing_fields, + "tag_hints": _monitoring_batch_tag_hints(domain, items), + "target_selector": { + "surface_ids": [str(item.get("surface_id") or "") for item in items], + "config_kinds": sorted( + {str(item.get("config_kind") or "unknown") for item in items} + ), + "control_tiers": sorted( + {str(item.get("control_tier") or "unknown") for item in items} + ), + "expected_scope_prefixes": sorted( + { + str(item.get("expected_scope") or "unknown").split("_", 1)[0] + for item in items + } + ), + "repo_source_paths": source_paths[:12], + }, + "controlled_next_action": ( + "ingest_metadata_only_live_receipt_batch_then_verify_gap_delta" + ), + "post_verifier": "/api/v1/agents/telegram-alert-monitoring-coverage-readback", + "sample_surfaces": [_monitoring_gap_sample(item) for item in items[:4]], + }) + return batches + + +def _monitoring_receipt_domain(item: Mapping[str, Any]) -> str: + config_kind = str(item.get("config_kind") or "").lower() + expected_scope = str(item.get("expected_scope") or "").lower() + source_path = str( + item.get("repo_source_path") or item.get("source_path") or "" + ).lower() + haystack = f"{config_kind} {expected_scope} {source_path}" + for domain, needles in ( + ("telegram", ("telegram",)), + ("alertmanager", ("alertmanager", "webhook_receiver")), + ("grafana", ("grafana",)), + ("signoz", ("signoz",)), + ("sentry", ("sentry",)), + ("langfuse", ("langfuse",)), + ("otel", ("otel", "opentelemetry")), + ("prometheus", ("prometheus", "scrape")), + ("notification", ("notification", "router", "policy")), + ("runtime_service", ("runtime", "compose", "service", "host", "k3s")), + ("smoke", ("smoke", "health", "readiness")), + ("drift_guard", ("drift", "recurrence")), + ("ai_agent", ("ai_", "agent", "openclaw")), + ): + if any(needle in haystack for needle in needles): + return domain + return "monitoring_general" + + +def _monitoring_batch_tag_hints( + domain: str, + items: list[dict[str, Any]], +) -> dict[str, Any]: + config_kinds = sorted({str(item.get("config_kind") or "unknown") for item in items}) + expected_scope_prefixes = sorted( + {str(item.get("expected_scope") or "unknown").split("_", 1)[0] for item in items} + ) + return { + "project_id": DEFAULT_PROJECT_ID, + "product_id": _monitoring_product_hint(domain, config_kinds), + "website_id": "awoooi.wooo.work", + "service_name": _monitoring_service_hint(domain), + "package_name": _monitoring_package_hint(domain), + "tool_id": domain, + "log_source": "monitoring_live_receipt_metadata", + "alertname": "monitoring_surface_live_receipt_gap", + "playbook_id": f"playbook://awoooi/monitoring-live-receipt/{domain}", + "rag_context_ref": f"rag://awoooi/monitoring/{domain}", + "mcp_evidence_ref": f"mcp://awoooi/monitoring/{domain}", + "schedule_id": f"schedule://awoooi/monitoring-live-receipt/{domain}", + "config_kinds": config_kinds, + "expected_scope_prefixes": expected_scope_prefixes, + } + + +def _monitoring_product_hint(domain: str, config_kinds: list[str]) -> str: + if domain in {"telegram", "notification", "alertmanager"}: + return "awooop" + if any("langfuse" in kind for kind in config_kinds): + return "ai-observability" + if domain in {"sentry", "signoz", "grafana", "prometheus", "otel"}: + return "awoooi-observability" + return "awoooi" + + +def _monitoring_service_hint(domain: str) -> str: + return { + "telegram": "telegram-gateway", + "notification": "notification-router", + "alertmanager": "alertmanager-webhook", + "prometheus": "prometheus", + "grafana": "grafana", + "signoz": "signoz", + "sentry": "sentry", + "langfuse": "langfuse", + "otel": "otel-collector", + "runtime_service": "runtime-service-monitoring", + "smoke": "smoke-verifier", + "drift_guard": "recurrence-guard", + "ai_agent": "ai-agent-loop", + }.get(domain, "monitoring") + + +def _monitoring_package_hint(domain: str) -> str: + return { + "prometheus": "prometheus-config", + "grafana": "grafana-dashboard", + "signoz": "signoz-rules", + "sentry": "sentry-runtime", + "langfuse": "langfuse-runtime", + "otel": "otel-collector-config", + "telegram": "telegram-alert-delivery", + "notification": "notification-policy", + "alertmanager": "alertmanager-receiver", + }.get(domain, "monitoring-receipt") + + def _alert_receipt_pipeline( *, monitoring_surface_count: int, @@ -730,8 +903,43 @@ def _ai_controlled_gap_queue( monitoring_asset_rollups: Mapping[str, Any], ) -> list[dict[str, Any]]: gap_samples = _list_of_dicts(monitoring_asset_rollups.get("live_receipt_gap_samples")) + work_batches = _list_of_dicts(monitoring_asset_rollups.get("live_receipt_work_batches")) queue: list[dict[str, Any]] = [] for index, blocker in enumerate(active_blockers, start=1): + if blocker.startswith("monitoring_live_receipt_gap") and work_batches: + for batch_index, batch in enumerate(work_batches, start=1): + queue.append({ + "work_item_id": f"CIR-P0-TG-001-LR-{batch_index:02d}", + "parent_work_item_id": "CIR-P0-TG-001", + "priority": str(batch.get("priority") or "P1"), + "blocker": blocker, + "status": "queued_ai_controlled_apply", + "owner_review_required_for_low_medium_high": False, + "critical_break_glass_required": True, + "batch_id": str(batch.get("batch_id") or ""), + "domain": str(batch.get("domain") or "monitoring_general"), + "surface_count": _int(batch.get("surface_count")), + "write_capable_surface_count": _int( + batch.get("write_capable_surface_count") + ), + "tag_hints": _dict(batch.get("tag_hints")), + "target_selector": _dict(batch.get("target_selector")), + "controlled_next_action": str( + batch.get("controlled_next_action") + or "ingest_metadata_only_live_receipt_batch_then_verify_gap_delta" + ), + "post_verifier": ( + "/api/v1/agents/telegram-alert-monitoring-coverage-readback" + ), + "sample_surfaces": _list_of_dicts(batch.get("sample_surfaces")), + "operation_boundaries": { + "requires_secret": False, + "requires_runtime_send": False, + "stores_raw_payload": False, + "destructive_action": False, + }, + }) + continue queue.append({ "work_item_id": f"CIR-P0-TG-001-{index:02d}", "parent_work_item_id": "CIR-P0-TG-001", diff --git a/apps/api/tests/test_telegram_alert_monitoring_coverage_readback_api.py b/apps/api/tests/test_telegram_alert_monitoring_coverage_readback_api.py index 1336b56cc..8ee1cbc37 100644 --- a/apps/api/tests/test_telegram_alert_monitoring_coverage_readback_api.py +++ b/apps/api/tests/test_telegram_alert_monitoring_coverage_readback_api.py @@ -205,7 +205,8 @@ def test_telegram_alert_monitoring_coverage_readback_surfaces_live_gaps(): assert summary["runtime_event_type_with_receipts_count"] == 4 assert summary["alert_receipt_pipeline_stage_count"] == 7 assert summary["alert_receipt_pipeline_ready_count"] == 6 - assert summary["ai_controlled_gap_queue_count"] == 1 + assert summary["ai_controlled_gap_queue_count"] == 2 + assert summary["ai_controlled_live_receipt_batch_count"] == 2 assert summary["required_tag_dimension_count"] == 12 assert summary["monitoring_asset_config_kind_count"] == 2 assert summary["telegram_monitoring_audit_completion_percent"] == 88.9 @@ -224,6 +225,14 @@ def test_telegram_alert_monitoring_coverage_readback_surfaces_live_gaps(): assert asset_rollups["live_receipt_gap_samples"][0]["surface_id"] == ( "prometheus_k8s_base_config" ) + assert asset_rollups["live_receipt_work_batch_count"] == 2 + batches = {batch["domain"]: batch for batch in payload["ai_controlled_live_receipt_batches"]} + assert set(batches) == {"prometheus", "telegram"} + assert batches["prometheus"]["tag_hints"]["project_id"] == "awoooi" + assert batches["prometheus"]["tag_hints"]["product_id"] == "awoooi-observability" + assert batches["prometheus"]["tag_hints"]["service_name"] == "prometheus" + assert batches["telegram"]["tag_hints"]["product_id"] == "awooop" + assert batches["telegram"]["target_selector"]["config_kinds"] == ["telegram_gateway"] pipeline = {stage["stage_id"]: stage for stage in payload["alert_receipt_pipeline"]} assert pipeline["monitoring_inventory_live_receipts"]["ready"] is False @@ -235,8 +244,12 @@ def test_telegram_alert_monitoring_coverage_readback_surfaces_live_gaps(): assert progress["work_item_id"] == "CIR-P0-TG-001" assert progress["completed_check_count"] == 8 assert progress["total_check_count"] == 9 + assert payload["ai_controlled_gap_queue"][0]["work_item_id"] == ( + "CIR-P0-TG-001-LR-01" + ) + assert payload["ai_controlled_gap_queue"][0]["domain"] == "prometheus" assert payload["ai_controlled_gap_queue"][0]["controlled_next_action"] == ( - "classify_monitoring_surfaces_and_ingest_metadata_live_receipts" + "ingest_metadata_only_live_receipt_batch_then_verify_gap_delta" ) matrix = {item["surface_id"]: item for item in payload["coverage_matrix"]} diff --git a/apps/web/messages/en.json b/apps/web/messages/en.json index 9678117b7..21b3ce918 100644 --- a/apps/web/messages/en.json +++ b/apps/web/messages/en.json @@ -11981,6 +11981,7 @@ "blockers": "Active blockers", "queue": "AI controlled queue", "pipeline": "Receipt pipeline", + "surfaces": "{count} surfaces", "noBlockers": "No active blocker" }, "recent": "近 24h {count}", diff --git a/apps/web/messages/zh-TW.json b/apps/web/messages/zh-TW.json index 297412c2f..ef9bdecdf 100644 --- a/apps/web/messages/zh-TW.json +++ b/apps/web/messages/zh-TW.json @@ -11981,6 +11981,7 @@ "blockers": "Active blockers", "queue": "AI controlled queue", "pipeline": "收據流水線", + "surfaces": "{count} surfaces", "noBlockers": "無 active blocker" }, "recent": "近 24h {count}", diff --git a/apps/web/src/components/awooop/autonomous-runtime-receipt-panel.tsx b/apps/web/src/components/awooop/autonomous-runtime-receipt-panel.tsx index 0a61aebd8..a62d5af0c 100644 --- a/apps/web/src/components/awooop/autonomous-runtime-receipt-panel.tsx +++ b/apps/web/src/components/awooop/autonomous-runtime-receipt-panel.tsx @@ -317,6 +317,7 @@ type TelegramMonitoringCoveragePayload = { alert_receipt_pipeline_stage_count?: number | null; alert_receipt_pipeline_ready_count?: number | null; ai_controlled_gap_queue_count?: number | null; + ai_controlled_live_receipt_batch_count?: number | null; required_tag_dimension_count?: number | null; monitoring_asset_config_kind_count?: number | null; telegram_monitoring_audit_completion_percent?: number | null; @@ -335,7 +336,19 @@ type TelegramMonitoringCoveragePayload = { config_kind?: string | null; status?: string | null; }> | null; + live_receipt_work_batch_count?: number | null; } | null; + ai_controlled_live_receipt_batches?: Array<{ + batch_id?: string | null; + domain?: string | null; + priority?: string | null; + surface_count?: number | null; + tag_hints?: { + product_id?: string | null; + service_name?: string | null; + tool_id?: string | null; + } | null; + }> | null; runtime_event_type_counts_7d?: Record | null; alert_receipt_pipeline?: Array<{ stage_id?: string | null; @@ -359,6 +372,14 @@ type TelegramMonitoringCoveragePayload = { blocker?: string | null; status?: string | null; controlled_next_action?: string | null; + batch_id?: string | null; + domain?: string | null; + surface_count?: number | null; + tag_hints?: { + product_id?: string | null; + service_name?: string | null; + tool_id?: string | null; + } | null; }> | null; operation_boundaries?: { telegram_send_performed?: boolean | null; @@ -1649,25 +1670,37 @@ export function AutonomousRuntimeReceiptPanel({
- {telegramCoverageGapQueue.length > 0 ? ( - telegramCoverageGapQueue.slice(0, 4).map((item) => ( -
-
- - {item.controlled_next_action ?? item.blocker ?? "--"} - - - {item.priority ?? "--"} - -
- - {item.work_item_id ?? item.status ?? "--"} - -
- )) + {telegramCoverageGapQueue.length > 0 ? ( + telegramCoverageGapQueue.slice(0, 4).map((item) => ( +
+
+ + {item.domain ?? item.controlled_next_action ?? item.blocker ?? "--"} + + + {item.priority ?? "--"} + +
+
+ {item.work_item_id ?? item.batch_id ?? item.status ?? "--"} + {typeof item.surface_count === "number" ? ( + + {t("monitoringCoverage.surfaces", { count: item.surface_count })} + + ) : null} +
+ + {[ + item.tag_hints?.product_id, + item.tag_hints?.service_name, + item.tag_hints?.tool_id, + ].filter(Boolean).join(" / ") || item.controlled_next_action || "--"} + +
+ )) ) : (
{t("monitoringCoverage.noBlockers")}