# Reconstructed from the post-image of patch 5279bcae1882
# ("app/core/chunk_payload.py aktualisiert", 2025-09-30).
"""
Module: app/core/chunk_payload.py
Version: 2.0.0
Date: 2025-09-30

Purpose
-------
Builds chunk payloads for Qdrant. Keeps the previously emitted fields and adds
fields that allow a **lossless reconstruction** when windows overlap:

    - text          : effective, non-overlapping segment text (for reconstruction)
    - window        : window text including overlap (for embeddings)
    - start, end    : absolute 0-based offsets of the effective segment
    - overlap_left  : number of characters shared with the **previous** windows
    - overlap_right : number of characters shared with the **next** window

Fields kept for backward compatibility:
    - chunk_id (``<note_id>#<n>``, n is 1-based), chunk_index, seq, path,
      note_id, type, title, tags

Typical call (from the importer)
--------------------------------
    from app.core.chunk_payload import make_chunk_payloads
    payloads = make_chunk_payloads(frontmatter, rel_path, chunks, note_text=full_body)

``chunks`` is a sequence of objects or dicts that carry a window text:
    c.text OR c.content OR c.raw OR c.window                  (objects)
    c["text"] OR c["content"] OR c["raw"] OR c["window"]      (dicts)

If ``note_text`` is not passed (or a window cannot be found in it verbatim),
the effective segmentation is derived from an **overlap de-duplication**
between consecutive windows.
"""
from __future__ import annotations

from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
import re

# ------------------------------- Utils ------------------------------- #


def _as_text(window_candidate: Any) -> str:
    """Extract the window text from a chunk object, chunk dict or plain string.

    The keys/attributes ``text``, ``content``, ``raw`` and ``window`` are tried
    in that order; the first non-empty ``str`` wins. A plain ``str`` is returned
    unchanged. Anything else (including ``None``) yields ``""``.
    """
    if window_candidate is None:
        return ""
    is_mapping = isinstance(window_candidate, dict)
    for key in ("text", "content", "raw", "window"):
        if is_mapping:
            value = window_candidate.get(key)
        else:
            value = getattr(window_candidate, key, None)
        if isinstance(value, str) and value:
            return value
    # Fallback: the chunk itself is already the window string.
    if isinstance(window_candidate, str):
        return window_candidate
    return ""


def _get_int(x: Any, default: int = 0) -> int:
    """Return ``int(x)``, or ``default`` when ``x`` cannot be converted."""
    try:
        return int(x)
    except (TypeError, ValueError, OverflowError):
        return default


def _norm_lines(s: str) -> str:
    """Normalise line endings and trailing whitespace.

    Only meant for defensive equality checks - NOT for persisted text.
    """
    unified = s.replace("\r\n", "\n").replace("\r", "\n")
    return "\n".join(line.rstrip() for line in unified.split("\n")).strip()


# ---------------------- Overlap de-duplication ----------------------- #


def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int]]:
    """Derive non-overlapping segments from an ordered list of window strings.

    Returns ``(segments, overlaps_left)`` where

    - ``segments[i]``      is ``windows[i]`` without its leading overlap, and
    - ``overlaps_left[i]`` is the length of the longest prefix of ``windows[i]``
      that is also a suffix of the text reconstructed from the earlier segments.
    """
    segments: List[str] = []
    overlaps_left: List[int] = []
    # An overlap can never be longer than the window itself, so only the last
    # `longest` characters of the reconstructed text ever need to be inspected.
    longest = max((len(w or "") for w in windows), default=0)
    tail = ""
    for w in windows:
        w = w or ""
        k = 0
        # Search from the largest candidate down; the first hit is the maximum.
        for cand in range(min(len(w), len(tail)), 0, -1):
            if tail.endswith(w[:cand]):
                k = cand
                break
        seg = w[k:]
        segments.append(seg)
        overlaps_left.append(k)
        tail = (tail + seg)[-longest:] if longest else ""
    return segments, overlaps_left


def _segments_from_note_text(
    note_text: str,
    windows: List[str],
) -> Optional[Tuple[List[str], List[int], List[int], List[int]]]:
    """Locate every window in ``note_text`` and derive exact segments.

    Windows are searched in order, each one starting at the position where the
    previous window was found. Returns ``(segments, overlaps_left, starts, ends)``
    with ``note_text[starts[i]:ends[i]] == segments[i]``, or ``None`` as soon as
    a non-empty window is not a literal substring of ``note_text``.
    """
    segments: List[str] = []
    overlaps_left: List[int] = []
    starts: List[int] = []
    ends: List[int] = []
    search_from = 0  # start position of the previously located window
    covered_end = 0  # end of the text already covered by earlier segments
    for w in windows:
        if not w:
            # An empty window contributes a zero-length segment.
            segments.append("")
            overlaps_left.append(0)
            starts.append(covered_end)
            ends.append(covered_end)
            continue
        pos = note_text.find(w, search_from)
        if pos < 0:
            return None
        win_end = pos + len(w)
        # Part of this window that earlier segments already cover.
        k = min(len(w), max(0, covered_end - pos))
        segments.append(w[k:])
        overlaps_left.append(k)
        starts.append(pos + k)
        ends.append(win_end)
        search_from = pos
        covered_end = max(covered_end, win_end)
    return segments, overlaps_left, starts, ends


