scripts/import_markdown.py aktualisiert
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 1s

This commit is contained in:
Lars 2025-09-06 15:42:10 +02:00
parent 47e6d56b21
commit 80a7db0e54

View File

@ -2,81 +2,83 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Script: scripts/import_markdown.py Script: scripts/import_markdown.py
Version: 0.6.0 (2025-09-06) Version: 0.7.1 (2025-09-06)
Autor: mindnet / Architektur Datenimporte & Sync Autor: mindnet / Architektur Datenimporte & Sync
Kurzbeschreibung Kurzbeschreibung
--------------- ---------------
Importiert Markdown-Notizen aus einem Obsidian-ähnlichen Vault in Qdrant: Importiert Markdown-Notizen aus einem Obsidian-ähnlichen Vault in Qdrant:
- Validiert Frontmatter / Note-Payload. - Validiert Frontmatter / Note-Payload (gegen note.schema.json).
- Chunking + Embeddings. - Chunking + Embeddings.
- Leitet Edges direkt beim Import aus [[Wikilinks]] ab: - Leitet Edges direkt beim Import aus [[Wikilinks]] ab:
- 'references' (NoteNote) - 'references' (NoteNote)
- 'references_at' (ChunkNote) - 'references_at' (ChunkNote)
- 'backlink' (NoteNote) nur für NoteNote-Kanten. - 'backlink' (NoteNote) (symmetrisch zu 'references')
- Idempotente Upserts (deterministische IDs über qdrant_points).
Neu in 0.6.0 Neu in 0.7.1
------------ ------------
- Option `--purge-before-upsert`: löscht für die jeweils verarbeitete Note - Korrekte Änderungsdetektion via SHA-256 über die **komplette Datei** (Frontmatter+Body):
*vor* dem Upsert alle zugehörigen Chunks und Edges in Qdrant (selektiv!), - Feld: payload.hash_fulltext
um Leichen nach Re-Chunking zu vermeiden. - Vergleicht neuen Hash gegen bestehenden Hash in Qdrant.
- Robuste Link-Auflösung via Note-Index (ID / Titel-Slug / Datei-Slug) - Nur bei Änderung Verarbeitung/Upsert; sonst "skip".
konsistent zu `derive_edges.py`. - `--purge-before-upsert` wird **nur** ausgeführt, wenn sich die Note **wirklich geändert** hat.
- Robuste Qdrant-Scroll-Kompatibilität (2- oder 3-Tupel Rückgaben).
Aufrufbeispiele Aufrufbeispiele
--------------- ---------------
Dry-Run (keine Schreibzugriffe): Dry-Run (nur prüfen, nichts schreiben):
python3 -m scripts.import_markdown --vault ./vault python3 -m scripts.import_markdown --vault ./vault
Nur eine bestimmte Note: Nur eine spezifische Note:
python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo
Apply (schreiben) mit Purge: Apply (schreiben) mit Purge (nur geänderte Noten werden bereinigt + neu geschrieben):
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert
Parameter Parameter
--------- ---------
--vault PATH : Pflicht. Root-Verzeichnis des Vaults. --vault PATH : Pflicht. Root-Verzeichnis des Vaults.
--apply : Wenn gesetzt, werden Upserts durchgeführt (sonst Dry-Run). --apply : Wenn gesetzt, werden Upserts durchgeführt (sonst Dry-Run).
--purge-before-upsert : Wenn gesetzt, werden vor dem Upsert (nur bei --apply) --purge-before-upsert : Vor Upsert alte Chunks/Edges der **geänderten** Note löschen.
alte Chunks und Edges dieser Note in Qdrant gelöscht.
--note-id ID : Optional, verarbeitet nur diese eine Note. --note-id ID : Optional, verarbeitet nur diese eine Note.
Umgebungsvariablen (.env) Umgebungsvariablen (.env)
------------------------- -------------------------
QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM QDRANT_URL, QDRANT_API_KEY, COLLECTION_PREFIX, VECTOR_DIM
Standard: url=http://127.0.0.1:6333, prefix=mindnet, dim=384 Defaults: url=http://127.0.0.1:6333, prefix=mindnet, dim=384
Kompatibilität Kompatibilität
-------------- --------------
- Nutzt die bestehenden Kernmodule: - Bestehende Kernmodule werden weiterverwendet:
app.core.parser (read_markdown, normalize_frontmatter, validate_required_frontmatter) app.core.parser (read_markdown, normalize_frontmatter, validate_required_frontmatter)
app.core.validate_note (validate_note_payload) app.core.validate_note (validate_note_payload)
app.core.chunker (assemble_chunks) app.core.chunker (assemble_chunks)
app.core.chunk_payload (make_chunk_payloads) app.core.chunk_payload (make_chunk_payloads)
app.core.embed (embed_texts) app.core.embed (embed_texts)
app.core.qdrant (QdrantConfig, get_client, ensure_collections) app.core.qdrant (QdrantConfig, ensure_collections, get_client, collection_names)
app.core.qdrant_points (points_for_note, points_for_chunks, points_for_edges, upsert_batch) app.core.qdrant_points (points_for_note, points_for_chunks, points_for_edges, upsert_batch)
app.core.derive_edges (build_note_index, derive_wikilink_edges) app.core.derive_edges (build_note_index, derive_wikilink_edges)
Änderungshinweise vs. früherer Importer Hinweise
--------------------------------------- --------
- Alte, globale Lösch-Workarounds entfallen. Selektives Purge ist jetzt optional und sicher. - Bitte im aktivierten venv laufen lassen: source .venv/bin/activate
- Edges werden nur noch in der neuen, einheitlichen Struktur erzeugt.
""" """
from __future__ import annotations from __future__ import annotations
import argparse import argparse
import glob import glob
import hashlib
import json import json
import os import os
import sys import sys
from typing import List, Dict from typing import List, Dict, Tuple, Optional
from dotenv import load_dotenv from dotenv import load_dotenv
from qdrant_client.http import models as rest from qdrant_client.http import models as rest
from qdrant_client import QdrantClient
# Kern-Bausteine (vorhanden in eurem Projekt) # Kern-Bausteine (aus eurem Projekt)
from app.core.parser import ( from app.core.parser import (
read_markdown, read_markdown,
normalize_frontmatter, normalize_frontmatter,
@ -105,17 +107,54 @@ def iter_md(root: str) -> List[str]:
out: List[str] = [] out: List[str] = []
for p in patterns: for p in patterns:
out.extend(glob.glob(os.path.join(root, p), recursive=True)) out.extend(glob.glob(os.path.join(root, p), recursive=True))
return sorted(list(dict.fromkeys(out))) # de-dupe + sort # de-dupe + sort
return sorted(list(dict.fromkeys(out)))
def file_sha256(path: str) -> str:
"""SHA256 über die **Rohdatei** (Frontmatter + Body)."""
h = hashlib.sha256()
with open(path, "rb") as f:
for chunk in iter(lambda: f.read(8192), b""):
h.update(chunk)
return h.hexdigest()
def qdrant_scroll(client: QdrantClient, **kwargs) -> Tuple[List, Optional[str]]:
"""
Wrapper, der sowohl (points, next_offset) als auch (points, next_page, _) Signaturen abdeckt.
"""
res = client.scroll(**kwargs)
if isinstance(res, tuple):
if len(res) == 2:
return res[0], res[1]
if len(res) >= 3:
return res[0], res[1]
return res, None
def fetch_existing_note_hash(client: QdrantClient, prefix: str, note_id: str) -> Optional[str]:
"""Liest hash_fulltext aus mindnet_notes.payload für eine Note."""
notes_col, _, _ = collection_names(prefix)
f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
pts, _ = qdrant_scroll(
client,
collection_name=notes_col,
scroll_filter=f,
with_payload=True,
with_vectors=False,
limit=1
)
if not pts:
return None
pl = getattr(pts[0], "payload", {}) or {}
return pl.get("hash_fulltext")
def make_note_stub(abs_path: str, vault_root: str) -> Dict: def make_note_stub(abs_path: str, vault_root: str) -> Dict:
""" """Minimaler Stub (id/title/path) für build_note_index."""
Erstellt einen minimalen Note-Stub für den Index (build_note_index):
{ note_id, title, path }
"""
parsed = read_markdown(abs_path) parsed = read_markdown(abs_path)
fm = normalize_frontmatter(parsed.frontmatter or {}) fm = normalize_frontmatter(parsed.frontmatter or {})
# Validierung minimal: wir brauchen id + title (title optional für Slug-Auflösung)
if "id" not in fm or not fm["id"]: if "id" not in fm or not fm["id"]:
raise ValueError(f"Missing id in frontmatter: {abs_path}") raise ValueError(f"Missing id in frontmatter: {abs_path}")
rel = os.path.relpath(abs_path, vault_root) rel = os.path.relpath(abs_path, vault_root)
@ -123,48 +162,37 @@ def make_note_stub(abs_path: str, vault_root: str) -> Dict:
def build_vault_index(vault_root: str) -> tuple[Dict, Dict, Dict]: def build_vault_index(vault_root: str) -> tuple[Dict, Dict, Dict]:
""" """Index für robuste Wikilink-Auflösung über alle Noten im Vault."""
Liest alle Noten ein und baut den Dreifach-Index für Wikilink-Auflösung.
"""
files = iter_md(vault_root)
stubs = [] stubs = []
for p in files: for p in iter_md(vault_root):
try: try:
stubs.append(make_note_stub(p, vault_root)) stubs.append(make_note_stub(p, vault_root))
except Exception: except Exception:
# Notiz ohne id → wird vom Importer später ohnehin übersprungen
continue continue
return build_note_index(stubs) return build_note_index(stubs)
def purge_for_note(client, prefix: str, note_id: str, chunk_ids: List[str]) -> Dict[str, int]: def purge_for_note(client: QdrantClient, prefix: str, note_id: str, chunk_ids: List[str]) -> None:
""" """
Selektives Purge für die aktuelle Note: Selektives Purge der alten Daten für **diese** Note:
- Chunks: alle mit payload.note_id == note_id - löscht Chunks (payload.note_id == note_id)
- Edges: alle mit payload.source_id == note_id ODER == einem der chunk_ids - löscht Edges, deren source_id == note_id ODER in chunk_ids
- Notes: werden nicht gelöscht (Upsert überschreibt Payload/Vektor) Notes selbst werden nicht gelöscht (Upsert reicht).
""" """
notes_col, chunks_col, edges_col = collection_names(prefix) notes_col, chunks_col, edges_col = collection_names(prefix)
counts = {"chunks_deleted": 0, "edges_deleted": 0}
# Chunks löschen (Filter must: note_id == X) # Chunks löschen
f_chunks = rest.Filter( f_chunks = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))] client.delete(collection_name=chunks_col, points_selector=f_chunks, wait=True)
)
res_chunks = client.delete(collection_name=chunks_col, points_selector=f_chunks, wait=True)
counts["chunks_deleted"] = getattr(res_chunks, "status", None) and 0 or 0 # Qdrant liefert keine count hier
# Edges löschen: OR über Note-ID und alle Chunk-IDs # Edges löschen (OR über source_id Werte)
should_conds = [rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id))] should_conds = [rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id))]
for cid in chunk_ids: for cid in chunk_ids:
should_conds.append(rest.FieldCondition(key="source_id", match=rest.MatchValue(value=cid))) should_conds.append(rest.FieldCondition(key="source_id", match=rest.MatchValue(value=cid)))
if should_conds:
f_edges = rest.Filter(should=should_conds) if should_conds else None f_edges = rest.Filter(should=should_conds)
if f_edges is not None:
client.delete(collection_name=edges_col, points_selector=f_edges, wait=True) client.delete(collection_name=edges_col, points_selector=f_edges, wait=True)
return counts
# ------------------------------------------------- # -------------------------------------------------
# Main # Main
@ -177,7 +205,7 @@ def main():
ap.add_argument( ap.add_argument(
"--purge-before-upsert", "--purge-before-upsert",
action="store_true", action="store_true",
help="Vor Upsert alte Chunks/Edges der aktuellen Note löschen (nur mit --apply wirksam).", help="Vor Upsert alte Chunks/Edges **nur für geänderte** Noten löschen.",
) )
ap.add_argument("--note-id", help="Optional: nur diese Note verarbeiten") ap.add_argument("--note-id", help="Optional: nur diese Note verarbeiten")
args = ap.parse_args() args = ap.parse_args()
@ -198,7 +226,7 @@ def main():
print("Keine Markdown-Dateien gefunden.", file=sys.stderr) print("Keine Markdown-Dateien gefunden.", file=sys.stderr)
sys.exit(2) sys.exit(2)
# 1) Note-Index über den gesamten Vault (für robuste Link-Auflösung) # Index einmal bauen (für Linkauflösung bei geänderten Noten)
note_index = build_vault_index(vault_root) note_index = build_vault_index(vault_root)
processed = 0 processed = 0
@ -215,57 +243,74 @@ def main():
processed += 1 processed += 1
# --- Note-Payload --- # Änderungsdetektion (Datei-Hash vs. Qdrant)
from app.core.note_payload import make_note_payload # lazy import (bestehende Funktion) new_hash = file_sha256(abs_path)
old_hash = fetch_existing_note_hash(client, cfg.prefix, fm["id"])
changed = (old_hash != new_hash)
decision = "skip"
if changed:
decision = "apply" if args.apply else "dry-run"
# Bei "skip" kein teures Chunking/Embedding/Edges nötig
if not changed:
print(json.dumps({
"note_id": fm["id"],
"title": fm.get("title"),
"changed": False,
"decision": "skip",
"path": os.path.relpath(abs_path, vault_root),
}, ensure_ascii=False))
continue
# --- Ab hier: Nur für geänderte Noten ---
# Note-Payload erzeugen
from app.core.note_payload import make_note_payload # lazy import
note_pl = make_note_payload(parsed, vault_root=vault_root) note_pl = make_note_payload(parsed, vault_root=vault_root)
# Hash im Payload mitschreiben (Schema erlaubt hash_fulltext)
note_pl["hash_fulltext"] = new_hash
validate_note_payload(note_pl) validate_note_payload(note_pl)
# --- Chunking & Payloads --- # Chunking & Payloads
chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept")) chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)
# --- Embeddings --- # Embeddings
texts = [c.get("text") or c.get("content") or "" for c in chunk_pls] texts = [c.get("text") or c.get("content") or "" for c in chunk_pls]
vectors = embed_texts(texts) vectors = embed_texts(texts)
# --- Edge-Ableitung (direkt) --- # Edges ableiten (Note-/Chunk-Level)
edges = derive_wikilink_edges(note_pl, chunk_pls, note_index) edges = derive_wikilink_edges(note_pl, chunk_pls, note_index)
# --- Ausgabe je Note (Entscheidung) --- # Purge (nur wenn apply + Option gesetzt)
decision = "apply" if args.apply else "dry-run"
# --- Purge vor Upsert (nur wenn --apply) ---
if args.apply and args.purge_before_upsert: if args.apply and args.purge_before_upsert:
# Chunk-IDs (neu) ermitteln → für Edge-Purge by source_id # Chunk-IDs bestimmen (für Edge-Purge by source_id)
chunk_ids = [] chunk_ids = []
for i, ch in enumerate(chunk_pls, start=1): for i, ch in enumerate(chunk_pls, start=1):
cid = ch.get("chunk_id") or ch.get("id") or f"{fm['id']}#{i}" cid = ch.get("chunk_id") or ch.get("id") or f"{fm['id']}#{i}"
ch["chunk_id"] = cid # sicherstellen ch["chunk_id"] = cid # sicherstellen
chunk_ids.append(cid) chunk_ids.append(cid)
purge_for_note(client, cfg.prefix, fm["id"], chunk_ids) purge_for_note(client, cfg.prefix, fm["id"], chunk_ids)
# --- Upserts (nur bei --apply) --- # Upserts (nur Apply)
if args.apply: if args.apply:
# Note
notes_col, note_pts = points_for_note(cfg.prefix, note_pl, note_vec=None, dim=cfg.dim) notes_col, note_pts = points_for_note(cfg.prefix, note_pl, note_vec=None, dim=cfg.dim)
upsert_batch(client, notes_col, note_pts) upsert_batch(client, notes_col, note_pts)
# Chunks
chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors) chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors)
upsert_batch(client, chunks_col, chunk_pts) upsert_batch(client, chunks_col, chunk_pts)
# Edges
edges_col, edge_pts = points_for_edges(cfg.prefix, edges) edges_col, edge_pts = points_for_edges(cfg.prefix, edges)
upsert_batch(client, edges_col, edge_pts) upsert_batch(client, edges_col, edge_pts)
# Logging pro Note # Logging geänderte Note
print(json.dumps({ print(json.dumps({
"note_id": fm["id"], "note_id": fm["id"],
"title": fm.get("title"), "title": fm.get("title"),
"chunks": len(chunk_pls), "chunks": len(chunk_pls),
"edges": len(edges), "edges": len(edges),
"changed": True, # Hash-/Zeitvergleich kann optional hier ergänzt werden "changed": True,
"decision": decision, "decision": decision,
"path": note_pl["path"], "path": note_pl["path"],
}, ensure_ascii=False)) }, ensure_ascii=False))