diff --git a/e2e_test/subscription/main.py b/e2e_test/subscription/main.py
index 60db0df82c33a..a3079e2e6847e 100644
--- a/e2e_test/subscription/main.py
+++ b/e2e_test/subscription/main.py
@@ -1,5 +1,6 @@
 import subprocess
 import psycopg2
+import threading
 import time
 
@@ -274,7 +275,7 @@ def test_cursor_with_table_alter():
     row = execute_query("fetch next from cur",conn)
     check_rows_data([1,2],row[0],"Insert")
     row = execute_query("fetch next from cur",conn)
-    assert(row == [])
+    assert row == []
     row = execute_query("fetch next from cur",conn)
     check_rows_data([4,4,4],row[0],"Insert")
     execute_insert("insert into t1 values(5,5,5)",conn)
@@ -285,7 +286,7 @@ def test_cursor_with_table_alter():
     execute_insert("insert into t1 values(6,6)",conn)
     execute_insert("flush",conn)
     row = execute_query("fetch next from cur",conn)
-    assert(row == [])
+    assert row == []
     row = execute_query("fetch next from cur",conn)
     check_rows_data([6,6],row[0],"Insert")
     drop_table_subscription()
@@ -355,6 +356,52 @@ def test_rebuild_table():
     check_rows_data([1,100],row[2],"UpdateInsert")
     drop_table_subscription()
 
+def test_block_cursor():
+    print("test_block_cursor")
+    create_table_subscription()
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+
+    execute_insert("declare cur subscription cursor for sub2 full",conn)
+    execute_insert("insert into t2 values(1,1)",conn)
+    execute_insert("flush",conn)
+    execute_insert("update t2 set v2 = 100 where v1 = 1",conn)
+    execute_insert("flush",conn)
+    start_time = time.time()
+    row = execute_query("fetch 100 from cur with (timeout = '30s')",conn)
+    assert (time.time() - start_time) < 3
+    assert len(row) == 3
+    check_rows_data([1,1],row[0],"Insert")
+    check_rows_data([1,1],row[1],"UpdateDelete")
+    check_rows_data([1,100],row[2],"UpdateInsert")
+
+    # A blocked fetch should return as soon as new data arrives
+    thread = threading.Thread(target=insert_into_table)
+    thread.start()
+    row = execute_query("fetch 100 from cur with (timeout = '5s')",conn)
+    check_rows_data([10,10],row[0],"Insert")
+    thread.join()
+
+    # A blocked fetch should return empty once the timeout elapses
+    row = execute_query("fetch 100 from cur with (timeout = '5s')",conn)
+    assert row == []
+
+    drop_table_subscription()
+
+def insert_into_table():
+    time.sleep(2)
+    conn = psycopg2.connect(
+        host="localhost",
+        port="4566",
+        user="root",
+        database="dev"
+    )
+    execute_insert("insert into t2 values(10,10)",conn)
+
 if __name__ == "__main__":
     test_cursor_snapshot()
     test_cursor_op()
@@ -366,3 +413,4 @@ def test_rebuild_table():
     test_cursor_with_table_alter()
     test_cursor_fetch_n()
     test_rebuild_table()
+    test_block_cursor()
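
Reviewer note: the e2e test above drives the new `FETCH ... WITH (timeout = ...)` syntax end to end. For readers who prefer Rust, here is a minimal, hypothetical client sketch of the same flow using the `tokio-postgres` crate (connection parameters mirror the test; the cursor and subscription names are assumed to already exist):

```rust
use tokio_postgres::{NoTls, SimpleQueryMessage};

#[tokio::main]
async fn main() -> Result<(), tokio_postgres::Error> {
    // Same endpoint as the Python test: a local RisingWave frontend.
    let (client, connection) =
        tokio_postgres::connect("host=localhost port=4566 user=root dbname=dev", NoTls).await?;
    tokio::spawn(connection); // drive the connection in the background

    client
        .simple_query("declare cur subscription cursor for sub2 full")
        .await?;

    // With a timeout, the fetch blocks until data arrives or the budget lapses;
    // with data already buffered it returns immediately.
    let msgs = client
        .simple_query("fetch 100 from cur with (timeout = '5s')")
        .await?;
    let rows = msgs
        .iter()
        .filter(|m| matches!(m, SimpleQueryMessage::Row(_)))
        .count();
    println!("fetched {rows} rows");
    Ok(())
}
```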
diff --git a/proto/hummock.proto b/proto/hummock.proto
index f52854b727045..c9e6b96c91aaa 100644
--- a/proto/hummock.proto
+++ b/proto/hummock.proto
@@ -835,7 +835,6 @@ service HummockManagerService {
   rpc ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse);
   rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse);
   rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
-  rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse);
   rpc GetVersionByEpoch(GetVersionByEpochRequest) returns (GetVersionByEpochResponse);
   rpc MergeCompactionGroup(MergeCompactionGroupRequest) returns (MergeCompactionGroupResponse);
 }
@@ -907,13 +906,3 @@ message BranchedObject {
   // Compaction group id the SST belongs to.
   uint64 compaction_group_id = 3;
 }
-
-message ListChangeLogEpochsRequest {
-  uint32 table_id = 1;
-  uint64 min_epoch = 2;
-  uint32 max_count = 3;
-}
-
-message ListChangeLogEpochsResponse {
-  repeated uint64 epochs = 1;
-}
diff --git a/src/frontend/src/catalog/subscription_catalog.rs b/src/frontend/src/catalog/subscription_catalog.rs
index 36a5a71a0e9be..93c726953c752 100644
--- a/src/frontend/src/catalog/subscription_catalog.rs
+++ b/src/frontend/src/catalog/subscription_catalog.rs
@@ -12,17 +12,14 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-use core::str::FromStr;
-
 use risingwave_common::catalog::{TableId, UserId, OBJECT_ID_PLACEHOLDER};
-use risingwave_common::types::Interval;
 use risingwave_common::util::epoch::Epoch;
 use risingwave_pb::catalog::subscription::PbSubscriptionState;
 use risingwave_pb::catalog::PbSubscription;
-use thiserror_ext::AsReport;
 
 use super::OwnedByUserCatalog;
 use crate::error::{ErrorCode, Result};
+use crate::handler::util::convert_interval_to_u64_seconds;
 use crate::WithOptions;
 
 #[derive(Clone, Debug, PartialEq, Eq, Hash)]
@@ -86,15 +83,7 @@ impl SubscriptionCatalog {
         let retention_seconds_str = properties.get("retention").ok_or_else(|| {
             ErrorCode::InternalError("Subscription retention time not set.".to_string())
         })?;
-        let retention_seconds = (Interval::from_str(retention_seconds_str)
-            .map_err(|err| {
-                ErrorCode::InternalError(format!(
-                    "Retention needs to be set in Interval format: {:?}",
-                    err.to_report_string()
-                ))
-            })?
-            .epoch_in_micros()
-            / 1000000) as u64;
+        let retention_seconds = convert_interval_to_u64_seconds(retention_seconds_str)?;
         self.retention_seconds = retention_seconds;
         Ok(())
     }
diff --git a/src/frontend/src/handler/fetch_cursor.rs b/src/frontend/src/handler/fetch_cursor.rs
index d339e3e7a1acb..8759e7c5f5917 100644
--- a/src/frontend/src/handler/fetch_cursor.rs
+++ b/src/frontend/src/handler/fetch_cursor.rs
@@ -22,11 +22,12 @@ use risingwave_sqlparser::ast::{FetchCursorStatement, Statement};
 
 use super::extended_handle::{PortalResult, PrepareStatement, PreparedResult};
 use super::query::BoundResult;
+use super::util::convert_interval_to_u64_seconds;
 use super::RwPgResponse;
 use crate::binder::BoundStatement;
 use crate::error::Result;
 use crate::handler::HandlerArgs;
-use crate::{Binder, PgResponseStream};
+use crate::{Binder, PgResponseStream, WithOptions};
 
 pub async fn handle_fetch_cursor_execute(
     handler_args: HandlerArgs,
@@ -61,10 +62,31 @@ pub async fn handle_fetch_cursor(
     let (_, cursor_name) =
         Binder::resolve_schema_qualified_name(db_name, stmt.cursor_name.clone())?;
 
+    let with_options = WithOptions::try_from(stmt.with_properties.0.as_slice())?;
+
+    if with_options.len() > 1 {
+        bail_not_implemented!("only `timeout` is supported in with options")
+    }
+
+    let timeout_seconds = with_options
+        .get("timeout")
+        .map(convert_interval_to_u64_seconds)
+        .transpose()?;
+
+    if with_options.len() == 1 && timeout_seconds.is_none() {
+        bail_not_implemented!("only `timeout` is supported in with options")
+    }
+
     let cursor_manager = session.get_cursor_manager();
 
     let (rows, pg_descs) = cursor_manager
-        .get_rows_with_cursor(cursor_name, stmt.count, handler_args, formats)
+        .get_rows_with_cursor(
+            cursor_name,
+            stmt.count,
+            handler_args,
+            formats,
+            timeout_seconds,
+        )
         .await?;
     Ok(build_fetch_cursor_response(rows, pg_descs))
 }
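
Reviewer note: the option handling above boils down to "at most one WITH option, and it must be `timeout`". A standalone sketch of that rule (names are hypothetical, not the RisingWave API):

```rust
use std::collections::BTreeMap;

// Accept at most one option, and only the key `timeout`.
fn extract_timeout(options: &BTreeMap<String, String>) -> Result<Option<String>, String> {
    match options.len() {
        0 => Ok(None),
        1 => options
            .get("timeout")
            .cloned()
            .map(Some)
            .ok_or_else(|| "only `timeout` is supported in with options".to_string()),
        _ => Err("only `timeout` is supported in with options".to_string()),
    }
}

fn main() {
    let mut opts = BTreeMap::new();
    opts.insert("timeout".to_string(), "5s".to_string());
    assert_eq!(extract_timeout(&opts), Ok(Some("5s".to_string())));

    // Any other key, or more than one option, is rejected.
    opts.insert("foo".to_string(), "bar".to_string());
    assert!(extract_timeout(&opts).is_err());
}
```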
diff --git a/src/frontend/src/handler/util.rs b/src/frontend/src/handler/util.rs
index 0531ce5a65284..5551154bd8ec4 100644
--- a/src/frontend/src/handler/util.rs
+++ b/src/frontend/src/handler/util.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use core::str::FromStr;
 use std::pin::Pin;
 use std::sync::Arc;
 use std::task::{Context, Poll};
@@ -28,13 +29,16 @@ use pin_project_lite::pin_project;
 use risingwave_common::array::DataChunk;
 use risingwave_common::catalog::Field;
 use risingwave_common::row::Row as _;
-use risingwave_common::types::{write_date_time_tz, DataType, ScalarRefImpl, Timestamptz};
+use risingwave_common::types::{
    write_date_time_tz, DataType, Interval, ScalarRefImpl, Timestamptz,
};
 use risingwave_common::util::epoch::Epoch;
 use risingwave_common::util::iter_util::ZipEqFast;
 use risingwave_sqlparser::ast::{
     CompatibleSourceSchema, ConnectorSchema, ObjectName, Query, Select, SelectItem, SetExpr,
     TableFactor, TableWithJoins,
 };
+use thiserror_ext::AsReport;
 
 use crate::error::{ErrorCode, Result as RwResult};
 use crate::session::{current, SessionImpl};
@@ -238,6 +242,19 @@ pub fn convert_logstore_u64_to_unix_millis(logstore_u64: u64) -> u64 {
     Epoch::from(logstore_u64).as_unix_millis()
 }
 
+pub fn convert_interval_to_u64_seconds(interval: &String) -> RwResult<u64> {
+    let seconds = (Interval::from_str(interval)
+        .map_err(|err| {
+            ErrorCode::InternalError(format!(
+                "Failed to convert interval to seconds, please check the format. Error: {:?}",
+                err.to_report_string()
+            ))
+        })?
+        .epoch_in_micros()
+        / 1000000) as u64;
+    Ok(seconds)
+}
+
 #[cfg(test)]
 mod tests {
     use postgres_types::{ToSql, Type};
diff --git a/src/frontend/src/meta_client.rs b/src/frontend/src/meta_client.rs
index d29bd1c3576d0..bdbc67a4b9720 100644
--- a/src/frontend/src/meta_client.rs
+++ b/src/frontend/src/meta_client.rs
@@ -120,13 +120,6 @@ pub trait FrontendMetaClient: Send + Sync {
         rate_limit: Option<u32>,
     ) -> Result<()>;
 
-    async fn list_change_log_epochs(
-        &self,
-        table_id: u32,
-        min_epoch: u64,
-        max_count: u32,
-    ) -> Result<Vec<u64>>;
-
     async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus>;
 
     async fn get_cluster_limits(&self) -> Result<Vec<ClusterLimit>>;
@@ -298,17 +291,6 @@ impl FrontendMetaClient for FrontendMetaClientImpl {
             .map(|_| ())
     }
 
-    async fn list_change_log_epochs(
-        &self,
-        table_id: u32,
-        min_epoch: u64,
-        max_count: u32,
-    ) -> Result<Vec<u64>> {
-        self.0
-            .list_change_log_epochs(table_id, min_epoch, max_count)
-            .await
-    }
-
     async fn get_cluster_recovery_status(&self) -> Result<RecoveryStatus> {
         self.0.get_cluster_recovery_status().await
     }
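
Reviewer note: a hypothetical unit test for the new helper, which could live in `handler::util::tests`. This assumes `Interval::from_str` accepts the short literals the e2e test uses ('30s', '5s') as well as PostgreSQL-style spellings:

```rust
#[cfg(test)]
mod interval_tests {
    use super::convert_interval_to_u64_seconds;

    #[test]
    fn converts_common_interval_literals() {
        // '30s' is exercised by the e2e test above, so it must parse to 30 seconds.
        assert_eq!(convert_interval_to_u64_seconds(&"30s".to_string()).unwrap(), 30);
        // Assumed: PostgreSQL-style unit spellings parse the same way.
        assert_eq!(convert_interval_to_u64_seconds(&"5 minutes".to_string()).unwrap(), 300);
        // Malformed input surfaces as an `InternalError`, not a panic.
        assert!(convert_interval_to_u64_seconds(&"not-an-interval".to_string()).is_err());
    }
}
```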
diff --git a/src/frontend/src/scheduler/snapshot.rs b/src/frontend/src/scheduler/snapshot.rs
index bb1d98aa5f8f7..22fb790a55619 100644
--- a/src/frontend/src/scheduler/snapshot.rs
+++ b/src/frontend/src/scheduler/snapshot.rs
@@ -26,6 +26,7 @@ use risingwave_pb::common::{batch_query_epoch, BatchQueryCommittedEpoch, BatchQueryEpoch};
 use risingwave_pb::hummock::{HummockVersionDeltas, StateTableInfoDelta};
 use tokio::sync::watch;
 
+use crate::error::{ErrorCode, RwError};
 use crate::expr::InlineNowProcTime;
 use crate::meta_client::FrontendMetaClient;
 use crate::scheduler::SchedulerError;
@@ -139,6 +140,20 @@ impl PinnedSnapshot {
     pub fn version(&self) -> &FrontendHummockVersion {
         &self.value
     }
+
+    pub fn list_change_log_epochs(
+        &self,
+        table_id: u32,
+        min_epoch: u64,
+        max_count: u32,
+    ) -> Vec<u64> {
+        if let Some(table_change_log) = self.value.table_change_log.get(&TableId::new(table_id)) {
+            let table_change_log = table_change_log.clone();
+            table_change_log.get_non_empty_epochs(min_epoch, max_count as usize)
+        } else {
+            vec![]
+        }
+    }
 }
 
 /// Returns an invalid snapshot, used for initial values.
@@ -162,6 +177,14 @@ pub struct HummockSnapshotManager {
     /// `current_epoch` is always in the shared buffer, so it will never be gc before the data
     /// of `committed_epoch`.
     latest_snapshot: watch::Sender<PinnedSnapshotRef>,
+
+    table_change_log_notification_sender: watch::Sender<TableChangeLogNotificationMsg>,
+}
+
+#[derive(Default)]
+struct TableChangeLogNotificationMsg {
+    updated_change_log_table_ids: HashSet<u32>,
+    deleted_table_ids: HashSet<u32>,
 }
 
 pub type HummockSnapshotManagerRef = Arc<HummockSnapshotManager>;
@@ -174,7 +197,13 @@ impl HummockSnapshotManager {
 
         let (latest_snapshot, _) = watch::channel(latest_snapshot);
 
-        Self { latest_snapshot }
+        let (table_change_log_notification_sender, _) =
+            watch::channel(TableChangeLogNotificationMsg::default());
+
+        Self {
+            latest_snapshot,
+            table_change_log_notification_sender,
+        }
     }
 
     /// Acquire the latest snapshot by increasing its reference count.
@@ -183,6 +212,24 @@ impl HummockSnapshotManager {
     }
 
     pub fn init(&self, version: FrontendHummockVersion) {
+        let updated_change_log_table_ids: HashSet<_> = version
+            .table_change_log
+            .iter()
+            .filter_map(|(table_id, change_log)| {
+                if change_log.get_non_empty_epochs(0, usize::MAX).is_empty() {
+                    None
+                } else {
+                    Some(table_id.table_id())
+                }
+            })
+            .collect();
+        self.table_change_log_notification_sender
+            .send(TableChangeLogNotificationMsg {
+                updated_change_log_table_ids,
+                deleted_table_ids: Default::default(),
+            })
+            .ok();
+
         self.update_inner(|_| Some(version));
     }
 
@@ -190,6 +237,35 @@ impl HummockSnapshotManager {
     ///
     /// Should only be called by the observer manager.
     pub fn update(&self, deltas: HummockVersionDeltas) {
+        let updated_change_log_table_ids: HashSet<_> = deltas
+            .version_deltas
+            .iter()
+            .flat_map(|version_deltas| &version_deltas.change_log_delta)
+            .filter_map(|(table_id, change_log)| match change_log.new_log.as_ref() {
+                Some(new_log) => {
+                    let new_value_empty = new_log.new_value.is_empty();
+                    let old_value_empty = new_log.old_value.is_empty();
+                    if !new_value_empty || !old_value_empty {
+                        Some(*table_id)
+                    } else {
+                        None
+                    }
+                }
+                None => None,
+            })
+            .collect();
+        let deleted_table_ids: HashSet<_> = deltas
+            .version_deltas
+            .iter()
+            .flat_map(|version_deltas| version_deltas.removed_table_ids.clone())
+            .collect();
+        self.table_change_log_notification_sender
+            .send(TableChangeLogNotificationMsg {
+                updated_change_log_table_ids,
+                deleted_table_ids,
+            })
+            .ok();
+
         self.update_inner(|old_snapshot| {
             if deltas.version_deltas.is_empty() {
                 return None;
@@ -249,4 +325,31 @@ impl HummockSnapshotManager {
             rx.changed().await.unwrap();
         }
     }
+
+    pub async fn wait_table_change_log_notification(&self, table_id: u32) -> Result<(), RwError> {
+        let mut rx = self.table_change_log_notification_sender.subscribe();
+        loop {
+            rx.changed()
+                .await
+                .map_err(|_| ErrorCode::InternalError("cursor notify channel is closed.".into()))?;
+            let table_change_log_notification_msg = rx.borrow_and_update();
+            if table_change_log_notification_msg
+                .deleted_table_ids
+                .contains(&table_id)
+            {
+                return Err(ErrorCode::InternalError(format!(
+                    "Cursor dependent table deleted: table_id is {:?}",
+                    table_id
+                ))
+                .into());
+            }
+            if table_change_log_notification_msg
+                .updated_change_log_table_ids
+                .contains(&table_id)
+            {
+                break;
+            }
+        }
+        Ok(())
+    }
 }
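
Reviewer note: the notification machinery added above is a standard `tokio::sync::watch` pattern. A self-contained sketch of the same send/wait protocol (simplified types, not the actual RisingWave structs):

```rust
use std::collections::HashSet;
use tokio::sync::watch;

#[derive(Default, Clone)]
struct Msg {
    updated: HashSet<u32>,
}

#[tokio::main]
async fn main() {
    let (tx, _) = watch::channel(Msg::default());

    // Waiter side: subscribe, then loop on `changed()`; `borrow_and_update()`
    // marks the current value as seen so the loop only wakes on newer sends.
    let mut rx = tx.subscribe();
    let waiter = tokio::spawn(async move {
        loop {
            rx.changed().await.unwrap();
            if rx.borrow_and_update().updated.contains(&42) {
                break;
            }
        }
    });

    // Notifier side: `send` replaces the value and wakes every subscriber.
    // Like the code above, the result is discarded with `.ok()` because
    // having no live subscriber is not an error here.
    tx.send(Msg { updated: HashSet::from([42]) }).ok();
    waiter.await.unwrap();
}
```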
diff --git a/src/frontend/src/session.rs b/src/frontend/src/session.rs
index 7b76e1413ef0d..9157466d9af91 100644
--- a/src/frontend/src/session.rs
+++ b/src/frontend/src/session.rs
@@ -346,6 +346,9 @@ impl FrontendEnv {
         // This `session_params` should be initialized during the initial notification in `observer_manager`
         let session_params = Arc::new(RwLock::new(SessionConfig::default()));
 
+        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
+        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
+
         let frontend_observer_node = FrontendObserverNode::new(
             worker_node_manager.clone(),
             catalog,
@@ -417,10 +420,7 @@ impl FrontendEnv {
                 .unwrap(),
         ));
 
-        let sessions_map: SessionMapRef = Arc::new(RwLock::new(HashMap::new()));
-        let cursor_metrics = Arc::new(CursorMetrics::init(sessions_map.clone()));
         let sessions = sessions_map.clone();
-
         // Idle transaction background monitor
         let join_handle = tokio::spawn(async move {
             let mut check_idle_txn_interval =
@@ -1090,7 +1090,7 @@ impl SessionImpl {
         Ok(secret.clone())
     }
 
-    pub async fn list_change_log_epochs(
+    pub fn list_change_log_epochs(
         &self,
         table_id: u32,
         min_epoch: u64,
@@ -1098,9 +1098,9 @@
     ) -> Result<Vec<u64>> {
         Ok(self
             .env
-            .meta_client()
-            .list_change_log_epochs(table_id, min_epoch, max_count)
-            .await?)
+            .hummock_snapshot_manager()
+            .acquire()
+            .list_change_log_epochs(table_id, min_epoch, max_count))
     }
 
     pub fn clear_cancel_query_flag(&self) {
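
Reviewer note: `list_change_log_epochs` can drop `async` because the change log now lives in the locally pinned snapshot, so the lookup is a channel borrow plus an `Arc` clone rather than a meta RPC round trip. A simplified sketch (stand-in types, not the real `PinnedSnapshot`/`HummockSnapshotManager`):

```rust
use std::sync::Arc;
use tokio::sync::watch;

struct Snapshot {
    change_log_epochs: Vec<u64>,
}

struct SnapshotManager {
    latest: watch::Sender<Arc<Snapshot>>,
}

impl SnapshotManager {
    // Synchronous and non-blocking: borrow the channel's current value and
    // clone the `Arc`, never the snapshot data itself.
    fn acquire(&self) -> Arc<Snapshot> {
        self.latest.borrow().clone()
    }
}

fn main() {
    let (tx, _rx) = watch::channel(Arc::new(Snapshot { change_log_epochs: vec![1, 2] }));
    let manager = SnapshotManager { latest: tx };
    assert_eq!(manager.acquire().change_log_epochs.len(), 2);
}
```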
diff --git a/src/frontend/src/session/cursor_manager.rs b/src/frontend/src/session/cursor_manager.rs
index 5270aced1f13f..7cfdc0aa012a0 100644
--- a/src/frontend/src/session/cursor_manager.rs
+++ b/src/frontend/src/session/cursor_manager.rs
@@ -112,13 +112,18 @@ impl Cursor {
         count: u32,
         handle_args: HandlerArgs,
         formats: &Vec<Format>,
+        timeout_seconds: Option<u64>,
     ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
         match self {
             Cursor::Subscription(cursor) => cursor
-                .next(count, handle_args, formats)
+                .next(count, handle_args, formats, timeout_seconds)
                 .await
                 .inspect_err(|_| cursor.cursor_metrics.subscription_cursor_error_count.inc()),
-            Cursor::Query(cursor) => cursor.next(count, formats, handle_args).await,
+            Cursor::Query(cursor) => {
+                cursor
+                    .next(count, formats, handle_args, timeout_seconds)
+                    .await
+            }
         }
     }
 
@@ -163,9 +168,11 @@ impl QueryCursor {
         count: u32,
         formats: &Vec<Format>,
         handle_args: HandlerArgs,
+        timeout_seconds: Option<u64>,
     ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
         // `FETCH NEXT` is equivalent to `FETCH 1`.
         // min with 100 to avoid allocating too many memory at once.
+        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
         let session = handle_args.session;
         let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
         let mut cur = 0;
@@ -177,6 +184,11 @@ impl QueryCursor {
         {
             cur += 1;
             ans.push(row);
+            if let Some(timeout_instant) = timeout_instant
+                && Instant::now() > timeout_instant
+            {
+                break;
+            }
         }
         Ok((ans, desc))
     }
@@ -318,9 +330,7 @@ impl SubscriptionCursor {
                     *expected_timestamp,
                     handle_args.clone(),
                     &self.subscription,
-                )
-                .await
-                {
+                ) {
                     Ok((Some(rw_timestamp), expected_timestamp)) => {
                         let (mut chunk_stream, fields, init_query_timer) =
                             Self::initiate_query(
@@ -434,7 +444,9 @@ impl SubscriptionCursor {
         count: u32,
         handle_args: HandlerArgs,
         formats: &Vec<Format>,
+        timeout_seconds: Option<u64>,
     ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
+        let timeout_instant = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
         if Instant::now() > self.cursor_need_drop_time {
             return Err(ErrorCode::InternalError(
                 "The cursor has exceeded its maximum lifetime, please recreate it (close then declare cursor).".to_string(),
@@ -442,9 +454,9 @@ impl SubscriptionCursor {
             .into());
         }
 
+        let session = &handle_args.session;
         let mut ans = Vec::with_capacity(std::cmp::min(100, count) as usize);
         let mut cur = 0;
-        let desc = self.fields.iter().map(to_pg_field).collect();
         if let State::Fetch {
             from_snapshot,
             chunk_stream,
@@ -456,7 +468,7 @@ impl SubscriptionCursor {
                 formats,
                 from_snapshot,
                 &self.fields,
-                handle_args.session.clone(),
+                session.clone(),
             );
         }
         while cur < count {
@@ -472,16 +484,43 @@ impl SubscriptionCursor {
                     ans.push(row);
                 }
                 None => {
-                    break;
+                    let timeout_seconds = timeout_seconds.unwrap_or(0);
+                    if cur > 0 || timeout_seconds == 0 {
+                        break;
+                    }
+                    // Block only when no rows have been fetched yet. This wait runs at most once per
+                    // fetch: either it times out, or the next loop picks up the new rows via `next_row`.
+                    match tokio::time::timeout(
+                        Duration::from_secs(timeout_seconds),
+                        session
+                            .env
+                            .hummock_snapshot_manager()
+                            .wait_table_change_log_notification(self.dependent_table_id.table_id()),
+                    )
+                    .await
+                    {
+                        Ok(result) => result?,
+                        Err(_) => {
+                            tracing::debug!("Cursor wait next epoch timeout");
+                            break;
+                        }
+                    }
                 }
             }
+            // Timeout: return the rows fetched so far
+            if let Some(timeout_instant) = timeout_instant
+                && Instant::now() > timeout_instant
+            {
+                break;
+            }
         }
         self.last_fetch = Instant::now();
+        let desc = self.fields.iter().map(to_pg_field).collect();
 
         Ok((ans, desc))
     }
 
-    async fn get_next_rw_timestamp(
+    fn get_next_rw_timestamp(
         seek_timestamp: u64,
         table_id: &TableId,
         expected_timestamp: Option<u64>,
@@ -496,9 +535,7 @@ impl SubscriptionCursor {
         )?;
 
         // The epoch here must be pulled every time, otherwise there will be cache consistency issues
-        let new_epochs = session
-            .list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)
-            .await?;
+        let new_epochs = session.list_change_log_epochs(table_id.table_id(), seek_timestamp, 2)?;
         if let Some(expected_timestamp) = expected_timestamp
             && (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
         {
@@ -794,9 +831,12 @@ impl CursorManager {
         count: u32,
         handle_args: HandlerArgs,
         formats: &Vec<Format>,
+        timeout_seconds: Option<u64>,
     ) -> Result<(Vec<Row>, Vec<PgFieldDescriptor>)> {
         if let Some(cursor) = self.cursor_map.lock().await.get_mut(&cursor_name) {
-            cursor.next(count, handle_args, formats).await
+            cursor
+                .next(count, handle_args, formats, timeout_seconds)
+                .await
         } else {
             Err(ErrorCode::ItemNotFound(format!("Cannot find cursor `{}`", cursor_name)).into())
         }
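
Reviewer note: the fetch loop above layers two timeouts: an overall deadline (`timeout_instant`) bounding the whole fetch, and a bounded wait (`tokio::time::timeout`) around the single blocking wait taken when no rows are available. A condensed, self-contained model of that control flow (stand-in functions, not the real chunk stream or notification):

```rust
use std::time::Duration;
use tokio::time::{sleep, timeout, Instant};

async fn fetch_with_deadline(timeout_seconds: Option<u64>) -> Vec<u64> {
    // Overall deadline for the whole fetch, mirroring `timeout_instant` above.
    let deadline = timeout_seconds.map(|s| Instant::now() + Duration::from_secs(s));
    let mut rows = Vec::new();
    loop {
        match try_next_row().await {
            Some(row) => rows.push(row),
            None => {
                let secs = timeout_seconds.unwrap_or(0);
                // Return immediately if we already have rows or no timeout was given.
                if !rows.is_empty() || secs == 0 {
                    break;
                }
                // Block for new data, but never longer than the budget.
                if timeout(Duration::from_secs(secs), wait_for_new_data())
                    .await
                    .is_err()
                {
                    break; // timed out with no data: return empty
                }
            }
        }
        // Enforce the overall deadline between rows.
        if deadline.is_some_and(|d| Instant::now() > d) {
            break;
        }
    }
    rows
}

// Stand-ins for the real chunk stream and change-log notification.
async fn try_next_row() -> Option<u64> {
    None
}
async fn wait_for_new_data() {
    sleep(Duration::from_millis(10)).await
}

#[tokio::main]
async fn main() {
    assert!(fetch_with_deadline(Some(1)).await.is_empty());
}
```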
diff --git a/src/frontend/src/test_utils.rs b/src/frontend/src/test_utils.rs
index 86046850645f0..ff1c92227716c 100644
--- a/src/frontend/src/test_utils.rs
+++ b/src/frontend/src/test_utils.rs
@@ -1053,15 +1053,6 @@ impl FrontendMetaClient for MockFrontendMetaClient {
         Ok(RecoveryStatus::StatusRunning)
     }
 
-    async fn list_change_log_epochs(
-        &self,
-        _table_id: u32,
-        _min_epoch: u64,
-        _max_count: u32,
-    ) -> RpcResult<Vec<u64>> {
-        unimplemented!()
-    }
-
     async fn get_cluster_limits(&self) -> RpcResult<Vec<ClusterLimit>> {
         Ok(vec![])
     }
diff --git a/src/meta/service/src/hummock_service.rs b/src/meta/service/src/hummock_service.rs
index 6be42783a2388..1c2dd26e2bff6 100644
--- a/src/meta/service/src/hummock_service.rs
+++ b/src/meta/service/src/hummock_service.rs
@@ -619,22 +619,6 @@ impl HummockManagerService for HummockServiceImpl {
         return Ok(response);
     }
 
-    async fn list_change_log_epochs(
-        &self,
-        request: Request<ListChangeLogEpochsRequest>,
-    ) -> Result<Response<ListChangeLogEpochsResponse>, Status> {
-        let ListChangeLogEpochsRequest {
-            table_id,
-            min_epoch,
-            max_count,
-        } = request.into_inner();
-        let epochs = self
-            .hummock_manager
-            .list_change_log_epochs(table_id, min_epoch, max_count)
-            .await;
-        Ok(Response::new(ListChangeLogEpochsResponse { epochs }))
-    }
-
     async fn get_version_by_epoch(
         &self,
         request: Request<GetVersionByEpochRequest>,
diff --git a/src/meta/src/hummock/manager/versioning.rs b/src/meta/src/hummock/manager/versioning.rs
index 725d838711e9e..c2ff62256eaf3 100644
--- a/src/meta/src/hummock/manager/versioning.rs
+++ b/src/meta/src/hummock/manager/versioning.rs
@@ -16,7 +16,6 @@ use std::cmp;
 use std::collections::{BTreeMap, HashMap, HashSet};
 
 use itertools::Itertools;
-use risingwave_common::catalog::TableId;
 use risingwave_hummock_sdk::compaction_group::hummock_version_ext::{
     get_compaction_group_ids, get_table_compaction_group_id_mapping, BranchedSstInfo,
 };
@@ -264,25 +263,6 @@ impl HummockManager {
         }
         Ok(())
     }
-
-    pub async fn list_change_log_epochs(
-        &self,
-        table_id: u32,
-        min_epoch: u64,
-        max_count: u32,
-    ) -> Vec<u64> {
-        let versioning = self.versioning.read().await;
-        if let Some(table_change_log) = versioning
-            .current_version
-            .table_change_log
-            .get(&TableId::new(table_id))
-        {
-            let table_change_log = table_change_log.clone();
-            table_change_log.get_non_empty_epochs(min_epoch, max_count as usize)
-        } else {
-            vec![]
-        }
-    }
 }
 
 /// Calculates write limits for `target_groups`.
diff --git a/src/rpc_client/src/meta_client.rs b/src/rpc_client/src/meta_client.rs
index a433af1e51e66..07632a0e3f775 100644
--- a/src/rpc_client/src/meta_client.rs
+++ b/src/rpc_client/src/meta_client.rs
@@ -698,21 +698,6 @@ impl MetaClient {
             .ok_or_else(|| anyhow!("wait version not set"))?)
     }
 
-    pub async fn list_change_log_epochs(
-        &self,
-        table_id: u32,
-        min_epoch: u64,
-        max_count: u32,
-    ) -> Result<Vec<u64>> {
-        let request = ListChangeLogEpochsRequest {
-            table_id,
-            min_epoch,
-            max_count,
-        };
-        let resp = self.inner.list_change_log_epochs(request).await?;
-        Ok(resp.epochs)
-    }
-
     pub async fn drop_index(&self, index_id: IndexId, cascade: bool) -> Result<WaitVersion> {
         let request = DropIndexRequest {
             index_id: index_id.index_id,
@@ -2119,7 +2104,6 @@ macro_rules! for_all_meta_rpc {
             ,{ hummock_client, list_compact_task_assignment, ListCompactTaskAssignmentRequest, ListCompactTaskAssignmentResponse }
             ,{ hummock_client, list_compact_task_progress, ListCompactTaskProgressRequest, ListCompactTaskProgressResponse }
             ,{ hummock_client, cancel_compact_task, CancelCompactTaskRequest, CancelCompactTaskResponse}
-            ,{ hummock_client, list_change_log_epochs, ListChangeLogEpochsRequest, ListChangeLogEpochsResponse }
             ,{ hummock_client, get_version_by_epoch, GetVersionByEpochRequest, GetVersionByEpochResponse }
            ,{ hummock_client, merge_compaction_group, MergeCompactionGroupRequest, MergeCompactionGroupResponse }
             ,{ user_client, create_user, CreateUserRequest, CreateUserResponse }
diff --git a/src/sqlparser/src/ast/statement.rs b/src/sqlparser/src/ast/statement.rs
index 7dd7564fc6661..8164194b36bc8 100644
--- a/src/sqlparser/src/ast/statement.rs
+++ b/src/sqlparser/src/ast/statement.rs
@@ -717,6 +717,7 @@ impl fmt::Display for DeclareCursorStatement {
 pub struct FetchCursorStatement {
     pub cursor_name: ObjectName,
     pub count: u32,
+    pub with_properties: WithProperties,
 }
 
 impl ParseTo for FetchCursorStatement {
@@ -728,8 +729,13 @@ impl ParseTo for FetchCursorStatement {
         };
         p.expect_keyword(Keyword::FROM)?;
         impl_parse_to!(cursor_name: ObjectName, p);
+        impl_parse_to!(with_properties: WithProperties, p);
 
-        Ok(Self { cursor_name, count })
+        Ok(Self {
+            cursor_name,
+            count,
+            with_properties,
+        })
     }
 }
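
Reviewer note: a quick hypothetical round-trip check for the extended grammar. It assumes the crate's usual `Parser::parse_sql` entry point and a `Statement::FetchCursor { stmt }` variant shape:

```rust
use risingwave_sqlparser::ast::Statement;
use risingwave_sqlparser::parser::Parser;

fn main() {
    let sql = "FETCH 100 FROM cur WITH (timeout = '5s')";
    let stmts = Parser::parse_sql(sql).expect("should parse");
    if let Statement::FetchCursor { stmt } = &stmts[0] {
        assert_eq!(stmt.count, 100);
        // `WithProperties` is a newtype over `Vec<SqlOption>`.
        assert_eq!(stmt.with_properties.0.len(), 1);
    }
}
```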
diff --git a/src/storage/hummock_sdk/src/frontend_version.rs b/src/storage/hummock_sdk/src/frontend_version.rs
index fd010e1c3e6ff..58276c522bd54 100644
--- a/src/storage/hummock_sdk/src/frontend_version.rs
+++ b/src/storage/hummock_sdk/src/frontend_version.rs
@@ -18,7 +18,7 @@ use risingwave_common::catalog::TableId;
 use risingwave_common::util::epoch::INVALID_EPOCH;
 use risingwave_pb::hummock::hummock_version_delta::PbChangeLogDelta;
 use risingwave_pb::hummock::{
-    PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbTableChangeLog,
+    PbEpochNewChangeLog, PbHummockVersion, PbHummockVersionDelta, PbSstableInfo, PbTableChangeLog,
     StateTableInfoDelta,
 };
 
@@ -162,8 +162,9 @@ impl FrontendHummockVersionDelta {
                     truncate_epoch: change_log_delta.truncate_epoch,
                     new_log: change_log_delta.new_log.as_ref().map(|new_log| {
                         EpochNewChangeLogCommon {
-                            new_value: vec![],
-                            old_value: vec![],
+                            // We only need to know whether each value list is empty, not its contents, so fill with `()`.
+                            new_value: vec![(); new_log.new_value.len()],
+                            old_value: vec![(); new_log.old_value.len()],
                             epochs: new_log.epochs.clone(),
                         }
                     }),
@@ -196,8 +197,9 @@ impl FrontendHummockVersionDelta {
                     table_id.table_id,
                     PbChangeLogDelta {
                         new_log: delta.new_log.as_ref().map(|new_log| PbEpochNewChangeLog {
-                            old_value: vec![],
-                            new_value: vec![],
+                            // Preserve only the lengths; the contents are unused, so fill with `PbSstableInfo::default()`.
+                            old_value: vec![PbSstableInfo::default(); new_log.old_value.len()],
+                            new_value: vec![PbSstableInfo::default(); new_log.new_value.len()],
                             epochs: new_log.epochs.clone(),
                         }),
                         truncate_epoch: delta.truncate_epoch,
@@ -237,8 +239,9 @@ impl FrontendHummockVersionDelta {
                     truncate_epoch: change_log_delta.truncate_epoch,
                     new_log: change_log_delta.new_log.as_ref().map(|new_log| {
                         EpochNewChangeLogCommon {
-                            new_value: vec![],
-                            old_value: vec![],
+                            // We only need to know whether each value list is empty, not its contents, so fill with `()`.
+                            new_value: vec![(); new_log.new_value.len()],
+                            old_value: vec![(); new_log.old_value.len()],
                             epochs: new_log.epochs.clone(),
                         }
                     }),
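
Reviewer note: the `vec![(); n]` placeholder works because `()` is zero-sized, so the vector stores no data yet still reports the original length, which is exactly what the frontend's emptiness check needs. A tiny demonstration:

```rust
fn main() {
    // `()` occupies zero bytes, so this vector allocates nothing for elements.
    let placeholder: Vec<()> = vec![(); 3];
    assert_eq!(placeholder.len(), 3);
    assert_eq!(std::mem::size_of::<()>(), 0);

    // An empty change-log entry stays distinguishable from a non-empty one.
    assert!(vec![(); 0].is_empty());
    assert!(!placeholder.is_empty());
}
```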