From 9046af27a1917f75ce3c0dfa4a140a959112fb8e Mon Sep 17 00:00:00 2001 From: Kartik Khare Date: Fri, 18 Oct 2024 16:55:55 +0530 Subject: [PATCH] Return interfaces in Segment processor framework classes instead of Implementations (#14252) * change abstraction for writer. * fix segment fetcher for localfs. --------- Co-authored-by: Aishik --- .../pinot/common/utils/fetcher/PinotFSSegmentFetcher.java | 6 +++++- .../pinot/common/utils/fetcher/SegmentFetcherFactory.java | 3 +++ .../processing/genericrow/AdaptiveSizeBasedWriter.java | 4 ++-- .../pinot/core/segment/processing/mapper/SegmentMapper.java | 4 ++-- 4 files changed, 12 insertions(+), 5 deletions(-) diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java index b0bb4706a30..c11a0239c10 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/PinotFSSegmentFetcher.java @@ -28,6 +28,10 @@ public class PinotFSSegmentFetcher extends BaseSegmentFetcher { @Override protected void fetchSegmentToLocalWithoutRetry(URI uri, File dest) throws Exception { - PinotFSFactory.create(uri.getScheme()).copyToLocalFile(uri, dest); + if (uri.getScheme() == null) { + PinotFSFactory.create(PinotFSFactory.LOCAL_PINOT_FS_SCHEME).copyToLocalFile(uri, dest); + } else { + PinotFSFactory.create(uri.getScheme()).copyToLocalFile(uri, dest); + } } } diff --git a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java index 543db8c4031..235c63bf0c6 100644 --- a/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java +++ b/pinot-common/src/main/java/org/apache/pinot/common/utils/fetcher/SegmentFetcherFactory.java @@ -106,6 +106,9 @@ public static SegmentFetcher getSegmentFetcher(String protocol) { return segmentFetcher; } else { LOGGER.info("Segment fetcher is not configured for protocol: {}, using default", protocol); + if (protocol == null) { + return PINOT_FS_SEGMENT_FETCHER; + } switch (protocol) { case CommonConstants.HTTP_PROTOCOL: case CommonConstants.HTTPS_PROTOCOL: diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java index 541bd14e269..d7db6509f3f 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/genericrow/AdaptiveSizeBasedWriter.java @@ -22,7 +22,7 @@ import org.apache.pinot.spi.data.readers.GenericRow; -public class AdaptiveSizeBasedWriter implements AdaptiveConstraintsWriter { +public class AdaptiveSizeBasedWriter implements AdaptiveConstraintsWriter, GenericRow> { private final long _bytesLimit; private long _numBytesWritten; @@ -45,7 +45,7 @@ public boolean canWrite() { } @Override - public void write(GenericRowFileWriter writer, GenericRow row) throws IOException { + public void write(FileWriter writer, GenericRow row) throws IOException { _numBytesWritten += writer.writeData(row); } } diff --git a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java index 49ae88b19f1..cca0a0ef585 100644 --- a/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java +++ b/pinot-core/src/main/java/org/apache/pinot/core/segment/processing/mapper/SegmentMapper.java @@ -31,8 +31,8 @@ import org.apache.commons.lang3.tuple.Pair; import org.apache.pinot.core.segment.processing.framework.SegmentProcessorConfig; import org.apache.pinot.core.segment.processing.genericrow.AdaptiveSizeBasedWriter; +import org.apache.pinot.core.segment.processing.genericrow.FileWriter; import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileManager; -import org.apache.pinot.core.segment.processing.genericrow.GenericRowFileWriter; import org.apache.pinot.core.segment.processing.partitioner.Partitioner; import org.apache.pinot.core.segment.processing.partitioner.PartitionerConfig; import org.apache.pinot.core.segment.processing.partitioner.PartitionerFactory; @@ -245,7 +245,7 @@ private void writeRecord(GenericRow row) } // Get the file writer. - GenericRowFileWriter fileWriter = fileManager.getFileWriter(); + FileWriter fileWriter = fileManager.getFileWriter(); // Write the row. _adaptiveSizeBasedWriter.write(fileWriter, row);