# app/core/chunk_payload.py — updated
# Commit 597090bc45 (parent af36c410b4) — Lars, 2025-11-09 10:11:34 +01:00
# CI: "Deploy mindnet to llm-node / deploy (push)" succeeded in 3s.
# Diff hunk: @@ -1,215 +1,199 @@
# chunk_payload.py
"""
Mindnet - Chunk Payload Builder
Version: 1.4.3

Robust, backward-compatible payload generation for chunks.

- Tolerant of old and new call signatures (accepts extra *args / **kwargs).
- Accepts ParsedNote objects *or* plain dicts, including pre-segmented .chunks.
- Reads per-type defaults from ./config/config.yaml or ./config/types.yaml
  (config resolution identical to note_payload.py).
- Builds chunks from existing note.chunks when present, otherwise falls back
  to a simple, profile-dependent paragraph bundling.
- Sets in every chunk payload:
  - note_id, chunk_id (deterministic), index, title, type, path
  - text (never empty), retriever_weight, chunk_profile
- Guarantees JSON-serializable payloads.

Author: ChatGPT
License: MIT
"""
from __future__ import annotations

import hashlib
import json
import os
import pathlib
import re
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

# PyYAML is optional: config loading degrades gracefully when it is missing.
# (An unconditional `import yaml` here would defeat this fallback.)
try:
    import yaml  # type: ignore
except Exception:  # pragma: no cover
    yaml = None  # type: ignore
