From b6c8e63f4e67e2084bdab5fce0d68d6ed7cf263e Mon Sep 17 00:00:00 2001 From: Isaac Paul Date: Thu, 4 Jun 2026 22:11:02 -0400 Subject: [PATCH] feat(runners): add runner_bridge context window display, enhance claude/codex/opencode runners and telegram session handling --- src/takopi/markdown.py | 2 + src/takopi/progress.py | 3 ++ src/takopi/runner_bridge.py | 58 ++++++++++++++++++++++++++++ src/takopi/runners/claude.py | 22 ++++++++++- src/takopi/runners/codex.py | 14 +++++-- src/takopi/runners/opencode.py | 16 ++++++++ src/takopi/schemas/codex.py | 1 + src/takopi/telegram/chat_sessions.py | 13 +++++++ src/takopi/telegram/loop.py | 10 +++++ src/takopi/telegram/topic_state.py | 13 +++++++ 10 files changed, 148 insertions(+), 4 deletions(-) diff --git a/src/takopi/markdown.py b/src/takopi/markdown.py index 84dc4d9..e7dc6e9 100644 --- a/src/takopi/markdown.py +++ b/src/takopi/markdown.py @@ -240,6 +240,8 @@ class MarkdownFormatter: def _format_footer(self, state: ProgressState) -> str | None: lines: list[str] = [] + if state.usage_line: + lines.append(state.usage_line) if state.context_line: lines.append(state.context_line) if state.resume_line: diff --git a/src/takopi/progress.py b/src/takopi/progress.py index 9201197..2aa46fb 100644 --- a/src/takopi/progress.py +++ b/src/takopi/progress.py @@ -25,6 +25,7 @@ class ProgressState: resume: ResumeToken | None resume_line: str | None context_line: str | None + usage_line: str | None = None class ProgressTracker: @@ -82,6 +83,7 @@ class ProgressTracker: *, resume_formatter: Callable[[ResumeToken], str] | None = None, context_line: str | None = None, + usage_line: str | None = None, ) -> ProgressState: resume_line: str | None = None if self.resume is not None and resume_formatter is not None: @@ -96,4 +98,5 @@ class ProgressTracker: resume=self.resume, resume_line=resume_line, context_line=context_line, + usage_line=usage_line, ) diff --git a/src/takopi/runner_bridge.py b/src/takopi/runner_bridge.py index c22c11d..a1b1a63 100644 --- a/src/takopi/runner_bridge.py +++ b/src/takopi/runner_bridge.py @@ -25,6 +25,61 @@ from .transport import ( logger = get_logger(__name__) +_CONTEXT_WINDOWS: dict[str, int] = { + "claude": 200_000, + "pi": 200_000, +} + + +def _fuzzy_tokens(n: int) -> str: + if n < 1000: + return str(n) + k = n / 1000 + formatted = f"{k:.2f}".rstrip("0").rstrip(".") + return f"{formatted}k" + + +def _format_usage_line(usage: dict | None, *, engine: str = "") -> str: + if not usage: + return "ctx: n/a" + inner: dict = usage.get("usage") or {} + is_claude_nested = bool(inner) + if is_claude_nested: + input_tokens: int | None = inner.get("input_tokens") + _out: int | None = inner.get("output_tokens") + ctx_tokens: int | None = ( + (input_tokens + _out) + if input_tokens is not None and _out is not None + else (input_tokens or _out) + ) + output_tokens: int | None = None # folded into ctx_tokens + else: + # codex: use last_usage (per-request) for accurate context window display. + # cumulative usage.input_tokens grows across API calls and exceeds window size. + last: dict = usage.get("last_usage") or {} + last_input: int | None = last.get("input_tokens") + last_cached: int | None = last.get("cached_input_tokens") + if last_input is not None: + ctx_tokens = last_input + (last_cached or 0) + else: + ctx_tokens = None + output_tokens = None + if ctx_tokens is None: + return "ctx: n/a" + ctx_str = _fuzzy_tokens(ctx_tokens) + context_window = _CONTEXT_WINDOWS.get(engine) + if context_window: + max_str = _fuzzy_tokens(context_window) + parts = [f"ctx: {ctx_str}/{max_str}"] + else: + parts = [f"ctx: {ctx_str}"] + if output_tokens is not None: + parts.append(f"out: {_fuzzy_tokens(output_tokens)}") + cost = usage.get("total_cost_usd") + if cost is not None: + parts.append(f"${cost:.4f}") + return " ยท ".join(parts) + def _log_runner_event(evt: TakopiEvent) -> None: for line in render_event_cli(evt): @@ -499,6 +554,7 @@ async def handle_message( state = progress_tracker.snapshot( resume_formatter=runner.format_resume, context_line=context_line, + usage_line=_format_usage_line(None, engine=runner.engine), ) final_rendered = cfg.presenter.render_final( state, @@ -535,6 +591,7 @@ async def handle_message( state = progress_tracker.snapshot( resume_formatter=runner.format_resume, context_line=context_line, + usage_line=_format_usage_line(None, engine=runner.engine), ) final_rendered = cfg.presenter.render_progress( state, @@ -589,6 +646,7 @@ async def handle_message( state = progress_tracker.snapshot( resume_formatter=runner.format_resume, context_line=context_line, + usage_line=_format_usage_line(completed.usage, engine=completed.engine), ) final_rendered = cfg.presenter.render_final( state, diff --git a/src/takopi/runners/claude.py b/src/takopi/runners/claude.py index fad4bbe..e740208 100644 --- a/src/takopi/runners/claude.py +++ b/src/takopi/runners/claude.py @@ -10,6 +10,7 @@ from typing import Any import msgspec from ..backends import EngineBackend, EngineConfig +from ..config import ConfigError from ..events import EventFactory from ..logging import get_logger from ..model import Action, ActionKind, EngineId, ResumeToken, TakopiEvent @@ -288,6 +289,7 @@ class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner): allowed_tools: list[str] | None = None dangerously_skip_permissions: bool = False use_api_billing: bool = False + extra_args: list[str] | None = None session_title: str = "claude" logger = logger @@ -311,6 +313,8 @@ class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner): args.extend(["--allowedTools", allowed_tools]) if self.dangerously_skip_permissions is True: args.append("--dangerously-skip-permissions") + if self.extra_args: + args.extend(self.extra_args) args.append("--") args.append(prompt) return args @@ -455,7 +459,11 @@ class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner): def build_runner(config: EngineConfig, _config_path: Path) -> Runner: - claude_cmd = shutil.which("claude") or "claude" + cmd_override = config.get("claude_cmd") + if isinstance(cmd_override, str) and cmd_override: + claude_cmd = shutil.which(cmd_override) or cmd_override + else: + claude_cmd = shutil.which("claude") or "claude" model = config.get("model") if "allowed_tools" in config: @@ -464,6 +472,17 @@ def build_runner(config: EngineConfig, _config_path: Path) -> Runner: allowed_tools = DEFAULT_ALLOWED_TOOLS dangerously_skip_permissions = config.get("dangerously_skip_permissions") is True use_api_billing = config.get("use_api_billing") is True + extra_args_raw = config.get("extra_args") + if extra_args_raw is None: + extra_args = None + elif isinstance(extra_args_raw, list) and all( + isinstance(x, str) for x in extra_args_raw + ): + extra_args = list(extra_args_raw) + else: + raise ConfigError( + f"Invalid `claude.extra_args` in {_config_path}; expected a list of strings." + ) title = str(model) if model is not None else "claude" return ClaudeRunner( @@ -472,6 +491,7 @@ def build_runner(config: EngineConfig, _config_path: Path) -> Runner: allowed_tools=allowed_tools, dangerously_skip_permissions=dangerously_skip_permissions, use_api_billing=use_api_billing, + extra_args=extra_args, session_title=title, ) diff --git a/src/takopi/runners/codex.py b/src/takopi/runners/codex.py index 4527d42..63d4c21 100644 --- a/src/takopi/runners/codex.py +++ b/src/takopi/runners/codex.py @@ -585,13 +585,16 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner): title="turn started", ) ] - case codex_schema.TurnCompleted(usage=usage): + case codex_schema.TurnCompleted(usage=usage, last_usage=last_usage): resume_for_completed = found_session or resume + usage_dict = msgspec.to_builtins(usage) + if last_usage is not None: + usage_dict["last_usage"] = msgspec.to_builtins(last_usage) return [ factory.completed_ok( answer=state.final_answer or "", resume=resume_for_completed, - usage=msgspec.to_builtins(usage), + usage=usage_dict, ) ] case codex_schema.ItemCompleted( @@ -664,7 +667,12 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner): def build_runner(config: EngineConfig, config_path: Path) -> Runner: - codex_cmd = "codex" + cmd_value = config.get("cmd") + if cmd_value is not None and not isinstance(cmd_value, str): + raise ConfigError( + f"Invalid `codex.cmd` in {config_path}; expected a string." + ) + codex_cmd = cmd_value if isinstance(cmd_value, str) else "codex" extra_args_value = config.get("extra_args") if extra_args_value is None: diff --git a/src/takopi/runners/opencode.py b/src/takopi/runners/opencode.py index 529be65..3b928f5 100644 --- a/src/takopi/runners/opencode.py +++ b/src/takopi/runners/opencode.py @@ -308,6 +308,7 @@ class OpenCodeRunner(ResumeTokenMixin, JsonlSubprocessRunner): opencode_cmd: str = "opencode" model: str | None = None + extra_args: list[str] | None = None session_title: str = "opencode" logger = logger @@ -335,6 +336,8 @@ class OpenCodeRunner(ResumeTokenMixin, JsonlSubprocessRunner): model = run_options.model if model is not None: args.extend(["--model", str(model)]) + if self.extra_args: + args.extend(self.extra_args) args.extend(["--", prompt]) return args @@ -486,11 +489,24 @@ def build_runner(config: EngineConfig, config_path: Path) -> Runner: f"Invalid `opencode.model` in {config_path}; expected a string." ) + extra_args_raw = config.get("extra_args") + if extra_args_raw is None: + extra_args = None + elif isinstance(extra_args_raw, list) and all( + isinstance(x, str) for x in extra_args_raw + ): + extra_args = list(extra_args_raw) + else: + raise ConfigError( + f"Invalid `opencode.extra_args` in {config_path}; expected a list of strings." + ) + title = str(model) if model is not None else "opencode" return OpenCodeRunner( opencode_cmd=opencode_cmd, model=model, + extra_args=extra_args, session_title=title, ) diff --git a/src/takopi/schemas/codex.py b/src/takopi/schemas/codex.py index 00a9b08..460a6b0 100644 --- a/src/takopi/schemas/codex.py +++ b/src/takopi/schemas/codex.py @@ -54,6 +54,7 @@ class TurnStarted(msgspec.Struct, tag="turn.started", kw_only=True): class TurnCompleted(msgspec.Struct, tag="turn.completed", kw_only=True): usage: Usage + last_usage: Usage | None = None class TurnFailed(msgspec.Struct, tag="turn.failed", kw_only=True): diff --git a/src/takopi/telegram/chat_sessions.py b/src/takopi/telegram/chat_sessions.py index 8bba540..725f893 100644 --- a/src/takopi/telegram/chat_sessions.py +++ b/src/takopi/telegram/chat_sessions.py @@ -90,6 +90,19 @@ class ChatSessionStore(JsonStateStore[_ChatSessionsState]): chat.sessions[token.engine] = _SessionState(resume=token.value) self._save_locked() + async def get_any_session_resume( + self, chat_id: int, owner_id: int | None + ) -> ResumeToken | None: + async with self._lock: + self._reload_locked_if_needed() + chat = self._get_chat_locked(chat_id, owner_id) + if chat is None: + return None + for engine, entry in chat.sessions.items(): + if entry.resume: + return ResumeToken(engine=engine, value=entry.resume) + return None + async def clear_sessions(self, chat_id: int, owner_id: int | None) -> None: async with self._lock: self._reload_locked_if_needed() diff --git a/src/takopi/telegram/loop.py b/src/takopi/telegram/loop.py index 6c6ba12..bd48406 100644 --- a/src/takopi/telegram/loop.py +++ b/src/takopi/telegram/loop.py @@ -717,6 +717,11 @@ class ResumeResolver: topic_key[1], engine_for_session, ) + if stored is None: + stored = await self._topic_store.get_any_session_resume( + topic_key[0], + topic_key[1], + ) if stored is not None: resume_token = stored if ( @@ -729,6 +734,11 @@ class ResumeResolver: chat_session_key[1], engine_for_session, ) + if stored is None: + stored = await self._chat_session_store.get_any_session_resume( + chat_session_key[0], + chat_session_key[1], + ) if stored is not None: resume_token = stored return ResumeDecision(resume_token=resume_token, handled_by_running_task=False) diff --git a/src/takopi/telegram/topic_state.py b/src/takopi/telegram/topic_state.py index ba4f60c..355897e 100644 --- a/src/takopi/telegram/topic_state.py +++ b/src/takopi/telegram/topic_state.py @@ -174,6 +174,19 @@ class TopicStateStore(JsonStateStore[_TopicState]): return None return ResumeToken(engine=engine, value=entry.resume) + async def get_any_session_resume( + self, chat_id: int, thread_id: int + ) -> ResumeToken | None: + async with self._lock: + self._reload_locked_if_needed() + thread = self._get_thread_locked(chat_id, thread_id) + if thread is None: + return None + for engine, entry in thread.sessions.items(): + if entry.resume: + return ResumeToken(engine=engine, value=entry.resume) + return None + async def get_default_engine(self, chat_id: int, thread_id: int) -> str | None: async with self._lock: self._reload_locked_if_needed()