feat(tasks): aggregated task progress FAB + bottom sheet
- Add TaskAggregator (server/stores) to unify TaskCreate/TaskUpdate/TodoWrite - Broadcast task-state snapshots via new WS event on tool events + reconnect - TaskFab: SVG progress ring with fade-out on completion, reappears on new tasks - TaskBottomSheet: full task list with dependencies, activeForm, expandable description - Remove inline TodoWrite rendering (TaskProgress), filter task tools from chat flow - Rebuild task state from JSONL history on server restart/reconnect Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
@@ -7,6 +7,7 @@ import { basename } from 'path';
|
||||
import type { ClientConnection } from './transport/client-connection.js';
|
||||
import { sessionReviews, sessionAdapters } from './db.js';
|
||||
import { parseAskQuestionInput } from './adapters/shared/ask-question-utils.js';
|
||||
import { TaskAggregator, TASK_TOOL_NAMES, TASK_TOOLS_ON_START } from './stores/task-aggregator.js';
|
||||
|
||||
/** Push notification options */
|
||||
interface PushOptions {
|
||||
@@ -53,6 +54,22 @@ const sessionAdapterMap = new Map<string, string>(); // sessionId -
|
||||
// under the old key. This alias map resolves old → new so late-connecting clients
|
||||
// find the correct session.
|
||||
const rekeyAliases = new Map<string, string>(); // oldKey -> newKey
|
||||
const sessionTaskState = new Map<string, TaskAggregator>(); // sessionId -> task aggregator
|
||||
|
||||
function getOrCreateAggregator(sessionId: string): TaskAggregator {
|
||||
let agg = sessionTaskState.get(sessionId);
|
||||
if (!agg) {
|
||||
agg = new TaskAggregator();
|
||||
sessionTaskState.set(sessionId, agg);
|
||||
}
|
||||
return agg;
|
||||
}
|
||||
|
||||
function broadcastTaskState(sessionId: string): void {
|
||||
const aggregator = sessionTaskState.get(sessionId);
|
||||
if (!aggregator?.hasTasks) return;
|
||||
broadcast(sessionId, { type: WS.TASK_STATE, ...aggregator.getSnapshot() });
|
||||
}
|
||||
|
||||
export function setupSessionManager(): void {
|
||||
const adapters = getAllAdapters();
|
||||
@@ -69,11 +86,24 @@ export function setupSessionManager(): void {
|
||||
adapter.on('tool-start', (sessionId: string, data: { toolName: string; [key: string]: unknown }) => {
|
||||
console.log(`[mgr] tool-start: ${data.toolName} for ${sessionId}`);
|
||||
broadcast(sessionId, { type: WS.TOOL_START, ...data });
|
||||
|
||||
// TaskUpdate/TodoWrite can be processed on start (input is sufficient).
|
||||
// TaskCreate is deferred to tool-done because we need the result text for the assigned ID.
|
||||
if (TASK_TOOLS_ON_START.has(data.toolName)) {
|
||||
getOrCreateAggregator(sessionId).processToolUse(data.toolName, (data.input as Record<string, unknown>) || {});
|
||||
broadcastTaskState(sessionId);
|
||||
}
|
||||
});
|
||||
|
||||
adapter.on('tool-done', (sessionId: string, data: { toolName: string; [key: string]: unknown }) => {
|
||||
adapter.on('tool-done', (sessionId: string, data: { toolName: string; result?: any; [key: string]: unknown }) => {
|
||||
console.log(`[mgr] tool-done: ${data.toolName} for ${sessionId}`);
|
||||
broadcast(sessionId, { type: WS.TOOL_DONE, ...data });
|
||||
|
||||
if (TASK_TOOL_NAMES.has(data.toolName)) {
|
||||
const resultText = typeof data.result?.content === 'string' ? data.result.content : '';
|
||||
getOrCreateAggregator(sessionId).processToolUse(data.toolName, (data.input as Record<string, unknown>) || {}, resultText);
|
||||
broadcastTaskState(sessionId);
|
||||
}
|
||||
});
|
||||
|
||||
adapter.on('new-messages', (sessionId: string, messages: Array<{ role: string; [key: string]: unknown }>) => {
|
||||
@@ -166,6 +196,7 @@ export function setupSessionManager(): void {
|
||||
// THEN clean up maps
|
||||
sessionClients.delete(sessionId);
|
||||
sessionAdapterMap.delete(sessionId);
|
||||
sessionTaskState.delete(sessionId);
|
||||
// Clean rekey alias pointing to this session
|
||||
for (const [oldKey, newKey] of rekeyAliases) {
|
||||
if (newKey === sessionId) rekeyAliases.delete(oldKey);
|
||||
@@ -214,6 +245,12 @@ export function setupSessionManager(): void {
|
||||
sessionAdapterMap.delete(oldKey);
|
||||
sessionAdapterMap.set(newKey, adapterName);
|
||||
}
|
||||
// Move task state
|
||||
const taskState = sessionTaskState.get(oldKey);
|
||||
if (taskState) {
|
||||
sessionTaskState.delete(oldKey);
|
||||
sessionTaskState.set(newKey, taskState);
|
||||
}
|
||||
// Update any active reviews that reference the old key as child (FIX 3)
|
||||
sessionReviews.updateChildCliId(oldKey, newKey);
|
||||
// Send SESSION_CREATED with the real UUID — for pendingRekey adapters,
|
||||
@@ -438,16 +475,38 @@ export async function handleReconnect(conn: ClientConnection, sessionId?: string
|
||||
// that would duplicate what HISTORY_LOAD delivers
|
||||
adapter.syncWatcherPosition(resolvedId);
|
||||
|
||||
// Send current messages from store (full history for reconnection)
|
||||
const isStreaming = adapter.isProcessing(resolvedId);
|
||||
let historyMessages: unknown[] = [];
|
||||
try {
|
||||
const { messages } = await adapter.getMessages(resolvedId);
|
||||
if (messages.length > 0) {
|
||||
send(conn, { type: WS.HISTORY_LOAD, messages });
|
||||
}
|
||||
({ messages: historyMessages } = await adapter.getMessages(resolvedId));
|
||||
} catch {}
|
||||
// Rebuild task state from history if not already cached (e.g. after server restart)
|
||||
if (!sessionTaskState.has(resolvedId) && historyMessages.length > 0) {
|
||||
const aggregator = new TaskAggregator();
|
||||
for (const msg of historyMessages as Array<{ role?: string; content?: any[] }>) {
|
||||
if (msg.role !== 'assistant' || !Array.isArray(msg.content)) continue;
|
||||
for (const block of msg.content) {
|
||||
if (block.type === 'tool_use' && TASK_TOOL_NAMES.has(block.name)) {
|
||||
const resultText = block._result?.content;
|
||||
aggregator.processToolUse(block.name, block.input || {}, typeof resultText === 'string' ? resultText : undefined);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (aggregator.hasTasks) {
|
||||
sessionTaskState.set(resolvedId, aggregator);
|
||||
}
|
||||
}
|
||||
|
||||
// Notify client if session is actively processing
|
||||
if (adapter.isProcessing(resolvedId)) {
|
||||
send(conn, { type: WS.HISTORY_LOAD, messages: historyMessages, streaming: isStreaming });
|
||||
|
||||
// Send accumulated task state if available
|
||||
const taskAgg = sessionTaskState.get(resolvedId);
|
||||
if (taskAgg?.hasTasks) {
|
||||
send(conn, { type: WS.TASK_STATE, ...taskAgg.getSnapshot() });
|
||||
}
|
||||
|
||||
// Fallback: client may receive broadcasts before HISTORY_LOAD during the async gap
|
||||
if (isStreaming) {
|
||||
send(conn, { type: WS.SESSION_STATE, streaming: true });
|
||||
}
|
||||
|
||||
|
||||
@@ -0,0 +1,124 @@
|
||||
/**
|
||||
* Pure aggregator: processes tool_use blocks (TaskCreate, TaskUpdate, TodoWrite)
|
||||
* and produces a unified AggregatedTask[] snapshot.
|
||||
*
|
||||
* Stateful — call processToolUse() for each relevant tool call in order.
|
||||
* Call getSnapshot() to read current state (cached, invalidated on changes).
|
||||
*/
|
||||
|
||||
export interface AggregatedTask {
|
||||
id: string;
|
||||
subject: string;
|
||||
description?: string;
|
||||
activeForm?: string;
|
||||
status: 'pending' | 'in_progress' | 'completed';
|
||||
blockedBy?: string[];
|
||||
blocks?: string[];
|
||||
source: 'task-api' | 'todo-write';
|
||||
}
|
||||
|
||||
export interface TaskSnapshot {
|
||||
tasks: AggregatedTask[];
|
||||
completed: number;
|
||||
total: number;
|
||||
}
|
||||
|
||||
/** Tool names that produce task state changes. */
|
||||
export const TASK_TOOL_NAMES = new Set(['TaskCreate', 'TaskUpdate', 'TodoWrite']);
|
||||
/** Subset handled on tool-start (TaskCreate needs result text, so it's deferred to tool-done). */
|
||||
export const TASK_TOOLS_ON_START = new Set(['TaskUpdate', 'TodoWrite']);
|
||||
|
||||
export class TaskAggregator {
|
||||
private taskApiTasks = new Map<string, AggregatedTask>();
|
||||
private todoWriteTasks = new Map<string, AggregatedTask>();
|
||||
private cachedSnapshot: TaskSnapshot | null = null;
|
||||
|
||||
/** Process a single tool_use block. Returns true if state changed. */
|
||||
processToolUse(toolName: string, input: Record<string, unknown>, resultText?: string): boolean {
|
||||
switch (toolName) {
|
||||
case 'TaskCreate': {
|
||||
const match = resultText?.match(/Task #(\d+)/);
|
||||
const id = match?.[1] || String(this.taskApiTasks.size + 1);
|
||||
this.taskApiTasks.set(id, {
|
||||
id,
|
||||
subject: (input.subject as string) || '',
|
||||
description: (input.description as string) || undefined,
|
||||
activeForm: (input.activeForm as string) || undefined,
|
||||
status: 'pending',
|
||||
source: 'task-api',
|
||||
});
|
||||
this.cachedSnapshot = null;
|
||||
return true;
|
||||
}
|
||||
|
||||
case 'TaskUpdate': {
|
||||
const taskId = input.taskId as string;
|
||||
if (!taskId) return false;
|
||||
const existing = this.taskApiTasks.get(taskId);
|
||||
if (!existing) return false;
|
||||
if (input.status === 'deleted') {
|
||||
this.taskApiTasks.delete(taskId);
|
||||
this.cachedSnapshot = null;
|
||||
return true;
|
||||
}
|
||||
const updated: AggregatedTask = { ...existing };
|
||||
if (input.status) updated.status = input.status as AggregatedTask['status'];
|
||||
if (input.subject) updated.subject = input.subject as string;
|
||||
if (input.description) updated.description = input.description as string;
|
||||
if (input.activeForm) updated.activeForm = input.activeForm as string;
|
||||
if (input.addBlockedBy) {
|
||||
updated.blockedBy = [...(updated.blockedBy || []), ...(input.addBlockedBy as string[])];
|
||||
}
|
||||
if (input.addBlocks) {
|
||||
updated.blocks = [...(updated.blocks || []), ...(input.addBlocks as string[])];
|
||||
}
|
||||
this.taskApiTasks.set(taskId, updated);
|
||||
this.cachedSnapshot = null;
|
||||
return true;
|
||||
}
|
||||
|
||||
case 'TodoWrite': {
|
||||
this.todoWriteTasks.clear();
|
||||
const tasks = (input.tasks || input.todos || []) as Array<{
|
||||
id: string;
|
||||
content: string;
|
||||
status: string;
|
||||
}>;
|
||||
for (const t of tasks) {
|
||||
this.todoWriteTasks.set(t.id, {
|
||||
id: t.id,
|
||||
subject: t.content || '',
|
||||
status: (t.status as AggregatedTask['status']) || 'pending',
|
||||
source: 'todo-write',
|
||||
});
|
||||
}
|
||||
this.cachedSnapshot = null;
|
||||
return true;
|
||||
}
|
||||
|
||||
default:
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
getSnapshot(): TaskSnapshot {
|
||||
if (this.cachedSnapshot) return this.cachedSnapshot;
|
||||
const tasks: AggregatedTask[] = [
|
||||
...this.taskApiTasks.values(),
|
||||
...this.todoWriteTasks.values(),
|
||||
];
|
||||
const completed = tasks.filter(t => t.status === 'completed').length;
|
||||
this.cachedSnapshot = { tasks, completed, total: tasks.length };
|
||||
return this.cachedSnapshot;
|
||||
}
|
||||
|
||||
get hasTasks(): boolean {
|
||||
return this.taskApiTasks.size > 0 || this.todoWriteTasks.size > 0;
|
||||
}
|
||||
|
||||
clear(): void {
|
||||
this.taskApiTasks.clear();
|
||||
this.todoWriteTasks.clear();
|
||||
this.cachedSnapshot = null;
|
||||
}
|
||||
}
|
||||
@@ -37,6 +37,8 @@ export const WS = {
|
||||
// Cross-AI Review
|
||||
REVIEW_STARTED: 'review-started',
|
||||
REVIEW_ENDED: 'review-ended',
|
||||
// Task Progress
|
||||
TASK_STATE: 'task-state',
|
||||
} as const;
|
||||
|
||||
export type WsType = typeof WS[keyof typeof WS];
|
||||
|
||||
Reference in New Issue
Block a user