Harden long-lived SSE and WebSocket connections
Address intermittent stalls seen after many hours by improving liveness detection and failure handling for both streaming channels. WebSocket changes: - Added periodic server ping frames and a read deadline refreshed by pong replies. - On sender write/ping failure, explicitly close the underlying connection so clients promptly observe disconnect and reconnect instead of remaining half-open. SSE changes: - Excluded /events from gzip middleware and added X-Accel-Buffering: no to reduce proxy buffering risk. - Stop the SSE loop on write errors for activity/keepalive frames so dead subscribers are cleaned up immediately. Tests: - Added regression coverage for gzip bypass on /events. - Added regression coverage ensuring SSE handler exits and unsubscribes on write failure. - Verified with make format && make check. Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
This commit is contained in:
+35
-5
@@ -25,6 +25,8 @@ import (
|
|||||||
const (
|
const (
|
||||||
wsSendQueueMax = 256
|
wsSendQueueMax = 256
|
||||||
wsSendTimeout = 2 * time.Second
|
wsSendTimeout = 2 * time.Second
|
||||||
|
wsReadTimeout = 90 * time.Second
|
||||||
|
wsPingPeriod = 30 * time.Second
|
||||||
stdinWriteTimeout = 2 * time.Second
|
stdinWriteTimeout = 2 * time.Second
|
||||||
screenshotCacheSeconds = 300 * time.Millisecond
|
screenshotCacheSeconds = 300 * time.Millisecond
|
||||||
maxScreenshotCacheTTL = 20 * time.Second
|
maxScreenshotCacheTTL = 20 * time.Second
|
||||||
@@ -344,11 +346,28 @@ func (s *LocalServer) stopWSClient(routeKey string) {
|
|||||||
|
|
||||||
func (s *LocalServer) wsSender(client *wsClient) {
|
func (s *LocalServer) wsSender(client *wsClient) {
|
||||||
defer close(client.done)
|
defer close(client.done)
|
||||||
for outbound := range client.send {
|
pingTicker := time.NewTicker(wsPingPeriod)
|
||||||
|
defer pingTicker.Stop()
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case outbound, ok := <-client.send:
|
||||||
|
if !ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
_ = client.conn.SetWriteDeadline(time.Now().Add(wsSendTimeout))
|
_ = client.conn.SetWriteDeadline(time.Now().Add(wsSendTimeout))
|
||||||
if err := client.conn.WriteMessage(outbound.messageType, outbound.payload); err != nil {
|
if err := client.conn.WriteMessage(outbound.messageType, outbound.payload); err != nil {
|
||||||
|
client.closed.Store(true)
|
||||||
|
_ = client.conn.Close()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
case <-pingTicker.C:
|
||||||
|
deadline := time.Now().Add(wsSendTimeout)
|
||||||
|
if err := client.conn.WriteControl(websocket.PingMessage, nil, deadline); err != nil {
|
||||||
|
client.closed.Store(true)
|
||||||
|
_ = client.conn.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -414,6 +433,10 @@ func (s *LocalServer) loggingMiddleware(next http.Handler) http.Handler {
|
|||||||
|
|
||||||
func (s *LocalServer) gzipMiddleware(next http.Handler) http.Handler {
|
func (s *LocalServer) gzipMiddleware(next http.Handler) http.Handler {
|
||||||
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
|
||||||
|
if r.URL.Path == "/events" {
|
||||||
|
next.ServeHTTP(w, r)
|
||||||
|
return
|
||||||
|
}
|
||||||
if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
|
if !strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
|
||||||
next.ServeHTTP(w, r)
|
next.ServeHTTP(w, r)
|
||||||
return
|
return
|
||||||
@@ -490,8 +513,10 @@ func (s *LocalServer) handleWebSocket(w http.ResponseWriter, r *http.Request) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
_ = conn.SetReadDeadline(time.Time{})
|
_ = conn.SetReadDeadline(time.Now().Add(wsReadTimeout))
|
||||||
conn.SetPongHandler(func(string) error { return nil })
|
conn.SetPongHandler(func(string) error {
|
||||||
|
return conn.SetReadDeadline(time.Now().Add(wsReadTimeout))
|
||||||
|
})
|
||||||
|
|
||||||
for {
|
for {
|
||||||
messageType, payload, err := conn.ReadMessage()
|
messageType, payload, err := conn.ReadMessage()
|
||||||
@@ -741,6 +766,7 @@ func (s *LocalServer) handleEvents(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.Header().Set("Content-Type", "text/event-stream")
|
w.Header().Set("Content-Type", "text/event-stream")
|
||||||
w.Header().Set("Cache-Control", "no-cache")
|
w.Header().Set("Cache-Control", "no-cache")
|
||||||
w.Header().Set("Connection", "keep-alive")
|
w.Header().Set("Connection", "keep-alive")
|
||||||
|
w.Header().Set("X-Accel-Buffering", "no")
|
||||||
flusher, ok := w.(http.Flusher)
|
flusher, ok := w.(http.Flusher)
|
||||||
if !ok {
|
if !ok {
|
||||||
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
http.Error(w, "streaming unsupported", http.StatusInternalServerError)
|
||||||
@@ -764,10 +790,14 @@ func (s *LocalServer) handleEvents(w http.ResponseWriter, r *http.Request) {
|
|||||||
case <-notify:
|
case <-notify:
|
||||||
return
|
return
|
||||||
case routeKey := <-channel:
|
case routeKey := <-channel:
|
||||||
_, _ = fmt.Fprintf(w, "event: activity\ndata: %s\n\n", routeKey)
|
if _, err := fmt.Fprintf(w, "event: activity\ndata: %s\n\n", routeKey); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
_, _ = io.WriteString(w, ": keepalive\n\n")
|
if _, err := io.WriteString(w, ": keepalive\n\n"); err != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
flusher.Flush()
|
flusher.Flush()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package webterm
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/http/httptest"
|
"net/http/httptest"
|
||||||
@@ -13,6 +14,26 @@ import (
|
|||||||
"github.com/gorilla/websocket"
|
"github.com/gorilla/websocket"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type failingSSEWriter struct {
|
||||||
|
header http.Header
|
||||||
|
writeErr error
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *failingSSEWriter) Header() http.Header {
|
||||||
|
if w.header == nil {
|
||||||
|
w.header = make(http.Header)
|
||||||
|
}
|
||||||
|
return w.header
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *failingSSEWriter) WriteHeader(int) {}
|
||||||
|
|
||||||
|
func (w *failingSSEWriter) Write([]byte) (int, error) {
|
||||||
|
return 0, w.writeErr
|
||||||
|
}
|
||||||
|
|
||||||
|
func (w *failingSSEWriter) Flush() {}
|
||||||
|
|
||||||
func newServerForTests(t *testing.T, withLanding bool) (*LocalServer, *httptest.Server, *syncSessionMap) {
|
func newServerForTests(t *testing.T, withLanding bool) (*LocalServer, *httptest.Server, *syncSessionMap) {
|
||||||
t.Helper()
|
t.Helper()
|
||||||
config := Config{
|
config := Config{
|
||||||
@@ -267,3 +288,60 @@ func TestMarkRouteActivityBroadcastsWithoutBlockingGlobalLock(t *testing.T) {
|
|||||||
t.Fatalf("expected route activity broadcast")
|
t.Fatalf("expected route activity broadcast")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestGzipMiddlewareSkipsEventsPath(t *testing.T) {
|
||||||
|
server := NewLocalServer(Config{}, ServerOptions{})
|
||||||
|
handler := server.gzipMiddleware(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) {
|
||||||
|
_, _ = io.WriteString(w, "ok")
|
||||||
|
}))
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/events", nil)
|
||||||
|
req.Header.Set("Accept-Encoding", "gzip")
|
||||||
|
rr := httptest.NewRecorder()
|
||||||
|
handler.ServeHTTP(rr, req)
|
||||||
|
if got := rr.Header().Get("Content-Encoding"); got != "" {
|
||||||
|
t.Fatalf("expected no gzip encoding for SSE path, got %q", got)
|
||||||
|
}
|
||||||
|
if rr.Body.String() != "ok" {
|
||||||
|
t.Fatalf("unexpected body: %q", rr.Body.String())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestHandleEventsReturnsOnWriteError(t *testing.T) {
|
||||||
|
server := NewLocalServer(Config{}, ServerOptions{})
|
||||||
|
req := httptest.NewRequest(http.MethodGet, "/events", nil)
|
||||||
|
writer := &failingSSEWriter{writeErr: errors.New("broken pipe")}
|
||||||
|
done := make(chan struct{})
|
||||||
|
go func() {
|
||||||
|
server.handleEvents(writer, req)
|
||||||
|
close(done)
|
||||||
|
}()
|
||||||
|
|
||||||
|
deadline := time.Now().Add(250 * time.Millisecond)
|
||||||
|
for {
|
||||||
|
server.mu.RLock()
|
||||||
|
count := len(server.sseSubscribers)
|
||||||
|
server.mu.RUnlock()
|
||||||
|
if count == 1 {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
t.Fatalf("expected SSE subscriber to be registered")
|
||||||
|
}
|
||||||
|
time.Sleep(5 * time.Millisecond)
|
||||||
|
}
|
||||||
|
|
||||||
|
server.markRouteActivity("route-a")
|
||||||
|
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
case <-time.After(2 * time.Second):
|
||||||
|
t.Fatalf("handleEvents did not exit after write error")
|
||||||
|
}
|
||||||
|
|
||||||
|
server.mu.RLock()
|
||||||
|
count := len(server.sseSubscribers)
|
||||||
|
server.mu.RUnlock()
|
||||||
|
if count != 0 {
|
||||||
|
t.Fatalf("expected SSE subscriber cleanup after write error, got %d", count)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user