diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 18e06a0..dd3d78e 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -5,8 +5,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.0.0: Synchronisierung der bidirektionalen Graph-Logik. -VERSION: 3.0.0 (WP-24c: Symmetric Graph Ingestion) + AUDIT v3.1.0: Korrektur der bidirektionalen Graph-Injektion für Qdrant. +VERSION: 3.1.0 (WP-24c: Symmetric Edge Injection Fix) STATUS: Active """ import logging @@ -30,11 +30,11 @@ from app.services.embeddings_client import EmbeddingsClient from app.services.edge_registry import registry as edge_registry from app.services.llm_service import LLMService -# Package-Interne Imports (Refactoring WP-14 / WP-24c) +# Package-Interne Imports from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts -# WP-24c: Import der erweiterten Symmetrie-Logik -from .ingestion_validation import validate_edge_candidate, validate_and_symmetrize +# WP-24c: Wir nutzen die Basis-Validierung; die Symmetrie wird im Prozessor injiziert +from .ingestion_validation import validate_edge_candidate from .ingestion_note_payload import make_note_payload from .ingestion_chunk_payload import make_chunk_payloads @@ -169,22 +169,21 @@ class IngestionService: # WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor. 
chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) - # Semantische Kanten-Validierung & Symmetrie (WP-24c / WP-25a) + # Semantische Kanten-Validierung (Primärprüfung) for ch in chunks: new_pool = [] for cand in getattr(ch, "candidate_pool", []): - # WP-24c: Nutzung des erweiterten Symmetrie-Gateways + # WP-25a: Profilgesteuerte binäre Validierung if cand.get("provenance") == "global_pool" and enable_smart: - # Erzeugt Primär- und Inverse Kanten falls validiert - res_batch = await validate_and_symmetrize( + is_valid = await validate_edge_candidate( chunk_text=ch.text, edge=cand, - source_id=note_id, batch_cache=self.batch_cache, llm_service=self.llm, profile_name="ingest_validator" ) - new_pool.extend(res_batch) + if is_valid: + new_pool.append(cand) else: # Explizite Kanten (Wikilinks/Callouts) werden übernommen new_pool.append(cand) @@ -200,19 +199,51 @@ class IngestionService: vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] # Aggregation aller finalen Kanten (Edges) - edges = build_edges_for_note( + raw_edges = build_edges_for_note( note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs ) - # Kanten-Typen via Registry validieren/auflösen - for e in edges: - e["kind"] = edge_registry.resolve( + # --- WP-24c: Symmetrie-Injektion (Invers-Kanten Fix) --- + # Wir bauen die finalen Kanten-Objekte inklusive ihrer Gegenstücke + final_edges = [] + for e in raw_edges: + # 1. Primär-Kante auflösen & kanonisieren + resolved_kind = edge_registry.resolve( e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"), context={"file": file_path, "note_id": note_id, "line": e.get("line", "system")} ) + e["kind"] = resolved_kind + final_edges.append(e) + + # 2. 
Symmetrie-Erzeugung via Registry + inverse_kind = edge_registry.get_inverse(resolved_kind) + target_id = e.get("target_id") + + # Wir erzeugen eine Inverse nur bei sinnvoller Symmetrie und existierendem Ziel + if inverse_kind and inverse_kind != resolved_kind and target_id: + # Flache Kopie (shallow copy) für die Inverse: Top-Level-Felder sind entkoppelt, verschachtelte Werte bleiben mit der Primär-Kante geteilt + inv_edge = e.copy() + + # Richtungs-Umkehr + inv_edge["note_id"] = target_id # Ursprung ist nun das Ziel + inv_edge["target_id"] = note_id # Ziel ist nun die Quelle + inv_edge["kind"] = inverse_kind + + # Metadaten-Anpassung + inv_edge["virtual"] = True + inv_edge["provenance"] = "structure" # Schutz durch Firewall + inv_edge["confidence"] = e.get("confidence", 0.9) * 0.9 # Leichte Dämpfung + + # Lifecycle-Verankerung: Die Inverse gehört logisch zur Quell-Note + inv_edge["origin_note_id"] = note_id + + final_edges.append(inv_edge) + logger.info(f"🔄 [SYMMETRY] Built inverse in payload: {target_id} --({inverse_kind})--> {note_id}") + + edges = final_edges # 4. DB Upsert via modularisierter Points-Logik if purge_before and old_payload: @@ -227,7 +258,7 @@ class IngestionService: c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) - # Speichern der Kanten + # Speichern der Kanten (inklusive der virtuellen Inversen) if edges: e_pts = points_for_edges(self.prefix, edges)[1] upsert_batch(self.client, f"{self.prefix}_edges", e_pts)