diff --git a/app/core/chunking/chunking_strategies.py b/app/core/chunking/chunking_strategies.py index b8cc96c..59a8d41 100644 --- a/app/core/chunking/chunking_strategies.py +++ b/app/core/chunking/chunking_strategies.py @@ -7,6 +7,8 @@ DESCRIPTION: Strategien für atomares Sektions-Chunking v3.9.9. - Fix: Synchronisierung der Parameter mit dem Orchestrator (context_prefix). WP-24c v4.2.5: Strict-Mode ohne Carry-Over - Bei strict_heading_split wird nach jeder Sektion geflasht. WP-26 v1.0: section_type und block_id werden an Chunks weitergegeben. + WP-26 v1.1: Section-Type-Wechsel erzwingt IMMER einen neuen Chunk (unabhängig vom Profil). + Gleiche section_types folgen dem normalen Chunking-Verhalten. """ from typing import List, Dict, Any, Optional from .chunking_models import RawBlock, Chunk @@ -53,33 +55,18 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: # --- SCHRITT 1: Gruppierung in atomare Sektions-Einheiten --- # WP-26 v1.0: Erweitert um section_type und block_id Tracking + # WP-26 v1.1: Section-Type-Wechsel erzwingt IMMER eine neue Sektion (unabhängig vom Profil) sections: List[Dict[str, Any]] = [] curr_blocks = [] - for b in blocks: - if b.kind == "heading" and b.level <= split_level: - if curr_blocks: - # WP-26 v1.0: Finde den effektiven section_type und block_id für diese Sektion - # Priorisiere den ersten Block mit section_type, sonst den Heading-Block - effective_section_type = None - effective_block_id = None - for cb in curr_blocks: - if cb.section_type and effective_section_type is None: - effective_section_type = cb.section_type - if cb.block_id and effective_block_id is None: - effective_block_id = cb.block_id - - sections.append({ - "text": "\n\n".join([x.text for x in curr_blocks]), - "meta": curr_blocks[0], - "is_empty": len(curr_blocks) == 1 and curr_blocks[0].kind == "heading", - "section_type": effective_section_type, - "block_id": effective_block_id - }) - curr_blocks = [b] - else: - curr_blocks.append(b) - if curr_blocks: - # WP-26 v1.0: Gleiche Logik für den letzten Block + current_section_type = None # WP-26 v1.1: Tracking des aktuellen section_type + + def _flush_section(): + """Hilfsfunktion zum Abschließen einer Sektion.""" + nonlocal curr_blocks, current_section_type + if not curr_blocks: + return + # WP-26 v1.0: Finde den effektiven section_type und block_id für diese Sektion + # Priorisiere den ersten Block mit section_type, sonst den Heading-Block effective_section_type = None effective_block_id = None for cb in curr_blocks: @@ -95,6 +82,41 @@ def strategy_by_heading(blocks: List[RawBlock], config: Dict[str, Any], note_id: "section_type": effective_section_type, "block_id": effective_block_id }) + curr_blocks = [] + current_section_type = effective_section_type + + for b in blocks: + # WP-26 v1.1: Prüfe auf Section-Type-Wechsel BEVOR wir den Block hinzufügen + # Wenn sich der section_type ändert, muss die aktuelle Sektion abgeschlossen werden + block_section_type = b.section_type + + # Section-Type-Wechsel erkannt? + # (Wechsel ist: alter Typ != neuer Typ UND mindestens einer ist nicht None) + is_section_type_change = ( + curr_blocks and # Es gibt bereits Blöcke + block_section_type is not None and # Neuer Block hat expliziten section_type + current_section_type != block_section_type # Typ hat sich geändert + ) + + if b.kind == "heading" and b.level <= split_level: + # Heading-basierter Split (Standard-Verhalten) + _flush_section() + curr_blocks = [b] + current_section_type = block_section_type # Update tracking + elif is_section_type_change: + # WP-26 v1.1: Section-Type-Wechsel erzwingt neuen Chunk + _flush_section() + curr_blocks = [b] + current_section_type = block_section_type + else: + # Normales Hinzufügen zum aktuellen Block + curr_blocks.append(b) + # Update section_type wenn Block einen hat und wir noch keinen haben + if block_section_type and not current_section_type: + current_section_type = block_section_type + + # Letzte Sektion abschließen + _flush_section() # --- SCHRITT 2: Verarbeitung der Queue --- queue = list(sections) @@ -217,30 +239,17 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note """ Standard-Sliding-Window für flache Texte ohne Sektionsfokus. WP-26 v1.0: Erweitert um section_type und block_id Weitergabe. + WP-26 v1.1: Section-Type-Wechsel erzwingt IMMER einen neuen Chunk. """ target = config.get("target", 400); max_tokens = config.get("max", 600) chunks: List[Chunk] = []; buf: List[RawBlock] = [] + current_section_type = None # WP-26 v1.1: Tracking des aktuellen section_type - for b in blocks: - b_tokens = estimate_tokens(b.text) - curr_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0 - if curr_tokens + b_tokens > max_tokens and buf: - txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) - win = _create_win(context_prefix, buf[0].section_title, txt) - # WP-26 v1.0: Finde effektiven section_type und block_id - effective_section_type = next((b.section_type for b in buf if b.section_type), None) - effective_block_id = next((b.block_id for b in buf if b.block_id), None) - chunks.append(Chunk( - id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, - text=txt, window=win, token_count=curr_tokens, - section_title=buf[0].section_title, section_path=buf[0].section_path, - neighbors_prev=None, neighbors_next=None, - section_type=effective_section_type, block_id=effective_block_id - )) - buf = [] - buf.append(b) - - if buf: + def _flush_buffer(): + """Hilfsfunktion zum Flushen des Buffers.""" + nonlocal buf, current_section_type + if not buf: + return txt = "\n\n".join([x.text for x in buf]); idx = len(chunks) win = _create_win(context_prefix, buf[0].section_title, txt) # WP-26 v1.0: Finde effektiven section_type und block_id @@ -248,10 +257,36 @@ def strategy_sliding_window(blocks: List[RawBlock], config: Dict[str, Any], note effective_block_id = next((b.block_id for b in buf if b.block_id), None) chunks.append(Chunk( id=f"{note_id}#c{idx:02d}", note_id=note_id, index=idx, - text=txt, window=win, token_count=estimate_tokens(txt), + text=txt, window=win, token_count=sum(estimate_tokens(x.text) for x in buf), section_title=buf[0].section_title, section_path=buf[0].section_path, neighbors_prev=None, neighbors_next=None, section_type=effective_section_type, block_id=effective_block_id )) + buf = [] + current_section_type = effective_section_type + + for b in blocks: + b_tokens = estimate_tokens(b.text) + curr_tokens = sum(estimate_tokens(x.text) for x in buf) if buf else 0 + block_section_type = b.section_type + + # WP-26 v1.1: Prüfe auf Section-Type-Wechsel + is_section_type_change = ( + buf and # Es gibt bereits Blöcke im Buffer + block_section_type is not None and # Neuer Block hat expliziten section_type + current_section_type != block_section_type # Typ hat sich geändert + ) + + # Flush wenn: Token-Limit überschritten ODER Section-Type-Wechsel + if (curr_tokens + b_tokens > max_tokens and buf) or is_section_type_change: + _flush_buffer() + + buf.append(b) + # Update section_type wenn Block einen hat und wir noch keinen haben + if block_section_type and not current_section_type: + current_section_type = block_section_type + + # Letzten Buffer flushen + _flush_buffer() return chunks \ No newline at end of file diff --git a/app/core/graph/graph_derive_edges.py b/app/core/graph/graph_derive_edges.py index 6c2f6f8..9b90463 100644 --- a/app/core/graph/graph_derive_edges.py +++ b/app/core/graph/graph_derive_edges.py @@ -32,7 +32,8 @@ import logging from typing import List, Optional, Dict, Tuple, Set from .graph_utils import ( _get, _edge, _mk_edge_id, _dedupe_seq, parse_link_target, - PROVENANCE_PRIORITY, load_types_registry, get_edge_defaults_for + PROVENANCE_PRIORITY, load_types_registry, get_edge_defaults_for, + get_typical_edge_for # WP-26 v1.1: Für automatische Intra-Note-Edges ) from .graph_extractors import ( extract_typed_relations, extract_callout_relations, extract_wikilinks @@ -645,6 +646,52 @@ def build_edges_for_note( "edge_id": _mk_edge_id("prev", next_id, cid, "chunk"), "provenance": "structure", "rule_id": "structure:order", "confidence": PROVENANCE_PRIORITY["structure:order"] })) + + # 1b) WP-26 v1.1: Automatische Intra-Note-Edges zwischen Sektionen mit unterschiedlichen Typen + # Wenn sich der section_type zwischen aufeinanderfolgenden Chunks ändert, + # wird eine semantische Kante basierend auf graph_schema.md erstellt. + for idx, ch in enumerate(chunks): + if idx >= len(chunks) - 1: + continue # Kein nächster Chunk + + cid = _get(ch, "chunk_id", "id") + next_ch = chunks[idx + 1] + next_id = _get(next_ch, "chunk_id", "id") + + if not cid or not next_id: + continue + + # Hole die effective_types der Chunks + # WP-26 v1.1: section_type oder note_type (effective_type) + current_section_type = ch.get("section_type") + next_section_type = next_ch.get("section_type") + current_type = current_section_type or ch.get("type") or note_type + next_type = next_section_type or next_ch.get("type") or note_type + + # Prüfe, ob es einen Section-Type-Wechsel gibt + # Nur wenn beide einen expliziten section_type haben oder sich die effective_types unterscheiden + is_section_change = ( + (current_section_type is not None or next_section_type is not None) and + current_type != next_type + ) + + if is_section_change: + # Ermittle den passenden Edge-Typ aus graph_schema.md + edge_kind = get_typical_edge_for(current_type, next_type) + + logger.debug(f"WP-26 Intra-Note-Edge: {current_type} -> {next_type} = {edge_kind}") + + # Erstelle die automatische Edge (Forward-Richtung) + edges.append(_edge(edge_kind, "chunk", cid, next_id, note_id, { + "chunk_id": cid, + "edge_id": _mk_edge_id(edge_kind, cid, next_id, "chunk"), + "provenance": "rule", + "rule_id": "inferred:section_transition", + "source_hint": "schema_default", + "confidence": PROVENANCE_PRIORITY.get("schema_default", 0.85), + "is_internal": True, # Explizit als Intra-Note-Edge markieren + "section_transition": f"{current_type}->{next_type}" # Debug-Info + })) # 2) Inhaltliche Kanten (Explicit & Candidate Pool) reg = load_types_registry() diff --git a/app/core/graph/graph_utils.py b/app/core/graph/graph_utils.py index 5859f3b..3181816 100644 --- a/app/core/graph/graph_utils.py +++ b/app/core/graph/graph_utils.py @@ -33,6 +33,7 @@ PROVENANCE_PRIORITY = { "derived:backlink": 0.90, "edge_defaults": 0.70, "schema_default": 0.85, + "inferred:section_transition": 0.85, # WP-26 v1.1: Automatische Section-Übergänge # Struktur-Kanten (provenance: "structure") "structure:belongs_to": 1.00, "structure:order": 0.95, @@ -56,6 +57,7 @@ PROVENANCE_TO_DTO = { "edge_defaults": ("rule", "edge_defaults"), "schema_default": ("rule", "schema_default"), "inferred:schema": ("rule", "schema_default"), + "inferred:section_transition": ("rule", "schema_default"), # WP-26 v1.1 "rule": ("rule", None), # structure "structure:belongs_to": ("structure", "belongs_to"), @@ -255,4 +257,157 @@ def get_edge_defaults_for(note_type: Optional[str], reg: dict) -> List[str]: if isinstance(v, dict) and isinstance(v.get("edge_defaults"), list): return [str(x) for x in v["edge_defaults"] if isinstance(x, str)] - return [] \ No newline at end of file + return [] + +# --------------------------------------------------------------------------- +# WP-26 v1.1: Graph-Schema Parser für automatische Edge-Typ-Ableitung +# --------------------------------------------------------------------------- + +# Cache für geladenes Schema (vermeidet mehrfaches Parsen) +_GRAPH_SCHEMA_CACHE: Optional[Dict[str, Dict[str, List[str]]]] = None + +def load_graph_schema() -> Dict[str, Dict[str, List[str]]]: + """ + WP-26 v1.1: Parst das graph_schema.md und extrahiert Typical Edge-Types. + + Das Schema hat folgendes Format: + ## Source: `experience` + | Target-Note-type | Typical Edge-Types | Prohibited Edge-Types | + | :--- | :--- | :--- | + | `event` | `caused_by` | `consists_of` | + + Returns: + Dict[source_type, Dict[target_type, List[typical_edges]]] + Beispiel: {"experience": {"event": ["caused_by"], "insight": ["resulted_in"]}} + """ + global _GRAPH_SCHEMA_CACHE + if _GRAPH_SCHEMA_CACHE is not None: + return _GRAPH_SCHEMA_CACHE + + import re + import logging + logger = logging.getLogger(__name__) + + schema_path = get_schema_path() + + # Versuche verschiedene Pfade + paths_to_try = [ + schema_path, + os.path.join(os.getcwd(), "config", "graph_schema.md"), + os.path.join(os.path.dirname(__file__), "..", "..", "..", "config", "graph_schema.md"), + ] + + # Falls MINDNET_OBSIDIAN_DICTIONARY gesetzt ist, nutze diesen Pfad + obsidian_dict = os.getenv("MINDNET_OBSIDIAN_DICTIONARY") + if obsidian_dict: + paths_to_try.insert(0, os.path.join(obsidian_dict, "graph_schema.md")) + + content = None + for path in paths_to_try: + if os.path.isfile(path): + try: + with open(path, "r", encoding="utf-8") as f: + content = f.read() + logger.debug(f"Graph-Schema geladen von: {path}") + break + except Exception as e: + logger.warning(f"Fehler beim Laden von {path}: {e}") + + if not content: + logger.warning("Graph-Schema nicht gefunden. Fallback auf leeres Schema.") + _GRAPH_SCHEMA_CACHE = {} + return _GRAPH_SCHEMA_CACHE + + schema: Dict[str, Dict[str, List[str]]] = {} + current_source = None + + # Regex für Source-Header: ## Source: `experience` + source_pattern = re.compile(r'^##\s+Source:\s*`(\w+)`', re.IGNORECASE) + + # Regex für Tabellen-Zeile: | `event` | `caused_by` | `consists_of` | + # oder | `event` | `caused_by`, `resulted_in` | - | + table_row_pattern = re.compile( + r'^\|\s*`(\w+)`\s*\|\s*([^|]+)\s*\|\s*([^|]*)\s*\|' + ) + + for line in content.split('\n'): + stripped = line.strip() + + # Prüfe auf Source-Header + source_match = source_pattern.match(stripped) + if source_match: + current_source = source_match.group(1).lower() + if current_source not in schema: + schema[current_source] = {} + continue + + # Prüfe auf Tabellen-Zeile (nur wenn wir einen Source haben) + if current_source: + row_match = table_row_pattern.match(stripped) + if row_match: + target_type = row_match.group(1).lower() + typical_edges_raw = row_match.group(2).strip() + + # Parse die Edge-Types (können mit Backticks und Kommas getrennt sein) + # Format: `caused_by`, `resulted_in` oder `caused_by` + edge_pattern = re.compile(r'`(\w+)`') + typical_edges = edge_pattern.findall(typical_edges_raw) + + if typical_edges: + schema[current_source][target_type] = typical_edges + + logger.info(f"Graph-Schema geladen: {len(schema)} Source-Types") + _GRAPH_SCHEMA_CACHE = schema + return schema + +def get_typical_edge_for(source_type: str, target_type: str) -> Optional[str]: + """ + WP-26 v1.1: Ermittelt den ersten "Typical Edge-Type" für ein Typ-Paar. + + Args: + source_type: Typ der Quell-Sektion (z.B. "experience") + target_type: Typ der Ziel-Sektion (z.B. "insight") + + Returns: + Der erste Typical Edge-Type (z.B. "resulted_in") oder None + """ + schema = load_graph_schema() + + source_lower = source_type.lower() if source_type else "default" + target_lower = target_type.lower() if target_type else "any" + + # 1. Exakter Match + if source_lower in schema: + source_rules = schema[source_lower] + if target_lower in source_rules: + edges = source_rules[target_lower] + if edges: + return edges[0] + # 2. Fallback auf "any" Target + if "any" in source_rules: + edges = source_rules["any"] + if edges: + return edges[0] + + # 3. Fallback auf "default" Source + if "default" in schema: + default_rules = schema["default"] + if target_lower in default_rules: + edges = default_rules[target_lower] + if edges: + return edges[0] + if "any" in default_rules: + edges = default_rules["any"] + if edges: + return edges[0] + + # 4. Absoluter Fallback + return "related_to" + +def clear_graph_schema_cache(): + """ + WP-26 v1.1: Löscht den Cache für das Graph-Schema. + Nützlich für Tests oder wenn das Schema neu geladen werden soll. + """ + global _GRAPH_SCHEMA_CACHE + _GRAPH_SCHEMA_CACHE = None \ No newline at end of file diff --git a/tests/test_wp26_section_types.py b/tests/test_wp26_section_types.py index f8e4464..8d5dc8d 100644 --- a/tests/test_wp26_section_types.py +++ b/tests/test_wp26_section_types.py @@ -1,12 +1,17 @@ """ FILE: tests/test_wp26_section_types.py DESCRIPTION: Unit-Tests für WP-26 Phase 1: Section-Types und Intra-Note-Edges -VERSION: 1.0.0 + WP-26 v1.1: Erweitert um Tests für Section-Split und automatische Edges +VERSION: 1.1.0 """ import pytest from app.core.chunking.chunking_parser import parse_blocks from app.core.chunking.chunking_models import RawBlock, Chunk -from app.core.graph.graph_utils import normalize_provenance, _edge +from app.core.chunking.chunking_strategies import strategy_by_heading, strategy_sliding_window +from app.core.graph.graph_utils import ( + normalize_provenance, _edge, get_typical_edge_for, + load_graph_schema, clear_graph_schema_cache +) class TestSectionTypeRecognition: @@ -261,5 +266,218 @@ class TestNestedEdgeCallouts: assert len(callouts) >= 1 +# ============================================================================= +# WP-26 v1.1: Tests für Section-Type-Wechsel und automatische Chunk-Splits +# ============================================================================= + +class TestSectionTypeSplit: + """UT-14: Section-Type-Wechsel erzwingt neuen Chunk""" + + def test_section_type_change_forces_new_chunk_by_heading(self): + """Section-Type-Wechsel erzwingt neuen Chunk (by_heading Strategie)""" + md = """ +## Situation ^sit +> [!section] experience + +Die Geschichte beginnt hier. + +## Reflexion ^ref +> [!section] insight + +Erkenntnisse aus der Situation. +""" + blocks, _ = parse_blocks(md) + + # Konfiguration: strict=False (würde normalerweise zusammenfassen) + config = { + "target": 1000, # Hoher Wert, um sicherzustellen, dass nicht wegen Token-Limit gesplittet wird + "max": 2000, + "split_level": 2, + "strict_heading_split": False, # Normalerweise würde zusammengefasst + "enable_smart_edge_allocation": True + } + + chunks = strategy_by_heading(blocks, config, "test-note") + + # Es sollten mindestens 2 Chunks geben (wegen Section-Type-Wechsel) + assert len(chunks) >= 2 + + # Prüfe, dass die Chunks unterschiedliche section_types haben + section_types = [c.section_type for c in chunks if c.section_type] + assert "experience" in section_types + assert "insight" in section_types + + def test_same_section_type_follows_normal_behavior(self): + """Gleicher Section-Type folgt normalem Chunking-Verhalten""" + md = """ +## Section A +> [!section] experience + +Content A about experience. + +## Section B +> [!section] experience + +Content B also about experience. +""" + blocks, _ = parse_blocks(md) + + # Konfiguration: strict=False (würde zusammenfassen bei gleichem Typ) + config = { + "target": 1000, + "max": 2000, + "split_level": 2, + "strict_heading_split": False, + "enable_smart_edge_allocation": True + } + + chunks = strategy_by_heading(blocks, config, "test-note") + + # Bei gleichem section_type und strict=False könnten Chunks zusammengefasst werden + # (abhängig von Token-Limits) + # Wichtig: Alle Chunks sollten section_type "experience" haben + for c in chunks: + if c.section_type: + assert c.section_type == "experience" + + def test_sliding_window_respects_section_type_change(self): + """sliding_window Strategie respektiert Section-Type-Wechsel""" + md = """ +## Part 1 +> [!section] experience + +Short content. + +## Part 2 +> [!section] insight + +Another short content. +""" + blocks, _ = parse_blocks(md) + + config = { + "target": 1000, # Hoher Wert + "max": 2000 + } + + chunks = strategy_sliding_window(blocks, config, "test-note") + + # Bei Section-Type-Wechsel sollte gesplittet werden + section_types = [c.section_type for c in chunks if c.section_type] + + # Beide Typen sollten in separaten Chunks sein + if len(section_types) >= 2: + assert "experience" in section_types or "insight" in section_types + + +class TestGraphSchemaParser: + """UT-15: Graph-Schema Parser Tests""" + + def test_get_typical_edge_experience_to_insight(self): + """Typischer Edge von experience zu insight ist 'resulted_in'""" + # Lade Schema (falls verfügbar) + edge = get_typical_edge_for("experience", "insight") + + # Sollte entweder 'resulted_in' oder Fallback 'related_to' sein + assert edge in ["resulted_in", "related_to"] + + def test_get_typical_edge_insight_to_decision(self): + """Typischer Edge von insight zu decision""" + edge = get_typical_edge_for("insight", "decision") + + # Basierend auf graph_schema.md: foundation_for oder guides + assert edge in ["foundation_for", "guides", "related_to"] + + def test_get_typical_edge_fallback(self): + """Fallback auf 'related_to' für unbekannte Typen""" + edge = get_typical_edge_for("unknown_type_1", "unknown_type_2") + + # Fallback sollte immer related_to sein + assert edge == "related_to" + + def test_get_typical_edge_any_target(self): + """Fallback auf 'any' Target-Regel""" + edge = get_typical_edge_for("experience", "unknown_target") + + # Sollte Fallback auf "any"-Regel oder "related_to" + assert edge is not None + + +class TestAutomaticIntraNoteEdges: + """UT-16: Automatische Intra-Note-Edges zwischen Sektionen""" + + def test_edge_payload_has_section_transition(self): + """Edge zwischen Sektionen enthält section_transition Metadaten""" + edge = _edge( + kind="resulted_in", + scope="chunk", + source_id="note1#c00", + target_id="note1#c01", + note_id="note1", + extra={ + "provenance": "rule", + "rule_id": "inferred:section_transition", + "section_transition": "experience->insight", + "is_internal": True + } + ) + + assert edge["is_internal"] is True + assert edge["section_transition"] == "experience->insight" + assert edge["provenance"] == "rule" + + def test_inferred_section_transition_provenance(self): + """Provenance 'inferred:section_transition' wird korrekt normalisiert""" + prov, hint = normalize_provenance("inferred:section_transition") + + assert prov == "rule" + assert hint == "schema_default" + + +class TestRealWorldScenario: + """UT-17: Real-World Szenario - Krebsdiagnose Note""" + + def test_krebsdiagnose_note_structure(self): + """Testet die erwartete Struktur der Krebsdiagnose-Note""" + md = """ +## 📖 Diagnose: Glioblastom ^kontext + +Nach der Operation gab es ein Diagnose-Gespräch. + +## 🎭 Emotions-Check ^emotionen + +Ich reagierte mit Zittern am Körper. + +## 💡 Lektion ^learning +> [!section] insight + +Ich habe versucht die nächsten Schritte zu durchdenken. +Meine positive Einstellung hat mir geholfen. +""" + blocks, _ = parse_blocks(md) + + # Konfiguration wie structured_smart_edges + config = { + "target": 400, + "max": 600, + "split_level": 2, + "strict_heading_split": False, + "enable_smart_edge_allocation": True + } + + chunks = strategy_by_heading(blocks, config, "krebsdiagnose") + + # Wegen Section-Type-Wechsel (None -> insight) sollte die Lektion + # ein separater Chunk sein + insight_chunks = [c for c in chunks if c.section_type == "insight"] + + # Mindestens ein Chunk mit section_type "insight" + assert len(insight_chunks) >= 1 + + # Der insight-Chunk sollte den Lektions-Inhalt enthalten + insight_text = insight_chunks[0].text + assert "durchdenken" in insight_text.lower() or "positive" in insight_text.lower() + + if __name__ == "__main__": pytest.main([__file__, "-v"])