- Introduced a new endpoint for validating CSV templates without saving, allowing users to check field mappings and type conversions. - Updated the `create_system_template` and `update_system_template` functions to include validation reports in responses. - Enhanced error handling in CSV import processes by integrating `enrich_row_error` for more informative error messages. - Improved the AdminCsvTemplateEditorPage to support format checking and display validation results, enhancing user experience. - Incremented version numbers for `csv_import` and `admin_csv_templates` to reflect these updates.
431 lines
15 KiB
Python
431 lines
15 KiB
Python
"""
|
|
Admin: System-CSV-Templates (csv_field_mappings, is_system=true) pflegen (Issue #21).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
|
|
from pydantic import BaseModel, Field
|
|
from psycopg2.extras import Json
|
|
|
|
from auth import require_admin
|
|
from db import get_db, get_cursor, r2d
|
|
from csv_parser.core import (
|
|
column_signature,
|
|
decode_raw_bytes,
|
|
get_csv_import_limits,
|
|
headers_signature_rank_metrics,
|
|
normalize_header_for_signature,
|
|
parse_csv_sample,
|
|
)
|
|
from csv_parser.mapping_suggest import build_type_conversions_for_mapping, suggest_field_mappings
|
|
from csv_parser.import_row_processing import (
|
|
validate_import_row_processing as validate_import_row_processing_spec,
|
|
)
|
|
from csv_parser.module_registry import get_module_definition
|
|
from csv_parser.template_validator import validate_csv_template
|
|
|
|
router = APIRouter(prefix="/api/admin/csv-templates", tags=["admin", "csv-import"])
|
|
|
|
|
|
class CsvSystemTemplateCreate(BaseModel):
|
|
module: str
|
|
mapping_name: str = Field(..., min_length=1, max_length=100)
|
|
description: Optional[str] = None
|
|
column_signature: List[str] = Field(default_factory=list)
|
|
delimiter: str = ","
|
|
encoding: str = "utf-8"
|
|
has_header: bool = True
|
|
field_mappings: dict = Field(default_factory=dict)
|
|
type_conversions: Optional[dict] = None
|
|
import_row_processing: Optional[dict] = None
|
|
|
|
|
|
class CsvSystemTemplateUpdate(BaseModel):
|
|
mapping_name: Optional[str] = Field(default=None, min_length=1, max_length=100)
|
|
description: Optional[str] = None
|
|
column_signature: Optional[List[str]] = None
|
|
delimiter: Optional[str] = None
|
|
encoding: Optional[str] = None
|
|
has_header: Optional[bool] = None
|
|
field_mappings: Optional[dict] = None
|
|
type_conversions: Optional[dict] = None
|
|
import_row_processing: Optional[dict] = None
|
|
|
|
|
|
class CsvImportLimitsBody(BaseModel):
|
|
max_rows_per_file: int = Field(..., ge=100, le=2_000_000)
|
|
max_file_bytes: int = Field(..., ge=10_000, le=2_147_483_648)
|
|
|
|
|
|
class CsvTemplateValidateBody(BaseModel):
|
|
"""Formatprüfung ohne Speichern (field_mappings + type_conversions + optional row_processing)."""
|
|
|
|
module: str
|
|
field_mappings: dict = Field(default_factory=dict)
|
|
type_conversions: Optional[dict] = None
|
|
import_row_processing: Optional[dict] = None
|
|
column_signature: Optional[List[str]] = None
|
|
|
|
|
|
def _row_full(m: dict) -> dict:
|
|
return {
|
|
"id": m["id"],
|
|
"module": m["module"],
|
|
"mapping_name": m["mapping_name"],
|
|
"description": m.get("description"),
|
|
"column_signature": list(m["column_signature"]) if m.get("column_signature") else [],
|
|
"delimiter": m["delimiter"],
|
|
"encoding": m["encoding"],
|
|
"has_header": m["has_header"],
|
|
"field_mappings": m["field_mappings"],
|
|
"type_conversions": m.get("type_conversions"),
|
|
"import_row_processing": m.get("import_row_processing"),
|
|
"usage_count": m.get("usage_count"),
|
|
"success_rate": m.get("success_rate"),
|
|
"last_used_at": m.get("last_used_at"),
|
|
"created_at": m.get("created_at"),
|
|
"updated_at": m.get("updated_at"),
|
|
"is_system": m["is_system"],
|
|
}
|
|
|
|
|
|
@router.get("/import-limits")
|
|
def admin_get_csv_import_limits(session: dict = Depends(require_admin)):
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT value FROM system_config WHERE key = %s", ("csv_import",))
|
|
row = cur.fetchone()
|
|
return get_csv_import_limits(r2d(row) if row else None)
|
|
|
|
|
|
@router.put("/import-limits")
|
|
def admin_put_csv_import_limits(body: CsvImportLimitsBody, session: dict = Depends(require_admin)):
|
|
payload = {"max_rows_per_file": body.max_rows_per_file, "max_file_bytes": body.max_file_bytes}
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO system_config (key, value, updated_at)
|
|
VALUES ('csv_import', %s, CURRENT_TIMESTAMP)
|
|
ON CONFLICT (key) DO UPDATE
|
|
SET value = EXCLUDED.value, updated_at = CURRENT_TIMESTAMP
|
|
""",
|
|
(Json(payload),),
|
|
)
|
|
return payload
|
|
|
|
|
|
@router.get("")
|
|
def list_system_templates(
|
|
module: Optional[str] = None,
|
|
session: dict = Depends(require_admin),
|
|
):
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute(
|
|
"""
|
|
SELECT * FROM csv_field_mappings
|
|
WHERE is_system = true AND profile_id IS NULL
|
|
AND (%s::text IS NULL OR module = %s)
|
|
ORDER BY module, mapping_name
|
|
""",
|
|
(module, module),
|
|
)
|
|
rows = [r2d(r) for r in cur.fetchall()]
|
|
return {"templates": [_row_full(m) for m in rows]}
|
|
|
|
|
|
@router.post("/analyze-upload")
|
|
async def admin_analyze_csv_for_template(
|
|
file: UploadFile = File(...),
|
|
module: str = Form(...),
|
|
delimiter: Optional[str] = Form(default=None),
|
|
seed_template_id: Optional[int] = Form(default=None),
|
|
session: dict = Depends(require_admin),
|
|
):
|
|
"""
|
|
CSV hochladen wie im Nutzer-Import: Spalten + Vorschau + Vorschläge für field_mappings
|
|
und type_conversions. Optional Seed-Vorlage (ID) oder beste Systemvorlage (Abdeckung, dann Jaccard).
|
|
"""
|
|
_ = session
|
|
if not get_module_definition(module):
|
|
raise HTTPException(400, f"Unbekanntes Modul: {module}")
|
|
|
|
raw = await file.read()
|
|
limits = _admin_csv_limits()
|
|
max_bytes = limits.get("max_file_bytes", 52_428_800)
|
|
if len(raw) > max_bytes:
|
|
raise HTTPException(
|
|
413,
|
|
f"Datei zu groß (max. {max_bytes} Bytes laut Systemkonfiguration)",
|
|
)
|
|
|
|
text = decode_raw_bytes(raw)
|
|
if not text.strip():
|
|
raise HTTPException(400, "Leere Datei")
|
|
|
|
max_rows = limits.get("max_rows_per_file", 50_000)
|
|
if text.count("\n") > max_rows + 5:
|
|
raise HTTPException(
|
|
413,
|
|
f"Zu viele Zeilen (>{max_rows}) laut Systemkonfiguration",
|
|
)
|
|
|
|
delim = delimiter if delimiter in (",", ";", "\t") else None
|
|
headers, sample_rows, used_delim = parse_csv_sample(text, delimiter=delim, max_data_rows=5)
|
|
if not headers:
|
|
raise HTTPException(400, "Keine Kopfzeile oder leeres CSV")
|
|
|
|
sig = column_signature(headers)
|
|
|
|
seed_row: dict | None = None
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
if seed_template_id is not None:
|
|
cur.execute(
|
|
"""
|
|
SELECT * FROM csv_field_mappings
|
|
WHERE id = %s AND is_system = true AND profile_id IS NULL AND module = %s
|
|
""",
|
|
(seed_template_id, module),
|
|
)
|
|
seed_row = r2d(cur.fetchone())
|
|
if not seed_row:
|
|
raise HTTPException(404, "Seed-Vorlage nicht gefunden oder falsches Modul")
|
|
else:
|
|
cur.execute(
|
|
"""
|
|
SELECT * FROM csv_field_mappings
|
|
WHERE is_system = true AND profile_id IS NULL AND module = %s
|
|
""",
|
|
(module,),
|
|
)
|
|
rows = [r2d(r) for r in cur.fetchall()]
|
|
best: dict | None = None
|
|
best_key: tuple[float, int, float] = (-1.0, -1, -1.0)
|
|
for t in rows:
|
|
t_sig = list(t.get("column_signature") or [])
|
|
m = headers_signature_rank_metrics(sig, t_sig)
|
|
key = (m["confidence"], m["columns_matched"], m["jaccard"])
|
|
if key > best_key:
|
|
best_key = key
|
|
best = t
|
|
if best and best_key[0] > 0:
|
|
seed_row = best
|
|
|
|
seed_fm = (seed_row or {}).get("field_mappings") or {}
|
|
if isinstance(seed_fm, str):
|
|
seed_fm = {}
|
|
seed_tc = (seed_row or {}).get("type_conversions")
|
|
if not isinstance(seed_tc, dict):
|
|
seed_tc = {}
|
|
|
|
field_mappings = suggest_field_mappings(headers, module, seed_fm if seed_fm else None)
|
|
type_conversions = build_type_conversions_for_mapping(module, field_mappings, seed_tc if seed_tc else None)
|
|
|
|
seed_meta = None
|
|
if seed_row:
|
|
t_sig = list(seed_row.get("column_signature") or [])
|
|
sm = headers_signature_rank_metrics(sig, t_sig)
|
|
seed_meta = {
|
|
"id": seed_row["id"],
|
|
"mapping_name": seed_row.get("mapping_name"),
|
|
"confidence": sm["confidence"],
|
|
"template_recall": sm["template_recall"],
|
|
"jaccard": sm["jaccard"],
|
|
"columns_matched": sm["columns_matched"],
|
|
"columns_in_template": sm["columns_in_template"],
|
|
"columns_in_csv": sm["columns_in_csv"],
|
|
}
|
|
|
|
return {
|
|
"filename": file.filename,
|
|
"module": module,
|
|
"delimiter": used_delim,
|
|
"encoding": "utf-8",
|
|
"columns": headers,
|
|
"column_signature_normalized": sig,
|
|
"sample_rows": sample_rows,
|
|
"seed_template": seed_meta,
|
|
"field_mappings": field_mappings,
|
|
"type_conversions": type_conversions,
|
|
}
|
|
|
|
|
|
def _admin_csv_limits() -> dict[str, int]:
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute("SELECT value FROM system_config WHERE key = %s", ("csv_import",))
|
|
row = cur.fetchone()
|
|
return get_csv_import_limits(r2d(row) if row else None)
|
|
|
|
|
|
@router.post("/validate")
|
|
def validate_system_template_dry_run(body: CsvTemplateValidateBody, session: dict = Depends(require_admin)):
|
|
"""
|
|
Validatorlauf für eine Vorlagen-Konfiguration (ohne DB-Schreiben).
|
|
Nutzbar aus dem Admin-Editor vor dem Speichern.
|
|
"""
|
|
if not get_module_definition(body.module):
|
|
raise HTTPException(400, f"Unbekanntes Modul: {body.module}")
|
|
return validate_csv_template(
|
|
body.module,
|
|
body.field_mappings,
|
|
body.type_conversions,
|
|
body.import_row_processing,
|
|
body.column_signature,
|
|
)
|
|
|
|
|
|
@router.get("/{template_id}")
|
|
def get_system_template(template_id: int, session: dict = Depends(require_admin)):
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute(
|
|
"SELECT * FROM csv_field_mappings WHERE id = %s AND is_system = true AND profile_id IS NULL",
|
|
(template_id,),
|
|
)
|
|
m = r2d(cur.fetchone())
|
|
if not m:
|
|
raise HTTPException(404, "System-Template nicht gefunden")
|
|
return _row_full(m)
|
|
|
|
|
|
@router.post("")
|
|
def create_system_template(body: CsvSystemTemplateCreate, session: dict = Depends(require_admin)):
|
|
if not get_module_definition(body.module):
|
|
raise HTTPException(400, f"Unbekanntes Modul: {body.module}")
|
|
report = validate_csv_template(
|
|
body.module,
|
|
body.field_mappings,
|
|
body.type_conversions,
|
|
body.import_row_processing,
|
|
body.column_signature,
|
|
)
|
|
if not report["valid"]:
|
|
raise HTTPException(status_code=422, detail=report)
|
|
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute(
|
|
"""
|
|
INSERT INTO csv_field_mappings (
|
|
profile_id, is_system, module, mapping_name, description,
|
|
column_signature, delimiter, encoding, has_header,
|
|
field_mappings, type_conversions, import_row_processing
|
|
) VALUES (
|
|
NULL, true, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
|
|
) RETURNING id
|
|
""",
|
|
(
|
|
body.module,
|
|
body.mapping_name,
|
|
body.description,
|
|
body.column_signature,
|
|
body.delimiter,
|
|
body.encoding,
|
|
body.has_header,
|
|
Json(body.field_mappings),
|
|
Json(body.type_conversions) if body.type_conversions is not None else None,
|
|
Json(body.import_row_processing) if body.import_row_processing is not None else None,
|
|
),
|
|
)
|
|
new_id = cur.fetchone()["id"]
|
|
return {"id": new_id, "validation": report}
|
|
|
|
|
|
@router.put("/{template_id}")
|
|
def update_system_template(
|
|
template_id: int,
|
|
body: CsvSystemTemplateUpdate,
|
|
session: dict = Depends(require_admin),
|
|
):
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute(
|
|
"SELECT * FROM csv_field_mappings WHERE id = %s AND is_system = true AND profile_id IS NULL",
|
|
(template_id,),
|
|
)
|
|
existing = r2d(cur.fetchone())
|
|
if not existing:
|
|
raise HTTPException(404, "System-Template nicht gefunden")
|
|
|
|
patch: dict[str, Any] = body.model_dump(exclude_unset=True)
|
|
if not patch:
|
|
return _row_full(existing)
|
|
|
|
fm = patch.get("field_mappings", existing["field_mappings"])
|
|
tc_eff = patch.get("type_conversions", existing.get("type_conversions"))
|
|
irp_eff = patch.get("import_row_processing", existing.get("import_row_processing"))
|
|
col_eff = patch.get("column_signature", existing.get("column_signature"))
|
|
|
|
report = validate_csv_template(
|
|
existing["module"],
|
|
fm,
|
|
tc_eff,
|
|
irp_eff,
|
|
col_eff if isinstance(col_eff, list) else None,
|
|
)
|
|
if not report["valid"]:
|
|
raise HTTPException(status_code=422, detail=report)
|
|
|
|
fields_sql = []
|
|
vals: list = []
|
|
if "mapping_name" in patch:
|
|
fields_sql.append("mapping_name = %s")
|
|
vals.append(patch["mapping_name"])
|
|
if "description" in patch:
|
|
fields_sql.append("description = %s")
|
|
vals.append(patch["description"])
|
|
if "column_signature" in patch:
|
|
fields_sql.append("column_signature = %s")
|
|
vals.append(patch["column_signature"])
|
|
if "delimiter" in patch:
|
|
fields_sql.append("delimiter = %s")
|
|
vals.append(patch["delimiter"])
|
|
if "encoding" in patch:
|
|
fields_sql.append("encoding = %s")
|
|
vals.append(patch["encoding"])
|
|
if "has_header" in patch:
|
|
fields_sql.append("has_header = %s")
|
|
vals.append(patch["has_header"])
|
|
if "field_mappings" in patch:
|
|
fields_sql.append("field_mappings = %s")
|
|
vals.append(Json(patch["field_mappings"]))
|
|
if "type_conversions" in patch:
|
|
fields_sql.append("type_conversions = %s")
|
|
tc = patch["type_conversions"]
|
|
vals.append(Json(tc) if tc is not None else None)
|
|
if "import_row_processing" in patch:
|
|
irp = patch["import_row_processing"]
|
|
fields_sql.append("import_row_processing = %s")
|
|
vals.append(Json(irp) if irp is not None else None)
|
|
|
|
fields_sql.append("updated_at = CURRENT_TIMESTAMP")
|
|
vals.append(template_id)
|
|
|
|
cur.execute(
|
|
f"UPDATE csv_field_mappings SET {', '.join(fields_sql)} WHERE id = %s",
|
|
tuple(vals),
|
|
)
|
|
|
|
cur.execute("SELECT * FROM csv_field_mappings WHERE id = %s", (template_id,))
|
|
m = r2d(cur.fetchone())
|
|
return {**_row_full(m), "validation": report}
|
|
|
|
|
|
@router.delete("/{template_id}")
|
|
def delete_system_template(template_id: int, session: dict = Depends(require_admin)):
|
|
with get_db() as conn:
|
|
cur = get_cursor(conn)
|
|
cur.execute(
|
|
"DELETE FROM csv_field_mappings WHERE id = %s AND is_system = true AND profile_id IS NULL RETURNING id",
|
|
(template_id,),
|
|
)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
raise HTTPException(404, "System-Template nicht gefunden")
|
|
return {"deleted": template_id}
|