scripts/import_markdown.py updated

This commit is contained in:
Lars 2025-09-06 14:04:46 +02:00
parent df33293621
commit 47e6d56b21


@@ -1,261 +1,277 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: scripts/import_markdown.py
Version: v2.4.1 (2025-09-05)
Script: scripts/import_markdown.py
Version: 0.6.0 (2025-09-06)
Autor: mindnet / Architektur Datenimporte & Sync
Beschreibung
Importiert Markdown-Notizen in Qdrant (Notes, Chunks, Edges) mit Delta-Detection.
- Chunking + Embedding (MiniLM 384d, externer Embed-Server)
- Edges direkt beim Import aus Wikilinks ([[]]) ableiten (inkl. references_at)
- Idempotenz via stabile UUIDv5-IDs und hash_fulltext
- Create/Update/Skip pro Note:
* Unverändert (hash_fulltext gleich) Skip
* Geändert Chunks & Edges der Note purge + Replace (Upsert)
- Dry-Run löscht/ändert nichts; zeigt die Entscheidung je Note
Kurzbeschreibung
---------------
Importiert Markdown-Notizen aus einem Obsidian-ähnlichen Vault in Qdrant:
- Validiert Frontmatter / Note-Payload.
- Chunking + Embeddings.
- Leitet Edges direkt beim Import aus [[Wikilinks]] ab:
- 'references' (NoteNote)
- 'references_at' (ChunkNote)
- 'backlink' (NoteNote) nur für NoteNote-Kanten.
Aufruf
python3 -m scripts.import_markdown --vault ./vault [--apply] [--note-id ID]
[--embed-note] [--force-replace]
Neu in 0.6.0
------------
- Option `--purge-before-upsert`: löscht für die jeweils verarbeitete Note
*vor* dem Upsert alle zugehörigen Chunks und Edges in Qdrant (selektiv!),
um Leichen nach Re-Chunking zu vermeiden.
- Robuste Link-Auflösung via Note-Index (ID / Titel-Slug / Datei-Slug)
konsistent zu `derive_edges.py`.
Aufrufbeispiele
---------------
Dry-Run (keine Schreibzugriffe):
python3 -m scripts.import_markdown --vault ./vault
Nur eine bestimmte Note:
python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo
Apply (schreiben) mit Purge:
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert
Parameter
--vault Pfad zum Obsidian-Vault (erforderlich)
--apply Ohne Flag: Dry-Run (nur JSON-Zeilen). Mit Flag: schreibt in Qdrant.
--note-id Nur eine spezifische Note-ID verarbeiten (Filter)
--embed-note Optional: Note-Volltext zusätzlich einbetten
--force-replace Erzwingt Neuaufbau von Chunks/Edges der Note (auch wenn Hash unverändert)
---------
--vault PATH : Pflicht. Root-Verzeichnis des Vaults.
--apply : Wenn gesetzt, werden Upserts durchgeführt (sonst Dry-Run).
--purge-before-upsert : Wenn gesetzt, werden vor dem Upsert (nur bei --apply)
alte Chunks und Edges dieser Note in Qdrant gelöscht.
--note-id ID : Optional, verarbeitet nur diese eine Note.
Hinweise
- Im venv arbeiten: `source .venv/bin/activate`
- Embed-Server muss laufen (http://127.0.0.1:8990)
- Qdrant via ENV: QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM
Umgebungsvariablen (.env)
-------------------------
QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM
Standard: url=http://127.0.0.1:6333, prefix=mindnet, dim=384
Changelog
v2.4.1: FIX Kompatibilität zu verschiedenen qdrant-client Versionen:
`scroll()`-Rückgabe kann 2- oder 3-teilig sein robustes Unpacking.
v2.4.0: NEU Delta-Detection über hash_fulltext; Skip/Replace-Entscheidung.
Purge bei Updates: löscht Chunks & Edges der Quelle, dann Upsert.
Dry-Run garantiert ohne Mutationen.
v2.3.1: FIX Für derive_wikilink_edges werden echte Chunk-Texte übergeben
({"chunk_id","text"}) erzeugt `references_at`.
v2.3.0: Umstellung auf app.core.derive_edges; Edge-IDs inkl. Occurrence.
Kompatibilität
--------------
- Nutzt die bestehenden Kernmodule:
app.core.parser (read_markdown, normalize_frontmatter, validate_required_frontmatter)
app.core.validate_note (validate_note_payload)
app.core.chunker (assemble_chunks)
app.core.chunk_payload (make_chunk_payloads)
app.core.embed (embed_texts)
app.core.qdrant (QdrantConfig, get_client, ensure_collections)
app.core.qdrant_points (points_for_note, points_for_chunks, points_for_edges, upsert_batch)
app.core.derive_edges (build_note_index, derive_wikilink_edges)
Änderungshinweise vs. früherer Importer
---------------------------------------
- Alte, globale Lösch-Workarounds entfallen. Selektives Purge ist jetzt optional und sicher.
- Edges werden nur noch in der neuen, einheitlichen Struktur erzeugt.
"""
from __future__ import annotations

import argparse
import glob
import json
import os
import sys
from typing import Dict, List

from dotenv import load_dotenv
from qdrant_client.http import models as rest

# Core building blocks (existing project modules)
from app.core.parser import (
    read_markdown,
    normalize_frontmatter,
    validate_required_frontmatter,
)
from app.core.note_payload import make_note_payload
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
from app.core.embed import embed_texts
from app.core.qdrant import QdrantConfig, ensure_collections, get_client, collection_names
from app.core.qdrant_points import (
    points_for_note,
    points_for_chunks,
    points_for_edges,
    upsert_batch,
)
from app.core.derive_edges import build_note_index, derive_wikilink_edges
# -------------------------------------------------
# Helpers
# -------------------------------------------------

def iter_md(root: str) -> List[str]:
    """Collects all Markdown files below root (de-duplicated, sorted)."""
    patterns = ["**/*.md", "*.md"]
    out: List[str] = []
    for p in patterns:
        out.extend(glob.glob(os.path.join(root, p), recursive=True))
    return sorted(dict.fromkeys(out))  # de-dupe + sort


def make_note_stub(abs_path: str, vault_root: str) -> Dict:
    """
    Builds a minimal note stub for the index (build_note_index):
        { note_id, title, path }
    """
    parsed = read_markdown(abs_path)
    fm = normalize_frontmatter(parsed.frontmatter or {})
    # Minimal validation: we need the id (title is optional, used for slug resolution)
    if "id" not in fm or not fm["id"]:
        raise ValueError(f"Missing id in frontmatter: {abs_path}")
    rel = os.path.relpath(abs_path, vault_root)
    return {"note_id": fm["id"], "title": fm.get("title"), "path": rel}


def build_vault_index(vault_root: str) -> tuple[Dict, Dict, Dict]:
    """
    Reads all notes and builds the triple index for wikilink resolution.
    """
    files = iter_md(vault_root)
    stubs = []
    for p in files:
        try:
            stubs.append(make_note_stub(p, vault_root))
        except Exception:
            # Note without id -> the importer will skip it later anyway
            continue
    return build_note_index(stubs)
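The triple index returned by `build_note_index` (in `app.core.derive_edges`) is not shown in this file. A minimal sketch of the id / title-slug / file-slug lookup it is described to provide; the slug rules and the function name `build_note_index_sketch` are assumptions for illustration:

```python
import os
import re
import unicodedata
from typing import Dict, List, Tuple

def slugify(text: str) -> str:
    # ASCII-fold, then collapse non-alphanumerics to single dashes (assumed rule)
    s = unicodedata.normalize("NFKD", text).encode("ascii", "ignore").decode()
    return re.sub(r"[^a-zA-Z0-9]+", "-", s).strip("-").lower()

def build_note_index_sketch(stubs: List[Dict]) -> Tuple[Dict, Dict, Dict]:
    """Maps note id, title slug, and file slug to the note id."""
    by_id = {s["note_id"]: s["note_id"] for s in stubs}
    by_title = {slugify(s["title"]): s["note_id"] for s in stubs if s.get("title")}
    by_file = {
        slugify(os.path.splitext(os.path.basename(s["path"]))[0]): s["note_id"]
        for s in stubs
    }
    return by_id, by_title, by_file
```

With such an index, a `[[Foo Bar]]` wikilink can resolve whether it names the target by id, by title, or by filename.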
def purge_for_note(client, prefix: str, note_id: str, chunk_ids: List[str]) -> Dict[str, int]:
    """
    Selective purge for the current note:
      - Chunks: everything with payload.note_id == note_id
      - Edges:  everything with payload.source_id == note_id OR one of the chunk_ids
      - Notes:  are not deleted (the upsert overwrites payload/vector)
    """
    notes_col, chunks_col, edges_col = collection_names(prefix)
    # Qdrant's delete-by-filter does not report how many points it removed,
    # so these counters stay at 0.
    counts = {"chunks_deleted": 0, "edges_deleted": 0}

    # Delete chunks (filter must: note_id == X)
    f_chunks = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    client.delete(collection_name=chunks_col, points_selector=f_chunks, wait=True)

    # Delete edges: OR over the note id and all chunk ids
    should_conds = [rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id))]
    for cid in chunk_ids:
        should_conds.append(rest.FieldCondition(key="source_id", match=rest.MatchValue(value=cid)))
    f_edges = rest.Filter(should=should_conds)
    client.delete(collection_name=edges_col, points_selector=f_edges, wait=True)

    return counts
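The edge purge relies on Qdrant's filter semantics: conditions under `must` are AND-combined, conditions under `should` are OR-combined. A pure-Python predicate mirroring the edge filter above, for illustration only (the function name is made up, not part of the script):

```python
from typing import Dict, List

def edge_matches_purge_filter(payload: Dict, note_id: str, chunk_ids: List[str]) -> bool:
    # Mirrors rest.Filter(should=[source_id == note_id, source_id == cid, ...]):
    # an edge is purged if its source is the note itself or any of its chunks.
    return payload.get("source_id") in {note_id, *chunk_ids}
```

This is why both `references` edges (source = note id) and `references_at` edges (source = chunk id) disappear in one delete call.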
# -------------------------------------------------
# Main
# -------------------------------------------------

def main():
    load_dotenv()
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Path to the vault root")
    ap.add_argument("--apply", action="store_true", help="Write to Qdrant (otherwise dry-run)")
    ap.add_argument(
        "--purge-before-upsert",
        action="store_true",
        help="Delete the current note's old chunks/edges before the upsert (only effective with --apply).",
    )
    ap.add_argument("--note-id", help="Optional: process only this note")
    args = ap.parse_args()

    # Qdrant
    cfg = QdrantConfig(
        url=os.getenv("QDRANT_URL", "http://127.0.0.1:6333"),
        api_key=os.getenv("QDRANT_API_KEY", None),
        prefix=os.getenv("COLLECTION_PREFIX", "mindnet"),
        dim=int(os.getenv("VECTOR_DIM", "384")),
    )
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)

    vault_root = os.path.abspath(args.vault)
    files = iter_md(vault_root)
    if not files:
        print("No Markdown files found.", file=sys.stderr)
        sys.exit(2)

    # 1) Note index over the whole vault (for robust link resolution)
    note_index = build_vault_index(vault_root)

    processed = 0
    for abs_path in files:
        parsed = read_markdown(abs_path)
        fm = normalize_frontmatter(parsed.frontmatter or {})
        try:
            validate_required_frontmatter(fm)
        except Exception:
            # skip incomplete notes
            continue
        if args.note_id and fm.get("id") != args.note_id:
            continue
        processed += 1

        # --- Note payload ---
        note_pl = make_note_payload(parsed, vault_root=vault_root)
        validate_note_payload(note_pl)

        # --- Chunking & payloads ---
        chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)

        # --- Embeddings ---
        texts = [c.get("text") or c.get("content") or "" for c in chunk_pls]
        vectors = embed_texts(texts)

        # --- Edge derivation (direct) ---
        edges = derive_wikilink_edges(note_pl, chunk_pls, note_index)

        # --- Per-note decision ---
        decision = "apply" if args.apply else "dry-run"

        # --- Purge before upsert (only with --apply) ---
        if args.apply and args.purge_before_upsert:
            # Determine the (new) chunk ids -> used to purge edges by source_id
            chunk_ids = []
            for i, ch in enumerate(chunk_pls, start=1):
                cid = ch.get("chunk_id") or ch.get("id") or f"{fm['id']}#{i}"
                ch["chunk_id"] = cid  # make sure it is set
                chunk_ids.append(cid)
            purge_for_note(client, cfg.prefix, fm["id"], chunk_ids)

        # --- Upserts (only with --apply) ---
        if args.apply:
            # Note
            notes_col, note_pts = points_for_note(cfg.prefix, note_pl, note_vec=None, dim=cfg.dim)
            upsert_batch(client, notes_col, note_pts)
            # Chunks
            chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors)
            upsert_batch(client, chunks_col, chunk_pts)
            # Edges
            edges_col, edge_pts = points_for_edges(cfg.prefix, edges)
            upsert_batch(client, edges_col, edge_pts)

        # Per-note logging
        print(json.dumps({
            "note_id": fm["id"],
            "title": fm.get("title"),
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "changed": True,  # a hash/timestamp comparison could optionally be added here
            "decision": decision,
            "path": note_pl["path"],
        }, ensure_ascii=False))

    print(f"Done. Processed notes: {processed}")
if __name__ == "__main__":
main()
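The per-note log above hardcodes `"changed": True`; the v2.4.x importer that this version replaces derived a skip/replace decision from a fulltext hash instead. A minimal sketch of that decision, should it be re-added (helper names hypothetical):

```python
import hashlib
from typing import Optional

def sha256_hex(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

def decide(old_hash: Optional[str], fulltext: str, force_replace: bool = False) -> str:
    """'skip' if the stored hash matches the current fulltext, else 'replace'."""
    if force_replace or old_hash is None:
        return "replace"
    return "skip" if old_hash == sha256_hex(fulltext) else "replace"
```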