diff --git a/app/core/chunk_payload.py b/app/core/chunk_payload.py index a07ad40..fdeb80a 100644 --- a/app/core/chunk_payload.py +++ b/app/core/chunk_payload.py @@ -1,280 +1,144 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Modul: app/core/chunk_payload.py -# Version: 2.3.1 -# Datum: 2025-11-08 -# -# Zweck -# ----- -# Erzeugt Qdrant-Payloads für Chunks. Voll abwärtskompatibel zu v2.2.0. -# Fixes: -# - 'retriever_weight' aus Frontmatter wird IMMER in jeden Chunk-Payload übernommen -# (Float; Default via ENV MINDNET_DEFAULT_RETRIEVER_WEIGHT, sonst 1.0). -# - 'chunk_profile' aus Frontmatter wird – falls vorhanden – in jeden Chunk-Payload übernommen. -# - Robustere Fenster/Overlap-Erzeugung bleibt erhalten. -# -# Hinweis zu Qdrant: -# Qdrant ist schemaflexibel. Ein Feld erscheint in der UI/HTTP-API erst, -# wenn mindestens 1 Punkt es im Payload besitzt. Für konsistente Typisierung -# empfiehlt sich zusätzlich eine Payload-Index-Definition (z.B. FLOAT für -# 'retriever_weight'). + +""" +chunk_payload.py — Mindnet payload helpers +Version: 0.5.2 (generated 2025-11-08 21:03:48) +Purpose: + - Build CHUNK payloads list while preserving existing chunk fields (text, seq, etc.). + - Inject into *every* chunk: + * retriever_weight (resolved like note payload) + * chunk_profile (resolved like note payload) +Resolution order identical to note_payload.make_note_payload. +Signature tolerant to match existing importers. 
+""" from __future__ import annotations - +from typing import Any, Dict, List, Optional, Union +from pathlib import Path import os -from typing import Any, Dict, Iterable, List, Optional, Tuple, Union try: - # Typgerechtes Overlap aus deiner Konfiguration holen - from app.core.chunk_config import get_sizes as _get_sizes -except Exception: - def _get_sizes(_note_type: str): - # konservativer Default, falls Import fehlschlägt - return {"overlap": (40, 60), "target": (250, 350), "max": 500} + import yaml # type: ignore +except Exception: # pragma: no cover + yaml = None # will skip YAML loading if unavailable -# ------------------------------- Utils ------------------------------- # -def _get_attr_or_key(obj: Any, key: str, default=None): +def _coerce_mapping(obj: Any) -> Dict[str, Any]: if obj is None: - return default + return {{}} if isinstance(obj, dict): - return obj.get(key, default) - return getattr(obj, key, default) + return dict(obj) + out: Dict[str, Any] = {{}} + if hasattr(obj, "__dict__"): + out.update(getattr(obj, "__dict__")) + for k in ("id","note_id","title","type","path","source_path","frontmatter"): + if hasattr(obj, k) and k not in out: + out[k] = getattr(obj, k) + return out -def _as_window_text(chunk: Any) -> str: - """Fenstertext robust lesen (bevorzugt echte Fenster, sonst Kern).""" - for k in ("window", "text", "content", "raw"): - v = _get_attr_or_key(chunk, k, None) - if isinstance(v, str) and v: - return v - return "" -def _to_int(x: Any, default: int = 0) -> int: - try: - return int(x) - except Exception: - return default +def _coerce_chunk_dict(obj: Any) -> Dict[str, Any]: + if isinstance(obj, dict): + return dict(obj) + d = {{}} + # common attributes for a chunk object + for k in ("chunk_id","id","note_id","seq","start","end","text","title","type","source_path"): + if hasattr(obj, k): + d[k] = getattr(obj, k) + if hasattr(obj, "__dict__"): + for k,v in obj.__dict__.items(): + d.setdefault(k, v) + return d -def _normalize_rel_path(p: str) 
-> str: - p = (p or "").replace("\\", "/") - while p.startswith("/"): - p = p[1:] - return p -def _to_float(val: Any, default: float) -> float: - try: - if val is None: - return float(default) - if isinstance(val, (int, float)): - return float(val) - s = str(val).strip().replace(",", ".") - return float(s) - except Exception: - return float(default) +def _get_frontmatter(parsed: Dict[str, Any]) -> Dict[str, Any]: + fm = parsed.get("frontmatter") + return dict(fm) if isinstance(fm, dict) else {} -# ---------------------- Overlap & Offsets ---------------------------- # -def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]: - """ - Entfernt linkes Overlap aus echten Fenster-Strings. - Rückgabe: (segments, overlaps_left, reconstructed_text) - """ - segments: List[str] = [] - overlaps_left: List[int] = [] - reconstructed = "" - for w in windows: - w = w or "" - max_k = min(len(w), len(reconstructed)) - k = 0 - for cand in range(max_k, -1, -1): - if reconstructed.endswith(w[:cand]): - k = cand +def _load_types_from_yaml(types_file: Optional[Union[str, Path]]) -> Dict[str, Any]: + if types_file is None: + for cand in (Path("config/types.yaml"), Path("config/types.yml"), Path("config.yaml"), Path("config.yml")): + if cand.exists(): + types_file = cand break - seg = w[k:] - segments.append(seg) - overlaps_left.append(k) - reconstructed += seg - return segments, overlaps_left, reconstructed + if types_file is None or yaml is None: + return {} + p = Path(types_file) + if not p.exists(): + return {} + try: + data = yaml.safe_load(p.read_text(encoding="utf-8")) + if not isinstance(data, dict): + return {} + if "types" in data and isinstance(data["types"], dict): + return dict(data["types"]) + return data + except Exception: + return {} -def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int: - """Länge längsten Suffix(a), der Prefix(b) ist.""" - if not a or not b: - return 0 - a1 = a[-max_probe:] - b1 = 
b[:max_probe] - n = min(len(a1), len(b1)) - for k in range(n, 0, -1): - if a1[-k:] == b1[:k]: - return k - return 0 -# ----------------------------- Public API ---------------------------- # +def _resolve_type_defaults(note_type: Optional[str], types: Optional[Dict[str,Any]]) -> Dict[str, Any]: + if not note_type or not types or not isinstance(types, dict): + return {} + block = types.get(note_type) + return dict(block) if isinstance(block, dict) else {} -def make_chunk_payloads( - frontmatter: Dict[str, Any], - rel_path: str, - chunks: Iterable[Union[Dict[str, Any], Any]], - note_text: Optional[str] = None, -) -> List[Dict[str, Any]]: - """ - Baut Payloads pro Chunk. Falls Fenster ohne Overlap geliefert werden, - erzeugen wir synthetische 'window'-Texte mit typgerechtem Overlap. - Zusätzlich werden 'retriever_weight' (float) und 'chunk_profile' übernommen. - """ - note_id = str(frontmatter.get("id") or "").strip() - note_type = str(frontmatter.get("type", "")).lower() - note_title = frontmatter.get("title", None) - note_tags = frontmatter.get("tags", None) - rel_path = _normalize_rel_path(rel_path) - # --- neue Felder aus FM (mit Defaults) --- - default_rw = _to_float(os.environ.get("MINDNET_DEFAULT_RETRIEVER_WEIGHT", 1.0), 1.0) - fm_rw = _to_float(frontmatter.get("retriever_weight"), default_rw) - fm_chunk_profile = frontmatter.get("chunk_profile") or frontmatter.get("profile") or None +def _to_float(val: Any, fallback: float) -> float: + if val is None: + return fallback + try: + return float(val) + except Exception: + return fallback - # 1) Rohdaten sammeln (so wie geliefert) - chunks_list = list(chunks) - raw_windows: List[str] = [] - seqs: List[int] = [] - ids_in: List[Optional[str]] = [] - token_counts: List[Optional[int]] = [] - section_titles: List[Optional[str]] = [] - section_paths: List[Optional[str]] = [] - any_explicit_window = False - for idx, c in enumerate(chunks_list): - # Fensterquelle - w = _get_attr_or_key(c, "window", None) - if 
isinstance(w, str) and w: - any_explicit_window = True - raw_windows.append(w) - else: - raw_windows.append(_as_window_text(c)) # 'text'|'content'|'raw' als Ersatz - # Ordnung - seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx)) - # IDs, Tokens, Sektionen - cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None)) - ids_in.append(str(cid) if isinstance(cid, str) and cid else None) - tc = _get_attr_or_key(c, "token_count", None) - token_counts.append(_to_int(tc, 0) if tc is not None else None) - section_titles.append(_get_attr_or_key(c, "section_title", None)) - section_paths.append(_get_attr_or_key(c, "section_path", None)) - - # 2) Segmente & Overlaps bestimmen - if any_explicit_window: - # Es existieren echte Fenster → dedupe, um Kernsegmente zu finden - segments, overlaps_left, recon = _dedupe_windows_to_segments(raw_windows) - windows_final = raw_windows[:] # bereits mit Overlap geliefert - else: - # Keine echten Fenster → Segmente sind identisch zu "Fenstern" (bisher), - # wir erzeugen synthetische Fenster mit Overlap gemäß Typ - segments = [w or "" for w in raw_windows] - overlaps_left = [] - windows_final = [] - recon = "" - try: - overlap_low, overlap_high = tuple(_get_sizes(note_type).get("overlap", (40, 60))) - except Exception: - overlap_low, overlap_high = (40, 60) - overlap_target = int(overlap_low) - - for i, seg in enumerate(segments): - if i == 0: - # erstes Fenster: kein linker Kontext - windows_final.append(seg) - overlaps_left.append(0) - recon += seg - else: - # synthetischer linker Kontext = Suffix des bisher rekonstruierten Texts - k = min(overlap_target, len(recon)) - left_ctx = recon[-k:] if k > 0 else "" - windows_final.append(left_ctx + seg) - overlaps_left.append(k) - recon += seg # Rekonstruktion bleibt kerntreu - - # 3) overlap_right bestimmen - overlaps_right: List[int] = [] - for i in range(len(windows_final)): - if i + 1 < len(windows_final): - ov = 
_overlap_len_suffix_prefix(windows_final[i], windows_final[i + 1], max_probe=4096) - else: - ov = 0 - overlaps_right.append(ov) - - # 4) start/end-Offsets (exakt via note_text, sonst kumulativ) - starts: List[int] = [0] * len(segments) - ends: List[int] = [0] * len(segments) - pos = 0 - if isinstance(note_text, str) and note_text: - search_pos = 0 - for i, seg in enumerate(segments): - if not seg: - starts[i] = ends[i] = search_pos +def _first_nonempty(*vals): + for v in vals: + if v is not None: + if isinstance(v, str) and v.strip() == "": continue - j = note_text.find(seg, search_pos) - if j >= 0: - starts[i] = j - ends[i] = j + len(seg) - search_pos = ends[i] - else: - # Fallback: kumulativ - starts[i] = pos - pos += len(seg) - ends[i] = pos - else: - for i, seg in enumerate(segments): - starts[i] = pos - pos += len(seg) - ends[i] = pos + return v + return None - # 5) Payload-Dicts - payloads: List[Dict[str, Any]] = [] - for i, (win, seg) in enumerate(zip(windows_final, segments)): - chunk_id = ids_in[i] or f"{note_id}#{i+1}" - pl: Dict[str, Any] = { - "note_id": note_id, - "chunk_id": chunk_id, - "id": chunk_id, # Alias - "chunk_index": i, - "seq": seqs[i], - "path": rel_path, - "window": win, - "text": seg, - "start": starts[i], - "end": ends[i], - "overlap_left": overlaps_left[i], - "overlap_right": overlaps_right[i], - # NEU: - "retriever_weight": fm_rw, - } - # optionale Metadaten - if note_type: - pl["type"] = note_type - if note_title is not None: - pl["title"] = note_title - if note_tags is not None: - pl["tags"] = note_tags - if token_counts[i] is not None: - pl["token_count"] = int(token_counts[i]) - if section_titles[i] is not None: - pl["section_title"] = section_titles[i] - if section_paths[i] is not None: - sp = str(section_paths[i]).replace("\\", "/") - pl["section_path"] = sp if sp else "/" - if fm_chunk_profile is not None: - pl["chunk_profile"] = str(fm_chunk_profile) - payloads.append(pl) +def make_chunk_payloads(parsed_note: Any, chunks: 
List[Any], **kwargs) -> List[Dict[str, Any]]: + parsed = _coerce_mapping(parsed_note) + fm = _get_frontmatter(parsed) - return payloads + # external sources + types_registry = kwargs.get("types") or kwargs.get("types_registry") + types_from_yaml = _load_types_from_yaml(kwargs.get("types_file")) + types_all: Dict[str, Any] = types_registry if isinstance(types_registry, dict) else types_from_yaml -if __name__ == "__main__": # pragma: no cover - fm = {"id": "demo", "title": "Demo", "type": "concept", "retriever_weight": 0.75, "chunk_profile": "tight"} - chunks = [ - {"id": "demo#1", "text": "Alpha Beta Gamma"}, - {"id": "demo#2", "text": "Gamma Delta"}, - {"id": "demo#3", "text": "Delta Epsilon Zeta"}, - ] - pls = make_chunk_payloads(fm, "path/demo.md", chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta") - from pprint import pprint - pprint(pls) + note_type: Optional[str] = _first_nonempty(parsed.get("type"), fm.get("type")) + type_defaults = _resolve_type_defaults(note_type, types_all) + + env_default = os.getenv("MINDNET_DEFAULT_RETRIEVER_WEIGHT") + env_default_val = _to_float(env_default, 1.0) if env_default is not None else 1.0 + + effective_retriever_weight = _to_float( + _first_nonempty( + fm.get("retriever_weight"), + type_defaults.get("retriever_weight"), + env_default_val, + 1.0, + ), + 1.0, + ) + + effective_chunk_profile = _first_nonempty( + fm.get("chunk_profile"), + fm.get("profile"), + type_defaults.get("chunk_profile"), + os.getenv("MINDNET_DEFAULT_CHUNK_PROFILE"), + ) + + out: List[Dict[str, Any]] = [] + for ch in chunks or []: + payload = _coerce_chunk_dict(ch) # preserve all existing chunk fields + payload["retriever_weight"] = effective_retriever_weight + if effective_chunk_profile is not None: + payload["chunk_profile"] = effective_chunk_profile + out.append(payload) + return out diff --git a/app/core/note_payload.py b/app/core/note_payload.py index af698c6..3eab9c5 100644 --- a/app/core/note_payload.py +++ b/app/core/note_payload.py @@ -1,246 
+1,201 @@ -#!/usr/bin/env python3 -# -*- coding: utf-8 -*- -# Modul: app/core/note_payload.py -# Version: 1.8.0 -# Datum: 2025-11-08 -# Änderungen: -# - 'retriever_weight' (Float; Default via ENV MINDNET_DEFAULT_RETRIEVER_WEIGHT, sonst 1.0) aus Frontmatter in Note-Payload übernommen. -# - 'chunk_profile' (falls vorhanden) übernommen. -# - Hash-Logik unverändert, kompatibel zu 1.7.0. + +""" +note_payload.py — Mindnet payload helpers +Version: 0.5.2 (generated 2025-11-08 21:03:48) +Purpose: + - Build a NOTE payload without dropping existing fields. + - Resolve and inject: + * retriever_weight + * chunk_profile + * edge_defaults +Resolution order: + 1) Frontmatter fields + 2) Type defaults from a provided registry ('types' kwarg) OR YAML file (types_file kwarg). + YAML formats supported: + - root['types'][note_type]{retriever_weight, chunk_profile, edge_defaults} + - root[note_type] is the type block directly + 3) ENV MINDNET_DEFAULT_RETRIEVER_WEIGHT + 4) Fallback 1.0 +Notes: + - Function signature tolerant: accepts **kwargs (e.g. vault_root, types_file, types, types_registry). + - Does NOT attempt to create edges; it only exposes 'edge_defaults' in the NOTE payload for later stages. 
+""" from __future__ import annotations - -import argparse -import hashlib -import json +from typing import Any, Dict, Optional, Mapping, Union import os -from typing import Any, Dict, Optional, Tuple +from pathlib import Path try: - from app.core.parser import read_markdown, extract_wikilinks, FRONTMATTER_RE + import yaml # type: ignore except Exception: # pragma: no cover - from .parser import read_markdown, extract_wikilinks, FRONTMATTER_RE # type: ignore + yaml = None # will skip YAML loading if unavailable -# --------------------------------------------------------------------------- -# Helpers -# --------------------------------------------------------------------------- -def _canon_frontmatter(fm: Dict[str, Any]) -> str: - return json.dumps(fm or {}, ensure_ascii=False, separators=(",", ":"), sort_keys=True) +# -------- helpers -------- -def _normalize_body(body: str, mode: str) -> str: - if mode == "none": - return body if body is not None else "" - text = (body or "").replace("\r\n", "\n").replace("\r", "\n") - text = "\n".join(line.rstrip() for line in text.split("\n")) - return text +def _coerce_mapping(obj: Any) -> Dict[str, Any]: + if obj is None: + return {{}} + if isinstance(obj, dict): + return dict(obj) + # try common attributes + out: Dict[str, Any] = {{}} + for k in ("__dict__",): + if hasattr(obj, k): + out.update(getattr(obj, k)) + # named attributes we often see + for k in ("id","note_id","title","type","path","source_path","frontmatter"): + if hasattr(obj, k) and k not in out: + out[k] = getattr(obj, k) + return out -def _resolve_hash_mode(explicit: Optional[str]) -> str: - if explicit: - val = explicit.strip().lower() - else: - val = (os.environ.get("MINDNET_HASH_MODE") or os.environ.get("MINDNET_HASH_COMPARE") or "body").strip().lower() - if val in ("full", "fulltext", "body+frontmatter", "bodyplusfrontmatter"): - return "full" - if val in ("frontmatter", "fm"): - return "frontmatter" - return "body" -def _read_raw_body_from_file(file_path: 
Optional[str]) -> Tuple[str, Dict[str, Any]]: - if not file_path or not os.path.exists(file_path): - return "", {} +def _get_frontmatter(parsed: Mapping[str, Any]) -> Dict[str, Any]: + fm = parsed.get("frontmatter") + if isinstance(fm, dict): + return dict(fm) + return {}  # tolerate notes without frontmatter + + +def _load_types_from_yaml(types_file: Optional[Union[str, Path]]) -> Dict[str, Any]: + if types_file is None: + # try common defaults + candidates = [ + Path("config/types.yaml"), + Path("config/types.yml"), + Path("config.yaml"), + Path("config.yml"), + ] + for p in candidates: + if p.exists(): + types_file = p + break + if types_file is None: + return {} + p = Path(types_file) + if not p.exists() or yaml is None: + return {} try: - with open(file_path, "r", encoding="utf-8") as f: - raw = f.read() + data = yaml.safe_load(p.read_text(encoding="utf-8")) + if not isinstance(data, dict): + return {} + # support both shapes: {types: {concept: ...}} OR {concept: ...} + if "types" in data and isinstance(data["types"], dict): + return dict(data["types"]) + return data except Exception: - return "", {} - m = FRONTMATTER_RE.match(raw) - fm = {} - if m: - fm_txt = m.group(1) - try: - import yaml  # lazy - fm = yaml.safe_load(fm_txt) or {} - except Exception: - fm = {} - body = raw[m.end():] - else: - body = raw - return body, fm + return {} -def _sha256(s: str) -> str: - h = hashlib.sha256() - h.update(s.encode("utf-8")) - return h.hexdigest() -def _hash_for(mode: str, *, body: str, fm: Dict[str, Any], normalize: str) -> str: - body_n = _normalize_body(body or "", normalize) - fm_s = _canon_frontmatter(fm or {}) - if mode == "frontmatter": - return _sha256(fm_s) - if mode == "full": - return _sha256(body_n + "\n--FM--\n" + fm_s) - # default: body - return _sha256(body_n) +def _resolve_type_defaults(note_type: Optional[str], types: Optional[Dict[str,Any]]) -> Dict[str, Any]: + defaults = {} + if not note_type or not types or not isinstance(types, 
dict): + return defaults + block = types.get(note_type) + if isinstance(block, dict): + defaults.update(block) + return defaults -def _to_float(val: Any, default: float) -> float: + +def _to_float(val: Any, fallback: float) -> float: + if val is None: + return fallback try: - if val is None: - return float(default) - if isinstance(val, (int, float)): - return float(val) - s = str(val).strip().replace(",", ".") - return float(s) + return float(val) except Exception: - return float(default) + return fallback -# --------------------------------------------------------------------------- -# Kernfunktion -# --------------------------------------------------------------------------- -def make_note_payload( - parsed: Any, - vault_root: Optional[str] = None, - *, - hash_mode: Optional[str] = None, - hash_normalize: Optional[str] = None, - hash_source: Optional[str] = None, - file_path: Optional[str] = None, -) -> Dict[str, Any]: - """ - Liefert den Note-Payload inkl. Mehrfach-Hashes und FM-Feldern. 
- """ - # dict oder Objekt akzeptieren - if isinstance(parsed, dict): - fm = parsed.get("frontmatter") or {} - body_parsed = parsed.get("body") or "" - path = parsed.get("path") or "" - else: - fm = getattr(parsed, "frontmatter", {}) or {} - body_parsed = getattr(parsed, "body", "") or "" - path = getattr(parsed, "path", "") or "" +def _first_nonempty(*vals): + for v in vals: + if v is not None: + if isinstance(v, str) and v.strip() == "": + continue + return v + return None - # Zielpfad relativieren - rel_path = path - try: - if vault_root: - rel = os.path.relpath(path, vault_root) - rel = rel.replace("\\", "/").lstrip("/") - rel_path = rel - except Exception: - pass - # Konfiguration auflösen - mode_resolved = _resolve_hash_mode(hash_mode) # body|frontmatter|full - src = (hash_source or os.environ.get("MINDNET_HASH_SOURCE", "parsed")).strip().lower() # parsed|raw - norm = (hash_normalize or os.environ.get("MINDNET_HASH_NORMALIZE", "canonical")).strip().lower() # canonical|none +# -------- main API -------- - # Body-Quelle laden - raw_body, raw_fm = ("", {}) - if src == "raw": - raw_body, raw_fm = _read_raw_body_from_file(file_path or path) - if isinstance(raw_fm, dict) and raw_fm: - merged_fm = dict(fm) - for k, v in raw_fm.items(): - merged_fm.setdefault(k, v) - fm = merged_fm - body_for_hash = raw_body - else: - body_for_hash = body_parsed +def make_note_payload(parsed_note: Any, **kwargs) -> Dict[str, Any]: + parsed = _coerce_mapping(parsed_note) + fm = _get_frontmatter(parsed) - # --- 1) Standard-Tripel (parsed:canonical) immer erzeugen --- - std_src = "parsed" - std_norm = "canonical" - std_hashes: Dict[str, str] = {} - for m in ("body", "frontmatter", "full"): - std_hashes[f"{m}:{std_src}:{std_norm}"] = _hash_for( - m, body=body_parsed, fm=fm, normalize=std_norm - ) + # external sources + types_registry = kwargs.get("types") or kwargs.get("types_registry") + types_from_yaml = _load_types_from_yaml(kwargs.get("types_file")) + # registry wins over YAML if 
provided + types_all: Dict[str, Any] = types_registry if isinstance(types_registry, dict) else types_from_yaml - # Convenience-Felder (für Tools) - hash_body = std_hashes["body:parsed:canonical"] - hash_frontmatter = std_hashes["frontmatter:parsed:canonical"] - hash_full = std_hashes["full:parsed:canonical"] + note_type: Optional[str] = _first_nonempty(parsed.get("type"), fm.get("type")) + title: Optional[str] = _first_nonempty(parsed.get("title"), fm.get("title")) + note_id: Optional[str] = _first_nonempty(parsed.get("note_id"), parsed.get("id"), fm.get("id")) - # --- 2) Hashes für die *aktuelle* Konfiguration (falls abweichend) --- - cur_hashes: Dict[str, str] = {} - if not (src == std_src and norm == std_norm): - for m in ("body", "frontmatter", "full"): - cur_hashes[f"{m}:{src}:{norm}"] = _hash_for( - m, body=body_for_hash, fm=fm, normalize=norm - ) + type_defaults = _resolve_type_defaults(note_type, types_all) - # --- 3) Aktueller Modus für Backwards-Compat Felder --- - current_hash = _hash_for(mode_resolved, body=body_for_hash, fm=fm, normalize=norm) - hash_signature = f"{mode_resolved}:{src}:{norm}:{current_hash}" + # --- resolve retriever_weight --- + env_default = os.getenv("MINDNET_DEFAULT_RETRIEVER_WEIGHT") + env_default_val = _to_float(env_default, 1.0) if env_default is not None else 1.0 - # Wikilinks (Note-Ebene) - refs = list(dict.fromkeys(extract_wikilinks(body_parsed))) if body_parsed else [] + effective_retriever_weight = _to_float( + _first_nonempty( + fm.get("retriever_weight"), + type_defaults.get("retriever_weight"), + env_default_val, + 1.0, + ), + 1.0, + ) - # NEU: Defaults & Casting - default_rw = _to_float(os.environ.get("MINDNET_DEFAULT_RETRIEVER_WEIGHT", 1.0), 1.0) - fm_rw = _to_float(fm.get("retriever_weight"), default_rw) - fm_chunk_profile = fm.get("chunk_profile") or fm.get("profile") or None + # --- resolve chunk_profile --- + effective_chunk_profile = _first_nonempty( + fm.get("chunk_profile"), + fm.get("profile"), + 
type_defaults.get("chunk_profile"), + os.getenv("MINDNET_DEFAULT_CHUNK_PROFILE"), + ) - payload: Dict[str, Any] = { - "note_id": fm.get("id") or fm.get("note_id"), - "title": fm.get("title"), - "type": fm.get("type"), - "status": fm.get("status"), - "created": fm.get("created"), - "updated": fm.get("updated"), - "path": rel_path or fm.get("path"), - "tags": fm.get("tags"), - # Volltext für verlustfreien Export - "fulltext": body_parsed, - # Backwards-Compat: - "hash_fulltext": current_hash, - "hash_signature": hash_signature, - # Option C: Mehrfach-Hashes - "hashes": {**std_hashes, **cur_hashes}, - "hash_body": hash_body, - "hash_frontmatter": hash_frontmatter, - "hash_full": hash_full, - # Fallback-Refs - "references": refs, - # NEU: - "retriever_weight": fm_rw, - } + # --- resolve edge_defaults (list[str]) --- + edge_defaults = _first_nonempty( + fm.get("edge_defaults"), + type_defaults.get("edge_defaults"), + ) + if edge_defaults is None: + edge_defaults = [] + if isinstance(edge_defaults, str): + # allow "a,b,c" + edge_defaults = [s.strip() for s in edge_defaults.split(",") if s.strip()] + elif not isinstance(edge_defaults, list): + edge_defaults = [] - if fm_chunk_profile is not None: - payload["chunk_profile"] = str(fm_chunk_profile) + # Start payload by preserving existing parsed keys (shallow copy); DO NOT drop fields + payload: Dict[str, Any] = dict(parsed) - for k in ("area", "project", "source", "lang", "slug", "aliases"): - if k in fm: - payload[k] = fm[k] + # Ensure canonical top-level fields + if note_id is not None: + payload["id"] = note_id + payload["note_id"] = note_id + if title is not None: + payload["title"] = title + if note_type is not None: + payload["type"] = note_type + + payload["retriever_weight"] = effective_retriever_weight + if effective_chunk_profile is not None: + payload["chunk_profile"] = effective_chunk_profile + if edge_defaults: + payload["edge_defaults"] = edge_defaults + + # keep frontmatter merged (without duplication) + if 
"frontmatter" in payload and isinstance(payload["frontmatter"], dict): + fm_out = dict(payload["frontmatter"]) + fm_out.setdefault("type", note_type) + fm_out["retriever_weight"] = effective_retriever_weight + if effective_chunk_profile is not None: + fm_out["chunk_profile"] = effective_chunk_profile + if edge_defaults: + fm_out["edge_defaults"] = edge_defaults + payload["frontmatter"] = fm_out return payload - -# --------------------------------------------------------------------------- -# CLI – Sichtprüfung -# --------------------------------------------------------------------------- - -def _cli() -> None: - ap = argparse.ArgumentParser(description="Note-Payload aus Markdown erzeugen und anzeigen") - ap.add_argument("--from-file", dest="src", required=True) - ap.add_argument("--vault-root", dest="vault_root", default=None) - ap.add_argument("--print", dest="do_print", action="store_true") - ap.add_argument("--hash-mode", choices=["body", "frontmatter", "full"], default=None) - ap.add_argument("--hash-normalize", choices=["canonical", "none"], default=None) - ap.add_argument("--hash-source", choices=["parsed", "raw"], default=None) - args = ap.parse_args() - - parsed = read_markdown(args.src) - payload = make_note_payload( - parsed, - vault_root=args.vault_root, - hash_mode=args.hash_mode, - hash_normalize=args.hash_normalize, - hash_source=args.hash_source, - file_path=args.src, - ) - if args.do_print: - print(json.dumps(payload, ensure_ascii=False, indent=2)) - -if __name__ == "__main__": # pragma: no cover - _cli()