scripts/import_markdown.py aktualisiert
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 1s

This commit is contained in:
Lars 2025-09-05 09:22:50 +02:00
parent 4faf86fbbf
commit 7f9981bf31

View File

@ -2,41 +2,32 @@
# -*- coding: utf-8 -*-
"""
Name: scripts/import_markdown.py
Version: v2.2.3 (2025-09-05)
Version: v2.2.4 (2025-09-05)
Kurzbeschreibung:
Importiert Obsidian-Markdown-Notes in Qdrant (Notes/Chunks/Edges).
- Edges direkt aus Wikilinks: references / backlink / references_at (Chunk-basiert).
- Idempotenz: hash_fulltext; bei Änderung Purge & Neuaufbau (Chunks/Edges) pro Note.
- Unveränderte Notes werden übersprungen.
- Edges direkt aus Wikilinks: references / backlink / references_at.
- Idempotenz: hash_fulltext; bei Änderung Purge & Neuaufbau pro Note.
- **Sicher im Dry-Run**: keine destruktiven Collection-Änderungen.
Aufruf (aus Projekt-Root, im venv):
Aufruf:
python3 -m scripts.import_markdown --vault ./vault [--apply] [--note-id NOTE_ID] [--embed-note] [--force-replace] [--version]
Parameter:
--vault Pfad zum Vault (z. B. ./vault)
--apply Schreibt in Qdrant (ohne Flag = Dry-Run; nur JSON-Zeilen je Note)
--note-id Verarbeite nur die angegebene Note-ID
--embed-note Optional: Note-Vektor (Volltext) zusätzlich einbetten
--force-replace Erzwingt Purge & Neuaufbau auch ohne Hash-Änderung (Debug)
--version Druckt nur die Script-Version und beendet
Umgebungsvariablen (optional):
QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM (Default 384)
Exitcodes:
0 = OK, 2 = keine Markdown-Dateien gefunden
--vault Vault-Pfad
--apply schreibt nach Qdrant (ohne: Dry-Run)
--note-id nur bestimmte Note-ID
--embed-note optional Note-Vektor
--force-replace erzwingt Purge auch ohne Hash-Änderung
--version zeigt Script-Version
Wichtige Quellen:
- Edge-Ableitung (neu, Wikilinks): app/core/derive_edges.py (build_note_index, derive_wikilink_edges)
- Qdrant-Setup (1D-Edges, COSINE): app/core/qdrant.py
- Upsert/ID-Vergabe (UUIDv5): app/core/qdrant_points.py
- (Legacy) app/core/edges.py ist NICHT mehr im Importer verdrahtet (Kompatibilität bleibt, aber nicht aktiv). :contentReference[oaicite:6]{index=6}
- Edge-Ableitung (Wikilinks): app/core/derive_edges.py
- Qdrant-Setup & Points: app/core/qdrant.py, app/core/qdrant_points.py
Changelog:
v2.2.3: Schutz gegen Legacy-Import-Pfad; --version-Flag; klare Log-Zeile mit Script-Version.
v2.2.2: Purge kompatibel gemacht (ohne minimum_should); Scroll-Fallback für source_id-Prefix.
v2.2.1: Bugfix Tippfehler (args.force_replace).
v2.2.0: Hash-basierte Replace-on-Change-Logik; Purge pro Note; Skip unverändert.
v2.2.4: ensure_collections(..., destructive=False) verhindert Edge-Drop im Dry-Run.
v2.2.3: Version-Flag, Hash/Purge-Fix, Scroll-Fallback.
"""
from __future__ import annotations
import argparse, os, glob, json, sys, hashlib
@ -46,21 +37,17 @@ from dotenv import load_dotenv
from qdrant_client import QdrantClient
from qdrant_client.http import models as rest
from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter # :contentReference[oaicite:7]{index=7}
from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter # :contentReference[oaicite:4]{index=4}
from app.core.note_payload import make_note_payload
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
from app.core.embed import embed_texts, embed_one
from app.core.qdrant import QdrantConfig, ensure_collections, get_client #
from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch # :contentReference[oaicite:9]{index=9}
from app.core.derive_edges import build_note_index, derive_wikilink_edges # NEU/aktiv
from app.core.qdrant import QdrantConfig, ensure_collections, get_client # (neu: destructive=False) :contentReference[oaicite:5]{index=5}
from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch # :contentReference[oaicite:6]{index=6}
from app.core.derive_edges import build_note_index, derive_wikilink_edges # :contentReference[oaicite:7]{index=7}
SCRIPT_VERSION = "v2.2.3"
# -------------------
# Utility / Helpers
# -------------------
SCRIPT_VERSION = "v2.2.4"
def iter_md(root: str, exclude=("/.obsidian/", "/_backup_frontmatter/", "/_imported/")) -> List[str]:
files = [p for p in glob.glob(os.path.join(root, "**", "*.md"), recursive=True)]
@ -72,14 +59,6 @@ def iter_md(root: str, exclude=("/.obsidian/", "/_backup_frontmatter/", "/_impor
out.append(p)
return out
def minimal_note_index_payload(abs_path: str, vault_root: str) -> Dict:
"""Nur Felder, die der Resolver braucht (id/title/path)."""
parsed = read_markdown(abs_path)
fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm)
relpath = os.path.relpath(abs_path, vault_root).replace("\\", "/")
return {"note_id": fm.get("id") or fm.get("note_id"), "title": fm.get("title"), "path": relpath}
def compute_hash_fulltext(text: str) -> str:
return hashlib.sha256(text.encode("utf-8")).hexdigest()
@ -88,7 +67,6 @@ def note_uuid5(note_id: str) -> str:
return str(uuid.uuid5(uuid.NAMESPACE_URL, note_id))
def fetch_existing_note_payload(client: QdrantClient, notes_col: str, note_id: str) -> Optional[Dict]:
"""Hole bestehende Note (per deterministischem UUIDv5) oder None."""
pid = note_uuid5(note_id)
try:
res = client.retrieve(collection_name=notes_col, ids=[pid], with_payload=True, with_vectors=False)
@ -99,13 +77,11 @@ def fetch_existing_note_payload(client: QdrantClient, notes_col: str, note_id: s
return None
def _client_delete_points(client: QdrantClient, collection: str, selector_or_filter):
"""Kompat-Schicht: delete_points (neu) oder delete (alt)."""
if hasattr(client, "delete_points"):
return client.delete_points(collection, selector_or_filter, wait=True)
return client.delete(collection, selector_or_filter, wait=True)
def _scroll_edge_ids_by_source_prefix(client: QdrantClient, edges_col: str, source_prefix: str, batch: int = 1000) -> List[int]:
"""Sucht Edge-Point-IDs, deren payload.source_id mit 'source_prefix' beginnt (via Scroll)."""
next_offset = None
ids: List[int] = []
while True:
@ -126,15 +102,12 @@ def _scroll_edge_ids_by_source_prefix(client: QdrantClient, edges_col: str, sour
break
return ids
def purge_note(client: QdrantClient, cfg: QdrantConfig, note_id: str) -> None:
"""Löscht alle Chunks & Edges einer Note (Replace-on-Change)."""
_, chunks_col, edges_col = f"{cfg.prefix}_notes", f"{cfg.prefix}_chunks", f"{cfg.prefix}_edges"
def purge_note(client: QdrantClient, prefix: str, note_id: str) -> None:
notes_col, chunks_col, edges_col = f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
# Chunks: payload.note_id == NOTE_ID
f_chunks = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
_client_delete_points(client, chunks_col, f_chunks)
# Edges: (source_id == NOTE_ID) OR (target_id == NOTE_ID) OR (source_id startswith NOTE_ID + "#")
conds = [
rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
@ -151,10 +124,6 @@ def purge_note(client: QdrantClient, cfg: QdrantConfig, note_id: str) -> None:
selector = rest.PointIdsList(points=ids)
_client_delete_points(client, edges_col, selector)
# -------------------
# Main
# -------------------
def main():
load_dotenv()
ap = argparse.ArgumentParser()
@ -168,43 +137,36 @@ def main():
if args.version:
print(SCRIPT_VERSION); return
# Deutliche Signatur in der Konsole
print(f"[import_markdown] {SCRIPT_VERSION}", file=sys.stderr)
# Qdrant
cfg = QdrantConfig(
url=os.getenv("QDRANT_URL", "http://127.0.0.1:6333"),
api_key=os.getenv("QDRANT_API_KEY") or None,
prefix=os.getenv("COLLECTION_PREFIX", "mindnet"),
dim=int(os.getenv("VECTOR_DIM", "384")),
)
cfg = QdrantConfig.from_env()
client = get_client(cfg)
ensure_collections(client, cfg.prefix, cfg.dim) # legt u. a. Edges-Collection (1D DOT) an
# **NICHT destruktiv**: keine Collection-Recreation im Dry-Run
ensure_collections(client, cfg.prefix, cfg.dim, destructive=False) # <- wichtig :contentReference[oaicite:8]{index=8}
root = os.path.abspath(args.vault)
files = iter_md(root)
if not files:
print("Keine Markdown-Dateien gefunden.", file=sys.stderr); sys.exit(2)
# 1) Vorab-Lauf: globaler Note-Index für robuste Wikilink-Auflösung
# Index für Linkauflösung (by_id/by_slug/by_file_slug)
index_payloads: List[Dict] = []
for path in files:
try:
pl = minimal_note_index_payload(path, root)
if not pl.get("note_id"):
parsed = read_markdown(path)
fm = normalize_frontmatter(parsed.frontmatter)
validate_required_frontmatter(fm)
if args.note_id and fm.get("id") != args.note_id:
continue
if args.note_id and pl["note_id"] != args.note_id:
continue
index_payloads.append(pl)
relpath = os.path.relpath(path, root).replace("\\", "/")
index_payloads.append({"note_id": fm["id"], "title": fm.get("title"), "path": relpath})
except Exception:
continue
note_index = build_note_index(index_payloads) # by_id/by_slug/by_file_slug :contentReference[oaicite:12]{index=12}
note_index = build_note_index(index_payloads)
notes_col = f"{cfg.prefix}_notes"
total_notes = 0
# 2) Hauptlauf pro Datei
for path in files:
parsed = read_markdown(path)
fm = normalize_frontmatter(parsed.frontmatter)
@ -217,57 +179,40 @@ def main():
total_notes += 1
# Note-Payload inkl. hash_fulltext
note_pl = make_note_payload(parsed, vault_root=root)
validate_note_payload(note_pl)
h = compute_hash_fulltext(parsed.body)
note_pl["hash_fulltext"] = h # im Schema vorgesehen
note_pl["hash_fulltext"] = h
# Chunks + Payloads
chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)
# Embeddings (Chunks)
texts = [ch.text for ch in chunks]
vectors = embed_texts(texts)
# Optional: Note-Vektor
note_vec = embed_one(parsed.body) if args.embed_note else None
# Edges (aus Volltext + echten Chunk-Texten)
# Edges aus Volltext + Chunk-Texten
note_pl_for_edges = {"note_id": fm["id"], "title": fm.get("title"), "path": note_pl["path"], "fulltext": parsed.body}
chunks_for_links = []
for i, pl in enumerate(chunk_pls):
cid = pl.get("chunk_id") or pl.get("id")
txt = chunks[i].text if i < len(chunks) else ""
chunks_for_links.append({"chunk_id": cid, "text": txt})
edges = derive_wikilink_edges(note_pl_for_edges, chunks_for_links, note_index) # :contentReference[oaicite:13]{index=13}
chunks_for_links = [{"chunk_id": (pl.get("chunk_id") or pl.get("id") or f"{fm['id']}#{i+1}"),
"text": chunks[i].text} for i, pl in enumerate(chunk_pls) if i < len(chunks)]
edges = derive_wikilink_edges(note_pl_for_edges, chunks_for_links, note_index)
# Bestehende Note laden (für Hash-Vergleich)
# Hash-Vergleich
existing = fetch_existing_note_payload(client, notes_col, fm["id"])
changed = False
if existing and isinstance(existing, dict):
old_h = existing.get("hash_fulltext")
changed = (old_h != h)
else:
changed = True # neu
changed = True if not existing else (existing.get("hash_fulltext") != h)
# Dry-Run-Summary
print(json.dumps({
"note_id": fm["id"],
"title": fm["title"],
"chunks": len(chunk_pls),
"edges": len(edges),
"changed": changed or args.force_replace,
"path": note_pl["path"],
}, ensure_ascii=False))
# Dry-Run-Output
print(json.dumps({"note_id": fm["id"], "title": fm["title"], "chunks": len(chunk_pls),
"edges": len(edges), "changed": changed or args.force_replace,
"path": note_pl["path"]}, ensure_ascii=False))
if not args.apply:
continue
# Replace-on-Change: vorherige Artefakte der Note löschen
# Replace-on-Change
if changed or args.force_replace:
purge_note(client, cfg, fm["id"])
purge_note(client, cfg.prefix, fm["id"])
# Upserts
notes_col_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
@ -281,6 +226,5 @@ def main():
print(f"Done. Processed notes: {total_notes}")
if __name__ == "__main__":
main()