Building the Next-Gen Retail Experience with Apache Kafka and Computer Vision

Part 3 of 4

Authors: Stuart Eudaly, Josh Patterson, Austin Harris

Date: Nov 5th, 2019

In part 2 of our series, we looked at how Big Cloud Dealz would detect items in their Shopping Cart 2.0 using TensorFlow. In this post, we’ll look at setting up Confluent’s distribution of Kafka, creating a MySQL database for our in-store items, and pulling in that data to Kafka using Kafka Connect.

Installing and Configuring Kafka

Now that we have a plan for generating the shopping cart data to send to Kafka, we need to install and configure Confluent's distribution of Kafka so that the data has somewhere to go. Once the Confluent files are downloaded, run the commands listed below in order to start and configure Kafka for this demo. The first three commands each start a long-running service, so give each one its own terminal window or tab.

# (1) Start Zookeeper
./bin/zookeeper-server-start ./etc/kafka/zookeeper.properties

# (2) Start Kafka
./bin/kafka-server-start ./etc/kafka/server.properties

# (3) Start the Schema Registry
./bin/schema-registry-start ./etc/schema-registry/schema-registry.properties

# (4) Create topic for incoming objects
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic shopping_cart_objects

# (5) Create other topics that will be used later
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysql_tables_jdbc_inventory
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic inventory_rekeyed
./bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic windowed_items

The previous commands start all the Kafka services we'll need and create the shopping_cart_objects topic, along with three topics we'll use later in the series. At this point, our code should be ready to run. To get the code and run it locally, you'll need both Git and Apache Maven installed. Download and prepare the code using the following commands:

git clone https://github.com/pattersonconsulting/BigCloudDealz_GreenLightSpecial.git
cd CartCamApp
mvn package

These commands pull the files from the repository and then build an uber JAR in the ./target subdirectory. You'll actually need to cd into each of the three subdirectories (CartCamApp, TestKafkaProducer, and KafkaProcessingApp) and run mvn package so that all three pieces of code we'll use are built. We'll need one additional file to run ObjectDetectionProducer: the pre-trained detection model, faster_rcnn_resnet101_coco_2018_01_28, which is available from the TensorFlow Object Detection API model zoo. To run the producer from Maven, use the following command, passing the location of the downloaded model and the address of the Schema Registry as arguments:

mvn exec:java -Dexec.mainClass="com.pattersonconsultingtn.kafka.examples.tf_object_detection.ObjectDetectionProducer" -Dexec.args="/Users/josh/Downloads/faster_rcnn_resnet101_coco_2018_01_28/saved_model/ http://localhost:8081"

Looking at the output from the above command, you’ll see something like this:

Found sports ball (score: 0.9834) Box: 0.0042404234, 0.42308074, 0.3421962, 0.72577053
Found sports ball (score: 0.9471) Box: 0.02893363, 0.6947326, 0.37699723, 0.99258703
Sending avro object detection data for: basket_test_22.jpg
Sending avro object detection data for: basket_test_22.jpg

Here we see the TensorFlow code detecting objects in the images included in the project's /resources subdirectory. Each detected object is sent to the Kafka cluster as an individual message on the shopping_cart_objects topic. You can check that the messages made it to the topic by executing the command below:

./bin/kafka-avro-console-consumer --topic shopping_cart_objects --from-beginning --property print.key=true --bootstrap-server localhost:9092

You should see console output similar to this:

"sportsball" {"timestamp":1568925073631,"image_name":"sportsball","class_id":1,"class_name":"sportsball","score":99.0,"box_x":1,"box_y":2,"box_w":3,"box_h":4} "spoon" {"timestamp":1568925134323,"image_name":"spoon","class_id":5,"class_name":"spoon","score":99.0,"box_x":1,"box_y":2,"box_w":3,"box_h":4} "cup" {"timestamp":1568925194456,"image_name":"cup","class_id":9,"class_name":"cup","score":99.0,"box_x":1,"box_y":2,"box_w":3,"box_h":4}

