- Introduced a new utility function `canonical_csv_header_label` to standardize CSV header labels, improving consistency in field mapping.
- Updated the `_lookup_db_field` function to support prefix matching for longer manual keys, enhancing the accuracy of field resolution.
- Added tests to validate handling of non-breaking space characters in CSV headers and ensure correct mapping to normalized keys, improving robustness of CSV parsing.
221 lines
6.9 KiB
Python
221 lines
6.9 KiB
Python
"""
|
||
CSV bytes → text, delimiter sniffing, strukturierte Erstzeilen für Analyse (Issue #21).
|
||
"""
|
||
|
||
from __future__ import annotations
|
||
|
||
import csv
|
||
import io
|
||
import re
|
||
from typing import Any, Dict, Iterator, List, Sequence, Tuple
|
||
|
||
# Candidate delimiters examined by sniff_delimiter. Order matters: on equal
# counts the earliest entry wins, so "," is preferred over ";" and tab.
_DEFAULT_DELIMS = [",", ";", "\t"]
|
||
|
||
|
||
def decode_raw_bytes(raw: bytes) -> str:
    """Decode raw CSV bytes to text.

    UTF-8 is preferred; the ``utf-8-sig`` codec also consumes a leading
    byte-order mark. Input that is not valid UTF-8 falls back to Latin-1,
    which maps every byte to a code point and therefore cannot fail.

    Args:
        raw: File content as bytes. Empty input yields ``""``.

    Returns:
        The decoded text without a leading byte-order mark.
    """
    if not raw:
        return ""
    try:
        # "utf-8-sig" accepts everything plain "utf-8" accepts and drops one
        # leading BOM, so a separate "utf-8" attempt would be redundant.
        text = raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        text = raw.decode("latin-1")
    # The codec strips only the first BOM; remove one more if it was doubled.
    if text.startswith("\ufeff"):
        text = text[1:]
    return text
|
||
|
||
|
||
def sniff_delimiter(sample_line: str) -> str:
    """Guess the column delimiter from a single line of the file.

    Heuristic: count how often each candidate in ``_DEFAULT_DELIMS`` occurs
    in ``sample_line`` and return the most frequent one. ``csv.Sniffer`` is
    intentionally not used; plain counting was chosen as more robust for
    short lines.

    Args:
        sample_line: The line to inspect.

    Returns:
        The winning delimiter, or ``","`` when the line is empty or blank.
    """
    if not (sample_line and sample_line.strip()):
        return ","
    # max() returns the first of several equal maxima, so ties are resolved
    # by candidate order; the default covers an empty candidate list.
    return max(_DEFAULT_DELIMS, key=sample_line.count, default=",")
|
||
|
||
|
||
def _split_first_lines(text: str, max_lines: int = 5) -> List[str]:
|
||
lines: List[str] = []
|
||
for line in text.splitlines():
|
||
if line.strip():
|
||
lines.append(line)
|
||
if len(lines) >= max_lines:
|
||
break
|
||
return lines
|
||
|
||
|
||
def canonical_csv_header_label(name: str | None) -> str:
    """Return the canonical column key for a CSV header cell.

    The same key is meant to be used for analysis (template/dialog), import
    and signature. Byte-order marks are removed and non-breaking spaces
    (common in Excel/Apple exports) become plain spaces, so that
    field mappings match the DictReader row keys exactly.

    Args:
        name: Raw header value; ``None`` yields ``""``. Other values are
            converted with ``str()`` first.

    Returns:
        The cleaned label with surrounding whitespace stripped.
    """
    if name is None:
        return ""
    # Single pass: delete U+FEFF (BOM), map U+00A0 (NBSP) to a plain space.
    cleanup = {0xFEFF: None, 0x00A0: " "}
    return str(name).translate(cleanup).strip()
