Ablauf

Class-based API

Learn how to define workflows by extending BaseWorkflow for more structure and control.

The class-based API allows you to define workflows by extending BaseWorkflow. This approach is great if you prefer object-oriented patterns, need additional helper methods, or want more explicit type control.

Most users should start with the functional API for less boilerplate. Use classes when you need additional structure or methods.

Basic Example

Here's a workflow that handles user onboarding with email verification:

import { BaseWorkflow, t } from '@der-ablauf/workflows';
import type { Step, SSE } from '@der-ablauf/workflows';
import type { z } from 'zod';

const inputSchema = t.object({
	name: t.string(),
	email: t.string().email(),
});
type Payload = z.infer<typeof inputSchema>;

const eventSchemas = {
	verification: t.object({ code: t.string() }),
};
type Events = { verification: { code: string } };

interface Result {
	verified: boolean;
	welcomeMessage: string;
}

export class OnboardingWorkflow extends BaseWorkflow<Payload, Result, Events> {
	static type = 'onboarding' as const;
	static inputSchema = inputSchema;
	static events = eventSchemas;
	static defaults = {
		retries: { limit: 3, delay: '1s', backoff: 'exponential' as const },
	};

	async run(step: Step<Events>, payload: Payload, _sse: SSE<never>): Promise<Result> {
		await step.do('send-verification', async () => {
			// Send verification email
			console.log(`Sending verification to ${payload.email}`);
		});

		const verification = await step.waitForEvent('verification', {
			timeout: '1h',
		});

		const verified = verification.code === 'VALID';

		const welcomeMessage = await step.do('create-welcome', async () => {
			return `Welcome, ${payload.name}!`;
		});

		return { verified, welcomeMessage };
	}
}

Schemas use the t namespace from @der-ablauf/workflows instead of z from Zod. This constrained namespace exposes only SuperJSON-compatible types, so a schema that could not be serialized is rejected at compile time instead of failing at runtime. You can also wrap existing Zod schemas with serializable().
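To see why transport-safe types matter, it helps to look at what a plain JSON round trip does to values it cannot represent. The sketch below uses only the built-in JSON object, not SuperJSON or the t namespace, and the field names are made up for illustration:

```typescript
// Values that plain JSON cannot represent faithfully.
const original = {
	createdAt: new Date(0),
	tags: new Map([['role', 'admin']]),
	onDone: () => 'done',
};

// Round-trip through plain JSON, as a naive transport layer might.
const roundTripped = JSON.parse(JSON.stringify(original));

// The Date comes back as an ISO string, the Map as an empty object,
// and the function is dropped entirely.
const dateSurvived = roundTripped.createdAt instanceof Date; // false
const dateAsString = typeof roundTripped.createdAt; // 'string'
const mapKeys = Object.keys(roundTripped.tags); // []
const hasCallback = 'onDone' in roundTripped; // false
```

SuperJSON can carry values such as Date and Map, but not functions. Restricting schemas to t is meant to surface the unsupported cases before the workflow ever runs.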

Generic Type Parameters

BaseWorkflow accepts up to four generic type parameters:

BaseWorkflow<Payload, Result, Events, SSEUpdates>;
  • Payload - The input type for your workflow
  • Result - The return type of the run method
  • Events - Event payload types (use never if no events)
  • SSEUpdates - SSE message type (use never if no SSE)

Static Properties

Only type and inputSchema are required. events, defaults, sseUpdates, and resultSizeLimit are optional.

type (required)

The unique workflow identifier. The as const assertion is important for proper type narrowing.

static type = "onboarding" as const;

Don't forget as const! Without it, TypeScript widens the type to string and you lose type safety.
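The widening behavior can be checked in isolation with plain TypeScript. The two classes and the IsLiteral helper below are hypothetical and do not extend BaseWorkflow; they only show how the compiler treats the static property:

```typescript
// Without `as const`, a mutable static property widens to `string`.
class WidenedWorkflow {
	static type = 'onboarding';
}

// With `as const`, the property keeps the literal type 'onboarding'.
class NarrowedWorkflow {
	static type = 'onboarding' as const;
}

// Type-level probe: resolves to `true` only for string literal types.
type IsLiteral<T extends string> = string extends T ? false : true;

// These assignments compile only if the types resolve as the comments claim.
const widenedIsLiteral: IsLiteral<typeof WidenedWorkflow.type> = false;
const narrowedIsLiteral: IsLiteral<typeof NarrowedWorkflow.type> = true;
```

Anything that keys off the workflow type, such as a lookup by name, can only be checked by the compiler when the literal survives.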

inputSchema (required)

A Zod schema that validates the workflow payload. Use the t namespace for transport-safe types.

static inputSchema = t.object({
	name: t.string(),
	email: t.string().email(),
});

If you have an existing Zod schema, wrap it with serializable():

import { BaseWorkflow, serializable } from '@der-ablauf/workflows';
import { z } from 'zod';

// Pre-existing schema from your app
const userSchema = z.object({ name: z.string(), email: z.string() });

class MyWorkflow extends BaseWorkflow<...> {
	static inputSchema = serializable(userSchema);
}

events (optional)

A record mapping event names to their Zod validation schemas.

static events = {
	verification: t.object({ code: t.string() }),
	cancellation: t.object({ reason: t.string() }),
};

defaults (optional)

Default retry configuration for all steps. The as const on backoff keeps its type as the literal "exponential" instead of widening to string.

static defaults = {
	retries: {
		limit: 3,
		delay: "1s",
		backoff: "exponential" as const,
	},
};
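As a rough illustration of what these settings mean, here is a minimal sketch of a retry schedule. It assumes the delay doubles on every attempt, which is a common reading of "exponential" but not something this guide confirms. The retryDelays function, the delayMs field, and the 'constant' strategy are hypothetical:

```typescript
// Hypothetical strategy names; only 'exponential' appears in this guide.
type Backoff = 'constant' | 'exponential';

interface RetryConfig {
	limit: number;
	delayMs: number; // the guide uses strings like '1s'; milliseconds keep the sketch simple
	backoff: Backoff;
}

// Compute the wait before each retry, assuming the delay doubles per attempt.
function retryDelays({ limit, delayMs, backoff }: RetryConfig): number[] {
	return Array.from({ length: limit }, (_, attempt) =>
		backoff === 'exponential' ? delayMs * 2 ** attempt : delayMs,
	);
}

// `as const` keeps `backoff` assignable to the Backoff union.
const retryDefaults = { limit: 3, delayMs: 1000, backoff: 'exponential' as const };
const delays = retryDelays(retryDefaults); // [1000, 2000, 4000]
```

Without the as const, retryDefaults.backoff would have type string and the call to retryDelays would not compile.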

sseUpdates (optional)

A record of named Zod schemas for SSE updates your workflow can broadcast/emit.

static sseUpdates = {
	status: t.object({ message: t.string() }),
	error: t.object({ error: t.string() }),
};
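The benefit of naming each update is that the payload type can be looked up from the update name. The MiniSSE class below is a hypothetical stand-in, not the library's SSE helper; it only demonstrates the typing pattern behind a call like sse.broadcast(name, data):

```typescript
// Hypothetical update map, mirroring the schemas above as plain types.
type Updates = {
	status: { message: string };
	error: { error: string };
};

// Hypothetical stand-in for the SSE helper, not the library's implementation.
class MiniSSE<U extends Record<string, unknown>> {
	readonly sent: Array<{ name: keyof U; data: U[keyof U] }> = [];

	// The payload type is looked up from the update name.
	broadcast<K extends keyof U>(name: K, data: U[K]): void {
		this.sent.push({ name, data });
	}
}

const sse = new MiniSSE<Updates>();
sse.broadcast('status', { message: 'Processing started' });
sse.broadcast('error', { error: 'Conversion failed' });
// sse.broadcast('status', { error: 'oops' }); // would not compile: wrong payload shape
```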

resultSizeLimit (optional)

A cumulative budget for the serialized size of all step results. It guards against out-of-memory failures by failing a step before the total would exceed the limit.

static resultSizeLimit = {
	maxSize: '128mb',     // Default: '64mb'
	onOverflow: 'retry',  // Default: 'fail'
};

See Result Size Limits for details.
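The idea of a cumulative budget can be sketched in a few lines. This is an illustration under stated assumptions, not the library's accounting: the ResultBudget class is hypothetical, JSON.stringify stands in for the real serializer, and size is measured in characters instead of bytes:

```typescript
// Hypothetical budget tracker; the library's own accounting may differ.
class ResultBudget {
	private usedSize = 0;
	private readonly maxSize: number;

	constructor(maxSize: number) {
		this.maxSize = maxSize;
	}

	// Record a step result, refusing it if the running total would exceed the limit.
	record(stepName: string, result: object): void {
		// JSON.stringify stands in for the real serializer here.
		const size = JSON.stringify(result).length;
		if (this.usedSize + size > this.maxSize) {
			throw new Error(`Step "${stepName}" would exceed the result size limit`);
		}
		this.usedSize += size;
	}

	get used(): number {
		return this.usedSize;
	}
}

const budget = new ResultBudget(32);
budget.record('download', { path: '/tmp/file' }); // fits within the budget

let overflowed = false;
try {
	// The running total would now pass the limit, so this step is rejected.
	budget.record('convert', { outputPath: '/tmp/output.pdf' });
} catch {
	overflowed = true;
}
```

Note that the second result is rejected because of the running total, not its own size. That is what "cumulative" means here.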

The run Method

The run method is where your workflow logic lives. It receives three parameters and must return a Promise of your Result type.

async run(
	step: Step<Events>,
	payload: Payload,
	sse: SSE<SSEUpdates>
): Promise<Result> {
	// Your workflow logic
}

Type Annotations

Unlike the functional API, you'll need to manually annotate parameter types:

  • step: Step<Events> - Typed with your event schemas
  • payload: Payload - Your input type
  • sse: SSE<SSEUpdates> - Typed with your SSE schema (or SSE<never>)

Advanced Example

Here's a more complex workflow with SSE updates and multiple event types:

import { BaseWorkflow, t } from '@der-ablauf/workflows';
import type { Step, SSE } from '@der-ablauf/workflows';
import type { z } from 'zod';

const inputSchema = t.object({
	fileUrl: t.string().url(),
	format: t.enum(['pdf', 'docx', 'txt']),
});
type Payload = z.infer<typeof inputSchema>;

const eventSchemas = {
	progress: t.object({ percent: t.number() }),
	cancel: t.object({ reason: t.string() }),
};
type Events = {
	progress: { percent: number };
	cancel: { reason: string };
};

const sseSchema = {
	started: t.object({ url: t.string() }),
	progress: t.object({ percent: t.number() }),
	complete: t.object({ outputUrl: t.string() }),
};
type SSEUpdates = {
	started: z.infer<typeof sseSchema.started>;
	progress: z.infer<typeof sseSchema.progress>;
	complete: z.infer<typeof sseSchema.complete>;
};

interface Result {
	success: boolean;
	outputUrl?: string;
	cancelReason?: string;
}

export class DocumentProcessingWorkflow extends BaseWorkflow<Payload, Result, Events, SSEUpdates> {
	static type = 'document-processing' as const;
	static inputSchema = inputSchema;
	static events = eventSchemas;
	static sseUpdates = sseSchema;
	static defaults = {
		retries: { limit: 5, delay: '2s', backoff: 'exponential' as const },
	};

	async run(step: Step<Events>, payload: Payload, sse: SSE<SSEUpdates>): Promise<Result> {
		sse.broadcast('started', { url: payload.fileUrl });

		const downloaded = await step.do('download', async () => {
			// Download file logic
			return { path: '/tmp/file', size: 1024 };
		});

		sse.broadcast('progress', { percent: 33 });

		const converted = await step.do('convert', async () => {
			// Conversion logic
			return { outputPath: `/tmp/output.${payload.format}` };
		});

		sse.broadcast('progress', { percent: 66 });

		const uploaded = await step.do('upload', async () => {
			// Upload to storage
			return { url: `https://cdn.example.com/output.${payload.format}` };
		});

		sse.broadcast('complete', { outputUrl: uploaded.url });

		return { success: true, outputUrl: uploaded.url };
	}
}

When to Use Classes

Consider the class-based API when:

  • You prefer object-oriented programming patterns
  • You need helper methods or computed properties
  • You want more explicit control over type annotations
  • You're integrating with other class-based code
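As an example of the second point, a class can keep formatting logic and shared values in helper members. The sketch below uses a hypothetical WorkflowBase stand-in with a simplified run signature so that it runs on its own; a real workflow would extend BaseWorkflow instead:

```typescript
// Hypothetical stand-in for BaseWorkflow so the sketch runs on its own.
abstract class WorkflowBase<Payload, Result> {
	abstract run(payload: Payload): Promise<Result>;
}

interface SignupPayload {
	name: string;
	email: string;
}

class GreetingWorkflow extends WorkflowBase<SignupPayload, string> {
	// Computed property shared by every method on the class.
	private get sender(): string {
		return 'onboarding@example.com';
	}

	// Helper method that keeps formatting logic out of run().
	formatGreeting(name: string): string {
		return `Welcome, ${name}! (sent from ${this.sender})`;
	}

	async run(payload: SignupPayload): Promise<string> {
		return this.formatGreeting(payload.name);
	}
}

// Resolves to the formatted greeting for this payload.
const greeting = new GreetingWorkflow().run({ name: 'Ada', email: 'ada@example.com' });
```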

Otherwise, the functional API is recommended for its simplicity and superior type inference.

Remember: Both APIs are functionally identical. The functional API actually creates a class under the hood. Choose based on your style preference, not performance or capability.
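To make "creates a class under the hood" concrete, here is a minimal sketch of the general technique: a function that takes a definition object and returns an ordinary class. MiniBase, MiniDefinition, and defineMini are hypothetical names, and nothing here reflects how the library actually implements its functional API:

```typescript
// Hypothetical stand-ins; names and shapes are illustrative only.
abstract class MiniBase<P, R> {
	abstract run(payload: P): Promise<R>;
}

interface MiniDefinition<P, R> {
	type: string;
	run: (payload: P) => Promise<R>;
}

// A function-style definition that returns an ordinary class.
function defineMini<P, R>(definition: MiniDefinition<P, R>) {
	return class extends MiniBase<P, R> {
		static type = definition.type;

		run(payload: P): Promise<R> {
			return definition.run(payload);
		}
	};
}

// P and R are inferred from the run function, with no manual annotations on the class.
const Echo = defineMini({
	type: 'echo',
	run: async (payload: { text: string }) => payload.text.toUpperCase(),
});

const echoed = new Echo().run({ text: 'hi' });
```

The result behaves like a hand-written subclass, which is why the choice between the two styles comes down to preference.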
