Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin' into tab/connection
Browse files Browse the repository at this point in the history
  • Loading branch information
tabversion committed Nov 7, 2024
2 parents 635975d + 218e63e commit 0fd3972
Show file tree
Hide file tree
Showing 13 changed files with 210 additions and 155 deletions.
2 changes: 1 addition & 1 deletion src/common/src/acl/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ pub static ALL_AVAILABLE_TABLE_MODES: LazyLock<AclModeSet> =
LazyLock::new(|| make_bitflags!(AclMode::{Select | Insert | Update | Delete}).into());
pub static ALL_AVAILABLE_SOURCE_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::readonly);
pub static ALL_AVAILABLE_MVIEW_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::readonly);
pub static ALL_AVAILABLE_SINK_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::empty);
pub static ALL_AVAILABLE_SINK_MODES: LazyLock<AclModeSet> = LazyLock::new(AclModeSet::readonly);
pub static ALL_AVAILABLE_SUBSCRIPTION_MODES: LazyLock<AclModeSet> =
LazyLock::new(AclModeSet::empty);
pub static ALL_AVAILABLE_FUNCTION_MODES: LazyLock<AclModeSet> =
Expand Down
14 changes: 14 additions & 0 deletions src/frontend/src/handler/handle_privilege.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,20 @@ fn make_prost_privilege(
grant_objs.push(PbObject::SourceId(source.id));
}
}
GrantObjects::Sinks(sinks) => {
let db_name = session.database();
let search_path = session.config().search_path();
let user_name = &session.auth_context().user_name;

for name in sinks {
let (schema_name, sink_name) =
Binder::resolve_schema_qualified_name(db_name, name)?;
let schema_path = SchemaPath::new(schema_name.as_deref(), &search_path, user_name);

let (sink, _) = reader.get_sink_by_name(db_name, schema_path, &sink_name)?;
grant_objs.push(PbObject::SinkId(sink.id.sink_id));
}
}
GrantObjects::AllSourcesInSchema { schemas } => {
for schema in schemas {
let schema_name = Binder::resolve_schema_name(schema)?;
Expand Down
4 changes: 3 additions & 1 deletion src/meta/model/src/user_privilege.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ use serde::{Deserialize, Serialize};

use crate::{ObjectId, PrivilegeId, UserId};

#[derive(Clone, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize)]
#[derive(
Clone, Copy, Debug, Hash, PartialEq, Eq, EnumIter, DeriveActiveEnum, Serialize, Deserialize,
)]
#[sea_orm(rs_type = "String", db_type = "string(None)")]
pub enum Action {
#[sea_orm(string_value = "INSERT")]
Expand Down
20 changes: 11 additions & 9 deletions src/meta/src/barrier/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1213,7 +1213,7 @@ impl GlobalBarrierWorkerContextImpl {
if is_first_time {
commit_info
.new_table_fragment_infos
.push(NewTableFragmentInfo::NewCompactionGroup {
.push(NewTableFragmentInfo {
table_ids: tables_to_commit,
});
};
Expand Down Expand Up @@ -1747,14 +1747,16 @@ fn collect_commit_epoch_info(
&& !matches!(job_type, CreateStreamingJobType::SnapshotBackfill(_))
{
let table_fragments = &info.table_fragments;
vec![NewTableFragmentInfo::Normal {
mv_table_id: table_fragments.mv_table_id().map(TableId::new),
internal_table_ids: table_fragments
.internal_table_ids()
.into_iter()
.map(TableId::new)
.collect(),
}]
let mut table_ids: HashSet<_> = table_fragments
.internal_table_ids()
.into_iter()
.map(TableId::new)
.collect();
if let Some(mv_table_id) = table_fragments.mv_table_id() {
table_ids.insert(TableId::new(mv_table_id));
}

vec![NewTableFragmentInfo { table_ids }]
} else {
vec![]
};
Expand Down
12 changes: 3 additions & 9 deletions src/meta/src/controller/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,8 @@ use tokio::sync::{RwLock, RwLockReadGuard, RwLockWriteGuard};
use tracing::info;

use super::utils::{
check_subscription_name_duplicate, get_fragment_ids_by_jobs, rename_relation,
rename_relation_refer,
check_subscription_name_duplicate, get_fragment_ids_by_jobs, get_internal_tables_by_id,
rename_relation, rename_relation_refer,
};
use crate::controller::utils::{
build_relation_group_for_delete, check_connection_name_duplicate,
Expand Down Expand Up @@ -3514,13 +3514,7 @@ async fn update_internal_tables(
new_value: Value,
relations_to_notify: &mut Vec<PbRelationInfo>,
) -> MetaResult<()> {
let internal_tables: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(table::Column::BelongsToJobId.eq(object_id))
.into_tuple()
.all(txn)
.await?;
let internal_tables = get_internal_tables_by_id(object_id, txn).await?;

if !internal_tables.is_empty() {
Object::update_many()
Expand Down
10 changes: 2 additions & 8 deletions src/meta/src/controller/streaming_job.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use crate::controller::rename::ReplaceTableExprRewriter;
use crate::controller::utils::{
build_relation_group_for_delete, check_relation_name_duplicate, check_sink_into_table_cycle,
ensure_object_id, ensure_user_id, get_fragment_actor_ids, get_fragment_mappings,
rebuild_fragment_mapping_from_actors, PartialObject,
get_internal_tables_by_id, rebuild_fragment_mapping_from_actors, PartialObject,
};
use crate::controller::ObjectModel;
use crate::manager::{NotificationVersion, StreamingJob};
Expand Down Expand Up @@ -521,13 +521,7 @@ impl CatalogController {
}
}

let internal_table_ids: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(table::Column::BelongsToJobId.eq(job_id))
.into_tuple()
.all(&txn)
.await?;
let internal_table_ids = get_internal_tables_by_id(job_id, &txn).await?;

// Get the notification info if the job is a materialized view.
let table_obj = Table::find_by_id(job_id).one(&txn).await?;
Expand Down
45 changes: 40 additions & 5 deletions src/meta/src/controller/user.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,9 @@ use sea_orm::{
use crate::controller::catalog::CatalogController;
use crate::controller::utils::{
check_user_name_duplicate, ensure_privileges_not_referred, ensure_user_id,
extract_grant_obj_id, get_object_owner, get_referring_privileges_cascade, get_user_privilege,
list_user_info_by_ids, PartialUserPrivilege,
extract_grant_obj_id, get_index_state_tables_by_table_id, get_internal_tables_by_id,
get_object_owner, get_referring_privileges_cascade, get_user_privilege, list_user_info_by_ids,
PartialUserPrivilege,
};
use crate::manager::{NotificationVersion, IGNORED_NOTIFICATION_VERSION};
use crate::{MetaError, MetaResult};
Expand Down Expand Up @@ -230,14 +231,31 @@ impl CatalogController {
let mut privileges = vec![];
for gp in new_grant_privileges {
let id = extract_grant_obj_id(gp.get_object()?);
let internal_table_ids = get_internal_tables_by_id(id, &txn).await?;
let index_state_table_ids = get_index_state_tables_by_table_id(id, &txn).await?;
for action_with_opt in &gp.action_with_opts {
let action = action_with_opt.get_action()?.into();
privileges.push(user_privilege::ActiveModel {
oid: Set(id),
granted_by: Set(grantor),
action: Set(action_with_opt.get_action()?.into()),
action: Set(action),
with_grant_option: Set(action_with_opt.with_grant_option),
..Default::default()
});
if action == Action::Select {
privileges.extend(
internal_table_ids
.iter()
.chain(index_state_table_ids.iter())
.map(|&tid| user_privilege::ActiveModel {
oid: Set(tid),
granted_by: Set(grantor),
action: Set(Action::Select),
with_grant_option: Set(action_with_opt.with_grant_option),
..Default::default()
}),
);
}
}
}

Expand All @@ -254,7 +272,7 @@ impl CatalogController {
let filter = user_privilege::Column::UserId
.eq(grantor)
.and(user_privilege::Column::Oid.eq(*privilege.oid.as_ref()))
.and(user_privilege::Column::Action.eq(privilege.action.as_ref().clone()))
.and(user_privilege::Column::Action.eq(*privilege.action.as_ref()))
.and(user_privilege::Column::WithGrantOption.eq(true));
let privilege_id: Option<PrivilegeId> = UserPrivilege::find()
.select_only()
Expand Down Expand Up @@ -351,12 +369,29 @@ impl CatalogController {
let mut revoke_items = HashMap::new();
for privilege in revoke_grant_privileges {
let obj = extract_grant_obj_id(privilege.get_object()?);
let internal_table_ids = get_internal_tables_by_id(obj, &txn).await?;
let index_state_table_ids = get_index_state_tables_by_table_id(obj, &txn).await?;
let mut include_select = false;
let actions = privilege
.action_with_opts
.iter()
.map(|ao| Action::from(ao.get_action().unwrap()))
.map(|ao| {
let action = Action::from(ao.get_action().unwrap());
if action == Action::Select {
include_select = true;
}
action
})
.collect_vec();
revoke_items.insert(obj, actions);
if include_select {
revoke_items.extend(
internal_table_ids
.iter()
.chain(index_state_table_ids.iter())
.map(|&tid| (tid, vec![Action::Select])),
);
}
}

let filter = if !revoke_user.is_super {
Expand Down
53 changes: 53 additions & 0 deletions src/meta/src/controller/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use risingwave_meta_model::actor::ActorStatus;
use risingwave_meta_model::fragment::DistributionType;
use risingwave_meta_model::object::ObjectType;
use risingwave_meta_model::prelude::*;
use risingwave_meta_model::table::TableType;
use risingwave_meta_model::{
actor, actor_dispatcher, connection, database, fragment, function, index, object,
object_dependency, schema, secret, sink, source, subscription, table, user, user_privilege,
Expand Down Expand Up @@ -735,6 +736,58 @@ pub fn construct_privilege_dependency_query(ids: Vec<PrivilegeId>) -> WithQuery
.to_owned()
}

pub async fn get_internal_tables_by_id<C>(job_id: ObjectId, db: &C) -> MetaResult<Vec<TableId>>
where
C: ConnectionTrait,
{
let table_ids: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(
table::Column::TableType
.eq(TableType::Internal)
.and(table::Column::BelongsToJobId.eq(job_id)),
)
.into_tuple()
.all(db)
.await?;
Ok(table_ids)
}

pub async fn get_index_state_tables_by_table_id<C>(
table_id: TableId,
db: &C,
) -> MetaResult<Vec<TableId>>
where
C: ConnectionTrait,
{
let mut index_table_ids: Vec<TableId> = Index::find()
.select_only()
.column(index::Column::IndexTableId)
.filter(index::Column::PrimaryTableId.eq(table_id))
.into_tuple()
.all(db)
.await?;

if !index_table_ids.is_empty() {
let internal_table_ids: Vec<TableId> = Table::find()
.select_only()
.column(table::Column::TableId)
.filter(
table::Column::TableType
.eq(TableType::Internal)
.and(table::Column::BelongsToJobId.is_in(index_table_ids.clone())),
)
.into_tuple()
.all(db)
.await?;

index_table_ids.extend(internal_table_ids.into_iter());
}

Ok(index_table_ids)
}

#[derive(Clone, DerivePartialModel, FromQueryResult)]
#[sea_orm(entity = "UserPrivilege")]
pub struct PartialUserPrivilege {
Expand Down
Loading

0 comments on commit 0fd3972

Please sign in to comment.