Setting up MySQL

So, we now have Kafka running and our code producing messages to a topic. Each of these messages contains information about an item in a customer's basket, which is a great start. However, neither the baskets nor Kafka have any notion of which item Big Cloud Dealz wants to pair with the items in customers' baskets. Luckily, Kafka Connect makes it easy to integrate a large number of external systems with Kafka. If we keep the inventory information in a Kafka topic, we eliminate the need to run a SQL query every time a new item is detected in a cart, which in turn avoids excessive table lookups and the scalability issues that come with them.

To demonstrate all of this, we'll create a MySQL database, add an inventory table and populate it, then use Kafka Connect to ingest that data into Kafka. The end goal is to have all the information we need inside Kafka so that we can do real-time data enrichment with Kafka Streams, but we'll save that for part 4. Start by installing MySQL and logging into it. Once you're at the mysql> prompt, use the following commands to create the inventory table and populate it with data:

mysql> CREATE DATABASE big_cloud_dealz;

mysql> USE big_cloud_dealz;

mysql> CREATE TABLE inventory (
 id serial NOT NULL PRIMARY KEY,
 name varchar(100),
 upsell_item varchar(200),
 count INT,
 modified timestamp default CURRENT_TIMESTAMP NOT NULL,
 INDEX `modified_index` (`modified`)
 );

mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('cup', 'plate', 100);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('bowl', 'cup', 10);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('fork', 'spoon', 200);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('spoon', 'fork', 10);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('sportsball', 'soccer goal', 2);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('tennis racket', 'tennis ball', 10);
mysql> INSERT INTO inventory (name, upsell_item, count) VALUES ('frisbees', 'frisbee goal', 100);

The above commands do 4 things:

  • Create a database in MySQL called big_cloud_dealz
  • Switch the current database to big_cloud_dealz in the MySQL command line tool
  • Create a table called inventory in the big_cloud_dealz database
  • Insert 7 records in the inventory table
Now let's move on to configuring Kafka Connect so we can ingest the inventory table into a topic in the Kafka cluster.

Configuring Kafka Connect

Kafka Connect helps create reliable, high-performance ETL pipelines into Kafka. Here, it will use a predefined connector to communicate with MySQL and produce an Avro message into Kafka for every record in the inventory table. Since our system is based on the Confluent platform, we already have Kafka Connect installed. Before we start it, though, we need two pieces of configuration: the Connect worker configuration, and the connector configuration that tells Connect where our database is located, what information to ingest, and how to connect to it. The worker configuration file is shown below.

# Bootstrap Kafka servers. If multiple servers are specified, they should be comma-separated.
bootstrap.servers=localhost:9092

# The converters specify the format of data in Kafka and how to translate it into Connect data.
# Every Connect user will need to configure these based on the format they want their data in
# when loaded from or stored into Kafka
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081

# The internal converter used for offsets and config data is configurable and must be specified,
# but most users will always want to use the built-in default. Offset and config data is never
# visible outside of Connect in this format.
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Local storage file for offset data
offset.storage.file.filename=/tmp/connect.offsets

plugin.path=/home/pattersonconsulting/confluent-5.1.2/share/java

This configuration file tells Connect where the Kafka cluster's bootstrap server is, to use Avro (via the Schema Registry) for message keys and values, and where the connector plugin JARs live. Now let's look at how we configure the MySQL connector. We'll use the stock JDBC connector that ships with the Confluent platform, which can connect to any relational database that supports JDBC, as described below:

"The JDBC connector allows you to import data from any relational database with a JDBC driver (such as MySQL, Oracle, or SQL Server) into Kafka. By using JDBC, this connector can support a wide variety of databases without requiring custom code for each one."

Confluent Blog Post: How to Build a Scalable ETL Pipeline with Kafka Connect

