mitai-jinkendo/backend/normalization_engine.py
Lars 1f8791f4dd
All checks were successful
Deploy Development / deploy (push) Successful in 44s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 14s
feat: Phase 2 - Normalisierung + Workflow Executor
Backend:
- normalization_engine.py (200 Zeilen): Synonym-Mapping, 5 Statuswerte
  * normalize_decision_signal(): Kaskade (exact → case → synonym → invalid)
  * apply_synonym_mapping(): DB-basierte Synonyme (case-insensitive)
  * normalize_all_signals(): Batch-Processing gegen Katalog
  * load_question_catalog(): Lädt normalization_rules aus DB
- workflow_executor.py (440 Zeilen): Sequenzielle Workflow-Ausführung
  * execute_workflow(): Traversiert DAG in topologischer Reihenfolge
  * execute_node(): Führt analysis nodes aus (start/end = no-op)
  * aggregate_results(): Kombiniert analysis_core + normalized_signals
  * save_execution_state(): Persistiert in workflow_executions
- workflow_models.py: Erweitert um Phase 2 Models
  * SignalStatus Enum (valid, normalized, unclear, invalid, not_decidable)
  * NormalizedSignal (question_type, raw_value, normalized_value, status)
  * NodeExecutionState (node_id, status, analysis_core, normalized_signals)
  * ExecutionResult (execution_id, workflow_id, status, node_states, aggregated_result)
- workflow_engine.py: Neue Funktion get_execution_order()
  * Flattened topological sort für sequenzielle Execution
  * Phase 7: Wird zu levels (parallele Execution)
- prompt_executor.py: execute_workflow_prompt() Implementierung
  * Ruft workflow_executor.execute_workflow() auf
  * Konvertiert ExecutionResult zu API-Response
- routers/workflows.py (230 Zeilen): Workflow Execution API
  * POST /api/workflows/{id}/execute (mit enable_debug)
  * GET /api/workflows/executions/{id} (lädt gespeicherten State)
  * GET /api/workflows (listet alle aktiven Workflows)
  * GET /api/workflows/{id} (lädt einzelnen Workflow mit Graph)
- main.py: Router-Registrierung (workflows.router)

Tests:
- test_phase2_normalization.py (17 Tests): Alle Normalisierungs-Szenarien
  * Exact match, case-insensitive, synonym mapping, invalid, whitespace
  * Batch-Normalisierung, not_in_catalog, mixed validity
- test_phase2_workflow_executor.py (10 Tests): Executor + Aggregation
  * aggregate_results mit verschiedenen Konstellationen
  * execute_node für start/end/analysis/unknown
  * Integration mit question_augmenter + result_container_parser

Alle 27 Unit-Tests bestanden.

version: 0.9k (backend)
module:  workflow 0.3.0

Konzept: .claude/task/Workflow_engine_prompting_engine/anforderungsanalyse_umsetzungsplan.md (Phase 2)
2026-04-03 21:20:23 +02:00

238 lines
7.7 KiB
Python

