""" FILE: app/core/ingestion.py DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen. WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free). WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash. WP-15b: Two-Pass Ingestion mit LocalBatchCache & Candidate-Validation. Sichert, dass explizite Kanten direkt übernommen und nur Pool-Kanten validiert werden. FIX: Deep Fallback Logic (v2.11.14) für JSON-Recovery. Robust Lookup Fix: Adressiert Notizen im Cache via ID, Titel und Dateiname. VERSION: 2.12.2 STATUS: Active DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry """ import os import json import re import logging import asyncio import time from typing import Dict, List, Optional, Tuple, Any # Core Module Imports from app.core.parser import ( read_markdown, pre_scan_markdown, normalize_frontmatter, validate_required_frontmatter, extract_edges_with_context, NoteContext ) from app.core.note_payload import make_note_payload from app.core.chunker import assemble_chunks, get_chunk_config from app.core.chunk_payload import make_chunk_payloads # Fallback für Edges try: from app.core.derive_edges import build_edges_for_note except ImportError: def build_edges_for_note(*args, **kwargs): return [] from app.core.qdrant import QdrantConfig, get_client, ensure_collections, ensure_payload_indexes from app.core.qdrant_points import ( points_for_chunks, points_for_note, points_for_edges, upsert_batch, ) from app.services.embeddings_client import EmbeddingsClient from app.services.edge_registry import registry as edge_registry from app.services.llm_service import LLMService logger = logging.getLogger(__name__) # --- Global Helpers (Full Compatibility v2.11.14) --- def extract_json_from_response(text: str) -> Any: """ Extrahiert JSON-Daten und bereinigt LLM-Steuerzeichen (Mistral/Llama). Entfernt , [OUT], [/OUT] und Markdown-Blöcke für maximale Robustheit. """ if not text or not isinstance(text, str): return [] # 1. Entferne Mistral/Llama Steuerzeichen und Tags clean = text.replace("", "").replace("", "") clean = clean.replace("[OUT]", "").replace("[/OUT]", "") clean = clean.strip() # 2. Suche nach Markdown JSON-Blöcken (```json ... ```) match = re.search(r"```(?:json)?\s*(.*?)\s*```", clean, re.DOTALL) payload = match.group(1) if match else clean try: return json.loads(payload.strip()) except json.JSONDecodeError: # 3. Recovery: Suche nach der ersten [ und letzten ] (Liste) start = payload.find('[') end = payload.rfind(']') + 1 if start != -1 and end > start: try: return json.loads(payload[start:end]) except: pass # 4. Zweite Recovery: Suche nach der ersten { und letzten } (Objekt) start_obj = payload.find('{') end_obj = payload.rfind('}') + 1 if start_obj != -1 and end_obj > start_obj: try: return json.loads(payload[start_obj:end_obj]) except: pass return [] def load_type_registry(custom_path: Optional[str] = None) -> dict: """Lädt die types.yaml zur Steuerung der typ-spezifischen Ingestion.""" import yaml from app.config import get_settings settings = get_settings() path = custom_path or settings.MINDNET_TYPES_FILE if not os.path.exists(path): return {} try: with open(path, "r", encoding="utf-8") as f: return yaml.safe_load(f) or {} except Exception: return {} # --- Service Class --- class IngestionService: def __init__(self, collection_prefix: str = None): from app.config import get_settings self.settings = get_settings() self.prefix = collection_prefix or self.settings.COLLECTION_PREFIX self.cfg = QdrantConfig.from_env() self.cfg.prefix = self.prefix self.client = get_client(self.cfg) self.dim = self.settings.VECTOR_SIZE self.registry = load_type_registry() self.embedder = EmbeddingsClient() self.llm = LLMService() self.active_hash_mode = self.settings.CHANGE_DETECTION_MODE self.batch_cache: Dict[str, NoteContext] = {} # WP-15b LocalBatchCache try: ensure_collections(self.client, self.prefix, self.dim) ensure_payload_indexes(self.client, self.prefix) except Exception as e: logger.warning(f"DB init warning: {e}") async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: """ WP-15b: Implementiert den Two-Pass Ingestion Workflow. Pass 1: Pre-Scan baut flüchtigen Kontext-Cache auf. Pass 2: Processing führt die eigentliche semantische Validierung durch. """ logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") for path in file_paths: ctx = pre_scan_markdown(path) if ctx: # Mehrfache Indizierung für robusten Look-up (WP-15b Fix) self.batch_cache[ctx.note_id] = ctx self.batch_cache[ctx.title] = ctx # Dateiname ohne Endung als dritter Schlüssel fname = os.path.splitext(os.path.basename(path))[0] self.batch_cache[fname] = ctx logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...") results = [] for path in file_paths: res = await self.process_file(path, vault_root, apply=True) results.append(res) return results async def _validate_candidate(self, chunk_text: str, edge: Dict) -> bool: """ WP-15b: Validiert einen Kanten-Kandidaten semantisch gegen das Ziel. Nutzt den Cache aus Pass 1, um dem LLM Kontext der Ziel-Note zu geben. """ target_id = edge.get("to") target_ctx = self.batch_cache.get(target_id) # Fallback Look-up für Links mit Ankern (Anchor entfernen) if not target_ctx and "#" in target_id: base_id = target_id.split("#")[0] target_ctx = self.batch_cache.get(base_id) # Sicherheits-Fallback: Wenn Zielnotiz nicht im aktuellen Batch ist, # lassen wir die Kante als 'explicit' durch (Hard-Link Integrity). if not target_ctx: logger.info(f"ℹ️ [VALIDATION SKIP] No context for '{target_id}' - allowing link.") return True provider = self.settings.MINDNET_LLM_PROVIDER template = self.llm.get_prompt("edge_validation", provider) try: logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}'...") prompt = template.format( chunk_text=chunk_text[:1500], target_title=target_ctx.title, target_summary=target_ctx.summary, edge_kind=edge.get("kind", "related_to") ) response = await self.llm.generate_raw_response(prompt, priority="background") is_valid = "YES" in response.upper() if is_valid: logger.info(f"✅ [VALIDATED] Relation to '{target_id}' confirmed.") else: logger.info(f"🚫 [REJECTED] Relation to '{target_id}' irrelevant for this chunk.") return is_valid except Exception as e: logger.warning(f"⚠️ Semantic validation error for {target_id}: {e}") return True # Fallback: Im Zweifel Link behalten def _resolve_note_type(self, requested: Optional[str]) -> str: """Bestimmt den finalen Notiz-Typ (Fallback auf 'concept').""" types = self.registry.get("types", {}) if requested and requested in types: return requested return "concept" def _get_chunk_config_by_profile(self, profile_name: str, note_type: str) -> Dict[str, Any]: """Holt die Chunker-Parameter für ein spezifisches Profil aus der Registry.""" profiles = self.registry.get("chunking_profiles", {}) if profile_name in profiles: cfg = profiles[profile_name].copy() if "overlap" in cfg and isinstance(cfg["overlap"], list): cfg["overlap"] = tuple(cfg["overlap"]) return cfg return get_chunk_config(note_type) async def process_file( self, file_path: str, vault_root: str, force_replace: bool = False, apply: bool = False, purge_before: bool = False, note_scope_refs: bool = False, hash_source: str = "parsed", hash_normalize: str = "canonical" ) -> Dict[str, Any]: """Transformiert eine Markdown-Datei in den Graphen.""" result = {"path": file_path, "status": "skipped", "changed": False, "error": None} # 1. Parse & Lifecycle Gate try: parsed = read_markdown(file_path) if not parsed: return {**result, "error": "Empty file"} fm = normalize_frontmatter(parsed.frontmatter) validate_required_frontmatter(fm) except Exception as e: return {**result, "error": f"Validation failed: {str(e)}"} # Lifecycle Filter (WP-22) status = fm.get("status", "draft").lower().strip() if status in ["system", "template", "archive", "hidden"]: return {**result, "status": "skipped", "reason": f"lifecycle_{status}"} # 2. Config Resolution & Payload note_type = self._resolve_note_type(fm.get("type")) fm["type"] = note_type try: note_pl = make_note_payload(parsed, vault_root=vault_root, hash_normalize=hash_normalize, hash_source=hash_source, file_path=file_path) note_id = note_pl["note_id"] except Exception as e: return {**result, "error": f"Payload failed: {str(e)}"} # 3. Change Detection (v2.11.14 Logic) old_payload = None if force_replace else self._fetch_note_payload(note_id) check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}" old_hash = (old_payload or {}).get("hashes", {}).get(check_key) new_hash = note_pl.get("hashes", {}).get(check_key) chunks_missing, edges_missing = self._artifacts_missing(note_id) should_write = force_replace or (not old_payload) or (old_hash != new_hash) or chunks_missing or edges_missing if not should_write: return {**result, "status": "unchanged", "note_id": note_id} if not apply: return {**result, "status": "dry-run", "changed": True, "note_id": note_id} # 4. Processing (Chunking, Embedding, Validated Edges) try: body_text = getattr(parsed, "body", "") or "" edge_registry.ensure_latest() # Chunker Resolution profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard" chunk_cfg = self._get_chunk_config_by_profile(profile, note_type) enable_smart_edges = chunk_cfg.get("enable_smart_edge_allocation", False) # WP-15b: Chunker bereitet nun den Candidate-Pool vor (inkl. Inheritance). chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_cfg) # WP-15b: Validierung NUR für Kandidaten aus dem global_pool (Unzugeordnete Kanten) for ch_obj in chunks: filtered_pool = [] for cand in getattr(ch_obj, "candidate_pool", []): # Nur 'global_pool' erfordert LLM-Validierung. # 'explicit' und 'inherited' werden direkt akzeptiert. if cand.get("provenance") == "global_pool" and enable_smart_edges: if await self._validate_candidate(ch_obj.text, cand): filtered_pool.append(cand) else: filtered_pool.append(cand) ch_obj.candidate_pool = filtered_pool chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) # Embeddings generieren vecs = [] if chunk_pls: texts = [c.get("window") or c.get("text") or "" for c in chunk_pls] vecs = await self.embedder.embed_documents(texts) # Kanten finalisieren via derive_edges Aggregator (WP-15b kompatibel) # Nutzt das Provenance-Ranking (v2.1.0). edges = build_edges_for_note( note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs ) # Alias-Auflösung & Registry Enforcement context = {"file": file_path, "note_id": note_id} for e in edges: e["kind"] = edge_registry.resolve( edge_type=e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"), context={**context, "line": e.get("line", "system")} ) except Exception as e: logger.error(f"Processing failed for {file_path}: {e}", exc_info=True) return {**result, "error": f"Processing failed: {str(e)}"} # 5. DB Upsert try: if purge_before and old_payload: self._purge_artifacts(note_id) n_name, n_pts = points_for_note(self.prefix, note_pl, None, self.dim) upsert_batch(self.client, n_name, n_pts) if chunk_pls and vecs: # v2.11.14 Points-Extraction Logic c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) if edges: # v2.11.14 Points-Extraction Logic e_pts = points_for_edges(self.prefix, edges)[1] upsert_batch(self.client, f"{self.prefix}_edges", e_pts) return {"path": file_path, "status": "success", "changed": True, "note_id": note_id, "chunks_count": len(chunk_pls), "edges_count": len(edges)} except Exception as e: return {**result, "error": f"DB Upsert failed: {e}"} def _fetch_note_payload(self, note_id: str) -> Optional[dict]: """Holt die Metadaten einer Note aus Qdrant.""" from qdrant_client.http import models as rest try: f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) pts, _ = self.client.scroll(collection_name=f"{self.prefix}_notes", scroll_filter=f, limit=1, with_payload=True) return pts[0].payload if pts else None except: return None def _artifacts_missing(self, note_id: str) -> Tuple[bool, bool]: """Prüft Qdrant aktiv auf vorhandene Chunks und Edges.""" from qdrant_client.http import models as rest try: f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) c_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_chunks", scroll_filter=f, limit=1) e_pts, _ = self.client.scroll(collection_name=f"{self.prefix}_edges", scroll_filter=f, limit=1) return (not bool(c_pts)), (not bool(e_pts)) except: return True, True def _purge_artifacts(self, note_id: str): """Löscht verwaiste Chunks/Edges vor einem Re-Import.""" from qdrant_client.http import models as rest f = rest.Filter(must=[rest.FieldCondition(key="note_id", match=rest.MatchValue(value=note_id))]) for suffix in ["chunks", "edges"]: try: self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=rest.FilterSelector(filter=f)) except: pass async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]: """Hilfsmethode zur Erstellung einer Note aus einem Textstream.""" target_dir = os.path.join(vault_root, folder) os.makedirs(target_dir, exist_ok=True) file_path = os.path.join(target_dir, filename) with open(file_path, "w", encoding="utf-8") as f: f.write(markdown_content) await asyncio.sleep(0.1) return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)