Skip to content

Commit

Permalink
feat(storage): support hummock time travel (#17621)
Browse files Browse the repository at this point in the history
  • Loading branch information
zwang28 authored Jul 11, 2024
1 parent f81ca60 commit 5230e97
Show file tree
Hide file tree
Showing 54 changed files with 1,778 additions and 104 deletions.
3 changes: 3 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ message RowSeqScanNode {

// The pushed down `batch_limit`. Max rows needed to return.
optional uint64 limit = 6;
optional plan_common.AsOf as_of = 7;
}

message SysRowSeqScanNode {
Expand Down Expand Up @@ -279,6 +280,7 @@ message LocalLookupJoinNode {
// Null safe means it treats `null = null` as true.
// Each key pair can be null safe independently. (left_key, right_key, null_safe)
repeated bool null_safe = 11;
optional plan_common.AsOf as_of = 12;
}

// RFC: A new schedule way for distributed lookup join
Expand All @@ -295,6 +297,7 @@ message DistributedLookupJoinNode {
// Null safe means it treats `null = null` as true.
// Each key pair can be null safe independently. (left_key, right_key, null_safe)
repeated bool null_safe = 9;
optional plan_common.AsOf as_of = 10;
}

message UnionNode {}
Expand Down
1 change: 1 addition & 0 deletions proto/common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ message BatchQueryEpoch {
uint64 committed = 1;
uint64 current = 2;
uint64 backup = 3;
uint64 time_travel = 4;
}
}

Expand Down
11 changes: 10 additions & 1 deletion proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ message GroupDelta {
IntraLevelDelta intra_level = 1;
GroupConstruct group_construct = 2;
GroupDestroy group_destroy = 3;
GroupMetaChange group_meta_change = 4;
GroupMetaChange group_meta_change = 4 [deprecated = true];
GroupTableChange group_table_change = 5 [deprecated = true];
}
}
Expand Down Expand Up @@ -821,6 +821,14 @@ message CancelCompactTaskResponse {
bool ret = 1;
}

message GetVersionByEpochRequest {
uint64 epoch = 1;
}

message GetVersionByEpochResponse {
HummockVersion version = 1;
}

service HummockManagerService {
rpc UnpinVersionBefore(UnpinVersionBeforeRequest) returns (UnpinVersionBeforeResponse);
rpc GetCurrentVersion(GetCurrentVersionRequest) returns (GetCurrentVersionResponse);
Expand Down Expand Up @@ -861,6 +869,7 @@ service HummockManagerService {
rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse);
rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse);
rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse);
}

message CompactionConfig {
Expand Down
15 changes: 15 additions & 0 deletions proto/plan_common.proto
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,21 @@ message StorageTableDesc {
optional uint32 retention_seconds = 11;
}

message AsOf {
message ProcessTime {}
message Timestamp {
int64 timestamp = 1;
}
message Version {
int64 version = 1;
}
oneof as_of_type {
ProcessTime process_time = 1;
Timestamp timestamp = 2;
Version version = 3;
}
}

// Represents a table in external database for CDC scenario
message ExternalTableDesc {
uint32 table_id = 1;
Expand Down
7 changes: 7 additions & 0 deletions src/batch/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ pub enum BatchError {
#[backtrace]
opendal::Error,
),

#[error("Failed to execute time travel query")]
TimeTravel(
#[source]
#[backtrace]
anyhow::Error,
),
}

// Serialize/deserialize error.
Expand Down
25 changes: 21 additions & 4 deletions src/batch/src/executor/join/distributed_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ use risingwave_storage::{dispatch_state_store, StateStore};
use crate::error::Result;
use crate::executor::join::JoinType;
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, BufferChunkExecutor, Executor,
ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase,
unix_timestamp_sec_to_epoch, AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder,
BufferChunkExecutor, Executor, ExecutorBuilder, LookupExecutorBuilder, LookupJoinBase,
};
use crate::task::{BatchTaskContext, ShutdownToken};

Expand Down Expand Up @@ -93,6 +93,24 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
NodeBody::DistributedLookupJoin
)?;

