- Added permissions for editing and deleting CSV field mappings. - Created type converter for CSV cells to handle various data types. - Implemented database migrations for CSV field mappings and import logs. - Seeded initial system templates for nutrition and activity data imports. - Developed admin endpoints for managing system CSV templates. - Introduced user endpoints for CSV import analysis and mapping retrieval. - Added tests for core CSV parser functionalities, including delimiter detection and value conversion.
138 lines
4.0 KiB
Python
138 lines
4.0 KiB
Python
"""
|
|
CSV bytes → text, delimiter sniffing, and structured first rows for analysis (Issue #21).
|
|
"""
|
|
from __future__ import annotations
|
|
|
|
import csv
|
|
import io
|
|
import re
|
|
from typing import Any, List, Tuple
|
|
|
|
# Delimiter candidates tried by sniff_delimiter. Order matters: on equal counts
# the earlier entry wins, so "," is preferred over ";" and tab.
_DEFAULT_DELIMS = [",", ";", "\t"]
|
|
|
|
|
|
def decode_raw_bytes(raw: bytes) -> str:
    """Decode raw CSV bytes to text.

    UTF-8 is preferred; input that is not valid UTF-8 is decoded as Latin-1.
    Latin-1 maps every byte to a code point, so decoding never fails and no
    further fallback is required.

    Args:
        raw: Raw file content; may be empty.

    Returns:
        The decoded text without a leading byte-order mark, or ``""`` for
        empty input.
    """
    if not raw:
        return ""
    try:
        # "utf-8-sig" also decodes BOM-less UTF-8 and strips one leading BOM.
        text = raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        # Latin-1 output cannot contain U+FEFF, so there is nothing to strip.
        return raw.decode("latin-1")
    # The codec removes only the first BOM; drop one more if the data had two.
    return text[1:] if text.startswith("\ufeff") else text
|
|
|
|
|
|
def sniff_delimiter(sample_line: str) -> str:
    """Guess the field delimiter from a single sample line.

    Each candidate in ``_DEFAULT_DELIMS`` is scored by the number of field
    separators the ``csv`` parser finds when splitting the line with it, so
    delimiter characters inside quoted fields (``"Doe, John";42``) are not
    counted. ``csv.Sniffer`` is deliberately not used; it is less robust on
    short lines.

    Args:
        sample_line: One line of CSV text, typically the first non-blank line.

    Returns:
        The candidate with the highest separator count. Ties go to the
        earlier candidate; blank input yields ``","``.
    """
    if not sample_line or not sample_line.strip():
        return ","

    def separator_count(candidate: str) -> int:
        """Return how many separators the csv parser sees for *candidate*."""
        try:
            fields = next(csv.reader([sample_line], delimiter=candidate), [])
        except (csv.Error, TypeError):
            # Input the parser rejects (e.g. embedded newline, unusable
            # delimiter) is scored by plain occurrence counting instead.
            return sample_line.count(candidate)
        return max(len(fields) - 1, 0)

    best = ","
    best_count = -1
    for candidate in _DEFAULT_DELIMS:
        found = separator_count(candidate)
        # Strict comparison keeps the earlier candidate on a tie.
        if found > best_count:
            best, best_count = candidate, found
    return best
