Kafka Streams is a client library that makes it easy to write scalable, fault-tolerant stream processing applications. It requires no dependencies other than Kafka itself, and a Streams application can be packaged and deployed like any other Java application. A Kafka Streams application reads data from an input topic in the cluster and writes its results back to the cluster as a new topic.
For now, we will use the high-level Streams DSL (Domain Specific Language) to create our application. Kafka also provides a low-level Processor API if you need more control, and Confluent provides some great examples if you would like to learn more about the APIs themselves. In the example below, we will use windowed aggregation to show how easy it is to compute a moving average.
All code in this article can be found here.
To introduce the Kafka Streams API, we will implement an example Kafka Streams application that calculates the moving average of a given topic. Each topic consists of a single stream of data from one sensor. This example is broken down into three main components:
- Sensor Stream Producer
- Sensor Stream Processor
- Sensor Stream Aggregator
The stream producer will publish readings to sensor topics. The stream processor will read from a single topic and calculate the moving average for that topic. To do so, we will implement our own aggregator, which keeps track of the number of readings and their sum in order to compute the average.
Since we do not have access to sensors, we will simulate our own. We will use a subset of data from UCI’s “Gas sensors for home activity monitoring”. You can download the dataset here.
Data from each sensor will be published via the producer to its own sensor topic. The topic name is determined by the sensor's type; each sensor publishes to its own topic because ordering is required. The date, time, sensor type, and reading make up a single message. Avro is used for serialization and deserialization in this example; if you are new to Avro, I would suggest checking out this article. The schema is shown below:
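A minimal Avro schema covering these fields might look like the following (the record name, namespace, and exact field names are assumptions, not the original listing):

```json
{
  "namespace": "com.pattersonconsultingtn.kafka.examples.PollutionDataTracker",
  "type": "record",
  "name": "SensorReading",
  "fields": [
    {"name": "date",        "type": "string"},
    {"name": "time",        "type": "string"},
    {"name": "sensor_type", "type": "string"},
    {"name": "reading",     "type": "double"}
  ]
}
```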
Next, configure your Producer properties:
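A sketch of a typical configuration, assuming a local broker, the Confluent Avro serializers, and a local Schema Registry (the addresses and class names below are assumptions for a local setup):

```java
import java.util.Properties;

public class ProducerConfigExample {

    public static Properties buildProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");   // local Kafka broker
        props.put("acks", "all");                           // wait for full acknowledgment
        props.put("retries", 0);
        // Confluent Avro serializers for both key and value
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");
        return props;
    }

    public static void main(String[] args) {
        Properties props = buildProps();
        System.out.println(props.getProperty("bootstrap.servers"));  // localhost:9092
    }
}
```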
Lastly, for each record in the CSV, publish a message for each sensor consisting of the sensor reading, type, date and time. This is shown in the code below:
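A sketch of that loop, assuming the properties and schema configured above are in scope as `props` and `schema`, and assuming a local CSV whose header row lists the sensor types (the file name and column layout here are assumptions):

```java
import java.io.BufferedReader;
import java.io.FileReader;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

try (BufferedReader reader = new BufferedReader(new FileReader("sensor_data.csv"));
     KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {

    String[] sensorTypes = reader.readLine().split(",");  // header row: sensor types
    String line;
    while ((line = reader.readLine()) != null) {
        String[] cols = line.split(",");
        String date = cols[0];
        String time = cols[1];
        // columns 2..n hold the individual sensor readings
        for (int i = 2; i < cols.length; i++) {
            GenericRecord record = new GenericData.Record(schema);
            record.put("date", date);
            record.put("time", time);
            record.put("sensor_type", sensorTypes[i]);
            record.put("reading", Double.parseDouble(cols[i]));
            // topic name == sensor type, so each sensor keeps its own ordered stream
            producer.send(new ProducerRecord<>(sensorTypes[i], record));
        }
    }
}
```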
The sensor aggregator is responsible for defining how the data should be aggregated. It consists of two methods: add() and calculateAverage(). The average is calculated over pre-defined windows of time. For a given time window, an aggregator is initialized and the readings that are received are passed in via the add() method. The calculateAverage() method is called when the time window has completed.
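A minimal sketch of such an aggregator is shown below. It is a simplified stand-in for the class described above: the real version would also need serde plumbing so Kafka Streams can serialize it, which is omitted here.

```java
// Minimal moving-average aggregator: tracks the count and running sum of
// the readings that fall inside one time window.
public class SensorAggregator {
    private long count = 0;
    private double sum = 0.0;

    // Called once per reading received during the window.
    public SensorAggregator add(double reading) {
        count++;
        sum += reading;
        return this;
    }

    // Called when the window completes; returns 0.0 for an empty window.
    public double calculateAverage() {
        return count == 0 ? 0.0 : sum / count;
    }

    public static void main(String[] args) {
        SensorAggregator agg = new SensorAggregator();
        agg.add(2.0).add(4.0).add(6.0);
        System.out.println(agg.calculateAverage());  // 4.0
    }
}
```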
The SensorStreamProcessor uses the Kafka Streams API's windowed aggregation to aggregate sensor data into a new stream. First, a StreamsBuilder is initialized and a KStream is created using builder.stream("topicName"), where "topicName" can be any of the sensor types at the top of the CSV file. Here we build a stream from the "PT08S1" sensor:
Now that we have the stream, we use the aggregate() method along with our sensor aggregator, specify the window size, and calculate the average for each window.
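A sketch of that topology in the Kafka Streams 2.x DSL. The one-minute window size and the `sensorAggregatorSerde` are assumptions; adapt both to your own serde setup.

```java
StreamsBuilder builder = new StreamsBuilder();
KStream<String, SensorReading> readings = builder.stream("PT08S1");

KTable<Windowed<String>, SensorAggregator> aggregated = readings
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))           // 1-minute tumbling windows
    .aggregate(
        SensorAggregator::new,                                   // initializer, once per window
        (key, reading, agg) -> agg.add(reading.getReading()),    // fold each reading into the window
        Materialized.with(Serdes.String(), sensorAggregatorSerde));

// Map each completed window's aggregate to its average.
KStream<Windowed<String>, Double> averages = aggregated
    .toStream()
    .mapValues(SensorAggregator::calculateAverage);
```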
If you would like to view the new stream, you can use the code below to print off each message in your aggregated stream:
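As a sketch, assuming the windowed averages ended up in a KStream named `averages` (a hypothetical variable name), a simple debugging sink could be:

```java
// Print each windowed average to stdout; useful for debugging, not production.
averages.foreach((windowedKey, avg) ->
    System.out.println(windowedKey.key() + " @ " + windowedKey.window().start()
        + " -> " + avg));
```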
Running The Example
To run this example, ZooKeeper needs to be up and running. Run:
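With the Confluent Platform distribution (paths and script names differ slightly for a plain Apache Kafka download), that would be something like:

```shell
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties
```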
Next, start the Kafka Broker:
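Again assuming the Confluent Platform layout:

```shell
./bin/kafka-server-start ./etc/kafka/server.properties
```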
Once these are up and running, you will need to create your topic. Run the command below to create the topic for a single sensor. While our producer creates a stream for each sensor, the stream processor application only needs one; we will use the "PT08S1" sensor. Run:
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 \
--partitions 1 --topic PT08S1
Now run our stream processor:
mvn exec:java -Dexec.mainClass="com.pattersonconsultingtn.kafka.examples.AirQualityProcessor.SensorStreamProcessor"
Lastly, start our stream producer:
mvn exec:java -Dexec.mainClass="com.pattersonconsultingtn.kafka.examples.PollutionDataTracker.PollutionSensorProducer"
Your output should look like this:
In this post, we learned how to implement a simple stream processing application using the Kafka Streams API, giving us a high-level idea of how the APIs actually work. Our goal of consuming stream data from our sensors and calculating a windowed average was accomplished in a minimal amount of code thanks to the API. A future blog post will cover how we can process the same data using KSQL.