diff --git a/src/cmd/src/datanode.rs b/src/cmd/src/datanode.rs
index 811ed826ad49..be2aedf57ec8 100644
--- a/src/cmd/src/datanode.rs
+++ b/src/cmd/src/datanode.rs
@@ -59,10 +59,6 @@ impl Instance {
         }
     }
 
-    pub fn datanode_mut(&mut self) -> &mut Datanode {
-        &mut self.datanode
-    }
-
     pub fn datanode(&self) -> &Datanode {
         &self.datanode
     }
diff --git a/src/cmd/src/flownode.rs b/src/cmd/src/flownode.rs
index a9ad12bfbc02..b399bf37f70d 100644
--- a/src/cmd/src/flownode.rs
+++ b/src/cmd/src/flownode.rs
@@ -63,10 +63,6 @@ impl Instance {
         }
     }
 
-    pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
-        &mut self.flownode
-    }
-
     pub fn flownode(&self) -> &FlownodeInstance {
         &self.flownode
     }
diff --git a/src/common/meta/src/cache/table/table_route.rs b/src/common/meta/src/cache/table/table_route.rs
index 2383a1ea13d0..840e52f8ae1c 100644
--- a/src/common/meta/src/cache/table/table_route.rs
+++ b/src/common/meta/src/cache/table/table_route.rs
@@ -49,14 +49,6 @@ impl TableRoute {
             TableRoute::Logical(_) => None,
         }
     }
-
-    /// Returns [LogicalTableRouteValue] reference if it's [TableRoute::Logical]; Otherwise it returns [None].
-    pub fn as_logical_table_route_ref(&self) -> Option<&Arc<LogicalTableRouteValue>> {
-        match self {
-            TableRoute::Physical(_) => None,
-            TableRoute::Logical(table_route) => Some(table_route),
-        }
-    }
 }
 
 /// [TableRouteCache] caches the [TableId] to [TableRoute] mapping.
diff --git a/src/common/meta/src/key/table_route.rs b/src/common/meta/src/key/table_route.rs
index 96949d2b9fda..b5ebf0b4b1ec 100644
--- a/src/common/meta/src/key/table_route.rs
+++ b/src/common/meta/src/key/table_route.rs
@@ -290,28 +290,6 @@ impl TableRouteManager {
         }
     }
 
-    /// Returns the [`PhysicalTableRouteValue`] in the first level,
-    /// It won't follow the [`LogicalTableRouteValue`] to find the next level [`PhysicalTableRouteValue`].
-    ///
-    /// Returns an error if the first level value is not a [`PhysicalTableRouteValue`].
-    pub async fn try_get_physical_table_route(
-        &self,
-        table_id: TableId,
-    ) -> Result<Option<PhysicalTableRouteValue>> {
-        match self.storage.get(table_id).await? {
-            Some(route) => {
-                ensure!(
-                    route.is_physical(),
-                    UnexpectedLogicalRouteTableSnafu {
-                        err_msg: format!("{route:?} is a non-physical TableRouteValue.")
-                    }
-                );
-                Ok(Some(route.into_physical_table_route()))
-            }
-            None => Ok(None),
-        }
-    }
-
     /// Returns the [TableId] recursively.
     ///
     /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
@@ -569,37 +547,6 @@
             .transpose()
     }
 
-    /// Returns the physical `DeserializedValueWithBytes` recursively.
-    ///
-    /// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
-    /// - the physical table(`logical_or_physical_table_id`) does not exist
-    /// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
-    pub async fn get_physical_table_route_with_raw_bytes(
-        &self,
-        logical_or_physical_table_id: TableId,
-    ) -> Result<(TableId, DeserializedValueWithBytes<TableRouteValue>)> {
-        let table_route = self
-            .get_with_raw_bytes(logical_or_physical_table_id)
-            .await?
-            .context(TableRouteNotFoundSnafu {
-                table_id: logical_or_physical_table_id,
-            })?;
-
-        match table_route.get_inner_ref() {
-            TableRouteValue::Physical(_) => Ok((logical_or_physical_table_id, table_route)),
-            TableRouteValue::Logical(x) => {
-                let physical_table_id = x.physical_table_id();
-                let physical_table_route = self
-                    .get_with_raw_bytes(physical_table_id)
-                    .await?
-                    .context(TableRouteNotFoundSnafu {
-                        table_id: physical_table_id,
-                    })?;
-                Ok((physical_table_id, physical_table_route))
-            }
-        }
-    }
-
     /// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
     pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
         let mut table_routes = self.batch_get_inner(table_ids).await?;
