"""
Admin: maintain system CSV templates (csv_field_mappings, is_system=true) (Issue #21).
"""
from __future__ import annotations

from typing import Any, List, Optional

from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from psycopg2.extras import Json

from auth import require_admin
from db import get_db, get_cursor, r2d
from csv_parser.core import get_csv_import_limits
from csv_parser.module_registry import get_module_definition, validate_field_mappings

router = APIRouter(prefix="/api/admin/csv-templates", tags=["admin", "csv-import"])

# Detail text of every 404 raised by this module (user-facing, kept verbatim).
_TEMPLATE_NOT_FOUND = "System-Template nicht gefunden"

# Lookup of one system template; adjacent literals concatenate to a single statement.
_SELECT_SYSTEM_TEMPLATE_SQL = (
    "SELECT * FROM csv_field_mappings"
    " WHERE id = %s AND is_system = true AND profile_id IS NULL"
)

# Columns of a partial update that are bound as-is, in the order they appear in
# the SET clause. The JSON columns (field_mappings, type_conversions) are handled
# separately because they need a Json() wrapper.
_PLAIN_UPDATE_COLUMNS = (
    "mapping_name",
    "description",
    "column_signature",
    "delimiter",
    "encoding",
    "has_header",
)


class CsvSystemTemplateCreate(BaseModel):
    """Request body for creating a system template."""

    module: str
    mapping_name: str = Field(..., min_length=1, max_length=100)
    description: Optional[str] = None
    column_signature: List[str] = Field(default_factory=list)
    delimiter: str = ","
    encoding: str = "utf-8"
    has_header: bool = True
    field_mappings: dict = Field(default_factory=dict)
    type_conversions: Optional[dict] = None


class CsvSystemTemplateUpdate(BaseModel):
    """Request body for a partial update; only fields present in the request are written."""

    mapping_name: Optional[str] = Field(default=None, min_length=1, max_length=100)
    description: Optional[str] = None
    column_signature: Optional[List[str]] = None
    delimiter: Optional[str] = None
    encoding: Optional[str] = None
    has_header: Optional[bool] = None
    field_mappings: Optional[dict] = None
    type_conversions: Optional[dict] = None


class CsvImportLimitsBody(BaseModel):
    """Request body for the global CSV import limits."""

    # Accepted range: 100 .. 2,000,000 rows.
    max_rows_per_file: int = Field(..., ge=100, le=2_000_000)
    # Accepted range: 10,000 bytes .. 2**31 bytes.
    max_file_bytes: int = Field(..., ge=10_000, le=2_147_483_648)


def _row_full(m: dict) -> dict:
    """Project a csv_field_mappings row (as a dict) onto the API response shape.

    Columns read with ``m[...]`` must be present in the row; columns read with
    ``m.get(...)`` fall back to ``None``. A missing or empty
    ``column_signature`` is returned as an empty list.
    """
    return {
        "id": m["id"],
        "module": m["module"],
        "mapping_name": m["mapping_name"],
        "description": m.get("description"),
        "column_signature": list(m["column_signature"]) if m.get("column_signature") else [],
        "delimiter": m["delimiter"],
        "encoding": m["encoding"],
        "has_header": m["has_header"],
        "field_mappings": m["field_mappings"],
        "type_conversions": m.get("type_conversions"),
        "usage_count": m.get("usage_count"),
        "success_rate": m.get("success_rate"),
        "last_used_at": m.get("last_used_at"),
        "created_at": m.get("created_at"),
        "updated_at": m.get("updated_at"),
        "is_system": m["is_system"],
    }


def _fetch_system_template(cur: Any, template_id: int) -> dict:
    """Load one system template as a dict.

    The raw row is checked *before* it is handed to ``r2d`` so that a missing
    template always yields a 404, whatever ``r2d`` does with ``None`` (the
    import-limits handler in this module guards ``r2d`` the same way).

    Raises:
        HTTPException: 404 when no system template with this id exists.
    """
    cur.execute(_SELECT_SYSTEM_TEMPLATE_SQL, (template_id,))
    row = cur.fetchone()
    if not row:
        raise HTTPException(404, _TEMPLATE_NOT_FOUND)
    return r2d(row)


def _validate_field_mappings_or_400(module: str, field_mappings: Any) -> None:
    """Run ``validate_field_mappings`` and convert its ``ValueError`` into a 400.

    Raises:
        HTTPException: 400 carrying the validator's message.
    """
    try:
        validate_field_mappings(module, field_mappings)
    except ValueError as e:
        # Chain the cause so the original traceback stays visible in logs.
        raise HTTPException(400, str(e)) from e


def _build_update_assignments(patch: dict[str, Any]) -> tuple[list[str], list[Any]]:
    """Translate a partial-update dict into SET fragments and their bind values.

    Column names come only from the fixed whitelist in this module, never from
    request data, so interpolating the fragments into the UPDATE statement is
    safe. ``updated_at`` is always refreshed and takes no bind value.

    Returns:
        ``(assignments, params)`` where ``params`` lines up with the ``%s``
        placeholders in ``assignments``.
    """
    assignments: list[str] = []
    params: list[Any] = []

    for column in _PLAIN_UPDATE_COLUMNS:
        if column in patch:
            assignments.append(f"{column} = %s")
            params.append(patch[column])

    if "field_mappings" in patch:
        assignments.append("field_mappings = %s")
        params.append(Json(patch["field_mappings"]))

    if "type_conversions" in patch:
        assignments.append("type_conversions = %s")
        conversions = patch["type_conversions"]
        # An explicit null clears the column (SQL NULL rather than JSON null).
        params.append(Json(conversions) if conversions is not None else None)

    assignments.append("updated_at = CURRENT_TIMESTAMP")
    return assignments, params