Here is the connector configuration file. The most complicated part is getting the connection.url string right: it encodes the host, port, database name, user, and password, along with a few MySQL driver options. The timestamp+incrementing mode tells the connector to use the modified timestamp column to detect new and updated rows and the id column to uniquely identify them, which is why we created the inventory table with both. Once the file is set up correctly, you're ready to use Kafka Connect to ingest MySQL data from the inventory table into the Kafka topic mysql_tables_jdbc_inventory.

name=mysql-jdbc
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://localhost:3306/big_cloud_dealz?user=root&password=1234&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC
mode=timestamp+incrementing
incrementing.column.name=id
timestamp.column.name=modified
topic.prefix=mysql_tables_jdbc_
table.whitelist=inventory

Running Kafka Connect to Ingest the Inventory Table from MySQL

Now that everything is set up, let's get Kafka Connect up and running. The command below runs Connect in standalone mode and points it to the Connect worker properties file and the MySQL connector properties file.

./bin/connect-standalone /home/pattersonconsulting/BigCloudDealz_GreenLightSpecial/CartCamApp/src/main/resources/kafka/connect/connect-avro-standalone.properties /home/pattersonconsulting/BigCloudDealz_GreenLightSpecial/CartCamApp/src/main/resources/kafka/connect/mysql_ingest.properties

This command will output logs to the terminal similar to this:

[2019-09-19 15:50:26,257] INFO WorkerSourceTask{id=mysql-jdbc-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
[2019-09-19 15:50:26,258] INFO WorkerSourceTask{id=mysql-jdbc-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2019-09-19 15:50:26,271] INFO WorkerSourceTask{id=mysql-jdbc-0} Finished commitOffsets successfully in 13 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:496)

If we launch another terminal window, we can query our inventory data in the mysql_tables_jdbc_inventory topic with the following command:

./bin/kafka-avro-console-consumer --topic mysql_tables_jdbc_inventory --property print.key=true --from-beginning --bootstrap-server localhost:9092

The output should look like what you see below. Kafka Connect successfully converted each row of the inventory table into an individual message in the mysql_tables_jdbc_inventory topic. You'll notice that the key for each message is null. This is something we'll need to address in part 4 of our series.

null {"id":1,"name":{"string":"cup"},"upsell_item":{"string":"plate"},"count":{"int":100},"modified":1554994983000}
null {"id":2,"name":{"string":"bowl"},"upsell_item":{"string":"cup"},"count":{"int":10},"modified":1554994994000}
null {"id":3,"name":{"string":"fork"},"upsell_item":{"string":"spoon"},"count":{"int":200},"modified":1554995002000}
null {"id":4,"name":{"string":"spoon"},"upsell_item":{"string":"fork"},"count":{"int":10},"modified":1554995012000}
null {"id":5,"name":{"string":"sportsball"},"upsell_item":{"string":"soccer goal"},"count":{"int":2},"modified":1554995020000}
null {"id":6,"name":{"string":"tennis racket"},"upsell_item":{"string":"tennis ball"},"count":{"int":10},"modified":1554995038000}
null {"id":7,"name":{"string":"frisbees"},"upsell_item":{"string":"frisbee goal"},"count":{"int":100},"modified":1554995048000}

A note on rerunning Kafka Connect

If, for any reason, you need to re-scan the inventory table in MySQL using Kafka Connect, you'll need to delete the connect.offsets file in the /tmp directory (for example, rm /tmp/connect.offsets). This file keeps track of what Kafka Connect has already sent to Kafka, so deleting it ensures that Kafka Connect will re-scan the entire table rather than only sending new data.

Summary

At this point, we have our system detecting objects and sending them to a Kafka topic, and our inventory table from MySQL being ingested into its own Kafka topic. In our final post in this series (part 4, coming soon), the Big Cloud Dealz team will join the cart items with the inventory and perform windowed aggregations in real time to create the "Green Light Special" application.

For a further discussion on architecture ideas around Kafka and Confluent-based infrastructure, please reach out to the Patterson Consulting team. We'd love to talk with your team about topics such as:
• DevOps Managed Kafka Clusters
• Kafka Architectural Consulting
• Custom Kafka Streaming API Application Design
• Hardware design for your data center