mindnet/app/core/ingestion/ingestion_validation.py

150 lines
6.1 KiB
Python
Raw Blame History

This file contains invisible Unicode characters

This file contains invisible Unicode characters that are indistinguishable to humans but may be processed differently by a computer. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
FILE: app/core/ingestion/ingestion_validation.py
DESCRIPTION: WP-15b semantische Validierung von Kanten gegen den LocalBatchCache.
WP-24c: Erweiterung um automatische Symmetrie-Generierung (Inverse Kanten).
WP-25b: Konsequente Lazy-Prompt-Orchestration (prompt_key + variables).
VERSION: 3.0.0 (WP-24c: Symmetric Edge Management)
STATUS: Active
FIX:
- WP-24c: Integration der EdgeRegistry zur dynamischen Inversions-Ermittlung.
- WP-24c: Implementierung von validate_and_symmetrize für bidirektionale Graphen.
- WP-25b: Beibehaltung der hierarchischen Prompt-Resolution und Modell-Spezi-Logik.
"""
import logging
from typing import Dict, Any, Optional, List
from app.core.parser import NoteContext
# Import der neutralen Bereinigungs-Logik zur Vermeidung von Circular Imports
from app.core.registry import clean_llm_text
# WP-24c: Zugriff auf das dynamische Vokabular
from app.services.edge_registry import registry as edge_registry
logger = logging.getLogger(__name__)
async def validate_edge_candidate(
chunk_text: str,
edge: Dict,
batch_cache: Dict[str, NoteContext],
llm_service: Any,
provider: Optional[str] = None,
profile_name: str = "ingest_validator"
) -> bool:
"""
WP-15b/25b: Validiert einen Kandidaten semantisch gegen das Ziel im Cache.
Nutzt Lazy-Prompt-Loading (PROMPT-TRACE) für deterministische YES/NO Entscheidungen.
"""
target_id = edge.get("to")
target_ctx = batch_cache.get(target_id)
# Robust Lookup Fix (v2.12.2): Support für Anker (Note#Section)
if not target_ctx and "#" in str(target_id):
base_id = target_id.split("#")[0]
target_ctx = batch_cache.get(base_id)
# Sicherheits-Fallback (Hard-Link Integrity)
# Wenn das Ziel nicht im Cache ist, erlauben wir die Kante (Link-Erhalt).
if not target_ctx:
logger.info(f" [VALIDATION SKIP] No context for '{target_id}' - allowing link.")
return True
try:
logger.info(f"⚖️ [VALIDATING] Relation '{edge.get('kind')}' -> '{target_id}' (Profile: {profile_name})...")
# WP-25b: Lazy-Prompt Aufruf.
# Übergabe von prompt_key und Variablen für modell-optimierte Formatierung.
raw_response = await llm_service.generate_raw_response(
prompt_key="edge_validation",
variables={
"chunk_text": chunk_text[:1500],
"target_title": target_ctx.title,
"target_summary": target_ctx.summary,
"edge_kind": edge.get("kind", "related_to")
},
priority="background",
profile_name=profile_name
)
# Bereinigung zur Sicherstellung der Interpretierbarkeit (Mistral/Qwen Safe)
response = clean_llm_text(raw_response)
# Semantische Prüfung des Ergebnisses
is_valid = "YES" in response.upper()
if is_valid:
logger.info(f"✅ [VALIDATED] Relation to '{target_id}' confirmed.")
else:
logger.info(f"🚫 [REJECTED] Relation to '{target_id}' irrelevant for this chunk.")
return is_valid
except Exception as e:
error_str = str(e).lower()
error_type = type(e).__name__
# WP-25b: Differenzierung zwischen transienten und permanenten Fehlern
# Transiente Fehler (Netzwerk) → erlauben (Integrität vor Präzision)
if any(x in error_str for x in ["timeout", "connection", "network", "unreachable", "refused"]):
logger.warning(f"⚠️ Transient error for {target_id}: {error_type} - {e}. Allowing edge.")
return True
# Permanente Fehler → ablehnen (Graph-Qualität schützen)
logger.error(f"❌ Permanent validation error for {target_id}: {error_type} - {e}")
return False
async def validate_and_symmetrize(
chunk_text: str,
edge: Dict,
source_id: str,
batch_cache: Dict[str, NoteContext],
llm_service: Any,
profile_name: str = "ingest_validator"
) -> List[Dict]:
"""
WP-24c: Erweitertes Validierungs-Gateway.
Prüft die Primärkante und erzeugt bei Erfolg automatisch die inverse Kante.
Returns:
List[Dict]: Eine Liste mit 0, 1 (nur Primär) oder 2 (Primär + Invers) Kanten.
"""
# 1. Semantische Prüfung der Primärkante (A -> B)
is_valid = await validate_edge_candidate(
chunk_text=chunk_text,
edge=edge,
batch_cache=batch_cache,
llm_service=llm_service,
profile_name=profile_name
)
if not is_valid:
return []
validated_edges = [edge]
# 2. WP-24c: Symmetrie-Generierung (B -> A)
# Wir laden den inversen Typ dynamisch aus der EdgeRegistry (Single Source of Truth)
original_kind = edge.get("kind", "related_to")
inverse_kind = edge_registry.get_inverse(original_kind)
# Wir erzeugen eine inverse Kante nur, wenn ein sinnvoller inverser Typ existiert
# und das Ziel der Primärkante (to) valide ist.
target_id = edge.get("to")
if target_id and source_id:
# Die inverse Kante zeigt vom Ziel der Primärkante zurück zur Quelle.
# Sie wird als 'virtual' markiert, um sie im Retrieval/UI identifizierbar zu machen.
inverse_edge = {
"to": source_id,
"kind": inverse_kind,
"provenance": "structure", # System-generiert, geschützt durch Firewall
"confidence": edge.get("confidence", 0.9) * 0.9, # Leichte Dämpfung für virtuelle Pfade
"virtual": True,
"note_id": target_id, # Die Note, von der die inverse Kante ausgeht
"rule_id": f"symmetry:{original_kind}"
}
# Wir fügen die Symmetrie nur hinzu, wenn sie einen echten Mehrwert bietet
# (Vermeidung von redundanten related_to -> related_to Loops)
if inverse_kind != original_kind or original_kind not in ["related_to", "references"]:
validated_edges.append(inverse_edge)
logger.info(f"🔄 [SYMMETRY] Generated inverse edge: '{target_id}' --({inverse_kind})--> '{source_id}'")
return validated_edges