diff --git a/.github/dependabot.yml b/.github/dependabot.yml index 39dd579aa3732..2105e6de2cb64 100644 --- a/.github/dependabot.yml +++ b/.github/dependabot.yml @@ -26,6 +26,7 @@ updates: - "prost*" opentelemetry: patterns: + - "opentelemetry" - "opentelemetry*" - "tracing-opentelemetry" mysql: diff --git a/Cargo.lock b/Cargo.lock index 98467d29c0211..ec284f3ba4778 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -6778,26 +6778,32 @@ dependencies = [ [[package]] name = "opentelemetry" -version = "0.20.0" +version = "0.21.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9591d937bc0e6d2feb6f71a559540ab300ea49955229c347a517a28d27784c54" +checksum = "1e32339a5dc40459130b3bd269e9892439f55b33e772d2a9d402a789baaf4e8a" dependencies = [ - "opentelemetry_api", - "opentelemetry_sdk", + "futures-core", + "futures-sink", + "indexmap 2.0.0", + "js-sys", + "once_cell", + "pin-project-lite", + "thiserror", + "urlencoding", ] [[package]] name = "opentelemetry-otlp" -version = "0.13.0" +version = "0.14.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7e5e5a5c4135864099f3faafbe939eb4d7f9b80ebf68a8448da961b32a7c1275" +checksum = "f24cda83b20ed2433c68241f918d0f6fdec8b1d43b7a9590ab4420c5095ca930" dependencies = [ "async-trait", "futures-core", "http 0.2.9", + "opentelemetry", "opentelemetry-proto", "opentelemetry-semantic-conventions", - "opentelemetry_api", "opentelemetry_sdk", "prost 0.11.9", "thiserror", @@ -6807,11 +6813,11 @@ dependencies = [ [[package]] name = "opentelemetry-proto" -version = "0.3.0" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b1e3f814aa9f8c905d0ee4bde026afd3b2577a97c10e1699912e3e44f0c4cbeb" +checksum = "a2e155ce5cc812ea3d1dffbd1539aed653de4bf4882d60e6e04dcf0901d674e1" dependencies = [ - "opentelemetry_api", + "opentelemetry", "opentelemetry_sdk", "prost 0.11.9", "tonic 0.9.2", @@ -6819,47 +6825,30 @@ dependencies = [ [[package]] name = "opentelemetry-semantic-conventions" -version = "0.12.0" +version = "0.13.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "73c9f9340ad135068800e7f1b24e9e09ed9e7143f5bf8518ded3d3ec69789269" +checksum = "f5774f1ef1f982ef2a447f6ee04ec383981a3ab99c8e77a1a7b30182e65bbc84" dependencies = [ "opentelemetry", ] -[[package]] -name = "opentelemetry_api" -version = "0.20.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8a81f725323db1b1206ca3da8bb19874bbd3f57c3bcd59471bfb04525b265b9b" -dependencies = [ - "futures-channel", - "futures-util", - "indexmap 1.9.3", - "js-sys", - "once_cell", - "pin-project-lite", - "thiserror", - "urlencoding", -] - [[package]] name = "opentelemetry_sdk" -version = "0.20.0" +version = "0.21.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa8e705a0612d48139799fcbaba0d4a90f06277153e43dd2bdc16c6f0edd8026" +checksum = "968ba3f2ca03e90e5187f5e4f46c791ef7f2c163ae87789c8ce5f5ca3b7b7de5" dependencies = [ "async-trait", "crossbeam-channel", "futures-channel", "futures-executor", "futures-util", + "glob", "once_cell", - "opentelemetry_api", - "ordered-float 3.9.1", + "opentelemetry", + "ordered-float 4.1.1", "percent-encoding", "rand", - "regex", - "serde_json", "thiserror", "tokio", "tokio-stream", @@ -6883,6 +6872,15 @@ dependencies = [ "num-traits", ] +[[package]] +name = "ordered-float" +version = "4.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"536900a8093134cf9ccf00a27deb3532421099e958d9dd431135d0c7543ca1e8" +dependencies = [ + "num-traits", +] + [[package]] name = "ordered-multimap" version = "0.7.1" @@ -8767,6 +8765,7 @@ dependencies = [ "num-traits", "number_prefix", "opentelemetry", + "opentelemetry_sdk", "parking_lot 0.12.1", "parse-display", "paste", @@ -9758,6 +9757,7 @@ dependencies = [ "opentelemetry", "opentelemetry-otlp", "opentelemetry-semantic-conventions", + "opentelemetry_sdk", "parking_lot 0.12.1", "pprof", "risingwave_common", @@ -12400,20 +12400,33 @@ dependencies = [ "tracing-core", ] +[[package]] +name = "tracing-log" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3" +dependencies = [ + "log", + "once_cell", + "tracing-core", +] + [[package]] name = "tracing-opentelemetry" -version = "0.21.0" +version = "0.22.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "75327c6b667828ddc28f5e3f169036cb793c3f588d83bf0f262a7f062ffed3c8" +checksum = "c67ac25c5407e7b961fafc6f7e9aa5958fd297aada2d20fa2ae1737357e55596" dependencies = [ + "js-sys", "once_cell", "opentelemetry", "opentelemetry_sdk", "smallvec", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.2.0", "tracing-subscriber", + "web-time", ] [[package]] @@ -12445,7 +12458,7 @@ dependencies = [ "time", "tracing", "tracing-core", - "tracing-log", + "tracing-log 0.1.3", "tracing-serde", ] @@ -13296,6 +13309,16 @@ dependencies = [ "wasm-bindgen", ] +[[package]] +name = "web-time" +version = "0.2.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57099a701fb3a8043f993e8228dc24229c7b942e2b009a1b962e54489ba1d3bf" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + [[package]] name = "webpki-roots" version = "0.25.2" @@ -13795,8 +13818,6 @@ dependencies = [ "num-traits", "openssl", "openssl-sys", - "opentelemetry_api", - "opentelemetry_sdk", "ordered-float 3.9.1", "parking_lot 0.12.1", "parking_lot_core 0.9.8", diff --git a/Cargo.toml b/Cargo.toml index dbc4cb88b89c3..681c867b6dcd2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -153,7 +153,12 @@ tikv-jemallocator = { git = "https://github.com/risingwavelabs/jemallocator.git" "profiling", "stats", ], rev = "64a2d9" } +opentelemetry = "0.21" +opentelemetry-otlp = "0.14" +opentelemetry_sdk = { version = "0.21", default-features = false } +opentelemetry-semantic-conventions = "0.13" tokio-util = "0.7" +tracing-opentelemetry = "0.22" risingwave_backup = { path = "./src/storage/backup" } risingwave_batch = { path = "./src/batch" } diff --git a/proto/stream_service.proto b/proto/stream_service.proto index 462f5ff0256a6..e8c5d94a20ac3 100644 --- a/proto/stream_service.proto +++ b/proto/stream_service.proto @@ -46,6 +46,7 @@ message DropActorsResponse { message ForceStopActorsRequest { string request_id = 1; + uint64 prev_epoch = 2; } message ForceStopActorsResponse { diff --git a/src/bench/Cargo.toml b/src/bench/Cargo.toml index 63e8d89844c54..81d59587193cb 100644 --- a/src/bench/Cargo.toml +++ b/src/bench/Cargo.toml @@ -23,7 +23,7 @@ futures-async-stream = { workspace = true } hdrhistogram = "7" itertools = "0.12" libc = "0.2" -opentelemetry = { version = "0.20", default-features = false, features = ["rt-tokio"], optional = true } +opentelemetry = { workspace = true, optional = true } parking_lot = "0.12" prometheus = { version = "0.13", features = ["process"] } rand = "0.8" diff --git a/src/common/Cargo.toml b/src/common/Cargo.toml index 
664b8a53f2769..668f4173424dd 100644 --- a/src/common/Cargo.toml +++ b/src/common/Cargo.toml @@ -61,7 +61,8 @@ memcomparable = { version = "0.2", features = ["decimal"] } num-integer = "0.1" num-traits = "0.2" number_prefix = "0.4.0" -opentelemetry = { version = "0.20", default-features = false } +opentelemetry = { workspace = true } +opentelemetry_sdk = { workspace = true } parking_lot = "0.12" parse-display = "0.9" paste = "1" @@ -110,7 +111,7 @@ toml = "0.8" tonic = { workspace = true } tracing = "0.1" tracing-futures = { version = "0.2", features = ["futures-03"] } -tracing-opentelemetry = "0.21" +tracing-opentelemetry = { workspace = true } tracing-subscriber = "0.3.17" twox-hash = "1" url = "2" diff --git a/src/common/src/util/tracing.rs b/src/common/src/util/tracing.rs index e7da6e8e7d580..19523dde56324 100644 --- a/src/common/src/util/tracing.rs +++ b/src/common/src/util/tracing.rs @@ -19,7 +19,7 @@ use std::pin::Pin; use std::task::{Context, Poll}; use opentelemetry::propagation::TextMapPropagator; -use opentelemetry::sdk::propagation::TraceContextPropagator; +use opentelemetry_sdk::propagation::TraceContextPropagator; use tracing_opentelemetry::OpenTelemetrySpanExt; /// Context for tracing used for propagating tracing information in a distributed system. diff --git a/src/compute/src/rpc/service/stream_service.rs b/src/compute/src/rpc/service/stream_service.rs index def9a534586bb..d496e20d51eb5 100644 --- a/src/compute/src/rpc/service/stream_service.rs +++ b/src/compute/src/rpc/service/stream_service.rs @@ -116,7 +116,7 @@ impl StreamService for StreamServiceImpl { request: Request, ) -> std::result::Result, Status> { let req = request.into_inner(); - self.mgr.reset().await; + self.mgr.reset(req.prev_epoch).await; Ok(Response::new(ForceStopActorsResponse { request_id: req.request_id, status: None, diff --git a/src/expr/core/src/expr/expr_udf.rs b/src/expr/core/src/expr/expr_udf.rs index b162691896a43..246944ae5d9d3 100644 --- a/src/expr/core/src/expr/expr_udf.rs +++ b/src/expr/core/src/expr/expr_udf.rs @@ -44,6 +44,7 @@ pub struct UserDefinedFunction { children: Vec, arg_types: Vec, return_type: DataType, + #[expect(dead_code)] arg_schema: Arc, imp: UdfImpl, identifier: String, @@ -72,13 +73,19 @@ impl Expression for UserDefinedFunction { } async fn eval(&self, input: &DataChunk) -> Result { - let vis = input.visibility(); + if input.cardinality() == 0 { + // early return for empty input + let mut builder = self.return_type.create_array_builder(input.capacity()); + builder.append_n_null(input.capacity()); + return Ok(builder.finish().into_ref()); + } let mut columns = Vec::with_capacity(self.children.len()); for child in &self.children { let array = child.eval(input).await?; columns.push(array); } - self.eval_inner(columns, vis).await + let chunk = DataChunk::new(columns, input.visibility().clone()); + self.eval_inner(&chunk).await } async fn eval_row(&self, input: &OwnedRow) -> Result { @@ -89,51 +96,29 @@ impl Expression for UserDefinedFunction { } let arg_row = OwnedRow::new(columns); let chunk = DataChunk::from_rows(std::slice::from_ref(&arg_row), &self.arg_types); - let arg_columns = chunk.columns().to_vec(); - let output_array = self.eval_inner(arg_columns, chunk.visibility()).await?; + let output_array = self.eval_inner(&chunk).await?; Ok(output_array.to_datum()) } } impl UserDefinedFunction { - async fn eval_inner( - &self, - columns: Vec, - vis: &risingwave_common::buffer::Bitmap, - ) -> Result { - let chunk = DataChunk::new(columns, vis.clone()); - let compacted_chunk = 
chunk.compact_cow(); - let compacted_columns: Vec = compacted_chunk - .columns() - .iter() - .map(|c| { - c.as_ref() - .try_into() - .expect("failed covert ArrayRef to arrow_array::ArrayRef") - }) - .collect(); - let opts = arrow_array::RecordBatchOptions::default() - .with_row_count(Some(compacted_chunk.capacity())); - let input = arrow_array::RecordBatch::try_new_with_options( - self.arg_schema.clone(), - compacted_columns, - &opts, - ) - .expect("failed to build record batch"); + async fn eval_inner(&self, input: &DataChunk) -> Result { + // this will drop invisible rows + let arrow_input = arrow_array::RecordBatch::try_from(input)?; - let output: arrow_array::RecordBatch = match &self.imp { - UdfImpl::Wasm(runtime) => runtime.call(&self.identifier, &input)?, - UdfImpl::JavaScript(runtime) => runtime.call(&self.identifier, &input)?, + let arrow_output: arrow_array::RecordBatch = match &self.imp { + UdfImpl::Wasm(runtime) => runtime.call(&self.identifier, &arrow_input)?, + UdfImpl::JavaScript(runtime) => runtime.call(&self.identifier, &arrow_input)?, UdfImpl::External(client) => { let disable_retry_count = self.disable_retry_count.load(Ordering::Relaxed); let result = if disable_retry_count != 0 { client - .call(&self.identifier, input) + .call(&self.identifier, arrow_input) .instrument_await(self.span.clone()) .await } else { client - .call_with_retry(&self.identifier, input) + .call_with_retry(&self.identifier, arrow_input) .instrument_await(self.span.clone()) .await }; @@ -155,16 +140,16 @@ impl UserDefinedFunction { result? } }; - if output.num_rows() != vis.count_ones() { + if arrow_output.num_rows() != input.cardinality() { bail!( "UDF returned {} rows, but expected {}", - output.num_rows(), - vis.len(), + arrow_output.num_rows(), + input.cardinality(), ); } - let data_chunk = DataChunk::try_from(&output)?; - let output = data_chunk.uncompact(vis.clone()); + let output = DataChunk::try_from(&arrow_output)?; + let output = output.uncompact(input.visibility().clone()); let Some(array) = output.columns().first() else { bail!("UDF returned no columns"); diff --git a/src/expr/udf/src/external.rs b/src/expr/udf/src/external.rs index f8d4cf6cc379e..046d681485c38 100644 --- a/src/expr/udf/src/external.rs +++ b/src/expr/udf/src/external.rs @@ -139,21 +139,17 @@ impl ArrowFlightUdfClient { } async fn call_internal(&self, id: &str, input: RecordBatch) -> Result { - let mut output_stream = self.call_stream(id, stream::once(async { input })).await?; - // TODO: support no output - let head = output_stream - .next() - .await - .ok_or_else(Error::no_returned)??; - let remaining = output_stream.try_collect::>().await?; - if remaining.is_empty() { - Ok(head) - } else { - Ok(arrow_select::concat::concat_batches( - &head.schema(), - std::iter::once(&head).chain(remaining.iter()), - )?) + let mut output_stream = self + .call_stream_internal(id, stream::once(async { input })) + .await?; + let mut batches = vec![]; + while let Some(batch) = output_stream.next().await { + batches.push(batch?); } + Ok(arrow_select::concat::concat_batches( + output_stream.schema().ok_or_else(Error::no_returned)?, + batches.iter(), + )?) } /// Call a function, retry up to 5 times / 3s if connection is broken. @@ -179,6 +175,17 @@ impl ArrowFlightUdfClient { id: &str, inputs: impl Stream + Send + 'static, ) -> Result> + Send + 'static> { + Ok(self + .call_stream_internal(id, inputs) + .await? 
+ .map_err(|e| e.into())) + } + + async fn call_stream_internal( + &self, + id: &str, + inputs: impl Stream + Send + 'static, + ) -> Result { let descriptor = FlightDescriptor::new_path(vec![id.into()]); let flight_data_stream = FlightDataEncoderBuilder::new() @@ -194,11 +201,10 @@ impl ArrowFlightUdfClient { // decode response let stream = response.into_inner(); - let record_batch_stream = FlightRecordBatchStream::new_from_flight_data( + Ok(FlightRecordBatchStream::new_from_flight_data( // convert tonic::Status to FlightError stream.map_err(|e| e.into()), - ); - Ok(record_batch_stream.map_err(|e| e.into())) + )) } } diff --git a/src/meta/model_v2/migration/src/m20230908_072257_init.rs b/src/meta/model_v2/migration/src/m20230908_072257_init.rs index 0c3d9d36c4853..444433d84c31e 100644 --- a/src/meta/model_v2/migration/src/m20230908_072257_init.rs +++ b/src/meta/model_v2/migration/src/m20230908_072257_init.rs @@ -571,6 +571,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Table::Description).string()) .col(ColumnDef::new(Table::Version).json()) .col(ColumnDef::new(Table::RetentionSeconds).integer()) + .col(ColumnDef::new(Table::IncomingSinks).json().not_null()) .foreign_key( &mut ForeignKey::create() .name("FK_table_object_id") @@ -628,6 +629,7 @@ impl MigrationTrait for Migration { .col(ColumnDef::new(Sink::DbName).string().not_null()) .col(ColumnDef::new(Sink::SinkFromName).string().not_null()) .col(ColumnDef::new(Sink::SinkFormatDesc).json()) + .col(ColumnDef::new(Sink::TargetTable).integer()) .foreign_key( &mut ForeignKey::create() .name("FK_sink_object_id") @@ -643,6 +645,13 @@ impl MigrationTrait for Migration { .to(Connection::Table, Connection::ConnectionId) .to_owned(), ) + .foreign_key( + &mut ForeignKey::create() + .name("FK_sink_target_table_id") + .from(Sink::Table, Sink::TargetTable) + .to(Table::Table, Table::TableId) + .to_owned(), + ) .to_owned(), ) .await?; @@ -1034,6 +1043,7 @@ enum Table { Description, Version, RetentionSeconds, + IncomingSinks, } #[derive(DeriveIden)] @@ -1069,6 +1079,7 @@ enum Sink { DbName, SinkFromName, SinkFormatDesc, + TargetTable, } #[derive(DeriveIden)] diff --git a/src/meta/model_v2/src/sink.rs b/src/meta/model_v2/src/sink.rs index 0d93b43803057..f5fa59c85ff5f 100644 --- a/src/meta/model_v2/src/sink.rs +++ b/src/meta/model_v2/src/sink.rs @@ -18,6 +18,7 @@ use sea_orm::ActiveValue::Set; use crate::{ ColumnCatalogArray, ColumnOrderArray, ConnectionId, I32Array, Property, SinkFormatDesc, SinkId, + TableId, }; #[derive(Clone, Debug, PartialEq, Eq, EnumIter, DeriveActiveEnum)] @@ -69,6 +70,7 @@ pub struct Model { pub db_name: String, pub sink_from_name: String, pub sink_format_desc: Option, + pub target_table: Option, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -123,6 +125,7 @@ impl From for ActiveModel { db_name: Set(pb_sink.db_name), sink_from_name: Set(pb_sink.sink_from_name), sink_format_desc: Set(pb_sink.format_desc.map(|x| x.into())), + target_table: Set(pb_sink.target_table.map(|x| x as _)), } } } diff --git a/src/meta/model_v2/src/table.rs b/src/meta/model_v2/src/table.rs index 963b683f4ef57..06710d42d9d25 100644 --- a/src/meta/model_v2/src/table.rs +++ b/src/meta/model_v2/src/table.rs @@ -123,6 +123,7 @@ pub struct Model { pub description: Option, pub version: Option, pub retention_seconds: Option, + pub incoming_sinks: I32Array, } #[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)] @@ -232,6 +233,7 @@ impl From for ActiveModel { description: Set(pb_table.description), version: Set(pb_table.version.map(|v| 
v.into())), retention_seconds: Set(pb_table.retention_seconds.map(|i| i as _)), + incoming_sinks: Set(pb_table.incoming_sinks.into()), } } } diff --git a/src/meta/node/src/server.rs b/src/meta/node/src/server.rs index 59e520a3316fb..4792293863237 100644 --- a/src/meta/node/src/server.rs +++ b/src/meta/node/src/server.rs @@ -186,6 +186,7 @@ pub async fn rpc_serve( let mut options = sea_orm::ConnectOptions::new(endpoint); options .max_connections(max_connection) + .sqlx_logging(false) .connect_timeout(Duration::from_secs(10)) .idle_timeout(Duration::from_secs(30)); let conn = sea_orm::Database::connect(options).await?; diff --git a/src/meta/src/barrier/command.rs b/src/meta/src/barrier/command.rs index 6b1b73d6ca697..07765fe840c38 100644 --- a/src/meta/src/barrier/command.rs +++ b/src/meta/src/barrier/command.rs @@ -928,6 +928,27 @@ impl CommandContext { init_split_assignment, ) .await?; + + if let Some(ReplaceTablePlan { + new_table_fragments, + dispatchers, + init_split_assignment, + old_table_fragments, + .. + }) = replace_table + { + // Tell compute nodes to drop actors. + self.clean_up(old_table_fragments.actor_ids()).await?; + + mgr.catalog_controller + .post_collect_table_fragments( + new_table_fragments.table_id().table_id as _, + new_table_fragments.actor_ids(), + dispatchers.clone(), + init_split_assignment, + ) + .await?; + } } } diff --git a/src/meta/src/barrier/mod.rs b/src/meta/src/barrier/mod.rs index 7be8a5e218103..b393db2d7dd41 100644 --- a/src/meta/src/barrier/mod.rs +++ b/src/meta/src/barrier/mod.rs @@ -522,9 +522,7 @@ impl GlobalBarrierManager { let paused = self.take_pause_on_bootstrap().await.unwrap_or(false); let paused_reason = paused.then_some(PausedReason::Manual); - self.recovery(prev_epoch, paused_reason) - .instrument(span) - .await; + self.recovery(paused_reason).instrument(span).await; } self.context.set_status(BarrierManagerStatus::Running); @@ -721,7 +719,7 @@ impl GlobalBarrierManager { // No need to clean dirty tables for barrier recovery, // The foreground stream job should cleanup their own tables. - self.recovery(prev_epoch, None).instrument(span).await; + self.recovery(None).instrument(span).await; self.context.set_status(BarrierManagerStatus::Running); } else { panic!("failed to execute barrier: {}", err.as_report()); diff --git a/src/meta/src/barrier/recovery.rs b/src/meta/src/barrier/recovery.rs index 3f7f3911e7302..1ced92ce61c3a 100644 --- a/src/meta/src/barrier/recovery.rs +++ b/src/meta/src/barrier/recovery.rs @@ -329,7 +329,14 @@ impl GlobalBarrierManager { /// the cluster or `risectl` command. Used for debugging purpose. /// /// Returns the new state of the barrier manager after recovery. - pub async fn recovery(&mut self, prev_epoch: TracedEpoch, paused_reason: Option) { + pub async fn recovery(&mut self, paused_reason: Option) { + let prev_epoch = TracedEpoch::new( + self.context + .hummock_manager + .latest_snapshot() + .committed_epoch + .into(), + ); // Mark blocked and abort buffered schedules, they might be dirty already. self.scheduled_barriers .abort_and_mark_blocked("cluster is under recovering"); @@ -399,9 +406,11 @@ impl GlobalBarrierManager { }; // Reset all compute nodes, stop and drop existing actors. - self.reset_compute_nodes(&info).await.inspect_err(|err| { - warn!(error = %err.as_report(), "reset compute nodes failed"); - })?; + self.reset_compute_nodes(&info, prev_epoch.value().0) + .await + .inspect_err(|err| { + warn!(error = %err.as_report(), "reset compute nodes failed"); + })?; if self.pre_apply_drop_cancel().await? 
{ info = self @@ -447,21 +456,6 @@ impl GlobalBarrierManager { tracing::Span::current(), // recovery span )); - #[cfg(not(all(test, feature = "failpoints")))] - { - use risingwave_common::util::epoch::INVALID_EPOCH; - - let mce = self - .context - .hummock_manager - .get_current_max_committed_epoch() - .await; - - if mce != INVALID_EPOCH { - command_ctx.wait_epoch_commit(mce).await?; - } - }; - let res = match self .context .inject_barrier(command_ctx.clone(), None, None) @@ -1055,14 +1049,18 @@ impl GlobalBarrierManager { } /// Reset all compute nodes by calling `force_stop_actors`. - async fn reset_compute_nodes(&self, info: &InflightActorInfo) -> MetaResult<()> { - debug!(worker = ?info.node_map.keys().collect_vec(), "force stop actors"); + async fn reset_compute_nodes( + &self, + info: &InflightActorInfo, + prev_epoch: u64, + ) -> MetaResult<()> { + debug!(prev_epoch, worker = ?info.node_map.keys().collect_vec(), "force stop actors"); self.context .stream_rpc_manager - .force_stop_actors(info.node_map.values()) + .force_stop_actors(info.node_map.values(), prev_epoch) .await?; - debug!("all compute nodes have been reset."); + debug!(prev_epoch, "all compute nodes have been reset."); Ok(()) } diff --git a/src/meta/src/barrier/rpc.rs b/src/meta/src/barrier/rpc.rs index 7662c331f3272..b258cbec59524 100644 --- a/src/meta/src/barrier/rpc.rs +++ b/src/meta/src/barrier/rpc.rs @@ -362,11 +362,13 @@ impl StreamRpcManager { pub async fn force_stop_actors( &self, nodes: impl Iterator, + prev_epoch: u64, ) -> MetaResult<()> { self.broadcast(nodes, |client| async move { client .force_stop_actors(ForceStopActorsRequest { request_id: Self::new_request_id(), + prev_epoch, }) .await }) diff --git a/src/meta/src/controller/mod.rs b/src/meta/src/controller/mod.rs index 4873a42809b0b..7fd70318a1ff3 100644 --- a/src/meta/src/controller/mod.rs +++ b/src/meta/src/controller/mod.rs @@ -134,8 +134,7 @@ impl From> for PbTable { .optional_associated_source_id .map(|id| PbOptionalAssociatedSourceId::AssociatedSourceId(id as _)), description: value.0.description, - // TODO: fix it for model v2. - incoming_sinks: vec![], + incoming_sinks: value.0.incoming_sinks.into_u32_array(), initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, retention_seconds: value.0.retention_seconds.map(|id| id as u32), @@ -204,8 +203,7 @@ impl From> for PbSink { sink_from_name: value.0.sink_from_name, stream_job_status: PbStreamJobStatus::Created as _, // todo: deprecate it. format_desc: value.0.sink_format_desc.map(|desc| desc.0), - // todo: fix this for model v2 - target_table: None, + target_table: value.0.target_table.map(|id| id as _), initialized_at_cluster_version: value.1.initialized_at_cluster_version, created_at_cluster_version: value.1.created_at_cluster_version, } diff --git a/src/meta/src/controller/streaming_job.rs b/src/meta/src/controller/streaming_job.rs index 4fc84dda21d55..9bb8af6172469 100644 --- a/src/meta/src/controller/streaming_job.rs +++ b/src/meta/src/controller/streaming_job.rs @@ -552,8 +552,8 @@ impl CatalogController { streaming_job: StreamingJob, merge_updates: Vec, table_col_index_mapping: Option, - _creating_sink_id: Option, - _dropping_sink_id: Option, + creating_sink_id: Option, + dropping_sink_id: Option, ) -> MetaResult { // Question: The source catalog should be remain unchanged? let StreamingJob::Table(_, table, ..) 
= streaming_job else { @@ -564,7 +564,22 @@ impl CatalogController { let txn = inner.db.begin().await?; let job_id = table.id as ObjectId; - let table = table::ActiveModel::from(table).update(&txn).await?; + let mut table = table::ActiveModel::from(table); + let mut incoming_sinks = table.incoming_sinks.as_ref().inner_ref().clone(); + if let Some(sink_id) = creating_sink_id { + debug_assert!(!incoming_sinks.contains(&(sink_id as i32))); + incoming_sinks.push(sink_id as _); + } + + if let Some(sink_id) = dropping_sink_id { + let drained = incoming_sinks + .extract_if(|id| *id == sink_id as i32) + .collect_vec(); + debug_assert_eq!(drained, vec![sink_id as i32]); + } + + table.incoming_sinks = Set(incoming_sinks.into()); + let table = table.update(&txn).await?; // let old_fragment_mappings = get_fragment_mappings(&txn, job_id).await?; // 1. replace old fragments/actors with new ones. diff --git a/src/meta/src/rpc/ddl_controller.rs b/src/meta/src/rpc/ddl_controller.rs index 25b6ed25464a3..7cff9dc4a9b7a 100644 --- a/src/meta/src/rpc/ddl_controller.rs +++ b/src/meta/src/rpc/ddl_controller.rs @@ -389,8 +389,13 @@ impl DdlController { .await } MetadataManager::V2(_) => { - self.drop_object(ObjectType::Database, database_id as _, DropMode::Cascade) - .await + self.drop_object( + ObjectType::Database, + database_id as _, + DropMode::Cascade, + None, + ) + .await } } } @@ -409,7 +414,7 @@ impl DdlController { match &self.metadata_manager { MetadataManager::V1(mgr) => mgr.catalog_manager.drop_schema(schema_id).await, MetadataManager::V2(_) => { - self.drop_object(ObjectType::Schema, schema_id as _, DropMode::Restrict) + self.drop_object(ObjectType::Schema, schema_id as _, DropMode::Restrict, None) .await } } @@ -453,7 +458,7 @@ impl DdlController { ) -> MetaResult { let MetadataManager::V1(mgr) = &self.metadata_manager else { return self - .drop_object(ObjectType::Source, source_id as _, drop_mode) + .drop_object(ObjectType::Source, source_id as _, drop_mode, None) .await; }; // 1. Drop source in catalog. @@ -523,7 +528,7 @@ impl DdlController { ) -> MetaResult { let MetadataManager::V1(mgr) = &self.metadata_manager else { return self - .drop_object(ObjectType::View, view_id as _, drop_mode) + .drop_object(ObjectType::View, view_id as _, drop_mode, None) .await; }; let (version, streaming_job_ids) = mgr @@ -569,6 +574,7 @@ impl DdlController { ObjectType::Connection, connection_id as _, DropMode::Restrict, + None, ) .await } @@ -616,7 +622,7 @@ impl DdlController { ) -> MetaResult { let MetadataManager::V1(mgr) = &self.metadata_manager else { return self - .create_streaming_job_v2(stream_job, fragment_graph) + .create_streaming_job_v2(stream_job, fragment_graph, affected_table_replace_info) .await; }; let id = self.gen_unique_id::<{ IdCategory::Table }>().await?; @@ -843,9 +849,10 @@ impl DdlController { // Here we modify the union node of the downstream table by the TableFragments of the to-be-created sink upstream. // The merge in the union has already been set up in the frontend and will be filled with specific upstream actors in this function. // Meanwhile, the Dispatcher corresponding to the upstream of the merge will also be added to the replace table context here. 
- async fn inject_replace_table_job_for_table_sink( + pub(crate) async fn inject_replace_table_job_for_table_sink( &self, - mgr: &MetadataManagerV1, + dummy_id: u32, + mgr: &MetadataManager, stream_ctx: StreamContext, sink: Option<&Sink>, creating_sink_table_fragments: Option<&TableFragments>, @@ -853,12 +860,6 @@ impl DdlController { streaming_job: &StreamingJob, fragment_graph: StreamFragmentGraph, ) -> MetaResult<(ReplaceTableContext, TableFragments)> { - let dummy_id = self - .env - .id_gen_manager() - .generate::<{ IdCategory::Table }>() - .await? as u32; - let (mut replace_table_ctx, mut table_fragments) = self .build_replace_table(stream_ctx, streaming_job, fragment_graph, None, dummy_id) .await?; @@ -901,17 +902,14 @@ impl DdlController { } let [table_catalog]: [_; 1] = mgr - .catalog_manager - .get_tables(&[table.id]) - .await + .get_table_catalog_by_ids(vec![table.id]) + .await? .try_into() .expect("Target table should exist in sink into table"); assert_eq!(table_catalog.incoming_sinks, table.incoming_sinks); { - let guard = mgr.fragment_manager.get_fragment_read_guard().await; - for sink_id in &table_catalog.incoming_sinks { if let Some(dropping_sink_id) = dropping_sink_id && *sink_id == dropping_sink_id @@ -919,10 +917,9 @@ impl DdlController { continue; }; - let sink_table_fragments = guard - .table_fragments() - .get(&risingwave_common::catalog::TableId::new(*sink_id)) - .unwrap(); + let sink_table_fragments = mgr + .get_job_fragments_by_id(&risingwave_common::catalog::TableId::new(*sink_id)) + .await?; let sink_fragment = sink_table_fragments.sink_fragment().unwrap(); @@ -1133,16 +1130,18 @@ impl DdlController { .await } MetadataManager::V2(_) => { - if target_replace_info.is_some() { - unimplemented!("support replace table for drop in v2"); - } let (object_id, object_type) = match job_id { StreamingJobId::MaterializedView(id) => (id as _, ObjectType::Table), StreamingJobId::Sink(id) => (id as _, ObjectType::Sink), StreamingJobId::Table(_, id) => (id as _, ObjectType::Table), StreamingJobId::Index(idx) => (idx as _, ObjectType::Index), }; - self.drop_object(object_type, object_id, drop_mode).await + + let version = self + .drop_object(object_type, object_id, drop_mode, target_replace_info) + .await?; + + Ok(version) } } } @@ -1218,9 +1217,17 @@ impl DdlController { let result: MetaResult<()> = try { tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink"); + + let dummy_id = self + .env + .id_gen_manager() + .generate::<{ IdCategory::Table }>() + .await? as u32; + let (context, table_fragments) = self .inject_replace_table_job_for_table_sink( - mgr, + dummy_id, + &self.metadata_manager, stream_ctx, None, None, @@ -1387,13 +1394,31 @@ impl DdlController { let StreamingJob::Sink(s, _) = stream_job else { bail!("additional replace table event only occurs when sinking into table"); }; - let MetadataManager::V1(mgr) = &self.metadata_manager else { - unimplemented!("support create sink into table in v2"); + + let dummy_id = match &self.metadata_manager { + MetadataManager::V1(_) => { + self.env + .id_gen_manager() + .generate::<{ IdCategory::Table }>() + .await? as u32 + } + MetadataManager::V2(mgr) => { + let table = streaming_job.table().unwrap(); + mgr.catalog_controller + .create_job_catalog_for_replace( + &streaming_job, + &stream_ctx, + table.get_version()?, + &fragment_graph.default_parallelism(), + ) + .await? 
as u32 + } }; let (context, table_fragments) = self .inject_replace_table_job_for_table_sink( - mgr, + dummy_id, + &self.metadata_manager, stream_ctx, Some(s), Some(&table_fragments), diff --git a/src/meta/src/rpc/ddl_controller_v2.rs b/src/meta/src/rpc/ddl_controller_v2.rs index 748857d2c3da7..126d040997a6b 100644 --- a/src/meta/src/rpc/ddl_controller_v2.rs +++ b/src/meta/src/rpc/ddl_controller_v2.rs @@ -29,7 +29,9 @@ use crate::manager::{ MetadataManagerV2, NotificationVersion, StreamingJob, IGNORED_NOTIFICATION_VERSION, }; use crate::model::{MetadataModel, StreamContext}; -use crate::rpc::ddl_controller::{fill_table_stream_graph_info, DdlController, DropMode}; +use crate::rpc::ddl_controller::{ + fill_table_stream_graph_info, DdlController, DropMode, ReplaceTableInfo, +}; use crate::stream::{validate_sink, StreamFragmentGraph}; use crate::MetaResult; @@ -38,6 +40,7 @@ impl DdlController { &self, mut streaming_job: StreamingJob, mut fragment_graph: StreamFragmentGraphProto, + affected_table_replace_info: Option, ) -> MetaResult { let mgr = self.metadata_manager.as_v2_ref(); @@ -80,7 +83,13 @@ impl DdlController { // create streaming job. match self - .create_streaming_job_inner_v2(mgr, ctx, &mut streaming_job, fragment_graph) + .create_streaming_job_inner_v2( + mgr, + ctx, + &mut streaming_job, + fragment_graph, + affected_table_replace_info, + ) .await { Ok(version) => Ok(version), @@ -110,6 +119,7 @@ impl DdlController { ctx: StreamContext, streaming_job: &mut StreamingJob, fragment_graph: StreamFragmentGraphProto, + affected_table_replace_info: Option, ) -> MetaResult { let mut fragment_graph = StreamFragmentGraph::new(&self.env, fragment_graph, streaming_job).await?; @@ -124,10 +134,34 @@ impl DdlController { .await?; fragment_graph.refill_internal_table_ids(table_id_map); + let affected_table_replace_info = match affected_table_replace_info { + Some(replace_table_info) => { + let ReplaceTableInfo { + mut streaming_job, + fragment_graph, + .. + } = replace_table_info; + + let fragment_graph = + StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job).await?; + streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + let streaming_job = streaming_job; + + Some((streaming_job, fragment_graph)) + } + None => None, + }; + // create fragment and actor catalogs. tracing::debug!(id = streaming_job.id(), "building streaming job"); let (ctx, table_fragments) = self - .build_stream_job(ctx, streaming_job, fragment_graph, None) + .build_stream_job( + ctx, + streaming_job, + fragment_graph, + affected_table_replace_info, + ) .await?; match streaming_job { @@ -139,9 +173,12 @@ impl DdlController { self.source_manager.register_source(source).await?; } StreamingJob::Sink(sink, target_table) => { - if target_table.is_some() { - unimplemented!("support create sink into table in v2"); + if let Some((StreamingJob::Table(source, table, _), ..)) = + &ctx.replace_table_job_info + { + *target_table = Some((table.clone(), source.clone())); } + // Validate the sink on the connector node. 
validate_sink(sink).await?; } @@ -160,13 +197,39 @@ impl DdlController { let stream_job_id = streaming_job.id(); match streaming_job.create_type() { CreateType::Unspecified | CreateType::Foreground => { + let replace_table_job_info = ctx.replace_table_job_info.as_ref().map( + |(streaming_job, ctx, table_fragments)| { + ( + streaming_job.clone(), + ctx.merge_updates.clone(), + table_fragments.table_id(), + ) + }, + ); + self.stream_manager .create_streaming_job(table_fragments, ctx) .await?; - let version = mgr + + let mut version = mgr .catalog_controller .finish_streaming_job(stream_job_id as _) .await?; + + if let Some((streaming_job, merge_updates, table_id)) = replace_table_job_info { + version = mgr + .catalog_controller + .finish_replace_streaming_job( + table_id.table_id as _, + streaming_job, + merge_updates, + None, + Some(stream_job_id), + None, + ) + .await?; + } + Ok(version) } CreateType::Background => { @@ -199,9 +262,10 @@ impl DdlController { object_type: ObjectType, object_id: ObjectId, drop_mode: DropMode, + target_replace_info: Option, ) -> MetaResult { let mgr = self.metadata_manager.as_v2_ref(); - let (release_ctx, version) = match object_type { + let (release_ctx, mut version) = match object_type { ObjectType::Database => mgr.catalog_controller.drop_database(object_id).await?, ObjectType::Schema => { return mgr @@ -224,6 +288,97 @@ impl DdlController { } }; + if let Some(replace_table_info) = target_replace_info { + let stream_ctx = + StreamContext::from_protobuf(replace_table_info.fragment_graph.get_ctx().unwrap()); + + let ReplaceTableInfo { + mut streaming_job, + fragment_graph, + .. + } = replace_table_info; + + let sink_id = if let ObjectType::Sink = object_type { + object_id as _ + } else { + panic!("additional replace table event only occurs when dropping sink into table") + }; + + let fragment_graph = + StreamFragmentGraph::new(&self.env, fragment_graph, &streaming_job).await?; + streaming_job.set_table_fragment_id(fragment_graph.table_fragment_id()); + streaming_job.set_dml_fragment_id(fragment_graph.dml_fragment_id()); + let streaming_job = streaming_job; + + let table = streaming_job.table().unwrap(); + + tracing::debug!(id = streaming_job.id(), "replacing table for dropped sink"); + let dummy_id = mgr + .catalog_controller + .create_job_catalog_for_replace( + &streaming_job, + &stream_ctx, + table.get_version()?, + &fragment_graph.default_parallelism(), + ) + .await? 
as u32; + + let (ctx, table_fragments) = self + .inject_replace_table_job_for_table_sink( + dummy_id, + &self.metadata_manager, + stream_ctx, + None, + None, + Some(sink_id), + &streaming_job, + fragment_graph, + ) + .await?; + + let result: MetaResult> = try { + let merge_updates = ctx.merge_updates.clone(); + + mgr.catalog_controller + .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .await?; + + self.stream_manager + .replace_table(table_fragments, ctx) + .await?; + + merge_updates + }; + + version = match result { + Ok(merge_updates) => { + let version = mgr + .catalog_controller + .finish_replace_streaming_job( + dummy_id as _, + streaming_job, + merge_updates, + None, + None, + Some(sink_id), + ) + .await?; + Ok(version) + } + Err(err) => { + tracing::error!(id = object_id, error = ?err.as_report(), "failed to replace table"); + let _ = mgr + .catalog_controller + .try_abort_replacing_streaming_job(dummy_id as _) + .await + .inspect_err(|err| { + tracing::error!(id = object_id, error = ?err.as_report(), "failed to abort replacing table"); + }); + Err(err) + } + }?; + } + let ReleaseContext { state_table_ids, source_ids, diff --git a/src/meta/src/stream/stream_manager.rs b/src/meta/src/stream/stream_manager.rs index e0f4f8cb82b53..a949780e06ed1 100644 --- a/src/meta/src/stream/stream_manager.rs +++ b/src/meta/src/stream/stream_manager.rs @@ -31,7 +31,7 @@ use super::{Locations, RescheduleOptions, ScaleControllerRef, TableResizePolicy} use crate::barrier::{BarrierScheduler, Command, ReplaceTablePlan, StreamRpcManager}; use crate::hummock::HummockManagerRef; use crate::manager::{DdlType, MetaSrvEnv, MetadataManager, StreamingJob}; -use crate::model::{ActorId, TableFragments, TableParallelism}; +use crate::model::{ActorId, MetadataModel, TableFragments, TableParallelism}; use crate::stream::SourceManagerRef; use crate::{MetaError, MetaResult}; @@ -430,10 +430,7 @@ impl GlobalStreamManager { self.build_actors(&table_fragments, &building_locations, &existing_locations) .await?; - if let Some((_, context, table_fragments)) = replace_table_job_info { - let MetadataManager::V1(mgr) = &self.metadata_manager else { - unimplemented!("support create sink into table in v2"); - }; + if let Some((streaming_job, context, table_fragments)) = replace_table_job_info { self.build_actors( &table_fragments, &context.building_locations, @@ -441,10 +438,19 @@ impl GlobalStreamManager { ) .await?; - // Add table fragments to meta store with state: `State::Initial`. - mgr.fragment_manager - .start_create_table_fragments(table_fragments.clone()) - .await?; + match &self.metadata_manager { + MetadataManager::V1(mgr) => { + // Add table fragments to meta store with state: `State::Initial`. + mgr.fragment_manager + .start_create_table_fragments(table_fragments.clone()) + .await? + } + MetadataManager::V2(mgr) => { + mgr.catalog_controller + .prepare_streaming_job(table_fragments.to_protobuf(), &streaming_job, true) + .await? 
+ } + } let dummy_table_id = table_fragments.table_id(); diff --git a/src/storage/hummock_test/src/bin/replay/replay_impl.rs b/src/storage/hummock_test/src/bin/replay/replay_impl.rs index ef9b671da9b49..c3dedd6dbae46 100644 --- a/src/storage/hummock_test/src/bin/replay/replay_impl.rs +++ b/src/storage/hummock_test/src/bin/replay/replay_impl.rs @@ -186,12 +186,8 @@ impl ReplayStateStore for GlobalReplayImpl { Ok(()) } - async fn clear_shared_buffer(&self) -> Result<()> { - self.store - .clear_shared_buffer() - .await - .map_err(|_| TraceError::ClearSharedBufferFailed)?; - Ok(()) + async fn clear_shared_buffer(&self, prev_epoch: u64) { + self.store.clear_shared_buffer(prev_epoch).await } } pub(crate) struct LocalReplayImpl(LocalHummockStorage); diff --git a/src/storage/hummock_test/src/state_store_tests.rs b/src/storage/hummock_test/src/state_store_tests.rs index e52983dc787a5..d7f7531b8b65b 100644 --- a/src/storage/hummock_test/src/state_store_tests.rs +++ b/src/storage/hummock_test/src/state_store_tests.rs @@ -1330,7 +1330,7 @@ async fn test_gc_watermark_and_clear_shared_buffer() { drop(local_hummock_storage); - hummock_storage.clear_shared_buffer().await.unwrap(); + hummock_storage.clear_shared_buffer(epoch1).await; assert_eq!( hummock_storage diff --git a/src/storage/hummock_test/src/test_utils.rs b/src/storage/hummock_test/src/test_utils.rs index 554f7bd8b8be1..f5d1a10a18839 100644 --- a/src/storage/hummock_test/src/test_utils.rs +++ b/src/storage/hummock_test/src/test_utils.rs @@ -34,7 +34,7 @@ use risingwave_storage::filter_key_extractor::{ RpcFilterKeyExtractorManager, }; use risingwave_storage::hummock::backup_reader::BackupReader; -use risingwave_storage::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; +use risingwave_storage::hummock::event_handler::HummockVersionUpdate; use risingwave_storage::hummock::iterator::test_utils::mock_sstable_store; use risingwave_storage::hummock::local_version::pinned_version::PinnedVersion; use risingwave_storage::hummock::observer_manager::HummockObserverNode; @@ -53,8 +53,8 @@ pub async fn prepare_first_valid_version( worker_node: WorkerNode, ) -> ( PinnedVersion, - UnboundedSender, - UnboundedReceiver, + UnboundedSender, + UnboundedReceiver, ) { let (tx, mut rx) = unbounded_channel(); let notification_client = @@ -73,7 +73,7 @@ pub async fn prepare_first_valid_version( .await; observer_manager.start().await; let hummock_version = match rx.recv().await { - Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version, + Some(HummockVersionUpdate::PinnedVersion(version)) => version, _ => unreachable!("should be full version"), }; diff --git a/src/storage/hummock_trace/src/collector.rs b/src/storage/hummock_trace/src/collector.rs index 264d2445ca76d..b1a269a4620ee 100644 --- a/src/storage/hummock_trace/src/collector.rs +++ b/src/storage/hummock_trace/src/collector.rs @@ -214,8 +214,11 @@ impl TraceSpan { Self::new_global_op(Operation::SealCurrentEpoch { epoch, opts }, storage_type) } - pub fn new_clear_shared_buffer_span() -> MayTraceSpan { - Self::new_global_op(Operation::ClearSharedBuffer, StorageType::Global) + pub fn new_clear_shared_buffer_span(prev_epoch: u64) -> MayTraceSpan { + Self::new_global_op( + Operation::ClearSharedBuffer(prev_epoch), + StorageType::Global, + ) } pub fn new_validate_read_epoch_span(epoch: HummockReadEpoch) -> MayTraceSpan { diff --git a/src/storage/hummock_trace/src/record.rs b/src/storage/hummock_trace/src/record.rs index 7bd0a86d0e222..f8e4ceed65449 100644 --- 
a/src/storage/hummock_trace/src/record.rs +++ b/src/storage/hummock_trace/src/record.rs @@ -170,7 +170,7 @@ pub enum Operation { TryWaitEpoch(TracedHummockReadEpoch), /// clear shared buffer - ClearSharedBuffer, + ClearSharedBuffer(u64), /// Seal current epoch SealCurrentEpoch { @@ -299,7 +299,6 @@ pub enum OperationResult { Sync(TraceResult), NotifyHummock(TraceResult<()>), TryWaitEpoch(TraceResult<()>), - ClearSharedBuffer(TraceResult<()>), ValidateReadEpoch(TraceResult<()>), LocalStorageEpoch(TraceResult), LocalStorageIsDirty(TraceResult), diff --git a/src/storage/hummock_trace/src/replay/mod.rs b/src/storage/hummock_trace/src/replay/mod.rs index 99aa1c0c37144..046ab67b18607 100644 --- a/src/storage/hummock_trace/src/replay/mod.rs +++ b/src/storage/hummock_trace/src/replay/mod.rs @@ -120,7 +120,7 @@ pub trait ReplayStateStore { async fn notify_hummock(&self, info: Info, op: RespOperation, version: u64) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; - async fn clear_shared_buffer(&self) -> Result<()>; + async fn clear_shared_buffer(&self, prev_epoch: u64); fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; } @@ -152,7 +152,7 @@ mock! { ) -> Result; async fn new_local(&self, opts: TracedNewLocalOptions) -> Box; async fn try_wait_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; - async fn clear_shared_buffer(&self) -> Result<()>; + async fn clear_shared_buffer(&self, prev_epoch: u64); fn validate_read_epoch(&self, epoch: HummockReadEpoch) -> Result<()>; } impl GlobalReplay for GlobalReplayInterface{} diff --git a/src/storage/hummock_trace/src/replay/worker.rs b/src/storage/hummock_trace/src/replay/worker.rs index 7bc821a26c5b9..4d37708420d48 100644 --- a/src/storage/hummock_trace/src/replay/worker.rs +++ b/src/storage/hummock_trace/src/replay/worker.rs @@ -327,22 +327,9 @@ impl ReplayWorker { ); } } - Operation::ClearSharedBuffer => { + Operation::ClearSharedBuffer(prev_epoch) => { assert_eq!(storage_type, StorageType::Global); - let res = res_rx.recv().await.expect("recv result failed"); - if let OperationResult::ClearSharedBuffer(expected) = res { - let actual = replay.clear_shared_buffer().await; - assert_eq!( - TraceResult::from(actual), - expected, - "clear_shared_buffer wrong" - ); - } else { - panic!( - "wrong clear_shared_buffer result, expect epoch result, but got {:?}", - res - ); - } + replay.clear_shared_buffer(prev_epoch).await; } Operation::SealCurrentEpoch { epoch, opts } => { assert_ne!(storage_type, StorageType::Global); diff --git a/src/storage/src/hummock/event_handler/hummock_event_handler.rs b/src/storage/src/hummock/event_handler/hummock_event_handler.rs index 615bb293a4116..f1209775e154a 100644 --- a/src/storage/src/hummock/event_handler/hummock_event_handler.rs +++ b/src/storage/src/hummock/event_handler/hummock_event_handler.rs @@ -22,10 +22,12 @@ use await_tree::InstrumentAwait; use itertools::Itertools; use parking_lot::RwLock; use prometheus::core::{AtomicU64, GenericGauge}; +use risingwave_hummock_sdk::compaction_group::hummock_version_ext::SstDeltaInfo; use risingwave_hummock_sdk::{HummockEpoch, LocalSstableInfo}; use thiserror_ext::AsReport; use tokio::spawn; -use tokio::sync::{mpsc, oneshot}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; +use tokio::sync::oneshot; use tracing::{debug, error, info, trace, warn}; use super::refiller::{CacheRefillConfig, CacheRefiller}; @@ -33,7 +35,7 @@ use 
super::{LocalInstanceGuard, LocalInstanceId, ReadVersionMappingType}; use crate::filter_key_extractor::FilterKeyExtractorManager; use crate::hummock::compactor::{compact, CompactorContext}; use crate::hummock::conflict_detector::ConflictDetector; -use crate::hummock::event_handler::refiller::CacheRefillerEvent; +use crate::hummock::event_handler::refiller::{CacheRefillerEvent, SpawnRefillTask}; use crate::hummock::event_handler::uploader::{ default_spawn_merging_task, HummockUploader, SpawnMergingTask, SpawnUploadTask, SyncedData, UploadTaskInfo, UploadTaskPayload, UploaderEvent, @@ -117,8 +119,9 @@ impl BufferTracker { } pub struct HummockEventHandler { - hummock_event_tx: mpsc::UnboundedSender, - hummock_event_rx: mpsc::UnboundedReceiver, + hummock_event_tx: UnboundedSender, + hummock_event_rx: UnboundedReceiver, + version_update_rx: UnboundedReceiver, pending_sync_requests: BTreeMap>>, read_version_mapping: Arc>, /// A copy of `read_version_mapping` but owned by event handler @@ -164,8 +167,7 @@ async fn flush_imms( impl HummockEventHandler { pub fn new( - hummock_event_tx: mpsc::UnboundedSender, - hummock_event_rx: mpsc::UnboundedReceiver, + version_update_rx: UnboundedReceiver, pinned_version: PinnedVersion, compactor_context: CompactorContext, filter_key_extractor_manager: FilterKeyExtractorManager, @@ -175,8 +177,7 @@ impl HummockEventHandler { let upload_compactor_context = compactor_context.clone(); let cloned_sstable_object_id_manager = sstable_object_id_manager.clone(); Self::new_inner( - hummock_event_tx, - hummock_event_rx, + version_update_rx, pinned_version, Some(sstable_object_id_manager), compactor_context.sstable_store.clone(), @@ -192,12 +193,12 @@ impl HummockEventHandler { )) }), default_spawn_merging_task(compactor_context.compaction_executor.clone()), + CacheRefiller::default_spawn_refill_task(), ) } fn new_inner( - hummock_event_tx: mpsc::UnboundedSender, - hummock_event_rx: mpsc::UnboundedReceiver, + version_update_rx: UnboundedReceiver, pinned_version: PinnedVersion, sstable_object_id_manager: Option>, sstable_store: SstableStoreRef, @@ -205,7 +206,9 @@ impl HummockEventHandler { storage_opts: &StorageOpts, spawn_upload_task: SpawnUploadTask, spawn_merging_task: SpawnMergingTask, + spawn_refill_task: SpawnRefillTask, ) -> Self { + let (hummock_event_tx, hummock_event_rx) = unbounded_channel(); let (version_update_notifier_tx, _) = tokio::sync::watch::channel(pinned_version.max_committed_epoch()); let version_update_notifier_tx = Arc::new(version_update_notifier_tx); @@ -227,11 +230,13 @@ impl HummockEventHandler { let refiller = CacheRefiller::new( CacheRefillConfig::from_storage_opts(storage_opts), sstable_store, + spawn_refill_task, ); Self { hummock_event_tx, hummock_event_rx, + version_update_rx, pending_sync_requests: Default::default(), version_update_notifier_tx, pinned_version: Arc::new(ArcSwap::from_pointee(pinned_version)), @@ -257,6 +262,10 @@ impl HummockEventHandler { ReadOnlyRwLockRef::new(self.read_version_mapping.clone()) } + pub fn event_sender(&self) -> UnboundedSender { + self.hummock_event_tx.clone() + } + pub fn buffer_tracker(&self) -> &BufferTracker { self.uploader.buffer_tracker() } @@ -395,16 +404,74 @@ impl HummockEventHandler { } } - fn handle_clear(&mut self, notifier: oneshot::Sender<()>) { + async fn handle_clear(&mut self, notifier: oneshot::Sender<()>, prev_epoch: u64) { info!( - "handle clear event. 
max_committed_epoch: {}, max_synced_epoch: {}, max_sealed_epoch: {}", - self.uploader.max_committed_epoch(), - self.uploader.max_synced_epoch(), - self.uploader.max_sealed_epoch(), + prev_epoch, + max_committed_epoch = self.uploader.max_committed_epoch(), + max_synced_epoch = self.uploader.max_synced_epoch(), + max_sealed_epoch = self.uploader.max_sealed_epoch(), + "handle clear event" ); self.uploader.clear(); + let current_version = self.uploader.hummock_version(); + + if current_version.max_committed_epoch() < prev_epoch { + let mut latest_version = if let Some(CacheRefillerEvent { + pinned_version, + new_pinned_version, + }) = self.refiller.clear() + { + assert_eq!( + current_version.id(), + pinned_version.id(), + "refiller earliest version {:?} not equal to current version {:?}", + pinned_version.version(), + current_version.version() + ); + + info!( + prev_epoch, + current_mce = current_version.max_committed_epoch(), + refiller_mce = new_pinned_version.max_committed_epoch(), + "refiller is clear in recovery" + ); + + Some(new_pinned_version) + } else { + None + }; + + while let latest_version_ref = latest_version.as_ref().unwrap_or(current_version) + && latest_version_ref.max_committed_epoch() < prev_epoch + { + let version_update = self + .version_update_rx + .recv() + .await + .expect("should not be empty"); + latest_version = Some(Self::resolve_version_update_info( + latest_version_ref.clone(), + version_update, + None, + )); + } + + self.apply_version_update( + current_version.clone(), + latest_version.expect("must have some version update to raise the mce"), + ); + } + + assert!(self.uploader.max_committed_epoch() >= prev_epoch); + if self.uploader.max_committed_epoch() > prev_epoch { + warn!( + mce = self.uploader.max_committed_epoch(), + prev_epoch, "mce higher than clear prev_epoch" + ); + } + for (epoch, result_sender) in self.pending_sync_requests.extract_if(|_, _| true) { send_sync_result( result_sender, @@ -433,6 +500,8 @@ impl HummockEventHandler { let _ = notifier.send(()).inspect_err(|e| { error!("failed to notify completion of clear event: {:?}", e); }); + + info!(prev_epoch, "clear finished"); } fn handle_version_update(&mut self, version_payload: HummockVersionUpdate) { @@ -440,17 +509,34 @@ impl HummockEventHandler { .refiller .last_new_pinned_version() .cloned() - .map(Arc::new) - .unwrap_or_else(|| self.pinned_version.load().clone()); + .unwrap_or_else(|| self.uploader.hummock_version().clone()); let mut sst_delta_infos = vec![]; + let new_pinned_version = Self::resolve_version_update_info( + pinned_version.clone(), + version_payload, + Some(&mut sst_delta_infos), + ); + + self.refiller + .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version); + } + + fn resolve_version_update_info( + pinned_version: PinnedVersion, + version_payload: HummockVersionUpdate, + mut sst_delta_infos: Option<&mut Vec>, + ) -> PinnedVersion { let newly_pinned_version = match version_payload { HummockVersionUpdate::VersionDeltas(version_deltas) => { let mut version_to_apply = pinned_version.version().clone(); for version_delta in &version_deltas { assert_eq!(version_to_apply.id, version_delta.prev_id); if version_to_apply.max_committed_epoch == version_delta.max_committed_epoch { - sst_delta_infos = version_to_apply.build_sst_delta_infos(version_delta); + if let Some(sst_delta_infos) = &mut sst_delta_infos { + **sst_delta_infos = + version_to_apply.build_sst_delta_infos(version_delta); + } } version_to_apply.apply_version_delta(version_delta); } @@ -462,15 +548,12 @@ 
impl HummockEventHandler { validate_table_key_range(&newly_pinned_version); - let new_pinned_version = pinned_version.new_pin_version(newly_pinned_version); - - self.refiller - .start_cache_refill(sst_delta_infos, pinned_version, new_pinned_version); + pinned_version.new_pin_version(newly_pinned_version) } fn apply_version_update( &mut self, - pinned_version: Arc, + pinned_version: PinnedVersion, new_pinned_version: PinnedVersion, ) { self.pinned_version @@ -529,10 +612,26 @@ impl HummockEventHandler { } event = pin!(self.hummock_event_rx.recv()) => { let Some(event) = event else { break }; - if self.handle_hummock_event(event) { - break; + match event { + HummockEvent::Clear(notifier, prev_epoch) => { + self.handle_clear(notifier, prev_epoch).await + }, + HummockEvent::Shutdown => { + info!("event handler shutdown"); + return; + }, + event => { + self.handle_hummock_event(event); + } } } + version_update = pin!(self.version_update_rx.recv()) => { + let Some(version_update) = version_update else { + warn!("version update stream ends. event handler shutdown"); + return; + }; + self.handle_version_update(version_update); + } } } } @@ -570,7 +669,7 @@ impl HummockEventHandler { } /// Gracefully shutdown if returns `true`. - fn handle_hummock_event(&mut self, event: HummockEvent) -> bool { + fn handle_hummock_event(&mut self, event: HummockEvent) { match event { HummockEvent::BufferMayFlush => { self.uploader.may_flush(); @@ -581,18 +680,12 @@ } => { self.handle_await_sync_epoch(new_sync_epoch, sync_result_sender); } - HummockEvent::Clear(notifier) => { - self.handle_clear(notifier); + HummockEvent::Clear(_, _) => { + unreachable!("clear is handled in separate async context") } HummockEvent::Shutdown => { - info!("buffer tracker shutdown"); - return true; - } - - HummockEvent::VersionUpdate(version_payload) => { - self.handle_version_update(version_payload); + unreachable!("shutdown is handled specially") } - HummockEvent::ImmToUploader(imm) => { assert!( self.local_read_version_mapping @@ -730,7 +823,6 @@ } } } - false } fn generate_instance_id(&mut self) -> LocalInstanceId { @@ -775,7 +867,7 @@ fn to_sync_result(result: &HummockResult) -> HummockResult), + Clear(oneshot::Sender<()>, u64), Shutdown, - VersionUpdate(HummockVersionUpdate), - ImmToUploader(ImmutableMemtable), SealEpoch { @@ -108,14 +106,10 @@ impl HummockEvent { sync_result_sender: _, } => format!("AwaitSyncEpoch epoch {} ", new_sync_epoch), - HummockEvent::Clear(_) => "Clear".to_string(), + HummockEvent::Clear(_, prev_epoch) => format!("Clear {:?}", prev_epoch), HummockEvent::Shutdown => "Shutdown".to_string(), - HummockEvent::VersionUpdate(version_update_payload) => { - format!("VersionUpdate {:?}", version_update_payload) - } - HummockEvent::ImmToUploader(imm) => { format!("ImmToUploader {:?}", imm) } diff --git a/src/storage/src/hummock/event_handler/refiller.rs b/src/storage/src/hummock/event_handler/refiller.rs index 3be242d6b94ec..1c592fac85a2d 100644 --- a/src/storage/src/hummock/event_handler/refiller.rs +++ b/src/storage/src/hummock/event_handler/refiller.rs @@ -13,10 +13,10 @@ // limitations under the License.
use std::collections::{HashMap, HashSet, VecDeque}; -use std::ops::{Deref, DerefMut, Range}; -use std::pin::Pin; +use std::future::poll_fn; +use std::ops::{Deref, Range}; use std::sync::{Arc, LazyLock}; -use std::task::{ready, Context, Poll}; +use std::task::{ready, Poll}; use std::time::{Duration, Instant}; use foyer::common::code::Key; @@ -223,16 +223,30 @@ struct Item { event: CacheRefillerEvent, } +pub(crate) type SpawnRefillTask = Arc< + // first current version, second new version + dyn Fn(Vec, CacheRefillContext, PinnedVersion, PinnedVersion) -> JoinHandle<()> + + Send + + Sync + + 'static, +>; + /// A cache refiller for hummock data. -pub struct CacheRefiller { +pub(crate) struct CacheRefiller { /// order: old => new queue: VecDeque, context: CacheRefillContext, + + spawn_refill_task: SpawnRefillTask, } impl CacheRefiller { - pub fn new(config: CacheRefillConfig, sstable_store: SstableStoreRef) -> Self { + pub(crate) fn new( + config: CacheRefillConfig, + sstable_store: SstableStoreRef, + spawn_refill_task: SpawnRefillTask, + ) -> Self { let config = Arc::new(config); let concurrency = Arc::new(Semaphore::new(config.concurrency)); Self { @@ -242,71 +256,87 @@ impl CacheRefiller { concurrency, sstable_store, }, + spawn_refill_task, } } - pub fn start_cache_refill( + pub(crate) fn default_spawn_refill_task() -> SpawnRefillTask { + Arc::new(|deltas, context, _, _| { + let task = CacheRefillTask { deltas, context }; + tokio::spawn(task.run()) + }) + } + + pub(crate) fn start_cache_refill( &mut self, deltas: Vec, - pinned_version: Arc, + pinned_version: PinnedVersion, new_pinned_version: PinnedVersion, ) { - let task = CacheRefillTask { + let handle = (self.spawn_refill_task)( deltas, - context: self.context.clone(), - }; + self.context.clone(), + pinned_version.clone(), + new_pinned_version.clone(), + ); let event = CacheRefillerEvent { pinned_version, new_pinned_version, }; - let handle = tokio::spawn(task.run()); let item = Item { handle, event }; self.queue.push_back(item); GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.add(1); } - pub fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { + pub(crate) fn last_new_pinned_version(&self) -> Option<&PinnedVersion> { self.queue.back().map(|item| &item.event.new_pinned_version) } - pub fn next_event(&mut self) -> NextCacheRefillerEvent<'_> { - NextCacheRefillerEvent { refiller: self } + /// Clear the queue for cache refill and return an event that merges all pending cache refill events + /// into a single event that takes the earliest and latest version. 
+ pub(crate) fn clear(&mut self) -> Option { + let Some(last_item) = self.queue.pop_back() else { + return None; + }; + let mut event = last_item.event; + while let Some(item) = self.queue.pop_back() { + assert_eq!( + event.pinned_version.id(), + item.event.new_pinned_version.id() + ); + event.pinned_version = item.event.pinned_version; + } + Some(event) } } -pub struct NextCacheRefillerEvent<'a> { - refiller: &'a mut CacheRefiller, -} - -impl<'a> Future for NextCacheRefillerEvent<'a> { - type Output = CacheRefillerEvent; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let refiller = &mut self.deref_mut().refiller; - - if let Some(item) = refiller.queue.front_mut() { - ready!(item.handle.poll_unpin(cx)).unwrap(); - let item = refiller.queue.pop_front().unwrap(); - GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1); - return Poll::Ready(item.event); - } - Poll::Pending +impl CacheRefiller { + pub(crate) fn next_event(&mut self) -> impl Future + '_ { + poll_fn(|cx| { + if let Some(item) = self.queue.front_mut() { + ready!(item.handle.poll_unpin(cx)).unwrap(); + let item = self.queue.pop_front().unwrap(); + GLOBAL_CACHE_REFILL_METRICS.refill_queue_total.sub(1); + return Poll::Ready(item.event); + } + Poll::Pending + }) } } pub struct CacheRefillerEvent { - pub pinned_version: Arc, + pub pinned_version: PinnedVersion, pub new_pinned_version: PinnedVersion, } #[derive(Clone)] -struct CacheRefillContext { +pub(crate) struct CacheRefillContext { config: Arc, concurrency: Arc, sstable_store: SstableStoreRef, } -pub struct CacheRefillTask { +struct CacheRefillTask { deltas: Vec, context: CacheRefillContext, } diff --git a/src/storage/src/hummock/event_handler/uploader.rs b/src/storage/src/hummock/event_handler/uploader.rs index b8ebc858054a1..7ffa3956c2349 100644 --- a/src/storage/src/hummock/event_handler/uploader.rs +++ b/src/storage/src/hummock/event_handler/uploader.rs @@ -15,9 +15,8 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, VecDeque}; use std::fmt::{Debug, Display, Formatter}; -use std::future::Future; +use std::future::{poll_fn, Future}; use std::mem::swap; -use std::ops::DerefMut; use std::pin::Pin; use std::sync::Arc; use std::task::{ready, Context, Poll}; @@ -780,6 +779,10 @@ impl HummockUploader { self.context.pinned_version.max_committed_epoch() } + pub(crate) fn hummock_version(&self) -> &PinnedVersion { + &self.context.pinned_version + } + pub(crate) fn get_synced_data(&self, epoch: HummockEpoch) -> Option<&SyncedDataState> { assert!(self.max_committed_epoch() < epoch && epoch <= self.max_synced_epoch); self.synced_data.get(&epoch) @@ -1037,10 +1040,6 @@ impl HummockUploader { // TODO: call `abort` on the uploading task join handle } - - pub(crate) fn next_event(&mut self) -> NextUploaderEvent<'_> { - NextUploaderEvent { uploader: self } - } } impl HummockUploader { @@ -1132,10 +1131,6 @@ impl HummockUploader { } } -pub(crate) struct NextUploaderEvent<'a> { - uploader: &'a mut HummockUploader, -} - pub(crate) enum UploaderEvent { // staging sstable info of newer data comes first SyncFinish(HummockEpoch, Vec), @@ -1143,30 +1138,28 @@ pub(crate) enum UploaderEvent { ImmMerged(MergeImmTaskOutput), } -impl<'a> Future for NextUploaderEvent<'a> { - type Output = UploaderEvent; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let uploader = &mut self.deref_mut().uploader; - - if let Some((epoch, newly_uploaded_sstables)) = ready!(uploader.poll_syncing_task(cx)) { - return 
Poll::Ready(UploaderEvent::SyncFinish(epoch, newly_uploaded_sstables)); - } +impl HummockUploader { + pub(crate) fn next_event(&mut self) -> impl Future + '_ { + poll_fn(|cx| { + if let Some((epoch, newly_uploaded_sstables)) = ready!(self.poll_syncing_task(cx)) { + return Poll::Ready(UploaderEvent::SyncFinish(epoch, newly_uploaded_sstables)); + } - if let Some(sstable_info) = ready!(uploader.poll_sealed_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); - } + if let Some(sstable_info) = ready!(self.poll_sealed_spill_task(cx)) { + return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + } - if let Some(sstable_info) = ready!(uploader.poll_unsealed_spill_task(cx)) { - return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); - } + if let Some(sstable_info) = ready!(self.poll_unsealed_spill_task(cx)) { + return Poll::Ready(UploaderEvent::DataSpilled(sstable_info)); + } - if let Some(merge_output) = ready!(uploader.poll_sealed_merge_imm_task(cx)) { - // add the merged imm into sealed data - uploader.update_sealed_data(&merge_output.merged_imm); - return Poll::Ready(UploaderEvent::ImmMerged(merge_output)); - } - Poll::Pending + if let Some(merge_output) = ready!(self.poll_sealed_merge_imm_task(cx)) { + // add the merged imm into sealed data + self.update_sealed_data(&merge_output.merged_imm); + return Poll::Ready(UploaderEvent::ImmMerged(merge_output)); + } + Poll::Pending + }) } } diff --git a/src/storage/src/hummock/observer_manager.rs b/src/storage/src/hummock/observer_manager.rs index e96d575ce599b..4e10d9a523950 100644 --- a/src/storage/src/hummock/observer_manager.rs +++ b/src/storage/src/hummock/observer_manager.rs @@ -26,14 +26,14 @@ use tokio::sync::mpsc::UnboundedSender; use crate::filter_key_extractor::{FilterKeyExtractorImpl, FilterKeyExtractorManagerRef}; use crate::hummock::backup_reader::BackupReaderRef; -use crate::hummock::event_handler::{HummockEvent, HummockVersionUpdate}; +use crate::hummock::event_handler::HummockVersionUpdate; use crate::hummock::write_limiter::WriteLimiterRef; pub struct HummockObserverNode { filter_key_extractor_manager: FilterKeyExtractorManagerRef, backup_reader: BackupReaderRef, write_limiter: WriteLimiterRef, - version_update_sender: UnboundedSender, + version_update_sender: UnboundedSender, version: u64, } @@ -71,14 +71,12 @@ impl ObserverState for HummockObserverNode { Info::HummockVersionDeltas(hummock_version_deltas) => { let _ = self .version_update_sender - .send(HummockEvent::VersionUpdate( - HummockVersionUpdate::VersionDeltas( - hummock_version_deltas - .version_deltas - .iter() - .map(HummockVersionDelta::from_rpc_protobuf) - .collect(), - ), + .send(HummockVersionUpdate::VersionDeltas( + hummock_version_deltas + .version_deltas + .iter() + .map(HummockVersionDelta::from_rpc_protobuf) + .collect(), )) .inspect_err(|e| { tracing::error!(event = ?e.0, "unable to send version delta"); @@ -123,12 +121,12 @@ impl ObserverState for HummockObserverNode { ); let _ = self .version_update_sender - .send(HummockEvent::VersionUpdate( - HummockVersionUpdate::PinnedVersion(HummockVersion::from_rpc_protobuf( + .send(HummockVersionUpdate::PinnedVersion( + HummockVersion::from_rpc_protobuf( &snapshot .hummock_version .expect("should get hummock version"), - )), + ), )) .inspect_err(|e| { tracing::error!(event = ?e.0, "unable to send full version"); @@ -142,7 +140,7 @@ impl HummockObserverNode { pub fn new( filter_key_extractor_manager: FilterKeyExtractorManagerRef, backup_reader: BackupReaderRef, - 
version_update_sender: UnboundedSender, + version_update_sender: UnboundedSender, write_limiter: WriteLimiterRef, ) -> Self { Self { diff --git a/src/storage/src/hummock/store/hummock_storage.rs b/src/storage/src/hummock/store/hummock_storage.rs index a5322a0d7765b..d2064ffd09dee 100644 --- a/src/storage/src/hummock/store/hummock_storage.rs +++ b/src/storage/src/hummock/store/hummock_storage.rs @@ -79,6 +79,8 @@ impl Drop for HummockStorageShutdownGuard { #[derive(Clone)] pub struct HummockStorage { hummock_event_sender: UnboundedSender, + // only used in test for setting hummock version in uploader + _version_update_sender: UnboundedSender, context: CompactorContext, @@ -151,22 +153,22 @@ impl HummockStorage { .await .map_err(HummockError::read_backup_error)?; let write_limiter = Arc::new(WriteLimiter::default()); - let (event_tx, mut event_rx) = unbounded_channel(); + let (version_update_tx, mut version_update_rx) = unbounded_channel(); let observer_manager = ObserverManager::new( notification_client, HummockObserverNode::new( filter_key_extractor_manager.clone(), backup_reader.clone(), - event_tx.clone(), + version_update_tx.clone(), write_limiter.clone(), ), ) .await; observer_manager.start().await; - let hummock_version = match event_rx.recv().await { - Some(HummockEvent::VersionUpdate(HummockVersionUpdate::PinnedVersion(version))) => version, + let hummock_version = match version_update_rx.recv().await { + Some(HummockVersionUpdate::PinnedVersion(version)) => version, _ => unreachable!("the hummock observer manager is the first one to take the event tx. Should be full hummock version") }; @@ -189,8 +191,7 @@ impl HummockStorage { let seal_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); let min_current_epoch = Arc::new(AtomicU64::new(pinned_version.max_committed_epoch())); let hummock_event_handler = HummockEventHandler::new( - event_tx.clone(), - event_rx, + version_update_rx, pinned_version, compactor_context.clone(), filter_key_extractor_manager.clone(), @@ -198,6 +199,8 @@ impl HummockStorage { state_store_metrics.clone(), ); + let event_tx = hummock_event_handler.event_sender(); + let instance = Self { context: compactor_context, filter_key_extractor_manager: filter_key_extractor_manager.clone(), @@ -206,6 +209,7 @@ impl HummockStorage { version_update_notifier_tx: hummock_event_handler.version_update_notifier_tx(), seal_epoch, hummock_event_sender: event_tx.clone(), + _version_update_sender: version_update_tx, pinned_version: hummock_event_handler.pinned_version(), hummock_version_reader: HummockVersionReader::new( sstable_store, @@ -467,10 +471,10 @@ impl StateStore for HummockStorage { StoreLocalStatistic::flush_all(); } - async fn clear_shared_buffer(&self) -> StorageResult<()> { + async fn clear_shared_buffer(&self, prev_epoch: u64) { let (tx, rx) = oneshot::channel(); self.hummock_event_sender - .send(HummockEvent::Clear(tx)) + .send(HummockEvent::Clear(tx, prev_epoch)) .expect("should send success"); rx.await.expect("should wait success"); @@ -478,8 +482,6 @@ impl StateStore for HummockStorage { self.min_current_epoch .store(HummockEpoch::MAX, MemOrdering::SeqCst); self.seal_epoch.store(epoch, MemOrdering::SeqCst); - - Ok(()) } fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_ { @@ -527,13 +529,12 @@ impl HummockStorage { } /// Used in the compaction test tool + #[cfg(any(test, feature = "test"))] pub async fn update_version_and_wait(&self, version: HummockVersion) { use tokio::task::yield_now; let version_id = version.id; - 
self.hummock_event_sender - .send(HummockEvent::VersionUpdate( - HummockVersionUpdate::PinnedVersion(version), - )) + self._version_update_sender + .send(HummockVersionUpdate::PinnedVersion(version)) .unwrap(); loop { if self.pinned_version.load().id() >= version_id { diff --git a/src/storage/src/memory.rs b/src/storage/src/memory.rs index 1e77b5f5652bd..007260fef7350 100644 --- a/src/storage/src/memory.rs +++ b/src/storage/src/memory.rs @@ -635,7 +635,7 @@ impl StateStore for RangeKvStateStore { fn seal_epoch(&self, _epoch: u64, _is_checkpoint: bool) {} #[allow(clippy::unused_async)] - async fn clear_shared_buffer(&self) -> StorageResult<()> { + async fn clear_shared_buffer(&self, _prev_epoch: u64) { unimplemented!("recovery not supported") } diff --git a/src/storage/src/monitor/monitored_store.rs b/src/storage/src/monitor/monitored_store.rs index 6e3b9b3db0fc0..1bea7f6742c68 100644 --- a/src/storage/src/monitor/monitored_store.rs +++ b/src/storage/src/monitor/monitored_store.rs @@ -331,11 +331,10 @@ impl StateStore for MonitoredStateStore { panic!("the state store is already monitored") } - fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { self.inner - .clear_shared_buffer() + .clear_shared_buffer(prev_epoch) .verbose_instrument_await("store_clear_shared_buffer") - .inspect_err(|e| error!(error = %e.as_report(), "Failed in clear_shared_buffer")) } async fn new_local(&self, option: NewLocalOptions) -> Self::Local { diff --git a/src/storage/src/monitor/traced_store.rs b/src/storage/src/monitor/traced_store.rs index 95a5c835407d6..8cf96a231ead0 100644 --- a/src/storage/src/monitor/traced_store.rs +++ b/src/storage/src/monitor/traced_store.rs @@ -250,13 +250,9 @@ impl StateStore for TracedStateStore { self.inner.seal_epoch(epoch, is_checkpoint); } - async fn clear_shared_buffer(&self) -> StorageResult<()> { - let span = TraceSpan::new_clear_shared_buffer_span(); - let res = self.inner.clear_shared_buffer().await; - span.may_send_result(OperationResult::ClearSharedBuffer( - res.as_ref().map(|o| *o).into(), - )); - res + async fn clear_shared_buffer(&self, prev_epoch: u64) { + let _span = TraceSpan::new_clear_shared_buffer_span(prev_epoch); + self.inner.clear_shared_buffer(prev_epoch).await; } async fn new_local(&self, options: NewLocalOptions) -> Self::Local { diff --git a/src/storage/src/panic_store.rs b/src/storage/src/panic_store.rs index 08b0663b4e220..5299cac9fe085 100644 --- a/src/storage/src/panic_store.rs +++ b/src/storage/src/panic_store.rs @@ -154,7 +154,7 @@ impl StateStore for PanicStateStore { } #[allow(clippy::unused_async)] - async fn clear_shared_buffer(&self) -> StorageResult<()> { + async fn clear_shared_buffer(&self, _prev_epoch: u64) { panic!("should not clear shared buffer from the panic state store!"); } diff --git a/src/storage/src/store.rs b/src/storage/src/store.rs index 0daca6aa5305d..7c8353dc0f30f 100644 --- a/src/storage/src/store.rs +++ b/src/storage/src/store.rs @@ -191,7 +191,7 @@ pub trait StateStore: StateStoreRead + StaticSendSync + Clone { /// Clears contents in shared buffer. /// This method should only be called when dropping all actors in the local compute node. 
- fn clear_shared_buffer(&self) -> impl Future> + Send + '_; + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_; fn new_local(&self, option: NewLocalOptions) -> impl Future + Send + '_; diff --git a/src/storage/src/store_impl.rs b/src/storage/src/store_impl.rs index 4502236c0a20a..f1316fe7e20c8 100644 --- a/src/storage/src/store_impl.rs +++ b/src/storage/src/store_impl.rs @@ -482,8 +482,8 @@ pub mod verify { self.actual.seal_epoch(epoch, is_checkpoint) } - fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { - self.actual.clear_shared_buffer() + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { + self.actual.clear_shared_buffer(prev_epoch) } async fn new_local(&self, option: NewLocalOptions) -> Self::Local { @@ -927,7 +927,7 @@ pub mod boxed_state_store { fn seal_epoch(&self, epoch: u64, is_checkpoint: bool); - async fn clear_shared_buffer(&self) -> StorageResult<()>; + async fn clear_shared_buffer(&self, prev_epoch: u64); async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore; @@ -948,8 +948,8 @@ pub mod boxed_state_store { self.seal_epoch(epoch, is_checkpoint); } - async fn clear_shared_buffer(&self) -> StorageResult<()> { - self.clear_shared_buffer().await + async fn clear_shared_buffer(&self, prev_epoch: u64) { + self.clear_shared_buffer(prev_epoch).await } async fn new_local(&self, option: NewLocalOptions) -> BoxDynamicDispatchedLocalStateStore { @@ -1018,8 +1018,8 @@ pub mod boxed_state_store { self.deref().sync(epoch) } - fn clear_shared_buffer(&self) -> impl Future> + Send + '_ { - self.deref().clear_shared_buffer() + fn clear_shared_buffer(&self, prev_epoch: u64) -> impl Future + Send + '_ { + self.deref().clear_shared_buffer(prev_epoch) } fn seal_epoch(&self, epoch: u64, is_checkpoint: bool) { diff --git a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs index 1cbd65fec151f..9b77a9a7bf096 100644 --- a/src/stream/src/common/log_store_impl/kv_log_store/mod.rs +++ b/src/stream/src/common/log_store_impl/kv_log_store/mod.rs @@ -640,7 +640,7 @@ mod tests { drop(writer); // Recovery - test_env.storage.clear_shared_buffer().await.unwrap(); + test_env.storage.clear_shared_buffer(epoch2).await; // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -845,7 +845,7 @@ mod tests { drop(writer); // Recovery - test_env.storage.clear_shared_buffer().await.unwrap(); + test_env.storage.clear_shared_buffer(epoch2).await; // Rebuild log reader and writer in recovery let factory = KvLogStoreFactory::new( @@ -1072,7 +1072,7 @@ mod tests { drop(writer2); // Recovery - test_env.storage.clear_shared_buffer().await.unwrap(); + test_env.storage.clear_shared_buffer(epoch2).await; let vnodes = build_bitmap(0..VirtualNode::COUNT); let factory = KvLogStoreFactory::new( diff --git a/src/stream/src/executor/source/executor_core.rs b/src/stream/src/executor/source/executor_core.rs index 8857654be3cf2..6b3713cc64af1 100644 --- a/src/stream/src/executor/source/executor_core.rs +++ b/src/stream/src/executor/source/executor_core.rs @@ -35,16 +35,17 @@ pub struct StreamSourceCore { /// Split info for stream source. A source executor might read data from several splits of /// external connector. - pub(crate) stream_source_splits: HashMap, + pub(crate) latest_split_info: HashMap, /// Stores information of the splits. pub(crate) split_state_store: SourceStateTableHandler, - /// In-memory cache for the splits. 
+ /// Contains the latest offsets for the splits that are updated *in the current epoch*. + /// It is cleared after each barrier. /// /// Source messages will only write the cache. /// It is read on split change and rebuild stream reader on error. - pub(crate) state_cache: HashMap<SplitId, SplitImpl>, + pub(crate) updated_splits_in_epoch: HashMap<SplitId, SplitImpl>, } impl StreamSourceCore where @@ -63,14 +64,14 @@ source_name, column_ids, source_desc_builder: Some(source_desc_builder), - stream_source_splits: HashMap::new(), + latest_split_info: HashMap::new(), split_state_store, - state_cache: HashMap::new(), + updated_splits_in_epoch: HashMap::new(), } } pub fn init_split_state(&mut self, splits: Vec<SplitImpl>) { - self.stream_source_splits = splits + self.latest_split_info = splits .into_iter() .map(|split| (split.id(), split)) .collect(); } diff --git a/src/stream/src/executor/source/fs_source_executor.rs b/src/stream/src/executor/source/fs_source_executor.rs index cbda448712e72..a2478cdb6bb0d 100644 --- a/src/stream/src/executor/source/fs_source_executor.rs +++ b/src/stream/src/executor/source/fs_source_executor.rs @@ -149,7 +149,7 @@ impl FsSourceExecutor { let mut target_state: Vec<SplitImpl> = Vec::new(); let mut no_change_flag = true; for sc in rhs { - if let Some(s) = core.state_cache.get(&sc.id()) { + if let Some(s) = core.updated_splits_in_epoch.get(&sc.id()) { let fs = s .as_fs() .unwrap_or_else(|| panic!("split {:?} is not fs", s)); @@ -173,7 +173,7 @@ impl FsSourceExecutor { sc }; - core.state_cache + core.updated_splits_in_epoch .entry(state.id()) .or_insert_with(|| state.clone()); target_state.push(state); @@ -201,7 +201,7 @@ impl FsSourceExecutor { .map_err(StreamExecutorError::connector_error); stream.replace_data_stream(reader); - self.stream_source_core.stream_source_splits = target_state + self.stream_source_core.latest_split_info = target_state .into_iter() .map(|split| (split.id(), split)) .collect(); @@ -215,7 +215,7 @@ impl FsSourceExecutor { ) -> StreamExecutorResult<()> { let core = &mut self.stream_source_core; let incompleted = core - .state_cache + .updated_splits_in_epoch .values() .filter(|split| { let fs = split @@ -227,7 +227,7 @@ impl FsSourceExecutor { .collect_vec(); let completed = core - .state_cache + .updated_splits_in_epoch .values() .filter(|split| { let fs = split @@ -250,7 +250,7 @@ impl FsSourceExecutor { // commit anyway, even if no message saved core.split_state_store.state_store.commit(epoch).await?; - core.state_cache.clear(); + core.updated_splits_in_epoch.clear(); Ok(()) } @@ -439,17 +439,18 @@ impl FsSourceExecutor { let state: Vec<(SplitId, SplitImpl)> = mapping .iter() .flat_map(|(id, offset)| { - let origin_split = - self.stream_source_core.stream_source_splits.get_mut(id); - - origin_split.map(|split| { - split.update_in_place(offset.clone())?; - Ok::<_, anyhow::Error>((id.clone(), split.clone())) - }) + self.stream_source_core.latest_split_info.get_mut(id).map( + |origin_split| { + origin_split.update_in_place(offset.clone())?; + Ok::<_, anyhow::Error>((id.clone(), origin_split.clone())) + }, + ) }) .try_collect()?; - self.stream_source_core.state_cache.extend(state); + self.stream_source_core + .updated_splits_in_epoch + .extend(state); } self.metrics diff --git a/src/stream/src/executor/source/source_executor.rs b/src/stream/src/executor/source/source_executor.rs index 07903a2c7b34e..8ad653c5f8397 100644 --- a/src/stream/src/executor/source/source_executor.rs +++ b/src/stream/src/executor/source/source_executor.rs @@ -138,6 +138,7 @@ impl SourceExecutor { ] } + /// Returns `target_states`
if split changed. Otherwise `None`. async fn apply_split_change( &mut self, source_desc: &SourceDesc, @@ -176,7 +177,9 @@ impl SourceExecutor { Ok(None) } - /// Note: `update_state_if_changed` will modify `state_cache` + /// Returns `target_states` if split changed. Otherwise `None`. + /// + /// Note: `update_state_if_changed` will modify `updated_splits_in_epoch` async fn update_state_if_changed( &mut self, state: ConnectorState, @@ -193,8 +196,9 @@ impl SourceExecutor { let mut split_changed = false; + // Checks added splits for (split_id, split) in &target_splits { - if let Some(s) = core.state_cache.get(split_id) { + if let Some(s) = core.updated_splits_in_epoch.get(split_id) { // existing split, no change, clone from cache target_state.push(s.clone()) } else { @@ -211,7 +215,7 @@ impl SourceExecutor { split.clone() }; - core.state_cache + core.updated_splits_in_epoch .entry(split.id()) .or_insert_with(|| initial_state.clone()); @@ -219,8 +223,8 @@ impl SourceExecutor { } } - // state cache may be stale - for existing_split_id in core.stream_source_splits.keys() { + // Checks dropped splits + for existing_split_id in core.latest_split_info.keys() { if !target_splits.contains_key(existing_split_id) { tracing::info!("split dropping detected: {}", existing_split_id); split_changed = true; @@ -235,7 +239,6 @@ impl SourceExecutor { &mut self, source_desc: &SourceDesc, stream: &mut StreamReaderWithPause, - split_info: &mut [SplitImpl], e: StreamExecutorError, ) -> StreamExecutorResult<()> { let core = self.stream_source_core.as_mut().unwrap(); @@ -252,26 +255,8 @@ impl SourceExecutor { self.actor_ctx.id.to_string(), core.source_id.to_string(), ]); - // fetch the newest offset, either it's in cache (before barrier) - // or in state table (just after barrier) - let target_state = if core.state_cache.is_empty() { - for ele in &mut *split_info { - if let Some(recover_state) = core - .split_state_store - .try_recover_from_state_store(ele) - .await? - { - *ele = recover_state; - } - } - split_info.to_owned() - } else { - core.state_cache - .values() - .map(|split_impl| split_impl.to_owned()) - .collect_vec() - }; + let target_state = core.latest_split_info.values().cloned().collect(); self.replace_stream_reader_with_target_state(source_desc, stream, target_state) .await } @@ -301,16 +286,24 @@ impl SourceExecutor { /// - `target_state`: the new split info from barrier. `None` if no split update. /// - `should_trim_state`: whether to trim state for dropped splits. + /// + /// For scaling, the connector splits can be migrated to other actors, but + /// won't be added or removed. Actors should not trim states for splits that + /// are moved to other actors. + /// + /// For source split change, split will not be migrated and we can trim states + /// for deleted splits. async fn persist_state_and_clear_cache( &mut self, epoch: EpochPair, + // target_state is Some means split change (or migration) happened. 
target_state: Option>, should_trim_state: bool, ) -> StreamExecutorResult<()> { let core = self.stream_source_core.as_mut().unwrap(); let mut cache = core - .state_cache + .updated_splits_in_epoch .values() .map(|split_impl| split_impl.to_owned()) .collect_vec(); @@ -322,7 +315,7 @@ impl SourceExecutor { cache.retain(|split| target_split_ids.contains(&split.id())); let dropped_splits = core - .stream_source_splits + .latest_split_info .extract_if(|split_id, _| !target_split_ids.contains(split_id)) .map(|(_, split)| split) .collect_vec(); @@ -332,7 +325,7 @@ impl SourceExecutor { core.split_state_store.trim_state(&dropped_splits).await?; } - core.stream_source_splits = target_splits + core.latest_split_info = target_splits .into_iter() .map(|split| (split.id(), split)) .collect(); @@ -345,7 +338,7 @@ impl SourceExecutor { // commit anyway, even if no message saved core.split_state_store.state_store.commit(epoch).await?; - core.state_cache.clear(); + core.updated_splits_in_epoch.clear(); Ok(()) } @@ -410,7 +403,6 @@ impl SourceExecutor { _ => {} } } - let mut latest_split_info = boot_state.clone(); core.split_state_store.init_epoch(barrier.epoch); @@ -462,13 +454,8 @@ impl SourceExecutor { while let Some(msg) = stream.next().await { let Ok(msg) = msg else { tokio::time::sleep(Duration::from_millis(1000)).await; - self.rebuild_stream_reader_from_error( - &source_desc, - &mut stream, - &mut latest_split_info, - msg.unwrap_err(), - ) - .await?; + self.rebuild_stream_reader_from_error(&source_desc, &mut stream, msg.unwrap_err()) + .await?; continue; }; @@ -513,10 +500,6 @@ impl SourceExecutor { } } - if let Some(target_state) = &target_state { - latest_split_info = target_state.clone(); - } - self.persist_state_and_clear_cache(epoch, target_state, should_trim_state) .await?; @@ -572,24 +555,25 @@ impl SourceExecutor { let state: HashMap<_, _> = mapping .iter() .flat_map(|(split_id, offset)| { - let origin_split_impl = self - .stream_source_core + self.stream_source_core .as_mut() .unwrap() - .stream_source_splits - .get_mut(split_id); - - origin_split_impl.map(|split_impl| { - split_impl.update_in_place(offset.clone())?; - Ok::<_, anyhow::Error>((split_id.clone(), split_impl.clone())) - }) + .latest_split_info + .get_mut(split_id) + .map(|original_split_impl| { + original_split_impl.update_in_place(offset.clone())?; + Ok::<_, anyhow::Error>(( + split_id.clone(), + original_split_impl.clone(), + )) + }) }) .try_collect()?; self.stream_source_core .as_mut() .unwrap() - .state_cache + .updated_splits_in_epoch .extend(state); } metric_row_per_barrier += chunk.cardinality() as u64; @@ -736,9 +720,9 @@ mod tests { source_id: table_id, column_ids, source_desc_builder: Some(source_desc_builder), - stream_source_splits: HashMap::new(), + latest_split_info: HashMap::new(), split_state_store, - state_cache: HashMap::new(), + updated_splits_in_epoch: HashMap::new(), source_name: MOCK_SOURCE_NAME.to_string(), }; @@ -830,9 +814,9 @@ mod tests { source_id: table_id, column_ids: column_ids.clone(), source_desc_builder: Some(source_desc_builder), - stream_source_splits: HashMap::new(), + latest_split_info: HashMap::new(), split_state_store, - state_cache: HashMap::new(), + updated_splits_in_epoch: HashMap::new(), source_name: MOCK_SOURCE_NAME.to_string(), }; diff --git a/src/stream/src/task/barrier_manager.rs b/src/stream/src/task/barrier_manager.rs index b838314729ad3..385825a23e0d2 100644 --- a/src/stream/src/task/barrier_manager.rs +++ b/src/stream/src/task/barrier_manager.rs @@ -72,7 +72,10 @@ pub(super) 
enum LocalBarrierEvent { actor_ids_to_collect: HashSet, result_sender: oneshot::Sender>, }, - Reset(oneshot::Sender<()>), + Reset { + prev_epoch: u64, + result_sender: oneshot::Sender<()>, + }, ReportActorCollected { actor_id: ActorId, barrier: Barrier, @@ -223,9 +226,10 @@ impl LocalBarrierWorker { event = event_rx.recv() => { if let Some(event) = event { match event { - LocalBarrierEvent::Reset(finish_sender) => { - self.reset().await; - let _ = finish_sender.send(()); + LocalBarrierEvent::Reset { + result_sender, prev_epoch} => { + self.reset(prev_epoch).await; + let _ = result_sender.send(()); } event => { self.handle_event(event); @@ -268,7 +272,7 @@ impl LocalBarrierWorker { warn!(err=?e, "fail to send inject barrier result"); }); } - LocalBarrierEvent::Reset(_) => { + LocalBarrierEvent::Reset { .. } => { unreachable!("Reset event should be handled separately in async context") } ReportActorCollected { actor_id, barrier } => self.collect(actor_id, &barrier), diff --git a/src/stream/src/task/stream_manager.rs b/src/stream/src/task/stream_manager.rs index 5a2bde99da491..11eb9a44290cf 100644 --- a/src/stream/src/task/stream_manager.rs +++ b/src/stream/src/task/stream_manager.rs @@ -231,9 +231,12 @@ impl LocalStreamManager { } /// Force stop all actors on this worker, and then drop their resources. - pub async fn reset(&self) { + pub async fn reset(&self, prev_epoch: u64) { self.local_barrier_manager - .send_and_await(LocalBarrierEvent::Reset) + .send_and_await(|result_sender| LocalBarrierEvent::Reset { + result_sender, + prev_epoch, + }) .await .expect("should receive reset") } @@ -268,7 +271,7 @@ impl LocalBarrierWorker { } /// Force stop all actors on this worker, and then drop their resources. - pub(super) async fn reset(&mut self) { + pub(super) async fn reset(&mut self, prev_epoch: u64) { let actor_handles = self.actor_manager_state.drain_actor_handles(); for (actor_id, handle) in &actor_handles { tracing::debug!("force stopping actor {}", actor_id); @@ -295,7 +298,7 @@ impl LocalBarrierWorker { m.lock().clear(); } dispatch_state_store!(&self.actor_manager.env.state_store(), store, { - store.clear_shared_buffer().await.unwrap(); + store.clear_shared_buffer(prev_epoch).await; }); self.reset_state(); self.actor_manager.env.dml_manager_ref().clear(); diff --git a/src/utils/runtime/Cargo.toml b/src/utils/runtime/Cargo.toml index 5d6270e35f6d6..1f4e085632eae 100644 --- a/src/utils/runtime/Cargo.toml +++ b/src/utils/runtime/Cargo.toml @@ -21,8 +21,6 @@ console-subscriber = "0.2.0" either = "1" futures = { version = "0.3", default-features = false, features = ["alloc"] } hostname = "0.3" -opentelemetry-otlp = { version = "0.13" } -opentelemetry-semantic-conventions = "0.12" parking_lot = { version = "0.12", features = ["deadlock_detection"] } pprof = { version = "0.13", features = ["flamegraph"] } risingwave_common = { workspace = true } @@ -40,11 +38,14 @@ tokio = { version = "0.2", package = "madsim-tokio", features = [ "fs" ] } tracing = "0.1" -tracing-opentelemetry = "0.21" +tracing-opentelemetry = { workspace = true } tracing-subscriber = { version = "0.3", features = ["fmt", "parking_lot", "std", "time", "local-time", "json"] } [target.'cfg(not(madsim))'.dependencies] -opentelemetry = { version = "0.20", default-features = false, features = ["rt-tokio"] } +opentelemetry = { workspace = true } +opentelemetry-otlp = { workspace = true } +opentelemetry-semantic-conventions = { workspace = true } +opentelemetry_sdk = { workspace = true, features = ["rt-tokio"] } # only enable `rt-tokio` 
feature under non-madsim target workspace-hack = { path = "../../workspace-hack" } [lints] diff --git a/src/utils/runtime/src/logger.rs b/src/utils/runtime/src/logger.rs index 6bacbe28d264a..da77f2e76c0b6 100644 --- a/src/utils/runtime/src/logger.rs +++ b/src/utils/runtime/src/logger.rs @@ -392,8 +392,9 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { if let Some(endpoint) = settings.tracing_endpoint { println!("opentelemetry tracing will be exported to `{endpoint}` if enabled"); - use opentelemetry::{sdk, KeyValue}; + use opentelemetry::KeyValue; use opentelemetry_otlp::WithExportConfig; + use opentelemetry_sdk as sdk; use opentelemetry_semantic_conventions::resource; let id = format!( @@ -435,7 +436,7 @@ pub fn init_risingwave_logger(settings: LoggerSettings) { KeyValue::new(resource::SERVICE_VERSION, env!("CARGO_PKG_VERSION")), KeyValue::new(resource::PROCESS_PID, std::process::id().to_string()), ]))) - .install_batch(opentelemetry::runtime::Tokio) + .install_batch(sdk::runtime::Tokio) .unwrap() }; diff --git a/src/workspace-hack/Cargo.toml b/src/workspace-hack/Cargo.toml index 0c7df07fa5b79..690fed8acc47a 100644 --- a/src/workspace-hack/Cargo.toml +++ b/src/workspace-hack/Cargo.toml @@ -90,8 +90,6 @@ num-iter = { version = "0.1", default-features = false, features = ["i128", "std num-traits = { version = "0.2", features = ["i128", "libm"] } openssl = { version = "0.10", features = ["vendored"] } openssl-sys = { version = "0.9", default-features = false, features = ["vendored"] } -opentelemetry_api = { version = "0.20", features = ["logs", "metrics"] } -opentelemetry_sdk = { version = "0.20", features = ["logs", "metrics"] } ordered-float = { version = "3" } parking_lot = { version = "0.12", features = ["arc_lock", "deadlock_detection"] } parking_lot_core = { version = "0.9", default-features = false, features = ["deadlock_detection"] }
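A note on the `poll_fn` refactor in this patch: `CacheRefiller::next_event` and `HummockUploader::next_event` now return an `impl Future + '_` built with `std::future::poll_fn` instead of the hand-written `NextCacheRefillerEvent` / `NextUploaderEvent` future structs. The sketch below shows that pattern in isolation; `EventQueue` and its field are hypothetical stand-ins for illustration, not RisingWave types.

use std::collections::VecDeque;
use std::future::{poll_fn, Future};
use std::task::Poll;

/// Hypothetical queue that yields its items as an async event stream.
struct EventQueue<T> {
    queue: VecDeque<T>,
}

impl<T> EventQueue<T> {
    /// Borrow `self` inside `poll_fn` and keep the polling logic inline,
    /// instead of defining a named future struct with a manual `impl Future`.
    fn next_event(&mut self) -> impl Future<Output = T> + '_ {
        poll_fn(|_cx| {
            if let Some(item) = self.queue.pop_front() {
                return Poll::Ready(item);
            }
            // A real implementation must arrange a wakeup before returning
            // `Poll::Pending`; in this patch the polled `JoinHandle`s and
            // upload tasks register the waker.
            Poll::Pending
        })
    }
}

Compared with the deleted `NextCacheRefillerEvent<'a>` / `NextUploaderEvent<'a>` types, this drops the `Pin` and `DerefMut` boilerplate while keeping the same mutable borrow of the handler state for the lifetime of the returned future.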