Example 01
Consensus Evaluation Pipeline
/**
* Consensus Evaluation Pipeline
*
* Runs VISION_CALL_COUNT parallel vision model calls against an item image,
* then passes the quorum of successful valuations to a judge model that produces
* a consensus value and confidence score.
*
* ─── Idempotency / duplicate-execution contract ──────────────────────────────
*
* This pipeline is intentionally stateless. It does NOT implement an atomic
* execution claim or persisted workflow state. Duplicate-execution safety is
* the caller's responsibility:
* - The caller must ensure at most one worker runs runConsensusEvaluation
* per itemId at a time (for example via an upstream job claim or lock).
* - itemId is threaded as a correlation key, not an idempotency gate.
* - If crash recovery or replay semantics are required, wrap this pipeline
* in a durable workflow with its own state machine.
*
* ─── Design decisions ────────────────────────────────────────────────────────
*
* Quorum:
* Minimum successful vision calls required to proceed to judge (default: 2/3).
* Retryability of QUORUM_NOT_MET is derived from failure composition:
* - Majority transient failures → retry_with_backoff
* - Majority permanent/invalid output → no_retry
* - Mixed / ambiguous → dlq_only
*
* Partial-failure model:
* Each vision call returns a typed VisionOutcome (success, failure, or cancelled).
* Cancelled outcomes (intentional early-cancel after quorum) are NOT counted as
* failures and do NOT contribute to degraded status or failureSummary.
* Invalid model output (INVALID_OUTPUT) is a permanent contract failure and is
* never retried.
* Transient failures (TIMEOUT, 5xx, RATE_LIMITED) are retried up to
* MAX_VISION_RETRIES.
*
* Early cancel:
* If options.earlyCancel = true, remaining in-flight vision calls are aborted
* once quorum is met, via a composed AbortSignal (timeout + earlyCancel merged).
* Calls aborted by peer quorum completion return
* { ok: false, cancelled: true } and are excluded from failure accounting.
* Default: false — all calls run to completion for richer consensus signal.
* Tradeoff: earlyCancel reduces cost and latency but reduces consensus signal.
*
* Model diversity:
* options.visionModelIds allows specifying a model per call slot.
* Length must equal VISION_CALL_COUNT if provided.
* Duplicates are allowed. If true model diversity is desired, the caller must
* provide distinct model IDs explicitly.
*
* Consensus input determinism:
* Valuations are sorted by callIndex before passing to the judge so that judge
* input is stable regardless of arrival order.
* The judge receives anonymous ordered { value, confidence } pairs — not callIndex.
*
* Judge rationale:
* JudgeResult.rationale is a structured justification summary, not raw
* chain-of-thought.
*
* ─── Service contracts ───────────────────────────────────────────────────────
*
* visionModelService.call(input, options):
* - Returns RawVisionOutput: { value: number, confidence: number, modelId: string }
* - Throws ModelServiceError (.statusCode, .code) on service-layer failure.
* - Throws DOMException("...", "AbortError") on AbortSignal cancellation.
* - Must normalize AbortError across runtimes (Node fetch, undici, browser).
*
* judgeModelService.call(input, options):
* - Returns RawJudgeOutput: { value: number, confidence: number,
* rationale: string, modelId: string }
* - Same error and abort contract as visionModelService.
*
* ─── Metric label notes ──────────────────────────────────────────────────────
*
* All label values are strings because the metrics backend requires string labels.
* Numeric and boolean values are stringified at label construction sites.
* successCount and required are emitted to logs only, not as metric labels, to
* avoid unnecessary cardinality.
*
* ─── Metrics emitted ─────────────────────────────────────────────────────────
*
* Counters:
* consensus.vision_call.success { itemId, callIndex, modelId, environment }
* consensus.vision_call.failure { itemId, callIndex, modelId, errorCode, attempt, environment }
* consensus.vision_call.timeout { itemId, callIndex, environment }
* consensus.vision_call.cancelled { itemId, callIndex, environment }
* consensus.quorum.failed { itemId, retryStrategy, environment }
* consensus.judge.success { itemId, modelId, environment }
* consensus.judge.failure { itemId, modelId, errorCode, attempt, environment }
* consensus.pipeline.success { itemId, degraded, environment }
* consensus.pipeline.failure { itemId, errorCode, environment }
*
* Timings:
* consensus.vision_call.duration_ms { itemId, callIndex, modelId, environment }
* consensus.judge.duration_ms { itemId, modelId, environment }
* consensus.pipeline.duration_ms { itemId, environment }
*/
import { z } from "zod";
import { visionModelService } from "./services/visionModelService";
import { judgeModelService } from "./services/judgeModelService";
import { metrics } from "./lib/metrics";
import { loggerFactory } from "./lib/logger";
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
/** Number of parallel vision calls fanned out per evaluation. */
export const VISION_CALL_COUNT = 3;
/** Default minimum successful vision calls required before the judge runs. */
const DEFAULT_QUORUM = 2;
/** Default per-attempt vision-call timeout; overridable via options.visionTimeoutMs. */
const DEFAULT_VISION_TIMEOUT_MS = 30_000;
/** Default per-attempt judge timeout; overridable via options.judgeTimeoutMs. */
const DEFAULT_JUDGE_TIMEOUT_MS = 20_000;
/** Max retries per transient vision failure. Total attempts = retries + 1. */
const MAX_VISION_RETRIES = 1;
/** Max retries per transient judge failure. Total attempts = retries + 1. */
const MAX_JUDGE_RETRIES = 1;
// ---------------------------------------------------------------------------
// Zod schemas
// ---------------------------------------------------------------------------
/** Validates the caller-supplied image payload before any model calls are made. */
const ImageInputSchema = z.object({
  data: z.string().min(1, "image data must not be empty"),
  mediaType: z.enum(["image/jpeg", "image/png", "image/webp"]),
});
/**
 * Validates and defaults pipeline options. The trailing `.default({})` lets
 * callers omit the options object entirely and still get all defaults.
 */
const RunConsensusOptionsSchema = z
  .object({
    // Minimum successful vision calls required to proceed to the judge.
    quorum: z.number().int().min(1).max(VISION_CALL_COUNT).default(DEFAULT_QUORUM),
    visionTimeoutMs: z
      .number()
      .int()
      .min(1_000)
      .max(120_000)
      .default(DEFAULT_VISION_TIMEOUT_MS),
    judgeTimeoutMs: z
      .number()
      .int()
      .min(1_000)
      .max(120_000)
      .default(DEFAULT_JUDGE_TIMEOUT_MS),
    // When true, in-flight vision calls are aborted once quorum is met
    // (see "Early cancel" tradeoffs in the file header).
    earlyCancel: z.boolean().default(false),
    // Optional per-slot model overrides; length must match the fan-out width.
    visionModelIds: z
      .array(z.string().min(1))
      .length(VISION_CALL_COUNT, `visionModelIds must have exactly ${VISION_CALL_COUNT} entries`)
      .optional(),
  })
  .default({});
/** Shape contract for raw vision output; violations classify as INVALID_OUTPUT. */
const RawVisionOutputSchema = z.object({
  value: z.number().finite("vision value must be finite"),
  confidence: z.number().min(0).max(1, "vision confidence must be [0, 1]"),
  modelId: z.string().min(1),
});
/** Shape contract for raw judge output; violations classify as JUDGE_INVALID_OUTPUT. */
const RawJudgeOutputSchema = z.object({
  value: z.number().finite("judge value must be finite"),
  confidence: z.number().min(0).max(1, "judge confidence must be [0, 1]"),
  rationale: z.string().min(1, "judge rationale must not be empty"),
  modelId: z.string().min(1),
});
// Judge input is anonymized: ordered { value, confidence } pairs only — no
// callIndex or modelId leaks to the judge (see "Consensus input determinism").
const JudgeInputEntrySchema = z.object({
  value: z.number().finite(),
  confidence: z.number().min(0).max(1),
});
const JudgeInputSchema = z.array(JudgeInputEntrySchema).min(1);
// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------
export type ImageInput = z.infer<typeof ImageInputSchema>;
export type RunConsensusOptions = z.infer<typeof RunConsensusOptionsSchema>;
/** One successful vision-call valuation. */
export interface VisionResult {
  // 0-based fan-out slot; also the deterministic sort key for judge input.
  callIndex: number;
  value: number;
  // Model self-reported confidence in [0, 1] (schema-enforced).
  confidence: number;
  // Model ID as reported by the service response, not the requested override.
  modelId: string;
  durationMs: number;
}
/** Consensus produced by the judge model over the quorum of valuations. */
export interface JudgeResult {
  consensusValue: number;
  confidence: number;
  // Structured justification summary, not raw chain-of-thought (see header).
  rationale: string;
  modelId: string;
}
/** Final pipeline output returned to the caller. */
export interface ConsensusResult {
  itemId: string;
  valuations: VisionResult[];
  judge: JudgeResult;
  /**
   * True if at least one vision call genuinely failed while quorum was still
   * met (quorum failure throws instead). Cancelled calls do not contribute
   * to degraded status.
   */
  degraded: boolean;
  durationMs: number;
}
// ---------------------------------------------------------------------------
// Error types
// ---------------------------------------------------------------------------
/** Per-call vision failure classification (assigned by classifyVisionError). */
export type VisionErrorCode =
  | "TIMEOUT"
  | "RATE_LIMITED"
  | "SERVER_ERROR"
  | "CLIENT_ERROR"
  | "INVALID_OUTPUT";
/** Judge failure classification; INVALID_INPUT covers malformed judge input. */
export type JudgeErrorCode =
  | "TIMEOUT"
  | "RATE_LIMITED"
  | "SERVER_ERROR"
  | "CLIENT_ERROR"
  | "INVALID_OUTPUT"
  | "INVALID_INPUT";
/** Top-level pipeline failure codes carried on ConsensusError. */
export type ConsensusErrorCode =
  | "VALIDATION_FAILED"
  | "QUORUM_NOT_MET"
  | "VISION_ALL_FAILED"
  | "JUDGE_FAILED"
  | "JUDGE_INVALID_OUTPUT"
  | "JUDGE_INVALID_INPUT";
/** Caller guidance attached to every ConsensusError (see header "Quorum"). */
export type RetryStrategy = "no_retry" | "retry_with_backoff" | "dlq_only";
/** Composition of genuine vision failures; drives retry-strategy derivation. */
export interface VisionFailureSummary {
  // TIMEOUT / RATE_LIMITED / SERVER_ERROR
  transientCount: number;
  // CLIENT_ERROR (4xx other than 429)
  clientErrorCount: number;
  // Permanent contract violations — never retried.
  invalidOutputCount: number;
}
/**
 * Typed pipeline error. Carries the correlation itemId, a machine-readable
 * errorCode, and the RetryStrategy callers should apply. Optional extras
 * attach the vision failure composition and/or the underlying judge error
 * code; the underlying cause (if any) travels via standard ErrorOptions.
 */
export class ConsensusError extends Error {
  public readonly errorCode: ConsensusErrorCode;
  public readonly itemId: string;
  public readonly retryStrategy: RetryStrategy;
  // Present on VISION_ALL_FAILED / QUORUM_NOT_MET errors.
  public readonly failureSummary?: VisionFailureSummary;
  // Present on judge-stage errors.
  public readonly judgeErrorCode?: JudgeErrorCode;
  constructor(
    message: string,
    errorCode: ConsensusErrorCode,
    itemId: string,
    retryStrategy: RetryStrategy,
    extras: { failureSummary?: VisionFailureSummary; judgeErrorCode?: JudgeErrorCode } = {},
    options?: ErrorOptions,
  ) {
    super(message, options);
    // Restore the prototype chain so `instanceof ConsensusError` survives
    // transpilation to pre-ES2015 class targets.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = "ConsensusError";
    this.errorCode = errorCode;
    this.itemId = itemId;
    this.retryStrategy = retryStrategy;
    this.failureSummary = extras.failureSummary;
    this.judgeErrorCode = extras.judgeErrorCode;
  }
}
// ---------------------------------------------------------------------------
// Internal types
// ---------------------------------------------------------------------------
/** Successful vision call carrying its parsed valuation. */
type VisionCallSuccess = { ok: true; result: VisionResult };
/** Genuine failure after retries; counted toward quorum/degraded accounting. */
type VisionCallFailure = {
  ok: false;
  cancelled: false;
  callIndex: number;
  errorCode: VisionErrorCode;
  error: ErrorMeta;
};
/** Intentional early-cancel after quorum; excluded from failure accounting. */
type VisionCallCancelled = {
  ok: false;
  cancelled: true;
  callIndex: number;
};
// Discriminated union: narrow on `ok` first, then on `cancelled`.
type VisionOutcome = VisionCallSuccess | VisionCallFailure | VisionCallCancelled;
type RawVisionOutput = z.infer<typeof RawVisionOutputSchema>;
type RawJudgeOutput = z.infer<typeof RawJudgeOutputSchema>;
/**
 * Shape of errors thrown by the model services (see "Service contracts" in
 * the header); statusCode/code drive transient-vs-permanent classification.
 */
interface ModelServiceError extends Error {
  statusCode?: number;
  code?: string;
}
// ---------------------------------------------------------------------------
// Typed metric labels
// ---------------------------------------------------------------------------
// All metric label values are strings because the metrics backend requires
// string labels (see "Metric label notes" in the header). Numbers/booleans
// are stringified at the construction site.
interface VisionMetricLabels {
  itemId: string;
  callIndex: string;
  modelId: string;
  environment: string;
  errorCode?: VisionErrorCode;
  attempt?: string;
}
interface JudgeMetricLabels {
  itemId: string;
  modelId: string;
  environment: string;
  errorCode?: JudgeErrorCode;
  attempt?: string;
}
interface PipelineMetricLabels {
  itemId: string;
  environment: string;
  errorCode?: ConsensusErrorCode;
  degraded?: string;
  retryStrategy?: RetryStrategy;
}
/** Minimal structured-logging surface this pipeline depends on. */
interface StructuredLogger {
  info(msg: string, meta?: Record<string, unknown>): void;
  warn(msg: string, meta?: Record<string, unknown>): void;
  error(msg: string, meta?: Record<string, unknown>): void;
}
// ---------------------------------------------------------------------------
// Core pipeline
// ---------------------------------------------------------------------------
export async function runConsensusEvaluation(
itemId: string,
image: ImageInput,
options?: Partial<RunConsensusOptions>,
): Promise<ConsensusResult> {
if (!itemId?.trim()) {
throw new ConsensusError("Missing itemId", "VALIDATION_FAILED", itemId ?? "UNKNOWN", "no_retry");
}
const imageResult = ImageInputSchema.safeParse(image);
if (!imageResult.success) {
throw new ConsensusError(
"Invalid image input",
"VALIDATION_FAILED",
itemId,
"no_retry",
{},
{ cause: new Error(imageResult.error.message) },
);
}
const optResult = RunConsensusOptionsSchema.safeParse(options ?? {});
if (!optResult.success) {
throw new ConsensusError(
"Invalid options",
"VALIDATION_FAILED",
itemId,
"no_retry",
{},
{ cause: new Error(optResult.error.message) },
);
}
const { quorum, visionTimeoutMs, judgeTimeoutMs, earlyCancel, visionModelIds } = optResult.data;
const env = resolveEnvironment();
const pipeLabels: PipelineMetricLabels = { itemId, environment: env };
const logger: StructuredLogger = loggerFactory.create({ itemId, environment: env });
const pipeStart = performanceNow();
logger.info("Consensus pipeline started", {
evt: "PIPELINE_STARTED",
quorum,
visionCallCount: VISION_CALL_COUNT,
visionTimeoutMs,
judgeTimeoutMs,
earlyCancel,
visionModelIds: visionModelIds ?? "default",
});
let valuations: VisionResult[];
let failedCount: number;
try {
({ valuations, failedCount } = await runVisionCalls({
itemId,
image: imageResult.data,
quorum,
visionTimeoutMs,
earlyCancel,
visionModelIds,
env,
logger,
}));
} catch (err) {
if (err instanceof ConsensusError) {
const durationMs = performanceNow() - pipeStart;
metrics.increment("consensus.pipeline.failure", { ...pipeLabels, errorCode: err.errorCode });
metrics.timing("consensus.pipeline.duration_ms", durationMs, pipeLabels);
logger.error("Pipeline failed at vision stage", {
evt: "PIPELINE_FAILED",
errorCode: err.errorCode,
failureSummary: err.failureSummary,
durationMs,
});
throw err;
}
throw err;
}
const degraded = failedCount > 0;
if (degraded) {
logger.warn("Degraded run — quorum met but some vision calls genuinely failed", {
evt: "PIPELINE_DEGRADED",
successCount: valuations.length,
failedCount,
});
}
let judgeResult: JudgeResult;
try {
judgeResult = await runJudge({ itemId, valuations, judgeTimeoutMs, env, logger });
} catch (err) {
if (err instanceof ConsensusError) {
const durationMs = performanceNow() - pipeStart;
metrics.increment("consensus.pipeline.failure", { ...pipeLabels, errorCode: err.errorCode });
metrics.timing("consensus.pipeline.duration_ms", durationMs, pipeLabels);
logger.error("Pipeline failed at judge stage", {
evt: "PIPELINE_FAILED",
errorCode: err.errorCode,
judgeErrorCode: err.judgeErrorCode,
durationMs,
});
throw err;
}
throw err;
}
const durationMs = performanceNow() - pipeStart;
metrics.increment("consensus.pipeline.success", { ...pipeLabels, degraded: String(degraded) });
metrics.timing("consensus.pipeline.duration_ms", durationMs, pipeLabels);
logger.info("Consensus pipeline completed", {
evt: "PIPELINE_COMPLETED",
consensusValue: judgeResult.consensusValue,
confidence: judgeResult.confidence,
degraded,
durationMs,
});
return {
itemId,
valuations,
judge: judgeResult,
degraded,
durationMs,
};
}
// ---------------------------------------------------------------------------
// Vision calls
// ---------------------------------------------------------------------------
async function runVisionCalls({
itemId,
image,
quorum,
visionTimeoutMs,
earlyCancel,
visionModelIds,
env,
logger,
}: {
itemId: string;
image: ImageInput;
quorum: number;
visionTimeoutMs: number;
earlyCancel: boolean;
visionModelIds?: string[];
env: string;
logger: StructuredLogger;
}): Promise<{ valuations: VisionResult[]; failedCount: number }> {
const earlyAbortController = new AbortController();
// This counter is only used to decide when to fire the shared early-cancel
// signal. Final truth comes from the settled outcomes array below.
let successCountForAbort = 0;
const outcomes = await Promise.all(
Array.from({ length: VISION_CALL_COUNT }, (_, callIndex) =>
runSingleVisionCall({
itemId,
image,
callIndex,
modelId: visionModelIds?.[callIndex],
visionTimeoutMs,
earlyCancel,
earlyAbortController,
env,
logger,
onSuccess: () => {
successCountForAbort += 1;
if (earlyCancel && successCountForAbort === quorum) {
earlyAbortController.abort(
new Error(`Quorum of ${quorum} met — aborting remaining vision calls`),
);
}
},
}),
),
);
const successes = outcomes.filter((o): o is VisionCallSuccess => o.ok);
const failures = outcomes.filter((o): o is VisionCallFailure => !o.ok && !o.cancelled);
// Cancelled outcomes are intentionally excluded from failure accounting.
const failureSummary: VisionFailureSummary = {
transientCount: failures.filter((f) => isTransientVisionError(f.errorCode)).length,
clientErrorCount: failures.filter((f) => f.errorCode === "CLIENT_ERROR").length,
invalidOutputCount: failures.filter((f) => f.errorCode === "INVALID_OUTPUT").length,
};
if (successes.length === 0) {
const retryStrategy = deriveVisionRetryStrategy(failureSummary, failures.length);
logger.error("All vision calls failed", {
evt: "ALL_VISION_CALLS_FAILED",
failureSummary,
failures: failures.map((f) => ({
callIndex: f.callIndex,
errorCode: f.errorCode,
error: f.error,
})),
});
throw new ConsensusError("All vision calls failed", "VISION_ALL_FAILED", itemId, retryStrategy, {
failureSummary,
});
}
if (successes.length < quorum) {
const retryStrategy = deriveVisionRetryStrategy(failureSummary, failures.length);
metrics.increment("consensus.quorum.failed", {
itemId,
environment: env,
retryStrategy,
});
logger.error("Quorum not met", {
evt: "QUORUM_NOT_MET",
successCount: successes.length,
required: quorum,
retryStrategy,
failureSummary,
failures: failures.map((f) => ({
callIndex: f.callIndex,
errorCode: f.errorCode,
})),
});
throw new ConsensusError(
`Quorum not met: ${successes.length}/${quorum} required vision calls succeeded`,
"QUORUM_NOT_MET",
itemId,
retryStrategy,
{ failureSummary },
);
}
return {
valuations: successes.map((s) => s.result),
failedCount: failures.length,
};
}
/**
 * Executes one vision call slot with a per-attempt timeout, transient-failure
 * retry (up to MAX_VISION_RETRIES), and early-cancel awareness. Never throws:
 * always resolves to a typed VisionOutcome (success, failure, or cancelled).
 */
async function runSingleVisionCall({
  itemId,
  image,
  callIndex,
  modelId,
  visionTimeoutMs,
  earlyCancel,
  earlyAbortController,
  env,
  logger,
  onSuccess,
}: {
  itemId: string;
  image: ImageInput;
  // 0-based fan-out slot; used for metrics, logs, and judge-input ordering.
  callIndex: number;
  // Optional per-slot model override; service default is used when absent.
  modelId?: string;
  visionTimeoutMs: number;
  earlyCancel: boolean;
  // Shared controller aborted by a peer once quorum is met (earlyCancel only).
  earlyAbortController: AbortController;
  env: string;
  logger: StructuredLogger;
  // Invoked once per successful call; drives the caller's quorum counter.
  onSuccess: () => void;
}): Promise<VisionOutcome> {
  // Peer quorum may already have fired before this call even starts.
  if (earlyCancel && earlyAbortController.signal.aborted) {
    metrics.increment("consensus.vision_call.cancelled", {
      itemId,
      callIndex: String(callIndex),
      environment: env,
    });
    return { ok: false, cancelled: true, callIndex };
  }
  let lastErrorCode: VisionErrorCode = "SERVER_ERROR";
  let lastError: unknown;
  for (let attempt = 0; attempt <= MAX_VISION_RETRIES; attempt++) {
    // Fresh controller per attempt so a previous attempt's timeout cannot
    // abort a retry.
    const callController = new AbortController();
    const timeoutHandle = setTimeout(() => {
      callController.abort(new Error(`Vision call ${callIndex} timed out after ${visionTimeoutMs}ms`));
    }, visionTimeoutMs);
    // Merge the per-attempt timeout with the shared early-cancel signal only
    // when early cancellation is enabled.
    const composedSignal = earlyCancel
      ? mergeSignals(callController.signal, earlyAbortController.signal)
      : callController.signal;
    const callStart = performanceNow();
    const visionLabels: VisionMetricLabels = {
      itemId,
      callIndex: String(callIndex),
      modelId: modelId ?? "default",
      environment: env,
    };
    try {
      const raw = (await visionModelService.call(
        { image, modelId, requestId: `${itemId}_v${callIndex}_a${attempt}` },
        { signal: composedSignal },
      )) as RawVisionOutput;
      clearTimeout(timeoutHandle);
      const durationMs = performanceNow() - callStart;
      // If peer quorum abort fired while the provider was finishing, we intentionally
      // discard the late result and treat the call as cancelled for deterministic
      // early-cancel behavior.
      if (earlyCancel && earlyAbortController.signal.aborted) {
        metrics.increment("consensus.vision_call.cancelled", {
          itemId,
          callIndex: String(callIndex),
          environment: env,
        });
        return { ok: false, cancelled: true, callIndex };
      }
      const parsed = RawVisionOutputSchema.safeParse(raw);
      if (!parsed.success) {
        // Contract violation by the model service — permanent, never retried,
        // so we break straight out of the retry loop.
        lastErrorCode = "INVALID_OUTPUT";
        lastError = new Error(`Vision call ${callIndex} invalid output: ${parsed.error.message}`);
        metrics.increment("consensus.vision_call.failure", {
          ...visionLabels,
          errorCode: "INVALID_OUTPUT",
          attempt: String(attempt),
        });
        logger.error("Vision call returned invalid output — permanent failure, not retrying", {
          evt: "VISION_CALL_INVALID_OUTPUT",
          callIndex,
          attempt,
          durationMs,
          issues: parsed.error.issues.map((i) => ({
            path: i.path.join("."),
            message: i.message,
          })),
        });
        break;
      }
      const result: VisionResult = {
        callIndex,
        value: parsed.data.value,
        confidence: parsed.data.confidence,
        modelId: parsed.data.modelId,
        durationMs,
      };
      // Success metrics report the model ID the service actually used, which
      // may differ from the requested override.
      const successLabels: VisionMetricLabels = {
        ...visionLabels,
        modelId: parsed.data.modelId,
      };
      metrics.increment("consensus.vision_call.success", successLabels);
      metrics.timing("consensus.vision_call.duration_ms", durationMs, successLabels);
      logger.info("Vision call succeeded", {
        evt: "VISION_CALL_SUCCEEDED",
        callIndex,
        attempt,
        modelId: parsed.data.modelId,
        value: result.value,
        confidence: result.confidence,
        durationMs,
      });
      onSuccess();
      return { ok: true, result };
    } catch (err) {
      clearTimeout(timeoutHandle);
      // An AbortError caused by the shared early-cancel signal is a cancel,
      // not a failure — distinguish it from a per-attempt timeout abort.
      if (earlyCancel && err instanceof Error && err.name === "AbortError" && earlyAbortController.signal.aborted) {
        metrics.increment("consensus.vision_call.cancelled", {
          itemId,
          callIndex: String(callIndex),
          environment: env,
        });
        return { ok: false, cancelled: true, callIndex };
      }
      lastError = err;
      lastErrorCode = classifyVisionError(err);
      const durationMs = performanceNow() - callStart;
      if (lastErrorCode === "TIMEOUT") {
        metrics.increment("consensus.vision_call.timeout", {
          itemId,
          callIndex: String(callIndex),
          environment: env,
        });
      }
      metrics.increment("consensus.vision_call.failure", {
        ...visionLabels,
        errorCode: lastErrorCode,
        attempt: String(attempt),
      });
      logger.warn("Vision call failed", {
        evt: "VISION_CALL_FAILED",
        callIndex,
        attempt,
        errorCode: lastErrorCode,
        error: toErrorMeta(err),
        durationMs,
      });
      // Only transient errors are retried; permanent codes exit immediately.
      if (!isTransientVisionError(lastErrorCode)) {
        break;
      }
    }
  }
  // Retries exhausted or permanent failure: report the last classification.
  return {
    ok: false,
    cancelled: false,
    callIndex,
    errorCode: lastErrorCode,
    error: toErrorMeta(lastError),
  };
}
// ---------------------------------------------------------------------------
// Judge call
// ---------------------------------------------------------------------------
/**
 * Sends the quorum of valuations to the judge model and returns the consensus.
 * Input is sorted by callIndex and anonymized to { value, confidence } pairs
 * so judge input is deterministic and carries no slot/model identity.
 * Transient judge failures are retried up to MAX_JUDGE_RETRIES; invalid
 * input/output are permanent and thrown immediately.
 *
 * @throws ConsensusError JUDGE_INVALID_INPUT, JUDGE_INVALID_OUTPUT, or
 *         JUDGE_FAILED (after retries), each with a judgeErrorCode.
 */
