mindnet/scripts/import_markdown.py
Lars d9e30d5fb4
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 1s
scripts/import_markdown.py aktualisiert
2025-09-05 09:02:20 +02:00

287 lines
12 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Name: scripts/import_markdown.py
Version: v2.2.3 (2025-09-05)
Kurzbeschreibung:
Importiert Obsidian-Markdown-Notes in Qdrant (Notes/Chunks/Edges).
- Edges direkt aus Wikilinks: references / backlink / references_at (Chunk-basiert).
- Idempotenz: hash_fulltext; bei Änderung Purge & Neuaufbau (Chunks/Edges) pro Note.
- Unveränderte Notes werden übersprungen.
Aufruf (aus Projekt-Root, im venv):
python3 -m scripts.import_markdown --vault ./vault [--apply] [--note-id NOTE_ID] [--embed-note] [--force-replace] [--version]
Parameter:
--vault Pfad zum Vault (z. B. ./vault)
--apply Schreibt in Qdrant (ohne Flag = Dry-Run; nur JSON-Zeilen je Note)
--note-id Verarbeite nur die angegebene Note-ID
--embed-note Optional: Note-Vektor (Volltext) zusätzlich einbetten
--force-replace Erzwingt Purge & Neuaufbau auch ohne Hash-Änderung (Debug)
--version Druckt nur die Script-Version und beendet
Umgebungsvariablen (optional):
QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM (Default 384)
Exitcodes:
0 = OK, 2 = keine Markdown-Dateien gefunden
Wichtige Quellen:
- Edge-Ableitung (neu, Wikilinks): app/core/derive_edges.py (build_note_index, derive_wikilink_edges)
- Qdrant-Setup (1D-Edges, COSINE): app/core/qdrant.py
- Upsert/ID-Vergabe (UUIDv5): app/core/qdrant_points.py
- (Legacy) app/core/edges.py ist NICHT mehr im Importer verdrahtet (Kompatibilität bleibt, aber nicht aktiv).
Changelog:
v2.2.3: Schutz gegen Legacy-Import-Pfad; --version-Flag; klare Log-Zeile mit Script-Version.
v2.2.2: Purge kompatibel gemacht (ohne minimum_should); Scroll-Fallback für source_id-Prefix.
v2.2.1: Bugfix Tippfehler (args.force_replace).
v2.2.0: Hash-basierte Replace-on-Change-Logik; Purge pro Note; Skip unverändert.
"""
from __future__ import annotations
import argparse, os, glob, json, sys, hashlib
from typing import List, Dict, Optional
from dotenv import load_dotenv
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter
from app.core.note_payload import make_note_payload
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
from app.core.embed import embed_texts, embed_one
from app.core.qdrant import QdrantConfig, ensure_collections, get_client
from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from app.core.derive_edges import build_note_index, derive_wikilink_edges  # NEU/aktiv
SCRIPT_VERSION = "v2.2.3"
# -------------------
# Utility / Helpers
# -------------------
def iter_md(root: str, exclude=("/.obsidian/", "/_backup_frontmatter/", "/_imported/")) -> List[str]:
    """Recursively collect Markdown files under *root*.

    Paths whose normalized (forward-slash) form contains any of the
    *exclude* fragments — Obsidian metadata, backup and import folders —
    are skipped.

    Args:
        root: Vault root directory to scan.
        exclude: Immutable path fragments (with forward slashes) marking
            directories to ignore.

    Returns:
        Paths exactly as produced by ``glob.glob`` (original separators).
    """
    out: List[str] = []
    # Iterate the glob results directly instead of copying them into an
    # intermediate list first (the original wrapped glob.glob in a no-op
    # list comprehension).
    for p in glob.glob(os.path.join(root, "**", "*.md"), recursive=True):
        # Normalize separators so the exclusion test also works on Windows.
        pn = p.replace("\\", "/")
        if any(ex in pn for ex in exclude):
            continue
        out.append(p)
    return out
def minimal_note_index_payload(abs_path: str, vault_root: str) -> Dict:
    """Build the slim payload the wikilink resolver needs (id/title/path)."""
    doc = read_markdown(abs_path)
    meta = normalize_frontmatter(doc.frontmatter)
    validate_required_frontmatter(meta)
    # Vault-relative path with forward slashes, regardless of host OS.
    rel_path = os.path.relpath(abs_path, vault_root).replace("\\", "/")
    note_id = meta.get("id") or meta.get("note_id")
    return {"note_id": note_id, "title": meta.get("title"), "path": rel_path}
def compute_hash_fulltext(text: str) -> str:
    """Return the hex SHA-256 digest of *text* (UTF-8 encoded)."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
