Notes on Using Apache Avro with Apache Kafka

Author: Josh Patterson

Date: July 16th, 2018

In this blog article we give a list of tips and tricks on the basics of using Apache Avro with Apache Kafka. Topics Include:

  • What is Apache Avro?
  • How does Avro work within Kafka?
  • Best practices for Avro usage with Kafka Producers, Consumers, and Streaming API

What is Avro and Why Do I Care?

The Apache Avro website describes Avro as a data serialization system providing rich data structures in a compact binary data format (and more). Data in Avro is described by a language-independent schema (usually written in JSON), and the data itself is generally serialized in a binary format. Many times the schema is embedded in the data files themselves. The Apache Kafka book goes on to state:

"Apache Avro, which is a serialization framework originally developed for Hadoop. Avro provides a compact serialization format; schemas that are separate from the message payloads and that do not require code to be generated when they change; and strong data typing and schema evolution, with both backward and forward compatibility."

Narkhede, Shapira, and Palino, "Kafka: The Definitive Guide"

We can see an example of an Avro schema below:
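
The following is a minimal sketch of such a schema; the record name, namespace, and fields are illustrative assumptions, not tied to any particular application:

    {
      "type": "record",
      "namespace": "com.example.avro",
      "name": "Customer",
      "fields": [
        { "name": "id",    "type": "int" },
        { "name": "name",  "type": "string" },
        { "name": "email", "type": ["null", "string"], "default": null }
      ]
    }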

We can see from the Avro example above that there are four top-level attributes (beyond the individual field definitions inside the "fields" entry):

  1. type - identifies the schema type; for Avro record schemas like this one, the top-level type is 'record'
  2. namespace - qualifies the name, much like a Java package, uniquely identifying this schema for your organization
  3. name - combines with the namespace to form the fully-qualified name that identifies the schema in the global pool of Avro schemas
  4. fields - the actual schema definition; defines which fields appear in the value payload of the message and their corresponding data types
Note that schema field names must begin with [A-Za-z_], and subsequently contain only [A-Za-z0-9_].

The home website for Avro offers:

  • maven dependency information for including Avro in projects
  • the Avro Maven plugin for performing code generation
  • examples of schemas
  • instructions on how to perform manual code generation for Avro
While all of these tools are interesting in the abstract, what we really want to focus on in this article is how we apply Avro in the context of Apache Kafka streaming applications.

Using Apache Avro with Apache Kafka

The Apache Kafka book sets forth serialization as a key part of building a robust data platform as described in the excerpt below:

"A consistent data format is important in Kafka, as it allows writing and reading messages to be decoupled. When these tasks are tightly coupled, applications that subscribe to messages must be updated to handle the new data format, in parallel with the old format. Only then can the applications that publish the messages be updated to utilize the new format. By using well-defined schemas and storing them in a common repository, the messages in Kafka can be understood without coordination."

Narkhede, Shapira, and Palino, "Kafka: The Definitive Guide"

While Protobuf and raw JSON are considered popular for Kafka in some circles, many large enterprises prefer Avro as the serialization framework of choice for Apache Kafka, as outlined in Jay Kreps' blog article. Jay makes the argument for Avro on Confluent's platform (their distribution of Apache Kafka) based on the following reasons:
  • It has a direct mapping to and from JSON
  • It has a very compact format. The bulk of JSON, repeating every field name with every single record, is what makes JSON inefficient for high-volume usage.
  • It is very fast.
  • It has great bindings for a wide variety of programming languages so you can generate Java objects that make working with event data easier, but it does not require code generation so tools can be written generically for any data stream.
  • It has a rich, extensible schema language defined in pure JSON
  • It has the best notion of compatibility for evolving your data over time. (note: Avro schemas can be evolved when they abide by a set of compatibility rules)
Schemas drive collaboration, consistency, robustness, and clarity in how to interoperate on a system. Avro gives us these properties and thus is more than suitable for the Apache Kafka platform.

So now that we've made the argument for using Avro for serialization on Kafka, we need to dig into the "how" part of doing this. Avro has many subtleties to it, and saying "just use Avro" can prove daunting to new Kafka users. This article is meant to provide some notes on basic usage of Avro across producers, consumers, and streaming applications on Kafka. The key aspects of Avro usage in Kafka we'll focus on for this article are:

  • leveraging the Confluent Schema Registry for schema management
  • tips for Avro schema design in Kafka
  • choosing between the GenericRecord and SpecificRecord APIs
  • applying each API across producers, consumers, and the streaming API

Let's start off by taking a quick look at the role of the Schema Registry in building streaming platforms for enterprises.

Leveraging the Confluent Schema Registry

I felt a bit of deja vu when reading Gwen Shapira's story about building ETL pipelines for Data Warehouse Offloading while working on Hadoop projects (side note: we both worked in the field division at Cloudera previously). Schema-on-read (or really "whatever schema was embedded in the MapReduce code...") was the theme of the times (around 2010), and the world needed flexibility to handle the masses of heterogeneous types of data being offloaded into Hadoop. However, over time the world found that schema-on-read had some drawbacks and began to think about bringing order to the wild west of data pipelines. Enter the concept of the Schema Registry and schema management.

One of the nice aspects of the Schema Registry is that it watches the messages passing through for new schemas and archives each new schema it sees. Specifically, the KafkaAvroSerializer class does this, as explained in the Confluent documentation:

"When sending a message to a topic t, the Avro schema for the key and the value will be automatically registered in the Schema Registry under the subject t-key and t-value, respectively, if the compatibility test passes."

Confluent documentation, "Serializer and Formatter"

The schema is sent to the schema registry for retrieval later by other processes that need to decode the payload of a message using the same schema. The schema registry acts as a centralized authority on schema management for the Kafka platform.

We need the serializer used to produce messages to the Kafka cluster to match the deserializer that will be used when consuming messages (otherwise bad things happen and exceptions are thrown). The behind-the-scenes details of storing the schema in the registry and pulling it up when required are handled by the serializers and deserializers in Kafka (which is pretty handy). Manually keeping track of schemas across lots of concurrent streams on a busy Kafka cluster quickly becomes a challenge. Leveraging Avro in conjunction with the Confluent Schema Registry easily solves a class of issues that most enterprises don't take much time to think about (side note: enterprise data platforms commonly overlook many small issues like these, which, ironically enough, can end up taking most of the engineering time).
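
As a minimal sketch of what that pairing looks like in configuration (the broker address, registry URL, and group id are illustrative assumptions), the producer and consumer sides might be wired up like this:

    import java.util.Properties;

    public class MatchingSerdeConfigSketch {
        public static void main(String[] args) {
            // Producer side: Avro serializers that register/look up schemas in the registry.
            Properties producerProps = new Properties();
            producerProps.put("bootstrap.servers", "localhost:9092");          // illustrative address
            producerProps.put("key.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
            producerProps.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
            producerProps.put("schema.registry.url", "http://localhost:8081"); // illustrative address

            // Consumer side: the matching Avro deserializers, pointed at the same registry.
            Properties consumerProps = new Properties();
            consumerProps.put("bootstrap.servers", "localhost:9092");
            consumerProps.put("group.id", "avro-notes-example");               // illustrative group id
            consumerProps.put("key.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            consumerProps.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
            consumerProps.put("schema.registry.url", "http://localhost:8081");
            // For SpecificRecord consumers, "specific.avro.reader" can be set to true so the
            // deserializer returns generated classes instead of GenericRecord instances.
            consumerProps.put("specific.avro.reader", "true");
        }
    }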

So we've established a solid argument for not only using Avro on Kafka but also basing our schema management on the Confluent Schema Registry. Now let's take a look at design patterns for Avro schema design and then ways to encode messages with Avro for Kafka: Generic Records and Specific Records.

Tips for Avro Schema Design in Kafka

We'll briefly highlight a few notes from this article about best practices for Avro schema design relative to usage in Kafka (see the schema sketch after the list):

  • Use enumerated values whenever possible
  • Require documentation for all fields
  • Avoid non-trivial union types and recursive types
  • Enforce reasonable schema and field naming conventions
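
As a small sketch of several of those conventions in an .avsc file (the record and field names here are purely illustrative), note the enumerated status type and the doc attributes on the record and on each field:

    {
      "type": "record",
      "namespace": "com.example.avro",
      "name": "OrderEvent",
      "doc": "Illustrative event describing a single customer order.",
      "fields": [
        { "name": "order_id", "type": "long",
          "doc": "Unique identifier for the order." },
        { "name": "status",
          "type": { "type": "enum", "name": "OrderStatus",
                    "symbols": ["CREATED", "SHIPPED", "DELIVERED"] },
          "doc": "Order state, constrained to an enumerated set of values." }
      ]
    }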
Beyond defining the schema, we also need to consider which part of the Avro API we want to use.

General Notes on Generic Avro Records vs Specific Avro Records

Unlike the Avro GenericRecord API, the Avro SpecificRecord API requires us to know at compile time what all of our schema fields will be. If we need more flexibility at runtime, then we should likely consider the GenericRecord API instead. The high-level comparison between the two Avro API paths:

  • generic record api is adaptable due to flexibility at runtime
  • specific record api has type safety and ease-of-use provided by specific format
  • both styles are orthogonal to schema evolution (e.g., "it doesn't affect schema evolution")
  • both styles are orthogonal to storage space as the binary encoding is exactly the same
  • if we don't have a good idea of how settled the schema will be, we might want to start out with the generic record API

Avro SpecificRecords

SpecificRecord Avro bindings make use of classes that are generated from your schema specifications with the Maven Avro Plugin. These generated classes allow us to manage the fields in our Avro schema using getter and setter methods in our Kafka application code, making programming feel a bit more familiar. The SpecificRecord API offers static compile-time type safety checks and helps ensure correct field names and datatypes. We see the SpecificRecord API used for most RPC uses and for data applications that always use the same datatypes (e.g., "schemas are known at compile time").
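
A minimal sketch of that getter/setter style, assuming a Customer class generated by the Maven Avro Plugin from the illustrative schema shown earlier in this article:

    import com.example.avro.Customer;  // hypothetical class generated from the illustrative
                                       // Customer schema shown earlier

    public class SpecificRecordSketch {
        public static void main(String[] args) {
            // Field access goes through generated, type-checked setters and getters.
            Customer customer = new Customer();
            customer.setId(42);
            customer.setName("Jane Doe");
            customer.setEmail("jane@example.com");

            // A misspelled field name or a wrong data type here fails at compile time,
            // rather than surfacing as a runtime error.
            System.out.println(customer.getName() + " <" + customer.getEmail() + ">");
        }
    }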

Avro GenericRecords

The Avro GenericRecord binding is a general-purpose binding which identifies fields to be read and written by supplying a simple string that names the field, as can be seen in the example code shown below. Generic record bindings provide the widest support for the Avro data types, which is helpful if your store has a constantly expanding set of schemas. For data applications that accept dynamic datatypes where the schema is not known until runtime, the Avro GenericRecord API is recommended. This approach allows us to use the binding in a generic manner, but it suffers from a lack of type safety at application compile time. If we are early on in the message design process and we don't have a good idea of how settled the schema will be, we might want to start out with the generic record API.
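
A minimal sketch of that string-keyed field access, again using an illustrative schema (here trimmed to two fields and inlined as a string for brevity):

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;

    public class GenericRecordSketch {
        public static void main(String[] args) {
            // Parse the schema at runtime; it could also be loaded from a file or
            // fetched from the Schema Registry.
            String schemaString = "{\"type\":\"record\",\"namespace\":\"com.example.avro\","
                + "\"name\":\"Customer\",\"fields\":["
                + "{\"name\":\"id\",\"type\":\"int\"},"
                + "{\"name\":\"name\",\"type\":\"string\"}]}";
            Schema schema = new Schema.Parser().parse(schemaString);

            // Fields are addressed by name as plain strings -- flexible, but a typo in
            // a field name only shows up at runtime.
            GenericRecord customer = new GenericData.Record(schema);
            customer.put("id", 42);
            customer.put("name", "Jane Doe");

            System.out.println(customer.get("name"));
        }
    }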

Best Practices with the Avro Records API for Producers, Consumers, and the Streaming API

Below is a quick guide, with examples of each API, for the three types of Kafka processing nodes commonly used with Avro. We're listing this out explicitly here because it's not always clear which API to use and where you will (or won't) need the .avsc file.

Leveraging the SpecificRecord API with Kafka and the Schema Registry

Overview: with the SpecificRecord API you'll always provide the .avsc file at compile time to generate the Avro specific message code to be compiled into the Kafka application.
  • producers: for the SpecificRecord API we need the .avsc file for the schema so that the Maven Avro Plugin can auto-generate the code for the message class, which is then compiled in as part of the application. The class is referenced in the import list of the Kafka code and is used like a normal Java class. [ example .avsc, example producer ]
  • consumers: for SpecificRecord API usage with consumers we again need the .avsc file so the Maven Avro Plugin can generate the Avro message class code. [ example avsc, example consumer ]
  • streaming API: for SpecificRecord API usage with the streaming API we again need a .avsc file for the Maven Avro Plugin to generate the message class code. [ example avsc, example streaming code ]

Leveraging the GenericRecord API with Kafka and the Schema Registry

Overview: with the GenericRecord API there is a mixture of usage patterns with respect to how the schema is provided, as detailed below.
  • producers: typically need the schema embedded as a string in the code to initialize the GenericRecord object. The schema registry may or may not already have a copy of the schema, but for a new application it likely will not, so we'll need to provide the schema the first time. [ example producer ]
  • consumers: when only consuming messages formatted as generic Avro records, the consumer takes the schema ID from each message it gets, queries the schema registry, pulls the schema, and uses it locally to parse the message bytes into a generic Avro object
  • streaming API: for anything it reads via Avro, it can look up the schema in the schema registry, but for any intermediate or outbound messages it needs its own copy of the Avro schema (embedded in code, or available locally somehow) to dynamically generate the messages [ example streaming code ]

Illustrated Example: Kafka Producer Example Using SpecificRecord API

In this example we see a basic producer that is using the SpecificRecord API and the Maven Avro Plugin to generate the Avro message class at compile time from the included .avsc file shown below:
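
A sketch of what the LogLine schema could look like; only the namespace and record name come from the producer discussion below, and the specific fields are illustrative assumptions:

    {
      "type": "record",
      "namespace": "JavaSessionize.avro",
      "name": "LogLine",
      "fields": [
        { "name": "ip",        "type": "string" },
        { "name": "timestamp", "type": "long"   },
        { "name": "url",       "type": "string" },
        { "name": "referrer",  "type": "string" },
        { "name": "useragent", "type": "string" }
      ]
    }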

In the code shown after the notes below, we see the producer code reading log lines from a file on disk and producing Kafka messages to a Kafka cluster using the class (JavaSessionize.avro.LogLine) generated at compile time from the .avsc file shown above. A few lines of note in the example code:

  • key and value serializers are both set to io.confluent.kafka.serializers.KafkaAvroSerializer, which knows how to interop with the Confluent Schema Registry
  • schema.registry.url - gives the address of where the Confluent Schema Registry is running
  • Producer<String, LogLine> producer = new KafkaProducer<>(props) --- shows the generated LogLine class in use as the value type at compile time
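
A minimal sketch of such a producer, assuming the generated JavaSessionize.avro.LogLine class from the schema sketch above, a local broker and Schema Registry, and an illustrative input file, line format, and topic name:

    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.util.Properties;

    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    import JavaSessionize.avro.LogLine;  // generated at compile time by the Maven Avro Plugin

    public class SpecificRecordProducerSketch {

        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");            // illustrative address
            props.put("key.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://localhost:8081");   // illustrative address

            // The generated LogLine class is used directly as the record value type.
            Producer<String, LogLine> producer = new KafkaProducer<>(props);

            try (BufferedReader reader = new BufferedReader(new FileReader("access.log"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    // Illustrative parsing: assume "ip url referrer useragent" per line.
                    String[] parts = line.split(" ");
                    LogLine event = new LogLine();
                    event.setIp(parts[0]);
                    event.setTimestamp(System.currentTimeMillis());
                    event.setUrl(parts[1]);
                    event.setReferrer(parts[2]);
                    event.setUseragent(parts[3]);

                    // On the first send, the serializer registers the LogLine schema under the
                    // subject "weblogs-value" in the Schema Registry (per the quote earlier).
                    producer.send(new ProducerRecord<>("weblogs", event.getIp().toString(), event));
                }
            } finally {
                producer.close();
            }
        }
    }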

Illustrated Example: Kafka Producer Example Using GenericRecord API

Now let's take a look at using generic Avro objects rather than generated Avro objects in the context of a Kafka producer example.

  1. we still use KafkaAvroSerializer
  2. we also still provide the uri of schema registry
  3. we now need to provide the Avro schema at runtime, since it is not provided by an Avro-generated object. In the example below we see the schema text in the code itself.
  4. The Avro message object type is now GenericRecord, and we have to manually initialize it with the text schema we include in the code
  5. ProducerRecord will use a GenericRecord that contains the combination of our custom schema and data values for the fields
We can see this in action in the example below, where a Kafka producer is set up to use the GenericRecord API:
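
A minimal sketch, reusing an illustrative customer schema embedded as a string in the code (the topic name, addresses, and field values are assumptions):

    import java.util.Properties;

    import org.apache.avro.Schema;
    import org.apache.avro.generic.GenericData;
    import org.apache.avro.generic.GenericRecord;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.Producer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class GenericRecordProducerSketch {

        // The schema is provided at runtime as a string embedded in the code.
        private static final String CUSTOMER_SCHEMA =
              "{"
            + " \"type\": \"record\","
            + " \"namespace\": \"com.example.avro\","
            + " \"name\": \"Customer\","
            + " \"fields\": ["
            + "   { \"name\": \"id\",   \"type\": \"int\"    },"
            + "   { \"name\": \"name\", \"type\": \"string\" }"
            + " ]"
            + "}";

        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");            // illustrative address
            props.put("key.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");   // still the Avro serializer
            props.put("value.serializer",
                "io.confluent.kafka.serializers.KafkaAvroSerializer");
            props.put("schema.registry.url", "http://localhost:8081");   // still the registry URL

            Schema schema = new Schema.Parser().parse(CUSTOMER_SCHEMA);

            try (Producer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
                // The GenericRecord is initialized from the parsed schema and filled in by
                // field name; the ProducerRecord carries the schema plus the field values.
                GenericRecord customer = new GenericData.Record(schema);
                customer.put("id", 42);
                customer.put("name", "Jane Doe");

                producer.send(new ProducerRecord<>("customers", "42", customer));
            }
        }
    }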

Summary

In this article we started out introducing the basic concept of Avro, made the case for using Avro with the Confluent Schema Registry as a best practice for Kafka, and then offered some notes on SpecificRecord vs GenericRecord API usage. We hope this information helps as an introduction to these concepts for any new Kafka practitioners out there.