Go SDK

The Conductor Go SDK lets you write workers, define workflows as code, and manage workflow executions from your Go application.

Prerequisites

Install the SDK

In your terminal, create a folder for your project and initialize a Go module:

mkdir conductor-app
cd conductor-app
go mod init conductor-app
go get github.com/conductor-sdk/conductor-go

Connect to Conductor

In Orkes Conductor, an application represents your SDK client and controls what it can access on the cluster. Each application has access keys (a key ID and secret) that the SDK uses to authenticate your requests.

To create an application:

  1. Go to Access Control > Applications from the left menu on your Conductor cluster.
  2. Select + Create application.
  3. Enter the application name.
  4. Select Save.

To retrieve the access key:

  1. Open the application.
  2. In Application roles, enable the Worker role.
  3. In the Access Keys section, select + Create access key to generate a unique Key ID, Key Secret, and Server URL.

The Key Secret is shown only once, so copy it and store it securely.

Set environment variables

The SDK reads your server URL and credentials from environment variables. To make them available in every terminal session, add them to your shell profile (~/.zshrc or ~/.bash_profile):

export CONDUCTOR_SERVER_URL=https://SERVER_URL/api
export CONDUCTOR_AUTH_KEY=your-key-id
export CONDUCTOR_AUTH_SECRET=your-key-secret

Reload your shell profile after adding:

source ~/.zshrc

If you set environment variables using export in a terminal, they only persist for that session. Any new terminal will require you to export them again, which is a common source of connection errors when running workers and workflows in separate terminals.
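Because a missing variable typically surfaces later as a connection error, it can help to check for all three before creating any clients. The following is a minimal sketch using only the standard library; the helper name missingVars is hypothetical and not part of the SDK.

```go
package main

import (
    "fmt"
    "os"
)

// requiredVars lists the variables named in this section.
var requiredVars = []string{
    "CONDUCTOR_SERVER_URL",
    "CONDUCTOR_AUTH_KEY",
    "CONDUCTOR_AUTH_SECRET",
}

// missingVars returns the names in keys for which lookup reports no value
// or an empty value. lookup has the same shape as os.LookupEnv, so a fake
// can be passed in tests.
func missingVars(keys []string, lookup func(string) (string, bool)) []string {
    var missing []string
    for _, k := range keys {
        if v, ok := lookup(k); !ok || v == "" {
            missing = append(missing, k)
        }
    }
    return missing
}

func main() {
    if missing := missingVars(requiredVars, os.LookupEnv); len(missing) > 0 {
        fmt.Println("missing environment variables:", missing)
        return
    }
    fmt.Println("all Conductor environment variables are set")
}
```

Running this in each terminal before starting workers or workflows shows whether that session has the variables exported.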

Initialize the SDK clients

Every application that uses the Conductor Go SDK starts with the same three variables:

import (
    "github.com/conductor-sdk/conductor-go/sdk/client"
    "github.com/conductor-sdk/conductor-go/sdk/worker"
    "github.com/conductor-sdk/conductor-go/sdk/workflow/executor"
)

var (
    apiClient        = client.NewAPIClientFromEnv()
    taskRunner       = worker.NewTaskRunnerWithApiClient(apiClient)
    workflowExecutor = executor.NewWorkflowExecutor(apiClient)
)

client.NewAPIClientFromEnv() reads your environment variables automatically; no arguments needed.

  • apiClient handles all HTTP communication with the Conductor server, including authentication.
  • taskRunner manages the polling loop that picks up worker tasks from the server.
  • workflowExecutor provides methods for registering, starting, and monitoring workflows.

Quickstart

This tutorial walks you through creating a workflow with a single worker task and running it end-to-end.

Before you begin

Create the task definition in the Conductor UI before running the code. A task definition tells the Conductor server that a task type exists and how it should behave.

To create the greet task definition:

  1. Log in to your Conductor UI.
  2. Go to Definitions > Task.
  3. Select + Define task.
  4. Set the task name to greet.
  5. Leave all other settings as default.
  6. Select Save.

Run the quickstart

Open your terminal and navigate to your project folder:

cd conductor-app

Create a new file called quickstart.go and paste the following:

package main

import (
    "fmt"
    "os"
    "strings"
    "time"

    "github.com/conductor-sdk/conductor-go/sdk/client"
    "github.com/conductor-sdk/conductor-go/sdk/model"
    "github.com/conductor-sdk/conductor-go/sdk/worker"
    "github.com/conductor-sdk/conductor-go/sdk/workflow"
    "github.com/conductor-sdk/conductor-go/sdk/workflow/executor"
)

var (
    apiClient        = client.NewAPIClientFromEnv()
    taskRunner       = worker.NewTaskRunnerWithApiClient(apiClient)
    workflowExecutor = executor.NewWorkflowExecutor(apiClient)
)

// greet is the worker: it reads "name" from the task input and returns a greeting.
func greet(task *model.Task) (interface{}, error) {
    name := fmt.Sprintf("%v", task.InputData["name"])
    return map[string]interface{}{
        "greetings": "Hello, " + name,
    }, nil
}

func main() {
    // Start polling for greet tasks: batch size 1, polling every 100 ms.
    taskRunner.StartWorker("greet", greet, 1, time.Millisecond*100)

    // Define the workflow with a single task.
    wf := workflow.NewConductorWorkflow(workflowExecutor).
        Name("greetings").
        Version(1).
        Description("Greetings workflow - Greets a user by their name").
        TimeoutPolicy(workflow.TimeOutWorkflow, 600)

    greetTask := workflow.NewSimpleTask("greet", "greet_ref").
        Input("name", "${workflow.input.name}")

    wf.Add(greetTask)
    wf.OutputParameters(map[string]interface{}{
        "greetings": greetTask.OutputRef("greetings"),
    })

    // Register the definition, overwriting any existing version 1.
    err := wf.Register(true)
    if err != nil {
        panic(err)
    }

    // Start an execution.
    id, err := workflowExecutor.StartWorkflow(&model.StartWorkflowRequest{
        Name:    "greetings",
        Version: 1,
        Input: map[string]interface{}{
            "name": "Conductor",
        },
    })
    if err != nil {
        panic(err)
    }

    // Block until the execution reaches a terminal state.
    channel, err := workflowExecutor.MonitorExecution(id)
    if err != nil {
        panic(err)
    }
    run := <-channel
    fmt.Println("Output:", run.Output)
    serverURL := strings.TrimSuffix(os.Getenv("CONDUCTOR_SERVER_URL"), "/api")
    fmt.Printf("Execution: %s/execution/%s\n", serverURL, id)
}

What this does:

  • Define a worker: greet is a plain Go function with the worker signature func(task *model.Task) (interface{}, error). Whatever you return becomes the task's output.
  • Initialize the SDK: NewAPIClientFromEnv() reads your environment variables and creates the SDK clients.
  • Define and register the workflow: Builds a greetings workflow with one task and registers it on the Conductor server with wf.Register(true).
  • Start the worker: taskRunner.StartWorker begins polling for greet tasks.
  • Run the workflow: Starts an execution and waits for the result via MonitorExecution.

Run it:

go mod tidy
go run quickstart.go

Note: go mod tidy scans your source files and pulls in all transitive dependencies of the SDK. Without it, go run may fail with missing package errors.

Expected output:

Output: map[greetings:Hello, Conductor]
Execution: https://your-cluster/execution/<workflow-id>

Open the execution URL to view the workflow run in the Conductor UI. Select the task, then go to the Output tab to confirm that the output is displayed.

Workflow executed using Go SDK

The greetings workflow definition is now registered on your Conductor cluster. To verify, go to Definitions > Workflow and confirm it is listed there.

Greetings workflow created using Go SDK

That's it. You have successfully run a simple workflow. Next, explore the core concepts to understand how to build your own workflows.

The following sections cover each concept from the quickstart in detail, including workers, workflows, and execution management.

Workers

A worker is a piece of code responsible for executing a task. In Conductor, workers can be implemented in any language and deployed anywhere.

Implement a worker

In Go, a worker is a plain function with the signature func(task *model.Task) (interface{}, error). You register it against a task type name using taskRunner.StartWorker.

func greet(task *model.Task) (interface{}, error) {
    name := fmt.Sprintf("%v", task.InputData["name"])
    return map[string]interface{}{
        "greetings": "Hello, " + name,
    }, nil
}

taskRunner.StartWorker("greet", greet, 1, time.Millisecond*100)

The first argument to StartWorker is the task type name. The worker polls for tasks of that type and executes the function for each one.

Register the task definition

Before running a worker, create the task definition in the Conductor UI. A task definition tells the Conductor server that a task type exists and how it should behave.

To create a task definition:

  1. Log in to your Conductor UI.
  2. Go to Definitions > Task.
  3. Select + Define task.
  4. Set the task name.
  5. Leave all other settings as default.
  6. Select Save.

The task definition and the worker are intentionally decoupled. The definition tells the server what task types exist and how they should behave, including retry count, timeout, and rate limits. The worker provides the code that executes when the server assigns a task. This means you can update retry behavior without touching worker code, and deploy workers without changing server configuration.
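To make the split concrete, the sketch below prints the kind of settings that live in a task definition rather than in worker code. The struct is a local illustration, not an SDK type; the JSON field names are assumed to follow Conductor's task definition format, and the values are arbitrary.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// taskDef is a local, illustrative type. The JSON field names are assumed
// to match Conductor's task definition format.
type taskDef struct {
    Name                   string `json:"name"`
    RetryCount             int    `json:"retryCount"`
    RetryLogic             string `json:"retryLogic"`
    RetryDelaySeconds      int    `json:"retryDelaySeconds"`
    TimeoutSeconds         int    `json:"timeoutSeconds"`
    ResponseTimeoutSeconds int    `json:"responseTimeoutSeconds"`
}

// exampleTaskDef returns arbitrary example values for the greet task.
func exampleTaskDef() taskDef {
    return taskDef{
        Name:                   "greet",
        RetryCount:             3,
        RetryLogic:             "FIXED",
        RetryDelaySeconds:      10,
        TimeoutSeconds:         300,
        ResponseTimeoutSeconds: 60,
    }
}

func main() {
    out, err := json.MarshalIndent(exampleTaskDef(), "", "  ")
    if err != nil {
        panic(err)
    }
    fmt.Println(string(out))
}
```

None of these values appear in the greet function itself, which is why changing them on the server requires no redeploy of the worker.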

Task inputs and outputs

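The worker receives inputs through task.InputData, a map[string]interface{}. fmt.Sprintf("%v", ...) converts any value to a string without panicking, but a missing key formats as the literal "<nil>", so use a type assertion when you need to detect absent or non-string inputs.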

Whatever the function returns as its first value becomes the task's output. If you return a map[string]interface{}, each key becomes an output field that downstream tasks can reference.
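The difference between formatting an input and asserting its type is easiest to see on a plain map. This sketch uses only the standard library; stringInput is a hypothetical helper, not an SDK function.

```go
package main

import "fmt"

// stringInput returns the value stored under key only when it is present
// and is a string. The boolean is false for missing keys and other types.
func stringInput(input map[string]interface{}, key string) (string, bool) {
    s, ok := input[key].(string)
    return s, ok
}

func main() {
    input := map[string]interface{}{"name": "Conductor", "amount": 99.99}

    // Formatting never fails, but a missing key becomes the text "<nil>".
    fmt.Printf("%v\n", input["name"])    // Conductor
    fmt.Printf("%v\n", input["amount"])  // 99.99
    fmt.Printf("%v\n", input["missing"]) // <nil>

    // A type assertion distinguishes "present string" from everything else.
    if _, ok := stringInput(input, "missing"); !ok {
        fmt.Println("missing is not set")
    }
}
```

Inside a worker, the same helper could be called with task.InputData as the map.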

Failure modes

Returning a non-nil error marks the task as FAILED and triggers the retry policy. To fail the task immediately without retries, set model.FailedWithTerminalErrorTask on a TaskResult:

func validateOrder(task *model.Task) (interface{}, error) {
    // A type assertion yields "" when the key is missing or not a string.
    // fmt.Sprintf("%v", nil) would yield "<nil>" and slip past the check below.
    customerId, _ := task.InputData["customer_id"].(string)

    taskResult := &model.TaskResult{
        WorkflowInstanceId: task.WorkflowInstanceId,
        TaskId:             task.TaskId,
    }

    // Retrying cannot supply a missing customer ID, so fail without retries.
    if customerId == "" {
        taskResult.Status = model.FailedWithTerminalErrorTask
        taskResult.ReasonForIncompletion = "Missing customer ID"
        return taskResult, nil
    }

    taskResult.Status = model.CompletedTask
    taskResult.OutputData = map[string]interface{}{
        "customer_id": customerId,
    }
    return taskResult, nil
}

Use FailedWithTerminalErrorTask when retrying would never help: invalid input, missing records, unauthorized requests. The task status in the UI will show Failed_with_terminal_error.
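One way to keep this decision in a single place is to classify errors before choosing how to fail the task. The sketch below is an assumption about how you might structure that, using only the standard library; errInvalidInput, isTerminal, and lookupCustomer are hypothetical names.

```go
package main

import (
    "errors"
    "fmt"
)

// errInvalidInput is a hypothetical sentinel for failures no retry can fix.
var errInvalidInput = errors.New("invalid input")

// isTerminal reports whether err wraps errInvalidInput, meaning the worker
// should fail the task terminally instead of returning the error for a retry.
func isTerminal(err error) bool {
    return errors.Is(err, errInvalidInput)
}

// lookupCustomer is a stand-in for real business logic.
func lookupCustomer(id string) error {
    if id == "" {
        return fmt.Errorf("customer lookup: %w", errInvalidInput)
    }
    if id == "flaky" {
        return errors.New("customer lookup: upstream timeout")
    }
    return nil
}

func main() {
    for _, id := range []string{"", "flaky", "CUST-123"} {
        err := lookupCustomer(id)
        switch {
        case err == nil:
            fmt.Printf("%q: complete the task\n", id)
        case isTerminal(err):
            fmt.Printf("%q: fail terminally (%v)\n", id, err)
        default:
            fmt.Printf("%q: return the error so the retry policy applies (%v)\n", id, err)
        }
    }
}
```

In a worker, the terminal branch would set the terminal status on the TaskResult, and the default branch would return the error as the function's second value.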

Retry count and logs

Use task.RetryCount to check which attempt is currently running. Use taskResult.Logs to write log lines visible in the Conductor UI under the task's execution timeline, with no extra instrumentation needed.

func chargePayment(task *model.Task) (interface{}, error) {
    attempt := task.RetryCount

    taskResult := &model.TaskResult{
        WorkflowInstanceId: task.WorkflowInstanceId,
        TaskId:             task.TaskId,
    }
    taskResult.Logs = []model.TaskExecLog{
        {Log: fmt.Sprintf("Charge attempt %d", attempt)},
    }
    // ...
}

task.RetryCount is 0 on first execution, 1 on first retry, and so on. The Conductor server passes it in at poll time; the worker doesn't track this itself.

Concurrency

The third argument to StartWorker is the batch size: how many tasks the worker picks up per poll. The fourth argument is the polling interval. Increase batch size for higher throughput; increase the polling interval to reduce server load during idle periods.

taskRunner.StartWorker("process_order", processOrder, 5, time.Millisecond*100)

If you have multiple workers in the same application, call StartWorker once per worker type. The same taskRunner manages all of them.

In applications that only run workers (no inline workflow execution), call taskRunner.WaitWorkers() to block the main goroutine and keep workers running:

taskRunner.StartWorker("get_user", getUser, 1, time.Millisecond*100)
taskRunner.StartWorker("send_welcome", sendWelcome, 1, time.Millisecond*100)
taskRunner.WaitWorkers()

In the quickstart, MonitorExecution already blocks the main goroutine until the workflow completes, so WaitWorkers() is not needed there.

Worker design principles

  • Workers are stateless and contain no workflow-specific logic.
  • Each worker executes one task and produces a defined output for a given input.
  • Workers should be idempotent; a task may be rescheduled if it times out.
  • Retry and timeout logic is handled by the Conductor server, not the worker.
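The idempotency principle can be sketched as "look up a result by a stable key before doing the work". The example below is a minimal illustration under stated assumptions: resultStore is a hypothetical in-memory stand-in for a shared durable store, and the key format is arbitrary. The state lives in the store, so the worker function itself stays stateless.

```go
package main

import (
    "fmt"
    "sync"
)

// resultStore remembers the output produced for each idempotency key.
// It is an in-memory stand-in; a real worker would use a shared durable store.
type resultStore struct {
    mu      sync.Mutex
    results map[string]string
}

func newResultStore() *resultStore {
    return &resultStore{results: make(map[string]string)}
}

// once runs fn the first time key is seen and replays the stored result
// afterwards. The boolean reports whether fn actually ran. The lock is held
// while fn runs, which keeps the sketch simple but serializes all keys.
func (s *resultStore) once(key string, fn func() string) (string, bool) {
    s.mu.Lock()
    defer s.mu.Unlock()
    if r, ok := s.results[key]; ok {
        return r, false
    }
    r := fn()
    s.results[key] = r
    return r, true
}

func main() {
    store := newResultStore()
    charges := 0
    charge := func() string {
        charges++
        return "receipt-1"
    }

    // The same order arrives twice, as it might after a timeout and reschedule.
    for i := 0; i < 2; i++ {
        receipt, ran := store.once("order-ORD-001", charge)
        fmt.Println(receipt, ran)
    }
    fmt.Println("charges made:", charges) // charges made: 1
}
```

The key should be derived from the business input (here an order ID) rather than from the attempt number, so that a rescheduled task maps to the same entry.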

Workflows

A workflow is the blueprint that connects tasks into a sequence. It defines which tasks run, in what order, and how outputs from one task become inputs to the next. Workflows handle branching, parallelism, retries, and timeouts, all configured in the workflow definition rather than in your worker code.

You can define workflows in the Conductor UI, via the API, or in code using the SDK.

Define a workflow as code

Use workflow.NewConductorWorkflow to define workflows in Go. Chain tasks using wf.Add() in the order they should execute:

wf := workflow.NewConductorWorkflow(workflowExecutor).
    Name("email_workflow").
    Version(1)

getUserTask := workflow.NewSimpleTask("get_user_email", "get_user_email_ref").
    Input("userid", "${workflow.input.userid}")

sendEmailTask := workflow.NewSimpleTask("send_email", "send_email_ref").
    Input("email", getUserTask.OutputRef("email"))

wf.Add(getUserTask)
wf.Add(sendEmailTask)
wf.OutputParameters(map[string]interface{}{
    "email": getUserTask.OutputRef("email"),
})
wf.Register(true)

NewSimpleTask("get_user_email", "get_user_email_ref") creates a task of type get_user_email with the reference name get_user_email_ref. The first argument is the task type, which must match the name your worker polls for. The second is the reference name, a unique identifier for this task instance within the workflow.

Pass data between tasks

.Input("key", value) maps a value to a task input. To pass a workflow-level input, use the JSONPath expression "${workflow.input.fieldName}". To pass the output of a previous task, use taskRef.OutputRef("fieldName"). Conductor resolves these at runtime.

chargeTask := workflow.NewSimpleTask("charge_card", "charge_ref").
    Input("order_id", validateTask.OutputRef("order_id")).
    Input("amount", validateTask.OutputRef("amount"))

charge_card only starts after validate_order completes and its output is available. You don't write any waiting logic; the data dependency is the scheduling signal.
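The values passed to .Input are ordinary strings that the server interprets later. The sketch below builds the two kinds of reference by hand to show their shape. The helper names are hypothetical, and the task-output form ${taskRef.output.field} is assumed to be what OutputRef produces, following Conductor's expression syntax.

```go
package main

import "fmt"

// workflowInputRef builds a reference to a workflow-level input field.
func workflowInputRef(field string) string {
    return fmt.Sprintf("${workflow.input.%s}", field)
}

// taskOutputRef builds a reference to an output field of an earlier task,
// identified by its reference name. The format is an assumption based on
// Conductor's expression syntax.
func taskOutputRef(taskRef, field string) string {
    return fmt.Sprintf("${%s.output.%s}", taskRef, field)
}

func main() {
    fmt.Println(workflowInputRef("userid"))                 // ${workflow.input.userid}
    fmt.Println(taskOutputRef("get_user_email_ref", "email")) // ${get_user_email_ref.output.email}
}
```

Because the reference names the upstream task, the server can tell which task must finish before the value exists.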

Register a workflow

wf.Register(true) pushes the workflow definition to the server. The true argument overwrites any existing definition with the same name and version. You only need to call this when you create or update a workflow definition, not every time you run it.

In production, registration happens at deploy time. Pass false to prevent accidental overwrites.

Use system tasks

System tasks are pre-built tasks available in Conductor without writing a worker.

Wait task: Pauses the workflow until a given timestamp is reached, a duration elapses, or an external signal is received.

waitTask := workflow.NewWaitTask("wait_ref")
wf.Add(waitTask)

HTTP task: Calls a remote service exposed over HTTP or HTTPS.

httpTask := workflow.NewHttpTask("call_api_ref", &workflow.HttpInput{
    Uri:    "https://orkes-api-tester.orkesconductor.com/api",
    Method: "GET",
})
wf.Add(httpTask)

Inline task: Runs ECMA-compliant JavaScript inline.

inlineTask := workflow.NewInlineTask(
    "inline_ref",
    "function e() { if ($.value == 1){return {\"result\": true}} else { return {\"result\": false}}} e();",
)
wf.Add(inlineTask)

Learn more about other task types.

Execute a workflow

To start a workflow and get its ID asynchronously:

id, err := workflowExecutor.StartWorkflow(&model.StartWorkflowRequest{
    Name:    "greetings",
    Version: 1,
    Input: map[string]interface{}{
        "name": "Conductor",
    },
})

To wait for a started workflow to finish and read its result:

channel, _ := workflowExecutor.MonitorExecution(id)
run := <-channel
fmt.Println("Output:", run.Output)

MonitorExecution returns a channel that receives the final *model.Workflow when the execution reaches a terminal state. Use this for short-duration workflows. For long-running workflows, use StartWorkflow and poll for status using GetWorkflow.
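For the polling approach, a loop with an interval and an overall deadline is usually enough. The sketch below is a minimal illustration using only the standard library: the status source is passed in as a function, so in a real application it would wrap a GetWorkflow call and return the workflow's status. The names pollUntilTerminal, isTerminalStatus, and the demo durations are hypothetical.

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

// Durations used by the demo in main; real values depend on the workflow.
const (
    demoInterval = 10 * time.Millisecond
    demoTimeout  = time.Second
)

var errPollTimeout = errors.New("workflow did not reach a terminal state in time")

// isTerminalStatus reports whether status is a terminal workflow state.
func isTerminalStatus(status string) bool {
    switch status {
    case "COMPLETED", "FAILED", "TIMED_OUT", "TERMINATED":
        return true
    }
    return false
}

// pollUntilTerminal calls fetch every interval until it returns a terminal
// status, fetch fails, or the next wait would pass the timeout.
func pollUntilTerminal(fetch func() (string, error), interval, timeout time.Duration) (string, error) {
    deadline := time.Now().Add(timeout)
    for {
        status, err := fetch()
        if err != nil {
            return "", err
        }
        if isTerminalStatus(status) {
            return status, nil
        }
        if time.Now().Add(interval).After(deadline) {
            return status, errPollTimeout
        }
        time.Sleep(interval)
    }
}

func main() {
    // Fake status source: RUNNING twice, then COMPLETED.
    calls := 0
    fetch := func() (string, error) {
        calls++
        if calls < 3 {
            return "RUNNING", nil
        }
        return "COMPLETED", nil
    }
    status, err := pollUntilTerminal(fetch, demoInterval, demoTimeout)
    fmt.Println(status, err, calls) // COMPLETED <nil> 3
}
```

For workflows that run for hours or days, a longer interval keeps the number of status requests low.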

Manage workflow executions

Get status, pause, resume, terminate, and restart workflow executions using workflowExecutor.

Get execution status

wfDetails, err := workflowExecutor.GetWorkflow(id, true)
fmt.Println("Status:", wfDetails.Status)

When the second argument is true, the response includes all completed and in-progress tasks.

Pause and resume

A paused workflow lets currently running tasks complete but does not schedule new tasks until resumed.

workflowExecutor.Pause(workflowId)
workflowExecutor.Resume(workflowId)

Terminate

Stops the workflow immediately and moves it to TERMINATED state. Any in-progress tasks are moved to CANCELED. The reason string is recorded in the execution and visible in the UI.

workflowExecutor.Terminate(workflowId, "Cancelled by user")

Retry a failed workflow

Resumes the workflow from the failed task without restarting from the beginning. In a multi-task workflow, if task 3 of 10 fails, Retry resumes from task 3; tasks 1 and 2 don't re-execute.

workflowExecutor.Retry(workflowId, false)

Restart a workflow

Restarts a terminated or failed workflow from the beginning. Pass true to use the most recently registered workflow definition.

workflowExecutor.Restart(workflowId, true)

Test workflows

Worker functions are plain Go functions and can be tested directly without a running server or workflow execution. Pass in a model.Task with the inputs you want to test and assert on the returned output.

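package main

import (
    "testing"

    "github.com/conductor-sdk/conductor-go/sdk/model"
)

func TestValidateOrderWorker(t *testing.T) {
    task := &model.Task{
        InputData: map[string]interface{}{
            "order_id":    "ORD-001",
            "amount":      99.99,
            "customer_id": "CUST-123",
        },
    }
    result, err := validateOrder(task)
    if err != nil {
        t.Fatal(err)
    }
    taskResult, ok := result.(*model.TaskResult)
    if !ok {
        t.Fatalf("expected *model.TaskResult, got %T", result)
    }
    // validateOrder writes only customer_id to its output, so assert on that field.
    if taskResult.OutputData["customer_id"] != "CUST-123" {
        t.Errorf("expected CUST-123, got %v", taskResult.OutputData["customer_id"])
    }
}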

Run your tests with:

go test -v

Next steps

  • Examples: Browse the full examples directory on GitHub.