# ----------------------------- Public API ---------------------------- #


def make_chunk_payloads(
    frontmatter: Dict[str, Any],
    rel_path: str,
    chunks: Iterable[Union[Dict[str, Any], Any]],
    note_text: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """Build one payload dict per chunk.

    Parameters
    ----------
    frontmatter : dict
        Keys read: ``id`` (note id), ``title``, ``type``, ``tags`` (all optional).
    rel_path : str
        Path of the note; backslashes become slashes, leading slashes are dropped.
    chunks : iterable
        Chunk objects, dicts or plain strings carrying the window text. ``seq``
        is read from ``seq``, then ``chunk_index``, else the enumeration index.
    note_text : str, optional
        Full note body. When given and every window occurs in it verbatim,
        ``start``/``end`` are exact offsets into ``note_text``. Otherwise the
        offsets are cumulative lengths of the de-duplicated segments.

    Returns
    -------
    list of dict
        Keys: ``note_id``, ``chunk_id``, ``chunk_index``, ``seq``, ``path``,
        ``window``, ``text``, ``start``, ``end``, ``overlap_left``,
        ``overlap_right`` and, when present in the frontmatter, ``type``,
        ``title``, ``tags``.
    """
    note_id = str(frontmatter.get("id") or "").strip()
    note_type = frontmatter.get("type", None)
    note_title = frontmatter.get("title", None)
    note_tags = frontmatter.get("tags", None)
    norm_path = (rel_path or "").replace("\\", "/").lstrip("/")

    # 1) Extract window texts and the best available sequence number.
    windows: List[str] = []
    seqs: List[int] = []
    for idx, c in enumerate(chunks):
        windows.append(_as_text(c))
        if isinstance(c, dict):
            s = c.get("seq", c.get("chunk_index", idx))
        else:
            s = getattr(c, "seq", getattr(c, "chunk_index", idx))
        seqs.append(_get_int(s, idx))

    # 2) Segments and offsets: exact when the windows can be located in
    #    note_text, otherwise cumulative over the de-duplicated segments.
    located = None
    if isinstance(note_text, str):
        located = _segments_from_note_text(note_text, windows)
    if located is not None:
        segments, overlaps_left, starts, ends = located
    else:
        segments, overlaps_left = _dedupe_windows_to_segments(windows)
        starts, ends = [], []
        pos = 0
        for seg in segments:
            starts.append(pos)
            pos += len(seg)
            ends.append(pos)

    # 3) The right overlap of a window is the left overlap of its successor,
    #    capped at the window's own length; the last window has none.
    overlaps_right: List[int] = [
        min(next_left, len(win))
        for win, next_left in zip(windows, overlaps_left[1:])
    ]
    if windows:
        overlaps_right.append(0)

    # 4) Assemble the payload dicts.
    payloads: List[Dict[str, Any]] = []
    for i, (win, seg) in enumerate(zip(windows, segments)):
        pl: Dict[str, Any] = {
            "note_id": note_id,
            "chunk_id": f"{note_id}#{i+1}",
            "chunk_index": i,
            "seq": seqs[i],
            "path": norm_path,
            # Texts
            "window": win,  # for embeddings (including overlap)
            "text": seg,  # effective part (lossless reconstruction)
            # Offsets & overlaps
            "start": starts[i],
            "end": ends[i],
            "overlap_left": overlaps_left[i],
            "overlap_right": overlaps_right[i],
        }
        if note_type is not None:
            pl["type"] = note_type
        if note_title is not None:
            pl["title"] = note_title
        if note_tags is not None:
            pl["tags"] = note_tags
        payloads.append(pl)

    return payloads


# __main__ (optional mini test)
if __name__ == "__main__":  # pragma: no cover
    demo_fm = {"id": "demo", "title": "Demo", "type": "concept"}
    demo_chunks = [
        {"text": "Alpha Beta Gamma"},
        {"text": "Gamma Delta"},
        {"text": "Delta Epsilon Zeta"},
    ]
    pls = make_chunk_payloads(demo_fm, "x/demo.md", demo_chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta")
    from pprint import pprint
    pprint(pls)
    recon = "".join(p["text"] for p in pls)
    print("RECON:", recon)