#!/usr/bin/env -S uv run --script
# /// script
# requires-python = ">=3.12"
# dependencies = ["httpx==0.28.1", "rich==15.0.0", "typer==0.26.2"]
# ///
#MISE description="List recent Forgejo Actions runs or fetch logs for a specific job"
#USAGE arg "[run_number]" help="Run number to show jobs for (omit to list recent runs)"
#USAGE flag "--job -j <job>" help="Job index (0-based) to fetch logs for"
#USAGE flag "--runner -r <runner>" help="Filter listing by runner: indri, ringtail, or all"
#USAGE flag "--repo <repo>" help="Forge repo (owner/name), default: detected from git remote"
#USAGE flag "--limit -n <limit>" help="Max runs to display (0 for all)"
#USAGE flag "--token <token>" help="Forgejo API token (default: read from 1Password)"
"""List recent Forgejo Actions runs and fetch job logs.

Usage:
    mise run runner-logs                        # list recent runs (default 15)
    mise run runner-logs -n 0                   # list ALL runs
    mise run runner-logs -r ringtail            # list recent ringtail runs
    mise run runner-logs --repo eblume/hermes   # list runs for a different repo
    mise run runner-logs 474                    # show jobs in run 474
    mise run runner-logs 474 -j 1               # fetch logs for job 1 of run 474
"""

import os
import re
import subprocess
import sys
from typing import Annotated

import httpx
import typer
from rich.console import Console
from rich.markup import escape
from rich.table import Table

FORGE_URL = "https://forge.ops.eblu.me"
FORGE_API = f"{FORGE_URL}/api/v1"
OP_TOKEN_REF = "op://vg6xf6vvfmoh5hqjjhlhbeoaie/w3663ffnvkewbftncqxtcpeavy/api-token"

# Workflows using the ringtail nix-container-builder runner; everything else
# runs on the indri k8s runner.
RINGTAIL_WORKFLOWS = {"build-container-nix.yaml"}

# Values accepted by --runner; "all" disables the runner filter.
RUNNER_CHOICES = ("indri", "ringtail", "all")

# SSH host alias from which action logs are read.
LOG_HOST = "indri"

# owner/name, restricted to characters that are safe to interpolate into both
# the API URL path and the remote shell command used by fetch_log.
_REPO_RE = re.compile(r"([A-Za-z0-9_.-]+)/([A-Za-z0-9_.-]+)")

app = typer.Typer(add_completion=False)


def _fail(*lines: str) -> typer.Exit:
    """Print each line to stderr and return a ``typer.Exit(1)`` for the caller to raise."""
    for line in lines:
        typer.echo(line, err=True)
    return typer.Exit(1)


def _is_valid_repo(repo: str) -> bool:
    """Return True if *repo* looks like ``owner/name`` using only safe characters."""
    m = _REPO_RE.fullmatch(repo)
    if m is None:
        return False
    # "." and ".." match the character class but would act as path traversal.
    return not ({m.group(1), m.group(2)} & {".", ".."})


def resolve_token(explicit_token: str | None, console: Console) -> str:
    """Resolve Forgejo API token: explicit flag > FORGEJO_TOKEN env > 1Password.

    Args:
        explicit_token: Value of ``--token``, if given.
        console: Console used for the "reading from 1Password" notice.

    Returns:
        The API token.

    Raises:
        typer.Exit: If the 1Password CLI is missing, fails, or returns nothing.
    """
    if explicit_token:
        return explicit_token

    env_token = os.environ.get("FORGEJO_TOKEN", "").strip()
    if env_token:
        return env_token

    console.print("[dim]Reading Forgejo API token from 1Password...[/dim]")
    try:
        result = subprocess.run(
            ["op", "read", OP_TOKEN_REF],
            capture_output=True,
            text=True,
            check=True,
        )
    except FileNotFoundError:
        raise _fail(
            "Error: 1Password CLI ('op') not found; pass --token or set FORGEJO_TOKEN",
        ) from None
    except subprocess.CalledProcessError as exc:
        detail = (exc.stderr or "").strip()
        suffix = f": {detail}" if detail else ""
        raise _fail(f"Error: 'op read' failed (exit {exc.returncode}){suffix}") from None

    token = result.stdout.strip()
    if not token:
        raise _fail("Error: 1Password returned an empty Forgejo API token")
    return token


