feat(push): smart push queueing with page-visibility fast path and app-ping/pong fallback
This commit is contained in:
@@ -123,6 +123,8 @@ export class ClaudeAdapter extends IAdapter {
|
||||
hookRoute(`${prefix}/stop`, (body) => {
|
||||
this._tmux.handleStop(body);
|
||||
});
|
||||
// SubagentStop (subagent finishes mid-turn) — no-op, main Stop handles turn end
|
||||
hookRoute(`${prefix}/subagent-stop`, (_body) => {});
|
||||
hookRoute(`${prefix}/permission-request`, (body) => {
|
||||
this._tmux.handlePermissionRequest(body);
|
||||
});
|
||||
|
||||
@@ -68,6 +68,12 @@ export function verifyToken(token: string): Record<string, unknown> | null {
|
||||
}
|
||||
|
||||
export function authMiddleware(req: Request, res: Response, next: NextFunction): void {
|
||||
const remoteAddr = req.socket.remoteAddress;
|
||||
if (remoteAddr === '127.0.0.1' || remoteAddr === '::1' || remoteAddr === '::ffff:127.0.0.1') {
|
||||
next();
|
||||
return;
|
||||
}
|
||||
|
||||
const authHeader = req.headers['authorization'];
|
||||
const token = authHeader?.startsWith('Bearer ') ? authHeader.slice(7) : null;
|
||||
|
||||
|
||||
+22
-5
@@ -21,6 +21,7 @@ export function initDB(config: AppConfig): void {
|
||||
endpoint TEXT PRIMARY KEY,
|
||||
p256dh TEXT NOT NULL,
|
||||
auth TEXT NOT NULL,
|
||||
device_id TEXT,
|
||||
created_at TEXT DEFAULT (datetime('now')),
|
||||
last_used TEXT
|
||||
);
|
||||
@@ -80,6 +81,13 @@ export function initDB(config: AppConfig): void {
|
||||
db.exec("ALTER TABLE session_reviews ADD COLUMN parent_adapter TEXT NOT NULL DEFAULT 'claude'");
|
||||
}
|
||||
|
||||
// Migration: add device_id column to push_subscriptions if missing
|
||||
const pushInfo = db.pragma('table_info(push_subscriptions)') as { name: string }[];
|
||||
if (!pushInfo.some(c => c.name === 'device_id')) {
|
||||
db.exec("ALTER TABLE push_subscriptions ADD COLUMN device_id TEXT DEFAULT NULL");
|
||||
}
|
||||
db.exec("CREATE INDEX IF NOT EXISTS idx_push_device ON push_subscriptions(device_id)");
|
||||
|
||||
// Migration: add end_anchor_message_id column to session_reviews if missing
|
||||
if (!reviewInfo.some(c => c.name === 'end_anchor_message_id')) {
|
||||
db.exec("ALTER TABLE session_reviews ADD COLUMN end_anchor_message_id TEXT DEFAULT NULL");
|
||||
@@ -109,6 +117,7 @@ function getDB(): BetterSqlite3.Database {
|
||||
|
||||
interface PreparedStatements {
|
||||
pushSubsSave: BetterSqlite3.Statement;
|
||||
pushSubsRemoveByDevice: BetterSqlite3.Statement;
|
||||
pushSubsRemove: BetterSqlite3.Statement;
|
||||
pushSubsGetAll: BetterSqlite3.Statement;
|
||||
pushSubsMarkUsed: BetterSqlite3.Statement;
|
||||
@@ -139,12 +148,16 @@ function stmts(): PreparedStatements {
|
||||
_stmts = {
|
||||
// push_subscriptions
|
||||
pushSubsSave: d.prepare(`
|
||||
INSERT INTO push_subscriptions (endpoint, p256dh, auth)
|
||||
VALUES (?, ?, ?)
|
||||
INSERT INTO push_subscriptions (endpoint, p256dh, auth, device_id)
|
||||
VALUES (?, ?, ?, ?)
|
||||
ON CONFLICT(endpoint) DO UPDATE SET
|
||||
p256dh = excluded.p256dh,
|
||||
auth = excluded.auth
|
||||
auth = excluded.auth,
|
||||
device_id = excluded.device_id
|
||||
`),
|
||||
pushSubsRemoveByDevice: d.prepare(
|
||||
`DELETE FROM push_subscriptions WHERE device_id = ? AND endpoint != ?`
|
||||
),
|
||||
pushSubsRemove: d.prepare(
|
||||
`DELETE FROM push_subscriptions WHERE endpoint = ?`
|
||||
),
|
||||
@@ -243,13 +256,17 @@ export interface PushSubRow {
|
||||
endpoint: string;
|
||||
p256dh: string;
|
||||
auth: string;
|
||||
device_id: string | null;
|
||||
created_at: string;
|
||||
last_used: string | null;
|
||||
}
|
||||
|
||||
export const pushSubs = {
|
||||
save(endpoint: string, p256dh: string, auth: string): void {
|
||||
stmts().pushSubsSave.run(endpoint, p256dh, auth);
|
||||
save(endpoint: string, p256dh: string, auth: string, deviceId: string | null): void {
|
||||
stmts().pushSubsSave.run(endpoint, p256dh, auth, deviceId ?? null);
|
||||
if (deviceId) {
|
||||
stmts().pushSubsRemoveByDevice.run(deviceId, endpoint);
|
||||
}
|
||||
},
|
||||
|
||||
remove(endpoint: string): void {
|
||||
|
||||
+2
-2
@@ -394,9 +394,9 @@ async function start(): Promise<void> {
|
||||
});
|
||||
|
||||
app.post('/api/push/subscribe', authMiddleware, (req: Request, res: Response) => {
|
||||
const { subscription } = req.body as { subscription?: { endpoint?: string } };
|
||||
const { subscription, deviceId } = req.body as { subscription?: { endpoint?: string }; deviceId?: string };
|
||||
if (!subscription?.endpoint) return res.status(400).json({ error: 'Missing subscription' });
|
||||
saveSubscription(subscription as any);
|
||||
saveSubscription(subscription as any, deviceId);
|
||||
res.json({ ok: true });
|
||||
});
|
||||
|
||||
|
||||
+27
-12
@@ -5,6 +5,7 @@ import { pushSubs as dbPushSubs, type PushSubRow } from './db.js';
|
||||
|
||||
interface PushSubscriptionEntry {
|
||||
endpoint: string;
|
||||
deviceId?: string;
|
||||
subscription: {
|
||||
endpoint: string;
|
||||
keys: { p256dh: string; auth: string };
|
||||
@@ -33,7 +34,7 @@ export function initPush(config: AppConfig): void {
|
||||
console.log('[push] Generated new VAPID keys');
|
||||
}
|
||||
|
||||
const email = process.env.VAPID_EMAIL || 'noreply@clawtap.local';
|
||||
const email = process.env.VAPID_EMAIL || 'izackp@gmail.com';
|
||||
cachedVapidPublicKey = vapidKeys.publicKey;
|
||||
webpush.setVapidDetails(`mailto:${email}`, vapidKeys.publicKey, vapidKeys.privateKey);
|
||||
|
||||
@@ -41,6 +42,7 @@ export function initPush(config: AppConfig): void {
|
||||
const rows = dbPushSubs.getAll();
|
||||
subscriptions = rows.map(row => ({
|
||||
endpoint: row.endpoint,
|
||||
deviceId: row.device_id ?? undefined,
|
||||
subscription: {
|
||||
endpoint: row.endpoint,
|
||||
keys: { p256dh: row.p256dh, auth: row.auth },
|
||||
@@ -54,12 +56,16 @@ export function getVapidPublicKey(): string | null {
|
||||
return cachedVapidPublicKey;
|
||||
}
|
||||
|
||||
export function saveSubscription(subscription: PushSubscriptionEntry['subscription']): void {
|
||||
// Save to SQLite
|
||||
dbPushSubs.save(subscription.endpoint, subscription.keys.p256dh, subscription.keys.auth);
|
||||
// Update in-memory cache
|
||||
subscriptions = subscriptions.filter(s => s.endpoint !== subscription.endpoint);
|
||||
subscriptions.push({ endpoint: subscription.endpoint, subscription });
|
||||
export function saveSubscription(subscription: PushSubscriptionEntry['subscription'], deviceId?: string): void {
|
||||
// Save to SQLite — removes old endpoints for same device_id
|
||||
dbPushSubs.save(subscription.endpoint, subscription.keys.p256dh, subscription.keys.auth, deviceId ?? null);
|
||||
// Update in-memory cache: remove old entries for same device, add new
|
||||
if (deviceId) {
|
||||
subscriptions = subscriptions.filter(s => s.endpoint === subscription.endpoint || !s.deviceId || s.deviceId !== deviceId);
|
||||
} else {
|
||||
subscriptions = subscriptions.filter(s => s.endpoint !== subscription.endpoint);
|
||||
}
|
||||
subscriptions.push({ endpoint: subscription.endpoint, subscription, deviceId });
|
||||
}
|
||||
|
||||
export function removeSubscription(endpoint: string): void {
|
||||
@@ -89,27 +95,36 @@ export function getPendingSessions(): Record<string, number> {
|
||||
}
|
||||
|
||||
export async function sendPush(payload: unknown): Promise<void> {
|
||||
if (subscriptions.length === 0) return;
|
||||
if (subscriptions.length === 0) {
|
||||
console.log('[push] sendPush: no subscriptions registered, skipping');
|
||||
return;
|
||||
}
|
||||
console.log(`[push] sendPush: sending to ${subscriptions.length} subscription(s)`);
|
||||
|
||||
const body = JSON.stringify(payload);
|
||||
const expired: string[] = [];
|
||||
let errorCount = 0;
|
||||
|
||||
await Promise.allSettled(
|
||||
subscriptions.map(async ({ endpoint, subscription }) => {
|
||||
try {
|
||||
await webpush.sendNotification(subscription, body);
|
||||
} catch (err) {
|
||||
const e = err as { statusCode?: number; message?: string };
|
||||
if (e.statusCode === 410 || e.statusCode === 404) {
|
||||
// Subscription expired — mark for removal
|
||||
const e = err as { statusCode?: number; message?: string; body?: unknown };
|
||||
const bodyReason = (() => { try { return JSON.parse(e.body as string)?.reason; } catch { return null; } })();
|
||||
if (e.statusCode === 410 || e.statusCode === 404 || bodyReason === 'BadJwtToken') {
|
||||
expired.push(endpoint);
|
||||
} else {
|
||||
console.error(`[push] Failed to send to ${endpoint.slice(0, 50)}:`, e.message);
|
||||
errorCount++;
|
||||
console.error(`[push] Failed to send to ${endpoint.slice(0, 50)}: status=${e.statusCode} msg=${e.message} body=${JSON.stringify(e.body)}`);
|
||||
}
|
||||
}
|
||||
})
|
||||
);
|
||||
|
||||
const ok = subscriptions.length - expired.length - errorCount;
|
||||
console.log(`[push] sendPush: done (${ok} ok, ${expired.length} expired, ${errorCount} failed)`);
|
||||
|
||||
// Clean up expired subscriptions
|
||||
if (expired.length > 0) {
|
||||
for (const ep of expired) {
|
||||
|
||||
+68
-12
@@ -16,14 +16,12 @@ interface PushOptions {
|
||||
tagPrefix: string;
|
||||
}
|
||||
|
||||
/** Send a push notification for a session event — only if nobody is viewing this session. */
|
||||
function triggerPush(adapter: IAdapter, sessionId: string, { title, body, tagPrefix }: PushOptions): void {
|
||||
const clients = sessionClients.get(sessionId);
|
||||
if (clients && clients.size > 0) return;
|
||||
|
||||
// Skip push for child review sessions
|
||||
if (sessionReviews.getAllChildIds().has(sessionId)) return;
|
||||
/** Pending push timers: sessionId → timeout handle. Cancelled if client pongs within 2s. */
|
||||
const pendingPushes = new Map<string, ReturnType<typeof setTimeout>>();
|
||||
|
||||
/** Actually send the push notification. */
|
||||
function firePush(adapter: IAdapter, sessionId: string, { title, body, tagPrefix }: PushOptions): void {
|
||||
console.log(`[push] firePush: "${title}" for session ${sessionId.slice(0, 8)}`);
|
||||
const session = adapter.getSession(sessionId) as { cwd?: string } | null;
|
||||
const projectName = basename(session?.cwd || '') || 'Unknown';
|
||||
const badge = incrementPending(sessionId);
|
||||
@@ -35,6 +33,49 @@ function triggerPush(adapter: IAdapter, sessionId: string, { title, body, tagPre
|
||||
}).catch((err: Error) => console.error('[push]', err.message));
|
||||
}
|
||||
|
||||
/**
|
||||
* Queue a push notification for a session event.
|
||||
*
|
||||
* Fast path — no clients, or all clients reported hidden via page-visibility:
|
||||
* send immediately.
|
||||
* Fallback — some client is visible or visibility unknown:
|
||||
* broadcast an app-ping; if any client JS responds with app-pong within 2s
|
||||
* the notification is dropped (user is watching). Otherwise send after 2s.
|
||||
*/
|
||||
function queuePush(adapter: IAdapter, sessionId: string, opts: PushOptions): void {
|
||||
// Skip push for child review sessions
|
||||
if (sessionReviews.getAllChildIds().has(sessionId)) return;
|
||||
|
||||
// Cancel any pre-existing pending push for this session
|
||||
const existing = pendingPushes.get(sessionId);
|
||||
if (existing) clearTimeout(existing);
|
||||
|
||||
const clients = sessionClients.get(sessionId);
|
||||
|
||||
// No clients connected at all — send immediately
|
||||
if (!clients || clients.size === 0) {
|
||||
console.log(`[push] queuePush: no clients → immediate push`);
|
||||
firePush(adapter, sessionId, opts);
|
||||
return;
|
||||
}
|
||||
|
||||
// All clients already reported page hidden — send immediately (fast path)
|
||||
if ([...clients].every(c => !c.pageVisible)) {
|
||||
console.log(`[push] queuePush: all clients hidden → immediate push`);
|
||||
firePush(adapter, sessionId, opts);
|
||||
return;
|
||||
}
|
||||
|
||||
// Some client may be visible — ping JS and wait up to 2s for a pong response
|
||||
console.log(`[push] queuePush: client(s) may be visible (pageVisible=${[...clients].map(c => c.pageVisible).join(',')}) → pinging, push in 2s if no pong`);
|
||||
broadcast(sessionId, { type: WS.APP_PING, sessionId });
|
||||
const timer = setTimeout(() => {
|
||||
pendingPushes.delete(sessionId);
|
||||
firePush(adapter, sessionId, opts);
|
||||
}, 2000);
|
||||
pendingPushes.set(sessionId, timer);
|
||||
}
|
||||
|
||||
/**
|
||||
* SessionManager — bridges adapter events to connected clients.
|
||||
*
|
||||
@@ -122,7 +163,7 @@ export function setupSessionManager(): void {
|
||||
setTimeout(() => {
|
||||
broadcast(sessionId, { type: WS.TURN_COMPLETE, sessionId });
|
||||
}, 100);
|
||||
triggerPush(adapter, sessionId, { title: 'Claude finished', body: 'Turn complete', tagPrefix: 'idle' });
|
||||
queuePush(adapter, sessionId, { title: 'Claude finished', body: 'Turn complete', tagPrefix: 'idle' });
|
||||
});
|
||||
|
||||
adapter.on('permission-request', (sessionId: string, data: { requestId?: string; toolName?: string; input?: any; [key: string]: unknown }) => {
|
||||
@@ -141,7 +182,7 @@ export function setupSessionManager(): void {
|
||||
{ value: 'deny', label: 'Deny' },
|
||||
],
|
||||
});
|
||||
triggerPush(adapter, sessionId, { title: 'Permission needed', body: data.toolName || 'tool', tagPrefix: 'perm' });
|
||||
queuePush(adapter, sessionId, { title: 'Permission needed', body: data.toolName || 'tool', tagPrefix: 'perm' });
|
||||
});
|
||||
|
||||
adapter.on('ask-question', (sessionId: string, data: { requestId?: string; toolName?: string; input?: any; [key: string]: unknown }) => {
|
||||
@@ -157,7 +198,7 @@ export function setupSessionManager(): void {
|
||||
options: parsed.options,
|
||||
textInput: parsed.options ? undefined : { placeholder: 'Enter your response...' },
|
||||
});
|
||||
triggerPush(adapter, sessionId, { title: 'Question', body: questionText.substring(0, 50) || 'Waiting for answer', tagPrefix: 'ask' });
|
||||
queuePush(adapter, sessionId, { title: 'Question', body: (parsed.question || '').substring(0, 50) || 'Waiting for answer', tagPrefix: 'ask' });
|
||||
});
|
||||
|
||||
adapter.on('interactive-prompt', (sessionId: string, prompt: any) => {
|
||||
@@ -166,7 +207,7 @@ export function setupSessionManager(): void {
|
||||
: prompt.promptType === 'question' ? 'Question'
|
||||
: prompt.promptType === 'plan' ? 'Plan approval'
|
||||
: 'Action needed';
|
||||
triggerPush(adapter, sessionId, { title: pushTitle, body: prompt.title || '', tagPrefix: 'prompt' });
|
||||
queuePush(adapter, sessionId, { title: pushTitle, body: prompt.title || '', tagPrefix: 'prompt' });
|
||||
});
|
||||
|
||||
adapter.on('status-update', (sessionId: string, status: Record<string, unknown>) => {
|
||||
@@ -205,7 +246,7 @@ export function setupSessionManager(): void {
|
||||
|
||||
adapter.on('session-error', (sessionId: string, data: { errorType?: string; errorDetails?: string; [key: string]: unknown }) => {
|
||||
broadcast(sessionId, { type: WS.SESSION_ERROR, ...data });
|
||||
triggerPush(adapter, sessionId, {
|
||||
queuePush(adapter, sessionId, {
|
||||
title: 'Session Error',
|
||||
body: data.errorType === 'rate_limit' ? 'Rate limited' : (data.errorDetails || data.errorType || 'Unknown error'),
|
||||
tagPrefix: 'error',
|
||||
@@ -359,6 +400,21 @@ export async function handleIncomingMessage(conn: ClientConnection, msg: ClientM
|
||||
selectedOption: msg.selectedOption as string | undefined,
|
||||
textValue: msg.textValue as string | undefined,
|
||||
});
|
||||
case WS.PAGE_VISIBILITY: {
|
||||
conn.pageVisible = !!(msg as any).visible;
|
||||
return;
|
||||
}
|
||||
case WS.APP_PONG: {
|
||||
const sid = (msg as any).sessionId as string | undefined;
|
||||
if (sid) {
|
||||
const timer = pendingPushes.get(sid);
|
||||
if (timer) {
|
||||
clearTimeout(timer);
|
||||
pendingPushes.delete(sid);
|
||||
}
|
||||
}
|
||||
return;
|
||||
}
|
||||
default:
|
||||
conn.send({ type: 'error', error: `Unknown message type: ${msg.type}` });
|
||||
}
|
||||
|
||||
@@ -12,6 +12,8 @@ export abstract class ClientConnection {
|
||||
readonly transportName: string;
|
||||
sessionId: string | null = null;
|
||||
onDisconnect: ((conn: ClientConnection) => void) | null = null;
|
||||
/** True while the client tab/app is in the foreground. Starts true (assume visible until told otherwise). */
|
||||
pageVisible: boolean = true;
|
||||
|
||||
constructor(transportName: string) {
|
||||
this.transportName = transportName;
|
||||
|
||||
@@ -18,6 +18,9 @@ import type { ClientMessage } from '../types/messages.js';
|
||||
export class WebSocketTransport extends EventEmitter {
|
||||
private wss: WebSocketServer | null = null;
|
||||
private pingInterval: ReturnType<typeof setInterval> | null = null;
|
||||
// Tracks whether each WS responded to the last ping. Initialized true so a
|
||||
// connection is never terminated before it has a chance to respond.
|
||||
private alive = new WeakMap<WebSocket, boolean>();
|
||||
|
||||
/** Create WebSocketServer on /ws path with JWT verification and ping/pong keepalive. */
|
||||
setup(server: HttpServer | HttpsServer): void {
|
||||
@@ -36,6 +39,9 @@ export class WebSocketTransport extends EventEmitter {
|
||||
});
|
||||
|
||||
this.wss.on('connection', (ws: WebSocket) => {
|
||||
this.alive.set(ws, true);
|
||||
ws.on('pong', () => this.alive.set(ws, true));
|
||||
|
||||
const conn = new WebSocketConnection(ws);
|
||||
|
||||
this.emit('connection', conn);
|
||||
@@ -52,13 +58,16 @@ export class WebSocketTransport extends EventEmitter {
|
||||
});
|
||||
});
|
||||
|
||||
// Ping/pong keepalive every 30s
|
||||
// Ping/pong keepalive every 30s — terminate connections that miss a pong.
|
||||
this.pingInterval = setInterval(() => {
|
||||
if (!this.wss) return;
|
||||
for (const ws of this.wss.clients) {
|
||||
if (ws.readyState === 1) {
|
||||
ws.ping();
|
||||
if (!this.alive.get(ws)) {
|
||||
ws.terminate(); // no pong since last ping — dead connection
|
||||
continue;
|
||||
}
|
||||
this.alive.set(ws, false);
|
||||
if (ws.readyState === 1) ws.ping();
|
||||
}
|
||||
}, 30_000);
|
||||
this.pingInterval.unref();
|
||||
|
||||
@@ -14,7 +14,8 @@ export interface ServerMessage {
|
||||
|
||||
export type ClientMessageType =
|
||||
| 'query' | 'permission-response' | 'ask-response' | 'abort'
|
||||
| 'reconnect' | 'set-permission-mode' | 'plan-response';
|
||||
| 'reconnect' | 'set-permission-mode' | 'plan-response'
|
||||
| 'page-visibility' | 'app-pong';
|
||||
|
||||
export interface ClientMessage {
|
||||
type: ClientMessageType;
|
||||
|
||||
@@ -8,6 +8,8 @@ export const WS = {
|
||||
SET_PERMISSION_MODE: 'set-permission-mode',
|
||||
SET_MODEL: 'set-model',
|
||||
PLAN_RESPONSE: 'plan-response',
|
||||
PAGE_VISIBILITY: 'page-visibility',
|
||||
APP_PONG: 'app-pong',
|
||||
// Server → Client
|
||||
SESSION_STATE: 'session-state',
|
||||
SESSION_CREATED: 'session-created',
|
||||
@@ -39,6 +41,8 @@ export const WS = {
|
||||
REVIEW_ENDED: 'review-ended',
|
||||
// Task Progress
|
||||
TASK_STATE: 'task-state',
|
||||
// Push notification coordination
|
||||
APP_PING: 'app-ping',
|
||||
} as const;
|
||||
|
||||
export type WsType = typeof WS[keyof typeof WS];
|
||||
|
||||
Reference in New Issue
Block a user