mitai-jinkendo/scripts/gitea/gitea_lib.py
Lars dc87e7f3b8
All checks were successful
Deploy Development / deploy (push) Successful in 44s
Build Test / lint-backend (push) Successful in 0s
Build Test / build-frontend (push) Successful in 13s
cursor_Setup
2026-04-04 14:05:50 +02:00

227 lines
6.1 KiB
Python

"""
Shared Gitea REST helpers (stdlib). Used by gitea_api.py CLI and mcp_server_gitea.py.
"""
from __future__ import annotations
import json
import os
import urllib.error
import urllib.request
from pathlib import Path
from typing import Any
def load_dotenv(repo_root: Path) -> None:
    """Load ``KEY=VALUE`` pairs from ``<repo_root>/.env`` into ``os.environ``.

    The function is a silent no-op when the file does not exist. Blank lines,
    lines starting with ``#`` and lines without ``=`` are skipped. Values are
    stripped of surrounding whitespace and of leading/trailing single or
    double quotes. Variables that are already present in the process
    environment are never overwritten.

    Args:
        repo_root: Directory that is expected to contain the ``.env`` file.
    """
    env_path = repo_root / ".env"
    if not env_path.is_file():
        return
    # "utf-8-sig" drops a leading byte-order mark; with plain "utf-8" the BOM
    # would become part of the first variable name and that variable would
    # never be found under its real name.
    text = env_path.read_text(encoding="utf-8-sig", errors="replace")
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue
        key, _, value = line.partition("=")
        key = key.strip()
        # Accept shell-style "export KEY=VALUE" lines instead of creating a
        # variable literally named "export KEY".
        if key.startswith("export") and key[6:7].isspace():
            key = key[6:].strip()
        value = value.strip().strip('"').strip("'")
        # The real environment wins over the file.
        if key and key not in os.environ:
            os.environ[key] = value
def repo_root() -> Path:
    """Return the third ancestor directory of this module's resolved path.

    ``parents[0]`` is the directory holding this file, so ``parents[2]`` is
    two levels above that directory.
    """
    this_file = Path(__file__).resolve()
    ancestors = this_file.parents
    return ancestors[2]
def get_config() -> tuple[str, str, str, str]:
    """Read the Gitea connection settings from the process environment.

    Returns:
        ``(base_url, token, owner, repo)`` taken from ``GITEA_BASE_URL``,
        ``GITEA_TOKEN``, ``GITEA_OWNER`` and ``GITEA_REPO``. Unset variables
        yield an empty string; trailing slashes are removed from the base URL.
    """
    env = os.environ
    base_url = env.get("GITEA_BASE_URL", "")
    return (
        base_url.rstrip("/"),
        env.get("GITEA_TOKEN", ""),
        env.get("GITEA_OWNER", ""),
        env.get("GITEA_REPO", ""),
    )
def require_config() -> tuple[str, str, str, str]:
    """Return the Gitea settings, insisting that every one of them is set.

    Returns:
        ``(base_url, token, owner, repo)`` as produced by ``get_config()``.

    Raises:
        RuntimeError: If at least one of the four settings is empty; the
            message lists the names of the missing environment variables.
    """
    base, token, owner, reponame = get_config()
    settings = {
        "GITEA_BASE_URL": base,
        "GITEA_TOKEN": token,
        "GITEA_OWNER": owner,
        "GITEA_REPO": reponame,
    }
    missing = [name for name, value in settings.items() if not value]
    if not missing:
        return base, token, owner, reponame
    message = (
        "Fehlende Umgebungsvariablen: " + ", ".join(missing)
        + " — setze sie in .env im Repo-Root oder in der MCP-env."
    )
    raise RuntimeError(message)
