""" CSV bytes → text, delimiter sniffing, strukturierte Erstzeilen für Analyse (Issue #21). """ from __future__ import annotations import csv import io import re from typing import Any, Dict, Iterator, List, Sequence, Tuple _DEFAULT_DELIMS = [",", ";", "\t"] def decode_raw_bytes(raw: bytes) -> str: """UTF-8 bevorzugt, Fallback Latin-1; BOM entfernen.""" if not raw: return "" for enc in ("utf-8-sig", "utf-8", "latin-1"): try: text = raw.decode(enc) break except UnicodeDecodeError: text = "" continue else: text = raw.decode("utf-8", errors="replace") if text.startswith("\ufeff"): text = text[1:] return text def sniff_delimiter(sample_line: str) -> str: """ Heuristik: Zähle Vorkommen der Kandidaten in der ersten Datenzeile. Kein csv.Sniffer (robuster gegen kurze Zeilen). """ if not sample_line or not sample_line.strip(): return "," best = "," best_count = -1 for d in _DEFAULT_DELIMS: c = sample_line.count(d) if c > best_count: best_count = c best = d return best def _split_first_lines(text: str, max_lines: int = 5) -> List[str]: lines: List[str] = [] for line in text.splitlines(): if line.strip(): lines.append(line) if len(lines) >= max_lines: break return lines def parse_csv_sample( text: str, delimiter: str | None = None, has_header: bool = True, max_data_rows: int = 5, ) -> Tuple[List[str], List[dict[str, str]], str]: """ Gibt (headers, rows_as_dicts, verwendetes_delimiter) zurück. rows sind Rohstrings pro Zelle. """ lines = _split_first_lines(text, max_lines=50) if not lines: return [], [], "," delim = delimiter if delimiter is not None else sniff_delimiter(lines[0]) reader = csv.reader(io.StringIO(text.replace("\r\n", "\n").replace("\r", "\n")), delimiter=delim) rows_raw: List[List[str]] = [] for i, row in enumerate(reader): if i >= 1 + max_data_rows + (1 if has_header else 0): break if not any(c.strip() for c in row): continue rows_raw.append(row) if not rows_raw: return [], [], delim if has_header: headers = [h.strip() for h in rows_raw[0]] data = rows_raw[1 : 1 + max_data_rows] else: n = len(rows_raw[0]) headers = [f"col_{i}" for i in range(n)] data = rows_raw[:max_data_rows] dict_rows: List[dict[str, str]] = [] for r in data: row_dict: dict[str, str] = {} for j, h in enumerate(headers): row_dict[h] = r[j].strip() if j < len(r) else "" dict_rows.append(row_dict) return headers, dict_rows, delim def normalize_header_for_signature(name: str) -> str: s = name.strip().lower() s = re.sub(r"\s+", "_", s) s = re.sub(r"[^a-z0-9_äöüß().%-]+", "_", s) return s.strip("_") def column_signature(headers: List[str]) -> List[str]: """Sortierte normalisierte Spaltennamen für Signatur-Vergleich.""" return sorted({normalize_header_for_signature(h) for h in headers if h is not None and str(h).strip()}) def headers_signature_match_score(sig_csv: List[str], sig_template: List[str]) -> float: """Jaccard-Überlappung 0..1 (|A∩B|/|A∪B|). Fällt stark, wenn die CSV viele Zusatzspalten hat.""" a, b = set(sig_csv), set(sig_template) if not a and not b: return 1.0 if not a or not b: return 0.0 inter = len(a & b) union = len(a | b) return inter / union if union else 0.0 def headers_signature_template_recall(sig_csv: Sequence[str], sig_template: Sequence[str]) -> float: """ Anteil der Template-Spalten (Signatur), die in der CSV vorkommen: |A∩B|/|B|. 100 %, sobald alle für die Vorlage relevanten Spalten in der Datei sind — unabhängig von Zusatzspalten (Gewicht + Ernährung in einer Datei erzeugt keinen „Abzug“ für die jeweilige Vorlage). """ a = set(sig_csv) b = {normalize_header_for_signature(str(x)) for x in sig_template} b.discard("") if not b: return 1.0 if not a else 0.0 inter = len(a & b) return inter / len(b) def headers_signature_rank_metrics(sig_csv: List[str], sig_template: List[str]) -> dict[str, Any]: """ Einheitliche Kennzahlen für Vorlagen-Ranking und UI. confidence = template_recall (empfohlen für Anzeige / Sortierung primär). """ a = set(sig_csv) b = {normalize_header_for_signature(str(x)) for x in sig_template} b.discard("") inter = a & b n_inter = len(inter) n_b = len(b) n_a = len(a) union = len(a | b) template_recall = n_inter / n_b if n_b else (1.0 if not n_a else 0.0) jaccard = n_inter / union if union else 0.0 return { "confidence": round(template_recall, 4), "template_recall": round(template_recall, 4), "jaccard": round(jaccard, 4), "columns_matched": n_inter, "columns_in_template": n_b, "columns_in_csv": n_a, } def get_csv_import_limits(conn_row: dict | None) -> dict[str, int]: """Liest Limits aus system_config.csv_import; Fallback bei fehlendem Key.""" defaults = {"max_rows_per_file": 50_000, "max_file_bytes": 52_428_800} if not conn_row or "value" not in conn_row: return defaults val = conn_row["value"] if isinstance(val, dict): out = {**defaults, **{k: int(v) for k, v in val.items() if k in defaults}} return out return defaults def _strip_header_key(k: str | None) -> str: if k is None: return "" return str(k).strip().removeprefix("\ufeff") def iter_csv_dict_rows( text: str, delimiter: str, *, has_header: bool = True, ) -> Iterator[Dict[str, str]]: """ Vollständige Datei zeilenweise als Dict (Header = Keys). Spaltenreihenfolge ist egal; zusätzliche Spalten werden ignoriert, wenn sie nicht in field_mappings vorkommen. Keine Obergrenze für die Spaltenanzahl (nur Zeilenlimits kommen aus system_config / Import-Router). """ if not has_header: raise ValueError("CSV ohne Kopfzeile wird für Import noch nicht unterstützt") normalized = text.replace("\r\n", "\n").replace("\r", "\n") reader = csv.DictReader(io.StringIO(normalized), delimiter=delimiter) for row in reader: if row is None: continue if not any(v and str(v).strip() for v in row.values()): continue yield {_strip_header_key(k): (v or "").strip() for k, v in row.items() if _strip_header_key(k)}