#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
tests/compare_vaults.py

Recursively compare two Markdown vaults.

- Key selection: id | title | filename | auto (default: auto = id > title > filename)
- Focus: body | frontmatter | all (default: body)
- Unicode: NFC normalization for ids, titles, file names and text

Examples:
    python3 tests/compare_vaults.py --src ./vault --dst ./_exportVault --focus body
    python3 tests/compare_vaults.py --src ./vault --dst ./_exportVault --key id --focus all
    python3 tests/compare_vaults.py --src ./vault --dst ./_exportVault --fail-on-diff

Output (one JSON object per line on stdout):
    - entries with status 'missing' (only in src or only in dst)
    - entries with status 'diff' (present in both, but differing per focus)
    - a summary object at the end

Warnings (unparsable files, duplicate keys) go to stderr so stdout stays
machine-readable.
"""
from __future__ import annotations

import argparse
import json
import os
import sys
import unicodedata
from typing import Any, Callable, Dict, Iterator, Optional, Sequence, Tuple

PROJECT_ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
if PROJECT_ROOT not in sys.path:
    sys.path.insert(0, PROJECT_ROOT)

MD_EXTS = {".md", ".markdown"}

# (path, frontmatter, body) as stored in the index.
_Entry = Tuple[str, Dict[str, Any], str]
# Callable that parses one Markdown file into (frontmatter, body).
_Reader = Callable[[str], Tuple[Optional[Dict[str, Any]], Optional[str]]]


def _default_reader() -> _Reader:
    """Return the project's Markdown parser.

    Imported lazily so that ``--help`` and the pure helper functions keep
    working when the project package is not importable.
    """
    # Same parser as the project itself; expected to return (frontmatter, body).
    from app.core.parser import read_markdown

    return read_markdown


def nfc(s: Optional[str]) -> str:
    """Return *s* NFC-normalized; ``None`` becomes the empty string."""
    return unicodedata.normalize("NFC", s or "")


def norm_text(txt: str) -> str:
    """Normalize text for comparison.

    Line endings are unified to ``\\n``, trailing whitespace is removed from
    every line, leading/trailing whitespace of the whole text is stripped and
    the result is NFC-normalized. ``None`` is treated as empty text.
    """
    t = (txt or "").replace("\r\n", "\n").replace("\r", "\n")
    t = "\n".join(line.rstrip() for line in t.split("\n")).strip()
    return nfc(t)


def iter_md_files(root: str) -> Iterator[str]:
    """Yield the paths of all Markdown files below *root* in sorted order."""
    for base, dirs, files in os.walk(root):
        # Sorting in place fixes the traversal order, which os.walk otherwise
        # leaves to the file system; this makes duplicate-key resolution in
        # load_index deterministic.
        dirs.sort()
        for fn in sorted(files):
            if os.path.splitext(fn)[1].lower() in MD_EXTS:
                yield os.path.join(base, fn)


def _fm_str(front: Dict[str, Any], field: str) -> str:
    """Return front-matter *field* as a stripped NFC string ('' if missing/null)."""
    value = front.get(field)
    if value is None:
        # An explicit null must not become the literal string "None".
        return ""
    return nfc(str(value).strip())


def _filename_key(path: str) -> str:
    """Return the NFC-normalized file name of *path* without its extension."""
    return nfc(os.path.splitext(os.path.basename(path))[0].strip())


def key_from(front: Dict[str, Any], path: str, pref: str) -> str:
    """Return the comparison key for one note.

    *pref* is one of ``"auto"``, ``"id"``, ``"title"``, ``"filename"``.
    ``"id"`` and ``"title"`` return an empty string when the field is missing
    or empty; ``"auto"`` (and any other value) picks the first non-empty of
    id, title, file name.
    """
    if pref == "id":
        return _fm_str(front, "id")
    if pref == "title":
        return _fm_str(front, "title")
    if pref == "filename":
        return _filename_key(path)
    # auto: id > title > filename
    return _fm_str(front, "id") or _fm_str(front, "title") or _filename_key(path)


def load_index(root: str, key_pref: str,
               reader: Optional[_Reader] = None) -> Dict[str, Tuple[str, Dict[str, Any], str]]:
    """Build an index ``key -> (path, frontmatter, body)`` for all notes below *root*.

    *reader* parses a single file into ``(frontmatter, body)``; it defaults to
    the project's ``read_markdown``. Files that cannot be parsed are still
    indexed, with empty front matter and body, under their file name. If two
    files map to the same key, the one visited later wins and a warning is
    written to stderr.
    """
    if reader is None:
        reader = _default_reader()

    idx: Dict[str, _Entry] = {}
    for p in iter_md_files(root):
        try:
            fm, body = reader(p)
            fm = fm or {}
            k = key_from(fm, p, key_pref)
            entry: _Entry = (p, fm, body or "")
        except Exception as exc:  # deliberate best effort: keep going on bad files
            print(f"warning: could not parse {p}: {exc!r}", file=sys.stderr)
            k = ""
            entry = (p, {}, "")
        if not k:
            # Fallback for missing id/title and for unparsable files alike, so
            # such notes never collapse onto one shared empty key.
            k = _filename_key(p)
        if k in idx:
            print(f"warning: duplicate key {k!r}: {idx[k][0]} is replaced by {p}",
                  file=sys.stderr)
        idx[k] = entry
    return idx


def compare_entries(src: Tuple[str, Dict[str, Any], str],
                    dst: Tuple[str, Dict[str, Any], str],
                    focus: str) -> Dict[str, Any]:
    """Compare two index entries according to *focus*.

    Returns ``{"status": "ok"}`` when nothing differs, otherwise a dict with
    ``status == "diff"``, the per-field ``diffs`` and both file paths.
    Front-matter comparison is limited to ``id`` and ``title``; bodies are
    compared after :func:`norm_text`.
    """
    sp, sfm, sbody = src
    dp, dfm, dbody = dst

    diffs: Dict[str, Any] = {}

    if focus in ("frontmatter", "all"):
        for field in ("id", "title"):
            s_val = _fm_str(sfm, field)
            d_val = _fm_str(dfm, field)
            if s_val != d_val:
                diffs[f"frontmatter.{field}"] = {"src": s_val, "dst": d_val}

    if focus in ("body", "all"):
        if norm_text(sbody) != norm_text(dbody):
            # Report raw lengths only; full bodies would bloat the output.
            diffs["body"] = {"src_len": len(sbody or ""), "dst_len": len(dbody or "")}

    res: Dict[str, Any] = {"status": "ok"}
    if diffs:
        res["status"] = "diff"
        res["diffs"] = diffs
        res["src_path"] = sp
        res["dst_path"] = dp
    return res


def main(argv: Optional[Sequence[str]] = None) -> int:
    """Run the comparison CLI and return the process exit code.

    *argv* defaults to ``sys.argv[1:]``. The exit code is 0 unless
    ``--fail-on-diff`` is given and missing or differing notes were found.
    """
    ap = argparse.ArgumentParser()
    ap.add_argument("--src", required=True, help="Quell-Vault-Ordner")
    ap.add_argument("--dst", required=True, help="Export-Vault-Ordner")
    ap.add_argument("--focus", choices=["body", "frontmatter", "all"], default="body")
    ap.add_argument("--key", choices=["auto", "id", "title", "filename"], default="auto",
                    help="Vergleichsschlüssel (default: auto=id>title>filename)")
    ap.add_argument("--fail-on-diff", action="store_true",
                    help="Exit-Code 1 bei Abweichungen (für CI); Standard: immer 0")
    args = ap.parse_args(argv)

    # A mistyped path would otherwise be walked silently and could yield a
    # misleading "OK" summary with zero notes on both sides.
    for flag, path in (("--src", args.src), ("--dst", args.dst)):
        if not os.path.isdir(path):
            ap.error(f"{flag} ist kein Verzeichnis: {path}")

    src_idx = load_index(args.src, args.key)
    dst_idx = load_index(args.dst, args.key)

    src_keys = set(src_idx)
    dst_keys = set(dst_idx)

    # Notes that exist on one side only.
    missing_count = 0
    for k in sorted(src_keys - dst_keys):
        print(json.dumps({"note_id": k, "status": "missing", "src": True, "dst": False},
                         ensure_ascii=False))
        missing_count += 1
    for k in sorted(dst_keys - src_keys):
        print(json.dumps({"note_id": k, "status": "missing", "src": False, "dst": True},
                         ensure_ascii=False))
        missing_count += 1

    # Notes that exist on both sides.
    diff_count = 0
    for k in sorted(src_keys & dst_keys):
        res = compare_entries(src_idx[k], dst_idx[k], args.focus)
        if res["status"] == "diff":
            res["note_id"] = k
            print(json.dumps(res, ensure_ascii=False))
            diff_count += 1

    ok = missing_count == 0 and diff_count == 0
    summary = {
        "summary": "OK" if ok else "DIFFS",
        "missing_count": missing_count,
        "diff_count": diff_count,
        "total_src": len(src_keys),
        "total_dst": len(dst_keys),
        "focus": args.focus,
        "key": args.key,
    }
    print(json.dumps(summary, ensure_ascii=False))

    # Exit code for CI: stays 0 by default so plain CLI use never "fails";
    # a non-zero code is strictly opt-in.
    return 1 if (args.fail_on_diff and not ok) else 0


if __name__ == "__main__":
    sys.exit(main())