def request_json(
    method: str,
    url: str,
    token: str,
    data: dict | None = None,
) -> tuple[int, Any]:
    """Send one HTTP request with token authentication and decode the reply.

    Args:
        method: HTTP verb handed to ``urllib.request.Request``.
        url: Full request URL.
        token: Sent as ``Authorization: token <token>``.
        data: Optional JSON-serialisable body; when given it is sent as
            UTF-8 JSON with a ``Content-Type: application/json`` header.

    Returns:
        ``(status, payload)``. ``payload`` is the decoded JSON, ``{}`` for an
        empty body, or the raw text when a successful response is not valid
        JSON. For an ``HTTPError`` the status is the error code and a
        non-JSON body is wrapped as ``{"message": <text>}``.

    Only ``urllib.error.HTTPError`` is handled here; any other exception
    raised while opening the URL propagates to the caller.
    """
    payload = json.dumps(data).encode("utf-8") if data is not None else None
    headers = {
        "Authorization": f"token {token}",
        "Accept": "application/json",
    }
    if payload is not None:
        headers["Content-Type"] = "application/json"
    req = urllib.request.Request(url, data=payload, headers=headers, method=method)

    try:
        with urllib.request.urlopen(req, timeout=120) as resp:
            text = resp.read().decode("utf-8", errors="replace")
            status = resp.status
    except urllib.error.HTTPError as err:
        # Error responses usually still carry a JSON body worth returning.
        text = err.read().decode("utf-8", errors="replace")
        if not text:
            return err.code, {}
        try:
            return err.code, json.loads(text)
        except json.JSONDecodeError:
            return err.code, {"message": text}

    if not text:
        return status, {}
    try:
        parsed = json.loads(text)
    except json.JSONDecodeError:
        # Successful but non-JSON reply: hand the text back unchanged.
        return status, text
    return status, parsed
def issues_list_page(
    base: str,
    token: str,
    owner: str,
    repo: str,
    *,
    state: str = "open",
    page: int = 1,
    limit: int = 50,
) -> tuple[int, list]:
    """Fetch a single page of issues of a repository.

    Args:
        base: Gitea base URL without a trailing slash.
        token: API token passed on to ``request_json``.
        owner: Repository owner.
        repo: Repository name.
        state: Issue state filter, forwarded verbatim as the ``state`` query
            parameter (``"open"``, ``"closed"`` or ``"all"``).
        page: 1-based page number.
        limit: Requested page size.

    Returns:
        ``(status, issues)``. ``issues`` is the decoded JSON list, or ``[]``
        when the status is >= 400 or the payload is not a list.

    ``state="all"`` is sent to the server as-is. Emulating it client-side by
    concatenating page N of "open" and page N of "closed" and truncating to
    ``limit`` silently dropped closed issues whenever the open page was full,
    and those issues could not be reached by requesting further pages.
    """
    from urllib.parse import urlencode

    # urlencode escapes the values, so an unexpected state string cannot
    # corrupt the query.
    query = urlencode({"state": state, "page": page, "limit": limit})
    url = f"{base}/api/v1/repos/{owner}/{repo}/issues?{query}"
    status, payload = request_json("GET", url, token)
    if status >= 400 or not isinstance(payload, list):
        return status, []
    return status, payload
def issues_list_all(
    base: str,
    token: str,
    owner: str,
    repo: str,
    *,
    state: str = "open",
    limit: int = 50,
) -> list[dict]:
    """Collect issues across all pages for the given state.

    For ``state="all"`` the open issues are collected first, followed by the
    closed ones. Paging stops at the first empty page or at the first page
    holding fewer than ``limit`` entries. The HTTP status of each page request
    is discarded, so a failing request simply ends the collection.

    Args:
        base: Gitea base URL without a trailing slash.
        token: API token.
        owner: Repository owner.
        repo: Repository name.
        state: ``"open"``, ``"closed"`` or ``"all"``.
        limit: Page size used for every request.

    Returns:
        The accumulated list of issue objects.
    """
    if state == "all":
        combined: list[dict] = []
        for single_state in ("open", "closed"):
            combined = combined + issues_list_all(
                base, token, owner, repo, state=single_state, limit=limit
            )
        return combined

    collected: list[dict] = []
    page_no = 1
    while True:
        _status, page_items = issues_list_page(
            base, token, owner, repo, state=state, page=page_no, limit=limit
        )
        if page_items:
            collected.extend(page_items)
        # An empty or short page means there is nothing further to fetch.
        if not page_items or len(page_items) < limit:
            break
        page_no += 1
    return collected
