scripts/import_markdown.py updated
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 1s

This commit is contained in:
Lars 2025-09-08 18:36:59 +02:00
parent aa7d0190c8
commit 0a116ea3f4


@@ -1,319 +1,294 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
import_markdown.py — Markdown → Qdrant (notes, chunks, edges) with change detection & optional purge

Version: 3.0 (2025-09-08)
Author: mindnet / architecture for data imports & sync

Changes compared to previous variants:
- Stable Qdrant delete via the filter API (compatible with current clients).
- Option --purge-before-upsert: deletes ONLY the chunks/edges of the changed note(s) before upsert.
- Change detection based solely on the note body hash (frontmatter/file date ignored).
- Robust edge generation:
  - chunk level: belongs_to, prev/next, references (+backlink)
  - fallback: note-level references if the chunk payloads provide no references
- Correct Qdrant scroll/query usage (return signatures of newer clients).
- Clean dry-run/apply output per note as JSON.

Usage:
  python3 -m scripts.import_markdown --vault ./vault
  python3 -m scripts.import_markdown --vault ./vault --apply
  python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert
  python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-architektur-ki-trainerassistent-761cfe --apply

Parameters:
  --vault PATH           Required. Root of the (Obsidian-style) vault.
  --apply                Writes to Qdrant; without this flag, dry-run only.
  --purge-before-upsert  Before upserting (only) changed notes: delete their chunks & edges.
  --note-id ID           Optional: process only the note with exactly this ID.
  --embed-note           Optional: additionally create a full-text vector for the note.
  --force-replace        Ignores change detection (forces upsert + optional purge).

Environment variables (optional):
  QDRANT_URL               e.g. http://127.0.0.1:6333 (if QDRANT_HOST/PORT are not set)
  QDRANT_API_KEY           API key (if required)
  QDRANT_HOST/QDRANT_PORT  alternative to QDRANT_URL
  COLLECTION_PREFIX        default "mindnet"
  VECTOR_DIM               default 384

Prerequisites:
- app/core:
    parser.read_markdown, parser.normalize_frontmatter, parser.validate_required_frontmatter
    note_payload.make_note_payload (provides hash_fulltext, among other fields)
    validate_note.validate_note_payload
    chunker.assemble_chunks
    chunk_payload.make_chunk_payloads (ideally incl. 'references' per chunk; otherwise fallback)
    embed.embed_texts, embed.embed_one
    qdrant.QdrantConfig, qdrant.get_client, qdrant.ensure_collections
    qdrant_points.points_for_note / points_for_chunks / points_for_edges / upsert_batch
    edges.deriv_edges_for_note (uses chunk payloads; we add a fallback)

Note: run inside the activated venv: source .venv/bin/activate
"""
from __future__ import annotations

import argparse
import os
import sys
import json
import glob
from typing import List, Dict, Tuple, Optional

from dotenv import load_dotenv
from qdrant_client.http import models as rest

from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter
from app.core.note_payload import make_note_payload
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
from app.core.embed import embed_texts, embed_one
from app.core.qdrant import QdrantConfig, get_client, ensure_collections
from app.core.qdrant_points import (
    points_for_note,
    points_for_chunks,
    points_for_edges,
    upsert_batch,
)
from app.core.edges import deriv_edges_for_note
# -----------------------------
# Utils
# -----------------------------

def iter_md(root: str) -> List[str]:
    files = glob.glob(os.path.join(root, "**", "*.md"), recursive=True)
    out: List[str] = []
    for p in files:
        pn = p.replace("\\", "/")
        if any(ex in pn for ex in ["/.obsidian/", "/_backup_frontmatter/", "/_imported/"]):
            continue
        out.append(p)
    return sorted(out)


def collections(prefix: str) -> Tuple[str, str, str]:
    return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
def fetch_existing_note_hash(client, prefix: str, note_id: str) -> Optional[str]:
    """Reads hash_fulltext from the notes collection payload for one note."""
    notes_col, _, _ = collections(prefix)
    f = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    # newer clients: scroll returns (points, next_page_offset)
    points, _ = client.scroll(
        collection_name=notes_col,
        scroll_filter=f,
        with_payload=True,
        with_vectors=False,
        limit=1,
    )
    if not points:
        return None
    pl = points[0].payload or {}
    return pl.get("hash_fulltext")
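The call above only needs the first page, but the same `(points, next_page_offset)` return shape of current qdrant-client versions also drives full scans: loop until the offset comes back as `None`. A sketch of that pattern, exercised against a hypothetical stand-in client (the real offset is a point ID, here plain ints for the stub):

```python
from typing import Any, List, Optional

def scroll_all(client: Any, collection_name: str, page_size: int = 100) -> List[Any]:
    """Collect all points by following next_page_offset until it is None.
    Assumes client.scroll(...) returns (points, next_page_offset)."""
    out: List[Any] = []
    offset: Optional[Any] = None
    while True:
        points, offset = client.scroll(
            collection_name=collection_name, limit=page_size, offset=offset
        )
        out.extend(points)
        if offset is None:
            return out

class FakeClient:
    """Stand-in for QdrantClient, paging over a plain list."""
    def __init__(self, items): self.items = items
    def scroll(self, collection_name, limit, offset=None):
        start = offset or 0
        page = self.items[start:start + limit]
        nxt = start + limit if start + limit < len(self.items) else None
        return page, nxt
```

The stub makes the loop testable without a running Qdrant instance.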
def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    """
    Deletes ONLY the chunks & edges of the given note (before upsert).
    - chunks: payload.note_id == note_id
    - edges : payload.source_id starts with "note_id#c" OR payload.target_id starts with "note_id#c",
              plus note-scope edges, if any (source_id == note_id or target_id == note_id)
    """
    _, chunks_col, edges_col = collections(prefix)
    # delete the note's chunks
    f_chunks = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    client.delete(collection_name=chunks_col, points_selector=f_chunks, wait=True)
    # delete the note's edges (chunk and note scope)
    should = [
        rest.FieldCondition(key="source_id", match=rest.MatchText(text=f"{note_id}#")),
        rest.FieldCondition(key="target_id", match=rest.MatchText(text=f"{note_id}#")),
        rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
    ]
    f_edges = rest.Filter(should=should)
    client.delete(collection_name=edges_col, points_selector=f_edges, wait=True)
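A `should` clause in a Qdrant filter is an OR over its conditions, and `MatchText` performs token matching against a full-text payload index (so `source_id`/`target_id` need such an index for that branch to fire). An in-memory equivalent of the intended selection is useful for sanity-checking which edges a purge would remove; note that `startswith` below only approximates `MatchText`'s token semantics, and the sample edges are made up:

```python
from typing import Dict, List

def edges_to_purge(edges: List[Dict], note_id: str) -> List[Dict]:
    """Mirror the OR-filter: chunk-scope ids like '<note_id>#c3'
    or note-scope ids equal to note_id, on either endpoint."""
    def hit(v: str) -> bool:
        return v == note_id or v.startswith(f"{note_id}#")
    return [e for e in edges if hit(e["source_id"]) or hit(e["target_id"])]

sample = [
    {"source_id": "n1#c1", "target_id": "n1"},   # belongs_to, chunk scope -> purged
    {"source_id": "n1", "target_id": "n2"},      # references, note scope  -> purged
    {"source_id": "n2#c1", "target_id": "n3"},   # different note          -> kept
]
```

Running `edges_to_purge(sample, "n1")` selects the first two edges and leaves the third untouched.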
def has_chunk_level_refs(chunk_payloads: List[Dict]) -> bool:
    return any(isinstance(ch.get("references"), list) and ch["references"] for ch in chunk_payloads)


def fallback_note_level_edges(note_meta: Dict, chunk_payloads: List[Dict]) -> List[Dict]:
    """
    If the chunk payloads contain no 'references', we generate
    - belongs_to + prev/next from chunk_payloads
    - note-scope references/backlink from note_meta['references'] (if present)
    """
    edges: List[Dict] = []
    # belongs_to + prev/next (chunk scope)
    for ch in chunk_payloads:
        src = ch["id"]
        edges.append({"src_id": src, "dst_id": note_meta["id"], "edge_type": "belongs_to", "scope": "chunk"})
        nb = ch.get("neighbors") or {}
        prev_id = nb.get("prev")
        next_id = nb.get("next")
        if prev_id:
            # src's predecessor: src -> prev is 'prev', prev -> src is 'next'
            edges.append({"src_id": src, "dst_id": prev_id, "edge_type": "prev", "scope": "chunk"})
            edges.append({"src_id": prev_id, "dst_id": src, "edge_type": "next", "scope": "chunk"})
        if next_id:
            edges.append({"src_id": src, "dst_id": next_id, "edge_type": "next", "scope": "chunk"})
            edges.append({"src_id": next_id, "dst_id": src, "edge_type": "prev", "scope": "chunk"})
    # note-scope references/backlink
    for tid in (note_meta.get("references") or []):
        edges.append({"src_id": note_meta["id"], "dst_id": tid, "edge_type": "references", "scope": "note"})
        edges.append({"src_id": tid, "dst_id": note_meta["id"], "edge_type": "backlink", "scope": "note"})
    # dedupe
    uniq: Dict[tuple, Dict] = {}
    for e in edges:
        k = (e["src_id"], e["edge_type"], e["dst_id"], e.get("scope", "note"))
        uniq[k] = e
    return list(uniq.values())
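The dedupe step matters because adjacent chunks each emit the shared prev/next pair once from either side. A self-contained walk-through of just the neighbor logic with two chunks (the dicts mimic the chunk-payload shape assumed above; the ids are made up):

```python
from typing import Dict, List

def neighbor_edges(chunk_payloads: List[Dict]) -> List[Dict]:
    """prev/next edges between adjacent chunks, deduplicated as above."""
    edges: List[Dict] = []
    for ch in chunk_payloads:
        src = ch["id"]
        nb = ch.get("neighbors") or {}
        if nb.get("prev"):
            edges.append({"src_id": src, "dst_id": nb["prev"], "edge_type": "prev"})
            edges.append({"src_id": nb["prev"], "dst_id": src, "edge_type": "next"})
        if nb.get("next"):
            edges.append({"src_id": src, "dst_id": nb["next"], "edge_type": "next"})
            edges.append({"src_id": nb["next"], "dst_id": src, "edge_type": "prev"})
    # each adjacent pair is produced twice (once from each side) -> dedupe
    uniq = {(e["src_id"], e["edge_type"], e["dst_id"]): e for e in edges}
    return list(uniq.values())

chunks = [
    {"id": "n1#c1", "neighbors": {"next": "n1#c2"}},
    {"id": "n1#c2", "neighbors": {"prev": "n1#c1"}},
]
```

For these two chunks, four edge dicts are generated but only two distinct edges survive: `n1#c1 -(next)-> n1#c2` and `n1#c2 -(prev)-> n1#c1`.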
# -----------------------------
# Main
# -----------------------------

def main():
    load_dotenv()
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Path to the Obsidian vault (root folder)")
    ap.add_argument("--apply", action="store_true", help="Writes to Qdrant; without the flag, dry-run only")
    ap.add_argument("--purge-before-upsert", action="store_true",
                    help="Delete the CHANGED note's chunks & edges before upsert")
    ap.add_argument("--note-id", help="Process only one specific note ID")
    ap.add_argument("--embed-note", action="store_true", help="Optional: embed the note's full text")
    ap.add_argument("--force-replace", action="store_true",
                    help="Ignore change detection and always upsert (+ optional purge)")
    args = ap.parse_args()

    # Qdrant config
    cfg = QdrantConfig.from_env()
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)

    root = os.path.abspath(args.vault)
    files = iter_md(root)
    if not files:
        print("No markdown files found.", file=sys.stderr)
        sys.exit(2)

    processed = 0
    for path in files:
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)
        try:
            validate_required_frontmatter(fm)
        except Exception:
            # skip incomplete notes, but do not abort the run
            continue
        if args.note_id and fm.get("id") != args.note_id:
            continue
        processed += 1

        # note payload & validation
        note_pl = make_note_payload(parsed, vault_root=root)
        validate_note_payload(note_pl)
        note_id = note_pl["note_id"]

        # change detection (body hash)
        new_hash = note_pl.get("hash_fulltext")
        old_hash = None
        if not args.force_replace:
            old_hash = fetch_existing_note_hash(client, cfg.prefix, note_id)
        changed = args.force_replace or (old_hash != new_hash)

        # chunks + embeddings
        chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)
        vecs = embed_texts([c.text for c in chunks])

        # optional: note-level vector
        note_vec = embed_one(parsed.body) if args.embed_note else None

        # edges
        # 1) first attempt: full edges from the chunk payloads
        edges = deriv_edges_for_note(fm, chunk_pls)
        # 2) fallback if no chunk-level references are available
        if not edges or (not has_chunk_level_refs(chunk_pls) and (fm.get("references") or note_pl.get("references"))):
            # copy note_pl['references'] into fm if missing there
            if "references" not in fm and "references" in note_pl:
                fm["references"] = note_pl["references"]
            edges = fallback_note_level_edges(fm, chunk_pls)

        # per-note summary for logging
        summary = {
            "note_id": note_id,
            "title": fm.get("title"),
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "changed": changed,
            "decision": ("apply" if args.apply and changed else
                         "apply-skip-unchanged" if args.apply and not changed else
                         "dry-run"),
            "path": note_pl["path"],
        }
        print(json.dumps(summary, ensure_ascii=False))

        # apply?
        if not args.apply:
            continue

        # optional purge, ONLY for changed notes
        if changed and args.purge_before_upsert:
            purge_note_artifacts(client, cfg.prefix, note_id)

        # upserts (idempotent: unchanged notes are re-upserted, just without purge)
        notes_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
        upsert_batch(client, notes_name, note_pts)
        chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)
        upsert_batch(client, chunks_name, chunk_pts)
        edges_name, edge_pts = points_for_edges(cfg.prefix, edges)
        upsert_batch(client, edges_name, edge_pts)

    print(f"Done. Processed notes: {processed}")