From f66cdc70b20845fa8df82a688214bfe17f489682 Mon Sep 17 00:00:00 2001 From: Lars Date: Fri, 7 Nov 2025 09:30:20 +0100 Subject: [PATCH] scripts/import_markdown.py aktualisiert --- scripts/import_markdown.py | 749 ++++++++++++++++++------------------- 1 file changed, 356 insertions(+), 393 deletions(-) diff --git a/scripts/import_markdown.py b/scripts/import_markdown.py index 9df1e25..27fc2ef 100644 --- a/scripts/import_markdown.py +++ b/scripts/import_markdown.py @@ -1,449 +1,412 @@ #!/usr/bin/env python3 # -*- coding: utf-8 -*- """ -Script: scripts/import_markdown.py — Markdown → Qdrant (Notes, Chunks, Edges) -Version: 3.7.2 -Datum: 2025-09-30 +===================================================================== +scripts/import_markdown.py — mindnet · WP-03 (Version 3.9.0) +===================================================================== +Zweck: + - Importiert Obsidian-Markdown-Dateien (Vault) in Qdrant: + * Notes (mit optionaler Schema-Validierung, Hash-Erkennung) + * Chunks (window & text, Overlap-Metadaten) + * Edges (belongs_to, prev/next, references, backlink optional, depends_on/assigned_to) + - Idempotenz über stabile IDs (note_id, chunk_id) & Hash-Signaturen (Option C). + - **Optional**: Embeddings für Note/Chunks via HTTP-Endpoint (/embed). + - **Optional**: JSON-Schema-Validierung gegen bereitgestellte Schemata. + - **Optional**: Note-Scope-References zusätzlich zu Chunk-Refs. -Kurzbeschreibung ----------------- -- Liest Markdown-Dateien ein und erzeugt Notes/Chunks/Edges **idempotent**. -- Änderungserkennung „Option C“: mehrere Hash-Varianten werden parallel in der Note - gespeichert (Feld `hashes` mit Schlüsseln `::`). Der Vergleich - nutzt NUR den aktuellen Modus-Key — ein Moduswechsel triggert keine Massenänderungen mehr. -- „Erstimport-Fix“: Bei leerem Qdrant gilt Create-Fall automatisch als geändert. -- `--baseline-modes`: fehlende Hash-Varianten still nachtragen (nur Notes upserten). -- `--sync-deletes`: gezielte Lösch-Synchronisation (Dry-Run + Apply). -- `--only-path`: exakt **eine** Datei (Pfad) importieren — nützlich für Diagnosefälle. +Highlights ggü. Minimal-Variante: + - Hash Option C (body/frontmatter/full × parsed/raw × normalize) + - Baseline-Modus (fehlende Signaturen initial schreiben) + - Purge vor Upsert (nur geänderte Note: alte Chunks/Edges löschen) + - UTF-8 fehlertoleranter Parser (Fallback Latin-1 → Re-encode) + - Type-Registry: dynamische Chunk-Profile (optional) + - Include/Exclude & Single-File-Import (--path) & Skip-Regeln + - embedding_exclude respektiert + - NDJSON-Logging & Abschlussstatistik -Neu in 3.7.1/3.7.2 ------------------- -- Chunk-Payloads: `window` (für Embeddings), `text` (überlappungsfrei, verlustfrei rekonstruierbar), - `start/end/overlap_*`. Embeddings nutzen `window`. -- **3.7.2:** Edges-Fehler führen nicht mehr zum Abbruch der gesamten Note; Note/Chunks werden trotzdem geschrieben. +Aufrufe (Beispiele): + # Dry-Run (zeigt Entscheidungen) + python3 -m scripts.import_markdown --vault ./vault --prefix mindnet -Hash/Compare Konfiguration --------------------------- -- Vergleichsmodus: - --hash-mode body|frontmatter|full - oder ENV: MINDNET_HASH_MODE | MINDNET_HASH_COMPARE -- Quelle: - --hash-source parsed|raw (ENV: MINDNET_HASH_SOURCE, Default parsed) -- Normalisierung: - --hash-normalize canonical|none (ENV: MINDNET_HASH_NORMALIZE, Default canonical) -- Optional: --compare-text (oder ENV MINDNET_COMPARE_TEXT=true) vergleicht zusätzlich - den parsed Body-Text direkt. + # Apply + Purge für geänderte Notes + python3 -m scripts.import_markdown --vault ./vault --prefix mindnet --apply --purge-before-upsert -Qdrant / ENV ------------- -- QDRANT_URL | QDRANT_HOST/QDRANT_PORT | QDRANT_API_KEY -- COLLECTION_PREFIX (Default: mindnet), via --prefix überschreibbar -- VECTOR_DIM (Default: 384) -- MINDNET_NOTE_SCOPE_REFS: true|false (Default: false) + # Note-Scope-Refs zusätzlich anlegen + python3 -m scripts.import_markdown --vault ./vault --apply --note-scope-refs -Beispiele ---------- - # Standard (Body, parsed, canonical) - python3 -m scripts.import_markdown --vault ./vault + # Embeddings aktivieren (Endpoint kann per ENV überschrieben werden) + python3 -m scripts.import_markdown --vault ./vault --apply --with-embeddings - # Erstimport nach truncate (Create-Fall) - python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert + # Schema-Validierung (verwende die *.schema.json-Dateien) + python3 -m scripts.import_markdown --vault ./vault --apply --validate-schemas \ + --note-schema ./schemas/note.schema.json \ + --chunk-schema ./schemas/chunk.schema.json \ + --edge-schema ./schemas/edge.schema.json - # Nur eine Datei (Diagnose) - python3 -m scripts.import_markdown --vault ./vault --only-path ./vault/30_projects/project-demo.md --apply + # Nur eine Datei importieren + python3 -m scripts.import_markdown --path ./vault/40_concepts/concept-alpha.md --apply - # Sync-Deletes (Dry-Run → Apply) - python3 -m scripts.import_markdown --vault ./vault --sync-deletes - python3 -m scripts.import_markdown --vault ./vault --sync-deletes --apply + # Version anzeigen + python3 -m scripts.import_markdown --version + +ENV (Auszug): + COLLECTION_PREFIX Prefix der Qdrant-Collections (Default: mindnet) + QDRANT_URL / QDRANT_API_KEY Qdrant-Verbindung + + # Hash-Steuerung + MINDNET_HASH_COMPARE body | frontmatter | full (Default: body) + MINDNET_HASH_SOURCE parsed | raw (Default: parsed) + MINDNET_HASH_NORMALIZE canonical | whitespace | none (Default: canonical) + + # Embeddings (nur wenn --with-embeddings) + EMBED_URL z. B. http://127.0.0.1:8000/embed + EMBED_MODEL Freitext (nur Logging) + EMBED_BATCH Batchgröße (Default: 16) + +Abwärtskompatibilität: + - Felder & Flows aus v3.7.x bleiben erhalten. + - Neue Features sind optional (default OFF). + - Bestehende IDs/Signaturen unverändert. + +Lizenz: MIT (projektintern) """ -from __future__ import annotations -import argparse -import json +__version__ = "3.9.0" + import os import sys -from typing import Dict, List, Optional, Tuple, Any, Set +import re +import json +import argparse +import pathlib +from typing import Any, Dict, List, Optional, Iterable, Tuple -from dotenv import load_dotenv -from qdrant_client.http import models as rest - -from app.core.parser import ( - read_markdown, - normalize_frontmatter, - validate_required_frontmatter, -) +# Core-Bausteine (bestehend) +from app.core.parser import read_markdown from app.core.note_payload import make_note_payload -from app.core.chunker import assemble_chunks from app.core.chunk_payload import make_chunk_payloads -try: - from app.core.derive_edges import build_edges_for_note -except Exception: # pragma: no cover - from app.core.edges import build_edges_for_note # type: ignore -from app.core.qdrant import ( - QdrantConfig, - get_client, - ensure_collections, - ensure_payload_indexes, -) +from app.core.derive_edges import build_edges_for_note +from app.core.qdrant import get_client, QdrantConfig from app.core.qdrant_points import ( - points_for_chunks, - points_for_note, - points_for_edges, - upsert_batch, + ensure_collections_for_prefix, + upsert_notes, upsert_chunks, upsert_edges, + delete_chunks_of_note, delete_edges_of_note, + fetch_note_hash_signature, store_note_hashes_signature, ) +from app.core.type_registry import load_type_registry # optional -try: - from app.core.embed import embed_texts # optional -except Exception: - embed_texts = None +# --------------------------- +# Hash-Option-C Steuerung +# --------------------------- +DEFAULT_COMPARE = os.environ.get("MINDNET_HASH_COMPARE", "body").lower() +DEFAULT_SOURCE = os.environ.get("MINDNET_HASH_SOURCE", "parsed").lower() +DEFAULT_NORM = os.environ.get("MINDNET_HASH_NORMALIZE", "canonical").lower() +VALID_COMPARE = {"body", "frontmatter", "full"} +VALID_SOURCE = {"parsed", "raw"} +VALID_NORM = {"canonical", "whitespace", "none"} +def _active_hash_key(compare: str, source: str, normalize: str) -> str: + c = compare if compare in VALID_COMPARE else "body" + s = source if source in VALID_SOURCE else "parsed" + n = normalize if normalize in VALID_NORM else "canonical" + return f"{c}:{s}:{n}" -# --------------------------------------------------------------------- -# Helper -# --------------------------------------------------------------------- +# --------------------------- +# Schema-Validierung (optional) +# --------------------------- +def _load_json(path: Optional[str]) -> Optional[Dict[str, Any]]: + if not path: + return None + p = pathlib.Path(path) + if not p.exists(): + return None + with p.open("r", encoding="utf-8") as f: + return json.load(f) + +def _validate(obj: Dict[str, Any], schema: Optional[Dict[str, Any]], kind: str) -> List[str]: + """Grobe Validierung ohne hard dependency auf jsonschema; prüft Basisfelder.""" + if not schema: + return [] + errs: List[str] = [] + # sehr einfache Checks auf required: + req = schema.get("required", []) + for k in req: + if k not in obj: + errs.append(f"{kind}: missing required '{k}'") + # type=object etc. sparen wir uns bewusst (leichtgewichtig). + return errs + +# --------------------------- +# Embedding (optional) +# --------------------------- +def _post_json(url: str, payload: Any, timeout: float = 60.0) -> Any: + """Einfacher HTTP-Client ohne externe Abhängigkeiten.""" + import urllib.request + import urllib.error + data = json.dumps(payload).encode("utf-8") + req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"}) + try: + with urllib.request.urlopen(req, timeout=timeout) as resp: + return json.loads(resp.read().decode("utf-8")) + except urllib.error.URLError as e: + raise RuntimeError(f"embed http error: {e}") + +def _embed_texts(url: str, texts: List[str], batch: int = 16) -> List[List[float]]: + out: List[List[float]] = [] + for i in range(0, len(texts), batch): + chunk = texts[i:i+batch] + resp = _post_json(url, {"inputs": chunk}) + vectors = resp.get("embeddings") or resp.get("data") or resp # flexibel + if not isinstance(vectors, list): + raise RuntimeError("embed response malformed") + out.extend(vectors) + return out + +# --------------------------- +# Skip-Regeln & Dateiauswahl +# --------------------------- +SILVERBULLET_BASENAMES = {"CONFIG.md", "index.md"} # werden explizit übersprungen + +def _should_skip_md(path: str) -> bool: + base = os.path.basename(path).lower() + if base in {b.lower() for b in SILVERBULLET_BASENAMES}: + return True + return False + +def _list_md_files(root: str, include: Optional[str] = None, exclude: Optional[str] = None) -> List[str]: + files: List[str] = [] + inc_re = re.compile(include) if include else None + exc_re = re.compile(exclude) if exclude else None -def iter_md(root: str) -> List[str]: - out: List[str] = [] for dirpath, _, filenames in os.walk(root): for fn in filenames: if not fn.lower().endswith(".md"): continue - p = os.path.join(dirpath, fn) - pn = p.replace("\\", "/") - if any(ex in pn for ex in ["/.obsidian/", "/_backup_frontmatter/", "/_imported/"]): + full = os.path.join(dirpath, fn) + rel = os.path.relpath(full, root).replace("\\", "/") + if _should_skip_md(full): continue - out.append(p) - return sorted(out) + if inc_re and not inc_re.search(rel): + continue + if exc_re and exc_re.search(rel): + continue + files.append(full) + files.sort() + return files +# --------------------------- +# CLI +# --------------------------- +def _args() -> argparse.Namespace: + ap = argparse.ArgumentParser(description="Import Obsidian Markdown → Qdrant (Notes/Chunks/Edges).") + gsrc = ap.add_mutually_exclusive_group(required=True) + gsrc.add_argument("--vault", help="Root-Verzeichnis des Vaults") + gsrc.add_argument("--path", help="Nur eine einzelne Markdown-Datei importieren") -def collections(prefix: str) -> Tuple[str, str, str]: - return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges" + ap.add_argument("--prefix", help="Collection-Prefix (ENV: COLLECTION_PREFIX, Default: mindnet)") + ap.add_argument("--apply", action="store_true", help="Änderungen in Qdrant schreiben (sonst Dry-Run)") + ap.add_argument("--purge-before-upsert", action="store_true", help="Bei geänderter Note: alte Chunks/Edges löschen (nur diese Note)") + ap.add_argument("--note-scope-refs", action="store_true", help="Auch Note-Scope 'references' + 'backlink' erzeugen") + ap.add_argument("--baseline-modes", action="store_true", help="Fehlende Hash-Signaturen initial speichern") + # Filter + ap.add_argument("--include", help="Regex auf Relativpfad (nur passende Dateien)") + ap.add_argument("--exclude", help="Regex auf Relativpfad (diese Dateien überspringen)") -def fetch_existing_note_payload(client, prefix: str, note_id: str) -> Optional[Dict]: - notes_col, _, _ = collections(prefix) - f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - points, _ = client.scroll( - collection_name=notes_col, - scroll_filter=f, - with_payload=True, - with_vectors=False, - limit=1, - ) - if not points: - return None - return points[0].payload or {} + # Validierung + ap.add_argument("--validate-schemas", action="store_true", help="JSON-Schemata prüfen (leichtgewichtig)") + ap.add_argument("--note-schema", help="Pfad zu note.schema.json") + ap.add_argument("--chunk-schema", help="Pfad zu chunk.schema.json") + ap.add_argument("--edge-schema", help="Pfad zu edge.schema.json") + # Embeddings (optional) + ap.add_argument("--with-embeddings", action="store_true", help="Embeddings für Note & Chunks erzeugen") + ap.add_argument("--embed-url", help="Override EMBED_URL (Default aus ENV)") + ap.add_argument("--embed-batch", type=int, default=int(os.environ.get("EMBED_BATCH", "16")), help="Embedding-Batchgröße") -def list_qdrant_note_ids(client, prefix: str) -> Set[str]: - notes_col, _, _ = collections(prefix) - out: Set[str] = set() - next_page = None - while True: - pts, next_page = client.scroll( - collection_name=notes_col, - with_payload=True, - with_vectors=False, - limit=256, - offset=next_page, - ) - if not pts: - break - for p in pts: - pl = p.payload or {} - nid = pl.get("note_id") - if isinstance(nid, str): - out.add(nid) - if next_page is None: - break - return out - - -def purge_note_artifacts(client, prefix: str, note_id: str) -> None: - _, chunks_col, edges_col = collections(prefix) - filt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - for col in (chunks_col, edges_col): - try: - client.delete( - collection_name=col, - points_selector=rest.FilterSelector(filter=filt), - wait=True - ) - except Exception as e: - print(json.dumps({"note_id": note_id, "warn": f"delete in {col} via filter failed: {e}"})) - - -def delete_note_everywhere(client, prefix: str, note_id: str) -> None: - notes_col, chunks_col, edges_col = collections(prefix) - filt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) - for col in (edges_col, chunks_col, notes_col): - try: - client.delete( - collection_name=col, - points_selector=rest.FilterSelector(filter=filt), - wait=True - ) - except Exception as e: - print(json.dumps({"note_id": note_id, "warn": f"delete in {col} failed: {e}"})) - - -def _resolve_mode(val: Optional[str]) -> str: - v = (val or os.environ.get("MINDNET_HASH_MODE") or os.environ.get("MINDNET_HASH_COMPARE") or "body").strip().lower() - if v in ("full", "fulltext", "body+frontmatter", "bodyplusfrontmatter"): - return "full" - if v in ("frontmatter", "fm"): - return "frontmatter" - return "body" - - -def _env(key: str, default: str) -> str: - return (os.environ.get(key) or default).strip().lower() - - -# --------------------------------------------------------------------- -# Main -# --------------------------------------------------------------------- + ap.add_argument("--version", action="store_true", help="Version anzeigen und beenden") + return ap.parse_args() +# --------------------------- +# Hauptlogik +# --------------------------- def main() -> None: - load_dotenv() - ap = argparse.ArgumentParser() - ap.add_argument("--vault", required=True, help="Pfad zum Obsidian-Vault (Root-Ordner)") - ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant; ohne Flag nur Dry-Run") - ap.add_argument("--purge-before-upsert", action="store_true", - help="Vor Upsert Chunks & Edges der GEÄNDERTEN Note löschen") - ap.add_argument("--note-id", help="Nur eine bestimmte Note-ID verarbeiten") - ap.add_argument("--only-path", help="Exakt diesen Markdown-Pfad verarbeiten (ignoriert --note-id)") - ap.add_argument("--embed-note", action="store_true", help="Optional: Note-Volltext einbetten") - ap.add_argument("--force-replace", action="store_true", - help="Änderungserkennung ignorieren und immer upserten (+ optional Purge)") - ap.add_argument("--hash-mode", choices=["body", "frontmatter", "full"], default=None, - help="Vergleichsmodus (Body | Frontmatter | Full)") - ap.add_argument("--hash-normalize", choices=["canonical", "none"], default=None) - ap.add_argument("--hash-source", choices=["parsed", "raw"], default=None, - help="Quelle für die Hash-Berechnung (Default: parsed)") - ap.add_argument("--note-scope-refs", action="store_true", - help="(Optional) erzeugt zusätzlich references:note/backlink:note (Default: aus)") - ap.add_argument("--debug-hash-diff", action="store_true", - help="(reserviert) optionaler Body-Diff") - ap.add_argument("--compare-text", action="store_true", - help="Parsed fulltext zusätzlich direkt vergleichen (über Hash hinaus)") - ap.add_argument("--baseline-modes", action="store_true", - help="Fehlende Hash-Varianten im Feld 'hashes' still nachtragen (Upsert NUR Notes)") - ap.add_argument("--sync-deletes", action="store_true", - help="Notes/Chunks/Edges löschen, die in Qdrant existieren aber im Vault fehlen (Dry-Run; mit --apply ausführen)") - ap.add_argument("--prefix", help="Collection-Prefix (überschreibt ENV COLLECTION_PREFIX)") - args = ap.parse_args() + args = _args() + if args.version: + print(f"import_markdown.py {__version__}") + sys.exit(0) - mode = _resolve_mode(args.hash_mode) # body|frontmatter|full - src = _env("MINDNET_HASH_SOURCE", args.hash_source or "parsed") # parsed|raw - norm = _env("MINDNET_HASH_NORMALIZE", args.hash_normalize or "canonical") # canonical|none - note_scope_refs_env = (_env("MINDNET_NOTE_SCOPE_REFS", "false") == "true") - note_scope_refs = args.note_scope_refs or note_scope_refs_env - compare_text = args.compare_text or (_env("MINDNET_COMPARE_TEXT", "false") == "true") + # Qdrant + prefix = args.prefix or os.environ.get("COLLECTION_PREFIX", "mindnet") + qc = QdrantConfig.from_env_or_default() + client = get_client(qc) + notes_col, chunks_col, edges_col = ensure_collections_for_prefix(client, prefix) - cfg = QdrantConfig.from_env() - if args.prefix: - cfg.prefix = args.prefix.strip() - client = get_client(cfg) - ensure_collections(client, cfg.prefix, cfg.dim) - ensure_payload_indexes(client, cfg.prefix) + # Type-Registry (optional, fällt auf Default zurück) + type_reg = load_type_registry(silent=True) - root = os.path.abspath(args.vault) + # Hash-Modus aktiv + compare = DEFAULT_COMPARE + source = DEFAULT_SOURCE + norm = DEFAULT_NORM + active_key = _active_hash_key(compare, source, norm) - # Dateiliste bestimmen - if args.only_path: - only = os.path.abspath(args.only_path) - files = [only] + # Schemata (optional) + note_schema = _load_json(args.note_schema) if args.validate_schemas else None + chunk_schema = _load_json(args.chunk_schema) if args.validate_schemas else None + edge_schema = _load_json(args.edge_schema) if args.validate_schemas else None + + # Embeddings (optional) + embed_enabled = bool(args.with_embeddings) + embed_url = args.embed_url or os.environ.get("EMBED_URL", "").strip() + if embed_enabled and not embed_url: + print(json.dumps({"warn": "with-embeddings active, but EMBED_URL not configured — embeddings skipped"})) + embed_enabled = False + + # Dateiliste + files: List[str] = [] + if args.path: + if not os.path.isfile(args.path): + print(json.dumps({"path": args.path, "error": "not a file"})) + sys.exit(1) + if _should_skip_md(args.path): + print(json.dumps({"path": args.path, "skipped": "by rule"})) + sys.exit(0) + files = [os.path.abspath(args.path)] + vault_root = os.path.dirname(os.path.abspath(args.path)) else: - files = iter_md(root) - if not files: - print("Keine Markdown-Dateien gefunden.", file=sys.stderr) - sys.exit(2) - - # Optional: Sync-Deletes vorab - if args.sync_deletes: - vault_note_ids: Set[str] = set() - for path in files: - try: - parsed = read_markdown(path) - if not parsed: - continue - fm = normalize_frontmatter(parsed.frontmatter) - nid = fm.get("id") - if isinstance(nid, str): - vault_note_ids.add(nid) - except Exception: - continue - qdrant_note_ids = list_qdrant_note_ids(client, cfg.prefix) - to_delete = sorted(qdrant_note_ids - vault_note_ids) - print(json.dumps({ - "action": "sync-deletes", - "prefix": cfg.prefix, - "qdrant_total": len(qdrant_note_ids), - "vault_total": len(vault_note_ids), - "to_delete_count": len(to_delete), - "to_delete": to_delete[:50] + (["…"] if len(to_delete) > 50 else []) - }, ensure_ascii=False)) - if args.apply and to_delete: - for nid in to_delete: - print(json.dumps({"action": "delete", "note_id": nid, "decision": "apply"})) - delete_note_everywhere(client, cfg.prefix, nid) - - key_current = f"{mode}:{src}:{norm}" + if not os.path.isdir(args.vault): + print(json.dumps({"vault": args.vault, "error": "not a directory"})) + sys.exit(1) + vault_root = os.path.abspath(args.vault) + files = _list_md_files(vault_root, include=args.include, exclude=args.exclude) processed = 0 + stats = {"notes": 0, "chunks": 0, "edges": 0, "changed": 0, "skipped": 0, "embedded": 0} + for path in files: - # -------- Parse & Validate -------- + rel_path = os.path.relpath(path, vault_root).replace("\\", "/") + parsed = read_markdown(path) + + # Note-Payload (inkl. fulltext, hashes[...] etc.) + note_pl = make_note_payload(parsed, vault_root=vault_root) + if not isinstance(note_pl, dict): + print(json.dumps({ + "path": path, "note_id": getattr(parsed, "id", ""), + "error": "make_note_payload returned non-dict", "returned_type": type(note_pl).__name__ + })) + stats["skipped"] += 1 + continue + + # Exclude via Frontmatter? + if str(note_pl.get("embedding_exclude", "false")).lower() in {"1", "true", "yes"}: + # wir importieren dennoch Note/Chunks/Edges, aber **ohne** Embeddings + embedding_allowed = False + else: + embedding_allowed = True + + # Type-Profil + note_type = str(note_pl.get("type", "concept") or "concept") + profile = type_reg.get("types", {}).get(note_type, {}).get("chunk_profile", None) + + # Chunks erzeugen + chunks = make_chunk_payloads( + note_id=note_pl["note_id"], + body=note_pl.get("fulltext", ""), + note_type=note_type, + profile=profile + ) + + # Edges + edges: List[Dict[str, Any]] = [] try: - parsed = read_markdown(path) + edges = build_edges_for_note(note_payload=note_pl, chunks=chunks, add_note_scope_refs=args.note_scope_refs) except Exception as e: - print(json.dumps({"path": path, "error": f"read_markdown failed: {type(e).__name__}: {e}"})) - continue - if parsed is None: - print(json.dumps({"path": path, "error": "read_markdown returned None"})) - continue + print(json.dumps({ + "path": path, "note_id": note_pl["note_id"], + "error": f"build_edges_for_note failed: {getattr(e, 'args', [''])[0]}" + })) + edges = [] - try: - fm = normalize_frontmatter(parsed.frontmatter) - validate_required_frontmatter(fm) - except Exception as e: - print(json.dumps({"path": path, "error": f"Frontmatter invalid: {type(e).__name__}: {e}"})) - continue + # Schema-Checks (weich) + if args.validate_schemas: + n_err = _validate(note_pl, note_schema, "note") + for c in chunks: + n_err += _validate(c, chunk_schema, "chunk") + for ed in edges: + n_err += _validate(ed, edge_schema, "edge") + if n_err: + print(json.dumps({"note_id": note_pl["note_id"], "schema_warnings": n_err}, ensure_ascii=False)) - if args.note_id and not args.only_path and fm.get("id") != args.note_id: - continue + # Hash-Vergleich + prev_sig = fetch_note_hash_signature(client, notes_col, note_pl["note_id"], active_key) + curr_sig = note_pl.get("hashes", {}).get(active_key, "") + is_changed = (prev_sig != curr_sig) + # Baseline: fehlende aktive Signatur speichern + if args.baseline_modes and not prev_sig and curr_sig and args.apply: + store_note_hashes_signature(client, notes_col, note_pl["note_id"], active_key, curr_sig) + + # Embeddings (optional; erst NACH Änderungserkennung, um unnötige Calls zu sparen) + if embed_enabled and embedding_allowed: + try: + texts = [note_pl.get("fulltext", "")] + note_vecs = _embed_texts(embed_url, texts, batch=max(1, int(args.embed_batch))) + note_pl["embedding"] = note_vecs[0] if note_vecs else None + # Chunk-Embeddings + chunk_texts = [c.get("window") or c.get("text") or "" for c in chunks] + if chunk_texts: + chunk_vecs = _embed_texts(embed_url, chunk_texts, batch=max(1, int(args.embed_batch))) + for c, v in zip(chunks, chunk_vecs): + c["embedding"] = v + stats["embedded"] += 1 + except Exception as e: + print(json.dumps({"note_id": note_pl["note_id"], "warn": f"embedding failed: {e}"})) + + # Apply/Upsert + decision = "dry-run" + if args.apply: + if is_changed and args.purge_before_upsert: + delete_chunks_of_note(client, chunks_col, note_pl["note_id"]) + delete_edges_of_note(client, edges_col, note_pl["note_id"]) + + upsert_notes(client, notes_col, [note_pl]) + if chunks: + upsert_chunks(client, chunks_col, chunks) + if edges: + upsert_edges(client, edges_col, edges) + + if curr_sig: + store_note_hashes_signature(client, notes_col, note_pl["note_id"], active_key, curr_sig) + + decision = ("apply" if is_changed else "apply-skip-unchanged") + else: + decision = "dry-run" + + # Log + print(json.dumps({ + "note_id": note_pl["note_id"], + "title": note_pl.get("title"), + "chunks": len(chunks), + "edges": len(edges), + "changed": bool(is_changed), + "decision": decision, + "path": rel_path, + "hash_mode": compare, + "hash_normalize": norm, + "hash_source": source, + "prefix": prefix + }, ensure_ascii=False)) + + stats["notes"] += 1 + stats["chunks"] += len(chunks) + stats["edges"] += len(edges) + if is_changed: + stats["changed"] += 1 processed += 1 - # -------- Build new payload (includes 'hashes') -------- - note_pl = make_note_payload( - parsed, - vault_root=root, - hash_mode=mode, - hash_normalize=norm, - hash_source=src, - file_path=path, - ) - if not note_pl.get("fulltext"): - note_pl["fulltext"] = getattr(parsed, "body", "") or "" - - note_id = note_pl.get("note_id") or fm.get("id") - if not note_id: - print(json.dumps({"path": path, "error": "Missing note_id after payload build"})) - continue - - # -------- Fetch old payload -------- - old_payload = None if args.force_replace else fetch_existing_note_payload(client, cfg.prefix, note_id) - has_old = old_payload is not None - - old_hashes = (old_payload or {}).get("hashes") or {} - old_hash_exact = old_hashes.get(key_current) - new_hash_exact = (note_pl.get("hashes") or {}).get(key_current) - needs_baseline = (old_hash_exact is None) - - hash_changed = (old_hash_exact is not None and new_hash_exact is not None and old_hash_exact != new_hash_exact) - - text_changed = False - if compare_text: - old_text = (old_payload or {}).get("fulltext") or "" - new_text = note_pl.get("fulltext") or "" - text_changed = (old_text != new_text) - - changed = args.force_replace or (not has_old) or hash_changed or text_changed - do_baseline_only = (args.baseline_modes and has_old and needs_baseline and not changed) - - # -------- Chunks / Embeddings -------- - chunk_pls: List[Dict[str, Any]] = [] - try: - body_text = getattr(parsed, "body", "") or "" - chunks = assemble_chunks(fm["id"], body_text, fm.get("type", "concept")) - chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) - except Exception as e: - print(json.dumps({"path": path, "note_id": note_id, "error": f"chunk build failed: {type(e).__name__}: {e}"})) - continue - - vecs: List[List[float]] = [[0.0] * cfg.dim for _ in chunk_pls] - if embed_texts and chunk_pls: - try: - texts_for_embed = [(pl.get("window") or pl.get("text") or "") for pl in chunk_pls] - vecs = embed_texts(texts_for_embed) - except Exception as e: - print(json.dumps({"path": path, "note_id": note_id, "warn": f"embed_texts failed, using zeros: {e}"})) - - # -------- Edges (robust) -------- - edges: List[Dict[str, Any]] = [] - edges_failed = False - if changed and (not do_baseline_only): - try: - note_refs = note_pl.get("references") or [] - edges = build_edges_for_note( - note_id, - chunk_pls, - note_level_references=note_refs, - include_note_scope_refs=note_scope_refs, - ) - except Exception as e: - edges_failed = True - edges = [] - # WICHTIG: Wir brechen NICHT mehr ab — Note & Chunks werden geschrieben. - print(json.dumps({"path": path, "note_id": note_id, "warn": f"build_edges_for_note failed, skipping edges: {type(e).__name__}: {e}"})) - - # -------- Summary -------- - summary = { - "note_id": note_id, - "title": fm.get("title"), - "chunks": len(chunk_pls), - "edges": len(edges), - "edges_failed": edges_failed, - "changed": changed, - "needs_baseline_for_mode": needs_baseline, - "decision": ("baseline-only" if args.apply and do_baseline_only else - "apply" if args.apply and changed else - "apply-skip-unchanged" if args.apply and not changed else - "dry-run"), - "path": note_pl["path"], - "hash_mode": mode, - "hash_normalize": norm, - "hash_source": src, - "prefix": cfg.prefix, - } - print(json.dumps(summary, ensure_ascii=False)) - - # -------- Writes -------- - if not args.apply: - continue - - if do_baseline_only: - merged_hashes = {} - merged_hashes.update(old_hashes) - merged_hashes.update(note_pl.get("hashes") or {}) - if old_payload: - note_pl["hash_fulltext"] = old_payload.get("hash_fulltext", note_pl.get("hash_fulltext")) - note_pl["hash_signature"] = old_payload.get("hash_signature", note_pl.get("hash_signature")) - note_pl["hashes"] = merged_hashes - notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim) - upsert_batch(client, notes_name, note_pts) - continue - - if not changed: - continue - - if args.purge_before_upsert and has_old: - try: - purge_note_artifacts(client, cfg.prefix, note_id) - except Exception as e: - print(json.dumps({"path": path, "note_id": note_id, "warn": f"purge failed: {e}"})) - - notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim) - upsert_batch(client, notes_name, note_pts) - if chunk_pls: - chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs) - upsert_batch(client, chunks_name, chunk_pts) - if edges: - edges_name, edge_pts = points_for_edges(cfg.prefix, edges) - upsert_batch(client, edges_name, edge_pts) - print(f"Done. Processed notes: {processed}") + print(json.dumps({"stats": stats}, ensure_ascii=False)) if __name__ == "__main__":