#!/usr/bin/env -S uv run --script # /// script # requires-python = ">=3.12" # dependencies = ["httpx>=0.28.0", "pyyaml>=6.0", "rich>=13.0.0", "typer>=0.15.0"] # /// #MISE description="View active Mikado dependency chains for C2 changes" #USAGE arg "[card]" help="Card stem to show chain for" #USAGE flag "--all" help="Show all cards in full, including complete ones" #USAGE flag "--resume" help="Resume a chain: detect branch, show state and next steps" """View active Mikado dependency chains for C2 changes. Scans all markdown files in docs/ for YAML frontmatter with ``status: active`` and ``requires`` fields, then builds and displays the Mikado dependency graph. Usage: mise run docs-mikado # list all active chains mise run docs-mikado deploy-authentik # show chain for a card mise run docs-mikado deploy-authentik --all # include complete cards in full mise run docs-mikado --resume # resume: detect branch, show state mise run docs-mikado --resume deploy-authentik # resume specific chain """ import re import subprocess import sys from pathlib import Path from typing import Annotated import typer import yaml from rich.console import Console from rich.markdown import Markdown from rich.panel import Panel from rich.table import Table DOCS_DIR = Path(__file__).parent.parent / "docs" REPO_DIR = Path(__file__).parent.parent C2_COMMIT_RE = re.compile(r"^C2\(([^)]+)\):\s+(plan|impl|close|finalize)\s+(.+)$") def extract_frontmatter(file_path: Path) -> dict | None: """Extract YAML frontmatter from a markdown file.""" content = file_path.read_text() if not content.startswith("---"): return None end_idx = content.find("---", 3) if end_idx == -1: return None frontmatter_text = content[3:end_idx].strip() try: return yaml.safe_load(frontmatter_text) or {} except yaml.YAMLError: return None def build_graph() -> dict[str, dict]: """Build the dependency graph from all docs.""" cards: dict[str, dict] = {} for md_file in sorted(DOCS_DIR.rglob("*.md")): if "changelog.d" in md_file.parts: continue frontmatter = extract_frontmatter(md_file) if frontmatter is None: continue stem = md_file.stem cards[stem] = { "path": md_file, "title": frontmatter.get("title", stem), "status": frontmatter.get("status"), "branch": frontmatter.get("branch"), "requires": frontmatter.get("requires", []) or [], "required_by": [], } # Compute inverse relationships for stem, card in cards.items(): for req in card["requires"]: if req in cards: cards[req]["required_by"].append(stem) return cards def detect_cycles(cards: dict[str, dict]) -> list[list[str]]: """Detect circular dependencies in the graph. Returns list of cycles.""" cycles: list[list[str]] = [] visited: set[str] = set() on_stack: set[str] = set() def dfs(stem: str, path: list[str]) -> None: if stem in on_stack: cycle_start = path.index(stem) cycles.append(path[cycle_start:] + [stem]) return if stem in visited or stem not in cards: return visited.add(stem) on_stack.add(stem) path.append(stem) for req in cards[stem]["requires"]: dfs(req, path) path.pop() on_stack.discard(stem) for stem in cards: dfs(stem, []) return cycles def is_active(card: dict) -> bool: """Check if a card has status: active.""" return card.get("status") == "active" def find_root_goals(cards: dict[str, dict]) -> list[str]: """Find active cards that aren't required by another active card.""" roots = [] for stem, card in cards.items(): if not is_active(card): continue # A root goal is not required by any other active card has_active_parent = any( is_active(cards[rb]) for rb in card["required_by"] if rb in cards ) if not has_active_parent: roots.append(stem) return sorted(roots) def find_ready_leaves(cards: dict[str, dict], root: str) -> list[str]: """Find active leaf nodes ready for work (no unmet active requires).""" leaves = [] visited: set[str] = set() def walk(stem: str) -> None: if stem in visited or stem not in cards: return visited.add(stem) card = cards[stem] if not is_active(card): return # Check if all requires are met (not active) unmet = [ r for r in card["requires"] if r in cards and is_active(cards[r]) ] if not unmet: leaves.append(stem) for req in card["requires"]: walk(req) walk(root) return sorted(leaves) def get_current_branch() -> str | None: """Get the current git branch name.""" try: result = subprocess.run( ["git", "rev-parse", "--abbrev-ref", "HEAD"], capture_output=True, text=True, cwd=REPO_DIR, ) if result.returncode == 0: branch = result.stdout.strip() return branch if branch != "HEAD" else None return None except FileNotFoundError: return None def get_branch_commits(branch: str) -> list[dict]: """Get commits on a branch since it diverged from main.""" try: result = subprocess.run( ["git", "log", "--oneline", "--format=%H %s", f"main..{branch}"], capture_output=True, text=True, cwd=REPO_DIR, ) if result.returncode != 0: return [] commits = [] for line in result.stdout.strip().split("\n"): if not line: continue sha, _, subject = line.partition(" ") match = C2_COMMIT_RE.match(subject) commits.append( { "sha": sha, "subject": subject, "chain": match.group(1) if match else None, "verb": match.group(2) if match else None, "description": match.group(3) if match else None, "conventional": match is not None, } ) # git log returns newest first; reverse for chronological commits.reverse() return commits except FileNotFoundError: return [] def classify_branch_position(commits: list[dict]) -> str: """Determine where in the invariant the branch currently is.""" if not commits: return "empty" verbs = [c["verb"] for c in commits if c["conventional"]] if not verbs: return "unconventional" last_verb = verbs[-1] has_plan = "plan" in verbs has_impl = "impl" in verbs has_close = "close" in verbs if last_verb == "finalize": return "finalized" if not has_impl and not has_close: return "planning" if last_verb == "close": return "between-cycles" if last_verb == "impl": return "mid-cycle" if last_verb == "plan" and has_impl: # plan after impl — invariant violation return "invariant-violation" return "unknown" FORGE_API = "https://forge.ops.eblu.me/api/v1" def find_pr_for_branch(branch: str) -> dict | None: """Find an open PR for the given branch via the Forgejo API.""" import httpx try: resp = httpx.get( f"{FORGE_API}/repos/eblume/blumeops/pulls", params={"state": "open", "limit": 50}, timeout=10, ) if resp.status_code != 200: return None for pr in resp.json(): if pr.get("head", {}).get("ref") == branch: return { "number": pr["number"], "title": pr["title"], "url": pr.get("html_url", ""), } except (httpx.HTTPError, KeyError): pass return None def get_stash_list() -> list[str]: """Get the list of git stash entries.""" try: result = subprocess.run( ["git", "stash", "list"], capture_output=True, text=True, cwd=REPO_DIR, ) if result.returncode == 0 and result.stdout.strip(): return result.stdout.strip().split("\n") except FileNotFoundError: pass return [] def show_resume( cards: dict[str, dict], console: Console, chain_name: str | None, ) -> None: """Show resume information for continuing C2 work.""" current_branch = get_current_branch() roots = find_root_goals(cards) # Find chain-to-branch mapping from goal cards chain_branches: dict[str, str | None] = {} for stem in roots: card = cards[stem] chain_branches[stem] = card.get("branch") if chain_name: # Explicit chain requested — validate if chain_name not in cards: console.print(f"[red]Chain not found: {chain_name}[/red]") raise typer.Exit(code=1) if not is_active(cards[chain_name]): console.print( f"[yellow]{chain_name} is not active (no status: active)[/yellow]" ) raise typer.Exit(code=1) expected_branch = cards[chain_name].get("branch") if expected_branch and current_branch != expected_branch: console.print( f"[red]Branch mismatch:[/red] chain {chain_name} expects " f"branch [bold]{expected_branch}[/bold] but you are on " f"[bold]{current_branch}[/bold]" ) console.print( f"\n[dim]Run: git checkout {expected_branch}[/dim]" ) raise typer.Exit(code=1) _show_chain_resume(cards, console, chain_name, current_branch) return # No explicit chain — try to detect from branch if current_branch and current_branch.startswith("mikado/"): chain_stem = current_branch.removeprefix("mikado/") # Try to match to a goal card matched = None for stem in roots: if stem == chain_stem: matched = stem break if cards[stem].get("branch") == current_branch: matched = stem break if matched: _show_chain_resume(cards, console, matched, current_branch) return else: console.print( f"[yellow]On branch {current_branch} but no matching " f"active chain found for stem '{chain_stem}'[/yellow]" ) # On main or unrecognized branch — list options console.print() if not roots: console.print("[dim]No active Mikado chains to resume.[/dim]") raise typer.Exit() console.print("[bold]Active Mikado chains:[/bold]") console.print() table = Table(show_header=True, header_style="bold") table.add_column("Chain") table.add_column("Title") table.add_column("Branch") table.add_column("Status") table.add_column("Ready Leaves") for stem in roots: card = cards[stem] branch = card.get("branch") if branch: branch_display = f"[green]{branch}[/green]" status = "in progress" else: branch_display = "[dim]not started[/dim]" status = "planned" leaves = find_ready_leaves(cards, stem) leaves_display = ", ".join(leaves) if leaves else "[dim]none[/dim]" table.add_row(stem, card["title"], branch_display, status, leaves_display) console.print(table) console.print() console.print( "[dim]Run: mise run docs-mikado --resume to resume a specific chain[/dim]" ) def _show_chain_resume( cards: dict[str, dict], console: Console, chain: str, current_branch: str | None, ) -> None: """Show detailed resume info for a specific chain.""" card = cards[chain] branch = card.get("branch") or current_branch console.print() # Look up PR for this branch pr_info = find_pr_for_branch(branch) if branch else None panel_lines = [ f"[bold]{chain}[/bold] — {card['title']}", f"Branch: [green]{branch or 'not set'}[/green]", ] if pr_info: panel_lines.append( f"PR: [cyan]#{pr_info['number']}[/cyan] — {pr_info['title']}" ) if pr_info["url"]: panel_lines.append(f" {pr_info['url']}") console.print(Panel("\n".join(panel_lines), title="Resuming Mikado Chain")) if pr_info: console.print( f"[dim]Check PR comments: mise run pr-comments {pr_info['number']}[/dim]" ) # Show branch position if we can read commits if branch and current_branch == branch: commits = get_branch_commits(branch) position = classify_branch_position(commits) position_labels = { "empty": "[dim]No commits on branch yet[/dim]", "planning": "[cyan]Planning phase[/cyan] — still adding cards", "mid-cycle": "[yellow]Mid-cycle[/yellow] — impl in progress, leaf not yet closed", "between-cycles": "[green]Between cycles[/green] — ready for next leaf", "finalized": "[green]Finalized[/green] — chain complete, awaiting merge", "unconventional": "[yellow]Unconventional commits[/yellow] — commits don't follow C2() convention", "invariant-violation": "[red]Invariant violation[/red] — plan commit found after impl", "unknown": "[dim]Unknown position[/dim]", } console.print(f"\nBranch position: {position_labels.get(position, position)}") if commits: console.print("\n[bold]Recent commits:[/bold]") # Show last 10 commits for c in commits[-10:]: if c["conventional"]: verb_colors = { "plan": "cyan", "impl": "yellow", "close": "green", "finalize": "magenta", } color = verb_colors.get(c["verb"], "white") console.print( f" {c['sha'][:8]} [{color}]{c['verb']}[/{color}] {c['description']}" ) else: console.print( f" {c['sha'][:8]} [dim]{c['subject']}[/dim]" ) # Show ready leaves leaves = find_ready_leaves(cards, chain) if leaves: console.print("\n[bold]Ready leaf nodes (no unmet dependencies):[/bold]") for leaf in leaves: leaf_card = cards[leaf] console.print(f" [green]→[/green] {leaf} — {leaf_card['title']}") else: console.print("\n[dim]No ready leaf nodes — all leaves have unmet dependencies[/dim]") # Check for stashed work stashes = get_stash_list() if stashes: console.print( f"\n[yellow]Note: {len(stashes)} stash entr{'y' if len(stashes) == 1 else 'ies'} found:[/yellow]" ) for entry in stashes[:5]: console.print(f" [dim]{entry}[/dim]") if len(stashes) > 5: console.print(f" [dim]... and {len(stashes) - 5} more[/dim]") console.print( "[dim]Review with: git stash list / git stash show[/dim]" ) console.print() def walk_chain( cards: dict[str, dict], stem: str, console: Console, show_all: bool, visited: set[str] | None = None, depth: int = 0, ) -> None: """Walk the dependency tree depth-first from a card.""" if visited is None: visited = set() if stem in visited: console.print(f"{' ' * depth}[dim](circular: {stem})[/dim]") return visited.add(stem) card = cards.get(stem) if card is None: console.print(f"{' ' * depth}[red](missing: {stem})[/red]") return active = is_active(card) if active or show_all: # Print full card content console.print() separator = "=" * 72 console.print(f"[bold cyan]{separator}[/bold cyan]") status_tag = "[active]" if active else "[complete]" console.print( f"[bold]{status_tag} {stem}[/bold] — {card['title']}" ) console.print(f"[dim]{card['path'].relative_to(DOCS_DIR)}[/dim]") if card.get("branch"): console.print(f"[dim]branch: {card['branch']}[/dim]") if card["requires"]: console.print(f"[dim]requires: {', '.join(card['requires'])}[/dim]") if card["required_by"]: console.print( f"[dim]required-by: {', '.join(card['required_by'])}[/dim]" ) console.print(f"[bold cyan]{separator}[/bold cyan]") console.print() content = card["path"].read_text() console.print(content) else: # One-line summary for complete cards console.print( f"{' ' * depth}[green][complete][/green] {stem} — {card['title']}" ) # Recurse into dependencies for req in card["requires"]: walk_chain(cards, req, console, show_all, visited, depth + 1) def main( card: Annotated[ str | None, typer.Argument(help="Card stem to show chain for") ] = None, all: Annotated[ bool, typer.Option("--all", help="Show all cards in full, including complete ones"), ] = False, resume: Annotated[ bool, typer.Option("--resume", help="Resume a chain: detect branch, show state"), ] = False, ) -> None: console = Console() cards = build_graph() # Check for circular dependencies cycles = detect_cycles(cards) if cycles: console.print("[bold red]Circular dependencies detected![/bold red]") for cycle in cycles: console.print(f" [red]{' → '.join(cycle)}[/red]") console.print() if resume: show_resume(cards, console, chain_name=card) return if card is None: # List all active Mikado chains roots = find_root_goals(cards) if not roots: console.print("[dim]No active Mikado chains found.[/dim]") console.print( "[dim]Cards need status: active in frontmatter to appear here.[/dim]" ) raise typer.Exit() table = Table( title="Active Mikado Chains", show_header=True, header_style="bold" ) table.add_column("Goal Card") table.add_column("Title") table.add_column("Branch") table.add_column("Active Deps", justify="right") table.add_column("Total Deps", justify="right") for stem in roots: card_data = cards[stem] # Count dependencies def count_deps(s: str, seen: set[str] | None = None) -> tuple[int, int]: if seen is None: seen = set() if s in seen: return 0, 0 seen.add(s) active_count = 0 total_count = 0 c = cards.get(s) if c is None: return 0, 0 for req in c["requires"]: total_count += 1 req_card = cards.get(req) if req_card and is_active(req_card): active_count += 1 sub_active, sub_total = count_deps(req, seen) active_count += sub_active total_count += sub_total return active_count, total_count active_deps, total_deps = count_deps(stem) branch = card_data.get("branch") if branch: branch_display = branch else: branch_display = "[dim]not started[/dim]" table.add_row( f"[bold]{stem}[/bold]", card_data["title"], branch_display, str(active_deps), str(total_deps), ) console.print() console.print(table) console.print() console.print( "[dim]Run: mise run docs-mikado to see full chain[/dim]" ) console.print( "[dim]Run: mise run docs-mikado --resume to resume work on a chain[/dim]" ) else: if card not in cards: console.print(f"[red]Card not found: {card}[/red]") console.print("[dim]Available cards with status: active:[/dim]") for stem, data in sorted(cards.items()): if is_active(data): console.print(f" {stem} — {data['title']}") raise typer.Exit(code=1) walk_chain(cards, card, console, show_all=all) if __name__ == "__main__": typer.run(main)