From 0f3facef04ae65883e1e1f15448002844b0d0d7e Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Mon, 11 Jul 2022 22:21:58 +0800 Subject: [PATCH 1/6] chore(common/base): rm useless mod in infallible --- common/base/src/infallible/condvar.rs | 34 ------------ common/base/src/infallible/exit_guard.rs | 29 ---------- common/base/src/infallible/mod.rs | 18 ------ common/base/src/infallible/mutex.rs | 36 ------------ common/base/src/infallible/rwlock.rs | 55 ------------------- .../src/infallible/rwlock_upgrade_read.rs | 39 ------------- common/base/src/mem_allocator/malloc_size.rs | 12 ---- common/base/tests/it/derive.rs | 2 +- common/base/tests/it/main.rs | 2 - common/base/tests/it/mutex.rs | 39 ------------- common/base/tests/it/rwlock.rs | 40 -------------- 11 files changed, 1 insertion(+), 305 deletions(-) delete mode 100644 common/base/src/infallible/condvar.rs delete mode 100644 common/base/src/infallible/exit_guard.rs delete mode 100644 common/base/src/infallible/mutex.rs delete mode 100644 common/base/src/infallible/rwlock.rs delete mode 100644 common/base/src/infallible/rwlock_upgrade_read.rs delete mode 100644 common/base/tests/it/mutex.rs delete mode 100644 common/base/tests/it/rwlock.rs diff --git a/common/base/src/infallible/condvar.rs b/common/base/src/infallible/condvar.rs deleted file mode 100644 index a42224d3db9b2..0000000000000 --- a/common/base/src/infallible/condvar.rs +++ /dev/null @@ -1,34 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use parking_lot::Condvar as ParkingCondvar; -use parking_lot::MutexGuard; - -pub struct Condvar(ParkingCondvar); - -impl Condvar { - pub fn create() -> Condvar { - Condvar(ParkingCondvar::new()) - } - - #[inline] - pub fn notify_one(&self) -> bool { - self.0.notify_one() - } - - #[inline] - pub fn wait(&self, mutex_guard: &mut MutexGuard<'_, T>) { - self.0.wait(mutex_guard) - } -} diff --git a/common/base/src/infallible/exit_guard.rs b/common/base/src/infallible/exit_guard.rs deleted file mode 100644 index d26dc30d2dfae..0000000000000 --- a/common/base/src/infallible/exit_guard.rs +++ /dev/null @@ -1,29 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -pub struct ExitGuard { - function: F, -} - -impl ExitGuard { - pub fn create(f: F) -> ExitGuard { - ExitGuard { function: f } - } -} - -impl Drop for ExitGuard { - fn drop(&mut self) { - (self.function)(); - } -} diff --git a/common/base/src/infallible/mod.rs b/common/base/src/infallible/mod.rs index 470e155cd50a6..a8b611b529688 100644 --- a/common/base/src/infallible/mod.rs +++ b/common/base/src/infallible/mod.rs @@ -12,26 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -mod condvar; -mod exit_guard; -mod mutex; mod remutex; mod remutex_guard; -mod rwlock; -mod rwlock_upgrade_read; -pub use condvar::Condvar; -pub use exit_guard::ExitGuard; -pub use mutex::Mutex; pub use remutex::ReentrantMutex; pub use remutex_guard::ReentrantMutexGuard; -pub use rwlock::RwLock; -pub use rwlock_upgrade_read::RwLockUpgradableReadGuard; - -#[macro_export] -macro_rules! exit_scope { - ($x:block) => { - use common_base::infallible::ExitGuard; - let _exit_guard = ExitGuard::create(move || $x); - }; -} diff --git a/common/base/src/infallible/mutex.rs b/common/base/src/infallible/mutex.rs deleted file mode 100644 index 10c4f44ed1f8c..0000000000000 --- a/common/base/src/infallible/mutex.rs +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use parking_lot::Mutex as ParkingMutex; -use parking_lot::MutexGuard; - -/// A simple wrapper around the lock() function of a std::sync::Mutex -#[derive(Debug)] -pub struct Mutex(ParkingMutex); - -unsafe impl Send for Mutex where ParkingMutex: Send {} - -unsafe impl Sync for Mutex where ParkingMutex: Sync {} - -impl Mutex { - /// creates mutex - pub fn new(t: T) -> Self { - Self(ParkingMutex::new(t)) - } - - /// lock the mutex - pub fn lock(&self) -> MutexGuard<'_, T> { - self.0.lock() - } -} diff --git a/common/base/src/infallible/rwlock.rs b/common/base/src/infallible/rwlock.rs deleted file mode 100644 index bd9040adecddf..0000000000000 --- a/common/base/src/infallible/rwlock.rs +++ /dev/null @@ -1,55 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use parking_lot::RwLock as ParkingRwLock; -use parking_lot::RwLockReadGuard; -use parking_lot::RwLockWriteGuard; - -use super::RwLockUpgradableReadGuard; - -/// A simple wrapper around the lock() function of a std::sync::RwLock -/// The only difference is that you don't need to call unwrap() on it. -#[derive(Debug, Default)] -pub struct RwLock(ParkingRwLock); - -unsafe impl Send for RwLock where ParkingRwLock: Send {} - -unsafe impl Sync for RwLock where ParkingRwLock: Sync {} - -impl RwLock { - /// creates a read-write lock - pub fn new(t: T) -> Self { - Self(ParkingRwLock::new(t)) - } - - /// lock the rwlock in read mode - pub fn read(&self) -> RwLockReadGuard<'_, T> { - self.0.read() - } - - /// lock the rwlock in write mode - pub fn write(&self) -> RwLockWriteGuard<'_, T> { - self.0.write() - } - - // lock the rwlock in read mode and can be upgrade to write mode - pub fn upgradable_read(&self) -> RwLockUpgradableReadGuard<'_, T> { - RwLockUpgradableReadGuard::create(self.0.upgradable_read()) - } - - /// return the owned type consuming the lock - pub fn into_inner(self) -> T { - self.0.into_inner() - } -} diff --git a/common/base/src/infallible/rwlock_upgrade_read.rs b/common/base/src/infallible/rwlock_upgrade_read.rs deleted file mode 100644 index 004d12a4f8199..0000000000000 --- a/common/base/src/infallible/rwlock_upgrade_read.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::ops::Deref; - -use parking_lot::RwLockUpgradableReadGuard as ParkingUpgradableReadGuard; -use parking_lot::RwLockWriteGuard; - -pub struct RwLockUpgradableReadGuard<'a, T: ?Sized>(ParkingUpgradableReadGuard<'a, T>); - -impl<'a, T: ?Sized + 'a> RwLockUpgradableReadGuard<'a, T> { - pub fn create(inner: ParkingUpgradableReadGuard<'a, T>) -> Self { - Self(inner) - } - - pub fn upgrade(self) -> RwLockWriteGuard<'a, T> { - ParkingUpgradableReadGuard::<'a, T>::upgrade(self.0) - } -} - -impl<'a, T: ?Sized + 'a> Deref for RwLockUpgradableReadGuard<'a, T> { - type Target = T; - - #[inline] - fn deref(&self) -> &Self::Target { - self.0.deref() - } -} diff --git a/common/base/src/mem_allocator/malloc_size.rs b/common/base/src/mem_allocator/malloc_size.rs index 631e609170e4b..62599c4428979 100644 --- a/common/base/src/mem_allocator/malloc_size.rs +++ b/common/base/src/mem_allocator/malloc_size.rs @@ -524,12 +524,6 @@ impl MallocSizeOf for parking_lot::Mutex { } } -impl MallocSizeOf for crate::infallible::Mutex { - fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize { - self.lock().size_of(ops) - } -} - impl MallocSizeOf for std::sync::RwLock { fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize { self.read().unwrap().size_of(ops) @@ -542,12 +536,6 @@ impl MallocSizeOf for parking_lot::RwLock { } } -impl MallocSizeOf for crate::infallible::RwLock { - fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize { - self.read().size_of(ops) - } -} - impl MallocShallowSizeOf for Box { fn shallow_size_of(&self, _ops: &mut MallocSizeOfOps) -> usize { size_of_val(&**self) diff --git a/common/base/tests/it/derive.rs b/common/base/tests/it/derive.rs index ab3a22b75a8c7..7e4e494f9ced4 100644 --- a/common/base/tests/it/derive.rs +++ b/common/base/tests/it/derive.rs @@ -15,11 +15,11 @@ use std::collections::HashMap; use std::sync::Arc; -use common_base::infallible::Mutex; use common_base::mem_allocator::malloc_size; use common_base::mem_allocator::MallocSizeOf; use common_base::mem_allocator::MallocSizeOfExt; use common_macros::MallocSizeOf; +use parking_lot::Mutex; #[test] fn derive_vec() { diff --git a/common/base/tests/it/main.rs b/common/base/tests/it/main.rs index aff3ef1f9caf4..2203ffb4dcec8 100644 --- a/common/base/tests/it/main.rs +++ b/common/base/tests/it/main.rs @@ -14,12 +14,10 @@ mod derive; mod malloc_size; -mod mutex; mod pool; mod progress; mod range_key_test; mod range_map_test; mod runtime; -mod rwlock; mod stoppable; mod string_func; diff --git a/common/base/tests/it/mutex.rs b/common/base/tests/it/mutex.rs deleted file mode 100644 index cc7d1de2c2336..0000000000000 --- a/common/base/tests/it/mutex.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#[test] -fn test_mutex() { - use std::sync::Arc; - use std::thread; - - use common_base::infallible::Mutex; - let a = 7u8; - let mutex = Arc::new(Mutex::new(a)); - let mutex2 = mutex.clone(); - let mutex3 = mutex.clone(); - - let thread1 = thread::spawn(move || { - let mut b = mutex2.lock(); - *b = 8; - }); - let thread2 = thread::spawn(move || { - let mut b = mutex3.lock(); - *b = 9; - }); - - let _ = thread1.join(); - let _ = thread2.join(); - - let _locked = mutex.lock(); -} diff --git a/common/base/tests/it/rwlock.rs b/common/base/tests/it/rwlock.rs deleted file mode 100644 index 81600d57ec9e0..0000000000000 --- a/common/base/tests/it/rwlock.rs +++ /dev/null @@ -1,40 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#[test] -fn test_rwlock() { - use std::sync::Arc; - use std::thread; - - use common_base::infallible::RwLock; - - let a = 7u8; - let rwlock = Arc::new(RwLock::new(a)); - let rwlock2 = rwlock.clone(); - let rwlock3 = rwlock.clone(); - - let thread1 = thread::spawn(move || { - let mut b = rwlock2.write(); - *b = 8; - }); - let thread2 = thread::spawn(move || { - let mut b = rwlock3.write(); - *b = 9; - }); - - let _ = thread1.join(); - let _ = thread2.join(); - - let _read = rwlock.read(); -} From b0fa806d44cce03a071f745867fe9a3a401fcc1d Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Mon, 11 Jul 2022 22:23:00 +0800 Subject: [PATCH 2/6] refactor(common): use parking_lot to replace infallible --- common/datablocks/Cargo.toml | 1 + common/datablocks/src/memory.rs | 2 +- common/meta/grpc/Cargo.toml | 1 + common/meta/grpc/src/grpc_client.rs | 2 +- common/metrics/Cargo.toml | 1 + common/metrics/src/recorder.rs | 2 +- common/streams/Cargo.toml | 1 + common/streams/src/stream_error.rs | 2 +- common/users/Cargo.toml | 1 + common/users/src/role_cache_mgr.rs | 2 +- 10 files changed, 10 insertions(+), 5 deletions(-) diff --git a/common/datablocks/Cargo.toml b/common/datablocks/Cargo.toml index 5087cbbb27c23..9889b6e6b7cf1 100644 --- a/common/datablocks/Cargo.toml +++ b/common/datablocks/Cargo.toml @@ -23,6 +23,7 @@ common-io = { path = "../io" } # Crates.io dependencies ahash = "0.7.6" comfy-table = "6.0.0" +parking_lot = "0.12.1" primitive-types = "0.11.1" regex = "1.5.6" serde = { version = "1.0.137", features = ["derive"] } diff --git a/common/datablocks/src/memory.rs b/common/datablocks/src/memory.rs index 79eeb93f205f4..3b2ff46f25f69 100644 --- a/common/datablocks/src/memory.rs +++ b/common/datablocks/src/memory.rs @@ -15,7 +15,7 @@ use std::collections::HashMap; use std::sync::Arc; -use common_base::infallible::RwLock; +use parking_lot::RwLock; use crate::DataBlock; diff --git a/common/meta/grpc/Cargo.toml b/common/meta/grpc/Cargo.toml index ba01cf3645ecf..d230c78cdfee9 100644 --- a/common/meta/grpc/Cargo.toml +++ b/common/meta/grpc/Cargo.toml @@ -29,6 +29,7 @@ common-tracing = { path = "../../tracing" } derive_more = "0.99.17" futures = "0.3.21" once_cell = "1.12.0" +parking_lot = "0.12.1" prost = "0.10.4" rand = "0.8.5" semver = "1.0.10" diff --git a/common/meta/grpc/src/grpc_client.rs b/common/meta/grpc/src/grpc_client.rs index fb38550deab2c..98051d95e7b74 100644 --- a/common/meta/grpc/src/grpc_client.rs +++ b/common/meta/grpc/src/grpc_client.rs @@ -32,7 +32,6 @@ use common_base::base::TrySpawn; use common_base::containers::ItemManager; use common_base::containers::Pool; use common_base::containers::TtlHashMap; -use common_base::infallible::Mutex; use common_exception::ErrorCode; use common_exception::Result; use common_grpc::ConnectionFactory; @@ -64,6 +63,7 @@ use common_metrics::label_histogram_with_val; use common_metrics::label_increment_gauge_with_val_and_labels; use common_tracing::tracing; use futures::stream::StreamExt; +use parking_lot::Mutex; use prost::Message; use semver::Version; use serde::de::DeserializeOwned; diff --git a/common/metrics/Cargo.toml b/common/metrics/Cargo.toml index d2cb42911a32c..250030823fd63 100644 --- a/common/metrics/Cargo.toml +++ b/common/metrics/Cargo.toml @@ -20,6 +20,7 @@ common-tracing = { path = "../tracing" } metrics = "0.19.0" metrics-exporter-prometheus = { version = "0.10.0", default-features = false } once_cell = "1.12.0" +parking_lot = "0.12.1" prometheus-parse = "0.2.3" serde = { version = "1.0.137", features = ["derive"] } diff --git a/common/metrics/src/recorder.rs b/common/metrics/src/recorder.rs index b5c0b4b128942..d8542c257a208 100644 --- a/common/metrics/src/recorder.rs +++ b/common/metrics/src/recorder.rs @@ -15,7 +15,6 @@ use std::sync::Arc; use std::sync::Once; -use common_base::infallible::RwLock; use common_tracing::tracing; use metrics::counter; use metrics::decrement_gauge; @@ -24,6 +23,7 @@ use metrics::increment_gauge; use metrics_exporter_prometheus::PrometheusBuilder; use metrics_exporter_prometheus::PrometheusHandle; use once_cell::sync::Lazy; +use parking_lot::RwLock; static PROMETHEUS_HANDLE: Lazy>>> = Lazy::new(|| Arc::new(RwLock::new(None))); diff --git a/common/streams/Cargo.toml b/common/streams/Cargo.toml index 0c9a74130c9f2..edd26bb34eb5b 100644 --- a/common/streams/Cargo.toml +++ b/common/streams/Cargo.toml @@ -30,6 +30,7 @@ chrono-tz = "0.6.1" csv-async = "1.2.4" futures = "0.3.21" opendal = { version = "0.10.0", features = ["retry", "compress"] } +parking_lot = "0.12.1" pin-project-lite = "0.2.9" serde_json = { version = "1.0.81", default-features = false, features = ["preserve_order"] } tempfile = "3.3.0" diff --git a/common/streams/src/stream_error.rs b/common/streams/src/stream_error.rs index c9d202c9e4c4f..27a1f78dec79a 100644 --- a/common/streams/src/stream_error.rs +++ b/common/streams/src/stream_error.rs @@ -16,11 +16,11 @@ use std::sync::Arc; use std::task::Context; use std::task::Poll; -use common_base::infallible::Mutex; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use futures::Stream; +use parking_lot::Mutex; use pin_project_lite::pin_project; use crate::SendableDataBlockStream; diff --git a/common/users/Cargo.toml b/common/users/Cargo.toml index 2abbb611c45bb..57b8c3c2bb4dc 100644 --- a/common/users/Cargo.toml +++ b/common/users/Cargo.toml @@ -25,6 +25,7 @@ common-tracing = { path = "../tracing" } # Crates.io dependencies jwtk = "0.2.3" +parking_lot = "0.12.1" serde = { version = "1.0.137", features = ["derive"] } [dev-dependencies] diff --git a/common/users/src/role_cache_mgr.rs b/common/users/src/role_cache_mgr.rs index a5111587d47aa..6bc2b5f3a19b2 100644 --- a/common/users/src/role_cache_mgr.rs +++ b/common/users/src/role_cache_mgr.rs @@ -21,10 +21,10 @@ use std::time::Instant; use common_base::base::tokio; use common_base::base::tokio::task::JoinHandle; -use common_base::infallible::RwLock; use common_exception::Result; use common_meta_types::RoleInfo; use common_tracing::tracing; +use parking_lot::RwLock; use crate::UserApiProvider; From 5d616b9af98ddbf80231a0664930e7a818dd3e09 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Mon, 11 Jul 2022 22:30:29 +0800 Subject: [PATCH 3/6] refactor(query): use parking_lot to replace infallible --- Cargo.lock | 6 ++++ query/Cargo.toml | 1 + .../rpc/exchange/exchange_channel_receiver.rs | 2 +- .../rpc/exchange/exchange_channel_sender.rs | 2 +- .../src/api/rpc/exchange/exchange_manager.rs | 2 +- query/src/api/rpc/flight_dispatcher.rs | 2 +- query/src/catalogs/default/catalog_context.rs | 2 +- .../src/catalogs/default/table_memory_meta.rs | 2 +- query/src/databases/database_context.rs | 2 +- query/src/databases/database_factory.rs | 2 +- query/src/interpreters/async_insert_queue.rs | 4 +-- .../src/interpreters/async_insert_queue_v2.rs | 36 +++++++++---------- .../interpreter_factory_interceptor.rs | 2 +- query/src/interpreters/interpreter_insert.rs | 2 +- .../src/interpreters/interpreter_insert_v2.rs | 2 +- .../new/executor/executor_condvar.rs | 6 ++-- .../pipelines/new/executor/executor_tasks.rs | 2 +- .../new/executor/pipeline_pushing_executor.rs | 2 +- .../new/processors/sources/blocks_source.rs | 2 +- .../transforms/hash_join/join_hash_table.rs | 2 +- .../transforms/transform_create_sets.rs | 2 +- .../pipelines/processors/processor_mixed.rs | 2 +- .../transforms/transform_create_sets.rs | 2 +- .../transforms/transform_group_by_final.rs | 2 +- .../src/servers/http/v1/query/expiring_map.rs | 2 +- .../http/v1/query/http_query_manager.rs | 2 +- query/src/sessions/query_ctx.rs | 4 +-- query/src/sessions/query_ctx_shared.rs | 4 +-- query/src/sessions/session.rs | 2 +- query/src/sessions/session_ctx.rs | 2 +- query/src/sessions/session_mgr.rs | 4 +-- query/src/sessions/session_settings.rs | 2 +- query/src/sql/planner/metadata.rs | 2 +- query/src/sql/planner/mod.rs | 2 +- query/src/storages/memory/memory_table.rs | 4 +-- query/src/storages/stage/stage_source.rs | 2 +- query/src/storages/stage/stage_table.rs | 2 +- query/src/storages/storage_context.rs | 2 +- query/src/storages/storage_factory.rs | 2 +- query/src/storages/system/query_log_table.rs | 2 +- .../table_functions/table_function_factory.rs | 2 +- query/tests/it/sql/optimizer/heuristic/mod.rs | 2 +- query/tests/it/sql/planner/format/mod.rs | 2 +- query/tests/it/storages/fuse/io.rs | 2 +- 44 files changed, 72 insertions(+), 67 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7e33035ee0ec9..175faccf0a1f2 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1076,6 +1076,7 @@ dependencies = [ "common-datavalues", "common-exception", "common-io", + "parking_lot 0.12.1", "pretty_assertions", "primitive-types", "regex", @@ -1384,6 +1385,7 @@ dependencies = [ "derive_more", "futures", "once_cell", + "parking_lot 0.12.1", "prost", "rand 0.8.5", "semver", @@ -1492,6 +1494,7 @@ dependencies = [ "metrics", "metrics-exporter-prometheus", "once_cell", + "parking_lot 0.12.1", "prometheus-parse", "serde", "tokio", @@ -1562,6 +1565,7 @@ dependencies = [ "csv-async", "futures", "opendal", + "parking_lot 0.12.1", "pin-project-lite", "serde_json", "tempfile", @@ -1600,6 +1604,7 @@ dependencies = [ "common-meta-types", "common-tracing", "jwtk", + "parking_lot 0.12.1", "pretty_assertions", "serde", ] @@ -2180,6 +2185,7 @@ dependencies = [ "opensrv-clickhouse", "opensrv-mysql", "openssl", + "parking_lot 0.12.1", "paste", "petgraph", "poem", diff --git a/query/Cargo.toml b/query/Cargo.toml index 102f4658f34f8..075daec7b3fa8 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -99,6 +99,7 @@ opendal = { version = "0.10.0", features = ["retry", "compress"] } opensrv-clickhouse = "0.1.0" opensrv-mysql = "0.1.0" openssl = { version = "0.10.40", features = ["vendored"] } +parking_lot = "0.12.1" paste = "1.0.7" petgraph = "0.6.2" poem = { version = "1.3.31", features = ["rustls", "multipart", "compression"] } diff --git a/query/src/api/rpc/exchange/exchange_channel_receiver.rs b/query/src/api/rpc/exchange/exchange_channel_receiver.rs index ea5946d38037b..45b08069277c1 100644 --- a/query/src/api/rpc/exchange/exchange_channel_receiver.rs +++ b/query/src/api/rpc/exchange/exchange_channel_receiver.rs @@ -23,10 +23,10 @@ use common_base::base::tokio::sync::Notify; use common_base::base::tokio::task::JoinHandle; use common_base::base::Runtime; use common_base::base::TrySpawn; -use common_base::infallible::Mutex; use common_exception::ErrorCode; use common_exception::Result; use futures::future::Either; +use parking_lot::Mutex; use crate::api::rpc::exchange::exchange_channel::FragmentReceiver; use crate::api::rpc::packets::DataPacket; diff --git a/query/src/api/rpc/exchange/exchange_channel_sender.rs b/query/src/api/rpc/exchange/exchange_channel_sender.rs index 95c4a2b729a7b..58ecf65b6970f 100644 --- a/query/src/api/rpc/exchange/exchange_channel_sender.rs +++ b/query/src/api/rpc/exchange/exchange_channel_sender.rs @@ -24,11 +24,11 @@ use common_base::base::tokio::task::JoinHandle; use common_base::base::tokio::time::sleep; use common_base::base::Runtime; use common_base::base::TrySpawn; -use common_base::infallible::Mutex; use common_exception::ErrorCode; use common_exception::Result; use common_grpc::ConnectionFactory; use futures_util::future::Either; +use parking_lot::Mutex; use crate::api::rpc::exchange::exchange_channel::FragmentSender; use crate::api::rpc::packets::DataPacket; diff --git a/query/src/api/rpc/exchange/exchange_manager.rs b/query/src/api/rpc/exchange/exchange_manager.rs index 173587927b3a5..74fa11d9d7e93 100644 --- a/query/src/api/rpc/exchange/exchange_manager.rs +++ b/query/src/api/rpc/exchange/exchange_manager.rs @@ -23,7 +23,6 @@ use common_base::base::tokio::task::JoinHandle; use common_base::base::Runtime; use common_base::base::Thread; use common_base::base::TrySpawn; -use common_base::infallible::Mutex; use common_base::infallible::ReentrantMutex; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; @@ -31,6 +30,7 @@ use common_exception::Result; use common_planners::PlanNode; use futures::StreamExt; use futures_util::future::Either; +use parking_lot::Mutex; use tonic::Streaming; use crate::api::rpc::exchange::exchange_channel::FragmentReceiver; diff --git a/query/src/api/rpc/flight_dispatcher.rs b/query/src/api/rpc/flight_dispatcher.rs index ca1770055849b..954f088072e5a 100644 --- a/query/src/api/rpc/flight_dispatcher.rs +++ b/query/src/api/rpc/flight_dispatcher.rs @@ -20,7 +20,6 @@ use std::sync::Arc; use common_base::base::tokio::sync::mpsc::Sender; use common_base::base::tokio::sync::*; use common_base::base::TrySpawn; -use common_base::infallible::RwLock; use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; @@ -30,6 +29,7 @@ use common_planners::Expression; use common_tracing::tracing; use common_tracing::tracing::Instrument; use common_tracing::tracing::Span; +use parking_lot::RwLock; use tokio_stream::StreamExt; use crate::api::rpc::flight_scatter::FlightScatter; diff --git a/query/src/catalogs/default/catalog_context.rs b/query/src/catalogs/default/catalog_context.rs index fa180eb637a9e..0a2662a61c1be 100644 --- a/query/src/catalogs/default/catalog_context.rs +++ b/query/src/catalogs/default/catalog_context.rs @@ -14,9 +14,9 @@ use std::sync::Arc; -use common_base::infallible::RwLock; use common_datablocks::InMemoryData; use common_meta_store::MetaStore; +use parking_lot::RwLock; use crate::databases::DatabaseFactory; use crate::storages::StorageFactory; diff --git a/query/src/catalogs/default/table_memory_meta.rs b/query/src/catalogs/default/table_memory_meta.rs index eb8119d437c8e..89ff46034dac5 100644 --- a/query/src/catalogs/default/table_memory_meta.rs +++ b/query/src/catalogs/default/table_memory_meta.rs @@ -18,10 +18,10 @@ use std::sync::atomic::AtomicU64; use std::sync::atomic::Ordering; use std::sync::Arc; -use common_base::infallible::RwLock; use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::MetaId; +use parking_lot::RwLock; use crate::storages::Table; diff --git a/query/src/databases/database_context.rs b/query/src/databases/database_context.rs index ee73821d94cc7..c4ab4cc5fad18 100644 --- a/query/src/databases/database_context.rs +++ b/query/src/databases/database_context.rs @@ -14,9 +14,9 @@ use std::sync::Arc; -use common_base::infallible::RwLock; use common_datablocks::InMemoryData; use common_meta_api::SchemaApi; +use parking_lot::RwLock; /// Database Context. #[derive(Clone)] diff --git a/query/src/databases/database_factory.rs b/query/src/databases/database_factory.rs index f4553d85a4cdb..93d3b9e3e1839 100644 --- a/query/src/databases/database_factory.rs +++ b/query/src/databases/database_factory.rs @@ -16,10 +16,10 @@ use std::collections::HashMap; use std::sync::Arc; -use common_base::infallible::RwLock; use common_exception::ErrorCode; use common_exception::Result; use common_meta_app::schema::DatabaseInfo; +use parking_lot::RwLock; use crate::databases::default::DefaultDatabase; use crate::databases::github::GithubDatabase; diff --git a/query/src/interpreters/async_insert_queue.rs b/query/src/interpreters/async_insert_queue.rs index 3e503b781d0c2..a27911849e68a 100644 --- a/query/src/interpreters/async_insert_queue.rs +++ b/query/src/interpreters/async_insert_queue.rs @@ -24,13 +24,13 @@ use common_base::base::tokio::time::Duration; use common_base::base::tokio::time::Instant; use common_base::base::ProgressValues; use common_base::base::Runtime; -use common_base::infallible::Mutex; -use common_base::infallible::RwLock; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use common_planners::InsertPlan; use common_planners::SelectPlan; +use parking_lot::Mutex; +use parking_lot::RwLock; use super::InsertInterpreter; use super::SelectInterpreter; diff --git a/query/src/interpreters/async_insert_queue_v2.rs b/query/src/interpreters/async_insert_queue_v2.rs index 2ca813fbdc810..dcfe8b91b9263 100644 --- a/query/src/interpreters/async_insert_queue_v2.rs +++ b/query/src/interpreters/async_insert_queue_v2.rs @@ -24,11 +24,11 @@ use common_base::base::tokio::time::Duration; use common_base::base::tokio::time::Instant; use common_base::base::ProgressValues; use common_base::base::Runtime; -use common_base::infallible::Mutex; -use common_base::infallible::RwLock; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; +use parking_lot::Mutex; +use parking_lot::RwLock; use super::InsertInterpreterV2; use super::SelectInterpreterV2; @@ -43,8 +43,8 @@ use crate::sessions::SessionManager; use crate::sessions::SessionType; use crate::sessions::Settings; use crate::sql::plans::Insert; -use crate::sql::plans::Plan; use crate::sql::plans::InsertInputSource; +use crate::sql::plans::Plan; use crate::storages::memory::MemoryTableSink; #[derive(Clone)] @@ -256,11 +256,7 @@ impl AsyncInsertQueue { } } - pub async fn push( - self: Arc, - plan: Arc, - ctx: Arc, - ) -> Result<()> { + pub async fn push(self: Arc, plan: Arc, ctx: Arc) -> Result<()> { let self_arc = self.clone(); let plan = plan.clone(); let settings = ctx.get_changed_settings(); @@ -272,14 +268,12 @@ impl AsyncInsertQueue { s_expr, metadata, bind_context, - } => { - SelectInterpreterV2::try_create( - ctx.clone(), - *bind_context.clone(), - s_expr.clone(), - metadata.clone(), - ) - } + } => SelectInterpreterV2::try_create( + ctx.clone(), + *bind_context.clone(), + s_expr.clone(), + metadata.clone(), + ), _ => unreachable!(), }; @@ -294,10 +288,14 @@ impl AsyncInsertQueue { ); } pipeline.add_pipe(sink_pipeline_builder.finalize()); - + let query_need_abort = ctx.query_need_abort(); - let executor = - PipelineCompleteExecutor::try_create(self.runtime.clone(), query_need_abort, pipeline).unwrap(); + let executor = PipelineCompleteExecutor::try_create( + self.runtime.clone(), + query_need_abort, + pipeline, + ) + .unwrap(); executor.execute()?; drop(executor); let blocks = ctx.consume_precommit_blocks(); diff --git a/query/src/interpreters/interpreter_factory_interceptor.rs b/query/src/interpreters/interpreter_factory_interceptor.rs index f382bc8d5e511..066a99299bea0 100644 --- a/query/src/interpreters/interpreter_factory_interceptor.rs +++ b/query/src/interpreters/interpreter_factory_interceptor.rs @@ -15,12 +15,12 @@ use std::sync::Arc; use std::time::SystemTime; -use common_base::infallible::Mutex; use common_exception::Result; use common_planners::PlanNode; use common_streams::ErrorStream; use common_streams::ProgressStream; use common_streams::SendableDataBlockStream; +use parking_lot::Mutex; use crate::interpreters::access::ManagementModeAccess; use crate::interpreters::Interpreter; diff --git a/query/src/interpreters/interpreter_insert.rs b/query/src/interpreters/interpreter_insert.rs index 913c43f4f912d..3306384e91532 100644 --- a/query/src/interpreters/interpreter_insert.rs +++ b/query/src/interpreters/interpreter_insert.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use chrono_tz::Tz; use common_base::base::TrySpawn; -use common_base::infallible::Mutex; use common_datavalues::DataType; use common_exception::ErrorCode; use common_exception::Result; @@ -32,6 +31,7 @@ use common_planners::SelectPlan; use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; use futures::TryStreamExt; +use parking_lot::Mutex; use crate::interpreters::interpreter_insert_with_stream::InsertWithStream; use crate::interpreters::plan_schedulers::InsertWithPlan; diff --git a/query/src/interpreters/interpreter_insert_v2.rs b/query/src/interpreters/interpreter_insert_v2.rs index 6f4ab141d7f96..d84522e9b037e 100644 --- a/query/src/interpreters/interpreter_insert_v2.rs +++ b/query/src/interpreters/interpreter_insert_v2.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use chrono_tz::Tz; use common_base::base::TrySpawn; -use common_base::infallible::Mutex; use common_datavalues::DataType; use common_exception::ErrorCode; use common_exception::Result; @@ -25,6 +24,7 @@ use common_functions::scalars::CastFunction; use common_functions::scalars::FunctionContext; use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; +use parking_lot::Mutex; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterPtr; diff --git a/query/src/pipelines/new/executor/executor_condvar.rs b/query/src/pipelines/new/executor/executor_condvar.rs index cf45e3c2a12c3..4fd8823ef9e37 100644 --- a/query/src/pipelines/new/executor/executor_condvar.rs +++ b/query/src/pipelines/new/executor/executor_condvar.rs @@ -17,8 +17,8 @@ use std::sync::atomic::AtomicUsize; use std::sync::atomic::Ordering; use std::sync::Arc; -use common_base::infallible::Condvar; -use common_base::infallible::Mutex; +use parking_lot::Condvar; +use parking_lot::Mutex; struct WorkerCondvar { mutex: Mutex, @@ -29,7 +29,7 @@ impl WorkerCondvar { pub fn create() -> WorkerCondvar { WorkerCondvar { mutex: Mutex::new(false), - condvar: Condvar::create(), + condvar: Condvar::new(), } } } diff --git a/query/src/pipelines/new/executor/executor_tasks.rs b/query/src/pipelines/new/executor/executor_tasks.rs index 4c9b976ca312a..62211ec25e489 100644 --- a/query/src/pipelines/new/executor/executor_tasks.rs +++ b/query/src/pipelines/new/executor/executor_tasks.rs @@ -17,8 +17,8 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; -use common_base::infallible::Mutex; use common_exception::Result; +use parking_lot::Mutex; use petgraph::prelude::NodeIndex; use crate::pipelines::new::executor::executor_condvar::WorkersCondvar; diff --git a/query/src/pipelines/new/executor/pipeline_pushing_executor.rs b/query/src/pipelines/new/executor/pipeline_pushing_executor.rs index 3282f8dc0ee36..37f7dd3055af6 100644 --- a/query/src/pipelines/new/executor/pipeline_pushing_executor.rs +++ b/query/src/pipelines/new/executor/pipeline_pushing_executor.rs @@ -18,10 +18,10 @@ use std::sync::mpsc::Receiver; use std::sync::mpsc::SyncSender; use std::sync::Arc; -use common_base::infallible::Mutex; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; +use parking_lot::Mutex; use crate::pipelines::new::executor::PipelineExecutor; use crate::pipelines::new::processors::port::OutputPort; diff --git a/query/src/pipelines/new/processors/sources/blocks_source.rs b/query/src/pipelines/new/processors/sources/blocks_source.rs index ef7a17739c1c3..782c17b19f441 100644 --- a/query/src/pipelines/new/processors/sources/blocks_source.rs +++ b/query/src/pipelines/new/processors/sources/blocks_source.rs @@ -15,9 +15,9 @@ use std::collections::VecDeque; use std::sync::Arc; -use common_base::infallible::Mutex; use common_datablocks::DataBlock; use common_exception::Result; +use parking_lot::Mutex; use crate::pipelines::new::processors::port::OutputPort; use crate::pipelines::new::processors::processor::ProcessorPtr; diff --git a/query/src/pipelines/new/processors/transforms/hash_join/join_hash_table.rs b/query/src/pipelines/new/processors/transforms/hash_join/join_hash_table.rs index 9d7d9a731e411..cdbf613ce10a8 100644 --- a/query/src/pipelines/new/processors/transforms/hash_join/join_hash_table.rs +++ b/query/src/pipelines/new/processors/transforms/hash_join/join_hash_table.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use std::sync::Mutex; use common_arrow::arrow::bitmap::MutableBitmap; -use common_base::infallible::RwLock; use common_datablocks::DataBlock; use common_datablocks::HashMethod; use common_datablocks::HashMethodFixedKeys; @@ -41,6 +40,7 @@ use common_exception::ErrorCode; use common_exception::Result; use common_hashtable::HashMap; use common_hashtable::HashTableKeyable; +use parking_lot::RwLock; use primitive_types::U256; use primitive_types::U512; diff --git a/query/src/pipelines/new/processors/transforms/transform_create_sets.rs b/query/src/pipelines/new/processors/transforms/transform_create_sets.rs index a40761096cf5d..3a76cda615240 100644 --- a/query/src/pipelines/new/processors/transforms/transform_create_sets.rs +++ b/query/src/pipelines/new/processors/transforms/transform_create_sets.rs @@ -18,7 +18,6 @@ use std::sync::Arc; use common_base::base::tokio::task::JoinHandle; use common_base::base::Runtime; use common_base::base::TrySpawn; -use common_base::infallible::Mutex; use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_datavalues::DataType; @@ -28,6 +27,7 @@ use common_exception::Result; use common_planners::Expression; use common_planners::PlanNode; use common_planners::SelectPlan; +use parking_lot::Mutex; use crate::interpreters::Interpreter; use crate::interpreters::SelectInterpreter; diff --git a/query/src/pipelines/processors/processor_mixed.rs b/query/src/pipelines/processors/processor_mixed.rs index 23488ac2ece94..d79e558ac3808 100644 --- a/query/src/pipelines/processors/processor_mixed.rs +++ b/query/src/pipelines/processors/processor_mixed.rs @@ -20,12 +20,12 @@ use std::sync::Arc; use common_base::base::tokio::sync::mpsc; use common_base::base::TrySpawn; -use common_base::infallible::RwLock; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; use common_streams::SendableDataBlockStream; use common_tracing::tracing; +use parking_lot::RwLock; use tokio_stream::wrappers::ReceiverStream; use tokio_stream::StreamExt; diff --git a/query/src/pipelines/transforms/transform_create_sets.rs b/query/src/pipelines/transforms/transform_create_sets.rs index ac1cd1d693f72..9999c0e904a00 100644 --- a/query/src/pipelines/transforms/transform_create_sets.rs +++ b/query/src/pipelines/transforms/transform_create_sets.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use common_base::base::tokio::task::JoinHandle; use common_base::base::TrySpawn; -use common_base::infallible::Mutex; use common_datavalues::DataSchemaRef; use common_datavalues::DataValue; use common_exception::ErrorCode; @@ -33,6 +32,7 @@ use futures::future::Shared; use futures::Future; use futures::FutureExt; use futures::StreamExt; +use parking_lot::Mutex; use crate::pipelines::processors::EmptyProcessor; use crate::pipelines::processors::Pipeline; diff --git a/query/src/pipelines/transforms/transform_group_by_final.rs b/query/src/pipelines/transforms/transform_group_by_final.rs index 2ea80e959d54e..e41592a27bb86 100644 --- a/query/src/pipelines/transforms/transform_group_by_final.rs +++ b/query/src/pipelines/transforms/transform_group_by_final.rs @@ -19,7 +19,6 @@ use std::sync::Arc; use std::time::Instant; use bumpalo::Bump; -use common_base::infallible::RwLock; use common_datablocks::DataBlock; use common_datablocks::HashMethodKind; use common_datablocks::HashMethodSerializer; @@ -34,6 +33,7 @@ use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; use common_tracing::tracing; use futures::stream::StreamExt; +use parking_lot::RwLock; use crate::pipelines::processors::EmptyProcessor; use crate::pipelines::processors::Processor; diff --git a/query/src/servers/http/v1/query/expiring_map.rs b/query/src/servers/http/v1/query/expiring_map.rs index 1ab1ad4510479..bf6da139782d7 100644 --- a/query/src/servers/http/v1/query/expiring_map.rs +++ b/query/src/servers/http/v1/query/expiring_map.rs @@ -20,7 +20,7 @@ use std::time::Duration; use common_base::base::tokio::task; use common_base::base::tokio::time::sleep; -use common_base::infallible::RwLock; +use parking_lot::RwLock; use crate::servers::http::v1::query::expirable::Expirable; use crate::servers::http::v1::query::expirable::ExpiringState; diff --git a/query/src/servers/http/v1/query/http_query_manager.rs b/query/src/servers/http/v1/query/http_query_manager.rs index 40c73e32c1e22..4c885367d6577 100644 --- a/query/src/servers/http/v1/query/http_query_manager.rs +++ b/query/src/servers/http/v1/query/http_query_manager.rs @@ -19,9 +19,9 @@ use std::time::Duration; use common_base::base::tokio; use common_base::base::tokio::sync::RwLock; use common_base::base::tokio::time::sleep; -use common_base::infallible::Mutex; use common_exception::Result; use common_tracing::tracing; +use parking_lot::Mutex; use super::expiring_map::ExpiringMap; use super::HttpQueryContext; diff --git a/query/src/sessions/query_ctx.rs b/query/src/sessions/query_ctx.rs index 8330b7aace752..afdf36c3f1b97 100644 --- a/query/src/sessions/query_ctx.rs +++ b/query/src/sessions/query_ctx.rs @@ -27,8 +27,6 @@ use common_base::base::Progress; use common_base::base::ProgressValues; use common_base::base::Runtime; use common_base::base::TrySpawn; -use common_base::infallible::Mutex; -use common_base::infallible::RwLock; use common_contexts::DalContext; use common_contexts::DalMetrics; use common_datablocks::DataBlock; @@ -53,6 +51,8 @@ use common_users::RoleCacheMgr; use common_users::UserApiProvider; use futures::future::AbortHandle; use opendal::Operator; +use parking_lot::Mutex; +use parking_lot::RwLock; use crate::api::DataExchangeManager; use crate::auth::AuthMgr; diff --git a/query/src/sessions/query_ctx_shared.rs b/query/src/sessions/query_ctx_shared.rs index 534de3705f14e..9c4270f347363 100644 --- a/query/src/sessions/query_ctx_shared.rs +++ b/query/src/sessions/query_ctx_shared.rs @@ -22,8 +22,6 @@ use std::sync::Arc; use chrono_tz::Tz; use common_base::base::Progress; use common_base::base::Runtime; -use common_base::infallible::Mutex; -use common_base::infallible::RwLock; use common_contexts::DalContext; use common_exception::ErrorCode; use common_exception::Result; @@ -33,6 +31,8 @@ use common_planners::PlanNode; use common_users::RoleCacheMgr; use common_users::UserApiProvider; use futures::future::AbortHandle; +use parking_lot::Mutex; +use parking_lot::RwLock; use uuid::Uuid; use crate::auth::AuthMgr; diff --git a/query/src/sessions/session.rs b/query/src/sessions/session.rs index ca01004f1b963..87f2cf47036e7 100644 --- a/query/src/sessions/session.rs +++ b/query/src/sessions/session.rs @@ -16,7 +16,6 @@ use std::net::SocketAddr; use std::sync::atomic::AtomicUsize; use std::sync::Arc; -use common_base::infallible::RwLock; use common_base::mem_allocator::malloc_size; use common_exception::ErrorCode; use common_exception::Result; @@ -27,6 +26,7 @@ use common_meta_types::UserPrivilegeType; use common_users::RoleCacheMgr; use futures::channel::*; use opendal::Operator; +use parking_lot::RwLock; use crate::catalogs::CatalogManager; use crate::sessions::QueryContext; diff --git a/query/src/sessions/session_ctx.rs b/query/src/sessions/session_ctx.rs index 0e3a527df4652..c1cefe8bc46ef 100644 --- a/query/src/sessions/session_ctx.rs +++ b/query/src/sessions/session_ctx.rs @@ -17,11 +17,11 @@ use std::sync::atomic::AtomicBool; use std::sync::atomic::Ordering; use std::sync::Arc; -use common_base::infallible::RwLock; use common_exception::Result; use common_macros::MallocSizeOf; use common_meta_types::UserInfo; use futures::channel::oneshot::Sender; +use parking_lot::RwLock; use crate::sessions::QueryContextShared; use crate::Config; diff --git a/query/src/sessions/session_mgr.rs b/query/src/sessions/session_mgr.rs index 7bb3dfb841b15..94481385d1a41 100644 --- a/query/src/sessions/session_mgr.rs +++ b/query/src/sessions/session_mgr.rs @@ -23,7 +23,6 @@ use std::time::Duration; use common_base::base::tokio; use common_base::base::Runtime; use common_base::base::SignalStream; -use common_base::infallible::RwLock; use common_contexts::DalRuntime; use common_exception::ErrorCode; use common_exception::Result; @@ -37,6 +36,7 @@ use common_users::UserApiProvider; use futures::future::Either; use futures::StreamExt; use opendal::Operator; +use parking_lot::RwLock; use crate::api::DataExchangeManager; use crate::catalogs::CatalogManager; @@ -371,7 +371,7 @@ impl SessionManager { async fn destroy_idle_sessions(sessions: &Arc>>>) -> bool { // Read lock does not support reentrant - // https://github.com/Amanieu/parking_lot/blob/lock_api-0.4.4/lock_api/src/rwlock.rs#L422 + // https://github.com/Amanieu/parking_lot::/blob/lock_api-0.4.4/lock_api/src/rwlock.rs#L422 let active_sessions_read_guard = sessions.read(); // First try to kill the idle session diff --git a/query/src/sessions/session_settings.rs b/query/src/sessions/session_settings.rs index fd024267ecc8e..e36b775454352 100644 --- a/query/src/sessions/session_settings.rs +++ b/query/src/sessions/session_settings.rs @@ -18,11 +18,11 @@ use std::fmt::Debug; use std::fmt::Formatter; use std::sync::Arc; -use common_base::infallible::RwLock; use common_datavalues::prelude::*; use common_exception::ErrorCode; use common_exception::Result; use itertools::Itertools; +use parking_lot::RwLock; use crate::Config; diff --git a/query/src/sql/planner/metadata.rs b/query/src/sql/planner/metadata.rs index d835d36ed2cb1..56398e34e3f26 100644 --- a/query/src/sql/planner/metadata.rs +++ b/query/src/sql/planner/metadata.rs @@ -17,9 +17,9 @@ use std::sync::Arc; use common_ast::ast::Expr; use common_ast::ast::Literal; -use common_base::infallible::RwLock; use common_datavalues::prelude::*; use common_planners::ReadDataSourcePlan; +use parking_lot::RwLock; use crate::sql::common::IndexType; use crate::storages::Table; diff --git a/query/src/sql/planner/mod.rs b/query/src/sql/planner/mod.rs index 9e1deee22e376..da0859881a5a1 100644 --- a/query/src/sql/planner/mod.rs +++ b/query/src/sql/planner/mod.rs @@ -17,8 +17,8 @@ use std::sync::Arc; use common_ast::parser::parse_sql; use common_ast::parser::tokenize_sql; use common_ast::Backtrace; -use common_base::infallible::RwLock; use common_exception::Result; +use parking_lot::RwLock; pub use plans::ScalarExpr; use crate::sessions::QueryContext; diff --git a/query/src/storages/memory/memory_table.rs b/query/src/storages/memory/memory_table.rs index 1c87d04bdc579..c5e4564b5f990 100644 --- a/query/src/storages/memory/memory_table.rs +++ b/query/src/storages/memory/memory_table.rs @@ -18,8 +18,6 @@ use std::collections::HashSet; use std::collections::VecDeque; use std::sync::Arc; -use common_base::infallible::Mutex; -use common_base::infallible::RwLock; use common_datablocks::DataBlock; use common_datavalues::ColumnRef; use common_exception::Result; @@ -30,6 +28,8 @@ use common_planners::ReadDataSourcePlan; use common_planners::Statistics; use common_planners::TruncateTablePlan; use common_streams::SendableDataBlockStream; +use parking_lot::Mutex; +use parking_lot::RwLock; use crate::pipelines::new::processors::port::InputPort; use crate::pipelines::new::processors::port::OutputPort; diff --git a/query/src/storages/stage/stage_source.rs b/query/src/storages/stage/stage_source.rs index ccc953c3bd9ee..f0ae30e8ed3ac 100644 --- a/query/src/storages/stage/stage_source.rs +++ b/query/src/storages/stage/stage_source.rs @@ -15,7 +15,6 @@ use std::collections::VecDeque; use std::sync::Arc; -use common_base::infallible::Mutex; use common_datablocks::DataBlock; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; @@ -37,6 +36,7 @@ use opendal::io_util::CompressAlgorithm; use opendal::io_util::SeekableReader; use opendal::BytesReader; use opendal::Operator; +use parking_lot::Mutex; use crate::pipelines::new::processors::port::OutputPort; use crate::pipelines::new::processors::processor::ProcessorPtr; diff --git a/query/src/storages/stage/stage_table.rs b/query/src/storages/stage/stage_table.rs index 8998e6cdff340..859bbf8f05d95 100644 --- a/query/src/storages/stage/stage_table.rs +++ b/query/src/storages/stage/stage_table.rs @@ -17,7 +17,6 @@ use std::collections::VecDeque; use std::str::FromStr; use std::sync::Arc; -use common_base::infallible::Mutex; use common_datablocks::DataBlock; use common_exception::ErrorCode; use common_exception::Result; @@ -30,6 +29,7 @@ use common_planners::StageTableInfo; use common_planners::Statistics; use common_planners::TruncateTablePlan; use common_streams::SendableDataBlockStream; +use parking_lot::Mutex; use super::StageSource; use crate::pipelines::new::processors::port::OutputPort; diff --git a/query/src/storages/storage_context.rs b/query/src/storages/storage_context.rs index c87fdcbb2c18c..ab3995fae67e6 100644 --- a/query/src/storages/storage_context.rs +++ b/query/src/storages/storage_context.rs @@ -14,9 +14,9 @@ use std::sync::Arc; -use common_base::infallible::RwLock; use common_datablocks::InMemoryData; use common_meta_api::SchemaApi; +use parking_lot::RwLock; /// Storage Context. #[derive(Clone)] diff --git a/query/src/storages/storage_factory.rs b/query/src/storages/storage_factory.rs index 259b1b45a86e3..ae5586d01ea33 100644 --- a/query/src/storages/storage_factory.rs +++ b/query/src/storages/storage_factory.rs @@ -15,10 +15,10 @@ use std::collections::HashMap; use std::sync::Arc; -use common_base::infallible::RwLock; use common_exception::ErrorCode; use common_exception::Result; use common_meta_app::schema::TableInfo; +use parking_lot::RwLock; use super::random::RandomTable; use crate::storages::fuse::FuseTable; diff --git a/query/src/storages/system/query_log_table.rs b/query/src/storages/system/query_log_table.rs index c247eeaac7796..90df70a8dfefb 100644 --- a/query/src/storages/system/query_log_table.rs +++ b/query/src/storages/system/query_log_table.rs @@ -16,7 +16,6 @@ use std::any::Any; use std::collections::VecDeque; use std::sync::Arc; -use common_base::infallible::RwLock; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::Result; @@ -31,6 +30,7 @@ use common_planners::TruncateTablePlan; use common_streams::DataBlockStream; use common_streams::SendableDataBlockStream; use futures::StreamExt; +use parking_lot::RwLock; use crate::pipelines::new::processors::port::OutputPort; use crate::pipelines::new::processors::processor::ProcessorPtr; diff --git a/query/src/table_functions/table_function_factory.rs b/query/src/table_functions/table_function_factory.rs index 9ce2e115ff4be..57cd1518169ef 100644 --- a/query/src/table_functions/table_function_factory.rs +++ b/query/src/table_functions/table_function_factory.rs @@ -16,11 +16,11 @@ use std::collections::HashMap; use std::sync::Arc; -use common_base::infallible::RwLock; use common_exception::ErrorCode; use common_exception::Result; use common_meta_types::MetaId; use common_planners::Expression; +use parking_lot::RwLock; use crate::catalogs::SYS_TBL_FUC_ID_END; use crate::catalogs::SYS_TBL_FUNC_ID_BEGIN; diff --git a/query/tests/it/sql/optimizer/heuristic/mod.rs b/query/tests/it/sql/optimizer/heuristic/mod.rs index a4cfebfce81ba..fa0504291ef1c 100644 --- a/query/tests/it/sql/optimizer/heuristic/mod.rs +++ b/query/tests/it/sql/optimizer/heuristic/mod.rs @@ -22,7 +22,6 @@ use std::sync::Arc; use common_ast::parser::parse_sql; use common_ast::parser::tokenize_sql; use common_ast::Backtrace; -use common_base::infallible::RwLock; use common_exception::ErrorCode; use common_exception::Result; use databend_query::sessions::QueryContext; @@ -32,6 +31,7 @@ use databend_query::sql::optimizer::RuleList; use databend_query::sql::plans::Plan; use databend_query::sql::Binder; use databend_query::sql::Metadata; +use parking_lot::RwLock; pub(super) struct Suite { pub comment: String, diff --git a/query/tests/it/sql/planner/format/mod.rs b/query/tests/it/sql/planner/format/mod.rs index 87c93184157a8..5a9e57936f697 100644 --- a/query/tests/it/sql/planner/format/mod.rs +++ b/query/tests/it/sql/planner/format/mod.rs @@ -14,7 +14,6 @@ use std::sync::Arc; -use common_base::infallible::RwLock; use common_datavalues::BooleanType; use common_datavalues::DataSchemaRefExt; use common_datavalues::DataValue; @@ -35,6 +34,7 @@ use databend_query::sql::plans::PhysicalScan; use databend_query::sql::ColumnBinding; use databend_query::sql::Metadata; use databend_query::storages::Table; +use parking_lot::RwLock; struct DummyTable { table_info: TableInfo, diff --git a/query/tests/it/storages/fuse/io.rs b/query/tests/it/storages/fuse/io.rs index 7765fd51a6b99..dbdbb2515ed60 100644 --- a/query/tests/it/storages/fuse/io.rs +++ b/query/tests/it/storages/fuse/io.rs @@ -17,7 +17,6 @@ use std::fmt::Debug; use std::sync::Arc; use common_base::base::tokio; -use common_base::infallible::Mutex; use common_datablocks::DataBlock; use common_datavalues::prelude::*; use common_exception::ErrorCode; @@ -39,6 +38,7 @@ use opendal::Accessor; use opendal::BytesReader; use opendal::BytesWriter; use opendal::Operator; +use parking_lot::Mutex; use uuid::Uuid; use crate::tests::create_query_context; From 441ddba08e29191ba6e6054bb42a32a31cf47e55 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Mon, 11 Jul 2022 23:56:58 +0800 Subject: [PATCH 4/6] chore: make udeps happy --- Cargo.lock | 1 - common/metrics/Cargo.toml | 1 - 2 files changed, 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 175faccf0a1f2..603bc829043d5 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1488,7 +1488,6 @@ dependencies = [ name = "common-metrics" version = "0.1.0" dependencies = [ - "common-base", "common-exception", "common-tracing", "metrics", diff --git a/common/metrics/Cargo.toml b/common/metrics/Cargo.toml index 250030823fd63..dabf0768c5cd3 100644 --- a/common/metrics/Cargo.toml +++ b/common/metrics/Cargo.toml @@ -12,7 +12,6 @@ test = false [dependencies] # Workspace dependencies -common-base = { path = "../base" } common-exception = { path = "../exception" } common-tracing = { path = "../tracing" } From 9488647476a0307d76d24636fd93e2499dc873a6 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Tue, 12 Jul 2022 10:26:47 +0800 Subject: [PATCH 5/6] refactor: intro atomic_refcell to replace remutex --- Cargo.lock | 7 +++ common/base/src/infallible/mod.rs | 19 ------- common/base/src/infallible/remutex.rs | 41 -------------- common/base/src/infallible/remutex_guard.rs | 53 ------------------- common/base/src/lib.rs | 1 - query/Cargo.toml | 1 + .../src/api/rpc/exchange/exchange_manager.rs | 34 +++++++----- 7 files changed, 30 insertions(+), 126 deletions(-) delete mode 100644 common/base/src/infallible/mod.rs delete mode 100644 common/base/src/infallible/remutex.rs delete mode 100644 common/base/src/infallible/remutex_guard.rs diff --git a/Cargo.lock b/Cargo.lock index 603bc829043d5..902ad659e27b4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -319,6 +319,12 @@ dependencies = [ "crossbeam-utils", ] +[[package]] +name = "atomic_refcell" +version = "0.1.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "73b5e5f48b927f04e952dedc932f31995a65a0bf65ec971c74436e51bf6e970d" + [[package]] name = "atty" version = "0.2.14" @@ -2115,6 +2121,7 @@ dependencies = [ "async-recursion", "async-stream", "async-trait 0.1.56 (git+https://github.com/datafuse-extras/async-trait?rev=f0b0fd5)", + "atomic_refcell", "backoff", "backon", "base64 0.13.0", diff --git a/common/base/src/infallible/mod.rs b/common/base/src/infallible/mod.rs deleted file mode 100644 index a8b611b529688..0000000000000 --- a/common/base/src/infallible/mod.rs +++ /dev/null @@ -1,19 +0,0 @@ -// Copyright 2021 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -mod remutex; -mod remutex_guard; - -pub use remutex::ReentrantMutex; -pub use remutex_guard::ReentrantMutexGuard; diff --git a/common/base/src/infallible/remutex.rs b/common/base/src/infallible/remutex.rs deleted file mode 100644 index 899c2c092ef38..0000000000000 --- a/common/base/src/infallible/remutex.rs +++ /dev/null @@ -1,41 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::cell::UnsafeCell; - -use parking_lot::ReentrantMutex as ParkingReentrantMutex; - -use crate::infallible::ReentrantMutexGuard; - -/// A simple wrapper around the lock() function of a ReentrantMutex -#[derive(Debug)] -pub struct ReentrantMutex(ParkingReentrantMutex>); - -#[allow(suspicious_auto_trait_impls)] -unsafe impl Send for ReentrantMutex> where ParkingReentrantMutex: Send {} - -#[allow(suspicious_auto_trait_impls)] -unsafe impl Sync for ReentrantMutex> where ParkingReentrantMutex: Sync {} - -impl ReentrantMutex { - /// creates mutex - pub fn new(t: T) -> Self { - Self(ParkingReentrantMutex::new(UnsafeCell::new(t))) - } - - /// lock the mutex - pub fn lock(&self) -> ReentrantMutexGuard<'_, T> { - ReentrantMutexGuard::create(self.0.lock()) - } -} diff --git a/common/base/src/infallible/remutex_guard.rs b/common/base/src/infallible/remutex_guard.rs deleted file mode 100644 index 4c3c26a8da5be..0000000000000 --- a/common/base/src/infallible/remutex_guard.rs +++ /dev/null @@ -1,53 +0,0 @@ -// Copyright 2022 Datafuse Labs. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::cell::UnsafeCell; -use std::ops::Deref; -use std::ops::DerefMut; - -use parking_lot::ReentrantMutexGuard as ParkingReentrantMutexGuard; - -pub struct ReentrantMutexGuard<'a, T> { - inner: ParkingReentrantMutexGuard<'a, UnsafeCell>, -} - -#[allow(suspicious_auto_trait_impls)] -unsafe impl<'a, T> Send for ReentrantMutexGuard<'a, UnsafeCell> where ParkingReentrantMutexGuard<'a, T>: Send -{} - -#[allow(suspicious_auto_trait_impls)] -unsafe impl<'a, T> Sync for ReentrantMutexGuard<'a, UnsafeCell> where ParkingReentrantMutexGuard<'a, T>: Sync -{} - -impl<'a, T> ReentrantMutexGuard<'a, T> { - pub fn create( - inner: ParkingReentrantMutexGuard<'a, UnsafeCell>, - ) -> ReentrantMutexGuard<'a, T> { - ReentrantMutexGuard { inner } - } -} - -impl<'a, T> Deref for ReentrantMutexGuard<'a, T> { - type Target = T; - - fn deref(&self) -> &Self::Target { - unsafe { &*self.inner.deref().get() } - } -} - -impl<'a, T> DerefMut for ReentrantMutexGuard<'a, T> { - fn deref_mut(&mut self) -> &mut Self::Target { - unsafe { &mut *self.inner.deref().get() } - } -} diff --git a/common/base/src/lib.rs b/common/base/src/lib.rs index 54fc5af016879..be00d924ddd61 100644 --- a/common/base/src/lib.rs +++ b/common/base/src/lib.rs @@ -16,6 +16,5 @@ pub mod base; pub mod containers; -pub mod infallible; pub mod mem_allocator; pub mod rangemap; diff --git a/query/Cargo.toml b/query/Cargo.toml index 075daec7b3fa8..1544df65cdb88 100644 --- a/query/Cargo.toml +++ b/query/Cargo.toml @@ -69,6 +69,7 @@ async-channel = "1.6.1" async-compat = "0.2.1" async-recursion = "1.0.0" async-stream = "0.3.3" +atomic_refcell = "0.1.8" backoff = { version = "0.4.0", features = ["futures", "tokio"] } backon = "0.0.2" base64 = "0.13.0" diff --git a/query/src/api/rpc/exchange/exchange_manager.rs b/query/src/api/rpc/exchange/exchange_manager.rs index 74fa11d9d7e93..ce55dcccf059f 100644 --- a/query/src/api/rpc/exchange/exchange_manager.rs +++ b/query/src/api/rpc/exchange/exchange_manager.rs @@ -17,13 +17,13 @@ use std::collections::HashMap; use std::sync::Arc; use async_channel::Sender; +use atomic_refcell::AtomicRefCell; use common_arrow::arrow_format::flight::data::FlightData; use common_base::base::tokio::sync::Notify; use common_base::base::tokio::task::JoinHandle; use common_base::base::Runtime; use common_base::base::Thread; use common_base::base::TrySpawn; -use common_base::infallible::ReentrantMutex; use common_datavalues::DataSchemaRef; use common_exception::ErrorCode; use common_exception::Result; @@ -31,6 +31,7 @@ use common_planners::PlanNode; use futures::StreamExt; use futures_util::future::Either; use parking_lot::Mutex; +use parking_lot::ReentrantMutex; use tonic::Streaming; use crate::api::rpc::exchange::exchange_channel::FragmentReceiver; @@ -60,14 +61,14 @@ use crate::Config; pub struct DataExchangeManager { config: Config, - queries_coordinator: ReentrantMutex>, + queries_coordinator: ReentrantMutex>>, } impl DataExchangeManager { pub fn create(config: Config) -> Arc { Arc::new(DataExchangeManager { config, - queries_coordinator: ReentrantMutex::new(HashMap::new()), + queries_coordinator: ReentrantMutex::new(AtomicRefCell::new(HashMap::new())), }) } @@ -82,7 +83,8 @@ impl DataExchangeManager { } } - let mut queries_coordinator = self.queries_coordinator.lock(); + let queries_coordinator_lock = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &mut *queries_coordinator_lock.as_ptr() }; match queries_coordinator.get_mut(&packet.query_id) { None => Err(ErrorCode::LogicalError(format!( @@ -95,7 +97,8 @@ impl DataExchangeManager { // Execute query in background pub fn execute_partial_query(&self, query_id: &str) -> Result<()> { - let mut queries_coordinator = self.queries_coordinator.lock(); + let queries_coordinator_lock = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &mut *queries_coordinator_lock.as_ptr() }; match queries_coordinator.get_mut(query_id) { None => Err(ErrorCode::LogicalError(format!( @@ -112,7 +115,8 @@ impl DataExchangeManager { ctx: &Arc, packet: &QueryFragmentsPlanPacket, ) -> Result<()> { - let mut queries_coordinator = self.queries_coordinator.lock(); + let queries_coordinator_lock = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &mut *queries_coordinator_lock.as_ptr() }; // TODO: When the query is not executed for a long time after submission, we need to remove it match queries_coordinator.entry(packet.query_id.to_owned()) { @@ -142,7 +146,8 @@ impl DataExchangeManager { source: &str, stream: Streaming, ) -> Result> { - let mut queries_coordinator = self.queries_coordinator.lock(); + let queries_coordinator_lock = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &mut *queries_coordinator_lock.as_ptr() }; match queries_coordinator.get_mut(id) { None => Err(ErrorCode::LogicalError(format!( @@ -154,14 +159,16 @@ impl DataExchangeManager { } pub fn on_finished_query(&self, query_id: &str, may_error: &Option) { - let mut queries_coordinator = self.queries_coordinator.lock(); + let queries_coordinator_lock = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &mut *queries_coordinator_lock.as_ptr() }; if let Some(mut query_coordinator) = queries_coordinator.remove(query_id) { query_coordinator.on_finished(may_error); } } pub fn shutdown_query(&self, query_id: &str, cause: Option) -> Result<()> { - let mut queries_coordinator = self.queries_coordinator.lock(); + let queries_coordinator_lock = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &mut *queries_coordinator_lock.as_ptr() }; match queries_coordinator.get_mut(query_id) { None => Err(ErrorCode::LogicalError(format!( @@ -225,7 +232,8 @@ impl DataExchangeManager { ) -> Result<()> { for executor_packet in executor_packet { if executor_packet.executor == executor_packet.request_executor { - let mut queries_coordinator = self.queries_coordinator.lock(); + let queries_coordinator_lock = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &mut *queries_coordinator_lock.as_ptr() }; match queries_coordinator.entry(executor_packet.query_id.to_owned()) { Entry::Vacant(_) => Err(ErrorCode::LogicalError(format!( @@ -248,7 +256,8 @@ impl DataExchangeManager { id: usize, endpoint: &str, ) -> Result { - let queries_coordinator = self.queries_coordinator.lock(); + let queries_coordinator_lock = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &*queries_coordinator_lock.as_ptr() }; match queries_coordinator.get(query_id) { None => Err(ErrorCode::LogicalError("Query not exists.")), @@ -263,7 +272,8 @@ impl DataExchangeManager { schema: DataSchemaRef, pipeline: &mut NewPipeline, ) -> Result<()> { - let mut queries_coordinator = self.queries_coordinator.lock(); + let queries_coordinator_lock = self.queries_coordinator.lock(); + let queries_coordinator = unsafe { &mut *queries_coordinator_lock.as_ptr() }; match queries_coordinator.get_mut(&query_id) { None => Err(ErrorCode::LogicalError("Query not exists.")), From 111202e524d64899142d55e1276f4ea010eaf047 Mon Sep 17 00:00:00 2001 From: Chojan Shang Date: Tue, 12 Jul 2022 11:57:09 +0800 Subject: [PATCH 6/6] refactor: apply review comment --- query/src/interpreters/async_insert_queue_v2.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/query/src/interpreters/async_insert_queue_v2.rs b/query/src/interpreters/async_insert_queue_v2.rs index dcfe8b91b9263..9518f3dcd444d 100644 --- a/query/src/interpreters/async_insert_queue_v2.rs +++ b/query/src/interpreters/async_insert_queue_v2.rs @@ -294,8 +294,7 @@ impl AsyncInsertQueue { self.runtime.clone(), query_need_abort, pipeline, - ) - .unwrap(); + )?; executor.execute()?; drop(executor); let blocks = ctx.consume_precommit_blocks();