
Put data to Amazon Kinesis Firehose delivery stream using Spring Boot

If you work with streams of big data that have to be collected, transformed, and analysed, you have probably heard of Amazon Kinesis Firehose. It is an AWS service used to load streams of data into data lakes or analytical tools, while optionally compressing, transforming, or encrypting the data along the way.

You can use Firehose to load streaming data into a destination such as S3 or Redshift. From there, you can use a SQL query engine such as Amazon Athena to query this data. You can even connect this data to your BI tool and get real-time analytics. This can be very useful in applications where real-time analysis of data is necessary.

In this post, we’ll see how to create a delivery stream in Kinesis Firehose, and write a simple piece of Java code to put records (produce data) to this delivery stream. We’ll set up Kinesis Firehose to save the incoming data to a folder in Amazon S3, which can be added to a pipeline where you can query it using Athena. Come to think of it, you can really over-complicate your pipeline and suffer for it later when things go out of control. It is advisable to take a step back during the architecture phase of your project and analyse where your pipeline could cause bottlenecks. It is true that most AWS services are fully managed and auto-scaling, but it never hurts to have control over your architecture and design it so that the pipeline doesn’t become the weakest part of your project. Anyway, let’s move on.


Create a Delivery Stream in Kinesis Firehose

To start sending messages to a Kinesis Firehose delivery stream, we first need to create one. For this, let’s login to the AWS Console, and head over to the Kinesis service. You should see a button to create a new Firehose delivery stream on the Kinesis home page. Let’s hope that you found that button. There are four steps involved in creating a delivery stream. Let’s look at each of them, one by one:

Step 1: Name and source

Here, we specify the name of the stream, and also how the stream will get the data, which is the source part. Give a name of your choice for the stream. The page should look something like the screenshot below:

Next, we have to specify the source of data for this stream. There are two ways you can put data to a stream:

  • Using the Put data APIs that AWS provides, which can be integrated into your application to send data directly to the stream.
  • Piping the data from another Kinesis data stream.

For this example, we’ll use the first option, Direct PUT or other sources. Select this option and click Next at the bottom of the page to move to the second step.

Step 2: Process records

One of the many features of Kinesis Firehose is that it can transform or convert the incoming data before sending it to the destination. In this second step, we can tell Kinesis what to do with the incoming data for this stream. For now, we’ll keep everything default. That means no transformation of any kind, and no converting the format of the data either. Ideally, you’d want to convert your data to the Parquet format, which both compresses the data and makes processing more efficient. Keep both configurations disabled and click the Next button at the bottom of the page.

Step 3: Choose destination

Now that we have configured Firehose to receive the data, we have to tell it where to send that data. There are four options:

  1. Amazon S3
  2. Amazon Redshift
  3. Amazon Elasticsearch Service
  4. Splunk

Each of these destinations has its own configuration. Because we’re not really worried about anything else for now and just want to send data to Firehose, we’ll save our data to S3, which is also the default option.

Scroll down and you’ll find the option to provide an S3 bucket where the data has to be stored. You can either select an existing bucket, or create a new one.

Next, we have the prefix option. Anything that Firehose writes to S3 will be partitioned by default in the format “YYYY/MM/DD/HH” (in UTC). You can override this, but there has to be at least one timestamp in your prefix; if you provide a prefix without a timestamp, Firehose will add one by itself, as it’s mandatory. Also, you can’t use a part of your incoming data as a prefix, so don’t plan on designing it that way. For now, leave it empty or give a random string.
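To make the partitioning concrete, here is roughly what an object key looks like with an empty custom prefix (the stream name, version number, and random suffix here are illustrative, not from a real bucket):

```
2019/09/15/08/my-delivery-stream-1-2019-09-15-08-12-34-<random-string>
```

The leading `YYYY/MM/DD/HH/` part is the mandatory timestamp partitioning described above.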

Similarly, you can give a prefix for errors as well. I’d leave it empty for now, as this is just a POC. Click the Next button at the bottom of the page. We’ll move to the next step in the process.

Step 4: Configure settings

In this step, we specify a few important settings for how our data will be stored in S3. I’ll explain each of them, but I’d recommend leaving them at the default values for this POC.

First, we have the buffer conditions. There are two pieces of configuration here: buffer size and buffer interval. Kinesis Firehose buffers the incoming data before writing it to S3, because writing each incoming message to S3 individually would be very expensive. So the data is buffered and written to S3 in bulk. You can specify a buffer size, for example 5 MB, and a buffer interval, for example 300 seconds; whichever condition is met first triggers the write. For example, if the buffer reaches the 5 MB limit in just 10 seconds, the data is written to S3 and the buffer is cleared. Likewise, if the buffer holds less than 5 MB but 300 seconds have elapsed since the last write, whatever is in the buffer is written to S3 and the buffer is cleared.
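The “whichever happens first” rule can be sketched as a simple check. This is purely illustrative code of my own (the class and constant names are made up, not part of the AWS SDK), just to pin down the flush logic:

```java
// Illustrative sketch of the Firehose buffer flush rule: a write to S3 is
// triggered when either the size threshold or the time threshold is reached.
public class BufferFlushRule {
    static final long SIZE_LIMIT_BYTES = 5L * 1024 * 1024; // 5 MB buffer size
    static final long INTERVAL_SECONDS = 300;              // 300 s buffer interval

    static boolean shouldFlush(long bufferedBytes, long secondsSinceLastFlush) {
        return bufferedBytes >= SIZE_LIMIT_BYTES
                || secondsSinceLastFlush >= INTERVAL_SECONDS;
    }

    public static void main(String[] args) {
        // 5 MB reached in only 10 seconds: flush on size.
        System.out.println(shouldFlush(5L * 1024 * 1024, 10));  // true
        // Only 1 KB buffered, but 300 seconds elapsed: flush on time.
        System.out.println(shouldFlush(1024, 300));             // true
        // Neither condition met yet: keep buffering.
        System.out.println(shouldFlush(1024, 10));              // false
    }
}
```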

Next, we have the options to compress and encrypt the data in S3. We can use compression algorithms such as GZIP and Snappy to compress the data. This becomes important when we’re dealing with GBs of data. For this POC, keep compression disabled. We also don’t need any encryption right now.

We can enable error logging if necessary. And maybe add some tags as well. But I don’t see the need for either of them for this POC. So let’s just proceed to the bottom of the page.

Here, we need to specify an IAM role which has access to write to S3. Kinesis Firehose will use this IAM role to write the incoming data to S3. You can either choose an already existing IAM role or create a new one. Once you have this selected, click the Next button at the bottom of the page.

You should now have the delivery stream created. With that done, we can move on to the Java code we’ll use to send data to this stream.


The Java Code

Before we get to the code, as usual, let’s take a look at the dependencies we need. I’ve created a Maven project for this. So I have a pom.xml file for dependency management. I’ve listed all the dependencies I’m using here:

<dependencies>

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk</artifactId>
        <version>1.11.627</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>amazon-kinesis-client</artifactId>
        <version>1.11.2</version>
    </dependency>
    <dependency>
        <groupId>org.codehaus.jettison</groupId>
        <artifactId>jettison</artifactId>
        <version>1.2</version>
    </dependency>

</dependencies>

Next, using the AWS SDK, we need to create an AmazonKinesisFirehose client, which will be used to put the data, or the record, to the Firehose delivery stream. For this, we’ll first need to create the AWS credentials object. For this example, I’m using the BasicAWSCredentials class, which means we have to provide the access key and the secret key. We’ll keep these values in the application.properties file, so that we can get them into the Java code without hardcoding them. Since this is a Spring Boot application, we can use the @Value() annotation for this:

@Value("${aws.auth.accessKey}")
private String awsAccessKey;

@Value("${aws.auth.secretKey}")
private String awsSecretKey;
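For completeness, the matching entries in application.properties would look something like this (the values below are placeholders, not real credentials):

```
aws.auth.accessKey=YOUR_ACCESS_KEY
aws.auth.secretKey=YOUR_SECRET_KEY
aws.kinesis.firehose.deliveryStream.name=your-delivery-stream-name
```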

Using these two variables, we’ll create the credentials object and also the Firehose client:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(awsAccessKey, awsSecretKey);

AmazonKinesisFirehose firehoseClient = AmazonKinesisFirehoseClient.builder()
        .withRegion(Regions.US_WEST_2)
        .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
        .build();

Make sure you change the region in the code snippet above to match your Kinesis setup. Once we have this Firehose client created, we can start “putting records” to Firehose. But let’s look at the data we’re going to put.

JSONObject messageJson = new JSONObject();
messageJson.put("key1", "We are testing Amazon Kinesis Firehose!");
messageJson.put("integerKey", 123);
messageJson.put("booleanKey", true);
messageJson.put("anotherString", "This should work!");

logger.info("Message to Firehose: " + messageJson.toString());

As you can see, I’ve created a simple JSON object, which will be serialised, converted to bytes, and then sent to Firehose. Before we actually send the data, we have to create a PutRecordRequest object, in which we specify the stream we’re sending the data to. The stream name is not hardcoded; we get it from the application.properties file, similar to the AWS credentials:

@Value("${aws.kinesis.firehose.deliveryStream.name}")
private String fireHoseDeliveryStreamName;

We’ll create the PutRecordRequest now:

PutRecordRequest putRecordRequest = new PutRecordRequest();
putRecordRequest.setDeliveryStreamName(fireHoseDeliveryStreamName);

Next, we’ll create a Record object, which will hold the actual data which is going to be sent across. We’ll set this record to the putRecordRequest object we just created, and call the .putRecord() method on the Firehose client object.

Record record = new Record().withData(ByteBuffer.wrap(messageJson.toString().getBytes()));
putRecordRequest.setRecord(record);

PutRecordResult putRecordResult = firehoseClient.putRecord(putRecordRequest);
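One practical note: Firehose concatenates records without adding any delimiter between them, so if you plan to query the S3 output line by line (for example with Athena), it helps to append a newline to each record yourself before wrapping it in the ByteBuffer. A minimal sketch using only the standard library (the `RecordEncoder` class and `encode` helper are my own names, not part of the SDK):

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class RecordEncoder {
    // Append a newline so consecutive JSON records in the same S3 object
    // stay line-delimited instead of running together.
    static ByteBuffer encode(String json) {
        return ByteBuffer.wrap((json + "\n").getBytes(StandardCharsets.UTF_8));
    }

    public static void main(String[] args) {
        ByteBuffer buf = encode("{\"key1\":\"value\"}");
        System.out.println(buf.remaining()); // byte length of the encoded record
    }
}
```

You would then pass the resulting buffer to `new Record().withData(...)` exactly as in the snippet above.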

That’s it. If you don’t get any error from the AWS client, you can be assured that the data you sent will land in your S3 bucket, once the buffer conditions we configured earlier are met (so allow up to the buffer interval before checking). Go check it out.

If you’re interested in the complete Spring Boot project for this example, head over to my Github repository, and fork it maybe?
