Queues
Cloudflare Queues allow you to process tasks asynchronously in the background. Cloudwerk provides seamless integration for sending and consuming queue messages.
Getting Started
Section titled “Getting Started”Create a Queue
Section titled “Create a Queue”-
Create a queue via Wrangler:
Terminal window:

wrangler queues create my-queue

-
Add the queue binding to
wrangler.toml:

[[queues.producers]]
binding = "MY_QUEUE"
queue = "my-queue"

[[queues.consumers]]
queue = "my-queue"
max_batch_size = 10
max_batch_timeout = 30

-
Configure Cloudwerk:
// cloudwerk.config.ts
import { defineConfig } from '@cloudwerk/core';

export default defineConfig({
  queues: {
    MY_QUEUE: {
      handler: './workers/queue-handler.ts',
    },
  },
});
Sending Messages
Section titled “Sending Messages”Basic Message
Section titled “Basic Message”// app/api/orders/route.tsimport { json } from '@cloudwerk/core';
export async function POST(request: Request, { context }: CloudwerkHandlerContext) { const order = await request.json();
// Save order to database const savedOrder = await context.db .insertInto('orders') .values(order) .returning(['id']) .executeTakeFirst();
// Queue background processing await context.queues.MY_QUEUE.send({ type: 'process_order', orderId: savedOrder.id, });
return json({ orderId: savedOrder.id }, { status: 201 });}Batch Messages
Section titled “Batch Messages”

// Send multiple messages in a single call; each entry wraps its payload in `body`
await context.queues.MY_QUEUE.sendBatch([
  { body: { type: 'process_order', orderId: '1001' } },
  { body: { type: 'process_order', orderId: '1002' } },
]);

Delayed Messages
Section titled “Delayed Messages”

// Delay message by 60 seconds
await context.queues.MY_QUEUE.send(
  { type: 'reminder', userId: user.id },
  { delaySeconds: 60 },
);

