mitai-jinkendo/backend/csv_parser/core.py
Lars 5b96bd4f75
All checks were successful
Deploy Development / deploy (push) Successful in 55s
Build Test / pytest-backend (push) Successful in 4s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 17s
feat(csv-import): Add blood pressure and activity row diagnosis functionality
- Introduced `diagnose_blood_pressure_row` and `diagnose_activity_row` functions to validate and analyze blood pressure and activity data from CSV imports.
- Updated the CSV import logic to handle combined datetime columns for blood pressure and activity, improving data integrity during import.
- Enhanced type conversion specifications to include `start_time` for blood pressure and activity, ensuring accurate data mapping.
- Added tests to validate the new diagnosis functions and their integration with existing import processes, ensuring robustness and reliability.
- Updated frontend messages to provide clearer guidance on blood pressure and activity data handling during CSV imports.
2026-04-10 16:43:00 +02:00

209 lines
6.5 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

"""
CSV bytes → text, delimiter sniffing, strukturierte Erstzeilen für Analyse (Issue #21).
"""
from __future__ import annotations
import csv
import io
import re
from typing import Any, Dict, Iterator, List, Sequence, Tuple
# Candidate delimiters tried by sniff_delimiter. Order matters: on a tie the
# earlier entry wins, so "," is the result when no candidate occurs at all.
_DEFAULT_DELIMS = [",", ";", "\t"]
def decode_raw_bytes(raw: bytes) -> str:
    """Decode raw CSV bytes to text, preferring UTF-8 and falling back to Latin-1.

    Args:
        raw: File content as bytes; may be empty.

    Returns:
        The decoded text without a leading byte-order mark. Empty input
        yields an empty string.
    """
    if not raw:
        return ""
    try:
        # "utf-8-sig" also decodes BOM-less UTF-8 and drops one leading BOM,
        # so a separate plain "utf-8" attempt would never add anything.
        text = raw.decode("utf-8-sig")
    except UnicodeDecodeError:
        # Latin-1 maps every byte value to a code point, so it cannot fail
        # and no further fallback is needed.
        text = raw.decode("latin-1")
    # utf-8-sig strips only the first BOM; remove a second one if present.
    if text.startswith("\ufeff"):
        text = text[1:]
    return text
def sniff_delimiter(sample_line: str) -> str:
"""
Heuristik: Zähle Vorkommen der Kandidaten in der ersten Datenzeile.
Kein csv.Sniffer (robuster gegen kurze Zeilen).
"""
if not sample_line or not sample_line.strip():
return ","
best = ","
best_count = -1
for d in _DEFAULT_DELIMS:
c = sample_line.count(d)
if c > best_count:
best_count = c
best = d
return best
def _split_first_lines(text: str, max_lines: int = 5) -> List[str]:
    """Return the leading non-blank lines of ``text``, unmodified.

    Lines consisting only of whitespace are skipped. Collection stops as soon
    as ``max_lines`` lines have been gathered; the check runs after each
    append, so at least one line is returned whenever a non-blank line exists.
    """
    collected: List[str] = []
    non_blank = (candidate for candidate in text.splitlines() if candidate.strip())
    for candidate in non_blank:
        collected.append(candidate)
        if len(collected) >= max_lines:
            break
    return collected
def parse_csv_sample(
text: str,
delimiter: str | None = None,
has_header: bool = True,
max_data_rows: int = 5,
) -> Tuple[List[str], List[dict[str, str]], str]:
"""
Gibt (headers, rows_as_dicts, verwendetes_delimiter) zurück.
rows sind Rohstrings pro Zelle.
"""
lines = _split_first_lines(text, max_lines=50)
if not lines:
return [], [], ","
delim = delimiter if delimiter is not None else sniff_delimiter(lines[0])
reader = csv.reader(io.StringIO(text.replace("\r\n", "\n").replace("\r", "\n")), delimiter=delim)
rows_raw: List[List[str]] = []
for i, row in enumerate(reader):
if i >= 1 + max_data_rows + (1 if has_header else 0):
break
if not any(c.strip() for c in row):
continue
rows_raw.append(row)
if not rows_raw:
return [], [], delim
if has_header:
headers = [h.strip() for h in rows_raw[0]]
data = rows_raw[1 : 1 + max_data_rows]
else:
n = len(rows_raw[0])
headers = [f"col_{i}" for i in range(n)]
data = rows_raw[:max_data_rows]
dict_rows: List[dict[str, str]] = []
for r in data:
row_dict: dict[str, str] = {}
for j, h in enumerate(headers):
row_dict[h] = r[j].strip() if j < len(r) else ""
dict_rows.append(row_dict)
return headers, dict_rows, delim
def normalize_header_for_signature(name: str) -> str:
    """Reduce a column header to a canonical key for signature comparison.

    The name is stripped and lower-cased. Whitespace runs and runs of
    characters outside ``a-z 0-9 _ ä ö ü ß ( ) . % -`` are each replaced by a
    single underscore, and leading/trailing underscores are removed.
    """
    key = name.strip().lower()
    # Whitespace first, then every remaining disallowed character.
    for pattern in (r"\s+", r"[^a-z0-9_äöüß().%-]+"):
        key = re.sub(pattern, "_", key)
    return key.strip("_")
