diff --git a/README.md b/README.md
index f9f8783..c60dab6 100644
--- a/README.md
+++ b/README.md
@@ -312,7 +312,7 @@ These are the options for `DruidSource`, to be passed with `write.options()`.
 | `druid.segment.max_rows` | Max number of rows per segment | `5000000` |
 | `druid.memory.max_rows` | Max number of rows to keep in memory in spark data writer | `75000` |
 | `druid.segment_storage.type` | Type of Deep Storage to use. Allowed values: `s3`, `local`. | `s3` |
-| `druid.datasource.init` | Boolean flag for (re-)initializing Druid datasource. If `false`, `druid.segment_granularity` must match with the existing segments in the target datasource. | `false` |
+| `druid.datasource.init` | Boolean flag for (re-)initializing Druid datasource. If `true`, any pre-existing segments for the datasource are marked as unused. | `false` |
 | `druid.bitmap_factory` | Compression format for bitmap indexes. Possible values: `concise`, `roaring`. For type `roaring`, the boolean property compressRunOnSerialization is always set to `true`. `rovio-ingest` uses `concise` by default regardless of Druid library version. | `concise` |
 | `druid.segment.rollup` | Whether to rollup data during ingestion | `true` |
 
diff --git a/src/main/java/com/rovio/ingest/DruidDataSourceWriter.java b/src/main/java/com/rovio/ingest/DruidDataSourceWriter.java
index c4b865e..2f656be 100644
--- a/src/main/java/com/rovio/ingest/DruidDataSourceWriter.java
+++ b/src/main/java/com/rovio/ingest/DruidDataSourceWriter.java
@@ -43,7 +43,6 @@ class DruidDataSourceWriter implements BatchWrite {
         this.segmentSpec = SegmentSpec.from(param.getDataSource(),
                 param.getTimeColumn(), param.getExcludedDimensions(), param.getSegmentGranularity(), param.getQueryGranularity(), schema, param.isRollup(), param.getMetricsSpec());
         this.metadataUpdater = new MetadataUpdater(param);
-        this.metadataUpdater.checkGranularity(segmentSpec.getDataSchema().getGranularitySpec().getSegmentGranularity());
     }
 
     @Override
diff --git a/src/main/java/com/rovio/ingest/util/MetadataUpdater.java b/src/main/java/com/rovio/ingest/util/MetadataUpdater.java
index ab81bac..8033063 100644
--- a/src/main/java/com/rovio/ingest/util/MetadataUpdater.java
+++ b/src/main/java/com/rovio/ingest/util/MetadataUpdater.java
@@ -18,7 +18,6 @@
 import com.google.common.base.Preconditions;
 import com.rovio.ingest.WriterContext;
 import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.granularity.Granularity;
 import org.apache.druid.metadata.MetadataStorageConnectorConfig;
 import org.apache.druid.metadata.MetadataStorageTablesConfig;
 import org.apache.druid.metadata.SQLMetadataConnector;
@@ -26,14 +25,12 @@
 import org.apache.druid.metadata.storage.mysql.MySQLConnectorConfig;
 import org.apache.druid.timeline.DataSegment;
 import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.joda.time.DateTime;
 import org.skife.jdbi.v2.PreparedBatch;
 import org.skife.jdbi.v2.Update;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.List;
-import java.util.Map;
 
 import static com.rovio.ingest.DataSegmentCommitMessage.MAPPER;
 import static java.lang.String.format;
@@ -49,10 +46,6 @@ public class MetadataUpdater {
             "UPDATE %1$s SET used = false" +
             " WHERE dataSource=:dataSource AND version != :version AND used = true";
 
-    private static final String SELECT_ANY_USED_SEGMENT_SQL =
-            "SELECT start, end FROM %1$s" +
-            " WHERE dataSource=:dataSource AND version != :version AND used = true LIMIT 1";
-
     private final String dataSource;
     private final String version;
     private final boolean initDataSource;
@@ -144,35 +137,4 @@ public void publishSegments(List<DataSegment> dataSegments) {
             return execute;
         });
     }
-
-    public void checkGranularity(Granularity segmentGranularity) {
-        if (initDataSource) {
-            LOG.info("initDataSource set to true, skip segment granularity check");
-            return;
-        }
-
-        Map<String, Object> result = this.sqlConnector
-                .retryWithHandle(handle -> handle.createQuery(format(SELECT_ANY_USED_SEGMENT_SQL, this.segmentsTable))
-                        .bind("dataSource", this.dataSource)
-                        .bind("version", this.version)
-                        .first());
-
-        if (result == null) {
-            LOG.info("No existing segments found while checking for granularity");
-            return;
-        }
-
-        DateTime start = DateTime.parse((String) result.get("start"));
-        DateTime end = DateTime.parse((String) result.get("end"));
-        if (!segmentGranularity.increment(start).equals(end)) {
-            throw new IllegalStateException(format(
-                    "Found a used segment with granularity mismatch, granularity=%s, start=%s, end=%s, expected end=%s",
-                    segmentGranularity,
-                    start,
-                    end,
-                    segmentGranularity.increment(start)
-            ));
-        }
-
-    }
 }
diff --git a/src/test/java/com/rovio/ingest/DruidDatasetExtensionsTest.java b/src/test/java/com/rovio/ingest/DruidDatasetExtensionsTest.java
index 728e31c..ecaa7c2 100644
--- a/src/test/java/com/rovio/ingest/DruidDatasetExtensionsTest.java
+++ b/src/test/java/com/rovio/ingest/DruidDatasetExtensionsTest.java
@@ -18,7 +18,6 @@
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Table;
 import com.rovio.ingest.extensions.java.DruidDatasetExtensions;
-import org.apache.spark.sql.DataFrameWriter;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SaveMode;
@@ -150,7 +149,7 @@ public void saveWithPartiallyOverWrittenSegments() throws IOException {
     }
 
     @Test
-    public void saveFailsWithIncrementalAndGranularityChanges() {
+    public void saveWithIncrementalAndGranularityChanges() throws IOException {
         Dataset<Row> dataset = loadCsv(spark, "/data.csv");
         dataset = DruidDatasetExtensions
                 .repartitionByDruidSegmentSize(dataset, "date", "DAY", 5000000, false);
@@ -162,6 +161,11 @@ public void saveFailsWithIncrementalAndGranularityChanges() {
                 .options(options)
                 .save();
 
+        Interval interval = new Interval(DateTime.parse("2019-10-16T00:00:00Z"), DateTime.parse("2019-10-18T00:00:00Z"));
+        String firstVersion = DateTime.now(ISOChronology.getInstanceUTC()).toString();
+        verifySegmentPath(Paths.get(testFolder.toString(), DATA_SOURCE), interval, firstVersion, 1, false);
+        verifySegmentTable(interval, firstVersion, true, 2);
+
         Dataset<Row> dataset2 = loadCsv(spark, "/data2.csv");
         dataset2.show(false);
         dataset2 = DruidDatasetExtensions
@@ -171,14 +175,19 @@ public void saveFailsWithIncrementalAndGranularityChanges() {
         DateTimeUtils.setCurrentMillisFixed(VERSION_TIME_MILLIS + 60_000);
         options.put(SEGMENT_GRANULARITY, "MONTH");
 
-        DataFrameWriter<Row> writer = dataset2.write()
+        dataset2.write()
                 .format(DruidSource.FORMAT)
                 .mode(SaveMode.Overwrite)
-                .options(options);
+                .options(options)
+                .save();
 
-        Throwable thrown = assertThrows(IllegalStateException.class, writer::save);
-        assertEquals(thrown.getMessage(),
-                "Found a used segment with granularity mismatch, granularity={type=period, period=P1M, timeZone=UTC, origin=null}, start=2019-10-16T00:00:00.000Z, end=2019-10-17T00:00:00.000Z, expected end=2019-11-16T00:00:00.000Z");
+        interval = new Interval(DateTime.parse("2019-10-16T00:00:00Z"), DateTime.parse("2019-10-18T00:00:00Z"));
+        verifySegmentTable(interval, firstVersion, true, 2);
+
+        String secondVersion = DateTime.now(ISOChronology.getInstanceUTC()).toString();
+        interval = new Interval(DateTime.parse("2019-10-01T00:00:00Z"), DateTime.parse("2019-11-01T00:00:00Z"));
+        verifySegmentTable(interval, secondVersion, true, 1);
+        verifySegmentPath(Paths.get(testFolder.toString(), DATA_SOURCE), interval, secondVersion, 1, true);
     }
 
     @Test
diff --git a/src/test/java/com/rovio/ingest/DruidSourceTest.java b/src/test/java/com/rovio/ingest/DruidSourceTest.java
index ccba53e..2b5cc97 100644
--- a/src/test/java/com/rovio/ingest/DruidSourceTest.java
+++ b/src/test/java/com/rovio/ingest/DruidSourceTest.java
@@ -259,7 +259,7 @@ public void passForPartiallyOverWrittenSegments() throws IOException {
     }
 
     @Test
-    public void failWhenIncrementalAndGranularityChanges() {
+    public void shouldSaveWhenIncrementalAndGranularityChanges() throws IOException {
         Dataset<Row> dataset = loadCsv(spark, "/data.csv");
         List<Column> columns = Lists.newArrayList(unix_timestamp(column("date")).multiply(1000), lit("DAY"));
         dataset = dataset.withColumn("__partition",
@@ -276,6 +276,11 @@
                 .options(options)
                 .save();
 
+        Interval interval = new Interval(DateTime.parse("2019-10-16T00:00:00Z"), DateTime.parse("2019-10-18T00:00:00Z"));
+        String firstVersion = DateTime.now(ISOChronology.getInstanceUTC()).toString();
+        verifySegmentPath(Paths.get(testFolder.toString(), DATA_SOURCE), interval, firstVersion, 1, false);
+        verifySegmentTable(interval, firstVersion, true, 2);
+
         dataset = loadCsv(spark, "/data2.csv");
         dataset.show(false);
         columns = Lists.newArrayList(unix_timestamp(column("date")).multiply(1000), lit("MONTH"));
@@ -288,13 +293,18 @@
         DateTimeUtils.setCurrentMillisFixed(VERSION_TIME_MILLIS + 60_000);
         options.put(SEGMENT_GRANULARITY, "MONTH");
 
-        IllegalStateException thrown = assertThrows(IllegalStateException.class,
-                () -> dataset2.write()
-                        .format("com.rovio.ingest.DruidSource")
-                        .mode(SaveMode.Overwrite)
-                        .options(options)
-                        .save());
-        assertThat(thrown.getMessage(), containsString("Found a used segment with granularity mismatch"));
+        dataset2.write()
+                .format("com.rovio.ingest.DruidSource")
+                .mode(SaveMode.Overwrite)
+                .options(options)
+                .save();
+        interval = new Interval(DateTime.parse("2019-10-16T00:00:00Z"), DateTime.parse("2019-10-18T00:00:00Z"));
+        verifySegmentTable(interval, firstVersion, true, 2);
+
+        String secondVersion = DateTime.now(ISOChronology.getInstanceUTC()).toString();
+        interval = new Interval(DateTime.parse("2019-10-01T00:00:00Z"), DateTime.parse("2019-11-01T00:00:00Z"));
+        verifySegmentTable(interval, secondVersion, true, 1);
+        verifySegmentPath(Paths.get(testFolder.toString(), DATA_SOURCE), interval, secondVersion, 1, true);
     }
 
     @Test
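For reference, a minimal sketch of the behaviour this patch enables, modelled on the updated tests above. The `com.rovio.ingest.DruidSource` format name, the `druid.segment_granularity` option key, and the `repartitionByDruidSegmentSize` helper are taken from the diff itself; the SparkSession setup, the CSV paths, and the omitted datasource/time-column/metadata-storage/deep-storage options (see the README table) are assumptions, so treat this as an illustration rather than a runnable job:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;

import com.rovio.ingest.extensions.java.DruidDatasetExtensions;

public class GranularityChangeSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("granularity-change-sketch")
                .getOrCreate();

        Map<String, String> options = new HashMap<>();
        options.put("druid.segment_granularity", "DAY");
        // ...plus the datasource, time-column, metadata-storage and deep-storage
        // options required by DruidSource; see the README table for the full list.

        // First run: ingest with DAY segments, as in the tests' /data.csv step.
        Dataset<Row> day = spark.read().option("header", "true").csv("/path/to/data.csv");
        day = DruidDatasetExtensions.repartitionByDruidSegmentSize(day, "date", "DAY", 5000000, false);
        day.write()
                .format("com.rovio.ingest.DruidSource")
                .mode(SaveMode.Overwrite)
                .options(options)
                .save();

        // Second, incremental run against the same datasource with MONTH segments.
        // Before this patch the writer threw IllegalStateException ("Found a used
        // segment with granularity mismatch"); with checkGranularity removed the
        // write succeeds.
        options.put("druid.segment_granularity", "MONTH");
        Dataset<Row> month = spark.read().option("header", "true").csv("/path/to/data2.csv");
        month = DruidDatasetExtensions.repartitionByDruidSegmentSize(month, "date", "MONTH", 5000000, false);
        month.write()
                .format("com.rovio.ingest.DruidSource")
                .mode(SaveMode.Overwrite)
                .options(options)
                .save();

        spark.stop();
    }
}
```

As the updated tests verify, with `druid.datasource.init` left at its `false` default the earlier DAY segments remain marked `used` in the metadata table alongside the new MONTH segments; the newer-version MONTH segments simply overshadow the overlapping DAY segments in Druid's versioned timeline at query time.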