async function runJudge({
  itemId,
  valuations,
  judgeTimeoutMs,
  env,
  logger,
}: {
  itemId: string;
  valuations: VisionResult[];
  judgeTimeoutMs: number;
  env: string;
  logger: StructuredLogger;
}): Promise<JudgeResult> {
  // Copy before sorting so the caller's array is not mutated.
  const orderedValuations = [...valuations].sort((a, b) => a.callIndex - b.callIndex);
  const rawJudgeInput = orderedValuations.map((v) => ({
    value: v.value,
    confidence: v.confidence,
  }));
  const inputParsed = JudgeInputSchema.safeParse(rawJudgeInput);
  if (!inputParsed.success) {
    throw new ConsensusError(
      "Malformed valuations sent to judge",
      "JUDGE_INVALID_INPUT",
      itemId,
      "no_retry",
      { judgeErrorCode: "INVALID_INPUT" },
      { cause: new Error(inputParsed.error.message) },
    );
  }
  let lastError: unknown;
  let lastErrorCode: JudgeErrorCode = "SERVER_ERROR";
  for (let attempt = 0; attempt <= MAX_JUDGE_RETRIES; attempt++) {
    // Fresh controller per attempt so a previous timeout cannot abort a retry.
    const controller = new AbortController();
    const timeoutHandle = setTimeout(() => {
      controller.abort(new Error(`Judge call timed out after ${judgeTimeoutMs}ms`));
    }, judgeTimeoutMs);
    const callStart = performanceNow();
    // modelId is unknown until the service responds; success-path labels
    // overwrite it with the reported model ID.
    const judgeLabels: JudgeMetricLabels = {
      itemId,
      modelId: "unknown",
      environment: env,
    };
    try {
      const raw = (await judgeModelService.call(
        { valuations: inputParsed.data, requestId: `${itemId}_judge_a${attempt}` },
        { signal: controller.signal },
      )) as RawJudgeOutput;
      clearTimeout(timeoutHandle);
      const durationMs = performanceNow() - callStart;
      const parsed = RawJudgeOutputSchema.safeParse(raw);
      if (!parsed.success) {
        // Contract violation by the judge service — permanent, never retried.
        metrics.increment("consensus.judge.failure", {
          ...judgeLabels,
          errorCode: "INVALID_OUTPUT",
          attempt: String(attempt),
        });
        logger.error("Judge returned invalid output — permanent failure", {
          evt: "JUDGE_INVALID_OUTPUT",
          attempt,
          durationMs,
          issues: parsed.error.issues.map((i) => ({
            path: i.path.join("."),
            message: i.message,
          })),
        });
        throw new ConsensusError(
          "Judge returned invalid output",
          "JUDGE_INVALID_OUTPUT",
          itemId,
          "no_retry",
          { judgeErrorCode: "INVALID_OUTPUT" },
          { cause: new Error(parsed.error.message) },
        );
      }
      const result: JudgeResult = {
        consensusValue: parsed.data.value,
        confidence: parsed.data.confidence,
        rationale: parsed.data.rationale,
        modelId: parsed.data.modelId,
      };
      const successLabels: JudgeMetricLabels = {
        ...judgeLabels,
        modelId: parsed.data.modelId,
      };
      metrics.increment("consensus.judge.success", successLabels);
      metrics.timing("consensus.judge.duration_ms", durationMs, successLabels);
      logger.info("Judge call succeeded", {
        evt: "JUDGE_CALL_SUCCEEDED",
        attempt,
        modelId: parsed.data.modelId,
        consensusValue: result.consensusValue,
        confidence: result.confidence,
        durationMs,
      });
      return result;
    } catch (err) {
      clearTimeout(timeoutHandle);
      // Permanent ConsensusErrors thrown above must escape the retry loop
      // unchanged — do not reclassify or re-wrap them.
      if (
        err instanceof ConsensusError &&
        (err.errorCode === "JUDGE_INVALID_OUTPUT" || err.errorCode === "JUDGE_INVALID_INPUT")
      ) {
        throw err;
      }
      lastError = err;
      lastErrorCode = classifyJudgeError(err);
      const durationMs = performanceNow() - callStart;
      metrics.increment("consensus.judge.failure", {
        ...judgeLabels,
        errorCode: lastErrorCode,
        attempt: String(attempt),
      });
      logger.warn("Judge call failed", {
        evt: "JUDGE_CALL_FAILED",
        attempt,
        errorCode: lastErrorCode,
        error: toErrorMeta(err),
        durationMs,
      });
      // Only transient errors are retried; permanent codes exit immediately.
      if (!isTransientJudgeError(lastErrorCode)) {
        break;
      }
    }
  }
  // Retries exhausted (or permanent service failure): surface the last
  // classification and its matching retry strategy.
  throw new ConsensusError(
    "Judge call failed after retries",
    "JUDGE_FAILED",
    itemId,
    retryStrategyForJudgeError(lastErrorCode),
    { judgeErrorCode: lastErrorCode },
    { cause: lastError },
  );
}
// ---------------------------------------------------------------------------
// Retry strategy derivation
// ---------------------------------------------------------------------------
/**
 * Derives the caller-facing retry strategy from the composition of genuine
 * vision failures, per the header contract:
 *   - Majority transient failures           → retry_with_backoff
 *   - Majority permanent (client/invalid)   → no_retry
 *   - Mixed / ambiguous (exact tie)         → dlq_only
 *
 * Fix: the previous implementation only returned retry_with_backoff when ALL
 * failures were transient, so a transient majority that included any permanent
 * failure (e.g. 2 transient + 1 permanent) was misrouted to dlq_only,
 * contradicting the documented "majority transient" rule.
 *
 * @param summary       Counts of transient/client-error/invalid-output failures.
 * @param totalFailures Total genuine failures (cancelled calls excluded).
 */
function deriveVisionRetryStrategy(summary: VisionFailureSummary, totalFailures: number): RetryStrategy {
  const permanentTotal = summary.clientErrorCount + summary.invalidOutputCount;
  if (summary.transientCount > totalFailures / 2) {
    return "retry_with_backoff";
  }
  if (permanentTotal > totalFailures / 2) {
    return "no_retry";
  }
  // Exact tie between transient and permanent — ambiguous, park in the DLQ.
  return "dlq_only";
}
/**
 * Maps a judge error code to the retry strategy surfaced on JUDGE_FAILED.
 * Transient service errors are retryable; contract violations are not.
 */
function retryStrategyForJudgeError(code: JudgeErrorCode): RetryStrategy {
  // Record<JudgeErrorCode, …> forces exhaustiveness: adding a new code
  // without a mapping is a compile error.
  const strategies: Record<JudgeErrorCode, RetryStrategy> = {
    TIMEOUT: "retry_with_backoff",
    RATE_LIMITED: "retry_with_backoff",
    SERVER_ERROR: "retry_with_backoff",
    CLIENT_ERROR: "no_retry",
    INVALID_OUTPUT: "no_retry",
    INVALID_INPUT: "no_retry",
  };
  return strategies[code];
}
// ---------------------------------------------------------------------------
// Signal composition
// ---------------------------------------------------------------------------
/**
 * Combines multiple AbortSignals into one that aborts as soon as any input
 * aborts, propagating the first abort reason. Uses native AbortSignal.any
 * where available; otherwise falls back to a manual merge.
 *
 * Fixes over the previous fallback:
 * - An input that is already aborted is returned directly (correct reason,
 *   no controller allocation, no listeners).
 * - All listeners are detached once any input aborts, so nothing lingers on
 *   long-lived signals (e.g. the shared early-cancel signal reused across
 *   every vision-call attempt).
 */
function mergeSignals(...signals: AbortSignal[]): AbortSignal {
  if (typeof AbortSignal.any === "function") {
    return AbortSignal.any(signals);
  }
  // Fast path: reuse an already-aborted input — it carries the right reason.
  const preAborted = signals.find((signal) => signal.aborted);
  if (preAborted !== undefined) {
    return preAborted;
  }
  const merged = new AbortController();
  const handlers: Array<{ signal: AbortSignal; handler: () => void }> = [];
  const detachAll = (): void => {
    for (const { signal, handler } of handlers) {
      signal.removeEventListener("abort", handler);
    }
  };
  for (const signal of signals) {
    const handler = (): void => {
      if (!merged.signal.aborted) {
        merged.abort(signal.reason);
      }
      // First abort wins; drop every remaining listener to avoid leaks.
      detachAll();
    };
    handlers.push({ signal, handler });
    signal.addEventListener("abort", handler, { once: true });
  }
  return merged.signal;
}
// ---------------------------------------------------------------------------
// Error classification
// ---------------------------------------------------------------------------
/**
 * Maps a thrown vision-call error to a typed VisionErrorCode.
 * AbortError → TIMEOUT (the per-attempt timeout abort; early-cancel aborts
 * are filtered out by the caller before classification ever runs).
 */
function classifyVisionError(err: unknown): VisionErrorCode {
  if (err instanceof Error && err.name === "AbortError") {
    return "TIMEOUT";
  }
  const status = (err as ModelServiceError)?.statusCode;
  if (status !== undefined) {
    if (status === 429) return "RATE_LIMITED";
    if (status >= 500) return "SERVER_ERROR";
    if (status >= 400) return "CLIENT_ERROR";
  }
  // Network-level errors (ECONNRESET, ENOTFOUND, …) and anything else
  // unrecognized are treated as transient server-side failures.
  return "SERVER_ERROR";
}
/**
 * Maps a thrown judge-call error to a typed JudgeErrorCode. Mirrors
 * classifyVisionError (INVALID_INPUT/INVALID_OUTPUT are assigned by the
 * caller, never here).
 */
function classifyJudgeError(err: unknown): JudgeErrorCode {
  if (err instanceof Error && err.name === "AbortError") {
    return "TIMEOUT";
  }
  const status = (err as ModelServiceError)?.statusCode;
  if (status !== undefined) {
    if (status === 429) return "RATE_LIMITED";
    if (status >= 500) return "SERVER_ERROR";
    if (status >= 400) return "CLIENT_ERROR";
  }
  // Network-level errors (ECONNRESET, ENOTFOUND, …) and anything else
  // unrecognized are treated as transient server-side failures.
  return "SERVER_ERROR";
}
/** True for vision error codes that are eligible for retry. */
function isTransientVisionError(code: VisionErrorCode): boolean {
  switch (code) {
    case "TIMEOUT":
    case "RATE_LIMITED":
    case "SERVER_ERROR":
      return true;
    default:
      return false;
  }
}
/** True for judge error codes that are eligible for retry. */
function isTransientJudgeError(code: JudgeErrorCode): boolean {
  switch (code) {
    case "TIMEOUT":
    case "RATE_LIMITED":
    case "SERVER_ERROR":
      return true;
    default:
      return false;
  }
}
// ---------------------------------------------------------------------------
// Utilities
// ---------------------------------------------------------------------------
/**
 * Log-safe, structured summary of an arbitrary thrown value. Stack traces are
 * reduced to a hasStack boolean so raw stacks never enter structured logs.
 */
export interface ErrorMeta {
  message: string;
  name: string;
  // ModelServiceError.code when present (e.g. "ECONNRESET").
  code?: string;
  // ModelServiceError.statusCode when present.
  status?: number;
  hasStack: boolean;
}
/**
 * Converts an arbitrary thrown value into a log-safe ErrorMeta. Non-Error
 * values are stringified; Error instances also surface the optional
 * ModelServiceError fields (code, statusCode).
 */
function toErrorMeta(err: unknown): ErrorMeta {
  if (!(err instanceof Error)) {
    return {
      message: String(err),
      name: "UnknownError",
      hasStack: false,
    };
  }
  const serviceErr = err as ModelServiceError;
  return {
    message: serviceErr.message,
    name: serviceErr.name,
    code: serviceErr.code,
    status: serviceErr.statusCode,
    hasStack: Boolean(serviceErr.stack),
  };
}
/** Monotonic-ish clock: performance.now() where available, else Date.now(). */
function performanceNow(): number {
  if (typeof performance === "undefined") {
    return Date.now();
  }
  return performance.now();
}
/** Resolves the metrics environment label: APP_ENV, then NODE_ENV, then "unknown". */
function resolveEnvironment(): string {
  for (const key of ["APP_ENV", "NODE_ENV"] as const) {
    const value = process.env[key];
    if (value !== undefined) {
      return value;
    }
  }
  return "unknown";
}
Example 02
Idempotent Inference Billing Saga
/**
* Idempotent Inference Billing Saga
*
* This is a distributed compensating-transaction workflow, NOT an atomic operation.
* Each step has an explicit compensation. No single transactional boundary spans
* all three services (billing / model / DLQ).
*
* Guarantees:
* - Duplicate execution is prevented via atomic CAS claim, not a read-then-act check
* - Every state transition is a typed CAS (fromState → toState), enforced by the store
* - InvalidTransitionError maps to STATE_CONFLICT, not INFRA_UNAVAILABLE
* - Reservation-failure state transition uses safeTransition, consistent with all others
* - billingService.reserve errors are classified as infra-transient vs billing-business
* - billingPending (success) and billingUnresolved (error) are deliberately distinct
* - All metric labels are fully typed — no `as any` casts
* - Claim TTL is intentionally sized relative to max possible saga wall-clock time
* - Typed, structured error codes — no regex-on-message retry classification
* - Strongly typed, versioned DLQ payloads with type + modelId + environment labels
* - Labeled metrics at every service boundary
*
* ─── Downstream service contracts ───────────────────────────────────────────
*
* billingService.reserve / capture / release:
* - Idempotent on idempotencyKey; returns existing result on replay.
* - Reservation TTL: 10 minutes.
* - capture() rejects if actualUsage.totalTokens > reservation.maxTokens.
* - Pricing is locked at reservation time; the billing store owns version drift.
* - MUST throw BillingRejectedError for business-logic rejections (insufficient
* credits, model not available, etc.) and a generic Error for infra failures.
* This distinction drives dlq_only vs retry_with_backoff on RESERVATION_FAILED.
*
* modelService.call:
* - Throws a DOMException with name === "AbortError" on AbortSignal cancellation.
* - Throws a typed ModelServiceError (.statusCode, .code) for all service-layer failures.
* - Does NOT enforce its own timeout. AbortSignal is the only timeout mechanism.
* - MUST normalise abort errors to DOMException "AbortError" across all runtimes
* (Node fetch, undici, browser, etc.) before throwing.
*
* dlqService.enqueue:
* - At-least-once delivery.
* - Consumers must be idempotent on (type, jobId, schemaVersion).
*
* jobStateStore.claim(jobId, ttlMs):
* - Atomically creates a CLAIMED lease if no record exists for jobId.
* - Throws JobAlreadyClaimedError (typed class) if a live record exists.
* - Lease expires after ttlMs. Expired leases are recoverable by a sweeper.
* - The sweeper owns recovery of jobs abandoned in any intermediate state.
*
* jobStateStore.transition(jobId, from, to):
* - CAS: throws InvalidTransitionError if current state !== from.
* - Throws StoreUnavailableError on infrastructure failures.
* - Only valid state machine paths can execute.
*
* ─── Job state machine ───────────────────────────────────────────────────────
*
* [absent] ──claim()──────────────► CLAIMED
* CLAIMED ──────────────────────► RESERVED
* CLAIMED ──(reserve failure)───► RESERVATION_FAILED (sweeper may retry)
* RESERVED ──────────────────────► INFERENCE_RUNNING
* INFERENCE_RUNNING ─────────────────────► INFERENCE_SUCCEEDED
* INFERENCE_RUNNING ─(clean model fail)──► RELEASED
* INFERENCE_RUNNING ─(release failed)────► RECONCILIATION_REQUIRED
* INFERENCE_SUCCEEDED ───────────────────► CAPTURED
* INFERENCE_SUCCEEDED ─(capture failed)──► RECONCILIATION_REQUIRED
*
* ─── Claim TTL sizing ────────────────────────────────────────────────────────
*
* CLAIM_LEASE_TTL_MS (15 min) is intentionally larger than the worst-case saga
* wall-clock time:
* MAX_TIMEOUT_MS (2 min) + billing reserve + capture latency (est. ~1 min combined)
* + retry jitter headroom (est. ~2 min) = ~5 min typical worst case.
* 15 min gives 3× headroom before a sweeper treats a CLAIMED record as abandoned.
* Revisit if model timeout ceiling or billing SLA changes significantly.
*
* ─── Recovery model ──────────────────────────────────────────────────────────
*
* CLAIMED + expired lease → sweeper resets to absent; job is retryable.
* RESERVATION_FAILED → sweeper may retry or escalate after N attempts.
* RECONCILIATION_REQUIRED → DLQ worker retries release/capture; marks RELEASED
* or CAPTURED on success; escalates to ops on failure.
* INFERENCE_RUNNING + stale → sweeper verifies model idempotency then transitions.
* INFERENCE_SUCCEEDED + stale → sweeper retries capture before reservation expiry.
*
* ─── billingPending vs billingUnresolved ─────────────────────────────────────
*
* These are deliberately asymmetric and serve different callers:
* billingPending (on InferenceResult) — caller got a successful result but billing
* reconciliation is pending. UX can proceed; ops will reconcile.
* billingUnresolved (on InferenceError) — caller got an error AND billing state is
* unresolved. Caller must not surface a result and should alert.
*/
import { z } from "zod";
import { billingService, BillingRejectedError } from "./services/billingService";
import { modelService } from "./services/modelService";
import { dlqService } from "./services/dlqService";
import {
jobStateStore,
JobAlreadyClaimedError,
InvalidTransitionError,
} from "./services/jobStateStore";
import { metrics } from "./lib/metrics";
import { loggerFactory } from "./lib/logger";
import { computeExpectedCost } from "./lib/costEstimator";
import { ALLOWED_MODEL_IDS, type AllowedModelId } from "./config/models";
// ALLOWED_MODEL_IDS must be declared as a const-asserted tuple at source:
// export const ALLOWED_MODEL_IDS = ["gpt-4o", "claude-3-5-sonnet"] as const;
// export type AllowedModelId = typeof ALLOWED_MODEL_IDS[number];
// ---------------------------------------------------------------------------
// Constants
// ---------------------------------------------------------------------------
// Model-call abort timeout bounds (ms). Enforced by resolveTimeout().
const MIN_TIMEOUT_MS = 1_000;
const MAX_TIMEOUT_MS = 120_000; // 2 minutes
// Applied when RunInferenceOptions.timeoutMs is omitted.
const DEFAULT_TIMEOUT_MS = 30_000;
/**
 * Claim lease TTL. Intentionally sized at 3× the worst-case saga wall-clock time.
 * See "Claim TTL sizing" in the header before changing this value.
 */
