Producer Streams
Producer streams let the server emit a sequence of output batches from a single request. Use them for large result sets, real-time data, or any pattern where the output length is not known upfront.
Basic usage
import { Protocol, int32 } from "vgi-rpc-typescript";
const protocol = new Protocol("MyService");
protocol.producer<{ limit: number; current: number }>("count", { params: { limit: int32 }, outputSchema: { n: int32, n_squared: int32 }, init: async ({ limit }) => ({ limit, current: 0 }), produce: async (state, out) => { if (state.current >= state.limit) { out.finish(); return; } out.emitRow({ n: state.current, n_squared: state.current ** 2 }); state.current++; }, doc: "Count from 0 to limit-1.",});How it works
A producer method has two phases:
init— receives the request parameters, returns the initial state objectproduce— called repeatedly with the current state and anOutputCollector
The produce function must either:
- Emit one data batch via
out.emit()orout.emitRow(), or - Call
out.finish()to end the stream
Mutate state in-place between calls to track progress.
Type-safe state
The generic <S> parameter on protocol.producer<S>() threads the state type from init to produce:
interface CountState { limit: number; current: number; batchSize: number;}
protocol.producer<CountState>("count", { params: { limit: int32, batch_size: int32 }, outputSchema: { n: int32, n_squared: int32 }, init: async ({ limit, batch_size }) => ({ limit, current: 0, batchSize: batch_size, }), produce: async (state, out) => { // state is typed as CountState if (state.current >= state.limit) { out.finish(); return; } out.emitRow({ n: state.current, n_squared: state.current ** 2 }); state.current++; }, defaults: { batch_size: 1 },});Multi-row batches
For efficiency, emit multiple rows per batch instead of one at a time:
produce: async (state, out) => { if (state.current >= state.limit) { out.finish(); return; }
const remaining = state.limit - state.current; const count = Math.min(state.batchSize, remaining);
const nValues: number[] = []; const sqValues: number[] = []; for (let i = 0; i < count; i++) { const n = state.current + i; nValues.push(n); sqValues.push(n * n); } state.current += count;
out.emit({ n: nValues, n_squared: sqValues });},Stream headers
Producer methods can optionally send a one-time header before the data stream. See Stream Headers.