Skip to content

Queues API

The @cloudwerk/queue package provides a convention-based queue system for processing background jobs with type safety, schema validation, and dead letter queues.

Terminal window
pnpm add @cloudwerk/queue

Creates a typed queue consumer.

import { defineQueue } from '@cloudwerk/queue'
export default defineQueue<MessageType>({
process: (message: QueueMessage<MessageType>) => Awaitable<void>,
// OR
processBatch?: (messages: QueueMessage<MessageType>[]) => Awaitable<void>,
schema?: ZodSchema<MessageType>,
config?: QueueConfig,
onError?: (error: Error, message: QueueMessage<MessageType>) => Awaitable<void>,
})
ParameterTypeDescription
processFunctionHandler for individual messages
processBatchFunctionHandler for batch processing (alternative to process)
schemaZodSchemaOptional Zod schema for message validation
configQueueConfigOptional queue configuration
onErrorFunctionOptional error handler

Returns a QueueDefinition object that Cloudwerk registers automatically.


Configuration options for queue behavior.

interface QueueConfig {
batchSize?: number // Max messages per batch (1-100, default: 10)
maxRetries?: number // Max retry attempts (0-100, default: 3)
retryDelay?: string // Delay between retries (default: '1m')
deadLetterQueue?: string // DLQ for failed messages
batchTimeout?: string // Max wait time to fill batch (default: '5s')
}
FormatDescription
'30s'30 seconds
'5m'5 minutes
'1h'1 hour
export default defineQueue<OrderMessage>({
config: {
batchSize: 25,
maxRetries: 5,
retryDelay: '2m',
deadLetterQueue: 'orders-dlq',
batchTimeout: '10s',
},
async process(message) {
await processOrder(message.body)
message.ack()
},
})

The message object passed to process handlers.

interface QueueMessage<T> {
readonly id: string // Unique message ID
readonly body: T // Message payload (validated if schema provided)
readonly timestamp: Date // When message was originally sent
readonly attempts: number // Current delivery attempt (1-based)
ack(): void // Acknowledge successful processing
retry(options?: RetryOptions): void // Request retry
deadLetter(reason?: string): void // Send to dead letter queue
}
interface RetryOptions {
delaySeconds?: number // Delay before retry
}

Mark the message as successfully processed. The message is removed from the queue.

async process(message) {
await processWork(message.body)
message.ack() // Remove from queue
}

Request that the message be redelivered. Use for transient failures.

async process(message) {
try {
await processWork(message.body)
message.ack()
} catch (error) {
if (isTransientError(error)) {
message.retry() // Default retry delay
// OR
message.retry({ delaySeconds: 60 }) // Custom delay
}
}
}

Send the message to the dead letter queue. Use for permanent failures.

async process(message) {
try {
await processWork(message.body)
message.ack()
} catch (error) {
if (isPermanentError(error)) {
message.deadLetter(error.message)
} else {
message.retry()
}
}
}

Use Zod schemas for runtime message validation.

import { defineQueue } from '@cloudwerk/queue'
import { z } from 'zod'
const EmailSchema = z.object({
to: z.string().email(),
subject: z.string().min(1),
body: z.string(),
priority: z.enum(['low', 'normal', 'high']).default('normal'),
})
type EmailMessage = z.infer<typeof EmailSchema>
export default defineQueue<EmailMessage>({
schema: EmailSchema,
async process(message) {
// message.body is validated and typed
const { to, subject, body, priority } = message.body
await sendEmail(to, subject, body, priority)
message.ack()
},
})

Process multiple messages together for better throughput.

export default defineQueue<AnalyticsEvent>({
config: {
batchSize: 100,
batchTimeout: '30s',
},
async processBatch(messages) {
// Process all messages together
const events = messages.map(m => m.body)
try {
await batchInsertEvents(events)
// Acknowledge all on success
for (const msg of messages) {
msg.ack()
}
} catch (error) {
// Retry all on failure
for (const msg of messages) {
msg.retry()
}
}
},
})

Handle individual successes and failures in a batch:

async processBatch(messages) {
const results = await Promise.allSettled(
messages.map(async (msg) => {
await processOne(msg.body)
return msg
})
)
for (let i = 0; i < results.length; i++) {
const result = results[i]
const message = messages[i]
if (result.status === 'fulfilled') {
message.ack()
} else {
if (message.attempts >= 3) {
message.deadLetter(result.reason.message)
} else {
message.retry()
}
}
}
}

The optional onError callback is invoked when message processing fails.

export default defineQueue<TaskMessage>({
async process(message) {
await processTask(message.body)
message.ack()
},
async onError(error, message) {
console.error(`Task failed (attempt ${message.attempts}):`, error)
// Log for investigation
await logFailedMessage({
queueMessage: message,
error: error.message,
stack: error.stack,
})
// Alert on repeated failures
if (message.attempts >= 3) {
await alertOps(`Task ${message.id} failed ${message.attempts} times`)
}
},
})

Send messages to queues with type safety.

