To list workers, functions, and triggers, or to subscribe to function availability events, see Discover workers, functions, and triggers.

Installation

npm install iii-sdk

Initialization

registerWorker creates and returns a connected SDK instance. The WebSocket connection is established automatically; there is no separate connect() call.
import { registerWorker } from 'iii-sdk'

const iii = registerWorker(process.env.III_URL ?? 'ws://localhost:49134', {
  workerName: 'my-worker',
})

Methods

createChannel

Creates a streaming channel pair for worker-to-worker data transfer. Returns a Channel with a local writer and reader, plus serializable refs that can be passed as fields in the invocation data to other functions.

Signature

createChannel(bufferSize?: number) => Promise<Channel>

Parameters

Name        Type    Required  Description
bufferSize  number  No        Buffer size for the channel (default: 64)

Example

const channel = await iii.createChannel()

// Pass the writer ref to another function
await iii.trigger({
  function_id: 'stream-producer',
  payload: { outputChannel: channel.writerRef },
})

// Read data locally
channel.reader.onMessage((msg) => {
  console.log('Received:', msg)
})

createStream

Registers a custom stream implementation under the given stream name. This overrides the default stream implementation.

Signature

createStream(streamName: string, stream: IStream<TData>) => void

Parameters

Name        Type            Required  Description
streamName  string          Yes       The name of the stream
stream      IStream<TData>  Yes       The stream implementation

Example

import type { IStream } from 'iii-sdk/stream'

// Assumes `redis` is an already-connected Redis client and `UserSession` is your own type.
const redisStream: IStream<UserSession> = {
  // Items are stored as JSON under the key "<group_id>:<item_id>".
  async get({ group_id, item_id }) {
    return JSON.parse(await redis.get(`${group_id}:${item_id}`) ?? 'null')
  },
  async set({ group_id, item_id, data }) {
    // Read the previous value first so it can be reported as old_value.
    const old = await this.get({ stream_name: 'sessions', group_id, item_id })
    await redis.set(`${group_id}:${item_id}`, JSON.stringify(data))
    return { old_value: old ?? undefined, new_value: data }
  },
  async delete({ group_id, item_id }) {
    const old = await this.get({ stream_name: 'sessions', group_id, item_id })
    await redis.del(`${group_id}:${item_id}`)
    return { old_value: old ?? undefined }
  },
  // Placeholders: listing and update operations are not implemented in this example.
  async list({ group_id }) { return [] },
  async listGroups() { return [] },
  async update({ group_id, item_id, ops }) { return { new_value: {} } },
}

iii.createStream('sessions', redisStream)
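
For local development or tests, the same method shapes can be backed by a plain in-memory map instead of Redis. The following is a minimal sketch, not part of the SDK: the InMemoryStream class and its local types are hypothetical stand-ins that mirror the methods used in the example above, and update is left out because the format of its operations is not described on this page.

```typescript
// Local stand-ins so the sketch runs on its own.
// In a real worker, the IStream type would come from 'iii-sdk/stream'.
type ItemKey = { group_id: string; item_id: string }
type SetResult<T> = { old_value?: T; new_value: T }

class InMemoryStream<T> {
  // group_id -> (item_id -> data)
  private groups = new Map<string, Map<string, T>>()

  async get({ group_id, item_id }: ItemKey): Promise<T | null> {
    return this.groups.get(group_id)?.get(item_id) ?? null
  }

  async set({ group_id, item_id, data }: ItemKey & { data: T }): Promise<SetResult<T>> {
    const group = this.groups.get(group_id) ?? new Map<string, T>()
    const old = group.get(item_id)
    group.set(item_id, data)
    this.groups.set(group_id, group)
    return { old_value: old, new_value: data }
  }

  async delete({ group_id, item_id }: ItemKey): Promise<{ old_value?: T }> {
    const group = this.groups.get(group_id)
    const old = group?.get(item_id)
    group?.delete(item_id)
    return { old_value: old }
  }

  async list({ group_id }: { group_id: string }): Promise<T[]> {
    const group = this.groups.get(group_id)
    return group ? Array.from(group.values()) : []
  }

  async listGroups(): Promise<string[]> {
    return Array.from(this.groups.keys())
  }
}
```

Keeping one inner map per group makes list and listGroups straightforward, which the Redis example above leaves as placeholders.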

registerFunction

Registers a new function with a local handler or an HTTP invocation config.

Signature

registerFunction(functionId: string, handler: HttpInvocationConfig | RemoteFunctionHandler<any, any>, options?: RegisterFunctionOptions) => FunctionRef

Parameters

Name        Type                                                    Required  Description
functionId  string                                                  Yes       Unique function identifier
handler     HttpInvocationConfig | RemoteFunctionHandler<any, any>  Yes       Async handler for local execution, or an HTTP invocation config for external functions (Lambda, Cloudflare Workers, etc.)
options     RegisterFunctionOptions                                 No        Function registration options (description, request/response formats, metadata)

Example

// Local handler
const ref = iii.registerFunction(
  'greet',
  async (data: { name: string }) => ({ message: `Hello, ${data.name}!` }),
  { description: 'Returns a greeting' },
)

// HTTP invocation
const lambdaRef = iii.registerFunction(
  'external::my-lambda',
  {
    url: 'https://abc123.lambda-url.us-east-1.on.aws',
    method: 'POST',
    timeout_ms: 30_000,
    auth: { type: 'bearer', token_key: 'LAMBDA_AUTH_TOKEN' },
  },
  { description: 'Proxied Lambda function' },
)

// Later, remove the function
ref.unregister()

registerService

Registers a new service.

Signature

registerService(message: RegisterServiceInput) => void

Parameters

Name     Type                  Required  Description
message  RegisterServiceInput  Yes       The service to register

registerTrigger

Registers a new trigger. A trigger invokes a function when a certain event occurs.

Signature

registerTrigger(trigger: RegisterTriggerInput) => Trigger

Parameters

Name     Type                  Required  Description
trigger  RegisterTriggerInput  Yes       The trigger to register

Example

const trigger = iii.registerTrigger({
  type: 'cron',
  function_id: 'my-service::process-batch',
  config: { schedule: '*/5 * * * *' },
})

// Later, remove the trigger
trigger.unregister()

registerTriggerType

Registers a new trigger type. A trigger type defines how triggers of that kind are set up and torn down: its handler is called whenever a trigger instance of the type is registered or unregistered.

Signature

registerTriggerType(triggerType: RegisterTriggerTypeInput, handler: TriggerHandler<TConfig>) => TriggerTypeRef<TConfig>

Parameters

Name         Type                      Required  Description
triggerType  RegisterTriggerTypeInput  Yes       The trigger type to register
handler      TriggerHandler<TConfig>   Yes       The handler for the trigger type

Example

type CronConfig = { schedule: string }

iii.registerTriggerType<CronConfig>(
  { id: 'cron', description: 'Fires on a cron schedule' },
  {
    async registerTrigger({ id, function_id, config }) {
      startCronJob(id, config.schedule, () =>
        iii.trigger({ function_id, payload: {} }),
      )
    },
    async unregisterTrigger({ id }) {
      stopCronJob(id)
    },
  },
)
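
The example above relies on startCronJob and stopCronJob helpers that are not shown. As a rough sketch of what such bookkeeping might look like, the following builds a handler for a hypothetical interval-based trigger type using setInterval rather than cron parsing. IntervalConfig, createIntervalHandler, and the invoke callback are illustrative names, not SDK APIs; invoke stands in for a call such as iii.trigger({ function_id, payload: {} }).

```typescript
// Hypothetical config for an interval-based trigger type (not part of the SDK).
type IntervalConfig = { everyMs: number }

// Arguments passed to the handler, mirroring the fields used in the example above.
type TriggerInstance<TConfig> = { id: string; function_id: string; config: TConfig }

// Builds an object with the two methods a trigger type handler needs.
function createIntervalHandler(invoke: (functionId: string) => void) {
  // One timer per trigger id, so each trigger can be stopped individually.
  const timers = new Map<string, ReturnType<typeof setInterval>>()

  return {
    async registerTrigger({ id, function_id, config }: TriggerInstance<IntervalConfig>) {
      // Replace any existing timer so re-registering the same id does not leak.
      const existing = timers.get(id)
      if (existing !== undefined) clearInterval(existing)
      timers.set(id, setInterval(() => invoke(function_id), config.everyMs))
    },
    async unregisterTrigger({ id }: { id: string }) {
      const timer = timers.get(id)
      if (timer !== undefined) clearInterval(timer)
      timers.delete(id)
    },
    // Exposed only so the sketch can be inspected.
    activeCount: () => timers.size,
  }
}
```

Keying timers by trigger id keeps unregisterTrigger a simple lookup and makes repeated registration of the same id safe.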

shutdown

Gracefully shuts down the SDK instance, cleaning up all resources.

Signature

shutdown() => Promise<void>

Example

process.on('SIGTERM', async () => {
  await iii.shutdown()
  process.exit(0)
})

trigger

Invokes a function using a request object.

Signature

trigger(request: TriggerRequest<TInput>) => Promise<TOutput>

Parameters

Name     Type                    Required  Description
request  TriggerRequest<TInput>  Yes       The trigger request containing function_id, payload, and optional action/timeout

Example

// Synchronous invocation
const result = await iii.trigger<{ name: string }, { message: string }>({
  function_id: 'greet',
  payload: { name: 'World' },
  timeoutMs: 5000,
})
console.log(result.message) // "Hello, World!"

// Fire-and-forget
await iii.trigger({
  function_id: 'send-email',
  payload: { to: 'user@example.com' },
  action: TriggerAction.Void(),
})

// Enqueue for async processing
const receipt = await iii.trigger({
  function_id: 'process-order',
  payload: { orderId: '123' },
  action: TriggerAction.Enqueue({ queue: 'orders' }),
})
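
The per-request timeoutMs overrides the worker-wide default (invocationTimeoutMs, documented as 30000). The sketch below illustrates that layering and a generic way to bound a promise with a timeout. It is not the SDK's implementation; effectiveTimeoutMs and withTimeout are hypothetical helpers written for this illustration.

```typescript
// Documented default for invocationTimeoutMs.
const DEFAULT_INVOCATION_TIMEOUT_MS = 30000

// Picks the most specific timeout: request, then worker options, then the default.
function effectiveTimeoutMs(
  request: { timeoutMs?: number },
  init: { invocationTimeoutMs?: number } = {},
): number {
  return request.timeoutMs ?? init.invocationTimeoutMs ?? DEFAULT_INVOCATION_TIMEOUT_MS
}

// Rejects if `work` has not settled within `ms` milliseconds.
function withTimeout<T>(work: Promise<T>, ms: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(() => reject(new Error(`Timed out after ${ms}ms`)), ms)
    work.then(
      (value) => { clearTimeout(timer); resolve(value) },
      (error) => { clearTimeout(timer); reject(error) },
    )
  })
}
```

A caller could wrap any awaited invocation in withTimeout when it needs a deadline that is independent of the engine's own timeout handling.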

unregisterTriggerType

Unregisters a trigger type.

Signature

unregisterTriggerType(triggerType: RegisterTriggerTypeInput) => void

Parameters

Name         Type                      Required  Description
triggerType  RegisterTriggerTypeInput  Yes       The trigger type to unregister

Example

iii.unregisterTriggerType({ id: 'cron', description: 'Fires on a cron schedule' })

Subpath Exports

The iii-sdk package provides additional entry points:

Import path        Contents
iii-sdk            MessageType, ChannelReader, ChannelWriter, Logger, ISdk, ApiRequest, ApiResponse, AuthInput, AuthResult, Channel, etc.
iii-sdk/stream     IStream, StreamAuthInput, StreamAuthResult, StreamChangeEvent, StreamJoinLeaveEvent, StreamJoinLeaveTriggerConfig, StreamJoinResult, StreamTriggerConfig, DeleteResult, StreamContext, etc.
iii-sdk/state      StateEventType, IState, StateEventData, DeleteResult, StateDeleteInput, StateDeleteResult, StateGetInput, StateListInput, StateSetInput, StateSetResult, etc.
iii-sdk/telemetry  WorkerMetricsCollector, IIIReconnectionConfig, OtelConfig, ReconnectionConfig, WorkerGaugesOptions, WorkerMetricsCollectorOptions, IIIConnectionState, OtelLogEvent, RegisterFunctionFormat, WorkerMetrics, etc.

Logger

Structured logger that emits logs as OpenTelemetry LogRecords. Every log call automatically captures the active trace and span context, correlating your logs with distributed traces without any manual wiring. When OTel is not initialized, Logger gracefully falls back to console.*.

Pass structured data as the second argument to any log method. Using an object of key-value pairs (instead of string interpolation) lets you filter, aggregate, and build dashboards in your observability backend.
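
To make the difference between structured data and string interpolation concrete, here is a small sketch using an in-memory stand-in with the same four method names. createMemoryLogger and LogEntry are hypothetical and exist only for this illustration; the real Logger emits OTel LogRecords rather than storing entries in an array.

```typescript
type LogLevel = 'debug' | 'info' | 'warn' | 'error'
type LogEntry = { level: LogLevel; message: string; data?: unknown }

// Stand-in logger that records entries in memory instead of exporting them.
function createMemoryLogger(sink: LogEntry[]) {
  const log = (level: LogLevel) => (message: string, data?: unknown) => {
    sink.push({ level, message, data })
  }
  return { debug: log('debug'), info: log('info'), warn: log('warn'), error: log('error') }
}

const entries: LogEntry[] = []
const logger = createMemoryLogger(entries)
const orderId = 'ord_123'

// Structured: each field stays available as a separate attribute.
logger.info('Order processed', { orderId, status: 'completed' })

// Interpolated: the same facts are baked into a single string.
logger.info(`Order ${orderId} processed with status completed`)

// Filtering on a field only finds the structured entry.
const completed = entries.filter(
  (e) =>
    typeof e.data === 'object' &&
    e.data !== null &&
    (e.data as { status?: string }).status === 'completed',
)
```

A stand-in like this can also serve as a test double when asserting on what a handler logs.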

debug

Log a debug-level message.

Signature

debug(message: string, data: unknown) => void

Parameters

Name     Type     Required  Description
message  string   Yes       Human-readable log message.
data     unknown  Yes       Structured context attached as OTel log attributes. Use key-value objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

logger.debug('Cache lookup', { key: 'user:42', hit: false })

error

Log an error-level message.

Signature

error(message: string, data: unknown) => void

Parameters

Name     Type     Required  Description
message  string   Yes       Human-readable log message.
data     unknown  Yes       Structured context attached as OTel log attributes. Use key-value objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

logger.error('Payment failed', { orderId: 'ord_123', gateway: 'stripe', errorCode: 'card_declined' })

info

Log an info-level message.

Signature

info(message: string, data: unknown) => void

Parameters

Name     Type     Required  Description
message  string   Yes       Human-readable log message.
data     unknown  Yes       Structured context attached as OTel log attributes. Use key-value objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

logger.info('Order processed', { orderId: 'ord_123', status: 'completed' })

warn

Log a warning-level message.

Signature

warn(message: string, data: unknown) => void

Parameters

Name     Type     Required  Description
message  string   Yes       Human-readable log message.
data     unknown  Yes       Structured context attached as OTel log attributes. Use key-value objects to enable filtering and aggregation in your observability backend (e.g. Grafana, Datadog, New Relic).

Example

logger.warn('Retry attempt', { attempt: 3, maxRetries: 5, endpoint: '/api/charge' })

Types

MessageType · ChannelReader · ChannelWriter · Channel · FunctionRef · HttpAuthConfig · HttpInvocationConfig · InitOptions · RegisterFunctionMessage · RegisterFunctionOptions · RegisterServiceInput · RegisterTriggerInput · RegisterTriggerMessage · RegisterTriggerTypeInput · RegisterTriggerTypeMessage · RemoteFunctionHandler · StreamChannelRef · Trigger · TriggerHandler · TriggerRequest · TriggerTypeRef · IStream · DeleteResult · StreamSetResult · StreamUpdateResult · DeleteResult · IIIReconnectionConfig · OtelConfig · ReconnectionConfig · RegisterFunctionFormat

MessageType

Name                       Type                         Required  Description
InvocationResult           "invocationresult"           Yes       -
InvokeFunction             "invokefunction"             Yes       -
RegisterFunction           "registerfunction"           Yes       -
RegisterService            "registerservice"            Yes       -
RegisterTrigger            "registertrigger"            Yes       -
RegisterTriggerType        "registertriggertype"        Yes       -
TriggerRegistrationResult  "triggerregistrationresult"  Yes       -
UnregisterFunction         "unregisterfunction"         Yes       -
UnregisterTrigger          "unregistertrigger"          Yes       -
UnregisterTriggerType      "unregistertriggertype"      Yes       -
WorkerRegistered           "workerregistered"           Yes       -

ChannelReader

Read end of a streaming channel. Provides both a Node.js Readable stream for binary data and an onMessage callback for structured text messages.

Name    Type      Required  Description
stream  Readable  Yes       Node.js Readable stream for binary data.

ChannelWriter

Write end of a streaming channel. Provides both a Node.js Writable stream and a sendMessage method for sending structured text messages.

Name    Type      Required  Description
stream  Writable  Yes       Node.js Writable stream for binary data.

Channel

A streaming channel pair for worker-to-worker data transfer. Created via ISdk.createChannel.

Name       Type              Required  Description
reader     ChannelReader     Yes       Reader end of the channel.
readerRef  StreamChannelRef  Yes       Serializable reference to the reader (can be sent to other workers).
writer     ChannelWriter     Yes       Writer end of the channel.
writerRef  StreamChannelRef  Yes       Serializable reference to the writer (can be sent to other workers).

FunctionRef

Handle returned by ISdk.registerFunction. Contains the function’s id and an unregister() method.

Name        Type        Required  Description
id          string      Yes       The unique function identifier.
unregister  () => void  Yes       Removes this function from the engine.

HttpAuthConfig

Authentication configuration for HTTP-invoked functions.
  • hmac — HMAC signature verification using a shared secret.
  • bearer — Bearer token authentication.
  • api_key — API key sent via a custom header.
type HttpAuthConfig = unknown | unknown | unknown

HttpInvocationConfig

Configuration for registering an HTTP-invoked function (Lambda, Cloudflare Workers, etc.) instead of a local handler.

Name        Type                                         Required  Description
auth        HttpAuthConfig                               No        Authentication configuration.
headers     Record<string, string>                       No        Custom headers to send with the request.
method      "GET" | "POST" | "PUT" | "PATCH" | "DELETE"  No        HTTP method. Defaults to POST.
timeout_ms  number                                       No        Timeout in milliseconds.
url         string                                       Yes       URL to invoke.
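
Because only url is required and method defaults to POST, it can help to validate a config before registering it. The following is a sketch under stated assumptions: HttpInvocationSketch is a local stand-in that omits auth, and normalizeHttpInvocation is a hypothetical helper, not an SDK function.

```typescript
// Local stand-in for HttpInvocationConfig, limited to the fields used here.
type HttpMethod = 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE'
type HttpInvocationSketch = {
  url: string
  method?: HttpMethod
  timeout_ms?: number
  headers?: Record<string, string>
}

// Hypothetical helper: validates a config and fills in the documented POST default.
function normalizeHttpInvocation(
  config: HttpInvocationSketch,
): HttpInvocationSketch & { method: HttpMethod } {
  const parsed = new URL(config.url) // throws a TypeError on a malformed URL
  if (parsed.protocol !== 'https:' && parsed.protocol !== 'http:') {
    throw new Error(`Unsupported protocol: ${parsed.protocol}`)
  }
  if (config.timeout_ms !== undefined && !(config.timeout_ms > 0)) {
    throw new Error('timeout_ms must be a positive number')
  }
  return { ...config, method: config.method ?? 'POST' }
}

const normalized = normalizeHttpInvocation({
  url: 'https://abc123.lambda-url.us-east-1.on.aws',
  timeout_ms: 30_000,
})
```

Failing fast on a malformed URL or a non-positive timeout surfaces configuration mistakes at startup rather than at the first invocation.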

InitOptions

Configuration options passed to registerWorker.

Name                    Type                             Required  Description
enableMetricsReporting  boolean                          No        Enable worker metrics via OpenTelemetry. Defaults to true.
headers                 Record<string, string>           No        Custom HTTP headers sent during the WebSocket handshake.
invocationTimeoutMs     number                           No        Default timeout for trigger() in milliseconds. Defaults to 30000.
otel                    Omit<OtelConfig, "engineWsUrl">  No        OpenTelemetry configuration. OTel is initialized automatically by default. Set { enabled: false } or env OTEL_ENABLED=false/0/no/off to disable. The engineWsUrl is set automatically from the III address.
reconnectionConfig      Partial<IIIReconnectionConfig>   No        WebSocket reconnection behavior.
workerName              string                           No        Display name for this worker. Defaults to hostname:pid.

RegisterFunctionMessage

Name             Type                          Required  Description
description      string                        No        The description of the function
id               string                        Yes       The path of the function (use :: for namespacing, e.g. external::my_lambda)
invocation       HttpInvocationConfig          No        HTTP invocation config for external HTTP functions (Lambda, Cloudflare Workers, etc.)
message_type     MessageType.RegisterFunction  Yes       -
metadata         Record<string, unknown>       No        -
request_format   RegisterFunctionFormat        No        The request format of the function
response_format  RegisterFunctionFormat        No        The response format of the function
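
Function ids use :: for namespacing. As an illustration only, the hypothetical helper below splits an id into a namespace and a name. It assumes the last :: separates the two parts, which this page does not specify.

```typescript
// Hypothetical helper: splits "namespace::name" at the last "::".
function parseFunctionId(id: string): { namespace: string | null; name: string } {
  const index = id.lastIndexOf('::')
  if (index === -1) return { namespace: null, name: id }
  return { namespace: id.slice(0, index), name: id.slice(index + 2) }
}

const plain = parseFunctionId('greet')
const namespaced = parseFunctionId('my-service::process-batch')
```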

RegisterFunctionOptions

type RegisterFunctionOptions = Omit<RegisterFunctionMessage, "message_type" | "id">

RegisterServiceInput

type RegisterServiceInput = Omit<RegisterServiceMessage, "message_type">

RegisterTriggerInput

type RegisterTriggerInput = Omit<RegisterTriggerMessage, "message_type" | "id">

RegisterTriggerMessage

Name          Type                         Required  Description
config        unknown                      Yes       -
function_id   string                       Yes       -
id            string                       Yes       -
message_type  MessageType.RegisterTrigger  Yes       -
metadata      Record<string, unknown>      No        -
type          string                       Yes       -

RegisterTriggerTypeInput

type RegisterTriggerTypeInput = Omit<RegisterTriggerTypeMessage, "message_type">

RegisterTriggerTypeMessage

Name          Type                             Required  Description
description   string                           Yes       -
id            string                           Yes       -
message_type  MessageType.RegisterTriggerType  Yes       -

RemoteFunctionHandler

Async function handler for a registered function. Receives the invocation payload and returns the result.
type RemoteFunctionHandler<TInput, TOutput> = (data: TInput) => Promise<TOutput>

StreamChannelRef

Serializable reference to one end of a streaming channel. Can be included in invocation payloads to pass channel endpoints between workers.

Name        Type              Required  Description
access_key  string            Yes       Access key for authentication.
channel_id  string            Yes       Unique channel identifier.
direction   "read" | "write"  Yes       Whether this ref is for reading or writing.
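
Since refs travel inside invocation payloads, a receiving function may want to check that a payload field really has this shape before using it. The guard below is a sketch based only on the three fields listed above; StreamChannelRefShape and isStreamChannelRef are local, hypothetical names rather than SDK exports.

```typescript
// Local mirror of the fields documented for StreamChannelRef.
type StreamChannelRefShape = {
  access_key: string
  channel_id: string
  direction: 'read' | 'write'
}

// Type guard: true only if all three documented fields are present and well-typed.
function isStreamChannelRef(value: unknown): value is StreamChannelRefShape {
  if (typeof value !== 'object' || value === null) return false
  const v = value as Record<string, unknown>
  return (
    typeof v.access_key === 'string' &&
    typeof v.channel_id === 'string' &&
    (v.direction === 'read' || v.direction === 'write')
  )
}
```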

Trigger

Handle returned by ISdk.registerTrigger. Use unregister() to remove the trigger from the engine.

Name        Type        Required  Description
unregister  () => void  Yes       Removes this trigger from the engine.

TriggerHandler

Handler interface for custom trigger types. Passed to ISdk.registerTriggerType. Both members are async methods; the Type column shows what each returns.

Name               Type           Required  Description
registerTrigger    Promise<void>  Yes       Called when a trigger instance is registered.
unregisterTrigger  Promise<void>  Yes       Called when a trigger instance is unregistered.

TriggerRequest

Request object passed to ISdk.trigger.

Name         Type           Required  Description
action       TriggerAction  No        Routing action. Omit for synchronous request/response.
function_id  string         Yes       ID of the function to invoke.
payload      TInput         Yes       Payload to pass to the function.
timeoutMs    number         No        Override the default invocation timeout in milliseconds.

TriggerTypeRef

Typed handle returned by ISdk.registerTriggerType. Provides convenience methods to register triggers and functions scoped to this trigger type, so callers don’t need to repeat the type field. For the three methods, the Type column shows the return type.

Name              Type         Required  Description
id                string       Yes       The trigger type identifier.
registerFunction  FunctionRef  Yes       Register a function and immediately bind it to this trigger type.
registerTrigger   Trigger      Yes       Register a trigger bound to this trigger type.
unregister        void         Yes       Unregister this trigger type from the engine.

IStream

Interface for custom stream implementations. Passed to ISdk.createStream to override the engine’s built-in stream storage. All members are async methods; the Type column shows what each returns.

Name        Type                                      Required  Description
delete      Promise<DeleteResult>                     Yes       Delete a stream item.
get         Promise<TData | null>                     Yes       Retrieve a single item by group and item ID.
list        Promise<TData[]>                          Yes       List all items in a group.
listGroups  Promise<string[]>                         Yes       List all group IDs in a stream.
set         Promise<StreamSetResult<TData> | null>    Yes       Set (create or overwrite) a stream item.
update      Promise<StreamUpdateResult<TData> | null> Yes       Apply atomic update operations to a stream item.

DeleteResult

Result of a stream delete operation.

Name       Type  Required  Description
old_value  any   No        Previous value (if it existed).

StreamSetResult

Result of a stream set operation.

Name       Type   Required  Description
new_value  TData  Yes       New value that was stored.
old_value  TData  No        Previous value (if it existed).

StreamUpdateResult

Result of a stream update operation.

Name       Type   Required  Description
new_value  TData  Yes       New value after the update.
old_value  TData  No        Previous value (if it existed).

DeleteResult

Result of a state delete operation.

Name       Type  Required  Description
old_value  any   No        Previous value (if it existed).

IIIReconnectionConfig

Configuration for WebSocket reconnection behavior.

Name               Type    Required  Description
backoffMultiplier  number  Yes       Exponential backoff multiplier (default: 2)
initialDelayMs     number  Yes       Starting delay in milliseconds (default: 1000)
jitterFactor       number  Yes       Random jitter factor, 0 to 1 (default: 0.3)
maxDelayMs         number  Yes       Maximum delay cap in milliseconds (default: 30000)
maxRetries         number  Yes       Maximum retry attempts, -1 for infinite (default: -1)
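
The way these fields combine into an actual delay is not spelled out here. One common interpretation of exponential backoff with jitter is sketched below: the base delay grows by backoffMultiplier per attempt, is capped at maxDelayMs, and is then spread by up to plus or minus jitterFactor. This formula is an assumption for illustration and may differ from what the SDK does; reconnectDelayMs is a hypothetical helper.

```typescript
type ReconnectionSketch = {
  backoffMultiplier: number
  initialDelayMs: number
  jitterFactor: number
  maxDelayMs: number
  maxRetries: number
}

// Defaults as listed in the table above.
const RECONNECT_DEFAULTS: ReconnectionSketch = {
  backoffMultiplier: 2,
  initialDelayMs: 1000,
  jitterFactor: 0.3,
  maxDelayMs: 30000,
  maxRetries: -1,
}

// Returns the delay before retry number `attempt` (0-based), or null when retries are exhausted.
// `random` is injectable so the result can be made deterministic.
function reconnectDelayMs(
  attempt: number,
  overrides: Partial<ReconnectionSketch> = {},
  random: () => number = Math.random,
): number | null {
  const cfg = { ...RECONNECT_DEFAULTS, ...overrides }
  if (cfg.maxRetries !== -1 && attempt >= cfg.maxRetries) return null
  const base = Math.min(cfg.initialDelayMs * Math.pow(cfg.backoffMultiplier, attempt), cfg.maxDelayMs)
  // Assumed symmetric jitter: shift the delay by up to jitterFactor of the base in either direction.
  const jitter = base * cfg.jitterFactor * (random() * 2 - 1)
  return Math.max(0, Math.round(base + jitter))
}
```

With the defaults and no jitter, the base delays under this interpretation would run 1000, 2000, 4000, and so on until they reach the 30000 ms cap.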

OtelConfig

Configuration for OpenTelemetry initialization.

Name                         Type                                      Required  Description
enabled                      boolean                                   No        Whether OpenTelemetry export is enabled. Defaults to true. Set to false or OTEL_ENABLED=false/0/no/off to disable.
engineWsUrl                  string                                    No        III Engine WebSocket URL. Defaults to III_URL or "ws://localhost:49134".
fetchInstrumentationEnabled  boolean                                   No        Whether to auto-instrument globalThis.fetch calls. Defaults to true. Works on Node.js, Bun, and Deno. Set to false to disable.
instrumentations             Instrumentation<InstrumentationConfig>[]  No        OpenTelemetry instrumentations to register (e.g., PrismaInstrumentation).
logsBatchSize                number                                    No        Maximum number of log records exported per batch. Defaults to 1.
logsFlushIntervalMs          number                                    No        Log processor flush delay in milliseconds. Defaults to 100.
metricsEnabled               boolean                                   No        Whether OpenTelemetry metrics export is enabled. Defaults to true. Set to false or OTEL_METRICS_ENABLED=false/0/no/off to disable.
metricsExportIntervalMs      number                                    No        Metrics export interval in milliseconds. Defaults to 60000 (60 seconds).
reconnectionConfig           Partial<ReconnectionConfig>               No        Optional reconnection configuration for the WebSocket connection.
serviceInstanceId            string                                    No        The service instance ID to report. Defaults to SERVICE_INSTANCE_ID env var or auto-generated UUID.
serviceName                  string                                    No        The service name to report. Defaults to OTEL_SERVICE_NAME or "iii-node".
serviceNamespace             string                                    No        The service namespace to report. Defaults to SERVICE_NAMESPACE env var.
serviceVersion               string                                    No        The service version to report. Defaults to SERVICE_VERSION env var or "unknown".
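
The enabled and metricsEnabled flags can each be turned off by a config value or by an environment variable set to false, 0, no, or off. The sketch below shows one way such a check might be written. It assumes case-insensitive matching and that either source can disable the flag; neither detail is confirmed by this page, and isFlagEnabled is a hypothetical helper.

```typescript
// Values the documentation lists as disabling a flag.
const DISABLED_VALUES = ['false', '0', 'no', 'off']

// Returns false if the config value is false or the env value is one of the disabling strings.
// Assumption: matching ignores case and surrounding whitespace.
function isFlagEnabled(envValue: string | undefined, configValue?: boolean): boolean {
  if (configValue === false) return false
  if (envValue === undefined) return true
  return DISABLED_VALUES.indexOf(envValue.trim().toLowerCase()) === -1
}
```

In a worker, envValue would typically be read from process.env.OTEL_ENABLED or process.env.OTEL_METRICS_ENABLED.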

ReconnectionConfig

Configuration for WebSocket reconnection behavior.

Name               Type    Required  Description
backoffMultiplier  number  Yes       Exponential backoff multiplier (default: 2)
initialDelayMs     number  Yes       Starting delay in milliseconds (default: 1000)
jitterFactor       number  Yes       Random jitter factor, 0 to 1 (default: 0.3)
maxDelayMs         number  Yes       Maximum delay cap in milliseconds (default: 30000)
maxRetries         number  Yes       Maximum retry attempts, -1 for infinite (default: -1)

RegisterFunctionFormat

Name         Type                                                                               Required  Description
description  string                                                                             No        The description of the parameter
items        unknown                                                                            No        The items of the parameter (for arrays)
name         string                                                                             No        The name of the parameter
properties   Record<string, unknown>                                                            No        The body of the parameter (for objects)
required     string[]                                                                           No        Whether the parameter is required
type         "string" | "number" | "boolean" | "object" | "array" | "null" | "map" | "integer"  No        The type of the parameter
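
As an illustration, a request format for the greet function shown earlier might look like the sketch below. It assumes that nested entries in properties follow the same shape and that required lists property names, which the string[] type suggests but this page does not state outright. FunctionFormatSketch and missingRequired are local, hypothetical names.

```typescript
type FormatType = 'string' | 'number' | 'boolean' | 'object' | 'array' | 'null' | 'map' | 'integer'

// Local mirror of the fields documented for RegisterFunctionFormat.
type FunctionFormatSketch = {
  name?: string
  description?: string
  type?: FormatType
  properties?: Record<string, unknown>
  items?: unknown
  required?: string[]
}

// Describes a payload of the form { name: string }.
const greetRequestFormat: FunctionFormatSketch = {
  name: 'GreetRequest',
  description: 'Input for the greet function',
  type: 'object',
  properties: {
    name: { type: 'string', description: 'Name to greet' },
  },
  required: ['name'],
}

// Hypothetical check: which required properties are absent from a payload?
function missingRequired(format: FunctionFormatSketch, payload: Record<string, unknown>): string[] {
  return (format.required ?? []).filter((key) => !(key in payload))
}
```

A format like this would be passed as request_format in the options argument of registerFunction.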