Queues API
The @cloudwerk/queue package provides a convention-based queue system for processing background jobs with type safety, schema validation, and dead letter queues.
Installation
Section titled “Installation”pnpm add @cloudwerk/queuedefineQueue()
Section titled “defineQueue()”Creates a typed queue consumer.
import { defineQueue } from '@cloudwerk/queue'
export default defineQueue<MessageType>({ process: (message: QueueMessage<MessageType>) => Awaitable<void>, // OR processBatch?: (messages: QueueMessage<MessageType>[]) => Awaitable<void>, schema?: ZodSchema<MessageType>, config?: QueueConfig, onError?: (error: Error, message: QueueMessage<MessageType>) => Awaitable<void>,})Parameters
Section titled “Parameters”| Parameter | Type | Description |
|---|---|---|
process | Function | Handler for individual messages |
processBatch | Function | Handler for batch processing (alternative to process) |
schema | ZodSchema | Optional Zod schema for message validation |
config | QueueConfig | Optional queue configuration |
onError | Function | Optional error handler |
Returns
Section titled “Returns”Returns a QueueDefinition object that Cloudwerk registers automatically.
QueueConfig
Section titled “QueueConfig”Configuration options for queue behavior.
interface QueueConfig { batchSize?: number // Max messages per batch (1-100, default: 10) maxRetries?: number // Max retry attempts (0-100, default: 3) retryDelay?: string // Delay between retries (default: '1m') deadLetterQueue?: string // DLQ for failed messages batchTimeout?: string // Max wait time to fill batch (default: '5s')}Duration Format
Section titled “Duration Format”| Format | Description |
|---|---|
'30s' | 30 seconds |
'5m' | 5 minutes |
'1h' | 1 hour |
Example
Section titled “Example”export default defineQueue<OrderMessage>({ config: { batchSize: 25, maxRetries: 5, retryDelay: '2m', deadLetterQueue: 'orders-dlq', batchTimeout: '10s', }, async process(message) { await processOrder(message.body) message.ack() },})QueueMessage
Section titled “QueueMessage”The message object passed to process handlers.
interface QueueMessage<T> { readonly id: string // Unique message ID readonly body: T // Message payload (validated if schema provided) readonly timestamp: Date // When message was originally sent readonly attempts: number // Current delivery attempt (1-based)
ack(): void // Acknowledge successful processing retry(options?: RetryOptions): void // Request retry deadLetter(reason?: string): void // Send to dead letter queue}
interface RetryOptions { delaySeconds?: number // Delay before retry}Mark the message as successfully processed. The message is removed from the queue.
async process(message) { await processWork(message.body) message.ack() // Remove from queue}retry()
Section titled “retry()”Request that the message be redelivered. Use for transient failures.
async process(message) { try { await processWork(message.body) message.ack() } catch (error) { if (isTransientError(error)) { message.retry() // Default retry delay // OR message.retry({ delaySeconds: 60 }) // Custom delay } }}deadLetter()
Section titled “deadLetter()”Send the message to the dead letter queue. Use for permanent failures.
async process(message) { try { await processWork(message.body) message.ack() } catch (error) { if (isPermanentError(error)) { message.deadLetter(error.message) } else { message.retry() } }}Schema Validation
Section titled “Schema Validation”Use Zod schemas for runtime message validation.
import { defineQueue } from '@cloudwerk/queue'import { z } from 'zod'
const EmailSchema = z.object({ to: z.string().email(), subject: z.string().min(1), body: z.string(), priority: z.enum(['low', 'normal', 'high']).default('normal'),})
type EmailMessage = z.infer<typeof EmailSchema>
export default defineQueue<EmailMessage>({ schema: EmailSchema, async process(message) { // message.body is validated and typed const { to, subject, body, priority } = message.body await sendEmail(to, subject, body, priority) message.ack() },})Batch Processing
Section titled “Batch Processing”Process multiple messages together for better throughput.
export default defineQueue<AnalyticsEvent>({ config: { batchSize: 100, batchTimeout: '30s', }, async processBatch(messages) { // Process all messages together const events = messages.map(m => m.body)
try { await batchInsertEvents(events)
// Acknowledge all on success for (const msg of messages) { msg.ack() } } catch (error) { // Retry all on failure for (const msg of messages) { msg.retry() } } },})Mixed Results
Section titled “Mixed Results”Handle individual successes and failures in a batch:
async processBatch(messages) { const results = await Promise.allSettled( messages.map(async (msg) => { await processOne(msg.body) return msg }) )
for (let i = 0; i < results.length; i++) { const result = results[i] const message = messages[i]
if (result.status === 'fulfilled') { message.ack() } else { if (message.attempts >= 3) { message.deadLetter(result.reason.message) } else { message.retry() } } }}Error Handler
Section titled “Error Handler”The optional onError callback is invoked when message processing fails.
export default defineQueue<TaskMessage>({ async process(message) { await processTask(message.body) message.ack() }, async onError(error, message) { console.error(`Task failed (attempt ${message.attempts}):`, error)
// Log for investigation await logFailedMessage({ queueMessage: message, error: error.message, stack: error.stack, })
// Alert on repeated failures if (message.attempts >= 3) { await alertOps(`Task ${message.id} failed ${message.attempts} times`) } },})Typed Producers
Section titled “Typed Producers”Send messages to queues with type safety.
queues Proxy
Section titled “queues Proxy”import { queues } from '@cloudwerk/core/bindings'
// Single messageawait queues.email.send({ subject: 'Welcome!', body: 'Thanks for signing up.',})
// With optionsawait queues.email.send(message, { delaySeconds: 60, contentType: 'json',})sendBatch()
Section titled “sendBatch()”Send multiple messages at once.
import { queues } from '@cloudwerk/core/bindings'
await queues.notifications.sendBatch([ { userId: '1', type: 'email', message: 'Hello' }, { userId: '2', type: 'push', message: 'Hello' },])
// With optionsawait queues.notifications.sendBatch(messages, { delaySeconds: 30,})SendOptions
Section titled “SendOptions”interface SendOptions { delaySeconds?: number // Delay before processing (0-43200, max 12 hours) contentType?: ContentType // Message encoding}
type ContentType = 'json' | 'text' | 'bytes' | 'v8'Queue Utilities
Section titled “Queue Utilities”getQueue()
Section titled “getQueue()”Get a typed queue producer by name.
import { getQueue } from '@cloudwerk/core/bindings'
interface EmailMessage { to: string subject: string body: string}
const emailQueue = getQueue<EmailMessage>('email')await emailQueue.send({ to: '...', subject: '...', body: '...' })hasQueue()
Section titled “hasQueue()”Check if a queue exists.
import { hasQueue } from '@cloudwerk/core/bindings'
if (hasQueue('email')) { await queues.email.send(message)}getQueueNames()
Section titled “getQueueNames()”List all available queue names.
import { getQueueNames } from '@cloudwerk/core/bindings'
const available = getQueueNames()// ['email', 'notifications', 'analytics']Dead Letter Queue Consumer
Section titled “Dead Letter Queue Consumer”Create a consumer for dead letter messages.
// app/queues/orders-dlq.tsimport { defineQueue } from '@cloudwerk/queue'
interface DeadLetterMessage<T = unknown> { originalQueue: string originalMessage: T error: string attempts: number failedAt: string}
export default defineQueue<DeadLetterMessage>({ async process(message) { const { originalQueue, originalMessage, error, attempts } = message.body
// Store for investigation await db.insert('failed_jobs', { queue: originalQueue, payload: JSON.stringify(originalMessage), error, attempts, failed_at: message.body.failedAt, })
// Alert for critical queues if (originalQueue === 'payments') { await alertOps(`Payment processing failed: ${error}`) }
message.ack() },})Testing
Section titled “Testing”mockQueueMessage()
Section titled “mockQueueMessage()”Create mock queue messages for testing.
import { mockQueueMessage } from '@cloudwerk/queue/testing'
const message = mockQueueMessage({ id: 'msg-123', attempts: 1,})Testing Example
Section titled “Testing Example”import { describe, it, expect, vi } from 'vitest'import { mockQueueMessage } from '@cloudwerk/queue/testing'import emailQueue from './email'
describe('email queue', () => { it('should send email and acknowledge', async () => { const sendEmail = vi.fn().mockResolvedValue(true) const message = mockQueueMessage({ })
await emailQueue.process(message)
expect(message.ack).toHaveBeenCalled() })
it('should retry on transient error', async () => { const message = mockQueueMessage({ attempts: 1, })
// Simulate transient error vi.mocked(sendEmail).mockRejectedValueOnce(new Error('Network error'))
await emailQueue.process(message)
expect(message.retry).toHaveBeenCalled() })})Error Classes
Section titled “Error Classes”QueueError
Section titled “QueueError”Base error class for queue failures.
import { QueueError } from '@cloudwerk/queue'
class QueueError extends Error { readonly code: string readonly queueName: string readonly messageId?: string}SchemaValidationError
Section titled “SchemaValidationError”Thrown when message fails schema validation.
import { SchemaValidationError } from '@cloudwerk/queue'
class SchemaValidationError extends QueueError { readonly code: 'SCHEMA_VALIDATION_ERROR' readonly issues: ZodIssue[]}DeadLetterError
Section titled “DeadLetterError”Thrown when sending to DLQ fails.
import { DeadLetterError } from '@cloudwerk/queue'
class DeadLetterError extends QueueError { readonly code: 'DEAD_LETTER_ERROR' readonly originalError: Error}CLI Commands
Section titled “CLI Commands”List Queues
Section titled “List Queues”cloudwerk queues listShow Queue Details
Section titled “Show Queue Details”cloudwerk queues info <name>Generate Types
Section titled “Generate Types”Generate TypeScript types for all queues.
cloudwerk queues generate-typesThis creates .cloudwerk/types/queues.d.ts with typed producers.
Type Definitions
Section titled “Type Definitions”QueueDefinition
Section titled “QueueDefinition”interface QueueDefinition<T = unknown> { process?: (message: QueueMessage<T>) => Awaitable<void> processBatch?: (messages: QueueMessage<T>[]) => Awaitable<void> schema?: ZodSchema<T> config?: QueueConfig onError?: (error: Error, message: QueueMessage<T>) => Awaitable<void>}Queue (Producer)
Section titled “Queue (Producer)”interface Queue<T = unknown> { send(message: T, options?: SendOptions): Promise<void> sendBatch(messages: T[], options?: SendOptions): Promise<void>}Generated Types
Section titled “Generated Types”After running generate-types, you get full type safety:
// .cloudwerk/types/queues.d.ts (auto-generated)declare module '@cloudwerk/core/bindings' { interface CloudwerkQueues { email: Queue<EmailMessage> notifications: Queue<NotificationMessage> analytics: Queue<AnalyticsEvent> }}Limits
Section titled “Limits”| Limit | Value |
|---|---|
| Max batch size | 100 messages |
| Max message size | 128 KB |
| Max delay | 12 hours (43200 seconds) |
| Message retention | 4 days |
| Max retries | 100 |
Next Steps
Section titled “Next Steps”- Queues Guide - Patterns and best practices
- Triggers API - Queue trigger handlers
- Services API - Background job services