refactor: simplify telegram loop and jsonl runner (#155)

This commit is contained in:
banteg
2026-01-16 18:11:27 +04:00
committed by GitHub
parent da881fcee5
commit 190b2f6d6e
18 changed files with 2881 additions and 2087 deletions
-1090
View File
File diff suppressed because it is too large Load Diff
+189
View File
@@ -0,0 +1,189 @@
from __future__ import annotations
# ruff: noqa: F401
from collections.abc import Callable
import sys
from pathlib import Path
import typer
from .. import __version__
from ..config import (
ConfigError,
HOME_CONFIG_PATH,
load_or_init_config,
write_config,
)
from ..config_migrations import migrate_config
from ..commands import get_command
from ..engines import get_backend, list_backend_ids
from ..ids import RESERVED_CHAT_COMMANDS, RESERVED_COMMAND_IDS, RESERVED_ENGINE_IDS
from ..lockfile import LockError, LockHandle, acquire_lock, token_fingerprint
from ..logging import setup_logging
from ..runtime_loader import build_runtime_spec, resolve_plugins_allowlist
from ..settings import (
TakopiSettings,
load_settings,
load_settings_if_exists,
validate_settings_data,
)
from ..plugins import (
COMMAND_GROUP,
ENGINE_GROUP,
TRANSPORT_GROUP,
entrypoint_distribution_name,
get_load_errors,
is_entrypoint_allowed,
list_entrypoints,
normalize_allowlist,
)
from ..transports import get_transport
from ..utils.git import resolve_default_base, resolve_main_worktree_root
from ..telegram import onboarding
from ..telegram.client import TelegramClient
from ..telegram.topics import _validate_topics_setup_for
from .doctor import (
DoctorCheck,
DoctorStatus,
_doctor_file_checks,
_doctor_telegram_checks,
_doctor_voice_checks,
run_doctor,
)
from .init import (
_default_alias_from_path,
_ensure_projects_table,
_prompt_alias,
run_init,
)
from .onboarding_cmd import chat_id, onboarding_paths
from .plugins import plugins_cmd
from .run import (
_default_engine_for_setup,
_print_version_and_exit,
_resolve_setup_engine,
_resolve_transport_id,
_run_auto_router,
_setup_needs_config,
_should_run_interactive,
_version_callback,
acquire_config_lock,
app_main,
make_engine_cmd,
)
from .config import (
_CONFIG_PATH_OPTION,
_config_path_display,
_exit_config_error,
_fail_missing_config,
_flatten_config,
_load_config_or_exit,
_normalized_value_from_settings,
_parse_key_path,
_parse_value,
_resolve_config_path_override,
_toml_literal,
config_get,
config_list,
config_path_cmd,
config_set,
config_unset,
)
def _load_settings_optional() -> tuple[TakopiSettings | None, Path | None]:
try:
loaded = load_settings_if_exists()
except ConfigError:
return None, None
if loaded is None:
return None, None
return loaded
def init(
alias: str | None = typer.Argument(
None, help="Project alias (used as /alias in messages)."
),
default: bool = typer.Option(
False,
"--default",
help="Set this project as the default_project.",
),
) -> None:
"""Register the current repo as a Takopi project."""
run_init(
alias=alias,
default=default,
load_or_init_config_fn=load_or_init_config,
resolve_main_worktree_root_fn=resolve_main_worktree_root,
resolve_default_base_fn=resolve_default_base,
list_backend_ids_fn=list_backend_ids,
resolve_plugins_allowlist_fn=resolve_plugins_allowlist,
)
def doctor() -> None:
"""Run configuration checks for the active transport."""
setup_logging(debug=False, cache_logger_on_first_use=False)
run_doctor(
load_settings_fn=load_settings,
telegram_checks=_doctor_telegram_checks,
file_checks=_doctor_file_checks,
voice_checks=_doctor_voice_checks,
)
def _engine_ids_for_cli() -> list[str]:
allowlist: list[str] | None = None
try:
config, _ = load_or_init_config()
except ConfigError:
return list_backend_ids()
raw_plugins = config.get("plugins")
if isinstance(raw_plugins, dict):
enabled = raw_plugins.get("enabled")
if isinstance(enabled, list):
allowlist = [
value.strip()
for value in enabled
if isinstance(value, str) and value.strip()
]
if not allowlist:
allowlist = None
return list_backend_ids(allowlist=allowlist)
def create_app() -> typer.Typer:
app = typer.Typer(
add_completion=False,
invoke_without_command=True,
help="Telegram bridge for coding agents. Docs: https://takopi.dev/",
)
config_app = typer.Typer(help="Read and modify takopi config.")
config_app.command(name="path")(config_path_cmd)
config_app.command(name="list")(config_list)
config_app.command(name="get")(config_get)
config_app.command(name="set")(config_set)
config_app.command(name="unset")(config_unset)
app.command(name="init")(init)
app.command(name="chat-id")(chat_id)
app.command(name="doctor")(doctor)
app.command(name="onboarding-paths")(onboarding_paths)
app.command(name="plugins")(plugins_cmd)
app.add_typer(config_app, name="config")
app.callback()(app_main)
for engine_id in _engine_ids_for_cli():
help_text = f"Run with the {engine_id} engine."
app.command(name=engine_id, help=help_text)(make_engine_cmd(engine_id))
return app
def main() -> None:
app = create_app()
app()
if __name__ == "__main__":
main()
+320
View File
@@ -0,0 +1,320 @@
from __future__ import annotations
import re
import sys
import tomllib
from pathlib import Path
from typing import Any
import typer
from pydantic import BaseModel
from ..config import (
ConfigError,
HOME_CONFIG_PATH,
dump_toml,
read_config,
write_config,
)
from ..config_migrations import migrate_config
from ..settings import TakopiSettings, validate_settings_data
_KEY_SEGMENT_RE = re.compile(r"^[A-Za-z0-9_-]+$")
_MISSING = object()
_CONFIG_PATH_OPTION = typer.Option(
None,
"--config-path",
help="Override the default config path.",
)
def _config_path_display(path: Path) -> str:
home = Path.home()
try:
return f"~/{path.relative_to(home)}"
except ValueError:
return str(path)
def _fail_missing_config(path: Path) -> None:
display = _config_path_display(path)
if path.exists():
typer.echo(f"error: invalid takopi config at {display}", err=True)
else:
typer.echo(f"error: missing takopi config at {display}", err=True)
def _resolve_config_path_override(value: Path | None) -> Path:
if value is None:
return _resolve_home_config_path()
return value.expanduser()
def _resolve_home_config_path() -> Path:
cli_module = sys.modules.get("takopi.cli")
if cli_module is not None:
override = getattr(cli_module, "HOME_CONFIG_PATH", None)
if override is not None:
return Path(override)
return HOME_CONFIG_PATH
def _exit_config_error(exc: ConfigError, *, code: int = 2) -> None:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(code=code) from exc
def _parse_key_path(raw: str) -> list[str]:
value = raw.strip()
if not value:
raise ConfigError("Invalid key path; expected a non-empty value.")
segments = value.split(".")
for segment in segments:
if not segment:
raise ConfigError(f"Invalid key path {raw!r}; empty segment.")
if not _KEY_SEGMENT_RE.fullmatch(segment):
raise ConfigError(
f"Invalid key segment {segment!r} in {raw!r}; "
"use only letters, numbers, '_' or '-'."
)
return segments
def _parse_value(raw: str) -> Any:
value = raw.strip()
if not value:
return ""
try:
return tomllib.loads(f"__v__ = {value}")["__v__"]
except tomllib.TOMLDecodeError:
return value
def _toml_literal(value: Any) -> str:
dumped = dump_toml({"__v__": value})
prefix = "__v__ = "
if dumped.startswith(prefix):
return dumped[len(prefix) :].rstrip("\n")
raise ConfigError("Unsupported config value; unable to render TOML literal.")
def _normalized_value_from_settings(
settings: TakopiSettings, segments: list[str]
) -> Any:
node: Any = settings
for segment in segments:
if isinstance(node, BaseModel):
if segment in node.__class__.model_fields:
node = getattr(node, segment)
else:
extra = node.model_extra or {}
node = extra.get(segment, _MISSING)
elif isinstance(node, dict):
node = node.get(segment, _MISSING)
else:
return _MISSING
if node is _MISSING:
return _MISSING
if isinstance(node, BaseModel):
return node.model_dump(exclude_unset=True)
return node
def _flatten_config(config: dict[str, Any]) -> list[tuple[str, Any]]:
items: list[tuple[str, Any]] = []
def _walk(node: Any, prefix: str) -> None:
if isinstance(node, dict):
for key in sorted(node):
value = node[key]
path = f"{prefix}.{key}" if prefix else key
if isinstance(value, dict):
_walk(value, path)
else:
items.append((path, value))
elif prefix:
items.append((prefix, node))
_walk(config, "")
return items
def _load_config_or_exit(path: Path, *, missing_code: int) -> dict[str, Any]:
if not path.exists():
_fail_missing_config(path)
raise typer.Exit(code=missing_code)
try:
return read_config(path)
except ConfigError as exc:
_exit_config_error(exc)
return {}
def config_path_cmd(
config_path: Path | None = _CONFIG_PATH_OPTION,
) -> None:
"""Print the resolved config path."""
path = _resolve_config_path_override(config_path)
typer.echo(_config_path_display(path))
def config_list(
config_path: Path | None = _CONFIG_PATH_OPTION,
) -> None:
"""List config keys as flattened dot-paths."""
path = _resolve_config_path_override(config_path)
config = _load_config_or_exit(path, missing_code=1)
try:
for key, value in _flatten_config(config):
literal = _toml_literal(value)
typer.echo(f"{key} = {literal}")
except ConfigError as exc:
_exit_config_error(exc)
def config_get(
key: str = typer.Argument(..., help="Dot-path key to fetch."),
config_path: Path | None = _CONFIG_PATH_OPTION,
) -> None:
"""Fetch a single config key."""
path = _resolve_config_path_override(config_path)
config = _load_config_or_exit(path, missing_code=2)
try:
segments = _parse_key_path(key)
except ConfigError as exc:
_exit_config_error(exc)
node: Any = config
for index, segment in enumerate(segments):
if not isinstance(node, dict):
prefix = ".".join(segments[:index])
_exit_config_error(
ConfigError(f"Invalid `{prefix}` in {path}; expected a table.")
)
if segment not in node:
raise typer.Exit(code=1)
node = node[segment]
if isinstance(node, dict):
typer.echo(
f"error: {'.'.join(segments)!r} is a table; pick a leaf node.",
err=True,
)
raise typer.Exit(code=2)
try:
typer.echo(_toml_literal(node))
except ConfigError as exc:
_exit_config_error(exc)
def config_set(
key: str = typer.Argument(..., help="Dot-path key to set."),
value: str = typer.Argument(..., help="Value to assign (auto-parsed)."),
config_path: Path | None = _CONFIG_PATH_OPTION,
) -> None:
"""Set a config value."""
path = _resolve_config_path_override(config_path)
config = _load_config_or_exit(path, missing_code=2)
try:
segments = _parse_key_path(key)
except ConfigError as exc:
_exit_config_error(exc)
try:
migrate_config(config, config_path=path)
except ConfigError as exc:
_exit_config_error(exc)
parsed = _parse_value(value)
node: Any = config
for index, segment in enumerate(segments[:-1]):
next_node = node.get(segment)
if next_node is None:
created: dict[str, Any] = {}
node[segment] = created
node = created
continue
if not isinstance(next_node, dict):
prefix = ".".join(segments[: index + 1])
_exit_config_error(
ConfigError(f"Invalid `{prefix}` in {path}; expected a table.")
)
node = next_node
node[segments[-1]] = parsed
try:
settings = validate_settings_data(config, config_path=path)
except ConfigError as exc:
_exit_config_error(exc)
normalized = _normalized_value_from_settings(settings, segments)
if normalized is not _MISSING:
node[segments[-1]] = normalized
parsed = normalized
try:
write_config(config, path)
except ConfigError as exc:
_exit_config_error(exc)
rendered = _toml_literal(parsed)
typer.echo(f"updated {'.'.join(segments)} = {rendered}")
def config_unset(
key: str = typer.Argument(..., help="Dot-path key to remove."),
config_path: Path | None = _CONFIG_PATH_OPTION,
) -> None:
"""Remove a config key."""
path = _resolve_config_path_override(config_path)
config = _load_config_or_exit(path, missing_code=2)
try:
segments = _parse_key_path(key)
except ConfigError as exc:
_exit_config_error(exc)
try:
migrate_config(config, config_path=path)
except ConfigError as exc:
_exit_config_error(exc)
node: Any = config
stack: list[tuple[dict[str, Any], str]] = []
for index, segment in enumerate(segments[:-1]):
if not isinstance(node, dict):
prefix = ".".join(segments[:index])
_exit_config_error(
ConfigError(f"Invalid `{prefix}` in {path}; expected a table.")
)
next_node = node.get(segment)
if next_node is None:
raise typer.Exit(code=1)
if not isinstance(next_node, dict):
prefix = ".".join(segments[: index + 1])
_exit_config_error(
ConfigError(f"Invalid `{prefix}` in {path}; expected a table.")
)
stack.append((node, segment))
node = next_node
if not isinstance(node, dict):
prefix = ".".join(segments[:-1])
_exit_config_error(
ConfigError(f"Invalid `{prefix}` in {path}; expected a table.")
)
leaf = segments[-1]
if leaf not in node:
raise typer.Exit(code=1)
node.pop(leaf, None)
while stack and not node:
parent, key_name = stack.pop()
parent.pop(key_name, None)
node = parent
try:
validate_settings_data(config, config_path=path)
write_config(config, path)
except ConfigError as exc:
_exit_config_error(exc)
+168
View File
@@ -0,0 +1,168 @@
from __future__ import annotations
import os
import sys
from collections.abc import Awaitable, Callable
from dataclasses import dataclass
from pathlib import Path
from typing import Literal
import anyio
import typer
from ..config import ConfigError
from ..engines import list_backend_ids
from ..ids import RESERVED_CHAT_COMMANDS
from ..runtime_loader import resolve_plugins_allowlist
from ..settings import TakopiSettings, TelegramTopicsSettings
from ..telegram.client import TelegramClient
from ..telegram.topics import _validate_topics_setup_for
DoctorStatus = Literal["ok", "warning", "error"]
@dataclass(frozen=True, slots=True)
class DoctorCheck:
label: str
status: DoctorStatus
detail: str | None = None
def render(self) -> str:
if self.detail:
return f"- {self.label}: {self.status} ({self.detail})"
return f"- {self.label}: {self.status}"
def _doctor_file_checks(settings: TakopiSettings) -> list[DoctorCheck]:
files = settings.transports.telegram.files
if not files.enabled:
return [DoctorCheck("file transfer", "ok", "disabled")]
if files.allowed_user_ids:
count = len(files.allowed_user_ids)
detail = f"restricted to {count} user id(s)"
return [DoctorCheck("file transfer", "ok", detail)]
return [DoctorCheck("file transfer", "warning", "enabled for all users")]
def _doctor_voice_checks(settings: TakopiSettings) -> list[DoctorCheck]:
if not settings.transports.telegram.voice_transcription:
return [DoctorCheck("voice transcription", "ok", "disabled")]
if os.environ.get("OPENAI_API_KEY"):
return [DoctorCheck("voice transcription", "ok", "OPENAI_API_KEY set")]
return [DoctorCheck("voice transcription", "error", "OPENAI_API_KEY not set")]
async def _doctor_telegram_checks(
token: str,
chat_id: int,
topics: TelegramTopicsSettings,
project_chat_ids: tuple[int, ...],
) -> list[DoctorCheck]:
checks: list[DoctorCheck] = []
client_factory = _resolve_cli_attr("TelegramClient") or TelegramClient
validate_topics = (
_resolve_cli_attr("_validate_topics_setup_for") or _validate_topics_setup_for
)
bot = client_factory(token)
try:
me = await bot.get_me()
if me is None:
checks.append(
DoctorCheck("telegram token", "error", "failed to fetch bot info")
)
checks.append(DoctorCheck("chat_id", "error", "skipped (token invalid)"))
if topics.enabled:
checks.append(DoctorCheck("topics", "error", "skipped (token invalid)"))
else:
checks.append(DoctorCheck("topics", "ok", "disabled"))
return checks
bot_label = f"@{me.username}" if me.username else f"id={me.id}"
checks.append(DoctorCheck("telegram token", "ok", bot_label))
chat = await bot.get_chat(chat_id)
if chat is None:
checks.append(DoctorCheck("chat_id", "error", f"unreachable ({chat_id})"))
else:
checks.append(DoctorCheck("chat_id", "ok", f"{chat.type} ({chat_id})"))
if topics.enabled:
try:
await validate_topics(
bot=bot,
topics=topics,
chat_id=chat_id,
project_chat_ids=project_chat_ids,
)
checks.append(DoctorCheck("topics", "ok", f"scope={topics.scope}"))
except ConfigError as exc:
checks.append(DoctorCheck("topics", "error", str(exc)))
else:
checks.append(DoctorCheck("topics", "ok", "disabled"))
except Exception as exc: # noqa: BLE001
checks.append(DoctorCheck("telegram", "error", str(exc)))
finally:
await bot.close()
return checks
def run_doctor(
*,
load_settings_fn: Callable[[], tuple[TakopiSettings, Path]],
telegram_checks: Callable[
[str, int, TelegramTopicsSettings, tuple[int, ...]],
Awaitable[list[DoctorCheck]],
],
file_checks: Callable[[TakopiSettings], list[DoctorCheck]],
voice_checks: Callable[[TakopiSettings], list[DoctorCheck]],
) -> None:
try:
settings, config_path = load_settings_fn()
except ConfigError as exc:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(code=1) from exc
if settings.transport != "telegram":
typer.echo(
"error: takopi doctor currently supports the telegram transport only.",
err=True,
)
raise typer.Exit(code=1)
allowlist = resolve_plugins_allowlist(settings)
engine_ids = list_backend_ids(allowlist=allowlist)
try:
projects_cfg = settings.to_projects_config(
config_path=config_path,
engine_ids=engine_ids,
reserved=RESERVED_CHAT_COMMANDS,
)
except ConfigError as exc:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(code=1) from exc
tg = settings.transports.telegram
project_chat_ids = projects_cfg.project_chat_ids()
telegram_checks_result = anyio.run(
telegram_checks,
tg.bot_token,
tg.chat_id,
tg.topics,
project_chat_ids,
)
if telegram_checks_result is None:
telegram_checks_result = []
checks = [
*telegram_checks_result,
*file_checks(settings),
*voice_checks(settings),
]
typer.echo("takopi doctor")
for check in checks:
typer.echo(check.render())
if any(check.status == "error" for check in checks):
raise typer.Exit(code=1)
def _resolve_cli_attr(name: str) -> object | None:
cli_module = sys.modules.get("takopi.cli")
if cli_module is None:
return None
return getattr(cli_module, name, None)
+113
View File
@@ -0,0 +1,113 @@
from __future__ import annotations
from collections.abc import Callable
from pathlib import Path
import typer
from ..config import ConfigError, write_config
from ..config_migrations import migrate_config
from ..ids import RESERVED_CHAT_COMMANDS
from ..settings import TakopiSettings, validate_settings_data
from .config import _config_path_display
def _prompt_alias(value: str | None, *, default_alias: str | None = None) -> str:
if value is not None:
alias = value
elif default_alias:
alias = typer.prompt("project alias", default=default_alias)
else:
alias = typer.prompt("project alias")
alias = alias.strip()
if not alias:
typer.echo("error: project alias cannot be empty", err=True)
raise typer.Exit(code=1)
return alias
def _default_alias_from_path(path: Path) -> str | None:
name = path.name
if not name:
return None
name = name.removesuffix(".git")
return name or None
def _ensure_projects_table(config: dict, config_path: Path) -> dict:
projects = config.setdefault("projects", {})
if not isinstance(projects, dict):
raise ConfigError(f"Invalid `projects` in {config_path}; expected a table.")
return projects
def run_init(
*,
alias: str | None,
default: bool,
load_or_init_config_fn: Callable[[], tuple[dict, Path]],
resolve_main_worktree_root_fn: Callable[[Path], Path | None],
resolve_default_base_fn: Callable[[Path], str | None],
list_backend_ids_fn: Callable[..., list[str]],
resolve_plugins_allowlist_fn: Callable[[TakopiSettings], list[str] | None],
) -> None:
config, config_path = load_or_init_config_fn()
if config_path.exists():
applied = migrate_config(config, config_path=config_path)
if applied:
write_config(config, config_path)
cwd = Path.cwd()
project_path = resolve_main_worktree_root_fn(cwd) or cwd
default_alias = _default_alias_from_path(project_path)
alias = _prompt_alias(alias, default_alias=default_alias)
settings = validate_settings_data(config, config_path=config_path)
allowlist = resolve_plugins_allowlist_fn(settings)
engine_ids = list_backend_ids_fn(allowlist=allowlist)
projects_cfg = settings.to_projects_config(
config_path=config_path,
engine_ids=engine_ids,
reserved=RESERVED_CHAT_COMMANDS,
)
alias_key = alias.lower()
if alias_key in {engine.lower() for engine in engine_ids}:
raise ConfigError(
f"Invalid project alias {alias!r}; aliases must not match engine ids."
)
if alias_key in RESERVED_CHAT_COMMANDS:
raise ConfigError(
f"Invalid project alias {alias!r}; aliases must not match reserved commands."
)
existing = projects_cfg.projects.get(alias_key)
if existing is not None:
overwrite = typer.confirm(
f"project {existing.alias!r} already exists, overwrite?",
default=False,
)
if not overwrite:
raise typer.Exit(code=1)
projects = _ensure_projects_table(config, config_path)
if existing is not None and existing.alias in projects:
projects.pop(existing.alias, None)
default_engine = settings.default_engine
worktree_base = resolve_default_base_fn(project_path)
entry: dict[str, object] = {
"path": str(project_path),
"worktrees_dir": ".worktrees",
"default_engine": default_engine,
}
if worktree_base:
entry["worktree_base"] = worktree_base
projects[alias] = entry
if default:
config["default_project"] = alias
write_config(config, config_path)
typer.echo(f"saved project {alias!r} to {_config_path_display(config_path)}")
+126
View File
@@ -0,0 +1,126 @@
from __future__ import annotations
import sys
from collections.abc import Callable
from functools import partial
from pathlib import Path
from typing import Any, cast
import anyio
import typer
from ..config import ConfigError, load_or_init_config, write_config
from ..config_migrations import migrate_config
from ..logging import setup_logging
from ..settings import TakopiSettings
from ..telegram import onboarding
from .init import _ensure_projects_table
from .run import _load_settings_optional
def chat_id(
token: str | None = typer.Option(
None,
"--token",
help="Telegram bot token (defaults to config if available).",
),
project: str | None = typer.Option(
None,
"--project",
help="Project alias to print a chat_id snippet for.",
),
) -> None:
"""Capture a Telegram chat id and exit."""
setup_logging_fn = cast(
Callable[..., None],
_resolve_cli_attr("setup_logging") or setup_logging,
)
load_settings_optional_fn = cast(
Callable[[], tuple[TakopiSettings | None, Path | None]],
_resolve_cli_attr("_load_settings_optional") or _load_settings_optional,
)
onboarding_mod = cast(
Any,
_resolve_cli_attr("onboarding") or onboarding,
)
load_or_init_config_fn = cast(
Callable[[], tuple[dict, Path]],
_resolve_cli_attr("load_or_init_config") or load_or_init_config,
)
ensure_projects_table_fn = cast(
Callable[[dict, Path], dict],
_resolve_cli_attr("_ensure_projects_table") or _ensure_projects_table,
)
migrate_config_fn = cast(
Callable[..., object],
_resolve_cli_attr("migrate_config") or migrate_config,
)
write_config_fn = cast(
Callable[[dict, Path], None],
_resolve_cli_attr("write_config") or write_config,
)
setup_logging_fn(debug=False, cache_logger_on_first_use=False)
if token is None:
settings, _ = load_settings_optional_fn()
if settings is not None:
tg = settings.transports.telegram
token = tg.bot_token or None
chat = anyio.run(partial(onboarding_mod.capture_chat_id, token=token))
if chat is None:
raise typer.Exit(code=1)
if project:
project = project.strip()
if not project:
raise ConfigError("Invalid `--project`; expected a non-empty string.")
config, config_path = load_or_init_config_fn()
if config_path.exists():
applied = migrate_config_fn(config, config_path=config_path)
if applied:
write_config_fn(config, config_path)
projects = ensure_projects_table_fn(config, config_path)
entry = projects.get(project)
if entry is None:
lowered = project.lower()
for key, value in projects.items():
if isinstance(key, str) and key.lower() == lowered:
entry = value
project = key
break
if entry is None:
raise ConfigError(
f"Unknown project {project!r}; run `takopi init {project}` first."
)
if not isinstance(entry, dict):
raise ConfigError(
f"Invalid `projects.{project}` in {config_path}; expected a table."
)
entry["chat_id"] = chat.chat_id
write_config_fn(config, config_path)
typer.echo(f"updated projects.{project}.chat_id = {chat.chat_id}")
return
typer.echo(f"chat_id = {chat.chat_id}")
def onboarding_paths() -> None:
"""Print all possible onboarding paths."""
setup_logging_fn = cast(
Callable[..., None],
_resolve_cli_attr("setup_logging") or setup_logging,
)
onboarding_mod = cast(
Any,
_resolve_cli_attr("onboarding") or onboarding,
)
setup_logging_fn(debug=False, cache_logger_on_first_use=False)
onboarding_mod.debug_onboarding_paths()
def _resolve_cli_attr(name: str) -> object | None:
cli_module = sys.modules.get("takopi.cli")
if cli_module is None:
return None
return getattr(cli_module, name, None)
+196
View File
@@ -0,0 +1,196 @@
from __future__ import annotations
import sys
from collections.abc import Callable
from importlib.metadata import EntryPoint
from pathlib import Path
from typing import cast
import typer
from ..commands import get_command
from ..config import ConfigError
from ..engines import get_backend
from ..ids import RESERVED_COMMAND_IDS, RESERVED_ENGINE_IDS
from ..plugins import (
COMMAND_GROUP,
ENGINE_GROUP,
PluginLoadError,
TRANSPORT_GROUP,
entrypoint_distribution_name,
get_load_errors,
is_entrypoint_allowed,
list_entrypoints,
normalize_allowlist,
)
from ..runtime_loader import resolve_plugins_allowlist
from ..settings import TakopiSettings, load_settings_if_exists
from ..transports import get_transport
def _load_settings_optional() -> tuple[TakopiSettings | None, Path | None]:
try:
loaded = load_settings_if_exists()
except ConfigError:
return None, None
if loaded is None:
return None, None
return loaded
def _print_entrypoints(
label: str,
entrypoints: list[EntryPoint],
*,
allowlist: set[str] | None,
entrypoint_distribution_name_fn: Callable[[EntryPoint], str | None],
is_entrypoint_allowed_fn: Callable[[EntryPoint, set[str] | None], bool],
) -> None:
typer.echo(f"{label}:")
if not entrypoints:
typer.echo(" (none)")
return
for ep in entrypoints:
dist = entrypoint_distribution_name_fn(ep) or "unknown"
status = ""
if allowlist is not None:
allowed = is_entrypoint_allowed_fn(ep, allowlist)
status = " enabled" if allowed else " disabled"
typer.echo(f" {ep.name} ({dist}){status}")
def plugins_cmd(
load: bool = typer.Option(
False,
"--load/--no-load",
help="Load plugins to validate and surface import errors.",
),
) -> None:
"""List discovered plugins and optionally validate them."""
load_settings_optional = cast(
Callable[[], tuple[TakopiSettings | None, Path | None]],
_resolve_cli_attr("_load_settings_optional") or _load_settings_optional,
)
resolve_plugins_allowlist_fn = cast(
Callable[[TakopiSettings | None], list[str] | None],
_resolve_cli_attr("resolve_plugins_allowlist") or resolve_plugins_allowlist,
)
list_entrypoints_fn = cast(
Callable[..., list[EntryPoint]],
_resolve_cli_attr("list_entrypoints") or list_entrypoints,
)
get_backend_fn = cast(
Callable[..., object],
_resolve_cli_attr("get_backend") or get_backend,
)
get_transport_fn = cast(
Callable[..., object],
_resolve_cli_attr("get_transport") or get_transport,
)
get_command_fn = cast(
Callable[..., object],
_resolve_cli_attr("get_command") or get_command,
)
get_load_errors_fn = cast(
Callable[[], tuple[PluginLoadError, ...]],
_resolve_cli_attr("get_load_errors") or get_load_errors,
)
entrypoint_distribution_name_fn = cast(
Callable[[EntryPoint], str | None],
_resolve_cli_attr("entrypoint_distribution_name")
or entrypoint_distribution_name,
)
is_entrypoint_allowed_fn = cast(
Callable[[EntryPoint, set[str] | None], bool],
_resolve_cli_attr("is_entrypoint_allowed") or is_entrypoint_allowed,
)
normalize_allowlist_fn = cast(
Callable[[list[str] | None], set[str] | None],
_resolve_cli_attr("normalize_allowlist") or normalize_allowlist,
)
settings_hint, _ = load_settings_optional()
allowlist = resolve_plugins_allowlist_fn(settings_hint)
allowlist_set = normalize_allowlist_fn(allowlist)
engine_eps = list_entrypoints_fn(
ENGINE_GROUP,
reserved_ids=RESERVED_ENGINE_IDS,
)
transport_eps = list_entrypoints_fn(TRANSPORT_GROUP)
command_eps = list_entrypoints_fn(
COMMAND_GROUP,
reserved_ids=RESERVED_COMMAND_IDS,
)
_print_entrypoints(
"engine backends",
engine_eps,
allowlist=allowlist_set,
entrypoint_distribution_name_fn=entrypoint_distribution_name_fn,
is_entrypoint_allowed_fn=is_entrypoint_allowed_fn,
)
_print_entrypoints(
"transport backends",
transport_eps,
allowlist=allowlist_set,
entrypoint_distribution_name_fn=entrypoint_distribution_name_fn,
is_entrypoint_allowed_fn=is_entrypoint_allowed_fn,
)
_print_entrypoints(
"command backends",
command_eps,
allowlist=allowlist_set,
entrypoint_distribution_name_fn=entrypoint_distribution_name_fn,
is_entrypoint_allowed_fn=is_entrypoint_allowed_fn,
)
if load:
for ep in engine_eps:
if allowlist_set is not None and not is_entrypoint_allowed_fn(
ep, allowlist_set
):
continue
try:
get_backend_fn(ep.name, allowlist=allowlist)
except ConfigError:
continue
for ep in transport_eps:
if allowlist_set is not None and not is_entrypoint_allowed_fn(
ep, allowlist_set
):
continue
try:
get_transport_fn(ep.name, allowlist=allowlist)
except ConfigError:
continue
for ep in command_eps:
if allowlist_set is not None and not is_entrypoint_allowed_fn(
ep, allowlist_set
):
continue
try:
get_command_fn(ep.name, allowlist=allowlist)
except ConfigError:
continue
errors = get_load_errors_fn()
if errors:
typer.echo("errors:")
for err in errors:
group = err.group
if group == ENGINE_GROUP:
group = "engine"
elif group == TRANSPORT_GROUP:
group = "transport"
elif group == COMMAND_GROUP:
group = "command"
dist = err.distribution or "unknown"
typer.echo(f" {group} {err.name} ({dist}): {err.error}")
def _resolve_cli_attr(name: str) -> object | None:
cli_module = sys.modules.get("takopi.cli")
if cli_module is None:
return None
return getattr(cli_module, name, None)
+419
View File
@@ -0,0 +1,419 @@
from __future__ import annotations
import os
import sys
from collections.abc import Callable
from functools import partial
from pathlib import Path
from typing import Any, cast
import anyio
import typer
from .. import __version__
from ..backends import EngineBackend
from ..config import ConfigError, load_or_init_config
from ..engines import get_backend
from ..ids import RESERVED_CHAT_COMMANDS
from ..lockfile import LockError, LockHandle, acquire_lock, token_fingerprint
from ..logging import get_logger, setup_logging
from ..runtime_loader import build_runtime_spec, resolve_plugins_allowlist
from ..settings import TakopiSettings, load_settings, load_settings_if_exists
from ..transports import SetupResult, get_transport
from .config import _config_path_display, _fail_missing_config
logger = get_logger(__name__)
def _load_settings_optional() -> tuple[TakopiSettings | None, Path | None]:
try:
loaded = load_settings_if_exists()
except ConfigError:
return None, None
if loaded is None:
return None, None
return loaded
def _resolve_transport_id(override: str | None) -> str:
if override is not None:
value = override.strip()
if not value:
raise ConfigError("Invalid `--transport`; expected a non-empty string.")
return value
load_or_init_config_fn = cast(
Callable[[], tuple[dict, Path]],
_resolve_cli_attr("load_or_init_config") or load_or_init_config,
)
try:
config, _ = load_or_init_config_fn()
except ConfigError:
return "telegram"
raw = config.get("transport")
if not isinstance(raw, str) or not raw.strip():
return "telegram"
return raw.strip()
def acquire_config_lock(config_path: Path, token: str | None) -> LockHandle:
fingerprint = token_fingerprint(token) if token else None
acquire_lock_fn = cast(
Callable[..., LockHandle],
_resolve_cli_attr("acquire_lock") or acquire_lock,
)
try:
return acquire_lock_fn(
config_path=config_path,
token_fingerprint=fingerprint,
)
except LockError as exc:
lines = str(exc).splitlines()
if lines:
typer.echo(lines[0], err=True)
if len(lines) > 1:
typer.echo("\n".join(lines[1:]), err=True)
else:
typer.echo("error: unknown error", err=True)
raise typer.Exit(code=1) from exc
def _default_engine_for_setup(
override: str | None,
*,
settings: TakopiSettings | None,
config_path: Path | None,
) -> str:
if override:
return override
if settings is None or config_path is None:
return "codex"
value = settings.default_engine
return value
def _resolve_setup_engine(
default_engine_override: str | None,
) -> tuple[
TakopiSettings | None,
Path | None,
list[str] | None,
str,
EngineBackend,
]:
load_settings_optional_fn = cast(
Callable[[], tuple[TakopiSettings | None, Path | None]],
_resolve_cli_attr("_load_settings_optional") or _load_settings_optional,
)
resolve_plugins_allowlist_fn = cast(
Callable[[TakopiSettings | None], list[str] | None],
_resolve_cli_attr("resolve_plugins_allowlist") or resolve_plugins_allowlist,
)
default_engine_for_setup_fn = cast(
Callable[..., str],
_resolve_cli_attr("_default_engine_for_setup") or _default_engine_for_setup,
)
get_backend_fn = cast(
Callable[..., EngineBackend],
_resolve_cli_attr("get_backend") or get_backend,
)
settings_hint, config_hint = load_settings_optional_fn()
allowlist = resolve_plugins_allowlist_fn(settings_hint)
default_engine = default_engine_for_setup_fn(
default_engine_override,
settings=settings_hint,
config_path=config_hint,
)
engine_backend = get_backend_fn(default_engine, allowlist=allowlist)
return settings_hint, config_hint, allowlist, default_engine, engine_backend
def _should_run_interactive() -> bool:
if os.environ.get("TAKOPI_NO_INTERACTIVE"):
return False
return sys.stdin.isatty() and sys.stdout.isatty()
def _setup_needs_config(setup: SetupResult) -> bool:
config_titles = {"create a config", "configure telegram"}
return any(issue.title in config_titles for issue in setup.issues)
def _run_auto_router(
*,
default_engine_override: str | None,
transport_override: str | None,
final_notify: bool,
debug: bool,
onboard: bool,
) -> None:
setup_logging_fn = cast(
Callable[..., None],
_resolve_cli_attr("setup_logging") or setup_logging,
)
resolve_setup_engine_fn = cast(
Callable[
[str | None],
tuple[
TakopiSettings | None,
Path | None,
list[str] | None,
str,
EngineBackend,
],
],
_resolve_cli_attr("_resolve_setup_engine") or _resolve_setup_engine,
)
resolve_transport_id_fn = cast(
Callable[[str | None], str],
_resolve_cli_attr("_resolve_transport_id") or _resolve_transport_id,
)
get_transport_fn = cast(
Callable[..., Any],
_resolve_cli_attr("get_transport") or get_transport,
)
should_run_interactive_fn = cast(
Callable[[], bool],
_resolve_cli_attr("_should_run_interactive") or _should_run_interactive,
)
setup_needs_config_fn = cast(
Callable[[SetupResult], bool],
_resolve_cli_attr("_setup_needs_config") or _setup_needs_config,
)
config_path_display_fn = cast(
Callable[[Path], str],
_resolve_cli_attr("_config_path_display") or _config_path_display,
)
fail_missing_config_fn = cast(
Callable[[Path], None],
_resolve_cli_attr("_fail_missing_config") or _fail_missing_config,
)
load_settings_fn = cast(
Callable[[], tuple[TakopiSettings, Path]],
_resolve_cli_attr("load_settings") or load_settings,
)
build_runtime_spec_fn = cast(
Callable[..., Any],
_resolve_cli_attr("build_runtime_spec") or build_runtime_spec,
)
acquire_config_lock_fn = cast(
Callable[[Path, str | None], LockHandle],
_resolve_cli_attr("acquire_config_lock") or acquire_config_lock,
)
if debug:
os.environ.setdefault("TAKOPI_LOG_FILE", "debug.log")
setup_logging_fn(debug=debug)
lock_handle: LockHandle | None = None
try:
(
settings_hint,
config_hint,
allowlist,
default_engine,
engine_backend,
) = resolve_setup_engine_fn(default_engine_override)
transport_id = resolve_transport_id_fn(transport_override)
transport_backend = get_transport_fn(transport_id, allowlist=allowlist)
except ConfigError as exc:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(code=1) from exc
if onboard:
if not should_run_interactive_fn():
typer.echo("error: --onboard requires a TTY", err=True)
raise typer.Exit(code=1)
if not anyio.run(partial(transport_backend.interactive_setup, force=True)):
raise typer.Exit(code=1)
(
settings_hint,
config_hint,
allowlist,
default_engine,
engine_backend,
) = resolve_setup_engine_fn(default_engine_override)
setup = transport_backend.check_setup(
engine_backend,
transport_override=transport_override,
)
if not setup.ok:
if setup_needs_config_fn(setup) and should_run_interactive_fn():
if setup.config_path.exists():
display = config_path_display_fn(setup.config_path)
run_onboard = typer.confirm(
f"config at {display} is missing/invalid for "
f"{transport_backend.id}, run onboarding now?",
default=False,
)
if run_onboard and anyio.run(
partial(transport_backend.interactive_setup, force=True)
):
(
settings_hint,
config_hint,
allowlist,
default_engine,
engine_backend,
) = resolve_setup_engine_fn(default_engine_override)
setup = transport_backend.check_setup(
engine_backend,
transport_override=transport_override,
)
elif anyio.run(partial(transport_backend.interactive_setup, force=False)):
(
settings_hint,
config_hint,
allowlist,
default_engine,
engine_backend,
) = resolve_setup_engine_fn(default_engine_override)
setup = transport_backend.check_setup(
engine_backend,
transport_override=transport_override,
)
if not setup.ok:
if setup_needs_config_fn(setup):
fail_missing_config_fn(setup.config_path)
else:
first = setup.issues[0]
typer.echo(f"error: {first.title}", err=True)
raise typer.Exit(code=1)
try:
settings, config_path = load_settings_fn()
if transport_override and transport_override != settings.transport:
settings = settings.model_copy(update={"transport": transport_override})
spec = build_runtime_spec_fn(
settings=settings,
config_path=config_path,
default_engine_override=default_engine_override,
reserved=RESERVED_CHAT_COMMANDS,
)
if settings.transport == "telegram":
transport_config = settings.transports.telegram
else:
transport_config = settings.transport_config(
settings.transport, config_path=config_path
)
lock_token = transport_backend.lock_token(
transport_config=transport_config,
_config_path=config_path,
)
lock_handle = acquire_config_lock_fn(config_path, lock_token)
runtime = spec.to_runtime(config_path=config_path)
transport_backend.build_and_run(
final_notify=final_notify,
default_engine_override=default_engine_override,
config_path=config_path,
transport_config=transport_config,
runtime=runtime,
)
except ConfigError as exc:
typer.echo(f"error: {exc}", err=True)
raise typer.Exit(code=1) from exc
except KeyboardInterrupt:
logger.info("shutdown.interrupted")
raise typer.Exit(code=130) from None
finally:
if lock_handle is not None:
lock_handle.release()
def _print_version_and_exit() -> None:
typer.echo(__version__)
raise typer.Exit()
def _version_callback(value: bool) -> None:
if value:
_print_version_and_exit()
def app_main(
ctx: typer.Context,
version: bool = typer.Option(
False,
"--version",
help="Show the version and exit.",
callback=_version_callback,
is_eager=True,
),
final_notify: bool = typer.Option(
True,
"--final-notify/--no-final-notify",
help="Send the final response as a new message (not an edit).",
),
onboard: bool = typer.Option(
False,
"--onboard/--no-onboard",
help="Run the interactive setup wizard before starting.",
),
transport: str | None = typer.Option(
None,
"--transport",
help="Override the transport backend id.",
),
debug: bool = typer.Option(
False,
"--debug/--no-debug",
help="Log engine JSONL, Telegram requests, and rendered messages.",
),
) -> None:
"""Takopi CLI."""
if ctx.invoked_subcommand is None:
run_auto_router = cast(
Callable[..., None],
_resolve_cli_attr("_run_auto_router") or _run_auto_router,
)
run_auto_router(
default_engine_override=None,
transport_override=transport,
final_notify=final_notify,
debug=debug,
onboard=onboard,
)
raise typer.Exit()
def make_engine_cmd(engine_id: str) -> Callable[..., None]:
def _cmd(
final_notify: bool = typer.Option(
True,
"--final-notify/--no-final-notify",
help="Send the final response as a new message (not an edit).",
),
onboard: bool = typer.Option(
False,
"--onboard/--no-onboard",
help="Run the interactive setup wizard before starting.",
),
transport: str | None = typer.Option(
None,
"--transport",
help="Override the transport backend id.",
),
debug: bool = typer.Option(
False,
"--debug/--no-debug",
help="Log engine JSONL, Telegram requests, and rendered messages.",
),
) -> None:
run_auto_router = cast(
Callable[..., None],
_resolve_cli_attr("_run_auto_router") or _run_auto_router,
)
run_auto_router(
default_engine_override=engine_id,
transport_override=transport,
final_notify=final_notify,
debug=debug,
onboard=onboard,
)
_cmd.__name__ = f"run_{engine_id}"
return _cmd
def _resolve_cli_attr(name: str) -> object | None:
cli_module = sys.modules.get("takopi.cli")
if cli_module is None:
return None
return getattr(cli_module, name, None)
+272 -172
View File
@@ -131,6 +131,15 @@ class JsonlRunState:
note_seq: int = 0
@dataclass(slots=True)
class JsonlStreamState:
expected_session: ResumeToken | None
found_session: ResumeToken | None = None
did_emit_completed: bool = False
ignored_after_completed: bool = False
jsonl_seq: int = 0
class JsonlSubprocessRunner(BaseRunner):
def get_logger(self) -> Any:
return getattr(self, "logger", get_logger(__name__))
@@ -340,6 +349,250 @@ class JsonlSubprocessRunner(BaseRunner):
raise RuntimeError(message)
return found_session, False
async def _send_payload(
self,
proc: Any,
payload: bytes | None,
*,
logger: Any,
resume: ResumeToken | None,
) -> None:
if payload is not None:
assert proc.stdin is not None
await proc.stdin.send(payload)
await proc.stdin.aclose()
logger.info(
"subprocess.stdin.send",
pid=proc.pid,
resume=resume.value if resume else None,
bytes=len(payload),
)
elif proc.stdin is not None:
await proc.stdin.aclose()
def _decode_jsonl_events(
self,
*,
raw_line: bytes,
line: bytes,
jsonl_seq: int,
state: Any,
resume: ResumeToken | None,
found_session: ResumeToken | None,
logger: Any,
pid: int,
) -> list[TakopiEvent]:
raw_text = raw_line.decode("utf-8", errors="replace")
line_text = line.decode("utf-8", errors="replace")
try:
decoded = self.decode_jsonl(line=line)
except Exception as exc: # noqa: BLE001
log_pipeline(
logger,
"jsonl.parse.error",
pid=pid,
jsonl_seq=jsonl_seq,
line=line_text,
error=str(exc),
)
return self.decode_error_events(
raw=raw_text,
line=line_text,
error=exc,
state=state,
)
if decoded is None:
log_pipeline(
logger,
"jsonl.parse.invalid",
pid=pid,
jsonl_seq=jsonl_seq,
line=line_text,
)
logger.info(
"runner.jsonl.invalid",
pid=pid,
jsonl_seq=jsonl_seq,
line=line_text,
)
return self.invalid_json_events(
raw=raw_text,
line=line_text,
state=state,
)
try:
return self.translate(
decoded,
state=state,
resume=resume,
found_session=found_session,
)
except Exception as exc: # noqa: BLE001
log_pipeline(
logger,
"runner.translate.error",
pid=pid,
jsonl_seq=jsonl_seq,
error=str(exc),
)
return self.translate_error_events(
data=decoded,
error=exc,
state=state,
)
def _process_started_event(
self,
event: StartedEvent,
*,
expected_session: ResumeToken | None,
found_session: ResumeToken | None,
logger: Any,
pid: int,
jsonl_seq: int,
) -> tuple[ResumeToken | None, bool]:
prior_found = found_session
try:
found_session, emit = self.handle_started_event(
event,
expected_session=expected_session,
found_session=found_session,
)
except Exception as exc:
log_pipeline(
logger,
"runner.started.error",
pid=pid,
jsonl_seq=jsonl_seq,
resume=event.resume.value,
expected_session=expected_session.value if expected_session else None,
found_session=prior_found.value if prior_found else None,
error=str(exc),
)
raise
if prior_found is None and emit:
reason = (
"matched_expected" if expected_session is not None else "first_seen"
)
elif prior_found is not None and not emit:
reason = "duplicate"
else:
reason = "unknown"
log_pipeline(
logger,
"runner.started.seen",
pid=pid,
jsonl_seq=jsonl_seq,
resume=event.resume.value,
expected_session=expected_session.value if expected_session else None,
found_session=found_session.value if found_session else None,
emit=emit,
reason=reason,
)
return found_session, emit
def _log_completed_event(
self,
*,
logger: Any,
pid: int,
event: CompletedEvent,
jsonl_seq: int | None = None,
source: str | None = None,
) -> None:
payload: dict[str, Any] = {
"pid": pid,
"ok": event.ok,
"has_answer": bool(event.answer.strip()),
"emit": True,
}
if jsonl_seq is not None:
payload["jsonl_seq"] = jsonl_seq
if source is not None:
payload["source"] = source
log_pipeline(logger, "runner.completed.seen", **payload)
def _handle_jsonl_line(
self,
*,
raw_line: bytes,
stream: JsonlStreamState,
state: Any,
resume: ResumeToken | None,
logger: Any,
pid: int,
) -> list[TakopiEvent]:
if stream.did_emit_completed:
if not stream.ignored_after_completed:
log_pipeline(
logger,
"runner.drop.jsonl_after_completed",
pid=pid,
)
stream.ignored_after_completed = True
return []
line = raw_line.strip()
if not line:
return []
stream.jsonl_seq += 1
seq = stream.jsonl_seq
events = self._decode_jsonl_events(
raw_line=raw_line,
line=line,
jsonl_seq=seq,
state=state,
resume=resume,
found_session=stream.found_session,
logger=logger,
pid=pid,
)
output: list[TakopiEvent] = []
for evt in events:
if isinstance(evt, StartedEvent):
stream.found_session, emit = self._process_started_event(
evt,
expected_session=stream.expected_session,
found_session=stream.found_session,
logger=logger,
pid=pid,
jsonl_seq=seq,
)
if not emit:
continue
if isinstance(evt, CompletedEvent):
stream.did_emit_completed = True
self._log_completed_event(
logger=logger,
pid=pid,
event=evt,
jsonl_seq=seq,
)
output.append(evt)
break
output.append(evt)
return output
async def _iter_jsonl_events(
self,
*,
stdout: Any,
stream: JsonlStreamState,
state: Any,
resume: ResumeToken | None,
logger: Any,
pid: int,
) -> AsyncIterator[TakopiEvent]:
async for raw_line in self.iter_json_lines(stdout):
for evt in self._handle_jsonl_line(
raw_line=raw_line,
stream=stream,
state=state,
resume=resume,
logger=logger,
pid=pid,
):
yield evt
async def run_impl(
self, prompt: str, resume: ResumeToken | None
) -> AsyncIterator[TakopiEvent]:
@@ -381,25 +634,10 @@ class JsonlSubprocessRunner(BaseRunner):
pid=proc.pid,
)
if payload is not None:
assert proc.stdin is not None
await proc.stdin.send(payload)
await proc.stdin.aclose()
logger.info(
"subprocess.stdin.send",
pid=proc.pid,
resume=resume.value if resume else None,
bytes=len(payload),
)
elif proc.stdin is not None:
await proc.stdin.aclose()
await self._send_payload(proc, payload, logger=logger, resume=resume)
rc: int | None = None
expected_session: ResumeToken | None = resume
found_session: ResumeToken | None = None
did_emit_completed = False
ignored_after_completed = False
jsonl_seq = 0
stream = JsonlStreamState(expected_session=resume)
async with anyio.create_task_group() as tg:
tg.start_soon(
@@ -408,154 +646,22 @@ class JsonlSubprocessRunner(BaseRunner):
logger,
tag,
)
async for raw_line in self.iter_json_lines(proc.stdout):
if did_emit_completed:
if not ignored_after_completed:
log_pipeline(
logger,
"runner.drop.jsonl_after_completed",
pid=proc.pid,
)
ignored_after_completed = True
continue
line = raw_line.strip()
if not line:
continue
jsonl_seq += 1
seq = jsonl_seq
raw_text = raw_line.decode("utf-8", errors="replace")
line_text = line.decode("utf-8", errors="replace")
try:
decoded = self.decode_jsonl(line=line)
except Exception as exc: # noqa: BLE001
log_pipeline(
logger,
"jsonl.parse.error",
pid=proc.pid,
jsonl_seq=seq,
line=line_text,
error=str(exc),
)
events = self.decode_error_events(
raw=raw_text,
line=line_text,
error=exc,
state=state,
)
else:
if decoded is None:
log_pipeline(
logger,
"jsonl.parse.invalid",
pid=proc.pid,
jsonl_seq=seq,
line=line_text,
)
logger.info(
"runner.jsonl.invalid",
pid=proc.pid,
jsonl_seq=seq,
line=line_text,
)
events = self.invalid_json_events(
raw=raw_text,
line=line_text,
state=state,
)
else:
try:
events = self.translate(
decoded,
state=state,
resume=resume,
found_session=found_session,
)
except Exception as exc: # noqa: BLE001
log_pipeline(
logger,
"runner.translate.error",
pid=proc.pid,
jsonl_seq=seq,
error=str(exc),
)
events = self.translate_error_events(
data=decoded,
error=exc,
state=state,
)
for evt in events:
if isinstance(evt, StartedEvent):
prior_found = found_session
try:
found_session, emit = self.handle_started_event(
evt,
expected_session=expected_session,
found_session=found_session,
)
except Exception as exc:
log_pipeline(
logger,
"runner.started.error",
pid=proc.pid,
jsonl_seq=seq,
resume=evt.resume.value,
expected_session=expected_session.value
if expected_session
else None,
found_session=prior_found.value
if prior_found
else None,
error=str(exc),
)
raise
if prior_found is None and emit:
reason = (
"matched_expected"
if expected_session is not None
else "first_seen"
)
elif prior_found is not None and not emit:
reason = "duplicate"
else:
reason = "unknown"
log_pipeline(
logger,
"runner.started.seen",
pid=proc.pid,
jsonl_seq=seq,
resume=evt.resume.value,
expected_session=expected_session.value
if expected_session
else None,
found_session=found_session.value
if found_session
else None,
emit=emit,
reason=reason,
)
if not emit:
continue
if isinstance(evt, CompletedEvent):
did_emit_completed = True
log_pipeline(
logger,
"runner.completed.seen",
pid=proc.pid,
jsonl_seq=seq,
ok=evt.ok,
has_answer=bool(evt.answer.strip()),
emit=True,
)
yield evt
break
yield evt
async for evt in self._iter_jsonl_events(
stdout=proc.stdout,
stream=stream,
state=state,
resume=resume,
logger=logger,
pid=proc.pid,
):
yield evt
rc = await proc.wait()
logger.info("subprocess.exit", pid=proc.pid, rc=rc)
if did_emit_completed:
if stream.did_emit_completed:
return
found_session = stream.found_session
if rc is not None and rc != 0:
events = self.process_error_events(
rc,
@@ -565,13 +671,10 @@ class JsonlSubprocessRunner(BaseRunner):
)
for evt in events:
if isinstance(evt, CompletedEvent):
log_pipeline(
logger,
"runner.completed.seen",
self._log_completed_event(
logger=logger,
pid=proc.pid,
ok=evt.ok,
has_answer=bool(evt.answer.strip()),
emit=True,
event=evt,
source="process_error",
)
yield evt
@@ -584,13 +687,10 @@ class JsonlSubprocessRunner(BaseRunner):
)
for evt in events:
if isinstance(evt, CompletedEvent):
log_pipeline(
logger,
"runner.completed.seen",
self._log_completed_event(
logger=logger,
pid=proc.pid,
ok=evt.ok,
has_answer=bool(evt.answer.strip()),
emit=True,
event=evt,
source="stream_end",
)
yield evt
+1 -4
View File
@@ -27,10 +27,7 @@ async def _check_agent_permissions(
if sender_id is None:
await reply(text="cannot verify sender for agent defaults.")
return False
is_private = msg.chat_type == "private"
if msg.chat_type is None:
is_private = msg.chat_id > 0
if is_private:
if msg.is_private:
return True
member = await cfg.bot.get_chat_member(msg.chat_id, sender_id)
if member is None:
@@ -107,10 +107,7 @@ async def _check_file_permissions(
await reply(text="file transfer is not allowed for this user.")
return False
return True
is_private = msg.chat_type == "private"
if msg.chat_type is None:
is_private = msg.chat_id > 0
if is_private:
if msg.is_private:
return True
member = await cfg.bot.get_chat_member(msg.chat_id, sender_id)
if member is None:
+43
View File
@@ -0,0 +1,43 @@
from __future__ import annotations
# ruff: noqa: F401
from .agent import _handle_agent_command as handle_agent_command
from .dispatch import _dispatch_command as dispatch_command
from .executor import _run_engine as run_engine
from .executor import _should_show_resume_line as should_show_resume_line
from .file_transfer import _handle_file_command as handle_file_command
from .file_transfer import _handle_file_put_default as handle_file_put_default
from .file_transfer import _save_file_put as save_file_put
from .media import _handle_media_group as handle_media_group
from .menu import _reserved_commands as get_reserved_commands
from .menu import _set_command_menu as set_command_menu
from .model import _handle_model_command as handle_model_command
from .parse import _parse_slash_command as parse_slash_command
from .reasoning import _handle_reasoning_command as handle_reasoning_command
from .topics import _handle_chat_new_command as handle_chat_new_command
from .topics import _handle_ctx_command as handle_ctx_command
from .topics import _handle_new_command as handle_new_command
from .topics import _handle_topic_command as handle_topic_command
from .trigger import _handle_trigger_command as handle_trigger_command
__all__ = [
"dispatch_command",
"get_reserved_commands",
"handle_agent_command",
"handle_chat_new_command",
"handle_ctx_command",
"handle_file_command",
"handle_file_put_default",
"handle_media_group",
"handle_model_command",
"handle_new_command",
"handle_reasoning_command",
"handle_topic_command",
"handle_trigger_command",
"parse_slash_command",
"run_engine",
"save_file_put",
"set_command_menu",
"should_show_resume_line",
]
+60 -129
View File
@@ -3,14 +3,20 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from ...context import RunContext
from ...directives import DirectiveError
from ..chat_prefs import ChatPrefsStore
from ..engine_defaults import resolve_engine_for_message
from ..engine_overrides import EngineOverrides, resolve_override_value
from ..files import split_command_args
from ..topic_state import TopicStateStore
from ..topics import _topic_key
from ..types import TelegramIncomingMessage
from .overrides import (
ENGINE_SOURCE_LABELS,
OVERRIDE_SOURCE_LABELS,
apply_engine_override,
parse_set_args,
require_admin_or_private,
resolve_engine_selection,
)
from .reply import make_reply
if TYPE_CHECKING:
@@ -22,79 +28,6 @@ MODEL_USAGE = (
)
async def _check_model_permissions(
cfg: TelegramBridgeConfig, msg: TelegramIncomingMessage
) -> bool:
reply = make_reply(cfg, msg)
sender_id = msg.sender_id
if sender_id is None:
await reply(text="cannot verify sender for model overrides.")
return False
is_private = msg.chat_type == "private"
if msg.chat_type is None:
is_private = msg.chat_id > 0
if is_private:
return True
member = await cfg.bot.get_chat_member(msg.chat_id, sender_id)
if member is None:
await reply(text="failed to verify model override permissions.")
return False
if member.status in {"creator", "administrator"}:
return True
await reply(text="changing model overrides is restricted to group admins.")
return False
async def _resolve_engine_selection(
cfg: TelegramBridgeConfig,
msg: TelegramIncomingMessage,
*,
ambient_context: RunContext | None,
topic_store: TopicStateStore | None,
chat_prefs: ChatPrefsStore | None,
topic_key: tuple[int, int] | None,
) -> tuple[str, str] | None:
reply = make_reply(cfg, msg)
try:
resolved = cfg.runtime.resolve_message(
text="",
reply_text=msg.reply_to_text,
ambient_context=ambient_context,
chat_id=msg.chat_id,
)
except DirectiveError as exc:
await reply(text=f"error:\n{exc}")
return None
selection = await resolve_engine_for_message(
runtime=cfg.runtime,
context=resolved.context,
explicit_engine=None,
chat_id=msg.chat_id,
topic_key=topic_key,
topic_store=topic_store,
chat_prefs=chat_prefs,
)
return selection.engine, selection.source
def _parse_set_args(
tokens: tuple[str, ...], *, engine_ids: set[str]
) -> tuple[str | None, str | None]:
if len(tokens) < 2:
return None, None
if len(tokens) == 2:
maybe_engine = tokens[1].strip().lower()
if maybe_engine in engine_ids:
return None, None
return None, tokens[1].strip()
maybe_engine = tokens[1].strip().lower()
if maybe_engine in engine_ids:
model = " ".join(tokens[2:]).strip()
return maybe_engine, model or None
model = " ".join(tokens[1:]).strip()
return None, model or None
async def _handle_model_command(
cfg: TelegramBridgeConfig,
msg: TelegramIncomingMessage,
@@ -117,7 +50,7 @@ async def _handle_model_command(
engine_ids = {engine.lower() for engine in cfg.runtime.engine_ids}
if action in {"show", ""}:
selection = await _resolve_engine_selection(
selection = await resolve_engine_selection(
cfg,
msg,
ambient_context=ambient_context,
@@ -141,21 +74,11 @@ async def _handle_model_command(
chat_override=chat_override,
field="model",
)
source_labels = {
"directive": "directive",
"topic_default": "topic default",
"chat_default": "chat default",
"project_default": "project default",
"global_default": "global default",
}
override_labels = {
"topic_override": "topic override",
"chat_default": "chat default",
"default": "no override",
}
engine_line = f"engine: {engine} ({source_labels[engine_source]})"
engine_line = f"engine: {engine} ({ENGINE_SOURCE_LABELS[engine_source]})"
model_value = resolution.value or "default"
model_line = f"model: {model_value} ({override_labels[resolution.source]})"
model_line = (
f"model: {model_value} ({OVERRIDE_SOURCE_LABELS[resolution.source]})"
)
topic_label = resolution.topic_value or "none"
if tkey is None:
topic_label = "none"
@@ -170,14 +93,20 @@ async def _handle_model_command(
return
if action == "set":
engine_arg, model = _parse_set_args(tokens, engine_ids=engine_ids)
engine_arg, model = parse_set_args(tokens, engine_ids=engine_ids)
if model is None:
await reply(text=MODEL_USAGE)
return
if not await _check_model_permissions(cfg, msg):
if not await require_admin_or_private(
cfg,
msg,
missing_sender="cannot verify sender for model overrides.",
failed_member="failed to verify model override permissions.",
denied="changing model overrides is restricted to group admins.",
):
return
if engine_arg is None:
selection = await _resolve_engine_selection(
selection = await resolve_engine_selection(
cfg,
msg,
ambient_context=ambient_context,
@@ -196,16 +125,23 @@ async def _handle_model_command(
text=f"unknown engine `{engine}`.\navailable agents: `{available}`"
)
return
if tkey is not None:
if topic_store is None:
await reply(text="topic model overrides are unavailable.")
return
current = await topic_store.get_engine_override(tkey[0], tkey[1], engine)
updated = EngineOverrides(
scope = await apply_engine_override(
reply=reply,
tkey=tkey,
topic_store=topic_store,
chat_prefs=chat_prefs,
chat_id=msg.chat_id,
engine=engine,
update=lambda current: EngineOverrides(
model=model,
reasoning=current.reasoning if current is not None else None,
)
await topic_store.set_engine_override(tkey[0], tkey[1], engine, updated)
),
topic_unavailable="topic model overrides are unavailable.",
chat_unavailable="chat model overrides are unavailable (no config path).",
)
if scope is None:
return
if scope == "topic":
await reply(
text=(
f"topic model override set to `{model}` for `{engine}`.\n"
@@ -213,15 +149,6 @@ async def _handle_model_command(
)
)
return
if chat_prefs is None:
await reply(text="chat model overrides are unavailable (no config path).")
return
current = await chat_prefs.get_engine_override(msg.chat_id, engine)
updated = EngineOverrides(
model=model,
reasoning=current.reasoning if current is not None else None,
)
await chat_prefs.set_engine_override(msg.chat_id, engine, updated)
await reply(
text=(
f"chat model override set to `{model}` for `{engine}`.\n"
@@ -237,10 +164,16 @@ async def _handle_model_command(
return
if len(tokens) == 2:
engine = tokens[1].strip().lower() or None
if not await _check_model_permissions(cfg, msg):
if not await require_admin_or_private(
cfg,
msg,
missing_sender="cannot verify sender for model overrides.",
failed_member="failed to verify model override permissions.",
denied="changing model overrides is restricted to group admins.",
):
return
if engine is None:
selection = await _resolve_engine_selection(
selection = await resolve_engine_selection(
cfg,
msg,
ambient_context=ambient_context,
@@ -257,27 +190,25 @@ async def _handle_model_command(
text=f"unknown engine `{engine}`.\navailable agents: `{available}`"
)
return
if tkey is not None:
if topic_store is None:
await reply(text="topic model overrides are unavailable.")
return
current = await topic_store.get_engine_override(tkey[0], tkey[1], engine)
updated = EngineOverrides(
scope = await apply_engine_override(
reply=reply,
tkey=tkey,
topic_store=topic_store,
chat_prefs=chat_prefs,
chat_id=msg.chat_id,
engine=engine,
update=lambda current: EngineOverrides(
model=None,
reasoning=current.reasoning if current is not None else None,
)
await topic_store.set_engine_override(tkey[0], tkey[1], engine, updated)
),
topic_unavailable="topic model overrides are unavailable.",
chat_unavailable="chat model overrides are unavailable (no config path).",
)
if scope is None:
return
if scope == "topic":
await reply(text="topic model override cleared (using chat default).")
return
if chat_prefs is None:
await reply(text="chat model overrides are unavailable (no config path).")
return
current = await chat_prefs.get_engine_override(msg.chat_id, engine)
updated = EngineOverrides(
model=None,
reasoning=current.reasoning if current is not None else None,
)
await chat_prefs.set_engine_override(msg.chat_id, engine, updated)
await reply(text="chat model override cleared.")
return
+133
View File
@@ -0,0 +1,133 @@
from __future__ import annotations
from collections.abc import Awaitable, Callable
from typing import TYPE_CHECKING, Literal
from ...context import RunContext
from ...directives import DirectiveError
from ..chat_prefs import ChatPrefsStore
from ..engine_defaults import resolve_engine_for_message
from ..engine_overrides import EngineOverrides
from ..topic_state import TopicStateStore
from ..types import TelegramIncomingMessage
from .reply import make_reply
if TYPE_CHECKING:
from ..bridge import TelegramBridgeConfig
ENGINE_SOURCE_LABELS = {
"directive": "directive",
"topic_default": "topic default",
"chat_default": "chat default",
"project_default": "project default",
"global_default": "global default",
}
OVERRIDE_SOURCE_LABELS = {
"topic_override": "topic override",
"chat_default": "chat default",
"default": "no override",
}
async def require_admin_or_private(
cfg: TelegramBridgeConfig,
msg: TelegramIncomingMessage,
*,
missing_sender: str,
failed_member: str,
denied: str,
) -> bool:
reply = make_reply(cfg, msg)
sender_id = msg.sender_id
if sender_id is None:
await reply(text=missing_sender)
return False
if msg.is_private:
return True
member = await cfg.bot.get_chat_member(msg.chat_id, sender_id)
if member is None:
await reply(text=failed_member)
return False
if member.status in {"creator", "administrator"}:
return True
await reply(text=denied)
return False
async def resolve_engine_selection(
cfg: TelegramBridgeConfig,
msg: TelegramIncomingMessage,
*,
ambient_context: RunContext | None,
topic_store: TopicStateStore | None,
chat_prefs: ChatPrefsStore | None,
topic_key: tuple[int, int] | None,
) -> tuple[str, str] | None:
reply = make_reply(cfg, msg)
try:
resolved = cfg.runtime.resolve_message(
text="",
reply_text=msg.reply_to_text,
ambient_context=ambient_context,
chat_id=msg.chat_id,
)
except DirectiveError as exc:
await reply(text=f"error:\n{exc}")
return None
selection = await resolve_engine_for_message(
runtime=cfg.runtime,
context=resolved.context,
explicit_engine=None,
chat_id=msg.chat_id,
topic_key=topic_key,
topic_store=topic_store,
chat_prefs=chat_prefs,
)
return selection.engine, selection.source
def parse_set_args(
tokens: tuple[str, ...], *, engine_ids: set[str]
) -> tuple[str | None, str | None]:
if len(tokens) < 2:
return None, None
if len(tokens) == 2:
maybe_engine = tokens[1].strip().lower()
if maybe_engine in engine_ids:
return None, None
return None, tokens[1].strip()
maybe_engine = tokens[1].strip().lower()
if maybe_engine in engine_ids:
value = " ".join(tokens[2:]).strip()
return maybe_engine, value or None
value = " ".join(tokens[1:]).strip()
return None, value or None
async def apply_engine_override(
*,
reply: Callable[..., Awaitable[None]],
tkey: tuple[int, int] | None,
topic_store: TopicStateStore | None,
chat_prefs: ChatPrefsStore | None,
chat_id: int,
engine: str,
update: Callable[[EngineOverrides | None], EngineOverrides],
topic_unavailable: str,
chat_unavailable: str,
) -> Literal["topic", "chat"] | None:
if tkey is not None:
if topic_store is None:
await reply(text=topic_unavailable)
return None
current = await topic_store.get_engine_override(tkey[0], tkey[1], engine)
updated = update(current)
await topic_store.set_engine_override(tkey[0], tkey[1], engine, updated)
return "topic"
if chat_prefs is None:
await reply(text=chat_unavailable)
return None
current = await chat_prefs.get_engine_override(chat_id, engine)
updated = update(current)
await chat_prefs.set_engine_override(chat_id, engine, updated)
return "chat"
+59 -133
View File
@@ -3,9 +3,7 @@ from __future__ import annotations
from typing import TYPE_CHECKING
from ...context import RunContext
from ...directives import DirectiveError
from ..chat_prefs import ChatPrefsStore
from ..engine_defaults import resolve_engine_for_message
from ..engine_overrides import (
EngineOverrides,
allowed_reasoning_levels,
@@ -15,6 +13,14 @@ from ..files import split_command_args
from ..topic_state import TopicStateStore
from ..topics import _topic_key
from ..types import TelegramIncomingMessage
from .overrides import (
ENGINE_SOURCE_LABELS,
OVERRIDE_SOURCE_LABELS,
apply_engine_override,
parse_set_args,
require_admin_or_private,
resolve_engine_selection,
)
from .reply import make_reply
if TYPE_CHECKING:
@@ -26,79 +32,6 @@ REASONING_USAGE = (
)
async def _check_reasoning_permissions(
cfg: TelegramBridgeConfig, msg: TelegramIncomingMessage
) -> bool:
reply = make_reply(cfg, msg)
sender_id = msg.sender_id
if sender_id is None:
await reply(text="cannot verify sender for reasoning overrides.")
return False
is_private = msg.chat_type == "private"
if msg.chat_type is None:
is_private = msg.chat_id > 0
if is_private:
return True
member = await cfg.bot.get_chat_member(msg.chat_id, sender_id)
if member is None:
await reply(text="failed to verify reasoning override permissions.")
return False
if member.status in {"creator", "administrator"}:
return True
await reply(text="changing reasoning overrides is restricted to group admins.")
return False
async def _resolve_engine_selection(
cfg: TelegramBridgeConfig,
msg: TelegramIncomingMessage,
*,
ambient_context: RunContext | None,
topic_store: TopicStateStore | None,
chat_prefs: ChatPrefsStore | None,
topic_key: tuple[int, int] | None,
) -> tuple[str, str] | None:
reply = make_reply(cfg, msg)
try:
resolved = cfg.runtime.resolve_message(
text="",
reply_text=msg.reply_to_text,
ambient_context=ambient_context,
chat_id=msg.chat_id,
)
except DirectiveError as exc:
await reply(text=f"error:\n{exc}")
return None
selection = await resolve_engine_for_message(
runtime=cfg.runtime,
context=resolved.context,
explicit_engine=None,
chat_id=msg.chat_id,
topic_key=topic_key,
topic_store=topic_store,
chat_prefs=chat_prefs,
)
return selection.engine, selection.source
def _parse_set_args(
tokens: tuple[str, ...], *, engine_ids: set[str]
) -> tuple[str | None, str | None]:
if len(tokens) < 2:
return None, None
if len(tokens) == 2:
maybe_engine = tokens[1].strip().lower()
if maybe_engine in engine_ids:
return None, None
return None, tokens[1].strip()
maybe_engine = tokens[1].strip().lower()
if maybe_engine in engine_ids:
level = " ".join(tokens[2:]).strip()
return maybe_engine, level or None
level = " ".join(tokens[1:]).strip()
return None, level or None
async def _handle_reasoning_command(
cfg: TelegramBridgeConfig,
msg: TelegramIncomingMessage,
@@ -121,7 +54,7 @@ async def _handle_reasoning_command(
engine_ids = {engine.lower() for engine in cfg.runtime.engine_ids}
if action in {"show", ""}:
selection = await _resolve_engine_selection(
selection = await resolve_engine_selection(
cfg,
msg,
ambient_context=ambient_context,
@@ -145,22 +78,11 @@ async def _handle_reasoning_command(
chat_override=chat_override,
field="reasoning",
)
source_labels = {
"directive": "directive",
"topic_default": "topic default",
"chat_default": "chat default",
"project_default": "project default",
"global_default": "global default",
}
override_labels = {
"topic_override": "topic override",
"chat_default": "chat default",
"default": "no override",
}
engine_line = f"engine: {engine} ({source_labels[engine_source]})"
engine_line = f"engine: {engine} ({ENGINE_SOURCE_LABELS[engine_source]})"
reasoning_value = resolution.value or "default"
reasoning_line = (
f"reasoning: {reasoning_value} ({override_labels[resolution.source]})"
f"reasoning: {reasoning_value} "
f"({OVERRIDE_SOURCE_LABELS[resolution.source]})"
)
topic_label = resolution.topic_value or "none"
if tkey is None:
@@ -179,14 +101,20 @@ async def _handle_reasoning_command(
return
if action == "set":
engine_arg, level = _parse_set_args(tokens, engine_ids=engine_ids)
engine_arg, level = parse_set_args(tokens, engine_ids=engine_ids)
if level is None:
await reply(text=REASONING_USAGE)
return
if not await _check_reasoning_permissions(cfg, msg):
if not await require_admin_or_private(
cfg,
msg,
missing_sender="cannot verify sender for reasoning overrides.",
failed_member="failed to verify reasoning override permissions.",
denied="changing reasoning overrides is restricted to group admins.",
):
return
if engine_arg is None:
selection = await _resolve_engine_selection(
selection = await resolve_engine_selection(
cfg,
msg,
ambient_context=ambient_context,
@@ -215,16 +143,23 @@ async def _handle_reasoning_command(
)
)
return
if tkey is not None:
if topic_store is None:
await reply(text="topic reasoning overrides are unavailable.")
return
current = await topic_store.get_engine_override(tkey[0], tkey[1], engine)
updated = EngineOverrides(
scope = await apply_engine_override(
reply=reply,
tkey=tkey,
topic_store=topic_store,
chat_prefs=chat_prefs,
chat_id=msg.chat_id,
engine=engine,
update=lambda current: EngineOverrides(
model=current.model if current is not None else None,
reasoning=normalized_level,
)
await topic_store.set_engine_override(tkey[0], tkey[1], engine, updated)
),
topic_unavailable="topic reasoning overrides are unavailable.",
chat_unavailable="chat reasoning overrides are unavailable (no config path).",
)
if scope is None:
return
if scope == "topic":
await reply(
text=(
f"topic reasoning override set to `{normalized_level}` "
@@ -233,17 +168,6 @@ async def _handle_reasoning_command(
)
)
return
if chat_prefs is None:
await reply(
text="chat reasoning overrides are unavailable (no config path)."
)
return
current = await chat_prefs.get_engine_override(msg.chat_id, engine)
updated = EngineOverrides(
model=current.model if current is not None else None,
reasoning=normalized_level,
)
await chat_prefs.set_engine_override(msg.chat_id, engine, updated)
await reply(
text=(
f"chat reasoning override set to `{normalized_level}` for `{engine}`.\n"
@@ -259,10 +183,16 @@ async def _handle_reasoning_command(
return
if len(tokens) == 2:
engine = tokens[1].strip().lower() or None
if not await _check_reasoning_permissions(cfg, msg):
if not await require_admin_or_private(
cfg,
msg,
missing_sender="cannot verify sender for reasoning overrides.",
failed_member="failed to verify reasoning override permissions.",
denied="changing reasoning overrides is restricted to group admins.",
):
return
if engine is None:
selection = await _resolve_engine_selection(
selection = await resolve_engine_selection(
cfg,
msg,
ambient_context=ambient_context,
@@ -279,29 +209,25 @@ async def _handle_reasoning_command(
text=f"unknown engine `{engine}`.\navailable agents: `{available}`"
)
return
if tkey is not None:
if topic_store is None:
await reply(text="topic reasoning overrides are unavailable.")
return
current = await topic_store.get_engine_override(tkey[0], tkey[1], engine)
updated = EngineOverrides(
scope = await apply_engine_override(
reply=reply,
tkey=tkey,
topic_store=topic_store,
chat_prefs=chat_prefs,
chat_id=msg.chat_id,
engine=engine,
update=lambda current: EngineOverrides(
model=current.model if current is not None else None,
reasoning=None,
)
await topic_store.set_engine_override(tkey[0], tkey[1], engine, updated)
),
topic_unavailable="topic reasoning overrides are unavailable.",
chat_unavailable="chat reasoning overrides are unavailable (no config path).",
)
if scope is None:
return
if scope == "topic":
await reply(text="topic reasoning override cleared (using chat default).")
return
if chat_prefs is None:
await reply(
text="chat reasoning overrides are unavailable (no config path)."
)
return
current = await chat_prefs.get_engine_override(msg.chat_id, engine)
updated = EngineOverrides(
model=current.model if current is not None else None,
reasoning=None,
)
await chat_prefs.set_engine_override(msg.chat_id, engine, updated)
await reply(text="chat reasoning override cleared.")
return
+15 -25
View File
@@ -8,6 +8,7 @@ from ..topic_state import TopicStateStore
from ..topics import _topic_key
from ..trigger_mode import resolve_trigger_mode
from ..types import TelegramIncomingMessage
from .overrides import require_admin_or_private
from .reply import make_reply
if TYPE_CHECKING:
@@ -18,29 +19,6 @@ TRIGGER_USAGE = (
)
async def _check_trigger_permissions(
cfg: TelegramBridgeConfig, msg: TelegramIncomingMessage
) -> bool:
reply = make_reply(cfg, msg)
sender_id = msg.sender_id
if sender_id is None:
await reply(text="cannot verify sender for trigger settings.")
return False
is_private = msg.chat_type == "private"
if msg.chat_type is None:
is_private = msg.chat_id > 0
if is_private:
return True
member = await cfg.bot.get_chat_member(msg.chat_id, sender_id)
if member is None:
await reply(text="failed to verify trigger permissions.")
return False
if member.status in {"creator", "administrator"}:
return True
await reply(text="changing trigger mode is restricted to group admins.")
return False
async def _handle_trigger_command(
cfg: TelegramBridgeConfig,
msg: TelegramIncomingMessage,
@@ -91,7 +69,13 @@ async def _handle_trigger_command(
return
if action in {"all", "mentions"}:
if not await _check_trigger_permissions(cfg, msg):
if not await require_admin_or_private(
cfg,
msg,
missing_sender="cannot verify sender for trigger settings.",
failed_member="failed to verify trigger permissions.",
denied="changing trigger mode is restricted to group admins.",
):
return
if tkey is not None:
if topic_store is None:
@@ -108,7 +92,13 @@ async def _handle_trigger_command(
return
if action == "clear":
if not await _check_trigger_permissions(cfg, msg):
if not await require_admin_or_private(
cfg,
msg,
missing_sender="cannot verify sender for trigger settings.",
failed_member="failed to verify trigger permissions.",
denied="changing trigger mode is restricted to group admins.",
):
return
if tkey is not None:
if topic_store is None:
File diff suppressed because it is too large Load Diff
+6
View File
@@ -42,6 +42,12 @@ class TelegramIncomingMessage:
document: TelegramDocument | None = None
raw: dict[str, Any] | None = None
@property
def is_private(self) -> bool:
if self.chat_type is not None:
return self.chat_type == "private"
return self.chat_id > 0
@dataclass(frozen=True, slots=True)
class TelegramCallbackQuery: