You're viewing documentation for an older version of Kafka - check out our current documentation here.

Documentation

Kafka 0.9.0

Prior releases: 0.7.x, 0.8.0, 0.8.1.X, 0.8.2.X.

1. Getting Started

1.1 Introduction

Kafka® is a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system, but with a unique design.

What does all that mean?

First let's review some basic messaging terminology:

  • Kafka maintains feeds of messages in categories called topics.
  • We'll call processes that publish messages to a Kafka topic producers.
  • We'll call processes that subscribe to topics and process the feed of published messages consumers..
  • Kafka is run as a cluster comprised of one or more servers each of which is called a broker.
So, at a high level, producers send messages over the network to the Kafka cluster which in turn serves them up to consumers like this:
Communication between the clients and the servers is done with a simple, high-performance, language agnostic TCP protocol. We provide a Java client for Kafka, but clients are available in many languages.

Topics and Logs

Let's first dive into the high-level abstraction Kafka provides—the topic.

A topic is a category or feed name to which messages are published. For each topic, the Kafka cluster maintains a partitioned log that looks like this:

Each partition is an ordered, immutable sequence of messages that is continually appended to—a commit log. The messages in the partitions are each assigned a sequential id number called the offset that uniquely identifies each message within the partition.

The Kafka cluster retains all published messages—whether or not they have been consumed—for a configurable period of time. For example if the log retention is set to two days, then for the two days after a message is published it is available for consumption, after which it will be discarded to free up space. Kafka's performance is effectively constant with respect to data size so retaining lots of data is not a problem.

In fact the only metadata retained on a per-consumer basis is the position of the consumer in the log, called the "offset". This offset is controlled by the consumer: normally a consumer will advance its offset linearly as it reads messages, but in fact the position is controlled by the consumer and it can consume messages in any order it likes. For example a consumer can reset to an older offset to reprocess.

This combination of features means that Kafka consumers are very cheap—they can come and go without much impact on the cluster or on other consumers. For example, you can use our command line tools to "tail" the contents of any topic without changing what is consumed by any existing consumers.

The partitions in the log serve several purposes. First, they allow the log to scale beyond a size that will fit on a single server. Each individual partition must fit on the servers that host it, but a topic may have many partitions so it can handle an arbitrary amount of data. Second they act as the unit of parallelism—more on that in a bit.

Distribution

The partitions of the log are distributed over the servers in the Kafka cluster with each server handling data and requests for a share of the partitions. Each partition is replicated across a configurable number of servers for fault tolerance.

Each partition has one server which acts as the "leader" and zero or more servers which act as "followers". The leader handles all read and write requests for the partition while the followers passively replicate the leader. If the leader fails, one of the followers will automatically become the new leader. Each server acts as a leader for some of its partitions and a follower for others so load is well balanced within the cluster.

Producers

Producers publish data to the topics of their choice. The producer is responsible for choosing which message to assign to which partition within the topic. This can be done in a round-robin fashion simply to balance load or it can be done according to some semantic partition function (say based on some key in the message). More on the use of partitioning in a second.

Consumers

Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each message goes to one of them; in publish-subscribe the message is broadcast to all consumers. Kafka offers a single consumer abstraction that generalizes both of these—the consumer group.

Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.

If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.

If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.

More commonly, however, we have found that topics have a small number of consumer groups, one for each "logical subscriber". Each group is composed of many consumer instances for scalability and fault tolerance. This is nothing more than publish-subscribe semantics where the subscriber is cluster of consumers instead of a single process.


A two server Kafka cluster hosting four partitions (P0-P3) with two consumer groups. Consumer group A has two consumer instances and group B has four.

Kafka has stronger ordering guarantees than a traditional messaging system, too.

A traditional queue retains messages in-order on the server, and if multiple consumers consume from the queue then the server hands out messages in the order they are stored. However, although the server hands out messages in order, the messages are delivered asynchronously to consumers, so they may arrive out of order on different consumers. This effectively means the ordering of the messages is lost in the presence of parallel consumption. Messaging systems often work around this by having a notion of "exclusive consumer" that allows only one process to consume from a queue, but of course this means that there is no parallelism in processing.

Kafka does it better. By having a notion of parallelism—the partition—within the topics, Kafka is able to provide both ordering guarantees and load balancing over a pool of consumer processes. This is achieved by assigning the partitions in the topic to the consumers in the consumer group so that each partition is consumed by exactly one consumer in the group. By doing this we ensure that the consumer is the only reader of that partition and consumes the data in order. Since there are many partitions this still balances the load over many consumer instances. Note however that there cannot be more consumer instances in a consumer group than partitions.

Kafka only provides a total order over messages within a partition, not between different partitions in a topic. Per-partition ordering combined with the ability to partition data by key is sufficient for most applications. However, if you require a total order over messages this can be achieved with a topic that has only one partition, though this will mean only one consumer process per consumer group.

Guarantees

At a high-level Kafka gives the following guarantees:
  • Messages sent by a producer to a particular topic partition will be appended in the order they are sent. That is, if a message M1 is sent by the same producer as a message M2, and M1 is sent first, then M1 will have a lower offset than M2 and appear earlier in the log.
  • A consumer instance sees messages in the order they are stored in the log.
  • For a topic with replication factor N, we will tolerate up to N-1 server failures without losing any messages committed to the log.
More details on these guarantees are given in the design section of the documentation.

1.2 Use Cases

Here is a description of a few of the popular use cases for Apache Kafka. For an overview of a number of these areas in action, see this blog post.

Messaging

Kafka works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc). In comparison to most messaging systems Kafka has better throughput, built-in partitioning, replication, and fault-tolerance which makes it a good solution for large scale message processing applications.

In our experience messaging uses are often comparatively low-throughput, but may require low end-to-end latency and often depend on the strong durability guarantees Kafka provides.

In this domain Kafka is comparable to traditional messaging systems such as ActiveMQ or RabbitMQ.

Website Activity Tracking

The original use case for Kafka was to be able to rebuild a user activity tracking pipeline as a set of real-time publish-subscribe feeds. This means site activity (page views, searches, or other actions users may take) is published to central topics with one topic per activity type. These feeds are available for subscription for a range of use cases including real-time processing, real-time monitoring, and loading into Hadoop or offline data warehousing systems for offline processing and reporting.

Activity tracking is often very high volume as many activity messages are generated for each user page view.

Metrics

Kafka is often used for operational monitoring data. This involves aggregating statistics from distributed applications to produce centralized feeds of operational data.

Log Aggregation

