mitai-jinkendo/backend/csv_parser/core.py
Lars 7226e04e9c
All checks were successful
Deploy Development / deploy (push) Successful in 1m0s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
feat: implement effective CSV delimiter resolution for imports
- Added `resolve_effective_csv_delimiter` function to determine the correct delimiter based on the uploaded file and template.
- Updated CSV import logic to utilize the new delimiter resolution method, ensuring accurate parsing of CSV files with varying delimiters.
- Enhanced documentation to reflect changes in delimiter handling.
- Added unit tests for the new delimiter resolution functionality.
2026-04-18 10:12:33 +02:00

261 lines
8.2 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
CSV bytes → text, delimiter sniffing, strukturierte Erstzeilen für Analyse (Issue #21).
"""
from __future__ import annotations
import csv
import io
import re
from typing import Any, Dict, Iterator, List, Sequence, Tuple
_DEFAULT_DELIMS = [",", ";", "\t"]
def decode_raw_bytes(raw: bytes) -> str:
"""UTF-8 bevorzugt, Fallback Latin-1; BOM entfernen."""
if not raw:
return ""
for enc in ("utf-8-sig", "utf-8", "latin-1"):
try:
text = raw.decode(enc)
break
except UnicodeDecodeError:
text = ""
continue
else:
text = raw.decode("utf-8", errors="replace")
if text.startswith("\ufeff"):
text = text[1:]
return text
def sniff_delimiter(sample_line: str) -> str:
"""
Heuristik: Zähle Vorkommen der Kandidaten in der ersten Datenzeile.
Kein csv.Sniffer (robuster gegen kurze Zeilen).
"""
if not sample_line or not sample_line.strip():
return ","
best = ","
best_count = -1
for d in _DEFAULT_DELIMS:
c = sample_line.count(d)
if c > best_count:
best_count = c
best = d
return best
def _csv_field_count(line: str, delimiter: str) -> int:
"""Anzahl Felder in einer Zeile (csv.reader, berücksichtigt Anführungszeichen)."""
if not line or not line.strip():
return 0
try:
row = next(csv.reader(io.StringIO(line), delimiter=delimiter))
except StopIteration:
return 0
return len(row)
def resolve_effective_csv_delimiter(text: str, template_delimiter: str | None = None) -> str:
"""
Trennzeichen für die hochgeladene Datei wählen. Gespeicherte Vorlagen haben oft «,»
(Apple EN), tatsächliche Exporte je nach Region «;» (Apple DE / Excel) — mit falschem
Zeichen wird die Kopfzeile zu **einer** Spalte und das Mapping bricht vollständig.
"""
tpl = (template_delimiter or "").strip()
if tpl not in _DEFAULT_DELIMS:
tpl = None
lines = _split_first_lines(text, max_lines=5)
if not lines:
return tpl or ","
header = lines[0]
scores: list[tuple[int, str]] = []
for d in _DEFAULT_DELIMS:
scores.append((_csv_field_count(header, d), d))
max_n = max(n for n, _ in scores)
if max_n <= 1:
return tpl or sniff_delimiter(header)
at_max = [d for n, d in scores if n == max_n]
if tpl and tpl in at_max:
return tpl
return at_max[0]
def _split_first_lines(text: str, max_lines: int = 5) -> List[str]:
lines: List[str] = []
for line in text.splitlines():
if line.strip():
lines.append(line)
if len(lines) >= max_lines:
break
return lines
def canonical_csv_header_label(name: str | None) -> str:
"""
Einheitlicher Spalten-Key für Analyse (Vorlage/Dialog), Import und Signatur.
BOM und NBSP (häufig in Excel/Apple-Exporten) werden vereinheitlicht, damit
field_mappings exakt zu DictReader-Zeilen passt.
"""
if name is None:
return ""
s = str(name).replace("\ufeff", "").replace("\u00a0", " ").strip()
return s
def parse_csv_sample(
text: str,
delimiter: str | None = None,
has_header: bool = True,
max_data_rows: int = 5,
) -> Tuple[List[str], List[dict[str, str]], str]:
"""
Gibt (headers, rows_as_dicts, verwendetes_delimiter) zurück.
rows sind Rohstrings pro Zelle.
"""
lines = _split_first_lines(text, max_lines=50)
if not lines:
return [], [], ","
delim = delimiter if delimiter is not None else sniff_delimiter(lines[0])
reader = csv.reader(io.StringIO(text.replace("\r\n", "\n").replace("\r", "\n")), delimiter=delim)
rows_raw: List[List[str]] = []
for i, row in enumerate(reader):
if i >= 1 + max_data_rows + (1 if has_header else 0):
break
if not any(c.strip() for c in row):
continue
rows_raw.append(row)
if not rows_raw:
return [], [], delim
if has_header:
headers = [canonical_csv_header_label(h) for h in rows_raw[0]]
data = rows_raw[1 : 1 + max_data_rows]
else:
n = len(rows_raw[0])
headers = [f"col_{i}" for i in range(n)]
data = rows_raw[:max_data_rows]
dict_rows: List[dict[str, str]] = []
for r in data:
row_dict: dict[str, str] = {}
for j, h in enumerate(headers):
row_dict[h] = r[j].strip() if j < len(r) else ""
dict_rows.append(row_dict)
return headers, dict_rows, delim
def normalize_header_for_signature(name: str) -> str:
s = canonical_csv_header_label(name).lower()
s = re.sub(r"\s+", "_", s)
s = re.sub(r"[^a-z0-9_äöüß().%-]+", "_", s)
return s.strip("_")
def column_signature(headers: List[str]) -> List[str]:
"""Sortierte normalisierte Spaltennamen für Signatur-Vergleich."""
return sorted(
{normalize_header_for_signature(h) for h in headers if h is not None and canonical_csv_header_label(str(h))}
)
def headers_signature_match_score(sig_csv: List[str], sig_template: List[str]) -> float:
"""Jaccard-Überlappung 0..1 (|A∩B|/|AB|). Fällt stark, wenn die CSV viele Zusatzspalten hat."""
a, b = set(sig_csv), set(sig_template)
if not a and not b:
return 1.0
if not a or not b:
return 0.0
inter = len(a & b)
union = len(a | b)
return inter / union if union else 0.0
def headers_signature_template_recall(sig_csv: Sequence[str], sig_template: Sequence[str]) -> float:
"""
Anteil der Template-Spalten (Signatur), die in der CSV vorkommen: |A∩B|/|B|.
100 %, sobald alle für die Vorlage relevanten Spalten in der Datei sind — unabhängig von
Zusatzspalten (Gewicht + Ernährung in einer Datei erzeugt keinen „Abzug“ für die jeweilige Vorlage).
"""
a = set(sig_csv)
b = {normalize_header_for_signature(str(x)) for x in sig_template}
b.discard("")
if not b:
return 1.0 if not a else 0.0
inter = len(a & b)
return inter / len(b)
def headers_signature_rank_metrics(sig_csv: List[str], sig_template: List[str]) -> dict[str, Any]:
"""
Einheitliche Kennzahlen für Vorlagen-Ranking und UI.
confidence = template_recall (empfohlen für Anzeige / Sortierung primär).
"""
a = set(sig_csv)
b = {normalize_header_for_signature(str(x)) for x in sig_template}
b.discard("")
inter = a & b
n_inter = len(inter)
n_b = len(b)
n_a = len(a)
union = len(a | b)
template_recall = n_inter / n_b if n_b else (1.0 if not n_a else 0.0)
jaccard = n_inter / union if union else 0.0
return {
"confidence": round(template_recall, 4),
"template_recall": round(template_recall, 4),
"jaccard": round(jaccard, 4),
"columns_matched": n_inter,
"columns_in_template": n_b,
"columns_in_csv": n_a,
}
def get_csv_import_limits(conn_row: dict | None) -> dict[str, int]:
"""Liest Limits aus system_config.csv_import; Fallback bei fehlendem Key."""
defaults = {"max_rows_per_file": 50_000, "max_file_bytes": 52_428_800}
if not conn_row or "value" not in conn_row:
return defaults
val = conn_row["value"]
if isinstance(val, dict):
out = {**defaults, **{k: int(v) for k, v in val.items() if k in defaults}}
return out
return defaults
def iter_csv_dict_rows(
text: str,
delimiter: str,
*,
has_header: bool = True,
) -> Iterator[Dict[str, str]]:
"""
Vollständige Datei zeilenweise als Dict (Header = Keys).
Spaltenreihenfolge ist egal; zusätzliche Spalten werden ignoriert, wenn sie nicht
in field_mappings vorkommen. Keine Obergrenze für die Spaltenanzahl (nur Zeilenlimits
kommen aus system_config / Import-Router).
"""
if not has_header:
raise ValueError("CSV ohne Kopfzeile wird für Import noch nicht unterstützt")
normalized = text.replace("\r\n", "\n").replace("\r", "\n")
reader = csv.DictReader(io.StringIO(normalized), delimiter=delimiter)
for row in reader:
if row is None:
continue
if not any(v and str(v).strip() for v in row.values()):
continue
yield {
canonical_csv_header_label(k): (v or "").strip()
for k, v in row.items()
if canonical_csv_header_label(k)
}