Skip to content

Commit

Permalink
Vectorize predicates that use scalar array operations
Browse files Browse the repository at this point in the history
We can do this if the underlying scalar predicate is vectorizable, by
running the vector predicate on each element of the array and combining
the per-element results with OR (for ANY/IN) or AND (for ALL).
  • Loading branch information
akuzm committed Dec 6, 2023
1 parent 2431435 commit 9276312
Show file tree
Hide file tree
Showing 10 changed files with 808 additions and 53 deletions.
1 change: 1 addition & 0 deletions tsl/src/nodes/decompress_chunk/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ set(SOURCES
${CMAKE_CURRENT_SOURCE_DIR}/decompress_chunk.c
${CMAKE_CURRENT_SOURCE_DIR}/exec.c
${CMAKE_CURRENT_SOURCE_DIR}/planner.c
${CMAKE_CURRENT_SOURCE_DIR}/pred_vector_array.c
${CMAKE_CURRENT_SOURCE_DIR}/qual_pushdown.c
${CMAKE_CURRENT_SOURCE_DIR}/vector_predicates.c)
target_sources(${TSL_LIBRARY_NAME} PRIVATE ${SOURCES})
74 changes: 62 additions & 12 deletions tsl/src/nodes/decompress_chunk/compressed_batch.c
Original file line number Diff line number Diff line change
Expand Up @@ -183,21 +183,54 @@ compute_vector_quals(DecompressChunkState *chunk_state, DecompressBatchState *ba
const int bitmap_bytes = sizeof(uint64) * ((batch_state->total_batch_rows + 63) / 64);
batch_state->vector_qual_result = palloc(bitmap_bytes);
memset(batch_state->vector_qual_result, 0xFF, bitmap_bytes);
if (batch_state->total_batch_rows % 64 != 0)
{
/*
* We have to zero out the bits for past-the-end elements in the last
* bitmap word. Since all predicates are ANDed to the result bitmap,
* we can do it here once instead of doing it in each predicate.
*/
const uint64 mask = ((uint64) -1) >> (64 - batch_state->total_batch_rows % 64);
batch_state->vector_qual_result[batch_state->total_batch_rows / 64] = mask;
}

/*
* Compute the quals.
*/
ListCell *lc;
foreach (lc, chunk_state->vectorized_quals_constified)
{
/* For now we only support "Var ? Const" predicates. */
OpExpr *oe = castNode(OpExpr, lfirst(lc));
Var *var = castNode(Var, linitial(oe->args));
Const *constnode = castNode(Const, lsecond(oe->args));
/*
* For now we support "Var ? Const" predicates and
* ScalarArrayOperations.
*/
List *args = NULL;
RegProcedure vector_const_opcode = InvalidOid;
ScalarArrayOpExpr *saop = NULL;
OpExpr *opexpr = NULL;
if (IsA(lfirst(lc), ScalarArrayOpExpr))
{
saop = castNode(ScalarArrayOpExpr, lfirst(lc));
args = saop->args;
vector_const_opcode = get_opcode(saop->opno);
}
else
{
opexpr = castNode(OpExpr, lfirst(lc));
args = opexpr->args;
vector_const_opcode = get_opcode(opexpr->opno);
}

/*
* Find the vector_const predicate.
*/
VectorPredicate *vector_const_predicate = get_vector_const_predicate(vector_const_opcode);
Assert(vector_const_predicate != NULL);

/*
* Find the compressed column referred to by the Var.
*/
Var *var = castNode(Var, linitial(args));
DecompressChunkColumnDescription *column_description = NULL;
int column_index = 0;
for (; column_index < chunk_state->num_total_columns; column_index++)
Expand Down Expand Up @@ -273,20 +306,37 @@ compute_vector_quals(DecompressChunkState *chunk_state, DecompressBatchState *ba
predicate_result = &default_value_predicate_result;
}

/* Find and compute the predicate. */
void (*predicate)(const ArrowArray *, Datum, uint64 *restrict) =
get_vector_const_predicate(get_opcode(oe->opno));
Ensure(predicate != NULL,
"vectorized predicate not found for postgres predicate %d",
get_opcode(oe->opno));

/*
* The vectorizable predicates should be STRICT, so we shouldn't see null
* constants here.
*/
Const *constnode = castNode(Const, lsecond(args));
Ensure(!constnode->constisnull, "vectorized predicate called for a null value");

predicate(vector, constnode->constvalue, predicate_result);
/*
* At last, compute the predicate.
*/
if (saop)
{
vector_array_predicate(vector_const_predicate,
saop->useOr,
vector,
constnode->constvalue,
predicate_result);
}
else
{
vector_const_predicate(vector, constnode->constvalue, predicate_result);
}

/* Account for nulls which shouldn't pass the predicate. */
const size_t n = vector->length;
const size_t n_words = (n + 63) / 64;
const uint64 *restrict validity = (uint64 *restrict) vector->buffers[0];
for (size_t i = 0; i < n_words; i++)
{
predicate_result[i] &= validity[i];
}

/* Process the result. */
if (column_values->arrow == NULL)
Expand Down
12 changes: 10 additions & 2 deletions tsl/src/nodes/decompress_chunk/exec.c
Original file line number Diff line number Diff line change
Expand Up @@ -532,8 +532,16 @@ decompress_chunk_begin(CustomScanState *node, EState *estate, int eflags)
}
}

