Monthly tooling refresh per [[update-tooling-dependencies]]: - prek: trufflehog v3.95.3, kingfisher v1.101.0, ruff v0.15.14, ansible-core 2.21.0 - fly proxy: nginx 1.30.1-alpine, alloy v1.16.1 - mise-tasks: typer==0.26.2 across all scripts - tailscale held at v1.94.2 (v1.96.5+ MagicDNS regression)
307 lines
10 KiB
Text
Executable file
307 lines
10 KiB
Text
Executable file
#!/usr/bin/env -S uv run --script
|
|
# /// script
|
|
# requires-python = ">=3.12"
|
|
# dependencies = ["httpx==0.28.1", "rich==15.0.0", "typer==0.26.2"]
|
|
# ///
|
|
#MISE description="List recent Forgejo Actions runs or fetch logs for a specific job"
|
|
#USAGE arg "[run_number]" help="Run number to show jobs for (omit to list recent runs)"
|
|
#USAGE flag "--job -j <job>" help="Job index (0-based) to fetch logs for"
|
|
#USAGE flag "--runner -r <runner>" help="Filter listing by runner: indri, ringtail, or all"
|
|
#USAGE flag "--repo <repo>" help="Forge repo (owner/name), default: detected from git remote"
|
|
#USAGE flag "--limit -n <limit>" help="Max runs to display (0 for all)"
|
|
#USAGE flag "--token <token>" help="Forgejo API token (default: read from 1Password)"
|
|
"""List recent Forgejo Actions runs and fetch job logs.
|
|
|
|
Usage:
|
|
mise run runner-logs # list recent runs (default 15)
|
|
mise run runner-logs -n 0 # list ALL runs
|
|
mise run runner-logs -r ringtail # list recent ringtail runs
|
|
mise run runner-logs --repo eblume/hermes # list runs for a different repo
|
|
mise run runner-logs 474 # show jobs in run 474
|
|
mise run runner-logs 474 -j 1 # fetch logs for job 1 of run 474
|
|
"""
|
|
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from typing import Annotated
|
|
|
|
import httpx
|
|
import typer
|
|
from rich.console import Console
|
|
from rich.table import Table
|
|
|
|
FORGE_URL = "https://forge.ops.eblu.me"
|
|
FORGE_API = f"{FORGE_URL}/api/v1"
|
|
OP_TOKEN_REF = "op://vg6xf6vvfmoh5hqjjhlhbeoaie/w3663ffnvkewbftncqxtcpeavy/api-token"
|
|
|
|
# Workflows using the ringtail nix-container-builder runner; everything else
|
|
# runs on the indri k8s runner.
|
|
RINGTAIL_WORKFLOWS = {"build-container-nix.yaml"}
|
|
|
|
app = typer.Typer(add_completion=False)
|
|
|
|
|
|
def resolve_token(explicit_token: str | None, console: Console) -> str:
|
|
"""Resolve Forgejo API token: explicit flag > FORGEJO_TOKEN env > 1Password."""
|
|
if explicit_token:
|
|
return explicit_token
|
|
env_token = os.environ.get("FORGEJO_TOKEN", "").strip()
|
|
if env_token:
|
|
return env_token
|
|
console.print("[dim]Reading Forgejo API token from 1Password...[/dim]")
|
|
result = subprocess.run(
|
|
["op", "read", OP_TOKEN_REF],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True,
|
|
)
|
|
return result.stdout.strip()
|
|
|
|
|
|
def detect_repo_from_git() -> str | None:
|
|
"""Sniff owner/repo from the git remote 'origin' URL."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "remote", "get-url", "origin"],
|
|
capture_output=True,
|
|
text=True,
|
|
check=True,
|
|
)
|
|
except (subprocess.CalledProcessError, FileNotFoundError):
|
|
return None
|
|
url = result.stdout.strip()
|
|
# Match SSH (git@host:owner/repo.git) or HTTPS (https://host/owner/repo.git)
|
|
m = re.search(r"[/:]([^/]+/[^/]+?)(?:\.git)?$", url)
|
|
if not m:
|
|
return None
|
|
candidate = m.group(1)
|
|
# Only use it if the remote points at our forge
|
|
if "forge.ops.eblu.me" in url or "forge.eblu.me" in url:
|
|
return candidate
|
|
return None
|
|
|
|
|
|
def runner_for_workflow(workflow_id: str) -> str:
|
|
return "ringtail" if workflow_id in RINGTAIL_WORKFLOWS else "indri"
|
|
|
|
|
|
def auth_headers(token: str) -> dict[str, str]:
|
|
return {"Authorization": f"token {token}"}
|
|
|
|
|
|
def fetch_tasks(repo: str, token: str) -> list[dict]:
|
|
"""Fetch all tasks from the Forgejo API, paginating if needed."""
|
|
tasks: list[dict] = []
|
|
page = 1
|
|
while True:
|
|
resp = httpx.get(
|
|
f"{FORGE_API}/repos/{repo}/actions/tasks",
|
|
params={"page": page, "limit": 50},
|
|
headers=auth_headers(token),
|
|
timeout=15,
|
|
)
|
|
resp.raise_for_status()
|
|
batch = resp.json().get("workflow_runs", [])
|
|
if not batch:
|
|
break
|
|
tasks.extend(batch)
|
|
page += 1
|
|
return tasks
|
|
|
|
|
|
def list_runs(runner: str, repo: str, limit: int, token: str, console: Console) -> None:
|
|
"""List recent workflow runs, grouped by run number."""
|
|
tasks = fetch_tasks(repo, token)
|
|
|
|
# Group tasks by run_number
|
|
runs: dict[int, list[dict]] = {}
|
|
for t in tasks:
|
|
rn = t["run_number"]
|
|
runs.setdefault(rn, []).append(t)
|
|
|
|
table = Table(title=f"Recent runs — {repo} (filter: {runner})")
|
|
table.add_column("Run #", style="cyan", no_wrap=True)
|
|
table.add_column("Status")
|
|
table.add_column("Runner")
|
|
table.add_column("Jobs")
|
|
table.add_column("Title")
|
|
table.add_column("Event")
|
|
|
|
shown = 0
|
|
for rn in sorted(runs, reverse=True):
|
|
if limit > 0 and shown >= limit:
|
|
break
|
|
|
|
jobs = sorted(runs[rn], key=lambda x: x["id"])
|
|
workflow_id = jobs[0].get("workflow_id", "")
|
|
host = runner_for_workflow(workflow_id)
|
|
if runner != "all" and host != runner:
|
|
continue
|
|
|
|
# Aggregate status: worst status wins
|
|
statuses = [j.get("status", "") for j in jobs]
|
|
if "failure" in statuses:
|
|
status, style = "failure", "red"
|
|
elif "running" in statuses or "waiting" in statuses:
|
|
status, style = "running", "yellow"
|
|
elif all(s == "success" for s in statuses):
|
|
status, style = "success", "green"
|
|
else:
|
|
status, style = statuses[0], "yellow"
|
|
|
|
job_names = ", ".join(j.get("name", "?")[:30] for j in jobs)
|
|
title = (jobs[0].get("display_title") or "")[:40]
|
|
event = jobs[0].get("event", "")
|
|
|
|
table.add_row(
|
|
str(rn),
|
|
f"[{style}]{status}[/{style}]",
|
|
host,
|
|
job_names,
|
|
title,
|
|
event,
|
|
)
|
|
shown += 1
|
|
|
|
console.print(table)
|
|
console.print("\n[dim]Use: mise run runner-logs <run#> to see jobs in a run[/dim]")
|
|
console.print("[dim] mise run runner-logs <run#> -j N to fetch logs for job N[/dim]")
|
|
|
|
|
|
def show_jobs(run_number: int, repo: str, token: str, console: Console) -> None:
|
|
"""Show the jobs within a specific run."""
|
|
tasks = fetch_tasks(repo, token)
|
|
|
|
jobs = sorted(
|
|
[t for t in tasks if t["run_number"] == run_number],
|
|
key=lambda x: x["id"],
|
|
)
|
|
if not jobs:
|
|
typer.echo(f"Error: No jobs found for run #{run_number}", err=True)
|
|
raise typer.Exit(1)
|
|
|
|
table = Table(title=f"Jobs in run #{run_number} — {repo}")
|
|
table.add_column("Job #", style="cyan", no_wrap=True)
|
|
table.add_column("Status")
|
|
table.add_column("Name")
|
|
table.add_column("Created")
|
|
|
|
for i, job in enumerate(jobs):
|
|
status = job.get("status", "")
|
|
style = "green" if status == "success" else "red" if status == "failure" else "yellow"
|
|
table.add_row(
|
|
str(i),
|
|
f"[{style}]{status}[/{style}]",
|
|
job.get("name", ""),
|
|
job.get("created_at", ""),
|
|
)
|
|
|
|
console.print(table)
|
|
console.print(f"\n[dim]Use: mise run runner-logs {run_number} -j N to fetch logs for job N[/dim]")
|
|
|
|
|
|
def fetch_log(run_number: int, job_index: int, repo: str, token: str) -> None:
|
|
"""Fetch logs for a specific job via SSH to indri.
|
|
|
|
Forgejo stores action logs as zstd-compressed files on disk at
|
|
~/forgejo/data/actions_log/{owner}/{repo}/{hex_prefix}/{task_id}.log.zst
|
|
regardless of which runner executed the job. The web log endpoint doesn't
|
|
support API-token auth for private repos, so we read the files directly.
|
|
"""
|
|
tasks = fetch_tasks(repo, token)
|
|
jobs = sorted(
|
|
[t for t in tasks if t["run_number"] == run_number],
|
|
key=lambda x: x["id"],
|
|
)
|
|
if not jobs:
|
|
typer.echo(f"Error: No jobs found for run #{run_number}", err=True)
|
|
raise typer.Exit(1)
|
|
if job_index < 0 or job_index >= len(jobs):
|
|
typer.echo(
|
|
f"Error: job index {job_index} out of range (run #{run_number} has {len(jobs)} jobs)",
|
|
err=True,
|
|
)
|
|
raise typer.Exit(1)
|
|
|
|
task_id = jobs[job_index]["id"]
|
|
hex_prefix = f"{task_id & 0xff:02x}"
|
|
log_path = f"~/forgejo/data/actions_log/{repo}/{hex_prefix}/{task_id}.log.zst"
|
|
|
|
result = subprocess.run(
|
|
["ssh", "indri", f"zstdcat {log_path}"],
|
|
capture_output=True,
|
|
text=True,
|
|
)
|
|
if result.returncode != 0:
|
|
typer.echo(
|
|
f"Error: could not read log for run #{run_number} job {job_index} (task {task_id})",
|
|
err=True,
|
|
)
|
|
typer.echo(f"Path: indri:{log_path}", err=True)
|
|
if result.stderr.strip():
|
|
typer.echo(result.stderr.strip(), err=True)
|
|
raise typer.Exit(1)
|
|
sys.stdout.write(result.stdout)
|
|
|
|
|
|
@app.command()
|
|
def main(
|
|
run_number: Annotated[
|
|
int | None,
|
|
typer.Argument(help="Run number to show jobs for (omit to list recent runs)"),
|
|
] = None,
|
|
job: Annotated[
|
|
int | None,
|
|
typer.Option("--job", "-j", help="Job index (0-based) to fetch logs for"),
|
|
] = None,
|
|
runner: Annotated[
|
|
str,
|
|
typer.Option("--runner", "-r", help="Filter listing by runner: indri, ringtail, or all"),
|
|
] = "all",
|
|
repo: Annotated[
|
|
str | None,
|
|
typer.Option("--repo", help="Forge repo (owner/name), default: detected from git remote"),
|
|
] = None,
|
|
limit: Annotated[
|
|
int,
|
|
typer.Option("--limit", "-n", help="Max runs to display (0 for all)"),
|
|
] = 15,
|
|
token: Annotated[
|
|
str | None,
|
|
typer.Option("--token", help="Forgejo API token (default: read from 1Password)"),
|
|
] = None,
|
|
) -> None:
|
|
"""List recent Forgejo Actions runs or fetch logs for a specific job."""
|
|
if runner not in ("indri", "ringtail", "all"):
|
|
typer.echo(f"Error: runner must be 'indri', 'ringtail', or 'all', got '{runner}'")
|
|
raise typer.Exit(1)
|
|
|
|
console = Console()
|
|
|
|
if repo is None:
|
|
repo = detect_repo_from_git()
|
|
if repo is None:
|
|
typer.echo(
|
|
"Error: could not detect repo from git remote; use --repo owner/name",
|
|
err=True,
|
|
)
|
|
raise typer.Exit(1)
|
|
console.print(f"[dim]Detected repo: {repo}[/dim]")
|
|
|
|
resolved_token = resolve_token(token, console)
|
|
|
|
if run_number is None:
|
|
if job is not None:
|
|
typer.echo("Error: --job requires a run number", err=True)
|
|
raise typer.Exit(1)
|
|
list_runs(runner, repo, limit, resolved_token, console)
|
|
elif job is None:
|
|
show_jobs(run_number, repo, resolved_token, console)
|
|
else:
|
|
fetch_log(run_number, job, repo, resolved_token)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
app()
|