
Segment Management Brainstorming


Druid’s segment management system has worked well for many years, but has some inherent limitations rooted in Druid’s particular history. This paper examines those limitations and brainstorms a revised system that could address them. This is purely a brainstorming exercise: we have no roadmap feature that envisions this work. However, as we move toward Talaria, and toward supporting a wider range of operations, our current system may come under further strain and we may find ourselves looking for a solution. This brainstorming effort anticipates that challenge.

Review

Druid’s current segment management system (including metadata, segment storage intervals (chunks), overshadowing, versions, handoff, etc.) is optimized for two use cases:

  • Appending new data
  • Replacing segment-grain intervals of data.

See Segment locking for an existing proposal in this space which provides an excellent background for the issues discussed here.

The Tombstone effort handles the case in which the replacement data set is empty.

Druid uses time chunk (storage interval) locks to coordinate ingestion and compaction. However, at scale, the present design has awkward constraints which encouraged the idea of segment-level locking (reference needed), but that proposal appears to have multiple issues.

The validity of a Druid query currently depends on an implementation detail: whether a segment is loaded on a historical or not. This leads to a number of data integrity bugs. While the Druid community has learned to live with such issues, they are a hard sell to the general database community where data integrity is a non-negotiable fundamental requirement.

Requirements

Here we identify an ideal set of requirements that Druid should meet. These come from knowledge of how other distributed systems work.

  • Append operations should never conflict with any operation except with a replace operation that has an overlapping time interval.
  • Compaction should never conflict with an append operation: compaction should work with the segments (files) already published (written to cold storage and to segment metadata.)
  • Time chunk replacement must continue to be supported, and should be compatible with all other supported operations (appends, compaction, schema revisions.)
  • Queries must return correct results or fail. The user can be provided with an option to request best-effort results. But, if accurate results are needed, then Druid must provide them, or fail the query.
  • The solution must work for the classic “Historical”-based model as well as the new Talaria model that allows either queries from cold-storage, or the “Snowflake” model in which segments are read from cache if available, else from cold storage.
  • There must be an upgrade path from the present system to the new system. A further requirement would be a downgrade from the new system back to the old one, though this may be a very costly requirement to satisfy. (We may find it less expensive to ensure that the new system works so that the user does not find that they must downgrade to avoid bugs.)

Possible Revised Design

The fundamental idea is that storage is based on segments of any duration, and tombstones.

Core Concepts

Let us start with some abstract concepts.

Storage Fundamentals

The industry has evolved two major ways to handle updates in a database:

  • Update in place (the traditional RDBMS solution). Typically locking and transactions ensure that data is consistent: that the user’s view of the data moves from one consistent state to another. Because updates are in place, data can be lost if an operation fails. To work around this, transaction logs hold both a “before” and “after” copy of the data to allow undoing a failed operation (rollback) or redoing an operation that succeeded but was not written to disk (roll-forward).
  • Log-based updates (HBase, Kafka, etc.) In a distributed system, locks are impractical: we can’t afford to halt work on 1000 nodes, say, for each update. Also, such locks are subject to time delays, network partitions and all manner of other problems. Instead of updating data in place, data is added to a log. The last log entry wins. For example:
1.    Fred (balance=12.34)
2.    Wilma (balance=987.65)
3.    Fred (balance=20.89)

In the above, at time 1, we create a record for Fred and record his balance as 12.34. Then, at time 2, we create a record for Wilma, with her balance. At time 3, Fred gets paid and we “update” his balance with the new value.

Queries work by scanning forward through the log to find the last value for each record (which is entry 2 for Wilma, entry 3 for Fred.) Tombstones are special records that say that, at this point, the record no longer exists; consider it deleted.
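For reference, here is a minimal sketch (not HBase or Kafka code; the record layout and TOMBSTONE marker are illustrative assumptions) of how a log-based store resolves the current value of each record: replay the log, let the last entry per key win, and treat tombstones as deletions.

# Last-write-wins log replay with tombstones. Illustrative only.
TOMBSTONE = object()

def replay(log):
    """Return the current state implied by an append-only log of (key, value) entries."""
    state = {}
    for key, value in log:
        if value is TOMBSTONE:
            state.pop(key, None)   # the record no longer exists
        else:
            state[key] = value     # the last entry for a key wins
    return state

log = [
    ("Fred", 12.34),     # time 1: create Fred
    ("Wilma", 987.65),   # time 2: create Wilma
    ("Fred", 20.89),     # time 3: "update" Fred by appending a new entry
]
assert replay(log) == {"Fred": 20.89, "Wilma": 987.65}
assert replay(log + [("Wilma", TOMBSTONE)]) == {"Fred": 20.89}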

The log structure has many benefits, the main one being that once an entry is written, it is immutable. This is the key design element behind both HBase and Kafka. Some applications that use Kafka record only the fields which have changed, not the full record. To reconstruct the full record, one must replay all changes from the beginning of time (or from a previous checkpoint with all the data, a bit like a key frame in a video.)

The Druid Hybrid

One of Druid’s key innovations is a hybrid structure that combines the best of the above two models. The in-place model is great for query performance: data resides in one place, typically pointed to by an index. The log model tends to require a scan of the log to locate the record, which is slow.

On the other hand, the log model handles atomic updates without the complexity of global locking or a transaction log. (The DB storage is the transaction log.) Updates are just append operations.

Druid’s segment and versioning system creates a hybrid. Segments act as the units of appending. Rather than appending a record to a file, Druid appends a file to the file system. Replacements are handled only at the segment level, and consist of a new version, which is like a new record in the log.

This can be hard to see, so here is an example:

        Log for log-based Store                          Druid
 | 12:24, "bob", 10               |   [ 12:24, "bob", 10   ]    Segment Ver. 1
 | 12:25, "alice", 20             |   [ 12:25, "alice", 20 ]
 | ...
 | 16:45 "bob", 100               |   [ 16:45 "bob", 100   ]    Segment Ver. 2
 | ...                            |

The log-based store puts all values into a single log, as in a single Kafka topic. Druid takes a block of such records and places them in a segment. When new data overwrites existing data, a new segment, for the same time chunk, has a new version and thus “overshadows” the prior segment.

The segment interval allows this operation to proceed independently for different chunks of data. We can replace the data for yesterday independently of (concurrently with) replacing data from last Friday.

The challenge for Druid is three-fold:

  • The segment grain is needed to keep update operations independent, but it also defines the storage layout, thus conflating the update grain with the storage grain.
  • Updates can happen only at the storage grain. If storage is at a day grain, then updates can only happen a day at a time.
  • Only the entire segment grain (the bundle of segments at a version) can be replaced; there is no way to replace a single segment within a versioned bundle (as we might want to do for SaaS schema editing.)

Revised Hybrid

The gist of this proposal is to keep the core idea behind the Druid model, but to remove many of the restrictions. To summarize:

  • The segment model remains, but segments are not bound to segment intervals.
  • Versioning remains, but the definition is generalized to handle more kinds of updates.
  • Time partitioning becomes purely a performance optimization, and has no other impact on updates or queries.

The above description is very general. Sections below spell out the proposed details.

Segments

The segment file is another of Druid’s key innovations: we wish to preserve its core concepts such as dictionary encoding, indexes, etc. What we do wish to change is the rigid placement of segments within segment intervals.

Arbitrary Segment Intervals

In current Druid, a segment must be aligned to the storage grain. Druid ingestion and compaction work to ensure that data fits within that grain, and to shuffle data to ensure optimal packing. The user must pick the storage grain when creating the table, at a point when they have little knowledge of the optimal value.

As it turns out, there is no theoretical reason to require a strict storage grain. HDFS-based systems used such a grain to create directory structures, but that structure is not needed in an S3-based system.

We instead propose that a segment can hold data for any interval, and so the segment key is (data-source, interval), with no restrictions on the interval (other than that it be positive.)

Segment Size

Segment size is a key performance factor. The general rule-of-thumb is to keep segments to around 5 million rows. Since, in current Druid, the segment must also fit within a fixed segment interval length, we ask the user to compute an interval that happens to produce optimally-sized segments. This is hard! Data arrives at various rates: an interval optimal for weekdays may be sub-optimal for weekends. Data size grows over time: a segment size that works well with today’s data rate may be totally inadequate once app adoption grows ten-fold.

Thus, we seek to separate segment interval from segment size. Segments are simply filled until they hit the target row count (5 million, say), then a new one is started.
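A minimal sketch of the fill-until-full idea, assuming a hypothetical SegmentWriter that tracks its own data interval; the writer API and the row-count constant are illustrative, not existing Druid classes.

# Sketch: roll over to a new segment when the row-count target is reached.
# SegmentWriter is hypothetical; the interval is just (earliest, latest) timestamp seen.
TARGET_ROWS = 5_000_000

class SegmentWriter:
    def __init__(self):
        self.rows = []
        self.start = None   # earliest timestamp seen
        self.end = None     # latest timestamp seen

    def add(self, row):
        ts = row["__time"]
        self.start = ts if self.start is None else min(self.start, ts)
        self.end = ts if self.end is None else max(self.end, ts)
        self.rows.append(row)

def ingest(rows):
    """Yield ((start, end), row_count) for each segment produced."""
    writer = SegmentWriter()
    for row in rows:
        writer.add(row)
        if len(writer.rows) >= TARGET_ROWS:
            yield (writer.start, writer.end), len(writer.rows)
            writer = SegmentWriter()   # start the next segment
    if writer.rows:
        yield (writer.start, writer.end), len(writer.rows)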

Simpler Partitioning

Since data need not be partitioned by time, it is easier to write data into the segments. Ingestion is simply an exercise in copying data from the input source into the segment file format. The segment interval is simply (earliest data, latest data) (inclusive). This may lead to ragged segments. But, if the incoming data is ragged, then it is better to combine that data into one optimally-sized segment with an odd interval rather than many even-interval segments of sub-optimal size.

Note that, in this scheme, late-arriving data is only a nuisance: it increases the segment interval, and causes queries over the wider interval to hit the segment unnecessarily. This problem will be fixed during compaction, so can be accepted during ingestion.

A benefit of this approach is that an ingestion job simply writes segment(s) until a file reaches some target size, then switches to another. There is no need to shuffle data to collect like time partitions. The work to cluster data by cluster-key (formerly known as the partition-key) is simple: just write the data to the corresponding segment.

Late arriving data is no problem, other than the interval for a segment might be larger than average.

The result is that ingestion produces a stream of segments with the key (table, interval), with the intervals reflecting the data as it arrives. Data can be streamed to such segments, there is no need to buffer all the data before writing.

Clustering

Druid performs primary partitioning on date, and secondary clustering on other keys. Under this scheme, clustering becomes the primary physical partitioning mechanism. The interval of a segment depends on the actual ingested data, but clustering determines the number of such segments created.

During ingestion, hash clustering can create n segments of roughly even size. Hash clustering allows us to locate the segment for data given the key.

With clustering, we can split data by the partition key, so that the segment key becomes (data-source, interval, partition-key), for some definition of the partition key. (The non-time partition key is called the “cluster key” in MSQ.)

It may be better, however, to omit clustering on ingestion and instead perform clustering on compaction.
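A sketch of the write-side routing this implies, whether done at ingest or at compaction time: hash the cluster key into one of n buckets and write each row to that bucket’s segment. The helper names are hypothetical.

import hashlib

def bucket_of(cluster_key, n_buckets):
    """Map a cluster key to one of n buckets using a stable hash."""
    digest = hashlib.md5(str(cluster_key).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % n_buckets

def route(rows, key_column, n_buckets):
    """Group rows by hash bucket; each bucket becomes one or more segments
    keyed by (data-source, interval, partition-key)."""
    buckets = {b: [] for b in range(n_buckets)}
    for row in rows:
        buckets[bucket_of(row[key_column], n_buckets)].append(row)
    return buckets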

Data Coordinates

When querying we want to locate the target data as precisely as possible. General “Big Data” query engines are able to query a large set of files to find the rows of interest. Since that is expensive, most tools provide some kind of partitioning (directory-based partitioning in the Hadoop world, for example). With partitioning, we use attributes of the data to prune the search path. Partitioning is a variation on indexed lookups in an RDBMS. Sophisticated systems such as Druid (or Impala with Parquet) use multiple layers of pruning. Here, we consider only file-level pruning.

The most general form of partitioning is to divide data up into non-overlapping buckets. If the query contains the correct filters (WHERE clause), then we can prune all buckets that we know cannot contain data. Druid does this today with segment partitioning: each segment contains data for a specific time range.

To generalize, we introduce the concept of data coordinates. We think of data as spread across an n-dimensional space (the buckets). Each bucket is identified with its coordinates: (k1, k2, k3, ...). Data is physically split into buckets using these coordinates. Time is special: it is a known dimension with known ordering. Given a set of segments with an identified data interval, it is straight-forward to identify which contain a given timestamp, and which do not.

Time is the only column common to all Druid tables; all other attributes are unique to the application. We need a way to unify these possible keys. One way to do that is with hashing: no matter what the key, it can be hashed into a fixed-size number. From there, we simply decide on the number of divisions within that dimension and, with a bit of simple math, we have a uniform distribution of keys across buckets.

When coming from Hadoop (or an RDBMS B-tree index) it is tempting to think of coordinates as ordered: we need to know key1 to look for key2, etc. However, we propose independent keys: each can be consulted independent of the others, which is why they are called dimensions. We can find all the “Chicago” records in combination with, or independent of, the “2022-01” records.
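To make the pruning concrete, here is a sketch (hypothetical structures) of read-side bucket elimination: the time coordinate is pruned by interval overlap and the hashed key coordinate by bucket match, each independently of the other.

import hashlib

def bucket_of(value, n_buckets):
    """Same stable-hash helper as in the clustering sketch above."""
    digest = hashlib.md5(str(value).encode("utf-8")).digest()
    return int.from_bytes(digest[:8], "big") % n_buckets

def prune(segments, time_filter=None, key_filter=None, n_buckets=16):
    """Keep only segments whose coordinates could contain matching rows.
    Each segment is described as {"interval": (start, end), "bucket": int}."""
    survivors = []
    for seg in segments:
        if time_filter is not None:
            lo, hi = seg["interval"]
            if hi < time_filter[0] or lo > time_filter[1]:
                continue                              # no time overlap: prune
        if key_filter is not None and seg["bucket"] != bucket_of(key_filter, n_buckets):
            continue                                  # wrong hash bucket: prune
        survivors.append(seg)
    return survivors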

Primary Keys

In an RDBMS, a primary key is a combination of columns which uniquely identifies a record. In our toy log example above, the name (Fred, Wilma) was the primary key. Any given table has its own key.

There may be cases where data has no primary key. For example, if our data is simply a series of timestamps at which the user pressed the mouse, say, then there is no way to differentiate between two clicks in the same second (if our timestamp is at the second grain.) Such cases should be rare, however, because there is no way to replace or audit data: duplicated data looks just like valid data.

Query Timeline (Take 1)

In the above design, the query timeline is generalized from the current implementation. It is no longer just overlays (for overshadowing) which have odd length: all segments have odd length. Using the “laser” analogy, the timeline looks like this:

  1   2     3         4     5    6      
  V   V     V         V     V    V      "Lasers"
  :   :     :         :     :    :
  :   :|----*---------*-----*---|:      Segment 3
  :   :  |--*---------*---| :    :      Segment 2
  : |-*-----*----|    :     :    :      Segment 1
 |----------|----------|----------|     Data timestamps
10:00      11:00      12:00      13:00

There is no overshadowing here, all segments are visible. Think of them as being transparent to the laser so the laser can strike multiple segments. Each “strike” is shown by the “*” symbol.

Thus we see that for positions 1 and 6, the lasers strike no segments. For the others, the lasers strike one, two, or all three segments. The logic here is basically the same as that used for overlays within the current Druid time chunks: we simply omit the time chunk boundary restrictions.
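A sketch of the “laser” lookup with arbitrary intervals (the hour values below are approximations read off the diagram): given a timestamp, every segment whose interval contains it is visible, with no time-chunk boundaries involved.

# Segment intervals as fractional hours, loosely matching the diagram above.
segments = {
    "Segment 1": (10.33, 11.42),
    "Segment 2": (10.75, 12.17),
    "Segment 3": (10.67, 12.83),
}

def strikes(timestamp):
    """Return every segment whose interval contains the timestamp (a "laser" strike)."""
    return [name for name, (start, end) in segments.items() if start <= timestamp <= end]

print(strikes(10.1))   # laser 1: no segments
print(strikes(10.5))   # laser 2: Segment 1 only
print(strikes(11.0))   # laser 3: all three segments
print(strikes(12.4))   # laser 5: Segment 3 only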

Insert

It should be clear from the above that insert operations “just work”: add another segment and it immediately (upon being published) is reflected in the timeline and thus query planning. Since nothing in an append conflicts with the existing segments, no locking is needed across append operations.

Replacement

We now encounter the first challenge: replacing a time-slice of data. We will start, however, with a more basic operation: updating a single record.

Single-Record Updates

Single-record updates are the bread-and-butter of RDBMSs:

UPDATE accounts
  SET balance = 20.89
  WHERE name = 'Fred'

The above has two important parts:

  • A condition (typically a primary key for a single-record update) that says which record to update.
  • The new data to write into the record.

How might we perform this operation in Druid where segments are immutable? We’d again refer to the analogy with log-based storage: we’d create a segment with the new values and somehow tell the query engine to use that new segment in place of the old one. (By analogy, we’d place the segment later in the log than the one with the original data.)

Doing this is quite hard. Hive has a mechanism which joins the base table with an update table. Basically (a sketch follows the list):

  • Read the original data.
  • Join with the list of updated records.
  • If the value is in the update list, use the updated value, else use the original value.
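A minimal sketch of that read-time merge, assuming the set of updated records is small enough to load as a map keyed by the primary key; this is illustrative, not Hive’s or Druid’s actual mechanism.

def merged_read(base_rows, updates):
    """For each base row, use the updated record if its primary key appears in
    the update set, else the original row (a read-time join)."""
    for row in base_rows:
        key = row["name"]               # assume "name" is the table's primary key
        yield updates.get(key, row)     # updated record wins, else original

base = [{"name": "Fred", "balance": 12.34}, {"name": "Wilma", "balance": 987.65}]
updates = {"Fred": {"name": "Fred", "balance": 20.89}}
print(list(merged_read(base, updates)))
# [{'name': 'Fred', 'balance': 20.89}, {'name': 'Wilma', 'balance': 987.65}]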

This is challenging in practice:

  • Druid doesn’t support joins (very well)
  • A single update requires we do the join for all records, to see if they happen to be the updated one.

This is only possible if we know the primary key of the record. The primary key can’t be included in the query (where it would be redundant, hard to state, and error-prone), it must be part of the metadata about the table.

Our data coordinate idea helps: it prunes the search space. If we know that, say, only coordinate (10, 20) (using hash values) has changes, then we can skip the joins for other coordinates, say (11, 20) or (10, 21).

In this way, a system that never does updates will never pay the cost. A system that must do updates must pay the cost only for those coordinates in which updates occurred.

Compaction can remove the updates, merging the values into a new, updated segment. So the extra join cost would be transitory.

Record Deletions

We can extend the above system for deletions (as, for example, from a GDPR request). In this case, rather than writing a new record, we write a tombstone: a marker that says that the record was deleted. The join above becomes an anti-join:

If the value is in the update list, drop the record from the result set, else use the original value.

Versioned Chunks

Druid’s classic use case is to replace not a single record, but an entire time-slice of data. This can be done by introducing two new concepts:

  • Version: a collection of segments (and, later, tombstones) which all contain data to be queried as shown in the illustration above.
  • Interval tombstone: a marker (like the Wite-out of old used to correct typewriter mistakes) that “erases” any data from a previous version.

Again, by analogy with log-based stores, the tombstone draws a line under a set of entries and says, “RESET! Ignore all data above this point!”. The new data then follows the tombstone.

Since we don’t want to delete all data, we need a bit more nuance. We want the tombstone to apply to just one “chunk” of data: one coordinate in our partition space. In classic Druid, this could only be a segment interval. Here we propose it can be anything: a time interval along with other partition keys. This lets us, say, replace yesterday’s data, but only for client 1234.

In this model, a replacement is logically a delete, followed by an insert, just as in SQL:

START TRANSACTION;
DELETE FROM table WHERE <replace interval>;
INSERT INTO table AS SELECT ...;
COMMIT TRANSACTION;

The interval tombstone marks a time-slice of data as being logically deleted and corresponds to the DELETE statement above. The replacement operation then optionally adds new data to replace the removed data and corresponds to the INSERT statement above. (A "drop" operation for an interval is simply a replacement with no new data.)

The version number allows us to stack the segments and tombstones in the correct order for our “laser”:

  1   2     3         4     5    6      
  V   V     V         V     V    V      "Lasers"
  :   :     :         :     :    :      Ver.
  :   :     :   |-----*-|   :    :      3. New data
  :   :  |xxxxxxxxxxxxxxxxxxx|   :      2. Tombstone for the replacement
  :   :|------------------------|:      1. Segment 3
  :   :  |----------------|      :      1. Segment 2
  : |-*----------|               :      1. Segment 1
 |----------|----------|----------|     Data timestamps
10:00      11:00      12:00      13:00

Here we start with the same data as before, marked as version 1. We replace data, starting with the tombstone, at version 2, for the interval to be replaced. Then, at version 3, we add new data, which does not fill the entire replacement range.

Lasers 1 and 6 are still past the valid data ranges. Laser 2 still strikes the version 1 data. However, lasers 3 and 5 strike the tombstone and thus find no data. Laser 4 strikes the replacement data.

Now, stretch your imagination and consider a checkerboard of partitions, with the above logic applying in each cell, each “coordinate”. This is what allows us to replace one cell while not affecting others.

Unifying the Two Cases

The two cases (single-record update and interval update) look different, but they can be unified somewhat:

Both cases trigger a version change:

  • The tombstone is at a newer version than the data it hides, and the replacement data is at a newer version than the tombstone.
  • The update/delete records are at a newer version than the data they change, and subsequent data is at a newer version than the update/delete records.

Both cases involve “masking”:

  • The tombstone masks an entire cell in the partitioning space.
  • The per-record updates mask a single record.

Replace/Insert Coordination

To make this work, replacements must block inserts and vice versa. Thus, we need interval locking: no two replacement operations can operate on overlapping intervals: if they did, there would be no way to tell which replacement takes precedence during that overlap. Should their data be combined, or does one replace the other?

Further, a replacement conflicts with insertion. Should the insertion occur “over” or “under” the replacement?

The interval for a replace operation is clear: it must be stated in the operation itself. However, the interval for an insert is unknown until the data itself is observed. Thus, we don’t know ahead of time which interval to lock for an insert.

The naive approach would be to guess the insert interval, and obtain an exclusive lock on that interval. However this introduces the very problem we wish to avoid: inserts would conflict.

We could introduce lock types (similar to read/write locks). Insert locks are compatible with each other, but replace locks are not compatible with other replace locks or with insert locks. This works, but we are still left with the problem of not knowing which time period to lock for ingest. What if we guess incorrectly? Should we fail the entire insert if one outlier row conflicts with a replace operation?

A possible nuanced solution is to impose an ordering on the insert and replace operations to avoid the need for locking to coordinate across these operations. Then, employ only exclusive replace locks, with inserts taking no locks. Publish all the inserted data. At publish time, compare the start time of the ingest operation with any replace operation that started (completed?) after the insert started. Then, for each interval that overlaps a replace operation (see the sketch after this list):

  • If the replace started after the ingest, publish a tombstone for the overlapping interval.
  • If the replace started before the ingest, do nothing.
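A sketch of that publish-time rule (hypothetical names, wall-clock times as plain numbers): for each interval of inserted data that overlaps a replace, compare operation start times and publish a tombstone over the inserted data only when the replace started after the insert.

def tombstones_at_publish(insert_start, insert_intervals, replaces):
    """replaces: list of (replace_start_time, (lo, hi)) replaced data intervals.
    Returns the tombstone intervals the insert must publish over its own data."""
    tombstones = []
    for replace_start, (r_lo, r_hi) in replaces:
        if replace_start <= insert_start:
            continue                        # replace started first: the insert adds on top
        for lo, hi in insert_intervals:
            overlap = (max(lo, r_lo), min(hi, r_hi))
            if overlap[0] < overlap[1]:
                tombstones.append(overlap)  # replace wins: hide the overlapping inserted data
    return tombstones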

The logic here becomes a bit tricky because we are considering two timelines: the order of operations and the timeline of the data. Here is an example:

        |RRRRRR|                                      |------|           Replace 2
                                                      |xxxxxx|
      |IIIIIIII|                              |------------------|       Insert
  |RRRRRRR|                                |-----|                       Replace 1
                                          |xxxxxxxx|                     
                                        |---------------------|          Original
|-----------:----------:----------|   |----------:----------:----------|
    Wall-clock (operation) time                   Data time

Here, we start with an original set of segments, then we have three operations:

  • Replace 1, which starts before the insert, and replaces an interval that overlaps with the insert. The “x” tombstone shows the replaced region, with a smaller interval of replacement data.
  • The insert, which starts after replace 1, and so we assume that the new data adds to that from Replace 1.
  • Replace 2, which starts after the insert, and so we assume that its data replaces the existing data, and the new data in the insert. The tombstone inserted by Replace 2, with a newer version, ensures that our “laser” is blocked from seeing the replaced data in either the original or inserted data. Replace 2 overlaps in execution time with Replace 1, but because the data intervals do not overlap, the two replacements can proceed in parallel with no conflict.

Everything said for replace also works for a drop operation, since a drop is a replace with no new data.

Table-level Versions

Let us now examine the concept of “version” discussed above. What does it mean to have the various versions? The above versions are table-level versions: they refer to collections of segments and tombstones. As such, table-level versions are not attributes of segments (or tombstones): they are groupings of these objects. Thus, table-level versions exist in metadata, and are not part of the segment identifier. This means that the segment identifier is (data-source, interval). Table-level versions can be represented as a table:

Table Version | Components
------------- | ---------------------------------------------------------------------------
1             | Original data (a list of segments)
2             | Tombstone for Replace 1
3             | New data for Replace 1; new data for the Insert
4             | Tombstone for Replace 2
5             | New data for Replace 2; any additional inserted data until the next replace

In fact, if we look carefully, we find that a new table version occurs only at a replace (or drop). We can thus define a table version as a set of tombstones and a set of data segments:

Table Version | Tombstones     | Insertions
------------- | -------------- | ------------------------------
1             | --             | Original data
2             | From Replace 1 | Replace 1; the Insert
3             | From Replace 2 | Replace 2; subsequent inserts

The above definition of versions neatly captures the idea that all inserted data “goes together” up until a drop/replace occurs, at which point some interval of data reaches its end-of-life. Since we cannot physically delete that data (at scale, or atomically), we instead use a logical deletion: a tombstone. The version then clearly identifies the insertions which occurred before the deletion (and should thus be considered deleted), and those that came after (and thus are visible to queries.)

While the table-level version could be part of the segment identifier (that is, file name), it need not be, as metadata can capture the essential information.
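A sketch of what the table-version metadata might hold, using hypothetical Python structures in place of real metadata tables: each version pairs the tombstones created by the replace (or drop) that opened it with all segments inserted while that version is current.

from dataclasses import dataclass, field
from typing import List, Tuple

Interval = Tuple[str, str]   # (start, end) timestamps, for illustration

@dataclass
class TableVersion:
    version: int
    tombstones: List[Interval] = field(default_factory=list)  # from the replace/drop that opened this version
    segments: List[str] = field(default_factory=list)         # segments inserted while this version is current

versions = [
    TableVersion(1, [], ["seg-a", "seg-b"]),                               # original data
    TableVersion(2, [("2022-01-01", "2022-01-02")], ["seg-c", "seg-d"]),   # Replace 1, then the Insert
    TableVersion(3, [("2022-01-03", "2022-01-04")], ["seg-e"]),            # Replace 2, then later inserts
]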

Segment Identity

We’ve been casual about segment identity; the time has come to formalize that identity. See the documentation for the current scheme. We know that segments are always for a single data source, so we start with (data-source). We know that segments cover some time interval, so we add that: (data-source, interval).

Here we should observe that there can be multiple segments with the same basic identifier, so we need some sequence number to differentiate two separate segments for the same interval. Let’s assume we use a simple sequence number: (data-source, interval, sequence-no).

We require that the above be globally unique so that file names in cold storage are unique. (If stored in a directory-based system such as HDFS, files can be partitioned by their interval start time.)

Note that, unlike the present Druid, the table version number need not appear in the file name as it adds no new data. This is actually a benefit.

Some file systems (such as S3) do not support rename operations: the object name must be known up front. By excluding the table version from the name, we can write the file without knowing that version, which may only be known at segment publish time.

Extreme Naming

Note that, since segments are registered in metadata, the file name does not actually matter at all. We could use a UUID and get exactly the same behavior. With a UUID, table renaming is trivial: it is entirely a metadata operation, since the data source name does not appear in the segment file name.

The one drawback is that if the metadata is lost, it cannot be recreated from the file names. However, segments carry their own internal metadata, so the metadata store could be rebuilt from the segment contents rather than from the file names.

In this case, the segment identifier, in cold storage is just (uuid), with the table version table holding a list of UUIDs (and tombstones) within that version.

Segment-Level Operations

There are times when we wish to rewrite a single segment. For example, if the user changes the schema under SaaS (in a way that does not affect partitioning or roll-up), we must rewrite segments to match the new schema. In particular, we may wish to remove columns, or change the column type. (Adding a column is a no-op: Druid does not store any data for columns that have all-null values.)

Such operations can be done one segment at a time:

segment x -> rewrite -> new segment x

Let us assume that the operation preserves all the rows in segment x. (However, the discussion works just as well if we deleted some rows, say to comply with a GDPR request.)

We need a way to say that the new segment replaces the old one, and to do that in a way that does not conflict with a replace operation. (If the old segment was partially overshadowed, then so is the new one. Though, an optimization would be to remove overshadowed rows.)

To accomplish this, we need a per-segment version number which is distinct from the table version number used in the above discussions. That is, each segment is identified by (data-source, interval, sequence-no, segment-version). Or, if we use UUIDs: (uuid, segment-version). The UUID uniquely identifies the segment, versions identify versions (copies) of that segment.

We should note that, if deletion occurs, we could end up with an empty segment. In this case, we need a segment tombstone: something that says that the segment itself has reached end-of-life. Since we do not want to create an empty file just to say that the segment is empty, we need a metadata solution.

That solution might be a logical segment table:

Segment ID                        | Version | Contents
--------------------------------- | ------- | -----------
(ds, interval, seq. no) or (UUID) | 1       | UUID-1.extn
                                  | 2       | UUID-2.extn
                                  | 3       | Tombstone

The semantics would be (a sketch follows the list):

  • For each segment, queries work with the latest version. (In the above case, in the latest version, the segment does not exist.)
  • Previous versions can be pruned: the cold storage file removed and the metadata record removed.
  • If the only entry for a segment is a tombstone, then the entire segment entry can be removed.
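A sketch of resolving the logical segment table under those semantics, assuming each segment id maps to a list of (segment-version, contents) entries with None standing in for a tombstone; the names are illustrative.

def visible_segments(logical_table):
    """For each logical segment, keep only the latest segment version; a tombstone
    (None) as the latest version means the segment no longer exists."""
    visible = {}
    for segment_id, entries in logical_table.items():
        _, contents = max(entries, key=lambda e: e[0])   # latest segment version wins
        if contents is not None:                         # not a tombstone
            visible[segment_id] = contents
    return visible

table = {
    "uuid-1": [(1, "UUID-1.extn"), (2, "UUID-2.extn")],
    "uuid-2": [(1, "UUID-3.extn"), (2, None)],           # tombstoned: segment removed
}
assert visible_segments(table) == {"uuid-1": "UUID-2.extn"}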

Re-Indexing (AKA Compaction)

The proposed segment system can result in inefficient segments: data concentrated in one interval, but with outliers in perhaps distant times. Such a layout slows query performance, so we will want to rewrite (re-index, compact) the data.

Let us observe that any rewrite task is a transformation of a defined set of segments:

(segments in) -> transform -> (segments out)

Unlike the per-segment operations, here the input segments reach end-of-life, and are replaced with a different set of output segments: with different intervals, different partition keys, etc. Thus, we cannot use segment versions, since the new segments don’t share an identity with the old segments.

However, we can build on segment versions.

  • For each input segment, create a new segment version which is a tombstone.
  • For each output segment, create a new segment, at version 1, at the same table version as the input segment with the highest segment version.

The above works as long as the transform honors all overlays (tombstones) with versions newer than each input segment. Said another way, for each input segment:

  • Use the query timeline to compute the interval(s) (if any) which are overshadowed.
  • Logically delete those from consideration. (Do not read those rows.)

With the above, a compaction operation can run concurrently with:

  • Insert, because insert creates new segments not part of the input set.
  • Replacement, because replacement will create a tombstone that overshadows any now-doomed rows that the compaction operation may produce.

Compaction is also compatible with another compaction operation, as long as the two have disjoint sets of input segments.

Compaction is not compatible with another compaction operation with an overlapping set of input segments, for the obvious reason that the same rows would appear in two sets of output segments, which would corrupt the table data.
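A sketch of the bookkeeping a re-index task might perform under this scheme (hypothetical structures): tombstone each input segment at a new segment version, publish the outputs at the highest table version seen among the inputs, and treat two compactions as conflicting only if their input sets overlap.

def plan_compaction(inputs):
    """inputs: list of {"id": ..., "segment_version": ..., "table_version": ...}.
    Returns the segment-version tombstones to write and the table version for the outputs."""
    tombstones = [
        {"id": seg["id"], "segment_version": seg["segment_version"] + 1, "tombstone": True}
        for seg in inputs
    ]
    output_table_version = max(seg["table_version"] for seg in inputs)
    return tombstones, output_table_version

def conflicts(inputs_a, inputs_b):
    """Two compactions conflict only if their input segment sets overlap."""
    return bool({s["id"] for s in inputs_a} & {s["id"] for s in inputs_b})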

Query Timeline (Take 2)

We can now refine the query timeline concept presented above. To build the timeline (a sketch follows the list):

  • Create a stack of versions that has two layers: the tombstones at the bottom, and the insertions above that.
  • For each segment within each layer, consider only the latest segment version. If the latest version is a tombstone, then omit the segment.
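Putting the pieces together, a sketch of that construction and the resulting lookup (hypothetical structures throughout): resolve each segment to its latest segment version, drop segment-level tombstones, then let a data segment be visible at a timestamp only if no interval tombstone from a newer table version covers that timestamp.

def build_timeline(table_versions, logical_segments):
    """table_versions: list of {"version": int, "tombstones": [(lo, hi)], "segments": [id]}.
    logical_segments: {id: [(segment_version, interval_or_None)]}.
    Returns (tombstone_layer, data_layer) for query planning."""
    tombstone_layer = []   # (table_version, interval) markers that mask older data
    data_layer = []        # (table_version, segment_id, interval) candidate segments
    for tv in table_versions:
        for interval in tv["tombstones"]:
            tombstone_layer.append((tv["version"], interval))
        for seg_id in tv["segments"]:
            seg_version, interval = max(logical_segments[seg_id], key=lambda e: e[0])
            if interval is not None:                    # skip segment-level tombstones
                data_layer.append((tv["version"], seg_id, interval))
    return tombstone_layer, data_layer

def visible_at(timestamp, tombstone_layer, data_layer):
    """Return the segments the "laser" at the given timestamp can see."""
    out = []
    for version, seg_id, (lo, hi) in data_layer:
        if not (lo <= timestamp <= hi):
            continue
        masked = any(t_version > version and t_lo <= timestamp <= t_hi
                     for t_version, (t_lo, t_hi) in tombstone_layer)
        if not masked:
            out.append(seg_id)
    return out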

Implications

With the above (rough) sketch in mind, we can now consider how such a design might benefit the SaaS product.

Streaming Ingestion

The documentation describes how Streaming Ingestion works today, but not in detail. The details are that Kafka streaming launches a wave of topic ingestion tasks, runs for some period of time, then ends the first wave, and starts a second. The first wave publishes its segments, and participates in segment handoff concurrently with the second wave.

The key facts to note are:

  • Kafka topics, which are independent, are processed together, causing unnecessary synchronization.
  • Since waves are synchronized, there is a period of time in which 2n resources are used for n ingestion tasks.

The proposed design opens the door to a simpler design:

  • Each Kafka topic proceeds independently.
  • A Kafka topic ingestion can run forever, or can stop when its output segments reach some target size, or some amount of time has lapsed.
  • Since topic ingestion is asynchronous, we need reserve only a few extra slots for the publish/handoff work. Chances are, random fluctuations will mean that each ingestion task finishes at a distinct time.

Under MSQ, we can separate the publish/handoff operation from the ingestion, but that is outside the scope of this note.

Schema Editing

We have the option to rewrite data one segment at a time, for non-structural changes. We can scale the operation up or down easily, since each segment is independent of others.

MSQ-based Cold Tier

The cold tier reads data directly from deep storage. As a result, the cold tier requires a different timeline operation than that used by Historicals. Though not directly affected by the above design, the cold tier provides an opportunity and a motivation to adopt an alternative segment storage system under the MSQ umbrella.
