mindnet/app/core/ingestion.py
2025-12-23 07:31:33 +01:00

383 lines
16 KiB
Python

"""
FILE: app/core/ingestion.py
DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen (Notes, Chunks, Edges).
FIX: Korrekte Priorisierung von Frontmatter für chunk_profile und retriever_weight.
Lade Chunk-Config basierend auf dem effektiven Profil, nicht nur dem Notiz-Typ.
WP-22: Integration von Content Lifecycle (Status Gate) und Edge Registry Validation.
WP-22: Kontextsensitive Kanten-Validierung mit Fundort-Reporting (Zeilennummern).
WP-22: Multi-Hash Refresh für konsistente Change Detection.
VERSION: 2.9.0 (WP-22 Full Integration: Context-Aware Registry)
STATUS: Active
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.core.derive_edges, app.core.qdrant*, app.services.embeddings_client, app.services.edge_registry
EXTERNAL_CONFIG: config/types.yaml
"""
import os
import logging
import asyncio
import time
from typing import Dict, List, Optional, Tuple, Any
# Core Module Imports
from app.core.parser import (
read_markdown,
normalize_frontmatter,
validate_required_frontmatter,
extract_edges_with_context, # WP-22: Neue Funktion für Zeilennummern
)
from app.core.note_payload import make_note_payload
from app.core.chunker import assemble_chunks, get_chunk_config
from app.core.chunk_payload import make_chunk_payloads
# Fallback for edges: if the edge-derivation module cannot be imported, the
# service keeps working but derives no system edges. The degradation is logged
# so it does not go unnoticed.
try:
    from app.core.derive_edges import build_edges_for_note
except ImportError as _edge_import_error:
    logging.getLogger(__name__).warning(
        "app.core.derive_edges unavailable (%s); system edge derivation is disabled",
        _edge_import_error,
    )

    def build_edges_for_note(*args, **kwargs):
        """No-op stand-in used when ``derive_edges`` cannot be imported.

        Accepts any arguments and always returns an empty edge list.
        """
        return []
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
from app.core.qdrant_points import (
points_for_chunks,
points_for_note,
points_for_edges,
upsert_batch,
)
from app.services.embeddings_client import EmbeddingsClient
from app.services.edge_registry import registry as edge_registry
logger = logging.getLogger(__name__)
# --- Helper ---
def load_type_registry(custom_path: Optional[str] = None) -> dict:
    """Load the type registry (types.yaml) that steers type-specific ingestion.

    Args:
        custom_path: Explicit path to the registry file. When omitted, the
            ``MINDNET_TYPES_FILE`` environment variable is used, falling back
            to ``config/types.yaml``.

    Returns:
        The parsed registry as a dict. An empty dict is returned when the file
        does not exist, cannot be read or parsed, is empty, or does not contain
        a mapping at its root.
    """
    path = custom_path or os.getenv("MINDNET_TYPES_FILE", "config/types.yaml")
    if not os.path.isfile(path):
        return {}

    # Deferred import: PyYAML is only needed once a registry file exists.
    import yaml

    log = logging.getLogger(__name__)
    try:
        with open(path, "r", encoding="utf-8") as f:
            data = yaml.safe_load(f)
    except Exception as exc:
        # Best effort by design: a broken registry must not prevent start-up,
        # but the problem should be visible in the logs.
        log.warning("Could not load type registry from %s: %s", path, exc)
        return {}

    if not data:
        return {}
    if not isinstance(data, dict):
        # Callers use ``reg.get(...)``; a list or scalar root would crash them.
        log.warning(
            "Type registry %s has a non-mapping root (%s); ignoring it",
            path,
            type(data).__name__,
        )
        return {}
    return data
def resolve_note_type(requested: Optional[str], reg: dict) -> str:
    """Determine the final note type, falling back to ``"concept"``.

    Args:
        requested: The type requested by the note's frontmatter (may be
            missing or, with malformed frontmatter, not a string).
        reg: The loaded type registry.

    Returns:
        ``requested`` if it is a string that is declared under the registry's
        ``types`` section, otherwise ``"concept"``.
    """
    # ``types:`` with no entries parses to None in YAML; treat it as empty.
    known_types = reg.get("types") or {}
    # Non-string values (e.g. a YAML list) can be unhashable and would raise
    # on the membership test, so they are rejected up front.
    if isinstance(requested, str) and requested and requested in known_types:
        return requested
    return "concept"
