generated from eblume/project-template
hephd: OIDC client auth — device-code flow + token attach (auth 10b)
Some checks failed
Build / validate (pull_request) Failing after 9s
Some checks failed
Build / validate (pull_request) Failing after 9s
Close the auth loop: clients obtain a bearer token and present it to the hub (tech-spec §13). - oauth module: DeviceFlow (RFC 8628 — discover, start, poll handling authorization_pending/slow_down, refresh) + StoredToken + TokenStore (OS keyring via `keyring`, in-memory for tests) + current_bearer (loads and refreshes-on-expiry). - heph auth login/logout: runs the device flow, prints the verification URL + user code, caches the token in the keyring. - sync_once gains a bearer arg; the daemon (Daemon::spawn_sync_loop + sync.now) obtains it via current_bearer; RemoteStore attaches it to /rpc. --oidc-issuer/--oidc-client-id configure the spoke/client. - Fix a latent panic: reqwest::blocking spins its own runtime and panics inside the daemon's spawn_blocking pool. All blocking auth/proxy HTTP (OidcVerifier JWKS, DeviceFlow, RemoteStore) now uses runtime-free `ureq`; async reqwest remains only for sync_once. (Caught by the new e2e test.) - Tests (offline): device flow + refresh + token store vs a mock OAuth server; a full spoke->authenticated-hub loop (valid token accepted, missing token rejected) signed by a runtime-generated RSA key. 112 tests green; clippy -D warnings + fmt + prek clean. Slice 10 (auth) complete; next is heph.nvim. Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
497c62a988
commit
f4db186234
17 changed files with 2009 additions and 97 deletions
1131
Cargo.lock
generated
1131
Cargo.lock
generated
File diff suppressed because it is too large
Load diff
|
|
@ -35,10 +35,16 @@ clap = { version = "4", features = ["derive"] }
|
|||
fs4 = "0.12"
|
||||
axum = "0.8"
|
||||
jsonwebtoken = { version = "10", features = ["rust_crypto"] }
|
||||
keyring = { version = "3", features = [
|
||||
"apple-native",
|
||||
"sync-secret-service",
|
||||
"crypto-rust",
|
||||
"vendored",
|
||||
] }
|
||||
ureq = { version = "3", features = ["json"] }
|
||||
reqwest = { version = "0.13", default-features = false, features = [
|
||||
"json",
|
||||
"query",
|
||||
"blocking",
|
||||
] }
|
||||
|
||||
[profile.release]
|
||||
|
|
|
|||
|
|
@ -10,7 +10,7 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision
|
|||
|
||||
## Status
|
||||
|
||||
**Phase 1 (v1 prototype) — in progress** on branch `feature/v1-prototype`. **All three runtime modes work, replicas sync through a hub over HTTP, and the hub authenticates op exchange with OIDC bearer tokens.** The offline-first everyday config (`local` + `hub_url`) converges end-to-end with a `yrs` text-CRDT merging bodies; the hub verifies tokens (JWKS/RS256) and enforces single-tenant ownership. Remaining: the client-side device-code login + token cache, and the Neovim plugin. Built test-first (108 tests at last update). The canonical tracker is **tech-spec §14**.
|
||||
**Phase 1 (v1 prototype) — nearly feature-complete** on branch `feature/v1-prototype`. **All three runtime modes work, replicas sync through a hub over HTTP, and op exchange is authenticated end-to-end with OIDC** (Authentik): the hub verifies bearer tokens (JWKS/RS256) and enforces single-tenant ownership, and `heph auth login` runs the device-code flow, caching tokens in the OS keyring. The offline-first everyday config (`local` + `hub_url`) converges with a `yrs` text-CRDT merging bodies. Remaining: the Neovim plugin (the primary surface). Built test-first (112 tests at last update). The canonical tracker is **tech-spec §14**.
|
||||
|
||||
| Area | State |
|
||||
|---|---|
|
||||
|
|
@ -23,7 +23,8 @@ See **[docs/explanation/design.md](docs/explanation/design.md)** for the vision
|
|||
| `server` (hub) mode + spoke push/pull sync over HTTP (axum) | ✅ done |
|
||||
| `client` mode + `RemoteStore` (online-only, no replica) | ✅ done |
|
||||
| OIDC hub auth — bearer-token verification + owner gate | ✅ done |
|
||||
| OIDC client — device-code login, keyring token cache | ⏳ next |
|
||||
| OIDC client — device-code login, keyring token cache | ✅ done |
|
||||
| `heph.nvim` (primary surface) | ⏳ next |
|
||||
| `heph.nvim` (primary surface) | ⏳ |
|
||||
|
||||
## Architecture
|
||||
|
|
@ -35,7 +36,7 @@ A Cargo workspace, layered so the same core runs from a laptop to a hub:
|
|||
- **`crates/heph`** — the CLI: a thin client of the daemon (no direct DB access).
|
||||
- **`heph.nvim/`** *(planned)* — the Neovim plugin, the primary editing/agenda surface.
|
||||
|
||||
**Storage:** SQLite is the source of truth; a node's body is markdown; `export` materializes the whole store as a directory of `.md` files. **Sync:** each device holds a full replica + an append-only op-log; devices reconcile through a hub with automatic merge (text-CRDT bodies, last-writer-wins scalars, OR-set links) and a conflict queue for the ambiguous remainder. **Auth:** the hub verifies an OIDC bearer token (Authentik) on every op exchange — RS256/JWKS verification + a single-tenant owner gate; the client-side device-code login is in progress. Local-only instances need no auth.
|
||||
**Storage:** SQLite is the source of truth; a node's body is markdown; `export` materializes the whole store as a directory of `.md` files. **Sync:** each device holds a full replica + an append-only op-log; devices reconcile through a hub with automatic merge (text-CRDT bodies, last-writer-wins scalars, OR-set links) and a conflict queue for the ambiguous remainder. **Auth:** the hub verifies an OIDC bearer token (Authentik) on every op exchange — RS256/JWKS verification + a single-tenant owner gate — and clients obtain tokens via the OAuth 2.0 device-code flow (`heph auth login`), cached in the OS keyring. Local-only instances need no auth.
|
||||
|
||||
## Build & run
|
||||
|
||||
|
|
|
|||
|
|
@ -9,7 +9,7 @@ use clap::{Parser, Subcommand};
|
|||
use serde_json::{json, Value};
|
||||
|
||||
use heph_core::{Node, RankedTask, Task};
|
||||
use hephd::{default_socket_path, Client};
|
||||
use hephd::{default_socket_path, Client, DeviceFlow, KeyringTokenStore, TokenStore};
|
||||
|
||||
#[derive(Parser, Debug)]
|
||||
#[command(name = "heph", version, about)]
|
||||
|
|
@ -81,10 +81,77 @@ enum Command {
|
|||
/// Destination directory (created if needed).
|
||||
dir: PathBuf,
|
||||
},
|
||||
/// Authenticate this device with a sync hub (OAuth 2.0 device-code flow).
|
||||
Auth {
|
||||
#[command(subcommand)]
|
||||
action: AuthAction,
|
||||
},
|
||||
}
|
||||
|
||||
#[derive(Subcommand, Debug)]
|
||||
enum AuthAction {
|
||||
/// Log in via the device-code flow; caches the bearer token for hub sync.
|
||||
Login {
|
||||
/// Hub/server URL this token is for (keys the credential store entry).
|
||||
#[arg(long)]
|
||||
hub_url: String,
|
||||
/// OIDC issuer, e.g. https://authentik.ops.eblu.me/application/o/heph/.
|
||||
#[arg(long)]
|
||||
issuer: String,
|
||||
/// OIDC client id this device authenticates as.
|
||||
#[arg(long)]
|
||||
client_id: String,
|
||||
/// Scopes to request (`offline_access` yields a refresh token).
|
||||
#[arg(long, default_value = "openid offline_access")]
|
||||
scope: String,
|
||||
},
|
||||
/// Forget the cached token for a hub.
|
||||
Logout {
|
||||
/// Hub/server URL whose cached token to remove.
|
||||
#[arg(long)]
|
||||
hub_url: String,
|
||||
},
|
||||
}
|
||||
|
||||
/// Run the device-code flow (or clear a token) — no daemon needed.
|
||||
fn run_auth(action: AuthAction) -> Result<()> {
|
||||
match action {
|
||||
AuthAction::Login {
|
||||
hub_url,
|
||||
issuer,
|
||||
client_id,
|
||||
scope,
|
||||
} => {
|
||||
let flow = DeviceFlow::discover(&issuer, &client_id)?;
|
||||
let auth = flow.start(&scope)?;
|
||||
let uri = auth
|
||||
.verification_uri_complete
|
||||
.as_deref()
|
||||
.unwrap_or(&auth.verification_uri);
|
||||
println!(
|
||||
"To authorize hephaestus, visit:\n {uri}\nand enter code: {}\n\nWaiting…",
|
||||
auth.user_code
|
||||
);
|
||||
let token = flow.poll(&auth, std::thread::sleep)?;
|
||||
KeyringTokenStore::new(hub_url.as_str()).save(&token)?;
|
||||
println!("Logged in. Token cached for {hub_url}.");
|
||||
}
|
||||
AuthAction::Logout { hub_url } => {
|
||||
KeyringTokenStore::new(hub_url.as_str()).clear()?;
|
||||
println!("Logged out of {hub_url}.");
|
||||
}
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn main() -> Result<()> {
|
||||
let cli = Cli::parse();
|
||||
|
||||
// `auth` runs locally (device-code flow + keyring); it needs no daemon.
|
||||
if let Command::Auth { action } = cli.command {
|
||||
return run_auth(action);
|
||||
}
|
||||
|
||||
let socket = cli.socket.unwrap_or_else(default_socket_path);
|
||||
let mut client = Client::connect(&socket)?;
|
||||
|
||||
|
|
@ -157,6 +224,7 @@ fn main() -> Result<()> {
|
|||
let count = result.get("count").and_then(Value::as_u64).unwrap_or(0);
|
||||
println!("Exported {count} nodes to {}", dir.display());
|
||||
}
|
||||
Command::Auth { .. } => unreachable!("auth is handled before connecting"),
|
||||
}
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -29,7 +29,9 @@ clap.workspace = true
|
|||
fs4.workspace = true
|
||||
axum.workspace = true
|
||||
jsonwebtoken.workspace = true
|
||||
keyring.workspace = true
|
||||
reqwest.workspace = true
|
||||
ureq.workspace = true
|
||||
|
||||
[dev-dependencies]
|
||||
tempfile = "3"
|
||||
|
|
|
|||
|
|
@ -62,7 +62,7 @@ struct Discovery {
|
|||
pub struct OidcVerifier {
|
||||
issuer: String,
|
||||
audience: String,
|
||||
http: reqwest::blocking::Client,
|
||||
http: ureq::Agent,
|
||||
jwks: RwLock<Option<JwkSet>>,
|
||||
}
|
||||
|
||||
|
|
@ -74,37 +74,43 @@ impl OidcVerifier {
|
|||
OidcVerifier {
|
||||
issuer: issuer.into(),
|
||||
audience: audience.into(),
|
||||
http: reqwest::blocking::Client::new(),
|
||||
http: crate::blocking_agent(),
|
||||
jwks: RwLock::new(None),
|
||||
}
|
||||
}
|
||||
|
||||
/// GET `url` and decode a JSON body, erroring on a non-success status.
|
||||
fn get_json<T: serde::de::DeserializeOwned>(&self, url: &str) -> Result<T, AuthError> {
|
||||
let mut resp = self
|
||||
.http
|
||||
.get(url)
|
||||
.call()
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
if !resp.status().is_success() {
|
||||
return Err(AuthError::Provider(format!(
|
||||
"{url} returned {}",
|
||||
resp.status()
|
||||
)));
|
||||
}
|
||||
resp.body_mut()
|
||||
.read_json()
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))
|
||||
}
|
||||
|
||||
/// Resolve the JWKS URI from the provider's discovery document.
|
||||
fn jwks_uri(&self) -> Result<String, AuthError> {
|
||||
let url = format!(
|
||||
"{}/.well-known/openid-configuration",
|
||||
self.issuer.trim_end_matches('/')
|
||||
);
|
||||
let disc: Discovery = self
|
||||
.http
|
||||
.get(url)
|
||||
.send()
|
||||
.and_then(reqwest::blocking::Response::error_for_status)
|
||||
.and_then(reqwest::blocking::Response::json)
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
let disc: Discovery = self.get_json(&url)?;
|
||||
Ok(disc.jwks_uri)
|
||||
}
|
||||
|
||||
/// Fetch (and cache) the provider's JWKS.
|
||||
fn refresh_jwks(&self) -> Result<(), AuthError> {
|
||||
let uri = self.jwks_uri()?;
|
||||
let set: JwkSet = self
|
||||
.http
|
||||
.get(uri)
|
||||
.send()
|
||||
.and_then(reqwest::blocking::Response::error_for_status)
|
||||
.and_then(reqwest::blocking::Response::json)
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
let set: JwkSet = self.get_json(&uri)?;
|
||||
*self.jwks.write().expect("jwks lock poisoned") = Some(set);
|
||||
Ok(())
|
||||
}
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ pub mod auth;
|
|||
pub mod client;
|
||||
pub mod clock;
|
||||
pub mod lock;
|
||||
pub mod oauth;
|
||||
pub mod remote;
|
||||
pub mod rpc;
|
||||
pub mod server;
|
||||
|
|
@ -22,10 +23,24 @@ pub use auth::{AuthError, Claims, OidcVerifier, TokenVerifier};
|
|||
pub use client::Client;
|
||||
pub use clock::SystemClock;
|
||||
pub use lock::LockGuard;
|
||||
pub use oauth::{current_bearer, DeviceFlow, KeyringTokenStore, StoredToken, TokenStore};
|
||||
pub use remote::RemoteStore;
|
||||
pub use server::Daemon;
|
||||
pub use sync::{sync_once, SyncReport};
|
||||
|
||||
/// A blocking HTTP agent for the auth paths (JWKS fetch, device flow, the
|
||||
/// `client`-mode `/rpc` proxy). It spins **no** async runtime, so unlike
|
||||
/// `reqwest::blocking` it is safe inside `spawn_blocking` and plain sync code;
|
||||
/// 4xx/5xx are *not* turned into errors so callers can read error bodies
|
||||
/// (e.g. the device flow's `authorization_pending`).
|
||||
pub(crate) fn blocking_agent() -> ureq::Agent {
|
||||
ureq::Agent::new_with_config(
|
||||
ureq::Agent::config_builder()
|
||||
.http_status_as_error(false)
|
||||
.build(),
|
||||
)
|
||||
}
|
||||
|
||||
/// Default unix socket path: `$XDG_RUNTIME_DIR/heph/hephd.sock`, falling back to
|
||||
/// the system temp dir when `XDG_RUNTIME_DIR` is unset (tech-spec §3).
|
||||
pub fn default_socket_path() -> PathBuf {
|
||||
|
|
|
|||
|
|
@ -17,7 +17,8 @@ use tokio::net::{TcpListener, UnixListener};
|
|||
|
||||
use heph_core::LocalStore;
|
||||
use hephd::{
|
||||
default_db_path, default_socket_path, sync, Daemon, LockGuard, RemoteStore, SystemClock,
|
||||
default_db_path, default_socket_path, sync, Daemon, KeyringTokenStore, LockGuard, RemoteStore,
|
||||
SystemClock, TokenStore,
|
||||
};
|
||||
|
||||
/// How often a spoke background-syncs with its hub.
|
||||
|
|
@ -71,6 +72,28 @@ struct Cli {
|
|||
/// OIDC audience (client id) hub tokens must carry (server mode).
|
||||
#[arg(long)]
|
||||
oidc_audience: Option<String>,
|
||||
|
||||
/// OIDC client id this device authenticates as, for spoke/client sync. With
|
||||
/// --oidc-issuer, the device attaches a cached bearer token to hub requests.
|
||||
#[arg(long)]
|
||||
oidc_client_id: Option<String>,
|
||||
}
|
||||
|
||||
/// Build the spoke/client token source: a keyring store keyed by `account` (the
|
||||
/// hub/server url) plus the issuer + client id. `None` unless both are set.
|
||||
fn spoke_auth(
|
||||
account: &str,
|
||||
issuer: Option<&String>,
|
||||
client_id: Option<&String>,
|
||||
) -> Option<(Arc<dyn TokenStore>, String, String)> {
|
||||
match (issuer, client_id) {
|
||||
(Some(issuer), Some(client_id)) => Some((
|
||||
Arc::new(KeyringTokenStore::new(account)) as Arc<dyn TokenStore>,
|
||||
issuer.clone(),
|
||||
client_id.clone(),
|
||||
)),
|
||||
_ => None,
|
||||
}
|
||||
}
|
||||
|
||||
#[tokio::main]
|
||||
|
|
@ -98,7 +121,17 @@ async fn main() -> Result<()> {
|
|||
.clone()
|
||||
.context("client mode requires --server-url")?;
|
||||
tracing::info!(%server_url, "client mode: proxying to server (no local replica)");
|
||||
(None, Daemon::new(RemoteStore::new(&server_url)))
|
||||
let store = match spoke_auth(
|
||||
&server_url,
|
||||
cli.oidc_issuer.as_ref(),
|
||||
cli.oidc_client_id.as_ref(),
|
||||
) {
|
||||
Some((tokens, issuer, client_id)) => {
|
||||
RemoteStore::with_auth(&server_url, tokens, issuer, client_id)
|
||||
}
|
||||
None => RemoteStore::new(&server_url),
|
||||
};
|
||||
(None, Daemon::new(store))
|
||||
}
|
||||
Mode::Local | Mode::Server => {
|
||||
let db = cli.db.clone().unwrap_or_else(default_db_path);
|
||||
|
|
@ -109,7 +142,12 @@ async fn main() -> Result<()> {
|
|||
// Take the exclusive lock before opening the store (tech-spec §3.1).
|
||||
let lock = LockGuard::acquire(&db)?;
|
||||
let store = LocalStore::open(&db, Box::new(SystemClock))?;
|
||||
let daemon = Daemon::new(store).with_hub(cli.hub_url.clone());
|
||||
let spoke = cli.hub_url.as_deref().and_then(|hub| {
|
||||
spoke_auth(hub, cli.oidc_issuer.as_ref(), cli.oidc_client_id.as_ref())
|
||||
});
|
||||
let daemon = Daemon::new(store)
|
||||
.with_hub(cli.hub_url.clone())
|
||||
.with_spoke_auth(spoke);
|
||||
|
||||
// server mode: expose the hub HTTP endpoint over the same store.
|
||||
if cli.mode == Mode::Server {
|
||||
|
|
@ -146,20 +184,7 @@ async fn main() -> Result<()> {
|
|||
}
|
||||
|
||||
// spoke: background-sync the op-log with the configured hub.
|
||||
if let Some(hub) = cli.hub_url.clone() {
|
||||
let store = daemon.store();
|
||||
tokio::spawn(async move {
|
||||
let http = reqwest::Client::new();
|
||||
let mut tick = tokio::time::interval(SYNC_INTERVAL);
|
||||
loop {
|
||||
tick.tick().await;
|
||||
match hephd::sync_once(store.clone(), &hub, &http).await {
|
||||
Ok(report) => tracing::debug!(?report, "background sync"),
|
||||
Err(e) => tracing::warn!("background sync failed: {e}"),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
daemon.spawn_sync_loop(SYNC_INTERVAL);
|
||||
|
||||
(Some(lock), daemon)
|
||||
}
|
||||
|
|
|
|||
327
crates/hephd/src/oauth.rs
Normal file
327
crates/hephd/src/oauth.rs
Normal file
|
|
@ -0,0 +1,327 @@
|
|||
//! Client-side OIDC: the OAuth 2.0 device-code flow (RFC 8628), token storage,
|
||||
//! and refresh (tech-spec §13).
|
||||
//!
|
||||
//! A spoke (`local` + `hub_url`) or a `client` uses this to obtain the bearer
|
||||
//! token it presents to the hub. The flow is **blocking** — it is interactive
|
||||
//! (`heph auth login` waits for the user to authorize in a browser) and the
|
||||
//! daemon only refreshes from its blocking pool. Tokens persist in a
|
||||
//! [`TokenStore`] (the OS keyring in production, in-memory in tests).
|
||||
|
||||
use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
||||
|
||||
use serde::{Deserialize, Serialize};
|
||||
|
||||
use crate::auth::AuthError;
|
||||
|
||||
/// The standard device-code grant type.
|
||||
const DEVICE_GRANT: &str = "urn:ietf:params:oauth:grant-type:device_code";
|
||||
/// Treat a token as expired this many seconds early, to avoid races.
|
||||
const EXPIRY_SKEW: u64 = 30;
|
||||
|
||||
/// Persisted OIDC tokens for one provider.
|
||||
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
|
||||
pub struct StoredToken {
|
||||
/// The bearer token presented to the hub.
|
||||
pub access_token: String,
|
||||
/// Used to obtain a fresh access token without re-authenticating.
|
||||
pub refresh_token: Option<String>,
|
||||
/// Unix seconds at which `access_token` expires.
|
||||
pub expires_at: u64,
|
||||
}
|
||||
|
||||
impl StoredToken {
|
||||
/// Whether the access token is expired (or within the safety skew).
|
||||
pub fn is_expired(&self, now: u64) -> bool {
|
||||
now + EXPIRY_SKEW >= self.expires_at
|
||||
}
|
||||
}
|
||||
|
||||
/// Current unix time in seconds.
|
||||
fn now_secs() -> u64 {
|
||||
SystemTime::now()
|
||||
.duration_since(UNIX_EPOCH)
|
||||
.unwrap_or_default()
|
||||
.as_secs()
|
||||
}
|
||||
|
||||
/// Where tokens persist between runs.
|
||||
pub trait TokenStore: Send + Sync {
|
||||
/// Load the stored token, if any.
|
||||
fn load(&self) -> Option<StoredToken>;
|
||||
/// Persist (replacing) the token.
|
||||
fn save(&self, token: &StoredToken) -> Result<(), AuthError>;
|
||||
/// Remove any stored token.
|
||||
fn clear(&self) -> Result<(), AuthError>;
|
||||
}
|
||||
|
||||
/// An in-memory [`TokenStore`] for tests.
|
||||
#[derive(Default)]
|
||||
pub struct MemoryTokenStore(std::sync::Mutex<Option<StoredToken>>);
|
||||
|
||||
impl TokenStore for MemoryTokenStore {
|
||||
fn load(&self) -> Option<StoredToken> {
|
||||
self.0.lock().expect("token lock poisoned").clone()
|
||||
}
|
||||
fn save(&self, token: &StoredToken) -> Result<(), AuthError> {
|
||||
*self.0.lock().expect("token lock poisoned") = Some(token.clone());
|
||||
Ok(())
|
||||
}
|
||||
fn clear(&self) -> Result<(), AuthError> {
|
||||
*self.0.lock().expect("token lock poisoned") = None;
|
||||
Ok(())
|
||||
}
|
||||
}
|
||||
|
||||
/// A [`TokenStore`] backed by the OS keyring (Keychain / Secret Service). The
|
||||
/// token JSON is stored as the secret for `(service, account)`.
|
||||
pub struct KeyringTokenStore {
|
||||
service: String,
|
||||
account: String,
|
||||
}
|
||||
|
||||
impl KeyringTokenStore {
|
||||
/// Store tokens under this service, keyed by `account` (the hub url).
|
||||
pub fn new(account: impl Into<String>) -> KeyringTokenStore {
|
||||
KeyringTokenStore {
|
||||
service: "hephaestus".into(),
|
||||
account: account.into(),
|
||||
}
|
||||
}
|
||||
|
||||
fn entry(&self) -> Result<keyring::Entry, AuthError> {
|
||||
keyring::Entry::new(&self.service, &self.account)
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))
|
||||
}
|
||||
}
|
||||
|
||||
impl TokenStore for KeyringTokenStore {
|
||||
fn load(&self) -> Option<StoredToken> {
|
||||
let secret = self.entry().ok()?.get_password().ok()?;
|
||||
serde_json::from_str(&secret).ok()
|
||||
}
|
||||
fn save(&self, token: &StoredToken) -> Result<(), AuthError> {
|
||||
let json = serde_json::to_string(token).map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
self.entry()?
|
||||
.set_password(&json)
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))
|
||||
}
|
||||
fn clear(&self) -> Result<(), AuthError> {
|
||||
match self.entry()?.delete_credential() {
|
||||
Ok(()) => Ok(()),
|
||||
Err(keyring::Error::NoEntry) => Ok(()),
|
||||
Err(e) => Err(AuthError::Provider(e.to_string())),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// The device-authorization response (RFC 8628 §3.2).
|
||||
#[derive(Debug, Clone, Deserialize)]
|
||||
pub struct DeviceAuth {
|
||||
/// The code the daemon polls the token endpoint with.
|
||||
pub device_code: String,
|
||||
/// The short code the user types at the verification page.
|
||||
pub user_code: String,
|
||||
/// Where the user goes to authorize.
|
||||
pub verification_uri: String,
|
||||
/// Verification URI with the code pre-filled (optional).
|
||||
#[serde(default)]
|
||||
pub verification_uri_complete: Option<String>,
|
||||
/// Seconds between polls.
|
||||
#[serde(default = "default_interval")]
|
||||
pub interval: u64,
|
||||
/// Seconds until `device_code` expires.
|
||||
pub expires_in: u64,
|
||||
}
|
||||
|
||||
fn default_interval() -> u64 {
|
||||
5
|
||||
}
|
||||
|
||||
/// Discovery fields the device flow needs.
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct DiscoveryDoc {
|
||||
device_authorization_endpoint: String,
|
||||
token_endpoint: String,
|
||||
}
|
||||
|
||||
/// A token-endpoint success response.
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct TokenResponse {
|
||||
access_token: String,
|
||||
#[serde(default)]
|
||||
refresh_token: Option<String>,
|
||||
#[serde(default)]
|
||||
expires_in: Option<u64>,
|
||||
}
|
||||
|
||||
impl TokenResponse {
|
||||
fn into_stored(self) -> StoredToken {
|
||||
StoredToken {
|
||||
access_token: self.access_token,
|
||||
refresh_token: self.refresh_token,
|
||||
expires_at: now_secs() + self.expires_in.unwrap_or(3600),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A token-endpoint error response (RFC 6749 §5.2 / RFC 8628 §3.5).
|
||||
#[derive(Debug, Deserialize)]
|
||||
struct TokenErrorBody {
|
||||
error: String,
|
||||
}
|
||||
|
||||
/// Drives the OAuth 2.0 device-code flow against one provider.
|
||||
pub struct DeviceFlow {
|
||||
client_id: String,
|
||||
http: ureq::Agent,
|
||||
device_authorization_endpoint: String,
|
||||
token_endpoint: String,
|
||||
}
|
||||
|
||||
impl DeviceFlow {
|
||||
/// Discover the device + token endpoints from `issuer` and build a flow.
|
||||
pub fn discover(issuer: &str, client_id: &str) -> Result<DeviceFlow, AuthError> {
|
||||
let http = crate::blocking_agent();
|
||||
let url = format!(
|
||||
"{}/.well-known/openid-configuration",
|
||||
issuer.trim_end_matches('/')
|
||||
);
|
||||
let mut resp = http
|
||||
.get(&url)
|
||||
.call()
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
if !resp.status().is_success() {
|
||||
return Err(AuthError::Provider(format!(
|
||||
"discovery returned {}",
|
||||
resp.status()
|
||||
)));
|
||||
}
|
||||
let doc: DiscoveryDoc = resp
|
||||
.body_mut()
|
||||
.read_json()
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
Ok(DeviceFlow {
|
||||
client_id: client_id.to_string(),
|
||||
http,
|
||||
device_authorization_endpoint: doc.device_authorization_endpoint,
|
||||
token_endpoint: doc.token_endpoint,
|
||||
})
|
||||
}
|
||||
|
||||
/// Request a device + user code (RFC 8628 §3.1).
|
||||
pub fn start(&self, scope: &str) -> Result<DeviceAuth, AuthError> {
|
||||
let mut resp = self
|
||||
.http
|
||||
.post(&self.device_authorization_endpoint)
|
||||
.send_form([("client_id", self.client_id.as_str()), ("scope", scope)])
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
if !resp.status().is_success() {
|
||||
return Err(AuthError::Provider(format!(
|
||||
"device authorization returned {}",
|
||||
resp.status()
|
||||
)));
|
||||
}
|
||||
resp.body_mut()
|
||||
.read_json()
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))
|
||||
}
|
||||
|
||||
/// Poll the token endpoint until the user authorizes, the code expires, or
|
||||
/// access is denied. `sleep` is injected so tests need not wait in real
|
||||
/// time (production passes [`std::thread::sleep`]).
|
||||
pub fn poll(
|
||||
&self,
|
||||
auth: &DeviceAuth,
|
||||
sleep: impl Fn(Duration),
|
||||
) -> Result<StoredToken, AuthError> {
|
||||
let deadline = now_secs() + auth.expires_in;
|
||||
let mut interval = auth.interval.max(1);
|
||||
loop {
|
||||
if now_secs() >= deadline {
|
||||
return Err(AuthError::Invalid("device code expired".into()));
|
||||
}
|
||||
let mut response = self
|
||||
.http
|
||||
.post(&self.token_endpoint)
|
||||
.send_form([
|
||||
("grant_type", DEVICE_GRANT),
|
||||
("device_code", auth.device_code.as_str()),
|
||||
("client_id", self.client_id.as_str()),
|
||||
])
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
|
||||
if response.status().is_success() {
|
||||
let token: TokenResponse = response
|
||||
.body_mut()
|
||||
.read_json()
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
return Ok(token.into_stored());
|
||||
}
|
||||
|
||||
// A non-success is either "keep waiting" or a terminal failure.
|
||||
let body: TokenErrorBody = response
|
||||
.body_mut()
|
||||
.read_json()
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
match body.error.as_str() {
|
||||
"authorization_pending" => {}
|
||||
"slow_down" => interval += 5,
|
||||
other => return Err(AuthError::Invalid(format!("device flow failed: {other}"))),
|
||||
}
|
||||
sleep(Duration::from_secs(interval));
|
||||
}
|
||||
}
|
||||
|
||||
/// Exchange a refresh token for a fresh access token (RFC 6749 §6).
|
||||
pub fn refresh(&self, refresh_token: &str) -> Result<StoredToken, AuthError> {
|
||||
let mut response = self
|
||||
.http
|
||||
.post(&self.token_endpoint)
|
||||
.send_form([
|
||||
("grant_type", "refresh_token"),
|
||||
("refresh_token", refresh_token),
|
||||
("client_id", self.client_id.as_str()),
|
||||
])
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?;
|
||||
if !response.status().is_success() {
|
||||
return Err(AuthError::Provider(format!(
|
||||
"token refresh returned {}",
|
||||
response.status()
|
||||
)));
|
||||
}
|
||||
let mut token: StoredToken = response
|
||||
.body_mut()
|
||||
.read_json::<TokenResponse>()
|
||||
.map_err(|e| AuthError::Provider(e.to_string()))?
|
||||
.into_stored();
|
||||
// Providers may omit the refresh token on refresh — keep the old one.
|
||||
if token.refresh_token.is_none() {
|
||||
token.refresh_token = Some(refresh_token.to_string());
|
||||
}
|
||||
Ok(token)
|
||||
}
|
||||
}
|
||||
|
||||
/// Return a usable access token from `store`, refreshing via `issuer`/`client_id`
|
||||
/// if the stored one is expired. Returns `None` if nothing is stored; errors if
|
||||
/// a refresh was needed but failed. Saves a refreshed token back to `store`.
|
||||
pub fn current_bearer(
|
||||
store: &dyn TokenStore,
|
||||
issuer: &str,
|
||||
client_id: &str,
|
||||
) -> Result<Option<String>, AuthError> {
|
||||
let Some(token) = store.load() else {
|
||||
return Ok(None);
|
||||
};
|
||||
if !token.is_expired(now_secs()) {
|
||||
return Ok(Some(token.access_token));
|
||||
}
|
||||
let Some(refresh) = token.refresh_token.clone() else {
|
||||
return Err(AuthError::Invalid(
|
||||
"token expired and no refresh token".into(),
|
||||
));
|
||||
};
|
||||
let refreshed = DeviceFlow::discover(issuer, client_id)?.refresh(&refresh)?;
|
||||
store.save(&refreshed)?;
|
||||
Ok(Some(refreshed.access_token))
|
||||
}
|
||||
|
|
@ -11,6 +11,8 @@
|
|||
//! background-syncs, it reads and writes the hub live. They are stubbed
|
||||
//! accordingly; the daemon never invokes them in this mode.
|
||||
|
||||
use std::sync::Arc;
|
||||
|
||||
use serde::de::DeserializeOwned;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
|
|
@ -19,33 +21,74 @@ use heph_core::{
|
|||
SyncCursors, Task, TaskState,
|
||||
};
|
||||
|
||||
use crate::oauth::{self, TokenStore};
|
||||
use crate::rpc::{Response, NOT_FOUND};
|
||||
|
||||
/// How a client obtains the bearer token it presents to the server.
|
||||
struct AuthCtx {
|
||||
tokens: Arc<dyn TokenStore>,
|
||||
issuer: String,
|
||||
client_id: String,
|
||||
}
|
||||
|
||||
/// A no-replica store that proxies to a `server` over HTTP.
|
||||
pub struct RemoteStore {
|
||||
base: String,
|
||||
http: reqwest::blocking::Client,
|
||||
http: ureq::Agent,
|
||||
auth: Option<AuthCtx>,
|
||||
}
|
||||
|
||||
impl RemoteStore {
|
||||
/// Point a client at `server_url` (e.g. `http://hub.example:8787`).
|
||||
/// Point a client at `server_url` (e.g. `http://hub.example:8787`),
|
||||
/// unauthenticated.
|
||||
pub fn new(server_url: &str) -> RemoteStore {
|
||||
RemoteStore {
|
||||
base: server_url.trim_end_matches('/').to_string(),
|
||||
http: reqwest::blocking::Client::new(),
|
||||
http: crate::blocking_agent(),
|
||||
auth: None,
|
||||
}
|
||||
}
|
||||
|
||||
/// Point a client at `server_url`, attaching a cached OIDC bearer token
|
||||
/// (refreshed as needed) from `tokens` to every call.
|
||||
pub fn with_auth(
|
||||
server_url: &str,
|
||||
tokens: Arc<dyn TokenStore>,
|
||||
issuer: String,
|
||||
client_id: String,
|
||||
) -> RemoteStore {
|
||||
RemoteStore {
|
||||
auth: Some(AuthCtx {
|
||||
tokens,
|
||||
issuer,
|
||||
client_id,
|
||||
}),
|
||||
..RemoteStore::new(server_url)
|
||||
}
|
||||
}
|
||||
|
||||
/// Issue one `/rpc` call, returning the raw `result` value.
|
||||
fn call(&self, method: &str, params: Value) -> Result<Value> {
|
||||
let response: Response = self
|
||||
.http
|
||||
.post(format!("{}/rpc", self.base))
|
||||
.json(&json!({ "method": method, "params": params }))
|
||||
.send()
|
||||
.and_then(reqwest::blocking::Response::error_for_status)
|
||||
.map_err(|e| Error::Remote(e.to_string()))?
|
||||
.json()
|
||||
let mut request = self.http.post(format!("{}/rpc", self.base));
|
||||
if let Some(auth) = &self.auth {
|
||||
let bearer = oauth::current_bearer(auth.tokens.as_ref(), &auth.issuer, &auth.client_id)
|
||||
.map_err(|e| Error::Remote(e.to_string()))?;
|
||||
if let Some(bearer) = bearer {
|
||||
request = request.header("Authorization", format!("Bearer {bearer}"));
|
||||
}
|
||||
}
|
||||
let mut http_response = request
|
||||
.send_json(json!({ "method": method, "params": params }))
|
||||
.map_err(|e| Error::Remote(e.to_string()))?;
|
||||
if !http_response.status().is_success() {
|
||||
return Err(Error::Remote(format!(
|
||||
"server returned {}",
|
||||
http_response.status()
|
||||
)));
|
||||
}
|
||||
let response: Response = http_response
|
||||
.body_mut()
|
||||
.read_json()
|
||||
.map_err(|e| Error::Remote(e.to_string()))?;
|
||||
if let Some(err) = response.error {
|
||||
// Preserve "not found" so callers keep the typed contract.
|
||||
|
|
|
|||
|
|
@ -10,6 +10,7 @@
|
|||
//! ops with the configured hub (tech-spec §6.1, §12).
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::Result;
|
||||
use serde_json::{json, Value};
|
||||
|
|
@ -18,9 +19,18 @@ use tokio::net::{UnixListener, UnixStream};
|
|||
|
||||
use heph_core::Store;
|
||||
|
||||
use crate::oauth::{self, TokenStore};
|
||||
use crate::rpc::{self, Request, Response, RpcError, INTERNAL_ERROR, PARSE_ERROR};
|
||||
use crate::sync::{self, SharedStore};
|
||||
|
||||
/// How a spoke obtains the bearer token it presents to its hub (tech-spec §13).
|
||||
#[derive(Clone)]
|
||||
struct SpokeAuth {
|
||||
store: Arc<dyn TokenStore>,
|
||||
issuer: String,
|
||||
client_id: String,
|
||||
}
|
||||
|
||||
/// The shared, cheaply-cloneable context each connection serves from.
|
||||
#[derive(Clone)]
|
||||
struct Ctx {
|
||||
|
|
@ -28,6 +38,28 @@ struct Ctx {
|
|||
/// The hub this device syncs with, if it is a spoke (`local` + `hub_url`).
|
||||
hub_url: Option<String>,
|
||||
http: reqwest::Client,
|
||||
/// Token source for authenticated sync (None ⇒ unauthenticated hub).
|
||||
auth: Option<SpokeAuth>,
|
||||
}
|
||||
|
||||
impl Ctx {
|
||||
/// The current bearer token for hub sync (refreshing if expired), or `None`
|
||||
/// if this spoke has no auth configured / no usable token.
|
||||
async fn bearer(&self) -> Option<String> {
|
||||
let auth = self.auth.clone()?;
|
||||
let result = tokio::task::spawn_blocking(move || {
|
||||
oauth::current_bearer(auth.store.as_ref(), &auth.issuer, &auth.client_id)
|
||||
})
|
||||
.await;
|
||||
match result {
|
||||
Ok(Ok(token)) => token,
|
||||
Ok(Err(e)) => {
|
||||
tracing::warn!("could not obtain bearer token: {e}");
|
||||
None
|
||||
}
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/// A running daemon over a shared store (any [`Store`] backend).
|
||||
|
|
@ -43,6 +75,7 @@ impl Daemon {
|
|||
store: Arc::new(Mutex::new(store)),
|
||||
hub_url: None,
|
||||
http: reqwest::Client::new(),
|
||||
auth: None,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
|
@ -53,12 +86,47 @@ impl Daemon {
|
|||
self
|
||||
}
|
||||
|
||||
/// Configure how this spoke obtains its bearer token for authenticated sync.
|
||||
/// `None` (or an unset hub) leaves sync unauthenticated.
|
||||
pub fn with_spoke_auth(
|
||||
mut self,
|
||||
auth: Option<(Arc<dyn TokenStore>, String, String)>,
|
||||
) -> Daemon {
|
||||
self.ctx.auth = auth.map(|(store, issuer, client_id)| SpokeAuth {
|
||||
store,
|
||||
issuer,
|
||||
client_id,
|
||||
});
|
||||
self
|
||||
}
|
||||
|
||||
/// The shared store handle, for code that needs to reach the same store the
|
||||
/// daemon serves (the hub HTTP router and background sync, tech-spec §6.1).
|
||||
pub fn store(&self) -> SharedStore {
|
||||
self.ctx.store.clone()
|
||||
}
|
||||
|
||||
/// If this is a spoke (`hub_url` set), spawn a background task that syncs the
|
||||
/// op-log with the hub every `interval` (attaching a bearer token when auth
|
||||
/// is configured). No-op otherwise.
|
||||
pub fn spawn_sync_loop(&self, interval: Duration) {
|
||||
let Some(hub) = self.ctx.hub_url.clone() else {
|
||||
return;
|
||||
};
|
||||
let ctx = self.ctx.clone();
|
||||
tokio::spawn(async move {
|
||||
let mut tick = tokio::time::interval(interval);
|
||||
loop {
|
||||
tick.tick().await;
|
||||
let bearer = ctx.bearer().await;
|
||||
match sync::sync_once(ctx.store.clone(), &hub, &ctx.http, bearer.as_deref()).await {
|
||||
Ok(report) => tracing::debug!(?report, "background sync"),
|
||||
Err(e) => tracing::warn!("background sync failed: {e}"),
|
||||
}
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/// Serve connections on `listener` until the task is cancelled. Each
|
||||
/// connection is handled concurrently; all share the one store.
|
||||
pub async fn serve(&self, listener: UnixListener) -> Result<()> {
|
||||
|
|
@ -145,7 +213,8 @@ async fn sync_now(ctx: &Ctx) -> Result<Value, RpcError> {
|
|||
message: "no hub_url configured; this instance is standalone".into(),
|
||||
});
|
||||
};
|
||||
match sync::sync_once(ctx.store.clone(), &hub_url, &ctx.http).await {
|
||||
let bearer = ctx.bearer().await;
|
||||
match sync::sync_once(ctx.store.clone(), &hub_url, &ctx.http, bearer.as_deref()).await {
|
||||
Ok(report) => Ok(json!(report)),
|
||||
Err(e) => Err(RpcError {
|
||||
code: INTERNAL_ERROR,
|
||||
|
|
|
|||
|
|
@ -12,9 +12,10 @@
|
|||
//!
|
||||
//! Exchange is **incremental by HLC cursor** (`sync_state`, [`heph_core::SyncCursors`]):
|
||||
//! each side transfers only the tail it hasn't sent/seen. Merge is idempotent,
|
||||
//! so a re-pushed op the hub already has is a harmless no-op. Auth is deferred to
|
||||
//! tech-spec §13 (slice 10) — the endpoint is currently unauthenticated and
|
||||
//! scoped to the hub's single owner.
|
||||
//! so a re-pushed op the hub already has is a harmless no-op. When the hub is
|
||||
//! configured with a verifier ([`crate::auth`]), every route requires a valid
|
||||
//! OIDC bearer token whose `sub` owns the hub (tech-spec §13); spokes attach
|
||||
//! that token via the `bearer` argument to [`sync_once`].
|
||||
|
||||
use std::sync::{Arc, Mutex};
|
||||
|
||||
|
|
@ -234,6 +235,7 @@ pub async fn sync_once(
|
|||
store: SharedStore,
|
||||
hub_url: &str,
|
||||
http: &reqwest::Client,
|
||||
bearer: Option<&str>,
|
||||
) -> Result<SyncReport> {
|
||||
let base = hub_url.trim_end_matches('/');
|
||||
let mut report = SyncReport::default();
|
||||
|
|
@ -248,6 +250,9 @@ pub async fn sync_once(
|
|||
if let Some(after) = &cursors.last_pulled_hlc {
|
||||
req = req.query(&[("after", after)]);
|
||||
}
|
||||
if let Some(token) = bearer {
|
||||
req = req.bearer_auth(token);
|
||||
}
|
||||
let pulled: OpsBody = req.send().await?.error_for_status()?.json().await?;
|
||||
report.pulled = pulled.ops.len();
|
||||
if !pulled.ops.is_empty() {
|
||||
|
|
@ -268,11 +273,13 @@ pub async fn sync_once(
|
|||
if !to_push.is_empty() {
|
||||
// `ops_since` returns HLC order, so the last is the new cursor.
|
||||
let max_pushed = to_push.last().map(|o| o.hlc.clone());
|
||||
http.post(format!("{base}/sync/push"))
|
||||
.json(&OpsBody { ops: to_push })
|
||||
.send()
|
||||
.await?
|
||||
.error_for_status()?;
|
||||
let mut req = http
|
||||
.post(format!("{base}/sync/push"))
|
||||
.json(&OpsBody { ops: to_push });
|
||||
if let Some(token) = bearer {
|
||||
req = req.bearer_auth(token);
|
||||
}
|
||||
req.send().await?.error_for_status()?;
|
||||
if let Some(cursor) = max_pushed {
|
||||
let hub = hub_url.to_string();
|
||||
with_store(&store, move |s| s.record_sync(&hub, Some(&cursor), None)).await?;
|
||||
|
|
|
|||
|
|
@ -27,7 +27,7 @@ use rsa::{RsaPrivateKey, RsaPublicKey};
|
|||
use serde::Serialize;
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use heph_core::{FixedClock, LocalStore};
|
||||
use heph_core::{FixedClock, LocalStore, NewNode, Store};
|
||||
use hephd::auth::{AuthError, Claims, OidcVerifier, TokenVerifier};
|
||||
use hephd::sync::{self, SharedStore};
|
||||
|
||||
|
|
@ -102,14 +102,15 @@ fn start_hub(verifier: Option<Arc<dyn TokenVerifier>>) -> String {
|
|||
|
||||
/// `POST /rpc health` with an optional bearer token; return the HTTP status.
|
||||
fn rpc_health_status(base: &str, token: Option<&str>) -> u16 {
|
||||
let http = reqwest::blocking::Client::new();
|
||||
let mut req = http
|
||||
.post(format!("{base}/rpc"))
|
||||
.json(&json!({ "method": "health", "params": {} }));
|
||||
let mut req = ureq::post(format!("{base}/rpc"));
|
||||
if let Some(t) = token {
|
||||
req = req.header("Authorization", format!("Bearer {t}"));
|
||||
}
|
||||
req.send().unwrap().status().as_u16()
|
||||
match req.send_json(json!({ "method": "health", "params": {} })) {
|
||||
Ok(resp) => resp.status().as_u16(),
|
||||
Err(ureq::Error::StatusCode(code)) => code,
|
||||
Err(e) => panic!("request failed: {e}"),
|
||||
}
|
||||
}
|
||||
|
||||
fn stub(pairs: &[(&str, &str)]) -> Option<Arc<dyn TokenVerifier>> {
|
||||
|
|
@ -292,3 +293,67 @@ fn oidc_verifier_rejects_forgeries() {
|
|||
let token = encode(&header, &nosub, &rsa_key()).unwrap();
|
||||
assert!(verifier.verify(&token).is_err(), "missing sub");
|
||||
}
|
||||
|
||||
// --- layer 3: the loop closes — spoke ⇄ authed hub over real HTTP -----------
|
||||
|
||||
const OWNER: &str = "canonical-user";
|
||||
|
||||
/// Adopt the canonical owner over a temp store and share it.
|
||||
fn shared_replica() -> SharedStore {
|
||||
let dir = Box::leak(Box::new(tempfile::tempdir().unwrap()));
|
||||
let mut store =
|
||||
LocalStore::open(dir.path().join("heph.db"), Box::new(FixedClock(NOW))).unwrap();
|
||||
store.adopt_owner(OWNER).unwrap();
|
||||
Arc::new(Mutex::new(store))
|
||||
}
|
||||
|
||||
#[tokio::test]
|
||||
async fn spoke_syncs_to_an_authed_hub_only_with_a_valid_token() {
|
||||
let issuer = start_mock_idp();
|
||||
|
||||
// A hub that requires tokens issued by the mock IdP.
|
||||
let hub_store = shared_replica();
|
||||
let verifier = Arc::new(OidcVerifier::new(issuer.clone(), AUDIENCE));
|
||||
let hub_listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let hub_url = format!("http://{}", hub_listener.local_addr().unwrap());
|
||||
{
|
||||
let app = sync::router(hub_store.clone(), Some(verifier));
|
||||
tokio::spawn(async move { axum::serve(hub_listener, app).await.unwrap() });
|
||||
}
|
||||
|
||||
// A spoke with a local node to push.
|
||||
let spoke = shared_replica();
|
||||
let node_id = spoke
|
||||
.lock()
|
||||
.unwrap()
|
||||
.create_node(NewNode::doc("Roof", "shingles"))
|
||||
.unwrap()
|
||||
.id;
|
||||
|
||||
let http = reqwest::Client::new();
|
||||
|
||||
// Without a token the hub refuses the exchange.
|
||||
assert!(
|
||||
sync::sync_once(spoke.clone(), &hub_url, &http, None)
|
||||
.await
|
||||
.is_err(),
|
||||
"unauthenticated sync must fail"
|
||||
);
|
||||
|
||||
// With a valid token signed by the IdP, the push is accepted and the node
|
||||
// reaches the hub.
|
||||
let token = sign(&good_claims(&issuer), KID);
|
||||
let report = sync::sync_once(spoke.clone(), &hub_url, &http, Some(&token))
|
||||
.await
|
||||
.expect("authenticated sync succeeds");
|
||||
assert!(report.pushed > 0, "spoke pushed nothing");
|
||||
assert!(
|
||||
hub_store
|
||||
.lock()
|
||||
.unwrap()
|
||||
.get_node(&node_id)
|
||||
.unwrap()
|
||||
.is_some(),
|
||||
"node did not reach the hub"
|
||||
);
|
||||
}
|
||||
|
|
|
|||
153
crates/hephd/tests/oauth.rs
Normal file
153
crates/hephd/tests/oauth.rs
Normal file
|
|
@ -0,0 +1,153 @@
|
|||
//! Device-code flow + token store (tech-spec §13, slice 10b), offline.
|
||||
//!
|
||||
//! A mock OAuth provider serves discovery, the device-authorization endpoint,
|
||||
//! and the token endpoint (which reports `authorization_pending` once before
|
||||
//! issuing tokens). We drive `DeviceFlow` against it with an injected no-op
|
||||
//! sleep, so the polling loop is exercised deterministically and instantly.
|
||||
|
||||
use std::collections::HashMap;
|
||||
use std::sync::atomic::{AtomicUsize, Ordering};
|
||||
use std::sync::{mpsc, Arc};
|
||||
use std::thread;
|
||||
use std::time::Duration;
|
||||
|
||||
use axum::extract::{Form, State};
|
||||
use axum::http::StatusCode;
|
||||
use axum::response::{IntoResponse, Response};
|
||||
use axum::routing::{get, post};
|
||||
use axum::{Json, Router};
|
||||
use serde_json::{json, Value};
|
||||
|
||||
use hephd::oauth::{DeviceFlow, MemoryTokenStore, StoredToken, TokenStore};
|
||||
|
||||
#[derive(Clone)]
|
||||
struct IdpState {
|
||||
base: String,
|
||||
/// How many times the token endpoint has been polled for the device code.
|
||||
polls: Arc<AtomicUsize>,
|
||||
}
|
||||
|
||||
/// Start a mock OIDC provider; return its base URL.
|
||||
fn start_idp() -> String {
|
||||
let (tx, rx) = mpsc::channel();
|
||||
thread::spawn(move || {
|
||||
let rt = tokio::runtime::Builder::new_current_thread()
|
||||
.enable_all()
|
||||
.build()
|
||||
.unwrap();
|
||||
rt.block_on(async move {
|
||||
let listener = tokio::net::TcpListener::bind("127.0.0.1:0").await.unwrap();
|
||||
let base = format!("http://{}", listener.local_addr().unwrap());
|
||||
tx.send(base.clone()).unwrap();
|
||||
let state = IdpState {
|
||||
base,
|
||||
polls: Arc::new(AtomicUsize::new(0)),
|
||||
};
|
||||
let app = Router::new()
|
||||
.route("/.well-known/openid-configuration", get(discovery))
|
||||
.route("/device", post(device_authorization))
|
||||
.route("/token", post(token))
|
||||
.with_state(state);
|
||||
axum::serve(listener, app).await.unwrap();
|
||||
});
|
||||
});
|
||||
rx.recv_timeout(Duration::from_secs(5)).unwrap()
|
||||
}
|
||||
|
||||
async fn discovery(State(s): State<IdpState>) -> Json<Value> {
|
||||
Json(json!({
|
||||
"issuer": s.base,
|
||||
"device_authorization_endpoint": format!("{}/device", s.base),
|
||||
"token_endpoint": format!("{}/token", s.base),
|
||||
}))
|
||||
}
|
||||
|
||||
async fn device_authorization(State(s): State<IdpState>) -> Json<Value> {
|
||||
Json(json!({
|
||||
"device_code": "dev-code-xyz",
|
||||
"user_code": "WDJB-MJHT",
|
||||
"verification_uri": format!("{}/activate", s.base),
|
||||
"interval": 1,
|
||||
"expires_in": 300,
|
||||
}))
|
||||
}
|
||||
|
||||
async fn token(State(s): State<IdpState>, Form(form): Form<HashMap<String, String>>) -> Response {
|
||||
match form.get("grant_type").map(String::as_str) {
|
||||
Some("urn:ietf:params:oauth:grant-type:device_code") => {
|
||||
// Report pending on the first poll, then issue tokens.
|
||||
if s.polls.fetch_add(1, Ordering::SeqCst) == 0 {
|
||||
return (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(json!({ "error": "authorization_pending" })),
|
||||
)
|
||||
.into_response();
|
||||
}
|
||||
Json(json!({
|
||||
"access_token": "access-1",
|
||||
"refresh_token": "refresh-1",
|
||||
"expires_in": 3600,
|
||||
}))
|
||||
.into_response()
|
||||
}
|
||||
Some("refresh_token") => Json(json!({
|
||||
"access_token": "access-2",
|
||||
"expires_in": 3600,
|
||||
}))
|
||||
.into_response(),
|
||||
_ => (
|
||||
StatusCode::BAD_REQUEST,
|
||||
Json(json!({ "error": "unsupported_grant_type" })),
|
||||
)
|
||||
.into_response(),
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn device_flow_polls_pending_then_issues_a_token() {
|
||||
let issuer = start_idp();
|
||||
let flow = DeviceFlow::discover(&issuer, "heph-cli").unwrap();
|
||||
|
||||
let auth = flow.start("openid").unwrap();
|
||||
assert_eq!(auth.user_code, "WDJB-MJHT");
|
||||
assert!(auth.verification_uri.contains("/activate"));
|
||||
|
||||
// No real waiting — the injected sleep is a no-op.
|
||||
let token = flow.poll(&auth, |_| {}).unwrap();
|
||||
assert_eq!(token.access_token, "access-1");
|
||||
assert_eq!(token.refresh_token.as_deref(), Some("refresh-1"));
|
||||
assert!(token.expires_at > 0);
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn refresh_keeps_the_old_refresh_token_when_omitted() {
|
||||
let issuer = start_idp();
|
||||
let flow = DeviceFlow::discover(&issuer, "heph-cli").unwrap();
|
||||
let refreshed = flow.refresh("refresh-1").unwrap();
|
||||
assert_eq!(refreshed.access_token, "access-2");
|
||||
// The provider omitted a new refresh token, so the old one is retained.
|
||||
assert_eq!(refreshed.refresh_token.as_deref(), Some("refresh-1"));
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn memory_token_store_round_trips_and_reports_expiry() {
|
||||
let store = MemoryTokenStore::default();
|
||||
assert!(store.load().is_none());
|
||||
|
||||
let token = StoredToken {
|
||||
access_token: "a".into(),
|
||||
refresh_token: Some("r".into()),
|
||||
expires_at: 10_000,
|
||||
};
|
||||
store.save(&token).unwrap();
|
||||
assert_eq!(store.load(), Some(token.clone()));
|
||||
|
||||
assert!(!token.is_expired(5_000), "still valid well before expiry");
|
||||
assert!(
|
||||
token.is_expired(10_000),
|
||||
"expired at the boundary (with skew)"
|
||||
);
|
||||
|
||||
store.clear().unwrap();
|
||||
assert!(store.load().is_none());
|
||||
}
|
||||
|
|
@ -70,9 +70,13 @@ async fn a_node_propagates_a_to_hub_to_b() {
|
|||
};
|
||||
|
||||
// A pushes to the hub; B pulls from it.
|
||||
let up = sync::sync_once(a.clone(), &hub_url, &http).await.unwrap();
|
||||
let up = sync::sync_once(a.clone(), &hub_url, &http, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(up.pushed > 0, "A pushed nothing");
|
||||
let down = sync::sync_once(b.clone(), &hub_url, &http).await.unwrap();
|
||||
let down = sync::sync_once(b.clone(), &hub_url, &http, None)
|
||||
.await
|
||||
.unwrap();
|
||||
assert!(down.applied > 0, "B applied nothing");
|
||||
|
||||
let on_b = b.lock().unwrap().get_node(&id).unwrap().expect("reached B");
|
||||
|
|
@ -98,8 +102,12 @@ async fn divergent_scalar_edits_converge_through_the_hub_with_a_conflict() {
|
|||
.unwrap()
|
||||
.node_id
|
||||
};
|
||||
sync::sync_once(a.clone(), &hub_url, &http).await.unwrap();
|
||||
sync::sync_once(b.clone(), &hub_url, &http).await.unwrap();
|
||||
sync::sync_once(a.clone(), &hub_url, &http, None)
|
||||
.await
|
||||
.unwrap();
|
||||
sync::sync_once(b.clone(), &hub_url, &http, None)
|
||||
.await
|
||||
.unwrap();
|
||||
|
||||
// Divergent offline edits on conflict-tracked fields; B's is later (higher
|
||||
// HLC) so its whole scalar snapshot wins.
|
||||
|
|
@ -116,8 +124,12 @@ async fn divergent_scalar_edits_converge_through_the_hub_with_a_conflict() {
|
|||
|
||||
// A few exchanges in each direction settle it.
|
||||
for _ in 0..2 {
|
||||
sync::sync_once(a.clone(), &hub_url, &http).await.unwrap();
|
||||
sync::sync_once(b.clone(), &hub_url, &http).await.unwrap();
|
||||
sync::sync_once(a.clone(), &hub_url, &http, None)
|
||||
.await
|
||||
.unwrap();
|
||||
sync::sync_once(b.clone(), &hub_url, &http, None)
|
||||
.await
|
||||
.unwrap();
|
||||
}
|
||||
|
||||
let ta = a.lock().unwrap().get_task(&task_id).unwrap().unwrap();
|
||||
|
|
|
|||
|
|
@ -11,5 +11,6 @@ Begin the v1 prototype (Phase 1, tech-spec §11.1), built in TDD slices:
|
|||
- Body text CRDT (§5, §12, slice 8d): node bodies now merge through the `yrs` text CRDT (`body_crdt`) instead of last-writer-wins — whole-buffer writes are diffed into the doc and the yrs delta rides the op, so concurrent edits to different regions both survive and never enqueue a conflict.
|
||||
- Network sync over HTTP (§6.1, §12, slice 9a): `hephd --mode server` exposes a sync hub (`POST /sync/push`, `GET /sync/pull?after=<hlc>`, axum) over the same store; `hephd --mode local --hub-url <url>` becomes a spoke that background-syncs its op-log with that hub (and on demand via the `sync.now`/`sync.status` RPC). Exchange is incremental by HLC cursor (`sync_state`) and idempotent. The merge engine is `heph-core`'s, unchanged. Unauthenticated/single-owner for now (auth lands with OIDC). `conflicts.list`/`conflicts.resolve` are now reachable over the daemon socket.
|
||||
- Client mode (§3.1, slice 9b): `hephd --mode client --server-url <url>` runs with no local replica, proxying every store call to a server's `POST /rpc` endpoint (the full daemon API over HTTP). The daemon is now backend-agnostic (`local`/`server` front a `LocalStore`, `client` a `RemoteStore`), so surfaces see the same unix-socket API in every mode.
|
||||
- Hub authentication (§13, slice 10a): the sync hub now verifies an OIDC bearer token on `/sync/*` and `/rpc` — RS256-pinned JWT validation with exact issuer/audience, expiry, and a required subject; JWKS discovered and cached, refetched on key rotation (`jsonwebtoken`). Enabled with `hephd --mode server --oidc-issuer <url> --oidc-audience <client-id>` (open when unset, for local dev). A single-tenant owner gate binds the hub to the first authenticated identity and rejects any other. Verification sits behind a `TokenVerifier` trait, so it's tested entirely offline (stub middleware + an adversarial battery against an in-process mock IdP). Client-side login (device-code flow) lands next.
|
||||
- Hub authentication (§13, slice 10a): the sync hub now verifies an OIDC bearer token on `/sync/*` and `/rpc` — RS256-pinned JWT validation with exact issuer/audience, expiry, and a required subject; JWKS discovered and cached, refetched on key rotation (`jsonwebtoken`). Enabled with `hephd --mode server --oidc-issuer <url> --oidc-audience <client-id>` (open when unset, for local dev). A single-tenant owner gate binds the hub to the first authenticated identity and rejects any other. Verification sits behind a `TokenVerifier` trait, so it's tested entirely offline (stub middleware + an adversarial battery against an in-process mock IdP).
|
||||
- Client authentication (§13, slice 10b): `heph auth login --hub-url <url> --issuer <url> --client-id <id>` runs the OAuth 2.0 device-code flow and caches the token in the OS keyring; spokes and `client` mode attach it to hub requests, refreshing on expiry (`--oidc-issuer`/`--oidc-client-id`). Offline-tested against a mock OAuth server and a full spoke-to-authenticated-hub loop. (Auth/proxy HTTP uses the runtime-free `ureq`, since `reqwest::blocking` is unsafe inside the async daemon.)
|
||||
- CI runs the Rust suite (fmt/clippy/test) via the project build hook.
|
||||
|
|
|
|||
|
|
@ -327,7 +327,7 @@ See [[design]] §5–§7 for the constraints later phases impose on present choi
|
|||
|
||||
## 14. Implementation status (Phase 1 tracker)
|
||||
|
||||
> Cross-session resume tracker for the Phase 1 C1 (branch `feature/v1-prototype`, PR #1). Updated 2026-06-01 — **108 tests green** (`cargo test --all`), `clippy -D warnings` + `fmt` + `prek` clean. Workspace: `crates/heph-core`, `crates/hephd`, `crates/heph` (no `heph.nvim/` yet).
|
||||
> Cross-session resume tracker for the Phase 1 C1 (branch `feature/v1-prototype`, PR #1). Updated 2026-06-01 — **112 tests green** (`cargo test --all`), `clippy -D warnings` + `fmt` + `prek` clean. Workspace: `crates/heph-core`, `crates/hephd`, `crates/heph` (no `heph.nvim/` yet).
|
||||
|
||||
**Done**
|
||||
|
||||
|
|
@ -342,12 +342,13 @@ See [[design]] §5–§7 for the constraints later phases impose on present choi
|
|||
- ✅ **Network sync (§6.1, §12, slice 9a):** **transport ratified = `axum` HTTP/JSON.** The hub (`server` mode) exposes `POST /sync/push` + `GET /sync/pull?after=<hlc>` over the same store; a spoke (`local` + `hub_url`) runs `sync::sync_once` (pull→merge, then push) and background-syncs on a 30s interval. Incremental by HLC cursor (`sync_state`/`SyncCursors`); idempotent re-push is a no-op. Two spokes converge through a real-HTTP hub (incl. scalar conflict) in `tests/sync_http.rs`. **Unauthenticated for now, single-owner** (auth + per-user scoping is slice 10).
|
||||
- ✅ **`client` mode + `RemoteStore` (§3.1, slice 9b):** a no-replica backend that proxies every `Store` call to a `server`'s `POST /rpc` (the full `dispatch`, over HTTP) via a **blocking** reqwest client — the online-only escape hatch. `Daemon` is now generic over `dyn Store + Send`, so the same unix-socket surface fronts either a `LocalStore` or a `RemoteStore`. Sync primitives are stubbed (a client has no op-log). Proven in `tests/client_mode.rs`. `dispatch` gained `task.get` + `links.add`.
|
||||
- ✅ **Hub auth — verification side (§13, slice 10a):** the hub validates an **OIDC bearer token** (`jsonwebtoken`, RS256-pinned, exact iss+aud, exp/nbf, required `sub`; JWKS discovered + cached, refetched on unknown `kid`) on `/sync/*` + `/rpc`. A [`TokenVerifier`] trait seam keeps it mockable; **single-tenant** owner gate (`authorize_owner_sub`: claim-on-first, then require-match → 403 for any other identity). `--oidc-issuer`/`--oidc-audience` enable it (open when unset, for local dev). Tested fully offline: stub-verifier middleware tests + an adversarial battery against an in-process mock IdP (expired/wrong-iss/wrong-aud/unknown-kid/tampered/alg-confusion/missing-sub all rejected).
|
||||
- ✅ **CLI (§1):** `heph` next/task/doc/get/export/search/journal.
|
||||
- ✅ **Auth — client side (§13, slice 10b):** OAuth2 **device-code flow** (`hephd::oauth::DeviceFlow`: discover → start → poll handling `authorization_pending`/`slow_down`, + refresh). `TokenStore` (OS keyring via `keyring`, in-memory for tests); `current_bearer` refreshes on expiry. `heph auth login` runs the flow + caches the token; spokes (`sync_once`) and `client` mode (`RemoteStore`) attach the bearer, refreshing as needed (`--oidc-issuer`/`--oidc-client-id`). **All auth/proxy HTTP uses `ureq`** (runtime-free blocking) — `reqwest::blocking` panics inside the daemon's `spawn_blocking`; async `reqwest` remains only for `sync_once`. Tested offline against a mock OAuth server (device flow, refresh, store) + a full spoke⇄authed-hub loop.
|
||||
- ✅ **CLI (§1):** `heph` next/task/doc/get/export/search/journal/**auth login·logout**.
|
||||
- ✅ **CI (§9):** `.forgejo/scripts/build` runs fmt/clippy/test (self-bootstrapping rustup).
|
||||
|
||||
**Not yet done (resume order)**
|
||||
|
||||
1. ⏳ **Auth — client side (§13, slice 10b):** OAuth2 **device-code flow** (`heph auth login`), token cache in the OS keyring + auto-refresh, the spoke attaching its bearer to `sync_once`/RPC, and local→authed **adoption** (owner-embedded deterministic-id rewrite). Multi-tenant hub (owner-per-token storage) remains a future extension beyond the current single-tenant gate.
|
||||
1. ⏳ **Adoption refinement + multi-tenant (§13):** local→authed **adoption** currently rewrites `owner_id` (`adopt_owner`) but not yet the owner-embedded deterministic ids (journal/tag) + their links; and the hub is single-tenant (one owner per store) — owner-per-token storage is a future extension.
|
||||
2. ⏳ **`heph.nvim` (§8):** obsidian.nvim parity + task views; headless-nvim e2e (needs `neovim` + `plenary.nvim` on the CI runner).
|
||||
|
||||
## Related
|
||||
|
|
|
|||
Loading…
Add table
Add a link
Reference in a new issue