From 9b3fd7723ee613a9134f02584cfe1924941760c4 Mon Sep 17 00:00:00 2001 From: Lars Date: Fri, 9 Jan 2026 14:25:46 +0100 Subject: [PATCH] Update ingestion processor to version 3.1.0: Fix bidirectional edge injection for Qdrant, streamline edge validation by removing symmetry logic from the validation step, and enhance inverse edge generation in the processing pipeline. Improve logging for symmetry creation in edge payloads. --- app/core/ingestion/ingestion_processor.py | 63 +++++++++++++++++------ 1 file changed, 47 insertions(+), 16 deletions(-) diff --git a/app/core/ingestion/ingestion_processor.py b/app/core/ingestion/ingestion_processor.py index 18e06a0..dd3d78e 100644 --- a/app/core/ingestion/ingestion_processor.py +++ b/app/core/ingestion/ingestion_processor.py @@ -5,8 +5,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator). WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. - AUDIT v3.0.0: Synchronisierung der bidirektionalen Graph-Logik. -VERSION: 3.0.0 (WP-24c: Symmetric Graph Ingestion) + AUDIT v3.1.0: Korrektur der bidirektionalen Graph-Injektion für Qdrant. +VERSION: 3.1.0 (WP-24c: Symmetric Edge Injection Fix) STATUS: Active """ import logging @@ -30,11 +30,11 @@ from app.services.embeddings_client import EmbeddingsClient from app.services.edge_registry import registry as edge_registry from app.services.llm_service import LLMService -# Package-Interne Imports (Refactoring WP-14 / WP-24c) +# Package-Interne Imports from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts -# WP-24c: Import der erweiterten Symmetrie-Logik -from .ingestion_validation import validate_edge_candidate, validate_and_symmetrize +# WP-24c: Wir nutzen die Basis-Validierung; die Symmetrie wird im Prozessor injiziert +from .ingestion_validation import validate_edge_candidate from .ingestion_note_payload import make_note_payload from .ingestion_chunk_payload import make_chunk_payloads @@ -169,22 +169,21 @@ class IngestionService: # WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor. chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) - # Semantische Kanten-Validierung & Symmetrie (WP-24c / WP-25a) + # Semantische Kanten-Validierung (Primärprüfung) for ch in chunks: new_pool = [] for cand in getattr(ch, "candidate_pool", []): - # WP-24c: Nutzung des erweiterten Symmetrie-Gateways + # WP-25a: Profilgesteuerte binäre Validierung if cand.get("provenance") == "global_pool" and enable_smart: - # Erzeugt Primär- und Inverse Kanten falls validiert - res_batch = await validate_and_symmetrize( + is_valid = await validate_edge_candidate( chunk_text=ch.text, edge=cand, - source_id=note_id, batch_cache=self.batch_cache, llm_service=self.llm, profile_name="ingest_validator" ) - new_pool.extend(res_batch) + if is_valid: + new_pool.append(cand) else: # Explizite Kanten (Wikilinks/Callouts) werden übernommen new_pool.append(cand) @@ -200,19 +199,51 @@ class IngestionService: vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] # Aggregation aller finalen Kanten (Edges) - edges = build_edges_for_note( + raw_edges = build_edges_for_note( note_id, chunk_pls, note_level_references=note_pl.get("references", []), include_note_scope_refs=note_scope_refs ) - # Kanten-Typen via Registry validieren/auflösen - for e in edges: - e["kind"] = edge_registry.resolve( + # --- WP-24c: Symmetrie-Injektion (Invers-Kanten Fix) --- + # Wir bauen die finalen Kanten-Objekte inklusive ihrer Gegenstücke + final_edges = [] + for e in raw_edges: + # 1. Primär-Kante auflösen & kanonisieren + resolved_kind = edge_registry.resolve( e.get("kind", "related_to"), provenance=e.get("provenance", "explicit"), context={"file": file_path, "note_id": note_id, "line": e.get("line", "system")} ) + e["kind"] = resolved_kind + final_edges.append(e) + + # 2. Symmetrie-Erzeugung via Registry + inverse_kind = edge_registry.get_inverse(resolved_kind) + target_id = e.get("target_id") + + # Wir erzeugen eine Inverse nur bei sinnvoller Symmetrie und existierendem Ziel + if inverse_kind and inverse_kind != resolved_kind and target_id: + # Deep Copy für die Inverse zur Vermeidung von Side-Effects + inv_edge = e.copy() + + # Richtungs-Umkehr + inv_edge["note_id"] = target_id # Ursprung ist nun das Ziel + inv_edge["target_id"] = note_id # Ziel ist nun die Quelle + inv_edge["kind"] = inverse_kind + + # Metadaten-Anpassung + inv_edge["virtual"] = True + inv_edge["provenance"] = "structure" # Schutz durch Firewall + inv_edge["confidence"] = e.get("confidence", 0.9) * 0.9 # Leichte Dämpfung + + # Lifecycle-Verankerung: Die Inverse gehört logisch zur Quell-Note + inv_edge["origin_note_id"] = note_id + + final_edges.append(inv_edge) + logger.info(f"🔄 [SYMMETRY] Built inverse in payload: {target_id} --({inverse_kind})--> {note_id}") + + edges = final_edges # 4. DB Upsert via modularisierter Points-Logik if purge_before and old_payload: @@ -227,7 +258,7 @@ class IngestionService: c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) - # Speichern der Kanten + # Speichern der Kanten (inklusive der virtuellen Inversen) if edges: e_pts = points_for_edges(self.prefix, edges)[1] upsert_batch(self.client, f"{self.prefix}_edges", e_pts)