Many people use Kafka as a replacement for a log aggregation solution. Log aggregation typically collects physical log files off servers and puts them in a central place (a file server or HDFS perhaps) for processing. Kafka abstracts away the details of files and gives a cleaner abstraction of log or event data as a stream of messages. This allows for lower-latency processing and easier support for multiple data sources and distributed data consumption. In comparison to log-centric systems like Scribe or Flume, Kafka offers equally good performance, stronger durability guarantees due to replication, and much lower end-to-end latency.

Stream Processing

Many users end up doing stage-wise processing of data where data is consumed from topics of raw data and then aggregated, enriched, or otherwise transformed into new Kafka topics for further consumption. For example a processing flow for article recommendation might crawl article content from RSS feeds and publish it to an "articles" topic; further processing might help normalize or deduplicate this content to a topic of cleaned article content; a final stage might attempt to match this content to users. This creates a graph of real-time data flow out of the individual topics. Storm and Samza are popular frameworks for implementing these kinds of transformations.

Event Sourcing

Event sourcing is a style of application design where state changes are logged as a time-ordered sequence of records. Kafka's support for very large stored log data makes it an excellent backend for an application built in this style.

Commit Log

Kafka can serve as a kind of external commit-log for a distributed system. The log helps replicate data between nodes and acts as a re-syncing mechanism for failed nodes to restore their data. The log compaction feature in Kafka helps support this usage. In this usage Kafka is similar to Apache BookKeeper project.

1.3 Quick Start

This tutorial assumes you are starting fresh and have no existing Kafka or ZooKeeper data.

Step 1: Download the code

Download the 0.9.0.0 release and un-tar it.
> tar -xzf kafka_2.11-0.9.0.0.tgz
> cd kafka_2.11-0.9.0.0

Step 2: Start the server

Kafka uses ZooKeeper so you need to first start a ZooKeeper server if you don't already have one. You can use the convenience script packaged with kafka to get a quick-and-dirty single-node ZooKeeper instance.

> bin/zookeeper-server-start.sh config/zookeeper.properties
[2013-04-22 15:01:37,495] INFO Reading configuration from: config/zookeeper.properties (org.apache.zookeeper.server.quorum.QuorumPeerConfig)
...
Now start the Kafka server:
> bin/kafka-server-start.sh config/server.properties
[2013-04-22 15:01:47,028] INFO Verifying properties (kafka.utils.VerifiableProperties)
[2013-04-22 15:01:47,051] INFO Property socket.send.buffer.bytes is overridden to 1048576 (kafka.utils.VerifiableProperties)
...

Step 3: Create a topic

Let's create a topic named "test" with a single partition and only one replica:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
We can now see that topic if we run the list topic command:
> bin/kafka-topics.sh --list --zookeeper localhost:2181
test
Alternatively, instead of manually creating topics you can also configure your brokers to auto-create topics when a non-existent topic is published to.

Step 4: Send some messages

Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default each line will be sent as a separate message.

Run the producer and then type a few messages into the console to send to the server.

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message

Step 5: Start a consumer

Kafka also has a command line consumer that will dump out messages to standard output.
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
This is a message
This is another message

If you have each of the above commands running in a different terminal then you should now be able to type messages into the producer terminal and see them appear in the consumer terminal.

All of the command line tools have additional options; running the command with no arguments will display usage information documenting them in more detail.

Step 6: Setting up a multi-broker cluster

So far we have been running against a single broker, but that's no fun. For Kafka, a single broker is just a cluster of size one, so nothing much changes other than starting a few more broker instances. But just to get feel for it, let's expand our cluster to three nodes (still all on our local machine).

First we make a config file for each of the brokers:

> cp config/server.properties config/server-1.properties
> cp config/server.properties config/server-2.properties
Now edit these new files and set the following properties:

config/server-1.properties:
    broker.id=1
    port=9093
    log.dir=/tmp/kafka-logs-1

config/server-2.properties:
    broker.id=2
    port=9094
    log.dir=/tmp/kafka-logs-2
The broker.id property is the unique and permanent name of each node in the cluster. We have to override the port and log directory only because we are running these all on the same machine and we want to keep the brokers from all trying to register on the same port or overwrite each others data.

We already have Zookeeper and our single node started, so we just need to start the two new nodes:

> bin/kafka-server-start.sh config/server-1.properties &
...
> bin/kafka-server-start.sh config/server-2.properties &
...
Now create a new topic with a replication factor of three:
> bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 1 --topic my-replicated-topic
Okay but now that we have a cluster how can we know which broker is doing what? To see that run the "describe topics" command:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 1	Replicas: 1,2,0	Isr: 1,2,0
Here is an explanation of output. The first line gives a summary of all the partitions, each additional line gives information about one partition. Since we have only one partition for this topic there is only one line.
  • "leader" is the node responsible for all reads and writes for the given partition. Each node will be the leader for a randomly selected portion of the partitions.
  • "replicas" is the list of nodes that replicate the log for this partition regardless of whether they are the leader or even if they are currently alive.
  • "isr" is the set of "in-sync" replicas. This is the subset of the replicas list that is currently alive and caught-up to the leader.
Note that in my example node 1 is the leader for the only partition of the topic.

We can run the same command on the original topic we created to see where it is:

> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic test
Topic:test	PartitionCount:1	ReplicationFactor:1	Configs:
	Topic: test	Partition: 0	Leader: 0	Replicas: 0	Isr: 0
So there is no surprise there—the original topic has no replicas and is on server 0, the only server in our cluster when we created it.

Let's publish a few messages to our new topic:

> bin/kafka-console-producer.sh --broker-list localhost:9092 --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now let's consume these messages:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C
Now let's test out fault-tolerance. Broker 1 was acting as the leader so let's kill it:
> ps | grep server-1.properties
7564 ttys002    0:15.91 /System/Library/Frameworks/JavaVM.framework/Versions/1.6/Home/bin/java...
> kill -9 7564
Leadership has switched to one of the slaves and node 1 is no longer in the in-sync replica set:
> bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic my-replicated-topic
Topic:my-replicated-topic	PartitionCount:1	ReplicationFactor:3	Configs:
	Topic: my-replicated-topic	Partition: 0	Leader: 2	Replicas: 1,2,0	Isr: 2,0
But the messages are still be available for consumption even though the leader that took the writes originally is down:
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --from-beginning --topic my-replicated-topic
...
my test message 1
my test message 2
^C

Step 7: Use Kafka Connect to import/export data

Writing data from the console and writing it back to the console is a convenient place to start, but you'll probably want to use data from other sources or export data from Kafka to other systems. For many systems, instead of writing custom integration code you can use Kafka Connect to import or export data. Kafka Connect is a tool included with Kafka that imports and exports data to Kafka. It is an extensible tool that runs connectors, which implement the custom logic for interacting with an external system. In this quickstart we'll see how to run Kafka Connect with simple connectors that import data from a file to a Kafka topic and export data from a Kafka topic to a file. First, we'll start by creating some seed data to test with:
> echo -e "foo\nbar" > test.txt
Next, we'll start two connectors running in standalone mode, which means they run in a single, local, dedicated process. We provide three configuration files as parameters. The first is always the configuration for the Kafka Connect process, containing common configuration such as the Kafka brokers to connect to and the serialization format for data. The remaining configuration files each specify a connector to create. These files include a unique connector name, the connector class to instantiate, and any other configuration required by the connector.
> bin/connect-standalone.sh config/connect-standalone.properties config/connect-file-source.properties config/connect-file-sink.properties
These sample configuration files, included with Kafka, use the default local cluster configuration you started earlier and create two connectors: the first is a source connector that reads lines from an input file and produces each to a Kafka topic and the second is a sink connector that reads messages from a Kafka topic and produces each as a line in an output file. During startup you'll see a number of log messages, including some indicating that the connectors are being instantiated. Once the Kafka Connect process has started, the source connector should start reading lines from
test.txt
and producing them to the topic
connect-test
, and the sink connector should start reading messages from the topic
connect-test
and write them to the file
test.sink.txt
. We can verify the data has been delivered through the entire pipeline by examining the contents of the output file:
> cat test.sink.txt
foo
bar
Note that the data is being stored in the Kafka topic
connect-test
, so we can also run a console consumer to see the data in the topic (or use custom consumer code to process it):
> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic connect-test --from-beginning
{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
...
The connectors continue to process data, so we can add data to the file and see it move through the pipeline:
> echo "Another line" >> test.txt
You should see the line appear in the console consumer output and in the sink file.

1.4 Ecosystem

There are a plethora of tools that integrate with Kafka outside the main distribution. The ecosystem page lists many of these, including stream processing systems, Hadoop integration, monitoring, and deployment tools.

1.5 Upgrading From Previous Versions

Upgrading from 0.8.0, 0.8.1.X or 0.8.2.X to 0.9.0.0

0.9.0.0 has potential breaking changes (please review before upgrading) and an inter-broker protocol change from previous versions. This means that upgraded brokers and clients may not be compatible with older versions. It is important that you upgrade your Kafka cluster before upgrading your clients. If you are using MirrorMaker downstream clusters should be upgraded first as well.

For a rolling upgrade:

  1. Update server.properties file on all brokers and add the following property: inter.broker.protocol.version=0.8.2.X
  2. Upgrade the brokers. This can be done a broker at a time by simply bringing it down, updating the code, and restarting it.
  3. Once the entire cluster is upgraded, bump the protocol version by editing inter.broker.protocol.version and setting it to 0.9.0.0.
  4. Restart the brokers one by one for the new protocol version to take effect

Note: If you are willing to accept downtime, you can simply take all the brokers down, update the code and start all of them. They will start with the new protocol by default.

Note: Bumping the protocol version and restarting can be done any time after the brokers were upgraded. It does not have to be immediately after.

Potential breaking changes in 0.9.0.0
  • Java 1.6 is no longer supported.
  • Scala 2.9 is no longer supported.
  • Broker IDs above 1000 are now reserved by default to automatically assigned broker IDs. If your cluster has existing broker IDs above that threshold make sure to increase the reserved.broker.max.id broker configuration property accordingly.
  • Configuration parameter replica.lag.max.messages was removed. Partition leaders will no longer consider the number of lagging messages when deciding which replicas are in sync.
  • Configuration parameter replica.lag.time.max.ms now refers not just to the time passed since last fetch request from replica, but also to time since the replica last caught up. Replicas that are still fetching messages from leaders but did not catch up to the latest messages in replica.lag.time.max.ms will be considered out of sync.
  • Compacted topics no longer accept messages without key and an exception is thrown by the producer if this is attempted. In 0.8.x, a message without key would cause the log compaction thread to subsequently complain and quit (and stop compacting all compacted topics).
  • MirrorMaker no longer supports multiple target clusters. As a result it will only accept a single --consumer.config parameter. To mirror multiple source clusters, you will need at least one MirrorMaker instance per source cluster, each with its own consumer configuration.
  • Tools packaged under org.apache.kafka.clients.tools.* have been moved to org.apache.kafka.tools.*. All included scripts will still function as usual, only custom code directly importing these classes will be affected.
  • The default Kafka JVM performance options (KAFKA_JVM_PERFORMANCE_OPTS) have been changed in kafka-run-class.sh.
  • The kafka-topics.sh script (kafka.admin.TopicCommand) now exits with non-zero exit code on failure.
  • The kafka-topics.sh script (kafka.admin.TopicCommand) will now print a warning when topic names risk metric collisions due to the use of a '.' or '_' in the topic name, and error in the case of an actual collision.
  • The kafka-console-producer.sh script (kafka.tools.ConsoleProducer) will use the new producer instead of the old producer be default, and users have to specify 'old-producer' to use the old producer.
  • By default all command line tools will print all logging messages to stderr instead of stout.
Notable changes in 0.9.0.1
  • The new broker id generation feature can be disable by setting broker.id.generation.enable to false.
  • Configuration parameter log.cleaner.enable is now true by default. This means topics with a cleanup.policy=compact will now be compacted by default, and 128 MB of heap will be allocated to the cleaner process via log.cleaner.dedupe.buffer.size. You may want to review log.cleaner.dedupe.buffer.size and the other log.cleaner configuration values based on your usage of compacted topics.
  • Default value of configuration parameter fetch.min.bytes for the new consumer is now 1 by default.
Deprecations in 0.9.0.0
  • Altering topic configuration from the kafka-topics.sh script (kafka.admin.TopicCommand) has been deprecated. Going forward, please use the kafka-configs.sh script (kafka.admin.ConfigCommand) for this functionality.
  • The kafka-consumer-offset-checker.sh (kafka.tools.ConsumerOffsetChecker) has been deprecated. Going forward, please use kafka-consumer-groups.sh (kafka.admin.ConsumerGroupCommand) for this functionality.
  • The kafka.tools.ProducerPerformance class has been deprecated. Going forward, please use org.apache.kafka.tools.ProducerPerformance for this functionality (kafka-producer-perf-test.sh will also be changed to use the new class).

Upgrading from 0.8.1 to 0.8.2

0.8.2 is fully compatible with 0.8.1. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it.

Upgrading from 0.8.0 to 0.8.1

0.8.1 is fully compatible with 0.8. The upgrade can be done one broker at a time by simply bringing it down, updating the code, and restarting it.

Upgrading from 0.7

Release 0.7 is incompatible with newer releases. Major changes were made to the API, ZooKeeper data structures, and protocol, and configuration in order to add replication (Which was missing in 0.7). The upgrade from 0.7 to later versions requires a special tool for migration. This migration can be done without downtime.

2. API

Apache Kafka includes new java clients (in the org.apache.kafka.clients package). These are meant to supplant the older Scala clients, but for compatability they will co-exist for some time. These clients are available in a seperate jar with minimal dependencies, while the old Scala clients remain packaged with the server.

2.1 Producer API

We encourage all new development to use the new Java producer. This client is production tested and generally both faster and more fully featured than the previous Scala client. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):
	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>0.9.0.0</version>
	</dependency>
Examples showing how to use the producer are given in the javadocs.

For those interested in the legacy Scala producer api, information can be found here.

2.2 Consumer API

As of the 0.9.0 release we have added a new Java consumer to replace our existing high-level ZooKeeper-based consumer and low-level consumer APIs. This client is considered beta quality. To ensure a smooth upgrade path for users, we still maintain the old 0.8 consumer clients that continue to work on an 0.9 Kafka cluster. In the following sections we introduce both the old 0.8 consumer APIs (both high-level ConsumerConnector and low-level SimpleConsumer) and the new Java consumer API respectively.

2.2.1 Old High Level Consumer API

class Consumer {
  /**
   *  Create a ConsumerConnector
   *
   *  @param config  at the minimum, need to specify the groupid of the consumer and the zookeeper
   *                 connection string zookeeper.connect.
   */
  public static kafka.javaapi.consumer.ConsumerConnector createJavaConsumerConnector(ConsumerConfig config);
}

/**
 *  V: type of the message
 *  K: type of the optional key assciated with the message
 */
public interface kafka.javaapi.consumer.ConsumerConnector {
  /**
   *  Create a list of message streams of type T for each topic.
   *
   *  @param topicCountMap  a map of (topic, #streams) pair
   *  @param decoder a decoder that converts from Message to T
   *  @return a map of (topic, list of  KafkaStream) pairs.
   *          The number of items in the list is #streams. Each stream supports
   *          an iterator over message/metadata pairs.
   */
  public <K,V> Map<String, List<KafkaStream<K,V>>>
    createMessageStreams(Map<String, Integer> topicCountMap, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

  /**
   *  Create a list of message streams of type T for each topic, using the default decoder.
   */
  public Map<String, List<KafkaStream<byte[], byte[]>>> createMessageStreams(Map<String, Integer> topicCountMap);

  /**
   *  Create a list of message streams for topics matching a wildcard.
   *
   *  @param topicFilter a TopicFilter that specifies which topics to
   *                    subscribe to (encapsulates a whitelist or a blacklist).
   *  @param numStreams the number of message streams to return.
   *  @param keyDecoder a decoder that decodes the message key
   *  @param valueDecoder a decoder that decodes the message itself
   *  @return a list of KafkaStream. Each stream supports an
   *          iterator over its MessageAndMetadata elements.
   */
  public <K,V> List<KafkaStream<K,V>>
    createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams, Decoder<K> keyDecoder, Decoder<V> valueDecoder);

  /**
   *  Create a list of message streams for topics matching a wildcard, using the default decoder.
   */
  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter, int numStreams);

  /**
   *  Create a list of message streams for topics matching a wildcard, using the default decoder, with one stream.
   */
  public List<KafkaStream<byte[], byte[]>> createMessageStreamsByFilter(TopicFilter topicFilter);

  /**
   *  Commit the offsets of all topic/partitions connected by this connector.
   */
  public void commitOffsets();

  /**
   *  Shut down the connector
   */
  public void shutdown();
}

You can follow this example to learn how to use the high level consumer api.

2.2.2 Old Simple Consumer API

class kafka.javaapi.consumer.SimpleConsumer {
  /**
   *  Fetch a set of messages from a topic.
   *
   *  @param request specifies the topic name, topic partition, starting byte offset, maximum bytes to be fetched.
   *  @return a set of fetched messages
   */
  public FetchResponse fetch(kafka.javaapi.FetchRequest request);

  /**
   *  Fetch metadata for a sequence of topics.
   *
   *  @param request specifies the versionId, clientId, sequence of topics.
   *  @return metadata for each topic in the request.
   */
  public kafka.javaapi.TopicMetadataResponse send(kafka.javaapi.TopicMetadataRequest request);

  /**
   *  Get a list of valid offsets (up to maxSize) before the given time.
   *
   *  @param request a [[kafka.javaapi.OffsetRequest]] object.
   *  @return a [[kafka.javaapi.OffsetResponse]] object.
   */
  public kafka.javaapi.OffsetResponse getOffsetsBefore(OffsetRequest request);

  /**
   * Close the SimpleConsumer.
   */
  public void close();
}
For most applications, the high level consumer Api is good enough. Some applications want features not exposed to the high level consumer yet (e.g., set initial offset when restarting the consumer). They can instead use our low level SimpleConsumer Api. The logic will be a bit more complicated and you can follow the example in here.

2.2.3 New Consumer API

This new unified consumer API removes the distinction between the 0.8 high-level and low-level consumer APIs. You can use this client by adding a dependency on the client jar using the following example maven co-ordinates (you can change the version numbers with new releases):
	<dependency>
	    <groupId>org.apache.kafka</groupId>
	    <artifactId>kafka-clients</artifactId>
	    <version>0.9.0.0</version>
	</dependency>
Examples showing how to use the consumer are given in the javadocs.

3. Configuration

Kafka uses key-value pairs in the property file format for configuration. These values can be supplied either from a file or programmatically.

3.1 Broker Configs

The essential configurations are the following:
  • broker.id
  • log.dirs
  • zookeeper.connect
