mindnet/scripts/import_markdown.py
Lars c7644a36aa
All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
Dateien nach "scripts" hochladen
2025-11-16 18:47:36 +01:00

466 lines
19 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
scripts/import_markdown.py (Mindnet V2 — Importer, v2.5.2)
Änderungen in 2.5.2 (minimal & gezielt)
---------------------------------------
- **Explizite Spiegelung** von `chunk_profile` in Note- und Chunk-Payload:
note_pl["chunk_profile"] = fm.get("chunk_profile"); für jeden Chunk cpl["chunk_profile"] = fm.get("chunk_profile").
- **Explizite Spiegelung** von `retriever_weight` in jedem Chunk (falls Builder es nicht gesetzt hat).
- **Feld-basierte Change-Erkennung** erweitert:
Wenn bestehende Note-Payload (`old_payload`) bei `retriever_weight` oder `chunk_profile` vom neuen Wert abweicht
oder ein Feld fehlt, wird `changed = True` gesetzt → Upsert erzwingen.
- **Robuste Übergabe** von Type-Registry an Chunk-Builder:
`types_cfg=(reg.get('types') or reg or {})` damit greifen Resolver in chunk_payload.py sicher auf `types.yaml` zu.
"""
from __future__ import annotations
import argparse
import json
import os
import sys
from typing import Dict, List, Optional, Tuple, Any, Set
from dotenv import load_dotenv
from qdrant_client.http import models as rest
import yaml
# --- Projekt-Imports ---
from app.core.parser import read_markdown, normalize_frontmatter, validate_required_frontmatter
from app.core.note_payload import make_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
try:
from app.core.derive_edges import build_edges_for_note
except Exception:
from app.core.edges import build_edges_for_note # type: ignore
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.qdrant_points import points_for_chunks, points_for_note, points_for_edges, upsert_batch
# embeddings optional
try:
from app.core.embed import embed_texts # optional
except Exception:
embed_texts = None # type: ignore
# ============================================================
# Type-Registry
# ============================================================
def _env(name: str, default: Optional[str] = None) -> str:
v = os.getenv(name)
return v if v is not None else (default or "")
def _deep_get(root: Any, path: str) -> Any:
cur = root
for key in path.split("."):
if not isinstance(cur, dict) or key not in cur:
return None
cur = cur[key]
return cur
def _as_float(x: Any) -> Optional[float]:
try:
return float(x)
except Exception:
return None
def load_type_registry() -> dict:
    """Load the type registry YAML (path from MINDNET_TYPES_FILE).

    Returns {} when the file is absent, empty, or unreadable — the importer
    is expected to work without a registry.
    """
    raw = os.getenv("MINDNET_TYPES_FILE")
    path = raw if raw is not None else "./config/types.yaml"
    if not os.path.isfile(path):
        return {}
    try:
        with open(path, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh)
        return data or {}
    except Exception:
        return {}
def get_type_config(note_type: Optional[str], reg: dict) -> dict:
    """Per-type config ``reg['types'][note_type]`` from the registry, or {}."""
    if not note_type or not isinstance(reg, dict):
        return {}
    types_map = reg.get("types")
    if not isinstance(types_map, dict):
        return {}
    return types_map.get(note_type, {})
def resolve_note_type(requested: Optional[str], reg: dict) -> str:
    """Return *requested* when it is a string; otherwise prefer 'concept'.

    Falls back to 'concept' when the registry defines it (or when nothing
    else is available).
    """
    if isinstance(requested, str) and requested:
        return requested
    registered = reg.get("types", {}) if isinstance(reg.get("types"), dict) else {}
    if "concept" in registered:
        return "concept"
    return requested or "concept"
def effective_chunk_profile(note_type: str, reg: dict) -> Optional[str]:
    """Chunk profile string configured for *note_type* in the registry, or None."""
    if not note_type or not isinstance(reg, dict):
        return None
    types_map = reg.get("types")
    cfg = types_map.get(note_type, {}) if isinstance(types_map, dict) else {}
    value = cfg.get("chunk_profile")
    return value if isinstance(value, str) else None
def effective_retriever_weight_from_registry(note_type: str, reg: dict) -> Tuple[Optional[float], Optional[str]]:
    """First float weight found along type-specific then default registry paths.

    Returns (weight, dotted_path) for the first path whose value converts to
    float, or (None, None) when no candidate resolves.
    """
    search_paths = (
        f"types.{note_type}.retriever_weight",
        f"types.{note_type}.retriever.weight",
        f"types.{note_type}.retrieval.weight",
        "defaults.retriever_weight",
        "defaults.retriever.weight",
        "global.retriever_weight",
        "global.retriever.weight",
    )
    for dotted in search_paths:
        # Inline dotted-path walk: None as soon as a step is missing.
        node: Any = reg
        for part in dotted.split("."):
            if not isinstance(node, dict) or part not in node:
                node = None
                break
            node = node[part]
        try:
            weight = float(node)
        except Exception:
            continue
        return weight, dotted
    return None, None
def compute_effective_retriever_weight(fm: Dict[str, Any], note_type: str, reg: dict) -> Tuple[float, str]:
    """Effective retriever weight with provenance: frontmatter > registry > 1.0.

    Returns (weight, source_label) where the label records where the value
    came from (for the summary output).
    """
    fm_value = fm.get("retriever_weight")
    if fm_value is not None:
        try:
            return float(fm_value), "frontmatter.retriever_weight"
        except Exception:
            pass  # unconvertible frontmatter value -> fall through to registry
    reg_value, reg_path = effective_retriever_weight_from_registry(note_type, reg)
    if reg_value is not None:
        return float(reg_value), f"types.yaml:{reg_path}"
    return 1.0, "default:1.0"
# ============================================================
# Qdrant Helpers
# ============================================================
def collections(prefix: str) -> Tuple[str, str, str]:
    """Collection names (notes, chunks, edges) derived from *prefix*.

    NOTE(review): the name shadows the stdlib ``collections`` module; kept
    because callers throughout this file depend on it.
    """
    notes, chunks, edges = (f"{prefix}_{kind}" for kind in ("notes", "chunks", "edges"))
    return notes, chunks, edges
def fetch_existing_note_payload(client, prefix: str, note_id: str) -> Optional[Dict]:
    """Payload of the stored note matching *note_id*, or None when absent."""
    notes_col = collections(prefix)[0]
    note_filter = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    hits, _ = client.scroll(
        collection_name=notes_col,
        scroll_filter=note_filter,
        with_payload=True,
        with_vectors=False,
        limit=1,
    )
    return (hits[0].payload or {}) if hits else None
def list_qdrant_note_ids(client, prefix: str) -> Set[str]:
    """Collect every note_id string currently stored in the notes collection.

    Pages through the collection via scroll (256 points per page).
    """
    notes_col = collections(prefix)[0]
    ids: Set[str] = set()
    offset = None
    while True:
        batch, offset = client.scroll(
            collection_name=notes_col,
            with_payload=True,
            with_vectors=False,
            limit=256,
            offset=offset,
        )
        if not batch:
            break
        for point in batch:
            candidate = (point.payload or {}).get("note_id")
            if isinstance(candidate, str):
                ids.add(candidate)
        if offset is None:  # no further pages
            break
    return ids
def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    """Delete all chunk and edge points of *note_id* (the note itself is kept).

    Failures are logged as JSON warnings, never raised — purge is best-effort.
    """
    _, chunks_col, edges_col = collections(prefix)
    match_note = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    for col in (chunks_col, edges_col):
        try:
            client.delete(
                collection_name=col,
                points_selector=rest.FilterSelector(filter=match_note),
                wait=True,
            )
        except Exception as e:
            print(json.dumps({"note_id": note_id, "warn": f"delete in {col} via filter failed: {e}"}))
def delete_note_everywhere(client, prefix: str, note_id: str) -> None:
    """Remove a note and all of its chunks/edges from every collection.

    Children (edges, chunks) are deleted before the note point itself.
    Failures are logged as JSON warnings, never raised.
    """
    notes_col, chunks_col, edges_col = collections(prefix)
    match_note = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    for col in (edges_col, chunks_col, notes_col):
        try:
            client.delete(
                collection_name=col,
                points_selector=rest.FilterSelector(filter=match_note),
                wait=True,
            )
        except Exception as e:
            print(json.dumps({"note_id": note_id, "warn": f"delete in {col} failed: {e}"}))
def _has_any_point(client, collection: str, note_id: str) -> bool:
    """True when *collection* holds at least one point for *note_id*."""
    match_note = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    hits, _ = client.scroll(
        collection_name=collection,
        scroll_filter=match_note,
        with_payload=False,
        with_vectors=False,
        limit=1,
    )
    return bool(hits)
def artifacts_missing(client, prefix: str, note_id: str) -> Tuple[bool, bool]:
    """Return (chunks_missing, edges_missing) for *note_id*."""
    _, chunks_col, edges_col = collections(prefix)
    return (
        not _has_any_point(client, chunks_col, note_id),
        not _has_any_point(client, edges_col, note_id),
    )
# ============================================================
# Main
# ============================================================
def _resolve_mode(m: Optional[str]) -> str:
m = (m or "body").strip().lower()
return m if m in {"body", "frontmatter", "full"} else "body"
def main() -> None:
    """CLI entry point: import Markdown notes from a vault into Qdrant.

    Walks the vault (or a single ``--only-path`` file), builds note/chunk/edge
    payloads, detects changes against the note payload already stored in
    Qdrant, and — with ``--apply`` — upserts (and optionally purges/deletes)
    points in the ``{prefix}_notes`` / ``{prefix}_chunks`` / ``{prefix}_edges``
    collections. Without ``--apply`` everything is a dry-run that only prints
    JSON status lines.
    """
    load_dotenv()
    ap = argparse.ArgumentParser(prog="scripts.import_markdown", description="Importiert Markdown-Notizen in Qdrant (Notes/Chunks/Edges).")
    ap.add_argument("--vault", required=True)
    ap.add_argument("--only-path")
    ap.add_argument("--apply", action="store_true")
    ap.add_argument("--purge-before-upsert", action="store_true")
    ap.add_argument("--force-replace", action="store_true")
    ap.add_argument("--note-id")
    ap.add_argument("--note-scope-refs", action="store_true")
    ap.add_argument("--hash-mode")
    ap.add_argument("--hash-source")
    ap.add_argument("--hash-normalize")
    ap.add_argument("--compare-text", action="store_true")
    ap.add_argument("--baseline-modes", action="store_true")
    ap.add_argument("--sync-deletes", action="store_true")
    ap.add_argument("--prefix")
    args = ap.parse_args()
    # Hash configuration: environment variables override CLI values.
    mode = _resolve_mode(args.hash_mode)
    src = _env("MINDNET_HASH_SOURCE", args.hash_source or "parsed")
    norm = _env("MINDNET_HASH_NORMALIZE", args.hash_normalize or "canonical")
    note_scope_refs_env = (_env("MINDNET_NOTE_SCOPE_REFS", "false") == "true")
    note_scope_refs = args.note_scope_refs or note_scope_refs_env
    compare_text = args.compare_text or (_env("MINDNET_COMPARE_TEXT", "false") == "true")
    cfg = QdrantConfig.from_env()
    if args.prefix:
        cfg.prefix = args.prefix.strip()
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)
    ensure_payload_indexes(client, cfg.prefix)
    reg = load_type_registry()
    types_cfg_node = reg.get("types") if isinstance(reg, dict) else {}
    root = os.path.abspath(args.vault)
    # Build the file list (skipping Obsidian internals and backup/import dirs).
    if args.only_path:
        files = [os.path.abspath(args.only_path)]
    else:
        files: List[str] = []
        for dirpath, _, filenames in os.walk(root):
            for fn in filenames:
                if fn.lower().endswith(".md"):
                    p = os.path.join(dirpath, fn)
                    pn = p.replace("\\", "/")
                    if any(ex in pn for ex in ["/.obsidian/", "/_backup_frontmatter/", "/_imported/"]):
                        continue
                    files.append(p)
    files.sort()
    if not files:
        print("Keine Markdown-Dateien gefunden.", file=sys.stderr)
        sys.exit(2)
    # Optional: delete notes from Qdrant that no longer exist in the vault.
    if args.sync_deletes:
        vault_note_ids: Set[str] = set()
        for path in files:
            try:
                parsed = read_markdown(path)
                if not parsed:
                    continue
                fm = normalize_frontmatter(parsed.frontmatter)
                nid = fm.get("id")
                if isinstance(nid, str):
                    vault_note_ids.add(nid)
            except Exception:
                continue  # unreadable file: simply not counted as present
        qdrant_note_ids = list_qdrant_note_ids(client, cfg.prefix)
        to_delete = sorted(qdrant_note_ids - vault_note_ids)
        # NOTE(review): the [""] appended when >50 looks like a mangled "…"
        # truncation marker — confirm against the original upstream file.
        print(json.dumps({"action":"sync-deletes","prefix":cfg.prefix,"qdrant_total":len(qdrant_note_ids),"vault_total":len(vault_note_ids),"to_delete_count":len(to_delete),"to_delete":to_delete[:50]+([""] if len(to_delete)>50 else [])}, ensure_ascii=False))
        if args.apply and to_delete:
            for nid in to_delete:
                print(json.dumps({"action":"delete","note_id":nid,"decision":"apply"}))
                delete_note_everywhere(client, cfg.prefix, nid)
    # Key under which the relevant content hash is stored in the payload.
    key_current = f"{mode}:{src}:{norm}"
    processed = 0
    for path in files:
        try:
            parsed = read_markdown(path)
            if not parsed:
                continue
        except Exception as e:
            print(json.dumps({"path": path, "error": f"read_markdown failed: {type(e).__name__}: {e}"}))
            continue
        # Frontmatter
        try:
            fm = normalize_frontmatter(parsed.frontmatter)
            validate_required_frontmatter(fm)
        except Exception as e:
            print(json.dumps({"path": path, "error": f"Frontmatter invalid: {type(e).__name__}: {e}"}))
            continue
        if args.note_id and not args.only_path and fm.get("id") != args.note_id:
            continue
        processed += 1
        # Type + chunk profile / retriever weight resolution via the registry.
        note_type = resolve_note_type(fm.get("type"), reg)
        fm["type"] = note_type or fm.get("type") or "concept"
        prof = effective_chunk_profile(note_type, reg)
        if prof:
            fm["chunk_profile"] = prof
        rw, rw_source = compute_effective_retriever_weight(fm, note_type, reg)
        fm["retriever_weight"] = rw
        # Note payload
        try:
            note_pl = make_note_payload(parsed, vault_root=root, hash_mode=mode, hash_normalize=norm, hash_source=src, file_path=path)
        except Exception as e:
            print(json.dumps({"path": path, "error": f"make_note_payload failed: {type(e).__name__}: {e}"}))
            continue
        # Explicit mirroring: chunk_profile & retriever_weight onto the note payload.
        if fm.get("chunk_profile") is not None:
            note_pl["chunk_profile"] = fm.get("chunk_profile")
        note_pl["retriever_weight"] = float(rw) if isinstance(rw, (int, float)) else 1.0
        note_id = note_pl.get("note_id") or fm.get("id")
        if not note_id:
            print(json.dumps({"path": path, "error": "Missing note_id after payload build"}))
            continue
        # Existing note in Qdrant (for change detection).
        old_payload = fetch_existing_note_payload(client, cfg.prefix, note_id)
        has_old = old_payload is not None
        old_hashes = (old_payload or {}).get("hashes") or {}
        old_hash_exact = old_hashes.get(key_current)
        new_hash_exact = (note_pl.get("hashes") or {}).get(key_current)
        needs_baseline = (old_hash_exact is None)
        hash_changed = (old_hash_exact is not None and new_hash_exact is not None and old_hash_exact != new_hash_exact)
        text_changed = False
        # Optional full-text comparison (expensive) — intentionally disabled:
        # if compare_text:
        #     old_text = (old_payload or {}).get("fulltext") or ""
        #     new_text = note_pl.get("fulltext") or (getattr(parsed, "body", "") or "")
        #     text_changed = (old_text != new_text)
        changed = args.force_replace or (not has_old) or hash_changed or text_changed
        # Field-based change detection: force an upsert when the stored
        # retriever_weight / chunk_profile disagree with the new values.
        if has_old:
            if old_payload.get("retriever_weight") != note_pl.get("retriever_weight"):
                changed = True
            if old_payload.get("chunk_profile") != note_pl.get("chunk_profile"):
                changed = True
        # Chunks
        try:
            body_text = getattr(parsed, "body", "") or ""
            chunks = assemble_chunks(fm["id"], body_text, fm.get("type", "concept"))
            # IMPORTANT: pass a note object with nested frontmatter + registry
            # to the chunk builder so its resolvers see types.yaml.
            chunk_note = {
                "frontmatter": fm,
                "id": fm.get("id"),
                "type": fm.get("type"),
                "title": fm.get("title"),
                "path": note_pl.get("path") or path,
                "note_id": note_pl.get("note_id"),
                "tags": fm.get("tags"),
            }
            chunk_pls: List[Dict[str, Any]] = make_chunk_payloads(
                chunk_note,
                note_pl["path"],
                chunks,
                note_text=body_text,
                types_cfg=(reg.get("types") if isinstance(reg, dict) and isinstance(reg.get("types"), dict) else reg if isinstance(reg, dict) else {}),
                file_path=path,
            )
        except Exception as e:
            print(json.dumps({"path": path, "note_id": note_id, "error": f"chunk build failed: {type(e).__name__}: {e}"}))
            continue
        # Explicit mirroring: retriever_weight & chunk_profile onto EVERY chunk.
        rwf = float(rw) if isinstance(rw, (int, float)) else 1.0
        cpv = fm.get("chunk_profile")
        for i, pl in enumerate(chunk_pls):
            if "index" not in pl:
                pl["index"] = i
            pl["ord"] = int(pl.get("index", i)) + 1
            if "retriever_weight" not in pl:
                pl["retriever_weight"] = rwf
            if cpv is not None:
                pl["chunk_profile"] = cpv
            # Remove legacy aliases.
            for alias in ("chunk_num", "Chunk_Number"):
                pl.pop(alias, None)
        # Embeddings (optional) — zero vectors when embed_texts is unavailable/fails.
        vecs: List[List[float]] = [[0.0] * int(cfg.dim) for _ in chunk_pls]
        if embed_texts and chunk_pls:
            try:
                texts_for_embed = [(pl.get("window") or pl.get("text") or "") for pl in chunk_pls]
                vecs = embed_texts(texts_for_embed)
            except Exception as e:
                print(json.dumps({"path": path, "note_id": note_id, "warn": f"embed_texts failed, using zeros: {e}"}))
        # Are artifacts already present in Qdrant?
        chunks_missing, edges_missing = artifacts_missing(client, cfg.prefix, note_id)
        # Edges
        edges: List[Dict[str, Any]] = []
        edges_failed = False
        should_build_edges = (changed and True) or edges_missing  # when the note changed or edges are missing
        if should_build_edges:
            try:
                note_refs = note_pl.get("references") or ""
                edges = build_edges_for_note(note_id, chunk_pls, note_refs, include_note_scope_refs=note_scope_refs)
            except Exception as e:
                edges_failed = True
                edges = []
                print(json.dumps({"path": path, "note_id": note_id, "warn": f"build_edges_for_note failed, skipping edges: {type(e).__name__}: {e}"}))
        # Per-note summary line (printed in dry-run and apply mode alike).
        print(json.dumps({
            "note_id": note_id,
            "title": fm.get("title"),
            "type": fm.get("type"),
            "rw": rw,
            "chunk_profile": fm.get("chunk_profile"),
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "changed": changed,
            "chunks_missing": chunks_missing,
            "edges_missing": edges_missing,
            "decision": ("apply" if args.apply else "dry-run"),
            "prefix": cfg.prefix,
            "path": note_pl["path"]
        }, ensure_ascii=False))
        # Writes happen only with --apply.
        if not args.apply:
            continue
        # Purge old chunk/edge artifacts before re-upserting a changed note.
        if args.purge_before_upsert and changed:
            try:
                purge_note_artifacts(client, cfg.prefix, note_id)
            except Exception as e:
                print(json.dumps({"path": path, "note_id": note_id, "warn": f"purge failed: {e}"}))
        # Note
        if changed:
            notes_name, note_pts = points_for_note(cfg.prefix, note_pl, None, cfg.dim)
            upsert_batch(client, notes_name, note_pts)
        # Chunks (when changed OR none stored yet).
        if chunk_pls and (changed or chunks_missing):
            chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)
            upsert_batch(client, chunks_name, chunk_pts)
        # Edges (when changed OR none stored yet).
        if edges and (changed or edges_missing):
            edges_name, edge_pts = points_for_edges(cfg.prefix, edges)
            upsert_batch(client, edges_name, edge_pts)
    print(f"Done. Processed notes: {processed}")
if __name__ == "__main__":
main()