mindnet/scripts/import_markdown.py
Lars 364502244a
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 1s
scripts/import_markdown.py aktualisiert
2025-09-05 07:41:28 +02:00

247 lines
9.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Name: scripts/import_markdown.py
Version: v2.2.0 (2025-09-05)
Summary:
    Imports Obsidian Markdown notes into Qdrant (Notes/Chunks/Edges).
    - Derives wikilink edges (references/backlink/references_at) directly from
      the full text plus the real chunk texts.
    - Idempotency: computes hash_fulltext; when it changes, the note's old
      chunks/edges are removed (replace-on-change).
    - Unchanged notes are skipped (fast).
Invocation (from the project root, inside the venv):
    python3 -m scripts.import_markdown --vault ./vault [--apply] [--note-id NOTE_ID] [--embed-note] [--force-replace]
Parameters:
    --vault          Path to the vault (e.g. ./vault)
    --apply          Performs upserts into Qdrant (without the flag: dry run with JSON summaries)
    --note-id        Process only one specific note id
    --embed-note     Optional: additionally embed the note vector (full text)
    --force-replace  Forces purge & rebuild even without a hash change (debug)
Environment variables (optional):
    QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM (default 384)
Exit codes:
    0 = OK, 2 = no Markdown files found
Notes:
    - Wikilink derivation is based on app.core.derive_edges (slug/id resolution, unresolved status).
    - For references_at the real chunk texts are passed in (otherwise they would be missing).
    - Purge uses Qdrant filters:
        * Chunks: payload.note_id == NOTE_ID
        * Edges : (source_id == NOTE_ID) OR (target_id == NOTE_ID) OR (source_id startswith NOTE_ID + "#")
    - Notes/Chunks/Edges stay 1:1 compatible with validator & backfill.
Changelog:
    v2.2.0: Hash-based replace-on-change logic; purge per note; skip unchanged.
    v2.1.1: Ensured references_at by passing real chunk texts.
    v2.1.0: Pre-pass note index over the vault; direct edge derivation.
"""
from __future__ import annotations
import argparse, os, glob, json, sys, hashlib
from typing import List, Dict, Tuple
from dotenv import load_dotenv
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter
from app.core.note_payload import make_note_payload
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
from app.core.embed import embed_texts, embed_one
from app.core.qdrant import QdrantConfig, ensure_collections, get_client
from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from app.core.derive_edges import build_note_index, derive_wikilink_edges # Wikilinks
# -------------------
# Utility / Helpers
# -------------------
def iter_md(root: str, exclude=("/.obsidian/", "/_backup_frontmatter/", "/_imported/")) -> List[str]:
    """Recursively collect Markdown files under *root*, skipping excluded folders.

    Args:
        root: Vault root directory to scan.
        exclude: Path fragments (forward-slash form); any file whose
            normalized path contains one of them is skipped.

    Returns:
        Paths (as produced by glob) of all non-excluded ``*.md`` files.
    """
    # Fix: the original wrapped glob.glob(...) in a pointless copying list
    # comprehension; iterate the glob result directly.
    out: List[str] = []
    for p in glob.glob(os.path.join(root, "**", "*.md"), recursive=True):
        # Normalize separators so the exclusion fragments also match on Windows.
        pn = p.replace("\\", "/")
        if any(ex in pn for ex in exclude):
            continue
        out.append(p)
    return out
def minimal_note_index_payload(abs_path: str, vault_root: str) -> Dict:
    """Build the minimal payload the wikilink resolver needs (id/title/path).

    Raises whatever validate_required_frontmatter raises for invalid notes;
    callers are expected to catch and skip those files.
    """
    doc = read_markdown(abs_path)
    meta = normalize_frontmatter(doc.frontmatter)
    validate_required_frontmatter(meta)
    rel = os.path.relpath(abs_path, vault_root).replace("\\", "/")
    note_id = meta.get("id") or meta.get("note_id")
    return {"note_id": note_id, "title": meta.get("title"), "path": rel}
def compute_hash_fulltext(text: str) -> str:
    """Return the SHA-256 hex digest of *text* (UTF-8 encoded)."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
def note_uuid5(note_id: str) -> str:
    """Map a note id to its deterministic point id: UUIDv5 in the URL namespace."""
    import uuid  # local import, mirrors the original (only needed here)
    deterministic = uuid.uuid5(uuid.NAMESPACE_URL, note_id)
    return str(deterministic)
def fetch_existing_note_payload(client: QdrantClient, notes_col: str, note_id: str) -> Dict | None:
    """Fetch the stored payload of *note_id* from the notes collection, or None.

    The point id is the deterministic UUIDv5 of the note id (URL namespace,
    inlined here instead of calling the note_uuid5 helper). Any lookup error
    is treated as "not found" — this is a best-effort read used only for the
    hash comparison.
    """
    import uuid
    point_id = str(uuid.uuid5(uuid.NAMESPACE_URL, note_id))
    try:
        hits = client.retrieve(
            collection_name=notes_col,
            ids=[point_id],
            with_payload=True,
            with_vectors=False,
        )
        if not hits:
            return None
        # A point may exist with an empty payload; normalize to {}.
        return hits[0].payload or {}
    except Exception:
        return None