diff --git a/src/common/meta/src/rpc/router.rs b/src/common/meta/src/rpc/router.rs
index dd7349ae8f79..0e700cc6daaf 100644
--- a/src/common/meta/src/rpc/router.rs
+++ b/src/common/meta/src/rpc/router.rs
@@ -89,39 +89,6 @@ pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<Re
         .collect::<HashMap<_, _>>()
 }
 
-/// Returns the HashMap<[RegionNumber], HashSet<u64>>
-pub fn convert_to_region_peer_map(
-    region_routes: &[RegionRoute],
-) -> HashMap<RegionNumber, HashSet<u64>> {
-    region_routes
-        .iter()
-        .map(|x| {
-            let set = x
-                .follower_peers
-                .iter()
-                .map(|p| p.id)
-                .chain(x.leader_peer.as_ref().map(|p| p.id))
-                .collect::<HashSet<_>>();
-
-            (x.region.id.region_number(), set)
-        })
-        .collect::<HashMap<_, _>>()
-}
-
-/// Returns the HashMap<[RegionNumber], [LeaderState]>;
-pub fn convert_to_region_leader_state_map(
-    region_routes: &[RegionRoute],
-) -> HashMap<RegionNumber, LeaderState> {
-    region_routes
-        .iter()
-        .filter_map(|x| {
-            x.leader_state
-                .as_ref()
-                .map(|state| (x.region.id.region_number(), *state))
-        })
-        .collect::<HashMap<_, _>>()
-}
-
 pub fn find_region_leader(
     region_routes: &[RegionRoute],
     region_number: RegionNumber,
@@ -147,19 +114,6 @@ pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Ve
         .collect()
 }
 
-pub fn extract_all_peers(region_routes: &[RegionRoute]) -> Vec<Peer> {
-    let mut peers = region_routes
-        .iter()
-        .flat_map(|x| x.leader_peer.iter().chain(x.follower_peers.iter()))
-        .collect::<HashSet<_>>()
-        .into_iter()
-        .cloned()
-        .collect::<Vec<_>>();
-    peers.sort_by_key(|x| x.id);
-
-    peers
-}
-
 impl TableRoute {
     pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
         let region_leaders = region_routes
diff --git a/src/common/recordbatch/src/lib.rs b/src/common/recordbatch/src/lib.rs
index 257b6f09732a..0281b457495e 100644
--- a/src/common/recordbatch/src/lib.rs
+++ b/src/common/recordbatch/src/lib.rs
@@ -26,7 +26,6 @@ use std::sync::Arc;
 
 use adapter::RecordBatchMetrics;
 use arc_swap::ArcSwapOption;
-use datafusion::physical_plan::memory::MemoryStream;
 pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
 use datatypes::arrow::compute::SortOptions;
 pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
@@ -170,19 +169,6 @@ impl RecordBatches {
             index: 0,
         })
     }
-
-    pub fn into_df_stream(self) -> DfSendableRecordBatchStream {
-        let df_record_batches = self
-            .batches
-            .into_iter()
-            .map(|batch| batch.into_df_record_batch())
-            .collect();
-        // unwrap safety: `MemoryStream::try_new` won't fail
-        Box::pin(
-            MemoryStream::try_new(df_record_batches, self.schema.arrow_schema().clone(), None)
-                .unwrap(),
-        )
-    }
 }
 
 impl IntoIterator for RecordBatches {
diff --git a/src/common/time/src/util.rs b/src/common/time/src/util.rs
index 19fe3bc9119e..ccb9e1bdd0a3 100644
--- a/src/common/time/src/util.rs
+++ b/src/common/time/src/util.rs
@@ -29,10 +29,6 @@ pub fn format_utc_datetime(utc: &NaiveDateTime, pattern: &str) -> String {
     }
 }
 
