Notizen overvride von chunk_profile und retriever_weight in der Notiz
This commit is contained in:
parent
c8cdf218f2
commit
a4272c17a9
|
|
@ -1,7 +1,9 @@
|
||||||
"""
|
"""
|
||||||
FILE: app/core/ingestion.py
|
FILE: app/core/ingestion.py
|
||||||
DESCRIPTION: Haupt-Ingestion-Logik. Liest Markdown, steuert Change-Detection (via ENV) und schreibt in Qdrant.
|
DESCRIPTION: Haupt-Ingestion-Logik.
|
||||||
VERSION: 2.6.0 (Feat: Flexible Change Detection & Full Logic Restoration)
|
FIX: Korrekte Priorisierung von Frontmatter für chunk_profile und retriever_weight.
|
||||||
|
Lade Chunk-Config basierend auf dem effektiven Profil, nicht nur dem Notiz-Typ.
|
||||||
|
VERSION: 2.7.0 (Fix: Frontmatter Overrides & Config Loading)
|
||||||
STATUS: Active
|
STATUS: Active
|
||||||
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.core.derive_edges, app.core.qdrant*, app.services.embeddings_client
|
DEPENDENCIES: app.core.parser, app.core.note_payload, app.core.chunker, app.core.derive_edges, app.core.qdrant*, app.services.embeddings_client
|
||||||
EXTERNAL_CONFIG: config/types.yaml
|
EXTERNAL_CONFIG: config/types.yaml
|
||||||
|
|
@ -54,18 +56,42 @@ def resolve_note_type(requested: Optional[str], reg: dict) -> str:
|
||||||
if requested and requested in types: return requested
|
if requested and requested in types: return requested
|
||||||
return "concept"
|
return "concept"
|
||||||
|
|
||||||
def effective_chunk_profile(note_type: str, reg: dict) -> str:
|
def effective_chunk_profile_name(fm: dict, note_type: str, reg: dict) -> str:
|
||||||
|
"""
|
||||||
|
Ermittelt den Namen des Chunk-Profils.
|
||||||
|
Prio: 1. Frontmatter -> 2. Type-Config -> 3. Default
|
||||||
|
"""
|
||||||
|
# 1. Frontmatter Override
|
||||||
|
override = fm.get("chunking_profile") or fm.get("chunk_profile")
|
||||||
|
if override and isinstance(override, str):
|
||||||
|
return override
|
||||||
|
|
||||||
|
# 2. Type Config
|
||||||
t_cfg = reg.get("types", {}).get(note_type, {})
|
t_cfg = reg.get("types", {}).get(note_type, {})
|
||||||
# FIX: Konsistenz mit note_payload.py - Prüfe erst den korrekten Key
|
|
||||||
if t_cfg:
|
if t_cfg:
|
||||||
if t_cfg.get("chunking_profile"): return t_cfg.get("chunking_profile")
|
cp = t_cfg.get("chunking_profile") or t_cfg.get("chunk_profile")
|
||||||
if t_cfg.get("chunk_profile"): return t_cfg.get("chunk_profile") # Legacy
|
if cp: return cp
|
||||||
|
|
||||||
|
# 3. Global Default
|
||||||
return reg.get("defaults", {}).get("chunking_profile", "sliding_standard")
|
return reg.get("defaults", {}).get("chunking_profile", "sliding_standard")
|
||||||
|
|
||||||
def effective_retriever_weight(note_type: str, reg: dict) -> float:
|
def effective_retriever_weight(fm: dict, note_type: str, reg: dict) -> float:
|
||||||
|
"""
|
||||||
|
Ermittelt das Retriever Weight.
|
||||||
|
Prio: 1. Frontmatter -> 2. Type-Config -> 3. Default
|
||||||
|
"""
|
||||||
|
# 1. Frontmatter Override
|
||||||
|
override = fm.get("retriever_weight")
|
||||||
|
if override is not None:
|
||||||
|
try: return float(override)
|
||||||
|
except: pass
|
||||||
|
|
||||||
|
# 2. Type Config
|
||||||
t_cfg = reg.get("types", {}).get(note_type, {})
|
t_cfg = reg.get("types", {}).get(note_type, {})
|
||||||
if t_cfg and "retriever_weight" in t_cfg:
|
if t_cfg and "retriever_weight" in t_cfg:
|
||||||
return float(t_cfg["retriever_weight"])
|
return float(t_cfg["retriever_weight"])
|
||||||
|
|
||||||
|
# 3. Global Default
|
||||||
return float(reg.get("defaults", {}).get("retriever_weight", 1.0))
|
return float(reg.get("defaults", {}).get("retriever_weight", 1.0))
|
||||||
|
|
||||||
|
|
||||||
|
|
@ -90,6 +116,23 @@ class IngestionService:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.warning(f"DB init warning: {e}")
|
logger.warning(f"DB init warning: {e}")
|
||||||
|
|
||||||
|
def _get_chunk_config_by_profile(self, profile_name: str, note_type: str) -> Dict[str, Any]:
|
||||||
|
"""
|
||||||
|
Lädt die konkrete Config (target, max, overlap) für einen Profilnamen.
|
||||||
|
"""
|
||||||
|
# Suche direkt in den definierten Profilen der Registry
|
||||||
|
profiles = self.registry.get("chunking_profiles", {})
|
||||||
|
if profile_name in profiles:
|
||||||
|
cfg = profiles[profile_name].copy()
|
||||||
|
# Tuple-Fix für Overlap (wie in chunker.py)
|
||||||
|
if "overlap" in cfg and isinstance(cfg["overlap"], list):
|
||||||
|
cfg["overlap"] = tuple(cfg["overlap"])
|
||||||
|
return cfg
|
||||||
|
|
||||||
|
# Fallback: Wenn Profilname unbekannt, nutze Standard für den Typ via Chunker
|
||||||
|
logger.warning(f"Profile '{profile_name}' not found in registry. Falling back to type defaults.")
|
||||||
|
return get_chunk_config(note_type)
|
||||||
|
|
||||||
async def process_file(
|
async def process_file(
|
||||||
self,
|
self,
|
||||||
file_path: str,
|
file_path: str,
|
||||||
|
|
@ -98,46 +141,37 @@ class IngestionService:
|
||||||
apply: bool = False,
|
apply: bool = False,
|
||||||
purge_before: bool = False,
|
purge_before: bool = False,
|
||||||
note_scope_refs: bool = False,
|
note_scope_refs: bool = False,
|
||||||
# Hash-Mode wird nicht mehr übergeben, sondern via ENV gesteuert.
|
|
||||||
# Source und Normalize bleiben konfigurierbar.
|
|
||||||
hash_source: str = "parsed",
|
hash_source: str = "parsed",
|
||||||
hash_normalize: str = "canonical"
|
hash_normalize: str = "canonical"
|
||||||
) -> Dict[str, Any]:
|
) -> Dict[str, Any]:
|
||||||
"""
|
|
||||||
Verarbeitet eine einzelne Datei (ASYNC).
|
result = {"path": file_path, "status": "skipped", "changed": False, "error": None}
|
||||||
Inklusive Change Detection (Hash-Check) gegen Qdrant.
|
|
||||||
"""
|
|
||||||
result = {
|
|
||||||
"path": file_path,
|
|
||||||
"status": "skipped",
|
|
||||||
"changed": False,
|
|
||||||
"error": None
|
|
||||||
}
|
|
||||||
|
|
||||||
# 1. Parse & Frontmatter Validation
|
# 1. Parse & Frontmatter Validation
|
||||||
try:
|
try:
|
||||||
parsed = read_markdown(file_path)
|
parsed = read_markdown(file_path)
|
||||||
if not parsed:
|
if not parsed: return {**result, "error": "Empty or unreadable file"}
|
||||||
return {**result, "error": "Empty or unreadable file"}
|
|
||||||
|
|
||||||
fm = normalize_frontmatter(parsed.frontmatter)
|
fm = normalize_frontmatter(parsed.frontmatter)
|
||||||
validate_required_frontmatter(fm)
|
validate_required_frontmatter(fm)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Validation failed for {file_path}: {e}")
|
logger.error(f"Validation failed for {file_path}: {e}")
|
||||||
return {**result, "error": f"Validation failed: {str(e)}"}
|
return {**result, "error": f"Validation failed: {str(e)}"}
|
||||||
|
|
||||||
# 2. Type & Config Resolution
|
# 2. Type & Config Resolution (FIXED)
|
||||||
|
# Wir ermitteln erst den Typ
|
||||||
note_type = resolve_note_type(fm.get("type"), self.registry)
|
note_type = resolve_note_type(fm.get("type"), self.registry)
|
||||||
fm["type"] = note_type
|
fm["type"] = note_type
|
||||||
fm["chunk_profile"] = effective_chunk_profile(note_type, self.registry)
|
|
||||||
|
|
||||||
weight = fm.get("retriever_weight")
|
# Dann ermitteln wir die effektiven Werte unter Berücksichtigung des Frontmatters!
|
||||||
if weight is None:
|
# Hier lag der Fehler: Vorher wurde einfach überschrieben.
|
||||||
weight = effective_retriever_weight(note_type, self.registry)
|
effective_profile = effective_chunk_profile_name(fm, note_type, self.registry)
|
||||||
fm["retriever_weight"] = float(weight)
|
effective_weight = effective_retriever_weight(fm, note_type, self.registry)
|
||||||
|
|
||||||
|
# Wir schreiben die effektiven Werte zurück ins FM, damit note_payload sie sicher hat
|
||||||
|
fm["chunk_profile"] = effective_profile
|
||||||
|
fm["retriever_weight"] = effective_weight
|
||||||
|
|
||||||
# 3. Build Note Payload
|
# 3. Build Note Payload
|
||||||
# Ruft make_note_payload auf, welches JETZT alle Hash-Varianten berechnet.
|
|
||||||
try:
|
try:
|
||||||
note_pl = make_note_payload(
|
note_pl = make_note_payload(
|
||||||
parsed,
|
parsed,
|
||||||
|
|
@ -146,42 +180,33 @@ class IngestionService:
|
||||||
hash_source=hash_source,
|
hash_source=hash_source,
|
||||||
file_path=file_path
|
file_path=file_path
|
||||||
)
|
)
|
||||||
if not note_pl.get("fulltext"):
|
# Text Body Fallback
|
||||||
note_pl["fulltext"] = getattr(parsed, "body", "") or ""
|
if not note_pl.get("fulltext"): note_pl["fulltext"] = getattr(parsed, "body", "") or ""
|
||||||
note_pl["retriever_weight"] = fm["retriever_weight"]
|
|
||||||
|
# Update Payload with explicit effective values (Sicherheit)
|
||||||
|
note_pl["retriever_weight"] = effective_weight
|
||||||
|
note_pl["chunk_profile"] = effective_profile
|
||||||
|
|
||||||
note_id = note_pl["note_id"]
|
note_id = note_pl["note_id"]
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Payload build failed: {e}")
|
logger.error(f"Payload build failed: {e}")
|
||||||
return {**result, "error": f"Payload build failed: {str(e)}"}
|
return {**result, "error": f"Payload build failed: {str(e)}"}
|
||||||
|
|
||||||
# 4. Change Detection (Updated Logic with ENV Strategy)
|
# 4. Change Detection
|
||||||
old_payload = None
|
old_payload = None
|
||||||
if not force_replace:
|
if not force_replace:
|
||||||
old_payload = self._fetch_note_payload(note_id)
|
old_payload = self._fetch_note_payload(note_id)
|
||||||
|
|
||||||
has_old = old_payload is not None
|
has_old = old_payload is not None
|
||||||
|
|
||||||
# Wir bauen den Key basierend auf der ENV-Einstellung
|
|
||||||
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
|
check_key = f"{self.active_hash_mode}:{hash_source}:{hash_normalize}"
|
||||||
|
|
||||||
old_hashes = (old_payload or {}).get("hashes")
|
old_hashes = (old_payload or {}).get("hashes")
|
||||||
|
if isinstance(old_hashes, dict): old_hash = old_hashes.get(check_key)
|
||||||
# Fallback Logik für alte Daten
|
elif isinstance(old_hashes, str) and self.active_hash_mode == "body": old_hash = old_hashes
|
||||||
if isinstance(old_hashes, dict):
|
else: old_hash = None
|
||||||
old_hash = old_hashes.get(check_key)
|
|
||||||
elif isinstance(old_hashes, str):
|
|
||||||
# Sehr alte Legacy Daten hatten Hash direkt als String (meist Body)
|
|
||||||
# Wenn wir im Body-Modus sind, ist das okay, sonst Force Update
|
|
||||||
old_hash = old_hashes if self.active_hash_mode == "body" else None
|
|
||||||
else:
|
|
||||||
old_hash = None
|
|
||||||
|
|
||||||
new_hash = note_pl.get("hashes", {}).get(check_key)
|
new_hash = note_pl.get("hashes", {}).get(check_key)
|
||||||
|
|
||||||
# Vergleich
|
|
||||||
hash_changed = (old_hash != new_hash)
|
hash_changed = (old_hash != new_hash)
|
||||||
|
|
||||||
chunks_missing, edges_missing = self._artifacts_missing(note_id)
|
chunks_missing, edges_missing = self._artifacts_missing(note_id)
|
||||||
|
|
||||||
should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing
|
should_write = force_replace or (not has_old) or hash_changed or chunks_missing or edges_missing
|
||||||
|
|
@ -192,17 +217,19 @@ class IngestionService:
|
||||||
if not apply:
|
if not apply:
|
||||||
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
|
return {**result, "status": "dry-run", "changed": True, "note_id": note_id}
|
||||||
|
|
||||||
# 5. Processing (Chunking, Embedding, Edges)
|
# 5. Processing
|
||||||
try:
|
try:
|
||||||
body_text = getattr(parsed, "body", "") or ""
|
body_text = getattr(parsed, "body", "") or ""
|
||||||
|
|
||||||
# --- Config Loading (Clean) ---
|
# FIX: Wir laden jetzt die Config für das SPEZIFISCHE Profil
|
||||||
chunk_config = get_chunk_config(note_type)
|
# (z.B. wenn User "sliding_short" wollte, laden wir dessen Params)
|
||||||
|
chunk_config = self._get_chunk_config_by_profile(effective_profile, note_type)
|
||||||
|
|
||||||
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config)
|
chunks = await assemble_chunks(fm["id"], body_text, fm["type"], config=chunk_config)
|
||||||
|
|
||||||
|
# chunk_payloads werden mit den aktualisierten FM-Werten gebaut
|
||||||
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
|
chunk_pls = make_chunk_payloads(fm, note_pl["path"], chunks, note_text=body_text)
|
||||||
|
|
||||||
# Embedding
|
|
||||||
vecs = []
|
vecs = []
|
||||||
if chunk_pls:
|
if chunk_pls:
|
||||||
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
|
texts = [c.get("window") or c.get("text") or "" for c in chunk_pls]
|
||||||
|
|
@ -217,7 +244,6 @@ class IngestionService:
|
||||||
logger.error(f"Embedding failed: {e}")
|
logger.error(f"Embedding failed: {e}")
|
||||||
raise RuntimeError(f"Embedding failed: {e}")
|
raise RuntimeError(f"Embedding failed: {e}")
|
||||||
|
|
||||||
# Edges
|
|
||||||
try:
|
try:
|
||||||
edges = build_edges_for_note(
|
edges = build_edges_for_note(
|
||||||
note_id,
|
note_id,
|
||||||
|
|
@ -232,7 +258,7 @@ class IngestionService:
|
||||||
logger.error(f"Processing failed: {e}", exc_info=True)
|
logger.error(f"Processing failed: {e}", exc_info=True)
|
||||||
return {**result, "error": f"Processing failed: {str(e)}"}
|
return {**result, "error": f"Processing failed: {str(e)}"}
|
||||||
|
|
||||||
# 6. Upsert Action
|
# 6. Upsert
|
||||||
try:
|
try:
|
||||||
if purge_before and has_old:
|
if purge_before and has_old:
|
||||||
self._purge_artifacts(note_id)
|
self._purge_artifacts(note_id)
|
||||||
|
|
@ -259,9 +285,8 @@ class IngestionService:
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
logger.error(f"Upsert failed: {e}", exc_info=True)
|
logger.error(f"Upsert failed: {e}", exc_info=True)
|
||||||
return {**result, "error": f"DB Upsert failed: {e}"}
|
return {**result, "error": f"DB Upsert failed: {e}"}
|
||||||
|
|
||||||
# --- Qdrant Helper ---
|
# ... (Restliche Methoden wie _fetch_note_payload bleiben unverändert) ...
|
||||||
|
|
||||||
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
|
def _fetch_note_payload(self, note_id: str) -> Optional[dict]:
|
||||||
from qdrant_client.http import models as rest
|
from qdrant_client.http import models as rest
|
||||||
col = f"{self.prefix}_notes"
|
col = f"{self.prefix}_notes"
|
||||||
|
|
@ -291,38 +316,17 @@ class IngestionService:
|
||||||
self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector)
|
self.client.delete(collection_name=f"{self.prefix}_{suffix}", points_selector=selector)
|
||||||
except Exception: pass
|
except Exception: pass
|
||||||
|
|
||||||
async def create_from_text(
|
async def create_from_text(self, markdown_content: str, filename: str, vault_root: str, folder: str = "00_Inbox") -> Dict[str, Any]:
|
||||||
self,
|
target_dir = os.path.join(vault_root, folder)
|
||||||
markdown_content: str,
|
os.makedirs(target_dir, exist_ok=True)
|
||||||
filename: str,
|
file_path = os.path.join(target_dir, filename)
|
||||||
vault_root: str,
|
try:
|
||||||
folder: str = "00_Inbox"
|
with open(file_path, "w", encoding="utf-8") as f:
|
||||||
) -> Dict[str, Any]:
|
f.write(markdown_content)
|
||||||
"""
|
f.flush()
|
||||||
WP-11 Persistence API Entrypoint.
|
os.fsync(f.fileno())
|
||||||
"""
|
await asyncio.sleep(0.1)
|
||||||
target_dir = os.path.join(vault_root, folder)
|
logger.info(f"Written file to {file_path}")
|
||||||
os.makedirs(target_dir, exist_ok=True)
|
except Exception as e:
|
||||||
|
return {"status": "error", "error": f"Disk write failed: {str(e)}"}
|
||||||
file_path = os.path.join(target_dir, filename)
|
return await self.process_file(file_path=file_path, vault_root=vault_root, apply=True, force_replace=True, purge_before=True)
|
||||||
|
|
||||||
try:
|
|
||||||
# Robust Write: Ensure Flush & Sync
|
|
||||||
with open(file_path, "w", encoding="utf-8") as f:
|
|
||||||
f.write(markdown_content)
|
|
||||||
f.flush()
|
|
||||||
os.fsync(f.fileno())
|
|
||||||
|
|
||||||
await asyncio.sleep(0.1)
|
|
||||||
|
|
||||||
logger.info(f"Written file to {file_path}")
|
|
||||||
except Exception as e:
|
|
||||||
return {"status": "error", "error": f"Disk write failed: {str(e)}"}
|
|
||||||
|
|
||||||
return await self.process_file(
|
|
||||||
file_path=file_path,
|
|
||||||
vault_root=vault_root,
|
|
||||||
apply=True,
|
|
||||||
force_replace=True,
|
|
||||||
purge_before=True
|
|
||||||
)
|
|
||||||
Loading…
Reference in New Issue
Block a user