Triggers API
The @cloudwerk/trigger package provides a unified API for handling all Cloudflare event types through the defineTrigger() factory function.
Installation
Section titled “Installation”pnpm add @cloudwerk/triggerdefineTrigger()
Section titled “defineTrigger()”Creates a trigger handler for a specific event source.
import { defineTrigger } from '@cloudwerk/trigger'
export default defineTrigger({ source: TriggerSource, handle: (event: TriggerEvent, ctx: TriggerContext) => Awaitable<void>, config?: TriggerConfig,})Parameters
Section titled “Parameters”| Parameter | Type | Description |
|---|---|---|
source | TriggerSource | Event source configuration (see source types below) |
handle | Function | Async handler function receiving event and context |
config | TriggerConfig | Optional configuration (retries, timeout, etc.) |
Returns
Section titled “Returns”Returns a TriggerDefinition object that Cloudwerk registers automatically.
Trigger Source Types
Section titled “Trigger Source Types”ScheduledSource
Section titled “ScheduledSource”Cron-based scheduled execution.
interface ScheduledSource { type: 'scheduled' cron: string // Cron expression timezone?: string // IANA timezone (default: 'UTC')}Example:
export default defineTrigger({ source: { type: 'scheduled', cron: '0 9 * * 1-5', // Weekdays at 9 AM timezone: 'America/New_York', }, async handle(event, ctx) { console.log(`Scheduled at: ${event.scheduledTime}`) },})Event Properties:
| Property | Type | Description |
|---|---|---|
cron | string | The cron expression that triggered the event |
scheduledTime | Date | Scheduled execution time |
QueueSource
Section titled “QueueSource”Queue message consumption.
interface QueueSource { type: 'queue' name: string // Queue name batchSize?: number // Max messages per batch (1-100, default: 10) batchTimeout?: string // Max wait time ('5s', '1m', etc.) retryDelay?: string // Delay between retries (default: '1m') deadLetterQueue?: string // DLQ for failed messages}Example:
export default defineTrigger({ source: { type: 'queue', name: 'notifications', batchSize: 25, deadLetterQueue: 'notifications-dlq', }, async handle(event, ctx) { for (const message of event.messages) { await processMessage(message.body) message.ack() } },})Event Properties:
| Property | Type | Description |
|---|---|---|
queue | string | Queue name |
messages | QueueMessage[] | Array of messages |
QueueMessage:
| Property | Type | Description |
|---|---|---|
id | string | Unique message ID |
body | unknown | Message payload |
timestamp | Date | When message was sent |
attempts | number | Delivery attempt count |
ack() | Function | Acknowledge successful processing |
retry(options?) | Function | Request retry with optional delay |
deadLetter(reason?) | Function | Send to dead letter queue |
R2Source
Section titled “R2Source”R2 object storage events.
interface R2Source { type: 'r2' bucket: string // R2 bucket name events: R2EventType[] // Event types to listen for prefix?: string // Key prefix filter suffix?: string // Key suffix filter}
type R2EventType = | 'object:create' | 'object:delete' | 'object:create:put' | 'object:create:copy' | 'object:create:multipart'Example:
export default defineTrigger({ source: { type: 'r2', bucket: 'uploads', events: ['object:create'], prefix: 'images/', suffix: '.jpg', }, async handle(event, ctx) { const { key, size, etag } = event.object await generateThumbnail(key) },})Event Properties:
| Property | Type | Description |
|---|---|---|
bucket | string | Bucket name |
action | string | Event action (‘create’ or ‘delete’) |
object | R2EventObject | Object metadata |
R2EventObject:
| Property | Type | Description |
|---|---|---|
key | string | Object key |
size | number | Object size in bytes |
etag | string | Object ETag |
uploadedAt | Date | Upload timestamp |
WebhookSource
Section titled “WebhookSource”HTTP webhook events with signature verification.
interface WebhookSource { type: 'webhook' path: string // Webhook endpoint path methods?: HttpMethod[] // Allowed HTTP methods (default: ['POST']) verifier?: WebhookVerifier // Built-in or custom verifier}
type HttpMethod = 'GET' | 'POST' | 'PUT' | 'PATCH' | 'DELETE'Example with Built-in Verifier:
import { stripeVerifier } from '@cloudwerk/trigger/verifiers'
export default defineTrigger({ source: { type: 'webhook', path: '/webhooks/stripe', verifier: stripeVerifier({ secret: process.env.STRIPE_WEBHOOK_SECRET, }), }, async handle(event, ctx) { const stripeEvent = event.payload if (stripeEvent.type === 'checkout.session.completed') { await fulfillOrder(stripeEvent.data.object) } },})Event Properties:
| Property | Type | Description |
|---|---|---|
request | Request | Original HTTP request |
payload | unknown | Parsed and verified payload |
headers | Headers | Request headers |
verified | boolean | Whether signature was verified |
EmailSource
Section titled “EmailSource”Incoming email events.
interface EmailSource { type: 'email' address?: string // Filter by recipient address domain?: string // Filter by recipient domain}Example:
export default defineTrigger({ source: { type: 'email', domain: 'support.myapp.com', }, async handle(event, ctx) { const { from, to, subject, text, html } = event await createSupportTicket({ from, subject, body: text || html }) },})Event Properties:
| Property | Type | Description |
|---|---|---|
from | string | Sender email address |
to | string | Recipient email address |
subject | string | Email subject |
text | string | null | Plain text body |
html | string | null | HTML body |
attachments | EmailAttachment[] | File attachments |
headers | Map<string, string> | Email headers |
raw | ReadableStream | Raw email stream |
D1Source
Section titled “D1Source”Database change events.
interface D1Source { type: 'd1' database: string // D1 database binding name tables?: string[] // Tables to watch (all if omitted) operations?: D1Operation[] // Operations to watch (all if omitted)}
type D1Operation = 'INSERT' | 'UPDATE' | 'DELETE'Example:
export default defineTrigger({ source: { type: 'd1', database: 'DB', tables: ['users', 'orders'], operations: ['INSERT', 'UPDATE'], }, async handle(event, ctx) { const { table, operation, newRow, oldRow } = event await syncToAnalytics(table, operation, newRow) },})Event Properties:
| Property | Type | Description |
|---|---|---|
database | string | Database binding name |
table | string | Affected table name |
operation | D1Operation | INSERT, UPDATE, or DELETE |
newRow | Record<string, unknown> | null | New row data (INSERT/UPDATE) |
oldRow | Record<string, unknown> | null | Previous row data (UPDATE/DELETE) |
primaryKey | unknown | Primary key value |
TailSource
Section titled “TailSource”Log consumption from other Workers.
interface TailSource { type: 'tail' workers: string[] // Worker names to consume logs from filters?: TailFilter[] // Optional log filters}
interface TailFilter { field: 'outcome' | 'status' | 'method' | 'samplingRate' value: string | number operator?: 'equals' | 'contains' | 'startsWith'}Example:
export default defineTrigger({ source: { type: 'tail', workers: ['api-worker', 'auth-worker'], filters: [ { field: 'outcome', value: 'exception' }, ], }, async handle(event, ctx) { for (const log of event.logs) { if (log.outcome === 'exception') { await alertOnCall(log) } } },})Event Properties:
| Property | Type | Description |
|---|---|---|
logs | TailLog[] | Array of log entries |
worker | string | Source worker name |
TriggerContext
Section titled “TriggerContext”The context object passed to all trigger handlers.
interface TriggerContext { env: Env // Environment bindings traceId: string // Unique trace ID attempt: number // Current retry attempt (1-based)
waitUntil(promise: Promise<unknown>): void // Extend execution lifetime emit(triggerName: string, payload: unknown): Promise<void> // Chain triggers noRetry(): void // Prevent automatic retry}Context Methods
Section titled “Context Methods”waitUntil()
Section titled “waitUntil()”Extend the handler’s lifetime for background work.
async handle(event, ctx) { await criticalWork()
// Non-critical work runs after handler completes ctx.waitUntil(sendAnalytics(event)) ctx.waitUntil(updateMetrics())}emit()
Section titled “emit()”Chain to another trigger, preserving trace ID.
async handle(event, ctx) { await processOrder(event.payload)
// Trigger downstream processing await ctx.emit('send-confirmation', { orderId: event.payload.orderId, })}noRetry()
Section titled “noRetry()”Prevent automatic retry on failure.
async handle(event, ctx) { try { await processEvent(event) } catch (error) { if (isPermanentError(error)) { ctx.noRetry() // Don't retry permanent failures await logPermanentFailure(error) } throw error }}TriggerConfig
Section titled “TriggerConfig”Optional configuration for trigger behavior.
interface TriggerConfig { retries?: number // Max retry attempts (default: 3) retryDelay?: string // Delay between retries (default: '1m') timeout?: string // Handler timeout (default: '30s') observability?: { metrics?: boolean // Enable metrics (default: true) tracing?: boolean // Enable tracing (default: true) }}Example:
export default defineTrigger({ source: { type: 'scheduled', cron: '0 * * * *' }, config: { retries: 5, retryDelay: '2m', timeout: '5m', observability: { metrics: true, tracing: true, }, }, async handle(event, ctx) { // Handler with custom retry behavior },})Webhook Verifiers
Section titled “Webhook Verifiers”Built-in signature verifiers for popular services.
stripeVerifier
Section titled “stripeVerifier”import { stripeVerifier } from '@cloudwerk/trigger/verifiers'
stripeVerifier({ secret: string, // Webhook signing secret tolerance?: number, // Timestamp tolerance in seconds (default: 300)})githubVerifier
Section titled “githubVerifier”import { githubVerifier } from '@cloudwerk/trigger/verifiers'
githubVerifier({ secret: string, // Webhook secret events?: string[], // Allowed event types (optional)})slackVerifier
Section titled “slackVerifier”import { slackVerifier } from '@cloudwerk/trigger/verifiers'
slackVerifier({ signingSecret: string, // Slack signing secret tolerance?: number, // Timestamp tolerance (default: 300)})shopifyVerifier
Section titled “shopifyVerifier”import { shopifyVerifier } from '@cloudwerk/trigger/verifiers'
shopifyVerifier({ secret: string, // Webhook secret})linearVerifier
Section titled “linearVerifier”import { linearVerifier } from '@cloudwerk/trigger/verifiers'
linearVerifier({ secret: string, // Webhook signing secret})customHmacVerifier
Section titled “customHmacVerifier”Create a custom HMAC verifier for any service.
import { customHmacVerifier } from '@cloudwerk/trigger/verifiers'
customHmacVerifier({ secret: string, algorithm: 'sha256' | 'sha1' | 'sha512', header: string, // Header containing signature prefix?: string, // Signature prefix (e.g., 'sha256=') encoding?: 'hex' | 'base64', // Signature encoding (default: 'hex') getPayload?: (request: Request) => Promise<string>, // Custom payload extraction})Example:
const myVerifier = customHmacVerifier({ secret: process.env.WEBHOOK_SECRET, algorithm: 'sha256', header: 'X-Custom-Signature', prefix: 'sha256=',})Testing Utilities
Section titled “Testing Utilities”Mock event factories for testing triggers.
import { mockScheduledEvent, mockQueueEvent, mockR2Event, mockWebhookEvent, mockEmailEvent, mockD1Event, mockTailEvent, mockTriggerContext,} from '@cloudwerk/trigger/testing'mockScheduledEvent
Section titled “mockScheduledEvent”const event = mockScheduledEvent({ cron: '0 * * * *', scheduledTime: new Date('2024-01-15T10:00:00Z'),})mockQueueEvent
Section titled “mockQueueEvent”const event = mockQueueEvent({ queue: 'notifications', messages: [ { id: '1', body: { userId: 'user_123' }, attempts: 1 }, { id: '2', body: { userId: 'user_456' }, attempts: 1 }, ],})mockTriggerContext
Section titled “mockTriggerContext”const ctx = mockTriggerContext({ env: mockEnv, traceId: 'test-trace-123',})Testing Example
Section titled “Testing Example”import { describe, it, expect, vi } from 'vitest'import { mockScheduledEvent, mockTriggerContext } from '@cloudwerk/trigger/testing'import trigger from './daily-cleanup'
describe('daily-cleanup trigger', () => { it('should clean up expired records', async () => { const event = mockScheduledEvent({ cron: '0 0 * * *' }) const ctx = mockTriggerContext({ env: { DB: mockD1Database() }, })
await trigger.handle(event, ctx)
expect(ctx.env.DB.prepare).toHaveBeenCalledWith( expect.stringContaining('DELETE FROM') ) })})Error Handling
Section titled “Error Handling”TriggerError
Section titled “TriggerError”Base error class for trigger failures.
import { TriggerError } from '@cloudwerk/trigger'
class TriggerError extends Error { readonly code: string readonly retryable: boolean readonly context?: Record<string, unknown>}WebhookVerificationError
Section titled “WebhookVerificationError”Thrown when webhook signature verification fails.
import { WebhookVerificationError } from '@cloudwerk/trigger'
try { await verifySignature(request)} catch (error) { if (error instanceof WebhookVerificationError) { console.error('Invalid signature:', error.message) return new Response('Unauthorized', { status: 401 }) }}TriggerTimeoutError
Section titled “TriggerTimeoutError”Thrown when handler exceeds configured timeout.
import { TriggerTimeoutError } from '@cloudwerk/trigger'CLI Commands
Section titled “CLI Commands”List Triggers
Section titled “List Triggers”cloudwerk triggers listShow Trigger Details
Section titled “Show Trigger Details”cloudwerk triggers info <name>Test Trigger Locally
Section titled “Test Trigger Locally”cloudwerk triggers test <name> --event '{"key": "value"}'Generate Types
Section titled “Generate Types”cloudwerk triggers generate-typesType Definitions
Section titled “Type Definitions”Full Type Reference
Section titled “Full Type Reference”// Trigger definitioninterface TriggerDefinition { source: TriggerSource handle: TriggerHandler config?: TriggerConfig}
// Handler signaturetype TriggerHandler<E = unknown> = ( event: E, ctx: TriggerContext) => Awaitable<void>
// Awaitable helpertype Awaitable<T> = T | Promise<T>
// Union of all source typestype TriggerSource = | ScheduledSource | QueueSource | R2Source | WebhookSource | EmailSource | D1Source | TailSource
// Union of all event typestype TriggerEvent = | ScheduledEvent | QueueEvent | R2Event | WebhookEvent | EmailEvent | D1Event | TailEventLimits
Section titled “Limits”| Limit | Value |
|---|---|
| Maximum cron triggers per Worker | 3 |
| Minimum cron interval | 1 minute |
| Maximum handler timeout (free) | 30 seconds |
| Maximum handler timeout (paid) | 15 minutes |
| Maximum queue batch size | 100 messages |
| Maximum webhook payload | 100 MB |
Next Steps
Section titled “Next Steps”- Triggers Guide - Patterns and best practices
- Queues Guide - Queue consumer patterns
- Services API - Service extraction