# Takopi Architecture & Lifecycle ## Layer Diagram ```mermaid flowchart TB subgraph CLI["CLI Layer"] cli[cli.py] cli_desc["Entry point, config loading, lock file"] end subgraph Plugins["Plugin Layer"] entrypoints[plugins.py
entrypoint discovery] engines[engines.py] transports[transports.py] commands[commands.py] api[api.py
public plugin API] end subgraph Orchestration["Orchestration Layer"] router[AutoRouter
router.py] scheduler[ThreadScheduler
scheduler.py] projects[ProjectsConfig
config.py] runtime[TransportRuntime
transport_runtime.py] end subgraph Bridge["Bridge Layer"] tg_bridge[telegram/bridge.py
run_main_loop] runner_bridge[runner_bridge.py
handle_message] end subgraph Runner["Runner Layer"] runner_proto[Runner Protocol
runner.py] runners[runners/
claude, codex, opencode, pi] schemas[schemas/
JSONL decoders] end subgraph Transport["Transport Layer"] transport[Transport Protocol] presenter[Presenter Protocol] tg_client[telegram/client.py] tg_render[telegram/render.py] markdown[markdown.py] end subgraph External["External"] agent_clis[Agent CLIs
claude, codex, pi] telegram_api[Telegram Bot API] end cli --> router cli --> scheduler cli --> projects cli --> engines cli --> transports cli --> commands engines --> entrypoints transports --> entrypoints commands --> entrypoints router --> runtime projects --> runtime router --> tg_bridge scheduler --> tg_bridge runtime --> tg_bridge tg_bridge --> commands tg_bridge --> runner_bridge runner_bridge --> runner_proto runner_proto --> runners runners --> schemas runners --> agent_clis runner_bridge --> transport runner_bridge --> presenter transport --> tg_client presenter --> tg_render presenter --> markdown tg_client --> telegram_api ``` --- ## Plugin Architecture Takopi discovers plugins via Python entrypoints and keeps loading lazy: - **Engine backends** (`takopi.engine_backends`) - **Transport backends** (`takopi.transport_backends`) - **Command backends** (`takopi.command_backends`) Entrypoint names become plugin IDs, are validated up front (reserved names, regex), and are only loaded when needed. The public surface for plugin authors lives in `takopi.api`, while transports and commands interact with core routing via `TransportRuntime`. --- ## Domain Model ```mermaid classDiagram class ResumeToken { +engine: EngineId +value: str } class Action { +id: str +kind: ActionKind +title: str +detail: dict } class StartedEvent { +type: "started" +engine: EngineId +resume: ResumeToken +title: str? } class ActionEvent { +type: "action" +engine: EngineId +action: Action +phase: started|updated|completed +ok: bool? +message: str? } class CompletedEvent { +type: "completed" +engine: EngineId +ok: bool +answer: str +resume: ResumeToken? +usage: dict? } StartedEvent --> ResumeToken ActionEvent --> Action CompletedEvent --> ResumeToken note for Action "ActionKind: command | tool | file_change |\nweb_search | subagent | note | turn | warning | telemetry" ``` --- ## Message Lifecycle ```mermaid sequenceDiagram participant User participant Telegram participant Bridge as telegram/bridge.py participant Scheduler as ThreadScheduler participant RunnerBridge as runner_bridge.py participant Runner participant AgentCLI as Agent CLI participant Command as Command Plugin User->>Telegram: Send message Telegram->>Bridge: poll_incoming() Bridge->>Bridge: Parse slash command alt Command plugin Bridge->>Command: handle(ctx) Command->>RunnerBridge: run_one/run_many (optional) RunnerBridge->>Telegram: Send progress/final else Default routing Bridge->>Bridge: Parse directives
(/engine, /project, @branch) Bridge->>Bridge: Extract resume token
from reply Bridge->>Bridge: Resolve worktree
(if @branch) Bridge->>Scheduler: enqueue(ThreadJob) Scheduler->>RunnerBridge: handle_message() RunnerBridge->>Telegram: Send progress message RunnerBridge->>Runner: run(prompt, resume) end Runner->>AgentCLI: Spawn subprocess loop JSONL Stream AgentCLI-->>Runner: JSONL event Runner-->>RunnerBridge: TakopiEvent RunnerBridge->>Telegram: Edit progress message end AgentCLI-->>Runner: Completed Runner-->>RunnerBridge: CompletedEvent RunnerBridge->>Telegram: Send final answer RunnerBridge->>Telegram: Delete progress message ``` --- ## Runner Execution Flow ```mermaid flowchart TD A[runner.run\nprompt, resume_token] --> B[Acquire Session Lock
SessionLockMixin] B --> C[Build Command] C --> D{Engine?} D -->|Claude| D1["claude --print --output-format stream-json
[--resume id] prompt"] D -->|Codex| D2["codex exec --json
[resume <token>] -"] D -->|Pi| D3["pi --print --mode json
--session <id> <prompt>"] D -->|OpenCode| D4["opencode run --format json
[--session id] -- <prompt>"] D1 --> E[Spawn Subprocess
anyio.open_process] D2 --> E D3 --> E D4 --> E E --> F[Stream JSONL from stdout] F --> G[Decode with msgspec] G --> H[Translate to TakopiEvent] H --> I[yield event] I --> F F -->|EOF| J[Return] ``` --- ## Resume Token Flow ```mermaid sequenceDiagram participant User participant Bridge participant Runner participant CLI as Agent CLI Note over User,CLI: New Conversation User->>Bridge: "fix the bug" Bridge->>Runner: run(prompt, None) Runner->>CLI: claude "fix the bug" CLI-->>Runner: StartedEvent(resume=abc123) Runner-->>Bridge: Stream events Bridge->>User: Final message with:
claude --resume abc123
ctx: project @branch Note over User,CLI: Resume Conversation User->>Bridge: Reply: "now add tests" Bridge->>Bridge: extract_resume(reply_text)
→ ResumeToken(claude, abc123) Bridge->>Bridge: parse_ctx_line()
→ project, branch Bridge->>Runner: run("now add tests", token) Runner->>CLI: claude --resume abc123 "now add tests" CLI-->>Runner: Continues session Runner-->>Bridge: Stream events Bridge->>User: Final message ``` --- ## Component Dependencies ```mermaid flowchart TD cli[cli.py] --> config[config.py] cli --> engines[engines.py] cli --> transports[transports.py] cli --> commands[commands.py] cli --> lockfile[lockfile.py] engines --> plugins[plugins.py] transports --> plugins commands --> plugins engines --> backends[backends.py] backends --> runners[runners/] backends --> runner[runner.py] subgraph runners[runners/] claude[claude.py] codex[codex.py] opencode[opencode.py] pi[pi.py] end subgraph schemas[schemas/] claude_s[claude.py] codex_s[codex.py] opencode_s[opencode.py] pi_s[pi.py] end claude --> claude_s codex --> codex_s opencode --> opencode_s pi --> pi_s cli --> router[router.py] tg_bridge --> runtime[transport_runtime.py] runtime --> router runtime --> config tg_bridge --> commands runner --> runner_bridge[runner_bridge.py] runner_bridge --> tg_bridge tg_bridge --> client[telegram/client.py] tg_bridge --> render[telegram/render.py] client --> transport[transport.py] runner_bridge --> progress[progress.py] runner_bridge --> events[events.py] render --> presenter[presenter.py] presenter --> markdown[markdown.py] ``` --- ## Configuration Structure ```mermaid flowchart LR subgraph Config["~/.takopi/"] toml[takopi.toml] lock[takopi.lock] end subgraph toml_contents["takopi.toml"] direction TB global["transport
default_engine
default_project"] telegram_cfg["[transports.telegram]
bot_token = ...
chat_id = ..."] plugins_cfg["[plugins]
enabled = [...]"] plugins_extra["[plugins.mycommand]
setting = ..."] claude_cfg["[claude]
model = ..."] codex_cfg["[codex]
model = ..."] projects_cfg["[projects.alias]
path = ...
worktrees_dir = ...
default_engine = ..."] end toml --> toml_contents ``` --- ## Thread Scheduling ```mermaid flowchart TD subgraph Incoming[Incoming Messages] m1[Message 1
new thread] m2[Message 2
reply to thread A] m3[Message 3
reply to thread A] m4[Message 4
new thread] end subgraph Scheduler[ThreadScheduler] direction TB q1[Thread A Queue] q2[Thread B Queue] q3[Thread C Queue] end subgraph Workers[Worker Tasks] w1[Worker A] w2[Worker B] w3[Worker C] end m1 --> q2 m2 --> q1 m3 --> q1 m4 --> q3 q1 --> w1 q2 --> w2 q3 --> w3 w1 --> runner1[Runner.run] w2 --> runner2[Runner.run] w3 --> runner3[Runner.run] note1[Jobs in same thread
execute sequentially] note2[Different threads
execute in parallel] ``` --- ## Summary | Layer | Components | Responsibility | |-------|------------|----------------| | **CLI** | `cli.py` | Entry point, config, lock | | **Plugins** | `plugins.py`, `engines.py`, `transports.py`, `commands.py`, `api.py` | Entrypoint discovery, plugin loading, public API boundary | | **Orchestration** | `router.py`, `scheduler.py`, `config.py` | Engine selection, job queuing, project config | | **Bridge** | `telegram/bridge.py`, `runner_bridge.py` | Message handling, execution coordination | | **Runner** | `runner.py`, `runners/*.py`, `schemas/*.py` | Agent CLI subprocess, JSONL parsing, event translation | | **Transport** | `transport.py`, `presenter.py`, `telegram/client.py` | Telegram API, message rendering | | **Domain** | `model.py`, `progress.py`, `events.py` | Event types, action tracking | | **Utils** | `worktrees.py`, `utils/*.py`, `markdown.py` | Git worktrees, formatting, paths |