app/core/chunk_payload.py aktualisiert
Some checks failed
Deploy mindnet to llm-node / deploy (push) Failing after 2s

This commit is contained in:
Lars 2025-09-09 11:19:06 +02:00
parent dcb02dd194
commit 77fa57a2a6

View File

@ -1,318 +1,159 @@
#!/usr/bin/env python3 #!/usr/bin/env python3
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
""" """
Script: import_markdown.py Markdown Qdrant (Notes, Chunks, Edges) Modul: app/core/chunk_payload.py
Version: 3.1.1 Version: 1.1.3
Datum: 2025-09-09 Datum: 2025-09-09
Kurzbeschreibung Kurzbeschreibung
---------------- ----------------
Liest Markdown-Dateien aus einem Vault ein und schreibt Notes, Chunks und Edges Erzeugt Qdrant-Payloads für Text-Chunks einer Note. Jeder Chunk enthält
idempotent nach Qdrant. Change-Detection basiert standardmäßig auf dem **Body-Hash** den tatsächlichen Text unter dem Schlüssel ``text``. Dadurch kann der
(ENV: MINDNET_HASH_MODE). Persistiert zusätzlich den Volltext der Note unter Exporter den vollständigen Body verlässlich aus Chunks rekonstruieren,
``payload.fulltext`` und speichert den Vault-Pfad relativ. falls ``notes.payload.fulltext`` fehlt.
Abwärtskompatibilität Wesentliche Features
--------------------- --------------------
- Keine harte Abhängigkeit mehr von nicht gelieferten Modulen (validate_note, embed, edges). - Stabile, idempotente Payload-Struktur für Chunks
* Embeddings: Nullvektor-Fallback (Dim aus ENV/Config). - Persistenter Chunk-Text (``text``)
* Edges: werden aus Chunk-Payloads + Note-Refs lokal abgeleitet. - Extraktion von Wikilinks pro Chunk (``wikilinks`` & ``references``)
- IDs, Collections, Upsert-Semantik unverändert. - Pfadübernahme (relativ zum Vault, wird vom Aufrufer geliefert)
- Bereinigung leerer Felder (keine ``None``/leere Collections im Payload)
ENV / Qdrant Abhängigkeiten
------------ --------------
- QDRANT_URL (oder QDRANT_HOST/QDRANT_PORT) - ``app.core.chunker.Chunk`` und ``assemble_chunks`` (nur für CLI-Demo)
- QDRANT_API_KEY (optional) - ``app.core.parser.extract_wikilinks`` und ``read_markdown`` (nur CLI-Demo)
- COLLECTION_PREFIX (Default: mindnet)
- VECTOR_DIM (Default: 384)
- MINDNET_HASH_MODE: "body" (Default) | "frontmatter" | "body+frontmatter"
Aufruf Beispiele (CLI Sichtprüfung)
------ ------------------------------
python3 -m scripts.import_markdown --vault ./vault python3 -m app.core.chunk_payload --from-file ./vault/demo.md --print
python3 -m scripts.import_markdown --vault ./vault --apply
python3 -m scripts.import_markdown --vault ./vault --apply --purge-before-upsert
python3 -m scripts.import_markdown --vault ./vault --note-id 20250821-foo --apply
python3 -m scripts.import_markdown --vault ./vault --apply --embed-note
""" """
from __future__ import annotations from __future__ import annotations
from typing import Dict, List, Any
import argparse import argparse
import json import json
import os import os
import sys
from typing import Dict, List, Optional, Tuple
from dotenv import load_dotenv # WICHTIG: Keine direkten Selbst- oder Kreuz-Imports, die Zyklen erzeugen!
from qdrant_client.http import models as rest # Für die Kernfunktion reicht extract_wikilinks als Typ-unabhängige Hilfsfunktion.
try:
from app.core.parser import extract_wikilinks # nur Funktion, erzeugt keine Zyklen
except Exception: # pragma: no cover
from .parser import extract_wikilinks # type: ignore
# Core/Projekt-Module (vorhanden) # Typ-Hinweis für Chunk (zur Laufzeit nicht notwendig)
from app.core.parser import ( try:
read_markdown, from app.core.chunker import Chunk # nur Typ-Annotation / Nutzung im CLI
normalize_frontmatter, except Exception: # pragma: no cover
validate_required_frontmatter, class Chunk: # type: ignore
extract_wikilinks, pass
)
from app.core.note_payload import make_note_payload
# ---------------------------------------------------------------------------
# Utils
# ---------------------------------------------------------------------------
def _drop_empty(d: Dict[str, Any]) -> Dict[str, Any]:
"""Entfernt leere/None-Felder aus einem Dict (für saubere Payloads)."""
return {k: v for k, v in d.items() if v not in (None, [], {}, "")}
# ---------------------------------------------------------------------------
# Kernfunktion
# ---------------------------------------------------------------------------
def make_chunk_payloads(note_meta: Dict[str, Any], path: str, chunks: List[Chunk]) -> List[Dict[str, Any]]:
    """
    Build Qdrant payloads for all ``chunks`` of the given note.

    Parameters
    ----------
    note_meta : Dict[str, Any]
        Minimal note metadata (at least ``id``, ``title``; optionally
        ``type``, ``area``, ``project``, ``tags``, ``lang``).
    path : str
        Note path relative to the vault root (e.g. "area/topic/file.md").
    chunks : List[Chunk]
        Pre-segmented chunks (see app.core.chunker.Chunk).

    Returns
    -------
    List[Dict[str, Any]]
        Payload objects ready for Qdrant upserts (empty fields removed).
    """
    payloads: List[Dict[str, Any]] = []
    for chunk in chunks:
        # 'text' may be None/'' for an empty section — normalise to "".
        body = getattr(chunk, "text", "") or ""
        links = extract_wikilinks(body)
        raw_payload = {
            "id": getattr(chunk, "id", None),
            "note_id": note_meta.get("id"),
            "note_title": note_meta.get("title"),
            "chunk_index": getattr(chunk, "index", None),
            "char_start": getattr(chunk, "char_start", None),
            "char_end": getattr(chunk, "char_end", None),
            "token_count": getattr(chunk, "token_count", None),
            "section_title": getattr(chunk, "section_title", None),
            "section_path": getattr(chunk, "section_path", None),
            "lang": note_meta.get("lang"),
            "wikilinks": links,
            "external_links": [],
            "references": [{"target_id": link, "kind": "wikilink"} for link in links],
            "neighbors": {
                "prev": getattr(chunk, "neighbors_prev", None),
                "next": getattr(chunk, "neighbors_next", None),
            },
            "path": path,  # supplied by the caller, relative to the vault
            "text": body,  # IMPORTANT for export / body reconstruction
        }
        payloads.append(_drop_empty(raw_payload))
    return payloads
