Skip to content

Commit

Permalink
Merge branch 'main' into refactor/rw_futures_util_crate
Browse files Browse the repository at this point in the history
  • Loading branch information
TennyZhuang authored Jan 16, 2024
2 parents fad25d4 + 222bbd1 commit b7a8f24
Show file tree
Hide file tree
Showing 10 changed files with 179 additions and 86 deletions.
22 changes: 4 additions & 18 deletions e2e_test/source/basic/pubsub.slt
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,14 @@
statement error
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'localhost:5981',
pubsub.split_count = 3
pubsub.emulator_host = 'invalid_host:5981'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s1 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-1',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 3
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

statement ok
Expand All @@ -25,25 +23,14 @@ statement error
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-not-2',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 3
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

statement ok
CREATE TABLE s2 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-2',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 3
) FORMAT PLAIN ENCODE JSON;

# fail with invalid split count
statement error
CREATE TABLE s3 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-3',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 0
pubsub.emulator_host = 'localhost:5980'
) FORMAT PLAIN ENCODE JSON;

# fail if both start_offset and start_snapshot are provided
Expand All @@ -52,7 +39,6 @@ CREATE TABLE s3 (v1 int, v2 varchar) WITH (
connector = 'google_pubsub',
pubsub.subscription = 'test-subscription-3',
pubsub.emulator_host = 'localhost:5980',
pubsub.split_count = 2,
pubsub.start_offset = "121212",
pubsub.start_snapshot = "snapshot-that-doesnt-exist"
) FORMAT PLAIN ENCODE JSON;
Expand Down
27 changes: 27 additions & 0 deletions src/common/src/catalog/physical_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,13 @@ use std::collections::HashMap;
use anyhow::anyhow;
use fixedbitset::FixedBitSet;
use itertools::Itertools;
use risingwave_pb::catalog::Table;
use risingwave_pb::common::PbColumnOrder;
use risingwave_pb::plan_common::StorageTableDesc;

use super::{ColumnDesc, ColumnId, TableId};
use crate::catalog::hummock::TABLE_OPTION_DUMMY_RETENTION_SECOND;
use crate::catalog::TableOption;
use crate::util::sort_util::ColumnOrder;

/// Includes necessary information for compute node to access data of the table.
Expand Down Expand Up @@ -138,4 +141,28 @@ impl TableDesc {
});
id_to_idx
}

pub fn from_pb_table(table: &Table) -> Self {
let table_options = TableOption::build_table_option(&table.properties);
Self {
table_id: TableId::new(table.id),
pk: table.pk.iter().map(ColumnOrder::from_protobuf).collect(),
columns: table
.columns
.iter()
.map(|col| ColumnDesc::from(col.column_desc.as_ref().unwrap()))
.collect(),
distribution_key: table.distribution_key.iter().map(|i| *i as _).collect(),
stream_key: table.stream_key.iter().map(|i| *i as _).collect(),
vnode_col_index: table.vnode_col_index.map(|i| i as _),
append_only: table.append_only,
retention_seconds: table_options
.retention_seconds
.unwrap_or(TABLE_OPTION_DUMMY_RETENTION_SECOND),
value_indices: table.value_indices.iter().map(|i| *i as _).collect(),
read_prefix_len_hint: table.read_prefix_len_hint as _,
watermark_columns: table.watermark_indices.iter().map(|i| *i as _).collect(),
versioned: table.version.is_some(),
}
}
}
6 changes: 5 additions & 1 deletion src/common/src/hash/consistent_hash/vnode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use parse_display::Display;
use crate::array::{Array, ArrayImpl, DataChunk};
use crate::hash::Crc32HashCode;
use crate::row::{Row, RowExt};
use crate::types::{DataType, DatumRef, ScalarRefImpl};
use crate::types::{DataType, Datum, DatumRef, ScalarImpl, ScalarRefImpl};
use crate::util::hash_util::Crc32FastBuilder;
use crate::util::row_id::extract_vnode_id_from_row_id;

Expand Down Expand Up @@ -96,6 +96,10 @@ impl VirtualNode {
self.0 as _
}

pub const fn to_datum(self) -> Datum {
Some(ScalarImpl::Int16(self.to_scalar()))
}