def issues_get(
    base: str, token: str, owner: str, repo: str, number: int
) -> tuple[int, Any]:
    """GET a single issue by number; returns ``request_json``'s result as-is."""
    repo_api = f"{base}/api/v1/repos/{owner}/{repo}"
    endpoint = f"{repo_api}/issues/{number}"
    return request_json("GET", endpoint, token)
def issues_create(
    base: str,
    token: str,
    owner: str,
    repo: str,
    *,
    title: str,
    body: str = "",
    labels: list[str] | None = None,
) -> tuple[int, Any]:
    """POST a new issue to the repository.

    Args:
        base: Gitea base URL without a trailing slash.
        token: API token.
        owner: Repository owner.
        repo: Repository name.
        title: Issue title.
        body: Issue text; defaults to an empty string.
        labels: Sent as the ``labels`` field; ``None`` becomes ``[]``.
            NOTE(review): Gitea's create-issue schema appears to expect
            integer label IDs here, not label names; confirm with callers.

    Returns:
        The ``(status, payload)`` pair produced by ``request_json``.
    """
    issue_payload = {
        "title": title,
        "body": body,
        "labels": labels if labels else [],
    }
    endpoint = f"{base}/api/v1/repos/{owner}/{repo}/issues"
    return request_json("POST", endpoint, token, issue_payload)
def issues_comment(
    base: str,
    token: str,
    owner: str,
    repo: str,
    number: int,
    body: str,
) -> tuple[int, Any]:
    """POST a comment with text ``body`` on issue ``number``.

    Returns:
        The ``(status, payload)`` pair produced by ``request_json``.
    """
    issue_api = f"{base}/api/v1/repos/{owner}/{repo}/issues/{number}"
    comment_payload = {"body": body}
    return request_json("POST", f"{issue_api}/comments", token, comment_payload)
def issues_patch(
    base: str,
    token: str,
    owner: str,
    repo: str,
    number: int,
    fields: dict,
) -> tuple[int, Any]:
    """PATCH issue ``number`` with ``fields`` (e.g. state, title, body).

    ``fields`` is sent unchanged as the JSON request body.

    Returns:
        The ``(status, payload)`` pair produced by ``request_json``.
    """
    repo_api = f"{base}/api/v1/repos/{owner}/{repo}"
    endpoint = f"{repo_api}/issues/{number}"
    return request_json("PATCH", endpoint, token, fields)
def repo_file_content(
    base: str,
    token: str,
    owner: str,
    repo: str,
    path: str,
    ref: str = "",
) -> tuple[int, Any]:
    """Fetch a file through the repository contents API and decode it to text.

    Args:
        base: Gitea base URL without a trailing slash.
        token: API token.
        owner: Repository owner.
        repo: Repository name.
        path: Path inside the repository; percent-encoded except for ``/``.
        ref: Optional ref sent as the ``ref`` query parameter; omitted when
            empty.

    Returns:
        ``(status, payload)``. When the response describes a file with
        non-empty base64 ``content``, the payload is replaced by
        ``{"path": path, "encoding": "text", "content": <decoded text>}``.
        Bytes that are not valid UTF-8 are substituted with the replacement
        character, so binary files are not returned faithfully. In every other
        case (error status, non-file payload, undecodable base64) the payload
        from ``request_json`` is returned unchanged.
    """
    from base64 import b64decode
    from urllib.parse import quote

    encoded_path = quote(path, safe="/")
    url = f"{base}/api/v1/repos/{owner}/{repo}/contents/{encoded_path}"
    if ref:
        # Escape the ref: characters such as '#', '&', '+' or spaces would
        # otherwise truncate or alter the query string.
        url += "?ref=" + quote(ref, safe="")

    status, payload = request_json("GET", url, token)
    if status >= 400:
        return status, payload

    is_file = isinstance(payload, dict) and payload.get("type") == "file"
    if not (is_file and payload.get("content")):
        return status, payload

    try:
        raw = b64decode(payload["content"])
    except (ValueError, TypeError):
        # binascii.Error is a ValueError (malformed base64); TypeError covers
        # a "content" value that is not str/bytes. Fall back to the raw payload.
        return status, payload
    text = raw.decode("utf-8", errors="replace")
    return status, {"path": path, "encoding": "text", "content": text}