Skip to content

A Git Inspired, Minimal Lock Solution to Concurrent Ingestion and Compaction

Paul Rogers edited this page Apr 6, 2023 · 1 revision

Druid performs four main tasks on the ingestion side:

  • Add new data
  • Replace existing data
  • Compact existing data (a form of replacement)
  • Delete data

Proposed is a Git-inspired versioning approach which minimizing locking.

Druid divides data into time chunks, each of which contains zero or more segments. Druid versions time chunks. This proposal builds on that idea, but in a Git-inspired way.

Today in Druid, a segment is physically assigned to a single version by placing the version number in the name. In Git, any single file version can be associated with one or more commits (project versions). We borrow this same idea for Druid.

In the proposal, a Druid segment exists across multiple versions until the segment is removed. Again, drawing from Git, there are only two version operations on a segment: add and remove. A segment is added to a version when created, exists in all subsequent versions, and is removed at some later version, after which it no longer appears in any subsequent version.

The only locking required in this model is a simple exclusive time-chunk lock to prevent concurrent compaction tasks or compactions concurrent with replacements. (As noted above, compaction is replacement, so the rule simplifies to preventing concurrent replacement tasks on any given time chunk.)

Both insertions and deletions define new versions. Historicals operate on versions: publishing moves the system from one version to the next.

The current system requires tombstones to handle the replace-with-no-data case. The proposed solution does not require tombstones nor does it need the concept of overshadowing: the versioning mechanism handles these operations.

Background

A far more modest solution is in the works. In the spirit of Apache, this ticket should move to be a public Apache issue. The ticket proposes a solution based on minor changes to the implementation code. However, in discussions, it was discovered that the change, as described is incomplete.

The key issue is the scenario in which a task fails during a concurrent ingestion and compaction. In Druid, segments are physically associated with a version (the version number is in the segment name). But, in one scenario, the version of a segment must change after the fact. The result complicates the design, and introduces complexity in the process of loading data on historicals. See the design review recording for details. The design proposed here does not suffer from these issues.

Since our primary goal is execution, we might wait to discover these issues as we roll out code. Or, we could work out the issues up front and select a design with minimum overall cost. The purpose of this writeup is to present options.

Scenarios

To make the proposal clear, let us walk through a number of scenarios.

Concurrent Ingestion

Suppose we have no segments in time chunk T, and we run two ingestion tasks, I1 and I2, each of which create segments in time chunk T. At a high level:

  • I1 completes and creates version 1 which includes segments S1 and S2.
  • I2 completes and creates version 2, which includes all segments from version 1 plus S3 and S4.

On the Coordinator/Broker/Historical side:

  • When version 1 is published, the coordinator tells historicals to load the segments for V1.
  • Once the V1 segments are loaded, the coordinator tells the Broker to run queries against V1. That is, to include segments S1 and S2 in queries.
  • When version 2 is published, the coordinator tells historicals to load the segments for V2.
  • While historicals load V2, they continue to serve V1.
  • When segments S3 and S4 are loaded, the Coordinator tells the Broker to run queries against V2. That is, to include segments S1 through S4 in queries.
  • Once all Brokers report that they are using V1, the Coordinator tells Historicals to unload any dropped segments. Since there are none, this phase trivially completes instantly.

Notice, above, that queries always run against a consistent version. There is no case in which a query will include a partial set of segments. Queries can be tagged, in the metric system with the version against which the ran. This version consistency can be extended to provide atomic rollout of lookups.

Concurrent Ingestion and Compaction

Suppose that, after V2, we run a compaction task, while a third ingestion I3, continues to create segments for our time chunk T. Task start order does not matter as we’ll see below. Suppose tasks start in this order:

  • I3 starts and builds segments S4 and S5. The system remains at version 2 since I3 has not published yet.
  • Compaction C1 starts at version 2. C places a replace (exclusive) lock on segment T and begins to compact Segments S1 through S4, to create segment S5.
  • If another compaction, say C2 were to try to compact T, it would fail due to the replace lock on T.

Task completion order does matter. We have to consider two different completion orders. First, assume that I3 completes first.

  • I3 completes, publishes S5 and S6, creating version 3.
  • C1 completes, and creates a new version 4. This version:
    • Drops segments S1 through S4
    • Carries forward all other V3 segments (S5 and S6)
    • Adds segment S7

On the Coordinator/Broker/Historical side:

  • The Coordinator sees version 3.
    • The Coordinator load V3 segments on Historicals, which means loading S5 an S6.
    • Once loaded, the Coordinator tells the Broker to plan new queries against version 3. That is, to include S1-S6.
  • The Coordinator sees version 4.
    • The Coordinator publishes segments added in V4 which is S7.
    • The Coordinator tells the Broker to plan new queries against version 4. That is, to include S5-S7.
    • The Coordinator waits for al Brokers to confirm the move to version 4.
    • The Coordinator then unloads segments not included in version 4. That is S1-S4.