Topic-level configurations and defaults are discussed in more detail below.
Name Description Type Default Valid Values Importance
zookeeper.connectZookeeper host stringstringhigh
advertised.host.nameHostname to publish to ZooKeeper for clients to use. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, it will use the value for "host.name" if configured. Otherwise it will use the value returned from java.net.InetAddress.getCanonicalHostName().stringnullhigh
advertised.listenersListeners to publish to ZooKeeper for clients to use, if different than the listeners above. In IaaS environments, this may need to be different from the interface to which the broker binds. If this is not set, the value for "listeners" will be used.stringnullhigh
advertised.portThe port to publish to ZooKeeper for clients to use. In IaaS environments, this may need to be different from the port to which the broker binds. If this is not set, it will publish the same port that the broker binds to.intnullhigh
auto.create.topics.enableEnable auto creation of topic on the serverbooleantruehigh
auto.leader.rebalance.enableEnables auto leader balancing. A background thread checks and triggers leader balance if required at regular intervalsbooleantruehigh
background.threadsThe number of threads to use for various background processing tasksint10[1,...]high
broker.idThe broker id for this server. To avoid conflicts between zookeeper generated brokerId and user's config.brokerId added MaxReservedBrokerId and zookeeper sequence starts from MaxReservedBrokerId + 1.int-1high
compression.typeSpecify the final compression type for a given topic. This configuration accepts the standard compression codecs ('gzip', 'snappy', lz4). It additionally accepts 'uncompressed' which is equivalent to no compression; and 'producer' which means retain the original compression codec set by the producer.stringproducerhigh
delete.topic.enableEnables delete topic. Delete topic through the admin tool will have no effect if this config is turned offbooleanfalsehigh
host.namehostname of broker. If this is set, it will only bind to this address. If this is not set, it will bind to all interfacesstring""high
leader.imbalance.check.interval.secondsThe frequency with which the partition rebalance check is triggered by the controllerlong300high
leader.imbalance.per.broker.percentageThe ratio of leader imbalance allowed per broker. The controller would trigger a leader balance if it goes above this value per broker. The value is specified in percentage.int10high
listenersListener List - Comma-separated list of URIs we will listen on and their protocols. Specify hostname as 0.0.0.0 to bind to all interfaces. Leave hostname empty to bind to default interface. Examples of legal listener lists: PLAINTEXT://myhost:9092,TRACE://:9091 PLAINTEXT://0.0.0.0:9092, TRACE://localhost:9093 stringnullhigh
log.dirThe directory in which the log data is kept (supplemental for log.dirs property)string/tmp/kafka-logshigh
log.dirsThe directories in which the log data is kept. If not set, the value in log.dir is usedstringnullhigh
log.flush.interval.messagesThe number of messages accumulated on a log partition before messages are flushed to disk long9223372036854775807[1,...]high
log.flush.interval.msThe maximum time in ms that a message in any topic is kept in memory before flushed to disk. If not set, the value in log.flush.scheduler.interval.ms is usedlongnullhigh
log.flush.offset.checkpoint.interval.msThe frequency with which we update the persistent record of the last flush which acts as the log recovery pointint60000[0,...]high
log.flush.scheduler.interval.msThe frequency in ms that the log flusher checks whether any log needs to be flushed to disklong9223372036854775807high
log.retention.bytesThe maximum size of the log before deleting itlong-1high
log.retention.hoursThe number of hours to keep a log file before deleting it (in hours), tertiary to log.retention.ms propertyint168high
log.retention.minutesThe number of minutes to keep a log file before deleting it (in minutes), secondary to log.retention.ms property. If not set, the value in log.retention.hours is usedintnullhigh
log.retention.msThe number of milliseconds to keep a log file before deleting it (in milliseconds), If not set, the value in log.retention.minutes is usedlongnullhigh
log.roll.hoursThe maximum time before a new log segment is rolled out (in hours), secondary to log.roll.ms propertyint168[1,...]high
log.roll.jitter.hoursThe maximum jitter to subtract from logRollTimeMillis (in hours), secondary to log.roll.jitter.ms propertyint0[0,...]high
log.roll.jitter.msThe maximum jitter to subtract from logRollTimeMillis (in milliseconds). If not set, the value in log.roll.jitter.hours is usedlongnullhigh
log.roll.msThe maximum time before a new log segment is rolled out (in milliseconds). If not set, the value in log.roll.hours is usedlongnullhigh
log.segment.bytesThe maximum size of a single log fileint1073741824[14,...]high
log.segment.delete.delay.msThe amount of time to wait before deleting a file from the filesystemlong60000[0,...]high
message.max.bytesThe maximum size of message that the server can receiveint1000012[0,...]high
min.insync.replicasdefine the minimum number of replicas in ISR needed to satisfy a produce request with required.acks=-1 (or all)int1[1,...]high
num.io.threadsThe number of io threads that the server uses for carrying out network requestsint8[1,...]high
num.network.threadsthe number of network threads that the server uses for handling network requestsint3[1,...]high
num.recovery.threads.per.data.dirThe number of threads per data directory to be used for log recovery at startup and flushing at shutdownint1[1,...]high
num.replica.fetchersNumber of fetcher threads used to replicate messages from a source broker. Increasing this value can increase the degree of I/O parallelism in the follower broker.int1high
offset.metadata.max.bytesThe maximum size for a metadata entry associated with an offset commitint4096high
offsets.commit.required.acksThe required acks before the commit can be accepted. In general, the default (-1) should not be overriddenshort-1high
offsets.commit.timeout.msOffset commit will be delayed until all replicas for the offsets topic receive the commit or this timeout is reached. This is similar to the producer request timeout.int5000[1,...]high
offsets.load.buffer.sizeBatch size for reading from the offsets segments when loading offsets into the cache.int5242880[1,...]high
offsets.retention.check.interval.msFrequency at which to check for stale offsetslong600000[1,...]high
offsets.retention.minutesLog retention window in minutes for offsets topicint1440[1,...]high
offsets.topic.compression.codecCompression codec for the offsets topic - compression may be used to achieve "atomic" commitsint0high
offsets.topic.num.partitionsThe number of partitions for the offset commit topic (should not change after deployment)int50[1,...]high
offsets.topic.replication.factorThe replication factor for the offsets topic (set higher to ensure availability). To ensure that the effective replication factor of the offsets topic is the configured value, the number of alive brokers has to be at least the replication factor at the time of the first request for the offsets topic. If not, either the offsets topic creation will fail or it will get a replication factor of min(alive brokers, configured replication factor)short3[1,...]high
offsets.topic.segment.bytesThe offsets topic segment bytes should be kept relatively small in order to facilitate faster log compaction and cache loadsint104857600[1,...]high
portthe port to listen and accept connections onint9092high
queued.max.requestsThe number of queued requests allowed before blocking the network threadsint500[1,...]high
quota.consumer.defaultAny consumer distinguished by clientId/consumer group will get throttled if it fetches more bytes than this value per-secondlong9223372036854775807[1,...]high
quota.producer.defaultAny producer distinguished by clientId will get throttled if it produces more bytes than this value per-secondlong9223372036854775807[1,...]high
replica.fetch.max.bytesThe number of byes of messages to attempt to fetchint1048576high
replica.fetch.min.bytesMinimum bytes expected for each fetch response. If not enough bytes, wait up to replicaMaxWaitTimeMsint1high
replica.fetch.wait.max.msmax wait time for each fetcher request issued by follower replicas. This value should always be less than the replica.lag.time.max.ms at all times to prevent frequent shrinking of ISR for low throughput topicsint500high
replica.high.watermark.checkpoint.interval.msThe frequency with which the high watermark is saved out to disklong5000high
replica.lag.time.max.msIf a follower hasn't sent any fetch requests or hasn't consumed up to the leaders log end offset for at least this time, the leader will remove the follower from isrlong10000high
replica.socket.receive.buffer.bytesThe socket receive buffer for network requestsint65536high
replica.socket.timeout.msThe socket timeout for network requests. Its value should be at least replica.fetch.wait.max.msint30000high
request.timeout.msThe configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.int30000high
socket.receive.buffer.bytesThe SO_RCVBUF buffer of the socket sever socketsint102400high
socket.request.max.bytesThe maximum number of bytes in a socket requestint104857600[1,...]high
socket.send.buffer.bytesThe SO_SNDBUF buffer of the socket sever socketsint102400high
unclean.leader.election.enableIndicates whether to enable replicas not in the ISR set to be elected as leader as a last resort, even though doing so may result in data lossbooleantruehigh
zookeeper.connection.timeout.msThe max time that the client waits to establish a connection to zookeeper. If not set, the value in zookeeper.session.timeout.ms is usedintnullhigh
zookeeper.session.timeout.msZookeeper session timeoutint6000high
zookeeper.set.aclSet client to use secure ACLsbooleanfalsehigh
broker.id.generation.enableEnable automatic broker id generation on the server? When enabled the value configured for reserved.broker.max.id should be reviewed.booleantruemedium
connections.max.idle.msIdle connections timeout: the server socket processor threads close the connections that idle more than thislong600000medium
controlled.shutdown.enableEnable controlled shutdown of the serverbooleantruemedium
controlled.shutdown.max.retriesControlled shutdown can fail for multiple reasons. This determines the number of retries when such failure happensint3medium
controlled.shutdown.retry.backoff.msBefore each retry, the system needs time to recover from the state that caused the previous failure (Controller fail over, replica lag etc). This config determines the amount of time to wait before retrying.long5000medium
controller.socket.timeout.msThe socket timeout for controller-to-broker channelsint30000medium
default.replication.factordefault replication factors for automatically created topicsint1medium
fetch.purgatory.purge.interval.requestsThe purge interval (in number of requests) of the fetch request purgatoryint1000medium
group.max.session.timeout.msThe maximum allowed session timeout for registered consumersint30000medium
group.min.session.timeout.msThe minimum allowed session timeout for registered consumersint6000medium
inter.broker.protocol.versionSpecify which version of the inter-broker protocol will be used. This is typically bumped after all brokers were upgraded to a new version. Example of some valid values are: 0.8.0, 0.8.1, 0.8.1.1, 0.8.2, 0.8.2.0, 0.8.2.1, 0.9.0.0, 0.9.0.1 Check ApiVersion for the full list.string0.9.0.Xmedium
log.cleaner.backoff.msThe amount of time to sleep when there are no logs to cleanlong15000[0,...]medium
log.cleaner.dedupe.buffer.sizeThe total memory used for log deduplication across all cleaner threadslong134217728medium
log.cleaner.delete.retention.msHow long are delete records retained?long86400000medium
log.cleaner.enableEnable the log cleaner process to run on the server? Should be enabled if using any topics with a cleanup.policy=compact including the internal offsets topic. If disabled those topics will not be compacted and continually grow in size.booleantruemedium
log.cleaner.io.buffer.load.factorLog cleaner dedupe buffer load factor. The percentage full the dedupe buffer can become. A higher value will allow more log to be cleaned at once but will lead to more hash collisionsdouble0.9medium
log.cleaner.io.buffer.sizeThe total memory used for log cleaner I/O buffers across all cleaner threadsint524288[0,...]medium
log.cleaner.io.max.bytes.per.secondThe log cleaner will be throttled so that the sum of its read and write i/o will be less than this value on averagedouble1.7976931348623157E308medium
log.cleaner.min.cleanable.ratioThe minimum ratio of dirty log to total log for a log to eligible for cleaningdouble0.5medium
log.cleaner.threadsThe number of background threads to use for log cleaningint1[0,...]medium
log.cleanup.policyThe default cleanup policy for segments beyond the retention window, must be either "delete" or "compact"stringdelete[compact, delete]medium
log.index.interval.bytesThe interval with which we add an entry to the offset indexint4096[0,...]medium
log.index.size.max.bytesThe maximum size in bytes of the offset indexint10485760[4,...]medium
log.preallocateShould pre allocate file when create new segment? If you are using Kafka on Windows, you probably need to set it to true.booleanfalsemedium
log.retention.check.interval.msThe frequency in milliseconds that the log cleaner checks whether any log is eligible for deletionlong300000[1,...]medium
max.connections.per.ipThe maximum number of connections we allow from each ip addressint2147483647[1,...]medium
max.connections.per.ip.overridesPer-ip or hostname overrides to the default maximum number of connectionsstring""medium
num.partitionsThe default number of log partitions per topicint1[1,...]medium
principal.builder.classThe fully qualified name of a class that implements the PrincipalBuilder interface, which is currently used to build the Principal for connections with the SSL SecurityProtocol.classclass org.apache.kafka.common.security.auth.DefaultPrincipalBuildermedium
producer.purgatory.purge.interval.requestsThe purge interval (in number of requests) of the producer request purgatoryint1000medium
replica.fetch.backoff.msThe amount of time to sleep when fetch partition error occurs.int1000[0,...]medium
reserved.broker.max.idMax number that can be used for a broker.idint1000[0,...]medium
sasl.kerberos.kinit.cmdKerberos kinit command path.string/usr/bin/kinitmedium
sasl.kerberos.min.time.before.reloginLogin thread sleep time between refresh attempts.long60000medium
sasl.kerberos.principal.to.local.rulesA list of rules for mapping from principal names to short names (typically operating system usernames). The rules are evaluated in order and the first rule that matches a principal name is used to map it to a short name. Any later rules in the list are ignored. By default, principal names of the form {username}/{hostname}@{REALM} are mapped to {username}. For more details on the format please see security authorization and acls.list[DEFAULT]medium
sasl.kerberos.service.nameThe Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.stringnullmedium
sasl.kerberos.ticket.renew.jitterPercentage of random jitter added to the renewal time.double0.05medium
sasl.kerberos.ticket.renew.window.factorLogin thread will sleep until the specified window factor of time from last refresh to ticket's expiry has been reached, at which time it will try to renew the ticket.double0.8medium
security.inter.broker.protocolSecurity protocol used to communicate between brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.stringPLAINTEXTmedium
ssl.cipher.suitesA list of cipher suites. This is a named combination of authentication, encryption, MAC and key exchange algorithm used to negotiate the security settings for a network connection using TLS or SSL network protocol.By default all the available cipher suites are supported.listnullmedium
ssl.client.authConfigures kafka broker to request client authentication. The following settings are common:
  • ssl.client.auth=required If set to required client authentication is required.
  • ssl.client.auth=requested This means client authentication is optional. unlike requested , if this option is set client can choose not to provide authentication information about itself
  • ssl.client.auth=none This means client authentication is not needed.
