Package: wf
Programmable workflows with steps, pipelines, and DAGs, including dependencies, state, context management, and error handling.
Repository: https://github.com/dracory/wf
Key Features
- Flexible initialization (e.g.,
WithName
,WithID
,WithRunnables
) - Automated task execution (programmatic, non-human)
- Simple, reusable step definitions
- Organized pipelines (sequence of runnables)
- DAG for dependencies between steps/pipelines
- Cycle detection (prevents circular dependencies)
- Context + shared data map between steps
- Error propagation end-to-end
- State management (track/persist execution state)
- Pause and resume
- Testable by design
When to Use
Ideal for:
- Data processing pipelines, ETL, batch jobs
- Automated build/test pipelines
- Service orchestration
Not suitable for:
- Human-driven approval/manual workflows (see
swf
)
Core Components
- Step: basic unit of work with ID, name, handler
- Pipeline: sequence of runnables (steps or pipelines)
- Dag: runnables + dependency graph with execution ordering
- State: execution status, completed steps, workflow data
Hierarchy:
Runnable
├── Step (single operation)
├── Pipeline (runs runnables in sequence)
└── Dag (dependencies between runnables)
Usage Examples
Creating Steps
// Create a step with an execution function
step := wf.NewStep()
step.SetName("My Step")
step.SetHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
data["key"] = "value"
return ctx, data, nil
})
Creating a Pipeline
step1 := wf.NewStep(
wf.WithName("Process Data"),
wf.WithHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
data["processed"] = true
return ctx, data, nil
}),
)
step2 := wf.NewStep(
wf.WithName("Validate Data"),
wf.WithHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
if !data["processed"].(bool) {
return ctx, data, errors.New("data not processed")
}
return ctx, data, nil
}),
)
pipeline := wf.NewPipeline(
wf.WithName("Data Processing Pipeline"),
wf.WithRunnables(step1, step2),
)
Creating a DAG
dag := wf.NewDag(
wf.WithName("My DAG"),
wf.WithRunnables(step1, step2),
wf.WithDependency(step2, step1), // step2 depends on step1
)
Using a Pipeline in a DAG
pipeline := wf.NewPipeline(
wf.WithName("Data Processing Pipeline"),
wf.WithRunnables(step1, step2),
)
dag := wf.NewDag(
wf.WithName("My DAG"),
wf.WithRunnables(pipeline, step3),
wf.WithDependency(step3, pipeline),
)
Executing
ctx := context.Background()
data := make(map[string]any)
_, data, err := dag.Run(ctx, data)
if err != nil {
// handle error
}
State Management
- Persist/restore
State
between runs - Track status and completed runnables
- Keep/update data map for subsequent steps
Notes
- Designed for programmatic, automated workflows
- Use
swf
for human-driven linear workflows