app/core/chunk_payload.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
This commit is contained in:
parent
4eb5e34ea7
commit
307335a733
|
|
@ -1,135 +1,283 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
chunk_payload.py v2.2.1
|
||||
Modul: app/core/chunk_payload.py
|
||||
Version: 2.3.0
|
||||
Datum: 2025-11-08
|
||||
|
||||
Zweck:
|
||||
- Erzeugt Chunk-Payloads (inkl. 'text' und 'window'), vollständig
|
||||
rückwärtskompatibel zu v2.2.0 (Signaturen beibehalten).
|
||||
- Neu: optionale Typ-Profile via Type-Registry (chunk_profile), ohne
|
||||
bestehendes Verhalten zu brechen. Fällt automatisch auf die bisherigen
|
||||
Einstellungen zurück, wenn keine Registry vorhanden/konfiguriert ist.
|
||||
Änderungen ggü. 2.2.0
|
||||
----------------------
|
||||
- Optionaler Parameter `chunk_profile: str | None` (abwärtskompatibel).
|
||||
- Bei fehlenden *echten* Fenstern (kein Overlap geliefert) wird das synthetische
|
||||
Overlap anhand des Chunk-Profils (short|medium|long) gewählt. Ohne Profil
|
||||
bleibt das bisherige Verhalten bestehen (Übernahme aus get_sizes(note_type)).
|
||||
|
||||
Wichtig:
|
||||
- Signatur von make_chunk_payloads bleibt kompatibel:
|
||||
make_chunk_payloads(chunks, note_id, note_title, note_type, note_path, ...)
|
||||
- 'window' != 'text', wenn Overlap > 0; ansonsten identisch.
|
||||
Hinweis
|
||||
------
|
||||
IDs, Felder und Vektoren bleiben unverändert.
|
||||
"""
|
||||
|
||||
from __future__ import annotations
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple
|
||||
|
||||
# Annahme: Diese Utilities existieren bei dir bereits (aus funktionierendem Stand):
|
||||
# - Chunk Typ (dataclass/NamedTuple) mit Feldern: id (oder idx), text, start, end
|
||||
# - windowing-Funktion (oder wir erzeugen ein simples Fenster über Nachbar-Chunks)
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
|
||||
|
||||
try:
|
||||
from app.core.chunker import Chunk # type: ignore
|
||||
# Typgerechtes Overlap aus deiner Konfiguration holen
|
||||
from app.core.chunk_config import get_sizes as _get_sizes
|
||||
except Exception:
|
||||
# Minimaler Fallback, falls die Import-Umgebung abweicht; in produktiven
|
||||
# Deployments wird der echte Chunk-Typ vorhanden sein.
|
||||
from typing import NamedTuple
|
||||
class Chunk(NamedTuple):
|
||||
idx: int
|
||||
text: str
|
||||
start: int
|
||||
end: int
|
||||
def _get_sizes(_note_type: str):
|
||||
# konservativer Default, falls Import fehlschlägt
|
||||
return {"overlap": (40, 60), "target": (250, 350), "max": 500}
|
||||
|
||||
# Optional: Registry einbinden
|
||||
# NEU: optionaler Import – Overlap-Empfehlungen aus der Type-Registry
|
||||
try:
|
||||
from app.core.type_registry import resolve_chunk_profile
|
||||
from app.core.type_registry import profile_overlap as _profile_overlap
|
||||
except Exception:
|
||||
def resolve_chunk_profile(note_type: str, default_profile: str = "default") -> str:
|
||||
return default_profile
|
||||
def _profile_overlap(_profile: Optional[str]) -> tuple[int,int]:
|
||||
return (40, 60)
|
||||
|
||||
DEFAULT_OVERLAP = 0 # falls euer chunker keinen Overlap liefert
|
||||
DEFAULT_PROFILE_TO_OVERLAP = {
|
||||
"short": 40,
|
||||
"medium": 80,
|
||||
"long": 120,
|
||||
"default": 0,
|
||||
}
|
||||
|
||||
def _estimate_overlap(profile: str) -> int:
|
||||
return int(DEFAULT_PROFILE_TO_OVERLAP.get(profile, DEFAULT_OVERLAP))
|
||||
# ------------------------------- Utils ------------------------------- #
|
||||
|
||||
def _make_window_text(chunks: List[Chunk], i: int, overlap_chars: int) -> str:
|
||||
def _get_attr_or_key(obj: Any, key: str, default=None):
|
||||
if obj is None:
|
||||
return default
|
||||
if isinstance(obj, dict):
|
||||
return obj.get(key, default)
|
||||
return getattr(obj, key, default)
|
||||
|
||||
def _as_window_text(chunk: Any) -> str:
|
||||
"""Fenstertext robust lesen (bevorzugt echte Fenster, sonst Kern)."""
|
||||
for k in ("window", "text", "content", "raw"):
|
||||
v = _get_attr_or_key(chunk, k, None)
|
||||
if isinstance(v, str) and v:
|
||||
return v
|
||||
return ""
|
||||
|
||||
def _to_int(x: Any, default: int = 0) -> int:
|
||||
try:
|
||||
return int(x)
|
||||
except Exception:
|
||||
return default
|
||||
|
||||
def _normalize_rel_path(p: str) -> str:
|
||||
p = (p or "").replace("\\", "/")
|
||||
while p.startswith("/"):
|
||||
p = p[1:]
|
||||
return p
|
||||
|
||||
|
||||
# ---------------------- Overlap & Offsets ---------------------------- #
|
||||
|
||||
def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]:
|
||||
"""
|
||||
Erzeugt ein einfaches Fenster: left-overlap-Anteil aus dem vorherigen Chunk
|
||||
+ eigener Text + right-overlap-Anteil aus dem nächsten Chunk.
|
||||
Entfernt linkes Overlap aus echten Fenster-Strings.
|
||||
Rückgabe: (segments, overlaps_left, reconstructed_text)
|
||||
"""
|
||||
center = chunks[i].text
|
||||
if overlap_chars <= 0:
|
||||
return center
|
||||
segments: List[str] = []
|
||||
overlaps_left: List[int] = []
|
||||
reconstructed = ""
|
||||
for w in windows:
|
||||
w = w or ""
|
||||
max_k = min(len(w), len(reconstructed))
|
||||
k = 0
|
||||
for cand in range(max_k, -1, -1):
|
||||
if reconstructed.endswith(w[:cand]):
|
||||
k = cand
|
||||
break
|
||||
seg = w[k:]
|
||||
segments.append(seg)
|
||||
overlaps_left.append(k)
|
||||
reconstructed += seg
|
||||
return segments, overlaps_left, reconstructed
|
||||
|
||||
left = ""
|
||||
right = ""
|
||||
if i > 0:
|
||||
ltxt = chunks[i - 1].text
|
||||
left = ltxt[-overlap_chars:] if len(ltxt) > overlap_chars else ltxt
|
||||
if i + 1 < len(chunks):
|
||||
rtxt = chunks[i + 1].text
|
||||
right = rtxt[:overlap_chars] if len(rtxt) > overlap_chars else rtxt
|
||||
def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int:
|
||||
"""Länge längsten Suffix(a), der Prefix(b) ist."""
|
||||
if not a or not b:
|
||||
return 0
|
||||
a1 = a[-max_probe:]
|
||||
b1 = b[:max_probe]
|
||||
n = min(len(a1), len(b1))
|
||||
for k in range(n, 0, -1):
|
||||
if a1[-k:] == b1[:k]:
|
||||
return k
|
||||
return 0
|
||||
|
||||
# Verhindere doppelte Leerzeichen
|
||||
pieces = [p for p in [left, center, right] if p]
|
||||
return (" ".join(pieces)).strip()
|
||||
|
||||
# ----------------------------- Public API ---------------------------- #
|
||||
|
||||
def make_chunk_payloads(
|
||||
chunks: Iterable[Chunk],
|
||||
note_id: str,
|
||||
note_title: str,
|
||||
note_type: Optional[str],
|
||||
note_path: str,
|
||||
*,
|
||||
frontmatter: Dict[str, Any],
|
||||
rel_path: str,
|
||||
chunks: Iterable[Union[Dict[str, Any], Any]],
|
||||
note_text: Optional[str] = None,
|
||||
*, # neue, nur-keyword Parameter bleiben abwärtskompatibel
|
||||
chunk_profile: Optional[str] = None,
|
||||
window_overwrite: Optional[int] = None,
|
||||
extra_payload: Optional[Dict[str, Any]] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
"""
|
||||
Rückwärtskompatible Fabrik für Chunk-Payloads.
|
||||
|
||||
Parameter:
|
||||
- chunks: Iterable[Chunk] — Ergebnis aus eurem chunker.
|
||||
- note_id: str — ID der Note (z. B. aus Frontmatter 'id').
|
||||
- note_title: str — Titel der Note.
|
||||
- note_type: Optional[str] — Typ aus Frontmatter (concept, task, ...).
|
||||
- note_path: str — relativer Pfad im Vault.
|
||||
- chunk_profile: Optional[str] — Override-Profil (z. B. 'short').
|
||||
- window_overwrite: Optional[int] — erzwinge Overlap in Zeichen.
|
||||
- extra_payload: Optional[dict] — zusätzliche Felder in Payload.
|
||||
|
||||
Rückgabe:
|
||||
- Liste von dict-Payloads; jedes enthält mind.:
|
||||
{ "note_id", "note_title", "note_type", "path", "chunk_id",
|
||||
"text", "window", "start", "end" }
|
||||
Baut Payloads pro Chunk. Falls Fenster ohne Overlap geliefert werden,
|
||||
erzeugen wir synthetische 'window'-Texte mit typgerechtem Overlap.
|
||||
"""
|
||||
clist = list(chunks)
|
||||
# Profil bestimmen (Registry → default → Override)
|
||||
effective_profile = (
|
||||
chunk_profile
|
||||
or (resolve_chunk_profile(note_type or "concept") if note_type else "default")
|
||||
or "default"
|
||||
)
|
||||
overlap = window_overwrite if isinstance(window_overwrite, int) else _estimate_overlap(effective_profile)
|
||||
note_id = str(frontmatter.get("id") or "").strip()
|
||||
note_type = str(frontmatter.get("type", "")).lower()
|
||||
note_title = frontmatter.get("title", None)
|
||||
note_tags = frontmatter.get("tags", None)
|
||||
rel_path = _normalize_rel_path(rel_path)
|
||||
|
||||
# 1) Rohdaten sammeln (so wie geliefert)
|
||||
chunks_list = list(chunks)
|
||||
raw_windows: List[str] = []
|
||||
seqs: List[int] = []
|
||||
ids_in: List[Optional[str]] = []
|
||||
token_counts: List[Optional[int]] = []
|
||||
section_titles: List[Optional[str]] = []
|
||||
section_paths: List[Optional[str]] = []
|
||||
any_explicit_window = False
|
||||
|
||||
for idx, c in enumerate(chunks_list):
|
||||
# Fensterquelle
|
||||
w = _get_attr_or_key(c, "window", None)
|
||||
if isinstance(w, str) and w:
|
||||
any_explicit_window = True
|
||||
raw_windows.append(w)
|
||||
else:
|
||||
raw_windows.append(_as_window_text(c)) # 'text'|'content'|'raw' als Ersatz
|
||||
# Ordnung
|
||||
seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx))
|
||||
# IDs, Tokens, Sektionen
|
||||
cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None))
|
||||
ids_in.append(str(cid) if isinstance(cid, str) and cid else None)
|
||||
tc = _get_attr_or_key(c, "token_count", None)
|
||||
token_counts.append(_to_int(tc, 0) if tc is not None else None)
|
||||
section_titles.append(_get_attr_or_key(c, "section_title", None))
|
||||
section_paths.append(_get_attr_or_key(c, "section_path", None))
|
||||
|
||||
# 2) Segmente & Overlaps bestimmen
|
||||
if any_explicit_window:
|
||||
# Es existieren echte Fenster → dedupe, um Kernsegmente zu finden
|
||||
segments, overlaps_left, recon = _dedupe_windows_to_segments(raw_windows)
|
||||
windows_final = raw_windows[:] # bereits mit Overlap geliefert
|
||||
else:
|
||||
# Keine echten Fenster → Segmente sind identisch zu "Fenstern" (bisher),
|
||||
# wir erzeugen synthetische Fenster mit Overlap.
|
||||
segments = [w or "" for w in raw_windows]
|
||||
overlaps_left = []
|
||||
windows_final = []
|
||||
recon = ""
|
||||
|
||||
try:
|
||||
# Bisheriges Verhalten: aus get_sizes(note_type)
|
||||
overlap_low, overlap_high = tuple(_get_sizes(note_type).get("overlap", (40, 60)))
|
||||
except Exception:
|
||||
overlap_low, overlap_high = (40, 60)
|
||||
|
||||
# Registry-Profil (falls vorhanden) übersteuert *nur* den Overlap defensiv
|
||||
if isinstance(chunk_profile, str) and chunk_profile.strip():
|
||||
try:
|
||||
o_low, o_high = _profile_overlap(chunk_profile.strip().lower())
|
||||
# defensiver Clamp: niemals größer als 3x Default
|
||||
overlap_low = max(0, min(o_low, overlap_low * 3))
|
||||
overlap_high = max(overlap_low, min(o_high, overlap_high * 3))
|
||||
except Exception:
|
||||
pass
|
||||
|
||||
overlap_target = int(overlap_low)
|
||||
|
||||
for i, seg in enumerate(segments):
|
||||
if i == 0:
|
||||
# erstes Fenster: kein linker Kontext
|
||||
windows_final.append(seg)
|
||||
overlaps_left.append(0)
|
||||
recon += seg
|
||||
else:
|
||||
# synthetischer linker Kontext = Suffix des bisher rekonstruierten Texts
|
||||
k = min(overlap_target, len(recon))
|
||||
left_ctx = recon[-k:] if k > 0 else ""
|
||||
windows_final.append(left_ctx + seg)
|
||||
overlaps_left.append(k)
|
||||
recon += seg # Rekonstruktion bleibt kerntreu
|
||||
|
||||
# 3) overlap_right bestimmen
|
||||
overlaps_right: List[int] = []
|
||||
for i in range(len(windows_final)):
|
||||
if i + 1 < len(windows_final):
|
||||
ov = _overlap_len_suffix_prefix(windows_final[i], windows_final[i + 1], max_probe=4096)
|
||||
else:
|
||||
ov = 0
|
||||
overlaps_right.append(ov)
|
||||
|
||||
# 4) start/end-Offsets (exakt via note_text, sonst kumulativ)
|
||||
starts: List[int] = [0] * len(segments)
|
||||
ends: List[int] = [0] * len(segments)
|
||||
pos = 0
|
||||
if isinstance(note_text, str) and note_text:
|
||||
search_pos = 0
|
||||
for i, seg in enumerate(segments):
|
||||
if not seg:
|
||||
starts[i] = ends[i] = search_pos
|
||||
continue
|
||||
j = note_text.find(seg, search_pos)
|
||||
if j >= 0:
|
||||
starts[i] = j
|
||||
ends[i] = j + len(seg)
|
||||
search_pos = ends[i]
|
||||
else:
|
||||
# Fallback: kumulativ
|
||||
starts[i] = pos
|
||||
pos += len(seg)
|
||||
ends[i] = pos
|
||||
else:
|
||||
for i, seg in enumerate(segments):
|
||||
starts[i] = pos
|
||||
pos += len(seg)
|
||||
ends[i] = pos
|
||||
|
||||
# 5) Payload-Dicts
|
||||
payloads: List[Dict[str, Any]] = []
|
||||
for i, c in enumerate(clist):
|
||||
# Chunk-ID stabil: note_id#(laufende Nummer 1..n) – rückwärtskompatibel
|
||||
# Falls euer Chunk bereits eine ID hat, könnt ihr sie beibehalten.
|
||||
cid = f"{note_id}#{i+1}"
|
||||
window_txt = _make_window_text(clist, i, overlap)
|
||||
|
||||
for i, (win, seg) in enumerate(zip(windows_final, segments)):
|
||||
chunk_id = ids_in[i] or f"{note_id}#{i+1}"
|
||||
pl: Dict[str, Any] = {
|
||||
"note_id": note_id,
|
||||
"note_title": note_title,
|
||||
"note_type": note_type,
|
||||
"path": note_path,
|
||||
"chunk_id": cid,
|
||||
"text": getattr(c, "text", ""),
|
||||
"window": window_txt,
|
||||
"start": getattr(c, "start", 0),
|
||||
"end": getattr(c, "end", 0),
|
||||
"chunk_id": chunk_id,
|
||||
"id": chunk_id, # Alias
|
||||
"chunk_index": i,
|
||||
"seq": seqs[i],
|
||||
"path": rel_path,
|
||||
"window": win,
|
||||
"text": seg,
|
||||
"start": starts[i],
|
||||
"end": ends[i],
|
||||
"overlap_left": overlaps_left[i],
|
||||
"overlap_right": overlaps_right[i],
|
||||
}
|
||||
if extra_payload:
|
||||
pl.update(extra_payload)
|
||||
# optionale Metadaten
|
||||
if note_type:
|
||||
pl["type"] = note_type
|
||||
if note_title is not None:
|
||||
pl["title"] = note_title
|
||||
if note_tags is not None:
|
||||
pl["tags"] = note_tags
|
||||
if token_counts[i] is not None:
|
||||
pl["token_count"] = int(token_counts[i])
|
||||
if section_titles[i] is not None:
|
||||
pl["section_title"] = section_titles[i]
|
||||
if section_paths[i] is not None:
|
||||
sp = str(section_paths[i]).replace("\\", "/")
|
||||
pl["section_path"] = sp if sp else "/"
|
||||
payloads.append(pl)
|
||||
|
||||
return payloads
|
||||
|
||||
|
||||
if __name__ == "__main__": # pragma: no cover
|
||||
fm = {"id": "demo", "title": "Demo", "type": "concept"}
|
||||
# Beispiel ohne echte Fenster → erzeugt synthetische Overlaps
|
||||
chunks = [
|
||||
{"id": "demo#1", "text": "Alpha Beta Gamma"},
|
||||
{"id": "demo#2", "text": "Gamma Delta"},
|
||||
{"id": "demo#3", "text": "Delta Epsilon Zeta"},
|
||||
]
|
||||
pls = make_chunk_payloads(fm, "path/demo.md", chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta", chunk_profile="long")
|
||||
from pprint import pprint
|
||||
pprint(pls)
|
||||
recon = "".join(p["text"] for p in pls)
|
||||
print("RECON:", recon)
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user