Consuming Messages
Section titled “Consuming Messages”Queue Handler
Section titled “Queue Handler”Create a queue handler to process messages:
// workers/queue-handler.tsimport type { QueueHandler, Message } from '@cloudwerk/core';
interface QueueMessage { type: string; [key: string]: unknown;}
export default {
  async queue(batch: Message<QueueMessage>[], env: Env, ctx: ExecutionContext) {
    for (const message of batch) {
      try {
        await processMessage(message.body, env);
        message.ack();
      } catch (error) {
        console.error('Failed to process message:', error);
        message.retry();
      }
    }
  },
} satisfies QueueHandler;
async function processMessage(data: QueueMessage, env: Env) { switch (data.type) { case 'process_order': await processOrder(data.orderId as string, env); break; case 'send_email': await sendEmail(data as EmailMessage, env); break; default: console.warn('Unknown message type:', data.type); }}Message Acknowledgment
Section titled “Message Acknowledgment”export default { async queue(batch: Message[], env: Env) { for (const message of batch) { try { await processMessage(message.body); // Mark message as processed message.ack(); } catch (error) { if (isRetryable(error)) { // Retry the message later message.retry({ delaySeconds: 30 }); } else { // Don't retry, acknowledge to remove from queue message.ack(); // Log to dead letter handling await logFailedMessage(message, error); } } } },};Common Patterns
Section titled “Common Patterns”Email Queue
Section titled “Email Queue”// workers/email-queue.tsimport type { Message } from '@cloudwerk/core';
interface EmailMessage { to: string; template: string; data: Record<string, unknown>;}
export default { async queue(batch: Message<EmailMessage>[], env: Env) { for (const message of batch) { const { to, template, data } = message.body;
try { await sendEmail(to, template, data, env); message.ack(); } catch (error) { console.error(`Failed to send email to ${to}:`, error); message.retry({ delaySeconds: 60 }); } } },};
async function sendEmail(to: string, template: string, data: Record<string, unknown>, env: Env) { const response = await fetch('https://api.sendgrid.com/v3/mail/send', { method: 'POST', headers: { 'Authorization': `Bearer ${env.SENDGRID_API_KEY}`, 'Content-Type': 'application/json', }, body: JSON.stringify({ personalizations: [{ to: [{ email: to }] }], template_id: template, dynamic_template_data: data, }), });
if (!response.ok) { throw new Error(`SendGrid error: ${response.status}`); }}Image Processing Queue
Section titled “Image Processing Queue”// workers/image-queue.tsinterface ImageJob { imageKey: string; sizes: { width: number; height: number; suffix: string }[];}
export default { async queue(batch: Message<ImageJob>[], env: Env) { for (const message of batch) { const { imageKey, sizes } = message.body;
try { // Get original image from R2 const original = await env.R2.get(imageKey); if (!original) { message.ack(); continue; }
// Process each size for (const size of sizes) { const resized = await resizeImage(original, size.width, size.height); const newKey = imageKey.replace(/(\.[^.]+)$/, `_${size.suffix}$1`); await env.R2.put(newKey, resized); }
message.ack(); } catch (error) { console.error('Image processing failed:', error); message.retry(); } } },};Webhook Delivery Queue
Section titled “Webhook Delivery Queue”// workers/webhook-queue.tsinterface WebhookJob { url: string; event: string; payload: unknown; retries: number;}
export default { async queue(batch: Message<WebhookJob>[], env: Env) { for (const message of batch) { const { url, event, payload, retries = 0 } = message.body;
try { const response = await fetch(url, { method: 'POST', headers: { 'Content-Type': 'application/json', 'X-Webhook-Event': event, 'X-Webhook-Signature': await signPayload(payload, env.WEBHOOK_SECRET), }, body: JSON.stringify(payload), });
if (response.ok) { message.ack(); } else if (retries < 5) { // Exponential backoff const delay = Math.pow(2, retries) * 60; message.retry({ delaySeconds: delay }); } else { message.ack(); await logWebhookFailure(url, event, payload, env); } } catch (error) { if (retries < 5) { message.retry({ delaySeconds: 60 }); } else { message.ack(); } } } },};Error Handling
Section titled “Error Handling”Retry Strategies
Section titled “Retry Strategies”export default { async queue(batch: Message[], env: Env) { for (const message of batch) { const attempts = message.attempts ?? 1;
      try {
        await processMessage(message.body);
        message.ack();
      } catch (error) {
        if (attempts >= 5) {
          // Max retries reached, dead letter
          await env.DEAD_LETTER_QUEUE.send({
            originalMessage: message.body,
            // `error` is `unknown` in a catch clause — narrow before reading `.message`
            error: error instanceof Error ? error.message : String(error),
            attempts,
          });
          message.ack();
        } else {
          // Exponential backoff: 1m, 2m, 4m, 8m, 16m
          const delaySeconds = Math.pow(2, attempts - 1) * 60;
          message.retry({ delaySeconds });
        }
      }
    }
  },
};

Dead Letter Queue
Section titled “Dead Letter Queue”// Set up dead letter handling// wrangler.toml[[queues.producers]]binding = "DEAD_LETTER"queue = "dead-letter-queue"
[[queues.consumers]]queue = "dead-letter-queue"max_batch_size = 10// workers/dead-letter-handler.tsexport default { async queue(batch: Message[], env: Env) { for (const message of batch) { // Store failed message for investigation await env.DB.prepare(` INSERT INTO failed_jobs (payload, error, created_at) VALUES (?, ?, datetime('now')) `).bind( JSON.stringify(message.body.originalMessage), message.body.error ).run();
// Alert operations team await sendAlert('Dead letter received', message.body);
message.ack(); } },};Best Practices
Section titled “Best Practices”Idempotent Processing
Section titled “Idempotent Processing”export default { async queue(batch: Message[], env: Env) { for (const message of batch) { const { orderId, action } = message.body;
// Check if already processed const existing = await env.DB.prepare(` SELECT id FROM processed_messages WHERE message_id = ? `).bind(message.id).first();
if (existing) { message.ack(); continue; }
try { await processOrder(orderId, action, env);
// Mark as processed await env.DB.prepare(` INSERT INTO processed_messages (message_id, processed_at) VALUES (?, datetime('now')) `).bind(message.id).run();
message.ack(); } catch (error) { message.retry(); } } },};Next Steps
Section titled “Next Steps”- Durable Objects - Stateful edge computing
- Triggers - Scheduled jobs and cron
- API Reference - Queue API details