Tombstone Design Notes

Paul Rogers edited this page Apr 5, 2023 · 1 revision

Druid did a project to fix a hole in our data replacement functionality using a feature we’ve termed “tombstones.” The technology around this topic is complex, and gets to the heart of how Druid handles segments, overshadowing, and data replacement. These notes summarize what we’ve learned while discussing the issue.

Introduction

Tombstones cut to the core of how Druid ingestion, segments, versioning and overshadowing work. So, to explain tombstones, we must explain basic terms and functionality. Tombstones, as currently scoped, fix a specific problem. However, as we’ll see, they can also fix a variety of other problems. MSQ is our new ingest and query solution. The discussion highlights where a design for Historicals can or cannot be used with MSQ.

References

Open Questions

The PR mentions partitioning. However, partitioning is not part of the segment key, so it is unclear how partitioning enters into the discussion.

Basic Concepts

Druid’s versioning system is complex and a bit unlike other systems. It is not strictly an MVCC system. This section explains the key terms and concepts. The terms used here have not yet been verified against the code: please leave a comment if the code uses a different term.

Data Sources (Tables)

Everything in Druid revolves around data sources (what SaaS and other products call “tables”). This document assumes we always operate on a single data source.

Intervals

The core concept in Druid’s data storage and versioning system is the interval: a specific period of time with an inclusive start and an exclusive end. These notes write an interval as (start, end]; standard mathematical notation for the same half-open range would be [start, end). The time can be in any time zone. For simplicity, this paper assumes all intervals are in the UTC time zone so that no time zone conversion is needed.

Time Grain

The time grain (or “time granularity” or just “grain”) is a conceptual interval of a specified length such as one second, one minute, one day, etc. The grain is more than just a duration: it is assumed to be aligned at the boundary where all the more precise values are zero. That is, a time grain is an interval, (start, end], where start mod grain = 0, end mod grain = 0, and end - start = grain. For example, a one-minute grain might extend from (12:01, 12:02].
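The alignment rule can be sketched in a few lines of Python. This is an illustration only, not Druid code: the helper names are hypothetical, and it assumes fixed-length grains (seconds, minutes, hours, days) measured from the Unix epoch in UTC. Calendar grains such as months would need different arithmetic.

```python
from datetime import datetime, timedelta, timezone

# Assumption: grains are measured from the Unix epoch in UTC.
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def floor_to_grain(ts: datetime, grain: timedelta) -> datetime:
    """Return the latest grain boundary at or before ts."""
    return ts - ((ts - EPOCH) % grain)


def grain_interval(ts: datetime, grain: timedelta) -> tuple[datetime, datetime]:
    """Return the aligned (start, end) pair of the grain interval containing ts."""
    start = floor_to_grain(ts, grain)
    return start, start + grain


def is_aligned(ts: datetime, grain: timedelta) -> bool:
    """True when ts falls exactly on a grain boundary (ts mod grain = 0)."""
    return (ts - EPOCH) % grain == timedelta(0)


# A timestamp inside the 12:01 minute maps to the 12:01 .. 12:02 grain interval.
ts = datetime(2000, 1, 1, 12, 1, 30, tzinfo=timezone.utc)
start, end = grain_interval(ts, timedelta(minutes=1))
print(start.time(), end.time())
```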

Segments and the Segment Grain (AKA Segment Interval, Chunk Size)

Druid stores data in files called segments. Each file stores data for a single interval called a “chunk”. The size of that interval is the chunk size or (in more recent documentation) the segment grain. Data is partitioned by chunk for query efficiency. Each segment is identified by a key consisting of the table and a data interval: (datasource, interval) (plus a version which we’ll discuss later).

The segment grain is the size of an interval (day, hour). The segment interval is the specific interval for a segment (say 2000-01-01).

The simplest conceptual model is to think of a table as consisting of a series of segment intervals (chunks), each of which contains zero or more segment files as in this image from the documentation:

Image

The picture is a bit misleading. In fact, every segment spans the entire chunk like this:

                     |--- Partition 1 ----|
|--- Partition 0 ----|--- Partition 0 ----|--- Partition 0 ----|
|--------------------|--------------------|--------------------|----->
  Chunk 2000-01-01      Chunk 2000-01-02    Chunk 2000-01-03

This model is still too simple for Druid because there is no real concept of a table: a table is simply a collection of segments. So, there is no place to specify the segment grain at the table level. Instead, the segment grain is a property of an ingestion spec. This means that different segments within a table can have different segment grains. Now, most users are likely to use the same segment grain for all ingestions, but Druid does not require this consistency.

In fact, we’ll see that differing segment grains are a feature used for partial updates. Thus, in classic Druid the idea of “the segment grain” is ill-defined: there can be multiple segment grains. For example, we might load data first at the day grain, then later load at the hour grain:

                      Partitions at hour grain
                        |--|--| ...    |--|
|--- Partition 0 ----|--- Partition 0 ----|--- Partition 0 ----|
|--------------------|--------------------|--------------------|----->
  Chunk 2000-01-01      Chunk 2000-01-02    Chunk 2000-01-03

Note also that where the above diagrams use the term “partition”, the current discussion (and, it seems, Druid terminology) uses the term “segment”. (Yes, this can all be very confusing.)

SaaS adds the concept of a table, and the segment grain is a property of the table. Presumably SaaS has a way to propagate that grain through the ingestion specs so that all ingestion jobs produce the same grain.

The ability to use multiple segment grains within a single table is important when we look at replacement actions: it allows us to replace one hour, say, within a day:

        |---|                Hour-grain segment: (01-01T03:00, 01-01T04:00]
  |--------------------|     Day grain segment:  (01-01, 01-02]
01-01                01-02

In all these cases, the segments are aligned with the start and end of the interval. Every segment is of exactly one segment grain unit. (One hour, one day, etc.)

Segments are immutable once written, a fact important to how Druid handles adding and replacing data.

Spans

There are times when we want to talk about intervals in relation to other intervals. In this case, we say that our interval of interest spans some set of base intervals. We just saw a case of this: the day-grain segment above spanned the entire storage interval. The hour segment spanned a subset of the storage interval.

Queries typically span many storage intervals, and may not be aligned with storage interval boundaries. Examples of spans:

10:00      11:00      12:00      13:00
  |----------|----------|----------|     Segment intervals (base intervals)
  |----------|                           Full Span aligned with base
  |-----|                                Sub-span left-aligned with base
        |----|                           Sub-span right-aligned with base
    |----|                               Arbitrary sub-span
  |---------------------|                Aligned span of two full base intervals
        |---------|                      Arbitrary span of two base intervals
      |----------------------|           Most generic case: partial span of start
                                          and end base intervals, spans a full
                                          intermediate interval

Every span is of one of two kinds:

  • An arbitrary sub-span
  • A span of two or more base segments

A span of two or more segments can be broken into one or more of the following parts:

  • A right-aligned portion
  • A full span of any number of segments
  • A left-aligned portion

The ability to decompose an arbitrary span into parts is key to understanding multiple parts of Druid.
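The decomposition can be sketched as follows. This is a minimal illustration rather than Druid's implementation: the function name is hypothetical, times are plain integers (minutes since midnight), and base intervals are assumed to be aligned multiples of a fixed grain.

```python
def decompose(span_start: int, span_end: int, grain: int):
    """Split the span at base-interval boundaries.

    Returns a list of (start, end, kind) tuples, where kind is "full" when
    the piece covers a whole base interval and "partial" otherwise.
    """
    parts = []
    cursor = span_start
    while cursor < span_end:
        base_start = (cursor // grain) * grain   # base interval containing cursor
        base_end = base_start + grain
        piece_end = min(span_end, base_end)
        is_full = cursor == base_start and piece_end == base_end
        parts.append((cursor, piece_end, "full" if is_full else "partial"))
        cursor = piece_end
    return parts


# The "most generic case" from the diagram: 10:20 to 12:40 over hourly base intervals.
for start, end, kind in decompose(620, 760, 60):
    print(f"{start // 60:02d}:{start % 60:02d} - {end // 60:02d}:{end % 60:02d}  {kind}")
```

The first and last pieces are the right-aligned and left-aligned portions; everything between them is a run of full base intervals.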

Null Span

At times it is useful to describe a span that includes none of the base span. We call this a null span. In the example above, the first line, which spanned just the first base interval, has a null span with the other two base intervals.

Data Storage Concepts

With the above components in mind, we can now outline Druid’s versioning mechanism.

Segment “Bundles”

We’ve already described a segment: one file that spans a single segment interval.

As we talk about versions, we need to think about groups of segments, which we’ll call a bundle here (if only because Druid doesn’t use that term for anything else, and Druid has no term for this concept).

Consider the diagram above with three days. The day 2000-01-02 has a bundle of two segments called “Partition 0” and “Partition 1”. Combined, the set of files in the bundle contains all data that Druid has for that segment interval.

Bundles can arise from clustering (formerly called secondary partitioning): we might group our records by, say, the host on which the events occurred, or by client, or by some other key. Bundles also arise if a single partition reads more rows than comfortably fit into a single segment file. Bundles naturally arise when two ingestions each produce data in the same intervals. Since segments are immutable, new data must go into a new segment file. But, that file is within the same bundle as any existing files.

Versions

In Druid, a version is just the time at which the segment bundle was created. (Actually, the time at which the lock was obtained for the first segment in the bundle, which, for our purposes, is basically the same thing.) Versions are per segment interval.

A version is thus a bundle of segments that all have the same version timestamp:

  |--------------------|     Segment 6 \
  |--------------------|     Segment 5  }  version 2021-02-18T00:11:22
  |--------------------|     Segment 4 /
  |--------------------|     Segment 3 \
  |--------------------|     Segment 2  }  version 2021-02-16T12:34:45
  |--------------------|     Segment 1 /
10:00                11:00

Segments 1, 2 and 3 form one bundle at version 2021-02-16T12:34:45, while segments 4, 5, 6 form another bundle at version 2021-02-18T00:11:22.

Versions allow us to impose an order on segment bundles. In this paper, we use the term overlay to say that the newer version (2021-02-18T00:11:22) sits on top of (or overlays) the earlier version (2021-02-16T12:34:45).

Version locking (not further discussed here) ensures that we create only one version at a time for a given storage interval, and that the bundle of segments created within that lock has the same version number.

Data Replacement

Druid’s versioning mechanism exists to support replacement operations. The basic idea is that the data of record resides externally to Druid. In the earliest versions, data would reside in Hadoop in HDFS. Data in HDFS is often partitioned into a time-based directory structure:

2021 /
|- 01
   |- 01
      | file01-1.csv
      | file01-2.csv
      | ...
   |- 02
      | file02-1.csv
      | file02-2.csv
      ...
|- 02
...

The data of record accumulates and changes over time. We want to copy an entire day of data (one of the leaf directories above) into Druid. Since the data in Hadoop is the full source of truth, we want to replace any existing data in Druid with the new data.

We said before that segments are immutable, so we can’t simply go into segments, remove the old data, and add the new data as we’d do in an RDBMS. Instead, we use versions. Each time we load the same data, we create a new version. So, the versions shown in the prior image represent different times we (re)loaded the same data.

Of course, if we only ever append data (as in ingesting from Kafka), then the data replacement issue never arises, and all data within any given segment interval will be at the same version (there will be only one bundle of segments).

Replacement Grain

Replacement occurs at the segment grain. But, suppose our table is a SaaS table with a one-day segment grain, but we want to replace only one hour? To replace that one hour, we’d have to replace the entire day. If we only provide the single hour, the other 23 hours will disappear, which is not what we want.

Druid provides a feature to solve this issue. We noted above that, in Druid there is no single segment grain: every segment has its own grain. So, for the replacement ingestion we simply use a one-hour grain. This leads to the following situation:

        |---|                Hour-grain segment, version 2021-01-03T22:33:44
  |--------------------|     Day grain segment,  version 2021-01-02T00:11:22
01-01                01-02

In this case, the one hour of data is at a newer version than the rest of the data. The next section discusses how this works in detail. The replacement data forms a new bundle, at a new version, but only for one hour of time.

We could do the same trick for appending data, but there is no reason. When appending, we add data to an existing bundle: it doesn’t matter if the new data spans all, or only part, of the segment interval: all data in the bundle is combined when running a query.

Visibility and Overshadowing

The above explains why we want versions, and what they do. The next question is: how do they work with queries? The fundamental question is: what segments (files) should be read to satisfy a query? We noted that Druid uses bundles and overlays to store data, and assigns versions so we can “stack” them in a defined order. We want to know which spans within each segment should be visible to a query. Druid uses the term overshadowed for the opposite concept: which spans within a segment are not visible to a query.

Conceptually, Druid computes visibility instant-by-instant. We can imagine an infinitely narrow laser beam scanning an interval from left to right. Whichever segment span is hit by the laser is visible. Anything overshadowed by a higher segment (that is, a bundle with a later version) is not visible.

Example:

    1       2        3
    V       V        V
    :       :        :
    :   |=======|    :       Overlay segment, version 2021-02-18T02:33:44
  |=====--|          :       Overlay segment, version 2021-02-18T01:22:33
  |--------------======|     Full segment,    version 2021-02-16T12:34:45
10:00                11:00

The diagram shows a full segment and two overlays, stacked in version order. The “V” symbol is the laser, shown in three different positions during the scan. The bold (“=”) portions of each segment show the visible portions: the data that should appear in a query. The dashed portions are overshadowed: they will not appear in a query.
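The laser scan can be approximated by cutting the timeline at every segment boundary and picking the newest segment in each piece. The sketch below is illustrative only and is not the logic in VersionedIntervalTimeline. The names are hypothetical, times are minutes since midnight, and the overlay start and end times are assumed values, since the diagram does not give them. Versions are compared as strings, which orders same-format ISO-8601 timestamps chronologically.

```python
from typing import NamedTuple


class Segment(NamedTuple):
    name: str
    start: int     # minutes since midnight, inclusive
    end: int       # minutes since midnight, exclusive
    version: str   # ISO-8601 timestamp


def visible_spans(segments):
    """Return (segment name, start, end) for each visible span, left to right."""
    points = sorted({p for s in segments for p in (s.start, s.end)})
    result = []
    for lo, hi in zip(points, points[1:]):
        covering = [s for s in segments if s.start <= lo and hi <= s.end]
        if not covering:
            continue  # no segment covers this piece: there is no data here
        winner = max(covering, key=lambda s: s.version)
        if result and result[-1][0] == winner.name and result[-1][2] == lo:
            # Same segment as the previous piece: extend that span.
            result[-1] = (winner.name, result[-1][1], hi)
        else:
            result.append((winner.name, lo, hi))
    return result


segments = [
    Segment("full", 600, 660, "2021-02-16T12:34:45"),
    Segment("overlay-1", 600, 624, "2021-02-18T01:22:33"),  # assumed times
    Segment("overlay-2", 618, 642, "2021-02-18T02:33:44"),  # assumed times
]
spans = visible_spans(segments)
for name, lo, hi in spans:
    print(name, lo, hi)
```

Any segment whose name never appears in the result has no visible data at all.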

The Timeline

In Druid, the timeline is the data structure that figures out which data is visible in each segment. For our tombstone discussion, we’ll need two timelines: the segment timeline and the query timeline. (See the code in VersionedIntervalTimeline and the detailed use cases described in VersionedIntervalTimelineTest.)

Segment Timeline

The Coordinator uses the segment timeline to determine which segments to load. Consider two examples. In the one directly above (the one with the lasers), all three segments must be loaded because all three have data visible to queries. However, in the prior (Versions) example, it is clear that the bottom bundle of three segments is completely overshadowed: none of its data is visible. (Using a term introduced above, the visible data is the null span.) Therefore these bottom three segments need not be loaded.

Query Timeline

The query timeline (often just referred to as the timeline) is built only from segments which are loaded, and announced by Historical nodes. This means that the query timeline is not based on the actual segments, but only the announced segments (and is thus a potential source of all manner of bugs and race conditions.)

We can consider the query timeline as a subset of the segment load timeline, which includes only loaded segments. This can obviously lead to incorrect results. Consider the example above:

    1       2        3
    V       V        V
    :       :        :
    :   |=======|    :       Overlay interval, version 2021-02-18T02:33:44
  ..:......          :       Overlay interval, not yet loaded
  |======--------======|     Full segment, version     2021-02-16T12:34:45
10:00                11:00

In the above, some time delay or other issue has caused one of the overlay segments to not be loaded (or not yet announced). As a result, our “laser” does not land on that segment; it instead lands on the full segment. The result is that queries return results, but the wrong results. Amazingly, this is a “feature” of Druid, not a bug. It is the responsibility of the application to poll Druid to determine if everything is loaded. For other DBs, the above would be considered “data corruption.”

MSQ Timeline (Cold Tier)

MSQ is our new query solution. MSQ is designed to operate on cold storage: that is, on the segments directly, not on their cached copies. (MSQ will include caching, but for now, let’s focus on the pure “cold tier” operation.)

MSQ wants to operate on the segment load timeline. Since there may be no historicals, using the current query timeline is not practical. This means that the segment timeline must be made explicit (if not yet explicit in the implementation). It also means that MSQ is immune to the errors that result from the announcement-based query timeline.

When MSQ adds segment caching, one can imagine adding another step:

  • Use the segment timeline to find the visible set of segments.
  • Consult a cache map to determine which of those segments are cached, and where.

Because MSQ must use the segment (rather than query) timeline, the tombstone fix may or may not benefit MSQ, depending on how it is implemented.

Segment Metadata

Thus far we’ve discussed segments as stand-alone objects. It is true that segments encode (data-source, interval, version) in their file names, and much more information in the metadata stored within the segment. However, it would be very expensive to constantly scan the deep storage (HDFS, S3) to find the inventory of segments. Instead, Druid has a metadata table (in a traditional RDBMS) which holds that information. The table is typically called segments, is defined in SQLMetadataConnector and has the following schema:

CREATE TABLE segments (
  id VARCHAR(255) NOT NULL,
  dataSource VARCHAR(255) NOT NULL,
  created_date VARCHAR(255) NOT NULL,
  start VARCHAR(255) NOT NULL,
  end VARCHAR(255) NOT NULL,
  partitioned BOOLEAN NOT NULL,
  version VARCHAR(255) NOT NULL,
  used BOOLEAN NOT NULL,
  payload BLOB NOT NULL,
  PRIMARY KEY (id)
)

The actual schema has nuances that are not relevant here. The key fields are the (dataSource, start, end, version) which repeat the information in the id field (file name). The payload is the serialized metadata for the segment.
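To make the role of the key fields concrete, here is a small sketch that looks up the used segments overlapping an interval. It is not how Druid reads its metadata store: it uses an in-memory SQLite database as a stand-in, keeps only a subset of the columns, and the data source name, segment ids, and function name are all hypothetical. Timestamps are stored as ISO-8601 strings, so string comparison matches time order.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# Reduced copy of the schema; "start" and "end" are quoted because
# END is an SQL keyword.
conn.execute("""
    CREATE TABLE segments (
      id TEXT PRIMARY KEY,
      dataSource TEXT NOT NULL,
      "start" TEXT NOT NULL,
      "end" TEXT NOT NULL,
      version TEXT NOT NULL,
      used BOOLEAN NOT NULL
    )""")
conn.executemany(
    "INSERT INTO segments VALUES (?, ?, ?, ?, ?, ?)",
    [
        ("seg-a", "wiki", "2000-01-01T00:00:00Z", "2000-01-02T00:00:00Z",
         "2021-02-16T12:34:45", True),
        ("seg-b", "wiki", "2000-01-01T00:00:00Z", "2000-01-02T00:00:00Z",
         "2021-02-18T00:11:22", True),
        ("seg-c", "wiki", "2000-01-02T00:00:00Z", "2000-01-03T00:00:00Z",
         "2021-02-16T12:34:45", False),
    ],
)


def used_segments(conn, data_source, start, end):
    """Return (id, version) of used segments overlapping the given interval."""
    cursor = conn.execute(
        'SELECT id, version FROM segments '
        'WHERE dataSource = ? AND used = 1 AND "start" < ? AND "end" > ? '
        'ORDER BY version',
        (data_source, end, start),
    )
    return cursor.fetchall()


print(used_segments(conn, "wiki", "2000-01-01T00:00:00Z", "2000-01-02T00:00:00Z"))
```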

Unused Segments

The above story is incomplete: there is one additional variable: the segment used flag, as shown in the metadata schema above. Each segment can be marked as used (used = true) or unused (used = false). This flag has, unfortunately, come to mean two entirely different things:

  1. Do not load the segment. Perhaps the segment represents older data not needed in day-to-day querying and the user chooses to save cost by not loading it. Since the query timeline is built from announced segments, unused (thus unloaded) segments are not visible to queries: it is as if the data does not exist. (Again, amazingly, this is a feature, not a bug: the user gets no warning when they attempt to query unloaded segments.)
  2. Remove the segment. (AKA “soft delete.”) The only way to “drop a table” in Druid is to mark all segments for that table as unused. The result is that the segments are not loaded so that, to queries, it does seem as if the data is deleted.

It is not hard to see serious bugs with the above. The meaning to the Coordinator is the same (don’t load the segment). However, to the user, these are entirely different use cases: one retains the user’s data, the other deletes it.

If the unused segment is an overlay, then the previously-overshadowed data becomes visible. (This is exactly the “unloaded overlay” case discussed above.)

If users think they have “dropped” data, but later mark those segments as used, then the data reappears in queries.

Marking segments as unused may be a way of saying that older versions need not be loaded. If so, then this use is redundant: the segment timeline should discover that fact from the overlay intervals and segment versions.

Ingestion

We are now ready to discuss dynamic ingest behavior. Let’s first take the easy case: loading data into an empty table.

Initial Ingestion

In the initial ingestion, no data exists. The ingested data creates entirely new segments placed into “virgin” storage intervals. Ingested data is split into segments at storage interval boundaries, so that a single ingestion may produce one or more storage intervals. Ingestion may produce multiple segments per interval. The simple case occurs when the input data has more rows than is ideal for a segment (that is, more than about 5 million rows): data is split across multiple segments. Multiple segments may also be produced due to clustering.

For our purposes, the key thing to observe is that all the segments within a single storage interval will have the same version (timestamp.) This means our “laser” above is not looking at just a single segment at each “layer” but rather a collection of segments.

We also note that, since all data within a storage interval must be at the same version, only one ingestion per table can run at a time. If we had two, then the segments produced by Ingestion 2 (with a later version) would overshadow those produced by Ingestion 1 (with an earlier version). This is another “feature” that seems more like a bug, but is a topic for a different discussion.

See the documentation for details on the actual handoff process.

Subsequent Ingestion

The first ingestion is done: we now have data in three storage intervals, say (09, 10], (10, 11], and (11, 12]. Now the second batch of data arrives. The data contains some “late arriving data” for the existing intervals, plus data for additional, later intervals.

To append data to an existing storage interval (chunk), Druid provides an allocate operation, SegmentAllocateAction, which creates new segments at the current segment version.

Open question: which version?

Is the version that of the full segments? Of the latest overlay? Both seem to have bugs. If the latest full segment version, then an earlier replace will overshadow the new data. If at the latest overlay version, then it is very unclear how overshadowing can work. Maybe, for a given base segment version, one can either append or replace, but not both?

Data Replacement

Druid was originally developed as a materialized view system. The “data of record” was stored in Hadoop, and copied into Druid. As the Hadoop data of record changed (new data arrived, corrections made), the data in Druid was replaced with a new set. Thus, Druid’s versioning and overshadowing mechanism is optimized for this specific use case.

Consider a typical example. We have data in the (10:00, 11:00] storage interval. We want to replace data in the (10:00, 10:15] interval. This is exactly what overlays allow:

  |----|                     Replacement, overlay interval: (10:00, 10:15]
  |--------------------|     Full segment interval: (10:00, 11:00]
  |--------------------|     Storage interval:      (10:00, 11:00]
10:00                11:00

The data visibility rules, and timelines, ensure that queries use the new, replacement data for the (10:00, 10:15] interval, and the base data for the rest of the hour.

Replacing Multiple Storage Intervals

All of the discussion thus far is for the case of segments that contain data. Indeed, Druid has no concept of segments which do not contain data. But, what if the replacement data has no rows for a given storage interval? What blocks the “laser” from hitting the now-replaced base segment data? We already discussed one solution, the used flag. An early fix for this case used that flag as follows:

  • Take a replacement range. If our storage grain is one hour, perhaps the replacement range is (09:00, 12:00].
  • Read the rows and create new segments.
  • Note the set of original storage intervals spanned by the replacement range. Find any that have no rows in the new segments.
  • For each such storage range, mark the original segments as used = false. (That is, the second case above, the “soft delete.”)

Unfortunately, the original design of this fix issued the required actions in two steps:

  • Issue an action to mark the segments in the storage intervals as unused.
  • Issue a second action to replace the non-empty storage intervals with new data.

While this is technically correct, the actions are not atomic. The behavior in the cluster is:

  • The first action causes all data for all three storage intervals to become unloaded. Queries against that time range return no results.
  • The second action causes the new data to be loaded, restoring data to queries in the target storage intervals.

As it turned out, the “missing data” aspect was perceived more as a bug than a feature.

MSQ fixes this problem:

  • For each storage interval spanned by the replacement interval, determine if the new data contains rows in that interval. If not, mark all segments in only the now-empty storage intervals as unused.
  • Issue a second action to replace the non-empty storage intervals with new data.

The actions are still not atomic, so the user will see:

  • Queries return old data for all three storage intervals. (Before the first action.)
  • Queries return old data for the non-empty storage intervals, but no data for the others. (After the first action.)
  • Queries return new data. (After the second action.)
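A toy model makes the three observable states easy to trace. Everything here is a simplification for illustration: the dictionaries, version labels, and the choice of which interval receives new rows are assumptions, and the query function simply picks the newest used segment per storage interval.

```python
def query(segments):
    """Return {interval: data} using the newest used segment per interval."""
    visible = {}
    for seg in segments:
        if not seg["used"]:
            continue  # unused segments are never loaded, so queries cannot see them
        best = visible.get(seg["interval"])
        if best is None or seg["version"] > best["version"]:
            visible[seg["interval"]] = seg
    return {interval: seg["data"] for interval, seg in sorted(visible.items())}


# Three storage intervals loaded by an earlier ingestion (hypothetical data).
segments = [
    {"interval": "09", "version": "v1", "used": True, "data": "old"},
    {"interval": "10", "version": "v1", "used": True, "data": "old"},
    {"interval": "11", "version": "v1", "used": True, "data": "old"},
]
# Assumption: the replacement data has rows only for interval 10.
new_rows = {"10": "new"}

before = query(segments)

# Action 1: mark segments in the now-empty storage intervals as unused.
for seg in segments:
    if seg["interval"] not in new_rows:
        seg["used"] = False
after_first = query(segments)

# Action 2: publish the replacement segments at a newer version.
for interval, data in new_rows.items():
    segments.append({"interval": interval, "version": "v2", "used": True, "data": data})
after_second = query(segments)

print(before)
print(after_first)
print(after_second)
```

The middle state mixes deleted intervals with not-yet-replaced data, which is a combination that never existed in the source of record.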

This is still less than ideal as queries can return results that don’t reflect any actual real-world state. (We can even go so far as to call this “data corruption.”)

The correct solution is to have a single action that combines both actions.

Compaction

The final ingestion topic to cover is compaction. Ingestion can produce odd-sized segments. Repeated rounds of ingestion can produce separate sets of files. Compaction rewrites the segments within an interval to produce a new set of segments with better storage or query characteristics. (In SaaS, compaction is also used to change the schema of an existing table.)

Versions allow us to differentiate the original data from the rewritten data and are critical for an atomic operation. The compaction task:

  • Locks the storage interval to prevent other tasks from changing the same interval.
  • Reads all existing segments and produces a new set with all the same rows, just arranged differently.
  • Issues a task action which ((what?)) to mark the old set of segments as obsolete, and publishes the new set.
  • The Coordinator notices the change and requests historicals to load the new set of segments.
  • Once the new segments appear, they have a newer version than the previous set, and replace the previous set in the timeline. (Is this atomic? If not, it is a bug.)
  • Once published, the Coordinator unpublishes the old set of segments.

Tombstones

We previously discussed the case in which an entire storage interval has no data after replacement. Here, let’s talk about when a sub-interval has no data. This situation can occur in any of the cases mentioned previously:

  • The replacement interval is a subset of the storage interval, and has no data. (Example: replace the interval (10:05, 10:10] within (10:00, 11:00], and the replacement has no data.)
  • The replacement interval overlaps the start or end of a storage interval, and has no data. (Example: replace the interval (9:50, 11:10], and the (9:50, 10:00] sub-interval has no data.)

Now if we could create a zero-row segment, the solution would work exactly as described above for visibility and overshadowing. However, segments cannot have zero rows. And, even if they could, it is silly to create a file just to say that the file contains nothing when we can properly handle that case in metadata.

The idea is that we can record in metadata, a tombstone which says that, for a given interval, there is no data. When our laser shines on this interval, the timeline understands that there is no data in that span, so there is nothing to announce.

The tombstone becomes a third way to say that there is no data:

  • Empty (“virgin”) storage intervals have no data by virtue of having no segments.
  • Intervals with all segments marked used = false have no data by virtue of not loading those segments.
  • Intervals with a tombstone have no data by virtue of having been replaced by data that has zero rows in that interval.

The careful reader will note that the last two cases are redundant, and could be combined. We’ll have more to say on that later.

Metadata Storage

Tombstones are entries in the segments metadata table described above. The payload contains JSON that marks the record not as an actual segment, but as a tombstone. (It would be ideal to add another field to the table itself, but that would break downgrades, and Druid appears to not have a mechanism to do such schema updates.) The result is that, to all consumers of the table, a tombstone looks like a segment. Only those able to deserialize the payload will have the information to know that the entry is not for a file, but for a tombstone. (Again, this seems more like a bug than a feature.)

Segment Timeline

The segment timeline should handle tombstones specially (though the proposed code changes do not actually do this.) Consider this example:

         |xxxxxxx|           Overlay 3: tombstone, (10:20, 10:40]
          |---|              Overlay 2: (10:25, 10:35]
     |===---------==|        Overlay 1: (10:10, 10:55]
  |==----------------==|     Full segment interval: (10:00, 11:00]
  |--------------------|     Storage interval:  (10:00, 11:00]
10:00                11:00

In this case, the tombstone entirely overshadows overlay 2. The double lines (“=”) indicate the visible bits. The “x” characters indicate “no data”.

When this is used as a segment timeline, the Coordinator must request to load:

  • Base segments, because of the (10:00, 10:10] and (10:55, 11:00] intervals.
  • Overlay 1, because of the (10:10, 10:20] and (10:40, 10:55] intervals.

Note that the tombstone should not be loaded because the very definition of a tombstone is “there is no data to load.” (The proposed code does load the tombstone, causing additional code changes, but no harm.)

Query Timeline

The query timeline should look similar to the above, but with the omission of the non-published Overlay 2:

         |xxxxxxx|           Overlay 3: tombstone, (10:20, 10:40]
     |===---------==|        Overlay 1: (10:10, 10:55]
  |==----------------==|     Full segment interval: (10:00, 11:00]
  |--------------------|     Storage interval:  (10:00, 11:00]
10:00                11:00

The challenge, of course is: where does the query timeline get the tombstone information? The query timeline is based on the segments which historicals announce (as described above.) So, it seems logical that a historical must also announce the tombstone. (This is the reason that the proposed change does load and announce tombstones.)

The alternative is that the coordinator announces the tombstones and bypasses historicals. Such an approach has the benefit of recognizing the “nothing to load” nature of a tombstone, but does require extra code.

MSQ Timeline

Since the MSQ timeline cannot be built from historical announcements, it must be based on the segment timeline above. Query behavior (see below) will be identical in both cases, but will not be subject to the race conditions and errors described above for the “we only know about loaded data” issue.

Query Logic

The Broker plans a query. In so doing, it divides the query into a series of segment scans. The Broker can already handle query intervals for which there is no data (such as a query that covers a month in the future, when all data is historical, or all segments for an interval are unused.) Given the example above, if we queried the entire storage interval, the Broker would produce the following segment scans (query slices):

  • (10:00, 10:10] - base segment
  • (10:10, 10:20] - Overlay 1
  • (10:40, 10:55] - Overlay 1
  • (10:55, 11:00] - base segment

Notice the gap where the tombstone occurs: the Broker simply issues no query slice for that time period.
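The slicing can be sketched by treating a tombstone as a timeline entry that wins its span but produces no slice. This is an illustration, not the Broker's planning code. The names are hypothetical, times are minutes since midnight, and the integer versions are assumed stand-ins that only encode the stacking order shown in the diagram.

```python
from typing import NamedTuple


class Entry(NamedTuple):
    name: str
    start: int              # minutes since midnight, inclusive
    end: int                # minutes since midnight, exclusive
    version: int            # assumed stand-in for the version timestamp
    tombstone: bool = False


def query_slices(entries):
    """Return (entry name, start, end) for each slice the query must scan."""
    points = sorted({p for e in entries for p in (e.start, e.end)})
    slices = []
    for lo, hi in zip(points, points[1:]):
        covering = [e for e in entries if e.start <= lo and hi <= e.end]
        if not covering:
            continue
        winner = max(covering, key=lambda e: e.version)
        if winner.tombstone:
            continue  # a tombstone means "no data here": emit no slice
        if slices and slices[-1][0] == winner.name and slices[-1][2] == lo:
            # Same entry as the adjacent previous piece: extend that slice.
            slices[-1] = (winner.name, slices[-1][1], hi)
        else:
            slices.append((winner.name, lo, hi))
    return slices


entries = [
    Entry("base", 600, 660, 1),
    Entry("overlay-1", 610, 655, 2),
    Entry("overlay-2", 625, 635, 3),
    Entry("tombstone", 620, 640, 4, tombstone=True),
]
slices = query_slices(entries)
to_load = {name for name, _, _ in slices}
for name, lo, hi in slices:
    print(name, lo, hi)
```

The set `to_load` contains only the base segment and Overlay 1. Overlay 2 is entirely overshadowed by the tombstone, and the tombstone itself has nothing to load.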

Generalizing Tombstones

The discussion thus far has stated that tombstones are used only when replacing a subset of a storage interval. We also noted that we used the used = false flag in place of tombstones to mark entire storage intervals as having no data. We also noted the ambiguity of the two uses of used = false: 1) don’t load my data, and 2) delete my data.

Tombstones offer a way to resolve this ambiguity.

  • If data is not to be loaded (that is, it is in the proposed “cold tier”) then mark used = false (accepting the awkward naming.)
  • If data is to be deleted (via a delete operation, or replaced with an empty data set, or the data has expired due to expiration rules), then use a tombstone that spans the full storage interval.

Removing Tombstones

During a delete operation, if we insert a tombstone over the top of the existing data, then we have something that looks like this:

  |xxxxxxxxxxxxxxxxxxxx|     Tombstone to mark the data deleted
  |--------------------|     Full segment interval: (10:00, 11:00]
  |--------------------|     Storage interval:      (10:00, 11:00]
10:00                11:00

To physically remove the data, we do the following:

  • Scan for segments which are entirely overshadowed. (That is, none of the data is visible.)
  • Physically delete the segments.
  • Remove the metadata entry for each such segment.

The result leaves us with:

  |xxxxxxxxxxxxxxxxxxxx|     Tombstone
  |--------------------|     Storage interval:  (10:00, 11:00]
10:00                11:00

We can now invoke a second rule:

  • If there is nothing which a tombstone overshadows, delete the tombstone from metadata.

Similar rules can be used for tombstones which cover a subset of the storage interval.

Both rules can be realized by observing the segment load timeline described above.
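
The two rules can be sketched as follows. This is an illustration under stated assumptions, not Druid's implementation: the Entry class, the integer versions and the function names are hypothetical, and the “physical delete” is reduced to dropping an entry from an in-memory list.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Entry:
    """Hypothetical metadata entry; times are minutes after 10:00."""
    start: int
    end: int
    version: int
    tombstone: bool = False


def is_fully_overshadowed(entry, entries):
    """True if newer entries cover every part of `entry`."""
    newer = sorted(
        (e for e in entries if e.version > entry.version),
        key=lambda e: e.start,
    )
    covered_to = entry.start
    for e in newer:
        if e.start > covered_to:
            break  # gap: part of `entry` is still visible
        covered_to = max(covered_to, e.end)
    return covered_to >= entry.end


def overshadows_anything(tombstone, entries):
    """True if some older entry overlaps the tombstone's interval."""
    return any(
        e.version < tombstone.version
        and e.start < tombstone.end
        and tombstone.start < e.end
        for e in entries
    )


def cleanup(entries):
    """Apply both removal rules and return the surviving entries."""
    # Rule 1: remove data segments that are entirely overshadowed.
    remaining = [
        e for e in entries
        if e.tombstone or not is_fully_overshadowed(e, entries)
    ]
    # Rule 2: remove tombstones that no longer overshadow anything.
    return [
        e for e in remaining
        if not e.tombstone or overshadows_anything(e, remaining)
    ]


# Full-interval delete: the segment goes first, then the tombstone.
deleted = cleanup([Entry(0, 60, 1), Entry(0, 60, 2, tombstone=True)])

# Partial tombstone: the segment is still partly visible, so both stay.
partial = cleanup([Entry(0, 60, 1), Entry(20, 40, 2, tombstone=True)])
```

In the first case the result is empty. In the second case both entries survive, because the tombstone still hides part of a segment that must remain loaded.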

Data Replacement Revisions

With the above definition of tombstones in place, we can revise the data replacement algorithm:

  • Replacement is for some interval. Break that interval into a set of sub-intervals, each of which overlaps a single storage interval, as described previously.
  • Ingest the replacement data.
  • Review the set of newly-created segments against our set of sub-intervals created above. If any such sub-interval has no new segment (that is, it has zero rows), create a tombstone for that sub-interval.
  • In a single atomic action:
    • Retire the old intervals.
    • Publish the new segments and tombstones.
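
The first three steps can be sketched as below. The function names are hypothetical, times are minutes since midnight, storage intervals are assumed to be aligned to a fixed grain, and each new segment is assumed to cover exactly one sub-interval. The atomic publish step is not shown.

```python
def split_by_storage_interval(start, end, grain):
    """Split a replacement interval into sub-intervals, each of which
    lies within a single grain-aligned storage interval."""
    pieces = []
    lo = start
    while lo < end:
        # End of the storage interval that contains `lo`.
        hi = min(end, (lo // grain + 1) * grain)
        pieces.append((lo, hi))
        lo = hi
    return pieces


def tombstones_for(replacement, grain, new_segments):
    """Return the sub-intervals that received no new segment.

    `new_segments` is a collection of (start, end) pairs produced by
    ingestion. Any sub-interval missing from it had zero rows and
    needs a tombstone.
    """
    start, end = replacement
    covered = set(new_segments)
    return [
        sub for sub in split_by_storage_interval(start, end, grain)
        if sub not in covered
    ]


# Replace 10:30 to 12:30 on hourly storage intervals; the new data
# only has rows for 11:00 to 12:00.
subs = split_by_storage_interval(630, 750, 60)
tombs = tombstones_for((630, 750), 60, [(660, 720)])
```

With these inputs, subs contains the three sub-intervals 10:30 to 11:00, 11:00 to 12:00 and 12:00 to 12:30, and tombs contains the first and the last, since neither received a new segment.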

If “retire the old intervals” means setting used = false, which we noted is a problem, then an even simpler algorithm is:

  • Replacement is for some interval. Break that interval into a set of sub-intervals, each of which overlaps a single storage interval, as described previously.
  • Ingest the replacement data.
  • In a single atomic action:
    • Publish tombstones for all sub-intervals.
    • Publish the new segments.

Internally, the ideal outcome would be for the tombstones to have a version older than the new segments. The result would be:

             |----------|                New data (with zero-rows sub-intervals)
       |xxxxx|xxxxxxxxxx|xxxxx|          Tombstones
  |----------|----------|----------|     Original segments 
  |----------|----------|----------|     Storage intervals (base intervals)
10:00      11:00      12:00      13:00
       |<-------------------->|          Replacement interval

If tombstones are the standard way to mark data as deleted, then we simplify the problem by using them to “wite-out” the entire range of replaced data. We then overlay the tombstones with the actual replacement data: it does not matter if that data has zero rows for all, some or none of the sub-intervals.

Note the nuances in the revised algorithm above:

  • Publishing of the tombstones and data must be done atomically, so that the cluster moves from the old data to the new data in one step. (We avoid the race conditions discussed previously.)
  • The tombstones must have an earlier version than the segments so that the segments sit “above” the tombstones in the segment “stack.”

One way to do that is to simply use the transaction timestamp as the tombstone version. Add one to that timestamp to get the segment version. (If processing has occurred so quickly that the segment timestamp is somehow still in the future, simply sleep until that timestamp is in the past. Locking will prevent another transaction from using that same timestamp.)
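
The timestamp scheme can be sketched as follows. This is a sketch only: it assumes versions are plain millisecond timestamps (Druid's real version strings may differ), the assign_versions name is hypothetical, and the clock and sleep functions are parameters so the logic can be exercised without waiting.

```python
import time


def assign_versions(txn_ts_ms,
                    clock_ms=lambda: int(time.time() * 1000),
                    sleep=time.sleep):
    """Derive tombstone and segment versions from one transaction timestamp.

    The tombstone gets the transaction timestamp. The segments get that
    timestamp plus one, so they sort above the tombstones. Before
    returning, wait until the segment version is strictly in the past.
    """
    tombstone_version = txn_ts_ms
    segment_version = txn_ts_ms + 1
    while clock_ms() <= segment_version:
        sleep(0.001)  # the segment version is not yet in the past
    return tombstone_version, segment_version


# Exercise the logic with a fake clock that advances 1 ms per reading.
ticks = iter([1000, 1001, 1002])
naps = []
versions = assign_versions(1000, clock_ms=lambda: next(ticks), sleep=naps.append)
```

With the fake clock, versions is (1000, 1001) and the function sleeps twice before the clock reads 1002. The sketch relies on the locking mentioned above to keep a concurrent transaction from picking the same timestamp; it does not implement that locking itself.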
