- Updated `build_ai_placeholder_caption` in `placeholder_registry.py` to improve the generation of AI context captions by prioritizing descriptions and avoiding redundancy. - Introduced `format_value_with_d_modifier` in `placeholder_resolver.py` to format values with contextual information, enhancing the clarity of exported placeholder values. - Modified `export_placeholder_values` in `prompts.py` to utilize the new formatting function, ensuring that exported data includes both raw values and contextual descriptions. - Added tests for the new formatting function and updated existing tests to ensure accurate caption generation. These changes improve the contextual relevance of placeholder data and enhance the user experience when interacting with exported values.
1792 lines
67 KiB
Python
1792 lines
67 KiB
Python
"""
|
|
AI Prompts Management Endpoints for Mitai Jinkendo
|
|
|
|
Handles prompt template configuration (admin-editable).
|
|
"""
|
|
import os
|
|
import json
|
|
import uuid
|
|
import httpx
|
|
from typing import Optional
|
|
from fastapi import APIRouter, Depends, HTTPException, Query, Header
|
|
from fastapi.responses import StreamingResponse
|
|
|
|
from db import get_db, get_cursor, r2d
|
|
from auth import require_auth, require_admin
|
|
from models import (
|
|
PromptCreate, PromptUpdate, PromptGenerateRequest,
|
|
PipelineConfigCreate, PipelineConfigUpdate
|
|
)
|
|
from placeholder_resolver import (
|
|
resolve_placeholders,
|
|
get_unknown_placeholders,
|
|
get_placeholder_example_values,
|
|
format_value_with_d_modifier,
|
|
get_available_placeholders,
|
|
get_placeholder_catalog
|
|
)
|
|
|
|
# Environment variables
|
|
OPENROUTER_KEY = os.getenv("OPENROUTER_API_KEY")
|
|
OPENROUTER_MODEL = os.getenv("OPENROUTER_MODEL", "anthropic/claude-sonnet-4")
|
|
|
|
router = APIRouter(prefix="/api/prompts", tags=["prompts"])
|
|
|
|
# Metadaten-Schlüssel in workflow aggregate_results (nicht als „einziger“ Nutzer-Output)
|
|
_WORKFLOW_AGG_META_KEYS = frozenset({
|
|
"combined_analysis",
|
|
"all_signals",
|
|
"total_nodes",
|
|
"executed_nodes",
|
|
"failed_nodes",
|
|
"skipped_nodes",
|
|
})
|
|
|
|
|
|
def _workflow_user_facing_content(agg: object) -> str:
|
|
"""
|
|
Nutzer-sichtbarer Text wie im Admin WorkflowResultViewer („Final Output“):
|
|
primär aggregated_result['analysis_core'], nicht das gesamte JSON.
|
|
"""
|
|
if agg is None:
|
|
return ""
|
|
if isinstance(agg, str):
|
|
return agg
|
|
if not isinstance(agg, dict):
|
|
return json.dumps(agg, ensure_ascii=False)
|
|
core = agg.get("analysis_core")
|
|
if isinstance(core, str) and core.strip():
|
|
return core
|
|
combined = agg.get("combined_analysis")
|
|
if isinstance(combined, str) and combined.strip():
|
|
return combined
|
|
non_meta = [k for k in agg.keys() if k not in _WORKFLOW_AGG_META_KEYS]
|
|
if len(non_meta) == 1:
|
|
v = agg[non_meta[0]]
|
|
if isinstance(v, str):
|
|
return v
|
|
return json.dumps(v, ensure_ascii=False)
|
|
return json.dumps(agg, ensure_ascii=False)
|
|
|
|
|
|
@router.get("")
|
|
def list_prompts(session: dict=Depends(require_auth)):
|
|
"""
|
|
List AI prompts.
|
|
- Admins: see ALL prompts (including pipeline and inactive)
|
|
- Users: see only active single-analysis prompts
|
|
"""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
is_admin = session.get('role') == 'admin'
|
|
|
|
if is_admin:
|
|
# Admin sees everything
|
|
cur.execute("SELECT * FROM ai_prompts ORDER BY sort_order, slug")
|
|
else:
|
|
# Users see only active, non-pipeline prompts
|
|
cur.execute("SELECT * FROM ai_prompts WHERE active=true AND slug NOT LIKE 'pipeline_%' ORDER BY sort_order")
|
|
|
|
return [r2d(r) for r in cur.fetchall()]
|
|
|
|
|
|
@router.get("/placeholders")
|
|
def list_placeholders_endpoint(session: dict=Depends(require_auth)):
|
|
"""
|
|
Get grouped catalog of available placeholders with descriptions and examples.
|
|
|
|
Returns:
|
|
Dict mapping category to list of {key, description, example}
|
|
|
|
IMPORTANT: This endpoint MUST be defined BEFORE /{prompt_id} to avoid routing conflict.
|
|
"""
|
|
profile_id = session['profile_id']
|
|
return get_placeholder_catalog(profile_id)
|
|
|
|
|
|
@router.get("/export-all")
|
|
def export_all_prompts(session: dict = Depends(require_admin)):
|
|
"""
|
|
Export all prompts as JSON array.
|
|
Admin only. Used for backup and dev→prod sync.
|
|
|
|
IMPORTANT: Must be defined BEFORE /{prompt_id} route to avoid routing conflict.
|
|
"""
|
|
from datetime import datetime
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT * FROM ai_prompts ORDER BY sort_order, slug")
|
|
prompts = [r2d(row) for row in cur.fetchall()]
|
|
|
|
# Convert to export format (clean up DB-specific fields)
|
|
export_data = []
|
|
for p in prompts:
|
|
export_item = {
|
|
'slug': p['slug'],
|
|
'name': p['name'],
|
|
'display_name': p.get('display_name'),
|
|
'description': p.get('description'),
|
|
'type': p.get('type', 'pipeline'),
|
|
'category': p.get('category', 'ganzheitlich'),
|
|
'template': p.get('template'),
|
|
'stages': p.get('stages'),
|
|
'output_format': p.get('output_format', 'text'),
|
|
'output_schema': p.get('output_schema'),
|
|
'question_augmentations': p.get('question_augmentations'),
|
|
'graph_data': p.get('graph_data'),
|
|
'active': p.get('active', True),
|
|
'sort_order': p.get('sort_order', 0)
|
|
}
|
|
export_data.append(export_item)
|
|
|
|
return {
|
|
'export_date': datetime.now().isoformat(),
|
|
'count': len(export_data),
|
|
'prompts': export_data
|
|
}
|
|
|
|
|
|
@router.post("/import")
|
|
def import_prompts(
|
|
data: dict,
|
|
overwrite: bool = False,
|
|
session: dict = Depends(require_admin)
|
|
):
|
|
"""
|
|
Import prompts from JSON export.
|
|
|
|
Args:
|
|
data: Export data from /export-all endpoint
|
|
overwrite: If true, update existing prompts. If false, skip duplicates.
|
|
|
|
Returns:
|
|
Summary of import results (created, updated, skipped)
|
|
|
|
IMPORTANT: Must be defined BEFORE /{prompt_id} route to avoid routing conflict.
|
|
"""
|
|
if 'prompts' not in data:
|
|
raise HTTPException(400, "Invalid import data: missing 'prompts' key")
|
|
|
|
prompts = data['prompts']
|
|
created = 0
|
|
updated = 0
|
|
skipped = 0
|
|
errors = []
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
for p in prompts:
|
|
slug = p.get('slug')
|
|
if not slug:
|
|
errors.append('Prompt without slug skipped')
|
|
continue
|
|
|
|
# Check if exists
|
|
cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (slug,))
|
|
existing = cur.fetchone()
|
|
|
|
if existing and not overwrite:
|
|
skipped += 1
|
|
continue
|
|
|
|
# Prepare JSON fields if present
|
|
stages_json = None
|
|
if p.get('stages'):
|
|
stages_json = json.dumps(p['stages']) if isinstance(p['stages'], list) else p['stages']
|
|
|
|
output_schema_json = None
|
|
if p.get('output_schema'):
|
|
output_schema_json = json.dumps(p['output_schema']) if isinstance(p['output_schema'], dict) else p['output_schema']
|
|
|
|
question_aug_json = None
|
|
if p.get('question_augmentations'):
|
|
question_aug_json = json.dumps(p['question_augmentations']) if isinstance(p['question_augmentations'], (dict, list)) else p['question_augmentations']
|
|
|
|
graph_data_json = None
|
|
if p.get('graph_data'):
|
|
graph_data_json = json.dumps(p['graph_data']) if isinstance(p['graph_data'], dict) else p['graph_data']
|
|
|
|
if existing:
|
|
# Update existing
|
|
cur.execute("""
|
|
UPDATE ai_prompts SET
|
|
name=%s, display_name=%s, description=%s, type=%s,
|
|
category=%s, template=%s, stages=%s, output_format=%s,
|
|
output_schema=%s, question_augmentations=%s, graph_data=%s,
|
|
active=%s, sort_order=%s,
|
|
updated=CURRENT_TIMESTAMP
|
|
WHERE slug=%s
|
|
""", (
|
|
p.get('name'), p.get('display_name'), p.get('description'),
|
|
p.get('type', 'pipeline'), p.get('category', 'ganzheitlich'),
|
|
p.get('template'), stages_json, p.get('output_format', 'text'),
|
|
output_schema_json, question_aug_json, graph_data_json,
|
|
p.get('active', True), p.get('sort_order', 0), slug
|
|
))
|
|
updated += 1
|
|
else:
|
|
# Create new
|
|
cur.execute("""
|
|
INSERT INTO ai_prompts (
|
|
slug, name, display_name, description, type, category,
|
|
template, stages, output_format, output_schema,
|
|
question_augmentations, graph_data,
|
|
active, sort_order, created, updated
|
|
) VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,%s,CURRENT_TIMESTAMP,CURRENT_TIMESTAMP)
|
|
""", (
|
|
slug, p.get('name'), p.get('display_name'), p.get('description'),
|
|
p.get('type', 'pipeline'), p.get('category', 'ganzheitlich'),
|
|
p.get('template'), stages_json, p.get('output_format', 'text'),
|
|
output_schema_json, question_aug_json, graph_data_json,
|
|
p.get('active', True), p.get('sort_order', 0)
|
|
))
|
|
created += 1
|
|
|
|
conn.commit()
|
|
|
|
return {
|
|
'success': True,
|
|
'created': created,
|
|
'updated': updated,
|
|
'skipped': skipped,
|
|
'errors': errors if errors else None
|
|
}
|
|
|
|
|
|
@router.get("/{prompt_id}")
|
|
def get_prompt(prompt_id: str, session: dict=Depends(require_auth)):
|
|
"""Get single AI prompt by ID (UUID)."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT * FROM ai_prompts WHERE id=%s", (prompt_id,))
|
|
row = cur.fetchone()
|
|
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="Prompt not found")
|
|
|
|
return r2d(row)
|
|
|
|
|
|
@router.post("")
|
|
def create_prompt(p: PromptCreate, session: dict=Depends(require_admin)):
|
|
"""Create new AI prompt (admin only)."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
# Check if slug already exists
|
|
cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (p.slug,))
|
|
if cur.fetchone():
|
|
raise HTTPException(status_code=400, detail=f"Prompt with slug '{p.slug}' already exists")
|
|
|
|
prompt_id = str(uuid.uuid4())
|
|
cur.execute(
|
|
"""INSERT INTO ai_prompts (id, name, slug, display_name, description, template, category, active, sort_order, created, updated)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""",
|
|
(prompt_id, p.name, p.slug, p.display_name or p.name, p.description, p.template, p.category, p.active, p.sort_order)
|
|
)
|
|
|
|
return {"id": prompt_id, "slug": p.slug}
|
|
|
|
|
|
@router.put("/{prompt_id}")
|
|
def update_prompt(prompt_id: str, p: PromptUpdate, session: dict=Depends(require_admin)):
|
|
"""Update AI prompt template (admin only)."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
# Build dynamic UPDATE query
|
|
updates = []
|
|
values = []
|
|
|
|
if p.name is not None:
|
|
updates.append('name=%s')
|
|
values.append(p.name)
|
|
if p.display_name is not None:
|
|
updates.append('display_name=%s')
|
|
values.append(p.display_name)
|
|
if p.description is not None:
|
|
updates.append('description=%s')
|
|
values.append(p.description)
|
|
if p.template is not None:
|
|
updates.append('template=%s')
|
|
values.append(p.template)
|
|
if p.category is not None:
|
|
updates.append('category=%s')
|
|
values.append(p.category)
|
|
if p.active is not None:
|
|
updates.append('active=%s')
|
|
values.append(p.active)
|
|
if p.sort_order is not None:
|
|
updates.append('sort_order=%s')
|
|
values.append(p.sort_order)
|
|
|
|
if not updates:
|
|
return {"ok": True}
|
|
|
|
cur.execute(
|
|
f"UPDATE ai_prompts SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s",
|
|
values + [prompt_id]
|
|
)
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
@router.delete("/{prompt_id}")
|
|
def delete_prompt(prompt_id: str, session: dict=Depends(require_admin)):
|
|
"""Delete AI prompt (admin only)."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("DELETE FROM ai_prompts WHERE id=%s", (prompt_id,))
|
|
|
|
if cur.rowcount == 0:
|
|
raise HTTPException(status_code=404, detail="Prompt not found")
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
@router.post("/{prompt_id}/duplicate")
|
|
def duplicate_prompt(prompt_id: str, session: dict=Depends(require_admin)):
|
|
"""Duplicate an existing prompt (admin only)."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
# Load original prompt
|
|
cur.execute("SELECT * FROM ai_prompts WHERE id=%s", (prompt_id,))
|
|
original = r2d(cur.fetchone())
|
|
|
|
if not original:
|
|
raise HTTPException(status_code=404, detail="Prompt not found")
|
|
|
|
# Create duplicate with new ID and modified name/slug
|
|
new_id = str(uuid.uuid4())
|
|
new_name = f"{original['name']} (Kopie)"
|
|
new_slug = f"{original['slug']}_copy_{uuid.uuid4().hex[:6]}"
|
|
|
|
new_display_name = f"{original.get('display_name') or original['name']} (Kopie)"
|
|
|
|
# Prepare JSONB fields (convert dict/list to JSON string if needed)
|
|
stages_json = json.dumps(original['stages']) if original.get('stages') else None
|
|
output_schema_json = json.dumps(original['output_schema']) if original.get('output_schema') else None
|
|
question_aug_json = json.dumps(original['question_augmentations']) if original.get('question_augmentations') else None
|
|
graph_data_json = json.dumps(original['graph_data']) if original.get('graph_data') else None
|
|
|
|
cur.execute(
|
|
"""INSERT INTO ai_prompts (
|
|
id, name, slug, display_name, description, template, category,
|
|
type, stages, output_format, output_schema,
|
|
question_augmentations, graph_data,
|
|
active, sort_order, created, updated
|
|
) VALUES (
|
|
%s, %s, %s, %s, %s, %s, %s,
|
|
%s, %s, %s, %s,
|
|
%s, %s,
|
|
%s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP
|
|
)""",
|
|
(new_id, new_name, new_slug, new_display_name,
|
|
original.get('description'), original.get('template'),
|
|
original.get('category', 'ganzheitlich'),
|
|
original.get('type', 'pipeline'), stages_json,
|
|
original.get('output_format', 'text'), output_schema_json,
|
|
question_aug_json, graph_data_json,
|
|
original.get('active', True), original.get('sort_order', 0))
|
|
)
|
|
|
|
return {"id": new_id, "slug": new_slug, "name": new_name}
|
|
|
|
|
|
@router.put("/reorder")
|
|
def reorder_prompts(order: list[str], session: dict=Depends(require_admin)):
|
|
"""
|
|
Reorder prompts by providing list of IDs in desired order.
|
|
|
|
Args:
|
|
order: List of prompt IDs in new order
|
|
"""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
for idx, prompt_id in enumerate(order):
|
|
cur.execute(
|
|
"UPDATE ai_prompts SET sort_order=%s WHERE id=%s",
|
|
(idx, prompt_id)
|
|
)
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
@router.post("/preview")
|
|
def preview_prompt(data: dict, session: dict=Depends(require_auth)):
|
|
"""
|
|
Preview a prompt template with real user data (without calling AI).
|
|
|
|
Args:
|
|
data: {"template": "Your template with {{placeholders}}"}
|
|
|
|
Returns:
|
|
{
|
|
"resolved": "Template with replaced placeholders",
|
|
"unknown_placeholders": ["list", "of", "unknown"]
|
|
}
|
|
"""
|
|
template = data.get('template', '')
|
|
profile_id = session['profile_id']
|
|
|
|
resolved = resolve_placeholders(template, profile_id)
|
|
unknown = get_unknown_placeholders(template)
|
|
|
|
return {
|
|
"resolved": resolved,
|
|
"unknown_placeholders": unknown
|
|
}
|
|
|
|
|
|
@router.get("/placeholders")
|
|
def list_placeholders(session: dict=Depends(require_auth)):
|
|
"""
|
|
Get grouped catalog of available placeholders with descriptions and examples.
|
|
|
|
Returns:
|
|
Dict mapping category to list of {key, description, example}
|
|
"""
|
|
profile_id = session['profile_id']
|
|
return get_placeholder_catalog(profile_id)
|
|
|
|
|
|
@router.get("/placeholders/export-values")
|
|
def export_placeholder_values(session: dict = Depends(require_auth)):
|
|
"""
|
|
Export all available placeholders with their current resolved values.
|
|
|
|
Pro Zeile: value = Rohwert wie bei {{key}}, example = Vorschau wie bei {{key|d}}
|
|
(Wert — ai_caption bzw. description). JSON-Download für das aktive Profil.
|
|
"""
|
|
from datetime import datetime
|
|
profile_id = session['profile_id']
|
|
|
|
# Get all resolved placeholder values
|
|
resolved_values = get_placeholder_example_values(profile_id)
|
|
|
|
# Clean up keys (remove {{ }})
|
|
cleaned_values = {
|
|
key.replace('{{', '').replace('}}', ''): value
|
|
for key, value in resolved_values.items()
|
|
}
|
|
|
|
# Get catalog for metadata
|
|
catalog = get_placeholder_catalog(profile_id)
|
|
|
|
# Organize by category with metadata
|
|
export_data = {
|
|
'export_date': datetime.now().isoformat(),
|
|
'profile_id': profile_id,
|
|
'placeholders_by_category': {}
|
|
}
|
|
|
|
for category, items in catalog.items():
|
|
export_data['placeholders_by_category'][category] = []
|
|
for item in items:
|
|
key = item['key'].replace('{{', '').replace('}}', '')
|
|
raw_val = cleaned_values.get(key, 'nicht verfügbar')
|
|
row = {
|
|
'key': item['key'],
|
|
'description': item['description'],
|
|
'value': raw_val,
|
|
'example': format_value_with_d_modifier(str(raw_val), item),
|
|
}
|
|
if item.get('ai_caption'):
|
|
row['ai_caption'] = item['ai_caption']
|
|
export_data['placeholders_by_category'][category].append(row)
|
|
|
|
# Also include flat list for easy access
|
|
export_data['all_placeholders'] = cleaned_values
|
|
export_data['count'] = len(cleaned_values)
|
|
|
|
return export_data
|
|
|
|
|
|
@router.get("/placeholders/export-values-extended")
|
|
def export_placeholder_values_extended(
|
|
token: Optional[str] = Query(None),
|
|
x_auth_token: Optional[str] = Header(default=None)
|
|
):
|
|
"""
|
|
Extended placeholder export with complete normative metadata V2.
|
|
|
|
Returns structured export with:
|
|
- Legacy format (for backward compatibility)
|
|
- Complete metadata per placeholder (normative standard V2)
|
|
- Quality assurance metrics
|
|
- Summary statistics
|
|
- Gap report
|
|
- Validation results
|
|
|
|
V2 implements strict quality controls:
|
|
- Correct value_raw extraction
|
|
- Accurate unit inference
|
|
- Precise time_window detection
|
|
- Real source provenance
|
|
- Quality filter policies for activity placeholders
|
|
|
|
Token can be passed via:
|
|
- Header: X-Auth-Token
|
|
- Query param: ?token=xxx (for direct access/downloads)
|
|
"""
|
|
from datetime import datetime
|
|
from placeholder_metadata_extractor import build_complete_metadata_registry
|
|
from generate_complete_metadata_v2 import apply_enhanced_corrections
|
|
from auth import get_session
|
|
|
|
# Accept token from query param OR header
|
|
auth_token = token or x_auth_token
|
|
session = get_session(auth_token)
|
|
|
|
if not session:
|
|
raise HTTPException(401, "Nicht eingeloggt")
|
|
|
|
profile_id = session['profile_id']
|
|
|
|
# Get legacy export (for compatibility)
|
|
resolved_values = get_placeholder_example_values(profile_id)
|
|
cleaned_values = {
|
|
key.replace('{{', '').replace('}}', ''): value
|
|
for key, value in resolved_values.items()
|
|
}
|
|
catalog = get_placeholder_catalog(profile_id)
|
|
|
|
# Build complete metadata registry with V2 enhancements
|
|
try:
|
|
registry = build_complete_metadata_registry(profile_id)
|
|
registry = apply_enhanced_corrections(registry) # V2: Enhanced quality controls
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to build metadata registry: {str(e)}"
|
|
)
|
|
|
|
# Get all metadata
|
|
all_metadata = registry.get_all()
|
|
|
|
# Populate runtime values with V2 enhanced extraction
|
|
from placeholder_metadata_enhanced import extract_value_raw as extract_value_raw_v2
|
|
|
|
for key, metadata in all_metadata.items():
|
|
if key in cleaned_values:
|
|
value = cleaned_values[key]
|
|
metadata.value_display = str(value)
|
|
|
|
# V2: Use enhanced extraction logic
|
|
raw_val, success = extract_value_raw_v2(
|
|
str(value),
|
|
metadata.output_type,
|
|
metadata.type
|
|
)
|
|
if success:
|
|
metadata.value_raw = raw_val
|
|
else:
|
|
metadata.value_raw = None
|
|
if 'value_raw' not in metadata.unresolved_fields:
|
|
metadata.unresolved_fields.append('value_raw')
|
|
|
|
# Check availability (Resolver liefert oft „nicht verfügbar — <Grund>“)
|
|
sv = str(value)
|
|
if (
|
|
sv in ['nicht verfügbar', 'nicht genug Daten']
|
|
or sv.startswith('nicht verfügbar —')
|
|
or sv.startswith('[Fehler:')
|
|
or sv.startswith('[Nicht')
|
|
):
|
|
metadata.available = False
|
|
metadata.missing_reason = value
|
|
else:
|
|
metadata.available = False
|
|
metadata.missing_reason = "Placeholder not in resolver output"
|
|
|
|
# Generate gap report (collect unresolved fields)
|
|
# Note: TimeWindow, OutputType, PlaceholderType are from old metadata system
|
|
# Skip gap report for old metadata if not available
|
|
gaps = {}
|
|
try:
|
|
from placeholder_metadata_complete import TimeWindow, OutputType, PlaceholderType
|
|
gaps = {
|
|
'unknown_time_window': [k for k, m in all_metadata.items() if hasattr(m, 'time_window') and m.time_window == TimeWindow.UNKNOWN],
|
|
'unknown_output_type': [k for k, m in all_metadata.items() if hasattr(m, 'output_type') and m.output_type == OutputType.UNKNOWN],
|
|
'legacy_unknown_type': [k for k, m in all_metadata.items() if hasattr(m, 'type') and m.type == PlaceholderType.LEGACY_UNKNOWN],
|
|
'unresolved_fields': {k: m.unresolved_fields for k, m in all_metadata.items() if hasattr(m, 'unresolved_fields') and m.unresolved_fields},
|
|
'legacy_mismatches': [k for k, m in all_metadata.items() if hasattr(m, 'legacy_contract_mismatch') and m.legacy_contract_mismatch],
|
|
'orphaned': [k for k, m in all_metadata.items() if hasattr(m, 'orphaned_placeholder') and m.orphaned_placeholder],
|
|
}
|
|
except ImportError:
|
|
# Old metadata system not available, use empty gaps
|
|
gaps = {
|
|
'unknown_time_window': [],
|
|
'unknown_output_type': [],
|
|
'legacy_unknown_type': [],
|
|
'unresolved_fields': {},
|
|
'legacy_mismatches': [],
|
|
'orphaned': [],
|
|
}
|
|
|
|
# Validation
|
|
validation_results = registry.validate_all()
|
|
|
|
# Build extended export
|
|
export_data = {
|
|
"schema_version": "1.0.0",
|
|
"export_date": datetime.now().isoformat(),
|
|
"profile_id": profile_id,
|
|
|
|
# Legacy format (backward compatibility)
|
|
"legacy": {
|
|
"all_placeholders": cleaned_values,
|
|
"placeholders_by_category": {},
|
|
"count": len(cleaned_values)
|
|
},
|
|
|
|
# Complete metadata
|
|
"metadata": {
|
|
"flat": [],
|
|
"by_category": {},
|
|
"summary": {},
|
|
"gaps": gaps
|
|
},
|
|
|
|
# Validation
|
|
"validation": {
|
|
"compliant": 0,
|
|
"non_compliant": 0,
|
|
"issues": []
|
|
}
|
|
}
|
|
|
|
# Fill legacy by_category
|
|
for category, items in catalog.items():
|
|
export_data['legacy']['placeholders_by_category'][category] = []
|
|
for item in items:
|
|
key = item['key'].replace('{{', '').replace('}}', '')
|
|
raw_val = cleaned_values.get(key, 'nicht verfügbar')
|
|
export_data['legacy']['placeholders_by_category'][category].append({
|
|
'key': item['key'],
|
|
'description': item['description'],
|
|
'value': raw_val,
|
|
'example': format_value_with_d_modifier(str(raw_val), item),
|
|
})
|
|
|
|
# Fill metadata flat
|
|
for key, metadata in sorted(all_metadata.items()):
|
|
export_data['metadata']['flat'].append(metadata.to_dict())
|
|
|
|
# Fill metadata by_category
|
|
by_category = registry.get_by_category()
|
|
for category, metadata_list in by_category.items():
|
|
export_data['metadata']['by_category'][category] = [
|
|
m.to_dict() for m in metadata_list
|
|
]
|
|
|
|
# Fill summary with V2 QA metrics
|
|
total = len(all_metadata)
|
|
available = sum(1 for m in all_metadata.values() if m.available)
|
|
missing = total - available
|
|
|
|
by_type = {}
|
|
by_schema_status = {}
|
|
for metadata in all_metadata.values():
|
|
ptype = metadata.type.value
|
|
by_type[ptype] = by_type.get(ptype, 0) + 1
|
|
|
|
status = metadata.schema_status
|
|
by_schema_status[status] = by_schema_status.get(status, 0) + 1
|
|
|
|
# Calculate average completeness
|
|
avg_completeness = sum(m.metadata_completeness_score for m in all_metadata.values()) / total if total > 0 else 0
|
|
|
|
# Count QA metrics
|
|
legacy_mismatches = sum(1 for m in all_metadata.values() if m.legacy_contract_mismatch)
|
|
orphaned = sum(1 for m in all_metadata.values() if m.orphaned_placeholder)
|
|
has_quality_filter = sum(1 for m in all_metadata.values() if m.quality_filter_policy)
|
|
has_confidence = sum(1 for m in all_metadata.values() if m.confidence_logic)
|
|
|
|
export_data['metadata']['summary'] = {
|
|
"total_placeholders": total,
|
|
"available": available,
|
|
"missing": missing,
|
|
"by_type": by_type,
|
|
"by_schema_status": by_schema_status,
|
|
"quality_metrics": {
|
|
"average_completeness_score": round(avg_completeness, 1),
|
|
"legacy_mismatches": legacy_mismatches,
|
|
"orphaned": orphaned,
|
|
"with_quality_filter": has_quality_filter,
|
|
"with_confidence_logic": has_confidence
|
|
},
|
|
"coverage": {
|
|
"time_window_unknown": len(gaps.get('unknown_time_window', [])),
|
|
"output_type_unknown": len(gaps.get('unknown_output_type', [])),
|
|
"legacy_unknown_type": len(gaps.get('legacy_unknown_type', [])),
|
|
"with_unresolved_fields": len(gaps.get('unresolved_fields', {}))
|
|
}
|
|
}
|
|
|
|
# ── PART A: Registry Integration ─────────────────────────────────────────
|
|
# Add registry metadata for Part A placeholders (kcal_avg, protein_avg, carb_avg, fat_avg)
|
|
try:
|
|
import placeholder_registrations # Auto-registers Part A placeholders
|
|
from placeholder_registry_export import get_registry_metadata_for_export
|
|
|
|
registry_data = get_registry_metadata_for_export(profile_id)
|
|
export_data['registry_metadata'] = registry_data
|
|
except Exception as e:
|
|
# Graceful degradation if registry not available
|
|
export_data['registry_metadata'] = {
|
|
"error": f"Registry not available: {str(e)}",
|
|
"flat": [],
|
|
"by_category": {},
|
|
"evidence_report": {},
|
|
"validation_report": {}
|
|
}
|
|
|
|
# Fill validation
|
|
for key, violations in validation_results.items():
|
|
errors = [v for v in violations if v.severity == "error"]
|
|
if errors:
|
|
export_data['validation']['non_compliant'] += 1
|
|
export_data['validation']['issues'].append({
|
|
"placeholder": key,
|
|
"violations": [
|
|
{"field": v.field, "issue": v.issue, "severity": v.severity}
|
|
for v in violations
|
|
]
|
|
})
|
|
else:
|
|
export_data['validation']['compliant'] += 1
|
|
|
|
return export_data
|
|
|
|
|
|
@router.get("/placeholders/export-catalog-zip")
|
|
def export_placeholder_catalog_zip(
|
|
token: Optional[str] = Query(None),
|
|
x_auth_token: Optional[str] = Header(default=None)
|
|
):
|
|
"""
|
|
Export complete placeholder catalog as ZIP file.
|
|
|
|
Includes:
|
|
- PLACEHOLDER_CATALOG_EXTENDED.json
|
|
- PLACEHOLDER_CATALOG_EXTENDED.md
|
|
- PLACEHOLDER_GAP_REPORT.md
|
|
- PLACEHOLDER_EXPORT_SPEC.md
|
|
|
|
This generates the files on-the-fly and returns as ZIP.
|
|
Admin only.
|
|
|
|
Token can be passed via:
|
|
- Header: X-Auth-Token
|
|
- Query param: ?token=xxx (for browser downloads)
|
|
"""
|
|
import io
|
|
import zipfile
|
|
from datetime import datetime
|
|
from generate_placeholder_catalog import (
|
|
generate_json_catalog,
|
|
generate_markdown_catalog,
|
|
generate_gap_report_md,
|
|
generate_export_spec_md
|
|
)
|
|
from placeholder_metadata_extractor import build_complete_metadata_registry
|
|
from generate_complete_metadata import apply_manual_corrections, generate_gap_report
|
|
from auth import get_session
|
|
|
|
# Accept token from query param OR header
|
|
auth_token = token or x_auth_token
|
|
session = get_session(auth_token)
|
|
|
|
if not session:
|
|
raise HTTPException(401, "Nicht eingeloggt")
|
|
if session['role'] != 'admin':
|
|
raise HTTPException(403, "Nur für Admins")
|
|
|
|
profile_id = session['profile_id']
|
|
|
|
try:
|
|
# Build registry
|
|
registry = build_complete_metadata_registry(profile_id)
|
|
registry = apply_manual_corrections(registry)
|
|
gaps = generate_gap_report(registry)
|
|
|
|
# Create in-memory ZIP
|
|
zip_buffer = io.BytesIO()
|
|
|
|
with zipfile.ZipFile(zip_buffer, 'w', zipfile.ZIP_DEFLATED) as zip_file:
|
|
# Generate each file content in memory and add to ZIP
|
|
|
|
# 1. JSON Catalog
|
|
all_metadata = registry.get_all()
|
|
json_catalog = {
|
|
"schema_version": "1.0.0",
|
|
"generated_at": datetime.now().isoformat(),
|
|
"normative_standard": "PLACEHOLDER_METADATA_REQUIREMENTS_V2_NORMATIVE.md",
|
|
"total_placeholders": len(all_metadata),
|
|
"placeholders": {key: meta.to_dict() for key, meta in sorted(all_metadata.items())}
|
|
}
|
|
zip_file.writestr(
|
|
'PLACEHOLDER_CATALOG_EXTENDED.json',
|
|
json.dumps(json_catalog, indent=2, ensure_ascii=False)
|
|
)
|
|
|
|
# 2. Markdown Catalog (simplified version)
|
|
by_category = registry.get_by_category()
|
|
md_lines = [
|
|
"# Placeholder Catalog (Extended)",
|
|
"",
|
|
f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
|
|
f"**Total Placeholders:** {len(all_metadata)}",
|
|
"",
|
|
"## Placeholders by Category",
|
|
""
|
|
]
|
|
|
|
for category, metadata_list in sorted(by_category.items()):
|
|
md_lines.append(f"### {category} ({len(metadata_list)} placeholders)")
|
|
md_lines.append("")
|
|
for metadata in sorted(metadata_list, key=lambda m: m.key):
|
|
md_lines.append(f"- `{{{{{metadata.key}}}}}` - {metadata.description}")
|
|
md_lines.append("")
|
|
|
|
zip_file.writestr('PLACEHOLDER_CATALOG_EXTENDED.md', '\n'.join(md_lines))
|
|
|
|
# 3. Gap Report
|
|
gap_lines = [
|
|
"# Placeholder Metadata Gap Report",
|
|
"",
|
|
f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
|
|
f"**Total Placeholders:** {len(all_metadata)}",
|
|
"",
|
|
"## Gaps Summary",
|
|
""
|
|
]
|
|
|
|
for gap_type, placeholders in sorted(gaps.items()):
|
|
if placeholders:
|
|
gap_lines.append(f"### {gap_type.replace('_', ' ').title()}")
|
|
gap_lines.append(f"Count: {len(placeholders)}")
|
|
gap_lines.append("")
|
|
for ph in placeholders[:10]: # Max 10 per type
|
|
gap_lines.append(f"- {{{{{ph}}}}}")
|
|
if len(placeholders) > 10:
|
|
gap_lines.append(f"- ... and {len(placeholders) - 10} more")
|
|
gap_lines.append("")
|
|
|
|
zip_file.writestr('PLACEHOLDER_GAP_REPORT.md', '\n'.join(gap_lines))
|
|
|
|
# 4. Export Spec (simplified)
|
|
spec_lines = [
|
|
"# Placeholder Export Specification",
|
|
"",
|
|
f"**Version:** 1.0.0",
|
|
f"**Generated:** {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}",
|
|
"",
|
|
"## API Endpoints",
|
|
"",
|
|
"### Extended Export",
|
|
"",
|
|
"```",
|
|
"GET /api/prompts/placeholders/export-values-extended",
|
|
"Header: X-Auth-Token: <token>",
|
|
"```",
|
|
"",
|
|
"Returns complete metadata for all 116 placeholders.",
|
|
"",
|
|
"### ZIP Export (Admin)",
|
|
"",
|
|
"```",
|
|
"GET /api/prompts/placeholders/export-catalog-zip",
|
|
"Header: X-Auth-Token: <token>",
|
|
"```",
|
|
"",
|
|
"Returns ZIP with all catalog files.",
|
|
]
|
|
|
|
zip_file.writestr('PLACEHOLDER_EXPORT_SPEC.md', '\n'.join(spec_lines))
|
|
|
|
# Prepare ZIP for download
|
|
zip_buffer.seek(0)
|
|
|
|
filename = f"placeholder-catalog-{datetime.now().strftime('%Y-%m-%d')}.zip"
|
|
|
|
return StreamingResponse(
|
|
io.BytesIO(zip_buffer.read()),
|
|
media_type="application/zip",
|
|
headers={
|
|
"Content-Disposition": f"attachment; filename={filename}"
|
|
}
|
|
)
|
|
|
|
except Exception as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to generate ZIP: {str(e)}"
|
|
)
|
|
|
|
|
|
# ── KI-Assisted Prompt Engineering ───────────────────────────────────────────
|
|
|
|
async def call_openrouter(prompt: str, max_tokens: int = 1500) -> str:
|
|
"""Call OpenRouter API to get AI response."""
|
|
if not OPENROUTER_KEY:
|
|
raise HTTPException(status_code=500, detail="OpenRouter API key not configured")
|
|
|
|
async with httpx.AsyncClient() as client:
|
|
resp = await client.post(
|
|
"https://openrouter.ai/api/v1/chat/completions",
|
|
headers={"Authorization": f"Bearer {OPENROUTER_KEY}"},
|
|
json={
|
|
"model": OPENROUTER_MODEL,
|
|
"messages": [{"role": "user", "content": prompt}],
|
|
"max_tokens": max_tokens
|
|
},
|
|
timeout=60.0
|
|
)
|
|
|
|
if resp.status_code != 200:
|
|
raise HTTPException(status_code=resp.status_code, detail=f"OpenRouter API error: {resp.text}")
|
|
|
|
return resp.json()['choices'][0]['message']['content'].strip()
|
|
|
|
|
|
def collect_example_data(profile_id: str, data_categories: list[str]) -> dict:
|
|
"""Collect example data from user's profile for specified categories."""
|
|
example_data = {}
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
# Profil
|
|
cur.execute("SELECT * FROM profiles WHERE id=%s", (profile_id,))
|
|
profile = r2d(cur.fetchone())
|
|
example_data['profil'] = {
|
|
'name': profile.get('name', 'Nutzer'),
|
|
'age': profile.get('dob', 'unbekannt'),
|
|
'height': profile.get('height', 'unbekannt'),
|
|
'sex': profile.get('sex', 'unbekannt')
|
|
}
|
|
|
|
# Körper
|
|
if 'körper' in data_categories:
|
|
cur.execute(
|
|
"SELECT weight, date FROM weight_log WHERE profile_id=%s ORDER BY date DESC LIMIT 3",
|
|
(profile_id,)
|
|
)
|
|
weights = [r2d(r) for r in cur.fetchall()]
|
|
example_data['körper'] = {
|
|
'weight_entries': weights,
|
|
'latest_weight': f"{weights[0]['weight']:.1f} kg" if weights else "nicht verfügbar"
|
|
}
|
|
|
|
# Ernährung
|
|
if 'ernährung' in data_categories:
|
|
cur.execute(
|
|
"""SELECT kcal, protein, carb, fat, date FROM nutrition_log
|
|
WHERE profile_id=%s ORDER BY date DESC LIMIT 3""",
|
|
(profile_id,)
|
|
)
|
|
nutrition = [r2d(r) for r in cur.fetchall()]
|
|
example_data['ernährung'] = {
|
|
'recent_entries': nutrition
|
|
}
|
|
|
|
# Training
|
|
if 'training' in data_categories:
|
|
cur.execute(
|
|
"""SELECT activity_type, duration_min, kcal_active, date FROM activity_log
|
|
WHERE profile_id=%s ORDER BY date DESC LIMIT 5""",
|
|
(profile_id,)
|
|
)
|
|
activities = [r2d(r) for r in cur.fetchall()]
|
|
example_data['training'] = {
|
|
'recent_activities': activities
|
|
}
|
|
|
|
return example_data
|
|
|
|
|
|
@router.post("/generate")
|
|
async def generate_prompt(req: PromptGenerateRequest, session: dict=Depends(require_admin)):
|
|
"""
|
|
Generate AI prompt using KI based on user's goal description.
|
|
|
|
This is a meta-feature: KI helps create better prompts for KI analysis.
|
|
"""
|
|
profile_id = session['profile_id']
|
|
|
|
# Collect example data
|
|
example_data = collect_example_data(profile_id, req.data_categories)
|
|
|
|
# Get available placeholders for selected categories
|
|
available_placeholders = get_available_placeholders(req.data_categories)
|
|
placeholders_list = []
|
|
for cat, phs in available_placeholders.items():
|
|
placeholders_list.extend(phs)
|
|
|
|
# Build meta-prompt for prompt generation
|
|
meta_prompt = f"""Du bist ein Experte für Prompt-Engineering im Bereich Fitness & Gesundheit.
|
|
|
|
**Aufgabe:**
|
|
Erstelle einen optimalen KI-Prompt für folgendes Analyseziel:
|
|
"{req.goal}"
|
|
|
|
**Verfügbare Datenbereiche:**
|
|
{', '.join(req.data_categories)}
|
|
|
|
**Beispieldaten (aktuelle Werte des Nutzers):**
|
|
```json
|
|
{json.dumps(example_data, indent=2, ensure_ascii=False)}
|
|
```
|
|
|
|
**Verfügbare Platzhalter:**
|
|
{', '.join(placeholders_list)}
|
|
|
|
**Anforderungen an den Prompt:**
|
|
1. Nutze relevante Platzhalter ({{{{platzhalter_name}}}}) - diese werden durch echte Daten ersetzt
|
|
2. Sei spezifisch und klar in den Anweisungen
|
|
3. Fordere strukturierte Antworten (z.B. Abschnitte, Bullet Points)
|
|
4. Gib der KI Kontext über ihre Rolle/Expertise (z.B. "Du bist ein Sportwissenschaftler")
|
|
5. Fordere konkrete, umsetzbare Handlungsempfehlungen
|
|
6. Sprache: Deutsch
|
|
7. Der Prompt sollte 150-300 Wörter lang sein
|
|
|
|
{f'**Gewünschtes Antwort-Format:**\\n{req.example_output}' if req.example_output else ''}
|
|
|
|
**Generiere jetzt NUR den Prompt-Text (keine Erklärung, keine Metakommentare):**
|
|
"""
|
|
|
|
# Call AI to generate prompt
|
|
generated_prompt = await call_openrouter(meta_prompt, max_tokens=1000)
|
|
|
|
# Extract placeholders used
|
|
import re
|
|
placeholders_used = list(set(re.findall(r'\{\{(\w+)\}\}', generated_prompt)))
|
|
|
|
# Generate title from goal
|
|
title = generate_title_from_goal(req.goal)
|
|
|
|
# Infer category
|
|
category = infer_category(req.data_categories)
|
|
|
|
return {
|
|
"template": generated_prompt,
|
|
"placeholders_used": placeholders_used,
|
|
"example_data": example_data,
|
|
"suggested_title": title,
|
|
"suggested_category": category
|
|
}
|
|
|
|
|
|
def generate_title_from_goal(goal: str) -> str:
|
|
"""Generate a title from the goal description."""
|
|
goal_lower = goal.lower()
|
|
|
|
# Simple keyword matching
|
|
if 'protein' in goal_lower:
|
|
return 'Protein-Analyse'
|
|
elif 'gewicht' in goal_lower or 'abnehmen' in goal_lower:
|
|
return 'Gewichtstrend-Analyse'
|
|
elif 'training' in goal_lower or 'aktivität' in goal_lower:
|
|
return 'Trainingsanalyse'
|
|
elif 'schlaf' in goal_lower:
|
|
return 'Schlaf-Analyse'
|
|
elif 'regeneration' in goal_lower or 'erholung' in goal_lower:
|
|
return 'Regenerations-Analyse'
|
|
elif 'kraft' in goal_lower or 'muskel' in goal_lower:
|
|
return 'Kraftentwicklung'
|
|
elif 'ausdauer' in goal_lower or 'cardio' in goal_lower:
|
|
return 'Ausdauer-Analyse'
|
|
else:
|
|
return 'Neue Analyse'
|
|
|
|
|
|
def infer_category(data_categories: list[str]) -> str:
|
|
"""Infer prompt category from selected data categories."""
|
|
if len(data_categories) == 1:
|
|
return data_categories[0]
|
|
elif len(data_categories) > 2:
|
|
return 'ganzheitlich'
|
|
else:
|
|
# 2 categories: prefer the first one
|
|
return data_categories[0] if data_categories else 'ganzheitlich'
|
|
|
|
|
|
@router.post("/{prompt_id}/optimize")
|
|
async def optimize_prompt(prompt_id: str, session: dict=Depends(require_admin)):
|
|
"""
|
|
Analyze and optimize an existing prompt using KI.
|
|
|
|
Returns suggestions for improvement with score, strengths, weaknesses,
|
|
and an optimized version of the prompt.
|
|
"""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT * FROM ai_prompts WHERE id=%s", (prompt_id,))
|
|
prompt = r2d(cur.fetchone())
|
|
|
|
if not prompt:
|
|
raise HTTPException(status_code=404, detail="Prompt not found")
|
|
|
|
# Build meta-prompt for optimization
|
|
meta_prompt = f"""Du bist ein Experte für Prompt-Engineering.
|
|
|
|
**Analysiere folgenden KI-Prompt und schlage Verbesserungen vor:**
|
|
|
|
```
|
|
{prompt['template']}
|
|
```
|
|
|
|
**Analysiere folgende Aspekte:**
|
|
1. **Klarheit & Präzision:** Ist die Anweisung klar und eindeutig?
|
|
2. **Struktur & Lesbarkeit:** Ist der Prompt gut strukturiert?
|
|
3. **Platzhalter-Nutzung:** Werden relevante Platzhalter genutzt? Fehlen wichtige Daten?
|
|
4. **Antwort-Format:** Wird eine strukturierte Ausgabe gefordert?
|
|
5. **Kontext:** Hat die KI genug Rollenkontext (z.B. "Du bist ein Ernährungsexperte")?
|
|
6. **Handlungsempfehlungen:** Werden konkrete, umsetzbare Schritte gefordert?
|
|
|
|
**Gib deine Analyse als JSON zurück (NUR das JSON, keine zusätzlichen Kommentare):**
|
|
|
|
```json
|
|
{{
|
|
"score": 0-100,
|
|
"strengths": ["Stärke 1", "Stärke 2", "Stärke 3"],
|
|
"weaknesses": ["Schwäche 1", "Schwäche 2"],
|
|
"optimized_prompt": "Vollständig optimierte Version des Prompts",
|
|
"changes_summary": "Kurze Zusammenfassung was verbessert wurde (2-3 Sätze)"
|
|
}}
|
|
```
|
|
|
|
**Wichtig:**
|
|
- Die optimierte Version sollte alle Platzhalter beibehalten und ggf. ergänzen
|
|
- Sprache: Deutsch
|
|
- Der optimierte Prompt sollte 150-400 Wörter lang sein
|
|
"""
|
|
|
|
# Call AI for optimization
|
|
response = await call_openrouter(meta_prompt, max_tokens=1500)
|
|
|
|
# Parse JSON response
|
|
try:
|
|
# Extract JSON from markdown code blocks if present
|
|
if '```json' in response:
|
|
json_start = response.find('```json') + 7
|
|
json_end = response.find('```', json_start)
|
|
json_str = response[json_start:json_end].strip()
|
|
elif '```' in response:
|
|
json_start = response.find('```') + 3
|
|
json_end = response.find('```', json_start)
|
|
json_str = response[json_start:json_end].strip()
|
|
else:
|
|
json_str = response
|
|
|
|
analysis = json.loads(json_str)
|
|
|
|
except json.JSONDecodeError as e:
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"Failed to parse AI response as JSON: {str(e)}. Response: {response[:200]}"
|
|
)
|
|
|
|
# Ensure required fields
|
|
if not all(k in analysis for k in ['score', 'strengths', 'weaknesses', 'optimized_prompt', 'changes_summary']):
|
|
raise HTTPException(
|
|
status_code=500,
|
|
detail=f"AI response missing required fields. Got: {list(analysis.keys())}"
|
|
)
|
|
|
|
return analysis
|
|
|
|
|
|
# ── Pipeline Config Management (Issue #28) ────────────────────────────────────
|
|
|
|
@router.get("/pipeline-configs")
|
|
def list_pipeline_configs(session: dict=Depends(require_auth)):
|
|
"""
|
|
List pipeline configurations.
|
|
- Admins: see ALL configs
|
|
- Users: see only active configs
|
|
"""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
is_admin = session.get('role') == 'admin'
|
|
|
|
if is_admin:
|
|
cur.execute("SELECT * FROM pipeline_configs ORDER BY is_default DESC, name")
|
|
else:
|
|
cur.execute("SELECT * FROM pipeline_configs WHERE active=true ORDER BY is_default DESC, name")
|
|
|
|
return [r2d(r) for r in cur.fetchall()]
|
|
|
|
|
|
@router.post("/pipeline-configs")
|
|
def create_pipeline_config(p: PipelineConfigCreate, session: dict=Depends(require_admin)):
|
|
"""Create new pipeline configuration (admin only)."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
# Check if name already exists
|
|
cur.execute("SELECT id FROM pipeline_configs WHERE name=%s", (p.name,))
|
|
if cur.fetchone():
|
|
raise HTTPException(status_code=400, detail=f"Pipeline config with name '{p.name}' already exists")
|
|
|
|
# Validate: stage prompts must exist
|
|
all_slugs = p.stage1_prompts + [p.stage2_prompt]
|
|
if p.stage3_prompt:
|
|
all_slugs.append(p.stage3_prompt)
|
|
|
|
for slug in all_slugs:
|
|
cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (slug,))
|
|
if not cur.fetchone():
|
|
raise HTTPException(status_code=400, detail=f"Prompt '{slug}' does not exist")
|
|
|
|
# If is_default=true, unset other defaults
|
|
if p.is_default:
|
|
cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true")
|
|
|
|
config_id = str(uuid.uuid4())
|
|
cur.execute(
|
|
"""INSERT INTO pipeline_configs (
|
|
id, name, description, is_default, active,
|
|
modules, timeframes, stage1_prompts, stage2_prompt, stage3_prompt,
|
|
created, updated
|
|
) VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, CURRENT_TIMESTAMP, CURRENT_TIMESTAMP)""",
|
|
(
|
|
config_id, p.name, p.description, p.is_default, p.active,
|
|
json.dumps(p.modules), json.dumps(p.timeframes),
|
|
p.stage1_prompts, p.stage2_prompt, p.stage3_prompt
|
|
)
|
|
)
|
|
|
|
return {"id": config_id, "name": p.name}
|
|
|
|
|
|
@router.put("/pipeline-configs/{config_id}")
|
|
def update_pipeline_config(config_id: str, p: PipelineConfigUpdate, session: dict=Depends(require_admin)):
|
|
"""Update pipeline configuration (admin only)."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
# Check if config exists
|
|
cur.execute("SELECT id FROM pipeline_configs WHERE id=%s", (config_id,))
|
|
if not cur.fetchone():
|
|
raise HTTPException(status_code=404, detail="Pipeline config not found")
|
|
|
|
# Build dynamic UPDATE query
|
|
updates = []
|
|
values = []
|
|
|
|
if p.name is not None:
|
|
updates.append('name=%s')
|
|
values.append(p.name)
|
|
if p.description is not None:
|
|
updates.append('description=%s')
|
|
values.append(p.description)
|
|
if p.is_default is not None:
|
|
# If setting to default, unset others
|
|
if p.is_default:
|
|
cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true AND id!=%s", (config_id,))
|
|
updates.append('is_default=%s')
|
|
values.append(p.is_default)
|
|
if p.active is not None:
|
|
updates.append('active=%s')
|
|
values.append(p.active)
|
|
if p.modules is not None:
|
|
updates.append('modules=%s')
|
|
values.append(json.dumps(p.modules))
|
|
if p.timeframes is not None:
|
|
updates.append('timeframes=%s')
|
|
values.append(json.dumps(p.timeframes))
|
|
if p.stage1_prompts is not None:
|
|
updates.append('stage1_prompts=%s')
|
|
values.append(p.stage1_prompts)
|
|
if p.stage2_prompt is not None:
|
|
updates.append('stage2_prompt=%s')
|
|
values.append(p.stage2_prompt)
|
|
if p.stage3_prompt is not None:
|
|
updates.append('stage3_prompt=%s')
|
|
values.append(p.stage3_prompt)
|
|
|
|
if not updates:
|
|
return {"ok": True}
|
|
|
|
cur.execute(
|
|
f"UPDATE pipeline_configs SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s",
|
|
values + [config_id]
|
|
)
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
@router.delete("/pipeline-configs/{config_id}")
|
|
def delete_pipeline_config(config_id: str, session: dict=Depends(require_admin)):
|
|
"""Delete pipeline configuration (admin only)."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
# Check if it's the only default
|
|
cur.execute("SELECT is_default FROM pipeline_configs WHERE id=%s", (config_id,))
|
|
row = cur.fetchone()
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="Pipeline config not found")
|
|
|
|
if row['is_default']:
|
|
# Check if there are other configs
|
|
cur.execute("SELECT COUNT(*) as count FROM pipeline_configs WHERE id!=%s", (config_id,))
|
|
if cur.fetchone()['count'] > 0:
|
|
raise HTTPException(
|
|
status_code=400,
|
|
detail="Cannot delete the default config. Please set another config as default first."
|
|
)
|
|
|
|
cur.execute("DELETE FROM pipeline_configs WHERE id=%s", (config_id,))
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
@router.post("/pipeline-configs/{config_id}/set-default")
|
|
def set_default_pipeline_config(config_id: str, session: dict=Depends(require_admin)):
|
|
"""Set a pipeline config as default (admin only)."""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
# Check if config exists
|
|
cur.execute("SELECT id FROM pipeline_configs WHERE id=%s", (config_id,))
|
|
if not cur.fetchone():
|
|
raise HTTPException(status_code=404, detail="Pipeline config not found")
|
|
|
|
# Unset all other defaults
|
|
cur.execute("UPDATE pipeline_configs SET is_default=false WHERE is_default=true")
|
|
|
|
# Set this one as default
|
|
cur.execute("UPDATE pipeline_configs SET is_default=true, updated=CURRENT_TIMESTAMP WHERE id=%s", (config_id,))
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
@router.post("/{prompt_id}/reset-to-default")
|
|
def reset_prompt_to_default(prompt_id: str, session: dict=Depends(require_admin)):
|
|
"""
|
|
Reset a system prompt to its default template (admin only).
|
|
Only works for prompts with is_system_default=true.
|
|
"""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
cur.execute("SELECT is_system_default, default_template FROM ai_prompts WHERE id=%s", (prompt_id,))
|
|
row = cur.fetchone()
|
|
|
|
if not row:
|
|
raise HTTPException(status_code=404, detail="Prompt not found")
|
|
|
|
if not row['is_system_default']:
|
|
raise HTTPException(status_code=400, detail="Only system prompts can be reset to default")
|
|
|
|
if not row['default_template']:
|
|
raise HTTPException(status_code=400, detail="No default template available for this prompt")
|
|
|
|
# Reset template to default
|
|
cur.execute(
|
|
"UPDATE ai_prompts SET template=%s, updated=CURRENT_TIMESTAMP WHERE id=%s",
|
|
(row['default_template'], prompt_id)
|
|
)
|
|
|
|
return {"ok": True}
|
|
|
|
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
# UNIFIED PROMPT SYSTEM (Issue #28 Phase 2)
|
|
# ══════════════════════════════════════════════════════════════════════════════
|
|
|
|
from prompt_executor import execute_prompt_with_data
|
|
from models import UnifiedPromptCreate, UnifiedPromptUpdate
|
|
|
|
|
|
@router.post("/execute")
|
|
async def execute_unified_prompt(
|
|
prompt_slug: str,
|
|
modules: Optional[dict] = None,
|
|
timeframes: Optional[dict] = None,
|
|
debug: bool = False,
|
|
save: bool = False,
|
|
session: dict = Depends(require_auth)
|
|
):
|
|
"""
|
|
Execute a unified prompt (base or pipeline type).
|
|
|
|
Args:
|
|
prompt_slug: Slug of prompt to execute
|
|
modules: Dict of enabled modules (e.g., {"körper": true})
|
|
timeframes: Dict of timeframes per module (e.g., {"körper": 30})
|
|
debug: If true, include debug information (placeholders, final prompts, etc.)
|
|
save: If true, save result to ai_insights table
|
|
|
|
Returns:
|
|
Execution result with outputs (and debug info if debug=true)
|
|
"""
|
|
profile_id = session['profile_id']
|
|
|
|
# Use default modules/timeframes if not provided
|
|
if not modules:
|
|
modules = {
|
|
'körper': True,
|
|
'ernährung': True,
|
|
'training': True,
|
|
'schlaf': True,
|
|
'vitalwerte': True
|
|
}
|
|
|
|
if not timeframes:
|
|
timeframes = {
|
|
'körper': 30,
|
|
'ernährung': 30,
|
|
'training': 14,
|
|
'schlaf': 14,
|
|
'vitalwerte': 7
|
|
}
|
|
|
|
# Execute with prompt_executor
|
|
# Always enable debug when saving to collect metadata for value table
|
|
result = await execute_prompt_with_data(
|
|
prompt_slug=prompt_slug,
|
|
profile_id=profile_id,
|
|
modules=modules,
|
|
timeframes=timeframes,
|
|
openrouter_call_func=call_openrouter,
|
|
enable_debug=debug or save # Enable debug if saving for metadata collection
|
|
)
|
|
|
|
# Save to ai_insights if requested
|
|
if save:
|
|
# Extract final output text/markdown
|
|
if result['type'] == 'pipeline':
|
|
# For pipeline, get the last stage's output
|
|
final_output = result.get('output', {})
|
|
# If output is dict with single key, use that value
|
|
if isinstance(final_output, dict) and len(final_output) == 1:
|
|
content = list(final_output.values())[0]
|
|
else:
|
|
content = json.dumps(final_output, ensure_ascii=False)
|
|
elif result['type'] == 'workflow':
|
|
content = _workflow_user_facing_content(result.get('aggregated_result'))
|
|
else:
|
|
# For base prompts, use output directly
|
|
content = result.get('output', '')
|
|
if isinstance(content, dict):
|
|
content = json.dumps(content, ensure_ascii=False)
|
|
|
|
# Prepare metadata with resolved placeholders and descriptions
|
|
from placeholder_resolver import get_placeholder_catalog, get_placeholder_example_values
|
|
|
|
metadata = {
|
|
'prompt_type': result['type'],
|
|
'placeholders': {}
|
|
}
|
|
|
|
# Collect all resolved placeholders from debug info
|
|
if result.get('debug'):
|
|
catalog = get_placeholder_catalog(profile_id)
|
|
|
|
# Get full untruncated values from placeholder resolver
|
|
full_values = get_placeholder_example_values(profile_id)
|
|
# Remove {{ }} wrappers
|
|
cleaned_values = {
|
|
key.replace('{{', '').replace('}}', ''): value
|
|
for key, value in full_values.items()
|
|
}
|
|
|
|
if result['type'] == 'base':
|
|
# Base prompt: single set of placeholders
|
|
resolved_keys = result['debug'].get('resolved_placeholders', {}).keys()
|
|
for key in resolved_keys:
|
|
# Get full untruncated value
|
|
value = cleaned_values.get(key, result['debug']['resolved_placeholders'].get(key, ''))
|
|
|
|
# Find description and category in catalog
|
|
desc = None
|
|
category = 'Sonstiges'
|
|
for cat_name, cat_items in catalog.items():
|
|
matching = [item for item in cat_items if item['key'] == key]
|
|
if matching:
|
|
desc = matching[0].get('description', '')
|
|
category = cat_name
|
|
break
|
|
|
|
metadata['placeholders'][key] = {
|
|
'value': value,
|
|
'description': desc or '',
|
|
'category': category
|
|
}
|
|
|
|
elif result['type'] == 'pipeline':
|
|
# Pipeline: collect from all stages
|
|
stages_debug = result['debug'].get('stages', [])
|
|
|
|
# First, collect stage outputs (outputs from base prompts in each stage)
|
|
stage_outputs = {} # Raw stage outputs (for expert mode)
|
|
extracted_values = {} # Individual values extracted from JSON outputs (for normal mode)
|
|
|
|
for stage_debug in stages_debug:
|
|
stage_num = stage_debug.get('stage', 0)
|
|
stage_output = stage_debug.get('output', {})
|
|
if isinstance(stage_output, dict):
|
|
for output_key, output_value in stage_output.items():
|
|
# Store raw stage output (for expert mode)
|
|
placeholder_key = f"stage_{stage_num}_{output_key}"
|
|
stage_outputs[placeholder_key] = output_value
|
|
|
|
# If output is a dict/object, extract individual fields
|
|
if isinstance(output_value, dict):
|
|
for field_key, field_value in output_value.items():
|
|
# Store individual field (for normal mode)
|
|
# Use just the field name as key (e.g., "bmi" instead of "stage_1_body.bmi")
|
|
# This allows deduplication if multiple stages have the same field
|
|
if field_key not in extracted_values:
|
|
extracted_values[field_key] = {
|
|
'value': field_value if isinstance(field_value, str) else json.dumps(field_value, ensure_ascii=False),
|
|
'source_stage': stage_num,
|
|
'source_output': output_key
|
|
}
|
|
|
|
# Add extracted values from stage outputs (individual fields)
|
|
for field_key, field_data in extracted_values.items():
|
|
if field_key not in metadata['placeholders']:
|
|
# Determine category for extracted values
|
|
output_name = field_data['source_output'].replace('stage1_', '').replace('_', ' ').title()
|
|
category = f"Stage {field_data['source_stage']} - {output_name}"
|
|
|
|
metadata['placeholders'][field_key] = {
|
|
'value': field_data['value'],
|
|
'description': f"Aus Stage {field_data['source_stage']} ({field_data['source_output']})",
|
|
'is_extracted': True, # Mark as extracted for filtering
|
|
'category': category
|
|
}
|
|
|
|
# Add all stage outputs (raw JSON) for expert mode - regardless of whether referenced
|
|
for stage_key, stage_value in stage_outputs.items():
|
|
if stage_key not in metadata['placeholders']:
|
|
stage_parts = stage_key.split('_')
|
|
stage_num = stage_parts[1] if len(stage_parts) > 1 else '?'
|
|
output_name = '_'.join(stage_parts[2:]) if len(stage_parts) > 2 else 'output'
|
|
|
|
metadata['placeholders'][stage_key] = {
|
|
'value': json.dumps(stage_value, ensure_ascii=False, indent=2) if isinstance(stage_value, dict) else str(stage_value),
|
|
'description': f"Zwischenergebnis aus Stage {stage_num} ({output_name})",
|
|
'is_stage_raw': True,
|
|
'category': f"Stage {stage_num} - Rohdaten"
|
|
}
|
|
|
|
# Collect all resolved placeholders from prompts (input placeholders)
|
|
for stage_debug in stages_debug:
|
|
for prompt_debug in stage_debug.get('prompts', []):
|
|
resolved_keys = []
|
|
# Check both direct and ref_debug
|
|
if 'resolved_placeholders' in prompt_debug:
|
|
resolved_keys = prompt_debug['resolved_placeholders'].keys()
|
|
elif 'ref_debug' in prompt_debug and 'resolved_placeholders' in prompt_debug['ref_debug']:
|
|
resolved_keys = prompt_debug['ref_debug']['resolved_placeholders'].keys()
|
|
|
|
for key in resolved_keys:
|
|
if key not in metadata['placeholders']: # Avoid duplicates
|
|
# Get value from cleaned_values
|
|
value = cleaned_values.get(key, '')
|
|
|
|
# Find description and category in catalog
|
|
desc = None
|
|
category = 'Sonstiges'
|
|
for cat_name, cat_items in catalog.items():
|
|
matching = [item for item in cat_items if item['key'] == key]
|
|
if matching:
|
|
desc = matching[0].get('description', '')
|
|
category = cat_name
|
|
break
|
|
desc = desc or ''
|
|
|
|
metadata['placeholders'][key] = {
|
|
'value': value if isinstance(value, str) else json.dumps(value, ensure_ascii=False),
|
|
'description': desc,
|
|
'category': category
|
|
}
|
|
|
|
# Save to database with metadata
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute(
|
|
"""INSERT INTO ai_insights (id, profile_id, scope, content, metadata, created)
|
|
VALUES (%s, %s, %s, %s, %s, CURRENT_TIMESTAMP)""",
|
|
(str(uuid.uuid4()), profile_id, prompt_slug, content, json.dumps(metadata))
|
|
)
|
|
conn.commit()
|
|
|
|
return result
|
|
|
|
|
|
@router.post("/unified")
|
|
def create_unified_prompt(p: UnifiedPromptCreate, session: dict = Depends(require_admin)):
|
|
"""
|
|
Create a new unified prompt (base, pipeline, or workflow type).
|
|
Admin only.
|
|
"""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
# Auto-generate slug if not provided (for workflows)
|
|
if not p.slug:
|
|
import re
|
|
base_slug = re.sub(r'[^a-z0-9_]+', '_', p.name.lower()).strip('_')
|
|
p.slug = f"{base_slug}_{uuid.uuid4().hex[:6]}"
|
|
|
|
# Check for duplicate slug
|
|
cur.execute("SELECT id FROM ai_prompts WHERE slug=%s", (p.slug,))
|
|
if cur.fetchone():
|
|
raise HTTPException(status_code=400, detail="Slug already exists")
|
|
|
|
# Validate type
|
|
if p.type not in ['base', 'pipeline', 'workflow']:
|
|
raise HTTPException(status_code=400, detail="Type must be 'base', 'pipeline', or 'workflow'")
|
|
|
|
# Validate base type has template
|
|
if p.type == 'base' and not p.template:
|
|
raise HTTPException(status_code=400, detail="Base prompts require a template")
|
|
|
|
# Validate pipeline type has stages
|
|
if p.type == 'pipeline' and not p.stages:
|
|
raise HTTPException(status_code=400, detail="Pipeline prompts require stages")
|
|
|
|
# Validate workflow type has graph_data
|
|
if p.type == 'workflow' and not p.graph_data:
|
|
raise HTTPException(status_code=400, detail="Workflow prompts require graph_data")
|
|
|
|
# Convert stages to JSONB
|
|
stages_json = None
|
|
if p.stages:
|
|
stages_json = json.dumps([
|
|
{
|
|
'stage': s.stage,
|
|
'prompts': [
|
|
{
|
|
'source': pr.source,
|
|
'slug': pr.slug,
|
|
'template': pr.template,
|
|
'output_key': pr.output_key,
|
|
'output_format': pr.output_format,
|
|
'output_schema': pr.output_schema
|
|
}
|
|
for pr in s.prompts
|
|
]
|
|
}
|
|
for s in p.stages
|
|
])
|
|
|
|
prompt_id = str(uuid.uuid4())
|
|
|
|
# Convert graph_data to JSONB
|
|
graph_data_json = None
|
|
if p.graph_data:
|
|
graph_data_json = json.dumps(p.graph_data)
|
|
|
|
cur.execute(
|
|
"""INSERT INTO ai_prompts
|
|
(id, slug, name, display_name, description, template, category, active, sort_order,
|
|
type, stages, output_format, output_schema, graph_data)
|
|
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)""",
|
|
(
|
|
prompt_id, p.slug, p.name, p.display_name, p.description,
|
|
p.template, p.category, p.active, p.sort_order,
|
|
p.type, stages_json, p.output_format,
|
|
json.dumps(p.output_schema) if p.output_schema else None,
|
|
graph_data_json
|
|
)
|
|
)
|
|
|
|
return {"id": prompt_id, "slug": p.slug}
|
|
|
|
|
|
@router.put("/unified/{prompt_id}")
|
|
def update_unified_prompt(prompt_id: str, p: UnifiedPromptUpdate, session: dict = Depends(require_admin)):
|
|
"""
|
|
Update a unified prompt.
|
|
Admin only.
|
|
"""
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
|
|
# Check if exists
|
|
cur.execute("SELECT id FROM ai_prompts WHERE id=%s", (prompt_id,))
|
|
if not cur.fetchone():
|
|
raise HTTPException(status_code=404, detail="Prompt not found")
|
|
|
|
# Build update query
|
|
updates = []
|
|
values = []
|
|
|
|
if p.name is not None:
|
|
updates.append('name=%s')
|
|
values.append(p.name)
|
|
if p.display_name is not None:
|
|
updates.append('display_name=%s')
|
|
values.append(p.display_name)
|
|
if p.description is not None:
|
|
updates.append('description=%s')
|
|
values.append(p.description)
|
|
if p.type is not None:
|
|
if p.type not in ['base', 'pipeline', 'workflow']:
|
|
raise HTTPException(status_code=400, detail="Type must be 'base', 'pipeline', or 'workflow'")
|
|
updates.append('type=%s')
|
|
values.append(p.type)
|
|
if p.category is not None:
|
|
updates.append('category=%s')
|
|
values.append(p.category)
|
|
if p.active is not None:
|
|
updates.append('active=%s')
|
|
values.append(p.active)
|
|
if p.sort_order is not None:
|
|
updates.append('sort_order=%s')
|
|
values.append(p.sort_order)
|
|
if p.template is not None:
|
|
updates.append('template=%s')
|
|
values.append(p.template)
|
|
if p.output_format is not None:
|
|
updates.append('output_format=%s')
|
|
values.append(p.output_format)
|
|
if p.output_schema is not None:
|
|
updates.append('output_schema=%s')
|
|
values.append(json.dumps(p.output_schema))
|
|
if p.stages is not None:
|
|
stages_json = json.dumps([
|
|
{
|
|
'stage': s.stage,
|
|
'prompts': [
|
|
{
|
|
'source': pr.source,
|
|
'slug': pr.slug,
|
|
'template': pr.template,
|
|
'output_key': pr.output_key,
|
|
'output_format': pr.output_format,
|
|
'output_schema': pr.output_schema
|
|
}
|
|
for pr in s.prompts
|
|
]
|
|
}
|
|
for s in p.stages
|
|
])
|
|
updates.append('stages=%s')
|
|
values.append(stages_json)
|
|
if p.graph_data is not None:
|
|
updates.append('graph_data=%s')
|
|
values.append(json.dumps(p.graph_data))
|
|
|
|
if not updates:
|
|
return {"ok": True}
|
|
|
|
cur.execute(
|
|
f"UPDATE ai_prompts SET {', '.join(updates)}, updated=CURRENT_TIMESTAMP WHERE id=%s",
|
|
values + [prompt_id]
|
|
)
|
|
|
|
return {"ok": True}
|