Replaces hardcoded mappings with database-driven, self-learning system. Backend: - Migration 007: activity_type_mappings table - Supports global and user-specific mappings - Seeded with 40+ default mappings (German + English) - Unique constraint: (activity_type, profile_id) - Refactored: get_training_type_for_activity() queries DB - Priority: user-specific → global → NULL - Bulk categorization now saves mapping automatically - Source: 'bulk' for learned mappings - admin_activity_mappings.py: Full CRUD endpoints - List, Get, Create, Update, Delete - Coverage stats endpoint - CSV import uses DB mappings (no hardcoded logic) Frontend: - AdminActivityMappingsPage: Full mapping management UI - Coverage stats (% mapped, unmapped count) - Filter: All / Global - Create/Edit/Delete mappings - Tip: System learns from bulk categorization - Added route + admin link - API methods: adminList/Get/Create/Update/DeleteActivityMapping Benefits: - No code changes needed for new activity types - System learns from user bulk categorizations - User-specific mappings override global defaults - Admin can manage all mappings via UI - Migration pre-populates 40+ common German/English types Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
220 lines
6.6 KiB
Python
220 lines
6.6 KiB
Python
"""
|
|
Admin Activity Type Mappings Management - v9d Phase 1b
|
|
|
|
CRUD operations for activity_type_mappings (learnable system).
|
|
"""
|
|
import logging
|
|
from typing import Optional
|
|
from fastapi import APIRouter, HTTPException, Depends
|
|
from pydantic import BaseModel
|
|
|
|
from db import get_db, get_cursor, r2d
|
|
from auth import require_admin
|
|
|
|
# Module-level logger named after this module, used by the handlers below
# for their "[ADMIN] ..." audit messages.
logger = logging.getLogger(__name__)

# Every route registered on this router is served below
# /api/admin/activity-mappings.
router = APIRouter(
    prefix="/api/admin/activity-mappings",
    tags=["admin", "activity-mappings"],
)
|
|
|
|
|
|
class ActivityMappingCreate(BaseModel):
    """Request body for creating an activity type mapping.

    The four fields are written unchanged into the ``activity_type_mappings``
    columns of the same name by the create endpoint.
    """

    # Activity type label the mapping applies to.
    activity_type: str
    # Joined against training_types.id by the read endpoints.
    training_type_id: int
    # Left as None when omitted; the list endpoint treats rows with
    # profile_id IS NULL as global mappings.
    profile_id: Optional[str] = None
    # Origin marker stored with the row; defaults to 'admin' for this API.
    source: str = "admin"
|
|
|
|
|
|
class ActivityMappingUpdate(BaseModel):
    """Request body for a partial update of an activity type mapping.

    Every field is optional. The update endpoint only writes fields whose
    value is not None, so None means "leave this column unchanged".
    """

    # New training type to point the mapping at (None = keep current).
    training_type_id: Optional[int] = None
    # New owning profile (None = keep current).
    profile_id: Optional[str] = None
    # New origin marker (None = keep current).
    source: Optional[str] = None
|
|
|
|
|
|
@router.get("")
def list_activity_mappings(
    profile_id: Optional[str] = None,
    global_only: bool = False,
    session: dict = Depends(require_admin)
):
    """
    Return activity type mappings joined with their training type, ordered
    by activity_type.

    Filters:
    - global_only: restrict to global mappings (profile_id IS NULL); when set,
      profile_id is ignored
    - profile_id: restrict to mappings of that profile (only applied when
      global_only is false and the value is non-empty)
    """
    select_sql = """
        SELECT m.id, m.activity_type, m.training_type_id, m.profile_id, m.source,
        m.created_at, m.updated_at,
        t.name_de as training_type_name_de,
        t.category, t.subcategory, t.icon
        FROM activity_type_mappings m
        JOIN training_types t ON m.training_type_id = t.id
        """

    # At most one filter is ever active, so pick the clause and its bind
    # values in a single decision instead of accumulating a condition list.
    if global_only:
        where_sql, bind_values = " WHERE " + "m.profile_id IS NULL", []
    elif profile_id:
        where_sql, bind_values = " WHERE " + "m.profile_id = %s", [profile_id]
    else:
        where_sql, bind_values = "", []

    statement = select_sql + where_sql + " ORDER BY m.activity_type"

    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(statement, bind_values)
        return [r2d(record) for record in cursor.fetchall()]