def effective_chunk_profile_name(fm: dict, note_type: str, reg: dict) -> str:
    """Determine the name of the chunk profile to use for a note.

    Priority: 1. frontmatter override -> 2. type config -> 3. global default.
    Both spellings ``chunking_profile`` and ``chunk_profile`` are accepted in
    the frontmatter and in the type config; ``chunking_profile`` wins when
    both are set.

    Args:
        fm: Normalized frontmatter of the note.
        note_type: The resolved note type.
        reg: The loaded type registry.

    Returns:
        The profile name; ``"sliding_standard"`` when nothing is configured.
    """
    # 1. Frontmatter override (only non-empty strings count)
    override = fm.get("chunking_profile") or fm.get("chunk_profile")
    if override and isinstance(override, str):
        return override

    # 2. Type config. Empty YAML sections parse to None, hence the guards.
    types = reg.get("types") or {}
    type_cfg = types.get(note_type) if isinstance(types, dict) else None
    if isinstance(type_cfg, dict):
        type_profile = type_cfg.get("chunking_profile") or type_cfg.get("chunk_profile")
        if type_profile:
            return type_profile

    # 3. Global default
    defaults = reg.get("defaults") or {}
    configured = defaults.get("chunking_profile") if isinstance(defaults, dict) else None
    return configured or "sliding_standard"
def effective_retriever_weight(fm: dict, note_type: str, reg: dict) -> float:
    """Determine the effective ``retriever_weight`` used for scoring.

    Priority: 1. frontmatter override -> 2. type config -> 3. global default.
    A value that is missing or cannot be converted to ``float`` is skipped and
    the next source is tried.

    Args:
        fm: Normalized frontmatter of the note.
        note_type: The resolved note type.
        reg: The loaded type registry.

    Returns:
        The weight as a float; ``1.0`` when no source yields a usable value.
    """
    # Empty YAML sections parse to None, hence the guards.
    types = reg.get("types") or {}
    type_cfg = types.get(note_type) if isinstance(types, dict) else None
    if not isinstance(type_cfg, dict):
        type_cfg = {}
    defaults = reg.get("defaults") or {}
    if not isinstance(defaults, dict):
        defaults = {}

    candidates = (
        fm.get("retriever_weight"),        # 1. frontmatter override
        type_cfg.get("retriever_weight"),  # 2. type config
        defaults.get("retriever_weight"),  # 3. global default
    )
    for candidate in candidates:
        if candidate is None:
            continue
        try:
            return float(candidate)
        except (TypeError, ValueError):
            # Unusable value at this level: fall through to the next source.
            continue
    return 1.0
