Step-by-step guide for creating Temporal workflows in Dust. Use when adding background jobs, async processing, durable workflows, or task queues.
name: dust-temporal
description: Step-by-step guide for creating Temporal workflows in Dust. Use when adding background jobs, async processing, durable workflows, or task queues.
Creating Temporal Workflows
This skill guides you through creating Temporal workflows for durable background processing.
Quick Reference
File Structure (per queue)
temporal/your_queue/
├── config.ts # Queue name and version
├── helpers.ts # Workflow ID generators
├── activities.ts # Activity implementations (DB, API calls)
├── workflows.ts # Workflow orchestration
├── worker.ts # Worker setup
└── client.ts # Workflow launcher functions
Key Concepts
- Workflow: Durable, deterministic function that orchestrates activities
- Activity: Non-deterministic function with side effects (DB, API calls)
- Task Queue: Named queue where workflows/activities execute
- Workflow ID: Unique identifier for idempotency
Step-by-Step Implementation
Step 1: Create Queue Configuration
Create temporal/your_queue/config.ts:
const QUEUE_VERSION = 1;
export const QUEUE_NAME = `your-queue-v${QUEUE_VERSION}`;
Step 2: Create Workflow ID Helper
Create temporal/your_queue/helpers.ts:
export function makeYourWorkflowId({ entityId }: { entityId: string }): string {
  return `your-workflow-${entityId}`;
}
Important: Workflow IDs must be deterministic (same inputs = same ID) for idempotency.
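A minimal sketch of why a deterministic ID gives idempotency. The `runningWorkflowIds` set and the `tryStart` function are hypothetical stand-ins for the server-side check, not part of the Temporal API; they only model the rule that a second start with the same ID is rejected while the first is still running.

```typescript
// Same helper as in helpers.ts, repeated so the sketch is self-contained.
function makeYourWorkflowId({ entityId }: { entityId: string }): string {
  return `your-workflow-${entityId}`;
}

// Hypothetical in-memory model of "workflow IDs currently running".
// This is NOT the Temporal client; it only illustrates the idea.
const runningWorkflowIds = new Set<string>();

function tryStart(entityId: string): "started" | "already_started" {
  const workflowId = makeYourWorkflowId({ entityId });
  if (runningWorkflowIds.has(workflowId)) {
    // Same input produced the same ID, so the duplicate is detected.
    return "already_started";
  }
  runningWorkflowIds.add(workflowId);
  return "started";
}

console.log(tryStart("abc")); // started
console.log(tryStart("abc")); // already_started
console.log(tryStart("xyz")); // started
```

If the ID contained a random suffix or a timestamp, the second call for "abc" would produce a different ID and the duplicate would go unnoticed.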
Step 3: Create Activities
Create temporal/your_queue/activities.ts:
import { YourResource } from "@app/lib/resources/your_resource";
import logger from "@app/logger/logger";
export async function yourActivity({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<void> {
  const entity = await YourResource.fetchById(entityId);
  if (!entity) {
    throw new Error(`Entity not found: ${entityId}`);
  }
  const result = await entity.doSomething();
  if (result.isErr()) {
    logger.error({ entityId, error: result.error }, "Failed to process entity");
    throw new Error(`Failed to process: ${result.error.message}`);
  }
}
Guidelines: Activities perform the side effects. They can throw (Temporal retries failed activities), so they should be idempotent: running the same activity twice must not duplicate its effect.
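One way to picture idempotency is a guard that records completed work, so a retried call becomes a no-op. The sketch below uses hypothetical names (`processedEntities`, `notificationsSent`, `notifyOnce`) and an in-memory set in place of a database; a real activity would persist the marker.

```typescript
// Hypothetical in-memory stand-ins for persistent state.
const processedEntities = new Set<string>();
let notificationsSent = 0;

// Idempotent step: a second call for the same entity changes nothing.
function notifyOnce(entityId: string): void {
  if (processedEntities.has(entityId)) {
    return; // An earlier attempt already did the work.
  }
  notificationsSent += 1; // The side effect we must not duplicate.
  processedEntities.add(entityId);
}

notifyOnce("abc");
notifyOnce("abc"); // Simulates a retry of the same activity.
console.log(notificationsSent); // 1
```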
Step 4: Create Workflow
Create temporal/your_queue/workflows.ts:
import { proxyActivities } from "@temporalio/workflow";
import type * as activities from "@app/temporal/your_queue/activities";
const { yourActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: "5 minutes",
});
export async function yourWorkflow({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<void> {
  await yourActivity({ entityId, workspaceId });
}
Guidelines: Workflow code must be deterministic so Temporal can replay it: no Math.random(), Date.now(), or direct I/O. Anything non-deterministic belongs in an activity.
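A possible way to follow this guideline is to keep branching logic a pure function of its arguments and pass in values such as the current time (for example as workflow input or an activity result). The function and parameter names below are hypothetical and only illustrate the pattern.

```typescript
// Pure decision logic: the result depends only on the arguments, so running it
// again with the same inputs always takes the same branch.
function shouldReprocess({
  lastProcessedAtMs,
  nowMs,
  maxAgeMs,
}: {
  lastProcessedAtMs: number;
  nowMs: number; // Supplied by the caller instead of read from the clock here.
  maxAgeMs: number;
}): boolean {
  return nowMs - lastProcessedAtMs > maxAgeMs;
}

console.log(shouldReprocess({ lastProcessedAtMs: 0, nowMs: 10000, maxAgeMs: 5000 })); // true
console.log(shouldReprocess({ lastProcessedAtMs: 8000, nowMs: 10000, maxAgeMs: 5000 })); // false
```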
Step 5: Create Client Launcher
Create temporal/your_queue/client.ts:
import { WorkflowExecutionAlreadyStartedError } from "@temporalio/client";
import { getTemporalClientForFrontNamespace } from "@app/lib/temporal";
import logger from "@app/logger/logger";
import { QUEUE_NAME } from "@app/temporal/your_queue/config";
import { makeYourWorkflowId } from "@app/temporal/your_queue/helpers";
import { yourWorkflow } from "@app/temporal/your_queue/workflows";
import type { Result } from "@app/types";
import { Err, normalizeError, Ok } from "@app/types";
export async function launchYourWorkflow({
  entityId,
  workspaceId,
}: {
  entityId: string;
  workspaceId: number;
}): Promise<Result<undefined, Error>> {
  const client = await getTemporalClientForFrontNamespace();
  const workflowId = makeYourWorkflowId({ entityId });
  try {
    await client.workflow.start(yourWorkflow, {
      args: [{ entityId, workspaceId }],
      taskQueue: QUEUE_NAME,
      workflowId,
      memo: { entityId, workspaceId },
    });
    return new Ok(undefined);
  } catch (e) {
    if (!(e instanceof WorkflowExecutionAlreadyStartedError)) {
      logger.error({ workflowId, entityId, workspaceId, error: e }, "Failed starting workflow");
    }
    return new Err(normalizeError(e));
  }
}
Step 6: Create Worker
Create temporal/your_queue/worker.ts:
import type { Context } from "@temporalio/activity";
import { Worker } from "@temporalio/worker";
import { getTemporalWorkerConnection, TEMPORAL_MAXED_CACHED_WORKFLOWS } from "@app/lib/temporal";
import { ActivityInboundLogInterceptor } from "@app/lib/temporal_monitoring";
import logger from "@app/logger/logger";
import * as activities from "@app/temporal/your_queue/activities";
import { getWorkflowConfig } from "@app/temporal/bundle_helper";
import { QUEUE_NAME } from "./config";
export async function runYourQueueWorker() {
  const { connection, namespace } = await getTemporalWorkerConnection();
  const worker = await Worker.create({
    ...getWorkflowConfig({
      workerName: "your_queue",
      getWorkflowsPath: () => require.resolve("./workflows"),
    }),
    activities,
    taskQueue: QUEUE_NAME,
    maxCachedWorkflows: TEMPORAL_MAXED_CACHED_WORKFLOWS,
    maxConcurrentActivityTaskExecutions: 16,
    connection,
    namespace,
    interceptors: {
      activityInbound: [(ctx: Context) => new ActivityInboundLogInterceptor(ctx, logger)],
    },
  });
  await worker.run();
}
Step 7: Register Worker (Critical!)
Edit temporal/worker_registry.ts:
// 1. Add import
import { runYourQueueWorker } from "@app/temporal/your_queue/worker";
// 2. Add to WorkerName type
export type WorkerName =
  | "agent_loop"
  // ... existing workers
  | "your_queue"; // <- Add this
// 3. Add to workerFunctions mapping
export const workerFunctions: Record<WorkerName, () => Promise<void>> = {
  // ... existing workers
  your_queue: runYourQueueWorker, // <- Add this
};
Without registration, workflows will never execute!
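The sketch below is a trimmed-down, hypothetical copy of the registry with stub workers, to show why an unregistered queue never runs. The `resolveWorker` lookup is an assumption about how a launcher might consume the mapping, not the actual Dust code. Because the mapping is typed as `Record<WorkerName, ...>`, TypeScript also rejects a `WorkerName` entry that has no matching function.

```typescript
// Hypothetical, reduced version of worker_registry.ts with stub workers.
type WorkerName = "agent_loop" | "your_queue";

const workerFunctions: Record<WorkerName, () => Promise<void>> = {
  agent_loop: async () => {},
  your_queue: async () => {},
};

// Assumed lookup: a launcher that receives worker names can only start
// the ones present in the mapping.
function resolveWorker(name: string): (() => Promise<void>) | undefined {
  if (Object.prototype.hasOwnProperty.call(workerFunctions, name)) {
    return workerFunctions[name as WorkerName];
  }
  return undefined;
}

console.log(typeof resolveWorker("your_queue")); // function
console.log(resolveWorker("unregistered_queue")); // undefined
```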
Timeout & Retry Configuration
const { yourActivity } = proxyActivities<typeof activities>({
  startToCloseTimeout: "5 minutes",
  retry: {
    maximumAttempts: 3,
    initialInterval: "1s",
    backoffCoefficient: 2,
    maximumInterval: "1m",
    nonRetryableErrorTypes: ["ValidationError"],
  },
});
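To make the retry settings concrete, the sketch below computes the wait before each retry. It assumes Temporal's documented retry policy semantics: `maximumAttempts` counts the first attempt, and each interval is the previous one multiplied by `backoffCoefficient`, capped at `maximumInterval`. The helper `retryDelaysMs` is hypothetical and takes durations in milliseconds.

```typescript
// Wait before retry k (k = 0 for the first retry):
// min(initialInterval * backoffCoefficient^k, maximumInterval).
function retryDelaysMs({
  maximumAttempts,
  initialIntervalMs,
  backoffCoefficient,
  maximumIntervalMs,
}: {
  maximumAttempts: number;
  initialIntervalMs: number;
  backoffCoefficient: number;
  maximumIntervalMs: number;
}): number[] {
  const delays: number[] = [];
  // The first attempt is not a retry, so there are at most
  // (maximumAttempts - 1) waits.
  for (let retry = 0; retry < maximumAttempts - 1; retry++) {
    const delay = initialIntervalMs * Math.pow(backoffCoefficient, retry);
    delays.push(Math.min(delay, maximumIntervalMs));
  }
  return delays;
}

// The configuration above: 3 attempts, 1s initial, coefficient 2, 1m cap.
const delays = retryDelaysMs({
  maximumAttempts: 3,
  initialIntervalMs: 1000,
  backoffCoefficient: 2,
  maximumIntervalMs: 60000,
});
console.log(delays.join(", ")); // 1000, 2000
```

With these settings a failing activity runs three times in total, waiting 1s and then 2s between attempts, before the failure is reported to the workflow. The one-minute cap only matters with more attempts: the seventh retry would otherwise wait 64s.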
Validation Checklist
- Queue config created with versioned name
- Workflow ID helper is deterministic
- Activities handle errors properly
- Workflow uses proxyActivities with appropriate timeouts
- Client returns Result<> and handles WorkflowExecutionAlreadyStartedError
- Worker registered in worker_registry.ts
- Tested locally
Reference Examples
See temporal/ directory for existing implementations.
