Skip to content

Commit

Permalink
#520 Fix a regression introduced by the refactoring.
Browse files Browse the repository at this point in the history
  • Loading branch information
yruslan committed Jan 2, 2025
1 parent b7b15fc commit cc0301e
Show file tree
Hide file tree
Showing 2 changed files with 7 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ class SinkJob(operationDef: OperationDef,

val result = RunResult(getDataDf(infoDate, metastoreReader))

if (isIncremental) {
// This ensures offsets are tracked for the input table used as sink's source.
metastoreReader.asInstanceOf[MetastoreReaderIncremental].commitIncrementalStage()
}

result
}

Expand Down Expand Up @@ -167,6 +172,7 @@ class SinkJob(operationDef: OperationDef,
)

if (isIncremental) {
// This ensures offsets are tracked for all incremental tables used by 'sink.send()'.
metastoreReader.asInstanceOf[MetastoreReaderIncremental].commitIncrementalOutputTable(sinkTable.metaTableName, s"${sinkTable.metaTableName}->$sinkName")
metastoreReader.asInstanceOf[MetastoreReaderIncremental].commitIncrementalStage()
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ class TransformationJob(operationDef: OperationDef,
val runResult = RunResult(transformer.run(metastoreReader, infoDate, operationDef.extraOptions))

if (isIncremental) {
// Output tables for transient transformations should not be tracked since they are calculated on-demand.
if (!outputTable.format.isTransient)
metastoreReader.asInstanceOf[MetastoreReaderIncremental].commitIncrementalOutputTable(outputTable.name, outputTable.name)
metastoreReader.asInstanceOf[MetastoreReaderIncremental].commitIncrementalStage()
Expand Down

0 comments on commit cc0301e

Please sign in to comment.