# File: takopi/tests/test_exec_runner.py
# (file-viewer metadata: Python, 552 lines, 20 KiB)

import anyio
import pytest
from collections.abc import AsyncIterator
from takopi.model import (
ActionEvent,
CompletedEvent,
ResumeToken,
StartedEvent,
TakopiEvent,
)
from takopi.runners.codex import CodexRunner, find_exec_only_flag
# Engine name passed as ``engine=`` to the ResumeToken and CompletedEvent
# objects that the tests in this module construct.
CODEX_ENGINE = "codex"
@pytest.mark.anyio
async def test_run_serializes_same_session() -> None:
    """Two runs resuming the same session never overlap inside ``run_impl``.

    ``run_impl`` is replaced by a stub that counts how many invocations are
    active at once and blocks on an event; the peak count must stay at 1.
    """
    codex = CodexRunner(codex_cmd="codex", extra_args=[])
    release = anyio.Event()
    active = 0
    peak = 0

    async def fake_run_impl(*_args, **_kwargs) -> AsyncIterator[TakopiEvent]:
        # Record the overlap level, then park until the test releases us.
        nonlocal active, peak
        active += 1
        peak = max(peak, active)
        try:
            await release.wait()
            yield CompletedEvent(
                engine=CODEX_ENGINE,
                resume=ResumeToken(engine=CODEX_ENGINE, value="sid"),
                ok=True,
                answer="ok",
            )
        finally:
            active -= 1

    codex.run_impl = fake_run_impl  # type: ignore[assignment]

    async def consume(prompt: str, resume: ResumeToken | None) -> None:
        async for _ in codex.run(prompt, resume):
            pass

    shared_token = ResumeToken(engine=CODEX_ENGINE, value="sid")
    async with anyio.create_task_group() as tg:
        tg.start_soon(consume, "a", shared_token)
        tg.start_soon(consume, "b", shared_token)
        # Give both tasks a chance to start before unblocking the stub.
        await anyio.sleep(0)
        release.set()

    assert peak == 1
@pytest.mark.anyio
async def test_run_allows_parallel_new_sessions() -> None:
    """Runs started without a resume token may overlap inside ``run_impl``.

    The stubbed ``run_impl`` tracks concurrent invocations; with two
    token-less runs the observed peak must be 2.
    """
    runner = CodexRunner(codex_cmd="codex", extra_args=[])
    unblock = anyio.Event()
    running_now = 0
    high_water = 0

    async def stubbed_impl(*_args, **_kwargs) -> AsyncIterator[TakopiEvent]:
        nonlocal running_now, high_water
        running_now += 1
        high_water = max(high_water, running_now)
        try:
            await unblock.wait()
            yield CompletedEvent(
                engine=CODEX_ENGINE,
                resume=ResumeToken(engine=CODEX_ENGINE, value="sid"),
                ok=True,
                answer="ok",
            )
        finally:
            running_now -= 1

    runner.run_impl = stubbed_impl  # type: ignore[assignment]

    async def exhaust(prompt: str, resume: ResumeToken | None) -> None:
        async for _ in runner.run(prompt, resume):
            pass

    async with anyio.create_task_group() as tg:
        tg.start_soon(exhaust, "a", None)
        tg.start_soon(exhaust, "b", None)
        # Let both tasks reach the stub before releasing it.
        await anyio.sleep(0)
        unblock.set()

    assert high_water == 2
