mitai-jinkendo/backend/normalization_engine.py
Lars ac2e7cf5bb
All checks were successful
Deploy Development / deploy (push) Successful in 50s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 15s
fix: Use dict keys for all RealDictCursor row access in Phase 2 code
2026-04-03 21:36:44 +02:00

238 lines
7.7 KiB
Python

"""
Normalization Engine (Phase 2)
Normalisiert decision_signals gegen answer_spectrum mit Synonymen.
Konzept-Basis: konzept_workflow_engine_konsolidated.md (Sektion 8.5)
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Phase 2)
"""
from typing import Dict, List, Optional
from workflow_models import NormalizedSignal, SignalStatus
import logging
logger = logging.getLogger(__name__)
def normalize_decision_signal(
question_type: str,
raw_value: str,
answer_spectrum: List[str],
normalization_rules: Optional[Dict] = None
) -> NormalizedSignal:
"""
Normalisiert ein einzelnes Entscheidungssignal.
Normalisierungs-Kaskade:
1. Exakte Übereinstimmung → valid
2. Case-insensitive Übereinstimmung → normalized
3. Synonym-Mapping (aus normalization_rules) → normalized
4. Keine Übereinstimmung → invalid
Args:
question_type: Typ der Frage (z.B. "relevanz")
raw_value: Rohe LLM-Antwort (z.B. "JA", "yes", "ja")
answer_spectrum: Erlaubte Werte (z.B. ["ja", "nein", "unklar"])
normalization_rules: Optional: {"synonyms": {"ja": ["yes", "Ja", "JA"], ...}}
Returns:
NormalizedSignal mit status + normalized_value
Beispiele:
>>> normalize_decision_signal("relevanz", "ja", ["ja", "nein"])
NormalizedSignal(status=VALID, normalized_value="ja")
>>> normalize_decision_signal("relevanz", "JA", ["ja", "nein"])
NormalizedSignal(status=NORMALIZED, normalized_value="ja")
>>> normalize_decision_signal("relevanz", "yes", ["ja", "nein"],
... {"synonyms": {"ja": ["yes", "Yes"]}})
NormalizedSignal(status=NORMALIZED, normalized_value="ja")
>>> normalize_decision_signal("relevanz", "vielleicht", ["ja", "nein"])
NormalizedSignal(status=INVALID, normalized_value=None)
"""
# 1. Exakte Übereinstimmung
if raw_value in answer_spectrum:
logger.debug(f"{question_type}: '{raw_value}' → valid (exact match)")
return NormalizedSignal(
question_type=question_type,
raw_value=raw_value,
normalized_value=raw_value,
status=SignalStatus.VALID
)
# 2. Case-insensitive Matching
raw_lower = raw_value.strip().lower()
for allowed in answer_spectrum:
if raw_lower == allowed.lower():
logger.debug(f"{question_type}: '{raw_value}' → normalized (case-insensitive)")
return NormalizedSignal(
question_type=question_type,
raw_value=raw_value,
normalized_value=allowed,
status=SignalStatus.NORMALIZED,
metadata={"method": "case_insensitive"}
)
# 3. Synonym Mapping
if normalization_rules and "synonyms" in normalization_rules:
normalized = apply_synonym_mapping(
raw_value=raw_value,
synonyms=normalization_rules["synonyms"]
)
if normalized:
logger.debug(f"{question_type}: '{raw_value}' → normalized (synonym → '{normalized}')")
return NormalizedSignal(
question_type=question_type,
raw_value=raw_value,
normalized_value=normalized,
status=SignalStatus.NORMALIZED,
metadata={"method": "synonym"}
)
# 4. Keine Übereinstimmung
logger.warning(f"{question_type}: '{raw_value}' → invalid (no match in spectrum {answer_spectrum})")
return NormalizedSignal(
question_type=question_type,
raw_value=raw_value,
normalized_value=None,
status=SignalStatus.INVALID
)
def apply_synonym_mapping(
raw_value: str,
synonyms: Dict[str, List[str]]
) -> Optional[str]:
"""
Mappt raw_value auf Synonym-Gruppe (case-insensitive).
Args:
raw_value: "yes" oder "YES" oder "Yes"
synonyms: {"ja": ["yes", "Ja", "JA"], "nein": ["no", "No"]}
Returns:
"ja" (Schlüssel der Gruppe) oder None
Beispiele:
>>> apply_synonym_mapping("yes", {"ja": ["yes", "Yes"], "nein": ["no"]})
"ja"
>>> apply_synonym_mapping("YES", {"ja": ["yes"], "nein": ["no"]})
"ja"
>>> apply_synonym_mapping("vielleicht", {"ja": ["yes"], "nein": ["no"]})
None
"""
raw_lower = raw_value.strip().lower()
for canonical_value, synonym_list in synonyms.items():
# Check case-insensitive gegen alle Synonyme
for syn in synonym_list:
if raw_lower == syn.lower():
return canonical_value
return None
def normalize_all_signals(
decision_signals: Dict[str, str],
catalog_dict: Dict[str, Dict]
) -> List[NormalizedSignal]:
"""
Normalisiert alle decision_signals gegen Katalog.
Args:
decision_signals: {"relevanz": "ja", "prioritaet": "HOCH"}
catalog_dict: {
"relevanz": {
"answer_spectrum": ["ja", "nein", "unklar"],
"normalization_rules": {"synonyms": {...}}
},
...
}
Returns:
Liste von NormalizedSignal (ein Signal pro question_type)
Beispiele:
>>> signals = {"relevanz": "ja", "prioritaet": "HOCH"}
>>> catalog = {
... "relevanz": {"answer_spectrum": ["ja", "nein"], "normalization_rules": None},
... "prioritaet": {"answer_spectrum": ["hoch", "mittel", "niedrig"], "normalization_rules": None}
... }
>>> normalized = normalize_all_signals(signals, catalog)
>>> len(normalized)
2
>>> normalized[0].status
<SignalStatus.VALID: 'valid'>
>>> normalized[1].status
<SignalStatus.NORMALIZED: 'normalized'>
"""
normalized = []
for question_type, raw_value in decision_signals.items():
if question_type not in catalog_dict:
logger.warning(f"Question type '{question_type}' not in catalog → not_decidable")
normalized.append(NormalizedSignal(
question_type=question_type,
raw_value=raw_value,
normalized_value=None,
status=SignalStatus.NOT_DECIDABLE,
metadata={"error": "not_in_catalog"}
))
continue
catalog_entry = catalog_dict[question_type]
signal = normalize_decision_signal(
question_type=question_type,
raw_value=raw_value,
answer_spectrum=catalog_entry["answer_spectrum"],
normalization_rules=catalog_entry.get("normalization_rules")
)
normalized.append(signal)
return normalized
def load_question_catalog(db_connection) -> Dict[str, Dict]:
"""
Lädt workflow_question_catalog aus DB.
Returns:
{
"relevanz": {
"answer_spectrum": ["ja", "nein", "unklar"],
"normalization_rules": {"synonyms": {"ja": ["yes", "Yes"], ...}}
},
"prioritaet": {...},
...
}
Beispiel:
>>> from db import get_db
>>> with get_db() as conn:
... catalog = load_question_catalog(conn)
... assert "relevanz" in catalog
... assert "answer_spectrum" in catalog["relevanz"]
"""
from db import get_cursor
cur = get_cursor(db_connection)
cur.execute("""
SELECT question_type, answer_spectrum, normalization_rules
FROM workflow_question_catalog
WHERE active = true
""")
rows = cur.fetchall()
catalog = {}
for row in rows:
catalog[row['question_type']] = {
"answer_spectrum": row['answer_spectrum'], # JSONB already parsed by psycopg2
"normalization_rules": row['normalization_rules'] # JSONB or None
}
logger.info(f"Loaded question catalog: {len(catalog)} types")
return catalog