Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: remove unused symbols #5193

Merged
merged 1 commit into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions src/cmd/src/datanode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,6 @@ impl Instance {
}
}

pub fn datanode_mut(&mut self) -> &mut Datanode {
&mut self.datanode
}

pub fn datanode(&self) -> &Datanode {
&self.datanode
}
Expand Down
4 changes: 0 additions & 4 deletions src/cmd/src/flownode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,10 +63,6 @@ impl Instance {
}
}

pub fn flownode_mut(&mut self) -> &mut FlownodeInstance {
&mut self.flownode
}

pub fn flownode(&self) -> &FlownodeInstance {
&self.flownode
}
Expand Down
8 changes: 0 additions & 8 deletions src/common/meta/src/cache/table/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,6 @@ impl TableRoute {
TableRoute::Logical(_) => None,
}
}

/// Returns [LogicalTableRouteValue] reference if it's [TableRoute::Logical]; Otherwise it returns [None].
pub fn as_logical_table_route_ref(&self) -> Option<&Arc<LogicalTableRouteValue>> {
match self {
TableRoute::Physical(_) => None,
TableRoute::Logical(table_route) => Some(table_route),
}
}
}

/// [TableRouteCache] caches the [TableId] to [TableRoute] mapping.
Expand Down
53 changes: 0 additions & 53 deletions src/common/meta/src/key/table_route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -290,28 +290,6 @@ impl TableRouteManager {
}
}

/// Returns the [`PhysicalTableRouteValue`] in the first level,
/// It won't follow the [`LogicalTableRouteValue`] to find the next level [`PhysicalTableRouteValue`].
///
/// Returns an error if the first level value is not a [`PhysicalTableRouteValue`].
pub async fn try_get_physical_table_route(
&self,
table_id: TableId,
) -> Result<Option<PhysicalTableRouteValue>> {
match self.storage.get(table_id).await? {
Some(route) => {
ensure!(
route.is_physical(),
UnexpectedLogicalRouteTableSnafu {
err_msg: format!("{route:?} is a non-physical TableRouteValue.")
}
);
Ok(Some(route.into_physical_table_route()))
}
None => Ok(None),
}
}

/// Returns the [TableId] recursively.
///
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
Expand Down Expand Up @@ -569,37 +547,6 @@ impl TableRouteStorage {
.transpose()
}

/// Returns the physical `DeserializedValueWithBytes<TableRouteValue>` recursively.
///
/// Returns a [TableRouteNotFound](crate::error::Error::TableRouteNotFound) Error if:
/// - the physical table(`logical_or_physical_table_id`) does not exist
/// - the corresponding physical table of the logical table(`logical_or_physical_table_id`) does not exist.
pub async fn get_physical_table_route_with_raw_bytes(
&self,
logical_or_physical_table_id: TableId,
) -> Result<(TableId, DeserializedValueWithBytes<TableRouteValue>)> {
let table_route = self
.get_with_raw_bytes(logical_or_physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: logical_or_physical_table_id,
})?;

match table_route.get_inner_ref() {
TableRouteValue::Physical(_) => Ok((logical_or_physical_table_id, table_route)),
TableRouteValue::Logical(x) => {
let physical_table_id = x.physical_table_id();
let physical_table_route = self
.get_with_raw_bytes(physical_table_id)
.await?
.context(TableRouteNotFoundSnafu {
table_id: physical_table_id,
})?;
Ok((physical_table_id, physical_table_route))
}
}
}

/// Returns batch of [`TableRouteValue`] that respects the order of `table_ids`.
pub async fn batch_get(&self, table_ids: &[TableId]) -> Result<Vec<Option<TableRouteValue>>> {
let mut table_routes = self.batch_get_inner(table_ids).await?;
Expand Down
46 changes: 0 additions & 46 deletions src/common/meta/src/rpc/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,39 +89,6 @@ pub fn convert_to_region_leader_map(region_routes: &[RegionRoute]) -> HashMap<Re
.collect::<HashMap<_, _>>()
}

/// Returns the HashMap<[RegionNumber], HashSet<DatanodeId>>
pub fn convert_to_region_peer_map(
region_routes: &[RegionRoute],
) -> HashMap<RegionNumber, HashSet<u64>> {
region_routes
.iter()
.map(|x| {
let set = x
.follower_peers
.iter()
.map(|p| p.id)
.chain(x.leader_peer.as_ref().map(|p| p.id))
.collect::<HashSet<_>>();

(x.region.id.region_number(), set)
})
.collect::<HashMap<_, _>>()
}

