Triggers API

The @cloudwerk/trigger package provides a unified API for handling all Cloudflare event types through the defineTrigger() factory function.

pnpm add @cloudwerk/trigger

defineTrigger() creates a trigger handler for a specific event source.

import { defineTrigger } from '@cloudwerk/trigger'

export default defineTrigger({
  source: TriggerSource,
  handle: (event: TriggerEvent, ctx: TriggerContext) => Awaitable<void>,
  config?: TriggerConfig,
})
Parameter  Type           Description
source     TriggerSource  Event source configuration (see source types below)
handle     Function       Async handler function receiving event and context
config     TriggerConfig  Optional configuration (retries, timeout, etc.)

Returns a TriggerDefinition object that Cloudwerk registers automatically.

ScheduledSource configures cron-based scheduled execution.

interface ScheduledSource {
  type: 'scheduled'
  cron: string // Cron expression
  timezone?: string // IANA timezone (default: 'UTC')
}

Example:

export default defineTrigger({
  source: {
    type: 'scheduled',
    cron: '0 9 * * 1-5', // Weekdays at 9 AM
    timezone: 'America/New_York',
  },
  async handle(event, ctx) {
    console.log(`Scheduled at: ${event.scheduledTime}`)
  },
})

Event Properties:

Property       Type    Description
cron           string  The cron expression that triggered the event
scheduledTime  Date    Scheduled execution time

QueueSource configures queue message consumption.

interface QueueSource {
  type: 'queue'
  name: string // Queue name
  batchSize?: number // Max messages per batch (1-100, default: 10)
  batchTimeout?: string // Max wait time ('5s', '1m', etc.)
  retryDelay?: string // Delay between retries (default: '1m')
  deadLetterQueue?: string // DLQ for failed messages
}

Example:

export default defineTrigger({
  source: {
    type: 'queue',
    name: 'notifications',
    batchSize: 25,
    deadLetterQueue: 'notifications-dlq',
  },
  async handle(event, ctx) {
    for (const message of event.messages) {
      await processMessage(message.body)
      message.ack()
    }
  },
})

Event Properties:

Property  Type            Description
queue     string          Queue name
messages  QueueMessage[]  Array of messages

QueueMessage:

Property             Type      Description
id                   string    Unique message ID
body                 unknown   Message payload
timestamp            Date      When the message was sent
attempts             number    Delivery attempt count
ack()                Function  Acknowledge successful processing
retry(options?)      Function  Request retry with optional delay
deadLetter(reason?)  Function  Send to dead letter queue
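
The three settlement methods above can be combined so that one failing message does not fail the whole batch. The following is a minimal sketch, not the package's own code: the `QueueMessage` interface is a local stand-in covering a subset of the documented fields, `MAX_ATTEMPTS` and `settleBatch` are hypothetical names, and the `{ delay }` option shape passed to `retry()` is an assumption (the reference only says the delay is optional).

```typescript
// Local stand-in for a subset of the documented QueueMessage fields.
interface QueueMessage<T = unknown> {
  id: string
  body: T
  attempts: number
  ack(): void
  retry(options?: { delay?: string }): void // option shape is an assumption
  deadLetter(reason?: string): void
}

// Hypothetical cap on delivery attempts before a message is dead-lettered.
const MAX_ATTEMPTS = 3

// Settle each message on its own: ack on success, retry on failure,
// and dead-letter once the attempt count reaches the cap.
async function settleBatch<T>(
  messages: QueueMessage<T>[],
  handler: (body: T) => Promise<void>,
): Promise<void> {
  for (const message of messages) {
    try {
      await handler(message.body)
      message.ack()
    } catch (error) {
      if (message.attempts >= MAX_ATTEMPTS) {
        const reason = error instanceof Error ? error.message : String(error)
        message.deadLetter(reason)
      } else {
        message.retry({ delay: '30s' })
      }
    }
  }
}
```

Inside a queue trigger, this would be called as `await settleBatch(event.messages, processMessage)`, assuming `processMessage` returns a promise.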

R2Source configures R2 object storage events.

interface R2Source {
  type: 'r2'
  bucket: string // R2 bucket name
  events: R2EventType[] // Event types to listen for
  prefix?: string // Key prefix filter
  suffix?: string // Key suffix filter
}

type R2EventType =
  | 'object:create'
  | 'object:delete'
  | 'object:create:put'
  | 'object:create:copy'
  | 'object:create:multipart'

Example:

export default defineTrigger({
  source: {
    type: 'r2',
    bucket: 'uploads',
    events: ['object:create'],
    prefix: 'images/',
    suffix: '.jpg',
  },
  async handle(event, ctx) {
    const { key, size, etag } = event.object
    await generateThumbnail(key)
  },
})

Event Properties:

Property  Type           Description
bucket    string         Bucket name
action    string         Event action ('create' or 'delete')
object    R2EventObject  Object metadata

R2EventObject:

Property    Type    Description
key         string  Object key
size        number  Object size in bytes
etag        string  Object ETag
uploadedAt  Date    Upload timestamp
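
To make the `prefix` and `suffix` filters concrete, here is a small sketch of how a key could be checked against them. It assumes that a key must satisfy every filter that is set and that matching is case-sensitive; the reference does not spell out either point, and `KeyFilter` and `matchesKeyFilter` are hypothetical names.

```typescript
// Subset of the documented R2Source fields that affect key matching.
interface KeyFilter {
  prefix?: string
  suffix?: string
}

// Assumption: an unset filter matches everything, and all set filters must match.
function matchesKeyFilter(key: string, filter: KeyFilter): boolean {
  const prefixOk = filter.prefix === undefined || key.startsWith(filter.prefix)
  const suffixOk = filter.suffix === undefined || key.endsWith(filter.suffix)
  return prefixOk && suffixOk
}

const imageFilter: KeyFilter = { prefix: 'images/', suffix: '.jpg' }
console.log(matchesKeyFilter('images/cat.jpg', imageFilter)) // true
console.log(matchesKeyFilter('images/cat.png', imageFilter)) // false
console.log(matchesKeyFilter('docs/cat.jpg', imageFilter)) // false
```

Under these assumptions a key such as `images/cat.JPG` would not match, so uploads with mixed-case extensions may need a second trigger or a broader suffix.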

WebhookSource configures HTTP webhook events with signature verification.

interface WebhookSource {
  type: 'webhook'
  path: string // Webhook endpoint path
  methods?: HttpMethod[] // Allowed HTTP methods (default: ['POST'])
  verifier?: WebhookVerifier // Built-in or custom verifier
}

type HttpMethod = 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE'

Example with Built-in Verifier:

import { stripeVerifier } from '@cloudwerk/trigger/verifiers'

export default defineTrigger({
  source: {
    type: 'webhook',
    path: '/webhooks/stripe',
    verifier: stripeVerifier({
      secret: process.env.STRIPE_WEBHOOK_SECRET,
    }),
  },
  async handle(event, ctx) {
    const stripeEvent = event.payload
    if (stripeEvent.type === 'checkout.session.completed') {
      await fulfillOrder(stripeEvent.data.object)
    }
  },
})

Event Properties:

Property  Type     Description
request   Request  Original HTTP request
payload   unknown  Parsed and verified payload
headers   Headers  Request headers
verified  boolean  Whether signature was verified
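
Because `payload` is typed as `unknown`, a handler needs to narrow it before reading fields such as `type`. One way to do that is a type guard, sketched below. `CheckoutEvent`, `isRecord`, and `isCheckoutEvent` are hypothetical names, and the shape checked here is only the part of a Stripe-style event that the example handler reads, not a full schema.

```typescript
// Hypothetical minimal shape: only the fields the handler actually reads.
interface CheckoutEvent {
  type: string
  data: { object: unknown }
}

// True for any non-null object, so its properties can be inspected safely.
function isRecord(value: unknown): value is Record<string, unknown> {
  return typeof value === 'object' && value !== null
}

// Narrow an unknown payload to CheckoutEvent by checking each required field.
function isCheckoutEvent(payload: unknown): payload is CheckoutEvent {
  if (!isRecord(payload)) return false
  if (typeof payload['type'] !== 'string') return false
  const data = payload['data']
  return isRecord(data) && 'object' in data
}

const payload: unknown = {
  type: 'checkout.session.completed',
  data: { object: { id: 'cs_123' } },
}

if (isCheckoutEvent(payload)) {
  // payload.type and payload.data.object are now safely typed.
  console.log(payload.type)
}
```

A schema validation library would serve the same purpose for larger payloads; the hand-written guard keeps the sketch dependency-free.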

EmailSource configures incoming email events.

interface EmailSource {
  type: 'email'
  address?: string // Filter by recipient address
  domain?: string // Filter by recipient domain
}

Example:

export default defineTrigger({
  source: {
    type: 'email',
    domain: 'support.myapp.com',
  },
  async handle(event, ctx) {
    const { from, to, subject, text, html } = event
    await createSupportTicket({ from, subject, body: text || html })
  },
})

Event Properties:

Property     Type                 Description
from         string               Sender email address
to           string               Recipient email address
subject      string               Email subject
text         string | null        Plain text body
html         string | null        HTML body
attachments  EmailAttachment[]    File attachments
headers      Map<string, string>  Email headers
raw          ReadableStream       Raw email stream

D1Source configures database change events.

interface D1Source {
  type: 'd1'
  database: string // D1 database binding name
  tables?: string[] // Tables to watch (all if omitted)
  operations?: D1Operation[] // Operations to watch (all if omitted)
}

type D1Operation = 'INSERT' | 'UPDATE' | 'DELETE'

Example:

export default defineTrigger({
  source: {
    type: 'd1',
    database: 'DB',
    tables: ['users', 'orders'],
    operations: ['INSERT', 'UPDATE'],
  },
  async handle(event, ctx) {
    const { table, operation, newRow, oldRow } = event
    await syncToAnalytics(table, operation, newRow)
  },
})

Event Properties:

Property    Type                            Description
database    string                          Database binding name
table       string                          Affected table name
operation   D1Operation                     INSERT, UPDATE, or DELETE
newRow      Record<string, unknown> | null  New row data (INSERT/UPDATE)
oldRow      Record<string, unknown> | null  Previous row data (UPDATE/DELETE)
primaryKey  unknown                         Primary key value
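
Since UPDATE events carry both `oldRow` and `newRow`, a handler can work out which columns actually changed before doing downstream work. The sketch below is one way to do that; `Row` and `changedColumns` are hypothetical names, and values are compared with `!==`, which is a shallow comparison and an assumption about what counts as a change.

```typescript
type Row = Record<string, unknown>

// Return the sorted names of columns whose values differ between the two rows.
// A null row (INSERT has no oldRow, DELETE has no newRow) is treated as empty.
function changedColumns(oldRow: Row | null, newRow: Row | null): string[] {
  const before: Row = oldRow ?? {}
  const after: Row = newRow ?? {}
  const columns = new Set<string>(Object.keys(before).concat(Object.keys(after)))
  return Array.from(columns)
    .filter((column) => before[column] !== after[column])
    .sort()
}

const oldRow: Row = { id: 1, email: 'ann@old.example', name: 'Ann' }
const newRow: Row = { id: 1, email: 'ann@new.example', name: 'Ann' }
console.log(changedColumns(oldRow, newRow)) // [ 'email' ]
```

A handler could then skip the analytics sync when the returned list is empty or contains only columns it does not care about.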

TailSource configures log consumption from other Workers.

interface TailSource {
  type: 'tail'
  workers: string[] // Worker names to consume logs from
  filters?: TailFilter[] // Optional log filters
}

interface TailFilter {
  field: 'outcome' | 'status' | 'method' | 'samplingRate'
  value: string | number
  operator?: 'equals' | 'contains' | 'startsWith'
}

Example:

export default defineTrigger({
  source: {
    type: 'tail',
    workers: ['api-worker', 'auth-worker'],
    filters: [
      { field: 'outcome', value: 'exception' },
    ],
  },
  async handle(event, ctx) {
    for (const log of event.logs) {
      if (log.outcome === 'exception') {
        await alertOnCall(log)
      }
    }
  },
})

Event Properties:

Property  Type       Description
logs      TailLog[]  Array of log entries
worker    string     Source worker name
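
The semantics of `TailFilter` can be illustrated with a small evaluator. This is a sketch under several assumptions that the reference does not confirm: an omitted `operator` behaves like `'equals'`, values are compared as strings, and a log must pass every filter in the list. `LogFields`, `matchesFilter`, and `matchesAllFilters` are hypothetical names, and `LogFields` stands in for the undocumented `TailLog` type.

```typescript
interface TailFilter {
  field: 'outcome' | 'status' | 'method' | 'samplingRate'
  value: string | number
  operator?: 'equals' | 'contains' | 'startsWith'
}

// Hypothetical stand-in for the filterable fields of a log entry.
type LogFields = Partial<Record<TailFilter['field'], string | number>>

// Evaluate one filter; a missing field never matches.
function matchesFilter(log: LogFields, filter: TailFilter): boolean {
  const actual = log[filter.field]
  if (actual === undefined) return false
  const left = String(actual)
  const right = String(filter.value)
  const operator = filter.operator ?? 'equals' // assumed default
  if (operator === 'contains') return left.includes(right)
  if (operator === 'startsWith') return left.startsWith(right)
  return left === right
}

// Assumption: filters are combined with AND.
function matchesAllFilters(log: LogFields, filters: TailFilter[]): boolean {
  return filters.every((filter) => matchesFilter(log, filter))
}

const filters: TailFilter[] = [
  { field: 'outcome', value: 'exception' },
  { field: 'status', value: 5, operator: 'startsWith' },
]
console.log(matchesAllFilters({ outcome: 'exception', status: 500 }, filters)) // true
console.log(matchesAllFilters({ outcome: 'ok', status: 500 }, filters)) // false
```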

TriggerContext is the context object passed to all trigger handlers.

interface TriggerContext {
  env: Env // Environment bindings
  traceId: string // Unique trace ID
  attempt: number // Current retry attempt (1-based)
  waitUntil(promise: Promise<unknown>): void // Extend execution lifetime
  emit(triggerName: string, payload: unknown): Promise<void> // Chain triggers
  noRetry(): void // Prevent automatic retry
}

ctx.waitUntil() extends the handler's lifetime for background work.

async handle(event, ctx) {
  await criticalWork()
  // Non-critical work may keep running after the handler returns
  ctx.waitUntil(sendAnalytics(event))
  ctx.waitUntil(updateMetrics())
}

ctx.emit() chains to another trigger, preserving the trace ID.

async handle(event, ctx) {
  await processOrder(event.payload)
  // Trigger downstream processing
  await ctx.emit('send-confirmation', {
    orderId: event.payload.orderId,
  })
}

ctx.noRetry() prevents automatic retry on failure.

async handle(event, ctx) {
  try {
    await processEvent(event)
  } catch (error) {
    if (isPermanentError(error)) {
      ctx.noRetry() // Don't retry permanent failures
      await logPermanentFailure(error)
    }
    throw error
  }
}

TriggerConfig holds optional configuration for trigger behavior.

interface TriggerConfig {
  retries?: number // Max retry attempts (default: 3)
  retryDelay?: string // Delay between retries (default: '1m')
  timeout?: string // Handler timeout (default: '30s')
  observability?: {
    metrics?: boolean // Enable metrics (default: true)
    tracing?: boolean // Enable tracing (default: true)
  }
}

Example:

export default defineTrigger({
  source: { type: 'scheduled', cron: '0 * * * *' },
  config: {
    retries: 5,
    retryDelay: '2m',
    timeout: '5m',
    observability: {
      metrics: true,
      tracing: true,
    },
  },
  async handle(event, ctx) {
    // Handler with custom retry behavior
  },
})
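
The `retryDelay` and `timeout` values are duration strings such as `'30s'`, `'2m'`, and `'5m'`. The reference does not define the full grammar, so the sketch below assumes the simplest reading: a whole number followed by a single unit letter. The `h` unit and the names `parseDuration` and `UNIT_MS` are assumptions for illustration, not part of the package.

```typescript
type Unit = 's' | 'm' | 'h'

// Milliseconds per unit; 'h' is an assumed extension of the documented examples.
const UNIT_MS: Record<Unit, number> = { s: 1000, m: 60000, h: 3600000 }

// Convert a duration string such as '30s' or '5m' to milliseconds.
function parseDuration(input: string): number {
  const match = /^(\d+)(s|m|h)$/.exec(input)
  if (match === null) {
    throw new Error(`Unsupported duration: ${input}`)
  }
  const amount = Number(match[1])
  const unit = match[2] as Unit
  return amount * UNIT_MS[unit]
}

console.log(parseDuration('30s')) // 30000
console.log(parseDuration('2m')) // 120000
console.log(parseDuration('5m')) // 300000
```

A helper like this can be handy in tests that assert a configured timeout stays below the plan limit.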

Built-in signature verifiers are available for popular services.

import { stripeVerifier } from '@cloudwerk/trigger/verifiers'

stripeVerifier({
  secret: string, // Webhook signing secret
  tolerance?: number, // Timestamp tolerance in seconds (default: 300)
})

import { githubVerifier } from '@cloudwerk/trigger/verifiers'

githubVerifier({
  secret: string, // Webhook secret
  events?: string[], // Allowed event types (optional)
})

import { slackVerifier } from '@cloudwerk/trigger/verifiers'

slackVerifier({
  signingSecret: string, // Slack signing secret
  tolerance?: number, // Timestamp tolerance in seconds (default: 300)
})

import { shopifyVerifier } from '@cloudwerk/trigger/verifiers'

shopifyVerifier({
  secret: string, // Webhook secret
})

import { linearVerifier } from '@cloudwerk/trigger/verifiers'

linearVerifier({
  secret: string, // Webhook signing secret
})

customHmacVerifier() creates a custom HMAC verifier for any service.

import { customHmacVerifier } from '@cloudwerk/trigger/verifiers'

customHmacVerifier({
  secret: string,
  algorithm: 'sha256' | 'sha1' | 'sha512',
  header: string, // Header containing signature
  prefix?: string, // Signature prefix (e.g., 'sha256=')
  encoding?: 'hex' | 'base64', // Signature encoding (default: 'hex')
  getPayload?: (request: Request) => Promise<string>, // Custom payload extraction
})

Example:

const myVerifier = customHmacVerifier({
  secret: process.env.WEBHOOK_SECRET,
  algorithm: 'sha256',
  header: 'X-Custom-Signature',
  prefix: 'sha256=',
})
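
For readers unfamiliar with HMAC signatures, the following sketch shows the general technique that options like `algorithm`, `prefix`, and `encoding` describe. It is not the package's implementation: `HmacOptions`, `sign`, and `verify` are hypothetical names, and it uses Node's `node:crypto` module for brevity (in a Workers runtime the Web Crypto API would be the more natural choice).

```typescript
import { Buffer } from 'node:buffer'
import { createHmac, timingSafeEqual } from 'node:crypto'

// Hypothetical options mirroring the documented verifier fields.
interface HmacOptions {
  secret: string
  algorithm: 'sha256' | 'sha1' | 'sha512'
  prefix?: string
  encoding?: 'hex' | 'base64'
}

// Compute the signature a sender would place in the signature header.
function sign(body: string, options: HmacOptions): string {
  const digest = createHmac(options.algorithm, options.secret)
    .update(body)
    .digest(options.encoding ?? 'hex')
  return (options.prefix ?? '') + digest
}

// Recompute the signature over the raw body and compare in constant time.
function verify(body: string, signatureHeader: string, options: HmacOptions): boolean {
  const expected = Buffer.from(sign(body, options))
  const received = Buffer.from(signatureHeader)
  // timingSafeEqual throws on length mismatch, so compare lengths first.
  return expected.length === received.length && timingSafeEqual(expected, received)
}

const options: HmacOptions = { secret: 'test-secret', algorithm: 'sha256', prefix: 'sha256=' }
const body = JSON.stringify({ orderId: 'order_1' })
const header = sign(body, options)
console.log(verify(body, header, options)) // true
console.log(verify(body + ' ', header, options)) // false
```

The signature must be computed over the exact raw request body; re-serializing parsed JSON can change whitespace or key order and cause a mismatch, which is presumably why the verifier exposes `getPayload`.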

The testing entry point provides mock event factories for testing triggers.

import {
  mockScheduledEvent,
  mockQueueEvent,
  mockR2Event,
  mockWebhookEvent,
  mockEmailEvent,
  mockD1Event,
  mockTailEvent,
  mockTriggerContext,
} from '@cloudwerk/trigger/testing'

const scheduledEvent = mockScheduledEvent({
  cron: '0 * * * *',
  scheduledTime: new Date('2024-01-15T10:00:00Z'),
})

const queueEvent = mockQueueEvent({
  queue: 'notifications',
  messages: [
    { id: '1', body: { userId: 'user_123' }, attempts: 1 },
    { id: '2', body: { userId: 'user_456' }, attempts: 1 },
  ],
})

const ctx = mockTriggerContext({
  env: mockEnv, // Your own object of test bindings (not shown)
  traceId: 'test-trace-123',
})
import { describe, it, expect, vi } from 'vitest'
import { mockScheduledEvent, mockTriggerContext } from '@cloudwerk/trigger/testing'
import trigger from './daily-cleanup'

describe('daily-cleanup trigger', () => {
  it('should clean up expired records', async () => {
    const event = mockScheduledEvent({ cron: '0 0 * * *' })
    const ctx = mockTriggerContext({
      // mockD1Database is your own helper (not shown); its prepare must be a vi.fn() spy
      env: { DB: mockD1Database() },
    })
    await trigger.handle(event, ctx)
    expect(ctx.env.DB.prepare).toHaveBeenCalledWith(
      expect.stringContaining('DELETE FROM')
    )
  })
})

TriggerError is the base error class for trigger failures.

import { TriggerError } from '@cloudwerk/trigger'

class TriggerError extends Error {
  readonly code: string
  readonly retryable: boolean
  readonly context?: Record<string, unknown>
}
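
The `retryable` flag pairs naturally with `ctx.noRetry()`. The sketch below shows one way to connect them. It defines a local stand-in for `TriggerError` because the reference lists only the fields, not the constructor, so the constructor signature here is an assumption; `RetryControl` and `runWithRetryPolicy` are hypothetical names.

```typescript
// Local stand-in mirroring the documented fields; the constructor is assumed.
class TriggerError extends Error {
  readonly code: string
  readonly retryable: boolean
  readonly context?: Record<string, unknown>

  constructor(
    message: string,
    options: { code: string; retryable: boolean; context?: Record<string, unknown> },
  ) {
    super(message)
    // Keep instanceof working even when compiled to older targets.
    Object.setPrototypeOf(this, new.target.prototype)
    this.name = 'TriggerError'
    this.code = options.code
    this.retryable = options.retryable
    this.context = options.context
  }
}

// The only part of TriggerContext this sketch needs.
interface RetryControl {
  noRetry(): void
}

// Run the work; on a non-retryable TriggerError, opt out of retries, then re-throw.
async function runWithRetryPolicy(ctx: RetryControl, work: () => Promise<void>): Promise<void> {
  try {
    await work()
  } catch (error) {
    if (error instanceof TriggerError && !error.retryable) {
      ctx.noRetry()
    }
    throw error
  }
}
```

Inside a handler this would wrap the main work, for example `await runWithRetryPolicy(ctx, () => processEvent(event))`, so that validation-style failures are not retried while transient ones still are.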

WebhookVerificationError is thrown when webhook signature verification fails.

import { WebhookVerificationError } from '@cloudwerk/trigger'

try {
  await verifySignature(request)
} catch (error) {
  if (error instanceof WebhookVerificationError) {
    console.error('Invalid signature:', error.message)
    return new Response('Unauthorized', { status: 401 })
  }
  throw error // Re-throw anything that is not a verification failure
}

TriggerTimeoutError is thrown when the handler exceeds its configured timeout.

import { TriggerTimeoutError } from '@cloudwerk/trigger'

cloudwerk triggers list
cloudwerk triggers info <name>
cloudwerk triggers test <name> --event '{"key": "value"}'
cloudwerk triggers generate-types

// Trigger definition
interface TriggerDefinition {
  source: TriggerSource
  handle: TriggerHandler
  config?: TriggerConfig
}

// Handler signature
type TriggerHandler<E = unknown> = (
  event: E,
  ctx: TriggerContext
) => Awaitable<void>

// Awaitable helper
type Awaitable<T> = T | Promise<T>

// Union of all source types
type TriggerSource =
  | ScheduledSource
  | QueueSource
  | R2Source
  | WebhookSource
  | EmailSource
  | D1Source
  | TailSource

// Union of all event types
type TriggerEvent =
  | ScheduledEvent
  | QueueEvent
  | R2Event
  | WebhookEvent
  | EmailEvent
  | D1Event
  | TailEvent

Limit                             Value
Maximum cron triggers per Worker  3
Minimum cron interval             1 minute
Maximum handler timeout (free)    30 seconds
Maximum handler timeout (paid)    15 minutes
Maximum queue batch size          100 messages
Maximum webhook payload           100 MB