What's the BEST strategy to publish messages using Kafka?
Take the first step to use Apache Kafka in your project
Hello, this is the ProgressiveCoder Publication and today, we will discuss the BEST strategy to publish messages using Kafka.
Whatever our use case may be, we will always have a Producer that publishes messages to Kafka and a Consumer that subscribes to messages from Kafka.
In this particular post, we will focus on the message producing side of things. Our focus will be on a few important points:
How does the Kafka Producer work under the hood? Yes, we have illustrations to make things easy to understand.
How do we actually create a Kafka Producer and send records to Kafka? Yes, there will be actual working code that you can use in your project.
What is the BEST strategy for publishing messages using Kafka? Yes, there is a BEST strategy that works for the majority of cases.
If this is the first time you are hearing about Kafka, you might want to check out this earlier post that takes a high-level overview of Kafka.
Kafka Producer - What goes under the hood?
The first step in producing messages to Kafka is creating a ProducerRecord. The ProducerRecord must contain two mandatory items:
The name of the Kafka Topic we want to utilize for sending our message. Basically, topics help us in classifying the message.
The actual message that we wish to send.
There are a bunch of optional items as well that we can add to the ProducerRecord - a key, a partition, a timestamp, and a collection of headers. We won't delve too much into them right now.
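For instance, here is how the mandatory and optional pieces show up in the ProducerRecord constructors (the topic name topic-1 and the sample values are just placeholders for this sketch):

// Only the topic and the value are mandatory
ProducerRecord<String, String> minimal = new ProducerRecord<>("topic-1", "hello kafka");

// A key (here "msg") slots in between the topic and the value
ProducerRecord<String, String> keyed = new ProducerRecord<>("topic-1", "msg", "hello kafka");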
Anyways, what happens after we send the ProducerRecord?
Well, a number of things take place before our record even reaches the Kafka broker.
First, the Producer serializes the key and the message to byte arrays. This is done so that the data can be sent over the network.
If we didn’t explicitly specify a partition, the data is sent to a partitioner. The partitioner will automatically choose a partition for us. This is usually done based on the ProducerRecord’s key (if we have specified one).
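To get an intuition for what the partitioner does, here is a rough sketch of key-based partition selection. This is not Kafka's actual implementation (the real DefaultPartitioner uses a murmur2 hash, and a "sticky" strategy for records without a key); the hash below is just a stand-in to show the core idea that the same key always maps to the same partition:

// Rough sketch only - NOT Kafka's real partitioner code
static int choosePartition(byte[] serializedKey, int numPartitions) {
    int hash = java.util.Arrays.hashCode(serializedKey); // stand-in for murmur2
    return (hash & 0x7fffffff) % numPartitions;          // non-negative partition index
}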
Once a partition is selected, the Producer knows the topic and the partition the record will eventually go to. However, it does not immediately send out the record.
The record is added to a batch of records that will also be sent to the same topic and partition. A separate thread is responsible for sending this batch to the appropriate Kafka brokers.
Ultimately, at some point in time, the record makes its way to the broker.
The below illustration explains the entire flow in a pictorial manner.
When the broker receives the messages, it sends back a response to the Producer.
If the messages were successfully written to Kafka, the broker will return a RecordMetadata object. This object contains information such as the topic, partition and the offset of the record within the partition.
The broker, however, can also fail to write the message. In such a case, it will return an error.
When the Producer receives an error, it may retry sending the message a few more times before giving up and returning an error to the application.
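How persistent the producer is about retrying is configurable. A minimal sketch of the relevant producer settings (the values here are illustrative, not recommendations):

kafkaProps.put("retries", 3);                  // retry transient failures up to 3 times
kafkaProps.put("retry.backoff.ms", 100);       // wait 100 ms between retries
kafkaProps.put("delivery.timeout.ms", 120000); // overall cap on send + retries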
The offset is an integer value that continually increases as more messages are added to the Kafka broker. Basically, it is a piece of metadata that Kafka adds to each message as it is produced.
Each message in a given partition has a unique offset and the next message has a greater offset than the one before it.
By storing the next possible offset for each partition (in Kafka itself), a consumer can stop and restart message processing without losing track.
How to create a Kafka Producer?
Now that we have some idea of what the Kafka Producer actually does, we can create one programmatically.
Since we are using a Java Maven project for our demo, we need to add the kafka-clients dependency to the pom.xml file. This dependency pulls in the official Kafka client package that contains all the classes we need to create and use a Kafka Producer.
<dependencies>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.3.1</version>
    </dependency>
</dependencies>
Next, we can create a Producer instance as below:
Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092");
kafkaProps.put("key.serializer", StringSerializer.class);
kafkaProps.put("value.serializer", StringSerializer.class);
KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);
Basically, we created a Properties object and added three mandatory properties to it.
bootstrap.servers - This is a list of host:port pairs of brokers that the producer can use to create the initial connection to Kafka. It is good to provide at least two brokers (though we are playing a bit brave in the above snippet by having only localhost:9092). Even if one goes down, the producer can still connect to the cluster.
key.serializer - This is the name of the class that will be used to serialize the keys of the records. Though Kafka brokers expect byte arrays as keys, the producer interface allows us to use parameterized types. Therefore, we can use any valid Java object as a key. The producer will use this class to serialize the key object to a byte array.
value.serializer - As evident, this is the name of the class that will be used to serialize the message value of the records. Basically, this is the same as key.serializer except it is applicable to the message or value part of the record.
Once the Properties object is ready, we use it while instantiating the KafkaProducer. The <String, String> denotes the data types of the key and value respectively.
Some more important points about the Kafka Producer:
The key.serializer and value.serializer should be set to the name of a class that implements the org.apache.kafka.common.serialization.Serializer interface. The special class StringSerializer implements this particular interface.
We may not be interested in setting a key for our record. However, we still need to set a value for the key.serializer - the special VoidSerializer class exists for exactly this case (see the sketch after this list).
Our Kafka cluster might have several brokers. However, we don't need to include the entire list as part of the bootstrap.servers. Just one or two are enough. The producer will get more information about the other brokers after the initial connection to the cluster.
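As an example, if our records will never carry a key, the producer configuration might look like this. This is a minimal sketch: it assumes a second broker running on localhost:9093, and uses VoidSerializer from org.apache.kafka.common.serialization for the key along with the matching <Void, String> type parameters.

Properties kafkaProps = new Properties();
kafkaProps.put("bootstrap.servers", "localhost:9092,localhost:9093"); // two brokers, as suggested above
kafkaProps.put("key.serializer", VoidSerializer.class);   // records will have no key
kafkaProps.put("value.serializer", StringSerializer.class);

KafkaProducer<Void, String> producer = new KafkaProducer<>(kafkaProps);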
Strategies to send messages using Kafka Producer
With our Producer configured, we can actually send messages to the Kafka broker. As mentioned earlier, there are a few strategies we can use while sending messages:
Fire and Forget
Synchronous Send
Asynchronous Send
Let us look at each of them one-by-one.
Fire and Forget Strategy
As the name suggests, we send a message to the Kafka broker and forget about it. We don’t care if the message successfully reaches the broker or not.
Since Kafka is highly available and the Producer also retries sending the messages, chances are that the message will arrive successfully.
However, if there is a genuine error that cannot be retried or there is a timeout, messages will get lost. Moreover, the application will not get any information or exceptions about this failure.
The below code snippet uses the fire-and-forget approach.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaDemo {

    public static void main(String[] args) {
        // Configure the producer with the three mandatory properties
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", StringSerializer.class);
        kafkaProps.put("value.serializer", StringSerializer.class);

        KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

        // Topic, key, and value
        ProducerRecord<String, String> record = new ProducerRecord<>("topic-1", "msg", "kafka trial message");

        try {
            // Fire and forget: we don't wait for or inspect the broker's response
            producer.send(record);
        } catch (Exception e) {
            e.printStackTrace();
        }

        // Push out any batched records before the application exits
        producer.flush();
        producer.close();
    }
}
The first segment of the code deals with configuring the KafkaProducer instance the same way we saw in the previous section.
Once the instance is configured, we create a ProducerRecord object. The constructor of this class takes a few input parameters:
The name of the topic (in this case topic-1)
The key of the message ("msg")
Lastly, the actual message value, which is a very imaginative "kafka trial message".
Next, we send the message to the Kafka broker using the producer.send() method. The call to this method is wrapped within a try-catch block to handle exceptions that might arise. At the very end, we flush the producer by calling the flush() method and then close() it to release its resources.
If you recall, the Kafka Producer does not actually send the message immediately but places it in a batch. A separate thread ships the batch after some time. However, the above application terminates after calling producer.send(), even before that separate thread can send the message. The flush() method basically flushes the accumulated records in the producer with immediate effect.
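Incidentally, the batching behavior itself is configurable. A minimal sketch of the two most relevant producer settings (16384 bytes happens to be the default batch size; the linger value is illustrative):

kafkaProps.put("batch.size", 16384); // max bytes per partition batch
kafkaProps.put("linger.ms", 5);      // wait up to 5 ms for a batch to fill before sending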
Synchronous Send Strategy
On a purely technical level, the Kafka Producer is always asynchronous.
However, we can force it to behave in a synchronous manner. Check out the below piece of code.
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;

public class KafkaDemo {

    public static void main(String[] args) {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", StringSerializer.class);
        kafkaProps.put("value.serializer", StringSerializer.class);

        KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

        ProducerRecord<String, String> record = new ProducerRecord<>("topic-1", "msg", "new producer");

        try {
            // get() blocks until the broker responds, making the send synchronous
            RecordMetadata metadata = producer.send(record).get();
            System.out.println("Record stored at offset " + metadata.offset());
        } catch (Exception e) {
            e.printStackTrace();
        }

        producer.close();
    }
}
What’s the difference here?
Nothing much! We have turned the send method into a synchronous operation by using producer.send(record).get(). Basically, the send() method returns a Future object. By using the get() method, we wait on the Future object to find out whether the call to send() was successful or not.
The method will throw an exception if the record is not sent successfully to Kafka. If there were no errors, we get a RecordMetadata object that contains information such as the offset of the message.
Notice that here we don't use flush(). There is no need to flush the producer. Since this is a synchronous approach, the producer will move forward only after receiving the response from Kafka.
You must have already noticed the issue with this approach.
With synchronous send, we are essentially making a trade-off on performance.
Brokers in a typical Kafka cluster may take some time to respond to message produce requests. With synchronous messages, the sending thread of our application will spend this time waiting and doing nothing else. This can dramatically reduce the performance of our application.
Asynchronous Send Strategy
This is the third approach for sending messages to Kafka.
Basically, here we call the send() method of the Producer with a callback function. The callback gets triggered when the producer receives a response from the Kafka broker.
Check out the below code:
package com.progressivecoder.kafkaexample;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import java.util.Properties;

@SpringBootApplication
public class KafkaExampleApplication implements CommandLineRunner {

    public static void main(String[] args) {
        SpringApplication.run(KafkaExampleApplication.class, args);
    }

    @Override
    public void run(String... args) throws Exception {
        Properties kafkaProps = new Properties();
        kafkaProps.put("bootstrap.servers", "localhost:9092");
        kafkaProps.put("key.serializer", StringSerializer.class);
        kafkaProps.put("value.serializer", StringSerializer.class);

        KafkaProducer<String, String> producer = new KafkaProducer<>(kafkaProps);

        ProducerRecord<String, String> record = new ProducerRecord<>("topic-1", "msg", "new kafka async");

        try {
            // Asynchronous send: the callback is invoked when the broker responds
            producer.send(record, new DemoProducerCallback());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

class DemoProducerCallback implements Callback {

    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        // A non-null exception means the send ultimately failed
        if (e != null) {
            e.printStackTrace();
        }
    }
}
Here, we are using a slightly different setup - a Java Spring Boot application.
With Spring Boot, our application will continue running even after we send the message to Kafka. This way we can easily configure the callback.
To use callbacks, we need to declare a class that implements the Callback interface provided by the org.apache.kafka.clients.producer package. This interface has a single function onCompletion() that we have to implement (a lambda version is sketched after this list).
If there is an error from Kafka, the onCompletion() callback will receive a non-null exception. We can handle the same by printing the stack trace.
In case of success, we get the RecordMetadata that contains the topic, partition and offset of the message.
Once the callback class is defined, we pass an object of this class while calling the producer.send() method.
The entire KafkaProducer configuration setup and sending of the message is within the run() method that comes from the CommandLineRunner interface provided by Spring Boot.
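Since Callback has just the one method, we could also pass it inline as a lambda instead of declaring a separate class. A minimal sketch:

producer.send(record, (metadata, exception) -> {
    if (exception != null) {
        exception.printStackTrace(); // failure: log it, or divert the record elsewhere
    } else {
        System.out.println("Stored at offset " + metadata.offset());
    }
});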
The real advantage of the callback approach is seen when we fail to send a message.
With the callback in place, we can take some action in case of message failure such as throwing an exception, logging an error or writing the failed message to some other place for further analysis.
NOTE OF CAUTION
The callback functions execute in the producer's I/O thread (the background sender thread), not in our application's main thread.
Therefore, we should make sure that the callback is reasonably fast and that there are no blocking operations within it, as that can slow down the producer.
We are done with our first step in using the Kafka Producer. We also looked at the three different strategies for sending messages using Kafka.
So what’s the best strategy?
As with most things in software engineering, it depends on the use-case. However, while all three strategies have their benefits, the asynchronous send approach is probably the best strategy for a typical production application.
It allows us to send messages with high enough performance while also managing error situations and exceptions. Basically, you get the best of both worlds while using the asynchronous approach.
In case you don’t care about what happens to your messages after sending, fire-and-forget is the easiest choice.
Thanks for reading this post! Please do share your views and suggestions in the comments section below. It would help improve the publication.
If you found this post useful, please consider sharing it with your friends and colleagues.
Also, follow me on Twitter for the latest updates and behind-the-scenes information about upcoming topics.