From 9cb2b66fa25731f2255199b11d09292cd66848b6 Mon Sep 17 00:00:00 2001 From: banteg <4562643+banteg@users.noreply.github.com> Date: Sun, 4 Jan 2026 13:54:05 +0400 Subject: [PATCH] feat: migrate to structlog (#46) --- pyproject.toml | 1 + src/takopi/bridge.py | 172 ++++++++++++------- src/takopi/cli.py | 9 +- src/takopi/lockfile.py | 12 +- src/takopi/logging.py | 298 ++++++++++++++++++++++++++++----- src/takopi/onboarding.py | 8 +- src/takopi/runner.py | 159 +++++++++++++++--- src/takopi/runners/claude.py | 16 +- src/takopi/runners/codex.py | 19 +-- src/takopi/runners/opencode.py | 18 +- src/takopi/runners/pi.py | 9 +- src/takopi/telegram.py | 57 ++++--- src/takopi/utils/streams.py | 22 ++- src/takopi/utils/subprocess.py | 19 ++- tests/test_telegram_client.py | 18 +- uv.lock | 11 ++ 16 files changed, 629 insertions(+), 219 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 82033e2..e6d1611 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -13,6 +13,7 @@ dependencies = [ "msgspec>=0.20.0", "questionary>=2.1.1", "rich>=14.2.0", + "structlog>=25.5.0", "sulguk>=0.11.1", "typer>=0.21.0", ] diff --git a/src/takopi/bridge.py b/src/takopi/bridge.py index d96aace..3564bb5 100644 --- a/src/takopi/bridge.py +++ b/src/takopi/bridge.py @@ -2,7 +2,6 @@ from __future__ import annotations -import logging import time from collections.abc import AsyncIterator, Awaitable, Callable from dataclasses import dataclass, field @@ -11,6 +10,7 @@ from typing import Any import anyio from .model import CompletedEvent, EngineId, ResumeToken, StartedEvent, TakopiEvent +from .logging import bind_run_context, clear_context, get_logger from .render import ( ExecProgressRenderer, MarkdownParts, @@ -24,17 +24,17 @@ from .scheduler import ThreadJob, ThreadScheduler from .telegram import BotClient -logger = logging.getLogger(__name__) +logger = get_logger(__name__) def _log_runner_event(evt: TakopiEvent) -> None: for line in render_event_cli(evt): - logger.info("[runner] %s", line) - if isinstance(evt, CompletedEvent): - if evt.ok: - logger.info("[runner] done") - else: - logger.info("[runner] error: %s", evt.error or "error") + logger.debug( + "runner.event.cli", + line=line, + event_type=getattr(evt, "type", None), + engine=getattr(evt, "engine", None), + ) def _is_cancel_command(text: str) -> bool: @@ -101,14 +101,18 @@ async def _set_command_menu(cfg: BridgeConfig) -> None: try: ok = await cfg.bot.set_my_commands(commands) except Exception as exc: - logger.info("[startup] command menu update failed: %s", exc) + logger.info( + "startup.command_menu.failed", + error=str(exc), + error_type=exc.__class__.__name__, + ) return if not ok: - logger.info("[startup] command menu update rejected") + logger.info("startup.command_menu.rejected") return logger.info( - "[startup] command menu updated commands=%s", - ", ".join(cmd["command"] for cmd in commands), + "startup.command_menu.updated", + commands=[cmd["command"] for cmd in commands], ) @@ -168,6 +172,12 @@ async def _send_or_edit_markdown( else: rendered, entities = prepared if edit_message_id is not None: + logger.debug( + "telegram.edit_message", + chat_id=chat_id, + message_id=edit_message_id, + rendered=rendered, + ) edited = await bot.edit_message_text( chat_id=chat_id, message_id=edit_message_id, @@ -177,6 +187,12 @@ async def _send_or_edit_markdown( if edited is not None: return (edited, True) + logger.debug( + "telegram.send_message", + chat_id=chat_id, + reply_to_message_id=reply_to_message_id, + rendered=rendered, + ) return ( await bot.send_message( chat_id=chat_id, @@ -238,11 +254,13 @@ class ProgressEdits: seq_at_render = self.event_seq now = self.clock() parts = self.renderer.render_progress_parts(now - self.started_at) - md = assemble_markdown_parts(parts) rendered, entities = prepare_telegram(parts) if rendered != self.last_rendered: logger.debug( - "[progress] edit message_id=%s md=%s", self.progress_id, md + "telegram.edit_message", + chat_id=self.chat_id, + message_id=self.progress_id, + rendered=rendered, ) self.last_edit_at = now edited = await self.bot.edit_message_text( @@ -289,14 +307,14 @@ class RunningTask: async def _send_startup(cfg: BridgeConfig) -> None: - logger.debug("[startup] message: %s", cfg.startup_msg) + logger.debug("startup.message", text=cfg.startup_msg) sent, _ = await _send_or_edit_markdown( cfg.bot, chat_id=cfg.chat_id, parts=MarkdownParts(header=cfg.startup_msg), ) if sent is not None: - logger.info("[startup] sent startup message to chat_id=%s", cfg.chat_id) + logger.info("startup.sent", chat_id=cfg.chat_id) async def _drain_backlog(cfg: BridgeConfig, offset: int | None) -> int | None: @@ -306,12 +324,12 @@ async def _drain_backlog(cfg: BridgeConfig, offset: int | None) -> int | None: offset=offset, timeout_s=0, allowed_updates=["message"] ) if updates is None: - logger.info("[startup] backlog drain failed") + logger.info("startup.backlog.failed") return offset - logger.debug("[startup] backlog updates: %s", updates) + logger.debug("startup.backlog.updates", updates=updates) if not updates: if drained: - logger.info("[startup] drained %s pending update(s)", drained) + logger.info("startup.backlog.drained", count=drained) return offset offset = updates[-1]["update_id"] + 1 drained += len(updates) @@ -338,14 +356,12 @@ async def send_initial_progress( last_rendered: str | None = None initial_parts = renderer.render_progress_parts(0.0, label=label) - initial_md = assemble_markdown_parts(initial_parts) initial_rendered, initial_entities = prepare_telegram(initial_parts) logger.debug( - "[progress] send reply_to=%s md=%s rendered=%s entities=%s", - user_msg_id, - initial_md, - initial_rendered, - initial_entities, + "telegram.send_message", + chat_id=chat_id, + reply_to_message_id=user_msg_id, + rendered=initial_rendered, ) progress_msg = await cfg.bot.send_message( chat_id=chat_id, @@ -358,7 +374,11 @@ async def send_initial_progress( progress_id = int(progress_msg["message_id"]) last_edit_at = clock() last_rendered = initial_rendered - logger.debug("[progress] sent chat_id=%s message_id=%s", chat_id, progress_id) + logger.debug( + "progress.sent", + chat_id=chat_id, + message_id=progress_id, + ) return ProgressMessageState( message_id=progress_id, @@ -392,6 +412,7 @@ async def run_runner_with_cancel( _log_runner_event(evt) if isinstance(evt, StartedEvent): outcome.resume = evt.resume + bind_run_context(resume=evt.resume.value) if running_task is not None and running_task.resume is None: running_task.resume = evt.resume running_task.resume_ready.set() @@ -448,7 +469,12 @@ async def send_result_message( if final_msg is None: return if progress_id is not None and (edit_message_id is None or not edited): - logger.debug("[%s] delete progress message_id=%s", delete_tag, progress_id) + logger.debug( + "telegram.delete_message", + chat_id=chat_id, + message_id=progress_id, + tag=delete_tag, + ) await cfg.bot.delete_message(chat_id=chat_id, message_id=progress_id) @@ -468,12 +494,12 @@ async def handle_message( sleep: Callable[[float], Awaitable[None]] = anyio.sleep, progress_edit_every: float = PROGRESS_EDIT_EVERY_S, ) -> None: - logger.debug( - "[handle] incoming chat_id=%s message_id=%s resume=%r text=%s", - chat_id, - user_msg_id, - resume_token, - text, + logger.info( + "handle.incoming", + chat_id=chat_id, + user_msg_id=user_msg_id, + resume=resume_token.value if resume_token else None, + text=text, ) started_at = clock() is_resume_line = runner.is_resume_line @@ -539,9 +565,13 @@ async def handle_message( running_task=running_task, on_thread_known=on_thread_known, ) - except Exception as e: - error = e - logger.exception("[handle] runner failed") + except Exception as exc: + error = exc + logger.exception( + "handle.runner_failed", + error=str(exc), + error_type=exc.__class__.__name__, + ) finally: if ( running_task is not None @@ -564,8 +594,9 @@ async def handle_message( elapsed, err_body, status="error" ) logger.debug( - "[error] markdown: %s", - assemble_markdown_parts(final_parts), + "handle.error.markdown", + error=err_body, + markdown=assemble_markdown_parts(final_parts), ) await send_result_message( cfg, @@ -582,9 +613,9 @@ async def handle_message( if outcome.cancelled: resume = sync_resume_token(progress_renderer, outcome.resume) logger.info( - "[handle] cancelled resume=%s elapsed=%.1fs", - resume.value if resume else None, - elapsed, + "handle.cancelled", + resume=resume.value if resume else None, + elapsed_s=elapsed, ) final_parts = progress_renderer.render_progress_parts( elapsed, label="`cancelled`" @@ -618,34 +649,33 @@ async def handle_message( status = ( "error" if run_ok is False else ("done" if final_answer.strip() else "error") ) + resume_value = None + resume_token = completed.resume or outcome.resume + if resume_token is not None: + resume_value = resume_token.value + logger.info( + "runner.completed", + ok=run_ok, + error=run_error, + answer_len=len(final_answer or ""), + elapsed_s=round(elapsed, 2), + action_count=progress_renderer.action_count, + resume=resume_value, + ) sync_resume_token(progress_renderer, completed.resume or outcome.resume) final_parts = progress_renderer.render_final_parts( elapsed, final_answer, status=status ) logger.debug( - "[final] markdown: %s", - assemble_markdown_parts(final_parts), + "handle.final.markdown", + markdown=assemble_markdown_parts(final_parts), + status=status, ) final_rendered, final_entities = prepare_telegram(final_parts) can_edit_final = progress_id is not None edit_message_id = None if cfg.final_notify or not can_edit_final else progress_id - if edit_message_id is None: - logger.debug( - "[final] send reply_to=%s rendered=%s entities=%s", - user_msg_id, - final_rendered, - final_entities, - ) - else: - logger.debug( - "[final] edit message_id=%s rendered=%s entities=%s", - edit_message_id, - final_rendered, - final_entities, - ) - await send_result_message( cfg, chat_id=chat_id, @@ -669,10 +699,10 @@ async def poll_updates(cfg: BridgeConfig) -> AsyncIterator[dict[str, Any]]: offset=offset, timeout_s=50, allowed_updates=["message"] ) if updates is None: - logger.info("[loop] getUpdates failed") + logger.info("loop.get_updates.failed") await anyio.sleep(2) continue - logger.debug("[loop] updates: %s", updates) + logger.debug("loop.updates", updates=updates) for upd in updates: offset = upd["update_id"] + 1 @@ -719,7 +749,11 @@ async def _handle_cancel( ) return - logger.info("[cancel] cancelling progress_message_id=%s", progress_id) + logger.info( + "cancel.requested", + chat_id=chat_id, + progress_message_id=progress_id, + ) running_task.cancel_requested.set() @@ -838,6 +872,12 @@ async def run_main_loop( reason=reason, ) return + bind_run_context( + chat_id=chat_id, + user_msg_id=user_msg_id, + engine=entry.runner.engine, + resume=resume_token.value if resume_token else None, + ) await handle_message( cfg, runner=entry.runner, @@ -850,8 +890,14 @@ async def run_main_loop( on_thread_known=on_thread_known, progress_edit_every=cfg.progress_edit_every, ) - except Exception: - logger.exception("[handle] worker failed") + except Exception as exc: + logger.exception( + "handle.worker_failed", + error=str(exc), + error_type=exc.__class__.__name__, + ) + finally: + clear_context() async def run_thread_job(job: ThreadJob) -> None: await run_job( diff --git a/src/takopi/cli.py b/src/takopi/cli.py index 0269c04..b8fe00e 100644 --- a/src/takopi/cli.py +++ b/src/takopi/cli.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import os import shutil import sys @@ -16,12 +15,12 @@ from .bridge import BridgeConfig, run_main_loop from .config import ConfigError, load_telegram_config from .engines import get_backend, get_engine_config, list_backends from .lockfile import LockError, LockHandle, acquire_lock, token_fingerprint -from .logging import setup_logging +from .logging import get_logger, setup_logging from .onboarding import SetupResult, check_setup, interactive_setup from .router import AutoRouter, RunnerEntry from .telegram import TelegramClient -logger = logging.getLogger(__name__) +logger = get_logger(__name__) def _print_version_and_exit() -> None: @@ -172,7 +171,7 @@ def _build_router( ) for warning in warnings: - logger.warning("[setup] %s", warning) + logger.warning("setup.warning", issue=warning) return AutoRouter(entries=entries, default_engine=default_engine) @@ -300,7 +299,7 @@ def _run_auto_router( typer.echo(f"error: {e}", err=True) raise typer.Exit(code=1) except KeyboardInterrupt: - logger.info("[shutdown] interrupted") + logger.info("shutdown.interrupted") raise typer.Exit(code=130) finally: if lock_handle is not None: diff --git a/src/takopi/lockfile.py b/src/takopi/lockfile.py index e801787..049b6ba 100644 --- a/src/takopi/lockfile.py +++ b/src/takopi/lockfile.py @@ -2,12 +2,13 @@ from __future__ import annotations import hashlib import json -import logging import os from dataclasses import dataclass from pathlib import Path -logger = logging.getLogger(__name__) +from .logging import get_logger + +logger = get_logger(__name__) @dataclass(frozen=True) @@ -36,7 +37,12 @@ class LockHandle: try: self.path.unlink(missing_ok=True) except OSError as exc: - logger.warning("[lock] failed to remove lock file %s: %s", self.path, exc) + logger.warning( + "lock.release.failed", + path=str(self.path), + error=str(exc), + error_type=exc.__class__.__name__, + ) def __enter__(self) -> "LockHandle": return self diff --git a/src/takopi/logging.py b/src/takopi/logging.py index 1792461..e72308e 100644 --- a/src/takopi/logging.py +++ b/src/takopi/logging.py @@ -1,62 +1,274 @@ from __future__ import annotations import errno -import logging +import io +import os import re import sys +from contextlib import contextmanager +from contextvars import ContextVar +from typing import Any, TextIO, cast + +import structlog +from structlog.types import Processor TELEGRAM_TOKEN_RE = re.compile(r"bot\d+:[A-Za-z0-9_-]+") TELEGRAM_BARE_TOKEN_RE = re.compile(r"\b\d+:[A-Za-z0-9_-]{10,}\b") +_LEVELS: dict[str, int] = { + "debug": 10, + "info": 20, + "warning": 30, + "error": 40, + "exception": 40, + "critical": 50, +} -class RedactTokenFilter(logging.Filter): - def filter(self, record: logging.LogRecord) -> bool: +_MIN_LEVEL = _LEVELS["info"] +_PIPELINE_LEVEL_NAME = "debug" + +_suppress_below: ContextVar[int | None] = ContextVar( + "takopi_suppress_below", default=None +) +_log_file_handle: TextIO | None = None + + +def _truthy(value: str | None) -> bool: + if value is None: + return False + return value.strip().lower() in {"1", "true", "yes", "on"} + + +def _level_value(value: str | None, *, default: str = "info") -> int: + if not value: + return _LEVELS[default] + level = _LEVELS.get(value.strip().lower()) + return level if level is not None else _LEVELS[default] + + +def pipeline_log_level() -> str: + return _PIPELINE_LEVEL_NAME + + +def log_pipeline(logger: Any, event: str, **fields: Any) -> None: + if _PIPELINE_LEVEL_NAME == "info": + logger.info(event, **fields) + else: + logger.debug(event, **fields) + + +def _drop_below_level( + logger: Any, method_name: str, event_dict: dict[str, Any] +) -> dict[str, Any]: + level_value = _LEVELS.get(method_name, 0) + if level_value < _MIN_LEVEL: + raise structlog.DropEvent + suppress = _suppress_below.get() + if suppress is not None and level_value < suppress: + raise structlog.DropEvent + return event_dict + + +def _redact_text(value: str) -> str: + redacted = TELEGRAM_TOKEN_RE.sub("bot[REDACTED]", value) + return TELEGRAM_BARE_TOKEN_RE.sub("[REDACTED_TOKEN]", redacted) + + +def _redact_value(value: Any, memo: dict[int, Any]) -> Any: + if isinstance(value, str): + return _redact_text(value) + if isinstance(value, (bytes, bytearray)): + return _redact_text(value.decode("utf-8", errors="replace")) + obj_id = id(value) + if obj_id in memo: + return memo[obj_id] + if isinstance(value, dict): + redacted: dict[Any, Any] = {} + memo[obj_id] = redacted + for key, val in value.items(): + redacted[key] = _redact_value(val, memo) + return redacted + if isinstance(value, list): + redacted_list: list[Any] = [] + memo[obj_id] = redacted_list + redacted_list.extend(_redact_value(item, memo) for item in value) + return redacted_list + if isinstance(value, tuple): + redacted_tuple: list[Any] = [] + memo[obj_id] = redacted_tuple + redacted_tuple.extend(_redact_value(item, memo) for item in value) + return tuple(redacted_tuple) + if isinstance(value, set): + redacted_set: set[Any] = set() + memo[obj_id] = redacted_set + redacted_set.update(_redact_value(item, memo) for item in value) + return redacted_set + return value + + +def _redact_event_dict( + logger: Any, method_name: str, event_dict: dict[str, Any] +) -> dict[str, Any]: + _ = logger, method_name + return _redact_value(event_dict, memo={}) + + +def _file_sink( + logger: Any, method_name: str, event_dict: dict[str, Any] +) -> dict[str, Any]: + if _log_file_handle is None: + return event_dict + try: + payload = structlog.processors.JSONRenderer(default=str)( + logger, method_name, dict(event_dict) + ) + if isinstance(payload, bytes): + payload = payload.decode("utf-8", errors="replace") + _log_file_handle.write(payload + "\n") + _log_file_handle.flush() + except Exception: + pass + return event_dict + + +def _add_logger_name( + logger: Any, method_name: str, event_dict: dict[str, Any] +) -> dict[str, Any]: + if "logger" in event_dict: + return event_dict + name = event_dict.pop("logger_name", None) + if isinstance(name, str) and name: + event_dict["logger"] = name + return event_dict + fallback = getattr(logger, "name", None) + if isinstance(fallback, str) and fallback: + event_dict["logger"] = fallback + return event_dict + + +def get_logger(name: str | None = None) -> Any: + if name: + return structlog.get_logger(logger_name=name) + return structlog.get_logger() + + +def bind_run_context(**fields: Any) -> None: + structlog.contextvars.bind_contextvars(**fields) + + +def clear_context() -> None: + structlog.contextvars.clear_contextvars() + + +class SafeWriter(io.TextIOBase): + def __init__(self, stream: Any) -> None: + self._stream = stream + self._closed = False + + def write(self, message: str) -> int: + if self._closed: + return 0 try: - message = record.getMessage() - except (TypeError, ValueError): - return True + return self._stream.write(message) + except (BrokenPipeError, ValueError): + self._close() + return 0 + except OSError as exc: + if exc.errno == errno.EPIPE: + self._close() + return 0 + raise - redacted = TELEGRAM_TOKEN_RE.sub("bot[REDACTED]", message) - redacted = TELEGRAM_BARE_TOKEN_RE.sub("[REDACTED_TOKEN]", redacted) - if redacted != message: - record.msg = redacted - record.args = () - return True - - -class SafeStreamHandler(logging.StreamHandler): - def handleError(self, record: logging.LogRecord) -> None: - exc = sys.exc_info()[1] - if isinstance(exc, BrokenPipeError): - try: - self.stream.close() - except Exception: - pass + def flush(self) -> None: + if self._closed: return - if isinstance(exc, OSError) and exc.errno == errno.EPIPE: - try: - self.stream.close() - except Exception: - pass + try: + self._stream.flush() + except (BrokenPipeError, ValueError): + self._close() + except OSError as exc: + if exc.errno == errno.EPIPE: + self._close() + return + raise + + def isatty(self) -> bool: + isatty = getattr(self._stream, "isatty", None) + return bool(isatty()) if callable(isatty) else False + + def _close(self) -> None: + if self._closed: return - super().handleError(record) + self._closed = True + try: + self._stream.close() + except Exception: + pass def setup_logging(*, debug: bool = False) -> None: - root_logger = logging.getLogger() - root_logger.setLevel(logging.DEBUG if debug else logging.INFO) - logging.getLogger("markdown_it").setLevel(logging.WARNING) - logging.getLogger("httpcore").setLevel(logging.WARNING) - for handler in root_logger.handlers[:]: - root_logger.removeHandler(handler) - handler.close() + global _MIN_LEVEL, _PIPELINE_LEVEL_NAME + global _log_file_handle - fmt = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s") - redactor = RedactTokenFilter() + level_name = os.environ.get("TAKOPI_LOG_LEVEL") + if debug: + level_name = "debug" + _MIN_LEVEL = _level_value(level_name, default="info") - console = SafeStreamHandler(sys.stdout) - console.setLevel(logging.DEBUG if debug else logging.INFO) - console.setFormatter(fmt) - console.addFilter(redactor) - root_logger.addFilter(redactor) - root_logger.addHandler(console) + trace_pipeline = _truthy(os.environ.get("TAKOPI_TRACE_PIPELINE")) + _PIPELINE_LEVEL_NAME = "info" if trace_pipeline else "debug" + + format_value = os.environ.get("TAKOPI_LOG_FORMAT", "console").strip().lower() + color_override = os.environ.get("TAKOPI_LOG_COLOR") + if color_override is None: + is_tty = sys.stdout.isatty() + else: + is_tty = _truthy(color_override) + if format_value == "json": + renderer: Any = structlog.processors.JSONRenderer(default=str) + else: + renderer = structlog.dev.ConsoleRenderer(colors=is_tty) + + safe_stream = cast(TextIO, SafeWriter(sys.stdout)) + log_file = os.environ.get("TAKOPI_LOG_FILE") + if _log_file_handle is not None: + try: + _log_file_handle.close() + except Exception: + pass + _log_file_handle = None + if log_file: + try: + _log_file_handle = open(log_file, "a", encoding="utf-8") + except Exception: + _log_file_handle = None + + processors = cast( + list[Processor], + [ + _drop_below_level, + structlog.contextvars.merge_contextvars, + structlog.processors.TimeStamper(fmt="iso", utc=True), + structlog.processors.add_log_level, + _add_logger_name, + structlog.processors.format_exc_info, + _redact_event_dict, + _file_sink, + cast(Processor, renderer), + ], + ) + + structlog.configure( + processors=processors, + logger_factory=structlog.PrintLoggerFactory(file=safe_stream), + cache_logger_on_first_use=True, + ) + + +@contextmanager +def suppress_logs(level: str = "warning"): + token = _suppress_below.set(_level_value(level, default="warning")) + try: + yield + finally: + _suppress_below.reset(token) diff --git a/src/takopi/onboarding.py b/src/takopi/onboarding.py index 4763aaf..4587745 100644 --- a/src/takopi/onboarding.py +++ b/src/takopi/onboarding.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import shutil from contextlib import contextmanager from dataclasses import dataclass @@ -25,6 +24,7 @@ from .backends import EngineBackend, SetupIssue from .backends_helpers import install_issue from .config import ConfigError, HOME_CONFIG_PATH, load_telegram_config from .engines import list_backends +from .logging import suppress_logs from .telegram import TelegramClient @@ -225,12 +225,8 @@ def _render_engine_table(console: Console) -> list[tuple[str, bool, str | None]] @contextmanager def _suppress_logging(): - prev_disable = logging.root.manager.disable - logging.disable(logging.INFO) - try: + with suppress_logs(): yield - finally: - logging.disable(prev_disable) def _confirm(message: str, *, default: bool = True) -> bool | None: diff --git a/src/takopi/runner.py b/src/takopi/runner.py index 2c652ae..32642a8 100644 --- a/src/takopi/runner.py +++ b/src/takopi/runner.py @@ -3,7 +3,6 @@ from __future__ import annotations import json -import logging import re import subprocess from collections.abc import AsyncIterator, Callable @@ -13,6 +12,7 @@ from weakref import WeakValueDictionary import anyio +from .logging import get_logger, log_pipeline from .model import ( Action, ActionEvent, @@ -131,8 +131,8 @@ class JsonlRunState: class JsonlSubprocessRunner(BaseRunner): - def get_logger(self) -> logging.Logger: - return getattr(self, "logger", logging.getLogger(__name__)) + def get_logger(self) -> Any: + return getattr(self, "logger", get_logger(__name__)) def command(self) -> str: raise NotImplementedError @@ -230,15 +230,9 @@ class JsonlSubprocessRunner(BaseRunner): async def iter_json_lines( self, stream: Any, - *, - logger: logging.Logger, - tag: str, ) -> AsyncIterator[bytes]: async for raw_line in iter_bytes_lines(stream): - raw = raw_line.rstrip(b"\n") - text = raw.decode("utf-8", errors="replace") - logger.debug("[%s][jsonl] %s", tag, text) - yield raw + yield raw_line.rstrip(b"\n") def decode_error_events( self, @@ -356,6 +350,13 @@ class JsonlSubprocessRunner(BaseRunner): cmd = [self.command(), *self.build_args(prompt, resume, state=state)] payload = self.stdin_payload(prompt, resume, state=state) env = self.env(state=state) + logger.info( + "runner.start", + engine=self.engine, + resume=resume.value if resume else None, + prompt=prompt, + prompt_len=len(prompt), + ) async with manage_subprocess( cmd, @@ -369,12 +370,23 @@ class JsonlSubprocessRunner(BaseRunner): if payload is not None and proc.stdin is None: raise RuntimeError(self.pipes_error_message()) - logger.debug("[%s] spawn pid=%s args=%r", tag, proc.pid, cmd) + logger.info( + "subprocess.spawn", + cmd=cmd[0] if cmd else None, + args=cmd[1:], + pid=proc.pid, + ) if payload is not None: assert proc.stdin is not None await proc.stdin.send(payload) await proc.stdin.aclose() + logger.info( + "subprocess.stdin.send", + pid=proc.pid, + resume=resume.value if resume else None, + bytes=len(payload), + ) elif proc.stdin is not None: await proc.stdin.aclose() @@ -382,6 +394,8 @@ class JsonlSubprocessRunner(BaseRunner): expected_session: ResumeToken | None = resume found_session: ResumeToken | None = None did_emit_completed = False + ignored_after_completed = False + jsonl_seq = 0 async with anyio.create_task_group() as tg: tg.start_soon( @@ -390,19 +404,34 @@ class JsonlSubprocessRunner(BaseRunner): logger, tag, ) - async for raw_line in self.iter_json_lines( - proc.stdout, logger=logger, tag=tag - ): + async for raw_line in self.iter_json_lines(proc.stdout): if did_emit_completed: + if not ignored_after_completed: + log_pipeline( + logger, + "runner.drop.jsonl_after_completed", + pid=proc.pid, + ) + ignored_after_completed = True continue line = raw_line.strip() if not line: continue + jsonl_seq += 1 + seq = jsonl_seq raw_text = raw_line.decode("utf-8", errors="replace") line_text = line.decode("utf-8", errors="replace") try: decoded = self.decode_jsonl(line=line) except Exception as exc: + log_pipeline( + logger, + "jsonl.parse.error", + pid=proc.pid, + jsonl_seq=seq, + line=line_text, + error=str(exc), + ) events = self.decode_error_events( raw=raw_text, line=line_text, @@ -411,6 +440,19 @@ class JsonlSubprocessRunner(BaseRunner): ) else: if decoded is None: + log_pipeline( + logger, + "jsonl.parse.invalid", + pid=proc.pid, + jsonl_seq=seq, + line=line_text, + ) + logger.info( + "runner.jsonl.invalid", + pid=proc.pid, + jsonl_seq=seq, + line=line_text, + ) events = self.invalid_json_events( raw=raw_text, line=line_text, @@ -425,6 +467,13 @@ class JsonlSubprocessRunner(BaseRunner): found_session=found_session, ) except Exception as exc: + log_pipeline( + logger, + "runner.translate.error", + pid=proc.pid, + jsonl_seq=seq, + error=str(exc), + ) events = self.translate_error_events( data=decoded, error=exc, @@ -433,22 +482,74 @@ class JsonlSubprocessRunner(BaseRunner): for evt in events: if isinstance(evt, StartedEvent): - found_session, emit = self.handle_started_event( - evt, - expected_session=expected_session, - found_session=found_session, + prior_found = found_session + try: + found_session, emit = self.handle_started_event( + evt, + expected_session=expected_session, + found_session=found_session, + ) + except Exception as exc: + log_pipeline( + logger, + "runner.started.error", + pid=proc.pid, + jsonl_seq=seq, + resume=evt.resume.value, + expected_session=expected_session.value + if expected_session + else None, + found_session=prior_found.value + if prior_found + else None, + error=str(exc), + ) + raise + if prior_found is None and emit: + reason = ( + "matched_expected" + if expected_session is not None + else "first_seen" + ) + elif prior_found is not None and not emit: + reason = "duplicate" + else: + reason = "unknown" + log_pipeline( + logger, + "runner.started.seen", + pid=proc.pid, + jsonl_seq=seq, + resume=evt.resume.value, + expected_session=expected_session.value + if expected_session + else None, + found_session=found_session.value + if found_session + else None, + emit=emit, + reason=reason, ) if not emit: continue if isinstance(evt, CompletedEvent): did_emit_completed = True + log_pipeline( + logger, + "runner.completed.seen", + pid=proc.pid, + jsonl_seq=seq, + ok=evt.ok, + has_answer=bool(evt.answer.strip()), + emit=True, + ) yield evt break yield evt rc = await proc.wait() - logger.debug("[%s] process exit pid=%s rc=%s", tag, proc.pid, rc) + logger.info("subprocess.exit", pid=proc.pid, rc=rc) if did_emit_completed: return if rc is not None and rc != 0: @@ -459,6 +560,16 @@ class JsonlSubprocessRunner(BaseRunner): state=state, ) for evt in events: + if isinstance(evt, CompletedEvent): + log_pipeline( + logger, + "runner.completed.seen", + pid=proc.pid, + ok=evt.ok, + has_answer=bool(evt.answer.strip()), + emit=True, + source="process_error", + ) yield evt return @@ -468,6 +579,16 @@ class JsonlSubprocessRunner(BaseRunner): state=state, ) for evt in events: + if isinstance(evt, CompletedEvent): + log_pipeline( + logger, + "runner.completed.seen", + pid=proc.pid, + ok=evt.ok, + has_answer=bool(evt.answer.strip()), + emit=True, + source="stream_end", + ) yield evt diff --git a/src/takopi/runners/claude.py b/src/takopi/runners/claude.py index 5b57453..475b353 100644 --- a/src/takopi/runners/claude.py +++ b/src/takopi/runners/claude.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import os import re from dataclasses import dataclass, field @@ -11,12 +10,13 @@ import msgspec from ..backends import EngineBackend, EngineConfig from ..events import EventFactory +from ..logging import get_logger from ..model import Action, ActionKind, EngineId, ResumeToken, TakopiEvent from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner from ..schemas import claude as claude_schema from ..utils.paths import relativize_command, relativize_path -logger = logging.getLogger(__name__) +logger = get_logger(__name__) ENGINE: EngineId = EngineId("claude") DEFAULT_ALLOWED_TOOLS = ["Bash", "Read", "Edit", "Write"] @@ -399,12 +399,7 @@ class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner): *, state: ClaudeStreamState, ) -> None: - _ = state - logger.info( - "[claude] start run resume=%r", - resume.value if resume else None, - ) - logger.debug("[claude] prompt: %s", prompt) + _ = state, prompt, resume def decode_jsonl( self, @@ -424,7 +419,10 @@ class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner): _ = raw, line, state if isinstance(error, msgspec.DecodeError): self.get_logger().warning( - "[%s] invalid msgspec event: %s", self.tag(), error + "jsonl.msgspec.invalid", + tag=self.tag(), + error=str(error), + error_type=error.__class__.__name__, ) return [] return super().decode_error_events( diff --git a/src/takopi/runners/codex.py b/src/takopi/runners/codex.py index 5f0c825..bc6401f 100644 --- a/src/takopi/runners/codex.py +++ b/src/takopi/runners/codex.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import re from dataclasses import dataclass from pathlib import Path @@ -11,12 +10,13 @@ import msgspec from ..backends import EngineBackend, EngineConfig from ..config import ConfigError from ..events import EventFactory +from ..logging import get_logger from ..model import ActionPhase, EngineId, ResumeToken, TakopiEvent from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner from ..schemas import codex as codex_schema from ..utils.paths import relativize_command -logger = logging.getLogger(__name__) +logger = get_logger(__name__) ENGINE: EngineId = EngineId("codex") @@ -394,9 +394,7 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner): *, state: CodexRunState, ) -> None: - _ = state - logger.info("[codex] start run resume=%r", resume.value if resume else None) - logger.debug("[codex] prompt: %s", prompt) + _ = state, prompt, resume def decode_jsonl(self, *, line: bytes) -> codex_schema.ThreadEvent: return codex_schema.decode_event(line) @@ -412,7 +410,10 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner): _ = raw, line if isinstance(error, msgspec.DecodeError): self.get_logger().warning( - "[%s] invalid msgspec event: %s", self.tag(), error + "jsonl.msgspec.invalid", + tag=self.tag(), + error=str(error), + error_type=error.__class__.__name__, ) return [] return super().decode_error_events( @@ -485,9 +486,7 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner): if state.final_answer is None: state.final_answer = text else: - logger.debug( - "[codex] emitted multiple agent messages; using the last one" - ) + logger.debug("codex.multiple_agent_messages") state.final_answer = text case _: pass @@ -538,7 +537,7 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner): resume=resume_for_completed, ) ] - logger.info("[codex] done run session=%s", found_session.value) + logger.info("codex.session.completed", resume=found_session.value) return [ state.factory.completed_ok( answer=state.final_answer or "", diff --git a/src/takopi/runners/opencode.py b/src/takopi/runners/opencode.py index 7dcea54..d0b7d44 100644 --- a/src/takopi/runners/opencode.py +++ b/src/takopi/runners/opencode.py @@ -13,7 +13,6 @@ Session IDs use the format: ses_XXXX (e.g., ses_494719016ffe85dkDMj0FPRbHK) from __future__ import annotations -import logging import re from dataclasses import dataclass, field from pathlib import Path @@ -23,6 +22,7 @@ import msgspec from ..backends import EngineBackend, EngineConfig from ..config import ConfigError +from ..logging import get_logger from ..model import ( Action, ActionEvent, @@ -37,7 +37,7 @@ from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner from ..schemas import opencode as opencode_schema from ..utils.paths import relativize_command, relativize_path -logger = logging.getLogger(__name__) +logger = get_logger(__name__) ENGINE: EngineId = EngineId("opencode") @@ -350,7 +350,7 @@ class OpenCodeRunner(ResumeTokenMixin, JsonlSubprocessRunner): opencode_cmd: str = "opencode" model: str | None = None session_title: str = "opencode" - logger: logging.Logger = logger + logger = logger def format_resume(self, token: ResumeToken) -> str: if token.engine != ENGINE: @@ -397,12 +397,7 @@ class OpenCodeRunner(ResumeTokenMixin, JsonlSubprocessRunner): *, state: OpenCodeStreamState, ) -> None: - _ = state - logger.info( - "[opencode] start run resume=%r", - resume.value if resume else None, - ) - logger.debug("[opencode] prompt: %s", prompt) + _ = state, prompt, resume def invalid_json_events( self, @@ -444,7 +439,10 @@ class OpenCodeRunner(ResumeTokenMixin, JsonlSubprocessRunner): _ = raw, line, state if isinstance(error, msgspec.DecodeError): self.get_logger().warning( - "[%s] invalid msgspec event: %s", self.tag(), error + "jsonl.msgspec.invalid", + tag=self.tag(), + error=str(error), + error_type=error.__class__.__name__, ) return [] return super().decode_error_events( diff --git a/src/takopi/runners/pi.py b/src/takopi/runners/pi.py index d0d6ec5..85328f6 100644 --- a/src/takopi/runners/pi.py +++ b/src/takopi/runners/pi.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import os import re from dataclasses import dataclass, field @@ -13,6 +12,7 @@ import msgspec from ..backends import EngineBackend, EngineConfig from ..config import ConfigError +from ..logging import get_logger from ..model import ( Action, ActionEvent, @@ -29,7 +29,7 @@ from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner from ..schemas import pi as pi_schema from ..utils.paths import relativize_command, relativize_path -logger = logging.getLogger(__name__) +logger = get_logger(__name__) ENGINE: EngineId = EngineId("pi") @@ -370,7 +370,10 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner): _ = raw, line, state if isinstance(error, msgspec.DecodeError): self.get_logger().warning( - "[%s] invalid msgspec event: %s", self.tag(), error + "jsonl.msgspec.invalid", + tag=self.tag(), + error=str(error), + error_type=error.__class__.__name__, ) return [] return super().decode_error_events( diff --git a/src/takopi/telegram.py b/src/takopi/telegram.py index e01d160..0e46e46 100644 --- a/src/takopi/telegram.py +++ b/src/takopi/telegram.py @@ -1,14 +1,12 @@ from __future__ import annotations -import logging from typing import Any, Protocol import httpx -from .logging import RedactTokenFilter +from .logging import get_logger -logger = logging.getLogger(__name__) -logger.addFilter(RedactTokenFilter()) +logger = get_logger(__name__) class BotClient(Protocol): @@ -71,13 +69,17 @@ class TelegramClient: await self._client.aclose() async def _post(self, method: str, json_data: dict[str, Any]) -> Any | None: - logger.debug("[telegram] request %s: %s", method, json_data) + logger.debug("telegram.request", method=method, payload=json_data) try: resp = await self._client.post(f"{self._base}/{method}", json=json_data) except httpx.HTTPError as e: url = getattr(e.request, "url", None) logger.error( - "[telegram] network error method=%s url=%s: %s", method, url, e + "telegram.network_error", + method=method, + url=str(url) if url is not None else None, + error=str(e), + error_type=e.__class__.__name__, ) return None @@ -86,12 +88,12 @@ class TelegramClient: except httpx.HTTPStatusError as e: body = resp.text logger.error( - "[telegram] http error method=%s status=%s url=%s: %s body=%r", - method, - resp.status_code, - resp.request.url, - e, - body, + "telegram.http_error", + method=method, + status=resp.status_code, + url=str(resp.request.url), + error=str(e), + body=body, ) return None @@ -100,34 +102,35 @@ class TelegramClient: except Exception as e: body = resp.text logger.error( - "[telegram] bad response method=%s status=%s url=%s: %s body=%r", - method, - resp.status_code, - resp.request.url, - e, - body, + "telegram.bad_response", + method=method, + status=resp.status_code, + url=str(resp.request.url), + error=str(e), + error_type=e.__class__.__name__, + body=body, ) return None if not isinstance(payload, dict): logger.error( - "[telegram] invalid response method=%s url=%s: %r", - method, - resp.request.url, - payload, + "telegram.invalid_payload", + method=method, + url=str(resp.request.url), + payload=payload, ) return None if not payload.get("ok"): logger.error( - "[telegram] api error method=%s url=%s: %s", - method, - resp.request.url, - payload, + "telegram.api_error", + method=method, + url=str(resp.request.url), + payload=payload, ) return None - logger.debug("[telegram] response %s: %s", method, payload) + logger.debug("telegram.response", method=method, payload=payload) return payload.get("result") async def get_updates( diff --git a/src/takopi/utils/streams.py b/src/takopi/utils/streams.py index cca5d2e..dd6a3bf 100644 --- a/src/takopi/utils/streams.py +++ b/src/takopi/utils/streams.py @@ -1,13 +1,15 @@ from __future__ import annotations from collections.abc import AsyncIterator -import logging import sys +from typing import Any import anyio from anyio.abc import ByteReceiveStream from anyio.streams.buffered import BufferedByteReceiveStream +from ..logging import log_pipeline + async def iter_bytes_lines(stream: ByteReceiveStream) -> AsyncIterator[bytes]: buffered = BufferedByteReceiveStream(stream) @@ -21,12 +23,22 @@ async def iter_bytes_lines(stream: ByteReceiveStream) -> AsyncIterator[bytes]: async def drain_stderr( stream: ByteReceiveStream, - logger: logging.Logger, + logger: Any, tag: str, ) -> None: try: async for line in iter_bytes_lines(stream): text = line.decode("utf-8", errors="replace") - logger.debug("[%s][stderr] %s", tag, text) - except Exception as e: - logger.debug("[%s][stderr] drain error: %s", tag, e) + log_pipeline( + logger, + "subprocess.stderr", + tag=tag, + line=text, + ) + except Exception as exc: + log_pipeline( + logger, + "subprocess.stderr.error", + tag=tag, + error=str(exc), + ) diff --git a/src/takopi/utils/subprocess.py b/src/takopi/utils/subprocess.py index 929331d..f1a4e14 100644 --- a/src/takopi/utils/subprocess.py +++ b/src/takopi/utils/subprocess.py @@ -1,6 +1,5 @@ from __future__ import annotations -import logging import os import signal from collections.abc import AsyncIterator, Sequence @@ -10,7 +9,9 @@ from typing import Any import anyio from anyio.abc import Process -logger = logging.getLogger(__name__) +from ..logging import get_logger + +logger = get_logger(__name__) async def wait_for_process(proc: Process, timeout: float) -> bool: @@ -29,7 +30,12 @@ def terminate_process(proc: Process) -> None: except ProcessLookupError: return except Exception as e: - logger.debug("[subprocess] failed to terminate process group: %s", e) + logger.debug( + "subprocess.terminate.failed", + error=str(e), + error_type=e.__class__.__name__, + pid=proc.pid, + ) try: proc.terminate() except ProcessLookupError: @@ -46,7 +52,12 @@ def kill_process(proc: Process) -> None: except ProcessLookupError: return except Exception as e: - logger.debug("[subprocess] failed to kill process group: %s", e) + logger.debug( + "subprocess.kill.failed", + error=str(e), + error_type=e.__class__.__name__, + pid=proc.pid, + ) try: proc.kill() except ProcessLookupError: diff --git a/tests/test_telegram_client.py b/tests/test_telegram_client.py index 161cffe..1686172 100644 --- a/tests/test_telegram_client.py +++ b/tests/test_telegram_client.py @@ -1,9 +1,7 @@ -import logging - import httpx import pytest -from takopi.logging import RedactTokenFilter +from takopi.logging import setup_logging from takopi.telegram import TelegramClient @@ -38,19 +36,16 @@ async def test_telegram_429_no_retry() -> None: @pytest.mark.anyio async def test_no_token_in_logs_on_http_error( - caplog: pytest.LogCaptureFixture, + capsys: pytest.CaptureFixture[str], ) -> None: token = "123:abcDEF_ghij" - redactor = RedactTokenFilter() - root_logger = logging.getLogger() - root_logger.addFilter(redactor) + setup_logging(debug=True) def handler(request: httpx.Request) -> httpx.Response: return httpx.Response(500, text="oops", request=request) transport = httpx.MockTransport(handler) - caplog.set_level(logging.ERROR) client = httpx.AsyncClient(transport=transport) try: tg = TelegramClient(token, client=client) @@ -58,7 +53,6 @@ async def test_no_token_in_logs_on_http_error( finally: await client.aclose() - root_logger.removeFilter(redactor) - - assert token not in caplog.text - assert "bot[REDACTED]" in caplog.text + out = capsys.readouterr().out + assert token not in out + assert "bot[REDACTED]" in out diff --git a/uv.lock b/uv.lock index 6487c72..64fae8e 100644 --- a/uv.lock +++ b/uv.lock @@ -387,6 +387,15 @@ wheels = [ { url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" }, ] +[[package]] +name = "structlog" +version = "25.5.0" +source = { registry = "https://pypi.org/simple" } +sdist = { url = "https://files.pythonhosted.org/packages/ef/52/9ba0f43b686e7f3ddfeaa78ac3af750292662284b3661e91ad5494f21dbc/structlog-25.5.0.tar.gz", hash = "sha256:098522a3bebed9153d4570c6d0288abf80a031dfdb2048d59a49e9dc2190fc98", size = 1460830, upload-time = "2025-10-27T08:28:23.028Z" } +wheels = [ + { url = "https://files.pythonhosted.org/packages/a8/45/a132b9074aa18e799b891b91ad72133c98d8042c70f6240e4c5f9dabee2f/structlog-25.5.0-py3-none-any.whl", hash = "sha256:a8453e9b9e636ec59bd9e79bbd4a72f025981b3ba0f5837aebf48f02f37a7f9f", size = 72510, upload-time = "2025-10-27T08:28:21.535Z" }, +] + [[package]] name = "sulguk" version = "0.11.1" @@ -411,6 +420,7 @@ dependencies = [ { name = "msgspec" }, { name = "questionary" }, { name = "rich" }, + { name = "structlog" }, { name = "sulguk" }, { name = "typer" }, ] @@ -432,6 +442,7 @@ requires-dist = [ { name = "msgspec", specifier = ">=0.20.0" }, { name = "questionary", specifier = ">=2.1.1" }, { name = "rich", specifier = ">=14.2.0" }, + { name = "structlog", specifier = ">=25.5.0" }, { name = "sulguk", specifier = ">=0.11.1" }, { name = "typer", specifier = ">=0.21.0" }, ]