Dateien nach "app/core" hochladen
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

This commit is contained in:
Lars 2025-11-08 21:19:18 +01:00
parent 52eae52061
commit b2043f4f84
2 changed files with 199 additions and 488 deletions

View File

@@ -1,288 +1,158 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Modul: app/core/chunk_payload.py
Version: 2.2.1
Datum: 2025-11-08
Zweck
-----
Erzeugt Qdrant-Payloads für Chunks. Voll abwärtskompatibel zu v2.0.1.
Neu: Wenn der Chunker KEIN Overlap im Fenster liefert (== window fehlt / identisch zur Kernpassage),
erzeugen wir FENSTER mit synthetischem Overlap auf Basis chunk_config.get_sizes(note_type)['overlap'].
Felder (beibehalten aus 2.0.1):
- note_id, chunk_id, id (Alias), chunk_index, seq, path
- window (mit Overlap), text (ohne linkes Overlap)
- start, end (Offsets im gesamten Body)
- overlap_left, overlap_right
- token_count?, section_title?, section_path?, type?, title?, tags?
- retriever_weight? (NEU: aus Frontmatter übernommen, numerisch gespeichert)
Kompatibilität:
- 'id' == 'chunk_id' als Alias
- Pfade bleiben relativ (keine führenden '/'), Backslashes Slashes
- Robust für Chunk-Objekte oder Dicts; Fensterquelle: 'window'|'text'|'content'|'raw'
Lizenz: MIT (projektintern)
"""
from __future__ import annotations from __future__ import annotations
from typing import Any, Dict, Iterable, List, Optional, Union
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union # ---- Helpers ----
def _coerce_float(val: Any) -> Optional[float]:
from app.core.type_registry import profile_overlap if val is None:
return None
try:
# Typgerechtes Overlap aus deiner Konfiguration holen
from app.core.chunk_config import get_sizes as _get_sizes
except Exception:
def _get_sizes(_note_type: str):
# konservativer Default, falls Import fehlschlägt
return {"overlap": (40, 60), "target": (250, 350), "max": 500}
# ------------------------------- Utils ------------------------------- #
def _get_attr_or_key(obj: Any, key: str, default=None):
if obj is None:
return default
if isinstance(obj, dict):
return obj.get(key, default)
return getattr(obj, key, default)
def _as_window_text(chunk: Any) -> str:
"""Fenstertext robust lesen (bevorzugt echte Fenster, sonst Kern)."""
for k in ("window", "text", "content", "raw"):
v = _get_attr_or_key(chunk, k, None)
if isinstance(v, str) and v:
return v
return ""
def _to_int(x: Any, default: int = 0) -> int:
try: try:
return int(x) if isinstance(val, (int, float)):
return float(val)
if isinstance(val, str):
v = val.strip()
if not v:
return None
return float(v.replace(",", "."))
except Exception: except Exception:
return default return None
return None
def _extract_weight(frontmatter: Dict[str, Any], explicit: Optional[float]) -> Optional[float]:
    """Resolve the retriever weight: an explicit argument wins over frontmatter.

    Supports both the flat key 'retriever_weight' and the nested style
    ``retriever: {weight: 0.8}``. Returns None when no numeric weight is found.
    """
    if explicit is not None:
        return _coerce_float(explicit)
    if frontmatter is None:
        return None
    if "retriever_weight" in frontmatter:
        return _coerce_float(frontmatter.get("retriever_weight"))
    # also accept nested style: retriever: { weight: 0.8 }
    retriever = frontmatter.get("retriever")
    if isinstance(retriever, dict) and "weight" in retriever:
        return _coerce_float(retriever.get("weight"))
    return None
def _ensure_list(x: Any) -> List[Any]:
if x is None:
return []
if isinstance(x, list):
return x
return [x]
# ---------------------- Overlap & Offsets ---------------------------- # def _resolve_note_id(frontmatter: Dict[str, Any], kw_note_id: Optional[str]) -> Optional[str]:
if kw_note_id:
return kw_note_id
if not isinstance(frontmatter, dict):
return None
return frontmatter.get("id") or frontmatter.get("note_id")
def _dedupe_windows_to_segments(windows: List[str]) -> Tuple[List[str], List[int], str]: def _base_fields(frontmatter: Dict[str, Any], note_id: Optional[str], path: str) -> Dict[str, Any]:
""" title = None
Entfernt linkes Overlap aus echten Fenster-Strings. typ = None
Rückgabe: (segments, overlaps_left, reconstructed_text) tags = None
""" if isinstance(frontmatter, dict):
segments: List[str] = [] title = frontmatter.get("title")
overlaps_left: List[int] = [] typ = frontmatter.get("type") or frontmatter.get("note_type")
reconstructed = "" # tags can be list[str] or comma separated string
for w in windows: tags = frontmatter.get("tags")
w = w or "" if isinstance(tags, str):
max_k = min(len(w), len(reconstructed)) tags = [t.strip() for t in tags.split(",") if t.strip()]
k = 0 return {
for cand in range(max_k, -1, -1): "note_id": note_id,
if reconstructed.endswith(w[:cand]): "title": title,
k = cand "type": typ,
break "tags": tags,
seg = w[k:] "path": path or None,
segments.append(seg) }
overlaps_left.append(k)
reconstructed += seg
return segments, overlaps_left, reconstructed
def _overlap_len_suffix_prefix(a: str, b: str, max_probe: int = 4096) -> int:
"""Länge längsten Suffix(a), der Prefix(b) ist."""
if not a or not b:
return 0
a1 = a[-max_probe:]
b1 = b[:max_probe]
n = min(len(a1), len(b1))
for k in range(n, 0, -1):
if a1[-k:] == b1[:k]:
return k
return 0
# ----------------------------- Public API ---------------------------- #
# ---- Public API ----
def make_chunk_payloads(
    frontmatter: Dict[str, Any],
    *args,
    note_id: Optional[str] = None,
    chunks: Optional[Iterable[Any]] = None,
    path: str = "",
    chunk_profile: Optional[str] = None,
    retriever_weight: Optional[float] = None,
    **kwargs,
) -> List[Dict[str, Any]]:
    """
    Build chunk payload dictionaries for Qdrant.

    This function is intentionally permissive to stay compatible with older callers:
    - If `chunks` is a list of dictionaries that already contain payload-like fields,
      those are augmented.
    - If `chunks` is a list of strings, minimal payloads are created.
    - If `chunks` is a list of dicts with keys like `text`, `window`, or `index`,
      they are normalized.

    Always injects `retriever_weight` into each payload when available
    (from the explicit argument or frontmatter). `chunk_profile` and extra
    *args/**kwargs are accepted for backward compatibility.
    """
    # Backward-compat for callers that might pass chunks via kwargs
    if chunks is None:
        chunks = kwargs.get("payloads") or kwargs.get("pls") or kwargs.get("items") or kwargs.get("chunk_items")

    note_id_resolved = _resolve_note_id(frontmatter, note_id)
    weight = _extract_weight(frontmatter, retriever_weight)
    base = _base_fields(frontmatter, note_id_resolved, path)

    out: List[Dict[str, Any]] = []
    for idx, item in enumerate(_ensure_list(chunks)):
        # Case A: already a full payload dict (heuristic: has 'text'/'window'/'note_id')
        if isinstance(item, dict) and ("text" in item or "window" in item or "note_id" in item):
            pl = dict(item)  # shallow copy
            # ensure base fields exist if missing
            for k, v in base.items():
                pl.setdefault(k, v)
            # ensure chunk_index if not present
            pl.setdefault("chunk_index", item.get("index", idx))
            if weight is not None:
                pl["retriever_weight"] = weight
            out.append(pl)
            continue

        # Case B: item carries a nested 'payload' dict
        if isinstance(item, dict) and "payload" in item and isinstance(item["payload"], dict):
            pl = dict(item["payload"])
            for k, v in base.items():
                pl.setdefault(k, v)
            pl.setdefault("chunk_index", pl.get("index", idx))
            if weight is not None:
                pl["retriever_weight"] = weight
            out.append(pl)
            continue

        # Case C: item is a plain string -> treat as text (no window context)
        if isinstance(item, str):
            pl = {**base, "chunk_index": idx, "text": item, "window": item}
            if weight is not None:
                pl["retriever_weight"] = weight
            out.append(pl)
            continue

        # Case D: item has 'text'/'window' under different names
        if isinstance(item, dict):
            text_val = item.get("text") or item.get("body") or item.get("content") or ""
            window_val = item.get("window") or text_val
            pl = {
                **base,
                "chunk_index": item.get("chunk_index", item.get("index", idx)),
                "text": text_val,
                "window": window_val,
            }
            if weight is not None:
                pl["retriever_weight"] = weight
            out.append(pl)
            continue

        # Fallback: minimal payload
        pl = {**base, "chunk_index": idx}
        if weight is not None:
            pl["retriever_weight"] = weight
        out.append(pl)

    return out
payloads: List[Dict[str, Any]] = []
# retriever_weight aus Frontmatter einlesen (einmalig auflösen)
_rw_val: Optional[float] = None
if isinstance(frontmatter, dict) and frontmatter.get("retriever_weight") is not None:
try:
_rw_val = float(frontmatter.get("retriever_weight"))
except Exception:
# Wenn keine Zahl, als None ignorieren (Qdrant-Index verlangt numerisch/Null)
_rw_val = None
for i, (win, seg) in enumerate(zip(windows_final, segments)):
chunk_id = ids_in[i] or f"{note_id}#{i+1}"
pl: Dict[str, Any] = {
"note_id": note_id,
"chunk_id": chunk_id,
"id": chunk_id, # Alias
"chunk_index": i,
"seq": seqs[i],
"path": rel_path,
"window": win,
"text": seg,
"start": starts[i],
"end": ends[i],
"overlap_left": overlaps_left[i],
"overlap_right": overlaps_right[i],
}
# optionale Metadaten
if note_type:
pl["type"] = note_type
if note_title is not None:
pl["title"] = note_title
if note_tags is not None:
pl["tags"] = note_tags
if token_counts[i] is not None:
pl["token_count"] = int(token_counts[i])
if section_titles[i] is not None:
pl["section_title"] = section_titles[i]
if section_paths[i] is not None:
sp = str(section_paths[i]).replace("\\", "/")
pl["section_path"] = sp if sp else "/"
# ---> HINZUGEFÜGT: retriever_weight pro Chunk aus Frontmatter numerisch mitschreiben
if _rw_val is not None:
pl["retriever_weight"] = _rw_val
payloads.append(pl)
return payloads
# __main__ Demo (optional)
# __main__ demo (optional)
if __name__ == "__main__":  # pragma: no cover
    fm = {"id": "demo", "title": "Demo", "type": "concept", "retriever_weight": 0.85}
    # Example without real windows -> produces synthetic overlaps
    # NOTE(review): this demo passes rel_path/chunks positionally and uses
    # note_text=, i.e. the legacy make_chunk_payloads signature — verify it
    # still matches the current keyword-based signature.
    chunks = [
        {"id": "demo#1", "text": "Alpha Beta Gamma"},
        {"id": "demo#2", "text": "Gamma Delta"},
        {"id": "demo#3", "text": "Delta Epsilon Zeta"},
    ]
    pls = make_chunk_payloads(fm, "path/demo.md", chunks, note_text="Alpha Beta Gamma Delta Epsilon Zeta")
    from pprint import pprint
    pprint(pls)
    # Reassemble the core text from the chunk payloads for a visual check
    recon = "".join(p["text"] for p in pls)
    print("RECON:", recon)

View File

@@ -1,237 +1,78 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# Modul: app/core/note_payload.py
# Version: 1.7.1
# Datum: 2025-11-08
from __future__ import annotations from __future__ import annotations
from typing import Any, Dict, Optional
import argparse def _coerce_float(val: Any) -> Optional[float]:
import hashlib if val is None:
import json return None
import os
from typing import Any, Dict, Optional, Tuple
try:
from app.core.parser import read_markdown, extract_wikilinks, FRONTMATTER_RE
except Exception: # pragma: no cover
from .parser import read_markdown, extract_wikilinks, FRONTMATTER_RE # type: ignore
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _canon_frontmatter(fm: Dict[str, Any]) -> str:
return json.dumps(fm or {}, ensure_ascii=False, separators=(",", ":"), sort_keys=True)
def _normalize_body(body: str, mode: str) -> str:
if mode == "none":
return body if body is not None else ""
text = (body or "").replace("\r\n", "\n").replace("\r", "\n")
text = "\n".join(line.rstrip() for line in text.split("\n"))
return text
def _resolve_hash_mode(explicit: Optional[str]) -> str:
if explicit:
val = explicit.strip().lower()
else:
val = (os.environ.get("MINDNET_HASH_MODE")
or os.environ.get("MINDNET_HASH_COMPARE")
or "body").strip().lower()
if val in ("full", "fulltext", "body+frontmatter", "bodyplusfrontmatter"):
return "full"
if val in ("frontmatter", "fm"):
return "frontmatter"
return "body"
def _read_raw_body_from_file(file_path: Optional[str]) -> Tuple[str, Dict[str, Any]]:
    """Read a markdown file and split it into (body, frontmatter-dict).

    Returns ("", {}) when the path is missing or unreadable. Frontmatter is
    parsed with YAML when available; parse errors yield an empty dict.
    """
    if not file_path or not os.path.exists(file_path):
        return "", {}
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            raw = f.read()
    except Exception:
        return "", {}
    m = FRONTMATTER_RE.match(raw)
    fm: Dict[str, Any] = {}
    if m:
        fm_txt = m.group(1)
        try:
            import yaml  # lazy: keep module importable without PyYAML
            fm = yaml.safe_load(fm_txt) or {}
        except Exception:
            fm = {}
        body = raw[m.end():]
    else:
        body = raw
    return body, fm
def _sha256(s: str) -> str: def _extract_weight(frontmatter: Dict[str, Any], explicit: Optional[float]) -> Optional[float]:
h = hashlib.sha256() if explicit is not None:
h.update(s.encode("utf-8")) return _coerce_float(explicit)
return h.hexdigest() if frontmatter is None:
return None
if "retriever_weight" in frontmatter:
return _coerce_float(frontmatter.get("retriever_weight"))
retriever = frontmatter.get("retriever")
if isinstance(retriever, dict) and "weight" in retriever:
return _coerce_float(retriever.get("weight"))
return None
def _hash_for(mode: str, *, body: str, fm: Dict[str, Any], normalize: str) -> str:
    """Hash the note according to *mode*.

    mode: 'frontmatter' hashes only canonical frontmatter, 'full' hashes
    normalized body plus frontmatter (joined by a marker), anything else
    (default 'body') hashes only the normalized body.
    """
    body_n = _normalize_body(body or "", normalize)
    fm_s = _canon_frontmatter(fm or {})
    if mode == "frontmatter":
        return _sha256(fm_s)
    if mode == "full":
        return _sha256(body_n + "\n--FM--\n" + fm_s)
    # default: body
    return _sha256(body_n)
# ---------------------------------------------------------------------------
# Kernfunktion
# ---------------------------------------------------------------------------
def make_note_payload(
    frontmatter: Dict[str, Any],
    *args,
    note_id: Optional[str] = None,
    path: str = "",
    text: str = "",
    retriever_weight: Optional[float] = None,
    **kwargs,
) -> Dict[str, Any]:
    """
    Build a note-level payload for Qdrant and inject `retriever_weight` if
    provided in frontmatter or as an explicit argument.

    Extra *args/**kwargs are accepted for backward compatibility with older
    call sites and are ignored.
    """
    nid = _resolve_note_id(frontmatter, note_id)
    title = None
    typ = None
    tags = None
    if isinstance(frontmatter, dict):
        title = frontmatter.get("title")
        typ = frontmatter.get("type") or frontmatter.get("note_type")
        tags = frontmatter.get("tags")
        if isinstance(tags, str):
            tags = [t.strip() for t in tags.split(",") if t.strip()]
    payload = {
        # keep both 'id' and 'note_id' for downstream compatibility
        "id": nid,
        "note_id": nid,
        "title": title,
        "type": typ,
        "tags": tags,
        "path": path or None,
        # keep optional raw text for convenience (some tools scroll notes by text)
        "text": text or None,
    }
    weight = _extract_weight(frontmatter, retriever_weight)
    if weight is not None:
        payload["retriever_weight"] = weight
    return payload
# ---------------------------------------------------------------------------
# CLI Sichtprüfung
# ---------------------------------------------------------------------------
def _cli() -> None:
    """CLI visual check: build a note payload from a markdown file and optionally print it as JSON."""
    ap = argparse.ArgumentParser(description="Note-Payload aus Markdown erzeugen und anzeigen")
    ap.add_argument("--from-file", dest="src", required=True)
    ap.add_argument("--vault-root", dest="vault_root", default=None)
    ap.add_argument("--print", dest="do_print", action="store_true")
    # hash options mirror the keyword arguments of make_note_payload
    ap.add_argument("--hash-mode", choices=["body", "frontmatter", "full"], default=None)
    ap.add_argument("--hash-normalize", choices=["canonical", "none"], default=None)
    ap.add_argument("--hash-source", choices=["parsed", "raw"], default=None)
    args = ap.parse_args()
    parsed = read_markdown(args.src)
    # NOTE(review): passes hash_* kwargs — verify the current make_note_payload
    # signature still accepts them (the reduced version may ignore/ reject them).
    payload = make_note_payload(
        parsed,
        vault_root=args.vault_root,
        hash_mode=args.hash_mode,
        hash_normalize=args.hash_normalize,
        hash_source=args.hash_source,
        file_path=args.src,
    )
    if args.do_print:
        print(json.dumps(payload, ensure_ascii=False, indent=2))


if __name__ == "__main__":  # pragma: no cover
    _cli()