fix(ops): close post-reboot recovery guardrails
Some checks failed
CD Pipeline / workflow-shape (push) Successful in 0s
CD Pipeline / cancel-stale-cd (push) Has been skipped
CD Pipeline / tests (push) Failing after 1m57s
CD Pipeline / build-and-deploy (push) Has been skipped
CD Pipeline / post-deploy-checks (push) Has been skipped

This commit is contained in:
Your Name
2026-07-02 00:18:17 +08:00
parent 6b8edd8ffe
commit a15ab298ff
6 changed files with 654 additions and 2 deletions

View File

@@ -0,0 +1,305 @@
#!/usr/bin/env python3
"""Bounded Docker disk-pressure cleanup for host reboot recovery.
This controller intentionally avoids `docker system prune`, volumes, containers,
running images, databases, backups, and logs. It only removes dangling images
that are not referenced by any container, and can optionally run a bounded
BuildKit cache cleanup with an explicit keep-storage floor.
"""
from __future__ import annotations
import argparse
import json
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable
# Default for --min-age-hours: minimum age of a dangling image before it may be
# removed; also used for the builder-cache `until=` filter.
DEFAULT_MIN_AGE_HOURS = 24
# Default for --keep-dangling-newest: how many of the newest removal-eligible
# dangling images are always kept.
DEFAULT_KEEP_DANGLING_NEWEST = 20
# Default for --builder-keep-storage: value passed to
# `docker builder prune --keep-storage`.
DEFAULT_BUILDER_KEEP_STORAGE = "30GB"
@dataclass(frozen=True)
class ImageInfo:
    """Immutable record of the `docker image inspect` fields this script uses."""

    # Image ID with any leading "sha256:" prefix removed.
    image_id: str
    # Creation time as a timezone-aware UTC datetime.
    created_at: datetime
    # Value of the inspect "Size" field (0 when absent).
    size_bytes: int
    # Non-empty repo tags; an empty tuple means the image is untagged.
    repo_tags: tuple[str, ...]
def parse_args() -> argparse.Namespace:
    """Define the command-line interface and parse the process arguments.

    Returns:
        The parsed namespace. Range validation of the numeric options is not
        done here; it is left to the caller.
    """
    cli = argparse.ArgumentParser(
        description="Safely reclaim Docker disk space without touching volumes or containers.",
    )
    add = cli.add_argument

    # Execution mode and environment.
    add("--apply", action="store_true", help="Actually remove selected images/cache.")
    add("--docker-bin", default="docker")
    add("--disk-path", default="/")
    add("--host-label", default="")

    # Dangling-image retention policy.
    add("--min-age-hours", type=int, default=DEFAULT_MIN_AGE_HOURS)
    add("--keep-dangling-newest", type=int, default=DEFAULT_KEEP_DANGLING_NEWEST)
    add(
        "--skip-dangling-images",
        action="store_true",
        help="Do not remove dangling images; useful for builder-cache-only follow-up cleanup.",
    )

    # Optional BuildKit cache cleanup.
    add(
        "--include-builder-cache",
        action="store_true",
        help="Also run docker builder prune with --filter until and --keep-storage.",
    )
    add("--builder-keep-storage", default=DEFAULT_BUILDER_KEEP_STORAGE)

    add("--output", type=Path, help="Optional JSON receipt path.")
    return cli.parse_args()
def run_command(
    args: list[str],
    *,
    check: bool = True,
    capture_output: bool = True,
) -> subprocess.CompletedProcess[str]:
    """Run an external command in text mode and return the completed process.

    Args:
        args: Program and arguments, passed as a list (no shell involved).
        check: When true, a non-zero exit status raises
            ``subprocess.CalledProcessError``.
        capture_output: When true, stdout and stderr are captured on the result.

    Returns:
        The ``subprocess.CompletedProcess`` produced by ``subprocess.run``.
    """
    options = {
        "check": check,
        "text": True,
        "capture_output": capture_output,
    }
    return subprocess.run(args, **options)
def docker(args: list[str], docker_bin: str) -> subprocess.CompletedProcess[str]:
    """Invoke the Docker CLI at ``docker_bin`` with ``args`` via ``run_command``.

    The ``run_command`` defaults apply, so output is captured and a failing
    command raises.
    """
    command = [docker_bin]
    command.extend(args)
    return run_command(command)
def normalize_image_id(value: str) -> str:
    """Return ``value`` stripped of whitespace and of a leading ``sha256:`` prefix.

    Any other text, including other ``algo:`` prefixes, is returned unchanged
    apart from the whitespace strip.
    """
    cleaned = value.strip()
    if not cleaned.startswith("sha256:"):
        return cleaned
    # Only the first colon separates the prefix; later colons stay in the result.
    _, _, digest = cleaned.partition(":")
    return digest
def parse_docker_datetime(value: str) -> datetime:
    """Parse a Docker timestamp into a timezone-aware UTC ``datetime``.

    The text is normalised before it reaches ``datetime.fromisoformat``, so
    that interpreters whose parser only accepts its own ``isoformat`` output
    can read it:

    * a trailing ``Z`` becomes ``+00:00``;
    * the fractional-seconds part is forced to exactly six digits. Extra
      digits (nanoseconds) are dropped, and short fractions are zero-padded.

    Args:
        value: Timestamp text such as ``2026-07-01T23:29:21.919867918+08:00``.

    Returns:
        The instant converted to UTC. A timestamp without an offset is
        treated as already being UTC.

    Raises:
        ValueError: If the normalised text is still not a valid ISO timestamp
            (for example an empty string).
    """
    text = value.strip()
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    if "." in text:
        head, tail = text.split(".", 1)
        # The fraction is the leading run of digits; the rest is the UTC offset.
        digit_count = len(tail) - len(tail.lstrip("0123456789"))
        fraction = tail[:digit_count]
        suffix = tail[digit_count:]
        # Exactly six digits: truncate nanoseconds, pad fractions whose
        # trailing zeros were trimmed by the producer.
        fraction = fraction[:6].ljust(6, "0")
        text = f"{head}.{fraction}{suffix}"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)
def chunked(values: list[str], size: int) -> Iterable[list[str]]:
    """Lazily yield consecutive slices of ``values`` holding at most ``size`` items.

    An empty ``values`` yields nothing; the last slice may be shorter than
    ``size``.
    """
    offsets = range(0, len(values), size)
    yield from (values[offset : offset + size] for offset in offsets)
def current_disk_bytes(path: str) -> dict[str, int]:
    """Report filesystem usage for ``path`` as parsed from ``df -PB1``.

    Returns:
        A dict with ``size_bytes``, ``used_bytes``, ``available_bytes`` and
        ``used_percent``. All four are 0 when ``df`` prints fewer than two
        non-blank lines.
    """
    output = run_command(["df", "-PB1", path]).stdout
    rows = [row for row in output.splitlines() if row.strip()]
    if len(rows) < 2:
        return {"size_bytes": 0, "used_bytes": 0, "available_bytes": 0, "used_percent": 0}

    # rows[0] is the header; columns 1-4 of the data row are size, used,
    # available and use%.
    fields = rows[1].split()
    return {
        "size_bytes": int(fields[1]),
        "used_bytes": int(fields[2]),
        "available_bytes": int(fields[3]),
        "used_percent": int(fields[4].rstrip("%")),
    }
def get_container_image_ids(docker_bin: str) -> set[str]:
    """Collect the normalized image IDs referenced by every container.

    Containers are listed with ``docker ps -aq`` (running and stopped) and
    inspected in batches of 100 IDs per ``docker inspect`` call.
    """
    container_ids = docker(["ps", "-aq", "--no-trunc"], docker_bin).stdout.split()
    in_use: set[str] = set()
    for batch in chunked(container_ids, 100):
        if batch:
            inspected = docker(["inspect", "--format", "{{.Image}}", *batch], docker_bin)
            in_use.update(
                image_id
                for image_id in map(normalize_image_id, inspected.stdout.splitlines())
                if image_id
            )
    return in_use
