generated from eblume/project-template
All checks were successful
Build / validate (pull_request) Successful in 7m1s
Turn the one-off Todoist importer into a documented, repeatable mise task. Self-contained (spawns its own hephd), dry-run by default into a throwaway store, `-- --commit` writes into the real store after backing it up. Auth via TODOIST_TOKEN or TODOIST_OP_REF (op://). Mapping per design §6.2.1: project hierarchy (+ Inbox→unfiled), priority→attention by meaning, due→do-date, NL recurrence, descriptions + sub-tasks→canonical-context doc. - mise-tasks/import-todoist - docs/how-to/import-todoist.md (+ how-to index, reference mise-tasks table) - changelog fragment Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
310 lines
11 KiB
Text
Executable file
310 lines
11 KiB
Text
Executable file
#!/usr/bin/env -S uv run --script
|
|
# /// script
|
|
# requires-python = ">=3.12"
|
|
# dependencies = []
|
|
# ///
|
|
#MISE description="Import Todoist projects + active tasks into heph (dry-run by default; --commit to write)"
|
|
#USAGE flag "--commit" help="Write into your real heph store (default: a throwaway dry-run)"
|
|
#USAGE flag "--socket <socket>" help="Import via an already-running daemon instead of spawning one"
|
|
"""Import Todoist projects + active tasks into a heph store via the `heph` CLI.
|
|
|
|
A one-way seeding tool (Todoist -> heph); it does not sync back. By default it
|
|
runs a **dry-run** into a throwaway store and prints a summary, so you can see
|
|
exactly what would happen. Pass `--commit` to write into your real store (it
|
|
backs the DB up first).
|
|
|
|
Mapping (see docs/explanation/design.md §6.2.1):
|
|
- project hierarchy -> `heph project add --parent`; Todoist Inbox -> unfiled
|
|
- priority -> attention by MEANING of the owner's convention:
|
|
p1 -> red, p2 -> orange, p4 (default) -> white, p3 (backlog) -> blue
|
|
- due.date -> --do-date; recurring due.string -> --recur (NL parser; a form
|
|
it can't parse imports without recurrence and is reported)
|
|
- description + sub-tasks -> the task's canonical-context doc
|
|
(sub-tasks become `- [ ]` context items, Fork A)
|
|
- labels/sections -> skipped (labels are negligible in practice)
|
|
|
|
Auth: set TODOIST_TOKEN, or TODOIST_OP_REF to a 1Password `op://` reference
|
|
(read via the `op` CLI). Requires installed `heph`/`hephd` on PATH.
|
|
|
|
Usage:
|
|
mise run import-todoist # dry-run, prints a summary
|
|
mise run import-todoist -- --commit # write into your real store (backs up)
|
|
"""
|
|
|
|
import argparse
|
|
import collections
|
|
import contextlib
|
|
import datetime
|
|
import json
|
|
import os
|
|
import shutil
|
|
import subprocess
|
|
import sys
|
|
import tempfile
|
|
import time
|
|
import urllib.parse
|
|
import urllib.request
|
|
from pathlib import Path
|
|
|
|
BASE = "https://api.todoist.com/api/v1"
|
|
# Priority (API: 4=p1 urgent, 3=p2, 2=p3 backlog, 1=p4 default) -> attention,
|
|
# preserving the owner's convention (p3 is backlog, p4 is the normal default).
|
|
ATT = {4: "red", 3: "orange", 1: "white", 2: "blue"}
|
|
|
|
|
|
def fail(msg):
|
|
print(f"error: {msg}", file=sys.stderr)
|
|
sys.exit(1)
|
|
|
|
|
|
def get_token():
|
|
t = os.environ.get("TODOIST_TOKEN", "").strip()
|
|
if t:
|
|
return t
|
|
ref = os.environ.get("TODOIST_OP_REF", "").strip()
|
|
if ref:
|
|
r = subprocess.run(["op", "read", ref], capture_output=True, text=True)
|
|
if r.returncode != 0:
|
|
fail(f"`op read {ref}` failed: {r.stderr.strip()}")
|
|
return r.stdout.strip()
|
|
fail("set TODOIST_TOKEN, or TODOIST_OP_REF to a 1Password op:// reference")
|
|
|
|
|
|
def default_db():
|
|
if os.environ.get("HEPH_DB"):
|
|
return Path(os.environ["HEPH_DB"])
|
|
base = os.environ.get("XDG_DATA_HOME") or os.path.expanduser("~/.local/share")
|
|
return Path(base) / "heph" / "heph.db"
|
|
|
|
|
|
def api(tok, path, **params):
|
|
items, cursor = [], None
|
|
while True:
|
|
p = dict(params)
|
|
if cursor:
|
|
p["cursor"] = cursor
|
|
url = f"{BASE}/{path}" + ("?" + urllib.parse.urlencode(p) if p else "")
|
|
req = urllib.request.Request(url, headers={"Authorization": f"Bearer {tok}"})
|
|
data = json.load(urllib.request.urlopen(req, timeout=30))
|
|
if isinstance(data, list):
|
|
items += data
|
|
break
|
|
items += data.get("results", [])
|
|
cursor = data.get("next_cursor")
|
|
if not cursor:
|
|
break
|
|
return items
|
|
|
|
|
|
@contextlib.contextmanager
|
|
def spawn_daemon(db, socket):
|
|
if shutil.which("hephd") is None:
|
|
fail("`hephd` not found on PATH (install it: see docs/how-to/install-heph.md)")
|
|
proc = subprocess.Popen(
|
|
["hephd", "--mode", "local", "--db", str(db), "--socket", str(socket)],
|
|
stdout=subprocess.PIPE,
|
|
stderr=subprocess.STDOUT,
|
|
text=True,
|
|
)
|
|
try:
|
|
for _ in range(200):
|
|
if Path(socket).exists():
|
|
break
|
|
if proc.poll() is not None:
|
|
out = proc.stdout.read() if proc.stdout else ""
|
|
fail(
|
|
f"hephd failed to start on {db} — is another daemon (or an open "
|
|
f"Neovim) using it? Close it and retry, or pass --socket.\n{out}"
|
|
)
|
|
time.sleep(0.05)
|
|
else:
|
|
fail("hephd did not create its socket in time")
|
|
yield
|
|
finally:
|
|
proc.terminate()
|
|
try:
|
|
proc.wait(timeout=5)
|
|
except subprocess.TimeoutExpired:
|
|
proc.kill()
|
|
|
|
|
|
def heph(socket, *args, stdin=None):
|
|
r = subprocess.run(
|
|
["heph", "--socket", str(socket), *args],
|
|
capture_output=True,
|
|
text=True,
|
|
input=stdin,
|
|
)
|
|
return r.stdout, r.returncode == 0, r.stderr.strip()
|
|
|
|
|
|
def is_inbox(p):
|
|
return p.get("is_inbox_project") or p.get("name", "").lower() == "inbox"
|
|
|
|
|
|
def proj_depth(p, byid):
|
|
d, cur = 0, p
|
|
while cur.get("parent_id") and byid.get(cur["parent_id"]):
|
|
d += 1
|
|
cur = byid[cur["parent_id"]]
|
|
return d
|
|
|
|
|
|
def checklist(parent_id, children, out, level=0):
|
|
for c in sorted(children.get(parent_id, []), key=lambda t: t.get("child_order", 0)):
|
|
out.append(" " * level + f"- [ ] {c['content']}")
|
|
checklist(c["id"], children, out, level + 1)
|
|
|
|
|
|
def run_import(socket, tok):
|
|
projects = api(tok, "projects")
|
|
byid = {p["id"]: p for p in projects}
|
|
inbox_ids = {p["id"] for p in projects if is_inbox(p)}
|
|
pname = {p["id"]: p["name"] for p in projects if p["id"] not in inbox_ids}
|
|
|
|
proj_made = 0
|
|
for p in sorted(projects, key=lambda p: (proj_depth(p, byid), p["name"])):
|
|
if is_inbox(p):
|
|
continue
|
|
args = ["project", "add", p["name"]]
|
|
par = byid.get(p.get("parent_id"))
|
|
if par and not is_inbox(par):
|
|
args += ["--parent", par["name"]]
|
|
_, ok, err = heph(socket, *args)
|
|
if ok:
|
|
proj_made += 1
|
|
else:
|
|
print(f" ! project {p['name']!r}: {err}")
|
|
|
|
tasks = api(tok, "tasks")
|
|
children = collections.defaultdict(list)
|
|
for t in tasks:
|
|
if t.get("parent_id"):
|
|
children[t["parent_id"]].append(t)
|
|
toplevel = [t for t in tasks if not t.get("parent_id")]
|
|
|
|
made = subtasks = described = recurred = 0
|
|
recur_fallback, errors = [], []
|
|
|
|
for t in toplevel:
|
|
args = ["task", t["content"], "-a", ATT.get(t["priority"], "white")]
|
|
if pname.get(t["project_id"]):
|
|
args += ["--project", pname[t["project_id"]]]
|
|
due = t.get("due") or {}
|
|
if due.get("date"):
|
|
args += ["--do-date", due["date"][:10]]
|
|
rec = due["string"] if due.get("is_recurring") and due.get("string") else None
|
|
out, ok, err = heph(socket, *(args + (["--recur", rec] if rec else [])))
|
|
if not ok and rec: # retry without the unparseable recurrence
|
|
recur_fallback.append((t["content"], rec))
|
|
out, ok, err = heph(socket, *args)
|
|
elif ok and rec:
|
|
recurred += 1
|
|
if not ok:
|
|
errors.append((t["content"], err))
|
|
continue
|
|
made += 1
|
|
node_id = out.split()[2]
|
|
|
|
body = []
|
|
if t.get("description"):
|
|
body.append(t["description"])
|
|
kids = []
|
|
checklist(t["id"], children, kids)
|
|
if kids:
|
|
body.append("\n".join(kids))
|
|
subtasks += len(kids)
|
|
if body:
|
|
links, _, _ = heph(socket, "links", node_id)
|
|
ctx = next(
|
|
(
|
|
ln.split("-> ")[-1].strip()
|
|
for ln in links.splitlines()
|
|
if "[canonical-context]" in ln
|
|
),
|
|
None,
|
|
)
|
|
if ctx:
|
|
_, uok, uerr = heph(
|
|
socket, "node", "update", ctx, "--body", "-", stdin="\n\n".join(body)
|
|
)
|
|
if uok:
|
|
described += 1
|
|
else:
|
|
errors.append((t["content"] + " (desc)", uerr))
|
|
|
|
return {
|
|
"projects": (proj_made, len(projects) - len(inbox_ids)),
|
|
"tasks": (made, len(toplevel)),
|
|
"subtasks": subtasks,
|
|
"described": described,
|
|
"recurred": recurred,
|
|
"recur_fallback": recur_fallback,
|
|
"errors": errors,
|
|
}
|
|
|
|
|
|
def report(s):
|
|
print("\n=== IMPORT SUMMARY ===")
|
|
print(f"projects created : {s['projects'][0]}/{s['projects'][1]}")
|
|
print(f"tasks created : {s['tasks'][0]}/{s['tasks'][1]} top-level")
|
|
print(f"sub-tasks : {s['subtasks']} attached as context items")
|
|
print(f"descriptions : {s['described']} context docs written")
|
|
print(
|
|
f"recurrences : {s['recurred']} applied, "
|
|
f"{len(s['recur_fallback'])} fell back"
|
|
)
|
|
if s["recur_fallback"]:
|
|
print("\n-- recurrence strings heph couldn't parse (imported without recurrence):")
|
|
for c, r in s["recur_fallback"]:
|
|
print(f" {r!r:30} — {c[:50]}")
|
|
if s["errors"]:
|
|
print(f"\n-- {len(s['errors'])} errors:")
|
|
for c, e in s["errors"][:20]:
|
|
print(f" {c[:50]}: {e}")
|
|
|
|
|
|
def main():
|
|
ap = argparse.ArgumentParser(add_help=False)
|
|
ap.add_argument("--commit", action="store_true")
|
|
ap.add_argument("--socket", default=None)
|
|
args = ap.parse_args()
|
|
|
|
if shutil.which("heph") is None:
|
|
fail("`heph` not found on PATH (install it: see docs/how-to/install-heph.md)")
|
|
tok = get_token()
|
|
|
|
if args.socket:
|
|
target = "an existing daemon" if args.commit else "an existing daemon (dry-run flag ignored)"
|
|
print(f"Importing into {target} at {args.socket} …")
|
|
report(run_import(args.socket, tok))
|
|
return
|
|
|
|
if args.commit:
|
|
db = default_db()
|
|
if not db.parent.exists():
|
|
db.parent.mkdir(parents=True, exist_ok=True)
|
|
if db.exists():
|
|
stamp = datetime.datetime.now().strftime("%Y%m%d-%H%M%S")
|
|
bak = db.with_name(db.name + f".bak-{stamp}")
|
|
shutil.copy2(db, bak)
|
|
print(f"backed up {db} → {bak}")
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
sock = Path(tmp) / "import.sock"
|
|
print(f"COMMIT: importing into your real store {db} …")
|
|
with spawn_daemon(db, sock):
|
|
report(run_import(sock, tok))
|
|
return
|
|
|
|
# Default: a throwaway dry-run.
|
|
with tempfile.TemporaryDirectory() as tmp:
|
|
db = Path(tmp) / "heph.db"
|
|
sock = Path(tmp) / "d.sock"
|
|
print(f"DRY RUN: importing into a throwaway store ({db}); nothing real is touched.")
|
|
print(" re-run with `-- --commit` to write into your real store.")
|
|
with spawn_daemon(db, sock):
|
|
report(run_import(sock, tok))
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|