""" Shared Gitea REST helpers (stdlib). Used by gitea_api.py CLI and mcp_server_gitea.py. """ from __future__ import annotations import json import os import urllib.error import urllib.request from pathlib import Path from typing import Any def load_dotenv(repo_root: Path) -> None: env_path = repo_root / ".env" if not env_path.is_file(): return for line in env_path.read_text(encoding="utf-8", errors="replace").splitlines(): line = line.strip() if not line or line.startswith("#"): continue if "=" not in line: continue k, _, v = line.partition("=") k, v = k.strip(), v.strip().strip('"').strip("'") if k and k not in os.environ: os.environ[k] = v def repo_root() -> Path: return Path(__file__).resolve().parents[2] def get_config() -> tuple[str, str, str, str]: base = os.getenv("GITEA_BASE_URL", "").rstrip("/") token = os.getenv("GITEA_TOKEN", "") owner = os.getenv("GITEA_OWNER", "") reponame = os.getenv("GITEA_REPO", "") return base, token, owner, reponame def require_config() -> tuple[str, str, str, str]: base, token, owner, reponame = get_config() missing = [n for n, v in ( ("GITEA_BASE_URL", base), ("GITEA_TOKEN", token), ("GITEA_OWNER", owner), ("GITEA_REPO", reponame), ) if not v] if missing: raise RuntimeError( "Fehlende Umgebungsvariablen: " + ", ".join(missing) + " — setze sie in .env im Repo-Root oder in der MCP-env." ) return base, token, owner, reponame def request_json( method: str, url: str, token: str, data: dict | None = None, ) -> tuple[int, Any]: body = None if data is None else json.dumps(data).encode("utf-8") req = urllib.request.Request(url, data=body, method=method) req.add_header("Authorization", f"token {token}") req.add_header("Accept", "application/json") if body is not None: req.add_header("Content-Type", "application/json") try: with urllib.request.urlopen(req, timeout=120) as resp: raw = resp.read().decode("utf-8", errors="replace") status = resp.status except urllib.error.HTTPError as e: raw = e.read().decode("utf-8", errors="replace") try: return e.code, json.loads(raw) if raw else {} except json.JSONDecodeError: return e.code, {"message": raw or str(e)} if not raw: return status, {} try: return status, json.loads(raw) except json.JSONDecodeError: return status, raw def issues_list_page( base: str, token: str, owner: str, repo: str, *, state: str = "open", page: int = 1, limit: int = 50, ) -> tuple[int, list]: if state == "all": open_st, open_i = issues_list_page( base, token, owner, repo, state="open", page=page, limit=limit ) closed_st, closed_i = issues_list_page( base, token, owner, repo, state="closed", page=page, limit=limit ) merged = (open_i or []) + (closed_i or []) st = max(open_st, closed_st) if open_st >= 400 or closed_st >= 400 else 200 return st, merged[:limit] q = f"?state={state}&page={page}&limit={limit}" url = f"{base}/api/v1/repos/{owner}/{repo}/issues{q}" status, payload = request_json("GET", url, token) if status >= 400: return status, [] if not isinstance(payload, list): return status, [] return status, payload def issues_list_all( base: str, token: str, owner: str, repo: str, *, state: str = "open", limit: int = 50, ) -> list[dict]: if state == "all": o = issues_list_all( base, token, owner, repo, state="open", limit=limit ) c = issues_list_all( base, token, owner, repo, state="closed", limit=limit ) return o + c out: list[dict] = [] page = 1 while True: _, batch = issues_list_page( base, token, owner, repo, state=state, page=page, limit=limit ) if not batch: break out.extend(batch) if len(batch) < limit: break page += 1 return out def issues_get( base: str, token: str, owner: str, repo: str, number: int ) -> tuple[int, Any]: url = f"{base}/api/v1/repos/{owner}/{repo}/issues/{number}" return request_json("GET", url, token) def issues_create( base: str, token: str, owner: str, repo: str, *, title: str, body: str = "", labels: list[str] | None = None, ) -> tuple[int, Any]: url = f"{base}/api/v1/repos/{owner}/{repo}/issues" return request_json( "POST", url, token, {"title": title, "body": body, "labels": labels or []}, ) def issues_comment( base: str, token: str, owner: str, repo: str, number: int, body: str, ) -> tuple[int, Any]: url = f"{base}/api/v1/repos/{owner}/{repo}/issues/{number}/comments" return request_json("POST", url, token, {"body": body}) def issues_patch( base: str, token: str, owner: str, repo: str, number: int, fields: dict, ) -> tuple[int, Any]: """Gitea: PATCH issue (state, title, body, …).""" url = f"{base}/api/v1/repos/{owner}/{repo}/issues/{number}" return request_json("PATCH", url, token, fields) def repo_file_content( base: str, token: str, owner: str, repo: str, path: str, ref: str = "", ) -> tuple[int, Any]: from urllib.parse import quote from base64 import b64decode p = quote(path, safe="/") r = f"?ref={ref}" if ref else "" url = f"{base}/api/v1/repos/{owner}/{repo}/contents/{p}{r}" st, payload = request_json("GET", url, token) if st >= 400: return st, payload if isinstance(payload, dict) and payload.get("type") == "file" and payload.get( "content" ): try: text = b64decode(payload["content"]).decode("utf-8", errors="replace") return st, {"path": path, "encoding": "text", "content": text} except Exception: return st, payload return st, payload