diff --git a/scripts/import_markdown.py b/scripts/import_markdown.py index ddb93ac..14cc421 100644 --- a/scripts/import_markdown.py +++ b/scripts/import_markdown.py @@ -2,81 +2,83 @@ # -*- coding: utf-8 -*- """ Script: scripts/import_markdown.py -Version: 0.6.0 (2025-09-06) +Version: 0.7.1 (2025-09-06) Autor: mindnet / Architektur Datenimporte & Sync Kurzbeschreibung --------------- Importiert Markdown-Notizen aus einem Obsidian-ähnlichen Vault in Qdrant: -- Validiert Frontmatter / Note-Payload. +- Validiert Frontmatter / Note-Payload (gegen note.schema.json). - Chunking + Embeddings. - Leitet Edges direkt beim Import aus [[Wikilinks]] ab: - 'references' (Note→Note) - 'references_at' (Chunk→Note) - - 'backlink' (Note←Note) nur für Note→Note-Kanten. + - 'backlink' (Note←Note) (symmetrisch zu 'references') +- Idempotente Upserts (deterministische IDs über qdrant_points). -Neu in 0.6.0 +Neu in 0.7.1 ------------ -- Option `--purge-before-upsert`: löscht für die jeweils verarbeitete Note - *vor* dem Upsert alle zugehörigen Chunks und Edges in Qdrant (selektiv!), - um Leichen nach Re-Chunking zu vermeiden. -- Robuste Link-Auflösung via Note-Index (ID / Titel-Slug / Datei-Slug) - konsistent zu `derive_edges.py`. +- Korrekte Änderungsdetektion via SHA-256 über die **komplette Datei** (Frontmatter+Body): + - Feld: payload.hash_fulltext + - Vergleicht neuen Hash gegen bestehenden Hash in Qdrant. + - Nur bei Änderung → Verarbeitung/Upsert; sonst "skip". +- `--purge-before-upsert` wird **nur** ausgeführt, wenn sich die Note **wirklich geändert** hat. +- Robuste Qdrant-Scroll-Kompatibilität (2- oder 3-Tupel Rückgaben). Aufrufbeispiele --------------- -Dry-Run (keine Schreibzugriffe): +Dry-Run (nur prüfen, nichts schreiben): python3 -m scripts.import_markdown --vault ./vault -Nur eine bestimmte Note: +Nur eine spezifische Note: python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo -Apply (schreiben) mit Purge: +Apply (schreiben) mit Purge (nur geänderte Noten werden bereinigt + neu geschrieben): python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert Parameter --------- --vault PATH : Pflicht. Root-Verzeichnis des Vaults. --apply : Wenn gesetzt, werden Upserts durchgeführt (sonst Dry-Run). ---purge-before-upsert : Wenn gesetzt, werden vor dem Upsert (nur bei --apply) - alte Chunks und Edges dieser Note in Qdrant gelöscht. +--purge-before-upsert : Vor Upsert alte Chunks/Edges der **geänderten** Note löschen. --note-id ID : Optional, verarbeitet nur diese eine Note. Umgebungsvariablen (.env) ------------------------- QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM -Standard: url=http://127.0.0.1:6333, prefix=mindnet, dim=384 +Defaults: url=http://127.0.0.1:6333, prefix=mindnet, dim=384 Kompatibilität -------------- -- Nutzt die bestehenden Kernmodule: +- Bestehende Kernmodule werden weiterverwendet: app.core.parser (read_markdown, normalize_frontmatter, validate_required_frontmatter) app.core.validate_note (validate_note_payload) app.core.chunker (assemble_chunks) app.core.chunk_payload (make_chunk_payloads) app.core.embed (embed_texts) - app.core.qdrant (QdrantConfig, get_client, ensure_collections) + app.core.qdrant (QdrantConfig, ensure_collections, get_client, collection_names) app.core.qdrant_points (points_for_note, points_for_chunks, points_for_edges, upsert_batch) app.core.derive_edges (build_note_index, derive_wikilink_edges) -Änderungshinweise vs. früherer Importer ---------------------------------------- -- Alte, globale Lösch-Workarounds entfallen. Selektives Purge ist jetzt optional und sicher. -- Edges werden nur noch in der neuen, einheitlichen Struktur erzeugt. +Hinweise +-------- +- Bitte im aktivierten venv laufen lassen: source .venv/bin/activate """ from __future__ import annotations import argparse import glob +import hashlib import json import os import sys -from typing import List, Dict +from typing import List, Dict, Tuple, Optional from dotenv import load_dotenv from qdrant_client.http import models as rest +from qdrant_client import QdrantClient -# Kern-Bausteine (vorhanden in eurem Projekt) +# Kern-Bausteine (aus eurem Projekt) from app.core.parser import ( read_markdown, normalize_frontmatter, @@ -105,17 +107,54 @@ def iter_md(root: str) -> List[str]: out: List[str] = [] for p in patterns: out.extend(glob.glob(os.path.join(root, p), recursive=True)) - return sorted(list(dict.fromkeys(out))) # de-dupe + sort + # de-dupe + sort + return sorted(list(dict.fromkeys(out))) + + +def file_sha256(path: str) -> str: + """SHA256 über die **Rohdatei** (Frontmatter + Body).""" + h = hashlib.sha256() + with open(path, "rb") as f: + for chunk in iter(lambda: f.read(8192), b""): + h.update(chunk) + return h.hexdigest() + + +def qdrant_scroll(client: QdrantClient, **kwargs) -> Tuple[List, Optional[str]]: + """ + Wrapper, der sowohl (points, next_offset) als auch (points, next_page, _) Signaturen abdeckt. + """ + res = client.scroll(**kwargs) + if isinstance(res, tuple): + if len(res) == 2: + return res[0], res[1] + if len(res) >= 3: + return res[0], res[1] + return res, None + + +def fetch_existing_note_hash(client: QdrantClient, prefix: str, note_id: str) -> Optional[str]: + """Liest hash_fulltext aus mindnet_notes.payload für eine Note.""" + notes_col, _, _ = collection_names(prefix) + f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) + pts, _ = qdrant_scroll( + client, + collection_name=notes_col, + scroll_filter=f, + with_payload=True, + with_vectors=False, + limit=1 + ) + if not pts: + return None + pl = getattr(pts[0], "payload", {}) or {} + return pl.get("hash_fulltext") def make_note_stub(abs_path: str, vault_root: str) -> Dict: - """ - Erstellt einen minimalen Note-Stub für den Index (build_note_index): - { note_id, title, path } - """ + """Minimaler Stub (id/title/path) für build_note_index.""" parsed = read_markdown(abs_path) fm = normalize_frontmatter(parsed.frontmatter or {}) - # Validierung minimal: wir brauchen id + title (title optional für Slug-Auflösung) if "id" not in fm or not fm["id"]: raise ValueError(f"Missing id in frontmatter: {abs_path}") rel = os.path.relpath(abs_path, vault_root) @@ -123,48 +162,37 @@ def make_note_stub(abs_path: str, vault_root: str) -> Dict: def build_vault_index(vault_root: str) -> tuple[Dict, Dict, Dict]: - """ - Liest alle Noten ein und baut den Dreifach-Index für Wikilink-Auflösung. - """ - files = iter_md(vault_root) + """Index für robuste Wikilink-Auflösung über alle Noten im Vault.""" stubs = [] - for p in files: + for p in iter_md(vault_root): try: stubs.append(make_note_stub(p, vault_root)) except Exception: - # Notiz ohne id → wird vom Importer später ohnehin übersprungen continue return build_note_index(stubs) -def purge_for_note(client, prefix: str, note_id: str, chunk_ids: List[str]) -> Dict[str, int]: +def purge_for_note(client: QdrantClient, prefix: str, note_id: str, chunk_ids: List[str]) -> None: """ - Selektives Purge für die aktuelle Note: - - Chunks: alle mit payload.note_id == note_id - - Edges: alle mit payload.source_id == note_id ODER == einem der chunk_ids - - Notes: werden nicht gelöscht (Upsert überschreibt Payload/Vektor) + Selektives Purge der alten Daten für **diese** Note: + - löscht Chunks (payload.note_id == note_id) + - löscht Edges, deren source_id == note_id ODER in chunk_ids + Notes selbst werden nicht gelöscht (Upsert reicht). """ notes_col, chunks_col, edges_col = collection_names(prefix) - counts = {"chunks_deleted": 0, "edges_deleted": 0} - # Chunks löschen (Filter must: note_id == X) - f_chunks = rest.Filter( - must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))] - ) - res_chunks = client.delete(collection_name=chunks_col, points_selector=f_chunks, wait=True) - counts["chunks_deleted"] = getattr(res_chunks, "status", None) and 0 or 0 # Qdrant liefert keine count hier + # Chunks löschen + f_chunks = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) + client.delete(collection_name=chunks_col, points_selector=f_chunks, wait=True) - # Edges löschen: OR über Note-ID und alle Chunk-IDs + # Edges löschen (OR über source_id Werte) should_conds = [rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id))] for cid in chunk_ids: should_conds.append(rest.FieldCondition(key="source_id", match=rest.MatchValue(value=cid))) - - f_edges = rest.Filter(should=should_conds) if should_conds else None - if f_edges is not None: + if should_conds: + f_edges = rest.Filter(should=should_conds) client.delete(collection_name=edges_col, points_selector=f_edges, wait=True) - return counts - # ------------------------------------------------- # Main @@ -177,7 +205,7 @@ def main(): ap.add_argument( "--purge-before-upsert", action="store_true", - help="Vor Upsert alte Chunks/Edges der aktuellen Note löschen (nur mit --apply wirksam).", + help="Vor Upsert alte Chunks/Edges **nur für geänderte** Noten löschen.", ) ap.add_argument("--note-id", help="Optional: nur diese Note verarbeiten") args = ap.parse_args() @@ -198,7 +226,7 @@ def main(): print("Keine Markdown-Dateien gefunden.", file=sys.stderr) sys.exit(2) - # 1) Note-Index über den gesamten Vault (für robuste Link-Auflösung) + # Index einmal bauen (für Linkauflösung bei geänderten Noten) note_index = build_vault_index(vault_root) processed = 0 @@ -215,57 +243,74 @@ def main(): processed += 1 - # --- Note-Payload --- - from app.core.note_payload import make_note_payload # lazy import (bestehende Funktion) + # Änderungsdetektion (Datei-Hash vs. Qdrant) + new_hash = file_sha256(abs_path) + old_hash = fetch_existing_note_hash(client, cfg.prefix, fm["id"]) + changed = (old_hash != new_hash) + + decision = "skip" + if changed: + decision = "apply" if args.apply else "dry-run" + + # Bei "skip" kein teures Chunking/Embedding/Edges nötig + if not changed: + print(json.dumps({ + "note_id": fm["id"], + "title": fm.get("title"), + "changed": False, + "decision": "skip", + "path": os.path.relpath(abs_path, vault_root), + }, ensure_ascii=False)) + continue + + # --- Ab hier: Nur für geänderte Noten --- + + # Note-Payload erzeugen + from app.core.note_payload import make_note_payload # lazy import note_pl = make_note_payload(parsed, vault_root=vault_root) + # Hash im Payload mitschreiben (Schema erlaubt hash_fulltext) + note_pl["hash_fulltext"] = new_hash validate_note_payload(note_pl) - # --- Chunking & Payloads --- + # Chunking & Payloads chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept")) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks) - # --- Embeddings --- + # Embeddings texts = [c.get("text") or c.get("content") or "" for c in chunk_pls] vectors = embed_texts(texts) - # --- Edge-Ableitung (direkt) --- + # Edges ableiten (Note-/Chunk-Level) edges = derive_wikilink_edges(note_pl, chunk_pls, note_index) - # --- Ausgabe je Note (Entscheidung) --- - decision = "apply" if args.apply else "dry-run" - - # --- Purge vor Upsert (nur wenn --apply) --- + # Purge (nur wenn apply + Option gesetzt) if args.apply and args.purge_before_upsert: - # Chunk-IDs (neu) ermitteln → für Edge-Purge by source_id + # Chunk-IDs bestimmen (für Edge-Purge by source_id) chunk_ids = [] for i, ch in enumerate(chunk_pls, start=1): cid = ch.get("chunk_id") or ch.get("id") or f"{fm['id']}#{i}" ch["chunk_id"] = cid # sicherstellen chunk_ids.append(cid) - purge_for_note(client, cfg.prefix, fm["id"], chunk_ids) - # --- Upserts (nur bei --apply) --- + # Upserts (nur Apply) if args.apply: - # Note notes_col, note_pts = points_for_note(cfg.prefix, note_pl, note_vec=None, dim=cfg.dim) upsert_batch(client, notes_col, note_pts) - # Chunks chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors) upsert_batch(client, chunks_col, chunk_pts) - # Edges edges_col, edge_pts = points_for_edges(cfg.prefix, edges) upsert_batch(client, edges_col, edge_pts) - # Logging pro Note + # Logging geänderte Note print(json.dumps({ "note_id": fm["id"], "title": fm.get("title"), "chunks": len(chunk_pls), "edges": len(edges), - "changed": True, # Hash-/Zeitvergleich kann optional hier ergänzt werden + "changed": True, "decision": decision, "path": note_pl["path"], }, ensure_ascii=False))