Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into xxchan/source-test
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan committed Apr 17, 2024
2 parents 1636afa + adcdd8c commit f0f6716
Show file tree
Hide file tree
Showing 11 changed files with 133 additions and 55 deletions.
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ futures-timer = { git = "https://github.com/madsim-rs/futures-timer.git", rev =
# patch: unlimit 4MB message size for grpc client
etcd-client = { git = "https://github.com/risingwavelabs/etcd-client.git", rev = "4e84d40" }
# todo(wcy-fdu): remove this patch fork after opendal release a new version to apply azure workload identity change.
reqsign = { git = "https://github.com/wcy-fdu/reqsign.git", rev = "e6cb304" }
reqsign = { git = "https://github.com/wcy-fdu/reqsign.git", rev = "002ee2a" }

[workspace.metadata.dylint]
libraries = [{ path = "./lints" }]
2 changes: 1 addition & 1 deletion docker/docker-compose-with-sqlite.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:nightly-20240328}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.8.1}
services:
risingwave-standalone:
<<: *image
Expand Down
18 changes: 7 additions & 11 deletions scripts/install/install-risingwave.sh
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,9 @@ fi
STATE_STORE_PATH="${HOME}/.risingwave/state_store"
META_STORE_PATH="${HOME}/.risingwave/meta_store"

VERSION="v1.7.0-standalone"
# TODO(kwannoel): re-enable it once we have stable release in latest for single node mode.
#VERSION=$(curl -s https://api.github.com/repos/risingwavelabs/risingwave/releases/latest \
# | grep '.tag_name' \
# | sed -E -n 's/.*(v[0-9]+.[0-9]+.[0-9])\",/\1/p')
HOMEBREW_VERSION="1.7-standalone"
VERSION=$(curl -s https://api.github.com/repos/risingwavelabs/risingwave/releases/latest \
| grep '.tag_name' \
| sed -E -n 's/.*(v[0-9]+.[0-9]+.[0-9])\",/\1/p')

BASE_URL="https://github.com/risingwavelabs/risingwave/releases/download"

Expand Down Expand Up @@ -51,11 +48,11 @@ fi