stringnone[required, requested, none]medium
ssl.enabled.protocolsThe list of protocols enabled for SSL connections.list[TLSv1.2, TLSv1.1, TLSv1]medium
ssl.key.passwordThe password of the private key in the key store file. This is optional for client.passwordnullmedium
ssl.keymanager.algorithmThe algorithm used by key manager factory for SSL connections. Default value is the key manager factory algorithm configured for the Java Virtual Machine.stringSunX509medium
ssl.keystore.locationThe location of the key store file. This is optional for client and can be used for two-way authentication for client.stringnullmedium
ssl.keystore.passwordThe store password for the key store file.This is optional for client and only needed if ssl.keystore.location is configured. passwordnullmedium
ssl.keystore.typeThe file format of the key store file. This is optional for client.stringJKSmedium
ssl.protocolThe SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.stringTLSmedium
ssl.providerThe name of the security provider used for SSL connections. Default value is the default security provider of the JVM.stringnullmedium
ssl.trustmanager.algorithmThe algorithm used by trust manager factory for SSL connections. Default value is the trust manager factory algorithm configured for the Java Virtual Machine.stringPKIXmedium
ssl.truststore.locationThe location of the trust store file. stringnullmedium
ssl.truststore.passwordThe password for the trust store file. passwordnullmedium
ssl.truststore.typeThe file format of the trust store file.stringJKSmedium
authorizer.class.nameThe authorizer class that should be used for authorizationstring""low
metric.reportersA list of classes to use as metrics reporters. Implementing the MetricReporter interface allows plugging in classes that will be notified of new metric creation. The JmxReporter is always included to register JMX statistics.list[]low
metrics.num.samplesThe number of samples maintained to compute metrics.int2[1,...]low
metrics.sample.window.msThe number of samples maintained to compute metrics.long30000[1,...]low
quota.window.numThe number of samples to retain in memoryint11[1,...]low
quota.window.size.secondsThe time span of each sampleint1[1,...]low
ssl.endpoint.identification.algorithmThe endpoint identification algorithm to validate server hostname using server certificate. stringnulllow
zookeeper.sync.time.msHow far a ZK follower can be behind a ZK leaderint2000low

