379 lines
17 KiB
Python
379 lines
17 KiB
Python
"""
|
||
FILE: app/core/ingestion.py
|
||
DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen (Notes, Chunks, Edges).
|
||
WP-20: Integration von Smart Edge Allocation via Hybrid LLM (Gemini/Gemma/OpenRouter).
|
||
WP-22: Integration von Content Lifecycle (Status Gate) und Edge Registry Validation.
|
||
WP-22: Kontextsensitive Kanten-Validierung mit Fundort-Reporting (Zeilennummern).
|
||
WP-22: Multi-Hash Refresh für konsistente Change Detection.
|
||
FIX: Robuste Verarbeitung von LLM-Antworten (Dict vs String) zur Vermeidung von Item-Assignment-Errors.
|
||
VERSION: 2.11.5
|
||
STATUS: Active
|
||
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry
|
||
EXTERNAL_CONFIG: config/types.yaml, config/prompts.yaml
|
||
"""
|
||
import os
|
||
import json
|
||
import logging
|
||
import asyncio
|
||
import time
|
||
from typing import Dict, List, Optional, Tuple, Any
|
||
|
||
# Core Module Imports
|
||
from app.core.parser import (
|
||
read_markdown,
|
||
normalize_frontmatter,
|
||
validate_required_frontmatter,
|
||
extract_edges_with_context,
|
||
)
|
||
from app.core.note_payload import make_note_payload
|
||
from app.core.chunker import assemble_chunks, get_chunk_config
|
||
from app.core.chunk_payload import make_chunk_payloads
|
||
|
||
# Fallback for edges: when the derive_edges module cannot be imported,
# a stub with the same name is defined that always returns an empty list.
try:
    from app.core.derive_edges import build_edges_for_note
except ImportError:
    def build_edges_for_note(*_args, **_kwargs):
        """Stand-in for the missing edge builder; accepts anything, returns no edges."""
        return []
|
||
|
||
from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes
|
||
from app.core.qdrant_points import (
|
||
points_for_chunks,
|
||
points_for_note,
|
||
points_for_edges,
|
||
upsert_batch,
|
||
)
|
||
|
||
from app.services.embeddings_client import EmbeddingsClient
|
||
from app.services.edge_registry import registry as edge_registry
|
||
from app.services.llm_service import LLMService
|
||
|
||
# Module-level logger, named after this module via __name__.
logger = logging.getLogger(__name__)
|
||
|
||
# --- Helper ---
|
||
def load_type_registry(custom_path: Optional[str] = None) -> dict:
    """Load the ``types.yaml`` registry that drives type-specific ingestion.

    Args:
        custom_path: Explicit path to the registry file. When omitted, the
            path is read from ``settings.MINDNET_TYPES_FILE``.

    Returns:
        The parsed YAML mapping. An empty dict is returned when the file is
        missing, cannot be read or parsed, or when its root is not a mapping
        (callers access the result with ``.get``).
    """
    path = custom_path
    if not path:
        # Settings are only needed when no explicit path was supplied.
        from app.config import get_settings
        path = get_settings().MINDNET_TYPES_FILE

    if not path or not os.path.exists(path):
        return {}

    import yaml

    try:
        with open(path, "r", encoding="utf-8") as fh:
            data = yaml.safe_load(fh) or {}
    except Exception as exc:
        # Best effort: a broken registry must not stop ingestion, but the
        # failure should be visible instead of being swallowed silently.
        logger.warning("Could not load type registry from %s: %s", path, exc)
        return {}

    if not isinstance(data, dict):
        logger.warning(
            "Type registry %s has a non-mapping root (%s); ignoring it.",
            path,
            type(data).__name__,
        )
        return {}
    return data
