288 lines
12 KiB
Python
288 lines
12 KiB
Python
#!/usr/bin/env python3
|
||
# -*- coding: utf-8 -*-
|
||
"""
|
||
Name: scripts/import_markdown.py
|
||
Version: v2.2.2 (2025-09-05)
|
||
Kurzbeschreibung:
|
||
Importiert Obsidian-Markdown-Notes in Qdrant (Notes/Chunks/Edges).
|
||
- Leitet Wikilink-Edges (references/backlink/references_at) direkt aus Volltext + echten Chunk-Texten ab.
|
||
- Idempotenz: Ermittelt hash_fulltext; bei Änderung werden alte Chunks/Edges der Note entfernt (Replace-on-Change).
|
||
- Unveränderte Notes (gleicher hash_fulltext) werden nicht gepurged; Note/Chunks/Edges werden weiterhin idempotent upserted.
|
||
- Purge für Edges robust gegen Client-/API-Unterschiede (Filter + Scroll-Delete Fallback).
|
||
|
||
Aufruf (aus Projekt-Root, im venv):
|
||
python3 -m scripts.import_markdown --vault ./vault [--apply] [--note-id NOTE_ID] [--embed-note] [--force-replace]
|
||
|
||
Parameter:
|
||
--vault Pfad zum Vault (z. B. ./vault)
|
||
--apply Führt Upserts in Qdrant aus (ohne Flag = Dry-Run mit JSON-Summaries)
|
||
--note-id Bearbeite nur eine konkrete Note-ID
|
||
--embed-note Optional: Note-Vektor (Volltext) zusätzlich einbetten
|
||
--force-replace Erzwingt Purge & Neuaufbau auch ohne Hash-Änderung (Debug)
|
||
|
||
Umgebungsvariablen (optional):
|
||
QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM (Default 384)
|
||
|
||
Exitcodes:
|
||
0 = OK, 2 = keine Markdown-Dateien gefunden
|
||
|
||
Hinweise:
|
||
- Wikilink-Ableitung basiert auf app.core.derive_edges (Slug-/ID-Auflösung, unresolved-Status).
|
||
- Für references_at werden echte Chunk-Texte übergeben (sonst würden sie fehlen).
|
||
- Purge verwendet:
|
||
* Chunks: payload.note_id == NOTE_ID
|
||
* Edges : (source_id == NOTE_ID) OR (target_id == NOTE_ID) OR (source_id startswith NOTE_ID + "#")
|
||
→ Falls MatchText/Prefix nicht unterstützt: Scroll & Delete-by-IDs als Fallback.
|
||
- Notes/Chunks/Edges bleiben kompatibel zu Validator & Backfill (UUIDv5, 1D-Edges).
|
||
|
||
Changelog:
|
||
v2.2.2: Entfernt minimum_should (Kompatibilität); Edge-Purge mit Scroll-Fallback für source_id-Prefix.
|
||
v2.2.1: Bugfix Tippfehler (args.force_replace).
|
||
v2.2.0: Hash-basierte Replace-on-Change-Logik; Purge pro Note.
|
||
v2.1.1: Sicherstellung references_at durch Übergabe echter Chunk-Texte.
|
||
v2.1.0: Vorab-Note-Index über Vault; direkte Edge-Ableitung.
|
||
"""
|
||
from __future__ import annotations
|
||
import argparse, os, glob, json, sys, hashlib
|
||
from typing import List, Dict, Optional
|
||
|
||
from dotenv import load_dotenv
|
||
from qdrant_client import QdrantClient
|
||
from qdrant_client.http import models as rest
|
||
|
||
from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter
|
||
from app.core.note_payload import make_note_payload
|
||
from app.core.validate_note import validate_note_payload
|
||
from app.core.chunker import assemble_chunks
|
||
from app.core.chunk_payload import make_chunk_payloads
|
||
from app.core.embed import embed_texts, embed_one
|
||
from app.core.qdrant import QdrantConfig, ensure_collections, get_client
|
||
from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
|
||
from app.core.derive_edges import build_note_index, derive_wikilink_edges # Wikilinks :contentReference[oaicite:9]{index=9}
|
||
|
||
# -------------------
|
||
# Utility / Helpers
|
||
# -------------------
|
||
|
||
def iter_md(root: str, exclude=("/.obsidian/", "/_backup_frontmatter/", "/_imported/")) -> List[str]:
    """Collect all Markdown files below ``root``, skipping excluded folders.

    The directory tree is searched recursively for ``*.md`` files. A file is
    dropped when its path, with backslashes normalised to forward slashes,
    contains any of the ``exclude`` fragments. The returned entries are the
    unmodified path strings produced by ``glob``, in ``glob`` order.
    """
    pattern = os.path.join(root, "**", "*.md")

    def _is_excluded(candidate: str) -> bool:
        # Compare on a slash-normalised copy so the fragments also match
        # Windows-style paths; the caller still receives the original string.
        normalised = candidate.replace("\\", "/")
        for fragment in exclude:
            if fragment in normalised:
                return True
        return False

    return [found for found in glob.glob(pattern, recursive=True) if not _is_excluded(found)]
