feat: migrate to structlog (#46)

This commit is contained in:
banteg
2026-01-04 13:54:05 +04:00
committed by GitHub
parent 92302a6fe6
commit 9cb2b66fa2
16 changed files with 629 additions and 219 deletions
+1
View File
@@ -13,6 +13,7 @@ dependencies = [
"msgspec>=0.20.0",
"questionary>=2.1.1",
"rich>=14.2.0",
"structlog>=25.5.0",
"sulguk>=0.11.1",
"typer>=0.21.0",
]
+109 -63
View File
@@ -2,7 +2,6 @@
from __future__ import annotations
import logging
import time
from collections.abc import AsyncIterator, Awaitable, Callable
from dataclasses import dataclass, field
@@ -11,6 +10,7 @@ from typing import Any
import anyio
from .model import CompletedEvent, EngineId, ResumeToken, StartedEvent, TakopiEvent
from .logging import bind_run_context, clear_context, get_logger
from .render import (
ExecProgressRenderer,
MarkdownParts,
@@ -24,17 +24,17 @@ from .scheduler import ThreadJob, ThreadScheduler
from .telegram import BotClient
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
def _log_runner_event(evt: TakopiEvent) -> None:
for line in render_event_cli(evt):
logger.info("[runner] %s", line)
if isinstance(evt, CompletedEvent):
if evt.ok:
logger.info("[runner] done")
else:
logger.info("[runner] error: %s", evt.error or "error")
logger.debug(
"runner.event.cli",
line=line,
event_type=getattr(evt, "type", None),
engine=getattr(evt, "engine", None),
)
def _is_cancel_command(text: str) -> bool:
@@ -101,14 +101,18 @@ async def _set_command_menu(cfg: BridgeConfig) -> None:
try:
ok = await cfg.bot.set_my_commands(commands)
except Exception as exc:
logger.info("[startup] command menu update failed: %s", exc)
logger.info(
"startup.command_menu.failed",
error=str(exc),
error_type=exc.__class__.__name__,
)
return
if not ok:
logger.info("[startup] command menu update rejected")
logger.info("startup.command_menu.rejected")
return
logger.info(
"[startup] command menu updated commands=%s",
", ".join(cmd["command"] for cmd in commands),
"startup.command_menu.updated",
commands=[cmd["command"] for cmd in commands],
)
@@ -168,6 +172,12 @@ async def _send_or_edit_markdown(
else:
rendered, entities = prepared
if edit_message_id is not None:
logger.debug(
"telegram.edit_message",
chat_id=chat_id,
message_id=edit_message_id,
rendered=rendered,
)
edited = await bot.edit_message_text(
chat_id=chat_id,
message_id=edit_message_id,
@@ -177,6 +187,12 @@ async def _send_or_edit_markdown(
if edited is not None:
return (edited, True)
logger.debug(
"telegram.send_message",
chat_id=chat_id,
reply_to_message_id=reply_to_message_id,
rendered=rendered,
)
return (
await bot.send_message(
chat_id=chat_id,
@@ -238,11 +254,13 @@ class ProgressEdits:
seq_at_render = self.event_seq
now = self.clock()
parts = self.renderer.render_progress_parts(now - self.started_at)
md = assemble_markdown_parts(parts)
rendered, entities = prepare_telegram(parts)
if rendered != self.last_rendered:
logger.debug(
"[progress] edit message_id=%s md=%s", self.progress_id, md
"telegram.edit_message",
chat_id=self.chat_id,
message_id=self.progress_id,
rendered=rendered,
)
self.last_edit_at = now
edited = await self.bot.edit_message_text(
@@ -289,14 +307,14 @@ class RunningTask:
async def _send_startup(cfg: BridgeConfig) -> None:
logger.debug("[startup] message: %s", cfg.startup_msg)
logger.debug("startup.message", text=cfg.startup_msg)
sent, _ = await _send_or_edit_markdown(
cfg.bot,
chat_id=cfg.chat_id,
parts=MarkdownParts(header=cfg.startup_msg),
)
if sent is not None:
logger.info("[startup] sent startup message to chat_id=%s", cfg.chat_id)
logger.info("startup.sent", chat_id=cfg.chat_id)
async def _drain_backlog(cfg: BridgeConfig, offset: int | None) -> int | None:
@@ -306,12 +324,12 @@ async def _drain_backlog(cfg: BridgeConfig, offset: int | None) -> int | None:
offset=offset, timeout_s=0, allowed_updates=["message"]
)
if updates is None:
logger.info("[startup] backlog drain failed")
logger.info("startup.backlog.failed")
return offset
logger.debug("[startup] backlog updates: %s", updates)
logger.debug("startup.backlog.updates", updates=updates)
if not updates:
if drained:
logger.info("[startup] drained %s pending update(s)", drained)
logger.info("startup.backlog.drained", count=drained)
return offset
offset = updates[-1]["update_id"] + 1
drained += len(updates)
@@ -338,14 +356,12 @@ async def send_initial_progress(
last_rendered: str | None = None
initial_parts = renderer.render_progress_parts(0.0, label=label)
initial_md = assemble_markdown_parts(initial_parts)
initial_rendered, initial_entities = prepare_telegram(initial_parts)
logger.debug(
"[progress] send reply_to=%s md=%s rendered=%s entities=%s",
user_msg_id,
initial_md,
initial_rendered,
initial_entities,
"telegram.send_message",
chat_id=chat_id,
reply_to_message_id=user_msg_id,
rendered=initial_rendered,
)
progress_msg = await cfg.bot.send_message(
chat_id=chat_id,
@@ -358,7 +374,11 @@ async def send_initial_progress(
progress_id = int(progress_msg["message_id"])
last_edit_at = clock()
last_rendered = initial_rendered
logger.debug("[progress] sent chat_id=%s message_id=%s", chat_id, progress_id)
logger.debug(
"progress.sent",
chat_id=chat_id,
message_id=progress_id,
)
return ProgressMessageState(
message_id=progress_id,
@@ -392,6 +412,7 @@ async def run_runner_with_cancel(
_log_runner_event(evt)
if isinstance(evt, StartedEvent):
outcome.resume = evt.resume
bind_run_context(resume=evt.resume.value)
if running_task is not None and running_task.resume is None:
running_task.resume = evt.resume
running_task.resume_ready.set()
@@ -448,7 +469,12 @@ async def send_result_message(
if final_msg is None:
return
if progress_id is not None and (edit_message_id is None or not edited):
logger.debug("[%s] delete progress message_id=%s", delete_tag, progress_id)
logger.debug(
"telegram.delete_message",
chat_id=chat_id,
message_id=progress_id,
tag=delete_tag,
)
await cfg.bot.delete_message(chat_id=chat_id, message_id=progress_id)
@@ -468,12 +494,12 @@ async def handle_message(
sleep: Callable[[float], Awaitable[None]] = anyio.sleep,
progress_edit_every: float = PROGRESS_EDIT_EVERY_S,
) -> None:
logger.debug(
"[handle] incoming chat_id=%s message_id=%s resume=%r text=%s",
chat_id,
user_msg_id,
resume_token,
text,
logger.info(
"handle.incoming",
chat_id=chat_id,
user_msg_id=user_msg_id,
resume=resume_token.value if resume_token else None,
text=text,
)
started_at = clock()
is_resume_line = runner.is_resume_line
@@ -539,9 +565,13 @@ async def handle_message(
running_task=running_task,
on_thread_known=on_thread_known,
)
except Exception as e:
error = e
logger.exception("[handle] runner failed")
except Exception as exc:
error = exc
logger.exception(
"handle.runner_failed",
error=str(exc),
error_type=exc.__class__.__name__,
)
finally:
if (
running_task is not None
@@ -564,8 +594,9 @@ async def handle_message(
elapsed, err_body, status="error"
)
logger.debug(
"[error] markdown: %s",
assemble_markdown_parts(final_parts),
"handle.error.markdown",
error=err_body,
markdown=assemble_markdown_parts(final_parts),
)
await send_result_message(
cfg,
@@ -582,9 +613,9 @@ async def handle_message(
if outcome.cancelled:
resume = sync_resume_token(progress_renderer, outcome.resume)
logger.info(
"[handle] cancelled resume=%s elapsed=%.1fs",
resume.value if resume else None,
elapsed,
"handle.cancelled",
resume=resume.value if resume else None,
elapsed_s=elapsed,
)
final_parts = progress_renderer.render_progress_parts(
elapsed, label="`cancelled`"
@@ -618,34 +649,33 @@ async def handle_message(
status = (
"error" if run_ok is False else ("done" if final_answer.strip() else "error")
)
resume_value = None
resume_token = completed.resume or outcome.resume
if resume_token is not None:
resume_value = resume_token.value
logger.info(
"runner.completed",
ok=run_ok,
error=run_error,
answer_len=len(final_answer or ""),
elapsed_s=round(elapsed, 2),
action_count=progress_renderer.action_count,
resume=resume_value,
)
sync_resume_token(progress_renderer, completed.resume or outcome.resume)
final_parts = progress_renderer.render_final_parts(
elapsed, final_answer, status=status
)
logger.debug(
"[final] markdown: %s",
assemble_markdown_parts(final_parts),
"handle.final.markdown",
markdown=assemble_markdown_parts(final_parts),
status=status,
)
final_rendered, final_entities = prepare_telegram(final_parts)
can_edit_final = progress_id is not None
edit_message_id = None if cfg.final_notify or not can_edit_final else progress_id
if edit_message_id is None:
logger.debug(
"[final] send reply_to=%s rendered=%s entities=%s",
user_msg_id,
final_rendered,
final_entities,
)
else:
logger.debug(
"[final] edit message_id=%s rendered=%s entities=%s",
edit_message_id,
final_rendered,
final_entities,
)
await send_result_message(
cfg,
chat_id=chat_id,
@@ -669,10 +699,10 @@ async def poll_updates(cfg: BridgeConfig) -> AsyncIterator[dict[str, Any]]:
offset=offset, timeout_s=50, allowed_updates=["message"]
)
if updates is None:
logger.info("[loop] getUpdates failed")
logger.info("loop.get_updates.failed")
await anyio.sleep(2)
continue
logger.debug("[loop] updates: %s", updates)
logger.debug("loop.updates", updates=updates)
for upd in updates:
offset = upd["update_id"] + 1
@@ -719,7 +749,11 @@ async def _handle_cancel(
)
return
logger.info("[cancel] cancelling progress_message_id=%s", progress_id)
logger.info(
"cancel.requested",
chat_id=chat_id,
progress_message_id=progress_id,
)
running_task.cancel_requested.set()
@@ -838,6 +872,12 @@ async def run_main_loop(
reason=reason,
)
return
bind_run_context(
chat_id=chat_id,
user_msg_id=user_msg_id,
engine=entry.runner.engine,
resume=resume_token.value if resume_token else None,
)
await handle_message(
cfg,
runner=entry.runner,
@@ -850,8 +890,14 @@ async def run_main_loop(
on_thread_known=on_thread_known,
progress_edit_every=cfg.progress_edit_every,
)
except Exception:
logger.exception("[handle] worker failed")
except Exception as exc:
logger.exception(
"handle.worker_failed",
error=str(exc),
error_type=exc.__class__.__name__,
)
finally:
clear_context()
async def run_thread_job(job: ThreadJob) -> None:
await run_job(
+4 -5
View File
@@ -1,6 +1,5 @@
from __future__ import annotations
import logging
import os
import shutil
import sys
@@ -16,12 +15,12 @@ from .bridge import BridgeConfig, run_main_loop
from .config import ConfigError, load_telegram_config
from .engines import get_backend, get_engine_config, list_backends
from .lockfile import LockError, LockHandle, acquire_lock, token_fingerprint
from .logging import setup_logging
from .logging import get_logger, setup_logging
from .onboarding import SetupResult, check_setup, interactive_setup
from .router import AutoRouter, RunnerEntry
from .telegram import TelegramClient
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
def _print_version_and_exit() -> None:
@@ -172,7 +171,7 @@ def _build_router(
)
for warning in warnings:
logger.warning("[setup] %s", warning)
logger.warning("setup.warning", issue=warning)
return AutoRouter(entries=entries, default_engine=default_engine)
@@ -300,7 +299,7 @@ def _run_auto_router(
typer.echo(f"error: {e}", err=True)
raise typer.Exit(code=1)
except KeyboardInterrupt:
logger.info("[shutdown] interrupted")
logger.info("shutdown.interrupted")
raise typer.Exit(code=130)
finally:
if lock_handle is not None:
+9 -3
View File
@@ -2,12 +2,13 @@ from __future__ import annotations
import hashlib
import json
import logging
import os
from dataclasses import dataclass
from pathlib import Path
logger = logging.getLogger(__name__)
from .logging import get_logger
logger = get_logger(__name__)
@dataclass(frozen=True)
@@ -36,7 +37,12 @@ class LockHandle:
try:
self.path.unlink(missing_ok=True)
except OSError as exc:
logger.warning("[lock] failed to remove lock file %s: %s", self.path, exc)
logger.warning(
"lock.release.failed",
path=str(self.path),
error=str(exc),
error_type=exc.__class__.__name__,
)
def __enter__(self) -> "LockHandle":
return self
+253 -41
View File
@@ -1,62 +1,274 @@
from __future__ import annotations
import errno
import logging
import io
import os
import re
import sys
from contextlib import contextmanager
from contextvars import ContextVar
from typing import Any, TextIO, cast
import structlog
from structlog.types import Processor
TELEGRAM_TOKEN_RE = re.compile(r"bot\d+:[A-Za-z0-9_-]+")
TELEGRAM_BARE_TOKEN_RE = re.compile(r"\b\d+:[A-Za-z0-9_-]{10,}\b")
_LEVELS: dict[str, int] = {
"debug": 10,
"info": 20,
"warning": 30,
"error": 40,
"exception": 40,
"critical": 50,
}
class RedactTokenFilter(logging.Filter):
def filter(self, record: logging.LogRecord) -> bool:
_MIN_LEVEL = _LEVELS["info"]
_PIPELINE_LEVEL_NAME = "debug"
_suppress_below: ContextVar[int | None] = ContextVar(
"takopi_suppress_below", default=None
)
_log_file_handle: TextIO | None = None
def _truthy(value: str | None) -> bool:
if value is None:
return False
return value.strip().lower() in {"1", "true", "yes", "on"}
def _level_value(value: str | None, *, default: str = "info") -> int:
    """Map a level name to its numeric value from ``_LEVELS``.

    The name is matched case-insensitively after trimming whitespace. A
    missing, empty or unrecognized name resolves to the numeric value of
    ``default`` instead.
    """
    # An empty key can never be present in _LEVELS, so it falls through to
    # the default exactly like an unknown name does.
    key = value.strip().lower() if value else ""
    if key in _LEVELS:
        return _LEVELS[key]
    return _LEVELS[default]
def pipeline_log_level() -> str:
    """Return the level name currently used for pipeline trace events."""
    current_name = _PIPELINE_LEVEL_NAME
    return current_name
def log_pipeline(logger: Any, event: str, **fields: Any) -> None:
    """Emit a pipeline trace event at the configured pipeline level.

    The event goes out through ``logger.info`` when the pipeline level name
    is "info" and through ``logger.debug`` otherwise; ``fields`` are passed
    along as keyword arguments.
    """
    emit = logger.info if _PIPELINE_LEVEL_NAME == "info" else logger.debug
    emit(event, **fields)
def _drop_below_level(
    logger: Any, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
    """Structlog processor that discards events below the active thresholds.

    The event's severity is looked up from ``method_name`` in ``_LEVELS``
    (names not listed there count as 0). The event is dropped when that
    severity is below the module-wide minimum, or below the context-local
    suppression threshold when one is set.

    Raises:
        structlog.DropEvent: when the event should not be emitted.
    """
    severity = _LEVELS.get(method_name, 0)
    below_global_floor = severity < _MIN_LEVEL
    if not below_global_floor:
        context_floor = _suppress_below.get()
        if context_floor is None or severity >= context_floor:
            return event_dict
    raise structlog.DropEvent
def _redact_text(value: str) -> str:
    """Return ``value`` with Telegram bot tokens masked.

    Matches of ``TELEGRAM_TOKEN_RE`` are replaced first, then matches of
    ``TELEGRAM_BARE_TOKEN_RE`` are replaced in the result.
    """
    substitutions = (
        (TELEGRAM_TOKEN_RE, "bot[REDACTED]"),
        (TELEGRAM_BARE_TOKEN_RE, "[REDACTED_TOKEN]"),
    )
    masked = value
    for pattern, replacement in substitutions:
        masked = pattern.sub(replacement, masked)
    return masked
def _redact_value(value: Any, memo: dict[int, Any]) -> Any:
    """Return a copy of ``value`` with tokens redacted in every nested string.

    Strings are passed through ``_redact_text``; bytes and bytearrays are
    decoded as UTF-8 (with replacement of undecodable bytes) and returned as
    redacted ``str``. Dicts, lists, tuples, sets and frozensets are rebuilt
    with their items redacted (dict keys are kept as-is). Any other object is
    returned unchanged.

    Args:
        value: The object to redact.
        memo: Maps ``id()`` of containers already visited to their redacted
            copy, so shared and self-referencing structures are handled
            without infinite recursion.

    Returns:
        The redacted copy, or ``value`` itself for unsupported types.
    """
    if isinstance(value, str):
        return _redact_text(value)
    if isinstance(value, (bytes, bytearray)):
        return _redact_text(value.decode("utf-8", errors="replace"))

    obj_id = id(value)
    if obj_id in memo:
        return memo[obj_id]

    if isinstance(value, dict):
        redacted: dict[Any, Any] = {}
        # Register before recursing so a cycle back to this dict resolves to
        # the copy under construction.
        memo[obj_id] = redacted
        for key, val in value.items():
            redacted[key] = _redact_value(val, memo)
        return redacted

    if isinstance(value, list):
        redacted_list: list[Any] = []
        memo[obj_id] = redacted_list
        redacted_list.extend(_redact_value(item, memo) for item in value)
        return redacted_list

    if isinstance(value, tuple):
        # A tuple cannot be filled in place, so a list stands in for it while
        # its items are processed (only a cycle through the tuple can observe
        # this placeholder).
        placeholder: list[Any] = []
        memo[obj_id] = placeholder
        placeholder.extend(_redact_value(item, memo) for item in value)
        redacted_tuple = tuple(placeholder)
        # Re-point the memo at the finished tuple so later references to the
        # same tuple get a tuple back rather than the unhashable placeholder.
        memo[obj_id] = redacted_tuple
        return redacted_tuple

    if isinstance(value, frozenset):
        # Members must be hashable, so a frozenset cannot be part of a cycle
        # and needs no placeholder.
        redacted_frozen = frozenset(_redact_value(item, memo) for item in value)
        memo[obj_id] = redacted_frozen
        return redacted_frozen

    if isinstance(value, set):
        redacted_set: set[Any] = set()
        memo[obj_id] = redacted_set
        redacted_set.update(_redact_value(item, memo) for item in value)
        return redacted_set

    return value
def _redact_event_dict(
    logger: Any, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
    """Structlog processor that redacts tokens throughout the event dict.

    ``logger`` and ``method_name`` are accepted to satisfy the processor
    signature and are not used. The result comes from ``_redact_value``
    started with an empty memo.
    """
    visited: dict[int, Any] = {}
    return _redact_value(event_dict, memo=visited)
def _file_sink(
logger: Any, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
if _log_file_handle is None:
return event_dict
try:
message = record.getMessage()
except (TypeError, ValueError):
return True
redacted = TELEGRAM_TOKEN_RE.sub("bot[REDACTED]", message)
redacted = TELEGRAM_BARE_TOKEN_RE.sub("[REDACTED_TOKEN]", redacted)
if redacted != message:
record.msg = redacted
record.args = ()
return True
class SafeStreamHandler(logging.StreamHandler):
def handleError(self, record: logging.LogRecord) -> None:
exc = sys.exc_info()[1]
if isinstance(exc, BrokenPipeError):
try:
self.stream.close()
payload = structlog.processors.JSONRenderer(default=str)(
logger, method_name, dict(event_dict)
)
if isinstance(payload, bytes):
payload = payload.decode("utf-8", errors="replace")
_log_file_handle.write(payload + "\n")
_log_file_handle.flush()
except Exception:
pass
return
if isinstance(exc, OSError) and exc.errno == errno.EPIPE:
return event_dict
def _add_logger_name(
logger: Any, method_name: str, event_dict: dict[str, Any]
) -> dict[str, Any]:
if "logger" in event_dict:
return event_dict
name = event_dict.pop("logger_name", None)
if isinstance(name, str) and name:
event_dict["logger"] = name
return event_dict
fallback = getattr(logger, "name", None)
if isinstance(fallback, str) and fallback:
event_dict["logger"] = fallback
return event_dict
def get_logger(name: str | None = None) -> Any:
    """Return a structlog logger, optionally tagged with a logger name.

    When ``name`` is non-empty it is passed to ``structlog.get_logger`` as
    the ``logger_name`` initial value; otherwise a plain logger is returned.
    """
    initial_values = {"logger_name": name} if name else {}
    return structlog.get_logger(**initial_values)
def bind_run_context(**fields: Any) -> None:
    """Bind ``fields`` via ``structlog.contextvars.bind_contextvars``."""
    bind = structlog.contextvars.bind_contextvars
    bind(**fields)
def clear_context() -> None:
    """Clear bound fields via ``structlog.contextvars.clear_contextvars``."""
    clear = structlog.contextvars.clear_contextvars
    clear()
class SafeWriter(io.TextIOBase):
def __init__(self, stream: Any) -> None:
self._stream = stream
self._closed = False
def write(self, message: str) -> int:
if self._closed:
return 0
try:
self.stream.close()
return self._stream.write(message)
except (BrokenPipeError, ValueError):
self._close()
return 0
except OSError as exc:
if exc.errno == errno.EPIPE:
self._close()
return 0
raise
def flush(self) -> None:
if self._closed:
return
try:
self._stream.flush()
except (BrokenPipeError, ValueError):
self._close()
except OSError as exc:
if exc.errno == errno.EPIPE:
self._close()
return
raise
def isatty(self) -> bool:
isatty = getattr(self._stream, "isatty", None)
return bool(isatty()) if callable(isatty) else False
def _close(self) -> None:
if self._closed:
return
self._closed = True
try:
self._stream.close()
except Exception:
pass
return
super().handleError(record)
def setup_logging(*, debug: bool = False) -> None:
root_logger = logging.getLogger()
root_logger.setLevel(logging.DEBUG if debug else logging.INFO)
logging.getLogger("markdown_it").setLevel(logging.WARNING)
logging.getLogger("httpcore").setLevel(logging.WARNING)
for handler in root_logger.handlers[:]:
root_logger.removeHandler(handler)
handler.close()
global _MIN_LEVEL, _PIPELINE_LEVEL_NAME
global _log_file_handle
fmt = logging.Formatter("%(asctime)s %(levelname)s %(name)s: %(message)s")
redactor = RedactTokenFilter()
level_name = os.environ.get("TAKOPI_LOG_LEVEL")
if debug:
level_name = "debug"
_MIN_LEVEL = _level_value(level_name, default="info")
console = SafeStreamHandler(sys.stdout)
console.setLevel(logging.DEBUG if debug else logging.INFO)
console.setFormatter(fmt)
console.addFilter(redactor)
root_logger.addFilter(redactor)
root_logger.addHandler(console)
trace_pipeline = _truthy(os.environ.get("TAKOPI_TRACE_PIPELINE"))
_PIPELINE_LEVEL_NAME = "info" if trace_pipeline else "debug"
format_value = os.environ.get("TAKOPI_LOG_FORMAT", "console").strip().lower()
color_override = os.environ.get("TAKOPI_LOG_COLOR")
if color_override is None:
is_tty = sys.stdout.isatty()
else:
is_tty = _truthy(color_override)
if format_value == "json":
renderer: Any = structlog.processors.JSONRenderer(default=str)
else:
renderer = structlog.dev.ConsoleRenderer(colors=is_tty)
safe_stream = cast(TextIO, SafeWriter(sys.stdout))
log_file = os.environ.get("TAKOPI_LOG_FILE")
if _log_file_handle is not None:
try:
_log_file_handle.close()
except Exception:
pass
_log_file_handle = None
if log_file:
try:
_log_file_handle = open(log_file, "a", encoding="utf-8")
except Exception:
_log_file_handle = None
processors = cast(
list[Processor],
[
_drop_below_level,
structlog.contextvars.merge_contextvars,
structlog.processors.TimeStamper(fmt="iso", utc=True),
structlog.processors.add_log_level,
_add_logger_name,
structlog.processors.format_exc_info,
_redact_event_dict,
_file_sink,
cast(Processor, renderer),
],
)
structlog.configure(
processors=processors,
logger_factory=structlog.PrintLoggerFactory(file=safe_stream),
cache_logger_on_first_use=True,
)
@contextmanager
def suppress_logs(level: str = "warning"):
    """Context manager that sets the suppression threshold for its duration.

    ``level`` is resolved with ``_level_value`` (falling back to "warning")
    and stored in the ``_suppress_below`` context variable. The previous
    value is restored on exit, including when the body raises.
    """
    threshold = _level_value(level, default="warning")
    restore_token = _suppress_below.set(threshold)
    try:
        yield
    finally:
        _suppress_below.reset(restore_token)
+2 -6
View File
@@ -1,6 +1,5 @@
from __future__ import annotations
import logging
import shutil
from contextlib import contextmanager
from dataclasses import dataclass
@@ -25,6 +24,7 @@ from .backends import EngineBackend, SetupIssue
from .backends_helpers import install_issue
from .config import ConfigError, HOME_CONFIG_PATH, load_telegram_config
from .engines import list_backends
from .logging import suppress_logs
from .telegram import TelegramClient
@@ -225,12 +225,8 @@ def _render_engine_table(console: Console) -> list[tuple[str, bool, str | None]]
@contextmanager
def _suppress_logging():
prev_disable = logging.root.manager.disable
logging.disable(logging.INFO)
try:
with suppress_logs():
yield
finally:
logging.disable(prev_disable)
def _confirm(message: str, *, default: bool = True) -> bool | None:
+136 -15
View File
@@ -3,7 +3,6 @@
from __future__ import annotations
import json
import logging
import re
import subprocess
from collections.abc import AsyncIterator, Callable
@@ -13,6 +12,7 @@ from weakref import WeakValueDictionary
import anyio
from .logging import get_logger, log_pipeline
from .model import (
Action,
ActionEvent,
@@ -131,8 +131,8 @@ class JsonlRunState:
class JsonlSubprocessRunner(BaseRunner):
def get_logger(self) -> logging.Logger:
return getattr(self, "logger", logging.getLogger(__name__))
def get_logger(self) -> Any:
return getattr(self, "logger", get_logger(__name__))
def command(self) -> str:
raise NotImplementedError
@@ -230,15 +230,9 @@ class JsonlSubprocessRunner(BaseRunner):
async def iter_json_lines(
self,
stream: Any,
*,
logger: logging.Logger,
tag: str,
) -> AsyncIterator[bytes]:
async for raw_line in iter_bytes_lines(stream):
raw = raw_line.rstrip(b"\n")
text = raw.decode("utf-8", errors="replace")
logger.debug("[%s][jsonl] %s", tag, text)
yield raw
yield raw_line.rstrip(b"\n")
def decode_error_events(
self,
@@ -356,6 +350,13 @@ class JsonlSubprocessRunner(BaseRunner):
cmd = [self.command(), *self.build_args(prompt, resume, state=state)]
payload = self.stdin_payload(prompt, resume, state=state)
env = self.env(state=state)
logger.info(
"runner.start",
engine=self.engine,
resume=resume.value if resume else None,
prompt=prompt,
prompt_len=len(prompt),
)
async with manage_subprocess(
cmd,
@@ -369,12 +370,23 @@ class JsonlSubprocessRunner(BaseRunner):
if payload is not None and proc.stdin is None:
raise RuntimeError(self.pipes_error_message())
logger.debug("[%s] spawn pid=%s args=%r", tag, proc.pid, cmd)
logger.info(
"subprocess.spawn",
cmd=cmd[0] if cmd else None,
args=cmd[1:],
pid=proc.pid,
)
if payload is not None:
assert proc.stdin is not None
await proc.stdin.send(payload)
await proc.stdin.aclose()
logger.info(
"subprocess.stdin.send",
pid=proc.pid,
resume=resume.value if resume else None,
bytes=len(payload),
)
elif proc.stdin is not None:
await proc.stdin.aclose()
@@ -382,6 +394,8 @@ class JsonlSubprocessRunner(BaseRunner):
expected_session: ResumeToken | None = resume
found_session: ResumeToken | None = None
did_emit_completed = False
ignored_after_completed = False
jsonl_seq = 0
async with anyio.create_task_group() as tg:
tg.start_soon(
@@ -390,19 +404,34 @@ class JsonlSubprocessRunner(BaseRunner):
logger,
tag,
)
async for raw_line in self.iter_json_lines(
proc.stdout, logger=logger, tag=tag
):
async for raw_line in self.iter_json_lines(proc.stdout):
if did_emit_completed:
if not ignored_after_completed:
log_pipeline(
logger,
"runner.drop.jsonl_after_completed",
pid=proc.pid,
)
ignored_after_completed = True
continue
line = raw_line.strip()
if not line:
continue
jsonl_seq += 1
seq = jsonl_seq
raw_text = raw_line.decode("utf-8", errors="replace")
line_text = line.decode("utf-8", errors="replace")
try:
decoded = self.decode_jsonl(line=line)
except Exception as exc:
log_pipeline(
logger,
"jsonl.parse.error",
pid=proc.pid,
jsonl_seq=seq,
line=line_text,
error=str(exc),
)
events = self.decode_error_events(
raw=raw_text,
line=line_text,
@@ -411,6 +440,19 @@ class JsonlSubprocessRunner(BaseRunner):
)
else:
if decoded is None:
log_pipeline(
logger,
"jsonl.parse.invalid",
pid=proc.pid,
jsonl_seq=seq,
line=line_text,
)
logger.info(
"runner.jsonl.invalid",
pid=proc.pid,
jsonl_seq=seq,
line=line_text,
)
events = self.invalid_json_events(
raw=raw_text,
line=line_text,
@@ -425,6 +467,13 @@ class JsonlSubprocessRunner(BaseRunner):
found_session=found_session,
)
except Exception as exc:
log_pipeline(
logger,
"runner.translate.error",
pid=proc.pid,
jsonl_seq=seq,
error=str(exc),
)
events = self.translate_error_events(
data=decoded,
error=exc,
@@ -433,22 +482,74 @@ class JsonlSubprocessRunner(BaseRunner):
for evt in events:
if isinstance(evt, StartedEvent):
prior_found = found_session
try:
found_session, emit = self.handle_started_event(
evt,
expected_session=expected_session,
found_session=found_session,
)
except Exception as exc:
log_pipeline(
logger,
"runner.started.error",
pid=proc.pid,
jsonl_seq=seq,
resume=evt.resume.value,
expected_session=expected_session.value
if expected_session
else None,
found_session=prior_found.value
if prior_found
else None,
error=str(exc),
)
raise
if prior_found is None and emit:
reason = (
"matched_expected"
if expected_session is not None
else "first_seen"
)
elif prior_found is not None and not emit:
reason = "duplicate"
else:
reason = "unknown"
log_pipeline(
logger,
"runner.started.seen",
pid=proc.pid,
jsonl_seq=seq,
resume=evt.resume.value,
expected_session=expected_session.value
if expected_session
else None,
found_session=found_session.value
if found_session
else None,
emit=emit,
reason=reason,
)
if not emit:
continue
if isinstance(evt, CompletedEvent):
did_emit_completed = True
log_pipeline(
logger,
"runner.completed.seen",
pid=proc.pid,
jsonl_seq=seq,
ok=evt.ok,
has_answer=bool(evt.answer.strip()),
emit=True,
)
yield evt
break
yield evt
rc = await proc.wait()
logger.debug("[%s] process exit pid=%s rc=%s", tag, proc.pid, rc)
logger.info("subprocess.exit", pid=proc.pid, rc=rc)
if did_emit_completed:
return
if rc is not None and rc != 0:
@@ -459,6 +560,16 @@ class JsonlSubprocessRunner(BaseRunner):
state=state,
)
for evt in events:
if isinstance(evt, CompletedEvent):
log_pipeline(
logger,
"runner.completed.seen",
pid=proc.pid,
ok=evt.ok,
has_answer=bool(evt.answer.strip()),
emit=True,
source="process_error",
)
yield evt
return
@@ -468,6 +579,16 @@ class JsonlSubprocessRunner(BaseRunner):
state=state,
)
for evt in events:
if isinstance(evt, CompletedEvent):
log_pipeline(
logger,
"runner.completed.seen",
pid=proc.pid,
ok=evt.ok,
has_answer=bool(evt.answer.strip()),
emit=True,
source="stream_end",
)
yield evt
+7 -9
View File
@@ -1,6 +1,5 @@
from __future__ import annotations
import logging
import os
import re
from dataclasses import dataclass, field
@@ -11,12 +10,13 @@ import msgspec
from ..backends import EngineBackend, EngineConfig
from ..events import EventFactory
from ..logging import get_logger
from ..model import Action, ActionKind, EngineId, ResumeToken, TakopiEvent
from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..schemas import claude as claude_schema
from ..utils.paths import relativize_command, relativize_path
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
ENGINE: EngineId = EngineId("claude")
DEFAULT_ALLOWED_TOOLS = ["Bash", "Read", "Edit", "Write"]
@@ -399,12 +399,7 @@ class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner):
*,
state: ClaudeStreamState,
) -> None:
_ = state
logger.info(
"[claude] start run resume=%r",
resume.value if resume else None,
)
logger.debug("[claude] prompt: %s", prompt)
_ = state, prompt, resume
def decode_jsonl(
self,
@@ -424,7 +419,10 @@ class ClaudeRunner(ResumeTokenMixin, JsonlSubprocessRunner):
_ = raw, line, state
if isinstance(error, msgspec.DecodeError):
self.get_logger().warning(
"[%s] invalid msgspec event: %s", self.tag(), error
"jsonl.msgspec.invalid",
tag=self.tag(),
error=str(error),
error_type=error.__class__.__name__,
)
return []
return super().decode_error_events(
+9 -10
View File
@@ -1,6 +1,5 @@
from __future__ import annotations
import logging
import re
from dataclasses import dataclass
from pathlib import Path
@@ -11,12 +10,13 @@ import msgspec
from ..backends import EngineBackend, EngineConfig
from ..config import ConfigError
from ..events import EventFactory
from ..logging import get_logger
from ..model import ActionPhase, EngineId, ResumeToken, TakopiEvent
from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..schemas import codex as codex_schema
from ..utils.paths import relativize_command
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
ENGINE: EngineId = EngineId("codex")
@@ -394,9 +394,7 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner):
*,
state: CodexRunState,
) -> None:
_ = state
logger.info("[codex] start run resume=%r", resume.value if resume else None)
logger.debug("[codex] prompt: %s", prompt)
_ = state, prompt, resume
def decode_jsonl(self, *, line: bytes) -> codex_schema.ThreadEvent:
return codex_schema.decode_event(line)
@@ -412,7 +410,10 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner):
_ = raw, line
if isinstance(error, msgspec.DecodeError):
self.get_logger().warning(
"[%s] invalid msgspec event: %s", self.tag(), error
"jsonl.msgspec.invalid",
tag=self.tag(),
error=str(error),
error_type=error.__class__.__name__,
)
return []
return super().decode_error_events(
@@ -485,9 +486,7 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner):
if state.final_answer is None:
state.final_answer = text
else:
logger.debug(
"[codex] emitted multiple agent messages; using the last one"
)
logger.debug("codex.multiple_agent_messages")
state.final_answer = text
case _:
pass
@@ -538,7 +537,7 @@ class CodexRunner(ResumeTokenMixin, JsonlSubprocessRunner):
resume=resume_for_completed,
)
]
logger.info("[codex] done run session=%s", found_session.value)
logger.info("codex.session.completed", resume=found_session.value)
return [
state.factory.completed_ok(
answer=state.final_answer or "",
+8 -10
View File
@@ -13,7 +13,6 @@ Session IDs use the format: ses_XXXX (e.g., ses_494719016ffe85dkDMj0FPRbHK)
from __future__ import annotations
import logging
import re
from dataclasses import dataclass, field
from pathlib import Path
@@ -23,6 +22,7 @@ import msgspec
from ..backends import EngineBackend, EngineConfig
from ..config import ConfigError
from ..logging import get_logger
from ..model import (
Action,
ActionEvent,
@@ -37,7 +37,7 @@ from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..schemas import opencode as opencode_schema
from ..utils.paths import relativize_command, relativize_path
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
ENGINE: EngineId = EngineId("opencode")
@@ -350,7 +350,7 @@ class OpenCodeRunner(ResumeTokenMixin, JsonlSubprocessRunner):
opencode_cmd: str = "opencode"
model: str | None = None
session_title: str = "opencode"
logger: logging.Logger = logger
logger = logger
def format_resume(self, token: ResumeToken) -> str:
if token.engine != ENGINE:
@@ -397,12 +397,7 @@ class OpenCodeRunner(ResumeTokenMixin, JsonlSubprocessRunner):
*,
state: OpenCodeStreamState,
) -> None:
_ = state
logger.info(
"[opencode] start run resume=%r",
resume.value if resume else None,
)
logger.debug("[opencode] prompt: %s", prompt)
_ = state, prompt, resume
def invalid_json_events(
self,
@@ -444,7 +439,10 @@ class OpenCodeRunner(ResumeTokenMixin, JsonlSubprocessRunner):
_ = raw, line, state
if isinstance(error, msgspec.DecodeError):
self.get_logger().warning(
"[%s] invalid msgspec event: %s", self.tag(), error
"jsonl.msgspec.invalid",
tag=self.tag(),
error=str(error),
error_type=error.__class__.__name__,
)
return []
return super().decode_error_events(
+6 -3
View File
@@ -1,6 +1,5 @@
from __future__ import annotations
import logging
import os
import re
from dataclasses import dataclass, field
@@ -13,6 +12,7 @@ import msgspec
from ..backends import EngineBackend, EngineConfig
from ..config import ConfigError
from ..logging import get_logger
from ..model import (
Action,
ActionEvent,
@@ -29,7 +29,7 @@ from ..runner import JsonlSubprocessRunner, ResumeTokenMixin, Runner
from ..schemas import pi as pi_schema
from ..utils.paths import relativize_command, relativize_path
logger = logging.getLogger(__name__)
logger = get_logger(__name__)
ENGINE: EngineId = EngineId("pi")
@@ -370,7 +370,10 @@ class PiRunner(ResumeTokenMixin, JsonlSubprocessRunner):
_ = raw, line, state
if isinstance(error, msgspec.DecodeError):
self.get_logger().warning(
"[%s] invalid msgspec event: %s", self.tag(), error
"jsonl.msgspec.invalid",
tag=self.tag(),
error=str(error),
error_type=error.__class__.__name__,
)
return []
return super().decode_error_events(
+30 -27
View File
@@ -1,14 +1,12 @@
from __future__ import annotations
import logging
from typing import Any, Protocol
import httpx
from .logging import RedactTokenFilter
from .logging import get_logger
logger = logging.getLogger(__name__)
logger.addFilter(RedactTokenFilter())
logger = get_logger(__name__)
class BotClient(Protocol):
@@ -71,13 +69,17 @@ class TelegramClient:
await self._client.aclose()
async def _post(self, method: str, json_data: dict[str, Any]) -> Any | None:
logger.debug("[telegram] request %s: %s", method, json_data)
logger.debug("telegram.request", method=method, payload=json_data)
try:
resp = await self._client.post(f"{self._base}/{method}", json=json_data)
except httpx.HTTPError as e:
url = getattr(e.request, "url", None)
logger.error(
"[telegram] network error method=%s url=%s: %s", method, url, e
"telegram.network_error",
method=method,
url=str(url) if url is not None else None,
error=str(e),
error_type=e.__class__.__name__,
)
return None
@@ -86,12 +88,12 @@ class TelegramClient:
except httpx.HTTPStatusError as e:
body = resp.text
logger.error(
"[telegram] http error method=%s status=%s url=%s: %s body=%r",
method,
resp.status_code,
resp.request.url,
e,
body,
"telegram.http_error",
method=method,
status=resp.status_code,
url=str(resp.request.url),
error=str(e),
body=body,
)
return None
@@ -100,34 +102,35 @@ class TelegramClient:
except Exception as e:
body = resp.text
logger.error(
"[telegram] bad response method=%s status=%s url=%s: %s body=%r",
method,
resp.status_code,
resp.request.url,
e,
body,
"telegram.bad_response",
method=method,
status=resp.status_code,
url=str(resp.request.url),
error=str(e),
error_type=e.__class__.__name__,
body=body,
)
return None
if not isinstance(payload, dict):
logger.error(
"[telegram] invalid response method=%s url=%s: %r",
method,
resp.request.url,
payload,
"telegram.invalid_payload",
method=method,
url=str(resp.request.url),
payload=payload,
)
return None
if not payload.get("ok"):
logger.error(
"[telegram] api error method=%s url=%s: %s",
method,
resp.request.url,
payload,
"telegram.api_error",
method=method,
url=str(resp.request.url),
payload=payload,
)
return None
logger.debug("[telegram] response %s: %s", method, payload)
logger.debug("telegram.response", method=method, payload=payload)
return payload.get("result")
async def get_updates(
+17 -5
View File
@@ -1,13 +1,15 @@
from __future__ import annotations
from collections.abc import AsyncIterator
import logging
import sys
from typing import Any
import anyio
from anyio.abc import ByteReceiveStream
from anyio.streams.buffered import BufferedByteReceiveStream
from ..logging import log_pipeline
async def iter_bytes_lines(stream: ByteReceiveStream) -> AsyncIterator[bytes]:
buffered = BufferedByteReceiveStream(stream)
@@ -21,12 +23,22 @@ async def iter_bytes_lines(stream: ByteReceiveStream) -> AsyncIterator[bytes]:
async def drain_stderr(
stream: ByteReceiveStream,
logger: logging.Logger,
logger: Any,
tag: str,
) -> None:
try:
async for line in iter_bytes_lines(stream):
text = line.decode("utf-8", errors="replace")
logger.debug("[%s][stderr] %s", tag, text)
except Exception as e:
logger.debug("[%s][stderr] drain error: %s", tag, e)
log_pipeline(
logger,
"subprocess.stderr",
tag=tag,
line=text,
)
except Exception as exc:
log_pipeline(
logger,
"subprocess.stderr.error",
tag=tag,
error=str(exc),
)
+15 -4
View File
@@ -1,6 +1,5 @@
from __future__ import annotations
import logging
import os
import signal
from collections.abc import AsyncIterator, Sequence
@@ -10,7 +9,9 @@ from typing import Any
import anyio
from anyio.abc import Process
logger = logging.getLogger(__name__)
from ..logging import get_logger
logger = get_logger(__name__)
async def wait_for_process(proc: Process, timeout: float) -> bool:
@@ -29,7 +30,12 @@ def terminate_process(proc: Process) -> None:
except ProcessLookupError:
return
except Exception as e:
logger.debug("[subprocess] failed to terminate process group: %s", e)
logger.debug(
"subprocess.terminate.failed",
error=str(e),
error_type=e.__class__.__name__,
pid=proc.pid,
)
try:
proc.terminate()
except ProcessLookupError:
@@ -46,7 +52,12 @@ def kill_process(proc: Process) -> None:
except ProcessLookupError:
return
except Exception as e:
logger.debug("[subprocess] failed to kill process group: %s", e)
logger.debug(
"subprocess.kill.failed",
error=str(e),
error_type=e.__class__.__name__,
pid=proc.pid,
)
try:
proc.kill()
except ProcessLookupError:
+6 -12
View File
@@ -1,9 +1,7 @@
import logging
import httpx
import pytest
from takopi.logging import RedactTokenFilter
from takopi.logging import setup_logging
from takopi.telegram import TelegramClient
@@ -38,19 +36,16 @@ async def test_telegram_429_no_retry() -> None:
@pytest.mark.anyio
async def test_no_token_in_logs_on_http_error(
caplog: pytest.LogCaptureFixture,
capsys: pytest.CaptureFixture[str],
) -> None:
token = "123:abcDEF_ghij"
redactor = RedactTokenFilter()
root_logger = logging.getLogger()
root_logger.addFilter(redactor)
setup_logging(debug=True)
def handler(request: httpx.Request) -> httpx.Response:
return httpx.Response(500, text="oops", request=request)
transport = httpx.MockTransport(handler)
caplog.set_level(logging.ERROR)
client = httpx.AsyncClient(transport=transport)
try:
tg = TelegramClient(token, client=client)
@@ -58,7 +53,6 @@ async def test_no_token_in_logs_on_http_error(
finally:
await client.aclose()
root_logger.removeFilter(redactor)
assert token not in caplog.text
assert "bot[REDACTED]" in caplog.text
out = capsys.readouterr().out
assert token not in out
assert "bot[REDACTED]" in out
Generated
+11
View File
@@ -387,6 +387,15 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/b7/ce/149a00dd41f10bc29e5921b496af8b574d8413afcd5e30dfa0ed46c2cc5e/six-1.17.0-py2.py3-none-any.whl", hash = "sha256:4721f391ed90541fddacab5acf947aa0d3dc7d27b2e1e8eda2be8970586c3274", size = 11050, upload-time = "2024-12-04T17:35:26.475Z" },
]
[[package]]
name = "structlog"
version = "25.5.0"
source = { registry = "https://pypi.org/simple" }
sdist = { url = "https://files.pythonhosted.org/packages/ef/52/9ba0f43b686e7f3ddfeaa78ac3af750292662284b3661e91ad5494f21dbc/structlog-25.5.0.tar.gz", hash = "sha256:098522a3bebed9153d4570c6d0288abf80a031dfdb2048d59a49e9dc2190fc98", size = 1460830, upload-time = "2025-10-27T08:28:23.028Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/a8/45/a132b9074aa18e799b891b91ad72133c98d8042c70f6240e4c5f9dabee2f/structlog-25.5.0-py3-none-any.whl", hash = "sha256:a8453e9b9e636ec59bd9e79bbd4a72f025981b3ba0f5837aebf48f02f37a7f9f", size = 72510, upload-time = "2025-10-27T08:28:21.535Z" },
]
[[package]]
name = "sulguk"
version = "0.11.1"
@@ -411,6 +420,7 @@ dependencies = [
{ name = "msgspec" },
{ name = "questionary" },
{ name = "rich" },
{ name = "structlog" },
{ name = "sulguk" },
{ name = "typer" },
]
@@ -432,6 +442,7 @@ requires-dist = [
{ name = "msgspec", specifier = ">=0.20.0" },
{ name = "questionary", specifier = ">=2.1.1" },
{ name = "rich", specifier = ">=14.2.0" },
{ name = "structlog", specifier = ">=25.5.0" },
{ name = "sulguk", specifier = ">=0.11.1" },
{ name = "typer", specifier = ">=0.21.0" },
]