Skip to content

Commit

Permalink
Merge branch 'main' into tab/enable.ssl.certificate.verification
Browse files Browse the repository at this point in the history
  • Loading branch information
xxchan authored Feb 14, 2024
2 parents b597f52 + 2306bd9 commit b803e1c
Show file tree
Hide file tree
Showing 34 changed files with 251 additions and 314 deletions.
12 changes: 6 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ ENV CARGO_REGISTRIES_CRATES_IO_PROTOCOL=sparse
# install build tools
RUN curl -L --proto '=https' --tlsv1.2 -sSf https://raw.githubusercontent.com/cargo-bins/cargo-binstall/main/install-from-binstall-release.sh | bash
RUN cargo binstall -y --no-symlinks cargo-llvm-cov cargo-nextest cargo-hakari cargo-sort cargo-cache cargo-audit \
cargo-make@0.36.10 \
cargo-make@0.37.9 \
[email protected] \
[email protected] \
&& cargo cache -a \
Expand Down
2 changes: 1 addition & 1 deletion ci/build-ci-image.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ cat ../rust-toolchain
# shellcheck disable=SC2155

# REMEMBER TO ALSO UPDATE ci/docker-compose.yml
export BUILD_ENV_VERSION=v20240208
export BUILD_ENV_VERSION=v20240213

export BUILD_TAG="public.ecr.aws/x5u3w5h6/rw-build-env:${BUILD_ENV_VERSION}"

Expand Down
10 changes: 5 additions & 5 deletions ci/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ services:
retries: 5

source-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240208
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240213
depends_on:
- mysql
- db
Expand All @@ -81,7 +81,7 @@ services:
- ..:/risingwave

sink-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240208
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240213
depends_on:
- mysql
- db
Expand All @@ -93,12 +93,12 @@ services:
- ..:/risingwave

rw-build-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240208
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240213
volumes:
- ..:/risingwave

ci-flamegraph-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240208
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240213
# NOTE(kwannoel): This is used in order to permit
# syscalls for `nperf` (perf_event_open),
# so it can do CPU profiling.
Expand All @@ -109,7 +109,7 @@ services:
- ..:/risingwave

regress-test-env:
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240208
image: public.ecr.aws/x5u3w5h6/rw-build-env:v20240213
depends_on:
db:
condition: service_healthy
Expand Down
2 changes: 1 addition & 1 deletion ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ steps:
config: ci/docker-compose.yml
mount-buildkite-agent: true
- ./ci/plugins/upload-failure-logs
timeout_in_minutes: 16
timeout_in_minutes: 17
retry: *auto-retry

- label: "end-to-end test (parallel)"
Expand Down
4 changes: 2 additions & 2 deletions lints/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 9 additions & 10 deletions scripts/install/install-risingwave.sh
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ STATE_STORE_PATH="${HOME}/.risingwave/state_store"
META_STORE_PATH="${HOME}/.risingwave/meta_store"

VERSION="v1.7.0-single-node-2"
HOMEBREW_VERSION="1.7-single-node"
# TODO(kwannoel): re-enable it once we have stable release in latest for single node mode.
#VERSION=$(curl -s https://api.github.com/repos/risingwavelabs/risingwave/releases/latest \
# | grep '.tag_name' \
Expand All @@ -30,17 +31,15 @@ if [ "${OS}" = "Linux" ]; then
fi
elif [ "${OS}" = "Darwin" ]; then
if [ "${ARCH}" = "x86_64" ] || [ "${ARCH}" = "amd64" ] || [ "${ARCH}" = "aarch64" ] || [ "${ARCH}" = "arm64" ]; then
echo "Brew installation is not supported yet."
# USE_BREW=1
USE_BREW=1
fi
fi

if [ -z "$USE_BREW" ]; then
echo
echo "Unsupported OS or Architecture: ${OS}-${ARCH}"
echo
echo "Supported OSes: Linux"
# echo "Supported OSes: Linux, macOS"
echo "Supported OSes: Linux, macOS"
echo "Supported architectures: x86_64"
echo
echo "Please open an issue at <https://github.com/risingwavelabs/risingwave/issues/new/choose>,"
Expand All @@ -60,17 +59,17 @@ echo

