How to Manage Your Asynchronous API With DynamoDB Streams

Do you want the best of both speed and resilience? Here's how to build DynamoDB Streams into your architecture.

Written by Vikas Solegaonkar
Published on Jun. 15, 2022

Responsiveness is one of the most important factors in the success of any web application, and asynchronous processing is a key technique for achieving it. A server request from the browser should return immediately, without waiting for the processing to complete, and the data flow should be designed so that it doesn't depend on an immediate response from the server.

There are several architecture patterns for this, but a major problem with asynchronous processing is error handling. How will the client know if the request failed? We shouldn't lose data in the process. In fact, a fire-and-forget service can't afford to fail: Even if the processing fails for some reason, the data has to reach the DB.

DynamoDB Streams provides a cool solution to this problem. Let’s check it out.

What Is DynamoDB Streams?

Most traditional databases have the concept of triggers, which are events generated when some data changes in the DB. DynamoDB Streams is quite similar, but with one difference: Instead of generating a distinct trigger per data change, DynamoDB Streams generates a stream of events that flows into a target such as Lambda or Kinesis.


Why Use DynamoDB Streams?

We can have a Lambda function triggered by these events to process the incoming data. The incoming API call can dump its payload directly into DynamoDB using an API Gateway service integration, which keeps the API's response time very low, and we can configure DynamoDB Streams to invoke a Lambda function that processes the request asynchronously.
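Concretely, the Lambda target receives batches of stream records shaped roughly like this (trimmed for brevity; the values here are illustrative):

{
   "Records": [
       {
           "eventID": "1",
           "eventName": "INSERT",
           "eventSource": "aws:dynamodb",
           "awsRegion": "us-east-1",
           "dynamodb": {
               "Keys": { "id": { "S": "abc-123" } },
               "NewImage": {
                   "id": { "S": "abc-123" },
                   "request": { "S": "{\"order\":\"pizza\"}" }
               },
               "StreamViewType": "NEW_AND_OLD_IMAGES",
               "SequenceNumber": "111",
               "SizeBytes": 26
           },
           "eventSourceARN": "arn:aws:dynamodb:us-east-1:1234567890:table/Restaurant/stream/2022-06-15T00:00:00.000"
       }
   ]
}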

We have an advantage here: The data is already available in our DB before we start processing it, so even if the downstream processing fails, nothing is lost. In the unlikely event of a DB insert/update failure, the API itself will return an error, and the client will be able to handle it.

Thus, with DynamoDB Streams, we have the best of both worlds: low response time and error resilience. Let's implement this in our AWS account.


Lambda Function Code for Asynchronous Invocation

Start by creating a Lambda function, which is simple: Just go to the Lambda console and create a new Lambda function. I'll call it StreamProcessor (you can choose any other name you like). Add the code below:

const AWS = require('aws-sdk');
const ddb = new AWS.DynamoDB.DocumentClient();
const TABLE_NAME = "Restaurant";
const TTL = 300; // seconds to keep a successfully processed item before it expires

exports.handler = async (event) => {
   // Process every INSERT record in the batch concurrently.
   const plist = [];
   event.Records.forEach(record => {
       if (record.eventName === "INSERT") {
           plist.push(processInsert(record.dynamodb.NewImage));
       }
   });
   await Promise.all(plist);
   // The return value is ignored for stream invocations.
   return {
       statusCode: 200,
       body: JSON.stringify('Hello from Lambda!'),
   };
};

const processInsert = async (newImage) => {
   // NewImage arrives in DynamoDB JSON format, so string values sit under .S
   console.log("Processing: " + JSON.stringify(newImage));
   const error = await businesslogic(newImage);
   if (error) {
       // Failure: record the error details on the item so nothing is lost.
       await ddb.update({
           TableName: TABLE_NAME,
           Key: { id: newImage.id.S },
           UpdateExpression: "set #error = :error",
           ExpressionAttributeValues: { ":error": error },
           ExpressionAttributeNames: { "#error": "error" }
       }).promise();
   }
   else {
       // Success: set a TTL so DynamoDB expires the processed item automatically.
       await ddb.update({
           TableName: TABLE_NAME,
           Key: { id: newImage.id.S },
           UpdateExpression: "set #ttl = :ttl",
           ExpressionAttributeValues: { ":ttl": TTL + Math.floor(Date.now() / 1000) },
           ExpressionAttributeNames: { "#ttl": "ttl" }
       }).promise();
   }
};

const businesslogic = async (input) => {
   return;    // return "Error Details" in case of any error
};
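To sanity-check the handler before wiring everything up, you can invoke it with a hand-built event. Here is a minimal sketch, assuming the code above is saved as index.js (note that the update calls will hit the real table, so point your credentials at a test environment):

// test.js: hypothetical local smoke test for the handler above.
const { handler } = require('./index');

const fakeEvent = {
   Records: [{
       eventName: "INSERT",
       dynamodb: {
           NewImage: {
               id: { S: "test-id-1" },
               request: { S: "{\"order\":\"pizza\"}" }
           }
       }
   }]
};

handler(fakeEvent).then(console.log).catch(console.error);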


IAM Role for Lambda

This Lambda function needs enough permissions to operate on the DynamoDB stream, along with permission to update items in the table. To get this, create a new IAM role.

Along with the usual Lambda permissions, include the policy below for the DB permissions (depending on how you create the trigger, the role may also need dynamodb:ListStreams):

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Sid": "VisualEditor0",
           "Effect": "Allow",
           "Action": [
               "dynamodb:DescribeStream",
               "dynamodb:UpdateItem",
               "dynamodb:GetShardIterator",
               "dynamodb:GetItem",
               "dynamodb:UpdateTable",
               "dynamodb:GetRecords"
           ],
           "Resource": [
               "arn:aws:dynamodb:us-east-1:1234567890:table/TableName",
               "arn:aws:dynamodb:us-east-1:1234567890:table/TableName/index/*",
               "arn:aws:dynamodb:us-east-1:1234567890:table/TableName/stream/*"
           ]
       }
   ]
}
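The role also needs a trust policy that allows the Lambda service to assume it (the Lambda console sets this up for you if you create the role there). A minimal one looks like this:

{
   "Version": "2012-10-17",
   "Statement": [
       {
           "Effect": "Allow",
           "Principal": { "Service": "lambda.amazonaws.com" },
           "Action": "sts:AssumeRole"
       }
   ]
}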


DynamoDB Table

Next, we create the table in DynamoDB. The table should have a primary key named id, matching the Lambda code above. On the "Exports & Streams" tab, click "Enable DynamoDB Streams" (not the "Kinesis streams").

Along with that, add the Lambda trigger to the stream: Click "Create Trigger" and choose the Lambda function we created above. That sets up the stream plus the trigger. Next, go to the "Additional Settings" tab, where we can enable TTL (time to live) on the ttl attribute that the Lambda function sets.

That will set up the DynamoDB table for us.
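If you prefer scripting over clicking through the console, here is a sketch of the same setup using the AWS SDK. The table, attribute, and function names match the ones used above; the region and batch size are assumptions to adjust for your environment:

// setup.js: hypothetical one-off script mirroring the console steps above.
const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB({ region: 'us-east-1' });
const lambda = new AWS.Lambda({ region: 'us-east-1' });

const setup = async () => {
   // Create the table with a stream that emits the new image of each item.
   const { TableDescription } = await dynamodb.createTable({
       TableName: 'Restaurant',
       AttributeDefinitions: [{ AttributeName: 'id', AttributeType: 'S' }],
       KeySchema: [{ AttributeName: 'id', KeyType: 'HASH' }],
       BillingMode: 'PAY_PER_REQUEST',
       StreamSpecification: {
           StreamEnabled: true,
           StreamViewType: 'NEW_AND_OLD_IMAGES'
       }
   }).promise();
   await dynamodb.waitFor('tableExists', { TableName: 'Restaurant' }).promise();

   // Expire processed items via the "ttl" attribute the Lambda sets.
   await dynamodb.updateTimeToLive({
       TableName: 'Restaurant',
       TimeToLiveSpecification: { Enabled: true, AttributeName: 'ttl' }
   }).promise();

   // Wire the stream to the Lambda function (the console's "Create Trigger").
   await lambda.createEventSourceMapping({
       EventSourceArn: TableDescription.LatestStreamArn,
       FunctionName: 'StreamProcessor',
       StartingPosition: 'LATEST',
       BatchSize: 100
   }).promise();
};

setup().catch(console.error);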


API Gateway

Finally, we come to API Gateway to create an API the client can invoke. Here we configure API Gateway to add the request to the database table directly, which we do using the AWS service integration of API Gateway. Let's work on that.

Create a new REST API in API Gateway. Add a PUT method to it and integrate the request with DynamoDB's PutItem action using an AWS service integration.

And add this as the JSON mapping template for the request:

{
   "TableName":"TableName",
   "Item": {
       "id": {"S": "$context.requestId"},
       "request": {"S": "$util.escapeJavaScript($input.body)"}
   }
}

For every API call, this will add a new item to the table, with the request ID as the key. The insert will trigger the Lambda function and take the processing ahead.
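From the client's point of view, this is an ordinary HTTP request that returns as soon as DynamoDB accepts the item. Here is a minimal sketch, with a placeholder invoke URL standing in for your deployed stage:

// Hypothetical client call; replace the URL with your API's invoke URL.
const submitRequest = async (payload) => {
   const res = await fetch('https://abc123.execute-api.us-east-1.amazonaws.com/prod', {
       method: 'PUT',
       headers: { 'Content-Type': 'application/json' },
       body: JSON.stringify(payload)
   });
   // A non-2xx status means the DB insert itself failed and the client
   // should retry; everything after the insert is handled server-side.
   if (!res.ok) throw new Error('Request failed: ' + res.status);
};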


Benefits of DynamoDB Streams

You might be asking, "Why not directly call the Lambda from API Gateway?" There are two reasons. First and foremost is the faster response time: Directly adding data from API Gateway to DynamoDB is a lot faster than invoking a Lambda, which means the client application sees a super-fast response.

Second, resilience and error handling get complex when we invoke Lambda directly from API Gateway. What do we do if the Lambda fails halfway through? Do we retry the same API? Do we invoke another API that will heal the damage? That's too much intelligence to push onto the client. It's best if the server manages these problems itself.

In short, the client can dump data into DynamoDB and rest assured that the server will accept and process it. The data won't be lost, because it's already in the DB, and the processing and error handling stay localized on the server, which leads to an overall cleaner architecture.
