238 lines
7.7 KiB
Python
238 lines
7.7 KiB
Python
"""
|
|
Normalization Engine (Phase 2)
|
|
|
|
Normalisiert decision_signals gegen answer_spectrum mit Synonymen.
|
|
|
|
Konzept-Basis: konzept_workflow_engine_konsolidated.md (Sektion 8.5)
|
|
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Phase 2)
|
|
"""
|
|
from typing import Dict, List, Optional
|
|
from workflow_models import NormalizedSignal, SignalStatus
|
|
import logging
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def normalize_decision_signal(
    question_type: str,
    raw_value: str,
    answer_spectrum: List[str],
    normalization_rules: Optional[Dict] = None
) -> NormalizedSignal:
    """
    Normalize a single decision signal against its allowed answers.

    Normalization cascade (first hit wins):
        1. Exact match against answer_spectrum           -> VALID
        2. Caseless match (surrounding whitespace
           of raw_value ignored)                         -> NORMALIZED
        3. Synonym mapping from normalization_rules      -> NORMALIZED
        4. No match                                      -> INVALID

    Steps 2 and 3 only apply to string values. A non-string raw_value
    (e.g. None or a bool coming out of parsed LLM JSON) that is not an
    exact member of answer_spectrum is reported as INVALID instead of
    raising AttributeError.

    Args:
        question_type: Type of the question (e.g. "relevanz").
        raw_value: Raw LLM answer (e.g. "JA", "yes", "ja").
        answer_spectrum: Allowed values (e.g. ["ja", "nein", "unklar"]).
        normalization_rules: Optional rules of the form
            {"synonyms": {"ja": ["yes", "Ja", "JA"], ...}}.

    Returns:
        NormalizedSignal carrying question_type, raw_value, the resulting
        status and normalized_value (None when INVALID). NORMALIZED results
        additionally carry metadata {"method": "case_insensitive"} or
        {"method": "synonym"}.

    Examples (illustrative, not literal reprs):
        normalize_decision_signal("relevanz", "ja", ["ja", "nein"])
            -> status=VALID, normalized_value="ja"
        normalize_decision_signal("relevanz", "JA", ["ja", "nein"])
            -> status=NORMALIZED, normalized_value="ja"
        normalize_decision_signal("relevanz", "yes", ["ja", "nein"],
                                  {"synonyms": {"ja": ["yes", "Yes"]}})
            -> status=NORMALIZED, normalized_value="ja"
        normalize_decision_signal("relevanz", "vielleicht", ["ja", "nein"])
            -> status=INVALID, normalized_value=None
    """
    # 1. Exact match
    if raw_value in answer_spectrum:
        logger.debug("%s: '%s' → valid (exact match)", question_type, raw_value)
        return NormalizedSignal(
            question_type=question_type,
            raw_value=raw_value,
            normalized_value=raw_value,
            status=SignalStatus.VALID
        )

    # Steps 2 and 3 are text operations; anything that is not a string
    # falls straight through to step 4.
    if isinstance(raw_value, str):
        # casefold() rather than lower(): caseless-equal spellings such as
        # "GROSS" / "groß" must compare equal.
        raw_folded = raw_value.strip().casefold()

        # 2. Caseless match
        for allowed in answer_spectrum:
            if isinstance(allowed, str) and raw_folded == allowed.casefold():
                logger.debug(
                    "%s: '%s' → normalized (case-insensitive)",
                    question_type, raw_value
                )
                return NormalizedSignal(
                    question_type=question_type,
                    raw_value=raw_value,
                    normalized_value=allowed,
                    status=SignalStatus.NORMALIZED,
                    metadata={"method": "case_insensitive"}
                )

        # 3. Synonym mapping
        # NOTE(review): the canonical key returned by the synonym table is
        # not checked against answer_spectrum here - presumably the catalog
        # guarantees that synonym keys are spectrum members; confirm.
        if normalization_rules and "synonyms" in normalization_rules:
            normalized = apply_synonym_mapping(
                raw_value=raw_value,
                synonyms=normalization_rules["synonyms"]
            )
            if normalized:
                logger.debug(
                    "%s: '%s' → normalized (synonym → '%s')",
                    question_type, raw_value, normalized
                )
                return NormalizedSignal(
                    question_type=question_type,
                    raw_value=raw_value,
                    normalized_value=normalized,
                    status=SignalStatus.NORMALIZED,
                    metadata={"method": "synonym"}
                )

    # 4. No match
    logger.warning(
        "%s: '%s' → invalid (no match in spectrum %s)",
        question_type, raw_value, answer_spectrum
    )
    return NormalizedSignal(
        question_type=question_type,
        raw_value=raw_value,
        normalized_value=None,
        status=SignalStatus.INVALID
    )
