chore: cargo +nightly fmt (#13162)
xxchan authored Oct 31, 2023
1 parent b1e189c commit c583e2c
Showing 103 changed files with 793 additions and 465 deletions.
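
This commit applies `cargo +nightly fmt` across the repository. Most of the churn comes from how nightly rustfmt lays out `let`-chains (the unstable `let_chains` feature the codebase enables): a condition such as `if let Some(x) = opt && x > 0 {` is split so that each `&&`-linked condition sits on its own line and the opening brace moves to a line of its own. A minimal sketch of the style change, with hypothetical `opt`, `threshold`, and `use_value` names:

    // Before: the whole chain and the opening brace share one line.
    if let Some(x) = opt && x > threshold {
        use_value(x);
    }

    // After `cargo +nightly fmt`: one condition per line, brace on its own line.
    if let Some(x) = opt
        && x > threshold
    {
        use_value(x);
    }

Because the commit is whitespace-only, something like `cargo +nightly fmt --all -- --check` at this revision should report no remaining diff (assuming the repository's rustfmt configuration).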
18 changes: 12 additions & 6 deletions src/batch/src/executor/project_set.rs
@@ -92,11 +92,15 @@ impl ProjectSetExecutor {
                 // for each column
                 for (item, value) in results.iter_mut().zip_eq_fast(&mut row[1..]) {
                     *value = match item {
-                        Either::Left(state) => if let Some((i, value)) = state.peek() && i == row_idx {
-                            valid = true;
-                            value
-                        } else {
-                            None
+                        Either::Left(state) => {
+                            if let Some((i, value)) = state.peek()
+                                && i == row_idx
+                            {
+                                valid = true;
+                                value
+                            } else {
+                                None
+                            }
                         }
                         Either::Right(array) => array.value_at(row_idx),
                     };
@@ -110,7 +114,9 @@ impl ProjectSetExecutor {
                 }
                 // move to the next row
                 for item in &mut results {
-                    if let Either::Left(state) = item && matches!(state.peek(), Some((i, _)) if i == row_idx) {
+                    if let Either::Left(state) = item
+                        && matches!(state.peek(), Some((i, _)) if i == row_idx)
+                    {
                         state.next().await?;
                     }
                 }
4 changes: 3 additions & 1 deletion src/batch/src/executor/top_n.rs
@@ -180,7 +180,9 @@ impl TopNHeap {
         let mut ties_with_peek = vec![];
         // pop all the ties with peek
         ties_with_peek.push(self.heap.pop().unwrap());
-        while let Some(e) = self.heap.peek() && e.encoded_row == peek.encoded_row {
+        while let Some(e) = self.heap.peek()
+            && e.encoded_row == peek.encoded_row
+        {
             ties_with_peek.push(self.heap.pop().unwrap());
         }
         self.heap.push(elem);
8 changes: 6 additions & 2 deletions src/common/src/cache.rs
@@ -757,7 +757,9 @@ impl<K: LruKey, T: LruValue> LruCache<K, T> {
             shard.release(handle)
         };
         // do not deallocate data with holding mutex.
-        if let Some((key, value)) = data && let Some(listener) = &self.listener {
+        if let Some((key, value)) = data
+            && let Some(listener) = &self.listener
+        {
             listener.on_release(key, value);
         }
     }
@@ -819,7 +821,9 @@ impl<K: LruKey, T: LruValue> LruCache<K, T> {
             shard.erase(hash, key)
         };
         // do not deallocate data with holding mutex.
-        if let Some((key, value)) = data && let Some(listener) = &self.listener {
+        if let Some((key, value)) = data
+            && let Some(listener) = &self.listener
+        {
             listener.on_release(key, value);
         }
     }
8 changes: 6 additions & 2 deletions src/common/src/types/interval.rs
@@ -1386,15 +1386,19 @@ impl Interval {
     fn parse_postgres(s: &str) -> Result<Self> {
         use DateTimeField::*;
         let mut tokens = parse_interval(s)?;
-        if tokens.len() % 2 != 0 && let Some(TimeStrToken::Num(_)) = tokens.last() {
+        if tokens.len() % 2 != 0
+            && let Some(TimeStrToken::Num(_)) = tokens.last()
+        {
             tokens.push(TimeStrToken::TimeUnit(DateTimeField::Second));
         }
         if tokens.len() % 2 != 0 {
             return Err(ErrorCode::InvalidInputSyntax(format!("Invalid interval {}.", &s)).into());
         }
         let mut token_iter = tokens.into_iter();
         let mut result = Interval::from_month_day_usec(0, 0, 0);
-        while let Some(num) = token_iter.next() && let Some(interval_unit) = token_iter.next() {
+        while let Some(num) = token_iter.next()
+            && let Some(interval_unit) = token_iter.next()
+        {
             match (num, interval_unit) {
                 (TimeStrToken::Num(num), TimeStrToken::TimeUnit(interval_unit)) => {
                     result = (|| match interval_unit {
6 changes: 4 additions & 2 deletions src/common/src/util/column_index_mapping.rs
@@ -67,8 +67,10 @@ impl ColIndexMapping {
             return false;
         }
         for (src, tar) in self.map.iter().enumerate() {
-            if let Some(tar_value) = tar && src == *tar_value {
-                continue
+            if let Some(tar_value) = tar
+                && src == *tar_value
+            {
+                continue;
             } else {
                 return false;
             }
6 changes: 4 additions & 2 deletions src/compute/tests/cdc_tests.rs
@@ -323,9 +323,11 @@ async fn consume_message_stream(mut stream: BoxedMessageStream) -> StreamResult<
                 println!("[mv] chunk: {:#?}", c);
             }
             Message::Barrier(b) => {
-                if let Some(m) = b.mutation && matches!(*m, Mutation::Stop(_)) {
+                if let Some(m) = b.mutation
+                    && matches!(*m, Mutation::Stop(_))
+                {
                     println!("encounter stop barrier");
-                    break
+                    break;
                 }
             }
         }
19 changes: 13 additions & 6 deletions src/connector/src/parser/mod.rs
@@ -169,12 +169,19 @@ impl MessageMeta<'_> {
         SourceColumnType::Offset => Datum::Some(self.offset.into()).into(),
         // Extract custom meta data per connector.
         SourceColumnType::Meta if let SourceMeta::Kafka(kafka_meta) = self.meta => {
-            assert_eq!(desc.name.as_str(), KAFKA_TIMESTAMP_COLUMN_NAME, "unexpected meta column name");
-            kafka_meta.timestamp.map(|ts| {
-                risingwave_common::cast::i64_to_timestamptz(ts)
-                    .unwrap()
-                    .to_scalar_value()
-            }).into()
+            assert_eq!(
+                desc.name.as_str(),
+                KAFKA_TIMESTAMP_COLUMN_NAME,
+                "unexpected meta column name"
+            );
+            kafka_meta
+                .timestamp
+                .map(|ts| {
+                    risingwave_common::cast::i64_to_timestamptz(ts)
+                        .unwrap()
+                        .to_scalar_value()
+                })
+                .into()
         }

         // For other cases, return `None`.
4 changes: 3 additions & 1 deletion src/connector/src/parser/mysql.rs
@@ -143,7 +143,9 @@ mod tests {
         });
         pin_mut!(row_stream);
         while let Some(row) = row_stream.next().await {
-            if let Ok(ro) = row && ro.is_some() {
+            if let Ok(ro) = row
+                && ro.is_some()
+            {
                 let owned_row = ro.unwrap();
                 let d = owned_row.datum_at(2);
                 if let Some(scalar) = d {
62 changes: 28 additions & 34 deletions src/connector/src/parser/unified/debezium.rs
@@ -145,42 +145,36 @@ pub fn extract_bson_id(id_type: &DataType, bson_doc: &serde_json::Value) -> anyh
         .get("_id")
         .ok_or_else(|| anyhow::format_err!("Debezuim Mongo requires document has a `_id` field"))?;
     let id: Datum = match id_type {
-        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
-        DataType::Varchar => match id_field {
-            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
-            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
-                obj["$oid"].as_str().to_owned().unwrap_or_default().into(),
-            )),
-            _ => anyhow::bail!(
-                "Can not convert bson {:?} to {:?}",
-                id_field, id_type
-            ),
-        },
-        DataType::Int32 => {
-            if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberInt") {
-                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
-                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
-            } else {
-                anyhow::bail!(
-                    "Can not convert bson {:?} to {:?}",
-                    id_field, id_type
-                )
-            }
-        }
-        DataType::Int64 => {
-            if let serde_json::Value::Object(ref obj) = id_field && obj.contains_key("$numberLong")
-            {
-                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
-                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
-            } else {
-                anyhow::bail!(
-                    "Can not convert bson {:?} to {:?}",
-                    id_field, id_type
-                )
-            }
-        }
+        DataType::Jsonb => ScalarImpl::Jsonb(id_field.clone().into()).into(),
+        DataType::Varchar => match id_field {
+            serde_json::Value::String(s) => Some(ScalarImpl::Utf8(s.clone().into())),
+            serde_json::Value::Object(obj) if obj.contains_key("$oid") => Some(ScalarImpl::Utf8(
+                obj["$oid"].as_str().to_owned().unwrap_or_default().into(),
+            )),
+            _ => anyhow::bail!("Can not convert bson {:?} to {:?}", id_field, id_type),
+        },
+        DataType::Int32 => {
+            if let serde_json::Value::Object(ref obj) = id_field
+                && obj.contains_key("$numberInt")
+            {
+                let int_str = obj["$numberInt"].as_str().unwrap_or_default();
+                Some(ScalarImpl::Int32(int_str.parse().unwrap_or_default()))
+            } else {
+                anyhow::bail!("Can not convert bson {:?} to {:?}", id_field, id_type)
+            }
+        }
+        DataType::Int64 => {
+            if let serde_json::Value::Object(ref obj) = id_field
+                && obj.contains_key("$numberLong")
+            {
+                let int_str = obj["$numberLong"].as_str().unwrap_or_default();
+                Some(ScalarImpl::Int64(int_str.parse().unwrap_or_default()))
+            } else {
+                anyhow::bail!("Can not convert bson {:?} to {:?}", id_field, id_type)
+            }
+        }
         _ => unreachable!("DebeziumMongoJsonParser::new must ensure _id column datatypes."),
     };
     Ok(id)
 }
 impl<A> MongoProjection<A> {
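
For context, the `extract_bson_id` arms above match MongoDB's extended-JSON `_id` encodings. A minimal, hypothetical sketch of those shapes using `serde_json` directly (nightly-only because of the `let_chains` feature; the document values are invented):

    #![feature(let_chains)]

    use serde_json::json;

    fn main() {
        // The three `_id` shapes handled above: ObjectId, int32, int64.
        let oid = json!({ "_id": { "$oid": "507f1f77bcf86cd799439011" } });
        let int32 = json!({ "_id": { "$numberInt": "42" } });
        let int64 = json!({ "_id": { "$numberLong": "9000000000" } });

        let id_field = int64.get("_id").unwrap();
        if let serde_json::Value::Object(obj) = id_field
            && obj.contains_key("$numberLong")
        {
            // Extended JSON stores the number as a string.
            let n: i64 = obj["$numberLong"].as_str().unwrap_or_default().parse().unwrap_or_default();
            assert_eq!(n, 9_000_000_000);
        }
        let _ = (oid, int32);
    }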
4 changes: 3 additions & 1 deletion src/connector/src/parser/unified/upsert.rs
@@ -114,7 +114,9 @@ where
         other => return other,
     };

-    if let Some(key_as_column_name) = &self.key_as_column_name && name == key_as_column_name {
+    if let Some(key_as_column_name) = &self.key_as_column_name
+        && name == key_as_column_name
+    {
         return self.access(&["key"], Some(type_expected));
     }

4 changes: 3 additions & 1 deletion src/connector/src/parser/upsert_parser.rs
@@ -102,7 +102,9 @@ impl UpsertParser {
             row_op = row_op.with_key(self.key_builder.generate_accessor(data).await?);
         }
         // Empty payload of kafka is Some(vec![])
-        if let Some(data) = payload && !data.is_empty() {
+        if let Some(data) = payload
+            && !data.is_empty()
+        {
             row_op = row_op.with_value(self.payload_builder.generate_accessor(data).await?);
             change_event_op = ChangeEventOperation::Upsert;
         }
11 changes: 8 additions & 3 deletions src/connector/src/sink/encoder/template.rs
@@ -50,9 +50,14 @@ impl TemplateEncoder {
             ));
         }
         for capture in re.captures_iter(format) {
-            if let Some(inner_content) = capture.get(1) && !set.contains(inner_content.as_str()){
-                return Err(SinkError::Redis(format!("Can't find field({:?}) in key_format or value_format",inner_content.as_str())))
-            }
+            if let Some(inner_content) = capture.get(1)
+                && !set.contains(inner_content.as_str())
+            {
+                return Err(SinkError::Redis(format!(
+                    "Can't find field({:?}) in key_format or value_format",
+                    inner_content.as_str()
+                )));
+            }
         }
         Ok(())
     }
4 changes: 3 additions & 1 deletion src/connector/src/sink/log_store.rs
@@ -400,7 +400,9 @@ impl<'a, F: TryFuture<Ok = ()> + Unpin + 'static> DeliveryFutureManagerAddFuture

     pub async fn await_one_delivery(&mut self) -> Result<(), F::Error> {
         for (_, item) in &mut self.0.items {
-            if let DeliveryFutureManagerItem::Chunk {futures, ..} = item && let Some(mut delivery_future) = futures.pop_front() {
+            if let DeliveryFutureManagerItem::Chunk { futures, .. } = item
+                && let Some(mut delivery_future) = futures.pop_front()
+            {
                 self.0.future_count -= 1;
                 return poll_fn(|cx| delivery_future.try_poll_unpin(cx)).await;
             } else {
4 changes: 3 additions & 1 deletion src/connector/src/source/external.rs
@@ -286,7 +286,9 @@ impl ExternalTableReader for MySqlExternalTableReader {

 impl MySqlExternalTableReader {
     pub fn new(properties: HashMap<String, String>, rw_schema: Schema) -> ConnectorResult<Self> {
-        if let Some(field) = rw_schema.fields.last() && field.name.as_str() != OFFSET_COLUMN_NAME {
+        if let Some(field) = rw_schema.fields.last()
+            && field.name.as_str() != OFFSET_COLUMN_NAME
+        {
             return Err(ConnectorError::Config(anyhow!(
                 "last column of schema must be `_rw_offset`"
             )));
4 changes: 3 additions & 1 deletion src/connector/src/source/google_pubsub/source/reader.rs
@@ -96,7 +96,9 @@ impl CommonSplitReader for PubsubSplitReader {
             yield chunk;

             // Stop if we've approached the stop_offset
-            if let Some(stop_offset) = self.stop_offset && latest_offset >= stop_offset {
+            if let Some(stop_offset) = self.stop_offset
+                && latest_offset >= stop_offset
+            {
                 return Ok(());
             }
         }
8 changes: 6 additions & 2 deletions src/connector/src/source/kafka/private_link.rs
@@ -120,7 +120,9 @@ impl PrivateLinkConsumerContext {
 impl ClientContext for PrivateLinkConsumerContext {
     /// this func serves as a callback when `poll` is completed.
     fn stats(&self, statistics: Statistics) {
-        if let Some(metrics) = &self.metrics && let Some(id) = &self.identifier {
+        if let Some(metrics) = &self.metrics
+            && let Some(id) = &self.identifier
+        {
             metrics.report(id.as_str(), &statistics);
         }
     }
@@ -160,7 +162,9 @@ impl PrivateLinkProducerContext {

 impl ClientContext for PrivateLinkProducerContext {
     fn stats(&self, statistics: Statistics) {
-        if let Some(metrics) = &self.metrics && let Some(id) = &self.identifier {
+        if let Some(metrics) = &self.metrics
+            && let Some(id) = &self.identifier
+        {
             metrics.report(id.as_str(), &statistics);
         }
     }
4 changes: 3 additions & 1 deletion src/ctl/src/cmd_impl/meta/reschedule.rs
@@ -273,7 +273,9 @@ pub async fn unregister_workers(
         .ok()
         .or_else(|| worker_index_by_host.get(&worker).cloned());

-    if let Some(worker_id) = worker_id && worker_ids.contains(&worker_id) {
+    if let Some(worker_id) = worker_id
+        && worker_ids.contains(&worker_id)
+    {
         if !target_worker_ids.insert(worker_id) {
             println!("Warn: {} and {} are the same worker", worker, worker_id);
         }
4 changes: 3 additions & 1 deletion src/ctl/src/cmd_impl/meta/serving.rs
@@ -82,7 +82,9 @@ pub async fn list_serving_fragment_mappings(context: &CtlContext) -> anyhow::Res
                 )
                 .into(),
             );
-            if let Some(w) = worker && let Some(addr) = w.host.as_ref() {
+            if let Some(w) = worker
+                && let Some(addr) = w.host.as_ref()
+            {
                 row.add_cell(format!("id: {}; {}:{}", w.id, addr.host, addr.port).into());
             } else {
                 row.add_cell("".into());
4 changes: 3 additions & 1 deletion src/ctl/src/cmd_impl/scale/resize.rs
@@ -393,7 +393,9 @@ pub async fn update_schedulability(
         .ok()
         .or_else(|| worker_index_by_host.get(&worker).cloned());

-    if let Some(worker_id) = worker_id && worker_ids.contains(&worker_id){
+    if let Some(worker_id) = worker_id
+        && worker_ids.contains(&worker_id)
+    {
         if !target_worker_ids.insert(worker_id) {
             println!("Warn: {} and {} are the same worker", worker, worker_id);
         }
4 changes: 3 additions & 1 deletion src/expr/core/src/expr/build.rs
@@ -349,7 +349,9 @@ pub(crate) fn lexer(input: &str) -> Vec<Token> {
             ':' => Token::Colon,
             '$' => {
                 let mut number = String::new();
-                while let Some(c) = chars.peek() && c.is_ascii_digit() {
+                while let Some(c) = chars.peek()
+                    && c.is_ascii_digit()
+                {
                     number.push(chars.next().unwrap());
                 }
                 let index = number.parse::<usize>().expect("Invalid number");
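
The lexer hunk above shows the `while let` form of a chain getting the same treatment. For readers unfamiliar with the construct, here is a self-contained sketch of the pattern in post-fmt style; it assumes a nightly toolchain, since `let_chains` was still unstable when this commit landed:

    #![feature(let_chains)]

    fn main() {
        let mut chars = "123abc".chars().peekable();
        let mut number = String::new();
        // The loop body runs only while the `let` binding succeeds
        // and the trailing condition holds.
        while let Some(c) = chars.peek()
            && c.is_ascii_digit()
        {
            number.push(chars.next().unwrap());
        }
        assert_eq!(number, "123");
    }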
4 changes: 3 additions & 1 deletion src/expr/core/src/expr/expr_coalesce.rs
@@ -56,7 +56,9 @@ impl Expression for CoalesceExpression {
         }
         let mut builder = self.return_type.create_array_builder(len);
         for (i, sel) in selection.iter().enumerate() {
-            if init_vis.is_set(i) && let Some(child_idx) = sel {
+            if init_vis.is_set(i)
+                && let Some(child_idx) = sel
+            {
                 builder.append(children_array[*child_idx].value_at(i));
             } else {
                 builder.append_null()