Unix Socket Launcher
The launcher serves a vgi-rpc protocol over an AF_UNIX domain socket instead of stdin/stdout or HTTP. It is built for the spawn-or-reuse pattern: a single long-running worker process binds one socket, and any number of callers connect to that socket path. A coordination layer (launch) makes sure exactly one worker exists per worker command tuple, reusing a warm worker when one is already running.
The on-disk layout and tuple hash match the Python vgi_rpc.launcher byte-for-byte, so a TypeScript worker and a Python launcher (or vice versa) resolve to the same socket.
How it works
- One server process per socket. serveUnix binds an AF_UNIX socket, accepts connections one at a time (sequential listen, like Python’s serve_unix), and dispatches each connection through the standard unary / stream / __describe__ machinery.
- Self-termination. A worker shuts itself down after idleTimeout seconds with zero connected clients, after an initial startupGrace window so a slow first caller doesn’t see the socket vanish.
- Spawn-or-reuse. launch derives a deterministic socket path from a hash of the worker command tuple, serializes concurrent first-callers on a per-hash lockfile, probes for an existing worker, and spawns one only if none is serving.
- Worker contract. A worker accepts --unix PATH and --idle-timeout SEC on its command line, and prints exactly one UNIX:<absolute-path> line to stdout once bind + listen succeed.
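The self-termination rule can be sketched as a pure decision function. This is a minimal illustration, not the library's implementation: the IdleState shape and the shouldSelfTerminate name are hypothetical, and the sketch assumes the idle clock cannot start before the startup grace window has ended.

```typescript
// Hypothetical state shape for illustration; not part of @query-farm/vgi-rpc.
interface IdleState {
  listeningSince: number;        // time (seconds) at which listen() succeeded
  lastDisconnect: number | null; // time the client count last dropped to zero
  connectedClients: number;
}

function shouldSelfTerminate(
  state: IdleState,
  now: number,
  idleTimeout: number,
  startupGrace: number,
): boolean {
  if (idleTimeout === 0) return false;          // 0 disables the timer
  if (state.connectedClients > 0) return false; // never stop while clients are connected
  const graceEnd = state.listeningSince + startupGrace;
  // Assumption: the idle clock starts at the later of grace end and last disconnect.
  const idleStart = Math.max(graceEnd, state.lastDisconnect ?? graceEnd);
  return now - idleStart >= idleTimeout;
}
```

Keeping the rule as a function of explicit timestamps makes it easy to check edge cases, such as a worker that never receives a first caller, without real timers.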
Serving a socket: serveUnix
serveUnix(protocol, options) binds a Unix socket and serves a Protocol. It returns a ServeUnixHandle.
    import { Protocol, float, serveUnix } from "@query-farm/vgi-rpc";

    const protocol = new Protocol("Calculator");

    protocol.unary("add", {
      params: { a: float, b: float },
      result: { result: float },
      handler: async ({ a, b }) => ({ result: a + b }),
    });

    const handle = await serveUnix(protocol, {
      unixPath: "/tmp/calculator.sock",
      idleTimeout: 300, // self-terminate after 300s idle (0 disables)
    });

    console.log("serving on", handle.socketPath);

    // Block until the server stops (idle timeout, stop(), or fatal error).
    await handle.done;

Once bind + listen succeed, serveUnix writes UNIX:<absolute-path>\n to stdout (this is the cross-language launcher contract) and tightens the socket file to mode 0600.
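On the reading side of that contract, a launcher has to pick the announcement out of the worker's stdout, which may arrive in arbitrary chunks. The following is a rough sketch of one way to do that. AnnouncementReader is a hypothetical name, ignoring non-matching lines is an assumption, and the absolute-path check assumes POSIX-style paths.

```typescript
// Hypothetical helper; accumulates stdout chunks until a complete UNIX:<path> line arrives.
class AnnouncementReader {
  private buffer = "";

  /** Feed one stdout chunk; returns the announced path once a full line is seen, else null. */
  push(chunk: string): string | null {
    this.buffer += chunk;
    let newline: number;
    while ((newline = this.buffer.indexOf("\n")) !== -1) {
      const line = this.buffer.slice(0, newline);
      this.buffer = this.buffer.slice(newline + 1);
      if (line.startsWith("UNIX:")) {
        const announced = line.slice("UNIX:".length);
        // The contract promises an absolute path; anything else is ignored here (assumption).
        if (announced.startsWith("/")) return announced;
      }
    }
    return null; // no complete announcement yet
  }
}
```

Buffering until the newline matters because a pipe can split the line at any byte, so matching on individual chunks would miss announcements that straddle two reads.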
ServeUnixOptions
| Option | Type | Default | Description |
|---|---|---|---|
| unixPath | string | (required) | Path to the Unix socket to bind. Resolved to an absolute path. |
| idleTimeout | number | 300 | Self-terminate after this many seconds with zero connected clients. 0 disables the timer. |
| startupGraceSeconds | number | 5 | Grace period after listen() succeeds before the idle timer starts ticking. |
| protocolVersion | string | "" | Optional logical-service / protocol-contract version label. |
| serverId | string | random | Custom server identifier (random 12-char id if omitted). |
| enableDescribe | boolean | true | Enable the __describe__ introspection method. |
| dispatchHook | DispatchHook | (none) | Optional dispatch hook for observability. |
| externalLocation | ExternalLocationConfig | (none) | Optional external-storage config for large-batch externalization. |
| onServeStart | ServeStartHook | (none) | Lifecycle hook fired once before the first dispatched request. |
| backlog | number | 16 | Maximum listen backlog. |
| onBound | (sockPath: string) => void | (none) | Called after listen() returns but before UNIX:<path> is printed. |
| announcementSink | NodeJS.WritableStream | process.stdout | Stream used for the UNIX:<path> announcement line. |
ServeUnixHandle
    interface ServeUnixHandle {
      readonly socketPath: string;
      /** Shut down the listener and unlink the socket file. */
      stop(): Promise<void>;
      /** Resolves when the server has stopped (idle timeout, stop(), or fatal error). */
      readonly done: Promise<void>;
    }

Launch-or-reuse: launch
launch(config) ensures a worker is running for a given command and returns its socket path. The flow:
- Resolve the state directory and the per-tuple lock/sock/meta paths (or use an explicit socketPath).
- Acquire the per-hash lockfile (blocking up to connectTimeout).
- Probe the socket: if a worker is already serving, reuse it and return its path.
- Otherwise clean up any stale socket, write launch metadata, and spawn the worker, waiting for its UNIX:<path> announcement.
- Release the lock, then run a bounded opportunistic GC of stale entries.
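The ordering of those steps can be sketched with injected step functions. This is an illustration only: LaunchSteps and launchFlow are hypothetical names, the steps are synchronous for brevity (the real launch is asynchronous), and error handling beyond releasing the lock is left out.

```typescript
// Hypothetical dependency bundle; each function stands in for one step of the flow.
interface LaunchSteps {
  acquireLock(): () => void;                            // returns a release function
  probe(sockPath: string): boolean;                     // true if a worker is serving
  cleanupStale(sockPath: string): void;
  writeMeta(): void;
  spawnAndAwaitAnnouncement(sockPath: string): string;  // returns the announced path
  gc(): void;
}

function launchFlow(sockPath: string, steps: LaunchSteps): string {
  const release = steps.acquireLock();
  let result = sockPath; // reuse case: the existing socket path is returned as-is
  try {
    if (!steps.probe(sockPath)) {
      steps.cleanupStale(sockPath);
      steps.writeMeta();
      result = steps.spawnAndAwaitAnnouncement(sockPath);
    }
  } finally {
    release(); // the lock is released whether or not the spawn succeeded
  }
  steps.gc(); // opportunistic cleanup runs only after the lock is released
  return result;
}
```

Holding the lock across probe and spawn is what keeps two concurrent first-callers from both deciding that no worker exists.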
    import { launch, pipeConnect } from "@query-farm/vgi-rpc";
    import { createConnection } from "node:net";

    const socketPath = await launch({
      workerArgv: ["bun", "run", "examples/calculator-unix.ts"],
      idleTimeout: 300, // forwarded as --idle-timeout
      connectTimeout: 30, // max seconds to wait for the lock
      workerStartupTimeout: 60, // max seconds to wait for UNIX:<path>
    });

    // Connect to the worker over the socket (a Node socket is a duplex stream).
    const sock = createConnection({ path: socketPath });
    const client = pipeConnect(sock, sock);
    const result = await client.call("add", { a: 2, b: 3 });

launch appends --unix <socketPath> and --idle-timeout <seconds> to workerArgv when spawning, so the worker must implement the serveUnix CLI contract (parse those flags, print UNIX:<path>).
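For a worker that does its own argument handling, both sides of that flag contract might look roughly like this. buildWorkerArgv and parseWorkerFlags are hypothetical helpers written for this example, not exports of the library, and the fallback of 300 seconds when --idle-timeout is absent is an assumption taken from the documented option default.

```typescript
// Hypothetical helpers illustrating the flag contract; not the library's own parser.
interface WorkerFlags {
  unixPath: string;
  idleTimeout: number;
}

/** Launcher side: append the two contract flags to the worker command. */
function buildWorkerArgv(workerArgv: readonly string[], socketPath: string, idleTimeout: number): string[] {
  return [...workerArgv, "--unix", socketPath, "--idle-timeout", String(idleTimeout)];
}

/** Worker side: pick the two contract flags out of the argument list. */
function parseWorkerFlags(argv: readonly string[]): WorkerFlags {
  let unixPath: string | undefined;
  let idleTimeout = 300; // assumed default when the flag is absent
  for (let i = 0; i < argv.length; i++) {
    if (argv[i] === "--unix" && i + 1 < argv.length) {
      unixPath = argv[++i];
    } else if (argv[i] === "--idle-timeout" && i + 1 < argv.length) {
      idleTimeout = Number(argv[++i]);
    }
  }
  if (unixPath === undefined) throw new Error("missing --unix PATH");
  if (!Number.isFinite(idleTimeout) || idleTimeout < 0) throw new Error("invalid --idle-timeout SEC");
  return { unixPath, idleTimeout };
}
```

Unrecognized arguments are skipped rather than rejected, so the worker's own script path and flags can sit in the same argument list.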
LaunchConfig
| Field | Type | Default | Description |
|---|---|---|---|
| workerArgv | readonly string[] | (required) | The worker command and arguments. Must be non-empty. |
| socketPath | string | derived | Explicit socket path. When omitted, derived from the tuple hash. Explicit paths get a sibling .lock, no .meta, and are skipped by status / GC. |
| idleTimeout | number | 300 | Worker self-shutdown after this many seconds idle. Forwarded as --idle-timeout. |
| connectTimeout | number | 30 | Max seconds to block waiting for the per-hash file lock. |
| workerStartupTimeout | number | 60 | Max seconds to wait for the worker to print UNIX:<path>. |
| workerStderr | string | (none) | When set, worker stderr is appended to this file; otherwise discarded. |
| stateDir | string | defaultStateDir() | Override the default state directory. |
Advanced: state directory, locks, and GC
These lower-level exports back the coordination layer. Most callers only need launch and serveUnix; reach for these when building custom tooling (status displays, cleanup jobs, alternative coordinators).
State directory and paths
defaultStateDir() returns the per-user directory used for lockfiles and sockets, creating it with mode 0700 if it is missing:
- Linux: $XDG_RUNTIME_DIR/vgi-rpc/ when set, otherwise $TMPDIR/vgi-rpc-$UID/.
- macOS / BSD: $TMPDIR/vgi-rpc-$UID/.
- Windows: $TMP/vgi-rpc/.
On POSIX it refuses to operate on a directory not owned by the current user.
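The platform rules above can be written down as a pure resolver. This is a sketch under stated assumptions, not the code behind defaultStateDir(): resolveStateDir is a hypothetical name, the fallback to /tmp when TMPDIR is unset is an assumption, and the sketch neither creates the directory nor checks its ownership.

```typescript
// Hypothetical pure resolver mirroring the documented rules; it touches no files.
type EnvMap = Record<string, string | undefined>;

/** Drop trailing slashes so joined paths do not contain "//". */
function trimTrailingSlash(dir: string): string {
  return dir.replace(/\/+$/, "");
}

function resolveStateDir(platform: string, env: EnvMap, uid: number): string {
  if (platform === "win32") {
    if (!env.TMP) throw new Error("TMP is not set");
    return `${trimTrailingSlash(env.TMP)}/vgi-rpc/`;
  }
  if (platform === "linux" && env.XDG_RUNTIME_DIR) {
    // XDG_RUNTIME_DIR is already per-user, so no UID suffix is needed.
    return `${trimTrailingSlash(env.XDG_RUNTIME_DIR)}/vgi-rpc/`;
  }
  const tmpdir = env.TMPDIR || "/tmp"; // assumption: fall back to /tmp when TMPDIR is unset
  return `${trimTrailingSlash(tmpdir)}/vgi-rpc-${uid}/`;
}
```

The UID suffix on the shared temp directory is what keeps two users on the same machine from colliding on the same state directory.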
socketPaths(stateDir, hashId) returns the <hash>.lock / <hash>.sock / <hash>.meta triple for one worker tuple:
    import { defaultStateDir, socketPaths, launcherComputeHash } from "@query-farm/vgi-rpc";

    const stateDir = defaultStateDir();
    const hashId = await launcherComputeHash(["bun", "run", "worker.ts"]);
    const { lockPath, sockPath, metaPath } = socketPaths(stateDir, hashId);

    interface SocketPaths {
      lockPath: string;
      sockPath: string;
      metaPath: string;
    }

launcherComputeHash(workerArgv, cwd?, env?) computes the 16-hex-char tuple hash. Only VGI_RPC_* environment keys participate, so invocations that differ only in unrelated environment variables (PATH, HOME, …) intentionally share a worker. The hash matches Python’s compute_hash byte-for-byte.
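To show why filtering the environment makes unrelated variables irrelevant, here is a small stand-in hash. It is an illustration only: sketchTupleHash is a hypothetical name, and the choice of SHA-256 over a JSON encoding is an assumption, so its output will not match launcherComputeHash or Python's compute_hash.

```typescript
import { createHash } from "node:crypto";

// Illustrative stand-in: the real tuple encoding is not described here, so these
// digests will NOT match launcherComputeHash or Python's compute_hash.
function sketchTupleHash(
  workerArgv: readonly string[],
  cwd: string,
  env: Record<string, string | undefined>,
): string {
  // Keep only VGI_RPC_* keys, sorted so that key order cannot change the hash.
  const relevantEnv = Object.keys(env)
    .filter((key) => key.startsWith("VGI_RPC_") && env[key] !== undefined)
    .sort()
    .map((key) => [key, env[key]]);
  const canonical = JSON.stringify({ argv: workerArgv, cwd, env: relevantEnv });
  // Truncate the hex digest to 16 characters, the documented hash length.
  return createHash("sha256").update(canonical).digest("hex").slice(0, 16);
}
```

Two callers with different PATH or HOME values produce the same string here, while changing any VGI_RPC_* value, the argv, or the working directory produces a different one.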
Probing
probeSocket(sockPath, timeoutMs?) (default 2000 ms) resolves true if something is currently accepting connections on the socket, false otherwise:
    import { probeSocket } from "@query-farm/vgi-rpc";

    if (await probeSocket("/tmp/calculator.sock")) {
      console.log("a worker is live");
    }

Cross-process locking
launch serializes first-callers on a per-hash lockfile. The lock uses a persistent PID-stamp protocol (the lockfile is truncated to zero bytes on release rather than unlinked, so it remains a slot marker for scanners).
- tryAcquireLock(lockPath): non-blocking; returns a FileLockHandle or null if held by a live process.
- acquireLock(lockPath, timeoutMs): polls until acquired or the timeout fires (throws on timeout).
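The PID-stamp idea can be illustrated with a few synchronous file operations. This is a simplified sketch, not the library's lock: sketchTryAcquire and demoLock are hypothetical names, and the read-then-write sequence here is not atomic, so a real implementation needs additional protection against two processes stamping at the same moment.

```typescript
import { closeSync, mkdtempSync, openSync, readFileSync, statSync, truncateSync, writeFileSync } from "node:fs";
import { tmpdir } from "node:os";
import { join } from "node:path";

// Hypothetical sketch of PID-stamp bookkeeping only; the check-then-write is NOT race-free.
interface SketchLock {
  readonly path: string;
  release(): void;
}

function pidIsAlive(pid: number): boolean {
  try {
    process.kill(pid, 0); // signal 0 sends nothing; it only checks that the process exists
    return true;
  } catch (err) {
    // EPERM means the process exists but belongs to another user.
    return (err as { code?: string }).code === "EPERM";
  }
}

function sketchTryAcquire(lockPath: string): SketchLock | null {
  closeSync(openSync(lockPath, "a")); // create the slot file if missing, without truncating
  const stamp = readFileSync(lockPath, "utf8").trim();
  const holder = Number.parseInt(stamp, 10);
  if (stamp !== "" && Number.isInteger(holder) && holder > 0 && pidIsAlive(holder)) {
    return null; // stamped by a live process: the lock is held
  }
  writeFileSync(lockPath, String(process.pid)); // empty or stale stamp: take the slot
  let released = false;
  return {
    path: lockPath,
    release() {
      if (released) return; // idempotent
      released = true;
      truncateSync(lockPath, 0); // the file persists as an empty slot marker
    },
  };
}

// Usage: a second acquire fails while the first handle is held, and succeeds after release.
function demoLock() {
  const lockPath = join(mkdtempSync(join(tmpdir(), "lock-sketch-")), "demo.lock");
  const first = sketchTryAcquire(lockPath);
  const second = sketchTryAcquire(lockPath);
  first?.release();
  const sizeAfterRelease = statSync(lockPath).size;
  const third = sketchTryAcquire(lockPath);
  third?.release();
  return {
    firstAcquired: first !== null,
    secondAcquired: second !== null,
    sizeAfterRelease,
    reacquired: third !== null,
  };
}
```

Because a stamp left by a dead process is treated as free, a crashed holder does not block later callers, which is the main appeal of stamping a PID rather than relying on the file merely existing.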
    import { acquireLock, tryAcquireLock, socketPaths, defaultStateDir, launcherComputeHash } from "@query-farm/vgi-rpc";

    const hashId = await launcherComputeHash(["bun", "run", "worker.ts"]);
    const { lockPath } = socketPaths(defaultStateDir(), hashId);

    const handle = await acquireLock(lockPath, 30_000);
    try {
      // critical section: probe + spawn
    } finally {
      handle.release();
    }

    // Or non-blocking:
    const maybe = tryAcquireLock(lockPath);
    if (maybe) maybe.release();

    interface FileLockHandle {
      readonly path: string;
      /** Truncate the file to zero bytes; the file persists as a slot marker. Idempotent. */
      release(): void;
    }

Status
statusRows(stateDir) lists one row per <hash>.lock in the state directory. It is read-only and takes no locks, reading the best-effort .meta file and probing each socket for liveness:
    import { statusRows, defaultStateDir } from "@query-farm/vgi-rpc";

    for (const row of await statusRows(defaultStateDir())) {
      console.log(row.hashId, row.alive ? "alive" : "dead", row.cmd.join(" "));
    }

    interface StatusRow {
      hashId: string;
      cmd: string[];
      cwd: string;
      socket: string;
      startedAt: number | null;
      alive: boolean;
    }

Garbage collection
gcStateDir(stateDir, tryAcquire, options?) removes the .lock / .sock / .meta triples whose worker is no longer accepting connections. It only cleans entries whose lock it can acquire (so it never touches a live worker or an in-flight launch). The tryAcquire callback returns a release function when the lock is free, or null when it is held; pass one backed by tryAcquireLock:
    import { gcStateDir, tryAcquireLock, defaultStateDir } from "@query-farm/vgi-rpc";

    const result = await gcStateDir(
      defaultStateDir(),
      async (lockPath) => {
        const h = tryAcquireLock(lockPath);
        return h ? () => h.release() : null;
      },
      { limit: 16 }, // optional: bound the scan; excludeHash skips a specific entry
    );

    console.log("cleaned:", result.cleaned);
    console.log("skipped (in use):", result.skippedInUse);

    interface GcResult {
      /** Hash IDs of stale entries that were removed. */
      cleaned: string[];
      /** Hash IDs whose lockfile is currently held (a launch is in flight or the worker is alive). */
      skippedInUse: string[];
    }

launch already runs a bounded GC (limit 16) opportunistically after releasing its lock, so you rarely need to call gcStateDir directly.
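The classification that GC performs can be sketched as a pure planning step over facts gathered beforehand. This is a hedged illustration: GcEntry and planGc are hypothetical, the sketch assumes limit caps the number of entries examined, and it assumes an entry whose lock is free but whose socket still answers is left untouched.

```typescript
// Hypothetical entry shape: in the real function these facts come from the filesystem.
interface GcEntry {
  hashId: string;
  lockHeld: boolean;    // tryAcquire returned null for this entry
  socketAlive: boolean; // a probe found something accepting connections
}

interface GcPlan {
  cleaned: string[];
  skippedInUse: string[];
}

function planGc(
  entries: readonly GcEntry[],
  options: { limit?: number; excludeHash?: string } = {},
): GcPlan {
  const plan: GcPlan = { cleaned: [], skippedInUse: [] };
  let scanned = 0;
  for (const entry of entries) {
    if (entry.hashId === options.excludeHash) continue; // never consider the excluded entry
    if (options.limit !== undefined && scanned >= options.limit) break; // bounded scan
    scanned++;
    if (entry.lockHeld) {
      plan.skippedInUse.push(entry.hashId); // someone else owns this slot right now
    } else if (!entry.socketAlive) {
      plan.cleaned.push(entry.hashId);      // lock free and nothing listening: stale
    }
    // Lock free but socket alive: leave the entry untouched (assumption).
  }
  return plan;
}
```

Checking the lock before the socket means an in-flight launch, which holds the lock while its socket does not exist yet, is never mistaken for a stale entry.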