API Reference
All public symbols are exported from the vgi-rpc package entry point.
Classes
Protocol
Fluent builder for defining RPC methods.
import { Protocol } from "vgi-rpc";
const protocol = new Protocol(name: string);Properties:
name: string— the protocol/service name
Methods:
unary(name, config)— register a unary methodproducer<S>(name, config)— register a producer streamexchange<S>(name, config)— register an exchange streamgetMethods()— returnsMap<string, MethodDefinition>
VgiRpcServer
RPC server that reads Arrow IPC requests from stdin and writes responses to stdout.
import { VgiRpcServer } from "vgi-rpc";
const server = new VgiRpcServer(protocol, options?);Options:
enableDescribe?: boolean— register the__describe__introspection method (default:true)serverId?: string— server ID included in response metadata (random if omitted)
Methods:
run(): Promise<void>— start the server loop; reads requests until stdin closes
OutputCollector
Accumulates output batches during produce/exchange calls. Passed as the out parameter to streaming handlers.
import { OutputCollector } from "vgi-rpc";Methods:
emit(columns: Record<string, any[]>)— emit a data batch from column arraysemit(batch: RecordBatch, metadata?: Map<string, string>)— emit a pre-built RecordBatchemitRow(values: Record<string, any>)— emit a single-row batchfinish()— signal stream completion (producer only)clientLog(level, message, extra?)— emit a log message
Properties:
finished: boolean— whetherfinish()has been calledbatches: EmittedBatch[]— accumulated batchesoutputSchema: Schema— the output schema
Functions
createHttpHandler
Create a fetch-compatible HTTP handler for a Protocol.
import { createHttpHandler } from "vgi-rpc";
const handler = createHttpHandler(protocol: Protocol, options?: HttpHandlerOptions);// Returns: (request: Request) => Response | Promise<Response>See HTTP Transport for usage details.
toSchema
Convert a SchemaLike value to an Arrow Schema:
import { toSchema } from "vgi-rpc";
const schema = toSchema({ name: str, value: float });// Returns: SchemainferParamTypes
Infer parameter type strings from an Arrow schema:
import { inferParamTypes } from "vgi-rpc";
const types = inferParamTypes(schema);// Returns: Record<string, string>httpConnect
Create an HTTP transport client.
import { httpConnect } from "vgi-rpc";
const client = httpConnect(baseUrl: string, options?: HttpConnectOptions);// Returns: RpcClientSee Client Transports for usage details.
pipeConnect
Create a raw pipe transport client.
import { pipeConnect } from "vgi-rpc";
const client = pipeConnect(readable: ReadableStream<Uint8Array>, writable: PipeWritable, options?: PipeConnectOptions);// Returns: RpcClientsubprocessConnect
Spawn a subprocess and create a pipe transport client over its stdin/stdout.
import { subprocessConnect } from "vgi-rpc";
const client = subprocessConnect(cmd: string[], options?: SubprocessConnectOptions);// Returns: RpcClienthttpIntrospect
Standalone HTTP introspection — calls __describe__ without creating a full client.
import { httpIntrospect } from "vgi-rpc";
const desc = await httpIntrospect(baseUrl: string, options?: { prefix?: string });// Returns: Promise<ServiceDescription>parseDescribeResponse
Parse raw describe response batches into a ServiceDescription. Reusable across custom transports.
import { parseDescribeResponse } from "vgi-rpc";
const desc = await parseDescribeResponse(batches: RecordBatch[], onLog?: (msg: LogMessage) => void);// Returns: Promise<ServiceDescription>Authentication factories
Built-in factories for creating AuthenticateFn callbacks. Pass the result to createHttpHandler(protocol, { authenticate }).
See HTTP Transport — Authentication for usage examples.
jwtAuthenticate
Create a JWT-validating authenticate callback using oauth4webapi.
import { jwtAuthenticate } from "vgi-rpc";
const auth = jwtAuthenticate({ issuer: "https://auth.example.com", audience: "https://api.example.com/vgi",});| Parameter | Type | Default | Description |
|---|---|---|---|
issuer | string | required | Expected iss claim (used for OIDC discovery) |
audience | string | required | Expected aud claim |
jwksUri | string | undefined | Explicit JWKS URL (discovered from issuer if omitted) |
principalClaim | string | "sub" | JWT claim to use as AuthContext.principal |
domain | string | "jwt" | AuthContext.domain value |
bearerAuthenticate
Create a bearer-token authenticate callback with a custom validate function.
import { bearerAuthenticate } from "vgi-rpc";
const auth = bearerAuthenticate({ validate: (token: string) => { /* return AuthContext or throw */ },});| Parameter | Type | Description |
|---|---|---|
validate | BearerValidateFn | Receives the raw token, returns AuthContext on success, throws on failure |
bearerAuthenticateStatic
Create a bearer-token authenticate callback from a static token map. Uses constant-time comparison for token lookup.
import { bearerAuthenticateStatic } from "vgi-rpc";
const auth = bearerAuthenticateStatic({ tokens: { "key-abc": aliceContext, "key-def": bobContext },});| Parameter | Type | Description |
|---|---|---|
tokens | Record<string, AuthContext> | ReadonlyMap<string, AuthContext> | Maps bearer token strings to AuthContext values |
chainAuthenticate
Chain multiple authenticate callbacks, trying each in order.
import { chainAuthenticate } from "vgi-rpc";
const auth = chainAuthenticate(jwtAuth, apiKeyAuth);| Behaviour | Exception type | Result |
|---|---|---|
| Credentials accepted | (none) | Returns AuthContext, stops chain |
| Bad / missing credentials | Plain Error (constructor === Error) | Tries next authenticator |
| Authenticated but forbidden | Error with name === "PermissionError" | Propagates immediately |
| Bug in authenticator | TypeError, RangeError, etc. | Propagates immediately |
| Non-Error throw | any | Propagates immediately |
Throws at construction time if called with no authenticators.
mtlsAuthenticate
Create an mTLS authenticate callback with custom certificate validation. Parses a URL-encoded PEM from the specified header.
import { mtlsAuthenticate } from "vgi-rpc";
const auth = mtlsAuthenticate({ validate: (cert) => new AuthContext("mtls", true, "principal", {}), header: "X-SSL-Client-Cert", checkExpiry: false,});| Parameter | Type | Default | Description |
|---|---|---|---|
validate | CertValidateFn | required | Receives X509Certificate, returns AuthContext or throws |
header | string | "X-SSL-Client-Cert" | Header containing URL-encoded PEM |
checkExpiry | boolean | false | Check validity period before calling validate |
mtlsAuthenticateFingerprint
Create an mTLS authenticate callback using certificate fingerprint lookup. Fingerprints must be lowercase hex without colons.
import { mtlsAuthenticateFingerprint } from "vgi-rpc";
const auth = mtlsAuthenticateFingerprint({ fingerprints: { "a1b2c3...": new AuthContext("mtls", true, "service-a") },});| Parameter | Type | Default | Description |
|---|---|---|---|
fingerprints | Record<string, AuthContext> | ReadonlyMap<string, AuthContext> | required | Fingerprint-to-context map |
header | string | "X-SSL-Client-Cert" | Header containing URL-encoded PEM |
algorithm | string | "sha256" | Hash algorithm (sha256, sha1, sha384, sha512) |
checkExpiry | boolean | false | Reject expired certificates |
mtlsAuthenticateSubject
Create an mTLS authenticate callback using certificate subject CN. Populates claims with subject_dn, serial, and not_valid_after.
import { mtlsAuthenticateSubject } from "vgi-rpc";
const auth = mtlsAuthenticateSubject({ allowedSubjects: new Set(["my-service"]),});| Parameter | Type | Default | Description |
|---|---|---|---|
header | string | "X-SSL-Client-Cert" | Header containing URL-encoded PEM |
domain | string | "mtls" | AuthContext.domain value |
allowedSubjects | ReadonlySet<string> | null | null | Restrict accepted CNs |
checkExpiry | boolean | false | Reject expired certificates |
mtlsAuthenticateXfcc
Create an authenticate callback from the Envoy x-forwarded-client-cert header. No crypto dependencies required.
import { mtlsAuthenticateXfcc } from "vgi-rpc";
const auth = mtlsAuthenticateXfcc({ selectElement: "first" });| Parameter | Type | Default | Description |
|---|---|---|---|
validate | XfccValidateFn | — | Custom validation (default: extract CN from Subject) |
domain | string | "mtls" | AuthContext.domain value |
selectElement | "first" | "last" | "first" | Which element to use when multiple are present |
parseXfcc
Parse an x-forwarded-client-cert header value into structured elements. Exported for testability.
import { parseXfcc } from "vgi-rpc";
const elements = parseXfcc(headerValue);// Returns: XfccElement[]httpOAuthMetadata
Discover OAuth Protected Resource Metadata (RFC 9728) from a vgi-rpc server.
import { httpOAuthMetadata } from "vgi-rpc";
const metadata = await httpOAuthMetadata(baseUrl: string, prefix?: string);// Returns: Promise<OAuthResourceMetadataResponse | null>parseResourceMetadataUrl
Extract the resource_metadata URL from a WWW-Authenticate Bearer challenge.
import { parseResourceMetadataUrl } from "vgi-rpc";
const url = parseResourceMetadataUrl(wwwAuthenticate: string);// Returns: string | nullparseClientId
Extract the client_id from a WWW-Authenticate Bearer challenge.
import { parseClientId } from "vgi-rpc";
const clientId = parseClientId(wwwAuthenticate: string);// Returns: string | nullparseClientSecret
Extract the client_secret from a WWW-Authenticate Bearer challenge.
import { parseClientSecret } from "vgi-rpc";
const clientSecret = parseClientSecret(wwwAuthenticate: string);// Returns: string | nullparseUseIdTokenAsBearer
Extract the use_id_token_as_bearer flag from a WWW-Authenticate Bearer challenge.
import { parseUseIdTokenAsBearer } from "vgi-rpc";
const useIdToken = parseUseIdTokenAsBearer(wwwAuthenticate: string);// Returns: booleanClient classes
HttpStreamSession
HTTP streaming session returned by client.stream() on HTTP transport clients.
class HttpStreamSession implements StreamSession { readonly header: Record<string, any> | null; exchange(input: Record<string, any>[]): Promise<Record<string, any>[]>; [Symbol.asyncIterator](): AsyncIterableIterator<Record<string, any>[]>; close(): void;}PipeStreamSession
Pipe streaming session returned by client.stream() on pipe and subprocess transport clients.
class PipeStreamSession implements StreamSession { readonly header: Record<string, any> | null; exchange(input: Record<string, any>[]): Promise<Record<string, any>[]>; [Symbol.asyncIterator](): AsyncIterableIterator<Record<string, any>[]>; close(): void;}Type singletons
Schema shorthand values — see Schema Shorthand for details.
| Export | Arrow Type |
|---|---|
str | Utf8 |
bytes | Binary |
int | Int64 |
int32 | Int32 |
float | Float64 |
float32 | Float32 |
bool | Bool |
Type aliases
type UnaryHandler = ( params: Record<string, any>, ctx: LogContext,) => Promise<Record<string, any>> | Record<string, any>;
type ProducerInit<S> = ( params: Record<string, any>,) => Promise<S> | S;
type ProducerFn<S> = ( state: S, out: OutputCollector,) => Promise<void> | void;
type ExchangeInit<S> = ( params: Record<string, any>,) => Promise<S> | S;
type ExchangeFn<S> = ( state: S, input: RecordBatch, out: OutputCollector,) => Promise<void> | void;
type HeaderInit = ( params: Record<string, any>, state: any, ctx: LogContext,) => Record<string, any>;
type SchemaLike = Schema | Record<string, DataType>;
type AuthenticateFn = (request: Request) => AuthContext | Promise<AuthContext>;
type BearerValidateFn = (token: string) => AuthContext | Promise<AuthContext>;
type CertValidateFn = (cert: X509Certificate) => AuthContext | Promise<AuthContext>;
type XfccValidateFn = (element: XfccElement) => AuthContext | Promise<AuthContext>;Interfaces
XfccElement
interface XfccElement { hash: string | null; cert: string | null; subject: string | null; uri: string | null; dns: readonly string[]; by: string | null;}OAuthResourceMetadata
Server-side configuration for RFC 9728 OAuth Protected Resource Metadata.
interface OAuthResourceMetadata { resource: string; authorizationServers: string[]; scopesSupported?: string[]; bearerMethodsSupported?: string[]; resourceName?: string; resourceDocumentation?: string; resourcePolicyUri?: string; resourceTosUri?: string; clientId?: string; clientSecret?: string; useIdTokenAsBearer?: boolean;}OAuthResourceMetadataResponse
Client-side response from OAuth Protected Resource Metadata discovery.
interface OAuthResourceMetadataResponse { resource: string; authorizationServers: string[]; scopesSupported?: string[]; bearerMethodsSupported?: string[]; resourceName?: string; resourceDocumentation?: string; resourcePolicyUri?: string; resourceTosUri?: string; clientId?: string; clientSecret?: string; useIdTokenAsBearer?: boolean;}LogContext
interface LogContext { clientLog(level: string, message: string, extra?: Record<string, string>): void;}RpcClient
interface RpcClient { call(method: string, params?: Record<string, any>): Promise<Record<string, any> | null>; stream(method: string, params?: Record<string, any>): Promise<StreamSession>; describe(): Promise<ServiceDescription>; close(): void;}StreamSession
interface StreamSession { readonly header: Record<string, any> | null; exchange(input: Record<string, any>[]): Promise<Record<string, any>[]>; [Symbol.asyncIterator](): AsyncIterableIterator<Record<string, any>[]>; close(): void;}ServiceDescription
interface ServiceDescription { protocolName: string; methods: MethodInfo[];}MethodInfo
interface MethodInfo { name: string; type: "unary" | "stream"; paramsSchema: Schema; resultSchema: Schema; inputSchema?: Schema; outputSchema?: Schema; headerSchema?: Schema; doc?: string; paramTypes?: Record<string, string>; defaults?: Record<string, any>;}LogMessage
interface LogMessage { level: string; message: string; extra?: Record<string, any>;}HttpConnectOptions
See Configuration.
PipeConnectOptions
See Configuration.
SubprocessConnectOptions
See Configuration.
HttpHandlerOptions
See Configuration.
StateSerializer
interface StateSerializer { serialize(state: any): Uint8Array; deserialize(bytes: Uint8Array): any;}Enums
MethodType
enum MethodType { UNARY = "unary", STREAM = "stream",}Error classes
RpcError
class RpcError extends Error { readonly errorType: string; readonly errorMessage: string; readonly remoteTraceback: string;}VersionError
class VersionError extends Error {}Constants
Wire protocol metadata keys:
| Constant | Value |
|---|---|
RPC_METHOD_KEY | vgi_rpc.method |
REQUEST_VERSION_KEY | vgi_rpc.request_version |
REQUEST_VERSION | 1 |
LOG_LEVEL_KEY | vgi_rpc.log_level |
LOG_MESSAGE_KEY | vgi_rpc.log_message |
LOG_EXTRA_KEY | vgi_rpc.log_extra |
SERVER_ID_KEY | vgi_rpc.server_id |
REQUEST_ID_KEY | vgi_rpc.request_id |
PROTOCOL_NAME_KEY | vgi_rpc.protocol_name |
DESCRIBE_VERSION_KEY | vgi_rpc.describe_version |
DESCRIBE_VERSION | 2 |
DESCRIBE_METHOD_NAME | __describe__ |
STATE_KEY | vgi_rpc.stream_state |
ARROW_CONTENT_TYPE | application/vnd.apache.arrow.stream |