hephaestus/crates/hephd/src/main.rs
Erich Blume f4db186234
Some checks failed
Build / validate (pull_request) Failing after 9s
hephd: OIDC client auth — device-code flow + token attach (auth 10b)
Close the auth loop: clients obtain a bearer token and present it to the
hub (tech-spec §13).

- oauth module: DeviceFlow (RFC 8628 — discover, start, poll handling
  authorization_pending/slow_down, refresh) + StoredToken + TokenStore
  (OS keyring via `keyring`, in-memory for tests) + current_bearer (loads
  and refreshes-on-expiry).
- heph auth login/logout: runs the device flow, prints the verification
  URL + user code, caches the token in the keyring.
- sync_once gains a bearer arg; the daemon (Daemon::spawn_sync_loop +
  sync.now) obtains it via current_bearer; RemoteStore attaches it to /rpc.
  --oidc-issuer/--oidc-client-id configure the spoke/client.
- Fix a latent panic: reqwest::blocking spins its own runtime and panics
  inside the daemon's spawn_blocking pool. All blocking auth/proxy HTTP
  (OidcVerifier JWKS, DeviceFlow, RemoteStore) now uses runtime-free `ureq`;
  async reqwest remains only for sync_once. (Caught by the new e2e test.)
- Tests (offline): device flow + refresh + token store vs a mock OAuth
  server; a full spoke->authenticated-hub loop (valid token accepted,
  missing token rejected) signed by a runtime-generated RSA key.

112 tests green; clippy -D warnings + fmt + prek clean. Slice 10 (auth)
complete; next is heph.nvim.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
2026-06-01 16:27:36 -07:00

203 lines
7.8 KiB
Rust

