Parallel Commits: A New Atomic Commit Protocol for Distributed Transactions (cockroachlabs.com)
What prevents a transaction from being prematurely aborted while there are some intents in flight?
From what I understand, transaction A can still be replicating intents while transaction B, not knowing that the coordinator for transaction A is still at work, starts a recovery process. This makes transaction A abort, which could have been prevented by waiting for A's intents to be successfully replicated.
From what I understand, with this new commit protocol they managed to improve the response time of a writing transaction by shifting some of the work of determining its final status to the readers. Am I understanding correctly that readers' performance will degrade by the same amount?
While this is an achievement that some scenarios will definitely benefit from, like bulk loading or updating secondary indexes as mentioned in the article, what about scenarios where read performance matters more, like ones with many more readers than writers? Shouldn't there be a configuration option for choosing which commit protocol to use?
> Am I understanding correctly that readers' performance will degrade by the same amount?
Not quite. The "slow path" talked about is only applicable when the coordinator node is unavailable (presumably a rare event). If it's unavailable, there's nobody left to clean up the STAGING txn record, so the reader is tasked to do it itself.
In normal conditions, however, once the coordinator node receives acknowledgement that all write intents and the "txn STAGING" record have been persisted, it can simply record "txn COMMITTED" in memory (and return to the client, send off async intent resolution procedures, etc.). Any subsequent read requests that observe leftover intents (yet to be resolved) are pointed to the coordinator node, which can simply consult the "txn COMMITTED" record in memory. This is all safe because the commit marker is not simply stored on the coordinator node; it's a distributed condition that can be reconstructed by any observer even if the coordinator fails.
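To make the "distributed condition" idea concrete, here is a minimal Python sketch of the check an observer could run without asking the coordinator. It is a simplified model, not CockroachDB's code: `Status`, `TxnRecord`, and `is_implicitly_committed` are hypothetical names, and it assumes the STAGING record lists the keys written in parallel with it and that an in-flight write only counts if its intent landed no later than the record's timestamp.

```python
from dataclasses import dataclass, field
from enum import Enum


class Status(Enum):
    PENDING = "PENDING"
    STAGING = "STAGING"
    COMMITTED = "COMMITTED"
    ABORTED = "ABORTED"


@dataclass
class TxnRecord:
    status: Status
    commit_ts: int
    # Hypothetical: keys the coordinator wrote in parallel with the STAGING record.
    in_flight_writes: list[str] = field(default_factory=list)


def is_implicitly_committed(record: TxnRecord, intents: dict[str, int]) -> bool:
    """Simplified check: a STAGING txn counts as committed iff every in-flight
    write was persisted as an intent at a timestamp <= commit_ts.

    `intents` maps key -> timestamp of this txn's persisted intent.
    """
    if record.status is Status.COMMITTED:
        return True
    if record.status is not Status.STAGING:
        return False
    return all(
        key in intents and intents[key] <= record.commit_ts
        for key in record.in_flight_writes
    )
```

Two independent observers handed the same record and the same set of persisted intents reach the same verdict, which is why the coordinator's in-memory "txn COMMITTED" is only a shortcut and not the source of truth.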
>Any subsequent read requests that observe leftover intents (yet to be resolved) are pointed to the coordinator node, which can simply consult the "txn COMMITTED" record in memory.
Another round trip performed by the reader rather than the writer? That's what I'm talking about.
Though I understand it differently: the reader just waits until all writes and the "txn COMMITTED" record arrive at its node.
> Though I understand it differently: the reader just waits until all writes and the "txn COMMITTED" record arrive at its node.
Actually, welp, I believe this is closer to the truth in the implementation. But consider how read performance compares before and after the introduction of Parallel Commits. Before, when readers happened upon extant 2PC "prepare" phase markers, they still had to wait for txn resolution (on the coordinator node of the other txn, or on the node where the intent was seen). They simply continue doing the same with Parallel Commits; there's no extra latency added to the read path (except when there is a failure, and even then, as soon as the earlier txn is recovered, future readers no longer get stuck).
>Before, when readers happened upon extant 2PC "prepare" phase markers, they still had to wait for txn resolution
Again, I understand it differently, but maybe I'm wrong. Upon encountering an unresolved intent, a reader looks up the corresponding transaction record. Before Parallel Commits, it's marked either COMMITTED or PENDING. If it's PENDING, the reader just ignores it and skips its data, since they use MVCC. There's no waiting here.
Now, with Parallel Commits, the transaction record can also be marked STAGING, in which case the reader cannot determine whether it's committed without additional work and/or waiting (the author doesn't go into much detail).
> If it's PENDING, the reader just ignores it and skips its data, since they use MVCC. There's no waiting here.
I think this is where the confusion is coming from. You're correct that a read can simply ignore writes, even pending ones, at higher timestamps due to MVCC. This improves transaction concurrency.
However, if a read finds a provisional write (an intent) at a lower timestamp, it can't just ignore it. It needs to know whether to observe the write or not. So it looks up the write's transaction record and may have to wait. If the write transaction is not finalized then it needs to either wait on the transaction to finish or force the transaction's timestamp up above its read timestamp. This is true regardless of parallel commits or not.
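The decision described above can be sketched as a small Python function. This illustrates the reasoning in this comment rather than CockroachDB's actual code path; `TxnStatus` and `handle_intent` are hypothetical names, and STAGING is lumped in with PENDING on the assumption that its outcome is not yet known to the reader.

```python
from enum import Enum


class TxnStatus(Enum):
    PENDING = "PENDING"
    STAGING = "STAGING"
    COMMITTED = "COMMITTED"
    ABORTED = "ABORTED"


def handle_intent(read_ts: int, intent_ts: int, status: TxnStatus) -> str:
    """Decide what a reader does with an intent it encounters (simplified).

    Returns one of "ignore", "observe", "skip", or "wait_or_push".
    """
    # MVCC: a write above the read timestamp is invisible, pending or not.
    if intent_ts > read_ts:
        return "ignore"
    # At or below the read timestamp the outcome matters, so the reader
    # consults the writer's transaction record.
    if status is TxnStatus.COMMITTED:
        return "observe"
    if status is TxnStatus.ABORTED:
        return "skip"
    # Not finalized (PENDING, or STAGING with an unknown outcome): wait for the
    # writer to finish, or push its timestamp above read_ts.
    return "wait_or_push"
```

Only the last branch can cost the reader extra time, and it exists with or without parallel commits.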
What parallel commits gets us is a faster path to transaction commit, as irfansharif pointed out below. So the write can not only be committed faster with parallel commits, but it can also be resolved faster to get out of other reads' ways. In that way, it improves both the synchronous latency profile and the contention footprint of transactions, assuming no coordinator failures.
Thank you for the clarification.
If I understood correctly, the extra round trip on the reader only occurs with left-over intents, which are the product of an earlier failure.
- Writes are faster due to fewer round trips in normal running.
- Reads are the same speed in normal running.
- After coordinator failure events, the new coordinator starts to clean up left-over intents asynchronously (nothing specific is waiting for it to finish).
- Reads are slowed by extra round trips in the short time after a failure event, until the left-over intents are cleaned up. But only in that time period, and only for those ranges touched by transactions during the failure event.
- The cost of extra round trips done by the reader just during recovery is much less important than the round trips that happened on every cross-region write with the older algorithm.
- But if you care about read latency being consistent all the time, including during recovery from coordinator failure events, maybe you need a more sophisticated high-availability configuration for the logical coordinator.
> If I understood correctly, the extra round trip on the reader only occurs with left-over intents, which are the product of an earlier failure.
Left over intents are also visible for an ongoing txn, before those intents are resolved. But there's no added latency in the read path, I've commented elsewhere in the thread to explain how.
> After coordinator failure events, the new coordinator starts to clean up left-over intents asynchronously (nothing specific is waiting for it to finish).
The cleanup happens by readers on demand, there's no separate global coordinator scanning the keyspace and resolving old write intents.
> Left over intents are also visible for an ongoing txn, before those intents are resolved. But there's no added latency in the read path, I've commented elsewhere in the thread to explain how.
Thanks, that was a helpful comment.
> The cleanup happens by readers on demand, there's no separate global coordinator scanning the keyspace and resolving old write intents.
Oh, that's a little surprising. I assumed the coordinator(s) did so because asynchronous cleanup is mentioned numerous times in the article, but upon closer scrutiny I see now that it only applies in the after phase of transactions without a failure.
Would that scanning, analogous to RAID "resilvering" subject to write-intent ranges to limit the keyspace regions scanned, usefully improve read latencies later?
> I see now that it only applies in the after phase of transactions without a failure.
> Would that scanning, analogous to RAID "resilvering" subject to write-intent ranges to limit the keyspace regions scanned, usefully improve read latencies later?
I think it's just a better design to have it done on demand. The keyspace is large and failures are rare, and when a reader happens upon one of these zombie intents, that very first reader resolves it for all subsequent readers. A global scan would improve read latencies later, but not by much and not for many readers.
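A rough Python model of on-demand cleanup, assuming a single process where "the first reader resolves it for everyone" means caching the recovered outcome. `IntentResolver` and its members are hypothetical; adding a key to `prevented` stands in for the timestamp-cache bump discussed further down the thread, and in the real system the outcome would be persisted (and the intents resolved) rather than kept in a local dict.

```python
class IntentResolver:
    """Hypothetical model of on-demand recovery of abandoned STAGING txns."""

    def __init__(self, in_flight: dict[str, list[str]], persisted: set[str]) -> None:
        self.in_flight = in_flight        # txn id -> keys written in parallel
        self.persisted = persisted        # keys whose intents were replicated
        self.prevented: set[str] = set()  # keys recovery has blocked
        self.outcomes: dict[str, str] = {}
        self.recoveries = 0               # how many times recovery actually ran

    def _recover(self, txn_id: str) -> str:
        self.recoveries += 1
        missing = [k for k in self.in_flight[txn_id] if k not in self.persisted]
        for key in missing:
            # Stand-in for bumping the timestamp cache so the slow write can
            # never land at the transaction's timestamp afterwards.
            self.prevented.add(key)
        return "ABORTED" if missing else "COMMITTED"

    def resolve(self, txn_id: str) -> str:
        # Only the first reader to hit this txn pays for recovery.
        if txn_id not in self.outcomes:
            self.outcomes[txn_id] = self._recover(txn_id)
        return self.outcomes[txn_id]

    def late_write_lands(self, key: str) -> bool:
        # A slow in-flight write arriving after recovery is refused if prevented.
        if key in self.prevented:
            return False
        self.persisted.add(key)
        return True
```

Recovery runs once per abandoned transaction no matter how many readers touch its intents afterwards, and a write that was still in transit cannot flip an already-decided abort into a commit.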
Why is it so important for transactions to be interactive? I feel like this is not a feature that I use frequently as an application developer and it makes distributed transactions so much harder.
It seems like something that accidentally came along from SQL, and is imposing a large cost.
Interactive may be a little misleading here. Consider a common case in any Java application at my current place of employment.
1. A controller starts a transaction.
2. Some code looks up a record in a database.
3. Based on the results of that lookup, we run one of two possible writes to a table in that database.
Knowing which kind of write you will do in the transaction requires knowing the result of the lookup, which itself must be run in the transaction. Now take this small case and imagine it in a real-world scenario, where there might be many such lookup-dependent writes all wrapped in a single transaction, and the space of possible combinations of statements you will want to execute in the transaction is large. None of this is interactive in the sense of someone working in a shell. But it is interactive in a way that makes the transaction useful in the real world.
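The three steps above can be sketched in Python against a toy connection object. `Connection` and `upsert_counter` are hypothetical stand-ins, not a real driver API; the point is only that the choice of write depends on a read issued inside the same transaction, and that each statement costs the client a round trip.

```python
class Connection:
    """Toy stand-in for a client connection; every call is one round trip."""

    def __init__(self, rows: dict[str, int]) -> None:
        self.rows = dict(rows)
        self.round_trips = 0

    def get(self, key: str):
        self.round_trips += 1
        return self.rows.get(key)

    def put(self, key: str, value: int) -> None:
        self.round_trips += 1
        self.rows[key] = value

    def commit(self) -> None:
        self.round_trips += 1


def upsert_counter(conn: Connection, key: str) -> None:
    # Step 2: look up the record inside the transaction.
    current = conn.get(key)
    # Step 3: pick one of two writes based on what the lookup returned.
    if current is None:
        conn.put(key, 1)
    else:
        conn.put(key, current + 1)
    conn.commit()
```

Nobody is typing at a shell here, yet the database still has to hold the transaction open while the client decides what to send next.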
Exactly. What Calvin (and FaunaDB, I think) does is allow you to run an arbitrary function on the database that takes input. That function can do whatever it wants, including any number of reads, writes, and arbitrary logic based on the reads. But critically, it can't talk back to the calling client, except to send the final result.
This allows you to implement the pattern you describe, which I agree is common, but in a dramatically simpler way.
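For comparison, a minimal Python sketch of the "ship the whole transaction as a function" interface. `Database.run` and `make_upsert` are hypothetical; the sketch does not model Calvin's deterministic sequencing or replication at all, only the interface constraint that the function gets one request in and sends one result back.

```python
from typing import Callable


class Database:
    """Toy database that runs a whole transaction function server-side."""

    def __init__(self, rows: dict[str, int]) -> None:
        self.rows = dict(rows)
        self.round_trips = 0

    def run(self, txn: Callable[[dict[str, int]], int]) -> int:
        # One request in, one result out: the function cannot ask the client
        # anything in between.
        self.round_trips += 1
        staged = dict(self.rows)   # apply the function's writes to a copy...
        result = txn(staged)
        self.rows = staged         # ...then install them all at once
        return result


def make_upsert(key: str) -> Callable[[dict[str, int]], int]:
    def txn(rows: dict[str, int]) -> int:
        # Same lookup-then-branch logic, executed inside the database.
        current = rows.get(key)
        rows[key] = 1 if current is None else current + 1
        return rows[key]
    return txn
```

The same lookup-then-branch logic runs, but the client pays one round trip per transaction instead of one per statement.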
Having the database, which is not really a single thing, but a swarm of computers spread across the globe separated by unreliable links who are trying to stay in consensus, pause their work to hear back from the client just seems ... well it seems miraculous that it can work at all.
The Calvin way is so much easier, it seems like there must be some very good reason that it's not what CockroachDB does. But I've never heard what that reason is.
Calvin has been an elegant protocol to work with in practice, and has pretty radically simplified FaunaDB's implementation of transactions compared to classic 2PC. Writes are committed in one global communication exchange, read isolation is pretty straightforward, and not requiring transaction recovery cuts out a significant amount of complexity which tends to be overlooked.
In talking with others, my best guess as to why we've seen relatively few implementations of it in the wild is that it is just less well understood compared to 2PC, so misconceptions propagate. The original paper focuses on how it works, rather than how to apply it in detail to generic transaction processing, which perhaps is a shame in hindsight considering that is where most of the confusion lies, IMHO.
For example, there is no reason stemming from Calvin that FaunaDB cannot provide full SQL-style session transactions. We chose not to implement them because they aren't a good fit for the core use-cases the system currently targets. Specifically, interactive transactions are too chatty for client-server interactions over the internet where link latency dominates an application's perceived speed: Instead, FaunaDB's interface encourages packing as much logic into as few requests as possible. (But I suppose that's a topic for another comment thread.)
Would that include transactions for which the reads can query the whole database as opposed to a predetermined set of rows?
The SQL support? Yes, even that.
> That function can do whatever it wants, including any number of reads, writes, and arbitrary logic based on the reads. But critically, it can't talk back to the calling client, except to send the final result.
If that function can still do everything the client did and the client still has to wait for the transaction, you are only eliminating the overhead of interactive communication, not actually improving or simplifying anything fundamentally. There is still consensus and coordination happening during that wait, and all of this is still fundamentally incompatible with computers spread across the globe communicating over unreliable links.
What would actually be a big improvement is eliminating the wait for coordination, but that would require some change in the programming model.
> The Calvin way is so much easier, it seems like there must be some very good reason that it's not what CockroachDB does.
It's just not "so much easier", that's the reason.
Nobody in industry understood how to apply Calvin until we did at Fauna. That is the only reason; the rest is engineering path dependence.
If you don’t mind sharing, I’m curious to learn more about what the main challenge was in applying Calvin.
See @freels’ reply above.
I think it is fair to say that the Calvin paper is visibly incomplete and expresses some constraints in a way that makes them seem insurmountable when they are not; specifically, they are only constraints within the log, but do not constrain the database experience overall.
Applying Calvin to traditional RDBMS workloads was a very unlikely creative exercise because it required questioning these explicit constraints.
The Spanner paper also leaves a lot unexplained, but it is less obvious until you are too far down the path to turn back. After all, it worked for Google. Calvin did not have that real-world proof. Combine that with the pessimism of the paper itself and nobody was willing to pick it up.
Wouldn't you have to implement a DSL and all the parts related to it for that to work? Also things like serialization cost. What if I have a large local in-memory structure I want to base my query results on? Funnily enough, this is kind of similar to the problem solved by things like Apache Beam.
No, you'd need a complete programming language. But SQL is basically one already; most variants are already Turing-complete.
You'd also have to provide the programming environment with concepts of cursors and so on so they could page through data efficiently.
Every procedural layer I've ever used bolted onto SQL (pl/SQL, t-sql) has been absolute goddamned agony to use.
SQL is a good (if dated) language for relational access and manipulation, but awful for procedural scripting.
This article implies that it reduces latency.
I like the elegance and simplicity of two-phase commits. I didn't understand the criticism in the article; maybe it's something specific to CockroachDB. In my experience with two-phase commits, if the system crashes before a transaction is fully processed and committed, it should be fully reprocessed (from scratch) once the server restarts.
In one of my open source projects with RethinkDB (which doesn't natively support atomic transactions), I implemented a distributed 'parallel' two-phase commit mechanism by assigning each pending record a shard key (an integer derived from a hash of the record id). Worker servers would then decide which subset/range of DB records to process/commit based on a hash of their own server id (which told them which range of shard keys/records they were responsible for). Only when a record had been fully processed would its status be updated to committed, by writing a 'settled' flag on that record.
Whenever a server failed and restarted, it would pick up processing from the last successful commit. If a server did not restart, the worker count would be updated and remaining workers would redistribute the partitions among themselves based on the shard keys of the records.
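One way to read the worker assignment described above, as a Python sketch. Everything here is an assumption for illustration: `NUM_SHARDS`, the SHA-256-based `stable_hash`, and dealing shard keys round-robin to workers ranked by the hash of their ids are not taken from the project itself. `owner_id` can be the record id or, per a later comment, the owning account id.

```python
import hashlib

NUM_SHARDS = 64  # hypothetical number of shard keys


def stable_hash(text: str) -> int:
    # Python's built-in hash() is salted per process, so use a digest instead.
    return int.from_bytes(hashlib.sha256(text.encode("utf-8")).digest()[:8], "big")


def shard_key(owner_id: str) -> int:
    return stable_hash(owner_id) % NUM_SHARDS


def shards_for(worker_id: str, live_workers: list[str]) -> set[int]:
    # Every worker ranks the live workers the same way (by hash of their id),
    # then shard keys are dealt out round-robin by rank.
    ranked = sorted(live_workers, key=lambda w: (stable_hash(w), w))
    index = ranked.index(worker_id)
    return {s for s in range(NUM_SHARDS) if s % len(ranked) == index}


def records_to_process(worker_id: str, live_workers: list[str],
                       records: list[dict]) -> list[dict]:
    # A worker only picks up unsettled records whose shard key it owns.
    mine = shards_for(worker_id, live_workers)
    return [r for r in records
            if not r["settled"] and shard_key(r["owner_id"]) in mine]
```

When a worker disappears, the survivors recompute `shards_for` and the orphaned shard keys are redistributed. This assumes all workers agree on the list of live workers.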
The "criticism" as it pertains to CockroachDB and off-the-shelf 2PC is less about 2PC in isolation and more about layering 2PC on top of the consensus groups used to persist records. When a txn does the "prepare" phase, it lays down markers for a possible upcoming commit. If the 2PC coordinator fails at an inopportune moment, there's a delay between the failure and the markers being cleaned up (whether the transaction is "reprocessed" or aborted). This delay is problematic because any subsequent transaction that happens upon those markers has to wait for the resolution ("committed"/"aborted"), i.e. it blocks. So recovery clearly must be built into 2PC, i.e. the transaction state itself must be persisted. This is done the same way the markers/regular writes are persisted, through consensus. But the transaction state can only be marked "committed" once we're guaranteed that all the individual write markers are persisted, which adds a second round of consensus.
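A back-of-the-envelope Python sketch of the latency difference described above, assuming each consensus write costs one consensus round trip and that writes issued in parallel overlap completely. `commit_latency_ms` is a hypothetical helper and the numbers are illustrative, not measurements.

```python
def commit_latency_ms(consensus_rtt_ms: float, parallel_commits: bool) -> float:
    """Latency of the commit phase under the simplifying assumptions above."""
    intents_round = consensus_rtt_ms     # replicate the write markers (intents)
    txn_record_round = consensus_rtt_ms  # replicate the transaction record
    if parallel_commits:
        # The STAGING record is written alongside the intents, so they overlap.
        return max(intents_round, txn_record_round)
    # Classic 2PC over consensus: COMMITTED is written only after every
    # marker is known to be durable, so the rounds run back to back.
    return intents_round + txn_record_round
```

With a hypothetical 100 ms cross-region consensus round trip, `commit_latency_ms(100, False)` gives 200 and `commit_latency_ms(100, True)` gives 100: parallel commits remove one sequential consensus round from the commit path.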
> In my experience with two-phase commits, if the system crashes before a transaction is fully processed and committed, it should be fully reprocessed (from scratch) once the server restarts.
Then the problem becomes, in a fully distributed system how do you know which was the last successful commit before the system crashed?
That 'settled' flag you mention may be set in the coordinator node's storage just before the coordinator crashed, but not communicated to any other node because of the crash.
So the other nodes have to wait for the coordinator to restart, and replay its storage, to find out if that transaction was successful.
That pause can be quite long, especially if it involves a coordinator rebooting, and really long if it involves resynchronising a RAID, checking filesystems/DB integrity, etc.
The other nodes could decide it doesn't matter, exclude the coordinator from the cluster, and reprocess starting from an older transaction. But then they have to agree which older transaction - a distributed consensus problem.
This is certainly possible, but I wouldn't call it simple, and it wouldn't be two-phase commit any more.
Restarting older transactions tends to be visible to clients as well, because they may see transaction retries, and it's often desirable to minimise the number of those from a distributed database for minor failures (such as one node harmlessly going offline). Among other things, they can cause load spikes in the large system, and may exercise retry corner cases that don't come up normally, surfacing bugs that shouldn't be there in application code, but are.
This is about reducing the number of message delays before the commit succeeds. Failure scenarios have to be handled to be correct, but this is a performance optimization primarily from what I can see.
> Whenever a server failed and restarted, it would pick up processing from the last successful commit. If a server did not restart, the worker count would be updated and remaining workers would redistribute the partitions among themselves based on the shard keys of the records.
You don't describe how you come to a global view of which is the "last successful commit". Do you mean that the co-ordinator recovers (such as from its own log) and uses that information, or do you mean you use local information? In the latter case, what you describe doesn't sound like it preserves transaction atomicity across node crashes.
In RethinkDB, a table can be split into shards and each shard has at most one master and multiple replicas. With one master to perform all writes for a shard, there is no need for a co-ordinator. If the master of a shard goes down, a replica can be promoted as the master. For writes, RethinkDB prioritizes consistency over availability so a server failure may carry some downtime in terms of writes but reads can be configured to be highly available (though they could potentially be out of date).
But there is definitely a downside that if a RethinkDB master server fails immediately after a write (before it propagated to a replica) and does not recover, then there can be some data loss (but only the most recent writes on that shard).
So if I'm reading this right, the master process can lose writes (if there's permanent failure). So taking your 2PC scheme above with the 'settled' flag, you can respond to the client that the txn has been committed once you've written the flag, but this commit marker can also just be entirely lost? (again, permanent failure)
If this 'settled' flag exists on each 'shard' instead of just the coordinator, any random subset of those too can be lost? I don't understand what's going on here, or what guarantees this 2PC implementation provides.
Actually, I may be mistaken in my previous comment. I'm not completely sure that this loss of recent data would happen as I've described. It depends on the client implementation. For example, a client could wait for a write to propagate to at least one replica before telling the caller that the data was inserted successfully. This is an implementation detail I'm not sure about.
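The difference that acknowledgement choice makes can be sketched with a toy primary/replica pair in Python. `Shard`, `wait_for_replica`, and `fail_primary_permanently` are hypothetical and do not model RethinkDB's actual write-acknowledgement settings; the sketch only shows which acknowledged writes can vanish if the primary dies for good.

```python
class Shard:
    """Toy primary/replica pair for reasoning about acknowledged-write loss."""

    def __init__(self, wait_for_replica: bool) -> None:
        self.wait_for_replica = wait_for_replica
        self.primary: list[str] = []
        self.replica: list[str] = []
        self.unreplicated: list[str] = []

    def write(self, value: str) -> bool:
        """Apply a write; returning True means the caller is told it succeeded."""
        self.primary.append(value)
        if self.wait_for_replica:
            self.replica.append(value)        # replicate before acknowledging
        else:
            self.unreplicated.append(value)   # would be shipped later, async
        return True

    def fail_primary_permanently(self) -> list[str]:
        """Promote the replica and return acknowledged writes that were lost."""
        lost = list(self.unreplicated)
        self.primary = list(self.replica)
        self.unreplicated = []
        return lost
```

If the 'settled' flag is acknowledged before it reaches a replica, a permanent primary failure can erase it even though the caller was told it succeeded.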
Also the settled flag exists on each record, not each shard. A shard is typically made up of multiple unsettled records. Each worker is assigned to a shard using a hash function so it's deterministic and the worker only processes unsettled transactions from their own shard.
Also I said something else misleading in one of my previous comments. In my case, the shard key of each record (which determines which shard a record belongs to) was not based on its own record ID but on the account ID of the user who owns that record. So effectively the sharding was happening based on user accounts and it was designed so that the records created by an account could be processed independently of records created by a different account.
Given how critical preventing future intent writes is to the protocol to ensure safety during recovery, it'd be nice to have more detail on how that works. Calling it an in memory data structure doesn't exactly inspire confidence.
Agreed, we should be talking about the "timestamp cache" in more detail generally. While I'm here, looking at  helped me confirm how everything is kosher despite being in-memory. The timestamp cache pessimistically maintains a "low water mark", this always ratchets up monotonically and represents the earliest access timestamp of any key in the range. Writes happening at timestamps lower than this water mark are not let through, and bumping this watermark past the point of the observed write intent's timestamp is how slow inflight write intents are aborted by recovering read requests. On server restart, the timestamp cache is initialized with a low water mark of the current system time + maximum clock offset, so "future" write intents (or more accurately: write intents sent in the past but previously stuck in transit) are simply rejected.
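A simplified Python model of the timestamp cache as described in this comment. `TimestampCache` and its methods are hypothetical names, and integer timestamps stand in for CockroachDB's hybrid-logical-clock timestamps. One simplification to flag: here a write at or below the cache is simply refused, whereas (as I understand it) CockroachDB generally pushes such a write to a higher timestamp; for a STAGING transaction's in-flight write, not landing at its original timestamp is what makes the implicit commit condition fail.

```python
class TimestampCache:
    """Per-key read timestamps plus a range-wide low water mark (simplified)."""

    def __init__(self, low_water: int) -> None:
        self.low_water = low_water
        self.reads: dict[str, int] = {}

    def max_read_ts(self, key: str) -> int:
        return max(self.low_water, self.reads.get(key, self.low_water))

    def record_read(self, key: str, ts: int) -> None:
        # Entries only ratchet upward; recovery uses this to "prevent" a write.
        self.reads[key] = max(self.reads.get(key, ts), ts)

    def write_allowed_at(self, key: str, ts: int) -> bool:
        # A write may only land above every timestamp the key was read at.
        return ts > self.max_read_ts(key)

    @classmethod
    def after_restart(cls, now: int, max_clock_offset: int) -> "TimestampCache":
        # Nothing survives the restart, so pessimistically assume every key
        # may have been read up to now + max clock offset.
        return cls(low_water=now + max_clock_offset)
```

After a restart, any write still in transit with a timestamp at or below `now + max_clock_offset` is turned away, which is why losing the in-memory state does not let a stale in-flight write sneak in.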
Wow, this is impressive.
What I'll say might come off as naive, but I offer it as an attempt at a reductionist's viewpoint.
A single update that has side effects in N places, where N is greater than 1, will always lead to either a race condition or latency.