Case Study: Using CRDTs to improve a distributed system
At a previous job, I was lucky enough to be faced with the challenge of improving a distributed service’s robustness to network faults. In this post, I share how I attacked the problem using Conflict-free/Convergent Replicated Data Types (CRDTs).
A Chronicle of Counters
The service was responsible for managing the state of a set of counters. These counters had lifetimes and rules attached to them, dictating when they could be incremented. Many counters needed to be incremented steadily over time. Other counters needed to be incremented slowly at the beginning, then more quickly toward the end of the counter’s life. The service didn’t care what the counters stood for. The service’s job was to inform clients when counters could be incremented, and to acknowledge and remember when clients did increment counters.
The service supported four types of requests:
- Clients send (ASK, abc, 10) requests, asking the service if counter abc can be incremented by 10. The server responds with YES or NO.
- Clients send (TELL, abc, 5) requests, informing the service that counter abc should be incremented by 5.
- Replicas broadcast (ANNOUNCE, abc, 5) messages, informing other replicas that the counter abc has been incremented by 5.
- Sometime between one hour and the next, e.g. between 10:00 and 11:00, a periodic job sends (STATE, 09:59, abc, 3) to every replica, stating that the state of counter abc at time 09:59 was 3. The service does its best to track the values of counters, but for reasons I won’t get into, the service can be wrong about the states of counters. STATE requests are a source of truth. This will be very important later.
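To make the rest of the post concrete, here is a rough Python sketch of those four message shapes. The field names and types are my own illustration, not the service’s actual wire format.

```python
# Illustrative message shapes only; the real service's wire format differed.
from dataclasses import dataclass

@dataclass
class Ask:            # (ASK, abc, 10): may counter "abc" be incremented by 10?
    counter: str
    amount: int

@dataclass
class Tell:           # (TELL, abc, 5): counter "abc" should be incremented by 5
    counter: str
    amount: int

@dataclass
class Announce:       # (ANNOUNCE, abc, 5): replica-to-replica gossip of increments
    counter: str
    amount: int

@dataclass
class State:          # (STATE, 09:59, abc, 3): ground truth for "abc" as of 09:59
    as_of: str        # end-of-hour timestamp, e.g. "09:59"
    counter: str
    value: int
```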
Some other facts about the system:
- High performance: peak hours saw over 1 million QPS with 99th percentile latency under 2ms.
- Servers run NTP; their physical clocks are synchronized. I won’t really get into why this matters in this post, but having bounds on clock drift is essential when one wants servers to coordinate using Logical Physical Clocks.
- ASK requests do not always turn into TELL requests.
- STATE requests are not comprised of the sum of all TELL requests made to the system. It’s close, but it’s different enough to be a source of chaos.
- The distribution of requests is uneven. One server may get 30% of all ASK/TELL requests for counter abc, but 0% of requests for counter xyz.
- The service tries to keep counters close to their limits at any given time. Over-incrementing is a bad outcome when a counter corresponds to a monetary budget. Under-incrementing is a bad outcome when a counter corresponds to achievement of some important goal.
Before I worked on this service:
- Replicas maintained connections to all other replicas. The number of open connections in the system was N(N-1)/2. O(N²) isn’t that bad; Erlang defaults to a fully connected cluster and scales to a decently high number of machines. However, we felt that we needed to do better in order to scale out the service, because…
- If the connection between any two replicas was broken or stalled, all replicas would resort to the safe option of saying NO to every ASK request. With O(N²) connections, the likelihood of some connection breaking, and of the cluster entering the bad state of saying NO to every request, would increase sharply as nodes were added to the cluster.
My task was to change the service so that it could be scaled out without increasing the likelihood of entering a bad state. To achieve this, I sought to:
- Remove the requirement for full network connectivity for saying YES to ASK requests.
- Relax the requirement from needing fresh state from all replicas to only needing fresh state from a majority of replicas.
Data Meets Math
One of the first things I noticed about the ANNOUNCE protocol was that it was very sensitive to messages being dropped. I changed the data in ANNOUNCE messages from being “everything a replica learned since the previous broadcast” to “everything a replica knows about the state of the system”. The ANNOUNCE messages got much larger in size, but I strongly believe the properties of these larger messages were worth the cost.
- Messages changed from being independent events to being members of a partially ordered set. The message {(abc, 10), (xyz, 5)} could be considered greater than {(abc, 8), (xyz, 3)} or {(abc, 1)}, but it is incomparable with {(xyz, 8)}: neither message dominates the other. Orders, even partial ones, are transitive.
- It became possible to ‘merge’ messages according to some least upper bound function. Given the messages {(abc, 10)} and {(xyz, 5)}, the LUB is the union: {(abc, 10), (xyz, 5)}. For sets of pairs where the first element is the same, a common LUB is to take the pairwise maximum, for example: {(abc, 10), (xyz, 5)} ⊔ {(xyz, 8)} would be {(abc, 10), (xyz, 8)}. In my case, I also had to account for replicas, so my messages resembled {(A, abc, 10), (A, xyz, 5), (B, abc, 3)}, where the state of counter abc would be 10 + 3 = 13. This merge function is associative, commutative and idempotent (see the sketch after this list). Associativity can be useful when you have to merge a lot of messages (merging messages in a tree shape can be faster than merging in a list shape), but commutativity and idempotency were essential to making the counter service protocol more robust.
- A partially ordered set with a least upper bound function is a join-semilattice. Semilattices are monotonic, which is another great property for data flowing in a distributed system.
- A join-semilattice with an ‘update’ function, like a TELL message, can be considered a State-based CRDT, a form of data that provides strong eventual consistency: once all replicas have exchanged and merged their state-based CRDTs, the replicas have equivalent state. Eventually consistent systems might need expensive reconciliation protocols, but this is not necessary in a system with strong eventual consistency.
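Here is a minimal Python sketch of the merge described above, assuming ANNOUNCE payloads are maps from (replica, counter) pairs to per-replica totals; the names are mine, not the production code’s.

```python
def merge(a: dict, b: dict) -> dict:
    """Least upper bound of two ANNOUNCE payloads.

    Keys are (replica, counter) pairs, values are per-replica totals.
    The pairwise maximum is associative, commutative, and idempotent.
    """
    out = dict(a)
    for key, value in b.items():
        out[key] = max(out.get(key, 0), value)
    return out

def counter_value(state: dict, counter: str) -> int:
    """A counter's value is the sum of every replica's contribution."""
    return sum(v for (_, c), v in state.items() if c == counter)

a = {("A", "abc"): 10, ("A", "xyz"): 5}
b = {("B", "abc"): 3}
merged = merge(a, b)
assert counter_value(merged, "abc") == 13  # 10 from A plus 3 from B
```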
Once applying ANNOUNCE messages to local state became idempotent, it became safe to retransmit messages without fear of introducing inconsistencies.
Once applying ANNOUNCE messages to local state became commutative and monotonic, it became almost harmless for the network to drop or reorder messages. ANNOUNCE messages contained the full knowledge of some replica, so it became easy to determine which ANNOUNCE message was the most recent; the greatest/maximum message according to our partial ordering. As long as ANNOUNCE messages were delivered in a timely manner, it did not matter if individual messages were dropped or reordered.
Once the effects of ANNOUNCE messages became transitive, the service no longer relied on direct connections between replicas. If replica A ANNOUNCE’d to replica B, and replica B then ANNOUNCE’d to replica C, the state would be the same as if replica A had also ANNOUNCE’d to replica C. Transitivity in ANNOUNCE messages relaxed the connectivity graph requirement from needing to be fully connected to needing to be strongly connected.
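Continuing the sketch above, a few assertions show the properties these last three paragraphs rely on: retransmission is harmless (idempotence), delivery order doesn’t matter (commutativity), and hearing A’s state second-hand through B is as good as hearing it from A directly.

```python
m1 = {("A", "abc"): 10}
m2 = {("A", "abc"): 12}          # A's later, larger announcement
m3 = {("B", "xyz"): 4}

# Idempotent: re-applying a retransmitted message changes nothing.
assert merge(merge(m1, m2), m2) == merge(m1, m2)

# Commutative: delivery order does not matter.
assert merge(merge(m1, m3), m2) == merge(merge(m2, m3), m1)

# Transitive propagation: C merging B's state (which already includes A's
# announcement) equals C hearing from A directly.
b_state = merge(m3, m2)          # B merged A's announcement into its own
c_via_b = merge({}, b_state)
c_direct = merge(merge({}, m3), m2)
assert c_via_b == c_direct
```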
Problems in Paradise
Idempotency, transitivity, commutativity and monotonicity are all excellent properties to have in a distributed system protocol. However, there are obstacles that prevent CRDTs from being used in production everywhere. I encountered two specific issues:
- Counter state was not always monotonic. As I mentioned earlier, STATE messages were a source of truth that often declared that a counter’s state was lower than the service expected.
- How could I shrink a message that was supposed to be monotonic?
Both problems were solved by the fact that STATE messages corresponded to some ground truth rooted at a specific point in the past.
STATE messages always reflected the true state of the system at the end of some hour. By changing my (Replica, Counter, State) triples into (Replica, Counter, Hour, State) tuples, and relying on the fact that STATE messages are always delivered to all replicas, a replica could stop sending tuples where Hour was before the latest STATE message. STATE messages helped put a practical bound on the size of ANNOUNCE messages.
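A hedged sketch of that pruning step, assuming buckets keyed by (Replica, Counter, Hour) tuples and an end-of-hour timestamp for the latest STATE message that has reached every replica:

```python
def prune(state: dict, settled_hour: str) -> dict:
    """Keep only buckets newer than the latest fully delivered STATE hour.

    Keys are (replica, counter, hour) tuples; `settled_hour` is the hour
    covered by the most recent STATE broadcast, e.g. "09:59".
    """
    return {k: v for k, v in state.items() if k[2] > settled_hour}

full = {
    ("A", "abc", "08:59"): 7,
    ("A", "abc", "09:59"): 4,
    ("A", "abc", "10:59"): 2,
}
# After the 09:59 STATE message has reached every replica, only the
# 10:59 bucket still needs to travel in ANNOUNCE messages.
assert prune(full, "09:59") == {("A", "abc", "10:59"): 2}
```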
Splitting counter state into hourly buckets also helped account for nonmonotonic updates from STATE messages. While counter values are monotonically non-decreasing in ANNOUNCE messages, the Hour element of STATE messages is also monotonically non-decreasing, so I was able to change my least upper bound operation into a lexicographic least upper bound, which I feel is best explained by example: (0, 5) < (0, 500) < (1, 0). Applied to STATE messages, (08:59, 5000) < (09:59, 3).
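A tiny sketch of that lexicographic least upper bound, assuming (hour, value) pairs; Python’s built-in tuple comparison already behaves lexicographically:

```python
def lex_lub(a: tuple, b: tuple) -> tuple:
    """LUB of (hour, value) pairs: compare the hour first, then the value."""
    return max(a, b)  # tuples compare lexicographically in Python

assert lex_lub((0, 5), (0, 500)) == (0, 500)
assert lex_lub((0, 500), (1, 0)) == (1, 0)
assert lex_lub(("08:59", 5000), ("09:59", 3)) == ("09:59", 3)
```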
Mo’ computers? No problem.
With that final change, I was finished. After writing, testing, and deploying the code to production, I was delighted to see that when a single instance of the service went down, whether due to a network partition, or perhaps a deployment, the rest of the cluster was healthy and answering YES to ASK requests.
I had a lot of fun working on that problem. I also had a really great time researching CRDTs and eventual consistency, implementing two different distributed system simulators, and eventually implementing the changes to the protocol. I’m hoping to write another post in the near future showcasing my favorite papers around this topic.
For the sake of brevity, many fascinating details about the service and protocol have been left out. I hope nevertheless that you remember and possibly consider CRDTs when you find yourself tasked with making some distributed system more robust.