Remove segment granularity check from MetadataUpdater (#7)
vivek-balakrishnan-rovio authored Apr 22, 2021
1 parent 2040a60 commit 8108491
Showing 5 changed files with 35 additions and 55 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -312,7 +312,7 @@ These are the options for `DruidSource`, to be passed with `write.options()`.
| `druid.segment.max_rows` | Max number of rows per segment | `5000000` |
| `druid.memory.max_rows` | Max number of rows to keep in memory in spark data writer | `75000` |
| `druid.segment_storage.type` | Type of Deep Storage to use. Allowed values: `s3`, `local`. | `s3` |
-| `druid.datasource.init` | Boolean flag for (re-)initializing Druid datasource. If `false`, `druid.segment_granularity` must match with the existing segments in the target datasource. | `false` |
+| `druid.datasource.init` | Boolean flag for (re-)initializing Druid datasource. If `true`, any pre-existing segments for the datasource are marked as unused. | `false` |
| `druid.bitmap_factory` | Compression format for bitmap indexes. Possible values: `concise`, `roaring`. For type `roaring`, the boolean property compressRunOnSerialization is always set to `true`. `rovio-ingest` uses `concise` by default regardless of Druid library version. | `concise` |
| `druid.segment.rollup` | Whether to rollup data during ingestion | `true` |

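Per the README, these options are passed to the Spark writer via `write.options()`. A minimal sketch of a write that uses the flag described above (metadata-storage and deep-storage connection options omitted; all values illustrative):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

class DruidSourceInitExample {
    // Sketch only: a real job must also set the metadata-storage and deep-storage options.
    static void writeInitialized(Dataset<Row> dataset) {
        Map<String, String> options = new HashMap<>();
        // `true` marks any pre-existing segments of the datasource as unused;
        // with the granularity check removed, this is the only remaining reset knob.
        options.put("druid.datasource.init", "true");
        options.put("druid.segment_storage.type", "local");

        dataset.write()
                .format("com.rovio.ingest.DruidSource")
                .mode(SaveMode.Overwrite)
                .options(options)
                .save();
    }
}
```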
1 change: 0 additions & 1 deletion src/main/java/com/rovio/ingest/DruidDataSourceWriter.java
@@ -43,7 +43,6 @@ class DruidDataSourceWriter implements BatchWrite {
this.segmentSpec = SegmentSpec.from(param.getDataSource(),param.getTimeColumn(), param.getExcludedDimensions(),
param.getSegmentGranularity(), param.getQueryGranularity(), schema, param.isRollup(), param.getMetricsSpec());
this.metadataUpdater = new MetadataUpdater(param);
-        this.metadataUpdater.checkGranularity(segmentSpec.getDataSchema().getGranularitySpec().getSegmentGranularity());
}

@Override
38 changes: 0 additions & 38 deletions src/main/java/com/rovio/ingest/util/MetadataUpdater.java
@@ -18,22 +18,19 @@
import com.google.common.base.Preconditions;
import com.rovio.ingest.WriterContext;
import org.apache.druid.java.util.common.DateTimes;
-import org.apache.druid.java.util.common.granularity.Granularity;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.storage.mysql.MySQLConnector;
import org.apache.druid.metadata.storage.mysql.MySQLConnectorConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
-import org.joda.time.DateTime;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Update;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
-import java.util.Map;

import static com.rovio.ingest.DataSegmentCommitMessage.MAPPER;
import static java.lang.String.format;
@@ -49,10 +46,6 @@ public class MetadataUpdater {
"UPDATE %1$s SET used = false" +
" WHERE dataSource=:dataSource AND version != :version AND used = true";

-    private static final String SELECT_ANY_USED_SEGMENT_SQL =
-            "SELECT start, end FROM %1$s" +
-            " WHERE dataSource=:dataSource AND version != :version AND used = true LIMIT 1";
-
private final String dataSource;
private final String version;
private final boolean initDataSource;
@@ -144,35 +137,4 @@ public void publishSegments(List<DataSegment> dataSegments) {
return execute;
});
    }
-
-    public void checkGranularity(Granularity segmentGranularity) {
-        if (initDataSource) {
-            LOG.info("initDataSource set to true, skip segment granularity check");
-            return;
-        }
-
-        Map<String, Object> result = this.sqlConnector
-                .retryWithHandle(handle -> handle.createQuery(format(SELECT_ANY_USED_SEGMENT_SQL, this.segmentsTable))
-                        .bind("dataSource", this.dataSource)
-                        .bind("version", this.version)
-                        .first());
-
-        if (result == null) {
-            LOG.info("No existing segments found while checking for granularity");
-            return;
-        }
-
-        DateTime start = DateTime.parse((String) result.get("start"));
-        DateTime end = DateTime.parse((String) result.get("end"));
-        if (!segmentGranularity.increment(start).equals(end)) {
-            throw new IllegalStateException(format(
-                    "Found a used segment with granularity mismatch, granularity=%s, start=%s, end=%s, expected end=%s",
-                    segmentGranularity,
-                    start,
-                    end,
-                    segmentGranularity.increment(start)
-            ));
-        }
-    }
}
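With `checkGranularity` removed, `MetadataUpdater` no longer queries existing segments at all; the only statement left that touches pre-existing segments is `DISABLE_SEGMENTS` above, which the README change ties to `druid.datasource.init=true`. A minimal sketch of issuing that update with JDBI v2 (the library this class already imports); the wrapper class and method names are illustrative, not part of the repo:

```java
import org.skife.jdbi.v2.DBI;

import static java.lang.String.format;

class DisableSegmentsSketch {
    // Same SQL as MetadataUpdater.DISABLE_SEGMENTS: every used segment of the
    // datasource that does not carry the new version is marked as unused.
    private static final String DISABLE_SEGMENTS_SQL =
            "UPDATE %1$s SET used = false" +
            " WHERE dataSource=:dataSource AND version != :version AND used = true";

    static int disableOldSegments(DBI dbi, String segmentsTable, String dataSource, String version) {
        return dbi.withHandle(handle ->
                handle.createStatement(format(DISABLE_SEGMENTS_SQL, segmentsTable))
                        .bind("dataSource", dataSource)
                        .bind("version", version)
                        .execute());
    }
}
```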
23 changes: 16 additions & 7 deletions src/test/java/com/rovio/ingest/DruidDatasetExtensionsTest.java
@@ -18,7 +18,6 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Table;
import com.rovio.ingest.extensions.java.DruidDatasetExtensions;
-import org.apache.spark.sql.DataFrameWriter;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;
@@ -150,7 +149,7 @@ public void saveWithPartiallyOverWrittenSegments() throws IOException {
}

@Test
-    public void saveFailsWithIncrementalAndGranularityChanges() {
+    public void saveWithIncrementalAndGranularityChanges() throws IOException {
Dataset<Row> dataset = loadCsv(spark, "/data.csv");
dataset = DruidDatasetExtensions
.repartitionByDruidSegmentSize(dataset, "date", "DAY", 5000000, false);
@@ -162,6 +161,11 @@
.options(options)
.save();

+        Interval interval = new Interval(DateTime.parse("2019-10-16T00:00:00Z"), DateTime.parse("2019-10-18T00:00:00Z"));
+        String firstVersion = DateTime.now(ISOChronology.getInstanceUTC()).toString();
+        verifySegmentPath(Paths.get(testFolder.toString(), DATA_SOURCE), interval, firstVersion, 1, false);
+        verifySegmentTable(interval, firstVersion, true, 2);
+
Dataset<Row> dataset2 = loadCsv(spark, "/data2.csv");
dataset2.show(false);
dataset2 = DruidDatasetExtensions
Expand All @@ -171,14 +175,19 @@ public void saveFailsWithIncrementalAndGranularityChanges() {
DateTimeUtils.setCurrentMillisFixed(VERSION_TIME_MILLIS + 60_000);
options.put(SEGMENT_GRANULARITY, "MONTH");

-        DataFrameWriter<Row> writer = dataset2.write()
+        dataset2.write()
.format(DruidSource.FORMAT)
.mode(SaveMode.Overwrite)
-                .options(options);
+                .options(options)
+                .save();

-        Throwable thrown = assertThrows(IllegalStateException.class, writer::save);
-        assertEquals(thrown.getMessage(),
-                "Found a used segment with granularity mismatch, granularity={type=period, period=P1M, timeZone=UTC, origin=null}, start=2019-10-16T00:00:00.000Z, end=2019-10-17T00:00:00.000Z, expected end=2019-11-16T00:00:00.000Z");
+        interval = new Interval(DateTime.parse("2019-10-16T00:00:00Z"), DateTime.parse("2019-10-18T00:00:00Z"));
+        verifySegmentTable(interval, firstVersion, true, 2);
+
+        String secondVersion = DateTime.now(ISOChronology.getInstanceUTC()).toString();
+        interval = new Interval(DateTime.parse("2019-10-01T00:00:00Z"), DateTime.parse("2019-11-01T00:00:00Z"));
+        verifySegmentTable(interval, secondVersion, true, 1);
+        verifySegmentPath(Paths.get(testFolder.toString(), DATA_SOURCE), interval, secondVersion, 1, true);
}
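The rewritten test now exercises the flow the removed check used to reject: an incremental write whose segment granularity differs from the existing segments. In application code the sequence looks roughly like this (a sketch built from the test above; `dataset`, `dataset2`, and `options` stand in for the fixtures):

```java
import java.util.Map;

import com.rovio.ingest.DruidSource;
import com.rovio.ingest.extensions.java.DruidDatasetExtensions;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SaveMode;

class GranularityChangeSketch {
    static void writeDayThenMonth(Dataset<Row> dataset, Dataset<Row> dataset2, Map<String, String> options) {
        // First write: DAY segments.
        Dataset<Row> daily = DruidDatasetExtensions
                .repartitionByDruidSegmentSize(dataset, "date", "DAY", 5000000, false);
        daily.write().format(DruidSource.FORMAT).mode(SaveMode.Overwrite).options(options).save();

        // Second write over the same dates with MONTH granularity. This used to fail with
        // "Found a used segment with granularity mismatch"; now it publishes MONTH segments
        // under a newer version, both segment sets stay used in the metadata table, and
        // Druid's version-based overshadowing hides the overlapping DAY segments at query time.
        options.put("druid.segment_granularity", "MONTH");
        Dataset<Row> monthly = DruidDatasetExtensions
                .repartitionByDruidSegmentSize(dataset2, "date", "MONTH", 5000000, false);
        monthly.write().format(DruidSource.FORMAT).mode(SaveMode.Overwrite).options(options).save();
    }
}
```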

@Test
26 changes: 18 additions & 8 deletions src/test/java/com/rovio/ingest/DruidSourceTest.java
@@ -259,7 +259,7 @@ public void passForPartiallyOverWrittenSegments() throws IOException {
}

@Test
-    public void failWhenIncrementalAndGranularityChanges() {
+    public void shouldSaveWhenIncrementalAndGranularityChanges() throws IOException {
Dataset<Row> dataset = loadCsv(spark, "/data.csv");
List<Column> columns = Lists.newArrayList(unix_timestamp(column("date")).multiply(1000), lit("DAY"));
dataset = dataset.withColumn("__partition",
@@ -276,6 +276,11 @@
.options(options)
.save();

+        Interval interval = new Interval(DateTime.parse("2019-10-16T00:00:00Z"), DateTime.parse("2019-10-18T00:00:00Z"));
+        String firstVersion = DateTime.now(ISOChronology.getInstanceUTC()).toString();
+        verifySegmentPath(Paths.get(testFolder.toString(), DATA_SOURCE), interval, firstVersion, 1, false);
+        verifySegmentTable(interval, firstVersion, true, 2);
+
dataset = loadCsv(spark, "/data2.csv");
dataset.show(false);
columns = Lists.newArrayList(unix_timestamp(column("date")).multiply(1000), lit("MONTH"));
@@ -288,13 +293,18 @@

DateTimeUtils.setCurrentMillisFixed(VERSION_TIME_MILLIS + 60_000);
options.put(SEGMENT_GRANULARITY, "MONTH");
-        IllegalStateException thrown = assertThrows(IllegalStateException.class,
-                () -> dataset2.write()
-                        .format("com.rovio.ingest.DruidSource")
-                        .mode(SaveMode.Overwrite)
-                        .options(options)
-                        .save());
-        assertThat(thrown.getMessage(), containsString("Found a used segment with granularity mismatch"));
+        dataset2.write()
+                .format("com.rovio.ingest.DruidSource")
+                .mode(SaveMode.Overwrite)
+                .options(options)
+                .save();
+        interval = new Interval(DateTime.parse("2019-10-16T00:00:00Z"), DateTime.parse("2019-10-18T00:00:00Z"));
+        verifySegmentTable(interval, firstVersion, true, 2);
+
+        String secondVersion = DateTime.now(ISOChronology.getInstanceUTC()).toString();
+        interval = new Interval(DateTime.parse("2019-10-01T00:00:00Z"), DateTime.parse("2019-11-01T00:00:00Z"));
+        verifySegmentTable(interval, secondVersion, true, 1);
+        verifySegmentPath(Paths.get(testFolder.toString(), DATA_SOURCE), interval, secondVersion, 1, true);
}

@Test
