Skip to content

Commit

Permalink
Refactor VectorAgg to use TupleTableSlot interface
Browse files Browse the repository at this point in the history
To support VectorAgg on top of Hypercore TAM, change the vector agg
processing functions to pass around tuple table slots instead of
compressed batches. This makes it possible to pass any compatible
"vector slot" to the vector agg functions, which is required since the
TAM uses a slightly different slot implementation for arrow/vector data.

In addition, add helper functions for reading vector data from
compatible vector slot implementations. This commit only adds the code
to read from compressed batches; Arrow slots will be supported as part
of a later change.
  • Loading branch information
erimatnor committed Jan 30, 2025
1 parent 538c443 commit 64e5ffc
Show file tree
Hide file tree
Showing 10 changed files with 116 additions and 53 deletions.
2 changes: 1 addition & 1 deletion tsl/src/nodes/vector_agg/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ vector_agg_exec(CustomScanState *node)
/*
* Finally, pass the compressed batch to the grouping policy.
*/
grouping->gp_add_batch(grouping, batch_state);
grouping->gp_add_batch(grouping, &batch_state->decompressed_scan_slot_data.base);
}

/*
Expand Down
5 changes: 3 additions & 2 deletions tsl/src/nodes/vector_agg/grouping_policy.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
*/
#pragma once

typedef struct DecompressBatchState DecompressBatchState;
#include <postgres.h>
#include <executor/tuptable.h>

typedef struct GroupingPolicy GroupingPolicy;

Expand All @@ -31,7 +32,7 @@ typedef struct GroupingPolicy
/*
* Aggregate a single compressed batch.
*/
void (*gp_add_batch)(GroupingPolicy *gp, DecompressBatchState *batch_state);
void (*gp_add_batch)(GroupingPolicy *gp, TupleTableSlot *vector_slot);

