fix thiserror/anyhow, fmt, some clippy
xxchan committed Sep 9, 2023
1 parent 5ab88a8 commit ab8c61c
Showing 9 changed files with 27 additions and 27 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion src/connector/build.rs
@@ -17,7 +17,7 @@ fn main() {

     println!("cargo:rerun-if-changed={}", proto_dir);

-    let proto_files = vec!["recursive"];
+    let proto_files = ["recursive"];
     let protos: Vec<String> = proto_files
         .iter()
         .map(|f| format!("{}/{}.proto", proto_dir, f))
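
Note: the `vec!["recursive"]` → `["recursive"]` change matches clippy's `useless_vec` lint: the list is only iterated, so a heap-allocating `Vec` buys nothing over a plain array. A minimal standalone sketch of the pattern (not the project's actual build script; the path is made up):

    fn main() {
        // A fixed list that is only iterated needs no heap allocation;
        // clippy's `useless_vec` suggests an array instead of `vec![...]`.
        let proto_files = ["recursive"];
        let protos: Vec<String> = proto_files
            .iter()
            .map(|f| format!("proto/{}.proto", f)) // hypothetical proto dir
            .collect();
        println!("{:?}", protos);
    }
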
12 changes: 9 additions & 3 deletions src/frontend/src/binder/expr/function.rs
@@ -185,10 +185,16 @@ impl Binder {
             }
         };

-        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction { args: lambda_args, body: lambda_body }) = lambda.get_expr() else {
+        let ast::FunctionArgExpr::Expr(ast::Expr::LambdaFunction {
+            args: lambda_args,
+            body: lambda_body,
+        }) = lambda.get_expr()
+        else {
             return Err(ErrorCode::BindError(
-                "The `lambda` argument for `array_transform` should be a lambda function".to_string()
-            ).into());
+                "The `lambda` argument for `array_transform` should be a lambda function"
+                    .to_string(),
+            )
+            .into());
         };

         let [lambda_arg] = <[Ident; 1]>::try_from(lambda_args).map_err(|args| -> RwError {
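
Note: this hunk is pure `cargo fmt` churn. rustfmt gained support for formatting `let`-`else` statements (shipped around Rust 1.72, as I recall), so the previously skipped one-liner now wraps like any other long binding. A small sketch of the construct in that style (hypothetical types, not the binder's):

    // `let`-`else` (stable since Rust 1.65): bind on a match, diverge otherwise.
    // When the pattern spans multiple lines, rustfmt puts `else` on its own
    // line, as in the diff above.
    struct Lambda {
        args: Vec<String>,
        body: String,
    }

    fn require_parts(expr: Option<Lambda>) -> Result<(Vec<String>, String), String> {
        let Some(Lambda {
            args: lambda_args,
            body: lambda_body,
        }) = expr
        else {
            return Err("expected a lambda function".to_string());
        };
        Ok((lambda_args, lambda_body))
    }
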
1 change: 0 additions & 1 deletion src/jni_core/src/lib.rs
@@ -13,7 +13,6 @@
 // limitations under the License.

 #![feature(error_generic_member_access)]
-#![feature(provide_any)]
 #![feature(lazy_cell)]
 #![feature(once_cell_try)]
 #![feature(type_alias_impl_trait)]
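
Note: dropping `#![feature(provide_any)]` is presumably the "fix thiserror/anyhow" part of the commit title: nightly removed the `std::any::Provider` API in mid-2023, `error_generic_member_access` moved to `std::error::Request`, and thiserror/anyhow shipped matching releases, so the old feature gate no longer exists. A hedged, nightly-only sketch of the replacement API:

    // Nightly-only sketch: with the Provider API gone, a backtrace attached
    // to an error is fetched via `std::error::request_ref` under the
    // `error_generic_member_access` feature, not via `provide_any`.
    #![feature(error_generic_member_access)]

    use std::backtrace::Backtrace;
    use std::error::request_ref;

    fn print_backtrace(err: &(dyn std::error::Error + 'static)) {
        if let Some(bt) = request_ref::<Backtrace>(err) {
            eprintln!("{bt}");
        }
    }
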
@@ -511,7 +511,7 @@ impl HummockEventHandler {
             UploaderEvent::ImmMerged(merge_output) => {
                 // update read version for corresponding table shards
                 let read_guard = self.read_version_mapping.read();
-                read_guard.get(&merge_output.table_id).map_or((), |shards| {
+                if let Some(shards) = read_guard.get(&merge_output.table_id) {
                     shards.get(&merge_output.instance_id).map_or_else(
                         || {
                             warn!(
@@ -525,7 +525,7 @@ impl HummockEventHandler {
                         ));
                     },
                 )
-                });
+                }
             }
         }
     }
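
Note: replacing `.map_or((), |shards| { ... })` with `if let Some(shards) = ...` is the rewrite clippy's `option_map_unit_fn` lint suggests: when the closure returns `()`, `map_or` is just obscured control flow. A minimal sketch with made-up shard bookkeeping:

    use std::collections::HashMap;

    // Running a unit-returning closure through `map_or((), ...)` hides the
    // intent; `if let` says "do this only when the key is present" directly.
    fn notify_shards(mapping: &HashMap<u32, Vec<u64>>, table_id: u32) {
        // before: mapping.get(&table_id).map_or((), |shards| { ... });
        if let Some(shards) = mapping.get(&table_id) {
            for shard in shards {
                println!("updating shard {shard} of table {table_id}");
            }
        }
    }
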
3 changes: 1 addition & 2 deletions src/storage/src/monitor/traced_store.rs
@@ -104,8 +104,7 @@ impl<S> TracedStateStore<S> {
     }
 }

-type TracedStateStoreIterStream<S: StateStoreIterItemStream> =
-    impl StateStoreIterItemStream;
+type TracedStateStoreIterStream<S: StateStoreIterItemStream> = impl StateStoreIterItemStream;

 impl<S: LocalStateStore> LocalStateStore for TracedStateStore<S> {
     type IterStream<'a> = impl StateStoreIterItemStream + 'a;
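
Note: nothing semantic here either; the alias simply fits rustfmt's default 100-column width on one line now. The alias uses the nightly type-alias-impl-trait feature, roughly like this (a sketch against the 2023-era nightly this commit builds with; the iterator is invented):

    #![feature(type_alias_impl_trait)]

    // A type alias whose concrete type is an opaque `impl Trait`,
    // inferred from the defining use below.
    type Doubled = impl Iterator<Item = u32>;

    fn doubled(upto: u32) -> Doubled {
        (0..upto).map(|x| x * 2)
    }
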
16 changes: 6 additions & 10 deletions src/stream/src/executor/over_window/general.rs
@@ -307,15 +307,15 @@ impl<S: StateStore> OverWindowExecutor<S> {
             match record {
                 Record::Insert { new_row } => {
                     let part_key = this.get_partition_key(new_row).into();
-                    let part_delta = deltas.entry(part_key).or_insert(PartitionDelta::new());
+                    let part_delta = deltas.entry(part_key).or_default();
                     part_delta.insert(
                         this.row_to_cache_key(new_row)?,
                         Change::Insert(new_row.into_owned_row()),
                     );
                 }
                 Record::Delete { old_row } => {
                     let part_key = this.get_partition_key(old_row).into();
-                    let part_delta = deltas.entry(part_key).or_insert(PartitionDelta::new());
+                    let part_delta = deltas.entry(part_key).or_default();
                     part_delta.insert(this.row_to_cache_key(old_row)?, Change::Delete);
                 }
                 Record::Update { old_row, new_row } => {
@@ -325,27 +325,23 @@ impl<S: StateStore> OverWindowExecutor<S> {
                     let new_state_key = this.row_to_cache_key(new_row)?;
                     if old_part_key == new_part_key && old_state_key == new_state_key {
                         // not a key-change update
-                        let part_delta =
-                            deltas.entry(old_part_key).or_insert(PartitionDelta::new());
+                        let part_delta = deltas.entry(old_part_key).or_default();
                         part_delta.insert(old_state_key, Change::Insert(new_row.into_owned_row()));
                     } else if old_part_key == new_part_key {
                         // order-change update, split into delete + insert, will be merged after
                         // building changes
                         key_change_updated_pks.insert(this.get_input_pk(old_row));
-                        let part_delta =
-                            deltas.entry(old_part_key).or_insert(PartitionDelta::new());
+                        let part_delta = deltas.entry(old_part_key).or_default();
                         part_delta.insert(old_state_key, Change::Delete);
                         part_delta.insert(new_state_key, Change::Insert(new_row.into_owned_row()));
                     } else {
                         // partition-change update, split into delete + insert
                         // NOTE(rc): Since we append partition key to logical pk, we can't merge the
                         // delete + insert back to update later.
                         // TODO: IMO this behavior is problematic. Deep discussion is needed.
-                        let old_part_delta =
-                            deltas.entry(old_part_key).or_insert(PartitionDelta::new());
+                        let old_part_delta = deltas.entry(old_part_key).or_default();
                         old_part_delta.insert(old_state_key, Change::Delete);
-                        let new_part_delta =
-                            deltas.entry(new_part_key).or_insert(PartitionDelta::new());
+                        let new_part_delta = deltas.entry(new_part_key).or_default();
                         new_part_delta
                             .insert(new_state_key, Change::Insert(new_row.into_owned_row()));
                     }
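
Note: all the deletions in this file are the same clippy fix (the `or_fun_call` lint, if memory serves): `entry(k).or_insert(PartitionDelta::new())` evaluates its argument even when the entry is occupied, while `or_default()` constructs the value lazily and only on vacancy. A standalone sketch of the entry-API pattern:

    use std::collections::BTreeMap;

    fn main() {
        let mut deltas: BTreeMap<&str, Vec<i32>> = BTreeMap::new();
        // `or_default()` is equivalent to `or_insert_with(Default::default)`:
        // the Vec is built only if the key is vacant.
        deltas.entry("p1").or_default().push(1);
        deltas.entry("p1").or_default().push(2); // no fresh Vec constructed here
        assert_eq!(deltas["p1"], vec![1, 2]);
    }
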
2 changes: 1 addition & 1 deletion src/stream/src/executor/over_window/over_partition.rs
@@ -1083,7 +1083,7 @@ mod find_affected_ranges_tests {
     ) {
         result
             .into_iter()
-            .zip_eq(expected.into_iter())
+            .zip_eq(expected)
             .for_each(|(result, expected)| {
                 assert_eq!(
                     result.0.as_normal_expect().pk.0,
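
Note: `zip_eq` (from itertools) takes any `IntoIterator`, so the explicit `.into_iter()` on its argument is redundant; clippy reports this as `useless_conversion`. The same shape with std's `zip`, dependency-free:

    fn main() {
        let result = vec![1, 2, 3];
        let expected = vec![1, 2, 3];
        // before: result.into_iter().zip(expected.into_iter())
        for (r, e) in result.into_iter().zip(expected) {
            assert_eq!(r, e);
        }
    }
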
2 changes: 1 addition & 1 deletion src/stream/src/executor/source/source_executor.rs
@@ -257,7 +257,7 @@ impl<S: StateStore> SourceExecutor<S> {
         // fetch the newest offset, either it's in cache (before barrier)
         // or in state table (just after barrier)
         let target_state = if core.state_cache.is_empty() {
-            for ele in split_info.iter_mut() {
+            for ele in &mut *split_info {
                 if let Some(recover_state) = core
                     .split_state_store
                     .try_recover_from_state_store(ele)
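
Note: `for ele in split_info.iter_mut()` → `for ele in &mut *split_info` looks like clippy's `explicit_iter_loop` (a pedantic lint); the `&mut *` reborrow is needed when `split_info` is itself a `&mut`, so the reference is reborrowed rather than moved. A sketch with an invented element type:

    // `&mut *split_info` reborrows through the existing mutable reference,
    // which is what `explicit_iter_loop` suggests instead of `.iter_mut()`.
    fn bump_all(split_info: &mut Vec<i64>) {
        for ele in &mut *split_info {
            *ele += 1;
        }
    }

    fn main() {
        let mut v = vec![1, 2, 3];
        bump_all(&mut v);
        assert_eq!(v, vec![2, 3, 4]);
    }
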