-pub fn system_datetime_to_utc(local: &NaiveDateTime) -> LocalResult<NaiveDateTime> {
-    datetime_to_utc(local, get_timezone(None))
-}
-
 /// Cast a [`NaiveDateTime`] with the given timezone.
 pub fn datetime_to_utc(
     datetime: &NaiveDateTime,
diff --git a/src/flow/src/adapter.rs b/src/flow/src/adapter.rs
index 80d03e27706b..7d9ae5e422d2 100644
--- a/src/flow/src/adapter.rs
+++ b/src/flow/src/adapter.rs
@@ -206,28 +206,6 @@ impl DiffRequest {
     }
 }
 
-/// iterate through the diff row and form continuous diff row with same diff type
-pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
-    let mut reqs = Vec::new();
-    for (row, ts, diff) in rows {
-        let last = reqs.last_mut();
-        match (last, diff) {
-            (Some(DiffRequest::Insert(rows)), 1) => {
-                rows.push((row, ts));
-            }
-            (Some(DiffRequest::Insert(_)), -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
-            (Some(DiffRequest::Delete(rows)), -1) => {
-                rows.push((row, ts));
-            }
-            (Some(DiffRequest::Delete(_)), 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
-            (None, 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
-            (None, -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
-            _ => {}
-        }
-    }
-    reqs
-}
-
 pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
     let mut reqs = Vec::new();
     for batch in batches {
diff --git a/src/flow/src/compute/render/src_sink.rs b/src/flow/src/compute/render/src_sink.rs
index 62e733420b3c..cc8cf01ff7f3 100644
--- a/src/flow/src/compute/render/src_sink.rs
+++ b/src/flow/src/compute/render/src_sink.rs
@@ -14,7 +14,7 @@
 
 //! Source and Sink for the dataflow
 
-use std::collections::{BTreeMap, VecDeque};
+use std::collections::BTreeMap;
 
 use common_telemetry::{debug, trace};
 use hydroflow::scheduled::graph_ext::GraphExt;
@@ -28,7 +28,7 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
 use crate::error::{Error, PlanSnafu};
 use crate::expr::error::InternalSnafu;
 use crate::expr::{Batch, EvalError};
-use crate::repr::{DiffRow, Row, BROADCAST_CAP};
+use crate::repr::{DiffRow, Row};
 
 #[allow(clippy::mutable_key_type)]
 impl Context<'_, '_> {
@@ -242,44 +242,4 @@ impl Context<'_, '_> {
             },
         );
     }
-
-    /// Render a sink which send updates to broadcast channel, have internal buffer in case broadcast channel is full
-    pub fn render_sink(&mut self, bundle: CollectionBundle, sender: broadcast::Sender<DiffRow>) {
-        let CollectionBundle {
-            collection,
-            arranged: _,
-        } = bundle;
-        let mut buf = VecDeque::with_capacity(1000);
-
-        let schd = self.compute_state.get_scheduler();
-        let inner_schd = schd.clone();
-        let now = self.compute_state.current_time_ref();
-
-        let sink = self
-            .df
-            .add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| {
-                let data = recv.take_inner();
-                buf.extend(data.into_iter().flat_map(|i| i.into_iter()));
-                if sender.len() >= BROADCAST_CAP {
-                    return;
-                } else {
-                    while let Some(row) = buf.pop_front() {
-                        // if the sender is full, stop sending
-                        if sender.len() >= BROADCAST_CAP {
-                            break;
-                        }
-                        // TODO(discord9): handling tokio broadcast error
-                        let _ = sender.send(row);
-                    }
-                }
-
-                // if buffer is not empty, schedule the next run at next tick
-                // so the buffer can be drained as soon as possible
-                if !buf.is_empty() {
-                    inner_schd.schedule_at(*now.borrow() + 1);
-                }
-            });
-
-        schd.set_cur_subgraph(sink);
-    }
 }
diff --git a/src/flow/src/compute/types.rs b/src/flow/src/compute/types.rs
index 00ed660a6ef0..e125a2d27261 100644
--- a/src/flow/src/compute/types.rs
+++ b/src/flow/src/compute/types.rs
@@ -82,22 +82,6 @@ impl Arranged {
             writer: self.writer.clone(),
         })
     }
