Queues
Cloudwerk provides a convention-based queue system through @cloudwerk/queue. Define type-safe consumers with schema validation, dead letter queues, and automatic type generation.
Quick Start
Section titled “Quick Start”-
Create a queue consumer:
// app/queues/email.tsimport { defineQueue } from '@cloudwerk/queue'interface EmailMessage {to: stringsubject: stringbody: string}export default defineQueue<EmailMessage>({async process(message) {await sendEmail(message.body)message.ack()},}) -
Send messages from your routes:
// app/api/subscribe/route.tsimport { queues } from '@cloudwerk/core/bindings'import { json } from '@cloudwerk/core'export async function POST(request: Request) {const { email } = await request.json()await queues.email.send({to: email,subject: 'Welcome!',body: 'Thanks for subscribing.',})return json({ success: true })} -
Queue consumers are automatically discovered and registered during build.
Convention Structure
Section titled “Convention Structure”Queue consumers are defined in the app/queues/ directory:
Directoryapp/queues/
- email.ts # Email queue consumer
- image-processing.ts # Image processing queue
- notifications.ts # Notifications queue
Naming Convention:
- File names are kebab-case → converted to camelCase queue names
email.ts→ queue nameemailimage-processing.ts→ queue nameimageProcessing
Defining Consumers
Section titled “Defining Consumers”Basic Consumer
Section titled “Basic Consumer”// app/queues/notifications.tsimport { defineQueue } from '@cloudwerk/queue'
interface Notification { userId: string type: 'email' | 'push' | 'sms' message: string}
export default defineQueue<Notification>({ async process(message) { const { userId, type, message: content } = message.body
switch (type) { case 'email': await sendEmail(userId, content) break case 'push': await sendPush(userId, content) break case 'sms': await sendSMS(userId, content) break }
message.ack() },})With Schema Validation
Section titled “With Schema Validation”Use Zod for runtime message validation:
// app/queues/email.tsimport { defineQueue } from '@cloudwerk/queue'import { z } from 'zod'
const EmailSchema = z.object({ to: z.string().email(), subject: z.string().min(1), body: z.string(), priority: z.enum(['low', 'normal', 'high']).default('normal'),})
type EmailMessage = z.infer<typeof EmailSchema>
export default defineQueue<EmailMessage>({ schema: EmailSchema, // Validates at runtime async process(message) { // message.body is validated and typed await sendEmail(message.body) message.ack() },})With Configuration
Section titled “With Configuration”export default defineQueue<OrderMessage>({ config: { batchSize: 25, // Max messages per batch (default: 10) maxRetries: 5, // Max retry attempts (default: 3) retryDelay: '2m', // Delay between retries (default: '1m') deadLetterQueue: 'orders-dlq', // DLQ for failed messages batchTimeout: '10s', // Max wait time to fill batch }, async process(message) { await processOrder(message.body) message.ack() },})With Error Handler
Section titled “With Error Handler”export default defineQueue<TaskMessage>({ async process(message) { await processTask(message.body) message.ack() }, async onError(error, message) { console.error(`Failed to process task (attempt ${message.attempts}):`, error)
if (message.attempts >= 3) { // Log for investigation await logFailedMessage(message, error) } },})Sending Messages
Section titled “Sending Messages”Typed Producers
Section titled “Typed Producers”Import the queues proxy to send type-safe messages:
import { queues } from '@cloudwerk/core/bindings'
// Single messageawait queues.email.send({ subject: 'Welcome!', body: 'Thanks for signing up.',})
// With delayawait queues.email.send(message, { delaySeconds: 60 })
// Batch sendawait queues.notifications.sendBatch([ { userId: '1', type: 'email', message: 'Hello' }, { userId: '2', type: 'push', message: 'Hello' },])Send Options
Section titled “Send Options”interface SendOptions { delaySeconds?: number // Delay before processing contentType?: 'json' | 'text' | 'bytes' | 'v8' // Default: 'json'}Message Handling
Section titled “Message Handling”QueueMessage Interface
Section titled “QueueMessage Interface”interface QueueMessage<T> { readonly id: string // Unique message ID readonly body: T // Message payload readonly timestamp: Date // When originally sent readonly attempts: number // Delivery attempt count
ack(): void // Mark as processed retry(options?: { delaySeconds?: number }): void // Request retry deadLetter(reason?: string): void // Send to DLQ}Processing Patterns
Section titled “Processing Patterns”Acknowledge on Success
Section titled “Acknowledge on Success”async process(message) { try { await processWork(message.body) message.ack() // Remove from queue } catch (error) { message.retry() // Try again }}Retry with Backoff
Section titled “Retry with Backoff”async process(message) { try { await processWork(message.body) message.ack() } catch (error) { if (isRetryable(error)) { // Exponential backoff: 1m, 2m, 4m, 8m... const delay = Math.pow(2, message.attempts - 1) * 60 message.retry({ delaySeconds: delay }) } else { message.deadLetter(error.message) } }}Dead Letter Handling
Section titled “Dead Letter Handling”async process(message) { try { await processWork(message.body) message.ack() } catch (error) { if (message.attempts >= 5) { // Max retries reached message.deadLetter(`Failed after ${message.attempts} attempts: ${error.message}`) } else { message.retry() } }}Batch Processing
Section titled “Batch Processing”For high-throughput scenarios, use batch processing:
// app/queues/analytics.tsimport { defineQueue } from '@cloudwerk/queue'
interface AnalyticsEvent { userId: string event: string properties: Record<string, unknown>}
export default defineQueue<AnalyticsEvent>({ config: { batchSize: 100, batchTimeout: '30s', }, async processBatch(messages) { // Process all messages together const events = messages.map(m => m.body)
try { await batchInsertEvents(events)
// Acknowledge all for (const msg of messages) { msg.ack() } } catch (error) { // Retry all on failure for (const msg of messages) { msg.retry() } } },})Mixed Results
Section titled “Mixed Results”Handle individual successes and failures:
async processBatch(messages) { const results = await Promise.allSettled( messages.map(async (msg) => { await processOne(msg.body) return msg }) )
for (const result of results) { if (result.status === 'fulfilled') { result.value.ack() } else { // Find the failed message and retry const failedMsg = messages.find(m => m.id === result.reason.messageId) failedMsg?.retry() } }}Dead Letter Queues
Section titled “Dead Letter Queues”DLQ Configuration
Section titled “DLQ Configuration”export default defineQueue<OrderMessage>({ config: { maxRetries: 5, deadLetterQueue: 'orders-dlq', }, async process(message) { await processOrder(message.body) message.ack() },})DLQ Consumer
Section titled “DLQ Consumer”Create a consumer for the dead letter queue:
// app/queues/orders-dlq.tsimport { defineQueue } from '@cloudwerk/queue'
interface DeadLetterMessage<T> { originalQueue: string originalMessage: T error: string attempts: number failedAt: string}
export default defineQueue<DeadLetterMessage<OrderMessage>>({ async process(message) { const { originalQueue, originalMessage, error, attempts } = message.body
// Log for investigation await db.insert('failed_jobs', { queue: originalQueue, payload: JSON.stringify(originalMessage), error, attempts, failed_at: message.body.failedAt, })
// Alert if critical if (originalQueue === 'payments') { await alertOps(`Payment processing failed: ${error}`) }
message.ack() },})Manual Dead Letter
Section titled “Manual Dead Letter”Send directly to DLQ with custom reason:
async process(message) { try { await processOrder(message.body) message.ack() } catch (error) { if (isUnrecoverable(error)) { message.deadLetter(`Unrecoverable: ${error.message}`) } else { message.retry() } }}Common Patterns
Section titled “Common Patterns”Email Queue with Templates
Section titled “Email Queue with Templates”// app/queues/email.tsimport { defineQueue } from '@cloudwerk/queue'import { z } from 'zod'
const EmailSchema = z.discriminatedUnion('template', [ z.object({ template: z.literal('welcome'), to: z.string().email(), name: z.string(), }), z.object({ template: z.literal('password-reset'), to: z.string().email(), resetToken: z.string(), }), z.object({ template: z.literal('order-confirmation'), to: z.string().email(), orderId: z.string(), items: z.array(z.object({ name: z.string(), quantity: z.number(), price: z.number(), })), }),])
type EmailMessage = z.infer<typeof EmailSchema>
export default defineQueue<EmailMessage>({ schema: EmailSchema, config: { maxRetries: 3, deadLetterQueue: 'email-dlq', }, async process(message) { const { template, to, ...data } = message.body
await sendTemplateEmail(to, template, data) message.ack() },})Image Processing Pipeline
Section titled “Image Processing Pipeline”// app/queues/image-processing.tsinterface ImageJob { key: string operations: Array<{ type: 'resize' | 'crop' | 'compress' params: Record<string, unknown> }> outputKey: string}
export default defineQueue<ImageJob>({ config: { batchSize: 5, // Process 5 images at a time maxRetries: 3, }, async process(message) { const { key, operations, outputKey } = message.body
// Get original from R2 const original = await env.R2.get(key) if (!original) { message.ack() // File doesn't exist, skip return }
let image = await original.arrayBuffer()
// Apply operations for (const op of operations) { image = await applyOperation(image, op) }
// Store result await env.R2.put(outputKey, image) message.ack() },})Webhook Delivery
Section titled “Webhook Delivery”// app/queues/webhooks.tsinterface WebhookJob { url: string event: string payload: unknown attempt?: number}
export default defineQueue<WebhookJob>({ config: { maxRetries: 5, retryDelay: '1m', deadLetterQueue: 'webhooks-dlq', }, async process(message) { const { url, event, payload } = message.body
const response = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-Webhook-Event': event, 'X-Webhook-Signature': await signPayload(payload), }, body: JSON.stringify(payload), })
if (response.ok) { message.ack() } else if (response.status >= 500) { // Server error, retry message.retry() } else { // Client error (4xx), don't retry message.deadLetter(`HTTP ${response.status}: ${response.statusText}`) } },})Type Generation
Section titled “Type Generation”Run the type generator to enable TypeScript autocomplete for your queues:
cloudwerk queues generate-typesThis creates .cloudwerk/types/queues.d.ts with typed producers for all your queues.
Generated Types
Section titled “Generated Types”// .cloudwerk/types/queues.d.ts (auto-generated)declare module '@cloudwerk/core/bindings' { interface CloudwerkQueues { email: Queue<EmailMessage> imageProcessing: Queue<ImageJob> webhooks: Queue<WebhookJob> }}Configuration Reference
Section titled “Configuration Reference”QueueConfig
Section titled “QueueConfig”interface QueueConfig { batchSize?: number // 1-100, default: 10 maxRetries?: number // 0-100, default: 3 retryDelay?: string // Duration, default: '1m' deadLetterQueue?: string batchTimeout?: string // Duration, default: '5s'}Duration Format
Section titled “Duration Format”'30s' → 30 seconds'5m' → 5 minutes'1h' → 1 hourBest Practices
Section titled “Best Practices”Idempotent Processing
Section titled “Idempotent Processing”async process(message) { const { orderId } = message.body
// Check if already processed const existing = await db.query( 'SELECT id FROM processed_orders WHERE order_id = ?', [orderId] )
if (existing.length > 0) { message.ack() // Already processed return }
await processOrder(orderId)
// Mark as processed await db.run( 'INSERT INTO processed_orders (order_id, processed_at) VALUES (?, ?)', [orderId, new Date().toISOString()] )
message.ack()}Limits
Section titled “Limits”| Limit | Value |
|---|---|
| Max batch size | 100 messages |
| Max message size | 128 KB |
| Max delay | 12 hours |
| Message retention | 4 days |
Next Steps
Section titled “Next Steps”- Queues API Reference - Complete API documentation
- Triggers Guide - Event-driven processing
- Durable Objects - Stateful edge computing