|
||
|
||
|
||
def parse_csv_sample(
    text: str,
    delimiter: str | None = None,
    has_header: bool = True,
    max_data_rows: int = 5,
) -> Tuple[List[str], List[dict[str, str]], str]:
    """Parse the first rows of a CSV text for preview/analysis.

    Blank rows (no cell with non-whitespace content) are skipped and do not
    count against the row budget, so leading or interleaved empty lines no
    longer shrink or empty the preview.

    Args:
        text: Complete CSV content as text.
        delimiter: Column delimiter; when ``None`` it is guessed from the
            first non-blank line via ``sniff_delimiter``.
        has_header: If true, the first non-blank row supplies the headers
            (canonicalised with ``canonical_csv_header_label``); otherwise
            headers are generated as ``col_0``, ``col_1``, ...
        max_data_rows: Maximum number of data rows to return; negative
            values are treated as 0.

    Returns:
        ``(headers, rows_as_dicts, delimiter_used)``. Cell values are raw
        strings with surrounding whitespace stripped; cells missing in a
        short row become ``""``. Text without any non-whitespace content
        yields ``([], [], ",")``.
    """
    # Equivalent to "no non-blank line exists", without splitting the text.
    if not text.strip():
        return [], [], ","

    if delimiter is not None:
        delim = delimiter
    else:
        first_lines = _split_first_lines(text, max_lines=1)
        delim = sniff_delimiter(first_lines[0]) if first_lines else ","

    data_limit = max(max_data_rows, 0)
    # At least one row is always needed to learn the column count.
    rows_needed = max(data_limit + (1 if has_header else 0), 1)

    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    rows_raw: List[List[str]] = []
    for row in csv.reader(io.StringIO(normalized), delimiter=delim):
        if not any(cell.strip() for cell in row):
            continue
        rows_raw.append(row)
        # Budget is measured in kept rows, not in rows read.
        if len(rows_raw) >= rows_needed:
            break

    if not rows_raw:
        return [], [], delim

    if has_header:
        headers = [canonical_csv_header_label(h) for h in rows_raw[0]]
        data = rows_raw[1 : 1 + data_limit]
    else:
        headers = [f"col_{i}" for i in range(len(rows_raw[0]))]
        data = rows_raw[:data_limit]

    dict_rows: List[dict[str, str]] = [
        {h: (r[j].strip() if j < len(r) else "") for j, h in enumerate(headers)}
        for r in data
    ]
    return headers, dict_rows, delim
|
||
|
||
|
||
def normalize_header_for_signature(name: str) -> str:
    """Normalise a header name for signature comparison.

    The canonical label is lower-cased, each run of whitespace becomes a
    single underscore, each run of characters outside ``a-z``, ``0-9``,
    ``_``, ``ä ö ü ß``, ``( ) . % -`` becomes a single underscore, and
    leading/trailing underscores are removed.
    """
    lowered = canonical_csv_header_label(name).lower()
    underscored = re.sub(r"\s+", "_", lowered)
    restricted = re.sub(r"[^a-z0-9_äöüß().%-]+", "_", underscored)
    return restricted.strip("_")
|
||
|
||
|
||
def column_signature(headers: List[str]) -> List[str]:
    """Return the sorted, de-duplicated normalised column names.

    ``None`` entries are ignored. Headers whose normalised form is empty
    (blank headers, or headers made up only of characters that normalisation
    removes) are left out, so the signature never contains ``""``.

    Args:
        headers: Raw header names.

    Returns:
        Sorted list of normalised names for signature comparison.
    """
    normalized = {
        normalize_header_for_signature(h) for h in headers if h is not None
    }
    # An empty key carries no information and would count as a spurious match.
    normalized.discard("")
    return sorted(normalized)
|
||
|
||
|
||
def headers_signature_match_score(sig_csv: List[str], sig_template: List[str]) -> float:
    """Jaccard overlap of two signatures, in the range 0..1 (|A∩B| / |A∪B|).

    Drops sharply when the CSV has many additional columns. Two empty
    signatures score 1.0; exactly one empty signature scores 0.0.
    """
    csv_keys = set(sig_csv)
    template_keys = set(sig_template)
    if not (csv_keys or template_keys):
        return 1.0
    if not (csv_keys and template_keys):
        return 0.0
    # Both sets are non-empty here, so the union cannot be empty.
    return len(csv_keys & template_keys) / len(csv_keys | template_keys)
|
||
|
||
|
||
def headers_signature_template_recall(sig_csv: Sequence[str], sig_template: Sequence[str]) -> float:
    """Share of template columns that occur in the CSV: |A∩B| / |B|.

    Reaches 1.0 as soon as every column relevant to the template is present
    in the file, regardless of additional columns (a file holding both weight
    and nutrition data is not penalised for either template).

    ``sig_csv`` is used as given; ``sig_template`` entries are normalised
    with ``normalize_header_for_signature`` and empty results are dropped.
    With no template columns left, the result is 1.0 when the CSV signature
    is empty as well and 0.0 otherwise.
    """
    csv_keys = set(sig_csv)
    template_keys = set()
    for entry in sig_template:
        key = normalize_header_for_signature(str(entry))
        if key != "":
            template_keys.add(key)
    if template_keys:
        return len(csv_keys & template_keys) / len(template_keys)
    return 0.0 if csv_keys else 1.0