def get_dangling_images(docker_bin: str) -> list[ImageInfo]:
    """List dangling images and load the metadata needed for retention decisions.

    IDs come from ``docker image ls --filter dangling=true`` and are inspected
    in batches of 100. Inspect entries without an ``Id`` are skipped.
    """
    listing = docker(
        ["image", "ls", "--filter", "dangling=true", "--quiet", "--no-trunc"],
        docker_bin,
    )
    normalized_ids = [normalize_image_id(raw) for raw in listing.stdout.split()]

    found: list[ImageInfo] = []
    for batch in chunked(normalized_ids, 100):
        if not batch:
            continue
        inspected = docker(["image", "inspect", *batch], docker_bin)
        for entry in json.loads(inspected.stdout or "[]"):
            image_id = normalize_image_id(str(entry.get("Id") or ""))
            if image_id:
                raw_tags = entry.get("RepoTags") or []
                found.append(
                    ImageInfo(
                        image_id=image_id,
                        created_at=parse_docker_datetime(str(entry.get("Created") or "")),
                        size_bytes=int(entry.get("Size") or 0),
                        repo_tags=tuple(str(tag) for tag in raw_tags if tag),
                    )
                )
    return found
def select_dangling_image_removals(
    images: list[ImageInfo],
    protected_ids: set[str],
    *,
    now: datetime,
    min_age_hours: int,
    keep_newest: int,
) -> list[ImageInfo]:
    """Choose which images may be removed, returned oldest first.

    An image is eligible when its normalized ID is not in ``protected_ids``,
    it has no repo tags, and it is at least ``min_age_hours`` old relative to
    ``now``. Of the eligible images, the ``keep_newest`` most recent are
    retained and the rest are returned.
    """
    minimum_age_seconds = min_age_hours * 3600

    def is_removable(candidate: ImageInfo) -> bool:
        if normalize_image_id(candidate.image_id) in protected_ids:
            return False
        if candidate.repo_tags:
            return False
        return (now - candidate.created_at).total_seconds() >= minimum_age_seconds

    newest_first = sorted(
        filter(is_removable, images),
        key=lambda candidate: candidate.created_at,
        reverse=True,
    )
    if keep_newest > 0:
        # Drop the retained head; everything after it is up for removal.
        newest_first = newest_first[keep_newest:]
    return sorted(newest_first, key=lambda candidate: candidate.created_at)
def summarize_images(images: list[ImageInfo]) -> dict[str, Any]:
    """Summarize a set of images for the JSON receipt.

    The oldest and newest timestamps are computed from the data, so the
    result is correct whatever order ``images`` is in.

    Args:
        images: Images to summarize; may be empty.

    Returns:
        A dict with the image count, the summed ``size_bytes``, the ISO-8601
        oldest and newest creation times (``None`` when ``images`` is empty),
        and the 12-character ID prefixes of the first 20 images in input order.
    """
    created_times = [image.created_at for image in images]
    return {
        "count": len(images),
        "estimated_total_size_bytes": sum(image.size_bytes for image in images),
        "oldest_created_at": min(created_times).isoformat() if created_times else None,
        "newest_created_at": max(created_times).isoformat() if created_times else None,
        "sample_image_ids": [image.image_id[:12] for image in images[:20]],
    }
def remove_images(images: list[ImageInfo], docker_bin: str) -> list[str]:
    """Remove ``images`` with ``docker image rm`` in batches of 25.

    Returns:
        The IDs of every batch whose removal command completed. A failing
        batch raises out of the ``docker`` helper, so later batches are not
        attempted.
    """
    pending = [image.image_id for image in images]
    deleted: list[str] = []
    for batch in chunked(pending, 25):
        if batch:
            docker(["image", "rm", *batch], docker_bin)
            deleted += batch
    return deleted
def builder_prune_command(args: argparse.Namespace) -> list[str]:
    """Assemble the argv for a bounded ``docker builder prune``.

    The command always carries ``--force`` and ``--keep-storage``. A
    ``--filter until=<N>h`` pair is placed right after ``--force`` only when
    ``args.min_age_hours`` is positive.
    """
    if args.min_age_hours > 0:
        age_filter = ["--filter", f"until={args.min_age_hours}h"]
    else:
        age_filter = []
    return [
        args.docker_bin,
        "builder",
        "prune",
        "--force",
        *age_filter,
        "--keep-storage",
        args.builder_keep_storage,
    ]
