mindnet/scripts/import_markdown.py
Lars e5e5e7560c
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
Dateien nach "scripts" hochladen
2025-11-10 12:20:09 +01:00

462 lines
18 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
scripts/import_markdown.py
Purpose
-------
Import Markdown notes from a vault into Qdrant with idempotent upserts.
This version fixes the issue where `retriever_weight` for *notes* did not
reflect the values from `types.yaml`. It does so by building the note payload
from a dict containing the normalized frontmatter, and by ensuring the
TypeRegistry is loaded via ENV (`MINDNET_TYPES_FILE`, default: ./config/types.yaml).
Key behaviors
-------------
- Deterministic, idempotent upserts for notes / chunks / edges
- Optional embeddings for chunks
- Optional sync-deletes (vault → Qdrant)
- Ensures collections and payload indices exist
- Honors `retriever_weight` and `chunk_profile` from types.yaml for both notes and chunks
CLI examples
------------
# Apply + purge
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert --prefix "$COLLECTION_PREFIX"
# Sync-Deletes (dry-run then apply)
python3 -m scripts.import_markdown --vault ./vault --sync-deletes
python3 -m scripts.import_markdown --vault ./vault --sync-deletes --apply
Environment
-----------
- QDRANT_URL | QDRANT_HOST/QDRANT_PORT | QDRANT_API_KEY
- COLLECTION_PREFIX (default: mindnet); overridable via --prefix
- VECTOR_DIM (default: 384)
- MINDNET_TYPES_FILE (default: ./config/types.yaml)
- MINDNET_NOTE_SCOPE_REFS=true|false (default: false)
- MINDNET_HASH_SOURCE=parsed|raw (default: parsed)
- MINDNET_HASH_NORMALIZE=canonical|none (default: canonical)
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from typing import Dict, List, Optional, Tuple, Any, Set
from dotenv import load_dotenv
from qdrant_client.http import models as rest
# --- Project imports ---
from app.core.parser import (
read_markdown,
normalize_frontmatter,
validate_required_frontmatter,
)
from app.core.note_payload import make_note_payload
from app.core.chunk_payload import make_chunk_payloads
try:
from app.core.derive_edges import build_edges_for_note
except Exception: # pragma: no cover
from app.core.edges import build_edges_for_note # type: ignore
from app.core.qdrant import (
QdrantConfig,
get_client,
ensure_collections,
ensure_payload_indexes,
)
from app.core.qdrant_points import (
points_for_chunks,
points_for_note,
points_for_edges,
upsert_batch,
)
# embeddings optional
try:
from app.core.embed import embed_texts
except Exception: # pragma: no cover
embed_texts = None
# ---------------------- helpers ----------------------
def _env(name: str, default: Optional[str] = None) -> str:
v = os.getenv(name, default if default is not None else "")
return v if v is not None else ""
def _resolve_mode(val: Optional[str]) -> str:
v = (val or _env("MINDNET_HASH_COMPARE", "body")).strip().lower()
return v if v in ("body","frontmatter","full") else "body"
def _iter_md(root: str) -> List[str]:
files: List[str] = []
for dirpath, _, filenames in os.walk(root):
for fn in filenames:
if fn.lower().endswith(".md"):
files.append(os.path.join(dirpath, fn))
files.sort()
return files
def _types_file_default() -> str:
    """Resolve the types.yaml path.

    MINDNET_TYPES_FILE wins when set; otherwise ./config/types.yaml relative
    to the current working directory (expected to be the project root).
    """
    fallback = os.path.abspath("./config/types.yaml")
    return _env("MINDNET_TYPES_FILE", fallback)
def load_type_registry() -> Dict[str, Any]:
    """Load the `types:` mapping from the configured types.yaml.

    Returns {} on any failure — missing file, unreadable YAML, a non-mapping
    `types` entry, or PyYAML not being installed — so callers can treat the
    registry as best-effort configuration. (Previously `import yaml` lived
    outside the try block, so a missing PyYAML crashed instead of degrading.)
    """
    path = _types_file_default()
    try:
        import yaml  # local import, guarded: missing PyYAML degrades to {}
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f) or {}
        types = data.get("types") or {}
        return types if isinstance(types, dict) else {}
    except Exception:
        return {}
