Minor refactors to processing
Some refactors across Druid to clean up the code and add utility functions where required.
adarshsanjeev authored Nov 21, 2024
1 parent 17215cd commit 2726c6f
Showing 25 changed files with 621 additions and 254 deletions.
@@ -511,7 +511,6 @@ public String getFormatString()
         binder -> binder.bind(SegmentManager.class).toInstance(EasyMock.createMock(SegmentManager.class)),
         new JoinableFactoryModule(),
         new IndexingServiceTuningConfigModule(),
-        new MSQIndexingModule(),
         Modules.override(new MSQSqlModule()).with(
             binder -> {
               // Our Guice configuration currently requires bindings to exist even if they aren't ever used, the
@@ -540,6 +539,7 @@ public String getFormatString()
 
     objectMapper = setupObjectMapper(injector);
     objectMapper.registerModules(new StorageConnectorModule().getJacksonModules());
+    objectMapper.registerModules(new MSQIndexingModule().getJacksonModules());
     objectMapper.registerModules(sqlModule.getJacksonModules());
     objectMapper.registerModules(BuiltInTypesModule.getJacksonModulesList());
 
@@ -697,7 +697,6 @@ protected Supplier<ResourceHolder<CompleteSegment>> getSupplierForSegment(
         break;
       default:
         throw new ISE("Cannot query segment %s in test runner", segmentId);
-
     }
     Segment segment = new Segment()
     {
@@ -19,18 +19,17 @@
 
 package org.apache.druid.query.aggregation;
 
-import com.google.common.base.Preconditions;
+import it.unimi.dsi.fastutil.longs.LongArrayList;
+import org.apache.druid.error.DruidException;
 import org.apache.druid.segment.serde.cell.IOIterator;
-import org.apache.druid.segment.serde.cell.IntSerializer;
 import org.apache.druid.segment.serde.cell.StagedSerde;
 import org.apache.druid.segment.writeout.WriteOutBytes;
 
 import javax.annotation.Nullable;
-import java.io.BufferedInputStream;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
+import java.nio.IntBuffer;
 import java.util.NoSuchElementException;
 
 /**
@@ -45,109 +44,181 @@ public class SerializedStorage<T>
 {
   private final WriteOutBytes writeOutBytes;
   private final StagedSerde<T> serde;
-  private final IntSerializer intSerializer = new IntSerializer();
+  private final ByteBuffer itemOffsetsBytes;
+  private final IntBuffer itemSizes;
 
+  private final LongArrayList rowChunkOffsets = new LongArrayList();
   private int numStored = 0;
+  private int maxSize = 0;
 
   public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde)
   {
+    this(writeOutBytes, serde, 4096);
+  }
+
+  public SerializedStorage(WriteOutBytes writeOutBytes, StagedSerde<T> serde, int chunkSize)
+  {
     this.writeOutBytes = writeOutBytes;
    this.serde = serde;
+
+    this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize * Integer.BYTES).order(ByteOrder.nativeOrder());
+    this.itemSizes = itemOffsetsBytes.asIntBuffer();
   }
 
   public void store(@Nullable T value) throws IOException
   {
     byte[] bytes = serde.serialize(value);
 
-    writeOutBytes.write(intSerializer.serialize(bytes.length));
-    writeOutBytes.write(bytes);
+    maxSize = Math.max(maxSize, bytes.length);
+    itemSizes.put(bytes.length);
+    if (bytes.length > 0) {
+      writeOutBytes.write(bytes);
+    }
+
     ++numStored;
+    if (itemSizes.remaining() == 0) {
+      rowChunkOffsets.add(writeOutBytes.size());
+      writeOutBytes.write(itemOffsetsBytes);
+      itemOffsetsBytes.clear();
+      itemSizes.clear();
+    }
   }
 
   public int numStored()
   {
     return numStored;
   }
 
   /**
    * Generates an iterator over everything that has been stored. Also signifies the end of storing objects.
    * iterator() can be called multiple times if needed, but after iterator() is called, store() can no longer be
    * called.
    *
    * @return an iterator
    * @throws IOException on failure
    */
   public IOIterator<T> iterator() throws IOException
   {
-    return new DeserializingIOIterator<>(writeOutBytes.asInputStream(), serde);
+    if (itemSizes.position() != itemSizes.limit()) {
+      rowChunkOffsets.add(writeOutBytes.size());
+      itemOffsetsBytes.limit(itemSizes.position() * Integer.BYTES);
+      writeOutBytes.write(itemOffsetsBytes);
+
+      // Move the limit to the position so that we fail subsequent writes and indicate that we are done
+      itemSizes.limit(itemSizes.position());
+    }
+
+    return new DeserializingIOIterator<>(
+        writeOutBytes,
+        rowChunkOffsets,
+        numStored,
+        itemSizes.capacity(),
+        maxSize,
+        serde
+    );
   }
 
   private static class DeserializingIOIterator<T> implements IOIterator<T>
   {
-    private static final int NEEDS_READ = -2;
-    private static final int EOF = -1;
+    private static final ByteBuffer EMPTY_BUFFER = ByteBuffer.allocate(0).asReadOnlyBuffer();
 
-    private final byte[] intBytes;
-    private final BufferedInputStream inputStream;
+    private final WriteOutBytes medium;
+    private final LongArrayList rowChunkOffsets;
+    private final int numEntries;
+    private ByteBuffer tmpBuf;
     private final StagedSerde<T> serde;
 
-    private int nextSize;
+    private final ByteBuffer itemOffsetsBytes;
+    private final int[] itemSizes;
 
+    private long itemStartOffset;
+    private int chunkId = 0;
+    private int currId = 0;
+    private int itemIndex;
+
-    public DeserializingIOIterator(InputStream inputStream, StagedSerde<T> serde)
+    public DeserializingIOIterator(
+        WriteOutBytes medium,
+        LongArrayList rowChunkOffsets,
+        int numEntries,
+        int chunkSize,
+        int maxSize,
+        StagedSerde<T> serde
+    )
     {
-      this.inputStream = new BufferedInputStream(inputStream);
+      this.medium = medium;
+      this.rowChunkOffsets = rowChunkOffsets;
+      this.numEntries = numEntries;
+      this.tmpBuf = ByteBuffer.allocate(maxSize).order(ByteOrder.nativeOrder());
       this.serde = serde;
-      intBytes = new byte[Integer.BYTES];
-      nextSize = NEEDS_READ;
+
+      this.itemOffsetsBytes = ByteBuffer.allocate(chunkSize * Integer.BYTES).order(ByteOrder.nativeOrder());
+      this.itemSizes = new int[chunkSize];
+      this.itemIndex = chunkSize;
     }
 
     @Override
-    public boolean hasNext() throws IOException
+    public boolean hasNext()
     {
-      return getNextSize() > EOF;
+      return currId < numEntries;
     }
 
     @Override
     public T next() throws IOException
     {
-      int currentNextSize = getNextSize();
-
-      if (currentNextSize == -1) {
-        throw new NoSuchElementException("end of buffer reached");
+      if (currId >= numEntries) {
+        throw new NoSuchElementException();
       }
 
-      byte[] nextBytes = new byte[currentNextSize];
-      int bytesRead = 0;
-
-      while (bytesRead < currentNextSize) {
-        int result = inputStream.read(nextBytes, bytesRead, currentNextSize - bytesRead);
-
-        if (result == -1) {
-          throw new NoSuchElementException("unexpected end of buffer reached");
-        }
-
-        bytesRead += result;
-      }
-
-      Preconditions.checkState(bytesRead == currentNextSize);
-      T value = serde.deserialize(nextBytes);
-
-      nextSize = NEEDS_READ;
-
-      return value;
-    }
-
-    private int getNextSize() throws IOException
-    {
-      if (nextSize == NEEDS_READ) {
-        int bytesRead = 0;
-
-        while (bytesRead < Integer.BYTES) {
-          int result = inputStream.read(intBytes, bytesRead, Integer.BYTES - bytesRead);
-
-          if (result == -1) {
-            nextSize = EOF;
-            return EOF;
-          } else {
-            bytesRead += result;
-          }
-        }
-        Preconditions.checkState(bytesRead == Integer.BYTES);
-
-        nextSize = ByteBuffer.wrap(intBytes).order(ByteOrder.nativeOrder()).getInt();
-      }
-
-      return nextSize;
+      if (itemIndex >= itemSizes.length) {
+        if (chunkId == 0) {
+          itemStartOffset = 0;
+        } else {
+          if (itemStartOffset != rowChunkOffsets.getLong(chunkId - 1)) {
+            throw DruidException.defensive(
+                "Should have read up to the start of the offsets [%,d], "
+                + "but for some reason the values [%,d] don't align. Possible corruption?",
+                rowChunkOffsets.getLong(chunkId - 1),
+                itemStartOffset
+            );
+          }
+          itemStartOffset += (((long) itemSizes.length) * Integer.BYTES);
+        }
+
+        int numToRead = Math.min(itemSizes.length, numEntries - (chunkId * itemSizes.length));
+        final long readOffset = rowChunkOffsets.getLong(chunkId++);
+        itemOffsetsBytes.clear();
+        itemOffsetsBytes.limit(numToRead * Integer.BYTES);
+        medium.readFully(readOffset, itemOffsetsBytes);
+        itemOffsetsBytes.flip();
+        itemOffsetsBytes.asIntBuffer().get(itemSizes, 0, numToRead);
+
+        itemIndex = 0;
+      }
+
+      int bytesToRead = itemSizes[itemIndex];
+      final T retVal;
+      if (bytesToRead == 0) {
+        retVal = serde.deserialize(EMPTY_BUFFER);
+      } else {
+        tmpBuf.clear();
+        tmpBuf.limit(bytesToRead);
+        medium.readFully(itemStartOffset, tmpBuf);
+        tmpBuf.flip();
+
+        retVal = serde.deserialize(tmpBuf);
+      }
+
+      itemStartOffset += bytesToRead;
+      ++itemIndex;
+      ++currId;
+
+      return retVal;
    }
 
     @Override
-    public void close() throws IOException
+    public void close()
     {
-      inputStream.close();
 
     }
   }
 }
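The new layout turns SerializedStorage from a size-prefixed stream into a random-access structure: values are written back to back, item sizes accumulate in a chunk buffer that is flushed into the same WriteOutBytes whenever it fills, and rowChunkOffsets remembers where each flushed chunk landed so the iterator can readFully() its way back through the data. A minimal usage sketch follows; the StagedSerde<String> is a hypothetical toy serde written only for this example (real callers use the serde implementations under org.apache.druid.segment.serde.cell), and the tiny chunk size of 4 is only there to force several offset chunks to flush:

import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.query.aggregation.SerializedStorage;
import org.apache.druid.segment.serde.cell.IOIterator;
import org.apache.druid.segment.serde.cell.StagedSerde;
import org.apache.druid.segment.serde.cell.StorableBuffer;
import org.apache.druid.segment.writeout.OnHeapMemorySegmentWriteOutMedium;

import javax.annotation.Nullable;
import java.nio.ByteBuffer;

public class SerializedStorageExample
{
  public static void main(String[] args) throws Exception
  {
    // Toy serde: UTF-8 bytes, with the empty buffer standing in for null.
    StagedSerde<String> serde = new StagedSerde<String>()
    {
      @Override
      public StorableBuffer serializeDelayed(@Nullable String value)
      {
        final byte[] bytes = value == null ? new byte[0] : StringUtils.toUtf8(value);
        return new StorableBuffer()
        {
          @Override
          public void store(ByteBuffer byteBuffer)
          {
            byteBuffer.put(bytes);
          }

          @Override
          public int getSerializedSize()
          {
            return bytes.length;
          }
        };
      }

      @Nullable
      @Override
      public String deserialize(ByteBuffer byteBuffer)
      {
        return byteBuffer.hasRemaining() ? StringUtils.fromUtf8(byteBuffer.duplicate()) : null;
      }
    };

    // A chunkSize of 4 flushes an offset chunk after every fourth store().
    SerializedStorage<String> storage = new SerializedStorage<>(
        new OnHeapMemorySegmentWriteOutMedium().makeWriteOutBytes(),
        serde,
        4
    );
    for (int i = 0; i < 10; i++) {
      storage.store("value-" + i);
    }

    // iterator() flushes the partial last chunk and ends the write phase;
    // store() will fail from this point on.
    IOIterator<String> iterator = storage.iterator();
    while (iterator.hasNext()) {
      System.out.println(iterator.next());
    }
    iterator.close();
  }
}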
@@ -456,7 +456,8 @@ public QueryableIndex load(File inDir, ObjectMapper mapper, boolean lazy, Segmen
             new StringUtf8DictionaryEncodedColumnSupplier<>(
                 index.getDimValueUtf8Lookup(dimension)::singleThreaded,
                 null,
-                Suppliers.ofInstance(index.getDimColumn(dimension))
+                Suppliers.ofInstance(index.getDimColumn(dimension)),
+                LEGACY_FACTORY.getBitmapFactory()
             )
         );
         GenericIndexed<ImmutableBitmap> bitmaps = index.getBitmapIndexes().get(dimension);
@@ -43,7 +43,7 @@
  */
 public class IndexSpec
 {
-  public static IndexSpec DEFAULT = IndexSpec.builder().build();
+  public static final IndexSpec DEFAULT = IndexSpec.builder().build();
 
   public static Builder builder()
   {
@@ -20,6 +20,7 @@
 package org.apache.druid.segment.column;
 
 import com.google.common.collect.Lists;
+import org.apache.druid.collections.bitmap.BitmapFactory;
 import org.apache.druid.common.semantic.SemanticUtils;
 import org.apache.druid.java.util.common.StringUtils;
 import org.apache.druid.query.extraction.ExtractionFn;
@@ -73,16 +74,19 @@ public class StringUtf8DictionaryEncodedColumn implements DictionaryEncodedColum
   @Nullable
   private final ColumnarMultiInts multiValueColumn;
   private final Indexed<ByteBuffer> utf8Dictionary;
+  private final BitmapFactory bitmapFactory;
 
   public StringUtf8DictionaryEncodedColumn(
       @Nullable ColumnarInts singleValueColumn,
       @Nullable ColumnarMultiInts multiValueColumn,
-      Indexed<ByteBuffer> utf8Dictionary
+      Indexed<ByteBuffer> utf8Dictionary,
+      BitmapFactory bitmapFactory
   )
   {
     this.column = singleValueColumn;
     this.multiValueColumn = multiValueColumn;
     this.utf8Dictionary = utf8Dictionary;
+    this.bitmapFactory = bitmapFactory;
   }
 
   @Override
@@ -135,6 +139,11 @@ public int getCardinality()
     return utf8Dictionary.size();
   }
 
+  public BitmapFactory getBitmapFactory()
+  {
+    return bitmapFactory;
+  }
+
   @Override
   public HistoricalDimensionSelector makeDimensionSelector(
       final ReadableOffset offset,
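With the constructor change above, anything that builds a StringUtf8DictionaryEncodedColumn by hand now supplies the BitmapFactory along with the dictionary, and can read it back through the new getter. A small hypothetical construction sketch (the column-part variables are placeholders, not code from this commit):

// Hypothetical call site; singleValueColumn and utf8Dictionary would come from
// the deserialized column parts, as in the IndexIO change above.
StringUtf8DictionaryEncodedColumn column = new StringUtf8DictionaryEncodedColumn(
    singleValueColumn,          // @Nullable ColumnarInts
    null,                       // @Nullable ColumnarMultiInts (single-valued column)
    utf8Dictionary,             // Indexed<ByteBuffer>
    new RoaringBitmapFactory()  // the newly required BitmapFactory
);
BitmapFactory factory = column.getBitmapFactory();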
@@ -36,6 +36,7 @@ public class BlockLayoutColumnarDoublesSupplier implements Supplier<ColumnarDoub
 
   // The number of doubles per buffer.
   private final int sizePer;
+  private final CompressionStrategy strategy;
 
   public BlockLayoutColumnarDoublesSupplier(
       int totalSize,
@@ -45,7 +46,8 @@ public BlockLayoutColumnarDoublesSupplier(
       CompressionStrategy strategy
   )
   {
-    baseDoubleBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
+    this.strategy = strategy;
+    this.baseDoubleBuffers = GenericIndexed.read(fromBuffer, DecompressingByteBufferObjectStrategy.of(byteOrder, strategy));
     this.totalSize = totalSize;
     this.sizePer = sizePer;
   }
@@ -78,7 +80,8 @@ public double get(int index)
     }
   }
 
-  private class BlockLayoutColumnarDoubles implements ColumnarDoubles
+  // This needs to be a public class so that SemanticCreator is able to call it.
+  public class BlockLayoutColumnarDoubles implements ColumnarDoubles
   {
     final Indexed<ResourceHolder<ByteBuffer>> singleThreadedDoubleBuffers = baseDoubleBuffers.singleThreaded();
 
@@ -91,6 +94,11 @@ private class BlockLayoutColumnarDoubles implements ColumnarDoubles
     @Nullable
     DoubleBuffer doubleBuffer;
 
+    public CompressionStrategy getCompressionStrategy()
+    {
+      return strategy;
+    }
+
     @Override
     public int size()
     {
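As the comment in the diff notes, the inner class becomes public so that the reflection-driven SemanticCreator machinery can invoke methods on it; the new getter also lets a caller holding the concrete type read the column's compression strategy back. A hedged sketch (supplier is a placeholder for a BlockLayoutColumnarDoublesSupplier built while loading a compressed double column):

ColumnarDoubles doubles = supplier.get();
if (doubles instanceof BlockLayoutColumnarDoublesSupplier.BlockLayoutColumnarDoubles) {
  CompressionStrategy strategy =
      ((BlockLayoutColumnarDoublesSupplier.BlockLayoutColumnarDoubles) doubles).getCompressionStrategy();
  // e.g. CompressionStrategy.LZ4 for the default IndexSpec
}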