Skip to content

Commit

Permalink
Merge branch 'main' into revert-14887
Browse files Browse the repository at this point in the history
  • Loading branch information
TennyZhuang authored Mar 13, 2024
2 parents a82597d + a37d538 commit 53822bb
Show file tree
Hide file tree
Showing 44 changed files with 687 additions and 560 deletions.
5 changes: 1 addition & 4 deletions ci/workflows/pull-request.yml
Original file line number Diff line number Diff line change
Expand Up @@ -701,10 +701,7 @@ steps:

- label: "e2e single-node binary test"
command: "ci/scripts/e2e-test.sh -p ci-dev -m single-node"
if: |
!(build.pull_request.labels includes "ci/skip-ci") && build.env("CI_STEPS") == null
|| build.pull_request.labels includes "ci/run-e2e-single-node-tests"
|| build.env("CI_STEPS") =~ /(^|,)e2e-single-node-tests?(,|$$)/
if: build.pull_request.labels includes "ci/run-e2e-single-node-tests" || build.env("CI_STEPS") =~ /(^|,)e2e-single-node-tests?(,|$$)/
depends_on:
- "build"
- "build-other"
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-distributed.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
compactor-0:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-azblob.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-gcs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
6 changes: 3 additions & 3 deletions docker/docker-compose-with-hdfs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ services:
reservations:
memory: 1G
compute-node-0:
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.6.1_HDFS_2.7-x86_64"
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.7.1_HDFS_2.7-x86_64"
command:
- compute-node
- "--listen-addr"
Expand Down Expand Up @@ -132,7 +132,7 @@ services:
retries: 5
restart: always
frontend-node-0:
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.6.1_HDFS_2.7-x86_64"
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.7.1_HDFS_2.7-x86_64"
command:
- frontend-node
- "--listen-addr"
Expand Down Expand Up @@ -195,7 +195,7 @@ services:
retries: 5
restart: always
meta-node-0:
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.6.1_HDFS_2.7-x86_64"
image: "ghcr.io/risingwavelabs/risingwave:RisingWave_1.7.1_HDFS_2.7-x86_64"
command:
- meta-node
- "--listen-addr"
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-obs.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-oss.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose-with-s3.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
2 changes: 1 addition & 1 deletion docker/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
---
version: "3"
x-image: &image
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.6.1}
image: ${RW_IMAGE:-risingwavelabs/risingwave:v1.7.1}
services:
risingwave-standalone:
<<: *image
Expand Down
37 changes: 18 additions & 19 deletions src/batch/src/executor/row_seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ use risingwave_storage::store::PrefetchOptions;
use risingwave_storage::table::batch_table::storage_table::StorageTable;
use risingwave_storage::table::{collect_data_chunk, TableDistribution};
use risingwave_storage::{dispatch_state_store, StateStore};
use rw_futures_util::select_all;

