feat: telegram voice transcription (#74)

This commit is contained in:
banteg
2026-01-09 20:57:04 +04:00
committed by GitHub
parent 8421ec8b4a
commit 780ba72b3a
14 changed files with 440 additions and 7 deletions
+18
View File
@@ -16,6 +16,24 @@ This document captures current behavior so transport changes stay intentional.
4. High-value messages enqueue a send.
5. All writes go through the outbox.
## Incoming messages
`parse_incoming_update` accepts text messages and voice notes.
If voice transcription is enabled, takopi downloads the voice payload from Telegram,
transcribes it with OpenAI, and routes the transcript through the same command and
directive pipeline as typed text.
Configuration (under `[transports.telegram]`):
```toml
voice_transcription = true
```
Set `OPENAI_API_KEY` in the environment. If transcription is enabled but the API key
is missing or the audio download fails, takopi replies with a short error and skips
the run.
## Outbox model
- Single worker processes one op at a time.
+5
View File
@@ -18,6 +18,8 @@ parallel runs across threads, per thread queue support.
`/cancel` a running task.
optional voice note transcription for Telegram (routes transcript like typed text).
## requirements
- `uv` for installation (`curl -LsSf https://astral.sh/uv/install.sh | sh`)
@@ -59,6 +61,9 @@ transport = "telegram"
[transports.telegram]
bot_token = "123456789:ABCdefGHIjklMNOpqrsTUVwxyz"
chat_id = 123456789
voice_transcription = true
# set OPENAI_API_KEY in your environment for voice transcription
[codex]
# optional: profile from ~/.codex/config.toml
+14 -1
View File
@@ -25,6 +25,18 @@ _RECONNECTING_RE = re.compile(
r"^Reconnecting\.{3}\s*(?P<attempt>\d+)/(?P<max>\d+)\s*$",
re.IGNORECASE,
)
_EXEC_ONLY_FLAGS = {"--skip-git-repo-check"}
def _split_exec_flags(extra_args: list[str]) -> tuple[list[str], list[str]]:
base_args: list[str] = []
exec_args: list[str] = []
for arg in extra_args:
if arg in _EXEC_ONLY_FLAGS:
exec_args.append(arg)
else:
base_args.append(arg)
return base_args, exec_args
def _parse_reconnect_message(message: str) -> tuple[int, int] | None:
@@ -397,7 +409,8 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner):
state: Any,
) -> list[str]:
_ = prompt, state
args = [*self.extra_args, "exec", "--json"]
base_args, exec_args = _split_exec_flags(self.extra_args)
args = [*base_args, "exec", *exec_args, "--json"]
if resume:
args.extend(["resume", resume.value, "-"])
else:
+1
View File
@@ -25,6 +25,7 @@ class TelegramTransportSettings(BaseModel):
bot_token: SecretStr | None = None
chat_id: int | None = None
voice_transcription: bool = False
@field_validator("bot_token", mode="before")
@classmethod
+2 -1
View File
@@ -1,10 +1,11 @@
"""Telegram-specific clients and adapters."""
from .client import parse_incoming_update, poll_incoming
from .types import TelegramIncomingMessage
from .types import TelegramIncomingMessage, TelegramVoice
__all__ = [
"TelegramIncomingMessage",
"TelegramVoice",
"parse_incoming_update",
"poll_incoming",
]
+11
View File
@@ -14,6 +14,7 @@ from .bridge import (
TelegramBridgeConfig,
TelegramPresenter,
TelegramTransport,
TelegramVoiceTranscriptionConfig,
run_main_loop,
)
from .client import TelegramClient
@@ -43,6 +44,14 @@ def _build_startup_message(
)
def _build_voice_transcription_config(
transport_config: dict[str, object],
) -> TelegramVoiceTranscriptionConfig:
return TelegramVoiceTranscriptionConfig(
enabled=bool(transport_config.get("voice_transcription", False)),
)
class TelegramBackend(TransportBackend):
id = "telegram"
description = "Telegram bot"
@@ -87,12 +96,14 @@ class TelegramBackend(TransportBackend):
presenter=presenter,
final_notify=final_notify,
)
voice_transcription = _build_voice_transcription_config(transport_config)
cfg = TelegramBridgeConfig(
bot=bot,
runtime=runtime,
chat_id=chat_id,
startup_msg=startup_msg,
exec_cfg=exec_cfg,
voice_transcription=voice_transcription,
)
anyio.run(run_main_loop, cfg)
+138 -1
View File
@@ -1,8 +1,10 @@
from __future__ import annotations
import os
import shlex
from collections.abc import AsyncIterator, Awaitable, Callable, Sequence
from dataclasses import dataclass
from pathlib import Path
import anyio
@@ -40,10 +42,14 @@ from ..transport_runtime import TransportRuntime
from .client import BotClient, poll_incoming
from .types import TelegramIncomingMessage
from .render import prepare_telegram
from .transcribe import transcribe_audio
logger = get_logger(__name__)
_MAX_BOT_COMMANDS = 100
_OPENAI_AUDIO_MAX_BYTES = 25 * 1024 * 1024
_OPENAI_TRANSCRIPTION_MODEL = "gpt-4o-mini-transcribe"
_OPENAI_TRANSCRIPTION_CHUNKING = "auto"
def _is_cancel_command(text: str) -> bool:
@@ -191,6 +197,11 @@ class TelegramPresenter:
return RenderedMessage(text=text, extra={"entities": entities})
@dataclass(frozen=True)
class TelegramVoiceTranscriptionConfig:
enabled: bool = False
def _as_int(value: int | str, *, label: str) -> int:
if isinstance(value, bool) or not isinstance(value, int):
raise TypeError(f"Telegram {label} must be int")
@@ -285,6 +296,7 @@ class TelegramBridgeConfig:
chat_id: int
startup_msg: str
exec_cfg: ExecBridgeConfig
voice_transcription: TelegramVoiceTranscriptionConfig | None = None
async def _send_plain(
@@ -345,6 +357,125 @@ async def poll_updates(
yield msg
def _resolve_openai_api_key(
cfg: TelegramVoiceTranscriptionConfig,
) -> str | None:
env_key = os.environ.get("OPENAI_API_KEY")
if isinstance(env_key, str):
env_key = env_key.strip()
if env_key:
return env_key
return None
def _normalize_voice_filename(file_path: str | None, mime_type: str | None) -> str:
name = Path(file_path).name if file_path else ""
if not name:
if mime_type == "audio/ogg":
return "voice.ogg"
return "voice.dat"
if name.endswith(".oga"):
return f"{name[:-4]}.ogg"
return name
async def _transcribe_voice(
cfg: TelegramBridgeConfig,
msg: TelegramIncomingMessage,
) -> str | None:
voice = msg.voice
if voice is None:
return msg.text
settings = cfg.voice_transcription
if settings is None or not settings.enabled:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice transcription is disabled.",
)
return None
api_key = _resolve_openai_api_key(settings)
if not api_key:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice transcription requires OPENAI_API_KEY.",
)
return None
if voice.file_size is not None and voice.file_size > _OPENAI_AUDIO_MAX_BYTES:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice message is too large to transcribe.",
)
return None
file_info = await cfg.bot.get_file(voice.file_id)
if not isinstance(file_info, dict):
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="failed to fetch voice file.",
)
return None
file_path = file_info.get("file_path")
if not isinstance(file_path, str) or not file_path:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="failed to fetch voice file.",
)
return None
audio_bytes = await cfg.bot.download_file(file_path)
if not audio_bytes:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="failed to download voice message.",
)
return None
if len(audio_bytes) > _OPENAI_AUDIO_MAX_BYTES:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice message is too large to transcribe.",
)
return None
filename = _normalize_voice_filename(file_path, voice.mime_type)
transcript = await transcribe_audio(
audio_bytes,
filename=filename,
api_key=api_key,
model=_OPENAI_TRANSCRIPTION_MODEL,
chunking_strategy=_OPENAI_TRANSCRIPTION_CHUNKING,
mime_type=voice.mime_type,
)
if transcript is None:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice transcription failed.",
)
return None
transcript = transcript.strip()
if not transcript:
await _send_plain(
cfg.exec_cfg.transport,
chat_id=msg.chat_id,
user_msg_id=msg.message_id,
text="voice transcription returned empty text.",
)
return None
return transcript
async def _handle_cancel(
cfg: TelegramBridgeConfig,
msg: TelegramIncomingMessage,
@@ -702,6 +833,7 @@ class _TelegramCommandExecutor(CommandExecutor):
async def _dispatch_command(
cfg: TelegramBridgeConfig,
msg: TelegramIncomingMessage,
text: str,
command_id: str,
args_text: str,
running_tasks: RunningTasks,
@@ -738,7 +870,7 @@ async def _dispatch_command(
return
ctx = CommandContext(
command=command_id,
text=msg.text,
text=text,
args_text=args_text,
args=_split_command_args(args_text),
message=message_ref,
@@ -826,6 +958,10 @@ async def run_main_loop(
async for msg in poller(cfg):
text = msg.text
if msg.voice is not None:
text = await _transcribe_voice(cfg, msg)
if text is None:
continue
user_msg_id = msg.message_id
chat_id = msg.chat_id
reply_id = msg.reply_to_message_id
@@ -850,6 +986,7 @@ async def run_main_loop(
_dispatch_command,
cfg,
msg,
text,
command_id,
args_text,
running_tasks,
+71 -2
View File
@@ -18,7 +18,7 @@ import httpx
import anyio
from ..logging import get_logger
from .types import TelegramIncomingMessage
from .types import TelegramIncomingMessage, TelegramVoice
logger = get_logger(__name__)
@@ -50,8 +50,30 @@ def parse_incoming_update(
if not isinstance(msg, dict):
return None
text = msg.get("text")
voice_payload: TelegramVoice | None = None
if not isinstance(text, str):
return None
voice = msg.get("voice")
if not isinstance(voice, dict):
return None
file_id = voice.get("file_id")
if not isinstance(file_id, str) or not file_id:
return None
voice_payload = TelegramVoice(
file_id=file_id,
mime_type=voice.get("mime_type")
if isinstance(voice.get("mime_type"), str)
else None,
file_size=voice.get("file_size")
if isinstance(voice.get("file_size"), int)
and not isinstance(voice.get("file_size"), bool)
else None,
duration=voice.get("duration")
if isinstance(voice.get("duration"), int)
and not isinstance(voice.get("duration"), bool)
else None,
raw=voice,
)
text = ""
chat = msg.get("chat")
if not isinstance(chat, dict):
return None
@@ -87,6 +109,7 @@ def parse_incoming_update(
reply_to_message_id=reply_to_message_id,
reply_to_text=reply_to_text,
sender_id=sender_id,
voice=voice_payload,
raw=msg,
)
@@ -123,6 +146,10 @@ class BotClient(Protocol):
allowed_updates: list[str] | None = None,
) -> list[dict] | None: ...
async def get_file(self, file_id: str) -> dict | None: ...
async def download_file(self, file_path: str) -> bytes | None: ...
async def send_message(
self,
chat_id: int,
@@ -356,6 +383,7 @@ class TelegramClient:
raise ValueError("Provide either token or client, not both.")
self._client_override = client
self._base = None
self._file_base = None
self._http_client = None
self._owns_http_client = False
else:
@@ -363,6 +391,7 @@ class TelegramClient:
raise ValueError("Telegram token is empty")
self._client_override = None
self._base = f"https://api.telegram.org/bot{token}"
self._file_base = f"https://api.telegram.org/file/bot{token}"
self._http_client = http_client or httpx.AsyncClient(timeout=timeout_s)
self._owns_http_client = http_client is None
self._clock = clock
@@ -556,6 +585,46 @@ class TelegramClient:
except TelegramRetryAfter as exc:
await self._sleep(exc.retry_after)
async def get_file(self, file_id: str) -> dict | None:
while True:
try:
if self._client_override is not None:
return await self._client_override.get_file(file_id)
result = await self._post("getFile", {"file_id": file_id})
return result if isinstance(result, dict) else None
except TelegramRetryAfter as exc:
await self._sleep(exc.retry_after)
async def download_file(self, file_path: str) -> bytes | None:
if self._client_override is not None:
return await self._client_override.download_file(file_path)
if self._http_client is None or self._file_base is None:
raise RuntimeError("TelegramClient is configured without an HTTP client.")
url = f"{self._file_base}/{file_path}"
try:
resp = await self._http_client.get(url)
except httpx.HTTPError as exc:
request_url = getattr(exc.request, "url", None)
logger.error(
"telegram.file_network_error",
url=str(request_url) if request_url is not None else None,
error=str(exc),
error_type=exc.__class__.__name__,
)
return None
try:
resp.raise_for_status()
except httpx.HTTPStatusError as exc:
logger.error(
"telegram.file_http_error",
status=resp.status_code,
url=str(resp.request.url),
error=str(exc),
body=resp.text,
)
return None
return resp.content
async def send_message(
self,
chat_id: int,
+100
View File
@@ -0,0 +1,100 @@
from __future__ import annotations
from typing import Any
import httpx
from ..logging import get_logger
logger = get_logger(__name__)
OPENAI_TRANSCRIBE_URL = "https://api.openai.com/v1/audio/transcriptions"
async def transcribe_audio(
audio_bytes: bytes,
*,
filename: str,
api_key: str,
model: str,
language: str | None = None,
prompt: str | None = None,
chunking_strategy: str | None = "auto",
mime_type: str | None = None,
timeout_s: float = 120,
http_client: httpx.AsyncClient | None = None,
) -> str | None:
data: dict[str, Any] = {"model": model}
if language:
data["language"] = language
if prompt:
data["prompt"] = prompt
if chunking_strategy:
data["chunking_strategy"] = chunking_strategy
files = {
"file": (
filename,
audio_bytes,
mime_type or "application/octet-stream",
)
}
headers = {"Authorization": f"Bearer {api_key}"}
close_client = False
client = http_client
if client is None:
client = httpx.AsyncClient(timeout=timeout_s)
close_client = True
try:
try:
resp = await client.post(
OPENAI_TRANSCRIBE_URL,
data=data,
files=files,
headers=headers,
)
except httpx.HTTPError as exc:
request_url = getattr(exc.request, "url", None)
logger.error(
"openai.transcribe.network_error",
url=str(request_url) if request_url is not None else None,
error=str(exc),
error_type=exc.__class__.__name__,
)
return None
try:
resp.raise_for_status()
except httpx.HTTPStatusError as exc:
logger.error(
"openai.transcribe.http_error",
status=resp.status_code,
url=str(resp.request.url),
error=str(exc),
body=resp.text,
)
return None
try:
payload = resp.json()
except Exception as exc:
logger.error(
"openai.transcribe.bad_response",
status=resp.status_code,
url=str(resp.request.url),
error=str(exc),
error_type=exc.__class__.__name__,
body=resp.text,
)
return None
finally:
if close_client:
await client.aclose()
text = payload.get("text")
if not isinstance(text, str):
logger.error(
"openai.transcribe.invalid_payload",
payload=payload,
)
return None
return text
+10
View File
@@ -4,6 +4,15 @@ from dataclasses import dataclass
from typing import Any
@dataclass(frozen=True, slots=True)
class TelegramVoice:
file_id: str
mime_type: str | None
file_size: int | None
duration: int | None
raw: dict[str, Any]
@dataclass(frozen=True, slots=True)
class TelegramIncomingMessage:
transport: str
@@ -13,4 +22,5 @@ class TelegramIncomingMessage:
reply_to_message_id: int | None
reply_to_text: str | None
sender_id: int | None
voice: TelegramVoice | None = None
raw: dict[str, Any] | None = None
+17
View File
@@ -128,6 +128,23 @@ async def test_run_allows_parallel_different_sessions() -> None:
assert max_in_flight == 2
def test_codex_exec_flags_after_exec() -> None:
runner = CodexRunner(
codex_cmd="codex",
extra_args=["-c", "notify=[]", "--skip-git-repo-check"],
)
state = runner.new_state("hi", None)
args = runner.build_args("hi", None, state=state)
assert args == [
"-c",
"notify=[]",
"exec",
"--skip-git-repo-check",
"--json",
"-",
]
@pytest.mark.anyio
async def test_run_serializes_new_session_after_session_is_known(
tmp_path, monkeypatch
+16
View File
@@ -106,6 +106,14 @@ class _FakeBot:
_ = allowed_updates
return []
async def get_file(self, file_id: str) -> dict | None:
_ = file_id
return None
async def download_file(self, file_path: str) -> bytes | None:
_ = file_path
return None
async def send_message(
self,
chat_id: int,
@@ -386,6 +394,14 @@ async def test_telegram_transport_edit_wait_false_returns_ref() -> None:
) -> list[dict] | None:
return None
async def get_file(self, file_id: str) -> dict | None:
_ = file_id
return None
async def download_file(self, file_path: str) -> bytes | None:
_ = file_path
return None
async def send_message(
self,
chat_id: int,
+29 -2
View File
@@ -22,6 +22,7 @@ def test_parse_incoming_update_maps_fields() -> None:
assert msg.reply_to_message_id == 5
assert msg.reply_to_text == "prev"
assert msg.sender_id == 99
assert msg.voice is None
assert msg.raw == update["message"]
@@ -38,10 +39,36 @@ def test_parse_incoming_update_filters_non_matching_chat() -> None:
assert parse_incoming_update(update, chat_id=999) is None
def test_parse_incoming_update_filters_non_text() -> None:
def test_parse_incoming_update_filters_non_text_and_non_voice() -> None:
update = {
"update_id": 1,
"message": {"message_id": 10, "chat": {"id": 123}},
"message": {"message_id": 10, "chat": {"id": 123}, "photo": []},
}
assert parse_incoming_update(update, chat_id=123) is None
def test_parse_incoming_update_voice_message() -> None:
update = {
"update_id": 1,
"message": {
"message_id": 10,
"chat": {"id": 123},
"voice": {
"file_id": "voice-id",
"file_unique_id": "uniq",
"duration": 3,
"mime_type": "audio/ogg",
"file_size": 1234,
},
},
}
msg = parse_incoming_update(update, chat_id=123)
assert msg is not None
assert msg.text == ""
assert msg.voice is not None
assert msg.voice.file_id == "voice-id"
assert msg.voice.mime_type == "audio/ogg"
assert msg.voice.file_size == 1234
assert msg.voice.duration == 3
+8
View File
@@ -92,6 +92,14 @@ class _FakeBot:
self._updates_attempts += 1
return []
async def get_file(self, file_id: str) -> dict | None:
_ = file_id
return None
async def download_file(self, file_path: str) -> bytes | None:
_ = file_path
return None
async def close(self) -> None:
return None