diff --git a/app/core/ingestion.py b/app/core/ingestion.py index ce35daf..b433fc4 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -4,8 +4,10 @@ DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen. WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free). WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash. WP-15b: Two-Pass Ingestion mit LocalBatchCache & Candidate-Validation. - FIX: Beibehaltung der Deep Fallback Logic (v2.11.14) zur JSON-Recovery. -VERSION: 2.12.0 +FIX: Deep Fallback Logic (v2.11.14). Erkennt Policy Violations auch in validen + JSON-Objekten und erzwingt den lokalen Ollama-Sprung, um Kantenverlust + bei umfangreichen Protokollen zu verhindern. +VERSION: 2.12.1 STATUS: Active DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry @@ -128,16 +130,16 @@ class IngestionService: async def run_batch(self, file_paths: List[str], vault_root: str) -> List[Dict[str, Any]]: """ WP-15b: Implementiert den Two-Pass Ingestion Workflow. - Pass 1: Pre-Scan baut Kontext-Cache auf. - Pass 2: Processing führt semantische Validierung durch. + Pass 1: Pre-Scan baut flüchtigen Kontext-Cache auf. + Pass 2: Processing führt die eigentliche semantische Validierung durch. """ - logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Batch Cache...") + logger.info(f"🔍 [Pass 1] Pre-Scanning {len(file_paths)} files for Context Cache...") for path in file_paths: ctx = pre_scan_markdown(path) if ctx: self.batch_cache[ctx.note_id] = ctx - logger.info(f"🚀 [Pass 2] Processing {len(file_paths)} files...") + logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...") results = [] for path in file_paths: res = await self.process_file(path, vault_root, apply=True) @@ -152,14 +154,17 @@ class IngestionService: target_id = edge.get("to") target_ctx = self.batch_cache.get(target_id) - # Falls Zielnotiz nicht im aktuellen Batch ist: 'explicit' durchlassen (Hard-Link Integrity) + # Sicherheits-Fallback: Wenn Zielnotiz nicht im aktuellen Batch ist, + # lassen wir die Kante als 'explicit' durch (Hard-Link Integrity). if not target_ctx: + logger.info(f"ℹ️ [VALIDATION SKIP] No cache context for '{target_id}' - allowing link.") return True provider = self.settings.MINDNET_LLM_PROVIDER template = self.llm.get_prompt("edge_validation", provider) try: + logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}'...") prompt = template.format( chunk_text=chunk_text[:1500], target_title=target_ctx.title, @@ -168,7 +173,14 @@ class IngestionService: ) response = await self.llm.generate_raw_response(prompt, priority="background") - return "YES" in response.upper() + is_valid = "YES" in response.upper() + + if is_valid: + logger.info(f"✅ [VALIDATED] Relation '{edge.get('kind')}' to '{target_id}' confirmed.") + else: + logger.info(f"🚫 [REJECTED] WP-15b Candidate: '{edge.get('kind')}' -> '{target_id}' not relevant.") + + return is_valid except Exception as e: logger.warning(f"⚠️ Semantic validation error for {target_id}: {e}") return True # Fallback: Im Zweifel Link behalten @@ -244,44 +256,49 @@ class IngestionService: # Chunker Resolution profile = fm.get("chunk_profile") or fm.get("chunking_profile") or "sliding_standard" chunk_cfg = self._get_chunk_config_by_profile(profile, note_type) + enable_smart_edges = chunk_cfg.get("enable_smart_edge_allocation", False) + + # WP-15b: Chunker bereitet nun den Candidate-Pool vor. chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_cfg) + + # WP-15b: Validierung der Kandidaten aus dem Global Pool. + for ch_obj in chunks: + filtered_pool = [] + for cand in getattr(ch_obj, "candidate_pool", []): + # Nur 'global_pool' (Unzugeordnete Kanten) erfordern LLM-Validierung. + # Sektions-Kanten ('inherited') werden direkt akzeptiert. + if cand.get("provenance") == "global_pool" and enable_smart_edges: + if await self._validate_candidate(ch_obj.text, cand): + filtered_pool.append(cand) + else: + filtered_pool.append(cand) + ch_obj.candidate_pool = filtered_pool + chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text) - # Embeddings + # Embeddings generieren vecs = [] if chunk_pls: texts = [c.get("window") or c.get("text") or "" for c in chunk_pls] vecs = await self.embedder.embed_documents(texts) - # Kanten-Extraktion & WP-15b Validierung - edges = [] - context = {"file": file_path, "note_id": note_id} - - # A. Explizite Kandidaten (Wikilinks) - raw_candidates = extract_edges_with_context(parsed) - for cand in raw_candidates: - # Semantische Prüfung gegen Pass 1 Cache - if await self._validate_candidate(body_text, cand): - cand["kind"] = edge_registry.resolve( - edge_type=cand["kind"], - provenance="explicit", - context={**context, "line": cand.get("line")} - ) - edges.append(cand) - else: - logger.info(f"🚫 WP-15b: Candidate rejected: {cand['kind']} -> {cand['to']}") - - # B. System Kanten (Struktur) - try: - sys_edges = build_edges_for_note(note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs) - except: - sys_edges = build_edges_for_note(note_id, chunk_pls) + # Kanten finalisieren via derive_edges Aggregator (WP-15b kompatibel) + # Nutzt das Provenance-Ranking (v2.1.0). + edges = build_edges_for_note( + note_id, + chunk_pls, + note_level_references=note_pl.get("references", []), + include_note_scope_refs=note_scope_refs + ) - for e in sys_edges: - valid_kind = edge_registry.resolve(edge_type=e.get("kind", "belongs_to"), provenance="structure", context={**context, "line": "system"}) - if valid_kind: - e["kind"] = valid_kind - edges.append(e) + # Alias-Auflösung & Registry Enforcement + context = {"file": file_path, "note_id": note_id} + for e in edges: + e["kind"] = edge_registry.resolve( + edge_type=e.get("kind", "related_to"), + provenance=e.get("provenance", "explicit"), + context={**context, "line": e.get("line", "system")} + ) except Exception as e: logger.error(f"Processing failed for {file_path}: {e}", exc_info=True) diff --git a/scripts/import_markdown.py b/scripts/import_markdown.py index d5ce195..917b46a 100644 --- a/scripts/import_markdown.py +++ b/scripts/import_markdown.py @@ -2,7 +2,9 @@ """ scripts/import_markdown.py CLI-Tool zum Importieren von Markdown-Dateien in Qdrant. -Updated for Mindnet v2.3.6 (Async Ingestion Support). +WP-15b: Implementiert den Two-Pass Workflow (Pre-Scan + Processing). +Sorgt dafür, dass der LocalBatchCache vor der Verarbeitung gefüllt wird. +VERSION: 2.4.0 """ import asyncio import os @@ -11,21 +13,16 @@ import logging from pathlib import Path from dotenv import load_dotenv -import logging # Setzt das Level global auf INFO, damit Sie den Fortschritt sehen logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') -# Wenn Sie TIEFE Einblicke wollen, setzen Sie den SemanticAnalyzer spezifisch auf DEBUG: -logging.getLogger("app.services.semantic_analyzer").setLevel(logging.DEBUG) - # Importiere den neuen Async Service -# Stellen wir sicher, dass der Pfad stimmt (Pythonpath) import sys sys.path.append(os.getcwd()) from app.core.ingestion import IngestionService +from app.core.parser import pre_scan_markdown -logging.basicConfig(level=logging.INFO, format="%(asctime)s [%(levelname)s] %(message)s") logger = logging.getLogger("importer") async def main_async(args): @@ -34,7 +31,7 @@ async def main_async(args): logger.error(f"Vault path does not exist: {vault_path}") return - # Service initialisieren (startet Async Clients) + # 1. Service initialisieren logger.info(f"Initializing IngestionService (Prefix: {args.prefix})") service = IngestionService(collection_prefix=args.prefix) @@ -46,14 +43,31 @@ async def main_async(args): logger.info(f"Found {len(files)} markdown files.") - stats = {"processed": 0, "skipped": 0, "errors": 0} + # ========================================================================= + # PASS 1: Global Pre-Scan (WP-15b) + # Füllt den LocalBatchCache für die semantische Kanten-Validierung. + # ========================================================================= + logger.info(f"🔍 [Pass 1] Pre-scanning {len(files)} files for global context cache...") + for f_path in files: + try: + ctx = pre_scan_markdown(str(f_path)) + if ctx: + service.batch_cache[ctx.note_id] = ctx + except Exception as e: + logger.warning(f"⚠️ Could not pre-scan {f_path}: {e}") - # Wir nutzen eine Semaphore, um nicht zu viele Files gleichzeitig zu öffnen/embedden - sem = asyncio.Semaphore(5) # Max 5 concurrent files to avoid OOM or Rate Limit + logger.info(f"✅ Cache populated with {len(service.batch_cache)} note contexts.") + + # ========================================================================= + # PASS 2: Processing (Batch-Verarbeitung) + # ========================================================================= + stats = {"processed": 0, "skipped": 0, "errors": 0} + sem = asyncio.Semaphore(5) # Max 5 parallele Dateien für Stabilität async def process_with_limit(f_path): async with sem: try: + # Nutzt den nun gefüllten Batch-Cache für die Validierung res = await service.process_file( file_path=str(f_path), vault_root=str(vault_path), @@ -65,8 +79,8 @@ async def main_async(args): except Exception as e: return {"status": "error", "error": str(e), "path": str(f_path)} - # Batch Processing - # Wir verarbeiten in Chunks, um den Progress zu sehen + logger.info(f"🚀 [Pass 2] Starting semantic processing in batches...") + batch_size = 20 for i in range(0, len(files), batch_size): batch = files[i:i+batch_size] @@ -92,7 +106,7 @@ def main(): load_dotenv() default_prefix = os.getenv("COLLECTION_PREFIX", "mindnet") - parser = argparse.ArgumentParser(description="Import Vault to Qdrant (Async)") + parser = argparse.ArgumentParser(description="Import Vault to Qdrant (Two-Pass Ingestion)") parser.add_argument("--vault", default="./vault", help="Path to vault root") parser.add_argument("--prefix", default=default_prefix, help="Collection prefix") parser.add_argument("--force", action="store_true", help="Force re-index all files")