class IngestionService:
    """Ingest Markdown notes into Qdrant as notes, chunks and edges.

    Construction reads its configuration from the environment, opens the
    Qdrant client, loads the type registry and tries to make sure that the
    collections and payload indexes exist (a failure there is logged, not
    raised).
    """

    # Lifecycle states that are never ingested (WP-22 content lifecycle gate).
    _SKIPPED_STATUSES = frozenset({"system", "template", "archive", "hidden"})

    def __init__(self, collection_prefix: Optional[str] = None):
        """Set up client, registry and embedder.

        Args:
            collection_prefix: Prefix of the Qdrant collections. Falls back to
                the ``COLLECTION_PREFIX`` environment variable, then to
                ``"mindnet"``.
        """
        env_prefix = os.getenv("COLLECTION_PREFIX", "mindnet")
        self.prefix = collection_prefix or env_prefix
        self.cfg = QdrantConfig.from_env()
        self.cfg.prefix = self.prefix
        self.client = get_client(self.cfg)
        self.dim = self.cfg.dim
        self.registry = load_type_registry()
        self.embedder = EmbeddingsClient()
        # Change detection mode; forms the first component of the hash key
        # that is compared in process_file.
        self.active_hash_mode = os.getenv("MINDNET_CHANGE_DETECTION_MODE", "full")
        try:
            ensure_collections(self.client, self.prefix, self.dim)
            ensure_payload_indexes(self.client, self.prefix)
        except Exception as e:
            logger.warning(f"DB init warning: {e}")

    def _get_chunk_config_by_profile(self, profile_name: str, note_type: str) -> Dict[str, Any]:
        """Return the chunker parameters for a specific profile.

        Looks the profile up under ``chunking_profiles`` in the registry and
        returns a shallow copy of it, with a list-valued ``overlap`` converted
        to a tuple. If the profile is not defined as a mapping there, the
        type-based ``get_chunk_config(note_type)`` is used instead.
        """
        # An empty ``chunking_profiles:`` section parses to None.
        profiles = self.registry.get("chunking_profiles") or {}
        profile = profiles.get(profile_name) if isinstance(profiles, dict) else None
        if isinstance(profile, dict):
            cfg = profile.copy()
            if isinstance(cfg.get("overlap"), list):
                cfg["overlap"] = tuple(cfg["overlap"])
            return cfg
        return get_chunk_config(note_type)

    @staticmethod
    def _stored_hash(old_payload: Optional[dict], check_key: str, hash_mode: str) -> Optional[str]:
        """Extract the previously stored hash for ``check_key`` from a payload.

        ``hashes`` is normally a dict keyed by ``mode:source:normalize``. A
        plain string is also accepted, but only when the active mode is
        ``"body"``. Anything else yields None.
        """
        stored = (old_payload or {}).get("hashes")
        if isinstance(stored, dict):
            return stored.get(check_key)
        if isinstance(stored, str) and hash_mode == "body":
            return stored
        return None

    async def process_file(
        self,
        file_path: str,
        vault_root: str,
        force_replace: bool = False,
        apply: bool = False,
        purge_before: bool = False,
        note_scope_refs: bool = False,
        hash_source: str = "parsed",
        hash_normalize: str = "canonical"
    ) -> Dict[str, Any]:
        """Process one Markdown file and write it into the graph.

        Steps: parse and validate frontmatter, lifecycle gate, type/config
        resolution, note payload, change detection, chunking + embedding +
        edge generation, upsert into Qdrant.

        Args:
            file_path: Path of the Markdown file.
            vault_root: Root directory of the vault (passed to the payload builder).
            force_replace: Skip the lookup of the stored note and always write.
            apply: When False, report what would change without writing.
            purge_before: Delete existing chunks/edges of the note before writing.
            note_scope_refs: Passed to edge derivation as ``include_note_scope_refs``.
            hash_source: Second component of the hash key used for change detection.
            hash_normalize: Third component of the hash key used for change detection.

        Returns:
            A result dict with at least ``path`` and ``status``. ``status`` is
            one of ``"skipped"``, ``"unchanged"``, ``"dry-run"`` or
            ``"success"``; failures are reported via the ``error`` key and are
            not raised.
        """
        result = {"path": file_path, "status": "skipped", "changed": False, "error": None}

        # 1. Parse & frontmatter validation
        try:
            parsed = read_markdown(file_path)
            if not parsed:
                return {**result, "error": "Empty or unreadable file"}
            fm = normalize_frontmatter(parsed.frontmatter)
            validate_required_frontmatter(fm)
        except Exception as e:
            logger.error(f"Validation failed for {file_path}: {e}")
            return {**result, "error": f"Validation failed: {str(e)}"}

        # --- WP-22: content lifecycle gate ---
        # An empty ``status:`` parses to None and a number is not a string;
        # both previously crashed on ``.lower()`` outside of any try block.
        raw_status = fm.get("status", "draft")
        status = str("draft" if raw_status is None else raw_status).lower().strip()
        # Hard skip for system or archive files
        if status in self._SKIPPED_STATUSES:
            logger.info(f"Skipping file {file_path} (Status: {status})")
            return {**result, "status": "skipped", "reason": f"lifecycle_status_{status}"}

        # 2. Type & config resolution
        note_type = resolve_note_type(fm.get("type"), self.registry)
        fm["type"] = note_type
        effective_profile = effective_chunk_profile_name(fm, note_type, self.registry)
        effective_weight = effective_retriever_weight(fm, note_type, self.registry)
        fm["chunk_profile"] = effective_profile
        fm["retriever_weight"] = effective_weight

        # 3. Build note payload (including the multi-hash for WP-22)
        try:
            note_pl = make_note_payload(
                parsed,
                vault_root=vault_root,
                hash_normalize=hash_normalize,
                hash_source=hash_source,
                file_path=file_path
            )
            # Text body fallback
            if not note_pl.get("fulltext"):
                note_pl["fulltext"] = getattr(parsed, "body", "") or ""
            # Make sure the effective values end up in the payload
            note_pl["retriever_weight"] = effective_weight
            note_pl["chunk_profile"] = effective_profile
            # WP-22: store the status
            note_pl["status"] = status
            note_id = note_pl["note_id"]
        except Exception as e:
            logger.error(f"Payload build failed: {e}")
            return {**result, "error": f"Payload build failed: {str(e)}"}

        # 4. Change detection
        old_payload = None
        if not force_replace:
            old_payload = self._fetch_note_payload(note_id)
        has_old = old_payload is not None
        # Compare against the currently configured hash mode
        check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
        old_hash = self._stored_hash(old_payload, check_key, self.active_hash_mode)
        new_hashes = note_pl.get("hashes")
        new_hash = new_hashes.get(check_key) if isinstance(new_hashes, dict) else None
        hash_changed = (old_hash != new_hash)
        chunks_missing, edges_missing = self._artifacts_missing(note_id)
        should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing
        if not should_write:
            return {**result, "status": "unchanged", "note_id": note_id}
        if not apply:
            return {**result, "status": "dry-run", "changed": True, "note_id": note_id}

        # 5. Processing (chunking, embedding, edge generation)
        try:
            body_text = getattr(parsed, "body", "") or ""
            # WP-22: make sure the edge registry is up to date (lazy reload)
            edge_registry.ensure_latest()
            # Load the configuration for the specific profile
            chunk_config = self._get_chunk_config_by_profile(effective_profile, note_type)
            chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config)
            # Enrich chunks with metadata
            chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)

            vecs = []
            if chunk_pls:
                texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
                try:
                    if hasattr(self.embedder, 'embed_documents'):
                        vecs = await self.embedder.embed_documents(texts)
                    else:
                        # Embedder without batch API: embed one text at a time.
                        for t in texts:
                            v = await self.embedder.embed_query(t)
                            vecs.append(v)
                except Exception as e:
                    logger.error(f"Embedding failed: {e}")
                    raise RuntimeError(f"Embedding failed: {e}") from e

            # --- WP-22: edge extraction & validation ---
            # A. Extract explicit user edges together with their line numbers
            explicit_edges = extract_edges_with_context(parsed)
            # B. Generate system edges
            try:
                raw_system_edges = build_edges_for_note(
                    note_id,
                    chunk_pls,
                    note_level_references=note_pl.get("references", []),
                    include_note_scope_refs=note_scope_refs
                )
            except TypeError:
                # Implementation that does not accept the keyword arguments.
                raw_system_edges = build_edges_for_note(note_id, chunk_pls)

            # C. Validate all edges and map them through the registry
            edges = []
            context = {"file": file_path, "note_id": note_id}
            # User edges first (provenance="explicit").
            # NOTE(review): these are appended even if resolve() returns a
            # falsy kind, unlike system edges below - confirm this is intended.
            for edge in explicit_edges:
                valid_kind = edge_registry.resolve(
                    edge_type=edge["kind"],
                    provenance="explicit",
                    context={**context, "line": edge.get("line")}
                )
                edge["kind"] = valid_kind
                edges.append(edge)
            # Then system edges (provenance="structure")
            for edge in raw_system_edges:
                valid_kind = edge_registry.resolve(
                    edge_type=edge.get("kind", "belongs_to"),
                    provenance="structure",
                    context={**context, "line": "system"}
                )
                edge["kind"] = valid_kind
                # Only keep the edge if the registry returned a valid type
                if valid_kind:
                    edges.append(edge)
        except Exception as e:
            logger.error(f"Processing failed: {e}", exc_info=True)
            return {**result, "error": f"Processing failed: {str(e)}"}

        # 6. Upsert into Qdrant
        try:
            # Delete old fragments to avoid "ghost chunks". With force_replace
            # the stored note is never looked up, so has_old is always False;
            # the purge must still run in that case (this is the path taken by
            # create_from_text). Deleting by filter is harmless for new notes.
            if purge_before and (has_old or force_replace):
                self._purge_artifacts(note_id)
            # Note metadata
            n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
            upsert_batch(self.client, n_name, n_pts)
            # Chunks (vectors)
            if chunk_pls and vecs:
                c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)
                upsert_batch(self.client, c_name, c_pts)
            # Edges
            if edges:
                e_name, e_pts = points_for_edges(self.prefix, edges)
                upsert_batch(self.client, e_name, e_pts)
            return {
                "path": file_path,
                "status": "success",
                "changed": True,
                "note_id": note_id,
                "chunks_count": len(chunk_pls),
                "edges_count": len(edges)
            }
        except Exception as e:
            logger.error(f"Upsert failed: {e}", exc_info=True)
            return {**result, "error": f"DB Upsert failed: {e}"}

    def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
        """Fetch the current payload of a note from Qdrant.

        Returns None when the note is not found or the lookup fails.
        """
        from qdrant_client.http import models as rest
        col = f"{self.prefix}_notes"
        try:
            f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
            pts, _ = self.client.scroll(collection_name=col, scroll_filter=f, limit=1, with_payload=True)
            return pts[0].payload if pts else None
        except Exception as e:
            # Best effort: a failed lookup is treated like "note not stored".
            logger.warning(f"Could not fetch note payload for {note_id}: {e}")
            return None

    def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]:
        """Check whether chunks or edges are missing for a note (integrity check).

        Returns:
            ``(chunks_missing, edges_missing)``. Both are True when the check
            itself fails.
        """
        from qdrant_client.http import models as rest
        c_col = f"{self.prefix}_chunks"
        e_col = f"{self.prefix}_edges"
        try:
            f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
            c_pts, _ = self.client.scroll(collection_name=c_col, scroll_filter=f, limit=1)
            e_pts, _ = self.client.scroll(collection_name=e_col, scroll_filter=f, limit=1)
            return (not bool(c_pts)), (not bool(e_pts))
        except Exception as e:
            # Best effort: if the state is unknown, assume a rewrite is needed.
            logger.warning(f"Artifact check failed for {note_id}: {e}")
            return True, True

    def _purge_artifacts(self, note_id: str):
        """Delete all chunks and edges of a note (before rewriting them).

        Failures are logged per collection and do not abort the purge.
        """
        from qdrant_client.http import models as rest
        f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
        selector = rest.FilterSelector(filter=f)
        for suffix in ["chunks", "edges"]:
            try:
                self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector)
            except Exception as e:
                logger.warning(f"Purge of {suffix} failed for {note_id}: {e}")

    async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
        """Create a note from a text stream (editor save) and ingest it.

        Writes ``markdown_content`` to ``<vault_root>/<folder>/<filename>``
        (creating the folder if needed) and then runs ``process_file`` with
        ``apply``, ``force_replace`` and ``purge_before`` enabled.

        Returns:
            The ``process_file`` result, or ``{"status": "error", ...}`` if
            writing the file failed.
        """
        # NOTE(review): filename is joined into the path unchecked - confirm
        # callers never pass absolute paths or ".." segments.
        target_dir = os.path.join(vault_root, folder)
        os.makedirs(target_dir, exist_ok=True)
        file_path = os.path.join(target_dir, filename)
        try:
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(markdown_content)
                f.flush()
                os.fsync(f.fileno())
            await asyncio.sleep(0.1)
            logger.info(f"Written file to {file_path}")
        except Exception as e:
            return {"status": "error", "error": f"Disk write failed: {str(e)}"}
        return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)