# Triggers

Cloudwerk provides a powerful event-driven trigger system through `@cloudwerk/trigger`. Define handlers for seven different event sources with automatic configuration, type safety, built-in webhook verification, and observability.
## What Are Triggers?

Triggers are event handlers that respond to various sources in your Cloudflare infrastructure. Each trigger type maps to a specific Cloudflare feature:
| Trigger Type | Cloudflare Feature | Use Case |
|---|---|---|
| `scheduled` | Cron Triggers | Periodic tasks (cleanup, reports, syncs) |
| `queue` | Queue Consumer | Process async messages |
| `r2` | R2 Event Notifications | React to file uploads/deletions |
| `webhook` | HTTP Route | Receive external webhooks |
| `email` | Email Workers | Process incoming emails |
| `d1` | D1 (via triggers) | React to database changes |
| `tail` | Tail Workers | Process logs from other Workers |
## Direct Execution vs. Queue Handoff

Triggers can either execute code directly or hand off work to a queue:
**Direct Execution** — Handle the event immediately in the trigger handler:

```ts
export default defineTrigger({
  source: { type: 'r2', bucket: 'uploads', events: ['object-create'] },
  async handle(event, ctx) {
    await processImage(event.key)
  },
})
```

**Queue Handoff** — Send a message to a queue for async processing:

```ts
export default defineTrigger({
  source: { type: 'r2', bucket: 'uploads', events: ['object-create'] },
  async handle(event, ctx) {
    await ctx.env.IMAGE_QUEUE.send({
      key: event.key,
      action: event.type,
    })
  },
})
```

Choose based on your use case: direct execution for simple, fast operations; queue handoff when you need batching, retries, or want to decouple processing from the trigger.
## Quick Start

1. Create a trigger file:

   ```ts
   // app/triggers/daily-cleanup.ts
   import { defineTrigger } from '@cloudwerk/trigger'

   export default defineTrigger({
     source: { type: 'scheduled', cron: '0 0 * * *' },
     async handle(event, ctx) {
       console.log(`[${ctx.traceId}] Running cleanup...`)
       await cleanupExpiredSessions(ctx.env.DB)
     },
   })
   ```

2. Triggers are automatically discovered from `app/triggers/` and registered during build.

3. Deploy, and your trigger runs on schedule.
## Convention Structure

Triggers are defined in the `app/triggers/` directory:

```
app/triggers/
  daily-cleanup.ts     # Scheduled trigger
  process-uploads.ts   # R2 trigger
  stripe-webhook.ts    # Webhook trigger
  email-handler.ts     # Email trigger
  sync-search.ts       # D1 trigger
```

**Naming Convention:**

- File names are kebab-case → converted to camelCase trigger names
- `daily-cleanup.ts` → trigger name `dailyCleanup`
- `process-uploads.ts` → trigger name `processUploads`
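For illustration, the mapping can be sketched as a few lines of TypeScript. This is a hypothetical helper, not part of the framework; Cloudwerk performs the conversion itself at build time:

```ts
// Illustrative sketch of the kebab-case → camelCase naming convention.
// Not part of @cloudwerk/trigger; the build step does this internally.
function triggerNameFromFile(fileName: string): string {
  return fileName
    .replace(/\.tsx?$/, '') // strip the .ts/.tsx extension
    .split('-')
    .map((part, i) => (i === 0 ? part : part[0].toUpperCase() + part.slice(1)))
    .join('')
}

console.log(triggerNameFromFile('daily-cleanup.ts'))   // dailyCleanup
console.log(triggerNameFromFile('process-uploads.ts')) // processUploads
```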
## Trigger Types

### Scheduled (Cron)

Run tasks on a schedule using cron expressions:

```ts
// app/triggers/daily-report.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: { type: 'scheduled', cron: '0 9 * * 1-5' }, // Weekdays at 9 AM UTC
  async handle(event, ctx) {
    console.log(`Scheduled time: ${event.scheduledTime}`)
    await generateDailyReport(ctx.env.DB)
  },
})
```
#### Cron Syntax

```
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6, Sunday = 0)
│ │ │ │ │
* * * * *
```

| Pattern | Description |
|---|---|
| `* * * * *` | Every minute |
| `*/5 * * * *` | Every 5 minutes |
| `0 * * * *` | Every hour |
| `0 0 * * *` | Daily at midnight UTC |
| `0 9 * * 1-5` | Weekdays at 9 AM UTC |
| `0 0 1 * *` | Monthly on the 1st |
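To make the five-field layout concrete, here is a small standalone helper (illustrative only, not part of `@cloudwerk/trigger`) that splits an expression into labeled fields:

```ts
// Illustrative helper: split a five-field cron expression into its
// labeled parts. Does not validate ranges or step syntax.
function parseCron(expr: string) {
  const fields = expr.trim().split(/\s+/)
  if (fields.length !== 5) {
    throw new Error(`Expected 5 cron fields, got ${fields.length}`)
  }
  const [minute, hour, dayOfMonth, month, dayOfWeek] = fields
  return { minute, hour, dayOfMonth, month, dayOfWeek }
}

console.log(parseCron('0 9 * * 1-5'))
// { minute: '0', hour: '9', dayOfMonth: '*', month: '*', dayOfWeek: '1-5' }
```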
### Queue

Process messages from Cloudflare Queues:
```ts
// app/triggers/process-orders.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'queue',
    queue: 'orders',
    batch: { size: 20, timeout: '10s' },
  },
  async handle(batch, ctx) {
    for (const message of batch.messages) {
      try {
        await processOrder(message.body, ctx.env)
        message.ack()
      } catch (error) {
        message.retry({ delaySeconds: 60 })
      }
    }
  },
})
```
#### Batch Configuration

| Option | Default | Description |
|---|---|---|
| `size` | 10 | Max messages per batch (1-100) |
| `timeout` | `'5s'` | Max wait time to fill batch |
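To build intuition for how `size` and `timeout` interact, here is a toy simulation. It is illustrative only: it models the documented rule that a batch is delivered when it is full or when the timeout elapses after its first message, not Cloudflare's actual implementation:

```ts
// Toy model (illustrative only): given message arrival times in ms,
// group them into batches. A batch is delivered when it reaches `size`
// messages, or when more than `timeoutMs` has passed since its first
// message arrived.
function simulateBatches(arrivals: number[], size: number, timeoutMs: number): number[][] {
  const batches: number[][] = []
  let current: number[] = []
  for (const t of arrivals) {
    if (current.length > 0 && t - current[0] > timeoutMs) {
      batches.push(current) // timeout window expired before this message
      current = []
    }
    current.push(t)
    if (current.length === size) {
      batches.push(current) // batch is full
      current = []
    }
  }
  if (current.length > 0) batches.push(current)
  return batches
}

// Three messages arriving quickly, then one after the 5s window closes:
console.log(simulateBatches([0, 100, 200, 6000], 10, 5000))
// [ [ 0, 100, 200 ], [ 6000 ] ]
```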
#### Message Operations

```ts
message.ack()                        // Mark as processed
message.retry()                      // Retry with default delay
message.retry({ delaySeconds: 60 })  // Retry with custom delay
```
### R2 (Object Storage)

React to object creation and deletion in R2 buckets:

```ts
// app/triggers/process-uploads.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'r2',
    bucket: 'uploads',
    events: ['object-create'],
    prefix: 'images/',
    suffix: '.jpg',
  },
  async handle(event, ctx) {
    console.log(`New image: ${event.key} (${event.size} bytes)`)
    await generateThumbnail(event.key, ctx.env.R2)
  },
})
```
#### R2 Event Types

| Event | Description |
|---|---|
| `object-create` | Object created or updated |
| `object-delete` | Object deleted |
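The `prefix` and `suffix` options in the R2 source config act as simple string filters on the object key. Assuming plain `startsWith`/`endsWith` semantics (an assumption for illustration), the check amounts to:

```ts
// Illustrative check mirroring the prefix/suffix filters of an R2
// source config (assumed startsWith/endsWith semantics; not the
// framework's actual code).
function matchesFilter(key: string, prefix?: string, suffix?: string): boolean {
  if (prefix && !key.startsWith(prefix)) return false
  if (suffix && !key.endsWith(suffix)) return false
  return true
}

console.log(matchesFilter('images/cat.jpg', 'images/', '.jpg')) // true
console.log(matchesFilter('docs/readme.md', 'images/'))         // false
```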
#### R2 Event Properties

```ts
interface R2Event {
  type: 'object-create' | 'object-delete'
  bucket: string
  key: string
  etag?: string      // Create events only
  size?: number      // Create events only
  uploadedAt?: Date  // Create events only
}
```
### Webhook

Receive and verify incoming webhooks:

```ts
// app/triggers/stripe-webhook.ts
import { defineTrigger, verifiers } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'webhook',
    path: '/webhooks/stripe',
    verify: verifiers.stripe(process.env.STRIPE_WEBHOOK_SECRET!),
  },
  async handle(event, ctx) {
    if (!event.verified) {
      throw new Error('Signature verification failed')
    }

    switch (event.payload.type) {
      case 'checkout.session.completed':
        await handleCheckoutComplete(event.payload.data)
        break
      case 'invoice.paid':
        await handleInvoicePaid(event.payload.data)
        break
    }
  },
})
```
#### Built-in Verifiers

Cloudwerk includes signature verifiers for popular services:

```ts
import { verifiers } from '@cloudwerk/trigger'

// Stripe
verifiers.stripe(secret, {
  tolerance: 300, // Timestamp tolerance in seconds
})

// GitHub — supports both x-hub-signature-256 (SHA-256) and x-hub-signature (SHA-1)
verifiers.github(secret)

// Slack
verifiers.slack(signingSecret, {
  tolerance: 300,
})

// Shopify
verifiers.shopify(secret)
```

Additional verifiers: `verifiers.twilio()`, `verifiers.linear()`
#### Custom Verifier

Create a custom HMAC signature verifier:

```ts
import { verifiers } from '@cloudwerk/trigger'

verifiers.custom(secret, {
  header: 'x-webhook-signature',
  algorithm: 'SHA-256',  // 'SHA-256' | 'SHA-1' | 'SHA-512'
  encoding: 'hex',       // 'hex' | 'base64'
  prefix: 'sha256=',     // Optional prefix to strip
  timestampHeader: 'x-timestamp',
  timestampTolerance: 300,
  buildSignatureBase: (body, timestamp) => `${timestamp}.${body}`,
})
```
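Under the hood, a custom verifier of this shape boils down to recomputing an HMAC over the raw body and comparing it to the header value in constant time. Here is a standalone sketch using Node's `crypto` module (illustrative only; this is not Cloudwerk's implementation, and inside Workers the Web Crypto API would be used instead):

```ts
import { createHmac, timingSafeEqual } from 'node:crypto'

// Standalone sketch of HMAC webhook verification, mirroring the
// prefix/timestamp options shown above (not the framework's code).
function verifyHmac(
  rawBody: string,
  signatureHeader: string,
  secret: string,
  opts: { prefix?: string; timestamp?: string } = {},
): boolean {
  // Optionally strip a prefix like 'sha256='
  const received = opts.prefix && signatureHeader.startsWith(opts.prefix)
    ? signatureHeader.slice(opts.prefix.length)
    : signatureHeader

  // The signature base may include a timestamp, e.g. `${timestamp}.${body}`
  const base = opts.timestamp ? `${opts.timestamp}.${rawBody}` : rawBody

  const expected = createHmac('sha256', secret).update(base).digest('hex')

  // Constant-time comparison to avoid timing attacks
  const a = Buffer.from(received, 'hex')
  const b = Buffer.from(expected, 'hex')
  return a.length === b.length && timingSafeEqual(a, b)
}

const body = '{"event":"ping"}'
const sig = 'sha256=' + createHmac('sha256', 's3cret').update(body).digest('hex')
console.log(verifyHmac(body, sig, 's3cret', { prefix: 'sha256=' })) // true
```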
#### Webhook Event Properties

```ts
interface WebhookEvent<T = unknown> {
  payload: T                // Parsed JSON payload
  headers: Headers
  signature: string | null
  rawBody: ArrayBuffer
  verified: boolean
  method: string
  path: string
}
```

### Email

Process incoming emails:
```ts
// app/triggers/support-inbox.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'email',
  },
  async handle(event, ctx) {
    const text = await event.text()
    console.log(`Email from ${event.from}: ${event.subject}`)

    await createSupportTicket({
      from: event.from,
      subject: event.subject,
      body: text,
    }, ctx.env.DB)
  },
})
```
#### Email Event Properties

```ts
interface EmailEvent {
  from: string
  to: string
  subject: string
  rawEmail: ReadableStream
  text(): Promise<string>
  html(): Promise<string | null>
}
```
### D1 (Database)

React to database changes:
```ts
// app/triggers/sync-search.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'd1',
    database: 'main',
    table: 'products',
    events: ['insert', 'update'],
  },
  async handle(event, ctx) {
    console.log(`${event.type} on ${event.table}`, event.primaryKey)

    if (event.newValues) {
      await updateSearchIndex(event.primaryKey, event.newValues, ctx.env)
    }
  },
})
```
#### D1 Event Types

| Event | Description |
|---|---|
| `insert` | Row inserted |
| `update` | Row updated |
| `delete` | Row deleted |
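When handling `update` events, it is often useful to know which columns actually changed between `oldValues` and `newValues`. A small helper you could write yourself (hypothetical, not part of the framework; it assumes plain records with shallow, strictly-comparable values):

```ts
// Hypothetical helper: list columns whose values differ between the
// oldValues and newValues of a D1 update event (shallow comparison).
function changedColumns(
  oldValues: Record<string, unknown>,
  newValues: Record<string, unknown>,
): string[] {
  const keys = new Set([...Object.keys(oldValues), ...Object.keys(newValues)])
  return [...keys].filter((k) => oldValues[k] !== newValues[k])
}

console.log(changedColumns(
  { id: 1, name: 'Widget', price: 10 },
  { id: 1, name: 'Widget', price: 12 },
))
// [ 'price' ]
```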
#### D1 Event Properties

```ts
interface D1Event {
  type: 'insert' | 'update' | 'delete'
  database: string
  table: string
  primaryKey: Record<string, unknown>
  newValues?: Record<string, unknown>  // Insert/update
  oldValues?: Record<string, unknown>  // Update/delete
}
```

### Tail

Consume logs from other Workers:
```ts
// app/triggers/error-alerter.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'tail',
    consumers: ['api-worker', 'background-worker'],
  },
  async handle(event, ctx) {
    for (const log of event.logs) {
      if (log.level === 'error') {
        await notifyOncall({
          worker: event.worker,
          message: log.message,
          timestamp: log.timestamp,
        })
      }
    }
  },
})
```
## Error Handling & Retry

### Retry Configuration

Configure retry behavior for failed triggers:

```ts
export default defineTrigger({
  source: { type: 'scheduled', cron: '0 * * * *' },
  retry: {
    maxAttempts: 5,          // 0-100, default: 3
    delay: '30s',            // Default: '1m'
    backoff: 'exponential',  // 'linear' | 'exponential'
  },
  async handle(event, ctx) {
    await unreliableOperation()
  },
})
```
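As a rough mental model of the two `backoff` modes (assumed semantics for illustration; the exact schedule is defined by the framework), linear backoff grows the delay by the base amount each attempt, while exponential backoff doubles it:

```ts
// Rough model of linear vs. exponential backoff (assumed semantics,
// for intuition only). baseDelaySeconds is the configured `delay`;
// attempt is 1-based.
function retryDelaySeconds(
  baseDelaySeconds: number,
  attempt: number,
  backoff: 'linear' | 'exponential',
): number {
  return backoff === 'linear'
    ? baseDelaySeconds * attempt
    : baseDelaySeconds * 2 ** (attempt - 1)
}

const schedule = (mode: 'linear' | 'exponential') =>
  [1, 2, 3, 4].map((n) => retryDelaySeconds(30, n, mode))

console.log(schedule('linear'))      // [ 30, 60, 90, 120 ]
console.log(schedule('exponential')) // [ 30, 60, 120, 240 ]
```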
### Error Handler

Define custom error handling:

```ts
export default defineTrigger({
  source: { type: 'r2', bucket: 'uploads', events: ['object-create'] },
  async handle(event, ctx) {
    await processFile(event.key)
  },
  async onError(error, event, ctx) {
    console.error(`Failed to process ${event.key}:`, error.message)
    await logToMonitoring({
      trigger: 'processUploads',
      error: error.message,
      traceId: ctx.traceId,
    })
  },
})
```
### Prevent Retry

For non-retryable errors in scheduled triggers:

```ts
export default defineTrigger({
  source: { type: 'scheduled', cron: '0 0 * * *' },
  async handle(event, ctx) {
    try {
      await processData()
    } catch (error) {
      if (isPermanentError(error)) {
        event.noRetry() // Don't retry
      }
      throw error
    }
  },
})
```
## Trigger Chaining

Chain triggers together using `emit()`:

```ts
// app/triggers/process-upload.ts
import { defineTrigger, emit } from '@cloudwerk/trigger'

export default defineTrigger({
  source: { type: 'r2', bucket: 'uploads', events: ['object-create'] },
  async handle(event, ctx) {
    const metadata = await extractMetadata(event.key)

    // Chain to other triggers
    await emit('indexForSearch', { key: event.key, metadata })
    await emit('generateThumbnail', { key: event.key })
    await emit('notifyUser', { key: event.key }, { delay: 5000 })

    // Trace ID is automatically propagated
    console.log(`All emissions use traceId: ${ctx.traceId}`)
  },
})
```

### Emit Multiple

```ts
import { emitMany } from '@cloudwerk/trigger'

await emitMany([
  ['processImage', { key: 'image.jpg' }],
  ['updateIndex', { key: 'image.jpg' }],
  ['notifyUser', { userId: 123 }],
])
```
## Trigger Context

Every trigger handler receives a context object:

```ts
interface TriggerContext {
  traceId: string                             // Unique trace ID for distributed tracing
  waitUntil(promise: Promise<unknown>): void  // Extend execution lifetime
  passThroughOnException(): void              // Allow pass-through on error
  env: Record<string, unknown>                // Cloudflare bindings
}
```
### Using Context

```ts
export default defineTrigger({
  source: { type: 'queue', queue: 'tasks' },
  async handle(batch, ctx) {
    const db = ctx.env.DB as D1Database

    for (const msg of batch.messages) {
      // Background logging (doesn't block the handler)
      ctx.waitUntil(
        db.prepare('INSERT INTO logs (trace_id, event) VALUES (?, ?)')
          .bind(ctx.traceId, JSON.stringify(msg.body))
          .run()
      )
      msg.ack()
    }
  },
})
```
## Observability

### Trace IDs

Every trigger execution has a unique trace ID that propagates through chained triggers:

```ts
ctx.traceId // e.g., 'tr_a1b2c3d4e5f6g7h8'

// Child triggers get derived IDs:
// 'tr_a1b2c3d4e5f6g7h8.12345678'
```

### Execution Metrics

```ts
import { defineTrigger, ExecutionTimer } from '@cloudwerk/trigger'

export default defineTrigger({
  source: { type: 'queue', queue: 'tasks' },
  async handle(batch, ctx) {
    const timer = new ExecutionTimer()

    timer.mark('processing-start')
    for (const msg of batch.messages) {
      await processMessage(msg)
    }
    timer.mark('processing-end')

    const duration = timer.stop()
    console.log(`Processed batch in ${duration}ms`, timer.getMarks())
  },
})
```
## Testing

### Test Utilities

```ts
import { describe, it, expect } from 'vitest'
import { testTrigger, mockEvent } from '@cloudwerk/trigger/testing'
import processUploads from '../app/triggers/process-uploads'

describe('process-uploads', () => {
  it('processes R2 uploads', async () => {
    const result = await testTrigger(processUploads, {
      event: mockEvent.r2({
        type: 'object-create',
        bucket: 'uploads',
        key: 'test.pdf',
        size: 1024,
      }),
      bindings: {
        DB: mockD1(),
        R2: mockR2(),
      },
    })

    expect(result.success).toBe(true)
  })

  it('handles errors gracefully', async () => {
    const result = await testTrigger(processUploads, {
      event: mockEvent.r2({
        type: 'object-create',
        bucket: 'uploads',
        key: 'test.pdf',
      }),
      bindings: { DB: failingMock() },
    })

    expect(result.success).toBe(false)
    expect(result.errorHandlerCalled).toBe(true)
  })
})
```

### Mock Event Factories

```ts
import { mockEvent } from '@cloudwerk/trigger/testing'

mockEvent.scheduled({ cron: '0 0 * * *' })
mockEvent.queue({ messages: [{ body: { type: 'email' } }] })
mockEvent.r2({ type: 'object-create', bucket: 'uploads', key: 'file.pdf' })
mockEvent.webhook({ payload: { event: 'created' } })
```
### Test Harness

For testing trigger chains:

```ts
import { TriggerTestHarness } from '@cloudwerk/trigger/testing'

describe('Upload workflow', () => {
  it('chains triggers correctly', async () => {
    const harness = new TriggerTestHarness({
      bindings: { DB: mockD1() },
    })

    harness.register('processUploads', processUploads)
    harness.register('generateThumbnail', generateThumbnail)

    await harness.emit('processUploads', mockEvent.r2({ key: 'photo.jpg' }))

    expect(harness.completed).toHaveLength(1)
  })
})
```
## Common Patterns

### Database Cleanup

```ts
export default defineTrigger({
  source: { type: 'scheduled', cron: '0 3 * * *' }, // Daily at 3 AM
  async handle(event, ctx) {
    const db = ctx.env.DB

    // Delete expired sessions
    await db.prepare(`
      DELETE FROM sessions WHERE expires_at < datetime('now')
    `).run()

    // Clean old logs
    await db.prepare(`
      DELETE FROM audit_logs WHERE created_at < datetime('now', '-30 days')
    `).run()
  },
})
```
### Image Processing Pipeline

```ts
// app/triggers/process-image.ts
import { defineTrigger, emit } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'r2',
    bucket: 'uploads',
    events: ['object-create'],
    prefix: 'images/',
  },
  async handle(event, ctx) {
    if (!event.key.match(/\.(jpg|png|webp)$/)) {
      return // Not an image
    }

    // Chain to specialized triggers
    await emit('generateThumbnail', { key: event.key, sizes: ['sm', 'md', 'lg'] })
    await emit('extractMetadata', { key: event.key })
    await emit('updateDatabase', { key: event.key, size: event.size })
  },
})
```
### Multi-Provider Webhook Handler

```ts
// app/triggers/payment-webhook.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'webhook',
    path: '/webhooks/payments',
    methods: ['POST'],
  },
  async handle(event, ctx) {
    const provider = event.headers.get('x-payment-provider')

    switch (provider) {
      case 'stripe':
        await handleStripeEvent(event.payload)
        break
      case 'paypal':
        await handlePayPalEvent(event.payload)
        break
      default:
        console.warn('Unknown payment provider:', provider)
    }
  },
})
```
## Best Practices

### Limits

| Limit | Value |
|---|---|
| Max cron triggers | 3 per Worker |
| Min cron interval | 1 minute |
| Max execution time | 30s (free), 15m (paid) |
| Default timeout | 30 seconds |
| Max timeout | 10 minutes |
## Next Steps

- Triggers API Reference - Complete API documentation
- Queues Guide - Background job processing
- Durable Objects - Stateful edge computing