Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/risingwavelabs/risingwave i…
Browse files Browse the repository at this point in the history
…nto li0k/storage_support_merge_cg_part
  • Loading branch information
Li0k committed Aug 27, 2024
2 parents b3a4f7c + 22926d6 commit 596ddbb
Show file tree
Hide file tree
Showing 31 changed files with 398 additions and 194 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions proto/batch_plan.proto
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,13 @@ message SourceNode {
map<string, secret.SecretRef> secret_refs = 6;
}

// Batch plan node for scanning an Iceberg table. It is carried in `PlanNode.iceberg_scan`
// and is emitted by the frontend's `BatchIcebergScan` plan node.
message IcebergScanNode {
// Column catalogs of the scan output; the executor derives its output schema
// (field name + data type) from each entry's `column_desc`.
repeated plan_common.ColumnCatalog columns = 1;
// Connector `WITH` options; combined with `secret_refs` to extract the connector properties.
map<string, string> with_properties = 2;
// Serialized splits, filled in by the batch scheduler from the source partition info.
// The executor builder currently requires exactly one entry.
repeated bytes split = 3;
// Secret references accompanying `with_properties`.
map<string, secret.SecretRef> secret_refs = 4;
}

message FileScanNode {
enum FileFormat {
FILE_FORMAT_UNSPECIFIED = 0;
Expand Down Expand Up @@ -365,6 +372,7 @@ message PlanNode {
MaxOneRowNode max_one_row = 36;
LogRowSeqScanNode log_row_seq_scan = 37;
FileScanNode file_scan = 38;
IcebergScanNode iceberg_scan = 39;
// The following nodes are used for testing.
bool block_executor = 100;
bool busy_loop_executor = 101;
Expand Down
4 changes: 4 additions & 0 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -937,6 +937,10 @@ message TableStats {
int64 total_key_size = 1;
int64 total_value_size = 2;
int64 total_key_count = 3;

// `total_compressed_size` represents the size that the table takes up in the output SST.
// This field is only filled and used by CN flushes, not by compactor compaction.
uint64 total_compressed_size = 4;
}

message HummockVersionStats {
Expand Down
4 changes: 2 additions & 2 deletions risedev.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ profile:
# config-path: src/config/example.toml
steps:
# If you want to use the local s3 storage, enable the following line
# - use: minio
- use: minio

# If you want to use aws-s3, configure AK and SK in env var and enable the following lines:
# - use: aws-s3
Expand All @@ -40,7 +40,7 @@ profile:
- use: frontend

# If you want to enable compactor, uncomment the following line, and enable either minio or aws-s3 as well.
# - use: compactor
- use: compactor

# If you want to create source from Kafka, uncomment the following lines
# - use: kafka
Expand Down
74 changes: 73 additions & 1 deletion src/batch/src/executor/iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ use futures_async_stream::try_stream;
use futures_util::stream::StreamExt;
use iceberg::scan::FileScanTask;
use iceberg::spec::TableMetadata;
use itertools::Itertools;
use risingwave_common::array::arrow::IcebergArrowConvert;
use risingwave_common::catalog::Schema;
use risingwave_common::catalog::{Field, Schema};
use risingwave_common::types::DataType;
use risingwave_connector::sink::iceberg::IcebergConfig;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::{ConnectorProperties, SplitImpl, SplitMetaData};
use risingwave_connector::WithOptionsSecResolved;
use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::error::BatchError;
use crate::executor::{DataChunk, Executor};
use crate::task::BatchTaskContext;

pub struct IcebergScanExecutor {
iceberg_config: IcebergConfig,
Expand Down Expand Up @@ -108,3 +116,67 @@ impl IcebergScanExecutor {
}
}
}

/// Builds an [`IcebergScanExecutor`] from a `NodeBody::IcebergScan` plan node.
pub struct IcebergScanExecutorBuilder {}

#[async_trait::async_trait]
impl BoxedExecutorBuilder for IcebergScanExecutorBuilder {
    /// Creates the boxed Iceberg scan executor described by `source`'s plan node.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// - `inputs` is not empty (an Iceberg scan is a leaf executor),
    /// - the plan node body is not `NodeBody::IcebergScan`,
    /// - the connector properties cannot be extracted from the `WITH` options,
    /// - a serialized split cannot be decoded, or
    /// - the node does not carry exactly one split.
    ///
    /// # Panics
    ///
    /// Panics if the plan node has no body, if a column catalog lacks its
    /// `column_desc` / `column_type`, or if the connector properties or the split
    /// are not of the Iceberg variant.
    async fn new_boxed_executor<C: BatchTaskContext>(
        source: &ExecutorBuilder<'_, C>,
        inputs: Vec<BoxedExecutor>,
    ) -> crate::error::Result<BoxedExecutor> {
        ensure!(
            inputs.is_empty(),
            "Iceberg source should not have input executor!"
        );
        let source_node = try_match_expand!(
            source.plan_node().get_node_body().unwrap(),
            NodeBody::IcebergScan
        )?;

        // Prepare the connector properties. `options_with_secret` is not needed
        // afterwards, so it is moved instead of cloned.
        let options_with_secret = WithOptionsSecResolved::new(
            source_node.with_properties.clone(),
            source_node.secret_refs.clone(),
        );
        let config = ConnectorProperties::extract(options_with_secret, false)
            .map_err(BatchError::connector)?;

        // Decode the splits. Undecodable bytes are reported as an error rather than
        // panicking the batch task.
        let mut split_list = source_node
            .split
            .iter()
            .map(|bytes| SplitImpl::restore_from_bytes(bytes))
            .collect::<std::result::Result<Vec<_>, _>>()
            .map_err(BatchError::connector)?;
        ensure!(
            split_list.len() == 1,
            "Iceberg scan should have exactly one split!"
        );

        // Output schema: one field per column catalog entry.
        let fields = source_node
            .columns
            .iter()
            .map(|prost| {
                let column_desc = prost.column_desc.as_ref().unwrap();
                let data_type = DataType::from(column_desc.column_type.as_ref().unwrap());
                let name = column_desc.name.clone();
                Field::with_name(data_type, name)
            })
            .collect_vec();
        let schema = Schema::new(fields);

        // Take ownership of the single split instead of cloning it: it holds the
        // table metadata and the whole file list.
        let (ConnectorProperties::Iceberg(boxed_properties), Some(SplitImpl::Iceberg(owned_split))) =
            (config, split_list.pop())
        else {
            unreachable!("IcebergScan node must carry Iceberg connector properties and an Iceberg split")
        };
        let iceberg_properties: IcebergProperties = *boxed_properties;
        let split: IcebergSplit = owned_split;

        Ok(Box::new(IcebergScanExecutor::new(
            iceberg_properties.to_iceberg_config(),
            Some(split.snapshot_id),
            split.table_meta.deserialize(),
            split.files.into_iter().map(|x| x.deserialize()).collect(),
            source.context.get_config().developer.chunk_size,
            schema,
            source.plan_node().get_identity().clone(),
        )))
    }
}
1 change: 1 addition & 0 deletions src/batch/src/executor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -243,6 +243,7 @@ impl<'a, C: BatchTaskContext> ExecutorBuilder<'a, C> {
NodeBody::SortOverWindow => SortOverWindowExecutor,
NodeBody::MaxOneRow => MaxOneRowExecutor,
NodeBody::FileScan => FileScanExecutorBuilder,
NodeBody::IcebergScan => IcebergScanExecutorBuilder,
// The following NodeBody variants are only used for tests
NodeBody::BlockExecutor => BlockExecutorBuilder,
NodeBody::BusyLoopExecutor => BusyLoopExecutorBuilder,
Expand Down
65 changes: 23 additions & 42 deletions src/batch/src/executor/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use risingwave_common::array::{DataChunk, Op, StreamChunk};
use risingwave_common::catalog::{ColumnDesc, ColumnId, Field, Schema, TableId};
use risingwave_common::types::DataType;
use risingwave_connector::parser::SpecificParserConfig;
use risingwave_connector::source::iceberg::{IcebergProperties, IcebergSplit};
use risingwave_connector::source::monitor::SourceMetrics;
use risingwave_connector::source::reader::reader::SourceReader;
use risingwave_connector::source::{
Expand All @@ -32,7 +31,7 @@ use risingwave_pb::batch_plan::plan_node::NodeBody;

use super::Executor;
use crate::error::{BatchError, Result};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder, IcebergScanExecutor};
use crate::executor::{BoxedExecutor, BoxedExecutorBuilder, ExecutorBuilder};
use crate::task::BatchTaskContext;

pub struct SourceExecutor {
Expand Down Expand Up @@ -103,46 +102,28 @@ impl BoxedExecutorBuilder for SourceExecutor {
.collect();
let schema = Schema::new(fields);

if let ConnectorProperties::Iceberg(iceberg_properties) = config {
let iceberg_properties: IcebergProperties = *iceberg_properties;
assert_eq!(split_list.len(), 1);
if let SplitImpl::Iceberg(split) = &split_list[0] {
let split: IcebergSplit = split.clone();
Ok(Box::new(IcebergScanExecutor::new(
iceberg_properties.to_iceberg_config(),
Some(split.snapshot_id),
split.table_meta.deserialize(),
split.files.into_iter().map(|x| x.deserialize()).collect(),
source.context.get_config().developer.chunk_size,
schema,
source.plan_node().get_identity().clone(),
)))
} else {
unreachable!()
}
} else {
let source_reader = SourceReader {
config,
columns,
parser_config,
connector_message_buffer_size: source
.context()
.get_config()
.developer
.connector_message_buffer_size,
};

Ok(Box::new(SourceExecutor {
source: source_reader,
column_ids,
metrics: source.context().source_metrics(),
source_id: TableId::new(source_node.source_id),
split_list,
schema,
identity: source.plan_node().get_identity().clone(),
chunk_size: source.context().get_config().developer.chunk_size,
}))
}
assert!(!matches!(config, ConnectorProperties::Iceberg(_)));
let source_reader = SourceReader {
config,
columns,
parser_config,
connector_message_buffer_size: source
.context()
.get_config()
.developer
.connector_message_buffer_size,
};

Ok(Box::new(SourceExecutor {
source: source_reader,
column_ids,
metrics: source.context().source_metrics(),
source_id: TableId::new(source_node.source_id),
split_list,
schema,
identity: source.plan_node().get_identity().clone(),
chunk_size: source.context().get_config().developer.chunk_size,
}))
}
}

Expand Down
44 changes: 2 additions & 42 deletions src/frontend/src/handler/query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ use std::collections::HashSet;
use std::sync::Arc;
use std::time::Instant;

use futures::StreamExt;
use itertools::Itertools;
use pgwire::pg_field_descriptor::PgFieldDescriptor;
use pgwire::pg_response::{PgResponse, StatementType};
use pgwire::types::Format;
use postgres_types::FromSql;
use risingwave_batch::worker_manager::worker_node_manager::WorkerNodeSelector;
use risingwave_common::bail_not_implemented;
use risingwave_common::catalog::Schema;
Expand Down Expand Up @@ -424,47 +422,9 @@ async fn execute(
let stmt_type = plan_fragmenter_result.stmt_type;

let query_start_time = Instant::now();
let (mut row_stream, pg_descs) =
let (row_stream, pg_descs) =
create_stream(session.clone(), plan_fragmenter_result, formats).await?;

let row_cnt: Option<i32> = match stmt_type {
StatementType::SELECT
| StatementType::INSERT_RETURNING
| StatementType::DELETE_RETURNING
| StatementType::UPDATE_RETURNING => None,

StatementType::INSERT | StatementType::DELETE | StatementType::UPDATE => {
let first_row_set = row_stream.next().await;
let first_row_set = match first_row_set {
None => {
return Err(RwError::from(ErrorCode::InternalError(
"no affected rows in output".to_string(),
)))
}
Some(row) => row?,
};
let affected_rows_str = first_row_set[0].values()[0]
.as_ref()
.expect("compute node should return affected rows in output");
if let Format::Binary = first_field_format {
Some(
i64::from_sql(&postgres_types::Type::INT8, affected_rows_str)
.unwrap()
.try_into()
.expect("affected rows count large than i64"),
)
} else {
Some(
String::from_utf8(affected_rows_str.to_vec())
.unwrap()
.parse()
.unwrap_or_default(),
)
}
}
_ => unreachable!(),
};

// We need to do some post work after the query is finished and before the `Complete` response
// it sent. This is achieved by the `callback` in `PgResponse`.
let callback = async move {
Expand Down Expand Up @@ -510,7 +470,7 @@ async fn execute(
};

Ok(PgResponse::builder(stmt_type)
.row_cnt_opt(row_cnt)
.row_cnt_format_opt(Some(first_field_format))
.values(row_stream, pg_descs)
.callback(callback)
.into())
Expand Down
6 changes: 2 additions & 4 deletions src/frontend/src/optimizer/plan_node/batch_iceberg_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::rc::Rc;

use pretty_xmlish::{Pretty, XmlNode};
use risingwave_pb::batch_plan::plan_node::NodeBody;
use risingwave_pb::batch_plan::SourceNode;
use risingwave_pb::batch_plan::IcebergScanNode;
use risingwave_sqlparser::ast::AsOf;

use super::batch::prelude::*;
Expand Down Expand Up @@ -99,9 +99,7 @@ impl ToBatchPb for BatchIcebergScan {
fn to_batch_prost_body(&self) -> NodeBody {
let source_catalog = self.source_catalog().unwrap();
let (with_properties, secret_refs) = source_catalog.with_properties.clone().into_parts();
NodeBody::Source(SourceNode {
source_id: source_catalog.id,
info: Some(source_catalog.info.clone()),
NodeBody::IcebergScan(IcebergScanNode {
columns: self
.core
.column_catalog
Expand Down
24 changes: 21 additions & 3 deletions src/frontend/src/scheduler/distributed/stage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1052,9 +1052,7 @@ impl StageRunner {
node_body: Some(NodeBody::LogRowSeqScan(scan_node)),
}
}
PlanNodeType::BatchSource
| PlanNodeType::BatchKafkaScan
| PlanNodeType::BatchIcebergScan => {
PlanNodeType::BatchSource | PlanNodeType::BatchKafkaScan => {
let node_body = execution_plan_node.node.clone();
let NodeBody::Source(mut source_node) = node_body else {
unreachable!();
Expand All @@ -1074,6 +1072,26 @@ impl StageRunner {
node_body: Some(NodeBody::Source(source_node)),
}
}
PlanNodeType::BatchIcebergScan => {
let node_body = execution_plan_node.node.clone();
let NodeBody::IcebergScan(mut iceberg_scan_node) = node_body else {
unreachable!();
};

let partition = partition
.expect("no partition info for seq scan")
.into_source()
.expect("PartitionInfo should be SourcePartitionInfo");
iceberg_scan_node.split = partition
.into_iter()
.map(|split| split.encode_to_bytes().into())
.collect_vec();
PbPlanNode {
children: vec![],
identity,
node_body: Some(NodeBody::IcebergScan(iceberg_scan_node)),
}
}
_ => {
let children = execution_plan_node
.children
Expand Down
Loading

0 comments on commit 596ddbb

Please sign in to comment.