Real-time Architecture Home
Internals

Real-time Architecture

How a status change in the Node.js daemon reaches the browser in under 50 ms. Slop runs workers autonomously for minutes at a time - without live updates the board would be a frozen snapshot and the worker detail page would show nothing.

Event Bus SSE Stream useWorkerEvents Summarizer No polling

Four Interlocking Systems

Real-time in Slop is built from four components that each do exactly one job. Together they form a pipeline from daemon action to React re-render with no polling, no WebSocket server, and no external message broker.

๐Ÿ“ก
Event Bus
src/server/events/bus.ts

In-process pub-sub singleton. Daemon and worker components publish events; the SSE stream subscribes. Also writes every event to SQLite for persistence.

๐ŸŒŠ
SSE Stream
src/app/api/events/

A GET /api/events route that holds an HTTP connection open and forwards every bus event to the browser as a standard Server-Sent Events data frame.

โšก
useWorkerEvents
src/app/_components/use-worker-events.ts

React hook consumed by board columns and the worker detail page. Connects to the SSE endpoint, parses events, and applies them to local worker state.

๐Ÿ”
Summarizer
src/server/runs/summarize.ts

Converts raw agent SDK messages into compact, human-readable log lines. Tracks tool IDs across the stream so tool results can be attributed to the correct tool.

The Event Bus

The event bus is a lightweight in-process pub-sub built on a Set<Subscriber>. It is created once at daemon boot, registered on globalThis under a well-known Symbol.for("slop.eventBus") key, and injected into every component that needs to publish or subscribe via the DaemonDeps object. Nothing imports the singleton directly; everything receives it as a constructor argument so tests can swap in a fake.

Factory and registration

// Created once at daemon boot (src/server/daemon/boot.ts)
const bus = createEventBus({ eventRepo });
setEventBus(bus);        // registers on globalThis

// The SSE route retrieves it at request time
const bus = getEventBus(); // throws if not initialized

What publish() does

Calling bus.publish(event) triggers two actions in sequence:

  1. Persist to SQLite. If the event carries a workerId or runId, it is written to the Event table via eventRepo.appendEvent. The human-readable message comes from formatEvent in src/lib/event-format.ts, which is the single source of truth for event-to-string conversion.
  2. Fan out to subscribers. Every registered in-memory subscriber (including the SSE stream) receives the event synchronously. If a subscriber throws, the error is logged and the remaining subscribers still receive the event.
Why globalThis? Next.js runs the App Router and the daemon in the same Node.js process but potentially across different module instances due to hot reload. A Symbol.for() key on globalThis survives module reloads and ensures all callers share the same singleton bus instance.

Event type reference

Every event is typed by the BusEvent discriminated union in src/types/bus-event.ts.

TypeCategoryWhen publishedKey payload fields
worker.claimed Worker Daemon tick claims a ready issue workerId, repoId, issueNumber, issueTitle, status
worker.state_changed Worker Worker transitions between statuses workerId, from, to, prNumber (optional)
worker.event_logged Worker Summarizer emits a log line during a run workerId, message, level, rawContent
worker.failed Worker Worker exits with an unrecoverable error workerId, error
worker.completed Worker Worker successfully merges its PR workerId, prNumber
worker.github_state_changed Worker Snapshot poll detects a GitHub state change workerId, issueState, prState
run.started Run On-demand skill invocation begins runId
run.event_logged Run Skill emits a log line runId, message, level
run.failed Run Skill invocation fails runId, error
run.completed Run Skill invocation finishes successfully runId
repo.updated Repo Snapshot poll finishes a fetch cycle repo, fetchedAt
repo.health_changed Repo CI health controller updates status repo, status (red/green/pending)
resource.sampled Resource Worker resource poll samples RSS and CPU app, workers[] (rssBytes, cpuPct per worker)
batch.created Batch A new issue batch is created batchId, repoId
batch.issue_added Batch An issue is added to a batch batchId, issueNumber
Testing. Tests never use the real singleton. They call createEventBus({ eventRepo }) directly and pass the result into the component under test. resetEventBusForTests() nulls out the global so a misbehaving test cannot leak bus state to the next one.