const CLAIM_LEASE_TTL_MS = 15 * 60 * 1_000; // 15 minutes
// ---------------------------------------------------------------------------
// Runtime payload validation (Zod)
// ---------------------------------------------------------------------------
/**
 * Runtime validation schema for the inference request payload.
 * Parsed with safeParse in runInferenceJob; failures map to VALIDATION_FAILED.
 */
const InferencePayloadSchema = z.object({
  /** Must be one of the server-side allowlisted values (const-asserted tuple). */
  modelId: z.enum(ALLOWED_MODEL_IDS),
  /**
   * ESTIMATE for pre-flight cost reservation ONLY.
   * Actual billing uses result.usage from the model response.
   * Must not be treated as authoritative for any other purpose.
   */
  promptTokens: z
    .number()
    .int("promptTokens must be an integer")
    .positive("promptTokens must be > 0")
    .max(200_000, "promptTokens exceeds maximum supported context window"),
});
/** Static payload type derived from the runtime schema — single source of truth. */
export type InferencePayload = z.infer<typeof InferencePayloadSchema>;
// ---------------------------------------------------------------------------
// Billing success policy (see header for full description)
// ---------------------------------------------------------------------------
/**
 * strict_billing — model output is withheld unless capture succeeds
 * (capture failure throws CAPTURE_FAILED with billingUnresolved = true).
 * best_effort_billing — model output is returned even if capture fails;
 * the result carries billingPending = true and reconciliation goes to DLQ.
 */
export type BillingPolicy = "strict_billing" | "best_effort_billing";
/** Per-call options for runInferenceJob. */
export interface RunInferenceOptions {
  /** @default "strict_billing" */
  billingPolicy?: BillingPolicy;
  /**
   * Model call abort timeout in milliseconds. Must be [1_000, 120_000].
   * @default 30_000
   */
  timeoutMs?: number;
}
// ---------------------------------------------------------------------------
// Public types
// ---------------------------------------------------------------------------
/** Token accounting as reported by the model response. */
export interface ModelUsage {
  promptTokens: number;
  completionTokens: number;
  totalTokens: number;
}
/** Model response surface consumed by this saga; usage drives billing capture. */
export interface ModelResult {
  usage: ModelUsage;
  // ...extend as needed
}
/** Successful saga outcome returned to the caller. */
export interface InferenceResult {
  success: true;
  data: ModelResult;
  /**
   * True when billingPolicy = "best_effort_billing" and capture failed.
   * Caller received a result; billing reconciliation is pending via DLQ.
   * See "billingPending vs billingUnresolved" in the header.
   */
  billingPending?: boolean;
}
// ---------------------------------------------------------------------------
// Error codes
// ---------------------------------------------------------------------------
/** Machine-readable failure classification; retry policy for each code is noted inline. */
export type ErrorCode =
  | "VALIDATION_FAILED" // bad inputs — no_retry
  | "DUPLICATE_JOB" // atomic claim rejected — no_retry
  | "INFRA_UNAVAILABLE" // state store or billing infra down — retry_with_backoff
  | "STATE_CONFLICT" // CAS transition violated state machine invariant — no_retry + page
  | "RESERVATION_FAILED" // billing business rejection (insufficient credits etc.) — dlq_only
  | "MODEL_TIMEOUT" // AbortSignal fired — retry_with_backoff
  | "MODEL_RATE_LIMITED" // 429 — retry_with_backoff
  | "MODEL_SERVER_ERROR" // 5xx / connection reset — retry_with_backoff
  | "MODEL_CLIENT_ERROR" // 4xx (not 429) — no_retry
  | "CAPTURE_FAILED"; // billing capture rejected — dlq_only
/** How a caller should respond to an InferenceError carrying a given ErrorCode. */
export type RetryStrategy = "no_retry" | "retry_with_backoff" | "dlq_only";
// ---------------------------------------------------------------------------
// Typed error hierarchy
// ---------------------------------------------------------------------------
/**
 * Primary business error thrown by the inference saga.
 *
 * Carries a machine-readable errorCode, the correlated jobId, and the retry
 * strategy the caller should apply. `billingUnresolved` is an orthogonal
 * signal, true whenever billing state could not be settled alongside this
 * error (release failed, or capture failed under strict_billing) — the
 * primary errorCode is always the original business failure.
 * See "billingPending vs billingUnresolved" in the header.
 */
export class InferenceError extends Error {
  constructor(
    message: string,
    /** Machine-readable failure classification. */
    public readonly errorCode: ErrorCode,
    /** Correlation key for the job this error belongs to. */
    public readonly jobId: string,
    /** How the caller should respond (no_retry | retry_with_backoff | dlq_only). */
    public readonly retryStrategy: RetryStrategy,
    /** True when billing state is unresolved alongside this error. */
    public readonly billingUnresolved: boolean = false,
    options?: ErrorOptions,
  ) {
    super(message, options);
    // Restore the prototype chain for targets that down-level `extends Error`.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = "InferenceError";
  }
}
/**
 * Signals that a billing release failed after a model failure; a
 * RELEASE_FAILED DLQ record has been enqueued. Returned (not thrown) by
 * releaseOrCompensate and surfaced via InferenceError.billingUnresolved.
 */
export class BillingCompensationError extends Error {
  constructor(
    /** Correlation key for the affected job. */
    public readonly jobId: string,
    /** The model-side ErrorCode that triggered the compensation attempt. */
    public readonly originalErrorCode: ErrorCode,
    options?: ErrorOptions,
  ) {
    super(`Billing release failed for job ${jobId}; pushed to DLQ`, options);
    // Restore the prototype chain for targets that down-level `extends Error`.
    Object.setPrototypeOf(this, new.target.prototype);
    this.name = "BillingCompensationError";
  }
}
// ---------------------------------------------------------------------------
// Versioned, typed DLQ payloads
// ---------------------------------------------------------------------------
/** Fields common to every DLQ record emitted by this saga. */
interface DlqBase {
  schemaVersion: 1;
  occurredAt: string; // ISO-8601
  service: "inference-saga";
  jobId: string;
  userId: string;
  modelId: AllowedModelId;
  environment: string;
  /** Hint for the DLQ worker: false means manual/ops intervention is expected. */
  retriable: boolean;
}
/** Billing release failed after a model failure; credits may be stuck reserved. */
interface ReleaseFailed extends DlqBase {
  type: "RELEASE_FAILED";
  reservationId: string;
  /** The model-side ErrorCode that triggered the release attempt. */
  modelErrorCode: ErrorCode;
  releaseError: ErrorMeta;
}
/** Billing capture failed after a successful model call. */
interface CaptureFailed extends DlqBase {
  type: "CAPTURE_FAILED";
  reservationId: string;
  /** Usage from the model response — the authoritative amount to charge. */
  actualUsage: ModelUsage;
  captureError: ErrorMeta;
}
/** A job state transition could not be recorded (conflict or store outage). */
interface TransitionFailed extends DlqBase {
  type: "TRANSITION_FAILED";
  attemptedFrom: JobState | null;
  attemptedTo: JobState;
  transitionError: ErrorMeta;
}
/** Discriminated on `type`; all payloads are versioned via schemaVersion. */
export type DlqPayload = ReleaseFailed | CaptureFailed | TransitionFailed;
// ---------------------------------------------------------------------------
// Job state machine (see header for full transition diagram)
// ---------------------------------------------------------------------------
/**
 * Persisted saga states. See the "Job state machine" diagram in the header
 * for allowed transitions and the "Recovery model" section for how the
 * sweeper / DLQ worker handle each stale state.
 */
export type JobState =
  | "CLAIMED"
  | "RESERVED"
  | "RESERVATION_FAILED"
  | "INFERENCE_RUNNING"
  | "INFERENCE_SUCCEEDED"
  | "CAPTURED"
  | "RELEASED"
  | "RECONCILIATION_REQUIRED";
// ---------------------------------------------------------------------------
// Metrics labels — fully typed, no `as any`
// ---------------------------------------------------------------------------
/** Label set attached to every metric emitted by this module. */
interface MetricLabels {
  modelId: AllowedModelId;
  billingPolicy: BillingPolicy;
  environment: string;
  errorCode?: ErrorCode;
  dlqType?: DlqPayload["type"];
  /** Populated only on job_state.transition.* metrics. */
  fromState?: JobState;
  toState?: JobState;
}
// Subset for logger interface typing — avoids depending on loose logger signatures
type LogSeverity = "info" | "warn" | "error" | "critical";
/** Contract loggerFactory.create() must satisfy (see note at the call site). */
interface StructuredLogger {
  info(msg: string, meta?: Record<string, unknown>): void;
  warn(msg: string, meta?: Record<string, unknown>): void;
  error(msg: string, meta?: Record<string, unknown>): void;
  critical(msg: string, meta?: Record<string, unknown>): void;
  log(severity: LogSeverity, msg: string, meta?: Record<string, unknown>): void;
}
// ---------------------------------------------------------------------------
// Core saga
// ---------------------------------------------------------------------------
/**
 * Runs the full inference saga for a single job:
 *
 *   validate → claim (atomic duplicate guard) → reserve credits →
 *   model call (with abort timeout) → capture actual usage,
 *   compensating via release / DLQ on failure.
 *
 * See the file header for the state machine, claim TTL sizing, and the
 * billingPending vs billingUnresolved contract.
 *
 * @param userId  Billed party. Must be non-empty.
 * @param payload Validated at runtime against InferencePayloadSchema.
 * @param jobId   Correlation and duplicate-execution key (atomic claim).
 * @param options billingPolicy (default "strict_billing") and timeoutMs.
 * @returns InferenceResult; billingPending is set only under
 *          best_effort_billing when capture failed (reconciliation via DLQ).
 * @throws InferenceError on every failure path; errorCode and retryStrategy
 *         tell the caller how to respond.
 */