def _as_dict(note: Any) -> Dict[str, Any]:
if isinstance(note, dict):
return dict(note)
out: Dict[str, Any] = {}
for attr in ("note_id", "id", "title", "type", "frontmatter", "meta", "body", "text", "content", "path", "chunks"):
return note
d: Dict[str, Any] = {}
for attr in (
"id",
"note_id",
"title",
"path",
"frontmatter",
"meta",
"body",
"text",
"type",
"chunks",
):
if hasattr(note, attr):
out[attr] = getattr(note, attr)
if hasattr(note, "__dict__"):
for k, v in note.__dict__.items():
if k not in out:
out[k] = v
return out
d[attr] = getattr(note, attr)
if "frontmatter" not in d and hasattr(note, "metadata"):
d["frontmatter"] = getattr(note, "metadata")
return d
def _load_types_config(search_root: Optional[Union[str, Path]] = None,
preloaded: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
if isinstance(preloaded, dict) and "types" in preloaded:
return preloaded
candidates: List[Path] = []
if search_root:
root = Path(search_root)
candidates.extend([root / "config.yaml", root / "config" / "config.yaml", root / "config" / "types.yaml"])
cwd = Path.cwd()
candidates.extend([cwd / "config.yaml", cwd / "config" / "config.yaml", cwd / "config" / "types.yaml"])
for p in candidates:
if p.exists() and p.is_file():
if yaml is None:
break
try:
data = yaml.safe_load(p.read_text(encoding="utf-8")) or {}
if isinstance(data, dict) and "types" in data:
return data
except Exception:
pass
return {"version": "1.0", "types": {}}
def _load_types_config(explicit: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
if isinstance(explicit, dict):
return explicit
for rel in ("config/config.yaml", "config/types.yaml"):
p = pathlib.Path(rel)
if p.exists():
with p.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
if isinstance(data, dict) and "types" in data and isinstance(data["types"], dict):
return data["types"]
return data if isinstance(data, dict) else {}
return {}
def _safe_get(d: Dict[str, Any], key: str, default: Any = None) -> Any:
if not isinstance(d, dict):
return default
return d.get(key, default)
def _get_front(n: Dict[str, Any]) -> Dict[str, Any]:
fm = n.get("frontmatter") or n.get("meta") or {}
return fm if isinstance(fm, dict) else {}
def _resolve_type(note_d: Dict[str, Any]) -> str:
    """Resolve the note type: frontmatter, then top level, then meta; default 'concept'."""
    front = note_d.get("frontmatter") or {}
    typ = (front.get("type") if isinstance(front, dict) else None) or note_d.get("type")
    if not typ:
        meta = note_d.get("meta")
        if isinstance(meta, dict):
            typ = meta.get("type")
    return str(typ or "concept")
def _resolve_note_id(note_d: Dict[str, Any]) -> Optional[str]:
for k in ("note_id", "id"):
v = note_d.get(k)
if isinstance(v, str) and v:
def _coalesce(*vals):
for v in vals:
if v is not None:
return v
return None
def _resolve_body(note_d: Dict[str, Any]) -> str:
for k in ("body", "text", "content"):
v = note_d.get(k)
if isinstance(v, str) and v.strip():
return v
return ""
def _body(n: Dict[str, Any]) -> str:
b = n.get("body")
if isinstance(b, str):
return b
t = n.get("text")
return t if isinstance(t, str) else ""
def _resolve_defaults_for_type(types_cfg: Dict[str, Any], typ: str) -> Dict[str, Any]:
if not isinstance(types_cfg, dict):
return {}
t = (types_cfg.get("types") or {}).get(typ) or {}
return t if isinstance(t, dict) else {}
def _iter_chunks(n: Dict[str, Any], profile: str) -> List[Dict[str, Any]]:
# 1) Bereits vorhandene Chunks bevorzugen
existing = n.get("chunks")
if isinstance(existing, list) and existing:
out: List[Dict[str, Any]] = []
for i, c in enumerate(existing):
if isinstance(c, dict):
text = c.get("text") or ""
else:
text = str(c) if c is not None else ""
if not text:
continue
out.append({"index": i, "text": text})
if out:
return out
def _coerce_float(val: Any, default: float) -> float:
try:
if val is None:
return default
if isinstance(val, (int, float)):
return float(val)
if isinstance(val, str):
return float(val.strip())
except Exception:
pass
return default
def _compute_retriever_weight(note_d: Dict[str, Any], types_cfg: Dict[str, Any], typ: str) -> float:
    """Resolve retriever_weight with precedence: frontmatter > type defaults > env > 1.0."""
    front = note_d.get("frontmatter") or {}
    if "retriever_weight" in front:
        return _coerce_float(front.get("retriever_weight"), 1.0)
    defaults = _resolve_defaults_for_type(types_cfg, typ)
    if "retriever_weight" in defaults:
        return _coerce_float(defaults.get("retriever_weight"), 1.0)
    env_value = os.getenv("MINDNET_DEFAULT_RETRIEVER_WEIGHT")
    if env_value:
        return _coerce_float(env_value, 1.0)
    return 1.0
def _compute_chunk_profile(note_d: Dict[str, Any], types_cfg: Dict[str, Any], typ: str) -> str:
    """Resolve chunk_profile with precedence: frontmatter > type defaults > env > 'medium'."""
    front = note_d.get("frontmatter") or {}
    if "chunk_profile" in front:
        return str(front.get("chunk_profile"))
    defaults = _resolve_defaults_for_type(types_cfg, typ)
    if "chunk_profile" in defaults:
        return str(defaults.get("chunk_profile"))
    env_value = os.getenv("MINDNET_DEFAULT_CHUNK_PROFILE")
    if env_value:
        return str(env_value)
    return "medium"
def _norm_chunk_text(s: Any) -> str:
if isinstance(s, str):
return s.strip()
return ""
def _hash(s: str) -> str:
return hashlib.sha1(s.encode("utf-8")).hexdigest()[:12]
# 2) Fallback: naive, profilabhängige Absatz-Bündelung
size = {"short": 600, "medium": 1200, "long": 2400}.get(str(profile), 1200)
text = _body(n)
if not text:
return []
paras = re.split(r"\n{2,}", text)
chunks: List[str] = []
buf = ""
for p in paras:
p = (p or "").strip()
if not p:
continue
if len(buf) + (2 if buf else 0) + len(p) <= size:
buf = (buf + "\n\n" + p).strip() if buf else p
else:
if buf:
chunks.append(buf)
if len(p) <= size:
buf = p
else:
for i in range(0, len(p), size):
chunks.append(p[i : i + size])
buf = ""
if buf:
chunks.append(buf)
return [{"index": i, "text": c} for i, c in enumerate(chunks)]
def make_chunk_payloads(note: Any, *args, **kwargs) -> List[Dict[str, Any]]:
    """Build one JSON-serializable payload dict per chunk of *note*.

    Tolerates legacy call styles: extra positional arguments are ignored,
    except that a leading dict is treated as a types config.

    Args:
        note: ParsedNote-like object or plain dict.
        **kwargs:
            types_config: optional pre-loaded config dict (as in config.yaml).
            search_root / vault_root: optional root for the config file search.

    Returns:
        list[dict]: one payload per chunk, each carrying note_id, chunk_id
        (deterministic), index, title, type, path, text, retriever_weight
        and chunk_profile.
    """
    n = _as_dict(note)

    # Config resolution: explicit kwarg wins, then a legacy positional dict,
    # then the filesystem search inside _load_types_config.
    types_config = kwargs.get("types_config")
    if types_config is None and args and isinstance(args[0], dict):
        types_config = args[0]
    search_root = kwargs.get("search_root") or kwargs.get("vault_root")
    types_cfg = _load_types_config(search_root, types_config)

    fm = _get_front(n)
    note_type = str(fm.get("type") or n.get("type") or "note")

    # Per-type defaults may live under a top-level "types" mapping (new config
    # shape) or directly at the top level (legacy shape) — support both.
    tmap = types_cfg.get("types") if isinstance(types_cfg, dict) else None
    if not isinstance(tmap, dict):
        tmap = types_cfg if isinstance(types_cfg, dict) else {}
    raw_defaults = tmap.get(note_type)
    cfg_for_type = raw_defaults if isinstance(raw_defaults, dict) else {}

    try:
        default_rw = float(os.environ.get("MINDNET_DEFAULT_RETRIEVER_WEIGHT", 1.0))
    except Exception:
        default_rw = 1.0
    retriever_weight = _coalesce(
        fm.get("retriever_weight"),
        cfg_for_type.get("retriever_weight"),
        default_rw,
    )
    try:
        retriever_weight = float(retriever_weight)
    except Exception:
        retriever_weight = default_rw

    chunk_profile = _coalesce(
        fm.get("chunk_profile"),
        cfg_for_type.get("chunk_profile"),
        os.environ.get("MINDNET_DEFAULT_CHUNK_PROFILE", "medium"),
    )
    if not isinstance(chunk_profile, str):
        chunk_profile = "medium"

    note_id = n.get("note_id") or n.get("id") or fm.get("id")
    title = n.get("title") or fm.get("title") or ""
    path = n.get("path")
    if isinstance(path, pathlib.Path):
        path = str(path)

    chunks = _iter_chunks(n, chunk_profile)

    payloads: List[Dict[str, Any]] = []
    for c in chunks:
        idx = c.get("index", len(payloads))
        text = c.get("text")
        if not isinstance(text, str):
            text = str(text or "")
        # Deterministic chunk_id derived from note id and chunk index, so
        # re-ingesting the same note yields stable ids.
        key = f"{note_id}|{idx}"
        h = hashlib.sha1(key.encode("utf-8")).hexdigest()[:12]
        chunk_id = f"{note_id}-{idx:03d}-{h}" if note_id else h
        payload = {
            "note_id": note_id,
            "chunk_id": chunk_id,
            "index": idx,
            "title": title,
            "type": note_type,
            "path": path,
            "text": text,
            "retriever_weight": retriever_weight,
            "chunk_profile": chunk_profile,
        }
        # Fail fast if anything non-serializable slipped into the payload.
        json.loads(json.dumps(payload, ensure_ascii=False))
        payloads.append(payload)
    return payloads