/// Creates a virtual node from the given big-endian bytes representation.
pub const fn from_be_bytes(bytes: [u8; Self::SIZE]) -> Self {
let inner = VirtualNodeInner::from_be_bytes(bytes);
Expand Down
9 changes: 2 additions & 7 deletions src/connector/src/source/google_pubsub/enumerator/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,13 +37,8 @@ impl SplitEnumerator for PubsubSplitEnumerator {
properties: Self::Properties,
_context: SourceEnumeratorContextRef,
) -> anyhow::Result<PubsubSplitEnumerator> {
let split_count = properties.split_count;
let subscription = properties.subscription.to_owned();

if split_count < 1 {
bail!("split_count must be >= 1")
}

if properties.credentials.is_none() && properties.emulator_host.is_none() {
bail!("credentials must be set if not using the pubsub emulator")
}
Expand Down Expand Up @@ -87,7 +82,7 @@ impl SplitEnumerator for PubsubSplitEnumerator {
}
(None, Some(snapshot)) => Some(SeekTo::Snapshot(snapshot)),
(Some(_), Some(_)) => {
bail!("specify atmost one of start_offset or start_snapshot")
bail!("specify at most one of start_offset or start_snapshot")
}
};

Expand All @@ -99,7 +94,7 @@ impl SplitEnumerator for PubsubSplitEnumerator {

Ok(Self {
subscription,
split_count,
split_count: 1,
})
}

Expand Down
13 changes: 3 additions & 10 deletions src/connector/src/source/google_pubsub/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ pub mod source;
pub mod split;

pub use enumerator::*;
use serde_with::{serde_as, DisplayFromStr};
pub use source::*;
pub use split::*;
use with_options::WithOptions;
Expand All @@ -30,13 +29,8 @@ use crate::source::SourceProperties;

pub const GOOGLE_PUBSUB_CONNECTOR: &str = "google_pubsub";

#[serde_as]
#[derive(Clone, Debug, Deserialize, WithOptions)]
pub struct PubsubProperties {
#[serde_as(as = "DisplayFromStr")]
#[serde(rename = "pubsub.split_count")]
pub split_count: u32,

/// pubsub subscription to consume messages from
/// The subscription should be configured with the `retain-on-ack` property to enable
/// message recovery within risingwave.
Expand All @@ -48,19 +42,19 @@ pub struct PubsubProperties {
#[serde(rename = "pubsub.emulator_host")]
pub emulator_host: Option<String>,

/// credentials JSON object encoded with base64
/// `credentials` is a JSON string containing the service account credentials.
/// See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account).
/// The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
#[serde(rename = "pubsub.credentials")]
pub credentials: Option<String>,

/// `start_offset` is a numeric timestamp, ideallly the publish timestamp of a message
/// `start_offset` is a numeric timestamp, ideally the publish timestamp of a message
/// in the subscription. If present, the connector will attempt to seek the subscription
/// to the timestamp and start consuming from there. Note that the seek operation is
/// subject to limitations around the message retention policy of the subscription. See
/// [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for
/// more details.
#[serde(rename = "pubsub.start_offset")]
#[serde(rename = "pubsub.start_offset.nanos")]
pub start_offset: Option<String>,

/// `start_snapshot` is a named pub/sub snapshot. If present, the connector will first seek
Expand Down Expand Up @@ -127,7 +121,6 @@ mod tests {
let default_properties = PubsubProperties {
credentials: None,
emulator_host: None,
split_count: 1,
start_offset: None,
start_snapshot: None,
subscription: String::from("test-subscription"),
Expand Down
9 changes: 3 additions & 6 deletions src/connector/with_options_source.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -471,9 +471,6 @@ PosixFsProperties:
default: Default::default
PubsubProperties:
fields:
- name: pubsub.split_count
field_type: u32
required: true
- name: pubsub.subscription
field_type: String
comments: pubsub subscription to consume messages from The subscription should be configured with the `retain-on-ack` property to enable message recovery within risingwave.
Expand All @@ -484,11 +481,11 @@ PubsubProperties:
required: false
- name: pubsub.credentials
field_type: String
comments: credentials JSON object encoded with base64 See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).
comments: '`credentials` is a JSON string containing the service account credentials. See the [service-account credentials guide](https://developers.google.com/workspace/guides/create-credentials#create_credentials_for_a_service_account). The service account must have the `pubsub.subscriber` [role](https://cloud.google.com/pubsub/docs/access-control#roles).'
required: false
- name: pubsub.start_offset
- name: pubsub.start_offset.nanos
field_type: String
comments: '`start_offset` is a numeric timestamp, ideallly the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription. See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.'
comments: '`start_offset` is a numeric timestamp, ideally the publish timestamp of a message in the subscription. If present, the connector will attempt to seek the subscription to the timestamp and start consuming from there. Note that the seek operation is subject to limitations around the message retention policy of the subscription. See [Seeking to a timestamp](https://cloud.google.com/pubsub/docs/replay-overview#seeking_to_a_timestamp) for more details.'
required: false
- name: pubsub.start_snapshot
field_type: String
Expand Down
4 changes: 4 additions & 0 deletions src/storage/src/table/batch_table/storage_table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,10 @@ impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
pub fn table_id(&self) -> TableId {
self.table_id
}

pub fn vnodes(&self) -> &Arc<Bitmap> {
self.distribution.vnodes()
}
}
/// Point get
impl<S: StateStore, SD: ValueRowSerde> StorageTableInner<S, SD> {
Expand Down
13 changes: 8 additions & 5 deletions src/storage/src/table/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -115,11 +115,15 @@ impl TableDistribution {
Self::singleton_vnode_bitmap_ref().clone()
}

pub fn all_vnodes() -> Arc<Bitmap> {
pub fn all_vnodes_ref() -> &'static Arc<Bitmap> {
/// A bitmap that all vnodes are set.
static ALL_VNODES: LazyLock<Arc<Bitmap>> =
LazyLock::new(|| Bitmap::ones(VirtualNode::COUNT).into());
ALL_VNODES.clone()
&ALL_VNODES
}

pub fn all_vnodes() -> Arc<Bitmap> {
Self::all_vnodes_ref().clone()
}

/// Distribution that accesses all vnodes, mainly used for tests.
Expand Down Expand Up @@ -272,10 +276,9 @@ pub fn compute_vnode(row: impl Row, indices: &[usize], vnodes: &Bitmap) -> Virtu
vnode
}

pub fn get_vnode_from_row(row: impl Row, index: usize, _vnodes: &Bitmap) -> VirtualNode {
pub fn get_vnode_from_row(row: impl Row, index: usize, vnodes: &Bitmap) -> VirtualNode {
let vnode = VirtualNode::from_datum(row.datum_at(index));
// TODO: enable this check when `WatermarkFilterExecutor` use `StorageTable` to read global max watermark
// check_vnode_is_set(vnode, vnodes);
check_vnode_is_set(vnode, vnodes);

tracing::debug!(target: "events::storage::storage_table", "get vnode from row: {:?} vnode column index {:?} => {}", row, index, vnode);

Expand Down
Loading

0 comments on commit b7a8f24

Please sign in to comment.