648 lines
21 KiB
Text
Executable file
648 lines
21 KiB
Text
Executable file
#!/usr/bin/env -S uv run --script
|
|
# /// script
|
|
# requires-python = ">=3.12"
|
|
# dependencies = ["httpx>=0.28.0", "pyyaml>=6.0", "rich>=13.0.0", "typer>=0.15.0"]
|
|
# ///
|
|
#MISE description="View active Mikado dependency chains for C2 changes"
|
|
#USAGE arg "[card]" help="Card stem to show chain for"
|
|
#USAGE flag "--all" help="Show all cards in full, including complete ones"
|
|
#USAGE flag "--resume" help="Resume a chain: detect branch, show state and next steps"
|
|
"""View active Mikado dependency chains for C2 changes.
|
|
|
|
Scans all markdown files in docs/ for YAML frontmatter with ``status: active``
|
|
and ``requires`` fields, then builds and displays the Mikado dependency graph.
|
|
|
|
Usage:
|
|
mise run docs-mikado # list all active chains
|
|
mise run docs-mikado deploy-authentik # show chain for a card
|
|
mise run docs-mikado deploy-authentik --all # include complete cards in full
|
|
mise run docs-mikado --resume # resume: detect branch, show state
|
|
mise run docs-mikado --resume deploy-authentik # resume specific chain
|
|
"""
|
|
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
from pathlib import Path
|
|
from typing import Annotated
|
|
|
|
import typer
|
|
import yaml
|
|
from rich.console import Console
|
|
from rich.markdown import Markdown
|
|
from rich.panel import Panel
|
|
from rich.table import Table
|
|
|
|
DOCS_DIR = Path(__file__).parent.parent / "docs"
|
|
REPO_DIR = Path(__file__).parent.parent
|
|
|
|
C2_COMMIT_RE = re.compile(r"^C2\(([^)]+)\):\s+(plan|impl|close|finalize)\s+(.+)$")
|
|
|
|
|
|
def extract_frontmatter(file_path: Path) -> dict | None:
|
|
"""Extract YAML frontmatter from a markdown file."""
|
|
content = file_path.read_text()
|
|
if not content.startswith("---"):
|
|
return None
|
|
|
|
end_idx = content.find("---", 3)
|
|
if end_idx == -1:
|
|
return None
|
|
|
|
frontmatter_text = content[3:end_idx].strip()
|
|
try:
|
|
return yaml.safe_load(frontmatter_text) or {}
|
|
except yaml.YAMLError:
|
|
return None
|
|
|
|
|
|
def build_graph() -> dict[str, dict]:
|
|
"""Build the dependency graph from all docs."""
|
|
cards: dict[str, dict] = {}
|
|
|
|
for md_file in sorted(DOCS_DIR.rglob("*.md")):
|
|
if "changelog.d" in md_file.parts:
|
|
continue
|
|
|
|
frontmatter = extract_frontmatter(md_file)
|
|
if frontmatter is None:
|
|
continue
|
|
|
|
stem = md_file.stem
|
|
cards[stem] = {
|
|
"path": md_file,
|
|
"title": frontmatter.get("title", stem),
|
|
"status": frontmatter.get("status"),
|
|
"branch": frontmatter.get("branch"),
|
|
"requires": frontmatter.get("requires", []) or [],
|
|
"required_by": [],
|
|
}
|
|
|
|
# Compute inverse relationships
|
|
for stem, card in cards.items():
|
|
for req in card["requires"]:
|
|
if req in cards:
|
|
cards[req]["required_by"].append(stem)
|
|
|
|
return cards
|
|
|
|
|
|
def detect_cycles(cards: dict[str, dict]) -> list[list[str]]:
|
|
"""Detect circular dependencies in the graph. Returns list of cycles."""
|
|
cycles: list[list[str]] = []
|
|
visited: set[str] = set()
|
|
on_stack: set[str] = set()
|
|
|
|
def dfs(stem: str, path: list[str]) -> None:
|
|
if stem in on_stack:
|
|
cycle_start = path.index(stem)
|
|
cycles.append(path[cycle_start:] + [stem])
|
|
return
|
|
if stem in visited or stem not in cards:
|
|
return
|
|
visited.add(stem)
|
|
on_stack.add(stem)
|
|
path.append(stem)
|
|
for req in cards[stem]["requires"]:
|
|
dfs(req, path)
|
|
path.pop()
|
|
on_stack.discard(stem)
|
|
|
|
for stem in cards:
|
|
dfs(stem, [])
|
|
|
|
return cycles
|
|
|
|
|
|
def is_active(card: dict) -> bool:
|
|
"""Check if a card has status: active."""
|
|
return card.get("status") == "active"
|
|
|
|
|
|
def find_root_goals(cards: dict[str, dict]) -> list[str]:
|
|
"""Find active cards that aren't required by another active card."""
|
|
roots = []
|
|
for stem, card in cards.items():
|
|
if not is_active(card):
|
|
continue
|
|
# A root goal is not required by any other active card
|
|
has_active_parent = any(
|
|
is_active(cards[rb]) for rb in card["required_by"] if rb in cards
|
|
)
|
|
if not has_active_parent:
|
|
roots.append(stem)
|
|
return sorted(roots)
|
|
|
|
|
|
def find_ready_leaves(cards: dict[str, dict], root: str) -> list[str]:
|
|
"""Find active leaf nodes ready for work (no unmet active requires)."""
|
|
leaves = []
|
|
visited: set[str] = set()
|
|
|
|
def walk(stem: str) -> None:
|
|
if stem in visited or stem not in cards:
|
|
return
|
|
visited.add(stem)
|
|
card = cards[stem]
|
|
if not is_active(card):
|
|
return
|
|
# Check if all requires are met (not active)
|
|
unmet = [
|
|
r for r in card["requires"] if r in cards and is_active(cards[r])
|
|
]
|
|
if not unmet:
|
|
leaves.append(stem)
|
|
for req in card["requires"]:
|
|
walk(req)
|
|
|
|
walk(root)
|
|
return sorted(leaves)
|
|
|
|
|
|
def get_current_branch() -> str | None:
|
|
"""Get the current git branch name."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "rev-parse", "--abbrev-ref", "HEAD"],
|
|
capture_output=True,
|
|
text=True,
|
|
cwd=REPO_DIR,
|
|
)
|
|
if result.returncode == 0:
|
|
branch = result.stdout.strip()
|
|
return branch if branch != "HEAD" else None
|
|
return None
|
|
except FileNotFoundError:
|
|
return None
|
|
|
|
|
|
def get_branch_commits(branch: str) -> list[dict]:
|
|
"""Get commits on a branch since it diverged from main."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "log", "--oneline", "--format=%H %s", f"main..{branch}"],
|
|
capture_output=True,
|
|
text=True,
|
|
cwd=REPO_DIR,
|
|
)
|
|
if result.returncode != 0:
|
|
return []
|
|
commits = []
|
|
for line in result.stdout.strip().split("\n"):
|
|
if not line:
|
|
continue
|
|
sha, _, subject = line.partition(" ")
|
|
match = C2_COMMIT_RE.match(subject)
|
|
commits.append(
|
|
{
|
|
"sha": sha,
|
|
"subject": subject,
|
|
"chain": match.group(1) if match else None,
|
|
"verb": match.group(2) if match else None,
|
|
"description": match.group(3) if match else None,
|
|
"conventional": match is not None,
|
|
}
|
|
)
|
|
# git log returns newest first; reverse for chronological
|
|
commits.reverse()
|
|
return commits
|
|
except FileNotFoundError:
|
|
return []
|
|
|
|
|
|
def classify_branch_position(commits: list[dict]) -> str:
|
|
"""Determine where in the invariant the branch currently is."""
|
|
if not commits:
|
|
return "empty"
|
|
|
|
verbs = [c["verb"] for c in commits if c["conventional"]]
|
|
if not verbs:
|
|
return "unconventional"
|
|
|
|
last_verb = verbs[-1]
|
|
has_plan = "plan" in verbs
|
|
has_impl = "impl" in verbs
|
|
has_close = "close" in verbs
|
|
|
|
if last_verb == "finalize":
|
|
return "finalized"
|
|
if not has_impl and not has_close:
|
|
return "planning"
|
|
if last_verb == "close":
|
|
return "between-cycles"
|
|
if last_verb == "impl":
|
|
return "mid-cycle"
|
|
if last_verb == "plan" and has_impl:
|
|
# plan after impl — invariant violation
|
|
return "invariant-violation"
|
|
return "unknown"
|
|
|
|
|
|
FORGE_API = "https://forge.ops.eblu.me/api/v1"
|
|
|
|
|
|
def find_pr_for_branch(branch: str) -> dict | None:
|
|
"""Find an open PR for the given branch via the Forgejo API."""
|
|
import httpx
|
|
|
|
try:
|
|
resp = httpx.get(
|
|
f"{FORGE_API}/repos/eblume/blumeops/pulls",
|
|
params={"state": "open", "limit": 50},
|
|
timeout=10,
|
|
)
|
|
if resp.status_code != 200:
|
|
return None
|
|
for pr in resp.json():
|
|
if pr.get("head", {}).get("ref") == branch:
|
|
return {
|
|
"number": pr["number"],
|
|
"title": pr["title"],
|
|
"url": pr.get("html_url", ""),
|
|
}
|
|
except (httpx.HTTPError, KeyError):
|
|
pass
|
|
return None
|
|
|
|
|
|
def get_stash_list() -> list[str]:
|
|
"""Get the list of git stash entries."""
|
|
try:
|
|
result = subprocess.run(
|
|
["git", "stash", "list"],
|
|
capture_output=True,
|
|
text=True,
|
|
cwd=REPO_DIR,
|
|
)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
return result.stdout.strip().split("\n")
|
|
except FileNotFoundError:
|
|
pass
|
|
return []
|
|
|
|
|
|
def show_resume(
|
|
cards: dict[str, dict],
|
|
console: Console,
|
|
chain_name: str | None,
|
|
) -> None:
|
|
"""Show resume information for continuing C2 work."""
|
|
current_branch = get_current_branch()
|
|
roots = find_root_goals(cards)
|
|
|
|
# Find chain-to-branch mapping from goal cards
|
|
chain_branches: dict[str, str | None] = {}
|
|
for stem in roots:
|
|
card = cards[stem]
|
|
chain_branches[stem] = card.get("branch")
|
|
|
|
if chain_name:
|
|
# Explicit chain requested — validate
|
|
if chain_name not in cards:
|
|
console.print(f"[red]Chain not found: {chain_name}[/red]")
|
|
raise typer.Exit(code=1)
|
|
if not is_active(cards[chain_name]):
|
|
console.print(
|
|
f"[yellow]{chain_name} is not active (no status: active)[/yellow]"
|
|
)
|
|
raise typer.Exit(code=1)
|
|
|
|
expected_branch = cards[chain_name].get("branch")
|
|
if expected_branch and current_branch != expected_branch:
|
|
console.print(
|
|
f"[red]Branch mismatch:[/red] chain {chain_name} expects "
|
|
f"branch [bold]{expected_branch}[/bold] but you are on "
|
|
f"[bold]{current_branch}[/bold]"
|
|
)
|
|
console.print(
|
|
f"\n[dim]Run: git checkout {expected_branch}[/dim]"
|
|
)
|
|
raise typer.Exit(code=1)
|
|
|
|
_show_chain_resume(cards, console, chain_name, current_branch)
|
|
return
|
|
|
|
# No explicit chain — try to detect from branch
|
|
if current_branch and current_branch.startswith("mikado/"):
|
|
chain_stem = current_branch.removeprefix("mikado/")
|
|
# Try to match to a goal card
|
|
matched = None
|
|
for stem in roots:
|
|
if stem == chain_stem:
|
|
matched = stem
|
|
break
|
|
if cards[stem].get("branch") == current_branch:
|
|
matched = stem
|
|
break
|
|
|
|
if matched:
|
|
_show_chain_resume(cards, console, matched, current_branch)
|
|
return
|
|
else:
|
|
console.print(
|
|
f"[yellow]On branch {current_branch} but no matching "
|
|
f"active chain found for stem '{chain_stem}'[/yellow]"
|
|
)
|
|
|
|
# On main or unrecognized branch — list options
|
|
console.print()
|
|
if not roots:
|
|
console.print("[dim]No active Mikado chains to resume.[/dim]")
|
|
raise typer.Exit()
|
|
|
|
console.print("[bold]Active Mikado chains:[/bold]")
|
|
console.print()
|
|
|
|
table = Table(show_header=True, header_style="bold")
|
|
table.add_column("Chain")
|
|
table.add_column("Title")
|
|
table.add_column("Branch")
|
|
table.add_column("Status")
|
|
table.add_column("Ready Leaves")
|
|
|
|
for stem in roots:
|
|
card = cards[stem]
|
|
branch = card.get("branch")
|
|
if branch:
|
|
branch_display = f"[green]{branch}[/green]"
|
|
status = "in progress"
|
|
else:
|
|
branch_display = "[dim]not started[/dim]"
|
|
status = "planned"
|
|
|
|
leaves = find_ready_leaves(cards, stem)
|
|
leaves_display = ", ".join(leaves) if leaves else "[dim]none[/dim]"
|
|
|
|
table.add_row(stem, card["title"], branch_display, status, leaves_display)
|
|
|
|
console.print(table)
|
|
console.print()
|
|
console.print(
|
|
"[dim]Run: mise run docs-mikado --resume <chain> to resume a specific chain[/dim]"
|
|
)
|
|
|
|
|
|
def _show_chain_resume(
|
|
cards: dict[str, dict],
|
|
console: Console,
|
|
chain: str,
|
|
current_branch: str | None,
|
|
) -> None:
|
|
"""Show detailed resume info for a specific chain."""
|
|
card = cards[chain]
|
|
branch = card.get("branch") or current_branch
|
|
|
|
console.print()
|
|
|
|
# Look up PR for this branch
|
|
pr_info = find_pr_for_branch(branch) if branch else None
|
|
panel_lines = [
|
|
f"[bold]{chain}[/bold] — {card['title']}",
|
|
f"Branch: [green]{branch or 'not set'}[/green]",
|
|
]
|
|
if pr_info:
|
|
panel_lines.append(
|
|
f"PR: [cyan]#{pr_info['number']}[/cyan] — {pr_info['title']}"
|
|
)
|
|
if pr_info["url"]:
|
|
panel_lines.append(f" {pr_info['url']}")
|
|
|
|
console.print(Panel("\n".join(panel_lines), title="Resuming Mikado Chain"))
|
|
|
|
if pr_info:
|
|
console.print(
|
|
f"[dim]Check PR comments: mise run pr-comments {pr_info['number']}[/dim]"
|
|
)
|
|
|
|
# Show branch position if we can read commits
|
|
if branch and current_branch == branch:
|
|
commits = get_branch_commits(branch)
|
|
position = classify_branch_position(commits)
|
|
|
|
position_labels = {
|
|
"empty": "[dim]No commits on branch yet[/dim]",
|
|
"planning": "[cyan]Planning phase[/cyan] — still adding cards",
|
|
"mid-cycle": "[yellow]Mid-cycle[/yellow] — impl in progress, leaf not yet closed",
|
|
"between-cycles": "[green]Between cycles[/green] — ready for next leaf",
|
|
"finalized": "[green]Finalized[/green] — chain complete, awaiting merge",
|
|
"unconventional": "[yellow]Unconventional commits[/yellow] — commits don't follow C2() convention",
|
|
"invariant-violation": "[red]Invariant violation[/red] — plan commit found after impl",
|
|
"unknown": "[dim]Unknown position[/dim]",
|
|
}
|
|
console.print(f"\nBranch position: {position_labels.get(position, position)}")
|
|
|
|
if commits:
|
|
console.print("\n[bold]Recent commits:[/bold]")
|
|
# Show last 10 commits
|
|
for c in commits[-10:]:
|
|
if c["conventional"]:
|
|
verb_colors = {
|
|
"plan": "cyan",
|
|
"impl": "yellow",
|
|
"close": "green",
|
|
"finalize": "magenta",
|
|
}
|
|
color = verb_colors.get(c["verb"], "white")
|
|
console.print(
|
|
f" {c['sha'][:8]} [{color}]{c['verb']}[/{color}] {c['description']}"
|
|
)
|
|
else:
|
|
console.print(
|
|
f" {c['sha'][:8]} [dim]{c['subject']}[/dim]"
|
|
)
|
|
|
|
# Show ready leaves
|
|
leaves = find_ready_leaves(cards, chain)
|
|
if leaves:
|
|
console.print("\n[bold]Ready leaf nodes (no unmet dependencies):[/bold]")
|
|
for leaf in leaves:
|
|
leaf_card = cards[leaf]
|
|
console.print(f" [green]→[/green] {leaf} — {leaf_card['title']}")
|
|
else:
|
|
console.print("\n[dim]No ready leaf nodes — all leaves have unmet dependencies[/dim]")
|
|
|
|
# Check for stashed work
|
|
stashes = get_stash_list()
|
|
if stashes:
|
|
console.print(
|
|
f"\n[yellow]Note: {len(stashes)} stash entr{'y' if len(stashes) == 1 else 'ies'} found:[/yellow]"
|
|
)
|
|
for entry in stashes[:5]:
|
|
console.print(f" [dim]{entry}[/dim]")
|
|
if len(stashes) > 5:
|
|
console.print(f" [dim]... and {len(stashes) - 5} more[/dim]")
|
|
console.print(
|
|
"[dim]Review with: git stash list / git stash show[/dim]"
|
|
)
|
|
|
|
console.print()
|
|
|
|
|
|
def walk_chain(
|
|
cards: dict[str, dict],
|
|
stem: str,
|
|
console: Console,
|
|
show_all: bool,
|
|
visited: set[str] | None = None,
|
|
depth: int = 0,
|
|
) -> None:
|
|
"""Walk the dependency tree depth-first from a card."""
|
|
if visited is None:
|
|
visited = set()
|
|
|
|
if stem in visited:
|
|
console.print(f"{' ' * depth}[dim](circular: {stem})[/dim]")
|
|
return
|
|
visited.add(stem)
|
|
|
|
card = cards.get(stem)
|
|
if card is None:
|
|
console.print(f"{' ' * depth}[red](missing: {stem})[/red]")
|
|
return
|
|
|
|
active = is_active(card)
|
|
|
|
if active or show_all:
|
|
# Print full card content
|
|
console.print()
|
|
separator = "=" * 72
|
|
console.print(f"[bold cyan]{separator}[/bold cyan]")
|
|
status_tag = "[active]" if active else "[complete]"
|
|
console.print(
|
|
f"[bold]{status_tag} {stem}[/bold] — {card['title']}"
|
|
)
|
|
console.print(f"[dim]{card['path'].relative_to(DOCS_DIR)}[/dim]")
|
|
if card.get("branch"):
|
|
console.print(f"[dim]branch: {card['branch']}[/dim]")
|
|
if card["requires"]:
|
|
console.print(f"[dim]requires: {', '.join(card['requires'])}[/dim]")
|
|
if card["required_by"]:
|
|
console.print(
|
|
f"[dim]required-by: {', '.join(card['required_by'])}[/dim]"
|
|
)
|
|
console.print(f"[bold cyan]{separator}[/bold cyan]")
|
|
console.print()
|
|
content = card["path"].read_text()
|
|
console.print(content)
|
|
else:
|
|
# One-line summary for complete cards
|
|
console.print(
|
|
f"{' ' * depth}[green][complete][/green] {stem} — {card['title']}"
|
|
)
|
|
|
|
# Recurse into dependencies
|
|
for req in card["requires"]:
|
|
walk_chain(cards, req, console, show_all, visited, depth + 1)
|
|
|
|
|
|
def main(
|
|
card: Annotated[
|
|
str | None, typer.Argument(help="Card stem to show chain for")
|
|
] = None,
|
|
all: Annotated[
|
|
bool,
|
|
typer.Option("--all", help="Show all cards in full, including complete ones"),
|
|
] = False,
|
|
resume: Annotated[
|
|
bool,
|
|
typer.Option("--resume", help="Resume a chain: detect branch, show state"),
|
|
] = False,
|
|
) -> None:
|
|
console = Console()
|
|
cards = build_graph()
|
|
|
|
# Check for circular dependencies
|
|
cycles = detect_cycles(cards)
|
|
if cycles:
|
|
console.print("[bold red]Circular dependencies detected![/bold red]")
|
|
for cycle in cycles:
|
|
console.print(f" [red]{' → '.join(cycle)}[/red]")
|
|
console.print()
|
|
|
|
if resume:
|
|
show_resume(cards, console, chain_name=card)
|
|
return
|
|
|
|
if card is None:
|
|
# List all active Mikado chains
|
|
roots = find_root_goals(cards)
|
|
|
|
if not roots:
|
|
console.print("[dim]No active Mikado chains found.[/dim]")
|
|
console.print(
|
|
"[dim]Cards need status: active in frontmatter to appear here.[/dim]"
|
|
)
|
|
raise typer.Exit()
|
|
|
|
table = Table(
|
|
title="Active Mikado Chains", show_header=True, header_style="bold"
|
|
)
|
|
table.add_column("Goal Card")
|
|
table.add_column("Title")
|
|
table.add_column("Branch")
|
|
table.add_column("Active Deps", justify="right")
|
|
table.add_column("Total Deps", justify="right")
|
|
|
|
for stem in roots:
|
|
card_data = cards[stem]
|
|
|
|
# Count dependencies
|
|
def count_deps(s: str, seen: set[str] | None = None) -> tuple[int, int]:
|
|
if seen is None:
|
|
seen = set()
|
|
if s in seen:
|
|
return 0, 0
|
|
seen.add(s)
|
|
active_count = 0
|
|
total_count = 0
|
|
c = cards.get(s)
|
|
if c is None:
|
|
return 0, 0
|
|
for req in c["requires"]:
|
|
total_count += 1
|
|
req_card = cards.get(req)
|
|
if req_card and is_active(req_card):
|
|
active_count += 1
|
|
sub_active, sub_total = count_deps(req, seen)
|
|
active_count += sub_active
|
|
total_count += sub_total
|
|
return active_count, total_count
|
|
|
|
active_deps, total_deps = count_deps(stem)
|
|
branch = card_data.get("branch")
|
|
if branch:
|
|
branch_display = branch
|
|
else:
|
|
branch_display = "[dim]not started[/dim]"
|
|
|
|
table.add_row(
|
|
f"[bold]{stem}[/bold]",
|
|
card_data["title"],
|
|
branch_display,
|
|
str(active_deps),
|
|
str(total_deps),
|
|
)
|
|
|
|
console.print()
|
|
console.print(table)
|
|
console.print()
|
|
console.print(
|
|
"[dim]Run: mise run docs-mikado <card> to see full chain[/dim]"
|
|
)
|
|
console.print(
|
|
"[dim]Run: mise run docs-mikado --resume to resume work on a chain[/dim]"
|
|
)
|
|
else:
|
|
if card not in cards:
|
|
console.print(f"[red]Card not found: {card}[/red]")
|
|
console.print("[dim]Available cards with status: active:[/dim]")
|
|
for stem, data in sorted(cards.items()):
|
|
if is_active(data):
|
|
console.print(f" {stem} — {data['title']}")
|
|
raise typer.Exit(code=1)
|
|
|
|
walk_chain(cards, card, console, show_all=all)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
typer.run(main)
|