diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java index 4123e3157ed..4a2cb33be2b 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/FileUploadDownloadClient.java @@ -113,6 +113,7 @@ public static FileUploadType getDefaultUploadType() { private static final String SCHEMA_PATH = "/schemas"; private static final String OLD_SEGMENT_PATH = "/segments"; private static final String SEGMENT_PATH = "/v2/segments"; + private static final String SEGMENT_UPLOAD_BATCH_PATH = "/v3/segments"; private static final String TABLES_PATH = "/tables"; private static final String TYPE_DELIMITER = "type="; private static final String START_REPLACE_SEGMENTS_PATH = "/startReplaceSegments"; @@ -365,6 +366,12 @@ public static URI getUploadSegmentURI(URI controllerURI) return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), SEGMENT_PATH); } + public static URI getUploadSegmentBatchURI(URI controllerURI) + throws URISyntaxException { + return getURI(controllerURI.getScheme(), controllerURI.getHost(), controllerURI.getPort(), + SEGMENT_UPLOAD_BATCH_PATH); + } + public static URI getStartReplaceSegmentsURI(URI controllerURI, String rawTableName, String tableType, boolean forceCleanup) throws URISyntaxException { diff --git a/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientTest.java b/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientTest.java index 2640f33a695..42e508a3147 100644 --- a/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientTest.java +++ b/pinot-common/src/test/java/org/apache/pinot/common/utils/FileUploadDownloadClientTest.java @@ -27,6 +27,8 @@ import java.io.InputStream; import java.io.OutputStream; import java.net.InetSocketAddress; +import java.net.URI; +import java.net.URISyntaxException; import java.util.Collections; import java.util.List; import java.util.Random; @@ -128,6 +130,14 @@ public void testSendFileWithJson() } } + @Test + public void testGetUploadSegmentListURI() + throws URISyntaxException { + URI controllerURI = new URI("https://myhost:9443"); + URI uriWithEndpoint = FileUploadDownloadClient.getUploadSegmentBatchURI(controllerURI); + Assert.assertEquals(new URI("https://myhost:9443/v3/segments"), uriWithEndpoint); + } + @AfterClass public void shutDown() { _testServer.stop(0); diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java index 20b00a3cc3b..f1091f3916c 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResource.java @@ -36,7 +36,9 @@ import java.io.OutputStream; import java.net.InetAddress; import java.net.URI; +import java.net.UnknownHostException; import java.nio.file.Files; +import java.util.ArrayList; import java.util.List; import java.util.Map; import java.util.UUID; @@ -57,6 +59,7 @@ import javax.ws.rs.core.Context; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.MediaType; +import 
javax.ws.rs.core.MultivaluedMap; import javax.ws.rs.core.Response; import javax.ws.rs.core.StreamingOutput; import org.apache.commons.io.FileUtils; @@ -81,6 +84,7 @@ import org.apache.pinot.controller.api.access.AccessType; import org.apache.pinot.controller.api.access.Authenticate; import org.apache.pinot.controller.api.exception.ControllerApplicationException; +import org.apache.pinot.controller.api.upload.SegmentUploadMetadata; import org.apache.pinot.controller.api.upload.SegmentValidationUtils; import org.apache.pinot.controller.api.upload.ZKOperator; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -101,6 +105,7 @@ import org.apache.pinot.spi.utils.JsonUtils; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.glassfish.grizzly.http.server.Request; +import org.glassfish.jersey.media.multipart.BodyPart; import org.glassfish.jersey.media.multipart.FormDataBodyPart; import org.glassfish.jersey.media.multipart.FormDataMultiPart; import org.glassfish.jersey.server.ManagedAsync; @@ -233,7 +238,9 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl extractHttpHeader(headers, CommonConstants.Controller.TABLE_NAME_HTTP_HEADER); String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE); - String sourceDownloadURIStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI); + String sourceDownloadURIStr = + extractHttpHeaderWithOptionalPrefixAsFallback(headers, FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, + CommonConstants.Controller.SEGMENT_URI_HTTP_HEADER_PREFIX); String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER); String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR); @@ -295,13 +302,18 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE); copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore); createSegmentFileFromMultipart(multiPart, destFile); + PinotFS pinotFS = null; try { URI segmentURI = new URI(sourceDownloadURIStr); - PinotFS pinotFS = PinotFSFactory.create(segmentURI.getScheme()); + pinotFS = PinotFSFactory.create(segmentURI.getScheme()); segmentSizeInBytes = pinotFS.length(segmentURI); } catch (Exception e) { segmentSizeInBytes = -1; LOGGER.warn("Could not fetch segment size for metadata push", e); + } finally { + if (pinotFS != null) { + pinotFS.close(); + } } break; default: @@ -403,6 +415,159 @@ private SuccessResponse uploadSegment(@Nullable String tableName, TableType tabl } } + // Method used to update a list of segments in batch mode with the METADATA upload type. + private SuccessResponse uploadSegments(String tableName, TableType tableType, FormDataMultiPart multiParts, + boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, Request request) { + String rawTableName = TableNameBuilder.extractRawTableName(tableName); + String tableNameWithType = tableType == TableType.OFFLINE ? 
TableNameBuilder.OFFLINE.tableNameWithType(rawTableName) + : TableNameBuilder.REALTIME.tableNameWithType(rawTableName); + + TableConfig tableConfig = _pinotHelixResourceManager.getTableConfig(tableNameWithType); + if (tableConfig == null) { + throw new ControllerApplicationException(LOGGER, "Failed to fetch table config for table: " + tableNameWithType, + Response.Status.BAD_REQUEST); + } + + String clientAddress; + try { + clientAddress = InetAddress.getByName(request.getRemoteAddr()).getHostName(); + } catch (UnknownHostException e) { + throw new ControllerApplicationException(LOGGER, "Failed to resolve hostname from input request", + Response.Status.BAD_REQUEST, e); + } + + String uploadTypeStr = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE); + FileUploadType uploadType = getUploadType(uploadTypeStr); + if (!FileUploadType.METADATA.equals(uploadType)) { + throw new ControllerApplicationException(LOGGER, "Unsupported upload type: " + uploadTypeStr, + Response.Status.BAD_REQUEST); + } + + String crypterClassNameInHeader = extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.CRYPTER); + String ingestionDescriptor = extractHttpHeader(headers, CommonConstants.Controller.INGESTION_DESCRIPTOR); + ControllerFilePathProvider provider = ControllerFilePathProvider.getInstance(); + List segmentUploadMetadataList = new ArrayList<>(); + List tempFiles = new ArrayList<>(); + List segmentNames = new ArrayList<>(); + + List bodyParts = multiParts.getBodyParts(); + for (int bodyPartIndex = 0; bodyPartIndex < bodyParts.size(); bodyPartIndex++) { + FormDataBodyPart bodyPart = (FormDataBodyPart) bodyParts.get(bodyPartIndex); + String segmentName = bodyPart.getContentDisposition().getFileName(); + segmentNames.add(segmentName); + if (StringUtils.isEmpty(segmentName)) { + throw new ControllerApplicationException(LOGGER, + "filename is a required field within the multipart object for METADATA batch upload mode.", + Response.Status.BAD_REQUEST); + } + File tempEncryptedFile; + File tempDecryptedFile; + File tempSegmentDir; + try { + String sourceDownloadURIStr = + extractHttpHeader(headers, CommonConstants.Controller.SEGMENT_URI_HTTP_HEADER_PREFIX + segmentName); + if (StringUtils.isEmpty(sourceDownloadURIStr)) { + throw new ControllerApplicationException(LOGGER, + "'DOWNLOAD_URI' is required as a field within the multipart object for METADATA batch upload mode.", + Response.Status.BAD_REQUEST); + } + // The downloadUri for putting into segment zk metadata + String segmentDownloadURIStr = sourceDownloadURIStr; + + String tempFileName = TMP_DIR_PREFIX + UUID.randomUUID(); + tempEncryptedFile = new File(provider.getFileUploadTempDir(), tempFileName + ENCRYPTED_SUFFIX); + tempFiles.add(tempEncryptedFile); + tempDecryptedFile = new File(provider.getFileUploadTempDir(), tempFileName); + tempFiles.add(tempDecryptedFile); + tempSegmentDir = new File(provider.getUntarredFileTempDir(), tempFileName); + tempFiles.add(tempSegmentDir); + boolean encryptSegment = StringUtils.isNotEmpty(crypterClassNameInHeader); + File destFile = encryptSegment ? 
tempEncryptedFile : tempDecryptedFile; + // override copySegmentToFinalLocation if override provided in headers:COPY_SEGMENT_TO_DEEP_STORE + // else set to false for backward compatibility + String copySegmentToDeepStore = + extractHttpHeader(headers, FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE); + boolean copySegmentToFinalLocation = Boolean.parseBoolean(copySegmentToDeepStore); + createSegmentFileFromBodyPart(bodyPart, destFile); + // Include the un-tarred segment size when available + long segmentSizeInBytes = getSegmentSizeFromHeaders(segmentName, headers); + if (segmentSizeInBytes < 0) { + // Use the tarred segment size as an approximation. + segmentSizeInBytes = getSegmentSizeFromFile(sourceDownloadURIStr); + } + if (encryptSegment) { + decryptFile(crypterClassNameInHeader, tempEncryptedFile, tempDecryptedFile); + } + + String metadataProviderClass = DefaultMetadataExtractor.class.getName(); + SegmentMetadata segmentMetadata = getSegmentMetadata(tempDecryptedFile, tempSegmentDir, metadataProviderClass); + LOGGER.info("Processing upload request for segment: {} of table: {} with upload type: {} from client: {}, " + + "ingestion descriptor: {}", segmentName, tableNameWithType, uploadType, clientAddress, + ingestionDescriptor); + + // Validate segment + if (tableConfig.getIngestionConfig() == null || tableConfig.getIngestionConfig().isSegmentTimeValueCheck()) { + SegmentValidationUtils.validateTimeInterval(segmentMetadata, tableConfig); + } + long untarredSegmentSizeInBytes = 0L; + if (segmentSizeInBytes > 0) { + untarredSegmentSizeInBytes = segmentSizeInBytes; + } + SegmentValidationUtils.checkStorageQuota(segmentName, untarredSegmentSizeInBytes, tableConfig, _controllerConf, + _storageQuotaChecker); + + // Encrypt segment + String crypterNameInTableConfig = tableConfig.getValidationConfig().getCrypterClassName(); + Pair encryptionInfo = + encryptSegmentIfNeeded(tempDecryptedFile, tempEncryptedFile, encryptSegment, crypterClassNameInHeader, + crypterNameInTableConfig, segmentName, tableNameWithType); + File segmentFile = encryptionInfo.getRight(); + + // Update download URI if controller is responsible for moving the segment to the deep store + URI finalSegmentLocationURI = null; + if (copySegmentToFinalLocation) { + URI dataDirURI = provider.getDataDirURI(); + String dataDirPath = dataDirURI.toString(); + String encodedSegmentName = URIUtils.encode(segmentName); + String finalSegmentLocationPath = URIUtils.getPath(dataDirPath, rawTableName, encodedSegmentName); + if (dataDirURI.getScheme().equalsIgnoreCase(CommonConstants.Segment.LOCAL_SEGMENT_SCHEME)) { + segmentDownloadURIStr = URIUtils.getPath(provider.getVip(), "segments", rawTableName, encodedSegmentName); + } else { + segmentDownloadURIStr = finalSegmentLocationPath; + } + finalSegmentLocationURI = URIUtils.getUri(finalSegmentLocationPath); + } + SegmentUploadMetadata segmentUploadMetadata = + new SegmentUploadMetadata(segmentDownloadURIStr, sourceDownloadURIStr, finalSegmentLocationURI, + segmentSizeInBytes, segmentMetadata, encryptionInfo); + segmentUploadMetadataList.add(segmentUploadMetadata); + LOGGER.info("Using segment download URI: {} for segment: {} of table: {} (move segment: {})", + segmentDownloadURIStr, segmentFile, tableNameWithType, copySegmentToFinalLocation); + // complete segment operations for all the segments + if (bodyPartIndex + 1 == bodyParts.size()) { + ZKOperator zkOperator = new ZKOperator(_pinotHelixResourceManager, _controllerConf, _controllerMetrics); + 
zkOperator.completeSegmentsOperations(tableNameWithType, uploadType, enableParallelPushProtection, + allowRefresh, headers, segmentUploadMetadataList); + } + } catch (Exception e) { + _controllerMetrics.addMeteredGlobalValue(ControllerMeter.CONTROLLER_SEGMENT_UPLOAD_ERROR, + segmentUploadMetadataList.size()); + throw new ControllerApplicationException(LOGGER, + "Exception while processing segments to upload: " + e.getMessage(), Response.Status.INTERNAL_SERVER_ERROR, + e); + } finally { + cleanupTempFiles(tempFiles); + } + } + return new SuccessResponse("Successfully uploaded segments: " + segmentNames + " of table: " + tableNameWithType); + } + + private void cleanupTempFiles(List tempFiles) { + for (File tempFile : tempFiles) { + FileUtils.deleteQuietly(tempFile); + } + } + @Nullable private String extractHttpHeader(HttpHeaders headers, String name) { String value = headers.getHeaderString(name); @@ -412,6 +577,24 @@ private String extractHttpHeader(HttpHeaders headers, String name) { return value; } + @Nullable + @VisibleForTesting + String extractHttpHeaderWithOptionalPrefixAsFallback(HttpHeaders headers, String name, String prefix) { + String sourceDownloadURIStr = extractHttpHeader(headers, name); + if (StringUtils.isEmpty(sourceDownloadURIStr)) { + MultivaluedMap headerMap = headers.getRequestHeaders(); + for (Map.Entry> entry : headerMap.entrySet()) { + if (entry.getKey().startsWith(prefix)) { + List sourceDownloadURIValue = entry.getValue(); + if (!sourceDownloadURIValue.isEmpty()) { + return sourceDownloadURIValue.get(0); + } + } + } + } + return sourceDownloadURIStr; + } + @VisibleForTesting Pair encryptSegmentIfNeeded(File tempDecryptedFile, File tempEncryptedFile, boolean isUploadedSegmentEncrypted, String crypterUsedInUploadedSegment, String crypterClassNameInTableConfig, @@ -555,6 +738,65 @@ public void uploadSegmentAsMultiPart(FormDataMultiPart multiPart, } } + @POST + @ManagedAsync + @Produces(MediaType.APPLICATION_JSON) + @Consumes(MediaType.MULTIPART_FORM_DATA) + @Path("/v3/segments") + @Authorize(targetType = TargetType.TABLE, paramName = "tableName", action = Actions.Cluster.UPLOAD_SEGMENT) + @Authenticate(AccessType.CREATE) + @ApiOperation(value = "Upload a batch of segments", notes = "Upload a batch of segments with METADATA upload type") + @ApiResponses(value = { + @ApiResponse(code = 200, message = "Successfully uploaded segment"), + @ApiResponse(code = 400, message = "Bad Request"), + @ApiResponse(code = 403, message = "Segment validation fails"), + @ApiResponse(code = 409, message = "Segment already exists or another parallel push in progress"), + @ApiResponse(code = 410, message = "Segment to refresh does not exist"), + @ApiResponse(code = 412, message = "CRC check fails"), + @ApiResponse(code = 500, message = "Internal error") + }) + @TrackInflightRequestMetrics + @TrackedByGauge(gauge = ControllerGauge.SEGMENT_UPLOADS_IN_PROGRESS) + // This multipart based endpoint is used to upload a list of segments in batch mode. 
+ public void uploadSegmentsAsMultiPart(FormDataMultiPart multiPart, + @ApiParam(value = "Name of the table", required = true) + @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_NAME) + String tableName, + @ApiParam(value = "Type of the table", required = true) + @QueryParam(FileUploadDownloadClient.QueryParameters.TABLE_TYPE) + String tableType, + @ApiParam(value = "Whether to enable parallel push protection") + @DefaultValue("false") + @QueryParam(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION) + boolean enableParallelPushProtection, + @ApiParam(value = "Whether to refresh if the segment already exists") + @DefaultValue("true") + @QueryParam(FileUploadDownloadClient.QueryParameters.ALLOW_REFRESH) + boolean allowRefresh, + @Context HttpHeaders headers, + @Context Request request, + @Suspended final AsyncResponse asyncResponse) { + if (StringUtils.isEmpty(tableName)) { + throw new ControllerApplicationException(LOGGER, + "tableName is a required field while uploading segments in batch mode.", Response.Status.BAD_REQUEST); + } + if (StringUtils.isEmpty(tableType)) { + throw new ControllerApplicationException(LOGGER, + "tableType is a required field while uploading segments in batch mode.", Response.Status.BAD_REQUEST); + } + if (multiPart == null) { + throw new ControllerApplicationException(LOGGER, + "multiPart is a required field while uploading segments in batch mode.", Response.Status.BAD_REQUEST); + } + try { + asyncResponse.resume( + uploadSegments(tableName, TableType.valueOf(tableType.toUpperCase()), multiPart, enableParallelPushProtection, + allowRefresh, headers, request)); + } catch (Throwable t) { + asyncResponse.resume(t); + } + } + @POST @ManagedAsync @Produces(MediaType.APPLICATION_JSON) @@ -752,6 +994,17 @@ private static void createSegmentFileFromMultipart(FormDataMultiPart multiPart, } } + @VisibleForTesting + static void createSegmentFileFromBodyPart(FormDataBodyPart segmentMetadataBodyPart, File destFile) + throws IOException { + try (InputStream inputStream = segmentMetadataBodyPart.getValueAs(InputStream.class); + OutputStream outputStream = new FileOutputStream(destFile)) { + IOUtils.copyLarge(inputStream, outputStream); + } finally { + segmentMetadataBodyPart.cleanup(); + } + } + private FileUploadType getUploadType(String uploadTypeStr) { if (uploadTypeStr != null) { return FileUploadType.valueOf(uploadTypeStr); @@ -760,6 +1013,35 @@ private FileUploadType getUploadType(String uploadTypeStr) { } } + @VisibleForTesting + long getSegmentSizeFromHeaders(String segmentName, HttpHeaders headers) { + String segmentSizeHeader = CommonConstants.Controller.SEGMENT_SIZE_HTTP_HEADER_PREFIX + segmentName; + String segmentSizeStr = extractHttpHeader(headers, segmentSizeHeader); + if (StringUtils.isEmpty(segmentSizeStr)) { + return -1; + } + return Long.parseLong(segmentSizeStr); + } + + @VisibleForTesting + long getSegmentSizeFromFile(String sourceDownloadURIStr) + throws IOException { + long segmentSizeInBytes = -1; + PinotFS pinotFS = null; + try { + URI segmentURI = new URI(sourceDownloadURIStr); + pinotFS = PinotFSFactory.create(segmentURI.getScheme()); + segmentSizeInBytes = pinotFS.length(segmentURI); + } catch (Exception e) { + LOGGER.warn(String.format("Exception while segment size for uri: %s", sourceDownloadURIStr), e); + } finally { + if (pinotFS != null) { + pinotFS.close(); + } + } + return segmentSizeInBytes; + } + // Validate that there is one file that is in the input. 
public static boolean validateMultiPart(Map> map, String segmentName) { boolean isGood = true; diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java new file mode 100644 index 00000000000..43a2e22a021 --- /dev/null +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/SegmentUploadMetadata.java @@ -0,0 +1,120 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.controller.api.upload; + +import java.io.File; +import java.net.URI; +import java.util.Objects; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.pinot.segment.spi.SegmentMetadata; + + +/** + * Data object used while adding or updating segments. It's comprised of the following fields: + *
+ *   1. segmentDownloadURIStr – The segment download URI persisted into the ZK metadata.
+ *   2. sourceDownloadURIStr – The URI from where the segment could be downloaded.
+ *   3. finalSegmentLocationURI – The final location of the segment in the deep-store.
+ *   4. segmentSizeInBytes – The segment size in bytes.
+ *   5. segmentMetadata – The segment metadata as defined in {@link org.apache.pinot.segment.spi.SegmentMetadata}.
+ *   6. encryptionInfo – A pair consisting of the crypter class used to encrypt the segment, and the encrypted
+ *      segment file.
+ *   7. segmentMetadataZNRecord – The segment metadata represented as a helix
+ *      {@link org.apache.helix.zookeeper.datamodel.ZNRecord}.
+ */ +public class SegmentUploadMetadata { + private final String _segmentDownloadURIStr; + private final String _sourceDownloadURIStr; + private final URI _finalSegmentLocationURI; + // Segment size reported in bytes. + private final Long _segmentSizeInBytes; + // The segment met + private final SegmentMetadata _segmentMetadata; + // The crypter class used to encrypt the segment, The encrypted segment file. + private final Pair _encryptionInfo; + private ZNRecord _segmentMetadataZNRecord; + + public SegmentUploadMetadata(String segmentDownloadURIStr, String sourceDownloadURIStr, URI finalSegmentLocationURI, + Long segmentSizeInBytes, SegmentMetadata segmentMetadata, Pair encryptionInfo) { + _segmentDownloadURIStr = segmentDownloadURIStr; + _sourceDownloadURIStr = sourceDownloadURIStr; + _segmentSizeInBytes = segmentSizeInBytes; + _segmentMetadata = segmentMetadata; + _encryptionInfo = encryptionInfo; + _finalSegmentLocationURI = finalSegmentLocationURI; + } + + public String getSegmentDownloadURIStr() { + return _segmentDownloadURIStr; + } + + public String getSourceDownloadURIStr() { + return _sourceDownloadURIStr; + } + + public URI getFinalSegmentLocationURI() { + return _finalSegmentLocationURI; + } + + public Long getSegmentSizeInBytes() { + return _segmentSizeInBytes; + } + + public SegmentMetadata getSegmentMetadata() { + return _segmentMetadata; + } + + public Pair getEncryptionInfo() { + return _encryptionInfo; + } + + public void setSegmentMetadataZNRecord(ZNRecord segmentMetadataZNRecord) { + _segmentMetadataZNRecord = segmentMetadataZNRecord; + } + + public ZNRecord getSegmentMetadataZNRecord() { + return _segmentMetadataZNRecord; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SegmentUploadMetadata that = (SegmentUploadMetadata) o; + return Objects.equals(_segmentDownloadURIStr, that._segmentDownloadURIStr) + && Objects.equals(_sourceDownloadURIStr, that._sourceDownloadURIStr) + && Objects.equals(_finalSegmentLocationURI, that._finalSegmentLocationURI) + && Objects.equals(_segmentSizeInBytes, that._segmentSizeInBytes) + && Objects.equals(_segmentMetadata, that._segmentMetadata) + && Objects.equals(_encryptionInfo, that._encryptionInfo) + && Objects.equals(_segmentMetadataZNRecord, that._segmentMetadataZNRecord); + } + + @Override + public int hashCode() { + return Objects.hash(_segmentDownloadURIStr, _sourceDownloadURIStr, _finalSegmentLocationURI, + _segmentSizeInBytes, _segmentMetadata, _encryptionInfo, _segmentMetadataZNRecord); + } +} diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java index b0aee83d0d5..59befc7e559 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/api/upload/ZKOperator.java @@ -21,9 +21,14 @@ import com.google.common.base.Preconditions; import java.io.File; import java.net.URI; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; import javax.annotation.Nullable; import javax.ws.rs.core.HttpHeaders; import javax.ws.rs.core.Response; +import org.apache.commons.lang3.tuple.Pair; import org.apache.helix.model.IdealState; import org.apache.helix.zookeeper.datamodel.ZNRecord; import org.apache.pinot.common.metadata.segment.SegmentZKMetadata; @@ -110,6 
+115,61 @@ public void completeSegmentOperations(String tableNameWithType, SegmentMetadata } } + // Complete segment operations for a list of segments in batch mode + public void completeSegmentsOperations(String tableNameWithType, FileUploadType uploadType, + boolean enableParallelPushProtection, boolean allowRefresh, HttpHeaders headers, + List segmentUploadMetadataList) + throws Exception { + boolean refreshOnly = + Boolean.parseBoolean(headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.REFRESH_ONLY)); + List newSegmentsList = new ArrayList<>(); + List existingSegmentsList = new ArrayList<>(); + for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) { + SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata(); + String segmentName = segmentMetadata.getName(); + + ZNRecord existingSegmentMetadataZNRecord = + _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); + if (existingSegmentMetadataZNRecord != null && shouldProcessAsNewSegment(tableNameWithType, segmentName, + existingSegmentMetadataZNRecord, enableParallelPushProtection)) { + LOGGER.warn("Removing segment ZK metadata (recovering from previous upload failure) for table: {}, segment: {}", + tableNameWithType, segmentName); + Preconditions.checkState(_pinotHelixResourceManager.removeSegmentZKMetadata(tableNameWithType, segmentName), + "Failed to remove segment ZK metadata for table: %s, segment: %s", tableNameWithType, segmentName); + existingSegmentMetadataZNRecord = null; + } + + if (existingSegmentMetadataZNRecord == null) { + // Add a new segment + if (refreshOnly) { + throw new ControllerApplicationException(LOGGER, + String.format("Cannot refresh non-existing segment: %s for table: %s", segmentName, tableNameWithType), + Response.Status.GONE); + } + LOGGER.info("Adding new segment: {} to table: {}", segmentName, tableNameWithType); + newSegmentsList.add(segmentUploadMetadata); + } else { + // Refresh an existing segment + if (!allowRefresh) { + // We cannot perform this check up-front in UploadSegment API call. If a segment doesn't exist during the + // check done up-front but ends up getting created before the check here, we could incorrectly refresh an + // existing segment. + throw new ControllerApplicationException(LOGGER, + String.format("Segment: %s already exists in table: %s. Refresh not permitted.", segmentName, + tableNameWithType), Response.Status.CONFLICT); + } + LOGGER.info("Segment: {} already exists in table: {}, refreshing it", segmentName, tableNameWithType); + segmentUploadMetadata.setSegmentMetadataZNRecord(existingSegmentMetadataZNRecord); + existingSegmentsList.add(segmentUploadMetadata); + } + } + // process new segments + processNewSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, newSegmentsList); + + // process existing segments + processExistingSegments(tableNameWithType, uploadType, enableParallelPushProtection, headers, existingSegmentsList); + } + /** * Returns {@code true} when the segment should be processed as new segment. *

When segment ZK metadata exists, check if segment exists in the ideal state. If the previous upload failed after @@ -276,6 +336,144 @@ private void processExistingSegment(String tableNameWithType, SegmentMetadata se } } + // process a batch of existing segments + private void processExistingSegments(String tableNameWithType, FileUploadType uploadType, + boolean enableParallelPushProtection, HttpHeaders headers, List segmentUploadMetadataList) + throws Exception { + for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) { + SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata(); + String segmentDownloadURIStr = segmentUploadMetadata.getSegmentDownloadURIStr(); + String sourceDownloadURIStr = segmentUploadMetadata.getSourceDownloadURIStr(); + URI finalSegmentLocationURI = segmentUploadMetadata.getFinalSegmentLocationURI(); + Pair encryptionInfo = segmentUploadMetadata.getEncryptionInfo(); + String crypterName = encryptionInfo.getLeft(); + File segmentFile = encryptionInfo.getRight(); + String segmentName = segmentMetadata.getName(); + ZNRecord existingSegmentMetadataZNRecord = segmentUploadMetadata.getSegmentMetadataZNRecord(); + long segmentSizeInBytes = segmentUploadMetadata.getSegmentSizeInBytes(); + int expectedVersion = existingSegmentMetadataZNRecord.getVersion(); + + // Check if CRC match when IF-MATCH header is set + SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord); + long existingCrc = segmentZKMetadata.getCrc(); + checkCRC(headers, tableNameWithType, segmentName, existingCrc); + + // Check segment upload start time when parallel push protection enabled + if (enableParallelPushProtection) { + // When segment upload start time is larger than 0, that means another upload is in progress + long segmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime(); + if (segmentUploadStartTime > 0) { + handleParallelPush(tableNameWithType, segmentName, segmentUploadStartTime); + } + + // Lock the segment by setting the upload start time in ZK + segmentZKMetadata.setSegmentUploadStartTime(System.currentTimeMillis()); + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new ControllerApplicationException(LOGGER, + String.format("Failed to lock the segment: %s of table: %s, retry later", segmentName, tableNameWithType), + Response.Status.CONFLICT); + } else { + // The version will increment if the zk metadata update is successful + expectedVersion++; + } + } + + // Reset segment upload start time to unlock the segment later + // NOTE: reset this value even if parallel push protection is not enabled so that segment can recover in case + // previous segment upload did not finish properly and the parallel push protection is turned off + segmentZKMetadata.setSegmentUploadStartTime(-1); + + try { + // Construct the segment ZK metadata custom map modifier + String customMapModifierStr = + headers.getHeaderString(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER); + SegmentZKMetadataCustomMapModifier customMapModifier = + customMapModifierStr != null ? new SegmentZKMetadataCustomMapModifier(customMapModifierStr) : null; + + // Update ZK metadata and refresh the segment if necessary + long newCrc = Long.parseLong(segmentMetadata.getCrc()); + if (newCrc == existingCrc) { + LOGGER.info( + "New segment crc '{}' is the same as existing segment crc for segment '{}'. 
Updating ZK metadata without " + + "refreshing the segment.", newCrc, segmentName); + // NOTE: Even though we don't need to refresh the segment, we should still update the following fields: + // - Creation time (not included in the crc) + // - Refresh time + // - Custom map + segmentZKMetadata.setCreationTime(segmentMetadata.getIndexCreationTime()); + segmentZKMetadata.setRefreshTime(System.currentTimeMillis()); + if (customMapModifier != null) { + segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); + } else { + // If no modifier is provided, use the custom map from the segment metadata + segmentZKMetadata.setCustomMap(segmentMetadata.getCustomMap()); + } + if (!segmentZKMetadata.getDownloadUrl().equals(segmentDownloadURIStr)) { + // For offline ingestion, it is quite common that the download.uri would change but the crc would be the + // same. E.g. a user re-runs the job which process the same data and segments are stored/pushed from a + // different path from the Deepstore. Read more: https://github.com/apache/pinot/issues/11535 + LOGGER.info("Updating segment download url from: {} to: {} even though crc is the same", + segmentZKMetadata.getDownloadUrl(), segmentDownloadURIStr); + segmentZKMetadata.setDownloadUrl(segmentDownloadURIStr); + // When download URI changes, we also need to copy the segment to the final location if existed. + // This typically means users changed the push type from METADATA to SEGMENT or SEGMENT to METADATA. + // Note that switching push type from SEGMENT to METADATA may lead orphan segments in the controller + // managed directory. Read more: https://github.com/apache/pinot/pull/11720 + if (finalSegmentLocationURI != null) { + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, + finalSegmentLocationURI); + } + } + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new RuntimeException( + String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", + segmentName, tableNameWithType, expectedVersion)); + } + } else { + // New segment is different with the existing one, update ZK metadata and refresh the segment + LOGGER.info( + "New segment crc {} is different than the existing segment crc {}. 
Updating ZK metadata and refreshing " + + "segment {}", newCrc, existingCrc, segmentName); + if (finalSegmentLocationURI != null) { + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, + finalSegmentLocationURI); + } + + // NOTE: Must first set the segment ZK metadata before trying to refresh because servers and brokers rely on + // segment ZK metadata to refresh the segment (server will compare the segment ZK metadata with the local + // metadata to decide whether to download the new segment; broker will update the segment partition info & + // time boundary based on the segment ZK metadata) + if (customMapModifier == null) { + // If no modifier is provided, use the custom map from the segment metadata + segmentZKMetadata.setCustomMap(null); + ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, + segmentDownloadURIStr, crypterName, segmentSizeInBytes); + } else { + // If modifier is provided, first set the custom map from the segment metadata, then apply the modifier + ZKMetadataUtils.refreshSegmentZKMetadata(tableNameWithType, segmentZKMetadata, segmentMetadata, + segmentDownloadURIStr, crypterName, segmentSizeInBytes); + segmentZKMetadata.setCustomMap(customMapModifier.modifyMap(segmentZKMetadata.getCustomMap())); + } + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + throw new RuntimeException( + String.format("Failed to update ZK metadata for segment: %s, table: %s, expected version: %d", + segmentName, tableNameWithType, expectedVersion)); + } + LOGGER.info("Updated segment: {} of table: {} to property store", segmentName, tableNameWithType); + + // Send a message to servers and brokers hosting the table to refresh the segment + _pinotHelixResourceManager.sendSegmentRefreshMessage(tableNameWithType, segmentName, true, true); + } + } catch (Exception e) { + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, segmentZKMetadata, expectedVersion)) { + LOGGER.error("Failed to update ZK metadata for segment: {}, table: {}, expected version: {}", segmentName, + tableNameWithType, expectedVersion); + } + throw e; + } + } + } + private void checkCRC(HttpHeaders headers, String tableNameWithType, String segmentName, long existingCrc) { String expectedCrcStr = headers.getHeaderString(HttpHeaders.IF_MATCH); if (expectedCrcStr != null) { @@ -374,6 +572,102 @@ private void processNewSegment(String tableNameWithType, SegmentMetadata segment } } + // process a batch of new segments + private void processNewSegments(String tableNameWithType, FileUploadType uploadType, + boolean enableParallelPushProtection, HttpHeaders headers, List segmentUploadMetadataList) + throws Exception { + Map segmentZKMetadataMap = new HashMap<>(); + List segmentNames = new ArrayList<>(); + long segmentUploadStartTime = System.currentTimeMillis(); + for (SegmentUploadMetadata segmentUploadMetadata: segmentUploadMetadataList) { + SegmentMetadata segmentMetadata = segmentUploadMetadata.getSegmentMetadata(); + String segmentName = segmentMetadata.getName(); + SegmentZKMetadata newSegmentZKMetadata; + URI finalSegmentLocationURI = segmentUploadMetadata.getFinalSegmentLocationURI(); + String segmentDownloadURIStr = segmentUploadMetadata.getSegmentDownloadURIStr(); + String sourceDownloadURIStr = segmentUploadMetadata.getSourceDownloadURIStr(); + String crypterName = segmentUploadMetadata.getEncryptionInfo().getLeft(); + long segmentSizeInBytes = 
segmentUploadMetadata.getSegmentSizeInBytes(); + File segmentFile = segmentUploadMetadata.getEncryptionInfo().getRight(); + try { + newSegmentZKMetadata = ZKMetadataUtils.createSegmentZKMetadata(tableNameWithType, segmentMetadata, + segmentDownloadURIStr, crypterName, segmentSizeInBytes); + segmentZKMetadataMap.put(segmentName, newSegmentZKMetadata); + segmentNames.add(segmentName); + } catch (IllegalArgumentException e) { + throw new ControllerApplicationException(LOGGER, + String.format("Got invalid segment metadata when adding segment: %s for table: %s, reason: %s", segmentName, + tableNameWithType, e.getMessage()), Response.Status.BAD_REQUEST); + } + + // Lock if enableParallelPushProtection is true. + if (enableParallelPushProtection) { + newSegmentZKMetadata.setSegmentUploadStartTime(segmentUploadStartTime); + } + + // Update zk metadata custom map + String segmentZKMetadataCustomMapModifierStr = headers != null ? headers.getHeaderString( + FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER) : null; + if (segmentZKMetadataCustomMapModifierStr != null) { + SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = new SegmentZKMetadataCustomMapModifier( + segmentZKMetadataCustomMapModifierStr); + newSegmentZKMetadata.setCustomMap(segmentZKMetadataCustomMapModifier.modifyMap( + newSegmentZKMetadata.getCustomMap())); + } + if (!_pinotHelixResourceManager.createSegmentZkMetadata(tableNameWithType, newSegmentZKMetadata)) { + throw new RuntimeException(String.format("Failed to create ZK metadata for segment: %s of table: %s", + segmentName, tableNameWithType)); + } + + if (finalSegmentLocationURI != null) { + try { + copySegmentToDeepStore(tableNameWithType, segmentName, uploadType, segmentFile, sourceDownloadURIStr, + finalSegmentLocationURI); + } catch (Exception e) { + // Cleanup the Zk entry and the segment from the permanent directory if it exists. + LOGGER.error("Could not move segment {} from table {} to permanent directory", + segmentName, tableNameWithType, e); + // Delete all segments that are getting processed as we are in batch mode + deleteSegmentsIfNeeded(tableNameWithType, segmentNames, segmentUploadStartTime, enableParallelPushProtection); + throw e; + } + } + } + + try { + _pinotHelixResourceManager.assignTableSegments(tableNameWithType, segmentNames); + } catch (Exception e) { + // assignTableSegment removes the zk entry. + // Call deleteSegment to remove the segment from permanent location if needed. + LOGGER.error("Caught exception while calling assignTableSegments for adding segments: {} to table: {}", + segmentZKMetadataMap.keySet(), tableNameWithType, e); + deleteSegmentsIfNeeded(tableNameWithType, segmentNames, segmentUploadStartTime, enableParallelPushProtection); + throw e; + } + + for (Map.Entry segmentZKMetadataEntry: segmentZKMetadataMap.entrySet()) { + SegmentZKMetadata newSegmentZKMetadata = segmentZKMetadataEntry.getValue(); + String segmentName = segmentZKMetadataEntry.getKey(); + if (enableParallelPushProtection) { + // Release lock. Expected version will be 0 as we hold a lock and no updates could take place meanwhile. + newSegmentZKMetadata.setSegmentUploadStartTime(-1); + if (!_pinotHelixResourceManager.updateZkMetadata(tableNameWithType, newSegmentZKMetadata, 0)) { + // There is a race condition when it took too much time for the 1st segment upload to process (due to slow + // PinotFS access), which leads to the 2nd attempt of segment upload, and the 2nd segment upload succeeded. 
+ // In this case, when the 1st upload comes back, it shouldn't blindly delete the segment when it failed to + // update the zk metadata. Instead, the 1st attempt should validate the upload start time one more time. + // If the start time doesn't match with the one persisted in zk metadata, segment deletion should be skipped. + String errorMsg = String.format("Failed to update ZK metadata for segment: %s of table: %s", segmentName, + tableNameWithType); + LOGGER.error(errorMsg); + // Delete all segments that are getting processed as we are in batch mode + deleteSegmentsIfNeeded(tableNameWithType, segmentNames, segmentUploadStartTime, true); + throw new RuntimeException(errorMsg); + } + } + } + } + /** * Deletes the segment to be uploaded if either one of the criteria is qualified: * 1) the uploadStartTime matches with the one persisted in ZK metadata. @@ -397,6 +691,33 @@ private void deleteSegmentIfNeeded(String tableNameWithType, String segmentName, } } + /** + * Deletes the segments to be uploaded if either one of the criteria is qualified: + * 1) the uploadStartTime matches with the one persisted in ZK metadata. + * 2) enableParallelPushProtection is not enabled. + */ + private void deleteSegmentsIfNeeded(String tableNameWithType, List segmentNames, + long currentSegmentUploadStartTime, boolean enableParallelPushProtection) { + List segmentsToDelete = new ArrayList<>(); + for (String segmentName: segmentNames) { + ZNRecord existingSegmentMetadataZNRecord = + _pinotHelixResourceManager.getSegmentMetadataZnRecord(tableNameWithType, segmentName); + if (existingSegmentMetadataZNRecord == null) { + continue; + } + // Check if the upload start time is set by this thread itself, if yes delete the segment. + SegmentZKMetadata segmentZKMetadata = new SegmentZKMetadata(existingSegmentMetadataZNRecord); + long existingSegmentUploadStartTime = segmentZKMetadata.getSegmentUploadStartTime(); + LOGGER.info("Parallel push protection is {} for segment: {}.", + (enableParallelPushProtection ? 
"enabled" : "disabled"), segmentName); + if (!enableParallelPushProtection || currentSegmentUploadStartTime == existingSegmentUploadStartTime) { + segmentsToDelete.add(segmentName); + } + } + _pinotHelixResourceManager.deleteSegments(tableNameWithType, segmentsToDelete); + LOGGER.info("Deleted zk entry and segments {} for table {}.", segmentsToDelete, tableNameWithType); + } + private void copySegmentToDeepStore(String tableNameWithType, String segmentName, FileUploadType uploadType, File segmentFile, String sourceDownloadURIStr, URI finalSegmentLocationURI) throws Exception { diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java index 2b835faaae5..f0a0993f4f6 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManager.java @@ -2257,7 +2257,7 @@ public void addNewSegment(String tableNameWithType, SegmentMetadata segmentMetad "Failed to set segment ZK metadata for table: " + tableNameWithType + ", segment: " + segmentName); LOGGER.info("Added segment: {} of table: {} to property store", segmentName, tableNameWithType); - assignTableSegment(tableNameWithType, segmentName); + assignTableSegments(tableNameWithType, Collections.singletonList(segmentName)); } public void assignTableSegment(String tableNameWithType, String segmentName) { @@ -2327,6 +2327,82 @@ public void assignTableSegment(String tableNameWithType, String segmentName) { } } + // Assign a list of segments in batch mode + public void assignTableSegments(String tableNameWithType, List segmentNames) { + Map segmentZKMetadataPathMap = new HashMap<>(); + for (String segmentName: segmentNames) { + String segmentZKMetadataPath = ZKMetadataProvider.constructPropertyStorePathForSegment(tableNameWithType, + segmentName); + segmentZKMetadataPathMap.put(segmentName, segmentZKMetadataPath); + } + // Assign instances for the segment and add it into IdealState + try { + TableConfig tableConfig = getTableConfig(tableNameWithType); + Preconditions.checkState(tableConfig != null, "Failed to find table config for table: " + tableNameWithType); + + Map instancePartitionsMap = + fetchOrComputeInstancePartitions(tableNameWithType, tableConfig); + + // Initialize tier information only in case direct tier assignment is configured + if (_enableTieredSegmentAssignment && CollectionUtils.isNotEmpty(tableConfig.getTierConfigsList())) { + List sortedTiers = TierConfigUtils.getSortedTiersForStorageType(tableConfig.getTierConfigsList(), + TierFactory.PINOT_SERVER_STORAGE_TYPE, _helixZkManager); + for (String segmentName: segmentNames) { + // Update segment tier to support direct assignment for multiple data directories + updateSegmentTargetTier(tableNameWithType, segmentName, sortedTiers); + InstancePartitions tierInstancePartitions = TierConfigUtils.getTieredInstancePartitionsForSegment( + tableNameWithType, segmentName, sortedTiers, _helixZkManager); + if (tierInstancePartitions != null && TableNameBuilder.isOfflineTableResource(tableNameWithType)) { + // Override instance partitions for offline table + LOGGER.info("Overriding with tiered instance partitions: {} for segment: {} of table: {}", + tierInstancePartitions, segmentName, tableNameWithType); + instancePartitionsMap = Collections.singletonMap(InstancePartitionsType.OFFLINE, 
tierInstancePartitions); + } + } + } + + SegmentAssignment segmentAssignment = + SegmentAssignmentFactory.getSegmentAssignment(_helixZkManager, tableConfig, _controllerMetrics); + synchronized (getTableUpdaterLock(tableNameWithType)) { + Map finalInstancePartitionsMap = instancePartitionsMap; + HelixHelper.updateIdealState(_helixZkManager, tableNameWithType, idealState -> { + assert idealState != null; + for (String segmentName: segmentNames) { + Map> currentAssignment = idealState.getRecord().getMapFields(); + if (currentAssignment.containsKey(segmentName)) { + LOGGER.warn("Segment: {} already exists in the IdealState for table: {}, do not update", segmentName, + tableNameWithType); + } else { + List assignedInstances = + segmentAssignment.assignSegment(segmentName, currentAssignment, finalInstancePartitionsMap); + LOGGER.info("Assigning segment: {} to instances: {} for table: {}", segmentName, assignedInstances, + tableNameWithType); + currentAssignment.put(segmentName, SegmentAssignmentUtils.getInstanceStateMap(assignedInstances, + SegmentStateModel.ONLINE)); + } + } + return idealState; + }); + LOGGER.info("Added segments: {} to IdealState for table: {}", segmentNames, tableNameWithType); + } + } catch (Exception e) { + LOGGER.error( + "Caught exception while adding segments: {} to IdealState for table: {}, deleting segments ZK metadata", + segmentNames, tableNameWithType, e); + for (Map.Entry segmentZKMetadataPathEntry: segmentZKMetadataPathMap.entrySet()) { + String segmentName = segmentZKMetadataPathEntry.getKey(); + String segmentZKMetadataPath = segmentZKMetadataPathEntry.getValue(); + if (_propertyStore.remove(segmentZKMetadataPath, AccessOption.PERSISTENT)) { + LOGGER.info("Deleted segment ZK metadata for segment: {} of table: {}", segmentName, tableNameWithType); + } else { + LOGGER.error("Failed to deleted segment ZK metadata for segment: {} of table: {}", segmentName, + tableNameWithType); + } + } + throw e; + } + } + private Map fetchOrComputeInstancePartitions(String tableNameWithType, TableConfig tableConfig) { if (TableNameBuilder.isOfflineTableResource(tableNameWithType)) { diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java index 600a501274e..77b98c275e4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/api/resources/PinotSegmentUploadDownloadRestletResourceTest.java @@ -18,17 +18,31 @@ */ package org.apache.pinot.controller.api.resources; +import java.io.BufferedReader; +import java.io.ByteArrayInputStream; import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.util.Collections; import java.util.HashMap; +import java.util.List; import java.util.Map; +import javax.ws.rs.core.HttpHeaders; +import javax.ws.rs.core.MultivaluedMap; import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.controller.api.exception.ControllerApplicationException; import org.apache.pinot.spi.crypt.NoOpPinotCrypter; import org.apache.pinot.spi.crypt.PinotCrypterFactory; import org.apache.pinot.spi.env.PinotConfiguration; +import org.glassfish.jersey.media.multipart.FormDataBodyPart; +import org.testng.Assert; import org.testng.annotations.BeforeClass; import 
org.testng.annotations.Test; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; import static org.testng.Assert.assertEquals; import static org.testng.Assert.assertNull; @@ -125,4 +139,105 @@ public void testEncryptSegmentIfNeededNoEncryption() { assertNull(encryptionInfo.getLeft()); assertEquals(_decryptedFile, encryptionInfo.getRight()); } + + @Test + public void testCreateSegmentFileFromBodyPart() throws IOException { + // Arrange + FormDataBodyPart mockBodyPart = mock(FormDataBodyPart.class); + File destFile = new File("testSegmentFile.txt"); + String testContent = "This is a test content"; + + // Mock input stream to return the test content + InputStream mockInputStream = new ByteArrayInputStream(testContent.getBytes()); + when(mockBodyPart.getValueAs(InputStream.class)).thenReturn(mockInputStream); + + // Act + PinotSegmentUploadDownloadRestletResource.createSegmentFileFromBodyPart(mockBodyPart, destFile); + + // Assert + try (BufferedReader reader = new BufferedReader(new FileReader(destFile))) { + StringBuilder fileContent = new StringBuilder(); + String line; + while ((line = reader.readLine()) != null) { + fileContent.append(line); + } + Assert.assertEquals(fileContent.toString(), testContent); + } finally { + // Clean up + destFile.delete(); + } + + // Verify that the cleanup method was called + verify(mockBodyPart).cleanup(); + } + + @Test + public void testExtractHttpHeaderWithPrefixHeaderFound() { + // Arrange + HttpHeaders mockHeaders = mock(HttpHeaders.class); + String name = "X-Source-Download-URI"; + + when(mockHeaders.getHeaderString(name)).thenReturn("http://example.com"); + + // Act + String result = _resource.extractHttpHeaderWithOptionalPrefixAsFallback(mockHeaders, name, null); + + // Assert + Assert.assertEquals(result, "http://example.com"); + } + + @Test + public void testExtractHttpHeaderWithPrefixHeaderNotFound() { + // Arrange + HttpHeaders mockHeaders = mock(HttpHeaders.class); + String name = "X-Source-Download-URI"; + String prefix = "X-Prefix-"; + + when(mockHeaders.getHeaderString(name)).thenReturn(null); + + // Create a mock MultivaluedMap + MultivaluedMap mockHeaderMap = mock(MultivaluedMap.class); + when(mockHeaders.getRequestHeaders()).thenReturn(mockHeaderMap); + + // Set up headers in the map + Map> headerMap = new HashMap<>(); + headerMap.put("X-Prefix-Example", Collections.singletonList("http://example.com")); + when(mockHeaderMap.entrySet()).thenReturn(headerMap.entrySet()); + + // Set up the mock to return the headers + for (Map.Entry> entry : headerMap.entrySet()) { + when(mockHeaderMap.get(entry.getKey())).thenReturn(entry.getValue()); + } + + // Act + String result = _resource.extractHttpHeaderWithOptionalPrefixAsFallback(mockHeaders, name, prefix); + + // Assert + Assert.assertEquals(result, "http://example.com"); + } + + @Test + public void testExtractHttpHeaderWithPrefixHeaderAndPrefixNotFound() { + // Arrange + HttpHeaders mockHeaders = mock(HttpHeaders.class); + String name = "X-Source-Download-URI"; + String prefix = "X-Prefix-"; + + // Mock extractHttpHeader method to return null + when(mockHeaders.getHeaderString(name)).thenReturn(null); + + // Create a mock MultivaluedMap + MultivaluedMap mockHeaderMap = mock(MultivaluedMap.class); + when(mockHeaders.getRequestHeaders()).thenReturn(mockHeaderMap); + + // Set up empty headers in the map + Map> headerMap = new HashMap<>(); + when(mockHeaderMap.entrySet()).thenReturn(headerMap.entrySet()); + + // Act + String 
result = _resource.extractHttpHeaderWithOptionalPrefixAsFallback(mockHeaders, name, prefix); + + // Assert + Assert.assertNull(result); + } } diff --git a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java index 35fdae84489..8adfeddc575 100644 --- a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java +++ b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/SegmentUploadIntegrationTest.java @@ -27,6 +27,7 @@ import java.util.Map; import javax.annotation.Nullable; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.RandomStringUtils; import org.apache.pinot.controller.helix.ControllerTest; import org.apache.pinot.plugin.ingestion.batch.common.BaseSegmentPushJobRunner; import org.apache.pinot.plugin.ingestion.batch.standalone.SegmentMetadataPushJobRunner; @@ -59,6 +60,7 @@ * todo: add test for URI push */ public class SegmentUploadIntegrationTest extends BaseClusterIntegrationTest { + private static String _tableNameSuffix; @Override protected Map getStreamConfigs() { @@ -93,6 +95,7 @@ protected List getBloomFilterColumns() { @BeforeMethod public void setUpTest() throws IOException { + _tableNameSuffix = RandomStringUtils.randomAlphabetic(12); TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir); } @@ -136,15 +139,15 @@ public void testUploadAndQuery() jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec)); jobSpec.setOutputDirURI(_tarDir.getAbsolutePath()); TableSpec tableSpec = new TableSpec(); - tableSpec.setTableName(DEFAULT_TABLE_NAME); - tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME)); + tableSpec.setTableName(getTableName()); + tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName())); jobSpec.setTableSpec(tableSpec); PinotClusterSpec clusterSpec = new PinotClusterSpec(); clusterSpec.setControllerURI(getControllerBaseApiUrl()); jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec}); File dataDir = new File(_controllerConfig.getDataDir()); - File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME); + File dataDirSegments = new File(dataDir, getTableName()); // Not present in dataDir, only present in sourceDir Assert.assertFalse(dataDirSegments.exists()); @@ -204,6 +207,77 @@ public void testUploadAndQuery() testCountStar(numDocs); } + @Test + public void testUploadMultipleSegmentsInBatchModeAndQuery() + throws Exception { + // Create and upload the schema and table config + Schema schema = createSchema(); + addSchema(schema); + TableConfig offlineTableConfig = createOfflineTableConfig(); + waitForEVToDisappear(offlineTableConfig.getTableName()); + addTableConfig(offlineTableConfig); + + List avroFiles = getAllAvroFiles(); + + // Create the list of segments + for (int segNum = 0; segNum < 12; segNum++) { + ClusterIntegrationTestUtils.buildSegmentFromAvro(avroFiles.get(segNum), offlineTableConfig, schema, + "_seg" + segNum, _segmentDir, _tarDir); + } + + SegmentMetadataPushJobRunner runner = new SegmentMetadataPushJobRunner(); + SegmentGenerationJobSpec jobSpec = new SegmentGenerationJobSpec(); + PushJobSpec pushJobSpec = new PushJobSpec(); + pushJobSpec.setCopyToDeepStoreForMetadataPush(true); + // enable batch mode + pushJobSpec.setBatchMode(true); + jobSpec.setPushJobSpec(pushJobSpec); + PinotFSSpec fsSpec = 
new PinotFSSpec(); + fsSpec.setScheme("file"); + fsSpec.setClassName("org.apache.pinot.spi.filesystem.LocalPinotFS"); + jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec)); + jobSpec.setOutputDirURI(_tarDir.getAbsolutePath()); + TableSpec tableSpec = new TableSpec(); + tableSpec.setTableName(getTableName() + "_OFFLINE"); + tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName())); + jobSpec.setTableSpec(tableSpec); + PinotClusterSpec clusterSpec = new PinotClusterSpec(); + clusterSpec.setControllerURI(getControllerBaseApiUrl()); + jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec}); + + File dataDir = new File(_controllerConfig.getDataDir()); + File dataDirSegments = new File(dataDir, getTableName()); + + // Not present in dataDir, only present in sourceDir + Assert.assertFalse(dataDirSegments.exists()); + Assert.assertEquals(_tarDir.listFiles().length, 12); + + runner.init(jobSpec); + runner.run(); + + // Segment should be seen in dataDir + Assert.assertTrue(dataDirSegments.exists()); + Assert.assertEquals(dataDirSegments.listFiles().length, 12); + Assert.assertEquals(_tarDir.listFiles().length, 12); + + // test segment loaded + JsonNode segmentsList = getSegmentsList(); + Assert.assertEquals(segmentsList.size(), 12); + long numDocs = 0; + for (JsonNode segmentName: segmentsList) { + numDocs += getNumDocs(segmentName.asText()); + } + testCountStar(numDocs); + + // Clear segment and tar dir + for (File segment : _segmentDir.listFiles()) { + FileUtils.deleteQuietly(segment); + } + for (File tar : _tarDir.listFiles()) { + FileUtils.deleteQuietly(tar); + } + } + /** * Runs both SegmentMetadataPushJobRunner and SegmentTarPushJobRunner while enabling consistent data push. * Checks that segments are properly loaded and segment lineage entry were also in expected states. @@ -237,15 +311,15 @@ public void testUploadAndQueryWithConsistentPush() jobSpec.setPinotFSSpecs(Lists.newArrayList(fsSpec)); jobSpec.setOutputDirURI(_tarDir.getAbsolutePath()); TableSpec tableSpec = new TableSpec(); - tableSpec.setTableName(DEFAULT_TABLE_NAME); - tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(DEFAULT_TABLE_NAME)); + tableSpec.setTableName(getTableName()); + tableSpec.setTableConfigURI(_controllerRequestURLBuilder.forUpdateTableConfig(getTableName())); jobSpec.setTableSpec(tableSpec); PinotClusterSpec clusterSpec = new PinotClusterSpec(); clusterSpec.setControllerURI(getControllerBaseApiUrl()); jobSpec.setPinotClusterSpecs(new PinotClusterSpec[]{clusterSpec}); File dataDir = new File(_controllerConfig.getDataDir()); - File dataDirSegments = new File(dataDir, DEFAULT_TABLE_NAME); + File dataDirSegments = new File(dataDir, getTableName()); Assert.assertEquals(_tarDir.listFiles().length, 1); @@ -268,7 +342,7 @@ public void testUploadAndQueryWithConsistentPush() // Fetch segment lineage entry after running segment metadata push with consistent push enabled. String segmentLineageResponse = ControllerTest.sendGetRequest( ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl()) - .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString())); + .forListAllSegmentLineages(getTableName(), TableType.OFFLINE.toString())); // Segment lineage should be in completed state. Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\"")); // SegmentsFrom should be empty as we started with a blank table. 
@@ -317,7 +391,7 @@ public void testUploadAndQueryWithConsistentPush() // Fetch segment lineage entry after running segment tar push with consistent push enabled. segmentLineageResponse = ControllerTest.sendGetRequest( ControllerRequestURLBuilder.baseUrl(getControllerBaseApiUrl()) - .forListAllSegmentLineages(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString())); + .forListAllSegmentLineages(getTableName(), TableType.OFFLINE.toString())); // Segment lineage should be in completed state. Assert.assertTrue(segmentLineageResponse.contains("\"state\":\"COMPLETED\"")); // SegmentsFrom should contain the previous segment @@ -337,14 +411,14 @@ protected TableConfig createOfflineTableConfigWithConsistentPush() { private long getNumDocs(String segmentName) throws IOException { return JsonUtils.stringToJsonNode( - sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(DEFAULT_TABLE_NAME, segmentName))) + sendGetRequest(_controllerRequestURLBuilder.forSegmentMetadata(getTableName(), segmentName))) .get("segment.total.docs").asLong(); } private JsonNode getSegmentsList() throws IOException { return JsonUtils.stringToJsonNode(sendGetRequest( - _controllerRequestURLBuilder.forSegmentListAPI(DEFAULT_TABLE_NAME, TableType.OFFLINE.toString()))) + _controllerRequestURLBuilder.forSegmentListAPI(getTableName(), TableType.OFFLINE.toString()))) .get(0).get("OFFLINE"); } @@ -362,6 +436,11 @@ public Boolean apply(@Nullable Void aVoid) { }, 100L, 300_000, "Failed to load " + countStarResult + " documents", true); } + @Override + public String getTableName() { + return DEFAULT_TABLE_NAME + _tableNameSuffix; + } + @AfterMethod public void tearDownTest() throws IOException { diff --git a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java index 8b0963578e2..8ddd4434f83 100644 --- a/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java +++ b/pinot-plugins/pinot-minion-tasks/pinot-minion-builtin-tasks/src/main/java/org/apache/pinot/plugin/minion/tasks/BaseMultipleSegmentsConversionExecutor.java @@ -26,6 +26,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -60,6 +61,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.TableSpec; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.builder.TableNameBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -179,14 +181,14 @@ public List executeTask(PinotTaskConfig pinotTaskConfig _pinotTaskConfig = pinotTaskConfig; _eventObserver = MinionEventObservers.getInstance().getMinionEventObserver(pinotTaskConfig.getTaskId()); String taskType = pinotTaskConfig.getTaskType(); - Map configs = pinotTaskConfig.getConfigs(); - String tableNameWithType = configs.get(MinionConstants.TABLE_NAME_KEY); - String inputSegmentNames = configs.get(MinionConstants.SEGMENT_NAME_KEY); + Map taskConfigs = pinotTaskConfig.getConfigs(); + String tableNameWithType = taskConfigs.get(MinionConstants.TABLE_NAME_KEY); + String 
inputSegmentNames = taskConfigs.get(MinionConstants.SEGMENT_NAME_KEY); String[] segmentNames = inputSegmentNames.split(MinionConstants.SEGMENT_NAME_SEPARATOR); - String uploadURL = configs.get(MinionConstants.UPLOAD_URL_KEY); - String downloadURLString = configs.get(MinionConstants.DOWNLOAD_URL_KEY); + String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); + String downloadURLString = taskConfigs.get(MinionConstants.DOWNLOAD_URL_KEY); String[] downloadURLs = downloadURLString.split(MinionConstants.URL_SEPARATOR); - AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(configs.get(MinionConstants.AUTH_TOKEN)); + AuthProvider authProvider = AuthProviderUtils.makeAuthProvider(taskConfigs.get(MinionConstants.AUTH_TOKEN)); LOGGER.info("Start executing {} on table: {}, input segments: {} with downloadURLs: {}, uploadURL: {}", taskType, tableNameWithType, inputSegmentNames, downloadURLString, uploadURL); File tempDataDir = new File(new File(MINION_CONTEXT.getDataDir(), taskType), "tmp-" + UUID.randomUUID()); @@ -239,8 +241,11 @@ public List executeTask(PinotTaskConfig pinotTaskConfig int numOutputSegments = segmentConversionResults.size(); List tarredSegmentFiles = new ArrayList<>(numOutputSegments); int count = 1; + Map segmentSizeMap = new HashMap<>(); for (SegmentConversionResult segmentConversionResult : segmentConversionResults) { File convertedSegmentDir = segmentConversionResult.getFile(); + long untarredSegmentSize = FileUtils.sizeOfDirectory(convertedSegmentDir); + segmentSizeMap.put(segmentConversionResult.getSegmentName(), untarredSegmentSize); reportSegmentUploadMetrics(convertedSegmentDir, tableNameWithType, taskType); // Tar the converted segment @@ -274,6 +279,8 @@ public List executeTask(PinotTaskConfig pinotTaskConfig SegmentUploadContext segmentUploadContext = new SegmentUploadContext(pinotTaskConfig, segmentConversionResults); preUploadSegments(segmentUploadContext); + Map segmentUriToTarPathMap = new HashMap<>(); + PushJobSpec pushJobSpec = getPushJobSpec(taskConfigs); // Upload the tarred segments for (int i = 0; i < numOutputSegments; i++) { @@ -282,51 +289,60 @@ public List executeTask(PinotTaskConfig pinotTaskConfig String resultSegmentName = segmentConversionResult.getSegmentName(); _eventObserver.notifyProgress(_pinotTaskConfig, String.format("Uploading segment: %s (%d out of %d)", resultSegmentName, (i + 1), numOutputSegments)); - - // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata - SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = - getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult); - Header segmentZKMetadataCustomMapModifierHeader = - new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER, - segmentZKMetadataCustomMapModifier.toJsonString()); - - String pushMode = - configs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name()); + String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, + BatchConfigProperties.SegmentPushType.TAR.name()); URI outputSegmentTarURI; if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase()) != BatchConfigProperties.SegmentPushType.TAR) { - outputSegmentTarURI = moveSegmentToOutputPinotFS(configs, convertedTarredSegmentFile); + outputSegmentTarURI = moveSegmentToOutputPinotFS(taskConfigs, convertedTarredSegmentFile); LOGGER.info("Moved generated segment from [{}] to location: [{}]", 
convertedTarredSegmentFile, outputSegmentTarURI); } else { outputSegmentTarURI = convertedTarredSegmentFile.toURI(); } - List<Header> httpHeaders = new ArrayList<>(); - httpHeaders.add(segmentZKMetadataCustomMapModifierHeader); - httpHeaders.addAll(AuthProviderUtils.toRequestHeaders(authProvider)); - + // Set segment ZK metadata custom map modifier into HTTP header to modify the segment ZK metadata + List<Header>
httpHeaders = getSegmentPushCommonHeaders(pinotTaskConfig, authProvider, segmentConversionResults); // Set parameters for upload request - NameValuePair enableParallelPushProtectionParameter = - new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, "true"); - NameValuePair tableNameParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, - TableNameBuilder.extractRawTableName(tableNameWithType)); - NameValuePair tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, - TableNameBuilder.getTableTypeFromTableName(tableNameWithType).toString()); + List parameters = getSegmentPushCommonParams(tableNameWithType); + // RealtimeToOfflineSegmentsTask pushed segments to the corresponding offline table // TODO: This is not clean to put the override here, but let's think about it harder to see what is the proper // way to override it. if (MinionConstants.RealtimeToOfflineSegmentsTask.TASK_TYPE.equals(taskType)) { - tableTypeParameter = new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, - TableType.OFFLINE.toString()); + Iterator paramItr = parameters.iterator(); + while (paramItr.hasNext()) { + NameValuePair nameValuePair = paramItr.next(); + if (FileUploadDownloadClient.QueryParameters.TABLE_TYPE.equals(nameValuePair.getName())) { + paramItr.remove(); + parameters.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, + TableType.OFFLINE.toString())); + break; + } + } + } + + if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase()) + == BatchConfigProperties.SegmentPushType.METADATA) { + updateSegmentUriToTarPathMap(taskConfigs, outputSegmentTarURI, segmentConversionResult, + segmentUriToTarPathMap, pushJobSpec); + } else { + pushSegment(taskConfigs, outputSegmentTarURI, httpHeaders, parameters, segmentConversionResult); + if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { + LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath()); + } } - List parameters = Arrays.asList(enableParallelPushProtectionParameter, tableNameParameter, - tableTypeParameter); + } - pushSegment(tableNameParameter.getValue(), configs, outputSegmentTarURI, httpHeaders, parameters, - segmentConversionResult); - if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { - LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath()); + if (!segmentUriToTarPathMap.isEmpty()) { + // For metadata push, push all segments in batch mode + pushJobSpec.setBatchMode(true); + pushSegments(tableNameWithType, taskConfigs, pinotTaskConfig, segmentUriToTarPathMap, segmentSizeMap, + pushJobSpec, authProvider, segmentConversionResults); + for (File convertedTarredSegmentFile: tarredSegmentFiles) { + if (!FileUtils.deleteQuietly(convertedTarredSegmentFile)) { + LOGGER.warn("Failed to delete tarred converted segment: {}", convertedTarredSegmentFile.getAbsolutePath()); + } } } @@ -335,9 +351,8 @@ public List executeTask(PinotTaskConfig pinotTaskConfig String outputSegmentNames = segmentConversionResults.stream().map(SegmentConversionResult::getSegmentName) .collect(Collectors.joining(",")); postProcess(pinotTaskConfig); - LOGGER - .info("Done executing {} on table: {}, input segments: {}, output segments: {}", taskType, tableNameWithType, - inputSegmentNames, outputSegmentNames); + LOGGER.info("Done executing {} on table: {}, input segments: {}, output segments: 
{}", taskType, + tableNameWithType, inputSegmentNames, outputSegmentNames); return segmentConversionResults; } finally { @@ -345,50 +360,112 @@ public List executeTask(PinotTaskConfig pinotTaskConfig } } - private void pushSegment(String tableName, Map taskConfigs, URI outputSegmentTarURI, - List
headers, List parameters, SegmentConversionResult segmentConversionResult) - throws Exception { - String pushMode = - taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, BatchConfigProperties.SegmentPushType.TAR.name()); - LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI); + private void updateSegmentUriToTarPathMap(Map taskConfigs, URI outputSegmentTarURI, + SegmentConversionResult segmentConversionResult, Map segmentUriToTarPathMap, + PushJobSpec pushJobSpec) { + String segmentName = segmentConversionResult.getSegmentName(); + if (!taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) { + throw new RuntimeException(String.format("Output dir URI missing for metadata push while processing segment: %s", + segmentName)); + } + URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); + Map localSegmentUriToTarPathMap = + SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec, + new String[]{outputSegmentTarURI.toString()}); + if (!localSegmentUriToTarPathMap.isEmpty()) { + segmentUriToTarPathMap.putAll(localSegmentUriToTarPathMap); + } + } + private PushJobSpec getPushJobSpec(Map taskConfigs) { PushJobSpec pushJobSpec = new PushJobSpec(); pushJobSpec.setPushAttempts(DEFUALT_PUSH_ATTEMPTS); pushJobSpec.setPushParallelism(DEFAULT_PUSH_PARALLELISM); pushJobSpec.setPushRetryIntervalMillis(DEFAULT_PUSH_RETRY_INTERVAL_MILLIS); pushJobSpec.setSegmentUriPrefix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_PREFIX)); pushJobSpec.setSegmentUriSuffix(taskConfigs.get(BatchConfigProperties.PUSH_SEGMENT_URI_SUFFIX)); + return pushJobSpec; + } - SegmentGenerationJobSpec spec = generatePushJobSpec(tableName, taskConfigs, pushJobSpec); - - switch (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase())) { - case TAR: - File tarFile = new File(outputSegmentTarURI); - String segmentName = segmentConversionResult.getSegmentName(); - String tableNameWithType = segmentConversionResult.getTableNameWithType(); - String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); - SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName, - uploadURL, tarFile); - break; - case METADATA: - if (taskConfigs.containsKey(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)) { - URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); - try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) { - Map segmentUriToTarPathMap = - SegmentPushUtils.getSegmentUriToTarPathMap(outputSegmentDirURI, pushJobSpec, - new String[]{outputSegmentTarURI.toString()}); - SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters); - } - } else { - throw new RuntimeException("Output dir URI missing for metadata push"); - } - break; - default: - throw new UnsupportedOperationException("Unrecognized push mode - " + pushMode); + private List
getSegmentPushCommonHeaders(PinotTaskConfig pinotTaskConfig, AuthProvider authProvider, + List<SegmentConversionResult> segmentConversionResults) { + SegmentConversionResult segmentConversionResult; + if (segmentConversionResults.size() == 1) { + segmentConversionResult = segmentConversionResults.get(0); + } else { + // Setting to null as the base method expects a single object. This is ok for now, since the + // segmentConversionResult is not made use of while generating the customMap. + segmentConversionResult = null; + } + SegmentZKMetadataCustomMapModifier segmentZKMetadataCustomMapModifier = + getSegmentZKMetadataCustomMapModifier(pinotTaskConfig, segmentConversionResult); + Header segmentZKMetadataCustomMapModifierHeader = + new BasicHeader(FileUploadDownloadClient.CustomHeaders.SEGMENT_ZK_METADATA_CUSTOM_MAP_MODIFIER, + segmentZKMetadataCustomMapModifier.toJsonString()); + + List<Header>
headers = new ArrayList<>(); + headers.add(segmentZKMetadataCustomMapModifierHeader); + headers.addAll(AuthProviderUtils.toRequestHeaders(authProvider)); + return headers; + } + + private List getSegmentPushCommonParams(String tableNameWithType) { + List params = new ArrayList<>(); + params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.ENABLE_PARALLEL_PUSH_PROTECTION, + "true")); + params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_NAME, + TableNameBuilder.extractRawTableName(tableNameWithType))); + TableType tableType = TableNameBuilder.getTableTypeFromTableName(tableNameWithType); + if (tableType != null) { + params.add(new BasicNameValuePair(FileUploadDownloadClient.QueryParameters.TABLE_TYPE, tableType.toString())); + } else { + throw new RuntimeException(String.format("Failed to determine the tableType from name: %s", tableNameWithType)); + } + return params; + } + + private void pushSegments(String tableNameWithType, Map taskConfigs, PinotTaskConfig pinotTaskConfig, + Map segmentUriToTarPathMap, Map segmentSizeMap, PushJobSpec pushJobSpec, + AuthProvider authProvider, List segmentConversionResults) + throws Exception { + String tableName = TableNameBuilder.extractRawTableName(tableNameWithType); + SegmentGenerationJobSpec spec = generateSegmentGenerationJobSpec(tableName, taskConfigs, pushJobSpec); + + List
headers = getSegmentPushCommonHeaders(pinotTaskConfig, authProvider, segmentConversionResults); + for (Map.Entry<String, Long> segmentSizeEntry: segmentSizeMap.entrySet()) { + String segmentNameKey = CommonConstants.Controller.SEGMENT_SIZE_HTTP_HEADER_PREFIX + segmentSizeEntry.getKey(); + Header segmentSizeHeader = new BasicHeader(segmentNameKey, segmentSizeEntry.getValue()); + headers.add(segmentSizeHeader); + } + List<NameValuePair> parameters = getSegmentPushCommonParams(tableNameWithType); + + URI outputSegmentDirURI = URI.create(taskConfigs.get(BatchConfigProperties.OUTPUT_SEGMENT_DIR_URI)); + try (PinotFS outputFileFS = MinionTaskUtils.getOutputPinotFS(taskConfigs, outputSegmentDirURI)) { + SegmentPushUtils.sendSegmentUriAndMetadata(spec, outputFileFS, segmentUriToTarPathMap, headers, parameters); + } + } + + private void pushSegment(Map<String, String> taskConfigs, URI outputSegmentTarURI, + List<Header>
headers, List parameters, SegmentConversionResult segmentConversionResult) + throws Exception { + String pushMode = taskConfigs.getOrDefault(BatchConfigProperties.PUSH_MODE, + BatchConfigProperties.SegmentPushType.TAR.name()); + LOGGER.info("Trying to push Pinot segment with push mode {} from {}", pushMode, outputSegmentTarURI); + + if (BatchConfigProperties.SegmentPushType.valueOf(pushMode.toUpperCase()) + == BatchConfigProperties.SegmentPushType.TAR) { + File tarFile = new File(outputSegmentTarURI); + String segmentName = segmentConversionResult.getSegmentName(); + String tableNameWithType = segmentConversionResult.getTableNameWithType(); + String uploadURL = taskConfigs.get(MinionConstants.UPLOAD_URL_KEY); + SegmentConversionUtils.uploadSegment(taskConfigs, headers, parameters, tableNameWithType, segmentName, uploadURL, + tarFile); + } else { + throw new UnsupportedOperationException("Unrecognized push mode: " + pushMode); } } - private SegmentGenerationJobSpec generatePushJobSpec(String tableName, Map taskConfigs, + private SegmentGenerationJobSpec generateSegmentGenerationJobSpec(String tableName, Map taskConfigs, PushJobSpec pushJobSpec) { TableSpec tableSpec = new TableSpec(); diff --git a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java index 4a5dd219482..bdbb57f0d2c 100644 --- a/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java +++ b/pinot-segment-local/src/main/java/org/apache/pinot/segment/local/utils/SegmentPushUtils.java @@ -56,6 +56,7 @@ import org.apache.pinot.spi.ingestion.batch.spec.PinotClusterSpec; import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec; import org.apache.pinot.spi.ingestion.batch.spec.SegmentGenerationJobSpec; +import org.apache.pinot.spi.utils.CommonConstants; import org.apache.pinot.spi.utils.retry.AttemptsExceededException; import org.apache.pinot.spi.utils.retry.RetriableOperationException; import org.apache.pinot.spi.utils.retry.RetryPolicies; @@ -277,6 +278,8 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino Map segmentUriToTarPathMap, List
headers, List parameters) throws Exception { String tableName = spec.getTableSpec().getTableName(); + Map segmentMetadataFileMap = new HashMap<>(); + Map segmentUriPathMap = new HashMap<>(); LOGGER.info("Start pushing segment metadata: {} to locations: {} for table {}", segmentUriToTarPathMap, Arrays.toString(spec.getPinotClusterSpecs()), tableName); for (String segmentUriPath : segmentUriToTarPathMap.keySet()) { @@ -303,62 +306,86 @@ public static void sendSegmentUriAndMetadata(SegmentGenerationJobSpec spec, Pino } else { segmentMetadataFile = generateSegmentMetadataFile(fileSystem, URI.create(tarFilePath)); } - try { - for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { - URI controllerURI; - try { - controllerURI = new URI(pinotClusterSpec.getControllerURI()); - } catch (URISyntaxException e) { - throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'"); - } - LOGGER.info("Pushing segment: {} to location: {} for table {}", segmentName, controllerURI, tableName); - int attempts = 1; - if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) { - attempts = spec.getPushJobSpec().getPushAttempts(); - } - long retryWaitMs = 1000L; - if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) { - retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis(); - } - RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> { - List
reqHttpHeaders = new ArrayList<>(headers); - try { - reqHttpHeaders.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.DOWNLOAD_URI, segmentUriPath)); - reqHttpHeaders.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, - FileUploadDownloadClient.FileUploadType.METADATA.toString())); - if (spec.getPushJobSpec() != null) { - reqHttpHeaders.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE, - String.valueOf(spec.getPushJobSpec().getCopyToDeepStoreForMetadataPush()))); - } + segmentMetadataFileMap.put(segmentName, segmentMetadataFile); + segmentUriPathMap.put(segmentName, segmentUriPath); + } - SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadata( - FileUploadDownloadClient.getUploadSegmentURI(controllerURI), segmentName, - segmentMetadataFile, reqHttpHeaders, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); - LOGGER.info("Response for pushing table {} segment {} to location {} - {}: {}", tableName, segmentName, - controllerURI, response.getStatusCode(), response.getResponse()); - return true; - } catch (HttpErrorStatusException e) { - int statusCode = e.getStatusCode(); - if (statusCode >= 500) { - // Temporary exception - LOGGER.warn("Caught temporary exception while pushing table: {} segment: {} to {}, will retry", - tableName, segmentName, controllerURI, e); - return false; - } else { - // Permanent exception - LOGGER.error("Caught permanent exception while pushing table: {} segment: {} to {}, won't retry", - tableName, segmentName, controllerURI, e); - throw e; - } - } - }); + // perform metadata push in batch mode for every cluster + try { + for (PinotClusterSpec pinotClusterSpec : spec.getPinotClusterSpecs()) { + URI controllerURI; + try { + controllerURI = new URI(pinotClusterSpec.getControllerURI()); + } catch (URISyntaxException e) { + throw new RuntimeException("Got invalid controller uri - '" + pinotClusterSpec.getControllerURI() + "'"); + } + LOGGER.info("Pushing segments: {} to Pinot cluster: {} for table {}", + segmentMetadataFileMap.keySet(), controllerURI, tableName); + int attempts = 1; + if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushAttempts() > 0) { + attempts = spec.getPushJobSpec().getPushAttempts(); } - } finally { - FileUtils.deleteQuietly(segmentMetadataFile); + long retryWaitMs = 1000L; + if (spec.getPushJobSpec() != null && spec.getPushJobSpec().getPushRetryIntervalMillis() > 0) { + retryWaitMs = spec.getPushJobSpec().getPushRetryIntervalMillis(); + } + RetryPolicies.exponentialBackoffRetryPolicy(attempts, retryWaitMs, 5).attempt(() -> { + List
reqHttpHeaders = new ArrayList<>(headers); + try { + addHeaders(segmentUriPathMap, spec, reqHttpHeaders); + URI segmentUploadURI = getSegmentUploadURI(spec.getPushJobSpec(), controllerURI); + SimpleHttpResponse response = FILE_UPLOAD_DOWNLOAD_CLIENT.uploadSegmentMetadataFiles(segmentUploadURI, + segmentMetadataFileMap, reqHttpHeaders, parameters, HttpClient.DEFAULT_SOCKET_TIMEOUT_MS); + LOGGER.info("Response for pushing table {} segments {} to location {} - {}: {}", tableName, + segmentMetadataFileMap.keySet(), controllerURI, response.getStatusCode(), response.getResponse()); + return true; + } catch (HttpErrorStatusException e) { + int statusCode = e.getStatusCode(); + if (statusCode >= 500) { + // Temporary exception + LOGGER.warn("Caught temporary exception while pushing table: {} segments: {} to {}, will retry", + tableName, segmentMetadataFileMap.keySet(), controllerURI, e); + return false; + } else { + // Permanent exception + LOGGER.error("Caught permanent exception while pushing table: {} segments: {} to {}, won't retry", + tableName, segmentMetadataFileMap.keySet(), controllerURI, e); + throw e; + } + } + }); + } + } finally { + for (Map.Entry metadataFileEntry: segmentMetadataFileMap.entrySet()) { + FileUtils.deleteQuietly(metadataFileEntry.getValue()); } } } + private static URI getSegmentUploadURI(PushJobSpec jobSpec, URI controllerURI) + throws URISyntaxException { + if (jobSpec.isBatchMode()) { + return FileUploadDownloadClient.getUploadSegmentBatchURI(controllerURI); + } else { + return FileUploadDownloadClient.getUploadSegmentURI(controllerURI); + } + } + + private static void addHeaders(Map segmentUriPathMap, SegmentGenerationJobSpec jobSpec, + List
headers) { + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.UPLOAD_TYPE, + FileUploadDownloadClient.FileUploadType.METADATA.toString())); + if (jobSpec.getPushJobSpec() != null) { + headers.add(new BasicHeader(FileUploadDownloadClient.CustomHeaders.COPY_SEGMENT_TO_DEEP_STORE, + String.valueOf(jobSpec.getPushJobSpec().getCopyToDeepStoreForMetadataPush()))); + } + // For each header that is created, the segment name is the key and the segment download URI is the value + for (Map.Entry entry: segmentUriPathMap.entrySet()) { + headers.add(new BasicHeader(CommonConstants.Controller.SEGMENT_URI_HTTP_HEADER_PREFIX + entry.getKey(), + entry.getValue())); + } + } + public static Map getSegmentUriToTarPathMap(URI outputDirURI, PushJobSpec pushSpec, String[] files) { Map segmentUriToTarPathMap = new HashMap<>(); diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java index 31d1ce8448c..2b9237e5fd3 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/ingestion/batch/spec/PushJobSpec.java @@ -46,6 +46,13 @@ public class PushJobSpec implements Serializable { * If true, and if segment was not already in the deep store, move it to deep store. */ private boolean _copyToDeepStoreForMetadataPush; + + /** + * Applicable for METADATA push type. + * If true, multiple segment metadata files are uploaded to the controller in a single call. + */ + private boolean _batchMode; + /** * Used in SegmentUriPushJobRunner, which is used to composite the segment uri to send to pinot controller. * The URI sends to controller is in the format ${segmentUriPrefix}${segmentPath}${segmentUriSuffix} @@ -148,4 +155,12 @@ public boolean getCopyToDeepStoreForMetadataPush() { public void setCopyToDeepStoreForMetadataPush(boolean copyToDeepStoreForMetadataPush) { _copyToDeepStoreForMetadataPush = copyToDeepStoreForMetadataPush; } + + public boolean isBatchMode() { + return _batchMode; + } + + public void setBatchMode(boolean batchMode) { + _batchMode = batchMode; + } } diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index 3753c0e64bb..c80f3fc9a0d 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -787,6 +787,8 @@ public static class Controller { public static final String HOST_HTTP_HEADER = "Pinot-Controller-Host"; public static final String VERSION_HTTP_HEADER = "Pinot-Controller-Version"; public static final String SEGMENT_NAME_HTTP_HEADER = "Pinot-Segment-Name"; + public static final String SEGMENT_URI_HTTP_HEADER_PREFIX = "x-segment-uri-"; + public static final String SEGMENT_SIZE_HTTP_HEADER_PREFIX = "x-segment-size-"; public static final String TABLE_NAME_HTTP_HEADER = "Pinot-Table-Name"; public static final String PINOT_QUERY_ERROR_CODE_HEADER = "X-Pinot-Error-Code"; public static final String INGESTION_DESCRIPTOR = "Pinot-Ingestion-Descriptor";
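
Reviewer note: below is a minimal, hypothetical sketch (the class name BatchPushExample and the controller address are invented for illustration) of how a push job opts into the batch metadata push introduced by this change and how the upload URI is selected. Only PushJobSpec.setBatchMode/isBatchMode, FileUploadDownloadClient.getUploadSegmentBatchURI, and the "x-segment-uri-"/"x-segment-size-" header prefixes come from this diff; everything else is assumed.

// Hedged sketch, not part of the patch.
import java.net.URI;
import java.net.URISyntaxException;
import org.apache.pinot.common.utils.FileUploadDownloadClient;
import org.apache.pinot.spi.ingestion.batch.spec.PushJobSpec;

public class BatchPushExample {
  public static URI resolveUploadUri()
      throws URISyntaxException {
    PushJobSpec pushJobSpec = new PushJobSpec();
    pushJobSpec.setCopyToDeepStoreForMetadataPush(true);
    pushJobSpec.setBatchMode(true); // new flag: push all segment metadata files in one controller call

    URI controllerURI = new URI("https://myhost:9443"); // example controller address
    // Batch mode routes the metadata push to the new /v3/segments endpoint;
    // otherwise the existing /v2/segments endpoint is used.
    return pushJobSpec.isBatchMode()
        ? FileUploadDownloadClient.getUploadSegmentBatchURI(controllerURI)
        : FileUploadDownloadClient.getUploadSegmentURI(controllerURI);
  }
}

// In batch mode, per-segment details travel in request headers keyed by the new prefixes:
//   "x-segment-uri-"  + segmentName -> deep-store download URI of that segment
//   "x-segment-size-" + segmentName -> untarred segment size in bytes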