""" PDF-Bericht aus ReportProfilePayload: ReportLab für Text/Layout, Matplotlib für Chart.js-ähnliche Payloads. """ from __future__ import annotations import io import logging from typing import Any from xml.sax.saxutils import escape import matplotlib matplotlib.use("Agg") import matplotlib.pyplot as plt from reportlab.lib.pagesizes import A4 from reportlab.lib.styles import getSampleStyleSheet from reportlab.lib.units import mm from reportlab.platypus import Image as RLImage from reportlab.platypus import Paragraph, SimpleDocTemplate, Spacer from db import get_cursor, get_db from report_chart_fetch import fetch_chart_payload from report_profile_schema import ( AiInsightBlock, ChartBlock, ReportProfilePayload, SectionBlock, ) logger = logging.getLogger(__name__) _CONTENT_TRUNCATE = 12000 def _color_to_rgb(hex_or_rgba: str) -> tuple[float, float, float]: s = (hex_or_rgba or "#333333").strip() if s.startswith("#") and len(s) >= 7: try: r = int(s[1:3], 16) / 255.0 g = int(s[3:5], 16) / 255.0 b = int(s[5:7], 16) / 255.0 return (r, g, b) except ValueError: pass return (0.12, 0.62, 0.46) def chart_payload_to_png(payload: dict[str, Any], fig_width_in: float = 6.2, fig_height_in: float = 3.4) -> bytes: """Erzeugt PNG aus Chart.js-kompatiblem Payload (line, bar, pie).""" chart_type = payload.get("chart_type") or "line" data = payload.get("data") or {} labels = data.get("labels") or [] datasets = data.get("datasets") or [] fig, ax = plt.subplots(figsize=(fig_width_in, fig_height_in), dpi=120) ax.set_facecolor("#fafaf9") fig.patch.set_facecolor("#ffffff") if chart_type == "pie" and datasets: ds0 = datasets[0] values = ds0.get("data") or [] colors = ds0.get("backgroundColor") or ["#1D9E75", "#378ADD", "#D85A30"] if labels and values and len(labels) == len(values): ax.pie(values, labels=labels, autopct="%1.0f%%", colors=colors[: len(values)], startangle=90) ax.axis("equal") else: ax.text(0.5, 0.5, "Keine Daten", ha="center", va="center", transform=ax.transAxes) elif chart_type in ("line", "bar", "scatter") and datasets: x = range(len(labels)) if labels else [] for i, ds in enumerate(datasets): y = ds.get("data") or [] if not y: continue lab = ds.get("label") or f"Serie {i + 1}" col = _color_to_rgb(str(ds.get("borderColor") or ds.get("backgroundColor") or "#1D9E75")) if chart_type == "bar": yv = y[: len(labels)] if labels else y bg = ds.get("backgroundColor") if isinstance(bg, list): cols = [_color_to_rgb(str(c)) for c in bg[: len(yv)]] else: cols = [_color_to_rgb(str(bg or "#1D9E75"))] * len(yv) ax.bar(list(range(len(yv))), yv, label=lab, color=cols[: len(yv)], alpha=0.88) else: ax.plot( list(x)[: len(y)], y, label=lab, color=col, linewidth=1.6, marker="o", markersize=2, ) if labels and chart_type != "bar": step = max(1, len(labels) // 8) ax.set_xticks(list(x)[::step]) ax.set_xticklabels([labels[j] for j in range(0, len(labels), step)], rotation=25, fontsize=7) elif labels and chart_type == "bar": ax.set_xticks(list(x)) ax.set_xticklabels(labels, rotation=30, fontsize=7) ax.legend(loc="upper right", fontsize=7) ax.grid(True, alpha=0.25) ax.set_xmargin(0.02) else: ax.text(0.5, 0.5, "Diagrammtyp nicht unterstützt oder leer", ha="center", va="center", transform=ax.transAxes) fig.tight_layout() buf = io.BytesIO() fig.savefig(buf, format="png", bbox_inches="tight", facecolor=fig.get_facecolor()) plt.close(fig) buf.seek(0) return buf.read() def _insight_text(profile_id: str, insight_id: str | None) -> tuple[str, str]: """Returns (heading, body_text).""" if not insight_id: return ( "KI-Auswertung", "(Noch keine Auswahl — in einer späteren Version kannst du hier eine gespeicherte KI-Analyse " "verknüpfen.)", ) try: with get_db() as conn: cur = get_cursor(conn) cur.execute( "SELECT scope, content, created FROM ai_insights WHERE id = %s AND profile_id = %s", (insight_id, profile_id), ) row = cur.fetchone() if not row: return ("KI-Auswertung", "Eintrag nicht gefunden oder keine Berechtigung.") scope = row.get("scope") or "Analyse" content = row.get("content") or "" if len(content) > _CONTENT_TRUNCATE: content = content[:_CONTENT_TRUNCATE] + "\n\n[… gekürzt …]" created = row.get("created") sub = f"{scope}" + (f" · {created}" if created else "") return (sub, content) except Exception as e: logger.warning("report pdf insight load failed: %s", e) return ("KI-Auswertung", "Fehler beim Laden des Eintrags.") def build_structured_report_pdf( *, profile_id: str, profile_name: str, payload: ReportProfilePayload, ) -> bytes: """Vollständiges PDF als Bytes (A4).""" buf = io.BytesIO() doc = SimpleDocTemplate( buf, pagesize=A4, leftMargin=14 * mm, rightMargin=14 * mm, topMargin=16 * mm, bottomMargin=16 * mm, ) styles = getSampleStyleSheet() story: list[Any] = [] title = (payload.document_title or "").strip() or f"{profile_name} – Bericht" story.append(Paragraph(escape(title), styles["Title"])) story.append(Spacer(1, 6 * mm)) for block in payload.blocks: if isinstance(block, SectionBlock): story.append(Spacer(1, 4 * mm)) story.append(Paragraph(escape(block.title), styles["Heading2"])) story.append(Spacer(1, 2 * mm)) elif isinstance(block, ChartBlock): try: chart = fetch_chart_payload(block.chart_id, profile_id, block.window_days) except Exception as e: logger.warning("chart fetch %s: %s", block.chart_id, e) story.append(Paragraph(f"Diagramm {block.chart_id}: Fehler bei Daten.", styles["Normal"])) continue meta = chart.get("metadata") or {} if meta.get("confidence") == "insufficient": msg = meta.get("message") or "Nicht genug Daten" story.append(Paragraph(f"{block.chart_id}: {msg}", styles["Normal"])) story.append(Spacer(1, 3 * mm)) continue try: png = chart_payload_to_png(chart) img_buf = io.BytesIO(png) # Breite ~ volle Textbreite (~180mm auf A4 mit Standardrändern Platypus) iw = 170 * mm ih = 85 * mm story.append(RLImage(img_buf, width=iw, height=ih)) except Exception as e: logger.warning("chart render %s: %s", block.chart_id, e) story.append(Paragraph(f"Diagramm {block.chart_id}: Darstellung fehlgeschlagen.", styles["Normal"])) story.append(Spacer(1, 4 * mm)) elif isinstance(block, AiInsightBlock): heading, body = _insight_text(profile_id, block.insight_id) if block.title.strip(): story.append(Paragraph(escape(block.title), styles["Heading3"])) else: story.append(Paragraph(escape(heading), styles["Heading3"])) for para in body.split("\n\n"): p = (para or "").strip() if p: story.append(Paragraph(escape(p), styles["BodyText"])) story.append(Spacer(1, 4 * mm)) doc.build(story) return buf.getvalue()