mitai-jinkendo/backend/placeholder_metadata_extractor.py
Lars a04e7cc042
feat: Complete Placeholder Metadata System (Normative Standard v1.0.0)
Implements a comprehensive metadata system for all 116 placeholders according to
the PLACEHOLDER_METADATA_REQUIREMENTS_V2_NORMATIVE standard.

Backend:
- placeholder_metadata.py: Complete schema (PlaceholderMetadata, Registry, Validation)
- placeholder_metadata_extractor.py: Automatic extraction with heuristics
- placeholder_metadata_complete.py: Hand-curated metadata for all 116 placeholders
- generate_complete_metadata.py: Metadata generation with manual corrections
- generate_placeholder_catalog.py: Documentation generator (4 output files)
- routers/prompts.py: New extended export endpoint (non-breaking)
- tests/test_placeholder_metadata.py: Comprehensive test suite

Documentation:
- PLACEHOLDER_GOVERNANCE.md: Mandatory governance guidelines
- PLACEHOLDER_METADATA_IMPLEMENTATION_SUMMARY.md: Complete implementation docs

Features:
- Normative-compliant metadata for all 116 placeholders
- Non-breaking extended export API endpoint
- Automatic + manual metadata curation
- Validation framework with error/warning levels
- Gap reporting for unresolved fields
- Catalog generator (JSON, Markdown, Gap Report, Export Spec)
- Test suite (20+ tests)
- Governance rules for future placeholders
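
The validation framework with error/warning levels might look like the following minimal sketch. The names `Severity`, `ValidationIssue`, and `validate_fields` are illustrative assumptions, not the actual API in `placeholder_metadata.py`:

```python
from dataclasses import dataclass
from enum import Enum
from typing import List, Optional

class Severity(Enum):
    ERROR = "error"
    WARNING = "warning"

@dataclass
class ValidationIssue:
    key: str
    severity: Severity
    message: str

def validate_fields(key: str, unit: Optional[str], time_window: str) -> List[ValidationIssue]:
    """Toy validator: an unresolved time window is an error,
    a missing unit only a warning (feeds the gap report)."""
    issues: List[ValidationIssue] = []
    if time_window == "unknown":
        issues.append(ValidationIssue(key, Severity.ERROR, "time_window unresolved"))
    if unit is None:
        issues.append(ValidationIssue(key, Severity.WARNING, "no unit inferred"))
    return issues
```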

API:
- GET /api/prompts/placeholders/export-values-extended (NEW)
- GET /api/prompts/placeholders/export-values (unchanged, backward compatible)
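
A client call against the new endpoint might look like this sketch; the base URL and the `profile_id` query parameter name are assumptions, not taken from the repo:

```python
import json
import urllib.request

def extended_export_url(base_url: str, profile_id: str) -> str:
    """Build the URL for the extended export endpoint.
    The ``profile_id`` query parameter name is an assumption."""
    return f"{base_url}/api/prompts/placeholders/export-values-extended?profile_id={profile_id}"

def fetch_extended_export(base_url: str, profile_id: str) -> dict:
    """GET the extended export (requires a running backend)."""
    with urllib.request.urlopen(extended_export_url(base_url, profile_id)) as resp:
        return json.load(resp)
```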

Architecture:
- PlaceholderType enum: atomic, raw_data, interpreted, legacy_unknown
- TimeWindow enum: latest, 7d, 14d, 28d, 30d, 90d, custom, mixed, unknown
- OutputType enum: string, number, integer, boolean, json, markdown, date, enum
- Complete source tracking (resolver, data_layer, tables)
- Runtime value resolution
- Usage tracking (prompts, pipelines, charts)
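
The three enums can be sketched as a minimal reconstruction from the values listed above (member names follow the usage in the extractor; the real definitions in `placeholder_metadata.py` may carry extra fields):

```python
from enum import Enum

class PlaceholderType(Enum):
    ATOMIC = "atomic"
    RAW_DATA = "raw_data"
    INTERPRETED = "interpreted"
    LEGACY_UNKNOWN = "legacy_unknown"

class TimeWindow(Enum):
    LATEST = "latest"
    DAYS_7 = "7d"
    DAYS_14 = "14d"
    DAYS_28 = "28d"
    DAYS_30 = "30d"
    DAYS_90 = "90d"
    CUSTOM = "custom"
    MIXED = "mixed"
    UNKNOWN = "unknown"

class OutputType(Enum):
    STRING = "string"
    NUMBER = "number"
    INTEGER = "integer"
    BOOLEAN = "boolean"
    JSON = "json"
    MARKDOWN = "markdown"
    DATE = "date"
    ENUM = "enum"
```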

Statistics:
- 6 new Python modules (~2,500 lines)
- 1 modified module (extended)
- 2 new documentation files
- 4 generated documentation files (to be created in Docker)
- 20+ test cases
- 116 placeholders inventoried

Next Steps:
1. Run in Docker: python /app/generate_placeholder_catalog.py
2. Test extended export endpoint
3. Verify all 116 placeholders have complete metadata

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
2026-03-29 20:32:37 +02:00


"""
Placeholder Metadata Extractor
Automatically extracts metadata from the existing codebase for all placeholders.
This module bridges the gap between legacy implementation and normative standard.
"""
import re
import inspect
from typing import Dict, List, Optional, Tuple, Any
from placeholder_metadata import (
PlaceholderMetadata,
PlaceholderMetadataRegistry,
PlaceholderType,
TimeWindow,
OutputType,
SourceInfo,
MissingValuePolicy,
ExceptionHandling,
ConfidenceLogic,
QualityFilterPolicy,
UsedBy,
METADATA_REGISTRY
)
# ── Heuristics ────────────────────────────────────────────────────────────────
def infer_type_from_key(key: str, description: str) -> PlaceholderType:
"""
Infer placeholder type from key and description.
Heuristics:
- JSON/Markdown in name → interpreted or raw_data
- "score", "pct", "ratio" → atomic
- "summary", "detail" → raw_data or interpreted
"""
key_lower = key.lower()
desc_lower = description.lower()
# JSON/Markdown outputs
if '_json' in key_lower or '_md' in key_lower:
return PlaceholderType.RAW_DATA
# Scores and percentages are atomic
if any(x in key_lower for x in ['score', 'pct', '_vs_', 'ratio', 'adequacy']):
return PlaceholderType.ATOMIC
# Summaries and details
if any(x in key_lower for x in ['summary', 'detail', 'verteilung', 'distribution']):
return PlaceholderType.RAW_DATA
# Goals and focus areas (interpreted)
if any(x in key_lower for x in ['goal', 'focus', 'top_']):
return PlaceholderType.INTERPRETED
# Correlations are interpreted
if 'correlation' in key_lower or 'plateau' in key_lower or 'driver' in key_lower:
return PlaceholderType.INTERPRETED
# Default: atomic
return PlaceholderType.ATOMIC
def infer_time_window_from_key(key: str) -> TimeWindow:
"""
Infer time window from placeholder key.
Patterns:
- _7d → 7d
- _28d → 28d
- _30d → 30d
- _90d → 90d
- aktuell, latest, current → latest
- avg, median → usually 28d or 30d (default to 30d)
"""
key_lower = key.lower()
# Explicit time windows
if '_7d' in key_lower:
return TimeWindow.DAYS_7
if '_14d' in key_lower:
return TimeWindow.DAYS_14
if '_28d' in key_lower:
return TimeWindow.DAYS_28
if '_30d' in key_lower:
return TimeWindow.DAYS_30
if '_90d' in key_lower:
return TimeWindow.DAYS_90
# Latest/current
if any(x in key_lower for x in ['aktuell', 'latest', 'current', 'letzt']):
return TimeWindow.LATEST
    # Averages/medians default to 30d
    if any(x in key_lower for x in ['avg', 'median', 'durchschn']):
        return TimeWindow.DAYS_30
# Trends default to 28d
if 'trend' in key_lower:
return TimeWindow.DAYS_28
# Week-based metrics
if 'week' in key_lower or 'woche' in key_lower:
return TimeWindow.DAYS_7
# Profile data is always latest
if key_lower in ['name', 'age', 'height', 'geschlecht']:
return TimeWindow.LATEST
# Default: unknown
return TimeWindow.UNKNOWN
def infer_output_type_from_key(key: str) -> OutputType:
"""
Infer output data type from key.
Heuristics:
- _json → json
- _md → markdown
    - score, pct, count → integer
- avg, median, delta, change → number
- name, geschlecht → string
- datum, date → date
"""
key_lower = key.lower()
if '_json' in key_lower:
return OutputType.JSON
if '_md' in key_lower:
return OutputType.MARKDOWN
if key_lower in ['datum_heute', 'zeitraum_7d', 'zeitraum_30d', 'zeitraum_90d']:
return OutputType.DATE
if any(x in key_lower for x in ['score', 'pct', 'count', 'days', 'frequency']):
return OutputType.INTEGER
if any(x in key_lower for x in ['avg', 'median', 'delta', 'change', 'slope',
'weight', 'ratio', 'balance', 'trend']):
return OutputType.NUMBER
if key_lower in ['name', 'geschlecht', 'quadrant']:
return OutputType.STRING
# Default: string (most placeholders format to string for AI)
return OutputType.STRING
def infer_unit_from_key_and_description(key: str, description: str) -> Optional[str]:
"""
Infer unit from key and description.
Common units:
- weight → kg
- duration, time → Stunden or Minuten
- percentage → %
- distance → km
- heart rate → bpm
"""
key_lower = key.lower()
desc_lower = description.lower()
# Weight
if 'weight' in key_lower or 'gewicht' in key_lower or any(x in key_lower for x in ['fm_', 'lbm_']):
return 'kg'
# Body fat, percentages
if any(x in key_lower for x in ['kf_', 'pct', '_bf', 'adequacy', 'score',
'balance', 'compliance', 'quality']):
return '%'
# Circumferences
if any(x in key_lower for x in ['umfang', 'waist', 'hip', 'chest', 'arm', 'leg']):
return 'cm'
# Time/duration
if any(x in key_lower for x in ['duration', 'dauer', 'hours', 'stunden', 'minutes', 'debt']):
if 'hours' in desc_lower or 'stunden' in desc_lower:
return 'Stunden'
elif 'minutes' in desc_lower or 'minuten' in desc_lower:
return 'Minuten'
else:
return 'Stunden' # Default
    # HRV (checked before the generic heart-rate match, since 'hrv' contains 'hr')
    if 'hrv' in key_lower:
        return 'ms'
    # Heart rate
    if 'hr' in key_lower or 'herzfrequenz' in key_lower or 'puls' in key_lower:
        return 'bpm'
# VO2 Max
if 'vo2' in key_lower:
return 'ml/kg/min'
# Calories/energy
if 'kcal' in key_lower or 'energy' in key_lower or 'energie' in key_lower:
return 'kcal'
# Macros
if any(x in key_lower for x in ['protein', 'carb', 'fat', 'kohlenhydrat', 'fett']):
return 'g'
# Height
if 'height' in key_lower or 'größe' in key_lower:
return 'cm'
# Age
if 'age' in key_lower or 'alter' in key_lower:
return 'Jahre'
# BMI
if 'bmi' in key_lower:
return None # BMI has no unit
# Load
if 'load' in key_lower:
return None # Unitless
# Default: None
return None
def extract_resolver_name(resolver_func) -> str:
"""
Extract resolver function name from lambda or function.
Most resolvers are lambdas like: lambda pid: function_name(pid)
We want to extract the function_name.
"""
try:
# Get source code of lambda
source = inspect.getsource(resolver_func).strip()
# Pattern: lambda pid: function_name(...)
match = re.search(r'lambda\s+\w+:\s*([a-zA-Z_][a-zA-Z0-9_]*)\s*\(', source)
if match:
return match.group(1)
# Pattern: direct function reference
if hasattr(resolver_func, '__name__'):
return resolver_func.__name__
except (OSError, TypeError):
pass
return "unknown"
def analyze_data_layer_usage(resolver_name: str) -> Tuple[Optional[str], Optional[str], List[str]]:
"""
Analyze which data_layer function and tables are used.
Returns: (data_layer_function, data_layer_module, source_tables)
This is a heuristic analysis based on naming patterns.
"""
# Map common resolver patterns to data layer modules
data_layer_mapping = {
'get_latest_weight': ('get_latest_weight_data', 'body_metrics', ['weight_log']),
'get_weight_trend': ('get_weight_trend_data', 'body_metrics', ['weight_log']),
'get_latest_bf': ('get_body_composition_data', 'body_metrics', ['caliper_log']),
'get_circ_summary': ('get_circumference_summary_data', 'body_metrics', ['circumference_log']),
'get_caliper_summary': ('get_body_composition_data', 'body_metrics', ['caliper_log']),
# Nutrition
'get_nutrition_avg': ('get_nutrition_average_data', 'nutrition_metrics', ['nutrition_log']),
'get_protein_per_kg': ('get_protein_targets_data', 'nutrition_metrics', ['nutrition_log', 'weight_log']),
# Activity
'get_activity_summary': ('get_activity_summary_data', 'activity_metrics', ['activity_log']),
'get_activity_detail': ('get_activity_detail_data', 'activity_metrics', ['activity_log', 'training_types']),
'get_training_type_dist': ('get_training_type_distribution_data', 'activity_metrics', ['activity_log', 'training_types']),
# Sleep
'get_sleep_duration': ('get_sleep_duration_data', 'recovery_metrics', ['sleep_log']),
'get_sleep_quality': ('get_sleep_quality_data', 'recovery_metrics', ['sleep_log']),
# Vitals
'get_resting_hr': ('get_resting_heart_rate_data', 'health_metrics', ['vitals_baseline']),
'get_hrv': ('get_heart_rate_variability_data', 'health_metrics', ['vitals_baseline']),
'get_vo2_max': ('get_vo2_max_data', 'health_metrics', ['vitals_baseline']),
# Goals
'_safe_json': (None, None, ['goals', 'focus_area_definitions', 'goal_focus_contributions']),
'_safe_str': (None, None, []),
'_safe_int': (None, None, []),
'_safe_float': (None, None, []),
}
# Try to find mapping
for pattern, (func, module, tables) in data_layer_mapping.items():
if pattern in resolver_name:
return func, module, tables
# Default: unknown
return None, None, []
# ── Main Extraction ───────────────────────────────────────────────────────────
def extract_metadata_from_placeholder_map(
placeholder_map: Dict[str, Any],
catalog: Dict[str, List[Dict[str, str]]]
) -> Dict[str, PlaceholderMetadata]:
"""
Extract metadata for all placeholders from PLACEHOLDER_MAP and catalog.
Args:
placeholder_map: The PLACEHOLDER_MAP dict from placeholder_resolver
catalog: The catalog from get_placeholder_catalog()
Returns:
Dict mapping key to PlaceholderMetadata
"""
# Flatten catalog for easy lookup
catalog_flat = {}
for category, items in catalog.items():
for item in items:
catalog_flat[item['key']] = {
'category': category,
'description': item['description']
}
metadata_dict = {}
for placeholder_full, resolver_func in placeholder_map.items():
# Extract key (remove {{ }})
key = placeholder_full.replace('{{', '').replace('}}', '')
# Get catalog info
catalog_info = catalog_flat.get(key, {
'category': 'Unknown',
'description': 'No description available'
})
category = catalog_info['category']
description = catalog_info['description']
# Extract resolver name
resolver_name = extract_resolver_name(resolver_func)
# Infer metadata using heuristics
ptype = infer_type_from_key(key, description)
time_window = infer_time_window_from_key(key)
output_type = infer_output_type_from_key(key)
unit = infer_unit_from_key_and_description(key, description)
# Analyze data layer usage
dl_func, dl_module, source_tables = analyze_data_layer_usage(resolver_name)
# Build source info
source = SourceInfo(
resolver=resolver_name,
module="placeholder_resolver.py",
function=dl_func,
data_layer_module=dl_module,
source_tables=source_tables
)
# Build semantic contract (enhanced description)
semantic_contract = build_semantic_contract(key, description, time_window, ptype)
# Format hint
format_hint = build_format_hint(key, unit, output_type)
# Create metadata
metadata = PlaceholderMetadata(
key=key,
placeholder=placeholder_full,
category=category,
type=ptype,
description=description,
semantic_contract=semantic_contract,
unit=unit,
time_window=time_window,
output_type=output_type,
format_hint=format_hint,
example_output=None, # Will be filled at runtime
source=source,
dependencies=['profile_id'], # All placeholders depend on profile_id
used_by=UsedBy(), # Will be filled by usage analysis
version="1.0.0",
deprecated=False,
known_issues=[],
notes=[]
)
metadata_dict[key] = metadata
return metadata_dict
def build_semantic_contract(key: str, description: str, time_window: TimeWindow, ptype: PlaceholderType) -> str:
"""
Build detailed semantic contract from available information.
"""
base = description
# Add time window info
if time_window == TimeWindow.LATEST:
base += " (letzter verfügbarer Wert)"
elif time_window != TimeWindow.UNKNOWN:
base += f" (Zeitfenster: {time_window.value})"
# Add type info
if ptype == PlaceholderType.INTERPRETED:
base += " [KI-interpretiert]"
elif ptype == PlaceholderType.RAW_DATA:
base += " [Strukturierte Rohdaten]"
return base
def build_format_hint(key: str, unit: Optional[str], output_type: OutputType) -> Optional[str]:
"""
Build format hint based on key, unit, and output type.
"""
if output_type == OutputType.JSON:
return "JSON object"
elif output_type == OutputType.MARKDOWN:
return "Markdown-formatted text"
elif output_type == OutputType.DATE:
return "YYYY-MM-DD"
elif unit:
if output_type == OutputType.NUMBER:
return f"12.3 {unit}"
elif output_type == OutputType.INTEGER:
return f"85 {unit}"
else:
return f"Wert {unit}"
else:
if output_type == OutputType.NUMBER:
return "12.3"
elif output_type == OutputType.INTEGER:
return "85"
else:
return "Text"
# ── Usage Analysis ────────────────────────────────────────────────────────────
def analyze_placeholder_usage(profile_id: str) -> Dict[str, UsedBy]:
"""
Analyze where each placeholder is used (prompts, pipelines, charts).
This requires database access to check ai_prompts table.
Returns dict mapping placeholder key to UsedBy object.
"""
from db import get_db, get_cursor, r2d
usage_map: Dict[str, UsedBy] = {}
with get_db() as conn:
cur = get_cursor(conn)
# Get all prompts
cur.execute("SELECT name, template, stages FROM ai_prompts")
prompts = [r2d(row) for row in cur.fetchall()]
# Analyze each prompt
for prompt in prompts:
# Check template
template = prompt.get('template', '')
found_placeholders = re.findall(r'\{\{(\w+)\}\}', template)
for ph_key in found_placeholders:
if ph_key not in usage_map:
usage_map[ph_key] = UsedBy()
if prompt['name'] not in usage_map[ph_key].prompts:
usage_map[ph_key].prompts.append(prompt['name'])
# Check stages (pipeline prompts)
            stages = prompt.get('stages')
            if isinstance(stages, str):
                # Depending on the DB driver, JSON columns may come back as strings
                import json
                stages = json.loads(stages)
            if stages:
for stage in stages:
for stage_prompt in stage.get('prompts', []):
template = stage_prompt.get('template', '')
found_placeholders = re.findall(r'\{\{(\w+)\}\}', template)
for ph_key in found_placeholders:
if ph_key not in usage_map:
usage_map[ph_key] = UsedBy()
if prompt['name'] not in usage_map[ph_key].pipelines:
usage_map[ph_key].pipelines.append(prompt['name'])
return usage_map
# ── Main Entry Point ──────────────────────────────────────────────────────────
def build_complete_metadata_registry(profile_id: Optional[str] = None) -> PlaceholderMetadataRegistry:
"""
Build complete metadata registry by extracting from codebase.
Args:
profile_id: Optional profile ID for usage analysis
Returns:
PlaceholderMetadataRegistry with all metadata
"""
from placeholder_resolver import PLACEHOLDER_MAP, get_placeholder_catalog
# Get catalog (use dummy profile if not provided)
if not profile_id:
# Use first available profile or create dummy
from db import get_db, get_cursor
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT id FROM profiles LIMIT 1")
row = cur.fetchone()
profile_id = row['id'] if row else 'dummy'
catalog = get_placeholder_catalog(profile_id)
# Extract base metadata
metadata_dict = extract_metadata_from_placeholder_map(PLACEHOLDER_MAP, catalog)
# Analyze usage
if profile_id != 'dummy':
usage_map = analyze_placeholder_usage(profile_id)
for key, used_by in usage_map.items():
if key in metadata_dict:
metadata_dict[key].used_by = used_by
# Register all metadata
registry = PlaceholderMetadataRegistry()
for metadata in metadata_dict.values():
try:
registry.register(metadata, validate=False) # Don't validate during initial extraction
except Exception as e:
print(f"Warning: Failed to register {metadata.key}: {e}")
return registry
if __name__ == "__main__":
# Test extraction
print("Building metadata registry...")
registry = build_complete_metadata_registry()
print(f"Extracted metadata for {registry.count()} placeholders")
# Show sample
all_metadata = registry.get_all()
if all_metadata:
sample_key = list(all_metadata.keys())[0]
sample = all_metadata[sample_key]
print(f"\nSample metadata for '{sample_key}':")
print(sample.to_json())