def note_uuid5(note_id: str) -> str:
    """Deterministic UUIDv5 (URL namespace) for a note id, as a string."""
    from uuid import NAMESPACE_URL, uuid5
    return str(uuid5(NAMESPACE_URL, note_id))
def fetch_existing_note_payload(client: QdrantClient, notes_col: str, note_id: str) -> Optional[Dict]:
    """Fetch the stored note payload via its deterministic UUIDv5 id, or None."""
    point_id = note_uuid5(note_id)
    try:
        hits = client.retrieve(
            collection_name=notes_col,
            ids=[point_id],
            with_payload=True,
            with_vectors=False,
        )
        # Best effort: a missing point and a client error both mean "not found".
        return (hits[0].payload or {}) if hits else None
    except Exception:
        return None
def _client_delete_points(client: QdrantClient, collection: str, selector_or_filter):
"""Kompat-Schicht: delete_points (neu) oder delete (alt)."""
if hasattr(client, "delete_points"):
return client.delete_points(collection, selector_or_filter, wait=True)
return client.delete(collection, selector_or_filter, wait=True)
def _scroll_edge_ids_by_source_prefix(client: QdrantClient, edges_col: str, source_prefix: str, batch: int = 1000) -> List[int]:
"""Sucht Edge-Point-IDs, deren payload.source_id mit 'source_prefix' beginnt (via Scroll)."""
next_offset = None
ids: List[int] = []
while True:
points, next_offset = client.scroll(
collection_name=edges_col,
limit=batch,
with_payload=True,
with_vectors=False,
offset=next_offset,
)
for p in points:
pl = p.payload or {}
sid = pl.get("source_id") or ""
if isinstance(sid, str) and sid.startswith(source_prefix):
if hasattr(p, "id") and isinstance(p.id, int):
ids.append(p.id)
if next_offset is None:
break
return ids
def purge_note(client: QdrantClient, cfg: QdrantConfig, note_id: str) -> None:
    """Delete all chunks & edges of a note (replace-on-change).

    Chunks are removed by an exact payload filter on ``note_id``. Edges are
    removed by a should-filter matching source/target, plus a text-prefix
    match for chunk-level sources (``NOTE_ID#...``); if the prefix match is
    not supported by the installed client/server, a scroll-based fallback
    collects the matching edge point ids and deletes them by id.

    Args:
        client: Qdrant client instance.
        cfg: Connection/collection config; only ``cfg.prefix`` is used here.
        note_id: Id of the note whose derived artifacts are purged.
    """
    _, chunks_col, edges_col = f"{cfg.prefix}_notes", f"{cfg.prefix}_chunks", f"{cfg.prefix}_edges"
    # Chunks: payload.note_id == NOTE_ID
    f_chunks = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
    _client_delete_points(client, chunks_col, f_chunks)
    # Edges: (source_id == NOTE_ID) OR (target_id == NOTE_ID) OR (source_id startswith NOTE_ID + "#")
    conds = [
        rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
    ]
    try:
        # MatchText may not exist / may be rejected on older stacks — hence the try.
        conds.append(rest.FieldCondition(key="source_id", match=rest.MatchText(text=f"{note_id}#")))
        f_edges = rest.Filter(should=conds)
        _client_delete_points(client, edges_col, f_edges)
    except Exception:
        # Fallback (v2.2.2): delete by the basic conditions, then sweep
        # chunk-level edges ("NOTE_ID#...") via scroll + delete-by-id.
        f_edges_basic = rest.Filter(should=conds)
        _client_delete_points(client, edges_col, f_edges_basic)
        ids = _scroll_edge_ids_by_source_prefix(client, edges_col, f"{note_id}#")
        if ids:
            selector = rest.PointIdsList(points=ids)
            _client_delete_points(client, edges_col, selector)
