How we extended our Raft protocol and simplified its implementation.
Redpanda uses the well-known Raft distributed consensus algorithm as a foundation for the distributed log. In principle, the Raft protocol requires that a majority of participants agree on a decision before it is accepted as the final outcome. For Redpanda, this means that an entry is successfully replicated once a majority of the nodes have persisted that entry with the same term at the same position in the log.
Redpanda started with a basic implementation of the Raft protocol. Initially, it did not even support reconfigurations! Over the course of Redpanda’s platform development, we encountered many challenges that ultimately forced us to extend the original Raft protocol to make Redpanda scalable, reliable, and fast. Some of the custom extensions in Redpanda Raft are:
- priority based voting
- learners support
- out of band heartbeat generation
- flush debouncing
- support for Apache Kafka®’s ACKS producer property
Any one of those mechanisms could be its own article, but in this one I’m going to focus on how we handle Kafka’s ACKS producer property.
Redpanda needs to deal with the different consistency guarantees specified by the Kafka protocol. Clients use the ACKS property to let users trade consistency and durability for performance. The Kafka protocol specifies three settings for the ACKS property: 0, 1, and -1 (all). These settings control when a client receives an acknowledgement that its produce request succeeded:
- ACKS=all means all in-sync replicas must store the request before the broker responds to the client.
- ACKS=1 means only the leader has to store the request before the broker responds to the client.
- ACKS=0 means the client considers the request successfully delivered as soon as the message is put onto the wire.
ACKS in Redpanda
To achieve compatibility with Kafka, Redpanda supports all of Kafka’s ACKS levels (i.e. ACKS=-1, ACKS=1, and ACKS=0). ACKS=all works the same as the original version of the Raft protocol: the replication call finishes when the requested entry has been safely replicated to a majority of the replicas and flushed (fsync’ed). With ACKS=0,1, replication finishes as soon as the write to the leader’s disk completes, before the entry is flushed.
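The completion rule above can be modeled as a small predicate. This is a minimal Python sketch, not Redpanda’s C++ code. The function replication_finished and its parameters are hypothetical. The sketch assumes the leader counts toward the majority of flushed replicas, as in standard Raft.

```python
def replication_finished(acks: int, leader_written: bool,
                         flushed_replicas: int, replicas: int) -> bool:
    """Hypothetical model of when a Redpanda replicate call completes.

    acks: -1 (all), 1, or 0, as sent by the Kafka producer.
    leader_written: the entry has been written (not necessarily flushed)
        to the leader's disk.
    flushed_replicas: number of replicas, leader included (assumption),
        that have fsync'ed the entry.
    replicas: total number of replicas in the Raft group.
    """
    majority = replicas // 2 + 1
    if acks == -1:
        # ACKS=all: plain Raft rule, a majority must have flushed the entry.
        return flushed_replicas >= majority
    if acks in (0, 1):
        # ACKS=0,1: done once the leader write finishes, before any flush.
        return leader_written
    raise ValueError(f"unsupported acks value: {acks}")


# Three replicas; the entry is written on the leader and flushed only there.
print(replication_finished(-1, True, 1, 3))  # False: 1 of 3 is not a majority
print(replication_finished(1, True, 1, 3))   # True: the leader write is enough
```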
The replication consistency level also determines when entries become visible to readers. Entries replicated with ACKS=-1 become visible as soon as they are fsync’ed on a majority of nodes. Entries replicated with ACKS=0,1, on the other hand, become visible as soon as they are written to the follower's disk, but they are not required to be fsync’ed first.
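The sketch below makes the visibility rule concrete by computing a visible offset from per-replica offsets. It is a hypothetical Python model with made-up names, and it treats the whole log as replicated at a single ACKS level. For ACKS=0,1 it applies the usual Raft majority rule to written (unflushed) offsets. That last choice is an assumption, since the text above does not spell it out.

```python
from typing import Sequence


def majority_offset(offsets: Sequence[int]) -> int:
    """Highest offset that at least a majority of replicas have reached."""
    ordered = sorted(offsets, reverse=True)
    majority = len(ordered) // 2 + 1
    # The majority-th largest offset has been reached by `majority` replicas.
    return ordered[majority - 1]


def visible_offset(acks: int, flushed: Sequence[int], written: Sequence[int]) -> int:
    """Hypothetical reader-visibility rule; one entry per replica, leader included.

    flushed[i]: last offset replica i has fsync'ed.
    written[i]: last offset replica i has written, flushed or not.
    """
    if acks == -1:
        return majority_offset(flushed)
    if acks in (0, 1):
        # Assumption: the same majority rule, applied to unflushed writes.
        return majority_offset(written)
    raise ValueError(f"unsupported acks value: {acks}")


flushed = [10, 7, 3]  # leader, follower A, follower B
written = [12, 9, 8]
print(visible_offset(-1, flushed, written))  # 7
print(visible_offset(1, flushed, written))   # 9
```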
How Raft replication works in Redpanda
In the previous implementation, ACKS=-1 and ACKS=0,1 were handled with different approaches and code paths.
ACKS=-1 replication requests went through batching on the leader and were then dispatched to the followers. The Redpanda Raft implementation writes data to the leader’s disk and then dispatches append_entries requests to the followers in parallel with the leader disk flush. As a result, the implementation may have multiple requests in flight per follower (currently, the number of pending requests is limited by a per-follower semaphore).
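The pipelined ACKS=-1 dispatch described above can be sketched with Python’s asyncio, using asyncio.Semaphore as a stand-in for the per-follower semaphore. The Follower class, the delays, and the limit of two in-flight requests are all hypothetical. The sketch shows only how the semaphore caps requests per follower while the leader flush runs concurrently. It ignores ordering, failures, and retries.

```python
import asyncio


class Follower:
    """Hypothetical follower endpoint with a per-follower dispatch semaphore."""

    def __init__(self, name: str, max_in_flight: int):
        self.name = name
        self.dispatch_units = asyncio.Semaphore(max_in_flight)
        self.in_flight = 0
        self.peak_in_flight = 0
        self.received = []

    async def append_entries(self, offset: int) -> None:
        async with self.dispatch_units:  # caps requests in flight to this follower
            self.in_flight += 1
            self.peak_in_flight = max(self.peak_in_flight, self.in_flight)
            await asyncio.sleep(0.005)  # simulated RPC + follower disk write
            self.received.append(offset)
            self.in_flight -= 1


async def leader_flush() -> None:
    await asyncio.sleep(0.005)  # simulated fsync on the leader


async def replicate(offset: int, leader_log: list, followers: list) -> None:
    leader_log.append(offset)  # write to the leader log first
    # Then flush on the leader and dispatch to every follower concurrently.
    await asyncio.gather(leader_flush(),
                         *(f.append_entries(offset) for f in followers))


async def run_demo(n_requests: int, max_in_flight: int):
    leader_log = []
    followers = [Follower("a", max_in_flight), Follower("b", max_in_flight)]
    await asyncio.gather(*(replicate(i, leader_log, followers)
                           for i in range(n_requests)))
    return leader_log, followers


leader_log, followers = asyncio.run(run_demo(10, max_in_flight=2))
print([f.peak_in_flight for f in followers])  # never above 2
```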
ACKS=0,1 replication was implemented differently. Control returned to the caller immediately after the data was written to the underlying log. On the next heartbeat, the leader recognized that the follower was behind and started recovering it. The recovery mechanism read a chunk of data and sent it to the follower in a loop, one request at a time.
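For contrast, the old ACKS=0,1 recovery loop can be modeled synchronously. The heartbeat detects the lag, and recovery sends one chunk at a time, waiting for each to complete before reading the next. The functions on_heartbeat and recover and the chunk size are illustrative assumptions, not Redpanda APIs. Python lists stand in for the logs.

```python
def on_heartbeat(leader_log: list, follower_log: list, chunk_size: int) -> int:
    """Hypothetical model of the old ACKS=0,1 path.

    The replicate call has already returned; the heartbeat notices the lag
    and runs recovery. Returns the number of append requests sent.
    """
    if len(follower_log) >= len(leader_log):
        return 0  # follower is up to date: nothing to recover
    return recover(leader_log, follower_log, chunk_size)


def recover(leader_log: list, follower_log: list, chunk_size: int) -> int:
    requests = 0
    while len(follower_log) < len(leader_log):
        start = len(follower_log)
        chunk = leader_log[start:start + chunk_size]  # read a chunk from the log
        follower_log.extend(chunk)  # "send" it; the next read waits for this reply
        requests += 1
    return requests


leader = list(range(10))
follower = [0, 1]
print(on_heartbeat(leader, follower, chunk_size=3))  # 3 requests: [2..4], [5..7], [8, 9]
```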
Why simplify Raft replication?
There were many reasons to simplify the Raft logic:
- Simpler code and a unified approach makes the Raft implementation easier to maintain and optimize.
- Without the simplification, mixing different consistency modes with high throughput may lead to spikes in latency.
- The simplified and unified replication mechanism makes it easier to implement future Raft features like in-memory writes and remote leaders (where the leader for a partition is not co-located with data).
The simplification approach
Our simplified solution handles all supported ACKS values in the same way, using replicate_entries_stm. This approach has several advantages over the previous handling of the ACKS=0,1 consistency levels: it lets us simplify and unify the replication code, control backpressure and memory usage, and limit replication latency.
The simplified implementation of ACKS=-1 replication is robust and well-tested, so no large code changes are required to use replicate_stm to handle ACKS=0,1 replication as well. The way replicate_stm organizes append_entries request creation, dispatching, and leader log writes will allow us to handle those requests in parallel in a future release.
replicate_stm also prevents us from creating append_entries requests twice. Batches coming from replicate_batcher are not written to disk and then read back. Instead, the same batches are used to generate both the leader disk write and the follower append_entries requests. This significantly reduces storage::batch_cache pressure and may also reduce disk usage. (In some clusters with a limited memory cache, the hit ratio is not always 1.0, even when the only reads are those issued by recovery_stm.)
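Counting log reads illustrates the difference between re-reading written batches and sharing them. In this hypothetical Python sketch, Log, replicate_by_rereading, and replicate_shared are made-up names. Each counted read stands in for a storage::batch_cache lookup or a disk read.

```python
from dataclasses import dataclass, field


@dataclass
class Log:
    """Hypothetical leader log that counts read-backs (cache or disk)."""
    entries: list = field(default_factory=list)
    reads: int = 0

    def append(self, batch: list) -> None:
        self.entries.extend(batch)

    def read(self, start: int, count: int) -> list:
        self.reads += 1
        return self.entries[start:start + count]


def replicate_by_rereading(log: Log, batch: list, n_followers: int) -> list:
    # Write first, then read the batch back once per follower request.
    start = len(log.entries)
    log.append(batch)
    return [log.read(start, len(batch)) for _ in range(n_followers)]


def replicate_shared(log: Log, batch: list, n_followers: int) -> list:
    # One in-memory batch feeds the leader write and every follower request.
    shared = tuple(batch)
    log.append(list(shared))
    return [shared for _ in range(n_followers)]


old_log, new_log = Log(), Log()
replicate_by_rereading(old_log, ["r1", "r2"], n_followers=2)
replicate_shared(new_log, ["r1", "r2"], n_followers=2)
print(old_log.reads, new_log.reads)  # 2 0
```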
In the previous ACKS=-1 replication implementation, backpressure was effectively propagated from the followers to the raft::replicate caller (the Kafka produce request handler). Because the RPC layer lacked a backpressure propagation mechanism, propagation was based on the per-follower semaphore. If replicate_entries_stm was unable to acquire follower dispatch units, it would wait while holding the replicate_batcher memory reservation lock, so the batcher did not accept new writes. When the replicate_batcher memory reservation semaphore was exhausted, the first stage of replicate_stages could not finish, which prevented more requests from being processed.
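The backpressure chain can be sketched with two asyncio semaphores. One stands for the replicate_batcher memory reservation and the other for follower dispatch units. This is a hypothetical model: ReplicatePath, one memory unit per request, and the latencies are all assumptions. It shows that a slow follower stalls the first stage instead of letting the number of buffered requests grow.

```python
import asyncio


class ReplicatePath:
    """Hypothetical model of the ACKS=-1 backpressure chain.

    memory: replicate_batcher memory reservation (one unit per request here).
    dispatch: per-follower dispatch units.
    """

    def __init__(self, memory_units: int, dispatch_units: int):
        self.memory = asyncio.Semaphore(memory_units)
        self.dispatch = asyncio.Semaphore(dispatch_units)
        self.memory_in_use = 0
        self.peak_memory_in_use = 0
        self.completed = 0

    async def replicate(self, follower_latency: float) -> None:
        # Stage 1: reserve batcher memory. When it is exhausted, this await
        # blocks, so no new requests get past the first stage.
        await self.memory.acquire()
        self.memory_in_use += 1
        self.peak_memory_in_use = max(self.peak_memory_in_use, self.memory_in_use)
        # Wait for follower dispatch units while still holding the memory.
        await self.dispatch.acquire()
        # Request handed off to the socket: the memory can be released.
        self.memory_in_use -= 1
        self.memory.release()
        try:
            await asyncio.sleep(follower_latency)  # a slow follower
        finally:
            self.dispatch.release()
        self.completed += 1


async def produce(n_requests: int) -> ReplicatePath:
    path = ReplicatePath(memory_units=4, dispatch_units=2)
    await asyncio.gather(*(path.replicate(0.005) for _ in range(n_requests)))
    return path


path = asyncio.run(produce(20))
print(path.peak_memory_in_use, path.completed)  # peak never exceeds 4; all 20 complete
```

Lowering memory_units makes the first stage block sooner. That is the lever the text relies on to push backpressure up to the produce handler.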
The ACKS=0,1 mechanism, by contrast, lacked backpressure propagation. The leader could be an arbitrarily large number of entries ahead of the followers. This was a problem because users had the impression that data was replicated when, in reality, it was only stored on the leader.
In the simplified solution, we handle backpressure in the same way for all ACKS settings. This guarantees that followers can always keep up with the leader.
Memory usage is controlled the same way as in the previous implementation: the first stage of replication does not finish while the replicate_batcher memory semaphore is exhausted. This prevents the Kafka layer from buffering an unbounded number of batches in pending replicate requests.
Replicate batcher memory units are released after the request is handed off to the socket, so that all the user space buffers can be freed. In the future, we may consider a per-shard semaphore that controls the total amount of memory used by all the replicate batchers.
Recovery STM coordination - finishing recovery
recovery_stm is started every time the leader identifies a follower as being behind. recovery_stm was used both to recover the follower (when it was actually behind) and to deliver data replicated with the ACKS=0,1 consistency levels. For that reason, the recovery_stm implementation contains some optimizations to minimize ACKS=0,1 workload latency.
There are two main optimizations:
- not stopping recovery when the last replicate request was sent with ACKS set to 0 or 1
- triggering the next dispatch loop immediately after the leader disk append
These optimizations make handling a recovering follower more complex in other scenarios.
Follower append_entries requests were sent from two paths (i.e. replicate_entries_stm and recovery_stm), and there was no coordination between them. As a result, those requests could be reordered and cause log truncations and redeliveries, which increased latency.
In the simplified solution, we no longer rely on recovery_stm to deliver messages to the follower under normal conditions, so the complicated logic for finishing recovery is no longer needed. When the follower is up to date with the leader, recovery_stm is simply not running.
The only situation that requires coordination is when the follower is no longer behind the leader, but replicate_stm skips dispatching requests to it because it is still recovering. The next leader append may then happen before recovery_stm receives the follower’s response to the last request (i.e. recovery cannot finish because the log end has already advanced). This case can be handled by coordinating how follower requests are dispatched from recovery_stm and replicate_stm. We can add the last-sent offset to raft::follower_index_metadata and use it to check whether recovery should finish, instead of relying only on the follower response. This way, replicate_stm will not skip sending a request to the follower if it is the next request the follower is supposed to receive.
On the other hand, recovery_stm will not try to send the same request again. Instead, it will finish, assuming the response will be successful.
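The proposed last-sent-offset coordination can be sketched as two predicates. The field last_sent_offset is the addition proposed above. The other field names, the helper functions, and the convention that offsets are inclusive are assumptions for illustration, not the actual layout of raft::follower_index_metadata.

```python
from dataclasses import dataclass


@dataclass
class FollowerIndexMetadata:
    """Hypothetical subset of raft::follower_index_metadata.

    last_sent_offset is the proposed field; the other names are illustrative.
    """
    last_acked_offset: int
    last_sent_offset: int
    is_recovering: bool


def replicate_should_dispatch(meta: FollowerIndexMetadata, base_offset: int) -> bool:
    # replicate_stm skips recovering followers, unless this request is
    # exactly the next one the follower is supposed to receive.
    if not meta.is_recovering:
        return True
    return base_offset == meta.last_sent_offset + 1


def on_dispatch(meta: FollowerIndexMetadata, last_offset: int) -> None:
    meta.last_sent_offset = max(meta.last_sent_offset, last_offset)


def recovery_should_finish(meta: FollowerIndexMetadata, leader_log_end: int) -> bool:
    # recovery_stm finishes once everything up to the leader's log end has
    # been sent, assuming pending responses will succeed, instead of waiting
    # for the follower's acknowledgement.
    return meta.last_sent_offset >= leader_log_end


# recovery_stm has sent up to offset 100; the response is still pending.
meta = FollowerIndexMetadata(last_acked_offset=90, last_sent_offset=100, is_recovering=True)
# The leader appends 101..105 before that response arrives.
if replicate_should_dispatch(meta, base_offset=101):
    on_dispatch(meta, last_offset=105)
print(recovery_should_finish(meta, leader_log_end=105))  # True: nothing to resend
```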
Implementing Raft simplification
To implement the simplified solution, we pass the consistency_level parameter to the raft::replicate_batcher::replicate method. The raft::replicate_entries_stm::apply_on_leader method returns immediately after a successful leader_append. The full quorum replication round finishes when raft::replicate_entries_stm::wait_for_majority() returns. Based on the cached consistency_level of each request, the replicate batcher decides when to release the request: either after apply_on_leader finishes (ACKS=0,1) or after wait_for_majority() returns (ACKS=-1).
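The release decision can be sketched with asyncio. The quorum round always runs; only the point at which the request is released to the caller depends on the consistency level. ReplicateEntriesStm and ConsistencyLevel here are hypothetical Python stand-ins for the C++ types named above, with sleeps in place of real I/O.

```python
import asyncio
from enum import Enum


class ConsistencyLevel(Enum):
    # Illustrative names mirroring the ACKS values, not Redpanda's enum.
    QUORUM_ACK = -1
    LEADER_ACK = 1
    NO_ACK = 0


class ReplicateEntriesStm:
    """Hypothetical stand-in for raft::replicate_entries_stm."""

    async def apply_on_leader(self) -> None:
        await asyncio.sleep(0.001)  # leader_append; returns right after it

    async def wait_for_majority(self) -> None:
        await asyncio.sleep(0.005)  # follower dispatch + quorum round


async def replicate(stm: ReplicateEntriesStm, level: ConsistencyLevel):
    await stm.apply_on_leader()
    # The quorum round runs for every request, whatever its level.
    quorum_round = asyncio.create_task(stm.wait_for_majority())
    if level is ConsistencyLevel.QUORUM_ACK:
        await quorum_round
        return "wait_for_majority", quorum_round
    return "apply_on_leader", quorum_round


async def demo(level: ConsistencyLevel):
    released_after, quorum_round = await replicate(ReplicateEntriesStm(), level)
    quorum_done_at_release = quorum_round.done()
    await quorum_round  # let the background round finish
    return released_after, quorum_done_at_release


print(asyncio.run(demo(ConsistencyLevel.LEADER_ACK)))  # ('apply_on_leader', False)
print(asyncio.run(demo(ConsistencyLevel.QUORUM_ACK)))  # ('wait_for_majority', True)
```

Because the same round runs in both cases, any optimization to it applies to every level. Only the release point differs.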
As in the previous implementation, raft::replicate_entries_stm releases batcher memory units after dispatching the follower requests. This allows backpressure to propagate for both consistency levels.
The Redpanda Raft implementation is optimized to achieve the best results in all scenarios. We introduced unique solutions (flush coalescing, follower backpressure propagation, and out-of-band heartbeats) to achieve the best possible performance in all circumstances. Unifying the code responsible for Raft replication lets us optimize replication further, since every optimization automatically applies to all supported consistency levels.
If you have feedback on the simplification process, or would like to contribute to ongoing improvements in Redpanda, please chime in on our GitHub Discussions. You are also welcome to join our Slack Community to talk with me directly!