Pause VT parser for idle sessions to eliminate CPU waste
When no WebSocket client is connected, the session's readLoop still processes every byte of terminal output through the go-te VT parser (tracker.Feed), Screen.Draw grapheme segmentation, and string allocations — even though nobody is consuming the screen state. For programs like btop inside tmux that produce continuous full-screen redraws, this causes sustained CPU usage and GC pressure over hours. Fix: after a 10-second idle threshold (no client connected), skip tracker.Feed() and only maintain the replay buffer. When a client reconnects (UpdateConnector) or a screenshot is requested (GetScreenSnapshot), rebuild the tracker by replaying the buffer through a fresh VT parser instance. Changes: - Add idleSince atomic timestamp + MarkIdle() to Session interface - handleOutput() skips tracker.Feed when idle > threshold - UpdateConnector() clears idle flag and rebuilds tracker from replay - GetScreenSnapshot() rebuilds stale tracker on-demand for screenshots - Wire MarkIdle() call into handleWebSocket cleanup (client disconnect) - Add TestIdleTrackerPauseAndRebuild covering the full lifecycle
This commit is contained in:
@@ -770,3 +770,61 @@ func TestDockerWatcherStartStop(t *testing.T) {
|
|||||||
time.Sleep(20 * time.Millisecond)
|
time.Sleep(20 * time.Millisecond)
|
||||||
watcher.Stop()
|
watcher.Stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestIdleTrackerPauseAndRebuild(t *testing.T) {
|
||||||
|
s := NewTerminalSession("idle-test", "echo hello")
|
||||||
|
s.tracker = terminalstate.NewTracker(80, 24)
|
||||||
|
conn := &recorderConnector{}
|
||||||
|
s.connector = conn
|
||||||
|
|
||||||
|
// Feed some output while active — tracker and replay both update
|
||||||
|
s.handleOutput([]byte("hello"))
|
||||||
|
snap1 := s.GetScreenSnapshot()
|
||||||
|
if !snap1.HasChanges {
|
||||||
|
t.Fatal("expected HasChanges after active feed")
|
||||||
|
}
|
||||||
|
if got := string(s.GetReplayBuffer()); got != "hello" {
|
||||||
|
t.Fatalf("replay mismatch: %q", got)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Mark idle and advance past threshold
|
||||||
|
s.MarkIdle()
|
||||||
|
s.idleSince.Store(time.Now().Add(-idleTrackerThreshold - time.Second).UnixNano())
|
||||||
|
|
||||||
|
// Feed more output while idle — only replay should update
|
||||||
|
s.handleOutput([]byte(" world"))
|
||||||
|
if got := string(s.GetReplayBuffer()); got != "hello world" {
|
||||||
|
t.Fatalf("replay should accumulate while idle: %q", got)
|
||||||
|
}
|
||||||
|
conn.mu.Lock()
|
||||||
|
idleData := len(conn.data)
|
||||||
|
conn.mu.Unlock()
|
||||||
|
if idleData != 1 {
|
||||||
|
t.Fatalf("connector should NOT receive data while idle, got %d calls", idleData)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetScreenSnapshot should rebuild tracker on-demand
|
||||||
|
snap2 := s.GetScreenSnapshot()
|
||||||
|
if !snap2.HasChanges {
|
||||||
|
t.Fatal("snapshot after idle rebuild should have changes")
|
||||||
|
}
|
||||||
|
|
||||||
|
// UpdateConnector should clear idle and rebuild
|
||||||
|
s.MarkIdle()
|
||||||
|
s.idleSince.Store(time.Now().Add(-idleTrackerThreshold - time.Second).UnixNano())
|
||||||
|
s.handleOutput([]byte("!"))
|
||||||
|
conn2 := &recorderConnector{}
|
||||||
|
s.UpdateConnector(conn2)
|
||||||
|
if s.idleSince.Load() != 0 {
|
||||||
|
t.Fatal("idleSince should be 0 after UpdateConnector")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Feed after reconnect should go through full pipeline again
|
||||||
|
s.handleOutput([]byte("x"))
|
||||||
|
conn2.mu.Lock()
|
||||||
|
got := len(conn2.data)
|
||||||
|
conn2.mu.Unlock()
|
||||||
|
if got != 1 {
|
||||||
|
t.Fatalf("expected 1 data call after reconnect, got %d", got)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
@@ -12,6 +12,8 @@ import (
|
|||||||
"net/url"
|
"net/url"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/rcarmo/webterm/internal/terminalstate"
|
"github.com/rcarmo/webterm/internal/terminalstate"
|
||||||
)
|
)
|
||||||
@@ -51,6 +53,7 @@ type DockerExecSession struct {
|
|||||||
doneOnce sync.Once
|
doneOnce sync.Once
|
||||||
waitErr error
|
waitErr error
|
||||||
writeMu sync.Mutex
|
writeMu sync.Mutex
|
||||||
|
idleSince atomic.Int64 // unix nano; 0 = active
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewDockerExecSession(sessionID string, spec DockerExecSpec, socketPath string) *DockerExecSession {
|
func NewDockerExecSession(sessionID string, spec DockerExecSpec, socketPath string) *DockerExecSession {
|
||||||
@@ -148,6 +151,12 @@ func (s *DockerExecSession) handleOutput(data []byte) {
|
|||||||
connector := s.connector
|
connector := s.connector
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
filtered = FilterUnsupportedModes(filtered)
|
filtered = FilterUnsupportedModes(filtered)
|
||||||
|
if ts := s.idleSince.Load(); ts != 0 && time.Since(time.Unix(0, ts)) > idleTrackerThreshold {
|
||||||
|
if len(filtered) > 0 {
|
||||||
|
s.replay.Add(filtered)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
dispatchSessionOutput(filtered, tracker, s.replay, connector)
|
dispatchSessionOutput(filtered, tracker, s.replay, connector)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -307,6 +316,14 @@ func (s *DockerExecSession) GetReplayBuffer() []byte {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *DockerExecSession) GetScreenSnapshot() terminalstate.Snapshot {
|
func (s *DockerExecSession) GetScreenSnapshot() terminalstate.Snapshot {
|
||||||
|
if s.idleSince.Load() != 0 {
|
||||||
|
s.mu.Lock()
|
||||||
|
s.rebuildTracker()
|
||||||
|
tracker := s.tracker
|
||||||
|
width, height := s.width, s.height
|
||||||
|
s.mu.Unlock()
|
||||||
|
return snapshotFromTracker(tracker, width, height)
|
||||||
|
}
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
tracker := s.tracker
|
tracker := s.tracker
|
||||||
width, height := s.width, s.height
|
width, height := s.width, s.height
|
||||||
@@ -321,4 +338,26 @@ func (s *DockerExecSession) UpdateConnector(connector SessionConnector) {
|
|||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
s.connector = connector
|
s.connector = connector
|
||||||
|
if s.idleSince.Swap(0) != 0 {
|
||||||
|
s.rebuildTracker()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarkIdle records that no observer is consuming output.
|
||||||
|
func (s *DockerExecSession) MarkIdle() {
|
||||||
|
s.idleSince.CompareAndSwap(0, time.Now().UnixNano())
|
||||||
|
}
|
||||||
|
|
||||||
|
// rebuildTracker replays the buffer through a fresh tracker so the screen
|
||||||
|
// state is up-to-date after an idle period. Caller must hold s.mu.
|
||||||
|
func (s *DockerExecSession) rebuildTracker() {
|
||||||
|
if s.tracker == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fresh := terminalstate.NewTracker(s.width, s.height)
|
||||||
|
if data := s.replay.Bytes(); len(data) > 0 {
|
||||||
|
_ = fresh.Feed(data)
|
||||||
|
fresh.ConsumeActivityChanged()
|
||||||
|
}
|
||||||
|
s.tracker = fresh
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -487,6 +487,11 @@ func (s *LocalServer) handleWebSocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
go s.wsSender(client)
|
go s.wsSender(client)
|
||||||
defer s.stopWSClient(routeKey, client)
|
defer s.stopWSClient(routeKey, client)
|
||||||
|
defer func() {
|
||||||
|
if session := s.sessionManager.GetSessionByRouteKey(routeKey); session != nil {
|
||||||
|
session.MarkIdle()
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// Helper to send JSON through the send channel (avoids concurrent conn writes)
|
// Helper to send JSON through the send channel (avoids concurrent conn writes)
|
||||||
sendJSON := func(v any) {
|
sendJSON := func(v any) {
|
||||||
|
|||||||
@@ -88,6 +88,7 @@ func (b *blockingSession) SendMeta(map[string]any) bool { return true }
|
|||||||
func (b *blockingSession) GetReplayBuffer() []byte { return nil }
|
func (b *blockingSession) GetReplayBuffer() []byte { return nil }
|
||||||
func (b *blockingSession) ForceRedraw() error { return nil }
|
func (b *blockingSession) ForceRedraw() error { return nil }
|
||||||
func (b *blockingSession) UpdateConnector(SessionConnector) {}
|
func (b *blockingSession) UpdateConnector(SessionConnector) {}
|
||||||
|
func (b *blockingSession) MarkIdle() {}
|
||||||
func (b *blockingSession) GetScreenSnapshot() terminalstate.Snapshot {
|
func (b *blockingSession) GetScreenSnapshot() terminalstate.Snapshot {
|
||||||
return terminalstate.Snapshot{Width: 80, Height: 24, Buffer: make([][]terminalstate.Cell, 24)}
|
return terminalstate.Snapshot{Width: 80, Height: 24, Buffer: make([][]terminalstate.Cell, 24)}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ type Session interface {
|
|||||||
GetScreenSnapshot() terminalstate.Snapshot
|
GetScreenSnapshot() terminalstate.Snapshot
|
||||||
ForceRedraw() error
|
ForceRedraw() error
|
||||||
UpdateConnector(connector SessionConnector)
|
UpdateConnector(connector SessionConnector)
|
||||||
|
MarkIdle()
|
||||||
}
|
}
|
||||||
|
|
||||||
type noopConnector struct{}
|
type noopConnector struct{}
|
||||||
|
|||||||
@@ -6,13 +6,20 @@ import (
|
|||||||
"os/exec"
|
"os/exec"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"syscall"
|
"syscall"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/creack/pty"
|
"github.com/creack/pty"
|
||||||
"github.com/google/shlex"
|
"github.com/google/shlex"
|
||||||
"github.com/rcarmo/webterm/internal/terminalstate"
|
"github.com/rcarmo/webterm/internal/terminalstate"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// idleTrackerThreshold is the duration after which we stop feeding the VT
|
||||||
|
// parser when no WebSocket client is connected. The replay buffer continues
|
||||||
|
// to accumulate so the tracker can be rebuilt on reconnect.
|
||||||
|
const idleTrackerThreshold = 10 * time.Second
|
||||||
|
|
||||||
type TerminalSession struct {
|
type TerminalSession struct {
|
||||||
sessionID string
|
sessionID string
|
||||||
command string
|
command string
|
||||||
@@ -32,6 +39,7 @@ type TerminalSession struct {
|
|||||||
doneOnce sync.Once
|
doneOnce sync.Once
|
||||||
waitErr error
|
waitErr error
|
||||||
writeMu sync.Mutex
|
writeMu sync.Mutex
|
||||||
|
idleSince atomic.Int64 // unix nano; 0 = active
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewTerminalSession(sessionID string, command string) *TerminalSession {
|
func NewTerminalSession(sessionID string, command string) *TerminalSession {
|
||||||
@@ -137,6 +145,13 @@ func (s *TerminalSession) handleOutput(data []byte) {
|
|||||||
connector := s.connector
|
connector := s.connector
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
filtered = FilterUnsupportedModes(filtered)
|
filtered = FilterUnsupportedModes(filtered)
|
||||||
|
if ts := s.idleSince.Load(); ts != 0 && time.Since(time.Unix(0, ts)) > idleTrackerThreshold {
|
||||||
|
// No client connected — only maintain the replay buffer.
|
||||||
|
if len(filtered) > 0 {
|
||||||
|
s.replay.Add(filtered)
|
||||||
|
}
|
||||||
|
return
|
||||||
|
}
|
||||||
dispatchSessionOutput(filtered, tracker, s.replay, connector)
|
dispatchSessionOutput(filtered, tracker, s.replay, connector)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -224,6 +239,15 @@ func (s *TerminalSession) GetReplayBuffer() []byte {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (s *TerminalSession) GetScreenSnapshot() terminalstate.Snapshot {
|
func (s *TerminalSession) GetScreenSnapshot() terminalstate.Snapshot {
|
||||||
|
if s.idleSince.Load() != 0 {
|
||||||
|
// Tracker is stale — rebuild before snapshotting.
|
||||||
|
s.mu.Lock()
|
||||||
|
s.rebuildTracker()
|
||||||
|
tracker := s.tracker
|
||||||
|
width, height := s.width, s.height
|
||||||
|
s.mu.Unlock()
|
||||||
|
return snapshotFromTracker(tracker, width, height)
|
||||||
|
}
|
||||||
s.mu.RLock()
|
s.mu.RLock()
|
||||||
tracker := s.tracker
|
tracker := s.tracker
|
||||||
width, height := s.width, s.height
|
width, height := s.width, s.height
|
||||||
@@ -238,4 +262,26 @@ func (s *TerminalSession) UpdateConnector(connector SessionConnector) {
|
|||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
defer s.mu.Unlock()
|
defer s.mu.Unlock()
|
||||||
s.connector = connector
|
s.connector = connector
|
||||||
|
if s.idleSince.Swap(0) != 0 {
|
||||||
|
s.rebuildTracker()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarkIdle records that no observer is consuming output.
|
||||||
|
func (s *TerminalSession) MarkIdle() {
|
||||||
|
s.idleSince.CompareAndSwap(0, time.Now().UnixNano())
|
||||||
|
}
|
||||||
|
|
||||||
|
// rebuildTracker replays the buffer through a fresh tracker so the screen
|
||||||
|
// state is up-to-date after an idle period. Caller must hold s.mu.
|
||||||
|
func (s *TerminalSession) rebuildTracker() {
|
||||||
|
if s.tracker == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
fresh := terminalstate.NewTracker(s.width, s.height)
|
||||||
|
if data := s.replay.Bytes(); len(data) > 0 {
|
||||||
|
_ = fresh.Feed(data)
|
||||||
|
fresh.ConsumeActivityChanged()
|
||||||
|
}
|
||||||
|
s.tracker = fresh
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -100,3 +100,5 @@ func (f *fakeSession) UpdateConnector(connector SessionConnector) {
|
|||||||
defer f.mu.Unlock()
|
defer f.mu.Unlock()
|
||||||
f.connector = connector
|
f.connector = connector
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (f *fakeSession) MarkIdle() {}
|
||||||
|
|||||||
Reference in New Issue
Block a user