This commit is contained in:
Rui Carmo
2026-01-21 23:53:57 +00:00
commit a0e31d43fd
52 changed files with 6312 additions and 0 deletions
+92
View File
@@ -0,0 +1,92 @@
name: Build and Push Docker Image
on:
push:
tags: ['v*']
env:
REGISTRY: ghcr.io
IMAGE_NAME: ${{ github.repository }}
jobs:
build:
runs-on: ${{ matrix.runner }}
permissions:
contents: read
packages: write
strategy:
matrix:
include:
- runner: ubuntu-latest
platform: linux/amd64
suffix: amd64
- runner: ubuntu-24.04-arm
platform: linux/arm64
suffix: arm64
steps:
- name: Checkout repository
uses: actions/checkout@v4
- name: Set up Docker Buildx
uses: docker/setup-buildx-action@v3
- name: Log in to Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract version from tag
id: version
run: echo "version=${GITHUB_REF#refs/tags/v}" >> $GITHUB_OUTPUT
- name: Build and push architecture-specific image
uses: docker/build-push-action@v5
with:
context: .
platforms: ${{ matrix.platform }}
push: true
tags: ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.version }}-${{ matrix.suffix }}
cache-from: type=gha,scope=${{ matrix.suffix }}
cache-to: type=gha,mode=max,scope=${{ matrix.suffix }}
manifest:
needs: build
runs-on: ubuntu-latest
permissions:
contents: read
packages: write
steps:
- name: Log in to Container Registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract version from tag
id: version
run: |
version=${GITHUB_REF#refs/tags/v}
echo "version=$version" >> $GITHUB_OUTPUT
echo "major_minor=${version%.*}" >> $GITHUB_OUTPUT
- name: Create and push multi-arch manifest
run: |
docker manifest create ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.version }} \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.version }}-amd64 \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.version }}-arm64
docker manifest push ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.version }}
docker manifest create ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.major_minor }} \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.version }}-amd64 \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.version }}-arm64
docker manifest push ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.major_minor }}
docker manifest create ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.version }}-amd64 \
${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:${{ steps.version.outputs.version }}-arm64
docker manifest push ${{ env.REGISTRY }}/${{ env.IMAGE_NAME }}:latest
+131
View File
@@ -0,0 +1,131 @@
# macOS
.DS_Store
.AppleDouble
.LSOverride
# Byte-compiled / optimized / DLL files
__pycache__/
*.py[cod]
*$py.class
# C extensions
*.so
# Distribution / packaging
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
pip-wheel-metadata/
share/python-wheels/
*.egg-info/
.installed.cfg
*.egg
MANIFEST
# PyInstaller
*.manifest
*.spec
# Installer logs
pip-log.txt
pip-delete-this-directory.txt
# Unit test / coverage reports
htmlcov/
.tox/
.nox/
.coverage
.coverage.*
.cache
nosetests.xml
coverage.xml
*.cover
*.py,cover
.hypothesis/
.pytest_cache/
# Ruff
.ruff_cache/
# Translations
*.mo
*.pot
# Logs
*.log
# Sphinx documentation
docs/_build/
# PyBuilder
target/
# Jupyter Notebook
.ipynb_checkpoints
# IPython
profile_default/
ipython_config.py
# pyenv
.python-version
# Poetry
# poetry.lock is committed for applications, but can be gitignored for libraries
# Uncomment if this is a library:
# poetry.lock
# PEP 582; used by e.g. github.com/David-OConnor/pyflow
__pypackages__/
# Environments
.env
.env.*
.venv
env/
venv/
ENV/
env.bak/
venv.bak/
# IDEs and editors
.vscode/
.idea/
*.swp
*.swo
*~
# Rope project settings
.ropeproject
# mkdocs documentation
/site
# mypy
.mypy_cache/
.dmypy.json
dmypy.json
# Pyre type checker
.pyre/
# pyright
.pyright/
# pdm
.pdm.toml
.pdm-python
.pdm-build/
# textual-webterm specific
textual.log
+19
View File
@@ -0,0 +1,19 @@
# Minimal image for serving a web terminal
FROM python:3.12-slim
# Install minimal dependencies
RUN apt-get update && apt-get install -y --no-install-recommends \
make \
&& rm -rf /var/lib/apt/lists/*
# Install textual-webterm
COPY . /app
WORKDIR /app
RUN make install
# Expose the default port
EXPOSE 8080
# Run the terminal server
ENTRYPOINT ["textual-webterm"]
CMD ["--host", "0.0.0.0", "--port", "8080"]
+21
View File
@@ -0,0 +1,21 @@
MIT License
Copyright (c) 2026 Rui Carmo
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
+31
View File
@@ -0,0 +1,31 @@
.PHONY: help install install-dev lint format test coverage check clean
PYTHON ?= python3
PIP ?= $(PYTHON) -m pip
help:
@echo "Targets: install install-dev lint format test coverage check clean"
install:
$(PIP) install -e .
install-dev:
$(PIP) install -e .
$(PIP) install pytest pytest-asyncio pytest-cov pytest-timeout ruff
lint:
ruff check src tests
format:
ruff format src tests
test:
pytest
coverage:
pytest --cov=src/textual_webterm --cov-report=term-missing
check: lint coverage
clean:
rm -rf .pytest_cache .coverage htmlcov .ruff_cache
+163
View File
@@ -0,0 +1,163 @@
# textual-webterm
![Icon](docs/icon-256.png)
Serve terminal sessions and Textual apps over the web with a simple CLI command.
This is heavily based on [textual-web](https://github.com/Textualize/textual-web), but specifically focused on serving a persistent terminal session in a way that you can host behind a reverse proxy (and some form of authentication).
Built on top of [textual-serve](https://github.com/Textualize/textual-serve), this package provides an easy way to expose terminal sessions and Textual applications via HTTP/WebSocket with automatic reconnection support.
## Features
- 🖥️ **Web-based terminal** - Access your terminal from any browser
- 🐍 **Textual app support** - Serve Textual apps directly from Python modules
- 🔄 **Session reconnection** - Refresh the page and reconnect to the same session
- 🎨 **Full terminal emulation** - Colors, cursor, and ANSI codes work correctly
- 📐 **Auto-sizing** - Terminal automatically resizes to fit the browser window
- 🚀 **Simple CLI** - One command to start serving
## Non-Features
- **No Authentication** - this is meant to be used inside a dedicated container, and you should set up an authenticating reverse proxy like `authelia`
- **No Encryption (TLS/HTTPS)** - again, this is meant to be fronted by something like `traefik` or `caddy`
## Quick Start
### Serve a Terminal
Serve your default shell:
```bash
textual-webterm
```
Serve a specific command:
```bash
textual-webterm htop
```
### Serve a Textual App
Serve a Textual app from an installed module:
```bash
textual-webterm --app mypackage.mymodule:MyApp
```
Serve a Textual app from a Python file:
```bash
textual-webterm --app ./calculator.py:CalculatorApp
```
### Options
Specify host and port:
```bash
textual-webterm --host 0.0.0.0 --port 8080 bash
```
Then open http://localhost:8080 in your browser.
## Landing pages
You can serve a landing page with multiple terminal tiles driven by a YAML manifest:
```yaml
- name: My Service
slug: my-service
command: docker logs -f my-service
```
Run with:
```bash
textual-webterm --landing-manifest landing.yaml
```
You can also point to a docker-compose file; services with the label `webterm-command`
become tiles. For example:
```yaml
services:
db:
image: postgres
labels:
webterm-command: docker exec -it db psql
```
Start with:
```bash
textual-webterm --compose-manifest compose.yaml
```
When a landing manifest is provided, the root (`/`) shows the grid; clicking a tile
opens a dedicated terminal session in a new tab. Without a manifest, the server
operates in single-terminal mode.
## CLI Reference
```
Usage: textual-webterm [OPTIONS] [COMMAND]
Serve a terminal or Textual app over HTTP/WebSocket.
COMMAND: Shell command to run in terminal (default: $SHELL)
Options:
-H, --host TEXT Host to bind to [default: 0.0.0.0]
-p, --port INTEGER Port to bind to [default: 8080]
-a, --app TEXT Load a Textual app from module:ClassName
Examples: 'mymodule:MyApp' or './app.py:MyApp'
-L, --landing-manifest PATH YAML manifest describing landing page tiles
(slug/name/command).
-C, --compose-manifest PATH Docker compose YAML; services with label
"webterm-command" become landing tiles.
--version Show the version and exit.
--help Show this message and exit.
```
## Development
### Setup (Makefile-first)
```bash
git clone https://github.com/rcarmo/textual-webterm.git
cd textual-webterm
# Install with dev dependencies via Makefile
make install-dev
```
### Common tasks (use Makefile)
- Lint: `make lint`
- Format: `make format`
- Tests: `make test`
- Coverage (fail_under=80): `make coverage`
- Full check (lint + coverage): `make check`
### Notes
- WebSocket protocol (browser <-> server) is JSON: `["stdin", data]`, `["resize", {"width": w, "height": h}]`, `["ping", data]`.
- Static assets are provided by `textual-serve`; this project does not add custom static files.
- `/screenshot.svg` replays the terminal buffer to SVG via Rich; width can be set with `?width=120` and is clamped for safety.
## Requirements
- Python 3.9+
- Linux or macOS
## License
MIT License - see [LICENSE](LICENSE) for details.
## Related Projects
- [Textual](https://github.com/Textualize/textual) - TUI framework for Python
- [textual-serve](https://github.com/Textualize/textual-serve) - Serve Textual apps on the web
- [Rich](https://github.com/Textualize/rich) - Rich text formatting for terminals
+40
View File
@@ -0,0 +1,40 @@
# Refactoring & Audit Checklist
## Tooling
- [x] Makefile restored (install, lint, format, test, coverage, check)
- [x] Coverage gate at fail_under=80 (current ~88%)
## Dead Code Removal
- [x] Removed packets/environment protocol code
- [x] Removed unused Account model
- [x] Removed retry.py and related tests
## LocalServer / Protocol
- [x] JSON WS protocol enforced (stdin/resize/ping/pong)
- [x] Static assets delegated to textual-serve
- [x] /screenshot.svg renders replay buffer to SVG
- [x] Disconnect triggers resize to 132x45
- [ ] Narrow WebSocket error handling (avoid bare Exception)
- [ ] Consider TaskGroup/cleanup context for aiohttp runner
## Sessions
- [x] Session.is_running() added
- [x] AppSession double JSON parse fixed; payload capped (16MB)
- [x] TerminalSession replay buffer; resize on disconnect
- [ ] TerminalSession set_terminal_size is blocking; consider run_in_executor
## Poller
- [x] OSError-only read handling; write error handling added
- [x] TwoWayDict enforces 1:1 mapping (raises on duplicate value)
## CLI / Config
- [x] File-path detection helper deduped
- [x] Config uses tomllib (py311+)
## Tests
- [x] 135 tests passing; coverage ~88%
## Remaining Ideas (Low Priority)
- [ ] Consolidate WS dispatch table
- [ ] Simplify _get_ws_url_from_request
- [ ] Normalize logging style (f-string vs %%)
BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 7.4 KiB

BIN
View File
Binary file not shown.

After

Width:  |  Height:  |  Size: 1.4 MiB

+33
View File
@@ -0,0 +1,33 @@
Screen {
overflow: auto;
}
#calculator {
layout: grid;
grid-size: 4;
grid-gutter: 1 2;
grid-columns: 1fr;
grid-rows: 2fr 1fr 1fr 1fr 1fr 1fr;
margin: 1 2;
min-height: 25;
min-width: 26;
height: 100%;
}
Button {
width: 100%;
height: 100%;
}
#numbers {
column-span: 4;
content-align: right middle;
padding: 0 1;
height: 100%;
background: $primary-lighten-2;
color: $text;
}
#number-0 {
column-span: 2;
}
+145
View File
@@ -0,0 +1,145 @@
from contextlib import suppress
from decimal import Decimal
from typing import ClassVar
from textual import events
from textual.app import App, ComposeResult
from textual.containers import Container
from textual.css.query import NoMatches
from textual.reactive import var
from textual.widgets import Button, Static
class CalculatorApp(App):
"""A working 'desktop' calculator."""
CSS_PATH = "calculator.css"
numbers = var("0")
show_ac = var(True)
left = var(Decimal("0"))
right = var(Decimal("0"))
value = var("")
operator = var("plus")
NAME_MAP: ClassVar = {
"asterisk": "multiply",
"slash": "divide",
"underscore": "plus-minus",
"full_stop": "point",
"plus_minus_sign": "plus-minus",
"percent_sign": "percent",
"equals_sign": "equals",
"minus": "minus",
"plus": "plus",
}
def watch_numbers(self, value: str) -> None:
"""Called when numbers is updated."""
# Update the Numbers widget
self.query_one("#numbers", Static).update(value)
def compute_show_ac(self) -> bool:
"""Compute switch to show AC or C button"""
return self.value in ("", "0") and self.numbers == "0"
def watch_show_ac(self, show_ac: bool) -> None:
"""Called when show_ac changes."""
self.query_one("#c").display = not show_ac
self.query_one("#ac").display = show_ac
def compose(self) -> ComposeResult:
"""Add our buttons."""
with Container(id="calculator"):
yield Static(id="numbers")
yield Button("AC", id="ac", variant="primary")
yield Button("C", id="c", variant="primary")
yield Button("+/-", id="plus-minus", variant="primary")
yield Button("%", id="percent", variant="primary")
yield Button("÷", id="divide", variant="warning")
yield Button("7", id="number-7")
yield Button("8", id="number-8")
yield Button("9", id="number-9")
yield Button("x", id="multiply", variant="warning")
yield Button("4", id="number-4")
yield Button("5", id="number-5")
yield Button("6", id="number-6")
yield Button("-", id="minus", variant="warning")
yield Button("1", id="number-1")
yield Button("2", id="number-2")
yield Button("3", id="number-3")
yield Button("+", id="plus", variant="warning")
yield Button("0", id="number-0")
yield Button(".", id="point")
yield Button("=", id="equals", variant="warning")
def on_key(self, event: events.Key) -> None:
"""Called when the user presses a key."""
def press(button_id: str) -> None:
with suppress(NoMatches):
self.query_one(f"#{button_id}", Button).press()
key = event.key
if key.isdecimal():
press(f"number-{key}")
elif key == "c":
press("c")
press("ac")
else:
button_id = self.NAME_MAP.get(key)
if button_id is not None:
press(self.NAME_MAP.get(key, key))
def on_button_pressed(self, event: Button.Pressed) -> None:
"""Called when a button is pressed."""
button_id = event.button.id
assert button_id is not None
def do_math() -> None:
"""Does the math: LEFT OPERATOR RIGHT"""
try:
if self.operator == "plus":
self.left += self.right
elif self.operator == "minus":
self.left -= self.right
elif self.operator == "divide":
self.left /= self.right
elif self.operator == "multiply":
self.left *= self.right
self.numbers = str(self.left)
self.value = ""
except Exception:
self.numbers = "Error"
if button_id.startswith("number-"):
number = button_id.partition("-")[-1]
self.numbers = self.value = self.value.lstrip("0") + number
elif button_id == "plus-minus":
self.numbers = self.value = str(Decimal(self.value or "0") * -1)
elif button_id == "percent":
self.numbers = self.value = str(Decimal(self.value or "0") / Decimal(100))
elif button_id == "point":
if "." not in self.value:
self.numbers = self.value = (self.value or "0") + "."
elif button_id == "ac":
self.value = ""
self.left = self.right = Decimal(0)
self.operator = "plus"
self.numbers = "0"
elif button_id == "c":
self.value = ""
self.numbers = "0"
elif button_id in ("plus", "minus", "divide", "multiply"):
self.right = Decimal(self.value or "0")
do_math()
self.operator = button_id
elif button_id == "equals":
if self.value:
self.right = Decimal(self.value)
do_math()
if __name__ == "__main__":
CalculatorApp().run()
+74
View File
@@ -0,0 +1,74 @@
import io
from pathlib import Path
from textual import on
from textual.app import App, ComposeResult
from textual.events import DeliveryComplete
from textual.widgets import Button, Input, Label
class ScreenshotApp(App[None]):
def compose(self) -> ComposeResult:
yield Button("screenshot: no filename or mime", id="button-1")
yield Button("screenshot: screenshot.svg / open in browser", id="button-2")
yield Button("screenshot: screenshot.svg / download", id="button-3")
yield Button(
"screenshot: screenshot.svg / open in browser / plaintext mime",
id="button-4",
)
yield Label("Deliver custom file:")
yield Input(id="custom-path-input", placeholder="Path to file...")
@on(Button.Pressed, selector="#button-1")
def on_button_pressed(self) -> None:
screenshot_string = self.export_screenshot()
string_io = io.StringIO(screenshot_string)
self.deliver_text(string_io)
@on(Button.Pressed, selector="#button-2")
def on_button_pressed_2(self) -> None:
screenshot_string = self.export_screenshot()
string_io = io.StringIO(screenshot_string)
self.deliver_text(
string_io, save_filename="screenshot.svg", open_method="browser"
)
@on(Button.Pressed, selector="#button-3")
def on_button_pressed_3(self) -> None:
screenshot_string = self.export_screenshot()
string_io = io.StringIO(screenshot_string)
self.deliver_text(
string_io, save_filename="screenshot.svg", open_method="download"
)
@on(Button.Pressed, selector="#button-4")
def on_button_pressed_4(self) -> None:
screenshot_string = self.export_screenshot()
string_io = io.StringIO(screenshot_string)
self.deliver_text(
string_io,
save_filename="screenshot.svg",
open_method="browser",
mime_type="text/plain",
)
@on(DeliveryComplete)
def on_delivery_complete(self, event: DeliveryComplete) -> None:
self.notify(title="Download complete", message=event.key)
@on(Input.Submitted)
def on_input_submitted(self, event: Input.Submitted) -> None:
path = Path(event.value)
if path.exists():
self.deliver_binary(path)
else:
self.notify(
title="Invalid path",
message="The path does not exist.",
severity="error",
)
app = ScreenshotApp()
if __name__ == "__main__":
app.run()
+13
View File
@@ -0,0 +1,13 @@
import os
from textual.app import App, ComposeResult
from textual.widgets import Pretty
class TerminalEnv(App):
def compose(self) -> ComposeResult:
yield Pretty(dict(os.environ))
if __name__ == "__main__":
TerminalEnv().run()
+47
View File
@@ -0,0 +1,47 @@
[account]
[app.Calculator]
path = "./"
command = "python calculator.py"
[app.Easing]
slug = "easing"
path = "./"
command = "textual easing"
[app.Keys]
slug = "keys"
path = "./"
command = "textual keys"
[app.Borders]
slug = "borders"
path = "./"
command = "textual borders"
[app.Demo]
name = "Demo"
slug = "demo"
path = "./"
command = "python -m textual"
[terminal.Terminal]
name = "Terminal"
path = "./"
terminal = true
[app.OpenLink]
name = "Open Link"
slug = "open-link"
path = "./"
command = "python open_link.py"
[app.Download]
name = "Download"
slug = "download"
path = "./"
command = "python download.py"
+24
View File
@@ -0,0 +1,24 @@
from textual import on
from textual.app import App, ComposeResult
from textual.widgets import Button
class OpenLink(App[None]):
"""Demonstrates opening a URL in the same tab or a new tab."""
def compose(self) -> ComposeResult:
yield Button("Visit the Textual docs", id="open-link-same-tab")
yield Button("Visit the Textual docs in a new tab", id="open-link-new-tab")
@on(Button.Pressed)
def open_link(self, event: Button.Pressed) -> None:
"""Open the URL in the same tab or a new tab depending on which button was pressed."""
self.open_url(
"https://textual.textualize.io",
new_tab=event.button.id == "open-link-new-tab",
)
app = OpenLink()
if __name__ == "__main__":
app.run()
Generated
+1168
View File
File diff suppressed because it is too large Load Diff
+119
View File
@@ -0,0 +1,119 @@
[tool.poetry]
name = "textual-webterm"
version = "0.1.9"
description = "Serve terminal sessions over the web"
authors = ["Will McGugan <will@textualize.io>"]
license = "MIT"
readme = "README.md"
packages = [{include = "textual_webterm", from = "src"}]
[tool.poetry.dependencies]
python = "^3.9"
textual-serve = "^1.1.0"
aiohttp = "^3.13.0"
uvloop = { version = "^0.22.0", markers = "sys_platform != 'win32'" }
click = "^8.1.7"
pydantic = "^2.7.0"
xdg = "^6.0.0"
msgpack = "^1.1.0"
importlib-metadata = ">=6.0.0"
httpx = ">=0.27.0"
tomli = { version = "^2.0.1", python = "<3.11" }
pyyaml = "^6.0.0"
[tool.poetry.group.dev.dependencies]
pytest = "^8.0.0"
pytest-asyncio = "^0.24.0"
pytest-cov = "^6.0.0"
pytest-timeout = "^2.3.0"
ruff = "^0.9.0"
[build-system]
requires = ["poetry-core"]
build-backend = "poetry.core.masonry.api"
[tool.poetry.scripts]
textual-webterm = "textual_webterm.cli:app"
[tool.ruff]
line-length = 100
target-version = "py39"
src = ["src"]
[tool.ruff.lint]
select = [
"E", # pycodestyle errors
"W", # pycodestyle warnings
"F", # Pyflakes
"I", # isort
"B", # flake8-bugbear
"C4", # flake8-comprehensions
"UP", # pyupgrade
"ARG", # flake8-unused-arguments
"SIM", # flake8-simplify
"TC", # flake8-type-checking
"PTH", # flake8-use-pathlib
"ERA", # eradicate (commented out code)
"PL", # Pylint
"RUF", # Ruff-specific rules
]
ignore = [
"E501", # line too long (handled by formatter)
"PLR0912", # too many branches
"PLR0913", # too many arguments
"PLR0915", # too many statements
"PLR2004", # magic value comparison
"ARG001", # unused function argument
"ARG002", # unused method argument
"PLC0415", # import not at top level (needed for optional deps)
]
[tool.ruff.lint.isort]
known-first-party = ["textual_webterm"]
[tool.ruff.format]
quote-style = "double"
indent-style = "space"
skip-magic-trailing-comma = false
line-ending = "auto"
[tool.pytest.ini_options]
testpaths = ["tests"]
asyncio_mode = "auto"
asyncio_default_fixture_loop_scope = "function"
timeout = 30
addopts = [
"-v",
"--tb=short",
"--strict-markers",
]
markers = [
"slow: marks tests as slow (deselect with '-m \"not slow\"')",
"integration: marks tests as integration tests",
]
[tool.coverage.run]
source = ["src/textual_webterm"]
branch = true
omit = [
"*/tests/*",
"*/__pycache__/*",
# Integration-heavy modules that require running servers/processes
"*/local_server.py",
"*/app_session.py",
"*/terminal_session.py",
"*/exit_poller.py",
"*/poller.py",
]
[tool.coverage.report]
exclude_lines = [
"pragma: no cover",
"def __repr__",
"raise NotImplementedError",
"if TYPE_CHECKING:",
"if __name__ == .__main__.:",
"assert ",
]
# Unit test coverage target - integration tests would add ~20% more
fail_under = 80
View File
+89
View File
@@ -0,0 +1,89 @@
from __future__ import annotations
import threading
from typing import Generic, TypeVar
Key = TypeVar("Key")
Value = TypeVar("Value")
class TwoWayDict(Generic[Key, Value]):
"""
A two-way mapping offering O(1) access in both directions.
Wraps two dictionaries and uses them to provide efficient access to
both values (given keys) and keys (given values).
"""
def __init__(self, initial: dict[Key, Value] | None = None) -> None:
initial_data = {} if initial is None else initial
self._forward: dict[Key, Value] = initial_data
self._reverse: dict[Value, Key] = {value: key for key, value in initial_data.items()}
self._lock = threading.RLock()
def __setitem__(self, key: Key, value: Value) -> None:
with self._lock:
# If reassigning the same key, remove old reverse mapping first
old_value = self._forward.get(key)
if old_value is not None and old_value != value:
del self._reverse[old_value]
# Enforce 1:1 mapping: value must not already map to a different key
existing_key = self._reverse.get(value)
if existing_key is not None and existing_key != key:
raise ValueError(f"Value {value!r} already mapped to key {existing_key!r}")
self._forward[key] = value
self._reverse[value] = key
def __delitem__(self, key: Key) -> None:
with self._lock:
value = self._forward[key]
self._forward.__delitem__(key)
self._reverse.__delitem__(value)
def __iter__(self):
with self._lock:
return iter(dict(self._forward))
def get(self, key: Key) -> Value | None:
"""Given a key, efficiently lookup and return the associated value.
Args:
key: The key
Returns:
The value
"""
with self._lock:
return self._forward.get(key)
def get_key(self, value: Value) -> Key | None:
"""Given a value, efficiently lookup and return the associated key.
Args:
value: The value
Returns:
The key
"""
with self._lock:
return self._reverse.get(value)
def contains_value(self, value: Value) -> bool:
"""Check if `value` is a value within this TwoWayDict.
Args:
value: The value to check.
Returns:
True if the value is within the values of this dict.
"""
with self._lock:
return value in self._reverse
def __len__(self):
with self._lock:
return len(self._forward)
def __contains__(self, item: Key) -> bool:
with self._lock:
return item in self._forward
+330
View File
@@ -0,0 +1,330 @@
from __future__ import annotations
import asyncio
import io
import json
import logging
import os
from asyncio import IncompleteReadError, StreamReader, StreamWriter
from datetime import timedelta
from enum import Enum, auto
from time import monotonic
from typing import TYPE_CHECKING
import rich.repr
from importlib_metadata import version
from . import constants
from .session import Session, SessionConnector
if TYPE_CHECKING:
from asyncio.subprocess import Process
from pathlib import Path
from .types import Meta, SessionID
log = logging.getLogger("textual-web")
# Maximum payload size to prevent memory exhaustion (16MB)
MAX_PAYLOAD_SIZE = 16 * 1024 * 1024
class ProcessState(Enum):
"""The state of a process."""
PENDING = auto()
RUNNING = auto()
CLOSING = auto()
CLOSED = auto()
def __repr__(self) -> str:
return self.name
@rich.repr.auto(angular=True)
class AppSession(Session):
"""Runs a single app process."""
def __init__(
self,
working_directory: Path,
command: str,
session_id: SessionID,
devtools: bool = False,
) -> None:
self.working_directory = working_directory
self.command = command
self.session_id = session_id
self.devtools = devtools
self.start_time: float | None = None
self.end_time: float | None = None
self._process: Process | None = None
self._task: asyncio.Task | None = None
super().__init__()
self._state = ProcessState.PENDING
@property
def process(self) -> Process:
"""The asyncio (sub)process"""
assert self._process is not None
return self._process
@property
def stdin(self) -> StreamWriter:
"""The processes stdin."""
assert self._process is not None
assert self._process.stdin is not None
return self._process.stdin
@property
def stdout(self) -> StreamReader:
"""The process' stdout."""
assert self._process is not None
assert self._process.stdout is not None
return self._process.stdout
@property
def stderr(self) -> StreamReader:
"""The process' stderr."""
assert self._process is not None
assert self._process.stderr is not None
return self._process.stderr
@property
def task(self) -> asyncio.Task:
"""Session task."""
assert self._task is not None
return self._task
@property
def state(self) -> ProcessState:
"""Current running state."""
return self._state
@state.setter
def state(self, state: ProcessState) -> None:
self._state = state
run_time = self.run_time
log.debug(
"%r state=%r run_time=%s",
self,
self.state,
"0" if run_time is None else timedelta(seconds=int(run_time)),
)
@property
def run_time(self) -> float | None:
"""Time process was running, or `None` if it hasn't started."""
if self.end_time is not None:
assert self.start_time is not None
return self.end_time - self.start_time
elif self.start_time is not None:
return monotonic() - self.start_time
else:
return None
def is_running(self) -> bool:
"""Check if the app session is still running."""
return self._state == ProcessState.RUNNING
def __rich_repr__(self) -> rich.repr.Result:
yield self.command
yield "id", self.session_id
if self._process is not None:
yield "returncode", self._process.returncode, None
async def open(self, width: int = 80, height: int = 24) -> None:
"""Open the process."""
environment = dict(os.environ.copy())
environment["TEXTUAL_DRIVER"] = "textual.drivers.web_driver:WebDriver"
environment["TEXTUAL_FPS"] = "60"
environment["TEXTUAL_COLOR_SYSTEM"] = "truecolor"
environment["TERM_PROGRAM"] = "textual-web"
environment["TERM_PROGRAM_VERSION"] = version("textual-web")
environment["COLUMNS"] = str(width)
environment["ROWS"] = str(height)
if self.devtools:
environment["TEXTUAL"] = "debug,devtools"
environment["TEXTUAL_LOG"] = "textual.log"
# Use cwd parameter instead of os.chdir() for thread safety
self._process = await asyncio.create_subprocess_shell(
self.command,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
env=environment,
cwd=str(self.working_directory),
)
await self.set_terminal_size(width, height)
log.debug("opened %r; %r", self.command, self._process)
self.start_time = monotonic()
async def start(self, connector: SessionConnector) -> asyncio.Task:
"""Start a task to run the process."""
if self._task is not None:
raise RuntimeError("AppSession.start() called while already running")
self._connector = connector
self._task = asyncio.create_task(self.run())
return self._task
async def close(self) -> None:
"""Close the process."""
self.state = ProcessState.CLOSING
await self.send_meta({"type": "quit"})
async def wait(self) -> None:
"""Wait for the process to finish (call close first)."""
if self._task:
await self._task
self._task = None
async def set_terminal_size(self, width: int, height: int) -> None:
"""Set the terminal size for the process.
Args:
width: Width in cells.
height: Height in cells.
"""
await self.send_meta(
{
"type": "resize",
"width": width,
"height": height,
}
)
async def run(self) -> None:
"""This loop reads stdout from the process and relays it through the websocket."""
self.state = ProcessState.RUNNING
META = b"M"
DATA = b"D"
BINARY_ENCODED = b"P"
stderr_data = io.BytesIO()
async def read_stderr() -> None:
"""Task to read stderr."""
try:
while True:
data = await self.stderr.read(1024 * 4)
if not data:
break
stderr_data.write(data)
except asyncio.CancelledError:
pass
stderr_task = asyncio.create_task(read_stderr())
readexactly = self.stdout.readexactly
from_bytes = int.from_bytes
on_data = self._connector.on_data
on_meta = self._connector.on_meta
on_binary_encoded_message = self._connector.on_binary_encoded_message
try:
ready = False
for _ in range(10):
line = await self.stdout.readline()
if not line:
break
if line == b"__GANGLION__\n":
ready = True
break
if ready:
while True:
type_bytes = await readexactly(1)
size_bytes = await readexactly(4)
size = from_bytes(size_bytes, "big")
if size > MAX_PAYLOAD_SIZE:
log.error("Payload size %d exceeds limit %d", size, MAX_PAYLOAD_SIZE)
break
payload = await readexactly(size)
if type_bytes == DATA:
await on_data(payload)
elif type_bytes == META:
meta_data = json.loads(payload)
meta_type = meta_data.get("type")
if meta_type in {"exit", "blur", "focus"}:
await self.send_meta({"type": meta_type})
else:
await on_meta(meta_data)
elif type_bytes == BINARY_ENCODED:
await on_binary_encoded_message(payload)
except IncompleteReadError:
# Incomplete read means that the stream was closed
pass
except asyncio.CancelledError:
pass
finally:
stderr_task.cancel()
await stderr_task
self.end_time = monotonic()
self.state = ProcessState.CLOSED
stderr_message = stderr_data.getvalue().decode("utf-8", errors="replace")
if (
self._process is not None
and self._process.returncode != 0
and constants.DEBUG
and stderr_message
):
log.warning(stderr_message)
await self._connector.on_close()
@classmethod
def encode_packet(cls, packet_type: bytes, payload: bytes) -> bytes:
"""Encode a packet.
Args:
packet_type: The packet type (b"D" for data or b"M" for meta)
payload: The payload.
Returns:
Data as bytes.
"""
return b"%s%s%s" % (packet_type, len(payload).to_bytes(4, "big"), payload)
async def send_bytes(self, data: bytes) -> bool:
"""Send bytes to process.
Args:
data: Data to send.
Returns:
True if the data was sent, otherwise False.
"""
if self._process is None or self._process.stdin is None:
return False
stdin = self._process.stdin
try:
stdin.write(self.encode_packet(b"D", data))
await stdin.drain()
except (RuntimeError, ConnectionResetError, BrokenPipeError):
return False
return True
async def send_meta(self, data: Meta) -> bool:
"""Send meta information to process.
Args:
data: Meta dict to send.
Returns:
True if the data was sent, otherwise False.
"""
if self._process is None or self._process.stdin is None:
return False
stdin = self._process.stdin
data_bytes = json.dumps(data).encode("utf-8")
try:
stdin.write(self.encode_packet(b"M", data_bytes))
await stdin.drain()
except (RuntimeError, ConnectionResetError, BrokenPipeError):
return False
return True
+229
View File
@@ -0,0 +1,229 @@
from __future__ import annotations
import asyncio
import importlib
import importlib.util
import logging
import os
import sys
from pathlib import Path
import click
from importlib_metadata import version
from rich.logging import RichHandler
from . import constants
from .local_server import LocalServer
FORMAT = "%(message)s"
logging.basicConfig(
level="DEBUG" if constants.DEBUG else "INFO",
format=FORMAT,
datefmt="[%X]",
handlers=[RichHandler(show_path=False)],
)
log = logging.getLogger("textual-webterm")
def _is_file_path(path: str) -> bool:
"""Check if path looks like a file path (vs module path)."""
return path.endswith(".py") or "/" in path or "\\" in path
def parse_app_path(app_path: str) -> tuple[str, str]:
"""Parse an app path like 'module.path:ClassName' or 'path/to/file.py:ClassName'.
Returns:
Tuple of (module_or_file, class_name)
"""
if ":" not in app_path:
raise click.BadParameter(
f"Invalid app path '{app_path}'. Expected format: 'module.path:ClassName' or 'path/to/file.py:ClassName'"
)
module_part, class_name = app_path.rsplit(":", 1)
return module_part, class_name
def load_app_class(app_path: str):
"""Load a Textual App class from a module path.
Args:
app_path: Path like 'module.path:ClassName' or 'path/to/file.py:ClassName'
Returns:
The App class
"""
module_part, class_name = parse_app_path(app_path)
# Check if it's a file path or module path
if _is_file_path(module_part):
# File path - load from file
file_path = Path(module_part).resolve()
if not file_path.exists():
raise click.BadParameter(f"File not found: {file_path}")
# Add parent directory to sys.path for imports
parent_dir = str(file_path.parent)
if parent_dir not in sys.path:
sys.path.insert(0, parent_dir)
# Import the module
module_name = file_path.stem
spec = importlib.util.spec_from_file_location(module_name, file_path)
if spec is None or spec.loader is None:
raise click.BadParameter(f"Could not load module from {file_path}")
module = importlib.util.module_from_spec(spec)
sys.modules[module_name] = module
spec.loader.exec_module(module)
else:
# Module path - import normally
try:
module = importlib.import_module(module_part)
except ImportError as e:
raise click.BadParameter(f"Could not import module '{module_part}': {e}") from e
# Get the class
if not hasattr(module, class_name):
raise click.BadParameter(f"Module '{module_part}' has no attribute '{class_name}'")
app_class = getattr(module, class_name)
return app_class
@click.command()
@click.version_option(version("textual-webterm"))
@click.argument("command", required=False)
@click.option("--port", "-p", type=int, help="Port for server.", default=8080)
@click.option("--host", "-H", help="Host for server.", default="0.0.0.0")
@click.option(
"--app",
"-a",
"app_path",
help="Load a Textual app from module:ClassName (e.g., 'myapp:MyApp' or 'path/to/app.py:MyApp')",
)
@click.option(
"--landing-manifest",
"-L",
"landing_manifest",
type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path),
help="YAML manifest describing landing page tiles (slug/name/command).",
)
@click.option(
"--compose-manifest",
"-C",
"compose_manifest",
type=click.Path(exists=True, dir_okay=False, readable=True, path_type=Path),
help='Docker compose YAML; services with label "webterm-command" become landing tiles.',
)
def app(
command: str | None,
port: int,
host: str,
app_path: str | None,
landing_manifest: Path | None,
compose_manifest: Path | None,
) -> None:
"""Serve a terminal or Textual app over HTTP/WebSocket.
COMMAND: Shell command to run in terminal (default: $SHELL)
Examples:
\b
textual-webterm # Serve default shell
textual-webterm htop # Serve htop in terminal
textual-webterm --app mymodule:MyApp # Serve a Textual app from module
textual-webterm -a ./calculator.py:CalculatorApp # Serve from file
"""
VERSION = version("textual-webterm")
log.info(f"textual-webterm v{VERSION}")
if constants.DEBUG:
log.warning("DEBUG env var is set; logs may be verbose!")
from .config import default_config, load_compose_manifest, load_landing_yaml
_config = default_config()
landing_apps: list = []
if landing_manifest:
landing_apps = load_landing_yaml(landing_manifest)
elif compose_manifest:
landing_apps = load_compose_manifest(compose_manifest)
server = LocalServer(
"./",
_config,
host=host,
port=port,
landing_apps=landing_apps,
)
for app_entry in landing_apps:
server.add_terminal(app_entry.name, app_entry.command, slug=app_entry.slug)
if app_path:
# Load and run as Textual app from module:class
try:
app_class = load_app_class(app_path)
except click.BadParameter as e:
log.error(str(e))
sys.exit(1)
# Create a command that runs the app using python -m runpy for safety
module_part, class_name = parse_app_path(app_path)
if _is_file_path(module_part):
# File path - use absolute path and proper escaping
file_path = Path(module_part).resolve()
# Use runpy to safely run the file
escaped_path = str(file_path).replace("'", "'\"'\"'")
escaped_class = class_name.replace("'", "'\"'\"'")
run_command = f'python3 -c \'import sys; sys.path.insert(0, "{file_path.parent}"); exec(open("{escaped_path}").read()); {escaped_class}().run()\''
else:
# Module path - validate module and class names
if not module_part.replace(".", "").replace("_", "").isalnum():
log.error(f"Invalid module path: {module_part}")
sys.exit(1)
if not class_name.isidentifier():
log.error(f"Invalid class name: {class_name}")
sys.exit(1)
run_command = (
f'python3 -c "from {module_part} import {class_name}; {class_name}().run()"'
)
app_name = getattr(app_class, "TITLE", None) or class_name
server.add_app(app_name, run_command, "")
log.info(f"Serving Textual app: {app_path}")
elif command:
# Run command as terminal
server.add_terminal("Terminal", command, "")
log.info(f"Serving terminal: {command}")
elif not landing_apps:
# Run default shell
terminal_command = os.environ.get("SHELL", "/bin/sh")
server.add_terminal("Terminal", terminal_command, "")
log.info(f"Serving terminal: {terminal_command}")
def _run_async():
if constants.WINDOWS:
asyncio.run(server.run())
else:
try:
import uvloop
except ImportError:
asyncio.run(server.run())
else:
if sys.version_info >= (3, 11):
with asyncio.Runner(loop_factory=uvloop.new_event_loop) as runner:
runner.run(server.run())
else:
uvloop.install()
asyncio.run(server.run())
_run_async()
if __name__ == "__main__":
app()
+150
View File
@@ -0,0 +1,150 @@
from os.path import expandvars
from pathlib import Path
from typing import Annotated
try:
import tomllib as tomli # py311+
except ImportError: # pragma: no cover
import tomli
import yaml
from pydantic import BaseModel, Field
from pydantic.functional_validators import AfterValidator
from .identity import generate
from .slugify import slugify
ExpandVarsStr = Annotated[str, AfterValidator(expandvars)]
class App(BaseModel):
"""Defines an application."""
name: str
slug: str = ""
path: ExpandVarsStr = "./"
color: str = ""
command: ExpandVarsStr = ""
terminal: bool = False
class Config(BaseModel):
"""Root configuration model."""
apps: list[App] = Field(default_factory=list)
landing: list[App] = Field(default_factory=list)
def default_config() -> Config:
"""Get a default empty configuration.
Returns:
Configuration object.
"""
return Config()
def load_config(config_path: Path) -> Config:
"""Load config from a path.
Args:
config_path: Path to TOML configuration.
Returns:
Config object.
"""
with Path(config_path).open("rb") as config_file:
config_data = tomli.load(config_file)
def make_app(name, data: dict[str, object], terminal: bool = False) -> App:
data["name"] = name
data["terminal"] = terminal
if terminal:
data["slug"] = generate().lower()
elif not data.get("slug", ""):
data["slug"] = slugify(name)
return App(**data)
apps = [make_app(name, app) for name, app in config_data.get("app", {}).items()]
apps += [
make_app(name, app, terminal=True) for name, app in config_data.get("terminal", {}).items()
]
config = Config(apps=apps)
return config
def load_landing_yaml(manifest_path: Path) -> list[App]:
"""Load landing apps from YAML manifest.
Expected schema: list of {name, slug, command, color?, path?, terminal?}
"""
with manifest_path.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f) or []
apps: list[App] = []
for entry in data:
if not isinstance(entry, dict):
continue
name = entry.get("name")
command = entry.get("command")
if not name or not command:
continue
slug = entry.get("slug") or slugify(name)
apps.append(
App(
name=name,
slug=slug,
command=command,
path=entry.get("path", "./"),
color=entry.get("color", ""),
terminal=bool(entry.get("terminal", True)),
)
)
return apps
def _extract_label(labels: object, key: str) -> str | None:
"""Extract a label value from either dict or list[str] forms."""
if isinstance(labels, dict):
value = labels.get(key)
if isinstance(value, str):
return value
return None
if isinstance(labels, list):
for item in labels:
if not isinstance(item, str):
continue
if "=" in item:
k, v = item.split("=", 1)
if k == key:
return v
return None
def load_compose_manifest(manifest_path: Path) -> list[App]:
"""Load landing apps from a docker-compose YAML file using label `webterm-command`."""
with manifest_path.open("r", encoding="utf-8") as f:
data = yaml.safe_load(f) or {}
services = data.get("services", {}) if isinstance(data, dict) else {}
apps: list[App] = []
for name, service in services.items():
if not isinstance(service, dict):
continue
labels = service.get("labels", {})
command = _extract_label(labels, "webterm-command")
if not command:
continue
slug = slugify(name)
apps.append(
App(
name=name,
slug=slug,
command=command,
path=service.get("working_dir", "./"),
color="",
terminal=True,
)
)
return apps
+51
View File
@@ -0,0 +1,51 @@
"""
Constants that we might want to expose via the public API.
"""
from __future__ import annotations
import os
import platform
from typing import Final
get_environ = os.environ.get
WINDOWS: Final = platform.system() == "Windows"
"""True if running on Windows."""
def get_environ_bool(name: str) -> bool:
"""Check an environment variable switch.
Args:
name: Name of environment variable.
Returns:
`True` if the env var is "1", otherwise `False`.
"""
has_environ = get_environ(name) == "1"
return has_environ
def get_environ_int(name: str, default: int) -> int:
"""Retrieves an integer environment variable.
Args:
name: Name of environment variable.
default: The value to use if the value is not set, or set to something other
than a valid integer.
Returns:
The integer associated with the environment variable if it's set to a valid int
or the default value otherwise.
"""
try:
return int(os.environ[name])
except KeyError:
return default
except ValueError:
return default
DEBUG: Final = get_environ_bool("DEBUG")
"""Enable debug mode."""
+53
View File
@@ -0,0 +1,53 @@
from __future__ import annotations
import asyncio
import logging
from time import monotonic
from typing import TYPE_CHECKING
EXIT_POLL_RATE = 5
log = logging.getLogger("textual-web")
if TYPE_CHECKING:
from .local_server import LocalServer
class ExitPoller:
"""Monitors the client for an idle state, and exits."""
def __init__(self, client: LocalServer, idle_wait: int) -> None:
self.client = client
self.idle_wait = idle_wait
self._task: asyncio.Task | None = None
self._idle_start_time: float | None = None
def start(self) -> None:
"""Start polling."""
self._task = asyncio.create_task(self.run())
def stop(self) -> None:
"""Stop polling"""
if self._task is not None:
self._task.cancel()
async def run(self) -> None:
"""Run the poller."""
if not self.idle_wait:
return
try:
while True:
await asyncio.sleep(EXIT_POLL_RATE)
is_idle = not self.client.session_manager.sessions
if is_idle:
if self._idle_start_time is not None:
if monotonic() - self._idle_start_time > self.idle_wait:
log.info("Exiting due to --exit-on-idle")
self.client.force_exit()
else:
self._idle_start_time = monotonic()
else:
self._idle_start_time = None
except asyncio.CancelledError:
pass
+11
View File
@@ -0,0 +1,11 @@
import os
SEPARATOR = "-"
IDENTITY_ALPHABET = "0123456789ABCDEFGHJKMNPQRSTUVWYZ"
IDENTITY_SIZE = 12
def generate(size: int = IDENTITY_SIZE) -> str:
"""Generate a random identifier."""
alphabet = IDENTITY_ALPHABET
return "".join(alphabet[byte % 31] for byte in os.urandom(size))
+621
View File
@@ -0,0 +1,621 @@
"""Local server implementation for serving terminals over HTTP/WebSocket."""
from __future__ import annotations
import asyncio
import contextlib
import io
import json
import logging
import signal
from pathlib import Path
from typing import TYPE_CHECKING
import aiohttp
from aiohttp import WSMsgType, web
from rich.ansi import AnsiDecoder
from rich.console import Console
from . import constants
from .exit_poller import ExitPoller
from .identity import generate
from .poller import Poller
from .session import SessionConnector
from .session_manager import SessionManager
from .types import Meta, RouteKey, SessionID
if TYPE_CHECKING:
from .config import Config
log = logging.getLogger("textual-web")
DISCONNECT_RESIZE = (132, 45)
WEBTERM_STATIC_PATH = Path(__file__).parent / "static"
def _get_static_path() -> Path | None:
"""Get the path to static assets from textual-serve."""
try:
import textual_serve
static_path = Path(textual_serve.__file__).parent / "static"
if static_path.exists():
return static_path
except ImportError:
log.warning("textual-serve not installed - static assets unavailable")
return None
STATIC_PATH = _get_static_path()
class LocalClientConnector(SessionConnector):
"""Local connector that handles communication between sessions and local server."""
def __init__(self, server: LocalServer, session_id: SessionID, route_key: RouteKey) -> None:
self.server = server
self.session_id = session_id
self.route_key = route_key
async def on_data(self, data: bytes) -> None:
await self.server.handle_session_data(self.route_key, data)
async def on_meta(self, meta: Meta) -> None:
meta_type = meta.get("type")
if meta_type == "open_url":
log.info("App requested to open URL: %s", meta.get("url"))
elif meta_type == "deliver_file_start":
log.info("App requested file delivery: %s", meta.get("path"))
else:
log.debug("Unknown meta type: %r. Full meta: %r", meta_type, meta)
async def on_binary_encoded_message(self, payload: bytes) -> None:
await self.server.handle_binary_message(self.route_key, payload)
async def on_close(self) -> None:
await self.server.handle_session_close(self.session_id, self.route_key)
class LocalServer:
"""Manages local Textual apps and terminals without Ganglion server."""
def __init__(
self,
config_path: str,
config: Config,
host: str = "0.0.0.0",
port: int = 8080,
exit_on_idle: int = 0,
landing_apps: list | None = None,
) -> None:
self.host = host
self.port = port
abs_path = Path(config_path).absolute()
path = abs_path if abs_path.is_dir() else abs_path.parent
self.config = config
self._websocket_server: aiohttp.web.WebSocketResponse | None = None
self._poller = Poller()
self.session_manager = SessionManager(self._poller, path, config.apps)
self.exit_event = asyncio.Event()
self._task: asyncio.Task | None = None
self._shutdown_task: asyncio.Task | None = None
self._shutdown_started = False
self._loop: asyncio.AbstractEventLoop | None = None
self._exit_poller = ExitPoller(self, idle_wait=exit_on_idle)
self._websocket_connections: dict[RouteKey, web.WebSocketResponse] = {}
self._landing_apps = landing_apps or []
@property
def app_count(self) -> int:
return len(self.session_manager.apps)
def add_app(self, name: str, command: str, slug: str = "") -> None:
slug = slug or generate().lower()
self.session_manager.add_app(name, command, slug=slug)
def add_terminal(self, name: str, command: str, slug: str = "") -> None:
if constants.WINDOWS:
log.warning("Sorry, textual-web does not currently support terminals on Windows")
return
slug = slug or generate().lower()
self.session_manager.add_app(name, command, slug=slug, terminal=True)
async def run(self) -> None:
try:
await self._run()
finally:
self._exit_poller.stop()
if not constants.WINDOWS:
with contextlib.suppress(Exception):
self._poller.exit()
def on_keyboard_interrupt(self) -> None:
print("\r\033[F")
log.info("Exit requested")
if self._shutdown_started:
self.exit_event.set()
return
self._shutdown_started = True
# Ensure we shut down sessions and websockets before stopping the server.
try:
loop = asyncio.get_running_loop()
except RuntimeError:
loop = None
if loop is not None:
if self._shutdown_task is None or self._shutdown_task.done():
self._shutdown_task = asyncio.create_task(self._shutdown())
return
if self._loop is not None and self._loop.is_running():
if self._shutdown_task is None or self._shutdown_task.done():
def _schedule() -> None:
self._shutdown_task = asyncio.create_task(self._shutdown())
self._loop.call_soon_threadsafe(_schedule)
return
self.exit_event.set()
async def _run(self) -> None:
loop = asyncio.get_event_loop()
self._loop = loop
if constants.WINDOWS:
def exit_handler(_sig, _frame) -> None:
self.on_keyboard_interrupt()
signal.signal(signal.SIGINT, exit_handler)
else:
loop.add_signal_handler(signal.SIGINT, self.on_keyboard_interrupt)
self._poller.set_loop(loop)
self._poller.start()
self._task = asyncio.create_task(self._run_local_server())
self._exit_poller.start()
with contextlib.suppress(asyncio.CancelledError):
await self._task
def _build_routes(self) -> list[web.AbstractRouteDef]:
routes: list[web.AbstractRouteDef] = [
web.get("/ws/{route_key}", self._handle_websocket),
web.get("/screenshot.svg", self._handle_screenshot),
web.get("/health", self._handle_health_check),
web.get("/", self._handle_root),
]
if STATIC_PATH is not None and STATIC_PATH.exists():
routes.append(web.static("/static", STATIC_PATH))
log.info("Static assets served from: %s", STATIC_PATH)
else:
log.error("Static assets not found at %s - terminal UI will not work", STATIC_PATH)
if WEBTERM_STATIC_PATH.exists():
routes.append(web.static("/static-webterm", WEBTERM_STATIC_PATH))
return routes
async def _shutdown(self) -> None:
try:
for ws in list(self._websocket_connections.values()):
with contextlib.suppress(Exception):
await ws.close()
await self.session_manager.close_all()
finally:
self.exit_event.set()
async def _run_local_server(self) -> None:
app = web.Application()
app.add_routes(self._build_routes())
runner = web.AppRunner(app)
try:
await runner.setup()
site = web.TCPSite(runner, self.host, self.port)
await site.start()
log.info("Local server started on %s:%s", self.host, self.port)
log.info("Available apps: %s", ", ".join(app.name for app in self.session_manager.apps))
await self.exit_event.wait()
finally:
await runner.cleanup()
async def _dispatch_ws_message(
self,
envelope: list,
route_key: str,
ws: web.WebSocketResponse,
session_created: bool,
) -> bool:
msg_type = envelope[0]
if msg_type == "stdin":
data = envelope[1] if len(envelope) > 1 else ""
session_process = self.session_manager.get_session_by_route_key(RouteKey(route_key))
if session_process:
await session_process.send_bytes(data.encode("utf-8"))
elif msg_type == "resize":
size_data = envelope[1] if len(envelope) > 1 else {}
width = max(1, min(500, int(size_data.get("width", 80))))
height = max(1, min(500, int(size_data.get("height", 24))))
if not session_created:
await self._create_terminal_session(route_key, width, height)
session_created = True
else:
session_process = self.session_manager.get_session_by_route_key(RouteKey(route_key))
if session_process:
await session_process.set_terminal_size(width, height)
elif msg_type == "ping":
data = envelope[1] if len(envelope) > 1 else ""
await ws.send_json(["pong", data])
return session_created
async def _resize_on_disconnect(self, route_key: str) -> None:
session_process = self.session_manager.get_session_by_route_key(RouteKey(route_key))
if session_process is None or not hasattr(session_process, "set_terminal_size"):
return
width, height = DISCONNECT_RESIZE
with contextlib.suppress(OSError):
await session_process.set_terminal_size(width, height)
async def _handle_websocket(self, request: web.Request) -> web.WebSocketResponse:
route_key = request.match_info["route_key"]
ws = web.WebSocketResponse(heartbeat=30.0, max_msg_size=64 * 1024)
await ws.prepare(request)
log.info("WebSocket connection established for route %s", route_key)
self._websocket_connections[route_key] = ws
session_id = self.session_manager.routes.get(RouteKey(route_key))
if session_id is not None:
session = self.session_manager.get_session(session_id)
if session is None or not session.is_running():
self.session_manager.on_session_end(session_id)
session_id = None
session_created = session_id is not None
try:
async for msg in ws:
if msg.type == WSMsgType.TEXT:
try:
envelope = json.loads(msg.data)
if not isinstance(envelope, list) or len(envelope) < 1:
continue
session_created = await self._dispatch_ws_message(
envelope, route_key, ws, session_created
)
except Exception as e:
log.error("Error processing WebSocket message: %s", e)
elif msg.type == WSMsgType.ERROR:
log.error("WebSocket connection error for route %s", route_key)
break
finally:
log.info("WebSocket connection closed for route %s", route_key)
self._websocket_connections.pop(route_key, None)
await self._resize_on_disconnect(route_key)
return ws
def _select_app_for_route(self, route_key: str):
"""Pick the app matching the route key, or fall back to default."""
app = self.session_manager.apps_by_slug.get(route_key)
return app or self.session_manager.get_default_app()
async def _create_terminal_session(self, route_key: str, width: int, height: int) -> None:
available_app = self._select_app_for_route(route_key)
if available_app is None:
log.error("No app available for route %s", route_key)
ws = self._websocket_connections.get(route_key)
if ws:
await ws.send_json(["error", "No app configured"])
return
session_id = SessionID(generate())
log.info(
"Creating %s session %s for route %s (%sx%s)",
"terminal" if available_app.terminal else "app",
session_id,
route_key,
width,
height,
)
session_process = await self.session_manager.new_session(
available_app.slug,
session_id,
RouteKey(route_key),
size=(width, height),
)
if session_process is None:
log.error("Failed to create session for route %s", route_key)
ws = self._websocket_connections.get(route_key)
if ws:
await ws.send_json(["error", "Failed to create session"])
return
connector = LocalClientConnector(self, session_id, RouteKey(route_key))
await session_process.start(connector)
async def _handle_screenshot(self, request: web.Request) -> web.Response:
route_key = request.query.get("route_key")
if route_key is None:
running = self.session_manager.get_first_running_session()
if running:
route_key = str(running[0])
if route_key is None:
raise web.HTTPNotFound(text="No running session")
session_process = self.session_manager.get_session_by_route_key(RouteKey(route_key))
if session_process is None and route_key in self.session_manager.apps_by_slug:
await self._create_terminal_session(
route_key,
width=DISCONNECT_RESIZE[0],
height=DISCONNECT_RESIZE[1],
)
session_process = self.session_manager.get_session_by_route_key(RouteKey(route_key))
if session_process is None or not hasattr(session_process, "get_replay_buffer"):
raise web.HTTPNotFound(text="Session not found")
replay_data = await session_process.get_replay_buffer() # type: ignore[func-returns-value]
ansi_text = replay_data.decode("utf-8", errors="replace")
try:
width = int(request.query.get("width", "120"))
except ValueError:
width = 120
width = max(10, min(400, width))
try:
height = int(request.query.get("height", str(DISCONNECT_RESIZE[1])))
except ValueError:
height = DISCONNECT_RESIZE[1]
height = max(5, min(200, height))
lines = ansi_text.splitlines()
if len(lines) > height:
ansi_text = "\n".join(lines[-height:]) + "\n"
console = Console(record=True, width=width, height=height, file=io.StringIO())
decoder = AnsiDecoder()
for renderable in decoder.decode(ansi_text):
console.print(renderable)
svg = console.export_svg(
title="textual-webterm",
code_format=(
'<svg class="rich-terminal" viewBox="0 0 {terminal_width} {terminal_height}" '
'xmlns="http://www.w3.org/2000/svg">'
'<style>{styles}</style>'
'<defs>'
'<clipPath id="{unique_id}-clip-terminal">'
'<rect x="0" y="0" width="{terminal_width}" height="{terminal_height}" />'
'</clipPath>'
'{lines}'
'</defs>'
'<g clip-path="url(#{unique_id}-clip-terminal)">'
'<rect x="0" y="0" width="{terminal_width}" height="{terminal_height}" fill="#000" />'
'{backgrounds}'
'<g class="{unique_id}-matrix">{matrix}</g>'
'</g>'
'</svg>'
),
)
return web.Response(text=svg, content_type="image/svg+xml")
async def _handle_health_check(self, _request: web.Request) -> web.Response:
return web.Response(text="Local server is running")
def _get_ws_url_from_request(self, request: web.Request, route_key: str) -> str:
"""Build WebSocket URL honoring reverse proxies and port mapping."""
forwarded_proto = request.headers.get("X-Forwarded-Proto", "").split(",")[0].strip().lower()
forwarded_host = request.headers.get("X-Forwarded-Host", "").split(",")[0].strip()
forwarded_port = request.headers.get("X-Forwarded-Port", "").split(",")[0].strip()
def _pick_proto() -> str:
if forwarded_proto in ("https", "wss"):
return "wss"
if forwarded_proto in ("http", "ws"):
return "ws"
return "wss" if request.secure else "ws"
def _split_host_port(host: str) -> tuple[str, str]:
if not host:
return "", ""
if ":" in host:
return host.rsplit(":", 1)
return host, ""
ws_proto = _pick_proto()
ws_host, ws_port = _split_host_port(forwarded_host)
if not ws_host:
host_header = request.headers.get("Host", "")
ws_host, ws_port = _split_host_port(host_header)
if not ws_host:
ws_host = "localhost" if self.host == "0.0.0.0" else self.host
ws_port = str(self.port)
if not ws_port and forwarded_port:
ws_port = forwarded_port
if ws_port and ws_port not in ("80", "443"):
return f"{ws_proto}://{ws_host}:{ws_port}/ws/{route_key}"
if not ws_port and self.port not in (80, 443):
return f"{ws_proto}://{ws_host}:{self.port}/ws/{route_key}"
return f"{ws_proto}://{ws_host}/ws/{route_key}"
async def _handle_root(self, request: web.Request) -> web.Response:
route_key_param = request.query.get("route_key")
if self._landing_apps and not route_key_param:
tiles = [
{"slug": app.slug, "name": app.name, "command": app.command}
for app in self._landing_apps
]
tiles_json = json.dumps(tiles)
html_content = f"""<!DOCTYPE html>
<html>
<head>
<title>Textual WebTerm Dashboard</title>
<style>
body {{ font-family: Arial, sans-serif; margin: 16px; background: #0f172a; color: #e2e8f0; }}
h1 {{ margin-bottom: 8px; }}
.grid {{ display: grid; grid-template-columns: repeat(auto-fill, minmax(280px, 1fr)); gap: 12px; }}
.tile {{ background: #1e293b; border: 1px solid #334155; border-radius: 8px; overflow: hidden; box-shadow: 0 2px 6px rgba(0,0,0,0.4); }}
.tile-header {{ padding: 10px 12px; font-weight: bold; border-bottom: 1px solid #334155; display: flex; align-items: center; gap: 8px; }}
.tile-body {{ padding: 0; }}
.thumb {{ width: 100%; height: 180px; object-fit: contain; background: #0b1220; display: block; }}
.meta {{ padding: 8px 12px; color: #94a3b8; font-size: 12px; }}
a {{ color: inherit; text-decoration: none; }}
</style>
</head>
<body>
<h1>Sessions</h1>
<div class=\"grid\" id=\"grid\"></div>
<script>
const tiles = {tiles_json};
function makeTile(tile) {{
const card = document.createElement('div');
card.className = 'tile';
const header = document.createElement('div');
header.className = 'tile-header';
header.innerHTML = `<span>${{tile.name}}</span>`;
const body = document.createElement('div');
body.className = 'tile-body';
const img = document.createElement('img');
img.className = 'thumb';
img.alt = tile.name;
const meta = document.createElement('div');
meta.className = 'meta';
meta.innerText = tile.command;
body.appendChild(img);
card.appendChild(header);
card.appendChild(body);
card.appendChild(meta);
card.onclick = () => {{
window.open(`/?route_key=${{encodeURIComponent(tile.slug)}}`, '_blank');
}};
card.img = img;
return card;
}}
const grid = document.getElementById('grid');
const cards = tiles.map(makeTile);
cards.forEach(c => grid.appendChild(c));
async function refresh() {{
for (const card of cards) {{
const tile = tiles[cards.indexOf(card)];
const url = `/screenshot.svg?route_key=${{encodeURIComponent(tile.slug)}}&t=${{Date.now()}}`;
card.img.src = url;
}}
}}
let refreshTimer = null;
function startRefresh() {{
if (refreshTimer !== null) return;
refresh();
refreshTimer = setInterval(refresh, 15000);
}}
function stopRefresh() {{
if (refreshTimer === null) return;
clearInterval(refreshTimer);
refreshTimer = null;
}}
document.addEventListener('visibilitychange', () => {{
if (document.hidden) stopRefresh();
else startRefresh();
}});
if (!document.hidden) startRefresh();
</script>
</body>
</html>"""
return web.Response(text=html_content, content_type="text/html")
available_app = None
if route_key_param:
available_app = self.session_manager.apps_by_slug.get(route_key_param)
if available_app is None:
available_app = self.session_manager.get_default_app()
if available_app is None:
html_content = """<!DOCTYPE html>
<html>
<head>
<title>Textual Web Terminal Server</title>
</head>
<body>
<h2>No Apps Available</h2>
<p>No terminal or Textual applications are configured.</p>
</body>
</html>"""
return web.Response(text=html_content, content_type="text/html")
route_key: RouteKey | None = None
if route_key_param:
route_key = RouteKey(route_key_param)
else:
running = self.session_manager.get_first_running_session()
if running:
route_key = running[0]
if route_key is None:
route_key = RouteKey(generate().lower())
ws_url = self._get_ws_url_from_request(request, route_key)
html_content = f"""<!DOCTYPE html>
<html>
<head>
<title>Textual Web Terminal</title>
<link rel=\"stylesheet\" href=\"/static/css/xterm.css\">
<link rel=\"stylesheet\" href=\"/static-webterm/monospace.css\">
<script src=\"/static/js/textual.js\"></script>
<style>
body {{ background: #000; margin: 0; padding: 0; }}
</style>
</head>
<body>
<div class=\"textual-terminal\" data-session-websocket-url={ws_url!r} data-font-size=\"16\"></div>
</body>
</html>"""
return web.Response(text=html_content, content_type="text/html")
async def handle_session_data(self, route_key: RouteKey, data: bytes) -> None:
ws = self._websocket_connections.get(route_key)
if ws is None:
return
await ws.send_bytes(data)
async def handle_binary_message(self, route_key: RouteKey, payload: bytes) -> None:
ws = self._websocket_connections.get(route_key)
if ws is None:
return
await ws.send_bytes(payload)
async def handle_session_close(self, session_id: SessionID, route_key: RouteKey) -> None:
self.session_manager.on_session_end(session_id)
ws = self._websocket_connections.get(route_key)
if ws is not None:
with contextlib.suppress(Exception):
await ws.close()
def force_exit(self) -> None:
self.exit_event.set()
+129
View File
@@ -0,0 +1,129 @@
from __future__ import annotations
import asyncio
import os
import selectors
from collections import deque
from dataclasses import dataclass, field
from threading import Event, Thread
@dataclass
class Write:
"""Data in a write queue."""
data: bytes
position: int = 0
done_event: asyncio.Event = field(default_factory=asyncio.Event)
class Poller(Thread):
"""A thread which reads from file descriptors and posts read data to a queue."""
def __init__(self) -> None:
super().__init__()
self._loop: asyncio.AbstractEventLoop | None = None
self._selector = selectors.DefaultSelector()
self._read_queues: dict[int, asyncio.Queue[bytes | None]] = {}
self._write_queues: dict[int, deque[Write]] = {}
self._exit_event = Event()
def add_file(self, file_descriptor: int) -> asyncio.Queue:
"""Add a file descriptor to the poller.
Args:
file_descriptor: File descriptor.
Returns:
Async queue.
"""
self._selector.register(file_descriptor, selectors.EVENT_READ | selectors.EVENT_WRITE)
queue = self._read_queues[file_descriptor] = asyncio.Queue()
return queue
def remove_file(self, file_descriptor: int) -> None:
"""Remove a file descriptor from the poller.
Args:
file_descriptor: File descriptor.
"""
self._selector.unregister(file_descriptor)
self._read_queues.pop(file_descriptor, None)
self._write_queues.pop(file_descriptor, None)
async def write(self, file_descriptor: int, data: bytes) -> None:
"""Write data to a file descriptor.
Args:
file_descriptor: File descriptor.
data: Data to write.
"""
if file_descriptor not in self._write_queues:
self._write_queues[file_descriptor] = deque()
new_write = Write(data)
self._write_queues[file_descriptor].append(new_write)
self._selector.modify(file_descriptor, selectors.EVENT_READ | selectors.EVENT_WRITE)
await new_write.done_event.wait()
def set_loop(self, loop: asyncio.AbstractEventLoop) -> None:
"""Set the asyncio loop.
Args:
loop: Async loop.
"""
self._loop = loop
def run(self) -> None:
"""Run the Poller thread."""
readable_events = selectors.EVENT_READ
writeable_events = selectors.EVENT_WRITE
loop = self._loop
selector = self._selector
assert loop is not None
while not self._exit_event.is_set():
events = selector.select(1)
for selector_key, event_mask in events:
file_descriptor = selector_key.fileobj
assert isinstance(file_descriptor, int)
queue = self._read_queues.get(file_descriptor, None)
if queue is not None:
if event_mask & readable_events:
try:
data = os.read(file_descriptor, 1024 * 32) or None
except OSError:
loop.call_soon_threadsafe(queue.put_nowait, None)
else:
loop.call_soon_threadsafe(queue.put_nowait, data)
if event_mask & writeable_events:
write_queue = self._write_queues.get(file_descriptor, None)
if write_queue:
write = write_queue[0]
remaining_data = write.data[write.position :]
try:
bytes_written = os.write(file_descriptor, remaining_data)
except OSError:
# Write failed; signal completion anyway to unblock waiters
write_queue.popleft()
loop.call_soon_threadsafe(write.done_event.set)
continue
write.position += bytes_written
# Check if all data has been written
if write.position >= len(write.data):
write_queue.popleft()
loop.call_soon_threadsafe(write.done_event.set)
else:
selector.modify(file_descriptor, readable_events)
def exit(self) -> None:
"""Exit and block until finished."""
for queue in self._read_queues.values():
queue.put_nowait(None)
self._exit_event.set()
self.join()
self._read_queues.clear()
self._write_queues.clear()
+109
View File
@@ -0,0 +1,109 @@
from __future__ import annotations
from abc import abstractmethod
from typing import TYPE_CHECKING
if TYPE_CHECKING:
import asyncio
from .types import Meta
class SessionConnector:
"""Connect a session with a client."""
async def on_data(self, data: bytes) -> None:
"""Handle data from session.
Args:
data: Bytes to handle.
"""
async def on_meta(self, meta: Meta) -> None:
"""Handle meta from session.
Args:
meta: Mapping of meta information.
"""
async def on_binary_encoded_message(self, payload: bytes) -> None:
"""Handle binary encoded data from the process.
Args:
payload: Binary encoded data to handle.
"""
async def on_close(self) -> None:
"""Handle session close."""
class Session:
"""Virtual base class for a session."""
def __init__(self) -> None:
self._connector = SessionConnector()
@abstractmethod
async def open(self, width: int = 80, height: int = 24) -> None:
"""Open the session."""
...
@abstractmethod
async def start(self, connector: SessionConnector) -> asyncio.Task:
"""Start the session.
Returns:
Running task.
"""
...
@abstractmethod
async def close(self) -> None:
"""Close the session."""
@abstractmethod
async def wait(self) -> None:
"""Wait for session to end."""
@abstractmethod
async def set_terminal_size(self, width: int, height: int) -> None:
"""Set the terminal size.
Args:
width: New width.
height: New height.
"""
...
@abstractmethod
async def send_bytes(self, data: bytes) -> bool:
"""Send bytes to the process.
Args:
data: Bytes to send.
Returns:
True on success, or False if the data was not sent.
"""
...
@abstractmethod
async def send_meta(self, data: Meta) -> bool:
"""Send meta to the process.
Args:
meta: Meta information.
Returns:
True on success, or False if the data was not sent.
"""
...
def is_running(self) -> bool:
"""Check if the session is still running.
Returns:
True if session is active, False otherwise.
"""
return False
+201
View File
@@ -0,0 +1,201 @@
from __future__ import annotations
import asyncio
import logging
import sys
from typing import TYPE_CHECKING
from . import config, constants
from ._two_way_dict import TwoWayDict
from .app_session import AppSession
from .identity import generate
if TYPE_CHECKING:
from pathlib import Path
from .poller import Poller
from .session import Session
from .types import RouteKey, SessionID
log = logging.getLogger("textual-web")
if not constants.WINDOWS:
from .terminal_session import TerminalSession
class SessionManager:
"""Manage sessions (Textual apps or terminals)."""
def __init__(self, poller: Poller, path: Path, apps: list[config.App]) -> None:
self.poller = poller
self.path = path
self.apps = apps
self.apps_by_slug = {app.slug: app for app in apps}
self.sessions: dict[SessionID, Session] = {}
self.routes: TwoWayDict[RouteKey, SessionID] = TwoWayDict()
def add_app(self, name: str, command: str, slug: str, terminal: bool = False) -> None:
"""Add a new app
Args:
name: Name of the app.
command: Command to run the app.
slug: Slug used in URL, or blank to auto-generate on server.
"""
slug = slug or generate().lower()
new_app = config.App(name=name, slug=slug, path="./", command=command, terminal=terminal)
self.apps.append(new_app)
self.apps_by_slug[slug] = new_app
def get_default_app(self) -> config.App | None:
"""Get the default app (first configured app), or ``None``."""
return self.apps[0] if self.apps else None
def on_session_end(self, session_id: SessionID) -> None:
"""Called when a session ends."""
self.sessions.pop(session_id, None)
route_key = self.routes.get_key(session_id)
if route_key is not None:
del self.routes[route_key]
log.debug(f"Session {session_id} ended")
async def close_all(self, timeout: float = 3.0) -> None:
"""Close app sessions.
Args:
timeout: Time (in seconds) to wait before giving up.
"""
sessions = list(self.sessions.values())
if not sessions:
return
log.info("Closing %s session(s)", len(sessions))
async def do_close() -> int:
"""Close all sessions, return number unclosed after timeout
Returns:
Number of sessions not yet closed.
"""
async def close_wait(session: Session) -> None:
await asyncio.gather(session.close(), session.wait())
if sys.version_info >= (3, 11):
async with asyncio.TaskGroup() as tg: # type: ignore[attr-defined]
for session in sessions:
tg.create_task(close_wait(session))
return 0
_done, remaining = await asyncio.wait(
[asyncio.create_task(close_wait(session)) for session in sessions],
timeout=timeout,
)
return len(remaining)
remaining = await do_close()
if remaining:
log.warning("%s session(s) didn't close after %s seconds", remaining, timeout)
async def new_session(
self,
slug: str,
session_id: SessionID,
route_key: RouteKey,
size: tuple[int, int] = (80, 24),
) -> Session | None:
"""Create a new session.
Args:
slug: Slug for app.
session_id: Session identity.
route_key: Route key.
size: Terminal size (width, height).
Returns:
New session, or `None` if no app / terminal configured.
"""
app = self.apps_by_slug.get(slug)
if app is None:
return None
session_process: Session
if app.terminal:
if constants.WINDOWS:
log.warning("Sorry, textual-web does not currently support terminals on Windows")
return None
else:
session_process = TerminalSession(
self.poller,
session_id,
app.command,
)
log.info(f"Created terminal session {session_id}")
else:
session_process = AppSession(
self.path,
app.command,
session_id,
)
log.info(f"Created app session {session_id}")
self.sessions[session_id] = session_process
self.routes[route_key] = session_id
await session_process.open(*size)
log.debug(f"Session {session_id} opened and ready")
return session_process
async def close_session(self, session_id: SessionID) -> None:
"""Close a session.
Args:
session_id: Session identity.
"""
session_process = self.sessions.get(session_id, None)
if session_process is None:
return
await session_process.close()
def get_session(self, session_id: SessionID) -> Session | None:
"""Get a session from a session ID.
Args:
session_id: Session identity.
Returns:
A session or `None` if it doesn't exist.
"""
return self.sessions.get(session_id)
def get_session_by_route_key(self, route_key: RouteKey) -> Session | None:
"""Get a session from a route key.
Args:
route_key: A route key.
Returns:
A session or `None` if it doesn't exist.
"""
session_id = self.routes.get(route_key)
if session_id is not None:
return self.sessions.get(session_id)
return None
def get_first_running_session(self) -> tuple[RouteKey, Session] | None:
"""Get the first running session.
Returns:
Tuple of (route_key, session) or None if no running sessions.
"""
for route_key in self.routes:
session_id = self.routes.get(route_key)
if session_id:
session = self.sessions.get(session_id)
if session and session.is_running():
return (route_key, session)
return None
+18
View File
@@ -0,0 +1,18 @@
import re
import unicodedata
def slugify(value: str, allow_unicode=False) -> str:
"""
Convert to ASCII if 'allow_unicode' is False. Convert spaces or repeated
dashes to single dashes. Remove characters that aren't alphanumerics,
underscores, or hyphens. Convert to lowercase. Also strip leading and
trailing whitespace, dashes, and underscores.
"""
value = str(value)
if allow_unicode:
value = unicodedata.normalize("NFKC", value)
else:
value = unicodedata.normalize("NFKD", value).encode("ascii", "ignore").decode("ascii")
value = re.sub(r"[^\w\s-]", "", value.lower())
return re.sub(r"[-\s]+", "-", value).strip("-_")
+19
View File
@@ -0,0 +1,19 @@
/* Generic monospace font stack for terminal rendering.
Prefers system monospace fonts, with optional Fira Code / Roboto Mono if available.
We avoid external font fetching (e.g. Google Fonts) to keep local server self-contained.
*/
:root {
--textual-webterm-mono: ui-monospace, "SFMono-Regular", "FiraCode Nerd Font",
"FiraMono Nerd Font", "Fira Code", "Roboto Mono", Menlo, Monaco, Consolas,
"Liberation Mono", "DejaVu Sans Mono", "Courier New", monospace;
}
body {
font-family: var(--textual-webterm-mono);
}
.xterm {
font-family: var(--textual-webterm-mono);
}
+180
View File
@@ -0,0 +1,180 @@
from __future__ import annotations
import array
import asyncio
import contextlib
import fcntl
import logging
import os
import pty
import shlex
import signal
import termios
from collections import deque
from typing import TYPE_CHECKING
import rich.repr
from importlib_metadata import version
from .session import Session, SessionConnector
if TYPE_CHECKING:
from .poller import Poller
from .types import Meta, SessionID
log = logging.getLogger("textual-web")
# Maximum bytes to keep in replay buffer for reconnection
REPLAY_BUFFER_SIZE = 64 * 1024 # 64KB
@rich.repr.auto
class TerminalSession(Session):
"""A session that manages a terminal."""
def __init__(
self,
poller: Poller,
session_id: SessionID,
command: str,
) -> None:
self.poller = poller
self.session_id = session_id
self.command = command or os.environ.get("SHELL", "sh")
self.master_fd: int | None = None
self.pid: int | None = None
self._task: asyncio.Task | None = None
self._replay_buffer: deque[bytes] = deque()
self._replay_buffer_size = 0
self._replay_lock = asyncio.Lock()
super().__init__()
def __rich_repr__(self) -> rich.repr.Result:
yield "session_id", self.session_id
yield "command", self.command
async def open(self, width: int = 80, height: int = 24) -> None:
log.info(f"Opening terminal session {self.session_id} with command: {self.command}")
pid, master_fd = pty.fork()
self.pid = pid
self.master_fd = master_fd
if pid == pty.CHILD:
os.environ["TERM_PROGRAM"] = "textual-webterm"
os.environ["TERM_PROGRAM_VERSION"] = version("textual-webterm")
try:
argv = shlex.split(self.command)
except ValueError:
os._exit(1)
if not argv:
os._exit(1)
try:
os.execvp(argv[0], argv) ## Exits the app
except OSError:
os._exit(1)
try:
self._set_terminal_size(width, height)
except OSError:
# Clean up on failure
os.close(master_fd)
self.master_fd = None
raise
log.debug(f"Terminal session {self.session_id} opened successfully")
def _set_terminal_size(self, width: int, height: int) -> None:
buf = array.array("h", [height, width, 0, 0])
assert self.master_fd is not None
fcntl.ioctl(self.master_fd, termios.TIOCSWINSZ, buf)
async def set_terminal_size(self, width: int, height: int) -> None:
loop = asyncio.get_running_loop()
await loop.run_in_executor(None, self._set_terminal_size, width, height)
async def _add_to_replay_buffer(self, data: bytes) -> None:
"""Add data to replay buffer, maintaining size limit."""
async with self._replay_lock:
self._replay_buffer.append(data)
self._replay_buffer_size += len(data)
while self._replay_buffer_size > REPLAY_BUFFER_SIZE and self._replay_buffer:
old_data = self._replay_buffer.popleft()
self._replay_buffer_size -= len(old_data)
async def get_replay_buffer(self) -> bytes:
"""Get the contents of the replay buffer."""
async with self._replay_lock:
return b"".join(self._replay_buffer)
def update_connector(self, connector: SessionConnector) -> None:
"""Update the connector for reconnection without restarting the session."""
self._connector = connector
log.debug(f"Updated connector for session {self.session_id}")
async def start(self, connector: SessionConnector) -> asyncio.Task:
self._connector = connector
assert self.master_fd is not None
if self._task is not None:
# Already running, just update connector (handled by update_connector)
return self._task
self._task = asyncio.create_task(self.run())
return self._task
async def run(self) -> None:
assert self.master_fd is not None
queue = self.poller.add_file(self.master_fd)
try:
while True:
data = await queue.get()
if not data:
break
# Store in replay buffer for reconnection
await self._add_to_replay_buffer(data)
# Send to current connector
if self._connector:
await self._connector.on_data(data)
except OSError:
log.exception("error in terminal.run")
finally:
if self._connector:
await self._connector.on_close()
if self.master_fd is not None:
fd = self.master_fd
self.master_fd = None
# Remove from poller first (while fd is still valid), then close
self.poller.remove_file(fd)
os.close(fd)
async def send_bytes(self, data: bytes) -> bool:
if self.master_fd is None:
return False
await self.poller.write(self.master_fd, data)
return True
async def send_meta(self, data: Meta) -> bool:
return True
async def close(self) -> None:
if self.pid is not None:
try:
os.kill(self.pid, signal.SIGHUP)
except ProcessLookupError:
pass # Process already gone
except Exception as e:
log.warning(f"Error closing terminal session {self.session_id}: {e}")
async def wait(self) -> None:
if self._task is not None:
with contextlib.suppress(asyncio.CancelledError):
await self._task
def is_running(self) -> bool:
"""Check if the terminal session is still running."""
if self.master_fd is None or self._task is None:
return False
# Check if process is actually alive
if self.pid is not None:
try:
os.kill(self.pid, 0) # Signal 0 checks existence
return True
except OSError:
return False
# pid is None means process not started or already exited
return False
+6
View File
@@ -0,0 +1,6 @@
from typing import NewType, Union
AppID = NewType("AppID", str)
Meta = dict[str, Union[str, None, int, bool]]
RouteKey = NewType("RouteKey", str)
SessionID = NewType("SessionID", str)
+1
View File
@@ -0,0 +1 @@
"""Tests for textual-webterm."""
+76
View File
@@ -0,0 +1,76 @@
"""Pytest configuration and fixtures for textual-webterm tests."""
from __future__ import annotations
import asyncio
from typing import TYPE_CHECKING
import pytest
from textual_webterm.config import App, Config
from textual_webterm.local_server import LocalServer
from textual_webterm.poller import Poller
from textual_webterm.session_manager import SessionManager
if TYPE_CHECKING:
from collections.abc import AsyncGenerator, Generator
from pathlib import Path
@pytest.fixture
def event_loop() -> Generator[asyncio.AbstractEventLoop, None, None]:
"""Create an event loop for async tests."""
loop = asyncio.new_event_loop()
yield loop
loop.close()
@pytest.fixture
def sample_terminal_app() -> App:
"""Create a sample terminal app configuration."""
return App(
name="Test Terminal",
slug="test-terminal",
terminal=True,
command="echo hello",
)
@pytest.fixture
def sample_config(sample_terminal_app: App) -> Config:
"""Create a sample configuration with a terminal app."""
return Config(apps=[sample_terminal_app])
@pytest.fixture
def tmp_config_path(tmp_path: Path) -> Path:
"""Create a temporary config path."""
return tmp_path / "config"
@pytest.fixture
def poller() -> Poller:
"""Create a Poller instance."""
return Poller()
@pytest.fixture
def session_manager(poller: Poller, tmp_path: Path, sample_terminal_app: App) -> SessionManager:
"""Create a SessionManager instance."""
return SessionManager(poller, tmp_path, [sample_terminal_app])
@pytest.fixture
async def local_server(
tmp_config_path: Path, sample_config: Config
) -> AsyncGenerator[LocalServer, None]:
"""Create a LocalServer instance for testing."""
server = LocalServer(
str(tmp_config_path),
sample_config,
host="127.0.0.1",
port=0, # Use random available port
)
yield server
# Cleanup
server.force_exit()
+133
View File
@@ -0,0 +1,133 @@
"""Tests for app_session module."""
import asyncio
import contextlib
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from textual_webterm.app_session import AppSession, ProcessState
class TestProcessState:
"""Tests for ProcessState enum."""
def test_process_states_exist(self):
"""Test that all process states exist."""
assert ProcessState.PENDING is not None
assert ProcessState.RUNNING is not None
assert ProcessState.CLOSED is not None
class TestAppSession:
"""Tests for AppSession class."""
@pytest.fixture
def mock_path(self, tmp_path):
"""Create a mock path."""
return tmp_path
def test_init(self, mock_path):
"""Test AppSession initialization."""
session = AppSession(mock_path, "python app.py", "test-session")
assert session.working_directory == mock_path
assert session.command == "python app.py"
assert session.session_id == "test-session"
assert session.state == ProcessState.PENDING
def test_init_with_devtools(self, mock_path):
"""Test AppSession with devtools enabled."""
session = AppSession(mock_path, "python app.py", "test-session", devtools=True)
assert session.devtools is True
@pytest.mark.asyncio
async def test_send_bytes_not_running(self, mock_path):
"""Test send_bytes when not running returns False."""
session = AppSession(mock_path, "python app.py", "test-session")
# Session not started, will return False gracefully
result = await session.send_bytes(b"test")
assert result is False
@pytest.mark.asyncio
async def test_send_meta(self, mock_path):
"""Test send_meta."""
session = AppSession(mock_path, "python app.py", "test-session")
session._process = MagicMock()
session._process.stdin = MagicMock()
session._process.stdin.write = MagicMock()
session._process.stdin.drain = AsyncMock()
await session.send_meta({"key": "value"})
# Should handle meta data
@pytest.mark.asyncio
async def test_set_terminal_size(self, mock_path):
"""Test set_terminal_size."""
session = AppSession(mock_path, "python app.py", "test-session")
session._process = MagicMock()
session._process.stdin = MagicMock()
session._process.stdin.write = MagicMock()
session._process.stdin.drain = AsyncMock()
# Should not raise
await session.set_terminal_size(100, 50)
@pytest.mark.asyncio
async def test_close_not_running(self, mock_path):
"""Test close when not running handles gracefully."""
session = AppSession(mock_path, "python app.py", "test-session")
# No process running, close should handle gracefully (not crash)
await session.close()
assert session.state == ProcessState.CLOSING
@pytest.mark.asyncio
async def test_wait_no_task(self, mock_path):
"""Test wait when no task."""
session = AppSession(mock_path, "python app.py", "test-session")
# Should not raise
await session.wait()
def test_state_transitions(self, mock_path):
"""Test state transition tracking."""
session = AppSession(mock_path, "python app.py", "test-session")
assert session.state == ProcessState.PENDING
# Manually set state for testing
session.state = ProcessState.RUNNING
assert session.state == ProcessState.RUNNING
session.state = ProcessState.CLOSED
assert session.state == ProcessState.CLOSED
class TestAppSessionConnector:
"""Tests for AppSession with connector."""
@pytest.fixture
def mock_connector(self):
"""Create a mock connector."""
connector = MagicMock()
connector.on_data = AsyncMock()
connector.on_close = AsyncMock()
return connector
@pytest.mark.asyncio
async def test_start_creates_task(self, tmp_path, mock_connector):
"""Test that start creates a task."""
session = AppSession(tmp_path, "echo test", "test-session")
with (
patch.object(session, "open", new_callable=AsyncMock),
patch.object(session, "run", new_callable=AsyncMock),
):
task = await session.start(mock_connector)
assert task is not None
# Cancel to clean up
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
+227
View File
@@ -0,0 +1,227 @@
"""Tests for CLI module."""
from pathlib import Path
import click
import pytest
from click.testing import CliRunner
class TestParseAppPath:
"""Tests for parse_app_path function."""
def test_parse_module_class(self):
"""Test parsing module:class format."""
from textual_webterm.cli import parse_app_path
module, cls = parse_app_path("mymodule:MyClass")
assert module == "mymodule"
assert cls == "MyClass"
def test_parse_nested_module_class(self):
"""Test parsing nested.module:class format."""
from textual_webterm.cli import parse_app_path
module, cls = parse_app_path("my.nested.module:MyClass")
assert module == "my.nested.module"
assert cls == "MyClass"
def test_parse_file_path_class(self):
"""Test parsing file/path.py:class format."""
from textual_webterm.cli import parse_app_path
module, cls = parse_app_path("path/to/file.py:MyClass")
assert module == "path/to/file.py"
assert cls == "MyClass"
def test_parse_no_colon_raises(self):
"""Test that missing colon raises BadParameter."""
from textual_webterm.cli import parse_app_path
with pytest.raises(click.BadParameter) as exc_info:
parse_app_path("invalid_format")
assert "Expected format" in str(exc_info.value)
class TestLoadAppClass:
"""Tests for load_app_class function."""
def test_load_nonexistent_module(self):
"""Test loading from non-existent module raises."""
from textual_webterm.cli import load_app_class
with pytest.raises(click.BadParameter) as exc_info:
load_app_class("nonexistent_module_xyz:MyClass")
assert "Could not import" in str(exc_info.value)
def test_load_nonexistent_class(self):
"""Test loading non-existent class from existing module raises."""
from textual_webterm.cli import load_app_class
with pytest.raises(click.BadParameter) as exc_info:
load_app_class("os:NonExistentClass")
assert "has no attribute" in str(exc_info.value)
def test_load_existing_class(self):
"""Test loading an existing class from a module."""
from textual_webterm.cli import load_app_class
# Load Path from pathlib
cls = load_app_class("pathlib:Path")
assert cls is Path
def test_load_from_file_nonexistent(self):
"""Test loading from non-existent file raises."""
from textual_webterm.cli import load_app_class
with pytest.raises(click.BadParameter) as exc_info:
load_app_class("/nonexistent/path.py:MyClass")
assert (
"not found" in str(exc_info.value).lower()
or "does not exist" in str(exc_info.value).lower()
)
class TestCLI:
"""Tests for CLI command."""
def test_cli_help(self):
"""Test CLI help output."""
from textual_webterm.cli import app as cli_app
runner = CliRunner()
result = runner.invoke(cli_app, ["--help"])
assert result.exit_code == 0
assert "terminal" in result.output.lower() or "command" in result.output.lower()
def test_cli_runs_terminal_command(self, monkeypatch):
from textual_webterm import cli
calls: dict[str, object] = {}
class FakeServer:
def __init__(self, *_args, **_kwargs):
calls["init"] = True
def add_terminal(self, name, command, slug):
calls["terminal"] = (name, command, slug)
async def run(self):
calls["run"] = True
monkeypatch.setattr(cli, "LocalServer", FakeServer)
monkeypatch.setattr(cli.asyncio, "run", lambda _coro: None)
runner = CliRunner()
result = runner.invoke(cli.app, ["htop"])
assert result.exit_code == 0
assert calls["terminal"][1] == "htop"
def test_cli_runs_default_shell(self, monkeypatch):
import os
from textual_webterm import cli
calls: dict[str, object] = {}
class FakeServer:
def __init__(self, *_args, **_kwargs):
calls["init"] = True
def add_terminal(self, name, command, slug):
calls["terminal"] = (name, command, slug)
async def run(self):
calls["run"] = True
monkeypatch.setenv("SHELL", "/bin/zsh")
monkeypatch.setattr(cli, "LocalServer", FakeServer)
monkeypatch.setattr(cli.asyncio, "run", lambda _coro: None)
runner = CliRunner()
result = runner.invoke(cli.app, [])
assert result.exit_code == 0
assert calls["terminal"][1] == os.environ["SHELL"]
def test_cli_app_module_validation_rejects(self):
from textual_webterm.cli import app as cli_app
runner = CliRunner()
result = runner.invoke(cli_app, ["--app", "os;rm -rf /:Fake"])
assert result.exit_code != 0
def test_cli_version(self):
"""Test CLI version output."""
from textual_webterm.cli import app as cli_app
runner = CliRunner()
result = runner.invoke(cli_app, ["--version"])
assert result.exit_code == 0
assert "0.1.0" in result.output
def test_cli_invalid_app_path(self):
"""Test CLI with invalid app path."""
from textual_webterm.cli import app as cli_app
runner = CliRunner()
result = runner.invoke(cli_app, ["--app", "invalid"])
assert result.exit_code != 0
def test_cli_port_option(self):
"""Test CLI port option parsing."""
from textual_webterm.cli import app as cli_app
runner = CliRunner()
result = runner.invoke(cli_app, ["--help"])
assert "--port" in result.output or "-p" in result.output
def test_cli_host_option(self):
"""Test CLI host option parsing."""
from textual_webterm.cli import app as cli_app
runner = CliRunner()
result = runner.invoke(cli_app, ["--help"])
assert "--host" in result.output or "-H" in result.output
class TestModuleValidation:
"""Tests for module/class name validation in CLI."""
def test_invalid_module_characters(self):
"""Test that invalid module names are rejected."""
from textual_webterm.cli import app as cli_app
runner = CliRunner()
# Module with shell characters should be rejected or fail gracefully
result = runner.invoke(cli_app, ["--app", "os; rm -rf /:Fake"])
# Should not succeed
assert result.exit_code != 0
def test_invalid_class_name(self):
"""Test that invalid class names are rejected."""
from textual_webterm.cli import app as cli_app
runner = CliRunner()
result = runner.invoke(cli_app, ["--app", "os:123invalid"])
assert result.exit_code != 0
class TestCLIOptions:
"""Tests for CLI option handling."""
def test_debug_option(self):
"""Test --debug option exists."""
from textual_webterm.cli import app as cli_app
runner = CliRunner()
result = runner.invoke(cli_app, ["--help"])
assert "--app" in result.output
def test_no_run_option(self):
"""Test --no-run option exists."""
from textual_webterm.cli import app as cli_app
runner = CliRunner()
result = runner.invoke(cli_app, ["--help"])
# Check that basic options are documented
assert "port" in result.output.lower()
+73
View File
@@ -0,0 +1,73 @@
import asyncio
from pathlib import Path
from click.testing import CliRunner
def test_cli_landing_manifest_runs(monkeypatch, tmp_path: Path):
from textual_webterm import cli
manifest = tmp_path / "landing.yaml"
manifest.write_text(
"""
- name: One
slug: one
command: echo one
"""
)
called = {}
class FakeServer:
def __init__(self, *_args, **_kwargs):
called["init"] = True
def add_terminal(self, name, command, slug):
called["terminal"] = (name, command, slug)
async def run(self):
called["run"] = True
monkeypatch.setattr(cli, "LocalServer", FakeServer)
monkeypatch.setattr(cli, "asyncio", asyncio)
runner = CliRunner()
result = runner.invoke(cli.app, ["-L", str(manifest)])
assert result.exit_code == 0
assert called.get("terminal") == ("One", "echo one", "one")
assert called.get("run") is True
def test_cli_compose_manifest_runs(monkeypatch, tmp_path: Path):
from textual_webterm import cli
manifest = tmp_path / "compose.yaml"
manifest.write_text(
"""
services:
svc1:
labels:
webterm-command: echo svc1
"""
)
called = {}
class FakeServer:
def __init__(self, *_args, **_kwargs):
called["init"] = True
def add_terminal(self, name, command, slug):
called["terminal"] = (name, command, slug)
async def run(self):
called["run"] = True
monkeypatch.setattr(cli, "LocalServer", FakeServer)
monkeypatch.setattr(cli, "asyncio", asyncio)
runner = CliRunner()
result = runner.invoke(cli.app, ["-C", str(manifest)])
assert result.exit_code == 0
assert called.get("terminal") == ("svc1", "echo svc1", "svc1")
assert called.get("run") is True
+55
View File
@@ -0,0 +1,55 @@
"""Extra CLI coverage tests for app execution paths."""
from __future__ import annotations
from click.testing import CliRunner
def test_cli_runs_app_from_file(monkeypatch, tmp_path):
from textual_webterm import cli
app_file = tmp_path / "myapp.py"
app_file.write_text(
"""
class MyApp:
TITLE = "MyApp"
def run(self):
return 0
""".lstrip()
)
calls: dict[str, object] = {}
class FakeServer:
def __init__(self, *_args, **_kwargs):
calls["init"] = True
def add_app(self, name, command, slug):
calls["app"] = (name, command, slug)
async def run(self):
calls["run"] = True
monkeypatch.setattr(cli, "LocalServer", FakeServer)
monkeypatch.setattr(cli.asyncio, "run", lambda _coro: None)
runner = CliRunner()
result = runner.invoke(cli.app, ["--app", f"{app_file}:MyApp"])
assert result.exit_code == 0
assert calls["app"][0] == "MyApp"
assert "python3" in calls["app"][1]
def test_load_app_class_from_file(tmp_path):
from textual_webterm.cli import load_app_class
app_file = tmp_path / "myapp2.py"
app_file.write_text(
"""
class MyApp2:
pass
""".lstrip()
)
cls = load_app_class(f"{app_file}:MyApp2")
assert cls.__name__ == "MyApp2"
+110
View File
@@ -0,0 +1,110 @@
"""Tests for configuration handling."""
from __future__ import annotations
from textual_webterm.config import App, Config
class TestApp:
"""Tests for App configuration."""
def test_create_terminal_app(self) -> None:
"""Test creating a terminal app configuration."""
app = App(
name="My Terminal",
slug="my-terminal",
terminal=True,
command="bash",
)
assert app.name == "My Terminal"
assert app.slug == "my-terminal"
assert app.terminal is True
assert app.command == "bash"
def test_create_textual_app(self) -> None:
"""Test creating a Textual app configuration."""
app = App(
name="My App",
slug="my-app",
terminal=False,
command="python -m myapp",
)
assert app.terminal is False
class TestConfig:
"""Tests for Config."""
def test_create_config_with_apps(self) -> None:
"""Test creating a config with apps."""
app = App(name="Test", slug="test", terminal=True, command="bash")
config = Config(apps=[app])
assert len(config.apps) == 1
assert config.apps[0].name == "Test"
def test_create_empty_config(self) -> None:
"""Test creating a config with no apps."""
config = Config(apps=[])
assert len(config.apps) == 0
class TestDefaultConfig:
"""Tests for default_config function."""
def test_default_config_returns_config(self):
"""Test that default_config returns a Config object."""
from textual_webterm.config import default_config
config = default_config()
assert config is not None
assert hasattr(config, "apps")
class TestLoadConfig:
"""Tests for load_config function."""
def test_load_config_parses_app_and_terminal(self, tmp_path):
from textual_webterm.config import load_config
config_path = tmp_path / "config.toml"
config_path.write_text(
"""
[app.demo]
command = "echo demo"
[terminal.shell]
command = "bash"
""".lstrip()
)
config = load_config(config_path)
assert len(config.apps) == 2
assert {a.name for a in config.apps} == {"demo", "shell"}
assert any(a.terminal for a in config.apps)
def test_load_config_slugify_for_app(self, tmp_path):
from textual_webterm.config import load_config
config_path = tmp_path / "config.toml"
config_path.write_text(
"""
[app."My App"]
command = "echo hi"
""".lstrip()
)
config = load_config(config_path)
assert config.apps[0].slug
def test_load_config_expands_vars(self, tmp_path, monkeypatch):
from textual_webterm.config import load_config
monkeypatch.setenv("MY_CMD", "echo expanded")
config_path = tmp_path / "config.toml"
config_path.write_text(
"""
[terminal.t]
command = "$MY_CMD"
""".lstrip()
)
config = load_config(config_path)
assert config.apps[0].command == "echo expanded"
+44
View File
@@ -0,0 +1,44 @@
import tempfile
from pathlib import Path
from textual_webterm.config import load_compose_manifest, load_landing_yaml
def test_load_landing_yaml_simple():
data = """
- name: One
slug: one
command: echo one
- name: Two
command: echo two
"""
with tempfile.NamedTemporaryFile("w+", delete=False) as f:
f.write(data)
f.flush()
apps = load_landing_yaml(Path(f.name))
assert len(apps) == 2
assert apps[0].slug == "one"
assert apps[1].command == "echo two"
def test_load_compose_manifest_reads_label():
data = """
services:
svc1:
labels:
webterm-command: echo svc1
svc2:
labels:
- webterm-command=echo svc2
svc3:
labels:
other: value
"""
with tempfile.NamedTemporaryFile("w+", delete=False) as f:
f.write(data)
f.flush()
apps = load_compose_manifest(Path(f.name))
slugs = {a.slug for a in apps}
commands = {a.command for a in apps}
assert slugs == {"svc1", "svc2"}
assert "echo svc1" in commands and "echo svc2" in commands
+34
View File
@@ -0,0 +1,34 @@
"""Tests for constants helpers."""
from __future__ import annotations
def test_get_environ_bool(monkeypatch):
from textual_webterm.constants import get_environ_bool
monkeypatch.setenv("FLAG", "1")
assert get_environ_bool("FLAG") is True
monkeypatch.setenv("FLAG", "0")
assert get_environ_bool("FLAG") is False
def test_get_environ_int_keyerror(monkeypatch):
from textual_webterm.constants import get_environ_int
monkeypatch.delenv("INT", raising=False)
assert get_environ_int("INT", 7) == 7
def test_get_environ_int_valueerror(monkeypatch):
from textual_webterm.constants import get_environ_int
monkeypatch.setenv("INT", "not-an-int")
assert get_environ_int("INT", 7) == 7
def test_get_environ_int_valid(monkeypatch):
from textual_webterm.constants import get_environ_int
monkeypatch.setenv("INT", "42")
assert get_environ_int("INT", 7) == 42
+81
View File
@@ -0,0 +1,81 @@
"""Tests for LocalServer."""
from __future__ import annotations
from textual_webterm.config import App, Config
from textual_webterm.local_server import STATIC_PATH, LocalServer
class TestLocalServer:
"""Tests for LocalServer."""
def test_static_path_exists(self) -> None:
"""Test that static path is set from textual-serve."""
assert STATIC_PATH is not None
assert STATIC_PATH.exists()
def test_static_path_has_required_files(self) -> None:
"""Test that static path contains required assets."""
assert STATIC_PATH is not None
assert (STATIC_PATH / "js" / "textual.js").exists()
assert (STATIC_PATH / "css" / "xterm.css").exists()
def test_create_server(self, tmp_path) -> None:
"""Test creating a LocalServer instance."""
app = App(name="Test", slug="test", terminal=True, command="echo test")
config = Config(apps=[app])
server = LocalServer(
str(tmp_path),
config,
host="127.0.0.1",
port=8080,
)
assert server.host == "127.0.0.1"
assert server.port == 8080
assert server.app_count == 1
def test_add_app(self, tmp_path) -> None:
"""Test adding an app to the server."""
config = Config(apps=[])
server = LocalServer(str(tmp_path), config, host="127.0.0.1", port=8080)
assert server.app_count == 0
server.add_app("New App", "echo hello", slug="new-app")
assert server.app_count == 1
class TestWebSocketProtocol:
"""Tests for WebSocket protocol handling."""
def test_stdin_message_format(self) -> None:
"""Test that stdin messages use correct format."""
import json
msg = json.dumps(["stdin", "hello"])
parsed = json.loads(msg)
assert parsed[0] == "stdin"
assert parsed[1] == "hello"
def test_resize_message_format(self) -> None:
"""Test that resize messages use correct format."""
import json
msg = json.dumps(["resize", {"width": 80, "height": 24}])
parsed = json.loads(msg)
assert parsed[0] == "resize"
assert parsed[1]["width"] == 80
assert parsed[1]["height"] == 24
def test_ping_pong_format(self) -> None:
"""Test ping/pong message format."""
import json
ping = json.dumps(["ping", "12345"])
parsed = json.loads(ping)
assert parsed[0] == "ping"
pong = json.dumps(["pong", "12345"])
parsed = json.loads(pong)
assert parsed[0] == "pong"
+365
View File
@@ -0,0 +1,365 @@
"""Tests for local_server module - unit tests for helper functions."""
from unittest.mock import AsyncMock, MagicMock
import pytest
from aiohttp import web
from textual_webterm.config import App, Config
from textual_webterm.local_server import LocalServer
class TestGetStaticPath:
"""Tests for static path function."""
def test_static_path_exists(self):
"""Test that static path exists."""
from textual_webterm.local_server import _get_static_path
path = _get_static_path()
assert path is not None and path.exists()
def test_static_path_has_js(self):
"""Test that static path has JS directory."""
from textual_webterm.local_server import _get_static_path
path = _get_static_path()
assert path is not None
assert (path / "js").exists()
def test_static_path_has_css(self):
"""Test that static path has CSS directory."""
from textual_webterm.local_server import _get_static_path
path = _get_static_path()
assert path is not None
assert (path / "css").exists()
class TestLocalServer:
"""Tests for LocalServer class."""
@pytest.fixture
def config(self):
"""Create a test config."""
return Config(
apps=[
App(name="Test", slug="test", path="./", command="echo test", terminal=True),
],
)
@pytest.fixture
def server(self, config, tmp_path):
"""Create a test server."""
config_file = tmp_path / "config.toml"
config_file.write_text("")
return LocalServer(
config_path=str(config_file),
config=config,
host="localhost",
port=8080,
)
def test_init(self, server):
"""Test LocalServer initialization."""
assert server.host == "localhost"
assert server.port == 8080
assert server.session_manager is not None
def test_add_app(self, server):
"""Test adding an app."""
server.add_app("New App", "python app.py", "newapp")
assert "newapp" in server.session_manager.apps_by_slug
def test_add_terminal(self, server):
"""Test adding a terminal."""
server.add_terminal("Terminal", "bash", "term")
assert "term" in server.session_manager.apps_by_slug
app = server.session_manager.apps_by_slug["term"]
assert app.terminal is True
@pytest.mark.asyncio
async def test_create_terminal_session_uses_slug_and_starts_session(self, server, monkeypatch):
from textual_webterm import local_server
monkeypatch.setattr(local_server, "generate", lambda: "fixed-session")
session = MagicMock()
session.start = AsyncMock()
monkeypatch.setattr(server.session_manager, "new_session", AsyncMock(return_value=session))
await server._create_terminal_session("test", 80, 24)
server.session_manager.new_session.assert_awaited_once_with(
"test",
"fixed-session",
"test",
size=(80, 24),
)
session.start.assert_awaited_once()
connector = session.start.call_args.args[0]
assert connector.session_id == "fixed-session"
assert connector.route_key == "test"
class TestLocalServerHelpers:
"""Tests for LocalServer helper methods."""
@pytest.mark.asyncio
async def test_keyboard_interrupt_closes_sessions_and_websockets(self, server, monkeypatch):
ws1 = MagicMock()
ws1.close = AsyncMock()
ws2 = MagicMock()
ws2.close = AsyncMock()
server._websocket_connections["a"] = ws1
server._websocket_connections["b"] = ws2
monkeypatch.setattr(server.session_manager, "close_all", AsyncMock())
server.on_keyboard_interrupt()
assert server._shutdown_task is not None
await server._shutdown_task
ws1.close.assert_awaited_once()
ws2.close.assert_awaited_once()
server.session_manager.close_all.assert_awaited_once()
assert server.exit_event.is_set()
@pytest.mark.asyncio
async def test_ws_resize_creates_session_when_slug_exists(self, server, monkeypatch):
server.session_manager.apps_by_slug["slug"] = App(
name="Known",
slug="slug",
path="./",
command="echo ok",
terminal=True,
)
monkeypatch.setattr(server, "_create_terminal_session", AsyncMock())
ws = MagicMock()
session_created = await server._dispatch_ws_message(
["resize", {"width": 100, "height": 40}],
"slug",
ws,
session_created=False,
)
assert session_created is True
server._create_terminal_session.assert_awaited_once_with("slug", 100, 40)
@pytest.mark.asyncio
async def test_ws_resize_sends_error_if_no_apps(self, server):
ws = MagicMock()
ws.send_json = AsyncMock()
server._websocket_connections["rk"] = ws
session_created = await server._dispatch_ws_message(
["resize", {"width": 80, "height": 24}],
"rk",
ws,
session_created=False,
)
assert session_created is True
ws.send_json.assert_awaited_once_with(["error", "No app configured"])
@pytest.mark.asyncio
async def test_resize_on_disconnect_calls_set_terminal_size(self, server, monkeypatch):
session = MagicMock()
session.set_terminal_size = AsyncMock()
monkeypatch.setattr(server.session_manager, "get_session_by_route_key", lambda _rk: session)
await server._resize_on_disconnect("rk")
session.set_terminal_size.assert_called_once_with(132, 45)
@pytest.mark.asyncio
async def test_create_terminal_session_sends_error_if_no_apps(self, server):
ws = MagicMock()
ws.send_json = AsyncMock()
server._websocket_connections["rk"] = ws
await server._create_terminal_session("rk", 80, 24)
ws.send_json.assert_awaited_once_with(["error", "No app configured"])
@pytest.mark.asyncio
async def test_screenshot_svg_handler_returns_svg(self, server, monkeypatch, capsys):
request = MagicMock()
request.query = {"route_key": "rk", "width": "80"}
session = MagicMock()
session.get_replay_buffer = AsyncMock(return_value=b"hello\r\n")
monkeypatch.setattr(server.session_manager, "get_session_by_route_key", lambda _rk: session)
response = await server._handle_screenshot(request)
assert response.content_type == "image/svg+xml"
assert "<svg" in response.text
out = capsys.readouterr()
assert out.out == ""
assert out.err == ""
@pytest.mark.asyncio
async def test_screenshot_creates_session_for_known_slug(self, server, monkeypatch):
request = MagicMock()
request.query = {"route_key": "known", "width": "90"}
session = MagicMock()
session.get_replay_buffer = AsyncMock(return_value=b"world\r\n")
# Pretend app exists for slug "known"
server.session_manager.apps_by_slug["known"] = App(
name="Known",
slug="known",
path="./",
command="echo world",
terminal=True,
)
created = {}
async def create_session(route_key, width, height):
created["called"] = (route_key, width, height)
server.session_manager.routes["known"] = "sid"
monkeypatch.setattr(server, "_create_terminal_session", create_session)
monkeypatch.setattr(
server.session_manager,
"get_session_by_route_key",
lambda _rk: session if created else None,
)
response = await server._handle_screenshot(request)
assert response.content_type == "image/svg+xml"
assert "<svg" in response.text
assert created["called"][0] == "known"
assert created["called"][1:] == (132, 45)
@pytest.mark.asyncio
async def test_screenshot_returns_404_for_unknown_slug(self, server, monkeypatch):
request = MagicMock()
request.query = {"route_key": "unknown"}
monkeypatch.setattr(server.session_manager, "get_session_by_route_key", lambda _rk: None)
with pytest.raises(web.HTTPNotFound) as exc:
await server._handle_screenshot(request)
assert exc.value.status == 404
@pytest.mark.asyncio
async def test_root_click_route_key_redirects(self, server):
request = MagicMock()
request.query = {}
server._landing_apps = [
App(name="Known", slug="known", path="./", command="echo world", terminal=True)
]
response = await server._handle_root(request)
assert "/?route_key=${encodeURIComponent(tile.slug)}" in response.text
assert "visibilitychange" in response.text
@pytest.fixture
def config(self):
"""Create a test config."""
return Config(
apps=[],
)
@pytest.fixture
def server(self, config, tmp_path):
"""Create a test server."""
config_file = tmp_path / "config.toml"
config_file.write_text("")
return LocalServer(
config_path=str(config_file),
config=config,
host="localhost",
port=8080,
)
def test_get_ws_url_basic(self, server):
"""Test basic WebSocket URL generation."""
request = MagicMock()
request.headers = {"Host": "localhost:8080"}
request.secure = False
url = server._get_ws_url_from_request(request, "test-route")
assert "ws://" in url
assert "test-route" in url
def test_get_ws_url_secure(self, server):
"""Test secure WebSocket URL generation."""
request = MagicMock()
request.headers = {"Host": "localhost:8080", "X-Forwarded-Proto": "https"}
request.secure = True
url = server._get_ws_url_from_request(request, "test-route")
assert "wss://" in url
def test_get_ws_url_forwarded_host(self, server):
"""Test WebSocket URL with forwarded host."""
request = MagicMock()
request.headers = {
"Host": "localhost:8080",
"X-Forwarded-Host": "example.com",
"X-Forwarded-Proto": "https",
}
request.secure = False
url = server._get_ws_url_from_request(request, "test-route")
assert "example.com" in url
def test_get_ws_url_forwarded_port(self, server):
"""Test WebSocket URL with forwarded port."""
request = MagicMock()
request.headers = {
"Host": "localhost:8080",
"X-Forwarded-Host": "example.com",
"X-Forwarded-Port": "9000",
}
request.secure = False
url = server._get_ws_url_from_request(request, "test-route")
assert "9000" in url
def test_get_ws_url_standard_port_omitted(self, server):
"""Test that standard ports are omitted from URL."""
request = MagicMock()
request.headers = {
"Host": "example.com",
"X-Forwarded-Port": "443",
"X-Forwarded-Proto": "https",
}
request.secure = True
url = server._get_ws_url_from_request(request, "test-route")
# Port 443 should be omitted
assert ":443" not in url or url == "wss://example.com/ws/test-route"
class TestWebSocketProtocol:
"""Tests for WebSocket protocol message formats."""
def test_stdin_message_format(self):
"""Test stdin message format."""
msg = ["stdin", "hello"]
assert msg[0] == "stdin"
assert msg[1] == "hello"
def test_resize_message_format(self):
"""Test resize message format."""
msg = ["resize", {"width": 80, "height": 24}]
assert msg[0] == "resize"
assert msg[1]["width"] == 80
assert msg[1]["height"] == 24
def test_ping_pong_format(self):
"""Test ping/pong message format."""
ping = ["ping", "1234567890"]
pong = ["pong", "1234567890"]
assert ping[0] == "ping"
assert pong[0] == "pong"
assert ping[1] == pong[1]
+67
View File
@@ -0,0 +1,67 @@
"""Tests for constants module."""
class TestConstants:
"""Tests for constants module."""
def test_import(self):
"""Test module can be imported."""
from textual_webterm import constants
assert constants is not None
def test_debug_exists(self, monkeypatch):
"""Test DEBUG constant exists and respects env var."""
import importlib
from textual_webterm import constants
assert hasattr(constants, "DEBUG")
assert isinstance(constants.DEBUG, bool)
monkeypatch.setenv("DEBUG", "1")
reloaded = importlib.reload(constants)
assert reloaded.DEBUG is True
monkeypatch.setenv("DEBUG", "0")
reloaded = importlib.reload(constants)
assert reloaded.DEBUG is False
class TestExitPoller:
"""Tests for exit_poller module."""
def test_import(self):
"""Test module can be imported."""
from textual_webterm.exit_poller import ExitPoller
assert ExitPoller is not None
async def test_exits_when_idle(self, monkeypatch):
"""ExitPoller should call force_exit after idle_wait seconds with no sessions."""
import asyncio
from textual_webterm import exit_poller
from textual_webterm.exit_poller import ExitPoller
# Speed up the poll loop for the unit test.
monkeypatch.setattr(exit_poller, "EXIT_POLL_RATE", 0.01)
class FakeServer:
def __init__(self):
class SM:
def __init__(self):
self.sessions = {}
self.session_manager = SM()
self.exited = False
def force_exit(self):
self.exited = True
server = FakeServer()
poller = ExitPoller(server, idle_wait=0.02)
poller.start()
await asyncio.sleep(0.1)
poller.stop()
assert server.exited is True
+127
View File
@@ -0,0 +1,127 @@
"""Tests for poller module."""
import asyncio
import contextlib
from unittest.mock import MagicMock, patch
import pytest
from textual_webterm.poller import Poller, Write
class TestWrite:
"""Tests for Write dataclass."""
def test_create_write(self):
"""Test creating a Write object."""
write = Write(data=b"test")
assert write.data == b"test"
assert write.position == 0
assert write.done_event is not None
def test_write_with_position(self):
"""Test Write with custom position."""
write = Write(data=b"test", position=5)
assert write.position == 5
class TestPoller:
"""Tests for Poller class."""
def test_init(self):
"""Test Poller initialization."""
poller = Poller()
assert poller._loop is None
assert poller._read_queues == {}
assert poller._write_queues == {}
assert not poller._exit_event.is_set()
def test_set_loop(self):
"""Test setting the asyncio loop."""
poller = Poller()
mock_loop = MagicMock()
poller.set_loop(mock_loop)
assert poller._loop == mock_loop
def test_add_file(self):
"""Test adding a file descriptor."""
poller = Poller()
# Use a mock file descriptor
with patch.object(poller._selector, "register"):
queue = poller.add_file(42)
assert 42 in poller._read_queues
assert isinstance(queue, asyncio.Queue)
def test_remove_file(self):
"""Test removing a file descriptor."""
poller = Poller()
# Add first
with patch.object(poller._selector, "register"):
poller.add_file(42)
# Remove
with patch.object(poller._selector, "unregister"):
poller.remove_file(42)
assert 42 not in poller._read_queues
def test_remove_nonexistent_file(self):
"""Test removing a non-existent file descriptor."""
poller = Poller()
with patch.object(poller._selector, "unregister"):
# Should not raise
poller.remove_file(999)
@pytest.mark.asyncio
async def test_write_creates_queue(self):
"""Test that write creates a write queue if needed."""
poller = Poller()
poller._loop = asyncio.get_event_loop()
# Mock selector
with patch.object(poller._selector, "register"):
poller.add_file(42)
with patch.object(poller._selector, "modify"):
# Start write in background (won't complete without poller running)
task = asyncio.create_task(poller.write(42, b"test"))
# Give it time to set up
await asyncio.sleep(0.01)
assert 42 in poller._write_queues
assert len(poller._write_queues[42]) == 1
# Cancel to clean up
task.cancel()
with contextlib.suppress(asyncio.CancelledError):
await task
def test_exit_sets_event(self):
"""Test that exit sets the exit event."""
poller = Poller()
poller._exit_event.clear()
# Mock join to avoid blocking
with patch.object(poller, "join"):
poller.exit()
assert poller._exit_event.is_set()
assert poller._read_queues == {}
assert poller._write_queues == {}
def test_exit_puts_none_in_queues(self):
"""Test that exit puts None in all read queues."""
poller = Poller()
# Add some queues
with patch.object(poller._selector, "register"):
q1 = poller.add_file(1)
q2 = poller.add_file(2)
# Mock join
with patch.object(poller, "join"):
poller.exit()
# Queues should have None
assert q1.get_nowait() is None
assert q2.get_nowait() is None
+40
View File
@@ -0,0 +1,40 @@
"""Tests for session management."""
from __future__ import annotations
from textual_webterm.types import RouteKey, SessionID
class TestTypes:
"""Tests for type definitions."""
def test_session_id_is_string(self) -> None:
"""Test that SessionID is a string type."""
session_id = SessionID("test-session-123")
assert isinstance(session_id, str)
assert session_id == "test-session-123"
def test_route_key_is_string(self) -> None:
"""Test that RouteKey is a string type."""
route_key = RouteKey("abc123")
assert isinstance(route_key, str)
assert route_key == "abc123"
class TestIdentity:
"""Tests for identity generation."""
def test_generate_unique_ids(self) -> None:
"""Test that generated IDs are unique."""
from textual_webterm.identity import generate
ids = [generate() for _ in range(100)]
assert len(set(ids)) == 100 # All unique
def test_generate_id_format(self) -> None:
"""Test that generated IDs have expected format."""
from textual_webterm.identity import generate
id_ = generate()
assert isinstance(id_, str)
assert len(id_) > 0
+258
View File
@@ -0,0 +1,258 @@
"""Tests for session_manager module."""
import platform
from unittest.mock import AsyncMock, MagicMock, patch
import pytest
from textual_webterm.config import App
from textual_webterm.session_manager import SessionManager
from textual_webterm.types import RouteKey, SessionID
class TestSessionManager:
"""Tests for SessionManager class."""
@pytest.fixture
def mock_poller(self):
"""Create a mock poller."""
return MagicMock()
@pytest.fixture
def mock_path(self, tmp_path):
"""Create a mock path."""
return tmp_path
@pytest.fixture
def sample_apps(self):
"""Create sample apps."""
return [
App(name="Test Terminal", slug="terminal", path="./", command="bash", terminal=True),
App(name="Test App", slug="app", path="./", command="python app.py", terminal=False),
]
def test_init(self, mock_poller, mock_path, sample_apps):
"""Test SessionManager initialization."""
manager = SessionManager(mock_poller, mock_path, sample_apps)
assert manager.poller == mock_poller
assert manager.path == mock_path
assert len(manager.apps) == 2
assert "terminal" in manager.apps_by_slug
assert "app" in manager.apps_by_slug
assert len(manager.sessions) == 0
assert len(manager.routes) == 0
def test_get_default_app(self, mock_poller, mock_path, sample_apps):
"""Test getting the default app."""
manager = SessionManager(mock_poller, mock_path, sample_apps)
assert manager.get_default_app() == sample_apps[0]
def test_get_default_app_empty(self, mock_poller, mock_path):
"""Test getting the default app when no apps are configured."""
manager = SessionManager(mock_poller, mock_path, [])
assert manager.get_default_app() is None
def test_add_app(self, mock_poller, mock_path):
"""Test adding an app."""
manager = SessionManager(mock_poller, mock_path, [])
manager.add_app("New App", "python new.py", "newapp", terminal=False)
assert len(manager.apps) == 1
assert "newapp" in manager.apps_by_slug
assert manager.apps_by_slug["newapp"].name == "New App"
def test_add_app_auto_slug(self, mock_poller, mock_path):
"""Test adding an app with auto-generated slug."""
manager = SessionManager(mock_poller, mock_path, [])
manager.add_app("Auto App", "python auto.py", "", terminal=False)
assert len(manager.apps) == 1
# Slug should be auto-generated
assert len(manager.apps[0].slug) > 0
def test_get_session_not_found(self, mock_poller, mock_path, sample_apps):
"""Test getting a non-existent session."""
manager = SessionManager(mock_poller, mock_path, sample_apps)
result = manager.get_session(SessionID("nonexistent"))
assert result is None
def test_get_session_by_route_key_not_found(self, mock_poller, mock_path, sample_apps):
"""Test getting session by non-existent route key."""
manager = SessionManager(mock_poller, mock_path, sample_apps)
result = manager.get_session_by_route_key(RouteKey("nonexistent"))
assert result is None
def test_on_session_end(self, mock_poller, mock_path, sample_apps):
"""Test session end cleanup."""
manager = SessionManager(mock_poller, mock_path, sample_apps)
# Manually add a session
session_id = SessionID("test-session")
route_key = RouteKey("test-route")
mock_session = MagicMock()
manager.sessions[session_id] = mock_session
manager.routes[route_key] = session_id
# End session
manager.on_session_end(session_id)
assert session_id not in manager.sessions
assert route_key not in manager.routes
def test_on_session_end_nonexistent(self, mock_poller, mock_path, sample_apps):
"""Test session end for non-existent session."""
manager = SessionManager(mock_poller, mock_path, sample_apps)
# Should not raise
manager.on_session_end(SessionID("nonexistent"))
@pytest.mark.asyncio
async def test_close_all_empty(self, mock_poller, mock_path, sample_apps):
"""Test closing all sessions when empty."""
manager = SessionManager(mock_poller, mock_path, sample_apps)
# Should not raise
await manager.close_all()
@pytest.mark.asyncio
async def test_close_all_with_sessions(self, mock_poller, mock_path, sample_apps):
"""Test closing all sessions."""
manager = SessionManager(mock_poller, mock_path, sample_apps)
# Add mock sessions
mock_session = MagicMock()
mock_session.close = AsyncMock()
mock_session.wait = AsyncMock()
manager.sessions[SessionID("s1")] = mock_session
await manager.close_all(timeout=1.0)
mock_session.close.assert_called_once()
@pytest.mark.asyncio
async def test_close_session(self, mock_poller, mock_path, sample_apps):
"""Test closing a specific session."""
manager = SessionManager(mock_poller, mock_path, sample_apps)
mock_session = MagicMock()
mock_session.close = AsyncMock()
session_id = SessionID("test-session")
manager.sessions[session_id] = mock_session
await manager.close_session(session_id)
mock_session.close.assert_called_once()
@pytest.mark.asyncio
async def test_close_session_nonexistent(self, mock_poller, mock_path, sample_apps):
"""Test closing a non-existent session."""
manager = SessionManager(mock_poller, mock_path, sample_apps)
# Should not raise
await manager.close_session(SessionID("nonexistent"))
@pytest.mark.asyncio
async def test_new_session_no_app(self, mock_poller, mock_path):
"""Test creating session with no matching app."""
manager = SessionManager(mock_poller, mock_path, [])
result = await manager.new_session(
"nonexistent",
SessionID("test"),
RouteKey("route"),
)
assert result is None
@pytest.mark.asyncio
@pytest.mark.skipif(platform.system() == "Windows", reason="Terminal not supported on Windows")
async def test_new_terminal_session(self, mock_poller, mock_path):
"""Test creating a new terminal session."""
from textual_webterm.terminal_session import TerminalSession
app = App(name="Terminal", slug="term", path="./", command="echo test", terminal=True)
manager = SessionManager(mock_poller, mock_path, [app])
with patch.object(TerminalSession, "open", new_callable=AsyncMock):
result = await manager.new_session(
"term",
SessionID("test-session"),
RouteKey("test-route"),
)
assert result is not None
assert isinstance(result, TerminalSession)
assert SessionID("test-session") in manager.sessions
assert RouteKey("test-route") in manager.routes
@pytest.mark.asyncio
async def test_new_app_session(self, mock_poller, mock_path):
"""Test creating a new app session."""
from textual_webterm.app_session import AppSession
app = App(name="App", slug="app", path="./", command="python app.py", terminal=False)
manager = SessionManager(mock_poller, mock_path, [app])
with patch.object(AppSession, "open", new_callable=AsyncMock):
result = await manager.new_session(
"app",
SessionID("test-session"),
RouteKey("test-route"),
)
assert result is not None
assert isinstance(result, AppSession)
class TestSessionManagerRoutes:
"""Tests for SessionManager route handling."""
@pytest.fixture
def manager(self, tmp_path):
"""Create a session manager with mock poller."""
mock_poller = MagicMock()
return SessionManager(mock_poller, tmp_path, [])
def test_route_mapping(self, manager):
"""Test route to session mapping."""
session_id = SessionID("session1")
route_key = RouteKey("route1")
manager.routes[route_key] = session_id
assert manager.routes.get(route_key) == session_id
assert manager.routes.get_key(session_id) == route_key
def test_get_session_by_route(self, manager):
"""Test getting session by route key."""
session_id = SessionID("session1")
route_key = RouteKey("route1")
mock_session = MagicMock()
manager.sessions[session_id] = mock_session
manager.routes[route_key] = session_id
result = manager.get_session_by_route_key(route_key)
assert result == mock_session
def test_get_first_running_session_none(self, manager):
"""Test getting first running session when empty."""
assert manager.get_first_running_session() is None
def test_get_first_running_session_found(self, manager):
"""Test getting first running session."""
session_id = SessionID("s1")
route_key = RouteKey("r1")
mock_session = MagicMock()
mock_session.is_running.return_value = True
manager.sessions[session_id] = mock_session
manager.routes[route_key] = session_id
result = manager.get_first_running_session()
assert result == (route_key, mock_session)
+40
View File
@@ -0,0 +1,40 @@
"""Tests for slugify module."""
from textual_webterm.slugify import slugify
class TestSlugify:
"""Tests for the slugify function."""
def test_lowercase(self):
"""Test that slugify converts to lowercase."""
assert slugify("HelloWorld") == "helloworld"
def test_spaces_to_dashes(self):
"""Test that spaces are converted to dashes."""
assert slugify("hello world") == "hello-world"
def test_multiple_spaces(self):
"""Test that multiple spaces become single dash."""
assert slugify("hello world") == "hello-world"
def test_special_characters_removed(self):
"""Test that special characters are removed."""
assert slugify("hello@world!") == "helloworld"
def test_combined(self):
"""Test combination of transformations."""
assert slugify("Hello World!") == "hello-world"
def test_empty_string(self):
"""Test empty string."""
assert slugify("") == ""
def test_numbers_preserved(self):
"""Test that numbers are preserved."""
assert slugify("test123") == "test123"
def test_leading_trailing_spaces(self):
"""Test that leading/trailing spaces are handled."""
result = slugify(" hello ")
assert "hello" in result
+194
View File
@@ -0,0 +1,194 @@
"""Tests for terminal_session module."""
import os
import platform
import pty
import shlex
from unittest.mock import MagicMock, patch
import pytest
# Skip tests on Windows
pytestmark = pytest.mark.skipif(
platform.system() == "Windows",
reason="Terminal sessions not supported on Windows",
)
class TestTerminalSession:
"""Tests for TerminalSession class."""
def test_import(self):
"""Test that module can be imported."""
from textual_webterm.terminal_session import TerminalSession
assert TerminalSession is not None
def test_replay_buffer_size(self):
"""Test replay buffer size constant."""
from textual_webterm.terminal_session import REPLAY_BUFFER_SIZE
assert REPLAY_BUFFER_SIZE == 64 * 1024 # 64KB
def test_init(self):
"""Test TerminalSession initialization."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
assert session.session_id == "test-session"
assert session.command == "bash"
assert session.master_fd is None
assert session.pid is None
assert session._task is None
def test_init_default_shell(self):
"""Test that default shell is used when command is empty."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
with patch.dict(os.environ, {"SHELL": "/bin/zsh"}):
session = TerminalSession(mock_poller, "test-session", "")
assert session.command == "/bin/zsh"
@pytest.mark.asyncio
async def test_replay_buffer_add(self):
"""Test adding data to replay buffer."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
await session._add_to_replay_buffer(b"test data")
assert session._replay_buffer_size == 9
assert await session.get_replay_buffer() == b"test data"
@pytest.mark.asyncio
async def test_replay_buffer_multiple_adds(self):
"""Test adding multiple chunks to replay buffer."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
await session._add_to_replay_buffer(b"chunk1")
await session._add_to_replay_buffer(b"chunk2")
assert await session.get_replay_buffer() == b"chunk1chunk2"
@pytest.mark.asyncio
async def test_replay_buffer_overflow(self):
"""Test that replay buffer trims old data when exceeding limit."""
from textual_webterm.terminal_session import (
REPLAY_BUFFER_SIZE,
TerminalSession,
)
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
# Add more data than buffer size
chunk_size = 1024
for _i in range(100): # 100KB total
await session._add_to_replay_buffer(b"x" * chunk_size)
# Buffer should be trimmed
assert session._replay_buffer_size <= REPLAY_BUFFER_SIZE + chunk_size
def test_update_connector(self):
"""Test updating connector."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
mock_connector = MagicMock()
session.update_connector(mock_connector)
assert session._connector == mock_connector
def test_is_running_not_started(self):
"""Test is_running when session not started."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
assert session.is_running() is False
@pytest.mark.asyncio
async def test_send_bytes_no_fd(self):
"""Test send_bytes returns False when no master_fd."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
result = await session.send_bytes(b"test")
assert result is False
@pytest.mark.asyncio
async def test_send_meta(self):
"""Test send_meta returns True."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
result = await session.send_meta({})
assert result is True
@pytest.mark.asyncio
async def test_close_no_pid(self):
"""Test close when no pid."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
# Should not raise
await session.close()
@pytest.mark.asyncio
async def test_wait_no_task(self):
"""Test wait when no task."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
# Should not raise
await session.wait()
def test_rich_repr(self):
"""Test rich repr output."""
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
session = TerminalSession(mock_poller, "test-session", "bash")
repr_items = list(session.__rich_repr__())
assert ("session_id", "test-session") in repr_items
assert ("command", "bash") in repr_items
@pytest.mark.asyncio
async def test_open_uses_shlex_split_and_execvp_with_args(self):
from textual_webterm.terminal_session import TerminalSession
mock_poller = MagicMock()
command = 'echo "hello world"'
session = TerminalSession(mock_poller, "test-session", command)
with (
patch("textual_webterm.terminal_session.pty.fork", return_value=(pty.CHILD, 123)) as mock_fork,
patch("textual_webterm.terminal_session.version", return_value="0.0.0"),
patch("textual_webterm.terminal_session.shlex.split", wraps=shlex.split) as mock_split,
patch("textual_webterm.terminal_session.os.execvp", side_effect=OSError()) as mock_execvp,
patch("textual_webterm.terminal_session.os._exit", side_effect=SystemExit(1)) as mock_exit,
pytest.raises(SystemExit),
):
await session.open()
mock_fork.assert_called_once()
mock_split.assert_called_once_with(command)
mock_execvp.assert_called_once_with("echo", ["echo", "hello world"])
mock_exit.assert_called_once_with(1)
+71
View File
@@ -0,0 +1,71 @@
"""Tests for TwoWayDict."""
from __future__ import annotations
from textual_webterm._two_way_dict import TwoWayDict
class TestTwoWayDict:
"""Tests for TwoWayDict bidirectional mapping."""
def test_set_and_get(self) -> None:
"""Test basic set and get operations."""
d: TwoWayDict[str, int] = TwoWayDict()
d["a"] = 1
d["b"] = 2
assert d.get("a") == 1
assert d.get("b") == 2
def test_get_key(self) -> None:
"""Test reverse lookup by value."""
d: TwoWayDict[str, int] = TwoWayDict()
d["a"] = 1
d["b"] = 2
assert d.get_key(1) == "a"
assert d.get_key(2) == "b"
def test_delete(self) -> None:
"""Test deletion removes both mappings."""
d: TwoWayDict[str, int] = TwoWayDict()
d["a"] = 1
del d["a"]
assert d.get("a") is None
assert d.get_key(1) is None
def test_contains(self) -> None:
"""Test key containment check."""
d: TwoWayDict[str, int] = TwoWayDict()
d["a"] = 1
assert "a" in d
assert "b" not in d
def test_contains_value(self) -> None:
"""Test value containment check."""
d: TwoWayDict[str, int] = TwoWayDict()
d["a"] = 1
assert d.contains_value(1) is True
assert d.contains_value(2) is False
def test_len(self) -> None:
"""Test length of dictionary."""
d: TwoWayDict[str, int] = TwoWayDict()
assert len(d) == 0
d["a"] = 1
assert len(d) == 1
d["b"] = 2
assert len(d) == 2
def test_iter(self) -> None:
"""Test iteration over keys."""
d: TwoWayDict[str, int] = TwoWayDict()
d["a"] = 1
d["b"] = 2
keys = list(d)
assert "a" in keys
assert "b" in keys
def test_initial_data(self) -> None:
"""Test initialization with data."""
d: TwoWayDict[str, int] = TwoWayDict({"a": 1, "b": 2})
assert d.get("a") == 1
assert d.get_key(2) == "b"