scripts/import_markdown.py aktualisiert
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s

This commit is contained in:
Lars 2025-11-08 12:48:31 +01:00
parent a39b2a6950
commit 2f9ce824a0

View File

@ -2,66 +2,60 @@
# -*- coding: utf-8 -*-
"""
Script: scripts/import_markdown.py
Version: 3.9.2
Version: 3.9.3
Date: 2025-11-08
Purpose
-------
Importer für Obsidian-Markdown-Notizen in Qdrant.
- Liest Frontmatter + Body
- erzeugt Note-/Chunk-Payloads
- leitet Edges ab
- Upsert in Qdrant (Notes, Chunks, Edges)
- Hash-basierte Änderungsdetektion (konfigurierbar via ENV)
Zweck
-----
Importer für Obsidian-Markdown-Notizen in Qdrant:
- Einlesen (Frontmatter/Body)
- Chunking (unterstützt alte und neue Chunk-Pipelines)
- Edges ableiten (kompatibel zu alten derive_edges-Signaturen)
- Hash-Detektion (ENV-gesteuert)
- Upsert Notes/Chunks/Edges (inkl. Notes-Vector, falls Collection Vektor verlangt)
Kompatibilität
--------------
- Funktioniert mit Parsern, die NUR `body` bereitstellen (ohne `body_full`)
- Unterstützt bestehende ENV-Variablen (COLLECTION_PREFIX / MINDNET_PREFIX)
- Nutzt Wrapper aus app.core.qdrant / app.core.qdrant_points (siehe v1.8.0 / v1.7.0)
- Fällt bei fehlenden neuen Funktionen auf vorhandene Defaults zurück
Usage
-----
export COLLECTION_PREFIX="mindnet"
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert --prefix "$COLLECTION_PREFIX"
Optional flags:
--note-scope-refs : extrahiert auch note-scope References
--baseline-modes : legt Basis-Hashes für Body/Frontmatter/Full an (falls genutzt)
--dry-run / (kein --apply): zeigt nur Entscheidungen an
- Parser mit/ohne `body_full`
- `make_chunk_payloads(parsed, note_pl, chunks)` ODER ältere Signaturen
- `build_edges_for_note(parsed, chunks)` ODER neue Signaturen (optional mit note_scope_refs)
- Qdrant-Collections mit/ohne Vektorerwartung; Notes erhalten notfalls einen Zero-Vector
- Prefix-Auflösung: CLI --prefix > COLLECTION_PREFIX > MINDNET_PREFIX > "mindnet"
ENV (Hash-Steuerung)
--------------------
MINDNET_HASH_COMPARE : Body | Frontmatter | Full (default: Body)
MINDNET_HASH_SOURCE : parsed | raw (default: parsed)
MINDNET_HASH_NORMALIZE: canonical | none (default: canonical)
MINDNET_HASH_COMPARE : Body | Frontmatter | Full (default: Body)
MINDNET_HASH_SOURCE : parsed | raw (default: parsed)
MINDNET_HASH_NORMALIZE : canonical | none (default: canonical)
Weitere ENV
-----------
MINDNET_NOTE_VECTOR_D : Dimension des Note-Vektors (default: aus QdrantConfig oder 384)
"""
from __future__ import annotations
import argparse
import inspect
import json
import os
import sys
from pathlib import Path
from typing import Dict, List, Optional, Tuple
# Core imports (bestehend)
# Parser & Payloads
from app.core.parser import read_markdown
from app.core.note_payload import make_note_payload
from app.core.chunk_payload import make_chunk_payloads
from app.core.derive_edges import build_edges_for_note
# Qdrant Helper
from app.core.qdrant import (
QdrantConfig,
get_client,
ensure_collections,
count_points,
list_note_ids,
fetch_one_note,
)
from app.core.qdrant_points import (
upsert_notes,
upsert_chunks,
@ -69,8 +63,15 @@ from app.core.qdrant_points import (
delete_by_note,
)
# Optionales Chunk-Assembly (neuere Pipeline)
try:
from app.core.chunker import assemble_chunks # bevorzugt
except Exception: # pragma: no cover
assemble_chunks = None
# ----------------------------
# Hilfsfunktionen
# Utilities
# ----------------------------
def _env(key: str, default: str = "") -> str:
@ -79,36 +80,30 @@ def _env(key: str, default: str = "") -> str:
def _hash_mode() -> str:
m = _env("MINDNET_HASH_COMPARE", "Body").lower()
if m not in ("body", "frontmatter", "full"):
m = "body"
return m
return m if m in ("body", "frontmatter", "full") else "body"
def _hash_source() -> str:
s = _env("MINDNET_HASH_SOURCE", "parsed").lower()
if s not in ("parsed", "raw"):
s = "parsed"
return s
return s if s in ("parsed", "raw") else "parsed"
def _hash_normalize() -> str:
n = _env("MINDNET_HASH_NORMALIZE", "canonical").lower()
if n not in ("canonical", "none"):
n = "canonical"
return n
return n if n in ("canonical", "none") else "canonical"
def _safe_text(parsed) -> str:
"""
Liefert bevorzugt parsed.body_full, sonst parsed.body, sonst "".
Kompatibilitätshelfer für Parser ohne 'body_full'.
"""
"""Bevorzugt parsed.body_full, sonst parsed.body."""
return getattr(parsed, "body_full", None) or getattr(parsed, "body", "") or ""
def _load_prefix(arg_prefix: Optional[str]) -> str:
# Reihenfolge: CLI --prefix > COLLECTION_PREFIX > MINDNET_PREFIX > "mindnet"
if arg_prefix and arg_prefix.strip():
return arg_prefix.strip()
env_prefix = os.environ.get("COLLECTION_PREFIX") or os.environ.get("MINDNET_PREFIX")
return (env_prefix or "mindnet").strip()
def _print(obj):
sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n")
sys.stdout.flush()
def _iter_md(vault: Path) -> List[Path]:
out: List[Path] = []
for p in sorted(vault.rglob("*.md")):
@ -116,37 +111,130 @@ def _iter_md(vault: Path) -> List[Path]:
out.append(p)
return out
def _print(obj):
sys.stdout.write(json.dumps(obj, ensure_ascii=False) + "\n")
sys.stdout.flush()
def _note_vector_dim(cfg: QdrantConfig) -> int:
# Bevorzugt Konfig, sonst ENV, sonst 384
# Viele Setups nutzen 384 (MiniLM 384d)
dim = getattr(cfg, "notes_vector_dim", None)
if isinstance(dim, int) and dim > 0:
return dim
env_dim = _env("MINDNET_NOTE_VECTOR_D", "")
if env_dim.isdigit():
try:
d = int(env_dim)
if d > 0:
return d
except Exception:
pass
return 384
def _ensure_note_vector(note_pl: Dict, cfg: QdrantConfig) -> None:
# Falls die Notes-Collection einen dichten Vektor verlangt, muss `vector` gesetzt sein.
# Wir setzen einen Zero-Vector mit korrekter Dimension.
if "vector" not in note_pl or note_pl["vector"] is None:
d = _note_vector_dim(cfg)
note_pl["vector"] = [0.0] * d
# ----------------------------
# Hauptlogik
# Signatur-kompatible Aufrufe
# ----------------------------
def _call_make_chunk_payloads(parsed, note_pl, raw_chunks: Optional[List[Dict]] = None) -> List[Dict]:
"""
Ruft make_chunk_payloads mit der passenden Signatur auf.
Historisch gab es Varianten:
A) make_chunk_payloads(parsed, note_pl, chunks)
B) make_chunk_payloads(parsed, note_pl)
C) make_chunk_payloads(note_pl, chunks)
Wir erkennen das zur Laufzeit.
"""
sig = inspect.signature(make_chunk_payloads)
params = list(sig.parameters.keys())
# Versuche die plausibelste moderne Variante zuerst
try_order = []
if params[:3] == ["parsed", "note_pl", "chunks"]:
try_order = [("parsed_note_chunks",)]
elif params[:2] == ["parsed", "note_pl"]:
try_order = [("parsed_note",)]
elif params[:2] == ["note_pl", "chunks"]:
try_order = [("note_chunks",)]
else:
# generischer Fallback: wir probieren die drei Muster
try_order = [("parsed_note_chunks",), ("parsed_note",), ("note_chunks",)]
last_err = None
for variant in try_order:
try:
if variant == ("parsed_note_chunks",):
if raw_chunks is None:
# wenn Signatur die Chunks erwartet, aber keine vorhanden sind, baue konservativ 1 Chunk
raw_chunks = [{
"chunk_id": f"{note_pl.get('note_id', 'note')}#1",
"text": _safe_text(parsed),
"window": _safe_text(parsed),
"order": 1,
"path": note_pl.get("path", ""),
}]
return make_chunk_payloads(parsed, note_pl, raw_chunks) # type: ignore
elif variant == ("parsed_note",):
return make_chunk_payloads(parsed, note_pl) # type: ignore
elif variant == ("note_chunks",):
if raw_chunks is None:
raw_chunks = [{
"chunk_id": f"{note_pl.get('note_id', 'note')}#1",
"text": _safe_text(parsed),
"window": _safe_text(parsed),
"order": 1,
"path": note_pl.get("path", ""),
}]
return make_chunk_payloads(note_pl, raw_chunks) # type: ignore
except Exception as e:
last_err = e
raise RuntimeError(f"make_chunk_payloads invocation failed: {last_err}")
def _call_build_edges_for_note(parsed, chunk_payloads: List[Dict], note_scope_refs: bool) -> List[Dict]:
"""
Ruft build_edges_for_note mit kompatibler Signatur auf.
Historisch:
A) build_edges_for_note(parsed, chunks)
B) build_edges_for_note(parsed, chunks, note_scope_refs=True/False)
"""
sig = inspect.signature(build_edges_for_note)
params = list(sig.parameters.keys())
try:
if "note_scope_refs" in params:
return build_edges_for_note(parsed, chunk_payloads, note_scope_refs=note_scope_refs) # type: ignore
else:
return build_edges_for_note(parsed, chunk_payloads) # type: ignore
except TypeError:
# strenger Fallback: ohne Zusatzparameter
return build_edges_for_note(parsed, chunk_payloads) # type: ignore
# ----------------------------
# Hauptverarbeitung
# ----------------------------
def process_file(
path: Path,
cfg: QdrantConfig,
note_scope_refs: bool = False,
apply: bool = False,
purge_before_upsert: bool = False,
note_scope_refs: bool,
apply: bool,
purge_before_upsert: bool,
) -> Tuple[Optional[dict], List[dict], List[dict]]:
"""
Liest eine Datei, erzeugt Note-/Chunk-/Edge-Payloads.
Gibt (note_payload, chunk_payloads, edge_payloads) zurück.
"""
try:
parsed = read_markdown(str(path))
except Exception as e:
_print({"path": str(path), "error": f"read_markdown failed: {e.__class__.__name__}: {e}"})
return None, [], []
# Note
# Note-Payload
try:
note_pl = make_note_payload(parsed, vault_root=str(path.parent.parent)) # vault_root nur für Pfadfelder
note_pl = make_note_payload(parsed, vault_root=str(path.parent.parent))
if not isinstance(note_pl, dict):
# Falls ältere make_note_payload-Version etwas anderes liefert
# oder None zurückgibt -> Fallback
note_pl = {
"note_id": parsed.frontmatter.get("id") or path.stem,
"title": parsed.frontmatter.get("title") or path.stem,
@ -154,31 +242,44 @@ def process_file(
"path": str(path).replace("\\", "/"),
"tags": parsed.frontmatter.get("tags", []),
}
# robustes Fulltext-Feld
note_pl["fulltext"] = _safe_text(parsed)
# Hash-Metadaten anfügen (ohne Änderung der bestehenden Logik deiner DB)
note_pl["hash_signature"] = f"{_hash_mode()}:{_hash_source()}:{_hash_normalize()}"
# Notes-Vector sicherstellen (Zero-Vector, wenn Collection ihn verlangt)
_ensure_note_vector(note_pl, cfg)
except Exception as e:
_print({"path": str(path), "error": f"make_note_payload failed: {e}"})
return None, [], []
# Chunks
# Roh-Chunks (falls assemble_chunks verfügbar)
raw_chunks: Optional[List[Dict]] = None
if assemble_chunks is not None:
try:
raw_chunks = assemble_chunks(
note_pl.get("note_id", path.stem),
_safe_text(parsed),
parsed.frontmatter.get("type", "concept"),
)
except Exception as e:
_print({"path": str(path), "note_id": note_pl.get("note_id"), "warn": f"assemble_chunks failed: {e}"})
raw_chunks = None
# Chunk-Payloads
try:
chunks = make_chunk_payloads(parsed, note_pl)
if not isinstance(chunks, list):
chunks = []
chunk_payloads = _call_make_chunk_payloads(parsed, note_pl, raw_chunks)
if not isinstance(chunk_payloads, list):
chunk_payloads = []
except Exception as e:
_print({"path": str(path), "note_id": note_pl.get("note_id"), "error": f"make_chunk_payloads failed: {e}"})
chunks = []
chunk_payloads = []
# Edges
try:
edges = build_edges_for_note(parsed, chunks, note_scope_refs=note_scope_refs)
edges = _call_build_edges_for_note(parsed, chunk_payloads, note_scope_refs=note_scope_refs)
except Exception as e:
_print({"path": str(path), "note_id": note_pl.get("note_id"), "error": f"build_edges_for_note failed: {e}"})
edges = []
return note_pl, chunks, edges
return note_pl, chunk_payloads, edges
def main():
@ -186,7 +287,7 @@ def main():
ap.add_argument("--vault", required=True, help="Pfad zum Vault-Verzeichnis (Wurzel).")
ap.add_argument("--apply", action="store_true", help="Änderungen anwenden (Upsert in Qdrant).")
ap.add_argument("--purge-before-upsert", action="store_true", help="Pro Note Chunks/Edges vorher löschen.")
ap.add_argument("--note-scope-refs", action="store_true", help="Note-scope Referenzen ableiten.")
ap.add_argument("--note-scope-refs", action="store_true", help="Note-scope Referenzen (falls unterstützt).")
ap.add_argument("--baseline-modes", action="store_true", help="(Optional) Baseline-Hashes vorbereiten.")
ap.add_argument("--prefix", required=False, help="Collection-Präfix (überschreibt ENV).")
args = ap.parse_args()
@ -195,7 +296,6 @@ def main():
if not vault.exists():
ap.error(f"Vault nicht gefunden: {vault}")
# Prefix bestimmen & Config laden
prefix = _load_prefix(args.prefix)
cfg = QdrantConfig.from_env(prefix=prefix)
client = get_client(cfg)
@ -206,29 +306,27 @@ def main():
_print({"summary": "done", "processed": 0, "prefix": cfg.prefix})
return
# Optional Baseline-Aktion (nur Meta-Info / kein Abbruch wenn nicht genutzt)
if args.baseline_modes:
_print({"action": "baseline", "modes": ["body", "frontmatter", "full"], "source": _hash_source(), "norm": _hash_normalize()})
processed = 0
for idx, p in enumerate(files):
note_pl, chunks, edges = process_file(
for p in files:
note_pl, chunk_payloads, edges = process_file(
p,
cfg,
note_scope_refs=args.note_scope_refs,
apply=args.apply,
purge_before_upsert=args.purge_before_upsert,
)
if not note_pl:
continue
info = {
"note_id": note_pl.get("note_id"),
"title": note_pl.get("title"),
"chunks": len(chunks),
"chunks": len(chunk_payloads),
"edges": len(edges),
"changed": True, # Die konkrete Hash-/Sig-Prüfung erfolgt in deinen Payload-Funktionen
"changed": True, # Detail-Hashing passiert innerhalb der Payload-Builder
"decision": "apply" if args.apply else "dry-run",
"path": str(p.relative_to(vault)).replace("\\", "/"),
"hash_mode": _hash_mode(),
@ -238,23 +336,20 @@ def main():
}
if args.apply:
# Optional: pro Note vorher Chunks/Edges löschen (saubere Aktualisierung)
if args.purge_before_upsert:
try:
delete_by_note(client, cfg, note_pl.get("note_id", ""))
except Exception as e:
_print({"note_id": note_pl.get("note_id"), "warn": f"delete_by_note failed: {e}"})
# Upserts
try:
upsert_notes(client, cfg, [note_pl])
except Exception as e:
_print({"note_id": note_pl.get("note_id"), "error": f"upsert_notes failed: {e}"})
continue
if chunks:
if chunk_payloads:
try:
upsert_chunks(client, cfg, chunks)
upsert_chunks(client, cfg, chunk_payloads)
except Exception as e:
_print({"note_id": note_pl.get("note_id"), "error": f"upsert_chunks failed: {e}"})
@ -267,14 +362,12 @@ def main():
_print(info)
processed += 1
# Abschlussstatus
counts = count_points(client, cfg)
_print({
"summary": "done",
"processed": processed,
"prefix": cfg.prefix,
"collections": {"notes": cfg.notes, "chunks": cfg.chunks, "edges": cfg.edges},
"counts": counts,
"counts": count_points(client, cfg),
})