feat(ops): 新增 host runaway process aiops guard
Some checks failed
Code Review / ai-code-review (push) Successful in 14s
Deploy Alert Rules / Deploy Prometheus Alert Rules (push) Failing after 26s
Ansible / Reboot Recovery Contract / validate (push) Has been cancelled

This commit is contained in:
Your Name
2026-06-18 14:15:18 +08:00
parent 2862d24307
commit ff18872a23
14 changed files with 1138 additions and 8 deletions

View File

@@ -0,0 +1,390 @@
#!/usr/bin/env python3
"""
Host runaway process textfile exporter for AWOOOI AIOps.
This exporter is read-only. It classifies orphaned headless browser/smoke
process groups separately from legitimate Gitea Actions load so host CPU alerts
can point to a concrete PlayBook instead of a generic "high CPU" symptom.
"""
from __future__ import annotations
import argparse
import os
import re
import subprocess
import tempfile
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Iterable
TEXTFILE_DIR = Path(os.environ.get("NODE_EXPORTER_TEXTFILE_DIR", "/var/lib/node_exporter/textfile_collector"))
OUTPUT_NAME = "host_runaway_process.prom"
HOST_LABEL = os.environ.get("AIOPS_HOST_LABEL", os.uname().nodename)
LABEL_RE = re.compile(r'["\\\n]')
@dataclass(frozen=True)
class ProcessRow:
pid: int
ppid: int
pgid: int
sid: int
etimes: int
pcpu: float
stat: str
comm: str
args: str
@dataclass(frozen=True)
class RunawayRule:
rule_id: str
command_pattern: re.Pattern[str]
context_pattern: re.Pattern[str]
@dataclass(frozen=True)
class ProcessGroup:
rule_id: str
pgid: int
rows: tuple[ProcessRow, ...]
cpu_percent: float
oldest_age_seconds: int
orphan_reason: str
sample_comm: str
DEFAULT_RULES = (
RunawayRule(
"stockplatform_headless_smoke",
re.compile(r"(chrome|chromium|playwright)", re.IGNORECASE),
re.compile(r"stockplatform-review-bulk-ux|/tmp/stockplatform", re.IGNORECASE),
),
RunawayRule(
"headless_browser_smoke",
re.compile(r"(chrome|chromium|playwright)", re.IGNORECASE),
re.compile(r"--headless|--user-data-dir=/tmp|/tmp/.*(smoke|ux|playwright)", re.IGNORECASE),
),
)
def escape_label(value: str) -> str:
return LABEL_RE.sub(lambda m: {"\n": r"\n", "\\": r"\\", '"': r"\""}[m.group(0)], value)
def run_text(command: list[str], timeout: int = 20) -> str:
return subprocess.run(command, check=True, capture_output=True, text=True, timeout=timeout).stdout
def read_ps_text(ps_file: Path | None = None) -> str:
if ps_file:
return ps_file.read_text(encoding="utf-8")
linux_command = [
"ps",
"-eo",
"pid=,ppid=,pgid=,sid=,etimes=,pcpu=,stat=,comm=,args=",
]
try:
return run_text(linux_command)
except (subprocess.CalledProcessError, subprocess.TimeoutExpired):
return run_text(
[
"ps",
"-axo",
"pid=,ppid=,pgid=,sess=,etime=,pcpu=,stat=,comm=,command=",
]
)
def elapsed_to_seconds(value: str) -> int:
try:
return int(float(value))
except ValueError:
pass
days = 0
clock = value
if "-" in value:
raw_days, clock = value.split("-", 1)
days = int(raw_days)
parts = [int(part) for part in clock.split(":")]
if len(parts) == 3:
hours, minutes, seconds = parts
elif len(parts) == 2:
hours = 0
minutes, seconds = parts
else:
hours = 0
minutes = 0
seconds = parts[0]
return days * 86400 + hours * 3600 + minutes * 60 + seconds
def parse_ps_rows(text: str) -> list[ProcessRow]:
rows: list[ProcessRow] = []
for line in text.splitlines():
raw = line.strip()
if not raw:
continue
parts = raw.split(None, 8)
if len(parts) < 9:
continue
try:
rows.append(
ProcessRow(
pid=int(parts[0]),
ppid=int(parts[1]),
pgid=int(parts[2]),
sid=int(parts[3]),
etimes=elapsed_to_seconds(parts[4]),
pcpu=float(parts[5]),
stat=parts[6],
comm=parts[7],
args=parts[8],
)
)
except ValueError:
continue
return rows
def matching_rule(row: ProcessRow, rules: Iterable[RunawayRule] = DEFAULT_RULES) -> str | None:
haystack = f"{row.comm} {row.args}"
for rule in rules:
if rule.command_pattern.search(haystack) and rule.context_pattern.search(haystack):
return rule.rule_id
return None
def orphan_reason(rows: list[ProcessRow], all_pids: set[int]) -> str | None:
if any(row.ppid == 1 for row in rows):
return "ppid_1"
pgid = rows[0].pgid
if pgid not in all_pids:
return "missing_group_leader"
return None
def classify_groups(
rows: list[ProcessRow],
*,
min_age_seconds: int,
min_cpu_percent: float,
) -> list[ProcessGroup]:
all_pids = {row.pid for row in rows}
grouped: dict[tuple[str, int], list[ProcessRow]] = {}
for row in rows:
rule_id = matching_rule(row)
if rule_id is None:
continue
grouped.setdefault((rule_id, row.pgid), []).append(row)
groups: list[ProcessGroup] = []
for (rule_id, pgid), members in grouped.items():
reason = orphan_reason(members, all_pids)
if reason is None:
continue
oldest = max(row.etimes for row in members)
cpu_percent = sum(row.pcpu for row in members)
if oldest < min_age_seconds or cpu_percent < min_cpu_percent:
continue
sample_comm = sorted({row.comm for row in members})[0][:48]
groups.append(
ProcessGroup(
rule_id=rule_id,
pgid=pgid,
rows=tuple(sorted(members, key=lambda row: row.pid)),
cpu_percent=cpu_percent,
oldest_age_seconds=oldest,
orphan_reason=reason,
sample_comm=sample_comm,
)
)
return sorted(groups, key=lambda group: (-group.cpu_percent, group.rule_id, group.pgid))
def active_gitea_action_containers(docker_file: Path | None = None) -> int:
try:
if docker_file:
names = docker_file.read_text(encoding="utf-8").splitlines()
else:
names = run_text(["docker", "ps", "--format", "{{.Names}}"], timeout=10).splitlines()
except Exception:
return -1
return sum(1 for name in names if "GITEA-ACTIONS-TASK-" in name)
def load5_per_core() -> float:
try:
load5 = float(Path("/proc/loadavg").read_text(encoding="utf-8").split()[1])
except Exception:
try:
load5 = os.getloadavg()[1]
except OSError:
return 0.0
cores = os.cpu_count() or 1
return load5 / cores
def swap_used_ratio(meminfo_file: Path | None = None) -> float:
path = meminfo_file or Path("/proc/meminfo")
try:
values: dict[str, float] = {}
for line in path.read_text(encoding="utf-8").splitlines():
key, _, raw = line.partition(":")
if key in {"SwapTotal", "SwapFree"}:
values[key] = float(raw.strip().split()[0]) * 1024
total = values.get("SwapTotal", 0.0)
free = values.get("SwapFree", 0.0)
if total <= 0:
return 0.0
return max(0.0, min(1.0, (total - free) / total))
except Exception:
return 0.0
def render_metrics(
*,
host: str,
groups: list[ProcessGroup],
active_action_containers: int,
min_age_seconds: int,
min_cpu_percent: float,
now: int,
load_ratio: float,
swap_ratio: float,
) -> str:
labels_host = f'host="{escape_label(host)}"'
rule_ids = sorted({rule.rule_id for rule in DEFAULT_RULES})
by_rule = {rule_id: [group for group in groups if group.rule_id == rule_id] for rule_id in rule_ids}
lines = [
"# HELP awoooi_host_runaway_process_monitor_up Whether the host runaway process exporter completed.",
"# TYPE awoooi_host_runaway_process_monitor_up gauge",
"# HELP awoooi_host_runaway_process_last_run_timestamp Unix timestamp of the last exporter run.",
"# TYPE awoooi_host_runaway_process_last_run_timestamp gauge",
"# HELP awoooi_host_runaway_browser_orphan_group_count Count of orphaned browser/smoke process groups above thresholds.",
"# TYPE awoooi_host_runaway_browser_orphan_group_count gauge",
"# HELP awoooi_host_runaway_browser_orphan_process_count Count of orphaned browser/smoke processes above thresholds.",
"# TYPE awoooi_host_runaway_browser_orphan_process_count gauge",
"# HELP awoooi_host_runaway_browser_orphan_cpu_percent Sum CPU percent for orphaned browser/smoke process groups above thresholds.",
"# TYPE awoooi_host_runaway_browser_orphan_cpu_percent gauge",
"# HELP awoooi_host_runaway_browser_orphan_oldest_age_seconds Oldest age of matching orphaned process groups.",
"# TYPE awoooi_host_runaway_browser_orphan_oldest_age_seconds gauge",
"# HELP awoooi_host_runaway_browser_orphan_group_cpu_percent CPU percent for an individual orphaned browser/smoke process group.",
"# TYPE awoooi_host_runaway_browser_orphan_group_cpu_percent gauge",
"# HELP awoooi_host_runaway_browser_orphan_group_info Metadata for an individual orphaned browser/smoke process group.",
"# TYPE awoooi_host_runaway_browser_orphan_group_info gauge",
"# HELP awoooi_host_gitea_actions_active_container_count Active Gitea Actions task containers visible on the host, -1 when Docker is unavailable.",
"# TYPE awoooi_host_gitea_actions_active_container_count gauge",
"# HELP awoooi_host_load5_per_core Host load5 divided by CPU core count.",
"# TYPE awoooi_host_load5_per_core gauge",
"# HELP awoooi_host_swap_used_ratio Host swap used ratio from /proc/meminfo.",
"# TYPE awoooi_host_swap_used_ratio gauge",
"# HELP awoooi_host_runaway_process_remediation_authorized Static guardrail: remediation is not authorized by this exporter.",
"# TYPE awoooi_host_runaway_process_remediation_authorized gauge",
f"awoooi_host_runaway_process_monitor_up{{{labels_host},mode=\"read_only\"}} 1",
f"awoooi_host_runaway_process_last_run_timestamp{{{labels_host}}} {now}",
f"awoooi_host_gitea_actions_active_container_count{{{labels_host}}} {active_action_containers}",
f"awoooi_host_load5_per_core{{{labels_host}}} {load_ratio:.6f}",
f"awoooi_host_swap_used_ratio{{{labels_host}}} {swap_ratio:.6f}",
f"awoooi_host_runaway_process_remediation_authorized{{{labels_host}}} 0",
]
for rule_id in rule_ids:
rule_labels = (
f'{labels_host},rule="{escape_label(rule_id)}",'
f'min_age_seconds="{min_age_seconds}",min_cpu_percent="{min_cpu_percent:g}"'
)
rule_groups = by_rule[rule_id]
lines.append(f"awoooi_host_runaway_browser_orphan_group_count{{{rule_labels}}} {len(rule_groups)}")
lines.append(
f"awoooi_host_runaway_browser_orphan_process_count{{{rule_labels}}} "
f"{sum(len(group.rows) for group in rule_groups)}"
)
lines.append(
f"awoooi_host_runaway_browser_orphan_cpu_percent{{{rule_labels}}} "
f"{sum(group.cpu_percent for group in rule_groups):.6f}"
)
lines.append(
f"awoooi_host_runaway_browser_orphan_oldest_age_seconds{{{rule_labels}}} "
f"{max((group.oldest_age_seconds for group in rule_groups), default=0)}"
)
for group in groups[:20]:
group_labels = (
f'{labels_host},rule="{escape_label(group.rule_id)}",pgid="{group.pgid}",'
f'orphan_reason="{escape_label(group.orphan_reason)}",comm="{escape_label(group.sample_comm)}"'
)
lines.append(f"awoooi_host_runaway_browser_orphan_group_cpu_percent{{{group_labels}}} {group.cpu_percent:.6f}")
lines.append(f"awoooi_host_runaway_browser_orphan_group_info{{{group_labels}}} 1")
return "\n".join(lines) + "\n"
def collect(args: argparse.Namespace) -> str:
rows = parse_ps_rows(read_ps_text(args.ps_file))
groups = classify_groups(
rows,
min_age_seconds=args.min_age_seconds,
min_cpu_percent=args.min_cpu_percent,
)
return render_metrics(
host=args.host,
groups=groups,
active_action_containers=active_gitea_action_containers(args.docker_ps_file),
min_age_seconds=args.min_age_seconds,
min_cpu_percent=args.min_cpu_percent,
now=int(time.time()),
load_ratio=load5_per_core(),
swap_ratio=swap_used_ratio(args.meminfo_file),
)
def write_textfile(payload: str, textfile_dir: Path, output_name: str) -> Path:
textfile_dir.mkdir(parents=True, exist_ok=True)
with tempfile.NamedTemporaryFile("w", dir=textfile_dir, delete=False, encoding="utf-8") as tmp:
tmp.write(payload)
tmp_path = Path(tmp.name)
output_path = textfile_dir / output_name
tmp_path.replace(output_path)
output_path.chmod(0o644)
return output_path
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Export AWOOOI host runaway process metrics.")
parser.add_argument("--host", default=HOST_LABEL)
parser.add_argument("--textfile-dir", type=Path, default=TEXTFILE_DIR)
parser.add_argument("--output-name", default=OUTPUT_NAME)
parser.add_argument("--stdout", action="store_true", help="Print metrics instead of writing the textfile.")
parser.add_argument("--ps-file", type=Path, help="Use a fixture file instead of running ps.")
parser.add_argument("--docker-ps-file", type=Path, help="Use a fixture file instead of docker ps.")
parser.add_argument("--meminfo-file", type=Path, help="Use a fixture file instead of /proc/meminfo.")
parser.add_argument(
"--min-age-seconds",
type=int,
default=int(os.environ.get("AIOPS_RUNAWAY_PROCESS_MIN_AGE_SECONDS", "1800")),
)
parser.add_argument(
"--min-cpu-percent",
type=float,
default=float(os.environ.get("AIOPS_RUNAWAY_PROCESS_MIN_CPU_PERCENT", "50")),
)
return parser.parse_args()
def main() -> None:
args = parse_args()
payload = collect(args)
if args.stdout:
print(payload, end="")
return
output_path = write_textfile(payload, args.textfile_dir, args.output_name)
print(f"HOST_RUNAWAY_PROCESS_EXPORTER_OK output={output_path}")
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,165 @@
#!/usr/bin/env python3
"""
Gated remediation helper for AWOOOI host runaway process groups.
Default mode is dry-run. Applying SIGTERM requires explicit owner approval,
maintenance window, evidence reference, and --confirm-apply. This script is a
PlayBook primitive, not a background auto-kill daemon.
"""
from __future__ import annotations
import argparse
import importlib.util
import json
import os
import signal
import sys
import time
from pathlib import Path
from types import ModuleType
EXPORTER_PATH = Path(__file__).with_name("host-runaway-process-exporter.py")
def load_exporter() -> ModuleType:
spec = importlib.util.spec_from_file_location("host_runaway_process_exporter", EXPORTER_PATH)
if spec is None or spec.loader is None:
raise RuntimeError(f"cannot load exporter module: {EXPORTER_PATH}")
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Dry-run or gated SIGTERM for AWOOOI runaway process groups.")
parser.add_argument("--host", default=os.environ.get("AIOPS_HOST_LABEL", os.uname().nodename))
parser.add_argument("--rule", help="Limit candidates to one rule id. Required with --apply.")
parser.add_argument("--ps-file", type=Path, help="Use a fixture ps file for tests or offline review.")
parser.add_argument("--min-age-seconds", type=int, default=1800)
parser.add_argument("--min-cpu-percent", type=float, default=50)
parser.add_argument("--apply", action="store_true", help="Send SIGTERM to matching process groups.")
parser.add_argument("--confirm-apply", action="store_true", help="Required together with --apply.")
parser.add_argument("--owner-approval-id", default="")
parser.add_argument("--maintenance-window-id", default="")
parser.add_argument("--evidence-ref", default="")
parser.add_argument("--wait-seconds", type=int, default=0, help="Optional wait after SIGTERM before re-reading ps.")
return parser.parse_args()
def validate_apply_args(args: argparse.Namespace) -> None:
if not args.apply:
return
missing = []
if not args.confirm_apply:
missing.append("--confirm-apply")
if not args.rule:
missing.append("--rule")
if not args.owner_approval_id:
missing.append("--owner-approval-id")
if not args.maintenance_window_id:
missing.append("--maintenance-window-id")
if not args.evidence_ref:
missing.append("--evidence-ref")
if missing:
raise SystemExit(
"Refusing apply; missing required gates: "
+ ", ".join(missing)
+ ". Use dry-run output for the PlayBook packet first."
)
def current_process_group() -> int:
try:
return os.getpgrp()
except Exception:
return -1
def main() -> None:
args = parse_args()
validate_apply_args(args)
exporter = load_exporter()
rows = exporter.parse_ps_rows(exporter.read_ps_text(args.ps_file))
groups = exporter.classify_groups(
rows,
min_age_seconds=args.min_age_seconds,
min_cpu_percent=args.min_cpu_percent,
)
if args.rule:
groups = [group for group in groups if group.rule_id == args.rule]
own_pgrp = current_process_group()
candidates = []
for group in groups:
blocked_reason = None
if group.pgid <= 1:
blocked_reason = "unsafe_pgid"
elif group.pgid == own_pgrp:
blocked_reason = "own_process_group"
candidates.append(
{
"rule": group.rule_id,
"pgid": group.pgid,
"process_count": len(group.rows),
"cpu_percent": round(group.cpu_percent, 3),
"oldest_age_seconds": group.oldest_age_seconds,
"orphan_reason": group.orphan_reason,
"sample_comm": group.sample_comm,
"blocked_reason": blocked_reason,
"action": "skip" if blocked_reason else ("sigterm" if args.apply else "dry_run"),
}
)
signaled: list[int] = []
if args.apply:
for candidate in candidates:
if candidate["blocked_reason"]:
continue
os.killpg(int(candidate["pgid"]), signal.SIGTERM)
signaled.append(int(candidate["pgid"]))
remaining_after_wait = None
if args.apply and args.wait_seconds > 0:
time.sleep(args.wait_seconds)
fresh_rows = exporter.parse_ps_rows(exporter.read_ps_text(args.ps_file))
fresh_groups = exporter.classify_groups(
fresh_rows,
min_age_seconds=args.min_age_seconds,
min_cpu_percent=args.min_cpu_percent,
)
remaining_after_wait = [
group.pgid for group in fresh_groups if not args.rule or group.rule_id == args.rule
]
payload = {
"schema_version": "host_runaway_process_remediation_v1",
"host": args.host,
"mode": "apply_sigterm" if args.apply else "dry_run",
"runtime_gate": 1 if args.apply else 0,
"owner_approval_id": args.owner_approval_id if args.apply else None,
"maintenance_window_id": args.maintenance_window_id if args.apply else None,
"evidence_ref": args.evidence_ref if args.apply else None,
"min_age_seconds": args.min_age_seconds,
"min_cpu_percent": args.min_cpu_percent,
"candidate_count": len(candidates),
"signaled_process_group_count": len(signaled),
"signaled_process_groups": signaled,
"remaining_after_wait": remaining_after_wait,
"candidates": candidates,
"forbidden_without_gates": [
"sigkill",
"docker_restart",
"systemctl_restart",
"nginx_reload",
"firewall_change",
"secret_collection",
],
}
print(json.dumps(payload, ensure_ascii=False, indent=2, sort_keys=True))
if __name__ == "__main__":
main()

