Queues

Cloudwerk provides a convention-based queue system through @cloudwerk/queue. Define type-safe consumers with schema validation, dead letter queues, and automatic type generation.

  1. Create a queue consumer:

    // app/queues/email.ts
    import { defineQueue } from '@cloudwerk/queue'
    interface EmailMessage {
    to: string
    subject: string
    body: string
    }
    export default defineQueue<EmailMessage>({
    async process(message) {
    await sendEmail(message.body)
    message.ack()
    },
    })
  2. Send messages from your routes:

    // app/api/subscribe/route.ts
    import { queues } from '@cloudwerk/core/bindings'
    import { json } from '@cloudwerk/core'
    export async function POST(request: Request) {
    const { email } = await request.json()
    await queues.email.send({
    to: email,
    subject: 'Welcome!',
    body: 'Thanks for subscribing.',
    })
    return json({ success: true })
    }
  3. Queue consumers are automatically discovered and registered during build.

Queue consumers are defined in the app/queues/ directory:

  • Directory: app/queues/
    • email.ts            # Email queue consumer
    • image-processing.ts # Image processing queue
    • notifications.ts    # Notifications queue

Naming Convention:

  • File names are kebab-case → converted to camelCase queue names
  • email.ts → queue name email
  • image-processing.ts → queue name imageProcessing
// app/queues/notifications.ts
import { defineQueue } from '@cloudwerk/queue'
interface Notification {
userId: string
type: 'email' | 'push' | 'sms'
message: string
}
export default defineQueue<Notification>({
async process(message) {
const { userId, type, message: content } = message.body
switch (type) {
case 'email':
await sendEmail(userId, content)
break
case 'push':
await sendPush(userId, content)
break
case 'sms':
await sendSMS(userId, content)
break
}
message.ack()
},
})

Use Zod for runtime message validation:

// app/queues/email.ts
import { defineQueue } from '@cloudwerk/queue'
import { z } from 'zod'
const EmailSchema = z.object({
to: z.string().email(),
subject: z.string().min(1),
body: z.string(),
priority: z.enum(['low', 'normal', 'high']).default('normal'),
})
type EmailMessage = z.infer<typeof EmailSchema>
export default defineQueue<EmailMessage>({
schema: EmailSchema, // Validates at runtime
async process(message) {
// message.body is validated and typed
await sendEmail(message.body)
message.ack()
},
})
// app/queues/orders.ts — consumer configuration options
export default defineQueue<OrderMessage>({
config: {
batchSize: 25, // Max messages per batch (default: 10)
maxRetries: 5, // Max retry attempts (default: 3)
retryDelay: '2m', // Delay between retries (default: '1m')
deadLetterQueue: 'orders-dlq', // DLQ for failed messages
batchTimeout: '10s', // Max wait time to fill batch
},
async process(message) {
await processOrder(message.body)
message.ack()
},
})
// app/queues/tasks.ts — custom error handling with onError
export default defineQueue<TaskMessage>({
async process(message) {
await processTask(message.body)
message.ack()
},
async onError(error, message) {
console.error(`Failed to process task (attempt ${message.attempts}):`, error)
if (message.attempts >= 3) {
// Log for investigation
await logFailedMessage(message, error)
}
},
})

Import the queues proxy to send type-safe messages:

import { queues } from '@cloudwerk/core/bindings'
// Single message (all EmailMessage fields are required)
await queues.email.send({
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up.',
})
// With delay
await queues.email.send(message, { delaySeconds: 60 })
// Batch send
await queues.notifications.sendBatch([
{ userId: '1', type: 'email', message: 'Hello' },
{ userId: '2', type: 'push', message: 'Hello' },
])
interface SendOptions {
delaySeconds?: number // Delay before processing
contentType?: 'json' | 'text' | 'bytes' | 'v8' // Default: 'json'
}
interface QueueMessage<T> {
readonly id: string // Unique message ID
readonly body: T // Message payload
readonly timestamp: Date // When originally sent
readonly attempts: number // Delivery attempt count
ack(): void // Mark as processed
retry(options?: { delaySeconds?: number }): void // Request retry
deadLetter(reason?: string): void // Send to DLQ
}
async process(message) {
try {
await processWork(message.body)
message.ack() // Remove from queue
} catch (error) {
message.retry() // Try again
}
}
async process(message) {
try {
await processWork(message.body)
message.ack()
} catch (error) {
if (isRetryable(error)) {
// Exponential backoff: 1m, 2m, 4m, 8m...
const delay = Math.pow(2, message.attempts - 1) * 60
message.retry({ delaySeconds: delay })
} else {
message.deadLetter(error.message)
}
}
}
async process(message) {
try {
await processWork(message.body)
message.ack()
} catch (error) {
if (message.attempts >= 5) {
// Max retries reached
message.deadLetter(`Failed after ${message.attempts} attempts: ${error.message}`)
} else {
message.retry()
}
}
}

For high-throughput scenarios, use batch processing:

// app/queues/analytics.ts
import { defineQueue } from '@cloudwerk/queue'
interface AnalyticsEvent {
userId: string
event: string
properties: Record<string, unknown>
}
export default defineQueue<AnalyticsEvent>({
config: {
batchSize: 100,
batchTimeout: '30s',
},
async processBatch(messages) {
// Process all messages together
const events = messages.map(m => m.body)
try {
await batchInsertEvents(events)
// Acknowledge all
for (const msg of messages) {
msg.ack()
}
} catch (error) {
// Retry all on failure
for (const msg of messages) {
msg.retry()
}
}
},
})

