Building the Next-Gen Retail Experience with Apache Kafka and Computer Vision

Part 4 of 4

Authors: Stuart Eudaly, Josh Patterson, Austin Harris

Date: January 7th, 2020

In the last article, we set up Confluent’s distribution of Kafka. We also used Kafka Connect to import the inventory table from a MySQL database and store it as a topic in the Kafka cluster. In this article, we'll actually join detected objects with inventory and perform real-time windowed aggregations to create the Green Light Special application.

Introduction to Kafka Streams: KStreams and KTables

In order to do the joins and windowed aggregations needed to get the Green Light Special application up and running, we’re going to utilize Kafka Streams. Essentially, Kafka Streams takes data from a Kafka topic as input, performs some operation on it, then outputs that data to another topic. The Kafka Streams library makes it relatively easy to apply just about any logic to data in Kafka in real time. The join we’re doing between streaming data and an inventory table is commonly seen in real-time applications:

"I’ll argue that the fundamental problem of an asynchronous application is combining tables that represent the current state of the world with streams of events about what is happening right now."

Jay Kreps' Blog Article: "Introducing Kafka Streams: Stream Processing Made Simple"

Two of the core classes used consistently in the design of streaming applications are the KStream and KTable classes. Below we describe their function and how they fit in with the rest of the streaming pipeline.

"A KStream is an abstraction of a record stream, where each data record represents a self-contained datum in the unbounded data set. Using the table analogy, data records in a record stream are always interpreted as an “INSERT” – think: adding more entries to an append-only ledger – because no record replaces an existing row with the same key. Examples are a credit card transaction, a page view event, or a server log entry."

Streams Concepts from Confluent Documentation

"A KTable is an abstraction of a changelog stream, where each data record represents an update. More precisely, the value in a data record is interpreted as an “UPDATE” of the last value for the same record key, if any (if a corresponding key doesn’t exist yet, the update will be considered an INSERT). Using the table analogy, a data record in a changelog stream is interpreted as an UPSERT aka INSERT/UPDATE because any existing row with the same key is overwritten."

Streams Concepts from Confluent Documentation

So, a KTable gives us the latest view of a stream of events by key, while a KStream shows every record as each is considered an independent piece of information. We’ll be using KStreams and KTables (among other things) extensively in our join and windowed aggregation code.
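
To make the distinction concrete, here is a minimal plain-Java sketch of the two interpretations of the same keyed records. It deliberately does not use the Kafka Streams API; the class and method names (StreamVsTableSketch, asStream, asTable) are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the two ways to read one sequence of keyed records.
// Not Kafka Streams code: all names here are hypothetical.
class StreamVsTableSketch {

    // KStream-like view: every record is an independent INSERT, so all are kept.
    static List<Map.Entry<String, Integer>> asStream(List<Map.Entry<String, Integer>> records) {
        return new ArrayList<>(records);
    }

    // KTable-like view: every record is an UPSERT, so a later value
    // overwrites the earlier value stored under the same key.
    static Map<String, Integer> asTable(List<Map.Entry<String, Integer>> records) {
        Map<String, Integer> table = new LinkedHashMap<>();
        for (Map.Entry<String, Integer> record : records) {
            table.put(record.getKey(), record.getValue());
        }
        return table;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> records = List.of(
                Map.entry("cup", 100),
                Map.entry("bowl", 10),
                Map.entry("cup", 95)); // same key again: an update in table terms
        System.out.println("stream view: " + asStream(records));
        System.out.println("table view:  " + asTable(records));
    }
}
```

With these three records, the stream view keeps all three entries, while the table view holds two keys, with cup mapped to the most recent value of 95.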

Rekeying the Inventory Topic

Kafka, as well as just about any other system, requires the keys (both the key type and actual value of the key) to match before a join will occur. If we take a look at the schemas for both the shopping_cart_objects and mysql_tables_jdbc_inventory topics, we notice that the keys don’t match up. In fact, mysql_tables_jdbc_inventory has null keys! As it turns out, when you ingest data from a table in a database using the JDBC connector, it will have a null key by default. Here’s example output from each topic:

./bin/kafka-avro-console-consumer --topic shopping_cart_objects --from-beginning --property print.key=true --bootstrap-server localhost:9092
"sportsball" {"timestamp":1568925073631,"image_name":"sportsball","class_id":1,"class_name":"sportsball","score":99.0,"box_x":1,"box_y":2,"box_w":3,"box_h":4}
"spoon" {"timestamp":1568925134323,"image_name":"spoon","class_id":5,"class_name":"spoon","score":99.0,"box_x":1,"box_y":2,"box_w":3,"box_h":4}
"cup" {"timestamp":1568925194456,"image_name":"cup","class_id":9,"class_name":"cup","score":99.0,"box_x":1,"box_y":2,"box_w":3,"box_h":4}

./bin/kafka-avro-console-consumer --topic mysql_tables_jdbc_inventory --property print.key=true --from-beginning --bootstrap-server localhost:9092
null {"id":1,"name":{"string":"cup"},"upsell_item":{"string":"plate"},"count":{"int":100},"modified":1554994983000}
null {"id":2,"name":{"string":"bowl"},"upsell_item":{"string":"cup"},"count":{"int":10},"modified":1554994994000}
null {"id":3,"name":{"string":"fork"},"upsell_item":{"string":"spoon"},"count":{"int":200},"modified":1554995002000}
null {"id":4,"name":{"string":"spoon"},"upsell_item":{"string":"fork"},"count":{"int":10},"modified":1554995012000}
null {"id":5,"name":{"string":"sportsball"},"upsell_item":{"string":"soccer goal"},"count":{"int":2},"modified":1554995020000}
null {"id":6,"name":{"string":"tennis racket"},"upsell_item":{"string":"tennis ball"},"count":{"int":10},"modified":1554995038000}
null {"id":7,"name":{"string":"frisbees"},"upsell_item":{"string":"frisbee goal"},"count":{"int":100},"modified":1554995048000}


As you can see, shopping_cart_objects already has the name of the item as the key. However, because mysql_tables_jdbc_inventory has null keys, we’ll need to rekey the data in that topic so that Kafka Streams knows which items to join. For the keys to match up, we’ll change mysql_tables_jdbc_inventory from null to the name of the item. We will also need to send the newly rekeyed data to a new topic in Kafka - inventory_rekeyed. Below is the chunk of code where we accomplish this:

The code above first pulls in the inventory from the mysql_tables_jdbc_inventory topic using a KStream. Then, the .map function is used to assign the new key and the data is sent to the inventory_rekeyed topic. Now, we should be ready to do the join on the two topics and enrich our detected objects in real time!
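
As a rough, library-free illustration of what a rekeying step does, the sketch below takes records with null keys and re-emits them keyed by item name. It is not the application's code and does not call the Kafka Streams API; RekeySketch, InventoryRow, and rekeyByName are hypothetical names, and the rows are taken from the sample inventory output shown earlier.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of rekeying: the value is kept, the null key is replaced.
// All names are hypothetical; this is not Kafka Streams code.
class RekeySketch {

    // Hypothetical stand-in for one row of the inventory table.
    static final class InventoryRow {
        final String name;
        final String upsellItem;
        final int count;

        InventoryRow(String name, String upsellItem, int count) {
            this.name = name;
            this.upsellItem = upsellItem;
            this.count = count;
        }
    }

    // Same idea as a key-changing map step: emit each value under a new key
    // derived from the value itself (here, the item name).
    static List<Map.Entry<String, InventoryRow>> rekeyByName(
            List<Map.Entry<String, InventoryRow>> records) {
        List<Map.Entry<String, InventoryRow>> rekeyed = new ArrayList<>();
        for (Map.Entry<String, InventoryRow> record : records) {
            InventoryRow row = record.getValue();
            rekeyed.add(new SimpleEntry<>(row.name, row));
        }
        return rekeyed;
    }

    public static void main(String[] args) {
        // Records as the JDBC source connector wrote them: null keys.
        List<Map.Entry<String, InventoryRow>> fromConnector = new ArrayList<>();
        fromConnector.add(new SimpleEntry<>(null, new InventoryRow("cup", "plate", 100)));
        fromConnector.add(new SimpleEntry<>(null, new InventoryRow("spoon", "fork", 10)));

        for (Map.Entry<String, InventoryRow> record : rekeyByName(fromConnector)) {
            System.out.println(record.getKey() + " -> upsell " + record.getValue().upsellItem);
        }
    }
}
```

After this step, every inventory record carries the same kind of key ("cup", "spoon", and so on) as the records in shopping_cart_objects, which is the precondition for the join.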

Joining Detected Objects and Inventory

Now that we have the keys matching between the two topics, it’s time to perform the join. This join will enrich the detected objects with data from inventory. There are three basic types of joins available in Kafka Streams:

  • KStream-KStream join
  • KTable-KTable join
  • KStream-KTable join

Because the objects in carts are independently detected and output to Kafka, a KStream fits best for shopping_cart_objects. On the other hand, inventory from mysql_tables_jdbc_inventory fits best in a KTable, as each record represents a row of the original table from MySQL. That means we’ll be utilizing the KStream-KTable join. Here are a few things to note about a KStream-KTable join:

  • They are non-windowed joins.
  • They allow for table lookups (data enrichment) by the KStream against the KTable.
  • They are triggered when a new record is received from the KStream.

Below is the code where the join occurs:

This code creates a KTable from the inventory_rekeyed topic, creates a KStream from the shopping_cart_objects topic, then performs the join. The join returns the name of the item in the cart, the item that should be paired with it (the upsell item), and the number of items. Because each cart item is a separate record in the KStream, the count for each will always be 1. This wouldn’t matter, except that we’ll be using those counts in the windowed aggregations in the next section. Read on!
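
The lookup semantics of a KStream-KTable join can be modeled in a few lines of plain Java. The sketch below is an illustration under stated assumptions, not the application's code: the table is reduced to a map from item name to upsell item, unmatched stream records are dropped (inner-join behavior), and the names StreamTableJoinSketch, EnrichedItem, and join are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java model of a stream-table join used for enrichment.
// All names are hypothetical; this is not Kafka Streams code.
class StreamTableJoinSketch {

    // Hypothetical joined value: detected item, its upsell item, and a count of 1.
    static final class EnrichedItem {
        final String item;
        final String upsell;
        final int count;

        EnrichedItem(String item, String upsell, int count) {
            this.item = item;
            this.upsell = upsell;
            this.count = count;
        }

        @Override
        public String toString() {
            return "item: " + item + " count: " + count + " upsell: " + upsell;
        }
    }

    static List<EnrichedItem> join(List<String> detectedItems, Map<String, String> upsellByItem) {
        List<EnrichedItem> joined = new ArrayList<>();
        for (String item : detectedItems) {          // the join fires once per stream record
            String upsell = upsellByItem.get(item);  // lookup against the table's current state
            if (upsell != null) {                    // inner join: no match, no output
                joined.add(new EnrichedItem(item, upsell, 1));
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> upsellByItem = Map.of("spoon", "fork", "cup", "plate");
        List<String> detectedItems = List.of("spoon", "cup", "spoon");
        join(detectedItems, upsellByItem).forEach(System.out::println);
    }
}
```

Note that the two spoon detections produce two separate joined records, each with a count of 1. Summing them is left to the aggregation step.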

Windowed Aggregations

At this point, our code has joined the data from shopping_cart_objects and mysql_tables_jdbc_inventory using a KStream and KTable. However, the result of that operation is another KStream. That means that the Big Cloud Dealz team would see a constant stream of individual items detected in carts (and enriched with data from inventory), rather than a sum of all items in all carts in the store. This can be fixed by converting the output KStream to a KTable, which can be used to show an ever-increasing aggregate of the items. That solves half the problem, but doesn’t ever dump items that are no longer in the carts. To accomplish this, we’ll utilize a Windowed KTable so that we only see aggregates for a specified time window. As this is a proof of concept, we’ll assume that an item is only “valid” in a cart for one minute. In other words, we want to see the totals of all items in all carts every 60 seconds. Let’s take a look at how we do it:

The code above:

  1. defines the duration of the time window.
  2. creates a Windowed KTable.
  3. performs a .groupByKey to (you guessed it) group the items by key.
  4. windows the KTable by the window length we defined earlier.
  5. performs a .reduce that simply sums the aggregate and the newly counted item.
  6. suppresses the output using .suppress so that we get one output per time window instead of every time an object is detected. The .suppress operator requires a defined “grace period” on the time window in order to know when the window is closed (which is why we added .grace to the time window earlier). For more information on the .suppress operator, read this.
  7. outputs the final results to System.out using a KStream (and some hacked-together string formatting to make it look nice).
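
The grouping, windowing, and summing in the steps above can be modeled without Kafka at all. The following plain-Java sketch is only an approximation of the idea: it assumes 60-second tumbling windows aligned to the epoch, computes the totals in one batch rather than incrementally, and does not model grace periods or late records. WindowedCountSketch, Detection, windowStart, and countsPerWindow are hypothetical names.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of a 60-second tumbling-window count.
// All names are hypothetical; this is not Kafka Streams code.
class WindowedCountSketch {

    static final long WINDOW_MS = 60_000L;

    static final class Detection {
        final long timestampMs;
        final String item;
        final int count;

        Detection(long timestampMs, String item, int count) {
            this.timestampMs = timestampMs;
            this.item = item;
            this.count = count;
        }
    }

    // Tumbling windows aligned to the epoch: each timestamp falls in exactly one window.
    static long windowStart(long timestampMs) {
        return timestampMs - (timestampMs % WINDOW_MS);
    }

    // Group by (window, item) and sum the counts. Returning only the finished
    // totals stands in for suppressing intermediate per-record updates.
    static Map<Long, Map<String, Integer>> countsPerWindow(List<Detection> detections) {
        Map<Long, Map<String, Integer>> totals = new TreeMap<>();
        for (Detection d : detections) {
            totals.computeIfAbsent(windowStart(d.timestampMs), start -> new TreeMap<>())
                  .merge(d.item, d.count, Integer::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        // One detection every 15 seconds, so four detections per full window.
        List<Detection> detections = List.of(
                new Detection(0L, "spoon", 1),
                new Detection(15_000L, "tennis racket", 1),
                new Detection(30_000L, "spoon", 1),
                new Detection(45_000L, "cup", 1),
                new Detection(60_000L, "bowl", 1),
                new Detection(75_000L, "spoon", 1));
        countsPerWindow(detections).forEach((start, counts) ->
                System.out.println("window starting at " + start + " ms: " + counts));
    }
}
```

In this example, the first window totals two spoons, one tennis racket, and one cup, while the second window starts counting again from zero with one bowl and one spoon. The application's own implementation lives in StreamingJoin_CartCountsAndInventoryTopics.java.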

To see the join and aggregation code in action, first run StreamingJoin_CartCountsAndInventoryTopics.java, then start ObjectDetectionProducer.java (or TestDetectionProducer.java) in a separate terminal.

# 1. Start join and aggregate code
mvn exec:java -Dexec.mainClass="com.pattersonconsultingtn.kafka.examples.tf_object_detection.StreamingJoin_CartCountsAndInventoryTopics"

# 2. Start producer code
mvn exec:java -Dexec.mainClass="com.pattersonconsultingtn.kafka.examples.test_producer.TestDetectionProducer"

Here is some sample output from our TestDetectionProducer.java that randomly outputs an item every 15 seconds:

09/20/2019 14:40:34 item: spoon count: 1 upsell: fork
09/20/2019 14:40:34 item: tennis racket count: 1 upsell: tennis ball
-------------------
09/20/2019 14:41:35 item: bowl count: 1 upsell: cup
09/20/2019 14:41:35 item: spoon count: 1 upsell: fork
09/20/2019 14:41:35 item: sportsball count: 1 upsell: soccer goal
09/20/2019 14:41:35 item: tennis racket count: 1 upsell: tennis ball
-------------------
09/20/2019 14:42:35 item: bowl count: 1 upsell: cup
09/20/2019 14:42:35 item: sportsball count: 1 upsell: soccer goal
09/20/2019 14:42:35 item: tennis racket count: 2 upsell: tennis ball
-------------------
09/20/2019 14:43:35 item: cup count: 2 upsell: plate
09/20/2019 14:43:35 item: frisbees count: 1 upsell: frisbee goal
09/20/2019 14:43:35 item: tennis racket count: 1 upsell: tennis ball
-------------------
09/20/2019 14:44:35 item: spoon count: 3 upsell: fork
09/20/2019 14:44:35 item: frisbees count: 1 upsell: frisbee goal


As you can see, every minute we get output that includes the items in carts, how many there are across all carts, and the upsell items that should be paired with those items. Because our TestDetectionProducer.java code outputs an item every 15 seconds, we see a total of 4 items in every 1-minute time window (with the exception of the first window, where the code was started halfway through the window). Because the windowing operation only includes items seen in that particular 1-minute time window, items from older time windows drop out of the results instead of accumulating into an ever-increasing aggregate. So, the end result is that we only see items from the defined time window each time a window closes. Sweet!

Resetting Streaming Applications

If you run the above stream applications and then try to rerun them later, they will not automatically reprocess the data. To reset the application state, run the following command:

./bin/kafka-streams-application-reset --application-id pct-cv-streaming-join-counts-inventory-app-3 --input-topics mysql_tables_jdbc_inventory,shopping_cart_objects,inventory_rekeyed

For more information on resetting streaming applications, check out these resources:
https://www.confluent.io/blog/data-reprocessing-with-kafka-streams-resetting-a-streams-application/
https://kafka.apache.org/20/documentation/streams/developer-guide/app-reset-tool.html

Putting it All Together for a New Retail Experience

In this post, we utilized Kafka Streams to join data from two Kafka topics and perform windowed aggregations on the results. This has been a fairly involved process to get the Big Cloud Dealz team the desired functionality in their Green Light Special application. While this application involves many pieces working together to function, Kafka and the Kafka Streams library enable the manipulation of real-time streaming data that would be rather difficult otherwise.

At this point we have put together the original concept laid out in the beginning of this series (Part 1), as reflected in the diagram above. The core components we built in this series are:

  1. A shopping cart (2.0) (Part 2) with an attached camera and wifi unit (likely an ARM-based embedded system) with an object detection model loaded to detect specific objects from the camera.
  2. A Kafka cluster (Part 3) back in the data center to collect all of the incoming data from the shopping carts, organizing it into logical topics for processing.
  3. A group of streaming applications leveraging Kafka's Streaming API (as detailed in this post) to give the retail store's team a real-time look at what items are in customers' baskets across the store.

With this prototype running, the Big Cloud Dealz team is able to view their brick and mortar shopping floor much the same way as an online retailer views the aggregate activity of their online users. It sets the store operations team up to dynamically recommend upsell items to shoppers strategically at the right time based on what is happening inside the shopping carts. The Big Cloud Dealz team is able to roll out the proof of concept Green Light Special system to one of their locations to begin live testing. Big Cloud Ron is exceptionally pleased as well:

For a further discussion on architecture ideas around Kafka and Confluent-based infrastructure, please reach out to the Patterson Consulting team or check out our Kafka Offerings page. We'd love to talk with your team about topics such as:

  • DevOps Managed Kafka Clusters
  • Kafka Architectural Consulting
  • Custom Kafka Streaming API Application Design
  • Hardware Design for Your Data Center
  • Computer Vision Model Construction and Deployment