diff --git a/app/core/ingestion.py b/app/core/ingestion.py index b433fc4..a5a80d8 100644 --- a/app/core/ingestion.py +++ b/app/core/ingestion.py @@ -4,10 +4,10 @@ DESCRIPTION: Haupt-Ingestion-Logik. Transformiert Markdown in den Graphen. WP-20: Optimiert für OpenRouter (mistralai/mistral-7b-instruct:free). WP-22: Content Lifecycle, Edge Registry Validation & Multi-Hash. WP-15b: Two-Pass Ingestion mit LocalBatchCache & Candidate-Validation. -FIX: Deep Fallback Logic (v2.11.14). Erkennt Policy Violations auch in validen - JSON-Objekten und erzwingt den lokalen Ollama-Sprung, um Kantenverlust - bei umfangreichen Protokollen zu verhindern. -VERSION: 2.12.1 + Sichert, dass explizite Kanten direkt übernommen und nur Pool-Kanten validiert werden. +FIX: Deep Fallback Logic (v2.11.14) für JSON-Recovery. + Robust Lookup Fix: Adressiert Notizen im Cache via ID, Titel und Dateiname. +VERSION: 2.12.2 STATUS: Active DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.services.llm_service, app.services.edge_registry @@ -137,7 +137,12 @@ class IngestionService: for path in file_paths: ctx = pre_scan_markdown(path) if ctx: + # Mehrfache Indizierung für robusten Look-up (WP-15b Fix) self.batch_cache[ctx.note_id] = ctx + self.batch_cache[ctx.title] = ctx + # Dateiname ohne Endung als dritter Schlüssel + fname = os.path.splitext(os.path.basename(path))[0] + self.batch_cache[fname] = ctx logger.info(f"🚀 [Pass 2] Semantic Processing of {len(file_paths)} files...") results = [] @@ -154,10 +159,15 @@ class IngestionService: target_id = edge.get("to") target_ctx = self.batch_cache.get(target_id) + # Fallback Look-up für Links mit Ankern (Anchor entfernen) + if not target_ctx and "#" in target_id: + base_id = target_id.split("#")[0] + target_ctx = self.batch_cache.get(base_id) + # Sicherheits-Fallback: Wenn Zielnotiz nicht im aktuellen Batch ist, # lassen wir die Kante als 'explicit' durch (Hard-Link Integrity). if not target_ctx: - logger.info(f"ℹ️ [VALIDATION SKIP] No cache context for '{target_id}' - allowing link.") + logger.info(f"ℹ️ [VALIDATION SKIP] No context for '{target_id}' - allowing link.") return True provider = self.settings.MINDNET_LLM_PROVIDER @@ -176,9 +186,9 @@ class IngestionService: is_valid = "YES" in response.upper() if is_valid: - logger.info(f"✅ [VALIDATED] Relation '{edge.get('kind')}' to '{target_id}' confirmed.") + logger.info(f"✅ [VALIDATED] Relation to '{target_id}' confirmed.") else: - logger.info(f"🚫 [REJECTED] WP-15b Candidate: '{edge.get('kind')}' -> '{target_id}' not relevant.") + logger.info(f"🚫 [REJECTED] Relation to '{target_id}' irrelevant for this chunk.") return is_valid except Exception as e: @@ -258,15 +268,15 @@ class IngestionService: chunk_cfg = self._get_chunk_config_by_profile(profile, note_type) enable_smart_edges = chunk_cfg.get("enable_smart_edge_allocation", False) - # WP-15b: Chunker bereitet nun den Candidate-Pool vor. + # WP-15b: Chunker bereitet nun den Candidate-Pool vor (inkl. Inheritance). chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_cfg) - # WP-15b: Validierung der Kandidaten aus dem Global Pool. + # WP-15b: Validierung NUR für Kandidaten aus dem global_pool (Unzugeordnete Kanten) for ch_obj in chunks: filtered_pool = [] for cand in getattr(ch_obj, "candidate_pool", []): - # Nur 'global_pool' (Unzugeordnete Kanten) erfordern LLM-Validierung. - # Sektions-Kanten ('inherited') werden direkt akzeptiert. + # Nur 'global_pool' erfordert LLM-Validierung. + # 'explicit' und 'inherited' werden direkt akzeptiert. if cand.get("provenance") == "global_pool" and enable_smart_edges: if await self._validate_candidate(ch_obj.text, cand): filtered_pool.append(cand) @@ -312,12 +322,14 @@ class IngestionService: upsert_batch(self.client, n_name, n_pts) if chunk_pls and vecs: - c_name, c_pts = points_for_chunks(self.prefix, chunk_pls, vecs) - upsert_batch(self.client, c_name, c_pts) + # v2.11.14 Points-Extraction Logic + c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] + upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) if edges: - e_name, e_pts = points_for_edges(self.prefix, edges) - upsert_batch(self.client, e_name, e_pts) + # v2.11.14 Points-Extraction Logic + e_pts = points_for_edges(self.prefix, edges)[1] + upsert_batch(self.client, f"{self.prefix}_edges", e_pts) return {"path": file_path, "status": "success", "changed": True, "note_id": note_id, "chunks_count": len(chunk_pls), "edges_count": len(edges)} except Exception as e: diff --git a/scripts/import_markdown.py b/scripts/import_markdown.py index 917b46a..544ae40 100644 --- a/scripts/import_markdown.py +++ b/scripts/import_markdown.py @@ -3,8 +3,9 @@ scripts/import_markdown.py CLI-Tool zum Importieren von Markdown-Dateien in Qdrant. WP-15b: Implementiert den Two-Pass Workflow (Pre-Scan + Processing). -Sorgt dafür, dass der LocalBatchCache vor der Verarbeitung gefüllt wird. -VERSION: 2.4.0 +Sorgt dafür, dass der LocalBatchCache vor der Verarbeitung robust gefüllt wird. +Indiziert Notizen nach ID, Titel und Dateiname für maximale Link-Kompatibilität. +VERSION: 2.4.1 """ import asyncio import os @@ -13,10 +14,10 @@ import logging from pathlib import Path from dotenv import load_dotenv -# Setzt das Level global auf INFO, damit Sie den Fortschritt sehen +# Setzt das Level global auf INFO, damit der Fortschritt im Log sichtbar ist logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s') -# Importiere den neuen Async Service +# Importiere den neuen Async Service und stelle Python-Pfad sicher import sys sys.path.append(os.getcwd()) @@ -44,30 +45,41 @@ async def main_async(args): logger.info(f"Found {len(files)} markdown files.") # ========================================================================= - # PASS 1: Global Pre-Scan (WP-15b) + # PASS 1: Global Pre-Scan (WP-15b Harvester) # Füllt den LocalBatchCache für die semantische Kanten-Validierung. + # Nutzt ID, Titel und Filename für robusten Look-up. # ========================================================================= logger.info(f"🔍 [Pass 1] Pre-scanning {len(files)} files for global context cache...") for f_path in files: try: ctx = pre_scan_markdown(str(f_path)) if ctx: + # 1. Look-up via Note ID (UUID oder Frontmatter ID) service.batch_cache[ctx.note_id] = ctx + + # 2. Look-up via Titel (Wichtig für Wikilinks [[Titel]]) + service.batch_cache[ctx.title] = ctx + + # 3. Look-up via Dateiname (Wichtig für Wikilinks [[Filename]]) + fname = os.path.splitext(f_path.name)[0] + service.batch_cache[fname] = ctx + except Exception as e: - logger.warning(f"⚠️ Could not pre-scan {f_path}: {e}") + logger.warning(f"⚠️ Could not pre-scan {f_path.name}: {e}") - logger.info(f"✅ Cache populated with {len(service.batch_cache)} note contexts.") + logger.info(f"✅ Context Cache populated for {len(files)} notes.") # ========================================================================= - # PASS 2: Processing (Batch-Verarbeitung) + # PASS 2: Processing (Semantic Batch-Verarbeitung) + # Nutzt den gefüllten Cache zur binären Validierung semantischer Kanten. # ========================================================================= stats = {"processed": 0, "skipped": 0, "errors": 0} - sem = asyncio.Semaphore(5) # Max 5 parallele Dateien für Stabilität + sem = asyncio.Semaphore(5) # Max 5 parallele Dateien für Cloud-Stabilität async def process_with_limit(f_path): async with sem: try: - # Nutzt den nun gefüllten Batch-Cache für die Validierung + # Nutzt den nun gefüllten Batch-Cache in der process_file Logik res = await service.process_file( file_path=str(f_path), vault_root=str(vault_path), @@ -106,7 +118,7 @@ def main(): load_dotenv() default_prefix = os.getenv("COLLECTION_PREFIX", "mindnet") - parser = argparse.ArgumentParser(description="Import Vault to Qdrant (Two-Pass Ingestion)") + parser = argparse.ArgumentParser(description="Two-Pass Markdown Ingestion for Mindnet") parser.add_argument("--vault", default="./vault", help="Path to vault root") parser.add_argument("--prefix", default=default_prefix, help="Collection prefix") parser.add_argument("--force", action="store_true", help="Force re-index all files") @@ -114,7 +126,7 @@ def main(): args = parser.parse_args() - # Starte den Async Loop + # Starte den asynchronen Haupt-Loop asyncio.run(main_async(args)) if __name__ == "__main__":