def build_receipt(args: argparse.Namespace) -> dict[str, Any]:
    """Plan the cleanup, execute it when ``args.apply`` is set, and return the receipt.

    The receipt is a JSON-serializable dict recording the parameters, the disk
    usage before the run, the removal plan, and what was actually executed.
    In dry-run mode nothing is removed and only the plan is reported.

    Args:
        args: Parsed command-line namespace as produced by ``parse_args``.

    Returns:
        The receipt dict.
    """
    now = datetime.now(timezone.utc)
    # Disk usage is sampled before any Docker state is inspected or changed.
    before = current_disk_bytes(args.disk_path)
    protected_ids = get_container_image_ids(args.docker_bin)
    dangling_images = get_dangling_images(args.docker_bin)
    # With --skip-dangling-images the plan is empty, so apply mode removes no images.
    removal_candidates = (
        []
        if args.skip_dangling_images
        else select_dangling_image_removals(
            dangling_images,
            protected_ids,
            now=now,
            min_age_hours=args.min_age_hours,
            keep_newest=args.keep_dangling_newest,
        )
    )
    receipt: dict[str, Any] = {
        "schema_version": "awoooi_docker_disk_pressure_retention_cleanup_v1",
        "generated_at": now.isoformat(),
        "host_label": args.host_label,
        "mode": "apply" if args.apply else "dry_run",
        "disk_path": args.disk_path,
        # Static declarations of the script's scope; these are constants, not
        # values measured at runtime.
        "boundaries": {
            "touches_containers": False,
            "touches_volumes": False,
            "touches_databases": False,
            "touches_backups": False,
            "uses_docker_system_prune": False,
            "removes_only_unreferenced_dangling_images": True,
            "builder_cache_cleanup_requires_explicit_flag": True,
        },
        "parameters": {
            "min_age_hours": args.min_age_hours,
            "keep_dangling_newest": args.keep_dangling_newest,
            "include_builder_cache": args.include_builder_cache,
            "builder_keep_storage": args.builder_keep_storage,
            "skip_dangling_images": args.skip_dangling_images,
        },
        "disk_before": before,
        "protected_container_image_count": len(protected_ids),
        "dangling_image_total_count": len(dangling_images),
        "dangling_image_removal_plan": summarize_images(removal_candidates),
        # The leading docker binary is dropped from the recorded command.
        "builder_cache_command": builder_prune_command(args)[1:] if args.include_builder_cache else None,
        "removed_image_ids": [],
        "builder_cache_cleanup_executed": False,
    }
    if args.apply:
        # Only 12-character ID prefixes are recorded in the receipt.
        receipt["removed_image_ids"] = [image[:12] for image in remove_images(removal_candidates, args.docker_bin)]
        if args.include_builder_cache:
            run_command(builder_prune_command(args), capture_output=True)
            receipt["builder_cache_cleanup_executed"] = True
        # NOTE(review): "disk_after" is recorded only in apply mode here; confirm
        # this matches the intended receipt schema for dry runs.
        receipt["disk_after"] = current_disk_bytes(args.disk_path)
    return receipt
def main() -> int:
    """Validate the options, run the cleanup, and emit the JSON receipt.

    Returns:
        2 when an option is out of range (the reason is printed to stderr),
        otherwise 0 after the receipt has been printed to stdout and, if
        ``--output`` was given, written to that path.
    """
    args = parse_args()

    # Checks run in this order; only the first failing one is reported.
    problem = None
    if args.min_age_hours < 0:
        problem = "min-age-hours must be >= 0"
    elif args.min_age_hours == 0 and not args.skip_dangling_images:
        problem = "min-age-hours=0 requires --skip-dangling-images"
    elif args.keep_dangling_newest < 0:
        problem = "keep-dangling-newest must be >= 0"
    if problem is not None:
        print(problem, file=sys.stderr)
        return 2

    rendered = json.dumps(build_receipt(args), ensure_ascii=False, indent=2, sort_keys=True) + "\n"
    destination = args.output
    if destination:
        destination.parent.mkdir(parents=True, exist_ok=True)
        destination.write_text(rendered, encoding="utf-8")
    print(rendered, end="")
    return 0
# Script entry point: the integer returned by main() becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main())

View File

