#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
|
|
Name: scripts/import_markdown.py
|
|
Version: v2.2.4 (2025-09-05)
|
|
|
|
Kurzbeschreibung:
|
|
Importiert Obsidian-Markdown-Notes in Qdrant (Notes/Chunks/Edges).
|
|
- Edges direkt aus Wikilinks: references / backlink / references_at.
|
|
- Idempotenz: hash_fulltext; bei Änderung Purge & Neuaufbau pro Note.
|
|
- **Sicher im Dry-Run**: keine destruktiven Collection-Änderungen.
|
|
|
|
Aufruf:
|
|
python3 -m scripts.import_markdown --vault ./vault [--apply] [--note-id NOTE_ID] [--embed-note] [--force-replace] [--version]
|
|
|
|
Parameter:
|
|
--vault Vault-Pfad
|
|
--apply schreibt nach Qdrant (ohne: Dry-Run)
|
|
--note-id nur bestimmte Note-ID
|
|
--embed-note optional Note-Vektor
|
|
--force-replace erzwingt Purge auch ohne Hash-Änderung
|
|
--version zeigt Script-Version
|
|
|
|
Wichtige Quellen:
|
|
- Edge-Ableitung (Wikilinks): app/core/derive_edges.py
|
|
- Qdrant-Setup & Points: app/core/qdrant.py, app/core/qdrant_points.py
|
|
|
|
Changelog:
|
|
v2.2.4: ensure_collections(..., destructive=False) → verhindert Edge-Drop im Dry-Run.
|
|
v2.2.3: Version-Flag, Hash/Purge-Fix, Scroll-Fallback.
|
|
"""
|
|
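# Example (illustrative values; actual output depends on the vault contents):
#   python3 -m scripts.import_markdown --vault ./vault
# prints one JSON line per note in dry-run mode, e.g.
#   {"note_id": "...", "title": "...", "chunks": 3, "edges": 5, "changed": true, "path": "notes/example.md"}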
from __future__ import annotations
import argparse, os, glob, json, sys, hashlib
from typing import List, Dict, Optional

from dotenv import load_dotenv
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest

from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter
from app.core.note_payload import make_note_payload
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
from app.core.embed import embed_texts, embed_one
from app.core.qdrant import QdrantConfig, ensure_collections, get_client  # new: destructive=False
from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
from app.core.derive_edges import build_note_index, derive_wikilink_edges

SCRIPT_VERSION = "v2.2.4"

def iter_md(root: str, exclude=("/.obsidian/", "/_backup_frontmatter/", "/_imported/")) -> List[str]:
    files = [p for p in glob.glob(os.path.join(root, "**", "*.md"), recursive=True)]
    out: List[str] = []
    for p in files:
        pn = p.replace("\\", "/")
        if any(ex in pn for ex in exclude):
            continue
        out.append(p)
    return out

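# SHA-256 over the raw note body; stored as hash_fulltext and compared on re-import
# to decide whether a note needs to be purged and rebuilt.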
def compute_hash_fulltext(text: str) -> str:
    return hashlib.sha256(text.encode("utf-8")).hexdigest()

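# Deterministic point ID for a note: UUIDv5 of the note_id. It is used here to look up
# the existing payload; the notes collection is assumed to use the same ID scheme, so
# repeated imports address the same Qdrant point instead of creating duplicates.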
def note_uuid5(note_id: str) -> str:
    import uuid
    return str(uuid.uuid5(uuid.NAMESPACE_URL, note_id))

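# Fetch the payload of a previously imported note (by its deterministic point ID).
# Returns None if the note does not exist yet or Qdrant is unreachable, which makes
# the caller treat the note as changed.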
def fetch_existing_note_payload(client: QdrantClient, notes_col: str, note_id: str) -> Optional[Dict]:
    pid = note_uuid5(note_id)
    try:
        res = client.retrieve(collection_name=notes_col, ids=[pid], with_payload=True, with_vectors=False)
        if not res:
            return None
        return res[0].payload or {}
    except Exception:
        return None

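# Delete helper that works across qdrant-client versions: prefer delete_points()
# where available, otherwise fall back to delete(). Accepts either a points
# selector or a filter.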
def _client_delete_points(client: QdrantClient, collection: str, selector_or_filter):
    if hasattr(client, "delete_points"):
        return client.delete_points(collection, selector_or_filter, wait=True)
    return client.delete(collection, selector_or_filter, wait=True)

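# Client-side fallback for prefix matching on source_id: scroll the edges collection
# and collect the IDs of points whose source_id starts with the given prefix
# (e.g. "<note_id>#"). Used by purge_note() when the server-side MatchText filter
# is not available. Only integer point IDs are collected (edge points are assumed
# to use integer IDs).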
def _scroll_edge_ids_by_source_prefix(client: QdrantClient, edges_col: str, source_prefix: str, batch: int = 1000) -> List[int]:
    next_offset = None
    ids: List[int] = []
    while True:
        points, next_offset = client.scroll(
            collection_name=edges_col,
            limit=batch,
            with_payload=True,
            with_vectors=False,
            offset=next_offset,
        )
        for p in points:
            pl = p.payload or {}
            sid = pl.get("source_id") or ""
            if isinstance(sid, str) and sid.startswith(source_prefix):
                if hasattr(p, "id") and isinstance(p.id, int):
                    ids.append(p.id)
        if next_offset is None:
            break
    return ids

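# Remove everything previously imported for a note before rebuilding it:
#   1) all chunk points whose payload note_id matches,
#   2) all edge points where the note is source or target,
#   3) edge points whose source_id carries a chunk suffix ("<note_id>#..."), via a
#      MatchText filter when the server supports it, otherwise via the scroll-based
#      fallback above.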
def purge_note(client: QdrantClient, prefix: str, note_id: str) -> None:
    notes_col, chunks_col, edges_col = f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"

    f_chunks = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
    _client_delete_points(client, chunks_col, f_chunks)

    conds = [
        rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
    ]
    try:
        # Additionally match chunk-level sources ("<note_id>#...") via MatchText.
        f_edges = rest.Filter(should=conds + [rest.FieldCondition(key="source_id", match=rest.MatchText(text=f"{note_id}#"))])
        _client_delete_points(client, edges_col, f_edges)
    except Exception:
        # MatchText not available: delete with the basic filter only, then remove
        # chunk-level edges found via the scroll fallback.
        f_edges_basic = rest.Filter(should=conds)
        _client_delete_points(client, edges_col, f_edges_basic)
        ids = _scroll_edge_ids_by_source_prefix(client, edges_col, f"{note_id}#")
        if ids:
            selector = rest.PointIdsList(points=ids)
            _client_delete_points(client, edges_col, selector)

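# CLI entry point. Dry-run by default: notes are parsed, chunked and reported as
# JSON lines, but nothing is written to Qdrant unless --apply is given.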
def main():
    load_dotenv()
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Obsidian vault path (e.g. mindnet/vault)")
    ap.add_argument("--apply", action="store_true", help="Write to Qdrant (otherwise dry-run)")
    ap.add_argument("--note-id", help="Process only this note ID")
    ap.add_argument("--embed-note", action="store_true", help="Also embed the note full text (optional)")
    ap.add_argument("--force-replace", action="store_true", help="Force purge & rebuild (debug)")
    ap.add_argument("--version", action="store_true", help="Print the script version and exit")
    args = ap.parse_args()

    if args.version:
        print(SCRIPT_VERSION); return
    print(f"[import_markdown] {SCRIPT_VERSION}", file=sys.stderr)

    cfg = QdrantConfig.from_env()
    client = get_client(cfg)
    # **NOT destructive**: no collection recreation in dry-run.
    ensure_collections(client, cfg.prefix, cfg.dim, destructive=False)  # <- important

    root = os.path.abspath(args.vault)
    files = iter_md(root)
    if not files:
        print("No Markdown files found.", file=sys.stderr); sys.exit(2)

    # Index for link resolution (by_id/by_slug/by_file_slug)
    index_payloads: List[Dict] = []
    for path in files:
        try:
            parsed = read_markdown(path)
            fm = normalize_frontmatter(parsed.frontmatter)
            validate_required_frontmatter(fm)
            if args.note_id and fm.get("id") != args.note_id:
                continue
            relpath = os.path.relpath(path, root).replace("\\", "/")
            index_payloads.append({"note_id": fm["id"], "title": fm.get("title"), "path": relpath})
        except Exception:
            continue
    note_index = build_note_index(index_payloads)

    notes_col = f"{cfg.prefix}_notes"
    total_notes = 0

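    # Second pass: re-parse each note and build its payload, chunks, embeddings and
    # edges; the pass above only collected the link-resolution index.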
    for path in files:
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)
        try:
            validate_required_frontmatter(fm)
        except Exception:
            continue
        if args.note_id and fm.get("id") != args.note_id:
            continue

        total_notes += 1

        note_pl = make_note_payload(parsed, vault_root=root)
        validate_note_payload(note_pl)
        h = compute_hash_fulltext(parsed.body)
        note_pl["hash_fulltext"] = h

        chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)

        texts = [ch.text for ch in chunks]
        vectors = embed_texts(texts)

        note_vec = embed_one(parsed.body) if args.embed_note else None

        # Edges from the full text + chunk texts
        note_pl_for_edges = {"note_id": fm["id"], "title": fm.get("title"), "path": note_pl["path"], "fulltext": parsed.body}
        chunks_for_links = [{"chunk_id": (pl.get("chunk_id") or pl.get("id") or f"{fm['id']}#{i+1}"),
                             "text": chunks[i].text} for i, pl in enumerate(chunk_pls) if i < len(chunks)]
        edges = derive_wikilink_edges(note_pl_for_edges, chunks_for_links, note_index)

        # Hash comparison
        existing = fetch_existing_note_payload(client, notes_col, fm["id"])
        changed = True if not existing else (existing.get("hash_fulltext") != h)

        # Dry-run output
        print(json.dumps({"note_id": fm["id"], "title": fm["title"], "chunks": len(chunk_pls),
                          "edges": len(edges), "changed": changed or args.force_replace,
                          "path": note_pl["path"]}, ensure_ascii=False))

        if not args.apply:
            continue

        # Replace-on-change
        if changed or args.force_replace:
            purge_note(client, cfg.prefix, fm["id"])

        # Upserts
        notes_col_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
        upsert_batch(client, notes_col_name, note_pts)

        chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors)
        upsert_batch(client, chunks_col, chunk_pts)

        edges_col, edge_pts = points_for_edges(cfg.prefix, edges)
        upsert_batch(client, edges_col, edge_pts)

    print(f"Done. Processed notes: {total_notes}")


if __name__ == "__main__":
    main()