# ---------------------------------------------------------------------------
# CLI nur zur Sichtprüfung / Debug
# ---------------------------------------------------------------------------
def _cli() -> None:
    """CLI preview: build the chunk payloads for one markdown file and print them.

    Heavy dependencies are imported lazily here so that importing this module
    stays free of circular imports.
    """
    try:
        from app.core.parser import read_markdown
        from app.core.chunker import assemble_chunks
    except Exception:  # pragma: no cover
        from .parser import read_markdown  # type: ignore
        from .chunker import assemble_chunks  # type: ignore

    ap = argparse.ArgumentParser(description="Vorschau: Chunk-Payloads erzeugen und anzeigen")
    ap.add_argument("--from-file", dest="src", required=True, help="Pfad zu einer Markdown-Datei")
    ap.add_argument("--print", dest="do_print", action="store_true", help="Payload auf stdout ausgeben")
    args = ap.parse_args()

    note = read_markdown(args.src)
    # Minimal metadata extracted from the note's frontmatter.
    note_meta = {
        "id": note.frontmatter.get("id") or note.frontmatter.get("note_id"),
        "title": note.frontmatter.get("title"),
        "type": note.frontmatter.get("type"),
        "area": note.frontmatter.get("area"),
        "project": note.frontmatter.get("project"),
        "tags": note.frontmatter.get("tags", []),
        "lang": note.frontmatter.get("lang"),
    }

    chunks = assemble_chunks(note.frontmatter.get("id"), note.body or "", note.frontmatter.get("type", "concept"))

    # Heuristically derive the vault root from the file path (demo only).
    vault_root = os.path.dirname(os.path.dirname(args.src))
    rel_path = os.path.relpath(args.src, vault_root).replace("\\", "/").lstrip("/")
    payloads = make_chunk_payloads(note_meta, rel_path, chunks)

    if args.do_print:
        print(json.dumps(payloads, ensure_ascii=False, indent=2))
# -----------------------------------------------------------------------------
# Hilfsfunktionen
# -----------------------------------------------------------------------------
def iter_md(root: str) -> List[str]:
    """Collect every .md file below *root*, skipping tooling/backup folders."""
    excluded = ("/.obsidian/", "/_backup_frontmatter/", "/_imported/")
    found: List[str] = []
    for dirpath, _, filenames in os.walk(root):
        for name in filenames:
            if not name.lower().endswith(".md"):
                continue
            full = os.path.join(dirpath, name)
            normalized = full.replace("\\", "/")
            if any(part in normalized for part in excluded):
                continue
            found.append(full)
    return sorted(found)
