From 1f8fd632c09d6835f42d71675277694234938934 Mon Sep 17 00:00:00 2001 From: Jack Luo Date: Tue, 22 Oct 2024 04:13:32 +0800 Subject: [PATCH] [Feature] Enable the capability to specify zstd and lz4 segment compression via config (#14008) * Enable the capability to specify zstd and lz4 segment compression codec via config * Reduce the scope of the change to server-only * Add a blank line to trigger unit test again * Addressed code review comments. --- .../common/utils/TarCompressionUtils.java | 41 +++++++++++++++---- .../data/manager/BaseTableDataManager.java | 4 +- .../realtime/RealtimeSegmentDataManager.java | 5 ++- .../manager/BaseTableDataManagerTest.java | 10 +++-- .../starter/helix/BaseServerStarter.java | 7 ++++ .../pinot/spi/utils/CommonConstants.java | 2 + 6 files changed, 54 insertions(+), 15 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java index 089c0fae3654..3a6f3170f079 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/TarCompressionUtils.java @@ -69,6 +69,12 @@ public class TarCompressionUtils { private TarCompressionUtils() { } + /** + * This generic compressed tar file extension does not bind to a particular compressor. Decompression determines the + * appropriate compressor at run-time based on the file's magic number irrespective of the file extension. + * Compression uses the default compressor automatically if this generic extension is used. + */ + public static final String TAR_COMPRESSED_FILE_EXTENSION = ".tar.compressed"; public static final String TAR_GZ_FILE_EXTENSION = ".tar.gz"; public static final String TAR_LZ4_FILE_EXTENSION = ".tar.lz4"; public static final String TAR_ZST_FILE_EXTENSION = ".tar.zst"; @@ -77,6 +83,13 @@ private TarCompressionUtils() { CompressorStreamFactory.LZ4_FRAMED, TAR_ZST_FILE_EXTENSION, CompressorStreamFactory.ZSTANDARD); private static final CompressorStreamFactory COMPRESSOR_STREAM_FACTORY = CompressorStreamFactory.getSingleton(); private static final char ENTRY_NAME_SEPARATOR = '/'; + private static String _defaultCompressorName = CompressorStreamFactory.GZIP; + + public static void setDefaultCompressor(String compressorName) { + if (COMPRESSOR_NAME_BY_FILE_EXTENSIONS.containsKey(compressorName)) { + _defaultCompressorName = compressorName; + } + } /** * Creates a compressed tar file from the input file/directory to the output file. The output file must have @@ -93,15 +106,29 @@ public static void createCompressedTarFile(File inputFile, File outputFile) */ public static void createCompressedTarFile(File[] inputFiles, File outputFile) throws IOException { - String compressorName = null; - for (String supportedCompressorExtension : COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) { - if (outputFile.getName().endsWith(supportedCompressorExtension)) { - compressorName = COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension); - break; + if (outputFile.getName().endsWith(TAR_COMPRESSED_FILE_EXTENSION)) { + createCompressedTarFile(inputFiles, outputFile, _defaultCompressorName); + } else { + String compressorName = null; + for (String supportedCompressorExtension : COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) { + if (outputFile.getName().endsWith(supportedCompressorExtension)) { + compressorName = COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension); + createCompressedTarFile(inputFiles, outputFile, compressorName); + return; + } } + Preconditions.checkState(null != compressorName, + "Output file: %s does not have a supported compressed tar file extension", outputFile); } - Preconditions.checkState(null != compressorName, - "Output file: %s does not have a supported compressed tar file extension", outputFile); + } + + public static void createCompressedTarFile(File inputFile, File outputFile, String compressorName) + throws IOException { + createCompressedTarFile(new File[]{inputFile}, outputFile, compressorName); + } + + public static void createCompressedTarFile(File[] inputFiles, File outputFile, String compressorName) + throws IOException { try (OutputStream fileOut = Files.newOutputStream(outputFile.toPath()); BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut); OutputStream compressorOut = COMPRESSOR_STREAM_FACTORY.createCompressorOutputStream(compressorName, diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java index 56d2cb35d61f..b1d3647a545a 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/BaseTableDataManager.java @@ -793,7 +793,7 @@ private File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata) failedAttempts.get()); } } else { - File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); + File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION); SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, segmentTarFile, zkMetadata.getCrypterName()); _logger.info("Downloaded tarred segment: {} from: {} to: {}, file length: {}", segmentName, downloadUrl, segmentTarFile, segmentTarFile.length()); @@ -820,7 +820,7 @@ private File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata) _tableNameWithType); _logger.info("Downloading segment: {} from peers", segmentName); File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID()); - File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); + File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION); try { SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, _peerDownloadScheme, () -> { List peerServerURIs = diff --git a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java index bed8f2a3103b..939e43d39358 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/data/manager/realtime/RealtimeSegmentDataManager.java @@ -1016,7 +1016,8 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { _serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, 1L); final long lockAcquireTimeMillis = now(); - // Build a segment from in-memory rows.If buildTgz is true, then build the tar.gz file as well + // Build a segment from in-memory rows. + // If build compressed archive is true, then build the tar.compressed file as well // TODO Use an auto-closeable object to delete temp resources. File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" + _segmentNameStr + "-" + now()); @@ -1069,7 +1070,7 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) { TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis)); if (forCommit) { - File segmentTarFile = new File(dataDir, _segmentNameStr + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); + File segmentTarFile = new File(dataDir, _segmentNameStr + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION); try { TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile); } catch (IOException e) { diff --git a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java index e40ef4913107..7d351c486f66 100644 --- a/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java +++ b/pinot-core/src/test/java/org/apache/pinot/core/data/manager/BaseTableDataManagerTest.java @@ -293,7 +293,8 @@ public void testReloadSegmentForceDownload() throws Exception { File indexDir = createSegment(SegmentVersion.v3, 5); SegmentZKMetadata zkMetadata = - makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION), false); + makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION), + false); // Same CRC but force to download. BaseTableDataManager tableDataManager = createTableManager(); @@ -567,7 +568,7 @@ public void testDownloadAndDecrypt() File tempDir = new File(TEMP_DIR, "testDownloadAndDecrypt"); String fileName = "tmp.txt"; FileUtils.write(new File(tempDir, fileName), "this is from somewhere remote"); - String tarFileName = SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION; + String tarFileName = SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION; File tempTarFile = new File(TEMP_DIR, tarFileName); TarCompressionUtils.createCompressedTarFile(tempDir, tempTarFile); @@ -607,7 +608,7 @@ public void testUntarAndMoveSegment() File tempRootDir = tableDataManager.getTmpSegmentDataDir("test-untar-move"); // All input and intermediate files are put in the tempRootDir. - File tempTar = new File(tempRootDir, SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION); + File tempTar = new File(tempRootDir, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION); File tempInputDir = new File(tempRootDir, "input"); FileUtils.write(new File(tempInputDir, "tmp.txt"), "this is in segment dir"); TarCompressionUtils.createCompressedTarFile(tempInputDir, tempTar); @@ -687,7 +688,8 @@ private static File createSegment(SegmentVersion segmentVersion, int numRows) private static SegmentZKMetadata createRawSegment(SegmentVersion segmentVersion, int numRows) throws Exception { File indexDir = createSegment(segmentVersion, numRows); - return makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION), true); + return makeRawSegment(indexDir, + new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION), true); } private static SegmentZKMetadata makeRawSegment(File indexDir, File rawSegmentFile, boolean deleteIndexDir) diff --git a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java index dc4100eebc8b..98f700c277f8 100644 --- a/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java +++ b/pinot-server/src/main/java/org/apache/pinot/server/starter/helix/BaseServerStarter.java @@ -59,6 +59,7 @@ import org.apache.pinot.common.utils.ServiceStartableUtils; import org.apache.pinot.common.utils.ServiceStatus; import org.apache.pinot.common.utils.ServiceStatus.Status; +import org.apache.pinot.common.utils.TarCompressionUtils; import org.apache.pinot.common.utils.config.TagNameUtils; import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory; import org.apache.pinot.common.utils.helix.HelixHelper; @@ -161,6 +162,12 @@ public void init(PinotConfiguration serverConf) _serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE, CommonConstants.DEFAULT_PINOT_INSECURE_MODE))); + String tarCompressionCodecName = + _serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME); + if (null != tarCompressionCodecName) { + TarCompressionUtils.setDefaultCompressor(tarCompressionCodecName); + } + setupHelixSystemProperties(); _listenerConfigs = ListenerConfigUtil.buildServerAdminConfigs(_serverConf); _hostname = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_HOST, diff --git a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java index ff81f6bc4ea8..f62efb2062d8 100644 --- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java +++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java @@ -65,6 +65,8 @@ private CommonConstants() { public static final String CONFIG_OF_EXECUTORS_FIXED_NUM_THREADS = "pinot.executors.fixed.default.numThreads"; public static final String DEFAULT_EXECUTORS_FIXED_NUM_THREADS = "-1"; + public static final String CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME = "pinot.tar.compression.codec.name"; + /** * The state of the consumer for a given segment */