mitai-jinkendo/backend/routers/admin_csv_templates.py
Lars 4a771f6a83
All checks were successful
Deploy Development / deploy (push) Successful in 50s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 15s
feat(csv-parser): Implement CSV import functionality with mapping and type conversion
- Added permissions for editing and deleting CSV field mappings.
- Created type converter for CSV cells to handle various data types.
- Implemented database migrations for CSV field mappings and import logs.
- Seeded initial system templates for nutrition and activity data imports.
- Developed admin endpoints for managing system CSV templates.
- Introduced user endpoints for CSV import analysis and mapping retrieval.
- Added tests for core CSV parser functionalities, including delimiter detection and value conversion.
2026-04-09 21:37:19 +02:00

246 lines
8.5 KiB
Python

"""
Admin: System-CSV-Templates (csv_field_mappings, is_system=true) pflegen (Issue #21).
"""
from __future__ import annotations
from typing import Any, List, Optional
from fastapi import APIRouter, Depends, HTTPException
from pydantic import BaseModel, Field
from psycopg2.extras import Json
from auth import require_admin
from db import get_db, get_cursor, r2d
from csv_parser.core import get_csv_import_limits
from csv_parser.module_registry import get_module_definition, validate_field_mappings
router = APIRouter(prefix="/api/admin/csv-templates", tags=["admin", "csv-import"])
class CsvSystemTemplateCreate(BaseModel):
module: str
mapping_name: str = Field(..., min_length=1, max_length=100)
description: Optional[str] = None
column_signature: List[str] = Field(default_factory=list)
delimiter: str = ","
encoding: str = "utf-8"
has_header: bool = True
field_mappings: dict = Field(default_factory=dict)
type_conversions: Optional[dict] = None
class CsvSystemTemplateUpdate(BaseModel):
mapping_name: Optional[str] = Field(default=None, min_length=1, max_length=100)
description: Optional[str] = None
column_signature: Optional[List[str]] = None
delimiter: Optional[str] = None
encoding: Optional[str] = None
has_header: Optional[bool] = None
field_mappings: Optional[dict] = None
type_conversions: Optional[dict] = None
class CsvImportLimitsBody(BaseModel):
max_rows_per_file: int = Field(..., ge=100, le=2_000_000)
max_file_bytes: int = Field(..., ge=10_000, le=2_147_483_648)
def _row_full(m: dict) -> dict:
return {
"id": m["id"],
"module": m["module"],
"mapping_name": m["mapping_name"],
"description": m.get("description"),
"column_signature": list(m["column_signature"]) if m.get("column_signature") else [],
"delimiter": m["delimiter"],
"encoding": m["encoding"],
"has_header": m["has_header"],
"field_mappings": m["field_mappings"],
"type_conversions": m.get("type_conversions"),
"usage_count": m.get("usage_count"),
"success_rate": m.get("success_rate"),
"last_used_at": m.get("last_used_at"),
"created_at": m.get("created_at"),
"updated_at": m.get("updated_at"),
"is_system": m["is_system"],
}
@router.get("/import-limits")
def admin_get_csv_import_limits(session: dict = Depends(require_admin)):
with get_db() as conn:
cur = get_cursor(conn)
cur.execute("SELECT value FROM system_config WHERE key = %s", ("csv_import",))
row = cur.fetchone()
return get_csv_import_limits(r2d(row) if row else None)
@router.put("/import-limits")
def admin_put_csv_import_limits(body: CsvImportLimitsBody, session: dict = Depends(require_admin)):
payload = {"max_rows_per_file": body.max_rows_per_file, "max_file_bytes": body.max_file_bytes}
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
INSERT INTO system_config (key, value, updated_at)
VALUES ('csv_import', %s, CURRENT_TIMESTAMP)
ON CONFLICT (key) DO UPDATE
SET value = EXCLUDED.value, updated_at = CURRENT_TIMESTAMP
""",
(Json(payload),),
)
return payload
@router.get("")
def list_system_templates(
module: Optional[str] = None,
session: dict = Depends(require_admin),
):
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
SELECT * FROM csv_field_mappings
WHERE is_system = true AND profile_id IS NULL
AND (%s::text IS NULL OR module = %s)
ORDER BY module, mapping_name
""",
(module, module),
)
rows = [r2d(r) for r in cur.fetchall()]
return {"templates": [_row_full(m) for m in rows]}
@router.get("/{template_id}")
def get_system_template(template_id: int, session: dict = Depends(require_admin)):
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT * FROM csv_field_mappings WHERE id = %s AND is_system = true AND profile_id IS NULL",
(template_id,),
)
m = r2d(cur.fetchone())
if not m:
raise HTTPException(404, "System-Template nicht gefunden")
return _row_full(m)
@router.post("")
def create_system_template(body: CsvSystemTemplateCreate, session: dict = Depends(require_admin)):
if not get_module_definition(body.module):
raise HTTPException(400, f"Unbekanntes Modul: {body.module}")
try:
validate_field_mappings(body.module, body.field_mappings)
except ValueError as e:
raise HTTPException(400, str(e))
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"""
INSERT INTO csv_field_mappings (
profile_id, is_system, module, mapping_name, description,
column_signature, delimiter, encoding, has_header,
field_mappings, type_conversions
) VALUES (
NULL, true, %s, %s, %s, %s, %s, %s, %s, %s, %s
) RETURNING id
""",
(
body.module,
body.mapping_name,
body.description,
body.column_signature,
body.delimiter,
body.encoding,
body.has_header,
Json(body.field_mappings),
Json(body.type_conversions) if body.type_conversions is not None else None,
),
)
new_id = cur.fetchone()["id"]
return {"id": new_id}
@router.put("/{template_id}")
def update_system_template(
template_id: int,
body: CsvSystemTemplateUpdate,
session: dict = Depends(require_admin),
):
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"SELECT * FROM csv_field_mappings WHERE id = %s AND is_system = true AND profile_id IS NULL",
(template_id,),
)
existing = r2d(cur.fetchone())
if not existing:
raise HTTPException(404, "System-Template nicht gefunden")
patch: dict[str, Any] = body.model_dump(exclude_unset=True)
if not patch:
return _row_full(existing)
fm = patch.get("field_mappings", existing["field_mappings"])
if "field_mappings" in patch:
try:
validate_field_mappings(existing["module"], fm)
except ValueError as e:
raise HTTPException(400, str(e))
fields_sql = []
vals: list = []
if "mapping_name" in patch:
fields_sql.append("mapping_name = %s")
vals.append(patch["mapping_name"])
if "description" in patch:
fields_sql.append("description = %s")
vals.append(patch["description"])
if "column_signature" in patch:
fields_sql.append("column_signature = %s")
vals.append(patch["column_signature"])
if "delimiter" in patch:
fields_sql.append("delimiter = %s")
vals.append(patch["delimiter"])
if "encoding" in patch:
fields_sql.append("encoding = %s")
vals.append(patch["encoding"])
if "has_header" in patch:
fields_sql.append("has_header = %s")
vals.append(patch["has_header"])
if "field_mappings" in patch:
fields_sql.append("field_mappings = %s")
vals.append(Json(patch["field_mappings"]))
if "type_conversions" in patch:
fields_sql.append("type_conversions = %s")
tc = patch["type_conversions"]
vals.append(Json(tc) if tc is not None else None)
fields_sql.append("updated_at = CURRENT_TIMESTAMP")
vals.append(template_id)
cur.execute(
f"UPDATE csv_field_mappings SET {', '.join(fields_sql)} WHERE id = %s",
tuple(vals),
)
cur.execute("SELECT * FROM csv_field_mappings WHERE id = %s", (template_id,))
m = r2d(cur.fetchone())
return _row_full(m)
@router.delete("/{template_id}")
def delete_system_template(template_id: int, session: dict = Depends(require_admin)):
with get_db() as conn:
cur = get_cursor(conn)
cur.execute(
"DELETE FROM csv_field_mappings WHERE id = %s AND is_system = true AND profile_id IS NULL RETURNING id",
(template_id,),
)
row = cur.fetchone()
if not row:
raise HTTPException(404, "System-Template nicht gefunden")
return {"deleted": template_id}