def detect_repo_from_git() -> str | None:
    """Sniff owner/repo from the git remote 'origin' URL.

    Returns:
        ``owner/repo`` when the origin remote points at the forge, else None
        (also None when git is unavailable or there is no origin remote).
    """
    try:
        result = subprocess.run(
            ["git", "remote", "get-url", "origin"],
            capture_output=True,
            text=True,
            check=True,
        )
    except (subprocess.CalledProcessError, FileNotFoundError):
        return None

    url = result.stdout.strip()
    # Match SSH (git@host:owner/repo.git) or HTTPS (https://host/owner/repo.git)
    m = re.search(r"[/:]([^/]+/[^/]+?)(?:\.git)?$", url)
    if not m:
        return None
    # Only use it if the remote points at our forge
    if "forge.ops.eblu.me" in url or "forge.eblu.me" in url:
        return m.group(1)
    return None


def runner_for_workflow(workflow_id: str) -> str:
    """Return the runner name ("ringtail" or "indri") that executes *workflow_id*."""
    return "ringtail" if workflow_id in RINGTAIL_WORKFLOWS else "indri"


def auth_headers(token: str) -> dict[str, str]:
    """Return the HTTP Authorization header for a Forgejo API token."""
    return {"Authorization": f"token {token}"}


def fetch_tasks(repo: str, token: str) -> list[dict]:
    """Fetch all tasks from the Forgejo API, paginating if needed.

    Args:
        repo: Repository as ``owner/name``.
        token: Forgejo API token.

    Returns:
        Every entry of ``workflow_runs`` across all pages, in API order.

    Raises:
        httpx.HTTPError: On transport errors or non-2xx responses.
    """
    tasks: list[dict] = []
    url = f"{FORGE_API}/repos/{repo}/actions/tasks"
    # One client for all pages so the connection is reused.
    with httpx.Client(headers=auth_headers(token), timeout=15) as client:
        page = 1
        while True:
            resp = client.get(url, params={"page": page, "limit": 50})
            resp.raise_for_status()
            batch = resp.json().get("workflow_runs", [])
            if not batch:
                break
            tasks.extend(batch)
            page += 1
    return tasks


def _status_style(status: str) -> str:
    """Return the Rich colour used for a single job status."""
    if status == "success":
        return "green"
    if status == "failure":
        return "red"
    return "yellow"


def _aggregate_status(statuses: list[str]) -> tuple[str, str]:
    """Collapse per-job statuses into one (status, colour); worst status wins."""
    if "failure" in statuses:
        return "failure", "red"
    if "running" in statuses or "waiting" in statuses:
        return "running", "yellow"
    if all(s == "success" for s in statuses):
        return "success", "green"
    return statuses[0], "yellow"


def _styled(status: str, style: str) -> str:
    """Wrap *status* in Rich markup for *style*, escaping the status text."""
    return f"[{style}]{escape(status)}[/{style}]"


def _jobs_for_run(run_number: int, repo: str, token: str) -> list[dict]:
    """Return the tasks of *run_number* ordered by task id; exit 1 if there are none."""
    jobs = sorted(
        (t for t in fetch_tasks(repo, token) if t["run_number"] == run_number),
        key=lambda t: t["id"],
    )
    if not jobs:
        raise _fail(f"Error: No jobs found for run #{run_number}")
    return jobs


