Skip to content

Commit

Permalink
fix some
Browse files Browse the repository at this point in the history
Signed-off-by: Bugen Zhao <[email protected]>
  • Loading branch information
BugenZhao committed Feb 23, 2024
1 parent 3bd9753 commit f7be887
Show file tree
Hide file tree
Showing 12 changed files with 33 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 5 additions & 5 deletions src/cmd/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::error::v2::AsReport as _;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
use risingwave_ctl::CliOpts as CtlOpts;
Expand Down Expand Up @@ -67,13 +68,12 @@ pub fn ctl(opts: CtlOpts) {
// Note: Use a simple current thread runtime for ctl.
// When there's a heavy workload, multiple thread runtime seems to respond slowly. May need
// further investigation.
tokio::runtime::Builder::new_current_thread()
if let Err(e) = tokio::runtime::Builder::new_current_thread()
.enable_all()
.build()
.unwrap()
.block_on(risingwave_ctl::start(opts))
.inspect_err(|e| {
eprintln!("{:#?}", e);
})
.unwrap();
{
eprintln!("Error: {:#?}", e.as_report());
}
}
1 change: 1 addition & 0 deletions src/cmd_all/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ workspace-hack = { path = "../workspace-hack" }
expect-test = "1"

[build-dependencies]
thiserror-ext = { workspace = true }
vergen = { version = "8", default-features = false, features = [
"build",
"git",
Expand Down
3 changes: 2 additions & 1 deletion src/cmd_all/build.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use thiserror_ext::AsReport;
use vergen::EmitBuilder;

fn main() {
if let Err(e) = EmitBuilder::builder().git_sha(true).fail_on_error().emit() {
// Leave the environment variable unset if error occurs.
println!("cargo:warning={}", e)
println!("cargo:warning={}", e.as_report())
}
}
6 changes: 3 additions & 3 deletions src/common/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -391,9 +391,9 @@ impl<'de> Deserialize<'de> for DefaultParallelism {
VirtualNode::COUNT
)))?
} else {
NonZeroUsize::new(i)
.context("default parallelism should be greater than 0")
.map_err(|e| serde::de::Error::custom(e.to_string()))?
NonZeroUsize::new(i).ok_or_else(|| {
serde::de::Error::custom("default parallelism should be greater than 0")
})?
})),
}
}
Expand Down
9 changes: 3 additions & 6 deletions src/connector/src/sink/nats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,9 @@ impl Sink for NatsSink {
"Nats sink only support append-only mode"
)));
}
let _client = self
.config
.common
.build_client()
.await
.context("validate nats sink error")?;
let _client = (self.config.common.build_client().await)
.context("validate nats sink error")
.map_err(SinkError::Nats)?;
Ok(())
}

Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/source/cdc/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

use std::str::FromStr;

use anyhow::anyhow;
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use futures_async_stream::try_stream;
use itertools::Itertools;
Expand Down Expand Up @@ -79,8 +79,8 @@ impl<T: CdcSourceTypeTrait> SplitReader for CdcSplitReader<T> {
if matches!(T::source_type(), CdcSourceType::Citus)
&& let Some(server_addr) = split.server_addr()
{
let host_addr = HostAddr::from_str(&server_addr)
.map_err(|err| anyhow!("invalid server address for cdc split. {}", err))?;
let host_addr =
HostAddr::from_str(&server_addr).context("invalid server address for cdc split")?;
properties.insert("hostname".to_string(), host_addr.host);
properties.insert("port".to_string(), host_addr.port.to_string());
// rewrite table name with suffix to capture all shards in the split
Expand Down Expand Up @@ -218,7 +218,7 @@ impl<T: CdcSourceTypeTrait> CommonSplitReader for CdcSplitReader<T> {
GLOBAL_ERROR_METRICS.cdc_source_error.report([
source_type.as_str_name().into(),
source_id.clone(),
e.to_string(),
e.to_report_string(),
]);
Err(e)?;
}
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/source/kafka/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::time::Duration;

use anyhow::{anyhow, Context as _};
use anyhow::{anyhow, Context};
use async_trait::async_trait;
use rdkafka::consumer::{BaseConsumer, Consumer};
use rdkafka::error::KafkaResult;
Expand Down Expand Up @@ -112,6 +112,7 @@ impl SplitEnumerator for KafkaSplitEnumerator {
self.broker_address
)
})?;

let watermarks = self.get_watermarks(topic_partitions.as_ref()).await?;
let mut start_offsets = self
.fetch_start_offset(topic_partitions.as_ref(), &watermarks)
Expand Down
3 changes: 3 additions & 0 deletions src/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,3 +23,6 @@

pub mod anyhow;
pub mod tonic;

// Re-export the `thiserror-ext` crate.
pub use thiserror_ext::*;
5 changes: 3 additions & 2 deletions src/risedevtool/src/bin/risedev-dev.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use risedev::{
RISEDEV_SESSION_NAME,
};
use tempfile::tempdir;
use thiserror_ext::AsReport;
use yaml_rust::YamlEmitter;

#[derive(Default)]
Expand Down Expand Up @@ -444,9 +445,9 @@ fn main() -> Result<()> {
}
Err(err) => {
println!(
"{} - Failed to start: {:?}", // with `Caused by`
"{} - Failed to start: {:#}", // pretty with `Caused by`
style("ERROR").red().bold(),
err,
err.as_report(),
);
println!();
println!(
Expand Down
5 changes: 3 additions & 2 deletions src/risedevtool/src/preflight_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use std::process::Command;

use anyhow::Result;
use console::style;
use thiserror_ext::AsReport;

fn preflight_check_proxy() -> Result<()> {
if env::var("http_proxy").is_ok()
Expand Down Expand Up @@ -72,7 +73,7 @@ pub fn preflight_check() -> Result<()> {
"[{}] {} - failed to run proxy preflight check: {}",
style("risedev-preflight-check").bold(),
style("WARN").yellow().bold(),
e
e.as_report()
);
}

Expand All @@ -81,7 +82,7 @@ pub fn preflight_check() -> Result<()> {
"[{}] {} - failed to run ulimit preflight check: {}",
style("risedev-preflight-check").bold(),
style("WARN").yellow().bold(),
e
e.as_report()
);
}

Expand Down
3 changes: 3 additions & 0 deletions src/tests/regress/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#![deny(rustdoc::broken_intra_doc_links)]
#![feature(path_file_prefix)]
#![feature(let_chains)]
#![feature(register_tool)]
#![register_tool(rw)]
#![allow(rw::format_error)]

mod opts;

Expand Down

0 comments on commit f7be887

Please sign in to comment.