export async function runInferenceJob(
  userId: string,
  payload: InferencePayload,
  jobId: string,
  options: RunInferenceOptions = {},
): Promise<InferenceResult> {
  // ── 1. Validate identity fields and options ───────────────────────────────
  if (!userId?.trim()) {
    throw new InferenceError("Missing userId", "VALIDATION_FAILED", jobId ?? "UNKNOWN", "no_retry");
  }
  if (!jobId?.trim()) {
    throw new InferenceError("Missing jobId", "VALIDATION_FAILED", "UNKNOWN", "no_retry");
  }
  const { billingPolicy = "strict_billing" } = options;
  const timeoutMs = resolveTimeout(options.timeoutMs, jobId);
  // ── 2. Validate payload (Zod) ─────────────────────────────────────────────
  const parseResult = InferencePayloadSchema.safeParse(payload);
  if (!parseResult.success) {
    const issues = parseResult.error.issues.map(i => ({ path: i.path.join("."), message: i.message }));
    loggerFactory.create({ userId, jobId }).warn("Payload validation failed", {
      evt: "VALIDATION_FAILED", issues,
    });
    throw new InferenceError(
      "Payload validation failed", "VALIDATION_FAILED", jobId, "no_retry",
      false, { cause: new Error(JSON.stringify(issues)) },
    );
  }
  const validPayload = parseResult.data;
  const env = resolveEnvironment();
  const baseLabels: MetricLabels = { modelId: validPayload.modelId, billingPolicy, environment: env };
  // loggerFactory.create() must return StructuredLogger. If it does not, update the
  // factory's return type rather than casting here — the interface is the contract.
  const logger: StructuredLogger = loggerFactory.create({ userId, jobId, modelId: validPayload.modelId, billingPolicy, environment: env });
  // ── 3. Atomic duplicate-execution guard ───────────────────────────────────
  try {
    await jobStateStore.claim(jobId, CLAIM_LEASE_TTL_MS);
  } catch (claimErr) {
    if (claimErr instanceof JobAlreadyClaimedError) {
      logger.warn("Duplicate execution blocked via atomic claim", { evt: "DUPLICATE_JOB_BLOCKED" });
      throw new InferenceError("Job already claimed", "DUPLICATE_JOB", jobId, "no_retry");
    }
    logger.error("State store unavailable on claim", { evt: "CLAIM_INFRA_FAILED", error: toErrorMeta(claimErr) });
    throw new InferenceError(
      "Job state store unavailable", "INFRA_UNAVAILABLE", jobId, "retry_with_backoff",
      false, { cause: claimErr },
    );
  }
  // ── 4. Server-side cost estimation ────────────────────────────────────────
  const expectedCost = computeExpectedCost(validPayload);
  // ── 5. Idempotent reservation ─────────────────────────────────────────────
  let reservation: Awaited<ReturnType<typeof billingService.reserve>>;
  const reserveStart = performanceNow();
  try {
    reservation = await billingService.reserve({
      userId,
      amount: expectedCost,
      idempotencyKey: `res_${jobId}`,
    });
    await safeTransition(jobId, "CLAIMED", "RESERVED", logger, baseLabels, validPayload.modelId, userId, env);
    emitTiming("billing.reserve.duration_ms", reserveStart, baseLabels);
    metrics.increment("billing.reserve.success", baseLabels);
    logger.info("Credits reserved", {
      evt: "CREDITS_RESERVED", reservationId: reservation.id,
      expectedCost, currency: "credits", durationMs: performanceNow() - reserveStart,
    });
  } catch (err) {
    if (err instanceof InferenceError) throw err; // re-throw from safeTransition
    const isBillingRejection = err instanceof BillingRejectedError;
    metrics.increment("billing.reserve.failure", { ...baseLabels, errorCode: isBillingRejection ? "RESERVATION_FAILED" : "INFRA_UNAVAILABLE" });
    logger.error("Reservation failed", {
      evt: "CREDITS_RESERVE_FAILED", isBillingRejection,
      error: toErrorMeta(err), durationMs: performanceNow() - reserveStart,
    });
    // Mark job state through safeTransition (consistent with all other transition sites).
    // If this transition also fails, safeTransition routes it to DLQ and throws — the
    // CLAIMED → RESERVATION_FAILED transition is treated as a first-class failure here
    // because at this point no billing state exists to protect; job TTL expiry would
    // also recover a stuck CLAIMED record, but explicit state is preferable for observability.
    await safeTransition(jobId, "CLAIMED", "RESERVATION_FAILED", logger, baseLabels, validPayload.modelId, userId, env)
      .catch(() => {
        // safeTransition already logged and DLQ-enqueued its own failure.
        // We swallow here only to ensure the original reservation error is what throws.
      });
    throw new InferenceError(
      "Could not reserve credits",
      isBillingRejection ? "RESERVATION_FAILED" : "INFRA_UNAVAILABLE",
      jobId,
      isBillingRejection ? "dlq_only" : "retry_with_backoff",
      false,
      { cause: err },
    );
  }
  // ── 6. Model call with AbortController-enforced timeout ───────────────────
  let result: ModelResult;
  // NOTE(review): controller.abort(abortReason) below makes the model call's
  // rejection reason this plain Error (name "Error"), while classifyModelError
  // detects timeouts via the "AbortError" name. Confirm modelService re-surfaces
  // aborts as AbortError; otherwise timeouts fall through to MODEL_SERVER_ERROR
  // (still retried with backoff, but MODEL_TIMEOUT metrics are never emitted).
  const abortReason = new Error(`Model call timed out after ${timeoutMs}ms`);
  const controller = new AbortController();
  const timeoutHandle = setTimeout(() => controller.abort(abortReason), timeoutMs);
  const modelStart = performanceNow();
  try {
    await safeTransition(jobId, "RESERVED", "INFERENCE_RUNNING", logger, baseLabels, validPayload.modelId, userId, env);
    result = await modelService.call(validPayload, { requestId: jobId, signal: controller.signal });
    await safeTransition(jobId, "INFERENCE_RUNNING", "INFERENCE_SUCCEEDED", logger, baseLabels, validPayload.modelId, userId, env);
    emitTiming("model.call.duration_ms", modelStart, baseLabels);
    metrics.increment("model.call.success", baseLabels);
    logger.info("Model call succeeded", {
      evt: "MODEL_CALL_SUCCEEDED", usage: result.usage, durationMs: performanceNow() - modelStart,
    });
  } catch (modelErr) {
    if (modelErr instanceof InferenceError) throw modelErr;
    const errorCode = classifyModelError(modelErr);
    emitTiming("model.call.duration_ms", modelStart, { ...baseLabels, errorCode });
    metrics.increment("model.call.failure", { ...baseLabels, errorCode });
    logger.error("Model call failed — releasing reservation", {
      evt: "MODEL_CALL_FAILED", errorCode, error: toErrorMeta(modelErr), durationMs: performanceNow() - modelStart,
    });
    // Compensation never throws; a non-null return means billing is unresolved.
    const compensationErr = await releaseOrCompensate({
      jobId, userId, modelId: validPayload.modelId, env,
      reservationId: reservation.id, originalErrorCode: errorCode, logger, baseLabels,
    });
    throw new InferenceError(
      "Model call failed", errorCode, jobId, retryStrategyFor(errorCode),
      compensationErr !== null,
      { cause: compensationErr ?? modelErr },
    );
  } finally {
    clearTimeout(timeoutHandle);
  }
  // ── 7. Capture — charge actual usage only ────────────────────────────────
  const captureStart = performanceNow();
  try {
    await billingService.capture({
      reservationId: reservation.id,
      actualUsage: result.usage,
      idempotencyKey: `cap_${jobId}`,
    });
    await safeTransition(jobId, "INFERENCE_SUCCEEDED", "CAPTURED", logger, baseLabels, validPayload.modelId, userId, env);
    emitTiming("billing.capture.duration_ms", captureStart, baseLabels);
    metrics.increment("billing.capture.success", baseLabels);
    logger.info("Credits captured", {
      evt: "CREDITS_CAPTURED", actualUsage: result.usage, durationMs: performanceNow() - captureStart,
    });
  } catch (captureErr) {
    if (captureErr instanceof InferenceError) throw captureErr;
    metrics.increment("billing.capture.failure", baseLabels);
    await safeTransition(jobId, "INFERENCE_SUCCEEDED", "RECONCILIATION_REQUIRED", logger, baseLabels, validPayload.modelId, userId, env);
    const enqueued = await safeEnqueueDlq({
      schemaVersion: 1,
      type: "CAPTURE_FAILED",
      occurredAt: new Date().toISOString(),
      service: "inference-saga",
      jobId, userId, environment: env,
      modelId: validPayload.modelId,
      reservationId: reservation.id,
      actualUsage: result.usage,
      captureError: toErrorMeta(captureErr),
      retriable: true,
    }, logger, { ...baseLabels, dlqType: "CAPTURE_FAILED" });
    logger.error("Capture failed", {
      evt: "CREDITS_CAPTURE_FAILED", error: toErrorMeta(captureErr),
      dlqEnqueued: enqueued, durationMs: performanceNow() - captureStart,
    });
    if (billingPolicy === "strict_billing") {
      // Do NOT return result. Model output must not be delivered without confirmed billing.
      throw new InferenceError(
        "Model succeeded but billing capture failed", "CAPTURE_FAILED", jobId,
        "dlq_only", true, { cause: captureErr },
      );
    }
    logger.warn("Returning success under best_effort_billing; billing reconciliation pending", {
      evt: "BILLING_PENDING_RECONCILIATION", jobId,
    });
    return { success: true, data: result, billingPending: true };
  }
  return { success: true, data: result };
}
// ---------------------------------------------------------------------------
// Compensation helper
// ---------------------------------------------------------------------------
/**
 * Attempts an idempotent release of the reservation.
 * Returns null on clean release.
 * Returns BillingCompensationError if release fails (never throws).
 * Caller surfaces this via InferenceError.billingUnresolved.
 */
async function releaseOrCompensate({
  jobId, userId, modelId, env, reservationId, originalErrorCode, logger, baseLabels,
}: {
  /** Correlation key; also seeds the `rel_` idempotency key. */
  jobId: string;
  userId: string;
  modelId: AllowedModelId;
  env: string;
  /** Reservation created in step 5 of runInferenceJob. */
  reservationId: string;
  /** The model-side ErrorCode that triggered this compensation. */
  originalErrorCode: ErrorCode;
  logger: StructuredLogger;
  baseLabels: MetricLabels;
}): Promise<BillingCompensationError | null> {
  const releaseStart = performanceNow();
  try {
    // Idempotent by key: a duplicate release attempt after a retry is safe.
    await billingService.release({ reservationId, idempotencyKey: `rel_${jobId}` });
    await safeTransition(jobId, "INFERENCE_RUNNING", "RELEASED", logger, baseLabels, modelId, userId, env);
    emitTiming("billing.release.duration_ms", releaseStart, baseLabels);
    metrics.increment("billing.release.success", baseLabels);
    logger.info("Reservation released", {
      evt: "CREDITS_RELEASED", reservationId, durationMs: performanceNow() - releaseStart,
    });
    return null;
  } catch (releaseErr) {
    metrics.increment("billing.release.failure", { ...baseLabels, errorCode: originalErrorCode });
    // Intentionally best-effort here — unlike elsewhere in the saga, we do NOT route this
    // through safeTransition. The reason: we are already inside a compensation path where
    // billing is unresolved and a DLQ record is about to be enqueued. Escalating a secondary
    // transition failure to STATE_CONFLICT or INFRA_UNAVAILABLE would obscure the original
    // billing failure and could interrupt DLQ enqueue below. The DLQ record IS the durable
    // reconciliation signal. State store consistency here is desirable but not load-bearing.
    await jobStateStore.transition(jobId, "INFERENCE_RUNNING", "RECONCILIATION_REQUIRED").catch(() => {});
    const compensationError = new BillingCompensationError(jobId, originalErrorCode, { cause: releaseErr });
    await safeEnqueueDlq({
      schemaVersion: 1,
      type: "RELEASE_FAILED",
      occurredAt: new Date().toISOString(),
      service: "inference-saga",
      jobId, userId, environment: env, modelId, reservationId,
      modelErrorCode: originalErrorCode,
      releaseError: toErrorMeta(releaseErr),
      retriable: true,
    }, logger, { ...baseLabels, dlqType: "RELEASE_FAILED" });
    logger.error("Release failed — pushed to DLQ", {
      evt: "CREDITS_RELEASE_FAILED", originalErrorCode,
      releaseError: toErrorMeta(releaseErr), durationMs: performanceNow() - releaseStart,
    });
    return compensationError;
  }
}
// ---------------------------------------------------------------------------
// State transition helper
//
// Wraps jobStateStore.transition with structured error handling.
//
// Two failure modes, deliberately distinct:
//
// InvalidTransitionError → STATE_CONFLICT / no_retry
// The state machine invariant was violated. This implies a bug, a concurrent
// worker that bypassed the claim, or a partially failed prior run. It is a
// paging-severity event and must NOT be retried — retrying would re-execute
// business logic against corrupted state.
//
// StoreUnavailableError → INFRA_UNAVAILABLE / retry_with_backoff
// The store is down. The saga step has not executed. Safe to retry.
//
// Both paths push a TRANSITION_FAILED DLQ record for reconciliation tracking.
// ---------------------------------------------------------------------------
/**
 * Records a job state transition, converting store failures into typed
 * InferenceErrors (see the banner above for the two failure modes).
 * On failure it emits metrics, logs, enqueues a TRANSITION_FAILED DLQ
 * record, and then throws.
 */
async function safeTransition(
  jobId: string,
  from: JobState,
  to: JobState,
  logger: StructuredLogger,
  baseLabels: MetricLabels,
  modelId: AllowedModelId,
  userId: string,
  env: string,
): Promise<void> {
  try {
    await jobStateStore.transition(jobId, from, to);
  } catch (transitionErr) {
    // Classify: state-machine invariant violation vs. store outage.
    const isConflict = transitionErr instanceof InvalidTransitionError;
    const errorCode: ErrorCode = isConflict ? "STATE_CONFLICT" : "INFRA_UNAVAILABLE";
    const retryStrategy: RetryStrategy = isConflict ? "no_retry" : "retry_with_backoff";
    const labels: MetricLabels = { ...baseLabels, errorCode, fromState: from, toState: to };
    metrics.increment("job_state.transition.failure", labels);
    // STATE_CONFLICT is paging severity — it means the saga is in an unexpected state.
    if (isConflict) {
      logger.critical("State machine invariant violated — STATE_CONFLICT", {
        evt: "STATE_TRANSITION_CONFLICT", from, to, error: toErrorMeta(transitionErr),
      });
    } else {
      logger.error("State transition failed — INFRA_UNAVAILABLE", {
        evt: "STATE_TRANSITION_FAILED", from, to, error: toErrorMeta(transitionErr),
      });
    }
    // DLQ record is retriable only for infra failures; conflicts need a human.
    await safeEnqueueDlq({
      schemaVersion: 1,
      type: "TRANSITION_FAILED",
      occurredAt: new Date().toISOString(),
      service: "inference-saga",
      jobId, userId, modelId, environment: env,
      attemptedFrom: from,
      attemptedTo: to,
      transitionError: toErrorMeta(transitionErr),
      retriable: !isConflict,
    }, logger, { ...baseLabels, dlqType: "TRANSITION_FAILED" });
    throw new InferenceError(
      `State transition ${from} → ${to} failed`,
      errorCode, jobId, retryStrategy,
    );
  }
}
// ---------------------------------------------------------------------------
// DLQ helper
// ---------------------------------------------------------------------------
/**
 * Best-effort DLQ push. Never throws — a failed enqueue is logged at
 * paging severity (the DLQ is the durable reconciliation signal) and
 * reported via the return value.
 * Returns true on success; false on failure (callers annotate logs).
 */
async function safeEnqueueDlq(
  payload: DlqPayload,
  logger: StructuredLogger,
  labels: MetricLabels,
): Promise<boolean> {
  const dlqLabels = { ...labels, dlqType: payload.type };
  try {
    await dlqService.enqueue(payload);
    metrics.increment("dlq.enqueue.success", dlqLabels);
    return true;
  } catch (enqueueErr) {
    logger.critical("DLQ_ENQUEUE_FAILED — MANUAL RECONCILIATION REQUIRED", {
      evt: "DLQ_ENQUEUE_FAILED", jobId: payload.jobId, innerType: payload.type,
      dlqError: toErrorMeta(enqueueErr),
    });
    metrics.increment("dlq.enqueue.failure", dlqLabels);
    return false;
  }
}
// ---------------------------------------------------------------------------
// Error classification — structured, not regex-on-message
// ---------------------------------------------------------------------------
/** Normalized error shape the modelService SDK must throw. */
export interface ModelServiceError extends Error {
  statusCode?: number;
  code?: string;
}
/**
 * Maps an unknown model-call failure onto a typed ErrorCode.
 *
 * Timeout detection matches on the standard abort names/codes WITHOUT
 * requiring `instanceof Error`: aborted fetch/undici calls reject with a
 * DOMException ("AbortError", or "TimeoutError" for AbortSignal.timeout,
 * code "ABORT_ERR"), which is not an Error instance in every runtime.
 * The previous `err instanceof Error && err.name === "AbortError"` check
 * could therefore fall through and misclassify timeouts as
 * MODEL_SERVER_ERROR, suppressing MODEL_TIMEOUT metrics.
 *
 * Unknown failures default to MODEL_SERVER_ERROR (assumed transient) so
 * they remain retryable.
 */
