Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave into li0k/storage_rafactor_trx

Browse files Browse the repository at this point in the history
  • Loading branch information
Li0k committed Jun 3, 2024
2 parents 525773d + 8f63559 commit e5e2f02
Show file tree
Hide file tree
Showing 46 changed files with 614 additions and 440 deletions.
8 changes: 8 additions & 0 deletions e2e_test/sql_migration/check.slt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,14 @@ create materialized view mv3 as select * from mv2;
statement ok
REVOKE ALL PRIVILEGES ON ALL TABLES IN SCHEMA schema1 FROM user1;

query T
show secrets;
----
secret_1

statement ok
drop secret secret_1;

statement error Permission denied
drop source src;

Expand Down
5 changes: 5 additions & 0 deletions e2e_test/sql_migration/prepare.slt
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,8 @@ statement ok
create function int_42() returns int language javascript as $$
return 42;
$$;

statement ok
create secret secret_1 with (
backend = 'meta'
) as 'demo_secret';
3 changes: 3 additions & 0 deletions proto/stream_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -278,6 +278,9 @@ message ProjectNode {
repeated uint32 watermark_input_cols = 2;
repeated uint32 watermark_output_cols = 3;
repeated uint32 nondecreasing_exprs = 4;
// Whether there are likely no-op updates in the output chunks, so that eliminating them with
// `StreamChunk::eliminate_adjacent_noop_update` could be beneficial.
bool noop_update_hint = 5;
}

message FilterNode {
Expand Down
3 changes: 1 addition & 2 deletions src/connector/benches/json_vs_plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,7 @@ mod old_json_parser {
let mut errors = Vec::new();
for value in values {
let accessor = JsonAccess::new(value);
match writer
.insert(|column| accessor.access(&[&column.name], Some(&column.data_type)))
match writer.do_insert(|column| accessor.access(&[&column.name], &column.data_type))
{
Ok(_) => {}
Err(err) => errors.push(err),
Expand Down
4 changes: 2 additions & 2 deletions src/connector/src/parser/csv_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ impl CsvParser {
// The header row does not output a row, so we return early.
return Ok(());
}
writer.insert(|desc| {
writer.do_insert(|desc| {
if let Some(i) = headers.iter().position(|name| name == &desc.name) {
let value = fields.get_mut(i).map(std::mem::take).unwrap_or_default();
if value.is_empty() {
Expand All @@ -125,7 +125,7 @@ impl CsvParser {
})?;
} else {
fields.reverse();
writer.insert(|desc| {
writer.do_insert(|desc| {
if let Some(value) = fields.pop() {
if value.is_empty() {
return Ok(None);
Expand Down
1 change: 1 addition & 0 deletions src/connector/src/parser/json_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ pub struct JsonAccessBuilder {
impl AccessBuilder for JsonAccessBuilder {
#[allow(clippy::unused_async)]
async fn generate_accessor(&mut self, payload: Vec<u8>) -> ConnectorResult<AccessImpl<'_, '_>> {
// XXX: When will we enter this branch?
if payload.is_empty() {
self.value = Some("{}".into());
} else {
Expand Down
8 changes: 4 additions & 4 deletions src/connector/src/parser/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -490,7 +490,7 @@ impl SourceStreamChunkRowWriter<'_> {
/// produces one [`Datum`] by corresponding [`SourceColumnDesc`].
///
/// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
pub fn insert(
pub fn do_insert(
&mut self,
f: impl FnMut(&SourceColumnDesc) -> AccessResult<Datum>,
) -> AccessResult<()> {
Expand All @@ -501,7 +501,7 @@ impl SourceStreamChunkRowWriter<'_> {
/// produces one [`Datum`] by corresponding [`SourceColumnDesc`].
///
/// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
pub fn delete(
pub fn do_delete(
&mut self,
f: impl FnMut(&SourceColumnDesc) -> AccessResult<Datum>,
) -> AccessResult<()> {
Expand All @@ -512,7 +512,7 @@ impl SourceStreamChunkRowWriter<'_> {
/// produces two [`Datum`]s as old and new value by corresponding [`SourceColumnDesc`].
///
/// See the [struct-level documentation](SourceStreamChunkRowWriter) for more details.
pub fn update(
pub fn do_update(
&mut self,
f: impl FnMut(&SourceColumnDesc) -> AccessResult<(Datum, Datum)>,
) -> AccessResult<()> {
Expand Down Expand Up @@ -590,7 +590,7 @@ pub trait ByteStreamSourceParser: Send + Debug + Sized + 'static {
}

fn emit_empty_row<'a>(&'a mut self, mut writer: SourceStreamChunkRowWriter<'a>) {
_ = writer.insert(|_column| Ok(None));
_ = writer.do_insert(|_column| Ok(None));
}
}

Expand Down
13 changes: 5 additions & 8 deletions src/connector/src/parser/plain_parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use risingwave_common::bail;

use super::unified::json::TimestamptzHandling;
use super::unified::ChangeEvent;
use super::unified::kv_event::KvEvent;
use super::{
AccessBuilderImpl, ByteStreamSourceParser, EncodingProperties, EncodingType,
SourceStreamChunkRowWriter, SpecificParserConfig,
Expand All @@ -24,7 +24,6 @@ use crate::error::ConnectorResult;
use crate::parser::bytes_parser::BytesAccessBuilder;
use crate::parser::simd_json_parser::DebeziumJsonAccessBuilder;
use crate::parser::unified::debezium::parse_transaction_meta;
use crate::parser::unified::upsert::UpsertChangeEvent;
use crate::parser::unified::AccessImpl;
use crate::parser::upsert_parser::get_key_column_name;
use crate::parser::{BytesProperties, ParseResult, ParserFormat};
Expand Down Expand Up @@ -103,22 +102,20 @@ impl PlainParser {
};
}

// reuse upsert component but always insert
let mut row_op: UpsertChangeEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> =
UpsertChangeEvent::default();
let mut row_op: KvEvent<AccessImpl<'_, '_>, AccessImpl<'_, '_>> = KvEvent::default();

if let Some(data) = key
&& let Some(key_builder) = self.key_builder.as_mut()
{
// key is optional in format plain
row_op = row_op.with_key(key_builder.generate_accessor(data).await?);
row_op.with_key(key_builder.generate_accessor(data).await?);
}
if let Some(data) = payload {
// the data part also can be an empty vec
row_op = row_op.with_value(self.payload_builder.generate_accessor(data).await?);
row_op.with_value(self.payload_builder.generate_accessor(data).await?);
}

writer.insert(|column: &SourceColumnDesc| row_op.access_field(column))?;
writer.do_insert(|column: &SourceColumnDesc| row_op.access_field(column))?;

Ok(ParseResult::Rows)
}
Expand Down
3 changes: 2 additions & 1 deletion src/connector/src/parser/protobuf/parser.rs
Original file line number Diff line number Diff line change
Expand Up @@ -896,7 +896,8 @@ mod test {
}

fn pb_eq(a: &ProtobufAccess, field_name: &str, value: ScalarImpl) {
let d = a.access(&[field_name], None).unwrap().unwrap();
let dummy_type = DataType::Varchar;
let d = a.access(&[field_name], &dummy_type).unwrap().unwrap();
assert_eq!(d, value, "field: {} value: {:?}", field_name, d);
}

Expand Down
Loading

0 comments on commit e5e2f02

Please sign in to comment.