Orchestrating Long-Running APIs
This tutorial demonstrates how to handle long-running API tasks efficiently using Orkes Conductor. You’ll learn how to handle HTTP timeouts by invoking tasks asynchronously and polling for their status.
In this tutorial, you will:
- Create an AWS Lambda function to simulate a long-running task.
- Store execution status in Amazon DynamoDB.
- Set up Amazon API Gateway endpoints to invoke and monitor tasks.
- Define and run a Conductor workflow.
By the end, you'll have a fully functional workflow that orchestrates long-running API tasks effectively.
Understanding the challenge
Problem: Handling long-running API calls
Conductor’s HTTP task is designed for synchronous API calls, with a default timeout of 60 seconds. This can be problematic when dealing with APIs that require extended processing time (e.g., 4 minutes or more).
Solution: Use asynchronous invocation with polling
To address this, you can use employ a two-step pattern:
- Trigger the long-running task asynchronously using an HTTP task.
- Poll the task status using an HTTP Poll task until it completes.
The long running workflow
Here is the long running workflow that you’ll build in this tutorial:
Step 1: Create an Amazon DynamoDB table to track task status
To store task status, begin by creating a DynamoDB table:
- Sign in to the AWS Console.
- In the navigation pane, go to Services > Amazon DynamoDB.
- In the left menu, select Tables, and then choose Create table.
- Set the Table name to lambda-invocation-status.
- Set the Partition Key to requestId.
The requestId is a unique identifier used to track each task instance throughout the workflow. It is provided when running the workflow, passed to the Lambda function, and used later to poll for status from DynamoDB.
For more information, refer to DynamoDB documentation.
Step 2: Set up a long-running task in AWS Lambda
Create an AWS Lambda function that stimulates a long-running task. It will take requestId as input and store its execution status in DynamoDB.
Create a Lambda function
To create a Lambda function:
- In the AWS Console, go to Services > AWS Lambda.
- Select Create a function.
- Enter a Function name.
- In Runtime, select Python.
- Select Create function.
- After the function is created, go to the Code tab, and replace the default code with the following:
import json
import time
import boto3
dynamodb = boto3.resource('dynamodb')
table = dynamodb.Table('lambda-invocation-status')
def lambda_handler(event, context):
request_id = event['requestId']
action = event.get("action")
if action == "invoke":
table.put_item(Item={'requestId': request_id, 'status': 'PROCESSING'})
# Simulate long processing time
time.sleep(2)
process_long_running_task(request_id)
return {
"statusCode": 202,
"body": json.dumps({"requestId": request_id, "status": "PROCESSING"})
}
elif action == "status":
response = table.get_item(Key={'requestId': request_id})
item = response.get('Item')
if item:
return {"statusCode": 200, "body": json.dumps(item)}
else:
return {"statusCode": 404, "body": json.dumps({"status": "NOT_FOUND"})}
def process_long_running_task(request_id):
time.sleep(25) # Simulate processing time
table.update_item(
Key={'requestId': request_id},
UpdateExpression='SET #s = :val',
ExpressionAttributeNames={'#s': 'status'},
ExpressionAttributeValues={':val': 'COMPLETED'}
)
- In Configurations > General configuration, increase the Timeout to 1 minute.
Create an IAM policy
Next, create an IAM policy to allow the Lambda function to access DynamoDB.
- Go to IAM > Roles > Create role.
- In Trusted entity type, select AWS service.
- In Use case, select Lambda.
- Select Next.
- In Permission policies, select AmazonDynamoDBFullAccess.
- Select Next.
- Enter a Role name and then choose Create role.
Attach IAM policy to Lambda function
Next you need to attach the policy to the Lambda function.
- Open the Lambda function.
- Go to Configurations > Permissions.
- In Execution role, select Edit.
- In Existing role, select the policy you just created.
- Select Save.
Step 3: Set up API Gateway Endpoints
We’ll now expose our Lambda function using Amazon API Gateway. For this, create the following two API endpoints:
POST /invoke-lambda
: Triggers the Lambda function asynchronously.GET /{requestId}
: Retrieves the status of the Lambda execution.
Create REST API
- In the AWS Console, go to Services > Amazon API Gateway.
- In Choose an API type, select REST API > Build.
- Enter an API name, and select Create API.
Create resources
Next, we need to create resources for each API endpoint.
POST /invoke-lambda Endpoint
- In Resources, select Create resource.
- In Resource path, select /.
- In Resource name, enter invoke-lambda, then select Create resource.
- Select Create method, and choose Method type as POST.
- Select Integration type as Lambda.
- In Lambda function, select the Lambda function created in Step 2.
- Select Create method.
Next, let’s add the integration request mapping.
- In Integration request tab, select Edit.
- In Mapping templates, set Content type as application/json.
- In Template body, paste this:
#set($context.requestOverride.header.X-Amz-Invocation-Type = "Event")
{
"action": "invoke",
"requestId": "$input.path('$.requestId')"
}
Now, let’s configure the method response.
- Go to Method response tab, select Edit.
- Delete the default 200 response.
- Select Create method response.
- Set HTTP status code to 202.
- Set Header name to Content-Type.
Finally, set up an integration response to return a processing acknowledgment.
- In Integration response tab, delete the existing 200 responses.
- Select Create response.
- Set Method response status code to 202.
- Select Create, then select Edit.
- In Mapping templates, set Content type as application/json.
- In Template body, paste this:
{
"requestId": "$input.json('$.body.requestId')",
"status": "PROCESSING"
}
GET /{requestId} Endpoint
- In Resources, select Create resource.
- In Resource path, select /.
- In Resource name, enter {requestId}, then select Create resource.
- Select Create method, and choose Method type as GET.
- Select Integration type as Lambda.
- In Lambda function, select the Lambda function created in Step 2.
- Select Create method.
Next, add the integration request mapping.
- In Integration request tab, select Edit.
- In Mapping templates, set Content type as application/json.
- In Template body, paste this:
{
"action": "status",
"requestId": "$input.params('requestId')"
}
Next, add a header to the method response.
- Go to Method response tab, select Edit.
- For 200 responses, select Add header.
- Set Header name to Content-Type.
- Set the Response body as application/json.
Finally, let’s confirm the integration response configuration. In the Integration response tab, verify that the method response status code is 200.
Deploy REST API
Next, deploy the APIs to a stage (e.g., test, prod).
To deploy your API:
- Select Deploy API.
- In Stage, select New stage.
- Enter a Stage name.
- Select Deploy.
Once deployed, note the Invoke URL which will be of the format:
https://<your-api-gateway-id>.execute-api.<your-region>.amazonaws.com/<stage>
For more details, refer to API Gateway REST API setup guide.
Step 4: Create Conductor workflow
Use the free Developer Playground to create workflows.
Now, define a Conductor workflow that:
- Invokes the Lambda function using an HTTP task.
- Polls the status endpoint until the task is completed using the HTTP Poll task.
To create a workflow:
- Go to Definitions > Workflow from the left navigation menu on your Conductor cluster.
- Select + Define workflow.
- In the Code tab, paste the following code:
{
"name": "LongRunningAPIWorkflow",
"description": "Sample workflow",
"version": 1,
"tasks": [
{
"name": "InvokeLambdaTask",
"taskReferenceName": "invokeLambda",
"inputParameters": {
"http_request": {
"uri": "https://<your-api-gateway-id>.execute-api.<your-region>.amazonaws.com/<stage>/invoke-lambda",
"method": "POST",
"headers": {
"Content-Type": "application/json",
"X-Amz-Invocation-Type": "Event"
},
"body": {
"requestId": "${workflow.input.requestId}"
}
}
},
"type": "HTTP"
},
{
"name": "http_poll",
"taskReferenceName": "http_poll_ref",
"inputParameters": {
"http_request": {
"uri": "https://<your-api-gateway-id>.execute-api.<your-region>.amazonaws.com/<stage>/${workflow.input.requestId}",
"method": "GET",
"accept": "application/json",
"contentType": "application/json",
"terminationCondition": "(function(){ return $.output.response.statusCode == 200 && $.output.response.body.body.status == \"COMPLETED\";})();",
"pollingInterval": "5",
"pollingStrategy": "FIXED",
"encode": true
}
},
"type": "HTTP_POLL"
}
],
"inputParameters": [
"requestId"
],
"schemaVersion": 2
}
- Select Save > Confirm.
Modify workflow
Next, let’s modify the workflow to include your deployed API Gateway URLs.
- In InvokeLambdaTask, replace the uri value with your deployed API endpoint for invoking the Lambda:
https://<your-api-gateway-id>.execute-api.<your-region>.amazonaws.com/<stage>/invoke-lambda
- In http_poll, replace the polling URL with the endpoint for checking Lambda status:
https://<your-api-gateway-id>.execute-api.<your-region>.amazonaws.com/<stage>/${workflow.input.requestId}
- Save the workflow
Step 5: Run workflow
Now, let’s test the workflow:
- Go to the Run tab.
- Enter the input parameters. For example:
{
"requestId": "12345"
}
- Select Run Workflow.
On running the workflow, the InvokeLambdaTask will trigger the Lambda function. The http_poll task will continuously check the status until the task is completed.
Troubleshooting
- Workflow gets stuck on polling: Check API Gateway response format.
- Lambda execution fails: Ensure it has the right IAM permissions for DynamoDB.
- Polling stops too soon: Adjust pollingInterval for the HTTP task in the workflow definition.