Real-time Architecture
How a status change in the Node.js daemon reaches the browser in under 50 ms. Slop runs workers autonomously for minutes at a time - without live updates the board would be a frozen snapshot and the worker detail page would show nothing.
Four Interlocking Systems
Real-time in Slop is built from four components that each do exactly one job. Together they form a pipeline from daemon action to React re-render with no polling, no WebSocket server, and no external message broker.
In-process pub-sub singleton. Daemon and worker components publish events; the SSE stream subscribes. Also writes every event to SQLite for persistence.
A GET /api/events route that holds an HTTP connection open and forwards every bus event to the browser as a standard Server-Sent Events data frame.
React hook consumed by board columns and the worker detail page. Connects to the SSE endpoint, parses events, and applies them to local worker state.
Converts raw agent SDK messages into compact, human-readable log lines. Tracks tool IDs across the stream so tool results can be attributed to the correct tool.
The Event Bus
The event bus is a lightweight in-process pub-sub built on a Set<Subscriber>. It is created once at daemon boot, registered on globalThis under a well-known Symbol.for("slop.eventBus") key, and injected into every component that needs to publish or subscribe via the DaemonDeps object. Nothing imports the singleton directly; everything receives it as a constructor argument so tests can swap in a fake.
Factory and registration
// Created once at daemon boot (src/server/daemon/boot.ts) const bus = createEventBus({ eventRepo }); setEventBus(bus); // registers on globalThis // The SSE route retrieves it at request time const bus = getEventBus(); // throws if not initialized
What publish() does
Calling bus.publish(event) triggers two actions in sequence:
- Persist to SQLite. If the event carries a
workerIdorrunId, it is written to theEventtable viaeventRepo.appendEvent. The human-readable message comes fromformatEventinsrc/lib/event-format.ts, which is the single source of truth for event-to-string conversion. - Fan out to subscribers. Every registered in-memory subscriber (including the SSE stream) receives the event synchronously. If a subscriber throws, the error is logged and the remaining subscribers still receive the event.
Symbol.for() key on globalThis survives module reloads and ensures all callers share the same singleton bus instance.
Event type reference
Every event is typed by the BusEvent discriminated union in src/types/bus-event.ts.
| Type | Category | When published | Key payload fields |
|---|---|---|---|
worker.claimed |
Worker | Daemon tick claims a ready issue | workerId, repoId, issueNumber, issueTitle, status |
worker.state_changed |
Worker | Worker transitions between statuses | workerId, from, to, prNumber (optional) |
worker.event_logged |
Worker | Summarizer emits a log line during a run | workerId, message, level, rawContent |
worker.failed |
Worker | Worker exits with an unrecoverable error | workerId, error |
worker.completed |
Worker | Worker successfully merges its PR | workerId, prNumber |
worker.github_state_changed |
Worker | Snapshot poll detects a GitHub state change | workerId, issueState, prState |
run.started |
Run | On-demand skill invocation begins | runId |
run.event_logged |
Run | Skill emits a log line | runId, message, level |
run.failed |
Run | Skill invocation fails | runId, error |
run.completed |
Run | Skill invocation finishes successfully | runId |
repo.updated |
Repo | Snapshot poll finishes a fetch cycle | repo, fetchedAt |
repo.health_changed |
Repo | CI health controller updates status | repo, status (red/green/pending) |
resource.sampled |
Resource | Worker resource poll samples RSS and CPU | app, workers[] (rssBytes, cpuPct per worker) |
batch.created |
Batch | A new issue batch is created | batchId, repoId |
batch.issue_added |
Batch | An issue is added to a batch | batchId, issueNumber |
createEventBus({ eventRepo }) directly and pass the result into the component under test. resetEventBusForTests() nulls out the global so a misbehaving test cannot leak bus state to the next one.
SSE Stream
Server-Sent Events (SSE) is a simple HTTP protocol where the server holds the connection open and pushes newline-delimited frames. The browser handles reconnection automatically via the EventSource API - no WebSocket handshake, no custom framing protocol.
Endpoint
GET /api/events Response headers: Content-Type: text/event-stream Cache-Control: no-cache Connection: keep-alive
The route handler in src/app/api/events/route.ts is marked export const dynamic = "force-dynamic". This prevents Next.js from ever trying to statically render or cache it. The handler calls createEventStream(getEventBus(), { signal: request.signal }) and returns the ReadableStream directly as the response body.
Event wire format
Each bus event is encoded as a standard SSE data frame:
// A state transition event on the wire data: {"type":"worker.state_changed","workerId":"wk_abc123","from":"implementing","to":"merging","at":"2026-06-13T14:22:00.000Z"} ^^ double newline terminates frame // Heartbeat comment (every 25 seconds) : ping
The double newline (\n\n) at the end of each frame is the SSE protocol delimiter. The browser's EventSource parser splits on it automatically. The heartbeat comment (lines beginning with :) is ignored by the browser but keeps proxies and load balancers from treating a silent connection as dead.
Stream lifecycle
The createEventStream function returns a ReadableStream<Uint8Array>. Inside its start() callback it:
- Subscribes to the event bus. Every published event is immediately encoded and enqueued to the stream.
- Starts a
setIntervalthat enqueues a: ping\n\ncomment every 25 seconds. - Attaches an
abortlistener torequest.signal. When the browser disconnects, the signal fires,cleanup()runs: the bus subscription is removed, the interval is cleared.
/api/workers on every successful (re)connect. Persisted events in the Event table are available for the worker detail log panel, which loads them directly from the server on mount.
Typed event shape
The browser hook validates incoming data with isBusEvent() before processing it. This function checks the type field against the full BUS_EVENT_TYPES set, so malformed or future-unknown event types are silently discarded rather than crashing the state update.
// src/types/bus-event.ts export function isBusEvent(value: unknown): value is BusEvent { if (typeof value !== "object" || value === null) return false; const type = (value as Record<string, unknown>).type; return typeof type === "string" && BUS_EVENT_TYPES.has(type as BusEvent["type"]); }
useWorkerEvents Hook
This is the single consumer of the SSE stream in the browser. All board columns and the worker detail page call it. It owns the live worker list for the duration of the component's mount.
Signature
function useWorkerEvents( initialWorkers: WorkerRow[], // server-rendered snapshot from page props initialResources: ResourceSnapshot | null, repoId: string | null, ): UseWorkerEventsResult // Returns: type UseWorkerEventsResult = { workers: WorkerRow[]; // live-updating worker list resources: ResourceSnapshot | null; // latest RSS/CPU sample connected: boolean; // false while reconnecting refresh: () => Promise<void>; // force a full snapshot fetch subscribe: (cb: LiveEventSubscriber) => () => void; // raw event tap };
What SSE drives vs. what stays in initialWorkers
This is the most important design decision in the hook. Not all worker fields arrive via SSE. Some are fetched once at page load and remain stable; others are updated continuously via events.
status- updated on everyworker.state_changed,worker.failed,worker.completedstatusBeforePause- captured when transitioning topausedlastError- set when aworker.failedevent arrives- New worker rows -
worker.claimedprepends a fresh row to the list - Resource metrics -
resource.sampledupdates the RSS/CPU snapshot
prNumber,prUrl- stable once a PR is opened; refreshed by snapshotciStatus- lives in the database, not emitted as a bus eventcostUsd,netTokens- computed from WorkerReport rows after a runissueTitle,issueUrland all other issue metadatawaitingMergeMs- accumulated in DB, fetched via snapshot on specific transitions
The rationale: SSE events carry the minimum data needed for live status display. Loading all PR and CI fields into bus events would bloat every worker.state_changed message and duplicate data already correctly served by the REST snapshot. On certain transitions that are known to have updated cost or timing data - such as waiting_merge to merging - the hook automatically re-fetches the full snapshot from /api/workers.
The applyEvent reducer
State is updated by a pure function exported for testing. The board never mutates worker rows directly.
// src/app/_components/use-worker-events.ts export function applyEvent(state: WorkerRow[], event: BusEvent): WorkerRow[] { switch (event.type) { case "worker.claimed": // prepend a new row if not already present (idempotent) if (state.some((w) => w.id === event.workerId)) return state; return [buildRow(event), ...state]; case "worker.state_changed": return state.map((w) => w.id !== event.workerId ? w : { ...w, status: event.to, ...(event.to === "paused" ? { statusBeforePause: event.from } : {}), } ); // ... worker.failed, worker.completed, worker.github_state_changed ... default: return state; // repo, batch, resource events do not affect worker rows } }
Reconnection with exponential backoff
const RECONNECT_INITIAL_MS = 1_000; const RECONNECT_MAX_MS = 30_000; // On onerror: source.close(); setConnected(false); scheduleReconnect(); // delay doubles each attempt, caps at 30 s // On onopen: reconnectDelay.current = RECONNECT_INITIAL_MS; // reset setConnected(true); void fetchSnapshot(); // re-sync missed events
The connected boolean lets the UI show a subtle indicator when disconnected. The reconnection itself is invisible to the user - the snapshot fetch on reconnect catches up any state changes that arrived while the connection was down.
useRef counter (not Date.now()) when assigning React keys to newly appended live log entries. Events can arrive within the same millisecond during rapid state transitions, so timestamps produce duplicate keys and React renders incorrectly. A monotonically incrementing counter is always unique.
Event Summarization
The Claude agent SDK emits a stream of structured SDKMessage objects: assistant turns with content blocks, tool call frames, tool results, system init messages, and streaming deltas. These objects are not suitable for display. The summarizer converts them into compact, human-readable log lines that appear on the worker detail page.
Factory: one summarizer per session
// src/server/runs/summarize.ts export function createEventSummarizer(): (event: DriverEvent) => SummarizedLine | null { const toolNames = new Map<string, string>(); // tool_use_id -> tool_name return (event) => { if (event.type === "message" && event.payload.type === "assistant") { extractToolNames(event.payload.message?.content, toolNames); } return summarizeEvent(event, toolNames); }; }
Create one instance per driver session and never share it across sessions. The toolNames map is session-scoped: tool_use_id values are only unique within a single agent session, not globally.
Why tool attribution matters
Tool results arrive in a separate SDK message from the tool call itself, identified only by tool_use_id. Without the map, a tool_result event cannot know which tool produced it. This matters because the summarizer intentionally drops most tool results as noise - a Read or Grep result is usually a file dump that adds nothing to the log. Signal is extracted only from Bash results (test tallies, exit codes). Without tool attribution, a file that happens to contain the text "54 errors" would be logged as a phantom failure.
// Only Bash output carries test/build tallies worth a log line. // A Read/Grep file dump matching COUNT_RE is a false positive, so skip it. const toolName = toolNames.get(b.tool_use_id); if (toolName !== undefined && toolName !== "Bash") return null;
Noise filtering
The summarizer returns null for events that carry no actionable information:
- Streaming deltas and status pings (emitted frequently by the SDK; never complete)
- Empty text blocks in assistant messages
Read,Grep, and other non-Bash tool results without test/build signalsystemmessages with subtypes other thaninit
SummarizedLine shape
type SummarizedLine = { message: string; // human-readable, max 200 chars (condensed) level: LogLevel; // "info" | "warn" | "error" };
Each SummarizedLine is published as a worker.event_logged or run.event_logged bus event, which is then persisted to the Event table and forwarded to all connected browsers via SSE.
Complete Event Flow
Here is the exact path a status change takes from the daemon to the browser. On localhost with no network round trip, total latency from state change to visible re-render is typically under 50 ms.
Step-by-step: implementing to merging
Worker calls transition()
The worker in src/server/workers/worker.ts calls its internal transition("implementing", "merging") helper, which in turn calls bus.publish({ type: "worker.state_changed", from: "implementing", to: "merging", workerId: "wk_abc", at: new Date() }).
Bus persists to SQLite
The bus calls formatEvent(event) to get the human-readable string "state implementing -> merging" and writes a row to the Event table. This is what the worker detail log panel reads when it first loads.
Bus fans out to SSE subscriber
The SSE stream's subscription callback fires. It encodes data: {...}\n\n and enqueues the bytes to the ReadableStream.
HTTP sends the chunk
Node.js flushes the enqueued bytes over the open HTTP connection. Because SSE uses chunked transfer encoding, the browser receives the frame as soon as it is written - no buffering.
Browser EventSource fires onmessage
The browser's EventSource parser receives the frame, splits on \n\n, and calls source.onmessage with msg.data set to the JSON string.
Hook validates and applies
JSON.parse(msg.data) produces the event object. isBusEvent(event) confirms it is a known type. setWorkers((current) => applyEvent(current, event)) maps over the current worker list and returns a new array with the matched row's status set to "merging".
React re-renders
The new state triggers a re-render of every component subscribed to the workers array. The board card for this issue moves from the In Progress column to the Merging column. No page reload, no polling, no manual refresh.