Skip to content

Commit

Permalink
refactor: remove unused symbols (#5193)
Browse files Browse the repository at this point in the history
chore: remove unused symbols

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Dec 18, 2024
1 parent 218236c commit 548e198
Show file tree
Hide file tree
Showing 20 changed files with 4 additions and 357 deletions.
4 changes: 0 additions & 4 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ impl Instance {
}
}

pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}

pub fn datanode(&self) -> &Datanode {
&self.datanode
}
Expand Down
4 changes: 0 additions & 4 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ impl Instance {
}
}

pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}

pub fn flownode(&self) -> &FlownodeInstance {
&self.flownode
}
Expand Down
8 changes: 0 additions & 8 deletions src/common/meta/src/cache/table/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,6 @@ impl TableRoute {
TableRoute::Logical(_) => None,
}
}

/// Returns [LogicalTableRouteValue] reference if it's [TableRoute::Logical]; Otherwise it returns [None].
pub fn as_logical_table_route_ref(&self) -> Option<&Arc<LogicalTableRouteValue>> {
match self {
TableRoute::Physical(_) => None,
TableRoute::Logical(table_route) => Some(table_route),
}
}
}

/// [TableRouteCache] caches the [TableId] to [TableRoute] mapping.
Expand Down
53 changes: 0 additions & 53 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,28 +290,6 @@ impl TableRouteManager {
}
}

/// Returns the [`PhysicalTableRouteValue`] in the first level,
/// It won't follow the [`LogicalTableRouteValue`] to find the next level [`PhysicalTableRouteValue`].
///
/// Returns an error if the first level value is not a [`PhysicalTableRouteValue`].
pub async fn try_get_physical_table_route(
&self,
table_id: TableId,
) -> Result<Option<PhysicalTableRouteValue>> {
match self.storage.get(table_id).await? {
Some(route) => {
ensure!(
route.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{route:?} is a non-physical TableRouteValue.")
}
);
Ok(Some(route.into_physical_table_route()))
}
None => Ok(None),
}
}

/// Returns the [TableId] recursively.
///
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
Expand Down Expand Up @@ -569,37 +547,6 @@ impl TableRouteStorage {
.transpose()
}

/// Returns the physical `DeserializedValueWithBytes<TableRouteValue>` recursively.
///
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
/// - the physical table(`logical_or_physical_table_id`) does not exist
/// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
pub async fn get_physical_table_route_with_raw_bytes(
&self,
logical_or_physical_table_id: TableId,
) -> Result<(TableId, DeserializedValueWithBytes<TableRouteValue>)> {
let table_route = self
.get_with_raw_bytes(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?;

match table_route.get_inner_ref() {
TableRouteValue::Physical(_) => Ok((logical_or_physical_table_id, table_route)),
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route = self
.get_with_raw_bytes(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
Ok((physical_table_id, physical_table_route))
}
}
}

/// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
let mut table_routes = self.batch_get_inner(table_ids).await?;
Expand Down
46 changes: 0 additions & 46 deletions src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,39 +89,6 @@ pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<Re
.collect::<HashMap<_, _>>()
}

/// Returns the HashMap<[RegionNumber], HashSet<DatanodeId>>
pub fn convert_to_region_peer_map(
region_routes: &[RegionRoute],
) -> HashMap<RegionNumber, HashSet<u64>> {
region_routes
.iter()
.map(|x| {
let set = x
.follower_peers
.iter()
.map(|p| p.id)
.chain(x.leader_peer.as_ref().map(|p| p.id))
.collect::<HashSet<_>>();

(x.region.id.region_number(), set)
})
.collect::<HashMap<_, _>>()
}

/// Returns the HashMap<[RegionNumber], [LeaderState]>;
pub fn convert_to_region_leader_state_map(
region_routes: &[RegionRoute],
) -> HashMap<RegionNumber, LeaderState> {
region_routes
.iter()
.filter_map(|x| {
x.leader_state
.as_ref()
.map(|state| (x.region.id.region_number(), *state))
})
.collect::<HashMap<_, _>>()
}

pub fn find_region_leader(
region_routes: &[RegionRoute],
region_number: RegionNumber,
Expand All @@ -147,19 +114,6 @@ pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Ve
.collect()
}

pub fn extract_all_peers(region_routes: &[RegionRoute]) -> Vec<Peer> {
let mut peers = region_routes
.iter()
.flat_map(|x| x.leader_peer.iter().chain(x.follower_peers.iter()))
.collect::<HashSet<_>>()
.into_iter()
.cloned()
.collect::<Vec<_>>();
peers.sort_by_key(|x| x.id);

peers
}

impl TableRoute {
pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
let region_leaders = region_routes
Expand Down
14 changes: 0 additions & 14 deletions src/common/recordbatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use std::sync::Arc;

use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
use datafusion::physical_plan::memory::MemoryStream;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::compute::SortOptions;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
Expand Down Expand Up @@ -170,19 +169,6 @@ impl RecordBatches {
index: 0,
})
}

pub fn into_df_stream(self) -> DfSendableRecordBatchStream {
let df_record_batches = self
.batches
.into_iter()
.map(|batch| batch.into_df_record_batch())
.collect();
// unwrap safety: `MemoryStream::try_new` won't fail
Box::pin(
MemoryStream::try_new(df_record_batches, self.schema.arrow_schema().clone(), None)
.unwrap(),
)
}
}

impl IntoIterator for RecordBatches {
Expand Down
4 changes: 0 additions & 4 deletions src/common/time/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ pub fn format_utc_datetime(utc: &NaiveDateTime, pattern: &str) -> String {
}
}

