mitai-jinkendo/backend/csv_parser/core.py
Lars e4e8c70cd2
All checks were successful
Deploy Development / deploy (push) Successful in 49s
Build Test / pytest-backend (push) Successful in 5s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 16s
feat: Enhance CSV header normalization and mapping for activity data
- Introduced a new utility function `canonical_csv_header_label` to standardize CSV header labels, improving consistency in field mapping.
- Updated the `_lookup_db_field` function to support prefix matching for longer manual keys, enhancing the accuracy of field resolution.
- Added tests to validate handling of non-breaking space characters in CSV headers and ensure correct mapping to normalized keys, improving robustness of CSV parsing.
2026-04-15 10:04:32 +02:00

221 lines
6.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
CSV bytes → text, delimiter sniffing, strukturierte Erstzeilen für Analyse (Issue #21).
"""
from __future__ import annotations
import csv
import io
import re
from typing import Any, Dict, Iterator, List, Sequence, Tuple
# Candidate delimiters tested by sniff_delimiter, in order of preference:
# that function only replaces its current pick on a strictly higher count,
# so the earliest entry wins ties.
_DEFAULT_DELIMS = [",", ";", "\t"]
def decode_raw_bytes(raw: bytes) -> str:
    """Decode raw CSV bytes to text.

    UTF-8 is tried first; the ``utf-8-sig`` codec accepts input with or
    without a byte-order mark and consumes one if present. Bytes that are
    not valid UTF-8 are decoded as Latin-1, which maps every byte value and
    therefore cannot fail.

    Args:
        raw: The file content as bytes.

    Returns:
        The decoded text without a leading BOM; ``""`` for empty input.
    """
    if not raw:
        return ""
    try:
        # "utf-8-sig" also decodes BOM-less UTF-8, so a separate "utf-8"
        # attempt could never succeed where this one failed.
        text = raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        # Latin-1 accepts any byte sequence, so no further fallback is needed.
        text = raw.decode("latin-1")
    # The codec strips a single BOM only; drop one more if the data had two.
    if text.startswith("\ufeff"):
        text = text[1:]
    return text
def sniff_delimiter(sample_line: str) -> str:
    """Guess the delimiter of a CSV line by counting candidate characters.

    Deliberately avoids ``csv.Sniffer`` (more robust for short lines). Each
    candidate from ``_DEFAULT_DELIMS`` is counted outside of double-quoted
    sections, so a separator character that merely occurs inside a quoted
    cell (``"Name, Vorname";Wert``) does not influence the result.

    Args:
        sample_line: One line of the file, typically the first non-blank one.

    Returns:
        The candidate with the highest count. Ties go to the earlier
        candidate; ``","`` is returned for an empty or blank line.
    """
    if not sample_line or not sample_line.strip():
        return ","
    # Drop balanced "..." sections only. An unmatched quote is left in place,
    # so a stray quote character cannot hide the rest of the line.
    unquoted = re.sub(r'"[^"]*"', "", sample_line)
    # max() returns the first maximal element, which keeps the original
    # "earlier candidate wins ties" behaviour.
    return max(_DEFAULT_DELIMS, key=unquoted.count, default=",")
def _split_first_lines(text: str, max_lines: int = 5) -> List[str]:
    """Return the leading non-blank lines of *text*.

    Lines consisting only of whitespace are skipped; kept lines are returned
    unmodified (without their line terminator). Collection stops as soon as
    *max_lines* lines have been gathered.
    """
    kept: List[str] = []
    for candidate in text.splitlines():
        if not candidate.strip():
            continue
        kept.append(candidate)
        if len(kept) >= max_lines:
            return kept
    return kept
def canonical_csv_header_label(name: str | None) -> str:
    """Return the canonical column key for a CSV header cell.

    The same key is meant to be used for analysis (template/dialog), import
    and signature, so that field mappings match the row dictionaries exactly.
    BOM characters are removed and non-breaking spaces (common in Excel/Apple
    exports) become regular spaces; surrounding whitespace is stripped.

    Args:
        name: Raw header value; ``None`` yields ``""``. Other values are
            converted with ``str()`` first.

    Returns:
        The cleaned label, possibly empty.
    """
    if name is None:
        return ""
    # One translation pass: delete U+FEFF, map U+00A0 to a plain space.
    replacements = {0xFEFF: None, 0x00A0: " "}
    return str(name).translate(replacements).strip()
def parse_csv_sample(
    text: str,
    delimiter: str | None = None,
    has_header: bool = True,
    max_data_rows: int = 5,
) -> Tuple[List[str], List[dict[str, str]], str]:
    """Parse the first rows of a CSV text for analysis.

    Rows without any non-blank cell are skipped and do not count towards
    *max_data_rows*, so blank lines before the header or between records do
    not shrink the returned sample.

    Args:
        text: Decoded CSV content.
        delimiter: Delimiter to use; when ``None`` it is guessed from the
            first non-blank line via ``sniff_delimiter``.
        has_header: Whether the first non-blank row holds the column names.
            Without a header the columns are named ``col_0``, ``col_1``, ...
            based on the width of the first row.
        max_data_rows: Maximum number of data rows to return (negative values
            are treated as 0).

    Returns:
        ``(headers, rows, delimiter_used)``. ``rows`` are dictionaries keyed
        by header whose values are the raw cell strings with surrounding
        whitespace stripped; cells missing in a short row become ``""``.
        For text without any non-blank line the result is ``([], [], ",")``.
    """
    # Only the first non-blank line is needed (for delimiter sniffing).
    first_lines = _split_first_lines(text, max_lines=1)
    if not first_lines:
        return [], [], ","
    delim = delimiter if delimiter is not None else sniff_delimiter(first_lines[0])

    row_limit = max(max_data_rows, 0)
    wanted = row_limit + (1 if has_header else 0)
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    reader = csv.reader(io.StringIO(normalized), delimiter=delim)
    rows_raw: List[List[str]] = []
    for row in reader:
        if not any(cell.strip() for cell in row):
            continue
        rows_raw.append(row)
        # Limit on rows actually kept, not on the raw row index, so skipped
        # blank rows cannot exhaust the budget.
        if len(rows_raw) >= wanted:
            break
    if not rows_raw:
        return [], [], delim

    if has_header:
        headers = [canonical_csv_header_label(h) for h in rows_raw[0]]
        data = rows_raw[1 : 1 + row_limit]
    else:
        headers = [f"col_{i}" for i in range(len(rows_raw[0]))]
        data = rows_raw[:row_limit]

    dict_rows: List[dict[str, str]] = [
        {h: (r[j].strip() if j < len(r) else "") for j, h in enumerate(headers)}
        for r in data
    ]
    return headers, dict_rows, delim
def normalize_header_for_signature(name: str) -> str:
    """Normalize a header name for signature comparison.

    The canonical label is lower-cased; whitespace runs and then runs of any
    character outside ``a-z 0-9 _ ä ö ü ß ( ) . % -`` are each replaced by a
    single underscore, and leading/trailing underscores are removed.
    """
    label = canonical_csv_header_label(name).lower()
    for pattern in (r"\s+", r"[^a-z0-9_äöüß().%-]+"):
        label = re.sub(pattern, "_", label)
    return label.strip("_")
def column_signature(headers: List[str]) -> List[str]:
    """Return the sorted, de-duplicated normalized column names.

    Entries that are ``None`` or whose canonical label is empty are ignored.
    The result is intended for signature comparison.
    """
    unique = set()
    for header in headers:
        if header is None:
            continue
        if not canonical_csv_header_label(str(header)):
            continue
        unique.add(normalize_header_for_signature(header))
    return sorted(unique)
def headers_signature_match_score(sig_csv: List[str], sig_template: List[str]) -> float:
    """Return the Jaccard overlap of two signatures, ``|A∩B| / |A∪B|`` in 0..1.

    The score drops sharply when the CSV has many additional columns.
    Two empty signatures score 1.0; exactly one empty signature scores 0.0.
    """
    csv_cols = set(sig_csv)
    template_cols = set(sig_template)
    if not (csv_cols or template_cols):
        return 1.0
    if not (csv_cols and template_cols):
        return 0.0
    shared = csv_cols.intersection(template_cols)
    combined = csv_cols.union(template_cols)
    if not combined:
        return 0.0
    return len(shared) / len(combined)
def headers_signature_template_recall(sig_csv: Sequence[str], sig_template: Sequence[str]) -> float:
    """Return the share of template columns present in the CSV, ``|A∩B| / |B|``.

    The value reaches 1.0 as soon as every template column occurs in the CSV
    signature, regardless of additional CSV columns (a file combining e.g.
    weight and nutrition data is not penalized for either template).

    Template entries are normalized with ``normalize_header_for_signature``
    and empty results are dropped; ``sig_csv`` is used as given. If no
    template column remains, the result is 1.0 for an empty CSV signature
    and 0.0 otherwise.
    """
    csv_cols = set(sig_csv)
    template_cols = set()
    for entry in sig_template:
        key = normalize_header_for_signature(str(entry))
        if key != "":
            template_cols.add(key)
    if not template_cols:
        return 0.0 if csv_cols else 1.0
    hits = sum(1 for col in template_cols if col in csv_cols)
    return hits / len(template_cols)
def headers_signature_rank_metrics(sig_csv: List[str], sig_template: List[str]) -> dict[str, Any]:
    """Compute the combined metrics used for template ranking and the UI.

    Template entries are normalized with ``normalize_header_for_signature``
    (empty results dropped); ``sig_csv`` is used as given.

    Returns:
        A dict with ``confidence`` (equal to ``template_recall``, the value
        recommended as primary display/sort key), ``template_recall`` and
        ``jaccard`` (each rounded to 4 decimals) plus the counts
        ``columns_matched``, ``columns_in_template`` and ``columns_in_csv``.
    """
    csv_cols = set(sig_csv)
    template_cols = {normalize_header_for_signature(str(entry)) for entry in sig_template} - {""}
    matched = len(csv_cols & template_cols)
    combined = len(csv_cols | template_cols)

    if template_cols:
        recall = matched / len(template_cols)
    elif csv_cols:
        recall = 0.0
    else:
        recall = 1.0
    overlap = matched / combined if combined else 0.0

    recall_rounded = round(recall, 4)
    return {
        "confidence": recall_rounded,
        "template_recall": recall_rounded,
        "jaccard": round(overlap, 4),
        "columns_matched": matched,
        "columns_in_template": len(template_cols),
        "columns_in_csv": len(csv_cols),
    }
def get_csv_import_limits(conn_row: dict | None) -> dict[str, int]:
    """Read the CSV import limits from a configuration row.

    Args:
        conn_row: Mapping expected to carry the limits as a dict under the
            ``"value"`` key (per the original docstring this is the
            ``system_config.csv_import`` entry); may be ``None``.

    Returns:
        A dict with ``max_rows_per_file`` and ``max_file_bytes``. The
        built-in default is used for every limit that is missing or cannot
        be converted to ``int``, and for the whole result when the row or
        its ``"value"`` is absent or not a dict. Unknown keys are ignored.
    """
    defaults = {"max_rows_per_file": 50_000, "max_file_bytes": 52_428_800}
    if not conn_row or "value" not in conn_row:
        return defaults
    val = conn_row["value"]
    if not isinstance(val, dict):
        return defaults
    limits = dict(defaults)
    for key in defaults:
        if key not in val:
            continue
        try:
            limits[key] = int(val[key])
        except (TypeError, ValueError, OverflowError):
            # A malformed config value must not break every import; keep the
            # default for this key.
            continue
    return limits
def iter_csv_dict_rows(
    text: str,
    delimiter: str,
    *,
    has_header: bool = True,
) -> Iterator[Dict[str, str]]:
    """Iterate over the complete CSV as dictionaries keyed by header.

    The header is the first row containing a non-blank cell, so blank lines
    at the top of the file are skipped instead of being taken as an empty
    header. Column order does not matter, and there is no upper bound on the
    number of columns here (row limits are enforced by the caller).

    Args:
        text: Decoded CSV content.
        delimiter: Field delimiter.
        has_header: Must be ``True``; header-less import is not supported.

    Yields:
        One dict per data row with at least one non-blank value. Keys are
        the canonical header labels (columns with an empty label are
        dropped); values are stripped strings, ``""`` for missing cells.

    Raises:
        ValueError: If *has_header* is false. As this is a generator, the
            error surfaces on the first iteration step.
    """
    if not has_header:
        raise ValueError("CSV ohne Kopfzeile wird für Import noch nicht unterstützt")
    stream = io.StringIO(text.replace("\r\n", "\n").replace("\r", "\n"))
    # Find the header row with a plain reader; it consumes the stream only up
    # to the end of that row, so the DictReader below continues right after.
    header = next(
        (
            cells
            for cells in csv.reader(stream, delimiter=delimiter)
            if any(cell.strip() for cell in cells)
        ),
        None,
    )
    if header is None:
        return
    reader = csv.DictReader(stream, fieldnames=header, delimiter=delimiter)
    for row in reader:
        if not any(v and str(v).strip() for v in row.values()):
            continue
        cleaned: Dict[str, str] = {}
        for raw_key, value in row.items():
            # Surplus cells are collected under the key None, which maps to
            # an empty label and is therefore dropped here.
            key = canonical_csv_header_label(raw_key)
            if key:
                cleaned[key] = (value or "").strip()
        yield cleaned