mitai-jinkendo/backend/routers/focus_areas.py
Lars e7dedd527f
All checks were successful
Deploy Development / deploy (push) Successful in 51s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s
feat: Implement focus area usage types management in API and UI
- Added endpoints for listing and updating focus area usage types in the backend.
- Enhanced the AdminFocusAreasPage to display and manage allowed usage types for focus areas.
- Introduced a new state for usage types catalog and integrated it into the focus area editing process.
- Updated API utility functions to support new usage types operations.
2026-04-06 07:28:19 +02:00

488 lines
16 KiB
Python

"""
Focus Areas Router
Manages dynamic focus area definitions and user preferences
"""
from fastapi import APIRouter, HTTPException, Depends
from pydantic import BaseModel, Field
from typing import Optional, List
from db import get_db, get_cursor, r2d
from auth import require_auth
from focus_area_usage_helpers import coerce_usage_type_keys
router = APIRouter(prefix="/api/focus-areas", tags=["focus-areas"])
# ============================================================================
# Models
# ============================================================================
class FocusAreaCreate(BaseModel):
"""Create new focus area definition"""
key: str
name_de: str
name_en: Optional[str] = None
icon: Optional[str] = None
description: Optional[str] = None
category: str = 'custom'
class FocusAreaUpdate(BaseModel):
"""Update focus area definition"""
name_de: Optional[str] = None
name_en: Optional[str] = None
icon: Optional[str] = None
description: Optional[str] = None
category: Optional[str] = None
is_active: Optional[bool] = None
class UserFocusPreferences(BaseModel):
"""User's focus area weightings (dynamic)"""
preferences: dict # {focus_area_id: weight_pct}
class FocusAreaUsageTypesUpdate(BaseModel):
"""Replace all usage-type assignments for one focus area (admin)."""
usage_type_keys: List[str] = Field(default_factory=list)
# ============================================================================
# Focus Area Definitions (Admin)
# ============================================================================
@router.get("/definitions")
def list_focus_area_definitions(
session: dict = Depends(require_auth),
include_inactive: bool = False
):
"""
List all available focus area definitions.
Query params:
- include_inactive: Include inactive focus areas (default: false)
Returns focus areas grouped by category.
"""
with get_db() as conn:
cur = get_cursor(conn)
query = """
SELECT id, key, name_de, name_en, icon, description, category, is_active,
created_at, updated_at,
COALESCE(
(
SELECT json_agg(faut.key ORDER BY faut.sort_order, faut.key)
FROM focus_area_definition_usage_types fadut
JOIN focus_area_usage_types faut ON faut.id = fadut.usage_type_id
WHERE fadut.focus_area_id = focus_area_definitions.id
),
'[]'::json
) AS allowed_usage_type_keys
FROM focus_area_definitions
WHERE is_active = true OR %s
ORDER BY category, name_de
"""
cur.execute(query, (include_inactive,))
areas = []
for row in cur.fetchall():
d = r2d(row)
d['allowed_usage_type_keys'] = coerce_usage_type_keys(d.get('allowed_usage_type_keys'))
areas.append(d)
# Group by category
grouped = {}
for area in areas:
cat = area['category'] or 'other'
if cat not in grouped:
grouped[cat] = []
grouped[cat].append(area)
if session.get('role') != 'admin':
for area in areas:
area.pop('allowed_usage_type_keys', None)
return {
"areas": areas,
"grouped": grouped,
"total": len(areas)
}
@router.post("/definitions")
def create_focus_area_definition(
data: FocusAreaCreate,
session: dict = Depends(require_auth)
):
"""
Create new focus area definition (Admin only).
Note: Requires admin role.
"""
# Admin check
if session.get('role') != 'admin':
raise HTTPException(status_code=403, detail="Admin-Rechte erforderlich")
with get_db() as conn:
cur = get_cursor(conn)
# Check if key already exists
cur.execute(
"SELECT id FROM focus_area_definitions WHERE key = %s",
(data.key,)
)
if cur.fetchone():
raise HTTPException(
status_code=400,
detail=f"Focus Area mit Key '{data.key}' existiert bereits"
)
# Insert
cur.execute("""
INSERT INTO focus_area_definitions
(key, name_de, name_en, icon, description, category)
VALUES (%s, %s, %s, %s, %s, %s)
RETURNING id
""", (
data.key, data.name_de, data.name_en,
data.icon, data.description, data.category
))
area_id = cur.fetchone()['id']
return {
"id": area_id,
"message": f"Focus Area '{data.name_de}' erstellt"
}
@router.put("/definitions/{area_id}")
def update_focus_area_definition(
area_id: str,
data: FocusAreaUpdate,
session: dict = Depends(require_auth)
):
"""Update focus area definition (Admin only)"""
# Admin check
if session.get('role') != 'admin':
raise HTTPException(status_code=403, detail="Admin-Rechte erforderlich")
with get_db() as conn:
cur = get_cursor(conn)
# Build dynamic UPDATE
updates = []
values = []
if data.name_de is not None:
updates.append("name_de = %s")
values.append(data.name_de)
if data.name_en is not None:
updates.append("name_en = %s")
values.append(data.name_en)
if data.icon is not None:
updates.append("icon = %s")
values.append(data.icon)
if data.description is not None:
updates.append("description = %s")
values.append(data.description)
if data.category is not None:
updates.append("category = %s")
values.append(data.category)
if data.is_active is not None:
updates.append("is_active = %s")
values.append(data.is_active)
if not updates:
raise HTTPException(status_code=400, detail="Keine Änderungen angegeben")
updates.append("updated_at = NOW()")
values.append(area_id)
query = f"""
UPDATE focus_area_definitions
SET {', '.join(updates)}
WHERE id = %s
RETURNING id
"""
cur.execute(query, values)
if not cur.fetchone():
raise HTTPException(status_code=404, detail="Focus Area nicht gefunden")
return {"message": "Focus Area aktualisiert"}
@router.delete("/definitions/{area_id}")
def delete_focus_area_definition(
area_id: str,
session: dict = Depends(require_auth)
):
"""
Delete focus area definition (Admin only).
Cascades: Deletes all goal_focus_contributions referencing this area.
"""
# Admin check
if session.get('role') != 'admin':
raise HTTPException(status_code=403, detail="Admin-Rechte erforderlich")
with get_db() as conn:
cur = get_cursor(conn)
# Check if area is used
cur.execute(
"SELECT COUNT(*) as count FROM goal_focus_contributions WHERE focus_area_id = %s",
(area_id,)
)
count = cur.fetchone()['count']
if count > 0:
raise HTTPException(
status_code=400,
detail=f"Focus Area wird von {count} Ziel(en) verwendet. "
"Bitte erst Zuordnungen entfernen oder auf 'inaktiv' setzen."
)
# Delete
cur.execute(
"DELETE FROM focus_area_definitions WHERE id = %s RETURNING id",
(area_id,)
)
if not cur.fetchone():
raise HTTPException(status_code=404, detail="Focus Area nicht gefunden")
return {"message": "Focus Area gelöscht"}
@router.get("/usage-types")
def list_focus_area_usage_types(session: dict = Depends(require_auth)):
"""
Liste aller systemdefinierten Nutzungstypen (Admin, Konfigurations-UI).
Keine freie Anlage neuer Typen über die API.
"""
if session.get('role') != 'admin':
raise HTTPException(status_code=403, detail="Admin-Rechte erforderlich")
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT id, key, label_de, sort_order
FROM focus_area_usage_types
ORDER BY sort_order, key
""")
rows = [r2d(r) for r in cur.fetchall()]
return {"usage_types": rows, "total": len(rows)}
@router.put("/definitions/{area_id}/usage-types")
def replace_focus_area_usage_types(
area_id: str,
data: FocusAreaUsageTypesUpdate,
session: dict = Depends(require_auth),
):
"""
Ersetzt die Nutzungstyp-Zuweisungen einer Focus Area (Admin).
Leere Liste entfernt alle Zuordnungen. Unbekannte Keys → 400.
"""
if session.get('role') != 'admin':
raise HTTPException(status_code=403, detail="Admin-Rechte erforderlich")
keys = list(dict.fromkeys(data.usage_type_keys)) # dedupe, preserve order
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT id FROM focus_area_definitions WHERE id = %s",
(area_id,),
)
if not cur.fetchone():
raise HTTPException(status_code=404, detail="Focus Area nicht gefunden")
if not keys:
cur.execute(
"DELETE FROM focus_area_definition_usage_types WHERE focus_area_id = %s",
(area_id,),
)
return {"message": "Nutzungstyp-Zuweisungen entfernt", "usage_type_keys": []}
placeholders = ','.join(['%s'] * len(keys))
cur.execute(
f"""
SELECT id, key FROM focus_area_usage_types
WHERE key IN ({placeholders})
""",
keys,
)
found = {row['key']: row['id'] for row in cur.fetchall()}
missing = [k for k in keys if k not in found]
if missing:
raise HTTPException(
status_code=400,
detail=f"Unbekannte Nutzungstyp-Keys: {', '.join(missing)}",
)
cur.execute(
"DELETE FROM focus_area_definition_usage_types WHERE focus_area_id = %s",
(area_id,),
)
for k in keys:
ut_id = found[k]
cur.execute(
"""
INSERT INTO focus_area_definition_usage_types (focus_area_id, usage_type_id)
VALUES (%s, %s)
ON CONFLICT (focus_area_id, usage_type_id) DO NOTHING
""",
(area_id, ut_id),
)
return {"message": "Nutzungstyp-Zuweisungen aktualisiert", "usage_type_keys": keys}
# ============================================================================
# User Focus Preferences
# ============================================================================
@router.get("/user-preferences")
def get_user_focus_preferences(session: dict = Depends(require_auth)):
"""
Get user's focus area weightings (dynamic system).
Returns focus areas with user-set weights, grouped by category.
"""
pid = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
# Get dynamic preferences (Migration 032)
try:
cur.execute("""
SELECT
fa.id, fa.key, fa.name_de, fa.name_en, fa.icon,
fa.category, fa.description,
ufw.weight
FROM user_focus_area_weights ufw
JOIN focus_area_definitions fa ON ufw.focus_area_id = fa.id
WHERE ufw.profile_id = %s AND ufw.weight > 0
ORDER BY fa.category, fa.name_de
""", (pid,))
weights = [r2d(row) for row in cur.fetchall()]
# Calculate percentages from weights
total_weight = sum(w['weight'] for w in weights)
if total_weight > 0:
for w in weights:
w['percentage'] = round((w['weight'] / total_weight) * 100)
else:
for w in weights:
w['percentage'] = 0
# Group by category
grouped = {}
for w in weights:
cat = w['category'] or 'other'
if cat not in grouped:
grouped[cat] = []
grouped[cat].append(w)
return {
"weights": weights,
"grouped": grouped,
"total_weight": total_weight
}
except Exception as e:
# Migration 032 not applied yet - return empty
print(f"[WARNING] user_focus_area_weights not found: {e}")
return {
"weights": [],
"grouped": {},
"total_weight": 0
}
@router.put("/user-preferences")
def update_user_focus_preferences(
data: dict,
session: dict = Depends(require_auth)
):
"""
Update user's focus area weightings (dynamic system).
Expects: { "weights": { "focus_area_id": weight, ... } }
Weights are relative (0-100), normalized in display only.
"""
pid = session['profile_id']
if 'weights' not in data:
raise HTTPException(status_code=400, detail="'weights' field required")
weights = data['weights'] # Dict: focus_area_id → weight
with get_db() as conn:
cur = get_cursor(conn)
# Delete existing weights
cur.execute(
"DELETE FROM user_focus_area_weights WHERE profile_id = %s",
(pid,)
)
# Insert new weights (only non-zero)
for focus_area_id, weight in weights.items():
weight_int = int(weight)
if weight_int > 0:
cur.execute("""
INSERT INTO user_focus_area_weights
(profile_id, focus_area_id, weight)
VALUES (%s, %s, %s)
ON CONFLICT (profile_id, focus_area_id)
DO UPDATE SET
weight = EXCLUDED.weight,
updated_at = NOW()
""", (pid, focus_area_id, weight_int))
return {
"message": "Focus Area Gewichtungen aktualisiert",
"count": len([w for w in weights.values() if int(w) > 0])
}
# ============================================================================
# Stats & Analytics
# ============================================================================
@router.get("/stats")
def get_focus_area_stats(session: dict = Depends(require_auth)):
"""
Get focus area statistics for current user.
Returns:
- Progress per focus area (avg of all contributing goals)
- Goal count per focus area
- Top/bottom performing areas
"""
pid = session['profile_id']
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("""
SELECT
fa.id, fa.key, fa.name_de, fa.icon, fa.category,
COUNT(DISTINCT gfc.goal_id) as goal_count,
AVG(g.progress_pct) as avg_progress,
SUM(gfc.contribution_weight) as total_contribution
FROM focus_area_definitions fa
LEFT JOIN goal_focus_contributions gfc ON fa.id = gfc.focus_area_id
LEFT JOIN goals g ON gfc.goal_id = g.id AND g.profile_id = %s
WHERE fa.is_active = true
GROUP BY fa.id
HAVING COUNT(DISTINCT gfc.goal_id) > 0 -- Only areas with goals
ORDER BY avg_progress DESC NULLS LAST
""", (pid,))
stats = [r2d(row) for row in cur.fetchall()]
return {
"stats": stats,
"top_area": stats[0] if stats else None,
"bottom_area": stats[-1] if len(stats) > 1 else None
}