#!/usr/bin/env python3
"""
scripts/import_markdown.py (WP-03 STEP 1)

Goal of this patch: edges (references/backlink/references_at) are created directly
at import time, based on the **real chunks** and a **pre-built note index**. This
removes the "dangling" references_at sources left over from the backfill approach.

Run (from the project root, inside the venv):
    (.venv) python3 -m scripts.import_markdown --vault ./vault --apply
    (.venv) python3 -m scripts.validate_edges --prefix mindnet --details

Dependencies: qdrant-client, python-dotenv
"""
from __future__ import annotations
import argparse
import os
import glob
import json
import sys
from typing import List, Dict
from dotenv import load_dotenv
# Core modules
from app.core.parser import (
    read_markdown,
    normalize_frontmatter,
    validate_required_frontmatter,
)
from app.core.note_payload import make_note_payload
from app.core.validate_note import validate_note_payload
from app.core.chunker import assemble_chunks
from app.core.chunk_payload import make_chunk_payloads
from app.core.embed import embed_texts, embed_one
from app.core.qdrant import QdrantConfig, ensure_collections, get_client
from app.core.qdrant_points import (
    points_for_chunks,
    points_for_note,
    points_for_edges,
    upsert_batch,
)
# New: edges are derived directly at import time
from app.core.derive_edges import build_note_index, derive_wikilink_edges

# -----------------------------------------------------------------------------
# Utilities
# -----------------------------------------------------------------------------
def iter_md(root: str, exclude=("/.obsidian/", "/_backup_frontmatter/", "/_imported/")) -> List[str]:
    files = [p for p in glob.glob(os.path.join(root, "**", "*.md"), recursive=True)]
    out: List[str] = []
    for p in files:
        pn = p.replace("\\", "/")
        if any(ex in pn for ex in exclude):
            continue
        out.append(p)
    return out
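
# Illustrative call (hypothetical paths, assuming a vault laid out like ./vault/notes/*.md):
#   iter_md("./vault") -> ["./vault/notes/a.md", "./vault/notes/b.md", ...]
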
def minimal_note_index_payload(abs_path: str, vault_root: str) -> Dict:
    """Only the fields the resolver needs (id/title/path)."""
    parsed = read_markdown(abs_path)
    fm = normalize_frontmatter(parsed.frontmatter)
    validate_required_frontmatter(fm)
    relpath = os.path.relpath(abs_path, vault_root).replace("\\", "/")
    return {
        "note_id": fm.get("id") or fm.get("note_id"),
        "title": fm.get("title"),
        "path": relpath,
    }
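
# Illustrative return value of minimal_note_index_payload (made-up values):
#   {"note_id": "20250904-foo", "title": "Foo Bar", "path": "notes/foo-bar.md"}
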
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def main():
    load_dotenv()

    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Path to the Obsidian vault (e.g. mindnet/vault)")
    ap.add_argument("--apply", action="store_true", help="Write to Qdrant (otherwise dry-run)")
    ap.add_argument("--note-id", help="Process only this note ID")
    ap.add_argument("--embed-note", action="store_true", help="Also embed the full note text (optional)")
    args = ap.parse_args()

    # Qdrant
    cfg = QdrantConfig(
        url=os.getenv("QDRANT_URL", "http://127.0.0.1:6333"),
        api_key=os.getenv("QDRANT_API_KEY") or None,
        prefix=os.getenv("COLLECTION_PREFIX", "mindnet"),
        dim=int(os.getenv("VECTOR_DIM", "384")),
    )
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)
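    # ensure_collections is expected to create (or verify) the prefixed collections
    # used by the upserts below; the exact names come from app.core.qdrant /
    # qdrant_points (presumably something like <prefix>_notes, <prefix>_chunks,
    # <prefix>_edges).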

    root = os.path.abspath(args.vault)
    files = iter_md(root)
    if not files:
        print("No Markdown files found.", file=sys.stderr)
        sys.exit(2)

    # ------------------------------------------------------------------
    # (1) Pre-pass: note index for robust target resolution
    # ------------------------------------------------------------------
    index_payloads: List[Dict] = []
    for path in files:
        try:
            pl = minimal_note_index_payload(path, root)
            if not pl.get("note_id"):
                # without a stable ID we skip the note entirely
                continue
            if args.note_id and pl["note_id"] != args.note_id:
                continue
            index_payloads.append(pl)
        except Exception:
            # deliberately silent: a single broken file must not stop the whole import
            continue

    note_index = build_note_index(index_payloads)  # (by_id, by_slug, by_file_slug)
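    # Per the tuple comment above, the index resolves link targets by id, by title
    # slug and by file-name slug; its concrete container type is defined in
    # app.core.derive_edges.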

    # ------------------------------------------------------------------
    # (2) Main pass: per note, chunks/embeddings + edges from real chunks
    # ------------------------------------------------------------------
    total_notes = 0
    for path in files:
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)
        try:
            validate_required_frontmatter(fm)
        except Exception:
            continue
        if args.note_id and fm.get("id") != args.note_id:
            continue
        total_notes += 1

        # Note payload (complete, for the notes collection)
        note_pl = make_note_payload(parsed, vault_root=root)
        validate_note_payload(note_pl)

        # Chunks from the body according to the chunking strategy
        chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)

        # Embeddings (chunks)
        texts = [ch.text for ch in chunks]
        vectors = embed_texts(texts)

        # Optional: note vector (e.g. for document search/clustering)
        note_vec = None
        if args.embed_note:
            note_vec = embed_one(parsed.body)

        # Edges derived directly from the real chunks + the note index.
        # For derive_wikilink_edges the note payload may carry a full text.
        note_pl_for_edges = dict(note_pl)
        note_pl_for_edges.setdefault("fulltext", parsed.body)
        edges = derive_wikilink_edges(note_pl_for_edges, chunk_pls, note_index)
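        # The derived edges should cover the kinds named in the module docstring
        # (references, backlink, references_at); their payload layout lives in
        # app.core.derive_edges / app.core.qdrant_points.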

        # Dry-run log
        summary = {
            "note_id": fm["id"],
            "title": fm["title"],
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "path": note_pl["path"],
        }
        print(json.dumps(summary, ensure_ascii=False))

        if args.apply:
            # Upsert notes (idempotent; UUIDv5)
            notes_col, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
            upsert_batch(client, notes_col, note_pts)

            # Upsert chunks (idempotent; the chunk_id stabilises qdrant_points if needed)
            chunks_col, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vectors)
            upsert_batch(client, chunks_col, chunk_pts)

            # Upsert edges (idempotent; edge_id is deterministic from kind/src/tgt/seq)
            edges_col, edge_pts = points_for_edges(cfg.prefix, edges)
            upsert_batch(client, edges_col, edge_pts)

    print(f"Done. Processed notes: {total_notes}")

if __name__ == "__main__":
    main()