feat(csv-parser): Enhance CSV parsing with header normalization and flexible date handling
All checks were successful
Deploy Development / deploy (push) Successful in 53s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s

- Added a new function to strip header keys of unwanted characters, improving CSV import consistency.
- Updated CSV row iteration to utilize the new header normalization function, ensuring cleaner data processing.
- Enhanced date parsing capabilities to support flexible formats, accommodating various date representations in CSV files.
- Introduced additional tests to validate the new header normalization and date parsing functionalities.
This commit is contained in:
Lars 2026-04-10 06:23:46 +02:00
parent 5e5f3b4e5a
commit 338163ac0b
3 changed files with 403 additions and 42 deletions

View File

@ -138,6 +138,12 @@ def get_csv_import_limits(conn_row: dict | None) -> dict[str, int]:
return defaults
def _strip_header_key(k: str | None) -> str:
if k is None:
return ""
return str(k).strip().removeprefix("\ufeff")
def iter_csv_dict_rows(
text: str,
delimiter: str,
@ -154,4 +160,4 @@ def iter_csv_dict_rows(
continue
if not any(v and str(v).strip() for v in row.values()):
continue
yield {k: (v or "").strip() for k, v in row.items()}
yield {_strip_header_key(k): (v or "").strip() for k, v in row.items() if _strip_header_key(k)}

View File

@ -1,5 +1,8 @@
"""
Typkonvertierung für CSV-Zellen gemäß type_conversions-JSON (Issue #21).
Locale-robust: dieselbe Vorlage kann Exporte mit wechselndem Datumsformat oder
Dezimaltrenner verarbeiten, wenn flexible oder auto-Optionen gesetzt sind.
"""
from __future__ import annotations
@ -7,7 +10,11 @@ from __future__ import annotations
import datetime as dt
import re
from decimal import Decimal, InvalidOperation
from typing import Any, Mapping
from typing import Any, Mapping, Sequence
from dateutil import parser as dateutil_parser
from csv_parser.core import normalize_header_for_signature
# Alias → strptime (JSON in Kleinbuchstaben)
DATE_FORMAT_STRPTIME: dict[str, str] = {
@ -15,9 +22,10 @@ DATE_FORMAT_STRPTIME: dict[str, str] = {
"mm/dd/yyyy": "%m/%d/%Y",
"dd/mm/yyyy": "%d/%m/%Y",
"dd.mm.yyyy": "%d.%m.%Y",
"dd.mm.yyyy hh:mm": "%d.%m.%Y %H:%M",
"dd.mm.yyyy HH:MM": "%d.%m.%Y %H:%M",
"yyyy-mm-dd HH:MM:SS": "%Y-%m-%d %H:%M:%S",
"yyyy-mm-dd hh:mm:ss": "%Y-%m-%d %H:%M:%S",
"yyyy-mm-dd HH:MM:SS": "%Y-%m-%d %H:%M:%S",
}
TIME_FORMAT_STRPTIME: dict[str, str] = {
@ -25,20 +33,264 @@ TIME_FORMAT_STRPTIME: dict[str, str] = {
"HH:MM:SS": "%H:%M:%S",
}
# When "flexible" is set: additional strptime attempts for date values
# (ordered roughly from more common to less common).
_STRPTIME_FALLBACK_DATES: list[str] = [
"%Y-%m-%d",
"%d.%m.%Y",
"%d.%m.%y",
"%d/%m/%Y",
"%m/%d/%Y",
"%Y/%m/%d",
"%Y%m%d",
]
# Fallback strptime patterns used instead of the list above when patterns are
# collected for datetime values; the first entry is date-only.
_STRPTIME_FALLBACK_DATETIME: list[str] = [
"%Y-%m-%d",
"%Y-%m-%d %H:%M:%S",
"%Y-%m-%d %H:%M",
"%d.%m.%Y %H:%M:%S",
"%d.%m.%Y %H:%M",
"%d.%m.%y %H:%M:%S",
"%d.%m.%y %H:%M",
"%Y-%m-%dT%H:%M:%S",
"%Y-%m-%dT%H:%M:%SZ",
"%Y-%m-%dT%H:%M:%S%z",
]
def _parse_float(raw: str, decimal_sep: str = ".") -> float:
s = raw.strip()
def _normalize_num_token(raw: str) -> str:
return re.sub(r"[\s\u00a0\u202f]", "", raw.strip())
def _parse_float_auto(s: str) -> float:
    """Parse a number without a fixed locale.

    Decides whether "." and "," act as thousands or decimal separators from
    the position of the last separator and the size of the digit groups.

    Rules:
        * Both separators present: the one occurring last is the decimal mark.
        * One separator, occurring once: it is a thousands separator only if
          exactly three digits follow it and at most three precede it
          ("1.234" -> 1234). Otherwise it is the decimal mark
          ("1234,5" -> 1234.5, "3.14159" -> 3.14159).
        * Repeated commas are thousands separators. Repeated dots are too,
          unless the last group has at most two digits, which is then taken
          as the fraction.
        * A value wrapped in parentheses, or with a leading "-", is negative.

    Args:
        s: Raw cell text.

    Returns:
        The parsed value.

    Raises:
        ValueError: If the text is empty, a lone dash placeholder, or not a
            number after normalization.
    """
    raw = s  # kept for the error message; ``s`` is rewritten below
    s = _normalize_num_token(s)
    # A lone hyphen, en dash or em dash is treated as an empty cell.
    if not s or s in ("-", "\u2013", "\u2014"):
        raise ValueError("leer")

    neg = False
    if s.startswith("(") and s.endswith(")"):
        neg = True
        s = s[1:-1].strip()
    if s.startswith("-"):
        neg = not neg
        s = s[1:]
    elif s.startswith("+"):
        s = s[1:]

    last_comma = s.rfind(",")
    last_dot = s.rfind(".")
    if last_comma >= 0 and last_dot >= 0:
        if last_comma > last_dot:
            s = s.replace(".", "").replace(",", ".")
        else:
            s = s.replace(",", "")
    elif last_comma >= 0:
        parts = s.split(",")
        if len(parts) == 2:
            whole, frac = parts
            if len(frac) == 3 and len(whole) <= 3:
                s = whole + frac
            else:
                # Any other group size cannot be thousands grouping.
                s = whole + "." + frac
        else:
            s = s.replace(",", "")
    elif last_dot >= 0:
        parts = s.split(".")
        if len(parts) == 2:
            whole, frac = parts
            if len(frac) == 3 and len(whole) <= 3:
                s = whole + frac
            else:
                # Any other group size cannot be thousands grouping.
                s = whole + "." + frac
        elif len(parts[-1]) <= 2:
            s = "".join(parts[:-1]) + "." + parts[-1]
        else:
            s = "".join(parts)

    try:
        v = float(Decimal(s))
    except (InvalidOperation, ValueError) as e:
        raise ValueError(f"Zahl nicht parsbar: {raw!r}") from e
    return -v if neg else v
def _parse_float(raw: str, decimal_sep: str = ".") -> float:
    """Parse *raw* as a float using the configured decimal separator.

    If the text contains both "." and ",", or only the separator that is not
    the configured decimal mark, the locale-free heuristic
    ``_parse_float_auto`` decides instead.

    Args:
        raw: Raw cell text.
        decimal_sep: "," for a decimal comma; any other value means a decimal
            point. Defaults to ".".

    Returns:
        The parsed value.

    Raises:
        ValueError: If the text is empty or not a number.
    """
    s = _normalize_num_token(raw)
    if not s:
        raise ValueError("leer")

    has_dot = "." in s
    has_comma = "," in s
    if has_dot and has_comma:
        return _parse_float_auto(s)
    if decimal_sep == ",":
        if has_dot:
            # Only the "foreign" separator is present.
            return _parse_float_auto(s)
        s = s.replace(",", ".")
    elif has_comma:
        return _parse_float_auto(s)

    try:
        return float(Decimal(s))
    except (InvalidOperation, ValueError) as e:
        # Report bad input as ValueError, like _parse_float_auto does, so
        # callers need not know about decimal.InvalidOperation.
        raise ValueError(f"Zahl nicht parsbar: {raw!r}") from e
def _parse_int(raw: str) -> int:
s = re.sub(r"[^\d-]", "", raw.strip())
def _float_from_spec(raw: str, spec: Mapping[str, Any]) -> float:
    """Parse *raw* as a float according to a type-conversion spec.

    ``decimal_separator`` of ``None`` or ``"auto"`` goes straight to the
    locale-free heuristic. Otherwise the configured separator is used, and
    the heuristic is tried afterwards only when ``flexible`` is truthy.
    """
    separator = spec.get("decimal_separator", ".")
    allow_fallback = bool(spec.get("flexible"))
    if separator is None or separator == "auto":
        return _parse_float_auto(raw)
    try:
        return _parse_float(raw, str(separator))
    except (InvalidOperation, ValueError):
        if not allow_fallback:
            raise
        return _parse_float_auto(raw)
def _resolve_strptime_pattern(fmt_key: str) -> str | None:
k = fmt_key.strip()
if k.startswith("%"):
return k
return DATE_FORMAT_STRPTIME.get(k.lower())
def _collect_strptime_date_formats(spec: Mapping[str, Any], *, for_datetime: bool) -> list[str]:
seen: set[str] = set()
out: list[str] = []
def add(fmt_key: str) -> None:
p = _resolve_strptime_pattern(fmt_key)
if p and p not in seen:
seen.add(p)
out.append(p)
if not for_datetime and p.endswith(" %H:%M") and "%H:%M:%S" not in p:
p2 = p + ":%S"
if p2 not in seen:
seen.add(p2)
out.append(p2)
primary = spec.get("format")
if primary:
add(str(primary))
extra = spec.get("formats")
if isinstance(extra, Sequence) and not isinstance(extra, (str, bytes)):
for item in extra:
if item:
add(str(item))
if bool(spec.get("flexible")):
for p in _STRPTIME_FALLBACK_DATETIME if for_datetime else _STRPTIME_FALLBACK_DATES:
if p not in seen:
seen.add(p)
out.append(p)
return out
def _try_strptime(s: str, patterns: Sequence[str]) -> dt.datetime | None:
for pat in patterns:
try:
return dt.datetime.strptime(s, pat)
except ValueError:
continue
return None
def _try_strptime_trim_time(s: str, patterns: Sequence[str]) -> dt.datetime | None:
head = s.split(maxsplit=1)[0].strip() if s else ""
if head and head != s:
hit = _try_strptime(head, patterns)
if hit:
return hit
return _try_strptime(s, patterns)
def _dateutil_parse(s: str, spec: Mapping[str, Any]) -> dt.datetime | None:
    """Parse *s* with dateutil, honouring the spec's ``dayfirst`` option.

    An explicit boolean ``dayfirst`` is used as given. Any other value tries
    day-first and then month-first. Returns ``None`` when every attempt fails.
    """
    preference = spec.get("dayfirst")
    if isinstance(preference, bool):
        orders: tuple[bool, ...] = (preference,)
    else:
        orders = (True, False)
    for dayfirst in orders:
        try:
            return dateutil_parser.parse(s, dayfirst=dayfirst)
        except (ValueError, TypeError, OverflowError):
            continue
    return None
def _parse_date_typed(s: str, spec: Mapping[str, Any]) -> dt.date | dt.datetime:
    """Parse a date cell according to *spec*.

    Attempts, in order: the date patterns (leading token first, so a trailing
    time is tolerated), the datetime patterns, and, only when ``flexible`` or
    ``formats`` is set, dateutil.

    Args:
        s: Cell text.
        spec: Type-conversion spec. A missing ``format`` defaults to
            "yyyy-mm-dd".

    Returns:
        A ``date`` when ``extract`` is "date_only" (the default), otherwise
        the parsed ``datetime``.

    Raises:
        ValueError: If no attempt could parse the text.
    """
    if not spec.get("format"):
        # Specs without "format" were parsed as ISO dates before the flexible
        # options existed; keep that default so they do not start failing.
        spec = {**spec, "format": "yyyy-mm-dd"}

    date_patterns = _collect_strptime_date_formats(spec, for_datetime=False)
    part = _try_strptime_trim_time(s, date_patterns) if date_patterns else None
    if part is None:
        part = _try_strptime(s, _collect_strptime_date_formats(spec, for_datetime=True))
    if part is None and (bool(spec.get("flexible")) or spec.get("formats")):
        part = _dateutil_parse(s, spec)
    if part is None:
        fmt_key = str(spec.get("format", ""))
        raise ValueError(f"Datum nicht parsbar: {fmt_key} / {s!r}")

    if spec.get("extract", "date_only") == "date_only":
        return part.date()
    return part
def _parse_datetime_typed(s: str, spec: Mapping[str, Any]) -> dt.datetime:
    """Parse a datetime cell according to *spec*.

    Tries the collected strptime patterns first, then dateutil when
    ``flexible`` or ``formats`` is set.

    Args:
        s: Cell text.
        spec: Type-conversion spec. A missing ``format`` defaults to
            "yyyy-mm-dd HH:MM:SS".

    Returns:
        The parsed ``datetime``.

    Raises:
        ValueError: If no attempt could parse the text.
    """
    if not spec.get("format"):
        # Keep the default that applied before the flexible options existed,
        # so specs without "format" do not start failing.
        spec = {**spec, "format": "yyyy-mm-dd HH:MM:SS"}

    part = _try_strptime(s, _collect_strptime_date_formats(spec, for_datetime=True))
    if part is None and (bool(spec.get("flexible")) or spec.get("formats")):
        part = _dateutil_parse(s, spec)
    if part is None:
        fmt_key = str(spec.get("format", ""))
        raise ValueError(f"Datetime nicht parsbar: {fmt_key} / {s!r}")
    return part
def _parse_time_typed(s: str, spec: Mapping[str, Any]) -> dt.time:
    """Parse a time-of-day cell according to *spec*.

    Candidates, in order: the primary ``format``, each entry of ``formats``,
    and, when ``flexible`` is truthy, "%H:%M:%S" and "%H:%M". Keys are
    translated through ``TIME_FORMAT_STRPTIME``; unknown keys are used as
    strptime patterns directly.

    Args:
        s: Cell text; surrounding whitespace is ignored.
        spec: Type-conversion spec. A missing ``format`` defaults to "HH:MM".

    Returns:
        The parsed ``time``.

    Raises:
        ValueError: If no candidate pattern matches.
    """
    candidates: list[str] = []

    primary = spec.get("format")
    if primary:
        fk = str(primary)
        candidates.append(TIME_FORMAT_STRPTIME.get(fk, _resolve_strptime_pattern(fk) or fk))
    else:
        # Keep the "HH:MM" default that applied before the flexible options
        # existed, so specs without "format" do not start failing.
        candidates.append(TIME_FORMAT_STRPTIME.get("HH:MM", "%H:%M"))

    extra = spec.get("formats")
    if isinstance(extra, Sequence) and not isinstance(extra, (str, bytes)):
        candidates.extend(
            TIME_FORMAT_STRPTIME.get(str(item), str(item)) for item in extra if item
        )

    if bool(spec.get("flexible")):
        candidates.extend(("%H:%M:%S", "%H:%M"))

    # De-duplicate while keeping first-seen order.
    patterns = list(dict.fromkeys(candidates))
    part = _try_strptime(s.strip(), patterns)
    if part is None:
        raise ValueError(f"Zeit nicht parsbar: {s!r}")
    return part.time()
def _parse_int(raw: str, spec: Mapping[str, Any]) -> int:
s = raw.strip()
if bool(spec.get("flexible")) or spec.get("thousands_separator") == "auto":
s2 = _normalize_num_token(s)
neg = s2.startswith("-")
body = s2[1:] if neg else s2
digits = re.sub(r"\D", "", body)
if not digits:
raise ValueError("leer")
v = int(digits)
return -v if neg else v
s = re.sub(r"[^\d-]", "", s)
if not s:
raise ValueError("leer")
return int(s)
@ -52,6 +304,12 @@ def convert_value(
"""
Konvertiert eine Roh-Zelle in einen Python-Wert.
spec kommt aus type_conversions[db_field].
Optionen (JSON):
- flexible: true – nach dem Primärformat werden Fallbacks versucht (Datum/Zahl/Zeit/Duration).
- decimal_separator: ".", "," oder "auto" – bei "auto" Heuristik für EU/US-Mischformen.
- formats: [ "yyyy-mm-dd", "%d.%m.%y", ... ] – weitere strptime-/Alias-Ketten.
- dayfirst: true|false – nur für dateutil-Fallback; Standard: erst true, dann false.
"""
if spec is None:
return raw.strip() if raw else None
@ -66,59 +324,73 @@ def convert_value(
return s
if t in ("float", "number"):
dec = spec.get("decimal_separator", ".")
v = _parse_float(s, dec)
v = _float_from_spec(raw, spec)
factor = spec.get("conversion_factor")
if factor is not None:
v = float(v) * float(factor)
return v
if t == "int":
return _parse_int(s)
return _parse_int(raw, spec)
if t == "date":
fmt_key = str(spec.get("format", "yyyy-mm-dd"))
fmt = DATE_FORMAT_STRPTIME.get(fmt_key.lower())
if not fmt:
raise ValueError(f"Unbekanntes Datumsformat: {fmt_key}")
part = dt.datetime.strptime(s, fmt)
extract = spec.get("extract", "date_only")
if extract == "date_only":
return part.date()
return part
return _parse_date_typed(s, spec)
if t == "time":
fmt_key = str(spec.get("format", "HH:MM"))
fmt = TIME_FORMAT_STRPTIME.get(fmt_key, fmt_key)
part = dt.datetime.strptime(s, fmt)
return part.time()
return _parse_time_typed(s, spec)
if t == "datetime":
fmt_key = str(spec.get("format", "yyyy-mm-dd HH:MM:SS"))
fmt = DATE_FORMAT_STRPTIME.get(fmt_key.lower())
if not fmt:
raise ValueError(f"Unbekanntes Datetime-Format: {fmt_key}")
return dt.datetime.strptime(s, fmt)
return _parse_datetime_typed(s, spec)
if t == "duration":
# z. B. HH:MM:SS → Minuten
fmt_key = str(spec.get("format", "HH:MM:SS"))
target = spec.get("target_unit", "minutes")
parts = s.split(":")
if fmt_key == "HH:MM:SS" and len(parts) == 3:
h, m, sec = int(parts[0]), int(parts[1]), int(parts[2])
total_min = h * 60 + m + sec / 60.0
if target == "minutes":
return round(total_min, 4)
raise ValueError(f"Unbekannte duration target_unit: {target}")
if fmt_key == "HH:MM" and len(parts) == 2:
h, m = int(parts[0]), int(parts[1])
return h * 60 + m
parts = [p.strip() for p in s.split(":")]
flexible = bool(spec.get("flexible"))
if len(parts) == 3:
try:
h, m, sec = int(parts[0]), int(parts[1]), int(parts[2])
if target == "minutes":
return round(h * 60 + m + sec / 60.0, 4)
except ValueError:
if not flexible:
raise ValueError(f"Duration nicht parsbar: {s!r}") from None
if flexible:
try:
h, m, sec = int(parts[0]), int(parts[1]), float(parts[2])
if target == "minutes":
return round(h * 60 + m + sec / 60.0, 4)
except ValueError:
pass
if len(parts) == 2:
try:
h, m = int(parts[0]), int(parts[1])
return h * 60 + m
except ValueError:
pass
raise ValueError(f"Duration nicht parsbar: {s!r}")
return s
def _lookup_db_field(csv_col: str, field_mappings: Mapping[str, str]) -> str | None:
"""
CSV-Spaltennamen können Roh-Header sein; Vorlagen-Schlüssel oft normalisiert
(wie column_signature). Exakter Treffer, dann Schlüssel nach Normalisierung,
dann Abgleich aller Vorlagen-Keys über deren Normalform.
"""
v = field_mappings.get(csv_col)
if v:
return v if v not in ("-", "_skip") else None
norm = normalize_header_for_signature(csv_col)
v = field_mappings.get(norm)
if v:
return v if v not in ("-", "_skip") else None
for k, fv in field_mappings.items():
if normalize_header_for_signature(str(k)) == norm:
return fv if fv not in ("-", "_skip") else None
return None
def build_row_after_mapping(
csv_row: Mapping[str, str],
field_mappings: Mapping[str, str],
@ -131,8 +403,8 @@ def build_row_after_mapping(
out: dict[str, Any] = {}
tc = type_conversions or {}
for csv_col, raw in csv_row.items():
db_field = field_mappings.get(csv_col)
if not db_field or db_field in ("-", "_skip"):
db_field = _lookup_db_field(str(csv_col), field_mappings)
if not db_field:
continue
spec = tc.get(db_field)
try:

View File

@ -74,3 +74,86 @@ def test_build_row_after_mapping():
out = build_row_after_mapping(csv_row, fm, tc)
assert out["date"].month == 1
assert out["kcal"] is not None
def test_build_row_fddb_raw_header_keys_match_normalized_template():
    """FDDB: DictReader yields the German headers, the seed uses normalized keys."""
    field_mappings = {
        "datum_tag_monat_jahr_stunde_minute": "date",
        "kj": "kcal",
        "fett_g": "fat_g",
        "kh_g": "carbs_g",
        "protein_g": "protein_g",
    }
    type_conversions = {
        "date": {"type": "date", "format": "dd.mm.yyyy HH:MM", "extract": "date_only"},
        "kcal": {
            "type": "float",
            "conversion_factor": 0.239,
            "decimal_separator": ",",
        },
        "fat_g": {"type": "float", "decimal_separator": ","},
        "carbs_g": {"type": "float", "decimal_separator": ","},
        "protein_g": {"type": "float", "decimal_separator": ","},
    }
    raw_row = {
        "Datum Tag Monat Jahr Stunde Minute": "01.01.2024 8:30",
        "kJ": "42000",
        "Fett (g)": "50",
        "KH (g)": "200",
        "Protein (g)": "100",
    }
    mapped = build_row_after_mapping(raw_row, field_mappings, type_conversions)
    parsed = mapped["date"]
    assert (parsed.year, parsed.month, parsed.day) == (2024, 1, 1)
def test_convert_date_ddmm_with_seconds():
    """A "dd.mm.yyyy HH:MM" date spec also accepts a value that carries seconds."""
    spec = {"type": "date", "format": "dd.mm.yyyy HH:MM", "extract": "date_only"}
    parsed = convert_value("15.01.2024 14:30:00", "date", spec)
    assert (parsed.month, parsed.day) == (1, 15)
def test_float_decimal_separator_auto_eu_us():
    """decimal_separator "auto" handles EU and US notation alike."""
    cases = (
        ("1.234,56", 1234.56),
        ("1,234.56", 1234.56),
        ("1234,5", 1234.5),
    )
    for text, expected in cases:
        # A fresh spec per call, as each conversion is independent.
        spec = {"type": "float", "decimal_separator": "auto"}
        assert abs(convert_value(text, "x", spec) - expected) < 1e-9
def test_float_flexible_falls_back_to_auto():
    """A decimal-comma spec with flexible=True still parses a decimal point."""
    flexible_comma_spec = {"type": "float", "decimal_separator": ",", "flexible": True}
    parsed = convert_value("1234.56", "x", flexible_comma_spec)
    assert abs(parsed - 1234.56) < 1e-9
def test_date_flexible_iso_while_primary_ddmm():
    """With flexible=True an ISO date parses even though the primary format is dd.mm.yyyy."""
    spec = {
        "type": "date",
        "format": "dd.mm.yyyy",
        "flexible": True,
        "extract": "date_only",
    }
    from_iso = convert_value("2024-03-15", "d", spec)
    from_german = convert_value("15.03.2024", "d", spec)
    assert from_iso == from_german
def test_date_extra_formats_without_days_in_name():
    """An entry in "formats" is tried after the primary format fails."""
    spec = {
        "type": "date",
        "format": "yyyy-mm-dd",
        "formats": ["%d.%m.%Y"],
        "extract": "date_only",
    }
    parsed = convert_value("08.04.2026", "d", spec)
    assert parsed.day == 8
def test_int_flexible_thousands():
    """A flexible int spec reads "1.234" as one thousand two hundred thirty-four."""
    spec = {"type": "int", "flexible": True}
    parsed = convert_value("1.234", "n", spec)
    assert parsed == 1234
def test_datetime_flexible():
    """A flexible datetime spec parses dd.mm.yyyy input despite an ISO primary format."""
    spec = {"type": "datetime", "format": "yyyy-mm-dd HH:MM:SS", "flexible": True}
    parsed = convert_value("15.01.2024 14:30:00", "t", spec)
    assert (parsed.month, parsed.day, parsed.hour) == (1, 15, 14)