NOTE This is a work in progress. Unlike your typical blog post, this is going to be something of a “living document” while I iron out the rough edges.

DISCLAIMER I’m filling in gaps in my own knowledge as I go. Given that & the length of this document, it’s likely I’ve gotten plenty wrong. So don’t hesitate to drop me a line if I’ve gone and goofed something up.

Contents

  1. Why this document?
    1. Why should I believe your ravings?
    2. So this is a specification, then?
    3. Assumptions
  2. Group Coordinators, Leaders and Protocols
  3. What is the purpose of a consumer rebalance?
  4. Reasons for a rebalance
  5. Consumer rebalances from 10,000ft
    1. Rebalances as Double Barriers
  6. A little more detail
    1. Acquiesce
    2. Find the group coordinator
    3. Commit offsets
    4. Join or rejoin the consumer group
    5. Sync group members and assign partitions
    6. Fetch last committed offsets
  7. After the rebalance
    1. Continue sending periodic heartbeats
    2. Send fetch requests to read from partition leaders
    3. Periodically commit offsets if necessary
    4. Leave the group when the consumer is closing
  8. Watch for errors!
  9. Other advice
    1. Think “streams” not “queues”
    2. Use partitions as a natural leverage point for parallelism
    3. Retriable vs. non-retriable errors

Why this document?

This document explains how Kafka consumer rebalances work in excruciating detail. It’s not intended to be totally comprehensive nor too prescriptive, but hopefully useful enough for folks working on client libraries to have a solid chance of implementing consumer rebalances properly.

Why should I believe your ravings?

I’ve both written services that use Kafka and helped run various Kafka clusters in production for a few years. Other folks I’ve worked with in the past (and work with in the present) know this Kafka thing much better than I do, but sadly – this being my blog and all – you’re stuck with me.

I’ve also spent a ton of time buried in Kafka client libraries across multiple languages for reasons I’m trying to forget. I’ve come away from it all a little frustrated – primarily because the quality of client libraries has been a bit lacking. This is the point you might tell me there’s librdkafka, but dealing with native extensions & language bindings can be a hassle in its own right. For better or worse, I tend to prefer rock solid libraries written in my language of choice (JVM/CLR excepted, I suppose).

Anyway, the quality issue can’t be laid solely at the feet of Kafka client library authors. These folks are doing some great work so that folks like you and me don’t have to. The real issue, I think, is that the Kafka protocol puts a lot of responsibility on the client. Though the specific messages exchanged between clients and brokers are fairly well documented, there’s no solid specification of how a client should behave during more complicated exchanges with brokers. Consumer rebalances are one such example where I think a solid specification would go far.

So this is a specification, then?

Well, no. Though I’d love a thorough specification, this document is not one, nor is it even an official piece of Kafka(tm) documentation. Heck, I’m not even a Kafka contributor. But I have spent enough time looking at Kafka source code and third party Kafka client library source code in various languages (usually during or following production incidents) to be able to navigate my way around this stuff maybe a little better than the average bear. Regardless, I hope it serves as a source of useful information for client library authors who might want/need a resource that isn’t somebody else’s source code.

I also hope that it’s useful for folks building services on top of Kafka who maybe just want to get a bit of a better understanding of what’s supposed to happen during a consumer rebalance.

If nothing else I’m glad this document exists for me, because I guarantee you I’ll just as quickly write something down here as forget it.

Assumptions

I’m assuming general familiarity with terminology that is typically well-understood by Kafka users: brokers, topics, partitions, groups, etc. I’ll cover some slightly less common terminology specific to rebalances in this document.

It’s probably worth noting that most of my knowledge comes from older versions of Kafka (0.8.x and 0.10.x), but very little should change in the overall mechanics in newer versions. My understanding is that in spite of sweeping internal changes in brokers and clients, at the protocol level there are only small changes between 0.10 and, say, 2.0. The same message types are exchanged in most situations, but new API versions may be available. I’m not planning to cover the extremely old ZK-based rebalances & offsets found in 0.8 and below, but might get around to it one day. Regardless, Kafka-based consumer groups are the future.

Group Coordinators, Leaders and Protocols

First, just a tiny little bit of terminology because the names are a little confusing but very important:

  • A group coordinator is a broker responsible for coordinating rebalances between consumers in a consumer group. A broker may be a group coordinator for many groups. Only brokers that are acting as lead replicas for __consumer_offsets partitions are capable of acting as group coordinators. OffsetCommit and OffsetFetch requests should be sent to the group coordinator to store and retrieve partition offsets, respectively.
  • A group leader is a consumer that is responsible for making partition assignment decisions on behalf of all consumers in a group. The group coordinator determines which consumer will be the group leader and sends this information to consumers in a JoinGroup response. Partition assignment decisions are communicated to other consumers in the group when SyncGroup messages are exchanged with the group coordinator.
  • A group protocol is used during rebalances by group leaders to make decisions about partition assignments, and by all consumers to encode & decode those decisions as they are distributed to the group. Group coordinators negotiate a group protocol from among options provided by consumers and distribute the encoded assignment data from the group leader to other consumers, but otherwise treat group protocol data as something of a black box (i.e. encoding & decoding is all client-side).

A group coordinator has nothing to do with the cluster controller (which you may also hear called the cluster coordinator). The former is responsible for coordinating rebalances. The latter is responsible for acting as a coordinator among all brokers.

A group leader has nothing to do with lead replicas. It is simply a consumer given responsibility for making decisions about partition assignment strategies.

A group protocol is implementation-specific and largely independent of the overall Kafka protocol.

What is the purpose of a consumer rebalance?

Hopefully if you’re reading this, you’re at least familiar with the notion of topics, partitions and consumer groups. Topics are subdivided into partitions; topics are something of an abstract thing in Kafka’s data model – a loose abstraction over some configuration and a collection of partitions. Consumers read from partitions and producers write to them.

Consumer groups are used to coordinate “related” consumers. In a typical use case you’ll have one consumer group per service, with each instance of your service acting as a “member” of the group. Group membership is dynamic, so instances of your service may join and leave the group at will (or be forced out of the group due to expired sessions).

During a rebalance, members of the group act to decide which consumer is responsible for which subset of all available partitions for some topic(s). For example, you might have a topic with 30 partitions and 10 consumers; at the end of a rebalance, you might expect each consumer to be reading from 3 partitions. If you shut down 5 of those consumers, you might expect each consumer to have 6 partitions after a rebalance has completed.

The purpose of a consumer rebalance is to facilitate this kind of dynamic “partition assignment” behavior often expected by users, and to do so in a safe and predictable way.
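To make the 30-partition arithmetic above concrete, here’s a minimal sketch of a range-style assignment in Python. The `assign_partitions` helper and the member names are made up for illustration; real group protocols make and encode these decisions in their own ways.

```python
from typing import Dict, List


def assign_partitions(members: List[str], num_partitions: int) -> Dict[str, List[int]]:
    """Split partitions 0..num_partitions-1 into contiguous chunks, one per member.

    Hypothetical helper: members are sorted so that the same inputs always
    produce the same assignment.
    """
    ordered = sorted(members)
    base, extra = divmod(num_partitions, len(ordered))
    assignment: Dict[str, List[int]] = {}
    start = 0
    for index, member in enumerate(ordered):
        # The first `extra` members absorb the remainder, one partition each.
        count = base + (1 if index < extra else 0)
        assignment[member] = list(range(start, start + count))
        start += count
    return assignment


# 30 partitions over 10 consumers, then over the 5 that remain after a shutdown.
ten = assign_partitions([f"consumer-{i}" for i in range(10)], 30)
five = assign_partitions([f"consumer-{i}" for i in range(5)], 30)
```

With ten members every consumer ends up with 3 partitions; with five, each gets 6.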

Reasons for a rebalance

A rebalance is typically initiated by the group coordinator when a consumer requests to join a group or leaves a group. Consumers might leave a group explicitly via a LeaveGroup request, or implicitly because the group coordinator has not received a Heartbeat message from one or more consumers for a period of time leading to consumer session timeouts.

There are numerous other reasons too, but this is by far the most frequent cause of rebalances: services stopping and starting (gracefully or otherwise).

NOTE Though it’s not the central purpose of this document I really want to expand on this topic, because one of the first things Kafka newbies ask when they run into some kind of incident in production and see a ton of messages in their logs about rebalances is “why would Kafka be causing us to rebalance so much?” – and because Kafka clients are really where all the “smarts” occur, the answer is usually something on the client side. The call is coming from inside the house!

Consumer rebalances from 10,000ft

Conceptually rebalances aren’t actually all that hard, but the devil’s in the details. At a very high level, a consumer rebalance in a well-behaved client library looks something like this:

  1. A rebalance is initiated by some change in the group state on the group coordinator.
  2. Consumers start to acquiesce to ensure they’re in a known state before proceeding with the rebalance.
    • Stop fetching data.
    • Stop processing messages.
  3. If necessary, consumers (re)locate the group coordinator using a FindCoordinator request.
  4. If necessary, consumers commit the offsets of the last processed message per partition using an OffsetCommit request.
  5. Consumers stop sending heartbeats (if necessary), then send a JoinGroup request to the group coordinator. Consumers resume sending heartbeats after receiving a successful JoinGroup response with the new generation ID.
  6. Consumers send a SyncGroup request with the new generation ID.
    • The leader will send partition assignments based on the group protocol.
    • Consumers then use the response to determine assigned partitions.
  7. After the rebalance is complete, consumers continue to send periodic Heartbeat messages using the new generation ID for so long as the consumer is “alive”.
  8. Consumers send an OffsetFetch request to the group coordinator to get the last committed offsets for their newly assigned partitions.
  9. Consumers send Fetch requests and resume processing messages.

Again, nothing about the process is hard, but there is some subtlety, a few moving parts and knowledge that isn’t explicitly documented outside of the Java source code and maybe a KIP or two. All before you worry about the various edge cases that arise during error handling.
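The sequence above can be sketched as plain control flow. This is only an outline under my own assumptions: `RecordingClient` is a hypothetical stand-in that records step names instead of talking to a broker, and the step names are labels I’ve chosen, not real API calls.

```python
from typing import List


class RecordingClient:
    """Hypothetical stand-in for a consumer; it only records which step ran."""

    def __init__(self, coordinator_known: bool, auto_commit: bool, is_member: bool):
        self.coordinator_known = coordinator_known
        self.auto_commit = auto_commit
        self.is_member = is_member  # False on the very first join
        self.steps: List[str] = []

    def do(self, step: str) -> None:
        self.steps.append(step)


def run_rebalance(client: RecordingClient) -> List[str]:
    client.do("stop_fetching")
    client.do("stop_processing")
    if not client.coordinator_known:
        client.do("FindCoordinator")
        client.coordinator_known = True
    if client.is_member and client.auto_commit:
        client.do("OffsetCommit")      # still uses the old generation ID
    if client.is_member:
        client.do("stop_heartbeats")
    client.do("JoinGroup")             # blocks until the group has joined
    client.is_member = True
    client.do("start_heartbeats")      # new generation ID from here on
    client.do("SyncGroup")             # the leader attaches the assignments
    client.do("OffsetFetch")
    client.do("resume_fetching")
    return client.steps


first_join = run_rebalance(RecordingClient(False, True, False))
rejoin = run_rebalance(RecordingClient(True, True, True))
```

Note how the first join skips the commit and the heartbeat pause, since there is no generation to commit against yet.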

Rebalances as Double Barriers

If it’s not obvious from the above description, it’s worth noting that rebalances act as a kind of barrier. In fact, rebalances in some ways seem to be very similar to the ZooKeeper double barrier recipe:

  1. Consumers ask to “enter” the barrier with a JoinGroup request.
  2. JoinGroup responses aren’t sent until all consumers have sent a JoinGroup request, preventing progress on the rebalance until we actually “enter” the barrier.
  3. Consumers then send SyncGroup requests, which double as a request to “leave” the barrier.
  4. SyncGroup responses aren’t sent until all consumers have sent a SyncGroup request, preventing completion of the rebalance until we actually “leave” the barrier.

The nice part of this is that we (as a consumer) can safely send an OffsetCommit at the beginning of a rebalance knowing that other consumers can’t enter the “rebalance barrier” until we send our own JoinGroup request. Further, a new generation ID won’t be “published” until all consumers enter the rebalance barrier, so we can also reuse the current generation ID for the OffsetCommit request even though we know a rebalance is in progress.

Just to be clear, this is very “unofficial”: none of this “barrier” nonsense has been spelled out in documentation etc. and perhaps there are holes in my reasoning here, but I find it a useful abstraction for reasoning about the process.
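If it helps, the barrier analogy can be simulated with nothing but Python’s `threading.Barrier`. This is a toy model of my mental picture, not of the broker: the two barriers stand in for the JoinGroup and SyncGroup exchanges, and the event names are invented.

```python
import threading
from typing import List

NUM_CONSUMERS = 4
join_barrier = threading.Barrier(NUM_CONSUMERS)   # stands in for JoinGroup
sync_barrier = threading.Barrier(NUM_CONSUMERS)   # stands in for SyncGroup
events: List[str] = []
events_lock = threading.Lock()


def record(event: str) -> None:
    with events_lock:
        events.append(event)


def consumer(name: str) -> None:
    record(f"{name}:commit")      # safe: nobody has passed the join barrier yet
    join_barrier.wait()           # "enter": returns once everyone has joined
    record(f"{name}:joined")
    sync_barrier.wait()           # "leave": returns once everyone has synced
    record(f"{name}:synced")


threads = [threading.Thread(target=consumer, args=(f"c{i}",)) for i in range(NUM_CONSUMERS)]
for t in threads:
    t.start()
for t in threads:
    t.join()
```

However the threads are scheduled, every commit lands in `events` before any consumer records that it has joined, and every join lands before any sync completes.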

A little more detail

Acquiesce

A good consumer client library should ensure that message processing has acquiesced before proceeding with a rebalance and that no new data is being fetched. Though it’s very important to stop message processing, it isn’t strictly necessary to stop fetching data – but you may be fetching data only to throw it away again after the join is complete if a partition gets assigned to another consumer. One might wonder why it’s so important to stop message processing, so here’s a hypothetical:

Say your API doesn’t do a great job of ensuring your user’s message processing code has completed processing the most recent batch of fetched messages for some partition. Your client library commits whatever offset happens to be the most recently processed (call this offset A) and proceeds with the rebalance while messages continue to be processed from A+1 onward.

Now, say the partition your user’s code is still processing is assigned to another consumer during the rebalance. Suddenly you’ve got two consumers attempting to process the same messages in parallel. At best this is inefficient and/or wasted effort; at worst this can cause serious problems.

To be clear, duplicate messages are always a risk due to consumer or broker failures. Still, it should be fairly unusual for the same message to be delivered to a well-behaved consumer twice when consumers & brokers are healthy and operating normally.

When it comes time to acquiesce, you can be sloppy with stopping fetches (if you’re careful), but please, please, please ensure that you stop processing messages. It will go a long way to saving your users from themselves.
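One way to get this right is to make “stop processing” a blocking call that only returns once user code is idle. Here’s a minimal sketch using a thread per partition; `PartitionWorker` and its methods are hypothetical names, and a real library would also need error handling and timeouts.

```python
import threading
from typing import Callable, List, Optional


class PartitionWorker:
    """Hypothetical per-partition worker that can be stopped between messages."""

    def __init__(self, offsets: List[int], handler: Callable[[int], None]):
        self._offsets = offsets
        self._handler = handler
        self._stop = threading.Event()
        self.last_processed: Optional[int] = None
        self._thread = threading.Thread(target=self._run)

    def start(self) -> None:
        self._thread.start()

    def _run(self) -> None:
        for offset in self._offsets:
            if self._stop.is_set():
                return              # drop the rest of the batch unprocessed
            self._handler(offset)   # user code finishes the message it started
            self.last_processed = offset

    def quiesce(self) -> Optional[int]:
        """Block until user code is idle, then report the last processed offset."""
        self._stop.set()
        self._thread.join()
        return self.last_processed


seen: List[int] = []
worker = PartitionWorker(list(range(100, 200)), seen.append)
worker.start()
last = worker.quiesce()   # nothing is processed after this returns
```

Only once `quiesce()` has returned for every partition is it safe to commit and move on to the join.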

Find the group coordinator

The group coordinator will be the primary point of contact for group and offset management operations performed by a client, so first we need to know where to find it.

To achieve this the client should send a FindCoordinator request to any known broker. The 0.10.2.1 Java client libs will choose the broker with the fewest in-flight requests, but this is an implementation detail. It’s up to you as the library author to determine which broker should receive the request. Picking one at random is fine.

A successful response message will include the group coordinator’s broker ID, host and port which can be used in hopefully-obvious ways to send group and offset management requests to the group coordinator for the indicated group ID.

Don’t forget to handle errors. Of particular note is NOT_COORDINATOR, which you may see when the broker responsible for acting as a group coordinator changes from one broker to another. This can occur when you move __consumer_offsets partitions from one broker to another via replica reassignments, or due to partition leadership changes (e.g. broker outages).
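Here’s a rough sketch of one way to handle a coordinator move: cache the coordinator address, and throw the cache away whenever a response says you’re talking to the wrong broker. `CoordinatorCache` and `send_to_coordinator` are hypothetical names; the numeric error codes are the ones I know from the protocol error table. A real client would also back off between attempts.

```python
from typing import Callable, Optional, Tuple

# Error codes from the Kafka protocol error table.
NONE = 0
COORDINATOR_NOT_AVAILABLE = 15
NOT_COORDINATOR = 16

Address = Tuple[str, int]


class CoordinatorCache:
    """Hypothetical cache of the coordinator address for a single group."""

    def __init__(self, find_coordinator: Callable[[], Address]):
        self._find = find_coordinator      # wraps a FindCoordinator request
        self._address: Optional[Address] = None

    def address(self) -> Address:
        if self._address is None:
            self._address = self._find()
        return self._address

    def invalidate(self) -> None:
        self._address = None


def send_to_coordinator(cache: CoordinatorCache,
                        send: Callable[[Address], int],
                        max_attempts: int = 3) -> int:
    """Send a group request, re-discovering the coordinator if it has moved."""
    for _ in range(max_attempts):
        error_code = send(cache.address())
        if error_code in (NOT_COORDINATOR, COORDINATOR_NOT_AVAILABLE):
            cache.invalidate()             # forces a fresh FindCoordinator
            continue
        return error_code
    raise RuntimeError("group coordinator still unavailable after retries")


# Simulate the coordinator moving from broker-1 to broker-2.
lookups = iter([("broker-1", 9092), ("broker-2", 9092)])
cache = CoordinatorCache(lambda: next(lookups))
result = send_to_coordinator(
    cache, lambda addr: NOT_COORDINATOR if addr[0] == "broker-1" else NONE
)
```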

ASIDE If anybody at Shopify is reading, I’d love a hand getting this fixed in Sarama. Super annoying bug.

How do brokers determine group coordinators?

The details of how a broker locates a group coordinator are largely unimportant when writing a client library, and there’s nothing to say that these semantics won’t change in the future. However, it’s sometimes helpful to know exactly what’s going on under the hood when debugging certain types of failure. So let’s talk briefly about how it works at the moment.

You’re probably familiar with the __consumer_offsets topic. Its partitions are like those of any other Kafka topic in that each has a number of replicas, one of which acts as the partition leader.

When a broker services a FindCoordinator request, it simply chooses the __consumer_offsets partition based on the hash of the group ID, modulo the number of __consumer_offsets partitions. The broker acting as the lead replica of this partition will also act as the group coordinator for the group ID in the FindCoordinator request.

Again, client libraries should not need to know about this and it may or may not change at any point in the future. Don’t do anything home-grown here based on this knowledge, just trust the FindCoordinator response.
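Purely as a debugging aid (and not something to build into a client), here’s a sketch of that calculation. It assumes the broker hashes the group ID with Java’s `String.hashCode()` and that the topic has 50 partitions, which I believe is the default; `coordinator_partition` is my own name for it, and the handling of negative hashes is my reading of the broker code rather than anything documented.

```python
def java_string_hash(value: str) -> int:
    """Reproduce Java's String.hashCode(), including 32-bit signed overflow."""
    data = value.encode("utf-16-be")        # Java hashes UTF-16 code units
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF
    return h - (1 << 32) if h >= (1 << 31) else h


def coordinator_partition(group_id: str, num_offsets_partitions: int = 50) -> int:
    """Guess which __consumer_offsets partition a group ID maps to."""
    h = java_string_hash(group_id)
    # Assumption: mirrors an abs() that maps Integer.MIN_VALUE to 0.
    positive = 0 if h == -(1 << 31) else abs(h)
    return positive % num_offsets_partitions


partition = coordinator_partition("hello")
```

Whichever broker leads that partition should be the one a FindCoordinator response points you at. If the two disagree, trust the response.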

Commit offsets

Most client libraries will only do this automatically if enable.auto.commit (or the library-specific equivalent) is set to true. If your library’s equivalent of enable.auto.commit is set to false, your library should skip this step, but it may be nice to give user code one last chance to commit (via some kind of hook) before proceeding with the rebalance.

You most likely don’t want to do this on the initial group join – and I’m not sure you could anyway given you won’t have a generation ID or a member ID until after the JoinGroup response is received.

Anyway, this step is fairly straightforward. The client sends an OffsetCommit request to the group coordinator to persist the offsets of the messages most recently processed by the client in the __consumer_offsets partition associated with the consumer group.

Whatever happens, make sure you do it before you enter the rebalance barrier.

Join or rejoin the consumer group

Now we’re ready to join (or rejoin) the group and negotiate a partitioning strategy with other members of the group. This step is always mandatory, but the parameters may change subtly based on whether a consumer is joining the group for the first time vs. an existing member rejoining.

If rejoining, stop heartbeats before doing anything else

An existing consumer will be sending Heartbeats during its steady state, and the coming JoinGroup request will invalidate the current generation ID sent with heartbeats. This will lead to heartbeats suddenly failing. So if we’re rejoining the group we’ll want to stop sending heartbeats until the JoinGroup response is received.

Send the JoinGroup request

Next, the consumer will send a JoinGroup request to the group coordinator to ask to be admitted to the group, possibly initiating a rebalance if the consumer is a new member. Most of the fields in the JoinGroup request are more or less common sense, but a few trickier ones that are also not particularly well documented are:

  • The session timeout: the maximum amount of time the group coordinator will assume a consumer is “alive” without receiving a Heartbeat message.
  • The member ID should be left empty on the first JoinGroup sent by a consumer (the Java client sends an empty string rather than a null) – the broker will provide a new member ID that the consumer can then use on subsequent requests. Because the consumer re-sends its member ID, the group coordinator is able to identify distinct consumer instances across rebalances.
  • The protocol type is always “consumer” for consumer groups. This field exists because the “group” coordination mechanism is used for several different reasons in Kafka. I won’t be covering those here.
  • A list of group protocols indicating the partition assignment “protocols” supported by the consumer and the protocol-specific metadata. As far as I can tell, outside of the JoinGroup response itself this metadata can only be retrieved via DescribeGroups. The implementation details of the protocols are entirely opaque to the group coordinator: it only acts as a means of passing bytes around when consumers need to determine which consumer should subscribe to which partitions.

If the JoinGroup request succeeds the response sent by the group coordinator will include metadata essential for normal operation of the consumer:

  • The generation ID for the group: requests pertaining to the group (SyncGroup, Heartbeat, OffsetCommit and others) must include this ID so that the group coordinator can determine whether or not the request is for the current version/“generation” of the group.
  • The member ID of the sender as assigned by the group coordinator. This member ID should be parroted back to the group coordinator on all subsequent group management requests – including new JoinGroup requests, until an UNKNOWN_MEMBER_ID error is received.
  • The member ID of the group leader. (This may be the sender’s member ID!)
  • The group protocol chosen from the list of acceptable protocols proposed by all members of the group. The group coordinator decides on a protocol the consumers in the group can all handle. If it’s unable to determine a suitable protocol due to conflicting/non-overlapping proposals from group members the group coordinator will return an INCONSISTENT_GROUP_PROTOCOL error.
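  • A group membership list, consisting of all consumers that are members with active sessions in the group (as at the returned generation ID). As far as I can tell from the protocol documentation, only the group leader receives the populated list; other members get an empty one.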

A response will not be received by consumers until the group coordinator receives a JoinGroup request from all consumers participating in the rebalance or the rebalance somehow fails.

It might also be interesting to know that the group protocols list and the group membership list are somewhat related in that the protocol-specific metadata sent on the request by each member is returned in the membership list on the response. This means the group leader, which receives the populated membership list, has access to the protocol-specific metadata of all other members in the group. (This has no real implications at the Kafka protocol level, but it may allow interesting things to happen in client-side partition assignment decisions for a given protocol if you want to get creative.)
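To tie the request and response fields together, here’s a sketch of how a client might model them. These dataclasses are simplified, hypothetical shapes rather than the wire format, and this sketch uses an empty string for the not-yet-assigned member ID.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Tuple


@dataclass
class JoinGroupRequest:
    group_id: str
    session_timeout_ms: int
    member_id: str = ""                    # empty on the very first join
    protocol_type: str = "consumer"
    # (protocol name, opaque metadata bytes), in order of preference
    group_protocols: List[Tuple[str, bytes]] = field(default_factory=list)


@dataclass
class JoinGroupResponse:
    error_code: int
    generation_id: int
    group_protocol: str
    leader_id: str
    member_id: str
    members: Dict[str, bytes] = field(default_factory=dict)  # member ID -> metadata

    def is_leader(self) -> bool:
        return self.member_id == self.leader_id


def next_join_request(previous: JoinGroupRequest,
                      response: JoinGroupResponse) -> JoinGroupRequest:
    """Build the request for a later rebalance, parroting back the member ID."""
    return JoinGroupRequest(
        group_id=previous.group_id,
        session_timeout_ms=previous.session_timeout_ms,
        member_id=response.member_id,
        protocol_type=previous.protocol_type,
        group_protocols=previous.group_protocols,
    )


first = JoinGroupRequest("my-service", 30000, group_protocols=[("range", b"")])
response = JoinGroupResponse(0, 1, "range", "member-1", "member-1", {"member-1": b""})
rejoin_request = next_join_request(first, response)
```

The member ID names (`member-1`) are invented; in practice the coordinator generates them.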

Finally, resume sending heartbeats

Once a successful JoinGroup response is received we can start (or resume) sending heartbeat requests using the new generation ID.

Sync group members and assign partitions

Next up the consumer needs to send a SyncGroup request to the group coordinator. This step is mandatory.

This is the first stage where the consumers in the group know who the group leader is. After determining itself to be the group leader, a consumer should send assignment decisions, with assignment data encoded according to the group protocol that was negotiated in the JoinGroup phase. Consumers that are not the group leader should send an empty assignment array.

When SyncGroup requests have been received from all(?) members of the group participating in the rebalance, the group coordinator will send the SyncGroup responses to all consumers. The response sent to each consumer will incorporate the member-specific assignment data as encoded by the underlying group protocol.

This is covered elsewhere, but just to reiterate: the encoded assignment data is effectively opaque to the group coordinator: the group coordinator does not participate in partition assignment decisions, except to act as a convenient transport for assignment data “sent” by the group leader and received by consumers.
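A sketch of the leader/follower split might look like the following. The comma-separated encoding is a toy I made up to keep the example short (real group protocols define their own binary formats), and `sync_group_assignments` is a hypothetical helper.

```python
from typing import Dict, List


def encode_assignment(partitions: List[int]) -> bytes:
    """Toy encoding: comma-separated partition numbers. Real protocols differ."""
    return ",".join(str(p) for p in partitions).encode("ascii")


def decode_assignment(data: bytes) -> List[int]:
    return [int(p) for p in data.decode("ascii").split(",")] if data else []


def sync_group_assignments(member_id: str, leader_id: str,
                           members: List[str], num_partitions: int) -> Dict[str, bytes]:
    """Return the assignment map a member would attach to its SyncGroup request."""
    if member_id != leader_id:
        return {}                               # followers send nothing
    ordered = sorted(members)
    # Round-robin: partition p goes to member p mod N.
    plan: Dict[str, List[int]] = {m: [] for m in ordered}
    for partition in range(num_partitions):
        plan[ordered[partition % len(ordered)]].append(partition)
    return {m: encode_assignment(ps) for m, ps in plan.items()}


from_leader = sync_group_assignments("a", "a", ["b", "a"], 5)
from_follower = sync_group_assignments("b", "a", ["b", "a"], 5)
```

Each consumer then decodes only the bytes that come back in its own SyncGroup response.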

Fetch last committed offsets

Finally, in order to know where to start fetching data for newly assigned partitions, a consumer needs to retrieve the last committed offsets via an OffsetFetch request. This allows a consumer to fetch the offsets committed by the former owners of the assigned partitions – hopefully committed right before the most recent JoinGroup requests.

Keep an eye out for strange off-by-one errors here: make sure you don’t lose or reprocess messages because you committed the wrong offset before the rebalance. If the previous consumer cleanly committed their offsets, fetching those same offsets should allow you to pick up exactly where the previous consumer left off. If this is not happening, it is likely a bug in the client library.
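The usual source of off-by-one trouble is the commit convention. As I understand it, the committed offset is meant to be the offset of the next message to read, not the last one processed. A tiny sketch, with hypothetical helper names:

```python
from typing import Dict, Optional


def offsets_to_commit(last_processed: Dict[int, Optional[int]]) -> Dict[int, int]:
    """Map partition -> offset to commit, given the last offset fully processed.

    Convention assumed here: the committed offset is the offset of the NEXT
    message to read, i.e. last processed + 1. Partitions with nothing
    processed yet are skipped so their existing committed offset is preserved.
    """
    return {
        partition: offset + 1
        for partition, offset in last_processed.items()
        if offset is not None
    }


def first_fetch_offset(committed: int) -> int:
    """The new owner starts fetching at exactly the committed offset."""
    return committed


to_commit = offsets_to_commit({0: 41, 1: None, 2: 0})
```

If the old owner finished offset 41 and committed 42, the new owner fetches from 42: nothing is lost and nothing is processed twice.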

After the rebalance

I’d like to flesh these out more one day, but compared to the rest of the rebalance exchange these are a walk in the park & mostly self-contained. For now I’ll keep it short.

Continue sending periodic heartbeats

Consumer sessions will expire if heartbeats aren’t sent regularly, which will force a consumer out of the consumer group. So make sure you resume sending heartbeats after a rebalance.
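Heartbeat responses are also, as far as I understand it, how a consumer usually finds out that a rebalance has started. Here’s a sketch of how a client might map heartbeat error codes to its next move. The numeric codes come from the protocol error table; the `Action` names and the mapping itself are my own reading of sensible behavior, not a spec.

```python
from enum import Enum

# Error codes from the Kafka protocol error table.
NONE = 0
COORDINATOR_NOT_AVAILABLE = 15
NOT_COORDINATOR = 16
ILLEGAL_GENERATION = 22
UNKNOWN_MEMBER_ID = 25
REBALANCE_IN_PROGRESS = 27


class Action(Enum):
    CONTINUE = "continue"
    REJOIN = "rejoin"
    REJOIN_AS_NEW_MEMBER = "rejoin_as_new_member"
    FIND_COORDINATOR = "find_coordinator"
    FAIL = "fail"


def on_heartbeat_response(error_code: int) -> Action:
    """Decide what a consumer does next based on a Heartbeat response."""
    if error_code == NONE:
        return Action.CONTINUE
    if error_code in (REBALANCE_IN_PROGRESS, ILLEGAL_GENERATION):
        return Action.REJOIN                   # keep the member ID, rejoin
    if error_code == UNKNOWN_MEMBER_ID:
        return Action.REJOIN_AS_NEW_MEMBER     # forget the member ID first
    if error_code in (NOT_COORDINATOR, COORDINATOR_NOT_AVAILABLE):
        return Action.FIND_COORDINATOR
    return Action.FAIL                         # surface anything unexpected
```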

Send fetch requests to read from partition leaders

You can’t process data you don’t have! Send fetch requests to partition leaders to pull down data.

Periodically commit offsets if necessary

If your library’s equivalent of enable.auto.commit is true, you’ll want to periodically commit offsets for your users.

Leave the group when the consumer is closing

Send a LeaveGroup request to the group coordinator when the consumer is closing to remove the departing consumer from the group and trigger a rebalance as soon as possible. Failing to do this may see partitions lag briefly since the group coordinator will need to wait for the consumer’s session to time out before permitting a rebalance to occur.

Watch for errors!

With all that out of the way, it’s worth noting that in the scheme of things it’s really not that hard to get the “happy path” rebalance logic working properly – though I’m sure having the steps spelled out will be a big help for some.

However, most of the bugs I’ve seen crop up in holes in error handling. Certain modes of failure are overlooked, operations that should be retried simply aren’t, untested/broken code paths get exercised due to a broker failure, etc.

As you’re writing your library, test it against a real, live Kafka server and be sure to do things like moving partition leadership around or otherwise moving partition assignments to force things to fail in interesting ways. Move __consumer_offsets partitions around to force group coordinator changes (this will also trigger rebalances). Allow the client to connect then black hole outbound packets using iptables, see how it copes; restore the connection again – does it recover as expected? Stuff like that.

Aside from thorough testing of your client library, my suggestion would be to walk through the Kafka source code for hints about what might go wrong and when. A good entry point into the code is KafkaApis, which handles all the request processing on the broker side.

Other advice

This advice isn’t specific to rebalances, but I’ve seen (and implemented!) a few patterns that while seeming to be reasonable in theory have proven to be … counter-productive in practice.

Think “streams” not “queues”

This piece of advice is as much for users of Kafka client libraries as for developers of the client libraries:

Avoid “windowing”, processing multiple messages in parallel and other complicated offset management strategies that require you to hack in the illusion of message-level acks. You’ve got better options that won’t leave you fighting Kafka’s streaming model. For example, you could:

  • Increase partition counts & deploy more instances of your service.
  • Increase partition counts & run roughly a thread per partition.
  • Revisit your overall partitioning strategy (e.g. randomized or round-robin).

It’s not always possible, but in some situations you may also be able to get away with a certain amount of pipelining. For example, in Java you can get a huge amount of leverage out of libraries like the LMAX Disruptor. Using one or more disruptor rings, you can trivially parallelize certain steps of your (in-process) message processing pipeline, allowing you to maximize throughput without losing ordering guarantees at the partition level. IMO such a strategy is much more in line with Kafka’s streaming model.

Similar strategies are possible in other languages, just be wary of violating ordering guarantees.

Use partitions as a natural leverage point for parallelism

At least one NodeJS client library I played with refused to fetch more data from brokers until you had finished processing all data received for all partitions from the last successful fetch. This essentially meant that a particularly “hot” partition could stall processing for all partitions.

If you can design your library to ensure that each partition can be processed entirely independently of the others, your users may have an easier time squeezing more throughput out of Kafka.
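One possible shape for this, sketched under my own assumptions: keep a small buffer per partition and only ask brokers for partitions whose buffer has room. `PartitionBuffers` is a hypothetical class, not taken from any real library.

```python
from collections import deque
from typing import Deque, Dict, List


class PartitionBuffers:
    """Hypothetical per-partition buffers sitting between fetching and processing."""

    def __init__(self, partitions: List[int], max_buffered: int):
        self._buffers: Dict[int, Deque[bytes]] = {p: deque() for p in partitions}
        self._max_buffered = max_buffered

    def add_fetched(self, partition: int, messages: List[bytes]) -> None:
        self._buffers[partition].extend(messages)

    def take(self, partition: int) -> bytes:
        return self._buffers[partition].popleft()

    def partitions_to_fetch(self) -> List[int]:
        """Only partitions with room get fetched; a full one never blocks the rest."""
        return [p for p, buf in self._buffers.items() if len(buf) < self._max_buffered]


buffers = PartitionBuffers([0, 1, 2], max_buffered=2)
buffers.add_fetched(0, [b"a", b"b"])           # partition 0 is "hot" and now full
while_hot = buffers.partitions_to_fetch()      # partitions 1 and 2 keep flowing
buffers.take(0)
after_drain = buffers.partitions_to_fetch()
```

The hot partition is simply left out of the next fetch until its consumer catches up, while the others carry on.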

Retriable vs. non-retriable errors

See the table of protocol errors to determine failures that are implicitly retriable. Other system-level failures might also be retriable – socket timeouts, for example. Typically users of a library expect these to be handled transparently. Either way, it might be nice to somehow surface hooks or metrics so that folks can hook them up to their monitoring tool of choice.
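As a starting point, a retry wrapper with a metrics hook might look something like this. `KafkaError`, `call_with_retries` and the `on_retry` hook are hypothetical names; the set of retriable codes is a small sample from the protocol error table rather than the full list, and the backoff numbers are arbitrary.

```python
import time
from typing import Callable, TypeVar

T = TypeVar("T")

# A sample of codes marked retriable in the protocol error table:
# REQUEST_TIMED_OUT, COORDINATOR_LOAD_IN_PROGRESS,
# COORDINATOR_NOT_AVAILABLE, NOT_COORDINATOR.
RETRIABLE_CODES = {7, 14, 15, 16}


class KafkaError(Exception):
    """Hypothetical exception carrying a protocol error code."""

    def __init__(self, code: int):
        super().__init__(f"kafka error code {code}")
        self.code = code


def is_retriable(err: Exception) -> bool:
    if isinstance(err, KafkaError):
        return err.code in RETRIABLE_CODES
    # System-level failures such as dropped connections or timeouts.
    return isinstance(err, (ConnectionError, TimeoutError))


def call_with_retries(operation: Callable[[], T],
                      on_retry: Callable[[int, Exception], None] = lambda attempt, err: None,
                      max_attempts: int = 5,
                      base_delay_s: float = 0.1,
                      sleep: Callable[[float], None] = time.sleep) -> T:
    """Run operation, retrying retriable failures with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except Exception as err:
            if attempt == max_attempts or not is_retriable(err):
                raise
            on_retry(attempt, err)              # hook for logs or metrics
            sleep(base_delay_s * 2 ** (attempt - 1))
    raise AssertionError("unreachable")
```

Passing `sleep` and `on_retry` in as parameters keeps the wrapper easy to test and gives users somewhere to attach their monitoring.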