Skip to content

Commit

Permalink
Bulk load cdk: throw "no end-of-stream" error as transient (#51608)
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao authored Jan 17, 2025
1 parent 787d6fa commit c9e8c81
Show file tree
Hide file tree
Showing 2 changed files with 4 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
package io.airbyte.cdk.load.state

import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.TransientErrorException
import io.airbyte.cdk.load.command.DestinationCatalog
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.write.StreamLoader
Expand Down Expand Up @@ -141,7 +142,7 @@ class DefaultSyncManager(
.map { (stream, _) -> stream }
if (incompleteStreams.isNotEmpty()) {
val prettyStreams = incompleteStreams.map { it.toPrettyString() }
throw IllegalStateException(
throw TransientErrorException(
"Input was fully read, but some streams did not receive a terminal stream status message. This likely indicates an error in the source or platform. Streams without a status message: $prettyStreams"
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

package io.airbyte.cdk.load.state

import io.airbyte.cdk.TransientErrorException
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream1
import io.airbyte.cdk.load.command.MockDestinationCatalogFactory.Companion.stream2
Expand Down Expand Up @@ -133,7 +134,7 @@ class SyncManagerTest {
val manager1 = syncManager.getStreamManager(stream1.descriptor)
manager1.markEndOfStream(true)
// This should fail, because stream2 was not marked with end of stream
val e = assertThrows<IllegalStateException> { syncManager.markInputConsumed() }
val e = assertThrows<TransientErrorException> { syncManager.markInputConsumed() }
assertEquals(
// stream1 is fine, so the message only includes stream2
"Input was fully read, but some streams did not receive a terminal stream status message. This likely indicates an error in the source or platform. Streams without a status message: [test.stream2]",
Expand Down

0 comments on commit c9e8c81

Please sign in to comment.