- Updated the CSV import logic to merge active training parameters with static fields for the activity module, improving field mapping accuracy. - Enhanced validation functions to incorporate dynamic field definitions based on active training parameters, ensuring better data integrity during imports. - Refactored related functions to streamline the process of handling CSV templates and field mappings, improving maintainability and clarity. - Added new utility functions for resolving activity log column patches and upserting session metrics from CSV, enhancing the overall import functionality.
454 lines
16 KiB
Python
454 lines
16 KiB
Python
"""
|
|
Admin: maintain system CSV templates (csv_field_mappings, is_system=true) (Issue #21).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, List, Optional
|
|
|
|
from fastapi import APIRouter, Depends, File, Form, HTTPException, UploadFile
|
|
from pydantic import BaseModel, Field
|
|
from psycopg2.extras import Json
|
|
|
|
from auth import require_admin
|
|
from db import get_db, get_cursor, r2d
|
|
from csv_parser.core import (
|
|
column_signature,
|
|
decode_raw_bytes,
|
|
get_csv_import_limits,
|
|
headers_signature_rank_metrics,
|
|
normalize_header_for_signature,
|
|
parse_csv_sample,
|
|
)
|
|
from csv_parser.mapping_suggest import build_type_conversions_for_mapping, suggest_field_mappings
|
|
from csv_parser.import_row_processing import (
|
|
validate_import_row_processing as validate_import_row_processing_spec,
|
|
)
|
|
from csv_parser.module_registry import get_module_definition
|
|
from csv_parser.template_validator import validate_csv_template
|
|
from data_layer.activity_persistence_orchestrator import merge_activity_csv_module_fields
|
|
|
|
router = APIRouter(prefix="/api/admin/csv-templates", tags=["admin", "csv-import"])
|
|
|
|
|
|
class CsvSystemTemplateCreate(BaseModel):
    """Request body for creating a system CSV template (POST on this router)."""

    # Target import module; the create endpoint rejects names for which
    # get_module_definition() returns nothing.
    module: str
    # Template name, 1-100 characters.
    mapping_name: str = Field(..., min_length=1, max_length=100)
    description: Optional[str] = None
    # Written to the column_signature column as-is; defaults to an empty list.
    column_signature: List[str] = Field(default_factory=list)
    delimiter: str = ","
    encoding: str = "utf-8"
    has_header: bool = True
    # Wrapped in psycopg2 Json when inserted.
    field_mappings: dict = Field(default_factory=dict)
    # Optional JSON payloads; None is inserted as NULL instead of a JSON value.
    type_conversions: Optional[dict] = None
    import_row_processing: Optional[dict] = None
|
|
|
|
|
|
class CsvSystemTemplateUpdate(BaseModel):
    """Partial-update body for a system CSV template (PUT on this router).

    Every field is optional. The update endpoint dumps this model with
    ``exclude_unset=True`` and only writes the fields the client actually sent.
    The model has no ``module`` field.
    """

    # Same length bounds as on create (1-100 characters) when provided.
    mapping_name: Optional[str] = Field(default=None, min_length=1, max_length=100)
    description: Optional[str] = None
    column_signature: Optional[List[str]] = None
    delimiter: Optional[str] = None
    encoding: Optional[str] = None
    has_header: Optional[bool] = None
    field_mappings: Optional[dict] = None
    type_conversions: Optional[dict] = None
    import_row_processing: Optional[dict] = None
|
|
|
|
|
|
class CsvImportLimitsBody(BaseModel):
    """Request body for storing the CSV import limits (PUT /import-limits)."""

    # Required; accepted range is 100 .. 2,000,000 rows.
    max_rows_per_file: int = Field(..., ge=100, le=2_000_000)
    # Required; accepted range is 10,000 .. 2,147,483,648 bytes (2 GiB).
    max_file_bytes: int = Field(..., ge=10_000, le=2_147_483_648)
|
|
|
|
|
|
class CsvTemplateValidateBody(BaseModel):
    """Format check without saving (field_mappings + type_conversions + optional row_processing)."""

    # Module name; the validate endpoint rejects names for which
    # get_module_definition() returns nothing.
    module: str
    field_mappings: dict = Field(default_factory=dict)
    type_conversions: Optional[dict] = None
    import_row_processing: Optional[dict] = None
    column_signature: Optional[List[str]] = None
|
|
|
|
|
|
def _row_full(m: dict) -> dict:
|
|
return {
|
|
"id": m["id"],
|
|
"module": m["module"],
|
|
"mapping_name": m["mapping_name"],
|
|
"description": m.get("description"),
|
|
"column_signature": list(m["column_signature"]) if m.get("column_signature") else [],
|
|
"delimiter": m["delimiter"],
|
|
"encoding": m["encoding"],
|
|
"has_header": m["has_header"],
|
|
"field_mappings": m["field_mappings"],
|
|
"type_conversions": m.get("type_conversions"),
|
|
"import_row_processing": m.get("import_row_processing"),
|
|
"usage_count": m.get("usage_count"),
|
|
"success_rate": m.get("success_rate"),
|
|
"last_used_at": m.get("last_used_at"),
|
|
"created_at": m.get("created_at"),
|
|
"updated_at": m.get("updated_at"),
|
|
"is_system": m["is_system"],
|
|
}
|
|
|
|
|
|
@router.get("/import-limits")
def admin_get_csv_import_limits(session: dict = Depends(require_admin)):
    """Return the CSV import limits for the admin UI.

    Delegates to ``_admin_csv_limits`` so the ``system_config`` lookup for the
    ``csv_import`` key lives in one place instead of being duplicated here.

    Args:
        session: Injected by ``require_admin``; not used beyond the dependency.

    Returns:
        Whatever ``get_csv_import_limits`` produces for the stored row
        (or for ``None`` when no row exists).
    """
    _ = session
    return _admin_csv_limits()
|
|
|
|
|
|
@router.put("/import-limits")
def admin_put_csv_import_limits(body: CsvImportLimitsBody, session: dict = Depends(require_admin)):
    """Store the CSV import limits under the ``csv_import`` key of ``system_config``.

    The row is inserted, or on a key conflict its ``value`` and ``updated_at``
    are overwritten. Returns the payload that was written.
    """
    payload = {
        "max_rows_per_file": body.max_rows_per_file,
        "max_file_bytes": body.max_file_bytes,
    }
    upsert_sql = """
            INSERT INTO system_config (key, value, updated_at)
            VALUES ('csv_import', %s, CURRENT_TIMESTAMP)
            ON CONFLICT (key) DO UPDATE
            SET value = EXCLUDED.value, updated_at = CURRENT_TIMESTAMP
            """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(upsert_sql, (Json(payload),))
    return payload
|
|
|
|
|
|
@router.get("")
def list_system_templates(
    module: Optional[str] = None,
    session: dict = Depends(require_admin),
):
    """List system templates (``is_system = true`` and no ``profile_id``).

    When ``module`` is given only templates of that module are returned;
    results are ordered by module, then mapping name.
    """
    query = """
            SELECT * FROM csv_field_mappings
            WHERE is_system = true AND profile_id IS NULL
              AND (%s::text IS NULL OR module = %s)
            ORDER BY module, mapping_name
            """
    with get_db() as conn:
        cursor = get_cursor(conn)
        # The module filter is passed twice: once for the NULL test, once for the comparison.
        cursor.execute(query, (module, module))
        templates = []
        for record in cursor.fetchall():
            templates.append(r2d(record))
    return {"templates": list(map(_row_full, templates))}
|
|
|
|
|
|
@router.post("/analyze-upload")
async def admin_analyze_csv_for_template(
    file: UploadFile = File(...),
    module: str = Form(...),
    delimiter: Optional[str] = Form(default=None),
    seed_template_id: Optional[int] = Form(default=None),
    session: dict = Depends(require_admin),
):
    """
    Analyze an uploaded CSV as in the user import: columns + preview + suggestions
    for field_mappings and type_conversions.

    A seed template can be given by ID; otherwise the system template of the same
    module that ranks best against the CSV header signature is used (ranked by
    confidence, then matched columns, then Jaccard), provided its confidence is > 0.

    Args:
        file: Uploaded CSV file.
        module: Target module; must be known to ``get_module_definition``.
        delimiter: Optional delimiter; only ``","``, ``";"`` and tab are honoured,
            anything else lets ``parse_csv_sample`` decide.
        seed_template_id: Optional ID of a system template of the same module.
        session: Injected by ``require_admin``; not used otherwise.

    Returns:
        Dict with filename, module, delimiter used, columns, normalized column
        signature, sample rows, seed template metrics (or ``None``) and the
        suggested ``field_mappings`` / ``type_conversions``.

    Raises:
        HTTPException: 400 for an unknown module, an empty file or a missing
            header; 413 when the file exceeds the configured byte or row limit;
            404 when ``seed_template_id`` does not match a system template of
            ``module``.
    """
    _ = session
    mod_def = get_module_definition(module)
    if not mod_def:
        raise HTTPException(400, f"Unbekanntes Modul: {module}")

    limits = _admin_csv_limits()
    max_bytes = limits.get("max_file_bytes", 52_428_800)
    # Read at most one byte past the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large file into memory.
    raw = await file.read(max_bytes + 1)
    if len(raw) > max_bytes:
        raise HTTPException(
            413,
            f"Datei zu groß (max. {max_bytes} Bytes laut Systemkonfiguration)",
        )

    text = decode_raw_bytes(raw)
    if not text.strip():
        raise HTTPException(400, "Leere Datei")

    max_rows = limits.get("max_rows_per_file", 50_000)
    # Newline count as a cheap row estimate, with a small tolerance of 5 lines.
    if text.count("\n") > max_rows + 5:
        raise HTTPException(
            413,
            f"Zu viele Zeilen (>{max_rows}) laut Systemkonfiguration",
        )

    delim = delimiter if delimiter in (",", ";", "\t") else None
    headers, sample_rows, used_delim = parse_csv_sample(text, delimiter=delim, max_data_rows=5)
    if not headers:
        raise HTTPException(400, "Keine Kopfzeile oder leeres CSV")

    sig = column_signature(headers)

    seed_row: dict | None = None
    eff_fields = dict(mod_def.get("fields") or {})
    with get_db() as conn:
        cur = get_cursor(conn)
        if seed_template_id is not None:
            cur.execute(
                """
                SELECT * FROM csv_field_mappings
                WHERE id = %s AND is_system = true AND profile_id IS NULL AND module = %s
                """,
                (seed_template_id, module),
            )
            fetched = cur.fetchone()
            # Guard before converting, as the limits helpers in this file do.
            seed_row = r2d(fetched) if fetched else None
            if not seed_row:
                raise HTTPException(404, "Seed-Vorlage nicht gefunden oder falsches Modul")
        else:
            cur.execute(
                """
                SELECT * FROM csv_field_mappings
                WHERE is_system = true AND profile_id IS NULL AND module = %s
                """,
                (module,),
            )
            best: dict | None = None
            best_key: tuple[float, int, float] = (-1.0, -1, -1.0)
            for t in (r2d(r) for r in cur.fetchall()):
                m = headers_signature_rank_metrics(sig, list(t.get("column_signature") or []))
                key = (m["confidence"], m["columns_matched"], m["jaccard"])
                # Strict comparison: on ties the first template wins.
                if key > best_key:
                    best_key = key
                    best = t
            if best and best_key[0] > 0:
                seed_row = best

        # Needs the live cursor, so it has to run before the connection block ends.
        if module == "activity":
            eff_fields = merge_activity_csv_module_fields(cur, eff_fields)

    seed_fm = (seed_row or {}).get("field_mappings") or {}
    if isinstance(seed_fm, str):
        # A string value is not usable as a mapping seed; ignore it.
        seed_fm = {}
    seed_tc = (seed_row or {}).get("type_conversions")
    if not isinstance(seed_tc, dict):
        seed_tc = {}

    field_mappings = suggest_field_mappings(
        headers,
        module,
        seed_fm if seed_fm else None,
        effective_fields=eff_fields,
    )
    type_conversions = build_type_conversions_for_mapping(
        module,
        field_mappings,
        seed_tc if seed_tc else None,
        effective_fields=eff_fields,
    )

    seed_meta = None
    if seed_row:
        sm = headers_signature_rank_metrics(sig, list(seed_row.get("column_signature") or []))
        seed_meta = {
            "id": seed_row["id"],
            "mapping_name": seed_row.get("mapping_name"),
            "confidence": sm["confidence"],
            "template_recall": sm["template_recall"],
            "jaccard": sm["jaccard"],
            "columns_matched": sm["columns_matched"],
            "columns_in_template": sm["columns_in_template"],
            "columns_in_csv": sm["columns_in_csv"],
        }

    return {
        "filename": file.filename,
        "module": module,
        "delimiter": used_delim,
        "encoding": "utf-8",
        "columns": headers,
        "column_signature_normalized": sig,
        "sample_rows": sample_rows,
        "seed_template": seed_meta,
        "field_mappings": field_mappings,
        "type_conversions": type_conversions,
    }
|
|
|
|
|
|
def _admin_csv_limits() -> dict[str, int]:
    """Fetch the ``csv_import`` row from ``system_config`` and turn it into limits.

    The row (converted with ``r2d``) or ``None`` when no row exists is handed
    to ``get_csv_import_limits``, whose result is returned unchanged.
    """
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute("SELECT value FROM system_config WHERE key = %s", ("csv_import",))
        config_row = cursor.fetchone()
        if config_row:
            stored = r2d(config_row)
        else:
            stored = None
        return get_csv_import_limits(stored)
|
|
|
|
|
|
@router.post("/validate")
def validate_system_template_dry_run(body: CsvTemplateValidateBody, session: dict = Depends(require_admin)):
    """
    Run the validator on a template configuration without saving it.

    Intended for the admin editor before saving. This function issues no SQL
    itself; it only hands a cursor to ``validate_csv_template`` and returns
    that function's report. Responds with 400 for an unknown module.
    """
    module_name = body.module
    if not get_module_definition(module_name):
        raise HTTPException(400, f"Unbekanntes Modul: {module_name}")

    with get_db() as conn:
        cursor = get_cursor(conn)
        report = validate_csv_template(
            module_name,
            body.field_mappings,
            body.type_conversions,
            body.import_row_processing,
            body.column_signature,
            cur=cursor,
        )
        return report
|
|
|
|
|
|
@router.get("/{template_id}")
def get_system_template(template_id: int, session: dict = Depends(require_admin)):
    """Return one system template by ID, or respond with 404 if none matches.

    Only rows with ``is_system = true`` and no ``profile_id`` are considered.
    """
    lookup_sql = (
        "SELECT * FROM csv_field_mappings WHERE id = %s AND is_system = true AND profile_id IS NULL"
    )
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(lookup_sql, (template_id,))
        template = r2d(cursor.fetchone())
        if template:
            return _row_full(template)
        raise HTTPException(404, "System-Template nicht gefunden")
|
|
|
|
|
|
@router.post("")
def create_system_template(body: CsvSystemTemplateCreate, session: dict = Depends(require_admin)):
    """Validate and insert a new system template.

    Responds with 400 for an unknown module and with 422 (validation report as
    detail) when ``validate_csv_template`` reports the configuration as invalid.
    On success returns the new row ID together with the validation report.
    """
    if not get_module_definition(body.module):
        raise HTTPException(400, f"Unbekanntes Modul: {body.module}")

    def _json_or_null(value):
        # None is passed through so the column receives NULL instead of a JSON value.
        return Json(value) if value is not None else None

    insert_sql = """
            INSERT INTO csv_field_mappings (
                profile_id, is_system, module, mapping_name, description,
                column_signature, delimiter, encoding, has_header,
                field_mappings, type_conversions, import_row_processing
            ) VALUES (
                NULL, true, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
            ) RETURNING id
            """

    with get_db() as conn:
        cursor = get_cursor(conn)
        report = validate_csv_template(
            body.module,
            body.field_mappings,
            body.type_conversions,
            body.import_row_processing,
            body.column_signature,
            cur=cursor,
        )
        if not report["valid"]:
            raise HTTPException(status_code=422, detail=report)

        params = (
            body.module,
            body.mapping_name,
            body.description,
            body.column_signature,
            body.delimiter,
            body.encoding,
            body.has_header,
            Json(body.field_mappings),
            _json_or_null(body.type_conversions),
            _json_or_null(body.import_row_processing),
        )
        cursor.execute(insert_sql, params)
        created = cursor.fetchone()
        return {"id": created["id"], "validation": report}
|
|
|
|
|
|
@router.put("/{template_id}")
def update_system_template(
    template_id: int,
    body: CsvSystemTemplateUpdate,
    session: dict = Depends(require_admin),
):
    """Apply a partial update to a system template.

    Only fields the client actually sent are written. Before writing, the
    resulting configuration (sent values overlaid on the stored row) is run
    through ``validate_csv_template``. Responds with 404 when the template does
    not exist and with 422 (report as detail) when validation fails. An empty
    patch returns the stored row unchanged, without a ``validation`` key.
    """
    # Column order of the generated SET clause.
    updatable_columns = (
        "mapping_name",
        "description",
        "column_signature",
        "delimiter",
        "encoding",
        "has_header",
        "field_mappings",
        "type_conversions",
        "import_row_processing",
    )
    # These are wrapped in Json only when not None; field_mappings is always wrapped.
    nullable_json_columns = ("type_conversions", "import_row_processing")

    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(
            "SELECT * FROM csv_field_mappings WHERE id = %s AND is_system = true AND profile_id IS NULL",
            (template_id,),
        )
        existing = r2d(cursor.fetchone())
        if not existing:
            raise HTTPException(404, "System-Template nicht gefunden")

        patch: dict[str, Any] = body.model_dump(exclude_unset=True)
        if not patch:
            return _row_full(existing)

        # Effective values for validation: the patch wins over the stored row.
        effective_signature = patch.get("column_signature", existing.get("column_signature"))
        if not isinstance(effective_signature, list):
            effective_signature = None
        report = validate_csv_template(
            existing["module"],
            patch.get("field_mappings", existing["field_mappings"]),
            patch.get("type_conversions", existing.get("type_conversions")),
            patch.get("import_row_processing", existing.get("import_row_processing")),
            effective_signature,
            cur=cursor,
        )
        if not report["valid"]:
            raise HTTPException(status_code=422, detail=report)

        assignments: list = []
        params: list = []
        for column in updatable_columns:
            if column not in patch:
                continue
            value = patch[column]
            if column == "field_mappings":
                value = Json(value)
            elif column in nullable_json_columns and value is not None:
                value = Json(value)
            assignments.append(f"{column} = %s")
            params.append(value)

        assignments.append("updated_at = CURRENT_TIMESTAMP")
        params.append(template_id)

        # Column names come from the fixed tuple above; only values are bound parameters.
        cursor.execute(
            f"UPDATE csv_field_mappings SET {', '.join(assignments)} WHERE id = %s",
            tuple(params),
        )

        cursor.execute("SELECT * FROM csv_field_mappings WHERE id = %s", (template_id,))
        refreshed = r2d(cursor.fetchone())
        return {**_row_full(refreshed), "validation": report}
|
|
|
|
|
|
@router.delete("/{template_id}")
def delete_system_template(template_id: int, session: dict = Depends(require_admin)):
    """Delete a system template by ID.

    Only rows with ``is_system = true`` and no ``profile_id`` are eligible.
    Responds with 404 when no such row was deleted; otherwise returns the
    deleted ID.
    """
    delete_sql = (
        "DELETE FROM csv_field_mappings WHERE id = %s AND is_system = true AND profile_id IS NULL RETURNING id"
    )
    with get_db() as conn:
        cursor = get_cursor(conn)
        cursor.execute(delete_sql, (template_id,))
        # RETURNING yields a row only when something was actually deleted.
        deleted_row = cursor.fetchone()
        if deleted_row:
            return {"deleted": template_id}
        raise HTTPException(404, "System-Template nicht gefunden")
|