# -------------------
# Main
# -------------------
def main():
    """CLI entry point: import Obsidian Markdown notes into Qdrant.

    Dry-run by default — prints one JSON summary line per note to stdout;
    writes to Qdrant only with ``--apply``. See the module docstring for
    the full flag reference and environment variables.
    """
    load_dotenv()
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Obsidian Vault Pfad (z. B. mindnet/vault)")
    ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant (sonst Dry-Run)")
    ap.add_argument("--note-id", help="Nur eine Note-ID verarbeiten")
    ap.add_argument("--embed-note", action="store_true", help="Auch Note-Volltext einbetten (optional)")
    ap.add_argument("--force-replace", action="store_true", help="Purge & Neuaufbau erzwingen (Debug)")
    ap.add_argument("--version", action="store_true", help="Script-Version anzeigen und beenden")
    args = ap.parse_args()
    if args.version:
        print(SCRIPT_VERSION); return
    # Clear version signature on the console (stderr keeps stdout JSON-clean)
    print(f"[import_markdown] {SCRIPT_VERSION}", file=sys.stderr)
    # Qdrant config from environment; defaults: local instance, 384-dim vectors
    cfg = QdrantConfig(
        url=os.getenv("QDRANT_URL", "http://127.0.0.1:6333"),
        api_key=os.getenv("QDRANT_API_KEY") or None,
        prefix=os.getenv("COLLECTION_PREFIX", "mindnet"),
        dim=int(os.getenv("VECTOR_DIM", "384")),
    )
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)  # creates, among others, the edges collection (1D DOT)
    root = os.path.abspath(args.vault)
    files = iter_md(root)
    if not files:
        print("Keine Markdown-Dateien gefunden.", file=sys.stderr); sys.exit(2)
    # 1) Pre-pass: global note index for robust wikilink resolution
    index_payloads: List[Dict] = []
    for path in files:
        try:
            pl = minimal_note_index_payload(path, root)
            if not pl.get("note_id"):
                continue
            if args.note_id and pl["note_id"] != args.note_id:
                continue
            index_payloads.append(pl)
        except Exception:
            # Notes with missing/invalid frontmatter are left out of the index
            continue
    note_index = build_note_index(index_payloads)  # by_id / by_slug / by_file_slug
    notes_col = f"{cfg.prefix}_notes"
    total_notes = 0
    # 2) Main pass, one note per file
    for path in files:
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)
        try:
            validate_required_frontmatter(fm)
        except Exception:
            # Skip notes that fail frontmatter validation
            continue
        if args.note_id and fm.get("id") != args.note_id:
            continue
        total_notes += 1
        # Note payload incl. hash_fulltext
        note_pl = make_note_payload(parsed, vault_root=root)
        validate_note_payload(note_pl)
        h = compute_hash_fulltext(parsed.body)
        note_pl["hash_fulltext"] = h  # field is provided for in the schema
        # Chunks + payloads
        chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)
        # Embeddings (chunks)
        texts = [ch.text for ch in chunks]
        vectors = embed_texts(texts)
        # Optional: note-level vector over the full body
        note_vec = embed_one(parsed.body) if args.embed_note else None
        # Edges (from full text + the actual chunk texts)
        note_pl_for_edges = {"note_id": fm["id"], "title": fm.get("title"), "path": note_pl["path"], "fulltext": parsed.body}
        chunks_for_links = []
        for i, pl in enumerate(chunk_pls):
            cid = pl.get("chunk_id") or pl.get("id")
            txt = chunks[i].text if i < len(chunks) else ""
            chunks_for_links.append({"chunk_id": cid, "text": txt})
        edges = derive_wikilink_edges(note_pl_for_edges, chunks_for_links, note_index)
        # Load the existing note (for the hash comparison)
        existing = fetch_existing_note_payload(client, notes_col, fm["id"])
        changed = False
        if existing and isinstance(existing, dict):
            old_h = existing.get("hash_fulltext")
            changed = (old_h != h)
        else:
            changed = True  # new note
        # Summary line (printed in dry-run AND apply mode)
        print(json.dumps({
            "note_id": fm["id"],
            "title": fm["title"],
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "changed": changed or args.force_replace,
            "path": note_pl["path"],
        }, ensure_ascii=False))
        if not args.apply:
            continue
        # Replace-on-change: delete the note's previous artifacts, then rebuild.
        # NOTE(review): unchanged notes are skipped here per the module contract
        # ("Unveränderte Notes werden übersprungen") — confirm nesting against VCS.
        if changed or args.force_replace:
            purge_note(client, cfg, fm["id"])
            # Upserts
            notes_col_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
            upsert_batch(client, notes_col_name, note_pts)
            chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors)
            upsert_batch(client, chunks_col, chunk_pts)
            edges_col, edge_pts = points_for_edges(cfg.prefix, edges)
            upsert_batch(client, edges_col, edge_pts)
    print(f"Done. Processed notes: {total_notes}")
print(f"Done. Processed notes: {total_notes}")
if __name__ == "__main__":
main()