def collections(prefix: str) -> Tuple[str, str, str]:
    """Return the (notes, chunks, edges) collection names for *prefix*."""
    notes, chunks, edges = (f"{prefix}_{kind}" for kind in ("notes", "chunks", "edges"))
    return notes, chunks, edges
def fetch_existing_note_hash(client, prefix: str, note_id: str) -> Optional[str]:
    """Return the stored ``hash_fulltext`` for *note_id*, or None if absent."""
    notes_col = collections(prefix)[0]
    note_filter = rest.Filter(
        must=[
            rest.FieldCondition(
                key="note_id",
                match=rest.MatchValue(value=note_id),
            )
        ]
    )
    hits, _ = client.scroll(
        collection_name=notes_col,
        scroll_filter=note_filter,
        with_payload=True,
        with_vectors=False,
        limit=1,
    )
    if not hits:
        return None
    payload = hits[0].payload or {}
    return payload.get("hash_fulltext")
def purge_note_artifacts(client, prefix: str, note_id: str) -> None:
    """Delete all chunks and edges belonging to *note_id*.

    The collections themselves are left in place.
    """
    _, chunks_col, edges_col = collections(prefix)

    # Remove the note's chunks.
    chunk_filter = rest.Filter(
        must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]
    )
    client.delete(collection_name=chunks_col, points_selector=chunk_filter, wait=True)

    # Remove edges touching the note, at both chunk and note scope.
    edge_conditions = [
        rest.FieldCondition(key="source_id", match=rest.MatchText(text=f"{note_id}#")),
        rest.FieldCondition(key="target_id", match=rest.MatchText(text=f"{note_id}#")),
        rest.FieldCondition(key="source_id", match=rest.MatchValue(value=note_id)),
        rest.FieldCondition(key="target_id", match=rest.MatchValue(value=note_id)),
    ]
    client.delete(
        collection_name=edges_col,
        points_selector=rest.Filter(should=edge_conditions),
        wait=True,
    )
def has_chunk_level_refs(chunk_payloads: List[Dict]) -> bool:
    """True if at least one chunk payload carries a non-empty 'references' list."""
    for payload in chunk_payloads:
        refs = payload.get("references")
        if isinstance(refs, list) and refs:
            return True
    return False
def derive_edges(note_meta: Dict, chunk_payloads: List[Dict]) -> List[Dict]:
    """
    Derive graph edges from the available payloads (no external dependencies).

    Produces:
    - belongs_to (chunk -> note)
    - prev/next pairs between neighbouring chunks
    - references from chunk.references[*].target_id (scope: chunk)
    - references/backlink pairs from note_meta['references'] (scope: note)
    """
    collected: List[Dict] = []

    def add(src: str, dst: str, kind: str, scope: str) -> None:
        # Small local helper keeps the edge shape in a single place.
        collected.append({"src_id": src, "dst_id": dst, "edge_type": kind, "scope": scope})

    # belongs_to + prev/next (chunk scope)
    for payload in chunk_payloads:
        chunk_id = payload["id"]
        add(chunk_id, note_meta["id"], "belongs_to", "chunk")

        neighbors = payload.get("neighbors") or {}
        before = neighbors.get("prev")
        after = neighbors.get("next")
        if before:
            add(before, chunk_id, "next", "chunk")
            add(chunk_id, before, "prev", "chunk")
        if after:
            add(chunk_id, after, "next", "chunk")
            add(after, chunk_id, "prev", "chunk")

        # references carried by the chunk itself
        for ref in (payload.get("references") or []):
            target = ref.get("target_id")
            if target:
                add(chunk_id, target, "references", "chunk")

    # note-scope references / backlinks
    for target in (note_meta.get("references") or []):
        add(note_meta["id"], target, "references", "note")
        add(target, note_meta["id"], "backlink", "note")

    # De-duplicate, keeping the last occurrence per (src, dst, type, scope).
    deduped = {}
    for edge in collected:
        deduped[(edge["src_id"], edge["dst_id"], edge["edge_type"], edge.get("scope", ""))] = edge
    return list(deduped.values())
