Skip to content

Commit

Permalink
feat(test): support customized source logic in deterministic test
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Sep 20, 2023
1 parent 869ef90 commit 41ca193
Show file tree
Hide file tree
Showing 9 changed files with 346 additions and 24 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion ci/scripts/deterministic-it-test.sh
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ buildkite-agent artifact download simulation-it-test.tar.zst .
echo "--- Extract artifacts"
tar -xvf simulation-it-test.tar.zst
mkdir target/sim
mv target/ci-sim target/sim
mv target/debug target/sim/ci-sim

echo "--- Run integration tests in deterministic simulation mode"
seq $TEST_NUM | parallel MADSIM_TEST_SEED={} NEXTEST_PROFILE=ci-sim \
Expand Down
5 changes: 3 additions & 2 deletions src/connector/src/macros.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ macro_rules! for_all_classified_sources {
{ Datagen, $crate::source::datagen::DatagenProperties, $crate::source::datagen::DatagenSplit },
{ GooglePubsub, $crate::source::google_pubsub::PubsubProperties, $crate::source::google_pubsub::PubsubSplit },
{ Nats, $crate::source::nats::NatsProperties, $crate::source::nats::split::NatsSplit },
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit }
{ S3, $crate::source::filesystem::S3Properties, $crate::source::filesystem::FsSplit },
{ Test, $crate::source::test_source::TestSourceProperties, $crate::source::test_source::TestSourceSplit}
}
$(
,$extra_args
Expand Down Expand Up @@ -152,7 +153,7 @@ macro_rules! dispatch_split_impl {
macro_rules! impl_split {
({$({ $variant_name:ident, $prop_name:ty, $split:ty}),*}) => {

#[derive(Debug, Clone, EnumAsInner, PartialEq, Hash)]
#[derive(Debug, Clone, EnumAsInner, PartialEq)]
pub enum SplitImpl {
$(
$variant_name($split),
Expand Down
2 changes: 2 additions & 0 deletions src/connector/src/source/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ mod common;
pub mod external;
mod manager;
mod mock_external_table;
pub mod test_source;

pub use manager::{SourceColumnDesc, SourceColumnType};
pub use mock_external_table::MockExternalTableReader;

Expand Down
239 changes: 239 additions & 0 deletions src/connector/src/source/test_source.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,239 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;
use std::sync::{Arc, OnceLock};

use anyhow::anyhow;
use async_trait::async_trait;
use parking_lot::Mutex;
use risingwave_common::types::JsonbVal;
use serde_derive::{Deserialize, Serialize};

use crate::parser::ParserConfig;
use crate::source::{
BoxSourceWithStateStream, Column, SourceContextRef, SourceEnumeratorContextRef,
SourceProperties, SplitEnumerator, SplitId, SplitMetaData, SplitReader, TryFromHashmap,
};

/// Boxed, sendable `FnMut` callback that produces the splits of the test source.
///
/// It receives the source properties and the enumerator context and returns
/// either the list of [`TestSourceSplit`]s or an error. It is the callback
/// invoked by `TestSourceSplitEnumerator::list_splits`.
pub type BoxListSplits = Box<
dyn FnMut(
TestSourceProperties,
SourceEnumeratorContextRef,
) -> anyhow::Result<Vec<TestSourceSplit>>
+ Send
+ 'static,
>;

/// Boxed, sendable `FnMut` callback that builds the data stream of the test source.
///
/// It receives everything a `TestSourceSplitReader` was constructed with
/// (properties, assigned splits, parser config, source context and optional
/// columns) and returns the resulting `BoxSourceWithStateStream`. It is the
/// callback invoked by `TestSourceSplitReader::into_stream`.
pub type BoxIntoSourceStream = Box<
dyn FnMut(
TestSourceProperties,
Vec<TestSourceSplit>,
ParserConfig,
SourceContextRef,
Option<Vec<Column>>,
) -> BoxSourceWithStateStream
+ Send
+ 'static,
>;

/// A customized source implementation made of two boxed callbacks: one that
/// lists splits and one that turns a reader's state into a stream.
pub struct BoxSource {
// Callback used by the split enumerator to list splits.
list_split: BoxListSplits,
// Callback used by the split reader to build its output stream.
into_source_stream: BoxIntoSourceStream,
}

impl BoxSource {
pub fn new(
list_splits: impl FnMut(
TestSourceProperties,
SourceEnumeratorContextRef,
) -> anyhow::Result<Vec<TestSourceSplit>>
+ Send
+ 'static,
into_source_stream: impl FnMut(
TestSourceProperties,
Vec<TestSourceSplit>,
ParserConfig,
SourceContextRef,
Option<Vec<Column>>,
) -> BoxSourceWithStateStream
+ Send
+ 'static,
) -> BoxSource {
BoxSource {
list_split: Box::new(list_splits),
into_source_stream: Box::new(into_source_stream),
}
}
}

/// Holder for the currently registered [`BoxSource`], if any.
struct TestSourceRegistry {
// `None` while no source is registered; guarded by a mutex because the
// callbacks inside `BoxSource` are `FnMut` and need exclusive access.
box_source: Arc<Mutex<Option<BoxSource>>>,
}

impl TestSourceRegistry {
    /// Creates a registry that has no source registered yet.
    fn new() -> Self {
        let empty_slot = Mutex::new(None);
        Self {
            box_source: Arc::new(empty_slot),
        }
    }
}

/// Returns the global test source registry, initializing it with an empty
/// registry the first time it is requested.
fn get_registry() -> &'static TestSourceRegistry {
    static REGISTRY_CELL: OnceLock<TestSourceRegistry> = OnceLock::new();
    REGISTRY_CELL.get_or_init(TestSourceRegistry::new)
}

/// Guard returned by `registry_test_source`.
///
/// Its `Drop` implementation removes the registered source from the global
/// registry, so the guard must be kept alive for as long as the test source
/// is needed. `#[must_use]` makes the compiler warn when the guard is
/// discarded right away, which would unregister the source immediately.
#[must_use = "dropping the guard immediately unregisters the test source"]
pub struct TestSourceRegistryGuard;

impl Drop for TestSourceRegistryGuard {
/// Clears the global registry when the guard goes out of scope.
///
/// # Panics
///
/// Panics if the registry was already empty, i.e. there was no registered
/// source to remove.
fn drop(&mut self) {
// `take()` leaves `None` in the registry and yields the previous value.
assert!(get_registry().box_source.lock().take().is_some());
}
}

/// Registers `box_source` in the global registry and returns a guard whose
/// drop removes it again.
///
/// # Panics
///
/// Panics if a source is already registered. Note that `replace` stores the
/// new source before the check runs, so when this panics the previously
/// registered source has already been overwritten.
pub fn registry_test_source(box_source: BoxSource) -> TestSourceRegistryGuard {
assert!(get_registry()
.box_source
.lock()
.replace(box_source)
.is_none());
TestSourceRegistryGuard
}

/// Connector name of the test source; used as `SOURCE_NAME` of
/// `TestSourceProperties`.
pub const TEST_CONNECTOR: &str = "test";

/// Properties of the test source: the key/value map passed to
/// `try_from_hashmap`, stored unchanged.
#[derive(Clone, Debug)]
pub struct TestSourceProperties {
// The property map exactly as supplied by the caller.
properties: HashMap<String, String>,
}

impl TryFromHashmap for TestSourceProperties {
    /// Wraps `props` into [`TestSourceProperties`].
    ///
    /// # Errors
    ///
    /// Returns an error unless the crate is compiled with the `madsim` or
    /// `test` cfg, since the test source is only meant for tests.
    fn try_from_hashmap(props: HashMap<String, String>) -> anyhow::Result<Self> {
        // Reject the connector outside of test/simulation builds.
        if !cfg!(any(madsim, test)) {
            return Err(anyhow!("test source only available at test"));
        }
        Ok(Self { properties: props })
    }
}

/// Split of the test source. It is serialized to / restored from JSON through
/// its serde derives (see the `SplitMetaData` implementation).
#[derive(Clone, Debug, PartialEq, Serialize, Deserialize)]
pub struct TestSourceSplit {
// Identifier returned by `SplitMetaData::id`.
pub id: SplitId,
// Free-form key/value properties attached to the split.
pub properties: HashMap<String, String>,
// Current offset; overwritten by `SplitMetaData::update_with_offset`.
pub offset: String,
}

impl SplitMetaData for TestSourceSplit {
    /// Returns a copy of this split's identifier.
    fn id(&self) -> SplitId {
        self.id.clone()
    }

    /// Serializes the split to a JSON value.
    ///
    /// # Panics
    ///
    /// Panics if `serde_json` fails to serialize the split.
    fn encode_to_json(&self) -> JsonbVal {
        // `&Self` implements `Serialize`, so serialize by reference instead of
        // deep-cloning the split (including its property map) first.
        serde_json::to_value(self).unwrap().into()
    }

    /// Rebuilds a split from the JSON produced by [`Self::encode_to_json`].
    ///
    /// # Errors
    ///
    /// Returns an error if the JSON cannot be deserialized into a split.
    fn restore_from_json(value: JsonbVal) -> anyhow::Result<Self> {
        serde_json::from_value(value.take()).map_err(|e| anyhow!(e))
    }

    /// Replaces the split's offset with `start_offset`; never fails.
    fn update_with_offset(&mut self, start_offset: String) -> anyhow::Result<()> {
        self.offset = start_offset;
        Ok(())
    }
}

/// Split enumerator of the test source; it forwards `list_splits` to the
/// callback registered in the global registry.
pub struct TestSourceSplitEnumerator {
// Properties handed (cloned) to the callback on every `list_splits` call.
properties: TestSourceProperties,
// Enumerator context handed (cloned) to the callback on every call.
context: SourceEnumeratorContextRef,
}

#[async_trait]
impl SplitEnumerator for TestSourceSplitEnumerator {
    type Properties = TestSourceProperties;
    type Split = TestSourceSplit;

    /// Creates an enumerator that simply remembers its arguments.
    async fn new(
        properties: Self::Properties,
        context: SourceEnumeratorContextRef,
    ) -> anyhow::Result<Self> {
        let enumerator = Self {
            properties,
            context,
        };
        Ok(enumerator)
    }

    /// Lists splits by calling the registered `list_split` callback with
    /// clones of the stored properties and context.
    ///
    /// # Panics
    ///
    /// Panics if no source has been registered.
    async fn list_splits(&mut self) -> anyhow::Result<Vec<Self::Split>> {
        // The registry lock stays held while the callback runs.
        let mut slot = get_registry().box_source.lock();
        let source = slot.as_mut().expect("should have init");
        (source.list_split)(self.properties.clone(), self.context.clone())
    }
}

/// Split reader of the test source. It only stores its constructor arguments
/// and passes them to the registered callback in `into_stream`.
pub struct TestSourceSplitReader {
// Source properties passed to the callback.
properties: TestSourceProperties,
// Splits this reader was created with.
state: Vec<TestSourceSplit>,
// Parser configuration passed to the callback.
parser_config: ParserConfig,
// Source context passed to the callback.
source_ctx: SourceContextRef,
// Optional column list passed to the callback.
columns: Option<Vec<Column>>,
}

#[async_trait]
impl SplitReader for TestSourceSplitReader {
    type Properties = TestSourceProperties;
    type Split = TestSourceSplit;

    /// Creates a reader that simply remembers its arguments.
    async fn new(
        properties: Self::Properties,
        state: Vec<Self::Split>,
        parser_config: ParserConfig,
        source_ctx: SourceContextRef,
        columns: Option<Vec<Column>>,
    ) -> anyhow::Result<Self> {
        let reader = Self {
            properties,
            state,
            parser_config,
            source_ctx,
            columns,
        };
        Ok(reader)
    }

    /// Consumes the reader and builds its stream by calling the registered
    /// `into_source_stream` callback with the stored arguments.
    ///
    /// # Panics
    ///
    /// Panics if no source has been registered.
    fn into_stream(self) -> BoxSourceWithStateStream {
        let Self {
            properties,
            state,
            parser_config,
            source_ctx,
            columns,
        } = self;
        // The registry lock stays held while the callback runs.
        let mut slot = get_registry().box_source.lock();
        let source = slot.as_mut().expect("should have init");
        (source.into_source_stream)(properties, state, parser_config, source_ctx, columns)
    }
}

/// Ties the test source's split, enumerator and reader types together and
/// names the connector `TEST_CONNECTOR`.
impl SourceProperties for TestSourceProperties {
type Split = TestSourceSplit;
type SplitEnumerator = TestSourceSplitEnumerator;
type SplitReader = TestSourceSplitReader;

const SOURCE_NAME: &'static str = TEST_CONNECTOR;
}
4 changes: 4 additions & 0 deletions src/frontend/src/handler/create_source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ use risingwave_connector::source::{
SourceEncode, SourceFormat, SourceStruct, GOOGLE_PUBSUB_CONNECTOR, KAFKA_CONNECTOR,
KINESIS_CONNECTOR, NATS_CONNECTOR, NEXMARK_CONNECTOR, PULSAR_CONNECTOR,
};
use risingwave_connector::source::test_source::TEST_CONNECTOR;
use risingwave_pb::catalog::{
PbSchemaRegistryNameStrategy, PbSource, StreamSourceInfo, WatermarkDesc,
};
Expand Down Expand Up @@ -907,6 +908,9 @@ static CONNECTORS_COMPATIBLE_FORMATS: LazyLock<HashMap<String, HashMap<Format, V
NATS_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json],
),
TEST_CONNECTOR => hashmap!(
Format::Plain => vec![Encode::Json],
)
))
});

Expand Down
2 changes: 1 addition & 1 deletion src/stream/src/executor/wrapper/schema_check.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ pub async fn schema_check(info: Arc<ExecutorInfo>, input: impl MessageStream) {
}
Message::Barrier(_) => Ok(()),
}
.unwrap_or_else(|e| panic!("schema check failed on {}: {}", info.identity, e));
.unwrap_or_else(|e| panic!("schema check failed on {:?}: {}", info, e));

yield message;
}
Expand Down
1 change: 1 addition & 0 deletions src/tests/simulation/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ serde_json = "1.0.107"
sqllogictest = "0.15.3"
tempfile = "3"
tokio = { version = "0.2.23", package = "madsim-tokio" }
tokio-stream = "0.1"
tokio-postgres = "0.7"
tracing = "0.1"
tracing-subscriber = { version = "0.3", features = ["env-filter"] }
Expand Down
Loading

0 comments on commit 41ca193

Please sign in to comment.