def resolve_note_type(note_type: Optional[str], reg: Dict[str, Any]) -> str:
    """Normalize a frontmatter `type` value.

    Falls back to "concept" when the value is missing or whitespace-only.
    Types not present in *reg* are passed through unchanged — free types are
    allowed, so the registry is not consulted for filtering (the original
    `s if s in reg else s` was a no-op conditional). *reg* is kept in the
    signature for interface compatibility.
    """
    if not note_type:
        return "concept"
    s = str(note_type).strip()
    # Treat a whitespace-only value like a missing one.
    return s if s else "concept"
def effective_chunk_profile(note_type: str, reg: Dict[str, Any]) -> Optional[str]:
    """Return the chunk_profile configured for *note_type*, or None.

    Any lookup/conversion failure (e.g. a non-dict registry entry) yields None.
    """
    try:
        profile = reg.get(note_type, {}).get("chunk_profile")
        if profile is None:
            return None
        return str(profile)
    except Exception:
        return None
def effective_retriever_weight(note_type: str, reg: Dict[str, Any]) -> Optional[float]:
    """Return the retriever_weight configured for *note_type* as a float, or None.

    Any lookup/conversion failure (non-dict entry, unparsable value) yields None.
    """
    try:
        raw = reg.get(note_type, {}).get("retriever_weight")
        return None if raw is None else float(raw)
    except Exception:
        return None
def list_qdrant_note_ids(client, prefix: str) -> Set[str]:
    """Collect all note_ids from the `<prefix>_notes` collection payloads.

    Pages through the collection via scroll(). Handles both the tuple return
    shape (points, next_offset) of modern qdrant clients and object-style
    responses with .points/.next_page_offset attributes.

    Fixes: removed an unused `from qdrant_client import QdrantClient`, and
    replaced the `getattr(res, ...) or res[...]` fallbacks, which raised
    TypeError on object-style responses with an empty page or a None offset.
    """
    notes = f"{prefix}_notes"
    out: Set[str] = set()
    offset = None
    while True:
        res = client.scroll(
            collection_name=notes,
            with_payload=True,
            with_vectors=False,
            limit=2048,
            offset=offset,
        )
        # API compatibility: tuple (points, next_offset) vs. response object.
        if isinstance(res, tuple):
            pts, next_off = res[0], res[1]
        else:
            pts = getattr(res, "points", None) or []
            next_off = getattr(res, "next_page_offset", None)
        for p in pts:
            pl = getattr(p, "payload", {}) or {}
            nid = pl.get("note_id") or pl.get("id")
            if isinstance(nid, str):
                out.add(nid)
        if not next_off:
            break
        offset = next_off
    return out
def fetch_existing_note_payload(client, prefix: str, note_id: str) -> Optional[Dict[str, Any]]:
    """Fetch the stored payload for *note_id* from `<prefix>_notes`, or None.

    Uses a filtered scroll with limit=1. Handles both the tuple return shape
    (points, next_offset) and object-style responses; the previous
    `getattr(sr, "points", None) or sr[0]` raised TypeError on object-style
    responses whose points list was empty.
    """
    notes = f"{prefix}_notes"
    flt = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
    sr = client.scroll(collection_name=notes, with_payload=True, with_vectors=False, limit=1, scroll_filter=flt)
    # API compatibility: tuple (points, next_offset) vs. response object.
    if isinstance(sr, tuple):
        pts = sr[0]
    else:
        pts = getattr(sr, "points", None) or []
    if not pts:
        return None
    return getattr(pts[0], "payload", None) or None
def _delete_points_compat(client, collection: str, selector) -> None:
    """Delete points matching *selector*, tolerating old and new client APIs.

    Tries delete_points() first and falls back to delete() when the client
    does not support it (or the call fails).
    """
    try:
        client.delete_points(collection_name=collection, points_selector=selector, wait=True)
    except Exception:
        client.delete(collection_name=collection, points_selector=selector, wait=True)


