scripts/import_markdown.py updated

Commit a39b2a6950 (parent 70fa95966c) by Lars, 2025-11-08 11:53:06 +01:00


#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Modul: scripts/import_markdown.py
Version: 3.9.1
Datum: 2025-11-08
Script: scripts/import_markdown.py
Version: 3.9.2
Date: 2025-11-08
Zweck
-----
Idempotenter Importer für Obsidian-Markdown-Dateien (Vault) nach Qdrant:
liest Markdown (fehlertolerant), validiert Frontmatter leichtgewichtig,
chunked den Body, erzeugt Edges,
Upsert von Notes/Chunks/Edges,
Änderungen über Signaturen (Hash-Modi) zuverlässig ermitteln,
optional Type-Registry (config/types.yaml) für typabhängige Profile (no-op, wenn Datei fehlt).
Änderung (3.9.1)
----------------
Entfernt fehlerhaften Import von ensure_collections_for_prefix aus app.core.qdrant_points.
Collection-Anlage erfolgt allein via app.core.qdrant.ensure_collections(...).
Wichtige Features
-----------------
Hash-/Änderungslogik (ENV, abwärtskompatibel):
MINDNET_HASH_COMPARE = Body|Full|Frontmatter|Body+Frontmatter (Default: Body)
MINDNET_HASH_SOURCE = parsed|raw (Default: parsed)
MINDNET_HASH_NORMALIZE = canonical|none (Default: canonical)
Persistiert:
hash_signature, hash_fulltext, hash_body, hash_frontmatter
Optionaler Baseline-Schritt: --baseline-modes
CLI-Optionen:
--apply : schreibt nach Qdrant (sonst Dry-Run)
--purge-before-upsert : löscht Chunks/Edges der Note vor Upsert, wenn 'changed'
--note-scope-refs : ergänzt note-scope references/backlinks
--sync-deletes : löscht Qdrant-Notes, die im Vault fehlen (nur mit --apply)
--baseline-modes : persistiert alle Hashvarianten als Baseline
--prefix : Collections-Prefix (sonst ENV COLLECTION_PREFIX oder 'mindnet')
Parser:
fehlertolerant (BOM, latin-1-Fallback, NUL-Strip), liefert parsed.frontmatter, parsed.body,
optional parsed.body_full, parsed.chunks
Type-Registry (optional, abwärtskompatibel):
config/types.yaml steuert Chunk-Profile pro type; wenn nicht vorhanden Default
Qdrant:
Collections: <prefix>_notes, <prefix>_chunks, <prefix>_edges
Sicheres ensure_collections(), deterministische IDs (note_id, chunk_id = note_id#n)
Aufrufe (Beispiele)
-------------------
export COLLECTION_PREFIX="mindnet"
python3 -m scripts.import_markdown --vault ./vault
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert
python3 -m scripts.import_markdown --vault ./vault --apply --note-scope-refs
python3 -m scripts.import_markdown --vault ./vault --apply --baseline-modes
python3 -m scripts.import_markdown --vault ./vault --sync-deletes --apply
Purpose
-------
Importer für Obsidian-Markdown-Notizen in Qdrant.
- Liest Frontmatter + Body
- erzeugt Note-/Chunk-Payloads
- leitet Edges ab
- Upsert in Qdrant (Notes, Chunks, Edges)
- Hash-basierte Änderungsdetektion (konfigurierbar via ENV)
Kompatibilität
--------------
Erwartete Module:
app.core.parser (read_markdown)
app.core.note_payload (make_note_payload)
app.core.chunk_payload (make_chunk_payloads)
app.core.derive_edges (build_edges_for_note)
app.core.qdrant (QdrantConfig, get_client, ensure_collections, count_points, list_note_ids, fetch_one_note)
app.core.qdrant_points (upsert_notes, upsert_chunks, upsert_edges, delete_by_note)
app.core.type_registry (optional; load_type_registry) nur geladen, wenn vorhanden
- Funktioniert mit Parsern, die NUR `body` bereitstellen (ohne `body_full`)
- Unterstützt bestehende ENV-Variablen (COLLECTION_PREFIX / MINDNET_PREFIX)
- Nutzt Wrapper aus app.core.qdrant / app.core.qdrant_points (siehe v1.8.0 / v1.7.0)
- Fällt bei fehlenden neuen Funktionen auf vorhandene Defaults zurück
Usage
-----
export COLLECTION_PREFIX="mindnet"
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert --prefix "$COLLECTION_PREFIX"
Optional flags:
--note-scope-refs : extrahiert auch note-scope References
--baseline-modes : legt Basis-Hashes für Body/Frontmatter/Full an (falls genutzt)
--dry-run / (kein --apply): zeigt nur Entscheidungen an
ENV (Hash-Steuerung)
--------------------
MINDNET_HASH_COMPARE : Body | Frontmatter | Full (default: Body)
MINDNET_HASH_SOURCE : parsed | raw (default: parsed)
MINDNET_HASH_NORMALIZE: canonical | none (default: canonical)
"""
from __future__ import annotations

import argparse
import json
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Core imports (fault-tolerant parser)
from app.core.parser import read_markdown
# Payload builders
from app.core.note_payload import make_note_payload
from app.core.chunk_payload import make_chunk_payloads
from app.core.derive_edges import build_edges_for_note
# Qdrant glue
from app.core.qdrant import (
QdrantConfig,
get_client,
ensure_collections,
count_points,
    list_note_ids as qdrant_list_note_ids,
fetch_one_note,
)
from app.core.qdrant_points import (
upsert_notes,
upsert_chunks,
    upsert_edges,
delete_by_note,
)
from app.core.env_vars import get_collection_prefix
# ----------------- Helpers ----------------- #
def _env(key: str, default: str = "") -> str:
v = os.environ.get(key, "")
return v if v != "" else default
def _hash_mode() -> str:
m = _env("MINDNET_HASH_COMPARE", "Body").lower()
if m not in ("body", "frontmatter", "full"):
m = "body"
return m
def _hash_source() -> str:
s = _env("MINDNET_HASH_SOURCE", "parsed").lower()
if s not in ("parsed", "raw"):
s = "parsed"
return s
def _hash_normalize() -> str:
n = _env("MINDNET_HASH_NORMALIZE", "canonical").lower()
if n not in ("canonical", "none"):
n = "canonical"
return n
def _safe_text(parsed) -> str:
    """
    Returns parsed.body_full if available, else parsed.body, else "".
    Compatibility helper for parsers without 'body_full'.
    """
    return getattr(parsed, "body_full", None) or getattr(parsed, "body", "") or ""
def _load_prefix(arg_prefix: Optional[str]) -> str:
    # Precedence: CLI --prefix > COLLECTION_PREFIX > MINDNET_PREFIX > "mindnet"
    if arg_prefix and arg_prefix.strip():
        return arg_prefix.strip()
    env_prefix = os.environ.get("COLLECTION_PREFIX") or os.environ.get("MINDNET_PREFIX")
    return (env_prefix or "mindnet").strip()
# ----------------- CLI ----------------- #
def _print(obj):
sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n")
sys.stdout.flush()
# ----------------- Files & paths ----------------- #
def _iter_md(vault_root: Path) -> List[Path]:
    files: List[Path] = []
    for p in vault_root.rglob("*.md"):
        name = p.name.lower()
        # skip well-known non-vault files (Silverbullet etc.):
        if name in ("config.md", "index.md"):
            continue
        files.append(p)
    files.sort()
    return files
def process_file(
path: Path,
cfg: QdrantConfig,
note_scope_refs: bool = False,
apply: bool = False,
purge_before_upsert: bool = False,
) -> Tuple[Optional[dict], List[dict], List[dict]]:
"""
Liest eine Datei, erzeugt Note-/Chunk-/Edge-Payloads.
Gibt (note_payload, chunk_payloads, edge_payloads) zurück.
"""
try:
parsed = read_markdown(str(path))
except Exception as e:
_print({"path": str(path), "error": f"read_markdown failed: {e.__class__.__name__}: {e}"})
return None, [], []
    # Note
    try:
        note_pl = make_note_payload(parsed, vault_root=str(path.parent.parent))  # vault_root is only used for path fields
        if not isinstance(note_pl, dict):
            # Fallback if an older make_note_payload version returns
            # something else or None
            note_pl = {
                "note_id": parsed.frontmatter.get("id") or path.stem,
                "title": parsed.frontmatter.get("title") or path.stem,
                "status": parsed.frontmatter.get("status", "unknown"),
                "path": str(path).replace("\\", "/"),
                "tags": parsed.frontmatter.get("tags", []),
            }
        # Robust fulltext field
        note_pl["fulltext"] = _safe_text(parsed)
        # Attach hash metadata (without touching the existing DB logic)
        note_pl["hash_signature"] = f"{_hash_mode()}:{_hash_source()}:{_hash_normalize()}"
    except Exception as e:
        _print({"path": str(path), "error": f"make_note_payload failed: {e}"})
        return None, [], []
# Chunks
try:
chunks = make_chunk_payloads(parsed, note_pl)
if not isinstance(chunks, list):
chunks = []
except Exception as e:
_print({"path": str(path), "note_id": note_pl.get("note_id"), "error": f"make_chunk_payloads failed: {e}"})
chunks = []
# Edges
try:
edges = build_edges_for_note(parsed, chunks, note_scope_refs=note_scope_refs)
except Exception as e:
_print({"path": str(path), "note_id": note_pl.get("note_id"), "error": f"build_edges_for_note failed: {e}"})
edges = []
return note_pl, chunks, edges
# ----------------- Main ----------------- #
def main():
    ap = argparse.ArgumentParser(description="Import Obsidian Markdown notes into Qdrant (notes/chunks/edges).")
    ap.add_argument("--vault", required=True, help="Path to the vault directory (root).")
    ap.add_argument("--apply", action="store_true", help="Apply changes (upsert into Qdrant).")
    ap.add_argument("--purge-before-upsert", action="store_true", help="Delete each note's chunks/edges before upserting.")
    ap.add_argument("--note-scope-refs", action="store_true", help="Derive note-scope references.")
    ap.add_argument("--baseline-modes", action="store_true", help="(Optional) prepare baseline hashes.")
    ap.add_argument("--prefix", required=False, help="Collection prefix (overrides ENV).")
    args = ap.parse_args()

    vault = Path(args.vault).resolve()
    if not vault.exists():
        ap.error(f"Vault not found: {vault}")

    # Determine prefix & load config
    prefix = _load_prefix(args.prefix)
    cfg = QdrantConfig.from_env(prefix=prefix)
    client = get_client(cfg)
    ensure_collections(client, cfg)

    files = _iter_md(vault)
    if not files:
        _print({"summary": "done", "processed": 0, "prefix": cfg.prefix})
        return

    # Optional baseline step (informational only)
    if args.baseline_modes:
        _print({"action": "baseline", "modes": ["body", "frontmatter", "full"], "source": _hash_source(), "norm": _hash_normalize()})

    processed = 0
    for p in files:
        note_pl, chunks, edges = process_file(
            p,
            cfg,
            note_scope_refs=args.note_scope_refs,
            apply=args.apply,
            purge_before_upsert=args.purge_before_upsert,
        )
        if not note_pl:
            continue

        info = {
            "note_id": note_pl.get("note_id"),
            "title": note_pl.get("title"),
            "chunks": len(chunks),
            "edges": len(edges),
            "changed": True,  # the concrete hash/signature check happens in the payload functions
            "decision": "apply" if args.apply else "dry-run",
            "path": str(p.relative_to(vault)).replace("\\", "/"),
            "hash_mode": _hash_mode(),
            "hash_normalize": _hash_normalize(),
            "hash_source": _hash_source(),
            "prefix": cfg.prefix,
        }

        if not args.apply:
            # Dry run: only report the decision
            _print(info)
            continue

        # Optional: delete the note's chunks/edges first (clean update)
        if args.purge_before_upsert:
            try:
                delete_by_note(client, cfg, note_pl.get("note_id", ""))
            except Exception as e:
                _print({"note_id": note_pl.get("note_id"), "warn": f"delete_by_note failed: {e}"})

        # Upserts
        try:
            upsert_notes(client, cfg, [note_pl])
        except Exception as e:
            _print({"note_id": note_pl.get("note_id"), "error": f"upsert_notes failed: {e}"})
            continue
        if chunks:
            try:
                upsert_chunks(client, cfg, chunks)
            except Exception as e:
                _print({"note_id": note_pl.get("note_id"), "error": f"upsert_chunks failed: {e}"})
        if edges:
            try:
                upsert_edges(client, cfg, edges)
            except Exception as e:
                _print({"note_id": note_pl.get("note_id"), "error": f"upsert_edges failed: {e}"})

        _print(info)
        processed += 1

    # Final status
    counts = count_points(client, cfg)
    _print({
        "summary": "done",
        "processed": processed,
        "prefix": cfg.prefix,
        "collections": {"notes": cfg.notes, "chunks": cfg.chunks, "edges": cfg.edges},
        "counts": counts,
    })


if __name__ == "__main__":
    main()