More details about broker configuration can be found in the scala class kafka.server.KafkaConfig.

Topic-level configuration Configurations pertinent to topics have both a global default as well an optional per-topic override. If no per-topic configuration is given the global default is used. The override can be set at topic creation time by giving one or more --config options. This example creates a topic named my-topic with a custom max message size and flush rate:
 > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic my-topic --partitions 1
        --replication-factor 1 --config max.message.bytes=64000 --config flush.messages=1
Overrides can also be changed or set later using the alter topic command. This example updates the max message size for my-topic:
 > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --config max.message.bytes=128000
To remove an override you can do
 > bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic
    --deleteConfig max.message.bytes
The following are the topic-level configurations. The server's default configuration for this property is given under the Server Default Property heading, setting this default in the server config allows you to change the default given to topics that have no override specified.
Property Default Server Default Property Description
cleanup.policy delete log.cleanup.policy A string that is either "delete" or "compact". This string designates the retention policy to use on old log segments. The default policy ("delete") will discard old segments when their retention time or size limit has been reached. The "compact" setting will enable log compaction on the topic.
delete.retention.ms 86400000 (24 hours) log.cleaner.delete.retention.ms The amount of time to retain delete tombstone markers for log compacted topics. This setting also gives a bound on the time in which a consumer must complete a read if they begin from offset 0 to ensure that they get a valid snapshot of the final stage (otherwise delete tombstones may be collected before they complete their scan).
flush.messages None log.flush.interval.messages This setting allows specifying an interval at which we will force an fsync of data written to the log. For example if this was set to 1 we would fsync after every message; if it were 5 we would fsync after every five messages. In general we recommend you not set this and use replication for durability and allow the operating system's background flush capabilities as it is more efficient. This setting can be overridden on a per-topic basis (see the per-topic configuration section).
flush.ms None log.flush.interval.ms This setting allows specifying a time interval at which we will force an fsync of data written to the log. For example if this was set to 1000 we would fsync after 1000 ms had passed. In general we recommend you not set this and use replication for durability and allow the operating system's background flush capabilities as it is more efficient.
index.interval.bytes 4096 log.index.interval.bytes This setting controls how frequently Kafka adds an index entry to it's offset index. The default setting ensures that we index a message roughly every 4096 bytes. More indexing allows reads to jump closer to the exact position in the log but makes the index larger. You probably don't need to change this.
max.message.bytes 1,000,000 message.max.bytes This is largest message size Kafka will allow to be appended to this topic. Note that if you increase this size you must also increase your consumer's fetch size so they can fetch messages this large.
min.cleanable.dirty.ratio 0.5 log.cleaner.min.cleanable.ratio This configuration controls how frequently the log compactor will attempt to clean the log (assuming log compaction is enabled). By default we will avoid cleaning a log where more than 50% of the log has been compacted. This ratio bounds the maximum space wasted in the log by duplicates (at 50% at most 50% of the log could be duplicates). A higher ratio will mean fewer, more efficient cleanings but will mean more wasted space in the log.
min.insync.replicas 1 min.insync.replicas When a producer sets request.required.acks to -1, min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend). When used together, min.insync.replicas and request.required.acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with request.required.acks of -1. This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
retention.bytes None log.retention.bytes This configuration controls the maximum size a log can grow to before we will discard old log segments to free up space if we are using the "delete" retention policy. By default there is no size limit only a time limit.
retention.ms 7 days log.retention.minutes This configuration controls the maximum time we will retain a log before we will discard old log segments to free up space if we are using the "delete" retention policy. This represents an SLA on how soon consumers must read their data.
segment.bytes 1 GB log.segment.bytes This configuration controls the segment file size for the log. Retention and cleaning is always done a file at a time so a larger segment size means fewer files but less granular control over retention.
segment.index.bytes 10 MB log.index.size.max.bytes This configuration controls the size of the index that maps offsets to file positions. We preallocate this index file and shrink it only after log rolls. You generally should not need to change this setting.
segment.ms 7 days log.roll.hours This configuration controls the period of time after which Kafka will force the log to roll even if the segment file isn't full to ensure that retention can delete or compact old data.
segment.jitter.ms 0 log.roll.jitter.{ms,hours} The maximum jitter to subtract from logRollTimeMillis.