function classifyModelError(err: unknown): ErrorCode {
  const e = err as ModelServiceError | undefined;
  // AbortError: abort() default reason. TimeoutError: AbortSignal.timeout().
  // ABORT_ERR: DOMException-style string code surfaced by some SDKs.
  const name = e?.name;
  if (name === "AbortError" || name === "TimeoutError" || e?.code === "ABORT_ERR") {
    return "MODEL_TIMEOUT";
  }
  const status = e?.statusCode;
  if (status === 429) return "MODEL_RATE_LIMITED";
  if (status !== undefined && status >= 500) return "MODEL_SERVER_ERROR";
  if (status !== undefined && status >= 400) return "MODEL_CLIENT_ERROR";
  // Connection-level failures carry no HTTP status; treat as transient.
  if (e?.code === "ECONNRESET" ||
      e?.code === "ENOTFOUND") return "MODEL_SERVER_ERROR";
  return "MODEL_SERVER_ERROR"; // safe default: assume transient
}
/**
 * Maps an ErrorCode to the retry strategy callers should apply.
 * The table is exhaustive over ErrorCode — adding a code without a row
 * is a compile error.
 */
function retryStrategyFor(code: ErrorCode): RetryStrategy {
  const strategyByCode: Record<ErrorCode, RetryStrategy> = {
    // Transient: worth retrying with backoff.
    MODEL_TIMEOUT: "retry_with_backoff",
    MODEL_RATE_LIMITED: "retry_with_backoff",
    MODEL_SERVER_ERROR: "retry_with_backoff",
    INFRA_UNAVAILABLE: "retry_with_backoff",
    // Permanent: retrying cannot succeed.
    MODEL_CLIENT_ERROR: "no_retry",
    VALIDATION_FAILED: "no_retry",
    DUPLICATE_JOB: "no_retry",
    STATE_CONFLICT: "no_retry",
    // Business failures owned by the DLQ / ops reconciliation flow.
    CAPTURE_FAILED: "dlq_only",
    RESERVATION_FAILED: "dlq_only",
  };
  return strategyByCode[code];
}
// ---------------------------------------------------------------------------
// Utilities
// ---------------------------------------------------------------------------
/** Structured error metadata for logs and DLQ payloads. Safe to serialize. */
export interface ErrorMeta {
  message: string;
  name: string;
  code?: string;
  status?: number;
  hasStack: boolean;
}
/**
 * Flattens an unknown thrown value into serializable metadata.
 * Non-Error values are stringified under the sentinel name "UnknownError".
 */
function toErrorMeta(err: unknown): ErrorMeta {
  if (!(err instanceof Error)) {
    return { message: String(err), name: "UnknownError", hasStack: false };
  }
  const { message, name, stack } = err;
  const { code, statusCode } = err as ModelServiceError;
  return { message, name, code, status: statusCode, hasStack: Boolean(stack) };
}
/** Monotonic clock for duration measurement; falls back to Date.now(). */
function performanceNow(): number {
  return typeof performance === "undefined" ? Date.now() : performance.now();
}
/** Records the time elapsed since startMs (from performanceNow) as a timing metric. */
function emitTiming(name: string, startMs: number, labels: MetricLabels): void {
  const elapsedMs = performanceNow() - startMs;
  metrics.timing(name, elapsedMs, labels);
}
/** Validates timeoutMs. Rejects NaN, negatives, zero, and out-of-range values. */
function resolveTimeout(timeoutMs: number | undefined, jobId: string): number {
  const candidate = timeoutMs ?? DEFAULT_TIMEOUT_MS;
  const inRange =
    Number.isFinite(candidate) &&
    candidate >= MIN_TIMEOUT_MS &&
    candidate <= MAX_TIMEOUT_MS;
  if (inRange) return candidate;
  throw new InferenceError(
    `timeoutMs must be between ${MIN_TIMEOUT_MS} and ${MAX_TIMEOUT_MS}ms; got ${candidate}`,
    "VALIDATION_FAILED", jobId, "no_retry",
  );
}
/**
 * Resolves the runtime environment label.
 * Prefer an explicit app config (e.g. appConfig.environment) over process.env where possible.
 */
function resolveEnvironment(): string {
  const { APP_ENV, NODE_ENV } = process.env;
  return APP_ENV ?? NODE_ENV ?? "unknown";
}
Example 03
Realtime WebRTC Speech Gate
/**
* =============================================================================
* Realtime WebRTC Speech Gate
* =============================================================================
*
* OVERVIEW
* -----------------------------------------------------------------------------
* This module implements a production-grade **speech admission gate** for
* realtime WebRTC audio pipelines used in conversational AI systems.
*
* Its responsibility is to decide — frame by frame — whether microphone audio
* should be:
*
* • admitted immediately to the AI model
* • buffered temporarily
* • queued due to backpressure
* • dropped (silence / overflow / invalid)
*
* The gate sits **between the microphone capture pipeline and the upstream
* inference transport**, ensuring that only high-quality, well-timed speech
* frames reach the model.
*
*
* WHY THIS EXISTS
* -----------------------------------------------------------------------------
* Real-time voice systems face several difficult constraints:
*
* 1. **Speech vs noise discrimination**
* Microphones continuously emit frames even when nobody is speaking.
* Sending every frame to the model wastes compute and increases latency.
*
* 2. **Assistant playback echo**
* When the assistant is speaking, microphones often pick up the assistant’s
* own audio output. Without protection this creates echo loops and causes
* the model to respond to itself.
*
* 3. **Turn-taking correctness**
* Human speech contains pauses, hesitations, and breathing. The system must
* avoid prematurely cutting off speech while also minimizing latency.
*
* 4. **Network backpressure**
* Model uplinks may temporarily stall. The system must maintain bounded
* queues and deterministic overflow behaviour.
*
* 5. **Realtime guarantees**
* The audio pipeline must remain stable even if observability hooks,
* downstream transports, or lane resolvers fail.
*
*
* ARCHITECTURAL ROLE
* -----------------------------------------------------------------------------
* The SpeechGate sits in the audio pipeline as:
*
* Microphone
* │
* ▼
* Audio Frame Extraction
* │
* ▼
* SpeechGate ← (this module)
* │
* ▼
* Model Uplink Transport
* │
* ▼
* Realtime Model
*
* The gate is therefore responsible for:
*
* • Voice activity detection (VAD)
* • Turn state management
* • Assistant playback protection
* • Frame buffering and replay
* • Queue backpressure control
* • Deterministic uplink scheduling
*
*
* CORE SUBSYSTEMS
* -----------------------------------------------------------------------------
*
* 1. Voice Activity Detection (VAD)
*
* Speech detection is based on **band-limited energy analysis** across the
* human speech frequency band (typically 300Hz–3400Hz). Energy is compared
* against an adaptive noise floor using a configurable ratio threshold.
*
* Noise floor adapts using exponential smoothing:
*
* noiseFloor = α * noiseFloor + (1-α) * bandEnergy
*
* Frames are classified as speech when:
*
* bandEnergy ≥ minBandEnergy
* AND
* bandEnergy / noiseFloor ≥ energyThresholdRatio
*
*
* 2. Hangover Window
*
* Human speech often contains micro-pauses. To avoid cutting speech early,
* a short **hangover window** continues admitting frames after speech energy
* drops below threshold.
*
*
* 3. Assistant Playback Barrier
*
* While the AI assistant is speaking, the gate **closes admission** to avoid
* echo and feedback loops.
*
* During this time frames may be:
*
* • buffered
* • dropped (if overflow)
* • replayed once the barrier opens
*
* Barrier closure is triggered by:
*
* • response.created
* • response.output_audio events
* • response.done tail hold window
*
*
* 4. Held Audio Buffer
*
* When the assistant barrier is active, speech frames can be temporarily
* stored in a bounded buffer.
*
* Once the barrier opens, the buffered frames are flushed upstream in a
* single batch to preserve conversational continuity.
*
*
* 5. Uplink Queue
*
* To prevent overload of the inference transport, frames pass through a
* bounded queue with configurable overflow policies:
*
* drop_oldest
* drop_newest
* reject
*
*
* 6. Deterministic State Machine
*
* The gate tracks turn state:
*
* idle
* speaking
* hangover
* assistant_blocked
*
* Transitions are emitted as observability events for telemetry and
* debugging of conversational timing.
*
*
* RELIABILITY PRINCIPLES
* -----------------------------------------------------------------------------
*
* The implementation follows several safety guarantees:
*
* • Observability must never break realtime audio flow.
* • Invalid frames are rejected early with explicit error types.
* • All queues and buffers are strictly bounded.
* • Uplink calls are protected with timeouts.
* • Barrier flush generations prevent duplicate replays.
*
*
* PERFORMANCE CHARACTERISTICS
* -----------------------------------------------------------------------------
*
* Designed for realtime WebRTC audio pipelines:
*
* Typical frame size: 10–20 ms
* Processing overhead: < 1 ms
* Memory usage: bounded buffers only
* Latency impact: negligible
*
*
* RESULT
* -----------------------------------------------------------------------------
*
* The SpeechGate ensures that conversational AI systems:
*
* • avoid echo loops
* • avoid sending silence to models
* • maintain natural turn-taking
* • remain stable under network backpressure
*
* while preserving the strict latency requirements of realtime speech
* interaction.
*
* =============================================================================
*/// realtimeWebRTCSpeechGate.ts
import type {
BarrierSnapshot,
Frame,
GateSnapshot,
HeldBufferSnapshot,
RealtimeServerEvent,
SpeechGate,
SpeechGateConfig,
SpeechGateDependencies,
SpeechGateEvent,
SpeechGateMetrics,
SpeechLane,
TurnState,
UplinkResult,
} from "./types";
/**
 * Default gate configuration; callers may override any field via SpeechGateConfig.
 * Durations are milliseconds, frequencies are Hz. See the header's
 * "CORE SUBSYSTEMS" section for how each knob is used.
 */
const DEFAULTS: SpeechGateConfig = {
  speechBandLowHz: 300, // lower edge of the analyzed speech band
  speechBandHighHz: 3400, // upper edge of the analyzed speech band
  energyThresholdRatio: 2.2, // required bandEnergy / noiseFloor ratio for speech
  noiseAdaptationAlpha: 0.96, // exponential-smoothing factor for the noise floor
  minBandEnergy: 6, // absolute energy gate below which a frame is never speech
  hangoverMs: 220, // keep admitting this long after energy drops (micro-pauses)
  assistantTailHoldMs: 180, // barrier stays closed this long after response.done
  maxHoldBufferMs: 2200, // held-audio buffer bound (total buffered duration)
  maxHeldFrames: 80, // held-audio buffer bound (frame count)
  uplinkTimeoutMs: 120, // per-uplink send budget (enforced via withTimeout)
  maxQueueDepth: 4, // bounded uplink queue depth
  maxConcurrentUplinks: 1, // serialize uplink sends by default
  overflowPolicy: "drop_oldest", // queue overflow behavior (see header §5)
  emitStateTransitions: true, // emit turn-state transition events for telemetry
  streamLabel: "default", // label attached to observability events
  bufferDuringAssistantPlayback: true, // hold (vs drop) speech while barrier is closed
};
/** Base class for all gate errors; carries a stable machine-readable code. */
class SpeechGateError extends Error {
  constructor(
    /** Stable error code, e.g. "INVALID_FRAME" or "UPLINK_TIMEOUT". */
    readonly code: string,
    message: string,
  ) {
    super(message);
    this.name = "SpeechGateError";
  }
}
/** Raised when a frame fails structural validation before admission. */
class InvalidFrameError extends SpeechGateError {
  constructor(message: string) {
    super("INVALID_FRAME", message);
  }
}
/** Raised when an uplink send exceeds the configured timeout budget. */
class UplinkTimeoutError extends SpeechGateError {
  constructor(timeoutMs: number) {
    super("UPLINK_TIMEOUT", `Uplink timed out after ${timeoutMs}ms`);
  }
}
/** Current wall-clock time in milliseconds since the Unix epoch. */
function nowMs(): number {
  return new Date().getTime();
}
/**
 * Invokes the optional observability hook, swallowing any listener error.
 * Observability must never break realtime audio flow.
 */
function emit(
  onEvent: SpeechGateDependencies["onEvent"],
  event: SpeechGateEvent
): void {
  if (onEvent) {
    try {
      onEvent(event);
    } catch {
      // Listener failures are intentionally ignored.
    }
  }
}
/**
 * Shape an error report into a "speech_gate.error" event and forward it
 * through emit() (which already shields the realtime path from observer
 * exceptions).
 */
function emitError(
  onEvent: SpeechGateDependencies["onEvent"],
  streamLabel: string,
  code: string,
  message: string,
  context?: Record<string, unknown>
): void {
  const payload = {
    type: "speech_gate.error",
    atMs: nowMs(),
    streamLabel,
    code,
    message,
    context,
  };
  emit(onEvent, payload);
}
/**
 * Race a promise against a deadline. Rejects with UplinkTimeoutError when
 * the deadline fires first; otherwise resolves/rejects exactly as the
 * underlying promise does. The pending timer is cleared as soon as the
 * underlying promise settles, so it never keeps the event loop alive.
 */
function withTimeout<T>(promise: Promise<T>, timeoutMs: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new UplinkTimeoutError(timeoutMs)),
      timeoutMs
    );
    promise
      .then(resolve, reject)
      .finally(() => clearTimeout(timer));
  });
}
/**
 * Structural validation for an incoming audio Frame.
 * Throws InvalidFrameError (code "INVALID_FRAME") describing the first
 * violated rule; returns normally when the frame is acceptable.
 */
