diff --git a/docs/transports/telegram.md b/docs/transports/telegram.md index cd7a74b..95537e6 100644 --- a/docs/transports/telegram.md +++ b/docs/transports/telegram.md @@ -16,6 +16,24 @@ This document captures current behavior so transport changes stay intentional. 4. High-value messages enqueue a send. 5. All writes go through the outbox. +## Incoming messages + +`parse_incoming_update` accepts text messages and voice notes. + +If voice transcription is enabled, takopi downloads the voice payload from Telegram, +transcribes it with OpenAI, and routes the transcript through the same command and +directive pipeline as typed text. + +Configuration (under `[transports.telegram]`): + +```toml +voice_transcription = true +``` + +Set `OPENAI_API_KEY` in the environment. If transcription is enabled but the API key +is missing or the audio download fails, takopi replies with a short error and skips +the run. + ## Outbox model - Single worker processes one op at a time. diff --git a/readme.md b/readme.md index aa8be23..8904d1d 100644 --- a/readme.md +++ b/readme.md @@ -18,6 +18,8 @@ parallel runs across threads, per thread queue support. `/cancel` a running task. +optional voice note transcription for Telegram (routes transcript like typed text). + ## requirements - `uv` for installation (`curl -LsSf https://astral.sh/uv/install.sh | sh`) @@ -59,6 +61,9 @@ transport = "telegram" [transports.telegram] bot_token = "123456789:ABCdefGHIjklMNOpqrsTUVwxyz" chat_id = 123456789 +voice_transcription = true + +# set OPENAI_API_KEY in your environment for voice transcription [codex] # optional: profile from ~/.codex/config.toml diff --git a/src/takopi/runners/codex.py b/src/takopi/runners/codex.py index aa57fd0..4e9061f 100644 --- a/src/takopi/runners/codex.py +++ b/src/takopi/runners/codex.py @@ -25,6 +25,18 @@ _RECONNECTING_RE = re.compile( r"^Reconnecting\.{3}\s*(?P\d+)/(?P\d+)\s*$", re.IGNORECASE, ) +_EXEC_ONLY_FLAGS = {"--skip-git-repo-check"} + + +def _split_exec_flags(extra_args: list[str]) -> tuple[list[str], list[str]]: + base_args: list[str] = [] + exec_args: list[str] = [] + for arg in extra_args: + if arg in _EXEC_ONLY_FLAGS: + exec_args.append(arg) + else: + base_args.append(arg) + return base_args, exec_args def _parse_reconnect_message(message: str) -> tuple[int, int] | None: @@ -397,7 +409,8 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner): state: Any, ) -> list[str]: _ = prompt, state - args = [*self.extra_args, "exec", "--json"] + base_args, exec_args = _split_exec_flags(self.extra_args) + args = [*base_args, "exec", *exec_args, "--json"] if resume: args.extend(["resume", resume.value, "-"]) else: diff --git a/src/takopi/settings.py b/src/takopi/settings.py index 53009df..36b4b7a 100644 --- a/src/takopi/settings.py +++ b/src/takopi/settings.py @@ -25,6 +25,7 @@ class TelegramTransportSettings(BaseModel): bot_token: SecretStr | None = None chat_id: int | None = None + voice_transcription: bool = False @field_validator("bot_token", mode="before") @classmethod diff --git a/src/takopi/telegram/__init__.py b/src/takopi/telegram/__init__.py index b133e53..5a606b8 100644 --- a/src/takopi/telegram/__init__.py +++ b/src/takopi/telegram/__init__.py @@ -1,10 +1,11 @@ """Telegram-specific clients and adapters.""" from .client import parse_incoming_update, poll_incoming -from .types import TelegramIncomingMessage +from .types import TelegramIncomingMessage, TelegramVoice __all__ = [ "TelegramIncomingMessage", + "TelegramVoice", "parse_incoming_update", "poll_incoming", ] diff --git a/src/takopi/telegram/backend.py b/src/takopi/telegram/backend.py index 718c260..5a52515 100644 --- a/src/takopi/telegram/backend.py +++ b/src/takopi/telegram/backend.py @@ -14,6 +14,7 @@ from .bridge import ( TelegramBridgeConfig, TelegramPresenter, TelegramTransport, + TelegramVoiceTranscriptionConfig, run_main_loop, ) from .client import TelegramClient @@ -43,6 +44,14 @@ def _build_startup_message( ) +def _build_voice_transcription_config( + transport_config: dict[str, object], +) -> TelegramVoiceTranscriptionConfig: + return TelegramVoiceTranscriptionConfig( + enabled=bool(transport_config.get("voice_transcription", False)), + ) + + class TelegramBackend(TransportBackend): id = "telegram" description = "Telegram bot" @@ -87,12 +96,14 @@ class TelegramBackend(TransportBackend): presenter=presenter, final_notify=final_notify, ) + voice_transcription = _build_voice_transcription_config(transport_config) cfg = TelegramBridgeConfig( bot=bot, runtime=runtime, chat_id=chat_id, startup_msg=startup_msg, exec_cfg=exec_cfg, + voice_transcription=voice_transcription, ) anyio.run(run_main_loop, cfg) diff --git a/src/takopi/telegram/bridge.py b/src/takopi/telegram/bridge.py index 055f1cc..d1693de 100644 --- a/src/takopi/telegram/bridge.py +++ b/src/takopi/telegram/bridge.py @@ -1,8 +1,10 @@ from __future__ import annotations +import os import shlex from collections.abc import AsyncIterator, Awaitable, Callable, Sequence from dataclasses import dataclass +from pathlib import Path import anyio @@ -40,10 +42,14 @@ from ..transport_runtime import TransportRuntime from .client import BotClient, poll_incoming from .types import TelegramIncomingMessage from .render import prepare_telegram +from .transcribe import transcribe_audio logger = get_logger(__name__) _MAX_BOT_COMMANDS = 100 +_OPENAI_AUDIO_MAX_BYTES = 25 * 1024 * 1024 +_OPENAI_TRANSCRIPTION_MODEL = "gpt-4o-mini-transcribe" +_OPENAI_TRANSCRIPTION_CHUNKING = "auto" def _is_cancel_command(text: str) -> bool: @@ -191,6 +197,11 @@ class TelegramPresenter: return RenderedMessage(text=text, extra={"entities": entities}) +@dataclass(frozen=True) +class TelegramVoiceTranscriptionConfig: + enabled: bool = False + + def _as_int(value: int | str, *, label: str) -> int: if isinstance(value, bool) or not isinstance(value, int): raise TypeError(f"Telegram {label} must be int") @@ -285,6 +296,7 @@ class TelegramBridgeConfig: chat_id: int startup_msg: str exec_cfg: ExecBridgeConfig + voice_transcription: TelegramVoiceTranscriptionConfig | None = None async def _send_plain( @@ -345,6 +357,125 @@ async def poll_updates( yield msg +def _resolve_openai_api_key( + cfg: TelegramVoiceTranscriptionConfig, +) -> str | None: + env_key = os.environ.get("OPENAI_API_KEY") + if isinstance(env_key, str): + env_key = env_key.strip() + if env_key: + return env_key + return None + + +def _normalize_voice_filename(file_path: str | None, mime_type: str | None) -> str: + name = Path(file_path).name if file_path else "" + if not name: + if mime_type == "audio/ogg": + return "voice.ogg" + return "voice.dat" + if name.endswith(".oga"): + return f"{name[:-4]}.ogg" + return name + + +async def _transcribe_voice( + cfg: TelegramBridgeConfig, + msg: TelegramIncomingMessage, +) -> str | None: + voice = msg.voice + if voice is None: + return msg.text + settings = cfg.voice_transcription + if settings is None or not settings.enabled: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="voice transcription is disabled.", + ) + return None + api_key = _resolve_openai_api_key(settings) + if not api_key: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="voice transcription requires OPENAI_API_KEY.", + ) + return None + if voice.file_size is not None and voice.file_size > _OPENAI_AUDIO_MAX_BYTES: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="voice message is too large to transcribe.", + ) + return None + file_info = await cfg.bot.get_file(voice.file_id) + if not isinstance(file_info, dict): + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="failed to fetch voice file.", + ) + return None + file_path = file_info.get("file_path") + if not isinstance(file_path, str) or not file_path: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="failed to fetch voice file.", + ) + return None + audio_bytes = await cfg.bot.download_file(file_path) + if not audio_bytes: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="failed to download voice message.", + ) + return None + if len(audio_bytes) > _OPENAI_AUDIO_MAX_BYTES: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="voice message is too large to transcribe.", + ) + return None + filename = _normalize_voice_filename(file_path, voice.mime_type) + transcript = await transcribe_audio( + audio_bytes, + filename=filename, + api_key=api_key, + model=_OPENAI_TRANSCRIPTION_MODEL, + chunking_strategy=_OPENAI_TRANSCRIPTION_CHUNKING, + mime_type=voice.mime_type, + ) + if transcript is None: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="voice transcription failed.", + ) + return None + transcript = transcript.strip() + if not transcript: + await _send_plain( + cfg.exec_cfg.transport, + chat_id=msg.chat_id, + user_msg_id=msg.message_id, + text="voice transcription returned empty text.", + ) + return None + return transcript + + async def _handle_cancel( cfg: TelegramBridgeConfig, msg: TelegramIncomingMessage, @@ -702,6 +833,7 @@ class _TelegramCommandExecutor(CommandExecutor): async def _dispatch_command( cfg: TelegramBridgeConfig, msg: TelegramIncomingMessage, + text: str, command_id: str, args_text: str, running_tasks: RunningTasks, @@ -738,7 +870,7 @@ async def _dispatch_command( return ctx = CommandContext( command=command_id, - text=msg.text, + text=text, args_text=args_text, args=_split_command_args(args_text), message=message_ref, @@ -826,6 +958,10 @@ async def run_main_loop( async for msg in poller(cfg): text = msg.text + if msg.voice is not None: + text = await _transcribe_voice(cfg, msg) + if text is None: + continue user_msg_id = msg.message_id chat_id = msg.chat_id reply_id = msg.reply_to_message_id @@ -850,6 +986,7 @@ async def run_main_loop( _dispatch_command, cfg, msg, + text, command_id, args_text, running_tasks, diff --git a/src/takopi/telegram/client.py b/src/takopi/telegram/client.py index b21fe57..137abe1 100644 --- a/src/takopi/telegram/client.py +++ b/src/takopi/telegram/client.py @@ -18,7 +18,7 @@ import httpx import anyio from ..logging import get_logger -from .types import TelegramIncomingMessage +from .types import TelegramIncomingMessage, TelegramVoice logger = get_logger(__name__) @@ -50,8 +50,30 @@ def parse_incoming_update( if not isinstance(msg, dict): return None text = msg.get("text") + voice_payload: TelegramVoice | None = None if not isinstance(text, str): - return None + voice = msg.get("voice") + if not isinstance(voice, dict): + return None + file_id = voice.get("file_id") + if not isinstance(file_id, str) or not file_id: + return None + voice_payload = TelegramVoice( + file_id=file_id, + mime_type=voice.get("mime_type") + if isinstance(voice.get("mime_type"), str) + else None, + file_size=voice.get("file_size") + if isinstance(voice.get("file_size"), int) + and not isinstance(voice.get("file_size"), bool) + else None, + duration=voice.get("duration") + if isinstance(voice.get("duration"), int) + and not isinstance(voice.get("duration"), bool) + else None, + raw=voice, + ) + text = "" chat = msg.get("chat") if not isinstance(chat, dict): return None @@ -87,6 +109,7 @@ def parse_incoming_update( reply_to_message_id=reply_to_message_id, reply_to_text=reply_to_text, sender_id=sender_id, + voice=voice_payload, raw=msg, ) @@ -123,6 +146,10 @@ class BotClient(Protocol): allowed_updates: list[str] | None = None, ) -> list[dict] | None: ... + async def get_file(self, file_id: str) -> dict | None: ... + + async def download_file(self, file_path: str) -> bytes | None: ... + async def send_message( self, chat_id: int, @@ -356,6 +383,7 @@ class TelegramClient: raise ValueError("Provide either token or client, not both.") self._client_override = client self._base = None + self._file_base = None self._http_client = None self._owns_http_client = False else: @@ -363,6 +391,7 @@ class TelegramClient: raise ValueError("Telegram token is empty") self._client_override = None self._base = f"https://api.telegram.org/bot{token}" + self._file_base = f"https://api.telegram.org/file/bot{token}" self._http_client = http_client or httpx.AsyncClient(timeout=timeout_s) self._owns_http_client = http_client is None self._clock = clock @@ -556,6 +585,46 @@ class TelegramClient: except TelegramRetryAfter as exc: await self._sleep(exc.retry_after) + async def get_file(self, file_id: str) -> dict | None: + while True: + try: + if self._client_override is not None: + return await self._client_override.get_file(file_id) + result = await self._post("getFile", {"file_id": file_id}) + return result if isinstance(result, dict) else None + except TelegramRetryAfter as exc: + await self._sleep(exc.retry_after) + + async def download_file(self, file_path: str) -> bytes | None: + if self._client_override is not None: + return await self._client_override.download_file(file_path) + if self._http_client is None or self._file_base is None: + raise RuntimeError("TelegramClient is configured without an HTTP client.") + url = f"{self._file_base}/{file_path}" + try: + resp = await self._http_client.get(url) + except httpx.HTTPError as exc: + request_url = getattr(exc.request, "url", None) + logger.error( + "telegram.file_network_error", + url=str(request_url) if request_url is not None else None, + error=str(exc), + error_type=exc.__class__.__name__, + ) + return None + try: + resp.raise_for_status() + except httpx.HTTPStatusError as exc: + logger.error( + "telegram.file_http_error", + status=resp.status_code, + url=str(resp.request.url), + error=str(exc), + body=resp.text, + ) + return None + return resp.content + async def send_message( self, chat_id: int, diff --git a/src/takopi/telegram/transcribe.py b/src/takopi/telegram/transcribe.py new file mode 100644 index 0000000..e3db960 --- /dev/null +++ b/src/takopi/telegram/transcribe.py @@ -0,0 +1,100 @@ +from __future__ import annotations + +from typing import Any + +import httpx + +from ..logging import get_logger + +logger = get_logger(__name__) + +OPENAI_TRANSCRIBE_URL = "https://api.openai.com/v1/audio/transcriptions" + + +async def transcribe_audio( + audio_bytes: bytes, + *, + filename: str, + api_key: str, + model: str, + language: str | None = None, + prompt: str | None = None, + chunking_strategy: str | None = "auto", + mime_type: str | None = None, + timeout_s: float = 120, + http_client: httpx.AsyncClient | None = None, +) -> str | None: + data: dict[str, Any] = {"model": model} + if language: + data["language"] = language + if prompt: + data["prompt"] = prompt + if chunking_strategy: + data["chunking_strategy"] = chunking_strategy + + files = { + "file": ( + filename, + audio_bytes, + mime_type or "application/octet-stream", + ) + } + + headers = {"Authorization": f"Bearer {api_key}"} + close_client = False + client = http_client + if client is None: + client = httpx.AsyncClient(timeout=timeout_s) + close_client = True + try: + try: + resp = await client.post( + OPENAI_TRANSCRIBE_URL, + data=data, + files=files, + headers=headers, + ) + except httpx.HTTPError as exc: + request_url = getattr(exc.request, "url", None) + logger.error( + "openai.transcribe.network_error", + url=str(request_url) if request_url is not None else None, + error=str(exc), + error_type=exc.__class__.__name__, + ) + return None + try: + resp.raise_for_status() + except httpx.HTTPStatusError as exc: + logger.error( + "openai.transcribe.http_error", + status=resp.status_code, + url=str(resp.request.url), + error=str(exc), + body=resp.text, + ) + return None + try: + payload = resp.json() + except Exception as exc: + logger.error( + "openai.transcribe.bad_response", + status=resp.status_code, + url=str(resp.request.url), + error=str(exc), + error_type=exc.__class__.__name__, + body=resp.text, + ) + return None + finally: + if close_client: + await client.aclose() + + text = payload.get("text") + if not isinstance(text, str): + logger.error( + "openai.transcribe.invalid_payload", + payload=payload, + ) + return None + return text diff --git a/src/takopi/telegram/types.py b/src/takopi/telegram/types.py index 5425702..5a19df3 100644 --- a/src/takopi/telegram/types.py +++ b/src/takopi/telegram/types.py @@ -4,6 +4,15 @@ from dataclasses import dataclass from typing import Any +@dataclass(frozen=True, slots=True) +class TelegramVoice: + file_id: str + mime_type: str | None + file_size: int | None + duration: int | None + raw: dict[str, Any] + + @dataclass(frozen=True, slots=True) class TelegramIncomingMessage: transport: str @@ -13,4 +22,5 @@ class TelegramIncomingMessage: reply_to_message_id: int | None reply_to_text: str | None sender_id: int | None + voice: TelegramVoice | None = None raw: dict[str, Any] | None = None diff --git a/tests/test_exec_runner.py b/tests/test_exec_runner.py index a229e3e..bebe564 100644 --- a/tests/test_exec_runner.py +++ b/tests/test_exec_runner.py @@ -128,6 +128,23 @@ async def test_run_allows_parallel_different_sessions() -> None: assert max_in_flight == 2 +def test_codex_exec_flags_after_exec() -> None: + runner = CodexRunner( + codex_cmd="codex", + extra_args=["-c", "notify=[]", "--skip-git-repo-check"], + ) + state = runner.new_state("hi", None) + args = runner.build_args("hi", None, state=state) + assert args == [ + "-c", + "notify=[]", + "exec", + "--skip-git-repo-check", + "--json", + "-", + ] + + @pytest.mark.anyio async def test_run_serializes_new_session_after_session_is_known( tmp_path, monkeypatch diff --git a/tests/test_telegram_bridge.py b/tests/test_telegram_bridge.py index 629511a..56fb058 100644 --- a/tests/test_telegram_bridge.py +++ b/tests/test_telegram_bridge.py @@ -106,6 +106,14 @@ class _FakeBot: _ = allowed_updates return [] + async def get_file(self, file_id: str) -> dict | None: + _ = file_id + return None + + async def download_file(self, file_path: str) -> bytes | None: + _ = file_path + return None + async def send_message( self, chat_id: int, @@ -386,6 +394,14 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None: ) -> list[dict] | None: return None + async def get_file(self, file_id: str) -> dict | None: + _ = file_id + return None + + async def download_file(self, file_path: str) -> bytes | None: + _ = file_path + return None + async def send_message( self, chat_id: int, diff --git a/tests/test_telegram_incoming.py b/tests/test_telegram_incoming.py index c527792..264d2bf 100644 --- a/tests/test_telegram_incoming.py +++ b/tests/test_telegram_incoming.py @@ -22,6 +22,7 @@ def test_parse_incoming_update_maps_fields() -> None: assert msg.reply_to_message_id == 5 assert msg.reply_to_text == "prev" assert msg.sender_id == 99 + assert msg.voice is None assert msg.raw == update["message"] @@ -38,10 +39,36 @@ def test_parse_incoming_update_filters_non_matching_chat() -> None: assert parse_incoming_update(update, chat_id=999) is None -def test_parse_incoming_update_filters_non_text() -> None: +def test_parse_incoming_update_filters_non_text_and_non_voice() -> None: update = { "update_id": 1, - "message": {"message_id": 10, "chat": {"id": 123}}, + "message": {"message_id": 10, "chat": {"id": 123}, "photo": []}, } assert parse_incoming_update(update, chat_id=123) is None + + +def test_parse_incoming_update_voice_message() -> None: + update = { + "update_id": 1, + "message": { + "message_id": 10, + "chat": {"id": 123}, + "voice": { + "file_id": "voice-id", + "file_unique_id": "uniq", + "duration": 3, + "mime_type": "audio/ogg", + "file_size": 1234, + }, + }, + } + + msg = parse_incoming_update(update, chat_id=123) + assert msg is not None + assert msg.text == "" + assert msg.voice is not None + assert msg.voice.file_id == "voice-id" + assert msg.voice.mime_type == "audio/ogg" + assert msg.voice.file_size == 1234 + assert msg.voice.duration == 3 diff --git a/tests/test_telegram_queue.py b/tests/test_telegram_queue.py index 67fab50..12e8daf 100644 --- a/tests/test_telegram_queue.py +++ b/tests/test_telegram_queue.py @@ -92,6 +92,14 @@ class _FakeBot: self._updates_attempts += 1 return [] + async def get_file(self, file_id: str) -> dict | None: + _ = file_id + return None + + async def download_file(self, file_path: str) -> bytes | None: + _ = file_path + return None + async def close(self) -> None: return None