|
|
|
|
|
|
@router.get("/{mapping_id}")
def get_activity_mapping(mapping_id: int, session: dict = Depends(require_admin)):
    """
    Get a single activity mapping by ID.

    The row is joined with its training type and exposes the same columns as
    the list endpoint (including ``icon``), so clients can handle list items
    and single items identically.

    Raises:
        HTTPException(404): no mapping with this ID exists.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("""
            SELECT m.id, m.activity_type, m.training_type_id, m.profile_id, m.source,
                   m.created_at, m.updated_at,
                   t.name_de as training_type_name_de,
                   t.category, t.subcategory, t.icon
            FROM activity_type_mappings m
            JOIN training_types t ON m.training_type_id = t.id
            WHERE m.id = %s
        """, (mapping_id,))
        row = cur.fetchone()

        if not row:
            raise HTTPException(404, "Mapping not found")

        return r2d(row)
|
|
|
|
|
|
@router.post("")
def create_activity_mapping(data: ActivityMappingCreate, session: dict = Depends(require_admin)):
    """
    Create new activity type mapping.

    Returns:
        {"id": <new row id>, "message": "Mapping created"}

    Raises:
        HTTPException(409): a mapping for the same (activity_type, profile_id)
            already exists.
        HTTPException(400): the INSERT failed for any other reason.
    """
    conflict_detail = (
        f"Mapping for '{data.activity_type}' already exists (profile: {data.profile_id})"
    )

    with get_db() as conn:
        cur = get_cursor(conn)

        # Explicit duplicate check before inserting. Under standard SQL NULL
        # semantics a plain UNIQUE (activity_type, profile_id) constraint treats
        # NULLs as distinct, so it would not reject a second *global* mapping
        # (profile_id IS NULL) for the same activity_type.
        # NOTE(review): the migration is not visible here - if it already uses
        # a NULL-aware unique index this check is redundant but harmless.
        if data.profile_id is None:
            cur.execute(
                "SELECT 1 FROM activity_type_mappings "
                "WHERE activity_type = %s AND profile_id IS NULL",
                (data.activity_type,),
            )
        else:
            cur.execute(
                "SELECT 1 FROM activity_type_mappings "
                "WHERE activity_type = %s AND profile_id = %s",
                (data.activity_type, data.profile_id),
            )
        if cur.fetchone():
            raise HTTPException(409, conflict_detail)

        # Only the database work sits inside the try; logging happens after.
        try:
            cur.execute("""
                INSERT INTO activity_type_mappings
                (activity_type, training_type_id, profile_id, source)
                VALUES (%s, %s, %s, %s)
                RETURNING id
            """, (
                data.activity_type,
                data.training_type_id,
                data.profile_id,
                data.source
            ))
            new_id = cur.fetchone()['id']
        except Exception as e:
            # The constraint name in the driver's message identifies a
            # duplicate that slipped past the pre-check (concurrent insert).
            if 'unique_activity_type_per_profile' in str(e):
                raise HTTPException(409, conflict_detail) from e
            # Keep the full traceback server-side; the client only gets a 400.
            logger.exception("[ADMIN] Failed to create mapping for %r", data.activity_type)
            raise HTTPException(400, f"Failed to create mapping: {str(e)}") from e

        logger.info(f"[ADMIN] Mapping created: {data.activity_type} → training_type_id {data.training_type_id} (profile: {data.profile_id})")

    return {"id": new_id, "message": "Mapping created"}
|
|
|
|
|
|
@router.put("/{mapping_id}")
def update_activity_mapping(
    mapping_id: int,
    data: ActivityMappingUpdate,
    session: dict = Depends(require_admin)
):
    """
    Update existing activity type mapping.

    Only fields whose value is not None are written, so a field cannot be
    reset to NULL through this endpoint. ``updated_at`` is refreshed on every
    successful update.

    Returns:
        {"id": mapping_id, "message": "Mapping updated"}

    Raises:
        HTTPException(400): no updatable field was supplied, or the UPDATE
            failed for a reason other than a duplicate.
        HTTPException(404): no mapping with this ID exists.
        HTTPException(409): the change collides with an existing mapping
            (unique_activity_type_per_profile violation).
    """
    # Build the SET clause from fixed column fragments; user data only ever
    # travels through bind parameters, never through the f-string below.
    updates = []
    values = []

    if data.training_type_id is not None:
        updates.append("training_type_id = %s")
        values.append(data.training_type_id)
    if data.profile_id is not None:
        updates.append("profile_id = %s")
        values.append(data.profile_id)
    if data.source is not None:
        updates.append("source = %s")
        values.append(data.source)

    # Validate before opening a database connection.
    if not updates:
        raise HTTPException(400, "No fields to update")

    updates.append("updated_at = CURRENT_TIMESTAMP")
    values.append(mapping_id)

    with get_db() as conn:
        cur = get_cursor(conn)

        # Mirror the create endpoint's error mapping so constraint violations
        # surface as 409/400 instead of an unhandled 500.
        try:
            cur.execute(f"""
                UPDATE activity_type_mappings
                SET {', '.join(updates)}
                WHERE id = %s
            """, values)
        except Exception as e:
            if 'unique_activity_type_per_profile' in str(e):
                raise HTTPException(
                    409,
                    f"Another mapping for this activity type already exists (profile: {data.profile_id})"
                ) from e
            logger.exception("[ADMIN] Failed to update mapping %s", mapping_id)
            raise HTTPException(400, f"Failed to update mapping: {str(e)}") from e

        if cur.rowcount == 0:
            raise HTTPException(404, "Mapping not found")

        logger.info(f"[ADMIN] Mapping updated: {mapping_id}")

    return {"id": mapping_id, "message": "Mapping updated"}
|
|
|
|
|
|
@router.delete("/{mapping_id}")
def delete_activity_mapping(mapping_id: int, session: dict = Depends(require_admin)):
    """
    Remove one activity type mapping by ID.

    After deletion, future imports no longer auto-assign a training type for
    this activity_type. Activities that were already categorized through the
    mapping are left untouched.

    Raises:
        HTTPException(404): no row was deleted, i.e. the ID does not exist.
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute("DELETE FROM activity_type_mappings WHERE id = %s", (mapping_id,))

        # rowcount tells us whether the DELETE actually matched a row.
        nothing_deleted = cursor.rowcount == 0
        if nothing_deleted:
            raise HTTPException(404, "Mapping not found")

        logger.info(f"[ADMIN] Mapping deleted: {mapping_id}")
        return {"message": "Mapping deleted"}
|
|
|
|
|
|
@router.get("/stats/coverage")
def get_mapping_coverage(session: dict = Depends(require_admin)):
    """
    Report how much of ``activity_log`` has a training type assigned.

    A single aggregate row over the whole table is returned with these keys:
    - total_activities: all rows
    - mapped_activities: rows with a non-NULL training_type_id
    - unmapped_activities: rows with a NULL training_type_id
    - unique_activity_types: distinct activity_type values
    - unmapped_types: distinct activity_type values that occur on at least
      one row without a training_type_id
    """
    coverage_sql = """
        SELECT
        COUNT(*) as total_activities,
        COUNT(training_type_id) as mapped_activities,
        COUNT(*) - COUNT(training_type_id) as unmapped_activities,
        COUNT(DISTINCT activity_type) as unique_activity_types,
        COUNT(DISTINCT CASE WHEN training_type_id IS NULL THEN activity_type END) as unmapped_types
        FROM activity_log
        """

    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(coverage_sql)
        # An aggregate without GROUP BY always yields exactly one row.
        return r2d(cursor.fetchone())
|