def column_signature(headers: List[str]) -> List[str]:
    """Return the sorted, de-duplicated normalized column names of a header row.

    ``None`` entries and blank headers are skipped. Non-string headers are
    converted with ``str()`` before normalization. Headers that normalize to
    the empty string (e.g. only punctuation) are left out, so the signature
    never contains "".
    """
    normalized = set()
    for header in headers:
        if header is None:
            continue
        raw = str(header)
        if not raw.strip():
            continue
        key = normalize_header_for_signature(raw)
        # A header made only of disallowed characters normalizes to "";
        # such an entry carries no information for matching.
        if key:
            normalized.add(key)
    return sorted(normalized)
def headers_signature_match_score(sig_csv: List[str], sig_template: List[str]) -> float:
    """Return the Jaccard overlap |A∩B| / |A∪B| of two signatures, in 0..1.

    The score drops sharply when the CSV has many extra columns. Two empty
    signatures score 1.0; exactly one empty signature scores 0.0.
    """
    csv_cols = set(sig_csv)
    template_cols = set(sig_template)
    if not (csv_cols or template_cols):
        return 1.0
    if not (csv_cols and template_cols):
        return 0.0
    # Both sets are non-empty here, so the union cannot be empty.
    return len(csv_cols & template_cols) / len(csv_cols | template_cols)
def headers_signature_template_recall(sig_csv: Sequence[str], sig_template: Sequence[str]) -> float:
    """Return the share of template columns found in the CSV: |A∩B| / |B|.

    The value reaches 1.0 as soon as every column relevant to the template is
    present in the file, regardless of extra columns (a file combining weight
    and nutrition data is not penalized for either template).

    Template entries are normalized before comparison and entries that
    normalize to "" are ignored. An empty template yields 1.0 when the CSV
    signature is empty too, otherwise 0.0.
    """
    csv_cols = set(sig_csv)
    template_cols = set()
    for entry in sig_template:
        key = normalize_header_for_signature(str(entry))
        if key:
            template_cols.add(key)
    if template_cols:
        return len(csv_cols & template_cols) / len(template_cols)
    return 0.0 if csv_cols else 1.0
def headers_signature_rank_metrics(sig_csv: List[str], sig_template: List[str]) -> dict[str, Any]:
    """Compute the metrics used for template ranking and for the UI.

    ``confidence`` equals ``template_recall`` (recommended as the primary
    value for display and sorting). Template entries are normalized first and
    entries that normalize to "" are ignored.

    Returns:
        A dict with ``confidence``, ``template_recall`` and ``jaccard``
        (rounded to four decimals) plus the counts ``columns_matched``,
        ``columns_in_template`` and ``columns_in_csv``.
    """
    csv_cols = set(sig_csv)
    template_cols = {normalize_header_for_signature(str(entry)) for entry in sig_template} - {""}
    matched = len(csv_cols & template_cols)
    combined = len(csv_cols | template_cols)

    if template_cols:
        recall = matched / len(template_cols)
    elif csv_cols:
        recall = 0.0
    else:
        # Nothing expected and nothing present counts as a perfect match.
        recall = 1.0
    jaccard = matched / combined if combined else 0.0

    rounded_recall = round(recall, 4)
    return {
        "confidence": rounded_recall,
        "template_recall": rounded_recall,
        "jaccard": round(jaccard, 4),
        "columns_matched": matched,
        "columns_in_template": len(template_cols),
        "columns_in_csv": len(csv_cols),
    }
def get_csv_import_limits(conn_row: dict | None) -> dict[str, int]:
"""Liest Limits aus system_config.csv_import; Fallback bei fehlendem Key."""
defaults = {"max_rows_per_file": 50_000, "max_file_bytes": 52_428_800}
if not conn_row or "value" not in conn_row:
return defaults
val = conn_row["value"]
if isinstance(val, dict):
out = {**defaults, **{k: int(v) for k, v in val.items() if k in defaults}}
return out
return defaults
def _strip_header_key(k: str | None) -> str:
if k is None:
return ""
return str(k).strip().removeprefix("\ufeff")
def iter_csv_dict_rows(
    text: str,
    delimiter: str,
    *,
    has_header: bool = True,
) -> Iterator[Dict[str, str]]:
    """Yield every non-empty row of the CSV text as a dict keyed by header.

    Header keys are cleaned with ``_strip_header_key`` and columns whose
    cleaned key is empty are dropped. Cell values are stripped, and a missing
    cell becomes "". Rows in which no cell has content are skipped. The
    number of columns is not limited here.

    Raises:
        ValueError: If ``has_header`` is false. Because this is a generator,
            the error surfaces on first iteration, not at call time.
    """
    if not has_header:
        raise ValueError("CSV ohne Kopfzeile wird für Import noch nicht unterstützt")
    unix_text = text.replace("\r\n", "\n").replace("\r", "\n")
    for record in csv.DictReader(io.StringIO(unix_text), delimiter=delimiter):
        # Skip rows where every cell is empty or whitespace.
        if all(not (cell and str(cell).strip()) for cell in record.values()):
            continue
        cleaned: Dict[str, str] = {}
        for raw_key, cell in record.items():
            key = _strip_header_key(raw_key)
            # The key check comes first: surplus cells sit under the key None
            # as a list and must never reach the string handling below.
            if key:
                cleaned[key] = (cell or "").strip()
        yield cleaned