|
||
|
||
def minimal_note_index_payload(abs_path: str, vault_root: str) -> Dict:
    """Build the reduced payload (note_id/title/path) needed by the link resolver.

    The file at ``abs_path`` is parsed, its frontmatter is normalised and
    checked with ``validate_required_frontmatter``; whatever those helpers
    raise propagates to the caller.

    Returns a dict with:
      * ``note_id`` - frontmatter ``id``, falling back to ``note_id``
      * ``title``   - frontmatter ``title`` (``None`` when absent)
      * ``path``    - path relative to ``vault_root``, using forward slashes
    """
    frontmatter = normalize_frontmatter(read_markdown(abs_path).frontmatter)
    validate_required_frontmatter(frontmatter)

    note_id = frontmatter.get("id")
    if not note_id:
        note_id = frontmatter.get("note_id")

    vault_relative = os.path.relpath(abs_path, vault_root)
    return {
        "note_id": note_id,
        "title": frontmatter.get("title"),
        "path": vault_relative.replace("\\", "/"),
    }
|
||
|
||
def compute_hash_fulltext(text: str) -> str:
    """Return the SHA-256 hex digest of ``text`` encoded as UTF-8."""
    digest = hashlib.sha256()
    digest.update(text.encode("utf-8"))
    return digest.hexdigest()
|
||
|
||
def note_uuid5(note_id: str) -> str:
    """Return the deterministic UUIDv5 (URL namespace) of ``note_id`` as a string."""
    from uuid import NAMESPACE_URL, uuid5

    point_uuid = uuid5(NAMESPACE_URL, note_id)
    return str(point_uuid)
|
||
|
||
def fetch_existing_note_payload(client: QdrantClient, notes_col: str, note_id: str) -> Optional[Dict]:
    """Fetch the stored payload of a note, looked up by its deterministic UUIDv5.

    Returns the payload dict of the first retrieved point (an empty dict when
    the point carries no payload), or ``None`` when nothing was retrieved.
    Any exception raised by the retrieval is swallowed and also reported as
    ``None`` (best-effort lookup).
    """
    point_id = note_uuid5(note_id)
    try:
        hits = client.retrieve(
            collection_name=notes_col,
            ids=[point_id],
            with_payload=True,
            with_vectors=False,
        )
        if hits:
            stored = hits[0].payload
            return stored if stored else {}
        return None
    except Exception:
        return None