@pytest.mark.anyio
async def test_run_allows_parallel_different_sessions() -> None:
    """Runs resuming two distinct sessions may overlap inside ``run_impl``.

    Same concurrency-counting stub as the sibling tests, but each run gets
    its own resume token value; the observed peak must be 2.
    """
    runner = CodexRunner(codex_cmd="codex", extra_args=[])
    go = anyio.Event()
    concurrent = 0
    most_concurrent = 0

    async def counting_impl(*_args, **_kwargs) -> AsyncIterator[TakopiEvent]:
        nonlocal concurrent, most_concurrent
        concurrent += 1
        most_concurrent = max(most_concurrent, concurrent)
        try:
            await go.wait()
            yield CompletedEvent(
                engine=CODEX_ENGINE,
                resume=ResumeToken(engine=CODEX_ENGINE, value="sid"),
                ok=True,
                answer="ok",
            )
        finally:
            concurrent -= 1

    runner.run_impl = counting_impl  # type: ignore[assignment]

    async def pump(prompt: str, resume: ResumeToken | None) -> None:
        async for _ in runner.run(prompt, resume):
            pass

    token_a = ResumeToken(engine=CODEX_ENGINE, value="sid-a")
    token_b = ResumeToken(engine=CODEX_ENGINE, value="sid-b")
    async with anyio.create_task_group() as tg:
        tg.start_soon(pump, "a", token_a)
        tg.start_soon(pump, "b", token_b)
        # Yield once so both runs can enter the stub, then release them.
        await anyio.sleep(0)
        go.set()

    assert most_concurrent == 2
def test_codex_exec_flags_after_exec() -> None:
    """``extra_args`` come before ``exec``; the exec flags and ``-`` follow it."""
    runner = CodexRunner(codex_cmd="codex", extra_args=["-c", "notify=[]"])
    fresh_state = runner.new_state("hi", None)

    built = runner.build_args("hi", None, state=fresh_state)

    expected = [
        "-c",
        "notify=[]",
        "exec",
        "--json",
        "--skip-git-repo-check",
        "--color=never",
        "-",
    ]
    assert built == expected
@pytest.mark.parametrize(
    ("extra_args", "expected"),
    [
        # No exec-only flag present.
        ([], None),
        (["-c", "notify=[]"], None),
        # Each of these is expected to be reported back by the helper.
        (["--skip-git-repo-check"], "--skip-git-repo-check"),
        (["--color=never"], "--color=never"),
        (["--output-schema", "schema.json"], "--output-schema"),
        (["--output-last-message=out.txt"], "--output-last-message=out.txt"),
        (["-o", "out.txt"], "-o"),
    ],
)
def test_find_exec_only_flag(extra_args: list[str], expected: str | None) -> None:
    """``find_exec_only_flag`` returns the expected flag, or ``None``, per case."""
    found = find_exec_only_flag(extra_args)
    assert found == expected