Now, consider the opposite order of completion.

  • C1 completes and creates a new version 3. This version:
    • Drops segments S1 though S4.
    • Carries forward no other segments
    • Adds segment S5
  • I3 completes, publishes S6 and S7, creating version 4.

The Coordinator side:

  • The Coordinator sees version 3.
    • The Coordinator loads V3 segments on Historicals, which means S5.
    • The Coordinator tells the Broker to plan new queries against version 3, which is just S5.
    • The Coordinator waits for al Brokers to confirm the move to version 3.
    • The Coordinator then unloads segments not included in version 3. That is S1-S4.
  • The Coordinator sees version 4.
    • The Coordinator loads V4 segments on Historicals, which means S6 and S7.
    • The Coordinator tells the Broker to plan new queries against version 4, which is S4-S7.
    • The Coordinator waits for al Brokers to confirm the move to version 4.
    • The Coordinator then unloads segments not included in version 4. There are none, so this step is trivial.

Note that, regardless of order, version 4 contains same three (S5, S6, S7) (though the names shift in the two scenarios above.)

Concurrent Replacement and Ingestion

For our purposes, replacement is virtually the same as compaction: both remove existing segments and add new ones. The only difference is the source of the data. Replacement uses external data while compaction uses existing data. Since the source of data is not relevant to the versioning scheme, replacement works identically to compaction.

One nuance of replacement is that the new data set can be empty, which results in a tombstone segment in the current system. In the proposed system, no tombstone is necessary. To see this, consider a replacement task R1 that overwrites V1 above with no new data. At completion:

  • R1 starts with the system at version 2 which ontains segments S1-S4.
  • R1 completes, creating version version 3. This version:
    • Drops segments S1-S4
    • Carries forward no new segments
    • Adds no new segments

Then, for the rest of the system:

  • The coordinator loads version 3 on historicals. Since version 3 is empty, the task trivially completes instantly.
  • The Coordinator tells the Broker to plan new queries against version 3, which includes no segments.
  • The Coordinator waits for al Brokers to confirm the move to version 3.
  • The Coordinator then unloads segments not included in version 3. That is S1-S4.

The result is that the versioning system trivially handles the case that requires the extra tombstone concept previously.

Versioning Structure

The above scenarios suggest a design for the data structures that underlie the revised versioning model. The system includes:

  • A Segment Table that lists all segment files. Unlike the existing segment table, the revised table includes no version information. A segment is just a file in deep storage, tracked by the segment table.
  • A Version Table that lists all versions and their included segments. (But see below for a nuanced structure.)
  • A time chunk lock table that identifies which task holds a replace (exclusive) lock on each time chunk.

The version table could naively hold the set of segments in that version. (That is, the materialized segment set.) However, we can learn from Git here: the segment table can hold just deltas. That is, every version is assumed to include all the segments from the previous version except for deltas. Delta records are either drops or adds.

Thus, for the scenario above, segment table records might be of the form (time chunk, version, action, segment):

(T, 1, ADD, S1)
(T, 1, ADD, S2)
(T, 2, ADD, S3)
(T, 3, ADD, S4)

Then, for the two concurrent ingest/replacement scenarios, we have either:

(T, 4, ADD, S5)
(T, 4, ADD, S6)
(T, 5, DROP, S1)
(T, 5, DROP, S2)
(T, 5, DROP, S3)
(T, 5, DROP, S5)
(T, 5, ADD, S7)

Or:

(T, 4, DROP, S1)
(T, 4, DROP, S2)
(T, 4, DROP, S3)
(T, 4, DROP, S5)
(T, 4, ADD, S6)
(T, 5, ADD, S7)
(T, 5, ADD, S8)

And, for the tombstone scenario:

(T, 3, DROP, S1)
(T, 3, DROP, S2)
(T, 3, DROP, S3)
(T, 3, DROP, S5)

In this scenario, we commute version V as:

  • Compute the set of segments in version V-1.
  • Add all ADD segments.
  • Drop all DROP segments.

If the number of versions is small, the delta-based approach is sufficient. If the number becomes large, we can define an additional “key frame” table: a table that materializes an entire version. Reconstructing a version is then like reconstructing a video:

  • Find the most recent key frame.
  • Reconstruct version V rolling forward from the key frame, rather than rolling forward from time 0.

Impact

The proposed system is a minor change to the way Druid works conceptually, but a major change to how the code is implemented. The proposal has impact in ingestion, compaction, the coordinator, the broker and possibly historicals. As a result, there may be a case for adjusting the current system to achieve roughly the same result but with a large dependence on locks. Only when locking becomes a burden might we consider changing to the propose mostly lock-free approach. As noted, the version approach also allows us to version lookups so we can provide atomic lookup updates.

Clone this wiki locally