############# BREW INSTALL
if [ "${USE_BREW}" -eq 1 ]; then
echo "Installing RisingWave@${VERSION} using Homebrew."
echo "Installing RisingWave@${HOMEBREW_VERSION} using Homebrew."
brew tap risingwavelabs/risingwave
brew install risingwave@${VERSION}
echo "Successfully installed RisingWave@${VERSION} using Homebrew."
brew install risingwave@${HOMEBREW_VERSION}
echo "Successfully installed RisingWave@${HOMEBREW_VERSION} using Homebrew."
echo
echo "You can run it as:"
echo
echo " risingwave >risingwave.log 2>&1 &"
echo
echo
echo "In a separate terminal, you can attach a psql client to the standalone server using:"
echo "You can attach a psql client to the standalone server using:"
echo
echo " psql -h localhost -p 4566 -d dev -U root"
echo
Expand All @@ -83,7 +82,7 @@ if [ "${USE_BREW}" -eq 1 ]; then
echo
echo "To view available options, run:"
echo
echo " ./risingwave single-node --help"
echo " risingwave single-node --help"
echo
echo
exit 0
Expand All @@ -101,7 +100,7 @@ echo
echo " ./risingwave >risingwave.log 2>&1 &"
echo
echo
echo "In a separate terminal, you can connect a psql client to the standalone server using:"
echo "You can connect a psql client to the standalone server using:"
echo
echo " psql -h localhost -p 4566 -d dev -U root"
echo
Expand Down
18 changes: 11 additions & 7 deletions src/common/src/session_config/sink_decouple.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,16 @@ impl FromStr for SinkDecouple {
}
}

impl ToString for SinkDecouple {
fn to_string(&self) -> String {
match self {
Self::Default => "default".to_string(),
Self::Enable => "enable".to_string(),
Self::Disable => "disable".to_string(),
}
impl std::fmt::Display for SinkDecouple {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}",
match self {
Self::Default => "default",
Self::Enable => "enable",
Self::Disable => "disable",
}
)
}
}
38 changes: 7 additions & 31 deletions src/connector/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,46 +16,22 @@ use thiserror::Error;