@pytest.mark.anyio
async def test_run_serializes_new_session_after_session_is_known(
    tmp_path, monkeypatch
) -> None:
    """A resume of a still-running new session waits for that run to finish.

    A fake ``codex`` executable is written to ``tmp_path``. Without
    ``resume`` in its argv it prints ``thread.started`` and then blocks
    until the gate file exists; with ``resume`` it prints
    ``thread.started``, writes a marker file and exits. The test starts a
    new session, then a resume of the same thread id, and checks that the
    marker only appears after the gate has been opened.
    """
    gate_path = tmp_path / "gate"
    resume_marker = tmp_path / "resume_started"
    thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"

    codex_path = tmp_path / "codex"
    # The nested blocks inside the script must be indented deeper than
    # their headers (``if`` -> ``with`` -> body); otherwise the generated
    # script fails to compile and neither code path ever runs.
    codex_path.write_text(
        "#!/usr/bin/env python3\n"
        "import json\n"
        "import os\n"
        "import sys\n"
        "import time\n"
        "\n"
        "gate = os.environ['CODEX_TEST_GATE']\n"
        "resume_marker = os.environ['CODEX_TEST_RESUME_MARKER']\n"
        "thread_id = os.environ['CODEX_TEST_THREAD_ID']\n"
        "\n"
        "args = sys.argv[1:]\n"
        "if 'resume' in args:\n"
        "    print(json.dumps({'type': 'thread.started', 'thread_id': thread_id}), flush=True)\n"
        "    with open(resume_marker, 'w', encoding='utf-8') as f:\n"
        "        f.write('started')\n"
        "        f.flush()\n"
        "    sys.exit(0)\n"
        "\n"
        "print(json.dumps({'type': 'thread.started', 'thread_id': thread_id}), flush=True)\n"
        "while not os.path.exists(gate):\n"
        "    time.sleep(0.001)\n"
        "sys.exit(0)\n",
        encoding="utf-8",
    )
    codex_path.chmod(0o755)

    monkeypatch.setenv("CODEX_TEST_GATE", str(gate_path))
    monkeypatch.setenv("CODEX_TEST_RESUME_MARKER", str(resume_marker))
    monkeypatch.setenv("CODEX_TEST_THREAD_ID", thread_id)

    runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])

    session_started = anyio.Event()
    resume_value: str | None = None
    new_done = anyio.Event()

    async def run_new() -> None:
        # Capture the session id from the first StartedEvent so the resume
        # run can target the same session.
        nonlocal resume_value
        async for event in runner.run("hello", None):
            if isinstance(event, StartedEvent):
                resume_value = event.resume.value
                session_started.set()
        new_done.set()

    async def run_resume() -> None:
        assert resume_value is not None
        async for _event in runner.run(
            "resume", ResumeToken(engine=CODEX_ENGINE, value=resume_value)
        ):
            pass

    async with anyio.create_task_group() as tg:
        tg.start_soon(run_new)
        # Bounded so a runner that never emits StartedEvent fails the test
        # instead of hanging the whole suite.
        with anyio.fail_after(5):
            await session_started.wait()

        tg.start_soon(run_resume)
        await anyio.sleep(0.01)

        # The new-session process is still blocked on the gate, so the
        # resume process must not have run yet.
        assert not resume_marker.exists()

        gate_path.write_text("go", encoding="utf-8")
        with anyio.fail_after(5):
            await new_done.wait()

        with anyio.fail_after(2):
            while not resume_marker.exists():
                await anyio.sleep(0.001)
@pytest.mark.anyio
async def test_codex_runner_preserves_warning_order(tmp_path) -> None:
    """An ``error`` line printed before ``thread.started`` is yielded first.

    The fake codex prints an error, then ``thread.started``, then a final
    agent message; the runner must yield warning, started, completed in
    that order.
    """
    thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"
    script = (
        "#!/usr/bin/env python3\n"
        "import json\n"
        "import sys\n"
        "\n"
        "sys.stdin.read()\n"
        "print(json.dumps({'type': 'error', 'message': 'warning one'}), flush=True)\n"
        f"print(json.dumps({{'type': 'thread.started', 'thread_id': '{thread_id}'}}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_0', 'type': 'agent_message', 'text': 'ok'}}), flush=True)\n"
    )
    codex_path = tmp_path / "codex"
    codex_path.write_text(script, encoding="utf-8")
    codex_path.chmod(0o755)

    runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
    events = [evt async for evt in runner.run("hi", None)]

    assert len(events) == 3
    warning, started, completed = events

    assert isinstance(warning, ActionEvent)
    assert warning.phase == "completed"
    assert warning.ok is False
    assert warning.action.kind == "warning"
    assert warning.action.title == "warning one"

    assert isinstance(started, StartedEvent)
    assert started.resume.value == thread_id

    assert isinstance(completed, CompletedEvent)
    assert completed.resume == started.resume
    assert completed.answer == "ok"