function validateFrame(frame: Frame): void {
  if (!frame || typeof frame !== "object") {
    throw new InvalidFrameError("Frame must be an object");
  }
  const freqDataOk =
    Array.isArray(frame.freqData) || frame.freqData instanceof Uint8Array;
  if (!freqDataOk) {
    throw new InvalidFrameError("freqData must be an array or Uint8Array");
  }
  // Shared rule: finite and strictly positive.
  const positiveFinite = (value: number): boolean =>
    Number.isFinite(value) && value > 0;
  if (!positiveFinite(frame.fftSize)) {
    throw new InvalidFrameError("fftSize must be a positive finite number");
  }
  if (!positiveFinite(frame.sampleRate)) {
    throw new InvalidFrameError("sampleRate must be a positive finite number");
  }
  if (!positiveFinite(frame.frameDurationMs)) {
    throw new InvalidFrameError(
      "frameDurationMs must be a positive finite number"
    );
  }
  if (frame.raw == null) {
    throw new InvalidFrameError("raw payload is required");
  }
  // timestampMs is optional, but must be a valid timestamp when present.
  if (frame.timestampMs != null && !positiveFinite(frame.timestampMs)) {
    throw new InvalidFrameError(
      "timestampMs must be a positive finite number when provided"
    );
  }
}
/**
 * Mean magnitude across the FFT bins covering [lowHz, highHz].
 * Bin indices are clamped to the available freqData range; returns 0 when
 * the requested band maps to no valid bins. Non-numeric bin values
 * contribute 0.
 */
function computeSpeechBandEnergy(
  freqData: Uint8Array | number[],
  fftSize: number,
  sampleRate: number,
  lowHz: number,
  highHz: number
): number {
  const hzPerBin = sampleRate / fftSize;
  const firstBin = Math.max(0, Math.floor(lowHz / hzPerBin));
  const lastBin = Math.min(freqData.length - 1, Math.floor(highHz / hzPerBin));
  if (lastBin < firstBin) return 0;
  let total = 0;
  for (let bin = firstBin; bin <= lastBin; bin++) {
    total += Number(freqData[bin]) || 0;
  }
  // lastBin >= firstBin is guaranteed here, so the bin count is >= 1.
  return total / (lastBin - firstBin + 1);
}
/**
 * Bounded FIFO of audio frames held back while the assistant is speaking.
 *
 * Capacity is capped both by frame count (config.maxHeldFrames) and by
 * total duration (config.maxHoldBufferMs). The configured overflowPolicy
 * decides what happens when a push would exceed either bound:
 *   - "drop_oldest": evict from the front until the new frame fits.
 *   - "drop_newest": refuse the incoming frame.
 *   - "reject":      refuse outright, even into an empty buffer.
 * drain() hands back every held frame with a monotonically increasing
 * flush generation; clear() discards without advancing the generation.
 */
function createHeldAudioBuffer(config: SpeechGateConfig) {
  const held: Frame[] = [];
  let heldDurationMs = 0;
  let generation = 0;

  // True when adding durationMs would exceed either capacity bound.
  const wouldOverflow = (durationMs: number): boolean =>
    held.length >= config.maxHeldFrames ||
    heldDurationMs + durationMs > config.maxHoldBufferMs;

  function snapshot(): HeldBufferSnapshot {
    return {
      frameCount: held.length,
      totalDurationMs: heldDurationMs,
      flushGeneration: generation,
    };
  }

  function push(frame: Frame): boolean {
    const durationMs = frame.frameDurationMs;
    // "reject" refuses whenever the frame would not fit, buffer empty or not.
    if (config.overflowPolicy === "reject" && wouldOverflow(durationMs)) {
      return false;
    }
    while (held.length > 0 && wouldOverflow(durationMs)) {
      if (config.overflowPolicy !== "drop_oldest") {
        // "drop_newest" (and, defensively, "reject") refuse the newcomer.
        return false;
      }
      const evicted = held.shift();
      heldDurationMs -= evicted?.frameDurationMs ?? 0;
    }
    held.push(frame);
    heldDurationMs += durationMs;
    return true;
  }

  function drain(): { generation: number; frames: Frame[] } {
    const flushed = held.splice(0, held.length);
    heldDurationMs = 0;
    generation += 1;
    return { generation, frames: flushed };
  }

  function clear(): void {
    held.length = 0;
    heldDurationMs = 0;
  }

  return { push, drain, clear, snapshot };
}
/**
 * Tracks whether assistant playback should block the user uplink.
 *
 * The barrier is "closed" (blocking) while any of these hold:
 *   - a response has been created but not yet reported done,
 *   - the host reports assistant playback as active, or
 *   - we are within assistantTailHoldMs of the last assistant activity.
 *
 * NOTE(review): the handled event names (response.created, response.done,
 * output_audio_buffer.cleared, ...) look like the OpenAI Realtime API
 * server-event vocabulary — confirm against the actual event source.
 */
function createResponsePlaybackBarrier(args: {
  assistantTailHoldMs: number;
  streamLabel: string;
  isAssistantPlaybackActive?: () => boolean;
  onEvent?: (event: SpeechGateEvent) => void;
}) {
  // Lifecycle state for the most recently created response.
  let activeResponseId: string | null = null;
  let responseCreatedAtMs = 0;
  let responseDoneAtMs = 0;
  // Drives the tail-hold window; 0 means "no assistant activity seen yet",
  // so a fresh barrier is effectively open once referenceMs > tail hold.
  let lastAssistantActivityAtMs = 0;
  let doneSeenForActiveResponse = false;

  /** Record assistant activity; extends the tail-hold window in isClosed(). */
  function noteAssistantActivity(atMs: number = nowMs()): void {
    lastAssistantActivityAtMs = atMs;
  }

  /** Fold one realtime server event into barrier state. Unknown types are ignored. */
  function onRealtimeEvent(serverEvent: RealtimeServerEvent): void {
    const type = String(serverEvent?.type ?? "");
    const atMs = nowMs();
    switch (type) {
      case "response.created": {
        // The response id may arrive nested (response.id) or flat (response_id).
        const response =
          "response" in serverEvent
            ? (serverEvent.response as { id?: string } | undefined)
            : undefined;
        activeResponseId =
          response?.id ??
          (typeof serverEvent.response_id === "string"
            ? serverEvent.response_id
            : null);
        responseCreatedAtMs = atMs;
        responseDoneAtMs = 0;
        doneSeenForActiveResponse = false;
        noteAssistantActivity(atMs);
        emit(args.onEvent, {
          type: "speech_gate.barrier_response_created",
          atMs,
          streamLabel: args.streamLabel,
          responseId: activeResponseId,
        });
        return;
      }
      // Any streamed assistant output counts as activity (extends tail hold).
      case "response.output_audio.delta":
      case "response.output_audio.done":
      case "response.output_audio_transcript.delta":
      case "response.output_text.delta": {
        noteAssistantActivity(atMs);
        return;
      }
      case "response.done": {
        const response =
          "response" in serverEvent
            ? (serverEvent.response as { id?: string } | undefined)
            : undefined;
        const responseId =
          response?.id ??
          (typeof serverEvent.response_id === "string"
            ? serverEvent.response_id
            : activeResponseId);
        // Accept the done when it matches the tracked response, or when no
        // response id was ever tracked (id-less streams).
        if (activeResponseId === null || responseId === activeResponseId) {
          doneSeenForActiveResponse = true;
          responseDoneAtMs = atMs;
          noteAssistantActivity(atMs);
          emit(args.onEvent, {
            type: "speech_gate.barrier_response_done",
            atMs,
            streamLabel: args.streamLabel,
            responseId,
          });
        }
        return;
      }
      case "output_audio_buffer.cleared": {
        // Playback buffer flushed (e.g. user interruption): treat the active
        // response as finished regardless of whether response.done arrived.
        doneSeenForActiveResponse = true;
        responseDoneAtMs = atMs;
        noteAssistantActivity(atMs);
        return;
      }
      default:
        return;
    }
  }

  /**
   * True while user audio should be blocked or held.
   * @param referenceMs clock value used for the tail-hold comparison.
   */
  function isClosed(referenceMs: number = nowMs()): boolean {
    const playbackActive = args.isAssistantPlaybackActive?.() ?? false;
    const withinTailHold =
      referenceMs - lastAssistantActivityAtMs <= args.assistantTailHoldMs;
    if (activeResponseId && !doneSeenForActiveResponse) return true;
    if (playbackActive) return true;
    if (withinTailHold) return true;
    return false;
  }

  /** External hook for the host to extend the tail-hold window. */
  function markAssistantPlaybackActivity(referenceMs: number = nowMs()): void {
    noteAssistantActivity(referenceMs);
  }

  /**
   * Reset response tracking once the barrier is fully open.
   * Returns false (and changes nothing) while the barrier is still closed.
   */
  function clearCompletedResponseIfDrained(referenceMs: number = nowMs()): boolean {
    if (isClosed(referenceMs)) return false;
    activeResponseId = null;
    responseCreatedAtMs = 0;
    responseDoneAtMs = 0;
    doneSeenForActiveResponse = false;
    return true;
  }

  /** Point-in-time view of barrier state for diagnostics / snapshots. */
  function snapshot(referenceMs: number = nowMs()): BarrierSnapshot {
    return {
      activeResponseId,
      responseCreatedAtMs,
      responseDoneAtMs,
      doneSeenForActiveResponse,
      lastAssistantActivityAtMs,
      playbackActive: args.isAssistantPlaybackActive?.() ?? false,
      closed: isClosed(referenceMs),
    };
  }

  return {
    onRealtimeEvent,
    isClosed,
    markAssistantPlaybackActivity,
    clearCompletedResponseIfDrained,
    snapshot,
  };
}
/**
 * Build a realtime speech gate that sits between local audio capture and
 * the model uplink.
 *
 * Per frame, in order:
 *   1. Validate the Frame and classify it speech/silence against an
 *      adaptive noise floor (classifySpeech).
 *   2. Track turn state (idle / speaking / hangover / assistant_blocked),
 *      optionally emitting state-transition events.
 *   3. While the assistant playback barrier is closed, hold (or drop) user
 *      frames instead of uplinking; flush held audio once the barrier opens.
 *   4. Admit speech frames to the uplink either immediately or via a small
 *      bounded queue with backpressure.
 *
 * Fixes vs. the previous revision:
 *   - When bufferDuringAssistantPlayback is false, a barrier-blocked frame
 *     was reported as status "buffered" even though it was discarded without
 *     being held; it is now reported as "dropped".
 *   - The queue-overflow "drop_oldest" path now counts the evicted frame in
 *     framesDroppedQueueBackpressure.
 *
 * @param options Partial config merged over DEFAULTS.
 * @param deps    Callbacks: admitUplinkFrame and resolveLane (required),
 *                plus optional admitBufferedFrames, isAssistantPlaybackActive
 *                and onEvent.
 * @throws TypeError when a required dependency is not a function.
 */
