Server

Set up and use the Ablauf class to orchestrate workflows in your Cloudflare Worker.

The Ablauf class is the server-side API for creating, managing, and querying durable workflow instances. It connects to your Durable Object binding and provides type-safe methods for the full workflow lifecycle.

Installation

npm install @der-ablauf/workflows hono zod

Setup

Create an Ablauf instance with your Durable Object binding and register your workflows:

src/index.ts
import { Hono } from 'hono';
import { Ablauf } from '@der-ablauf/workflows';
import { env } from 'cloudflare:workers';
import { OrderWorkflow } from './workflows/order';
import { PaymentWorkflow } from './workflows/payment';

const ablauf = new Ablauf(env.WORKFLOW_RUNNER, {
	workflows: [OrderWorkflow, PaymentWorkflow],
});

const app = new Hono();

Exporting the Durable Object

Your worker must export the WorkflowRunner Durable Object class:

src/index.ts
export const WorkflowRunner = ablauf.createWorkflowRunner();
export default { fetch: app.fetch };

The export name WorkflowRunner must match the class_name in your wrangler.jsonc Durable Object binding.

Creating Workflows

Start a new workflow instance with create(). The payload is validated against the workflow's Zod schema:

app.post('/orders', async (c) => {
	const payload = await c.req.json();

	const stub = await ablauf.create(OrderWorkflow, {
		id: crypto.randomUUID(),
		payload,
	});

	const status = await stub.getStatus();
	return c.json(status);
});
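
Because create() validates the payload, a malformed request body fails at that call. The source does not say which error is raised, so the sketch below avoids the library entirely. It uses a hand-written type guard as a stand-in for the workflow's Zod schema, to show the validate-then-create shape. OrderPayload, its fields (orderId, amount), and parseOrderPayload are hypothetical names.

```typescript
// Hypothetical payload shape; a real workflow would declare this as a Zod schema.
interface OrderPayload {
	orderId: string;
	amount: number;
}

type ParseResult<T> = { ok: true; value: T } | { ok: false; error: string };

// Stand-in for schema validation: accepts unknown input and narrows it.
function parseOrderPayload(input: unknown): ParseResult<OrderPayload> {
	if (typeof input !== 'object' || input === null) {
		return { ok: false, error: 'payload must be an object' };
	}
	const record = input as Record<string, unknown>;
	const orderId = record['orderId'];
	const amount = record['amount'];
	if (typeof orderId !== 'string' || orderId.length === 0) {
		return { ok: false, error: 'orderId must be a non-empty string' };
	}
	if (typeof amount !== 'number' || !Number.isFinite(amount) || amount <= 0) {
		return { ok: false, error: 'amount must be a positive number' };
	}
	return { ok: true, value: { orderId, amount } };
}
```

In a route handler, a failed parse could be mapped to a 400 response before create() is ever reached.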

Getting a Workflow Handle

Use get() to obtain a handle for an existing workflow instance. The handle is how you interact with a workflow after creation: send events, check status, pause, resume, or terminate.

const order = ablauf.get(OrderWorkflow, { id: 'order-123' });

Sending Events

Send typed events to running workflows. The event name and payload are type-checked and validated:

const order = ablauf.get(OrderWorkflow, { id: 'order-123' });
await order.sendEvent({
	event: 'payment-received',
	payload: { amount: 99.99, transactionId: 'tx-456' },
});
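
To illustrate what "type-checked" can mean here, the following sketch ties each event name to its payload type with a mapped union. It is a standalone model, not Ablauf's actual typing. The OrderEvents map, the 'order-cancelled' event, and the local sendEvent function are hypothetical. Only the 'payment-received' payload mirrors the example above.

```typescript
// Hypothetical event map: each event name is tied to exactly one payload shape.
interface OrderEvents {
	'payment-received': { amount: number; transactionId: string };
	'order-cancelled': { reason: string };
}

// Distributes over the event names, producing one { event, payload } pair per name.
type EventMessage<Events, K extends keyof Events = keyof Events> = K extends keyof Events
	? { event: K; payload: Events[K] }
	: never;

// Local stand-in for a handle's sendEvent(); it only records what it was given.
const sent: EventMessage<OrderEvents>[] = [];
function sendEvent(message: EventMessage<OrderEvents>): void {
	sent.push(message);
}

sendEvent({
	event: 'payment-received',
	payload: { amount: 99.99, transactionId: 'tx-456' },
});

// The compiler rejects a payload that belongs to a different event:
// sendEvent({ event: 'payment-received', payload: { reason: 'changed my mind' } });
```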

Querying Status

Get the current status of any workflow instance:

const order = ablauf.get(OrderWorkflow, { id: 'order-123' });
const status = await order.getStatus();
// status.payload is typed as OrderPayload
// status.result is typed as OrderResult | null
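
Since result can be null, code that reads it needs to narrow first. The sketch below shows that narrowing on a stand-in type. OrderResult's total field and the StatusLike shape are assumptions, and the source does not say in which states result is null.

```typescript
// Hypothetical shapes standing in for the typed status object.
interface OrderResult {
	total: number;
}

interface StatusLike {
	result: OrderResult | null;
}

// Narrow the nullable result before touching its fields.
function describeResult(status: StatusLike): string {
	if (status.result === null) {
		return 'no result yet';
	}
	return `result total: ${status.result.total.toFixed(2)}`;
}
```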

Lifecycle Control

Pause, resume, or terminate workflows:

const order = ablauf.get(OrderWorkflow, { id: 'order-123' });
await order.pause(); // Finish current step, then suspend
await order.resume(); // Replay and continue from where it paused
await order.terminate(); // Permanently stop (cannot be resumed)
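
The comments above imply a small state machine: a paused workflow can resume, and a terminated one cannot. The sketch below encodes that reading as a transition table. It is a model for reasoning, not Ablauf's implementation. The status names ('running', 'paused', 'terminated') are assumptions and may not match what getStatus() reports, and allowing terminate from the paused state is also assumed.

```typescript
type Status = 'running' | 'paused' | 'terminated';
type Action = 'pause' | 'resume' | 'terminate';

// Allowed transitions; an action missing from a row is not permitted in that state.
const transitions: Record<Status, Partial<Record<Action, Status>>> = {
	running: { pause: 'paused', terminate: 'terminated' },
	paused: { resume: 'running', terminate: 'terminated' },
	terminated: {}, // final state: nothing leaves it
};

// Returns the next status, or null when the action is not allowed.
function nextStatus(status: Status, action: Action): Status | null {
	return transitions[status][action] ?? null;
}
```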

Waiting for SSE Updates

Wait for a specific real-time update from a workflow. This is useful for request-response patterns where you need to wait for a workflow to reach a certain point:

const order = ablauf.get(OrderWorkflow, { id: 'order-123' });
const progress = await order.waitForUpdate({
	update: 'progress',
	timeout: '30s',
});

Setting Up the Dashboard API

To enable the Dashboard, add the oRPC handlers to your worker using createHandlers():

const { rpcHandler, openApiHandler } = ablauf.createHandlers();

app.all('/__ablauf/*', async (c) => {
	const { matched, response } = await openApiHandler.handle(c.req.raw, {
		prefix: '/__ablauf',
		context: ablauf.getDashboardContext(),
	});
	if (matched) return c.newResponse(response.body, response);

	const { matched: matchedRpc, response: rpcResponse } = await rpcHandler.handle(c.req.raw, {
		prefix: '/__ablauf',
		context: ablauf.getDashboardContext(),
	});
	if (matchedRpc) return c.newResponse(rpcResponse.body, rpcResponse);

	return new Response('Not Found', { status: 404 });
});

For the full dashboard setup including installation, running, and configuration, see the Dashboard docs.

Configuration

AblaufConfig accepts the following options:

const ablauf = new Ablauf(env.WORKFLOW_RUNNER, {
	// Register workflows (with optional shard config)
	workflows: [OrderWorkflow, [HighVolumeWorkflow, { shards: 8 }]],
	// Observability: true (default), false, or a custom ObservabilityProvider
	observability: true,
	// CORS origins for the dashboard API
	corsOrigins: ['https://dashboard.example.com'],
});

The observability option accepts three modes:

  • true (default) — Uses the built-in ShardObservabilityProvider with shard-based Durable Object indexing.
  • false — Disables all observability. ablauf.list() throws ObservabilityReadNotConfiguredError.
  • Custom ObservabilityProvider — Plug in your own storage backend (see below).

See the API Reference for the complete method signatures and type definitions.
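
The source does not describe how workflow instances are assigned to index shards. A common approach for this kind of sharding is to hash the workflow ID and take the remainder modulo the shard count, which keeps the assignment deterministic. The sketch below shows that general technique with an FNV-1a-style hash over UTF-16 code units. The shardFor and fnv1a helpers are hypothetical and are not part of Ablauf.

```typescript
// FNV-1a-style 32-bit hash over the string's UTF-16 code units.
function fnv1a(input: string): number {
	let hash = 0x811c9dc5;
	for (let i = 0; i < input.length; i++) {
		hash ^= input.charCodeAt(i);
		hash = Math.imul(hash, 0x01000193) >>> 0;
	}
	return hash >>> 0;
}

// Maps a workflow ID to a shard index in the range [0, shards).
function shardFor(workflowId: string, shards: number): number {
	if (!Number.isInteger(shards) || shards < 1) {
		throw new RangeError('shards must be a positive integer');
	}
	return fnv1a(workflowId) % shards;
}
```

With a scheme like this, the same ID always lands on the same shard, and a higher shard count spreads index writes across more Durable Objects.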

Custom Observability Provider

By default, Ablauf uses shard-based indexing for workflow listing and dashboard data. You can replace this entirely with your own storage backend by providing a custom ObservabilityProvider:

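import { Ablauf } from '@der-ablauf/workflows';
import type { ObservabilityProvider } from '@der-ablauf/workflows';

// Collector shape matching what createCollector() returns below.
// The field types are assumptions; adjust them to your provider.
type MyCollector = { workflowId: string; type: string; events: unknown[] };

const myProvider: ObservabilityProvider<MyCollector> = {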
	createCollector(workflowId, type) {
		return { workflowId, type, events: [] };
	},
	onWorkflowStart(collector, event) { /* ... */ },
	onWorkflowStatusChange(collector, event) { /* ... */ },
	onStepStart(collector, event) { /* ... */ },
	onStepComplete(collector, event) { /* ... */ },
	onStepRetry(collector, event) { /* ... */ },
	async flush(collector, reason) {
		// Batch write all collected events to your database
		// (db is a placeholder for your own database client)
		await db.insert(collector.events);
	},
	// Optional: enable dashboard read features
	async listWorkflows(filters) { /* query your database */ },
	async getWorkflowStatus(id) { /* query your database */ },
	async getWorkflowTimeline(id) { /* query your database */ },
};

const ablauf = new Ablauf(env.WORKFLOW_RUNNER, {
	workflows: [OrderWorkflow],
	observability: myProvider,
});

If your custom provider only implements write methods (the lifecycle hooks), the dashboard list endpoint will throw ObservabilityReadNotConfiguredError. You can still use ablauf.get() to check individual workflows by ID.
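
One way to fill in the lifecycle hooks is to append a record to the collector in each hook and write everything in one batch from flush(). The sketch below is self-contained and does not import Ablauf. The event shapes, the CollectedEvent record, and the saveBatch sink are assumptions, and the reason argument of flush() is omitted for brevity.

```typescript
// Hypothetical record shape; Ablauf's real event types may differ.
interface CollectedEvent {
	kind: 'step-start' | 'step-complete';
	at: number;
	detail: string;
}

interface MyCollector {
	workflowId: string;
	type: string;
	events: CollectedEvent[];
}

// Stand-in for a database client: it just remembers each batch it was given.
const savedBatches: CollectedEvent[][] = [];
async function saveBatch(events: CollectedEvent[]): Promise<void> {
	savedBatches.push(events);
}

const bufferingHooks = {
	createCollector(workflowId: string, type: string): MyCollector {
		return { workflowId, type, events: [] };
	},
	// Hooks only buffer in memory; no I/O happens here.
	onStepStart(collector: MyCollector, event: { name: string }): void {
		collector.events.push({ kind: 'step-start', at: Date.now(), detail: event.name });
	},
	onStepComplete(collector: MyCollector, event: { name: string }): void {
		collector.events.push({ kind: 'step-complete', at: Date.now(), detail: event.name });
	},
	// Drain the buffer and write it as a single batch.
	async flush(collector: MyCollector): Promise<void> {
		if (collector.events.length === 0) return;
		const batch = collector.events.splice(0);
		await saveBatch(batch);
	},
};
```

Buffering keeps the hooks cheap and limits storage writes to one per flush, at the cost of losing buffered events if flush() is never reached.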

To customize the built-in shard provider directly (e.g., for custom shard counts), use ShardObservabilityProvider:

import { Ablauf, ShardObservabilityProvider } from '@der-ablauf/workflows';

const ablauf = new Ablauf(env.WORKFLOW_RUNNER, {
	workflows: [OrderWorkflow],
	observability: new ShardObservabilityProvider(env.WORKFLOW_RUNNER, {
		shards: { 'order-processing': { shards: 8 } },
		workflowTypes: ['order-processing'],
	}),
});
