mindnet/scripts/import_markdown.py
Lars a73542a391
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 4s
Dateien nach "scripts" hochladen
2025-11-16 18:16:29 +01:00

558 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
scripts/import_markdown.py (Mindnet V2 — Importer, v2.5.0)
Zweck
-----
- Liest Markdown-Notizen aus einem Vault ein
- Erzeugt Note-Payload, Chunk-Payloads (+ optionale Embeddings) und Edges
- Schreibt alles idempotent in Qdrant (Notes, Chunks, Edges)
- Nutzt optional eine Type-Registry (`./config/types.yaml`) für:
- `chunk_profile` pro type
- `retriever_weight` pro type (Frontmatter > Registry > Default 1.0)
Kernfeatures
------------
- Deterministische Hashes via `make_note_payload(...)` (hash_mode/source/normalize)
- Idempotenter Upsert; Baseline-Hash-Merge je Modus
- Optionales `--sync-deletes` (Vault → Qdrant)
- Edges aus realen Referenzen + Nachbarschaft (next/prev/belongs_to) via `derive_edges.py`/`edges.py`
- `retriever_weight` wird **immer** in Notes & Chunks geschrieben
- Keine Duplikate wie `chunk_num`/`Chunk_Number` → wir halten uns an `index` (0-basiert) und `ord` (= index+1)
Aufruf
------
# Import (Apply + Purge-Update)
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert --prefix "$COLLECTION_PREFIX"
# Nur Sync-Deletes (wenn Dateien entfernt wurden)
python3 -m scripts.import_markdown --vault ./vault --sync-deletes --apply --prefix "$COLLECTION_PREFIX"
"""
from __future__ import annotations

import argparse
import json
import math
import os
import sys
from typing import Dict, List, Optional, Tuple, Any, Set

import yaml
from dotenv import load_dotenv
from qdrant_client.http import models as rest

# --- Projekt-Imports ---
from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter
from app.core.note_payload import make_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads

try:
    # bevorzugt: derive_edges (regelbasiert + real)
    from app.core.derive_edges import build_edges_for_note
except Exception:  # pragma: no cover
    # Fallback: einfache Kanten in edges.py
    from app.core.edges import build_edges_for_note  # type: ignore

from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch

# embeddings optional
try:
    from app.core.embed import embed_texts  # optional
except Exception:  # pragma: no cover
    embed_texts = None  # type: ignore
# ============================================================
# Type-Registry (config/types.yaml)
# ============================================================
def _env(name: str, default: Optional[str] = None) -> str:
    """Return environment variable *name*, else *default* (``""`` when None).

    A variable that is set but empty is returned as ``""``; the default is
    only used when the variable is absent.
    """
    value = os.environ.get(name)
    if value is None:
        return default if default else ""
    return value
def _deep_get(root: Any, path: str) -> Any:
    """Resolve a dotted *path* (e.g. ``"types.concept.retriever_weight"``) in nested dicts.

    Returns None as soon as a path segment is missing or an intermediate
    value is not a dict.
    """
    node = root
    for segment in path.split("."):
        if isinstance(node, dict) and segment in node:
            node = node[segment]
        else:
            return None
    return node
def _as_float(x: Any) -> Optional[float]:
    """Convert *x* to a finite float, or return None.

    None, non-numeric strings, unsupported types, ints too large for a float,
    and non-finite results (NaN, +/-inf) all yield None. Non-finite weights are
    rejected because they would poison score arithmetic, and NaN/Infinity are
    not valid JSON values in the printed summaries.
    """
    try:
        value = float(x)
    except (TypeError, ValueError, OverflowError):
        return None
    return value if math.isfinite(value) else None
def load_type_registry() -> dict:
    """Load the optional type registry (YAML) and return it as a dict.

    The path is taken from ENV ``MINDNET_TYPES_FILE`` (default
    ``./config/types.yaml``).
    - A missing file yields ``{}`` without any message.
    - A file that cannot be read or parsed, or whose top level is not a
      mapping, also yields ``{}``. In that case a JSON warning line is printed
      so a broken registry does not silently fall back to defaults.
    """
    path = _env("MINDNET_TYPES_FILE", "./config/types.yaml")
    if not os.path.isfile(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception as e:
        print(json.dumps({"path": path, "warn": f"type registry unreadable, ignoring it: {type(e).__name__}: {e}"}))
        return {}
    if data is None:  # empty file
        return {}
    if not isinstance(data, dict):
        print(json.dumps({"path": path, "warn": f"type registry top level is {type(data).__name__}, expected mapping; ignoring it"}))
        return {}
    return data
def get_type_config(note_type: Optional[str], reg: dict) -> dict:
    """Return the registry entry ``types.<note_type>`` as a dict.

    Always returns a dict (``{}`` when there is no usable entry), so callers
    can safely call ``.get`` on the result. Several inputs yield ``{}``:
    - an empty or unhashable *note_type*, such as a list coming from YAML
      frontmatter;
    - a non-dict registry;
    - an entry whose value is not a mapping, e.g. ``concept:`` with nothing
      under it, which YAML loads as None.
    """
    if not note_type or not isinstance(reg, dict):
        return {}
    types = reg.get("types")
    if not isinstance(types, dict):
        return {}
    try:
        entry = types.get(note_type)
    except TypeError:  # unhashable note_type
        return {}
    return entry if isinstance(entry, dict) else {}
def resolve_note_type(requested: Optional[str], reg: dict) -> str:
    """Return the note type to use: *requested* if it is a non-empty string, else ``"concept"``.

    Non-string values (e.g. a YAML list in the frontmatter) fall back to
    ``"concept"`` instead of leaking through. This keeps the ``str`` return
    contract that the registry lookups rely on. *reg* is kept for interface
    compatibility; the fallback is ``"concept"`` whether or not the registry
    defines it.
    """
    if isinstance(requested, str) and requested:
        return requested
    return "concept"
def effective_chunk_profile(note_type: str, reg: dict) -> Optional[str]:
    """Return ``types.<note_type>.chunk_profile`` from the registry if it is a string, else None."""
    profile = get_type_config(note_type, reg).get("chunk_profile")
    return profile if isinstance(profile, str) else None
def effective_retriever_weight_from_registry(note_type: str, reg: dict) -> Tuple[Optional[float], Optional[str]]:
    """Look up a retriever weight for *note_type* in the registry.

    Returns ``(weight, dotted_path)`` for the first path whose value converts
    via ``_as_float``, or ``(None, None)`` if none does. Lookup order:
      - types.<type>.retriever_weight
      - types.<type>.retriever.weight
      - types.<type>.retrieval.weight
      - defaults.retriever_weight
      - defaults.retriever.weight
      - global.retriever_weight
      - global.retriever.weight
    """
    type_prefix = f"types.{note_type}"
    lookup_order = (
        f"{type_prefix}.retriever_weight",
        f"{type_prefix}.retriever.weight",
        f"{type_prefix}.retrieval.weight",
        "defaults.retriever_weight",
        "defaults.retriever.weight",
        "global.retriever_weight",
        "global.retriever.weight",
    )
    for dotted in lookup_order:
        weight = _as_float(_deep_get(reg, dotted))
        if weight is None:
            continue
        return weight, dotted
    return None, None
def compute_effective_retriever_weight(fm: Dict[str, Any], note_type: str, reg: dict) -> Tuple[float, str]:
    """Resolve the retriever weight as ``(value, source)``.

    Precedence: frontmatter ``retriever_weight`` (if convertible to float),
    then the type registry, then the default 1.0.
    """
    raw = fm.get("retriever_weight")
    from_frontmatter = _as_float(raw) if raw is not None else None
    if from_frontmatter is not None:
        return from_frontmatter, "frontmatter.retriever_weight"

    weight, source_path = effective_retriever_weight_from_registry(note_type, reg)
    if weight is None:
        return 1.0, "default:1.0"
    return float(weight), f"types.yaml:{source_path}"
# ============================================================
# Qdrant Helpers
# ============================================================
def collections(prefix: str) -> Tuple[str, str, str]:
    """Return the ``(notes, chunks, edges)`` collection names for *prefix*."""
    notes, chunks, edges = (f"{prefix}_{kind}" for kind in ("notes", "chunks", "edges"))
    return notes, chunks, edges
def fetch_existing_note_payload(client, prefix: str, note_id: str) -> Optional[Dict]:
    """Return the stored payload of note *note_id* from the notes collection.

    Returns None if no point has payload ``note_id == note_id``, and ``{}``
    if the matching point has an empty payload.
    """
    notes_collection = collections(prefix)[0]
    by_note = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
    hits, _next_page = client.scroll(
        collection_name=notes_collection,
        scroll_filter=by_note,
        with_payload=True,
        with_vectors=False,
        limit=1,
    )
    return (hits[0].payload or {}) if hits else None
def list_qdrant_note_ids(client, prefix: str) -> Set[str]:
    """Return every string ``note_id`` stored in the notes collection of *prefix*.

    Pages through the collection with ``scroll`` (256 points per page) until
    the server returns no next-page offset. Only the ``note_id`` payload field
    is requested. Note payloads also carry the full text, and transferring it
    for every note just to read one field is wasted I/O.
    """
    notes_col, _, _ = collections(prefix)
    out: Set[str] = set()
    next_page = None
    while True:
        pts, next_page = client.scroll(
            collection_name=notes_col,
            with_payload=["note_id"],
            with_vectors=False,
            limit=256,
            offset=next_page,
        )
        for p in pts or []:
            nid = (p.payload or {}).get("note_id")
            if isinstance(nid, str):
                out.add(nid)
        if not pts or next_page is None:
            break
    return out
def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    """Delete all chunk and edge points whose payload ``note_id`` is *note_id*.

    The note point itself is kept. A failed delete is reported as a JSON
    warning line on stdout and does not abort the remaining deletes.
    """
    _notes, chunk_collection, edge_collection = collections(prefix)
    by_note = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
    for target in (chunk_collection, edge_collection):
        try:
            client.delete(
                collection_name=target,
                points_selector=rest.FilterSelector(filter=by_note),
                wait=True,
            )
        except Exception as exc:
            print(json.dumps({"note_id": note_id, "warn": f"delete in {target} via filter failed: {exc}"}))
def delete_note_everywhere(client, prefix: str, note_id: str) -> None:
    """Remove *note_id* from the edges, chunks and notes collections (in that order).

    Each collection is cleared via a payload filter on ``note_id``. A failure
    is printed as a JSON warning and the next collection is still processed.
    """
    notes_collection, chunk_collection, edge_collection = collections(prefix)
    by_note = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
    for target in (edge_collection, chunk_collection, notes_collection):
        try:
            client.delete(
                collection_name=target,
                points_selector=rest.FilterSelector(filter=by_note),
                wait=True,
            )
        except Exception as exc:
            print(json.dumps({"note_id": note_id, "warn": f"delete in {target} failed: {exc}"}))
def _has_any_point(client, collection: str, note_id: str) -> bool:
    """Return True if *collection* holds at least one point with payload ``note_id == note_id``."""
    hits, _next_page = client.scroll(
        collection_name=collection,
        scroll_filter=rest.Filter(
            must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
        ),
        with_payload=False,
        with_vectors=False,
        limit=1,
    )
    return bool(hits)
def artifacts_missing(client, prefix: str, note_id: str) -> Tuple[bool, bool]:
    """Report which artifacts of *note_id* are absent from Qdrant.

    Returns ``(chunks_missing, edges_missing)``. Chunks are checked first,
    then edges.
    """
    chunk_collection, edge_collection = collections(prefix)[1:]
    has_chunks = _has_any_point(client, chunk_collection, note_id)
    has_edges = _has_any_point(client, edge_collection, note_id)
    return not has_chunks, not has_edges
# ============================================================
# Main
# ============================================================
def _resolve_mode(m: Optional[str]) -> str:
    """Normalize a ``--hash-mode`` value to body|frontmatter|full; anything else becomes ``body``."""
    candidate = (m or "body").strip().lower()
    if candidate in ("body", "frontmatter", "full"):
        return candidate
    return "body"
def _env_flag(name: str, default: bool = False) -> bool:
    """Read environment variable *name* as a boolean flag.

    ``1``, ``true``, ``yes`` and ``on`` count as true (case-insensitive,
    surrounding whitespace ignored). Any other value counts as false, and an
    unset variable yields *default*.
    """
    raw = os.getenv(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def _build_arg_parser() -> argparse.ArgumentParser:
    """Build the command-line parser for the importer."""
    ap = argparse.ArgumentParser(
        prog="scripts.import_markdown",
        description="Importiert Markdown-Notizen in Qdrant (Notes/Chunks/Edges)."
    )
    ap.add_argument("--vault", required=True, help="Pfad zum Vault (Ordner mit .md-Dateien)")
    ap.add_argument("--only-path", help="Nur diese Datei verarbeiten (absolut oder relativ)")
    ap.add_argument("--apply", action="store_true", help="Schreibt nach Qdrant (sonst Dry-Run)")
    ap.add_argument("--purge-before-upsert", action="store_true", help="Alte Chunks/Edges der Note vorher löschen")
    ap.add_argument("--force-replace", action="store_true", help="Note/Chunks/Edges unabhängig von Hash neu schreiben")
    ap.add_argument("--note-id", help="Nur Notes mit dieser ID verarbeiten (Filter)")
    ap.add_argument("--note-scope-refs", action="store_true", help="Note-scope References/Backlinks erzeugen")
    ap.add_argument("--hash-mode", help="body|frontmatter|full (Default body)")
    ap.add_argument("--hash-source", help="parsed|raw (Default parsed)")
    ap.add_argument("--hash-normalize", help="canonical|none (Default canonical)")
    ap.add_argument("--compare-text", action="store_true", help="Parsed fulltext zusätzlich direkt vergleichen")
    ap.add_argument("--baseline-modes", action="store_true", help="Fehlende Hash-Varianten still nachtragen (Notes)")
    ap.add_argument("--sync-deletes", action="store_true", help="Qdrant->Vault Lösch-Sync (mit --apply ausführen)")
    ap.add_argument("--prefix", help="Collection-Prefix (überschreibt ENV COLLECTION_PREFIX)")
    return ap


def _collect_markdown_files(root: str, only_path: Optional[str]) -> List[str]:
    """Return the sorted list of Markdown files to import.

    With *only_path*, only that file is returned, as an absolute path.
    Otherwise *root* is walked recursively for ``*.md`` files (case-insensitive
    suffix). Directories named ``.obsidian``, ``_backup_frontmatter`` or
    ``_imported`` are skipped at any depth below *root*.
    """
    if only_path:
        return [os.path.abspath(only_path)]
    excluded = {".obsidian", "_backup_frontmatter", "_imported"}
    files: List[str] = []
    for dirpath, dirnames, filenames in os.walk(root):
        # Prune in place so os.walk never descends into excluded directories.
        # Matching on directory names (not the absolute path) also means a
        # vault that itself lives below e.g. ".../_imported/" is not skipped.
        dirnames[:] = [d for d in dirnames if d not in excluded]
        files.extend(os.path.join(dirpath, fn) for fn in filenames if fn.lower().endswith(".md"))
    files.sort()
    return files


def _sync_deletes(client, prefix: str, files: List[str], apply: bool) -> None:
    """Delete notes from Qdrant whose ``id`` no longer occurs in any vault file.

    The plan is printed as one JSON line. Deletions only happen with *apply*.
    They are refused when any vault file raised an error while being read or
    parsed: that file's id would be absent from the vault set, and its note
    would otherwise be deleted by mistake.
    """
    vault_note_ids: Set[str] = set()
    unreadable: List[str] = []
    for path in files:
        try:
            parsed = read_markdown(path)
            if not parsed:
                continue
            nid = normalize_frontmatter(parsed.frontmatter).get("id")
        except Exception:
            unreadable.append(path)
            continue
        if isinstance(nid, str):
            vault_note_ids.add(nid)

    qdrant_note_ids = list_qdrant_note_ids(client, prefix)
    to_delete = sorted(qdrant_note_ids - vault_note_ids)
    print(json.dumps({
        "action": "sync-deletes",
        "prefix": prefix,
        "qdrant_total": len(qdrant_note_ids),
        "vault_total": len(vault_note_ids),
        "to_delete_count": len(to_delete),
        # At most 50 ids are listed; a trailing "…" marks the list as truncated.
        "to_delete": to_delete[:50] + (["…"] if len(to_delete) > 50 else []),
        "unreadable_files": len(unreadable),
    }, ensure_ascii=False))

    if not (apply and to_delete):
        return
    if unreadable:
        print(json.dumps({
            "action": "sync-deletes",
            "decision": "skip",
            "reason": "some vault files could not be parsed; refusing to delete notes",
            "unreadable": unreadable[:50],
        }, ensure_ascii=False))
        return
    for nid in to_delete:
        print(json.dumps({"action": "delete", "note_id": nid, "decision": "apply"}))
        delete_note_everywhere(client, prefix, nid)


def _embed_chunks(chunk_pls: List[Dict[str, Any]], dim: int, path: str, note_id: str) -> List[List[float]]:
    """Return one embedding per chunk payload.

    Each chunk is embedded from its ``window`` text, falling back to ``text``.
    Zero vectors of length *dim* are returned instead in three cases:
    - ``embed_texts`` is unavailable;
    - ``embed_texts`` raises;
    - the number of returned vectors differs from the number of chunks.
    Failures are reported as a JSON warning. The count check keeps a short or
    long result from being paired with the wrong chunk points.
    """
    def _zeros() -> List[List[float]]:
        return [[0.0] * int(dim) for _ in chunk_pls]

    if not (embed_texts and chunk_pls):
        return _zeros()
    texts_for_embed = [(pl.get("window") or pl.get("text") or "") for pl in chunk_pls]
    try:
        vecs = embed_texts(texts_for_embed)
        if vecs is None or len(vecs) != len(chunk_pls):
            got = "None" if vecs is None else len(vecs)
            raise ValueError(f"expected {len(chunk_pls)} vectors, got {got}")
    except Exception as e:
        print(json.dumps({"path": path, "note_id": note_id, "warn": f"embed_texts failed, using zeros: {e}"}))
        return _zeros()
    return vecs


def main() -> None:
    """CLI entry point: import Markdown notes from a vault into Qdrant.

    For each file the importer does the following, printing JSON lines to stdout:
    - parse and validate the frontmatter;
    - resolve type, chunk_profile and retriever_weight;
    - build the note payload, chunk payloads and edges;
    - compare the note's hash (and optionally its text) with the stored note;
    - with --apply, write only what changed or is missing in Qdrant.
    A per-note error skips that note. Embeddings are only computed when chunk
    points are actually written.
    """
    load_dotenv()
    args = _build_arg_parser().parse_args()

    mode = _resolve_mode(args.hash_mode)  # body|frontmatter|full
    # Explicit CLI flags win over ENV, and ENV wins over the built-in default,
    # matching how --prefix overrides COLLECTION_PREFIX.
    src = args.hash_source or _env("MINDNET_HASH_SOURCE", "parsed")  # parsed|raw
    norm = args.hash_normalize or _env("MINDNET_HASH_NORMALIZE", "canonical")  # canonical|none
    note_scope_refs = args.note_scope_refs or _env_flag("MINDNET_NOTE_SCOPE_REFS")
    compare_text = args.compare_text or _env_flag("MINDNET_COMPARE_TEXT")

    # Qdrant
    cfg = QdrantConfig.from_env()
    if args.prefix:
        cfg.prefix = args.prefix.strip()
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)
    ensure_payload_indexes(client, cfg.prefix)

    reg = load_type_registry()
    root = os.path.abspath(args.vault)

    files = _collect_markdown_files(root, args.only_path)
    if not files:
        print("Keine Markdown-Dateien gefunden.", file=sys.stderr)
        sys.exit(2)

    if args.sync_deletes:
        if args.only_path:
            # With a single file, every other note would look deleted from the vault.
            print(json.dumps({
                "action": "sync-deletes",
                "decision": "skip",
                "reason": "--sync-deletes cannot be combined with --only-path",
            }))
        else:
            _sync_deletes(client, cfg.prefix, files, args.apply)

    key_current = f"{mode}:{src}:{norm}"
    processed = 0
    for path in files:
        try:
            parsed = read_markdown(path)
        except Exception as e:
            print(json.dumps({"path": path, "error": f"read_markdown failed: {type(e).__name__}: {e}"}))
            continue
        if not parsed:
            continue

        # --- Validate frontmatter ---
        try:
            fm = normalize_frontmatter(parsed.frontmatter)
            validate_required_frontmatter(fm)
        except Exception as e:
            print(json.dumps({"path": path, "error": f"Frontmatter invalid: {type(e).__name__}: {e}"}))
            continue

        if args.note_id and not args.only_path and fm.get("id") != args.note_id:
            continue
        processed += 1

        # --- Type and chunk profile from the registry ---
        try:
            note_type = resolve_note_type(fm.get("type"), reg)
        except Exception:
            note_type = (fm.get("type") or "concept")
        fm["type"] = note_type or fm.get("type") or "concept"
        prof = effective_chunk_profile(note_type, reg)
        if prof:
            fm["chunk_profile"] = prof

        # --- retriever_weight: frontmatter > registry > 1.0 ---
        rw, rw_source = compute_effective_retriever_weight(fm, note_type, reg)
        rwf = float(rw)
        fm["retriever_weight"] = rw  # mirrored into the frontmatter for the payload builders

        # --- Note payload (including hashes) ---
        try:
            note_pl = make_note_payload(
                parsed,
                vault_root=root,
                hash_mode=mode,
                hash_normalize=norm,
                hash_source=src,
                file_path=path,
            )
        except Exception as e:
            print(json.dumps({"path": path, "error": f"make_note_payload failed: {type(e).__name__}: {e}"}))
            continue
        if not note_pl.get("fulltext"):
            note_pl["fulltext"] = getattr(parsed, "body", "") or ""
        note_pl["retriever_weight"] = rwf  # always present on the note payload

        note_id = note_pl.get("note_id") or fm.get("id")
        if not note_id:
            print(json.dumps({"path": path, "error": "Missing note_id after payload build"}))
            continue

        # --- Diff against the stored note ---
        old_payload = None if args.force_replace else fetch_existing_note_payload(client, cfg.prefix, note_id)
        has_old = old_payload is not None
        old_hashes = (old_payload or {}).get("hashes") or {}
        old_hash_exact = old_hashes.get(key_current)
        new_hash_exact = (note_pl.get("hashes") or {}).get(key_current)
        needs_baseline = old_hash_exact is None
        hash_changed = (old_hash_exact is not None and new_hash_exact is not None and old_hash_exact != new_hash_exact)
        text_changed = False
        if compare_text:
            old_text = (old_payload or {}).get("fulltext") or ""
            new_text = note_pl.get("fulltext") or ""
            text_changed = old_text != new_text
        changed = args.force_replace or (not has_old) or hash_changed or text_changed
        do_baseline_only = (args.baseline_modes and has_old and needs_baseline and not changed)

        # --- Chunk payloads ---
        try:
            body_text = getattr(parsed, "body", "") or ""
            chunks = assemble_chunks(fm["id"], body_text, fm.get("type", "concept"))
            chunk_pls: List[Dict[str, Any]] = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
        except Exception as e:
            print(json.dumps({"path": path, "note_id": note_id, "error": f"chunk build failed: {type(e).__name__}: {e}"}))
            continue

        # Field policy: `index` (0-based) and `ord` (1-based); no legacy aliases;
        # retriever_weight is always mirrored onto every chunk.
        for i, pl in enumerate(chunk_pls):
            pl.setdefault("index", i)
            pl["ord"] = int(pl["index"]) + 1
            for alias in ("chunk_num", "Chunk_Number"):
                pl.pop(alias, None)
            pl["retriever_weight"] = rwf

        # --- Which artifacts are absent in Qdrant ---
        chunks_missing, edges_missing = artifacts_missing(client, cfg.prefix, note_id)

        # --- Edges (a failure only skips edges, not the note) ---
        edges: List[Dict[str, Any]] = []
        edges_failed = False
        should_build_edges = (changed and (not do_baseline_only)) or edges_missing
        if should_build_edges:
            try:
                note_refs = note_pl.get("references") or []
                # Positional arguments: compatible with derive_edges.py AND edges.py
                edges = build_edges_for_note(note_id, chunk_pls, note_refs, include_note_scope_refs=note_scope_refs)
            except Exception as e:
                edges_failed = True
                edges = []
                print(json.dumps({"path": path, "note_id": note_id, "warn": f"build_edges_for_note failed, skipping edges: {type(e).__name__}: {e}"}))

        # --- Summary (stdout) ---
        summary = {
            "note_id": note_id,
            "title": fm.get("title"),
            "type": fm.get("type"),
            "rw": rw,
            "rw_source": rw_source,
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "edges_failed": edges_failed,
            "changed": changed,
            "chunks_missing": chunks_missing,
            "edges_missing": edges_missing,
            "needs_baseline_for_mode": needs_baseline,
            "decision": ("baseline-only" if args.apply and do_baseline_only else
                         "apply" if args.apply and (changed or chunks_missing or edges_missing) else
                         "apply-skip-unchanged" if args.apply and not (changed or chunks_missing or edges_missing) else
                         "dry-run"),
            "path": note_pl["path"],
            "hash_mode": mode,
            "hash_normalize": norm,
            "hash_source": src,
            "prefix": cfg.prefix,
        }
        print(json.dumps(summary, ensure_ascii=False))

        # --- Writes ---
        if not args.apply:
            continue

        if do_baseline_only:
            # Add the missing hash variants to the stored note and keep its hash fields.
            merged_hashes = {**old_hashes, **(note_pl.get("hashes") or {})}
            if old_payload:
                note_pl["hash_fulltext"] = old_payload.get("hash_fulltext", note_pl.get("hash_fulltext"))
                note_pl["hash_signature"] = old_payload.get("hash_signature", note_pl.get("hash_signature"))
            note_pl["hashes"] = merged_hashes
            notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim)
            upsert_batch(client, notes_name, note_pts)
            # No `continue`: missing chunks/edges are still repaired below.
            # `changed` is False here, so nothing else gets rewritten or purged.
        elif not (changed or chunks_missing or edges_missing):
            continue

        # Purge only on real changes, and only if there is anything to purge.
        # This also covers --force-replace, where the old note is never fetched.
        if args.purge_before_upsert and changed and not (chunks_missing and edges_missing):
            try:
                purge_note_artifacts(client, cfg.prefix, note_id)
            except Exception as e:
                print(json.dumps({"path": path, "note_id": note_id, "warn": f"purge failed: {e}"}))

        if changed:
            notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim)
            upsert_batch(client, notes_name, note_pts)

        # Chunks: written when changed OR previously missing (embedding only happens here).
        if chunk_pls and (changed or chunks_missing):
            vecs = _embed_chunks(chunk_pls, cfg.dim, path, note_id)
            chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)
            upsert_batch(client, chunks_name, chunk_pts)

        # Edges: written when present and (changed OR previously missing).
        if edges and (changed or edges_missing):
            edges_name, edge_pts = points_for_edges(cfg.prefix, edges)
            upsert_batch(client, edges_name, edge_pts)

    print(f"Done. Processed notes: {processed}")
if __name__ == "__main__":
    # Script entry point; also reached via `python3 -m scripts.import_markdown`.
    main()