From dcb02dd19460af6e2081103599a5da830c283814 Mon Sep 17 00:00:00 2001 From: Lars Date: Tue, 9 Sep 2025 11:17:30 +0200 Subject: [PATCH] app/core/chunk_payload.py aktualisiert --- app/core/chunk_payload.py | 386 +++++++++++++++++++++++++++----------- 1 file changed, 281 insertions(+), 105 deletions(-) diff --git a/app/core/chunk_payload.py b/app/core/chunk_payload.py index af2c506..a619d3b 100644 --- a/app/core/chunk_payload.py +++ b/app/core/chunk_payload.py @@ -1,142 +1,318 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Modul: app/core/chunk_payload.py -Version: 1.1.2 +Script: import_markdown.py — Markdown → Qdrant (Notes, Chunks, Edges) +Version: 3.1.1 Datum: 2025-09-09 Kurzbeschreibung ---------------- -Erzeugt Qdrant-Payloads für Text-Chunks einer Note. Jeder Chunk enthält -den tatsächlichen Text unter dem Schlüssel ``text``. Dadurch kann der -Exporter den vollständigen Body verlässlich aus Chunks rekonstruieren, -falls ``notes.payload.fulltext`` fehlt. +Liest Markdown-Dateien aus einem Vault ein und schreibt Notes, Chunks und Edges +idempotent nach Qdrant. Change-Detection basiert standardmäßig auf dem **Body-Hash** +(ENV: MINDNET_HASH_MODE). Persistiert zusätzlich den Volltext der Note unter +``payload.fulltext`` und speichert den Vault-Pfad relativ. -Wesentliche Features --------------------- -- Stabile, idempotente Payload-Struktur für Chunks -- Persistenter Chunk-Text (``text``) -- Extraktion von Wikilinks pro Chunk (``wikilinks`` & ``references``) -- Pfadübernahme (relativ zum Vault, wird vom Aufrufer geliefert) -- Bereinigung leerer Felder (keine ``None``/leere Collections im Payload) +Abwärtskompatibilität +--------------------- +- Keine harte Abhängigkeit mehr von nicht gelieferten Modulen (validate_note, embed, edges). + * Embeddings: Nullvektor-Fallback (Dim aus ENV/Config). + * Edges: werden aus Chunk-Payloads + Note-Refs lokal abgeleitet. +- IDs, Collections, Upsert-Semantik unverändert. 
-Abhängigkeiten --------------- -- ``app.core.chunker.Chunk`` und ``assemble_chunks`` -- ``app.core.parser.extract_wikilinks`` und ``read_markdown`` (nur CLI) +ENV / Qdrant +------------ +- QDRANT_URL (oder QDRANT_HOST/QDRANT_PORT) +- QDRANT_API_KEY (optional) +- COLLECTION_PREFIX (Default: mindnet) +- VECTOR_DIM (Default: 384) +- MINDNET_HASH_MODE: "body" (Default) | "frontmatter" | "body+frontmatter" -Beispiele (CLI – Sichtprüfung) ------------------------------- - python3 -m app.core.chunk_payload --from-file ./vault/demo.md --print +Aufruf +------ + python3 -m scripts.import_markdown --vault ./vault + python3 -m scripts.import_markdown --vault ./vault --apply + python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert + python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo --apply + python3 -m scripts.import_markdown --vault ./vault --apply --embed-note """ from __future__ import annotations -from typing import Dict, List, Any + import argparse import json import os +import sys +from typing import Dict, List, Optional, Tuple +from dotenv import load_dotenv +from qdrant_client.http import models as rest + +# Core/Projekt-Module (vorhanden) +from app.core.parser import ( + read_markdown, + normalize_frontmatter, + validate_required_frontmatter, + extract_wikilinks, +) +from app.core.note_payload import make_note_payload +from app.core.chunker import assemble_chunks +from app.core.chunk_payload import make_chunk_payloads +from app.core.qdrant import QdrantConfig, get_client, ensure_collections +from app.core.qdrant_points import ( + points_for_chunks, + points_for_note, + points_for_edges, + upsert_batch, +) + +# Optionale Module (wenn nicht vorhanden → Fallback) try: - # Projektinterne Imports - from app.core.chunker import Chunk, assemble_chunks - from app.core.parser import extract_wikilinks, read_markdown -except Exception: # pragma: no cover - Fallback für relative Ausführung - from .chunker import Chunk, 
# Optional modules (fall back to None when app.core.embed is not available;
# callers then substitute null vectors).
try:
    from app.core.embed import embed_texts, embed_one  # type: ignore
except Exception:
    embed_texts = None
    embed_one = None


# -----------------------------------------------------------------------------
# Helpers
# -----------------------------------------------------------------------------

# Directory names that are never part of the importable vault content.
_EXCLUDED_DIRS = {".obsidian", "_backup_frontmatter", "_imported"}


def iter_md(root: str) -> List[str]:
    """Collect every ``.md`` file below *root*, sorted.

    Excluded system folders (see ``_EXCLUDED_DIRS``) are pruned from the
    walk in-place, so their entire subtrees are skipped instead of being
    visited and filtered file-by-file.
    """
    out: List[str] = []
    for dirpath, dirnames, filenames in os.walk(root):
        # In-place pruning: os.walk honours mutations of `dirnames`.
        dirnames[:] = [d for d in dirnames if d not in _EXCLUDED_DIRS]
        for fn in filenames:
            if fn.lower().endswith(".md"):
                out.append(os.path.join(dirpath, fn))
    return sorted(out)


def collections(prefix: str) -> Tuple[str, str, str]:
    """Return the (notes, chunks, edges) collection names for *prefix*.

    NOTE(review): the name shadows the stdlib ``collections`` module; this
    file does not import that module, so it is harmless here.
    """
    return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"


def fetch_existing_note_hash(client, prefix: str, note_id: str) -> Optional[str]:
    """Return the stored ``hash_fulltext`` of *note_id*, or ``None`` if absent.

    Looks the note up in the notes collection by exact ``note_id`` payload
    match; only a single point is fetched (no vectors).
    """
    notes_col, _, _ = collections(prefix)
    note_filter = rest.Filter(must=[rest.FieldCondition(
        key="note_id",
        match=rest.MatchValue(value=note_id),
    )])
    points, _ = client.scroll(
        collection_name=notes_col,
        scroll_filter=note_filter,
        with_payload=True,
        with_vectors=False,
        limit=1,
    )
    if not points:
        return None
    payload = points[0].payload or {}
    return payload.get("hash_fulltext")


def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    """Delete all chunks and edges belonging to *note_id*.

    The collections themselves are left in place; only matching points are
    removed.
    """
    _, chunks_col, edges_col = collections(prefix)

    # Delete the note's chunks (exact note_id payload match).
    chunk_filter = rest.Filter(
        must=[rest.FieldCondition(key="note_id",
                                  match=rest.MatchValue(value=note_id))]
    )
    client.delete(collection_name=chunks_col, points_selector=chunk_filter, wait=True)

    # Delete the note's edges, both chunk-scoped ("<note_id>#<n>" endpoints,
    # matched via MatchText) and note-scoped (exact note_id endpoints).
    # NOTE(review): MatchText is a full-text match — confirm the edges
    # collection has a text index on source_id/target_id for this to match.
    should = [
        rest.FieldCondition(key="source_id", match=rest.MatchText(text=f"{note_id}#")),
        rest.FieldCondition(key="target_id", match=rest.MatchText(text=f"{note_id}#")),
        rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
    ]
    edge_filter = rest.Filter(should=should)
    client.delete(collection_name=edges_col, points_selector=edge_filter, wait=True)


def has_chunk_level_refs(chunk_payloads: List[Dict]) -> bool:
    """True if any chunk payload carries a non-empty ``references`` list."""
    return any(isinstance(ch.get("references"), list) and ch["references"] for ch in chunk_payloads)


def derive_edges(note_meta: Dict, chunk_payloads: List[Dict]) -> List[Dict]:
    """Derive graph edges locally from note/chunk payloads (no external deps).

    Emitted edge types:
    - ``belongs_to`` (chunk -> note), scope: chunk
    - ``prev``/``next`` between neighbouring chunks (emitted from both
      sides of each neighbour pair; deduplicated below), scope: chunk
    - ``references`` from ``chunk.references[*].target_id``, scope: chunk
    - ``references``/``backlink`` pairs from ``note_meta['references']``
      (if present), scope: note
    """
    edges: List[Dict] = []

    # Chunk-scope: belongs_to + prev/next + references.
    for ch in chunk_payloads:
        src = ch["id"]
        edges.append({"src_id": src, "dst_id": note_meta["id"], "edge_type": "belongs_to", "scope": "chunk"})
        neighbors = ch.get("neighbors") or {}
        prev_id = neighbors.get("prev")
        next_id = neighbors.get("next")
        if prev_id:
            edges.append({"src_id": prev_id, "dst_id": src, "edge_type": "next", "scope": "chunk"})
            edges.append({"src_id": src, "dst_id": prev_id, "edge_type": "prev", "scope": "chunk"})
        if next_id:
            edges.append({"src_id": src, "dst_id": next_id, "edge_type": "next", "scope": "chunk"})
            edges.append({"src_id": next_id, "dst_id": src, "edge_type": "prev", "scope": "chunk"})

        for ref in (ch.get("references") or []):
            target = ref.get("target_id")
            if target:
                edges.append({"src_id": src, "dst_id": target, "edge_type": "references", "scope": "chunk"})

    # Note-scope: references + backlink.
    for target in (note_meta.get("references") or []):
        edges.append({"src_id": note_meta["id"], "dst_id": target, "edge_type": "references", "scope": "note"})
        edges.append({"src_id": target, "dst_id": note_meta["id"], "edge_type": "backlink", "scope": "note"})

    # Deduplicate on (src, dst, type, scope); last occurrence wins,
    # first-seen insertion order is preserved (dict semantics).
    unique: Dict[tuple, Dict] = {}
    for edge in edges:
        unique[(edge["src_id"], edge["dst_id"], edge["edge_type"], edge.get("scope", ""))] = edge
    return list(unique.values())


def _normalize_rel_path(abs_path: str, vault_root: str) -> str:
    """Return *abs_path* relative to *vault_root* with forward slashes.

    Falls back to the input path unchanged when relpath computation fails
    (e.g. different drives on Windows).
    """
    try:
        rel = os.path.relpath(abs_path, vault_root)
    except Exception:
        rel = abs_path  # best-effort fallback
    return rel.replace("\\", "/").lstrip("/")
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------

def main() -> None:
    """Import Markdown notes from a vault into Qdrant (notes, chunks, edges).

    Dry-run by default; ``--apply`` performs the upserts. Change detection
    compares the stored ``hash_fulltext`` with the freshly computed one and
    skips unchanged notes on apply (matching the printed decision).
    """
    load_dotenv()
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Pfad zum Obsidian-Vault (Root-Ordner)")
    ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant; ohne Flag nur Dry-Run")
    ap.add_argument("--purge-before-upsert", action="store_true",
                    help="Vor Upsert Chunks & Edges der GEÄNDERTEN Note löschen")
    ap.add_argument("--note-id", help="Nur eine bestimmte Note-ID verarbeiten")
    ap.add_argument("--embed-note", action="store_true", help="Optional: Note-Volltext einbetten")
    ap.add_argument("--force-replace", action="store_true",
                    help="Änderungserkennung ignorieren und immer upserten (+ optional Purge)")
    args = ap.parse_args()

    # Qdrant connection and collection bootstrap.
    cfg = QdrantConfig.from_env()
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)

    root = os.path.abspath(args.vault)
    files = iter_md(root)
    if not files:
        print("Keine Markdown-Dateien gefunden.", file=sys.stderr)
        sys.exit(2)

    processed = 0
    for path in files:
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)

        # Required frontmatter (e.g. id, title) — skip the note on failure.
        try:
            validate_required_frontmatter(fm)
        except Exception as e:
            print(json.dumps({"path": path, "error": f"Frontmatter invalid: {e}"}))
            continue

        if args.note_id and fm.get("id") != args.note_id:
            continue

        processed += 1

        # Note payload: persist the fulltext and store a vault-relative path.
        note_pl = make_note_payload(parsed, vault_root=root)
        if "fulltext" not in (note_pl or {}):
            note_pl["fulltext"] = parsed.body or ""
        if note_pl.get("path"):
            note_pl["path"] = _normalize_rel_path(
                os.path.join(root, note_pl["path"]) if not os.path.isabs(note_pl["path"]) else note_pl["path"], root
            )
        else:
            note_pl["path"] = _normalize_rel_path(parsed.path, root)

        note_id = note_pl["note_id"]

        # Change detection on the stored fulltext hash. A missing stored hash
        # (unknown note, or one written without a hash) must count as changed,
        # otherwise a brand-new note whose payload carries no hash_fulltext
        # would compare None == None and never be imported.
        new_hash = note_pl.get("hash_fulltext")
        old_hash = None if args.force_replace else fetch_existing_note_hash(client, cfg.prefix, note_id)
        changed = args.force_replace or old_hash is None or (old_hash != new_hash)

        # Chunks + locally derived edges (the summary needs their counts).
        chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)
        edges = derive_edges(fm | {"id": note_id, "references": note_pl.get("references")}, chunk_pls)

        # Per-note summary (one JSON object per line).
        summary = {
            "note_id": note_id,
            "title": fm.get("title"),
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "changed": changed,
            "decision": ("apply" if args.apply and changed else
                         "apply-skip-unchanged" if args.apply and not changed else
                         "dry-run"),
            "path": note_pl["path"],
        }
        print(json.dumps(summary, ensure_ascii=False))

        # Dry-run: report only.
        if not args.apply:
            continue
        # Honour change detection on apply: unchanged notes are not
        # re-upserted (this is what the "apply-skip-unchanged" decision
        # above announces).
        if not changed:
            continue

        # Embeddings (null-vector fallback when app.core.embed is missing),
        # computed only for notes that are actually written.
        if embed_texts:
            vecs = embed_texts([getattr(c, "text", "") for c in chunks])
        else:
            vecs = [[0.0] * cfg.dim for _ in chunks]

        # Optional note vector (null-vector fallback without embedder).
        if args.embed_note and embed_one:
            note_vec = embed_one(parsed.body or "")
        elif args.embed_note:
            note_vec = [0.0] * cfg.dim
        else:
            note_vec = None

        # Optional purge; only changed notes reach this point.
        if args.purge_before_upsert:
            purge_note_artifacts(client, cfg.prefix, note_id)

        # Upserts: notes / chunks / edges.
        notes_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
        upsert_batch(client, notes_name, note_pts)

        chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)
        upsert_batch(client, chunks_name, chunk_pts)

        edges_name, edge_pts = points_for_edges(cfg.prefix, edges)
        upsert_batch(client, edges_name, edge_pts)

    print(f"Done. Processed notes: {processed}")


if __name__ == "__main__":
    main()