mitai-jinkendo/backend/placeholder_metadata_enhanced.py
Lars 650313347f
All checks were successful
Deploy Development / deploy (push) Successful in 54s
Build Test / lint-backend (push) Successful in 1s
Build Test / build-frontend (push) Successful in 15s
feat: Placeholder Metadata V2 - Normative Implementation + ZIP Export Fix
MAJOR CHANGES:
- Enhanced metadata schema with 7 QA fields
- Deterministic derivation logic (no guessing)
- Conservative inference (prefer unknown over wrong)
- Real source tracking (skip safe wrappers)
- Legacy mismatch detection
- Activity quality filter policies
- Completeness scoring (0-100)
- Unresolved fields tracking
- Fixed ZIP/JSON export auth (query param support)

FILES CHANGED:
- backend/placeholder_metadata.py (schema extended)
- backend/placeholder_metadata_enhanced.py (NEW, 418 lines)
- backend/generate_complete_metadata_v2.py (NEW, 334 lines)
- backend/tests/test_placeholder_metadata_v2.py (NEW, 302 lines)
- backend/routers/prompts.py (V2 integration + auth fix)
- docs/PLACEHOLDER_METADATA_VALIDATION.md (NEW, 541 lines)

PROBLEMS FIXED:
✓ value_raw extraction (type-aware, JSON parsing)
✓ Units for dimensionless values (scores, correlations)
✓ Safe wrappers as sources (now skipped)
✓ Time window guessing (confidence flags)
✓ Legacy inconsistencies (marked with flag)
✓ Missing quality filters (activity placeholders)
✓ No completeness metric (0-100 score)
✓ Orphaned placeholders (tracked)
✓ Unresolved fields (explicit list)
✓ ZIP/JSON export auth (query token support for downloads)

AUTH FIX:
- export-catalog-zip now accepts token via query param (?token=xxx)
- export-values-extended now accepts token via query param
- Allows browser downloads without custom headers

Concept: docs/PLACEHOLDER_METADATA_REQUIREMENTS_V2_NORMATIVE.md

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 21:23:37 +02:00

418 lines
15 KiB
Python

