Build on top

In-process stream

Subscribe to wide events flowing through evlog, in-process, with createStreamDrain — sync listeners, async iterators, and a ring buffer.
Looking for the network bridge? This page covers the in-process primitive (createStreamDrain) — for subscribers running in the same Node process. For browser tabs, CLIs, or external devtools that connect over HTTP, see Stream server.

createStreamDrain() exposes the events flowing through evlog as an in-process pub/sub. It's the primitive any local consumer can subscribe to without re-implementing a drain.

Subscribe to wide events in-process

Scope: the stream lives inside a single Node / Bun / Deno process and sees only events emitted from that process. That means it works during local development, on long-lived self-hosted servers, and inside containers. On serverless platforms (Vercel Functions, Cloudflare Workers, AWS Lambda…), each invocation is a separate isolate, so a subscriber in one invocation will not see events emitted from another. Use a real broker for cross-instance fan-out.

import { createStreamDrain } from 'evlog/stream'

const stream = createStreamDrain({ buffer: 200 })

// Register as a normal evlog drain (Nitro hook or plugin runner):
nitroApp.hooks.hook('evlog:drain', stream.drain)

Subscribing

Two consumption styles are supported.

Sync listener

const unsubscribe = stream.subscribe((event) => {
  if (event.level === 'error') notifyOps(event)
})

// Later:
unsubscribe()

Listener errors are caught and logged — they never affect other subscribers or the drain.
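Conceptually, this isolation comes from invoking each subscriber inside its own try/catch. The following is a simplified sketch of that delivery loop, not evlog's actual source (the `Listener` type and `deliver` helper are illustrative names):

```typescript
// Simplified sketch of per-listener error isolation (illustrative,
// not evlog's implementation): each subscriber runs in its own
// try/catch, so one throwing listener cannot break the others.
type Listener<E> = (event: E) => void

function deliver<E>(listeners: Set<Listener<E>>, event: E): void {
  for (const listener of listeners) {
    try {
      listener(event)
    } catch (err) {
      // The real drain logs the error; the key point is that
      // delivery continues to the remaining listeners.
      console.error('listener failed', err)
    }
  }
}
```

Because each callback is wrapped individually, a listener that throws only loses that one delivery for itself; every other subscriber still receives the event.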

Async iterator

for await (const event of stream.events()) {
  console.log(event.timestamp, event.action ?? event.message)
  if (shouldStop(event)) break  // breaking cleanly unsubscribes
}

Each call to events() returns a fresh, independent iterator. Past buffered events are not replayed; pair with stream.recent() to seed history.

Replay buffer

stream.recent() returns a defensive copy of the most recent events (oldest first). The default buffer holds 500 events; pass buffer: 0 to disable, or set it explicitly:

const stream = createStreamDrain({ buffer: 1000 })

const initial = stream.recent()
for (const past of initial) seedDashboard(past)

stream.subscribe(liveEvent => updateDashboard(liveEvent))
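The replay buffer behaves like a fixed-size ring: once full, the oldest event is evicted to make room for the newest, and recent() hands back a copy in oldest-first order. A minimal sketch of that behavior (illustrative only, not evlog's internals; `ReplayBuffer` is a hypothetical name):

```typescript
// Illustrative fixed-size replay buffer: keeps the N most recent
// events, oldest first. Not evlog's actual implementation.
class ReplayBuffer<E> {
  private items: E[] = []
  constructor(private capacity: number) {}

  push(event: E): void {
    if (this.capacity <= 0) return // capacity 0 disables buffering
    this.items.push(event)
    if (this.items.length > this.capacity) this.items.shift() // evict oldest
  }

  recent(): E[] {
    return [...this.items] // defensive copy, oldest first
  }
}
```

The defensive copy matters: mutating the array returned by recent() cannot corrupt the buffer that later subscribers will seed from.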

Backpressure

A slow async-iterator consumer never blocks the drain. Each iterator has a per-subscriber queue (default 1000); when it overflows, the oldest queued events are dropped and stream.droppedCount increments.
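The drop-oldest policy can be pictured with a small sketch (illustrative, not evlog's code; `BoundedQueue` is a hypothetical name): when the per-subscriber queue is at capacity, the head is discarded and a drop counter incremented before the new event is enqueued.

```typescript
// Illustrative drop-oldest bounded queue mirroring the documented
// backpressure policy: newest events are kept, oldest are shed.
class BoundedQueue<E> {
  private queue: E[] = []
  droppedCount = 0
  constructor(private limit: number) {}

  enqueue(event: E): void {
    if (this.queue.length >= this.limit) {
      this.queue.shift() // drop the oldest queued event
      this.droppedCount++
    }
    this.queue.push(event)
  }

  dequeue(): E | undefined {
    return this.queue.shift()
  }
}
```

The trade-off is deliberate: a stalled consumer sees a gap in older events rather than stalling the drain or exhausting memory, and the drop counter makes the gap observable.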

Filter

Events that fail the optional filter predicate are neither buffered nor delivered:

const errors = createStreamDrain({
  filter: event => event.level === 'error' || event.status >= 500,
})

Default singleton

When several pieces of code in the same process need to share a single stream — typically a framework integration that wires the drain on one side and the stream server on the other — use the singleton accessors:

import { getDefaultStream, setDefaultStream } from 'evlog/stream'

// Lazily creates a singleton on first call
const stream = getDefaultStream({ buffer: 500 })

// Reset (mostly useful in tests)
setDefaultStream(null)

The mini stream server uses this singleton internally, so anything draining into getDefaultStream() automatically reaches all SSE clients.
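The accessors follow the familiar lazy-singleton pattern: the instance is created on first access and can be explicitly reset. A generic sketch of that pattern, using hypothetical names (not evlog's source):

```typescript
// Generic lazy-singleton pattern of the kind the accessors describe:
// created on first call, reused afterwards, resettable for tests.
// Illustrative sketch only, not evlog's implementation.
interface FakeStream { buffer: number }

let instance: FakeStream | null = null

function getDefault(options: FakeStream): FakeStream {
  if (instance === null) instance = { ...options } // first call creates it
  return instance // later calls reuse the singleton (options ignored)
}

function setDefault(next: FakeStream | null): void {
  instance = next // pass null to reset
}
```

Note the implication: options passed on later calls have no effect, so configure the singleton at its first creation point.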

Going further

  • Network bridge — expose this stream over HTTP for browser tabs / CLIs / external devtools. See the Stream server.
  • Recipes — concrete consumer patterns (devtool, replay-then-live, aggregation). See Consumer recipes.