Skip to content

Commit

Permalink
[Feature] Enable the capability to specify zstd and lz4 segment compr…
Browse files Browse the repository at this point in the history
…ession via config (apache#14008)

* Enable the capability to specify zstd and lz4 segment compression codec via config

* Reduce the scope of the change to server-only

* Add a blank line to trigger unit test again

* Addressed code review comments.
  • Loading branch information
jackluo923 authored Oct 21, 2024
1 parent f278e2e commit 1f8fd63
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ public class TarCompressionUtils {
private TarCompressionUtils() {
}

/**
* This generic compressed tar file extension does not bind to a particular compressor. Decompression determines the
* appropriate compressor at run-time based on the file's magic number irrespective of the file extension.
* Compression uses the default compressor automatically if this generic extension is used.
*/
public static final String TAR_COMPRESSED_FILE_EXTENSION = ".tar.compressed";
public static final String TAR_GZ_FILE_EXTENSION = ".tar.gz";
public static final String TAR_LZ4_FILE_EXTENSION = ".tar.lz4";
public static final String TAR_ZST_FILE_EXTENSION = ".tar.zst";
Expand All @@ -77,6 +83,13 @@ private TarCompressionUtils() {
CompressorStreamFactory.LZ4_FRAMED, TAR_ZST_FILE_EXTENSION, CompressorStreamFactory.ZSTANDARD);
private static final CompressorStreamFactory COMPRESSOR_STREAM_FACTORY = CompressorStreamFactory.getSingleton();
private static final char ENTRY_NAME_SEPARATOR = '/';
private static String _defaultCompressorName = CompressorStreamFactory.GZIP;

public static void setDefaultCompressor(String compressorName) {
if (COMPRESSOR_NAME_BY_FILE_EXTENSIONS.containsKey(compressorName)) {
_defaultCompressorName = compressorName;
}
}

/**
* Creates a compressed tar file from the input file/directory to the output file. The output file must have
Expand All @@ -93,15 +106,29 @@ public static void createCompressedTarFile(File inputFile, File outputFile)
*/
public static void createCompressedTarFile(File[] inputFiles, File outputFile)
throws IOException {
String compressorName = null;
for (String supportedCompressorExtension : COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
if (outputFile.getName().endsWith(supportedCompressorExtension)) {
compressorName = COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension);
break;
if (outputFile.getName().endsWith(TAR_COMPRESSED_FILE_EXTENSION)) {
createCompressedTarFile(inputFiles, outputFile, _defaultCompressorName);
} else {
String compressorName = null;
for (String supportedCompressorExtension : COMPRESSOR_NAME_BY_FILE_EXTENSIONS.keySet()) {
if (outputFile.getName().endsWith(supportedCompressorExtension)) {
compressorName = COMPRESSOR_NAME_BY_FILE_EXTENSIONS.get(supportedCompressorExtension);
createCompressedTarFile(inputFiles, outputFile, compressorName);
return;
}
}
Preconditions.checkState(null != compressorName,
"Output file: %s does not have a supported compressed tar file extension", outputFile);
}
Preconditions.checkState(null != compressorName,
"Output file: %s does not have a supported compressed tar file extension", outputFile);
}

public static void createCompressedTarFile(File inputFile, File outputFile, String compressorName)
throws IOException {
createCompressedTarFile(new File[]{inputFile}, outputFile, compressorName);
}

public static void createCompressedTarFile(File[] inputFiles, File outputFile, String compressorName)
throws IOException {
try (OutputStream fileOut = Files.newOutputStream(outputFile.toPath());
BufferedOutputStream bufferedOut = new BufferedOutputStream(fileOut);
OutputStream compressorOut = COMPRESSOR_STREAM_FACTORY.createCompressorOutputStream(compressorName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,7 @@ private File downloadSegmentFromDeepStore(SegmentZKMetadata zkMetadata)
failedAttempts.get());
}
} else {
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(downloadUrl, segmentTarFile, zkMetadata.getCrypterName());
_logger.info("Downloaded tarred segment: {} from: {} to: {}, file length: {}", segmentName, downloadUrl,
segmentTarFile, segmentTarFile.length());
Expand All @@ -820,7 +820,7 @@ private File downloadSegmentFromPeers(SegmentZKMetadata zkMetadata)
_tableNameWithType);
_logger.info("Downloading segment: {} from peers", segmentName);
File tempRootDir = getTmpSegmentDataDir("tmp-" + segmentName + "-" + UUID.randomUUID());
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
File segmentTarFile = new File(tempRootDir, segmentName + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
try {
SegmentFetcherFactory.fetchAndDecryptSegmentToLocal(segmentName, _peerDownloadScheme, () -> {
List<URI> peerServerURIs =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1016,7 +1016,8 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
_serverMetrics.addValueToGlobalGauge(ServerGauge.LLC_SIMULTANEOUS_SEGMENT_BUILDS, 1L);

final long lockAcquireTimeMillis = now();
// Build a segment from in-memory rows.If buildTgz is true, then build the tar.gz file as well
// Build a segment from in-memory rows.
// If build compressed archive is true, then build the tar.compressed file as well
// TODO Use an auto-closeable object to delete temp resources.
File tempSegmentFolder = new File(_resourceTmpDir, "tmp-" + _segmentNameStr + "-" + now());

Expand Down Expand Up @@ -1069,7 +1070,7 @@ SegmentBuildDescriptor buildSegmentInternal(boolean forCommit) {
TimeUnit.MILLISECONDS.toSeconds(waitTimeMillis));

if (forCommit) {
File segmentTarFile = new File(dataDir, _segmentNameStr + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
File segmentTarFile = new File(dataDir, _segmentNameStr + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
try {
TarCompressionUtils.createCompressedTarFile(indexDir, segmentTarFile);
} catch (IOException e) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -293,7 +293,8 @@ public void testReloadSegmentForceDownload()
throws Exception {
File indexDir = createSegment(SegmentVersion.v3, 5);
SegmentZKMetadata zkMetadata =
makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION), false);
makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION),
false);

// Same CRC but force to download.
BaseTableDataManager tableDataManager = createTableManager();
Expand Down Expand Up @@ -567,7 +568,7 @@ public void testDownloadAndDecrypt()
File tempDir = new File(TEMP_DIR, "testDownloadAndDecrypt");
String fileName = "tmp.txt";
FileUtils.write(new File(tempDir, fileName), "this is from somewhere remote");
String tarFileName = SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION;
String tarFileName = SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION;
File tempTarFile = new File(TEMP_DIR, tarFileName);
TarCompressionUtils.createCompressedTarFile(tempDir, tempTarFile);

Expand Down Expand Up @@ -607,7 +608,7 @@ public void testUntarAndMoveSegment()
File tempRootDir = tableDataManager.getTmpSegmentDataDir("test-untar-move");

// All input and intermediate files are put in the tempRootDir.
File tempTar = new File(tempRootDir, SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION);
File tempTar = new File(tempRootDir, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION);
File tempInputDir = new File(tempRootDir, "input");
FileUtils.write(new File(tempInputDir, "tmp.txt"), "this is in segment dir");
TarCompressionUtils.createCompressedTarFile(tempInputDir, tempTar);
Expand Down Expand Up @@ -687,7 +688,8 @@ private static File createSegment(SegmentVersion segmentVersion, int numRows)
private static SegmentZKMetadata createRawSegment(SegmentVersion segmentVersion, int numRows)
throws Exception {
File indexDir = createSegment(segmentVersion, numRows);
return makeRawSegment(indexDir, new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_GZ_FILE_EXTENSION), true);
return makeRawSegment(indexDir,
new File(TEMP_DIR, SEGMENT_NAME + TarCompressionUtils.TAR_COMPRESSED_FILE_EXTENSION), true);
}

private static SegmentZKMetadata makeRawSegment(File indexDir, File rawSegmentFile, boolean deleteIndexDir)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@
import org.apache.pinot.common.utils.ServiceStartableUtils;
import org.apache.pinot.common.utils.ServiceStatus;
import org.apache.pinot.common.utils.ServiceStatus.Status;
import org.apache.pinot.common.utils.TarCompressionUtils;
import org.apache.pinot.common.utils.config.TagNameUtils;
import org.apache.pinot.common.utils.fetcher.SegmentFetcherFactory;
import org.apache.pinot.common.utils.helix.HelixHelper;
Expand Down Expand Up @@ -161,6 +162,12 @@ public void init(PinotConfiguration serverConf)
_serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_INSECURE_MODE,
CommonConstants.DEFAULT_PINOT_INSECURE_MODE)));

String tarCompressionCodecName =
_serverConf.getProperty(CommonConstants.CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME);
if (null != tarCompressionCodecName) {
TarCompressionUtils.setDefaultCompressor(tarCompressionCodecName);
}

setupHelixSystemProperties();
_listenerConfigs = ListenerConfigUtil.buildServerAdminConfigs(_serverConf);
_hostname = _serverConf.getProperty(Helix.KEY_OF_SERVER_NETTY_HOST,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,8 @@ private CommonConstants() {
public static final String CONFIG_OF_EXECUTORS_FIXED_NUM_THREADS = "pinot.executors.fixed.default.numThreads";
public static final String DEFAULT_EXECUTORS_FIXED_NUM_THREADS = "-1";

public static final String CONFIG_OF_PINOT_TAR_COMPRESSION_CODEC_NAME = "pinot.tar.compression.codec.name";

/**
* The state of the consumer for a given segment
*/
Expand Down

0 comments on commit 1f8fd63

Please sign in to comment.