//! `hephd` binary — starts the daemon in `local`, `server`, or `client` mode.
//!
//! `local`/`server` own the local SQLite file (exclusive lock); `client` keeps
//! no replica and proxies to a `--server-url`. All three serve surfaces over a
//! unix socket. **server** additionally exposes the hub HTTP endpoint for spokes
//! to sync against (requiring OIDC bearer tokens when `--oidc-issuer`/`-audience`
//! are set); a **local** instance given `--hub-url` becomes a syncing spoke that
//! background-exchanges its op-log with that hub (tech-spec §3.1, §6.1, §12, §13).
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
use anyhow::{Context, Result};
use clap::{Parser, ValueEnum};
use tokio::net::{TcpListener, UnixListener};
use heph_core::LocalStore;
use hephd::{
default_db_path, default_socket_path, sync, Daemon, KeyringTokenStore, LockGuard, RemoteStore,
SystemClock, TokenStore,
};
/// How often a spoke background-syncs with its hub.
const SYNC_INTERVAL: Duration = Duration::from_secs(30);
/// Default hub HTTP bind address in server mode.
const DEFAULT_HTTP_ADDR: &str = "127.0.0.1:8787";
#[derive(Copy, Clone, Debug, PartialEq, Eq, ValueEnum)]
enum Mode {
/// Own replica; no inbound network endpoint (syncing spoke if `--hub-url`).
Local,
/// Also a sync hub: exposes the authenticated network endpoint over HTTP.
Server,
/// No local replica; proxy every call to a `--server-url` (online-only).
Client,
}
/// The Hephaestus per-device daemon.
#[derive(Parser, Debug)]
#[command(name = "hephd", version, about)]
struct Cli {
/// Runtime mode.
#[arg(long, value_enum, default_value_t = Mode::Local)]
mode: Mode,
/// Path to the SQLite store file.
#[arg(long)]
db: Option<PathBuf>,
/// Path to the unix socket to listen on.
#[arg(long)]
socket: Option<PathBuf>,
/// Hub to background-sync this replica's op-log with (makes it a spoke).
#[arg(long)]
hub_url: Option<String>,
/// Address for the hub HTTP endpoint (server mode only).
#[arg(long)]
http_addr: Option<String>,
/// Server to proxy to (client mode only; required there).
#[arg(long)]
server_url: Option<String>,
/// OIDC issuer to verify hub bearer tokens against (server mode). When set
/// with --oidc-audience, the hub endpoints require a valid token.
#[arg(long)]
oidc_issuer: Option<String>,
/// OIDC audience (client id) hub tokens must carry (server mode).
#[arg(long)]
oidc_audience: Option<String>,
/// OIDC client id this device authenticates as, for spoke/client sync. With
/// --oidc-issuer, the device attaches a cached bearer token to hub requests.
#[arg(long)]
oidc_client_id: Option<String>,
}
/// Build the spoke/client token source: a keyring store keyed by `account` (the
/// hub/server url) plus the issuer + client id. `None` unless both are set.
fn spoke_auth(
account: &str,
issuer: Option<&String>,
client_id: Option<&String>,
) -> Option<(Arc<dyn TokenStore>, String, String)> {
match (issuer, client_id) {
(Some(issuer), Some(client_id)) => Some((
Arc::new(KeyringTokenStore::new(account)) as Arc<dyn TokenStore>,
issuer.clone(),
client_id.clone(),
)),
_ => None,
}
}
#[tokio::main]
async fn main() -> Result<()> {
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_from_default_env()
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let cli = Cli::parse();
let socket = cli.socket.clone().unwrap_or_else(default_socket_path);
if let Some(parent) = socket.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("creating socket dir {}", parent.display()))?;
}
// Build the daemon for the chosen mode. `local`/`server` own the file (and
// hold its lock for the process's life); `client` keeps no replica.
let (_lock, daemon) = match cli.mode {
Mode::Client => {
let server_url = cli
.server_url
.clone()
.context("client mode requires --server-url")?;
tracing::info!(%server_url, "client mode: proxying to server (no local replica)");
let store = match spoke_auth(
&server_url,
cli.oidc_issuer.as_ref(),
cli.oidc_client_id.as_ref(),
) {
Some((tokens, issuer, client_id)) => {
RemoteStore::with_auth(&server_url, tokens, issuer, client_id)
}
None => RemoteStore::new(&server_url),
};
(None, Daemon::new(store))
}
Mode::Local | Mode::Server => {
let db = cli.db.clone().unwrap_or_else(default_db_path);
if let Some(parent) = db.parent() {
std::fs::create_dir_all(parent)
.with_context(|| format!("creating store dir {}", parent.display()))?;
}
// Take the exclusive lock before opening the store (tech-spec §3.1).
let lock = LockGuard::acquire(&db)?;
let store = LocalStore::open(&db, Box::new(SystemClock))?;
let spoke = cli.hub_url.as_deref().and_then(|hub| {
spoke_auth(hub, cli.oidc_issuer.as_ref(), cli.oidc_client_id.as_ref())
});
let daemon = Daemon::new(store)
.with_hub(cli.hub_url.clone())
.with_spoke_auth(spoke);
// server mode: expose the hub HTTP endpoint over the same store.
if cli.mode == Mode::Server {
let addr = cli
.http_addr
.clone()
.unwrap_or_else(|| DEFAULT_HTTP_ADDR.to_string());
let verifier: Option<Arc<dyn hephd::TokenVerifier>> =
match (cli.oidc_issuer.clone(), cli.oidc_audience.clone()) {
(Some(issuer), Some(audience)) => {
tracing::info!(%issuer, "hub requires OIDC bearer tokens");
Some(Arc::new(hephd::OidcVerifier::new(issuer, audience)))
}
(None, None) => {
tracing::warn!(
"hub running UNAUTHENTICATED (no --oidc-issuer/--oidc-audience)"
);
None
}
_ => {
anyhow::bail!("--oidc-issuer and --oidc-audience must be set together")
}
};
let app = sync::router(daemon.store(), verifier);
let http_listener = TcpListener::bind(&addr)
.await
.with_context(|| format!("binding hub HTTP endpoint {addr}"))?;
tracing::info!(%addr, "hub HTTP endpoint listening");
tokio::spawn(async move {
if let Err(e) = axum::serve(http_listener, app).await {
tracing::error!("hub HTTP endpoint stopped: {e}");
}
});
}
// spoke: background-sync the op-log with the configured hub.
daemon.spawn_sync_loop(SYNC_INTERVAL);
(Some(lock), daemon)
}
};
// Replace any stale socket from a previous run, then bind.
if socket.exists() {
std::fs::remove_file(&socket)
.with_context(|| format!("removing stale socket {}", socket.display()))?;
}
let listener = UnixListener::bind(&socket)
.with_context(|| format!("binding socket {}", socket.display()))?;
tracing::info!(socket = %socket.display(), mode = ?cli.mode, "hephd listening");
daemon.serve(listener).await
}