mindnet/scripts/import_markdown.py
Lars 8df2069c6f
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 1s
scripts/import_markdown.py aktualisiert
2025-09-05 12:03:33 +02:00

262 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: scripts/import_markdown.py
Version: v2.4.1 (2025-09-05)
Beschreibung
Importiert Markdown-Notizen in Qdrant (Notes, Chunks, Edges) mit Delta-Detection.
- Chunking + Embedding (MiniLM 384d, externer Embed-Server)
- Edges direkt beim Import aus Wikilinks ([[…]]) ableiten (inkl. references_at)
- Idempotenz via stabile UUIDv5-IDs und hash_fulltext
- Create/Update/Skip pro Note:
* Unverändert (hash_fulltext gleich) ⇒ Skip
* Geändert ⇒ Chunks & Edges der Note purge + Replace (Upsert)
- Dry-Run löscht/ändert nichts; zeigt die Entscheidung je Note
Aufruf
python3 -m scripts.import_markdown --vault ./vault [--apply] [--note-id ID]
[--embed-note] [--force-replace]
Parameter
--vault Pfad zum Obsidian-Vault (erforderlich)
--apply Ohne Flag: Dry-Run (nur JSON-Zeilen). Mit Flag: schreibt in Qdrant.
--note-id Nur eine spezifische Note-ID verarbeiten (Filter)
--embed-note Optional: Note-Volltext zusätzlich einbetten
--force-replace Erzwingt Neuaufbau von Chunks/Edges der Note (auch wenn Hash unverändert)
Hinweise
- Im venv arbeiten: `source .venv/bin/activate`
- Embed-Server muss laufen (http://127.0.0.1:8990)
- Qdrant via ENV: QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM
Changelog
v2.4.1: FIX Kompatibilität zu verschiedenen qdrant-client Versionen:
`scroll()`-Rückgabe kann 2- oder 3-teilig sein → robustes Unpacking.
v2.4.0: NEU Delta-Detection über hash_fulltext; Skip/Replace-Entscheidung.
Purge bei Updates: löscht Chunks & Edges der Quelle, dann Upsert.
Dry-Run garantiert ohne Mutationen.
v2.3.1: FIX Für derive_wikilink_edges werden echte Chunk-Texte übergeben
({"chunk_id","text"}) → erzeugt `references_at`.
v2.3.0: Umstellung auf app.core.derive_edges; Edge-IDs inkl. Occurrence.
"""
from __future__ import annotations
import argparse, os, glob, json, sys, hashlib
from typing import Optional, Tuple, List
from dotenv import load_dotenv
from qdrant_client.http import models as rest
from qdrant_client import QdrantClient
from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter
from app.core.note_payload import make_note_payload
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
from app.core.embed import embed_texts, embed_one
from app.core.qdrant import QdrantConfig, ensure_collections, get_client
from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from app.core.derive_edges import build_note_index, derive_wikilink_edges
# -------------------------------
# Utility
# -------------------------------
def iter_md(root: str, exclude_dirs=("/.obsidian/", "/_backup_frontmatter/", "/_imported/")) -> List[str]:
    """Recursively collect Markdown files under *root*.

    A path is excluded when its slash-normalized form contains any of the
    substrings in *exclude_dirs* (Obsidian config, frontmatter backups,
    already-imported notes).

    Args:
        root: Vault root directory to scan.
        exclude_dirs: Substrings (with surrounding slashes) that mark
            directories to skip.

    Returns:
        List of ``*.md`` paths exactly as ``glob`` yields them.
    """
    # Single pass: glob recursively and filter in one comprehension
    # (the original wrapped glob in a redundant list comprehension and
    # built the result with a manual append loop).
    pattern = os.path.join(root, "**", "*.md")
    return [
        p for p in glob.glob(pattern, recursive=True)
        if not any(ex in p.replace("\\", "/") for ex in exclude_dirs)
    ]
def sha256_hex(text: str) -> str:
    """Hex-encoded SHA-256 digest of *text* (UTF-8 encoded)."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
def collection_names(prefix: str) -> Tuple[str, str, str]:
    """Return the (notes, chunks, edges) Qdrant collection names for *prefix*."""
    suffixes = ("notes", "chunks", "edges")
    return tuple(f"{prefix}_{suffix}" for suffix in suffixes)
def _scroll(client: QdrantClient, **kwargs):
"""
Wrapper für client.scroll, der 2-teilige und 3-teilige Rückgaben unterstützt.
Neuere qdrant-client Versionen liefern (points, next_page), ältere evtl. (points, offset, next_page).
"""
res = client.scroll(**kwargs)
if isinstance(res, tuple):
if len(res) == 2:
points, _ = res
return points
elif len(res) == 3:
points, _, _ = res
return points
# Fallback: wenn sich API ändert, versuchen wir, wie eine Sequenz zuzugreifen
try:
return res[0]
except Exception:
return []
def fetch_existing_note_hash(client: QdrantClient, prefix: str, note_id: str) -> Optional[str]:
    """Return the stored ``hash_fulltext`` of a note from Qdrant, or None if absent."""
    notes_col = collection_names(prefix)[0]
    note_filter = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    points = _scroll(
        client,
        collection_name=notes_col,
        scroll_filter=note_filter,
        with_payload=True,
        with_vectors=False,
        limit=1,
    )
    if not points:
        return None
    payload = points[0].payload or {}
    # Legacy imports may predate the hash_fulltext field — normalize to None.
    return payload.get("hash_fulltext") or None
def purge_note_edges(client: QdrantClient, prefix: str, source_note_id: str) -> None:
    """
    Delete all edges belonging to a source note:
      - every edge with source_id == note
      - every backlink pointing at the note (kind=backlink AND target_id=note)
    """
    edges_col = collection_names(prefix)[2]
    outgoing = rest.FieldCondition(
        key="source_id", match=rest.MatchValue(value=source_note_id)
    )
    incoming_backlinks = rest.Filter(must=[
        rest.FieldCondition(key="kind", match=rest.MatchValue(value="backlink")),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=source_note_id)),
    ])
    # OR of the two clauses: outgoing edges, or backlinks targeting the note.
    selector = rest.Filter(should=[outgoing, incoming_backlinks])
    client.delete(collection_name=edges_col, points_selector=selector, wait=True)
