Skip to content

Commit

Permalink
add hacky test
Browse files Browse the repository at this point in the history
  • Loading branch information
edgao committed Jan 28, 2025
1 parent df56f9a commit 0886140
Showing 1 changed file with 48 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,10 @@

import static org.assertj.core.api.Assertions.assertThat;

import com.azure.core.http.rest.PagedIterable;
import com.azure.storage.blob.BlobContainerClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import com.azure.storage.blob.models.BlobItem;
import com.azure.storage.common.StorageSharedKeyCredential;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
Expand Down Expand Up @@ -47,16 +49,15 @@ public class AzureBlobStorageSpillTest {
private BlobContainerClient blobContainerClient;

private static final ObjectMapper mapper = MoreMappers.initMapper();
private AzureBlobStorageDestinationConfig azureBlobStorageDestinationConfig;

@BeforeEach
void setup() {
azureBlobStorageContainer = new AzureBlobStorageContainer().withExposedPorts(10000);
azureBlobStorageContainer.start();
var azureBlobStorageDestinationConfig = createConfig(azureBlobStorageContainer.getHost(), azureBlobStorageContainer.getMappedPort(10000));
azureBlobStorageDestinationConfig = createConfig(azureBlobStorageContainer.getHost(), azureBlobStorageContainer.getMappedPort(10000));
var configuredAirbyteCatalog = createConfiguredAirbyteCatalog();
azureBlobStorageConsumer =
new AzureBlobStorageConsumer(azureBlobStorageDestinationConfig, configuredAirbyteCatalog,
new ProductionWriterFactory(), m -> {});
azureBlobStorageConsumer = createConsumer(configuredAirbyteCatalog);
var credential = new StorageSharedKeyCredential(
azureBlobStorageDestinationConfig.getAccountName(),
azureBlobStorageDestinationConfig.getAccountKey());
Expand Down Expand Up @@ -99,6 +100,39 @@ void testSpillBlobWithExceedingSize() throws Exception {
.anyMatch(blobItem -> blobItem.getName().endsWith("_0"))
.anyMatch(blobItem -> blobItem.getName().endsWith("_1"));

// Run a sync to a stream whose name is a suffix of the first sync's stream.
// There was previously a bug where this would delete data from the first sync.
String overwriteStreamName = AIRBYTE_STREAM.substring(2);
var overwriteCatalog = new ConfiguredAirbyteCatalog()
.withStreams(
List.of(
new ConfiguredAirbyteStream()
.withDestinationSyncMode(DestinationSyncMode.OVERWRITE)
.withStream(createAirbyteStream(overwriteStreamName))));
var overwriteAzureBlobStorageConsumer = createConsumer(overwriteCatalog);
overwriteAzureBlobStorageConsumer.startTracked();
overwriteAzureBlobStorageConsumer.acceptTracked(
createAirbyteMessage(
overwriteStreamName,
Jsons.deserialize(
"""
{"foo": "bar"}
""")));
overwriteAzureBlobStorageConsumer.close(false);
PagedIterable<BlobItem> actualBlobsAfterOverwrite = blobContainerClient.listBlobs();
assertThat(actualBlobsAfterOverwrite)
.as("Blob names were: " + actualBlobsAfterOverwrite.stream().map(BlobItem::getName).toList())
.hasSize(3)
.anyMatch(blobItem -> blobItem.getName().startsWith(AIRBYTE_STREAM) && blobItem.getName().endsWith("_0"))
.anyMatch(blobItem -> blobItem.getName().startsWith(AIRBYTE_STREAM) && blobItem.getName().endsWith("_1"))
.anyMatch(blobItem -> blobItem.getName().startsWith(overwriteStreamName) && blobItem.getName().endsWith("_0"));
}

private AzureBlobStorageConsumer createConsumer(ConfiguredAirbyteCatalog configuredAirbyteCatalog) {
return new AzureBlobStorageConsumer(
azureBlobStorageDestinationConfig,
configuredAirbyteCatalog,
new ProductionWriterFactory(), m -> {});
}

private static AzureBlobStorageDestinationConfig createConfig(String host, Integer mappedPort) {
Expand All @@ -118,18 +152,26 @@ private static AzureBlobStorageDestinationConfig createConfig(String host, Integ
}

private static AirbyteMessage createAirbyteMessage(JsonNode data) {
return createAirbyteMessage(AIRBYTE_STREAM, data);
}

private static AirbyteMessage createAirbyteMessage(String streamName, JsonNode data) {
return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(AIRBYTE_STREAM)
.withStream(streamName)
.withNamespace(AIRBYTE_NAMESPACE)
.withData(data)
.withEmittedAt(Instant.now().toEpochMilli()));
}

private static AirbyteStream createAirbyteStream() {
return createAirbyteStream(AIRBYTE_STREAM);
}

private static AirbyteStream createAirbyteStream(String name) {
return new AirbyteStream()
.withName(AIRBYTE_STREAM)
.withName(name)
.withNamespace(AIRBYTE_NAMESPACE)
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH));
}
Expand Down

0 comments on commit 0886140

Please sign in to comment.