Exchange Streams
Exchange streams implement bidirectional streaming where the client sends batches and the server responds with one output batch per input batch.
Basic usage
import { Protocol, float, type OutputCollector } from "vgi-rpc-typescript";import { type RecordBatch } from "apache-arrow";
const protocol = new Protocol("MyService");
protocol.exchange<{ factor: number }>("scale", { params: { factor: float }, inputSchema: { value: float }, outputSchema: { value: float }, init: async ({ factor }) => ({ factor }), exchange: async (state, input: RecordBatch, out: OutputCollector) => { const value = input.getChildAt(0)?.get(0) as number; out.emitRow({ value: value * state.factor }); }, doc: "Scale input values by a factor.",});How it works
An exchange method has two phases:
init— receives the request parameters, returns the initial state objectexchange— called once per input batch with the current state, input batch, and anOutputCollector
The exchange function must emit exactly one data batch per call. Unlike producer streams, calling out.finish() is not allowed in exchange methods.
Exchange function signature
type ExchangeFn<S> = ( state: S, input: RecordBatch, out: OutputCollector,) => Promise<void> | void;state— mutable state object frominitinput— the ArrowRecordBatchsent by the clientout—OutputCollectorfor emitting the response batch
Reading input data
Access input batch data using Arrow’s column API:
exchange: async (state, input, out) => { // By column index const col0 = input.getChildAt(0);
// By field name const values = input.getChild("value");
// Iterate rows for (let i = 0; i < input.numRows; i++) { const val = values?.get(i); // process each row... }
out.emitRow({ result: processedValue });},Lockstep protocol
Exchange streams use a lockstep protocol: the server reads one input batch and writes one output batch before reading the next. This interleaved pattern prevents deadlocks when both sides are reading and writing on the same pipe.
Type-safe state
Like producer streams, the generic <S> parameter threads state types:
interface TransformState { factor: number; totalProcessed: number;}
protocol.exchange<TransformState>("transform", { params: { factor: float }, inputSchema: { value: float }, outputSchema: { value: float, running_total: float }, init: async ({ factor }) => ({ factor, totalProcessed: 0 }), exchange: async (state, input, out) => { const value = input.getChildAt(0)?.get(0) as number; const scaled = value * state.factor; state.totalProcessed += scaled; out.emitRow({ value: scaled, running_total: state.totalProcessed }); },});Stream headers
Exchange methods can optionally send a one-time header before the data stream. See Stream Headers.