-
-    /// Copy the full arrangement, including the future and the current updates.
-    ///
-    /// Internally `Rc-ed` so it's cheap to copy
-    pub fn try_copy_full(&self) -> Option<Self> {
-        self.arrangement
-            .clone_full_arrange()
-            .map(|arrangement| Arranged {
-                arrangement,
-                readers: self.readers.clone(),
-                writer: self.writer.clone(),
-            })
-    }
-    pub fn add_reader(&self, id: SubgraphId) {
-        self.readers.borrow_mut().push(id)
-    }
 }
 
 /// A bundle of the various ways a collection can be represented.
diff --git a/src/flow/src/expr/error.rs b/src/flow/src/expr/error.rs
index 4b69b3df235e..992d5c592125 100644
--- a/src/flow/src/expr/error.rs
+++ b/src/flow/src/expr/error.rs
@@ -21,11 +21,6 @@ use datafusion_common::DataFusionError;
 use datatypes::data_type::ConcreteDataType;
 use snafu::{Location, Snafu};
 
-fn is_send_sync() {
-    fn check<T: Send + Sync>() {}
-    check::<EvalError>();
-}
-
 /// EvalError is about errors happen on columnar evaluation
 ///
 /// TODO(discord9): add detailed location of column/operator(instead of code) to errors tp help identify related column
diff --git a/src/flow/src/expr/linear.rs b/src/flow/src/expr/linear.rs
index 8e220f7d86a2..373e467aba1b 100644
--- a/src/flow/src/expr/linear.rs
+++ b/src/flow/src/expr/linear.rs
@@ -359,14 +359,6 @@ impl MapFilterProject {
         )
     }
 
-    /// Convert the `MapFilterProject` into a staged evaluation plan.
-    ///
-    /// The main behavior is extract temporal predicates, which cannot be evaluated
-    /// using the standard machinery.
-    pub fn into_plan(self) -> Result<MfpPlan, Error> {
-        MfpPlan::create_from(self)
-    }
-
     /// Lists input columns whose values are used in outputs.
     ///
     /// It is entirely appropriate to determine the demand of an instance
@@ -602,26 +594,6 @@ impl SafeMfpPlan {
         }
     }
 
-    /// A version of `evaluate` which produces an iterator over `Datum`
-    /// as output.
-    ///
-    /// This version can be useful when one wants to capture the resulting
-    /// datums without packing and then unpacking a row.
-    #[inline(always)]
-    pub fn evaluate_iter<'a>(
-        &'a self,
-        datums: &'a mut Vec<Value>,
-    ) -> Result<Option<impl Iterator<Item = Value> + 'a>, EvalError> {
-        let passed_predicates = self.evaluate_inner(datums)?;
-        if !passed_predicates {
-            Ok(None)
-        } else {
-            Ok(Some(
-                self.mfp.projection.iter().map(move |i| datums[*i].clone()),
-            ))
-        }
-    }
-
     /// Populates `values` with `self.expressions` and tests `self.predicates`.
     ///
     /// This does not apply `self.projection`, which is up to the calling method.
diff --git a/src/flow/src/plan.rs b/src/flow/src/plan.rs
index dc86b984ed23..e1cf22e621ec 100644
--- a/src/flow/src/plan.rs
+++ b/src/flow/src/plan.rs
@@ -18,10 +18,8 @@
 mod join;
 mod reduce;
 
-use std::collections::BTreeSet;
-
 use crate::error::Error;