def list_runs(runner: str, repo: str, limit: int, token: str, console: Console) -> None:
    """List recent workflow runs, grouped by run number.

    Args:
        runner: "indri", "ringtail", or "all" (no filtering).
        repo: Repository as ``owner/name``.
        limit: Maximum number of runs to show; 0 or less shows all.
        token: Forgejo API token.
        console: Console the table is printed to.
    """
    # Group tasks by run_number
    runs: dict[int, list[dict]] = {}
    for task in fetch_tasks(repo, token):
        runs.setdefault(task["run_number"], []).append(task)

    table = Table(title=f"Recent runs — {repo} (filter: {runner})")
    table.add_column("Run #", style="cyan", no_wrap=True)
    table.add_column("Status")
    table.add_column("Runner")
    table.add_column("Jobs")
    table.add_column("Title")
    table.add_column("Event")

    shown = 0
    for rn in sorted(runs, reverse=True):
        if limit > 0 and shown >= limit:
            break
        jobs = sorted(runs[rn], key=lambda t: t["id"])
        first = jobs[0]
        host = runner_for_workflow(first.get("workflow_id", ""))
        if runner != "all" and host != runner:
            continue

        status, style = _aggregate_status([j.get("status") or "" for j in jobs])
        # Names, titles and events come from the API (ultimately commit
        # messages and workflow files); escape them so square brackets are
        # shown literally instead of being parsed as Rich markup.
        job_names = ", ".join(escape((j.get("name") or "?")[:30]) for j in jobs)
        title = escape((first.get("display_title") or "")[:40])
        event = escape(first.get("event") or "")

        table.add_row(str(rn), _styled(status, style), host, job_names, title, event)
        shown += 1

    console.print(table)
    console.print("\n[dim]Use: mise run runner-logs RUN_NUMBER       to see jobs in a run[/dim]")
    console.print("[dim]     mise run runner-logs RUN_NUMBER -j N  to fetch logs for job N[/dim]")


def show_jobs(run_number: int, repo: str, token: str, console: Console) -> None:
    """Show the jobs within a specific run.

    Raises:
        typer.Exit: If the run has no jobs.
    """
    jobs = _jobs_for_run(run_number, repo, token)

    table = Table(title=f"Jobs in run #{run_number} — {repo}")
    table.add_column("Job #", style="cyan", no_wrap=True)
    table.add_column("Status")
    table.add_column("Name")
    table.add_column("Created")

    for index, job in enumerate(jobs):
        status = job.get("status") or ""
        table.add_row(
            str(index),
            _styled(status, _status_style(status)),
            escape(job.get("name") or ""),
            escape(job.get("created_at") or ""),
        )

    console.print(table)
    console.print(
        f"\n[dim]Use: mise run runner-logs {run_number} -j N  to fetch logs for job N[/dim]"
    )


def _ssh(remote_command: str) -> subprocess.CompletedProcess[str]:
    """Run *remote_command* on LOG_HOST over SSH and capture its output.

    Output is decoded as UTF-8 with replacement so that a stray non-UTF-8 byte
    in a build log cannot abort the fetch with UnicodeDecodeError.
    """
    return subprocess.run(
        ["ssh", LOG_HOST, remote_command],
        capture_output=True,
        text=True,
        encoding="utf-8",
        errors="replace",
    )


