Remove max slice gql params
rmn-boiko committed Apr 9, 2024
1 parent 3a3e28b commit accd861
Showing 14 changed files with 72 additions and 131 deletions.
23 changes: 16 additions & 7 deletions resources/schema.gql
@@ -391,7 +391,7 @@ type DatasetFlowRunsMut {
enum DatasetFlowType {
INGEST
EXECUTE_TRANSFORM
COMPACTION
HARD_COMPACTION
}

type DatasetFlows {
@@ -736,17 +736,22 @@ type FlowConnection {
edges: [FlowEdge!]!
}

union FlowDescription = FlowDescriptionDatasetPollingIngest | FlowDescriptionDatasetPushIngest | FlowDescriptionDatasetExecuteTransform | FlowDescriptionDatasetCompaction | FlowDescriptionSystemGC
union FlowDescription = FlowDescriptionDatasetPollingIngest | FlowDescriptionDatasetPushIngest | FlowDescriptionDatasetExecuteTransform | FlowDescriptionDatasetHardCompaction | FlowDescriptionSystemGC

type FlowDescriptionDatasetCompaction {
type FlowDescriptionDatasetExecuteTransform {
datasetId: DatasetID!
originalBlocksCount: Int!
resultingBlocksCount: Int
transformResult: FlowDescriptionUpdateResult
}

type FlowDescriptionDatasetExecuteTransform {
type FlowDescriptionDatasetHardCompaction {
datasetId: DatasetID!
transformResult: FlowDescriptionUpdateResult
compactResult: FlowDescriptionDatasetHardCompactionResult
}

type FlowDescriptionDatasetHardCompactionResult {
originalBlocksCount: Int!
resultingBlocksCount: Int!
newHead: Multihash!
}

type FlowDescriptionDatasetPollingIngest {
@@ -905,6 +910,10 @@ type FlowTriggerPush {
dummy: Boolean!
}

type FlowTypeIsNotSupported implements SetFlowConfigResult {
message: String!
}

interface GetFlowResult {
message: String!
}
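
The user-visible part of this change is the reshaped compaction description: the old FlowDescriptionDatasetCompaction type is gone, EXECUTE_TRANSFORM keeps its own description, and hard compaction now reports block counts and the new head through a nested result object. Below is a minimal sketch of the new types as async-graphql objects; it is a shape illustration only, with plain String standing in for the crate's DatasetID and Multihash scalars and i32 for the counts.

```rust
// Minimal sketch of the reshaped description types; not the real kamu code.
use async_graphql::SimpleObject;

#[derive(SimpleObject)]
struct FlowDescriptionDatasetHardCompaction {
    dataset_id: String,
    // Nullable in the schema: a flow may not have produced a result yet.
    compact_result: Option<FlowDescriptionDatasetHardCompactionResult>,
}

#[derive(SimpleObject)]
struct FlowDescriptionDatasetHardCompactionResult {
    original_blocks_count: i32,
    resulting_blocks_count: i32,
    new_head: String,
}

fn main() {
    // Field names are exposed in camelCase (datasetId, compactResult, newHead),
    // matching the schema fragment above.
}
```
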
@@ -83,13 +83,8 @@ impl DatasetFlowConfigsMut {
let res = flow_config_service
.set_configuration(
Utc::now(),
FlowKeyDataset::new(
self.dataset_handle.id.clone(),
dataset_flow_type.into(),
None,
None,
)
.into(),
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
paused,
FlowConfigurationRule::Schedule(configuration_rule),
)
@@ -148,13 +143,8 @@ impl DatasetFlowConfigsMut {
let res = flow_config_service
.set_configuration(
Utc::now(),
FlowKeyDataset::new(
self.dataset_handle.id.clone(),
dataset_flow_type.into(),
None,
None,
)
.into(),
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
paused,
FlowConfigurationRule::BatchingRule(batching_rule),
)
@@ -42,8 +42,6 @@ impl DatasetFlowRunsMut {
&self,
ctx: &Context<'_>,
dataset_flow_type: DatasetFlowType,
max_slice_size: Option<u64>,
max_slice_records: Option<u64>,
) -> Result<TriggerFlowResult> {
if let Some(e) =
ensure_expected_dataset_kind(ctx, &self.dataset_handle, dataset_flow_type).await?
@@ -68,13 +66,8 @@ impl DatasetFlowRunsMut {
let flow_state = flow_service
.trigger_manual_flow(
Utc::now(),
fs::FlowKeyDataset::new(
self.dataset_handle.id.clone(),
dataset_flow_type.into(),
max_slice_size,
max_slice_records,
)
.into(),
fs::FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
odf::AccountID::from(odf::FAKE_ACCOUNT_ID),
logged_account.account_name,
)
17 changes: 3 additions & 14 deletions src/adapter/graphql/src/queries/datasets/dataset_flow_configs.rs
@@ -37,13 +37,8 @@ impl DatasetFlowConfigs {
let flow_config_service = from_catalog::<dyn FlowConfigurationService>(ctx).unwrap();
let maybe_flow_config = flow_config_service
.find_configuration(
FlowKeyDataset::new(
self.dataset_handle.id.clone(),
dataset_flow_type.into(),
None,
None,
)
.into(),
FlowKeyDataset::new(self.dataset_handle.id.clone(), dataset_flow_type.into())
.into(),
)
.await
.int_err()?;
@@ -59,13 +54,7 @@ impl DatasetFlowConfigs {
for dataset_flow_type in kamu_flow_system::DatasetFlowType::all() {
let maybe_flow_config = flow_config_service
.find_configuration(
FlowKeyDataset::new(
self.dataset_handle.id.clone(),
*dataset_flow_type,
None,
None,
)
.into(),
FlowKeyDataset::new(self.dataset_handle.id.clone(), *dataset_flow_type).into(),
)
.await
.int_err()?;
46 changes: 12 additions & 34 deletions src/adapter/graphql/tests/tests/test_gql_dataset_flow_configs.rs
@@ -718,22 +718,6 @@ async fn test_pause_resume_dataset_flows() {
.await;
assert!(res.is_ok(), "{res:?}");

let mutation_set_compacting = FlowConfigHarness::set_config_time_delta_mutation(
&create_root_result.dataset_handle.id,
"COMPACTION",
false,
1,
"WEEKS",
);

let res = schema
.execute(
async_graphql::Request::new(mutation_set_compacting)
.data(harness.catalog_authorized.clone()),
)
.await;
assert!(res.is_ok(), "{res:?}");

let mutation_set_transform = FlowConfigHarness::set_config_batching_mutation(
&create_derived_result.dataset_handle.id,
"EXECUTE_TRANSFORM",
@@ -752,7 +736,6 @@ async fn test_pause_resume_dataset_flows() {

let flow_cases = [
(&create_root_result.dataset_handle.id, "INGEST"),
(&create_root_result.dataset_handle.id, "COMPACTION"),
(
&create_derived_result.dataset_handle.id,
"EXECUTE_TRANSFORM",
@@ -766,7 +749,7 @@ async fn test_pause_resume_dataset_flows() {

// Ensure all flow configs are not paused
for ((dataset_id, dataset_flow_type), expect_paused) in
flow_cases.iter().zip(vec![false, false, false])
flow_cases.iter().zip(vec![false, false])
{
check_flow_config_status(
&harness,
@@ -784,8 +767,8 @@ async fn test_pause_resume_dataset_flows() {
// Pause compaction of root

let mutation_pause_root_compacting = FlowConfigHarness::pause_flows_of_type_mutation(
&create_root_result.dataset_handle.id,
"COMPACTION",
&create_derived_result.dataset_handle.id,
"EXECUTE_TRANSFORM",
);

let res = schema
@@ -795,10 +778,8 @@ async fn test_pause_resume_dataset_flows() {
)
.await;
assert!(res.is_ok(), "{res:?}");

// Compaction should be paused
for ((dataset_id, dataset_flow_type), expect_paused) in
flow_cases.iter().zip(vec![false, true, false])
for ((dataset_id, dataset_flow_type), expect_paused) in flow_cases.iter().zip(vec![false, true])
{
check_flow_config_status(
&harness,
@@ -809,7 +790,7 @@ async fn test_pause_resume_dataset_flows() {
)
.await;
}
for (dataset_id, expect_paused) in dataset_cases.iter().zip(vec![false, false]) {
for (dataset_id, expect_paused) in dataset_cases.iter().zip(vec![false, true]) {
check_dataset_all_configs_status(&harness, &schema, dataset_id, expect_paused).await;
}

@@ -826,8 +807,7 @@ async fn test_pause_resume_dataset_flows() {
assert!(res.is_ok(), "{res:?}");

// Root flows should be paused
for ((dataset_id, dataset_flow_type), expect_paused) in
flow_cases.iter().zip(vec![true, true, false])
for ((dataset_id, dataset_flow_type), expect_paused) in flow_cases.iter().zip(vec![true, true])
{
check_flow_config_status(
&harness,
@@ -838,7 +818,7 @@ async fn test_pause_resume_dataset_flows() {
)
.await;
}
for (dataset_id, expect_paused) in dataset_cases.iter().zip(vec![true, false]) {
for (dataset_id, expect_paused) in dataset_cases.iter().zip(vec![true, true]) {
check_dataset_all_configs_status(&harness, &schema, dataset_id, expect_paused).await;
}

@@ -856,9 +836,8 @@ async fn test_pause_resume_dataset_flows() {
.await;
assert!(res.is_ok(), "{res:?}");

// Only compacting of root should be paused
for ((dataset_id, dataset_flow_type), expect_paused) in
flow_cases.iter().zip(vec![false, true, false])
// Only transform of derive should be paused
for ((dataset_id, dataset_flow_type), expect_paused) in flow_cases.iter().zip(vec![false, true])
{
check_flow_config_status(
&harness,
@@ -869,7 +848,7 @@ async fn test_pause_resume_dataset_flows() {
)
.await;
}
for (dataset_id, expect_paused) in dataset_cases.iter().zip(vec![false, false]) {
for (dataset_id, expect_paused) in dataset_cases.iter().zip(vec![false, true]) {
check_dataset_all_configs_status(&harness, &schema, dataset_id, expect_paused).await;
}

@@ -888,8 +867,7 @@ async fn test_pause_resume_dataset_flows() {
assert!(res.is_ok(), "{res:?}");

// Observe status change
for ((dataset_id, dataset_flow_type), expect_paused) in
flow_cases.iter().zip(vec![false, true, true])
for ((dataset_id, dataset_flow_type), expect_paused) in flow_cases.iter().zip(vec![false, true])
{
check_flow_config_status(
&harness,
@@ -918,7 +896,7 @@ async fn test_pause_resume_dataset_flows() {

// Observe status change
for ((dataset_id, dataset_flow_type), expect_paused) in
flow_cases.iter().zip(vec![false, true, false])
flow_cases.iter().zip(vec![false, false])
{
check_flow_config_status(
&harness,
1 change: 0 additions & 1 deletion src/domain/flow-system/src/entities/flow/flow_state.rs
@@ -73,7 +73,6 @@ impl FlowState {
DatasetFlowType::HardCompaction => {
ts::LogicalPlan::CompactDataset(ts::CompactDataset {
dataset_id: flow_key.dataset_id.clone(),
options: flow_key.options.clone(),
})
}
},
13 changes: 1 addition & 12 deletions src/domain/flow-system/src/entities/shared/flow_key.rs
@@ -7,7 +7,6 @@
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0.

use kamu_core::compact_service::CompactOptions;
use opendatafabric::DatasetID;

use crate::{AnyFlowType, DatasetFlowType, SystemFlowType};
@@ -35,23 +34,13 @@ impl FlowKey {
pub struct FlowKeyDataset {
pub dataset_id: DatasetID,
pub flow_type: DatasetFlowType,
pub options: CompactOptions,
}

impl FlowKeyDataset {
pub fn new(
dataset_id: DatasetID,
flow_type: DatasetFlowType,
max_slice_size: Option<u64>,
max_slice_records: Option<u64>,
) -> Self {
pub fn new(dataset_id: DatasetID, flow_type: DatasetFlowType) -> Self {
Self {
dataset_id,
flow_type,
options: CompactOptions {
max_slice_size,
max_slice_records,
},
}
}
}
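
For callers, the net effect is the slimmer constructor used throughout the GraphQL adapters above. A small self-contained sketch of the call shape, with hypothetical stand-in types rather than the real kamu definitions:

```rust
// Hypothetical stand-ins for the kamu types, just to show the new call shape.
#[derive(Clone, Debug)]
struct DatasetID(String);

#[derive(Clone, Copy, Debug)]
enum DatasetFlowType {
    Ingest,
    ExecuteTransform,
    HardCompaction,
}

#[derive(Debug)]
struct FlowKeyDataset {
    dataset_id: DatasetID,
    flow_type: DatasetFlowType,
}

impl FlowKeyDataset {
    // After this commit the constructor takes only the identifying pair;
    // compaction limits no longer travel inside the flow key.
    fn new(dataset_id: DatasetID, flow_type: DatasetFlowType) -> Self {
        Self {
            dataset_id,
            flow_type,
        }
    }
}

fn main() {
    let key = FlowKeyDataset::new(
        DatasetID("did:odf:example".to_string()),
        DatasetFlowType::HardCompaction,
    );
    println!("{key:?}");
}
```
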
2 changes: 0 additions & 2 deletions src/domain/task-system/domain/src/entities/logical_plan.rs
@@ -8,7 +8,6 @@
// by the Apache License, Version 2.0.

use enum_variants::*;
use kamu_core::compact_service::CompactOptions;
use opendatafabric::DatasetID;
use serde::{Deserialize, Serialize};

@@ -66,7 +65,6 @@ pub struct Probe {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct CompactDataset {
pub dataset_id: DatasetID,
pub options: CompactOptions,
}

/////////////////////////////////////////////////////////////////////////////////////////
9 changes: 3 additions & 6 deletions src/domain/task-system/services/src/task_executor_impl.rs
@@ -11,7 +11,7 @@ use std::sync::Arc;

use dill::*;
use event_bus::EventBus;
use kamu_core::compact_service::CompactService;
use kamu_core::compact_service::{CompactOptions, CompactService};
use kamu_core::{DatasetRepository, PullOptions, PullService, SystemTimeSource};
use kamu_task_system::*;

@@ -117,10 +117,7 @@ impl TaskExecutor for TaskExecutorImpl {
.clone()
.unwrap_or(TaskOutcome::Success(TaskResult::Empty))
}
LogicalPlan::CompactDataset(CompactDataset {
dataset_id,
options,
}) => {
LogicalPlan::CompactDataset(CompactDataset { dataset_id }) => {
let compact_svc = self.catalog.get_one::<dyn CompactService>().int_err()?;
let dataset_repo = self.catalog.get_one::<dyn DatasetRepository>().int_err()?;
let dataset_handle = dataset_repo
@@ -129,7 +126,7 @@
.int_err()?;

let compact_result = compact_svc
.compact_dataset(&dataset_handle, options, None)
.compact_dataset(&dataset_handle, &CompactOptions::default(), None)
.await;

match compact_result {
@@ -297,8 +297,6 @@ impl FlowServiceInMemory {
let dependent_flow_key = FlowKeyDataset::new(
dependent_dataset_id.clone(),
DatasetFlowType::ExecuteTransform,
None,
None,
)
.into();

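
With the slice limits gone from the flow key and the logical plan, the task executor (task_executor_impl.rs above) now invokes compaction with CompactOptions::default(). A rough stand-in of the options struct, mirroring only the two fields visible in the removed FlowKeyDataset code; whether the real Default supplies concrete slice limits is not shown in this diff:

```rust
// Hypothetical stand-in for kamu_core::compact_service::CompactOptions,
// limited to the two fields visible in the removed FlowKeyDataset code.
#[derive(Debug, Default)]
struct CompactOptions {
    max_slice_size: Option<u64>,
    max_slice_records: Option<u64>,
}

fn main() {
    // What the executor now passes to compact_dataset(): the defaults.
    // For this stand-in, Default leaves both limits unset; the real crate
    // may choose concrete default slice sizes instead.
    let options = CompactOptions::default();
    println!("{options:?}");
}
```
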