Dateien nach "app/core" hochladen
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

This commit is contained in:
Lars 2025-11-09 09:51:05 +01:00
parent bbd5a7fa48
commit 4dcd606c10
2 changed files with 372 additions and 341 deletions

View File

@ -1,55 +1,105 @@
""" """
chunk_payload.py Mindnet payload builder (Chunks) chunk_payload.py v1.4.2
Version: 1.3.0 (2025-11-09) -------------------------
Robuste, abwärtskompatible Payload-Erzeugung für Chunks.
Purpose Ziele
------- - Setzt pro Chunk `text`, `retriever_weight`, `chunk_profile`, `note_id`.
Build Qdrant-compatible JSON payloads for *chunks* of a parsed note. - Akzeptiert ParsedNote-Objekte *oder* Dicts, inklusive bereits vorsegmentierter .chunks.
Tolerant to different call signatures and accepts both dict-like and object-like inputs. - Verträgt zusätzliche args/kwargs (kompatibel zu älteren Aufrufern).
- Konfig-Auflösung identisch zu note_payload.py.
Key features Autor: ChatGPT
------------ Lizenz: MIT
- Reads type defaults from `config/config.yaml` or `config/types.yaml` (same schema).
- Resolves fields with precedence:
Frontmatter > type-defaults > ENV > fallback.
- Sets per chunk:
* `note_id`, `note_title`, `type`
* `retriever_weight` (float)
* `chunk_profile` (short|medium|long)
* `text` (never empty: falls back to whole note body/text)
* `order`, `section`, `start`, `end` (if available)
- Backwards-compatible signature: accepts **kwargs to swallow unknown args.
Input
-----
`parsed_note` may be:
- dict with keys: id, title, body/text, chunks(list), frontmatter(dict), type
- object with equivalent attributes
Each chunk may be dict-like or object-like with keys/attrs such as:
id, text, order, section, start, end
""" """
from __future__ import annotations from __future__ import annotations
import os import os
import hashlib
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, List, Optional, Union
try: try:
import yaml # type: ignore import yaml # type: ignore
except Exception: # pragma: no cover except Exception: # pragma: no cover
yaml = None yaml = None # type: ignore
def _get(obj: Any, key: str, default: Any = None) -> Any:
    """Read ``key`` from a dict entry or, for any other object, from an attribute.

    Returns ``default`` when the key or attribute is missing.
    """
    return obj.get(key, default) if isinstance(obj, dict) else getattr(obj, key, default)
