diff --git a/apps/api/src/services/ai_agent_autonomous_runtime_control.py b/apps/api/src/services/ai_agent_autonomous_runtime_control.py index 29f103f54..6f04a9255 100644 --- a/apps/api/src/services/ai_agent_autonomous_runtime_control.py +++ b/apps/api/src/services/ai_agent_autonomous_runtime_control.py @@ -9,6 +9,9 @@ KM, and Telegram receipts are present. from __future__ import annotations +import asyncio +import copy +import time from collections.abc import Iterable, Mapping from datetime import UTC, datetime from typing import Any @@ -42,6 +45,7 @@ _DEPLOY_ATTEMPT_NOTE = "cd_internal_control_plane_readback_retry_20260628_2" _LIVE_READBACK_SCHEMA_VERSION = "ai_agent_autonomous_runtime_receipt_readback_v1" _DEFAULT_PROJECT_ID = "awoooi" _DEFAULT_LOOKBACK_HOURS = 24 +_RUNTIME_RECEIPT_READBACK_CACHE_TTL_SECONDS = 20.0 # CD cancel-stale-cd no-op triggers must not change runtime payloads. _EXECUTOR_OPERATION_TYPES = ( "ansible_candidate_matched", @@ -74,6 +78,11 @@ _PUBLIC_VALUE_REDACTIONS = ( ) logger = get_logger(__name__) +_runtime_receipt_readback_cache: dict[ + tuple[str, int, int], + tuple[float, dict[str, Any]], +] = {} +_runtime_receipt_readback_lock: asyncio.Lock | None = None def _allowed_risk_levels() -> list[str]: @@ -125,6 +134,52 @@ def _int_value(value: Any) -> int: return 0 +def _runtime_receipt_readback_cache_key( + *, + project_id: str, + lookback_hours: int, + limit: int, +) -> tuple[str, int, int]: + return ( + project_id, + max(1, int(lookback_hours or _DEFAULT_LOOKBACK_HOURS)), + max(1, int(limit or 20)), + ) + + +def _get_runtime_receipt_readback_lock() -> asyncio.Lock: + global _runtime_receipt_readback_lock + if _runtime_receipt_readback_lock is None: + _runtime_receipt_readback_lock = asyncio.Lock() + return _runtime_receipt_readback_lock + + +def _runtime_receipt_readback_cache_get( + key: tuple[str, int, int], +) -> dict[str, Any] | None: + cached = _runtime_receipt_readback_cache.get(key) + if cached is None: + return None + stored_at, readback = cached + if time.monotonic() - stored_at > _RUNTIME_RECEIPT_READBACK_CACHE_TTL_SECONDS: + _runtime_receipt_readback_cache.pop(key, None) + return None + return copy.deepcopy(readback) + + +def _runtime_receipt_readback_cache_store( + key: tuple[str, int, int], + readback: Mapping[str, Any], +) -> None: + if readback.get("db_read_status") == "unavailable": + return + _runtime_receipt_readback_cache[key] = (time.monotonic(), copy.deepcopy(dict(readback))) + + +def _clear_runtime_receipt_readback_cache() -> None: + _runtime_receipt_readback_cache.clear() + + def _sanitize_latest_rows( rows: Iterable[Mapping[str, Any] | Any], *, @@ -3994,6 +4049,38 @@ async def load_ai_agent_autonomous_runtime_receipt_readback( project_id: str = _DEFAULT_PROJECT_ID, lookback_hours: int = _DEFAULT_LOOKBACK_HOURS, limit: int = 20, +) -> dict[str, Any]: + """Read live executor receipts with short in-process DB pool protection.""" + + normalized_lookback_hours = max(1, int(lookback_hours or _DEFAULT_LOOKBACK_HOURS)) + normalized_limit = max(1, int(limit or 20)) + cache_key = _runtime_receipt_readback_cache_key( + project_id=project_id, + lookback_hours=normalized_lookback_hours, + limit=normalized_limit, + ) + cached = _runtime_receipt_readback_cache_get(cache_key) + if cached is not None: + return cached + + async with _get_runtime_receipt_readback_lock(): + cached = _runtime_receipt_readback_cache_get(cache_key) + if cached is not None: + return cached + readback = await _load_ai_agent_autonomous_runtime_receipt_readback_uncached( + project_id=project_id, + lookback_hours=normalized_lookback_hours, + limit=normalized_limit, + ) + _runtime_receipt_readback_cache_store(cache_key, readback) + return readback + + +async def _load_ai_agent_autonomous_runtime_receipt_readback_uncached( + *, + project_id: str = _DEFAULT_PROJECT_ID, + lookback_hours: int = _DEFAULT_LOOKBACK_HOURS, + limit: int = 20, ) -> dict[str, Any]: """Read live executor receipts without sending messages or mutating runtime state.""" diff --git a/apps/api/tests/test_ai_agent_autonomous_runtime_control.py b/apps/api/tests/test_ai_agent_autonomous_runtime_control.py index 956ac4934..fac62a80b 100644 --- a/apps/api/tests/test_ai_agent_autonomous_runtime_control.py +++ b/apps/api/tests/test_ai_agent_autonomous_runtime_control.py @@ -210,6 +210,8 @@ def _assert_log_controlled_writeback_consumer(payload: dict): async def test_live_runtime_receipt_keeps_consumer_readback_when_trace_query_fails( monkeypatch, ): + runtime_control_module._clear_runtime_receipt_readback_cache() + async def _fake_consumer_readback(*, project_id: str): assert project_id == "awoooi" return _log_controlled_writeback_consumer_readback() @@ -225,9 +227,12 @@ async def test_live_runtime_receipt_keeps_consumer_readback_when_trace_query_fai _fake_consumer_readback, ) - readback = ( - await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback() - ) + try: + readback = ( + await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback() + ) + finally: + runtime_control_module._clear_runtime_receipt_readback_cache() assert readback["db_read_status"] == "unavailable" assert readback["error"]["type"] == "RuntimeError" @@ -236,6 +241,99 @@ async def test_live_runtime_receipt_keeps_consumer_readback_when_trace_query_fai ) +@pytest.mark.asyncio +async def test_live_runtime_receipt_readback_uses_short_cache_to_protect_db_pool( + monkeypatch, +): + runtime_control_module._clear_runtime_receipt_readback_cache() + calls = 0 + + async def _fake_uncached_loader( + *, + project_id: str, + lookback_hours: int, + limit: int, + ) -> dict: + nonlocal calls + calls += 1 + return build_runtime_receipt_readback_from_rows( + project_id=project_id, + lookback_hours=lookback_hours, + db_read_status="ok", + service_log_count_rows=[ + {"status": "sanitized_recent_logs", "total": calls, "recent": calls}, + ], + ) + + monkeypatch.setattr( + runtime_control_module, + "_load_ai_agent_autonomous_runtime_receipt_readback_uncached", + _fake_uncached_loader, + ) + + try: + first = await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback() + second = await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback() + finally: + runtime_control_module._clear_runtime_receipt_readback_cache() + + assert calls == 1 + assert first["db_read_status"] == "ok" + assert second["db_read_status"] == "ok" + assert ( + second["log_integration_taxonomy"]["rollups"]["classified_event_total"] + == first["log_integration_taxonomy"]["rollups"]["classified_event_total"] + ) + + +@pytest.mark.asyncio +async def test_live_runtime_receipt_readback_does_not_cache_pool_unavailable( + monkeypatch, +): + runtime_control_module._clear_runtime_receipt_readback_cache() + calls = 0 + + async def _fake_uncached_loader( + *, + project_id: str, + lookback_hours: int, + limit: int, + ) -> dict: + nonlocal calls + calls += 1 + if calls == 1: + return build_runtime_receipt_readback_from_rows( + project_id=project_id, + lookback_hours=lookback_hours, + db_read_status="unavailable", + error_type="TimeoutError", + ) + return build_runtime_receipt_readback_from_rows( + project_id=project_id, + lookback_hours=lookback_hours, + db_read_status="ok", + service_log_count_rows=[ + {"status": "sanitized_recent_logs", "total": 3, "recent": 1}, + ], + ) + + monkeypatch.setattr( + runtime_control_module, + "_load_ai_agent_autonomous_runtime_receipt_readback_uncached", + _fake_uncached_loader, + ) + + try: + first = await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback() + second = await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback() + finally: + runtime_control_module._clear_runtime_receipt_readback_cache() + + assert calls == 2 + assert first["db_read_status"] == "unavailable" + assert second["db_read_status"] == "ok" + + def test_runtime_control_rollups_project_consumer_dispatch_ledger_if_trace_missing(): payload = build_ai_agent_autonomous_runtime_control() readback = build_runtime_receipt_readback_from_rows(