|
||
|
||
def _client_delete_points(client: QdrantClient, collection: str, selector_or_filter):
|
||
"""Kompat-Schicht: delete_points (neu) oder delete (alt)."""
|
||
if hasattr(client, "delete_points"):
|
||
return client.delete_points(collection, selector_or_filter, wait=True)
|
||
return client.delete(collection, selector_or_filter, wait=True)
|
||
|
||
def _scroll_edge_ids_by_source_prefix(client: QdrantClient, edges_col: str, source_prefix: str, batch: int = 1000) -> List[int]:
|
||
"""Sucht Edge-Point-IDs, deren payload.source_id mit 'source_prefix' beginnt (via Scroll)."""
|
||
# Wir holen alle Edges (mit payload) und filtern lokal – robust gegen fehlende Prefix-Operatoren.
|
||
next_offset = None
|
||
ids: List[int] = []
|
||
while True:
|
||
points, next_offset = client.scroll(
|
||
collection_name=edges_col,
|
||
limit=batch,
|
||
with_payload=True,
|
||
with_vectors=False,
|
||
offset=next_offset,
|
||
)
|
||
for p in points:
|
||
pl = p.payload or {}
|
||
sid = pl.get("source_id") or ""
|
||
if isinstance(sid, str) and sid.startswith(source_prefix):
|
||
if hasattr(p, "id") and isinstance(p.id, int):
|
||
ids.append(p.id)
|
||
if next_offset is None:
|
||
break
|
||
return ids
|
||
|
||
def purge_note(client: QdrantClient, cfg: QdrantConfig, note_id: str) -> None:
    """Delete all chunks and edges belonging to a note (replace-on-change).

    Chunks are removed where ``payload.note_id == note_id``. Edges are removed
    where ``source_id == note_id`` OR ``target_id == note_id`` OR ``source_id``
    refers to one of the note's chunks (``"<note_id>#..."``).

    The chunk-prefix part is first attempted server-side with ``MatchText``.
    If building or executing that filter raises, the function falls back to
    deleting the exact matches and then scrolling the edge collection to
    delete the prefix matches by ID.

    Args:
        client: Qdrant client used for the delete/scroll calls.
        cfg: Configuration; only ``cfg.prefix`` is read to derive the
            ``<prefix>_chunks`` and ``<prefix>_edges`` collection names.
        note_id: ID of the note whose artefacts are purged.
    """
    chunks_col = f"{cfg.prefix}_chunks"
    edges_col = f"{cfg.prefix}_edges"

    # Chunks: payload.note_id == NOTE_ID
    chunk_filter = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    _client_delete_points(client, chunks_col, chunk_filter)

    # Exact edge matches. Kept in their own list so the fallback below never
    # resends the MatchText condition that caused the first attempt to fail.
    exact_conds = [
        rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
    ]
    chunk_prefix = f"{note_id}#"

    try:
        # NOTE(review): MatchText is a text match, not a strict prefix match;
        # confirm it cannot hit source_ids of other notes that merely contain
        # "<note_id>#".
        prefix_cond = rest.FieldCondition(key="source_id", match=rest.MatchText(text=chunk_prefix))
        _client_delete_points(client, edges_col, rest.Filter(should=exact_conds + [prefix_cond]))
    except Exception:
        # Fallback: delete the exact note edges first ...
        _client_delete_points(client, edges_col, rest.Filter(should=exact_conds))

        # ... then find the chunk-level edges locally and delete them by ID.
        ids = _scroll_edge_ids_by_source_prefix(client, edges_col, chunk_prefix)
        if ids:
            _client_delete_points(client, edges_col, rest.PointIdsList(points=ids))
|
||
|
||
# -------------------
|
||
# Main
|
||
# -------------------
|
||
|
||
def _parse_args() -> argparse.Namespace:
    """Define and parse the importer's command-line options."""
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Obsidian Vault Pfad (z.B. mindnet/vault)")
    ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant (sonst Dry-Run)")
    ap.add_argument("--note-id", help="Nur eine Note-ID verarbeiten")
    ap.add_argument("--embed-note", action="store_true", help="Auch Note-Volltext einbetten (optional)")
    ap.add_argument("--force-replace", action="store_true", help="Purge & Neuaufbau erzwingen (Debug)")
    return ap.parse_args()


def _collect_index_payloads(files: List[str], root: str) -> List[Dict]:
    """Collect the id/title/path payload of every indexable note in ``files``."""
    payloads: List[Dict] = []
    for path in files:
        try:
            pl = minimal_note_index_payload(path, root)
        except Exception as exc:
            # Best effort: a broken file must not abort the run, but it should
            # be visible why it is missing from the index.
            print(f"Skipping {path} for note index: {exc}", file=sys.stderr)
            continue
        if pl.get("note_id"):
            payloads.append(pl)
    return payloads


