diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index 4123e3157ed..2e4c443ac28 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -113,6 +113,7 @@ public static FileUploadType getDefaultUploadType() { private static final String SCHEMA_PATH = "/schemas"; private static final String OLD_SEGMENT_PATH = "/segments"; private static final String SEGMENT_PATH = "/v2/segments"; + private static final String SEGMENT_UPLOAD_LIST_PATH = "/segmentList"; private static final String TABLES_PATH = "/tables"; private static final String TYPE_DELIMITER = "type="; private static final String START_REPLACE_SEGMENTS_PATH = "/startReplaceSegments"; @@ -365,6 +366,12 @@ public static URI getUploadSegmentURI(URI controllerURI) return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH); } + public static URI getUploadSegmentListURI(URI controllerURI) + throws URISyntaxException { + return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), + SEGMENT_UPLOAD_LIST_PATH); + } + public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType, boolean forceCleanup) throws URISyntaxException { diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientTest.java index 2640f33a695..9b79e814b98 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientTest.java @@ -27,6 +27,8 @@ import java.io.InputStream; import java.io.OutputStream; import 
java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Collections; import java.util.List; import java.util.Random; @@ -128,6 +130,14 @@ public void testSendFileWithJson() } } + @Test + public void testGetUploadSegmentListURI() + throws URISyntaxException { + URI controllerURI = new URI("https://myhost:9443"); + URI uriWithEndpoint = FileUploadDownloadClient.getUploadSegmentListURI(controllerURI); + Assert.assertEquals(new URI("https://myhost:9443/segmentList"), uriWithEndpoint); + } + @AfterClass public void shutDown() { _testServer.stop(0); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 20b00a3cc3b..40a70e79415 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -36,7 +36,10 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; +import java.net.UnknownHostException; import java.nio.file.Files; +import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.Map; import java.util.UUID; @@ -57,6 +60,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; import org.apache.commons.io.FileUtils; @@ -81,6 +85,7 @@ import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import 
org.apache.pinot.controller.api.upload.SegmentUploadMetadata; import org.apache.pinot.controller.api.upload.SegmentValidationUtils; import org.apache.pinot.controller.api.upload.ZKOperator; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -101,6 +106,7 @@ import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.glassfish.grizzly.http.server.Request; +import org.glassfish.jersey.media.multipart.BodyPart; import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.glassfish.jersey.server.ManagedAsync; @@ -233,7 +239,8 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER); String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE); - String sourceDownloadURIStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI); + String sourceDownloadURIStr = extractHttpHeaderWithOptionalPrefixAsFallback(headers, + FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, "x-segmentname-"); String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER); String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR); @@ -295,13 +302,18 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE); copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore); createSegmentFileFromMultipart(multiPart, destFile); + PinotFS pinotFS = null; try { URI segmentURI = new URI(sourceDownloadURIStr); - PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme()); + pinotFS = PinotFSFactory.create(segmentURI.getScheme()); segmentSizeInBytes = 
pinotFS.length(segmentURI); } catch (Exception e) { segmentSizeInBytes = -1; LOGGER.warn("Could not fetch segment size for metadata push", e); + } finally { + if (pinotFS != null) { + pinotFS.close(); + } } break; default: @@ -362,8 +374,6 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl Pair encryptionInfo = encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, uploadedSegmentIsEncrypted, crypterClassNameInHeader, crypterNameInTableConfig, segmentName, tableNameWithType); - - String crypterName = encryptionInfo.getLeft(); File segmentFile = encryptionInfo.getRight(); // Update download URI if controller is responsible for moving the segment to the deep store @@ -384,10 +394,10 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation); ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); - zkOperator.completeSegmentOperations(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, - segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, - enableParallelPushProtection, allowRefresh, headers); - + SegmentUploadMetadata segmentUploadMetadata = new SegmentUploadMetadata(segmentDownloadURIStr, + sourceDownloadURIStr, finalSegmentLocationURI, segmentSizeInBytes, segmentMetadata, encryptionInfo); + zkOperator.completeSegmentsOperations(tableNameWithType, uploadType, enableParallelPushProtection, allowRefresh, + headers, Collections.singletonList(segmentUploadMetadata)); return new SuccessResponse("Successfully uploaded segment: " + segmentName + " of table: " + tableNameWithType); } catch (WebApplicationException e) { throw e; @@ -403,6 +413,184 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl } } + // Method used to update a list of segments in batch mode with the METADATA upload type. 
+ private SuccessResponse uploadSegments(String tableName, TableType tableType, + FormDataMultiPart multiParts, boolean enableParallelPushProtection, + boolean allowRefresh, HttpHeaders headers, Request request) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + String tableNameWithType = tableType == TableType.OFFLINE + ? TableNameBuilder.OFFLINE.tableNameWithType(rawTableName) + : TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + throw new ControllerApplicationException(LOGGER, "Failed to fetch table config for table: " + tableNameWithType, + Response.Status.BAD_REQUEST); + } + + String clientAddress; + try { + clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName(); + } catch (UnknownHostException ex) { + throw new ControllerApplicationException(LOGGER, "Failed to resolve hostname from input request", + Response.Status.BAD_REQUEST, ex); + } + + String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE); + FileUploadType uploadType = getUploadType(uploadTypeStr); + if (!FileUploadType.METADATA.equals(uploadType)) { + throw new ControllerApplicationException(LOGGER, "Unsupported upload type: " + uploadTypeStr, + Response.Status.BAD_REQUEST); + } + + String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER); + String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR); + ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance(); + List segmentUploadMetadataList = new ArrayList<>(); + List tempEncryptedFiles = new ArrayList<>(); + List tempDecryptedFiles = new ArrayList<>(); + List tempSegmentDirs = new ArrayList<>(); + List segmentNames = new ArrayList<>(); + PinotFS pinotFS = null; + + for (BodyPart bodyPart1: 
multiParts.getBodyParts()) { + FormDataBodyPart bodyPart = (FormDataBodyPart) bodyPart1; + String segmentName = bodyPart.getContentDisposition().getFileName(); + if (StringUtils.isEmpty(segmentName)) { + throw new ControllerApplicationException(LOGGER, + "filename is a required field within the multipart object for METADATA batch upload mode.", + Response.Status.BAD_REQUEST); + } + File tempEncryptedFile; + File tempDecryptedFile; + File tempSegmentDir; + try { + String sourceDownloadURIStr = extractHttpHeader(headers, "x-segmentname-" + segmentName); + if (StringUtils.isEmpty(sourceDownloadURIStr)) { + throw new ControllerApplicationException(LOGGER, + "'DOWNLOAD_URI' is required as a field within the multipart object for METADATA batch upload mode.", + Response.Status.BAD_REQUEST); + } + // The downloadUri for putting into segment zk metadata + String segmentDownloadURIStr = sourceDownloadURIStr; + + String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID(); + tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX); + tempEncryptedFiles.add(tempEncryptedFile); + tempDecryptedFile = new File(provider.getFileUploadTempDir(), tempFileName); + tempDecryptedFiles.add(tempDecryptedFile); + tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName); + tempSegmentDirs.add(tempSegmentDir); + boolean encryptSegment = StringUtils.isNotEmpty(crypterClassNameInHeader); + File destFile = encryptSegment ? 
tempEncryptedFile : tempDecryptedFile; + // override copySegmentToFinalLocation if override provided in headers:COPY_SEGMENT_TO_DEEP_STORE + // else set to false for backward compatibility + String copySegmentToDeepStore = + extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE); + boolean copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore); + createSegmentFileFromBodyPart(bodyPart, destFile); + long segmentSizeInBytes; + try { + URI segmentURI = new URI(sourceDownloadURIStr); + if (pinotFS == null) { + pinotFS = PinotFSFactory.create(segmentURI.getScheme()); + } + segmentSizeInBytes = pinotFS.length(segmentURI); + } catch (Exception ex) { + segmentSizeInBytes = -1; + LOGGER.warn("Could not fetch segment size for metadata push", ex); + } + if (pinotFS != null) { + pinotFS.close(); + } + if (encryptSegment) { + decryptFile(crypterClassNameInHeader, tempEncryptedFile, tempDecryptedFile); + } + + String metadataProviderClass = DefaultMetadataExtractor.class.getName(); + SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass); + segmentNames.add(segmentName); + LOGGER.info("Processing upload request for segment: {} of table: {} with upload type: {} from client: {}, " + + "ingestion descriptor: {}", segmentName, tableNameWithType, uploadType, clientAddress, ingestionDescriptor + ); + + // Validate segment + if (tableConfig.getIngestionConfig() == null || tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) { + SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig); + } + long untarredSegmentSizeInBytes = 0L; + if (segmentSizeInBytes > 0) { + // TODO: Include the untarred segment size when using the METADATA push rest API. + // Currently we can only use the tarred segment size as an approximation. 
+ untarredSegmentSizeInBytes = segmentSizeInBytes; + } + SegmentValidationUtils.checkStorageQuota(segmentName, untarredSegmentSizeInBytes, tableConfig, + _pinotHelixResourceManager, _controllerConf, _controllerMetrics, _connectionManager, _executor, + _leadControllerManager); + + // Encrypt segment + String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName(); + Pair encryptionInfo = + encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, encryptSegment, + crypterClassNameInHeader, crypterNameInTableConfig, segmentName, tableNameWithType); + File segmentFile = encryptionInfo.getRight(); + + // Update download URI if controller is responsible for moving the segment to the deep store + URI finalSegmentLocationURI = null; + if (copySegmentToFinalLocation) { + URI dataDirURI = provider.getDataDirURI(); + String dataDirPath = dataDirURI.toString(); + String encodedSegmentName = URIUtils.encode(segmentName); + String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName); + if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) { + segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName); + } else { + segmentDownloadURIStr = finalSegmentLocationPath; + } + finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath); + } + SegmentUploadMetadata segmentUploadMetadata = new SegmentUploadMetadata(segmentDownloadURIStr, + sourceDownloadURIStr, finalSegmentLocationURI, segmentSizeInBytes, segmentMetadata, encryptionInfo); + segmentUploadMetadataList.add(segmentUploadMetadata); + LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})", + segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation); + } catch (Exception ex) { + cleanupTempFiles(tempEncryptedFiles, tempDecryptedFiles, tempSegmentDirs); + 
_controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR, + segmentUploadMetadataList.size()); + throw new ControllerApplicationException(LOGGER, "Exception while processing segments to upload: " + + ex.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, ex); + } + } + + try { + ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); + zkOperator.completeSegmentsOperations(tableNameWithType, uploadType, enableParallelPushProtection, allowRefresh, + headers, segmentUploadMetadataList); + return new SuccessResponse("Successfully uploaded segments: " + segmentNames + " of table: " + + tableNameWithType); + } catch (Exception ex) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR, + segmentUploadMetadataList.size()); + throw new ControllerApplicationException(LOGGER, "Exception while uploading segments: " + ex.getMessage(), + Response.Status.INTERNAL_SERVER_ERROR, ex); + } finally { + cleanupTempFiles(tempEncryptedFiles, tempDecryptedFiles, tempSegmentDirs); + } + } + + private void cleanupTempFiles(List tempEncryptedFiles, List tempDecryptedFiles, + List tempSegmentDirs) { + List tempFiles = new ArrayList<>(); + tempFiles.addAll(tempEncryptedFiles); + tempFiles.addAll(tempDecryptedFiles); + tempFiles.addAll(tempSegmentDirs); + + for (File tempFile: tempFiles) { + FileUtils.deleteQuietly(tempFile); + } + } + @Nullable private String extractHttpHeader(HttpHeaders headers, String name) { String value = headers.getHeaderString(name); @@ -412,6 +600,24 @@ private String extractHttpHeader(HttpHeaders headers, String name) { return value; } + @Nullable + @VisibleForTesting + String extractHttpHeaderWithOptionalPrefixAsFallback(HttpHeaders headers, String name, String prefix) { + String sourceDownloadURIStr = extractHttpHeader(headers, name); + if (StringUtils.isEmpty(sourceDownloadURIStr)) { + MultivaluedMap headerMap = headers.getRequestHeaders(); + 
for (Map.Entry> entry: headerMap.entrySet()) { + if (entry.getKey().startsWith(prefix)) { + List sourceDownloadURIValue = entry.getValue(); + if (!sourceDownloadURIValue.isEmpty()) { + return sourceDownloadURIValue.get(0); + } + } + } + } + return sourceDownloadURIStr; + } + @VisibleForTesting Pair encryptSegmentIfNeeded(File tempDecryptedFile, File tempEncryptedFile, boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment, String crypterClassNameInTableConfig, @@ -555,6 +761,67 @@ public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart, } } + @POST + @ManagedAsync + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.MULTIPART_FORM_DATA) + @Path("/segmentList") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Cluster.UPLOAD_SEGMENT) + @Authenticate(AccessType.CREATE) + @ApiOperation(value = "Upload a segment", notes = "Upload a segment as binary") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully uploaded segment"), + @ApiResponse(code = 400, message = "Bad Request"), + @ApiResponse(code = 403, message = "Segment validation fails"), + @ApiResponse(code = 409, message = "Segment already exists or another parallel push in progress"), + @ApiResponse(code = 410, message = "Segment to refresh does not exist"), + @ApiResponse(code = 412, message = "CRC check fails"), + @ApiResponse(code = 500, message = "Internal error") + }) + @TrackInflightRequestMetrics + @TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS) + // This multipart based endpoint is used to upload a list of segments in batch mode. 
+ public void uploadSegmentsAsMultiPart(FormDataMultiPart multiPart, + @ApiParam(value = "Name of the table", required = true) + @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) + String tableName, + @ApiParam(value = "Type of the table", required = true) + @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) + String tableType, + @ApiParam(value = "Whether to enable parallel push protection") + @DefaultValue("false") + @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) + boolean enableParallelPushProtection, + @ApiParam(value = "Whether to refresh if the segment already exists") + @DefaultValue("true") + @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) + boolean allowRefresh, + @Context HttpHeaders headers, + @Context Request request, + @Suspended final AsyncResponse asyncResponse) { + if (StringUtils.isEmpty(tableName)) { + throw new ControllerApplicationException(LOGGER, + "tableName is a required field while uploading segments in batch mode.", + Response.Status.BAD_REQUEST); + } + if (StringUtils.isEmpty(tableType)) { + throw new ControllerApplicationException(LOGGER, + "tableType is a required field while uploading segments in batch mode.", + Response.Status.BAD_REQUEST); + } + if (multiPart == null) { + throw new ControllerApplicationException(LOGGER, + "multiPart is a required field while uploading segments in batch mode.", + Response.Status.BAD_REQUEST); + } + try { + asyncResponse.resume(uploadSegments(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, + enableParallelPushProtection, allowRefresh, headers, request)); + } catch (Throwable t) { + asyncResponse.resume(t); + } + } + @POST @ManagedAsync @Produces(MediaType.APPLICATION_JSON) @@ -752,6 +1019,17 @@ private static void createSegmentFileFromMultipart(FormDataMultiPart multiPart, } } + @VisibleForTesting + static void createSegmentFileFromBodyPart(FormDataBodyPart segmentMetadataBodyPart, File destFile) + 
throws IOException { + try (InputStream inputStream = segmentMetadataBodyPart.getValueAs(InputStream.class); + OutputStream outputStream = new FileOutputStream(destFile)) { + IOUtils.copyLarge(inputStream, outputStream); + } finally { + segmentMetadataBodyPart.cleanup(); + } + } + private FileUploadType getUploadType(String uploadTypeStr) { if (uploadTypeStr != null) { return FileUploadType.valueOf(uploadTypeStr); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java new file mode 100644 index 00000000000..30f8a885045 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.pinot.controller.api.upload; + +import java.io.File; +import java.net.URI; +import java.util.Objects; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.segment.spi.SegmentMetadata; + + +public class SegmentUploadMetadata { + private final String _segmentDownloadURIStr; + private final String _sourceDownloadURIStr; + private final URI _finalSegmentLocationURI; + private final Long _segmentSizeInBytes; + private final SegmentMetadata _segmentMetadata; + private final Pair _encryptionInfo; + private ZNRecord _segmentMetadataZNRecord; + + public SegmentUploadMetadata(String segmentDownloadURIStr, String sourceDownloadURIStr, URI finalSegmentLocationURI, + Long segmentSizeInBytes, SegmentMetadata segmentMetadata, Pair encryptionInfo) { + _segmentDownloadURIStr = segmentDownloadURIStr; + _sourceDownloadURIStr = sourceDownloadURIStr; + _segmentSizeInBytes = segmentSizeInBytes; + _segmentMetadata = segmentMetadata; + _encryptionInfo = encryptionInfo; + _finalSegmentLocationURI = finalSegmentLocationURI; + } + + public String getSegmentDownloadURIStr() { + return _segmentDownloadURIStr; + } + + public String getSourceDownloadURIStr() { + return _sourceDownloadURIStr; + } + + public URI getFinalSegmentLocationURI() { + return _finalSegmentLocationURI; + } + + public Long getSegmentSizeInBytes() { + return _segmentSizeInBytes; + } + + public SegmentMetadata getSegmentMetadata() { + return _segmentMetadata; + } + + public Pair getEncryptionInfo() { + return _encryptionInfo; + } + + public void setSegmentMetadataZNRecord(ZNRecord segmentMetadataZNRecord) { + _segmentMetadataZNRecord = segmentMetadataZNRecord; + } + + public ZNRecord getSegmentMetadataZNRecord() { + return _segmentMetadataZNRecord; + } + + @Override + public boolean equals(Object o) { + if (this == o) return true; + if (o == null || getClass() != o.getClass()) return false; + SegmentUploadMetadata that = 
(SegmentUploadMetadata) o; + return Objects.equals(_segmentDownloadURIStr, that._segmentDownloadURIStr) && + Objects.equals(_sourceDownloadURIStr, that._sourceDownloadURIStr) && + Objects.equals(_finalSegmentLocationURI, that._finalSegmentLocationURI) && + Objects.equals(_segmentSizeInBytes, that._segmentSizeInBytes) && + Objects.equals(_segmentMetadata, that._segmentMetadata) && + Objects.equals(_encryptionInfo, that._encryptionInfo) && + Objects.equals(_segmentMetadataZNRecord, that._segmentMetadataZNRecord); + } + + @Override + public int hashCode() { + return Objects.hash(_segmentDownloadURIStr, _sourceDownloadURIStr, _finalSegmentLocationURI, + _segmentSizeInBytes, _segmentMetadata, _encryptionInfo, _segmentMetadataZNRecord); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java index b0aee83d0d5..d6d8a0729c2 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java @@ -21,9 +21,13 @@ import com.google.common.base.Preconditions; import java.io.File; import java.net.URI; -import javax.annotation.Nullable; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.IdealState; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -62,52 +66,58 @@ public ZKOperator(PinotHelixResourceManager pinotHelixResourceManager, Controlle _controllerMetrics = controllerMetrics; } - public void completeSegmentOperations(String tableNameWithType, SegmentMetadata segmentMetadata, - FileUploadType uploadType, @Nullable URI finalSegmentLocationURI, File 
segmentFile, - @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, @Nullable String crypterName, - long segmentSizeInBytes, boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers) + public void completeSegmentsOperations(String tableNameWithType, FileUploadType uploadType, + boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, + List segmentUploadMetadataList) throws Exception { - String segmentName = segmentMetadata.getName(); boolean refreshOnly = Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY)); + List newSegmentsList = new ArrayList<>(); + List existingSegmentsList = new ArrayList<>(); + for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) { + SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata(); + String segmentName = segmentMetadata.getName(); - ZNRecord existingSegmentMetadataZNRecord = - _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); - if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName, - existingSegmentMetadataZNRecord, enableParallelPushProtection)) { - LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}", - tableNameWithType, segmentName); - Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName), - "Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName); - existingSegmentMetadataZNRecord = null; - } - - if (existingSegmentMetadataZNRecord == null) { - // Add a new segment - if (refreshOnly) { - throw new ControllerApplicationException(LOGGER, - String.format("Cannot refresh non-existing segment: %s for table: %s", segmentName, tableNameWithType), - Response.Status.GONE); + ZNRecord existingSegmentMetadataZNRecord = + 
_pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); + if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName, + existingSegmentMetadataZNRecord, enableParallelPushProtection)) { + LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}", + tableNameWithType, segmentName); + Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName), + "Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName); + existingSegmentMetadataZNRecord = null; } - LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType); - processNewSegment(tableNameWithType, segmentMetadata, uploadType, finalSegmentLocationURI, segmentFile, - sourceDownloadURIStr, segmentDownloadURIStr, crypterName, segmentSizeInBytes, enableParallelPushProtection, - headers); - } else { - // Refresh an existing segment - if (!allowRefresh) { - // We cannot perform this check up-front in UploadSegment API call. If a segment doesn't exist during the check - // done up-front but ends up getting created before the check here, we could incorrectly refresh an existing - // segment. - throw new ControllerApplicationException(LOGGER, - String.format("Segment: %s already exists in table: %s. 
Refresh not permitted.", segmentName, - tableNameWithType), Response.Status.CONFLICT); + + if (existingSegmentMetadataZNRecord == null) { + // Add a new segment + if (refreshOnly) { + throw new ControllerApplicationException(LOGGER, + String.format("Cannot refresh non-existing segment: %s for table: %s", segmentName, tableNameWithType), + Response.Status.GONE); + } + LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType); + newSegmentsList.add(segmentUploadMetadata); + } else { + // Refresh an existing segment + if (!allowRefresh) { + // We cannot perform this check up-front in UploadSegment API call. If a segment doesn't exist during the + // check done up-front but ends up getting created before the check here, we could incorrectly refresh an + // existing segment. + throw new ControllerApplicationException(LOGGER, + String.format("Segment: %s already exists in table: %s. Refresh not permitted.", segmentName, + tableNameWithType), Response.Status.CONFLICT); + } + LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType); + segmentUploadMetadata.setSegmentMetadataZNRecord(existingSegmentMetadataZNRecord); + existingSegmentsList.add(segmentUploadMetadata); } - LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType); - processExistingSegment(tableNameWithType, segmentMetadata, uploadType, existingSegmentMetadataZNRecord, - finalSegmentLocationURI, segmentFile, sourceDownloadURIStr, segmentDownloadURIStr, crypterName, - segmentSizeInBytes, enableParallelPushProtection, headers); } + // process new segments + processNewSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, newSegmentsList); + + // process existing segments + processExistingSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, existingSegmentsList); } /** @@ -148,131 +158,140 @@ private void handleParallelPush(String tableNameWithType, 
String segmentName, lo } } - private void processExistingSegment(String tableNameWithType, SegmentMetadata segmentMetadata, - FileUploadType uploadType, ZNRecord existingSegmentMetadataZNRecord, @Nullable URI finalSegmentLocationURI, - File segmentFile, @Nullable String sourceDownloadURIStr, String segmentDownloadURIStr, - @Nullable String crypterName, long segmentSizeInBytes, boolean enableParallelPushProtection, HttpHeaders headers) + private void processExistingSegments(String tableNameWithType, FileUploadType uploadType, + boolean enableParallelPushProtection, HttpHeaders headers, List segmentUploadMetadataList) throws Exception { - String segmentName = segmentMetadata.getName(); - int expectedVersion = existingSegmentMetadataZNRecord.getVersion(); + for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) { + SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata(); + String segmentDownloadURIStr = segmentUploadMetadata.getSegmentDownloadURIStr(); + String sourceDownloadURIStr = segmentUploadMetadata.getSourceDownloadURIStr(); + URI finalSegmentLocationURI = segmentUploadMetadata.getFinalSegmentLocationURI(); + Pair encryptionInfo = segmentUploadMetadata.getEncryptionInfo(); + String crypterName = encryptionInfo.getLeft(); + File segmentFile = encryptionInfo.getRight(); + String segmentName = segmentMetadata.getName(); + ZNRecord existingSegmentMetadataZNRecord = segmentUploadMetadata.getSegmentMetadataZNRecord(); + long segmentSizeInBytes = segmentUploadMetadata.getSegmentSizeInBytes(); + int expectedVersion = existingSegmentMetadataZNRecord.getVersion(); - // Check if CRC match when IF-MATCH header is set - SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord); - long existingCrc = segmentZKMetadata.getCrc(); - checkCRC(headers, tableNameWithType, segmentName, existingCrc); + // Check if CRC match when IF-MATCH header is set + SegmentZKMetadata segmentZKMetadata = new 
SegmentZKMetadata(existingSegmentMetadataZNRecord); + long existingCrc = segmentZKMetadata.getCrc(); + checkCRC(headers, tableNameWithType, segmentName, existingCrc); - // Check segment upload start time when parallel push protection enabled - if (enableParallelPushProtection) { - // When segment upload start time is larger than 0, that means another upload is in progress - long segmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime(); - if (segmentUploadStartTime > 0) { - handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime); - } + // Check segment upload start time when parallel push protection enabled + if (enableParallelPushProtection) { + // When segment upload start time is larger than 0, that means another upload is in progress + long segmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime(); + if (segmentUploadStartTime > 0) { + handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime); + } - // Lock the segment by setting the upload start time in ZK - segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis()); - if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { - throw new ControllerApplicationException(LOGGER, - String.format("Failed to lock the segment: %s of table: %s, retry later", segmentName, tableNameWithType), - Response.Status.CONFLICT); - } else { - // The version will increment if the zk metadata update is successful - expectedVersion++; + // Lock the segment by setting the upload start time in ZK + segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis()); + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to lock the segment: %s of table: %s, retry later", segmentName, tableNameWithType), + Response.Status.CONFLICT); + } else { + // The version will increment if the 
zk metadata update is successful + expectedVersion++; + } } - } - // Reset segment upload start time to unlock the segment later - // NOTE: reset this value even if parallel push protection is not enabled so that segment can recover in case - // previous segment upload did not finish properly and the parallel push protection is turned off - segmentZKMetadata.setSegmentUploadStartTime(-1); + // Reset segment upload start time to unlock the segment later + // NOTE: reset this value even if parallel push protection is not enabled so that segment can recover in case + // previous segment upload did not finish properly and the parallel push protection is turned off + segmentZKMetadata.setSegmentUploadStartTime(-1); - try { - // Construct the segment ZK metadata custom map modifier - String customMapModifierStr = - headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER); - SegmentZKMetadataCustomMapModifier customMapModifier = - customMapModifierStr != null ? new SegmentZKMetadataCustomMapModifier(customMapModifierStr) : null; + try { + // Construct the segment ZK metadata custom map modifier + String customMapModifierStr = + headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER); + SegmentZKMetadataCustomMapModifier customMapModifier = + customMapModifierStr != null ? new SegmentZKMetadataCustomMapModifier(customMapModifierStr) : null; - // Update ZK metadata and refresh the segment if necessary - long newCrc = Long.parseLong(segmentMetadata.getCrc()); - if (newCrc == existingCrc) { - LOGGER.info( - "New segment crc '{}' is the same as existing segment crc for segment '{}'. 
Updating ZK metadata without " - + "refreshing the segment.", newCrc, segmentName); - // NOTE: Even though we don't need to refresh the segment, we should still update the following fields: - // - Creation time (not included in the crc) - // - Refresh time - // - Custom map - segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime()); - segmentZKMetadata.setRefreshTime(System.currentTimeMillis()); - if (customMapModifier != null) { - segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); + // Update ZK metadata and refresh the segment if necessary + long newCrc = Long.parseLong(segmentMetadata.getCrc()); + if (newCrc == existingCrc) { + LOGGER.info( + "New segment crc '{}' is the same as existing segment crc for segment '{}'. Updating ZK metadata without " + + "refreshing the segment.", newCrc, segmentName); + // NOTE: Even though we don't need to refresh the segment, we should still update the following fields: + // - Creation time (not included in the crc) + // - Refresh time + // - Custom map + segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime()); + segmentZKMetadata.setRefreshTime(System.currentTimeMillis()); + if (customMapModifier != null) { + segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); + } else { + // If no modifier is provided, use the custom map from the segment metadata + segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap()); + } + if (!segmentZKMetadata.getDownloadUrl().equals(segmentDownloadURIStr)) { + // For offline ingestion, it is quite common that the download.uri would change but the crc would be the + // same. E.g. a user re-runs the job which process the same data and segments are stored/pushed from a + // different path from the Deepstore. 
Read more: https://github.com/apache/pinot/issues/11535 + LOGGER.info("Updating segment download url from: {} to: {} even though crc is the same", + segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr); + segmentZKMetadata.setDownloadUrl(segmentDownloadURIStr); + // When download URI changes, we also need to copy the segment to the final location if existed. + // This typically means users changed the push type from METADATA to SEGMENT or SEGMENT to METADATA. + // Note that switching push type from SEGMENT to METADATA may lead to orphan segments in the controller + // managed directory. Read more: https://github.com/apache/pinot/pull/11720 + if (finalSegmentLocationURI != null) { + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, + finalSegmentLocationURI); + } + } + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new RuntimeException( + String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", + segmentName, tableNameWithType, expectedVersion)); + } } else { - // If no modifier is provided, use the custom map from the segment metadata - segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap()); - } - if (!segmentZKMetadata.getDownloadUrl().equals(segmentDownloadURIStr)) { - // For offline ingestion, it is quite common that the download.uri would change but the crc would be the same. - // E.g. a user re-runs the job which process the same data and segments are stored/pushed from a different - // path from the Deepstore. Read more: https://github.com/apache/pinot/issues/11535 - LOGGER.info("Updating segment download url from: {} to: {} even though crc is the same", - segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr); - segmentZKMetadata.setDownloadUrl(segmentDownloadURIStr); - // When download URI changes, we also need to copy the segment to the final location if existed. 
- // This typically means users changed the push type from METADATA to SEGMENT or SEGMENT to METADATA. - // Note that switching push type from SEGMENT to METADATA may lead orphan segments in the controller - // managed directory. Read more: https://github.com/apache/pinot/pull/11720 + // New segment is different from the existing one, update ZK metadata and refresh the segment + LOGGER.info( + "New segment crc {} is different than the existing segment crc {}. Updating ZK metadata and refreshing " + + "segment {}", newCrc, existingCrc, segmentName); if (finalSegmentLocationURI != null) { copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, finalSegmentLocationURI); } - } - if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { - throw new RuntimeException( - String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", - segmentName, tableNameWithType, expectedVersion)); - } - } else { - // New segment is different with the existing one, update ZK metadata and refresh the segment - LOGGER.info( - "New segment crc {} is different than the existing segment crc {}. 
Updating ZK metadata and refreshing " - + "segment {}", newCrc, existingCrc, segmentName); - if (finalSegmentLocationURI != null) { - copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, - finalSegmentLocationURI); - } - // NOTE: Must first set the segment ZK metadata before trying to refresh because servers and brokers rely on - // segment ZK metadata to refresh the segment (server will compare the segment ZK metadata with the local - // metadata to decide whether to download the new segment; broker will update the segment partition info & time - // boundary based on the segment ZK metadata) - if (customMapModifier == null) { - // If no modifier is provided, use the custom map from the segment metadata - segmentZKMetadata.setCustomMap(null); - ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, - segmentDownloadURIStr, crypterName, segmentSizeInBytes); - } else { - // If modifier is provided, first set the custom map from the segment metadata, then apply the modifier - ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, - segmentDownloadURIStr, crypterName, segmentSizeInBytes); - segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); + // NOTE: Must first set the segment ZK metadata before trying to refresh because servers and brokers rely on + // segment ZK metadata to refresh the segment (server will compare the segment ZK metadata with the local + // metadata to decide whether to download the new segment; broker will update the segment partition info & + // time boundary based on the segment ZK metadata) + if (customMapModifier == null) { + // If no modifier is provided, use the custom map from the segment metadata + segmentZKMetadata.setCustomMap(null); + ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, + segmentDownloadURIStr, crypterName, 
segmentSizeInBytes); + } else { + // If modifier is provided, first set the custom map from the segment metadata, then apply the modifier + ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, + segmentDownloadURIStr, crypterName, segmentSizeInBytes); + segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); + } + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new RuntimeException( + String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", + segmentName, tableNameWithType, expectedVersion)); + } + LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, tableNameWithType); + + // Send a message to servers and brokers hosting the table to refresh the segment + _pinotHelixResourceManager.sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true); } + } catch (Exception ex) { if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { - throw new RuntimeException( - String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", - segmentName, tableNameWithType, expectedVersion)); + LOGGER.error("Failed to update ZK metadata for segment: {}, table: {}, expected version: {}", segmentName, + tableNameWithType, expectedVersion); } - LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, tableNameWithType); - - // Send a message to servers and brokers hosting the table to refresh the segment - _pinotHelixResourceManager.sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true); - } - } catch (Exception e) { - if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { - LOGGER.error("Failed to update ZK metadata for segment: {}, table: {}, expected version: {}", segmentName, - tableNameWithType, 
expectedVersion); + throw ex; } - throw e; } } @@ -295,81 +314,97 @@ private void checkCRC(HttpHeaders headers, String tableNameWithType, String segm } } - private void processNewSegment(String tableNameWithType, SegmentMetadata segmentMetadata, FileUploadType uploadType, - @Nullable URI finalSegmentLocationURI, File segmentFile, @Nullable String sourceDownloadURIStr, - String segmentDownloadURIStr, @Nullable String crypterName, long segmentSizeInBytes, - boolean enableParallelPushProtection, HttpHeaders headers) + private void processNewSegments(String tableNameWithType, FileUploadType uploadType, + boolean enableParallelPushProtection, HttpHeaders headers, List segmentUploadMetadataList) throws Exception { - String segmentName = segmentMetadata.getName(); - SegmentZKMetadata newSegmentZKMetadata; - try { - newSegmentZKMetadata = - ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, segmentDownloadURIStr, - crypterName, segmentSizeInBytes); - } catch (IllegalArgumentException e) { - throw new ControllerApplicationException(LOGGER, - String.format("Got invalid segment metadata when adding segment: %s for table: %s, reason: %s", segmentName, - tableNameWithType, e.getMessage()), Response.Status.BAD_REQUEST); - } - - // Lock if enableParallelPushProtection is true. 
+ Map segmentZKMetadataMap = new HashMap<>(); + List segmentNames = new ArrayList<>(); long segmentUploadStartTime = System.currentTimeMillis(); - if (enableParallelPushProtection) { - newSegmentZKMetadata.setSegmentUploadStartTime(segmentUploadStartTime); - } + for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) { + SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata(); + String segmentName = segmentMetadata.getName(); + SegmentZKMetadata newSegmentZKMetadata; + URI finalSegmentLocationURI = segmentUploadMetadata.getFinalSegmentLocationURI(); + String segmentDownloadURIStr = segmentUploadMetadata.getSegmentDownloadURIStr(); + String sourceDownloadURIStr = segmentUploadMetadata.getSourceDownloadURIStr(); + String crypterName = segmentUploadMetadata.getEncryptionInfo().getLeft(); + long segmentSizeInBytes = segmentUploadMetadata.getSegmentSizeInBytes(); + File segmentFile = segmentUploadMetadata.getEncryptionInfo().getRight(); + try { + newSegmentZKMetadata = ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, + segmentDownloadURIStr, crypterName, segmentSizeInBytes); + segmentZKMetadataMap.put(segmentName, newSegmentZKMetadata); + segmentNames.add(segmentName); + } catch (IllegalArgumentException e) { + throw new ControllerApplicationException(LOGGER, + String.format("Got invalid segment metadata when adding segment: %s for table: %s, reason: %s", segmentName, + tableNameWithType, e.getMessage()), Response.Status.BAD_REQUEST); + } - // Update zk metadata customer map - String segmentZKMetadataCustomMapModifierStr = headers != null ? 
headers.getHeaderString( - FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER) : null; - if (segmentZKMetadataCustomMapModifierStr != null) { - SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = - new SegmentZKMetadataCustomMapModifier(segmentZKMetadataCustomMapModifierStr); - newSegmentZKMetadata.setCustomMap( - segmentZKMetadataCustomMapModifier.modifyMap(newSegmentZKMetadata.getCustomMap())); - } - if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, newSegmentZKMetadata)) { - throw new RuntimeException( - String.format("Failed to create ZK metadata for segment: %s of table: %s", segmentName, tableNameWithType)); - } + // Lock if enableParallelPushProtection is true. + if (enableParallelPushProtection) { + newSegmentZKMetadata.setSegmentUploadStartTime(segmentUploadStartTime); + } - if (finalSegmentLocationURI != null) { - try { - copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, - finalSegmentLocationURI); - } catch (Exception e) { - // Cleanup the Zk entry and the segment from the permanent directory if it exists. - LOGGER.error("Could not move segment {} from table {} to permanent directory", segmentName, tableNameWithType, - e); - deleteSegmentIfNeeded(tableNameWithType, segmentName, segmentUploadStartTime, enableParallelPushProtection); - throw e; + // Update zk metadata custom map + String segmentZKMetadataCustomMapModifierStr = headers != null ? 
headers.getHeaderString( + FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER) : null; + if (segmentZKMetadataCustomMapModifierStr != null) { + SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = new SegmentZKMetadataCustomMapModifier( + segmentZKMetadataCustomMapModifierStr); + newSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap( + newSegmentZKMetadata.getCustomMap())); + } + if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, newSegmentZKMetadata)) { + throw new RuntimeException(String.format("Failed to create ZK metadata for segment: %s of table: %s", + segmentName, tableNameWithType)); + } + + if (finalSegmentLocationURI != null) { + try { + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, + finalSegmentLocationURI); + } catch (Exception ex) { + // Cleanup the Zk entry and the segment from the permanent directory if it exists. + LOGGER.error("Could not move segment {} from table {} to permanent directory", + segmentName, tableNameWithType, ex); + // Delete all segments that are getting processed as we are in batch mode + deleteSegmentsIfNeeded(tableNameWithType, segmentNames, segmentUploadStartTime, enableParallelPushProtection); + throw ex; + } } } try { - _pinotHelixResourceManager.assignTableSegment(tableNameWithType, segmentMetadata.getName()); - } catch (Exception e) { + _pinotHelixResourceManager.assignTableSegments(tableNameWithType, segmentNames); + } catch (Exception ex) { // assignTableSegment removes the zk entry. // Call deleteSegment to remove the segment from permanent location if needed. 
- LOGGER.error("Caught exception while calling assignTableSegment for adding segment: {} to table: {}", segmentName, - tableNameWithType, e); - deleteSegmentIfNeeded(tableNameWithType, segmentName, segmentUploadStartTime, enableParallelPushProtection); - throw e; + LOGGER.error("Caught exception while calling assignTableSegments for adding segments: {} to table: {}", + segmentZKMetadataMap.keySet(), tableNameWithType, ex); + deleteSegmentsIfNeeded(tableNameWithType, segmentNames, segmentUploadStartTime, enableParallelPushProtection); + throw ex; } - if (enableParallelPushProtection) { - // Release lock. Expected version will be 0 as we hold a lock and no updates could take place meanwhile. - newSegmentZKMetadata.setSegmentUploadStartTime(-1); - if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata, 0)) { - // There is a race condition when it took too much time for the 1st segment upload to process (due to slow - // PinotFS access), which leads to the 2nd attempt of segment upload, and the 2nd segment upload succeeded. - // In this case, when the 1st upload comes back, it shouldn't blindly delete the segment when it failed to - // update the zk metadata. Instead, the 1st attempt should validate the upload start time one more time. If the - // start time doesn't match with the one persisted in zk metadata, segment deletion should be skipped. - String errorMsg = - String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentFile, tableNameWithType); - LOGGER.error(errorMsg); - deleteSegmentIfNeeded(tableNameWithType, segmentName, segmentUploadStartTime, true); - throw new RuntimeException(errorMsg); + for (Map.Entry segmentZKMetadataEntry: segmentZKMetadataMap.entrySet()) { + SegmentZKMetadata newSegmentZKMetadata = segmentZKMetadataEntry.getValue(); + String segmentName = segmentZKMetadataEntry.getKey(); + if (enableParallelPushProtection) { + // Release lock. 
Expected version will be 0 as we hold a lock and no updates could take place meanwhile. + newSegmentZKMetadata.setSegmentUploadStartTime(-1); + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata, 0)) { + // There is a race condition when it took too much time for the 1st segment upload to process (due to slow + // PinotFS access), which leads to the 2nd attempt of segment upload, and the 2nd segment upload succeeded. + // In this case, when the 1st upload comes back, it shouldn't blindly delete the segment when it failed to + // update the zk metadata. Instead, the 1st attempt should validate the upload start time one more time. + // If the start time doesn't match with the one persisted in zk metadata, segment deletion should be skipped. + String errorMsg = String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentName, + tableNameWithType); + LOGGER.error(errorMsg); + // Delete all segments that are getting processed as we are in batch mode + deleteSegmentsIfNeeded(tableNameWithType, segmentNames, segmentUploadStartTime, true); + throw new RuntimeException(errorMsg); + } } } } @@ -379,22 +414,26 @@ private void processNewSegment(String tableNameWithType, SegmentMetadata segment * 1) the uploadStartTime matches with the one persisted in ZK metadata. * 2) enableParallelPushProtection is not enabled. */ - private void deleteSegmentIfNeeded(String tableNameWithType, String segmentName, long currentSegmentUploadStartTime, - boolean enableParallelPushProtection) { - ZNRecord existingSegmentMetadataZNRecord = - _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); - if (existingSegmentMetadataZNRecord == null) { - return; - } - // Check if the upload start time is set by this thread itself, if yes delete the segment. 
- SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord); - long existingSegmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime(); - LOGGER.info("Parallel push protection is {} for segment: {}.", - (enableParallelPushProtection ? "enabled" : "disabled"), segmentName); - if (!enableParallelPushProtection || currentSegmentUploadStartTime == existingSegmentUploadStartTime) { - _pinotHelixResourceManager.deleteSegment(tableNameWithType, segmentName); - LOGGER.info("Deleted zk entry and segment {} for table {}.", segmentName, tableNameWithType); + private void deleteSegmentsIfNeeded(String tableNameWithType, List segmentNames, + long currentSegmentUploadStartTime, boolean enableParallelPushProtection) { + List segmentsToDelete = new ArrayList<>(); + for (String segmentName: segmentNames) { + ZNRecord existingSegmentMetadataZNRecord = + _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); + if (existingSegmentMetadataZNRecord == null) { + continue; + } + // Check if the upload start time is set by this thread itself, if yes delete the segment. + SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord); + long existingSegmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime(); + LOGGER.info("Parallel push protection is {} for segment: {}.", + (enableParallelPushProtection ? 
"enabled" : "disabled"), segmentName); + if (!enableParallelPushProtection || currentSegmentUploadStartTime == existingSegmentUploadStartTime) { + segmentsToDelete.add(segmentName); + } } + _pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete); + LOGGER.info("Deleted zk entry and segments {} for table {}.", segmentsToDelete, tableNameWithType); } private void copySegmentToDeepStore(String tableNameWithType, String segmentName, FileUploadType uploadType, diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 2b835faaae5..8cc071dd98a 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2257,13 +2257,16 @@ public void addNewSegment(String tableNameWithType, SegmentMetadata segmentMetad "Failed to set segment ZK metadata for table: " + tableNameWithType + ", segment: " + segmentName); LOGGER.info("Added segment: {} of table: {} to property store", segmentName, tableNameWithType); - assignTableSegment(tableNameWithType, segmentName); + assignTableSegments(tableNameWithType, Collections.singletonList(segmentName)); } - public void assignTableSegment(String tableNameWithType, String segmentName) { - String segmentZKMetadataPath = - ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, segmentName); - + public void assignTableSegments(String tableNameWithType, List segmentNames) { + Map segmentZKMetadataPathMap = new HashMap<>(); + for (String segmentName: segmentNames) { + String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, + segmentName); + segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath); + } // Assign instances for the 
segment and add it into IdealState try { TableConfig tableConfig = getTableConfig(tableNameWithType); @@ -2276,18 +2279,17 @@ public void assignTableSegment(String tableNameWithType, String segmentName) { if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) { List sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(), TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager); - - // Update segment tier to support direct assignment for multiple data directories - updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers); - - InstancePartitions tierInstancePartitions = - TierConfigUtils.getTieredInstancePartitionsForSegment(tableNameWithType, segmentName, sortedTiers, - _helixZkManager); - if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) { - // Override instance partitions for offline table - LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}", - tierInstancePartitions, segmentName, tableNameWithType); - instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions); + for (String segmentName: segmentNames) { + // Update segment tier to support direct assignment for multiple data directories + updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers); + InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment( + tableNameWithType, segmentName, sortedTiers, _helixZkManager); + if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) { + // Override instance partitions for offline table + LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}", + tierInstancePartitions, segmentName, tableNameWithType); + instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, tierInstancePartitions); + } } } @@ 
-2297,33 +2299,39 @@ public void assignTableSegment(String tableNameWithType, String segmentName) { Map finalInstancePartitionsMap = instancePartitionsMap; HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { assert idealState != null; - Map> currentAssignment = idealState.getRecord().getMapFields(); - if (currentAssignment.containsKey(segmentName)) { - LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, - tableNameWithType); - } else { - List assignedInstances = - segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); - LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, - tableNameWithType); - currentAssignment.put(segmentName, - SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, SegmentStateModel.ONLINE)); + for (String segmentName: segmentNames) { + Map> currentAssignment = idealState.getRecord().getMapFields(); + if (currentAssignment.containsKey(segmentName)) { + LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, + tableNameWithType); + } else { + List assignedInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); + LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, + tableNameWithType); + currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, + SegmentStateModel.ONLINE)); + } } return idealState; }); - LOGGER.info("Added segment: {} to IdealState for table: {}", segmentName, tableNameWithType); + LOGGER.info("Added segments: {} to IdealState for table: {}", segmentNames, tableNameWithType); } - } catch (Exception e) { + } catch (Exception ex) { LOGGER.error( - "Caught exception while adding segment: {} to IdealState for table: {}, deleting segment ZK metadata", - segmentName, tableNameWithType, 
e); - if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) { - LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType); - } else { - LOGGER.error("Failed to deleted segment ZK metadata for segment: {} of table: {}", segmentName, - tableNameWithType); + "Caught exception while adding segments: {} to IdealState for table: {}, deleting segments ZK metadata", + segmentNames, tableNameWithType, ex); + for (Map.Entry segmentZKMetadataPathEntry: segmentZKMetadataPathMap.entrySet()) { + String segmentName = segmentZKMetadataPathEntry.getKey(); + String segmentZKMetadataPath = segmentZKMetadataPathEntry.getValue(); + if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) { + LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType); + } else { + LOGGER.error("Failed to delete segment ZK metadata for segment: {} of table: {}", segmentName, + tableNameWithType); + } } - throw e; + throw ex; } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java index 600a501274e..77b98c275e4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java @@ -18,17 +18,31 @@ */ package org.apache.pinot.controller.api.resources; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import javax.ws.rs.core.HttpHeaders; +import 
javax.ws.rs.core.MultivaluedMap; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.spi.crypt.NoOpPinotCrypter; import org.apache.pinot.spi.crypt.PinotCrypterFactory; import org.apache.pinot.spi.env.PinotConfiguration; +import org.glassfish.jersey.media.multipart.FormDataBodyPart; +import org.testng.Assert; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; @@ -125,4 +139,105 @@ public void testEncryptSegmentIfNeededNoEncryption() { assertNull(encryptionInfo.getLeft()); assertEquals(_decryptedFile, encryptionInfo.getRight()); } + + @Test + public void testCreateSegmentFileFromBodyPart() throws IOException { + // Arrange + FormDataBodyPart mockBodyPart = mock(FormDataBodyPart.class); + File destFile = new File("testSegmentFile.txt"); + String testContent = "This is a test content"; + + // Mock input stream to return the test content + InputStream mockInputStream = new ByteArrayInputStream(testContent.getBytes()); + when(mockBodyPart.getValueAs(InputStream.class)).thenReturn(mockInputStream); + + // Act + PinotSegmentUploadDownloadRestletResource.createSegmentFileFromBodyPart(mockBodyPart, destFile); + + // Assert + try (BufferedReader reader = new BufferedReader(new FileReader(destFile))) { + StringBuilder fileContent = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + fileContent.append(line); + } + Assert.assertEquals(fileContent.toString(), testContent); + } finally { + // Clean up + destFile.delete(); + } + + // Verify that the cleanup method was called + verify(mockBodyPart).cleanup(); + } + + @Test + public void testExtractHttpHeaderWithPrefixHeaderFound() { + // Arrange + HttpHeaders 
mockHeaders = mock(HttpHeaders.class); + String name = "X-Source-Download-URI"; + + when(mockHeaders.getHeaderString(name)).thenReturn("http://example.com"); + + // Act + String result = _resource.extractHttpHeaderWithOptionalPrefixAsFallback(mockHeaders, name, null); + + // Assert + Assert.assertEquals(result, "http://example.com"); + } + + @Test + public void testExtractHttpHeaderWithPrefixHeaderNotFound() { + // Arrange + HttpHeaders mockHeaders = mock(HttpHeaders.class); + String name = "X-Source-Download-URI"; + String prefix = "X-Prefix-"; + + when(mockHeaders.getHeaderString(name)).thenReturn(null); + + // Create a mock MultivaluedMap + MultivaluedMap mockHeaderMap = mock(MultivaluedMap.class); + when(mockHeaders.getRequestHeaders()).thenReturn(mockHeaderMap); + + // Set up headers in the map + Map> headerMap = new HashMap<>(); + headerMap.put("X-Prefix-Example", Collections.singletonList("http://example.com")); + when(mockHeaderMap.entrySet()).thenReturn(headerMap.entrySet()); + + // Set up the mock to return the headers + for (Map.Entry> entry : headerMap.entrySet()) { + when(mockHeaderMap.get(entry.getKey())).thenReturn(entry.getValue()); + } + + // Act + String result = _resource.extractHttpHeaderWithOptionalPrefixAsFallback(mockHeaders, name, prefix); + + // Assert + Assert.assertEquals(result, "http://example.com"); + } + + @Test + public void testExtractHttpHeaderWithPrefixHeaderAndPrefixNotFound() { + // Arrange + HttpHeaders mockHeaders = mock(HttpHeaders.class); + String name = "X-Source-Download-URI"; + String prefix = "X-Prefix-"; + + // Mock extractHttpHeader method to return null + when(mockHeaders.getHeaderString(name)).thenReturn(null); + + // Create a mock MultivaluedMap + MultivaluedMap mockHeaderMap = mock(MultivaluedMap.class); + when(mockHeaders.getRequestHeaders()).thenReturn(mockHeaderMap); + + // Set up empty headers in the map + Map> headerMap = new HashMap<>(); + when(mockHeaderMap.entrySet()).thenReturn(headerMap.entrySet()); + + 
// Act + String result = _resource.extractHttpHeaderWithOptionalPrefixAsFallback(mockHeaders, name, prefix); + + // Assert + Assert.assertNull(result); + } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java index 0e69dd105ed..5d03930b005 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/upload/ZKOperatorTest.java @@ -21,12 +21,14 @@ import com.google.common.collect.ImmutableList; import java.io.File; import java.net.URI; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.IdealState; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; import org.apache.pinot.common.metrics.ControllerMetrics; @@ -166,9 +168,10 @@ public void testMetadataUploadType() // with finalSegmentLocation not null File finalSegmentLocation = new File(DATA_DIR, segmentName); Assert.assertFalse(finalSegmentLocation.exists()); - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.METADATA, - finalSegmentLocation.toURI(), segmentFile, sourceDownloadURIStr, "downloadUrl", "crypter", 10, true, true, - httpHeaders); + SegmentUploadMetadata segmentUploadMetadata = new SegmentUploadMetadata("downloadUrl", sourceDownloadURIStr, + finalSegmentLocation.toURI(), 10L, segmentMetadata, Pair.of("crypter", segmentFile)); + zkOperator.completeSegmentsOperations(OFFLINE_TABLE_NAME, FileUploadType.METADATA, true, true, + httpHeaders, Collections.singletonList(segmentUploadMetadata)); Assert.assertTrue(finalSegmentLocation.exists()); 
Assert.assertTrue(segmentTar.exists()); checkSegmentZkMetadata(segmentName, 12345L, 123L); @@ -182,8 +185,10 @@ public void testMetadataUploadType() FileUtils.deleteQuietly(DATA_DIR); // with finalSegmentLocation null - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.METADATA, null, - segmentFile, sourceDownloadURIStr, "downloadUrl", "crypter", 10, true, true, httpHeaders); + SegmentUploadMetadata segmentUploadMetadata2 = new SegmentUploadMetadata("downloadUrl", + sourceDownloadURIStr, null, 10L, segmentMetadata, Pair.of("crypter", segmentFile)); + zkOperator.completeSegmentsOperations(OFFLINE_TABLE_NAME, FileUploadType.METADATA, true, true, httpHeaders, + Collections.singletonList(segmentUploadMetadata2)); Assert.assertFalse(finalSegmentLocation.exists()); Assert.assertTrue(segmentTar.exists()); checkSegmentZkMetadata(segmentName, 12345L, 123L); @@ -206,8 +211,10 @@ public void testCompleteSegmentOperations() URI finalSegmentLocationURI = URIUtils.getUri("mockPath", OFFLINE_TABLE_NAME, URIUtils.encode(segmentMetadata.getName())); File segmentFile = new File(new File("foo/bar"), "mockChild"); - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, - finalSegmentLocationURI, segmentFile, "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders); + SegmentUploadMetadata segmentUploadMetadata1 = new SegmentUploadMetadata("downloadUrl", "downloadUrl", + finalSegmentLocationURI, 10L, segmentMetadata, Pair.of("crypter", segmentFile)); + zkOperator.completeSegmentsOperations(OFFLINE_TABLE_NAME, FileUploadType.SEGMENT, true, true, + httpHeaders, Collections.singletonList(segmentUploadMetadata1)); fail(); } catch (Exception e) { // Expected @@ -218,9 +225,10 @@ public void testCompleteSegmentOperations() SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); return segmentZKMetadata == null; }, 30_000L, "Failed to delete 
segmentZkMetadata."); - - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders); + SegmentUploadMetadata segmentUploadMetadata2 = new SegmentUploadMetadata("downloadUrl", "downloadUrl", + null, 10L, segmentMetadata, Pair.of("crypter", null)); + zkOperator.completeSegmentsOperations(OFFLINE_TABLE_NAME, FileUploadType.SEGMENT, true, true, + httpHeaders, Collections.singletonList(segmentUploadMetadata2)); SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); assertNotNull(segmentZKMetadata); assertEquals(segmentZKMetadata.getCrc(), 12345L); @@ -242,8 +250,10 @@ public void testCompleteSegmentOperations() _resourceManager.getHelixAdmin() .setResourceIdealState(_resourceManager.getHelixClusterName(), OFFLINE_TABLE_NAME, idealState); // The segment should be uploaded as a new segment (push time should change, and refresh time shouldn't be set) - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "downloadUrl", "downloadUrl", "crypter", 10, true, true, httpHeaders); + SegmentUploadMetadata segmentUploadMetadata3 = new SegmentUploadMetadata("downloadUrl", "downloadUrl", + null, 10L, segmentMetadata, Pair.of("crypter", null)); + zkOperator.completeSegmentsOperations(OFFLINE_TABLE_NAME, FileUploadType.SEGMENT, true, true, + httpHeaders, Collections.singletonList(segmentUploadMetadata3)); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); assertNotNull(segmentZKMetadata); assertEquals(segmentZKMetadata.getCrc(), 12345L); @@ -259,8 +269,10 @@ public void testCompleteSegmentOperations() // Upload the same segment with allowRefresh = false. Validate that an exception is thrown. 
try { - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, false, httpHeaders); + SegmentUploadMetadata segmentUploadMetadata4 = new SegmentUploadMetadata("otherDownloadUrl", "otherDownloadUrl", + null, 10L, segmentMetadata, Pair.of("otherCrypter", null)); + zkOperator.completeSegmentsOperations(OFFLINE_TABLE_NAME, FileUploadType.SEGMENT, true, false, + httpHeaders, Collections.singletonList(segmentUploadMetadata4)); fail(); } catch (Exception e) { // Expected @@ -269,8 +281,10 @@ public void testCompleteSegmentOperations() // Refresh the segment with unmatched IF_MATCH field when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("123"); try { - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, true, httpHeaders); + SegmentUploadMetadata segmentUploadMetadata5 = new SegmentUploadMetadata("otherDownloadUrl", "otherDownloadUrl", + null, 10L, segmentMetadata, Pair.of("otherCrypter", null)); + zkOperator.completeSegmentsOperations(OFFLINE_TABLE_NAME, FileUploadType.SEGMENT, true, true, + httpHeaders, Collections.singletonList(segmentUploadMetadata5)); fail(); } catch (Exception e) { // Expected @@ -280,9 +294,10 @@ public void testCompleteSegmentOperations() // downloadURL and crypter when(httpHeaders.getHeaderString(HttpHeaders.IF_MATCH)).thenReturn("12345"); when(segmentMetadata.getIndexCreationTime()).thenReturn(456L); - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 10, true, true, httpHeaders); - + SegmentUploadMetadata segmentUploadMetadata6 = new SegmentUploadMetadata("otherDownloadUrl", "otherDownloadUrl", + null, 10L, segmentMetadata, Pair.of("otherCrypter", null)); + 
zkOperator.completeSegmentsOperations(OFFLINE_TABLE_NAME, FileUploadType.SEGMENT, true, true, + httpHeaders, Collections.singletonList(segmentUploadMetadata6)); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); assertNotNull(segmentZKMetadata); assertEquals(segmentZKMetadata.getCrc(), 12345L); @@ -304,8 +319,10 @@ public void testCompleteSegmentOperations() when(segmentMetadata.getIndexCreationTime()).thenReturn(789L); // Add a tiny sleep to guarantee that refresh time is different from the previous round Thread.sleep(10); - zkOperator.completeSegmentOperations(OFFLINE_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "otherDownloadUrl", "otherDownloadUrl", "otherCrypter", 100, true, true, httpHeaders); + SegmentUploadMetadata segmentUploadMetadata5 = new SegmentUploadMetadata("otherDownloadUrl", "otherDownloadUrl", + null, 100L, segmentMetadata, Pair.of("otherCrypter", null)); + zkOperator.completeSegmentsOperations(OFFLINE_TABLE_NAME, FileUploadType.SEGMENT, true, true, + httpHeaders, Collections.singletonList(segmentUploadMetadata5)); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(OFFLINE_TABLE_NAME, SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -328,8 +345,10 @@ public void testPushToRealtimeTable() SegmentMetadata segmentMetadata = mock(SegmentMetadata.class); when(segmentMetadata.getName()).thenReturn(SEGMENT_NAME); when(segmentMetadata.getCrc()).thenReturn("12345"); - zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class)); + SegmentUploadMetadata segmentUploadMetadata1 = new SegmentUploadMetadata("downloadUrl", "downloadUrl", + null, 10L, segmentMetadata, Pair.of(null, null)); + zkOperator.completeSegmentsOperations(REALTIME_TABLE_NAME, FileUploadType.SEGMENT, true, true, + mock(HttpHeaders.class), Collections.singletonList(segmentUploadMetadata1)); 
SegmentZKMetadata segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -341,8 +360,10 @@ public void testPushToRealtimeTable() when(segmentMetadata.getName()).thenReturn(LLC_SEGMENT_NAME); when(segmentMetadata.getCrc()).thenReturn("23456"); try { - zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class)); + SegmentUploadMetadata segmentUploadMetadata2 = new SegmentUploadMetadata("downloadUrl", "downloadUrl", + null, 10L, segmentMetadata, Pair.of(null, null)); + zkOperator.completeSegmentsOperations(REALTIME_TABLE_NAME, FileUploadType.SEGMENT, true, true, + mock(HttpHeaders.class), Collections.singletonList(segmentUploadMetadata2)); fail(); } catch (ControllerApplicationException e) { assertEquals(e.getResponse().getStatus(), Response.Status.BAD_REQUEST.getStatusCode()); @@ -352,8 +373,10 @@ public void testPushToRealtimeTable() // Uploading a segment with LLC segment name and start/end offset should success when(segmentMetadata.getStartOffset()).thenReturn("0"); when(segmentMetadata.getEndOffset()).thenReturn("1234"); - zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class)); + SegmentUploadMetadata segmentUploadMetadata3 = new SegmentUploadMetadata("downloadUrl", "downloadUrl", + null, 10L, segmentMetadata, Pair.of(null, null)); + zkOperator.completeSegmentsOperations(REALTIME_TABLE_NAME, FileUploadType.SEGMENT, true, true, + mock(HttpHeaders.class), Collections.singletonList(segmentUploadMetadata3)); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -365,8 +388,10 @@ public void testPushToRealtimeTable() 
when(segmentMetadata.getCrc()).thenReturn("34567"); when(segmentMetadata.getStartOffset()).thenReturn(null); when(segmentMetadata.getEndOffset()).thenReturn(null); - zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class)); + SegmentUploadMetadata segmentUploadMetadata4 = new SegmentUploadMetadata("downloadUrl", "downloadUrl", + null, 10L, segmentMetadata, Pair.of(null, null)); + zkOperator.completeSegmentsOperations(REALTIME_TABLE_NAME, FileUploadType.SEGMENT, true, true, + mock(HttpHeaders.class), Collections.singletonList(segmentUploadMetadata4)); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME); assertNotNull(segmentZKMetadata); @@ -378,8 +403,10 @@ public void testPushToRealtimeTable() when(segmentMetadata.getCrc()).thenReturn("45678"); when(segmentMetadata.getStartOffset()).thenReturn("1234"); when(segmentMetadata.getEndOffset()).thenReturn("2345"); - zkOperator.completeSegmentOperations(REALTIME_TABLE_NAME, segmentMetadata, FileUploadType.SEGMENT, null, null, - "downloadUrl", "downloadUrl", null, 10, true, true, mock(HttpHeaders.class)); + SegmentUploadMetadata segmentUploadMetadata5 = new SegmentUploadMetadata("downloadUrl", "downloadUrl", + null, 10L, segmentMetadata, Pair.of(null, null)); + zkOperator.completeSegmentsOperations(REALTIME_TABLE_NAME, FileUploadType.SEGMENT, true, true, + mock(HttpHeaders.class), Collections.singletonList(segmentUploadMetadata5)); segmentZKMetadata = _resourceManager.getSegmentZKMetadata(REALTIME_TABLE_NAME, LLC_SEGMENT_NAME); assertNotNull(segmentZKMetadata); diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java index 35fdae84489..d7fba2f9024 100644 --- 
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java @@ -27,6 +27,7 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.plugin.ingestion.batch.common.BaseSegmentPushJobRunner; import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner; @@ -59,6 +60,7 @@ * todo: add test for URI push */ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { + private static String tableNameSuffix; @Override protected Map getStreamConfigs() { @@ -93,6 +95,7 @@ protected List getBloomFilterColumns() { @BeforeMethod public void setUpTest() throws IOException { + tableNameSuffix = RandomStringUtils.randomAlphabetic(12); TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); } @@ -136,15 +139,15 @@ public void testUploadAndQuery() jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec)); jobSpec.setOutputDirURI(_tarDir.getAbsolutePath()); TableSpec tableSpec = new TableSpec(); - tableSpec.setTableName(DEFAULT_TABLE_NAME); - tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME)); + tableSpec.setTableName(getTableName()); + tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName())); jobSpec.setTableSpec(tableSpec); PinotClusterSpec clusterSpec = new PinotClusterSpec(); clusterSpec.setControllerURI(getControllerBaseApiUrl()); jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec}); File dataDir = new File(_controllerConfig.getDataDir()); - File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME); + File dataDirSegments = new File(dataDir, getTableName()); // Not present in dataDir, only 
present in sourceDir Assert.assertFalse(dataDirSegments.exists()); @@ -204,6 +207,77 @@ public void testUploadAndQuery() testCountStar(numDocs); } + @Test + public void testUploadMultipleSegmentsInBatchModeAndQuery() + throws Exception { + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig offlineTableConfig = createOfflineTableConfig(); + waitForEVToDisappear(offlineTableConfig.getTableName()); + addTableConfig(offlineTableConfig); + + List avroFiles = getAllAvroFiles(); + + // Create the list of segments + for (int segNum = 0; segNum < 12; segNum++) { + ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(segNum), offlineTableConfig, schema, "_seg" + segNum, + _segmentDir, _tarDir); + } + + SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner(); + SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec(); + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setCopyToDeepStoreForMetadataPush(true); + // enable batch mode + pushJobSpec.setBatchMode(true); + jobSpec.setPushJobSpec(pushJobSpec); + PinotFSSpec fsSpec = new PinotFSSpec(); + fsSpec.setScheme("file"); + fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS"); + jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec)); + jobSpec.setOutputDirURI(_tarDir.getAbsolutePath()); + TableSpec tableSpec = new TableSpec(); + tableSpec.setTableName(getTableName() + "_OFFLINE"); + tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName())); + jobSpec.setTableSpec(tableSpec); + PinotClusterSpec clusterSpec = new PinotClusterSpec(); + clusterSpec.setControllerURI(getControllerBaseApiUrl()); + jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec}); + + File dataDir = new File(_controllerConfig.getDataDir()); + File dataDirSegments = new File(dataDir, getTableName()); + + // Not present in dataDir, only present in sourceDir + 
Assert.assertFalse(dataDirSegments.exists()); + Assert.assertEquals(_tarDir.listFiles().length, 12); + + runner.init(jobSpec); + runner.run(); + + // Segment should be seen in dataDir + Assert.assertTrue(dataDirSegments.exists()); + Assert.assertEquals(dataDirSegments.listFiles().length, 12); + Assert.assertEquals(_tarDir.listFiles().length, 12); + + // test segment loaded + JsonNode segmentsList = getSegmentsList(); + Assert.assertEquals(segmentsList.size(), 12); + long numDocs = 0; + for (JsonNode segmentName: segmentsList) { + numDocs += getNumDocs(segmentName.asText()); + } + testCountStar(numDocs); + + // Clear segment and tar dir + for (File segment : _segmentDir.listFiles()) { + FileUtils.deleteQuietly(segment); + } + for (File tar : _tarDir.listFiles()) { + FileUtils.deleteQuietly(tar); + } + } + /** * Runs both SegmentMetadataPushJobRunner and SegmentTarPushJobRunner while enabling consistent data push. * Checks that segments are properly loaded and segment lineage entry were also in expected states. 
@@ -237,15 +311,15 @@ public void testUploadAndQueryWithConsistentPush() jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec)); jobSpec.setOutputDirURI(_tarDir.getAbsolutePath()); TableSpec tableSpec = new TableSpec(); - tableSpec.setTableName(DEFAULT_TABLE_NAME); - tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME)); + tableSpec.setTableName(getTableName()); + tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName())); jobSpec.setTableSpec(tableSpec); PinotClusterSpec clusterSpec = new PinotClusterSpec(); clusterSpec.setControllerURI(getControllerBaseApiUrl()); jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec}); File dataDir = new File(_controllerConfig.getDataDir()); - File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME); + File dataDirSegments = new File(dataDir, getTableName()); Assert.assertEquals(_tarDir.listFiles().length, 1); @@ -268,7 +342,7 @@ public void testUploadAndQueryWithConsistentPush() // Fetch segment lineage entry after running segment metadata push with consistent push enabled. String segmentLineageResponse = ControllerTest.sendGetRequest( ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl()) - .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString())); + .forListAllSegmentLineages(getTableName(), TableType.OFFLINE.toString())); // Segment lineage should be in completed state. Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\"")); // SegmentsFrom should be empty as we started with a blank table. @@ -317,7 +391,7 @@ public void testUploadAndQueryWithConsistentPush() // Fetch segment lineage entry after running segment tar push with consistent push enabled. 
segmentLineageResponse = ControllerTest.sendGetRequest( ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl()) - .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString())); + .forListAllSegmentLineages(getTableName(), TableType.OFFLINE.toString())); // Segment lineage should be in completed state. Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\"")); // SegmentsFrom should contain the previous segment @@ -337,14 +411,14 @@ protected TableConfig createOfflineTableConfigWithConsistentPush() { private long getNumDocs(String segmentName) throws IOException { return JsonUtils.stringToJsonNode( - sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(DEFAULT_TABLE_NAME, segmentName))) + sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(), segmentName))) .get("segment.total.docs").asLong(); } private JsonNode getSegmentsList() throws IOException { return JsonUtils.stringToJsonNode(sendGetRequest( - _controllerRequestURLBuilder.forSegmentListAPI(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()))) + _controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.OFFLINE.toString()))) .get(0).get("OFFLINE"); } @@ -362,6 +436,11 @@ public Boolean apply(@Nullable Void aVoid) { }, 100L, 300_000, "Failed to load " + countStarResult + " documents", true); } + @Override + public String getTableName() { + return DEFAULT_TABLE_NAME + tableNameSuffix; + } + @AfterMethod public void tearDownTest() throws IOException { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java index 8b0963578e2..ec7ac57d62e 100644 --- 
a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java @@ -179,14 +179,14 @@ public List executeTask(PinotTaskConfig pinotTaskConfig _pinotTaskConfig = pinotTaskConfig; _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId()); String taskType = pinotTaskConfig.getTaskType(); - Map configs = pinotTaskConfig.getConfigs(); - String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); - String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY); + Map taskConfigs = pinotTaskConfig.getConfigs(); + String tableNameWithType = taskConfigs.get(MinionConstants.TABLE_NAME_KEY); + String inputSegmentNames = taskConfigs.get(MinionConstants.SEGMENT_NAME_KEY); String[] segmentNames = inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR); - String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY); - String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY); + String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); + String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY); String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR); - AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN)); + AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN)); LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType, tableNameWithType, inputSegmentNames, downloadURLString, uploadURL); File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID()); @@ -274,6 +274,8 @@ public List 
executeTask(PinotTaskConfig pinotTaskConfig SegmentUploadContext segmentUploadContext = new SegmentUploadContext(pinotTaskConfig, segmentConversionResults); preUploadSegments(segmentUploadContext); + Map segmentUriToTarPathMap = new HashMap<>(); + PushJobSpec pushJobSpec = getPushJobSpec(taskConfigs); // Upload the tarred segments for (int i = 0; i < numOutputSegments; i++) { @@ -291,11 +293,11 @@ public List executeTask(PinotTaskConfig pinotTaskConfig segmentZKMetadataCustomMapModifier.toJsonString()); String pushMode = - configs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name()); + taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name()); URI outputSegmentTarURI; if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase()) != BatchConfigProperties.SegmentPushType.TAR) { - outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, convertedTarredSegmentFile); + outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, convertedTarredSegmentFile); LOGGER.info("Moved generated segment from [{}] to location: [{}]", convertedTarredSegmentFile, outputSegmentTarURI); } else { @@ -323,13 +325,25 @@ public List executeTask(PinotTaskConfig pinotTaskConfig List parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter, tableTypeParameter); - pushSegment(tableNameParameter.getValue(), configs, outputSegmentTarURI, httpHeaders, parameters, - segmentConversionResult); + if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase()) + == BatchConfigProperties.SegmentPushType.METADATA) { + updateSegmentUriToTarPathMap(taskConfigs, outputSegmentTarURI, segmentConversionResult, + segmentUriToTarPathMap, pushJobSpec); + } else { + pushSegment(taskConfigs, outputSegmentTarURI, httpHeaders, parameters, segmentConversionResult); + } if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { LOGGER.warn("Failed to delete tarred converted 
segment: {}", convertedTarredSegmentFile.getAbsolutePath()); } } + if (!segmentUriToTarPathMap.isEmpty()) { + // For metadata push, push all segments in batch mode + pushJobSpec.setBatchMode(true); + pushSegments(tableNameWithType, taskConfigs, pinotTaskConfig, segmentUriToTarPathMap, pushJobSpec, + authProvider); + } + postUploadSegments(segmentUploadContext); String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName) @@ -345,50 +359,85 @@ public List executeTask(PinotTaskConfig pinotTaskConfig } } - private void pushSegment(String tableName, Map taskConfigs, URI outputSegmentTarURI, - List
headers, List parameters, SegmentConversionResult segmentConversionResult) - throws Exception { - String pushMode = - taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name()); - LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI); + private void updateSegmentUriToTarPathMap(Map taskConfigs, URI outputSegmentTarURI, + SegmentConversionResult segmentConversionResult, Map segmentUriToTarPathMap, + PushJobSpec pushJobSpec) { + String segmentName = segmentConversionResult.getSegmentName(); + if (!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) { + throw new RuntimeException(String.format("Output dir URI missing for metadata push while processing segment: %s", + segmentName)); + } + URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); + Map localSegmentUriToTarPathMap = + SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec, + new String[]{outputSegmentTarURI.toString()}); + if (!localSegmentUriToTarPathMap.isEmpty()) { + segmentUriToTarPathMap.putAll(localSegmentUriToTarPathMap); + } + } + private PushJobSpec getPushJobSpec(Map taskConfigs) { PushJobSpec pushJobSpec = new PushJobSpec(); pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS); pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM); pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS); pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX)); pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX)); + return pushJobSpec; + } - SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec); - - switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { - case TAR: - File tarFile = new File(outputSegmentTarURI); - String segmentName = 
segmentConversionResult.getSegmentName(); - String tableNameWithType = segmentConversionResult.getTableNameWithType(); - String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); - SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName, - uploadURL, tarFile); - break; - case METADATA: - if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) { - URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); - try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) { - Map segmentUriToTarPathMap = - SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec, - new String[]{outputSegmentTarURI.toString()}); - SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters); - } - } else { - throw new RuntimeException("Output dir URI missing for metadata push"); - } - break; - default: - throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode); + private void pushSegments(String tableNameWithType, Map taskConfigs, PinotTaskConfig pinotTaskConfig, + Map segmentUriToTarPathMap, PushJobSpec pushJobSpec, AuthProvider authProvider) + throws Exception { + String tableName = TableNameBuilder.extractRawTableName(tableNameWithType); + SegmentGenerationJobSpec spec = generateSegmentGenerationJobSpec(tableName, taskConfigs, pushJobSpec); + SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = + getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, null); // intentionally null + Header segmentZKMetadataCustomMapModifierHeader = + new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER, + segmentZKMetadataCustomMapModifier.toJsonString()); + + List
headers = new ArrayList<>(); + headers.add(segmentZKMetadataCustomMapModifierHeader); + headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider)); + + // Set parameters for upload request + NameValuePair enableParallelPushProtectionParameter = + new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true"); + NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, + TableNameBuilder.extractRawTableName(tableNameWithType)); + NameValuePair tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, + TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString()); + + List parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter, + tableTypeParameter); + URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); + try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) { + SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters); + } + } + + private void pushSegment(Map taskConfigs, URI outputSegmentTarURI, + List
headers, List parameters, SegmentConversionResult segmentConversionResult) + throws Exception { + String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, + BatchConfigProperties.SegmentPushType.TAR.name()); + LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI); + + if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase()) + == BatchConfigProperties.SegmentPushType.TAR) { + File tarFile = new File(outputSegmentTarURI); + String segmentName = segmentConversionResult.getSegmentName(); + String tableNameWithType = segmentConversionResult.getTableNameWithType(); + String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); + SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName, uploadURL, + tarFile); + } else { + throw new UnsupportedOperationException("Unrecognized push mode: " + pushMode); } } - private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map taskConfigs, + private SegmentGenerationJobSpec generateSegmentGenerationJobSpec(String tableName, Map taskConfigs, PushJobSpec pushJobSpec) { TableSpec tableSpec = new TableSpec(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java index 4a5dd219482..7eead8a60fa 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java @@ -277,6 +277,8 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino Map segmentUriToTarPathMap, List
headers, List parameters) throws Exception { String tableName = spec.getTableSpec().getTableName(); + Map segmentMetadataFileMap = new HashMap<>(); + Map segmentMetadataUriPathMap = new HashMap<>(); LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}", segmentUriToTarPathMap, Arrays.toString(spec.getPinotClusterSpecs()), tableName); for (String segmentUriPath : segmentUriToTarPathMap.keySet()) { @@ -303,59 +305,82 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino } else { segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath)); } - try { - for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { - URI controllerURI; - try { - controllerURI = new URI(pinotClusterSpec.getControllerURI()); - } catch (URISyntaxException e) { - throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'"); - } - LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName); - int attempts = 1; - if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) { - attempts = spec.getPushJobSpec().getPushAttempts(); - } - long retryWaitMs = 1000L; - if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) { - retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis(); - } - RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> { - List
reqHttpHeaders = new ArrayList<>(headers); - try { - reqHttpHeaders.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath)); - reqHttpHeaders.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, - FileUploadDownloadClient.FileUploadType.METADATA.toString())); - if (spec.getPushJobSpec() != null) { - reqHttpHeaders.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE, - String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush()))); - } + segmentMetadataFileMap.put(segmentName, segmentMetadataFile); + segmentMetadataUriPathMap.put(segmentName, segmentUriPath); + } - SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata( - FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, - segmentMetadataFile, reqHttpHeaders, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); - LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName, - controllerURI, response.getStatusCode(), response.getResponse()); - return true; - } catch (HttpErrorStatusException e) { - int statusCode = e.getStatusCode(); - if (statusCode >= 500) { - // Temporary exception - LOGGER.warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry", - tableName, segmentName, controllerURI, e); - return false; - } else { - // Permanent exception - LOGGER.error("Caught permanent exception while pushing table: {} segment: {} to {}, won't retry", - tableName, segmentName, controllerURI, e); - throw e; - } - } - }); + // perform metadata push in batch mode for every cluster + try { + for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { + URI controllerURI; + try { + controllerURI = new URI(pinotClusterSpec.getControllerURI()); + } catch (URISyntaxException e) { + throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'"); + } + 
LOGGER.info("Pushing segments: {} to Pinot cluster: {} for table {}", + segmentMetadataFileMap.keySet(), controllerURI, tableName); + int attempts = 1; + if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) { + attempts = spec.getPushJobSpec().getPushAttempts(); + } + long retryWaitMs = 1000L; + if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) { + retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis(); } - } finally { - FileUtils.deleteQuietly(segmentMetadataFile); + RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> { + List
reqHttpHeaders = new ArrayList<>(headers); + try { + addHeaders(segmentMetadataUriPathMap, spec, reqHttpHeaders); + URI segmentUploadURI = getSegmentUploadURI(spec.getPushJobSpec(), controllerURI); + SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI, + segmentMetadataFileMap, reqHttpHeaders, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); + LOGGER.info("Response for pushing table {} segments {} to location {} - {}: {}", tableName, + segmentMetadataFileMap.keySet(), controllerURI, response.getStatusCode(), response.getResponse()); + return true; + } catch (HttpErrorStatusException ex) { + int statusCode = ex.getStatusCode(); + if (statusCode >= 500) { + // Temporary exception + LOGGER.warn("Caught temporary exception while pushing table: {} segments: {} to {}, will retry", + tableName, segmentMetadataFileMap.keySet(), controllerURI, ex); + return false; + } else { + // Permanent exception + LOGGER.error("Caught permanent exception while pushing table: {} segments: {} to {}, won't retry", + tableName, segmentMetadataFileMap.keySet(), controllerURI, ex); + throw ex; + } + } + }); } + } finally { + for (Map.Entry metadataFileEntry: segmentMetadataFileMap.entrySet()) { + FileUtils.deleteQuietly(metadataFileEntry.getValue()); + } + } + } + + private static URI getSegmentUploadURI(PushJobSpec jobSpec, URI controllerURI) + throws URISyntaxException { + if (jobSpec.isBatchMode()) { + return FileUploadDownloadClient.getUploadSegmentListURI(controllerURI); + } else { + return FileUploadDownloadClient.getUploadSegmentURI(controllerURI); + } + } + + private static void addHeaders(Map segmentMetadataUriPathMap, + SegmentGenerationJobSpec jobSpec, List
headers) { + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, + FileUploadDownloadClient.FileUploadType.METADATA.toString())); + if (jobSpec.getPushJobSpec() != null) { + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE, + String.valueOf(jobSpec.getPushJobSpec().getCopyToDeepStoreForMetadataPush()))); + } + // For each header that is created, the segment name is the key and the segment download URI is the value + for (Map.Entry entry: segmentMetadataUriPathMap.entrySet()) { + headers.add(new BasicHeader("x-segmentname-" + entry.getKey(), entry.getValue())); } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java index 31d1ce8448c..2b9237e5fd3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java @@ -46,6 +46,13 @@ public class PushJobSpec implements Serializable { * If true, and if segment was not already in the deep store, move it to deep store. */ private boolean _copyToDeepStoreForMetadataPush; + + /** + * Applicable for METADATA push type. + * If true, multiple segment metadata files are uploaded to the controller in a single call. + */ + private boolean _batchMode; + /** * Used in SegmentUriPushJobRunner, which is used to composite the segment uri to send to pinot controller. 
* The URI sends to controller is in the format ${segmentUriPrefix}${segmentPath}${segmentUriSuffix} @@ -148,4 +155,12 @@ public boolean getCopyToDeepStoreForMetadataPush() { public void setCopyToDeepStoreForMetadataPush(boolean copyToDeepStoreForMetadataPush) { _copyToDeepStoreForMetadataPush = copyToDeepStoreForMetadataPush; } + + public boolean isBatchMode() { + return _batchMode; + } + + public void setBatchMode(boolean batchMode) { + _batchMode = batchMode; + } }