329 lines
10 KiB
Python
329 lines
10 KiB
Python
import anyio
|
|
|
|
import pytest
|
|
|
|
from collections.abc import AsyncIterator
|
|
|
|
from takopi.model import (
|
|
ActionEvent,
|
|
CompletedEvent,
|
|
EngineId,
|
|
ResumeToken,
|
|
StartedEvent,
|
|
TakopiEvent,
|
|
)
|
|
from takopi.runners.codex import CodexRunner
|
|
|
|
CODEX_ENGINE = EngineId("codex")
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_run_serializes_same_session() -> None:
|
|
runner = CodexRunner(codex_cmd="codex", extra_args=[])
|
|
gate = anyio.Event()
|
|
in_flight = 0
|
|
max_in_flight = 0
|
|
|
|
async def run_stub(*_args, **_kwargs) -> AsyncIterator[TakopiEvent]:
|
|
nonlocal in_flight, max_in_flight
|
|
in_flight += 1
|
|
max_in_flight = max(max_in_flight, in_flight)
|
|
try:
|
|
await gate.wait()
|
|
yield CompletedEvent(
|
|
engine=CODEX_ENGINE,
|
|
resume=ResumeToken(engine=CODEX_ENGINE, value="sid"),
|
|
ok=True,
|
|
answer="ok",
|
|
)
|
|
finally:
|
|
in_flight -= 1
|
|
|
|
runner.run_impl = run_stub # type: ignore[assignment]
|
|
|
|
async def drain(prompt: str, resume: ResumeToken | None) -> None:
|
|
async for _event in runner.run(prompt, resume):
|
|
pass
|
|
|
|
token = ResumeToken(engine=CODEX_ENGINE, value="sid")
|
|
async with anyio.create_task_group() as tg:
|
|
tg.start_soon(drain, "a", token)
|
|
tg.start_soon(drain, "b", token)
|
|
await anyio.sleep(0)
|
|
gate.set()
|
|
assert max_in_flight == 1
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_run_allows_parallel_new_sessions() -> None:
|
|
runner = CodexRunner(codex_cmd="codex", extra_args=[])
|
|
gate = anyio.Event()
|
|
in_flight = 0
|
|
max_in_flight = 0
|
|
|
|
async def run_stub(*_args, **_kwargs) -> AsyncIterator[TakopiEvent]:
|
|
nonlocal in_flight, max_in_flight
|
|
in_flight += 1
|
|
max_in_flight = max(max_in_flight, in_flight)
|
|
try:
|
|
await gate.wait()
|
|
yield CompletedEvent(
|
|
engine=CODEX_ENGINE,
|
|
resume=ResumeToken(engine=CODEX_ENGINE, value="sid"),
|
|
ok=True,
|
|
answer="ok",
|
|
)
|
|
finally:
|
|
in_flight -= 1
|
|
|
|
runner.run_impl = run_stub # type: ignore[assignment]
|
|
|
|
async def drain(prompt: str, resume: ResumeToken | None) -> None:
|
|
async for _event in runner.run(prompt, resume):
|
|
pass
|
|
|
|
async with anyio.create_task_group() as tg:
|
|
tg.start_soon(drain, "a", None)
|
|
tg.start_soon(drain, "b", None)
|
|
await anyio.sleep(0)
|
|
gate.set()
|
|
assert max_in_flight == 2
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_run_allows_parallel_different_sessions() -> None:
|
|
runner = CodexRunner(codex_cmd="codex", extra_args=[])
|
|
gate = anyio.Event()
|
|
in_flight = 0
|
|
max_in_flight = 0
|
|
|
|
async def run_stub(*_args, **_kwargs) -> AsyncIterator[TakopiEvent]:
|
|
nonlocal in_flight, max_in_flight
|
|
in_flight += 1
|
|
max_in_flight = max(max_in_flight, in_flight)
|
|
try:
|
|
await gate.wait()
|
|
yield CompletedEvent(
|
|
engine=CODEX_ENGINE,
|
|
resume=ResumeToken(engine=CODEX_ENGINE, value="sid"),
|
|
ok=True,
|
|
answer="ok",
|
|
)
|
|
finally:
|
|
in_flight -= 1
|
|
|
|
runner.run_impl = run_stub # type: ignore[assignment]
|
|
|
|
async def drain(prompt: str, resume: ResumeToken | None) -> None:
|
|
async for _event in runner.run(prompt, resume):
|
|
pass
|
|
|
|
token_a = ResumeToken(engine=CODEX_ENGINE, value="sid-a")
|
|
token_b = ResumeToken(engine=CODEX_ENGINE, value="sid-b")
|
|
async with anyio.create_task_group() as tg:
|
|
tg.start_soon(drain, "a", token_a)
|
|
tg.start_soon(drain, "b", token_b)
|
|
await anyio.sleep(0)
|
|
gate.set()
|
|
assert max_in_flight == 2
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_run_serializes_new_session_after_session_is_known(
|
|
tmp_path, monkeypatch
|
|
) -> None:
|
|
gate_path = tmp_path / "gate"
|
|
resume_marker = tmp_path / "resume_started"
|
|
thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"
|
|
|
|
codex_path = tmp_path / "codex"
|
|
codex_path.write_text(
|
|
"#!/usr/bin/env python3\n"
|
|
"import json\n"
|
|
"import os\n"
|
|
"import sys\n"
|
|
"import time\n"
|
|
"\n"
|
|
"gate = os.environ['CODEX_TEST_GATE']\n"
|
|
"resume_marker = os.environ['CODEX_TEST_RESUME_MARKER']\n"
|
|
"thread_id = os.environ['CODEX_TEST_THREAD_ID']\n"
|
|
"\n"
|
|
"args = sys.argv[1:]\n"
|
|
"if 'resume' in args:\n"
|
|
" print(json.dumps({'type': 'thread.started', 'thread_id': thread_id}), flush=True)\n"
|
|
" with open(resume_marker, 'w', encoding='utf-8') as f:\n"
|
|
" f.write('started')\n"
|
|
" f.flush()\n"
|
|
" sys.exit(0)\n"
|
|
"\n"
|
|
"print(json.dumps({'type': 'thread.started', 'thread_id': thread_id}), flush=True)\n"
|
|
"while not os.path.exists(gate):\n"
|
|
" time.sleep(0.001)\n"
|
|
"sys.exit(0)\n",
|
|
encoding="utf-8",
|
|
)
|
|
codex_path.chmod(0o755)
|
|
|
|
monkeypatch.setenv("CODEX_TEST_GATE", str(gate_path))
|
|
monkeypatch.setenv("CODEX_TEST_RESUME_MARKER", str(resume_marker))
|
|
monkeypatch.setenv("CODEX_TEST_THREAD_ID", thread_id)
|
|
|
|
runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
|
|
|
|
session_started = anyio.Event()
|
|
resume_value: str | None = None
|
|
|
|
new_done = anyio.Event()
|
|
|
|
async def run_new() -> None:
|
|
nonlocal resume_value
|
|
async for event in runner.run("hello", None):
|
|
if isinstance(event, StartedEvent):
|
|
resume_value = event.resume.value
|
|
session_started.set()
|
|
new_done.set()
|
|
|
|
async def run_resume() -> None:
|
|
assert resume_value is not None
|
|
async for _event in runner.run(
|
|
"resume", ResumeToken(engine=CODEX_ENGINE, value=resume_value)
|
|
):
|
|
pass
|
|
|
|
async with anyio.create_task_group() as tg:
|
|
tg.start_soon(run_new)
|
|
await session_started.wait()
|
|
|
|
tg.start_soon(run_resume)
|
|
await anyio.sleep(0.01)
|
|
|
|
assert not resume_marker.exists()
|
|
|
|
gate_path.write_text("go", encoding="utf-8")
|
|
await new_done.wait()
|
|
|
|
with anyio.fail_after(2):
|
|
while not resume_marker.exists():
|
|
await anyio.sleep(0.001)
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_codex_runner_preserves_warning_order(tmp_path) -> None:
|
|
thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"
|
|
|
|
codex_path = tmp_path / "codex"
|
|
codex_path.write_text(
|
|
"#!/usr/bin/env python3\n"
|
|
"import json\n"
|
|
"import sys\n"
|
|
"\n"
|
|
"sys.stdin.read()\n"
|
|
"print(json.dumps({'type': 'error', 'message': 'warning one', 'fatal': False}), flush=True)\n"
|
|
f"print(json.dumps({{'type': 'thread.started', 'thread_id': '{thread_id}'}}), flush=True)\n"
|
|
"print(json.dumps({'type': 'item.completed', 'item': {'id': 'item_0', 'type': 'agent_message', 'text': 'ok'}}), flush=True)\n",
|
|
encoding="utf-8",
|
|
)
|
|
codex_path.chmod(0o755)
|
|
|
|
runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
|
|
seen = [evt async for evt in runner.run("hi", None)]
|
|
|
|
assert len(seen) == 3
|
|
assert isinstance(seen[0], ActionEvent)
|
|
assert seen[0].phase == "completed"
|
|
assert seen[0].ok is False
|
|
assert seen[0].action.kind == "warning"
|
|
assert seen[0].action.title == "warning one"
|
|
|
|
assert isinstance(seen[1], StartedEvent)
|
|
assert seen[1].resume.value == thread_id
|
|
|
|
assert isinstance(seen[2], CompletedEvent)
|
|
assert seen[2].resume == seen[1].resume
|
|
assert seen[2].answer == "ok"
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_codex_runner_includes_stderr_reason(tmp_path) -> None:
|
|
codex_path = tmp_path / "codex"
|
|
codex_path.write_text(
|
|
"#!/usr/bin/env python3\n"
|
|
"import sys\n"
|
|
"\n"
|
|
"sys.stderr.write('Not inside a trusted directory and --skip-git-repo-check was not specified.\\n')\n"
|
|
"sys.stderr.flush()\n"
|
|
"sys.exit(1)\n",
|
|
encoding="utf-8",
|
|
)
|
|
codex_path.chmod(0o755)
|
|
|
|
runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
|
|
events = [evt async for evt in runner.run("hi", None)]
|
|
|
|
completed = next(evt for evt in events if isinstance(evt, CompletedEvent))
|
|
assert completed.ok is False
|
|
assert completed.error is not None
|
|
assert "codex exec failed (rc=1)." in completed.error
|
|
assert "\n\nNot inside a trusted directory" in completed.error
|
|
|
|
|
|
@pytest.mark.anyio
|
|
async def test_run_serializes_two_new_sessions_same_thread(
|
|
tmp_path, monkeypatch
|
|
) -> None:
|
|
gate_path = tmp_path / "gate"
|
|
thread_id = "019b73c4-0c3f-7701-a0bb-aac6b4d8a3bc"
|
|
|
|
codex_path = tmp_path / "codex"
|
|
codex_path.write_text(
|
|
"#!/usr/bin/env python3\n"
|
|
"import json\n"
|
|
"import os\n"
|
|
"import sys\n"
|
|
"import time\n"
|
|
"\n"
|
|
"gate = os.environ['CODEX_TEST_GATE']\n"
|
|
"thread_id = os.environ['CODEX_TEST_THREAD_ID']\n"
|
|
"\n"
|
|
"print(json.dumps({'type': 'thread.started', 'thread_id': thread_id}), flush=True)\n"
|
|
"while not os.path.exists(gate):\n"
|
|
" time.sleep(0.001)\n"
|
|
"sys.exit(0)\n",
|
|
encoding="utf-8",
|
|
)
|
|
codex_path.chmod(0o755)
|
|
|
|
monkeypatch.setenv("CODEX_TEST_GATE", str(gate_path))
|
|
monkeypatch.setenv("CODEX_TEST_THREAD_ID", thread_id)
|
|
|
|
runner = CodexRunner(codex_cmd=str(codex_path), extra_args=[])
|
|
|
|
started_first = anyio.Event()
|
|
started_second = anyio.Event()
|
|
|
|
async def run_first() -> None:
|
|
async for event in runner.run("one", None):
|
|
if isinstance(event, StartedEvent):
|
|
started_first.set()
|
|
|
|
async def run_second() -> None:
|
|
async for event in runner.run("two", None):
|
|
if isinstance(event, StartedEvent):
|
|
started_second.set()
|
|
|
|
async with anyio.create_task_group() as tg:
|
|
tg.start_soon(run_first)
|
|
tg.start_soon(run_second)
|
|
|
|
with anyio.fail_after(2):
|
|
while not (started_first.is_set() or started_second.is_set()):
|
|
await anyio.sleep(0.001)
|
|
|
|
assert not (started_first.is_set() and started_second.is_set())
|
|
|
|
gate_path.write_text("go", encoding="utf-8")
|
|
|
|
with anyio.fail_after(2):
|
|
await started_first.wait()
|
|
await started_second.wait()
|