#[derive(Error, Debug)]
pub enum ConnectorError {
#[error("Parse error: {0}")]
Parse(&'static str),

#[error("Invalid parameter {name}: {reason}")]
InvalidParam { name: &'static str, reason: String },

#[error("Kafka error: {0}")]
Kafka(#[from] rdkafka::error::KafkaError),

#[error("Config error: {0}")]
Config(
#[source]
#[error("MySQL error: {0}")]
MySql(
#[from]
#[backtrace]
anyhow::Error,
mysql_async::Error,
),

#[error("Connection error: {0}")]
Connection(
#[source]
#[backtrace]
anyhow::Error,
),

#[error("MySQL error: {0}")]
MySql(#[from] mysql_async::Error),

#[error("Postgres error: {0}")]
Postgres(#[from] tokio_postgres::Error),

#[error("Pulsar error: {0}")]
Pulsar(
#[source]
#[backtrace]
anyhow::Error,
),

#[error(transparent)]
Internal(
Uncategorized(
#[from]
#[backtrace]
anyhow::Error,
),
}

pub type ConnectorResult<T> = Result<T, ConnectorError>;
5 changes: 2 additions & 3 deletions src/connector/src/source/cdc/external/mock_external_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@ use futures_async_stream::try_stream;
use risingwave_common::row::OwnedRow;
use risingwave_common::types::ScalarImpl;

use crate::error::ConnectorError;
use crate::error::{ConnectorError, ConnectorResult};
use crate::source::cdc::external::{
CdcOffset, CdcOffsetParseFunc, ConnectorResult, ExternalTableReader, MySqlOffset,
SchemaTableName,
CdcOffset, CdcOffsetParseFunc, ExternalTableReader, MySqlOffset, SchemaTableName,
};

#[derive(Debug)]
Expand Down
25 changes: 6 additions & 19 deletions src/connector/src/source/cdc/external/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ mod postgres;

use std::collections::HashMap;

use anyhow::{anyhow, Context};
use anyhow::Context;
use futures::stream::BoxStream;
use futures::{pin_mut, StreamExt};
use futures_async_stream::try_stream;
Expand All @@ -32,13 +32,11 @@ use risingwave_common::types::DataType;
use risingwave_common::util::iter_util::ZipEqFast;
use serde_derive::{Deserialize, Serialize};

use crate::error::ConnectorError;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::mysql_row_to_owned_row;
use crate::source::cdc::external::mock_external_table::MockExternalTableReader;
use crate::source::cdc::external::postgres::{PostgresExternalTableReader, PostgresOffset};

pub type ConnectorResult<T> = std::result::Result<T, ConnectorError>;

#[derive(Debug)]
pub enum CdcTableType {
Undefined,
Expand Down Expand Up @@ -77,10 +75,7 @@ impl CdcTableType {
Self::Postgres => Ok(ExternalTableReaderImpl::Postgres(
PostgresExternalTableReader::new(with_properties, schema).await?,
)),
_ => bail!(ConnectorError::Config(anyhow!(
"invalid external table type: {:?}",
*self
))),
_ => bail!("invalid external table type: {:?}", *self),
}
}
}
Expand Down Expand Up @@ -405,19 +400,11 @@ impl MySqlExternalTableReader {
DataType::Date => Value::from(value.into_date().0),
DataType::Time => Value::from(value.into_time().0),
DataType::Timestamp => Value::from(value.into_timestamp().0),
_ => {
return Err(ConnectorError::Internal(anyhow!(
"unsupported primary key data type: {}",
ty
)))
}
_ => bail!("unsupported primary key data type: {}", ty),
};
Ok((pk.clone(), val))
ConnectorResult::Ok((pk.clone(), val))
} else {
Err(ConnectorError::Internal(anyhow!(
"primary key {} cannot be null",
pk
)))
bail!("primary key {} cannot be null", pk);
}
})
.try_collect()?;
Expand Down
6 changes: 3 additions & 3 deletions src/connector/src/source/cdc/external/postgres.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ use thiserror_ext::AsReport;
use tokio_postgres::types::PgLsn;
use tokio_postgres::NoTls;

use crate::error::ConnectorError;
use crate::error::{ConnectorError, ConnectorResult};
use crate::parser::postgres_row_to_owned_row;
use crate::source::cdc::external::{
CdcOffset, CdcOffsetParseFunc, ConnectorResult, DebeziumOffset, ExternalTableConfig,
ExternalTableReader, SchemaTableName,
CdcOffset, CdcOffsetParseFunc, DebeziumOffset, ExternalTableConfig, ExternalTableReader,
SchemaTableName,
};

#[derive(Debug, Clone, Default, PartialEq, Serialize, Deserialize)]
Expand Down
12 changes: 6 additions & 6 deletions src/connector/src/source/pulsar/source/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::collections::HashMap;
use std::time::{SystemTime, UNIX_EPOCH};

use anyhow::{anyhow, Context};
use anyhow::Context;
use arrow_array::{Int32Array, Int64Array, RecordBatch};
use async_trait::async_trait;
use futures::StreamExt;
Expand All @@ -32,7 +32,6 @@ use risingwave_common::array::{DataChunk, StreamChunk};
use risingwave_common::catalog::ROWID_PREFIX;
use risingwave_common::{bail, ensure};

use crate::error::ConnectorError;
use crate::parser::ParserConfig;
use crate::source::pulsar::split::PulsarSplit;
use crate::source::pulsar::{PulsarEnumeratorOffset, PulsarProperties};
Expand Down Expand Up @@ -398,10 +397,11 @@ impl PulsarIcebergReader {
fn build_iceberg_configs(&self) -> anyhow::Result<HashMap<String, String>> {
let mut iceberg_configs = HashMap::new();

let bucket =
self.props.iceberg_bucket.as_ref().ok_or_else(|| {
ConnectorError::Pulsar(anyhow!("Iceberg bucket is not configured"))
})?;
let bucket = self
.props
.iceberg_bucket
.as_ref()
.context("Iceberg bucket is not configured")?;

iceberg_configs.insert(CATALOG_TYPE.to_string(), "storage".to_string());
iceberg_configs.insert(CATALOG_NAME.to_string(), "pulsar".to_string());
Expand Down
7 changes: 4 additions & 3 deletions src/connector/src/source/pulsar/topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,10 @@ pub struct Topic {
pub partition_index: Option<i32>,
}

impl ToString for Topic {
fn to_string(&self) -> String {
format!(
impl std::fmt::Display for Topic {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(
f,
"{}://{}/{}/{}",
self.domain, self.tenant, self.namespace, self.topic
)
Expand Down
Loading

0 comments on commit b803e1c

Please sign in to comment.