diff --git a/scripts/import_markdown.py b/scripts/import_markdown.py index 775f68b..6689fbd 100644 --- a/scripts/import_markdown.py +++ b/scripts/import_markdown.py @@ -2,66 +2,60 @@ # -*- coding: utf-8 -*- """ Script: scripts/import_markdown.py -Version: 3.9.2 +Version: 3.9.3 Date: 2025-11-08 -Purpose -------- -Importer für Obsidian-Markdown-Notizen in Qdrant. -- Liest Frontmatter + Body -- erzeugt Note-/Chunk-Payloads -- leitet Edges ab -- Upsert in Qdrant (Notes, Chunks, Edges) -- Hash-basierte Änderungsdetektion (konfigurierbar via ENV) +Zweck +----- +Importer für Obsidian-Markdown-Notizen in Qdrant: +- Einlesen (Frontmatter/Body) +- Chunking (unterstützt alte und neue Chunk-Pipelines) +- Edges ableiten (kompatibel zu alten derive_edges-Signaturen) +- Hash-Detektion (ENV-gesteuert) +- Upsert Notes/Chunks/Edges (inkl. Notes-Vector, falls Collection Vektor verlangt) Kompatibilität -------------- -- Funktioniert mit Parsern, die NUR `body` bereitstellen (ohne `body_full`) -- Unterstützt bestehende ENV-Variablen (COLLECTION_PREFIX / MINDNET_PREFIX) -- Nutzt Wrapper aus app.core.qdrant / app.core.qdrant_points (siehe v1.8.0 / v1.7.0) -- Fällt bei fehlenden neuen Funktionen auf vorhandene Defaults zurück - -Usage ------ - export COLLECTION_PREFIX="mindnet" - python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert --prefix "$COLLECTION_PREFIX" - -Optional flags: - --note-scope-refs : extrahiert auch note-scope References - --baseline-modes : legt Basis-Hashes für Body/Frontmatter/Full an (falls genutzt) - --dry-run / (kein --apply): zeigt nur Entscheidungen an +- Parser mit/ohne `body_full` +- `make_chunk_payloads(parsed, note_pl, chunks)` ODER ältere Signaturen +- `build_edges_for_note(parsed, chunks)` ODER neue Signaturen (optional mit note_scope_refs) +- Qdrant-Collections mit/ohne Vektorerwartung; Notes erhalten notfalls einen Zero-Vector +- Prefix-Auflösung: CLI --prefix > COLLECTION_PREFIX > MINDNET_PREFIX > "mindnet" ENV (Hash-Steuerung) -------------------- -MINDNET_HASH_COMPARE : Body | Frontmatter | Full (default: Body) -MINDNET_HASH_SOURCE : parsed | raw (default: parsed) -MINDNET_HASH_NORMALIZE: canonical | none (default: canonical) +MINDNET_HASH_COMPARE : Body | Frontmatter | Full (default: Body) +MINDNET_HASH_SOURCE : parsed | raw (default: parsed) +MINDNET_HASH_NORMALIZE : canonical | none (default: canonical) + +Weitere ENV +----------- +MINDNET_NOTE_VECTOR_D : Dimension des Note-Vektors (default: aus QdrantConfig oder 384) """ from __future__ import annotations import argparse +import inspect import json import os import sys from pathlib import Path from typing import Dict, List, Optional, Tuple -# Core imports (bestehend) +# Parser & Payloads from app.core.parser import read_markdown from app.core.note_payload import make_note_payload from app.core.chunk_payload import make_chunk_payloads from app.core.derive_edges import build_edges_for_note +# Qdrant Helper from app.core.qdrant import ( QdrantConfig, get_client, ensure_collections, count_points, - list_note_ids, - fetch_one_note, ) - from app.core.qdrant_points import ( upsert_notes, upsert_chunks, @@ -69,8 +63,15 @@ from app.core.qdrant_points import ( delete_by_note, ) +# Optionales Chunk-Assembly (neuere Pipeline) +try: + from app.core.chunker import assemble_chunks # bevorzugt +except Exception: # pragma: no cover + assemble_chunks = None + + # ---------------------------- -# Hilfsfunktionen +# Utilities # ---------------------------- def _env(key: str, default: str = "") -> str: @@ -79,36 +80,30 @@ def _env(key: str, default: str = "") -> str: def _hash_mode() -> str: m = _env("MINDNET_HASH_COMPARE", "Body").lower() - if m not in ("body", "frontmatter", "full"): - m = "body" - return m + return m if m in ("body", "frontmatter", "full") else "body" def _hash_source() -> str: s = _env("MINDNET_HASH_SOURCE", "parsed").lower() - if s not in ("parsed", "raw"): - s = "parsed" - return s + return s if s in ("parsed", "raw") else "parsed" def _hash_normalize() -> str: n = _env("MINDNET_HASH_NORMALIZE", "canonical").lower() - if n not in ("canonical", "none"): - n = "canonical" - return n + return n if n in ("canonical", "none") else "canonical" def _safe_text(parsed) -> str: - """ - Liefert bevorzugt parsed.body_full, sonst parsed.body, sonst "". - Kompatibilitätshelfer für Parser ohne 'body_full'. - """ + """Bevorzugt parsed.body_full, sonst parsed.body.""" return getattr(parsed, "body_full", None) or getattr(parsed, "body", "") or "" def _load_prefix(arg_prefix: Optional[str]) -> str: - # Reihenfolge: CLI --prefix > COLLECTION_PREFIX > MINDNET_PREFIX > "mindnet" if arg_prefix and arg_prefix.strip(): return arg_prefix.strip() env_prefix = os.environ.get("COLLECTION_PREFIX") or os.environ.get("MINDNET_PREFIX") return (env_prefix or "mindnet").strip() +def _print(obj): + sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n") + sys.stdout.flush() + def _iter_md(vault: Path) -> List[Path]: out: List[Path] = [] for p in sorted(vault.rglob("*.md")): @@ -116,37 +111,130 @@ def _iter_md(vault: Path) -> List[Path]: out.append(p) return out -def _print(obj): - sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n") - sys.stdout.flush() +def _note_vector_dim(cfg: QdrantConfig) -> int: + # Bevorzugt Konfig, sonst ENV, sonst 384 + # Viele Setups nutzen 384 (MiniLM 384d) + dim = getattr(cfg, "notes_vector_dim", None) + if isinstance(dim, int) and dim > 0: + return dim + env_dim = _env("MINDNET_NOTE_VECTOR_D", "") + if env_dim.isdigit(): + try: + d = int(env_dim) + if d > 0: + return d + except Exception: + pass + return 384 + +def _ensure_note_vector(note_pl: Dict, cfg: QdrantConfig) -> None: + # Falls die Notes-Collection einen dichten Vektor verlangt, muss `vector` gesetzt sein. + # Wir setzen einen Zero-Vector mit korrekter Dimension. + if "vector" not in note_pl or note_pl["vector"] is None: + d = _note_vector_dim(cfg) + note_pl["vector"] = [0.0] * d + # ---------------------------- -# Hauptlogik +# Signatur-kompatible Aufrufe +# ---------------------------- + +def _call_make_chunk_payloads(parsed, note_pl, raw_chunks: Optional[List[Dict]] = None) -> List[Dict]: + """ + Ruft make_chunk_payloads mit der passenden Signatur auf. + Historisch gab es Varianten: + A) make_chunk_payloads(parsed, note_pl, chunks) + B) make_chunk_payloads(parsed, note_pl) + C) make_chunk_payloads(note_pl, chunks) + Wir erkennen das zur Laufzeit. + """ + sig = inspect.signature(make_chunk_payloads) + params = list(sig.parameters.keys()) + + # Versuche die plausibelste moderne Variante zuerst + try_order = [] + + if params[:3] == ["parsed", "note_pl", "chunks"]: + try_order = [("parsed_note_chunks",)] + elif params[:2] == ["parsed", "note_pl"]: + try_order = [("parsed_note",)] + elif params[:2] == ["note_pl", "chunks"]: + try_order = [("note_chunks",)] + else: + # generischer Fallback: wir probieren die drei Muster + try_order = [("parsed_note_chunks",), ("parsed_note",), ("note_chunks",)] + + last_err = None + for variant in try_order: + try: + if variant == ("parsed_note_chunks",): + if raw_chunks is None: + # wenn Signatur die Chunks erwartet, aber keine vorhanden sind, baue konservativ 1 Chunk + raw_chunks = [{ + "chunk_id": f"{note_pl.get('note_id', 'note')}#1", + "text": _safe_text(parsed), + "window": _safe_text(parsed), + "order": 1, + "path": note_pl.get("path", ""), + }] + return make_chunk_payloads(parsed, note_pl, raw_chunks) # type: ignore + elif variant == ("parsed_note",): + return make_chunk_payloads(parsed, note_pl) # type: ignore + elif variant == ("note_chunks",): + if raw_chunks is None: + raw_chunks = [{ + "chunk_id": f"{note_pl.get('note_id', 'note')}#1", + "text": _safe_text(parsed), + "window": _safe_text(parsed), + "order": 1, + "path": note_pl.get("path", ""), + }] + return make_chunk_payloads(note_pl, raw_chunks) # type: ignore + except Exception as e: + last_err = e + + raise RuntimeError(f"make_chunk_payloads invocation failed: {last_err}") + +def _call_build_edges_for_note(parsed, chunk_payloads: List[Dict], note_scope_refs: bool) -> List[Dict]: + """ + Ruft build_edges_for_note mit kompatibler Signatur auf. + Historisch: + A) build_edges_for_note(parsed, chunks) + B) build_edges_for_note(parsed, chunks, note_scope_refs=True/False) + """ + sig = inspect.signature(build_edges_for_note) + params = list(sig.parameters.keys()) + try: + if "note_scope_refs" in params: + return build_edges_for_note(parsed, chunk_payloads, note_scope_refs=note_scope_refs) # type: ignore + else: + return build_edges_for_note(parsed, chunk_payloads) # type: ignore + except TypeError: + # strenger Fallback: ohne Zusatzparameter + return build_edges_for_note(parsed, chunk_payloads) # type: ignore + + +# ---------------------------- +# Hauptverarbeitung # ---------------------------- def process_file( path: Path, cfg: QdrantConfig, - note_scope_refs: bool = False, - apply: bool = False, - purge_before_upsert: bool = False, + note_scope_refs: bool, + apply: bool, + purge_before_upsert: bool, ) -> Tuple[Optional[dict], List[dict], List[dict]]: - """ - Liest eine Datei, erzeugt Note-/Chunk-/Edge-Payloads. - Gibt (note_payload, chunk_payloads, edge_payloads) zurück. - """ try: parsed = read_markdown(str(path)) except Exception as e: _print({"path": str(path), "error": f"read_markdown failed: {e.__class__.__name__}: {e}"}) return None, [], [] - # Note + # Note-Payload try: - note_pl = make_note_payload(parsed, vault_root=str(path.parent.parent)) # vault_root nur für Pfadfelder + note_pl = make_note_payload(parsed, vault_root=str(path.parent.parent)) if not isinstance(note_pl, dict): - # Falls ältere make_note_payload-Version etwas anderes liefert - # oder None zurückgibt -> Fallback note_pl = { "note_id": parsed.frontmatter.get("id") or path.stem, "title": parsed.frontmatter.get("title") or path.stem, @@ -154,31 +242,44 @@ def process_file( "path": str(path).replace("\\", "/"), "tags": parsed.frontmatter.get("tags", []), } - # robustes Fulltext-Feld note_pl["fulltext"] = _safe_text(parsed) - # Hash-Metadaten anfügen (ohne Änderung der bestehenden Logik deiner DB) note_pl["hash_signature"] = f"{_hash_mode()}:{_hash_source()}:{_hash_normalize()}" + # Notes-Vector sicherstellen (Zero-Vector, wenn Collection ihn verlangt) + _ensure_note_vector(note_pl, cfg) except Exception as e: _print({"path": str(path), "error": f"make_note_payload failed: {e}"}) return None, [], [] - # Chunks + # Roh-Chunks (falls assemble_chunks verfügbar) + raw_chunks: Optional[List[Dict]] = None + if assemble_chunks is not None: + try: + raw_chunks = assemble_chunks( + note_pl.get("note_id", path.stem), + _safe_text(parsed), + parsed.frontmatter.get("type", "concept"), + ) + except Exception as e: + _print({"path": str(path), "note_id": note_pl.get("note_id"), "warn": f"assemble_chunks failed: {e}"}) + raw_chunks = None + + # Chunk-Payloads try: - chunks = make_chunk_payloads(parsed, note_pl) - if not isinstance(chunks, list): - chunks = [] + chunk_payloads = _call_make_chunk_payloads(parsed, note_pl, raw_chunks) + if not isinstance(chunk_payloads, list): + chunk_payloads = [] except Exception as e: _print({"path": str(path), "note_id": note_pl.get("note_id"), "error": f"make_chunk_payloads failed: {e}"}) - chunks = [] + chunk_payloads = [] # Edges try: - edges = build_edges_for_note(parsed, chunks, note_scope_refs=note_scope_refs) + edges = _call_build_edges_for_note(parsed, chunk_payloads, note_scope_refs=note_scope_refs) except Exception as e: _print({"path": str(path), "note_id": note_pl.get("note_id"), "error": f"build_edges_for_note failed: {e}"}) edges = [] - return note_pl, chunks, edges + return note_pl, chunk_payloads, edges def main(): @@ -186,7 +287,7 @@ def main(): ap.add_argument("--vault", required=True, help="Pfad zum Vault-Verzeichnis (Wurzel).") ap.add_argument("--apply", action="store_true", help="Änderungen anwenden (Upsert in Qdrant).") ap.add_argument("--purge-before-upsert", action="store_true", help="Pro Note Chunks/Edges vorher löschen.") - ap.add_argument("--note-scope-refs", action="store_true", help="Note-scope Referenzen ableiten.") + ap.add_argument("--note-scope-refs", action="store_true", help="Note-scope Referenzen (falls unterstützt).") ap.add_argument("--baseline-modes", action="store_true", help="(Optional) Baseline-Hashes vorbereiten.") ap.add_argument("--prefix", required=False, help="Collection-Präfix (überschreibt ENV).") args = ap.parse_args() @@ -195,7 +296,6 @@ def main(): if not vault.exists(): ap.error(f"Vault nicht gefunden: {vault}") - # Prefix bestimmen & Config laden prefix = _load_prefix(args.prefix) cfg = QdrantConfig.from_env(prefix=prefix) client = get_client(cfg) @@ -206,29 +306,27 @@ def main(): _print({"summary": "done", "processed": 0, "prefix": cfg.prefix}) return - # Optional Baseline-Aktion (nur Meta-Info / kein Abbruch wenn nicht genutzt) if args.baseline_modes: _print({"action": "baseline", "modes": ["body", "frontmatter", "full"], "source": _hash_source(), "norm": _hash_normalize()}) processed = 0 - for idx, p in enumerate(files): - note_pl, chunks, edges = process_file( + for p in files: + note_pl, chunk_payloads, edges = process_file( p, cfg, note_scope_refs=args.note_scope_refs, apply=args.apply, purge_before_upsert=args.purge_before_upsert, ) - if not note_pl: continue info = { "note_id": note_pl.get("note_id"), "title": note_pl.get("title"), - "chunks": len(chunks), + "chunks": len(chunk_payloads), "edges": len(edges), - "changed": True, # Die konkrete Hash-/Sig-Prüfung erfolgt in deinen Payload-Funktionen + "changed": True, # Detail-Hashing passiert innerhalb der Payload-Builder "decision": "apply" if args.apply else "dry-run", "path": str(p.relative_to(vault)).replace("\\", "/"), "hash_mode": _hash_mode(), @@ -238,23 +336,20 @@ def main(): } if args.apply: - # Optional: pro Note vorher Chunks/Edges löschen (saubere Aktualisierung) if args.purge_before_upsert: try: delete_by_note(client, cfg, note_pl.get("note_id", "")) except Exception as e: _print({"note_id": note_pl.get("note_id"), "warn": f"delete_by_note failed: {e}"}) - # Upserts try: upsert_notes(client, cfg, [note_pl]) except Exception as e: _print({"note_id": note_pl.get("note_id"), "error": f"upsert_notes failed: {e}"}) - continue - if chunks: + if chunk_payloads: try: - upsert_chunks(client, cfg, chunks) + upsert_chunks(client, cfg, chunk_payloads) except Exception as e: _print({"note_id": note_pl.get("note_id"), "error": f"upsert_chunks failed: {e}"}) @@ -267,14 +362,12 @@ def main(): _print(info) processed += 1 - # Abschlussstatus - counts = count_points(client, cfg) _print({ "summary": "done", "processed": processed, "prefix": cfg.prefix, "collections": {"notes": cfg.notes, "chunks": cfg.chunks, "edges": cfg.edges}, - "counts": counts, + "counts": count_points(client, cfg), })