Skip to content

Commit

Permalink
CLP as a compressionCodec (apache#12504)
Browse files Browse the repository at this point in the history
---------

Co-authored-by: Saurabh Dubey <[email protected]>
Co-authored-by: Saurabh Dubey <[email protected]>
Co-authored-by: Seunghyun Lee <[email protected]>
  • Loading branch information
4 people authored Mar 21, 2024
1 parent ddc3d0b commit 02d1c12
Show file tree
Hide file tree
Showing 29 changed files with 1,388 additions and 23 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,150 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pinot.integration.tests;

import java.io.File;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import javax.annotation.Nullable;
import org.apache.pinot.spi.config.table.FieldConfig;
import org.apache.pinot.spi.config.table.TableConfig;
import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
import org.apache.pinot.spi.config.table.ingestion.TransformConfig;
import org.apache.pinot.spi.data.Schema;
import org.apache.pinot.util.TestUtils;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;


/**
 * Realtime integration test for the CLP compression codec on a raw (no-dictionary) STRING column.
 *
 * <p>Sets up a single-server Pinot cluster with Kafka, ingests Avro log data into a realtime
 * table whose {@code logLine} column is configured with
 * {@link FieldConfig.CompressionCodec#CLP}, and verifies the data is queryable (including
 * {@code REGEXP_LIKE} over the CLP-encoded column). A small flush size is used so multiple
 * segments are created and committed during the test.
 */
public class CLPEncodingRealtimeIntegrationTest extends BaseClusterIntegrationTestSet {
  private List<File> _avroFiles;

  @BeforeClass
  public void setUp()
      throws Exception {
    TestUtils.ensureDirectoriesExistAndEmpty(_tempDir, _segmentDir, _tarDir);
    _avroFiles = unpackAvroData(_tempDir);

    // Start the Pinot cluster
    startZk();
    // Start a customized controller with more frequent realtime segment validation
    startController();
    startBroker();
    startServers(1);

    // Push the test data into Kafka before creating the table so ingestion starts immediately.
    startKafka();
    pushAvroIntoKafka(_avroFiles);

    Schema schema = createSchema();
    addSchema(schema);
    TableConfig tableConfig = createRealtimeTableConfig(_avroFiles.get(0));
    addTableConfig(tableConfig);

    waitForAllDocsLoaded(600_000L);
  }

  // No auxiliary indexes are needed for this test; only the CLP-encoded raw forward index matters.

  @Nullable
  @Override
  protected List<String> getInvertedIndexColumns() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getRangeIndexColumns() {
    return null;
  }

  @Nullable
  @Override
  protected List<String> getBloomFilterColumns() {
    return null;
  }

  @Nullable
  @Override
  protected String getSortedColumn() {
    return null;
  }

  @Override
  protected List<String> getNoDictionaryColumns() {
    // CLP is a raw-encoding codec, so the column must be no-dictionary.
    return Collections.singletonList("logLine");
  }

  @Test
  public void testValues()
      throws Exception {
    // Verify REGEXP_LIKE works against the CLP-encoded column and matches the expected row count.
    Assert.assertEquals(getPinotConnection().execute(
        "SELECT count(*) FROM " + getTableName() + " WHERE REGEXP_LIKE(logLine, '.*executor.*')").getResultSet(0)
        .getLong(0), 53);
  }

  @Override
  protected int getRealtimeSegmentFlushSize() {
    // Small flush size so the 100-doc data set produces multiple committed segments.
    return 30;
  }

  @Override
  protected long getCountStarResult() {
    return 100;
  }

  @Override
  protected String getTableName() {
    return "clpEncodingIT";
  }

  @Override
  protected String getAvroTarFileName() {
    return "clpEncodingITData.tar.gz";
  }

  @Override
  protected String getSchemaFileName() {
    return "clpEncodingRealtimeIntegrationTestSchema.schema";
  }

  @Override
  protected String getTimeColumnName() {
    return "timestampInEpoch";
  }

  @Override
  protected List<FieldConfig> getFieldConfigs() {
    // Configure logLine as RAW-encoded with the CLP compression codec — the feature under test.
    List<FieldConfig> fieldConfigs = new ArrayList<>();
    fieldConfigs.add(
        new FieldConfig("logLine", FieldConfig.EncodingType.RAW, null, null, FieldConfig.CompressionCodec.CLP, null,
            null, null, null));

    return fieldConfigs;
  }

  @Override
  protected IngestionConfig getIngestionConfig() {
    // The Avro data has no usable time column, so derive the time column at ingestion time.
    List<TransformConfig> transforms = new ArrayList<>();
    transforms.add(new TransformConfig("timestampInEpoch", "now()"));

    IngestionConfig ingestionConfig = new IngestionConfig();
    ingestionConfig.setTransformConfigs(transforms);

    return ingestionConfig;
  }
}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
{
"schemaName": "clpEncodingIT",
"dimensionFieldSpecs": [
{
"name": "logLine",
"dataType": "STRING"
}
],
"dateTimeFieldSpecs": [
{
"name": "timestampInEpoch",
"dataType": "LONG",
"notNull": false,
"format": "1:MILLISECONDS:EPOCH",
"granularity": "1:MILLISECONDS"
}
]
}
4 changes: 4 additions & 0 deletions pinot-segment-local/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -151,5 +151,9 @@
<type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.yscope.clp</groupId>
<artifactId>clp-ffi</artifactId>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pinot.segment.local.io.util;

import java.util.List;
import org.apache.pinot.segment.spi.index.reader.ForwardIndexReader;
import org.apache.pinot.segment.spi.memory.PinotDataBuffer;

import static java.nio.charset.StandardCharsets.UTF_8;
Expand Down Expand Up @@ -101,6 +103,15 @@ public String getUnpaddedString(int index, int numBytesPerValue, byte[] buffer)
return new String(buffer, 0, length, UTF_8);
}

