Skip to content

Commit

Permalink
Fix table locking while marking unused old segments (#14)
Browse files Browse the repository at this point in the history
* Use Druid library class to publish segments

* Avoid lock contention with 2 phase old segment update
  • Loading branch information
vivek-balakrishnan-rovio authored Jun 21, 2021
1 parent dd38a69 commit e090d3c
Showing 1 changed file with 52 additions and 47 deletions.
99 changes: 52 additions & 47 deletions src/main/java/com/rovio/ingest/util/MetadataUpdater.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,41 +16,40 @@
package com.rovio.ingest.util;

import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.rovio.ingest.WriterContext;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.commons.lang3.StringUtils;
import org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler;
import org.apache.druid.metadata.MetadataStorageConnectorConfig;
import org.apache.druid.metadata.MetadataStorageTablesConfig;
import org.apache.druid.metadata.SQLMetadataConnector;
import org.apache.druid.metadata.storage.mysql.MySQLConnector;
import org.apache.druid.metadata.storage.mysql.MySQLConnectorConfig;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.NoneShardSpec;
import org.skife.jdbi.v2.PreparedBatch;
import org.skife.jdbi.v2.Update;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.stream.Collectors;

import static com.rovio.ingest.DataSegmentCommitMessage.MAPPER;
import static java.lang.String.format;

public class MetadataUpdater {
private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class);

// SQL for inserting one published segment row into the Druid metadata segment table.
// NOTE(review): this constant appears superseded by SQLMetadataStorageUpdaterJobHandler#publishSegments
// (diff residue from commit e090d3c) — confirm whether it is still referenced.
private static final String INSERT_SEGMENT_SQL =
"INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " +
"VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)";

// Single-statement bulk update marking every other-version segment of a datasource unused.
// NOTE(review): appears replaced by the 2-phase SELECT + per-id UPDATE below to avoid
// locking the whole segment table — confirm it is no longer used.
private static final String MARK_ALL_OLDER_SEGMENTS_AS_UNUSED_SQL =
"UPDATE %1$s SET used = false" +
" WHERE dataSource=:dataSource AND version != :version AND used = true";
// Phase 1 of the 2-phase old-segment update: list used segments of this datasource with an
// older version, excluding the segment ids just published (:ids).
private static final String SELECT_UNUSED_OLD_SEGMENTS =
"SELECT id FROM %1$s WHERE dataSource = :dataSource AND version < :version AND used = true AND id NOT IN (:ids)";
// Phase 2: mark one segment unused by primary key, so only individual rows are locked.
private static final String MARK_SEGMENT_AS_UNUSED_BY_ID =
"UPDATE %1$s SET used=false WHERE id = :id";

// Target Druid datasource name.
private final String dataSource;
// Version string of the segments being published; older versions are marked unused on init.
private final String version;
// When true, previously published segments of this datasource are marked unused after publish.
private final boolean initDataSource;
private final SQLMetadataConnector sqlConnector;
// Fully-qualified name of the metadata segments table.
private final String segmentsTable;
// Druid library helper that performs the actual segment insert/publish.
private final SQLMetadataStorageUpdaterJobHandler metadataStorageUpdaterJobHandler;

public MetadataUpdater(WriterContext param) {
Preconditions.checkNotNull(param);
Expand Down Expand Up @@ -79,6 +78,7 @@ public String getPassword() {
this.sqlConnector = new MySQLConnector(() -> metadataStorageConnectorConfig,
() -> metadataStorageTablesConfig,
new MySQLConnectorConfig());
this.metadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(sqlConnector);

testDbConnection();
}
Expand All @@ -91,50 +91,55 @@ private void testDbConnection() {
}

/**
* Updates segments in Metadata database.
* Same logic as {@link org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler#publishSegments} with
* additional handling for (re-)init.
* Updates segments in Metadata segment table using {@link org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler#publishSegments},
* with additional handling for (re-)init.
*/
public void publishSegments(List<DataSegment> dataSegments) {
if (dataSegments.isEmpty()) {
LOG.warn("No segments created, skipping metadata update.");
return;
}

this.sqlConnector.getDBI().withHandle(handle -> {
handle.getConnection().setAutoCommit(false);
handle.begin();
PreparedBatch preparedBatch = handle.prepareBatch(format(INSERT_SEGMENT_SQL, this.segmentsTable));
for (DataSegment segment : dataSegments) {
preparedBatch
.bind("id", segment.getIdentifier())
.bind("dataSource", segment.getDataSource())
.bind("created_date", DateTimes.nowUtc().toString())
.bind("start", segment.getInterval().getStart().toString())
.bind("end", segment.getInterval().getEnd().toString())
.bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec))
.bind("version", segment.getVersion())
.bind("used", true)
.bind("payload", MAPPER.writeValueAsBytes(segment))
.add();
}

int[] execute = preparedBatch.execute();
if (execute.length != dataSegments.size()) {
throw new IllegalStateException(format("Failed to update All segments, segment=%d, updated=%d",
dataSegments.size(), execute.length));
}
metadataStorageUpdaterJobHandler.publishSegments(segmentsTable, dataSegments, MAPPER);
LOG.info("All segments published");

if (initDataSource) {
// Mark old segments as unused for the datasource.
// This done in 2 steps:
// 1. List all used old segments for the datasource
// 2. Update these segments as unused.
// This is done this way to avoid accidentally locking the entire segment table.

sqlConnector.getDBI().withHandle(handle -> {
List<String> oldSegmentIds = handle
.createQuery(String.format(SELECT_UNUSED_OLD_SEGMENTS, segmentsTable))
.bind("dataSource", dataSource)
.bind("version", version)
.bind("ids",
dataSegments
.stream()
.map(d -> StringUtils.wrap(d.getIdentifier(), "'"))
.collect(Collectors.joining(",")))
.list()
.stream()
.map(m -> m.get("id").toString())
.collect(Collectors.toList());

if (!oldSegmentIds.isEmpty()) {
PreparedBatch batch = handle.prepareBatch(String.format(MARK_SEGMENT_AS_UNUSED_BY_ID, segmentsTable));
for (String id : oldSegmentIds) {
batch.add(new ImmutableMap.Builder<String, Object>()
.put("id", id)
.build());
}

LOG.info(String.format("Marking %s old segments as ununsed", oldSegmentIds.size()));
batch.execute();
}

return null;
});

Update updateStatement;
if (initDataSource) {
updateStatement = handle.createStatement(format(MARK_ALL_OLDER_SEGMENTS_AS_UNUSED_SQL, this.segmentsTable))
.bind("dataSource", this.dataSource)
.bind("version", this.version);
updateStatement.execute();
}

handle.commit();
return execute;
});
}
}
}

0 comments on commit e090d3c

Please sign in to comment.