@pytest.mark.anyio
async def test_codex_runner_reconnect_notice_is_non_fatal(tmp_path) -> None:
    """A ``Reconnecting...`` error line becomes a started note, not a failure.

    The stream continues afterwards: the session starts and completes with
    the agent's answer.
    """
    thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"
    script = (
        "#!/usr/bin/env python3\n"
        "import json\n"
        "import sys\n"
        "\n"
        "sys.stdin.read()\n"
        "print(json.dumps({'type': 'error', 'message': 'Reconnecting... 1/5'}), flush=True)\n"
        f"print(json.dumps({{'type': 'thread.started', 'thread_id': '{thread_id}'}}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_0', 'type': 'agent_message', 'text': 'ok'}}), flush=True)\n"
    )
    codex_path = tmp_path / "codex"
    codex_path.write_text(script, encoding="utf-8")
    codex_path.chmod(0o755)

    runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
    events = [evt async for evt in runner.run("hi", None)]

    assert len(events) == 3
    notice, started, completed = events

    assert isinstance(notice, ActionEvent)
    assert notice.phase == "started"
    assert notice.ok is None
    assert notice.action.kind == "note"
    assert notice.action.title == "Reconnecting... 1/5"

    assert isinstance(started, StartedEvent)
    assert started.resume.value == thread_id

    assert isinstance(completed, CompletedEvent)
    assert completed.resume == started.resume
    assert completed.answer == "ok"
@pytest.mark.anyio
async def test_codex_runner_reconnect_notice_updates_phase(tmp_path) -> None:
    """Consecutive reconnect notices share one action id: started, then updated."""
    thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"
    script = (
        "#!/usr/bin/env python3\n"
        "import json\n"
        "import sys\n"
        "\n"
        "sys.stdin.read()\n"
        "print(json.dumps({'type': 'error', 'message': 'Reconnecting... 1/5'}), flush=True)\n"
        "print(json.dumps({'type': 'error', 'message': 'Reconnecting... 2/5'}), flush=True)\n"
        f"print(json.dumps({{'type': 'thread.started', 'thread_id': '{thread_id}'}}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_0', 'type': 'agent_message', 'text': 'ok'}}), flush=True)\n"
        "print(json.dumps({'type': 'turn.completed', 'usage': {'input_tokens': 1, 'cached_input_tokens': 0, 'output_tokens': 1}}), flush=True)\n"
    )
    codex_path = tmp_path / "codex"
    codex_path.write_text(script, encoding="utf-8")
    codex_path.chmod(0o755)

    runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
    events = [evt async for evt in runner.run("hi", None)]

    assert len(events) == 4
    first, second, started, completed = events

    assert isinstance(first, ActionEvent)
    assert isinstance(second, ActionEvent)
    assert first.phase == "started"
    assert second.phase == "updated"
    assert first.action.id == second.action.id == "codex.reconnect"
    assert isinstance(started, StartedEvent)
    assert isinstance(completed, CompletedEvent)
@pytest.mark.anyio
async def test_codex_runner_prefers_final_answer_phase(tmp_path) -> None:
    """The ``final_answer`` message is the answer; ``commentary`` becomes a note."""
    thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"
    script = (
        "#!/usr/bin/env python3\n"
        "import json\n"
        "import sys\n"
        "\n"
        "sys.stdin.read()\n"
        f"print(json.dumps({{'type': 'thread.started', 'thread_id': '{thread_id}'}}), flush=True)\n"
        "print(json.dumps({'type': 'turn.started'}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_0', 'type': 'agent_message', 'phase': 'commentary', 'text': 'Working through the task.'}}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_1', 'type': 'agent_message', 'phase': 'final_answer', 'text': 'Done.'}}), flush=True)\n"
        "print(json.dumps({'type': 'turn.completed', 'usage': {'input_tokens': 1, 'cached_input_tokens': 0, 'output_tokens': 1}}), flush=True)\n"
    )
    codex_path = tmp_path / "codex"
    codex_path.write_text(script, encoding="utf-8")
    codex_path.chmod(0o755)

    runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
    events = [evt async for evt in runner.run("hi", None)]

    assert len(events) == 4
    started, turn, commentary, completed = events

    assert isinstance(started, StartedEvent)

    assert isinstance(turn, ActionEvent)
    assert turn.action.kind == "turn"

    assert isinstance(commentary, ActionEvent)
    assert commentary.action.kind == "note"
    assert commentary.action.title == "Working through the task."
    assert commentary.phase == "completed"
    assert commentary.ok is True

    assert isinstance(completed, CompletedEvent)
    assert completed.answer == "Done."