def _normalize_rel_path(abs_path: str, vault_root: str) -> str:
"""Gibt einen **relativen** Pfad zurück, normalisiert auf forward slashes."""
try:
rel = os.path.relpath(abs_path, vault_root)
except Exception:
rel = abs_path # Fallback
return rel.replace("\\", "/").lstrip("/")
# -----------------------------------------------------------------------------
# Main
# -----------------------------------------------------------------------------
def main() -> None:
    """Import markdown notes from a vault into Qdrant (notes, chunks, edges).

    Without ``--apply`` this is a dry run that only prints per-note summaries.
    Change detection compares ``hash_fulltext`` against the stored note payload;
    ``--force-replace`` bypasses it.
    """
    load_dotenv()
    ap = argparse.ArgumentParser()
    ap.add_argument("--vault", required=True, help="Pfad zum Obsidian-Vault (Root-Ordner)")
    ap.add_argument("--apply", action="store_true", help="Schreibt in Qdrant; ohne Flag nur Dry-Run")
    ap.add_argument("--purge-before-upsert", action="store_true",
                    help="Vor Upsert Chunks & Edges der GEÄNDERTEN Note löschen")
    ap.add_argument("--note-id", help="Nur eine bestimmte Note-ID verarbeiten")
    ap.add_argument("--embed-note", action="store_true", help="Optional: Note-Volltext einbetten")
    ap.add_argument("--force-replace", action="store_true",
                    help="Änderungserkennung ignorieren und immer upserten (+ optional Purge)")
    args = ap.parse_args()

    # Qdrant setup
    cfg = QdrantConfig.from_env()
    client = get_client(cfg)
    ensure_collections(client, cfg.prefix, cfg.dim)
    notes_col, chunks_col, edges_col = collections(cfg.prefix)

    root = os.path.abspath(args.vault)
    files = iter_md(root)
    if not files:
        print("Keine Markdown-Dateien gefunden.", file=sys.stderr)
        sys.exit(2)

    processed = 0
    for path in files:
        parsed = read_markdown(path)
        fm = normalize_frontmatter(parsed.frontmatter)
        # Required frontmatter (e.g. id, title); on failure skip the note.
        try:
            validate_required_frontmatter(fm)
        except Exception as e:
            print(json.dumps({"path": path, "error": f"Frontmatter invalid: {e}"}))
            continue
        if args.note_id and fm.get("id") != args.note_id:
            continue
        processed += 1

        # Note payload; guarantee fulltext and a vault-relative path.
        note_pl = make_note_payload(parsed, vault_root=root)
        if "fulltext" not in (note_pl or {}):
            note_pl["fulltext"] = parsed.body or ""
        if note_pl.get("path"):
            note_pl["path"] = _normalize_rel_path(
                os.path.join(root, note_pl["path"]) if not os.path.isabs(note_pl["path"]) else note_pl["path"], root
            )
        else:
            note_pl["path"] = _normalize_rel_path(parsed.path, root)
        note_id = note_pl["note_id"]

        # Change detection via stored hash_fulltext.
        new_hash = note_pl.get("hash_fulltext")
        old_hash = None if args.force_replace else fetch_existing_note_hash(client, cfg.prefix, note_id)
        changed = args.force_replace or (old_hash != new_hash)

        # Chunks + embeddings (null-vector fallback when no embedder is available).
        chunks = assemble_chunks(fm["id"], parsed.body, fm.get("type", "concept"))
        chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks)
        if embed_texts:
            vecs = embed_texts([getattr(c, "text", "") for c in chunks])
        else:
            vecs = [[0.0] * cfg.dim for _ in chunks]

        # Optional note vector (null-vector fallback).
        if args.embed_note and embed_one:
            note_vec = embed_one(parsed.body or "")
        elif args.embed_note:
            note_vec = [0.0] * cfg.dim
        else:
            note_vec = None

        # Derive edges locally from payloads.
        edges = derive_edges(fm | {"id": note_id, "references": note_pl.get("references")}, chunk_pls)

        # Per-note summary (one JSON line per note).
        summary = {
            "note_id": note_id,
            "title": fm.get("title"),
            "chunks": len(chunk_pls),
            "edges": len(edges),
            "changed": changed,
            "decision": ("apply" if args.apply and changed else
                         "apply-skip-unchanged" if args.apply and not changed else
                         "dry-run"),
            "path": note_pl["path"],
        }
        print(json.dumps(summary, ensure_ascii=False))

        # Dry run: report only.
        if not args.apply:
            continue

        # Optional purge, ONLY for changed notes.
        if changed and args.purge_before_upsert:
            purge_note_artifacts(client, cfg.prefix, note_id)

        # Upserts: notes / chunks / edges.
        notes_name, note_pts = points_for_note(cfg.prefix, note_pl, note_vec, cfg.dim)
        upsert_batch(client, notes_name, note_pts)
        chunks_name, chunk_pts = points_for_chunks(cfg.prefix, chunk_pls, vecs)
        upsert_batch(client, chunks_name, chunk_pts)
        edges_name, edge_pts = points_for_edges(cfg.prefix, edges)
        upsert_batch(client, edges_name, edge_pts)

    print(f"Done. Processed notes: {processed}")


if __name__ == "__main__":
    main()