""" Placeholder Metadata Extractor Automatically extracts metadata from existing codebase for all placeholders. This module bridges the gap between legacy implementation and normative standard. """ import re import inspect from typing import Dict, List, Optional, Tuple, Any from placeholder_metadata import ( PlaceholderMetadata, PlaceholderMetadataRegistry, PlaceholderType, TimeWindow, OutputType, SourceInfo, MissingValuePolicy, ExceptionHandling, ConfidenceLogic, QualityFilterPolicy, UsedBy, METADATA_REGISTRY ) # ── Heuristics ──────────────────────────────────────────────────────────────── def infer_type_from_key(key: str, description: str) -> PlaceholderType: """ Infer placeholder type from key and description. Heuristics: - JSON/Markdown in name → interpreted or raw_data - "score", "pct", "ratio" → atomic - "summary", "detail" → raw_data or interpreted """ key_lower = key.lower() desc_lower = description.lower() # JSON/Markdown outputs if '_json' in key_lower or '_md' in key_lower: return PlaceholderType.RAW_DATA # Scores and percentages are atomic if any(x in key_lower for x in ['score', 'pct', '_vs_', 'ratio', 'adequacy']): return PlaceholderType.ATOMIC # Summaries and details if any(x in key_lower for x in ['summary', 'detail', 'verteilung', 'distribution']): return PlaceholderType.RAW_DATA # Goals and focus areas (interpreted) if any(x in key_lower for x in ['goal', 'focus', 'top_']): return PlaceholderType.INTERPRETED # Correlations are interpreted if 'correlation' in key_lower or 'plateau' in key_lower or 'driver' in key_lower: return PlaceholderType.INTERPRETED # Default: atomic return PlaceholderType.ATOMIC def infer_time_window_from_key(key: str) -> TimeWindow: """ Infer time window from placeholder key. Patterns: - _7d → 7d - _28d → 28d - _30d → 30d - _90d → 90d - aktuell, latest, current → latest - avg, median → usually 28d or 30d (default to 30d) """ key_lower = key.lower() # Explicit time windows if '_7d' in key_lower: return TimeWindow.DAYS_7 if '_14d' in key_lower: return TimeWindow.DAYS_14 if '_28d' in key_lower: return TimeWindow.DAYS_28 if '_30d' in key_lower: return TimeWindow.DAYS_30 if '_90d' in key_lower: return TimeWindow.DAYS_90 # Latest/current if any(x in key_lower for x in ['aktuell', 'latest', 'current', 'letzt']): return TimeWindow.LATEST # Averages default to 30d if 'avg' in key_lower or 'durchschn' in key_lower: return TimeWindow.DAYS_30 # Trends default to 28d if 'trend' in key_lower: return TimeWindow.DAYS_28 # Week-based metrics if 'week' in key_lower or 'woche' in key_lower: return TimeWindow.DAYS_7 # Profile data is always latest if key_lower in ['name', 'age', 'height', 'geschlecht']: return TimeWindow.LATEST # Default: unknown return TimeWindow.UNKNOWN def infer_output_type_from_key(key: str) -> OutputType: """ Infer output data type from key. Heuristics: - _json → json - _md → markdown - score, pct, ratio → integer - avg, median, delta, change → number - name, geschlecht → string - datum, date → date """ key_lower = key.lower() if '_json' in key_lower: return OutputType.JSON if '_md' in key_lower: return OutputType.MARKDOWN if key_lower in ['datum_heute', 'zeitraum_7d', 'zeitraum_30d', 'zeitraum_90d']: return OutputType.DATE if any(x in key_lower for x in ['score', 'pct', 'count', 'days', 'frequency']): return OutputType.INTEGER if any(x in key_lower for x in ['avg', 'median', 'delta', 'change', 'slope', 'weight', 'ratio', 'balance', 'trend']): return OutputType.NUMBER if key_lower in ['name', 'geschlecht', 'quadrant']: return OutputType.STRING # Default: string (most placeholders format to string for AI) return OutputType.STRING def infer_unit_from_key_and_description(key: str, description: str) -> Optional[str]: """ Infer unit from key and description. Common units: - weight → kg - duration, time → Stunden or Minuten - percentage → % - distance → km - heart rate → bpm """ key_lower = key.lower() desc_lower = description.lower() # Weight if 'weight' in key_lower or 'gewicht' in key_lower or any(x in key_lower for x in ['fm_', 'lbm_']): return 'kg' # Body fat, percentages if any(x in key_lower for x in ['kf_', 'pct', '_bf', 'adequacy', 'score', 'balance', 'compliance', 'quality']): return '%' # Circumferences if any(x in key_lower for x in ['umfang', 'waist', 'hip', 'chest', 'arm', 'leg']): return 'cm' # Time/duration if any(x in key_lower for x in ['duration', 'dauer', 'hours', 'stunden', 'minutes', 'debt']): if 'hours' in desc_lower or 'stunden' in desc_lower: return 'Stunden' elif 'minutes' in desc_lower or 'minuten' in desc_lower: return 'Minuten' else: return 'Stunden' # Default # Heart rate if 'hr' in key_lower or 'herzfrequenz' in key_lower or 'puls' in key_lower: return 'bpm' # HRV if 'hrv' in key_lower: return 'ms' # VO2 Max if 'vo2' in key_lower: return 'ml/kg/min' # Calories/energy if 'kcal' in key_lower or 'energy' in key_lower or 'energie' in key_lower: return 'kcal' # Macros if any(x in key_lower for x in ['protein', 'carb', 'fat', 'kohlenhydrat', 'fett']): return 'g' # Height if 'height' in key_lower or 'größe' in key_lower: return 'cm' # Age if 'age' in key_lower or 'alter' in key_lower: return 'Jahre' # BMI if 'bmi' in key_lower: return None # BMI has no unit # Load if 'load' in key_lower: return None # Unitless # Default: None return None def extract_resolver_name(resolver_func) -> str: """ Extract resolver function name from lambda or function. Most resolvers are lambdas like: lambda pid: function_name(pid) We want to extract the function_name. """ try: # Get source code of lambda source = inspect.getsource(resolver_func).strip() # Pattern: lambda pid: function_name(...) match = re.search(r'lambda\s+\w+:\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\(', source) if match: return match.group(1) # Pattern: direct function reference if hasattr(resolver_func, '__name__'): return resolver_func.__name__ except (OSError, TypeError): pass return "unknown" def analyze_data_layer_usage(resolver_name: str) -> Tuple[Optional[str], Optional[str], List[str]]: """ Analyze which data_layer function and tables are used. Returns: (data_layer_function, data_layer_module, source_tables) This is a heuristic analysis based on naming patterns. """ # Map common resolver patterns to data layer modules data_layer_mapping = { 'get_latest_weight': ('get_latest_weight_data', 'body_metrics', ['weight_log']), 'get_weight_trend': ('get_weight_trend_data', 'body_metrics', ['weight_log']), 'get_latest_bf': ('get_body_composition_data', 'body_metrics', ['caliper_log']), 'get_circ_summary': ('get_circumference_summary_data', 'body_metrics', ['circumference_log']), 'get_caliper_summary': ('get_body_composition_data', 'body_metrics', ['caliper_log']), # Nutrition 'get_nutrition_avg': ('get_nutrition_average_data', 'nutrition_metrics', ['nutrition_log']), 'get_protein_per_kg': ('get_protein_targets_data', 'nutrition_metrics', ['nutrition_log', 'weight_log']), # Activity 'get_activity_summary': ('get_activity_summary_data', 'activity_metrics', ['activity_log']), 'get_activity_detail': ('get_activity_detail_data', 'activity_metrics', ['activity_log', 'training_types']), 'get_training_type_dist': ('get_training_type_distribution_data', 'activity_metrics', ['activity_log', 'training_types']), # Sleep 'get_sleep_duration': ('get_sleep_duration_data', 'recovery_metrics', ['sleep_log']), 'get_sleep_quality': ('get_sleep_quality_data', 'recovery_metrics', ['sleep_log']), # Vitals 'get_resting_hr': ('get_resting_heart_rate_data', 'health_metrics', ['vitals_baseline']), 'get_hrv': ('get_heart_rate_variability_data', 'health_metrics', ['vitals_baseline']), 'get_vo2_max': ('get_vo2_max_data', 'health_metrics', ['vitals_baseline']), # Goals '_safe_json': (None, None, ['goals', 'focus_area_definitions', 'goal_focus_contributions']), '_safe_str': (None, None, []), '_safe_int': (None, None, []), '_safe_float': (None, None, []), } # Try to find mapping for pattern, (func, module, tables) in data_layer_mapping.items(): if pattern in resolver_name: return func, module, tables # Default: unknown return None, None, [] # ── Main Extraction ─────────────────────────────────────────────────────────── def extract_metadata_from_placeholder_map( placeholder_map: Dict[str, Any], catalog: Dict[str, List[Dict[str, str]]] ) -> Dict[str, PlaceholderMetadata]: """ Extract metadata for all placeholders from PLACEHOLDER_MAP and catalog. Args: placeholder_map: The PLACEHOLDER_MAP dict from placeholder_resolver catalog: The catalog from get_placeholder_catalog() Returns: Dict mapping key to PlaceholderMetadata """ # Flatten catalog for easy lookup catalog_flat = {} for category, items in catalog.items(): for item in items: catalog_flat[item['key']] = { 'category': category, 'description': item['description'] } metadata_dict = {} for placeholder_full, resolver_func in placeholder_map.items(): # Extract key (remove {{ }}) key = placeholder_full.replace('{{', '').replace('}}', '') # Get catalog info catalog_info = catalog_flat.get(key, { 'category': 'Unknown', 'description': 'No description available' }) category = catalog_info['category'] description = catalog_info['description'] # Extract resolver name resolver_name = extract_resolver_name(resolver_func) # Infer metadata using heuristics ptype = infer_type_from_key(key, description) time_window = infer_time_window_from_key(key) output_type = infer_output_type_from_key(key) unit = infer_unit_from_key_and_description(key, description) # Analyze data layer usage dl_func, dl_module, source_tables = analyze_data_layer_usage(resolver_name) # Build source info source = SourceInfo( resolver=resolver_name, module="placeholder_resolver.py", function=dl_func, data_layer_module=dl_module, source_tables=source_tables ) # Build semantic contract (enhanced description) semantic_contract = build_semantic_contract(key, description, time_window, ptype) # Format hint format_hint = build_format_hint(key, unit, output_type) # Create metadata metadata = PlaceholderMetadata( key=key, placeholder=placeholder_full, category=category, type=ptype, description=description, semantic_contract=semantic_contract, unit=unit, time_window=time_window, output_type=output_type, format_hint=format_hint, example_output=None, # Will be filled at runtime source=source, dependencies=['profile_id'], # All placeholders depend on profile_id used_by=UsedBy(), # Will be filled by usage analysis version="1.0.0", deprecated=False, known_issues=[], notes=[] ) metadata_dict[key] = metadata return metadata_dict def build_semantic_contract(key: str, description: str, time_window: TimeWindow, ptype: PlaceholderType) -> str: """ Build detailed semantic contract from available information. """ base = description # Add time window info if time_window == TimeWindow.LATEST: base += " (letzter verfügbarer Wert)" elif time_window != TimeWindow.UNKNOWN: base += f" (Zeitfenster: {time_window.value})" # Add type info if ptype == PlaceholderType.INTERPRETED: base += " [KI-interpretiert]" elif ptype == PlaceholderType.RAW_DATA: base += " [Strukturierte Rohdaten]" return base def build_format_hint(key: str, unit: Optional[str], output_type: OutputType) -> Optional[str]: """ Build format hint based on key, unit, and output type. """ if output_type == OutputType.JSON: return "JSON object" elif output_type == OutputType.MARKDOWN: return "Markdown-formatted text" elif output_type == OutputType.DATE: return "YYYY-MM-DD" elif unit: if output_type == OutputType.NUMBER: return f"12.3 {unit}" elif output_type == OutputType.INTEGER: return f"85 {unit}" else: return f"Wert {unit}" else: if output_type == OutputType.NUMBER: return "12.3" elif output_type == OutputType.INTEGER: return "85" else: return "Text" # ── Usage Analysis ──────────────────────────────────────────────────────────── def analyze_placeholder_usage(profile_id: str) -> Dict[str, UsedBy]: """ Analyze where each placeholder is used (prompts, pipelines, charts). This requires database access to check ai_prompts table. Returns dict mapping placeholder key to UsedBy object. """ from db import get_db, get_cursor, r2d usage_map: Dict[str, UsedBy] = {} with get_db() as conn: cur = get_cursor(conn) # Get all prompts cur.execute("SELECT name, template, stages FROM ai_prompts") prompts = [r2d(row) for row in cur.fetchall()] # Analyze each prompt for prompt in prompts: # Check template template = prompt.get('template') or '' if template: # Only process if template is not empty/None found_placeholders = re.findall(r'\{\{(\w+)\}\}', template) for ph_key in found_placeholders: if ph_key not in usage_map: usage_map[ph_key] = UsedBy() if prompt['name'] not in usage_map[ph_key].prompts: usage_map[ph_key].prompts.append(prompt['name']) # Check stages (pipeline prompts) stages = prompt.get('stages') if stages: for stage in stages: for stage_prompt in stage.get('prompts', []): template = stage_prompt.get('template') or '' if not template: # Skip if template is None/empty continue found_placeholders = re.findall(r'\{\{(\w+)\}\}', template) for ph_key in found_placeholders: if ph_key not in usage_map: usage_map[ph_key] = UsedBy() if prompt['name'] not in usage_map[ph_key].pipelines: usage_map[ph_key].pipelines.append(prompt['name']) return usage_map # ── Main Entry Point ────────────────────────────────────────────────────────── def build_complete_metadata_registry(profile_id: str = None) -> PlaceholderMetadataRegistry: """ Build complete metadata registry by extracting from codebase. Args: profile_id: Optional profile ID for usage analysis Returns: PlaceholderMetadataRegistry with all metadata """ from placeholder_resolver import PLACEHOLDER_MAP, get_placeholder_catalog # Get catalog (use dummy profile if not provided) if not profile_id: # Use first available profile or create dummy from db import get_db, get_cursor with get_db() as conn: cur = get_cursor(conn) cur.execute("SELECT id FROM profiles LIMIT 1") row = cur.fetchone() profile_id = row['id'] if row else 'dummy' catalog = get_placeholder_catalog(profile_id) # Extract base metadata metadata_dict = extract_metadata_from_placeholder_map(PLACEHOLDER_MAP, catalog) # Analyze usage if profile_id != 'dummy': usage_map = analyze_placeholder_usage(profile_id) for key, used_by in usage_map.items(): if key in metadata_dict: metadata_dict[key].used_by = used_by # Register all metadata registry = PlaceholderMetadataRegistry() for metadata in metadata_dict.values(): try: registry.register(metadata, validate=False) # Don't validate during initial extraction except Exception as e: print(f"Warning: Failed to register {metadata.key}: {e}") return registry if __name__ == "__main__": # Test extraction print("Building metadata registry...") registry = build_complete_metadata_registry() print(f"Extracted metadata for {registry.count()} placeholders") # Show sample all_metadata = registry.get_all() if all_metadata: sample_key = list(all_metadata.keys())[0] sample = all_metadata[sample_key] print(f"\nSample metadata for '{sample_key}':") print(sample.to_json())