Files
local-mcp/app/services/config_service.py
2026-03-27 03:58:57 +08:00

53 lines
1.7 KiB
Python

"""
app/services/config_service.py
Read and write runtime settings stored in the SQLite settings table.
"""
from __future__ import annotations
import logging
from app.database import get_conn, get_write_conn
from app.models import ConfigResponse
# Module-level logger named after this module's import path.
logger = logging.getLogger(__name__)
# Names of the settings-table keys handled by get_config()/update_config().
# NOTE(review): this set is not referenced by the functions visible in this
# file — confirm whether other code relies on it before removing or changing it.
_SETTING_KEYS = {"default_wait_seconds", "default_empty_response", "agent_stale_after_seconds"}
def get_config() -> ConfigResponse:
    """Load the runtime settings from the ``settings`` table.

    Every row of the table is read into a key/value mapping. Keys that are
    missing fall back to built-in defaults: 10 for ``default_wait_seconds``,
    an empty string for ``default_empty_response`` and 30 for
    ``agent_stale_after_seconds``.

    Returns:
        A ``ConfigResponse`` populated from the stored values, with the two
        numeric settings converted via ``int()``.

    Raises:
        ValueError: If a stored numeric setting cannot be parsed by ``int()``.
    """
    with get_conn() as conn:
        fetched = conn.execute("SELECT key, value FROM settings").fetchall()

    stored = dict((row["key"], row["value"]) for row in fetched)

    # Values are passed through int() whether they came from the table or
    # from the fallback default.
    wait_seconds = int(stored.get("default_wait_seconds", 10))
    empty_response = stored.get("default_empty_response", "")
    stale_after = int(stored.get("agent_stale_after_seconds", 30))

    return ConfigResponse(
        default_wait_seconds=wait_seconds,
        default_empty_response=empty_response,
        agent_stale_after_seconds=stale_after,
    )
def update_config(
    default_wait_seconds: int | None = None,
    default_empty_response: str | None = None,
    agent_stale_after_seconds: int | None = None,
) -> ConfigResponse:
    """Persist any supplied settings and return the resulting configuration.

    Only arguments that are not ``None`` are written; each one is stored
    with ``INSERT OR REPLACE`` under its own key in the ``settings`` table.
    Numeric arguments are stored as their ``str()`` form. An empty string for
    ``default_empty_response`` is still written, because only ``None`` means
    "leave unchanged".

    Args:
        default_wait_seconds: New value for ``default_wait_seconds``, or
            ``None`` to leave it untouched.
        default_empty_response: New value for ``default_empty_response``, or
            ``None`` to leave it untouched.
        agent_stale_after_seconds: New value for ``agent_stale_after_seconds``,
            or ``None`` to leave it untouched.

    Returns:
        The configuration as re-read by ``get_config()`` after any writes.
    """
    # Map each key to the text to store; None marks "argument not supplied".
    candidates = {
        "default_wait_seconds": (
            None if default_wait_seconds is None else str(default_wait_seconds)
        ),
        "default_empty_response": default_empty_response,
        "agent_stale_after_seconds": (
            None
            if agent_stale_after_seconds is None
            else str(agent_stale_after_seconds)
        ),
    }
    pending: dict[str, str] = {
        name: text for name, text in candidates.items() if text is not None
    }

    # Nothing to write: skip the write connection and the log line entirely.
    if not pending:
        return get_config()

    with get_write_conn() as conn:
        for name in pending:
            conn.execute(
                "INSERT OR REPLACE INTO settings (key, value) VALUES (?, ?)",
                (name, pending[name]),
            )
    logger.info("Config updated: %s", list(pending))

    return get_config()