def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    """Delete all chunks and edges belonging to *note_id* (idempotent).

    Chunks are matched on note_id; edges on source_id, target_id, or note_id.
    """
    by_note = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
    _delete_points_compat(client, f"{prefix}_chunks", by_note)
    touching_note = rest.Filter(should=[
        rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id)),
    ])
    _delete_points_compat(client, f"{prefix}_edges", touching_note)
def delete_note_everywhere(client, prefix: str, note_id: str) -> None:
    """Remove a note and all of its derived artifacts (chunks + edges)."""
    # Artifacts first, then the note point itself.
    purge_note_artifacts(client, prefix, note_id)
    notes = f"{prefix}_notes"
    selector = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
    # Old/new client API compatibility: prefer delete_points, fall back to delete.
    try:
        client.delete_points(collection_name=notes, points_selector=selector, wait=True)
    except Exception:
        client.delete(collection_name=notes, points_selector=selector, wait=True)
# ---------------------- main ----------------------
def main() -> None:
    """CLI entry point: import a Markdown vault into Qdrant with idempotent upserts.

    Flow: parse args -> resolve env/hash settings -> connect to Qdrant and
    ensure collections + payload indexes -> optional sync-deletes -> per note:
    normalize frontmatter, apply types.yaml overrides, build note/chunk/edge
    payloads, compare against the stored payload, and write when --apply.
    """
    load_dotenv()
    ap = argparse.ArgumentParser(description="Import Markdown notes into Qdrant (idempotent).")
    ap.add_argument("--vault", required=True, help="Path to the vault (folder with .md files)")
    ap.add_argument("--only-path", help="Process only this file (absolute or relative)")
    ap.add_argument("--apply", action="store_true", help="Write to Qdrant (otherwise dry-run)")
    ap.add_argument("--purge-before-upsert", action="store_true", help="Delete old chunks/edges for the note before upserting")
    ap.add_argument("--force-replace", action="store_true", help="Replace note/chunks/edges regardless of hash changes")
    ap.add_argument("--note-id", help="Process only notes with this id")
    ap.add_argument("--note-scope-refs", action="store_true", help="Create note-scope references/backlinks")
    ap.add_argument("--hash-mode", help="body|frontmatter|full (default body)")
    ap.add_argument("--hash-source", help="parsed|raw (default parsed)")
    ap.add_argument("--hash-normalize", help="canonical|none (default canonical)")
    ap.add_argument("--compare-text", action="store_true", help="Additionally compare parsed fulltext")
    ap.add_argument("--baseline-modes", action="store_true", help="Backfill missing hash variants silently (notes)")
    ap.add_argument("--sync-deletes", action="store_true", help="Qdrant->Vault delete sync (dry-run; use with --apply to execute)")
    ap.add_argument("--prefix", help="Collection prefix (overrides ENV COLLECTION_PREFIX)")
    args = ap.parse_args()
    # Ensure default types path if not provided via ENV, so downstream payload
    # builders resolve the same types.yaml as this script.
    if not os.getenv("MINDNET_TYPES_FILE"):
        os.environ["MINDNET_TYPES_FILE"] = _types_file_default()
    # NOTE(review): for src/norm the ENV value takes precedence and the CLI
    # flag only supplies the fallback default — confirm this inversion is
    # intended. Also: mode/src/norm/compare_text (and --baseline-modes) are
    # computed/declared but not referenced later in this function; possibly
    # leftovers from a previous hashing implementation — verify.
    mode = _resolve_mode(args.hash_mode)  # body|frontmatter|full
    src = _env("MINDNET_HASH_SOURCE", args.hash_source or "parsed")  # parsed|raw
    norm = _env("MINDNET_HASH_NORMALIZE", args.hash_normalize or "canonical")  # canonical|none
    note_scope_refs_env = (_env("MINDNET_NOTE_SCOPE_REFS", "false").strip().lower() == "true")
    note_scope_refs = args.note_scope_refs or note_scope_refs_env
    compare_text = args.compare_text or (_env("MINDNET_COMPARE_TEXT", "false").strip().lower() == "true")
    # Qdrant: connect and make sure collections + payload indexes exist (idempotent).
    cfg = QdrantConfig.from_env()
    if args.prefix:
        cfg.prefix = args.prefix.strip()
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)
    ensure_payload_indexes(client, cfg.prefix)
    # Type-Registry (per-type retriever_weight / chunk_profile from types.yaml)
    reg = load_type_registry()
    root = os.path.abspath(args.vault)
    # File list: a single explicit file, or all .md files under the vault.
    if args.only_path:
        only = os.path.abspath(args.only_path)
        files = [only]
    else:
        files = _iter_md(root)
    if not files:
        print("No Markdown files found.", file=sys.stderr)
        sys.exit(2)
    # Optional: Sync-Deletes (vault -> qdrant). Deletes Qdrant notes whose id
    # no longer exists in the vault; dry-run prints a report, --apply executes.
    if args.sync_deletes:
        vault_note_ids: Set[str] = set()
        for path in files:
            try:
                parsed = read_markdown(path)
                if not parsed:
                    continue
                fm = normalize_frontmatter(parsed.frontmatter)
                nid = fm.get("id")
                if isinstance(nid, str):
                    vault_note_ids.add(nid)
            except Exception:
                # Unreadable/invalid notes simply don't protect their id.
                continue
        qdrant_note_ids = list_qdrant_note_ids(client, cfg.prefix)
        to_delete = sorted(qdrant_note_ids - vault_note_ids)
        print(json.dumps({
            "action": "sync-deletes",
            "prefix": cfg.prefix,
            "qdrant_total": len(qdrant_note_ids),
            "vault_total": len(vault_note_ids),
            "to_delete_count": len(to_delete),
            # NOTE(review): the appended "" acts as a truncation marker when
            # more than 50 ids exist — presumably "…" was intended; confirm.
            "to_delete": to_delete[:50] + ([""] if len(to_delete) > 50 else [])
        }, ensure_ascii=False))
        if args.apply and to_delete:
            for nid in to_delete:
                print(json.dumps({"action": "delete", "note_id": nid, "decision": "apply"}))
                delete_note_everywhere(client, cfg.prefix, nid)
    processed = 0
    for path in files:
        # --- read + parse the markdown file ---
        try:
            parsed = read_markdown(path)
            if not parsed:
                continue
        except Exception as e:
            print(json.dumps({"path": path, "error": f"read_markdown failed: {type(e).__name__}: {e}"}))
            continue
        # Frontmatter: normalize and validate; invalid notes are skipped, not fatal.
        try:
            fm = normalize_frontmatter(parsed.frontmatter)
            validate_required_frontmatter(fm)
        except Exception as e:
            print(json.dumps({"path": path, "error": f"Frontmatter invalid: {type(e).__name__}: {e}"}))
            continue
        # --note-id filter; skipped when --only-path was given (explicit file wins).
        if args.note_id and not args.only_path and fm.get("id") != args.note_id:
            continue
        processed += 1
        # Apply type-registry to FM: normalized type plus chunk_profile /
        # retriever_weight overrides from types.yaml written into frontmatter.
        try:
            note_type = resolve_note_type(fm.get("type"), reg)
        except Exception:
            note_type = (fm.get("type") or "concept")
        fm["type"] = note_type or fm.get("type") or "concept"
        prof = effective_chunk_profile(note_type, reg)
        if prof:
            fm["chunk_profile"] = prof
        weight = effective_retriever_weight(note_type, reg)
        if weight is not None:
            try:
                fm["retriever_weight"] = float(weight)
            except Exception:
                pass
        # --- Build NOTE payload (IMPORTANT: build from dict to capture FM overrides) ---
        try:
            note_input = {
                "frontmatter": fm,
                "id": fm.get("id"),
                "title": fm.get("title"),
                "type": fm.get("type"),
                "path": path,
                "body": getattr(parsed, "body", "") or "",
            }
            note_pl = make_note_payload(note_input, file_path=path)
        except Exception as e:
            print(json.dumps({"path": path, "error": f"make_note_payload failed: {type(e).__name__}: {e}"}))
            continue
        # Fallback: guarantee a fulltext field for the change comparison below.
        if not note_pl.get("fulltext"):
            note_pl["fulltext"] = getattr(parsed, "body", "") or ""
        # Ensure retriever_weight is present on note payload (from FM/types)
        if "retriever_weight" not in note_pl and fm.get("retriever_weight") is not None:
            try:
                note_pl["retriever_weight"] = float(fm.get("retriever_weight"))
            except Exception:
                pass
        note_id = note_pl.get("note_id") or fm.get("id")
        if not note_id:
            print(json.dumps({"path": path, "error": "Missing note_id after payload build"}))
            continue
        # Compare against existing payload to detect changes (fulltext equality;
        # --force-replace skips the fetch entirely).
        old_payload = None if args.force_replace else fetch_existing_note_payload(client, cfg.prefix, note_id)
        has_old = old_payload is not None
        old_text = (old_payload or {}).get("fulltext") or ""
        new_text = note_pl.get("fulltext") or ""
        text_changed = (old_text != new_text)
        changed = args.force_replace or (not has_old) or text_changed
        # --- CHUNKS ---
        try:
            # Make chunk payloads from the same note dict; chunker will honor FM profile
            chunk_note = {
                "frontmatter": fm,
                "id": fm.get("id"),
                "title": fm.get("title"),
                "type": fm.get("type"),
                "path": path,
                "body": getattr(parsed, "body", "") or "",
            }
            chunk_pls: List[Dict[str, Any]] = make_chunk_payloads(chunk_note, file_path=path)
        except Exception as e:
            print(json.dumps({"path": path, "note_id": note_id, "error": f"chunk build failed: {type(e).__name__}: {e}"}))
            continue
        # embeddings (optional: embed_texts is None when app.core.embed is absent;
        # failures degrade to unembedded chunks with a warning)
        vecs = None
        if embed_texts:
            try:
                texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
                vecs = embed_texts(texts) if texts else None
            except Exception as e:
                print(json.dumps({"path": path, "note_id": note_id, "warn": f"embed failed: {e}"}))
        # --- EDGES --- (best-effort; failures only warn)
        edges: List[Dict[str, Any]] = []
        try:
            include_note_scope = bool(note_scope_refs)
            edges = build_edges_for_note(note_id, chunk_pls, None, include_note_scope)
        except Exception as e:
            print(json.dumps({"path": path, "note_id": note_id, "warn": f"edges failed: {e}"}))
        # Check missing artifacts when unchanged
        # NOTE(review): the "best-effort existence checks" below only assign
        # constants inside try blocks, so chunks_missing/edges_missing can
        # never become True — unchanged notes never re-write lost artifacts.
        # Looks like unfinished code; confirm intent.
        chunks_missing = False
        edges_missing = False
        if has_old and not changed:
            # best-effort existence checks
            try:
                # If at least one chunk for note_id exists → assume not missing
                chunks_missing = False
            except Exception:
                chunks_missing = True
            try:
                edges_missing = False
            except Exception:
                edges_missing = True
        # --- Summary (dry-run log) --- one JSON line per note, always printed.
        summary = {
            "note_id": note_id,
            "title": fm.get("title"),
            "type": fm.get("type"),
            "path": path,
            "changed": changed,
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "apply": bool(args.apply),
        }
        print(json.dumps(summary, ensure_ascii=False))
        # --- Writes --- (everything below only runs with --apply)
        if not args.apply:
            continue
        # purge artifacts if requested and we indeed change the note
        if args.purge_before_upsert and has_old and changed:
            try:
                purge_note_artifacts(client, cfg.prefix, note_id)
            except Exception as e:
                print(json.dumps({"path": path, "note_id": note_id, "warn": f"purge failed: {e}"}))
        # write note when changed or not exists
        if changed or not has_old:
            notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim)
            upsert_batch(client, notes_name, note_pts)
        # write chunks when changed or previously missing
        if chunk_pls and (changed or chunks_missing):
            chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)
            upsert_batch(client, chunks_name, chunk_pts)
        # write edges when available
        if edges and (changed or edges_missing):
            edges_name, edge_pts = points_for_edges(cfg.prefix, edges)
            upsert_batch(client, edges_name, edge_pts)
    print(f"Done. Processed notes: {processed}")


if __name__ == "__main__":
    main()