Skip to content

Commit

Permalink
style: rename *Adaptor to *Adapter (#2914)
Browse files Browse the repository at this point in the history
* rename RecordBatchStreamAdaptor to RecordBatchStreamWrapper

Signed-off-by: Ruihang Xia <[email protected]>

* rename ServerSqlQueryHandlerAdaptor to ServerSqlQueryHandlerAdapter

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Dec 12, 2023
1 parent cf6bba0 commit 8a74bd3
Show file tree
Hide file tree
Showing 11 changed files with 46 additions and 46 deletions.
4 changes: 2 additions & 2 deletions src/catalog/src/information_schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::sync::{Arc, Weak};

use common_catalog::consts::INFORMATION_SCHEMA_NAME;
use common_error::ext::BoxedError;
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use datatypes::schema::SchemaRef;
use futures_util::StreamExt;
use snafu::ResultExt;
Expand Down Expand Up @@ -171,7 +171,7 @@ impl DataSource for InformationTableDataSource {
None => batch,
});

let stream = RecordBatchStreamAdaptor {
let stream = RecordBatchStreamWrapper {
schema: projected_schema,
stream: Box::pin(stream),
output_ordering: None,
Expand Down
4 changes: 2 additions & 2 deletions src/client/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ use common_error::ext::{BoxedError, ErrorExt};
use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_query::Output;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::RecordBatchStreamAdaptor;
use common_recordbatch::RecordBatchStreamWrapper;
use common_telemetry::logging;
use common_telemetry::tracing_context::W3cTrace;
use futures_util::StreamExt;
Expand Down Expand Up @@ -315,7 +315,7 @@ impl Database {
yield Ok(record_batch);
}
}));
let record_batch_stream = RecordBatchStreamAdaptor {
let record_batch_stream = RecordBatchStreamWrapper {
schema,
stream,
output_ordering: None,
Expand Down
4 changes: 2 additions & 2 deletions src/client/src/region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use common_grpc::flight::{FlightDecoder, FlightMessage};
use common_meta::datanode_manager::{AffectedRows, Datanode};
use common_meta::error::{self as meta_error, Result as MetaResult};
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_recordbatch::{RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::error;
use prost::Message;
use snafu::{location, Location, OptionExt, ResultExt};
Expand Down Expand Up @@ -136,7 +136,7 @@ impl RegionRequester {
yield Ok(record_batch);
}
}));
let record_batch_stream = RecordBatchStreamAdaptor {
let record_batch_stream = RecordBatchStreamWrapper {
schema,
stream,
output_ordering: None,
Expand Down
14 changes: 7 additions & 7 deletions src/common/recordbatch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,16 +202,16 @@ impl Stream for SimpleRecordBatchStream {
}

/// Adapt a [Stream] of [RecordBatch] to a [RecordBatchStream].
pub struct RecordBatchStreamAdaptor<S> {
pub struct RecordBatchStreamWrapper<S> {
pub schema: SchemaRef,
pub stream: S,
pub output_ordering: Option<Vec<OrderOption>>,
}

impl<S> RecordBatchStreamAdaptor<S> {
/// Creates a RecordBatchStreamAdaptor without output ordering requirement.
pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamAdaptor<S> {
RecordBatchStreamAdaptor {
impl<S> RecordBatchStreamWrapper<S> {
/// Creates a [RecordBatchStreamWrapper] without output ordering requirement.
pub fn new(schema: SchemaRef, stream: S) -> RecordBatchStreamWrapper<S> {
RecordBatchStreamWrapper {
schema,
stream,
output_ordering: None,
Expand All @@ -220,7 +220,7 @@ impl<S> RecordBatchStreamAdaptor<S> {
}

impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
for RecordBatchStreamAdaptor<S>
for RecordBatchStreamWrapper<S>
{
fn schema(&self) -> SchemaRef {
self.schema.clone()
Expand All @@ -231,7 +231,7 @@ impl<S: Stream<Item = Result<RecordBatch>> + Unpin> RecordBatchStream
}
}

impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamAdaptor<S> {
impl<S: Stream<Item = Result<RecordBatch>> + Unpin> Stream for RecordBatchStreamWrapper<S> {
type Item = Result<RecordBatch>;

fn poll_next(mut self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
Expand Down
14 changes: 7 additions & 7 deletions src/frontend/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,8 @@ use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::opentsdb::OpentsdbServer;
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::ServerSqlQueryHandlerAdaptor;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::ServerSqlQueryHandlerAdapter;
use servers::server::{Server, ServerHandler, ServerHandlers};
use snafu::ResultExt;

Expand Down Expand Up @@ -70,7 +70,7 @@ impl Services {
};
let grpc_server = GrpcServer::new(
Some(grpc_config),
Some(ServerGrpcQueryHandlerAdaptor::arc(instance.clone())),
Some(ServerGrpcQueryHandlerAdapter::arc(instance.clone())),
Some(instance.clone()),
None,
None,
Expand All @@ -88,8 +88,8 @@ impl Services {

let mut http_server_builder = HttpServerBuilder::new(http_options.clone());
let _ = http_server_builder
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(instance.clone()));
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(instance.clone()));

if let Some(user_provider) = user_provider.clone() {
let _ = http_server_builder.with_user_provider(user_provider);
Expand Down Expand Up @@ -137,7 +137,7 @@ impl Services {
let mysql_server = MysqlServer::create_server(
mysql_io_runtime,
Arc::new(MysqlSpawnRef::new(
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
ServerSqlQueryHandlerAdapter::arc(instance.clone()),
user_provider.clone(),
)),
Arc::new(MysqlSpawnConfig::new(
Expand Down Expand Up @@ -167,7 +167,7 @@ impl Services {
);

let pg_server = Box::new(PostgresServer::new(
ServerSqlQueryHandlerAdaptor::arc(instance.clone()),
ServerSqlQueryHandlerAdapter::arc(instance.clone()),
opts.tls.clone(),
pg_io_runtime,
user_provider.clone(),
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use std::time::{Duration, Instant};
use async_stream::try_stream;
use common_error::ext::BoxedError;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream};
use common_recordbatch::{RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream};
use common_telemetry::{debug, error};
use common_time::range::TimestampRange;
use snafu::ResultExt;
Expand Down Expand Up @@ -164,7 +164,7 @@ impl SeqScan {
// Update metrics.
READ_STAGE_ELAPSED.with_label_values(&["total"]).observe(metrics.scan_cost.as_secs_f64());
};
let stream = Box::pin(RecordBatchStreamAdaptor::new(
let stream = Box::pin(RecordBatchStreamWrapper::new(
self.mapper.output_schema(),
Box::pin(stream),
));
Expand Down
4 changes: 2 additions & 2 deletions src/query/src/dist_plan/merge_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common_query::physical_plan::TaskContext;
use common_recordbatch::adapter::DfRecordBatchStreamAdapter;
use common_recordbatch::error::ExternalSnafu;
use common_recordbatch::{
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamAdaptor, SendableRecordBatchStream,
DfSendableRecordBatchStream, RecordBatch, RecordBatchStreamWrapper, SendableRecordBatchStream,
};
use common_telemetry::tracing;
use common_telemetry::tracing_context::TracingContext;
Expand Down Expand Up @@ -217,7 +217,7 @@ impl MergeScanExec {
}
}));

Ok(Box::pin(RecordBatchStreamAdaptor {
Ok(Box::pin(RecordBatchStreamWrapper {
schema: self.schema.clone(),
stream,
output_ordering: None,
Expand Down
8 changes: 4 additions & 4 deletions src/servers/src/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -802,8 +802,8 @@ mod test {

use super::*;
use crate::error::Error;
use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdaptor};
use crate::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler};
use crate::query_handler::grpc::{GrpcQueryHandler, ServerGrpcQueryHandlerAdapter};
use crate::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};

struct DummyInstance {
_tx: mpsc::Sender<(String, Vec<u8>)>,
Expand Down Expand Up @@ -869,8 +869,8 @@ mod test {

fn make_test_app(tx: mpsc::Sender<(String, Vec<u8>)>) -> Router {
let instance = Arc::new(DummyInstance { _tx: tx });
let sql_instance = ServerSqlQueryHandlerAdaptor::arc(instance.clone());
let grpc_instance = ServerGrpcQueryHandlerAdaptor::arc(instance);
let sql_instance = ServerSqlQueryHandlerAdapter::arc(instance.clone());
let grpc_instance = ServerGrpcQueryHandlerAdapter::arc(instance);
let server = HttpServerBuilder::new(HttpOptions::default())
.with_sql_handler(sql_instance)
.with_grpc_handler(grpc_instance)
Expand Down
6 changes: 3 additions & 3 deletions src/servers/src/query_handler/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,16 +37,16 @@ pub trait GrpcQueryHandler {
) -> std::result::Result<Output, Self::Error>;
}

pub struct ServerGrpcQueryHandlerAdaptor<E>(GrpcQueryHandlerRef<E>);
pub struct ServerGrpcQueryHandlerAdapter<E>(GrpcQueryHandlerRef<E>);

impl<E> ServerGrpcQueryHandlerAdaptor<E> {
impl<E> ServerGrpcQueryHandlerAdapter<E> {
pub fn arc(handler: GrpcQueryHandlerRef<E>) -> Arc<Self> {
Arc::new(Self(handler))
}
}

#[async_trait]
impl<E> GrpcQueryHandler for ServerGrpcQueryHandlerAdaptor<E>
impl<E> GrpcQueryHandler for ServerGrpcQueryHandlerAdapter<E>
where
E: ErrorExt + Send + Sync + 'static,
{
Expand Down
6 changes: 3 additions & 3 deletions src/servers/src/query_handler/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,16 +64,16 @@ pub trait SqlQueryHandler {
) -> std::result::Result<bool, Self::Error>;
}

pub struct ServerSqlQueryHandlerAdaptor<E>(SqlQueryHandlerRef<E>);
pub struct ServerSqlQueryHandlerAdapter<E>(SqlQueryHandlerRef<E>);

impl<E> ServerSqlQueryHandlerAdaptor<E> {
impl<E> ServerSqlQueryHandlerAdapter<E> {
pub fn arc(handler: SqlQueryHandlerRef<E>) -> Arc<Self> {
Arc::new(Self(handler))
}
}

#[async_trait]
impl<E> SqlQueryHandler for ServerSqlQueryHandlerAdaptor<E>
impl<E> SqlQueryHandler for ServerSqlQueryHandlerAdapter<E>
where
E: ErrorExt + Send + Sync + 'static,
{
Expand Down
24 changes: 12 additions & 12 deletions tests-integration/src/test_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,8 +45,8 @@ use servers::http::{HttpOptions, HttpServerBuilder};
use servers::metrics_handler::MetricsHandler;
use servers::mysql::server::{MysqlServer, MysqlSpawnConfig, MysqlSpawnRef};
use servers::postgres::PostgresServer;
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdaptor;
use servers::query_handler::sql::{ServerSqlQueryHandlerAdaptor, SqlQueryHandler};
use servers::query_handler::grpc::ServerGrpcQueryHandlerAdapter;
use servers::query_handler::sql::{ServerSqlQueryHandlerAdapter, SqlQueryHandler};
use servers::server::Server;
use servers::Mode;
use session::context::QueryContext;
Expand Down Expand Up @@ -378,8 +378,8 @@ pub async fn setup_test_http_app(store_type: StorageType, name: &str) -> (Router
..Default::default()
};
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(
instance.instance.clone(),
))
.with_metrics_handler(MetricsHandler)
Expand Down Expand Up @@ -412,8 +412,8 @@ pub async fn setup_test_http_app_with_frontend_and_user_provider(
let mut http_server = HttpServerBuilder::new(http_opts);

http_server
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(instance.instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(instance.instance.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(
instance.instance.clone(),
))
.with_script_handler(instance.instance.clone())
Expand Down Expand Up @@ -449,8 +449,8 @@ pub async fn setup_test_prom_app_with_frontend(
};
let frontend_ref = instance.instance.clone();
let http_server = HttpServerBuilder::new(http_opts)
.with_sql_handler(ServerSqlQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdaptor::arc(frontend_ref.clone()))
.with_sql_handler(ServerSqlQueryHandlerAdapter::arc(frontend_ref.clone()))
.with_grpc_handler(ServerGrpcQueryHandlerAdapter::arc(frontend_ref.clone()))
.with_script_handler(frontend_ref.clone())
.with_prom_handler(frontend_ref.clone())
.with_prometheus_handler(frontend_ref)
Expand Down Expand Up @@ -493,14 +493,14 @@ pub async fn setup_grpc_server_with(

let fe_instance_ref = instance.instance.clone();
let flight_handler = Arc::new(GreptimeRequestHandler::new(
ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone()),
ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone()),
user_provider.clone(),
runtime.clone(),
));

let fe_grpc_server = Arc::new(GrpcServer::new(
grpc_config,
Some(ServerGrpcQueryHandlerAdaptor::arc(fe_instance_ref.clone())),
Some(ServerGrpcQueryHandlerAdapter::arc(fe_instance_ref.clone())),
Some(fe_instance_ref.clone()),
Some(flight_handler),
None,
Expand Down Expand Up @@ -563,7 +563,7 @@ pub async fn setup_mysql_server_with_user_provider(
let fe_mysql_server = Arc::new(MysqlServer::create_server(
runtime,
Arc::new(MysqlSpawnRef::new(
ServerSqlQueryHandlerAdaptor::arc(fe_instance_ref),
ServerSqlQueryHandlerAdapter::arc(fe_instance_ref),
user_provider,
)),
Arc::new(MysqlSpawnConfig::new(
Expand Down Expand Up @@ -615,7 +615,7 @@ pub async fn setup_pg_server_with_user_provider(
..Default::default()
};
let fe_pg_server = Arc::new(Box::new(PostgresServer::new(
ServerSqlQueryHandlerAdaptor::arc(fe_instance_ref),
ServerSqlQueryHandlerAdapter::arc(fe_instance_ref),
opts.tls.clone(),
runtime,
user_provider,
Expand Down

0 comments on commit 8a74bd3

Please sign in to comment.