Commit

Correct dataset id is sent when input dataset was compacted before transform (#823)

zaychenko-sergei authored Sep 10, 2024
1 parent b632ce1 commit a7d11ce

Showing 11 changed files with 90 additions and 68 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
@@ -9,6 +9,10 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
<!--- - Changed -->
<!--- - Fixed -->

## [Unreleased]
### Fixed
- Transformation of a derived dataset now associates the error with the correct input dataset when that input was hard-compacted

## [0.199.2] - 2024-09-09
### Added
- REST API: The `/query` endpoint now supports response proofs via reproducibility and signing (#816)
16 changes: 8 additions & 8 deletions resources/schema.gql
@@ -995,11 +995,6 @@ type FlowConnection {
edges: [FlowEdge!]!
}

type FlowDatasetCompactedFailedError {
rootDataset: Dataset!
message: String!
}

union FlowDescription = FlowDescriptionDatasetPollingIngest | FlowDescriptionDatasetPushIngest | FlowDescriptionDatasetExecuteTransform | FlowDescriptionDatasetHardCompaction | FlowDescriptionDatasetReset | FlowDescriptionSystemGC

type FlowDescriptionDatasetExecuteTransform {
@@ -1106,14 +1101,19 @@ type FlowEventTriggerAdded implements FlowEvent {
}

type FlowFailedError {
reason: FlowFailedReason!
reason: FlowFailureReason!
}

type FlowFailedMessage {
union FlowFailureReason = FlowFailureReasonGeneral | FlowFailureReasonInputDatasetCompacted

type FlowFailureReasonGeneral {
message: String!
}

union FlowFailedReason = FlowFailedMessage | FlowDatasetCompactedFailedError
type FlowFailureReasonInputDatasetCompacted {
inputDataset: Dataset!
message: String!
}

scalar FlowID

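For context, a client reads the reshaped union by placing inline fragments on its two variants, the same shape the test harnesses below use. A minimal Rust sketch with the fragment embedded as a raw string (the constant name is hypothetical, and the fragment is assumed to be spliced into a full `flows` query):

```rust
// Hypothetical client-side fragment for the new FlowFailureReason union;
// it mirrors the harness queries shown later in this commit.
const FLOW_FAILURE_FRAGMENT: &str = r#"
    ...on FlowFailedError {
        reason {
            ...on FlowFailureReasonGeneral {
                message
            }
            ...on FlowFailureReasonInputDatasetCompacted {
                message
                inputDataset {
                    id
                }
            }
        }
    }
"#;

fn main() {
    // In a real client this fragment is embedded in a larger query and
    // posted to the GraphQL endpoint.
    println!("{FLOW_FAILURE_FRAGMENT}");
}
```
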
28 changes: 14 additions & 14 deletions src/adapter/graphql/src/queries/flows/flow_outcome.rs
@@ -24,17 +24,17 @@ pub(crate) enum FlowOutcome {

#[derive(SimpleObject)]
pub(crate) struct FlowFailedError {
reason: FlowFailedReason,
reason: FlowFailureReason,
}

#[derive(Union)]
pub(crate) enum FlowFailedReason {
FlowFailed(FlowFailedMessage),
FlowDatasetCompactedFailed(FlowDatasetCompactedFailedError),
pub(crate) enum FlowFailureReason {
General(FlowFailureReasonGeneral),
InputDatasetCompacted(FlowFailureReasonInputDatasetCompacted),
}

#[derive(SimpleObject)]
pub(crate) struct FlowFailedMessage {
pub(crate) struct FlowFailureReasonGeneral {
message: String,
}

@@ -49,8 +49,8 @@ pub(crate) struct FlowAbortedResult {
}

#[derive(SimpleObject)]
pub(crate) struct FlowDatasetCompactedFailedError {
root_dataset: Dataset,
pub(crate) struct FlowFailureReasonInputDatasetCompacted {
input_dataset: Dataset,
message: String,
}

@@ -66,11 +66,11 @@ impl FlowOutcome {
}),
kamu_flow_system::FlowOutcome::Failed(err) => match err {
FlowError::Failed => Self::Failed(FlowFailedError {
reason: FlowFailedReason::FlowFailed(FlowFailedMessage {
reason: FlowFailureReason::General(FlowFailureReasonGeneral {
message: "FAILED".to_owned(),
}),
}),
FlowError::RootDatasetCompacted(err) => {
FlowError::InputDatasetCompacted(err) => {
let dataset_repository =
from_catalog::<dyn DatasetRepository>(ctx).unwrap();
let hdl = dataset_repository
@@ -84,16 +84,16 @@

let dataset = Dataset::new(account, hdl);
Self::Failed(FlowFailedError {
reason: FlowFailedReason::FlowDatasetCompactedFailed(
FlowDatasetCompactedFailedError {
message: "Root dataset was compacted".to_owned(),
root_dataset: dataset,
reason: FlowFailureReason::InputDatasetCompacted(
FlowFailureReasonInputDatasetCompacted {
message: "Input dataset was compacted".to_owned(),
input_dataset: dataset,
},
),
})
}
FlowError::ResetHeadNotFound => Self::Failed(FlowFailedError {
reason: FlowFailedReason::FlowFailed(FlowFailedMessage {
reason: FlowFailureReason::General(FlowFailureReasonGeneral {
message: "New head hash to reset not found".to_owned(),
}),
}),
16 changes: 8 additions & 8 deletions src/adapter/graphql/tests/tests/test_gql_account_flow_configs.rs
@@ -823,12 +823,12 @@ impl FlowConfigHarness {
}
...on FlowFailedError {
reason {
...on FlowFailedMessage {
...on FlowFailureReasonGeneral {
message
}
...on FlowDatasetCompactedFailedError {
...on FlowFailureReasonInputDatasetCompacted {
message
rootDataset {
inputDataset {
id
}
}
@@ -927,12 +927,12 @@ impl FlowConfigHarness {
}
...on FlowFailedError {
reason {
...on FlowFailedMessage {
message
}
...on FlowDatasetCompactedFailedError {
...on FlowFailureReasonGeneral {
message
}
...on FlowFailureReasonInputDatasetCompacted {
message
rootDataset {
inputDataset {
id
}
}
48 changes: 24 additions & 24 deletions src/adapter/graphql/tests/tests/test_gql_dataset_flow_runs.rs
@@ -2703,11 +2703,11 @@ async fn test_execute_transfrom_flow_error_after_compaction() {
})
.await;

let create_result = harness.create_root_dataset().await;
let create_root_result = harness.create_root_dataset().await;
let create_derived_result = harness.create_derived_dataset().await;

let mutation_code = FlowRunsHarness::trigger_flow_with_compaction_config_mutation(
&create_result.dataset_handle.id,
&create_root_result.dataset_handle.id,
"HARD_COMPACTION",
10000,
1_000_000,
@@ -2781,7 +2781,7 @@ async fn test_execute_transfrom_flow_error_after_compaction() {
)
.await;

let request_code = FlowRunsHarness::list_flows_query(&create_result.dataset_handle.id);
let request_code = FlowRunsHarness::list_flows_query(&create_root_result.dataset_handle.id);
let response = schema
.execute(
async_graphql::Request::new(request_code.clone())
@@ -2803,7 +2803,7 @@ async fn test_execute_transfrom_flow_error_after_compaction() {
"flowId": "0",
"description": {
"__typename": "FlowDescriptionDatasetHardCompaction",
"datasetId": create_result.dataset_handle.id.to_string(),
"datasetId": create_root_result.dataset_handle.id.to_string(),
"compactionResult": {
"originalBlocksCount": 5,
"resultingBlocksCount": 4,
@@ -2920,8 +2920,8 @@ async fn test_execute_transfrom_flow_error_after_compaction() {
flow_task_metadata,
complete_time,
ts::TaskOutcome::Failed(ts::TaskError::UpdateDatasetError(
ts::UpdateDatasetTaskError::RootDatasetCompacted(ts::RootDatasetCompactedError {
dataset_id: create_result.dataset_handle.id.clone(),
ts::UpdateDatasetTaskError::InputDatasetCompacted(ts::InputDatasetCompactedError {
dataset_id: create_root_result.dataset_handle.id.clone(),
}),
)),
)
@@ -2955,9 +2955,9 @@ async fn test_execute_transfrom_flow_error_after_compaction() {
"status": "FINISHED",
"outcome": {
"reason": {
"message": "Root dataset was compacted",
"rootDataset": {
"id": create_result.dataset_handle.id.to_string()
"message": "Input dataset was compacted",
"inputDataset": {
"id": create_root_result.dataset_handle.id.to_string()
}
}
},
@@ -3509,12 +3509,12 @@ impl FlowRunsHarness {
}
...on FlowFailedError {
reason {
...on FlowFailedMessage {
...on FlowFailureReasonGeneral {
message
}
...on FlowDatasetCompactedFailedError {
...on FlowFailureReasonInputDatasetCompacted {
message
rootDataset {
inputDataset {
id
}
}
@@ -3720,12 +3720,12 @@ impl FlowRunsHarness {
}
...on FlowFailedError {
reason {
...on FlowFailedMessage {
...on FlowFailureReasonGeneral {
message
}
...on FlowDatasetCompactedFailedError {
...on FlowFailureReasonInputDatasetCompacted {
message
rootDataset {
inputDataset {
id
}
}
@@ -3790,12 +3790,12 @@ impl FlowRunsHarness {
}
...on FlowFailedError {
reason {
...on FlowFailedMessage {
...on FlowFailureReasonGeneral {
message
}
...on FlowDatasetCompactedFailedError {
...on FlowFailureReasonInputDatasetCompacted {
message
rootDataset {
inputDataset {
id
}
}
@@ -3861,12 +3861,12 @@ impl FlowRunsHarness {
}
...on FlowFailedError {
reason {
...on FlowFailedMessage {
...on FlowFailureReasonGeneral {
message
}
...on FlowDatasetCompactedFailedError {
...on FlowFailureReasonInputDatasetCompacted {
message
rootDataset {
inputDataset {
id
}
}
@@ -3917,12 +3917,12 @@ impl FlowRunsHarness {
}
...on FlowFailedError {
reason {
...on FlowFailedMessage {
...on FlowFailureReasonGeneral {
message
}
...on FlowDatasetCompactedFailedError {
...on FlowFailureReasonInputDatasetCompacted {
message
rootDataset {
inputDataset {
id
}
}
14 changes: 12 additions & 2 deletions src/domain/core/src/services/transform_service.rs
@@ -149,10 +149,10 @@ pub enum TransformError {
AccessError,
),
#[error(transparent)]
InvalidInterval(
InvalidInputInterval(
#[from]
#[backtrace]
InvalidIntervalError,
InvalidInputIntervalError,
),
#[error(transparent)]
Internal(
@@ -178,6 +178,16 @@ pub struct InputSchemaNotDefinedError {

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

#[derive(Error, Debug)]
#[error("Invalid block interval [{head}, {tail}) in input dataset '{input_dataset_id}'")]
pub struct InvalidInputIntervalError {
pub input_dataset_id: DatasetID,
pub head: Multihash,
pub tail: Multihash,
}

////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

impl From<GetDatasetError> for TransformError {
fn from(v: GetDatasetError) -> Self {
match v {
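The `#[error(...)]` attribute on `InvalidInputIntervalError` is `thiserror` deriving `Display` from the struct's fields. A self-contained sketch of that mechanism, with plain `String` stand-ins for kamu's `DatasetID` and `Multihash` types so it compiles on its own:

```rust
use thiserror::Error;

// Stand-in for the real error above: `DatasetID` and `Multihash` are
// replaced with `String` purely for illustration.
#[derive(Error, Debug)]
#[error("Invalid block interval [{head}, {tail}) in input dataset '{input_dataset_id}'")]
struct InvalidInputIntervalError {
    input_dataset_id: String,
    head: String,
    tail: String,
}

fn main() {
    let err = InvalidInputIntervalError {
        input_dataset_id: "input-dataset-1".into(),
        head: "hash-head".into(),
        tail: "hash-tail".into(),
    };
    // thiserror interpolates the named fields into the Display message:
    assert_eq!(
        err.to_string(),
        "Invalid block interval [hash-head, hash-tail) in input dataset 'input-dataset-1'"
    );
}
```
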
@@ -58,12 +58,12 @@ impl FlowResult {
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum FlowError {
Failed,
RootDatasetCompacted(FlowRootDatasetCompactedError),
InputDatasetCompacted(FlowInputDatasetCompactedError),
ResetHeadNotFound,
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FlowRootDatasetCompactedError {
pub struct FlowInputDatasetCompactedError {
pub dataset_id: DatasetID,
}

@@ -72,8 +72,8 @@ impl From<&TaskError> for FlowError {
match value {
TaskError::Empty => Self::Failed,
TaskError::UpdateDatasetError(update_dataset_error) => match update_dataset_error {
UpdateDatasetTaskError::RootDatasetCompacted(err) => {
Self::RootDatasetCompacted(FlowRootDatasetCompactedError {
UpdateDatasetTaskError::InputDatasetCompacted(err) => {
Self::InputDatasetCompacted(FlowInputDatasetCompactedError {
dataset_id: err.dataset_id.clone(),
})
}
4 changes: 2 additions & 2 deletions src/domain/task-system/domain/src/entities/task_status.rs
@@ -88,11 +88,11 @@ pub enum TaskError {

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum UpdateDatasetTaskError {
RootDatasetCompacted(RootDatasetCompactedError),
InputDatasetCompacted(InputDatasetCompactedError),
}

#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct RootDatasetCompactedError {
pub struct InputDatasetCompactedError {
pub dataset_id: DatasetID,
}

@@ -88,10 +88,10 @@ impl TaskLogicalPlanRunnerImpl {
TaskUpdateDatasetResult { pull_result },
))),
Err(err) => match err {
PullError::TransformError(TransformError::InvalidInterval(_)) => {
PullError::TransformError(TransformError::InvalidInputInterval(e)) => {
Ok(TaskOutcome::Failed(TaskError::UpdateDatasetError(
UpdateDatasetTaskError::RootDatasetCompacted(RootDatasetCompactedError {
dataset_id: args.dataset_id.clone(),
UpdateDatasetTaskError::InputDatasetCompacted(InputDatasetCompactedError {
dataset_id: e.input_dataset_id,
}),
)))
}
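This hunk is the heart of the fix: the runner previously stamped the error with `args.dataset_id` (the derived dataset being updated), whereas the new code forwards the id carried inside `InvalidInputIntervalError`, i.e. the compacted input. A stripped-down sketch of the difference, using hypothetical minimal types in place of kamu's:

```rust
// Hypothetical minimal types standing in for kamu's domain types.
#[derive(Clone, Debug, PartialEq)]
struct DatasetId(String);

struct InvalidInputIntervalError {
    // Id of the offending *input* dataset, carried by the error itself.
    input_dataset_id: DatasetId,
}

struct UpdateArgs {
    // Id of the dataset the transform task is updating (the derived one).
    dataset_id: DatasetId,
}

fn dataset_to_report(_args: &UpdateArgs, err: &InvalidInputIntervalError) -> DatasetId {
    // Before the fix the runner reported `_args.dataset_id` (the derived
    // dataset); after the fix it reports the compacted input instead:
    err.input_dataset_id.clone()
}

fn main() {
    let args = UpdateArgs { dataset_id: DatasetId("derived".into()) };
    let err = InvalidInputIntervalError {
        input_dataset_id: DatasetId("compacted-input".into()),
    };
    assert_eq!(dataset_to_report(&args, &err), DatasetId("compacted-input".into()));
}
```
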
10 changes: 8 additions & 2 deletions src/infra/core/src/transform_service_impl.rs
@@ -394,7 +394,13 @@ impl TransformServiceImpl {
.try_collect()
.await
.map_err(|chain_err| match chain_err {
IterBlocksError::InvalidInterval(err) => TransformError::InvalidInterval(err),
IterBlocksError::InvalidInterval(err) => {
TransformError::InvalidInputInterval(InvalidInputIntervalError {
head: err.head,
tail: err.tail,
input_dataset_id: dataset_handle.id.clone(),
})
}
_ => TransformError::Internal(chain_err.int_err()),
})?
} else {
@@ -704,7 +710,7 @@ impl TransformServiceImpl {
listener.success(&TransformResult::UpToDate);
Ok(TransformResult::UpToDate)
}
Err(err @ TransformError::InvalidInterval(_))
Err(err @ TransformError::InvalidInputInterval(_))
if options.reset_derivatives_on_diverged_input =>
{
tracing::warn!(
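The `map_err` in the first hunk above illustrates a common layering pattern: a low-level `InvalidIntervalError` knows only block hashes, so the service enriches it with context that only this layer holds (the input dataset's handle id) before surfacing it as `InvalidInputIntervalError`. A generic sketch of that pattern with hypothetical types:

```rust
// Hypothetical types illustrating error enrichment via map_err.
#[derive(Debug)]
struct LowLevelError {
    head: String,
    tail: String,
}

#[derive(Debug)]
struct EnrichedError {
    input_dataset_id: String,
    head: String,
    tail: String,
}

fn iter_blocks() -> Result<Vec<u8>, LowLevelError> {
    // Simulate a failed block-interval walk.
    Err(LowLevelError { head: "hash-head".into(), tail: "hash-tail".into() })
}

fn collect_input_blocks(input_dataset_id: &str) -> Result<Vec<u8>, EnrichedError> {
    // Attach the dataset id, known only at this layer, to the error.
    iter_blocks().map_err(|err| EnrichedError {
        input_dataset_id: input_dataset_id.to_owned(),
        head: err.head,
        tail: err.tail,
    })
}

fn main() {
    let err = collect_input_blocks("compacted-input").unwrap_err();
    println!("{err:?}");
}
```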