View File

@@ -0,0 +1,144 @@
from __future__ import annotations
import importlib.util
import subprocess
import sys
from pathlib import Path
SCRIPT_ROOT = Path(__file__).resolve().parents[1]
EXPORTER_PATH = SCRIPT_ROOT / "host-runaway-process-exporter.py"
REMEDIATION_PATH = SCRIPT_ROOT / "host-runaway-process-remediation.py"
def load_exporter():
spec = importlib.util.spec_from_file_location("host_runaway_process_exporter", EXPORTER_PATH)
assert spec and spec.loader
module = importlib.util.module_from_spec(spec)
sys.modules[spec.name] = module
spec.loader.exec_module(module)
return module
def test_classifies_orphan_stockplatform_headless_group() -> None:
exporter = load_exporter()
rows = exporter.parse_ps_rows(
"""
100 1 100 100 7200 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa
101 100 100 100 7190 55.0 S chromium /opt/chrome/chromium --type=renderer /tmp/stockplatform-review-bulk-ux-aa
200 10 200 200 600 90.0 S node pnpm --filter @awoooi/web build
"""
)
groups = exporter.classify_groups(rows, min_age_seconds=1800, min_cpu_percent=50)
assert len(groups) == 1
assert groups[0].rule_id == "stockplatform_headless_smoke"
assert groups[0].pgid == 100
assert groups[0].orphan_reason == "ppid_1"
assert groups[0].cpu_percent == 120.0
assert len(groups[0].rows) == 2
def test_ignores_non_orphan_or_young_browser_processes() -> None:
exporter = load_exporter()
rows = exporter.parse_ps_rows(
"""
100 99 100 100 7200 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa
101 100 100 100 7190 55.0 S chromium /opt/chrome/chromium /tmp/stockplatform-review-bulk-ux-aa
300 1 300 300 60 120.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-bb
"""
)
assert exporter.classify_groups(rows, min_age_seconds=1800, min_cpu_percent=50) == []
def test_parses_bsd_elapsed_time_for_local_smoke() -> None:
exporter = load_exporter()
rows = exporter.parse_ps_rows(
"""
100 1 100 100 01:00:00 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa
101 100 100 100 2-00:00:10 55.0 S chromium /opt/chrome/chromium /tmp/stockplatform-review-bulk-ux-aa
"""
)
assert rows[0].etimes == 3600
assert rows[1].etimes == 172810
def test_renders_ci_load_and_swap_without_authorizing_repair(tmp_path: Path) -> None:
exporter = load_exporter()
groups = exporter.classify_groups(
exporter.parse_ps_rows(
"100 1 100 100 7200 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa"
),
min_age_seconds=1800,
min_cpu_percent=50,
)
metrics = exporter.render_metrics(
host="110",
groups=groups,
active_action_containers=3,
min_age_seconds=1800,
min_cpu_percent=50,
now=123,
load_ratio=1.25,
swap_ratio=1.0,
)
assert 'awoooi_host_runaway_process_monitor_up{host="110",mode="read_only"} 1' in metrics
assert 'awoooi_host_gitea_actions_active_container_count{host="110"} 3' in metrics
assert 'awoooi_host_swap_used_ratio{host="110"} 1.000000' in metrics
assert 'awoooi_host_runaway_process_remediation_authorized{host="110"} 0' in metrics
assert 'rule="stockplatform_headless_smoke"' in metrics
def test_remediation_defaults_to_dry_run(tmp_path: Path) -> None:
ps_file = tmp_path / "ps.txt"
ps_file.write_text(
"100 1 100 100 7200 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa\n",
encoding="utf-8",
)
result = subprocess.run(
[
sys.executable,
str(REMEDIATION_PATH),
"--ps-file",
str(ps_file),
"--rule",
"stockplatform_headless_smoke",
],
check=True,
capture_output=True,
text=True,
)
assert '"mode": "dry_run"' in result.stdout
assert '"runtime_gate": 0' in result.stdout
assert '"action": "dry_run"' in result.stdout
def test_remediation_refuses_apply_without_gates(tmp_path: Path) -> None:
ps_file = tmp_path / "ps.txt"
ps_file.write_text(
"100 1 100 100 7200 65.0 S chrome /opt/chrome/chrome --headless --user-data-dir=/tmp/stockplatform-review-bulk-ux-aa\n",
encoding="utf-8",
)
result = subprocess.run(
[
sys.executable,
str(REMEDIATION_PATH),
"--ps-file",
str(ps_file),
"--apply",
"--rule",
"stockplatform_headless_smoke",
],
capture_output=True,
text=True,
)
assert result.returncode != 0
assert "Refusing apply" in result.stderr