feat: add interactive onboarding (#39)

This commit is contained in:
banteg
2026-01-03 22:32:29 +04:00
committed by GitHub
parent 0a56d4002f
commit ae1718dbe8
14 changed files with 594 additions and 142 deletions
+1 -1
View File
@@ -128,7 +128,7 @@ Auto-discovers runner modules in `takopi.runners` that export `BACKEND`.
```python ```python
def load_telegram_config() -> tuple[dict, Path]: def load_telegram_config() -> tuple[dict, Path]:
# Loads ./.takopi/takopi.toml, then ~/.takopi/takopi.toml # Loads ~/.takopi/takopi.toml
``` ```
### `logging.py` - Secure logging setup ### `logging.py` - Secure logging setup
+2 -5
View File
@@ -62,17 +62,14 @@ Takopi should document this clearly: if permissions arent configured and Clau
## Config additions ## Config additions
Takopi config lives at either: Takopi config lives at `~/.takopi/takopi.toml`.
* `.takopi/takopi.toml` (project-local), or
* `~/.takopi/takopi.toml` (home). (Existing Takopi behavior.)
Add a new optional `[claude]` section. Add a new optional `[claude]` section.
Recommended v1 schema: Recommended v1 schema:
```toml ```toml
# .takopi/takopi.toml # ~/.takopi/takopi.toml
default_engine = "claude" default_engine = "claude"
+2 -5
View File
@@ -50,17 +50,14 @@ Pi does not accept `-- <prompt>` to protect prompts starting with `-`. Takopi pr
## Config additions ## Config additions
Takopi config lives at either: Takopi config lives at `~/.takopi/takopi.toml`.
* `.takopi/takopi.toml` (project-local), or
* `~/.takopi/takopi.toml` (home).
Add a new optional `[pi]` section. Add a new optional `[pi]` section.
Recommended v1 schema: Recommended v1 schema:
```toml ```toml
# .takopi/takopi.toml # ~/.takopi/takopi.toml
default_engine = "pi" default_engine = "pi"
+1
View File
@@ -10,6 +10,7 @@ dependencies = [
"anyio>=4.12.0", "anyio>=4.12.0",
"httpx>=0.28.1", "httpx>=0.28.1",
"markdown-it-py", "markdown-it-py",
"questionary>=2.1.1",
"rich>=14.2.0", "rich>=14.2.0",
"sulguk>=0.11.1", "sulguk>=0.11.1",
"typer>=0.21.0", "typer>=0.21.0",
+10 -5
View File
@@ -34,14 +34,19 @@ parallel runs across threads, per thread queue support.
## setup ## setup
1. get `bot_token` from [@BotFather](https://t.me/BotFather) run `takopi` in a TTY and follow the interactive prompts. it will:
2. get `chat_id` from [@myidbot](https://t.me/myidbot)
3. send `/start` to the bot (telegram won't let it message you first) - help you create a bot token (via @BotFather)
4. run your agent cli once interactively in the repo to trust the directory - capture your `chat_id` from the most recent message you send to the bot
- check installed agents and set a default engine
to re-run onboarding (and overwrite config), use `takopi --onboard`.
run your agent cli once interactively in the repo to trust the directory.
## config ## config
global config `~/.takopi/takopi.toml`, repo-level config `.takopi/takopi.toml` global config `~/.takopi/takopi.toml`
```toml ```toml
default_engine = "codex" default_engine = "codex"
+62 -4
View File
@@ -3,6 +3,7 @@ from __future__ import annotations
import logging import logging
import os import os
import shutil import shutil
import sys
from collections.abc import Callable from collections.abc import Callable
from pathlib import Path from pathlib import Path
@@ -16,7 +17,7 @@ from .config import ConfigError, load_telegram_config
from .engines import get_backend, get_engine_config, list_backends from .engines import get_backend, get_engine_config, list_backends
from .lockfile import LockError, LockHandle, acquire_lock, token_fingerprint from .lockfile import LockError, LockHandle, acquire_lock, token_fingerprint
from .logging import setup_logging from .logging import setup_logging
from .onboarding import check_setup, render_setup_guide from .onboarding import SetupResult, check_setup, interactive_setup
from .router import AutoRouter, RunnerEntry from .router import AutoRouter, RunnerEntry
from .telegram import TelegramClient from .telegram import TelegramClient
@@ -223,8 +224,35 @@ def _parse_bridge_config(
) )
def _config_path_display(path: Path) -> str:
home = Path.home()
try:
return f"~/{path.relative_to(home)}"
except ValueError:
return str(path)
def _should_run_interactive() -> bool:
if os.environ.get("TAKOPI_NO_INTERACTIVE"):
return False
return sys.stdin.isatty() and sys.stdout.isatty()
def _setup_needs_config(setup: SetupResult) -> bool:
return any(issue.title == "create a config" for issue in setup.issues)
def _fail_missing_config(path: Path) -> None:
display = _config_path_display(path)
typer.echo(f"error: missing takopi config at {display}", err=True)
def _run_auto_router( def _run_auto_router(
*, default_engine_override: str | None, final_notify: bool, debug: bool *,
default_engine_override: str | None,
final_notify: bool,
debug: bool,
onboard: bool,
) -> None: ) -> None:
setup_logging(debug=debug) setup_logging(debug=debug)
lock_handle: LockHandle | None = None lock_handle: LockHandle | None = None
@@ -234,10 +262,28 @@ def _run_auto_router(
except ConfigError as e: except ConfigError as e:
typer.echo(f"error: {e}", err=True) typer.echo(f"error: {e}", err=True)
raise typer.Exit(code=1) raise typer.Exit(code=1)
if onboard:
if not _should_run_interactive():
typer.echo("error: --onboard requires a TTY", err=True)
raise typer.Exit(code=1)
if not interactive_setup(force=True):
raise typer.Exit(code=1)
default_engine = _default_engine_for_setup(default_engine_override)
backend = get_backend(default_engine)
setup = check_setup(backend) setup = check_setup(backend)
if not setup.ok: if not setup.ok:
render_setup_guide(setup) if _setup_needs_config(setup) and _should_run_interactive():
raise typer.Exit(code=1) if interactive_setup(force=False):
default_engine = _default_engine_for_setup(default_engine_override)
backend = get_backend(default_engine)
setup = check_setup(backend)
if not setup.ok:
if _setup_needs_config(setup):
_fail_missing_config(setup.config_path)
else:
first = setup.issues[0]
typer.echo(f"error: {first.title}", err=True)
raise typer.Exit(code=1)
try: try:
config, config_path, token, chat_id = load_and_validate_config() config, config_path, token, chat_id = load_and_validate_config()
lock_handle = acquire_config_lock(config_path, token) lock_handle = acquire_config_lock(config_path, token)
@@ -283,6 +329,11 @@ def app_main(
"--final-notify/--no-final-notify", "--final-notify/--no-final-notify",
help="Send the final response as a new message (not an edit).", help="Send the final response as a new message (not an edit).",
), ),
onboard: bool = typer.Option(
False,
"--onboard/--no-onboard",
help="Run the interactive setup wizard before starting.",
),
debug: bool = typer.Option( debug: bool = typer.Option(
False, False,
"--debug/--no-debug", "--debug/--no-debug",
@@ -295,6 +346,7 @@ def app_main(
default_engine_override=None, default_engine_override=None,
final_notify=final_notify, final_notify=final_notify,
debug=debug, debug=debug,
onboard=onboard,
) )
raise typer.Exit() raise typer.Exit()
@@ -306,6 +358,11 @@ def make_engine_cmd(engine_id: str) -> Callable[..., None]:
"--final-notify/--no-final-notify", "--final-notify/--no-final-notify",
help="Send the final response as a new message (not an edit).", help="Send the final response as a new message (not an edit).",
), ),
onboard: bool = typer.Option(
False,
"--onboard/--no-onboard",
help="Run the interactive setup wizard before starting.",
),
debug: bool = typer.Option( debug: bool = typer.Option(
False, False,
"--debug/--no-debug", "--debug/--no-debug",
@@ -316,6 +373,7 @@ def make_engine_cmd(engine_id: str) -> Callable[..., None]:
default_engine_override=engine_id, default_engine_override=engine_id,
final_notify=final_notify, final_notify=final_notify,
debug=debug, debug=debug,
onboard=onboard,
) )
_cmd.__name__ = f"run_{engine_id}" _cmd.__name__ = f"run_{engine_id}"
+4 -19
View File
@@ -3,7 +3,6 @@ from __future__ import annotations
import tomllib import tomllib
from pathlib import Path from pathlib import Path
LOCAL_CONFIG_NAME = Path(".takopi") / "takopi.toml"
HOME_CONFIG_PATH = Path.home() / ".takopi" / "takopi.toml" HOME_CONFIG_PATH = Path.home() / ".takopi" / "takopi.toml"
@@ -11,13 +10,6 @@ class ConfigError(RuntimeError):
pass pass
def _config_candidates() -> list[Path]:
candidates = [Path.cwd() / LOCAL_CONFIG_NAME, HOME_CONFIG_PATH]
if candidates[0] == candidates[1]:
return [candidates[0]]
return candidates
def _read_config(cfg_path: Path) -> dict: def _read_config(cfg_path: Path) -> dict:
try: try:
raw = cfg_path.read_text(encoding="utf-8") raw = cfg_path.read_text(encoding="utf-8")
@@ -35,14 +27,7 @@ def load_telegram_config(path: str | Path | None = None) -> tuple[dict, Path]:
if path: if path:
cfg_path = Path(path).expanduser() cfg_path = Path(path).expanduser()
return _read_config(cfg_path), cfg_path return _read_config(cfg_path), cfg_path
cfg_path = HOME_CONFIG_PATH
config_candidates = _config_candidates() if cfg_path.exists() and not cfg_path.is_file():
for candidate in config_candidates: raise ConfigError(f"Config path {cfg_path} exists but is not a file.") from None
if candidate.exists() and not candidate.is_file(): return _read_config(cfg_path), cfg_path
raise ConfigError(
f"Config path {candidate} exists but is not a file."
) from None
if candidate.is_file():
return _read_config(candidate), candidate
checked_display = ", ".join(str(candidate) for candidate in config_candidates)
raise ConfigError(f"Missing takopi config. Checked: {checked_display}")
-54
View File
@@ -1,54 +0,0 @@
from __future__ import annotations
import typer
from .backends import SetupIssue
from .config import ConfigError
from .engines import get_backend, list_backend_ids
from .onboarding import SetupResult, check_setup, config_issue, render_setup_guide
def _dedupe_issues(issues: list[SetupIssue]) -> list[SetupIssue]:
seen: set[SetupIssue] = set()
deduped: list[SetupIssue] = []
for issue in issues:
if issue in seen:
continue
seen.add(issue)
deduped.append(issue)
return deduped
def run(
engine: str = typer.Option(
"codex",
"--engine",
help=f"Engine backend id ({', '.join(list_backend_ids())}).",
),
force: bool = typer.Option(
True,
"--force/--no-force",
help="Render onboarding panel even if setup looks OK.",
),
) -> None:
try:
backend = get_backend(engine)
except ConfigError as e:
typer.echo(str(e), err=True)
raise typer.Exit(code=1)
setup = check_setup(backend)
if force:
forced_issues = [config_issue(setup.config_path)]
setup = SetupResult(
issues=_dedupe_issues([*setup.issues, *forced_issues]),
config_path=setup.config_path,
)
render_setup_guide(setup)
def main() -> None:
typer.run(run)
if __name__ == "__main__":
main()
+367 -48
View File
@@ -1,18 +1,31 @@
from __future__ import annotations from __future__ import annotations
import logging
import shutil import shutil
from contextlib import contextmanager
from dataclasses import dataclass from dataclasses import dataclass
from pathlib import Path from pathlib import Path
from typing import Any
import anyio
import questionary
from prompt_toolkit import PromptSession
from prompt_toolkit.formatted_text import to_formatted_text
from prompt_toolkit.key_binding import KeyBindings
from prompt_toolkit.keys import Keys
from questionary.constants import DEFAULT_QUESTION_PREFIX
from questionary.question import Question
from questionary.styles import merge_styles_default
from rich import box
from rich.console import Console from rich.console import Console
from rich.panel import Panel from rich.panel import Panel
from rich.table import Table
from .backends import EngineBackend, SetupIssue from .backends import EngineBackend, SetupIssue
from .backends_helpers import install_issue from .backends_helpers import install_issue
from .config import ConfigError, HOME_CONFIG_PATH, load_telegram_config from .config import ConfigError, HOME_CONFIG_PATH, load_telegram_config
from .engines import list_backends
_OCTOPUS = "\N{OCTOPUS}" from .telegram import TelegramClient
@dataclass(slots=True) @dataclass(slots=True)
@@ -25,24 +38,45 @@ class SetupResult:
return not self.issues return not self.issues
@dataclass(frozen=True, slots=True)
class ChatInfo:
chat_id: int
username: str | None
title: str | None
first_name: str | None
last_name: str | None
chat_type: str | None
@property
def is_group(self) -> bool:
return self.chat_type in {"group", "supergroup"}
@property
def display(self) -> str:
if self.is_group:
if self.title:
return f'group "{self.title}"'
return "group chat"
if self.chat_type == "channel":
if self.title:
return f'channel "{self.title}"'
return "channel"
if self.username:
return f"@{self.username}"
full_name = " ".join(part for part in [self.first_name, self.last_name] if part)
return full_name or "private chat"
def _display_path(path: Path) -> str:
home = Path.home()
try:
return f"~/{path.relative_to(home)}"
except ValueError:
return str(path)
def config_issue(path: Path) -> SetupIssue: def config_issue(path: Path) -> SetupIssue:
config_display = _config_path_display(path) return SetupIssue("create a config", (f" {_display_path(path)}",))
return SetupIssue(
"create a config",
(
f" [dim]{config_display}[/]",
"",
' [cyan]bot_token[/] = [green]"123456789:ABCdef..."[/]',
" [cyan]chat_id[/] = [green]123456789[/]",
"",
"[dim]" + ("-" * 56) + "[/]",
"",
"[bold]getting your telegram credentials:[/]",
"",
" [cyan]bot_token[/] create a bot with [link=https://t.me/BotFather]@BotFather[/]",
" [cyan]chat_id[/] message [link=https://t.me/myidbot]@myidbot[/] to get your id",
),
)
def check_setup(backend: EngineBackend) -> SetupResult: def check_setup(backend: EngineBackend) -> SetupResult:
@@ -74,39 +108,324 @@ def check_setup(backend: EngineBackend) -> SetupResult:
return SetupResult(issues=issues, config_path=config_path) return SetupResult(issues=issues, config_path=config_path)
def _config_path_display(path: Path) -> str: def _mask_token(token: str) -> str:
home = Path.home() token = token.strip()
if len(token) <= 12:
return "*" * len(token)
return f"{token[:9]}...{token[-5:]}"
def _toml_escape(value: str) -> str:
return value.replace("\\", "\\\\").replace('"', '\\"')
def _render_config(token: str, chat_id: int, default_engine: str | None) -> str:
lines: list[str] = []
if default_engine:
lines.append(f'default_engine = "{_toml_escape(default_engine)}"')
lines.append("")
lines.append(f'bot_token = "{_toml_escape(token)}"')
lines.append(f"chat_id = {chat_id}")
return "\n".join(lines) + "\n"
async def _get_bot_info(token: str) -> dict[str, Any] | None:
bot = TelegramClient(token)
try: try:
return f"~/{path.relative_to(home)}" return await bot.get_me()
except ValueError: finally:
return str(path) await bot.close()
def render_setup_guide(result: SetupResult) -> None: async def _wait_for_chat(token: str) -> ChatInfo:
if result.ok: bot = TelegramClient(token)
return try:
offset: int | None = None
allowed_updates = ["message"]
drained = await bot.get_updates(
offset=None, timeout_s=0, allowed_updates=allowed_updates
)
if drained:
offset = drained[-1]["update_id"] + 1
while True:
updates = await bot.get_updates(
offset=offset, timeout_s=50, allowed_updates=allowed_updates
)
if updates is None:
await anyio.sleep(1)
continue
if not updates:
continue
offset = updates[-1]["update_id"] + 1
update = updates[-1]
msg = update.get("message")
if not isinstance(msg, dict):
continue
sender = msg.get("from")
if isinstance(sender, dict) and sender.get("is_bot") is True:
continue
chat = msg.get("chat")
if not isinstance(chat, dict):
continue
chat_id = chat.get("id")
if not isinstance(chat_id, int):
continue
return ChatInfo(
chat_id=chat_id,
username=chat.get("username")
if isinstance(chat.get("username"), str)
else None,
title=chat.get("title") if isinstance(chat.get("title"), str) else None,
first_name=chat.get("first_name")
if isinstance(chat.get("first_name"), str)
else None,
last_name=chat.get("last_name")
if isinstance(chat.get("last_name"), str)
else None,
chat_type=chat.get("type")
if isinstance(chat.get("type"), str)
else None,
)
finally:
await bot.close()
console = Console(stderr=True)
parts: list[str] = []
step = 0
def add_step(title: str, *lines: str) -> None: async def _send_confirmation(token: str, chat_id: int) -> bool:
nonlocal step bot = TelegramClient(token)
step += 1 try:
parts.append(f"[bold yellow]{step}.[/] [bold]{title}[/]") res = await bot.send_message(
parts.append("") chat_id=chat_id,
parts.extend(lines) text="takopi is configured and ready.",
parts.append("") )
return res is not None
finally:
await bot.close()
for issue in result.issues:
add_step(issue.title, *issue.lines)
panel = Panel( def _render_engine_table(console: Console) -> list[tuple[str, bool, str | None]]:
"\n".join(parts).rstrip(), backends = list_backends()
title="[bold]welcome to takopi![/]", rows: list[tuple[str, bool, str | None]] = []
subtitle=f"{_OCTOPUS} setup required", table = Table(show_header=True, header_style="bold", box=box.SIMPLE)
border_style="yellow", table.add_column("agent")
padding=(1, 2), table.add_column("status")
expand=False, table.add_column("install command")
for backend in backends:
cmd = backend.cli_cmd or backend.id
installed = shutil.which(cmd) is not None
status = "[green]✓ installed[/]" if installed else "[dim]✗ not found[/]"
rows.append((backend.id, installed, backend.install_cmd))
table.add_row(
backend.id,
status,
"" if installed else (backend.install_cmd or "-"),
)
console.print(table)
return rows
@contextmanager
def _suppress_logging():
prev_disable = logging.root.manager.disable
logging.disable(logging.INFO)
try:
yield
finally:
logging.disable(prev_disable)
def _confirm(message: str, *, default: bool = True) -> bool | None:
merged_style = merge_styles_default([None])
status = {"answer": None, "complete": False}
def get_prompt_tokens():
tokens = [
("class:qmark", DEFAULT_QUESTION_PREFIX),
("class:question", f" {message} "),
]
if not status["complete"]:
tokens.append(("class:instruction", "(yes/no) "))
if status["answer"] is not None:
tokens.append(("class:answer", "yes" if status["answer"] else "no"))
return to_formatted_text(tokens)
def exit_with_result(event):
status["complete"] = True
event.app.exit(result=status["answer"])
bindings = KeyBindings()
@bindings.add(Keys.ControlQ, eager=True)
@bindings.add(Keys.ControlC, eager=True)
def _(event):
event.app.exit(exception=KeyboardInterrupt, style="class:aborting")
@bindings.add("n")
@bindings.add("N")
def key_n(event):
status["answer"] = False
exit_with_result(event)
@bindings.add("y")
@bindings.add("Y")
def key_y(event):
status["answer"] = True
exit_with_result(event)
@bindings.add(Keys.ControlH)
def key_backspace(event):
status["answer"] = None
@bindings.add(Keys.ControlM, eager=True)
def set_answer(event):
if status["answer"] is None:
status["answer"] = default
exit_with_result(event)
@bindings.add(Keys.Any)
def other(event):
_ = event
question = Question(
PromptSession(get_prompt_tokens, key_bindings=bindings, style=merged_style).app
) )
console.print(panel) return question.ask()
def _prompt_token(console: Console) -> tuple[str, dict[str, Any]] | None:
while True:
token = questionary.password("paste your bot token:").ask()
if token is None:
return None
token = token.strip()
if not token:
console.print(" token cannot be empty")
continue
console.print(" validating...")
info = anyio.run(_get_bot_info, token)
if info:
username = info.get("username")
if isinstance(username, str) and username:
console.print(f" connected to @{username}")
else:
name = info.get("first_name") or "your bot"
console.print(f" connected to {name}")
return token, info
console.print(" failed to connect, check the token and try again")
retry = _confirm("try again?", default=True)
if not retry:
return None
def interactive_setup(*, force: bool) -> bool:
console = Console()
config_path = HOME_CONFIG_PATH
suppress_logs = _suppress_logging()
if config_path.exists() and not force:
console.print(
f"config already exists at {_display_path(config_path)}. "
"use --onboard to reconfigure."
)
return True
if config_path.exists() and force:
overwrite = _confirm(
f"overwrite existing config at {_display_path(config_path)}?",
default=False,
)
if not overwrite:
return False
with suppress_logs:
panel = Panel(
"let's set up your telegram bot.",
title="welcome to takopi!",
border_style="yellow",
padding=(1, 2),
expand=False,
)
console.print(panel)
console.print("step 1: telegram bot setup\n")
have_token = _confirm("do you have a telegram bot token?")
if have_token is None:
return False
if not have_token:
console.print(" 1. open telegram and message @BotFather")
console.print(" 2. send /newbot and follow the prompts")
console.print(" 3. copy the token (looks like 123456789:ABCdef...)")
console.print("")
token_info = _prompt_token(console)
if token_info is None:
return False
token, info = token_info
bot_ref = f"@{info['username']}"
console.print("")
console.print(f" send /start to {bot_ref} (works in groups too)")
console.print(" waiting...")
try:
chat = anyio.run(_wait_for_chat, token)
except KeyboardInterrupt:
console.print(" cancelled")
return False
if chat is None:
console.print(" cancelled")
return False
console.print(f" got chat_id {chat.chat_id} from {chat.display}")
sent = anyio.run(_send_confirmation, token, chat.chat_id)
if sent:
console.print(" sent confirmation message")
else:
console.print(" could not send confirmation message")
console.print("\nstep 2: agent cli tools")
rows = _render_engine_table(console)
installed_ids = [engine_id for engine_id, installed, _ in rows if installed]
default_engine: str | None = None
if installed_ids:
default_engine = questionary.select(
"choose default agent:",
choices=installed_ids,
).ask()
if default_engine is None:
return False
else:
console.print("no agents found on PATH. install one to continue.")
config_preview = _render_config(
_mask_token(token),
chat.chat_id,
default_engine,
).rstrip()
console.print("\nstep 3: save configuration\n")
console.print(f" {_display_path(config_path)}\n")
for line in config_preview.splitlines():
console.print(f" {line}")
console.print("")
save = _confirm(
f"save this config to {_display_path(config_path)}?",
default=True,
)
if not save:
return False
config_path.parent.mkdir(parents=True, exist_ok=True)
config_text = _render_config(token, chat.chat_id, default_engine)
config_path.write_text(config_text, encoding="utf-8")
console.print(f" config saved to {_display_path(config_path)}")
done_panel = Panel(
"setup complete. starting takopi...",
border_style="green",
padding=(1, 2),
expand=False,
)
console.print("\n")
console.print(done_panel)
return True
+1 -1
View File
@@ -560,5 +560,5 @@ def build_runner(config: EngineConfig, config_path: Path) -> Runner:
BACKEND = EngineBackend( BACKEND = EngineBackend(
id="opencode", id="opencode",
build_runner=build_runner, build_runner=build_runner,
install_cmd="npm i -g opencode-ai@latest", install_cmd="npm install -g opencode-ai@latest",
) )
+6
View File
@@ -50,6 +50,8 @@ class BotClient(Protocol):
language_code: str | None = None, language_code: str | None = None,
) -> bool: ... ) -> bool: ...
async def get_me(self) -> dict | None: ...
class TelegramClient: class TelegramClient:
def __init__( def __init__(
@@ -207,3 +209,7 @@ class TelegramClient:
params["language_code"] = language_code params["language_code"] = language_code
res = await self._post("setMyCommands", params) res = await self._post("setMyCommands", params)
return bool(res) return bool(res)
async def get_me(self) -> dict | None:
res = await self._post("getMe", {})
return res if isinstance(res, dict) else None
+3
View File
@@ -257,6 +257,9 @@ class _FakeBot:
async def close(self) -> None: async def close(self) -> None:
return None return None
async def get_me(self) -> dict | None:
return {"id": 1}
class _FakeClock: class _FakeClock:
def __init__(self, start: float = 0.0) -> None: def __init__(self, start: float = 0.0) -> None:
+100
View File
@@ -0,0 +1,100 @@
from __future__ import annotations
from takopi import onboarding
from takopi.backends import EngineBackend
def test_mask_token_short() -> None:
assert onboarding._mask_token("short") == "*****"
def test_mask_token_long() -> None:
token = "123456789:ABCdefGH"
masked = onboarding._mask_token(token)
assert masked.startswith("123456789")
assert masked.endswith("defGH")
assert "..." in masked
def test_render_config_escapes() -> None:
config = onboarding._render_config(
'token"with\\quote',
123,
"codex",
)
assert 'default_engine = "codex"' in config
assert 'bot_token = "token\\"with\\\\quote"' in config
assert "chat_id = 123" in config
assert config.endswith("\n")
class _FakeQuestion:
def __init__(self, value):
self._value = value
def ask(self):
return self._value
def _queue(values):
it = iter(values)
def _make(*_args, **_kwargs):
return _FakeQuestion(next(it))
return _make
def _queue_values(values):
it = iter(values)
def _next(*_args, **_kwargs):
return next(it)
return _next
def test_interactive_setup_skips_when_config_exists(monkeypatch, tmp_path) -> None:
config_path = tmp_path / "takopi.toml"
config_path.write_text('bot_token = "token"\nchat_id = 123\n', encoding="utf-8")
monkeypatch.setattr(onboarding, "HOME_CONFIG_PATH", config_path)
assert onboarding.interactive_setup(force=False) is True
def test_interactive_setup_writes_config(monkeypatch, tmp_path) -> None:
config_path = tmp_path / "takopi.toml"
monkeypatch.setattr(onboarding, "HOME_CONFIG_PATH", config_path)
backend = EngineBackend(id="codex", build_runner=lambda _cfg, _path: None)
monkeypatch.setattr(onboarding, "list_backends", lambda: [backend])
monkeypatch.setattr(onboarding.shutil, "which", lambda _cmd: "/usr/bin/codex")
monkeypatch.setattr(onboarding, "_confirm", _queue_values([True, True]))
monkeypatch.setattr(
onboarding.questionary, "password", _queue(["123456789:ABCdef"])
)
monkeypatch.setattr(onboarding.questionary, "select", _queue(["codex"]))
def _fake_run(func, *args, **kwargs):
if func is onboarding._get_bot_info:
return {"username": "my_bot"}
if func is onboarding._wait_for_chat:
return onboarding.ChatInfo(
chat_id=123,
username="alice",
title=None,
first_name="Alice",
last_name=None,
chat_type="private",
)
if func is onboarding._send_confirmation:
return True
raise AssertionError(f"unexpected anyio.run target: {func}")
monkeypatch.setattr(onboarding.anyio, "run", _fake_run)
assert onboarding.interactive_setup(force=False) is True
saved = config_path.read_text(encoding="utf-8")
assert 'bot_token = "123456789:ABCdef"' in saved
assert "chat_id = 123" in saved
assert 'default_engine = "codex"' in saved
Generated
+35
View File
@@ -230,6 +230,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" }, { url = "https://files.pythonhosted.org/packages/54/20/4d324d65cc6d9205fabedc306948156824eb9f0ee1633355a8f7ec5c66bf/pluggy-1.6.0-py3-none-any.whl", hash = "sha256:e920276dd6813095e9377c0bc5566d94c932c33b27a3e3945d8389c374dd4746", size = 20538, upload-time = "2025-05-15T12:30:06.134Z" },
] ]
[[package]]
name = "prompt-toolkit"
version = "3.0.52"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "wcwidth" },
]
sdist = { url = "https://files.pythonhosted.org/packages/a1/96/06e01a7b38dce6fe1db213e061a4602dd6032a8a97ef6c1a862537732421/prompt_toolkit-3.0.52.tar.gz", hash = "sha256:28cde192929c8e7321de85de1ddbe736f1375148b02f2e17edd840042b1be855", size = 434198, upload-time = "2025-08-27T15:24:02.057Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/84/03/0d3ce49e2505ae70cf43bc5bb3033955d2fc9f932163e84dc0779cc47f48/prompt_toolkit-3.0.52-py3-none-any.whl", hash = "sha256:9aac639a3bbd33284347de5ad8d68ecc044b91a762dc39b7c21095fcd6a19955", size = 391431, upload-time = "2025-08-27T15:23:59.498Z" },
]
[[package]] [[package]]
name = "pygments" name = "pygments"
version = "2.19.2" version = "2.19.2"
@@ -282,6 +294,18 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" }, { url = "https://files.pythonhosted.org/packages/ee/49/1377b49de7d0c1ce41292161ea0f721913fa8722c19fb9c1e3aa0367eecb/pytest_cov-7.0.0-py3-none-any.whl", hash = "sha256:3b8e9558b16cc1479da72058bdecf8073661c7f57f7d3c5f22a1c23507f2d861", size = 22424, upload-time = "2025-09-09T10:57:00.695Z" },
] ]
[[package]]
name = "questionary"
version = "2.1.1"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "prompt-toolkit" },
]
sdist = { url = "https://files.pythonhosted.org/packages/f6/45/eafb0bba0f9988f6a2520f9ca2df2c82ddfa8d67c95d6625452e97b204a5/questionary-2.1.1.tar.gz", hash = "sha256:3d7e980292bb0107abaa79c68dd3eee3c561b83a0f89ae482860b181c8bd412d", size = 25845, upload-time = "2025-08-28T19:00:20.851Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/3c/26/1062c7ec1b053db9e499b4d2d5bc231743201b74051c973dadeac80a8f43/questionary-2.1.1-py3-none-any.whl", hash = "sha256:a51af13f345f1cdea62347589fbb6df3b290306ab8930713bfae4d475a7d4a59", size = 36753, upload-time = "2025-08-28T19:00:19.56Z" },
]
[[package]] [[package]]
name = "rich" name = "rich"
version = "14.2.0" version = "14.2.0"
@@ -360,6 +384,7 @@ dependencies = [
{ name = "anyio" }, { name = "anyio" },
{ name = "httpx" }, { name = "httpx" },
{ name = "markdown-it-py" }, { name = "markdown-it-py" },
{ name = "questionary" },
{ name = "rich" }, { name = "rich" },
{ name = "sulguk" }, { name = "sulguk" },
{ name = "typer" }, { name = "typer" },
@@ -379,6 +404,7 @@ requires-dist = [
{ name = "anyio", specifier = ">=4.12.0" }, { name = "anyio", specifier = ">=4.12.0" },
{ name = "httpx", specifier = ">=0.28.1" }, { name = "httpx", specifier = ">=0.28.1" },
{ name = "markdown-it-py" }, { name = "markdown-it-py" },
{ name = "questionary", specifier = ">=2.1.1" },
{ name = "rich", specifier = ">=14.2.0" }, { name = "rich", specifier = ">=14.2.0" },
{ name = "sulguk", specifier = ">=0.11.1" }, { name = "sulguk", specifier = ">=0.11.1" },
{ name = "typer", specifier = ">=0.21.0" }, { name = "typer", specifier = ">=0.21.0" },
@@ -442,6 +468,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" }, { url = "https://files.pythonhosted.org/packages/18/67/36e9267722cc04a6b9f15c7f3441c2363321a3ea07da7ae0c0707beb2a9c/typing_extensions-4.15.0-py3-none-any.whl", hash = "sha256:f0fa19c6845758ab08074a0cfa8b7aecb71c999ca73d62883bc25cc018c4e548", size = 44614, upload-time = "2025-08-25T13:49:24.86Z" },
] ]
[[package]]
name = "wcwidth"
version = "0.2.14"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/24/30/6b0809f4510673dc723187aeaf24c7f5459922d01e2f794277a3dfb90345/wcwidth-0.2.14.tar.gz", hash = "sha256:4d478375d31bc5395a3c55c40ccdf3354688364cd61c4f6adacaa9215d0b3605", size = 102293, upload-time = "2025-09-22T16:29:53.023Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/af/b5/123f13c975e9f27ab9c0770f514345bd406d0e8d3b7a0723af9d43f710af/wcwidth-0.2.14-py2.py3-none-any.whl", hash = "sha256:a7bb560c8aee30f9957e5f9895805edd20602f2d7f720186dfd906e82b4982e1", size = 37286, upload-time = "2025-09-22T16:29:51.641Z" },
]
[[package]] [[package]]
name = "webencodings" name = "webencodings"
version = "0.5.1" version = "0.5.1"