All checks were successful
Deploy mindnet to llm-node / deploy (push) Successful in 3s
221 lines
6.4 KiB
Python
221 lines
6.4 KiB
Python
# app/core/chunk_payload.py
|
|
# Line count: 214
|
|
|
|
from __future__ import annotations
|
|
|
|
from typing import Any, Dict, Iterable, List, Optional, Tuple, Union
|
|
|
|
|
|
def _get(obj: Any, key: str, default: Any = None) -> Any:
|
|
if obj is None:
|
|
return default
|
|
if hasattr(obj, key):
|
|
try:
|
|
val = getattr(obj, key)
|
|
return val if val is not None else default
|
|
except Exception:
|
|
pass
|
|
if isinstance(obj, dict):
|
|
if key in obj:
|
|
val = obj.get(key, default)
|
|
return val if val is not None else default
|
|
return default
|
|
|
|
|
|
def _get_frontmatter(note: Any) -> Dict[str, Any]:
|
|
fm = _get(note, "frontmatter", None)
|
|
if isinstance(fm, dict):
|
|
return fm
|
|
meta = _get(note, "meta", None)
|
|
if isinstance(meta, dict) and isinstance(meta.get("frontmatter"), dict):
|
|
return meta["frontmatter"]
|
|
return {}
|
|
|
|
|
|
def _get_from_frontmatter(fm: Dict[str, Any], key: str, default: Any = None) -> Any:
|
|
if not isinstance(fm, dict):
|
|
return default
|
|
if key in fm:
|
|
val = fm.get(key, default)
|
|
return val if val is not None else default
|
|
return default
|
|
|
|
|
|
def _coerce_tags(val: Any) -> List[str]:
|
|
if val is None:
|
|
return []
|
|
if isinstance(val, list):
|
|
return [str(x) for x in val]
|
|
if isinstance(val, str):
|
|
parts = [t.strip() for t in val.split(",")]
|
|
return [p for p in parts if p]
|
|
return []
|
|
|
|
|
|
def _resolve_retriever_weight(
|
|
fm: Dict[str, Any],
|
|
explicit: Optional[float],
|
|
) -> Optional[float]:
|
|
if explicit is not None:
|
|
return explicit
|
|
val = _get_from_frontmatter(fm, "retriever_weight", None)
|
|
if isinstance(val, (int, float)):
|
|
return float(val)
|
|
retr = fm.get("retriever")
|
|
if isinstance(retr, dict):
|
|
v = retr.get("weight")
|
|
if isinstance(v, (int, float)):
|
|
return float(v)
|
|
return None
|
|
|
|
|
|
def _resolve_note_fields(note: Any) -> Dict[str, Any]:
|
|
fm = _get_frontmatter(note)
|
|
|
|
note_id = _get_from_frontmatter(fm, "id", None)
|
|
if note_id is None:
|
|
note_id = _get(note, "note_id", None)
|
|
if note_id is None:
|
|
note_id = _get(note, "id", None)
|
|
|
|
title = _get_from_frontmatter(fm, "title", None)
|
|
if title is None:
|
|
title = _get(note, "title", None)
|
|
|
|
ntype = _get_from_frontmatter(fm, "type", None)
|
|
if ntype is None:
|
|
ntype = _get(note, "type", None)
|
|
|
|
tags = _get_from_frontmatter(fm, "tags", None)
|
|
if tags is None:
|
|
tags = _get(note, "tags", None)
|
|
tags = _coerce_tags(tags)
|
|
|
|
path = _get_from_frontmatter(fm, "path", None)
|
|
if path is None:
|
|
path = _get(note, "path", None)
|
|
if path is None:
|
|
path = _get(note, "source", None)
|
|
if path is None:
|
|
path = _get(note, "filepath", None)
|
|
|
|
return {
|
|
"note_id": note_id,
|
|
"title": title,
|
|
"type": ntype,
|
|
"tags": tags,
|
|
"path": path,
|
|
"frontmatter": fm,
|
|
}
|
|
|
|
|
|
def _extract_chunk_text_and_index(
|
|
chunk: Any,
|
|
fallback_index: int,
|
|
) -> Tuple[str, int]:
|
|
"""
|
|
Akzeptiert verschiedene Chunk-Formate:
|
|
- str (reiner Text)
|
|
- dict mit keys: text | window | body | content
|
|
- Objekt mit Attributen: text | window | body | content
|
|
- (text, idx) Tuple
|
|
"""
|
|
# Tuple (text, idx)
|
|
if isinstance(chunk, tuple) and len(chunk) == 2 and isinstance(chunk[0], str):
|
|
txt, idx = chunk
|
|
try:
|
|
idx_int = int(idx)
|
|
except Exception:
|
|
idx_int = fallback_index
|
|
return txt, idx_int
|
|
|
|
# String
|
|
if isinstance(chunk, str):
|
|
return chunk, fallback_index
|
|
|
|
# Dict
|
|
if isinstance(chunk, dict):
|
|
txt = (
|
|
chunk.get("text")
|
|
or chunk.get("window")
|
|
or chunk.get("body")
|
|
or chunk.get("content")
|
|
)
|
|
if isinstance(txt, str):
|
|
idx = chunk.get("index")
|
|
try:
|
|
idx_int = int(idx) if idx is not None else fallback_index
|
|
except Exception:
|
|
idx_int = fallback_index
|
|
return txt, idx_int
|
|
|
|
# Objekt mit Attributen
|
|
for attr in ("text", "window", "body", "content"):
|
|
if hasattr(chunk, attr):
|
|
try:
|
|
txt = getattr(chunk, attr)
|
|
except Exception:
|
|
txt = None
|
|
if isinstance(txt, str):
|
|
# Optionale "index"-Quelle
|
|
idx = None
|
|
if hasattr(chunk, "index"):
|
|
try:
|
|
idx = getattr(chunk, "index")
|
|
except Exception:
|
|
idx = None
|
|
try:
|
|
idx_int = int(idx) if idx is not None else fallback_index
|
|
except Exception:
|
|
idx_int = fallback_index
|
|
return txt, idx_int
|
|
|
|
# Wenn nichts passt -> klarer Fehler
|
|
raise ValueError("Unsupported chunk format: cannot extract text/index")
|
|
|
|
|
|
def make_chunk_payloads(
|
|
note: Any,
|
|
chunks: Iterable[Any],
|
|
*,
|
|
retriever_weight: Optional[float] = None,
|
|
base_payload: Optional[Dict[str, Any]] = None,
|
|
) -> List[Dict[str, Any]]:
|
|
"""
|
|
Erzeugt Qdrant-Payloads für Chunk-Punkte.
|
|
- Kopiert Note-Metadaten (note_id/title/type/tags/path)
|
|
- Schreibt text + chunk_index je Chunk
|
|
- Setzt retriever_weight, wenn vorhanden/angegeben
|
|
"""
|
|
out: List[Dict[str, Any]] = []
|
|
note_fields = _resolve_note_fields(note)
|
|
fm = note_fields["frontmatter"]
|
|
rw = _resolve_retriever_weight(fm, retriever_weight)
|
|
|
|
# Basisfelder, die jeder Chunk tragen soll
|
|
common: Dict[str, Any] = {}
|
|
if base_payload:
|
|
common.update({k: v for k, v in base_payload.items() if v is not None})
|
|
|
|
if note_fields.get("note_id") is not None:
|
|
common["note_id"] = note_fields["note_id"]
|
|
if note_fields.get("title") is not None:
|
|
common["title"] = note_fields["title"]
|
|
if note_fields.get("type") is not None:
|
|
common["type"] = note_fields["type"]
|
|
if note_fields.get("tags"):
|
|
common["tags"] = note_fields["tags"]
|
|
if note_fields.get("path") is not None:
|
|
common["path"] = note_fields["path"]
|
|
if rw is not None:
|
|
common["retriever_weight"] = rw
|
|
|
|
for i, ch in enumerate(chunks):
|
|
text, idx = _extract_chunk_text_and_index(ch, i)
|
|
payload = dict(common) # copy
|
|
payload["chunk_index"] = idx
|
|
payload["text"] = text
|
|
out.append(payload)
|
|
|
|
return out
|