Implement ID collision detection and enhance logging in ingestion_processor.py

Add a check for ID collisions during the ingestion process to prevent multiple files from using the same note_id. Update logging levels to DEBUG for detailed diagnostics on hash comparisons, body lengths, and frontmatter keys, improving traceability and debugging capabilities in the ingestion workflow.
This commit is contained in:
Lars 2026-01-12 08:56:28 +01:00
parent f9118a36f8
commit ec9b3c68af

View File

@ -269,8 +269,20 @@ class IngestionService:
# Prüft Hash VOR der Verarbeitung, um redundante Ingestion zu vermeiden # Prüft Hash VOR der Verarbeitung, um redundante Ingestion zu vermeiden
old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id) old_payload = None if force_replace else fetch_note_payload(self.client, self.prefix, note_id)
# WP-24c v4.5.9-DEBUG: Erweiterte Diagnose-Logs für Change-Detection (INFO-Level für Sichtbarkeit) # WP-24c v4.5.10: Prüfe auf ID-Kollisionen (zwei Dateien mit derselben note_id)
logger.info(f"🔍 [CHANGE-DETECTION] Start für '{note_id}': force_replace={force_replace}, old_payload={old_payload is not None}") if old_payload and not force_replace:
old_path = old_payload.get("path", "")
if old_path and old_path != normalized_file_path:
# ID-Kollision erkannt: Zwei verschiedene Dateien haben dieselbe note_id
logger.error(
f"❌ [ID-KOLLISION] Kritischer Fehler: Die note_id '{note_id}' wird bereits von einer anderen Datei verwendet!\n"
f" Bereits vorhanden: '{old_path}'\n"
f" Konflikt mit: '{normalized_file_path}'\n"
f" Lösung: Bitte ändern Sie die 'id' im Frontmatter einer der beiden Dateien, um eine eindeutige ID zu gewährleisten."
)
return {**result, "status": "error", "error": "id_collision", "note_id": note_id, "existing_path": old_path, "conflicting_path": normalized_file_path}
logger.debug(f"🔍 [CHANGE-DETECTION] Start für '{note_id}': force_replace={force_replace}, old_payload={old_payload is not None}")
content_changed = True content_changed = True
hash_match = False hash_match = False
@ -283,16 +295,16 @@ class IngestionService:
# WP-24c v4.5.9-DEBUG: Detaillierte Hash-Diagnose (INFO-Level) # WP-24c v4.5.9-DEBUG: Detaillierte Hash-Diagnose (INFO-Level)
logger.info(f"🔍 [CHANGE-DETECTION] Hash-Vergleich für '{note_id}':") logger.info(f"🔍 [CHANGE-DETECTION] Hash-Vergleich für '{note_id}':")
logger.info(f" -> Hash-Key: '{h_key}'") logger.debug(f" -> Hash-Key: '{h_key}'")
logger.info(f" -> Active Hash-Mode: '{self.active_hash_mode or 'full'}'") logger.debug(f" -> Active Hash-Mode: '{self.active_hash_mode or 'full'}'")
logger.info(f" -> New Hash vorhanden: {bool(new_h)}") logger.debug(f" -> New Hash vorhanden: {bool(new_h)}")
logger.info(f" -> Old Hash vorhanden: {bool(old_h)}") logger.debug(f" -> Old Hash vorhanden: {bool(old_h)}")
if new_h: if new_h:
logger.info(f" -> New Hash (erste 32 Zeichen): {new_h[:32]}...") logger.debug(f" -> New Hash (erste 32 Zeichen): {new_h[:32]}...")
if old_h: if old_h:
logger.info(f" -> Old Hash (erste 32 Zeichen): {old_h[:32]}...") logger.debug(f" -> Old Hash (erste 32 Zeichen): {old_h[:32]}...")
logger.info(f" -> Verfügbare Hash-Keys in new: {list(note_pl.get('hashes', {}).keys())}") logger.debug(f" -> Verfügbare Hash-Keys in new: {list(note_pl.get('hashes', {}).keys())}")
logger.info(f" -> Verfügbare Hash-Keys in old: {list(old_payload.get('hashes', {}).keys())}") logger.debug(f" -> Verfügbare Hash-Keys in old: {list(old_payload.get('hashes', {}).keys())}")
if new_h and old_h: if new_h and old_h:
hash_match = (new_h == old_h) hash_match = (new_h == old_h)
@ -304,48 +316,48 @@ class IngestionService:
# Finde erste unterschiedliche Position # Finde erste unterschiedliche Position
diff_pos = next((i for i, (a, b) in enumerate(zip(new_h, old_h)) if a != b), None) diff_pos = next((i for i, (a, b) in enumerate(zip(new_h, old_h)) if a != b), None)
if diff_pos is not None: if diff_pos is not None:
logger.info(f" -> Hash-Unterschied: Erste unterschiedliche Position: {diff_pos}") logger.debug(f" -> Hash-Unterschied: Erste unterschiedliche Position: {diff_pos}")
else: else:
logger.info(f" -> Hash-Unterschied: Längen unterschiedlich (new={len(new_h)}, old={len(old_h)})") logger.debug(f" -> Hash-Unterschied: Längen unterschiedlich (new={len(new_h)}, old={len(old_h)})")
# WP-24c v4.5.9-DEBUG: Logge Hash-Input für Diagnose # WP-24c v4.5.10: Logge Hash-Input für Diagnose (DEBUG-Level)
# WICHTIG: _get_hash_source_content benötigt ein Dictionary, nicht das ParsedNote-Objekt! # WICHTIG: _get_hash_source_content benötigt ein Dictionary, nicht das ParsedNote-Objekt!
from app.core.ingestion.ingestion_note_payload import _get_hash_source_content, _as_dict from app.core.ingestion.ingestion_note_payload import _get_hash_source_content, _as_dict
hash_mode = self.active_hash_mode or 'full' hash_mode = self.active_hash_mode or 'full'
# Konvertiere parsed zu Dictionary für _get_hash_source_content # Konvertiere parsed zu Dictionary für _get_hash_source_content
parsed_dict = _as_dict(parsed) parsed_dict = _as_dict(parsed)
hash_input = _get_hash_source_content(parsed_dict, hash_mode) hash_input = _get_hash_source_content(parsed_dict, hash_mode)
logger.info(f" -> Hash-Input (erste 200 Zeichen): {hash_input[:200]}...") logger.debug(f" -> Hash-Input (erste 200 Zeichen): {hash_input[:200]}...")
logger.info(f" -> Hash-Input Länge: {len(hash_input)}") logger.debug(f" -> Hash-Input Länge: {len(hash_input)}")
# WP-24c v4.5.9-DEBUG: Vergleiche auch Body-Länge und Frontmatter # WP-24c v4.5.10: Vergleiche auch Body-Länge und Frontmatter (DEBUG-Level)
# Verwende parsed.body statt note_pl.get("body") # Verwende parsed.body statt note_pl.get("body")
new_body = str(getattr(parsed, "body", "") or "").strip() new_body = str(getattr(parsed, "body", "") or "").strip()
old_body = str(old_payload.get("body", "")).strip() if old_payload else "" old_body = str(old_payload.get("body", "")).strip() if old_payload else ""
logger.info(f" -> Body-Länge: new={len(new_body)}, old={len(old_body)}") logger.debug(f" -> Body-Länge: new={len(new_body)}, old={len(old_body)}")
if len(new_body) != len(old_body): if len(new_body) != len(old_body):
logger.warning(f" -> ⚠️ Body-Länge unterschiedlich! Mögliche Ursache: Parsing-Unterschiede") logger.debug(f" -> ⚠️ Body-Länge unterschiedlich! Mögliche Ursache: Parsing-Unterschiede")
# Verwende parsed.frontmatter statt note_pl.get("frontmatter") # Verwende parsed.frontmatter statt note_pl.get("frontmatter")
new_fm = getattr(parsed, "frontmatter", {}) or {} new_fm = getattr(parsed, "frontmatter", {}) or {}
old_fm = old_payload.get("frontmatter", {}) if old_payload else {} old_fm = old_payload.get("frontmatter", {}) if old_payload else {}
logger.info(f" -> Frontmatter-Keys: new={sorted(new_fm.keys())}, old={sorted(old_fm.keys())}") logger.debug(f" -> Frontmatter-Keys: new={sorted(new_fm.keys())}, old={sorted(old_fm.keys())}")
# Prüfe relevante Frontmatter-Felder # Prüfe relevante Frontmatter-Felder
relevant_keys = ["title", "type", "status", "tags", "chunking_profile", "chunk_profile", "retriever_weight", "split_level", "strict_heading_split"] relevant_keys = ["title", "type", "status", "tags", "chunking_profile", "chunk_profile", "retriever_weight", "split_level", "strict_heading_split"]
for key in relevant_keys: for key in relevant_keys:
new_val = new_fm.get(key) if isinstance(new_fm, dict) else getattr(new_fm, key, None) new_val = new_fm.get(key) if isinstance(new_fm, dict) else getattr(new_fm, key, None)
old_val = old_fm.get(key) if isinstance(old_fm, dict) else None old_val = old_fm.get(key) if isinstance(old_fm, dict) else None
if new_val != old_val: if new_val != old_val:
logger.warning(f" -> ⚠️ Frontmatter '{key}' unterschiedlich: new={new_val}, old={old_val}") logger.debug(f" -> ⚠️ Frontmatter '{key}' unterschiedlich: new={new_val}, old={old_val}")
else: else:
# WP-24c v4.5.9: Wenn Hash fehlt, als geändert behandeln (Sicherheit) # WP-24c v4.5.10: Wenn Hash fehlt, als geändert behandeln (Sicherheit)
logger.warning(f"⚠️ [CHANGE-DETECTION] Hash fehlt für '{note_id}': new_h={bool(new_h)}, old_h={bool(old_h)}") logger.debug(f"⚠️ [CHANGE-DETECTION] Hash fehlt für '{note_id}': new_h={bool(new_h)}, old_h={bool(old_h)}")
logger.info(f" -> Grund: Hash wird als 'geändert' behandelt, da Hash-Werte fehlen") logger.debug(f" -> Grund: Hash wird als 'geändert' behandelt, da Hash-Werte fehlen")
else: else:
if force_replace: if force_replace:
logger.info(f"🔍 [CHANGE-DETECTION] '{note_id}': force_replace=True -> überspringe Hash-Check") logger.debug(f"🔍 [CHANGE-DETECTION] '{note_id}': force_replace=True -> überspringe Hash-Check")
elif not old_payload: elif not old_payload:
logger.warning(f"🔍 [CHANGE-DETECTION] '{note_id}': ⚠️ Keine alte Payload gefunden -> erste Verarbeitung oder gelöscht") logger.debug(f"🔍 [CHANGE-DETECTION] '{note_id}': ⚠️ Keine alte Payload gefunden -> erste Verarbeitung oder gelöscht")
# WP-24c v4.5.9: Strikte Logik - überspringe komplett wenn Hash identisch # WP-24c v4.5.9: Strikte Logik - überspringe komplett wenn Hash identisch
# WICHTIG: Artifact-Check NACH Hash-Check, da purge_before die Artefakte löschen kann # WICHTIG: Artifact-Check NACH Hash-Check, da purge_before die Artefakte löschen kann
@ -357,12 +369,12 @@ class IngestionService:
logger.info(f"⏭️ [SKIP] '{note_id}' unverändert (Hash identisch - überspringe komplett, auch wenn Artefakte fehlen)") logger.info(f"⏭️ [SKIP] '{note_id}' unverändert (Hash identisch - überspringe komplett, auch wenn Artefakte fehlen)")
return {**result, "status": "unchanged", "note_id": note_id, "reason": "hash_identical"} return {**result, "status": "unchanged", "note_id": note_id, "reason": "hash_identical"}
elif not force_replace and old_payload and not hash_match: elif not force_replace and old_payload and not hash_match:
# WP-24c v4.5.9-DEBUG: Hash geändert - erlaube Verarbeitung # WP-24c v4.5.10: Hash geändert - erlaube Verarbeitung (DEBUG-Level)
logger.info(f"🔍 [CHANGE-DETECTION] '{note_id}': Hash geändert -> erlaube Verarbeitung") logger.debug(f"🔍 [CHANGE-DETECTION] '{note_id}': Hash geändert -> erlaube Verarbeitung")
# WP-24c v4.5.9: Hash geändert oder keine alte Payload - prüfe Artefakte für normale Verarbeitung # WP-24c v4.5.10: Hash geändert oder keine alte Payload - prüfe Artefakte für normale Verarbeitung
c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id) c_miss, e_miss = artifacts_missing(self.client, self.prefix, note_id)
logger.info(f"🔍 [CHANGE-DETECTION] '{note_id}': Artifact-Check: c_miss={c_miss}, e_miss={e_miss}") logger.debug(f"🔍 [CHANGE-DETECTION] '{note_id}': Artifact-Check: c_miss={c_miss}, e_miss={e_miss}")
if not apply: if not apply:
return {**result, "status": "dry-run", "changed": True, "note_id": note_id} return {**result, "status": "dry-run", "changed": True, "note_id": note_id}