############# BREW INSTALL
if [ "${USE_BREW}" -eq 1 ]; then
echo "Installing RisingWave@${HOMEBREW_VERSION} using Homebrew."
echo "Installing RisingWave@${VERSION} using Homebrew."
brew tap risingwavelabs/risingwave
brew install risingwave@${HOMEBREW_VERSION}
brew install risingwave
echo
echo "Successfully installed RisingWave@${HOMEBREW_VERSION} using Homebrew."
echo "Successfully installed RisingWave@${VERSION} using Homebrew."
echo
echo "Run RisingWave:"
echo
Expand Down Expand Up @@ -107,5 +104,4 @@ if [ -z "${JAVA_HOME}" ]; then
tput setaf 3
echo "WARNING: Java is required to use RisingWave's Java Connectors (e.g. MySQL)."
echo "Please install Java, and set the \$JAVA_HOME environment variable."
fi
# TODO(kwannoel): Include link to our docs.
fi
14 changes: 3 additions & 11 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,9 +40,7 @@ use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::PausedReason;
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_service::barrier_complete_response::CreateMviewProgress;
use risingwave_pb::stream_service::{
streaming_control_stream_response, BarrierCompleteResponse, StreamingControlStreamResponse,
};
use risingwave_pb::stream_service::BarrierCompleteResponse;
use thiserror_ext::AsReport;
use tokio::sync::oneshot::{Receiver, Sender};
use tokio::sync::Mutex;
Expand Down Expand Up @@ -670,16 +668,10 @@ impl GlobalBarrierManager {
_ => {}
}
}
resp_result = self.control_stream_manager.next_response() => {
resp_result = self.control_stream_manager.next_complete_barrier_response() => {
match resp_result {
Ok((worker_id, prev_epoch, resp)) => {
let resp: StreamingControlStreamResponse = resp;
match resp.response {
Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => {
self.checkpoint_control.barrier_collected(worker_id, prev_epoch, resp);
},
resp => unreachable!("invalid response: {:?}", resp),
}
self.checkpoint_control.barrier_collected(worker_id, prev_epoch, resp);

}
Err(e) => {
Expand Down
17 changes: 4 additions & 13 deletions src/meta/src/barrier/recovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::assert_matches::assert_matches;
use std::collections::{BTreeSet, HashMap, HashSet};
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand All @@ -29,9 +28,6 @@ use risingwave_pb::meta::{PausedReason, Recovery};
use risingwave_pb::stream_plan::barrier::BarrierKind;
use risingwave_pb::stream_plan::barrier_mutation::Mutation;
use risingwave_pb::stream_plan::AddMutation;
use risingwave_pb::stream_service::{
streaming_control_stream_response, StreamingControlStreamResponse,
};
use thiserror_ext::AsReport;
use tokio::sync::oneshot;
use tokio_retry::strategy::{jitter, ExponentialBackoff};
Expand Down Expand Up @@ -502,15 +498,10 @@ impl GlobalBarrierManager {
let mut node_to_collect =
control_stream_manager.inject_barrier(command_ctx.clone())?;
while !node_to_collect.is_empty() {
let (worker_id, _, resp) = control_stream_manager.next_response().await?;
assert_matches!(
resp,
StreamingControlStreamResponse {
response: Some(
streaming_control_stream_response::Response::CompleteBarrier(_)
)
}
);
let (worker_id, prev_epoch, _) = control_stream_manager
.next_complete_barrier_response()
.await?;
assert_eq!(prev_epoch, command_ctx.prev_epoch.value().0);
assert!(node_to_collect.remove(&worker_id));
}

Expand Down
58 changes: 46 additions & 12 deletions src/meta/src/barrier/rpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
// limitations under the License.

use std::collections::{HashMap, HashSet, VecDeque};
use std::error::Error;
use std::future::Future;
use std::sync::Arc;
use std::time::Duration;
Expand All @@ -28,7 +29,7 @@ use risingwave_common::util::tracing::TracingContext;
use risingwave_pb::common::{ActorInfo, WorkerNode};
use risingwave_pb::stream_plan::{Barrier, BarrierMutation, StreamActor};
use risingwave_pb::stream_service::{
streaming_control_stream_request, streaming_control_stream_response,
streaming_control_stream_request, streaming_control_stream_response, BarrierCompleteResponse,
BroadcastActorInfoTableRequest, BuildActorsRequest, DropActorsRequest, InjectBarrierRequest,
StreamingControlStreamRequest, StreamingControlStreamResponse, UpdateActorsRequest,
};
Expand All @@ -47,6 +48,8 @@ use super::GlobalBarrierManagerContext;
use crate::manager::{MetaSrvEnv, WorkerId};
use crate::{MetaError, MetaResult};

const COLLECT_ERROR_TIMEOUT: Duration = Duration::from_secs(3);

struct ControlStreamNode {
worker: WorkerNode,
sender: UnboundedSender<StreamingControlStreamRequest>,
Expand Down Expand Up @@ -162,17 +165,25 @@ impl ControlStreamManager {
Ok(())
}

pub(super) async fn next_response(
async fn next_response(
&mut self,
) -> Option<(WorkerId, MetaResult<StreamingControlStreamResponse>)> {
let (worker_id, response_stream, result) = self.response_streams.next().await?;
if result.is_ok() {
self.response_streams
.push(into_future(worker_id, response_stream));
}
Some((worker_id, result))
}

pub(super) async fn next_complete_barrier_response(
&mut self,
) -> MetaResult<(WorkerId, u64, StreamingControlStreamResponse)> {
) -> MetaResult<(WorkerId, u64, BarrierCompleteResponse)> {
loop {
let (worker_id, response_stream, result) =
pending_on_none(self.response_streams.next()).await;
let (worker_id, result) = pending_on_none(self.next_response()).await;
match result {
Ok(resp) => match &resp.response {
Some(streaming_control_stream_response::Response::CompleteBarrier(_)) => {
self.response_streams
.push(into_future(worker_id, response_stream));
Ok(resp) => match resp.response {
Some(streaming_control_stream_response::Response::CompleteBarrier(resp)) => {
let node = self
.nodes
.get_mut(&worker_id)
Expand All @@ -195,16 +206,39 @@ impl ControlStreamManager {
// Note: No need to use `?` as the backtrace is from meta and not useful.
warn!(node = ?node.worker, err = %err.as_report(), "get error from response stream");
if let Some(command) = node.inflight_barriers.pop_front() {
let errors = self.collect_errors(node.worker.id, err).await;
let err = merge_node_rpc_errors("get error from control stream", errors);
self.context.report_collect_failure(&command, &err);
break Err(err);
} else {
// for node with no inflight barrier, simply ignore the error
info!(node = ?node.worker, "no inflight barrier no node. Ignore error");
continue;
}
}
}
}
}

async fn collect_errors(
&mut self,
worker_id: WorkerId,
first_err: MetaError,
) -> Vec<(WorkerId, MetaError)> {
let mut errors = vec![(worker_id, first_err)];
#[cfg(not(madsim))]
{
let _ = timeout(COLLECT_ERROR_TIMEOUT, async {
while let Some((worker_id, result)) = self.next_response().await {
if let Err(e) = result {
errors.push((worker_id, e));
}
}
})
.await;
}
errors
}
}

impl ControlStreamManager {
Expand Down Expand Up @@ -356,7 +390,7 @@ impl StreamRpcManager {
let client = pool.get(node).await.map_err(|e| (node.id, e))?;
f(client, input).await.map_err(|e| (node.id, e))
});
let result = try_join_all_with_error_timeout(iters, Duration::from_secs(3)).await;
let result = try_join_all_with_error_timeout(iters, COLLECT_ERROR_TIMEOUT).await;
result.map_err(|results_err| merge_node_rpc_errors("merged RPC Error", results_err))
}

Expand Down Expand Up @@ -491,9 +525,9 @@ where
Err(results_err)
}

fn merge_node_rpc_errors(
fn merge_node_rpc_errors<E: Error>(
message: &str,
errors: impl IntoIterator<Item = (WorkerId, RpcError)>,
errors: impl IntoIterator<Item = (WorkerId, E)>,
) -> MetaError {
use std::fmt::Write;

Expand Down
21 changes: 16 additions & 5 deletions src/object_store/src/object/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -888,11 +888,22 @@ pub async fn build_remote_object_store(
set your endpoint to the environment variable RW_S3_ENDPOINT.");
panic!("Passing s3-compatible is not supported, please modify the environment variable and pass in s3.");
}
minio if minio.starts_with("minio://") => ObjectStoreImpl::S3(
S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
.monitored(metrics, config),
),
minio if minio.starts_with("minio://") => {
if config.s3.developer.use_opendal {
tracing::info!("Using OpenDAL to access minio.");
ObjectStoreImpl::Opendal(
OpendalObjectStore::with_minio(minio, config.clone())
.unwrap()
.monitored(metrics, config),
)
} else {
ObjectStoreImpl::S3(
S3ObjectStore::with_minio(minio, metrics.clone(), config.clone())
.await
.monitored(metrics, config),
)
}
}
"memory" => {
if ident == "Meta Backup" {
tracing::warn!("You're using in-memory remote object store for {}. This is not recommended for production environment.", ident);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ pub enum EngineType {
Memory,
Hdfs,
Gcs,
Minio,
S3,
Obs,
Oss,
Expand All @@ -64,6 +65,7 @@ impl ObjectStore for OpendalObjectStore {
fn get_object_prefix(&self, obj_id: u64) -> String {
match self.engine_type {
EngineType::S3 => prefix::s3::get_object_prefix(obj_id),
EngineType::Minio => prefix::s3::get_object_prefix(obj_id),
EngineType::Memory => String::default(),
EngineType::Hdfs => String::default(),
EngineType::Gcs => String::default(),
Expand Down Expand Up @@ -201,6 +203,7 @@ impl ObjectStore for OpendalObjectStore {
match self.engine_type {
EngineType::Memory => "Memory",
EngineType::Hdfs => "Hdfs",
EngineType::Minio => "Minio",
EngineType::S3 => "S3",
EngineType::Gcs => "Gcs",
EngineType::Obs => "Obs",
Expand Down
50 changes: 50 additions & 0 deletions src/object_store/src/object/opendal_engine/opendal_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,56 @@ impl OpendalObjectStore {
})
}

/// Creates a minio client. The server should be like `minio://key:secret@address:port/bucket`.
pub fn with_minio(server: &str, object_store_config: ObjectStoreConfig) -> ObjectResult<Self> {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
} else if let Some(rest_stripped) = rest.strip_prefix("http://") {
rest = rest_stripped;
"http://"
} else {
"http://"
};
let (address, bucket) = rest.split_once('/').unwrap();

let mut builder = S3::default();
builder
.bucket(bucket)
.region("custom")
.access_key_id(access_key_id)
.secret_access_key(secret_access_key)
.endpoint(&format!("{}{}", endpoint_prefix, address));

builder.disable_config_load();
let http_client = Self::new_http_client(&object_store_config)?;
builder.http_client(http_client);
let op: Operator = Operator::new(builder)?
.layer(LoggingLayer::default())
.layer(
RetryLayer::new()
.with_min_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_interval_ms,
))
.with_max_delay(Duration::from_millis(
object_store_config.s3.object_store_req_retry_max_delay_ms,
))
.with_max_times(object_store_config.s3.object_store_req_retry_max_attempts)
.with_factor(1.0)
.with_jitter(),
)
.finish();

Ok(Self {
op,
engine_type: EngineType::Minio,
})
}

pub fn new_http_client(config: &ObjectStoreConfig) -> ObjectResult<HttpClient> {
let mut client_builder = reqwest::ClientBuilder::new();

Expand Down
1 change: 1 addition & 0 deletions src/object_store/src/object/s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -658,6 +658,7 @@ impl S3ObjectStore {
let server = server.strip_prefix("minio://").unwrap();
let (access_key_id, rest) = server.split_once(':').unwrap();
let (secret_access_key, mut rest) = rest.split_once('@').unwrap();

let endpoint_prefix = if let Some(rest_stripped) = rest.strip_prefix("https://") {
rest = rest_stripped;
"https://"
Expand Down

0 comments on commit f0f6716

Please sign in to comment.