#!/usr/bin/env python3 from __future__ import annotations import argparse, os, re, sys, json, shutil, time, hashlib from dataclasses import dataclass from typing import Dict, Tuple, Optional, List from slugify import slugify from app.core.parser import read_markdown, normalize_frontmatter from app.core.parser import FRONTMATTER_RE # für Re-Inject from app.core.validate_note import validate_note_payload from app.core.note_payload import make_note_payload DATE_IN_NAME = re.compile(r"(?P\d{4})[-_\.]?(?P\d{2})[-_\.]?(?P\d{2})") TYPE_HINTS = [ ("journal", re.compile(r"/journal(s)?/|(^|[#\s])journal($|\s|#)", re.I)), ("task", re.compile(r"/tasks?/|(^|[#\s])task(s)?($|\s|#)", re.I)), ("project", re.compile(r"/projects?/|(^|[#\s])project($|\s|#)", re.I)), ("thought", re.compile(r"/thoughts?/|(^|[#\s])thought($|\s|#)", re.I)), ("experience",re.compile(r"/experiences?/|(^|[#\s])experience($|\s|#)", re.I)), ("source", re.compile(r"/sources?/|(^|[#\s])source($|\s|#)", re.I)), ("concept", re.compile(r".*")) # fallback ] def guess_type(rel_path: str, tags: List[str]) -> str: text = f"{rel_path} #{' #'.join(tags or [])}" for t, rx in TYPE_HINTS: if rx.search(text): return t return "concept" def guess_title(fm: Dict, body: str, filename: str) -> str: if fm.get("title"): return str(fm["title"]).strip() # H1-Heading for line in body.splitlines(): if line.strip().startswith("# "): return line.strip()[2:].strip() # Fallback Dateiname ohne Endung base = os.path.splitext(os.path.basename(filename))[0] return base.replace("_", " ").replace("-", " ").strip() or "Untitled" def guess_created(fm: Dict, filename: str, abspath: str) -> str: if fm.get("created"): return str(fm["created"]) m = DATE_IN_NAME.search(os.path.basename(filename)) if m: return f"{m.group('y')}-{m.group('m')}-{m.group('d')}" # fallback: mtime (UTC Datum) ts = os.path.getmtime(abspath) return time.strftime("%Y-%m-%d", time.gmtime(ts)) def build_stable_id(rel_path: str, title: str, created: str) -> str: slug = slugify(title, lowercase=True, separator="-") short = hashlib.sha1(rel_path.encode("utf-8")).hexdigest()[:6] ymd = created.replace("-", "") return f"{ymd}-{slug}-{short}" def ensure_list(x) -> list: if x is None or x == "": return [] if isinstance(x, list): return [str(i).strip() for i in x if str(i).strip()] return [str(x).strip()] def coerce_bool(x) -> Optional[bool]: if x is None or x == "": return None if isinstance(x, bool): return x s = str(x).strip().lower() if s in ("true","yes","1","on"): return True if s in ("false","no","0","off"): return False return None def inject_frontmatter(raw: str, new_fm: Dict) -> str: import yaml fm_text = yaml.safe_dump(new_fm, sort_keys=False, allow_unicode=True).strip() block = f"---\n{fm_text}\n---\n" m = FRONTMATTER_RE.match(raw) if m: return block + raw[m.end():] else: return block + raw def diff_fm(old: Dict, new: Dict) -> Dict: changes = {} keys = set(old.keys()) | set(new.keys()) for k in sorted(keys): if old.get(k) != new.get(k): changes[k] = {"old": old.get(k), "new": new.get(k)} return changes def walk_md(root: str) -> List[str]: res = [] for base, dirs, files in os.walk(root): # Skip typical ignore dirs if any(skip in base for skip in ("/.obsidian/", "/_backup_frontmatter/")): continue for f in files: if f.lower().endswith(".md"): res.append(os.path.join(base, f)) return res def main(): ap = argparse.ArgumentParser() ap.add_argument("--vault", default="mindnet/vault") ap.add_argument("--apply", action="store_true", help="Schreibt Änderungen (ohne = Dry-Run)") ap.add_argument("--backup", action="store_true", help="Backup schreiben (empfohlen bei --apply)") args = ap.parse_args() root = os.path.abspath(args.vault) files = walk_md(root) if not files: print("Keine Markdown-Dateien gefunden.", file=sys.stderr); sys.exit(2) # Backup-Ziel backup_root = os.path.join(root, "_backup_frontmatter", time.strftime("%Y%m%d_%H%M%S")) if args.apply and args.backup and not os.path.isdir(backup_root): os.makedirs(backup_root, exist_ok=True) total, changed, failed = 0, 0, 0 for abspath in files: total += 1 try: from app.core.parser import _strip_bom, _normalize_text with open(abspath, "r", encoding="utf-8") as f: raw = _strip_bom(f.read()) raw = _normalize_text(raw) parsed = read_markdown(abspath) fm0 = normalize_frontmatter(parsed.frontmatter) body = parsed.body rel_path = os.path.relpath(abspath, root).replace("\\", "/") # Felder ableiten (nur setzen, wenn fehlen) title = guess_title(fm0, body, abspath) created = guess_created(fm0, abspath, abspath) tags = ensure_list(fm0.get("tags")) note_type = fm0.get("type") or guess_type(rel_path.lower(), tags) status = fm0.get("status") or "draft" emb_ex = fm0.get("embedding_exclude") emb_ex_bool = coerce_bool(emb_ex) note_id = fm0.get("id") if not note_id: note_id = build_stable_id(rel_path, title, created) fm1 = dict(fm0) # clone # Ergänzen / normalisieren (nur wenn fehlend oder klar falsch) fm1.setdefault("title", title) fm1.setdefault("id", note_id) fm1.setdefault("type", note_type) fm1.setdefault("status", status) fm1.setdefault("created", created) if tags: fm1["tags"] = tags if emb_ex_bool is not None: fm1["embedding_exclude"] = emb_ex_bool # Nur wenn es echte Änderungen gibt: if diff_fm(fm0, fm1): if not args.apply: print(json.dumps({ "file": rel_path, "changes": diff_fm(fm0, fm1) }, ensure_ascii=False)) else: if args.backup: dst = os.path.join(backup_root, rel_path) os.makedirs(os.path.dirname(dst), exist_ok=True) shutil.copy2(abspath, dst) new_raw = inject_frontmatter(raw, fm1) with open(abspath, "w", encoding="utf-8") as f: f.write(new_raw) changed += 1 # Optional: gegen Note-Schema prüfen (nur in-memory Payload) try: payload = make_note_payload(parsed, vault_root=root) # Payload reflektiert fm0; nach write neu parsen, damit hash/refs passen if args.apply: parsed2 = read_markdown(abspath) payload = make_note_payload(parsed2, vault_root=root) validate_note_payload(payload) except Exception: pass except Exception as e: failed += 1 print(json.dumps({"file": abspath, "error": str(e)})) print(f"Scanned: {total} | Changed: {changed} | Failed: {failed}") if failed: sys.exit(1) if __name__ == "__main__": main()