|
||
|
||
|
||
def headers_signature_rank_metrics(sig_csv: List[str], sig_template: List[str]) -> dict[str, Any]:
    """Compute the shared metrics for template ranking and UI.

    ``confidence`` equals ``template_recall`` (recommended as the primary
    value for display and sorting). ``sig_csv`` is used as given;
    ``sig_template`` entries are normalised and empty results dropped.

    Returns:
        Dict with ``confidence``, ``template_recall`` and ``jaccard``
        (rounded to 4 decimals) plus the integer counts ``columns_matched``,
        ``columns_in_template`` and ``columns_in_csv``.
    """
    csv_keys = set(sig_csv)
    template_keys = {normalize_header_for_signature(str(x)) for x in sig_template} - {""}

    matched = len(csv_keys & template_keys)
    combined = len(csv_keys | template_keys)

    if template_keys:
        recall = matched / len(template_keys)
    elif csv_keys:
        recall = 0.0
    else:
        # Nothing expected and nothing present counts as a full match.
        recall = 1.0
    jaccard = matched / combined if combined else 0.0

    recall_rounded = round(recall, 4)
    return {
        "confidence": recall_rounded,
        "template_recall": recall_rounded,
        "jaccard": round(jaccard, 4),
        "columns_matched": matched,
        "columns_in_template": len(template_keys),
        "columns_in_csv": len(csv_keys),
    }
|
||
|
||
|
||
def get_csv_import_limits(conn_row: dict | None) -> dict[str, int]:
    """Read the CSV import limits from a ``system_config.csv_import`` row.

    Args:
        conn_row: Row mapping whose ``"value"`` entry holds the configured
            limits as a dict; may be ``None`` or lack the key.

    Returns:
        Dict with ``max_rows_per_file`` and ``max_file_bytes``. Defaults are
        used when the row, the ``"value"`` key, or an individual limit is
        missing, when ``"value"`` is not a dict, and when a configured limit
        cannot be converted to ``int``. Unknown keys are ignored.
    """
    limits = {"max_rows_per_file": 50_000, "max_file_bytes": 52_428_800}
    if not conn_row or "value" not in conn_row:
        return limits
    configured = conn_row["value"]
    if not isinstance(configured, dict):
        return limits
    for key in limits:
        if key not in configured:
            continue
        try:
            limits[key] = int(configured[key])
        except (TypeError, ValueError, OverflowError):
            # A malformed entry (None, "abc", inf) must not break every
            # import; keep the default for this key.
            continue
    return limits
|
||
|
||
|
||
def iter_csv_dict_rows(
    text: str,
    delimiter: str,
    *,
    has_header: bool = True,
) -> Iterator[Dict[str, str]]:
    """Yield every non-empty row of the CSV text as a dict keyed by header.

    Keys are the canonical header labels (``canonical_csv_header_label``);
    columns whose label is empty are dropped, as are surplus cells beyond
    the header width. Values are stripped strings; cells missing in a short
    row become ``""``. Rows without any non-whitespace value are skipped.
    Column order is irrelevant and the number of columns is not limited
    here; selecting columns and enforcing row limits is left to the caller.

    Args:
        text: Complete CSV content as text.
        delimiter: Column delimiter passed to ``csv.DictReader``.
        has_header: Must be true; header-less import is not supported yet.

    Raises:
        ValueError: If ``has_header`` is false. Because this is a generator,
            the error surfaces when iteration starts, not at call time.
    """
    if not has_header:
        raise ValueError("CSV ohne Kopfzeile wird für Import noch nicht unterstützt")
    normalized = text.replace("\r\n", "\n").replace("\r", "\n")
    reader = csv.DictReader(io.StringIO(normalized), delimiter=delimiter)
    # Canonicalise each header once per file instead of twice per cell.
    labels = {raw: canonical_csv_header_label(raw) for raw in (reader.fieldnames or [])}
    for row in reader:
        if not any(v and str(v).strip() for v in row.values()):
            continue
        record: Dict[str, str] = {}
        for raw_key, value in row.items():
            # Surplus cells live under the key None, which has no label.
            label = labels.get(raw_key)
            if label:
                record[label] = (value or "").strip()
        yield record
|