|
|
|
|
|
|
def _split_first_lines(text: str, max_lines: int = 5) -> List[str]:
|
|
lines: List[str] = []
|
|
for line in text.splitlines():
|
|
if line.strip():
|
|
lines.append(line)
|
|
if len(lines) >= max_lines:
|
|
break
|
|
return lines
|
|
|
|
|
|
def parse_csv_sample(
    text: str,
    delimiter: str | None = None,
    has_header: bool = True,
    max_data_rows: int = 5,
) -> Tuple[List[str], List[dict[str, str]], str]:
    """Parse the first rows of CSV text into headers and sample rows.

    Args:
        text: Decoded CSV content.
        delimiter: Field separator to use. When ``None`` it is guessed from
            the first non-blank line via ``sniff_delimiter``.
        has_header: Whether the first non-blank row holds the column names.
            If ``False``, names ``col_0``, ``col_1``, ... are generated from
            the width of the first row.
        max_data_rows: Maximum number of data rows to return; negative values
            are treated as ``0``.

    Returns:
        ``(headers, rows, delimiter_used)``. Each row is a dict mapping header
        name to the stripped raw cell string. Cells missing from short rows
        become ``""``; cells beyond the header width are dropped. Duplicate
        header names collapse into one key (the right-most column wins).
        Blank text yields ``([], [], ",")``.
    """
    # Whitespace-only text has no non-blank line to sample.
    if not text.strip():
        return [], [], ","

    if delimiter is not None:
        delim = delimiter
    else:
        # Only the first non-blank line is needed for sniffing.
        delim = sniff_delimiter(_split_first_lines(text, max_lines=1)[0])

    data_limit = max(max_data_rows, 0)
    # At least one row is always read: it is either the header or, without a
    # header, the row that determines the generated column names.
    rows_needed = max(1, data_limit + (1 if has_header else 0))

    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    rows_raw: List[List[str]] = []
    for row in csv.reader(io.StringIO(normalized), delimiter=delim):
        # Rows without any non-blank cell are skipped and must not count
        # against the sample size, so the limit is checked on kept rows.
        if not any(cell.strip() for cell in row):
            continue
        rows_raw.append(row)
        if len(rows_raw) >= rows_needed:
            break

    if not rows_raw:
        return [], [], delim

    if has_header:
        headers = [h.strip() for h in rows_raw[0]]
        data = rows_raw[1 : 1 + data_limit]
    else:
        headers = [f"col_{i}" for i in range(len(rows_raw[0]))]
        data = rows_raw[:data_limit]

    dict_rows: List[dict[str, str]] = [
        {h: (r[j].strip() if j < len(r) else "") for j, h in enumerate(headers)}
        for r in data
    ]
    return headers, dict_rows, delim
|
|
|
|
|
|
def normalize_header_for_signature(name: str) -> str:
    """Normalize a column header for signature comparison.

    The name is trimmed and lower-cased. Whitespace runs, then runs of
    characters outside ``a-z 0-9 _ ä ö ü ß ( ) . % -``, are each replaced by
    one underscore. Leading and trailing underscores are removed.
    """
    result = name.strip().lower()
    # Order matters: whitespace is collapsed before other characters.
    for pattern in (r"\s+", r"[^a-z0-9_äöüß().%-]+"):
        result = re.sub(pattern, "_", result)
    return result.strip("_")
|
|
|
|
|
|
def column_signature(headers: List[str]) -> List[str]:
    """Return the sorted, de-duplicated normalized column names.

    Entries that are ``None`` or blank after ``str()`` and stripping are
    ignored; the rest go through ``normalize_header_for_signature``.
    """
    unique_names = set()
    for header in headers:
        if header is None or not str(header).strip():
            continue
        unique_names.add(normalize_header_for_signature(header))
    return sorted(unique_names)
|
|
|
|
|
|
def headers_signature_match_score(sig_csv: List[str], sig_template: List[str]) -> float:
    """Return the Jaccard overlap (0..1) of two column signatures.

    Two empty signatures count as a perfect match (``1.0``); exactly one
    empty signature scores ``0.0``.
    """
    csv_names = set(sig_csv)
    template_names = set(sig_template)
    if not (csv_names or template_names):
        return 1.0
    if not (csv_names and template_names):
        return 0.0
    shared = csv_names.intersection(template_names)
    combined = csv_names.union(template_names)
    # Both sets are non-empty here, so the union cannot be empty.
    return len(shared) / len(combined)
|
|
|
|
|
|
def get_csv_import_limits(conn_row: dict | None) -> dict[str, int]:
    """Return the CSV import limits, falling back to built-in defaults.

    Args:
        conn_row: Mapping that carries the configuration under ``"value"``
            (presumably the ``system_config.csv_import`` row; confirm with
            the caller), or ``None``.

    Returns:
        A new dict with ``max_rows_per_file`` and ``max_file_bytes``. A
        default is kept when the row is missing, ``"value"`` is absent or not
        a dict, the key is absent, or the configured value cannot be
        converted to ``int``. Unknown keys are ignored.
    """
    limits = {"max_rows_per_file": 50_000, "max_file_bytes": 52_428_800}
    if not conn_row or "value" not in conn_row:
        return limits

    configured = conn_row["value"]
    if not isinstance(configured, dict):
        return limits

    for key, raw_value in configured.items():
        if key not in limits:
            continue
        try:
            limits[key] = int(raw_value)
        except (TypeError, ValueError, OverflowError):
            # A malformed entry must not break the import; keep the default.
            continue
    return limits
|