SSE Stream

Server-Sent Events (SSE) is a simple HTTP protocol where the server holds the connection open and pushes newline-delimited frames. The browser handles reconnection automatically via the EventSource API - no WebSocket handshake, no custom framing protocol.

Endpoint

GET /api/events
Response headers:
Content-Type:  text/event-stream
Cache-Control: no-cache
Connection:    keep-alive

The route handler in src/app/api/events/route.ts is marked export const dynamic = "force-dynamic". This prevents Next.js from ever trying to statically render or cache it. The handler calls createEventStream(getEventBus(), { signal: request.signal }) and returns the ReadableStream directly as the response body.

Event wire format

Each bus event is encoded as a standard SSE data frame:

// A state transition event on the wire
data: {"type":"worker.state_changed","workerId":"wk_abc123","from":"implementing","to":"merging","at":"2026-06-13T14:22:00.000Z"}
                                                                                                      ^^
                                                                                              double newline terminates frame

// Heartbeat comment (every 25 seconds)
: ping

The double newline (\n\n) at the end of each frame is the SSE protocol delimiter. The browser's EventSource parser splits on it automatically. The heartbeat comment (lines beginning with :) is ignored by the browser but keeps proxies and load balancers from treating a silent connection as dead.

Stream lifecycle

The createEventStream function returns a ReadableStream<Uint8Array>. Inside its start() callback it:

  1. Subscribes to the event bus. Every published event is immediately encoded and enqueued to the stream.
  2. Starts a setInterval that enqueues a : ping\n\n comment every 25 seconds.
  3. Attaches an abort listener to request.signal. When the browser disconnects, the signal fires, cleanup() runs: the bus subscription is removed, the interval is cleared.
No replay on connect. The current implementation does not replay recent persisted events when a new client connects. The hook handles this by fetching a fresh snapshot from /api/workers on every successful (re)connect. Persisted events in the Event table are available for the worker detail log panel, which loads them directly from the server on mount.

Typed event shape

The browser hook validates incoming data with isBusEvent() before processing it. This function checks the type field against the full BUS_EVENT_TYPES set, so malformed or future-unknown event types are silently discarded rather than crashing the state update.

// src/types/bus-event.ts
export function isBusEvent(value: unknown): value is BusEvent {
  if (typeof value !== "object" || value === null) return false;
  const type = (value as Record<string, unknown>).type;
  return typeof type === "string" && BUS_EVENT_TYPES.has(type as BusEvent["type"]);
}

useWorkerEvents Hook

This is the single consumer of the SSE stream in the browser. All board columns and the worker detail page call it. It owns the live worker list for the duration of the component's mount.

Signature

function useWorkerEvents(
  initialWorkers: WorkerRow[],           // server-rendered snapshot from page props
  initialResources: ResourceSnapshot | null,
  repoId: string | null,
): UseWorkerEventsResult

// Returns:
type UseWorkerEventsResult = {
  workers:   WorkerRow[];           // live-updating worker list
  resources: ResourceSnapshot | null; // latest RSS/CPU sample
  connected: boolean;               // false while reconnecting
  refresh:   () => Promise<void>;   // force a full snapshot fetch
  subscribe: (cb: LiveEventSubscriber) => () => void; // raw event tap
};

What SSE drives vs. what stays in initialWorkers

This is the most important design decision in the hook. Not all worker fields arrive via SSE. Some are fetched once at page load and remain stable; others are updated continuously via events.

Updated via SSE
  • status - updated on every worker.state_changed, worker.failed, worker.completed
  • statusBeforePause - captured when transitioning to paused
  • lastError - set when a worker.failed event arrives
  • New worker rows - worker.claimed prepends a fresh row to the list
  • Resource metrics - resource.sampled updates the RSS/CPU snapshot
Server prop (initialWorkers)
  • prNumber, prUrl - stable once a PR is opened; refreshed by snapshot
  • ciStatus - lives in the database, not emitted as a bus event
  • costUsd, netTokens - computed from WorkerReport rows after a run
  • issueTitle, issueUrl and all other issue metadata
  • waitingMergeMs - accumulated in DB, fetched via snapshot on specific transitions