@pytest.mark.anyio
async def test_codex_runner_legacy_agent_message_no_phase(tmp_path) -> None:
    """With no ``phase`` on agent messages, the last message is the answer."""
    thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"
    script = (
        "#!/usr/bin/env python3\n"
        "import json\n"
        "import sys\n"
        "\n"
        "sys.stdin.read()\n"
        f"print(json.dumps({{'type': 'thread.started', 'thread_id': '{thread_id}'}}), flush=True)\n"
        "print(json.dumps({'type': 'turn.started'}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_0', 'type': 'agent_message', 'text': 'first'}}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_1', 'type': 'agent_message', 'text': 'second'}}), flush=True)\n"
        "print(json.dumps({'type': 'turn.completed', 'usage': {'input_tokens': 1, 'cached_input_tokens': 0, 'output_tokens': 1}}), flush=True)\n"
    )
    codex_path = tmp_path / "codex"
    codex_path.write_text(script, encoding="utf-8")
    codex_path.chmod(0o755)

    runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
    events = []
    async for evt in runner.run("hi", None):
        events.append(evt)

    final = next(e for e in events if isinstance(e, CompletedEvent))
    assert final.answer == "second"
@pytest.mark.anyio
async def test_codex_runner_collab_tool_call_does_not_break_stream(tmp_path) -> None:
    """``collab_tool_call`` items in the stream do not prevent the final answer."""
    thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"
    script = (
        "#!/usr/bin/env python3\n"
        "import json\n"
        "import sys\n"
        "\n"
        "sys.stdin.read()\n"
        f"print(json.dumps({{'type': 'thread.started', 'thread_id': '{thread_id}'}}), flush=True)\n"
        "print(json.dumps({'type': 'turn.started'}), flush=True)\n"
        "print(json.dumps({'type': 'item.started', 'item': {'id': 'item_0', 'type': 'collab_tool_call', 'tool': 'spawn_agent', 'sender_thread_id': 'main', 'receiver_thread_ids': ['worker'], 'prompt': 'check tests', 'agents_states': {}, 'status': 'in_progress'}}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_0', 'type': 'collab_tool_call', 'tool': 'spawn_agent', 'sender_thread_id': 'main', 'receiver_thread_ids': ['worker'], 'prompt': 'check tests', 'agents_states': {'worker': {'status': 'completed', 'message': 'ok'}}, 'status': 'completed'}}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_1', 'type': 'agent_message', 'text': 'ok'}}), flush=True)\n"
        "print(json.dumps({'type': 'turn.completed', 'usage': {'input_tokens': 1, 'cached_input_tokens': 0, 'output_tokens': 1}}), flush=True)\n"
    )
    codex_path = tmp_path / "codex"
    codex_path.write_text(script, encoding="utf-8")
    codex_path.chmod(0o755)

    runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
    events = []
    async for evt in runner.run("hi", None):
        events.append(evt)

    final = next(e for e in events if isinstance(e, CompletedEvent))
    assert final.answer == "ok"
@pytest.mark.anyio
async def test_codex_runner_unknown_item_type_does_not_break_stream(tmp_path) -> None:
    """An item with an unrecognised ``type`` does not prevent a successful completion."""
    thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"
    script = (
        "#!/usr/bin/env python3\n"
        "import json\n"
        "import sys\n"
        "\n"
        "sys.stdin.read()\n"
        f"print(json.dumps({{'type': 'thread.started', 'thread_id': '{thread_id}'}}), flush=True)\n"
        "print(json.dumps({'type': 'turn.started'}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_0', 'type': 'future_item', 'foo': 'bar'}}), flush=True)\n"
        "print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_1', 'type': 'agent_message', 'text': 'ok'}}), flush=True)\n"
        "print(json.dumps({'type': 'turn.completed', 'usage': {'input_tokens': 1, 'cached_input_tokens': 0, 'output_tokens': 1}}), flush=True)\n"
    )
    codex_path = tmp_path / "codex"
    codex_path.write_text(script, encoding="utf-8")
    codex_path.chmod(0o755)

    runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
    events = []
    async for evt in runner.run("hi", None):
        events.append(evt)

    final = next(e for e in events if isinstance(e, CompletedEvent))
    assert final.ok is True
    assert final.answer == "ok"
