Subscriptions & Notifications
Quick Reference
CLI Commands
| Command | Description |
|---|---|
thrum subscribe |
Subscribe to notifications (scope, mention, all) |
thrum unsubscribe ID |
Remove a subscription by ID |
thrum subscriptions |
List active subscriptions for current session |
thrum wait |
Block until notification arrives or timeout |
Overview
The subscription system allows agents to receive real-time push notifications when messages match their interests. Agents can subscribe to:
- Scopes - Messages with specific scope (e.g.,
module:auth,file:main.go) - Mentions - Messages that @mention a specific role (e.g.,
@reviewer) or agent name (e.g.,@furiosa) - All messages - Wildcard subscription to receive all messages
Subscriptions are automatically deleted when a session ends (added in v0.4.3). Subscription identity resolution now correctly uses the caller's agent ID for filtering (caller_agent_id resolution fix in v0.4.3).
When a new message matches a subscription, the daemon:
- Identifies matching subscriptions via the dispatcher
- Sends push notifications to connected clients via the broadcaster
- Stores the message for later retrieval via
message.listAPI
Architecture
Components
+-------------------------------------------------------------+
| Message Flow |
+-------------------------------------------------------------+
message.send RPC
|
v
+--------------+
| Message | 1. Write to JSONL (sharded per-agent)
| Handler | 2. Insert into SQLite
+---------+----+ 3. Extract scopes/refs
|
v
+--------------+
| Dispatcher | 1. Query all subscriptions
| | 2. Match against message (scope/mention/all)
+---------+----+ 3. Build notifications
|
v
+--------------+
| Broadcaster | 1. Try Unix socket clients first
| | 2. Fall back to WebSocket clients
+---------+----+ 3. Best-effort delivery
|
+----+----+
| |
v v
Unix Socket WebSocket
Clients Clients (port 9999)
| |
v v
Connected clients receive notification.message
Database Schema
subscriptions table:
CREATE TABLE subscriptions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
session_id TEXT NOT NULL,
scope_type TEXT, -- NULL for non-scope subscriptions
scope_value TEXT, -- NULL for non-scope subscriptions
mention_role TEXT, -- NULL for non-mention subscriptions
created_at TEXT NOT NULL,
UNIQUE(session_id, scope_type, scope_value, mention_role)
);
-- Indexes for efficient matching
CREATE INDEX idx_subscriptions_scope ON subscriptions(scope_type, scope_value);
CREATE INDEX idx_subscriptions_mention ON subscriptions(mention_role);
CREATE INDEX idx_subscriptions_session ON subscriptions(session_id);
Note: The subscriptions table does not have a foreign key constraint on
session_id. Subscription cleanup on session end is handled at the application
level.
Subscription types (mutually exclusive):
| Type | scope_type | scope_value | mention_role | Description |
|---|---|---|---|---|
| Scope | "module" |
"auth" |
NULL |
Matches messages with scope module:auth |
| Mention | NULL |
NULL |
"reviewer" |
Matches messages with @reviewer mention |
| All | NULL |
NULL |
NULL |
Matches all messages (wildcard) |
Duplicate Prevention
SQLite's UNIQUE constraint doesn't work correctly with NULL values (treats
each NULL as unique). We implement application-level duplicate checking in
service.go:
func (s *Service) subscriptionExists(...) (bool, error) {
// Build query with explicit NULL checks for each combination
if scopeType != nil && scopeValue != nil {
query = "WHERE session_id = ? AND scope_type = ? AND scope_value = ?"
} else if mentionRole != nil {
query = "WHERE session_id = ? AND mention_role = ?"
} else {
query = "WHERE session_id = ? AND scope_type IS NULL AND ..."
}
}
Subscription Lifecycle
Creating Subscriptions
- Client calls
subscribeRPC with subscription criteria - Handler validates:
- Session is active (resolved from agent identity config)
- At least one of scope, mention_role, or all specified
- No duplicate subscription exists (application-level check)
- Insert into
subscriptionstable - Return subscription ID
Example:
// Request
{
"jsonrpc": "2.0",
"method": "subscribe",
"params": {"scope": {"type": "module", "value": "auth"}},
"id": 1
}
// Response
{
"jsonrpc": "2.0",
"result": {
"subscription_id": 42,
"session_id": "ses_01HXE...",
"created_at": "2026-02-03T10:00:00Z"
},
"id": 1
}
Removing Subscriptions
- Client calls
unsubscribeRPC with subscription ID - Handler verifies subscription belongs to current session
- Delete from
subscriptionstable - Return
{"removed": true}or{"removed": false}(idempotent)
Example:
// Request
{
"jsonrpc": "2.0",
"method": "unsubscribe",
"params": {"subscription_id": 42},
"id": 2
}
// Response
{
"jsonrpc": "2.0",
"result": {"removed": true},
"id": 2
}
Listing Subscriptions
// Request
{
"jsonrpc": "2.0",
"method": "subscriptions.list",
"id": 1
}
// Response
{
"jsonrpc": "2.0",
"result": {
"subscriptions": [
{
"id": 42,
"scope_type": "module",
"scope_value": "auth",
"created_at": "2026-02-03T10:00:00Z"
},
{
"id": 43,
"mention_role": "reviewer",
"created_at": "2026-02-03T10:05:00Z"
},
{
"id": 44,
"all": true,
"created_at": "2026-02-03T10:10:00Z"
}
]
},
"id": 1
}
Message Dispatch
Matching Algorithm
When message.send is called, the dispatcher:
- Query all subscriptions from database (joins with sessions and agents tables for mention resolution)
- For each subscription, check if message matches:
- Scope match: Any message scope matches subscription scope
- Mention match: Any message ref has
type="mention"and matches the subscription'smention_role, the agent's role, or the agent's ID/name - All match: Always matches (wildcard)
- Build notification for each match
- Push to connected clients via the Broadcaster
Implementation (dispatcher.go):
// matchSubscription checks if a message matches a subscription.
// Supports both role-based mentions (@reviewer) and name-based mentions (@furiosa).
func matchSubscription(msg *MessageInfo, scopeType, scopeValue, mentionRole, agentID, agentRole sql.NullString) string {
// All subscription - always matches
if !scopeType.Valid && !scopeValue.Valid && !mentionRole.Valid {
return "all"
}
// Scope subscription
if scopeType.Valid && scopeValue.Valid {
for _, scope := range msg.Scopes {
if scope.Type == scopeType.String && scope.Value == scopeValue.String {
return "scope"
}
}
}
// Mention subscription - matches on:
// 1. subscription's mention_role
// 2. agent's role (for role-based mentions)
// 3. agent's ID/name (for name-based mentions like @furiosa)
if mentionRole.Valid {
for _, ref := range msg.Refs {
if ref.Type == "mention" {
if ref.Value == mentionRole.String {
return "mention"
}
if agentRole.Valid && ref.Value == agentRole.String {
return "mention"
}
if agentID.Valid && ref.Value == agentID.String {
return "mention"
}
}
}
}
return "" // No match
}
Notification Building
For each match, the dispatcher builds a notification payload:
{
"method": "notification.message",
"params": {
"message_id": "msg_01HXE...",
"author": {
"agent_id": "furiosa",
"name": "furiosa",
"role": "implementer",
"module": ""
},
"preview": "First 100 characters of content...",
"scopes": [{ "type": "module", "value": "auth" }],
"matched_subscription": {
"subscription_id": 42,
"match_type": "scope"
},
"timestamp": "2026-02-03T10:00:00Z"
}
}
Author parsing:
- Uses
identity.ParseAgentID()to extract the role from the agent ID - The
namefield is set to the raw agent ID (which is the agent's name for named agents) - The
modulefield is empty -- module is not encoded in the agent ID and would require a database lookup
Preview truncation:
- If content is 100 chars or less: Use as-is
- If content exceeds 100 chars: Truncate to 100 chars and append
"..."
Push Notifications
Broadcaster
The daemon uses a Broadcaster (internal/daemon/broadcaster.go) that
implements the ClientNotifier interface. It tries both transport registries in
order:
- Unix socket clients first (via
ClientRegistryfrominternal/daemon/notify.go) - WebSocket clients as fallback (via
ClientRegistryfrominternal/websocket/registry.go)
If the notification is delivered successfully via either transport, the Broadcaster returns immediately. This means each session only receives one notification per match, regardless of how many transports are available.
Unix Socket Client Registry
Location: internal/daemon/notify.go
type ClientRegistry struct {
mu sync.RWMutex
clients map[string]*ConnectedClient
}
type ConnectedClient struct {
sessionID string
conn net.Conn
}
Operations:
Register(sessionID, conn)- Add client when they connectUnregister(sessionID)- Remove client on disconnectNotify(sessionID, *Notification)- Send notification with newline framing
WebSocket Client Registry
Location: internal/websocket/registry.go
type ClientRegistry struct {
mu sync.RWMutex
clients map[string]*Connection
}
Operations:
Register(sessionID, conn)- Add client when they connectUnregister(sessionID)- Remove client on disconnectGet(sessionID)- Look up client by session IDCount()- Number of connected clientsCloseAll()- Close all connections (used during shutdown)Notify(sessionID, notification)- Send JSON-RPC notification via WebSocket frame
Sending Notifications
When the dispatcher finds matches, it calls Broadcaster.Notify() for each:
- Lookup session in Unix socket registry, then WebSocket registry
- If not found in either: Silently succeed (client will see message via
message.list)
- If not found in either: Silently succeed (client will see message via
- Marshal notification to JSON-RPC format
- Write to transport:
- Unix socket: Newline-delimited JSON
- WebSocket: Text frame via buffered send channel (256-message buffer)
- Handle errors:
- Write error: Client disconnected - auto-unregister
- Buffer full (WebSocket): Client disconnected - auto-unregister
- Success: Continue
JSON-RPC notification format:
{
"jsonrpc": "2.0",
"method": "notification.message",
"params": {
/* NotifyParams */
}
}
Note: No id field - notifications are one-way, no response expected.
Connection Management
Client responsibilities:
- Keep connection open during session
- Listen for incoming notifications
- Parse JSON-RPC notifications (no
idfield) - Fetch full message content with
message.get
Daemon responsibilities:
- Track connected clients by session
- Auto-unregister on write errors
- Don't block message.send on notification failures
- Silently ignore notifications to disconnected clients
Testing
Coverage
Key test scenarios:
- Subscription CRUD (create, list, unsubscribe)
- Duplicate prevention (all subscription types)
- Scope matching (exact match, multiple scopes, no match)
- Mention matching (role-based @reviewer, name-based @furiosa)
- All subscription matching (wildcard)
- Multiple subscriptions per session
- Client registry (register, unregister, notify) for both Unix socket and WebSocket
- Notification format (JSON-RPC, field validation)
- Preview truncation (short, long, exact 100 chars)
- Disconnected client handling (auto-unregister)
- Broadcaster routing (Unix socket first, WebSocket fallback)
Test Patterns
Database tests use temp directories:
tmpDir := t.TempDir()
db, err := schema.OpenDB(filepath.Join(tmpDir, "test.db"))
defer db.Close()
Unix socket connection tests use net.Pipe():
server, client := net.Pipe()
defer server.Close()
defer client.Close()
// IMPORTANT: net.Pipe() is synchronous - use goroutines
go func() {
buf := make([]byte, 1024)
n, _ := client.Read(buf)
// Process buffer
}()
registry.Notify("ses_001", notification)
Performance Considerations
Matching Efficiency
Current implementation:
- Load ALL subscriptions from DB on every message
- O(N) matching where N = number of subscriptions
- Joins with sessions and agents tables for mention resolution
Rationale for simple approach:
- Expected subscription count: < 100 per daemon instance
- Message send frequency: < 10/second
- Premature optimization avoided - measure first
Notification Delivery
Current:
- Synchronous notification sending via Broadcaster
- Blocks message.send briefly
Trade-off:
- Simplicity vs. throughput
- Current approach is correct and maintainable
- Optimize when proven necessary
Error Handling
Subscription Errors
| Scenario | Behavior |
|---|---|
| Duplicate subscription | Return error "subscription already exists" |
| Invalid session | Return error "no active session found" |
| Missing parameters | Return error "at least one of scope, mention_role, or all must be specified" |
| Database error | Return error with details |
Notification Errors
| Scenario | Behavior |
|---|---|
| Client not connected | Silently succeed (client will poll) |
| Write error (disconnect) | Auto-unregister, return error |
| Buffer full (WebSocket) | Auto-unregister, return error |
| Marshal error | Return error (should never happen) |
Recovery
Client reconnection:
- Registry tracks by session ID
- Re-register on reconnect
- Previous subscriptions still active (tied to session, not connection)
Design Notes
Why Application-Level Duplicate Checking?
SQLite's UNIQUE constraint fails with NULL values:
(ses_001, NULL, NULL, NULL)can be inserted multiple times- SQLite treats NULL != NULL in uniqueness checks
Alternatives considered:
- Use empty string
""instead of NULL - Semantically incorrect - Separate tables per subscription type - Over-engineered
- Application-level checking - Simple, correct, maintainable (chosen)
Why Silently Ignore Disconnected Clients?
When a notification can't be sent (client disconnected):
- Message is already in database (via
message.send) - Client will see it when they call
message.list - Failing the entire
message.sendwould be wrong
Design:
- Notifications are best-effort delivery
- Database is source of truth
- Clients must poll
message.liston reconnect
Why Preview Truncation?
- Full message content can be large (megabytes)
- Notifications should be lightweight
- Clients can fetch full content with
message.get - 100 chars is enough for preview/triage
Next Steps
- Messaging — the CLI commands that trigger subscriptions:
thrum subscribe,thrum wait, andthrum subscriptions - Event Streaming — the Broadcaster and Dispatcher internals that route matched events to subscribers
- WebSocket API — connect over WebSocket to receive subscription notifications in real time
- RPC API Reference —
subscribe,unsubscribe, andsubscriptions.listmethod schemas - Dispatcher:
internal/subscriptions/dispatcher.go - Broadcaster:
internal/daemon/broadcaster.go - Unix Socket Client Registry:
internal/daemon/notify.go - WebSocket Client Registry:
internal/websocket/registry.go