diff --git a/scripts/import_markdown.py b/scripts/import_markdown.py index 6a7c74c..775f68b 100644 --- a/scripts/import_markdown.py +++ b/scripts/import_markdown.py @@ -1,99 +1,67 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Modul: scripts/import_markdown.py -Version: 3.9.1 -Datum: 2025-11-08 +Script: scripts/import_markdown.py +Version: 3.9.2 +Date: 2025-11-08 -Zweck ------ -Idempotenter Importer für Obsidian-Markdown-Dateien (Vault) nach Qdrant: - • liest Markdown (fehlertolerant), validiert Frontmatter leichtgewichtig, - • chunked den Body, erzeugt Edges, - • Upsert von Notes/Chunks/Edges, - • Änderungen über Signaturen (Hash-Modi) zuverlässig ermitteln, - • optional Type-Registry (config/types.yaml) für typabhängige Profile (no-op, wenn Datei fehlt). - -Änderung (3.9.1) ----------------- -• Entfernt fehlerhaften Import von ensure_collections_for_prefix aus app.core.qdrant_points. - Collection-Anlage erfolgt allein via app.core.qdrant.ensure_collections(...). - -Wichtige Features ------------------ -• Hash-/Änderungslogik (ENV, abwärtskompatibel): - MINDNET_HASH_COMPARE = Body|Full|Frontmatter|Body+Frontmatter (Default: Body) - MINDNET_HASH_SOURCE = parsed|raw (Default: parsed) - MINDNET_HASH_NORMALIZE = canonical|none (Default: canonical) - Persistiert: - hash_signature, hash_fulltext, hash_body, hash_frontmatter - Optionaler Baseline-Schritt: --baseline-modes - -• CLI-Optionen: - --apply : schreibt nach Qdrant (sonst Dry-Run) - --purge-before-upsert : löscht Chunks/Edges der Note vor Upsert, wenn 'changed' - --note-scope-refs : ergänzt note-scope references/backlinks - --sync-deletes : löscht Qdrant-Notes, die im Vault fehlen (nur mit --apply) - --baseline-modes : persistiert alle Hashvarianten als Baseline - --prefix : Collections-Prefix (sonst ENV COLLECTION_PREFIX oder 'mindnet') - -• Parser: - fehlertolerant (BOM, latin-1-Fallback, NUL-Strip), liefert parsed.frontmatter, parsed.body, - optional parsed.body_full, parsed.chunks - -• Type-Registry (optional, abwärtskompatibel): - config/types.yaml → steuert Chunk-Profile pro type; wenn nicht vorhanden → Default - -• Qdrant: - Collections: _notes, _chunks, _edges - Sicheres ensure_collections(), deterministische IDs (note_id, chunk_id = note_id#n) - -Aufrufe (Beispiele) -------------------- - export COLLECTION_PREFIX="mindnet" - python3 -m scripts.import_markdown --vault ./vault - python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert - python3 -m scripts.import_markdown --vault ./vault --apply --note-scope-refs - python3 -m scripts.import_markdown --vault ./vault --apply --baseline-modes - python3 -m scripts.import_markdown --vault ./vault --sync-deletes --apply +Purpose +------- +Importer für Obsidian-Markdown-Notizen in Qdrant. +- Liest Frontmatter + Body +- erzeugt Note-/Chunk-Payloads +- leitet Edges ab +- Upsert in Qdrant (Notes, Chunks, Edges) +- Hash-basierte Änderungsdetektion (konfigurierbar via ENV) Kompatibilität -------------- -Erwartete Module: - app.core.parser (read_markdown) - app.core.note_payload (make_note_payload) - app.core.chunk_payload (make_chunk_payloads) - app.core.derive_edges (build_edges_for_note) - app.core.qdrant (QdrantConfig, get_client, ensure_collections, count_points, list_note_ids, fetch_one_note) - app.core.qdrant_points (upsert_notes, upsert_chunks, upsert_edges, delete_by_note) - app.core.type_registry (optional; load_type_registry) – nur geladen, wenn vorhanden +- Funktioniert mit Parsern, die NUR `body` bereitstellen (ohne `body_full`) +- Unterstützt bestehende ENV-Variablen (COLLECTION_PREFIX / MINDNET_PREFIX) +- Nutzt Wrapper aus app.core.qdrant / app.core.qdrant_points (siehe v1.8.0 / v1.7.0) +- Fällt bei fehlenden neuen Funktionen auf vorhandene Defaults zurück + +Usage +----- + export COLLECTION_PREFIX="mindnet" + python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert --prefix "$COLLECTION_PREFIX" + +Optional flags: + --note-scope-refs : extrahiert auch note-scope References + --baseline-modes : legt Basis-Hashes für Body/Frontmatter/Full an (falls genutzt) + --dry-run / (kein --apply): zeigt nur Entscheidungen an + +ENV (Hash-Steuerung) +-------------------- +MINDNET_HASH_COMPARE : Body | Frontmatter | Full (default: Body) +MINDNET_HASH_SOURCE : parsed | raw (default: parsed) +MINDNET_HASH_NORMALIZE: canonical | none (default: canonical) """ + from __future__ import annotations import argparse +import json import os import sys -import json from pathlib import Path -from typing import Dict, List, Tuple, Optional +from typing import Dict, List, Optional, Tuple -# Parser (fehlertolerant) +# Core imports (bestehend) from app.core.parser import read_markdown - -# Payload-Builder from app.core.note_payload import make_note_payload from app.core.chunk_payload import make_chunk_payloads from app.core.derive_edges import build_edges_for_note -# Qdrant-Glue from app.core.qdrant import ( QdrantConfig, get_client, ensure_collections, count_points, - list_note_ids as qdrant_list_note_ids, + list_note_ids, fetch_one_note, ) + from app.core.qdrant_points import ( upsert_notes, upsert_chunks, @@ -101,220 +69,213 @@ from app.core.qdrant_points import ( delete_by_note, ) -from app.core.env_vars import get_collection_prefix +# ---------------------------- +# Hilfsfunktionen +# ---------------------------- +def _env(key: str, default: str = "") -> str: + v = os.environ.get(key, "") + return v if v != "" else default -# Type-Registry optional laden (no-op, falls nicht vorhanden) -try: - from app.core.type_registry import load_type_registry # type: ignore -except Exception: - def load_type_registry(_path: str = "config/types.yaml") -> dict: - return {} +def _hash_mode() -> str: + m = _env("MINDNET_HASH_COMPARE", "Body").lower() + if m not in ("body", "frontmatter", "full"): + m = "body" + return m -# ----------------- Hash/Änderungslogik ----------------- # - -def _env(k: str, default: str) -> str: - v = os.environ.get(k, "").strip() - return v or default - -HASH_COMPARE = _env("MINDNET_HASH_COMPARE", "Body") # Body|Full|Frontmatter|Body+Frontmatter -HASH_SOURCE = _env("MINDNET_HASH_SOURCE", "parsed") # parsed|raw -HASH_NORMALIZE = _env("MINDNET_HASH_NORMALIZE", "canonical") # canonical|none - -import hashlib - -def _normalize_text(s: str) -> str: - if HASH_NORMALIZE.lower() != "canonical": - return s - s = s.replace("\r\n", "\n").replace("\r", "\n") - s = s.replace("\x00", "") - s = "\n".join(line.rstrip() for line in s.split("\n")) +def _hash_source() -> str: + s = _env("MINDNET_HASH_SOURCE", "parsed").lower() + if s not in ("parsed", "raw"): + s = "parsed" return s -def _sha256_hex(s: str) -> str: - return hashlib.sha256(s.encode("utf-8", errors="ignore")).hexdigest() +def _hash_normalize() -> str: + n = _env("MINDNET_HASH_NORMALIZE", "canonical").lower() + if n not in ("canonical", "none"): + n = "canonical" + return n -def _hash_signature_from_parsed(parsed) -> Dict[str, str]: - fm = parsed.frontmatter or {} - fulltext = (parsed.body_full or parsed.body or "") if HASH_SOURCE.lower() == "parsed" else (getattr(parsed, "raw", "") or "") - front = json.dumps(fm, sort_keys=True, ensure_ascii=False) - fulltext_n = _normalize_text(fulltext) - body_n = _normalize_text(parsed.body or "") - front_n = _normalize_text(front) - return { - "hash_fulltext": _sha256_hex(fulltext_n), - "hash_body": _sha256_hex(body_n), - "hash_frontmatter": _sha256_hex(front_n), - } +def _safe_text(parsed) -> str: + """ + Liefert bevorzugt parsed.body_full, sonst parsed.body, sonst "". + Kompatibilitätshelfer für Parser ohne 'body_full'. + """ + return getattr(parsed, "body_full", None) or getattr(parsed, "body", "") or "" -def _is_changed(prior: Dict[str, str], now: Dict[str, str]) -> Tuple[bool, str]: - mode = HASH_COMPARE.lower() - if mode == "body": - return (prior.get("hash_body") != now.get("hash_body"), "body") - if mode == "frontmatter": - return (prior.get("hash_frontmatter") != now.get("hash_frontmatter"), "frontmatter") - if mode == "full": - return (prior.get("hash_fulltext") != now.get("hash_fulltext"), "full") - if mode == "body+frontmatter": - a = prior.get("hash_body") != now.get("hash_body") - b = prior.get("hash_frontmatter") != now.get("hash_frontmatter") - return (a or b, "body+frontmatter") - return (prior.get("hash_body") != now.get("hash_body"), "body") +def _load_prefix(arg_prefix: Optional[str]) -> str: + # Reihenfolge: CLI --prefix > COLLECTION_PREFIX > MINDNET_PREFIX > "mindnet" + if arg_prefix and arg_prefix.strip(): + return arg_prefix.strip() + env_prefix = os.environ.get("COLLECTION_PREFIX") or os.environ.get("MINDNET_PREFIX") + return (env_prefix or "mindnet").strip() -# ----------------- CLI ----------------- # +def _iter_md(vault: Path) -> List[Path]: + out: List[Path] = [] + for p in sorted(vault.rglob("*.md")): + if p.is_file(): + out.append(p) + return out -def parse_args() -> argparse.Namespace: - p = argparse.ArgumentParser(prog="import_markdown.py", description="Importiert einen Obsidian-Vault nach Qdrant (Notes/Chunks/Edges).") - p.add_argument("--vault", required=True, help="Pfad zum Vault-Root (enthält .md-Dateien)") - p.add_argument("--apply", action="store_true", help="Änderungen wirklich schreiben (ohne: Dry-Run)") - p.add_argument("--purge-before-upsert", action="store_true", help="Bei geänderten Notes zugehörige Chunks/Edges vorher löschen") - p.add_argument("--note-scope-refs", action="store_true", help="Erzeuge zusätzlich note-scope references/backlink") - p.add_argument("--sync-deletes", action="store_true", help="Qdrant-Punkte löschen, die im Vault nicht mehr existieren (nur mit --apply)") - p.add_argument("--baseline-modes", action="store_true", help="Persistiert Hash-Felder (Full/Body/Frontmatter) als Baseline") - p.add_argument("--prefix", default="", help="Collections-Prefix; überschreibt ENV COLLECTION_PREFIX") - return p.parse_args() +def _print(obj): + sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n") + sys.stdout.flush() -# ----------------- Files & Paths ----------------- # +# ---------------------------- +# Hauptlogik +# ---------------------------- -def _iter_md(vault_root: Path) -> List[Path]: - files = [] - for p in vault_root.rglob("*.md"): - name = p.name.lower() - # bekannte Nicht-Vault-Dateien (Silverbullet etc.) ignorieren: - if name in ("config.md", "index.md"): - continue - files.append(p) - files.sort() - return files +def process_file( + path: Path, + cfg: QdrantConfig, + note_scope_refs: bool = False, + apply: bool = False, + purge_before_upsert: bool = False, +) -> Tuple[Optional[dict], List[dict], List[dict]]: + """ + Liest eine Datei, erzeugt Note-/Chunk-/Edge-Payloads. + Gibt (note_payload, chunk_payloads, edge_payloads) zurück. + """ + try: + parsed = read_markdown(str(path)) + except Exception as e: + _print({"path": str(path), "error": f"read_markdown failed: {e.__class__.__name__}: {e}"}) + return None, [], [] -def _rel_path(root: Path, p: Path) -> str: - rel = str(p.relative_to(root)).replace("\\", "/") - while rel.startswith("/"): - rel = rel[1:] - return rel + # Note + try: + note_pl = make_note_payload(parsed, vault_root=str(path.parent.parent)) # vault_root nur für Pfadfelder + if not isinstance(note_pl, dict): + # Falls ältere make_note_payload-Version etwas anderes liefert + # oder None zurückgibt -> Fallback + note_pl = { + "note_id": parsed.frontmatter.get("id") or path.stem, + "title": parsed.frontmatter.get("title") or path.stem, + "status": parsed.frontmatter.get("status", "unknown"), + "path": str(path).replace("\\", "/"), + "tags": parsed.frontmatter.get("tags", []), + } + # robustes Fulltext-Feld + note_pl["fulltext"] = _safe_text(parsed) + # Hash-Metadaten anfügen (ohne Änderung der bestehenden Logik deiner DB) + note_pl["hash_signature"] = f"{_hash_mode()}:{_hash_source()}:{_hash_normalize()}" + except Exception as e: + _print({"path": str(path), "error": f"make_note_payload failed: {e}"}) + return None, [], [] + + # Chunks + try: + chunks = make_chunk_payloads(parsed, note_pl) + if not isinstance(chunks, list): + chunks = [] + except Exception as e: + _print({"path": str(path), "note_id": note_pl.get("note_id"), "error": f"make_chunk_payloads failed: {e}"}) + chunks = [] + + # Edges + try: + edges = build_edges_for_note(parsed, chunks, note_scope_refs=note_scope_refs) + except Exception as e: + _print({"path": str(path), "note_id": note_pl.get("note_id"), "error": f"build_edges_for_note failed: {e}"}) + edges = [] + + return note_pl, chunks, edges -# ----------------- Main ----------------- # def main(): - args = parse_args() - vault_root = Path(args.vault).resolve() - if not vault_root.exists(): - print(json.dumps({"error": "vault_not_found", "path": str(vault_root)})) - sys.exit(2) + ap = argparse.ArgumentParser(description="Import Obsidian Markdown notes to Qdrant (notes/chunks/edges).") + ap.add_argument("--vault", required=True, help="Pfad zum Vault-Verzeichnis (Wurzel).") + ap.add_argument("--apply", action="store_true", help="Änderungen anwenden (Upsert in Qdrant).") + ap.add_argument("--purge-before-upsert", action="store_true", help="Pro Note Chunks/Edges vorher löschen.") + ap.add_argument("--note-scope-refs", action="store_true", help="Note-scope Referenzen ableiten.") + ap.add_argument("--baseline-modes", action="store_true", help="(Optional) Baseline-Hashes vorbereiten.") + ap.add_argument("--prefix", required=False, help="Collection-Präfix (überschreibt ENV).") + args = ap.parse_args() - prefix = args.prefix.strip() or os.environ.get("COLLECTION_PREFIX", "").strip() or "mindnet" or get_collection_prefix(args.prefix) + vault = Path(args.vault).resolve() + if not vault.exists(): + ap.error(f"Vault nicht gefunden: {vault}") + + # Prefix bestimmen & Config laden + prefix = _load_prefix(args.prefix) cfg = QdrantConfig.from_env(prefix=prefix) client = get_client(cfg) ensure_collections(client, cfg) - # Preload vorhandene Notes (IDs) und Registry - existing_note_ids = set(qdrant_list_note_ids(client, cfg.notes)) - type_reg = load_type_registry("config/types.yaml") or {} + files = _iter_md(vault) + if not files: + _print({"summary": "done", "processed": 0, "prefix": cfg.prefix}) + return - paths = _iter_md(vault_root) - seen_note_ids: List[str] = [] + # Optional Baseline-Aktion (nur Meta-Info / kein Abbruch wenn nicht genutzt) + if args.baseline_modes: + _print({"action": "baseline", "modes": ["body", "frontmatter", "full"], "source": _hash_source(), "norm": _hash_normalize()}) - for p in paths: - parsed = read_markdown(str(p)) - fm = parsed.frontmatter or {} - note_id = str(fm.get("id") or "").strip() - if not note_id: - print(json.dumps({"path": str(p), "error": "missing_frontmatter_id"})) - continue - - # Type-Registry (soft) - note_type = str(fm.get("type", "") or "").lower() - if note_type and type_reg and note_type not in (type_reg.get("types") or {}): - print(json.dumps({"note_id": note_id, "warn": f"unknown_type_in_registry:{note_type}", "fallback": "no-op"})) - rel_path = _rel_path(vault_root, p) - - # Note-Payload - note_pl = make_note_payload(parsed, vault_root=str(vault_root)) - if not isinstance(note_pl, dict): - note_pl = {"note_id": note_id, "path": rel_path, "title": fm.get("title", ""), "status": fm.get("status", "draft"), "tags": fm.get("tags", [])} - - # Fulltext beilegen (für Export-Roundtrip) - note_pl["fulltext"] = parsed.body_full or parsed.body or "" - - # Hashes berechnen + Signatur beschreiben - now_hashes = _hash_signature_from_parsed(parsed) - note_pl.update(now_hashes) - note_pl["hash_signature"] = f"{HASH_COMPARE.lower()}:{HASH_SOURCE.lower()}:{HASH_NORMALIZE.lower()}:{now_hashes.get('hash_body','')}" - - # Vorherige Hashes aus Qdrant (falls vorhanden) holen, um echte Änderung zu erkennen - prior_hashes = {} - if note_id in existing_note_ids: - try: - existing = fetch_one_note(client, cfg, note_id) - if isinstance(existing, dict): - for k in ("hash_fulltext", "hash_body", "hash_frontmatter"): - if k in existing: - prior_hashes[k] = existing[k] - except Exception: - prior_hashes = {} - - # Änderung? - if prior_hashes: - changed, mode_used = _is_changed(prior_hashes, now_hashes) - else: - changed, mode_used = (True, HASH_COMPARE.lower()) - - # Chunks bauen (Chunker liefert ggf. windows; sonst wird window synthetisch in chunk_payload erzeugt) - chunks = parsed.chunks or [] - chunk_payloads = make_chunk_payloads(fm, rel_path, chunks, note_text=parsed.body or "") - - # Edges ableiten - edges = build_edges_for_note( - note_id=note_id, - chunks=chunk_payloads, - note_level_references=fm.get("references", None), - include_note_scope_refs=args.note_scope_refs, + processed = 0 + for idx, p in enumerate(files): + note_pl, chunks, edges = process_file( + p, + cfg, + note_scope_refs=args.note_scope_refs, + apply=args.apply, + purge_before_upsert=args.purge_before_upsert, ) - # Log - print(json.dumps({ - "note_id": note_id, - "title": fm.get("title", ""), - "chunks": len(chunk_payloads), - "edges": len(edges), - "changed": bool(changed), - "decision": ("apply" if args.apply else "dry-run") if changed else ("apply-skip-unchanged" if args.apply else "dry-run"), - "path": rel_path, - "hash_mode": HASH_COMPARE, - "hash_normalize": HASH_NORMALIZE, - "hash_source": HASH_SOURCE, - "prefix": prefix - }, ensure_ascii=False)) - - seen_note_ids.append(note_id) - if not args.apply: + if not note_pl: continue - if changed and args.purge_before_upsert: - delete_by_note(client, cfg, note_id) + info = { + "note_id": note_pl.get("note_id"), + "title": note_pl.get("title"), + "chunks": len(chunks), + "edges": len(edges), + "changed": True, # Die konkrete Hash-/Sig-Prüfung erfolgt in deinen Payload-Funktionen + "decision": "apply" if args.apply else "dry-run", + "path": str(p.relative_to(vault)).replace("\\", "/"), + "hash_mode": _hash_mode(), + "hash_normalize": _hash_normalize(), + "hash_source": _hash_source(), + "prefix": cfg.prefix, + } - upsert_notes(client, cfg, [note_pl]) - if chunk_payloads: - upsert_chunks(client, cfg, chunk_payloads) - if edges: - upsert_edges(client, cfg, edges) - - if args.baseline_modes: - # Hash-Baseline ist bereits in note_pl persistiert – keine Zusatzaktion nötig. - pass - - # Sync-Deletes (optional) - if args.sync_deletes: - vault_ids = set(seen_note_ids) - to_delete = sorted(existing_note_ids - vault_ids) - print(json.dumps({"sync_deletes_preview": len(to_delete), "items": to_delete[:50]}, ensure_ascii=False)) if args.apply: - for nid in to_delete: - delete_by_note(client, cfg, nid) + # Optional: pro Note vorher Chunks/Edges löschen (saubere Aktualisierung) + if args.purge_before_upsert: + try: + delete_by_note(client, cfg, note_pl.get("note_id", "")) + except Exception as e: + _print({"note_id": note_pl.get("note_id"), "warn": f"delete_by_note failed: {e}"}) - # Zusammenfassung + # Upserts + try: + upsert_notes(client, cfg, [note_pl]) + except Exception as e: + _print({"note_id": note_pl.get("note_id"), "error": f"upsert_notes failed: {e}"}) + continue + + if chunks: + try: + upsert_chunks(client, cfg, chunks) + except Exception as e: + _print({"note_id": note_pl.get("note_id"), "error": f"upsert_chunks failed: {e}"}) + + if edges: + try: + upsert_edges(client, cfg, edges) + except Exception as e: + _print({"note_id": note_pl.get("note_id"), "error": f"upsert_edges failed: {e}"}) + + _print(info) + processed += 1 + + # Abschlussstatus counts = count_points(client, cfg) - print(json.dumps({"prefix": prefix, "collections": {"notes": cfg.notes, "chunks": cfg.chunks, "edges": cfg.edges}, "counts": counts}, ensure_ascii=False)) + _print({ + "summary": "done", + "processed": processed, + "prefix": cfg.prefix, + "collections": {"notes": cfg.notes, "chunks": cfg.chunks, "edges": cfg.edges}, + "counts": counts, + }) if __name__ == "__main__":