@pytest.mark.anyio
async def test_codex_runner_includes_stderr_reason(tmp_path) -> None:
    """A non-zero exit yields a failed completion whose error names the exit code.

    NOTE(review): despite the test name, the last assertion requires the
    stderr text to be *absent* from ``completed.error`` -- confirm whether
    the name or the assertion reflects the intended behaviour.
    """
    script = (
        "#!/usr/bin/env python3\n"
        "import sys\n"
        "\n"
        "sys.stderr.write('Not inside a trusted directory and --skip-git-repo-check was not specified.\\n')\n"
        "sys.stderr.flush()\n"
        "sys.exit(1)\n"
    )
    codex_path = tmp_path / "codex"
    codex_path.write_text(script, encoding="utf-8")
    codex_path.chmod(0o755)

    runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
    collected = []
    async for evt in runner.run("hi", None):
        collected.append(evt)

    failure = next(e for e in collected if isinstance(e, CompletedEvent))
    assert failure.ok is False
    assert failure.error is not None
    assert "codex exec failed (rc=1)." in failure.error
    assert "Not inside a trusted directory" not in failure.error
@pytest.mark.anyio
async def test_run_serializes_two_new_sessions_same_thread(
    tmp_path, monkeypatch
) -> None:
    """Two new runs that report the same thread id do not both start at once.

    The fake codex prints ``thread.started`` with a fixed thread id and then
    blocks until the gate file exists. As soon as one run has yielded a
    StartedEvent the other must not have; after the gate opens both must
    eventually start.
    """
    gate_path = tmp_path / "gate"
    thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"

    script = (
        "#!/usr/bin/env python3\n"
        "import json\n"
        "import os\n"
        "import sys\n"
        "import time\n"
        "\n"
        "gate = os.environ['CODEX_TEST_GATE']\n"
        "thread_id = os.environ['CODEX_TEST_THREAD_ID']\n"
        "\n"
        "print(json.dumps({'type': 'thread.started', 'thread_id': thread_id}), flush=True)\n"
        "while not os.path.exists(gate):\n"
        " time.sleep(0.001)\n"
        "sys.exit(0)\n"
    )
    codex_path = tmp_path / "codex"
    codex_path.write_text(script, encoding="utf-8")
    codex_path.chmod(0o755)

    monkeypatch.setenv("CODEX_TEST_GATE", str(gate_path))
    monkeypatch.setenv("CODEX_TEST_THREAD_ID", thread_id)

    runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])

    started_first = anyio.Event()
    started_second = anyio.Event()

    async def run_and_flag(prompt: str, started: anyio.Event) -> None:
        # Drain one new-session run, flagging when its StartedEvent arrives.
        async for event in runner.run(prompt, None):
            if isinstance(event, StartedEvent):
                started.set()

    async with anyio.create_task_group() as tg:
        tg.start_soon(run_and_flag, "one", started_first)
        tg.start_soon(run_and_flag, "two", started_second)

        # Poll until either run reports that it has started.
        with anyio.fail_after(2):
            while not started_first.is_set() and not started_second.is_set():
                await anyio.sleep(0.001)

        # Only one of the two may have started while the gate is closed.
        assert not (started_first.is_set() and started_second.is_set())

        gate_path.write_text("go", encoding="utf-8")
        with anyio.fail_after(2):
            await started_first.wait()
            await started_second.wait()