NOTE This is a work in progress. Unlike your typical blog post, this is going to be something of a “living document” while I iron out the rough edges.
DISCLAIMER I’m filling in gaps in my own knowledge as I go. Given that & the length of this document, it’s likely I’ve gotten plenty wrong. So don’t hesitate to drop me a line if I’ve gone and goofed something up.
- Why this document?
- Group Coordinators, Leaders and Protocols
- What is the purpose of a consumer rebalance?
- Reasons for a rebalance
- Consumer rebalances from 10,000ft
- A little more detail
- After the rebalance
- Watch for errors!
- Other advice
Why this document?
This document explains how Kafka consumer rebalances work in excruciating detail. It’s not intended to be totally comprehensive nor too prescriptive, but hopefully useful enough for folks working on client libraries to have a solid chance of implementing consumer rebalances properly.
Why should I believe your ravings?
I’ve both written services that use Kafka and helped run various Kafka clusters in production for a few years. Other folks I’ve worked with in the past (and presently work with in the present) know this Kafka thing much better than I do, but sadly – this being my blog and all – you’re stuck with me.
I’ve also spent a ton of time buried in Kafka client libraries across multiple languages for reasons I’m trying to forget. I’ve come away from it all a little frustrated – primarily because the quality of client libraries has been a bit lacking. This is the point you might tell me there’s librdkafka, but dealing with native extensions & language bindings can be a hassle in its own right. For better or worse, I tend to prefer rock solid libraries written in my language of choice (JVM/CLR excepted, I suppose).
Anyway, the quality issue is not solely at the feet of Kafka client library authors. These folks are doing some great work so that folks like you and me don’t have to. The real issue, I think, is that the Kafka protocol puts a lot of responsibility on the client. Though the specific messages exchanged between clients and brokers are fairly well documented, there’s no solid specification of how a client should behave during more complicated exchanges with brokers. Consumer rebalances are one such example where I think a solid specification would go far.
So this is a specification, then?
Well, no. Though I’d love a thorough specification, this document is not one, nor is it even an official piece of Kafka(tm) documentation. Heck, I’m not even a Kafka contributor. But I have spent enough time looking at Kafka source code and third party Kafka client library source code in various languages (usually during or following production incidents) to be able to navigate my way around this stuff maybe a little better than the average bear. Regardless, I hope it serves as a source of useful information for client library authors who might want/need a resource that isn’t somebody else’s source code.
I also hope that it’s useful for folks building services on top of Kafka who maybe just want to get a bit of a better understanding of what’s supposed to happen during a consumer rebalance.
If nothing else I’m glad this document exists for me, because I guarantee you I’ll just as quickly write something down here as forget it.
I’m assuming general familiarity with terminology that is typically well-understood by Kafka users: brokers, topics, partitions, groups, etc. I’ll cover some slightly less common terminology specific to rebalances in this document.
It’s probably worth noting that most of my knowledge comes from older versions of Kafka (0.8.x and 0.10.x), but very little should change in the overall mechanics in newer versions. My understanding is that in spite of sweeping internal changes in brokers and clients, at the protocol level there are only small changes between 0.10 and say 2.0. The same message types are exchanged in most situations, but new API versions may be available. I’m not planning to cover the extremely old ZK-based rebalances & offsets found in 0.8 and below, but might get around to it one day. Regardless, Kafka-based consumer groups are the future.
Group Coordinators, Leaders and Protocols
First, just a tiny little bit of terminology because the names are a little confusing but very important:
- A group coordinator is a broker responsible for coordinating rebalances between consumers in a consumer group. A broker may be a group coordinator for many groups. Only brokers that are acting as lead replicas for __consumer_offsets partitions are capable of acting as group coordinators. OffsetCommit and OffsetFetch requests should be sent to the group coordinator to store and retrieve partition offsets, respectively.
- A group leader is a consumer that is responsible for making partition assignment decisions on behalf of all consumers in a group. The group coordinator determines which consumer will be the group leader and sends this information to consumers in a JoinGroup response. Partition assignment decisions are communicated to other consumers in the group when SyncGroup messages are exchanged with the Kafka broker.
- A group protocol is used during rebalances: group leaders use it to make decisions about partition assignments, and consumers use it to encode & decode those decisions for distribution to other consumers in the group. Group coordinators negotiate a group protocol from among options provided by consumers and distribute the encoded assignment data from the group leader to other consumers, but otherwise treat group protocol data as something of a black box (i.e. encoding & decoding is all client-side).
A group coordinator has nothing to do with the cluster coordinator. The former is responsible for coordinating rebalances. The latter is responsible for acting as a coordinator among all brokers.
A group leader has nothing to do with lead replicas. It is simply a consumer given responsibility for making decisions about partition assignment strategies.
A group protocol is implementation-specific and largely independent of the overall Kafka protocol.
What is the purpose of a consumer rebalance?
Hopefully if you’re reading this, you’re at least familiar with the notion of topics, partitions and consumer groups. Topics are subdivided into partitions; topics are something of an abstract thing in Kafka’s data model – a loose abstraction over some configuration and a collection of partitions. Consumers read from partitions and producers write to them.
Consumer groups are used to coordinate “related” consumers. In a typical use case you’ll have one consumer group per service, with each instance of your service acting as a “member” of the group. Group membership is dynamic, so instances of your service may join and leave the group at will (or be forced out of the group due to expired sessions).
During a rebalance, members of the group act to decide which consumer is responsible for which subset of all available partitions for some topic(s). For example, you might have a topic with 30 partitions and 10 consumers; at the end of a rebalance, you might expect each consumer to be reading from 3 partitions. If you shut down 5 of those consumers, you might expect each remaining consumer to have 6 partitions after a rebalance has completed.
The purpose of a consumer rebalance is to facilitate this kind of dynamic “partition assignment” behavior often expected by users, and to do so in a safe and predictable way.
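To make the arithmetic above concrete, here is a minimal sketch of one possible assignment strategy. It simply deals partitions out round-robin; the function name and the consumer names are made up for illustration and this is not how any particular client library does it.

```python
from collections import defaultdict


def assign_round_robin(partitions, consumers):
    """Deal partitions out to consumers one at a time, like cards."""
    if not consumers:
        raise ValueError("cannot assign partitions to an empty group")
    ordered = sorted(consumers)  # a stable order keeps the result deterministic
    assignment = defaultdict(list)
    for i, partition in enumerate(sorted(partitions)):
        assignment[ordered[i % len(ordered)]].append(partition)
    return dict(assignment)


partitions = list(range(30))
ten = assign_round_robin(partitions, [f"consumer-{n}" for n in range(10)])
five = assign_round_robin(partitions, [f"consumer-{n}" for n in range(5)])
```

With 10 consumers every member ends up with 3 partitions; after dropping to 5 consumers, every remaining member ends up with 6.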
Reasons for a rebalance
A rebalance is typically initiated by the group coordinator when a consumer requests to join a group or leaves a group. Consumers might leave a group explicitly via a LeaveGroup request, or implicitly because the group coordinator has not received a Heartbeat message from one or more consumers for a period of time leading to consumer session timeouts.
There are numerous other reasons too, but this is by far the most frequent cause of rebalances: services stopping and starting (gracefully or otherwise).
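As a rough illustration of the implicit case, the sketch below models how a coordinator might notice that a member has gone quiet. The class and method names are hypothetical and the real broker logic is more involved; the point is only that a missed heartbeat window removes the member, which in turn changes group state.

```python
class SessionTracker:
    """Toy model of coordinator-side session expiry (not the real broker code)."""

    def __init__(self, session_timeout_s):
        self.session_timeout_s = session_timeout_s
        self.last_heartbeat = {}  # member_id -> time of last heartbeat

    def heartbeat(self, member_id, now):
        self.last_heartbeat[member_id] = now

    def expire(self, now):
        """Remove and return members whose session has timed out."""
        dead = [m for m, t in self.last_heartbeat.items()
                if now - t > self.session_timeout_s]
        for member_id in dead:
            del self.last_heartbeat[member_id]
        return sorted(dead)  # a non-empty result would trigger a rebalance
```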
NOTE Though it’s not the central purpose of this document I really want to expand on this topic, because one of the first things Kafka newbies ask when they run into some kind of incident in production and see a ton of messages in their logs about rebalances is “why would Kafka be causing us to rebalance so much?” – and because Kafka clients are really where all the “smarts” occur, the answer is usually something on the client side. The call is coming from inside the house!
Consumer rebalances from 10,000ft
Conceptually rebalances aren’t actually all that hard, but the devil’s in the details. At a very high level, a consumer rebalance in a well-behaved client library looks something like this:
- A rebalance is initiated by some change in the group state on the group coordinator.
- Consumers start to quiesce to ensure they’re in a known state before proceeding with the rebalance.
  - Stop fetching data.
  - Stop processing messages.
- If necessary, consumers (re)locate the group coordinator using a FindCoordinator request.
- If necessary, consumers commit the offsets of the last processed message per partition using an OffsetCommit request.
- Consumers stop sending heartbeats (if necessary), then send a JoinGroup request to the group coordinator. Consumers resume sending heartbeats after receiving a successful JoinGroup response with the new generation ID.
- Consumers send a SyncGroup request with the new generation ID.
  - The leader will send partition assignments based on the group protocol.
  - Consumers then use the response to determine assigned partitions.
- After the rebalance is complete, consumers continue to send periodic Heartbeat messages using the new generation ID for so long as the consumer is “alive”.
- Consumers send an OffsetFetch request to the group coordinator to get the last committed offsets for their newly assigned partitions.
- Consumers send Fetch requests and resume processing messages.
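The ordering of the steps above is the part that tends to go wrong, so here it is as a short sketch. Every method name on the client is hypothetical (it is not the API of any real library), and RecordingClient is just a stand-in that records calls instead of doing network I/O.

```python
class RecordingClient:
    """Stand-in for a real client: records each step instead of doing I/O."""

    def __init__(self):
        self.steps = []

    def __getattr__(self, name):
        # Only reached for undefined attributes: record the call, return a canned value.
        def step(*args):
            self.steps.append(name)
            return {"generation_id": 1}
        return step


def rebalance(client):
    client.stop_fetching()
    client.stop_processing()    # wait for in-flight handlers to finish
    client.find_coordinator()   # only needed if the coordinator is unknown or stale
    client.commit_offsets()     # only if auto-commit (or a user hook) asks for it
    client.stop_heartbeats()    # the current generation is about to be invalidated
    join = client.join_group()
    client.start_heartbeats(join["generation_id"])
    assignment = client.sync_group(join["generation_id"])
    offsets = client.fetch_committed_offsets(assignment)
    client.resume_fetching(offsets)
    return assignment
```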
Again, nothing about the process is hard, but there is some subtlety, a few moving parts and knowledge that isn’t explicitly documented outside of the Java source code and maybe a KIP or two. All before you worry about the various edge cases that arise during error handling.
Rebalances as Double Barriers
If it’s not obvious from the above description, it’s worth noting that rebalances act as a kind of barrier. In fact, rebalances in some ways seem to be very similar to the ZooKeeper double barrier recipe:
- Consumers ask to “enter” the barrier with a JoinGroup request. JoinGroup responses aren’t sent until all consumers have sent a JoinGroup request, preventing progress on the rebalance until we actually “enter” the barrier.
- Consumers then send SyncGroup requests, which double as a request to “leave” the barrier. SyncGroup responses aren’t sent until all consumers have sent a SyncGroup request, preventing completion of the rebalance until we actually “leave” the barrier.
The nice part of this is that we (a consumer) can safely commit offsets at the beginning of a rebalance knowing that other consumers can’t enter the “rebalance barrier” until we send our own JoinGroup request. Further, a new generation ID won’t be “published” until all consumers enter the rebalance barrier, so we can also reuse the current generation ID for the OffsetCommit request even though we know a rebalance is in progress.
Just to be clear, this is very “unofficial”: none of this “barrier” nonsense has been spelled out in documentation etc. and perhaps there are holes in my reasoning here, but I find it a useful abstraction for reasoning about the process.
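For what it’s worth, the “enter” half of that barrier can be modelled in a few lines. This is a toy, single-process sketch of the idea only: the class is made up, and picking the first member to join as leader is an assumption made for illustration.

```python
class JoinBarrier:
    """Toy coordinator: withholds JoinGroup responses until every expected member joins."""

    def __init__(self, expected_members, generation=0):
        self.expected = set(expected_members)
        self.generation = generation
        self.joined = []

    def join(self, member_id):
        """Record a JoinGroup request; return responses only once the barrier is full."""
        if member_id not in self.joined:
            self.joined.append(member_id)
        if set(self.joined) != self.expected:
            return None  # response withheld: still waiting on other members
        self.generation += 1  # the new generation is only "published" now
        leader = self.joined[0]  # assumption: first member to join leads
        responses = {m: {"generation_id": self.generation, "leader_id": leader}
                     for m in self.joined}
        self.joined = []
        return responses
```

Note that the generation ID stays at its old value until the last member arrives, which is what makes it safe to keep using the current generation ID right up until you send your own JoinGroup request.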
A little more detail
A good consumer client library should ensure that message processing has quiesced before proceeding with a rebalance and that no new data is being fetched. Though it’s very important to stop message processing, it isn’t strictly necessary to stop fetching data – but you may be fetching data only to throw it away again after the join is complete if a partition gets assigned to another consumer. One might wonder why it’s so important to stop message processing, so here’s a hypothetical:
Say your API doesn’t do a great job of ensuring your user’s message processing code has completed processing the most recent batch of fetched messages for some partition. Your client library commits whatever offset happens to be the most recently processed (call this offset A) and proceeds with the rebalance while messages continue to be processed from A+1 onward.
Now, say the partition your user’s code is still processing is assigned to another consumer during the rebalance. Suddenly you’ve got two consumers attempting to process the same messages in parallel. At best this is inefficient and/or wasted effort; at worst this can cause serious problems.
To be clear, duplicate messages are always a risk due to consumer or broker failures. Still, it should be fairly unusual for the same message to be delivered to a well-behaved consumer twice when consumers & brokers are healthy and operating normally.
When it comes time to quiesce, you can be sloppy with stopping fetches (if you’re careful), but please, please, please ensure that you stop processing messages. It will go a long way to saving your users from themselves.
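One way to stop processing before a rebalance is to have each worker check a stop flag between messages and signal when it has gone idle. The sketch below assumes a hypothetical PartitionWorker that the library drives with batches of (offset, value) pairs; none of these names come from a real client.

```python
import threading


class PartitionWorker:
    """Processes batches for one partition and can be asked to stop between messages."""

    def __init__(self, handler):
        self.handler = handler
        self._stop = threading.Event()
        self._idle = threading.Event()
        self._idle.set()
        self.next_offset = None  # offset of the next message to process

    def process_batch(self, messages):
        """messages: list of (offset, value). Stops early if a stop was requested."""
        self._idle.clear()
        try:
            for offset, value in messages:
                if self._stop.is_set():
                    break  # leave the rest of the batch unprocessed
                self.handler(value)
                self.next_offset = offset + 1
        finally:
            self._idle.set()

    def quiesce(self, timeout=None):
        """Request a stop and wait until no handler is running. Returns False on timeout."""
        self._stop.set()
        return self._idle.wait(timeout)

    def resume(self):
        self._stop.clear()
```

Only once quiesce() has returned True is it safe to commit next_offset and join the rebalance; the unprocessed tail of the batch is simply dropped and refetched by whoever owns the partition next.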
Find the group coordinator
The group coordinator will be the primary point of contact for group and offset management operations performed by a client, so first we need to know where to find it.
To achieve this the client should send a FindCoordinator request to any known broker. The 0.10.2.1 Java client libs will choose the broker with the least number of in-flight requests but this is an implementation detail. It’s up to you as the library author to determine which broker should receive the request. Picking one at random is fine.
A successful response message will include the group coordinator’s broker ID, host and port which can be used in hopefully-obvious ways to send group and offset management requests to the group coordinator for the indicated group ID.
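A minimal sketch of that lookup, assuming a hypothetical send_find_coordinator(broker, group_id) callable that performs the actual request and returns (broker ID, host, port). The cache is there so that ordinary requests don’t pay for a lookup each time; invalidate() is what you would call when an error suggests the coordinator has moved.

```python
import random


class CoordinatorLocator:
    """Caches the group coordinator per group ID (all names here are illustrative)."""

    def __init__(self, brokers, send_find_coordinator, rng=random):
        self.brokers = list(brokers)
        self.send = send_find_coordinator  # hypothetical transport callable
        self.rng = rng
        self.cached = {}

    def locate(self, group_id):
        if group_id not in self.cached:
            broker = self.rng.choice(self.brokers)  # any known broker will do
            self.cached[group_id] = self.send(broker, group_id)
        return self.cached[group_id]

    def invalidate(self, group_id):
        """Forget the coordinator so the next locate() asks a broker again."""
        self.cached.pop(group_id, None)
```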
Don’t forget to handle errors. Of particular note is the error telling you that the broker you contacted is not the coordinator for the group, which you may see when the broker responsible for acting as a group coordinator changes from one broker to another. This can occur when you move __consumer_offsets partitions from one broker to another via replica reassignments, or due to partition leadership changes.
ASIDE If anybody at Shopify is reading, I’d love a hand getting this fixed in Sarama. Super annoying bug.
How do brokers determine group coordinators?
The details of how a broker locates a group coordinator are largely unimportant when writing a client library, and there’s nothing to say that these semantics won’t change in the future. However, it’s sometimes helpful to know exactly what’s going on under the hood when debugging certain types of failure. So let’s talk briefly about how it works at the moment.
You’re probably familiar with the __consumer_offsets topic. Its partitions are like those of any other Kafka topic in that you have a number of replicas, one of which acts as the partition leader.
When a broker services a FindCoordinator request, it simply chooses the __consumer_offsets partition based on the hash of the group ID, modulo the number of __consumer_offsets partitions. The broker acting as the lead replica of this partition will also act as the group coordinator for the group ID in the request.
Again, client libraries should not need to know about this and it may or may not change at any point in the future. Don’t do anything home-grown here based on this knowledge, just trust the FindCoordinator response.
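Purely as a debugging aid, the hash-and-modulo step can be reproduced offline. The sketch below rests on my reading of the broker source, so treat all of it as an assumption: that the hash is Java’s String.hashCode() of the group ID, that it is made non-negative (with the minimum int mapped to 0), and that the partition count is the broker’s offsets.topic.num.partitions setting (50 by default).

```python
def java_string_hash(s):
    """Replicates Java's String.hashCode(): 31-based, over UTF-16 code units, 32-bit signed."""
    h = 0
    data = s.encode("utf-16-be")
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF
    return h - 0x100000000 if h >= 0x80000000 else h


def coordinator_partition(group_id, num_offsets_partitions=50):
    """Guess which __consumer_offsets partition a group maps to (see assumptions above)."""
    h = java_string_hash(group_id)
    non_negative = 0 if h == -2**31 else abs(h)
    return non_negative % num_offsets_partitions
```

Knowing the partition lets you check which broker currently leads it when a coordinator seems to have gone missing. It is for poking around during an incident, not something a client library should rely on.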
Commit offsets
Most client libraries will only do this automatically if auto.commit.enable (or the library-specific equivalent) is set to true. If your library’s auto.commit.enable is set to false, your library should skip this step but it may be nice to provide user code one last chance to commit (via some kind of hook) before proceeding with the rebalance.
You most likely don’t want to do this on the initial group join – and I’m not
sure you could anyway given you won’t have a generation ID or a member ID
until after the
JoinGroup response is received.
Anyway, this step is fairly straightforward. The client sends an OffsetCommit request to the group coordinator to persist the offsets of the messages most recently processed by the client in the __consumer_offsets partition associated with the consumer group.
Whatever happens, make sure you do it before you enter the rebalance barrier.
Join or rejoin the consumer group
Now we’re ready to join (or rejoin) the group and negotiate a partitioning strategy with other members of the group. This step is always mandatory, but the parameters may change subtly based on whether a consumer is joining the group for the first time vs. an existing member rejoining.
If rejoining, stop heartbeats before doing anything else
An existing consumer will be sending Heartbeat requests during its steady state, and the coming JoinGroup request will invalidate the current generation ID sent with heartbeats. This will lead to heartbeats suddenly failing. So if we’re rejoining the group we’ll want to stop sending heartbeats until the JoinGroup response is received.
Send the JoinGroup request
Next, the consumer will send a JoinGroup request to the group coordinator to ask to be admitted to the group, possibly initiating a rebalance if the consumer is a new member. Most of the fields in the JoinGroup request are more or less common sense, but a few trickier ones that are also not particularly well documented are:
- The session timeout: the maximum amount of time the group coordinator will assume a consumer is “alive” without receiving a Heartbeat request.
- The member ID should be left empty (an empty string on the wire) on the first JoinGroup sent by a consumer – the broker will provide a new member ID that the consumer can then use on subsequent requests. Because the consumer re-sends the member ID, the group coordinator is able to identify distinct consumer instances across rebalances.
- The protocol type is always “consumer” for consumer groups. This field exists because the “group” coordination mechanism is used for several different reasons in Kafka. I won’t be covering those here.
- A list of group protocols indicating the partition assignment “protocols” supported by the consumer and the protocol-specific metadata. As far as I can tell, outside of the JoinGroup response this metadata can only be retrieved via DescribeGroups. The implementation details of the protocols are entirely opaque to the group coordinator: it only acts as a means of passing bytes around when consumers need to determine which consumer should subscribe to which partitions.
If the JoinGroup request succeeds, the response sent by the group coordinator will include metadata essential for normal operation of the consumer:
- The generation ID for the group: requests pertaining to the group (SyncGroup, Heartbeat, OffsetCommit and others) must include this ID so that the group coordinator can determine whether or not the request is for the current version/“generation” of the group.
- The member ID of the sender as assigned by the group coordinator. This member ID should be parroted back to the group coordinator on all subsequent group management requests – including new JoinGroup requests – until an UNKNOWN_MEMBER_ID error is received.
- The member ID of the group leader. (This may be the sender’s member ID!)
- The group protocol chosen from the list of acceptable protocols proposed by all members of the group. The group coordinator decides on a protocol the consumers in the group can all handle. If it’s unable to determine a suitable protocol due to conflicting/non-overlapping proposals from group members the group coordinator will return an error.
- A group membership list, consisting of all consumers that are members with active sessions in the group (as at the returned generation ID). As far as I can tell, the coordinator only fills in this list on the response sent to the group leader; other members receive an empty list.
A response will not be received by consumers until the group coordinator has received a JoinGroup request from all consumers participating in the rebalance or the rebalance somehow fails.
It might also be interesting to know that the group protocols list and the group membership list are somewhat related in that the protocol-specific metadata sent on the request by each member is returned in the membership list on the response. This means the group leader has access to the protocol-specific metadata of every member in the group. (This has no real implications at the Kafka protocol level, but it may allow interesting things to happen in client-side partition assignment decisions for a given protocol if you want to get creative.)
Finally, resume sending heartbeats
Once a successful
JoinGroup response is received we can start (or resume)
sending heartbeat requests using the new generation ID.
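The member ID, generation ID and heartbeat bookkeeping described in this section fits in a small state holder. This is a sketch under a few assumptions: the class and method names are made up, the “no member ID yet” value is modelled as an empty string, and -1 is just a placeholder for “no generation yet”.

```python
class GroupMembership:
    """Tracks the identity a consumer must echo back to the group coordinator."""

    UNKNOWN_MEMBER_ID = ""  # assumption: what a brand new member sends

    def __init__(self):
        self.member_id = self.UNKNOWN_MEMBER_ID
        self.generation_id = -1  # placeholder: no generation yet
        self.heartbeats_enabled = False

    def begin_join(self):
        """Call before sending JoinGroup: pause heartbeats, return the member ID to send."""
        self.heartbeats_enabled = False
        return self.member_id

    def on_join_success(self, member_id, generation_id):
        """Store what the coordinator handed back and resume heartbeats."""
        self.member_id = member_id
        self.generation_id = generation_id
        self.heartbeats_enabled = True

    def on_unknown_member_id(self):
        """The coordinator no longer knows us: forget our identity and rejoin from scratch."""
        self.member_id = self.UNKNOWN_MEMBER_ID
        self.generation_id = -1
        self.heartbeats_enabled = False
```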
Sync group members and assign partitions
Next up the consumer needs to send a SyncGroup request to the group coordinator. This step is mandatory.
This is the first stage where the consumers in the group know who the group
leader is. After determining itself to be the group leader, a consumer should
send assignment decisions, with assignment data encoded according to the
group protocol that was negotiated in the
JoinGroup phase. Consumers that are not the group leader should send an
empty assignment array.
Once SyncGroup requests have been received from all(?) members of the group participating in the rebalance, the group coordinator will send the SyncGroup responses to all consumers. The response sent to each consumer will incorporate the member-specific assignment data as encoded by the underlying group protocol.
This is covered elsewhere, but just to reiterate: the encoded assignment data is effectively opaque to the group coordinator: the group coordinator does not participate in partition assignment decisions, except to act as a convenient transport for assignment data “sent” by the group leader and received by consumers.
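Here is a rough sketch of what each side of the SyncGroup exchange might build. JSON stands in for the group protocol’s real binary encoding, the round-robin plan is just one possible strategy, and the function names are made up for illustration.

```python
import json


def build_sync_assignments(my_id, leader_id, member_ids, partitions):
    """Return {member_id: encoded bytes} to send in a SyncGroup request."""
    if my_id != leader_id:
        return {}  # followers send an empty assignment list
    ordered = sorted(member_ids)
    plan = {m: [] for m in ordered}
    for i, partition in enumerate(sorted(partitions)):
        plan[ordered[i % len(ordered)]].append(partition)
    # Assumption: JSON is a stand-in for the negotiated protocol's encoding.
    return {m: json.dumps(ps).encode("utf-8") for m, ps in plan.items()}


def decode_assignment(blob):
    """Decode the member-specific bytes handed back in the SyncGroup response."""
    return json.loads(blob.decode("utf-8")) if blob else []
```

The coordinator never looks inside those bytes; it only routes each member’s blob from the leader’s request into that member’s response.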
Fetch last committed offsets
Finally, in order to know where to start fetching data for newly assigned partitions, a consumer needs to retrieve the last committed offsets via an OffsetFetch request. This allows a consumer to fetch the offsets committed by the former owners of the assigned partitions – hopefully committed right before the most recent rebalance.
Keep an eye out for strange off-by-one errors here: make sure you don’t lose or reprocess messages because you committed the wrong offset before the rebalance. If the previous consumer cleanly committed their offsets, fetching those same offsets should allow you to pick up exactly where the previous consumer left off. If this is not happening, it is likely a bug in the client library.
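The off-by-one usually comes down to whether a committed offset means “the last message I processed” or “the next message to read”. The sketch below assumes the second convention, which is the one the Java consumer documents; the helper names are hypothetical, and whichever convention your library picks, the committing side and the fetching side need to agree.

```python
def offsets_to_commit(last_processed):
    """Map {partition: last processed offset} to {partition: offset to commit}."""
    return {partition: offset + 1 for partition, offset in last_processed.items()}


def first_fetch_offset(committed, partition, fallback):
    """Start exactly at the committed offset, or at a fallback if nothing was committed."""
    return committed.get(partition, fallback)


# The old owner processed up to offset 41 on partition 0 before the rebalance...
committed = offsets_to_commit({0: 41})
# ...so the new owner should start fetching at 42: nothing lost, nothing reprocessed.
start = first_fetch_offset(committed, 0, fallback=0)
```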
After the rebalance
I’d like to flesh these out more one day, but compared to the rest of the rebalance exchange these are a walk in the park & mostly self-contained. For now I’ll keep it short.
Continue sending periodic heartbeats
Consumer sessions will expire if heartbeats aren’t sent regularly, which will force a consumer out of the consumer group. So make sure you resume sending heartbeats after a rebalance.
Send fetch requests to read from partition leaders
You can’t process data you don’t have! Send fetch requests to partition leaders to pull down data.
Periodically commit offsets if necessary
If your library’s equivalent of
auto.commit.enable is true, you’ll want
to periodically commit offsets for your users.
Leave the group when the consumer is closing
Send a LeaveGroup request to the group coordinator when the consumer is closing to remove the departing consumer from the group and trigger a rebalance as soon as possible. Failing to do this may see partitions lag briefly since the group coordinator will need to wait for the consumer’s session to time out before permitting a rebalance to occur.
Watch for errors!
With all that out of the way, it’s worth noting that in the scheme of things it’s really not that hard to get the “happy path” rebalance logic working properly – though I’m sure having the steps spelled out will be a big help for some.
However, where most bugs I’ve seen crop up is in holes in error handling. Certain modes of failure are overlooked, operations that should be retried simply aren’t, untested/broken code paths get exercised due to a broker failure, etc.
As you’re writing your library, test it against a real, live Kafka server and be sure to do things like moving partition leadership around or otherwise moving partition assignments to force things to fail in interesting ways. Move __consumer_offsets partitions around to force group coordinator changes (this will also trigger rebalances). Allow the client to connect then black hole outbound packets using iptables, see how it copes; restore the connection again – does it recover as expected? Stuff like that.
Aside from thorough testing of your client library, my suggestion would be to walk through the Kafka source code for hints about what might go wrong and when. A good entry point into the code is KafkaApis, which handles all the request processing on the broker side.
Other advice
This advice isn’t specific to rebalances, but I’ve seen (and implemented!) a few patterns that while seeming to be reasonable in theory have proven to be … counter-productive in practice.
Think “streams” not “queues”
This piece of advice is as much for users of Kafka client libraries as for developers of the client libraries:
Avoid “windowing”, processing multiple messages in parallel and other complicated offset management strategies that require you to hack in the illusion of message-level acks. You’ve got better options that won’t leave you fighting Kafka’s streaming model. For example, you could:
- Increase partition counts & deploy more instances of your service.
- Increase partition counts & run roughly a thread per partition.
- Revisit your overall partitioning strategy (e.g. randomized or round-robin).
It’s not always possible, but in some situations you may also be able to get away with a certain amount of pipelining. For example, in Java you can get a huge amount of leverage out of libraries like the LMAX Disruptor. Using one or more disruptor rings, you can trivially parallelize certain steps of your (in-process) message processing pipeline, allowing you to maximize throughput without losing ordering guarantees at the partition level. IMO such a strategy is much more in line with Kafka’s streaming model.
Similar strategies are possible in other languages, just be wary of violating ordering guarantees.
Use partitions as a natural leverage point for parallelism
At least one NodeJS client library I played with refused to fetch more data from brokers until you had finished processing all data received for all partitions from the last successful fetch. This essentially meant that a particularly “hot” partition could stall processing for every other partition.
If you can design your library to ensure that each partition can be processed entirely independently of the others, your users may have an easier time squeezing more throughput out of Kafka.
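As a minimal sketch of that idea, the function below gives each partition its own queue and worker thread, so a slow partition only backs up its own queue. The function name and the (partition, offset, value) message shape are assumptions made for the example.

```python
import queue
import threading


def run_partition_workers(messages, handler, partitions):
    """Route (partition, offset, value) messages to one worker thread per partition."""
    queues = {p: queue.Queue() for p in partitions}

    def worker(q):
        while True:
            item = q.get()
            if item is None:  # sentinel: no more messages for this partition
                return
            handler(*item)

    threads = [threading.Thread(target=worker, args=(q,)) for q in queues.values()]
    for t in threads:
        t.start()
    for partition, offset, value in messages:
        queues[partition].put((partition, offset, value))
    for q in queues.values():
        q.put(None)
    for t in threads:
        t.join()
```

Because each partition is only ever handled by a single thread, ordering within a partition is preserved while partitions make progress independently.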
Retriable vs. non-retriable errors
See the table of protocol errors to determine failures that are implicitly retriable. Other system-level failures might also be retriable – socket timeouts, for example. Typically users of a library expect these to be handled transparently. Either way, it might be nice to somehow surface hooks or metrics so that folks can hook them up to their monitoring tool of choice.
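A sketch of what transparent handling might look like: retry with exponential backoff when an error is marked retriable, fail fast otherwise, and expose a hook for metrics. The error names are a hand-picked subset of the protocol errors table (check it for your broker version); the exception class, function name and parameters are all hypothetical.

```python
import time

# A hand-picked subset of errors the protocol table marks as retriable.
RETRIABLE = {
    "COORDINATOR_LOAD_IN_PROGRESS",
    "COORDINATOR_NOT_AVAILABLE",
    "NOT_COORDINATOR",
    "LEADER_NOT_AVAILABLE",
    "REQUEST_TIMED_OUT",
    "NETWORK_EXCEPTION",
}


class KafkaProtocolError(Exception):
    """Hypothetical exception carrying the protocol error name."""

    def __init__(self, name):
        super().__init__(name)
        self.name = name


def with_retries(operation, max_attempts=5, base_delay_s=0.1,
                 sleep=time.sleep, on_retry=None):
    """Run operation(), retrying retriable protocol errors with exponential backoff."""
    for attempt in range(1, max_attempts + 1):
        try:
            return operation()
        except KafkaProtocolError as err:
            if err.name not in RETRIABLE or attempt == max_attempts:
                raise  # non-retriable, or out of attempts: surface it to the caller
            if on_retry is not None:
                on_retry(err, attempt)  # hook for metrics/logging
            sleep(base_delay_s * 2 ** (attempt - 1))
```

Passing sleep and on_retry in as parameters keeps the helper easy to test and gives users somewhere to attach their monitoring.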