
Compaction Notes

Paul Rogers edited this page Mar 19, 2023 · 1 revision

Notation

  • Partition chunk (PartitionChunk): a time chunk in a timeline, along with knowledge of its position within the timeline.
  • Overshadowable (Overshadowable): an interface on top of a data segment that implements the overshadow relation.
  • Data segment (DataSegment): "Metadata of Druid's data segment".

Partition chunk:

  • What happens if the timeline has gaps? Are there multiple start/stop sets?

Overshadowable:

  • What are the version and minor version?

Proposed algorithm

This algorithm is derived from first principles. The current implementation is quite complex; this section does not attempt to describe it.

Start with a timeline, which is assumed to be either:

  • A list of visible segments in start-time order, in which each segment appears once; or
  • A list of time chunks, each with the segments that have data for that time chunk, in which a segment may appear multiple times.

Parse the list into spans: runs of adjacent or overlapping segments. A gap occurs when segment A ends before segment B begins.
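A minimal Python sketch of span parsing, assuming a hypothetical `Segment` type with a half-open interval `[start, end)` in integer time units. It closes a span only when the next segment begins after every segment seen so far has ended, so a long segment keeps the later segments it covers in the same span, and adjacent segments (one ends exactly where the next begins) stay together.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class Segment:
    # Hypothetical minimal segment: half-open interval [start, end) and a size.
    start: int
    end: int
    size: int = 0


def parse_spans(segments):
    """Group segments into spans: runs of adjacent or overlapping segments."""
    spans = []
    current = []
    current_end = None  # latest end time of any segment in the current span
    for seg in sorted(segments, key=lambda s: s.start):
        # Gap: this segment begins after everything in the current span has ended.
        if current and seg.start > current_end:
            spans.append(current)
            current = []
        if not current:
            current_end = seg.end
        current.append(seg)
        current_end = max(current_end, seg.end)
    if current:
        spans.append(current)
    return spans
```

For example, segments covering `[0, 10)`, `[10, 20)`, `[25, 30)` and `[28, 40)` form two spans, split at the gap between 20 and 25.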

Parse the above list into time chunks of the target granularity. A chunk can be empty, can hold only segments fully contained within it, or can hold segments that extend into an adjacent chunk. Mark each chunk with its "overlap prior" and "overlap later" flags.
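The chunking step might look like the sketch below. The `Segment` and `Chunk` types, the integer time units, and the choice to emit empty chunks for gaps are assumptions for illustration. A segment is assigned to every chunk it intersects, which is why a segment can appear in more than one chunk.

```python
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Segment:
    start: int  # half-open interval [start, end), hypothetical integer time units
    end: int
    size: int = 0


@dataclass
class Chunk:
    start: int
    end: int
    segments: list = field(default_factory=list)
    overlap_prior: bool = False  # some segment starts before this chunk
    overlap_later: bool = False  # some segment ends after this chunk


def parse_chunks(segments, granularity):
    """Split the timeline into fixed-size chunks, including empty ones."""
    if not segments:
        return []
    # Align the first chunk to the granularity boundary at or before the earliest start.
    t = min(s.start for s in segments) // granularity * granularity
    last = max(s.end for s in segments)
    chunks = []
    while t < last:
        chunk = Chunk(t, t + granularity)
        for seg in segments:
            if seg.start < chunk.end and seg.end > chunk.start:  # intersects this chunk
                chunk.segments.append(seg)
                chunk.overlap_prior |= seg.start < chunk.start
                chunk.overlap_later |= seg.end > chunk.end
        chunks.append(chunk)
        t += granularity
    return chunks
```

This scans every segment for every chunk, which is fine for a sketch; a real pass would walk both sorted lists together.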

Score each time chunk. The score has an action: "empty", "stable", "compact" or "no opinion". For "compact", it has a priority in the range 0-1. This may be done by a set of rules.

  • A chunk with no segments (or only tombstones) is empty.
  • A chunk with one segment, where that segment is smaller than the max size, is "stable".
  • A chunk with $n$ segments, each below the max size, where $n = \lceil \sum \textsf{size} / \textsf{max size} \rceil$, is also stable.
  • A chunk with overlaps is "no opinion" if the sizes are stable, and "compact" otherwise.
  • A chunk with oversize segments, or one that does not satisfy the rules above, is "compact".
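One way to encode these rules, as a hypothetical sketch: `max_size`, the `tombstone` flag, and the priority heuristic are assumptions (the notes only say the priority lies in the range 0 to 1). The single-segment rule is the $n = 1$ case of the ceiling rule. The overlap rule is checked before the stable rules so that an overlapping chunk with stable sizes gets "no opinion" rather than "stable", and a segment exactly at the max size is treated as fitting.

```python
import math
from dataclasses import dataclass, field
from enum import Enum


class Action(Enum):
    EMPTY = "empty"
    STABLE = "stable"
    COMPACT = "compact"
    NO_OPINION = "no opinion"


@dataclass(frozen=True)
class Segment:
    start: int
    end: int
    size: int
    tombstone: bool = False  # hypothetical flag for tombstone segments


@dataclass
class Chunk:
    segments: list = field(default_factory=list)
    overlap_prior: bool = False
    overlap_later: bool = False


def sizes_stable(segs, max_size):
    # Every segment fits, and n is the fewest max-size segments that can hold the data.
    total = sum(s.size for s in segs)
    return all(s.size <= max_size for s in segs) and len(segs) == math.ceil(total / max_size)


def compact_priority(segs, max_size):
    # Hypothetical heuristic in [0, 1]: share of surplus segments or of oversize segments.
    ideal = max(1, math.ceil(sum(s.size for s in segs) / max_size))
    surplus = (len(segs) - ideal) / len(segs) if len(segs) > ideal else 0.0
    oversize = sum(1 for s in segs if s.size > max_size) / len(segs)
    return max(surplus, oversize)


def score(chunk, max_size):
    """Return (action, priority); priority is None unless the action is COMPACT."""
    live = [s for s in chunk.segments if not s.tombstone]
    if not live:
        return Action.EMPTY, None
    stable = sizes_stable(live, max_size)
    if chunk.overlap_prior or chunk.overlap_later:
        if stable:
            return Action.NO_OPINION, None
        return Action.COMPACT, compact_priority(live, max_size)
    if stable:
        return Action.STABLE, None
    return Action.COMPACT, compact_priority(live, max_size)
```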

Create a new list of candidate tasks, walking the chunks in time order:

  • Omit any empty or stable chunk, and clear the previous task.
  • If the chunk does not overlap the previous chunk, create a new task containing this chunk.
  • If the chunk overlaps the previous chunk, add it to the previous task if there is one; otherwise start a new task.
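A sketch of the candidate-task pass, assuming each chunk has already been scored and carries its "overlap prior" flag. The `ScoredChunk` and `Task` types are hypothetical. The notes do not say how "no opinion" chunks are handled, so this sketch follows the rules literally: only empty and stable chunks are omitted.

```python
from dataclasses import dataclass, field


@dataclass
class ScoredChunk:
    start: int           # chunk start time
    action: str          # "empty", "stable", "compact" or "no opinion"
    overlap_prior: bool  # a segment in this chunk extends into the previous chunk


@dataclass
class Task:
    chunks: list = field(default_factory=list)


def candidate_tasks(scored):
    """Group time-ordered scored chunks into candidate tasks."""
    tasks = []
    previous = None
    for chunk in scored:
        if chunk.action in ("empty", "stable"):
            previous = None  # omit the chunk and clear the previous task
            continue
        if chunk.overlap_prior and previous is not None:
            previous.chunks.append(chunk)  # overlaps the previous chunk: extend its task
        else:
            previous = Task([chunk])  # no overlap, or no previous task: start a new one
            tasks.append(previous)
    return tasks
```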

Rewrite the candidate list to get the actual list.

  • If a task is small, combine it with the previous task if that one is also small.
  • If a task is below the maximum cost, it becomes an actual task.
  • If a task is large, split it at chunk boundaries into multiple actual tasks so that each is below the limit.
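The rewrite pass could be sketched as two loops. Here a task is a hypothetical list of `(chunk_id, cost)` pairs, and the `small` and `max_cost` thresholds are assumed parameters. Splitting is greedy: it starts a new task whenever the next chunk would push the current one over the limit. A single chunk that alone exceeds the limit cannot be split at chunk boundaries, so it stays a task by itself.

```python
def task_cost(task):
    return sum(cost for _, cost in task)


def rewrite_tasks(candidates, small, max_cost):
    """Turn candidate tasks (lists of (chunk_id, cost)) into actual tasks."""
    # Pass 1: merge a small task into the previous one when that one is also small.
    merged = []
    for task in candidates:
        if merged and task_cost(task) < small and task_cost(merged[-1]) < small:
            merged[-1] = merged[-1] + list(task)
        else:
            merged.append(list(task))
    # Pass 2: keep tasks within the limit; split larger ones at chunk boundaries.
    actual = []
    for task in merged:
        if task_cost(task) <= max_cost:
            actual.append(task)
            continue
        current = []
        for chunk in task:
            if current and task_cost(current) + chunk[1] > max_cost:
                actual.append(current)  # the next chunk would overflow: close this task
                current = []
            current.append(chunk)
        actual.append(current)
    return actual
```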

Adjacent tasks conflict when they share a segment, for example a segment that spans the boundary between them. To avoid running conflicting tasks concurrently, divide the tasks into two lists, "red" and "black":

  • The first task is red.
  • If a task shares a segment with the previous task, it is black.
    • For each black task, record the (prior, task) pair.
  • Otherwise, the task is red.

Red tasks can run right away (subject to capacity, etc.). Black tasks go into a dependency map: each can run once the task it depends on completes. In the worst case, if there is continuous overlap, this fully serializes the tasks. A refinement is to do finer parsing and maintain multiple dependencies.
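A sketch of the red/black coloring and the resulting dependency map, where each task is represented (as an assumption) by the set of segment IDs it reads, in time order. The helper `runnable` is hypothetical; it lists which tasks may start given the set of completed task indexes.

```python
def color_tasks(tasks):
    """tasks: list of sets of segment ids, in time order.

    Returns (red, black, depends_on), where depends_on maps each black task
    index to the index of the prior task it conflicts with."""
    red, black, depends_on = [], [], {}
    for i, segments in enumerate(tasks):
        if i > 0 and segments & tasks[i - 1]:  # shares a segment with the previous task
            black.append(i)
            depends_on[i] = i - 1
        else:
            red.append(i)
    return red, black, depends_on


def runnable(red, depends_on, completed):
    # Red tasks may start right away; a black task waits for the task it depends on.
    ready = [i for i in red if i not in completed]
    ready += [i for i, prior in depends_on.items() if prior in completed and i not in completed]
    return sorted(ready)
```

With tasks `{s1, s2}`, `{s2, s3}`, `{s4}`, `{s4, s5}`, `{s5}`, tasks 0 and 2 are red, and tasks 3 and 4 form a chain behind task 2, which is the serialization case described above.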

Task Manager

The task manager maintains a priority queue of tasks to run and launches tasks from that queue up to the capacity limit of the cluster. Each task has a cost, and the cluster limits both the task count and the aggregate task cost. Whenever a task completes, the task manager launches one or more further tasks from the queue.
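A minimal sketch of such a task manager, using Python's `heapq`. The `TaskManager` class, its `max_tasks` and `max_cost` limits, and the strict-priority policy (it stops at the first queued task that does not fit rather than backfilling smaller ones) are assumptions for illustration.

```python
import heapq
import itertools


class TaskManager:
    """Hypothetical sketch: launch queued tasks within task-count and cost limits."""

    def __init__(self, max_tasks, max_cost):
        self.max_tasks = max_tasks
        self.max_cost = max_cost
        self.queue = []               # heap of (-priority, seq, name, cost)
        self.running = {}             # name -> cost of each running task
        self.seq = itertools.count()  # tie-breaker: FIFO among equal priorities

    def submit(self, name, priority, cost):
        heapq.heappush(self.queue, (-priority, next(self.seq), name, cost))
        return self.launch()

    def complete(self, name):
        del self.running[name]
        return self.launch()

    def launch(self):
        """Start queued tasks in priority order while both limits allow."""
        launched = []
        while self.queue and len(self.running) < self.max_tasks:
            _, _, name, cost = self.queue[0]
            if sum(self.running.values()) + cost > self.max_cost:
                break  # the head task does not fit yet; wait for a completion
            heapq.heappop(self.queue)
            self.running[name] = cost
            launched.append(name)
        return launched
```

In the test below, task "c" would fit next to "a", but it waits behind the higher-priority "b"; a backfilling policy would launch it at once, at the risk of starving large tasks.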

Controller

If we do the above, allow all tasks to complete, and then repeat the cycle, we should find that there are no new tasks to run: each compaction left its time chunks in a stable state.

If we do the above, start running tasks, and the system crashes, then repeat the cycle, we should obtain tasks only for those time chunks that were not processed previously. The set of actual tasks may differ because of changed adjacencies.

In a running system, if the aggregate capacity over a cycle period is at least the aggregate task cost, that is $\sum \textsf{capacity} \geq \sum \textsf{task cost}$, then the system will stabilize after some time. For example, over a day, the system may start idle. Then new segments arrive faster than the system can compact them: the arrival rate $a$ is greater than the dispatch rate $d$, and the queue size $q$ increases. If, at some point, the segment arrival rate drops so that $a < d$, then the queue size decreases. If the queue size drops to $0$, then the system has enough total capacity to handle the load over the cycle period (say, a day), and that cycle period moves the system from one stable state to another. In this sense, the controller maintains the system in a steady state: if the system repeated the segment analysis above, it would find no work to do except for any just-added segments.
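The queue dynamics can be illustrated with a toy discrete-time model, which is an assumption rather than part of the design: in each period, $q \leftarrow \max(0, q + a - d)$. Because $q_T \geq \sum_t a_t - T d$ when the queue starts empty, a queue that drains back to $0$ by the end of the cycle implies $\sum_t a_t \leq T d$, that is, enough total capacity over the cycle.

```python
def simulate_queue(arrivals, dispatch):
    """Toy model: queue size after each period, with q <- max(0, q + a - d)."""
    q = 0
    history = []
    for a in arrivals:
        q = max(0, q + a - dispatch)  # work arrives, then up to `dispatch` units run
        history.append(q)
    return history
```

With arrivals `[2, 8, 8, 8, 2, 2, 2, 2]` and a dispatch rate of 5, the queue peaks at 9 and drains back to 0 (34 units of work against 40 of capacity). With a constant arrival rate of 8, the queue grows every period.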

If the average arrival rate stays above the dispatch rate, $\bar{a} > d$, then the queue will grow without bound. In this case, the cluster has insufficient capacity for the load. To prevent the queue from exploding, the system invokes load shedding. Define some maximum queue capacity $m$. When $q > m$, the system sheds load by cancelling the lowest-priority tasks (i.e., those at the end of the priority queue).
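A sketch of a capped queue with load shedding. The `SheddingQueue` name and its API are hypothetical; the list is kept sorted with `bisect.insort` so that the lowest-priority tasks sit at the end, where shedding removes them.

```python
import bisect
import itertools


class SheddingQueue:
    """Hypothetical sketch: a priority queue capped at m entries."""

    def __init__(self, m):
        self.m = m
        self.items = []  # sorted by (-priority, seq): highest priority first
        self.seq = itertools.count()

    def add(self, name, priority):
        """Enqueue a task; return the names of any tasks shed to stay within m."""
        bisect.insort(self.items, (-priority, next(self.seq), name))
        shed = []
        while len(self.items) > self.m:
            shed.append(self.items.pop()[2])  # end of the queue = lowest priority
        return shed

    def next_task(self):
        # Dispatch from the front (highest priority); None when the queue is empty.
        return self.items.pop(0)[2] if self.items else None
```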

When this occurs, and if we rerun the analysis above, we'll rediscover the low-priority tasks and again add them to the queue. If the system stays overloaded, the tasks will again be discarded. The system is stable in the sense that the highest-priority tasks are done, and the lowest are never run. However, the system thrashes to reach this state: each analysis cycle rediscovers the same low-priority tasks only to discard them again.

Segment State

To avoid the thrashing case, and to avoid repeating the analysis of segments that have not changed since the last analysis, we add a state store for segment (or time chunk) state. Each interval is marked with one of:

  • Empty: there is no data
  • Stable: the analysis found the time chunk to be stable
  • Unstable: the analysis found the time chunk requires compaction
  • Deferred: the task to correct instability was canceled due to load shedding

With the above, we can immediately skip any chunk which is empty, stable or deferred without doing a full analysis. We add a task for the unstable case only if one does not yet exist. If the system load drops to where the queue is empty (the dispatch rate exceeds the arrival rate), the system can reanalyze and reschedule deferred time chunks.
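A sketch of how the stored state might drive the next analysis pass. The `ChunkState` enum mirrors the four states above; the `plan` function, the dictionary standing in for the state store, and the `queued` set of chunks that already have tasks are assumptions.

```python
from enum import Enum


class ChunkState(Enum):
    EMPTY = "empty"
    STABLE = "stable"
    UNSTABLE = "unstable"
    DEFERRED = "deferred"


def plan(chunks, states, queued, queue_empty):
    """Return (chunks to analyze, unstable chunks that need a new task)."""
    analyze, schedule = [], []
    for chunk in chunks:
        state = states.get(chunk)
        if state is None:
            analyze.append(chunk)  # no stored state: do the full analysis
        elif state is ChunkState.UNSTABLE:
            if chunk not in queued:
                schedule.append(chunk)  # add a task only if one does not yet exist
        elif state is ChunkState.DEFERRED:
            if queue_empty:
                analyze.append(chunk)  # spare capacity: revisit deferred chunks
        # EMPTY and STABLE chunks are skipped without analysis.
    return analyze, schedule
```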

The state can also store the version number and segment count within the chunk. When doing analysis, if either differs from the actual state of the cluster, then the time chunk needs re-analysis. This allows the system to detect late-arriving data, data replacement, etc.
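A sketch of the change check, assuming a hypothetical `ChunkRecord` stored per chunk with the version and segment count seen at the last analysis. A missing record, a changed version (for example, data replacement), or a changed segment count (for example, late-arriving data) all trigger re-analysis.

```python
from dataclasses import dataclass


@dataclass(frozen=True)
class ChunkRecord:
    state: str          # "empty", "stable", "unstable" or "deferred"
    version: str        # chunk version recorded at the last analysis
    segment_count: int  # number of segments seen at the last analysis


def needs_reanalysis(record, current_version, current_count):
    """True if the chunk was never analyzed or has changed since the last analysis."""
    if record is None:
        return True
    return record.version != current_version or record.segment_count != current_count
```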