def fetch_log(run_number: int, job_index: int, repo: str, token: str) -> None:
    """Fetch logs for a specific job via SSH to indri.

    Forgejo stores action logs as zstd-compressed files on disk at
    ~/forgejo/data/actions_log/{owner}/{repo}/{hex_prefix}/{task_id}.log.zst
    regardless of which runner executed the job. The web log endpoint doesn't
    support API-token auth for private repos, so we read the files directly.

    Args:
        run_number: Run whose job log is wanted.
        job_index: 0-based index into the run's jobs ordered by task id.
        repo: Repository as ``owner/name``; interpolated into the remote shell
            command, so callers must pass a validated value.
        token: Forgejo API token.

    Raises:
        typer.Exit: If the run or job does not exist, or the log is missing or
            unreadable.
    """
    jobs = _jobs_for_run(run_number, repo, token)
    if job_index < 0 or job_index >= len(jobs):
        raise _fail(
            f"Error: job index {job_index} out of range "
            f"(run #{run_number} has {len(jobs)} jobs)",
        )

    task_id = jobs[job_index]["id"]
    hex_prefix = f"{task_id & 0xff:02x}"
    log_path = f"~/forgejo/data/actions_log/{repo}/{hex_prefix}/{task_id}.log.zst"

    # indri's login shell (fish) silently swallows SSH exit codes, so we can't
    # rely on returncode. zstdcat itself also exits 0 with a "can't stat ...
    # -- ignored" stderr message when the file is missing. Detect missing logs
    # by running `test -f` over SSH and parsing the marker line from stdout.
    probe = _ssh(f"test -f {log_path} && echo EXISTS || echo MISSING")
    probe_lines = probe.stdout.strip().splitlines()
    marker = probe_lines[-1] if probe_lines else ""
    if marker != "EXISTS":
        raise _fail(
            f"Error: log not found for run #{run_number} job {job_index} (task {task_id})",
            f"Path: {LOG_HOST}:{log_path}",
            "The runner may have crashed before uploading its log buffer "
            "(action_task.log_in_storage = 0).",
        )

    result = _ssh(f"zstdcat {log_path}")
    if result.returncode != 0 or not result.stdout:
        messages = [
            f"Error: could not read log for run #{run_number} job {job_index} (task {task_id})",
            f"Path: {LOG_HOST}:{log_path}",
        ]
        stderr = result.stderr.strip()
        if stderr:
            messages.append(stderr)
        raise _fail(*messages)

    sys.stdout.write(result.stdout)


@app.command()
def main(
    run_number: Annotated[
        int | None,
        typer.Argument(help="Run number to show jobs for (omit to list recent runs)"),
    ] = None,
    job: Annotated[
        int | None,
        typer.Option("--job", "-j", help="Job index (0-based) to fetch logs for"),
    ] = None,
    runner: Annotated[
        str,
        typer.Option("--runner", "-r", help="Filter listing by runner: indri, ringtail, or all"),
    ] = "all",
    repo: Annotated[
        str | None,
        typer.Option("--repo", help="Forge repo (owner/name), default: detected from git remote"),
    ] = None,
    limit: Annotated[
        int,
        typer.Option("--limit", "-n", help="Max runs to display (0 for all)"),
    ] = 15,
    token: Annotated[
        str | None,
        typer.Option("--token", help="Forgejo API token (default: read from 1Password)"),
    ] = None,
) -> None:
    """List recent Forgejo Actions runs or fetch logs for a specific job."""
    # Validate everything that needs no I/O first, so a bad invocation fails
    # before we prompt 1Password or touch the network.
    if runner not in RUNNER_CHOICES:
        raise _fail(f"Error: runner must be 'indri', 'ringtail', or 'all', got '{runner}'")
    if run_number is None and job is not None:
        raise _fail("Error: --job requires a run number")

    console = Console()

    detected = repo is None
    if repo is None:
        repo = detect_repo_from_git()
        if repo is None:
            raise _fail("Error: could not detect repo from git remote; use --repo owner/name")
    if not _is_valid_repo(repo):
        raise _fail(f"Error: invalid repo {repo!r}; expected owner/name")
    if detected:
        console.print(f"[dim]Detected repo: {repo}[/dim]")

    resolved_token = resolve_token(token, console)

    try:
        if run_number is None:
            list_runs(runner, repo, limit, resolved_token, console)
        elif job is None:
            show_jobs(run_number, repo, resolved_token, console)
        else:
            fetch_log(run_number, job, repo, resolved_token)
    except httpx.HTTPError as exc:
        # Covers both transport failures and non-2xx responses.
        raise _fail(f"Error: Forgejo API request failed: {exc}") from None


if __name__ == "__main__":
    app()