Files
wooo 489baecbe8
Some checks failed
2026 World Cup Quant Platform - Production Deployment / Code Quality, Security Gate & Testing (push) Failing after 2m44s
2026 World Cup Quant Platform - Production Deployment / Deploy to Production VM via Gitea CD (push) Has been skipped
fix: restore backend runtime entrypoints
2026-06-18 13:00:28 +08:00

56 lines
1.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""News ingestion worker placeholder with explicit freshness status."""
from __future__ import annotations
import asyncio
import json
import logging
import os
from datetime import datetime, timezone
from typing import Any
from redis.asyncio import Redis
LOGGER = logging.getLogger("fifa2026-news-worker")
logging.basicConfig(level=logging.INFO)
REDIS_URL = os.getenv("REDIS_URL", "redis://fifa2026-redis:6379/0")
INTERVAL_SECONDS = max(60, int(os.getenv("NEWS_POLL_INTERVAL_SECONDS", "900")))
STATUS_KEY = "ingestion:news:last_run"
async def publish_status(payload: dict[str, Any]) -> None:
redis = Redis.from_url(REDIS_URL, decode_responses=True)
try:
await redis.set(STATUS_KEY, json.dumps(payload, ensure_ascii=False), ex=max(INTERVAL_SECONDS * 3, 900))
finally:
await redis.aclose()
async def run_once() -> dict[str, Any]:
payload = {
"status": "standby",
"worker": "news_worker",
"run_at": datetime.now(timezone.utc).isoformat(),
"message": "新聞來源尚未設定為正式授權來源;目前只回報新鮮度,不產生假新聞。",
}
await publish_status(payload)
LOGGER.info("%s", payload)
return payload
async def run_forever() -> None:
LOGGER.info("啟動 news_workerinterval=%ss", INTERVAL_SECONDS)
while True:
try:
await run_once()
except Exception as exc: # pragma: no cover - worker loop must survive transient Redis errors
LOGGER.exception("news_worker 本輪狀態寫入失敗:%s", exc)
await asyncio.sleep(INTERVAL_SECONDS)
if __name__ == "__main__":
if os.getenv("WORKER_ONCE") == "true":
print(asyncio.run(run_once()))
else:
asyncio.run(run_forever())