scripts/import_markdown.py aktualisiert
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 2s

This commit is contained in:
Lars 2025-09-09 11:26:57 +02:00
parent 77fa57a2a6
commit f9c0b5df19

View File

@@ -2,41 +2,28 @@
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Script: import_markdown.py — Markdown → Qdrant (Notes, Chunks, Edges) Script: import_markdown.py — Markdown → Qdrant (Notes, Chunks, Edges)
Version: 3.1.0 Version: 3.2.0
Datum: 2025-09-09 Datum: 2025-09-09
Kurzbeschreibung Kurzbeschreibung
---------------- ----------------
Liest Markdown-Dateien aus einem Vault ein und schreibt Notes, Chunks und Edges Liest Markdown-Dateien aus einem Vault ein und schreibt Notes, Chunks und Edges
idempotent nach Qdrant. Die Change-Detection basiert standardmäßig ausschließlich idempotent nach Qdrant. Change-Detection basiert standardmäßig auf dem **Body-Hash**.
auf dem **Body-Hash** (ENV: MINDNET_HASH_MODE), sodass reine Frontmatter-Änderungen Neu: Hash-Modus und Normalisierung sind auch per **CLI** steuerbar.
nicht unnötig Re-Imports auslösen (abwärtskompatibel).
Wichtige Kompatibilitäts-Erweiterung ENV / Qdrant
------------------------------------
- Persistiert nun den **Volltext** der Note im Notes-Payload unter ``fulltext``.
So kann der Export **verlustfrei** rekonstruiert werden, auch wenn die Chunk-
Payloads (noch) kein ``text``-Feld enthalten.
- Speichert den ``path`` **relativ** zum Vault (führt absolute Pfade auf relative
Pfade zurück), sodass Exporte zuverlässig unter ``--out`` landen.
Features
--------
- Deterministische IDs (UUIDv5) über qdrant_points.* (beibehalten)
- Idempotente Upserts für Notes/Chunks/Edges
- Optionale Vorreinigung pro geänderter Note: ``--purge-before-upsert``
- Robuste Edge-Erzeugung:
* Chunk-Scope: belongs_to / prev / next / references (+ backlink)
* Fallback-Mode: Note-Scope references, falls chunk_payloads keine references liefern
- Optionaler Note-Embedding-Vektor (``--embed-note``)
Qdrant / ENV
------------ ------------
- QDRANT_URL (oder QDRANT_HOST/QDRANT_PORT) - QDRANT_URL (oder QDRANT_HOST/QDRANT_PORT)
- QDRANT_API_KEY (optional) - QDRANT_API_KEY (optional)
- COLLECTION_PREFIX (Default: mindnet) - COLLECTION_PREFIX (Default: mindnet)
- VECTOR_DIM (Default: 384) - VECTOR_DIM (Default: 384)
- MINDNET_HASH_MODE: "body" (Default) | "frontmatter" | "body+frontmatter" - MINDNET_HASH_MODE: "body" (Default) | "frontmatter" | "body+frontmatter"
- MINDNET_HASH_NORMALIZE: "canonical" (Default) | "none"
CLI (übersteuert ENV)
---------------------
--hash-mode body|frontmatter|body+frontmatter
--hash-normalize canonical|none
Aufruf Aufruf
------ ------
@@ -44,18 +31,9 @@ Aufruf
python3 -m scripts.import_markdown --vault ./vault --apply python3 -m scripts.import_markdown --vault ./vault --apply
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert
python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo --apply python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo --apply
# optionaler Note-Vektor
python3 -m scripts.import_markdown --vault ./vault --apply --embed-note python3 -m scripts.import_markdown --vault ./vault --apply --embed-note
# Feingranulare Erkennung (jede Kleinigkeit im Body zählt):
Beispiele python3 -m scripts.import_markdown --vault ./vault --hash-normalize none
---------
COLLECTION_PREFIX=mindnet QDRANT_URL=http://127.0.0.1:6333 \\
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert
Hinweise
--------
- Dry-Run (ohne --apply) zeigt Entscheidungen je Note als JSON (keine Schreibvorgänge).
- Backfill-Tools existieren weiterhin, werden aber nur als Reparatur eingesetzt.
""" """
from __future__ import annotations from __future__ import annotations
@ -63,25 +41,21 @@ from __future__ import annotations
import argparse import argparse
import json import json
import os import os
import re
import sys import sys
from typing import Dict, Iterable, List, Optional, Tuple from typing import Dict, List, Optional, Tuple
from dotenv import load_dotenv from dotenv import load_dotenv
import yaml
from qdrant_client.http import models as rest from qdrant_client.http import models as rest
# Core/Projekt-Module
from app.core.parser import ( from app.core.parser import (
read_markdown, read_markdown,
normalize_frontmatter, normalize_frontmatter,
validate_required_frontmatter, validate_required_frontmatter,
extract_wikilinks,
) )
from app.core.note_payload import make_note_payload from app.core.note_payload import make_note_payload
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads from app.core.chunk_payload import make_chunk_payloads
from app.core.embed import embed_texts, embed_one
from app.core.qdrant import QdrantConfig, get_client, ensure_collections from app.core.qdrant import QdrantConfig, get_client, ensure_collections
from app.core.qdrant_points import ( from app.core.qdrant_points import (
points_for_chunks, points_for_chunks,
@ -89,7 +63,12 @@ from app.core.qdrant_points import (
points_for_edges, points_for_edges,
upsert_batch, upsert_batch,
) )
from app.core.edges import deriv_edges_for_note
try:
from app.core.embed import embed_texts, embed_one # optional
except Exception:
embed_texts = None
embed_one = None
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
@ -97,7 +76,6 @@ from app.core.edges import deriv_edges_for_note
# ----------------------------------------------------------------------------- # -----------------------------------------------------------------------------
def iter_md(root: str) -> List[str]: def iter_md(root: str) -> List[str]:
"""Sammelt alle .md-Dateien unterhalb von root, filtert systemische Ordner."""
out: List[str] = [] out: List[str] = []
for dirpath, _, filenames in os.walk(root): for dirpath, _, filenames in os.walk(root):
for fn in filenames: for fn in filenames:
@ -110,13 +88,10 @@ def iter_md(root: str) -> List[str]:
out.append(p) out.append(p)
return sorted(out) return sorted(out)
def collections(prefix: str) -> Tuple[str, str, str]:
    """Return the Qdrant collection names (notes, chunks, edges) for *prefix*.

    Reconstructed from a diff-rendered source where old and new columns were
    merged onto each line; both sides of the diff were identical here.
    """
    return f"{prefix}_notes", f"{prefix}_chunks", f"{prefix}_edges"
def fetch_existing_note_hash(client, prefix: str, note_id: str) -> Optional[str]: def fetch_existing_note_hash(client, prefix: str, note_id: str) -> Optional[str]:
"""Liest den bisherigen hash_fulltext aus der Notes-Collection (falls vorhanden)."""
notes_col, _, _ = collections(prefix) notes_col, _, _ = collections(prefix)
f = rest.Filter(must=[rest.FieldCondition( f = rest.Filter(must=[rest.FieldCondition(
key="note_id", key="note_id",
@ -134,77 +109,24 @@ def fetch_existing_note_hash(client, prefix: str, note_id: str) -> Optional[str]
pl = points[0].payload or {} pl = points[0].payload or {}
return pl.get("hash_fulltext") return pl.get("hash_fulltext")
def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    """Delete all chunks and edges belonging to *note_id*; collections remain.

    Reconstructed new-side code from a diff-rendered source (old and new
    columns were merged onto each line).

    - Chunks: filtered by payload field ``note_id``.
    - Edges: matched in both chunk scope (IDs prefixed ``{note_id}#``) and
      note scope (source/target equals ``note_id`` exactly), combined with a
      ``should`` (OR) filter.
    """
    _, chunks_col, edges_col = collections(prefix)

    f_chunks = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
    client.delete(collection_name=chunks_col, points_selector=f_chunks, wait=True)

    should = [
        # chunk-scope IDs look like "note_id#c..."
        rest.FieldCondition(key="source_id", match=rest.MatchText(text=f"{note_id}#")),
        rest.FieldCondition(key="target_id", match=rest.MatchText(text=f"{note_id}#")),
        # note-scope edges reference the note id directly
        rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
    ]
    f_edges = rest.Filter(should=should)
    client.delete(collection_name=edges_col, points_selector=f_edges, wait=True)
def has_chunk_level_refs(chunk_payloads: List[Dict]) -> bool:
    """Return True if at least one chunk payload carries a non-empty 'references' list."""
    for payload in chunk_payloads:
        refs = payload.get("references")
        if isinstance(refs, list) and refs:
            return True
    return False
def fallback_note_level_edges(note_meta: Dict, chunk_payloads: List[Dict]) -> List[Dict]:
    """
    Build edges when chunk payloads carry no 'references':

    - belongs_to + prev/next from chunk neighborhoods (chunk scope)
    - references/backlink on note scope from ``note_meta['references']``

    Fix: in the ``prev_id`` branch the edge types were swapped relative to the
    ``next_id`` branch — an edge pointing *forward* (prev -> current) must be
    typed ``next``, and one pointing *backward* (current -> prev) must be
    typed ``prev``. The old code emitted the opposite, producing reversed-type
    duplicates of the edges already created by the ``next_id`` branch.
    """
    edges: List[Dict] = []

    # belongs_to + prev/next (chunk scope)
    for ch in chunk_payloads:
        src = ch["id"]
        edges.append({"src_id": src, "dst_id": note_meta["id"], "edge_type": "belongs_to", "scope": "chunk"})
        nb = ch.get("neighbors") or {}
        prev_id = nb.get("prev")
        next_id = nb.get("next")
        if prev_id:
            # prev -> current runs forward ("next"); current -> prev runs backward ("prev")
            edges.append({"src_id": prev_id, "dst_id": src, "edge_type": "next", "scope": "chunk"})
            edges.append({"src_id": src, "dst_id": prev_id, "edge_type": "prev", "scope": "chunk"})
        if next_id:
            edges.append({"src_id": src, "dst_id": next_id, "edge_type": "next", "scope": "chunk"})
            edges.append({"src_id": next_id, "dst_id": src, "edge_type": "prev", "scope": "chunk"})

    # note-scope references/backlink
    for tid in (note_meta.get("references") or []):
        edges.append({"src_id": note_meta["id"], "dst_id": tid, "edge_type": "references", "scope": "note"})
        edges.append({"src_id": tid, "dst_id": note_meta["id"], "edge_type": "backlink", "scope": "note"})

    # dedupe on (src, dst, type, scope); last occurrence wins
    uniq = {}
    for e in edges:
        key = (e["src_id"], e["dst_id"], e["edge_type"], e.get("scope", ""))
        uniq[key] = e
    return list(uniq.values())
def _normalize_rel_path(abs_path: str, vault_root: str) -> str: def _normalize_rel_path(abs_path: str, vault_root: str) -> str:
"""Gibt einen **relativen** Pfad zurück, normalisiert auf forward slashes."""
try: try:
rel = os.path.relpath(abs_path, vault_root) rel = os.path.relpath(abs_path, vault_root)
except Exception: except Exception:
rel = abs_path # Fallback rel = abs_path
return rel.replace("\\", "/").lstrip("/") return rel.replace("\\", "/").lstrip("/")
@ -223,9 +145,11 @@ def main() -> None:
ap.add_argument("--embed-note", action="store_true", help="Optional: Note-Volltext einbetten") ap.add_argument("--embed-note", action="store_true", help="Optional: Note-Volltext einbetten")
ap.add_argument("--force-replace", action="store_true", ap.add_argument("--force-replace", action="store_true",
help="Änderungserkennung ignorieren und immer upserten (+ optional Purge)") help="Änderungserkennung ignorieren und immer upserten (+ optional Purge)")
# NEU: Hash-Steuerung per CLI
ap.add_argument("--hash-mode", choices=["body", "frontmatter", "body+frontmatter"], default=None)
ap.add_argument("--hash-normalize", choices=["canonical", "none"], default=None)
args = ap.parse_args() args = ap.parse_args()
# Qdrant
cfg = QdrantConfig.from_env() cfg = QdrantConfig.from_env()
client = get_client(cfg) client = get_client(cfg)
ensure_collections(client, cfg.prefix, cfg.dim) ensure_collections(client, cfg.prefix, cfg.dim)
@ -242,10 +166,10 @@ def main() -> None:
parsed = read_markdown(path) parsed = read_markdown(path)
fm = normalize_frontmatter(parsed.frontmatter) fm = normalize_frontmatter(parsed.frontmatter)
# Pflichtfelder prüfen (z. B. id, title) bei Fehler: Note überspringen
try: try:
validate_required_frontmatter(fm) validate_required_frontmatter(fm)
except Exception: except Exception as e:
print(json.dumps({"path": path, "error": f"Frontmatter invalid: {e}"}))
continue continue
if args.note_id and fm.get("id") != args.note_id: if args.note_id and fm.get("id") != args.note_id:
@ -253,45 +177,58 @@ def main() -> None:
processed += 1 processed += 1
# Note-Payload & Validierung # Note-Payload (mit expliziten Hash-Parametern)
note_pl = make_note_payload(parsed, vault_root=root) note_pl = make_note_payload(parsed, vault_root=root,
# **Kompat-Erweiterung**: Volltext & relativer Pfad sicherstellen hash_mode=args.hash_mode, hash_normalize=args.hash_normalize)
if "fulltext" not in (note_pl or {}): if "fulltext" not in (note_pl or {}):
note_pl["fulltext"] = parsed.body or "" note_pl["fulltext"] = parsed.body or ""
# Pfad ggf. relativieren (falls make_note_payload es noch nicht tut)
if note_pl.get("path"): if note_pl.get("path"):
note_pl["path"] = _normalize_rel_path(os.path.join(root, note_pl["path"]) note_pl["path"] = _normalize_rel_path(
if not os.path.isabs(note_pl["path"]) else note_pl["path"], os.path.join(root, note_pl["path"]) if not os.path.isabs(note_pl["path"]) else note_pl["path"], root
root) )
else: else:
# fallback: relativer Pfad aus parsed.path
note_pl["path"] = _normalize_rel_path(parsed.path, root) note_pl["path"] = _normalize_rel_path(parsed.path, root)
validate_note_payload(note_pl)
note_id = note_pl["note_id"] note_id = note_pl["note_id"]
# Change-Detection (Body-basiert per hash_fulltext) # Change-Detection
new_hash = note_pl.get("hash_fulltext") new_hash = note_pl.get("hash_fulltext")
old_hash = None old_hash = None if args.force_replace else fetch_existing_note_hash(client, cfg.prefix, note_id)
if not args.force_replace:
old_hash = fetch_existing_note_hash(client, cfg.prefix, note_id)
changed = args.force_replace or (old_hash != new_hash) changed = args.force_replace or (old_hash != new_hash)
# Chunks + Embeddings # Chunks + Embeddings (Nullvektor-Fallback)
chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept")) chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks) chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)
vecs = embed_texts([c.text for c in chunks]) if embed_texts:
vecs = embed_texts([getattr(c, "text", "") for c in chunks])
else:
vecs = [[0.0] * cfg.dim for _ in chunks]
# Optional: Note-Vektor # Edges (leichtgewichtig direkt hier ableiten)
note_vec = embed_one(parsed.body) if args.embed_note else None # belongs_to / prev/next aus Chunk-Nachbarschaften + Wikilinks als references
edges: List[Dict] = []
# Edges for ch in chunk_pls:
edges = deriv_edges_for_note(fm, chunk_pls) cid = ch["id"]
if not edges or (not has_chunk_level_refs(chunk_pls) and (fm.get("references") or note_pl.get("references"))): edges.append({"src_id": cid, "dst_id": note_id, "edge_type": "belongs_to", "scope": "chunk"})
# Ergänze fm['references'] falls noch nicht im fm vorhanden nb = ch.get("neighbors") or {}
if "references" not in fm and "references" in note_pl: if nb.get("prev"):
fm["references"] = note_pl["references"] edges.append({"src_id": nb["prev"], "dst_id": cid, "edge_type": "next", "scope": "chunk"})
edges = fallback_note_level_edges(fm, chunk_pls) edges.append({"src_id": cid, "dst_id": nb["prev"], "edge_type": "prev", "scope": "chunk"})
if nb.get("next"):
edges.append({"src_id": cid, "dst_id": nb["next"], "edge_type": "next", "scope": "chunk"})
edges.append({"src_id": nb["next"], "dst_id": cid, "edge_type": "prev", "scope": "chunk"})
for ref in (ch.get("references") or []):
tid = ref.get("target_id")
if tid:
edges.append({"src_id": cid, "dst_id": tid, "edge_type": "references", "scope": "chunk"})
for tid in (note_pl.get("references") or []):
edges.append({"src_id": note_id, "dst_id": tid, "edge_type": "references", "scope": "note"})
edges.append({"src_id": tid, "dst_id": note_id, "edge_type": "backlink", "scope": "note"})
# Dedupe
_uniq = {}
for e in edges:
_uniq[(e["src_id"], e["dst_id"], e["edge_type"], e.get("scope", ""))] = e
edges = list(_uniq.values())
# Zusammenfassung # Zusammenfassung
summary = { summary = {
@ -304,19 +241,20 @@ def main() -> None:
"apply-skip-unchanged" if args.apply and not changed else "apply-skip-unchanged" if args.apply and not changed else
"dry-run"), "dry-run"),
"path": note_pl["path"], "path": note_pl["path"],
"hash_mode": args.hash_mode or os.environ.get("MINDNET_HASH_MODE", "body"),
"hash_normalize": args.hash_normalize or os.environ.get("MINDNET_HASH_NORMALIZE", "canonical"),
"hash_old": old_hash,
"hash_new": new_hash,
} }
print(json.dumps(summary, ensure_ascii=False)) print(json.dumps(summary, ensure_ascii=False))
# Dry-Run?
if not args.apply: if not args.apply:
continue continue
# Optionaler Purge NUR für geänderte Notes
if changed and args.purge_before_upsert: if changed and args.purge_before_upsert:
purge_note_artifacts(client, cfg.prefix, note_id) purge_note_artifacts(client, cfg.prefix, note_id)
# Upserts: Notes / Chunks / Edges notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim)
notes_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
upsert_batch(client, notes_name, note_pts) upsert_batch(client, notes_name, note_pts)
chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs) chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)