From 77fa57a2a63dd5eba1bf68c62ac261bd77ef22a9 Mon Sep 17 00:00:00 2001 From: Lars Date: Tue, 9 Sep 2025 11:19:06 +0200 Subject: [PATCH] app/core/chunk_payload.py aktualisiert --- app/core/chunk_payload.py | 399 ++++++++++++-------------------------- 1 file changed, 120 insertions(+), 279 deletions(-) diff --git a/app/core/chunk_payload.py b/app/core/chunk_payload.py index a619d3b..85a335d 100644 --- a/app/core/chunk_payload.py +++ b/app/core/chunk_payload.py @@ -1,318 +1,159 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Script: import_markdown.py — Markdown → Qdrant (Notes, Chunks, Edges) -Version: 3.1.1 +Modul: app/core/chunk_payload.py +Version: 1.1.3 Datum: 2025-09-09 Kurzbeschreibung ---------------- -Liest Markdown-Dateien aus einem Vault ein und schreibt Notes, Chunks und Edges -idempotent nach Qdrant. Change-Detection basiert standardmäßig auf dem **Body-Hash** -(ENV: MINDNET_HASH_MODE). Persistiert zusätzlich den Volltext der Note unter -``payload.fulltext`` und speichert den Vault-Pfad relativ. +Erzeugt Qdrant-Payloads für Text-Chunks einer Note. Jeder Chunk enthält +den tatsächlichen Text unter dem Schlüssel ``text``. Dadurch kann der +Exporter den vollständigen Body verlässlich aus Chunks rekonstruieren, +falls ``notes.payload.fulltext`` fehlt. -Abwärtskompatibilität ---------------------- -- Keine harte Abhängigkeit mehr von nicht gelieferten Modulen (validate_note, embed, edges). - * Embeddings: Nullvektor-Fallback (Dim aus ENV/Config). - * Edges: werden aus Chunk-Payloads + Note-Refs lokal abgeleitet. -- IDs, Collections, Upsert-Semantik unverändert. +Wesentliche Features +-------------------- +- Stabile, idempotente Payload-Struktur für Chunks +- Persistenter Chunk-Text (``text``) +- Extraktion von Wikilinks pro Chunk (``wikilinks`` & ``references``) +- Pfadübernahme (relativ zum Vault, wird vom Aufrufer geliefert) +- Bereinigung leerer Felder (keine ``None``/leere Collections im Payload) -ENV / Qdrant ------------- -- QDRANT_URL (oder QDRANT_HOST/QDRANT_PORT) -- QDRANT_API_KEY (optional) -- COLLECTION_PREFIX (Default: mindnet) -- VECTOR_DIM (Default: 384) -- MINDNET_HASH_MODE: "body" (Default) | "frontmatter" | "body+frontmatter" +Abhängigkeiten +-------------- +- ``app.core.chunker.Chunk`` und ``assemble_chunks`` (nur für CLI-Demo) +- ``app.core.parser.extract_wikilinks`` und ``read_markdown`` (nur CLI-Demo) -Aufruf ------- - python3 -m scripts.import_markdown --vault ./vault - python3 -m scripts.import_markdown --vault ./vault --apply - python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert - python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo --apply - python3 -m scripts.import_markdown --vault ./vault --apply --embed-note +Beispiele (CLI – Sichtprüfung) +------------------------------ + python3 -m app.core.chunk_payload --from-file ./vault/demo.md --print """ from __future__ import annotations - +from typing import Dict, List, Any import argparse import json import os -import sys -from typing import Dict, List, Optional, Tuple -from dotenv import load_dotenv -from qdrant_client.http import models as rest - -# Core/Projekt-Module (vorhanden) -from app.core.parser import ( - read_markdown, - normalize_frontmatter, - validate_required_frontmatter, - extract_wikilinks, -) -from app.core.note_payload import make_note_payload -from app.core.chunker import assemble_chunks -from app.core.chunk_payload import make_chunk_payloads -from app.core.qdrant import QdrantConfig, get_client, ensure_collections -from app.core.qdrant_points import ( - points_for_chunks, - points_for_note, - points_for_edges, - upsert_batch, -) - -# Optionale Module (wenn nicht vorhanden → Fallback) +# WICHTIG: Keine direkten Selbst- oder Kreuz-Imports, die Zyklen erzeugen! +# Für die Kernfunktion reicht extract_wikilinks als Typ-unabhängige Hilfsfunktion. try: - from app.core.embed import embed_texts, embed_one # type: ignore -except Exception: - embed_texts = None - embed_one = None + from app.core.parser import extract_wikilinks # nur Funktion, erzeugt keine Zyklen +except Exception: # pragma: no cover + from .parser import extract_wikilinks # type: ignore + +# Typ-Hinweis für Chunk (zur Laufzeit nicht notwendig) +try: + from app.core.chunker import Chunk # nur Typ-Annotation / Nutzung im CLI +except Exception: # pragma: no cover + class Chunk: # type: ignore + pass -# ----------------------------------------------------------------------------- -# Hilfsfunktionen -# ----------------------------------------------------------------------------- +# --------------------------------------------------------------------------- +# Utils +# --------------------------------------------------------------------------- -def iter_md(root: str) -> List[str]: - """Sammelt alle .md-Dateien unterhalb von root, filtert systemische Ordner.""" - out: List[str] = [] - for dirpath, _, filenames in os.walk(root): - for fn in filenames: - if not fn.lower().endswith(".md"): - continue - p = os.path.join(dirpath, fn) - pn = p.replace("\\", "/") - if any(ex in pn for ex in ["/.obsidian/", "/_backup_frontmatter/", "/_imported/"]): - continue - out.append(p) - return sorted(out) +def _drop_empty(d: Dict[str, Any]) -> Dict[str, Any]: + """Entfernt leere/None-Felder aus einem Dict (für saubere Payloads).""" + return {k: v for k, v in d.items() if v not in (None, [], {}, "")} -def collections(prefix: str) -> Tuple[str, str, str]: - return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" +# --------------------------------------------------------------------------- +# Kernfunktion +# --------------------------------------------------------------------------- - -def fetch_existing_note_hash(client, prefix: str, note_id: str) -> Optional[str]: - """Liest den bisherigen hash_fulltext aus der Notes-Collection (falls vorhanden).""" - notes_col, _, _ = collections(prefix) - f = rest.Filter(must=[rest.FieldCondition( - key="note_id", - match=rest.MatchValue(value=note_id), - )]) - points, _ = client.scroll( - collection_name=notes_col, - scroll_filter=f, - with_payload=True, - with_vectors=False, - limit=1, - ) - if not points: - return None - pl = points[0].payload or {} - return pl.get("hash_fulltext") - - -def purge_note_artifacts(client, prefix: str, note_id: str) -> None: +def make_chunk_payloads(note_meta: Dict[str, Any], path: str, chunks: List[Chunk]) -> List[Dict[str, Any]]: """ - Löscht alle Chunks+Edges zu einer Note. Collections bleiben bestehen. + Baut Payloads für alle ``chunks`` der gegebenen Note. + + Parameters + ---------- + note_meta : Dict[str, Any] + Minimale Metadaten der Note (mind. ``id``, ``title``; optional ``type``, + ``area``, ``project``, ``tags``, ``lang``). + path : str + Relativer Pfad der Note innerhalb des Vaults (z. B. "area/topic/file.md"). + chunks : List[Chunk] + Liste vorsegmentierter Chunks (vgl. app.core.chunker.Chunk). + + Returns + ------- + List[Dict[str, Any]] + Payload-Objekte, bereit für Qdrant-Upserts. """ - _, chunks_col, edges_col = collections(prefix) - - # Chunks der Note löschen - f_chunks = rest.Filter( - must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))] - ) - client.delete(collection_name=chunks_col, points_selector=f_chunks, wait=True) - - # Edges der Note löschen (Chunk- und Note-Scope) - should = [ - rest.FieldCondition(key="source_id", match=rest.MatchText(text=f"{note_id}#")), - rest.FieldCondition(key="target_id", match=rest.MatchText(text=f"{note_id}#")), - rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)), - rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)), - ] - f_edges = rest.Filter(should=should) - client.delete(collection_name=edges_col, points_selector=f_edges, wait=True) + res: List[Dict[str, Any]] = [] + for ch in chunks: + # 'text' kann bei leerem Abschnitt None/'' sein → robust behandeln + text_val = getattr(ch, "text", "") or "" + wikilinks = extract_wikilinks(text_val) + payload = { + "id": getattr(ch, "id", None), + "note_id": note_meta.get("id"), + "note_title": note_meta.get("title"), + "chunk_index": getattr(ch, "index", None), + "char_start": getattr(ch, "char_start", None), + "char_end": getattr(ch, "char_end", None), + "token_count": getattr(ch, "token_count", None), + "section_title": getattr(ch, "section_title", None), + "section_path": getattr(ch, "section_path", None), + "lang": note_meta.get("lang"), + "wikilinks": wikilinks, + "external_links": [], + "references": [{"target_id": w, "kind": "wikilink"} for w in wikilinks], + "neighbors": { + "prev": getattr(ch, "neighbors_prev", None), + "next": getattr(ch, "neighbors_next", None), + }, + "path": path, # vom Aufrufer relativ geliefert + "text": text_val, # WICHTIG für Export/Rekonstruktion + } + res.append(_drop_empty(payload)) + return res -def has_chunk_level_refs(chunk_payloads: List[Dict]) -> bool: - return any(isinstance(ch.get("references"), list) and ch["references"] for ch in chunk_payloads) +# --------------------------------------------------------------------------- +# CLI – nur zur Sichtprüfung / Debug +# --------------------------------------------------------------------------- - -def derive_edges(note_meta: Dict, chunk_payloads: List[Dict]) -> List[Dict]: - """ - Erzeugt Edges aus den verfügbaren Payloads (ohne externe Abhängigkeiten). - - belongs_to (chunk -> note) - - prev/next (zwischen chunks) - - references (aus chunk.references[*].target_id) – scope: chunk - - backlink (note <-> note) aus note_meta['references'] (falls vorhanden) – scope: note - """ - edges: List[Dict] = [] - - # belongs_to + prev/next (Chunk-Scope) - for ch in chunk_payloads: - src = ch["id"] - edges.append({"src_id": src, "dst_id": note_meta["id"], "edge_type": "belongs_to", "scope": "chunk"}) - nb = ch.get("neighbors") or {} - prev_id = nb.get("prev") - next_id = nb.get("next") - if prev_id: - edges.append({"src_id": prev_id, "dst_id": src, "edge_type": "next", "scope": "chunk"}) - edges.append({"src_id": src, "dst_id": prev_id, "edge_type": "prev", "scope": "chunk"}) - if next_id: - edges.append({"src_id": src, "dst_id": next_id, "edge_type": "next", "scope": "chunk"}) - edges.append({"src_id": next_id, "dst_id": src, "edge_type": "prev", "scope": "chunk"}) - - # references aus Chunk - for ref in (ch.get("references") or []): - tid = ref.get("target_id") - if not tid: - continue - edges.append({"src_id": src, "dst_id": tid, "edge_type": "references", "scope": "chunk"}) - - # Note-Scope references/backlink - for tid in (note_meta.get("references") or []): - edges.append({"src_id": note_meta["id"], "dst_id": tid, "edge_type": "references", "scope": "note"}) - edges.append({"src_id": tid, "dst_id": note_meta["id"], "edge_type": "backlink", "scope": "note"}) - - # Dedupe - uniq = {} - for e in edges: - key = (e["src_id"], e["dst_id"], e["edge_type"], e.get("scope", "")) - uniq[key] = e - return list(uniq.values()) - - -def _normalize_rel_path(abs_path: str, vault_root: str) -> str: - """Gibt einen **relativen** Pfad zurück, normalisiert auf forward slashes.""" +def _cli() -> None: + # CLI lädt hier *lazy* die schweren Abhängigkeiten, + # damit die Modul-Initialisierung zirkularfrei bleibt. try: - rel = os.path.relpath(abs_path, vault_root) - except Exception: - rel = abs_path # Fallback - return rel.replace("\\", "/").lstrip("/") + from app.core.parser import read_markdown + from app.core.chunker import assemble_chunks + except Exception: # pragma: no cover + from .parser import read_markdown # type: ignore + from .chunker import assemble_chunks # type: ignore - -# ----------------------------------------------------------------------------- -# Main -# ----------------------------------------------------------------------------- - -def main() -> None: - load_dotenv() - ap = argparse.ArgumentParser() - ap.add_argument("--vault", required=True, help="Pfad zum Obsidian-Vault (Root-Ordner)") - ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant; ohne Flag nur Dry-Run") - ap.add_argument("--purge-before-upsert", action="store_true", - help="Vor Upsert Chunks & Edges der GEÄNDERTEN Note löschen") - ap.add_argument("--note-id", help="Nur eine bestimmte Note-ID verarbeiten") - ap.add_argument("--embed-note", action="store_true", help="Optional: Note-Volltext einbetten") - ap.add_argument("--force-replace", action="store_true", - help="Änderungserkennung ignorieren und immer upserten (+ optional Purge)") + ap = argparse.ArgumentParser(description="Vorschau: Chunk-Payloads erzeugen und anzeigen") + ap.add_argument("--from-file", dest="src", required=True, help="Pfad zu einer Markdown-Datei") + ap.add_argument("--print", dest="do_print", action="store_true", help="Payload auf stdout ausgeben") args = ap.parse_args() - # Qdrant - cfg = QdrantConfig.from_env() - client = get_client(cfg) - ensure_collections(client, cfg.prefix, cfg.dim) - notes_col, chunks_col, edges_col = collections(cfg.prefix) + note = read_markdown(args.src) + note_meta = { + "id": note.frontmatter.get("id") or note.frontmatter.get("note_id"), + "title": note.frontmatter.get("title"), + "type": note.frontmatter.get("type"), + "area": note.frontmatter.get("area"), + "project": note.frontmatter.get("project"), + "tags": note.frontmatter.get("tags", []), + "lang": note.frontmatter.get("lang"), + } - root = os.path.abspath(args.vault) - files = iter_md(root) - if not files: - print("Keine Markdown-Dateien gefunden.", file=sys.stderr) - sys.exit(2) + chunks = assemble_chunks(note.frontmatter.get("id"), note.body or "", note.frontmatter.get("type", "concept")) - processed = 0 - for path in files: - parsed = read_markdown(path) - fm = normalize_frontmatter(parsed.frontmatter) + # Vault-Root heuristisch relativieren (nur für Demo) + vault_root = os.path.dirname(os.path.dirname(args.src)) # heuristisch + rel_path = os.path.relpath(args.src, vault_root).replace("\\", "/").lstrip("/") - # Pflichtfelder prüfen (z. B. id, title) – bei Fehler: Note überspringen - try: - validate_required_frontmatter(fm) - except Exception as e: - print(json.dumps({"path": path, "error": f"Frontmatter invalid: {e}"})) - continue + payloads = make_chunk_payloads(note_meta, rel_path, chunks) - if args.note_id and fm.get("id") != args.note_id: - continue - - processed += 1 - - # Note-Payload - note_pl = make_note_payload(parsed, vault_root=root) - if "fulltext" not in (note_pl or {}): - note_pl["fulltext"] = parsed.body or "" - if note_pl.get("path"): - note_pl["path"] = _normalize_rel_path( - os.path.join(root, note_pl["path"]) if not os.path.isabs(note_pl["path"]) else note_pl["path"], root - ) - else: - note_pl["path"] = _normalize_rel_path(parsed.path, root) - - note_id = note_pl["note_id"] - - # Change-Detection - new_hash = note_pl.get("hash_fulltext") - old_hash = None if args.force_replace else fetch_existing_note_hash(client, cfg.prefix, note_id) - changed = args.force_replace or (old_hash != new_hash) - - # Chunks + Embeddings (mit Fallback Nullvektoren) - chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept")) - chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks) - if embed_texts: - vecs = embed_texts([getattr(c, "text", "") for c in chunks]) - else: - vecs = [[0.0] * cfg.dim for _ in chunks] - - # Optionaler Note-Vektor (Fallback Nullvektor) - if args.embed_note and embed_one: - note_vec = embed_one(parsed.body or "") - elif args.embed_note: - note_vec = [0.0] * cfg.dim - else: - note_vec = None - - # Edges lokal ableiten - edges = derive_edges(fm | {"id": note_id, "references": note_pl.get("references")}, chunk_pls) - - # Zusammenfassung - summary = { - "note_id": note_id, - "title": fm.get("title"), - "chunks": len(chunk_pls), - "edges": len(edges), - "changed": changed, - "decision": ("apply" if args.apply and changed else - "apply-skip-unchanged" if args.apply and not changed else - "dry-run"), - "path": note_pl["path"], - } - print(json.dumps(summary, ensure_ascii=False)) - - # Dry-Run? - if not args.apply: - continue - - # Optionaler Purge NUR für geänderte Notes - if changed and args.purge_before_upsert: - purge_note_artifacts(client, cfg.prefix, note_id) - - # Upserts: Notes / Chunks / Edges - notes_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim) - upsert_batch(client, notes_name, note_pts) - - chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs) - upsert_batch(client, chunks_name, chunk_pts) - - edges_name, edge_pts = points_for_edges(cfg.prefix, edges) - upsert_batch(client, edges_name, edge_pts) - - print(f"Done. Processed notes: {processed}") + if args.do_print: + print(json.dumps(payloads, ensure_ascii=False, indent=2)) -if __name__ == "__main__": - main() +if __name__ == "__main__": # pragma: no cover + _cli()