|
||
|
||
def resolve_note_type(requested: Optional[str], reg: dict) -> str:
    """Determine the final note type, falling back to ``"concept"``.

    Args:
        requested: The type requested by the note (e.g. from frontmatter).
        reg: The type registry; its ``"types"`` entry lists the known types.

    Returns:
        ``requested`` when it is truthy and present in ``reg["types"]``,
        otherwise ``"concept"``.
    """
    # An empty ``types:`` section in YAML parses to None, not to {}.
    known_types = reg.get("types") or {}
    try:
        if requested and requested in known_types:
            return requested
    except TypeError:
        # Unhashable request (e.g. a YAML list) or a non-container section:
        # treat it as an unknown type instead of crashing.
        pass
    return "concept"
|
||
|
||
def effective_chunk_profile_name(fm: dict, note_type: str, reg: dict) -> str:
    """Determine the name of the chunking profile to use for a note.

    Precedence:
        1. Frontmatter override (``chunking_profile`` or ``chunk_profile``),
           accepted only when it is a non-empty string.
        2. The same keys in the registry entry of ``note_type``.
        3. ``defaults.chunking_profile`` from the registry.
        4. The literal ``"sliding_standard"``.

    Args:
        fm: Frontmatter of the note.
        note_type: Resolved note type used to look up the registry entry.
        reg: The type registry.

    Returns:
        The profile name chosen according to the precedence above.
    """
    override = fm.get("chunking_profile") or fm.get("chunk_profile")
    if override and isinstance(override, str):
        return override

    # Empty YAML sections parse to None, so normalise them to {} before use.
    type_cfg = (reg.get("types") or {}).get(note_type) or {}
    type_profile = type_cfg.get("chunking_profile") or type_cfg.get("chunk_profile")
    if type_profile:
        return type_profile

    defaults = reg.get("defaults") or {}
    return defaults.get("chunking_profile") or "sliding_standard"
|
||
|
||
def effective_retriever_weight(fm: dict, note_type: str, reg: dict) -> float:
    """Determine the effective ``retriever_weight`` used for scoring.

    Candidates are tried in this order and the first one that converts to a
    float wins: the frontmatter value, the registry entry of ``note_type``,
    the registry ``defaults``. Missing (None) or non-numeric candidates are
    skipped at every level.

    Args:
        fm: Frontmatter of the note.
        note_type: Resolved note type used to look up the registry entry.
        reg: The type registry.

    Returns:
        The first usable weight as a float, or ``1.0`` when none is usable.
    """
    # Empty YAML sections parse to None, so normalise them to {} before use.
    type_cfg = (reg.get("types") or {}).get(note_type) or {}
    defaults = reg.get("defaults") or {}

    candidates = (
        fm.get("retriever_weight"),
        type_cfg.get("retriever_weight"),
        defaults.get("retriever_weight"),
    )
    for candidate in candidates:
        if candidate is None:
            continue
        try:
            return float(candidate)
        except (TypeError, ValueError):
            # Not a number: fall through to the next, less specific source.
            continue
    return 1.0
