Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/cargo/chrono-0.4.31
Browse files Browse the repository at this point in the history
  • Loading branch information
TennyZhuang authored Sep 18, 2023
2 parents 13fa64f + 784fe56 commit 16a2e9f
Show file tree
Hide file tree
Showing 23 changed files with 713 additions and 343 deletions.
8 changes: 4 additions & 4 deletions .config/hakari.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,10 +15,10 @@ resolver = "2"
# Add triples corresponding to platforms commonly used by developers here.
# https://doc.rust-lang.org/rustc/platform-support.html
platforms = [
"x86_64-unknown-linux-gnu",
"aarch64-unknown-linux-gnu",
"x86_64-apple-darwin",
"aarch64-apple-darwin",
# "x86_64-unknown-linux-gnu",
# "aarch64-unknown-linux-gnu",
# "x86_64-apple-darwin",
# "aarch64-apple-darwin",
]

# Write out exact versions rather than a semver range. (Defaults to false.)
Expand Down
35 changes: 21 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

24 changes: 24 additions & 0 deletions e2e_test/batch/basic/func.slt.part
Original file line number Diff line number Diff line change
Expand Up @@ -570,6 +570,30 @@ select regexp_replace('💩💩💩💩💩foo🤔️bar亲爱的😭baz这不
----
💩💩💩💩💩foo🤔️bar亲爱的😭这是🥵爱情❤️‍🔥

# Positive Lookahead
query T
select regexp_replace('foobarbaz', 'a(?=r)', 'X');
----
foobXrbaz

# Negative Lookahead
query T
select regexp_replace('chocolate', 'o(?!c)', 'X');
----
chocXlate

# Positive Lookbehind
query T
select regexp_replace('foobarXaz', '(?<=X)a', 'X');
----
foobarXXz

# Negative Lookbehind
query T
select regexp_replace('foobarXaz', '(?<!X)a', 'X');
----
foobXrXaz

query T
select regexp_count('ABCABCAXYaxy', 'A.');
----
Expand Down
29 changes: 29 additions & 0 deletions e2e_test/udf/udf.slt
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,35 @@ select (extract_tcp_info(E'\\x45000034a8a8400040065b8ac0a8000ec0a80001035d20b6d9
----
192.168.0.14 192.168.0.1 861 8374

# steaming
# to ensure UDF & UDTF respect visibility

statement ok
create table t (x int);

statement ok
create materialized view mv as select gcd(x, x), series(x) from t where x <> 2;

statement ok
insert into t values (1), (2), (3);

statement ok
flush;

query II
select * from mv;
----
1 0
3 0
3 1
3 2

statement ok
drop materialized view mv;

statement ok
drop table t;

# error handling

statement error
Expand Down
6 changes: 4 additions & 2 deletions src/common/src/array/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ use crate::util::iter_util::ZipEqDebug;

// Implement bi-directional `From` between `DataChunk` and `arrow_array::RecordBatch`.

// note: DataChunk -> arrow RecordBatch will IGNORE the visibilities.
impl TryFrom<&DataChunk> for arrow_array::RecordBatch {
type Error = ArrayError;

Expand All @@ -47,8 +48,9 @@ impl TryFrom<&DataChunk> for arrow_array::RecordBatch {
.collect();

let schema = Arc::new(Schema::new(fields));

arrow_array::RecordBatch::try_new(schema, columns)
let opts =
arrow_array::RecordBatchOptions::default().with_row_count(Some(chunk.capacity()));
arrow_array::RecordBatch::try_new_with_options(schema, columns, &opts)
.map_err(|err| ArrayError::ToArrow(err.to_string()))
}
}
Expand Down
22 changes: 22 additions & 0 deletions src/common/src/array/data_chunk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

use std::borrow::Cow;
use std::fmt::Display;
use std::hash::BuildHasher;
use std::sync::Arc;
Expand Down Expand Up @@ -265,6 +266,27 @@ impl DataChunk {
}
}

/// Convert the chunk to compact format.
///
/// If the chunk is not compacted, return a new compacted chunk, otherwise return a reference to self.
pub fn compact_cow(&self) -> Cow<'_, Self> {
match &self.vis2 {
Vis::Compact(_) => Cow::Borrowed(self),
Vis::Bitmap(visibility) => {
let cardinality = visibility.count_ones();
let columns = self
.columns
.iter()
.map(|col| {
let array = col;
array.compact(visibility, cardinality).into()
})
.collect::<Vec<_>>();
Cow::Owned(Self::new(columns, cardinality))
}
}
}

pub fn from_protobuf(proto: &PbDataChunk) -> ArrayResult<Self> {
let mut columns = vec![];
for any_col in proto.get_columns() {
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#![feature(async_fn_in_trait)]
#![feature(associated_type_defaults)]
#![feature(impl_trait_in_assoc_type)]
#![feature(iter_from_generator)]

use std::time::Duration;

Expand Down
55 changes: 55 additions & 0 deletions src/connector/src/sink/formatter/append_only.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::Op;

use super::{Result, SinkFormatter, StreamChunk};
use crate::sink::encoder::RowEncoder;
use crate::tri;

pub struct AppendOnlyFormatter<KE, VE> {
key_encoder: KE,
val_encoder: VE,
}

impl<KE, VE> AppendOnlyFormatter<KE, VE> {
pub fn new(key_encoder: KE, val_encoder: VE) -> Self {
Self {
key_encoder,
val_encoder,
}
}
}

impl<KE: RowEncoder, VE: RowEncoder> SinkFormatter for AppendOnlyFormatter<KE, VE> {
type K = KE::Output;
type V = VE::Output;

fn format_chunk(
&self,
chunk: &StreamChunk,
) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>> {
std::iter::from_generator(|| {
for (op, row) in chunk.rows() {
if op != Op::Insert {
continue;
}
let event_key_object = Some(tri!(self.key_encoder.encode(row)));
let event_object = Some(tri!(self.val_encoder.encode(row)));

yield Ok((event_key_object, event_object))
}
})
}
}
50 changes: 50 additions & 0 deletions src/connector/src/sink/formatter/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
// Copyright 2023 RisingWave Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::array::StreamChunk;

use crate::sink::Result;

mod append_only;
mod upsert;

pub use append_only::AppendOnlyFormatter;
pub use upsert::UpsertFormatter;

/// Transforms a `StreamChunk` into a sequence of key-value pairs according a specific format,
/// for example append-only, upsert or debezium.
pub trait SinkFormatter {
type K;
type V;

fn format_chunk(
&self,
chunk: &StreamChunk,
) -> impl Iterator<Item = Result<(Option<Self::K>, Option<Self::V>)>>;
}

/// `tri!` in generators yield `Err` and return `()`
/// `?` in generators return `Err`
#[macro_export]
macro_rules! tri {
($expr:expr) => {
match $expr {
Ok(val) => val,
Err(err) => {
yield Err(err);
return;
}
}
};
}
Loading

0 comments on commit 16a2e9f

Please sign in to comment.