def purge_note(client: QdrantClient, cfg: QdrantConfig, note_id: str) -> None:
    """Delete all chunks and edges of one note (replace-on-change).

    Chunk points match ``payload.note_id == note_id``. Edge points match when
    the note is the source, the target, or the source is a chunk-scoped id of
    the form ``"<note_id>#..."``.
    """
    chunks_col = f"{cfg.prefix}_chunks"
    edges_col = f"{cfg.prefix}_edges"

    def _delete(collection: str, flt: rest.Filter) -> None:
        # Some client versions expose delete_points; newer ones only delete().
        if hasattr(client, "delete_points"):
            client.delete_points(collection, flt, wait=True)
        else:
            client.delete(collection, flt, wait=True)

    # Chunks: payload.note_id == note_id
    _delete(chunks_col, rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    ))

    # Edges: (source_id == note_id) OR (target_id == note_id)
    #        OR (source_id startswith note_id + "#")
    # NOTE(review): MatchText requires a full-text index on source_id and is a
    # token match, not a strict prefix match — confirm the index covers
    # "<id>#" tokens as intended.
    conds = [
        rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="source_id", match=rest.MatchText(text=f"{note_id}#")),
    ]
    # Fix: qdrant's Filter model has no `minimum_should` field (the real field
    # is `min_should`, which takes a MinShould object). With only `should`
    # clauses present, at-least-one-match is already the default semantics.
    _delete(edges_col, rest.Filter(should=conds))
# -------------------
# Main
# -------------------
def main():
    """Import all (or one) Obsidian Markdown notes into Qdrant.

    Dry run by default (JSON summary per note); --apply performs the actual
    upserts with hash-based replace-on-change semantics. Exits with code 2
    when no Markdown files are found.
    """
    load_dotenv()
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Obsidian Vault Pfad (z.B. mindnet/vault)")
    ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant (sonst Dry-Run)")
    ap.add_argument("--note-id", help="Nur eine Note-ID verarbeiten")
    ap.add_argument("--embed-note", action="store_true", help="Auch Note-Volltext einbetten (optional)")
    ap.add_argument("--force-replace", action="store_true", help="Purge & Neuaufbau erzwingen (Debug)")
    args = ap.parse_args()

    # Qdrant configuration from the environment (defaults match local dev).
    cfg = QdrantConfig(
        url=os.getenv("QDRANT_URL", "http://127.0.0.1:6333"),
        api_key=os.getenv("QDRANT_API_KEY") or None,
        prefix=os.getenv("COLLECTION_PREFIX", "mindnet"),
        dim=int(os.getenv("VECTOR_DIM", "384")),
    )
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)

    root = os.path.abspath(args.vault)
    files = iter_md(root)
    if not files:
        print("Keine Markdown-Dateien gefunden.", file=sys.stderr)
        sys.exit(2)

    # 1) Pre-pass: build a global note index for robust wikilink resolution.
    index_payloads: List[Dict] = []
    for path in files:
        try:
            pl = minimal_note_index_payload(path, root)
        except Exception:
            # Notes with invalid/missing frontmatter cannot be indexed; skip.
            continue
        if not pl.get("note_id"):
            continue
        if args.note_id and pl["note_id"] != args.note_id:
            continue
        index_payloads.append(pl)
    note_index = build_note_index(index_payloads)  # by_id/by_slug/by_file_slug

    notes_col = f"{cfg.prefix}_notes"
    total_notes = 0

    # 2) Main pass, one note per file.
    for path in files:
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)
        try:
            validate_required_frontmatter(fm)
        except Exception:
            continue
        if args.note_id and fm.get("id") != args.note_id:
            continue
        total_notes += 1

        # Note payload incl. hash_fulltext (drives replace-on-change below).
        note_pl = make_note_payload(parsed, vault_root=root)
        validate_note_payload(note_pl)
        h = compute_hash_fulltext(parsed.body)
        note_pl["hash_fulltext"] = h

        # Chunks + payloads.
        chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)

        # Embeddings (chunks) and the optional note-level vector.
        texts = [ch.text for ch in chunks]
        vectors = embed_texts(texts)
        note_vec = embed_one(parsed.body) if args.embed_note else None

        # Edges from fulltext + real chunk texts (references_at needs them).
        note_pl_for_edges = {
            "note_id": fm["id"],
            "title": fm.get("title"),
            "path": note_pl["path"],
            "fulltext": parsed.body,
        }
        chunks_for_links = []
        for i, cpl in enumerate(chunk_pls):
            cid = cpl.get("chunk_id") or cpl.get("id")
            txt = chunks[i].text if i < len(chunks) else ""
            chunks_for_links.append({"chunk_id": cid, "text": txt})
        edges = derive_wikilink_edges(note_pl_for_edges, chunks_for_links, note_index)

        # Compare the stored hash to decide whether a rebuild is needed.
        existing = fetch_existing_note_payload(client, notes_col, fm["id"])
        if existing and isinstance(existing, dict):
            changed = existing.get("hash_fulltext") != h
        else:
            changed = True  # new note (or lookup failed) -> treat as changed

        # Per-note summary (dry run and apply mode alike).
        # Fix: argparse stores --force-replace as `force_replace`; the previous
        # code accessed the nonexistent `args.force_replaces` and raised
        # AttributeError at runtime.
        print(json.dumps({
            "note_id": fm["id"],
            "title": fm["title"],
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "changed": changed or args.force_replace,
            "path": note_pl["path"],
        }, ensure_ascii=False))
        if not args.apply:
            continue

        # Replace-on-change: purge the note's previous artifacts first.
        if changed or args.force_replace:
            purge_note(client, cfg, fm["id"])

        # Upserts (notes, chunks, edges).
        notes_col_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
        upsert_batch(client, notes_col_name, note_pts)
        chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors)
        upsert_batch(client, chunks_col, chunk_pts)
        edges_col, edge_pts = points_for_edges(cfg.prefix, edges)
        upsert_batch(client, edges_col, edge_pts)

    print(f"Done. Processed notes: {total_notes}")
# Script entry point: run the importer when executed directly.
if __name__ == "__main__":
    main()