fix(agent): cache runtime receipt readback under db pool pressure
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / tests (push) Has been cancelled
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / build-and-deploy (push) Has been cancelled
CD Pipeline / post-deploy-checks (push) Has been cancelled
CD Pipeline / tests (push) Has been cancelled
This commit is contained in:
@@ -9,6 +9,9 @@ KM, and Telegram receipts are present.
|
||||
|
||||
from __future__ import annotations
|
||||
|
||||
import asyncio
|
||||
import copy
|
||||
import time
|
||||
from collections.abc import Iterable, Mapping
|
||||
from datetime import UTC, datetime
|
||||
from typing import Any
|
||||
@@ -42,6 +45,7 @@ _DEPLOY_ATTEMPT_NOTE = "cd_internal_control_plane_readback_retry_20260628_2"
|
||||
_LIVE_READBACK_SCHEMA_VERSION = "ai_agent_autonomous_runtime_receipt_readback_v1"
|
||||
_DEFAULT_PROJECT_ID = "awoooi"
|
||||
_DEFAULT_LOOKBACK_HOURS = 24
|
||||
_RUNTIME_RECEIPT_READBACK_CACHE_TTL_SECONDS = 20.0
|
||||
# CD cancel-stale-cd no-op triggers must not change runtime payloads.
|
||||
_EXECUTOR_OPERATION_TYPES = (
|
||||
"ansible_candidate_matched",
|
||||
@@ -74,6 +78,11 @@ _PUBLIC_VALUE_REDACTIONS = (
|
||||
)
|
||||
|
||||
logger = get_logger(__name__)
|
||||
_runtime_receipt_readback_cache: dict[
|
||||
tuple[str, int, int],
|
||||
tuple[float, dict[str, Any]],
|
||||
] = {}
|
||||
_runtime_receipt_readback_lock: asyncio.Lock | None = None
|
||||
|
||||
|
||||
def _allowed_risk_levels() -> list[str]:
|
||||
@@ -125,6 +134,52 @@ def _int_value(value: Any) -> int:
|
||||
return 0
|
||||
|
||||
|
||||
def _runtime_receipt_readback_cache_key(
|
||||
*,
|
||||
project_id: str,
|
||||
lookback_hours: int,
|
||||
limit: int,
|
||||
) -> tuple[str, int, int]:
|
||||
return (
|
||||
project_id,
|
||||
max(1, int(lookback_hours or _DEFAULT_LOOKBACK_HOURS)),
|
||||
max(1, int(limit or 20)),
|
||||
)
|
||||
|
||||
|
||||
def _get_runtime_receipt_readback_lock() -> asyncio.Lock:
|
||||
global _runtime_receipt_readback_lock
|
||||
if _runtime_receipt_readback_lock is None:
|
||||
_runtime_receipt_readback_lock = asyncio.Lock()
|
||||
return _runtime_receipt_readback_lock
|
||||
|
||||
|
||||
def _runtime_receipt_readback_cache_get(
|
||||
key: tuple[str, int, int],
|
||||
) -> dict[str, Any] | None:
|
||||
cached = _runtime_receipt_readback_cache.get(key)
|
||||
if cached is None:
|
||||
return None
|
||||
stored_at, readback = cached
|
||||
if time.monotonic() - stored_at > _RUNTIME_RECEIPT_READBACK_CACHE_TTL_SECONDS:
|
||||
_runtime_receipt_readback_cache.pop(key, None)
|
||||
return None
|
||||
return copy.deepcopy(readback)
|
||||
|
||||
|
||||
def _runtime_receipt_readback_cache_store(
|
||||
key: tuple[str, int, int],
|
||||
readback: Mapping[str, Any],
|
||||
) -> None:
|
||||
if readback.get("db_read_status") == "unavailable":
|
||||
return
|
||||
_runtime_receipt_readback_cache[key] = (time.monotonic(), copy.deepcopy(dict(readback)))
|
||||
|
||||
|
||||
def _clear_runtime_receipt_readback_cache() -> None:
|
||||
_runtime_receipt_readback_cache.clear()
|
||||
|
||||
|
||||
def _sanitize_latest_rows(
|
||||
rows: Iterable[Mapping[str, Any] | Any],
|
||||
*,
|
||||
@@ -3994,6 +4049,38 @@ async def load_ai_agent_autonomous_runtime_receipt_readback(
|
||||
project_id: str = _DEFAULT_PROJECT_ID,
|
||||
lookback_hours: int = _DEFAULT_LOOKBACK_HOURS,
|
||||
limit: int = 20,
|
||||
) -> dict[str, Any]:
|
||||
"""Read live executor receipts with short in-process DB pool protection."""
|
||||
|
||||
normalized_lookback_hours = max(1, int(lookback_hours or _DEFAULT_LOOKBACK_HOURS))
|
||||
normalized_limit = max(1, int(limit or 20))
|
||||
cache_key = _runtime_receipt_readback_cache_key(
|
||||
project_id=project_id,
|
||||
lookback_hours=normalized_lookback_hours,
|
||||
limit=normalized_limit,
|
||||
)
|
||||
cached = _runtime_receipt_readback_cache_get(cache_key)
|
||||
if cached is not None:
|
||||
return cached
|
||||
|
||||
async with _get_runtime_receipt_readback_lock():
|
||||
cached = _runtime_receipt_readback_cache_get(cache_key)
|
||||
if cached is not None:
|
||||
return cached
|
||||
readback = await _load_ai_agent_autonomous_runtime_receipt_readback_uncached(
|
||||
project_id=project_id,
|
||||
lookback_hours=normalized_lookback_hours,
|
||||
limit=normalized_limit,
|
||||
)
|
||||
_runtime_receipt_readback_cache_store(cache_key, readback)
|
||||
return readback
|
||||
|
||||
|
||||
async def _load_ai_agent_autonomous_runtime_receipt_readback_uncached(
|
||||
*,
|
||||
project_id: str = _DEFAULT_PROJECT_ID,
|
||||
lookback_hours: int = _DEFAULT_LOOKBACK_HOURS,
|
||||
limit: int = 20,
|
||||
) -> dict[str, Any]:
|
||||
"""Read live executor receipts without sending messages or mutating runtime state."""
|
||||
|
||||
|
||||
@@ -210,6 +210,8 @@ def _assert_log_controlled_writeback_consumer(payload: dict):
|
||||
async def test_live_runtime_receipt_keeps_consumer_readback_when_trace_query_fails(
|
||||
monkeypatch,
|
||||
):
|
||||
runtime_control_module._clear_runtime_receipt_readback_cache()
|
||||
|
||||
async def _fake_consumer_readback(*, project_id: str):
|
||||
assert project_id == "awoooi"
|
||||
return _log_controlled_writeback_consumer_readback()
|
||||
@@ -225,9 +227,12 @@ async def test_live_runtime_receipt_keeps_consumer_readback_when_trace_query_fai
|
||||
_fake_consumer_readback,
|
||||
)
|
||||
|
||||
readback = (
|
||||
await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback()
|
||||
)
|
||||
try:
|
||||
readback = (
|
||||
await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback()
|
||||
)
|
||||
finally:
|
||||
runtime_control_module._clear_runtime_receipt_readback_cache()
|
||||
|
||||
assert readback["db_read_status"] == "unavailable"
|
||||
assert readback["error"]["type"] == "RuntimeError"
|
||||
@@ -236,6 +241,99 @@ async def test_live_runtime_receipt_keeps_consumer_readback_when_trace_query_fai
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_live_runtime_receipt_readback_uses_short_cache_to_protect_db_pool(
|
||||
monkeypatch,
|
||||
):
|
||||
runtime_control_module._clear_runtime_receipt_readback_cache()
|
||||
calls = 0
|
||||
|
||||
async def _fake_uncached_loader(
|
||||
*,
|
||||
project_id: str,
|
||||
lookback_hours: int,
|
||||
limit: int,
|
||||
) -> dict:
|
||||
nonlocal calls
|
||||
calls += 1
|
||||
return build_runtime_receipt_readback_from_rows(
|
||||
project_id=project_id,
|
||||
lookback_hours=lookback_hours,
|
||||
db_read_status="ok",
|
||||
service_log_count_rows=[
|
||||
{"status": "sanitized_recent_logs", "total": calls, "recent": calls},
|
||||
],
|
||||
)
|
||||
|
||||
monkeypatch.setattr(
|
||||
runtime_control_module,
|
||||
"_load_ai_agent_autonomous_runtime_receipt_readback_uncached",
|
||||
_fake_uncached_loader,
|
||||
)
|
||||
|
||||
try:
|
||||
first = await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback()
|
||||
second = await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback()
|
||||
finally:
|
||||
runtime_control_module._clear_runtime_receipt_readback_cache()
|
||||
|
||||
assert calls == 1
|
||||
assert first["db_read_status"] == "ok"
|
||||
assert second["db_read_status"] == "ok"
|
||||
assert (
|
||||
second["log_integration_taxonomy"]["rollups"]["classified_event_total"]
|
||||
== first["log_integration_taxonomy"]["rollups"]["classified_event_total"]
|
||||
)
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_live_runtime_receipt_readback_does_not_cache_pool_unavailable(
|
||||
monkeypatch,
|
||||
):
|
||||
runtime_control_module._clear_runtime_receipt_readback_cache()
|
||||
calls = 0
|
||||
|
||||
async def _fake_uncached_loader(
|
||||
*,
|
||||
project_id: str,
|
||||
lookback_hours: int,
|
||||
limit: int,
|
||||
) -> dict:
|
||||
nonlocal calls
|
||||
calls += 1
|
||||
if calls == 1:
|
||||
return build_runtime_receipt_readback_from_rows(
|
||||
project_id=project_id,
|
||||
lookback_hours=lookback_hours,
|
||||
db_read_status="unavailable",
|
||||
error_type="TimeoutError",
|
||||
)
|
||||
return build_runtime_receipt_readback_from_rows(
|
||||
project_id=project_id,
|
||||
lookback_hours=lookback_hours,
|
||||
db_read_status="ok",
|
||||
service_log_count_rows=[
|
||||
{"status": "sanitized_recent_logs", "total": 3, "recent": 1},
|
||||
],
|
||||
)
|
||||
|
||||
monkeypatch.setattr(
|
||||
runtime_control_module,
|
||||
"_load_ai_agent_autonomous_runtime_receipt_readback_uncached",
|
||||
_fake_uncached_loader,
|
||||
)
|
||||
|
||||
try:
|
||||
first = await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback()
|
||||
second = await runtime_control_module.load_ai_agent_autonomous_runtime_receipt_readback()
|
||||
finally:
|
||||
runtime_control_module._clear_runtime_receipt_readback_cache()
|
||||
|
||||
assert calls == 2
|
||||
assert first["db_read_status"] == "unavailable"
|
||||
assert second["db_read_status"] == "ok"
|
||||
|
||||
|
||||
def test_runtime_control_rollups_project_consumer_dispatch_ledger_if_trace_missing():
|
||||
payload = build_ai_agent_autonomous_runtime_control()
|
||||
readback = build_runtime_receipt_readback_from_rows(
|
||||
|
||||
Reference in New Issue
Block a user