From e090d3c2edeafb7cb6c155ab09216bb4634e1de9 Mon Sep 17 00:00:00 2001 From: vivek-balakrishnan-rovio <82032422+vivek-balakrishnan-rovio@users.noreply.github.com> Date: Mon, 21 Jun 2021 14:58:57 +0530 Subject: [PATCH] Fix table locking while marking unused old segments (#14) * Use Druid library class to publish segments * Avoid lock contention with 2 phase old segment update --- .../rovio/ingest/util/MetadataUpdater.java | 99 ++++++++++--------- 1 file changed, 52 insertions(+), 47 deletions(-) diff --git a/src/main/java/com/rovio/ingest/util/MetadataUpdater.java b/src/main/java/com/rovio/ingest/util/MetadataUpdater.java index 8033063..cf6cf52 100644 --- a/src/main/java/com/rovio/ingest/util/MetadataUpdater.java +++ b/src/main/java/com/rovio/ingest/util/MetadataUpdater.java @@ -16,21 +16,22 @@ package com.rovio.ingest.util; import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableMap; import com.rovio.ingest.WriterContext; -import org.apache.druid.java.util.common.DateTimes; +import org.apache.commons.lang3.StringUtils; +import org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler; import org.apache.druid.metadata.MetadataStorageConnectorConfig; import org.apache.druid.metadata.MetadataStorageTablesConfig; import org.apache.druid.metadata.SQLMetadataConnector; import org.apache.druid.metadata.storage.mysql.MySQLConnector; import org.apache.druid.metadata.storage.mysql.MySQLConnectorConfig; import org.apache.druid.timeline.DataSegment; -import org.apache.druid.timeline.partition.NoneShardSpec; import org.skife.jdbi.v2.PreparedBatch; -import org.skife.jdbi.v2.Update; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.stream.Collectors; import static com.rovio.ingest.DataSegmentCommitMessage.MAPPER; import static java.lang.String.format; @@ -38,19 +39,17 @@ public class MetadataUpdater { private static final Logger LOG = LoggerFactory.getLogger(MetadataUpdater.class); - private static 
final String INSERT_SEGMENT_SQL = - "INSERT INTO %1$s (id, dataSource, created_date, start, \"end\", partitioned, version, used, payload) " + - "VALUES (:id, :dataSource, :created_date, :start, :end, :partitioned, :version, :used, :payload)"; - - private static final String MARK_ALL_OLDER_SEGMENTS_AS_UNUSED_SQL = - "UPDATE %1$s SET used = false" + - " WHERE dataSource=:dataSource AND version != :version AND used = true"; + private static final String SELECT_UNUSED_OLD_SEGMENTS = + "SELECT id FROM %1$s WHERE dataSource = :dataSource AND version < :version AND used = true AND id NOT IN (:ids)"; + private static final String MARK_SEGMENT_AS_UNUSED_BY_ID = + "UPDATE %1$s SET used=false WHERE id = :id"; private final String dataSource; private final String version; private final boolean initDataSource; private final SQLMetadataConnector sqlConnector; private final String segmentsTable; + private final SQLMetadataStorageUpdaterJobHandler metadataStorageUpdaterJobHandler; public MetadataUpdater(WriterContext param) { Preconditions.checkNotNull(param); @@ -79,6 +78,7 @@ public String getPassword() { this.sqlConnector = new MySQLConnector(() -> metadataStorageConnectorConfig, () -> metadataStorageTablesConfig, new MySQLConnectorConfig()); + this.metadataStorageUpdaterJobHandler = new SQLMetadataStorageUpdaterJobHandler(sqlConnector); testDbConnection(); } @@ -91,9 +91,8 @@ private void testDbConnection() { } /** - * Updates segments in Metadata database. - * Same logic as {@link org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler#publishSegments} with - * additional handling for (re-)init. + * Updates segments in Metadata segment table using {@link org.apache.druid.indexer.SQLMetadataStorageUpdaterJobHandler#publishSegments}, + * with additional handling for (re-)init. 
*/ public void publishSegments(List dataSegments) { if (dataSegments.isEmpty()) { @@ -101,40 +100,46 @@ public void publishSegments(List dataSegments) { return; } - this.sqlConnector.getDBI().withHandle(handle -> { - handle.getConnection().setAutoCommit(false); - handle.begin(); - PreparedBatch preparedBatch = handle.prepareBatch(format(INSERT_SEGMENT_SQL, this.segmentsTable)); - for (DataSegment segment : dataSegments) { - preparedBatch - .bind("id", segment.getIdentifier()) - .bind("dataSource", segment.getDataSource()) - .bind("created_date", DateTimes.nowUtc().toString()) - .bind("start", segment.getInterval().getStart().toString()) - .bind("end", segment.getInterval().getEnd().toString()) - .bind("partitioned", !(segment.getShardSpec() instanceof NoneShardSpec)) - .bind("version", segment.getVersion()) - .bind("used", true) - .bind("payload", MAPPER.writeValueAsBytes(segment)) - .add(); - } - - int[] execute = preparedBatch.execute(); - if (execute.length != dataSegments.size()) { - throw new IllegalStateException(format("Failed to update All segments, segment=%d, updated=%d", - dataSegments.size(), execute.length)); - } + metadataStorageUpdaterJobHandler.publishSegments(segmentsTable, dataSegments, MAPPER); + LOG.info("All segments published"); + + if (initDataSource) { + // Mark old segments as unused for the datasource. + // This is done in 2 steps: + // 1. List all used old segments for the datasource + // 2. Update these segments as unused. + // This is done this way to avoid accidentally locking the entire segment table. 
+ + sqlConnector.getDBI().withHandle(handle -> { + List oldSegmentIds = handle + .createQuery(String.format(SELECT_UNUSED_OLD_SEGMENTS, segmentsTable)) + .bind("dataSource", dataSource) + .bind("version", version) + .bind("ids", + dataSegments + .stream() + .map(d -> StringUtils.wrap(d.getIdentifier(), "'")) + .collect(Collectors.joining(","))) + .list() + .stream() + .map(m -> m.get("id").toString()) + .collect(Collectors.toList()); + + if (!oldSegmentIds.isEmpty()) { + PreparedBatch batch = handle.prepareBatch(String.format(MARK_SEGMENT_AS_UNUSED_BY_ID, segmentsTable)); + for (String id : oldSegmentIds) { + batch.add(new ImmutableMap.Builder() + .put("id", id) + .build()); + } + + LOG.info(String.format("Marking %s old segments as unused", oldSegmentIds.size())); + batch.execute(); + } + + return null; + }); - Update updateStatement; - if (initDataSource) { - updateStatement = handle.createStatement(format(MARK_ALL_OLDER_SEGMENTS_AS_UNUSED_SQL, this.segmentsTable)) - .bind("dataSource", this.dataSource) - .bind("version", this.version); - updateStatement.execute(); - } - - handle.commit(); - return execute; - }); + } } }