Handle individual successes and failures:

async processBatch(messages) {
  const results = await Promise.allSettled(
    messages.map((msg) => processOne(msg.body))
  )
  // Promise.allSettled preserves input order, so results[i]
  // corresponds to messages[i] — no lookup by ID is needed.
  results.forEach((result, i) => {
    if (result.status === 'fulfilled') {
      messages[i].ack()
    } else {
      messages[i].retry()
    }
  })
}
export default defineQueue<OrderMessage>({
config: {
maxRetries: 5,
deadLetterQueue: 'orders-dlq',
},
async process(message) {
await processOrder(message.body)
message.ack()
},
})

Create a consumer for the dead letter queue:

// app/queues/orders-dlq.ts
import { defineQueue } from '@cloudwerk/queue'
interface DeadLetterMessage<T> {
originalQueue: string
originalMessage: T
error: string
attempts: number
failedAt: string
}
export default defineQueue<DeadLetterMessage<OrderMessage>>({
async process(message) {
const { originalQueue, originalMessage, error, attempts } = message.body
// Log for investigation
await db.insert('failed_jobs', {
queue: originalQueue,
payload: JSON.stringify(originalMessage),
error,
attempts,
failed_at: message.body.failedAt,
})
// Alert if critical
if (originalQueue === 'payments') {
await alertOps(`Payment processing failed: ${error}`)
}
message.ack()
},
})

Send directly to DLQ with custom reason:

async process(message) {
try {
await processOrder(message.body)
message.ack()
} catch (error) {
if (isUnrecoverable(error)) {
message.deadLetter(`Unrecoverable: ${error.message}`)
} else {
message.retry()
}
}
}
// app/queues/email.ts
import { defineQueue } from '@cloudwerk/queue'
import { z } from 'zod'
const EmailSchema = z.discriminatedUnion('template', [
z.object({
template: z.literal('welcome'),
to: z.string().email(),
name: z.string(),
}),
z.object({
template: z.literal('password-reset'),
to: z.string().email(),
resetToken: z.string(),
}),
z.object({
template: z.literal('order-confirmation'),
to: z.string().email(),
orderId: z.string(),
items: z.array(z.object({
name: z.string(),
quantity: z.number(),
price: z.number(),
})),
}),
])
type EmailMessage = z.infer<typeof EmailSchema>
export default defineQueue<EmailMessage>({
schema: EmailSchema,
config: {
maxRetries: 3,
deadLetterQueue: 'email-dlq',
},
async process(message) {
const { template, to, ...data } = message.body
await sendTemplateEmail(to, template, data)
message.ack()
},
})
// app/queues/image-processing.ts
interface ImageJob {
key: string
operations: Array<{
type: 'resize' | 'crop' | 'compress'
params: Record<string, unknown>
}>
outputKey: string
}
export default defineQueue<ImageJob>({
config: {
batchSize: 5, // Process 5 images at a time
maxRetries: 3,
},
async process(message) {
const { key, operations, outputKey } = message.body
// Get original from R2
const original = await env.R2.get(key)
if (!original) {
message.ack() // File doesn't exist, skip
return
}
let image = await original.arrayBuffer()
// Apply operations
for (const op of operations) {
image = await applyOperation(image, op)
}
// Store result
await env.R2.put(outputKey, image)
message.ack()
},
})
// app/queues/webhooks.ts
interface WebhookJob {
url: string
event: string
payload: unknown
attempt?: number
}
export default defineQueue<WebhookJob>({
config: {
maxRetries: 5,
retryDelay: '1m',
deadLetterQueue: 'webhooks-dlq',
},
async process(message) {
const { url, event, payload } = message.body
const response = await fetch(url, {
method: 'POST',
headers: {
'Content-Type': 'application/json',
'X-Webhook-Event': event,
'X-Webhook-Signature': await signPayload(payload),
},
body: JSON.stringify(payload),
})
if (response.ok) {
message.ack()
} else if (response.status >= 500) {
// Server error, retry
message.retry()
} else {
// Client error (4xx), don't retry
message.deadLetter(`HTTP ${response.status}: ${response.statusText}`)
}
},
})

Run the type generator to enable TypeScript autocomplete for your queues:

Terminal window
cloudwerk queues generate-types

This creates .cloudwerk/types/queues.d.ts with typed producers for all your queues.

// .cloudwerk/types/queues.d.ts (auto-generated)
declare module '@cloudwerk/core/bindings' {
interface CloudwerkQueues {
email: Queue<EmailMessage>
imageProcessing: Queue<ImageJob>
webhooks: Queue<WebhookJob>
}
}
interface QueueConfig {
batchSize?: number // 1-100, default: 10
maxRetries?: number // 0-100, default: 3
retryDelay?: string // Duration, default: '1m'
deadLetterQueue?: string
batchTimeout?: string // Duration, default: '5s'
}
'30s' → 30 seconds
'5m' → 5 minutes
'1h' → 1 hour
async process(message) {
const { orderId } = message.body
// Check if already processed
const existing = await db.query(
'SELECT id FROM processed_orders WHERE order_id = ?',
[orderId]
)
if (existing.length > 0) {
message.ack() // Already processed
return
}
await processOrder(orderId)
// Mark as processed
await db.run(
'INSERT INTO processed_orders (order_id, processed_at) VALUES (?, ?)',
[orderId, new Date().toISOString()]
)
message.ack()
}
Limit             | Value
------------------|-------------
Max batch size    | 100 messages
Max message size  | 128 KB
Max delay         | 12 hours
Message retention | 4 days