Richard Kallos


Case Study: Using CRDTs to improve a distributed system

:: crdt, distsys, programming

At a previous job, I was lucky enough to be faced with the challenge of improving a distributed service’s robustness to network faults. In this post, I share how I attacked the problem using Conflict-free/Convergent Replicated Data Types (CRDTs).

A Chronicle of Counters

The service was responsible for managing the state of a set of counters. These counters had lifetimes and rules attached to them, dictating when they could be incremented. Many counters needed to be incremented steadily over time. Other counters needed to be incremented slowly at the beginning, then more quickly toward the end of the counter’s life. The service didn’t care what the counters stood for. The service’s job was to inform clients when counters could be incremented, and to acknowledge and remember when clients did increment counters.

The service supported four types of requests:

  1. Clients send (ASK, abc, 10) requests, asking the service if counter abc can be incremented by 10. The service responds with YES or NO.
  2. Clients send (TELL, abc, 5) requests, informing the service that counter abc should be incremented by 5.
  3. Replicas broadcast (ANNOUNCE, abc, 5) messages, informing other replicas that the counter abc has been incremented by 5.
  4. Sometime between one hour and the next, e.g. 10:00 and 11:00, a periodic job sends (STATE, 09:59, abc, 3) to every replica, stating that the state of counter abc at time 09:59 was 3. The service does its best to track the values of counters, but for reasons I won’t get into, the service can be wrong about the states of counters. STATE requests are a source of truth. This will be very important later.
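
To make the protocol concrete, the four message types looked roughly like the following sketch (illustrative Python with invented names and fields, not the actual wire format):

    # Hypothetical sketch of the four message types; names and fields
    # are illustrative assumptions, not the service's real wire format.
    from dataclasses import dataclass

    @dataclass
    class Ask:          # client -> service: "may `counter` be incremented by `amount`?"
        counter: str
        amount: int     # the service answers YES or NO

    @dataclass
    class Tell:         # client -> service: "`counter` was incremented by `amount`"
        counter: str
        amount: int

    @dataclass
    class Announce:     # replica -> replica: "`counter` went up by `amount`"
        counter: str
        amount: int

    @dataclass
    class State:        # periodic ground truth: value of `counter` as of `as_of`
        as_of: str      # e.g. "09:59"
        counter: str
        value: int

    # e.g. Ask("abc", 10), Tell("abc", 5), Announce("abc", 5), State("09:59", "abc", 3)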

Some other facts about the system:

Before I worked on this service:

My task was to change the service so that it could be scaled out without increasing the likelihood of entering a bad state. To achieve this, I sought to:

Data Meets Math

One of the first things I noticed about the ANNOUNCE protocol was that it was very sensitive to messages being dropped. I changed the data in ANNOUNCE messages from being “everything a replica learned since the previous broadcast” to “everything a replica knows about the state of the system”. The ANNOUNCE messages got much larger in size, but I strongly believe the properties of these larger messages were worth the cost.
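
Concretely, the new ANNOUNCE payload behaved like a state-based, G-Counter-style CRDT: a map from (replica, counter) to the largest contribution each replica was known to have made, merged by taking a pointwise maximum. The sketch below is illustrative, with invented names and shapes, rather than the service’s actual data structures.

    # Sketch of a full-state ANNOUNCE payload and its merge (assumed shapes).
    from typing import Dict, Tuple

    # (replica_id, counter_id) -> that replica's known contribution to the counter
    AnnounceState = Dict[Tuple[str, str], int]

    def merge(local: AnnounceState, incoming: AnnounceState) -> AnnounceState:
        """Pointwise maximum: never loses information, never double-counts."""
        merged = dict(local)
        for key, value in incoming.items():
            merged[key] = max(merged.get(key, 0), value)
        return merged

    def counter_value(state: AnnounceState, counter_id: str) -> int:
        """A counter's value is the sum of every replica's contribution."""
        return sum(v for (_, cid), v in state.items() if cid == counter_id)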

Once applying ANNOUNCE messages to local state became idempotent, it became safe to retransmit messages without fear of introducing inconsistencies.
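
With a pointwise-maximum merge like the one sketched above, a duplicated ANNOUNCE is absorbed without effect; a minimal illustration using the same invented shapes:

    # Idempotency: merging the same ANNOUNCE twice changes nothing.
    def merge(local, incoming):
        merged = dict(local)
        for key, value in incoming.items():
            merged[key] = max(merged.get(key, 0), value)
        return merged

    local = {("replica-a", "abc"): 7}
    announce = {("replica-b", "abc"): 5}

    once = merge(local, announce)
    twice = merge(once, announce)   # the same message, retransmitted
    assert once == twice            # duplicate delivery is harmless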

Once applying ANNOUNCE messages to local state became commutative and monotonic, it became almost harmless for the network to drop or reorder messages. ANNOUNCE messages contained the full knowledge of some replica, so it became easy to determine which ANNOUNCE message was the most recent: the greatest (maximum) message according to our partial ordering. As long as ANNOUNCE messages were delivered in a timely manner, it did not matter if individual messages were dropped or reordered.
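
Commutativity means the merged result is independent of delivery order, and the partial ordering is simply pointwise dominance: one ANNOUNCE subsumes another when it is at least as large on every key. Again an illustration with invented shapes:

    # Commutativity and the partial order on ANNOUNCE states.
    def merge(a, b):
        merged = dict(a)
        for key, value in b.items():
            merged[key] = max(merged.get(key, 0), value)
        return merged

    def dominates(a, b):
        """True if state `a` already contains everything in state `b`."""
        return all(a.get(key, 0) >= value for key, value in b.items())

    m1 = {("replica-a", "abc"): 7}
    m2 = {("replica-a", "abc"): 9, ("replica-b", "abc"): 2}

    assert merge(m1, m2) == merge(m2, m1)   # delivery order does not matter
    assert dominates(m2, m1)                # m2 is the greater, more recent message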

Once applying ANNOUNCE messages to local state became transitive, the service no longer relied on direct connections between replicas. If replica A ANNOUNCE’d at replica B, then replica B ANNOUNCE’d at replica C, the state would be the same as if replica A had also ANNOUNCE’d at replica C. Transitivity in ANNOUNCE messages relaxed the connectivity graph requirement from needing to be fully connected to needing to be strongly connected.
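
Transitivity falls out of the same merge: once replica B has merged A’s state, B’s next ANNOUNCE carries A’s contributions along to C. A sketch with the same invented shapes:

    # Transitivity: C learns A's state through B, with no direct A -> C link.
    def merge(a, b):
        merged = dict(a)
        for key, value in b.items():
            merged[key] = max(merged.get(key, 0), value)
        return merged

    state_a = {("replica-a", "abc"): 4}
    state_b = {("replica-b", "abc"): 6}
    state_c = {("replica-c", "abc"): 1}

    state_b = merge(state_b, state_a)   # A announces at B
    state_c = merge(state_c, state_b)   # B announces at C

    # C now knows A's contribution as if A had announced at C directly
    assert state_c[("replica-a", "abc")] == 4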

Problems in Paradise

Idempotency, transitivity, commutativity and monotonicity are all excellent properties to have in a distributed system protocol. However, there are obstacles that prevent CRDTs from being used everywhere in production. I encountered two specific issues:

  1. Counter state was not always monotonic. As I mentioned earlier, STATE messages were a source of truth that often declared that a counter’s state was lower than the service expected.
  2. ANNOUNCE messages now carried a replica’s entire knowledge, so they only ever grew. How could I shrink a message that was supposed to be monotonic?

Both problems were solved by the fact that STATE messages corresponded to some ground truth rooted at a specific point in the past.

STATE messages always reflected the true state of the system at the end of some hour. By changing my (Replica, Counter, State) triples into (Replica, Counter, Hour, State) tuples, and relying on the fact that STATE messages are always delivered to all replicas, a replica could stop sending tuples where Hour was before the latest STATE message. STATE messages helped put a practical bound on the size of ANNOUNCE messages.
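
In code, the change amounts to adding the hour to the key and pruning buckets from hours older than the most recent STATE message. The sketch below uses invented names and compares "HH:MM" strings, which only works within a single day; real code would use proper timestamps.

    # Sketch: (Replica, Counter, Hour, State) tuples with pruning (assumed shapes).
    from typing import Dict, Tuple

    # (replica_id, counter_id, hour) -> contribution observed during that hour
    HourlyState = Dict[Tuple[str, str, str], int]

    def merge(local: HourlyState, incoming: HourlyState) -> HourlyState:
        merged = dict(local)
        for key, value in incoming.items():
            merged[key] = max(merged.get(key, 0), value)
        return merged

    def prune(state: HourlyState, latest_state_hour: str) -> HourlyState:
        """Drop buckets from hours before the latest STATE message; every
        replica has already received that ground truth, so they add nothing."""
        return {key: value for key, value in state.items()
                if key[2] >= latest_state_hour}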

Splitting counter state into hourly buckets also helped to account for non-monotonic updates from STATE messages. While counter values within ANNOUNCE messages are monotonically non-decreasing, the Hour element of STATE messages is also monotonically non-decreasing, so I was able to change my least upper bound operation into a lexicographic least upper bound, which I feel is best explained by example: (0, 5) < (0, 500) < (1, 0). Applied to STATE messages, (08:59, 5000) < (09:59, 3).
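
A lexicographic least upper bound compares the hour first and falls back to the counter value only when the hours tie; a minimal sketch:

    # Lexicographic least upper bound over (hour, value) pairs.
    def lex_lub(a, b):
        """The later hour wins outright; within the same hour, take the larger value."""
        (hour_a, value_a), (hour_b, value_b) = a, b
        if hour_a != hour_b:
            return a if hour_a > hour_b else b
        return (hour_a, max(value_a, value_b))

    assert lex_lub((0, 5), (0, 500)) == (0, 500)
    assert lex_lub((0, 500), (1, 0)) == (1, 0)
    # Applied to STATE messages ("HH:MM" strings compare correctly within a day):
    assert lex_lub(("08:59", 5000), ("09:59", 3)) == ("09:59", 3)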

Mo’ computers? No problem.

With that final change, I was finished. After writing, testing, and deploying the code to production, I was delighted to see that when a single instance of the service went down, whether due to a network partition, or perhaps a deployment, the rest of the cluster was healthy and answering YES to ASK requests.

I had a lot of fun working on that problem. I also had a really great time researching CRDTs and eventual consistency, implementing two different distributed system simulators, and eventually implementing the changes to the protocol. I’m hoping to write another post in the near future showcasing my favorite papers around this topic.

For the sake of brevity, many fascinating details about the service and protocol have been left out. I nevertheless hope you will remember CRDTs, and consider reaching for them, when you find yourself tasked with making a distributed system more robust.