"""
Normalization Engine (Phase 2)
Normalisiert decision_signals gegen answer_spectrum mit Synonymen.
Konzept-Basis: konzept_workflow_engine_konsolidated.md (Sektion 8.5)
Anforderungsanalyse: anforderungsanalyse_umsetzungsplan.md (Phase 2)
"""
from typing import Dict, List, Optional
from workflow_models import NormalizedSignal, SignalStatus
import logging
logger = logging.getLogger(__name__)
def normalize_decision_signal(
question_type: str,
raw_value: str,
answer_spectrum: List[str],
normalization_rules: Optional[Dict] = None
) -> NormalizedSignal:
"""
Normalisiert ein einzelnes Entscheidungssignal.
Normalisierungs-Kaskade:
1. Exakte Übereinstimmung → valid
2. Case-insensitive Übereinstimmung → normalized
3. Synonym-Mapping (aus normalization_rules) → normalized
4. Keine Übereinstimmung → invalid
Args:
question_type: Typ der Frage (z.B. "relevanz")
raw_value: Rohe LLM-Antwort (z.B. "JA", "yes", "ja")
answer_spectrum: Erlaubte Werte (z.B. ["ja", "nein", "unklar"])
normalization_rules: Optional: {"synonyms": {"ja": ["yes", "Ja", "JA"], ...}}
Returns:
NormalizedSignal mit status + normalized_value
Beispiele:
>>> normalize_decision_signal("relevanz", "ja", ["ja", "nein"])
NormalizedSignal(status=VALID, normalized_value="ja")
>>> normalize_decision_signal("relevanz", "JA", ["ja", "nein"])
NormalizedSignal(status=NORMALIZED, normalized_value="ja")
>>> normalize_decision_signal("relevanz", "yes", ["ja", "nein"],
... {"synonyms": {"ja": ["yes", "Yes"]}})
NormalizedSignal(status=NORMALIZED, normalized_value="ja")
>>> normalize_decision_signal("relevanz", "vielleicht", ["ja", "nein"])
NormalizedSignal(status=INVALID, normalized_value=None)
"""
# 1. Exakte Übereinstimmung
if raw_value in answer_spectrum:
logger.debug(f"{question_type}: '{raw_value}' → valid (exact match)")
return NormalizedSignal(
question_type=question_type,
raw_value=raw_value,
normalized_value=raw_value,
status=SignalStatus.VALID
)
# 2. Case-insensitive Matching
raw_lower = raw_value.strip().lower()
for allowed in answer_spectrum:
if raw_lower == allowed.lower():
logger.debug(f"{question_type}: '{raw_value}' → normalized (case-insensitive)")
return NormalizedSignal(
question_type=question_type,
raw_value=raw_value,
normalized_value=allowed,
status=SignalStatus.NORMALIZED,
metadata={"method": "case_insensitive"}
)
# 3. Synonym Mapping
if normalization_rules and "synonyms" in normalization_rules:
normalized = apply_synonym_mapping(
raw_value=raw_value,
synonyms=normalization_rules["synonyms"]
)
if normalized:
logger.debug(f"{question_type}: '{raw_value}' → normalized (synonym → '{normalized}')")
return NormalizedSignal(
question_type=question_type,
raw_value=raw_value,
normalized_value=normalized,
status=SignalStatus.NORMALIZED,
metadata={"method": "synonym"}
)
# 4. Keine Übereinstimmung
logger.warning(f"{question_type}: '{raw_value}' → invalid (no match in spectrum {answer_spectrum})")
return NormalizedSignal(
question_type=question_type,
raw_value=raw_value,
normalized_value=None,
status=SignalStatus.INVALID
)
def apply_synonym_mapping(
raw_value: str,
synonyms: Dict[str, List[str]]
) -> Optional[str]:
"""
Mappt raw_value auf Synonym-Gruppe (case-insensitive).
Args:
raw_value: "yes" oder "YES" oder "Yes"
synonyms: {"ja": ["yes", "Ja", "JA"], "nein": ["no", "No"]}
Returns:
"ja" (Schlüssel der Gruppe) oder None
Beispiele:
>>> apply_synonym_mapping("yes", {"ja": ["yes", "Yes"], "nein": ["no"]})
"ja"
>>> apply_synonym_mapping("YES", {"ja": ["yes"], "nein": ["no"]})
"ja"
>>> apply_synonym_mapping("vielleicht", {"ja": ["yes"], "nein": ["no"]})
None
"""
raw_lower = raw_value.strip().lower()
for canonical_value, synonym_list in synonyms.items():
# Check case-insensitive gegen alle Synonyme
for syn in synonym_list:
if raw_lower == syn.lower():
return canonical_value
return None
def normalize_all_signals(
decision_signals: Dict[str, str],
catalog_dict: Dict[str, Dict]
) -> List[NormalizedSignal]:
"""
Normalisiert alle decision_signals gegen Katalog.
Args:
decision_signals: {"relevanz": "ja", "prioritaet": "HOCH"}
catalog_dict: {
"relevanz": {
"answer_spectrum": ["ja", "nein", "unklar"],
"normalization_rules": {"synonyms": {...}}
},
...
}
Returns:
Liste von NormalizedSignal (ein Signal pro question_type)
Beispiele:
>>> signals = {"relevanz": "ja", "prioritaet": "HOCH"}
>>> catalog = {
... "relevanz": {"answer_spectrum": ["ja", "nein"], "normalization_rules": None},
... "prioritaet": {"answer_spectrum": ["hoch", "mittel", "niedrig"], "normalization_rules": None}
... }
>>> normalized = normalize_all_signals(signals, catalog)
>>> len(normalized)
2
>>> normalized[0].status
<SignalStatus.VALID: 'valid'>
>>> normalized[1].status
<SignalStatus.NORMALIZED: 'normalized'>
"""
normalized = []
for question_type, raw_value in decision_signals.items():
if question_type not in catalog_dict:
logger.warning(f"Question type '{question_type}' not in catalog → not_decidable")
normalized.append(NormalizedSignal(
question_type=question_type,
raw_value=raw_value,
normalized_value=None,
status=SignalStatus.NOT_DECIDABLE,
metadata={"error": "not_in_catalog"}
))
continue
catalog_entry = catalog_dict[question_type]
signal = normalize_decision_signal(
question_type=question_type,
raw_value=raw_value,
answer_spectrum=catalog_entry["answer_spectrum"],
normalization_rules=catalog_entry.get("normalization_rules")
)
normalized.append(signal)
return normalized
def load_question_catalog(db_connection) -> Dict[str, Dict]:
"""
Lädt workflow_question_catalog aus DB.
Returns:
{
"relevanz": {
"answer_spectrum": ["ja", "nein", "unklar"],
"normalization_rules": {"synonyms": {"ja": ["yes", "Yes"], ...}}
},
"prioritaet": {...},
...
}
Beispiel:
>>> from db import get_db
>>> with get_db() as conn:
... catalog = load_question_catalog(conn)
... assert "relevanz" in catalog
... assert "answer_spectrum" in catalog["relevanz"]
"""
from db import get_cursor
cur = get_cursor(db_connection)
cur.execute("""
SELECT question_type, answer_spectrum, normalization_rules
FROM workflow_question_catalog
WHERE active = true
""")
rows = cur.fetchall()
catalog = {}
for row in rows:
catalog[row[0]] = {
"answer_spectrum": row[1], # JSONB already parsed by psycopg2
"normalization_rules": row[2] # JSONB or None
}
logger.info(f"Loaded question catalog: {len(catalog)} types")
return catalog