|
||
|
||
|
||
class IngestionService:
    """Ingest Markdown notes into the graph store (notes, chunks, edges).

    The service parses a Markdown file, resolves its type-specific
    configuration, detects changes against the stored note payload and, when
    requested, writes the note, its embedded chunks and its edges via
    ``upsert_batch``.
    """

    # WP-22 lifecycle gate: notes with one of these statuses are never ingested.
    _SKIPPED_STATUSES = frozenset({"system", "template", "archive", "hidden"})

    # Keys under which the LLM sometimes wraps the edge list in a JSON object.
    _EDGE_LIST_KEYS = ("edges", "links", "results", "kanten")

    def __init__(self, collection_prefix: Optional[str] = None):
        """Set up settings, database client, registry, embedder and LLM service.

        Args:
            collection_prefix: Prefix of the collections to use; defaults to
                ``settings.COLLECTION_PREFIX``.
        """
        from app.config import get_settings
        self.settings = get_settings()

        self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX
        self.cfg = QdrantConfig.from_env()
        self.cfg.prefix = self.prefix
        self.client = get_client(self.cfg)
        self.dim = self.settings.VECTOR_SIZE
        self.registry = load_type_registry()
        self.embedder = EmbeddingsClient()
        self.llm = LLMService()

        self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE

        try:
            ensure_collections(self.client, self.prefix, self.dim)
            ensure_payload_indexes(self.client, self.prefix)
        except Exception as e:
            # Deliberately non-fatal: the service stays constructible even
            # when the collection/index setup fails.
            logger.warning(f"DB init warning: {e}")

    def _get_chunk_config_by_profile(self, profile_name: str, note_type: str) -> Dict[str, Any]:
        """Return the chunker parameters for a specific profile.

        Args:
            profile_name: Name looked up in the registry's ``chunking_profiles``.
            note_type: Passed to ``get_chunk_config`` when the profile is not
                defined (or not a mapping) in the registry.

        Returns:
            A shallow copy of the profile with a list-valued ``overlap``
            converted to a tuple, or the result of ``get_chunk_config``.
        """
        profiles = self.registry.get("chunking_profiles") or {}
        profile = profiles.get(profile_name)
        if not isinstance(profile, dict):
            return get_chunk_config(note_type)

        # Copy so the tuple conversion never touches the shared registry.
        cfg = dict(profile)
        if isinstance(cfg.get("overlap"), list):
            cfg["overlap"] = tuple(cfg["overlap"])
        return cfg

    @staticmethod
    def _normalize_ai_edges(raw_data: Any, provider: str, note_id: str) -> List[Dict]:
        """Convert a decoded LLM answer into a list of edge dicts.

        Accepted shapes: a list, or a dict wrapping a list under one of
        ``_EDGE_LIST_KEYS``. List items may be dicts containing ``"to"`` or
        strings of the form ``"kind:target"``; anything else is skipped.

        Args:
            raw_data: The decoded JSON value returned by the LLM.
            provider: Provider name, recorded in each edge's ``line`` field.
            note_id: Note identifier, used for log messages only.

        Returns:
            Edge dicts tagged with ``provenance="semantic_ai"`` and
            ``line="ai-<provider>"``; an empty list for unusable input.
        """
        # The LLM sometimes returns a dict with a single key instead of a list.
        if isinstance(raw_data, dict):
            logger.debug(f"ℹ️ [Ingestion] LLM returned dict for {note_id}, attempting recovery.")
            for key in IngestionService._EDGE_LIST_KEYS:
                candidate = raw_data.get(key)
                if isinstance(candidate, list):
                    raw_data = candidate
                    break

        if not isinstance(raw_data, list):
            logger.warning(f"⚠️ [Ingestion] LLM output for {note_id} is not a list: {type(raw_data)}")
            return []

        origin = f"ai-{provider}"
        edges: List[Dict] = []
        for item in raw_data:
            # Case 1: the element is already a dict (ideal case).
            if isinstance(item, dict) and "to" in item:
                edges.append({**item, "provenance": "semantic_ai", "line": origin})
                continue

            # Case 2: the element is a string such as "kind:target".
            if isinstance(item, str) and ":" in item:
                kind, _, target = item.partition(":")
                kind, target = kind.strip(), target.strip()
                # An edge without a kind or a target cannot be stored meaningfully.
                if kind and target:
                    edges.append({
                        "to": target,
                        "kind": kind,
                        "provenance": "semantic_ai",
                        "line": origin,
                    })
                    continue

            logger.debug(f"⏩ [Ingestion] Skipping unparseable AI edge: {item}")
        return edges

    async def _perform_smart_edge_allocation(self, text: str, note_id: str) -> List[Dict]:
        """WP-20: Extract semantic edges through the hybrid LLM service.

        Quota protection: OpenRouter is preferred whenever
        ``settings.OPENROUTER_API_KEY`` is set; otherwise the provider from
        ``settings.MINDNET_LLM_PROVIDER`` is used. The model is always
        ``settings.GEMMA_MODEL``.

        This step is best effort: any failure (registry refresh, prompt
        lookup, LLM call, JSON decoding) is logged and yields no AI edges.

        Args:
            text: Note body; only the first 6000 characters go into the prompt.
            note_id: Identifier of the note, used in the prompt and in logs.

        Returns:
            The normalised AI edges, or an empty list on failure.
        """
        provider = "openrouter" if self.settings.OPENROUTER_API_KEY else self.settings.MINDNET_LLM_PROVIDER
        model = self.settings.GEMMA_MODEL

        logger.info(f"🚀 [Ingestion] Turbo-Mode: Extracting edges for '{note_id}' using {model} on {provider}")

        try:
            # WP-22: fetch the valid edge types for the prompt template.
            # Guarded the same way as in process_file.
            if hasattr(edge_registry, "ensure_latest"):
                edge_registry.ensure_latest()
            valid_types_str = ", ".join(sorted(edge_registry.valid_types))

            template = self.llm.get_prompt("edge_extraction", provider)

            # Fill the template (v2.5.0 expects valid_types).
            prompt = template.format(
                text=text[:6000],
                note_id=note_id,
                valid_types=valid_types_str,
            )

            response = await self.llm.generate_raw_response(
                prompt=prompt,
                priority="background",
                force_json=True,
                provider=provider,
                model_override=model,
            )

            # Robust parsing: decode text answers, accept already-decoded ones.
            if isinstance(response, (str, bytes, bytearray)):
                raw_data = json.loads(response)
            else:
                raw_data = response

            return self._normalize_ai_edges(raw_data, provider, note_id)

        except Exception as e:
            logger.warning(f"⚠️ [Ingestion] Smart Edge Allocation failed for {note_id} on {provider}: {e}")
            return []

    async def process_file(
        self,
        file_path: str,
        vault_root: str,
        force_replace: bool = False,
        apply: bool = False,
        purge_before: bool = False,
        note_scope_refs: bool = False,
        hash_source: str = "parsed",
        hash_normalize: str = "canonical"
    ) -> Dict[str, Any]:
        """Process one Markdown file and write it into the graph.

        Args:
            file_path: Path of the Markdown file.
            vault_root: Root directory of the vault, passed to the payload builder.
            force_replace: Write even when no change is detected; the stored
                payload is not fetched in this case.
            apply: When False, a pending write is only reported (``dry-run``).
            purge_before: Delete the note's stored chunks and edges before
                writing. Applies when an old payload was found or when
                ``force_replace`` is set.
            note_scope_refs: Forwarded to ``build_edges_for_note`` as
                ``include_note_scope_refs``.
            hash_source: Hash source component of the change-detection key.
            hash_normalize: Normalisation component of the change-detection key.

        Returns:
            A result dict with at least ``path``, ``status`` and ``changed``.
            ``status`` is one of ``skipped``, ``unchanged``, ``dry-run`` or
            ``success``; failures keep ``status="skipped"`` and set ``error``.
        """
        result = {"path": file_path, "status": "skipped", "changed": False, "error": None}

        # 1. Parse & frontmatter validation
        try:
            parsed = read_markdown(file_path)
            if not parsed:
                return {**result, "error": "Empty or unreadable file"}
            fm = normalize_frontmatter(parsed.frontmatter)
            validate_required_frontmatter(fm)
        except Exception as e:
            logger.error(f"Validation failed for {file_path}: {e}")
            return {**result, "error": f"Validation failed: {str(e)}"}

        # --- WP-22: Content lifecycle gate ---
        # A null or non-string status must not crash the gate: missing or
        # empty values count as "draft", everything else is stringified.
        status = str(fm.get("status") or "draft").lower().strip()
        if status in self._SKIPPED_STATUSES:
            return {**result, "status": "skipped", "reason": f"lifecycle_status_{status}"}

        # 2. Type & config resolution
        note_type = resolve_note_type(fm.get("type"), self.registry)
        fm["type"] = note_type
        effective_profile = effective_chunk_profile_name(fm, note_type, self.registry)
        effective_weight = effective_retriever_weight(fm, note_type, self.registry)

        fm["chunk_profile"] = effective_profile
        fm["retriever_weight"] = effective_weight

        # 3. Build note payload
        try:
            note_pl = make_note_payload(
                parsed,
                vault_root=vault_root,
                hash_normalize=hash_normalize,
                hash_source=hash_source,
                file_path=file_path,
            )
            if not note_pl.get("fulltext"):
                note_pl["fulltext"] = getattr(parsed, "body", "") or ""
            note_pl["retriever_weight"] = effective_weight
            note_pl["chunk_profile"] = effective_profile
            note_pl["status"] = status
            note_id = note_pl["note_id"]
        except Exception as e:
            return {**result, "error": f"Payload build failed: {str(e)}"}

        # 4. Change detection (WP-22 multi-hash)
        old_payload = None if force_replace else self._fetch_note_payload(note_id)
        has_old = old_payload is not None
        check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"

        old_hashes = (old_payload or {}).get("hashes", {})
        old_hash = old_hashes.get(check_key) if isinstance(old_hashes, dict) else None
        new_hash = (note_pl.get("hashes") or {}).get(check_key)
        hash_changed = old_hash != new_hash

        should_write = force_replace or (not has_old) or hash_changed
        if not should_write:
            # Only ask the database about artifacts when nothing else
            # already forces a write.
            chunks_missing, edges_missing = self._artifacts_missing(note_id)
            should_write = chunks_missing or edges_missing

        if not should_write:
            return {**result, "status": "unchanged", "note_id": note_id}

        if not apply:
            return {**result, "status": "dry-run", "changed": True, "note_id": note_id}

        # 5. Processing (chunking, embedding, edge generation)
        try:
            body_text = getattr(parsed, "body", "") or ""

            # WP-22 stability patch: only refresh when ensure_latest exists.
            if hasattr(edge_registry, "ensure_latest"):
                edge_registry.ensure_latest()

            chunk_config = self._get_chunk_config_by_profile(effective_profile, note_type)
            chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config)
            chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)

            vecs = []
            if chunk_pls:
                texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
                vecs = await self.embedder.embed_documents(texts)

            # --- WP-22/WP-20: Edge extraction & validation ---
            edges: List[Dict] = []
            context = {"file": file_path, "note_id": note_id}

            # A. Explicit user edges
            for edge in extract_edges_with_context(parsed):
                edge["kind"] = edge_registry.resolve(
                    edge_type=edge["kind"],
                    provenance="explicit",
                    context={**context, "line": edge.get("line")},
                )
                edges.append(edge)

            # B. WP-20: Smart AI edges, validated against the edge registry
            for edge in await self._perform_smart_edge_allocation(body_text, note_id):
                edge["kind"] = edge_registry.resolve(
                    edge_type=edge.get("kind"),
                    provenance="semantic_ai",
                    context={**context, "line": edge.get("line")},
                )
                edges.append(edge)

            # C. System edges (structure)
            try:
                raw_system_edges = build_edges_for_note(
                    note_id,
                    chunk_pls,
                    note_level_references=note_pl.get("references", []),
                    include_note_scope_refs=note_scope_refs,
                )
            except TypeError:
                # Fall back to the two-argument call when the keyword call fails.
                raw_system_edges = build_edges_for_note(note_id, chunk_pls)

            for edge in raw_system_edges:
                valid_kind = edge_registry.resolve(
                    edge_type=edge.get("kind", "belongs_to"),
                    provenance="structure",
                    context={**context, "line": "system"},
                )
                # NOTE(review): system edges without a resolved kind are
                # dropped here — confirm this matches the intended gate.
                if valid_kind:
                    edge["kind"] = valid_kind
                    edges.append(edge)

        except Exception as e:
            logger.error(f"Processing failed for {file_path}: {e}", exc_info=True)
            return {**result, "error": f"Processing failed: {str(e)}"}

        # 6. Upsert
        try:
            # With force_replace the old payload is never fetched, so has_old
            # is always False; an explicit purge request must still be
            # honoured, otherwise stale chunks and edges survive the rewrite.
            if purge_before and (has_old or force_replace):
                self._purge_artifacts(note_id)

            n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim)
            upsert_batch(self.client, n_name, n_pts)

            if chunk_pls and vecs:
                c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)
                upsert_batch(self.client, c_name, c_pts)

            if edges:
                e_name, e_pts = points_for_edges(self.prefix, edges)
                upsert_batch(self.client, e_name, e_pts)

            return {
                "path": file_path,
                "status": "success",
                "changed": True,
                "note_id": note_id,
                "chunks_count": len(chunk_pls),
                "edges_count": len(edges),
            }
        except Exception as e:
            logger.error(f"Upsert failed for {note_id}: {e}", exc_info=True)
            return {**result, "error": f"DB Upsert failed: {e}"}

    def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
        """Return the stored payload of a note, or None.

        None is returned both when no point matches ``note_id`` and when the
        lookup fails; the caller then treats the note as new.
        """
        from qdrant_client.http import models as rest
        col = f"{self.prefix}_notes"
        try:
            f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
            pts, _ = self.client.scroll(collection_name=col, scroll_filter=f, limit=1, with_payload=True)
            return pts[0].payload if pts else None
        except Exception as exc:
            logger.debug(f"Note payload lookup failed for {note_id}: {exc}")
            return None

    def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]:
        """Report whether chunks and/or edges of a note are absent.

        Returns:
            ``(chunks_missing, edges_missing)``. When the lookup itself
            fails, both are reported as missing so the note gets rewritten.
        """
        from qdrant_client.http import models as rest
        try:
            f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
            c_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_chunks", scroll_filter=f, limit=1)
            e_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_edges", scroll_filter=f, limit=1)
            return (not c_pts), (not e_pts)
        except Exception as exc:
            logger.debug(f"Artifact lookup failed for {note_id}: {exc}")
            return True, True

    def _purge_artifacts(self, note_id: str):
        """Delete all chunks and edges that carry the given ``note_id``.

        Best effort: a failure in one collection is logged and does not
        prevent the attempt on the other one.
        """
        from qdrant_client.http import models as rest
        f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))])
        selector = rest.FilterSelector(filter=f)
        for suffix in ["chunks", "edges"]:
            try:
                self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector)
            except Exception as exc:
                logger.warning(f"Purge of {suffix} failed for {note_id}: {exc}")

    async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
        """Write Markdown text to a file in the vault and ingest it.

        Args:
            markdown_content: Full Markdown text of the note.
            filename: File name created inside ``folder``.
            vault_root: Root directory of the vault.
            folder: Sub-folder of the vault that receives the file.

        Returns:
            ``{"status": "error", "error": ...}`` when the directory or the
            file cannot be written, otherwise the result of ``process_file``
            called with ``apply``, ``force_replace`` and ``purge_before`` set.
        """
        # NOTE(review): folder and filename are joined unchecked; if they can
        # come from untrusted input, confirm they cannot escape vault_root.
        target_dir = os.path.join(vault_root, folder)
        file_path = os.path.join(target_dir, filename)
        try:
            # Directory creation is part of the disk write and reports its
            # failure through the same error result.
            os.makedirs(target_dir, exist_ok=True)
            with open(file_path, "w", encoding="utf-8") as f:
                f.write(markdown_content)
                f.flush()
                os.fsync(f.fileno())
            await asyncio.sleep(0.1)
            logger.info(f"Written file to {file_path}")
        except Exception as e:
            return {"status": "error", "error": f"Disk write failed: {str(e)}"}
        return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)