scripts/import_markdown.py updated
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s

This commit is contained in:
Lars 2025-11-08 08:31:49 +01:00
parent b5958a9f63
commit 2041771b14


@@ -1,412 +1,317 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Module: scripts/import_markdown.py
Version: 3.9.1
Date: 2025-11-08

Purpose
-------
Idempotent importer of Obsidian Markdown files (vault) into Qdrant:
reads Markdown (fault-tolerant), validates frontmatter lightly,
chunks the body, derives edges,
upserts notes/chunks/edges,
detects changes reliably via signatures (hash modes),
optional type registry (config/types.yaml) for type-specific profiles (no-op if the file is missing).

Change (3.9.1)
--------------
Removes the broken import of ensure_collections_for_prefix from app.core.qdrant_points.
Collections are now created solely via app.core.qdrant.ensure_collections(...).

Key features
------------
Hash/change-detection logic (ENV, backward compatible):
    MINDNET_HASH_COMPARE   = Body|Full|Frontmatter|Body+Frontmatter (default: Body)
    MINDNET_HASH_SOURCE    = parsed|raw (default: parsed)
    MINDNET_HASH_NORMALIZE = canonical|none (default: canonical)
    Persisted fields: hash_signature, hash_fulltext, hash_body, hash_frontmatter
    Optional baseline step: --baseline-modes
CLI options:
    --apply               : write to Qdrant (otherwise: dry run)
    --purge-before-upsert : delete a note's chunks/edges before upsert when 'changed'
    --note-scope-refs     : additionally emit note-scope references/backlinks
    --sync-deletes        : delete Qdrant notes missing from the vault (only with --apply)
    --baseline-modes      : persist all hash variants as a baseline
    --prefix              : collection prefix (else ENV COLLECTION_PREFIX or 'mindnet')
Parser:
    fault-tolerant (BOM, latin-1 fallback, NUL strip); yields parsed.frontmatter, parsed.body,
    optionally parsed.body_full, parsed.chunks
Type registry (optional, backward compatible):
    config/types.yaml controls chunk profiles per type; defaults apply if absent
Qdrant:
    Collections: <prefix>_notes, <prefix>_chunks, <prefix>_edges
    Safe ensure_collections(), deterministic IDs (note_id, chunk_id = note_id#n)

Example invocations
-------------------
export COLLECTION_PREFIX="mindnet"
python3 -m scripts.import_markdown --vault ./vault
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert
python3 -m scripts.import_markdown --vault ./vault --apply --note-scope-refs
python3 -m scripts.import_markdown --vault ./vault --apply --baseline-modes
python3 -m scripts.import_markdown --vault ./vault --sync-deletes --apply

Compatibility
-------------
Expected modules:
    app.core.parser (read_markdown)
    app.core.note_payload (make_note_payload)
    app.core.chunk_payload (make_chunk_payloads)
    app.core.derive_edges (build_edges_for_note)
    app.core.qdrant (QdrantConfig, get_client, ensure_collections, count_points, list_note_ids, fetch_one_note)
    app.core.qdrant_points (upsert_notes, upsert_chunks, upsert_edges, delete_by_note)
    app.core.type_registry (optional; load_type_registry), loaded only if present
"""
from __future__ import annotations

import argparse
import os
import sys
import json
from pathlib import Path
from typing import Dict, List, Tuple, Optional

# Parser (fault-tolerant)
from app.core.parser import read_markdown

# Payload builders
from app.core.note_payload import make_note_payload
from app.core.chunk_payload import make_chunk_payloads
from app.core.derive_edges import build_edges_for_note

# Qdrant glue
from app.core.qdrant import (
    QdrantConfig,
    get_client,
    ensure_collections,
    count_points,
    list_note_ids as qdrant_list_note_ids,
    fetch_one_note,
)
from app.core.qdrant_points import (
    upsert_notes,
    upsert_chunks,
    upsert_edges,
    delete_by_note,
)
# Load the type registry optionally (no-op if the module is missing)
try:
    from app.core.type_registry import load_type_registry  # type: ignore
except Exception:
    def load_type_registry(_path: str = "config/types.yaml") -> dict:
        return {}
# ----------------- Hash / change-detection logic ----------------- #

def _env(k: str, default: str) -> str:
    v = os.environ.get(k, "").strip()
    return v or default

HASH_COMPARE = _env("MINDNET_HASH_COMPARE", "Body")           # Body|Full|Frontmatter|Body+Frontmatter
HASH_SOURCE = _env("MINDNET_HASH_SOURCE", "parsed")           # parsed|raw
HASH_NORMALIZE = _env("MINDNET_HASH_NORMALIZE", "canonical")  # canonical|none

import hashlib

def _normalize_text(s: str) -> str:
    if HASH_NORMALIZE.lower() != "canonical":
        return s
    s = s.replace("\r\n", "\n").replace("\r", "\n")
    s = s.replace("\x00", "")
    s = "\n".join(line.rstrip() for line in s.split("\n"))
    return s

def _sha256_hex(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8", errors="ignore")).hexdigest()

def _hash_signature_from_parsed(parsed) -> Dict[str, str]:
    fm = parsed.frontmatter or {}
    fulltext = (parsed.body_full or parsed.body or "") if HASH_SOURCE.lower() == "parsed" else (getattr(parsed, "raw", "") or "")
    front = json.dumps(fm, sort_keys=True, ensure_ascii=False)
    fulltext_n = _normalize_text(fulltext)
    body_n = _normalize_text(parsed.body or "")
    front_n = _normalize_text(front)
    return {
        "hash_fulltext": _sha256_hex(fulltext_n),
        "hash_body": _sha256_hex(body_n),
        "hash_frontmatter": _sha256_hex(front_n),
    }

def _is_changed(prior: Dict[str, str], now: Dict[str, str]) -> Tuple[bool, str]:
    mode = HASH_COMPARE.lower()
    if mode == "body":
        return (prior.get("hash_body") != now.get("hash_body"), "body")
    if mode == "frontmatter":
        return (prior.get("hash_frontmatter") != now.get("hash_frontmatter"), "frontmatter")
    if mode == "full":
        return (prior.get("hash_fulltext") != now.get("hash_fulltext"), "full")
    if mode == "body+frontmatter":
        a = prior.get("hash_body") != now.get("hash_body")
        b = prior.get("hash_frontmatter") != now.get("hash_frontmatter")
        return (a or b, "body+frontmatter")
    return (prior.get("hash_body") != now.get("hash_body"), "body")
# ----------------- CLI ----------------- #

def parse_args() -> argparse.Namespace:
    p = argparse.ArgumentParser(prog="import_markdown.py", description="Imports an Obsidian vault into Qdrant (notes/chunks/edges).")
    p.add_argument("--vault", required=True, help="Path to the vault root (contains .md files)")
    p.add_argument("--apply", action="store_true", help="Actually write changes (otherwise: dry run)")
    p.add_argument("--purge-before-upsert", action="store_true", help="For changed notes, delete their chunks/edges beforehand")
    p.add_argument("--note-scope-refs", action="store_true", help="Additionally emit note-scope references/backlinks")
    p.add_argument("--sync-deletes", action="store_true", help="Delete Qdrant points that no longer exist in the vault (only with --apply)")
    p.add_argument("--baseline-modes", action="store_true", help="Persist hash fields (full/body/frontmatter) as a baseline")
    p.add_argument("--prefix", default="", help="Collection prefix; overrides ENV COLLECTION_PREFIX")
    return p.parse_args()
# ----------------- Files & paths ----------------- #

def _iter_md(vault_root: Path) -> List[Path]:
    files = []
    for p in vault_root.rglob("*.md"):
        name = p.name.lower()
        # Ignore known non-vault files (Silverbullet etc.):
        if name in ("config.md", "index.md"):
            continue
        files.append(p)
    files.sort()
    return files

def _rel_path(root: Path, p: Path) -> str:
    rel = str(p.relative_to(root)).replace("\\", "/")
    while rel.startswith("/"):
        rel = rel[1:]
    return rel

# ----------------- Main ----------------- #
def main():
    args = parse_args()
    vault_root = Path(args.vault).resolve()
    if not vault_root.exists():
        print(json.dumps({"error": "vault_not_found", "path": str(vault_root)}))
        sys.exit(2)

    prefix = args.prefix.strip() or os.environ.get("COLLECTION_PREFIX", "").strip() or "mindnet"
    cfg = QdrantConfig.from_env(prefix=prefix)
    client = get_client(cfg)
    ensure_collections(client, cfg)

    # Preload existing note IDs and the type registry
    existing_note_ids = set(qdrant_list_note_ids(client, cfg.notes))
    type_reg = load_type_registry("config/types.yaml") or {}

    paths = _iter_md(vault_root)
    seen_note_ids: List[str] = []

    for p in paths:
        parsed = read_markdown(str(p))
        fm = parsed.frontmatter or {}
        note_id = str(fm.get("id") or "").strip()
        if not note_id:
            print(json.dumps({"path": str(p), "error": "missing_frontmatter_id"}))
            continue

        # Type registry (soft check only)
        note_type = str(fm.get("type", "") or "").lower()
        if note_type and type_reg and note_type not in (type_reg.get("types") or {}):
            print(json.dumps({"note_id": note_id, "warn": f"unknown_type_in_registry:{note_type}", "fallback": "no-op"}))

        rel_path = _rel_path(vault_root, p)

        # Note payload
        note_pl = make_note_payload(parsed, vault_root=str(vault_root))
        if not isinstance(note_pl, dict):
            note_pl = {"note_id": note_id, "path": rel_path, "title": fm.get("title", ""), "status": fm.get("status", "draft"), "tags": fm.get("tags", [])}

        # Attach fulltext (for export round-trips)
        note_pl["fulltext"] = parsed.body_full or parsed.body or ""

        # Compute hashes and write the signature
        now_hashes = _hash_signature_from_parsed(parsed)
        note_pl.update(now_hashes)
        note_pl["hash_signature"] = f"{HASH_COMPARE.lower()}:{HASH_SOURCE.lower()}:{HASH_NORMALIZE.lower()}:{now_hashes.get('hash_body', '')}"

        # Fetch prior hashes from Qdrant (if any) to detect real changes
        prior_hashes = {}
        if note_id in existing_note_ids:
            try:
                existing = fetch_one_note(client, cfg, note_id)
                if isinstance(existing, dict):
                    for k in ("hash_fulltext", "hash_body", "hash_frontmatter"):
                        if k in existing:
                            prior_hashes[k] = existing[k]
            except Exception:
                prior_hashes = {}

        # Changed?
        if prior_hashes:
            changed, mode_used = _is_changed(prior_hashes, now_hashes)
        else:
            changed, mode_used = (True, HASH_COMPARE.lower())

        # Build chunks (the chunker may supply windows; otherwise chunk_payload synthesizes one)
        chunks = parsed.chunks or []
        chunk_payloads = make_chunk_payloads(fm, rel_path, chunks, note_text=parsed.body or "")

        # Derive edges
        edges = build_edges_for_note(
            note_id=note_id,
            chunks=chunk_payloads,
            note_level_references=fm.get("references", None),
            include_note_scope_refs=args.note_scope_refs,
        )

        # Log
        print(json.dumps({
            "note_id": note_id,
            "title": fm.get("title", ""),
            "chunks": len(chunk_payloads),
            "edges": len(edges),
            "changed": bool(changed),
            "decision": ("apply" if args.apply else "dry-run") if changed else ("apply-skip-unchanged" if args.apply else "dry-run"),
            "path": rel_path,
            "hash_mode": HASH_COMPARE,
            "hash_normalize": HASH_NORMALIZE,
            "hash_source": HASH_SOURCE,
            "prefix": prefix
        }, ensure_ascii=False))

        seen_note_ids.append(note_id)
        if not args.apply:
            continue

        if changed and args.purge_before_upsert:
            delete_by_note(client, cfg, note_id)
        upsert_notes(client, cfg, [note_pl])
        if chunk_payloads:
            upsert_chunks(client, cfg, chunk_payloads)
        if edges:
            upsert_edges(client, cfg, edges)
        if args.baseline_modes:
            # The hash baseline is already persisted in note_pl; no extra action needed.
            pass

    # Sync deletes (optional)
    if args.sync_deletes:
        vault_ids = set(seen_note_ids)
        to_delete = sorted(existing_note_ids - vault_ids)
        print(json.dumps({"sync_deletes_preview": len(to_delete), "items": to_delete[:50]}, ensure_ascii=False))
        if args.apply:
            for nid in to_delete:
                delete_by_note(client, cfg, nid)

    # Summary
    counts = count_points(client, cfg)
    print(json.dumps({"prefix": prefix, "collections": {"notes": cfg.notes, "chunks": cfg.chunks, "edges": cfg.edges}, "counts": counts}, ensure_ascii=False))
if __name__ == "__main__":
    main()
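The script's change detection (canonical normalization, then SHA-256 per scope, then a mode-dependent comparison) can be illustrated in isolation. The sketch below re-implements the idea standalone for body-mode comparison only; the helper names mirror the module's `_normalize_text`/`_sha256_hex`/`_is_changed` but are simplified illustrations, not the module itself:

```python
import hashlib
import json

def normalize(s: str) -> str:
    # Canonical mode: unify line endings, strip NULs and trailing whitespace.
    s = s.replace("\r\n", "\n").replace("\r", "\n").replace("\x00", "")
    return "\n".join(line.rstrip() for line in s.split("\n"))

def sha256_hex(s: str) -> str:
    return hashlib.sha256(s.encode("utf-8", errors="ignore")).hexdigest()

def hashes(body: str, frontmatter: dict) -> dict:
    # Frontmatter is hashed via a canonical JSON dump, like the importer does.
    front = json.dumps(frontmatter, sort_keys=True, ensure_ascii=False)
    return {
        "hash_body": sha256_hex(normalize(body)),
        "hash_frontmatter": sha256_hex(normalize(front)),
    }

def changed_body_mode(prior: dict, now: dict) -> bool:
    # MINDNET_HASH_COMPARE=Body: only the body hash decides.
    return prior.get("hash_body") != now.get("hash_body")

old = hashes("Line one\r\nLine two  \n", {"id": "n1", "title": "Alpha"})
new_same = hashes("Line one\nLine two\n", {"id": "n1", "title": "Alpha"})
new_edit = hashes("Line one\nLine two, edited\n", {"id": "n1", "title": "Alpha"})

print(changed_body_mode(old, new_same))  # False: only EOL/trailing whitespace differ
print(changed_body_mode(old, new_edit))  # True: body content changed
```

This is why a CRLF-only re-save of a note is reported as unchanged and skipped under `--purge-before-upsert`: canonical normalization erases the difference before hashing.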