Dateien nach "app/core" hochladen
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
This commit is contained in:
parent
c7644a36aa
commit
f18a40d76c
|
|
@ -1,182 +1,161 @@
|
|||
#!/usr/bin/env python3
|
||||
# -*- coding: utf-8 -*-
|
||||
"""
|
||||
app/core/chunk_payload.py — Mindnet V2 (compat)
|
||||
app/core/chunk_payload.py (Mindnet V2 — robust)
|
||||
|
||||
Ziele (unveränderte v1-Basis, weniger Duplikate):
|
||||
- **Kanonicum:** `index`
|
||||
- **Standard‑Alias (v2):** `ord` (abschaltbar über ENV MINDNET_CHUNK_INCLUDE_ORD=0)
|
||||
- **Optionale Aliase:** gesteuert über ENV MINDNET_CHUNK_INDEX_ALIASES
|
||||
(z. B. "chunk_num,Chunk_Nummer" oder "Chunk_Number"). Standard: kein zusätzlicher Alias.
|
||||
- Verarbeitet Chunks als Dict **oder** Objekt (Dataclass) und setzt immer `id` (= `chunk_id`)
|
||||
- Berechnet `neighbors.prev/next`, falls nicht vorhanden
|
||||
- Denormalisiert Note‑`tags` auf Chunks
|
||||
- Akzeptiert `file_path=` als Alias zu `path_arg`
|
||||
Aufgabe
|
||||
-------
|
||||
Erzeugt Chunk-Payloads aus den vom Chunker gelieferten "Chunk"-Objekten.
|
||||
- Spiegelt `retriever_weight` und `chunk_profile` in **jedem** Chunk-Payload.
|
||||
- Werteauflösung: Frontmatter > types.yaml > Defaults.
|
||||
- Lädt `config/types.yaml` selbst, wenn `types_cfg` nicht übergeben wurde.
|
||||
|
||||
ENV:
|
||||
- MINDNET_CHUNK_INCLUDE_ORD: "1" (Default) | "0"
|
||||
- MINDNET_CHUNK_INDEX_ALIASES: CSV‑Liste zulässiger Namen: chunk_num,Chunk_Nummer,Chunk_Number
|
||||
Eingang
|
||||
-------
|
||||
- note: Dict mit mind. { frontmatter: {...}, id, type, title, path }
|
||||
- note_path: Pfad der Note (für Payload-Feld `path`)
|
||||
- chunks_from_chunker: Liste von Objekten mit Attributen/Feldern:
|
||||
id, note_id, index, text, window, neighbors_prev, neighbors_next
|
||||
- note_text: voller Text der Note (optional, kann leer sein)
|
||||
- types_cfg: optional; wenn None → config wird intern geladen
|
||||
- file_path: optional, für Debug/Tracing im Payload
|
||||
|
||||
Ausgang (pro Chunk)
|
||||
-------------------
|
||||
- Pflichtfelder: note_id, chunk_id, index (0-basiert), ord (1-basiert), type, tags
|
||||
- Texte: text, window
|
||||
- Nachbarn: neighbors_prev, neighbors_next
|
||||
- Spiegelungen: retriever_weight, chunk_profile
|
||||
- Meta: source_path, path, section (leer), created/updated opt. aus Frontmatter
|
||||
|
||||
Hinweis: `edge_defaults` sind Note‑Regeln (nicht pro Chunk).
|
||||
"""
|
||||
from __future__ import annotations
|
||||
|
||||
import json
|
||||
import os
|
||||
import pathlib
|
||||
import hashlib
|
||||
from typing import Any, Dict, List, Optional
|
||||
import os, yaml
|
||||
|
||||
from app.core.chunker import assemble_chunks
|
||||
def _env(n: str, d: Optional[str]=None) -> str:
|
||||
v = os.getenv(n)
|
||||
return v if v is not None else (d or "")
|
||||
|
||||
# ---------- Helpers ----------
|
||||
def _deep_get(root: Any, path: str) -> Any:
|
||||
cur = root
|
||||
for key in path.split("."):
|
||||
if not isinstance(cur, dict) or key not in cur:
|
||||
return None
|
||||
cur = cur[key]
|
||||
return cur
|
||||
|
||||
def _as_dict(obj):
|
||||
if isinstance(obj, dict):
|
||||
return obj
|
||||
d = {}
|
||||
for k in ("index","ord","chunk_index","text","window","id","chunk_id","neighbors","note_id","type","title"):
|
||||
if hasattr(obj, k):
|
||||
d[k] = getattr(obj, k)
|
||||
return d
|
||||
def _as_float(x: Any):
|
||||
try:
|
||||
return float(x)
|
||||
except Exception:
|
||||
return None
|
||||
|
||||
def _coalesce(*vals):
|
||||
for v in vals:
|
||||
if v is not None:
|
||||
return v
|
||||
def _load_types_local() -> dict:
|
||||
p = _env("MINDNET_TYPES_FILE", "./config/types.yaml")
|
||||
try:
|
||||
with open(p, "r", encoding="utf-8") as f:
|
||||
return yaml.safe_load(f) or {}
|
||||
except Exception:
|
||||
return {}
|
||||
|
||||
def _effective_chunk_profile(note_type: str, fm: Dict[str, Any], reg: dict) -> Optional[str]:
|
||||
# Frontmatter zuerst
|
||||
if isinstance(fm.get("chunk_profile"), str):
|
||||
return fm.get("chunk_profile")
|
||||
# Registry
|
||||
types = reg.get("types") if isinstance(reg.get("types"), dict) else reg
|
||||
if isinstance(types, dict):
|
||||
v = types.get(note_type, {})
|
||||
if isinstance(v, dict):
|
||||
cp = v.get("chunk_profile")
|
||||
if isinstance(cp, str):
|
||||
return cp
|
||||
return None
|
||||
|
||||
def _env_float(name: str, default: float) -> float:
|
||||
try:
|
||||
return float(os.environ.get(name, default))
|
||||
except Exception:
|
||||
return default
|
||||
def _effective_retriever_weight(note_type: str, fm: Dict[str, Any], reg: dict) -> float:
|
||||
# Frontmatter zuerst
|
||||
if fm.get("retriever_weight") is not None:
|
||||
v = _as_float(fm.get("retriever_weight"))
|
||||
if v is not None:
|
||||
return float(v)
|
||||
# Registry-Pfade
|
||||
types = reg.get("types") if isinstance(reg.get("types"), dict) else reg
|
||||
candidates = [
|
||||
f"{note_type}.retriever_weight",
|
||||
f"{note_type}.retriever.weight",
|
||||
f"{note_type}.retrieval.weight",
|
||||
"defaults.retriever_weight",
|
||||
"defaults.retriever.weight",
|
||||
"global.retriever_weight",
|
||||
"global.retriever.weight",
|
||||
]
|
||||
for path in candidates:
|
||||
# Wenn types == reg-root (flatten), erlauben sowohl "types.X" als auch "X"
|
||||
val = _deep_get(types, path) if "." in path else (types.get(path) if isinstance(types, dict) else None)
|
||||
if val is None and isinstance(reg, dict):
|
||||
# versuche absolute Pfade
|
||||
val = _deep_get(reg, f"types.{path}")
|
||||
v = _as_float(val)
|
||||
if v is not None:
|
||||
return float(v)
|
||||
return 1.0
|
||||
|
||||
def _ensure_list(x) -> list:
|
||||
if x is None: return []
|
||||
if isinstance(x, list): return [str(i) for i in x]
|
||||
if isinstance(x, (set, tuple)): return [str(i) for i in x]
|
||||
return [str(x)]
|
||||
def make_chunk_payloads(note: Dict[str, Any],
|
||||
note_path: str,
|
||||
chunks_from_chunker: List[Any],
|
||||
*,
|
||||
note_text: str = "",
|
||||
types_cfg: Optional[dict] = None,
|
||||
file_path: Optional[str] = None) -> List[Dict[str, Any]]:
|
||||
fm = (note or {}).get("frontmatter", {})
|
||||
note_type = fm.get("type") or note.get("type") or "concept"
|
||||
reg = types_cfg if isinstance(types_cfg, dict) else _load_types_local()
|
||||
|
||||
def _text_from_note(note: Dict[str, Any]) -> str:
|
||||
return note.get("body") or note.get("text") or ""
|
||||
# Effektive Werte bestimmen
|
||||
cp = _effective_chunk_profile(note_type, fm, reg)
|
||||
rw = _effective_retriever_weight(note_type, fm, reg)
|
||||
|
||||
tags = fm.get("tags") or []
|
||||
if isinstance(tags, str):
|
||||
tags = [tags]
|
||||
|
||||
def _iter_chunks(note: Dict[str, Any], chunk_profile: str, fulltext: str) -> List[Dict[str, Any]]:
|
||||
"""Nutze bestehenden assemble_chunks(note_id, body, type). Rückgabe kann Objektliste sein."""
|
||||
note_id = note.get("id") or (note.get("frontmatter") or {}).get("id")
|
||||
ntype = (note.get("frontmatter") or {}).get("type") or note.get("type") or "note"
|
||||
raw_list = assemble_chunks(note_id, fulltext, ntype)
|
||||
out: List[Dict[str, Any]] = []
|
||||
for c in raw_list:
|
||||
out.append(_as_dict(c) if not isinstance(c, dict) else c)
|
||||
return out
|
||||
|
||||
# ---------- Main ----------
|
||||
for idx, ch in enumerate(chunks_from_chunker):
|
||||
# Chunk-Grunddaten (Attribute oder Keys)
|
||||
cid = getattr(ch, "id", None) or (ch.get("id") if isinstance(ch, dict) else None)
|
||||
nid = getattr(ch, "note_id", None) or (ch.get("note_id") if isinstance(ch, dict) else fm.get("id"))
|
||||
index = getattr(ch, "index", None) or (ch.get("index") if isinstance(ch, dict) else idx)
|
||||
text = getattr(ch, "text", None) or (ch.get("text") if isinstance(ch, dict) else "")
|
||||
window = getattr(ch, "window", None) or (ch.get("window") if isinstance(ch, dict) else text)
|
||||
prev_id = getattr(ch, "neighbors_prev", None) or (ch.get("neighbors_prev") if isinstance(ch, dict) else None)
|
||||
next_id = getattr(ch, "neighbors_next", None) or (ch.get("neighbors_next") if isinstance(ch, dict) else None)
|
||||
|
||||
def make_chunk_payloads(
|
||||
note: Any,
|
||||
path_arg: Optional[str] = None,
|
||||
chunks_from_chunker: Optional[List[Dict[str, Any]]] = None,
|
||||
*,
|
||||
file_path: Optional[str] = None,
|
||||
note_text: Optional[str] = None,
|
||||
types_cfg: Optional[dict] = None,
|
||||
) -> List[Dict[str, Any]]:
|
||||
n = note if isinstance(note, dict) else {"frontmatter": {}}
|
||||
fm = n.get("frontmatter") or {}
|
||||
note_type = str(fm.get("type") or n.get("type") or "note")
|
||||
types_cfg = types_cfg or {}
|
||||
cfg_for_type = types_cfg.get(note_type, {}) if isinstance(types_cfg, dict) else {}
|
||||
|
||||
default_rw = _env_float("MINDNET_DEFAULT_RETRIEVER_WEIGHT", 1.0)
|
||||
retriever_weight = _coalesce(fm.get("retriever_weight"), cfg_for_type.get("retriever_weight"), default_rw)
|
||||
try:
|
||||
retriever_weight = float(retriever_weight)
|
||||
except Exception:
|
||||
retriever_weight = default_rw
|
||||
|
||||
chunk_profile = _coalesce(fm.get("chunk_profile"), cfg_for_type.get("chunk_profile"), os.environ.get("MINDNET_DEFAULT_CHUNK_PROFILE","medium"))
|
||||
chunk_profile = chunk_profile if isinstance(chunk_profile, str) else "medium"
|
||||
|
||||
note_id = n.get("note_id") or n.get("id") or fm.get("id")
|
||||
title = n.get("title") or fm.get("title") or ""
|
||||
|
||||
# Pfad (file_path > note['path'] > path_arg)
|
||||
path = file_path or n.get("path") or path_arg
|
||||
if isinstance(path, pathlib.Path):
|
||||
path = str(path)
|
||||
path = path or ""
|
||||
|
||||
# Tags denormalisieren (optional)
|
||||
tags = fm.get("tags") or fm.get("keywords") or n.get("tags")
|
||||
tags_list = _ensure_list(tags) if tags else []
|
||||
|
||||
# Chunks holen
|
||||
fulltext = note_text if isinstance(note_text, str) else _text_from_note(n)
|
||||
raw_chunks = chunks_from_chunker if isinstance(chunks_from_chunker, list) else _iter_chunks(n, chunk_profile, fulltext)
|
||||
|
||||
include_ord = (os.environ.get("MINDNET_CHUNK_INCLUDE_ORD", "1") != "0")
|
||||
alias_csv = os.environ.get("MINDNET_CHUNK_INDEX_ALIASES", "").strip()
|
||||
extra_aliases = [a.strip() for a in alias_csv.split(",") if a.strip()] if alias_csv else []
|
||||
|
||||
payloads: List[Dict[str, Any]] = []
|
||||
for c in raw_chunks:
|
||||
cdict = c if isinstance(c, dict) else _as_dict(c)
|
||||
idx = _coalesce(cdict.get("index"), cdict.get("ord"), cdict.get("chunk_index"), len(payloads))
|
||||
try:
|
||||
idx = int(idx)
|
||||
except Exception:
|
||||
idx = len(payloads)
|
||||
|
||||
text = _coalesce(cdict.get("window"), cdict.get("text"), "")
|
||||
if not isinstance(text, str):
|
||||
text = str(text or "")
|
||||
|
||||
# deterministische ID
|
||||
key = f"{note_id}|{idx}"
|
||||
h = hashlib.sha1(key.encode("utf-8")).hexdigest()[:12] if note_id else hashlib.sha1(f"{path}|{idx}".encode("utf-8")).hexdigest()[:12]
|
||||
chunk_id = cdict.get("chunk_id") or cdict.get("id") or (f"{note_id}-{idx:03d}-{h}" if note_id else h)
|
||||
|
||||
payload = {
|
||||
"id": chunk_id, # v1 erwartet 'id'
|
||||
"chunk_id": chunk_id,
|
||||
"index": idx, # Kanonisch
|
||||
"note_id": note_id,
|
||||
pl: Dict[str, Any] = {
|
||||
"note_id": nid,
|
||||
"chunk_id": cid,
|
||||
"index": int(index),
|
||||
"ord": int(index) + 1,
|
||||
"type": note_type,
|
||||
"title": title,
|
||||
"path": path,
|
||||
"tags": tags,
|
||||
"text": text,
|
||||
"window": text,
|
||||
"retriever_weight": retriever_weight,
|
||||
"chunk_profile": chunk_profile,
|
||||
"window": window,
|
||||
"neighbors_prev": prev_id,
|
||||
"neighbors_next": next_id,
|
||||
"section": getattr(ch, "section", None) or (ch.get("section") if isinstance(ch, dict) else ""),
|
||||
"path": note_path,
|
||||
"source_path": file_path or note_path,
|
||||
"retriever_weight": float(rw),
|
||||
}
|
||||
if include_ord:
|
||||
payload["ord"] = idx # v2‑Standard, abschaltbar
|
||||
for alias in extra_aliases:
|
||||
# nur whitelisted Namen zulassen
|
||||
if alias in ("chunk_num","Chunk_Nummer","Chunk_Number"):
|
||||
payload[alias] = idx
|
||||
if cp is not None:
|
||||
pl["chunk_profile"] = cp
|
||||
|
||||
nb = cdict.get("neighbors")
|
||||
if isinstance(nb, dict):
|
||||
prev_id = nb.get("prev"); next_id = nb.get("next")
|
||||
payload["neighbors"] = {"prev": prev_id, "next": next_id}
|
||||
# Aufräumen: keine historischen Aliasfelder
|
||||
for alias in ("chunk_num", "Chunk_Number"):
|
||||
if alias in pl:
|
||||
pl.pop(alias, None)
|
||||
|
||||
if tags_list:
|
||||
payload["tags"] = tags_list
|
||||
out.append(pl)
|
||||
|
||||
json.loads(json.dumps(payload, ensure_ascii=False))
|
||||
payloads.append(payload)
|
||||
|
||||
# neighbors berechnen, falls fehlend
|
||||
for i, p in enumerate(payloads):
|
||||
nb = p.get("neighbors") or {}
|
||||
prev_id = nb.get("prev")
|
||||
next_id = nb.get("next")
|
||||
if prev_id is None and i > 0:
|
||||
prev_id = payloads[i-1]["id"]
|
||||
if next_id is None and i+1 < len(payloads):
|
||||
next_id = payloads[i+1]["id"]
|
||||
p["neighbors"] = {"prev": prev_id, "next": next_id}
|
||||
|
||||
return payloads
|
||||
return out
|
||||
|
|
|
|||
Loading…
Reference in New Issue
Block a user