Event Streaming

Overview

Event streaming enables real-time push notifications to connected WebSocket and Unix socket clients. When significant events occur (new messages), the daemon automatically pushes notifications to subscribed clients.

The WebSocket server and embedded SPA are served on the same port (default 9999, configurable via THRUM_WS_PORT). WebSocket connections use the /ws endpoint when the UI is active, or / when running without UI.

Architecture

Components

  1. Broadcaster (internal/daemon/broadcaster.go)

    • Unified notification sender for both Unix socket and WebSocket clients
    • Implements the ClientNotifier interface expected by the subscription dispatcher
    • Tries Unix socket transport first, then WebSocket as fallback
    • Handles client disconnections gracefully
    • Thread-safe with sync.RWMutex
  2. Subscription Dispatcher (internal/subscriptions/dispatcher.go)

    • Matches events against active subscriptions
    • Filters events based on scopes, mentions (role-based and name-based), and subscription types
    • Pushes notifications to matched subscribers via the Broadcaster
  3. Event Streaming Setup (internal/daemon/event_streaming.go)

    • Factory for wiring together Broadcaster and Dispatcher
    • Two convenience constructors:
      • NewEventStreamingSetup(unixClients, wsServer, db) - from raw components
      • NewEventStreamingSetupFromState(state, unixClients, wsServer) - from daemon state
    • Returns EventStreamingSetup struct with Broadcaster and Dispatcher fields

Data Flow

Event Source (message.send, message.edit)
  |
  v
Subscription Dispatcher
  | (query subscriptions, match against message scopes/mentions)
  v
Broadcaster
  |         |
  v         v
Unix Socket    WebSocket
Clients        Clients (port 9999, /ws endpoint)

Note: All WebSocket connections enforce a 10s handshake timeout. Server-side requests have a 10s per-request timeout (v0.4.3).

Implementation Details

Supported Notifications

Currently implemented:

Notification Format

Notifications use JSON-RPC 2.0 notification format (no id field, no response expected):

{
  "jsonrpc": "2.0",
  "method": "notification.message",
  "params": {
    "message_id": "msg_...",
    "author": {
      "agent_id": "furiosa",
      "name": "furiosa",
      "role": "implementer",
      "module": ""
    },
    "preview": "First 100 characters of content...",
    "scopes": [{ "type": "task", "value": "thrum-ukr" }],
    "matched_subscription": {
      "subscription_id": 1,
      "match_type": "scope"
    },
    "timestamp": "2026-02-03T10:00:00Z"
  }
}

Subscription Filtering

The dispatcher automatically filters events based on subscriptions:

Client Buffer Management

Both Unix socket and WebSocket connections use buffered I/O:

Usage

Daemon Initialization

When starting the daemon, create the event streaming infrastructure:

// Create daemon state
st, _ := state.NewState(thrumDir, syncDir, repoID)

// Create client registries
unixClients := daemon.NewClientRegistry()

// Create WebSocket server with handler registry and optional UI filesystem
wsServer := websocket.NewServer(wsAddr, wsRegistry, uiFS)

// Set up event streaming (wires Broadcaster + Dispatcher)
eventSetup := daemon.NewEventStreamingSetupFromState(st, unixClients, wsServer)

// Create message handler with the dispatcher for push notifications
messageHandler := rpc.NewMessageHandlerWithDispatcher(st, eventSetup.Dispatcher)

// Register handlers on both Unix socket and WebSocket registries...

Client Subscription

Clients subscribe via the subscribe RPC method:

{
  "jsonrpc": "2.0",
  "method": "subscribe",
  "params": {
    "scope": { "type": "task", "value": "thrum-ukr" }
  },
  "id": 1
}

Or subscribe to mentions:

{
  "jsonrpc": "2.0",
  "method": "subscribe",
  "params": {
    "mention_role": "reviewer"
  },
  "id": 1
}

Or subscribe to all messages (firehose):

{
  "jsonrpc": "2.0",
  "method": "subscribe",
  "params": {
    "all": true
  },
  "id": 1
}

Receiving Notifications

WebSocket clients receive notifications as JSON-RPC notifications (no response required):

// WebSocket client example
const ws = new WebSocket("ws://localhost:9999/ws");

ws.onmessage = (event) => {
  const notification = JSON.parse(event.data);
  if (notification.method === "notification.message") {
    console.log("New message:", notification.params.preview);
  } else if (notification.method === "notification.thread.updated") {
    console.log("Thread updated:", notification.params.thread_id);
  }
};

MCP Server Integration

The MCP server (thrum mcp serve) uses WebSocket notifications for its wait_for_message tool. It connects to the daemon's WebSocket endpoint and subscribes to notifications, enabling blocking message waits for agent sub-agents (like the message-listener pattern).

Testing

Comprehensive test coverage includes:

  1. Unit Tests (internal/daemon/broadcaster_test.go)

    • Broadcaster notification routing (WebSocket path)
    • Client not connected handling
    • Notification format conversion
  2. Integration Tests (internal/daemon/event_streaming_test.go)

    • End-to-end message notification flow with subscriptions
    • Subscription filtering (scope matching vs. non-matching)
    • Event streaming setup wiring
    • Mock notification receiver pattern
  3. Dispatcher Tests (internal/subscriptions/dispatcher_test.go)

    • Scope, mention, and "all" subscription matching
    • Name-based mention matching (@furiosa)
    • Multiple subscriptions per message
    • No subscriptions scenario

Run tests:

go test ./internal/daemon/...
go test ./internal/subscriptions/...
go test ./internal/websocket/...

Performance Characteristics

Troubleshooting

Notifications Not Received

  1. Check subscription exists: thrum subscriptions CLI or subscriptions.list RPC
  2. Verify client is connected: Check WebSocket client registry via daemon logs
  3. Confirm event matches subscription: Check scope/mention filters match message scopes/refs
  4. Look for slow client disconnections: WebSocket buffer full (256-message limit)
  5. Verify WebSocket endpoint: Use ws://localhost:9999/ws (not ws://localhost:9999/)

High Memory Usage

Notification Lag

References

Next Steps