# NOTE: the fixed "/import-limits" routes are registered before the
# "/{template_id}" routes on purpose; keep this order when adding handlers.
@router.get("/import-limits")
def admin_get_csv_import_limits(session: dict = Depends(require_admin)):
    """Return the CSV import limits derived from the ``csv_import`` system_config row.

    When no such row exists, ``get_csv_import_limits`` is called with ``None``.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute("SELECT value FROM system_config WHERE key = %s", ("csv_import",))
        row = cur.fetchone()
    return get_csv_import_limits(r2d(row) if row else None)


@router.put("/import-limits")
def admin_put_csv_import_limits(body: CsvImportLimitsBody, session: dict = Depends(require_admin)):
    """Insert or overwrite the ``csv_import`` system_config row and echo the stored payload."""
    payload = {"max_rows_per_file": body.max_rows_per_file, "max_file_bytes": body.max_file_bytes}
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            " INSERT INTO system_config (key, value, updated_at)"
            " VALUES ('csv_import', %s, CURRENT_TIMESTAMP)"
            " ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value, updated_at = CURRENT_TIMESTAMP ",
            (Json(payload),),
        )
    return payload


@router.get("")
def list_system_templates(
    module: Optional[str] = None,
    session: dict = Depends(require_admin),
):
    """List all system templates, optionally restricted to one module.

    Results are ordered by module, then mapping name.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            " SELECT * FROM csv_field_mappings"
            " WHERE is_system = true AND profile_id IS NULL"
            " AND (%s::text IS NULL OR module = %s)"
            " ORDER BY module, mapping_name ",
            (module, module),
        )
        rows = [r2d(r) for r in cur.fetchall()]
    return {"templates": [_row_full(m) for m in rows]}


@router.get("/{template_id}")
def get_system_template(template_id: int, session: dict = Depends(require_admin)):
    """Return one system template.

    Raises:
        HTTPException: 404 when the template does not exist.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        template = _fetch_system_template(cur, template_id)
    return _row_full(template)


@router.post("")
def create_system_template(body: CsvSystemTemplateCreate, session: dict = Depends(require_admin)):
    """Create a system template and return its new id as ``{"id": ...}``.

    Raises:
        HTTPException: 400 for an unknown module or invalid field mappings.
    """
    if not get_module_definition(body.module):
        raise HTTPException(400, f"Unbekanntes Modul: {body.module}")
    _validate_field_mappings_or_400(body.module, body.field_mappings)

    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            " INSERT INTO csv_field_mappings ("
            " profile_id, is_system, module, mapping_name, description,"
            " column_signature, delimiter, encoding, has_header,"
            " field_mappings, type_conversions"
            " ) VALUES ("
            " NULL, true, %s, %s, %s, %s, %s, %s, %s, %s, %s"
            " ) RETURNING id ",
            (
                body.module,
                body.mapping_name,
                body.description,
                body.column_signature,
                body.delimiter,
                body.encoding,
                body.has_header,
                Json(body.field_mappings),
                Json(body.type_conversions) if body.type_conversions is not None else None,
            ),
        )
        new_id = cur.fetchone()["id"]
    return {"id": new_id}


@router.put("/{template_id}")
def update_system_template(
    template_id: int,
    body: CsvSystemTemplateUpdate,
    session: dict = Depends(require_admin),
):
    """Apply a partial update to a system template and return the resulting row.

    Only fields present in the request body are written. An empty body returns
    the current row without touching the database.

    Raises:
        HTTPException: 404 when the template does not exist, 400 when the new
            field mappings fail validation for the template's module.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        existing = _fetch_system_template(cur, template_id)

        patch: dict[str, Any] = body.model_dump(exclude_unset=True)
        if not patch:
            return _row_full(existing)

        if "field_mappings" in patch:
            # NOTE(review): an explicit JSON null reaches the validator as None
            # and, if accepted, is stored as JSON null — confirm that is intended.
            _validate_field_mappings_or_400(existing["module"], patch["field_mappings"])

        assignments, params = _build_update_assignments(patch)
        params.append(template_id)
        cur.execute(
            f"UPDATE csv_field_mappings SET {', '.join(assignments)} WHERE id = %s",
            tuple(params),
        )
        # Re-read so the response carries the refreshed updated_at.
        cur.execute("SELECT * FROM csv_field_mappings WHERE id = %s", (template_id,))
        updated = r2d(cur.fetchone())
    return _row_full(updated)


@router.delete("/{template_id}")
def delete_system_template(template_id: int, session: dict = Depends(require_admin)):
    """Delete a system template and return ``{"deleted": template_id}``.

    Raises:
        HTTPException: 404 when the template does not exist.
    """
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            "DELETE FROM csv_field_mappings WHERE id = %s AND is_system = true AND profile_id IS NULL RETURNING id",
            (template_id,),
        )
        row = cur.fetchone()
    if not row:
        raise HTTPException(404, _TEMPLATE_NOT_FOUND)
    return {"deleted": template_id}