mindnet/scripts/import_markdown.py
Lars f66cdc70b2
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 5s
scripts/import_markdown.py aktualisiert
2025-11-07 09:30:20 +01:00

414 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
=====================================================================
scripts/import_markdown.py — mindnet · WP-03 (Version 3.9.0)
=====================================================================
Zweck:
- Importiert Obsidian-Markdown-Dateien (Vault) in Qdrant:
* Notes (mit optionaler Schema-Validierung, Hash-Erkennung)
* Chunks (window & text, Overlap-Metadaten)
* Edges (belongs_to, prev/next, references, backlink optional, depends_on/assigned_to)
- Idempotenz über stabile IDs (note_id, chunk_id) & Hash-Signaturen (Option C).
- **Optional**: Embeddings für Note/Chunks via HTTP-Endpoint (/embed).
- **Optional**: JSON-Schema-Validierung gegen bereitgestellte Schemata.
- **Optional**: Note-Scope-References zusätzlich zu Chunk-Refs.
Highlights ggü. Minimal-Variante:
- Hash Option C (body/frontmatter/full × parsed/raw × normalize)
- Baseline-Modus (fehlende Signaturen initial schreiben)
- Purge vor Upsert (nur geänderte Note: alte Chunks/Edges löschen)
- UTF-8 fehlertoleranter Parser (Fallback Latin-1 → Re-encode)
- Type-Registry: dynamische Chunk-Profile (optional)
- Include/Exclude & Single-File-Import (--path) & Skip-Regeln
- embedding_exclude respektiert
- NDJSON-Logging & Abschlussstatistik
Aufrufe (Beispiele):
# Dry-Run (zeigt Entscheidungen)
python3 -m scripts.import_markdown --vault ./vault --prefix mindnet
# Apply + Purge für geänderte Notes
python3 -m scripts.import_markdown --vault ./vault --prefix mindnet --apply --purge-before-upsert
# Note-Scope-Refs zusätzlich anlegen
python3 -m scripts.import_markdown --vault ./vault --apply --note-scope-refs
# Embeddings aktivieren (Endpoint kann per ENV überschrieben werden)
python3 -m scripts.import_markdown --vault ./vault --apply --with-embeddings
# Schema-Validierung (verwende die *.schema.json-Dateien)
python3 -m scripts.import_markdown --vault ./vault --apply --validate-schemas \
--note-schema ./schemas/note.schema.json \
--chunk-schema ./schemas/chunk.schema.json \
--edge-schema ./schemas/edge.schema.json
# Nur eine Datei importieren
python3 -m scripts.import_markdown --path ./vault/40_concepts/concept-alpha.md --apply
# Version anzeigen
python3 -m scripts.import_markdown --version
ENV (Auszug):
COLLECTION_PREFIX Prefix der Qdrant-Collections (Default: mindnet)
QDRANT_URL / QDRANT_API_KEY Qdrant-Verbindung
# Hash-Steuerung
MINDNET_HASH_COMPARE body | frontmatter | full (Default: body)
MINDNET_HASH_SOURCE parsed | raw (Default: parsed)
MINDNET_HASH_NORMALIZE canonical | whitespace | none (Default: canonical)
# Embeddings (nur wenn --with-embeddings)
EMBED_URL z. B. http://127.0.0.1:8000/embed
EMBED_MODEL Freitext (nur Logging)
EMBED_BATCH Batchgröße (Default: 16)
Abwärtskompatibilität:
- Felder & Flows aus v3.7.x bleiben erhalten.
- Neue Features sind optional (default OFF).
- Bestehende IDs/Signaturen unverändert.
Lizenz: MIT (projektintern)
"""
__version__ = "3.9.0"
import os
import sys
import re
import json
import argparse
import pathlib
from typing import Any, Dict, List, Optional, Iterable, Tuple
# Core-Bausteine (bestehend)
from app.core.parser import read_markdown
from app.core.note_payload import make_note_payload
from app.core.chunk_payload import make_chunk_payloads
from app.core.derive_edges import build_edges_for_note
from app.core.qdrant import get_client, QdrantConfig
from app.core.qdrant_points import (
ensure_collections_for_prefix,
upsert_notes, upsert_chunks, upsert_edges,
delete_chunks_of_note, delete_edges_of_note,
fetch_note_hash_signature, store_note_hashes_signature,
)
from app.core.type_registry import load_type_registry # optional
# ---------------------------
# Hash-Option-C Steuerung
# ---------------------------
DEFAULT_COMPARE = os.environ.get("MINDNET_HASH_COMPARE", "body").lower()
DEFAULT_SOURCE = os.environ.get("MINDNET_HASH_SOURCE", "parsed").lower()
DEFAULT_NORM = os.environ.get("MINDNET_HASH_NORMALIZE", "canonical").lower()
VALID_COMPARE = {"body", "frontmatter", "full"}
VALID_SOURCE = {"parsed", "raw"}
VALID_NORM = {"canonical", "whitespace", "none"}
def _active_hash_key(compare: str, source: str, normalize: str) -> str:
c = compare if compare in VALID_COMPARE else "body"
s = source if source in VALID_SOURCE else "parsed"
n = normalize if normalize in VALID_NORM else "canonical"
return f"{c}:{s}:{n}"
# ---------------------------
# Schema-Validierung (optional)
# ---------------------------
def _load_json(path: Optional[str]) -> Optional[Dict[str, Any]]:
if not path:
return None
p = pathlib.Path(path)
if not p.exists():
return None
with p.open("r", encoding="utf-8") as f:
return json.load(f)
def _validate(obj: Dict[str, Any], schema: Optional[Dict[str, Any]], kind: str) -> List[str]:
"""Grobe Validierung ohne hard dependency auf jsonschema; prüft Basisfelder."""
if not schema:
return []
errs: List[str] = []
# sehr einfache Checks auf required:
req = schema.get("required", [])
for k in req:
if k not in obj:
errs.append(f"{kind}: missing required '{k}'")
# type=object etc. sparen wir uns bewusst (leichtgewichtig).
return errs
# ---------------------------
# Embedding (optional)
# ---------------------------
def _post_json(url: str, payload: Any, timeout: float = 60.0) -> Any:
"""Einfacher HTTP-Client ohne externe Abhängigkeiten."""
import urllib.request
import urllib.error
data = json.dumps(payload).encode("utf-8")
req = urllib.request.Request(url, data=data, headers={"Content-Type": "application/json"})
try:
with urllib.request.urlopen(req, timeout=timeout) as resp:
return json.loads(resp.read().decode("utf-8"))
except urllib.error.URLError as e:
raise RuntimeError(f"embed http error: {e}")
def _embed_texts(url: str, texts: List[str], batch: int = 16) -> List[List[float]]:
    """Embed *texts* in batches via the HTTP endpoint at *url*.

    Accepts flexible response shapes: ``{"embeddings": [...]}``,
    ``{"data": [...]}``, or a bare JSON list of vectors.

    Raises:
        RuntimeError: when the response carries no list of vectors.
    """
    out: List[List[float]] = []
    for i in range(0, len(texts), batch):
        chunk = texts[i:i+batch]
        resp = _post_json(url, {"inputs": chunk})
        # Fix: the old `resp.get("embeddings") or resp.get("data") or resp`
        # crashed with AttributeError when the endpoint returned a bare list,
        # although that fallback was explicitly intended ("flexibel").
        if isinstance(resp, dict):
            vectors = resp.get("embeddings") or resp.get("data")
        else:
            vectors = resp
        if not isinstance(vectors, list):
            raise RuntimeError("embed response malformed")
        out.extend(vectors)
    return out
# ---------------------------
# Skip-Regeln & Dateiauswahl
# ---------------------------
SILVERBULLET_BASENAMES = {"CONFIG.md", "index.md"} # werden explizit übersprungen
def _should_skip_md(path: str) -> bool:
base = os.path.basename(path).lower()
if base in {b.lower() for b in SILVERBULLET_BASENAMES}:
return True
return False
def _list_md_files(root: str, include: Optional[str] = None, exclude: Optional[str] = None) -> List[str]:
    """Collect all *.md files below *root*, sorted by absolute path.

    Skip-list files are dropped first; then *include*/*exclude* regexes
    are matched against the forward-slash relative path.
    """
    inc_re = re.compile(include) if include else None
    exc_re = re.compile(exclude) if exclude else None
    found: List[str] = []
    for dirpath, _, filenames in os.walk(root):
        for fn in filenames:
            if not fn.lower().endswith(".md"):
                continue
            full = os.path.join(dirpath, fn)
            if _should_skip_md(full):
                continue
            rel = os.path.relpath(full, root).replace("\\", "/")
            if inc_re is not None and inc_re.search(rel) is None:
                continue
            if exc_re is not None and exc_re.search(rel) is not None:
                continue
            found.append(full)
    return sorted(found)
# ---------------------------
# CLI
# ---------------------------
def _args() -> argparse.Namespace:
ap = argparse.ArgumentParser(description="Import Obsidian Markdown → Qdrant (Notes/Chunks/Edges).")
gsrc = ap.add_mutually_exclusive_group(required=True)
gsrc.add_argument("--vault", help="Root-Verzeichnis des Vaults")
gsrc.add_argument("--path", help="Nur eine einzelne Markdown-Datei importieren")
ap.add_argument("--prefix", help="Collection-Prefix (ENV: COLLECTION_PREFIX, Default: mindnet)")
ap.add_argument("--apply", action="store_true", help="Änderungen in Qdrant schreiben (sonst Dry-Run)")
ap.add_argument("--purge-before-upsert", action="store_true", help="Bei geänderter Note: alte Chunks/Edges löschen (nur diese Note)")
ap.add_argument("--note-scope-refs", action="store_true", help="Auch Note-Scope 'references' + 'backlink' erzeugen")
ap.add_argument("--baseline-modes", action="store_true", help="Fehlende Hash-Signaturen initial speichern")
# Filter
ap.add_argument("--include", help="Regex auf Relativpfad (nur passende Dateien)")
ap.add_argument("--exclude", help="Regex auf Relativpfad (diese Dateien überspringen)")
# Validierung
ap.add_argument("--validate-schemas", action="store_true", help="JSON-Schemata prüfen (leichtgewichtig)")
ap.add_argument("--note-schema", help="Pfad zu note.schema.json")
ap.add_argument("--chunk-schema", help="Pfad zu chunk.schema.json")
ap.add_argument("--edge-schema", help="Pfad zu edge.schema.json")
# Embeddings (optional)
ap.add_argument("--with-embeddings", action="store_true", help="Embeddings für Note & Chunks erzeugen")
ap.add_argument("--embed-url", help="Override EMBED_URL (Default aus ENV)")
ap.add_argument("--embed-batch", type=int, default=int(os.environ.get("EMBED_BATCH", "16")), help="Embedding-Batchgröße")
ap.add_argument("--version", action="store_true", help="Version anzeigen und beenden")
return ap.parse_args()
# ---------------------------
# Hauptlogik
# ---------------------------
def main() -> None:
    """CLI entry point: import Markdown files into Qdrant (notes/chunks/edges).

    Emits one NDJSON decision line per processed note and a final stats
    summary. Orchestration only — parsing, payload building, edge
    derivation and Qdrant I/O live in app.core.*.
    """
    args = _args()
    if args.version:
        print(f"import_markdown.py {__version__}")
        sys.exit(0)
    # Qdrant client and collections for the configured prefix
    prefix = args.prefix or os.environ.get("COLLECTION_PREFIX", "mindnet")
    qc = QdrantConfig.from_env_or_default()
    client = get_client(qc)
    notes_col, chunks_col, edges_col = ensure_collections_for_prefix(client, prefix)
    # Type registry (optional; presumably falls back to a default mapping — see load_type_registry)
    type_reg = load_type_registry(silent=True)
    # Active hash mode (ENV-driven; validated/normalized by _active_hash_key)
    compare = DEFAULT_COMPARE
    source = DEFAULT_SOURCE
    norm = DEFAULT_NORM
    active_key = _active_hash_key(compare, source, norm)
    # Schemas (optional; only loaded when --validate-schemas is set)
    note_schema = _load_json(args.note_schema) if args.validate_schemas else None
    chunk_schema = _load_json(args.chunk_schema) if args.validate_schemas else None
    edge_schema = _load_json(args.edge_schema) if args.validate_schemas else None
    # Embeddings (optional; requires --embed-url or EMBED_URL, otherwise disabled with a warning)
    embed_enabled = bool(args.with_embeddings)
    embed_url = args.embed_url or os.environ.get("EMBED_URL", "").strip()
    if embed_enabled and not embed_url:
        print(json.dumps({"warn": "with-embeddings active, but EMBED_URL not configured — embeddings skipped"}))
        embed_enabled = False
    # Build the file list (single file via --path, or a vault walk)
    files: List[str] = []
    if args.path:
        if not os.path.isfile(args.path):
            print(json.dumps({"path": args.path, "error": "not a file"}))
            sys.exit(1)
        if _should_skip_md(args.path):
            print(json.dumps({"path": args.path, "skipped": "by rule"}))
            sys.exit(0)
        files = [os.path.abspath(args.path)]
        # NOTE(review): for --path the containing directory becomes the vault
        # root, so rel_path differs from a full-vault run — confirm intended.
        vault_root = os.path.dirname(os.path.abspath(args.path))
    else:
        if not os.path.isdir(args.vault):
            print(json.dumps({"vault": args.vault, "error": "not a directory"}))
            sys.exit(1)
        vault_root = os.path.abspath(args.vault)
        files = _list_md_files(vault_root, include=args.include, exclude=args.exclude)
    processed = 0
    stats = {"notes": 0, "chunks": 0, "edges": 0, "changed": 0, "skipped": 0, "embedded": 0}
    for path in files:
        rel_path = os.path.relpath(path, vault_root).replace("\\", "/")
        parsed = read_markdown(path)
        # Note payload (includes fulltext, hashes[...], etc.)
        note_pl = make_note_payload(parsed, vault_root=vault_root)
        if not isinstance(note_pl, dict):
            print(json.dumps({
                "path": path, "note_id": getattr(parsed, "id", "<unknown>"),
                "error": "make_note_payload returned non-dict", "returned_type": type(note_pl).__name__
            }))
            stats["skipped"] += 1
            continue
        # Frontmatter opt-out: still import note/chunks/edges, but WITHOUT embeddings
        if str(note_pl.get("embedding_exclude", "false")).lower() in {"1", "true", "yes"}:
            embedding_allowed = False
        else:
            embedding_allowed = True
        # Chunk profile from the type registry (keyed by note type)
        note_type = str(note_pl.get("type", "concept") or "concept")
        profile = type_reg.get("types", {}).get(note_type, {}).get("chunk_profile", None)
        # Build chunks
        chunks = make_chunk_payloads(
            note_id=note_pl["note_id"],
            body=note_pl.get("fulltext", ""),
            note_type=note_type,
            profile=profile
        )
        # Build edges (failures are logged and degrade to an empty edge list)
        edges: List[Dict[str, Any]] = []
        try:
            edges = build_edges_for_note(note_payload=note_pl, chunks=chunks, add_note_scope_refs=args.note_scope_refs)
        except Exception as e:
            print(json.dumps({
                "path": path, "note_id": note_pl["note_id"],
                "error": f"build_edges_for_note failed: {getattr(e, 'args', [''])[0]}"
            }))
            edges = []
        # Soft schema checks (warnings only, never abort the import)
        if args.validate_schemas:
            n_err = _validate(note_pl, note_schema, "note")
            for c in chunks:
                n_err += _validate(c, chunk_schema, "chunk")
            for ed in edges:
                n_err += _validate(ed, edge_schema, "edge")
            if n_err:
                print(json.dumps({"note_id": note_pl["note_id"], "schema_warnings": n_err}, ensure_ascii=False))
        # Change detection: stored signature vs. current hash under the active key
        prev_sig = fetch_note_hash_signature(client, notes_col, note_pl["note_id"], active_key)
        curr_sig = note_pl.get("hashes", {}).get(active_key, "")
        is_changed = (prev_sig != curr_sig)
        # Baseline mode: persist a missing active signature on first sight (apply only)
        if args.baseline_modes and not prev_sig and curr_sig and args.apply:
            store_note_hashes_signature(client, notes_col, note_pl["note_id"], active_key, curr_sig)
        # Embeddings (optional).
        # NOTE(review): this runs for every note regardless of is_changed; the
        # original comment claimed embeddings were skipped after change
        # detection — confirm whether gating on is_changed is intended.
        if embed_enabled and embedding_allowed:
            try:
                texts = [note_pl.get("fulltext", "")]
                note_vecs = _embed_texts(embed_url, texts, batch=max(1, int(args.embed_batch)))
                note_pl["embedding"] = note_vecs[0] if note_vecs else None
                # Chunk embeddings: prefer "window" text, fall back to "text"
                chunk_texts = [c.get("window") or c.get("text") or "" for c in chunks]
                if chunk_texts:
                    chunk_vecs = _embed_texts(embed_url, chunk_texts, batch=max(1, int(args.embed_batch)))
                    for c, v in zip(chunks, chunk_vecs):
                        c["embedding"] = v
                    # counts notes with chunk embeddings, not individual vectors
                    stats["embedded"] += 1
            except Exception as e:
                print(json.dumps({"note_id": note_pl["note_id"], "warn": f"embedding failed: {e}"}))
        # Apply/upsert phase
        decision = "dry-run"
        if args.apply:
            if is_changed and args.purge_before_upsert:
                # Purge only this note's old chunks/edges before re-upserting
                delete_chunks_of_note(client, chunks_col, note_pl["note_id"])
                delete_edges_of_note(client, edges_col, note_pl["note_id"])
            # NOTE(review): upserts run even when is_changed is False, so the
            # "apply-skip-unchanged" decision below only labels the note as
            # unchanged — nothing is actually skipped. Confirm intent.
            upsert_notes(client, notes_col, [note_pl])
            if chunks:
                upsert_chunks(client, chunks_col, chunks)
            if edges:
                upsert_edges(client, edges_col, edges)
            if curr_sig:
                store_note_hashes_signature(client, notes_col, note_pl["note_id"], active_key, curr_sig)
            decision = ("apply" if is_changed else "apply-skip-unchanged")
        else:
            decision = "dry-run"
        # Per-note NDJSON decision log
        print(json.dumps({
            "note_id": note_pl["note_id"],
            "title": note_pl.get("title"),
            "chunks": len(chunks),
            "edges": len(edges),
            "changed": bool(is_changed),
            "decision": decision,
            "path": rel_path,
            "hash_mode": compare,
            "hash_normalize": norm,
            "hash_source": source,
            "prefix": prefix
        }, ensure_ascii=False))
        stats["notes"] += 1
        stats["chunks"] += len(chunks)
        stats["edges"] += len(edges)
        if is_changed:
            stats["changed"] += 1
        processed += 1
    print(f"Done. Processed notes: {processed}")
    print(json.dumps({"stats": stats}, ensure_ascii=False))
# Script entry point (also runnable via `python3 -m scripts.import_markdown`).
if __name__ == "__main__":
    main()