Skip to content

Commit

Permalink
Merge branch 'release-1.8' into auto-release-1.8-d4ec7f7d450576665543…
Browse files Browse the repository at this point in the history
…57b109495421bbc33f2a
  • Loading branch information
st1page authored Mar 26, 2024
2 parents 8bdcdf1 + a34347e commit 2313f82
Show file tree
Hide file tree
Showing 11 changed files with 145 additions and 296 deletions.
10 changes: 10 additions & 0 deletions ci/scripts/e2e-source-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,20 @@ echo "--- mysql & postgres cdc validate test"
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.mysql.slt'
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.validate.postgres.slt'

echo "--- cdc share source test"
# cdc share stream test cases
export MYSQL_HOST=mysql MYSQL_TCP_PORT=3306 MYSQL_PWD=123456
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.share_stream.slt'

# create a share source and check whether heartbeat message is received
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.create_source_job.slt'
table_id=`psql -U root -h localhost -p 4566 -d dev -t -c "select id from rw_internal_tables where name like '%mysql_source%';" | xargs`;
table_count=`psql -U root -h localhost -p 4566 -d dev -t -c "select count(*) from rw_table(${table_id}, public);" | xargs`;
if [ $table_count -eq 0 ]; then
echo "ERROR: internal table of cdc share source is empty!"
exit 1
fi

echo "--- mysql & postgres load and check"
sqllogictest -p 4566 -d dev './e2e_test/source/cdc/cdc.load.slt'
# wait for cdc loading
Expand Down
15 changes: 15 additions & 0 deletions e2e_test/source/cdc/cdc.create_source_job.slt
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
control substitution on

# create a cdc source job, which format fixed to `FORMAT PLAIN ENCODE JSON`
statement ok
create source mysql_source with (
connector = 'mysql-cdc',
hostname = '${MYSQL_HOST:localhost}',
port = '${MYSQL_TCP_PORT:8306}',
username = 'rwcdc',
password = '${MYSQL_PWD:}',
database.name = 'mytest',
server.id = '5001'
);

sleep 2s
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ public static void validateSource(ConnectorServiceProto.ValidateSourceRequest re

boolean isCdcSourceJob = request.getIsSourceJob();
boolean isBackfillTable = request.getIsBackfillTable();
LOG.info(
"source_id: {}, is_cdc_source_job: {}, is_backfill_table: {}",
request.getSourceId(),
isCdcSourceJob,
isBackfillTable);

TableSchema tableSchema = TableSchema.fromProto(request.getTableSchema());
switch (request.getSourceType()) {
Expand Down
20 changes: 9 additions & 11 deletions src/cmd_all/src/bin/risingwave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ use anyhow::Result;
use clap::error::ErrorKind;
use clap::{command, ArgMatches, Args, Command, FromArgMatches};
use risingwave_cmd::{compactor, compute, ctl, frontend, meta};
use risingwave_cmd_all::{PlaygroundOpts, SingleNodeOpts, StandaloneOpts};
use risingwave_cmd_all::{SingleNodeOpts, StandaloneOpts};
use risingwave_common::git_sha;
use risingwave_compactor::CompactorOpts;
use risingwave_compute::ComputeNodeOpts;
Expand Down Expand Up @@ -123,7 +123,7 @@ impl Component {
Self::Frontend => frontend(parse_opts(matches)),
Self::Compactor => compactor(parse_opts(matches)),
Self::Ctl => ctl(parse_opts(matches)),
Self::Playground => playground(parse_opts(matches)),
Self::Playground => single_node(SingleNodeOpts::new_for_playground()),
Self::Standalone => standalone(parse_opts(matches)),
Self::SingleNode => single_node(parse_opts(matches)),
}
Expand Down Expand Up @@ -151,7 +151,7 @@ impl Component {
Component::Frontend => FrontendOpts::augment_args(cmd),
Component::Compactor => CompactorOpts::augment_args(cmd),
Component::Ctl => CtlOpts::augment_args(cmd),
Component::Playground => PlaygroundOpts::augment_args(cmd),
Component::Playground => cmd,
Component::Standalone => StandaloneOpts::augment_args(cmd),
Component::SingleNode => SingleNodeOpts::augment_args(cmd),
}
Expand All @@ -161,8 +161,14 @@ impl Component {
fn commands() -> Vec<Command> {
Self::iter()
.map(|c| {
let is_playground = matches!(c, Component::Playground);
let name: &'static str = c.into();
let command = Command::new(name).visible_aliases(c.aliases());
let command = if is_playground {
command.hide(true)
} else {
command
};
c.augment_args(command)
})
.collect()
Expand Down Expand Up @@ -221,14 +227,6 @@ fn main() -> Result<()> {
Ok(())
}

fn playground(opts: PlaygroundOpts) {
let settings = risingwave_rt::LoggerSettings::from_opts(&opts)
.with_target("risingwave_storage", Level::WARN)
.with_thread_name(true);
risingwave_rt::init_risingwave_logger(settings);
risingwave_rt::main_okk(risingwave_cmd_all::playground(opts)).unwrap();
}

fn standalone(opts: StandaloneOpts) {
let opts = risingwave_cmd_all::parse_standalone_opt_args(&opts);
let settings = risingwave_rt::LoggerSettings::from_opts(&opts)
Expand Down
20 changes: 0 additions & 20 deletions src/cmd_all/src/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,3 @@ use std::ffi::OsString;
pub fn osstrs<T: Into<OsString> + AsRef<std::ffi::OsStr>>(s: impl AsRef<[T]>) -> Vec<OsString> {
s.as_ref().iter().map(OsString::from).collect()
}

pub enum RisingWaveService {
Compute(Vec<OsString>),
Meta(Vec<OsString>),
Frontend(Vec<OsString>),
#[allow(dead_code)]
Compactor(Vec<OsString>),
}

impl RisingWaveService {
/// Extend additional arguments to the service.
pub fn extend_args(&mut self, args: &[&str]) {
match self {
RisingWaveService::Compute(args0)
| RisingWaveService::Meta(args0)
| RisingWaveService::Frontend(args0)
| RisingWaveService::Compactor(args0) => args0.extend(args.iter().map(|s| s.into())),
}
}
}
2 changes: 0 additions & 2 deletions src/cmd_all/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,10 @@
#![feature(lazy_cell)]

mod common;
pub mod playground;
mod standalone;

pub mod single_node;

pub use playground::*;
pub use single_node::*;
pub use standalone::*;

Expand Down
Loading

0 comments on commit 2313f82

Please sign in to comment.