Package: wf

Programmable workflows with steps, pipelines, and DAGs, including dependencies, state, context management, and error handling.

Repository: https://github.com/dracory/wf

Key Features

  • Flexible initialization (e.g., WithName, WithID, WithRunnables)
  • Automated task execution (programmatic, non-human)
  • Simple, reusable step definitions
  • Organized pipelines (sequence of runnables)
  • DAG for dependencies between steps/pipelines
  • Cycle detection (prevents circular dependencies)
  • Context + shared data map between steps
  • Error propagation end-to-end
  • State management (track/persist execution state)
  • Pause and resume
  • Testable by design

When to Use

Ideal for:

  • Data processing pipelines, ETL, batch jobs
  • Automated build/test pipelines
  • Service orchestration

Not suitable for:

  • Human-driven approval/manual workflows (see swf)

Core Components

  • Step: basic unit of work with ID, name, handler
  • Pipeline: sequence of runnables (steps or pipelines)
  • Dag: runnables + dependency graph with execution ordering
  • State: execution status, completed steps, workflow data

Hierarchy:

Runnable
├── Step (single operation)
├── Pipeline (runs runnables in sequence)
└── Dag (dependencies between runnables)

Usage Examples

Creating Steps

// Create a step with an execution function
step := wf.NewStep()
step.SetName("My Step")
step.SetHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
    data["key"] = "value"
    return ctx, data, nil
})

Creating a Pipeline

step1 := wf.NewStep(
    wf.WithName("Process Data"),
    wf.WithHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
        data["processed"] = true
        return ctx, data, nil
    }),
)

step2 := wf.NewStep(
    wf.WithName("Validate Data"),
    wf.WithHandler(func(ctx context.Context, data map[string]any) (context.Context, map[string]any, error) {
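        // Use the comma-ok form so a missing or non-bool value does not panic.
        if processed, ok := data["processed"].(bool); !ok || !processed {
            return ctx, data, errors.New("data not processed")
        }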
        return ctx, data, nil
    }),
)

pipeline := wf.NewPipeline(
    wf.WithName("Data Processing Pipeline"),
    wf.WithRunnables(step1, step2),
)

Creating a DAG

dag := wf.NewDag(
    wf.WithName("My DAG"),
    wf.WithRunnables(step1, step2),
    wf.WithDependency(step2, step1), // step2 depends on step1
)

Using a Pipeline in a DAG

pipeline := wf.NewPipeline(
    wf.WithName("Data Processing Pipeline"),
    wf.WithRunnables(step1, step2),
)

dag := wf.NewDag(
    wf.WithName("My DAG"),
    wf.WithRunnables(pipeline, step3), // step3 is another step, created like step1
    wf.WithDependency(step3, pipeline), // step3 depends on pipeline
)

Executing

ctx := context.Background()
data := make(map[string]any)

_, data, err := dag.Run(ctx, data)
if err != nil {
    // handle error
}

State Management

  • Persist/restore State between runs
  • Track status and completed runnables
  • Keep/update data map for subsequent steps
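
One way to picture persisting and restoring state is a JSON round trip of a small snapshot. This is a minimal sketch under stated assumptions: the State struct, its field names, and the save, restore, and remaining helpers are hypothetical and do not describe the wf State type.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// State is a hypothetical snapshot of a run; the fields are assumptions.
type State struct {
	Status    string         `json:"status"`
	Completed []string       `json:"completed"`
	Data      map[string]any `json:"data"`
}

// save serializes the snapshot so it can be written to a file or database.
func save(s State) ([]byte, error) {
	return json.Marshal(s)
}

// restore rebuilds a snapshot from its serialized form.
func restore(raw []byte) (State, error) {
	var s State
	err := json.Unmarshal(raw, &s)
	return s, err
}

// remaining returns the step IDs that have not completed yet, in order.
func remaining(all []string, s State) []string {
	done := make(map[string]bool, len(s.Completed))
	for _, id := range s.Completed {
		done[id] = true
	}
	var todo []string
	for _, id := range all {
		if !done[id] {
			todo = append(todo, id)
		}
	}
	return todo
}

func main() {
	paused := State{
		Status:    "paused",
		Completed: []string{"step1"},
		Data:      map[string]any{"processed": true},
	}

	raw, err := save(paused)
	if err != nil {
		panic(err)
	}
	restored, err := restore(raw)
	if err != nil {
		panic(err)
	}

	fmt.Println(remaining([]string{"step1", "step2", "step3"}, restored)) // prints "[step2 step3]"
	fmt.Println(restored.Data["processed"])                               // prints "true"
}
```

Note that encoding/json decodes JSON numbers into float64 when the target is map[string]any, so integer values in the data map come back as floats after a round trip like this one.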

Notes

  • Designed for programmatic, automated workflows
  • Use swf for human-driven linear workflows