Skip to content

Commit

Permalink
fix comm
Browse files Browse the repository at this point in the history
  • Loading branch information
xxhZs committed May 6, 2024
1 parent d38f245 commit 69690ab
Show file tree
Hide file tree
Showing 24 changed files with 158 additions and 144 deletions.
6 changes: 3 additions & 3 deletions proto/catalog.proto
Original file line number Diff line number Diff line change
Expand Up @@ -179,15 +179,15 @@ message Subscription {
enum SubscriptionState {
UNSPECIFIED = 0;
INIT = 1;
CREATE = 2;
CREATED = 2;
}
uint32 id = 1;
string name = 2;
string definition = 3;
map<string, string> properties = 6;
uint64 retention_seconds = 6;
uint32 database_id = 8;
uint32 schema_id = 9;
uint32 dependent_table = 10;
uint32 dependent_table_id = 10;
optional uint64 initialized_at_epoch = 11;
optional uint64 created_at_epoch = 12;
uint32 owner = 13;
Expand Down
1 change: 0 additions & 1 deletion proto/ddl_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,6 @@ message CreateSubscriptionResponse {
message DropSubscriptionRequest {
uint32 subscription_id = 1;
bool cascade = 2;
uint32 dependent_table = 3;
}

message DropSubscriptionResponse {
Expand Down
8 changes: 4 additions & 4 deletions proto/hummock.proto
Original file line number Diff line number Diff line change
Expand Up @@ -833,7 +833,7 @@ service HummockManagerService {
rpc ListCompactTaskAssignment(ListCompactTaskAssignmentRequest) returns (ListCompactTaskAssignmentResponse);
rpc ListCompactTaskProgress(ListCompactTaskProgressRequest) returns (ListCompactTaskProgressResponse);
rpc CancelCompactTask(CancelCompactTaskRequest) returns (CancelCompactTaskResponse);
rpc ListEpochForSubscription(ListEpochForSubscriptionRequest) returns (ListEpochForSubscriptionResponse);
rpc ListChangeLogEpochs(ListChangeLogEpochsRequest) returns (ListChangeLogEpochsResponse);
}

message CompactionConfig {
Expand Down Expand Up @@ -894,12 +894,12 @@ message BranchedObject {
uint64 compaction_group_id = 3;
}

message ListEpochForSubscriptionRequest {
message ListChangeLogEpochsRequest {
uint32 table_id = 1;
uint64 min_epoch = 2;
uint64 max_epoch = 3;
uint32 max_count = 3;
}

message ListEpochForSubscriptionResponse {
message ListChangeLogEpochsResponse {
repeated uint64 epochs = 1;
}
2 changes: 1 addition & 1 deletion src/ctl/src/cmd_impl/meta/migration.rs
Original file line number Diff line number Diff line change
Expand Up @@ -562,7 +562,7 @@ pub async fn migrate(from: EtcdBackend, target: String, force_clean: bool) -> an
.map(|s| {
object_dependencies.push(object_dependency::ActiveModel {
id: NotSet,
oid: Set(s.dependent_table as _),
oid: Set(s.dependent_table_id as _),
used_by: Set(s.id as _),
});
s.into()
Expand Down
26 changes: 8 additions & 18 deletions src/frontend/src/catalog/catalog_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -152,12 +152,7 @@ pub trait CatalogWriter: Send + Sync {
affected_table_change: Option<PbReplaceTablePlan>,
) -> Result<()>;

async fn drop_subscription(
&self,
subscription_id: u32,
cascade: bool,
dependent_table: u32,
) -> Result<()>;
async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()>;

async fn drop_database(&self, database_id: u32) -> Result<()>;

Expand Down Expand Up @@ -206,11 +201,11 @@ pub trait CatalogWriter: Send + Sync {
new_schema_id: u32,
) -> Result<()>;

async fn list_epoch_for_subscription(
async fn list_change_log_epochs(
&self,
table_id: u32,
min_epoch: u64,
max_epoch: u64,
max_count: u32,
) -> Result<Vec<u64>>;
}

Expand Down Expand Up @@ -427,15 +422,10 @@ impl CatalogWriter for CatalogWriterImpl {
self.wait_version(version).await
}

async fn drop_subscription(
&self,
subscription_id: u32,
cascade: bool,
dependent_table: u32,
) -> Result<()> {
async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
let version = self
.meta_client
.drop_subscription(subscription_id, cascade, dependent_table)
.drop_subscription(subscription_id, cascade)
.await?;
self.wait_version(version).await
}
Expand Down Expand Up @@ -575,15 +565,15 @@ impl CatalogWriter for CatalogWriterImpl {
Ok(())
}

async fn list_epoch_for_subscription(
async fn list_change_log_epochs(
&self,
table_id: u32,
min_epoch: u64,
max_epoch: u64,
max_count: u32,
) -> Result<Vec<u64>> {
Ok(self
.meta_client
.list_epoch_for_subscription(table_id, min_epoch, max_epoch)
.list_change_log_epochs(table_id, min_epoch, max_count)
.await?)
}
}
Expand Down
22 changes: 11 additions & 11 deletions src/frontend/src/catalog/subscription_catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@ pub struct SubscriptionCatalog {
/// Full SQL definition of the subscription. For debug now.
pub definition: String,

/// The properties of the subscription, only `retention`.
pub properties: BTreeMap<String, String>,
/// The retention seconds of the subscription.
pub retention_seconds: u64,

/// The database id
pub database_id: u32,
Expand All @@ -47,7 +47,7 @@ pub struct SubscriptionCatalog {
pub schema_id: u32,

/// The subscription depends on the upstream list
pub dependent_table: TableId,
pub dependent_table_id: TableId,

/// The user id
pub owner: UserId,
Expand Down Expand Up @@ -82,8 +82,8 @@ impl SubscriptionId {
}

impl SubscriptionCatalog {
pub fn get_retention_seconds(&self) -> Result<u64> {
let retention_seconds_str = self.properties.get("retention").ok_or_else(|| {
pub fn set_retention_seconds(&mut self, properties: BTreeMap<String, String>) -> Result<()> {
let retention_seconds_str = properties.get("retention").ok_or_else(|| {
ErrorCode::InternalError("Subscription retention time not set.".to_string())
})?;
let retention_seconds = (Interval::from_str(retention_seconds_str)
Expand All @@ -95,8 +95,8 @@ impl SubscriptionCatalog {
})?
.epoch_in_micros()
/ 1000000) as u64;

Ok(retention_seconds)
self.retention_seconds = retention_seconds;
Ok(())
}

pub fn create_sql(&self) -> String {
Expand All @@ -108,15 +108,15 @@ impl SubscriptionCatalog {
id: self.id.subscription_id,
name: self.name.clone(),
definition: self.definition.clone(),
properties: self.properties.clone().into_iter().collect(),
retention_seconds: self.retention_seconds,
database_id: self.database_id,
schema_id: self.schema_id,
initialized_at_epoch: self.initialized_at_epoch.map(|e| e.0),
created_at_epoch: self.created_at_epoch.map(|e| e.0),
owner: self.owner.into(),
initialized_at_cluster_version: self.initialized_at_cluster_version.clone(),
created_at_cluster_version: self.created_at_cluster_version.clone(),
dependent_table: self.dependent_table.table_id,
dependent_table_id: self.dependent_table_id.table_id,
subscription_state: PbSubscriptionState::Init.into(),
}
}
Expand All @@ -128,10 +128,10 @@ impl From<&PbSubscription> for SubscriptionCatalog {
id: SubscriptionId::new(prost.id),
name: prost.name.clone(),
definition: prost.definition.clone(),
properties: prost.properties.clone().into_iter().collect(),
retention_seconds: prost.retention_seconds,
database_id: prost.database_id,
schema_id: prost.schema_id,
dependent_table: TableId::new(prost.dependent_table),
dependent_table_id: TableId::new(prost.dependent_table_id),
owner: prost.owner.into(),
created_at_epoch: prost.created_at_epoch.map(Epoch::from),
initialized_at_epoch: prost.initialized_at_epoch.map(Epoch::from),
Expand Down
10 changes: 6 additions & 4 deletions src/frontend/src/handler/create_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,31 @@ pub fn create_subscription_catalog(
let (subscription_database_id, subscription_schema_id) =
session.get_database_and_schema_id_for_create(subscription_schema_name.clone())?;
let definition = context.normalized_sql().to_owned();
let dependent_table = session
let dependent_table_id = session
.get_table_by_name(
&subscription_from_table_name,
table_database_id,
table_schema_id,
)?
.id;

let subscription_catalog = SubscriptionCatalog {
let mut subscription_catalog = SubscriptionCatalog {
id: SubscriptionId::placeholder(),
name: subscription_name,
definition,
properties: context.with_options().clone().into_inner(),
retention_seconds: 0,
database_id: subscription_database_id,
schema_id: subscription_schema_id,
dependent_table,
dependent_table_id,
owner: UserId::new(session.user_id()),
initialized_at_epoch: None,
created_at_epoch: None,
created_at_cluster_version: None,
initialized_at_cluster_version: None,
};

subscription_catalog.set_retention_seconds(context.with_options().clone().into_inner())?;

Ok(subscription_catalog)
}

Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/handler/declare_cursor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,17 +58,17 @@ async fn handle_declare_subscription_cursor(
let cursor_from_subscription_name = sub_name.0.last().unwrap().real_value().clone();
let subscription =
session.get_subscription_by_name(schema_name, &cursor_from_subscription_name)?;
let table = session.get_table_by_id(&subscription.dependent_table)?;
let table = session.get_table_by_id(&subscription.dependent_table_id)?;
// Start the first query of cursor, which includes querying the table and querying the subscription's logstore
let start_rw_timestamp = match rw_timestamp {
Some(risingwave_sqlparser::ast::Since::TimestampMsNum(start_rw_timestamp)) => {
check_cursor_unix_millis(start_rw_timestamp, subscription.get_retention_seconds()?)?;
check_cursor_unix_millis(start_rw_timestamp, subscription.retention_seconds)?;
Some(convert_unix_millis_to_logstore_u64(start_rw_timestamp))
}
Some(risingwave_sqlparser::ast::Since::ProcessTime) => Some(Epoch::now().0),
Some(risingwave_sqlparser::ast::Since::Begin) => {
let min_unix_millis =
Epoch::now().as_unix_millis() - subscription.get_retention_seconds()? * 1000;
Epoch::now().as_unix_millis() - subscription.retention_seconds * 1000;
let subscription_build_millis = subscription.created_at_epoch.unwrap().as_unix_millis();
let min_unix_millis = std::cmp::max(min_unix_millis, subscription_build_millis);
Some(convert_unix_millis_to_logstore_u64(min_unix_millis))
Expand Down
6 changes: 1 addition & 5 deletions src/frontend/src/handler/drop_subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,7 @@ pub async fn handle_drop_subscription(

let catalog_writer = session.catalog_writer()?;
catalog_writer
.drop_subscription(
subscription_id.subscription_id,
cascade,
subscription.dependent_table.table_id,
)
.drop_subscription(subscription_id.subscription_id, cascade)
.await?;

Ok(PgResponse::empty_result(StatementType::DROP_SUBSCRIPTION))
Expand Down
6 changes: 3 additions & 3 deletions src/frontend/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -929,15 +929,15 @@ impl SessionImpl {
Ok(table.clone())
}

pub async fn list_epoch_for_subscription(
pub async fn list_change_log_epochs(
&self,
table_id: u32,
min_epoch: u64,
max_epoch: u64,
max_count: u32,
) -> Result<Vec<u64>> {
self.env
.catalog_writer
.list_epoch_for_subscription(table_id, min_epoch, max_epoch)
.list_change_log_epochs(table_id, min_epoch, max_count)
.await
}

Expand Down
11 changes: 3 additions & 8 deletions src/frontend/src/session/cursor_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ use pgwire::pg_response::StatementType;
use pgwire::types::Row;
use risingwave_common::session_config::QueryMode;
use risingwave_common::types::DataType;
use risingwave_common::util::epoch::MAX_EPOCH;
use risingwave_sqlparser::ast::{Ident, ObjectName, Statement};

use super::SessionImpl;
Expand Down Expand Up @@ -188,7 +187,7 @@ impl SubscriptionCursor {
};

let cursor_need_drop_time =
Instant::now() + Duration::from_secs(subscription.get_retention_seconds()?);
Instant::now() + Duration::from_secs(subscription.retention_seconds);
Ok(Self {
cursor_name,
subscription,
Expand Down Expand Up @@ -227,7 +226,7 @@ impl SubscriptionCursor {
)
.await?;
self.cursor_need_drop_time = Instant::now()
+ Duration::from_secs(self.subscription.get_retention_seconds()?);
+ Duration::from_secs(self.subscription.retention_seconds);
let mut remaining_rows = VecDeque::new();
Self::try_refill_remaining_rows(&mut row_stream, &mut remaining_rows)
.await?;
Expand Down Expand Up @@ -340,13 +339,9 @@ impl SubscriptionCursor {
let new_epochs = handle_args
.session
.catalog_writer()?
.list_epoch_for_subscription(table_id, seek_timestamp, MAX_EPOCH - 1)
.list_change_log_epochs(table_id, seek_timestamp, 2)
.await?;

println!(
"expected_timestamp{:?},{:?},{:?}",
expected_timestamp, new_epochs, seek_timestamp
);
if let Some(expected_timestamp) = expected_timestamp
&& (new_epochs.is_empty() || &expected_timestamp != new_epochs.first().unwrap())
{
Expand Down
11 changes: 3 additions & 8 deletions src/frontend/src/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,11 @@ impl CatalogWriter for MockCatalogWriter {
Ok(())
}

async fn list_epoch_for_subscription(
async fn list_change_log_epochs(
&self,
_table_id: u32,
_min_epoch: u64,
_max_epoch: u64,
_max_count: u32,
) -> Result<Vec<u64>> {
unreachable!()
}
Expand Down Expand Up @@ -473,12 +473,7 @@ impl CatalogWriter for MockCatalogWriter {
Ok(())
}

async fn drop_subscription(
&self,
subscription_id: u32,
cascade: bool,
_dependent_table: u32,
) -> Result<()> {
async fn drop_subscription(&self, subscription_id: u32, cascade: bool) -> Result<()> {
if cascade {
return Err(ErrorCode::NotSupported(
"drop cascade in MockCatalogWriter is unsupported".to_string(),
Expand Down
10 changes: 5 additions & 5 deletions src/meta/model_v2/src/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,18 @@ use sea_orm::entity::prelude::*;
use sea_orm::ActiveValue::Set;
use serde::{Deserialize, Serialize};

use crate::{Property, SubscriptionId};
use crate::SubscriptionId;

#[derive(Clone, Debug, PartialEq, DeriveEntityModel, Eq, Serialize, Deserialize)]
#[sea_orm(table_name = "subscription")]
pub struct Model {
#[sea_orm(primary_key, auto_increment = false)]
pub subscription_id: SubscriptionId,
pub name: String,
pub properties: Property,
pub retention_seconds: u64,
pub definition: String,
pub subscription_state: i32,
pub dependent_table: u32,
pub dependent_table_id: u32,
}

#[derive(Copy, Clone, Debug, EnumIter, DeriveRelation)]
Expand Down Expand Up @@ -56,10 +56,10 @@ impl From<PbSubscription> for ActiveModel {
Self {
subscription_id: Set(pb_subscription.id as _),
name: Set(pb_subscription.name),
properties: Set(pb_subscription.properties.into()),
retention_seconds: Set(pb_subscription.retention_seconds),
definition: Set(pb_subscription.definition),
subscription_state: Set(pb_subscription.subscription_state),
dependent_table: Set(pb_subscription.dependent_table),
dependent_table_id: Set(pb_subscription.dependent_table_id),
}
}
}
Loading

0 comments on commit 69690ab

Please sign in to comment.