Cloudwerk provides a powerful event-driven trigger system through `@cloudwerk/trigger`. Define handlers for seven event sources with automatic configuration, type safety, built-in webhook verification, and observability.

Triggers are event handlers that respond to various sources in your Cloudflare infrastructure. Each trigger type maps to a specific Cloudflare feature:

| Trigger Type | Cloudflare Feature | Use Case |
| --- | --- | --- |
| `scheduled` | Cron Triggers | Periodic tasks (cleanup, reports, syncs) |
| `queue` | Queue Consumer | Process async messages |
| `r2` | R2 Event Notifications | React to file uploads/deletions |
| `webhook` | HTTP Route | Receive external webhooks |
| `email` | Email Workers | Process incoming emails |
| `d1` | D1 (via triggers) | React to database changes |
| `tail` | Tail Workers | Process logs from other Workers |

Triggers can either execute code directly or hand off work to a queue:

Direct Execution — Handle the event immediately in the trigger handler:

```typescript
export default defineTrigger({
  source: { type: 'r2', bucket: 'uploads', events: ['object-create'] },
  async handle(event, ctx) {
    await processImage(event.key)
  },
})
```

Queue Handoff — Send a message to a queue for async processing:

```typescript
export default defineTrigger({
  source: { type: 'r2', bucket: 'uploads', events: ['object-create'] },
  async handle(event, ctx) {
    await ctx.env.IMAGE_QUEUE.send({
      key: event.key,
      action: event.type,
    })
  },
})
```

Choose based on your use case: direct execution for simple, fast operations; queue handoff when you need batching or retries, or want to decouple processing from the trigger.

  1. Create a trigger file:

     ```typescript
     // app/triggers/daily-cleanup.ts
     import { defineTrigger } from '@cloudwerk/trigger'

     export default defineTrigger({
       source: { type: 'scheduled', cron: '0 0 * * *' },
       async handle(event, ctx) {
         console.log(`[${ctx.traceId}] Running cleanup...`)
         await cleanupExpiredSessions(ctx.env.DB)
       },
     })
     ```

  2. Triggers are automatically discovered from `app/triggers/` and registered during the build.

  3. Deploy, and your trigger runs on schedule.

Triggers are defined in the `app/triggers/` directory:

```
app/triggers/
├── daily-cleanup.ts     # Scheduled trigger
├── process-uploads.ts   # R2 trigger
├── stripe-webhook.ts    # Webhook trigger
├── email-handler.ts     # Email trigger
└── sync-search.ts       # D1 trigger
```

Naming Convention:

  • File names are kebab-case and are converted to camelCase trigger names
  • `daily-cleanup.ts` → trigger name `dailyCleanup`
  • `process-uploads.ts` → trigger name `processUploads`
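The mapping can be sketched as a small helper; `toTriggerName` is an illustrative, hypothetical function, not part of the actual build tooling:

```typescript
// Illustrative sketch of the kebab-case → camelCase mapping used for
// trigger names. toTriggerName is a hypothetical helper, not part of
// the @cloudwerk/trigger build tooling.
function toTriggerName(fileName: string): string {
  return fileName
    .replace(/\.ts$/, '')                              // drop the extension
    .replace(/-([a-z])/g, (_, ch) => ch.toUpperCase()) // lift each "-x" to "X"
}

console.log(toTriggerName('daily-cleanup.ts'))   // dailyCleanup
console.log(toTriggerName('process-uploads.ts')) // processUploads
```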

Run tasks on a schedule using cron expressions:

```typescript
// app/triggers/daily-report.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: { type: 'scheduled', cron: '0 9 * * 1-5' }, // Weekdays at 9 AM UTC
  async handle(event, ctx) {
    console.log(`Scheduled time: ${event.scheduledTime}`)
    await generateDailyReport(ctx.env.DB)
  },
})
```
```
┌───────────── minute (0 - 59)
│ ┌───────────── hour (0 - 23)
│ │ ┌───────────── day of month (1 - 31)
│ │ │ ┌───────────── month (1 - 12)
│ │ │ │ ┌───────────── day of week (0 - 6, Sunday = 0)
│ │ │ │ │
* * * * *
```

| Pattern | Description |
| --- | --- |
| `* * * * *` | Every minute |
| `*/5 * * * *` | Every 5 minutes |
| `0 * * * *` | Every hour |
| `0 0 * * *` | Daily at midnight UTC |
| `0 9 * * 1-5` | Weekdays at 9 AM UTC |
| `0 0 1 * *` | Monthly on the 1st |
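To make the pattern syntax concrete, here is a minimal sketch of matching a single cron field against a value, covering the forms used in the table (`*`, `*/n`, ranges, comma lists, and plain numbers). It is illustrative only, not Cloudflare's actual cron implementation:

```typescript
// Minimal sketch of single-field cron matching for the forms shown
// above: '*', '*/n', 'a-b' ranges, comma lists, and plain numbers.
// Illustrative only — not Cloudflare's actual scheduler.
function fieldMatches(field: string, value: number): boolean {
  return field.split(',').some((part) => {
    if (part === '*') return true
    const step = part.match(/^\*\/(\d+)$/)
    if (step) return value % Number(step[1]) === 0
    const range = part.match(/^(\d+)-(\d+)$/)
    if (range) return value >= Number(range[1]) && value <= Number(range[2])
    return Number(part) === value
  })
}

console.log(fieldMatches('1-5', 3)) // true  — a weekday, as in '0 9 * * 1-5'
console.log(fieldMatches('*/5', 7)) // false — 7 is not a multiple of 5
```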

Process messages from Cloudflare Queues:

```typescript
// app/triggers/process-orders.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'queue',
    queue: 'orders',
    batch: { size: 20, timeout: '10s' },
  },
  async handle(batch, ctx) {
    for (const message of batch.messages) {
      try {
        await processOrder(message.body, ctx.env)
        message.ack()
      } catch (error) {
        message.retry({ delaySeconds: 60 })
      }
    }
  },
})
```
| Option | Default | Description |
| --- | --- | --- |
| `size` | `10` | Max messages per batch (1-100) |
| `timeout` | `'5s'` | Max wait time to fill a batch |

```typescript
message.ack()                       // Mark as processed
message.retry()                     // Retry with the default delay
message.retry({ delaySeconds: 60 }) // Retry with a custom delay
```
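Durations such as `'5s'` and `'10s'` use a shorthand string format. A sketch of how such strings could be parsed into milliseconds (`parseDuration` is a hypothetical helper; the real parser inside `@cloudwerk/trigger` may accept more units):

```typescript
// Sketch of parsing shorthand duration strings such as '10s' or '1m'
// into milliseconds. Hypothetical helper — the actual parser in
// @cloudwerk/trigger may support additional units or syntax.
function parseDuration(input: string): number {
  const m = input.match(/^(\d+)(ms|s|m|h)$/)
  if (!m) throw new Error(`Invalid duration: ${input}`)
  const factor: Record<string, number> = { ms: 1, s: 1_000, m: 60_000, h: 3_600_000 }
  return Number(m[1]) * factor[m[2]]
}

console.log(parseDuration('10s')) // 10000
console.log(parseDuration('1m'))  // 60000
```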

React to object creation and deletion in R2 buckets:

```typescript
// app/triggers/process-uploads.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'r2',
    bucket: 'uploads',
    events: ['object-create'],
    prefix: 'images/',
    suffix: '.jpg',
  },
  async handle(event, ctx) {
    console.log(`New image: ${event.key} (${event.size} bytes)`)
    await generateThumbnail(event.key, ctx.env.R2)
  },
})
```
| Event | Description |
| --- | --- |
| `object-create` | Object created or updated |
| `object-delete` | Object deleted |

```typescript
interface R2Event {
  type: 'object-create' | 'object-delete'
  bucket: string
  key: string
  etag?: string      // Create events only
  size?: number      // Create events only
  uploadedAt?: Date  // Create events only
}
```
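Since `etag`, `size`, and `uploadedAt` are only meaningful on create events, a type guard can narrow the union before using them. A sketch (the guard itself is not exported by the package):

```typescript
// Hypothetical type guard narrowing R2Event to create events, where the
// create-only fields are meaningful. Not exported by @cloudwerk/trigger.
interface R2Event {
  type: 'object-create' | 'object-delete'
  bucket: string
  key: string
  etag?: string
  size?: number
  uploadedAt?: Date
}

function isCreateEvent(
  event: R2Event
): event is R2Event & { type: 'object-create' } {
  return event.type === 'object-create'
}

const event: R2Event = { type: 'object-create', bucket: 'uploads', key: 'a.jpg', size: 1024 }
console.log(isCreateEvent(event)) // true
```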

Receive and verify incoming webhooks:

```typescript
// app/triggers/stripe-webhook.ts
import { defineTrigger, verifiers } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'webhook',
    path: '/webhooks/stripe',
    verify: verifiers.stripe(process.env.STRIPE_WEBHOOK_SECRET!),
  },
  async handle(event, ctx) {
    if (!event.verified) {
      throw new Error('Signature verification failed')
    }
    switch (event.payload.type) {
      case 'checkout.session.completed':
        await handleCheckoutComplete(event.payload.data)
        break
      case 'invoice.paid':
        await handleInvoicePaid(event.payload.data)
        break
    }
  },
})
```

Cloudwerk includes signature verifiers for popular services:

```typescript
import { verifiers } from '@cloudwerk/trigger'

verifiers.stripe(secret, {
  tolerance: 300, // Timestamp tolerance in seconds
})
```

Additional verifiers: `verifiers.twilio()`, `verifiers.linear()`

Create a custom HMAC signature verifier:

```typescript
import { verifiers } from '@cloudwerk/trigger'

verifiers.custom(secret, {
  header: 'x-webhook-signature',
  algorithm: 'SHA-256', // 'SHA-256' | 'SHA-1' | 'SHA-512'
  encoding: 'hex',      // 'hex' | 'base64'
  prefix: 'sha256=',    // Optional prefix to strip
  timestampHeader: 'x-timestamp',
  timestampTolerance: 300,
  buildSignatureBase: (body, timestamp) => `${timestamp}.${body}`,
})
```
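Under the hood, a verifier like this compares the received signature against an HMAC it computes itself. A sketch of that computation for the configuration above, using `node:crypto` for brevity (a Worker would use `crypto.subtle`; this is not the package's internal code):

```typescript
// Sketch of the HMAC computation behind a custom verifier configured as
// above: SHA-256, hex encoding, a 'sha256=' prefix, and a
// `${timestamp}.${body}` signature base. Uses node:crypto for brevity;
// not the actual @cloudwerk/trigger internals.
import { createHmac } from 'node:crypto'

function expectedSignature(secret: string, body: string, timestamp: string): string {
  const base = `${timestamp}.${body}` // mirrors buildSignatureBase above
  return 'sha256=' + createHmac('sha256', secret).update(base).digest('hex')
}

// Verification then compares this against the header value
// (real code should use a constant-time comparison).
const sig = expectedSignature('whsec_test', '{"ok":true}', '1700000000')
console.log(sig.startsWith('sha256=')) // true
```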
```typescript
interface WebhookEvent<T = unknown> {
  payload: T               // Parsed JSON payload
  headers: Headers
  signature: string | null
  rawBody: ArrayBuffer
  verified: boolean
  method: string
  path: string
}
```

Process incoming emails:

```typescript
// app/triggers/support-inbox.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'email',
    address: '[email protected]',
  },
  async handle(event, ctx) {
    const text = await event.text()
    console.log(`Email from ${event.from}: ${event.subject}`)
    await createSupportTicket({
      from: event.from,
      subject: event.subject,
      body: text,
    }, ctx.env.DB)
  },
})
```

```typescript
interface EmailEvent {
  from: string
  to: string
  subject: string
  rawEmail: ReadableStream
  text(): Promise<string>
  html(): Promise<string | null>
}
```

React to database changes:

```typescript
// app/triggers/sync-search.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'd1',
    database: 'main',
    table: 'products',
    events: ['insert', 'update'],
  },
  async handle(event, ctx) {
    console.log(`${event.type} on ${event.table}`, event.primaryKey)
    if (event.newValues) {
      await updateSearchIndex(event.primaryKey, event.newValues, ctx.env)
    }
  },
})
```

| Event | Description |
| --- | --- |
| `insert` | Row inserted |
| `update` | Row updated |
| `delete` | Row deleted |

```typescript
interface D1Event {
  type: 'insert' | 'update' | 'delete'
  database: string
  table: string
  primaryKey: Record<string, unknown>
  newValues?: Record<string, unknown>  // Insert/update
  oldValues?: Record<string, unknown>  // Update/delete
}
```

Consume logs from other Workers:

```typescript
// app/triggers/error-alerter.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'tail',
    consumers: ['api-worker', 'background-worker'],
  },
  async handle(event, ctx) {
    for (const log of event.logs) {
      if (log.level === 'error') {
        await notifyOncall({
          worker: event.worker,
          message: log.message,
          timestamp: log.timestamp,
        })
      }
    }
  },
})
```

Configure retry behavior for failed triggers:

```typescript
export default defineTrigger({
  source: { type: 'scheduled', cron: '0 * * * *' },
  retry: {
    maxAttempts: 5,         // 0-100, default: 3
    delay: '30s',           // Default: '1m'
    backoff: 'exponential', // 'linear' | 'exponential'
  },
  async handle(event, ctx) {
    await unreliableOperation()
  },
})
```
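The two backoff strategies imply different delay schedules. The sketch below illustrates one plausible formula (linear scaling vs. doubling per attempt); the exact schedule is defined by the runtime, so treat the formula as an assumption:

```typescript
// Sketch of the delay schedule implied by the retry options: 'linear'
// grows the base delay proportionally with the attempt number, while
// 'exponential' doubles it on each attempt. The exact formula is an
// illustrative assumption, not documented runtime behavior.
function retryDelayMs(
  baseMs: number,
  attempt: number, // 1-based
  backoff: 'linear' | 'exponential'
): number {
  return backoff === 'linear' ? baseMs * attempt : baseMs * 2 ** (attempt - 1)
}

// With delay '30s' (30_000 ms), exponential backoff gives 30s, 60s, 120s, ...
console.log([1, 2, 3].map((n) => retryDelayMs(30_000, n, 'exponential')))
// [ 30000, 60000, 120000 ]
```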

Define custom error handling:

```typescript
export default defineTrigger({
  source: { type: 'r2', bucket: 'uploads', events: ['object-create'] },
  async handle(event, ctx) {
    await processFile(event.key)
  },
  async onError(error, event, ctx) {
    console.error(`Failed to process ${event.key}:`, error.message)
    await logToMonitoring({
      trigger: 'processUploads',
      error: error.message,
      traceId: ctx.traceId,
    })
  },
})
```

For non-retryable errors in scheduled triggers:

```typescript
export default defineTrigger({
  source: { type: 'scheduled', cron: '0 0 * * *' },
  async handle(event, ctx) {
    try {
      await processData()
    } catch (error) {
      if (isPermanentError(error)) {
        event.noRetry() // Don't retry
      }
      throw error
    }
  },
})
```

Chain triggers together using `emit()`:

```typescript
// app/triggers/process-upload.ts
import { defineTrigger, emit } from '@cloudwerk/trigger'

export default defineTrigger({
  source: { type: 'r2', bucket: 'uploads', events: ['object-create'] },
  async handle(event, ctx) {
    const metadata = await extractMetadata(event.key)
    // Chain to other triggers
    await emit('indexForSearch', { key: event.key, metadata })
    await emit('generateThumbnail', { key: event.key })
    await emit('notifyUser', { key: event.key }, { delay: 5000 })
    // Trace ID is automatically propagated
    console.log(`All emissions use traceId: ${ctx.traceId}`)
  },
})
```

```typescript
import { emitMany } from '@cloudwerk/trigger'

await emitMany([
  ['processImage', { key: 'image.jpg' }],
  ['updateIndex', { key: 'image.jpg' }],
  ['notifyUser', { userId: 123 }],
])
```

Every trigger handler receives a context object:

```typescript
interface TriggerContext {
  traceId: string                            // Unique trace ID for distributed tracing
  waitUntil(promise: Promise<unknown>): void // Extend execution lifetime
  passThroughOnException(): void             // Allow pass-through on error
  env: Record<string, unknown>               // Cloudflare bindings
}
```

```typescript
export default defineTrigger({
  source: { type: 'queue', queue: 'tasks' },
  async handle(batch, ctx) {
    const db = ctx.env.DB as D1Database
    for (const msg of batch.messages) {
      // Background logging (doesn't block the handler)
      ctx.waitUntil(
        db.prepare('INSERT INTO logs (trace_id, event) VALUES (?, ?)')
          .bind(ctx.traceId, JSON.stringify(msg.body))
          .run()
      )
      msg.ack()
    }
  },
})
```

Every trigger execution has a unique trace ID that propagates through chained triggers:

```typescript
ctx.traceId // e.g., 'tr_a1b2c3d4e5f6g7h8'

// Child triggers get derived IDs:
// 'tr_a1b2c3d4e5f6g7h8.12345678'
```
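The derivation can be sketched as follows. The `parent.suffix` shape matches the example above, but generating the 8-hex-digit suffix randomly here is an assumption about the real scheme:

```typescript
// Sketch of deriving a child trace ID in the 'parent.suffix' format
// shown above. The random 8-hex-digit suffix is an assumption about
// how @cloudwerk/trigger actually generates it.
function childTraceId(parent: string): string {
  const suffix = Math.floor(Math.random() * 0x1_0000_0000)
    .toString(16)
    .padStart(8, '0')
  return `${parent}.${suffix}`
}

const child = childTraceId('tr_a1b2c3d4e5f6g7h8')
console.log(/^tr_a1b2c3d4e5f6g7h8\.[0-9a-f]{8}$/.test(child)) // true
```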
```typescript
import { defineTrigger, ExecutionTimer } from '@cloudwerk/trigger'

export default defineTrigger({
  source: { type: 'queue', queue: 'tasks' },
  async handle(batch, ctx) {
    const timer = new ExecutionTimer()
    timer.mark('processing-start')
    for (const msg of batch.messages) {
      await processMessage(msg)
    }
    timer.mark('processing-end')
    const duration = timer.stop()
    console.log(`Processed batch in ${duration}ms`, timer.getMarks())
  },
})
```
```typescript
import { describe, it, expect } from 'vitest'
import { testTrigger, mockEvent } from '@cloudwerk/trigger/testing'
import processUploads from '../app/triggers/process-uploads'

describe('process-uploads', () => {
  it('processes R2 uploads', async () => {
    const result = await testTrigger(processUploads, {
      event: mockEvent.r2({
        type: 'object-create',
        bucket: 'uploads',
        key: 'test.pdf',
        size: 1024,
      }),
      bindings: {
        DB: mockD1(),
        R2: mockR2(),
      },
    })
    expect(result.success).toBe(true)
  })

  it('handles errors gracefully', async () => {
    const result = await testTrigger(processUploads, {
      event: mockEvent.r2({
        type: 'object-create',
        bucket: 'uploads',
        key: 'test.pdf',
      }),
      bindings: { DB: failingMock() },
    })
    expect(result.success).toBe(false)
    expect(result.errorHandlerCalled).toBe(true)
  })
})
```

```typescript
import { mockEvent } from '@cloudwerk/trigger/testing'

mockEvent.scheduled({ cron: '0 0 * * *' })
mockEvent.queue({ messages: [{ body: { type: 'email' } }] })
mockEvent.r2({ type: 'object-create', bucket: 'uploads', key: 'file.pdf' })
mockEvent.webhook({ payload: { event: 'created' } })
mockEvent.email({ from: '[email protected]', subject: 'Hello' })
```

For testing trigger chains:

```typescript
import { TriggerTestHarness } from '@cloudwerk/trigger/testing'

describe('Upload workflow', () => {
  it('chains triggers correctly', async () => {
    const harness = new TriggerTestHarness({
      bindings: { DB: mockD1() },
    })
    harness.register('processUploads', processUploads)
    harness.register('generateThumbnail', generateThumbnail)
    await harness.emit('processUploads', mockEvent.r2({ key: 'photo.jpg' }))
    expect(harness.completed).toHaveLength(1)
  })
})
```
```typescript
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: { type: 'scheduled', cron: '0 3 * * *' }, // Daily at 3 AM
  async handle(event, ctx) {
    const db = ctx.env.DB
    // Delete expired sessions
    await db.prepare(`
      DELETE FROM sessions WHERE expires_at < datetime('now')
    `).run()
    // Clean old logs
    await db.prepare(`
      DELETE FROM audit_logs WHERE created_at < datetime('now', '-30 days')
    `).run()
  },
})
```
```typescript
// app/triggers/process-image.ts
import { defineTrigger, emit } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'r2',
    bucket: 'uploads',
    events: ['object-create'],
    prefix: 'images/',
  },
  async handle(event, ctx) {
    if (!event.key.match(/\.(jpg|png|webp)$/)) {
      return // Not an image
    }
    // Chain to specialized triggers
    await emit('generateThumbnail', { key: event.key, sizes: ['sm', 'md', 'lg'] })
    await emit('extractMetadata', { key: event.key })
    await emit('updateDatabase', { key: event.key, size: event.size })
  },
})
```
```typescript
// app/triggers/payment-webhook.ts
import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: {
    type: 'webhook',
    path: '/webhooks/payments',
    methods: ['POST'],
  },
  async handle(event, ctx) {
    const provider = event.headers.get('x-payment-provider')
    switch (provider) {
      case 'stripe':
        await handleStripeEvent(event.payload)
        break
      case 'paypal':
        await handlePayPalEvent(event.payload)
        break
      default:
        console.warn('Unknown payment provider:', provider)
    }
  },
})
```
| Limit | Value |
| --- | --- |
| Max cron triggers | 3 per Worker |
| Min cron interval | 1 minute |
| Max execution time | 30s (free), 15m (paid) |
| Default timeout | 30 seconds |
| Max timeout | 10 minutes |