-use crate::expr::{GlobalId, Id, LocalId, MapFilterProject, SafeMfpPlan, TypedExpr};
+use crate::expr::{Id, LocalId, MapFilterProject, SafeMfpPlan, TypedExpr};
 use crate::plan::join::JoinPlan;
 pub(crate) use crate::plan::reduce::{AccumulablePlan, AggrWithIndex, KeyValPlan, ReducePlan};
 use crate::repr::{DiffRow, RelationDesc};
@@ -186,48 +184,6 @@ pub enum Plan {
     },
 }
 
-impl Plan {
-    /// Find all the used collection in the plan
-    pub fn find_used_collection(&self) -> BTreeSet<GlobalId> {
-        fn recur_find_use(plan: &Plan, used: &mut BTreeSet<GlobalId>) {
-            match plan {
-                Plan::Get { id } => {
-                    match id {
-                        Id::Local(_) => (),
-                        Id::Global(g) => {
-                            used.insert(*g);
-                        }
-                    };
-                }
-                Plan::Let { value, body, .. } => {
-                    recur_find_use(&value.plan, used);
-                    recur_find_use(&body.plan, used);
-                }
-                Plan::Mfp { input, .. } => {
-                    recur_find_use(&input.plan, used);
-                }
-                Plan::Reduce { input, .. } => {
-                    recur_find_use(&input.plan, used);
-                }
-                Plan::Join { inputs, .. } => {
-                    for input in inputs {
-                        recur_find_use(&input.plan, used);
-                    }
-                }
-                Plan::Union { inputs, .. } => {
-                    for input in inputs {
-                        recur_find_use(&input.plan, used);
-                    }
-                }
-                _ => {}
-            }
-        }
-        let mut ret = Default::default();
-        recur_find_use(self, &mut ret);
-        ret
-    }
-}
-
 impl Plan {
     pub fn with_types(self, schema: RelationDesc) -> TypedPlan {
         TypedPlan { schema, plan: self }
diff --git a/src/flow/src/repr/relation.rs b/src/flow/src/repr/relation.rs
index 54ad1c5e8ec4..d0fbb861eb24 100644
--- a/src/flow/src/repr/relation.rs
+++ b/src/flow/src/repr/relation.rs
@@ -46,14 +46,6 @@ impl Key {
         self.column_indices.push(col);
     }
 
-    /// Add columns to Key
-    pub fn add_cols<I>(&mut self, cols: I)
-    where
-        I: IntoIterator<Item = usize>,
-    {
-        self.column_indices.extend(cols);
-    }
-
     /// Remove a column from Key
     pub fn remove_col(&mut self, col: usize) {
         self.column_indices.retain(|&r| r != col);
diff --git a/src/meta-srv/src/metasrv.rs b/src/meta-srv/src/metasrv.rs
index da614ac9b943..c7dcd81e9f09 100644
--- a/src/meta-srv/src/metasrv.rs
+++ b/src/meta-srv/src/metasrv.rs
@@ -204,10 +204,6 @@ impl Context {
     pub fn reset_in_memory(&self) {
         self.in_memory.reset();
     }
-
-    pub fn reset_leader_cached_kv_backend(&self) {
-        self.leader_cached_kv_backend.reset();
-    }
 }
 
 /// The value of the leader. It is used to store the leader's address.
diff --git a/src/meta-srv/src/mocks.rs b/src/meta-srv/src/mocks.rs
index cf9144dc3900..9611fcdd13df 100644
--- a/src/meta-srv/src/mocks.rs
+++ b/src/meta-srv/src/mocks.rs
@@ -52,11 +52,6 @@ pub async fn mock_with_etcdstore(addr: &str) -> MockInfo {
     mock(Default::default(), kv_backend, None, None, None).await
 }
 
-pub async fn mock_with_memstore_and_selector(selector: SelectorRef) -> MockInfo {
-    let kv_backend = Arc::new(MemoryKvBackend::new());
-    mock(Default::default(), kv_backend, Some(selector), None, None).await
-}
-
 pub async fn mock(
     opts: MetasrvOptions,
     kv_backend: KvBackendRef,
diff --git a/src/meta-srv/src/procedure/region_migration.rs b/src/meta-srv/src/procedure/region_migration.rs
index 40df9401cb24..1baa0c04d4a1 100644
--- a/src/meta-srv/src/procedure/region_migration.rs
+++ b/src/meta-srv/src/procedure/region_migration.rs
@@ -364,12 +364,6 @@ impl Context {
         Ok(datanode_value.as_ref().unwrap())
    }
 
-    /// Removes the `table_info` of [VolatileContext], returns true if any.
-    pub fn remove_table_info_value(&mut self) -> bool {
-        let value = self.volatile_ctx.table_info.take();
-        value.is_some()
-    }
-
     /// Returns the [RegionId].
     pub fn region_id(&self) -> RegionId {
         self.persistent_ctx.region_id
diff --git a/src/script/src/python/ffi_types/copr.rs b/src/script/src/python/ffi_types/copr.rs
index 1a9a88466b59..e0037550a649 100644
--- a/src/script/src/python/ffi_types/copr.rs
+++ b/src/script/src/python/ffi_types/copr.rs
@@ -499,26 +499,6 @@ pub fn exec_parsed(
     }
 }
 
-/// execute script just like [`exec_coprocessor`] do,
-/// but instead of return a internal [`Error`] type,
-/// return a friendly String format of error
-///
-/// use `ln_offset` and `filename` to offset line number and mark file name in error prompt
-#[cfg(test)]
-#[allow(dead_code)]
-pub fn exec_copr_print(
-    script: &str,
-    rb: &Option<RecordBatch>,
-    ln_offset: usize,
-    filename: &str,
-    eval_ctx: &EvalContext,
-) -> StdResult<RecordBatch, String> {
-    let res = exec_coprocessor(script, rb, eval_ctx);
-    res.map_err(|e| {
-        crate::python::error::pretty_print_error_in_src(script, &e, ln_offset, filename)
-    })
-}
-
 #[cfg(test)]
 mod tests {
     use crate::python::ffi_types::copr::parse::parse_and_compile_copr;
diff --git a/src/session/src/lib.rs b/src/session/src/lib.rs
index f553fef58c42..c018d47ebced 100644
--- a/src/session/src/lib.rs
+++ b/src/session/src/lib.rs
@@ -97,10 +97,6 @@ impl Session {
         &self.conn_info
     }
 
-    pub fn mut_conn_info(&mut self) -> &mut ConnInfo {
-        &mut self.conn_info
-    }
-
     pub fn timezone(&self) -> Timezone {
         self.mutable_inner.read().unwrap().timezone.clone()
     }
diff --git a/src/sql/src/statements.rs b/src/sql/src/statements.rs
index 00196ed5313b..90db401cbaa6 100644
--- a/src/sql/src/statements.rs
+++ b/src/sql/src/statements.rs
@@ -34,10 +34,8 @@ pub mod truncate;
 use std::str::FromStr;
 
 use api::helper::ColumnDataTypeWrapper;
-use api::v1::add_column_location::LocationType;
-use api::v1::{AddColumnLocation as Location, SemanticType};
+use api::v1::SemanticType;
 use common_base::bytes::Bytes;
-use common_query::AddColumnLocation;
 use common_time::timezone::Timezone;
 use common_time::Timestamp;
 use datatypes::prelude::ConcreteDataType;
@@ -688,22 +686,6 @@ pub fn concrete_data_type_to_sql_data_type(data_type: &ConcreteDataType) -> Resu
     }
 }
 
-pub fn sql_location_to_grpc_add_column_location(
-    location: &Option<AddColumnLocation>,
-) -> Option<Location> {
-    match location {
-        Some(AddColumnLocation::First) => Some(Location {
-            location_type: LocationType::First.into(),
-            after_column_name: String::default(),
-        }),
-        Some(AddColumnLocation::After { column_name }) => Some(Location {
-            location_type: LocationType::After.into(),
-            after_column_name: column_name.to_string(),
-        }),
-        None => None,
-    }
-}
-
 #[cfg(test)]
 mod tests {
     use std::assert_matches::assert_matches;