Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
107668: colbuilder: fix recently exposed type schema corruption r=yuzefovich a=yuzefovich

This commit fixes a recently exposed (or introduced, depending on how you look at it) type schema corruption that can occur when planning filter expressions. In particular, the bug was exactly as described by the comment deleted in 85fd4fb:
```
// As an example, consider the following scenario in the context of
// planFilterExpr method:
// 1. r.ColumnTypes={types.Bool} with len=1 and cap=4
// 2. planSelectionOperators adds another types.Int column, so
//    filterColumnTypes={types.Bool, types.Int} with len=2 and cap=4
//    Crucially, it uses exact same underlying array as r.ColumnTypes
//    uses.
// 3. we project out second column, so r.ColumnTypes={types.Bool}
// 4. later, we add another types.Float column, so
//    r.ColumnTypes={types.Bool, types.Float}, but there is enough
//    capacity in the array, so we simply overwrite the second slot
//    with the new type which corrupts filterColumnTypes to become
//    {types.Bool, types.Float}, and we can get into a runtime type
//    mismatch situation.
```
More concretely, in `planFilterExpr` we are using the passed-in type schema to append new types for the intermediate projection operators, and then we create a "simple project op" that projects away the columns produced by those intermediate operators. If we later try to add more output columns, we will overwrite the types captured by the intermediate, projected-away operators. The bug was that the simple project op did not create a new type schema as it is supposed to.

This is now fixed, and we now enforce that the simple project op in `colbuilder` package can only be created by the helper method that explicitly returns the updated type schema, hoping that this will encourage the callers to think about the type schema management to prevent such issues in the future.

Fixes: cockroachdb#107615.

Release note: None

Co-authored-by: Yahor Yuzefovich <[email protected]>
  • Loading branch information
craig[bot] and yuzefovich committed Jul 27, 2023
2 parents bac4c79 + 80799cd commit 57b6b70
Show file tree
Hide file tree
Showing 5 changed files with 138 additions and 50 deletions.
5 changes: 4 additions & 1 deletion pkg/sql/colexec/colbuilder/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test")