@@ -0,0 +1,110 @@
from __future__ import annotations
import importlib.util
import sys
from datetime import datetime, timedelta, timezone
from pathlib import Path
from types import SimpleNamespace
# The script's filename contains hyphens, so it cannot be imported by name;
# it is loaded from its path instead.
# NOTE(review): parents[3] is presumably the repository root; confirm against
# the location of this test file.
ROOT = Path(__file__).resolve().parents[3]
SCRIPT = ROOT / "scripts" / "ops" / "docker-disk-pressure-retention-cleanup.py"
spec = importlib.util.spec_from_file_location("docker_disk_pressure_retention_cleanup", SCRIPT)
# Guard before first use: spec_from_file_location may return None.
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
# Registered before execution so the module can be looked up by name while its
# body runs.
sys.modules[spec.name] = module
spec.loader.exec_module(module)
def image(image_id: str, created_at: datetime, tags: tuple[str, ...] = ()):
    """Build an ``ImageInfo`` fixture with a fixed size of 1024 bytes."""
    fields = {
        "image_id": image_id,
        "created_at": created_at,
        "size_bytes": 1024,
        "repo_tags": tags,
    }
    return module.ImageInfo(**fields)
def test_select_dangling_images_keeps_newest_and_protects_running_images() -> None:
    """Protected, tagged, too-recent and kept-newest images are all excluded."""
    now = datetime(2026, 7, 1, 12, tzinfo=timezone.utc)

    def aged(hours: int) -> datetime:
        return now - timedelta(hours=hours)

    candidates = [
        image("sha256:running", aged(72)),
        image("oldest", aged(72)),
        image("middle", aged(48)),
        image("newest", aged(30)),
        image("too_recent", aged(2)),
        image("tagged", aged(72), ("repo:tag",)),
    ]

    chosen = module.select_dangling_image_removals(
        candidates,
        {"running"},
        now=now,
        min_age_hours=24,
        keep_newest=1,
    )

    chosen_ids = [item.image_id for item in chosen]
    assert chosen_ids == ["oldest", "middle"]
def test_builder_prune_command_is_bounded_by_age_and_keep_storage() -> None:
    """A positive min age adds an ``until`` filter ahead of ``--keep-storage``."""
    namespace = SimpleNamespace(
        docker_bin="docker",
        min_age_hours=36,
        builder_keep_storage="40GB",
    )
    expected = [
        "docker",
        "builder",
        "prune",
        "--force",
        "--filter",
        "until=36h",
        "--keep-storage",
        "40GB",
    ]
    command = module.builder_prune_command(namespace)
    assert command == expected
def test_parse_docker_datetime_accepts_nanosecond_fraction() -> None:
    """Nine fractional digits are cut to microseconds and converted to UTC."""
    raw = "2026-07-01T23:29:21.919867918+08:00"
    expected = "2026-07-01T15:29:21.919867+00:00"
    assert module.parse_docker_datetime(raw).isoformat() == expected
def test_summary_never_reports_volumes_or_container_cleanup_boundary() -> None:
    """A single old image is selected and appears in the summary sample."""
    now = datetime(2026, 7, 1, 12, tzinfo=timezone.utc)
    old_image = image("old", now - timedelta(days=3))

    plan = module.select_dangling_image_removals(
        [old_image],
        set(),
        now=now,
        min_age_hours=24,
        keep_newest=0,
    )
    report = module.summarize_images(plan)

    assert report["count"] == 1
    assert report["sample_image_ids"] == ["old"]
def test_cli_exposes_builder_cache_only_flag() -> None:
    """The script's ``--help`` output advertises ``--skip-dangling-images``."""
    # Use the interpreter running the tests: a bare "python3" may be missing
    # from PATH or resolve to a different Python.
    help_text = module.run_command([sys.executable, str(SCRIPT), "--help"]).stdout
    assert "--skip-dangling-images" in help_text
def test_zero_age_builder_prune_omits_until_filter() -> None:
    """With a zero min age the prune command carries no ``--filter`` pair."""
    namespace = SimpleNamespace(
        docker_bin="docker",
        min_age_hours=0,
        builder_keep_storage="1GB",
    )
    expected = [
        "docker",
        "builder",
        "prune",
        "--force",
        "--keep-storage",
        "1GB",
    ]
    command = module.builder_prune_command(namespace)
    assert command == expected