"""Fixtures ingestion worker placeholder.""" from __future__ import annotations import asyncio import json import logging import os from datetime import datetime, timezone from typing import Any from redis.asyncio import Redis LOGGER = logging.getLogger("fifa2026-fixtures-worker") logging.basicConfig(level=logging.INFO) REDIS_URL = os.getenv("REDIS_URL", "redis://fifa2026-redis:6379/0") INTERVAL_SECONDS = max(300, int(os.getenv("FIXTURES_POLL_INTERVAL_SECONDS", "21600"))) STATUS_KEY = "ingestion:fixtures:last_run" async def publish_status(payload: dict[str, Any]) -> None: redis = Redis.from_url(REDIS_URL, decode_responses=True) try: await redis.set(STATUS_KEY, json.dumps(payload, ensure_ascii=False), ex=max(INTERVAL_SECONDS * 3, 900)) finally: await redis.aclose() async def run_once() -> dict[str, Any]: payload = { "status": "standby", "worker": "fixtures_worker", "run_at": datetime.now(timezone.utc).isoformat(), "message": "賽程同步器已啟動;正式來源未驗證前不覆寫既有賽程。", } await publish_status(payload) LOGGER.info("%s", payload) return payload async def run_forever() -> None: LOGGER.info("啟動 fixtures_worker,interval=%ss", INTERVAL_SECONDS) while True: try: await run_once() except Exception as exc: # pragma: no cover - worker loop must survive transient Redis errors LOGGER.exception("fixtures_worker 本輪狀態寫入失敗:%s", exc) await asyncio.sleep(INTERVAL_SECONDS) if __name__ == "__main__": if os.getenv("WORKER_ONCE") == "true": print(asyncio.run(run_once())) else: asyncio.run(run_forever())