#!/usr/bin/env python3
"""Bounded Docker disk-pressure cleanup for host reboot recovery.

This controller intentionally avoids `docker system prune`, volumes, containers,
running images, databases, backups, and logs. It only removes dangling images
that are not referenced by any container, and can optionally run a bounded
BuildKit cache cleanup with an explicit keep-storage floor.
"""

from __future__ import annotations

import argparse
import json
import subprocess
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Iterable

DEFAULT_MIN_AGE_HOURS = 24
DEFAULT_KEEP_DANGLING_NEWEST = 20
DEFAULT_BUILDER_KEEP_STORAGE = "30GB"


@dataclass(frozen=True)
class ImageInfo:
    """Subset of `docker image inspect` fields used to plan removals."""

    # Image ID with any "sha256:" prefix stripped.
    image_id: str
    # Creation timestamp, normalized to UTC by parse_docker_datetime.
    created_at: datetime
    # Value of the inspect "Size" field (0 when absent).
    size_bytes: int
    # Non-empty entries of the inspect "RepoTags" field.
    repo_tags: tuple[str, ...]


def parse_args() -> argparse.Namespace:
    """Parse the command-line options for the cleanup controller."""
    parser = argparse.ArgumentParser(
        description="Safely reclaim Docker disk space without touching volumes or containers.",
    )
    parser.add_argument("--apply", action="store_true", help="Actually remove selected images/cache.")
    parser.add_argument("--docker-bin", default="docker")
    parser.add_argument("--disk-path", default="/")
    parser.add_argument("--host-label", default="")
    parser.add_argument("--min-age-hours", type=int, default=DEFAULT_MIN_AGE_HOURS)
    parser.add_argument("--keep-dangling-newest", type=int, default=DEFAULT_KEEP_DANGLING_NEWEST)
    parser.add_argument(
        "--skip-dangling-images",
        action="store_true",
        help="Do not remove dangling images; useful for builder-cache-only follow-up cleanup.",
    )
    parser.add_argument(
        "--include-builder-cache",
        action="store_true",
        help="Also run docker builder prune with --filter until and --keep-storage.",
    )
    parser.add_argument("--builder-keep-storage", default=DEFAULT_BUILDER_KEEP_STORAGE)
    parser.add_argument("--output", type=Path, help="Optional JSON receipt path.")
    return parser.parse_args()


def run_command(
    args: list[str],
    *,
    check: bool = True,
    capture_output: bool = True,
) -> subprocess.CompletedProcess[str]:
    """Run *args* as a subprocess in text mode and return the completed process.

    Raises:
        subprocess.CalledProcessError: if *check* is true and the command
            exits with a non-zero status.
    """
    return subprocess.run(
        args,
        check=check,
        text=True,
        capture_output=capture_output,
    )


def docker(args: list[str], docker_bin: str) -> subprocess.CompletedProcess[str]:
    """Run the docker CLI at *docker_bin* with *args* via run_command."""
    return run_command([docker_bin, *args])


def normalize_image_id(value: str) -> str:
    """Strip surrounding whitespace and a leading ``sha256:`` prefix from *value*."""
    value = value.strip()
    if value.startswith("sha256:"):
        value = value.split(":", 1)[1]
    return value


def parse_docker_datetime(value: str) -> datetime:
    """Parse a Docker timestamp string into a timezone-aware UTC datetime.

    A trailing ``Z`` is treated as ``+00:00``. The fractional-seconds part is
    normalized to exactly six digits: longer fractions (Docker emits
    nanoseconds) are truncated, and shorter ones are right-padded with zeros,
    because ``datetime.fromisoformat`` before Python 3.11 only accepts 3- or
    6-digit fractions. Timestamps without an offset are assumed to be UTC.

    Raises:
        ValueError: if the normalized text is not a valid ISO-8601 timestamp
            (including the empty string).
    """
    text = value.strip()
    if text.endswith("Z"):
        text = text[:-1] + "+00:00"
    if "." in text:
        head, tail = text.split(".", 1)
        # Everything after the leading digit run is the UTC-offset suffix (if any).
        suffix = tail.lstrip("0123456789")
        digits = tail[: len(tail) - len(suffix)]
        # Truncate to microseconds, then pad so older interpreters accept it.
        micro = digits[:6].ljust(6, "0")
        text = f"{head}.{micro}{suffix}"
    parsed = datetime.fromisoformat(text)
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed.astimezone(timezone.utc)


def chunked(values: list[str], size: int) -> Iterable[list[str]]:
    """Yield consecutive slices of *values* with at most *size* items each."""
    for start in range(0, len(values), size):
        yield values[start : start + size]


