Skip to content

Commit

Permalink
split out CursorMakerFactory interface, adjustments
Browse files (browse the repository at this point in the history)
  • Loading branch information
clintropolis committed Jul 26, 2024
1 parent d22e1cf commit 6c32775
Show file tree
Hide file tree
Showing 23 changed files with 183 additions and 127 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,9 @@ public void setUp()
@Test
public void testRead()
{
try (final CursorMaker maker = makeCursor(); final VectorCursor cursor = maker.makeVectorCursor()) {
try (final CursorMaker maker = makeCursor();
final VectorCursor cursor = maker.makeVectorCursor()
) {
final Supplier<Object[]> qualitySupplier = ColumnProcessors.makeVectorProcessor(
"quality",
ToObjectVectorColumnProcessorFactory.INSTANCE,
Expand Down Expand Up @@ -184,7 +186,9 @@ private CursorMaker makeCursor()

private List<Object> readColumn(final String column, final int limit)
{
try (final CursorMaker maker = makeCursor(); final VectorCursor cursor = maker.makeVectorCursor()) {
try (final CursorMaker maker = makeCursor();
final VectorCursor cursor = maker.makeVectorCursor()
) {
final Supplier<Object[]> supplier = ColumnProcessors.makeVectorProcessor(
column,
ToObjectVectorColumnProcessorFactory.INSTANCE,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,9 @@ private void addFrame(final Frame frame)
final StorageAdapter adapter = new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY);
try (final CursorMaker maker = adapter.asCursorMaker(CursorBuildSpec.FULL_SCAN)) {
final Cursor cursor = maker.makeCursor();
if (cursor == null) {
return;
}

final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,10 @@ private void exportFrame(final Frame frame)
final StorageAdapter adapter = new FrameStorageAdapter(frame, frameReader, Intervals.ETERNITY);
try (final CursorMaker maker = adapter.asCursorMaker(CursorBuildSpec.FULL_SCAN)) {
final Cursor cursor = maker.makeCursor();
if (cursor == null) {
exportWriter.writeRowEnd();
return;
}
final ColumnSelectorFactory columnSelectorFactory = cursor.getColumnSelectorFactory();

//noinspection rawtypes
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,11 @@
import org.apache.druid.segment.BaseLongColumnValueSelector;
import org.apache.druid.segment.BaseObjectColumnValueSelector;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorMaker;
import org.apache.druid.segment.DimensionSelector;
import org.apache.druid.segment.QueryableIndex;
import org.apache.druid.segment.QueryableIndexStorageAdapter;
import org.apache.druid.segment.VirtualColumns;
import org.apache.druid.segment.column.ColumnHolder;
import org.apache.druid.segment.data.IndexedInts;
import org.apache.druid.segment.filter.Filters;
Expand Down Expand Up @@ -214,108 +215,99 @@ public SegmentReader(
@Override
public Sequence<InputRow> apply(WindowedStorageAdapter adapter)
{
return Sequences.concat(
Sequences.map(
adapter.getAdapter().makeCursors(
Filters.toFilter(dimFilter),
adapter.getInterval(),
VirtualColumns.EMPTY,
Granularities.ALL,
false,
null
), new Function<Cursor, Sequence<InputRow>>()
final CursorBuildSpec buildSpec = CursorBuildSpec.builder()
.setFilter(Filters.toFilter(dimFilter))
.setInterval(adapter.getInterval())
.setGranularity(Granularities.ALL)
.build();
final CursorMaker maker = adapter.getAdapter().asCursorMaker(buildSpec);
final Cursor cursor = maker.makeCursor();
if (cursor == null) {
return Sequences.empty();
}
final BaseLongColumnValueSelector timestampColumnSelector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);

final Map<String, DimensionSelector> dimSelectors = new HashMap<>();
for (String dim : dims) {
final DimensionSelector dimSelector = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec(dim, dim));
// dimSelector is null if the dimension is not present
if (dimSelector != null) {
dimSelectors.put(dim, dimSelector);
}
}

final Map<String, BaseObjectColumnValueSelector> metSelectors = new HashMap<>();
for (String metric : metrics) {
final BaseObjectColumnValueSelector metricSelector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(metric);
metSelectors.put(metric, metricSelector);
}

return Sequences.simple(
new Iterable<InputRow>()
{
@Override
public Iterator<InputRow> iterator()
{
return new Iterator<InputRow>()
{
@Nullable
@Override
public Sequence<InputRow> apply(final Cursor cursor)
public boolean hasNext()
{
final BaseLongColumnValueSelector timestampColumnSelector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(ColumnHolder.TIME_COLUMN_NAME);

final Map<String, DimensionSelector> dimSelectors = new HashMap<>();
for (String dim : dims) {
final DimensionSelector dimSelector = cursor
.getColumnSelectorFactory()
.makeDimensionSelector(new DefaultDimensionSpec(dim, dim));
// dimSelector is null if the dimension is not present
if (dimSelector != null) {
dimSelectors.put(dim, dimSelector);
return !cursor.isDone();
}

@Override
public InputRow next()
{
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.getLong();
theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp));

for (Map.Entry<String, DimensionSelector> dimSelector :
dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
final DimensionSelector selector = dimSelector.getValue();
final IndexedInts vals = selector.getRow();

int valsSize = vals.size();
if (valsSize == 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else if (valsSize > 1) {
List<String> dimVals = new ArrayList<>(valsSize);
for (int i = 0; i < valsSize; ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
}
}

final Map<String, BaseObjectColumnValueSelector> metSelectors = new HashMap<>();
for (String metric : metrics) {
final BaseObjectColumnValueSelector metricSelector =
cursor.getColumnSelectorFactory().makeColumnValueSelector(metric);
metSelectors.put(metric, metricSelector);
for (Map.Entry<String, BaseObjectColumnValueSelector> metSelector :
metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final BaseObjectColumnValueSelector selector = metSelector.getValue();
Object value = selector.getObject();
if (value != null) {
theEvent.put(metric, value);
}
}
cursor.advance();
return new MapBasedInputRow(timestamp, dims, theEvent);
}

return Sequences.simple(
new Iterable<InputRow>()
{
@Override
public Iterator<InputRow> iterator()
{
return new Iterator<InputRow>()
{
@Override
public boolean hasNext()
{
return !cursor.isDone();
}

@Override
public InputRow next()
{
final Map<String, Object> theEvent = Maps.newLinkedHashMap();
final long timestamp = timestampColumnSelector.getLong();
theEvent.put(TimestampSpec.DEFAULT_COLUMN, DateTimes.utc(timestamp));

for (Map.Entry<String, DimensionSelector> dimSelector :
dimSelectors.entrySet()) {
final String dim = dimSelector.getKey();
final DimensionSelector selector = dimSelector.getValue();
final IndexedInts vals = selector.getRow();

int valsSize = vals.size();
if (valsSize == 1) {
final String dimVal = selector.lookupName(vals.get(0));
theEvent.put(dim, dimVal);
} else if (valsSize > 1) {
List<String> dimVals = new ArrayList<>(valsSize);
for (int i = 0; i < valsSize; ++i) {
dimVals.add(selector.lookupName(vals.get(i)));
}
theEvent.put(dim, dimVals);
}
}

for (Map.Entry<String, BaseObjectColumnValueSelector> metSelector :
metSelectors.entrySet()) {
final String metric = metSelector.getKey();
final BaseObjectColumnValueSelector selector = metSelector.getValue();
Object value = selector.getObject();
if (value != null) {
theEvent.put(metric, value);
}
}
cursor.advance();
return new MapBasedInputRow(timestamp, dims, theEvent);
}

@Override
public void remove()
{
throw new UnsupportedOperationException("Remove Not Supported");
}
};
}
}
);
@Override
public void remove()
{
throw new UnsupportedOperationException("Remove Not Supported");
}
}
)
);
};
}
}
).withBaggage(maker);
}
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.MapInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.java.util.common.CloseableIterators;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.java.util.common.guava.Sequence;
import org.apache.druid.java.util.common.guava.Sequences;
Expand Down Expand Up @@ -132,6 +133,9 @@ protected CloseableIterator<Map<String, Object>> intermediateRowIterator() throw

final CursorMaker maker = storageAdapter.getAdapter().asCursorMaker(cursorBuildSpec);
final Cursor cursor = maker.makeCursor();
if (cursor == null) {
return CloseableIterators.wrap(Collections.emptyIterator(), maker);
}

// Retain order of columns from the original segments. Useful for preserving dimension order if we're in
// schemaless mode.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public int fieldCount()
/**
* Read a particular field value as an object.
*
* For performance reasons, prefer {@link org.apache.druid.frame.read.FrameReader#makeCursorFactory}
* For performance reasons, prefer {@link org.apache.druid.frame.read.FrameReader#makeCursorMakerFactory}
* for reading many rows out of a frame.
*/
public Object readField(final Memory memory, final long rowPosition, final long rowLength, final int fieldNumber)
Expand All @@ -77,7 +77,7 @@ public Object readField(final Memory memory, final long rowPosition, final long
/**
* Read an entire row as a list of objects.
*
* For performance reasons, prefer {@link org.apache.druid.frame.read.FrameReader#makeCursorFactory}
* For performance reasons, prefer {@link org.apache.druid.frame.read.FrameReader#makeCursorMakerFactory}
* for reading many rows out of a frame.
*/
public List<Object> readRow(final Memory memory, final long rowPosition, final long rowLength)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@
import org.apache.druid.frame.key.KeyColumn;
import org.apache.druid.frame.read.columnar.FrameColumnReader;
import org.apache.druid.frame.read.columnar.FrameColumnReaders;
import org.apache.druid.frame.segment.row.FrameCursorFactory;
import org.apache.druid.frame.segment.row.FrameCursorMakerFactory;
import org.apache.druid.frame.write.FrameWriterUtils;
import org.apache.druid.java.util.common.IAE;
import org.apache.druid.java.util.common.ISE;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorMakerFactory;
import org.apache.druid.segment.column.ColumnCapabilities;
import org.apache.druid.segment.column.ColumnType;
import org.apache.druid.segment.column.RowSignature;
Expand Down Expand Up @@ -136,15 +136,15 @@ public ColumnCapabilities columnCapabilities(final Frame frame, final String col
}

/**
* Create a {@link CursorFactory} for the given frame.
* Create a {@link CursorMakerFactory} for the given frame.
*/
public CursorFactory makeCursorFactory(final Frame frame)
public CursorMakerFactory makeCursorMakerFactory(final Frame frame)
{
switch (frame.type()) {
case COLUMNAR:
return new org.apache.druid.frame.segment.columnar.FrameCursorFactory(frame, signature, columnReaders);
return new org.apache.druid.frame.segment.columnar.FrameCursorMakerFactory(frame, signature, columnReaders);
case ROW_BASED:
return new FrameCursorFactory(frame, this, fieldReaders);
return new FrameCursorMakerFactory(frame, this, fieldReaders);
default:
throw new ISE("Unrecognized frame type [%s]", frame.type());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,15 @@

package org.apache.druid.frame.segment;

import org.apache.druid.frame.segment.columnar.FrameCursorMakerFactory;
import org.apache.druid.query.BaseQuery;
import org.apache.druid.segment.ColumnSelectorFactory;
import org.apache.druid.segment.Cursor;
import org.apache.druid.segment.SimpleSettableOffset;

/**
* An implementation of {@link Cursor} used by {@link org.apache.druid.frame.segment.row.FrameCursorFactory}
* and {@link org.apache.druid.frame.segment.columnar.FrameCursorFactory}.
* An implementation of {@link Cursor} used by {@link org.apache.druid.frame.segment.row.FrameCursorMakerFactory}
* and {@link FrameCursorMakerFactory}.
*
* Adds the methods {@link #getCurrentRow()} and {@link #setCurrentRow(int)} so the cursor can be moved to
* particular rows.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@
import org.apache.druid.frame.Frame;
import org.apache.druid.frame.read.FrameReader;
import org.apache.druid.segment.CursorBuildSpec;
import org.apache.druid.segment.CursorFactory;
import org.apache.druid.segment.CursorMaker;
import org.apache.druid.segment.CursorMakerFactory;
import org.apache.druid.segment.DimensionDictionarySelector;
import org.apache.druid.segment.Metadata;
import org.apache.druid.segment.StorageAdapter;
Expand All @@ -47,14 +47,14 @@ public class FrameStorageAdapter implements StorageAdapter
private final Frame frame;
private final FrameReader frameReader;
private final Interval interval;
private final CursorFactory cursorFactory;
private final CursorMakerFactory cursorFactory;

public FrameStorageAdapter(Frame frame, FrameReader frameReader, Interval interval)
{
this.frame = frame;
this.frameReader = frameReader;
this.interval = interval;
this.cursorFactory = frameReader.makeCursorFactory(frame);
this.cursorFactory = frameReader.makeCursorMakerFactory(frame);
}

@Override
Expand Down
Loading

0 comments on commit 6c32775

Please sign in to comment.