mindnet/app/core/chunk_payload.py
Lars f1e1cde597
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
Dateien nach "app/core" hochladen
2025-11-08 21:53:49 +01:00

281 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Modul: app/core/chunk_payload.py
# Version: 2.3.1
# Datum: 2025-11-08
#
# Zweck
# -----
# Erzeugt Qdrant-Payloads für Chunks. Voll abwärtskompatibel zu v2.2.0.
# Fixes:
# - 'retriever_weight' aus Frontmatter wird IMMER in jeden Chunk-Payload übernommen
# (Float; Default via ENV MINDNET_DEFAULT_RETRIEVER_WEIGHT, sonst 1.0).
# - 'chunk_profile' aus Frontmatter wird falls vorhanden in jeden Chunk-Payload übernommen.
# - Robustere Fenster/Overlap-Erzeugung bleibt erhalten.
#
# Hinweis zu Qdrant:
# Qdrant ist schemaflexibel. Ein Feld erscheint in der UI/HTTP-API erst,
# wenn mindestens 1 Punkt es im Payload besitzt. Für konsistente Typisierung
# empfiehlt sich zusätzlich eine Payload-Index-Definition (z.B. FLOAT für
# 'retriever_weight').
from __future__ import annotations
import os
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
# Pull the type-specific chunk sizing (overlap/target/max) from the project
# configuration; fall back to a conservative built-in default when the
# config module cannot be imported (e.g. standalone use of this file).
try:
    # Fetch type-appropriate overlap from the chunking configuration.
    from app.core.chunk_config import get_sizes as _get_sizes
except Exception:
    def _get_sizes(_note_type: str):
        # Conservative default in case the import fails.
        return {"overlap": (40, 60), "target": (250, 350), "max": 500}
# ------------------------------- Utils ------------------------------- #
def _get_attr_or_key(obj: Any, key: str, default=None):
if obj is None:
return default
if isinstance(obj, dict):
return obj.get(key, default)
return getattr(obj, key, default)
def _as_window_text(chunk: Any) -> str:
    """Best-effort window text: prefer 'window', then 'text'/'content'/'raw'.

    Returns "" when none of the candidate fields holds a non-empty string.
    """
    candidates = (
        _get_attr_or_key(chunk, field, None)
        for field in ("window", "text", "content", "raw")
    )
    for value in candidates:
        if isinstance(value, str) and value:
            return value
    return ""
