Skip to content

Commit

Permalink
Merge branch 'main' into xxh/commit_checkpoint_interval_sink_decouple
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs authored Sep 11, 2024
2 parents 65536d8 + 6038298 commit 849a4cc
Show file tree
Hide file tree
Showing 63 changed files with 403 additions and 281 deletions.
2 changes: 1 addition & 1 deletion ci/build-ci-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cat ../rust-toolchain
# shellcheck disable=SC2155

# REMEMBER TO ALSO UPDATE ci/docker-compose.yml
export BUILD_ENV_VERSION=v20240812
export BUILD_ENV_VERSION=v20240911

export BUILD_TAG="public.ecr.aws/w1p7b4n3/rw-build-env:${BUILD_ENV_VERSION}"

Expand Down
10 changes: 5 additions & 5 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ services:
retries: 5

source-test-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240812
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911
depends_on:
- mysql
- sqlserver-server
Expand All @@ -85,7 +85,7 @@ services:
- ..:/risingwave

sink-test-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240812
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911
depends_on:
- mysql
- db
Expand All @@ -108,12 +108,12 @@ services:


rw-build-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240812
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911
volumes:
- ..:/risingwave

ci-flamegraph-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240812
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911
# NOTE(kwannoel): This is used in order to permit
# syscalls for `nperf` (perf_event_open),
# so it can do CPU profiling.
Expand All @@ -124,7 +124,7 @@ services:
- ..:/risingwave

regress-test-env:
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240812
image: public.ecr.aws/w1p7b4n3/rw-build-env:v20240911
depends_on:
db:
condition: service_healthy
Expand Down
2 changes: 1 addition & 1 deletion ci/rust-toolchain
Original file line number Diff line number Diff line change
Expand Up @@ -4,4 +4,4 @@
# 3. (optional) **follow the instructions in lints/README.md** to update the toolchain and dependencies for lints

[toolchain]
channel = "nightly-2024-06-06"
channel = "nightly-2024-07-19"
23 changes: 21 additions & 2 deletions lints/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion lints/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ path = "ui/format_error.rs"
# See `README.md` before bumping the version.
# Remember to update the version in `ci/Dockerfile` as well.
[dependencies]
clippy_utils = { git = "https://github.com/risingwavelabs/clippy", rev = "5e2a7c6adebdb0478ee6d5b67ab4ee94153b2997" }
clippy_utils = { git = "https://github.com/risingwavelabs/clippy", rev = "61e1d2fd7062e46ccf1237707ee6da5aac018f70" }
dylint_linting = "3.1.0"
itertools = "0.12"

Expand Down
2 changes: 1 addition & 1 deletion lints/rust-toolchain
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# See `README.md` before bumping the version.

