Skip to content

Commit

Permalink
refactor some
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Nov 22, 2023
1 parent 8e88b8d commit b1c0b3e
Show file tree
Hide file tree
Showing 23 changed files with 139 additions and 138 deletions.
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions src/common/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ strum = "0.25"
strum_macros = "0.25"
sysinfo = { version = "0.29", default-features = false }
thiserror = "1"
thiserror-ext = { workspace = true }
tinyvec = { version = "1", features = ["rustc_1_55", "grab_spare_slice"] }
tokio = { version = "0.2", package = "madsim-tokio", features = [
"rt",
Expand Down
2 changes: 2 additions & 0 deletions src/common/common_service/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ prometheus = { version = "0.13" }
risingwave_common = { workspace = true }
risingwave_pb = { workspace = true }
risingwave_rpc_client = { workspace = true }
thiserror = "1"
thiserror-ext = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio", features = ["rt", "rt-multi-thread", "sync", "macros", "time", "signal"] }
tonic = { workspace = true }
tower = { version = "0.4", features = ["util", "load-shed"] }
Expand Down
1 change: 1 addition & 0 deletions src/common/common_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#![feature(lint_reasons)]
#![feature(impl_trait_in_assoc_type)]
#![feature(error_generic_member_access)]

pub mod metrics_manager;
pub mod observer_manager;
Expand Down
46 changes: 32 additions & 14 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@

use std::time::Duration;

use risingwave_common::bail;
use risingwave_common::error::Result;
use risingwave_pb::meta::subscribe_response::Info;
use risingwave_pb::meta::{SubscribeResponse, SubscribeType};
use risingwave_rpc_client::error::RpcError;
Expand Down Expand Up @@ -80,6 +78,25 @@ impl<S: ObserverState> ObserverManager<RpcNotificationClient, S> {
}
}

#[derive(thiserror::Error, Debug)]
pub enum ObserverError {
#[error("notification channel closed")]
ChannelClosed,

#[error(transparent)]
Rpc(
#[from]
#[backtrace]
RpcError,
),
}

impl From<tonic::Status> for ObserverError {
fn from(value: tonic::Status) -> Self {
Self::Rpc(value.into())
}
}

impl<T, S> ObserverManager<T, S>
where
T: NotificationClient,
Expand All @@ -97,24 +114,19 @@ where
}
}

