Files
takopi/tests/test_claude_runner.py
T
2026-01-01 20:31:11 +04:00

277 lines
8.9 KiB
Python

import json
from pathlib import Path
import anyio
import pytest
from takopi.model import ActionEvent, CompletedEvent, ResumeToken, StartedEvent
from takopi.runners.claude import (
ClaudeRunner,
ClaudeStreamState,
ENGINE,
translate_claude_event,
)
def _load_fixture(name: str) -> list[dict]:
path = Path(__file__).parent / "fixtures" / name
return [json.loads(line) for line in path.read_text().splitlines() if line.strip()]
def test_claude_resume_format_and_extract() -> None:
runner = ClaudeRunner(claude_cmd="claude")
token = ResumeToken(engine=ENGINE, value="sid")
assert runner.format_resume(token) == "`claude --resume sid`"
assert runner.extract_resume("`claude --resume sid`") == token
assert runner.extract_resume("claude -r other") == ResumeToken(
engine=ENGINE, value="other"
)
assert runner.extract_resume("`codex resume sid`") is None
def test_translate_success_fixture() -> None:
state = ClaudeStreamState()
events: list = []
for event in _load_fixture("claude_stream_success.jsonl"):
events.extend(translate_claude_event(event, title="claude", state=state))
assert isinstance(events[0], StartedEvent)
started = next(evt for evt in events if isinstance(evt, StartedEvent))
action_events = [evt for evt in events if isinstance(evt, ActionEvent)]
assert len(action_events) == 4
started_actions = {
(evt.action.id, evt.phase): evt
for evt in action_events
if evt.phase == "started"
}
assert started_actions[("toolu_1", "started")].action.kind == "command"
write_action = started_actions[("toolu_2", "started")].action
assert write_action.kind == "file_change"
assert write_action.detail["changes"][0]["path"] == "notes.md"
completed_actions = {
(evt.action.id, evt.phase): evt
for evt in action_events
if evt.phase == "completed"
}
assert completed_actions[("toolu_1", "completed")].ok is True
assert completed_actions[("toolu_2", "completed")].ok is True
completed = next(evt for evt in events if isinstance(evt, CompletedEvent))
assert events[-1] == completed
assert completed.ok is True
assert completed.resume == started.resume
assert completed.answer == "Done. Added notes.md."
def test_translate_error_fixture_permission_denials() -> None:
state = ClaudeStreamState()
events: list = []
for event in _load_fixture("claude_stream_error.jsonl"):
events.extend(translate_claude_event(event, title="claude", state=state))
started = next(evt for evt in events if isinstance(evt, StartedEvent))
completed = next(evt for evt in events if isinstance(evt, CompletedEvent))
warnings = [
evt
for evt in events
if isinstance(evt, ActionEvent) and evt.action.kind == "warning"
]
assert warnings
assert events.index(warnings[0]) < events.index(completed)
assert completed.ok is False
assert completed.error == "Permission denied"
assert completed.resume == started.resume
def test_tool_results_pop_pending_actions() -> None:
state = ClaudeStreamState()
tool_use_event = {
"type": "assistant",
"message": {
"id": "msg_1",
"content": [
{
"type": "tool_use",
"id": "toolu_1",
"name": "Bash",
"input": {"command": "echo hi"},
}
],
},
}
tool_result_event = {
"type": "user",
"message": {
"id": "msg_2",
"content": [
{
"type": "tool_result",
"tool_use_id": "toolu_1",
"content": "ok",
"is_error": False,
}
],
},
}
translate_claude_event(tool_use_event, title="claude", state=state)
assert "toolu_1" in state.pending_actions
translate_claude_event(tool_result_event, title="claude", state=state)
assert not state.pending_actions
@pytest.mark.anyio
async def test_run_serializes_same_session() -> None:
runner = ClaudeRunner(claude_cmd="claude")
gate = anyio.Event()
in_flight = 0
max_in_flight = 0
async def run_stub(*_args, **_kwargs):
nonlocal in_flight, max_in_flight
in_flight += 1
max_in_flight = max(max_in_flight, in_flight)
try:
await gate.wait()
yield CompletedEvent(
engine=ENGINE,
resume=ResumeToken(engine=ENGINE, value="sid"),
ok=True,
answer="ok",
)
finally:
in_flight -= 1
runner.run_impl = run_stub # type: ignore[assignment]
async def drain(prompt: str, resume: ResumeToken | None) -> None:
async for _event in runner.run(prompt, resume):
pass
token = ResumeToken(engine=ENGINE, value="sid")
async with anyio.create_task_group() as tg:
tg.start_soon(drain, "a", token)
tg.start_soon(drain, "b", token)
await anyio.sleep(0)
gate.set()
assert max_in_flight == 1
@pytest.mark.anyio
async def test_run_serializes_new_session_after_session_is_known(
tmp_path, monkeypatch
) -> None:
gate_path = tmp_path / "gate"
resume_marker = tmp_path / "resume_started"
session_id = "session_01"
claude_path = tmp_path / "claude"
claude_path.write_text(
"#!/usr/bin/env python3\n"
"import json\n"
"import os\n"
"import sys\n"
"import time\n"
"\n"
"gate = os.environ['CLAUDE_TEST_GATE']\n"
"resume_marker = os.environ['CLAUDE_TEST_RESUME_MARKER']\n"
"session_id = os.environ['CLAUDE_TEST_SESSION_ID']\n"
"\n"
"args = sys.argv[1:]\n"
"if '--resume' in args or '-r' in args:\n"
" print(json.dumps({'type': 'system', 'subtype': 'init', 'session_id': session_id}), flush=True)\n"
" with open(resume_marker, 'w', encoding='utf-8') as f:\n"
" f.write('started')\n"
" f.flush()\n"
" sys.exit(0)\n"
"\n"
"print(json.dumps({'type': 'system', 'subtype': 'init', 'session_id': session_id}), flush=True)\n"
"while not os.path.exists(gate):\n"
" time.sleep(0.001)\n"
"sys.exit(0)\n",
encoding="utf-8",
)
claude_path.chmod(0o755)
monkeypatch.setenv("CLAUDE_TEST_GATE", str(gate_path))
monkeypatch.setenv("CLAUDE_TEST_RESUME_MARKER", str(resume_marker))
monkeypatch.setenv("CLAUDE_TEST_SESSION_ID", session_id)
runner = ClaudeRunner(claude_cmd=str(claude_path))
session_started = anyio.Event()
resume_value: str | None = None
new_done = anyio.Event()
async def run_new() -> None:
nonlocal resume_value
async for event in runner.run("hello", None):
if isinstance(event, StartedEvent):
resume_value = event.resume.value
session_started.set()
new_done.set()
async def run_resume() -> None:
assert resume_value is not None
async for _event in runner.run(
"resume", ResumeToken(engine=ENGINE, value=resume_value)
):
pass
async with anyio.create_task_group() as tg:
tg.start_soon(run_new)
await session_started.wait()
tg.start_soon(run_resume)
await anyio.sleep(0.01)
assert not resume_marker.exists()
gate_path.write_text("go", encoding="utf-8")
await new_done.wait()
with anyio.fail_after(2):
while not resume_marker.exists():
await anyio.sleep(0.001)
@pytest.mark.anyio
async def test_run_strips_anthropic_api_key_by_default(tmp_path, monkeypatch) -> None:
claude_path = tmp_path / "claude"
claude_path.write_text(
"#!/usr/bin/env python3\n"
"import json\n"
"import os\n"
"\n"
"session_id = 'session_01'\n"
"status = 'set' if os.environ.get('ANTHROPIC_API_KEY') else 'unset'\n"
"print(json.dumps({'type': 'system', 'subtype': 'init', 'session_id': session_id}), flush=True)\n"
"print(json.dumps({'type': 'result', 'subtype': 'success', 'is_error': False, 'result': f'api={status}', 'session_id': session_id}), flush=True)\n"
"raise SystemExit(0)\n",
encoding="utf-8",
)
claude_path.chmod(0o755)
monkeypatch.setenv("ANTHROPIC_API_KEY", "secret")
runner = ClaudeRunner(claude_cmd=str(claude_path))
answer: str | None = None
async for event in runner.run("hello", None):
if isinstance(event, CompletedEvent):
answer = event.answer
assert answer == "api=unset"
runner_api = ClaudeRunner(claude_cmd=str(claude_path), use_api_billing=True)
answer = None
async for event in runner_api.run("hello", None):
if isinstance(event, CompletedEvent):
answer = event.answer
assert answer == "api=set"