scripts/import_markdown.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 2s

This commit is contained in:
Lars 2025-11-08 14:25:31 +01:00
parent a14d0bb7cb
commit 53591b6f27

View File

@ -1,375 +1,236 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: scripts/import_markdown.py
Version: 3.9.3
Date: 2025-11-08
import_markdown.py v3.9.0
Zweck
-----
Importer für Obsidian-Markdown-Notizen in Qdrant:
- Einlesen (Frontmatter/Body)
- Chunking (unterstützt alte und neue Chunk-Pipelines)
- Edges ableiten (kompatibel zu alten derive_edges-Signaturen)
- Hash-Detektion (ENV-gesteuert)
- Upsert Notes/Chunks/Edges (inkl. Notes-Vector, falls Collection Vektor verlangt)
Zweck:
- Idempotenter Import von Markdown-Notizen (Obsidian-Vault) in Qdrant:
* Notes, Chunks, Edges
* Hash-/Baseline-Mechanik (unverändert, falls schon vorhanden)
* UTF-8 robust (mit Fallback auf cp1252, Logging)
* Optional: note_scope_refs
- NEU: Type-Registry wird gelesen und an Chunk-/Edge-Erzeugung gereicht,
ohne bestehende Funktionalität zu brechen.
Kompatibilität
--------------
- Parser mit/ohne `body_full`
- `make_chunk_payloads(parsed, note_pl, chunks)` ODER ältere Signaturen
- `build_edges_for_note(parsed, chunks)` ODER neue Signaturen (optional mit note_scope_refs)
- Qdrant-Collections mit/ohne Vektorerwartung; Notes erhalten notfalls einen Zero-Vector
- Prefix-Auflösung: CLI --prefix > COLLECTION_PREFIX > MINDNET_PREFIX > "mindnet"
Kompatibilität:
- Nutzt vorhandene parser-, qdrant- und points-Hilfsfunktionen mit
unveränderten Namen/Signaturen.
- Erwartete Funktionen (nicht geändert):
* app.core.parser.read_markdown(path) -> ParsedNote(frontmatter, body, title, ...)
* app.core.chunker.chunk_markdown(body, note_type) -> List[Chunk]
* app.core.chunk_payload.make_chunk_payloads(chunks, note_id, note_title, note_type, note_path, ...)
* app.core.derive_edges.build_edges_for_note(...)
* app.core.qdrant_points.{ensure_collections_for_prefix, upsert_notes, upsert_chunks, upsert_edges, delete_by_filter}
* app.core.qdrant.get_client(), QdrantConfig.from_env()
- Hashing/Signature/Compare-Varianten bleiben unangetastet (werden nur verwendet, wenn vorhanden).
ENV (Hash-Steuerung)
--------------------
MINDNET_HASH_COMPARE : Body | Frontmatter | Full (default: Body)
MINDNET_HASH_SOURCE : parsed | raw (default: parsed)
MINDNET_HASH_NORMALIZE : canonical | none (default: canonical)
Weitere ENV
-----------
MINDNET_NOTE_VECTOR_D : Dimension des Note-Vektors (default: aus QdrantConfig oder 384)
Aufrufbeispiele:
python3 -m scripts.import_markdown --vault ./test_vault
python3 -m scripts.import_markdown --vault ./test_vault --apply
python3 -m scripts.import_markdown --vault ./test_vault --apply --purge-before-upsert
python3 -m scripts.import_markdown --vault ./vault --apply --prefix "$COLLECTION_PREFIX" --note-scope-refs
"""
from __future__ import annotations

import argparse
import inspect
import json
import os
import sys
from pathlib import Path
from typing import Any, Dict, List, Optional, Tuple

# Parser / chunker / payload / edge helpers (existing modules, unchanged signatures).
from app.core.parser import read_markdown  # type: ignore
from app.core.chunker import chunk_markdown  # type: ignore
from app.core.note_payload import make_note_payload  # type: ignore
from app.core.chunk_payload import make_chunk_payloads  # type: ignore
from app.core.derive_edges import build_edges_for_note  # type: ignore

# Qdrant access (existing helpers, signatures kept).
from app.core.qdrant import (  # type: ignore
    QdrantConfig,
    get_client,
    ensure_collections,
    count_points,
)
from app.core.qdrant_points import (  # type: ignore
    ensure_collections_for_prefix,
    upsert_notes,
    upsert_chunks,
    upsert_edges,
    delete_by_note,
    delete_by_filter,
)

# Optional: newer chunk-assembly pipeline (preferred when available).
try:
    from app.core.chunker import assemble_chunks  # type: ignore
except Exception:  # pragma: no cover
    assemble_chunks = None

# Optional: type registry; fall back to the default profile when missing.
try:
    from app.core.type_registry import resolve_chunk_profile  # type: ignore
except Exception:  # pragma: no cover
    def resolve_chunk_profile(note_type: str, default_profile: str = "default") -> str:
        return default_profile
# --- CLI ---
# ----------------------------
# Utilities
# ----------------------------
def _cli() -> argparse.Namespace:
    """Build the importer's argument parser and parse ``sys.argv``.

    ``--prefix`` defaults to COLLECTION_PREFIX, then MINDNET_PREFIX,
    then the empty string.
    """
    parser = argparse.ArgumentParser("import_markdown.py")
    parser.add_argument("--vault", required=True, help="Pfad zum Vault-Root (Ordner).")
    parser.add_argument("--apply", action="store_true", help="Änderungen wirklich upserten (sonst Dry-Run).")
    parser.add_argument("--purge-before-upsert", action="store_true", help="Vor Upsert Daten je Note in Collections entfernen.")
    default_prefix = os.getenv("COLLECTION_PREFIX", os.getenv("MINDNET_PREFIX", ""))
    parser.add_argument("--prefix", default=default_prefix,
                        help="Sammlungspräfix in Qdrant (override).")
    parser.add_argument("--note-scope-refs", action="store_true",
                        help="Referenzen ([[...]]) auf Note-Ebene (statt chunk-basiert).")
    parser.add_argument("--encoding", default="utf-8", help="Bevorzugtes Encoding für .md (Default: utf-8).")
    return parser.parse_args()
def _env(key: str, default: str = "") -> str:
    """Read *key* from the environment; missing or empty values yield *default*."""
    value = os.environ.get(key, "")
    if value == "":
        return default
    return value
# --- Hilfsfunktionen ---
def _hash_mode() -> str:
    """Hash-compare mode from MINDNET_HASH_COMPARE: body|frontmatter|full (default body)."""
    mode = _env("MINDNET_HASH_COMPARE", "Body").lower()
    if mode in {"body", "frontmatter", "full"}:
        return mode
    return "body"
def _iter_md_files(root: str) -> List[str]:
    """Collect all Markdown files (case-insensitive ``.md``) below *root*, sorted."""
    found = [
        os.path.join(base, name)
        for base, _dirs, names in os.walk(root)
        for name in names
        if name.lower().endswith(".md")
    ]
    return sorted(found)
def _hash_source() -> str:
    """Hash source from MINDNET_HASH_SOURCE: parsed|raw (default parsed)."""
    source = _env("MINDNET_HASH_SOURCE", "parsed").lower()
    if source not in ("parsed", "raw"):
        return "parsed"
    return source
def _rel_path(root: str, path: str) -> str:
    """Path of *path* relative to *root*, normalized to forward slashes."""
    relative = os.path.relpath(path, root)
    return relative.replace("\\", "/")
def _hash_normalize() -> str:
    """Hash normalization from MINDNET_HASH_NORMALIZE: canonical|none (default canonical)."""
    norm = _env("MINDNET_HASH_NORMALIZE", "canonical").lower()
    if norm in {"canonical", "none"}:
        return norm
    return "canonical"
def _safe_text(parsed) -> str:
    """Return the note text, preferring ``body_full`` over ``body`` (never None)."""
    full = getattr(parsed, "body_full", None)
    if full:
        return full
    return getattr(parsed, "body", "") or ""
def _load_prefix(arg_prefix: Optional[str]) -> str:
    """Resolve the collection prefix: CLI arg > COLLECTION_PREFIX > MINDNET_PREFIX > "mindnet"."""
    candidate = (arg_prefix or "").strip()
    if candidate:
        return candidate
    fallback = os.environ.get("COLLECTION_PREFIX") or os.environ.get("MINDNET_PREFIX") or "mindnet"
    return fallback.strip()
def _print(obj):
    """Emit *obj* as a single JSON line on stdout and flush immediately."""
    line = json.dumps(obj, ensure_ascii=False)
    sys.stdout.write(line + "\n")
    sys.stdout.flush()
def _iter_md(vault: Path) -> List[Path]:
    """All ``*.md`` files below *vault* (recursive), sorted by full path."""
    return [candidate for candidate in sorted(vault.rglob("*.md")) if candidate.is_file()]
def _note_vector_dim(cfg: QdrantConfig) -> int:
    """Resolve the notes-vector dimension.

    Preference order: ``cfg.notes_vector_dim`` > MINDNET_NOTE_VECTOR_D env
    var > 384 (the common MiniLM dimension).
    """
    dim = getattr(cfg, "notes_vector_dim", None)
    if isinstance(dim, int) and dim > 0:
        return dim
    env_dim = _env("MINDNET_NOTE_VECTOR_D", "")
    # str.isdigit() guarantees int() succeeds, so no try/except is needed.
    if env_dim.isdigit() and int(env_dim) > 0:
        return int(env_dim)
    return 384
def _ensure_note_vector(note_pl: Dict, cfg: QdrantConfig) -> None:
    """Guarantee a dense ``vector`` entry on the note payload.

    If the notes collection requires a vector and none is set, a zero-vector
    with the configured dimension is filled in (in place).
    """
    if note_pl.get("vector") is None:
        note_pl["vector"] = [0.0] * _note_vector_dim(cfg)
# ----------------------------
# Signatur-kompatible Aufrufe
# ----------------------------
def _call_make_chunk_payloads(parsed, note_pl, raw_chunks: Optional[List[Dict]] = None) -> List[Dict]:
    """Invoke ``make_chunk_payloads`` with whichever signature it exposes.

    Historically three variants existed:
      A) make_chunk_payloads(parsed, note_pl, chunks)
      B) make_chunk_payloads(parsed, note_pl)
      C) make_chunk_payloads(note_pl, chunks)
    The variant is detected at runtime from the parameter names.

    Raises:
        RuntimeError: if no variant could be invoked successfully.
    """
    def _fallback_chunks() -> List[Dict]:
        # The signature expects chunks but none were supplied: build one
        # conservative whole-note chunk.
        return [{
            "chunk_id": f"{note_pl.get('note_id', 'note')}#1",
            "text": _safe_text(parsed),
            "window": _safe_text(parsed),
            "order": 1,
            "path": note_pl.get("path", ""),
        }]

    sig = inspect.signature(make_chunk_payloads)
    params = list(sig.parameters.keys())
    # Try the most plausible modern variant first.
    if params[:3] == ["parsed", "note_pl", "chunks"]:
        try_order = [("parsed_note_chunks",)]
    elif params[:2] == ["parsed", "note_pl"]:
        try_order = [("parsed_note",)]
    elif params[:2] == ["note_pl", "chunks"]:
        try_order = [("note_chunks",)]
    else:
        # Generic fallback: probe all three patterns in order.
        try_order = [("parsed_note_chunks",), ("parsed_note",), ("note_chunks",)]
    last_err = None
    for variant in try_order:
        try:
            if variant == ("parsed_note_chunks",):
                if raw_chunks is None:
                    raw_chunks = _fallback_chunks()
                return make_chunk_payloads(parsed, note_pl, raw_chunks)  # type: ignore
            elif variant == ("parsed_note",):
                return make_chunk_payloads(parsed, note_pl)  # type: ignore
            elif variant == ("note_chunks",):
                if raw_chunks is None:
                    raw_chunks = _fallback_chunks()
                return make_chunk_payloads(note_pl, raw_chunks)  # type: ignore
        except Exception as e:
            last_err = e
    raise RuntimeError(f"make_chunk_payloads invocation failed: {last_err}")


def _safe_read_markdown(path: str, prefer_encoding: str = "utf-8") -> Tuple[Optional[Any], Optional[str]]:
    """Read a Markdown note, tolerating encoding problems.

    Returns ``(ParsedNote | None, used_encoding | None)``. On a
    UnicodeDecodeError a JSON warning is logged and the read is retried once;
    ``read_markdown`` (the parser) remains the source of truth for decoding.
    """
    try:
        parsed = read_markdown(path)
        return parsed, prefer_encoding
    except UnicodeDecodeError:
        try:
            print(json.dumps({"path": path, "warn": "encoding_fallback_used", "used": "cp1252"}))
            parsed = read_markdown(path)  # parser should be tolerant by now
            return parsed, "cp1252"
        except Exception:
            return None, None
    except Exception:
        return None, None
# --- Main ---
def _call_build_edges_for_note(parsed, chunk_payloads: List[Dict], note_scope_refs: bool) -> List[Dict]:
    """Invoke ``build_edges_for_note`` with a signature-compatible call.

    Historical variants:
      A) build_edges_for_note(parsed, chunks)
      B) build_edges_for_note(parsed, chunks, note_scope_refs=...)
    """
    accepts_flag = "note_scope_refs" in inspect.signature(build_edges_for_note).parameters
    try:
        if accepts_flag:
            return build_edges_for_note(parsed, chunk_payloads, note_scope_refs=note_scope_refs)  # type: ignore
        return build_edges_for_note(parsed, chunk_payloads)  # type: ignore
    except TypeError:
        # Strict fallback: retry without the extra keyword.
        return build_edges_for_note(parsed, chunk_payloads)  # type: ignore
# NOTE: an orphaned duplicate `def main()` header (diff residue) was removed
# here; `main()` is defined once, below.
def process_file(
    path: Path,
    cfg: QdrantConfig,
    note_scope_refs: bool,
    apply: bool,
    purge_before_upsert: bool,
) -> Tuple[Optional[dict], List[dict], List[dict]]:
    """Parse one Markdown file and build its note/chunk/edge payloads.

    Returns ``(note_payload | None, chunk_payloads, edges)``. Failures are
    logged as JSON lines and yield ``(None, [], [])``. ``apply`` and
    ``purge_before_upsert`` are accepted for the caller's benefit and do not
    change payload construction.
    """
    try:
        parsed = read_markdown(str(path))
    except Exception as e:
        _print({"path": str(path), "error": f"read_markdown failed: {e.__class__.__name__}: {e}"})
        return None, [], []

    # Note payload (with a defensive fallback if the builder misbehaves).
    try:
        note_pl = make_note_payload(parsed, vault_root=str(path.parent.parent))
        if not isinstance(note_pl, dict):
            note_pl = {
                "note_id": parsed.frontmatter.get("id") or path.stem,
                "title": parsed.frontmatter.get("title") or path.stem,
                "status": parsed.frontmatter.get("status", "unknown"),
                "path": str(path).replace("\\", "/"),
                "tags": parsed.frontmatter.get("tags", []),
            }
        note_pl["fulltext"] = _safe_text(parsed)
        note_pl["hash_signature"] = f"{_hash_mode()}:{_hash_source()}:{_hash_normalize()}"
        # Ensure a notes vector (zero-vector) when the collection requires one.
        _ensure_note_vector(note_pl, cfg)
    except Exception as e:
        _print({"path": str(path), "error": f"make_note_payload failed: {e}"})
        return None, [], []

    # Raw chunks (only when the newer assemble_chunks pipeline is available).
    raw_chunks: Optional[List[Dict]] = None
    if assemble_chunks is not None:
        try:
            raw_chunks = assemble_chunks(
                note_pl.get("note_id", path.stem),
                _safe_text(parsed),
                parsed.frontmatter.get("type", "concept"),
            )
        except Exception as e:
            _print({"path": str(path), "note_id": note_pl.get("note_id"), "warn": f"assemble_chunks failed: {e}"})
            raw_chunks = None

    # Chunk payloads (signature-compatible dispatch).
    try:
        chunk_payloads = _call_make_chunk_payloads(parsed, note_pl, raw_chunks)
        if not isinstance(chunk_payloads, list):
            chunk_payloads = []
    except Exception as e:
        _print({"path": str(path), "note_id": note_pl.get("note_id"), "error": f"make_chunk_payloads failed: {e}"})
        chunk_payloads = []

    # Edges (signature-compatible dispatch).
    try:
        edges = _call_build_edges_for_note(parsed, chunk_payloads, note_scope_refs=note_scope_refs)
    except Exception as e:
        _print({"path": str(path), "note_id": note_pl.get("note_id"), "error": f"build_edges_for_note failed: {e}"})
        edges = []

    return note_pl, chunk_payloads, edges
def main() -> None:
    """CLI entry point: import all Markdown notes of a vault into Qdrant.

    Dry-run by default; ``--apply`` performs the upserts and
    ``--purge-before-upsert`` removes per-note data first.
    """
    args = _cli()
    vault = os.path.abspath(args.vault)
    apply = args.apply
    purge = args.purge_before_upsert
    # Resolution order: CLI --prefix > COLLECTION_PREFIX > MINDNET_PREFIX > "mindnet".
    prefix = _load_prefix(args.prefix)
    note_scope_refs = args.note_scope_refs

    # Optional baseline announcement (flag may be absent on older CLIs).
    if getattr(args, "baseline_modes", False):
        _print({"action": "baseline", "modes": ["body", "frontmatter", "full"],
                "source": _hash_source(), "norm": _hash_normalize()})

    # Qdrant client and prefixed collections.
    cfg = QdrantConfig.from_env()
    client = get_client(cfg)
    collections = ensure_collections_for_prefix(client=client, prefix=prefix)

    processed = 0
    for path in _iter_md_files(vault):
        rel = _rel_path(vault, path)
        parsed, _used_enc = _safe_read_markdown(path, prefer_encoding=args.encoding)
        if parsed is None or not getattr(parsed, "frontmatter", None):
            _print({"path": path, "error": "read_markdown failed"})
            continue

        fm = dict(parsed.frontmatter or {})
        note_id = str(fm.get("id") or "").strip() or os.path.splitext(os.path.basename(path))[0]
        note_title = str(fm.get("title") or parsed.title or note_id)
        note_type = str(fm.get("type") or "concept")

        # Chunking: the registry profile decides how chunk windows are built.
        body = getattr(parsed, "body", "") or ""
        chunks = chunk_markdown(body, note_type)
        chunk_profile = resolve_chunk_profile(note_type)
        chunk_payloads = make_chunk_payloads(
            chunks=chunks,
            note_id=note_id,
            note_title=note_title,
            note_type=note_type,
            note_path=rel,
            chunk_profile=chunk_profile,
        )

        # Edges (registry defaults are handled inside derive_edges).
        edges = build_edges_for_note(
            note_id=note_id,
            note_type=note_type,
            chunks=chunk_payloads,
            frontmatter=fm,
            body_text=body,
            note_scope_refs=note_scope_refs,
        )

        # Note payload without a vector; embeddings are produced elsewhere.
        note_payload = {
            "note_id": note_id,
            "title": note_title,
            "type": note_type,
            "path": rel,
            "status": fm.get("status"),
            "created": fm.get("created"),
            "tags": fm.get("tags", []),
        }

        # Per-note decision log (emitted for dry-run and apply alike).
        _print({
            "note_id": note_id,
            "title": note_title,
            "chunks": len(chunk_payloads),
            "edges": len(edges),
            "changed": True,  # hash/baseline logic stays inside the payload builders
            "decision": ("apply" if apply else "dry-run"),
            "path": rel,
            "hash_mode": _hash_mode(),
            "hash_normalize": _hash_normalize(),
            "hash_source": _hash_source(),
            "prefix": prefix,
        })

        if not apply:
            processed += 1
            continue

        # Optional purge per note: delete_by_filter expects a {key: value}
        # filter, applied separately per collection.
        if purge:
            delete_by_filter(client, collections["notes"], {"note_id": note_id})
            delete_by_filter(client, collections["chunks"], {"note_id": note_id})
            delete_by_filter(client, collections["edges"], {"note_id": note_id})

        # Upserts: the existing utilities keep their own ID strategy.
        upsert_notes(client, collections["notes"], [{"id": note_id, "payload": note_payload}])
        if chunk_payloads:
            upsert_chunks(client, collections["chunks"],
                          [{"id": cp["chunk_id"], "payload": cp} for cp in chunk_payloads])
        if edges:
            upsert_edges(client, collections["edges"], [{"payload": e} for e in edges])
        processed += 1

    # Final summary line.
    _print({
        "summary": "done",
        "processed": processed,
        "prefix": prefix,
        "collections": collections,
        "counts": {"notes": 0, "chunks": 0, "edges": 0},  # could be filled via count_points
    })
if __name__ == "__main__":
main()