def _to_int(x: Any, default: int = 0) -> int:
try:
return int(x)
except Exception:
return default
def _normalize_rel_path(p: str) -> str:
p = (p or "").replace("\\", "/")
while p.startswith("/"):
p = p[1:]
return p
def _to_float(val: Any, default: float) -> float:
try:
if val is None:
return float(default)
if isinstance(val, (int, float)):
return float(val)
s = str(val).strip().replace(",", ".")
return float(s)
except Exception:
return float(default)
# ---------------------- Overlap & Offsets ---------------------------- #
def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]:
"""
Entfernt linkes Overlap aus echten Fenster-Strings.
Rückgabe: (segments, overlaps_left, reconstructed_text)
"""
segments: List[str] = []
overlaps_left: List[int] = []
reconstructed = ""
for w in windows:
w = w or ""
max_k = min(len(w), len(reconstructed))
k = 0
for cand in range(max_k, -1, -1):
if reconstructed.endswith(w[:cand]):
k = cand
break
seg = w[k:]
segments.append(seg)
overlaps_left.append(k)
reconstructed += seg
return segments, overlaps_left, reconstructed
def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int:
"""Länge längsten Suffix(a), der Prefix(b) ist."""
if not a or not b:
return 0
a1 = a[-max_probe:]
b1 = b[:max_probe]
n = min(len(a1), len(b1))
for k in range(n, 0, -1):
if a1[-k:] == b1[:k]:
return k
return 0
# ----------------------------- Public API ---------------------------- #
def make_chunk_payloads(
    frontmatter: Dict[str, Any],
    rel_path: str,
    chunks: Iterable[Union[Dict[str, Any], Any]],
    note_text: Optional[str] = None,
) -> List[Dict[str, Any]]:
    """
    Build one Qdrant payload dict per chunk.

    If the chunks were delivered without overlapping windows, synthetic
    'window' texts with type-appropriate overlap are generated. In addition,
    'retriever_weight' (float) and 'chunk_profile' are copied from the
    frontmatter into every payload.

    Parameters
    ----------
    frontmatter : note-level metadata; 'id', 'type', 'title', 'tags',
        'retriever_weight' and 'chunk_profile'/'profile' are read.
    rel_path    : note path, normalized to forward slashes, no leading '/'.
    chunks      : dicts or objects carrying at least one of
        'window'/'text'/'content'/'raw', plus optional ordering/metadata.
    note_text   : full note text; when given, chunk offsets are located in
        it exactly instead of being accumulated.

    Returns
    -------
    List of payload dicts, one per input chunk, in input order.
    """
    note_id = str(frontmatter.get("id") or "").strip()
    note_type = str(frontmatter.get("type", "")).lower()
    note_title = frontmatter.get("title", None)
    note_tags = frontmatter.get("tags", None)
    rel_path = _normalize_rel_path(rel_path)
    # --- new fields from frontmatter (with defaults) ---
    default_rw = _to_float(os.environ.get("MINDNET_DEFAULT_RETRIEVER_WEIGHT", 1.0), 1.0)
    fm_rw = _to_float(frontmatter.get("retriever_weight"), default_rw)
    fm_chunk_profile = frontmatter.get("chunk_profile") or frontmatter.get("profile") or None
    # 1) Collect raw data (exactly as delivered).
    chunks_list = list(chunks)
    raw_windows: List[str] = []
    seqs: List[int] = []
    ids_in: List[Optional[str]] = []
    token_counts: List[Optional[int]] = []
    section_titles: List[Optional[str]] = []
    section_paths: List[Optional[str]] = []
    any_explicit_window = False
    for idx, c in enumerate(chunks_list):
        # Window source: an explicit non-empty 'window' marks the whole
        # batch as "already overlapping" (see step 2).
        w = _get_attr_or_key(c, "window", None)
        if isinstance(w, str) and w:
            any_explicit_window = True
            raw_windows.append(w)
        else:
            raw_windows.append(_as_window_text(c))  # 'text'|'content'|'raw' as fallback
        # Ordering: prefer 'seq', then 'chunk_index', else the input index.
        seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx))
        # IDs, token counts, section metadata (all optional).
        cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None))
        ids_in.append(str(cid) if isinstance(cid, str) and cid else None)
        tc = _get_attr_or_key(c, "token_count", None)
        token_counts.append(_to_int(tc, 0) if tc is not None else None)
        section_titles.append(_get_attr_or_key(c, "section_title", None))
        section_paths.append(_get_attr_or_key(c, "section_path", None))
    # 2) Determine segments & left overlaps.
    if any_explicit_window:
        # Real windows exist -> dedupe to recover the core segments.
        segments, overlaps_left, recon = _dedupe_windows_to_segments(raw_windows)
        windows_final = raw_windows[:]  # already delivered with overlap
    else:
        # No real windows -> segments are identical to the delivered texts;
        # synthesize windows with type-appropriate left overlap.
        segments = [w or "" for w in raw_windows]
        overlaps_left = []
        windows_final = []
        recon = ""
        try:
            overlap_low, overlap_high = tuple(_get_sizes(note_type).get("overlap", (40, 60)))
        except Exception:
            overlap_low, overlap_high = (40, 60)
        overlap_target = int(overlap_low)
        for i, seg in enumerate(segments):
            if i == 0:
                # First window: no left context available.
                windows_final.append(seg)
                overlaps_left.append(0)
                recon += seg
            else:
                # Synthetic left context = suffix of the text rebuilt so far.
                k = min(overlap_target, len(recon))
                left_ctx = recon[-k:] if k > 0 else ""
                windows_final.append(left_ctx + seg)
                overlaps_left.append(k)
                recon += seg  # reconstruction stays true to the cores
    # 3) Determine overlap_right (suffix/prefix match with the next window).
    overlaps_right: List[int] = []
    for i in range(len(windows_final)):
        if i + 1 < len(windows_final):
            ov = _overlap_len_suffix_prefix(windows_final[i], windows_final[i + 1], max_probe=4096)
        else:
            ov = 0
        overlaps_right.append(ov)
    # 4) start/end offsets (exact via note_text, otherwise cumulative).
    starts: List[int] = [0] * len(segments)
    ends: List[int] = [0] * len(segments)
    pos = 0
    if isinstance(note_text, str) and note_text:
        search_pos = 0
        for i, seg in enumerate(segments):
            if not seg:
                starts[i] = ends[i] = search_pos
                continue
            j = note_text.find(seg, search_pos)
            if j >= 0:
                starts[i] = j
                ends[i] = j + len(seg)
                search_pos = ends[i]
            else:
                # Fallback: cumulative offsets. NOTE(review): 'pos' is not
                # synced with 'search_pos', so these offsets are relative to
                # the not-found segments only, not to note_text positions.
                starts[i] = pos
                pos += len(seg)
                ends[i] = pos
    else:
        for i, seg in enumerate(segments):
            starts[i] = pos
            pos += len(seg)
            ends[i] = pos
    # 5) Build the payload dicts.
    payloads: List[Dict[str, Any]] = []
    for i, (win, seg) in enumerate(zip(windows_final, segments)):
        chunk_id = ids_in[i] or f"{note_id}#{i+1}"
        pl: Dict[str, Any] = {
            "note_id": note_id,
            "chunk_id": chunk_id,
            "id": chunk_id,  # alias
            "chunk_index": i,
            "seq": seqs[i],
            "path": rel_path,
            "window": win,
            "text": seg,
            "start": starts[i],
            "end": ends[i],
            "overlap_left": overlaps_left[i],
            "overlap_right": overlaps_right[i],
            # NEW: always present, see module header.
            "retriever_weight": fm_rw,
        }
        # Optional metadata — only set when present on the note/chunk.
        if note_type:
            pl["type"] = note_type
        if note_title is not None:
            pl["title"] = note_title
        if note_tags is not None:
            pl["tags"] = note_tags
        if token_counts[i] is not None:
            pl["token_count"] = int(token_counts[i])
        if section_titles[i] is not None:
            pl["section_title"] = section_titles[i]
        if section_paths[i] is not None:
            sp = str(section_paths[i]).replace("\\", "/")
            pl["section_path"] = sp if sp else "/"
        if fm_chunk_profile is not None:
            pl["chunk_profile"] = str(fm_chunk_profile)
        payloads.append(pl)
    return payloads
if __name__ == "__main__":  # pragma: no cover
    from pprint import pprint

    # Tiny smoke demo: three consecutive chunks of one note, with the full
    # note text supplied so offsets can be located exactly.
    demo_frontmatter = {"id": "demo", "title": "Demo", "type": "concept", "retriever_weight": 0.75, "chunk_profile": "tight"}
    demo_chunks = [
        {"id": "demo#1", "text": "Alpha Beta Gamma"},
        {"id": "demo#2", "text": "Gamma Delta"},
        {"id": "demo#3", "text": "Delta Epsilon Zeta"},
    ]
    demo_payloads = make_chunk_payloads(
        demo_frontmatter,
        "path/demo.md",
        demo_chunks,
        note_text="Alpha Beta Gamma Delta Epsilon Zeta",
    )
    pprint(demo_payloads)