Update ingestion processor to version 3.1.0: Fix bidirectional edge injection for Qdrant, streamline edge validation by removing symmetry logic from the validation step, and enhance inverse edge generation in the processing pipeline. Improve logging for symmetry creation in edge payloads.

This commit is contained in:
Lars 2026-01-09 14:25:46 +01:00
parent 4802eba27b
commit 9b3fd7723e

View File

@ -5,8 +5,8 @@ DESCRIPTION: Der zentrale IngestionService (Orchestrator).
WP-25a: Integration der Mixture of Experts (MoE) Architektur. WP-25a: Integration der Mixture of Experts (MoE) Architektur.
WP-15b: Two-Pass Workflow mit globalem Kontext-Cache. WP-15b: Two-Pass Workflow mit globalem Kontext-Cache.
WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert. WP-20/22: Cloud-Resilienz und Content-Lifecycle integriert.
AUDIT v3.0.0: Synchronisierung der bidirektionalen Graph-Logik. AUDIT v3.1.0: Korrektur der bidirektionalen Graph-Injektion für Qdrant.
VERSION: 3.0.0 (WP-24c: Symmetric Graph Ingestion) VERSION: 3.1.0 (WP-24c: Symmetric Edge Injection Fix)
STATUS: Active STATUS: Active
""" """
import logging import logging
@ -30,11 +30,11 @@ from app.services.embeddings_client import EmbeddingsClient
from app.services.edge_registry import registry as edge_registry from app.services.edge_registry import registry as edge_registry
from app.services.llm_service import LLMService from app.services.llm_service import LLMService
# Package-Interne Imports (Refactoring WP-14 / WP-24c) # Package-Interne Imports
from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile from .ingestion_utils import load_type_registry, resolve_note_type, get_chunk_config_by_profile
from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts from .ingestion_db import fetch_note_payload, artifacts_missing, purge_artifacts
# WP-24c: Import der erweiterten Symmetrie-Logik # WP-24c: Wir nutzen die Basis-Validierung; die Symmetrie wird im Prozessor injiziert
from .ingestion_validation import validate_edge_candidate, validate_and_symmetrize from .ingestion_validation import validate_edge_candidate
from .ingestion_note_payload import make_note_payload from .ingestion_note_payload import make_note_payload
from .ingestion_chunk_payload import make_chunk_payloads from .ingestion_chunk_payload import make_chunk_payloads
@ -169,22 +169,21 @@ class IngestionService:
# WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor. # WP-15b: Chunker-Aufruf bereitet den Candidate-Pool pro Chunk vor.
chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg) chunks = await assemble_chunks(note_id, body_text, note_type, config=chunk_cfg)
# Semantische Kanten-Validierung & Symmetrie (WP-24c / WP-25a) # Semantische Kanten-Validierung (Primärprüfung)
for ch in chunks: for ch in chunks:
new_pool = [] new_pool = []
for cand in getattr(ch, "candidate_pool", []): for cand in getattr(ch, "candidate_pool", []):
# WP-24c: Nutzung des erweiterten Symmetrie-Gateways # WP-25a: Profilgesteuerte binäre Validierung
if cand.get("provenance") == "global_pool" and enable_smart: if cand.get("provenance") == "global_pool" and enable_smart:
# Erzeugt Primär- und Inverse Kanten falls validiert is_valid = await validate_edge_candidate(
res_batch = await validate_and_symmetrize(
chunk_text=ch.text, chunk_text=ch.text,
edge=cand, edge=cand,
source_id=note_id,
batch_cache=self.batch_cache, batch_cache=self.batch_cache,
llm_service=self.llm, llm_service=self.llm,
profile_name="ingest_validator" profile_name="ingest_validator"
) )
new_pool.extend(res_batch) if is_valid:
new_pool.append(cand)
else: else:
# Explizite Kanten (Wikilinks/Callouts) werden übernommen # Explizite Kanten (Wikilinks/Callouts) werden übernommen
new_pool.append(cand) new_pool.append(cand)
@ -200,19 +199,51 @@ class IngestionService:
vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else [] vecs = await self.embedder.embed_documents([c.get("window") or "" for c in chunk_pls]) if chunk_pls else []
# Aggregation aller finalen Kanten (Edges) # Aggregation aller finalen Kanten (Edges)
edges = build_edges_for_note( raw_edges = build_edges_for_note(
note_id, chunk_pls, note_id, chunk_pls,
note_level_references=note_pl.get("references", []), note_level_references=note_pl.get("references", []),
include_note_scope_refs=note_scope_refs include_note_scope_refs=note_scope_refs
) )
# Kanten-Typen via Registry validieren/auflösen # --- WP-24c: Symmetrie-Injektion (Invers-Kanten Fix) ---
for e in edges: # Wir bauen die finalen Kanten-Objekte inklusive ihrer Gegenstücke
e["kind"] = edge_registry.resolve( final_edges = []
for e in raw_edges:
# 1. Primär-Kante auflösen & kanonisieren
resolved_kind = edge_registry.resolve(
e.get("kind", "related_to"), e.get("kind", "related_to"),
provenance=e.get("provenance", "explicit"), provenance=e.get("provenance", "explicit"),
context={"file": file_path, "note_id": note_id, "line": e.get("line", "system")} context={"file": file_path, "note_id": note_id, "line": e.get("line", "system")}
) )
e["kind"] = resolved_kind
final_edges.append(e)
# 2. Symmetrie-Erzeugung via Registry
inverse_kind = edge_registry.get_inverse(resolved_kind)
target_id = e.get("target_id")
# Wir erzeugen eine Inverse nur bei sinnvoller Symmetrie und existierendem Ziel
if inverse_kind and inverse_kind != resolved_kind and target_id:
# Deep Copy für die Inverse zur Vermeidung von Side-Effects
inv_edge = e.copy()
# Richtungs-Umkehr
inv_edge["note_id"] = target_id # Ursprung ist nun das Ziel
inv_edge["target_id"] = note_id # Ziel ist nun die Quelle
inv_edge["kind"] = inverse_kind
# Metadaten-Anpassung
inv_edge["virtual"] = True
inv_edge["provenance"] = "structure" # Schutz durch Firewall
inv_edge["confidence"] = e.get("confidence", 0.9) * 0.9 # Leichte Dämpfung
# Lifecycle-Verankerung: Die Inverse gehört logisch zur Quell-Note
inv_edge["origin_note_id"] = note_id
final_edges.append(inv_edge)
logger.info(f"🔄 [SYMMETRY] Built inverse in payload: {target_id} --({inverse_kind})--> {note_id}")
edges = final_edges
# 4. DB Upsert via modularisierter Points-Logik # 4. DB Upsert via modularisierter Points-Logik
if purge_before and old_payload: if purge_before and old_payload:
@ -227,7 +258,7 @@ class IngestionService:
c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1] c_pts = points_for_chunks(self.prefix, chunk_pls, vecs)[1]
upsert_batch(self.client, f"{self.prefix}_chunks", c_pts) upsert_batch(self.client, f"{self.prefix}_chunks", c_pts)
# Speichern der Kanten # Speichern der Kanten (inklusive der virtuellen Inversen)
if edges: if edges:
e_pts = points_for_edges(self.prefix, edges)[1] e_pts = points_for_edges(self.prefix, edges)[1]
upsert_batch(self.client, f"{self.prefix}_edges", e_pts) upsert_batch(self.client, f"{self.prefix}_edges", e_pts)