The rationale: SSE events carry the minimum data needed for live status display. Loading all PR and CI fields into bus events would bloat every worker.state_changed message and duplicate data already correctly served by the REST snapshot. On certain transitions that are known to have updated cost or timing data - such as waiting_merge to merging - the hook automatically re-fetches the full snapshot from /api/workers.

The applyEvent reducer

State is updated by a pure function exported for testing. The board never mutates worker rows directly.

// src/app/_components/use-worker-events.ts
export function applyEvent(state: WorkerRow[], event: BusEvent): WorkerRow[] {
  switch (event.type) {
    case "worker.claimed":
      // prepend a new row if not already present (idempotent)
      if (state.some((w) => w.id === event.workerId)) return state;
      return [buildRow(event), ...state];
    case "worker.state_changed":
      return state.map((w) =>
        w.id !== event.workerId ? w : {
          ...w,
          status: event.to,
          ...(event.to === "paused" ? { statusBeforePause: event.from } : {}),
        }
      );
    // ... worker.failed, worker.completed, worker.github_state_changed ...
    default:
      return state; // repo, batch, resource events do not affect worker rows
  }
}

Reconnection with exponential backoff

const RECONNECT_INITIAL_MS = 1_000;
const RECONNECT_MAX_MS     = 30_000;

// On onerror:
source.close();
setConnected(false);
scheduleReconnect(); // delay doubles each attempt, caps at 30 s

// On onopen:
reconnectDelay.current = RECONNECT_INITIAL_MS; // reset
setConnected(true);
void fetchSnapshot();  // re-sync missed events

The connected boolean lets the UI show a subtle indicator when disconnected. The reconnection itself is invisible to the user - the snapshot fetch on reconnect catches up any state changes that arrived while the connection was down.

No timestamp-based keys. The hook uses a useRef counter (not Date.now()) when assigning React keys to newly appended live log entries. Events can arrive within the same millisecond during rapid state transitions, so timestamps produce duplicate keys and React renders incorrectly. A monotonically incrementing counter is always unique.

Event Summarization

The Claude agent SDK emits a stream of structured SDKMessage objects: assistant turns with content blocks, tool call frames, tool results, system init messages, and streaming deltas. These objects are not suitable for display. The summarizer converts them into compact, human-readable log lines that appear on the worker detail page.

Factory: one summarizer per session

// src/server/runs/summarize.ts
export function createEventSummarizer(): (event: DriverEvent) => SummarizedLine | null {
  const toolNames = new Map<string, string>(); // tool_use_id -> tool_name
  return (event) => {
    if (event.type === "message" && event.payload.type === "assistant") {
      extractToolNames(event.payload.message?.content, toolNames);
    }
    return summarizeEvent(event, toolNames);
  };
}

Create one instance per driver session and never share it across sessions. The toolNames map is session-scoped: tool_use_id values are only unique within a single agent session, not globally.

Why tool attribution matters

Tool results arrive in a separate SDK message from the tool call itself, identified only by tool_use_id. Without the map, a tool_result event cannot know which tool produced it. This matters because the summarizer intentionally drops most tool results as noise - a Read or Grep result is usually a file dump that adds nothing to the log. Signal is extracted only from Bash results (test tallies, exit codes). Without tool attribution, a file that happens to contain the text "54 errors" would be logged as a phantom failure.

// Only Bash output carries test/build tallies worth a log line.
// A Read/Grep file dump matching COUNT_RE is a false positive, so skip it.
const toolName = toolNames.get(b.tool_use_id);
if (toolName !== undefined && toolName !== "Bash") return null;

Noise filtering

The summarizer returns null for events that carry no actionable information:

  • Streaming deltas and status pings (emitted frequently by the SDK; never complete)
  • Empty text blocks in assistant messages
  • Read, Grep, and other non-Bash tool results without test/build signal
  • system messages with subtypes other than init
Examples: in (SDK event type) vs. out (log line)
assistant text block: "I'll start by reading the migration file"
assistant: I'll start by reading the migration file

