From 82151d6bc309c7fdb81bf18b3449068238a83dcb Mon Sep 17 00:00:00 2001 From: Lars Date: Sat, 8 Nov 2025 21:46:49 +0100 Subject: [PATCH] Dateien nach "app/core" hochladen --- app/core/chunk_payload.py | 413 +++++++++++++++++++++++--------------- app/core/note_payload.py | 364 ++++++++++++++++++--------------- 2 files changed, 461 insertions(+), 316 deletions(-) diff --git a/app/core/chunk_payload.py b/app/core/chunk_payload.py index 1806688..5676190 100644 --- a/app/core/chunk_payload.py +++ b/app/core/chunk_payload.py @@ -1,174 +1,271 @@ - +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- """ -chunk_payload.py — Mindnet core payload builder (v0.5, 2025-11-08) +Modul: app/core/chunk_payload.py +Version: 2.2.0 +Datum: 2025-10-06 -Purpose -------- -Builds a list of **JSON-serializable** payload dicts for chunks of a note to be -stored in `_chunks`. Ensures `retriever_weight` is set on every chunk. +Zweck +----- +Erzeugt Qdrant-Payloads für Chunks. Voll abwärtskompatibel zu v2.0.1. +Neu: Wenn der Chunker KEIN Overlap im Fenster liefert (== window fehlt / identisch zur Kernpassage), +erzeugen wir FENSTER mit synthetischem Overlap auf Basis chunk_config.get_sizes(note_type)['overlap']. -Public API ----------- -make_chunk_payloads(parsed_note, chunks, *args, retriever_weight=None, vault_root=None, type_defaults=None, **kwargs) -> list[dict] +Felder (beibehalten aus 2.0.1): + - note_id, chunk_id, id (Alias), chunk_index, seq, path + - window (mit Overlap), text (ohne linkes Overlap) + - start, end (Offsets im gesamten Body) + - overlap_left, overlap_right + - token_count?, section_title?, section_path?, type?, title?, tags? + +Kompatibilität: + - 'id' == 'chunk_id' als Alias + - Pfade bleiben relativ (keine führenden '/'), Backslashes → Slashes + - Robust für Chunk-Objekte oder Dicts; Fensterquelle: 'window'|'text'|'content'|'raw' + +Lizenz: MIT (projektintern) """ - from __future__ import annotations -from pathlib import Path -from typing import Any, Dict, List, Optional, Union, Mapping -import datetime, math -Json = Union[None, bool, int, float, str, list, dict] +from typing import Any, Dict, Iterable, List, Optional, Tuple, Union -# ------------------------- helpers ------------------------- +try: + # Typgerechtes Overlap aus deiner Konfiguration holen + from app.core.chunk_config import get_sizes as _get_sizes +except Exception: + def _get_sizes(_note_type: str): + # konservativer Default, falls Import fehlschlägt + return {"overlap": (40, 60), "target": (250, 350), "max": 500} -def _is_mapping(x: Any) -> bool: - return isinstance(x, Mapping) -def _get(obj: Any, *names: str, default: Any=None) -> Any: - for n in names: - if hasattr(obj, n): - try: - return getattr(obj, n) - except Exception: - pass - if _is_mapping(obj) and n in obj: - try: - return obj[n] - except Exception: - pass - return default +# ------------------------------- Utils ------------------------------- # -def _to_float(x: Any, default: float=1.0) -> float: - if x is None: - return float(default) - if isinstance(x, (int, float)) and math.isfinite(x): - return float(x) - try: - s = str(x).strip().replace(',', '.') - return float(s) - except Exception: - return float(default) - -def _ensure_list(x: Any) -> list: - if x is None: - return [] - if isinstance(x, list): - return x - if isinstance(x, (set, tuple)): - return list(x) - return [x] - -def _sanitize(obj: Any) -> Json: - if obj is None or isinstance(obj, (bool, int, float, str)): - return obj - if callable(obj): - return None - if isinstance(obj, (list, tuple, set)): - return [_sanitize(v) for v in obj] +def _get_attr_or_key(obj: Any, key: str, default=None): + if obj is None: + return default if isinstance(obj, dict): - out = {} - for k, v in obj.items(): - if callable(v): + return obj.get(key, default) + return getattr(obj, key, default) + +def _as_window_text(chunk: Any) -> str: + """Fenstertext robust lesen (bevorzugt echte Fenster, sonst Kern).""" + for k in ("window", "text", "content", "raw"): + v = _get_attr_or_key(chunk, k, None) + if isinstance(v, str) and v: + return v + return "" + +def _to_int(x: Any, default: int = 0) -> int: + try: + return int(x) + except Exception: + return default + +def _normalize_rel_path(p: str) -> str: + p = (p or "").replace("\\", "/") + while p.startswith("/"): + p = p[1:] + return p + + +# ---------------------- Overlap & Offsets ---------------------------- # + +def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]: + """ + Entfernt linkes Overlap aus echten Fenster-Strings. + Rückgabe: (segments, overlaps_left, reconstructed_text) + """ + segments: List[str] = [] + overlaps_left: List[int] = [] + reconstructed = "" + for w in windows: + w = w or "" + max_k = min(len(w), len(reconstructed)) + k = 0 + for cand in range(max_k, -1, -1): + if reconstructed.endswith(w[:cand]): + k = cand + break + seg = w[k:] + segments.append(seg) + overlaps_left.append(k) + reconstructed += seg + return segments, overlaps_left, reconstructed + +def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int: + """Länge längsten Suffix(a), der Prefix(b) ist.""" + if not a or not b: + return 0 + a1 = a[-max_probe:] + b1 = b[:max_probe] + n = min(len(a1), len(b1)) + for k in range(n, 0, -1): + if a1[-k:] == b1[:k]: + return k + return 0 + + +# ----------------------------- Public API ---------------------------- # + +def make_chunk_payloads( + frontmatter: Dict[str, Any], + rel_path: str, + chunks: Iterable[Union[Dict[str, Any], Any]], + note_text: Optional[str] = None, +) -> List[Dict[str, Any]]: + """ + Baut Payloads pro Chunk. Falls Fenster ohne Overlap geliefert werden, + erzeugen wir synthetische 'window'-Texte mit typgerechtem Overlap. + """ + note_id = str(frontmatter.get("id") or "").strip() + note_type = str(frontmatter.get("type", "")).lower() + note_title = frontmatter.get("title", None) + note_tags = frontmatter.get("tags", None) + rel_path = _normalize_rel_path(rel_path) + + # 1) Rohdaten sammeln (so wie geliefert) + chunks_list = list(chunks) + raw_windows: List[str] = [] + seqs: List[int] = [] + ids_in: List[Optional[str]] = [] + token_counts: List[Optional[int]] = [] + section_titles: List[Optional[str]] = [] + section_paths: List[Optional[str]] = [] + any_explicit_window = False + + for idx, c in enumerate(chunks_list): + # Fensterquelle + w = _get_attr_or_key(c, "window", None) + if isinstance(w, str) and w: + any_explicit_window = True + raw_windows.append(w) + else: + raw_windows.append(_as_window_text(c)) # 'text'|'content'|'raw' als Ersatz + # Ordnung + seqs.append(_to_int(_get_attr_or_key(c, "seq", _get_attr_or_key(c, "chunk_index", idx)), idx)) + # IDs, Tokens, Sektionen + cid = _get_attr_or_key(c, "chunk_id", _get_attr_or_key(c, "id", None)) + ids_in.append(str(cid) if isinstance(cid, str) and cid else None) + tc = _get_attr_or_key(c, "token_count", None) + token_counts.append(_to_int(tc, 0) if tc is not None else None) + section_titles.append(_get_attr_or_key(c, "section_title", None)) + section_paths.append(_get_attr_or_key(c, "section_path", None)) + + # 2) Segmente & Overlaps bestimmen + if any_explicit_window: + # Es existieren echte Fenster → dedupe, um Kernsegmente zu finden + segments, overlaps_left, recon = _dedupe_windows_to_segments(raw_windows) + windows_final = raw_windows[:] # bereits mit Overlap geliefert + else: + # Keine echten Fenster → Segmente sind identisch zu "Fenstern" (bisher), + # wir erzeugen synthetische Fenster mit Overlap gemäß Typ + segments = [w or "" for w in raw_windows] + overlaps_left = [] + windows_final = [] + recon = "" + try: + overlap_low, overlap_high = tuple(_get_sizes(note_type).get("overlap", (40, 60))) + except Exception: + overlap_low, overlap_high = (40, 60) + overlap_target = int(overlap_low) + + for i, seg in enumerate(segments): + if i == 0: + # erstes Fenster: kein linker Kontext + windows_final.append(seg) + overlaps_left.append(0) + recon += seg + else: + # synthetischer linker Kontext = Suffix des bisher rekonstruierten Texts + k = min(overlap_target, len(recon)) + left_ctx = recon[-k:] if k > 0 else "" + windows_final.append(left_ctx + seg) + overlaps_left.append(k) + recon += seg # Rekonstruktion bleibt kerntreu + + # 3) overlap_right bestimmen + overlaps_right: List[int] = [] + for i in range(len(windows_final)): + if i + 1 < len(windows_final): + ov = _overlap_len_suffix_prefix(windows_final[i], windows_final[i + 1], max_probe=4096) + else: + ov = 0 + overlaps_right.append(ov) + + # 4) start/end-Offsets (exakt via note_text, sonst kumulativ) + starts: List[int] = [0] * len(segments) + ends: List[int] = [0] * len(segments) + pos = 0 + if isinstance(note_text, str) and note_text: + search_pos = 0 + for i, seg in enumerate(segments): + if not seg: + starts[i] = ends[i] = search_pos continue - out[str(k)] = _sanitize(v) - return out - if isinstance(obj, Path): - return str(obj) - if isinstance(obj, datetime.datetime): - return obj.isoformat() - if hasattr(obj, "__str__"): - try: - return str(obj) - except Exception: - return None - return None + j = note_text.find(seg, search_pos) + if j >= 0: + starts[i] = j + ends[i] = j + len(seg) + search_pos = ends[i] + else: + # Fallback: kumulativ + starts[i] = pos + pos += len(seg) + ends[i] = pos + else: + for i, seg in enumerate(segments): + starts[i] = pos + pos += len(seg) + ends[i] = pos -def _compute_retriever_weight(explicit: Any, frontmatter: dict, type_defaults: Optional[dict], note_type: Optional[str]) -> float: - if explicit is not None: - return _to_float(explicit, 1.0) - for key in ("retriever_weight", "retriever.weight", "retrieverWeight"): - if key in frontmatter: - return _to_float(frontmatter.get(key), 1.0) - if type_defaults and note_type: - tdef = type_defaults.get(note_type) or {} - for key in ("retriever_weight", "retriever.weight", "retrieverWeight"): - if key in tdef: - return _to_float(tdef.get(key), 1.0) - return 1.0 - -# ------------------------- public API ------------------------- - -def make_chunk_payloads(parsed_note: Any, - chunks: List[Any], - *args, - retriever_weight: Optional[float]=None, - vault_root: Optional[str]=None, - type_defaults: Optional[dict]=None, - **kwargs) -> List[Dict[str, Json]]: - """ - Build JSON-safe payloads for all chunks in a note. - - Parameters - ---------- - parsed_note : object or dict - chunks : list of objects or dicts - Expected per-chunk fields/keys (best-effort): text, index, start/end offsets, - tokens/n_tokens, section/heading. - retriever_weight : float|None - vault_root : str|None - type_defaults : dict|None - - Returns - ------- - list[dict] suitable for Qdrant payloads - """ - fm = _get(parsed_note, "frontmatter", "fm", default={}) - if not isinstance(fm, dict): - fm = {} - - note_id = _get(parsed_note, "note_id", "id", default=fm.get("id")) - title = _get(parsed_note, "title", default=fm.get("title")) - ntype = _get(parsed_note, "type", default=fm.get("type")) - raw_path = _get(parsed_note, "path", "rel_path", "relpath", default=fm.get("path")) - chunk_profile = _get(parsed_note, "chunk_profile", "profile", default=fm.get("chunk_profile")) - tags = _ensure_list(_get(parsed_note, "tags", default=fm.get("tags"))) - - rel_path = raw_path - if raw_path and vault_root: - try: - rel_path = str(Path(raw_path)).replace(str(Path(vault_root)), "").lstrip("/\\") - except Exception: - rel_path = str(raw_path) - - rw = _compute_retriever_weight(retriever_weight, fm, type_defaults, ntype) - - out: List[Dict[str, Json]] = [] - for i, ch in enumerate(chunks): - # tolerate missing/variant fields - text = _get(ch, "text", "content", "body", "value", default="") - idx = _get(ch, "index", "idx", default=i) - start = _get(ch, "start", "start_char", "offset_start", "char_start", default=None) - end = _get(ch, "end", "end_char", "offset_end", "char_end", default=None) - tokens = _get(ch, "n_tokens", "tokens", "token_count", default=None) - section = _get(ch, "section", "section_title", "heading", default=None) - section_level = _get(ch, "section_level", "heading_level", default=None) - - payload = { + # 5) Payload-Dicts + payloads: List[Dict[str, Any]] = [] + for i, (win, seg) in enumerate(zip(windows_final, segments)): + chunk_id = ids_in[i] or f"{note_id}#{i+1}" + pl: Dict[str, Any] = { "note_id": note_id, - "title": title, - "type": ntype, - "path": rel_path or raw_path, - "chunk_index": int(idx) if isinstance(idx, (int, float)) else i, - "text": text, - "start": start, - "end": end, - "tokens": tokens, - "section": section, - "section_level": section_level, - "chunk_profile": chunk_profile, - "tags": tags, - "retriever_weight": float(rw), + "chunk_id": chunk_id, + "id": chunk_id, # Alias + "chunk_index": i, + "seq": seqs[i], + "path": rel_path, + "window": win, + "text": seg, + "start": starts[i], + "end": ends[i], + "overlap_left": overlaps_left[i], + "overlap_right": overlaps_right[i], } - out.append(_sanitize(payload)) + # optionale Metadaten + if note_type: + pl["type"] = note_type + if note_title is not None: + pl["title"] = note_title + if note_tags is not None: + pl["tags"] = note_tags + if token_counts[i] is not None: + pl["token_count"] = int(token_counts[i]) + if section_titles[i] is not None: + pl["section_title"] = section_titles[i] + if section_paths[i] is not None: + sp = str(section_paths[i]).replace("\\", "/") + pl["section_path"] = sp if sp else "/" + payloads.append(pl) - return out + return payloads + + +# __main__ Demo (optional) +if __name__ == "__main__": # pragma: no cover + fm = {"id": "demo", "title": "Demo", "type": "concept"} + # Beispiel ohne echte Fenster → erzeugt synthetische Overlaps + chunks = [ + {"id": "demo#1", "text": "Alpha Beta Gamma"}, + {"id": "demo#2", "text": "Gamma Delta"}, + {"id": "demo#3", "text": "Delta Epsilon Zeta"}, + ] + pls = make_chunk_payloads(fm, "path/demo.md", chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta") + from pprint import pprint + pprint(pls) + recon = "".join(p["text"] for p in pls) + print("RECON:", recon) diff --git a/app/core/note_payload.py b/app/core/note_payload.py index fa4d848..f60db87 100644 --- a/app/core/note_payload.py +++ b/app/core/note_payload.py @@ -1,181 +1,229 @@ - -""" -note_payload.py — Mindnet core payload builder (v0.5, 2025-11-08) - -Purpose -------- -Builds a **JSON-serializable** payload dict for a single note to be stored in -the `_notes` collection. The function is defensive against both -attribute- and dict-like ParsedNote inputs, unknown kwargs, and ensures -`retriever_weight` is always present as a float. - -Key guarantees --------------- -- Accepts extra positional/keyword args without error (for importer compatibility). -- Tolerant of attribute vs dict access for ParsedNote. -- Always sets 'retriever_weight' in the payload (float). -- Never includes non-serializable objects (functions, PosixPath, datetime, etc.). - -Public API ----------- -make_note_payload(parsed_note, *args, retriever_weight=None, vault_root=None, type_defaults=None, **kwargs) -> dict -""" +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- +# Modul: app/core/note_payload.py +# Version: 1.7.0 +# Datum: 2025-09-09 from __future__ import annotations -from pathlib import Path -from typing import Any, Dict, Optional, Union, Iterable, Mapping -import datetime, math -Json = Union[None, bool, int, float, str, list, dict] +import argparse +import hashlib +import json +import os +from typing import Any, Dict, Optional, Tuple -# ------------------------- helpers ------------------------- +try: + from app.core.parser import read_markdown, extract_wikilinks, FRONTMATTER_RE +except Exception: # pragma: no cover + from .parser import read_markdown, extract_wikilinks, FRONTMATTER_RE # type: ignore -def _is_mapping(x: Any) -> bool: - return isinstance(x, Mapping) +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- -def _get(obj: Any, *names: str, default: Any=None) -> Any: - """Try attribute lookup, then mapping (dict) lookup, first hit wins.""" - for n in names: - if hasattr(obj, n): - try: - return getattr(obj, n) - except Exception: - pass - if _is_mapping(obj) and n in obj: - try: - return obj[n] - except Exception: - pass - return default +def _canon_frontmatter(fm: Dict[str, Any]) -> str: + return json.dumps(fm or {}, ensure_ascii=False, separators=(",", ":"), sort_keys=True) -def _to_float(x: Any, default: float=1.0) -> float: - if x is None: - return float(default) - if isinstance(x, (int, float)) and math.isfinite(x): - return float(x) +def _normalize_body(body: str, mode: str) -> str: + if mode == "none": + return body if body is not None else "" + text = (body or "").replace("\r\n", "\n").replace("\r", "\n") + text = "\n".join(line.rstrip() for line in text.split("\n")) + return text + +def _resolve_hash_mode(explicit: Optional[str]) -> str: + if explicit: + val = explicit.strip().lower() + else: + val = (os.environ.get("MINDNET_HASH_MODE") + or os.environ.get("MINDNET_HASH_COMPARE") + or "body").strip().lower() + if val in ("full", "fulltext", "body+frontmatter", "bodyplusfrontmatter"): + return "full" + if val in ("frontmatter", "fm"): + return "frontmatter" + return "body" + +def _read_raw_body_from_file(file_path: Optional[str]) -> Tuple[str, Dict[str, Any]]: + if not file_path or not os.path.exists(file_path): + return "", {} try: - s = str(x).strip().replace(',', '.') - return float(s) + with open(file_path, "r", encoding="utf-8") as f: + raw = f.read() except Exception: - return float(default) - -def _ensure_list(x: Any) -> list: - if x is None: - return [] - if isinstance(x, list): - return x - if isinstance(x, (set, tuple)): - return list(x) - return [x] - -def _sanitize(obj: Any) -> Json: - """Recursively convert to JSON-serializable primitives; drop callables.""" - if obj is None or isinstance(obj, (bool, int, float, str)): - return obj - if callable(obj): - return None - if isinstance(obj, (list, tuple, set)): - return [_sanitize(v) for v in obj] - if isinstance(obj, dict): - out = {} - for k, v in obj.items(): - if callable(v): - continue - out[str(k)] = _sanitize(v) - return out - if isinstance(obj, Path): - return str(obj) - if isinstance(obj, datetime.datetime): - return obj.isoformat() - if hasattr(obj, "__str__"): + return "", {} + m = FRONTMATTER_RE.match(raw) + fm = {} + if m: + fm_txt = m.group(1) try: - return str(obj) + import yaml # lazy + fm = yaml.safe_load(fm_txt) or {} except Exception: - return None - return None + fm = {} + body = raw[m.end():] + else: + body = raw + return body, fm -def _compute_retriever_weight(explicit: Any, frontmatter: dict, type_defaults: Optional[dict], note_type: Optional[str]) -> float: - if explicit is not None: - return _to_float(explicit, 1.0) - # common frontmatter keys - for key in ("retriever_weight", "retriever.weight", "retrieverWeight"): - if key in frontmatter: - return _to_float(frontmatter.get(key), 1.0) - # type defaults map like: {"concept": {"retriever_weight": 0.9}, ...} - if type_defaults and note_type: - tdef = type_defaults.get(note_type) or {} - for key in ("retriever_weight", "retriever.weight", "retrieverWeight"): - if key in tdef: - return _to_float(tdef.get(key), 1.0) - return 1.0 +def _sha256(s: str) -> str: + h = hashlib.sha256() + h.update(s.encode("utf-8")) + return h.hexdigest() -# ------------------------- public API ------------------------- +def _hash_for(mode: str, *, body: str, fm: Dict[str, Any], normalize: str) -> str: + body_n = _normalize_body(body or "", normalize) + fm_s = _canon_frontmatter(fm or {}) + if mode == "frontmatter": + return _sha256(fm_s) + if mode == "full": + return _sha256(body_n + "\n--FM--\n" + fm_s) + # default: body + return _sha256(body_n) -def make_note_payload(parsed_note: Any, - *args, - retriever_weight: Optional[float]=None, - vault_root: Optional[str]=None, - type_defaults: Optional[dict]=None, - **kwargs) -> Dict[str, Json]: +# --------------------------------------------------------------------------- +# Kernfunktion +# --------------------------------------------------------------------------- + +def make_note_payload( + parsed: Any, + vault_root: Optional[str] = None, + *, + hash_mode: Optional[str] = None, + hash_normalize: Optional[str] = None, + hash_source: Optional[str] = None, + file_path: Optional[str] = None, +) -> Dict[str, Any]: """ - Build a JSON-safe payload for the note. - - Parameters (tolerant; unknown args are ignored) - ---------- - parsed_note : object or dict - Expected fields/keys (best-effort): note_id/id, title, type, path/rel_path, - frontmatter, tags, aliases, chunk_profile. - retriever_weight : float|None - Overrides frontmatter/type-defaults if provided. - vault_root : str|None - Optional; used to produce a normalized relative path. - type_defaults : dict|None - Optional map for per-type defaults. - - Returns - ------- - dict suitable for Qdrant payload + Liefert den Note-Payload inkl. Mehrfach-Hashes. + - Es werden IMMER die drei Hashes für (body|frontmatter|full) unter + 'parsed:canonical' erzeugt (Schlüssel: z. B. 'body:parsed:canonical'). + - Zusätzlich werden – falls die aktuelle Konfig (source/normalize) davon + abweicht – die drei Hashes unter den entsprechenden Schlüsseln erzeugt, + z. B. 'frontmatter:raw:none'. + - 'hash_fulltext' und 'hash_signature' repräsentieren den *aktuellen* Modus. """ - fm = _get(parsed_note, "frontmatter", "fm", default={}) - if not isinstance(fm, dict): - fm = {} + # dict oder Objekt akzeptieren + if isinstance(parsed, dict): + fm = parsed.get("frontmatter") or {} + body_parsed = parsed.get("body") or "" + path = parsed.get("path") or "" + else: + fm = getattr(parsed, "frontmatter", {}) or {} + body_parsed = getattr(parsed, "body", "") or "" + path = getattr(parsed, "path", "") or "" - note_id = _get(parsed_note, "note_id", "id", default=fm.get("id")) - title = _get(parsed_note, "title", default=fm.get("title")) - ntype = _get(parsed_note, "type", default=fm.get("type")) - raw_path = _get(parsed_note, "path", "rel_path", "relpath", default=fm.get("path")) - tags = _ensure_list(_get(parsed_note, "tags", default=fm.get("tags"))) - aliases = _ensure_list(_get(parsed_note, "aliases", default=fm.get("aliases"))) - chunk_profile = _get(parsed_note, "chunk_profile", "profile", default=fm.get("chunk_profile")) - created = _get(parsed_note, "created", default=fm.get("created")) - updated = _get(parsed_note, "updated", default=fm.get("updated")) + # Zielpfad relativieren + rel_path = path + try: + if vault_root: + rel = os.path.relpath(path, vault_root) + rel = rel.replace("\\", "/").lstrip("/") + rel_path = rel + except Exception: + pass - # normalize path relative to vault root if both available - rel_path = raw_path - if raw_path and vault_root: - try: - rel_path = str(Path(raw_path)).replace(str(Path(vault_root)), "").lstrip("/\\") - except Exception: - rel_path = str(raw_path) + # Konfiguration auflösen + mode_resolved = _resolve_hash_mode(hash_mode) # body|frontmatter|full + src = (hash_source or os.environ.get("MINDNET_HASH_SOURCE", "parsed")).strip().lower() # parsed|raw + norm = (hash_normalize or os.environ.get("MINDNET_HASH_NORMALIZE", "canonical")).strip().lower() # canonical|none - rw = _compute_retriever_weight(retriever_weight, fm, type_defaults, ntype) + # Body-Quelle laden + raw_body, raw_fm = ("", {}) + if src == "raw": + raw_body, raw_fm = _read_raw_body_from_file(file_path or path) + if isinstance(raw_fm, dict) and raw_fm: + merged_fm = dict(fm) + for k, v in raw_fm.items(): + merged_fm.setdefault(k, v) + fm = merged_fm + body_for_hash = raw_body + else: + body_for_hash = body_parsed - payload = { - "note_id": note_id, - "title": title, - "type": ntype, - "path": rel_path or raw_path, - "tags": tags, - "aliases": aliases, - "chunk_profile": chunk_profile, - "created": created, - "updated": updated, - "retriever_weight": float(rw), + # --- 1) Standard-Tripel (parsed:canonical) immer erzeugen --- + std_src = "parsed" + std_norm = "canonical" + std_hashes: Dict[str, str] = {} + for m in ("body", "frontmatter", "full"): + std_hashes[f"{m}:{std_src}:{std_norm}"] = _hash_for( + m, body=body_parsed, fm=fm, normalize=std_norm + ) + + # Convenience-Felder (für Tools) + hash_body = std_hashes["body:parsed:canonical"] + hash_frontmatter = std_hashes["frontmatter:parsed:canonical"] + hash_full = std_hashes["full:parsed:canonical"] + + # --- 2) Hashes für die *aktuelle* Konfiguration (falls abweichend) --- + cur_hashes: Dict[str, str] = {} + if not (src == std_src and norm == std_norm): + for m in ("body", "frontmatter", "full"): + cur_hashes[f"{m}:{src}:{norm}"] = _hash_for( + m, body=body_for_hash, fm=fm, normalize=norm + ) + + # --- 3) Aktueller Modus für Backwards-Compat Felder --- + current_hash = _hash_for(mode_resolved, body=body_for_hash, fm=fm, normalize=norm) + hash_signature = f"{mode_resolved}:{src}:{norm}:{current_hash}" + + # Wikilinks (Note-Ebene) + refs = list(dict.fromkeys(extract_wikilinks(body_parsed))) if body_parsed else [] + + payload: Dict[str, Any] = { + "note_id": fm.get("id") or fm.get("note_id"), + "title": fm.get("title"), + "type": fm.get("type"), + "status": fm.get("status"), + "created": fm.get("created"), + "updated": fm.get("updated"), + "path": rel_path or fm.get("path"), + "tags": fm.get("tags"), + # Volltext für verlustfreien Export + "fulltext": body_parsed, + # Backwards-Compat: + "hash_fulltext": current_hash, + "hash_signature": hash_signature, + # Option C: Mehrfach-Hashes + "hashes": {**std_hashes, **cur_hashes}, + "hash_body": hash_body, + "hash_frontmatter": hash_frontmatter, + "hash_full": hash_full, + # Fallback-Refs + "references": refs, } - # Add selected FM fields if present (safe subset) - for key in ("status", "priority", "owner", "source"): - if key in fm: - payload[key] = fm.get(key) + for k in ("area", "project", "source", "lang", "slug", "aliases"): + if k in fm: + payload[k] = fm[k] - return _sanitize(payload) + return payload + +# --------------------------------------------------------------------------- +# CLI – Sichtprüfung +# --------------------------------------------------------------------------- + +def _cli() -> None: + ap = argparse.ArgumentParser(description="Note-Payload aus Markdown erzeugen und anzeigen") + ap.add_argument("--from-file", dest="src", required=True) + ap.add_argument("--vault-root", dest="vault_root", default=None) + ap.add_argument("--print", dest="do_print", action="store_true") + ap.add_argument("--hash-mode", choices=["body", "frontmatter", "full"], default=None) + ap.add_argument("--hash-normalize", choices=["canonical", "none"], default=None) + ap.add_argument("--hash-source", choices=["parsed", "raw"], default=None) + args = ap.parse_args() + + parsed = read_markdown(args.src) + payload = make_note_payload( + parsed, + vault_root=args.vault_root, + hash_mode=args.hash_mode, + hash_normalize=args.hash_normalize, + hash_source=args.hash_source, + file_path=args.src, + ) + if args.do_print: + print(json.dumps(payload, ensure_ascii=False, indent=2)) + +if __name__ == "__main__": # pragma: no cover + _cli()