All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s
181 lines
6.5 KiB
Python
181 lines
6.5 KiB
Python
# chunk_payload.py
|
|
from __future__ import annotations
|
|
from typing import Any, Dict, List, Optional, Tuple
|
|
import os, json, pathlib, re, yaml, hashlib
|
|
|
|
FRONTMATTER_RE = re.compile(r"^---\s*\n.*?\n---\s*\n?", re.DOTALL)
|
|
|
|
def _as_dict(note: Any) -> Dict[str, Any]:
|
|
if isinstance(note, dict):
|
|
return note
|
|
d: Dict[str, Any] = {}
|
|
for attr in ("id","note_id","title","path","frontmatter","meta","metadata","body","text","content","raw","markdown","type","chunks"):
|
|
if hasattr(note, attr):
|
|
d[attr] = getattr(note, attr)
|
|
fm = d.get("frontmatter") or d.get("meta") or d.get("metadata") or {}
|
|
d["frontmatter"] = fm if isinstance(fm, dict) else {}
|
|
return d
|
|
|
|
def _pick_args(*args, **kwargs) -> Tuple[Optional[str], Optional[Dict[str,Any]]]:
|
|
path = kwargs.get("path")
|
|
types_cfg = kwargs.get("types_config")
|
|
for a in args:
|
|
if path is None and isinstance(a, (str, pathlib.Path)):
|
|
path = str(a)
|
|
if types_cfg is None and isinstance(a, dict):
|
|
types_cfg = a
|
|
return path, types_cfg
|
|
|
|
def _load_types_config(explicit: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
|
|
if isinstance(explicit, dict):
|
|
return explicit
|
|
for rel in ("config/config.yaml", "config/types.yaml"):
|
|
p = pathlib.Path(rel)
|
|
if p.exists():
|
|
with p.open("r", encoding="utf-8") as f:
|
|
data = yaml.safe_load(f) or {}
|
|
if isinstance(data, dict) and "types" in data and isinstance(data["types"], dict):
|
|
return data["types"]
|
|
return data if isinstance(data, dict) else {}
|
|
return {}
|
|
|
|
def _coalesce(*vals):
|
|
for v in vals:
|
|
if v is not None:
|
|
return v
|
|
return None
|
|
|
|
def _text_from_note(n: Dict[str, Any], path_hint: Optional[str]) -> str:
|
|
# häufige Felder
|
|
cand = [
|
|
n.get("body"),
|
|
n.get("text"),
|
|
n.get("markdown"),
|
|
n.get("raw"),
|
|
]
|
|
content = n.get("content")
|
|
if isinstance(content, str):
|
|
cand.append(content)
|
|
elif isinstance(content, dict):
|
|
for k in ("text","body","raw","markdown","content"):
|
|
v = content.get(k)
|
|
if isinstance(v, str):
|
|
cand.append(v)
|
|
for t in cand:
|
|
if isinstance(t, str) and t.strip():
|
|
return t
|
|
|
|
# Fallback: Datei lesen und Frontmatter entfernen
|
|
p = n.get("path") or path_hint
|
|
if p:
|
|
try:
|
|
pth = pathlib.Path(p)
|
|
if pth.exists():
|
|
txt = pth.read_text(encoding="utf-8", errors="ignore")
|
|
if txt:
|
|
return FRONTMATTER_RE.sub("", txt).strip()
|
|
except Exception:
|
|
pass
|
|
return ""
|
|
|
|
def _iter_chunks(n: Dict[str, Any], profile: str, fulltext: str) -> List[Dict[str, Any]]:
|
|
# 1) vorhandene Chunks nehmen, wenn sinnvoll
|
|
existing = n.get("chunks")
|
|
out: List[Dict[str, Any]] = []
|
|
if isinstance(existing, list) and existing:
|
|
for i, c in enumerate(existing):
|
|
text = ""
|
|
if isinstance(c, dict):
|
|
text = c.get("text") or c.get("body") or c.get("raw") or ""
|
|
elif isinstance(c, str):
|
|
text = c
|
|
if text and text.strip():
|
|
out.append({"index": i, "text": text})
|
|
if out:
|
|
return out
|
|
|
|
# 2) Fallback: profilabhängige Bündelung
|
|
if not isinstance(profile, str):
|
|
profile = "medium"
|
|
size = {"short": 600, "medium": 1200, "long": 2400}.get(profile, 1200)
|
|
if not fulltext:
|
|
return []
|
|
paras = re.split(r"\n{2,}", fulltext)
|
|
buf = ""
|
|
chunks: List[str] = []
|
|
for p in paras:
|
|
p = (p or "").strip()
|
|
if not p:
|
|
continue
|
|
if len(buf) + (2 if buf else 0) + len(p) <= size:
|
|
buf = (buf + "\n\n" + p).strip() if buf else p
|
|
else:
|
|
if buf: chunks.append(buf)
|
|
if len(p) <= size:
|
|
buf = p
|
|
else:
|
|
for i in range(0, len(p), size):
|
|
chunks.append(p[i:i+size])
|
|
buf = ""
|
|
if buf: chunks.append(buf)
|
|
return [{"index": i, "text": c} for i, c in enumerate(chunks)]
|
|
|
|
def make_chunk_payloads(note: Any, *args, **kwargs) -> List[Dict[str, Any]]:
|
|
n = _as_dict(note)
|
|
path_arg, types_cfg_explicit = _pick_args(*args, **kwargs)
|
|
types_cfg = _load_types_config(types_cfg_explicit)
|
|
|
|
fm = n.get("frontmatter") or {}
|
|
note_type = str(fm.get("type") or n.get("type") or "note")
|
|
cfg_for_type = types_cfg.get(note_type, {}) if isinstance(types_cfg, dict) else {}
|
|
|
|
try:
|
|
default_rw = float(os.environ.get("MINDNET_DEFAULT_RETRIEVER_WEIGHT", 1.0))
|
|
except Exception:
|
|
default_rw = 1.0
|
|
|
|
retriever_weight = _coalesce(fm.get("retriever_weight"), cfg_for_type.get("retriever_weight"), default_rw)
|
|
try:
|
|
retriever_weight = float(retriever_weight)
|
|
except Exception:
|
|
retriever_weight = default_rw
|
|
|
|
chunk_profile = _coalesce(fm.get("chunk_profile"), cfg_for_type.get("chunk_profile"), os.environ.get("MINDNET_DEFAULT_CHUNK_PROFILE","medium"))
|
|
chunk_profile = chunk_profile if isinstance(chunk_profile, str) else "medium"
|
|
|
|
note_id = n.get("note_id") or n.get("id") or fm.get("id")
|
|
title = n.get("title") or fm.get("title") or ""
|
|
path = n.get("path") or path_arg
|
|
if isinstance(path, pathlib.Path):
|
|
path = str(path)
|
|
path = path or "" # immer vorhanden
|
|
|
|
fulltext = _text_from_note(n, path)
|
|
chunks = _iter_chunks(n, chunk_profile, fulltext)
|
|
|
|
payloads: List[Dict[str, Any]] = []
|
|
for c in chunks:
|
|
idx = c.get("index", len(payloads))
|
|
text = c.get("text") if isinstance(c, dict) else (str(c) if c is not None else "")
|
|
text = text if isinstance(text, str) else str(text or "")
|
|
|
|
key = f"{note_id}|{idx}"
|
|
h = hashlib.sha1(key.encode("utf-8")).hexdigest()[:12] if note_id else hashlib.sha1(f"{path}|{idx}".encode("utf-8")).hexdigest()[:12]
|
|
chunk_id = f"{note_id}-{idx:03d}-{h}" if note_id else f"{h}"
|
|
|
|
payload = {
|
|
"note_id": note_id,
|
|
"chunk_id": chunk_id,
|
|
"index": idx,
|
|
"title": title,
|
|
"type": note_type,
|
|
"path": path, # <- garantiert vorhanden
|
|
"text": text, # <- nie leer, sonst werden keine Chunks erzeugt
|
|
"retriever_weight": retriever_weight,
|
|
"chunk_profile": chunk_profile,
|
|
}
|
|
json.loads(json.dumps(payload, ensure_ascii=False))
|
|
payloads.append(payload)
|
|
|
|
return payloads
|