API Reference

All public symbols are exported from the vgi-rpc package entry point.

Classes

Protocol

Fluent builder for defining RPC methods.

import { Protocol } from "vgi-rpc";
const protocol = new Protocol(name: string);

Properties:

  • name: string — the protocol/service name

Methods:

  • unary(name, config) — register a unary method
  • producer<S>(name, config) — register a producer stream
  • exchange<S>(name, config) — register an exchange stream
  • getMethods() — returns Map<string, MethodDefinition>

VgiRpcServer

RPC server that reads Arrow IPC requests from stdin and writes responses to stdout.

import { VgiRpcServer } from "vgi-rpc";
const server = new VgiRpcServer(protocol, options?);

Options:

  • enableDescribe?: boolean — register the __describe__ introspection method (default: true)
  • serverId?: string — server ID included in response metadata (random if omitted)

Methods:

  • run(): Promise<void> — start the server loop; reads requests until stdin closes

OutputCollector

Accumulates output batches during produce/exchange calls. Passed as the out parameter to streaming handlers.

import { OutputCollector } from "vgi-rpc";

Methods:

  • emit(columns: Record<string, any[]>) — emit a data batch from column arrays
  • emit(batch: RecordBatch, metadata?: Map<string, string>) — emit a pre-built RecordBatch
  • emitRow(values: Record<string, any>) — emit a single-row batch
  • finish() — signal stream completion (producer only)
  • clientLog(level, message, extra?) — emit a log message

Properties:

  • finished: boolean — whether finish() has been called
  • batches: EmittedBatch[] — accumulated batches
  • outputSchema: Schema — the output schema

Functions

createHttpHandler

Create a fetch-compatible HTTP handler for a Protocol.

import { createHttpHandler } from "vgi-rpc";
const handler = createHttpHandler(protocol: Protocol, options?: HttpHandlerOptions);
// Returns: (request: Request) => Response | Promise<Response>

See HTTP Transport for usage details.

toSchema

Convert a SchemaLike value to an Arrow Schema:

import { toSchema, str, float } from "vgi-rpc";
const schema = toSchema({ name: str, value: float });
// Returns: Schema

inferParamTypes

Infer parameter type strings from an Arrow schema:

import { inferParamTypes } from "vgi-rpc";
const types = inferParamTypes(schema);
// Returns: Record<string, string>

httpConnect

Create an HTTP transport client.

import { httpConnect } from "vgi-rpc";
const client = httpConnect(baseUrl: string, options?: HttpConnectOptions);
// Returns: RpcClient

See Client Transports for usage details.

pipeConnect

Create a raw pipe transport client.

import { pipeConnect } from "vgi-rpc";
const client = pipeConnect(readable: ReadableStream<Uint8Array>, writable: PipeWritable, options?: PipeConnectOptions);
// Returns: RpcClient

subprocessConnect

Spawn a subprocess and create a pipe transport client over its stdin/stdout.

import { subprocessConnect } from "vgi-rpc";
const client = subprocessConnect(cmd: string[], options?: SubprocessConnectOptions);
// Returns: RpcClient

httpIntrospect

Standalone HTTP introspection — calls __describe__ without creating a full client.

import { httpIntrospect } from "vgi-rpc";
const desc = await httpIntrospect(baseUrl: string, options?: { prefix?: string });
// Returns: Promise<ServiceDescription>

parseDescribeResponse

Parse raw describe response batches into a ServiceDescription. Reusable across custom transports.

import { parseDescribeResponse } from "vgi-rpc";
const desc = await parseDescribeResponse(batches: RecordBatch[], onLog?: (msg: LogMessage) => void);
// Returns: Promise<ServiceDescription>

Authentication factories

Built-in factories for creating AuthenticateFn callbacks. Pass the result to createHttpHandler(protocol, { authenticate }).

See HTTP Transport — Authentication for usage examples.

jwtAuthenticate

Create a JWT-validating authenticate callback using oauth4webapi.

import { jwtAuthenticate } from "vgi-rpc";
const auth = jwtAuthenticate({
  issuer: "https://auth.example.com",
  audience: "https://api.example.com/vgi",
});

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| issuer | string | required | Expected iss claim (used for OIDC discovery) |
| audience | string | required | Expected aud claim |
| jwksUri | string | undefined | Explicit JWKS URL (discovered from issuer if omitted) |
| principalClaim | string | "sub" | JWT claim to use as AuthContext.principal |
| domain | string | "jwt" | AuthContext.domain value |

bearerAuthenticate

Create a bearer-token authenticate callback with a custom validate function.

import { bearerAuthenticate } from "vgi-rpc";
const auth = bearerAuthenticate({
  validate: (token: string) => { /* return AuthContext or throw */ },
});

| Parameter | Type | Description |
| --- | --- | --- |
| validate | BearerValidateFn | Receives the raw token, returns AuthContext on success, throws on failure |

bearerAuthenticateStatic

Create a bearer-token authenticate callback from a static token map. Uses constant-time comparison for token lookup.

import { bearerAuthenticateStatic } from "vgi-rpc";
const auth = bearerAuthenticateStatic({
  tokens: { "key-abc": aliceContext, "key-def": bobContext },
});

| Parameter | Type | Description |
| --- | --- | --- |
| tokens | Record<string, AuthContext> \| ReadonlyMap<string, AuthContext> | Maps bearer token strings to AuthContext values |
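The constant-time comparison mentioned above can be illustrated with a small standalone sketch. This is not the library's implementation: `constantTimeEqual`, `lookupToken`, and the reduced `SketchAuthContext` shape are hypothetical names introduced here for illustration.

```typescript
// Hypothetical stand-in for AuthContext; only the fields this sketch needs.
interface SketchAuthContext {
  domain: string;
  principal: string;
}

// Compare two strings without returning early on the first mismatch, so the
// running time does not depend on how many leading characters matched.
function constantTimeEqual(a: string, b: string): boolean {
  const len = Math.max(a.length, b.length);
  let diff = a.length ^ b.length; // a length mismatch already counts as a difference
  for (let i = 0; i < len; i++) {
    // charCodeAt returns NaN past the end of the string; `| 0` maps that to 0.
    diff |= (a.charCodeAt(i) | 0) ^ (b.charCodeAt(i) | 0);
  }
  return diff === 0;
}

// Check the presented token against every entry instead of stopping at the first hit.
function lookupToken(
  tokens: Record<string, SketchAuthContext>,
  presented: string,
): SketchAuthContext | null {
  let found: SketchAuthContext | null = null;
  for (const token of Object.keys(tokens)) {
    const ctx = tokens[token];
    if (ctx !== undefined && constantTimeEqual(token, presented)) {
      found = ctx;
    }
  }
  return found;
}

const sketchTokens: Record<string, SketchAuthContext> = {
  "key-abc": { domain: "bearer", principal: "alice" },
  "key-def": { domain: "bearer", principal: "bob" },
};
const matched = lookupToken(sketchTokens, "key-def");
```

A JavaScript-level loop like this only approximates constant-time behaviour; a vetted primitive such as Node's `crypto.timingSafeEqual` is the usual choice in production code.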

chainAuthenticate

Chain multiple authenticate callbacks, trying each in order.

import { chainAuthenticate } from "vgi-rpc";
const auth = chainAuthenticate(jwtAuth, apiKeyAuth);

| Behaviour | Exception type | Result |
| --- | --- | --- |
| Credentials accepted | (none) | Returns AuthContext, stops chain |
| Bad / missing credentials | Plain Error (constructor === Error) | Tries next authenticator |
| Authenticated but forbidden | Error with name === "PermissionError" | Propagates immediately |
| Bug in authenticator | TypeError, RangeError, etc. | Propagates immediately |
| Non-Error throw | any | Propagates immediately |

Throws at construction time if called with no authenticators.
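The behaviour table above can be read as a small decision procedure. The sketch below is a simplified, synchronous illustration of it, not the library's code: `chainSketch` and the `Sketch*` types are hypothetical, the real AuthenticateFn takes a Request and may return a Promise, and rethrowing the last rejection when every authenticator fails is an assumption.

```typescript
// Hypothetical, simplified types used only for this sketch.
interface SketchAuthContext {
  domain: string;
  principal: string;
}
type SketchAuthenticateFn<Req> = (request: Req) => SketchAuthContext;

function chainSketch<Req>(
  ...authenticators: SketchAuthenticateFn<Req>[]
): SketchAuthenticateFn<Req> {
  if (authenticators.length === 0) {
    throw new Error("chainSketch requires at least one authenticator");
  }
  return (request: Req): SketchAuthContext => {
    let lastError: Error | undefined;
    for (const authenticate of authenticators) {
      try {
        return authenticate(request); // credentials accepted: stop the chain
      } catch (err) {
        if (
          !(err instanceof Error) ||
          err.constructor !== Error ||
          err.name === "PermissionError"
        ) {
          throw err; // forbidden, authenticator bug, or non-Error throw: propagate
        }
        lastError = err; // bad or missing credentials: try the next authenticator
      }
    }
    // Assumption: when every authenticator rejects, surface the last rejection.
    throw lastError ?? new Error("authentication failed");
  };
}

// Two toy authenticators that take a raw token string instead of a Request.
const jwtLike: SketchAuthenticateFn<string> = (token) => {
  if (!token.startsWith("jwt:")) throw new Error("not a JWT");
  return { domain: "jwt", principal: token.slice(4) };
};
const apiKeyLike: SketchAuthenticateFn<string> = (token) => {
  if (token !== "key-abc") throw new Error("unknown key");
  return { domain: "bearer", principal: "alice" };
};

const chained = chainSketch(jwtLike, apiKeyLike);
```

Here `chained("key-abc")` falls through the first authenticator and is accepted by the second, while a `TypeError` thrown by any authenticator would end the chain immediately.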

mtlsAuthenticate

Create an mTLS authenticate callback with custom certificate validation. Parses a URL-encoded PEM from the specified header.

import { mtlsAuthenticate, AuthContext } from "vgi-rpc";
const auth = mtlsAuthenticate({
  validate: (cert) => new AuthContext("mtls", true, "principal", {}),
  header: "X-SSL-Client-Cert",
  checkExpiry: false,
});

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| validate | CertValidateFn | required | Receives X509Certificate, returns AuthContext or throws |
| header | string | "X-SSL-Client-Cert" | Header containing URL-encoded PEM |
| checkExpiry | boolean | false | Check validity period before calling validate |

mtlsAuthenticateFingerprint

Create an mTLS authenticate callback using certificate fingerprint lookup. Fingerprints must be lowercase hex without colons.

import { mtlsAuthenticateFingerprint, AuthContext } from "vgi-rpc";
const auth = mtlsAuthenticateFingerprint({
  fingerprints: { "a1b2c3...": new AuthContext("mtls", true, "service-a") },
});

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| fingerprints | Record<string, AuthContext> \| ReadonlyMap<string, AuthContext> | required | Fingerprint-to-context map |
| header | string | "X-SSL-Client-Cert" | Header containing URL-encoded PEM |
| algorithm | string | "sha256" | Hash algorithm (sha256, sha1, sha384, sha512) |
| checkExpiry | boolean | false | Reject expired certificates |
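Because the fingerprint keys must be lowercase hex without colons, fingerprints copied from tools that print them uppercase and colon-separated (as `openssl x509 -fingerprint` does) need normalizing first. A minimal sketch of such a step follows; `normalizeFingerprint` is a hypothetical helper, not part of vgi-rpc.

```typescript
// Hypothetical helper: convert a colon-separated, mixed-case fingerprint
// into lowercase hex without colons.
function normalizeFingerprint(raw: string): string {
  const hex = raw.replace(/[:\s]/g, "").toLowerCase();
  // Reject anything that is not an even-length run of hex digits.
  if (!/^[0-9a-f]+$/.test(hex) || hex.length % 2 !== 0) {
    throw new Error(`not a hex fingerprint: ${raw}`);
  }
  return hex;
}

const normalized = normalizeFingerprint("A1:B2:C3:D4");
```

The normalized string can then be used as a key in the fingerprints map.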

mtlsAuthenticateSubject

Create an mTLS authenticate callback using certificate subject CN. Populates claims with subject_dn, serial, and not_valid_after.

import { mtlsAuthenticateSubject } from "vgi-rpc";
const auth = mtlsAuthenticateSubject({
  allowedSubjects: new Set(["my-service"]),
});

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| header | string | "X-SSL-Client-Cert" | Header containing URL-encoded PEM |
| domain | string | "mtls" | AuthContext.domain value |
| allowedSubjects | ReadonlySet<string> \| null | null | Restrict accepted CNs |
| checkExpiry | boolean | false | Reject expired certificates |

mtlsAuthenticateXfcc

Create an authenticate callback from the Envoy x-forwarded-client-cert header. No crypto dependencies required.

import { mtlsAuthenticateXfcc } from "vgi-rpc";
const auth = mtlsAuthenticateXfcc({ selectElement: "first" });

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| validate | XfccValidateFn | | Custom validation (default: extract CN from Subject) |
| domain | string | "mtls" | AuthContext.domain value |
| selectElement | "first" \| "last" | "first" | Which element to use when multiple are present |

parseXfcc

Parse an x-forwarded-client-cert header value into structured elements. Exported for testability.

import { parseXfcc } from "vgi-rpc";
const elements = parseXfcc(headerValue);
// Returns: XfccElement[]
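To make the shape of the parsed result concrete, here is a simplified standalone parser for the Envoy header format: elements separated by commas, key=value pairs separated by semicolons, and values optionally double-quoted. It is a sketch under those assumptions, not the library's parser; `parseXfccSketch` and its helpers are hypothetical names, and the element shape mirrors the XfccElement interface documented on this page.

```typescript
// Local copy of the XfccElement shape documented under Interfaces.
interface XfccElementSketch {
  hash: string | null;
  cert: string | null;
  subject: string | null;
  uri: string | null;
  dns: readonly string[];
  by: string | null;
}

// Split on `sep`, ignoring separators that appear inside double-quoted values.
function splitOutsideQuotes(input: string, sep: string): string[] {
  const parts: string[] = [];
  let current = "";
  let inQuotes = false;
  for (let i = 0; i < input.length; i++) {
    const ch = input.charAt(i);
    if (ch === "\\" && inQuotes && i + 1 < input.length) {
      current += ch + input.charAt(i + 1); // keep the escaped character with its backslash
      i++;
    } else if (ch === '"') {
      inQuotes = !inQuotes;
      current += ch;
    } else if (ch === sep && !inQuotes) {
      parts.push(current);
      current = "";
    } else {
      current += ch;
    }
  }
  parts.push(current);
  return parts;
}

// Strip surrounding quotes and resolve backslash escapes.
function unquote(value: string): string {
  if (value.length >= 2 && value.startsWith('"') && value.endsWith('"')) {
    return value.slice(1, -1).replace(/\\(.)/g, "$1");
  }
  return value;
}

function parseXfccSketch(header: string): XfccElementSketch[] {
  return splitOutsideQuotes(header, ",")
    .filter((raw) => raw.trim() !== "")
    .map((raw) => {
      const fields = new Map<string, string[]>();
      for (const pair of splitOutsideQuotes(raw, ";")) {
        const eq = pair.indexOf("=");
        if (eq < 0) continue; // skip malformed pairs
        const key = pair.slice(0, eq).trim().toLowerCase();
        const values = fields.get(key) ?? [];
        values.push(unquote(pair.slice(eq + 1).trim()));
        fields.set(key, values);
      }
      const first = (key: string): string | null => fields.get(key)?.[0] ?? null;
      return {
        hash: first("hash"),
        cert: first("cert"),
        subject: first("subject"),
        uri: first("uri"),
        dns: fields.get("dns") ?? [], // DNS may repeat within one element
        by: first("by"),
      };
    });
}

const sampleHeader =
  'By=spiffe://cluster.local/ns/default/sa/frontend;Hash=abc123;' +
  'Subject="CN=my-service,O=Example";DNS=a.example.com;DNS=b.example.com,' +
  'By=spiffe://other;Hash=def456';
const parsedElements = parseXfccSketch(sampleHeader);
```

The comma inside the quoted Subject does not split the first element, so the sample header yields two elements, the second of which has no subject.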

httpOAuthMetadata

Discover OAuth Protected Resource Metadata (RFC 9728) from a vgi-rpc server.

import { httpOAuthMetadata } from "vgi-rpc";
const metadata = await httpOAuthMetadata(baseUrl: string, prefix?: string);
// Returns: Promise<OAuthResourceMetadataResponse | null>

parseResourceMetadataUrl

Extract the resource_metadata URL from a WWW-Authenticate Bearer challenge.

import { parseResourceMetadataUrl } from "vgi-rpc";
const url = parseResourceMetadataUrl(wwwAuthenticate: string);
// Returns: string | null

parseClientId

Extract the client_id from a WWW-Authenticate Bearer challenge.

import { parseClientId } from "vgi-rpc";
const clientId = parseClientId(wwwAuthenticate: string);
// Returns: string | null

parseClientSecret

Extract the client_secret from a WWW-Authenticate Bearer challenge.

import { parseClientSecret } from "vgi-rpc";
const clientSecret = parseClientSecret(wwwAuthenticate: string);
// Returns: string | null

parseUseIdTokenAsBearer

Extract the use_id_token_as_bearer flag from a WWW-Authenticate Bearer challenge.

import { parseUseIdTokenAsBearer } from "vgi-rpc";
const useIdToken = parseUseIdTokenAsBearer(wwwAuthenticate: string);
// Returns: boolean
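The four parse helpers above each read one parameter from a WWW-Authenticate Bearer challenge. The sketch below shows one way such an extraction might look for a single challenge. `bearerChallengeParam` is a hypothetical helper, not the library's implementation, and it does not attempt to handle headers that carry several challenges.

```typescript
// Hypothetical helper: pull one auth-param out of a single `Bearer` challenge.
// `name` is assumed to contain no regular-expression metacharacters.
function bearerChallengeParam(wwwAuthenticate: string, name: string): string | null {
  // Only Bearer challenges are considered.
  if (!/^\s*Bearer(\s|$)/i.test(wwwAuthenticate)) return null;
  // Match name="quoted value" (with backslash escapes) or name=token.
  const pattern = new RegExp(
    `(?:^|[\\s,])${name}\\s*=\\s*(?:"((?:[^"\\\\]|\\\\.)*)"|([^\\s,"]+))`,
    "i",
  );
  const match = pattern.exec(wwwAuthenticate);
  if (match === null) return null;
  const quoted = match[1];
  const token = match[2];
  if (quoted !== undefined) return quoted.replace(/\\(.)/g, "$1");
  return token ?? null;
}

const challenge =
  'Bearer realm="vgi", resource_metadata="https://api.example.com/.well-known/oauth-protected-resource", client_id="my-app"';
const metadataUrl = bearerChallengeParam(challenge, "resource_metadata");
const challengeClientId = bearerChallengeParam(challenge, "client_id");
```

The URL and client ID in the sample challenge are placeholders chosen for the example.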

Client classes

HttpStreamSession

HTTP streaming session returned by client.stream() on HTTP transport clients.

class HttpStreamSession implements StreamSession {
  readonly header: Record<string, any> | null;
  exchange(input: Record<string, any>[]): Promise<Record<string, any>[]>;
  [Symbol.asyncIterator](): AsyncIterableIterator<Record<string, any>[]>;
  close(): void;
}

PipeStreamSession

Pipe streaming session returned by client.stream() on pipe and subprocess transport clients.

class PipeStreamSession implements StreamSession {
  readonly header: Record<string, any> | null;
  exchange(input: Record<string, any>[]): Promise<Record<string, any>[]>;
  [Symbol.asyncIterator](): AsyncIterableIterator<Record<string, any>[]>;
  close(): void;
}

Type singletons

Schema shorthand values — see Schema Shorthand for details.

| Export | Arrow Type |
| --- | --- |
| str | Utf8 |
| bytes | Binary |
| int | Int64 |
| int32 | Int32 |
| float | Float64 |
| float32 | Float32 |
| bool | Bool |

Type aliases

type UnaryHandler = (
  params: Record<string, any>,
  ctx: LogContext,
) => Promise<Record<string, any>> | Record<string, any>;

type ProducerInit<S> = (
  params: Record<string, any>,
) => Promise<S> | S;

type ProducerFn<S> = (
  state: S,
  out: OutputCollector,
) => Promise<void> | void;

type ExchangeInit<S> = (
  params: Record<string, any>,
) => Promise<S> | S;

type ExchangeFn<S> = (
  state: S,
  input: RecordBatch,
  out: OutputCollector,
) => Promise<void> | void;

type HeaderInit = (
  params: Record<string, any>,
  state: any,
  ctx: LogContext,
) => Record<string, any>;

type SchemaLike = Schema | Record<string, DataType>;
type AuthenticateFn = (request: Request) => AuthContext | Promise<AuthContext>;
type BearerValidateFn = (token: string) => AuthContext | Promise<AuthContext>;
type CertValidateFn = (cert: X509Certificate) => AuthContext | Promise<AuthContext>;
type XfccValidateFn = (element: XfccElement) => AuthContext | Promise<AuthContext>;
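As an illustration of how the ProducerInit and ProducerFn aliases fit together, the sketch below pairs an init function with a producer function and drives them with a stand-in collector. Everything prefixed `Sketch` is hypothetical, the handlers are synchronous for brevity, and the driver loop (call the producer until finish() is signalled, mutating the state in place) is an assumption about usage rather than a description of how VgiRpcServer schedules handlers.

```typescript
// Minimal stand-in for OutputCollector: only emitRow, finish, and finished.
class SketchCollector {
  readonly rows: Record<string, unknown>[] = [];
  finished = false;
  emitRow(values: Record<string, unknown>): void {
    this.rows.push(values);
  }
  finish(): void {
    this.finished = true;
  }
}

// Synchronous simplifications of ProducerInit<S> and ProducerFn<S>.
type SketchProducerInit<S> = (params: Record<string, unknown>) => S;
type SketchProducerFn<S> = (state: S, out: SketchCollector) => void;

interface CounterState {
  next: number;
  limit: number;
}

// Build the per-stream state from the call parameters.
const initCounter: SketchProducerInit<CounterState> = (params) => ({
  next: 0,
  limit: Number(params["limit"] ?? 3),
});

// Emit one row per call, then signal completion once the limit is reached.
const produceCounter: SketchProducerFn<CounterState> = (state, out) => {
  if (state.next >= state.limit) {
    out.finish();
    return;
  }
  out.emitRow({ value: state.next });
  state.next += 1;
};

// Assumed driver loop: keep calling the producer until it finishes.
function driveSketch<S>(
  init: SketchProducerInit<S>,
  produce: SketchProducerFn<S>,
  params: Record<string, unknown>,
): Record<string, unknown>[] {
  const state = init(params);
  const out = new SketchCollector();
  while (!out.finished) {
    produce(state, out);
  }
  return out.rows;
}

const producedRows = driveSketch(initCounter, produceCounter, { limit: 3 });
```

With a limit of 3 the loop collects three rows, with values 0, 1, and 2, before the producer calls finish().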

Interfaces

XfccElement

interface XfccElement {
  hash: string | null;
  cert: string | null;
  subject: string | null;
  uri: string | null;
  dns: readonly string[];
  by: string | null;
}

OAuthResourceMetadata

Server-side configuration for RFC 9728 OAuth Protected Resource Metadata.

interface OAuthResourceMetadata {
  resource: string;
  authorizationServers: string[];
  scopesSupported?: string[];
  bearerMethodsSupported?: string[];
  resourceName?: string;
  resourceDocumentation?: string;
  resourcePolicyUri?: string;
  resourceTosUri?: string;
  clientId?: string;
  clientSecret?: string;
  useIdTokenAsBearer?: boolean;
}

OAuthResourceMetadataResponse

Client-side response from OAuth Protected Resource Metadata discovery.

interface OAuthResourceMetadataResponse {
  resource: string;
  authorizationServers: string[];
  scopesSupported?: string[];
  bearerMethodsSupported?: string[];
  resourceName?: string;
  resourceDocumentation?: string;
  resourcePolicyUri?: string;
  resourceTosUri?: string;
  clientId?: string;
  clientSecret?: string;
  useIdTokenAsBearer?: boolean;
}

LogContext

interface LogContext {
  clientLog(level: string, message: string, extra?: Record<string, string>): void;
}

RpcClient

interface RpcClient {
  call(method: string, params?: Record<string, any>): Promise<Record<string, any> | null>;
  stream(method: string, params?: Record<string, any>): Promise<StreamSession>;
  describe(): Promise<ServiceDescription>;
  close(): void;
}

StreamSession

interface StreamSession {
  readonly header: Record<string, any> | null;
  exchange(input: Record<string, any>[]): Promise<Record<string, any>[]>;
  [Symbol.asyncIterator](): AsyncIterableIterator<Record<string, any>[]>;
  close(): void;
}

ServiceDescription

interface ServiceDescription {
  protocolName: string;
  methods: MethodInfo[];
}

MethodInfo

interface MethodInfo {
  name: string;
  type: "unary" | "stream";
  paramsSchema: Schema;
  resultSchema: Schema;
  inputSchema?: Schema;
  outputSchema?: Schema;
  headerSchema?: Schema;
  doc?: string;
  paramTypes?: Record<string, string>;
  defaults?: Record<string, any>;
}

LogMessage

interface LogMessage {
  level: string;
  message: string;
  extra?: Record<string, any>;
}

HttpConnectOptions

See Configuration.

PipeConnectOptions

See Configuration.

SubprocessConnectOptions

See Configuration.

HttpHandlerOptions

See Configuration.

StateSerializer

interface StateSerializer {
  serialize(state: any): Uint8Array;
  deserialize(bytes: Uint8Array): any;
}
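A serializer that satisfies this interface can be quite small. The sketch below round-trips state through JSON and UTF-8 bytes. It is one possible implementation, not the library's default: `jsonSerializer` and `StateSerializerSketch` are hypothetical names, and the approach only suits JSON-compatible state (JSON.stringify throws on bigint values and loses Date and Map instances).

```typescript
// Local copy of the StateSerializer shape, with `unknown` in place of `any`.
interface StateSerializerSketch {
  serialize(state: unknown): Uint8Array;
  deserialize(bytes: Uint8Array): unknown;
}

// Encode state as UTF-8 JSON bytes and decode it back.
const jsonSerializer: StateSerializerSketch = {
  serialize(state: unknown): Uint8Array {
    return new TextEncoder().encode(JSON.stringify(state));
  },
  deserialize(bytes: Uint8Array): unknown {
    return JSON.parse(new TextDecoder().decode(bytes));
  },
};

const stateBytes = jsonSerializer.serialize({ next: 2, limit: 5 });
const restoredState = jsonSerializer.deserialize(stateBytes) as {
  next: number;
  limit: number;
};
```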

Enums

MethodType

enum MethodType {
  UNARY = "unary",
  STREAM = "stream",
}

Error classes

RpcError

class RpcError extends Error {
  readonly errorType: string;
  readonly errorMessage: string;
  readonly remoteTraceback: string;
}

VersionError

class VersionError extends Error {}

Constants

Wire protocol metadata keys:

| Constant | Value |
| --- | --- |
| RPC_METHOD_KEY | vgi_rpc.method |
| REQUEST_VERSION_KEY | vgi_rpc.request_version |
| REQUEST_VERSION | 1 |
| LOG_LEVEL_KEY | vgi_rpc.log_level |
| LOG_MESSAGE_KEY | vgi_rpc.log_message |
| LOG_EXTRA_KEY | vgi_rpc.log_extra |
| SERVER_ID_KEY | vgi_rpc.server_id |
| REQUEST_ID_KEY | vgi_rpc.request_id |
| PROTOCOL_NAME_KEY | vgi_rpc.protocol_name |
| DESCRIBE_VERSION_KEY | vgi_rpc.describe_version |
| DESCRIBE_VERSION | 2 |
| DESCRIBE_METHOD_NAME | __describe__ |
| STATE_KEY | vgi_rpc.stream_state |
| ARROW_CONTENT_TYPE | application/vnd.apache.arrow.stream |
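For readers building a custom transport, the sketch below shows how the method and version keys might be written to and read from a string-to-string metadata map. The constants are redeclared locally so the example is self-contained. Treating REQUEST_VERSION as the string "1", and the helper names `buildRequestMetadata` and `readMethod`, are assumptions made for illustration; the page does not specify how the library itself attaches or validates these keys.

```typescript
// Local copies of the documented keys; REQUEST_VERSION as a string is an assumption.
const RPC_METHOD_KEY = "vgi_rpc.method";
const REQUEST_VERSION_KEY = "vgi_rpc.request_version";
const REQUEST_VERSION = "1";

// Hypothetical helper: metadata a client might attach to a request.
function buildRequestMetadata(method: string): Map<string, string> {
  return new Map<string, string>([
    [RPC_METHOD_KEY, method],
    [REQUEST_VERSION_KEY, REQUEST_VERSION],
  ]);
}

// Hypothetical helper: validate the version, then return the method name.
function readMethod(metadata: Map<string, string>): string {
  const version = metadata.get(REQUEST_VERSION_KEY);
  if (version !== REQUEST_VERSION) {
    throw new Error(`unsupported request version: ${version ?? "missing"}`);
  }
  const method = metadata.get(RPC_METHOD_KEY);
  if (method === undefined) {
    throw new Error("missing method name");
  }
  return method;
}

const requestedMethod = readMethod(buildRequestMetadata("add"));
```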