app/core/chunk_payload.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

This commit is contained in:
Lars 2025-09-30 12:18:16 +02:00
parent daa58e6b27
commit 5279bcae18

View File

@ -3,251 +3,199 @@
""" """
Modul: app/core/chunk_payload.py Modul: app/core/chunk_payload.py
Version: 2.0.0 Version: 2.0.0
Datum: 2025-09-09 Datum: 2025-09-30
Kurzbeschreibung Zweck
---------------- -----
Erzeugt **Chunk-Payloads** für die Qdrant-Collection `<prefix>_chunks` auf Basis der Erzeugt Chunk-Payloads für Qdrant. Unterstützt abwärtskompatibel bisherige Felder und
vom Chunker gelieferten `Chunk`-Objekte. Ziel: ergänzt neue Felder für **verlustfreie Rekonstruktion** bei überlappenden Fenstern:
- *Verlustfreie Rekonstruktion*: Jeder Chunk enthält seinen **Text** (payload["text"]).
- *Schnelle Abfragen*: Wichtige Note-Metadaten werden gespiegelt, um Filter ohne Join zu erlauben.
- *Graph-Kompatibilität*: Wikilinks und externe Links werden extrahiert; Nachbarschaften werden übernommen.
- *Monitoring*: Token- und Längenmetriken sowie Text-Hash erleichtern Audits und Re-Embeddings.
Kompatibilität - text : effektiver, nicht-überlappender Segmenttext (für Rekonstruktion)
-------------- - window : Fenstertext inkl. Overlap (für Embeddings)
- **Abwärtskompatibel** zur bisherigen `make_chunk_payloads`-Signatur. - start, end : absolute Offsets (0-basiert) des effektiven Segments im Gesamtkorpus
- Zusätzliche Felder stören bestehende Upserts nicht (Payload ist schema-flexibel). - overlap_left : Anzahl überlappender Zeichen zum **vorigen** Fenster
- Erwartet, dass `Chunk` u. a. die Attribute `id`, `index`, `text`, `char_start`, `char_end`, - overlap_right : Anzahl überlappender Zeichen zum **nächsten** Fenster
`section_title`, `section_path`, `neighbors_prev`, `neighbors_next` liefert.
CLI (Schnelltest) Abwärtskompatibel bleiben:
----------------- - chunk_id (note_id#<n>), chunk_index, seq, path, note_id, type, title, tags, etc.
# Preview aus einer Markdown-Datei
python3 -m app.core.chunk_payload --from-file ./test_vault/20_experiences/exp-two.md --vault-root ./test_vault
# Nur IDs & Tokenmengen Aufruf (typisch aus dem Importer)
python3 -m app.core.chunk_payload --from-file ./test_vault/20_experiences/exp-two.md --vault-root ./test_vault --summary ---------------------------------
from app.core.chunk_payload import make_chunk_payloads
payloads = make_chunk_payloads(frontmatter, rel_path, chunks, note_text=full_body)
Felder (Auszug) Wobei `chunks` eine Folge von Objekten oder Dicts ist, die mindestens ein Fenster enthalten:
--------------- c.text ODER c.content ODER c.raw (falls als Objekt)
id : "<note_id>#cNN" bzw. c["text"] ODER c["content"] ODER c["raw"] (falls Dict)
scope : "chunk"
note_id : "<note_id>" Falls `note_text` nicht übergeben wird, wird die effektive Segmentierung über
note_title : str eine robuste **Overlap-Deduplikation** zwischen Fenstern ermittelt.
note_type : str
note_status : str
area, project : optional
tags : list[str]
note_path : str (relativ, Slashes normalisiert)
chunk_index : int
section_title : str | None
section_path : str | None
char_start : int | None
char_end : int | None
char_len : int
token_est : int ( len(text)/4)
neighbors : {"prev": str|None, "next": str|None}
text : str (Chunk-Text, **Pflicht**)
text_sha256 : str "sha256:<hex>"
lang : optional
wikilinks : list[str]
external_links : list[{"href": str, "label": str|None}]
references : list[{"target_id": str, "kind": "wikilink"}]
embed_model : optional (durchreichbar)
embed_dim : optional
embed_version : optional
""" """
from __future__ import annotations from __future__ import annotations
import argparse from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
import hashlib
import json
import os
import re import re
from typing import Dict, List, Optional, Tuple
# ------------------------------- Utils ------------------------------- #
def _as_text(window_candidate: Any) -> str:
"""Extrahiert Fenstertext aus beliebigem Chunk-Objekt/Dikt."""
if window_candidate is None:
return ""
# Objekt mit Attributen
for k in ("text", "content", "raw", "window"):
v = getattr(window_candidate, k, None) if not isinstance(window_candidate, dict) else window_candidate.get(k)
if isinstance(v, str) and v:
return v
# Fallback: string-repr
if isinstance(window_candidate, str):
return window_candidate
return ""
def _get_int(x: Any, default: int = 0) -> int:
try: try:
# Paket-Import (normaler Betrieb) return int(x)
from app.core.chunker import Chunk
from app.core.parser import extract_wikilinks, read_markdown, normalize_frontmatter, validate_required_frontmatter
except Exception: # pragma: no cover
# Relativ (lokale Tests)
from .chunker import Chunk # type: ignore
from .parser import extract_wikilinks, read_markdown, normalize_frontmatter, validate_required_frontmatter # type: ignore
# ---------------------------------------------------------------------------
# Utilities
# ---------------------------------------------------------------------------
RE_MD_LINK = re.compile(r"\[([^\]]*)\]\(([^)\s]+)(?:\s+\"([^\"]+)\")?\)")
RE_HTTP_SCHEMES = ("http://", "https://", "mailto:", "obsidian://", "tel:")
def _estimate_tokens(text: str) -> int:
"""Grobe Token-Schätzung (≈ 1 Token pro 4 Zeichen)."""
return max(0, int(round(len((text or '').strip()) / 4)))
def _sha256_text(text: str) -> str:
h = hashlib.sha256()
h.update((text or "").encode("utf-8"))
return "sha256:" + h.hexdigest()
def _normalize_rel_path(path: Optional[str], vault_root: Optional[str]) -> Optional[str]:
if not path:
return None
p = str(path)
p = p.replace("\\", "/")
if vault_root and os.path.isabs(p):
try:
p = os.path.relpath(p, vault_root)
except Exception: except Exception:
pass return default
p = p.replace("\\", "/").lstrip("/")
return p
def _extract_external_links(text: str) -> List[Dict[str, Optional[str]]]: def _norm_lines(s: str) -> str:
"""Findet Markdown-Links [label](href "title") mit erlaubten Schemes.""" """Nur für defensive Gleichheitstests NICHT für Persistenz."""
out: List[Dict[str, Optional[str]]] = [] return "\n".join([ln.rstrip() for ln in s.replace("\r\n", "\n").replace("\r", "\n").split("\n")]).strip()
if not text:
return out
for m in RE_MD_LINK.finditer(text):
label = (m.group(1) or "").strip() or None
href = (m.group(2) or "").strip()
title = (m.group(3) or "").strip() or None
if any(href.startswith(s) for s in RE_HTTP_SCHEMES):
out.append({"href": href, "label": label or title})
return out
# ---------------------- Overlap-Dedupe Algorithmus ------------------- #
# --------------------------------------------------------------------------- def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int]]:
# Public API
# ---------------------------------------------------------------------------
def make_chunk_payloads(note_meta: Dict, path: str, chunks: List[Chunk]) -> List[Dict]:
""" """
Erzeugt Payload-Dicts für alle Chunks einer Note. Ermittelt nicht-überlappende Segmente zu einer geordneten Folge von Fenster-Strings.
Gibt (segments, overlaps_left) zurück, wobei:
Parameters - segments[i] = Fenster[i] ohne das vorangestellte Overlap
---------- - overlaps_left[i] = Länge des Overlaps von Fenster[i] zum bisher rekonstruierten Text
note_meta : Dict
Normalisierte Frontmatter der Note (mind.: id, title, type, status, tags, [area, project, lang]).
path : str
Pfad zur Note (sollte relativ zum Vault sein; wird hier zur Sicherheit normalisiert).
chunks : List[Chunk]
Vom Chunker erzeugte Chunks.
Returns
-------
List[Dict]
Liste von Payloads (ein Eintrag pro Chunk).
""" """
res: List[Dict] = [] segments: List[str] = []
rel_path = _normalize_rel_path(path, vault_root=None) overlaps_left: List[int] = []
reconstructed = ""
for w in windows:
w = w or ""
# finde größtes k, sodass reconstructed.endswith(w[:k])
max_k = min(len(w), max(0, len(reconstructed)))
k = 0
# Suche von groß nach klein (einfache O(n^2) ausreichend bei kurzen Fenstern)
for cand in range(max_k, -1, -1):
if reconstructed.endswith(w[:cand]):
k = cand
break
seg = w[k:]
segments.append(seg)
overlaps_left.append(k)
reconstructed += seg
return segments, overlaps_left
for ch in chunks: # ----------------------------- Public API ---------------------------- #
text: str = getattr(ch, "text", "") or ""
wikilinks = extract_wikilinks(text)
ext_links = _extract_external_links(text)
payload: Dict = { def make_chunk_payloads(
"id": getattr(ch, "id", None), frontmatter: Dict[str, Any],
"scope": "chunk", rel_path: str,
"note_id": note_meta.get("id"), chunks: Iterable[Union[Dict[str, Any], Any]],
"note_title": note_meta.get("title"), note_text: Optional[str] = None,
# gespiegelt für schnelle Filter: ) -> List[Dict[str, Any]]:
"note_type": note_meta.get("type"), """
"note_status": note_meta.get("status"), Baut Payload-Dicts pro Chunk.
"area": note_meta.get("area"),
"project": note_meta.get("project"), Parameter
"tags": note_meta.get("tags"), ---------
# Pfad frontmatter : dict erwartete Keys: id (note_id), title, type, tags (optional)
"note_path": rel_path, rel_path : str relativer Pfad der Note im Vault
"path": rel_path, # Back-compat chunks : iter Sequenz von Chunk-Objekten/-Dicts mit Fenstertext
# Reihenfolge & Section note_text : str? optionaler Gesamtkorpus (Body) für exakte Offsets
"chunk_index": getattr(ch, "index", None),
"section_title": getattr(ch, "section_title", None), Rückgabe
"section_path": getattr(ch, "section_path", None), --------
# Position Liste von Payload-Dicts. Wichtige Felder:
"char_start": getattr(ch, "char_start", None), note_id, chunk_id, chunk_index, seq, path, text, window, start, end,
"char_end": getattr(ch, "char_end", None), overlap_left, overlap_right, type, title, tags
"char_len": max(0, int(getattr(ch, "char_end", 0) or 0) - int(getattr(ch, "char_start", 0) or 0)) or len(text), """
# Nachbarn note_id = str(frontmatter.get("id") or "").strip()
"neighbors": { note_type = frontmatter.get("type", None)
"prev": getattr(ch, "neighbors_prev", None), note_title = frontmatter.get("title", None)
"next": getattr(ch, "neighbors_next", None), note_tags = frontmatter.get("tags", None)
},
# Inhalt & Metrik # 1) Fenstertexte extrahieren
"text": text, windows: List[str] = []
"text_sha256": _sha256_text(text), seqs: List[int] = []
"token_est": _estimate_tokens(text), for idx, c in enumerate(chunks):
# Sprache windows.append(_as_text(c))
"lang": note_meta.get("lang"), # Bestmögliche seq ermitteln
# Links s = None
"wikilinks": wikilinks, if isinstance(c, dict):
"external_links": ext_links, s = c.get("seq", c.get("chunk_index", idx))
"references": [{"target_id": w, "kind": "wikilink"} for w in wikilinks], else:
s = getattr(c, "seq", getattr(c, "chunk_index", idx))
seqs.append(_get_int(s, idx))
# 2) Nicht-überlappende Segmente berechnen
segments, overlaps_left = _dedupe_windows_to_segments(windows)
overlaps_right = [0] * len(segments)
# right-overlap ist der left-overlap des nächsten Fensters bezogen auf dessen Fenster,
# lässt sich nur approximieren; wir speichern ihn konsistent als 0 bzw. könnte man
# nachträglich bestimmen, falls benötigt.
# 3) Falls note_text gegeben ist, berechne absolute Offsets präzise
starts: List[int] = [0] * len(segments)
ends: List[int] = [0] * len(segments)
if isinstance(note_text, str):
pos = 0
for i, seg in enumerate(segments):
starts[i] = pos
pos += len(seg)
ends[i] = pos
else:
# Ohne Gesamtkorpus: Offsets anhand der kumulativen Segmentlängen
pos = 0
for i, seg in enumerate(segments):
starts[i] = pos
pos += len(seg)
ends[i] = pos
# 4) Payload-Dicts aufbauen
payloads: List[Dict[str, Any]] = []
for i, (win, seg) in enumerate(zip(windows, segments)):
pl: Dict[str, Any] = {
"note_id": note_id,
"chunk_id": f"{note_id}#{i+1}",
"chunk_index": i,
"seq": seqs[i],
"path": rel_path.replace("\\", "/").lstrip("/"),
# Texte
"window": win, # für Embeddings (inkl. Overlap)
"text": seg, # effektiver Anteil (verlustfreie Rekonstruktion)
# Offsets & Overlaps
"start": starts[i],
"end": ends[i],
"overlap_left": overlaps_left[i],
"overlap_right": overlaps_right[i],
} }
if note_type is not None:
pl["type"] = note_type
if note_title is not None:
pl["title"] = note_title
if note_tags is not None:
pl["tags"] = note_tags
payloads.append(pl)
# Entferne Nones/Leeres, aber **text** bleibt (darf leer sein z. B. bei Bild-Only-Chunks) return payloads
cleaned = {}
for k, v in payload.items():
if v in (None, [], {}):
# immer behalten:
if k in ("text", "neighbors"):
cleaned[k] = v
else:
continue
else:
cleaned[k] = v
res.append(cleaned)
return res
# ---------------------------------------------------------------------------
# CLI zum schnellen Testen
# ---------------------------------------------------------------------------
def _cli() -> None:
ap = argparse.ArgumentParser(description="Chunk-Payloads aus einer einzelnen Markdown-Datei erzeugen")
ap.add_argument("--from-file", required=True, help="Pfad zur Markdown-Datei")
ap.add_argument("--vault-root", default=None, help="Vault-Wurzel (zur Pfad-Relativierung)")
ap.add_argument("--summary", action="store_true", help="Nur kurze Übersicht je Chunk ausgeben")
args = ap.parse_args()
parsed = read_markdown(args.from_file)
fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm)
# lazy import, um Zyklen zu vermeiden
try:
from app.core.chunker import assemble_chunks
except Exception:
from .chunker import assemble_chunks # type: ignore
chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
rel = _normalize_rel_path(parsed.path, args.vault_root)
pls = make_chunk_payloads(fm, rel or parsed.path, chunks)
if args.summary:
out = []
for p in pls:
out.append({
"id": p.get("id"),
"chunk_index": p.get("chunk_index"),
"token_est": p.get("token_est"),
"wikilinks": p.get("wikilinks"),
"ext_links": [e.get("href") for e in p.get("external_links", [])],
"prev": (p.get("neighbors") or {}).get("prev"),
"next": (p.get("neighbors") or {}).get("next"),
})
print(json.dumps(out, ensure_ascii=False, indent=2))
else:
print(json.dumps(pls, ensure_ascii=False, indent=2))
# __main__ (optionaler Mini-Test)
if __name__ == "__main__": # pragma: no cover if __name__ == "__main__": # pragma: no cover
_cli() demo_fm = {"id": "demo", "title": "Demo", "type": "concept"}
demo_chunks = [
{"text": "Alpha Beta Gamma"},
{"text": "Gamma Delta"},
{"text": "Delta Epsilon Zeta"},
]
pls = make_chunk_payloads(demo_fm, "x/demo.md", demo_chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta")
from pprint import pprint
pprint(pls)
recon = "".join(p["text"] for p in pls)
print("RECON:", recon)