From 597090bc457619994015c9838ee59241058e0350 Mon Sep 17 00:00:00 2001 From: Lars Date: Sun, 9 Nov 2025 10:11:34 +0100 Subject: [PATCH] app/core/chunk_payload.py aktualisiert --- app/core/chunk_payload.py | 344 ++++++++++++++++++-------------------- 1 file changed, 164 insertions(+), 180 deletions(-) diff --git a/app/core/chunk_payload.py b/app/core/chunk_payload.py index 632dfbb..46e79cc 100644 --- a/app/core/chunk_payload.py +++ b/app/core/chunk_payload.py @@ -1,215 +1,199 @@ +# chunk_payload.py """ -chunk_payload.py — v1.4.2 -------------------------- -Robuste, abwärtskompatible Payload-Erzeugung für Chunks. - -Ziele -- Setzt pro Chunk `text`, `retriever_weight`, `chunk_profile`, `note_id`. -- Akzeptiert ParsedNote-Objekte *oder* Dicts, inklusive bereits vorsegmentierter .chunks. -- Verträgt zusätzliche args/kwargs (kompatibel zu älteren Aufrufern). -- Konfig-Auflösung identisch zu note_payload.py. - -Autor: ChatGPT -Lizenz: MIT +Mindnet - Chunk Payload Builder +Version: 1.4.3 +Beschreibung: +- Robust gegenüber alten/neuen Aufrufsignaturen (toleriert *args, **kwargs). +- Liest Typ-Defaults aus ./config/config.yaml oder ./config/types.yaml. +- Baut Chunks aus vorhandenen note.chunks (falls vorhanden) oder fällt auf + eine einfache, profilabhängige Absatzbündelung zurück. +- Setzt in jedem Chunk-Payload: + - note_id, chunk_id (deterministisch), index, title, type, path + - text (nie leer), retriever_weight, chunk_profile +- Garantiert JSON-serialisierbare Payloads. """ + from __future__ import annotations - +from typing import Any, Dict, List, Optional import os +import json +import pathlib +import re +import yaml import hashlib -from pathlib import Path -from typing import Any, Dict, List, Optional, Union - -try: - import yaml # type: ignore -except Exception: # pragma: no cover - yaml = None # type: ignore def _as_dict(note: Any) -> Dict[str, Any]: if isinstance(note, dict): - return dict(note) - out: Dict[str, Any] = {} - for attr in ("note_id", "id", "title", "type", "frontmatter", "meta", "body", "text", "content", "path", "chunks"): + return note + d: Dict[str, Any] = {} + for attr in ( + "id", + "note_id", + "title", + "path", + "frontmatter", + "meta", + "body", + "text", + "type", + "chunks", + ): if hasattr(note, attr): - out[attr] = getattr(note, attr) - if hasattr(note, "__dict__"): - for k, v in note.__dict__.items(): - if k not in out: - out[k] = v - return out + d[attr] = getattr(note, attr) + if "frontmatter" not in d and hasattr(note, "metadata"): + d["frontmatter"] = getattr(note, "metadata") + return d -def _load_types_config(search_root: Optional[Union[str, Path]] = None, - preloaded: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: - if isinstance(preloaded, dict) and "types" in preloaded: - return preloaded - - candidates: List[Path] = [] - if search_root: - root = Path(search_root) - candidates.extend([root / "config.yaml", root / "config" / "config.yaml", root / "config" / "types.yaml"]) - cwd = Path.cwd() - candidates.extend([cwd / "config.yaml", cwd / "config" / "config.yaml", cwd / "config" / "types.yaml"]) - - for p in candidates: - if p.exists() and p.is_file(): - if yaml is None: - break - try: - data = yaml.safe_load(p.read_text(encoding="utf-8")) or {} - if isinstance(data, dict) and "types" in data: - return data - except Exception: - pass - return {"version": "1.0", "types": {}} +def _load_types_config(explicit: Optional[Dict[str, Any]] = None) -> Dict[str, Any]: + if isinstance(explicit, dict): + return explicit + for rel in ("config/config.yaml", "config/types.yaml"): + p = pathlib.Path(rel) + if p.exists(): + with p.open("r", encoding="utf-8") as f: + data = yaml.safe_load(f) or {} + if isinstance(data, dict) and "types" in data and isinstance(data["types"], dict): + return data["types"] + return data if isinstance(data, dict) else {} + return {} -def _safe_get(d: Dict[str, Any], key: str, default: Any = None) -> Any: - if not isinstance(d, dict): - return default - return d.get(key, default) +def _get_front(n: Dict[str, Any]) -> Dict[str, Any]: + fm = n.get("frontmatter") or n.get("meta") or {} + return fm if isinstance(fm, dict) else {} -def _resolve_type(note_d: Dict[str, Any]) -> str: - fm = note_d.get("frontmatter") or {} - t = _safe_get(fm, "type") or note_d.get("type") - if not t and isinstance(note_d.get("meta"), dict): - t = note_d["meta"].get("type") - return str(t or "concept") - - -def _resolve_note_id(note_d: Dict[str, Any]) -> Optional[str]: - for k in ("note_id", "id"): - v = note_d.get(k) - if isinstance(v, str) and v: +def _coalesce(*vals): + for v in vals: + if v is not None: return v return None -def _resolve_body(note_d: Dict[str, Any]) -> str: - for k in ("body", "text", "content"): - v = note_d.get(k) - if isinstance(v, str) and v.strip(): - return v - return "" +def _body(n: Dict[str, Any]) -> str: + b = n.get("body") + if isinstance(b, str): + return b + t = n.get("text") + return t if isinstance(t, str) else "" -def _resolve_defaults_for_type(types_cfg: Dict[str, Any], typ: str) -> Dict[str, Any]: - if not isinstance(types_cfg, dict): - return {} - t = (types_cfg.get("types") or {}).get(typ) or {} - return t if isinstance(t, dict) else {} +def _iter_chunks(n: Dict[str, Any], profile: str) -> List[Dict[str, Any]]: + # 1) Bereits vorhandene Chunks bevorzugen + existing = n.get("chunks") + if isinstance(existing, list) and existing: + out: List[Dict[str, Any]] = [] + for i, c in enumerate(existing): + if isinstance(c, dict): + text = c.get("text") or "" + else: + text = str(c) if c is not None else "" + if not text: + continue + out.append({"index": i, "text": text}) + if out: + return out - -def _coerce_float(val: Any, default: float) -> float: - try: - if val is None: - return default - if isinstance(val, (int, float)): - return float(val) - if isinstance(val, str): - return float(val.strip()) - except Exception: - pass - return default - - -def _compute_retriever_weight(note_d: Dict[str, Any], types_cfg: Dict[str, Any], typ: str) -> float: - fm = note_d.get("frontmatter") or {} - if "retriever_weight" in fm: - return _coerce_float(fm.get("retriever_weight"), 1.0) - tdef = _resolve_defaults_for_type(types_cfg, typ) - if "retriever_weight" in tdef: - return _coerce_float(tdef.get("retriever_weight"), 1.0) - envv = os.getenv("MINDNET_DEFAULT_RETRIEVER_WEIGHT") - if envv: - return _coerce_float(envv, 1.0) - return 1.0 - - -def _compute_chunk_profile(note_d: Dict[str, Any], types_cfg: Dict[str, Any], typ: str) -> str: - fm = note_d.get("frontmatter") or {} - if "chunk_profile" in fm: - return str(fm.get("chunk_profile")) - tdef = _resolve_defaults_for_type(types_cfg, typ) - if "chunk_profile" in tdef: - return str(tdef.get("chunk_profile")) - envv = os.getenv("MINDNET_DEFAULT_CHUNK_PROFILE") - if envv: - return str(envv) - return "medium" - - -def _norm_chunk_text(s: Any) -> str: - if isinstance(s, str): - return s.strip() - return "" - - -def _hash(s: str) -> str: - return hashlib.sha1(s.encode("utf-8")).hexdigest()[:12] + # 2) Fallback: naive, profilabhängige Absatz-Bündelung + size = {"short": 600, "medium": 1200, "long": 2400}.get(str(profile), 1200) + text = _body(n) + if not text: + return [] + paras = re.split(r"\n{2,}", text) + chunks: List[str] = [] + buf = "" + for p in paras: + p = (p or "").strip() + if not p: + continue + if len(buf) + (2 if buf else 0) + len(p) <= size: + buf = (buf + "\n\n" + p).strip() if buf else p + else: + if buf: + chunks.append(buf) + if len(p) <= size: + buf = p + else: + for i in range(0, len(p), size): + chunks.append(p[i : i + size]) + buf = "" + if buf: + chunks.append(buf) + return [{"index": i, "text": c} for i, c in enumerate(chunks)] def make_chunk_payloads(note: Any, *args, **kwargs) -> List[Dict[str, Any]]: - """Erzeugt Payloads für alle Chunks der Note. - - Akzeptierte zusätzliche kwargs: - - types_config: dict wie in config.yaml - - search_root / vault_root: für Konfigsuche - - *args werden ignoriert (Kompatibilität zu älteren Aufrufern). """ - note_d = _as_dict(note) + Build payloads for chunks. Tolerates legacy positional arguments. + Returns list[dict] (ein Payload pro Chunk). + """ + n = _as_dict(note) + types_cfg = kwargs.get("types_config") or (args[0] if args else None) + types_cfg = _load_types_config(types_cfg) - types_config = kwargs.get("types_config") - search_root = kwargs.get("search_root") or kwargs.get("vault_root") - types_cfg = _load_types_config(search_root, types_config) + fm = _get_front(n) + note_type = str(fm.get("type") or n.get("type") or "note") + cfg_for_type = types_cfg.get(note_type, {}) if isinstance(types_cfg, dict) else {} - typ = _resolve_type(note_d) - note_id = _resolve_note_id(note_d) or "" + try: + default_rw = float(os.environ.get("MINDNET_DEFAULT_RETRIEVER_WEIGHT", 1.0)) + except Exception: + default_rw = 1.0 - r_weight = _compute_retriever_weight(note_d, types_cfg, typ) - c_profile = _compute_chunk_profile(note_d, types_cfg, typ) + retriever_weight = _coalesce( + fm.get("retriever_weight"), + cfg_for_type.get("retriever_weight"), + default_rw, + ) + try: + retriever_weight = float(retriever_weight) + except Exception: + retriever_weight = default_rw - out: List[Dict[str, Any]] = [] + chunk_profile = _coalesce( + fm.get("chunk_profile"), + cfg_for_type.get("chunk_profile"), + os.environ.get("MINDNET_DEFAULT_CHUNK_PROFILE", "medium"), + ) + if not isinstance(chunk_profile, str): + chunk_profile = "medium" - # 1) Falls der Parser bereits Chunks liefert, nutzen - pre = note_d.get("chunks") - if isinstance(pre, list) and pre: - for idx, c in enumerate(pre): - if isinstance(c, dict): - text = _norm_chunk_text(c.get("text") or c.get("body") or c.get("content")) - else: - text = _norm_chunk_text(getattr(c, "text", "")) - if not text: - # Fallback auf Note-Body, falls leer - text = _resolve_body(note_d) - if not text: - continue + note_id = n.get("note_id") or n.get("id") or fm.get("id") + title = n.get("title") or fm.get("title") or "" + path = n.get("path") + if isinstance(path, pathlib.Path): + path = str(path) - chunk_id = f"{note_id}#{idx:03d}" if note_id else _hash(text)[:8] - payload = { - "note_id": note_id, - "chunk_id": chunk_id, - "text": text, - "retriever_weight": float(r_weight), - "chunk_profile": str(c_profile), - "type": typ, - } - out.append(payload) + chunks = _iter_chunks(n, chunk_profile) - # 2) Sonst als Single-Chunk aus Body/Text - if not out: - text = _resolve_body(note_d) - if text: - chunk_id = f"{note_id}#000" if note_id else _hash(text)[:8] - out.append({ - "note_id": note_id, - "chunk_id": chunk_id, - "text": text, - "retriever_weight": float(r_weight), - "chunk_profile": str(c_profile), - "type": typ, - }) + payloads: List[Dict[str, Any]] = [] + for c in chunks: + idx = c.get("index", len(payloads)) + text = c.get("text") if isinstance(c, dict) else (str(c) if c is not None else "") + if not isinstance(text, str): + text = str(text or "") - return out + # deterministische chunk_id + key = f"{note_id}|{idx}" + h = hashlib.sha1(key.encode("utf-8")).hexdigest()[:12] + chunk_id = f"{note_id}-{idx:03d}-{h}" if note_id else h + + payload = { + "note_id": note_id, + "chunk_id": chunk_id, + "index": idx, + "title": title, + "type": note_type, + "path": path, + "text": text, + "retriever_weight": retriever_weight, + "chunk_profile": chunk_profile, + } + + # JSON-Serialisierbarkeit sicherstellen + json.loads(json.dumps(payload, ensure_ascii=False)) + payloads.append(payload) + + return payloads