3.2 Producer Configs

Below is the configuration of the Java producer:
Name Description Type Default Valid Values Importance
bootstrap.serversA list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping—this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).listhigh
key.serializerSerializer class for key that implements the Serializer interface.classhigh
value.serializerSerializer class for value that implements the Serializer interface.classhigh
acksThe number of acknowledgments the producer requires the leader to have received before considering a request complete. This controls the durability of records that are sent. The following settings are common:
  • acks=0 If set to zero then the producer will not wait for any acknowledgment from the server at all. The record will be immediately added to the socket buffer and considered sent. No guarantee can be made that the server has received the record in this case, and the retries configuration will not take effect (as the client won't generally know of any failures). The offset given back for each record will always be set to -1.
  • acks=1 This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers. In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
  • acks=all This means the leader will wait for the full set of in-sync replicas to acknowledge the record. This guarantees that the record will not be lost as long as at least one in-sync replica remains alive. This is the strongest available guarantee.
string1[all, -1, 0, 1]high
buffer.memoryThe total bytes of memory the producer can use to buffer records waiting to be sent to the server. If records are sent faster than they can be delivered to the server the producer will either block or throw an exception based on the preference specified by block.on.buffer.full.

This setting should correspond roughly to the total memory the producer will use, but is not a hard bound since not all memory the producer uses is used for buffering. Some additional memory will be used for compression (if compression is enabled) as well as for maintaining in-flight requests.

long33554432[0,...]high
compression.typeThe compression type for all data generated by the producer. The default is none (i.e. no compression). Valid values are none, gzip, snappy, or lz4. Compression is of full batches of data, so the efficacy of batching will also impact the compression ratio (more batching means better compression).stringnonehigh
retriesSetting a value greater than zero will cause the client to resend any record whose send fails with a potentially transient error. Note that this retry is no different than if the client resent the record upon receiving the error. Allowing retries will potentially change the ordering of records because if two records are sent to a single partition, and the first fails and is retried but the second succeeds, then the second record may appear first.int0[0,...,2147483647]high
ssl.key.passwordThe password of the private key in the key store file. This is optional for client.passwordnullhigh
ssl.keystore.locationThe location of the key store file. This is optional for client and can be used for two-way authentication for client.stringnullhigh
ssl.keystore.passwordThe store password for the key store file.This is optional for client and only needed if ssl.keystore.location is configured. passwordnullhigh
ssl.truststore.locationThe location of the trust store file. stringnullhigh
ssl.truststore.passwordThe password for the trust store file. passwordnullhigh
batch.sizeThe producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.

No attempt will be made to batch records larger than this size.

Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.

A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully as we will always allocate a buffer of the specified batch size in anticipation of additional records.

int16384[0,...]medium
client.idAn id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.string""medium
connections.max.idle.msClose idle connections after the number of milliseconds specified by this config.long540000medium
linger.msThe producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load when records arrive faster than they can be sent out. However in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes this by adding a small amount of artificial delay—that is, rather than immediately sending out a record the producer will wait for up to the given delay to allow other records to be sent so that the sends can be batched together. This can be thought of as analogous to Nagle's algorithm in TCP. This setting gives the upper bound on the delay for batching: once we get batch.size worth of records for a partition it will be sent immediately regardless of this setting, however if we have fewer than this many bytes accumulated for this partition we will 'linger' for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would have the effect of reducing the number of requests sent but would add up to 5ms of latency to records sent in the absense of load.long0[0,...]medium
max.block.msThe configuration controls how long {@link KafkaProducer#send()} and {@link KafkaProducer#partitionsFor} will block.These methods can be blocked either because the buffer is full or metadata unavailable.Blocking in the user-supplied serializers or partitioner will not be counted against this timeout.long60000[0,...]medium
max.request.sizeThe maximum size of a request. This is also effectively a cap on the maximum record size. Note that the server has its own cap on record size which may be different from this. This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.int1048576[0,...]medium
partitioner.classPartitioner class that implements the Partitioner interface.classclass org.apache.kafka.clients.producer.internals.DefaultPartitionermedium
receive.buffer.bytesThe size of the TCP receive buffer (SO_RCVBUF) to use when reading data.int32768[0,...]medium
request.timeout.msThe configuration controls the maximum amount of time the client will wait for the response of a request. If the response is not received before the timeout elapses the client will resend the request if necessary or fail the request if retries are exhausted.int30000[0,...]medium
sasl.kerberos.service.nameThe Kerberos principal name that Kafka runs as. This can be defined either in Kafka's JAAS config or in Kafka's config.stringnullmedium
security.protocolProtocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.stringPLAINTEXTmedium
send.buffer.bytesThe size of the TCP send buffer (SO_SNDBUF) to use when sending data.int131072[0,...]medium
ssl.enabled.protocolsThe list of protocols enabled for SSL connections.list[TLSv1.2, TLSv1.1, TLSv1]medium
ssl.keystore.typeThe file format of the key store file. This is optional for client.stringJKSmedium
ssl.protocolThe SSL protocol used to generate the SSLContext. Default setting is TLS, which is fine for most cases. Allowed values in recent JVMs are TLS, TLSv1.1 and TLSv1.2. SSL, SSLv2 and SSLv3 may be supported in older JVMs, but their usage is discouraged due to known security vulnerabilities.stringTLSmedium
ssl.providerThe name of the security provider used for SSL connections. Default value is the default security provider of the JVM.stringnullmedium
ssl.truststore.typeThe file format of the trust store file.stringJKSmedium
timeout.msThe configuration controls the maximum amount of time the server will wait for acknowledgments from followers to meet the acknowledgment requirements the producer has specified with the acks configuration. If the requested number of acknowledgments are not met when the timeout elapses an error will be returned. This timeout is measured on the server side and does not include the network latency of the request.int30000[0,...]medium
block.on.buffer.fullWhen our memory buffer is exhausted we must either stop accepting new records (block) or throw errors. By default