/// Returns the HashMap<[RegionNumber], [LeaderState]>;
pub fn convert_to_region_leader_state_map(
region_routes: &[RegionRoute],
) -> HashMap<RegionNumber, LeaderState> {
region_routes
.iter()
.filter_map(|x| {
x.leader_state
.as_ref()
.map(|state| (x.region.id.region_number(), *state))
})
.collect::<HashMap<_, _>>()
}

pub fn find_region_leader(
region_routes: &[RegionRoute],
region_number: RegionNumber,
Expand All @@ -147,19 +114,6 @@ pub fn find_leader_regions(region_routes: &[RegionRoute], datanode: &Peer) -> Ve
.collect()
}

pub fn extract_all_peers(region_routes: &[RegionRoute]) -> Vec<Peer> {
let mut peers = region_routes
.iter()
.flat_map(|x| x.leader_peer.iter().chain(x.follower_peers.iter()))
.collect::<HashSet<_>>()
.into_iter()
.cloned()
.collect::<Vec<_>>();
peers.sort_by_key(|x| x.id);

peers
}

impl TableRoute {
pub fn new(table: Table, region_routes: Vec<RegionRoute>) -> Self {
let region_leaders = region_routes
Expand Down
14 changes: 0 additions & 14 deletions src/common/recordbatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use std::sync::Arc;

use adapter::RecordBatchMetrics;
use arc_swap::ArcSwapOption;
use datafusion::physical_plan::memory::MemoryStream;
pub use datafusion::physical_plan::SendableRecordBatchStream as DfSendableRecordBatchStream;
use datatypes::arrow::compute::SortOptions;
pub use datatypes::arrow::record_batch::RecordBatch as DfRecordBatch;
Expand Down Expand Up @@ -170,19 +169,6 @@ impl RecordBatches {
index: 0,
})
}

pub fn into_df_stream(self) -> DfSendableRecordBatchStream {
let df_record_batches = self
.batches
.into_iter()
.map(|batch| batch.into_df_record_batch())
.collect();
// unwrap safety: `MemoryStream::try_new` won't fail
Box::pin(
MemoryStream::try_new(df_record_batches, self.schema.arrow_schema().clone(), None)
.unwrap(),
)
}
}

impl IntoIterator for RecordBatches {
Expand Down
4 changes: 0 additions & 4 deletions src/common/time/src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,6 @@ pub fn format_utc_datetime(utc: &NaiveDateTime, pattern: &str) -> String {
}
}

pub fn system_datetime_to_utc(local: &NaiveDateTime) -> LocalResult<NaiveDateTime> {
datetime_to_utc(local, get_timezone(None))
}

/// Cast a [`NaiveDateTime`] with the given timezone.
pub fn datetime_to_utc(
datetime: &NaiveDateTime,
Expand Down
22 changes: 0 additions & 22 deletions src/flow/src/adapter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -206,28 +206,6 @@ impl DiffRequest {
}
}