/**
 * Appends the byte ranges that a read of value {@code index} touches: the two consecutive
 * int offset entries located at the value's slot in the offset section, followed by the
 * value's own bytes in the data section.
 *
 * @param index index of the value whose ranges are recorded
 * @param baseOffset offset of this buffer within the enclosing file/buffer, added to every range
 * @param rangeList output list to which the two ranges are appended
 */
public void recordOffsetRanges(int index, long baseOffset, List<ForwardIndexReader.ByteRange> rangeList) {
  int offsetEntryPosition = _dataSectionStartOffSet + Integer.BYTES * index;
  int valueStart = _dataBuffer.getInt(offsetEntryPosition);
  int valueEnd = _dataBuffer.getInt(offsetEntryPosition + Integer.BYTES);
  // Range covering the pair of int offsets read above.
  rangeList.add(new ForwardIndexReader.ByteRange(baseOffset + offsetEntryPosition, 2 * Integer.BYTES));
  // Range covering the value bytes themselves.
  rangeList.add(new ForwardIndexReader.ByteRange(baseOffset + valueStart, valueEnd - valueStart));
}

@Override
public String getPaddedString(int index, int numBytesPerValue, byte[] buffer) {
throw new UnsupportedOperationException();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ public class FixedBitMVForwardIndexWriter implements Closeable {
private int _nextDocId = 0;

public FixedBitMVForwardIndexWriter(File file, int numDocs, int totalNumValues, int numBitsPerValue)
throws Exception {
throws IOException {
float averageValuesPerDoc = totalNumValues / numDocs;
_docsPerChunk = (int) (Math.ceil(PREFERRED_NUM_VALUES_PER_CHUNK / averageValuesPerDoc));
_numChunks = (numDocs + _docsPerChunk - 1) / _docsPerChunk;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ public class FixedBitSVForwardIndexWriter implements Closeable {
private int _nextDocId = 0;

public FixedBitSVForwardIndexWriter(File file, int numDocs, int numBitsPerValue)
throws Exception {
throws IOException {
// Convert to long in order to avoid int overflow
long length = ((long) numDocs * numBitsPerValue + Byte.SIZE - 1) / Byte.SIZE;
// Backward-compatible: index file is always big-endian
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import com.google.common.base.Preconditions;
import java.util.Map;
import java.util.Set;
import org.apache.pinot.segment.local.realtime.impl.forward.CLPMutableForwardIndex;
import org.apache.pinot.segment.local.segment.creator.impl.stats.CLPStatsProvider;
import org.apache.pinot.segment.spi.creator.ColumnStatistics;
import org.apache.pinot.segment.spi.datasource.DataSource;
import org.apache.pinot.segment.spi.datasource.DataSourceMetadata;
Expand All @@ -30,7 +32,7 @@
import static org.apache.pinot.segment.spi.Constants.UNKNOWN_CARDINALITY;


public class MutableNoDictionaryColStatistics implements ColumnStatistics {
public class MutableNoDictionaryColStatistics implements ColumnStatistics, CLPStatsProvider {
private final DataSourceMetadata _dataSourceMetadata;
private final MutableForwardIndex _forwardIndex;

Expand Down Expand Up @@ -111,4 +113,13 @@ public Map<String, String> getPartitionFunctionConfig() {
public Set<Integer> getPartitions() {
return _dataSourceMetadata.getPartitions();
}

@Override
public CLPStats getCLPStats() {
  // CLP stats only exist when the underlying mutable forward index is CLP-encoded.
  if (!(_forwardIndex instanceof CLPMutableForwardIndex)) {
    throw new IllegalStateException(
        "CLP stats not available for column: " + _dataSourceMetadata.getFieldSpec().getName());
  }
  return ((CLPMutableForwardIndex) _forwardIndex).getCLPStats();
}
}
Loading

0 comments on commit 02d1c12

Please sign in to comment.