Using Apache Avro with Apache Kafka
The Apache Kafka book sets forth serialization as a key part of building a robust data platform, as described in the excerpt below:
"A consistent data format is important in Kafka, as it allows writing and reading messages to be decoupled. When these tasks are tightly coupled, applications that subscribe to messages must be updated to handle the new data format, in parallel with the old format. Only then can the applications that publish the messages be updated to utilize the new format. By using well-defined schemas and storing them in a common repository, the messages in Kafka can be understood without coordination."
Narkhede, Shapira, and Palino, "Kafka: The Definitive Guide"
While Protobuf and raw JSON are popular for Kafka in some circles, many large enterprises prefer Avro as the serialization framework of choice for Apache Kafka, as outlined in Jay Kreps' blog article. Jay makes the argument for Avro on Confluent's platform (their distribution of Apache Kafka) based on the following reasons:
- It has a direct mapping to and from JSON.
- It has a very compact format; JSON's bulk, repeating every field name with every single record, is what makes JSON inefficient for high-volume usage.
- It is very fast.
- It has great bindings for a wide variety of programming languages, so you can generate Java objects that make working with event data easier, but it does not require code generation, so tools can be written generically for any data stream.
- It has a rich, extensible schema language defined in pure JSON.
- It has the best notion of compatibility for evolving your data over time. (Note: Avro schemas can be evolved when they abide by a set of compatibility rules.)
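To make those compatibility rules concrete, a backward-compatible change typically adds a new field with a default value. A hypothetical example schema (the record and field names here are illustrative, not from any particular system):

```json
{
  "type": "record",
  "name": "Customer",
  "namespace": "com.example",
  "fields": [
    {"name": "id", "type": "long", "doc": "Unique customer id"},
    {"name": "name", "type": "string", "doc": "Full name"},
    {"name": "email", "type": ["null", "string"], "default": null,
     "doc": "Added in a later version; the default lets readers decode old records"}
  ]
}
```

Because `email` declares a default, consumers using this newer schema can still read records written before the field existed.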
Schemas drive collaboration, consistency, robustness, and clarity in how systems interoperate. Avro gives us these properties and is thus well suited to the Apache Kafka platform.
So now that we've made the argument for using Avro for serialization on Kafka, we need to dig into the "how" of doing this. Avro has many subtleties, and saying "just use Avro" can prove daunting to new Kafka users. This article is meant to provide some notes on basic usage of Avro across producers, consumers, and streaming applications on Kafka.
The key aspects of Avro usage in Kafka we'll focus on for this article are:
- leveraging the Confluent Schema Registry
- tips for Avro schema design in Kafka
- Generic Avro records vs Specific Avro records
- best practices with the Avro records API for producers, consumers, and the streaming API
Let's start by taking a quick look at the role of the Schema Registry in building streaming platforms for enterprises.
Leveraging the Confluent Schema Registry
I felt a bit of deja vu when reading Gwen Shapira's story about building ETL pipelines for Data Warehouse Offloading while working on Hadoop projects (side note: we both worked in the field division at Cloudera previously). Schema-on-read (or really "whatever schema was embedded in the MapReduce code...") was the theme of the times (around 2010), and the world needed flexibility to handle the masses of heterogeneous types of data being offloaded into Hadoop. However, over time the world found that schema-on-read had some drawbacks and began to think about bringing order to the wild west of data pipelines. Enter the concept of the Schema Registry and schema management.
A nice aspect of the Schema Registry is that it watches messages passing through for new schemas and archives each new schema. Specifically, the class KafkaAvroSerializer does this, as explained in the Confluent documentation:
"When sending a message to a topic t, the Avro schema for the key and the value will be automatically registered in the Schema Registry under the subject t-key and t-value, respectively, if the compatibility test passes."
Serializer and Formatter
The schema is sent to the schema registry for retrieval later by other processes needing to decode the payload of a message using the same schema. The schema registry acts as a centralized authority on schema management for the Kafka platform.
We need the serializer used to produce messages to the Kafka cluster to match the deserializer that will be used when consuming messages (otherwise bad things happen and exceptions are thrown).
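In practice, both sides are wired up through configuration. Below is a minimal sketch of the producer side in Java (the broker address and registry URL are placeholder assumptions); a consumer would mirror this with KafkaAvroDeserializer:

```java
import java.util.Properties;

// Minimal producer configuration for Avro with the Confluent Schema Registry.
// The KafkaAvroSerializer registers schemas under the t-key / t-value subjects
// automatically when messages are sent to topic t.
public class AvroProducerConfig {
    public static Properties build(String bootstrapServers, String registryUrl) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", registryUrl);
        return props;
    }
}
```

These `Properties` would then be handed to a `KafkaProducer` as usual; the matching consumer simply swaps in the deserializer class names.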
The behind-the-scenes work of storing the schema in the registry and pulling it up when required is performed by the serializers and deserializers in Kafka (which is pretty handy). Manually keeping track of schemas across lots of concurrent streams on a well-worked Kafka cluster quickly becomes a challenge. Being able to leverage Avro in conjunction with the Confluent Schema Registry easily solves a lot of issues that most enterprises don't take the time to think about (side note: enterprise data platforms commonly overlook many small issues like these, which, ironically enough, can end up taking most of the engineering time).
So we've established a solid argument for not only using Avro on Kafka but also basing our schema management on the Confluent Schema Registry. Now let's take a look at design patterns for Avro schema design and then ways to encode messages with Avro for Kafka: Generic Records and Specific Records.
Tips for Avro Schema Design in Kafka
We'll briefly highlight a few notes from this article about best practices for Avro schema design relative to usage in Kafka:
- Use enumerated values whenever possible
- Require documentation for all fields
- Avoid non-trivial union types and recursive types
- Enforce reasonable schema and field naming conventions
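As a small hypothetical schema that applies these tips (an enumerated type for the state field, doc strings on every field, and conventional naming), the following is illustrative only:

```json
{
  "type": "record",
  "name": "OrderEvent",
  "namespace": "com.example.orders",
  "doc": "Emitted whenever an order changes state.",
  "fields": [
    {"name": "orderId", "type": "string", "doc": "Unique order identifier"},
    {"name": "status", "doc": "Current order state",
     "type": {"type": "enum", "name": "OrderStatus",
              "symbols": ["CREATED", "PAID", "SHIPPED", "CANCELLED"]}},
    {"name": "updatedAt", "type": "long", "doc": "Epoch millis of the state change"}
  ]
}
```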
Beyond defining the schema, we also need to consider which part of the Avro API we want to use.
General Notes on Generic Avro Records vs Specific Avro Records
The Avro SpecificRecord API requires us to know all of our schema fields in advance of compile time, unlike the Avro GenericRecord API. If we need more flexibility at runtime, we should likely consider the GenericRecord API instead.
The high-level comparison between the two Avro API paths:
- the generic record API is adaptable due to its flexibility at runtime
- the specific record API has the type safety and ease of use provided by the specific format
- both styles are orthogonal to schema evolution (i.e., the choice doesn't affect schema evolution)
- both styles are orthogonal to storage space, as the binary encoding is exactly the same
The specific record binding makes use of classes that are generated from your schema specifications with the Maven Avro Plugin. These generated classes allow us to manage the fields in our Avro schema using getter and setter methods in our Kafka application code, making programming feel a bit more familiar.
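For a sense of what that feels like, here is a simplified, hypothetical sketch of the kind of class the plugin emits for a two-field schema (real generated classes also extend `SpecificRecordBase` and carry the embedded schema; the `Customer` name and fields are assumptions for illustration):

```java
// Hypothetical, trimmed-down version of a Maven-Avro-Plugin-generated class.
// Real generated code includes the schema, builders, and serialization hooks.
public class Customer {
    private long id;
    private String name;

    public long getId() { return id; }
    public void setId(long id) { this.id = id; }
    public String getName() { return name; }
    public void setName(String name) { this.name = name; }
}
```

In application code we just call `customer.setName("...")`, and the compiler rejects any misspelled field name or mismatched type.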
The SpecificRecord API offers static compile-time type-safety checks, ensuring that correct field names and datatypes are used. We see the SpecificRecord API used for most RPC uses and for data applications that always use the same datatypes (i.e., schemas are known at compile time).
The Avro GenericRecord binding is a general-purpose binding which identifies fields to be read and written by supplying a simple string that names the field, as we can see in the example schema code shown below.
Generic record bindings provide the widest support for the Avro data types, which is helpful if your store has a constantly expanding set of schemas.
For data applications that accept dynamic datatypes, where the schema is not known until runtime, the Avro GenericRecord API is recommended. The advantage of this approach is that it allows us to use the binding in a generic manner, but it suffers from a lack of type safety at application compile time.
If we are early in the message design process and don't yet have a good idea of how settled the schema will be, we might want to start out with the generic record API.
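A minimal GenericRecord sketch follows (it assumes the Avro library is on the classpath; the schema and field names are illustrative). Note that fields are addressed by name strings, so a typo fails at runtime rather than compile time:

```java
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;

public class GenericRecordExample {
    // Builds a record from a schema that is only known at runtime.
    public static GenericRecord buildRecord() {
        String schemaJson = "{\"type\":\"record\",\"name\":\"Customer\","
            + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"},"
            + "{\"name\":\"name\",\"type\":\"string\"}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);

        GenericRecord record = new GenericData.Record(schema);
        record.put("id", 42L);        // field addressed by a name string
        record.put("name", "alice");  // a misspelled name would throw at runtime
        return record;
    }
}
```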
Best Practices with the Avro Records API for Producers, Consumers, and the Streaming API
A quick guide with examples of each API for the three types of Kafka processing nodes commonly seen used with Avro. We're listing this out explicitly here because it's not always clear which API to use and where you will need the .avsc file (or not).
Leveraging the SpecificRecord API with Kafka and the Schema Registry
Overview: with the SpecificRecord API you'll always provide the .avsc file at compile time to generate the Avro-specific message code to be compiled into the Kafka application.
- producers: for the SpecificRecord API we need the specific .avsc file for the schema so that the Maven Avro Plugin can auto-generate the code representing the class, which is then compiled in as part of the application. The class is referenced in the import list of the Kafka code and used like a normal Java class. [ example .avsc, example producer ]
- consumers: for SpecificRecord API usage with consumers we need a .avsc file for the Maven Avro Plugin to generate the Avro message class code. [ example avsc, example consumer ]
- streaming API: for SpecificRecord API usage with the streaming API we again need a .avsc file for the Maven Avro Plugin to generate the message class code. [ example avsc, example streaming code ]
Leveraging the GenericRecord API with Kafka and the Schema Registry
Overview: with the GenericRecord API it is a mixture of usage patterns with respect to how the schema is provided, as detailed below.
- producers: typically need the schema embedded as a string in the code to build the GenericRecord object. The Schema Registry may or may not already have a copy of the schema; for a new application it likely will not, so we'll need to provide the schema the first time. [ example producer ]
- consumers: a consumer of messages formatted with the Avro generic object schema takes the schema ID from each message it receives, queries the Schema Registry, pulls the schema, and updates the generic Avro object instance locally to parse the message bytes
- streaming API: for anything it reads via Avro, it can look up the schema in the Schema Registry, but for any intermediate or outbound messages it needs its own copy of the Avro schema (embedded in code, or stored locally somehow) to dynamically generate the messages [ example streaming code ]
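Putting the GenericRecord producer pattern together, here is a hedged end-to-end sketch (the topic name, URLs, and schema are assumptions; it requires kafka-clients, Avro, and the Confluent serializer on the classpath):

```java
import java.util.Properties;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class GenericAvroProducer {
    // Schema embedded in code, as described above for GenericRecord producers.
    static final String SCHEMA_JSON =
        "{\"type\":\"record\",\"name\":\"Customer\","
        + "\"fields\":[{\"name\":\"id\",\"type\":\"long\"},"
        + "{\"name\":\"name\",\"type\":\"string\"}]}";

    public static GenericRecord buildRecord() {
        Schema schema = new Schema.Parser().parse(SCHEMA_JSON);
        GenericRecord value = new GenericData.Record(schema);
        value.put("id", 1L);
        value.put("name", "alice");
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("schema.registry.url", "http://localhost:8081");

        // On first send, the serializer registers the schema under the
        // customer-events-value subject in the Schema Registry.
        try (KafkaProducer<String, GenericRecord> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("customer-events", "1", buildRecord()));
        }
    }
}
```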