Add HardCompaction to gql
rmn-boiko committed Apr 9, 2024
1 parent 930a667 commit 3a3e28b
Showing 26 changed files with 1,019 additions and 86 deletions.
@@ -23,6 +23,7 @@ use super::{
ensure_expected_dataset_kind,
ensure_flow_preconditions,
ensure_scheduling_permission,
ensure_set_config_flow_supported,
FlowIncompatibleDatasetKind,
FlowPreconditionsNotMet,
};
@@ -50,6 +51,11 @@ impl DatasetFlowConfigsMut {
paused: bool,
schedule: ScheduleInput,
) -> Result<SetFlowConfigResult> {
if !ensure_set_config_flow_supported(dataset_flow_type) {
return Ok(SetFlowConfigResult::TypeIsNotSupported(
FlowTypeIsNotSupported,
));
}
if let Some(e) =
ensure_expected_dataset_kind(ctx, &self.dataset_handle, dataset_flow_type).await?
{
@@ -77,8 +83,13 @@ impl DatasetFlowConfigsMut {
let res = flow_config_service
.set_configuration(
Utc::now(),
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
FlowKeyDataset::new(
self.dataset_handle.id.clone(),
dataset_flow_type.into(),
None,
None,
)
.into(),
paused,
FlowConfigurationRule::Schedule(configuration_rule),
)
@@ -100,6 +111,11 @@ impl DatasetFlowConfigsMut {
paused: bool,
batching: BatchingConditionInput,
) -> Result<SetFlowConfigResult> {
if !ensure_set_config_flow_supported(dataset_flow_type) {
return Ok(SetFlowConfigResult::TypeIsNotSupported(
FlowTypeIsNotSupported,
));
}
let batching_rule = match BatchingRule::new_checked(
batching.min_records_to_await,
batching.max_batching_interval.into(),
@@ -132,8 +148,13 @@ impl DatasetFlowConfigsMut {
let res = flow_config_service
.set_configuration(
Utc::now(),
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
FlowKeyDataset::new(
self.dataset_handle.id.clone(),
dataset_flow_type.into(),
None,
None,
)
.into(),
paused,
FlowConfigurationRule::BatchingRule(batching_rule),
)
@@ -236,6 +257,7 @@ enum SetFlowConfigResult {
IncompatibleDatasetKind(FlowIncompatibleDatasetKind),
InvalidBatchingConfig(FlowInvalidBatchingConfig),
PreconditionsNotMet(FlowPreconditionsNotMet),
TypeIsNotSupported(FlowTypeIsNotSupported),
}

#[derive(SimpleObject)]
@@ -251,6 +273,16 @@ impl SetFlowConfigSuccess {
}
}

#[derive(Debug, Clone)]
pub struct FlowTypeIsNotSupported;

#[Object]
impl FlowTypeIsNotSupported {
pub async fn message(&self) -> String {
"Flow type is not supported".to_string()
}
}

#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub(crate) struct FlowInvalidBatchingConfig {

@@ -42,6 +42,8 @@ impl DatasetFlowRunsMut {
&self,
ctx: &Context<'_>,
dataset_flow_type: DatasetFlowType,
max_slice_size: Option<u64>,
max_slice_records: Option<u64>,
) -> Result<TriggerFlowResult> {
if let Some(e) =
ensure_expected_dataset_kind(ctx, &self.dataset_handle, dataset_flow_type).await?
@@ -66,8 +68,13 @@
let flow_state = flow_service
.trigger_manual_flow(
Utc::now(),
fs::FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
fs::FlowKeyDataset::new(
self.dataset_handle.id.clone(),
dataset_flow_type.into(),
max_slice_size,
max_slice_records,
)
.into(),
odf::AccountID::from(odf::FAKE_ACCOUNT_ID),
logged_account.account_name,
)
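
// Sketch (not part of this commit) of a GraphQL document a client might send
// to exercise the two new `trigger_flow` arguments. The `runs`/`triggerFlow`
// path and the camelCase argument names are assumptions based on
// async-graphql's default renaming of `trigger_flow`, `max_slice_size`, and
// `max_slice_records`; only `HARD_COMPACTION` and the `datasets`/`byId`/`flows`
// nesting are confirmed by the tests later in this commit.
const _TRIGGER_HARD_COMPACTION: &str = r#"
mutation {
    datasets {
        byId(datasetId: "<dataset-id>") {
            flows {
                runs {
                    triggerFlow(
                        datasetFlowType: HARD_COMPACTION,
                        maxSliceSize: 1000000,
                        maxSliceRecords: 10000
                    ) {
                        __typename
                    }
                }
            }
        }
    }
}
"#;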
11 changes: 10 additions & 1 deletion src/adapter/graphql/src/mutations/flows_mut/flows_mut_utils.rs
@@ -127,13 +127,22 @@ pub(crate) async fn ensure_flow_preconditions(
}));
};
}
DatasetFlowType::Compaction => (),
DatasetFlowType::HardCompaction => (),
}
Ok(None)
}

///////////////////////////////////////////////////////////////////////////////

pub(crate) fn ensure_set_config_flow_supported(dataset_flow_type: DatasetFlowType) -> bool {
match dataset_flow_type {
DatasetFlowType::Ingest | DatasetFlowType::ExecuteTransform => true,
DatasetFlowType::HardCompaction => false,
}
}

///////////////////////////////////////////////////////////////////////////////
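
// Illustration (hypothetical, not part of the commit) spelling out the guard's
// contract: only the schedulable flow types accept a configuration via the
// `setConfig*` mutations, while hard compaction remains manual-trigger-only.
fn _ensure_set_config_guard_sketch() {
    assert!(ensure_set_config_flow_supported(DatasetFlowType::Ingest));
    assert!(ensure_set_config_flow_supported(DatasetFlowType::ExecuteTransform));
    assert!(!ensure_set_config_flow_supported(DatasetFlowType::HardCompaction));
}

///////////////////////////////////////////////////////////////////////////////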

#[derive(SimpleObject, Debug, Clone)]
#[graphql(complex)]
pub struct FlowPreconditionsNotMet {
17 changes: 14 additions & 3 deletions src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
@@ -37,8 +37,13 @@ impl DatasetFlowConfigs {
let flow_config_service = from_catalog::<dyn FlowConfigurationService>(ctx).unwrap();
let maybe_flow_config = flow_config_service
.find_configuration(
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
FlowKeyDataset::new(
self.dataset_handle.id.clone(),
dataset_flow_type.into(),
None,
None,
)
.into(),
)
.await
.int_err()?;
@@ -54,7 +59,13 @@ for dataset_flow_type in kamu_flow_system::DatasetFlowType::all() {
for dataset_flow_type in kamu_flow_system::DatasetFlowType::all() {
let maybe_flow_config = flow_config_service
.find_configuration(
FlowKeyDataset::new(self.dataset_handle.id.clone(), *dataset_flow_type).into(),
FlowKeyDataset::new(
self.dataset_handle.id.clone(),
*dataset_flow_type,
None,
None,
)
.into(),
)
.await
.int_err()?;
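
// The lookups above (and the setConfig mutations earlier) always pass
// `None, None` for the two parameters that `FlowKeyDataset::new` gained in
// this commit. A hypothetical helper (not part of the diff) that names the
// pattern; the `FlowKey` return type is an assumption about what the trailing
// `.into()` produces:
fn _flow_key_without_limits(
    dataset_id: opendatafabric::DatasetID,
    flow_type: kamu_flow_system::DatasetFlowType,
) -> kamu_flow_system::FlowKey {
    // Slice limits only apply to manually triggered hard compaction flows.
    FlowKeyDataset::new(dataset_id, flow_type, None, None).into()
}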
48 changes: 39 additions & 9 deletions src/adapter/graphql/src/queries/flows/flow.rs
@@ -103,11 +103,13 @@ impl Flow {
.int_err()?,
})
}
fs::DatasetFlowType::Compaction => {
FlowDescriptionDataset::Compaction(FlowDescriptionDatasetCompaction {
fs::DatasetFlowType::HardCompaction => {
FlowDescriptionDataset::HardCompaction(FlowDescriptionDatasetHardCompaction {
dataset_id: dataset_key.dataset_id.clone().into(),
original_blocks_count: 0, // TODO
resulting_blocks_count: None, // TODO
compact_result:
FlowDescriptionDatasetHardCompactionResult::from_maybe_flow_outcome(
self.flow_state.outcome.as_ref(),
),
})
}
})
@@ -226,7 +228,7 @@ enum FlowDescriptionDataset {
PollingIngest(FlowDescriptionDatasetPollingIngest),
PushIngest(FlowDescriptionDatasetPushIngest),
ExecuteTransform(FlowDescriptionDatasetExecuteTransform),
Compaction(FlowDescriptionDatasetCompaction),
HardCompaction(FlowDescriptionDatasetHardCompaction),
}

#[derive(SimpleObject)]
@@ -250,10 +252,9 @@ struct FlowDescriptionDatasetExecuteTransform {
}

#[derive(SimpleObject)]
struct FlowDescriptionDatasetCompaction {
struct FlowDescriptionDatasetHardCompaction {
dataset_id: DatasetID,
original_blocks_count: u64,
resulting_blocks_count: Option<u64>,
compact_result: Option<FlowDescriptionDatasetHardCompactionResult>,
}

///////////////////////////////////////////////////////////////////////////////
@@ -274,7 +275,7 @@ impl FlowDescriptionUpdateResult {
if let Some(outcome) = maybe_outcome {
match outcome {
fs::FlowOutcome::Success(result) => match result {
fs::FlowResult::Empty => Ok(None),
fs::FlowResult::Empty | fs::FlowResult::DatasetCompact(_) => Ok(None),
fs::FlowResult::DatasetUpdate(update) => {
let increment = dataset_changes_service
.get_increment_between(
@@ -301,3 +302,32 @@
}

///////////////////////////////////////////////////////////////////////////////

#[derive(SimpleObject)]
struct FlowDescriptionDatasetHardCompactionResult {
original_blocks_count: u64,
resulting_blocks_count: u64,
new_head: Multihash,
}

impl FlowDescriptionDatasetHardCompactionResult {
fn from_maybe_flow_outcome(maybe_outcome: Option<&fs::FlowOutcome>) -> Option<Self> {
if let Some(outcome) = maybe_outcome {
match outcome {
fs::FlowOutcome::Success(result) => match result {
fs::FlowResult::Empty | fs::FlowResult::DatasetUpdate(_) => None,
fs::FlowResult::DatasetCompact(compact) => Some(Self {
original_blocks_count: compact.old_num_blocks as u64,
resulting_blocks_count: compact.new_num_blocks as u64,
new_head: compact.new_head.clone().into(),
}),
},
_ => None,
}
} else {
None
}
}
}

///////////////////////////////////////////////////////////////////////////////
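
// For reference: the shape of the domain-level result consumed above, inferred
// from the field accesses (the real definition lives in `kamu_flow_system` and
// is not part of this diff; the struct name and the `usize` block counts are
// assumptions implied by the `as u64` casts):
//
// pub struct FlowResultDatasetCompact {
//     pub old_num_blocks: usize,
//     pub new_num_blocks: usize,
//     pub new_head: odf::Multihash,
// }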
2 changes: 1 addition & 1 deletion src/adapter/graphql/src/scalars/flow_scalars.rs
@@ -124,7 +124,7 @@ impl From<&kamu_flow_system::FlowOutcome> for FlowOutcome {
pub enum DatasetFlowType {
Ingest,
ExecuteTransform,
Compaction,
HardCompaction,
}

/////////////////////////////////////////////////////////////////////////////////////////
86 changes: 86 additions & 0 deletions src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs
@@ -1144,6 +1144,92 @@ async fn test_incorrect_dataset_kinds_for_flow_type() {

////////////////////////////////////////////////////////////////////////////////////////

#[test_log::test(tokio::test)]
async fn test_set_config_for_hard_compaction_fails() {
let harness = FlowConfigHarness::with_overrides(FlowRunsHarnessOverrides {
transform_service_mock: Some(MockTransformService::without_set_transform()),
polling_service_mock: Some(MockPollingIngestService::without_active_polling_source()),
});
let create_root_result = harness.create_root_dataset().await;

////

let mutation_code = FlowConfigHarness::set_config_batching_mutation(
&create_root_result.dataset_handle.id,
"HARD_COMPACTION",
false,
1,
(30, "MINUTES"),
);

let schema = kamu_adapter_graphql::schema_quiet();

let response = schema
.execute(
async_graphql::Request::new(mutation_code.clone())
.data(harness.catalog_authorized.clone()),
)
.await;

assert!(response.is_ok(), "{response:?}");
assert_eq!(
response.data,
value!({
"datasets": {
"byId": {
"flows": {
"configs": {
"setConfigBatching": {
"__typename": "FlowTypeIsNotSupported",
"message": "Flow type is not supported",
}
}
}
}
}
})
);

////

let mutation_code = FlowConfigHarness::set_config_cron_expression_mutation(
&create_root_result.dataset_handle.id,
"HARD_COMPACTION",
false,
"0 */2 * * *",
);

let schema = kamu_adapter_graphql::schema_quiet();

let response = schema
.execute(
async_graphql::Request::new(mutation_code.clone())
.data(harness.catalog_authorized.clone()),
)
.await;

assert!(response.is_ok(), "{response:?}");
assert_eq!(
response.data,
value!({
"datasets": {
"byId": {
"flows": {
"configs": {
"setConfigSchedule": {
"__typename": "FlowTypeIsNotSupported",
"message": "Flow type is not supported",
}
}
}
}
}
})
);
}

////////////////////////////////////////////////////////////////////////////////////////

#[test_log::test(tokio::test)]
async fn test_anonymous_setters_fail() {
let harness = FlowConfigHarness::with_overrides(FlowRunsHarnessOverrides {