- Updated the CSV import logic to include SAVEPOINT management, allowing for better error handling during the vitals baseline import process. - Enhanced the SQL migration script to drop existing CHECK constraints related to the 'source' field, ensuring compatibility with the new universal CSV import. - Incremented DB_SCHEMA_VERSION to "20260409c" to reflect these changes and improve the import process reliability.
566 lines
20 KiB
Python
566 lines
20 KiB
Python
"""
|
|
CSV-Import: Nutzer-Endpunkte für Analyse, Mappings, Limits (Issue #21).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import Any, Optional
|
|
|
|
from fastapi import APIRouter, Depends, File, Form, Header, HTTPException, UploadFile
|
|
from pydantic import BaseModel
|
|
|
|
from psycopg2.extras import Json
|
|
|
|
from auth import require_auth, check_feature_access, increment_feature_usage
|
|
from feature_logger import log_feature_usage
|
|
from db import get_db, get_cursor, r2d
|
|
from routers.profiles import get_pid
|
|
from csv_parser.executor import run_universal_csv_import
|
|
from csv_parser.core import (
|
|
decode_raw_bytes,
|
|
column_signature,
|
|
get_csv_import_limits,
|
|
headers_signature_rank_metrics,
|
|
normalize_header_for_signature,
|
|
parse_csv_sample,
|
|
)
|
|
from csv_parser.field_units import source_unit_choices_for_field
|
|
from csv_parser.module_registry import get_module_definition, list_modules, validate_field_mappings
|
|
from csv_parser.sleep_apple_import import detect_apple_sleep_csv_format
|
|
|
|
# Router for the CSV import endpoints; every route below is served under /api/csv and tagged "csv-import".
router = APIRouter(prefix="/api/csv", tags=["csv-import"])
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
|
def _load_import_limits() -> dict[str, int]:
    """Resolve the CSV import limits from the ``csv_import`` entry of ``system_config``.

    The matching row (converted with ``r2d``), or ``None`` when no such row
    exists, is handed to ``get_csv_import_limits``; its result is returned.
    """
    with get_db() as connection:
        cursor = get_cursor(connection)
        cursor.execute("SELECT value FROM system_config WHERE key = %s", ("csv_import",))
        config_row = cursor.fetchone()
    stored_config = r2d(config_row) if config_row else None
    return get_csv_import_limits(stored_config)
|
|
|
|
|
|
def _mapping_to_summary(m: dict) -> dict:
|
|
return {
|
|
"id": m["id"],
|
|
"module": m["module"],
|
|
"name": m["mapping_name"],
|
|
"description": m.get("description"),
|
|
"is_system": m["is_system"],
|
|
"usage_count": m.get("usage_count"),
|
|
"success_rate": m.get("success_rate"),
|
|
"last_used_at": m.get("last_used_at"),
|
|
"created_at": m.get("created_at"),
|
|
}
|
|
|
|
|
|
@router.get("/modules")
def csv_modules(session: dict = Depends(require_auth)):
    """List the supported import modules together with their field definitions.

    For every id from ``list_modules()`` that has a module definition, the
    response carries the target table, the import-mode settings and a copy of
    each field definition. A field gains a ``source_unit_options`` entry when
    ``source_unit_choices_for_field`` returns choices for it.
    """
    modules = []
    for module_id in list_modules():
        definition = get_module_definition(module_id)
        if not definition:
            # Ids without a definition are left out of the listing.
            continue

        fields = {}
        for field_name, field_info in (definition.get("fields") or {}).items():
            field_entry = dict(field_info)  # copy so the registry entry is not mutated
            unit_options = source_unit_choices_for_field(module_id, field_name)
            if unit_options:
                field_entry["source_unit_options"] = unit_options
            fields[field_name] = field_entry

        modules.append(
            {
                "id": module_id,
                "table": definition["table"],
                "fields": fields,
                "import_mode": definition.get("import_mode"),
                "import_row_processing_default": definition.get("import_row_processing_default"),
            }
        )
    return {"modules": modules}
|
|
|
|
|
|
@router.get("/limits")
def csv_limits(session: dict = Depends(require_auth)):
    """Return the admin-configurable import limits (``system_config.csv_import``)."""
    limits = _load_import_limits()
    return limits
|
|
|
|
|
|
@router.get("/mappings")
def list_csv_mappings(
    module: Optional[str] = None,
    session: dict = Depends(require_auth),
):
    """Return the system templates plus the caller's own mappings, as summaries.

    When ``module`` is given, both lists are restricted to that module.
    System templates are ordered by usage count, user mappings by the time
    they were last used; both fall back to the mapping name.
    """
    profile_id = str(session["profile_id"])
    # The optional module filter appears twice in each query (NULL test + comparison).
    module_filter = (module, module)

    with get_db() as connection:
        cursor = get_cursor(connection)

        cursor.execute(
            """
            SELECT id, module, mapping_name, description, is_system, profile_id,
                   usage_count, success_rate, last_used_at, created_at
            FROM csv_field_mappings
            WHERE is_system = true
              AND (%s::text IS NULL OR module = %s)
            ORDER BY usage_count DESC NULLS LAST, mapping_name
            """,
            module_filter,
        )
        system_rows = list(map(r2d, cursor.fetchall()))

        cursor.execute(
            """
            SELECT id, module, mapping_name, description, is_system, profile_id,
                   usage_count, success_rate, last_used_at, created_at
            FROM csv_field_mappings
            WHERE is_system = false AND profile_id = %s::uuid
              AND (%s::text IS NULL OR module = %s)
            ORDER BY last_used_at DESC NULLS LAST, mapping_name
            """,
            (profile_id, *module_filter),
        )
        user_rows = list(map(r2d, cursor.fetchall()))

    system_templates = [_mapping_to_summary(row) for row in system_rows]
    user_mappings = [_mapping_to_summary(row) for row in user_rows]
    return {
        "system_templates": system_templates,
        "user_mappings": user_mappings,
    }
|
|
|
|
|
|
class CopyMappingBody(BaseModel):
    """Optional request body for copying a mapping."""

    # Desired name of the copy; when omitted or empty the endpoint derives one
    # from the source mapping's name.
    name: Optional[str] = None
|
|
|
|
|
|
@router.post("/mappings/{mapping_id}/copy")
def copy_csv_mapping(
    mapping_id: int,
    body: CopyMappingBody | None = None,
    session: dict = Depends(require_auth),
):
    """Copy a system template or one of the caller's own mappings into a new user mapping.

    The copy belongs to the caller's profile, is never a system mapping and
    starts with ``usage_count = 0`` and ``success_rate = 1.0``. Its name is
    ``body.name`` when given, otherwise ``"<source name> (Kopie)"``; if that
    name is already taken for the same profile and module, a numeric suffix
    (`` 2``, `` 3``, ...) is appended until it is free.

    Returns:
        ``{"new_mapping_id": <id>, "mapping_name": <final name>}``.

    Raises:
        HTTPException: 404 when ``mapping_id`` does not exist; 403 when the
            source is another profile's user mapping.
    """
    pid = str(session["profile_id"])
    with get_db() as conn:
        cur = get_cursor(conn)
        cur.execute(
            """
            SELECT * FROM csv_field_mappings WHERE id = %s
            """,
            (mapping_id,),
        )
        # Check for a missing row before converting it: fetchone() yields None
        # when nothing matched, and the converter is not guaranteed to accept that.
        src_row = cur.fetchone()
        if not src_row:
            raise HTTPException(404, "Mapping nicht gefunden")
        src = r2d(src_row)
        if not src["is_system"] and str(src.get("profile_id")) != pid:
            raise HTTPException(403, "Kein Zugriff auf dieses Mapping")

        requested_name = body.name if body else None
        base_name = requested_name or f"{src['mapping_name']} (Kopie)"
        name = base_name
        n = 1
        # Probe for a free name; the first retry uses the suffix " 2".
        while True:
            cur.execute(
                """
                SELECT 1 FROM csv_field_mappings
                WHERE profile_id = %s::uuid AND module = %s AND mapping_name = %s
                """,
                (pid, src["module"], name),
            )
            if not cur.fetchone():
                break
            n += 1
            name = f"{base_name} {n}"

        # JSON columns are wrapped in Json(); optional ones stay SQL NULL when unset.
        type_conversions = src.get("type_conversions")
        import_row_processing = src.get("import_row_processing")
        cur.execute(
            """
            INSERT INTO csv_field_mappings (
                profile_id, is_system, module, mapping_name, description,
                column_signature, delimiter, encoding, has_header,
                field_mappings, type_conversions, import_row_processing,
                usage_count, success_rate
            ) VALUES (
                %s::uuid, false, %s, %s, %s,
                %s, %s, %s, %s, %s, %s, %s, 0, 1.0
            ) RETURNING id
            """,
            (
                pid,
                src["module"],
                name,
                src.get("description"),
                src["column_signature"],
                src["delimiter"],
                src["encoding"],
                src["has_header"],
                Json(src["field_mappings"]),
                Json(type_conversions) if type_conversions is not None else None,
                Json(import_row_processing) if import_row_processing is not None else None,
            ),
        )
        new_id = cur.fetchone()["id"]
        return {"new_mapping_id": new_id, "mapping_name": name}
|
|
|
|
|
|
@router.post("/analyze")
async def analyze_csv(
    file: UploadFile = File(...),
    module: Optional[str] = Form(default=None),
    delimiter: Optional[str] = Form(default=None),
    session: dict = Depends(require_auth),
):
    """
    Parse the first rows of an uploaded CSV, build its column signature and rank templates by similarity.

    Without ``module``: all system templates plus the caller's own mappings are
    ranked (format detection via the column signature).
    With ``module``: only templates of that module are ranked (backward compatibility).

    ``delimiter`` is honoured only when it is ``,``, ``;`` or a tab; any other
    value lets the parser pick one. At most the 25 best-ranked templates are
    returned; ``recommended`` is the best one if its confidence is above zero.

    Raises:
        HTTPException: 400 for an unknown ``module``; 413 when the upload
            exceeds the configured ``max_file_bytes`` or ``max_rows_per_file``.
    """
    if module and not get_module_definition(module):
        raise HTTPException(400, f"Unbekanntes Modul: {module}")

    pid = str(session["profile_id"])
    limits = _load_import_limits()
    max_bytes = limits.get("max_file_bytes", 52_428_800)
    # Read at most one byte beyond the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large file into memory first.
    raw = await file.read(max_bytes + 1)
    if len(raw) > max_bytes:
        raise HTTPException(
            413,
            f"Datei zu groß (max. {max_bytes} Bytes laut Systemkonfiguration)",
        )

    text = decode_raw_bytes(raw)
    max_rows = limits.get("max_rows_per_file", 50_000)
    # Row limit is checked on the newline count, with a slack of five lines.
    if text.count("\n") > max_rows + 5:
        raise HTTPException(
            413,
            f"Zu viele Zeilen (>{max_rows}) laut Systemkonfiguration csv_import.max_rows_per_file",
        )
    delim = delimiter if delimiter in (",", ";", "\t") else None
    headers, sample_rows, used_delim = parse_csv_sample(text, delimiter=delim, max_data_rows=5)
    sig = column_signature(headers)

    # The detector raises ValueError when the headers are not an Apple sleep export.
    try:
        detect_apple_sleep_csv_format(headers)
    except ValueError:
        apple_sleep_csv = False
    else:
        apple_sleep_csv = True

    mod_def = get_module_definition(module) if module else None
    available_fields = mod_def["fields"] if mod_def else None

    with get_db() as conn:
        cur = get_cursor(conn)
        if module:
            cur.execute(
                """
                SELECT id, module, mapping_name, description, column_signature,
                       delimiter, encoding, has_header, field_mappings, type_conversions, is_system
                FROM csv_field_mappings
                WHERE module = %s
                  AND (is_system = true OR (is_system = false AND profile_id = %s::uuid))
                """,
                (module, pid),
            )
        else:
            cur.execute(
                """
                SELECT id, module, mapping_name, description, column_signature,
                       delimiter, encoding, has_header, field_mappings, type_conversions, is_system
                FROM csv_field_mappings
                WHERE is_system = true OR (is_system = false AND profile_id = %s::uuid)
                """,
                (pid,),
            )
        templates = [r2d(r) for r in cur.fetchall()]

    ranked = []
    for t in templates:
        t_sig = list(t["column_signature"]) if t["column_signature"] else []
        metrics = headers_signature_rank_metrics(sig, t_sig)
        conf = float(metrics["confidence"] or 0)
        if apple_sleep_csv and t.get("module") == "sleep":
            # A detected Apple sleep export lifts every sleep template to at least 1.0.
            conf = max(conf, 1.0)
        ranked.append(
            {
                "mapping_id": t["id"],
                "module": t["module"],
                "mapping_name": t["mapping_name"],
                "is_system": bool(t.get("is_system")),
                "confidence": round(conf, 4),
                "template_recall": metrics["template_recall"],
                "jaccard": metrics["jaccard"],
                "columns_matched": metrics["columns_matched"],
                "columns_in_template": metrics["columns_in_template"],
                "columns_in_csv": metrics["columns_in_csv"],
                "match_type": "template_recall",
            }
        )
    # Best first: confidence, then matched columns, then Jaccard similarity.
    ranked.sort(
        key=lambda x: (
            -(x.get("confidence") or 0),
            -(x.get("columns_matched") or 0),
            -(x.get("jaccard") or 0),
        ),
    )

    top = ranked[:25]
    recommended = top[0] if top and (top[0]["confidence"] or 0) > 0 else None

    warnings: list[str] = []
    if apple_sleep_csv:
        # Both warnings need this answer; scan the templates only once.
        has_sleep_template = any(t.get("module") == "sleep" for t in templates)
        if not has_sleep_template:
            warnings.append(
                "Diese Datei ist ein Apple-Schlafexport, aber es fehlt eine Vorlage für das Modul «Schlaf» "
                "(z. B. Migration 044 / Admin → CSV-Vorlagen)."
            )
        elif recommended and recommended.get("module") != "sleep":
            warnings.append(
                "Apple-Schlaf-CSV erkannt — bitte eine Vorlage unter Modul «Schlaf» wählen, nicht «"
                + str(recommended.get("module") or "")
                + "».",
            )

    return {
        "module_filter": module,
        "filename": file.filename,
        "encoding_note": "utf-8/latin-1 mit BOM-Strip",
        "delimiter": used_delim,
        "columns": headers,
        "column_signature_normalized": sig,
        "sample_rows": sample_rows,
        "detected_mappings": top,
        "recommended": recommended,
        "available_fields": available_fields,
        "format_detection": {"apple_sleep": apple_sleep_csv},
        "warnings": warnings,
    }
|
|
|
|
|
|
def _fetch_mapping_row(
    cur,
    mapping_id: int,
    profile_id: str,
    module: Optional[str] = None,
) -> dict:
    """Load one ``csv_field_mappings`` row and verify the caller may use it.

    Args:
        cur: Open database cursor used for the lookup.
        mapping_id: Primary key of the mapping.
        profile_id: Profile id of the caller, compared as a string.
        module: When not ``None``, the mapping's module must equal this value.

    Returns:
        The mapping row converted with ``r2d``.

    Raises:
        HTTPException: 404 when the mapping does not exist, 400 when it
            belongs to a different module, 403 when it is a non-system
            mapping owned by another profile.
    """
    cur.execute(
        """
        SELECT * FROM csv_field_mappings WHERE id = %s
        """,
        (mapping_id,),
    )
    # Check for a missing row before converting it: fetchone() yields None
    # when nothing matched, and the converter is not guaranteed to accept that.
    row = cur.fetchone()
    if not row:
        raise HTTPException(404, "Mapping nicht gefunden")
    m = r2d(row)
    if module is not None and m.get("module") != module:
        raise HTTPException(400, "Mapping gehört zu einem anderen Modul")
    # System mappings are usable by everyone; user mappings only by their owner.
    if not m.get("is_system") and str(m.get("profile_id") or "") != profile_id:
        raise HTTPException(403, "Kein Zugriff auf dieses Mapping")
    return m
|
|
|
|
|
|
def _check_module_feature_access(pid: str, module: str) -> None:
|
|
if module == "nutrition":
|
|
access = check_feature_access(pid, "nutrition_entries")
|
|
log_feature_usage(pid, "nutrition_entries", access, "csv_universal_import")
|
|
if not access["allowed"]:
|
|
raise HTTPException(
|
|
403,
|
|
f"Limit erreicht (Ernährungseinträge): {access.get('used')}/{access.get('limit')}",
|
|
)
|
|
elif module == "weight":
|
|
access = check_feature_access(pid, "weight_entries")
|
|
log_feature_usage(pid, "weight_entries", access, "csv_universal_import")
|
|
if not access["allowed"]:
|
|
raise HTTPException(
|
|
403,
|
|
f"Limit erreicht (Gewichtseinträge): {access.get('used')}/{access.get('limit')}",
|
|
)
|
|
elif module == "activity":
|
|
access = check_feature_access(pid, "activity_entries")
|
|
log_feature_usage(pid, "activity_entries", access, "csv_universal_import")
|
|
if not access["allowed"]:
|
|
raise HTTPException(
|
|
403,
|
|
f"Limit erreicht (Aktivitätseinträge): {access.get('used')}/{access.get('limit')}",
|
|
)
|
|
|
|
|
|
@router.post("/import")
async def csv_import_execute(
    file: UploadFile = File(...),
    mapping_id: int = Form(...),
    module: Optional[str] = Form(default=None),
    x_profile_id: Optional[str] = Header(default=None),
    session: dict = Depends(require_auth),
):
    """
    Universal CSV import: the target module comes from the selected template (``mapping_id``).

    The optional ``module`` is only a safeguard; it must match the template's module.

    Flow: check the ``data_import`` quota, validate size/row limits, then in a
    single transaction create a ``csv_import_log`` row with status ``running``
    and run the import behind a SAVEPOINT. If the import raises, only the
    imported data is rolled back; the log row is kept and marked ``failed``.
    On success the log row receives the row statistics and the feature-usage
    counters are incremented.

    Returns:
        A dict with ``success``, ``import_log_id``, ``module``, ``stats`` and
        ``error_details``.

    Raises:
        HTTPException: 403 when a quota is exhausted or the mapping is not
            accessible, 404 for an unknown mapping, 400 for an empty file, a
            module mismatch or an unsupported module, 413 when a size/row
            limit is exceeded, 500 when the import itself fails.
    """
    pid = get_pid(x_profile_id)

    access_di = check_feature_access(pid, "data_import")
    log_feature_usage(pid, "data_import", access_di, "csv_universal_import")
    if not access_di["allowed"]:
        raise HTTPException(
            403,
            "Limit erreicht (Daten importieren): "
            f"{access_di.get('used')}/{access_di.get('limit')}",
        )

    limits = _load_import_limits()
    max_bytes = limits.get("max_file_bytes", 52_428_800)
    # Read at most one byte beyond the limit: enough to detect an oversized
    # upload without pulling an arbitrarily large file into memory first.
    raw = await file.read(max_bytes + 1)
    if len(raw) > max_bytes:
        raise HTTPException(
            413,
            f"Datei zu groß (max. {max_bytes} Bytes laut Systemkonfiguration)",
        )
    text = decode_raw_bytes(raw)
    if not text.strip():
        raise HTTPException(400, "Leere Datei")
    max_rows = limits.get("max_rows_per_file", 50_000)
    # Row limit is checked on the newline count, with a slack of five lines.
    if text.count("\n") > max_rows + 5:
        raise HTTPException(
            413,
            f"Zu viele Zeilen (>{max_rows}) laut Systemkonfiguration",
        )

    filename = file.filename or "upload.csv"
    log_id: int | None = None
    err_response: HTTPException | None = None
    result: dict | None = None
    resolved_module: str | None = None

    with get_db() as conn:
        cur = get_cursor(conn)
        m = _fetch_mapping_row(cur, mapping_id, pid, module)
        exec_module = m["module"]
        resolved_module = exec_module

        if not get_module_definition(exec_module):
            raise HTTPException(
                400,
                f"Modul der Vorlage wird vom Importer noch nicht unterstützt: {exec_module}",
            )

        _check_module_feature_access(pid, exec_module)

        cur.execute(
            """
            INSERT INTO csv_import_log (
                profile_id, mapping_id, module, filename,
                rows_total, rows_imported, rows_updated, rows_skipped, rows_errors,
                status, error_details, affected_ids
            ) VALUES (
                %s::uuid, %s, %s, %s,
                0, 0, 0, 0, 0,
                'running', NULL, NULL
            ) RETURNING id
            """,
            (pid, mapping_id, exec_module, filename),
        )
        log_id = cur.fetchone()["id"]

        # The savepoint lets a failed import be undone while the log row
        # inserted above stays in the transaction and can be marked 'failed'.
        cur.execute("SAVEPOINT csv_import_exec")
        try:
            result = run_universal_csv_import(
                cur,
                pid,
                exec_module,
                text,
                filename,
                m,
            )
        except Exception as exec_err:
            logger.exception("Universal-CSV-Import fehlgeschlagen: %s", exec_err)
            cur.execute("ROLLBACK TO SAVEPOINT csv_import_exec")
            cur.execute(
                """
                UPDATE csv_import_log SET
                    finished_at = CURRENT_TIMESTAMP,
                    status = 'failed',
                    error_details = %s
                WHERE id = %s
                """,
                (Json([{"error": str(exec_err)}]), log_id),
            )
            # Raised only after the ``with`` block has ended, so the 'failed'
            # log entry is not lost to an exception leaving the block.
            # NOTE(review): the raw exception text is returned to the client.
            err_response = HTTPException(500, f"Import fehlgeschlagen: {exec_err}")
        else:
            cur.execute("RELEASE SAVEPOINT csv_import_exec")
            cur.execute(
                """
                UPDATE csv_import_log SET
                    finished_at = CURRENT_TIMESTAMP,
                    status = 'success',
                    rows_total = %s,
                    rows_imported = %s,
                    rows_updated = %s,
                    rows_skipped = %s,
                    rows_errors = %s,
                    error_details = %s,
                    affected_ids = %s
                WHERE id = %s
                """,
                (
                    result["rows_total"],
                    result["rows_imported"],
                    result["rows_updated"],
                    result["rows_skipped"],
                    result["rows_errors"],
                    Json(result["error_details"]),
                    Json(result["affected_ids"]),
                    log_id,
                ),
            )

            # NOTE(review): placed on the success path; the pasted source lost
            # its indentation, so confirm the usage counter is not meant to be
            # bumped for failed imports as well.
            cur.execute(
                """
                UPDATE csv_field_mappings SET
                    usage_count = usage_count + 1,
                    last_used_at = CURRENT_TIMESTAMP,
                    updated_at = CURRENT_TIMESTAMP
                WHERE id = %s
                """,
                (mapping_id,),
            )

    if err_response is not None:
        raise err_response

    # Explicit check instead of ``assert``: asserts are stripped under ``python -O``.
    if result is None:
        raise HTTPException(500, "Import fehlgeschlagen: kein Ergebnis")

    increment_feature_usage(pid, "data_import")

    # Quota-counted modules: one usage increment per newly created entry.
    new_entries = result.get("new_entries")
    if new_entries is None:
        # Missing or null counter: fall back to the number of imported rows.
        new_entries = result["rows_imported"]
    usage_feature = {
        "nutrition": "nutrition_entries",
        "weight": "weight_entries",
        "activity": "activity_entries",
    }.get(resolved_module)
    if usage_feature is not None:
        for _ in range(new_entries):
            increment_feature_usage(pid, usage_feature)

    return {
        "success": True,
        "import_log_id": log_id,
        "module": resolved_module,
        "stats": {
            "total_rows": result["rows_total"],
            "imported": result["rows_imported"],
            "updated": result["rows_updated"],
            "skipped": result["rows_skipped"],
            "errors": result["rows_errors"],
        },
        "error_details": result["error_details"],
    }
|