//! SQLite-backed [`Store`] implementation. //! //! `LocalStore` opens a SQLite file directly. The exclusive-lock handoff of //! tech-spec §3.1 is layered on by `hephd` when it owns the file; the store //! itself stays a thin, synchronous SQLite wrapper so it is trivially testable //! against an in-memory database. //! //! The query logic lives in focused submodules ([`nodes`], [`tasks`], [`links`]) //! as free functions over a `&Connection`; the [`Store`] impl here is a thin //! delegating layer so a transaction can span several of them. mod apply; mod exporter; mod links; mod log; mod migrations; mod nodes; mod ops; mod syncstate; mod tasks; pub use migrations::latest_version; use std::path::Path; use rusqlite::{Connection, OptionalExtension}; use ulid::Ulid; use crate::clock::Clock; use crate::error::{Error, Result}; use crate::hlc::Hlc; use crate::model::{ Attention, Conflict, Health, Link, LinkType, NewNode, NewTask, Node, SyncCursors, Task, TaskState, }; use crate::oplog::Op; use crate::ranking::RankedTask; use crate::store::Store; /// A SQLite file (or in-memory database) opened directly as a backend. pub struct LocalStore { conn: Connection, owner_id: String, clock: Box, } impl LocalStore { /// Open (creating if needed) a SQLite database at `path`. pub fn open(path: impl AsRef, clock: Box) -> Result { let conn = Connection::open(path)?; Self::init(conn, clock) } /// Open a throwaway in-memory database — for tests. pub fn open_in_memory(clock: Box) -> Result { let conn = Connection::open_in_memory()?; Self::init(conn, clock) } fn init(conn: Connection, clock: Box) -> Result { conn.execute_batch("PRAGMA foreign_keys = ON;")?; migrations::migrate(&conn)?; ensure_origin(&conn)?; let owner_id = ensure_local_user(&conn, clock.as_ref())?; Ok(LocalStore { conn, owner_id, clock, }) } /// The id of the user whose data this store reads/writes. /// /// For a local-only instance this is the single generated local user /// (`oidc_sub = NULL`, tech-spec §13). pub fn owner_id(&self) -> &str { &self.owner_id } /// This device's stable `origin` id (HLC/sync identity). pub fn origin(&self) -> Result { meta_get(&self.conn, "origin")? .ok_or_else(|| Error::Integrity("missing device origin".into())) } } /// A fresh ULID, as a string id. pub(crate) fn new_id() -> String { Ulid::new().to_string() } /// Read a `meta` value. pub(super) fn meta_get(conn: &Connection, key: &str) -> Result> { Ok(conn .query_row("SELECT value FROM meta WHERE key = ?1", [key], |r| r.get(0)) .optional()?) } /// Upsert a `meta` value. pub(super) fn meta_set(conn: &Connection, key: &str, value: &str) -> Result<()> { conn.execute( "INSERT INTO meta (key, value) VALUES (?1, ?2) ON CONFLICT(key) DO UPDATE SET value = excluded.value", (key, value), )?; Ok(()) } /// Advance the persisted clock past a remote `hlc` (encoded), so future local /// stamps exceed everything we've seen (tech-spec §12). Encoded HLCs sort by /// causal order, so a string compare suffices. pub(super) fn absorb_remote_hlc(conn: &Connection, remote: &str) -> Result<()> { let bump = match meta_get(conn, "last_hlc")? { Some(last) => remote > last.as_str(), None => true, }; if bump { meta_set(conn, "last_hlc", remote)?; } Ok(()) } /// Ensure this store has a stable device `origin` id. fn ensure_origin(conn: &Connection) -> Result<()> { if meta_get(conn, "origin")?.is_none() { meta_set(conn, "origin", &new_id())?; } Ok(()) } /// Generate the next local [`Hlc`] (encoded), advancing the persisted clock in /// `meta`. Strictly greater than every previously-generated stamp; the /// read-modify-write runs inside the caller's connection/transaction and the /// store is single-writer, so there is no race. pub(super) fn next_hlc(conn: &Connection, now_ms: i64) -> Result { let origin = meta_get(conn, "origin")? .ok_or_else(|| Error::Integrity("missing device origin".into()))?; let last = match meta_get(conn, "last_hlc")? { Some(s) => Hlc::parse(&s)?, None => Hlc::zero(&origin), }; let physical = now_ms.max(last.physical); let counter = if physical == last.physical { last.counter + 1 } else { 0 }; let encoded = Hlc { physical, counter, origin, } .encode(); meta_set(conn, "last_hlc", &encoded)?; Ok(encoded) } /// Ensure a single local user exists, returning its id. fn ensure_local_user(conn: &Connection, clock: &dyn Clock) -> Result { if let Some(id) = conn .query_row( "SELECT id FROM users ORDER BY created_at LIMIT 1", [], |r| r.get::<_, String>(0), ) .optional()? { return Ok(id); } let id = new_id(); conn.execute( "INSERT INTO users (id, oidc_sub, name, created_at) VALUES (?1, NULL, 'local', ?2)", (&id, clock.now_ms()), )?; Ok(id) } impl Store for LocalStore { fn create_node(&mut self, input: NewNode) -> Result { let now = self.clock.now_ms(); nodes::create(&self.conn, &self.owner_id, now, input) } fn get_node(&self, id: &str) -> Result> { nodes::get(&self.conn, id) } fn update_node( &mut self, id: &str, title: Option, body: Option, ) -> Result { let now = self.clock.now_ms(); nodes::update(&mut self.conn, &self.owner_id, now, id, title, body) } fn tombstone_node(&mut self, id: &str) -> Result<()> { let now = self.clock.now_ms(); nodes::tombstone(&self.conn, &self.owner_id, now, id) } fn resolve_node(&self, title: &str) -> Result> { match links::resolve_id(&self.conn, &self.owner_id, title)? { Some(id) => nodes::get(&self.conn, &id), None => Ok(None), } } fn create_task(&mut self, input: NewTask) -> Result { let now = self.clock.now_ms(); tasks::create(&mut self.conn, &self.owner_id, now, input) } fn get_task(&self, node_id: &str) -> Result> { tasks::get(&self.conn, node_id) } fn set_task_state(&mut self, node_id: &str, state: TaskState) -> Result { let now = self.clock.now_ms(); tasks::set_state(&mut self.conn, &self.owner_id, now, node_id, state) } fn skip_recurrence(&mut self, node_id: &str) -> Result { let now = self.clock.now_ms(); tasks::skip(&self.conn, &self.owner_id, now, node_id) } fn set_task_attention(&mut self, node_id: &str, attention: Attention) -> Result { let now = self.clock.now_ms(); tasks::set_attention(&self.conn, &self.owner_id, now, node_id, attention) } fn next(&self, scope: Option<&str>, limit: usize) -> Result> { let now = self.clock.now_ms(); tasks::next(&self.conn, &self.owner_id, now, scope, limit) } fn list( &self, scope: Option<&str>, attention: Option, include_blue: bool, ) -> Result> { tasks::list(&self.conn, &self.owner_id, scope, attention, include_blue) } fn health(&self) -> Result { tasks::health(&self.conn, &self.owner_id) } fn search(&self, query: &str) -> Result> { nodes::search(&self.conn, &self.owner_id, query) } fn journal_open_or_create(&mut self, date: &str) -> Result { let now = self.clock.now_ms(); nodes::open_or_create_journal(&self.conn, &self.owner_id, now, date) } fn add_link(&mut self, src_id: &str, dst_id: &str, link_type: LinkType) -> Result { let now = self.clock.now_ms(); links::add(&self.conn, &self.owner_id, now, src_id, dst_id, link_type) } fn outgoing_links(&self, id: &str) -> Result> { links::outgoing(&self.conn, id) } fn backlinks(&self, id: &str) -> Result> { links::backlinks(&self.conn, id) } fn log_append(&mut self, task_id: &str, text: &str) -> Result<()> { let now = self.clock.now_ms(); let tx = self.conn.transaction()?; log::append(&tx, &self.owner_id, now, task_id, text)?; tx.commit()?; Ok(()) } fn log_tail(&self, task_id: &str, n: usize) -> Result> { log::tail(&self.conn, task_id, n) } fn export(&self, dir: &std::path::Path) -> Result { exporter::export(&self.conn, &self.owner_id, dir) } fn ops_since(&self, after: Option<&str>) -> Result> { ops::since(&self.conn, &self.owner_id, after) } fn apply_op(&mut self, op: &Op) -> Result { apply::apply(&mut self.conn, op) } fn adopt_owner(&mut self, canonical: &str) -> Result<()> { if self.owner_id == canonical { return Ok(()); } let old = self.owner_id.clone(); let tx = self.conn.transaction()?; tx.execute( "INSERT OR IGNORE INTO users (id, oidc_sub, name, created_at) VALUES (?1, NULL, 'user', 0)", [canonical], )?; tx.execute( "UPDATE nodes SET owner_id = ?1 WHERE owner_id = ?2", (canonical, &old), )?; tx.execute( "UPDATE oplog SET owner_id = ?1 WHERE owner_id = ?2", (canonical, &old), )?; tx.execute( "UPDATE conflicts SET owner_id = ?1 WHERE owner_id = ?2", (canonical, &old), )?; tx.execute("DELETE FROM users WHERE id = ?1", [&old])?; tx.commit()?; self.owner_id = canonical.to_string(); Ok(()) } fn sync_state(&self, peer: &str) -> Result { syncstate::get(&self.conn, peer) } fn record_sync( &mut self, peer: &str, pushed: Option<&str>, pulled: Option<&str>, ) -> Result<()> { let now = self.clock.now_ms(); syncstate::record(&self.conn, peer, pushed, pulled, now) } fn authorize_owner_sub(&mut self, sub: &str) -> Result { // The owner's bound identity (NULL until first authenticated sync). let current: Option = self .conn .query_row( "SELECT oidc_sub FROM users WHERE id = ?1", [&self.owner_id], |r| r.get(0), ) .optional()? .flatten(); match current { None => { self.conn.execute( "UPDATE users SET oidc_sub = ?1 WHERE id = ?2", (sub, &self.owner_id), )?; Ok(true) } Some(existing) => Ok(existing == sub), } } fn conflicts_list(&self) -> Result> { apply::list_conflicts(&self.conn, &self.owner_id) } fn conflicts_resolve(&mut self, id: &str, choice: &str) -> Result<()> { apply::resolve_conflict(&self.conn, id, choice) } } #[cfg(test)] mod tests { use super::*; use crate::clock::FixedClock; fn store_at(now_ms: i64) -> LocalStore { LocalStore::open_in_memory(Box::new(FixedClock(now_ms))).expect("open in-memory store") } #[test] fn migrations_bring_schema_to_latest() { let store = store_at(0); let v: i64 = store .conn .query_row("PRAGMA user_version", [], |r| r.get(0)) .unwrap(); assert_eq!(v, latest_version()); } #[test] fn resolve_node_matches_exact_title_not_fuzzy() { use crate::model::NewNode; let mut store = store_at(1); let roof = store.create_node(NewNode::doc("Roof", "shingles")).unwrap(); // A fuzzy/FTS match would surface this for "Roof"; exact resolve must not. store .create_node(NewNode::doc("Roofing options", "estimates")) .unwrap(); let got = store.resolve_node("Roof").unwrap().expect("exact title"); assert_eq!(got.id, roof.id); // A prefix is not an exact title — resolves to nothing, never the // fuzzy neighbour. assert!(store.resolve_node("Roo").unwrap().is_none()); // Tombstoned nodes are excluded. store.tombstone_node(&roof.id).unwrap(); assert!(store.resolve_node("Roof").unwrap().is_none()); } #[test] fn opening_twice_is_idempotent_for_the_local_user() { let conn = Connection::open_in_memory().unwrap(); conn.execute_batch("PRAGMA foreign_keys = ON;").unwrap(); migrations::migrate(&conn).unwrap(); let a = ensure_local_user(&conn, &FixedClock(1)).unwrap(); let b = ensure_local_user(&conn, &FixedClock(2)).unwrap(); assert_eq!(a, b); } }