use crate::error::{BatchError, Result};
use crate::executor::{
Expand Down Expand Up @@ -319,28 +318,28 @@ impl<S: StateStore> RowSeqScanExecutor<S> {
}

// Range Scan
let range_scans = select_all(range_scans.into_iter().map(|range_scan| {
let table = table.clone();
let histogram = histogram.clone();
Box::pin(Self::execute_range(
table,
range_scan,
// WARN: DO NOT use `select` to execute range scans concurrently
// it can consume too much memory if there're too many ranges.
for range in range_scans {
let stream = Self::execute_range(
table.clone(),
range,
ordered,
epoch.clone(),
chunk_size,
limit,
histogram,
))
}));
#[for_await]
for chunk in range_scans {
let chunk = chunk?;
returned += chunk.cardinality() as u64;
yield chunk;
if let Some(limit) = &limit
&& returned >= *limit
{
return Ok(());
histogram.clone(),
);
#[for_await]
for chunk in stream {
let chunk = chunk?;
returned += chunk.cardinality() as u64;
yield chunk;
if let Some(limit) = &limit
&& returned >= *limit
{
return Ok(());
}
}
}
}
Expand Down
33 changes: 15 additions & 18 deletions src/ctl/src/cmd_impl/hummock/list_kv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,10 @@

use core::ops::Bound::Unbounded;

use futures::StreamExt;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::is_max_epoch;
use risingwave_storage::hummock::CachePolicy;
use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreRead};
use risingwave_storage::store::{PrefetchOptions, ReadOptions, StateStoreIter, StateStoreRead};

use crate::common::HummockServiceOpts;
use crate::CtlContext;
Expand All @@ -36,22 +35,20 @@ pub async fn list_kv(
tracing::info!("using MAX EPOCH as epoch");
}
let range = (Unbounded, Unbounded);
let mut scan_result = Box::pin(
hummock
.iter(
range,
epoch,
ReadOptions {
table_id: TableId { table_id },
prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::NotFill,
..Default::default()
},
)
.await?,
);
while let Some(item) = scan_result.next().await {
let (k, v) = item?;
let mut scan_result = hummock
.iter(
range,
epoch,
ReadOptions {
table_id: TableId { table_id },
prefetch_options: PrefetchOptions::prefetch_for_large_range_scan(),
cache_policy: CachePolicy::NotFill,
..Default::default()
},
)
.await?;
while let Some(item) = scan_result.try_next().await? {
let (k, v) = item;
let print_string = format!("[t{}]", k.user_key.table_id.table_id());
println!("{} {:?} => {:?}", print_string, k, v)
}
Expand Down
1 change: 1 addition & 0 deletions src/frontend/src/binder/expr/function.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1300,6 +1300,7 @@ impl Binder {
("pg_get_partkeydef", raw_literal(ExprImpl::literal_null(DataType::Varchar))),
("pg_encoding_to_char", raw_literal(ExprImpl::literal_varchar("UTF8".into()))),
("has_database_privilege", raw_literal(ExprImpl::literal_bool(true))),
("pg_stat_get_numscans", raw_literal(ExprImpl::literal_bigint(0))),
("pg_backend_pid", raw(|binder, _inputs| {
// FIXME: the session id is not global unique in multi-frontend env.
Ok(ExprImpl::literal_int(binder.session_id.0))
Expand Down
86 changes: 45 additions & 41 deletions src/jni_core/src/hummock_iterator.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
use std::sync::Arc;

use bytes::Bytes;
use futures::TryStreamExt;
use futures::{Stream, TryStreamExt};
use risingwave_common::catalog::ColumnDesc;
use risingwave_common::config::{MetricLevel, ObjectStoreConfig};
use risingwave_common::hash::VirtualNode;
Expand All @@ -37,20 +37,31 @@ use risingwave_storage::hummock::{
};
use risingwave_storage::monitor::{global_hummock_state_store_metrics, HummockStateStoreMetrics};
use risingwave_storage::row_serde::value_serde::ValueRowSerdeNew;
use risingwave_storage::store::{ReadOptions, StateStoreReadIterStream, StreamTypeOfIter};
use risingwave_storage::store::{ReadOptions, StateStoreIterExt};
use risingwave_storage::table::KeyedRow;
use rw_futures_util::select_all;
use tokio::sync::mpsc::unbounded_channel;

type SelectAllIterStream = impl StateStoreReadIterStream + Unpin;
type SelectAllIterStream = impl Stream<Item = StorageResult<KeyedRow<Bytes>>> + Unpin;
type SingleIterStream = impl Stream<Item = StorageResult<KeyedRow<Bytes>>>;

fn select_all_vnode_stream(
streams: Vec<StreamTypeOfIter<HummockStorageIterator>>,
) -> SelectAllIterStream {
fn select_all_vnode_stream(streams: Vec<SingleIterStream>) -> SelectAllIterStream {
select_all(streams.into_iter().map(Box::pin))
}

pub struct HummockJavaBindingIterator {
fn to_deserialized_stream(
iter: HummockStorageIterator,
row_serde: EitherSerde,
) -> SingleIterStream {
iter.into_stream(move |(key, value)| {
Ok(KeyedRow::new(
key.user_key.table_key.copy_into(),
row_serde.deserialize(value).map(OwnedRow::new)?,
))
})
}

pub struct HummockJavaBindingIterator {
stream: SelectAllIterStream,
}

Expand Down Expand Up @@ -87,6 +98,28 @@ impl HummockJavaBindingIterator {
0,
);

let table = read_plan.table_catalog.unwrap();
let versioned = table.version.is_some();
let table_columns = table
.columns
.into_iter()
.map(|c| ColumnDesc::from(c.column_desc.unwrap()));

// Decide which serializer to use based on whether the table is versioned or not.
let row_serde: EitherSerde = if versioned {
ColumnAwareSerde::new(
Arc::from_iter(0..table_columns.len()),
Arc::from_iter(table_columns),
)
.into()
} else {
BasicSerde::new(
Arc::from_iter(0..table_columns.len()),
Arc::from_iter(table_columns),
)
.into()
};

let mut streams = Vec::with_capacity(read_plan.vnode_ids.len());
let key_range = read_plan.key_range.unwrap();
let pin_version = PinnedVersion::new(
Expand All @@ -104,7 +137,7 @@ impl HummockJavaBindingIterator {
key_range,
read_plan.epoch,
);
let stream = reader
let iter = reader
.iter(
key_range,
read_plan.epoch,
Expand All @@ -116,45 +149,16 @@ impl HummockJavaBindingIterator {
read_version_tuple,
)
.await?;
streams.push(stream);
streams.push(to_deserialized_stream(iter, row_serde.clone()));
}

let stream = select_all_vnode_stream(streams);

let table = read_plan.table_catalog.unwrap();
let versioned = table.version.is_some();
let table_columns = table
.columns
.into_iter()
.map(|c| ColumnDesc::from(c.column_desc.unwrap()));

// Decide which serializer to use based on whether the table is versioned or not.
let row_serde = if versioned {
ColumnAwareSerde::new(
Arc::from_iter(0..table_columns.len()),
Arc::from_iter(table_columns),
)
.into()
} else {
BasicSerde::new(
Arc::from_iter(0..table_columns.len()),
Arc::from_iter(table_columns),
)
.into()
};

Ok(Self { row_serde, stream })
Ok(Self { stream })
}

pub async fn next(&mut self) -> StorageResult<Option<(Bytes, OwnedRow)>> {
let item = self.stream.try_next().await?;
Ok(match item {
Some((key, value)) => Some((
key.user_key.table_key.0,
OwnedRow::new(self.row_serde.deserialize(&value)?),
)),
None => None,
})
pub async fn next(&mut self) -> StorageResult<Option<KeyedRow<Bytes>>> {
self.stream.try_next().await
}
}

Expand Down
5 changes: 3 additions & 2 deletions src/jni_core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -360,10 +360,11 @@ extern "system" fn Java_com_risingwave_java_binding_Binding_iteratorNext<'a>(
iter.cursor = None;
Ok(JNI_FALSE)
}
Some((key, row)) => {
Some(keyed_row) => {
let (key, row) = keyed_row.into_parts();
iter.cursor = Some(RowCursor {
row,
extra: RowExtra::Key(key),
extra: RowExtra::Key(key.0),
});
Ok(JNI_TRUE)
}
Expand Down
Loading

0 comments on commit 53822bb

Please sign in to comment.