Author: Austin Harris
Date: July 26th, 2018
In this blog post, we introduce the Kafka Streams API by building our own stream processing application.
With today's rapid adoption of sensors and IoT devices, the demand for real-time processing applications has exploded. Large deployments of connected devices require specialized infrastructure to ingest large volumes of high-frequency data.
In this post, we will use Kafka to set up a data pipeline that ingests streaming data from multiple simulated sensors, and create an application using the Kafka Streams API to process the data in real time. This article's primary objective is to introduce the Kafka Streams API, so we assume you have a Kafka broker up and running. If you are new to Kafka, I recommend starting here.
Before we can define stream processing, we need to make sure we know what a stream is. While many explanations exist, we will be using the one commonly agreed upon:
"First and foremost, a data stream is an abstraction representing an unbounded dataset. Unbounded means infinite and ever growing. The dataset is unbounded because over time, new records keep arriving. This definition is used by Google, Amazon, and pretty much everyone else."
Narkhede, Shapira, and Palino, "Kafka: The Definitive Guide"
Stream processing is the processing of data continuously, a single record at a time. Kafka: The Definitive Guide gives a great explanation of stream processing as well:
"As long as you are continuously reading data from an unbounded dataset, doing something to it, and emitting output, you are doing stream processing. But the processing has to be continuous and ongoing."
Narkhede, Shapira, and Palino, "Kafka: The Definitive Guide"
A few examples of streams would include:
- sensor readings from IoT devices
- credit card transactions
- stock trades
- application and server log events
Kafka Streams is a client library for stream processing applications that makes writing scalable, fault-tolerant streaming applications easy. It requires no dependencies other than Kafka itself. A Streams application can be packaged and deployed like any other Java application. A Kafka Streams application reads its input from a topic in the cluster and writes its results back to the cluster as a new topic.
For now, we will use the high-level Streams DSL (Domain Specific Language) to create our application. However, Kafka does provide a Low-Level Processor API if you need a bit more control. If you would like to learn more about the APIs themselves Confluent provides some great examples. We will use windowed aggregation in order to show how easy it is to compute a moving average in the example below.
All code in this article can be found here.
In order to introduce the Kafka Streams API, we will implement an example Kafka Streams application that calculates the moving average of a given topic. Each topic will consist of a single stream of data from a sensor. This example is broken down into three main components:
- a stream producer that publishes sensor readings
- a sensor aggregator that defines how readings are combined
- a stream processor that performs the windowed aggregation
The stream producers will publish readings to sensor topics. The stream processor will read from a single topic and calculate the moving average for that topic. To do so, we will implement our own aggregator that keeps track of the number of readings and their sum in order to compute the average.
Since we do not have access to sensors, we will simulate our own. We will use a subset of data from UCI’s “Gas sensors for home activity monitoring”. You can download the dataset here.
Data from each sensor will be published via the producer to a sensor topic. The topic name is determined by the sensor's type. Each sensor publishes to its own topic because ordering is required. The date, time, sensor type, and reading make up a single message. Avro is used for serialization and deserialization in this example; if you are new to Avro, I would suggest checking out this article. The schema is shown below:
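A sketch of what such an Avro schema might look like follows; the record name, namespace, and field names here are illustrative and may differ from those in the accompanying repository:

```json
{
  "namespace": "com.pattersonconsultingtn.kafka.examples",
  "type": "record",
  "name": "SensorReading",
  "fields": [
    {"name": "date",        "type": "string"},
    {"name": "time",        "type": "string"},
    {"name": "sensor_type", "type": "string"},
    {"name": "reading",     "type": "double"}
  ]
}
```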
Next, configure your Producer properties:
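A minimal sketch of the producer configuration, assuming a local broker on port 9092 and Confluent's Schema Registry on port 8081 (both addresses are assumptions for a local setup):

```java
import java.util.Properties;

public class ProducerConfigExample {
    static Properties producerProps() {
        Properties props = new Properties();
        // Assumed local broker address
        props.put("bootstrap.servers", "localhost:9092");
        // Wait for full acknowledgment of each write
        props.put("acks", "all");
        // Confluent's Avro serializers, which register schemas with the Schema Registry
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        // Assumed local Schema Registry address
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(producerProps());
    }
}
```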
Lastly, for each record in the CSV, publish a message for each sensor consisting of the sensor reading, type, date and time. This is shown in the code below:
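A sketch of that publishing loop is shown below. It assumes a `KafkaProducer<String, GenericRecord>` named `producer` built from the properties above, an Avro `Schema` parsed from the schema shown earlier, and a whitespace-delimited data file whose header row lists the sensor types; the file name and column layout are assumptions, not necessarily those of the repository:

```java
// Sketch: publish one message per sensor column of each record.
File dataFile = new File("HT_Sensor_dataset.dat");   // hypothetical file name
try (BufferedReader reader = new BufferedReader(new FileReader(dataFile))) {
    String header = reader.readLine();               // header row lists the sensor types
    String[] sensorTypes = header.trim().split("\\s+");
    String line;
    while ((line = reader.readLine()) != null) {
        String[] fields = line.trim().split("\\s+");
        // Assumes the first two columns are date and time, the rest are readings
        for (int i = 2; i < fields.length; i++) {
            GenericRecord reading = new GenericData.Record(schema);
            reading.put("date", fields[0]);
            reading.put("time", fields[1]);
            reading.put("sensor_type", sensorTypes[i]);
            reading.put("reading", Double.parseDouble(fields[i]));
            // The sensor type doubles as the topic name
            producer.send(new ProducerRecord<>(sensorTypes[i], reading));
        }
    }
}
producer.flush();
```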
The sensor aggregator is responsible for defining how the data should be aggregated. The aggregator consists of two methods: add() and calculateAverage(). The average is calculated over pre-defined windows of time. For a given time window, an aggregator is initialized and the readings that are received are passed to it via the add() method. The calculateAverage() method is called when the time window has completed.
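The idea can be sketched as a small class carrying a running count and sum per window; the real class in the repository may differ in its exact shape:

```java
// Minimal sketch of a per-window aggregator.
public class SensorAggregator {
    private long count = 0;
    private double sum = 0.0;

    // Called once for every reading that falls inside the current window
    public SensorAggregator add(double reading) {
        count++;
        sum += reading;
        return this;
    }

    // Called when the window closes to produce the window's average
    public double calculateAverage() {
        return count == 0 ? 0.0 : sum / count;
    }

    public static void main(String[] args) {
        SensorAggregator agg = new SensorAggregator();
        agg.add(1.0).add(2.0).add(3.0);
        System.out.println(agg.calculateAverage()); // prints 2.0
    }
}
```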
The SensorStreamProcessor uses the Kafka Streams API's windowed aggregation to aggregate sensor data into a new stream. First, a StreamsBuilder is initialized and a KStream is created using builder.stream("topicName"). The "topicName" can be any of the sensor types at the top of the CSV file. Here we build a stream from the "PT08S1" sensor:
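A sketch of those two steps, assuming the default serdes for the application are configured to handle the Avro records:

```java
// Sketch: consume the "PT08S1" topic as a stream of sensor readings.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, GenericRecord> sensorStream = builder.stream("PT08S1");
```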
Now that we have the stream, we use the aggregate() method along with our sensor aggregator, specify the window size, and calculate the average for each window.
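This step might look roughly like the sketch below. The one-minute window size, the `aggregatorSerde`, and the extraction of the numeric `reading` field are all assumptions for illustration:

```java
// Sketch: bucket readings into one-minute tumbling windows and keep a
// SensorAggregator per window, then map each finished window to its average.
KTable<Windowed<String>, Double> averages = sensorStream
    .groupByKey()
    .windowedBy(TimeWindows.of(60 * 1000L))          // one-minute windows
    .aggregate(
        SensorAggregator::new,                        // initializer per window
        (key, record, agg) -> agg.add((Double) record.get("reading")),
        Materialized.with(Serdes.String(), aggregatorSerde))  // aggregatorSerde is assumed
    .mapValues(SensorAggregator::calculateAverage);
```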
If you would like to view the new stream, you can use the code below to print off each message in your aggregated stream:
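One way to do that, assuming the windowed KTable of averages from the previous step, is to convert it back to a stream and print each record:

```java
// Sketch: print each windowed average to stdout for inspection.
averages.toStream().foreach((window, avg) ->
    System.out.println(window.key() + " @ " + window.window().start() + " -> " + avg));
```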
To run this example, ZooKeeper needs to be up and running. Run:
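Assuming a Confluent Platform layout (matching the kafka-topics path used below); your paths may differ:

```shell
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
```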
Next, start the Kafka Broker:
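Again assuming a Confluent Platform layout:

```shell
./bin/kafka-server-start ./etc/kafka/server.properties
```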
Once these are up and running, you will need to create your topic. Run the command below to create the topic for a single sensor. While our producer does create a stream for each sensor, the stream processor application requires only one sensor. We will use the "PT08S1" sensor. Run:
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 \
--partitions 1 --topic PT08S1
Now run our stream processor:
mvn exec:java -Dexec.mainClass="com.pattersonconsultingtn.kafka.examples.AirQualityProcessor.SensorStreamProcessor"
Lastly, start our stream producer:
mvn exec:java -Dexec.mainClass="com.pattersonconsultingtn.kafka.examples.PollutionDataTracker.PollutionSensorProducer"
Your output should look like this:
In this post, we learned how to implement a simple stream processing application using the Kafka Streams API, giving us a high-level idea of how the API works. Our goal of consuming streaming data from our sensors and calculating a windowed average was accomplished in a minimal amount of code thanks to the API. A future blog post will cover how we can process the same data using KSQL.