export function createRealtimeWebRTCSpeechGate(
  options: Partial<SpeechGateConfig> = {},
  deps: SpeechGateDependencies
): SpeechGate {
  if (typeof deps.admitUplinkFrame !== "function") {
    throw new TypeError("admitUplinkFrame must be a function");
  }
  if (typeof deps.resolveLane !== "function") {
    throw new TypeError("resolveLane must be a function");
  }
  const config: SpeechGateConfig = { ...DEFAULTS, ...options };

  // ── Mutable gate state ──────────────────────────────────────────────────
  let closed = false;
  // Adaptive EMA of speech-band energy; 0 means "seeded on first frame".
  let noiseFloor = 0;
  let lastSpeechAtMs = 0;
  let turnState: TurnState = "idle";
  const uplinkQueue: Array<{
    frame: Frame;
    lane: SpeechLane;
    queuedAtMs: number;
  }> = [];
  let inflight = 0;
  let flushInProgress = false;
  // Guards against replaying a stale held-audio flush out of order.
  let highestCompletedFlushGeneration = 0;
  const heldAudio = createHeldAudioBuffer(config);
  const barrier = createResponsePlaybackBarrier({
    assistantTailHoldMs: config.assistantTailHoldMs,
    streamLabel: config.streamLabel,
    isAssistantPlaybackActive: deps.isAssistantPlaybackActive,
    onEvent: deps.onEvent,
  });
  const metrics: SpeechGateMetrics = {
    framesTotal: 0,
    framesSpeechDetected: 0,
    framesSilenceDetected: 0,
    framesDroppedSilence: 0,
    framesDroppedInvalid: 0,
    framesDroppedGateClosed: 0,
    framesDroppedQueueBackpressure: 0,
    heldFramesAccepted: 0,
    heldFramesDropped: 0,
    heldFlushes: 0,
    heldFlushFrames: 0,
    framesAdmittedImmediate: 0,
    framesQueued: 0,
    uplinksSucceeded: 0,
    uplinksErrored: 0,
    uplinksTimedOut: 0,
    totalUplinkLatencyMs: 0,
    speechStartEvents: 0,
    speechEndEvents: 0,
    barrierBlockedFrames: 0,
    laneResolutionFailures: 0,
  };

  /**
   * Move to nextState (no-op when already there), emitting an optional
   * transition event and updating speech start/end counters.
   */
  function transitionTurnState(
    nextState: TurnState,
    reason: string,
    frame?: Frame
  ): void {
    if (turnState === nextState) return;
    const previousState = turnState;
    turnState = nextState;
    if (config.emitStateTransitions) {
      emit(deps.onEvent, {
        type: "speech_gate.state_transition",
        atMs: nowMs(),
        streamLabel: config.streamLabel,
        previousState,
        nextState,
        reason,
        sessionId: frame?.sessionId ?? null,
        streamId: frame?.streamId ?? null,
        speakerId: frame?.speakerId ?? null,
      });
    }
    if (nextState === "speaking") {
      metrics.speechStartEvents++;
    }
    if (previousState === "speaking" && nextState !== "speaking") {
      metrics.speechEndEvents++;
    }
  }

  /**
   * Classify a frame as speech or silence. Updates the adaptive noise floor
   * (EMA seeded from the first frame) and requires BOTH an absolute energy
   * floor and a ratio over the noise floor to call the frame speech.
   */
  function classifySpeech(frame: Frame): {
    isSpeech: boolean;
    bandEnergy: number;
    ratio: number;
  } {
    const bandEnergy = computeSpeechBandEnergy(
      frame.freqData,
      frame.fftSize,
      frame.sampleRate,
      config.speechBandLowHz,
      config.speechBandHighHz
    );
    if (noiseFloor === 0) {
      // Seed; never seed below 1 so the ratio is well defined.
      noiseFloor = Math.max(bandEnergy, 1);
    } else {
      noiseFloor =
        config.noiseAdaptationAlpha * noiseFloor +
        (1 - config.noiseAdaptationAlpha) * bandEnergy;
    }
    const safeNoiseFloor = Math.max(noiseFloor, 1e-6);
    const ratio = bandEnergy / safeNoiseFloor;
    const isSpeech =
      bandEnergy >= config.minBandEnergy &&
      ratio >= config.energyThresholdRatio;
    return { isSpeech, bandEnergy, ratio };
  }

  /**
   * Resolve the uplink lane via deps.resolveLane, converting any thrown
   * error into a null lane plus an observability event.
   */
  function resolveLaneSafe(args: {
    frame: Frame;
    isSpeech: boolean;
    speaking: boolean;
    assistantBlocked: boolean;
    replayedFromHoldBuffer: boolean;
  }): SpeechLane | null {
    try {
      return deps.resolveLane(args);
    } catch (error) {
      metrics.laneResolutionFailures++;
      emitError(
        deps.onEvent,
        config.streamLabel,
        "LANE_RESOLUTION_FAILED",
        "Lane resolution failed",
        {
          error,
          sessionId: args.frame.sessionId ?? null,
          streamId: args.frame.streamId ?? null,
          speakerId: args.frame.speakerId ?? null,
        }
      );
      return null;
    }
  }

  /**
   * Send one frame to the uplink with a deadline. Never throws: timeouts
   * and errors are folded into a rejected UplinkResult plus metrics/events.
   */
  async function performUplink(
    frame: Frame,
    lane: SpeechLane
  ): Promise<UplinkResult> {
    const startedAtMs = nowMs();
    try {
      const result = await withTimeout(
        deps.admitUplinkFrame(frame, lane),
        config.uplinkTimeoutMs
      );
      const latencyMs = nowMs() - startedAtMs;
      metrics.uplinksSucceeded++;
      metrics.totalUplinkLatencyMs += latencyMs;
      return {
        status: "admitted",
        reason: "speech",
        lane,
        latencyMs,
        result,
      };
    } catch (error) {
      const latencyMs = nowMs() - startedAtMs;
      if (error instanceof UplinkTimeoutError) {
        metrics.uplinksTimedOut++;
        emitError(
          deps.onEvent,
          config.streamLabel,
          error.code,
          error.message,
          { lane, latencyMs }
        );
        return {
          status: "rejected",
          reason: "uplink_timeout",
          lane,
          latencyMs,
          error,
        };
      }
      metrics.uplinksErrored++;
      emitError(
        deps.onEvent,
        config.streamLabel,
        "UPLINK_ERROR",
        "Uplink failed",
        { lane, latencyMs, error }
      );
      return {
        status: "rejected",
        reason: "uplink_error",
        lane,
        latencyMs,
        error: error instanceof Error ? error : new Error(String(error)),
      };
    }
  }

  /**
   * Replay a drained batch of held frames. Uses deps.admitBufferedFrames
   * when provided (with a widened deadline), otherwise uplinks frame by
   * frame. Stale generations (already flushed) are no-ops, so a concurrent
   * drain cannot double-send.
   */
  async function performBufferedFlush(
    frames: Frame[],
    lane: SpeechLane,
    generation: number
  ): Promise<UplinkResult> {
    if (frames.length === 0) {
      return {
        status: "noop",
        reason: "empty_flush",
        lane,
      };
    }
    if (generation <= highestCompletedFlushGeneration) {
      return {
        status: "noop",
        reason: "stale_flush_generation",
        lane,
      };
    }
    flushInProgress = true;
    try {
      let result: unknown;
      if (typeof deps.admitBufferedFrames === "function") {
        result = await withTimeout(
          deps.admitBufferedFrames(frames, lane, generation),
          Math.max(config.uplinkTimeoutMs * 2, 250)
        );
      } else {
        // Fallback: sequential per-frame uplink preserves frame order.
        const outputs: UplinkResult[] = [];
        for (const frame of frames) {
          outputs.push(await performUplink(frame, lane));
        }
        result = outputs;
      }
      highestCompletedFlushGeneration = generation;
      metrics.heldFlushes++;
      metrics.heldFlushFrames += frames.length;
      emit(deps.onEvent, {
        type: "speech_gate.held_flush_complete",
        atMs: nowMs(),
        streamLabel: config.streamLabel,
        lane,
        generation,
        flushedFrames: frames.length,
      });
      return {
        status: "flushed",
        reason: "assistant_barrier_open",
        lane,
        generation,
        flushedFrames: frames.length,
        result,
      };
    } finally {
      flushInProgress = false;
    }
  }

  /**
   * Drain the uplink queue while respecting maxConcurrentUplinks. Each
   * completed uplink re-pumps, so the queue keeps draining without a timer.
   * Queued frames' UplinkResults are intentionally discarded (metrics and
   * events carry the outcome).
   */
  async function pumpUplinkQueue(): Promise<void> {
    if (closed) return;
    if (inflight >= config.maxConcurrentUplinks) return;
    if (uplinkQueue.length === 0) return;
    const next = uplinkQueue.shift();
    if (!next) return;
    inflight++;
    try {
      await performUplink(next.frame, next.lane);
    } finally {
      inflight--;
      void pumpUplinkQueue();
    }
  }

  /**
   * Flush held audio if the barrier is open and no flush is running.
   * The lane is resolved once for the whole batch, using referenceFrame
   * (or the oldest held frame) as the representative frame.
   */
  async function maybeFlushHeldAudio(referenceFrame?: Frame): Promise<UplinkResult> {
    if (flushInProgress) {
      return {
        status: "noop",
        reason: "flush_in_progress",
        lane: null,
      };
    }
    if (barrier.isClosed()) {
      return {
        status: "noop",
        reason: "barrier_closed",
        lane: null,
      };
    }
    barrier.clearCompletedResponseIfDrained();
    const { frames, generation } = heldAudio.drain();
    if (frames.length === 0) {
      return {
        status: "noop",
        reason: "nothing_held",
        lane: null,
      };
    }
    const lane = resolveLaneSafe({
      frame: referenceFrame ?? frames[0],
      isSpeech: true,
      speaking: true,
      assistantBlocked: false,
      replayedFromHoldBuffer: true,
    });
    if (!lane) {
      return {
        status: "rejected",
        reason: "lane_resolution_failed",
        lane: null,
      };
    }
    return performBufferedFlush(frames, lane, generation);
  }

  /**
   * Main per-frame entry point. Validates, classifies, applies the
   * assistant barrier, then admits/queues/drops the frame. Never throws;
   * every outcome is a typed UplinkResult.
   */
  async function processFrame(frame: Frame): Promise<UplinkResult> {
    metrics.framesTotal++;
    if (closed) {
      metrics.framesDroppedGateClosed++;
      return {
        status: "rejected",
        reason: "gate_closed",
        lane: null,
        queueDepth: uplinkQueue.length,
        inflight,
      };
    }
    try {
      validateFrame(frame);
    } catch (error) {
      metrics.framesDroppedInvalid++;
      emitError(
        deps.onEvent,
        config.streamLabel,
        error instanceof SpeechGateError ? error.code : "INVALID_FRAME",
        error instanceof Error ? error.message : "Invalid frame",
        { error }
      );
      return {
        status: "rejected",
        reason: "invalid_frame",
        lane: null,
        queueDepth: uplinkQueue.length,
        inflight,
        error: error instanceof Error ? error : new Error(String(error)),
      };
    }
    // Prefer the frame's own capture timestamp; lastSpeechAtMs is kept in
    // the same clock domain, so the hangover comparison stays consistent.
    const atMs = frame.timestampMs ?? nowMs();
    const { isSpeech, bandEnergy, ratio } = classifySpeech(frame);
    if (isSpeech) {
      metrics.framesSpeechDetected++;
      lastSpeechAtMs = atMs;
      transitionTurnState("speaking", "speech_detected", frame);
    } else {
      metrics.framesSilenceDetected++;
    }
    const hangoverActive = atMs - lastSpeechAtMs <= config.hangoverMs;
    if (!isSpeech && !hangoverActive) {
      // Pure silence outside the hangover window: drop and go idle.
      metrics.framesDroppedSilence++;
      transitionTurnState("idle", "silence_after_hangover", frame);
      return {
        status: "dropped",
        reason: "silence",
        lane: null,
        queueDepth: uplinkQueue.length,
        inflight,
        diagnostics: {
          bandEnergy,
          ratio,
          hangoverActive: false,
          barrier: barrier.snapshot(),
        },
      };
    }
    const lane = resolveLaneSafe({
      frame,
      isSpeech,
      speaking: isSpeech || hangoverActive,
      assistantBlocked: barrier.isClosed(),
      replayedFromHoldBuffer: false,
    });
    if (!lane) {
      return {
        status: "rejected",
        reason: "lane_resolution_failed",
        lane: null,
        queueDepth: uplinkQueue.length,
        inflight,
      };
    }
    if (barrier.isClosed()) {
      metrics.barrierBlockedFrames++;
      transitionTurnState("assistant_blocked", "assistant_playback_barrier", frame);
      if (!config.bufferDuringAssistantPlayback) {
        // FIX: the frame is discarded, not held — the previous revision
        // misreported this as status "buffered".
        return {
          status: "dropped",
          reason: "assistant_playback_barrier",
          lane,
          queueDepth: uplinkQueue.length,
          inflight,
          held: heldAudio.snapshot(),
          diagnostics: {
            bandEnergy,
            ratio,
            hangoverActive,
            barrier: barrier.snapshot(),
          },
        };
      }
      const accepted = heldAudio.push(frame);
      if (accepted) {
        metrics.heldFramesAccepted++;
        return {
          status: "buffered",
          reason: "assistant_playback_barrier",
          lane,
          queueDepth: uplinkQueue.length,
          inflight,
          held: heldAudio.snapshot(),
          diagnostics: {
            bandEnergy,
            ratio,
            hangoverActive,
            barrier: barrier.snapshot(),
          },
        };
      }
      metrics.heldFramesDropped++;
      return {
        status: "dropped",
        reason: "held_buffer_overflow",
        lane,
        queueDepth: uplinkQueue.length,
        inflight,
        held: heldAudio.snapshot(),
        diagnostics: {
          bandEnergy,
          ratio,
          hangoverActive,
          barrier: barrier.snapshot(),
        },
      };
    }
    // Barrier open: replay any held audio before admitting the live frame,
    // preserving rough chronological order on the uplink.
    if (heldAudio.snapshot().frameCount > 0) {
      await maybeFlushHeldAudio(frame);
    }
    transitionTurnState(
      !isSpeech && hangoverActive ? "hangover" : "speaking",
      !isSpeech && hangoverActive ? "hangover_active" : "speech_admitted",
      frame
    );
    // Fast path: no queue backlog and concurrency headroom → uplink inline.
    if (inflight < config.maxConcurrentUplinks && uplinkQueue.length === 0) {
      metrics.framesAdmittedImmediate++;
      const result = await performUplink(frame, lane);
      return {
        ...result,
        queueDepth: uplinkQueue.length,
        inflight,
        diagnostics: {
          bandEnergy,
          ratio,
          hangoverActive,
          barrier: barrier.snapshot(),
        },
      };
    }
    if (uplinkQueue.length >= config.maxQueueDepth) {
      if (config.overflowPolicy === "drop_oldest" && uplinkQueue.length > 0) {
        // FIX: the evicted frame is lost to backpressure; count it.
        uplinkQueue.shift();
        metrics.framesDroppedQueueBackpressure++;
      } else {
        metrics.framesDroppedQueueBackpressure++;
        return {
          status: "dropped",
          reason: "queue_backpressure",
          lane,
          queueDepth: uplinkQueue.length,
          inflight,
          diagnostics: {
            bandEnergy,
            ratio,
            hangoverActive,
            barrier: barrier.snapshot(),
          },
        };
      }
    }
    uplinkQueue.push({
      frame,
      lane,
      queuedAtMs: nowMs(),
    });
    metrics.framesQueued++;
    void pumpUplinkQueue();
    return {
      status: "queued",
      reason: isSpeech ? "speech" : "hangover",
      lane,
      queueDepth: uplinkQueue.length,
      inflight,
      diagnostics: {
        bandEnergy,
        ratio,
        hangoverActive,
        barrier: barrier.snapshot(),
      },
    };
  }

  /** Feed a realtime server event to the barrier; flush held audio if it opened. */
  function onRealtimeEvent(serverEvent: RealtimeServerEvent): void {
    barrier.onRealtimeEvent(serverEvent);
    if (!barrier.isClosed()) {
      void maybeFlushHeldAudio();
    }
  }

  /** Host hook: extend the barrier's tail-hold window (assistant is audible). */
  function markAssistantPlaybackActivity(): void {
    barrier.markAssistantPlaybackActivity(nowMs());
  }

  /**
   * Wait until held audio, the uplink queue, and in-flight uplinks are all
   * drained (polls every 10 ms). Returns immediately once the gate closes.
   * Note: does not force the barrier open — a permanently closed barrier
   * keeps held audio pinned and this will wait until close() is called.
   */
  async function flush(): Promise<void> {
    while (!closed) {
      const heldCount = heldAudio.snapshot().frameCount;
      if (
        heldCount === 0 &&
        uplinkQueue.length === 0 &&
        inflight === 0 &&
        !flushInProgress
      ) {
        break;
      }
      if (!barrier.isClosed()) {
        await maybeFlushHeldAudio();
      }
      await new Promise((resolve) => setTimeout(resolve, 10));
    }
  }

  /** Zero every metric counter (all SpeechGateMetrics fields are numeric). */
  function resetMetrics(): void {
    for (const key of Object.keys(metrics) as Array<keyof SpeechGateMetrics>) {
      metrics[key] = 0;
    }
  }

  /** Consistent point-in-time view of gate state, metrics, and config. */
  function getSnapshot(): GateSnapshot {
    const avgUplinkLatencyMs =
      metrics.uplinksSucceeded > 0
        ? metrics.totalUplinkLatencyMs / metrics.uplinksSucceeded
        : 0;
    return {
      closed,
      turnState,
      noiseFloor,
      queueDepth: uplinkQueue.length,
      inflight,
      flushInProgress,
      held: heldAudio.snapshot(),
      barrier: barrier.snapshot(),
      avgUplinkLatencyMs,
      metrics: { ...metrics },
      config: { ...config },
    };
  }

  /** Permanently close the gate, discarding queued and held frames. */
  function close(): void {
    closed = true;
    uplinkQueue.length = 0;
    heldAudio.clear();
    transitionTurnState("idle", "gate_closed");
  }

  function isClosed(): boolean {
    return closed;
  }

  return {
    processFrame,
    onRealtimeEvent,
    markAssistantPlaybackActivity,
    maybeFlushHeldAudio,
    getSnapshot,
    resetMetrics,
    flush,
    close,
    isClosed,
  };
}