def current_disk_bytes(path: str) -> dict[str, int]:
    """Report filesystem usage for *path* by parsing ``df -PB1`` output.

    Returns a dict with ``size_bytes``, ``used_bytes``, ``available_bytes``
    and ``used_percent``. When the output has no data row, or that row cannot
    be parsed as integers, every value is reported as 0 instead of raising.
    """
    result = run_command(["df", "-PB1", path])
    rows = [line for line in result.stdout.splitlines() if line.strip()]
    fallback = {"size_bytes": 0, "used_bytes": 0, "available_bytes": 0, "used_percent": 0}
    if len(rows) < 2:
        return fallback
    # rows[0] is the header; the data row is: filesystem size used avail use% mount
    fields = rows[1].split()
    try:
        size, used, avail = (int(field) for field in fields[1:4])
        used_percent = int(fields[4].rstrip("%"))
    except (IndexError, ValueError):
        # Short row or a non-numeric column (e.g. "-"): same fallback as no data row.
        return fallback
    return {
        "size_bytes": size,
        "used_bytes": used,
        "available_bytes": avail,
        "used_percent": used_percent,
    }


def get_container_image_ids(docker_bin: str) -> set[str]:
    """Return the normalized image IDs referenced by any container.

    Containers are listed with ``docker ps -aq`` (so stopped ones are
    included) and inspected in batches of 100 to bound the argv length.
    """
    containers = docker(["ps", "-aq", "--no-trunc"], docker_bin).stdout.split()
    protected: set[str] = set()
    for group in chunked(containers, 100):
        result = docker(["inspect", "--format", "{{.Image}}", *group], docker_bin)
        for line in result.stdout.splitlines():
            image_id = normalize_image_id(line)
            if image_id:
                protected.add(image_id)
    return protected


def get_dangling_images(docker_bin: str) -> list[ImageInfo]:
    """Return an ImageInfo for every image Docker reports as dangling.

    IDs come from ``docker image ls --filter dangling=true`` and details from
    ``docker image inspect``, issued in batches of 100.
    """
    image_ids = docker(
        ["image", "ls", "--filter", "dangling=true", "--quiet", "--no-trunc"],
        docker_bin,
    ).stdout.split()
    images: list[ImageInfo] = []
    for group in chunked([normalize_image_id(value) for value in image_ids], 100):
        result = docker(["image", "inspect", *group], docker_bin)
        payload = json.loads(result.stdout or "[]")
        for item in payload:
            image_id = normalize_image_id(str(item.get("Id") or ""))
            if not image_id:
                continue
            tags = item.get("RepoTags") or []
            images.append(
                ImageInfo(
                    image_id=image_id,
                    created_at=parse_docker_datetime(str(item.get("Created") or "")),
                    size_bytes=int(item.get("Size") or 0),
                    repo_tags=tuple(str(tag) for tag in tags if tag),
                )
            )
    return images


def select_dangling_image_removals(
    images: list[ImageInfo],
    protected_ids: set[str],
    *,
    now: datetime,
    min_age_hours: int,
    keep_newest: int,
) -> list[ImageInfo]:
    """Choose which dangling images may be removed.

    An image is eligible when it is not in *protected_ids*, has no repo tags,
    and is at least *min_age_hours* old relative to *now*. Of the eligible
    images, the *keep_newest* most recently created are retained; the rest
    are returned ordered oldest first.
    """
    cutoff_seconds = min_age_hours * 3600
    dangling = [
        image
        for image in images
        if normalize_image_id(image.image_id) not in protected_ids
        and not image.repo_tags
        and (now - image.created_at).total_seconds() >= cutoff_seconds
    ]
    # Newest first so the head of the list is the part that is kept.
    dangling.sort(key=lambda image: image.created_at, reverse=True)
    if keep_newest > 0:
        dangling = dangling[keep_newest:]
    return sorted(dangling, key=lambda image: image.created_at)


def summarize_images(images: list[ImageInfo]) -> dict[str, Any]:
    """Build the JSON-serializable removal-plan summary for *images*.

    The oldest/newest fields are taken from the first and last elements, so
    *images* is expected to be ordered oldest first (as returned by
    select_dangling_image_removals). At most 20 shortened IDs are sampled.
    """
    return {
        "count": len(images),
        "estimated_total_size_bytes": sum(image.size_bytes for image in images),
        "oldest_created_at": images[0].created_at.isoformat() if images else None,
        "newest_created_at": images[-1].created_at.isoformat() if images else None,
        "sample_image_ids": [image.image_id[:12] for image in images[:20]],
    }


def remove_images(images: list[ImageInfo], docker_bin: str) -> list[str]:
    """Remove *images* with ``docker image rm`` in batches of 25.

    Returns the IDs of every batch whose command succeeded.

    Raises:
        subprocess.CalledProcessError: if any batch fails; later batches are
            not attempted.
    """
    removed: list[str] = []
    for group in chunked([image.image_id for image in images], 25):
        docker(["image", "rm", *group], docker_bin)
        removed.extend(group)
    return removed