/// iterate through the diff row and form continuous diff row with same diff type
pub fn diff_row_to_request(rows: Vec<DiffRow>) -> Vec<DiffRequest> {
let mut reqs = Vec::new();
for (row, ts, diff) in rows {
let last = reqs.last_mut();
match (last, diff) {
(Some(DiffRequest::Insert(rows)), 1) => {
rows.push((row, ts));
}
(Some(DiffRequest::Insert(_)), -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
(Some(DiffRequest::Delete(rows)), -1) => {
rows.push((row, ts));
}
(Some(DiffRequest::Delete(_)), 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
(None, 1) => reqs.push(DiffRequest::Insert(vec![(row, ts)])),
(None, -1) => reqs.push(DiffRequest::Delete(vec![(row, ts)])),
_ => {}
}
}
reqs
}

pub fn batches_to_rows_req(batches: Vec<Batch>) -> Result<Vec<DiffRequest>, Error> {
let mut reqs = Vec::new();
for batch in batches {
Expand Down
44 changes: 2 additions & 42 deletions src/flow/src/compute/render/src_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! Source and Sink for the dataflow

use std::collections::{BTreeMap, VecDeque};
use std::collections::BTreeMap;

use common_telemetry::{debug, trace};
use hydroflow::scheduled::graph_ext::GraphExt;
Expand All @@ -28,7 +28,7 @@ use crate::compute::types::{Arranged, Collection, CollectionBundle, Toff};
use crate::error::{Error, PlanSnafu};
use crate::expr::error::InternalSnafu;
use crate::expr::{Batch, EvalError};
use crate::repr::{DiffRow, Row, BROADCAST_CAP};
use crate::repr::{DiffRow, Row};

#[allow(clippy::mutable_key_type)]
impl Context<'_, '_> {
Expand Down Expand Up @@ -242,44 +242,4 @@ impl Context<'_, '_> {
},
);
}

/// Render a sink which send updates to broadcast channel, have internal buffer in case broadcast channel is full
pub fn render_sink(&mut self, bundle: CollectionBundle, sender: broadcast::Sender<DiffRow>) {
let CollectionBundle {
collection,
arranged: _,
} = bundle;
let mut buf = VecDeque::with_capacity(1000);

let schd = self.compute_state.get_scheduler();
let inner_schd = schd.clone();
let now = self.compute_state.current_time_ref();

let sink = self
.df
.add_subgraph_sink("Sink", collection.into_inner(), move |_ctx, recv| {
let data = recv.take_inner();
buf.extend(data.into_iter().flat_map(|i| i.into_iter()));
if sender.len() >= BROADCAST_CAP {
return;
} else {
while let Some(row) = buf.pop_front() {
// if the sender is full, stop sending
if sender.len() >= BROADCAST_CAP {
break;
}
// TODO(discord9): handling tokio broadcast error
let _ = sender.send(row);
}
}

// if buffer is not empty, schedule the next run at next tick
// so the buffer can be drained as soon as possible
if !buf.is_empty() {
inner_schd.schedule_at(*now.borrow() + 1);
}
});

schd.set_cur_subgraph(sink);
}
}
16 changes: 0 additions & 16 deletions src/flow/src/compute/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,22 +82,6 @@ impl Arranged {
writer: self.writer.clone(),
})
}

/// Copy the full arrangement, including the future and the current updates.
///
/// Internally `Rc-ed` so it's cheap to copy
pub fn try_copy_full(&self) -> Option<Self> {
self.arrangement
.clone_full_arrange()
.map(|arrangement| Arranged {
arrangement,
readers: self.readers.clone(),
writer: self.writer.clone(),
})
}
pub fn add_reader(&self, id: SubgraphId) {
self.readers.borrow_mut().push(id)
}
}

/// A bundle of the various ways a collection can be represented.
Expand Down
5 changes: 0 additions & 5 deletions src/flow/src/expr/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,6 @@ use datafusion_common::DataFusionError;
use datatypes::data_type::ConcreteDataType;
use snafu::{Location, Snafu};

fn is_send_sync() {
fn check<T: Send + Sync>() {}
check::<EvalError>();
}

/// EvalError is about errors happen on columnar evaluation
///
/// TODO(discord9): add detailed location of column/operator(instead of code) to errors tp help identify related column
Expand Down
28 changes: 0 additions & 28 deletions src/flow/src/expr/linear.rs
Original file line number Diff line number Diff line change
Expand Up @@ -359,14 +359,6 @@ impl MapFilterProject {
)
}

/// Convert the `MapFilterProject` into a staged evaluation plan.
///
/// The main behavior is extract temporal predicates, which cannot be evaluated
/// using the standard machinery.
pub fn into_plan(self) -> Result<MfpPlan, Error> {
MfpPlan::create_from(self)
}

/// Lists input columns whose values are used in outputs.
///
/// It is entirely appropriate to determine the demand of an instance
Expand Down Expand Up @@ -602,26 +594,6 @@ impl SafeMfpPlan {
}
}

/// A version of `evaluate` which produces an iterator over `Datum`
/// as output.
///
/// This version can be useful when one wants to capture the resulting
/// datums without packing and then unpacking a row.
#[inline(always)]
pub fn evaluate_iter<'a>(
&'a self,
datums: &'a mut Vec<Value>,
) -> Result<Option<impl Iterator<Item = Value> + 'a>, EvalError> {
let passed_predicates = self.evaluate_inner(datums)?;
if !passed_predicates {
Ok(None)
} else {
Ok(Some(
self.mfp.projection.iter().map(move |i| datums[*i].clone()),
))
}
}

/// Populates `values` with `self.expressions` and tests `self.predicates`.
///
/// This does not apply `self.projection`, which is up to the calling method.
Expand Down
Loading
Loading