def _frontmatter(obj: Any) -> Dict[str, Any]: def _as_dict(note: Any) -> Dict[str, Any]:
fm = _get(obj, "frontmatter", {}) or {} if isinstance(note, dict):
return fm if isinstance(fm, dict) else {} return dict(note)
out: Dict[str, Any] = {}
for attr in ("note_id", "id", "title", "type", "frontmatter", "meta", "body", "text", "content", "path", "chunks"):
if hasattr(note, attr):
out[attr] = getattr(note, attr)
if hasattr(note, "__dict__"):
for k, v in note.__dict__.items():
if k not in out:
out[k] = v
return out
def _load_types_config(search_root: Optional[Union[str, Path]] = None,
                       preloaded: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Return the type-defaults configuration.

    A ``preloaded`` dict that contains a ``"types"`` key is returned unchanged.
    Otherwise ``config.yaml``, ``config/config.yaml`` and ``config/types.yaml``
    are probed, first below ``search_root`` (if given) and then below the
    current working directory. The first file that parses to a dict with a
    ``"types"`` key wins. If nothing matches, or PyYAML is unavailable, an
    empty default configuration is returned.
    """
    if isinstance(preloaded, dict) and "types" in preloaded:
        return preloaded

    relative_names = (
        Path("config.yaml"),
        Path("config") / "config.yaml",
        Path("config") / "types.yaml",
    )
    bases: List[Path] = [Path(search_root)] if search_root else []
    bases.append(Path.cwd())

    for base in bases:
        for rel in relative_names:
            candidate = base / rel
            if not (candidate.exists() and candidate.is_file()):
                continue
            if yaml is None:
                # Without PyYAML nothing can be parsed; give up at the first hit.
                return {"version": "1.0", "types": {}}
            try:
                parsed = yaml.safe_load(candidate.read_text(encoding="utf-8")) or {}
            except Exception:
                # Best effort: unreadable or malformed files are skipped silently.
                continue
            if isinstance(parsed, dict) and "types" in parsed:
                return parsed
    return {"version": "1.0", "types": {}}
def _safe_get(d: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Look up ``key`` in ``d``; yield ``default`` if ``d`` is not a dict or lacks the key."""
    return d.get(key, default) if isinstance(d, dict) else default
def _resolve_type(note_d: Dict[str, Any]) -> str:
    """Determine the note type as a string.

    Lookup order: frontmatter ``type``, top-level ``type``, then ``meta["type"]``
    (consulted only when ``meta`` is a dict). Falls back to ``"concept"``.
    """
    frontmatter = note_d.get("frontmatter") or {}
    note_type = _safe_get(frontmatter, "type") or note_d.get("type")
    if note_type:
        return str(note_type)
    meta = note_d.get("meta")
    if isinstance(meta, dict):
        note_type = meta.get("type")
    return str(note_type or "concept")
def _resolve_note_id(note_d: Dict[str, Any]) -> Optional[str]:
    """Return the first non-empty string stored under ``note_id`` or ``id``, else ``None``."""
    values = (note_d.get(field) for field in ("note_id", "id"))
    return next((value for value in values if isinstance(value, str) and value), None)
def _resolve_body(note_d: Dict[str, Any]) -> str:
    """Return the first non-blank string among ``body``, ``text`` and ``content``.

    The value is returned as stored (not stripped); ``""`` if none qualifies.
    """
    values = (note_d.get(field) for field in ("body", "text", "content"))
    return next((value for value in values if isinstance(value, str) and value.strip()), "")
def _resolve_defaults_for_type(types_cfg: Dict[str, Any], typ: str) -> Dict[str, Any]:
    """Return the defaults dict configured for note type ``typ``.

    Args:
        types_cfg: Configuration dict expected to carry a ``"types"`` mapping.
        typ: Note type used as key into that mapping.

    Returns:
        The per-type defaults, or ``{}`` when the configuration, its ``"types"``
        entry, or the entry for ``typ`` is missing or not a dict.
    """
    if not isinstance(types_cfg, dict):
        return {}
    types_map = types_cfg.get("types")
    # A malformed config (e.g. ``types`` given as a list or string) must not
    # raise AttributeError here; treat it like "no defaults configured".
    if not isinstance(types_map, dict):
        return {}
    entry = types_map.get(typ)
    return entry if isinstance(entry, dict) else {}
def _coerce_float(val: Any, default: float) -> float: def _coerce_float(val: Any, default: float) -> float:
try: try:
@ -57,124 +107,109 @@ def _coerce_float(val: Any, default: float) -> float:
return default return default
if isinstance(val, (int, float)): if isinstance(val, (int, float)):
return float(val) return float(val)
if isinstance(val, str) and val.strip(): if isinstance(val, str):
return float(val.strip()) return float(val.strip())
except Exception: except Exception:
pass pass
return default return default
def _normalize_chunk_profile(val: Any, fallback: str = "medium") -> str:
    """Map ``val`` to one of ``short``/``medium``/``long`` (trimmed, lower-cased).

    Anything that is not a string, or not one of the three names, yields ``fallback``.
    """
    if isinstance(val, str):
        candidate = val.strip().lower()
        if candidate in ("short", "medium", "long"):
            return candidate
    return fallback
def _safe_text(s: Any) -> str: def _compute_retriever_weight(note_d: Dict[str, Any], types_cfg: Dict[str, Any], typ: str) -> float:
if s is None: fm = note_d.get("frontmatter") or {}
return "" if "retriever_weight" in fm:
return _coerce_float(fm.get("retriever_weight"), 1.0)
tdef = _resolve_defaults_for_type(types_cfg, typ)
if "retriever_weight" in tdef:
return _coerce_float(tdef.get("retriever_weight"), 1.0)
envv = os.getenv("MINDNET_DEFAULT_RETRIEVER_WEIGHT")
if envv:
return _coerce_float(envv, 1.0)
return 1.0
def _compute_chunk_profile(note_d: Dict[str, Any], types_cfg: Dict[str, Any], typ: str) -> str:
    """Resolve the chunk profile for a note.

    Precedence: frontmatter ``chunk_profile`` > type defaults from ``types_cfg``
    > environment variable ``MINDNET_DEFAULT_CHUNK_PROFILE`` > ``"medium"``.

    Args:
        note_d: Note as a dict; ``frontmatter`` is used only if it is a dict.
        types_cfg: Configuration passed on to ``_resolve_defaults_for_type``.
        typ: Note type used to select the type defaults.

    Returns:
        The profile as a string.
    """
    fm = note_d.get("frontmatter")
    # An explicit null (``chunk_profile:`` with no value) counts as "not set";
    # otherwise the literal string "None" would be emitted as the profile.
    if isinstance(fm, dict) and fm.get("chunk_profile") is not None:
        return str(fm["chunk_profile"])
    tdef = _resolve_defaults_for_type(types_cfg, typ)
    if tdef.get("chunk_profile") is not None:
        return str(tdef["chunk_profile"])
    env_value = os.getenv("MINDNET_DEFAULT_CHUNK_PROFILE")
    if env_value:
        return str(env_value)
    return "medium"
def _norm_chunk_text(s: Any) -> str:
if isinstance(s, str): if isinstance(s, str):
return s return s.strip()
return str(s) return ""
def _load_types_config(
    explicit_config: Optional[Dict[str, Any]] = None,
    search_root: Union[str, Path, None] = None,
) -> Dict[str, Any]:
    """Return a ``{"types": {...}}`` configuration.

    A non-empty ``explicit_config`` whose ``"types"`` entry is a dict is returned
    unchanged. Otherwise ``config/config.yaml`` and ``config/types.yaml`` are
    probed below ``search_root`` (or the current working directory when it is
    not given) and then below the current working directory. The first file
    that parses to a dict with a dict-valued ``"types"`` entry supplies the
    result. Without PyYAML, or when nothing matches, ``{"types": {}}`` is returned.
    """
    has_explicit_types = (
        bool(explicit_config)
        and isinstance(explicit_config, dict)
        and "types" in explicit_config
        and isinstance(explicit_config["types"], dict)
    )
    if has_explicit_types:
        return explicit_config

    if yaml is None:
        return {"types": {}}

    base = Path(search_root) if search_root else Path.cwd()
    file_names = ("config.yaml", "types.yaml")
    candidates: List[Path] = [base / "config" / name for name in file_names]
    candidates += [Path.cwd() / "config" / name for name in file_names]

    for path in candidates:
        try:
            if not path.exists():
                continue
            import yaml as _y
            with path.open("r", encoding="utf-8") as handle:
                loaded = _y.safe_load(handle) or {}
            if isinstance(loaded, dict) and isinstance(loaded.get("types"), dict):
                return {"types": loaded["types"]}
        except Exception:
            # Best effort: unreadable or malformed files are skipped.
            continue
    return {"types": {}}
def _type_defaults(note_type: str, cfg: Dict[str, Any]) -> Dict[str, Any]: def _hash(s: str) -> str:
return (cfg.get("types") or {}).get(note_type, {}) if isinstance(cfg, dict) else {} return hashlib.sha1(s.encode("utf-8")).hexdigest()[:12]
def make_chunk_payloads(
parsed_note: Any,
config: Optional[Dict[str, Any]] = None,
**kwargs: Any,
) -> List[Dict[str, Any]]:
search_root = kwargs.get("search_root")
fm = _frontmatter(parsed_note)
note_type = fm.get("type") or _get(parsed_note, "type") or "concept"
note_type = str(note_type).strip().lower()
cfg = _load_types_config(config, search_root) def make_chunk_payloads(note: Any, *args, **kwargs) -> List[Dict[str, Any]]:
defaults = _type_defaults(note_type, cfg) """Erzeugt Payloads für alle Chunks der Note.
# Resolve retriever_weight: FM > type-defaults > ENV > 1.0 Akzeptierte zusätzliche kwargs:
rw = fm.get("retriever_weight") - types_config: dict wie in config.yaml
if rw is None: - search_root / vault_root: für Konfigsuche
rw = defaults.get("retriever_weight")
if rw is None: *args werden ignoriert (Kompatibilität zu älteren Aufrufern).
env_rw = os.getenv("MINDNET_DEFAULT_RETRIEVER_WEIGHT") """
rw = _coerce_float(env_rw, 1.0) note_d = _as_dict(note)
types_config = kwargs.get("types_config")
search_root = kwargs.get("search_root") or kwargs.get("vault_root")
types_cfg = _load_types_config(search_root, types_config)
typ = _resolve_type(note_d)
note_id = _resolve_note_id(note_d) or ""
r_weight = _compute_retriever_weight(note_d, types_cfg, typ)
c_profile = _compute_chunk_profile(note_d, types_cfg, typ)
out: List[Dict[str, Any]] = []
# 1) Falls der Parser bereits Chunks liefert, nutzen
pre = note_d.get("chunks")
if isinstance(pre, list) and pre:
for idx, c in enumerate(pre):
if isinstance(c, dict):
text = _norm_chunk_text(c.get("text") or c.get("body") or c.get("content"))
else: else:
rw = _coerce_float(rw, 1.0) text = _norm_chunk_text(getattr(c, "text", ""))
# Resolve chunk_profile: FM > type-defaults > ENV > medium
cp = fm.get("chunk_profile")
if cp is None:
cp = defaults.get("chunk_profile")
if cp is None:
cp = os.getenv("MINDNET_DEFAULT_CHUNK_PROFILE", "medium")
cp = _normalize_chunk_profile(cp, "medium")
note_id = _get(parsed_note, "id")
note_title = _get(parsed_note, "title")
body = _get(parsed_note, "body") or _get(parsed_note, "text") or ""
items = _get(parsed_note, "chunks") or []
payloads: List[Dict[str, Any]] = []
if not items:
items = [{
"id": f"{note_id}::0" if note_id else None,
"text": body,
"order": 0,
"section": None,
"start": 0,
"end": len(body) if isinstance(body, str) else None,
}]
for ch in items:
text = _safe_text(_get(ch, "text"))
if not text: if not text:
text = _safe_text(body) # Fallback auf Note-Body, falls leer
text = _resolve_body(note_d)
if not text:
continue
chunk_id = f"{note_id}#{idx:03d}" if note_id else _hash(text)[:8]
payload = { payload = {
"note_id": note_id, "note_id": note_id,
"note_title": note_title, "chunk_id": chunk_id,
"type": note_type,
"retriever_weight": float(rw),
"chunk_profile": cp,
"text": text, "text": text,
"order": _get(ch, "order"), "retriever_weight": float(r_weight),
"section": _get(ch, "section"), "chunk_profile": str(c_profile),
"start": _get(ch, "start"), "type": typ,
"end": _get(ch, "end"),
"chunk_id": _get(ch, "id"),
} }
payload = {k: v for k, v in payload.items() if v is not None} out.append(payload)
payloads.append(payload)
return payloads # 2) Sonst als Single-Chunk aus Body/Text
if not out:
text = _resolve_body(note_d)
if text:
chunk_id = f"{note_id}#000" if note_id else _hash(text)[:8]
out.append({
"note_id": note_id,
"chunk_id": chunk_id,
"text": text,
"retriever_weight": float(r_weight),
"chunk_profile": str(c_profile),
"type": typ,
})
return out

View File

@ -1,81 +1,100 @@
""" """
note_payload.py Mindnet payload builder (Notes) note_payload.py v1.4.2
Version: 1.3.0 (2025-11-09) ------------------------
Robuste, abwärtskompatible Payload-Erzeugung für Notes.
Purpose Ziele
------- - Setzt `retriever_weight`, `chunk_profile`, `edge_defaults` deterministisch.
Build Qdrant-compatible JSON payloads for *notes* from a parsed Markdown - Priorität: Frontmatter > Typ-Defaults (config/config.yaml oder config/types.yaml) > ENV > Fallback.
representation. The function is tolerant to different call signatures and - Akzeptiert ParsedNote-Objekte *oder* Dicts.
accepts both dict-like and object-like "ParsedNote" inputs. - Verträgt zusätzliche kwargs (z. B. vault_root/search_root/cfg).
- Keine Verwendung nicht-serialisierbarer Typen.
Key features Hinweis
------------ - Diese Datei **lädt Konfig** nur opportunistisch (./config/config.yaml oder ./config/types.yaml relativ zum CWD
- Reads type defaults from `config/config.yaml` or `config/types.yaml` (same schema). bzw. zu `search_root`/`vault_root`, falls übergeben). Wenn dein Aufrufer bereits eine Konfiguration geladen hat,
- Resolves fields with the following precedence: kann er sie via `types_config` kwarg übergeben (dict wie in deinem Beispiel).
Frontmatter > type-defaults > ENV > hard-coded fallback.
- Ensures only JSON-serializable types are included (no sets, Path, callables).
- Sets/normalizes:
* `type` : note type (e.g., concept, task, experience, project)
* `retriever_weight` : float, influences retrieval blending downstream
* `chunk_profile` : short | medium | long (string)
* `edge_defaults` : list[str], used by edge builder outside of this module
- Backwards-compatible signature: accepts **kwargs to swallow unknown args
(e.g., vault_root, prefix, ...).
Expected input (flexible) Autor: ChatGPT
------------------------- Lizenz: MIT
`parsed_note` may be:
- dict with keys: id, title, body/text, path, frontmatter (dict), type, ...
- object with attributes: id, title, body/text, path, frontmatter, type, ...
Schema for config files
-----------------------
version: 1.0
types:
concept:
chunk_profile: medium
edge_defaults: ["references", "related_to"]
retriever_weight: 0.33
task:
chunk_profile: short
edge_defaults: ["depends_on", "belongs_to"]
retriever_weight: 0.8
experience:
chunk_profile: medium
edge_defaults: ["derived_from", "inspired_by"]
retriever_weight: 0.9
project:
chunk_profile: long
edge_defaults: ["references", "depends_on"]
retriever_weight: 0.95
""" """
from __future__ import annotations from __future__ import annotations
import json
import os import os
from pathlib import Path from pathlib import Path
from typing import Any, Dict, List, Optional, Union from typing import Any, Dict, Optional, Union, List
try: try:
import yaml # type: ignore import yaml # type: ignore
except Exception: # pragma: no cover except Exception: # pragma: no cover - yaml ist optional, wir degradieren dann sauber
yaml = None # The caller must ensure PyYAML is installed yaml = None # type: ignore
# ------------------------------ # ------------------------------
# Helpers # Hilfsfunktionen (keine I/O Magie)
# ------------------------------ # ------------------------------
def _get(obj: Any, key: str, default: Any = None) -> Any: def _as_dict(note: Any) -> Dict[str, Any]:
"""Get key from dict-like or attribute from object-like.""" """Konvertiert eine ParsedNote-ähnliche Struktur robust in ein Dict."""
if isinstance(obj, dict): if isinstance(note, dict):
return obj.get(key, default) return dict(note)
return getattr(obj, key, default) # Objekt -> vorsichtig Attribute lesen
out: Dict[str, Any] = {}
for attr in ("note_id", "id", "title", "type", "frontmatter", "meta", "body", "text", "content", "path"):
if hasattr(note, attr):
out[attr] = getattr(note, attr)
# Manche Parser haben .data / .raw etc.
if hasattr(note, "__dict__"):
# nichts überschreiben, nur fehlende ergänzen (nur einfache Typen)
for k, v in note.__dict__.items():
if k not in out:
out[k] = v
return out
def _safe_get(d: Dict[str, Any], key: str, default: Any = None) -> Any:
    """Non-mutating dict lookup that also tolerates ``d`` not being a dict."""
    return d.get(key, default) if isinstance(d, dict) else default
def _load_types_config(search_root: Optional[Union[str, Path]] = None,
                       preloaded: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
    """Load type defaults from ``config.yaml`` or ``types.yaml`` (if present).

    Expected structure::

        {
            "version": "1.0",
            "types": {
                "concept": {"chunk_profile": "medium", "edge_defaults": [...], "retriever_weight": 0.33},
                ...
            }
        }

    A ``preloaded`` dict that contains a ``"types"`` key is returned unchanged.
    Otherwise ``config.yaml``, ``config/config.yaml`` and ``config/types.yaml``
    are probed below ``search_root`` (if given) and then below the current
    working directory; the first file that parses to a dict with a ``"types"``
    key is returned. If nothing matches, or PyYAML is unavailable, an empty
    default configuration is returned.
    """
    if isinstance(preloaded, dict) and "types" in preloaded:
        return preloaded

    relative_names = (
        Path("config.yaml"),
        Path("config") / "config.yaml",
        Path("config") / "types.yaml",
    )
    bases: List[Path] = [Path(search_root)] if search_root else []
    # Paths relative to the current working directory come last.
    bases.append(Path.cwd())

    for base in bases:
        for rel in relative_names:
            candidate = base / rel
            if not (candidate.exists() and candidate.is_file()):
                continue
            if yaml is None:
                # Without PyYAML nothing can be parsed; give up at the first hit.
                return {"version": "1.0", "types": {}}
            try:
                parsed = yaml.safe_load(candidate.read_text(encoding="utf-8")) or {}
            except Exception:
                # Deliberately silent: a broken file must not crash the caller.
                continue
            if isinstance(parsed, dict) and "types" in parsed:
                return parsed
    return {"version": "1.0", "types": {}}
def _frontmatter(obj: Any) -> Dict[str, Any]:
    """Return the ``frontmatter`` of ``obj`` if it is a non-empty dict, else ``{}``."""
    candidate = _get(obj, "frontmatter", {})
    if candidate and isinstance(candidate, dict):
        return candidate
    return {}
def _coerce_float(val: Any, default: float) -> float: def _coerce_float(val: Any, default: float) -> float:
try: try:
@ -83,170 +102,147 @@ def _coerce_float(val: Any, default: float) -> float:
return default return default
if isinstance(val, (int, float)): if isinstance(val, (int, float)):
return float(val) return float(val)
if isinstance(val, str) and val.strip(): if isinstance(val, str):
return float(val.strip()) return float(val.strip())
except Exception: except Exception:
pass pass
return default return default
def _normalize_chunk_profile(val: Any, fallback: str = "medium") -> str:
if not isinstance(val, str): def _ensure_str_list(v: Any) -> List[str]:
return fallback if v is None:
v = val.strip().lower() return []
if v in {"short", "medium", "long"}: if isinstance(v, (list, tuple)):
return [str(x) for x in v if x is not None]
return [str(v)]
def _resolve_type(note_d: Dict[str, Any]) -> str:
    """Determine the note type as a string.

    Lookup order: frontmatter ``type``, top-level ``type``, then ``meta["type"]``
    (consulted only when ``meta`` is a dict). Falls back to ``"concept"``.
    """
    frontmatter = note_d.get("frontmatter") or {}
    note_type = _safe_get(frontmatter, "type") or note_d.get("type")
    if note_type:
        return str(note_type)
    meta = note_d.get("meta")
    if isinstance(meta, dict):
        note_type = meta.get("type")
    return str(note_type or "concept")
def _resolve_title(note_d: Dict[str, Any]) -> str:
    """Return the title from frontmatter, else the top-level ``title``, else ``""``."""
    frontmatter = note_d.get("frontmatter") or {}
    for candidate in (_safe_get(frontmatter, "title"), note_d.get("title")):
        if candidate:
            return str(candidate)
    return ""
def _resolve_note_id(note_d: Dict[str, Any]) -> Optional[str]:
for k in ("note_id", "id"):
v = note_d.get(k)
if isinstance(v, str) and v:
return v return v
return fallback return None
def _coerce_str_list(val: Any) -> List[str]:
if val is None: def _resolve_body(note_d: Dict[str, Any]) -> str:
return [] for k in ("body", "text", "content"):
if isinstance(val, list): v = note_d.get(k)
out: List[str] = [] if isinstance(v, str) and v.strip():
for x in val: return v
if isinstance(x, str): return ""
out.append(x)
else:
out.append(str(x)) def _resolve_defaults_for_type(types_cfg: Dict[str, Any], typ: str) -> Dict[str, Any]:
return out if not isinstance(types_cfg, dict):
if isinstance(val, str): return {}
# allow comma-separated t = (types_cfg.get("types") or {}).get(typ) or {}
return [x.strip() for x in val.split(",") if x.strip()] return t if isinstance(t, dict) else {}
def _compute_retriever_weight(note_d: Dict[str, Any], types_cfg: Dict[str, Any], typ: str) -> float:
    """Resolve the retriever weight for a note.

    Precedence: frontmatter ``retriever_weight`` > type defaults from
    ``types_cfg`` > environment variable ``MINDNET_DEFAULT_RETRIEVER_WEIGHT``
    > ``1.0``.

    Args:
        note_d: Note as a dict; ``frontmatter`` is used only if it is a dict.
        types_cfg: Configuration passed on to ``_resolve_defaults_for_type``.
        typ: Note type used to select the type defaults.

    Returns:
        The weight, converted via ``_coerce_float`` with ``1.0`` as its default.
    """
    fm = note_d.get("frontmatter")
    # 1) Frontmatter. An explicit null counts as "not set" so that it cannot
    #    shadow the type default or the environment value.
    if isinstance(fm, dict) and fm.get("retriever_weight") is not None:
        return _coerce_float(fm["retriever_weight"], 1.0)
    # 2) Type defaults
    tdef = _resolve_defaults_for_type(types_cfg, typ)
    if tdef.get("retriever_weight") is not None:
        return _coerce_float(tdef["retriever_weight"], 1.0)
    # 3) Environment
    env_value = os.getenv("MINDNET_DEFAULT_RETRIEVER_WEIGHT")
    if env_value:
        return _coerce_float(env_value, 1.0)
    # 4) Fallback
    return 1.0
def _compute_chunk_profile(note_d: Dict[str, Any], types_cfg: Dict[str, Any], typ: str) -> str:
    """Resolve the chunk profile for a note.

    Precedence: frontmatter ``chunk_profile`` > type defaults from ``types_cfg``
    > environment variable ``MINDNET_DEFAULT_CHUNK_PROFILE`` > ``"medium"``.

    Args:
        note_d: Note as a dict; ``frontmatter`` is used only if it is a dict.
        types_cfg: Configuration passed on to ``_resolve_defaults_for_type``.
        typ: Note type used to select the type defaults.

    Returns:
        The profile as a string.
    """
    fm = note_d.get("frontmatter")
    # An explicit null (``chunk_profile:`` with no value) counts as "not set";
    # otherwise the literal string "None" would be emitted as the profile.
    if isinstance(fm, dict) and fm.get("chunk_profile") is not None:
        return str(fm["chunk_profile"])
    tdef = _resolve_defaults_for_type(types_cfg, typ)
    if tdef.get("chunk_profile") is not None:
        return str(tdef["chunk_profile"])
    env_value = os.getenv("MINDNET_DEFAULT_CHUNK_PROFILE")
    if env_value:
        return str(env_value)
    return "medium"
def _compute_edge_defaults(note_d: Dict[str, Any], types_cfg: Dict[str, Any], typ: str) -> List[str]:
    """Resolve the default edge kinds for a note.

    Precedence: frontmatter ``edge_defaults`` > type defaults from
    ``types_cfg`` > empty list.

    Args:
        note_d: Note as a dict; ``frontmatter`` is used only if it is a dict.
        types_cfg: Configuration passed on to ``_resolve_defaults_for_type``.
        typ: Note type used to select the type defaults.

    Returns:
        The edge kinds, normalised via ``_ensure_str_list``.
    """
    fm = note_d.get("frontmatter")
    # An explicit null counts as "not set" and falls through to the type
    # defaults; an explicit empty list still overrides them.
    if isinstance(fm, dict) and fm.get("edge_defaults") is not None:
        return _ensure_str_list(fm["edge_defaults"])
    tdef = _resolve_defaults_for_type(types_cfg, typ)
    if tdef.get("edge_defaults") is not None:
        return _ensure_str_list(tdef["edge_defaults"])
    return []
def _safe_jsonable(value: Any) -> Any:
    """Convert ``value`` into something ``json.dumps`` accepts.

    Scalars and ``None`` pass through. Lists and dicts are processed
    recursively, with dict keys turned into strings. ``Path`` objects become
    strings. Any other value is kept if ``json.dumps`` accepts it and
    replaced by ``str(value)`` otherwise.
    """
    if value is None or isinstance(value, (str, int, float, bool)):
        return value
    if isinstance(value, list):
        return [_safe_jsonable(item) for item in value]
    if isinstance(value, dict):
        return {str(key): _safe_jsonable(item) for key, item in value.items()}
    if isinstance(value, Path):
        return str(value)
    # Remaining types (sets, arbitrary objects, ...) are probed by serialising them.
    try:
        json.dumps(value)
    except Exception:
        return str(value)
    return value
# ------------------------------ # ------------------------------
# Config loading # Öffentliche API
# ------------------------------ # ------------------------------
def _load_types_config( def make_note_payload(note: Any, *args, **kwargs) -> Dict[str, Any]:
explicit_config: Optional[Dict[str, Any]] = None, """Erzeugt das Payload-Dict für eine Note.
search_root: Union[str, Path, None] = None,
) -> Dict[str, Any]: Akzeptierte zusätzliche kwargs:
- types_config: bereits geladene Config (dict mit "types")
- search_root / vault_root: Ordner, in dem config/* gesucht wird
""" """
Load types config from: note_d = _as_dict(note)
1) explicit_config (if provided)
2) {search_root}/config/config.yaml
3) {search_root}/config/types.yaml
4) ./config/config.yaml
5) ./config/types.yaml
Returns a dict with shape: {"types": {...}} (empty if none found).
"""
if explicit_config and isinstance(explicit_config, dict):
if "types" in explicit_config and isinstance(explicit_config["types"], dict):
return explicit_config
candidates: List[Path] = [] # Konfig finden
root = Path(search_root) if search_root else Path.cwd() types_config = kwargs.get("types_config")
candidates.append(root / "config" / "config.yaml") search_root = kwargs.get("search_root") or kwargs.get("vault_root")
candidates.append(root / "config" / "types.yaml") types_cfg = _load_types_config(search_root, types_config)
# fallback to CWD when search_root was different
candidates.append(Path.cwd() / "config" / "config.yaml")
candidates.append(Path.cwd() / "config" / "types.yaml")
data = {} # Felder auflösen
if yaml is None: typ = _resolve_type(note_d)
return {"types": {}} title = _resolve_title(note_d)
note_id = _resolve_note_id(note_d)
body = _resolve_body(note_d)
for p in candidates: retriever_weight = _compute_retriever_weight(note_d, types_cfg, typ)
try: chunk_profile = _compute_chunk_profile(note_d, types_cfg, typ)
if p.exists(): edge_defaults = _compute_edge_defaults(note_d, types_cfg, typ)
with p.open("r", encoding="utf-8") as f:
loaded = yaml.safe_load(f) or {}
if isinstance(loaded, dict) and isinstance(loaded.get("types"), dict):
data = {"types": loaded["types"]}
break
except Exception:
continue
if not data:
data = {"types": {}}
return data
def _type_defaults(note_type: str, cfg: Dict[str, Any]) -> Dict[str, Any]:
return (cfg.get("types") or {}).get(note_type, {}) if isinstance(cfg, dict) else {}
# ------------------------------
# Public API
# ------------------------------
def make_note_payload(
parsed_note: Any,
*,
config: Optional[Dict[str, Any]] = None,
search_root: Union[str, Path, None] = None,
**kwargs: Any,
) -> Dict[str, Any]:
"""
Build the payload for a NOTE. Tolerates extra kwargs (e.g., vault_root, prefix).
"""
fm = _frontmatter(parsed_note)
note_type = fm.get("type") or _get(parsed_note, "type") or "concept"
note_type = str(note_type).strip().lower()
# Load config and resolve defaults
cfg = _load_types_config(config, search_root)
defaults = _type_defaults(note_type, cfg)
# retriever_weight: FM > type-defaults > ENV > 1.0
rw = fm.get("retriever_weight")
if rw is None:
rw = defaults.get("retriever_weight")
if rw is None:
env_rw = os.getenv("MINDNET_DEFAULT_RETRIEVER_WEIGHT")
rw = _coerce_float(env_rw, 1.0)
else:
rw = _coerce_float(rw, 1.0)
# chunk_profile: FM > type-defaults > ENV > medium
cp = fm.get("chunk_profile")
if cp is None:
cp = defaults.get("chunk_profile")
if cp is None:
cp = os.getenv("MINDNET_DEFAULT_CHUNK_PROFILE", "medium")
cp = _normalize_chunk_profile(cp, "medium")
# edge_defaults: FM > type-defaults > empty
edge_defs = fm.get("edge_defaults")
if edge_defs is None:
edge_defs = defaults.get("edge_defaults", [])
edge_defs = _coerce_str_list(edge_defs)
# Payload zusammenstellen (nur JSON-fähige Typen)
payload: Dict[str, Any] = { payload: Dict[str, Any] = {
"id": _get(parsed_note, "id"), "type": typ,
"note_id": _get(parsed_note, "id"), "title": title,
"title": _get(parsed_note, "title"), "retriever_weight": float(retriever_weight),
"type": note_type, "chunk_profile": str(chunk_profile),
"retriever_weight": float(rw), "edge_defaults": edge_defaults,
"chunk_profile": cp,
"edge_defaults": edge_defs,
# Useful passthrough/meta (all made JSON-safe)
"path": _safe_jsonable(_get(parsed_note, "path")),
"source": _safe_jsonable(_get(parsed_note, "source")),
} }
if note_id:
payload["note_id"] = note_id
if body:
payload["body_preview"] = body[:5000] # nur Vorschau, Retriever nutzt Chunks
# Include raw frontmatter keys (stringify keys; make safe) # Frontmatter relevante Keys durchreichen (ohne Binärdaten/Objekte)
fm = note_d.get("frontmatter") or {}
if isinstance(fm, dict): if isinstance(fm, dict):
for k, v in fm.items(): for k, v in fm.items():
# avoid overwriting normalized fields if k in ("type", "retriever_weight", "chunk_profile", "edge_defaults"):
if k in {"type", "retriever_weight", "chunk_profile", "edge_defaults"}:
continue continue
payload[f"fm_{k}"] = _safe_jsonable(v) # nur einfache/nützliche Typen durchlassen
if isinstance(v, (str, int, float, bool, list, dict)) or v is None:
payload[f"fm_{k}"] = v
# Remove None values to keep payload clean
payload = {k: v for k, v in payload.items() if v is not None}
return payload return payload