refactor: simplify config handling
This commit is contained in:
@@ -1,75 +1,12 @@
|
||||
from __future__ import annotations
|
||||
|
||||
import tomllib
|
||||
from pathlib import Path
|
||||
from typing import Any, Dict, Optional
|
||||
from typing import Any
|
||||
|
||||
from .constants import TELEGRAM_CONFIG_PATH
|
||||
|
||||
|
||||
def _load_toml(path: Path) -> Dict[str, Any]:
|
||||
if not path.exists():
|
||||
return {}
|
||||
import tomllib
|
||||
|
||||
return tomllib.loads(path.read_text(encoding="utf-8"))
|
||||
|
||||
|
||||
def load_telegram_config(path: Optional[str] = None) -> Dict[str, Any]:
|
||||
def load_telegram_config(path: str | None = None) -> dict[str, Any]:
|
||||
cfg_path = Path(path) if path else TELEGRAM_CONFIG_PATH
|
||||
return _load_toml(cfg_path)
|
||||
|
||||
|
||||
def config_get(config: Dict[str, Any], key: str) -> Any:
|
||||
if key in config:
|
||||
return config[key]
|
||||
nested = config.get("telegram")
|
||||
if isinstance(nested, dict) and key in nested:
|
||||
return nested[key]
|
||||
return None
|
||||
|
||||
|
||||
def parse_allowed_chat_ids(value: str) -> Optional[set[int]]:
|
||||
"""
|
||||
Parse a comma-separated chat id string like "123,456".
|
||||
"""
|
||||
v = (value or "").strip()
|
||||
if not v:
|
||||
return None
|
||||
out: set[int] = set()
|
||||
for part in v.split(","):
|
||||
part = part.strip()
|
||||
if not part:
|
||||
continue
|
||||
out.add(int(part))
|
||||
return out
|
||||
|
||||
|
||||
def parse_chat_id_list(value: Any) -> Optional[set[int]]:
|
||||
if value is None:
|
||||
return None
|
||||
if isinstance(value, str):
|
||||
return parse_allowed_chat_ids(value)
|
||||
if isinstance(value, int):
|
||||
return {value}
|
||||
if isinstance(value, (list, tuple, set)):
|
||||
out: set[int] = set()
|
||||
for item in value:
|
||||
if item is None:
|
||||
continue
|
||||
if isinstance(item, str):
|
||||
if not item.strip():
|
||||
continue
|
||||
out.add(int(item))
|
||||
else:
|
||||
out.add(int(item))
|
||||
return out or None
|
||||
return None
|
||||
|
||||
|
||||
def resolve_chat_ids(config: Dict[str, Any]) -> Optional[set[int]]:
|
||||
chat_ids = parse_chat_id_list(config_get(config, "chat_id"))
|
||||
if chat_ids is None:
|
||||
chat_ids = parse_chat_id_list(config_get(config, "allowed_chat_ids"))
|
||||
if chat_ids is None:
|
||||
chat_ids = parse_chat_id_list(config_get(config, "startup_chat_ids"))
|
||||
return chat_ids
|
||||
return tomllib.loads(cfg_path.read_text(encoding="utf-8"))
|
||||
|
||||
@@ -17,7 +17,7 @@ from typing import Any, Callable, Dict, Optional, Tuple
|
||||
|
||||
import typer
|
||||
|
||||
from .config import config_get, load_telegram_config, resolve_chat_ids
|
||||
from .config import load_telegram_config
|
||||
from .constants import TELEGRAM_HARD_LIMIT
|
||||
from .exec_render import ExecProgressRenderer, ExecRenderState, render_event_cli
|
||||
from .rendering import render_markdown
|
||||
@@ -342,18 +342,26 @@ def run(
|
||||
) -> None:
|
||||
setup_file_logger(log_file if log_file else None)
|
||||
config = load_telegram_config()
|
||||
token = config_get(config, "bot_token") or ""
|
||||
db_path = config_get(config, "bridge_db") or "./bridge_routes.sqlite3"
|
||||
chat_ids = resolve_chat_ids(config)
|
||||
allowed = chat_ids
|
||||
startup_ids = chat_ids
|
||||
startup_msg = config_get(config, "startup_message") or "✅ exec_bridge started (codex exec)."
|
||||
token = config["bot_token"]
|
||||
db_path = config.get("bridge_db", "./bridge_routes.sqlite3")
|
||||
|
||||
def _as_int_set(value: Any) -> set[int]:
|
||||
if isinstance(value, int):
|
||||
return {value}
|
||||
if isinstance(value, list):
|
||||
return {int(v) for v in value}
|
||||
raise TypeError(f"expected int or list[int], got {type(value).__name__}")
|
||||
|
||||
allowed = _as_int_set(config.get("allowed_chat_ids", config["chat_id"]))
|
||||
startup_ids = _as_int_set(config.get("startup_chat_ids", config["chat_id"]))
|
||||
|
||||
startup_msg = config.get("startup_message", "✅ exec_bridge started (codex exec).")
|
||||
startup_pwd = os.getcwd()
|
||||
startup_msg = f"{startup_msg}\nPWD: {startup_pwd}"
|
||||
|
||||
codex_cmd = config_get(config, "codex_cmd") or "codex"
|
||||
workspace = workdir if workdir is not None else config_get(config, "codex_workspace")
|
||||
raw_exec_args = config_get(config, "codex_exec_args") or ""
|
||||
codex_cmd = config.get("codex_cmd", "codex")
|
||||
workspace = workdir if workdir is not None else config.get("codex_workspace")
|
||||
raw_exec_args = config.get("codex_exec_args", "")
|
||||
if isinstance(raw_exec_args, list):
|
||||
extra_args = [str(v) for v in raw_exec_args]
|
||||
else:
|
||||
@@ -384,11 +392,7 @@ def run(
|
||||
store = RouteStore(db_path)
|
||||
runner = CodexExecRunner(codex_cmd=codex_cmd, workspace=workspace, extra_args=extra_args)
|
||||
|
||||
max_workers = config_get(config, "max_workers")
|
||||
if isinstance(max_workers, str):
|
||||
max_workers = int(max_workers) if max_workers.strip() else None
|
||||
elif not isinstance(max_workers, int):
|
||||
max_workers = None
|
||||
max_workers = config.get("max_workers")
|
||||
pool = ThreadPoolExecutor(max_workers=max_workers or 4)
|
||||
offset: Optional[int] = None
|
||||
ignore_backlog = bool(ignore_backlog)
|
||||
|
||||
Reference in New Issue
Block a user