From d43270d5dcddd49556667f4e313202194a81b6e4 Mon Sep 17 00:00:00 2001 From: GitHub Copilot Date: Sun, 15 Feb 2026 14:28:09 +0000 Subject: [PATCH] Harden long-lived SSE and WebSocket connections Address intermittent stalls seen after many hours by improving liveness detection and failure handling for both streaming channels. WebSocket changes: - Added periodic server ping frames and a read deadline refreshed by pong replies. - On sender write/ping failure, explicitly close the underlying connection so clients promptly observe disconnect and reconnect instead of remaining half-open. SSE changes: - Excluded /events from gzip middleware and added X-Accel-Buffering: no to reduce proxy buffering risk. - Stop the SSE loop on write errors for activity/keepalive frames so dead subscribers are cleaned up immediately. Tests: - Added regression coverage for gzip bypass on /events. - Added regression coverage ensuring SSE handler exits and unsubscribes on write failure. - Verified with make format && make check. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com> --- webterm/server.go | 46 ++++++++++++++++++++----- webterm/server_test.go | 78 ++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 116 insertions(+), 8 deletions(-) diff --git a/webterm/server.go b/webterm/server.go index 5a426a0..0153589 100644 --- a/webterm/server.go +++ b/webterm/server.go @@ -25,6 +25,8 @@ import ( const ( wsSendQueueMax = 256 wsSendTimeout = 2 * time.Second + wsReadTimeout = 90 * time.Second + wsPingPeriod = 30 * time.Second stdinWriteTimeout = 2 * time.Second screenshotCacheSeconds = 300 * time.Millisecond maxScreenshotCacheTTL = 20 * time.Second @@ -344,10 +346,27 @@ func (s *LocalServer) stopWSClient(routeKey string) { func (s *LocalServer) wsSender(client *wsClient) { defer close(client.done) - for outbound := range client.send { - _ = client.conn.SetWriteDeadline(time.Now().Add(wsSendTimeout)) - if err := client.conn.WriteMessage(outbound.messageType, outbound.payload); err != nil { - return + pingTicker := time.NewTicker(wsPingPeriod) + defer pingTicker.Stop() + for { + select { + case outbound, ok := <-client.send: + if !ok { + return + } + _ = client.conn.SetWriteDeadline(time.Now().Add(wsSendTimeout)) + if err := client.conn.WriteMessage(outbound.messageType, outbound.payload); err != nil { + client.closed.Store(true) + _ = client.conn.Close() + return + } + case <-pingTicker.C: + deadline := time.Now().Add(wsSendTimeout) + if err := client.conn.WriteControl(websocket.PingMessage, nil, deadline); err != nil { + client.closed.Store(true) + _ = client.conn.Close() + return + } } } } @@ -414,6 +433,10 @@ func (s *LocalServer) loggingMiddleware(next http.Handler) http.Handler { func (s *LocalServer) gzipMiddleware(next http.Handler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.URL.Path == "/events" { + next.ServeHTTP(w, r) + return + } if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { next.ServeHTTP(w, r) return @@ -490,8 +513,10 @@ func (s *LocalServer) handleWebSocket(w http.ResponseWriter, r *http.Request) { } } - _ = conn.SetReadDeadline(time.Time{}) - conn.SetPongHandler(func(string) error { return nil }) + _ = conn.SetReadDeadline(time.Now().Add(wsReadTimeout)) + conn.SetPongHandler(func(string) error { + return conn.SetReadDeadline(time.Now().Add(wsReadTimeout)) + }) for { messageType, payload, err := conn.ReadMessage() @@ -741,6 +766,7 @@ func (s *LocalServer) handleEvents(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "text/event-stream") w.Header().Set("Cache-Control", "no-cache") w.Header().Set("Connection", "keep-alive") + w.Header().Set("X-Accel-Buffering", "no") flusher, ok := w.(http.Flusher) if !ok { http.Error(w, "streaming unsupported", http.StatusInternalServerError) @@ -764,10 +790,14 @@ func (s *LocalServer) handleEvents(w http.ResponseWriter, r *http.Request) { case <-notify: return case routeKey := <-channel: - _, _ = fmt.Fprintf(w, "event: activity\ndata: %s\n\n", routeKey) + if _, err := fmt.Fprintf(w, "event: activity\ndata: %s\n\n", routeKey); err != nil { + return + } flusher.Flush() case <-ticker.C: - _, _ = io.WriteString(w, ": keepalive\n\n") + if _, err := io.WriteString(w, ": keepalive\n\n"); err != nil { + return + } flusher.Flush() } } diff --git a/webterm/server_test.go b/webterm/server_test.go index 9d57376..9e66525 100644 --- a/webterm/server_test.go +++ b/webterm/server_test.go @@ -2,6 +2,7 @@ package webterm import ( "encoding/json" + "errors" "io" "net/http" "net/http/httptest" @@ -13,6 +14,26 @@ import ( "github.com/gorilla/websocket" ) +type failingSSEWriter struct { + header http.Header + writeErr error +} + +func (w *failingSSEWriter) Header() http.Header { + if w.header == nil { + w.header = make(http.Header) + } + return w.header +} + +func (w *failingSSEWriter) WriteHeader(int) {} + +func (w *failingSSEWriter) Write([]byte) (int, error) { + return 0, w.writeErr +} + +func (w *failingSSEWriter) Flush() {} + func newServerForTests(t *testing.T, withLanding bool) (*LocalServer, *httptest.Server, *syncSessionMap) { t.Helper() config := Config{ @@ -267,3 +288,60 @@ func TestMarkRouteActivityBroadcastsWithoutBlockingGlobalLock(t *testing.T) { t.Fatalf("expected route activity broadcast") } } + +func TestGzipMiddlewareSkipsEventsPath(t *testing.T) { + server := NewLocalServer(Config{}, ServerOptions{}) + handler := server.gzipMiddleware(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + _, _ = io.WriteString(w, "ok") + })) + req := httptest.NewRequest(http.MethodGet, "/events", nil) + req.Header.Set("Accept-Encoding", "gzip") + rr := httptest.NewRecorder() + handler.ServeHTTP(rr, req) + if got := rr.Header().Get("Content-Encoding"); got != "" { + t.Fatalf("expected no gzip encoding for SSE path, got %q", got) + } + if rr.Body.String() != "ok" { + t.Fatalf("unexpected body: %q", rr.Body.String()) + } +} + +func TestHandleEventsReturnsOnWriteError(t *testing.T) { + server := NewLocalServer(Config{}, ServerOptions{}) + req := httptest.NewRequest(http.MethodGet, "/events", nil) + writer := &failingSSEWriter{writeErr: errors.New("broken pipe")} + done := make(chan struct{}) + go func() { + server.handleEvents(writer, req) + close(done) + }() + + deadline := time.Now().Add(250 * time.Millisecond) + for { + server.mu.RLock() + count := len(server.sseSubscribers) + server.mu.RUnlock() + if count == 1 { + break + } + if time.Now().After(deadline) { + t.Fatalf("expected SSE subscriber to be registered") + } + time.Sleep(5 * time.Millisecond) + } + + server.markRouteActivity("route-a") + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatalf("handleEvents did not exit after write error") + } + + server.mu.RLock() + count := len(server.sseSubscribers) + server.mu.RUnlock() + if count != 0 { + t.Fatalf("expected SSE subscriber cleanup after write error, got %d", count) + } +}