/*
* Is a partial aggregation result ready?
Expand Down
32 changes: 21 additions & 11 deletions tsl/src/nodes/vector_agg/grouping_policy_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,14 @@

#include <postgres.h>

#include <access/attnum.h>
#include <executor/tuptable.h>
#include <nodes/pg_list.h>

#include "grouping_policy.h"

#include "nodes/decompress_chunk/compressed_batch.h"
#include "nodes/vector_agg/exec.h"
#include "nodes/vector_agg/vector_slot.h"

typedef struct
{
Expand Down Expand Up @@ -113,21 +114,26 @@ gp_batch_reset(GroupingPolicy *obj)
}

static void
compute_single_aggregate(GroupingPolicyBatch *policy, DecompressBatchState *batch_state,
compute_single_aggregate(GroupingPolicyBatch *policy, TupleTableSlot *vector_slot,
VectorAggDef *agg_def, void *agg_state, MemoryContext agg_extra_mctx)
{
ArrowArray *arg_arrow = NULL;
const ArrowArray *arg_arrow = NULL;
const uint64 *arg_validity_bitmap = NULL;
Datum arg_datum = 0;
bool arg_isnull = true;
uint16 total_batch_rows = 0;
const uint64 *vector_qual_result = vector_slot_get_qual_result(vector_slot, &total_batch_rows);

/*
* We have functions with one argument, and one function with no arguments
* (count(*)). Collect the arguments.
*/
if (agg_def->input_offset >= 0)
{
CompressedColumnValues *values = &batch_state->compressed_columns[agg_def->input_offset];
const AttrNumber attnum = AttrOffsetGetAttrNumber(agg_def->input_offset);
const CompressedColumnValues *values =
vector_slot_get_compressed_column_values(vector_slot, attnum);

Assert(values->decompression_type != DT_Invalid);
Assert(values->decompression_type != DT_Iterator);

Expand All @@ -147,10 +153,10 @@ compute_single_aggregate(GroupingPolicyBatch *policy, DecompressBatchState *batc
/*
* Compute the unified validity bitmap.
*/
const size_t num_words = (batch_state->total_batch_rows + 63) / 64;
const size_t num_words = (total_batch_rows + 63) / 64;
const uint64 *filter = arrow_combine_validity(num_words,
policy->tmp_filter,
batch_state->vector_qual_result,
vector_qual_result,
agg_def->filter_result,
arg_validity_bitmap);

Expand All @@ -172,7 +178,7 @@ compute_single_aggregate(GroupingPolicyBatch *policy, DecompressBatchState *batc
* have been skipped by the caller, but we also have to check for the
* case when no rows match the aggregate FILTER clause.
*/
const int n = arrow_num_valid(filter, batch_state->total_batch_rows);
const int n = arrow_num_valid(filter, total_batch_rows);
if (n > 0)
{
agg_def->func.agg_scalar(agg_state, arg_datum, arg_isnull, n, agg_extra_mctx);
Expand All @@ -181,15 +187,17 @@ compute_single_aggregate(GroupingPolicyBatch *policy, DecompressBatchState *batc
}

static void
gp_batch_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state)
gp_batch_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot)
{
GroupingPolicyBatch *policy = (GroupingPolicyBatch *) gp;
uint16 total_batch_rows = 0;
vector_slot_get_qual_result(vector_slot, &total_batch_rows);

/*
* Allocate the temporary filter array for computing the combined results of
* batch filter, aggregate filter and column validity.
*/
const size_t num_words = (batch_state->total_batch_rows + 63) / 64;
const size_t num_words = (total_batch_rows + 63) / 64;
if (num_words > policy->num_tmp_filter_words)
{
const size_t new_words = (num_words * 2) + 1;
Expand All @@ -210,7 +218,7 @@ gp_batch_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state)
{
VectorAggDef *agg_def = &policy->agg_defs[i];
void *agg_state = policy->agg_states[i];
compute_single_aggregate(policy, batch_state, agg_def, agg_state, policy->agg_extra_mctx);
compute_single_aggregate(policy, vector_slot, agg_def, agg_state, policy->agg_extra_mctx);
}

/*
Expand All @@ -220,10 +228,12 @@ gp_batch_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state)
for (int i = 0; i < ngrp; i++)
{
GroupingColumn *col = &policy->grouping_columns[i];
const AttrNumber attnum = AttrOffsetGetAttrNumber(col->input_offset);
Assert(col->input_offset >= 0);
Assert(col->output_offset >= 0);

CompressedColumnValues *values = &batch_state->compressed_columns[col->input_offset];
const CompressedColumnValues *values =
vector_slot_get_compressed_column_values(vector_slot, attnum);
Assert(values->decompression_type == DT_Scalar);

/*
Expand Down
49 changes: 26 additions & 23 deletions tsl/src/nodes/vector_agg/grouping_policy_hash.c
Original file line number Diff line number Diff line change
Expand Up @@ -11,13 +11,15 @@

#include <postgres.h>

#include <access/attnum.h>
#include <access/tupdesc.h>
#include <executor/tuptable.h>
#include <nodes/pg_list.h>

#include "grouping_policy.h"

#include "nodes/decompress_chunk/compressed_batch.h"
#include "nodes/vector_agg/exec.h"
#include "nodes/vector_agg/vector_slot.h"

#include "grouping_policy_hash.h"

Expand Down Expand Up @@ -106,25 +108,28 @@ gp_hash_reset(GroupingPolicy *obj)
}

static void
compute_single_aggregate(GroupingPolicyHash *policy, const DecompressBatchState *batch_state,
int start_row, int end_row, const VectorAggDef *agg_def, void *agg_states)
compute_single_aggregate(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, int start_row,
int end_row, const VectorAggDef *agg_def, void *agg_states)
{
const ArrowArray *arg_arrow = NULL;
const uint64 *arg_validity_bitmap = NULL;
Datum arg_datum = 0;
bool arg_isnull = true;

uint16 total_batch_rows = 0;
const uint32 *offsets = policy->key_index_for_row;
MemoryContext agg_extra_mctx = policy->agg_extra_mctx;
const uint64 *vector_qual_result = vector_slot_get_qual_result(vector_slot, &total_batch_rows);

/*
* We have functions with one argument, and one function with no arguments
* (count(*)). Collect the arguments.
*/
if (agg_def->input_offset >= 0)
{
const AttrNumber attnum = AttrOffsetGetAttrNumber(agg_def->input_offset);
const CompressedColumnValues *values =
&batch_state->compressed_columns[agg_def->input_offset];
vector_slot_get_compressed_column_values(vector_slot, attnum);

Assert(values->decompression_type != DT_Invalid);
Assert(values->decompression_type != DT_Iterator);

Expand All @@ -144,11 +149,11 @@ compute_single_aggregate(GroupingPolicyHash *policy, const DecompressBatchState
/*
* Compute the unified validity bitmap.
*/
const size_t num_words = (batch_state->total_batch_rows + 63) / 64;
const size_t num_words = (total_batch_rows + 63) / 64;
const uint64 *filter = arrow_combine_validity(num_words,
policy->tmp_filter,
agg_def->filter_result,
batch_state->vector_qual_result,
vector_qual_result,
arg_validity_bitmap);

/*
Expand Down Expand Up @@ -199,13 +204,11 @@ compute_single_aggregate(GroupingPolicyHash *policy, const DecompressBatchState
}

static void
add_one_range(GroupingPolicyHash *policy, DecompressBatchState *batch_state, const int start_row,
add_one_range(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, const int start_row,
const int end_row)
{
const int num_fns = policy->num_agg_defs;

Assert(start_row < end_row);
Assert(end_row <= batch_state->total_batch_rows);

/*
* Remember which aggregation states have already existed, and which we
Expand All @@ -218,7 +221,7 @@ add_one_range(GroupingPolicyHash *policy, DecompressBatchState *batch_state, con
* Match rows to aggregation states using a hash table.
*/
Assert((size_t) end_row <= policy->num_key_index_for_row);
policy->hashing.fill_offsets(policy, batch_state, start_row, end_row);
policy->hashing.fill_offsets(policy, vector_slot, start_row, end_row);

/*
* Process the aggregate function states. We are processing single aggregate
Expand Down Expand Up @@ -256,7 +259,7 @@ add_one_range(GroupingPolicyHash *policy, DecompressBatchState *batch_state, con
* Add this batch to the states of this aggregate function.
*/
compute_single_aggregate(policy,
batch_state,
vector_slot,
start_row,
end_row,
agg_def,
Expand All @@ -275,14 +278,14 @@ add_one_range(GroupingPolicyHash *policy, DecompressBatchState *batch_state, con
}

static void
gp_hash_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state)
gp_hash_add_batch(GroupingPolicy *gp, TupleTableSlot *vector_slot)
{
GroupingPolicyHash *policy = (GroupingPolicyHash *) gp;
uint16 n;
const uint64 *restrict filter = vector_slot_get_qual_result(vector_slot, &n);

Assert(!policy->returning_results);

const int n = batch_state->total_batch_rows;

/*
* Initialize the array for storing the aggregate state offsets corresponding
* to a given batch row. We don't need the offsets for the previous batch
Expand Down Expand Up @@ -317,24 +320,24 @@ gp_hash_add_batch(GroupingPolicy *gp, DecompressBatchState *batch_state)
for (int i = 0; i < policy->num_grouping_columns; i++)
{
const GroupingColumn *def = &policy->grouping_columns[i];
const CompressedColumnValues *values = &batch_state->compressed_columns[def->input_offset];
policy->current_batch_grouping_column_values[i] = *values;
}

policy->current_batch_grouping_column_values[i] =
*vector_slot_get_compressed_column_values(vector_slot,
AttrOffsetGetAttrNumber(def->input_offset));
}
/*
* Call the per-batch initialization function of the hashing strategy.
*/

policy->hashing.prepare_for_batch(policy, batch_state);
policy->hashing.prepare_for_batch(policy, vector_slot);

/*
* Add the batch rows to aggregate function states.
*/
const uint64 *restrict filter = batch_state->vector_qual_result;
add_one_range(policy, batch_state, 0, n);
add_one_range(policy, vector_slot, 0, n);

policy->stat_input_total_rows += batch_state->total_batch_rows;
policy->stat_input_valid_rows += arrow_num_valid(filter, batch_state->total_batch_rows);
policy->stat_input_total_rows += n;
policy->stat_input_valid_rows += arrow_num_valid(filter, n);
}

static bool
Expand Down
8 changes: 6 additions & 2 deletions tsl/src/nodes/vector_agg/hashing/batch_hashing_params.h
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@

#pragma once

#include "nodes/vector_agg/grouping_policy_hash.h"
#include "nodes/vector_agg/vector_slot.h"

/*
* The data required to map the rows of the given compressed batch to the unique
* indexes of grouping keys, using a hash table.
Expand All @@ -21,11 +24,12 @@ typedef struct BatchHashingParams
} BatchHashingParams;

static pg_attribute_always_inline BatchHashingParams
build_batch_hashing_params(GroupingPolicyHash *policy, DecompressBatchState *batch_state)
build_batch_hashing_params(GroupingPolicyHash *policy, TupleTableSlot *vector_slot)
{
uint16 nrows;
BatchHashingParams params = {
.policy = policy,
.batch_filter = batch_state->vector_qual_result,
.batch_filter = vector_slot_get_qual_result(vector_slot, &nrows),
.result_key_indexes = policy->key_index_for_row,
};

Expand Down
6 changes: 3 additions & 3 deletions tsl/src/nodes/vector_agg/hashing/hash_strategy_common.c
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,11 @@
* allocations in the hot loop that fills the hash table.
*/
void
hash_strategy_output_key_alloc(GroupingPolicyHash *policy, DecompressBatchState *batch_state)
hash_strategy_output_key_alloc(GroupingPolicyHash *policy, uint16 nrows)
{
HashingStrategy *hashing = &policy->hashing;
const int n = batch_state->total_batch_rows;
const uint32 num_possible_keys = policy->last_used_key_index + 1 + n;
const uint32 num_possible_keys = policy->last_used_key_index + 1 + nrows;

if (num_possible_keys > hashing->num_allocated_output_keys)
{
hashing->num_allocated_output_keys = num_possible_keys * 2 + 1;
Expand Down
15 changes: 9 additions & 6 deletions tsl/src/nodes/vector_agg/hashing/hash_strategy_impl.c
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
*/

#include "batch_hashing_params.h"
#include "nodes/vector_agg/vector_slot.h"

/*
* The hash table maps the value of the grouping key to its unique index.
Expand Down Expand Up @@ -61,10 +62,12 @@ FUNCTION_NAME(hash_strategy_reset)(HashingStrategy *hashing)

static void
FUNCTION_NAME(hash_strategy_prepare_for_batch)(GroupingPolicyHash *policy,
DecompressBatchState *batch_state)
TupleTableSlot *vector_slot)
{
hash_strategy_output_key_alloc(policy, batch_state);
FUNCTION_NAME(key_hashing_prepare_for_batch)(policy, batch_state);
uint16 nrows = 0;
vector_slot_get_qual_result(vector_slot, &nrows);
hash_strategy_output_key_alloc(policy, nrows);
FUNCTION_NAME(key_hashing_prepare_for_batch)(policy, vector_slot);
}

/*
Expand Down Expand Up @@ -158,12 +161,12 @@ FUNCTION_NAME(fill_offsets_impl)(BatchHashingParams params, int start_row, int e
}

static void
FUNCTION_NAME(fill_offsets)(GroupingPolicyHash *policy, DecompressBatchState *batch_state,
int start_row, int end_row)
FUNCTION_NAME(fill_offsets)(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, int start_row,
int end_row)
{
Assert((size_t) end_row <= policy->num_key_index_for_row);

BatchHashingParams params = build_batch_hashing_params(policy, batch_state);
BatchHashingParams params = build_batch_hashing_params(policy, vector_slot);

FUNCTION_NAME(fill_offsets_impl)(params, start_row, end_row);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ FUNCTION_NAME(key_hashing_init)(HashingStrategy *hashing)

static void
FUNCTION_NAME(key_hashing_prepare_for_batch)(GroupingPolicyHash *policy,
DecompressBatchState *batch_state)
TupleTableSlot *vector_slot)
{
}

Expand Down
8 changes: 4 additions & 4 deletions tsl/src/nodes/vector_agg/hashing/hashing_strategy.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ typedef struct HashingStrategy
void (*init)(HashingStrategy *hashing, GroupingPolicyHash *policy);
void (*reset)(HashingStrategy *hashing);
uint64 (*get_size_bytes)(HashingStrategy *hashing);
void (*prepare_for_batch)(GroupingPolicyHash *policy, DecompressBatchState *batch_state);
void (*fill_offsets)(GroupingPolicyHash *policy, DecompressBatchState *batch_state,
int start_row, int end_row);
void (*prepare_for_batch)(GroupingPolicyHash *policy, TupleTableSlot *vector_slot);
void (*fill_offsets)(GroupingPolicyHash *policy, TupleTableSlot *vector_slot, int start_row,
int end_row);
void (*emit_key)(GroupingPolicyHash *policy, uint32 current_key,
TupleTableSlot *aggregated_slot);

Expand Down Expand Up @@ -56,6 +56,6 @@ typedef struct HashingStrategy
uint32 null_key_index;
} HashingStrategy;

void hash_strategy_output_key_alloc(GroupingPolicyHash *policy, DecompressBatchState *batch_state);
void hash_strategy_output_key_alloc(GroupingPolicyHash *policy, uint16 nrows);
void hash_strategy_output_key_single_emit(GroupingPolicyHash *policy, uint32 current_key,
TupleTableSlot *aggregated_slot);
Loading

0 comments on commit 64e5ffc

Please sign in to comment.