scripts/import_markdown.py updated

This commit is contained in:
Lars 2025-09-09 11:12:33 +02:00
parent 6c25d76135
commit 897d0c9e6d


@@ -1,65 +1,82 @@
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
Script: import_markdown.py (Markdown to Qdrant: Notes, Chunks, Edges)
Version: 3.1.0
Date: 2025-09-09

Summary
-------
Reads Markdown files from a vault and writes Notes, Chunks and Edges to Qdrant
idempotently. By default, change detection is based solely on the **body hash**
(ENV: MINDNET_HASH_MODE), so frontmatter-only changes do not trigger
unnecessary re-imports (backwards compatible).

Important compatibility extension
---------------------------------
- Now persists the note's **full text** in the notes payload under ``fulltext``,
  so the export can be reconstructed **losslessly** even if the chunk payloads
  do not (yet) contain a ``text`` field.
- Stores ``path`` **relative** to the vault (mapping absolute paths back to
  relative ones), so exports reliably end up under ``--out``.

Features
--------
- Deterministic IDs (UUIDv5) via qdrant_points.* (unchanged)
- Idempotent upserts for Notes/Chunks/Edges
- Optional pre-cleanup per changed note: ``--purge-before-upsert``
- Robust edge derivation:
  * chunk scope: belongs_to / prev / next / references (+ backlink)
  * fallback mode: note-scope references if chunk_payloads provide none
- Optional note embedding vector (``--embed-note``)

Qdrant / ENV
------------
- QDRANT_URL (or QDRANT_HOST/QDRANT_PORT)
- QDRANT_API_KEY (optional)
- COLLECTION_PREFIX (default: mindnet)
- VECTOR_DIM (default: 384)
- MINDNET_HASH_MODE: "body" (default) | "frontmatter" | "body+frontmatter"

Usage
-----
python3 -m scripts.import_markdown --vault ./vault
python3 -m scripts.import_markdown --vault ./vault --apply
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert
python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo --apply
# optional note vector
python3 -m scripts.import_markdown --vault ./vault --apply --embed-note

Examples
--------
COLLECTION_PREFIX=mindnet QDRANT_URL=http://127.0.0.1:6333 \\
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert

Notes
-----
- A dry run (without --apply) prints per-note decisions as JSON (no writes).
- Backfill tools still exist but are used only for repairs.
"""
from __future__ import annotations

import argparse
import json
import os
import re
import sys
from typing import Dict, Iterable, List, Optional, Tuple

from dotenv import load_dotenv
from qdrant_client.http import models as rest

# Core / project modules
from app.core.parser import (
    read_markdown,
    normalize_frontmatter,
    validate_required_frontmatter,
)
from app.core.note_payload import make_note_payload
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks
@@ -75,14 +92,18 @@ from app.core.qdrant_points import (
from app.core.edges import deriv_edges_for_note


# -----------------------------------------------------------------------------
# Helper functions
# -----------------------------------------------------------------------------

def iter_md(root: str) -> List[str]:
    """Collects all .md files below root, filtering out system folders."""
    out: List[str] = []
    for dirpath, _, filenames in os.walk(root):
        for fn in filenames:
            if not fn.lower().endswith(".md"):
                continue
            p = os.path.join(dirpath, fn)
            pn = p.replace("\\", "/")
            if any(ex in pn for ex in ["/.obsidian/", "/_backup_frontmatter/", "/_imported/"]):
                continue
@@ -95,11 +116,12 @@ def collections(prefix: str) -> Tuple[str, str, str]:
def fetch_existing_note_hash(client, prefix: str, note_id: str) -> Optional[str]:
    """Reads the previous hash_fulltext from the notes collection (if present)."""
    notes_col, _, _ = collections(prefix)
    f = rest.Filter(must=[rest.FieldCondition(
        key="note_id",
        match=rest.MatchValue(value=note_id),
    )])
    points, _ = client.scroll(
        collection_name=notes_col,
        scroll_filter=f,
@@ -115,10 +137,7 @@ def fetch_existing_note_hash(client, prefix: str, note_id: str) -> Optional[str]
def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    """
    Deletes all chunks and edges of a note. The collections themselves remain.
    """
    _, chunks_col, edges_col = collections(prefix)
@@ -130,8 +149,10 @@ def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    # Delete the note's edges (chunk and note scope)
    should = [
        # chunk IDs (note_id#c...)
        rest.FieldCondition(key="source_id", match=rest.MatchText(text=f"{note_id}#")),
        rest.FieldCondition(key="target_id", match=rest.MatchText(text=f"{note_id}#")),
        # note scope (source/target == note_id)
        rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
    ]
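The delete call that consumes this `should` list is outside the visible hunk. Its selection semantics (a text match on chunk-scoped IDs of the form `note_id#...`, plus an exact match on the bare note ID) can be approximated in plain Python to see which edges would be purged; the edge dicts below are illustrative, not real payloads:

```python
def edge_is_purged(edge: dict, note_id: str) -> bool:
    """Approximates the should-filter: chunk scope (note_id#...) or note scope (== note_id)."""
    for key in ("source_id", "target_id"):
        val = edge.get(key, "")
        if val == note_id or val.startswith(f"{note_id}#"):
            return True
    return False

edges = [
    {"source_id": "n1#c0", "target_id": "n1"},     # belongs_to edge of n1: purged
    {"source_id": "n2#c0", "target_id": "n1#c1"},  # incoming reference into n1: purged
    {"source_id": "n2", "target_id": "n3"},        # unrelated: kept
]
purged = [e for e in edges if edge_is_purged(e, "n1")]
```

Because the filter uses `should` (OR), any edge touching the note on either end is removed, including backlinks created by other notes.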
@@ -145,7 +166,7 @@ def has_chunk_level_refs(chunk_payloads: List[Dict]) -> bool:

def fallback_note_level_edges(note_meta: Dict, chunk_payloads: List[Dict]) -> List[Dict]:
    """
    If chunk_payloads contain no 'references', we derive:
    - belongs_to + prev/next from chunk_payloads
    - note-scope references/backlink from note_meta['references'] (if present)
    """
@@ -173,16 +194,25 @@ def fallback_note_level_edges(note_meta: Dict, chunk_payloads: List[Dict]) -> Li

    # Dedupe
    uniq = {}
    for e in edges:
        key = (e["src_id"], e["dst_id"], e["edge_type"], e.get("scope", ""))
        uniq[key] = e
    return list(uniq.values())
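The dedupe above keeps exactly one edge per (src, dst, type, scope) combination; note that the same src/dst/type pair survives twice if the scopes differ. A tiny standalone illustration:

```python
edges = [
    {"src_id": "a", "dst_id": "b", "edge_type": "references", "scope": "note"},
    {"src_id": "a", "dst_id": "b", "edge_type": "references", "scope": "note"},   # exact duplicate
    {"src_id": "a", "dst_id": "b", "edge_type": "references", "scope": "chunk"},  # distinct scope survives
]
uniq = {}
for e in edges:
    key = (e["src_id"], e["dst_id"], e["edge_type"], e.get("scope", ""))
    uniq[key] = e
deduped = list(uniq.values())
```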
def _normalize_rel_path(abs_path: str, vault_root: str) -> str:
    """Returns a **relative** path, normalized to forward slashes."""
    try:
        rel = os.path.relpath(abs_path, vault_root)
    except Exception:
        rel = abs_path  # fallback
    return rel.replace("\\", "/").lstrip("/")
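Standalone usage of this helper (the function body is copied so the example runs on its own; the POSIX-style paths are assumptions for illustration):

```python
import os

def _normalize_rel_path(abs_path: str, vault_root: str) -> str:
    """Returns a relative path, normalized to forward slashes."""
    try:
        rel = os.path.relpath(abs_path, vault_root)
    except Exception:
        rel = abs_path  # fallback
    return rel.replace("\\", "/").lstrip("/")

# An absolute path inside the vault becomes vault-relative and forward-slashed,
# which is what lets exports land predictably under --out:
p = _normalize_rel_path("/data/vault/notes/a.md", "/data/vault")
```

On a POSIX system this yields `notes/a.md`; on Windows, backslashes produced by `os.path.relpath` are rewritten to forward slashes.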
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------

def main() -> None:
    load_dotenv()
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Path to the Obsidian vault (root folder)")
@@ -195,7 +225,7 @@ def main():
                    help="Ignore change detection and always upsert (+ optional purge)")
    args = ap.parse_args()

    # Qdrant
    cfg = QdrantConfig.from_env()
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)
@@ -212,11 +242,10 @@ def main():
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)

        # Check required frontmatter fields (e.g. id, title); on failure skip the note
        try:
            validate_required_frontmatter(fm)
        except Exception:
            continue

        if args.note_id and fm.get("id") != args.note_id:
@@ -226,15 +255,26 @@ def main():
        # Note payload & validation
        note_pl = make_note_payload(parsed, vault_root=root)

        # **Compatibility extension**: ensure full text & relative path
        if "fulltext" not in (note_pl or {}):
            note_pl["fulltext"] = parsed.body or ""

        # Relativize the path if necessary (in case make_note_payload does not do it yet)
        if note_pl.get("path"):
            note_pl["path"] = _normalize_rel_path(
                os.path.join(root, note_pl["path"])
                if not os.path.isabs(note_pl["path"]) else note_pl["path"],
                root,
            )
        else:
            # Fallback: relative path from parsed.path
            note_pl["path"] = _normalize_rel_path(parsed.path, root)

        validate_note_payload(note_pl)
        note_id = note_pl["note_id"]

        # Change detection (body-based via hash_fulltext)
        new_hash = note_pl.get("hash_fulltext")
        old_hash = None
        if not args.force_replace:
            old_hash = fetch_existing_note_hash(client, cfg.prefix, note_id)
        changed = args.force_replace or (old_hash != new_hash)

        # Chunks + embeddings
@@ -246,16 +286,14 @@ def main():
        note_vec = embed_one(parsed.body) if args.embed_note else None

        # Edges
        edges = deriv_edges_for_note(fm, chunk_pls)
        if not edges or (not has_chunk_level_refs(chunk_pls) and (fm.get("references") or note_pl.get("references"))):
            # Add fm['references'] if not present in fm yet
            if "references" not in fm and "references" in note_pl:
                fm["references"] = note_pl["references"]
            edges = fallback_note_level_edges(fm, chunk_pls)

        # Summary
        summary = {
            "note_id": note_id,
            "title": fm.get("title"),
@@ -269,7 +307,7 @@ def main():
        }
        print(json.dumps(summary, ensure_ascii=False))

        # Dry run?
        if not args.apply:
            continue
@@ -277,16 +315,13 @@ def main():
        if changed and args.purge_before_upsert:
            purge_note_artifacts(client, cfg.prefix, note_id)

        # Upserts: Notes / Chunks / Edges
        notes_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
        upsert_batch(client, notes_name, note_pts)

        chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)
        upsert_batch(client, chunks_name, chunk_pts)

        edges_name, edge_pts = points_for_edges(cfg.prefix, edges)
        upsert_batch(client, edges_name, edge_pts)