|
|
|
|
|
|
def apply_synonym_mapping(
    raw_value: str,
    synonyms: Dict[str, List[str]]
) -> Optional[str]:
    """
    Map a raw value onto the canonical key of its synonym group.

    Comparison ignores surrounding whitespace (on both the raw value and
    each synonym) and case. ``str.casefold`` is used instead of
    ``str.lower`` so that caseless-equal spellings such as "GROSS" and
    "groß" match.

    Args:
        raw_value: Raw answer, e.g. "yes", "YES" or " Yes ".
        synonyms: Canonical value -> accepted synonyms, e.g.
            {"ja": ["yes", "Ja", "JA"], "nein": ["no", "No"]}.

    Returns:
        The key of the first group (in dict iteration order) that contains
        a matching synonym. None if nothing matches, if raw_value is not a
        string, or if synonyms is empty/None. Groups whose synonym list is
        None and synonym entries that are not strings are skipped.

    Examples:
        >>> apply_synonym_mapping("yes", {"ja": ["yes", "Yes"], "nein": ["no"]})
        'ja'
        >>> apply_synonym_mapping("YES", {"ja": ["yes"], "nein": ["no"]})
        'ja'
        >>> apply_synonym_mapping("vielleicht", {"ja": ["yes"], "nein": ["no"]}) is None
        True
    """
    # Rules may come from parsed JSON, so be tolerant of non-text input
    # instead of failing with AttributeError on .strip()/.casefold().
    if not isinstance(raw_value, str) or not synonyms:
        return None

    needle = raw_value.strip().casefold()

    for canonical_value, synonym_list in synonyms.items():
        for syn in synonym_list or ():
            if isinstance(syn, str) and syn.strip().casefold() == needle:
                return canonical_value

    return None
|
|
|
|
|
|
def normalize_all_signals(
    decision_signals: Dict[str, str],
    catalog_dict: Dict[str, Dict]
) -> List[NormalizedSignal]:
    """
    Normalize every entry of decision_signals against the question catalog.

    Each question type found in catalog_dict is delegated to
    normalize_decision_signal together with that entry's
    "answer_spectrum" and (optional) "normalization_rules". Question types
    missing from the catalog are not normalized; they yield a
    NOT_DECIDABLE signal with metadata {"error": "not_in_catalog"}.

    Args:
        decision_signals: Question type -> raw answer, e.g.
            {"relevanz": "ja", "prioritaet": "HOCH"}.
        catalog_dict: Question type -> catalog entry, e.g.
            {
                "relevanz": {
                    "answer_spectrum": ["ja", "nein", "unklar"],
                    "normalization_rules": {"synonyms": {...}}
                },
                ...
            }

    Returns:
        One NormalizedSignal per entry of decision_signals, in the
        iteration order of decision_signals.

    Example:
        With signals {"relevanz": "ja", "prioritaet": "HOCH"} and a catalog
        whose spectra are ["ja", "nein"] and ["hoch", "mittel", "niedrig"],
        the result has two entries: the first VALID (exact match), the
        second NORMALIZED (case-insensitive match).
    """
    results = []

    for question_type, answer in decision_signals.items():
        if question_type in catalog_dict:
            entry = catalog_dict[question_type]
            results.append(
                normalize_decision_signal(
                    question_type=question_type,
                    raw_value=answer,
                    answer_spectrum=entry["answer_spectrum"],
                    normalization_rules=entry.get("normalization_rules"),
                )
            )
        else:
            # Unknown question type: nothing to validate against.
            logger.warning(f"Question type '{question_type}' not in catalog → not_decidable")
            results.append(
                NormalizedSignal(
                    question_type=question_type,
                    raw_value=answer,
                    normalized_value=None,
                    status=SignalStatus.NOT_DECIDABLE,
                    metadata={"error": "not_in_catalog"},
                )
            )

    return results
|
|
|
|
|
|
def load_question_catalog(db_connection) -> Dict[str, Dict]:
    """
    Load the active rows of workflow_question_catalog from the database.

    Args:
        db_connection: Connection handed to ``db.get_cursor`` to obtain a
            cursor. Rows are read by column name, so the cursor is assumed
            to return mapping-like rows - TODO confirm against db.get_cursor.

    Returns:
        Mapping of question_type to its catalog entry:
            {
                "relevanz": {
                    "answer_spectrum": ["ja", "nein", "unklar"],
                    "normalization_rules": {"synonyms": {"ja": ["yes", "Yes"], ...}}
                },
                "prioritaet": {...},
                ...
            }
        Per the original author's note, the JSONB columns arrive already
        parsed by psycopg2; normalization_rules may be None.

    Example (requires a live database):
        from db import get_db
        with get_db() as conn:
            catalog = load_question_catalog(conn)
            assert "relevanz" in catalog
            assert "answer_spectrum" in catalog["relevanz"]
    """
    from db import get_cursor

    query = """
        SELECT question_type, answer_spectrum, normalization_rules
        FROM workflow_question_catalog
        WHERE active = true
    """

    cursor = get_cursor(db_connection)
    cursor.execute(query)

    catalog = {
        record['question_type']: {
            "answer_spectrum": record['answer_spectrum'],
            "normalization_rules": record['normalization_rules'],
        }
        for record in cursor.fetchall()
    }

    logger.info(f"Loaded question catalog: {len(catalog)} types")
    return catalog
|