diff --git a/src/metric-engine/src/engine.rs b/src/metric-engine/src/engine.rs index 20c7e0d050ae..38cbc6e4ac4e 100644 --- a/src/metric-engine/src/engine.rs +++ b/src/metric-engine/src/engine.rs @@ -13,6 +13,7 @@ // limitations under the License. mod alter; +mod catchup; mod close; mod create; mod drop; @@ -147,8 +148,7 @@ impl RegionEngine for MetricEngine { | RegionRequest::Flush(_) | RegionRequest::Compact(_) | RegionRequest::Truncate(_) => UnsupportedRegionRequestSnafu { request }.fail(), - // It always Ok(0), all data is the latest. - RegionRequest::Catchup(_) => Ok(0), + RegionRequest::Catchup(ref req) => self.inner.catchup_region(region_id, *req).await, }; result.map_err(BoxedError::new).map(|rows| RegionResponse { diff --git a/src/metric-engine/src/engine/catchup.rs b/src/metric-engine/src/engine/catchup.rs new file mode 100644 index 000000000000..0e6aee1e3883 --- /dev/null +++ b/src/metric-engine/src/engine/catchup.rs @@ -0,0 +1,61 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use snafu::ResultExt; +use store_api::region_engine::RegionEngine; +use store_api::region_request::{AffectedRows, RegionCatchupRequest, RegionRequest}; +use store_api::storage::RegionId; + +use crate::engine::MetricEngineInner; +use crate::error::{MitoCatchupOperationSnafu, Result, UnsupportedRegionRequestSnafu}; +use crate::utils; + +impl MetricEngineInner { + pub async fn catchup_region( + &self, + region_id: RegionId, + req: RegionCatchupRequest, + ) -> Result { + if !self.is_physical_region(region_id) { + return UnsupportedRegionRequestSnafu { + request: RegionRequest::Catchup(req), + } + .fail(); + } + let metadata_region_id = utils::to_metadata_region_id(region_id); + // TODO(weny): improve the catchup, we can read the wal entries only once. + self.mito + .handle_request( + metadata_region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: req.set_writable, + entry_id: None, + }), + ) + .await + .context(MitoCatchupOperationSnafu)?; + + self.mito + .handle_request( + region_id, + RegionRequest::Catchup(RegionCatchupRequest { + set_writable: req.set_writable, + entry_id: req.entry_id, + }), + ) + .await + .context(MitoCatchupOperationSnafu) + .map(|response| response.affected_rows) + } +} diff --git a/src/metric-engine/src/error.rs b/src/metric-engine/src/error.rs index f52bc6e3cc72..340f4f19bcfa 100644 --- a/src/metric-engine/src/error.rs +++ b/src/metric-engine/src/error.rs @@ -121,6 +121,13 @@ pub enum Error { location: Location, }, + #[snafu(display("Mito catchup operation fails"))] + MitoCatchupOperation { + source: BoxedError, + #[snafu(implicit)] + location: Location, + }, + #[snafu(display("Failed to collect record batch stream"))] CollectRecordBatchStream { source: common_recordbatch::error::Error, @@ -267,7 +274,8 @@ impl ErrorExt for Error { | OpenMitoRegion { source, .. } | CloseMitoRegion { source, .. } | MitoReadOperation { source, .. } - | MitoWriteOperation { source, .. } => source.status_code(), + | MitoWriteOperation { source, .. } + | MitoCatchupOperation { source, .. } => source.status_code(), CollectRecordBatchStream { source, .. } => source.status_code(), diff --git a/src/store-api/src/region_request.rs b/src/store-api/src/region_request.rs index 70271f017b35..1452fcbe6125 100644 --- a/src/store-api/src/region_request.rs +++ b/src/store-api/src/region_request.rs @@ -666,7 +666,7 @@ pub struct RegionTruncateRequest {} /// /// Makes a readonly region to catch up to leader region changes. /// There is no effect if it operating on a leader region. -#[derive(Debug)] +#[derive(Debug, Clone, Copy)] pub struct RegionCatchupRequest { /// Sets it to writable if it's available after it has caught up with all changes. pub set_writable: bool,