import { queues } from '@cloudwerk/core/bindings'
// Single message
await queues.email.send({
subject: 'Welcome!',
body: 'Thanks for signing up.',
})
// With options
await queues.email.send(message, {
delaySeconds: 60,
contentType: 'json',
})

Send multiple messages at once.

import { queues } from '@cloudwerk/core/bindings'
await queues.notifications.sendBatch([
{ userId: '1', type: 'email', message: 'Hello' },
{ userId: '2', type: 'push', message: 'Hello' },
])
// With options
await queues.notifications.sendBatch(messages, {
delaySeconds: 30,
})
interface SendOptions {
delaySeconds?: number // Delay before processing (0-43200, max 12 hours)
contentType?: ContentType // Message encoding
}
type ContentType = 'json' | 'text' | 'bytes' | 'v8'

Get a typed queue producer by name.

import { getQueue } from '@cloudwerk/core/bindings'
interface EmailMessage {
to: string
subject: string
body: string
}
const emailQueue = getQueue<EmailMessage>('email')
await emailQueue.send({ to: '...', subject: '...', body: '...' })

Check if a queue exists.

import { hasQueue } from '@cloudwerk/core/bindings'
if (hasQueue('email')) {
await queues.email.send(message)
}

List all available queue names.

import { getQueueNames } from '@cloudwerk/core/bindings'
const available = getQueueNames()
// ['email', 'notifications', 'analytics']

Create a consumer for dead letter messages.

// app/queues/orders-dlq.ts
import { defineQueue } from '@cloudwerk/queue'
interface DeadLetterMessage<T = unknown> {
originalQueue: string
originalMessage: T
error: string
attempts: number
failedAt: string
}
export default defineQueue<DeadLetterMessage>({
async process(message) {
const { originalQueue, originalMessage, error, attempts } = message.body
// Store for investigation
await db.insert('failed_jobs', {
queue: originalQueue,
payload: JSON.stringify(originalMessage),
error,
attempts,
failed_at: message.body.failedAt,
})
// Alert for critical queues
if (originalQueue === 'payments') {
await alertOps(`Payment processing failed: ${error}`)
}
message.ack()
},
})

Create mock queue messages for testing.

import { mockQueueMessage } from '@cloudwerk/queue/testing'
const message = mockQueueMessage({
id: 'msg-123',
body: { to: '[email protected]', subject: 'Test' },
attempts: 1,
})
import { describe, it, expect, vi } from 'vitest'
import { mockQueueMessage } from '@cloudwerk/queue/testing'
import emailQueue from './email'
describe('email queue', () => {
it('should send email and acknowledge', async () => {
const sendEmail = vi.fn().mockResolvedValue(true)
const message = mockQueueMessage({
body: { to: '[email protected]', subject: 'Hello', body: 'Test' },
})
await emailQueue.process(message)
expect(message.ack).toHaveBeenCalled()
})
it('should retry on transient error', async () => {
const message = mockQueueMessage({
body: { to: '[email protected]', subject: 'Hello', body: 'Test' },
attempts: 1,
})
// Simulate transient error
vi.mocked(sendEmail).mockRejectedValueOnce(new Error('Network error'))
await emailQueue.process(message)
expect(message.retry).toHaveBeenCalled()
})
})

Base error class for queue failures.

import { QueueError } from '@cloudwerk/queue'
class QueueError extends Error {
readonly code: string
readonly queueName: string
readonly messageId?: string
}

Thrown when message fails schema validation.

import { SchemaValidationError } from '@cloudwerk/queue'
class SchemaValidationError extends QueueError {
readonly code: 'SCHEMA_VALIDATION_ERROR'
readonly issues: ZodIssue[]
}

Thrown when sending to DLQ fails.

import { DeadLetterError } from '@cloudwerk/queue'
class DeadLetterError extends QueueError {
readonly code: 'DEAD_LETTER_ERROR'
readonly originalError: Error
}

Terminal window
cloudwerk queues list
Terminal window
cloudwerk queues info <name>

Generate TypeScript types for all queues.

Terminal window
cloudwerk queues generate-types

This creates .cloudwerk/types/queues.d.ts with typed producers.


interface QueueDefinition<T = unknown> {
process?: (message: QueueMessage<T>) => Awaitable<void>
processBatch?: (messages: QueueMessage<T>[]) => Awaitable<void>
schema?: ZodSchema<T>
config?: QueueConfig
onError?: (error: Error, message: QueueMessage<T>) => Awaitable<void>
}
interface Queue<T = unknown> {
send(message: T, options?: SendOptions): Promise<void>
sendBatch(messages: T[], options?: SendOptions): Promise<void>
}

After running generate-types, you get full type safety:

// .cloudwerk/types/queues.d.ts (auto-generated)
declare module '@cloudwerk/core/bindings' {
interface CloudwerkQueues {
email: Queue<EmailMessage>
notifications: Queue<NotificationMessage>
analytics: Queue<AnalyticsEvent>
}
}

LimitValue
Max batch size100 messages
Max message size128 KB
Max delay12 hours (43200 seconds)
Message retention4 days
Max retries100