def purge_note_chunks(client: QdrantClient, prefix: str, note_id: str) -> None:
    """Delete every chunk whose payload ``note_id`` equals *note_id*."""
    chunks_col = collection_names(prefix)[1]
    selector = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    client.delete(collection_name=chunks_col, points_selector=selector, wait=True)
# -------------------------------
# Main logic
# -------------------------------
def main():
    """CLI entry point: import Markdown notes into Qdrant with delta detection.

    Two passes over the vault:
      1. Build a note index (id/title/path stubs) for wikilink resolution.
      2. Per note: compare hash_fulltext against the stored value; skip
         unchanged notes, otherwise chunk + embed + derive edges and —
         only with --apply — purge the note's old chunks/edges and upsert.
    Without --apply this is a dry run that only prints one JSON line per note.
    """
    load_dotenv()
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Pfad zum Obsidian Vault (z.B. ./vault)")
    ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant (sonst Dry-Run)")
    ap.add_argument("--note-id", help="Nur eine Note-ID verarbeiten")
    ap.add_argument("--embed-note", action="store_true", help="Auch Note-Volltext einbetten (optional)")
    ap.add_argument("--force-replace", action="store_true", help="Erzwingt Purge+Replace der Note (auch wenn Hash gleich)")
    args = ap.parse_args()
    # Qdrant configuration from environment, with local-dev defaults.
    cfg = QdrantConfig(
        url=os.getenv("QDRANT_URL", "http://127.0.0.1:6333"),
        api_key=os.getenv("QDRANT_API_KEY") or None,
        prefix=os.getenv("COLLECTION_PREFIX", "mindnet"),
        dim=int(os.getenv("VECTOR_DIM","384")),
    )
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)
    root = os.path.abspath(args.vault)
    files = iter_md(root)
    if not files:
        print("Keine Markdown-Dateien gefunden.", file=sys.stderr); sys.exit(2)
    # --- Pass 1: build a note index for link resolution (by id/slug/path) ---
    note_stubs = []
    for path in files:
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)
        try:
            validate_required_frontmatter(fm)
        except Exception:
            # Notes without valid required frontmatter are skipped entirely.
            continue
        if args.note_id and fm.get("id") != args.note_id:
            continue
        rel = os.path.relpath(parsed.path, root).replace("\\","/")
        note_stubs.append({"note_id": fm["id"], "title": fm.get("title",""), "path": rel})
    note_index = build_note_index(note_stubs)
    total_notes = 0
    # --- Pass 2: process each note ---
    for path in files:
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)
        try:
            validate_required_frontmatter(fm)  # required fields per schema/design
        except Exception:
            continue
        if args.note_id and fm.get("id") != args.note_id:
            continue
        total_notes += 1
        note_id = fm["id"]
        # --- Delta detection: compare full-text hash with the stored one ---
        fulltext = parsed.body
        new_hash = sha256_hex(fulltext)
        old_hash = fetch_existing_note_hash(client, cfg.prefix, note_id)
        changed = (old_hash != new_hash) or (old_hash is None) or args.force_replace
        # Note payload
        note_pl = make_note_payload(parsed, vault_root=root)
        note_pl["fulltext"] = fulltext  # needed by derive_edges (references)
        note_pl["hash_fulltext"] = new_hash  # schema field used for delta detection
        validate_note_payload(note_pl)
        # Early exit for unchanged notes (skip in both dry-run and apply mode)
        if not changed:
            print(json.dumps({
                "note_id": note_id,
                "title": fm["title"],
                "changed": False,
                "decision": "skip",
                "path": note_pl["path"]
            }, ensure_ascii=False))
            continue
        # Chunks (incl. raw chunk texts so edge derivation can fill references_at)
        chunks = assemble_chunks(note_id, fulltext, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)
        chunks_for_links = [
            {"chunk_id": (pl.get("chunk_id") or pl.get("id") or f"{note_id}#{i+1}"),
             "text": chunks[i].text}
            for i, pl in enumerate(chunk_pls)
            if i < len(chunks)
        ]
        # Embeddings (chunks)
        texts = [ch.text for ch in chunks]
        vectors = embed_texts(texts)
        # Optional: note-level full-text vector
        note_vec = embed_one(fulltext) if args.embed_note else None
        # Edges (note and chunk level)
        edges = derive_wikilink_edges(note_pl, chunks_for_links, note_index)
        # Per-note status line (also serves as the dry-run output)
        print(json.dumps({
            "note_id": note_id,
            "title": fm["title"],
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "changed": True,
            "decision": "replace" if args.apply else "dry-run",
            "path": note_pl["path"]
        }, ensure_ascii=False))
        if args.apply:
            # Purge the note's old data (chunks + edges), then upsert fresh points
            purge_note_chunks(client, cfg.prefix, note_id)
            purge_note_edges(client, cfg.prefix, note_id)
            # Notes upsert
            notes_col, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
            upsert_batch(client, notes_col, note_pts)
            # Chunks upsert
            chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors)
            upsert_batch(client, chunks_col, chunk_pts)
            # Edges upsert
            edges_col, edge_pts = points_for_edges(cfg.prefix, edges)
            upsert_batch(client, edges_col, edge_pts)
    print(f"Done. Processed notes: {total_notes}")


if __name__ == "__main__":
    main()