Upload files to "app/core"
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

Lars 2025-11-08 22:06:21 +01:00
parent b84906283e
commit 6dc37ccb66
2 changed files with 284 additions and 465 deletions

app/core/chunk_payload.py View File

@@ -1,280 +1,144 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
chunk_payload.py - Mindnet payload helpers
Version: 0.5.2 (generated 2025-11-08 21:03:48)

Purpose:
- Build the CHUNK payload list while preserving existing chunk fields
  (text, seq, etc.).
- Inject into *every* chunk:
    * retriever_weight (resolved like the note payload; float, default via
      ENV MINDNET_DEFAULT_RETRIEVER_WEIGHT, otherwise 1.0)
    * chunk_profile (resolved like the note payload)
- The resolution order is identical to note_payload.make_note_payload.
- The signature is tolerant, to match existing importers.

Note on Qdrant:
Qdrant is schema-flexible. A field only shows up in the UI/HTTP API once at
least one point carries it in its payload. For consistent typing, a payload
index definition is additionally recommended (e.g. FLOAT for
'retriever_weight').
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, List, Optional, Union

try:
    import yaml  # type: ignore
except Exception:  # pragma: no cover
    yaml = None  # will skip YAML loading if unavailable
# ------------------------------- Utils ------------------------------- #
def _coerce_mapping(obj: Any) -> Dict[str, Any]:
    if obj is None:
        return {}
    if isinstance(obj, dict):
        return dict(obj)
    out: Dict[str, Any] = {}
    if hasattr(obj, "__dict__"):
        out.update(getattr(obj, "__dict__"))
    for k in ("id", "note_id", "title", "type", "path", "source_path", "frontmatter"):
        if hasattr(obj, k) and k not in out:
            out[k] = getattr(obj, k)
    return out
def _coerce_chunk_dict(obj: Any) -> Dict[str, Any]:
    if isinstance(obj, dict):
        return dict(obj)
    d: Dict[str, Any] = {}
    # common attributes for a chunk object
    for k in ("chunk_id", "id", "note_id", "seq", "start", "end", "text", "title", "type", "source_path"):
        if hasattr(obj, k):
            d[k] = getattr(obj, k)
    if hasattr(obj, "__dict__"):
        for k, v in obj.__dict__.items():
            d.setdefault(k, v)
    return d


def _get_frontmatter(parsed: Dict[str, Any]) -> Dict[str, Any]:
    fm = parsed.get("frontmatter")
    return dict(fm) if isinstance(fm, dict) else {}
def _load_types_from_yaml(types_file: Optional[Union[str, Path]]) -> Dict[str, Any]:
    if types_file is None:
        for cand in (Path("config/types.yaml"), Path("config/types.yml"), Path("config.yaml"), Path("config.yml")):
            if cand.exists():
                types_file = cand
                break
    if types_file is None or yaml is None:
        return {}
    p = Path(types_file)
    if not p.exists():
        return {}
    try:
        data = yaml.safe_load(p.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            return {}
        if "types" in data and isinstance(data["types"], dict):
            return dict(data["types"])
        return data
    except Exception:
        return {}
def _resolve_type_defaults(note_type: Optional[str], types: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    if not note_type or not types or not isinstance(types, dict):
        return {}
    block = types.get(note_type)
    return dict(block) if isinstance(block, dict) else {}


def _to_float(val: Any, fallback: float) -> float:
    if val is None:
        return fallback
    try:
        return float(val)
    except Exception:
        return fallback


def _first_nonempty(*vals):
    for v in vals:
        if v is not None:
            if isinstance(v, str) and v.strip() == "":
                continue
            return v
    return None


# ----------------------------- Public API ---------------------------- #
def make_chunk_payloads(parsed_note: Any, chunks: List[Any], **kwargs) -> List[Dict[str, Any]]:
    parsed = _coerce_mapping(parsed_note)
    fm = _get_frontmatter(parsed)

    # external sources
    types_registry = kwargs.get("types") or kwargs.get("types_registry")
    types_from_yaml = _load_types_from_yaml(kwargs.get("types_file"))
    types_all: Dict[str, Any] = types_registry if isinstance(types_registry, dict) else types_from_yaml

    note_type: Optional[str] = _first_nonempty(parsed.get("type"), fm.get("type"))
    type_defaults = _resolve_type_defaults(note_type, types_all)

    env_default = os.getenv("MINDNET_DEFAULT_RETRIEVER_WEIGHT")
    env_default_val = _to_float(env_default, 1.0) if env_default is not None else 1.0

    effective_retriever_weight = _to_float(
        _first_nonempty(
            fm.get("retriever_weight"),
            type_defaults.get("retriever_weight"),
            env_default_val,
            1.0,
        ),
        1.0,
    )

    effective_chunk_profile = _first_nonempty(
        fm.get("chunk_profile"),
        fm.get("profile"),
        type_defaults.get("chunk_profile"),
        os.getenv("MINDNET_DEFAULT_CHUNK_PROFILE"),
    )

    out: List[Dict[str, Any]] = []
    for ch in chunks or []:
        payload = _coerce_chunk_dict(ch)  # preserve all existing chunk fields
        payload["retriever_weight"] = effective_retriever_weight
        if effective_chunk_profile is not None:
            payload["chunk_profile"] = effective_chunk_profile
        out.append(payload)
    return out
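

if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke test (illustrative only). The demo data mirrors the example
    # from the previous module version, adapted to the tolerant new signature;
    # a plain dict works because _coerce_mapping accepts dict-shaped notes.
    demo_note = {
        "frontmatter": {
            "id": "demo",
            "title": "Demo",
            "type": "concept",
            "retriever_weight": 0.75,
            "chunk_profile": "tight",
        }
    }
    demo_chunks = [
        {"id": "demo#1", "text": "Alpha Beta Gamma"},
        {"id": "demo#2", "text": "Gamma Delta"},
        {"id": "demo#3", "text": "Delta Epsilon Zeta"},
    ]
    from pprint import pprint
    pprint(make_chunk_payloads(demo_note, demo_chunks))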

app/core/note_payload.py View File

@@ -1,246 +1,201 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
note_payload.py - Mindnet payload helpers
Version: 0.5.2 (generated 2025-11-08 21:03:48)

Purpose:
- Build a NOTE payload without dropping existing fields.
- Resolve and inject:
    * retriever_weight
    * chunk_profile
    * edge_defaults

Resolution order:
1) Frontmatter fields
2) Type defaults from a provided registry ('types' kwarg) OR a YAML file
   ('types_file' kwarg). Supported YAML shapes:
   - root['types'][note_type] -> {retriever_weight, chunk_profile, edge_defaults}
   - root[note_type] is the type block directly
3) ENV MINDNET_DEFAULT_RETRIEVER_WEIGHT
4) Fallback 1.0

Notes:
- The function signature is tolerant: it accepts **kwargs
  (e.g. vault_root, types_file, types, types_registry).
- Does NOT attempt to create edges; it only exposes 'edge_defaults' in the
  NOTE payload for later stages.
"""
from __future__ import annotations
import os
from pathlib import Path
from typing import Any, Dict, Mapping, Optional, Union

try:
    import yaml  # type: ignore
except Exception:  # pragma: no cover
    yaml = None  # will skip YAML loading if unavailable
# -------- helpers --------
def _coerce_mapping(obj: Any) -> Dict[str, Any]:
    if obj is None:
        return {}
    if isinstance(obj, dict):
        return dict(obj)
    # try common attributes
    out: Dict[str, Any] = {}
    if hasattr(obj, "__dict__"):
        out.update(getattr(obj, "__dict__"))
    # named attributes we often see
    for k in ("id", "note_id", "title", "type", "path", "source_path", "frontmatter"):
        if hasattr(obj, k) and k not in out:
            out[k] = getattr(obj, k)
    return out
def _get_frontmatter(parsed: Mapping[str, Any]) -> Dict[str, Any]:
    fm = parsed.get("frontmatter")
    if isinstance(fm, dict):
        return dict(fm)
    return {}  # tolerate notes without frontmatter


def _load_types_from_yaml(types_file: Optional[Union[str, Path]]) -> Dict[str, Any]:
    if types_file is None:
        # try common defaults
        candidates = [
            Path("config/types.yaml"),
            Path("config/types.yml"),
            Path("config.yaml"),
            Path("config.yml"),
        ]
        for p in candidates:
            if p.exists():
                types_file = p
                break
    if types_file is None:
        return {}
    p = Path(types_file)
    if not p.exists() or yaml is None:
        return {}
    try:
        data = yaml.safe_load(p.read_text(encoding="utf-8"))
        if not isinstance(data, dict):
            return {}
        # support both shapes: {types: {concept: ...}} OR {concept: ...}
        if "types" in data and isinstance(data["types"], dict):
            return dict(data["types"])
        return data
    except Exception:
        return {}
def _resolve_type_defaults(note_type: Optional[str], types: Optional[Dict[str, Any]]) -> Dict[str, Any]:
    defaults: Dict[str, Any] = {}
    if not note_type or not types or not isinstance(types, dict):
        return defaults
    block = types.get(note_type)
    if isinstance(block, dict):
        defaults.update(block)
    return defaults


def _to_float(val: Any, fallback: float) -> float:
    if val is None:
        return fallback
    try:
        return float(val)
    except Exception:
        return fallback
def _first_nonempty(*vals):
    for v in vals:
        if v is not None:
            if isinstance(v, str) and v.strip() == "":
                continue
            return v
    return None

# -------- main API --------
def make_note_payload(parsed_note: Any, **kwargs) -> Dict[str, Any]:
parsed = _coerce_mapping(parsed_note)
fm = _get_frontmatter(parsed)
    # external sources
    types_registry = kwargs.get("types") or kwargs.get("types_registry")
    types_from_yaml = _load_types_from_yaml(kwargs.get("types_file"))
    # registry wins over YAML if provided
    types_all: Dict[str, Any] = types_registry if isinstance(types_registry, dict) else types_from_yaml
note_type: Optional[str] = _first_nonempty(parsed.get("type"), fm.get("type"))
title: Optional[str] = _first_nonempty(parsed.get("title"), fm.get("title"))
note_id: Optional[str] = _first_nonempty(parsed.get("note_id"), parsed.get("id"), fm.get("id"))
type_defaults = _resolve_type_defaults(note_type, types_all)
# --- resolve retriever_weight ---
env_default = os.getenv("MINDNET_DEFAULT_RETRIEVER_WEIGHT")
env_default_val = _to_float(env_default, 1.0) if env_default is not None else 1.0
effective_retriever_weight = _to_float(
_first_nonempty(
fm.get("retriever_weight"),
type_defaults.get("retriever_weight"),
env_default_val,
1.0,
),
1.0,
)
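    # Illustrative resolution: with no frontmatter value and no type default,
    # MINDNET_DEFAULT_RETRIEVER_WEIGHT=0.8 in the environment yields 0.8 here;
    # with nothing set at all, the weight falls back to 1.0.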
# --- resolve chunk_profile ---
effective_chunk_profile = _first_nonempty(
fm.get("chunk_profile"),
fm.get("profile"),
type_defaults.get("chunk_profile"),
os.getenv("MINDNET_DEFAULT_CHUNK_PROFILE"),
)
# --- resolve edge_defaults (list[str]) ---
edge_defaults = _first_nonempty(
fm.get("edge_defaults"),
type_defaults.get("edge_defaults"),
)
if edge_defaults is None:
edge_defaults = []
if isinstance(edge_defaults, str):
# allow "a,b,c"
edge_defaults = [s.strip() for s in edge_defaults.split(",") if s.strip()]
elif not isinstance(edge_defaults, list):
edge_defaults = []
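    # Illustrative example: a frontmatter value of edge_defaults: "related, part_of"
    # resolves to ["related", "part_of"] at this point.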
# Start payload by preserving existing parsed keys (shallow copy); DO NOT drop fields
payload: Dict[str, Any] = dict(parsed)
for k in ("area", "project", "source", "lang", "slug", "aliases"):
if k in fm:
payload[k] = fm[k]
# Ensure canonical top-level fields
if note_id is not None:
payload["id"] = note_id
payload["note_id"] = note_id
if title is not None:
payload["title"] = title
if note_type is not None:
payload["type"] = note_type
payload["retriever_weight"] = effective_retriever_weight
if effective_chunk_profile is not None:
payload["chunk_profile"] = effective_chunk_profile
if edge_defaults:
payload["edge_defaults"] = edge_defaults
# keep frontmatter merged (without duplication)
if "frontmatter" in payload and isinstance(payload["frontmatter"], dict):
fm_out = dict(payload["frontmatter"])
fm_out.setdefault("type", note_type)
fm_out["retriever_weight"] = effective_retriever_weight
if effective_chunk_profile is not None:
fm_out["chunk_profile"] = effective_chunk_profile
if edge_defaults:
fm_out["edge_defaults"] = edge_defaults
payload["frontmatter"] = fm_out
return payload
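

if __name__ == "__main__":  # pragma: no cover
    # Minimal smoke test (illustrative assumption: a dict-shaped parsed note;
    # _coerce_mapping also accepts parser objects with the usual attributes).
    demo = {
        "path": "path/demo.md",
        "frontmatter": {
            "id": "demo",
            "title": "Demo",
            "type": "concept",
            "retriever_weight": 0.75,
            "chunk_profile": "tight",
        },
    }
    import json
    print(json.dumps(make_note_payload(demo), ensure_ascii=False, indent=2))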