def main():
    """Import the Markdown notes of a vault into Qdrant (notes, chunks, edges).

    Steps:
      1. Read configuration from the environment (``QDRANT_URL``,
         ``QDRANT_API_KEY``, ``COLLECTION_PREFIX``, ``VECTOR_DIM``) and make
         sure the collections exist.
      2. Build a note index over the whole vault for wikilink resolution.
      3. For every note (or only ``--note-id``): build payloads, chunks and
         wikilink edges, compare ``hash_fulltext`` with the stored note and
         print a one-line JSON summary.
      4. With ``--apply``: embed, purge the note's old chunks/edges when the
         hash changed (or ``--force-replace`` is set), then upsert note,
         chunks and edges.

    Exits with status 2 when the vault contains no Markdown files.
    """
    load_dotenv()
    args = _parse_args()

    # Qdrant
    cfg = QdrantConfig(
        url=os.getenv("QDRANT_URL", "http://127.0.0.1:6333"),
        api_key=os.getenv("QDRANT_API_KEY") or None,
        prefix=os.getenv("COLLECTION_PREFIX", "mindnet"),
        dim=int(os.getenv("VECTOR_DIM", "384")),
    )
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)

    root = os.path.abspath(args.vault)
    files = iter_md(root)
    if not files:
        print("Keine Markdown-Dateien gefunden.", file=sys.stderr)
        sys.exit(2)

    # 1) Preliminary pass: note index over the WHOLE vault. It is deliberately
    #    not restricted by --note-id, otherwise links from the selected note to
    #    any other note could not be resolved.
    note_index = build_note_index(_collect_index_payloads(files, root))

    notes_col = f"{cfg.prefix}_notes"
    total_notes = 0

    # 2) Main pass, one file at a time
    for path in files:
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)
        try:
            validate_required_frontmatter(fm)
        except Exception:
            # Files without valid frontmatter are not notes; skip them.
            continue
        if args.note_id and fm.get("id") != args.note_id:
            continue

        total_notes += 1

        # Note payload including hash_fulltext
        note_pl = make_note_payload(parsed, vault_root=root)
        validate_note_payload(note_pl)
        h = compute_hash_fulltext(parsed.body)
        note_pl["hash_fulltext"] = h

        # Chunks + payloads
        chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)

        # Edges, derived from the full text plus the real chunk texts
        note_pl_for_edges = {
            "note_id": fm["id"],
            "title": fm.get("title"),
            "path": note_pl["path"],
            "fulltext": parsed.body,
        }
        chunks_for_links = [
            {
                "chunk_id": pl.get("chunk_id") or pl.get("id"),
                "text": chunks[i].text if i < len(chunks) else "",
            }
            for i, pl in enumerate(chunk_pls)
        ]
        edges = derive_wikilink_edges(note_pl_for_edges, chunks_for_links, note_index)

        # Load the stored note for the hash comparison; a missing or empty
        # stored payload counts as "new", i.e. changed.
        existing = fetch_existing_note_payload(client, notes_col, fm["id"])
        if existing and isinstance(existing, dict):
            changed = existing.get("hash_fulltext") != h
        else:
            changed = True

        # Per-note summary (printed in dry-run and apply mode alike)
        print(json.dumps({
            "note_id": fm["id"],
            "title": fm["title"],
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "changed": changed or args.force_replace,
            "path": note_pl["path"],
        }, ensure_ascii=False))

        if not args.apply:
            continue

        # Embeddings are only needed for the upserts, so they are computed
        # after the dry-run exit - and before the purge, so a failing
        # embedding call cannot leave the note purged but not rewritten.
        vectors = embed_texts([ch.text for ch in chunks])
        note_vec = embed_one(parsed.body) if args.embed_note else None

        # Replace-on-change: remove the note's previous artefacts
        if changed or args.force_replace:
            purge_note(client, cfg, fm["id"])

        # Upserts
        notes_col_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
        upsert_batch(client, notes_col_name, note_pts)

        chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors)
        upsert_batch(client, chunks_col, chunk_pts)

        edges_col, edge_pts = points_for_edges(cfg.prefix, edges)
        upsert_batch(client, edges_col, edge_pts)

    print(f"Done. Processed notes: {total_notes}")
|
||
|
||
|
||
# Script entry point: run the importer only when this module is executed
# directly (e.g. ``python3 -m scripts.import_markdown``), not when imported.
if __name__ == "__main__":
    main()
|