[toolchain]
channel = "nightly-2024-06-06"
channel = "nightly-2024-07-19"
components = ["llvm-tools-preview", "rustc-dev"]
5 changes: 2 additions & 3 deletions src/batch/src/executor/join/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,9 +162,8 @@ impl<'a> Iterator for RowIdIter<'a> {
type Item = RowId;

fn next(&mut self) -> Option<Self::Item> {
self.current_row_id.map(|row_id| {
self.current_row_id = self.next_row_id[row_id];
row_id
self.current_row_id.inspect(|row_id| {
self.current_row_id = self.next_row_id[*row_id];
})
}
}
Expand Down
1 change: 0 additions & 1 deletion src/batch/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
#![feature(coroutines)]
#![feature(proc_macro_hygiene, stmt_expr_attributes)]
#![feature(iterator_try_collect)]
#![feature(lint_reasons)]
#![feature(is_sorted)]
#![recursion_limit = "256"]
#![feature(let_chains)]
Expand Down
2 changes: 1 addition & 1 deletion src/common/benches/bench_data_chunk_encoding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ fn bench_data_chunk_encoding(c: &mut Criterion) {
for null_ratio in NULL_RATIOS {
for chunk_size in CHUNK_SIZES {
let chunk = rand_chunk::gen_chunk(&case.data_types, *chunk_size, SEED, *null_ratio);
let mut group = c.benchmark_group(&format!(
let mut group = c.benchmark_group(format!(
"data chunk encoding: {}, {} rows, Pr[null]={}",
case.name, chunk_size, null_ratio
));
Expand Down
2 changes: 0 additions & 2 deletions src/common/benches/bench_sequencer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]

use std::cell::RefCell;
use std::hint::black_box;
use std::sync::atomic::{AtomicUsize, Ordering};
Expand Down
1 change: 0 additions & 1 deletion src/common/common_service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

// This is a stub lib.rs.

#![feature(lint_reasons)]
#![feature(impl_trait_in_assoc_type)]
#![feature(error_generic_member_access)]

Expand Down
1 change: 0 additions & 1 deletion src/common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#![feature(test)]
#![feature(trusted_len)]
#![feature(allocator_api)]
#![feature(lint_reasons)]
#![feature(coroutines)]
#![feature(map_try_insert)]
#![feature(error_generic_member_access)]
Expand Down
6 changes: 3 additions & 3 deletions src/compute/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
#![feature(coroutines)]
#![feature(type_alias_impl_trait)]
#![feature(let_chains)]
#![feature(lint_reasons)]
#![feature(impl_trait_in_assoc_type)]
#![cfg_attr(coverage, feature(coverage_attribute))]

Expand Down Expand Up @@ -103,8 +102,9 @@ pub struct ComputeNodeOpts {
pub role: Role,

/// Used for control the metrics level, similar to log level.
/// 0 = disable metrics
/// >0 = enable metrics
///
/// level = 0: disable metrics
/// level > 0: enable metrics
#[clap(long, hide = true, env = "RW_METRICS_LEVEL")]
#[override_opts(path = server.metrics_level)]
pub metrics_level: Option<MetricLevel>,
Expand Down
1 change: 0 additions & 1 deletion src/connector/codec/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#![feature(stmt_expr_attributes)]
#![feature(box_patterns)]
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(let_chains)]
#![feature(box_into_inner)]
#![feature(type_alias_impl_trait)]
Expand Down
1 change: 0 additions & 1 deletion src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#![feature(stmt_expr_attributes)]
#![feature(box_patterns)]
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(let_chains)]
#![feature(box_into_inner)]
#![feature(type_alias_impl_trait)]
Expand Down
3 changes: 1 addition & 2 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -297,10 +297,9 @@ mod tests {
.unwrap()
.into_iter()
.filter(|c| c.cardinality() > 0)
.map(|c| {
.inspect(|c| {
// 5 data messages in a single chunk
assert_eq!(5, c.cardinality());
c
})
.collect_vec();

Expand Down
47 changes: 27 additions & 20 deletions src/connector/src/sink/google_pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,15 @@

use std::collections::BTreeMap;

use anyhow::{anyhow, Context};
use futures::future::try_join_all;
use futures::prelude::future::FutureExt;
use futures::prelude::TryFuture;
use futures::TryFutureExt;
use anyhow::anyhow;
use google_cloud_gax::conn::Environment;
use google_cloud_googleapis::pubsub::v1::PubsubMessage;
use google_cloud_pubsub::apiv1;
use google_cloud_pubsub::client::google_cloud_auth::credentials::CredentialsFile;
use google_cloud_pubsub::client::google_cloud_auth::project;
use google_cloud_pubsub::client::google_cloud_auth::token::DefaultTokenSourceProvider;
use google_cloud_pubsub::client::{Client, ClientConfig};
use google_cloud_pubsub::publisher::{Awaiter, Publisher};
use google_cloud_pubsub::publisher::Publisher;
use risingwave_common::array::StreamChunk;
use risingwave_common::catalog::Schema;
use serde_derive::Deserialize;
Expand All @@ -46,19 +42,33 @@ use crate::dispatch_sink_formatter_str_key_impl;
pub const PUBSUB_SINK: &str = "google_pubsub";
const PUBSUB_SEND_FUTURE_BUFFER_MAX_SIZE: usize = 65536;

fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
try_join_all(awaiter.into_iter().map(|awaiter| {
awaiter.get().map(|result| {
result
.context("Google Pub/Sub sink error")
.map_err(SinkError::GooglePubSub)
.map(|_| ())
})
}))
.map_ok(|_: Vec<()>| ())
.boxed()
mod delivery_future {
use anyhow::Context;
use futures::future::try_join_all;
use futures::{FutureExt, TryFuture, TryFutureExt};
use google_cloud_pubsub::publisher::Awaiter;

use crate::sink::SinkError;

pub type GooglePubSubSinkDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

pub(super) fn may_delivery_future(awaiter: Vec<Awaiter>) -> GooglePubSubSinkDeliveryFuture {
try_join_all(awaiter.into_iter().map(|awaiter| {
awaiter.get().map(|result| {
result
.context("Google Pub/Sub sink error")
.map_err(SinkError::GooglePubSub)
.map(|_| ())
})
}))
.map_ok(|_: Vec<()>| ())
.boxed()
}
}

use delivery_future::*;

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct GooglePubSubConfig {
Expand Down Expand Up @@ -172,9 +182,6 @@ struct GooglePubSubPayloadWriter<'w> {
add_future: DeliveryFutureManagerAddFuture<'w, GooglePubSubSinkDeliveryFuture>,
}

pub type GooglePubSubSinkDeliveryFuture =
impl TryFuture<Ok = (), Error = SinkError> + Unpin + 'static;

impl GooglePubSubSinkWriter {
pub async fn new(
config: GooglePubSubConfig,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ pub struct MonitoredFanoutPartitionedWriterBuilder<B: IcebergWriterBuilder> {
}

impl<B: IcebergWriterBuilder> MonitoredFanoutPartitionedWriterBuilder<B> {
#[expect(dead_code)]
pub fn new(
inner: FanoutPartitionedWriterBuilder<B>,
partition_num: LabelGuardedIntGauge<2>,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@ pub struct MonitoredWriteWriterBuilder<B: IcebergWriterBuilder> {

impl<B: IcebergWriterBuilder> MonitoredWriteWriterBuilder<B> {
/// Create writer context.
#[expect(dead_code)]
pub fn new(
inner: B,
write_qps: LabelGuardedIntCounter<2>,
Expand Down
2 changes: 1 addition & 1 deletion src/connector/src/sink/redis.rs
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ impl RedisSinkPayloadWriter {
return Ok(());
}
}
self.pipe.query(self.conn.as_mut().unwrap()).await?;
self.pipe.query::<()>(self.conn.as_mut().unwrap()).await?;
self.pipe.clear();
Ok(())
}
Expand Down
5 changes: 2 additions & 3 deletions src/connector/src/sink/remote.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use async_trait::async_trait;
use await_tree::InstrumentAwait;
use futures::future::select;
use futures::TryStreamExt;
use itertools::Itertools;
use jni::JavaVM;
use prost::Message;
use risingwave_common::array::StreamChunk;
Expand Down Expand Up @@ -174,7 +173,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
bail!("Es sink only supports single pk or pk with delimiter option");
}
// FIXME: support struct and array in stream sink
param.columns.iter().map(|col| {
param.columns.iter().try_for_each(|col| {
match &col.data_type {
DataType::Int16
| DataType::Int32
Expand Down Expand Up @@ -217,7 +216,7 @@ async fn validate_remote_sink(param: &SinkParam, sink_name: &str) -> ConnectorRe
"remote sink supports Int16, Int32, Int64, Float32, Float64, Boolean, Decimal, Time, Date, Interval, Jsonb, Timestamp, Timestamptz, Bytea, List and Varchar, (Es sink support Struct) got {:?}: {:?}",
col.name,
col.data_type,
)))}}).try_collect()?;
)))}})?;

let jvm = JVM.get_or_init()?;
let sink_param = param.to_proto();
Expand Down
1 change: 0 additions & 1 deletion src/dml/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

#![allow(clippy::derive_partial_eq_without_eq)]
#![feature(trait_alias)]
#![feature(lint_reasons)]
#![feature(coroutines)]
#![feature(hash_extract_if)]
#![feature(type_alias_impl_trait)]
Expand Down
1 change: 0 additions & 1 deletion src/error/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
//! access if `risingwave_common` is already a dependency.
#![feature(error_generic_member_access)]
#![feature(lint_reasons)]
#![feature(register_tool)]
#![register_tool(rw)]
#![feature(trait_alias)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
// limitations under the License.

#![feature(let_chains)]
#![feature(lint_reasons)]
#![feature(iterator_try_collect)]
#![feature(coroutines)]
#![feature(never_type)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/impl/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#![allow(non_snake_case)] // for `ctor` generated code
#![feature(let_chains)]
#![feature(assert_matches)]
#![feature(lint_reasons)]
#![feature(iterator_try_collect)]
#![feature(coroutines)]
#![feature(test)]
Expand Down
1 change: 0 additions & 1 deletion src/expr/macro/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#![feature(lint_reasons)]
#![feature(let_chains)]

use std::vec;
Expand Down
Loading

0 comments on commit 849a4cc

Please sign in to comment.