"""
Enhanced Placeholder Metadata Extraction
Improved extraction logic that addresses quality issues:
1. Correct value_raw extraction
2. Accurate unit inference
3. Precise time_window detection
4. Real source provenance
5. Quality filter policies for activity placeholders
"""
import re
import json
from typing import Any, Optional, Tuple, Dict
from placeholder_metadata import (
PlaceholderType,
TimeWindow,
OutputType,
QualityFilterPolicy,
ConfidenceLogic,
ConfidenceLevel
)
# ── Enhanced Value Raw Extraction ─────────────────────────────────────────────
def extract_value_raw(value_display: str, output_type: OutputType, placeholder_type: PlaceholderType) -> Tuple[Any, bool]:
"""
Extract raw value from display string.
Returns: (raw_value, success)
"""
if not value_display or value_display in ['nicht verfügbar', 'nicht genug Daten']:
return None, True
# JSON output type
if output_type == OutputType.JSON:
try:
return json.loads(value_display), True
except (json.JSONDecodeError, TypeError):
# Try to find JSON in string
json_match = re.search(r'(\{.*\}|\[.*\])', value_display, re.DOTALL)
if json_match:
try:
return json.loads(json_match.group(1)), True
except:
pass
return None, False
# Markdown output type
if output_type == OutputType.MARKDOWN:
return value_display, True
# Number types
if output_type in [OutputType.NUMBER, OutputType.INTEGER]:
# Extract first number from string
match = re.search(r'([-+]?\d+\.?\d*)', value_display)
if match:
val = float(match.group(1))
return int(val) if output_type == OutputType.INTEGER else val, True
return None, False
# Date
if output_type == OutputType.DATE:
# Check if already ISO format
if re.match(r'\d{4}-\d{2}-\d{2}', value_display):
return value_display, True
return value_display, False # Unknown format
# String/Enum - return as-is
return value_display, True
# ── Enhanced Unit Inference ───────────────────────────────────────────────────
def infer_unit_strict(key: str, description: str, output_type: OutputType, placeholder_type: PlaceholderType) -> Optional[str]:
"""
Strict unit inference - only return unit if certain.
NO units for:
- Scores (dimensionless)
- Correlations (dimensionless)
- Percentages expressed as 0-100 scale
- Classifications/enums
- JSON/Markdown outputs
"""
key_lower = key.lower()
desc_lower = description.lower()
# JSON/Markdown never have units
if output_type in [OutputType.JSON, OutputType.MARKDOWN, OutputType.ENUM]:
return None
# Scores are dimensionless (0-100 scale)
if 'score' in key_lower or 'adequacy' in key_lower:
return None
# Correlations are dimensionless
if 'correlation' in key_lower:
return None
# Ratios/percentages on 0-100 scale
if any(x in key_lower for x in ['pct', 'ratio', 'balance', 'compliance', 'consistency']):
return None
# Classifications/quadrants
if 'quadrant' in key_lower or 'classification' in key_lower:
return None
# Weight/mass
if any(x in key_lower for x in ['weight', 'gewicht', 'fm_', 'lbm_', 'masse']):
return 'kg'
# Circumferences/lengths
if any(x in key_lower for x in ['umfang', 'waist', 'hip', 'chest', 'arm', 'leg', 'delta']) and 'circumference' in desc_lower:
return 'cm'
# Time durations
if any(x in key_lower for x in ['duration', 'dauer', 'debt']):
if 'hours' in desc_lower or 'stunden' in desc_lower:
return 'Stunden'
elif 'minutes' in desc_lower or 'minuten' in desc_lower:
return 'Minuten'
return None # Unclear
# Heart rate
if 'rhr' in key_lower or ('hr' in key_lower and 'hrv' not in key_lower) or 'puls' in key_lower:
return 'bpm'
# HRV
if 'hrv' in key_lower:
return 'ms'
# VO2 Max
if 'vo2' in key_lower:
return 'ml/kg/min'
# Calories/energy
if 'kcal' in key_lower or 'energy' in key_lower or 'energie' in key_lower:
return 'kcal'
# Macros (protein, carbs, fat)
if any(x in key_lower for x in ['protein', 'carb', 'fat', 'kohlenhydrat', 'fett']) and 'g' in desc_lower:
return 'g'
# Height
if 'height' in key_lower or 'größe' in key_lower:
return 'cm'
# Age
if 'age' in key_lower or 'alter' in key_lower:
return 'Jahre'
# BMI is dimensionless
if 'bmi' in key_lower:
return None
# Default: No unit (conservative)
return None
# ── Enhanced Time Window Detection ────────────────────────────────────────────
def detect_time_window_precise(
key: str,
description: str,
resolver_name: str,
semantic_contract: str
) -> Tuple[TimeWindow, bool, Optional[str]]:
"""
Detect time window with precision.
Returns: (time_window, is_certain, mismatch_note)
"""
key_lower = key.lower()
desc_lower = description.lower()
contract_lower = semantic_contract.lower()
# Explicit suffixes (highest confidence)
if '_7d' in key_lower:
return TimeWindow.DAYS_7, True, None
if '_14d' in key_lower:
return TimeWindow.DAYS_14, True, None
if '_28d' in key_lower:
return TimeWindow.DAYS_28, True, None
if '_30d' in key_lower:
return TimeWindow.DAYS_30, True, None
if '_90d' in key_lower:
return TimeWindow.DAYS_90, True, None
if '_3d' in key_lower:
return TimeWindow.DAYS_7, True, None # Map 3d to closest standard
# Latest/current
if any(x in key_lower for x in ['aktuell', 'latest', 'current', 'letzter']):
return TimeWindow.LATEST, True, None
# Check semantic contract for time window info
if '7 tag' in contract_lower or '7d' in contract_lower:
# Check for description mismatch
mismatch = None
if '30' in desc_lower or '28' in desc_lower:
mismatch = f"Description says 30d/28d but implementation is 7d"
return TimeWindow.DAYS_7, True, mismatch
if '28 tag' in contract_lower or '28d' in contract_lower:
mismatch = None
if '7' in desc_lower and '28' not in desc_lower:
mismatch = f"Description says 7d but implementation is 28d"
return TimeWindow.DAYS_28, True, mismatch
if '30 tag' in contract_lower or '30d' in contract_lower:
return TimeWindow.DAYS_30, True, None
if '90 tag' in contract_lower or '90d' in contract_lower:
return TimeWindow.DAYS_90, True, None
# Check description patterns
if 'letzte 7' in desc_lower or '7 tag' in desc_lower:
return TimeWindow.DAYS_7, False, None
if 'letzte 30' in desc_lower or '30 tag' in desc_lower:
return TimeWindow.DAYS_30, False, None
# Averages typically 30d unless specified
if 'avg' in key_lower or 'durchschn' in key_lower:
if '7' in desc_lower:
return TimeWindow.DAYS_7, False, None
return TimeWindow.DAYS_30, False, "Assumed 30d for average (not explicit)"
# Trends typically 28d
if 'trend' in key_lower:
return TimeWindow.DAYS_28, False, "Assumed 28d for trend"
# Week-based
if 'week' in key_lower or 'woche' in key_lower:
return TimeWindow.DAYS_7, False, None
# Profile data is latest
if key_lower in ['name', 'age', 'height', 'geschlecht']:
return TimeWindow.LATEST, True, None
# Unknown
return TimeWindow.UNKNOWN, False, "Could not determine time window from code or documentation"
# ── Enhanced Source Provenance ────────────────────────────────────────────────
def resolve_real_source(resolver_name: str) -> Tuple[Optional[str], Optional[str], list, str]:
"""
Resolve real source function (not safe wrappers).
Returns: (function, data_layer_module, source_tables, source_kind)
"""
# Skip safe wrappers - they're not real sources
if resolver_name in ['_safe_int', '_safe_float', '_safe_json', '_safe_str']:
return None, None, [], "wrapper"
# Direct mappings to data layer
source_map = {
# Body metrics
'get_latest_weight': ('get_latest_weight_data', 'body_metrics', ['weight_log'], 'direct'),
'get_weight_trend': ('get_weight_trend_data', 'body_metrics', ['weight_log'], 'computed'),
'get_latest_bf': ('get_body_composition_data', 'body_metrics', ['caliper_log'], 'direct'),
'get_circ_summary': ('get_circumference_summary_data', 'body_metrics', ['circumference_log'], 'aggregated'),
'get_caliper_summary': ('get_body_composition_data', 'body_metrics', ['caliper_log'], 'aggregated'),
'calculate_bmi': (None, None, ['weight_log', 'profiles'], 'computed'),
# Nutrition
'get_nutrition_avg': ('get_nutrition_average_data', 'nutrition_metrics', ['nutrition_log'], 'aggregated'),
'get_protein_per_kg': ('get_protein_targets_data', 'nutrition_metrics', ['nutrition_log', 'weight_log'], 'computed'),
'get_nutrition_days': ('get_nutrition_days_data', 'nutrition_metrics', ['nutrition_log'], 'computed'),
# Activity
'get_activity_summary': ('get_activity_summary_data', 'activity_metrics', ['activity_log', 'training_types'], 'aggregated'),
'get_activity_detail': ('get_activity_detail_data', 'activity_metrics', ['activity_log', 'training_types'], 'aggregated'),
'get_training_type_dist': ('get_training_type_distribution_data', 'activity_metrics', ['activity_log', 'training_types'], 'aggregated'),
# Sleep
'get_sleep_duration': ('get_sleep_duration_data', 'recovery_metrics', ['sleep_log'], 'aggregated'),
'get_sleep_quality': ('get_sleep_quality_data', 'recovery_metrics', ['sleep_log'], 'computed'),
# Vitals
'get_resting_hr': ('get_resting_heart_rate_data', 'health_metrics', ['vitals_baseline'], 'direct'),
'get_hrv': ('get_heart_rate_variability_data', 'health_metrics', ['vitals_baseline'], 'direct'),
'get_vo2_max': ('get_vo2_max_data', 'health_metrics', ['vitals_baseline'], 'direct'),
# Profile
'get_profile_data': (None, None, ['profiles'], 'direct'),
'calculate_age': (None, None, ['profiles'], 'computed'),
# Goals
'get_goal_weight': (None, None, ['goals'], 'direct'),
'get_goal_bf_pct': (None, None, ['goals'], 'direct'),
}
if resolver_name in source_map:
return source_map[resolver_name]
# Goals formatting functions
if resolver_name.startswith('_format_goals'):
return (None, None, ['goals', 'goal_focus_contributions'], 'interpreted')
# Unknown
return None, None, [], "unknown"
# ── Quality Filter Policy for Activity Placeholders ───────────────────────────
def create_activity_quality_policy(key: str) -> Optional[QualityFilterPolicy]:
"""
Create quality filter policy for activity-related placeholders.
"""
key_lower = key.lower()
# Activity-related placeholders need quality policies
if any(x in key_lower for x in ['activity', 'training', 'load', 'volume', 'quality_session', 'ability']):
return QualityFilterPolicy(
enabled=True,
default_filter_level="quality",
null_quality_handling="exclude",
includes_poor=False,
includes_excluded=False,
notes="Activity metrics filter for quality='quality' by default. NULL quality_label excluded."
)
return None
# ── Confidence Logic Creation ─────────────────────────────────────────────────
def create_confidence_logic(key: str, data_layer_module: Optional[str]) -> Optional[ConfidenceLogic]:
"""
Create confidence logic if applicable.
"""
key_lower = key.lower()
# Data layer functions typically have confidence
if data_layer_module:
return ConfidenceLogic(
supported=True,
calculation="Based on data availability and quality thresholds",
thresholds={"min_data_points": 1},
notes=f"Confidence determined by {data_layer_module}"
)
# Scores have implicit confidence
if 'score' in key_lower:
return ConfidenceLogic(
supported=True,
calculation="Based on data completeness for score components",
notes="Score confidence correlates with input data availability"
)
# Correlations have confidence
if 'correlation' in key_lower:
return ConfidenceLogic(
supported=True,
calculation="Pearson correlation with significance testing",
thresholds={"min_data_points": 7},
notes="Requires minimum 7 data points for meaningful correlation"
)
return None
# ── Metadata Completeness Score ───────────────────────────────────────────────
def calculate_completeness_score(metadata_dict: Dict) -> int:
"""
Calculate metadata completeness score (0-100).
Checks:
- Required fields filled
- Time window not unknown
- Output type not unknown
- Unit specified (if applicable)
- Source provenance complete
- Quality/confidence policies (if applicable)
"""
score = 0
max_score = 100
# Required fields (30 points)
if metadata_dict.get('category') and metadata_dict['category'] != 'Unknown':
score += 5
if metadata_dict.get('description') and 'No description' not in metadata_dict['description']:
score += 5
if metadata_dict.get('semantic_contract'):
score += 10
if metadata_dict.get('source', {}).get('resolver') and metadata_dict['source']['resolver'] != 'unknown':
score += 10
# Type specification (20 points)
if metadata_dict.get('type') and metadata_dict['type'] != 'legacy_unknown':
score += 10
if metadata_dict.get('time_window') and metadata_dict['time_window'] != 'unknown':
score += 10
# Output specification (20 points)
if metadata_dict.get('output_type') and metadata_dict['output_type'] != 'unknown':
score += 10
if metadata_dict.get('format_hint'):
score += 10
# Source provenance (20 points)
source = metadata_dict.get('source', {})
if source.get('data_layer_module'):
score += 10
if source.get('source_tables'):
score += 10
# Quality policies (10 points)
if metadata_dict.get('quality_filter_policy'):
score += 5
if metadata_dict.get('confidence_logic'):
score += 5
return min(score, max_score)