def builder_prune_command(args: argparse.Namespace) -> list[str]:
    """Return the argv for a bounded ``docker builder prune``.

    The ``--filter until=<N>h`` age bound is included only when
    ``args.min_age_hours`` is positive; ``--keep-storage`` is always passed.
    """
    command = [args.docker_bin, "builder", "prune", "--force"]
    if args.min_age_hours > 0:
        command += ["--filter", f"until={args.min_age_hours}h"]
    command += ["--keep-storage", args.builder_keep_storage]
    return command


def build_receipt(args: argparse.Namespace) -> dict[str, Any]:
    """Plan the cleanup, execute it when ``args.apply`` is set, and return the receipt.

    The receipt always records disk usage before the run, the parameters, and
    the removal plan. In apply mode it additionally records the removed image
    IDs (shortened to 12 characters), whether the builder cache prune ran, and
    disk usage after the run.

    Raises:
        subprocess.CalledProcessError: if any docker or df invocation fails.
    """
    now = datetime.now(timezone.utc)
    before = current_disk_bytes(args.disk_path)
    protected_ids = get_container_image_ids(args.docker_bin)
    dangling_images = get_dangling_images(args.docker_bin)
    removal_candidates = (
        []
        if args.skip_dangling_images
        else select_dangling_image_removals(
            dangling_images,
            protected_ids,
            now=now,
            min_age_hours=args.min_age_hours,
            keep_newest=args.keep_dangling_newest,
        )
    )
    receipt: dict[str, Any] = {
        "schema_version": "awoooi_docker_disk_pressure_retention_cleanup_v1",
        "generated_at": now.isoformat(),
        "host_label": args.host_label,
        "mode": "apply" if args.apply else "dry_run",
        "disk_path": args.disk_path,
        "boundaries": {
            "touches_containers": False,
            "touches_volumes": False,
            "touches_databases": False,
            "touches_backups": False,
            "uses_docker_system_prune": False,
            "removes_only_unreferenced_dangling_images": True,
            "builder_cache_cleanup_requires_explicit_flag": True,
        },
        "parameters": {
            "min_age_hours": args.min_age_hours,
            "keep_dangling_newest": args.keep_dangling_newest,
            "include_builder_cache": args.include_builder_cache,
            "builder_keep_storage": args.builder_keep_storage,
            "skip_dangling_images": args.skip_dangling_images,
        },
        "disk_before": before,
        "protected_container_image_count": len(protected_ids),
        "dangling_image_total_count": len(dangling_images),
        "dangling_image_removal_plan": summarize_images(removal_candidates),
        # The docker binary path is dropped from the recorded command.
        "builder_cache_command": builder_prune_command(args)[1:] if args.include_builder_cache else None,
        "removed_image_ids": [],
        "builder_cache_cleanup_executed": False,
    }
    if args.apply:
        receipt["removed_image_ids"] = [
            image[:12] for image in remove_images(removal_candidates, args.docker_bin)
        ]
        if args.include_builder_cache:
            run_command(builder_prune_command(args), capture_output=True)
            receipt["builder_cache_cleanup_executed"] = True
        receipt["disk_after"] = current_disk_bytes(args.disk_path)
    return receipt


def main() -> int:
    """Validate arguments, run the cleanup, and emit the JSON receipt.

    Returns 0 on success, 2 on invalid arguments, and 1 when an external
    command fails or its executable cannot be found.
    """
    args = parse_args()
    if args.min_age_hours < 0:
        print("min-age-hours must be >= 0", file=sys.stderr)
        return 2
    if args.min_age_hours == 0 and not args.skip_dangling_images:
        print("min-age-hours=0 requires --skip-dangling-images", file=sys.stderr)
        return 2
    if args.keep_dangling_newest < 0:
        print("keep-dangling-newest must be >= 0", file=sys.stderr)
        return 2
    try:
        receipt = build_receipt(args)
    except subprocess.CalledProcessError as exc:
        # Output is captured, so the default traceback would hide the tool's
        # own error message; surface it explicitly.
        command_text = " ".join(str(part) for part in exc.cmd)
        print(f"command failed with exit code {exc.returncode}: {command_text}", file=sys.stderr)
        detail = (exc.stderr or "").strip()
        if detail:
            print(detail, file=sys.stderr)
        return 1
    except FileNotFoundError as exc:
        print(f"required executable not found: {exc.filename}", file=sys.stderr)
        return 1
    text = json.dumps(receipt, ensure_ascii=False, indent=2, sort_keys=True) + "\n"
    if args.output:
        args.output.parent.mkdir(parents=True, exist_ok=True)
        args.output.write_text(text, encoding="utf-8")
    print(text, end="")
    return 0


if __name__ == "__main__":
    raise SystemExit(main())