
Concurrent Compaction and Ingestion Analysis


Analysis of a case in which Druid loses data, and which led to the tombstone feature. This sounds like an issue fundamental to the Druid design rather than a simple code flaw. This paper analyzes the scenarios from a theoretical basis so we can clearly see our options.

The analysis simplifies the problem by considering only one segment interval, say yesterday. It uses diagrams to illustrate the scenarios in which visible segments are shown as dashed lines, overshadowed segments by “X”s. We work through a complete set of scenarios, starting from the simplest. If you see a missing scenario, please point it out. Red section headings indicate incorrect results.

The closing sections present a potential new versioning system that overcomes the problems with the current system. A final section takes a shot at what we can do short-term.

Batch Ingest Only

The simplest scenario is when we only do ingestion.

Single Segment

We start with ingesting a single segment. The first segment loaded creates the version.

Success

At the completion of a first batch ingestion we have one segment labeled with our version.

|--------------------| Segment s1, Version v1
|....................|
   Segment Interval

Everything is good.

Failure

If the ingestion fails, we have no segments at all:

|....................|
   Segment Interval

Good here as well.

Second Segment

Suppose we then run a second batch job which adds a segment. According to Druid’s rules, that second segment takes the same version as the first segment.
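To keep the scenarios below concrete, here is a minimal sketch of that rule (illustrative Python, not Druid's actual allocator code): an appending ingest reuses the latest version already published for the interval, and only the first segment in an empty interval mints a new version.

```python
from datetime import datetime, timezone

def assign_append_version(existing_versions):
    """Version for an appending (batch ingest) task over one segment interval.

    existing_versions: versions already published for this interval.
    Appends reuse the latest existing version; the first segment in an
    empty interval mints a new version (Druid-style versions are timestamps).
    """
    if existing_versions:
        return max(existing_versions)
    return datetime.now(timezone.utc).isoformat()

# First ingestion into an empty interval creates v1; the second reuses it.
v1 = assign_append_version([])    # new version
v2 = assign_append_version([v1])  # same as v1
assert v1 == v2
```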

Sequential - Success

Suppose we are now at the same state as the success case above: single segment s1 with version v1. We now load a second one. At completion:

|--------------------| Segment s2, Version v1
|--------------------| Segment s1, Version v1

(From here on, we won’t show the segment interval itself for brevity, unless the interval is empty.)

All is good: the two segments have the same version, so both are visible to queries.

Sequential - Failure

Suppose the second job fails. No harm, we are right back where we started after the first ingestion:

|--------------------| Segment s1, Version v1

Concurrent - Success

Now, suppose instead of sequential, we run the two ingestion jobs in parallel: j1 and j2. Here we must make an assumption. The metastore DB serializes the two lock requests; let us assume that j1 occurs first and is given version v1. We must assume that, when j2 asks for a lock just afterward, it will get the same version: v1. Then, assume j1 finishes first. We have the familiar pattern:

|--------------------| Segment s1, Version v1

When job j2 finishes, we have the same pattern as the sequential case:

|--------------------| Segment s2, Version v1
|--------------------| Segment s1, Version v1

Concurrent - Failure

Now let’s consider what happens in a failure. We consider two cases: job j1 fails, or j2 fails. We again must make an assumption: that the version v1 assigned to j1 persists even after j1 dies. It is somehow transferred to j2. In this case, if job j1 dies and j2 completes, we get:

|--------------------| Segment s2, Version v1

If, however, j1 completes and j2 dies, we get:

|--------------------| Segment s1, Version v1

These are both good cases. We require that, in both cases, v1 be considered the latest version. In particular, if this scenario is followed by a batch run j3, which creates segment s3, we want:

|--------------------| Segment s3, Version v1
|--------------------| Segment sx, Version v1

Where “x” is 1 or 2 depending on which concurrent job failed. If this is the outcome, then we’ve satisfied the assumption made earlier: the version persists as long as some segment was created with that version.

Real-Time Ingestion

In real-time ingestion, a segment is queryable while it is being created. We again assume the same serialized locking behavior. A bit of thought suggests that the same scenarios and outcomes hold. It is an exercise for the reader to check this assertion.

Concurrent Batch Ingestion and Compaction

With the above basics in hand we can now dive into the complex cases, some of which won’t end as nicely as the above.

We should start by defining what compaction does. Ingestion produces a set of segments with a shape dependent on the input data, especially in the Kafka case in which we can’t reliably shuffle data at ingest time. Compaction works independently for each segment interval. It takes a “bundle” of existing segments, shuffles the rows to change partitioning or better balance segments, then writes a new “bundle” of segments at a new version.

This works fine if compaction is the only task running against that bundle. We run into problems when compaction works concurrently with other tasks. Unfortunately, however, concurrency is the name of the game with systems at scale.

Sequential Ingest then Compaction

Here, we imagine we’re at the end of the single-segment success case:

|--------------------| Segment s1, Version v1

Compaction now starts and creates a new, unpublished version v2. We use the “*” sign to suggest it is unpublished:

|********************| Segment s2, Version v2
|--------------------| Segment s1, Version v1

Queries use v1 during compaction. All good.

Success

If compaction completes, segment s1 is overshadowed by s2 as we expect:

|--------------------| Segment s2, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1
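To make the overshadow rule used in these diagrams concrete, here is a small illustrative helper (again a sketch, not Druid code): within one segment interval, only segments carrying the highest published version are visible; everything at an older version is overshadowed.

```python
def visible_segments(published):
    """Split published segments for one interval into visible vs. overshadowed.

    published: list of (segment_id, version) pairs. Only segments carrying
    the highest version are visible; the rest are overshadowed.
    """
    if not published:
        return [], []
    latest = max(version for _, version in published)
    visible = [s for s in published if s[1] == latest]
    overshadowed = [s for s in published if s[1] != latest]
    return visible, overshadowed

# The compaction success case: s2 at v2 overshadows s1 at v1.
print(visible_segments([("s1", "v1"), ("s2", "v2")]))
# ([('s2', 'v2')], [('s1', 'v1')])
```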

Failure

If failure occurs, we discard segment s2 and we’re back where we started:

|--------------------| Segment s1, Version v1

Sequential Ingest, then Compaction, then Ingest

Let’s take the challenge up a notch. Suppose we follow each of the above by another batch ingestion into the same segment interval. It is essential that this segment take the correct version.

If compaction succeeds, segment s3 must take version v2:

|--------------------| Segment s3, Version v2
|--------------------| Segment s2, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

However, if compaction failed, the segment s3 must take version v1.

|--------------------| Segment s3, Version v1
|--------------------| Segment s1, Version v1

Because we assume that the second ingestion starts after compaction completes there should be no ambiguity about the version. Any errors would simply be coding errors we can fix.

Concurrent Compaction and Ingestion

Now we get to the hard part. Keep the above section in mind. Now, imagine that we do compaction concurrently with our second ingestion to produce segment s3. We’ve got an unresolvable race condition.

Compaction Starts First

Suppose that the compaction job, c, starts before the second ingestion job j2, but both run concurrently, with just, say, a second between the starts.

Both Succeed

If both tasks succeed, we want the ingested segment s3 to have the same version as the now-compacted data from s1, now in s2:

|--------------------| Segment s3, Version v2
|--------------------| Segment s2, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

This is the same as if the compaction ran first, followed by ingestion, and we get the right answer.

Ingestion Fails

If ingestion fails, segment s3 is never published, and all is good:

|--------------------| Segment s2, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

Compaction Fails

Now the bombshell drops. If the compaction job fails, we’re in deep trouble. The ingest job only knows the version to use when it starts: it has no way to adjust later. So, we end up as in the “both succeeded” case, but without s2:

|--------------------| Segment s3, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

This is data corruption, data loss: the data from segment s1 is overshadowed by the new ingestion job. What we actually want, but cannot get, is:

|--------------------| Segment s3, Version v1
|--------------------| Segment s1, Version v1

Take time to study the above logic and why we get the error. The basic assumption is that segment versions are assigned at the start of ingestion and cannot be changed later.
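A tiny simulation of that assumption (invented names, the same toy visibility rule as above; not Druid code) shows the failure: both tasks pick their versions when they start, so when compaction dies the ingested segment still publishes at v2 and overshadows s1.

```python
def visible(published):
    """Segments at the highest version win; the rest are overshadowed."""
    latest = max(v for _, v in published)
    return [s for s in published if s[1] == latest]

# Versions are fixed at task start and never revisited.
compaction_version = "v2"   # compaction c starts first, mints v2
ingest_version = "v2"       # ingest j2 starts a second later, sees v2

published = [("s1", "v1")]  # state before either task finishes

# Compaction fails: s2 is never published. Ingestion succeeds anyway.
published.append(("s3", ingest_version))

print(visible(published))   # [('s3', 'v2')] -- s1 is overshadowed: data loss
```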

Ingestion Starts First

Now, consider the same scenario, but suppose that ingestion starts a second before compaction. In this case, ingestion will be assigned the current version, v1, and compaction will define a new version v2. Here we have the opposite case from above. As it turns out, the only way to get the right answer is if one of the tasks fails. If they both succeed, we corrupt the user’s data.

Both Succeed

Suppose, in this scenario, that both tasks succeed. Our new data is overshadowed along with the old data, even though the new data was not compacted, because it didn’t exist when compaction started:

|xxxxxxxxxxxxxxxxxxxx| Segment s3, Version v1
|--------------------| Segment s2, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

Again, we’ve lost data: this time, the new data.

Ingest Fails

Suppose the ingestion fails. If so, we’re fine:

|--------------------| Segment s2, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

Compaction Fails

Suppose compaction fails. If so, we’re actually right where we want to be:

|--------------------| Segment s3, Version v1
|--------------------| Segment s1, Version v1

Discussion

Careful analysis of the above should suggest that there is no fix for concurrent ingestion and compaction tasks. No matter what clever thing we do with versions, we can make either the success or failure case work, but not both.

Exclusive Locks

The obvious fix is to prevent concurrent tasks: make compaction and ingestion mutually exclusive.

The above is a highly simplified case: we assumed that ingestion knows its target segment interval up front. However, in practice, such intervals are determined by data: we don’t know what intervals ingestion will lock until we read the data.

Because of this delay in knowing the intervals, the user experience with exclusive locks is horrible.

  • Suppose a batch ingestion task starts and mostly reads into some interval i2. Concurrently, a compaction task rewrites interval i1. Right at the end of the ingestion job, the ingestion task reads data into i1. The entire long batch ingestion task fails and all data written to both i1 and i2 must be discarded, so the task can reliably restart. (If we only discard data for interval i2, then we can’t restart the task.)
  • Suppose we do it the other way: in the above scenario, the ingestion task (which touched i1 at the last moment) kills the compaction task. In this case, we discard all the work for compaction and must start over. A single row of late-arriving data every once in a while causes churn as the system attempts over and over to compact i1, and fails.

Thus, because of the ingest time delay, exclusive locks are not a good solution.

Dynamic Versions

We might observe that the problems above arise because ingestion must pick its segment version early, but the version we should have picked depends on the future success or failure of compaction. Since we’re still working on our time travel enhancement, we might suggest postponing obtaining the version as long as possible. There are two issues.

First, if writing to S3, there is no atomic rename: we have to choose the file name before writing. This is the gist of the problem: we can’t choose version v1, then later switch to v2.

Even if we could rename (locally or on HDFS), doing so would require more coordination:

  • Lock the file (get a version) when the file is to be created.
  • Confirm the lock (get a different version) as a pre-publish step.
  • Publish the segment.

Even here, there is a time window between “confirm” and “publish” where a race condition could occur. So, perhaps we combine the confirmation with publish:

  • Lock the file (get a version) when the file is to be created.
  • Publish the segment (get a different version).

Now we have a race condition the other way: the Coordinator starts asking historicals to load the new segment before ingestion renames the segments to the new name.

Clearly we’re not making headway here.

Version Overlays

We note that the problem with not knowing which version to use might be overcome if a version could be marked as “extend the prior version, whatever it is.” Then, in our concurrent ingestion and compaction scenario, compaction starts, followed by ingestion. Compaction gets a “hard” version v2, ingestion gets a “soft” version v3.

If both succeed:

|--------------------| Segment s3, Version v3 (overlay for v2)
|--------------------| Segment s2, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

If compaction fails:

|--------------------| Segment s3, Version v3 (overlay for v1)
|--------------------| Segment s1, Version v1

Hooray! Both cases now work. The “overlay for vx” is computed: version v3 adds to whatever version is just underneath it.

Now consider if ingestion starts first, then compaction. If both succeed:

|xxxxxxxxxxxxxxxxxxxx| Segment s3, Version v2 (overlay for v1)
|--------------------| Segment s2, Version v3
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

We’re out of luck. In this case, the version we want to add to is a future version, not a past version. This shows that the overlay solution is a bit too much of a hack to work.
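A sketch of how such overlays might resolve (purely hypothetical, since no such feature exists): a soft version folds its segments into the nearest hard version beneath it, which works when compaction holds the hard version but silently drops the ingested data when the order is reversed.

```python
def resolve_overlays(stack):
    """stack: versions ordered oldest to newest; each entry is
    (name, segments, is_overlay). An overlay folds its segments into the
    nearest hard version below it; hard versions overshadow as usual."""
    resolved = []                        # list of (hard_version, segments)
    for name, segments, is_overlay in stack:
        if is_overlay and resolved:
            resolved[-1][1].extend(segments)   # attach to prior hard version
        else:
            resolved.append((name, list(segments)))
    return resolved[-1][1]               # visible = newest hard version's segments

# Compaction (hard v2) then ingest (soft v3): both survive.
print(resolve_overlays([("v1", ["s1"], False),
                        ("v2", ["s2"], False),
                        ("v3", ["s3"], True)]))   # ['s2', 's3']

# Ingest (soft v2) first, then compaction (hard v3): s3 attaches to v1 and is lost.
print(resolve_overlays([("v1", ["s1"], False),
                        ("v2", ["s3"], True),
                        ("v3", ["s2"], False)]))  # ['s2'] -- s3 dropped
```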

Root Cause

We can now look at the root cause of the problem: the proper ordering of versions depends not just on a time sequence within a segment interval, but also on the logical relationship between segments. This points us toward Jihoon’s segment locking idea.

When compaction runs, it replaces some bundle of segments within a segment interval. Ingestion creates another bundle of segments within that interval. Logically, these two bundles have no dependence on one another: they represent independent sets of rows. Imposing a time ordering just means we’re trying to use the weather in New York to decide what to wear in San Francisco: they are unrelated events.

Before we get too excited, however, we have to introduce one more complexity: data replacement.

Sequential Replacement

Suppose we revisit the original Druid use case: the data of record resides in HDFS (or S3), and Druid holds a materialized view of that data. We wish to update our view by replacing time chunks of data. We start, as before, with ingestion of a single segment:

|--------------------| Segment s1, Version v1

Replacement creates a new segment, s2, with version v2, which overshadows v1:

|--------------------| Segment s2, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

If replacement fails, we’re back where we started and all is good.

Partial Replacement

Druid just completed an arduous effort to add tombstones to handle partial replacement. Let’s understand the issue. Suppose the interval we are on is March 3. A replacement job replaces data for March 2-4. But, there is no new data for March 3. Before that fix:

|---------|         |---------| New segments, version v2, -, v2
|XXXXXXXXX|---------|XXXXXXXXX| Original segments, version v1, v1, v1
|.........|.........|.........|
  March 2   March 3   March 4

Here we created new segments for data on March 2 and March 4. Since a segment can’t be empty, and there was no data on March 3, no new segment is created for that day and the prior data “shows through.”

The versions are shown as “v1” and “v2” for simplicity. In reality, the versions across segment intervals are uncorrelated; we only know that “v2” comes after “v1” for each interval.

The tombstone fix is to introduce a special marker, the tombstone, to indicate that there is no data:

|---------|nnnnnnnnn|---------| New segments, version v2, v2, v2
|XXXXXXXXX|XXXXXXXXX|XXXXXXXXX| Original segments, version v1, v1, v1
|.........|.........|.........|
  March 2   March 3   March 4

Where the “n” symbols represent a line of tombstones.
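A small sketch of why the tombstone matters (illustrative only): resolving each day independently by the highest version present, a missing v2 entry for March 3 lets the v1 segment show through, while a v2 tombstone correctly reports the day as empty.

```python
def visible_by_day(entries):
    """entries: list of (day, version, kind), kind is 'data' or 'tombstone'.
    For each day, the highest version wins; a winning tombstone means the
    day is (correctly) empty."""
    result = {}
    for day, version, kind in entries:
        best = result.get(day)
        if best is None or version > best[0]:
            result[day] = (version, kind)
    return result

# Without a tombstone, March 3 keeps showing the old v1 data.
print(visible_by_day([("03-02", "v1", "data"), ("03-03", "v1", "data"),
                      ("03-04", "v1", "data"), ("03-02", "v2", "data"),
                      ("03-04", "v2", "data")]))
# {'03-02': ('v2', 'data'), '03-03': ('v1', 'data'), '03-04': ('v2', 'data')}

# With a tombstone at v2, March 3 correctly reports no data.
print(visible_by_day([("03-02", "v1", "data"), ("03-03", "v1", "data"),
                      ("03-04", "v1", "data"), ("03-02", "v2", "data"),
                      ("03-03", "v2", "tombstone"), ("03-04", "v2", "data")]))
# {'03-02': ('v2', 'data'), '03-03': ('v2', 'tombstone'), '03-04': ('v2', 'data')}
```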

Concurrent Replacement

Now, suppose that replacement occurs concurrently with ingestion. That is, for some odd reason, the user replaces an interval at the same time that some normal batch ingest job adds data to that day. We again have race conditions. Let’s call the replacement job r, which produces segment sr, and the ingest job j, which produces segment sj.

Replacement Starts First

Suppose our data starts as before:

|--------------------| Segment s1, Version v1

In the first scenario, the replacement job r starts first, followed a second later by the ingest job j. The replacement job gets a new version, v2. The ingest job arrives and asks to append to the latest version. The latest version is v2, so that’s what the ingest job uses.

Right about now, our intuition should be telling us that this will end up like the compaction scenarios. Let’s see.

Both Succeed

If both tasks succeed, we end up with the following.

|--------------------| Segment sj, Version v2
|--------------------| Segment sr, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

This actually makes sense. Since the ingestion started after replacement, we can assume ingestion is adding to the new data, which is what we got.

Replacement Fails

Suppose replacement fails. We now have corruption:

|--------------------| Segment sj, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

The old data is overshadowed, but the replacement data didn’t survive.

Ingestion Fails

If ingestion fails, we’re in good shape, only the replacement occurs:

|--------------------| Segment sr, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

Ingestion Starts First

Now, assume ingestion starts first, followed a second later by replacement. We can argue two cases:

  • Replacement should replace all data, even that arriving just now.
  • Replacement should replace only data that arrived before replacement started.

We note that there has to be some cutoff: we can’t replace data that arrives after the replacement task ends. So, we have to impose some kind of ordering. Today, that ordering comes from version assignment. The ingestion job j will start with an append operation, on top of the version current at the time ingestion starts: v1. Replacement then creates a new version, v2.

Ingestion Succeeds First

Suppose our ingestion task, which started first, succeeds first. Before replacement is complete, we have:

|--------------------| Segment sj, Version v1
|--------------------| Segment s1, Version v1

If the replacement job r were to fail, then we’d be left with the above as well.

Once replacement completes, we have:

|--------------------| Segment sr, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment sj, Version v1
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

The replacement overshadowed both, which is what we expect if we consider the ingest to be part of the data to replace.

If the ingestion fails, we’re left in effectively the same state:

|--------------------| Segment sr, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

However, if we then restart and rerun ingestion, we end up in a different state:

|--------------------| Segment sj, Version v2
|--------------------| Segment sr, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

Now the new data adds to the replacement set. This says that the relationship between the two data sets depends on the order things happen in Druid rather than on some relationship determined by the application. This is probably more of a bug than a feature.

Replacement Succeeds First

Suppose that the replacement task completes first, though the ingestion started first. Now we have:

|--------------------| Segment sr, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

If the ingestion fails, we are left with the above state. If ingestion succeeds, we now have:

|xxxxxxxxxxxxxxxxxxxx| Segment sj, Version v1
|--------------------| Segment sr, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

The data is overshadowed as soon as it is published. This is consistent with the above behavior, which is good: we get the same result independent of the order that tasks complete.

Concurrent Replacement and Compaction

Let’s really exercise our brains: what happens if we replace and compact at the same time? We have a compaction job, c, which produces segment sc, and a replacement job r that produces segment sr. Again, we focus on one segment with version v1:


|--------------------| Segment s1, Version v1

Compaction Starts First

Now let’s assume that compaction job c starts a second before the replacement job r. In this case, c will get the next version: v2. Replacement will get the version after that, v3.

Both Succeed

If both jobs succeed:

|--------------------| Segment sr, Version v3
|xxxxxxxxxxxxxxxxxxxx| Segment sc, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

Here, the compaction job correctly overshadowed the original segment, and the replacement overshadows the (somewhat futile) compaction. This is what we’d expect.

Compaction Fails

If compaction fails, the replacement succeeds and overshadows the original data correctly:

|--------------------| Segment sr, Version v3
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

Replacement Fails

If the replacement fails, then the compaction overshadows the original data correctly:

|--------------------| Segment sc, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

Replacement Starts First

Now, assume that replacement starts first so that sr is given v2, while the compaction starts a second later and is given the version v3.

Both Succeed

Suppose that both tasks succeed. Version v3 takes precedence:

|--------------------| Segment sc, Version v3
|xxxxxxxxxxxxxxxxxxxx| Segment sr, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

This is data corruption: the compacted v1 data overshadows the replacement v2 data.

Replacement Fails

If the replacement task fails, we are correctly left with only compaction:

|--------------------| Segment sc, Version v3
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

Compaction Fails

If the compaction task fails, we are correctly left with only replacement:

|--------------------| Segment sr, Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1

This state is also a race condition: we will be here temporarily if both succeed. Thus, the user will see, at three different times:

  • Original data
  • Replacement data
  • Original data

Partial Overshadowing

Druid has one more trick up its sleeve: the ability to ingest replacement data at a finer grain than the original data. We’ve been using day segment grain. Now let’s mix day and hour grains.

Complete Replacement

Let’s start simple. We again focus on a single day with a day grain segment. Now, we replace all 24 hours, with no concurrency. The new segments are sr(h), where h is an hour. The end result is:

|---|---|---| ... |---|---|---| Replacement segments, sr(h), version v2
|XXXXXXXXXXXXXXXXXXXXXXXXXXXXX| Original segment s1, version v1
|...:...:...: ... :...:...:...|
 00  01  02         21 22  23
            March 3

Partial Replacement

With the finer grain, we can replace a subset of hours, say the last three:

                  |---|---|---| Replacement segments, sr(h), version v2
|-----------------XXXXXXXXXXXX| Original segment s1, version v1
|...:...:...: ... :...:...:...|
 00  01  02         21 22  23
            March 3

Partial Replacement, With Tombstones

Now, consider the scenario above: we replace a range of hours, but some of them have no data. We insert tombstones in their stead:

                  |---|nnn|---| Replacement segments, sr(h), version v2
|-----------------XXXXXXXXXXXX| Original segment s1, version v1
|...:...:...: ... :...:...:...|
 00  01  02         21 22  23
            March 3

The tombstones produce the correct results when queried.

Concurrent Replacement

We could ask what happens if we try to replace the same segment concurrently in two replacement tasks. If we allowed this, we’d have the same race conditions we saw above, since compaction is a kind of replacement. Druid uses exclusive locks, however, to prevent concurrent replacement, so this can never occur.

Druid does allow concurrent replacement in non-overlapping segments. We can have one replacement job replace hours 00-03 on March 3, while a different job replaces hours 21-23. Note that such a combination sets up even more race conditions with compaction.

Segment Rewrites

We now explore one final set of use cases: a set which came up recently, and which we may work around using a different approach, yet which we should still consider. A user wanted to update tables to reflect changes to schema, such as dropping a column. In this scenario, every segment is independent: for the most part, removing column x from segment s1 can be done independently of removing column x from segment s2; no data need be shuffled between them.

Using compaction for this task is OK, but it is overkill: we don’t inherently need to read all segments in an interval for this task, though, of course, there is no real harm in doing so, other than wasting resources.

What we’d like is to work file by file. We can’t, of course, because we can’t version files, only segment intervals. Suppose we start with two segments in our interval.

|--------------------| Segment s1, Version v1
|--------------------| Segment s2, Version v1

We want to rewrite just segment s1 to create segment s1' (s1 prime). We can’t give it a new version because then s1' would overshadow s2, causing data loss:

|--------------------| Segment s1', Version v2
|xxxxxxxxxxxxxxxxxxxx| Segment s1, Version v1
|xxxxxxxxxxxxxxxxxxxx| Segment s2, Version v1

But, we can’t give s1' the existing version, because then we duplicate data:

|--------------------| Segment s1', Version v1
|--------------------| Segment s2, Version v1
|--------------------| Segment s1, Version v1

In short, although there are great reasons to want to replace individual segments, our versioning system won’t allow it.

Related Use Cases

The above is one example of a set of related use cases:

  • Upgrade segments with some new feature (say, indexes for numeric dimensions)
  • Repair a corrupted segment (due, say, to some bug somewhere)
  • Delete a row from a segment (because of a GPDR request, say)
  • Replace a row in a segment (to, say, patch an application error)
  • In the partial overshadow case, delete the overshadowed data, retaining the visible portion

In all such cases, we are forced to rewrite the entire segment interval to change anything at all. This should be telling us something is amiss.

Discussion, Part 2

We’ve now seen even more evidence that the Druid system works most of the time, but that there are inherent flaws in the concurrency system that no amount of patching can fix. We can only play Whack-a-Mole: fixing the problem in one scenario necessarily causes a different one to fail.

We can again ask about root causes. They are:

  • Operations which are inherently ambiguous (replacing data while adding new data)
  • Attempting to version an entire segment interval when some operations inherently work with subsets of files within an interval.
  • Needing to add data concurrently with replacing/rewriting data, but having only one version to use to manage two operations.

This is telling us that the versioning system, as clever as it is and as well as it has mostly served us, is incomplete.

Brainstorm a Replacement System

If the current system has holes, how might we fix it? One place to start is to consider similar problems, such as Git or log-based databases.

  • Not every object will be at the same version.
  • The file is the lowest level of versioning.

Druid already has support for the first idea. The idea of different interval grains during replacement leans toward the second.

Git-like Versions for Druid

Can we go all the way?

  • A table is a collection of versions.
  • A version is a collection of files and tombstones.

Each Druid table maintains a version log. Each log entry is one of:

  • A file addition.
  • A file deletion.
  • A file tombstone (a mask of some or all of the time interval for a file).

We assume that deletions and replacements are of time ranges.

There are only two fundamental operations:

  • Insert: add a new file.
  • Tombstone: mask all or a time range of an existing file.

They are combined to provide three higher-level operations.

  • Insert: a collection of file inserts.
  • Replace: a collection of tombstones and inserts.
  • Delete: a collection of tombstones.

In this model, there is no difference between an interval-based replacement (classic Druid replace) and a single-segment replacement (for Polaris segment rewrites.)

A version is a collection of inserts and tombstones applied as an atomic unit. The system moves from one version to the next atomically, applying all changes in that version as a unit.

We then must determine how to compute the “working set” of files. A working set consists of a list of files visible for querying, along with a partial tombstone mask over the file. A file is removed from the working set if the tombstone hides the complete file.

Logically, to obtain the working set for a version, we simply replay the version history:

  • Start with the first addition. Add each file to the working set.
  • On each insertion, add the new files to the working set.
  • On each tombstone, apply a “mask” to the file corresponding to the tombstone range. If the entire file is now masked, remove the file from the working set.

This is the “change-set” view: we construct the working set by applying changes.
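Here is a self-contained sketch of that replay, under the assumptions of this proposal (half-open (start, end) intervals, invented names): each version is an atomic batch of inserts and tombstones, and the working set maps each file to the portions of its interval still visible.

```python
def subtract(intervals, start, end):
    """Remove [start, end) from a list of disjoint [s, e) intervals."""
    out = []
    for s, e in intervals:
        if end <= s or e <= start:   # no overlap: keep as-is
            out.append((s, e))
            continue
        if s < start:                # keep the left remnant
            out.append((s, start))
        if end < e:                  # keep the right remnant
            out.append((end, e))
    return out

def replay(versions):
    """versions: list of atomic change sets, oldest first. Each change is
    ("insert", file, (start, end)) or ("tombstone", file, (start, end)).
    Returns the working set: {file: visible intervals}."""
    working = {}
    for changes in versions:
        for op, file, (start, end) in changes:
            if op == "insert":
                working[file] = [(start, end)]
            else:  # tombstone: mask the range; drop the file if fully masked
                working[file] = subtract(working.get(file, []), start, end)
                if not working[file]:
                    del working[file]
    return working

# v1 inserts s1; v2 replaces it (tombstone over s1's whole interval + insert sr).
v1 = [("insert", "s1", (0, 24))]
v2 = [("tombstone", "s1", (0, 24)), ("insert", "sr", (0, 24))]
print(replay([v1, v2]))   # {'sr': [(0, 24)]}
```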

The dual of the change-set view is to store the working set that results from each version. Given version v1 and a new version v2, we apply the changes in v2 to v1 and store the resulting working set along with the v2 version.

Note that since the above is file-based, the notion of interval only appears in tombstones. Clearly files must be labeled with their data intervals so we know how to work with them, but that interval is not part of file identity as in “classic Druid.”

Timeline

Versioning helps us build the timeline. It is clear that, given a set of files and their masks, we can apply the timeline algorithm to produce a timeline. In fact, we can do so even if files are not aligned at any particular segment boundary or segment size. Conceptually, we define a time chunk as a (file, interval) pair. If a file has no mask, the chunk is the whole file. If the file has one or more masks, then the file has one or more chunks: one for each non-contiguous interval visible in that file.

From there, we simply sort the chunks and bucket chunks into whatever intervals are the most useful for querying. We can even use a tree approach: build wide intervals, and split as the intervals begin to hold too many chunks.
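A sketch of that chunking step (same toy interval representation as above, not a real Druid API): each visible (file, interval) pair becomes a chunk, and chunks are sorted and bucketed into whatever query intervals we choose.

```python
def chunks(working_set):
    """Flatten a working set ({file: visible intervals}) into time chunks:
    one (start, end, file) triple per visible interval, sorted by time."""
    out = [(start, end, file)
           for file, intervals in working_set.items()
           for start, end in intervals]
    return sorted(out)

def bucket(chunk_list, bucket_size):
    """Group chunks into fixed-width query buckets keyed by bucket start.
    A chunk lands in every bucket its interval touches."""
    buckets = {}
    for start, end, file in chunk_list:
        b = (start // bucket_size) * bucket_size
        while b < end:
            buckets.setdefault(b, []).append((start, end, file))
            b += bucket_size
    return buckets

# One file masked in the middle yields two chunks; bucket them by 12 hours.
ws = {"s1": [(0, 6), (18, 24)], "sr": [(6, 18)]}
print(bucket(chunks(ws), 12))
# {0: [(0, 6, 's1'), (6, 18, 'sr')], 12: [(6, 18, 'sr'), (18, 24, 's1')]}
```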

Concurrency

Let’s now examine the case that failed earlier: replacement (true replacement or compaction) running concurrently with ingestion. Since we work at the file level, we’ll see that there is no conflict.

As before, we start with a single file (segment) at version v1:

v1: (+s1) [s1]

Now, since the file interval does not matter, we just show the working set for the version. The parens enclose the operations: we add file s1. The brackets enclose the working set: just file s1.

Again, we run a replacement task r that produces a replacement file sr, and ingest job j that produces a file sj.

Replacement Starts First

We assume that replacement r starts first. It is assigned version v2. Then, the ingest job j starts a second later, creating version v3. (Note that, in this model, all operations create new versions.)

Replacement Finishes First

If both succeed, we get one change-set for v2 and another for v3. The order of completion is important.

Let’s assume replacement finishes first. We then have:

v2: (-s1, +sr) [sr]
v1: (+s1) [s1]

A while later, ingest finishes, leaving us with:

v3: (+sj) [sr, sj]
v2: (-s1, +sr) [sr]
v1: (+s1) [s1]

The working set for v3 is exactly right.

Ingest Finishes First

Or, if the ingest finishes first:

v3: (+sj) [s1, sj]
v1: (+s1) [s1]

Now the tricky bit. Replacement finishes. We could do some surgery to insert v2 under v3, and replay the history from v1 through v2 to v3 to recreate the proper working set:

v3: (+sj) [sr, sj]
v2: (-s1, +sr) [sr]
v1: (+s1) [s1]

Or, we can be clever and notice that the two versions touch separate files. They are like Git commits that touch separate files. They can be replayed in either order. So, let’s just play them in completion order:

v2: (-s1, +sr) [sr, sj]
v3: (+sj) [s1, sj]
v1: (+s1) [s1]

Note that, in this formulation, we get the correct results where, in the Druid implementation, we got data corruption.
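A short check of that claim (a toy model using file identities only): because the two change sets touch disjoint files, applying them in either completion order yields the same working set.

```python
def apply_changes(working, changes):
    """Apply one change set of ('+', file) / ('-', file) entries to a set of files."""
    out = set(working)
    for op, file in changes:
        if op == "+":
            out.add(file)
        else:
            out.discard(file)
    return out

v1 = [("+", "s1")]
replacement = [("-", "s1"), ("+", "sr")]   # change set of task r (version v2)
ingest = [("+", "sj")]                     # change set of task j (version v3)

start = apply_changes(set(), v1)

# The two change sets touch disjoint files, so either completion order
# produces the same final working set.
assert apply_changes(apply_changes(start, replacement), ingest) == \
       apply_changes(apply_changes(start, ingest), replacement) == {"sr", "sj"}
```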

Replacement or Ingest Fails

From the above, it should be clear that we get the proper result if either task fails, since we already examined an intermediate state above, which would be the final state on failure.

Ingest Starts First

We now reverse the order: ingest starts first and is given version v2. Replacement starts just after and is given version v3. Not surprisingly, since version order didn’t matter above, it won’t matter here.

The one new bit is that we have to decide how to handle the race condition: we’re ingesting new data at the same time that we’re replacing data. The safest assumption is that replace means, “replace all data in the working set current at the time replacement starts.” If we do that, we completely separate the replacement and ingest tasks: the concurrent ingest hasn’t yet produced any segments, so they are not in the replacement task’s working set.

If we make this move, then the cases here can be described just by reversing the labels “v2” and “v3” above, and we end up with exactly the same result as before. This is very reassuring!

Segment Rewrites

Let’s now consider the case of replacing a single segment file concurrently with ingest. Before, we ran into problems because that one file needs a version both newer than, and consistent with, the existing version. Here we’ll find it “just works.”

The setup is that we have two segments for the same time interval and we want to replace s1:

|--------------------| Segment s1, Version v1
|--------------------| Segment s2, Version v1

In our new notation:

v1: (+s1, +s2) [s1, s2]

Now, let’s replace s1 with s1':

v2: (-s1, +s1') [s1', s2]
v1: (+s1, +s2) [s1, s2]

That was easy.

Discussion

We could repeat the analysis for the above cases. We can add replacement grains smaller than the segment grain, as done above. In every case, it should be clear that we will get the correct, consistent result. (We leave this verification as an exercise for the reader.)

The result works because we carefully avoided creating versions with conflicting actions. We were careful to do so because…there are no conflicting actions!

  • No insert can ever conflict with another insert: they are about different files.
  • No tombstone can ever conflict with another tombstone: they just combine.
  • No tombstone can conflict with an insert because no file is ever added and deleted in the same version.

There are certainly ordering considerations: we saw that above when deciding which data that a replacement covers. But, there are no inherent ambiguities or conflicts. (Ambiguity is wonderful in politics, not quite as helpful in software.)

Backward Compatibility

The above is wonderful, but what about the many existing Druid systems? How could those systems be upgraded to this new one? First, we note that the new system is file based, and the existing table certainly has files. Second, we notice we need an initial version, which we can directly read off of the timeline.

Thus, to convert, we build a timeline from all the existing segments. That timeline becomes a very large initial version. After that, we just apply the new system.
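A sketch of that conversion (same toy representation as above): the visible segments from the existing timeline become a single, insert-only first version of the new log.

```python
def initial_version(visible_segments):
    """Convert an existing timeline into the first version of the new log:
    one atomic change set that inserts every currently visible file.
    visible_segments: list of (file, (start, end)) read off the timeline."""
    return [("insert", file, interval) for file, interval in visible_segments]

# The converted table starts with one (possibly very large) version.
print(initial_version([("s1", (0, 24)), ("s2", (24, 48))]))
# [('insert', 's1', (0, 24)), ('insert', 's2', (24, 48))]
```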

Segment Size Optimization

We often hear many discussions around optimal segment grain. Bigger, to reach the 5M row target size? Smaller, to allow a shorter time to compaction. The file-version design offers another answer: whatever.

Since the file interval is not an essential part of the versioning system, it can be anything. Pinch off a file every five minutes, or when it reaches 5M rows. No need to split an incoming stream of records into strict intervals. There are optimization reasons to choose one size or another, but those choices are not constrained by a rigid segment interval.

Compaction works best if data is aligned, as we can replace an entire set of files. But, if data is not aligned, it is merely a nuisance: we compact some range of the data and create a tombstone for that range; the remaining data remains valid and can be compacted at some other time. Again, there are strategies that are more optimal than others, but all strategies “just work” from a data validity perspective.

Roll-out

The fly in the ointment is the cost of such a change: it would be large. Many parts of the system must change. Fortunately, we have another project that is changing all those other parts: MSQ. So, the best way to roll out this new system is to define it as an option in Talaria as we build out the Talaria stack. MSQ-for-ingest creates the version history, MSQ-for-query builds a timeline from the version working sets.

Since MSQ works with cold storage, and caches segments (rather than use the heavy-weight Historical load mechanism), switching versions is trivial: move the version pointer to a newer one and queries will immediately target the new files, loading them if not yet cached. Old files, no longer used, will gracefully expire from the cache.

Short-Term Fixes

Clearly we are not in a position to replace Druid’s existing version system. We can keep the above solutions in mind for some future time when we have the luxury of such brain surgery. For now, we have to make do with what we have.

The gist of the problem is that the version system is not robust under concurrency. So, the simple solution is: don’t allow concurrency. This is, in fact, the current solution with the compaction back-off interval.

Since concurrency is data-driven (we don’t know what intervals ingestion will touch until we see the data), we have to prioritize what to kill when conflicts occur. Very likely, ingest must take precedence as we don’t want to lose customer data, or hurt real-time latency. Compaction must fail if an ingest job writes to the interval under compaction.

This means we must accept that the failure rate of compaction is driven entirely by the customer’s rate of late-arriving data. The larger we make the compaction back-off, the less sensitive we are to late-arriving data. But, we do compaction to increase performance, so a larger back-off period hurts performance.

Thus, the failure rate of compaction must increase as we seek to improve query performance by reducing the back-off period.

That is:

  • Late arriving data → longer backoff → lower performance
  • Higher performance → shorter backoff → more compaction failures

There seems no other short-term solution.
