Ablauf

Functional API (Recommended)

Learn how to define workflows using the functional API with defineWorkflow() for less boilerplate and better type inference.

The functional API using defineWorkflow() is the recommended way to define workflows in Ablauf. It provides excellent type inference, minimal boilerplate, and a clean declarative style.

Why functional? Types are automatically inferred from your schemas, and you get the same power as the class-based API with less code to maintain.

Basic Example

Here's a simple workflow that processes an order with validation, a cool-down period, and shipping:

import { defineWorkflow } from '@der-ablauf/workflows';

const OrderWorkflow = defineWorkflow((t) => ({
	type: 'process-order',
	input: t.object({
		orderId: t.string(),
		items: t.array(
			t.object({
				name: t.string(),
				quantity: t.number(),
			}),
		),
	}),
	run: async (step, payload) => {
		const validated = await step.do('validate-order', async () => {
			return { valid: true, itemCount: payload.items.length };
		});

		await step.sleep('cool-down', '5s');

		const shipped = await step.do('ship-order', async () => {
			return { trackingId: `TRK-${payload.orderId}` };
		});

		return { validated, shipped };
	},
}));

Notice how payload is automatically typed based on your input schema. No manual type annotations needed.

Configuration Fields

type (required)

A unique string identifier for your workflow. This is used for routing, listing, and querying workflows.

type: 'process-order',

Workflow types must be unique across your application. Think of them like route paths.
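To illustrate why uniqueness matters when type is used as a lookup key, here is a minimal sketch of a registry that rejects duplicates. WorkflowRegistry, register, and resolve are hypothetical names made up for this example; they are not part of Ablauf, and the library may enforce uniqueness differently.

```typescript
// Hypothetical registry, not part of Ablauf: it only illustrates why
// `type` has to be unique when it is used as a lookup key.
type WorkflowDefinition = { type: string };

class WorkflowRegistry {
	private readonly byType = new Map<string, WorkflowDefinition>();

	register(definition: WorkflowDefinition): void {
		if (this.byType.has(definition.type)) {
			throw new Error(`Duplicate workflow type: ${definition.type}`);
		}
		this.byType.set(definition.type, definition);
	}

	resolve(type: string): WorkflowDefinition | undefined {
		return this.byType.get(type);
	}
}

const registry = new WorkflowRegistry();
registry.register({ type: 'process-order' });
registry.register({ type: 'approval' });
```

Registering a second definition with the type 'approval' would throw in this sketch, much like two handlers claiming the same route path.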

input (required)

A Zod schema that validates the workflow payload. The type of payload in your run function is automatically inferred from this schema.

input: t.object({
	orderId: t.string(),
	customerId: t.string().uuid(),
	amount: t.number().positive(),
}),
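For reference, the payload type inferred from a schema like the one above should be equivalent to the hand-written type below. OrderPayload and examplePayload are illustrative names, not something the library exports.

```typescript
// Hand-written equivalent of the payload type inferred from the schema above.
// `.uuid()` and `.positive()` are runtime checks, so they do not narrow the
// static type beyond `string` and `number`.
type OrderPayload = {
	orderId: string;
	customerId: string;
	amount: number;
};

// A value that fits the static type; the UUID and amount are made-up samples.
const examplePayload: OrderPayload = {
	orderId: 'order-123',
	customerId: '3f2b8c1e-4d5a-4f6b-8a7c-9d0e1f2a3b4c',
	amount: 49.99,
};
```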

run (required)

The main workflow function. It receives three parameters:

  • step - The step context for executing durable steps
  • payload - The validated input (typed from your schema)
  • sse - Server-sent events broadcaster for real-time updates

run: async (step, payload, sse) => {
	// Your workflow logic here
	return { success: true };
},
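One way to picture a durable step is as a named unit of work whose result is stored and reused on replay. The sketch below assumes that model; MemoStepContext is a hypothetical in-memory stand-in and says nothing about how Ablauf actually persists step results.

```typescript
// Hypothetical in-memory stand-in for a durable step context. A real engine
// would persist results; a Map is used here only to show the replay idea.
class MemoStepContext {
	private readonly results = new Map<string, unknown>();

	async do<T>(name: string, fn: () => Promise<T>): Promise<T> {
		if (this.results.has(name)) {
			// Replay: return the stored result without running `fn` again.
			return this.results.get(name) as T;
		}
		const result = await fn();
		this.results.set(name, result);
		return result;
	}
}

let executions = 0;

async function runOnce(step: MemoStepContext): Promise<{ valid: boolean }> {
	return step.do('validate-order', async () => {
		executions += 1;
		return { valid: true };
	});
}

// Running the same workflow body twice executes the step callback only once.
const memoStep = new MemoStepContext();
const replayDemo = runOnce(memoStep).then(() => runOnce(memoStep));
```

Under this assumption, step names act as cache keys, which is why each step in a workflow gets its own descriptive name.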

events (optional)

A record mapping event names to Zod schemas. This makes step.waitForEvent() fully typed.

events: {
	approved: t.object({ by: t.string() }),
	rejected: t.object({ reason: t.string() }),
}
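The typing benefit can be sketched with plain TypeScript generics: the event name passed in selects the payload type that comes back. EventInbox and deliver are hypothetical stand-ins written for this example, and ApprovalEvents is a hand-written version of what the schemas above would infer.

```typescript
// Hand-written equivalent of the types inferred from the `events` record.
type ApprovalEvents = {
	approved: { by: string };
	rejected: { reason: string };
};

// Hypothetical stand-in, not Ablauf's implementation: it shows how the event
// name can select the payload type via `Events[K]`.
class EventInbox<Events extends Record<string, unknown>> {
	private readonly waiters = new Map<keyof Events, (payload: unknown) => void>();

	waitForEvent<K extends keyof Events>(name: K): Promise<Events[K]> {
		return new Promise<Events[K]>((resolve) => {
			this.waiters.set(name, (payload) => resolve(payload as Events[K]));
		});
	}

	deliver<K extends keyof Events>(name: K, payload: Events[K]): void {
		this.waiters.get(name)?.(payload);
		this.waiters.delete(name);
	}
}

const inbox = new EventInbox<ApprovalEvents>();
// `pendingApproval` is typed as Promise<{ by: string }>.
const pendingApproval = inbox.waitForEvent('approved');
inbox.deliver('approved', { by: 'alice' });
```

In this sketch, calling inbox.waitForEvent('cancelled') or delivering { reason: '...' } to 'approved' would fail to compile, which is the kind of safety the events record is meant to provide.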

defaults (optional)

Default retry configuration applied to all steps in the workflow. Individual steps can override these settings.

defaults: {
	retries: {
		limit: 5,
		delay: "2s",
		backoff: "exponential",
	},
}
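To make the effect of these settings concrete, here is a rough sketch of how a retry schedule could be derived from them. It rests on two assumptions that the text above does not confirm: a step-level override replaces only the fields it specifies, and exponential backoff doubles the delay after each failed attempt. RetryConfig, resolveRetries, and retryDelays are hypothetical helpers, and the delay is given in milliseconds (2000 for "2s") rather than as a duration string.

```typescript
// Shape modeled on the `defaults.retries` example; delays are expressed in
// milliseconds here to avoid guessing at the duration-string parser.
type RetryConfig = { limit: number; delayMs: number; backoff: 'exponential' };

const workflowDefaults: RetryConfig = { limit: 5, delayMs: 2000, backoff: 'exponential' };

// Assumption: a step-level override replaces only the fields it specifies.
function resolveRetries(defaults: RetryConfig, override: Partial<RetryConfig> = {}): RetryConfig {
	return { ...defaults, ...override };
}

// Assumption: exponential backoff doubles the delay after every failed attempt.
function retryDelays(config: RetryConfig): number[] {
	return Array.from({ length: config.limit }, (_, attempt) => config.delayMs * 2 ** attempt);
}

const defaultSchedule = retryDelays(resolveRetries(workflowDefaults));
// → [2000, 4000, 8000, 16000, 32000]

const overriddenSchedule = retryDelays(resolveRetries(workflowDefaults, { limit: 2 }));
// → [2000, 4000]
```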

resultSizeLimit (optional)

Cumulative memory budget for all step results in a workflow. It guards against out-of-memory (OOM) failures by failing a step before the total serialized result size exceeds the limit.

resultSizeLimit: {
	maxSize: '128mb',     // Default: '64mb'
	onOverflow: 'retry',  // Default: 'fail'
}

See Result Size Limits for details.
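The idea of a cumulative budget can be sketched as a running total that is checked before each result is stored. ResultBudget is a hypothetical class written for illustration; it measures size as the length of the JSON string, which is only an approximation of byte size, and it covers the 'fail' behavior only.

```typescript
// Hypothetical budget tracker, not Ablauf's implementation.
class ResultBudget {
	private readonly maxBytes: number;
	private usedBytes = 0;

	constructor(maxBytes: number) {
		this.maxBytes = maxBytes;
	}

	// Records a step result, or throws before the budget would be exceeded.
	record(stepName: string, result: unknown): void {
		// Approximation: JSON length in UTF-16 code units stands in for byte size.
		const size = JSON.stringify(result ?? null).length;
		if (this.usedBytes + size > this.maxBytes) {
			throw new Error(`Step "${stepName}" would exceed the result size limit`);
		}
		this.usedBytes += size;
	}

	get used(): number {
		return this.usedBytes;
	}
}

// A tiny 64-unit limit keeps the demo readable.
const budget = new ResultBudget(64);
budget.record('validate-order', { valid: true }); // serializes to 14 characters
```

Because the total is cumulative, a later step can fail even when its own result is small, if earlier steps have already used most of the budget.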

sseUpdates (optional)

A record of named Zod schemas defining the SSE updates your workflow can emit.

sseUpdates: {
	progress: t.object({ percent: t.number() }),
	complete: t.object({ result: t.string() }),
}

Advanced Example

Here's a more complex workflow demonstrating events, SSE updates, and retry configuration:

import { defineWorkflow } from '@der-ablauf/workflows';

const ApprovalWorkflow = defineWorkflow((t) => ({
	type: 'approval',
	input: t.object({
		requestId: t.string(),
		amount: t.number(),
	}),
	events: {
		decision: t.object({
			approved: t.boolean(),
			reviewer: t.string(),
		}),
	},
	sseUpdates: {
		progress: t.object({ message: t.string() }),
		complete: t.object({ approved: t.boolean() }),
	},
	defaults: {
		retries: {
			limit: 5,
			delay: '2s',
			backoff: 'exponential',
		},
	},
	run: async (step, payload, sse) => {
		sse.broadcast('progress', { message: 'Processing request...' });

		const check = await step.do('validate', async () => {
			return payload.amount < 10000;
		});

		if (!check) {
			sse.broadcast('progress', {
				message: 'Request requires approval...',
			});
		}

		const decision = await step.waitForEvent('decision', { timeout: '24h' });

		sse.emit('complete', { approved: decision.approved });

		return {
			approved: decision.approved,
			reviewer: decision.reviewer,
		};
	},
}));

Under the Hood

defineWorkflow() offers the same capabilities as the class-based API with less boilerplate and stronger type inference.

If a workflow grows complex enough to need more structure or additional methods, you can refactor it to the class-based API without changing its behavior.

Pro tip: Start with the functional API, and move to classes only when the extra structure is warranted.