pub fn system_datetime_to_utc(local: &NaiveDateTime) -> LocalResult<NaiveDateTime> {
datetime_to_utc(local, get_timezone(None))
}

/// Cast a [`NaiveDateTime`] with the given timezone.
pub fn datetime_to_utc(
datetime: &NaiveDateTime,
Expand Down
22 changes: 0 additions & 22 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,28 +206,6 @@ impl DiffRequest {
}
}

/// iterate through the diff row and form continuous diff row with same diff type
pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
let mut reqs = Vec::new();
for (row, ts, diff) in rows {
let last = reqs.last_mut();
match (last, diff) {
(Some(DiffRequest::Insert(rows)), 1) => {
rows.push((row, ts));
}
(Some(DiffRequest::Insert(_)), -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
(Some(DiffRequest::Delete(rows)), -1) => {
rows.push((row, ts));
}
(Some(DiffRequest::Delete(_)), 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
(None, 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
(None, -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
_ => {}
}
}
reqs
}

pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
let mut reqs = Vec::new();
for batch in batches {
Expand Down
44 changes: 2 additions & 42 deletions src/flow/src/compute/render/src_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! Source and Sink for the dataflow
use std::collections::{BTreeMap, VecDeque};
use std::collections::BTreeMap;

use common_telemetry::{debug, trace};
use hydroflow::scheduled::graph_ext::GraphExt;
Expand All @@ -28,7 +28,7 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::{Batch, EvalError};
use crate::repr::{DiffRow, Row, BROADCAST_CAP};
use crate::repr::{DiffRow, Row};

#[allow(clippy::mutable_key_type)]
impl Context<'_, '_> {
Expand Down Expand Up @@ -242,44 +242,4 @@ impl Context<'_, '_> {
},
);
}

/// Render a sink which send updates to broadcast channel, have internal buffer in case broadcast channel is full
pub fn render_sink(&mut self, bundle: CollectionBundle, sender: broadcast::Sender<DiffRow>) {
let CollectionBundle {
collection,
arranged: _,
} = bundle;
let mut buf = VecDeque::with_capacity(1000);

let schd = self.compute_state.get_scheduler();
let inner_schd = schd.clone();
let now = self.compute_state.current_time_ref();

let sink = self
.df
.add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| {
let data = recv.take_inner();
buf.extend(data.into_iter().flat_map(|i| i.into_iter()));
if sender.len() >= BROADCAST_CAP {
return;
} else {
while let Some(row) = buf.pop_front() {
// if the sender is full, stop sending
if sender.len() >= BROADCAST_CAP {
break;
}
// TODO(discord9): handling tokio broadcast error
let _ = sender.send(row);
}
}

// if buffer is not empty, schedule the next run at next tick
// so the buffer can be drained as soon as possible
if !buf.is_empty() {
inner_schd.schedule_at(*now.borrow() + 1);
}
});

schd.set_cur_subgraph(sink);
}
}
16 changes: 0 additions & 16 deletions src/flow/src/compute/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,6 @@ impl Arranged {
writer: self.writer.clone(),
})
}

/// Copy the full arrangement, including the future and the current updates.
///
/// Internally `Rc-ed` so it's cheap to copy
pub fn try_copy_full(&self) -> Option<Self> {
self.arrangement
.clone_full_arrange()
.map(|arrangement| Arranged {
arrangement,
readers: self.readers.clone(),
writer: self.writer.clone(),
})
}
pub fn add_reader(&self, id: SubgraphId) {
self.readers.borrow_mut().push(id)
}
}

/// A bundle of the various ways a collection can be represented.
Expand Down
5 changes: 0 additions & 5 deletions src/flow/src/expr/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ use datafusion_common::DataFusionError;
use datatypes::data_type::ConcreteDataType;
use snafu::{Location, Snafu};

fn is_send_sync() {
fn check<T: Send + Sync>() {}
check::<EvalError>();
}

/// EvalError is about errors happen on columnar evaluation
///
/// TODO(discord9): add detailed location of column/operator(instead of code) to errors tp help identify related column
Expand Down
28 changes: 0 additions & 28 deletions src/flow/src/expr/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,6 @@ impl MapFilterProject {
)
}

/// Convert the `MapFilterProject` into a staged evaluation plan.
///
/// The main behavior is extract temporal predicates, which cannot be evaluated
/// using the standard machinery.
pub fn into_plan(self) -> Result<MfpPlan, Error> {
MfpPlan::create_from(self)
}

/// Lists input columns whose values are used in outputs.
///
/// It is entirely appropriate to determine the demand of an instance
Expand Down Expand Up @@ -602,26 +594,6 @@ impl SafeMfpPlan {
}
}

/// A version of `evaluate` which produces an iterator over `Datum`
/// as output.
///
/// This version can be useful when one wants to capture the resulting
/// datums without packing and then unpacking a row.
#[inline(always)]
pub fn evaluate_iter<'a>(
&'a self,
datums: &'a mut Vec<Value>,
) -> Result<Option<impl Iterator<Item = Value> + 'a>, EvalError> {
let passed_predicates = self.evaluate_inner(datums)?;
if !passed_predicates {
Ok(None)
} else {
Ok(Some(
self.mfp.projection.iter().map(move |i| datums[*i].clone()),
))
}
}

/// Populates `values` with `self.expressions` and tests `self.predicates`.
///
/// This does not apply `self.projection`, which is up to the calling method.
Expand Down
Loading

0 comments on commit 548e198

Please sign in to comment.