// as_of takes precedence
let as_of = distributed_lookup_join_node
.as_of
.as_ref()
.map(AsOf::try_from)
.transpose()?;
let query_epoch = as_of
.map(|a| {
let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
tracing::debug!(epoch, "time travel");
risingwave_pb::common::BatchQueryEpoch {
epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
epoch,
)),
}
})
.unwrap_or_else(|| source.epoch());

let join_type = JoinType::from_prost(distributed_lookup_join_node.get_join_type()?);
let condition = match distributed_lookup_join_node.get_condition() {
Ok(cond_prost) => Some(build_from_prost(cond_prost)?),
Expand Down Expand Up @@ -179,12 +197,11 @@ impl BoxedExecutorBuilder for DistributedLookupJoinExecutorBuilder {
let vnodes = Some(TableDistribution::all_vnodes());
dispatch_state_store!(source.context().state_store(), state_store, {
let table = StorageTable::new_partial(state_store, column_ids, vnodes, table_desc);

let inner_side_builder = InnerSideExecutorBuilder::new(
outer_side_key_types,
inner_side_key_types.clone(),
lookup_prefix_len,
source.epoch(),
query_epoch,
vec![],
table,
chunk_size,
Expand Down
27 changes: 24 additions & 3 deletions src/batch/src/executor/join/local_lookup_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ use risingwave_pb::plan_common::StorageTableDesc;

use crate::error::Result;
use crate::executor::{
BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder, DummyExecutor, Executor,
ExecutorBuilder, JoinType, LookupJoinBase,
unix_timestamp_sec_to_epoch, AsOf, BoxedDataChunkStream, BoxedExecutor, BoxedExecutorBuilder,
DummyExecutor, Executor, ExecutorBuilder, JoinType, LookupJoinBase,
};
use crate::task::{BatchTaskContext, ShutdownToken, TaskId};

Expand All @@ -66,6 +66,7 @@ struct InnerSideExecutorBuilder<C> {
chunk_size: usize,
shutdown_rx: ShutdownToken,
next_stage_id: usize,
as_of: Option<AsOf>,
}

/// Used to build the executor for the inner side
Expand Down Expand Up @@ -108,6 +109,7 @@ impl<C: BatchTaskContext> InnerSideExecutorBuilder<C> {
ordered: false,
vnode_bitmap: Some(vnode_bitmap.finish().to_protobuf()),
limit: None,
as_of: self.as_of.as_ref().map(Into::into),
});

Ok(row_seq_scan_node)
Expand Down Expand Up @@ -296,6 +298,24 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
source.plan_node().get_node_body().unwrap(),
NodeBody::LocalLookupJoin
)?;
// as_of takes precedence
let as_of = lookup_join_node
.as_of
.as_ref()
.map(AsOf::try_from)
.transpose()?;
let query_epoch = as_of
.as_ref()
.map(|a| {
let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
tracing::debug!(epoch, "time travel");
risingwave_pb::common::BatchQueryEpoch {
epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
epoch,
)),
}
})
.unwrap_or_else(|| source.epoch());

let join_type = JoinType::from_prost(lookup_join_node.get_join_type()?);
let condition = match lookup_join_node.get_condition() {
Expand Down Expand Up @@ -402,12 +422,13 @@ impl BoxedExecutorBuilder for LocalLookupJoinExecutorBuilder {
lookup_prefix_len,
context: source.context().clone(),
task_id: source.task_id.clone(),
epoch: source.epoch(),
epoch: query_epoch,
worker_slot_to_scan_range_mapping: HashMap::new(),
chunk_size,
shutdown_rx: source.shutdown_rx.clone(),
next_stage_id: 0,
worker_slot_mapping,
as_of,
};

let identity = source.plan_node().get_identity().clone();
Expand Down
67 changes: 64 additions & 3 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use risingwave_common::util::value_encoding::deserialize_datum;
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::{scan_range, PbScanRange};
use risingwave_pb::common::BatchQueryEpoch;
use risingwave_pb::plan_common::StorageTableDesc;
use risingwave_pb::plan_common::as_of::AsOfType;
use risingwave_pb::plan_common::{as_of, PbAsOf, StorageTableDesc};
use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::TableDistribution;
Expand All @@ -55,6 +56,7 @@ pub struct RowSeqScanExecutor<S: StateStore> {
ordered: bool,
epoch: BatchQueryEpoch,
limit: Option<u64>,
as_of: Option<AsOf>,
}

/// Range for batch scan.
Expand All @@ -66,6 +68,36 @@ pub struct ScanRange {
pub next_col_bounds: (Bound<Datum>, Bound<Datum>),
}

#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct AsOf {
pub timestamp: i64,
}

impl TryFrom<&PbAsOf> for AsOf {
type Error = BatchError;

fn try_from(pb: &PbAsOf) -> std::result::Result<Self, Self::Error> {
match pb.as_of_type.as_ref().unwrap() {
AsOfType::Timestamp(ts) => Ok(Self {
timestamp: ts.timestamp,
}),
AsOfType::ProcessTime(_) | AsOfType::Version(_) => Err(BatchError::TimeTravel(
anyhow::anyhow!("batch query does not support as of process time or version"),
)),
}
}
}

impl From<&AsOf> for PbAsOf {
fn from(v: &AsOf) -> Self {
PbAsOf {
as_of_type: Some(AsOfType::Timestamp(as_of::Timestamp {
timestamp: v.timestamp,
})),
}
}
}

impl ScanRange {
/// Create a scan range from the prost representation.
pub fn new(
Expand Down Expand Up @@ -134,6 +166,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
identity: String,
limit: Option<u64>,
metrics: Option<BatchMetricsWithTaskLabels>,
as_of: Option<AsOf>,
) -> Self {
Self {
chunk_size,
Expand All @@ -144,6 +177,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
ordered,
epoch,
limit,
as_of,
}
}
}
Expand Down Expand Up @@ -205,6 +239,11 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {

let epoch = source.epoch.clone();
let limit = seq_scan_node.limit;
let as_of = seq_scan_node
.as_of
.as_ref()
.map(AsOf::try_from)
.transpose()?;
let chunk_size = if let Some(limit) = seq_scan_node.limit {
(limit as u32).min(source.context.get_config().developer.chunk_size as u32)
} else {
Expand All @@ -223,6 +262,7 @@ impl BoxedExecutorBuilder for RowSeqScanExecutorBuilder {
source.plan_node().get_identity().clone(),
limit,
metrics,
as_of,
)))
})
}
Expand Down Expand Up @@ -254,8 +294,21 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
ordered,
epoch,
limit,
as_of,
} = *self;
let table = Arc::new(table);
// as_of takes precedence
let query_epoch = as_of
.map(|a| {
let epoch = unix_timestamp_sec_to_epoch(a.timestamp).0;
tracing::debug!(epoch, identity, "time travel");
risingwave_pb::common::BatchQueryEpoch {
epoch: Some(risingwave_pb::common::batch_query_epoch::Epoch::TimeTravel(
epoch,
)),
}
})
.unwrap_or_else(|| epoch);

// Create collector.
let histogram = metrics.as_ref().map(|metrics| {
Expand Down Expand Up @@ -288,7 +341,8 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
for point_get in point_gets {
let table = table.clone();
if let Some(row) =
Self::execute_point_get(table, point_get, epoch.clone(), histogram.clone()).await?
Self::execute_point_get(table, point_get, query_epoch.clone(), histogram.clone())
.await?
{
if let Some(chunk) = data_chunk_builder.append_one_row(row) {
returned += chunk.cardinality() as u64;
Expand Down Expand Up @@ -319,7 +373,7 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
table.clone(),
range,
ordered,
epoch.clone(),
query_epoch.clone(),
chunk_size,
limit,
histogram.clone(),
Expand Down Expand Up @@ -442,3 +496,10 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
}
}
}

pub fn unix_timestamp_sec_to_epoch(ts: i64) -> risingwave_common::util::epoch::Epoch {
let ts = ts.checked_add(1).unwrap();
risingwave_common::util::epoch::Epoch::from_unix_millis(
u64::try_from(ts).unwrap().checked_mul(1000).unwrap(),
)
}
Loading

0 comments on commit 5230e97

Please sign in to comment.