tool_use: Bash("pnpm test")
Bash(pnpm test)

tool_result for Bash: "54 passed, 0 failed"
-> 54 passed

tool_result for Read: (file contents with "54 error" inside)
[dropped - not a Bash result]

TodoWrite({ todos: [...] })
tasks: โœ“ Write migration ยท โ–ถ Update types ยท โ˜ Add tests

streaming delta, status ping
[dropped - no complete information]

SummarizedLine shape

type SummarizedLine = {
  message: string;    // human-readable, max 200 chars (condensed)
  level:   LogLevel;  // "info" | "warn" | "error"
};

Each SummarizedLine is published as a worker.event_logged or run.event_logged bus event, which is then persisted to the Event table and forwarded to all connected browsers via SSE.

Complete Event Flow

Here is the exact path a status change takes from the daemon to the browser. On localhost with no network round trip, total latency from state change to visible re-render is typically under 50 ms.

Daemon / Worker (src/server/workers/worker.ts) | | transition("implementing", "merging") calls bus.publish(event) v Event Bus (src/server/events/bus.ts) | +----[1. persist]----> SQLite Event table (for log panel replay) | +----[2. fan-out]---> SSE stream subscriber (src/app/api/events/stream.ts) | | encoder.encode(`data: ${JSON.stringify(event)}\n\n`) v ReadableStream (HTTP chunked transfer) | | HTTP/1.1 keep-alive connection v Browser EventSource (/api/events) | | source.onmessage fires | isBusEvent(event) validates type | applyEvent(state, event) updates list v useWorkerEvents hook | | setWorkers(newState) triggers re-render v React state -> board card moves to Merging column

Step-by-step: implementing to merging

1

Worker calls transition()

The worker in src/server/workers/worker.ts calls its internal transition("implementing", "merging") helper, which in turn calls bus.publish({ type: "worker.state_changed", from: "implementing", to: "merging", workerId: "wk_abc", at: new Date() }).

2

Bus persists to SQLite

The bus calls formatEvent(event) to get the human-readable string "state implementing -> merging" and writes a row to the Event table. This is what the worker detail log panel reads when it first loads.

3

Bus fans out to SSE subscriber

The SSE stream's subscription callback fires. It encodes data: {...}\n\n and enqueues the bytes to the ReadableStream.

4

HTTP sends the chunk

Node.js flushes the enqueued bytes over the open HTTP connection. Because SSE uses chunked transfer encoding, the browser receives the frame as soon as it is written - no buffering.

5

Browser EventSource fires onmessage

The browser's EventSource parser receives the frame, splits on \n\n, and calls source.onmessage with msg.data set to the JSON string.

6

Hook validates and applies

JSON.parse(msg.data) produces the event object. isBusEvent(event) confirms it is a known type. setWorkers((current) => applyEvent(current, event)) maps over the current worker list and returns a new array with the matched row's status set to "merging".

7

React re-renders

The new state triggers a re-render of every component subscribed to the workers array. The board card for this issue moves from the In Progress column to the Merging column. No page reload, no polling, no manual refresh.

Mermaid diagram. The following diagram shows the same flow in a more visual format.
flowchart LR subgraph Server["Node.js Process"] W["Worker\nsrc/server/workers/"] -->|"bus.publish(event)"| B["Event Bus\nsrc/server/events/bus.ts"] B -->|"appendEvent()"| DB[("SQLite\nEvent table")] B -->|"handler(event)"| SSE["SSE Stream\nsrc/app/api/events/stream.ts"] end subgraph Browser ES["EventSource\n/api/events"] -->|"onmessage"| H["useWorkerEvents\nhook"] H -->|"setWorkers()"| UI["React State\nBoard re-render"] end SSE -->|"HTTP keep-alive\ndata: JSON"| ES style B fill:#2d1f5e,stroke:#a78bfa,color:#e2e4f0 style SSE fill:#1a2a3e,stroke:#4ea8de,color:#e2e4f0 style H fill:#3e2010,stroke:#f5924e,color:#e2e4f0 style DB fill:#1e2a1a,stroke:#2dd4aa,color:#e2e4f0