async fn wait_init_notification(&mut self) -> Result<()> {
async fn wait_init_notification(&mut self) -> Result<(), ObserverError> {
let mut notification_vec = Vec::new();
let init_notification = loop {
// notification before init notification must be received successfully.
match self.rx.message().await {
Ok(Some(notification)) => {
match self.rx.message().await? {
Some(notification) => {
if !matches!(notification.info.as_ref().unwrap(), &Info::Snapshot(_)) {
notification_vec.push(notification);
} else {
break notification;
}
}
Ok(None) => {
bail!("notification channel from meta is closed");
}
Err(err) => {
bail!("receives meta's notification err: {:?}", err);
}
None => return Err(ObserverError::ChannelClosed),
}
};

Expand Down Expand Up @@ -231,7 +243,10 @@ impl<T: Send + 'static> Channel for Streaming<T> {
#[async_trait::async_trait]
pub trait NotificationClient: Send + Sync + 'static {
type Channel: Channel<Item = SubscribeResponse>;
async fn subscribe(&self, subscribe_type: SubscribeType) -> Result<Self::Channel>;
async fn subscribe(
&self,
subscribe_type: SubscribeType,
) -> Result<Self::Channel, ObserverError>;
}

pub struct RpcNotificationClient {
Expand All @@ -248,10 +263,13 @@ impl RpcNotificationClient {
impl NotificationClient for RpcNotificationClient {
type Channel = Streaming<SubscribeResponse>;

async fn subscribe(&self, subscribe_type: SubscribeType) -> Result<Self::Channel> {
async fn subscribe(
&self,
subscribe_type: SubscribeType,
) -> Result<Self::Channel, ObserverError> {
self.meta_client
.subscribe(subscribe_type)
.await
.map_err(RpcError::into)
.map_err(Into::into)
}
}
2 changes: 2 additions & 0 deletions src/common/heap_profiling/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ chrono = { version = "0.4", default-features = false, features = [
] }
parking_lot = "0.12"
risingwave_common = { workspace = true }
thiserror = "1"
thiserror-ext = { workspace = true }
tikv-jemalloc-ctl = { workspace = true }
tokio = { version = "0.2", package = "madsim-tokio" }
tracing = "0.1"
Expand Down
45 changes: 26 additions & 19 deletions src/common/heap_profiling/src/jeprof.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,24 @@

use std::path::Path;
use std::process::Command;
use std::result::Result;
use std::{env, fs};

use anyhow::anyhow;
use risingwave_common::error::Result;
#[derive(thiserror::Error, Debug, thiserror_ext::ContextInto)]
pub enum JeprofError {
#[error(transparent)]
IoError(#[from] std::io::Error),

pub async fn run(profile_path: String, collapsed_path: String) -> Result<()> {
#[error("jeprof exit with an error (stdout: {stdout}, stderr: {stderr}): {inner}")]
ExitError {
#[source]
inner: std::process::ExitStatusError,
stdout: String,
stderr: String,
},
}

pub async fn run(profile_path: String, collapsed_path: String) -> Result<(), JeprofError> {
let executable_path = env::current_exe()?;

let prof_cmd = move || {
Expand All @@ -29,20 +41,15 @@ pub async fn run(profile_path: String, collapsed_path: String) -> Result<()> {
.arg(Path::new(&profile_path))
.output()
};
match tokio::task::spawn_blocking(prof_cmd).await.unwrap() {
Ok(output) => {
if output.status.success() {
fs::write(Path::new(&collapsed_path), &output.stdout)?;
Ok(())
} else {
Err(anyhow!(
"jeprof exit with an error. stdout: {}, stderr: {}",
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr)
)
.into())
}
}
Err(e) => Err(e.into()),
}

let output = tokio::task::spawn_blocking(prof_cmd).await.unwrap()?;

output.status.exit_ok().into_exit_error(
String::from_utf8_lossy(&output.stdout),
String::from_utf8_lossy(&output.stderr),
)?;

fs::write(Path::new(&collapsed_path), &output.stdout)?;

Ok(())
}
2 changes: 2 additions & 0 deletions src/common/heap_profiling/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(exit_status_error)]

pub const MANUALLY_DUMP_SUFFIX: &str = "manual.heap";
pub const AUTO_DUMP_SUFFIX: &str = "auto.heap";
pub const COLLAPSED_SUFFIX: &str = "collapsed";
Expand Down
22 changes: 7 additions & 15 deletions src/common/src/array/proto_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -256,13 +256,12 @@ mod tests {
DecimalArray, DecimalArrayBuilder, I32Array, I32ArrayBuilder, TimeArray, TimeArrayBuilder,
TimestampArray, TimestampArrayBuilder, Utf8Array, Utf8ArrayBuilder,
};
use crate::error::Result;
use crate::types::{Date, Decimal, Time, Timestamp};

// Convert a column to protobuf, then convert it back to column, and ensures the two are
// identical.
#[test]
fn test_column_protobuf_conversion() -> Result<()> {
fn test_column_protobuf_conversion() {
let cardinality = 2048;
let mut builder = I32ArrayBuilder::new(cardinality);
for i in 0..cardinality {
Expand All @@ -283,11 +282,10 @@ mod tests {
assert!(x.is_none());
}
});
Ok(())
}

#[test]
fn test_bool_column_protobuf_conversion() -> Result<()> {
fn test_bool_column_protobuf_conversion() {
let cardinality = 2048;
let mut builder = BoolArrayBuilder::new(cardinality);
for i in 0..cardinality {
Expand All @@ -306,11 +304,10 @@ mod tests {
1 => assert_eq!(Some(true), x),
_ => assert_eq!(None, x),
});
Ok(())
}

#[test]
fn test_utf8_column_conversion() -> Result<()> {
fn test_utf8_column_conversion() {
let cardinality = 2048;
let mut builder = Utf8ArrayBuilder::new(cardinality);
for i in 0..cardinality {
Expand All @@ -330,11 +327,10 @@ mod tests {
assert!(x.is_none());
}
});
Ok(())
}

#[test]
fn test_decimal_protobuf_conversion() -> Result<()> {
fn test_decimal_protobuf_conversion() {
let cardinality = 2048;
let mut builder = DecimalArrayBuilder::new(cardinality);
for i in 0..cardinality {
Expand All @@ -355,11 +351,10 @@ mod tests {
assert!(x.is_none());
}
});
Ok(())
}

#[test]
fn test_date_protobuf_conversion() -> Result<()> {
fn test_date_protobuf_conversion() {
let cardinality = 2048;
let mut builder = DateArrayBuilder::new(cardinality);
for i in 0..cardinality {
Expand All @@ -380,11 +375,10 @@ mod tests {
assert!(x.is_none());
}
});
Ok(())
}

#[test]
fn test_time_protobuf_conversion() -> Result<()> {
fn test_time_protobuf_conversion() {
let cardinality = 2048;
let mut builder = TimeArrayBuilder::new(cardinality);
for i in 0..cardinality {
Expand All @@ -410,11 +404,10 @@ mod tests {
assert!(x.is_none());
}
});
Ok(())
}

#[test]
fn test_timestamp_protobuf_conversion() -> Result<()> {
fn test_timestamp_protobuf_conversion() {
let cardinality = 2048;
let mut builder = TimestampArrayBuilder::new(cardinality);
for i in 0..cardinality {
Expand All @@ -440,6 +433,5 @@ mod tests {
assert!(x.is_none());
}
});
Ok(())
}
}
6 changes: 3 additions & 3 deletions src/common/src/types/datetime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -318,7 +318,7 @@ impl ToText for Timestamp {
}

impl ToBinary for Date {
fn to_binary_with_type(&self, ty: &DataType) -> crate::error::Result<Option<Bytes>> {
fn to_binary_with_type(&self, ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
match ty {
super::DataType::Date => {
let mut output = BytesMut::new();
Expand All @@ -331,7 +331,7 @@ impl ToBinary for Date {
}

impl ToBinary for Time {
fn to_binary_with_type(&self, ty: &DataType) -> crate::error::Result<Option<Bytes>> {
fn to_binary_with_type(&self, ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
match ty {
super::DataType::Time => {
let mut output = BytesMut::new();
Expand All @@ -344,7 +344,7 @@ impl ToBinary for Time {
}

impl ToBinary for Timestamp {
fn to_binary_with_type(&self, ty: &DataType) -> crate::error::Result<Option<Bytes>> {
fn to_binary_with_type(&self, ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
match ty {
super::DataType::Timestamp => {
let mut output = BytesMut::new();
Expand Down
3 changes: 1 addition & 2 deletions src/common/src/types/decimal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ use super::to_binary::ToBinary;
use super::to_text::ToText;
use super::DataType;
use crate::array::ArrayResult;
use crate::error::Result as RwResult;
use crate::estimate_size::EstimateSize;
use crate::types::ordered_float::OrderedFloat;
use crate::types::Decimal::Normalized;
Expand Down Expand Up @@ -82,7 +81,7 @@ impl Decimal {
}

impl ToBinary for Decimal {
fn to_binary_with_type(&self, ty: &DataType) -> RwResult<Option<Bytes>> {
fn to_binary_with_type(&self, ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
match ty {
DataType::Decimal => {
let mut output = BytesMut::new();
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/types/interval.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1161,7 +1161,7 @@ impl<'a> FromSql<'a> for Interval {
}

impl ToBinary for Interval {
fn to_binary_with_type(&self, ty: &DataType) -> Result<Option<Bytes>> {
fn to_binary_with_type(&self, ty: &DataType) -> super::to_binary::Result<Option<Bytes>> {
match ty {
DataType::Interval => {
let mut output = BytesMut::new();
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/types/jsonb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl crate::types::to_binary::ToBinary for JsonbRef<'_> {
fn to_binary_with_type(
&self,
_ty: &crate::types::DataType,
) -> crate::error::Result<Option<bytes::Bytes>> {
) -> super::to_binary::Result<Option<bytes::Bytes>> {
Ok(Some(self.value_serialize().into()))
}
}
Expand Down
2 changes: 1 addition & 1 deletion src/common/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1047,7 +1047,7 @@ pub fn hash_datum(datum: impl ToDatumRef, state: &mut impl std::hash::Hasher) {
impl ScalarRefImpl<'_> {
/// Encode the scalar to postgresql binary format.
/// The encoder implements encoding using <https://docs.rs/postgres-types/0.2.3/postgres_types/trait.ToSql.html>
pub fn binary_format(&self, data_type: &DataType) -> RwResult<Bytes> {
pub fn binary_format(&self, data_type: &DataType) -> to_binary::Result<Bytes> {
self.to_binary_with_type(data_type).transpose().unwrap()
}

Expand Down
Loading

0 comments on commit b1c0b3e

Please sign in to comment.