OpExpr *opexpr = castNode(OpExpr, constified);
Ensure(IsA(lsecond(opexpr->args), Const),
List *args;
if (IsA(constified, OpExpr))
{
args = castNode(OpExpr, constified)->args;
}
else
{
args = castNode(ScalarArrayOpExpr, constified)->args;
}
Ensure(IsA(lsecond(args), Const),
"failed to evaluate runtime constant in vectorized filter");
chunk_state->vectorized_quals_constified =
lappend(chunk_state->vectorized_quals_constified, constified);
Expand Down
92 changes: 67 additions & 25 deletions tsl/src/nodes/decompress_chunk/planner.c
Original file line number Diff line number Diff line change
Expand Up @@ -431,47 +431,70 @@ is_not_runtime_constant(Node *node)
static Node *
make_vectorized_qual(DecompressChunkPath *path, Node *qual)
{
/* Only simple "Var op Const" binary predicates for now. */
if (!IsA(qual, OpExpr))
/*
* Currently we vectorize some "Var op Const" binary predicates,
* and scalar array operations with these predicates.
*/
if (!IsA(qual, OpExpr) && !IsA(qual, ScalarArrayOpExpr))
{
return NULL;
}

OpExpr *o = castNode(OpExpr, qual);
List *args = NIL;
OpExpr *opexpr = NULL;
Oid opno = InvalidOid;
ScalarArrayOpExpr *saop = NULL;
if (IsA(qual, OpExpr))
{
opexpr = castNode(OpExpr, qual);
args = opexpr->args;
opno = opexpr->opno;
}
else
{
saop = castNode(ScalarArrayOpExpr, qual);
args = saop->args;
opno = saop->opno;
}

if (list_length(o->args) != 2)
if (list_length(args) != 2)
{
return NULL;
}

if (IsA(lsecond(o->args), Var))
if (opexpr && IsA(lsecond(args), Var))
{
/* Try to commute the operator if the constant is on the right. */
Oid commutator_opno = get_commutator(o->opno);
if (OidIsValid(commutator_opno))
/*
* Try to commute the operator if we have Var on the right.
*/
opno = get_commutator(opno);
if (!OidIsValid(opno))
{
o = (OpExpr *) copyObject(o);
o->opno = commutator_opno;
/*
* opfuncid is a cache, we can set it to InvalidOid like the
* CommuteOpExpr() does.
*/
o->opfuncid = InvalidOid;
o->args = list_make2(lsecond(o->args), linitial(o->args));
return NULL;
}

opexpr = (OpExpr *) copyObject(opexpr);
opexpr->opno = opno;
/*
* opfuncid is a cache, we can set it to InvalidOid like the
* CommuteOpExpr() does.
*/
opexpr->opfuncid = InvalidOid;
args = list_make2(lsecond(args), linitial(args));
opexpr->args = args;
}

/*
* We can vectorize the operation where the left side is a Var and the right
* side is a constant or can be evaluated to a constant at run time (e.g.
* contains stable functions).
*/
if (!IsA(linitial(o->args), Var) || is_not_runtime_constant(lsecond(o->args)))
if (!IsA(linitial(args), Var) || is_not_runtime_constant(lsecond(args)))
{
return NULL;
}

Var *var = castNode(Var, linitial(o->args));
Var *var = castNode(Var, linitial(args));
Assert((Index) var->varno == path->info->chunk_rel->relid);

/*
Expand All @@ -485,13 +508,26 @@ make_vectorized_qual(DecompressChunkPath *path, Node *qual)
return NULL;
}

Oid opcode = get_opcode(o->opno);
if (get_vector_const_predicate(opcode))
Oid opcode = get_opcode(opno);
if (!get_vector_const_predicate(opcode))
{
return (Node *) o;
return NULL;
}

return NULL;
#if PG14_GE
if (saop)
{
if (saop->hashfuncid)
{
/*
* Don't vectorize if the planner decided to build a hash table.
*/
return NULL;
}
}
#endif

return opexpr ? (Node *) opexpr : (Node *) saop;
}

/*
Expand Down Expand Up @@ -861,10 +897,16 @@ decompress_chunk_plan_create(PlannerInfo *root, RelOptInfo *rel, CustomPath *pat
{
elog(ERROR, "debug: encountered vector quals when they are disabled");
}
else if (ts_guc_debug_require_vector_qual == RVQ_Only &&
list_length(decompress_plan->scan.plan.qual) > 0)
else if (ts_guc_debug_require_vector_qual == RVQ_Only)
{
elog(ERROR, "debug: encountered non-vector quals when they are disabled");
if (list_length(decompress_plan->scan.plan.qual) > 0)
{
elog(ERROR, "debug: encountered non-vector quals when they are disabled");
}
if (list_length(vectorized_quals) == 0)
{
elog(ERROR, "debug: did not encounter vector quals when they are required");
}
}
#endif

Expand Down
Loading

0 comments on commit 9276312

Please sign in to comment.