Skip to content

Commit

Permalink
Merge branch 'main' into yiming/non-async-complete-barrier
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 committed Feb 18, 2024
2 parents 232a28a + a256378 commit 5b0c7c1
Show file tree
Hide file tree
Showing 107 changed files with 3,039 additions and 3,946 deletions.
10 changes: 10 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ members = [
"src/expr/macro",
"src/expr/udf",
"src/frontend",
"src/frontend/macro",
"src/frontend/planner_test",
"src/java_binding",
"src/jni_core",
Expand Down
8 changes: 4 additions & 4 deletions e2e_test/batch/catalog/issue_8791.slt.part
Original file line number Diff line number Diff line change
@@ -1,16 +1,16 @@
# UNION and other complex queries should also be in local mode
query I
-SELECT name FROM pg_catalog.pg_settings union select 'a';
+SELECT amname FROM pg_catalog.pg_am union select 'a';
----
a

query T
-SELECT name FROM (SELECT pg_catalog.lower(name) AS name FROM pg_catalog.pg_settings UNION ALL SELECT 'session authorization' UNION ALL SELECT 'all') ss WHERE substring(name,1,0)=''
-LIMIT 1000
+SELECT amname FROM (SELECT pg_catalog.lower(amname) AS amname FROM pg_catalog.pg_am UNION ALL SELECT 'session authorization' UNION ALL SELECT 'all') ss WHERE substring(amname,1,0)=''
+LIMIT 1000;
----
session authorization
all

query I
-with q as ( select name FROM pg_catalog.pg_settings ) select * from q;
+with q as ( select amname FROM pg_catalog.pg_am ) select * from q;
----
30 changes: 15 additions & 15 deletions e2e_test/batch/catalog/pg_class.slt.part
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
query ITIT
SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class ORDER BY oid limit 15;
----
-1 pg_type 1 v
-2 pg_namespace 1 v
-3 pg_cast 1 v
-4 pg_matviews 1 v
-5 pg_user 1 v
-6 pg_class 1 v
-7 pg_index 1 v
-8 pg_opclass 1 v
-9 pg_collation 1 v
-10 pg_am 1 v
-11 pg_operator 1 v
-12 pg_views 1 v
-13 pg_attribute 1 v
-14 pg_database 1 v
+1 columns 1 v
+2 tables 1 v
+3 views 1 v
+4 pg_am 1 v
+5 pg_attrdef 1 v
+6 pg_attribute 1 v
+7 pg_auth_members 1 v
+8 pg_cast 1 r
+9 pg_class 1 v
+10 pg_collation 1 v
+11 pg_constraint 1 v
+12 pg_conversion 1 v
+13 pg_database 1 v
+14 pg_depend 1 v
+15 pg_description 1 v

query ITIT
SELECT oid,relname,relowner,relkind FROM pg_catalog.pg_class WHERE oid = 'pg_namespace'::regclass;
----
-2 pg_namespace 1 v
+24 pg_namespace 1 v
45 changes: 44 additions & 1 deletion e2e_test/batch/catalog/pg_settings.slt.part
Original file line number Diff line number Diff line change
@@ -1,6 +1,49 @@
query TT
-SELECT * FROM pg_catalog.pg_settings;
+SELECT name FROM pg_catalog.pg_settings order by name;
----
application_name
background_ddl
batch_enable_distributed_dml
batch_parallelism
bytea_output
client_encoding
client_min_messages
create_compaction_group_for_mv
datestyle
extra_float_digits
idle_in_transaction_session_timeout
intervalstyle
lock_timeout
max_split_range_gap
query_epoch
query_mode
row_security
rw_batch_enable_lookup_join
rw_batch_enable_sort_agg
rw_enable_join_ordering
rw_enable_share_plan
rw_enable_two_phase_agg
rw_force_split_distinct_agg
rw_force_two_phase_agg
rw_implicit_flush
rw_streaming_allow_jsonb_in_stream_key
rw_streaming_enable_bushy_join
rw_streaming_enable_delta_join
rw_streaming_over_window_cache_policy
search_path
server_encoding
server_version
server_version_num
sink_decouple
standard_conforming_strings
statement_timeout
streaming_enable_arrangement_backfill
streaming_parallelism
streaming_rate_limit
synchronize_seqscans
timezone
transaction_isolation
visibility_mode

query TT
SELECT * FROM pg_catalog.pg_settings where name='dummy';
Expand Down
36 changes: 12 additions & 24 deletions src/batch/src/executor/sys_row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,7 @@
use futures_async_stream::try_stream;
use itertools::Itertools;
use risingwave_common::array::DataChunk;
-use risingwave_common::catalog::{ColumnDesc, ColumnId, Schema, SysCatalogReaderRef, TableId};
-use risingwave_common::row::{OwnedRow, Row};
-use risingwave_common::types::ToOwnedDatum;
+use risingwave_common::catalog::{ColumnDesc, Schema, SysCatalogReaderRef, TableId};
use risingwave_pb::batch_plan::plan_node::NodeBody;

use crate::error::{BatchError, Result};
Expand All @@ -29,7 +27,7 @@ use crate::task::BatchTaskContext;
pub struct SysRowSeqScanExecutor {
table_id: TableId,
schema: Schema,
-column_ids: Vec<ColumnId>,
+column_indices: Vec<usize>,
identity: String,

sys_catalog_reader: SysCatalogReaderRef,
Expand All @@ -39,14 +37,14 @@ impl SysRowSeqScanExecutor {
pub fn new(
table_id: TableId,
schema: Schema,
-column_id: Vec<ColumnId>,
+column_indices: Vec<usize>,
identity: String,
sys_catalog_reader: SysCatalogReaderRef,
) -> Self {
Self {
table_id,
schema,
-column_ids: column_id,
+column_indices,
identity,
sys_catalog_reader,
}
Expand Down Expand Up @@ -78,12 +76,15 @@ impl BoxedExecutorBuilder for SysRowSeqScanExecutorBuilder {
.map(|column_desc| ColumnDesc::from(column_desc.clone()))
.collect_vec();

-let column_ids = column_descs.iter().map(|d| d.column_id).collect_vec();
+let column_indices = column_descs
+.iter()
+.map(|d| d.column_id.get_id() as usize)
+.collect_vec();
let schema = Schema::new(column_descs.iter().map(Into::into).collect_vec());
Ok(Box::new(SysRowSeqScanExecutor::new(
table_id,
schema,
-column_ids,
+column_indices,
source.plan_node().get_identity().clone(),
sys_catalog_reader,
)))
Expand All @@ -107,26 +108,13 @@ impl Executor for SysRowSeqScanExecutor {
impl SysRowSeqScanExecutor {
#[try_stream(boxed, ok = DataChunk, error = BatchError)]
async fn do_executor(self: Box<Self>) {
-let rows = self
+let chunk = self
.sys_catalog_reader
.read_table(&self.table_id)
.await
.map_err(BatchError::SystemTable)?;
-let filtered_rows = rows
-.iter()
-.map(|row| {
-let datums = self
-.column_ids
-.iter()
-.map(|column_id| row.datum_at(column_id.get_id() as usize).to_owned_datum())
-.collect_vec();
-OwnedRow::new(datums)
-})
-.collect_vec();
-
-if !filtered_rows.is_empty() {
-let chunk = DataChunk::from_rows(&filtered_rows, &self.schema.data_types());
-yield chunk
+if chunk.cardinality() != 0 {
+yield chunk.project(&self.column_indices);
}
}
}
4 changes: 2 additions & 2 deletions src/common/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,9 @@ use risingwave_pb::catalog::HandleConflictBehavior as PbHandleConflictBehavior;
use risingwave_pb::plan_common::ColumnDescVersion;
pub use schema::{test_utils as schema_test_utils, Field, FieldDisplay, Schema};

+use crate::array::DataChunk;
pub use crate::constants::hummock;
use crate::error::BoxedError;
-use crate::row::OwnedRow;
use crate::types::DataType;

/// The global version of the catalog.
Expand Down Expand Up @@ -146,7 +146,7 @@ pub fn cdc_table_name_column_desc() -> ColumnDesc {
/// The local system catalog reader in the frontend node.
#[async_trait]
pub trait SysCatalogReader: Sync + Send + 'static {
-async fn read_table(&self, table_id: &TableId) -> Result<Vec<OwnedRow>, BoxedError>;
+async fn read_table(&self, table_id: &TableId) -> Result<DataChunk, BoxedError>;
}

pub type SysCatalogReaderRef = Arc<dyn SysCatalogReader>;
Expand Down
5 changes: 5 additions & 0 deletions src/common/src/util/chunk_coalesce.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,11 @@ impl DataChunkBuilder {
}
}

/// Build a data chunk from the current buffer.
pub fn finish(mut self) -> DataChunk {
self.build_data_chunk()
}

fn append_one_row_internal(&mut self, data_chunk: &DataChunk, row_idx: usize) {
self.do_append_one_row_from_datums(data_chunk.row_at(row_idx).0.iter());
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ risingwave_common_service = { workspace = true }
risingwave_connector = { workspace = true }
risingwave_dml = { workspace = true }
risingwave_expr = { workspace = true }
risingwave_frontend_macro = { path = "macro" }
risingwave_hummock_sdk = { workspace = true }
risingwave_object_store = { workspace = true }
risingwave_pb = { workspace = true }
Expand Down
16 changes: 16 additions & 0 deletions src/frontend/macro/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
[package]
name = "risingwave_frontend_macro"
version = { workspace = true }
edition = { workspace = true }
homepage = { workspace = true }
keywords = { workspace = true }
license = { workspace = true }
repository = { workspace = true }

[lib]
proc-macro = true

[dependencies]
proc-macro2 = "1"
quote = "1"
syn = { version = "2", features = ["full", "extra-traits"] }
Loading

0 comments on commit 5b0c7c1

Please sign in to comment.