go_library(
name = "colbuilder",
srcs = ["execplan.go"],
srcs = [
"execplan.go",
"execplan_util.go",
],
importpath = "github.com/cockroachdb/cockroach/pkg/sql/colexec/colbuilder",
visibility = ["//visibility:public"],
deps = [
Expand Down
72 changes: 23 additions & 49 deletions pkg/sql/colexec/colbuilder/execplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -1382,7 +1382,7 @@ func NewColOperator(

aggInput := ehj.(colexecop.Operator)
if len(hgjSpec.JoinOutputColumns) > 0 {
aggInput = colexecbase.NewSimpleProjectOp(ehj, len(hjOutputTypes), hgjSpec.JoinOutputColumns)
aggInput, _ = addProjection(aggInput, hjOutputTypes, hgjSpec.JoinOutputColumns)
}

newAggArgs := *newAggArgs
Expand Down Expand Up @@ -1668,26 +1668,19 @@ func NewColOperator(
result.ToClose = append(result.ToClose, c)
}

result.ColumnTypes = append(result.ColumnTypes, returnType)
if outputColIdx > numInputCols {
// We want to project out temporary columns (which have been added in
// between the input columns and output column) as well as include the
// new output column (which is located after any temporary columns).
// We want to project out temporary columns (which have been
// added in between the input columns and output column) as
// well as include the new output column (which is located
// after any temporary columns).
numOutputCols := numInputCols + 1
projection := make([]uint32, numOutputCols)
for i := 0; i < numInputCols; i++ {
projection[i] = uint32(i)
}
projection[numInputCols] = uint32(outputColIdx)
result.Root = colexecbase.NewSimpleProjectOp(result.Root, numOutputCols, projection)
// We need to allocate a fresh types slice because we'd
// "corrupt" the existing slice if we were to overwrite
// numInputCols'th position.
inputTypes := result.ColumnTypes[:numInputCols]
result.ColumnTypes = make([]*types.T, numInputCols+1)
copy(result.ColumnTypes, inputTypes)
result.ColumnTypes[numInputCols] = returnType
} else {
result.ColumnTypes = append(result.ColumnTypes, returnType)
result.Root, result.ColumnTypes = addProjection(result.Root, result.ColumnTypes, projection)
}

input = result.Root
Expand Down Expand Up @@ -1818,8 +1811,8 @@ func (r opResult) planAndMaybeWrapFilter(
filter execinfrapb.Expression,
factory coldata.ColumnFactory,
) error {
op, err := planFilterExpr(
ctx, flowCtx, r.Root, r.ColumnTypes, filter, args.StreamingMemAccount, factory, args.ExprHelper, &r.Releasables,
err := r.planFilterExpr(
ctx, flowCtx, filter, args.StreamingMemAccount, factory, args.ExprHelper,
)
if err != nil {
// Filter expression planning failed. Fall back to planning the filter
Expand All @@ -1837,7 +1830,6 @@ func (r opResult) planAndMaybeWrapFilter(
processorID, factory, err,
)
}
r.Root = op
return nil
}

Expand Down Expand Up @@ -1963,12 +1955,7 @@ func (r *postProcessResult) planPostProcessSpec(
}
renderedCols = append(renderedCols, uint32(outputIdx))
}
r.Op = colexecbase.NewSimpleProjectOp(r.Op, len(r.ColumnTypes), renderedCols)
newTypes := make([]*types.T, len(renderedCols))
for i, j := range renderedCols {
newTypes[i] = r.ColumnTypes[j]
}
r.ColumnTypes = newTypes
r.Op, r.ColumnTypes = addProjection(r.Op, r.ColumnTypes, renderedCols)
}
if post.Offset != 0 {
r.Op = colexec.NewOffsetOp(r.Op, post.Offset)
Expand Down Expand Up @@ -2030,54 +2017,41 @@ func (r opResult) finishScanPlanning(op colfetcher.ScanOperator, resultTypes []*
}

// planFilterExpr creates all operators to implement filter expression.
func planFilterExpr(
func (r opResult) planFilterExpr(
ctx context.Context,
flowCtx *execinfra.FlowCtx,
input colexecop.Operator,
columnTypes []*types.T,
filter execinfrapb.Expression,
acc *mon.BoundAccount,
factory coldata.ColumnFactory,
helper *colexecargs.ExprHelper,
releasables *[]execreleasable.Releasable,
) (colexecop.Operator, error) {
expr, err := helper.ProcessExpr(ctx, filter, flowCtx.EvalCtx, columnTypes)
) error {
expr, err := helper.ProcessExpr(ctx, filter, flowCtx.EvalCtx, r.ColumnTypes)
if err != nil {
return nil, err
return err
}
if expr == tree.DNull {
// The filter expression is tree.DNull meaning that it is always false, so
// we put a zero operator.
return colexecutils.NewZeroOp(input), nil
r.Root = colexecutils.NewZeroOp(r.Root)
return nil
}
op, _, filterColumnTypes, err := planSelectionOperators(
ctx, flowCtx.EvalCtx, expr, columnTypes, input, acc, factory, releasables,
ctx, flowCtx.EvalCtx, expr, r.ColumnTypes, r.Root, acc, factory, &r.Releasables,
)
if err != nil {
return nil, errors.Wrapf(err, "unable to columnarize filter expression %q", filter)
return errors.Wrapf(err, "unable to columnarize filter expression %q", filter)
}
if len(filterColumnTypes) > len(columnTypes) {
r.Root = op
if len(filterColumnTypes) > len(r.ColumnTypes) {
// Additional columns were appended to store projections while
// evaluating the filter. Project them away.
var outputColumns []uint32
for i := range columnTypes {
for i := range r.ColumnTypes {
outputColumns = append(outputColumns, uint32(i))
}
op = colexecbase.NewSimpleProjectOp(op, len(filterColumnTypes), outputColumns)
r.Root, r.ColumnTypes = addProjection(r.Root, filterColumnTypes, outputColumns)
}
return op, nil
}

// addProjection adds a simple projection on top of op according to projection
// and returns the updated operator and type schema.
func addProjection(
op colexecop.Operator, typs []*types.T, projection []uint32,
) (colexecop.Operator, []*types.T) {
newTypes := make([]*types.T, len(projection))
for i, j := range projection {
newTypes[i] = typs[j]
}
return colexecbase.NewSimpleProjectOp(op, len(typs), projection), newTypes
return nil
}

func examineLikeOp(op treecmp.ComparisonOperator) (negate bool, caseInsensitive bool) {
Expand Down
34 changes: 34 additions & 0 deletions pkg/sql/colexec/colbuilder/execplan_util.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright 2023 The Cockroach Authors.
//
// Use of this software is governed by the Business Source License
// included in the file licenses/BSL.txt.
//
// As of the Change Date specified in that file, in accordance with
// the Business Source License, use of this software will be governed
// by the Apache License, Version 2.0, included in the file
// licenses/APL.txt.

package colbuilder

import (
"github.com/cockroachdb/cockroach/pkg/sql/colexec/colexecbase"
"github.com/cockroachdb/cockroach/pkg/sql/colexecop"
"github.com/cockroachdb/cockroach/pkg/sql/types"
)

// addProjection plants a simple project operator on top of op and returns that
// operator together with the type schema of its output. The i'th output column
// is the input column at index projection[i], so its type is typs[projection[i]].
//
// The returned schema is always a freshly allocated slice, so it never shares
// a backing array with typs.
//
// Note that within the colbuilder package this helper is the only place
// permitted to construct a simple project op (a linter enforces this). The
// intent is to make every caller deal with the resulting type schema
// explicitly, guarding against type schema corruption issues like #47889 and
// #107615.
func addProjection(
	op colexecop.Operator, typs []*types.T, projection []uint32,
) (colexecop.Operator, []*types.T) {
	projectedTypes := make([]*types.T, 0, len(projection))
	for _, colIdx := range projection {
		projectedTypes = append(projectedTypes, typs[colIdx])
	}
	projectOp := colexecbase.NewSimpleProjectOp(op, len(typs), projection)
	return projectOp, projectedTypes
}
42 changes: 42 additions & 0 deletions pkg/sql/logictest/testdata/logic_test/vectorize_types
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,45 @@ INSERT INTO t64676 VALUES

statement ok
SELECT i + d FROM t64676

# Regression test for type schema corruption when planning filter expressions
# (#107615).
statement ok
CREATE TABLE t107615 AS
SELECT
g::INT2 AS _int2,
g::INT4 AS _int4,
g::INT8 AS _int8,
g::FLOAT8 AS _float8,
'2001-01-01'::DATE + g AS _date,
'2001-01-01'::TIMESTAMP + g * '1 day'::INTERVAL AS _timestamp,
'2001-01-01'::TIMESTAMPTZ + g * '1 day'::INTERVAL AS _timestamptz,
g * '1 day'::INTERVAL AS _interval,
g % 2 = 1 AS _bool,
g::DECIMAL AS _decimal,
g::STRING AS _string,
g::STRING::BYTES AS _bytes,
substring('00000000-0000-0000-0000-' || g::STRING || '00000000000', 1, 36)::UUID AS _uuid
FROM
generate_series(1, 5) AS g;
SET testing_optimizer_random_seed = 4478711114964600496;
SELECT
1.2345678901234564e+23:::FLOAT8,
_string,
_int2,
tableoid,
_int2,
'1942-08-15 21:13:20+00':::TIMESTAMPTZ,
'\xc3a0':::BYTES,
true,
_timestamp,
_date,
e'\x01':::STRING,
_uuid,
'{"test": "json"}':::JSONB,
_int4,
_interval
FROM
t107615
WHERE
(_bool OR (NOT _bool));
35 changes: 35 additions & 0 deletions pkg/testutils/lint/lint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2025,6 +2025,41 @@ func TestLint(t *testing.T) {
}
})

t.Run("TestColbuilderSimpleProject", func(t *testing.T) {
t.Parallel()
cmd, stderr, filter, err := dirCmd(
pkgDir,
"git",
"grep",
"-nE",
// We prohibit usage of colexecbase.NewSimpleProjectOp outside of
// addProjection helper in colbuilder package.
`colexecbase\.NewSimpleProjectOp`,
"--",
"sql/colexec/colbuilder*",
":!sql/colexec/colbuilder/execplan_util.go",
)
if err != nil {
t.Fatal(err)
}

if err := cmd.Start(); err != nil {
t.Fatal(err)
}

if err := stream.ForEach(filter, func(s string) {
t.Errorf("\n%s <- forbidden; use addProjection to prevent type schema corruption", s)
}); err != nil {
t.Error(err)
}

if err := cmd.Wait(); err != nil {
if out := stderr.String(); len(out) > 0 {
t.Fatalf("err=%s, stderr=%s", err, out)
}
}
})

t.Run("TestGCAssert", func(t *testing.T) {
skip.UnderShort(t)

Expand Down

0 comments on commit 57b6b70

Please sign in to comment.