Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
131645: sql,crosscluster: correctly handle NULLABLE columns in KV writer r=stevendanna a=stevendanna

Previously, we were skipping encoding values based on whether the _new_ value was NULL, producing an incorrect _previous_ value.

Further, if _all_ columns were NULL we were not correctly writing a sentinel value.

Epic: none
Release note: None

Co-authored-by: Steven Danna <[email protected]>
  • Loading branch information
craig[bot] and stevendanna committed Oct 5, 2024
2 parents b5ca162 + dc9291c commit dcce4ca
Show file tree
Hide file tree
Showing 6 changed files with 286 additions and 74 deletions.
1 change: 1 addition & 0 deletions pkg/ccl/crosscluster/logical/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ go_test(
srcs = [
"dead_letter_queue_test.go",
"logical_replication_job_test.go",
"lww_kv_processor_test.go",
"lww_row_processor_test.go",
"main_test.go",
"purgatory_test.go",
Expand Down
4 changes: 3 additions & 1 deletion pkg/ccl/crosscluster/logical/lww_kv_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -425,11 +425,13 @@ func (p *kvTableWriter) deleteRow(
var ph row.PartialIndexUpdateHelper
// TODO(dt): support partial indexes.
oth := &row.OriginTimestampCPutHelper{
OriginTimestamp: after.MvccTimestamp,
PreviousWasDeleted: before.IsDeleted(),
OriginTimestamp: after.MvccTimestamp,
// TODO(ssd): We should choose this by comparing the cluster IDs of the source
// and destination clusters.
ShouldWinTie: true,
}

return p.rd.DeleteRow(ctx, b, p.oldVals, ph, oth, false)
}

Expand Down
184 changes: 184 additions & 0 deletions pkg/ccl/crosscluster/logical/lww_kv_processor_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
// Copyright 2024 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.

package logical

import (
"context"
"fmt"
"testing"

"github.com/cockroachdb/cockroach/pkg/base"
"github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descs"
"github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils"
"github.com/cockroachdb/cockroach/pkg/sql/execinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/eval"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/testutils/serverutils"
"github.com/cockroachdb/cockroach/pkg/testutils/sqlutils"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
"github.com/cockroachdb/cockroach/pkg/util/log"
"github.com/cockroachdb/cockroach/pkg/util/randutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/stretchr/testify/require"
)

// TestKVWriterUpdateEncoding drives the KV row processor (newKVRowProcessor)
// against a table whose non-key columns are all nullable. Each subtest
// creates a fresh table, seeds it through SQL, feeds a single
// (new value, previous value) pair to ProcessRow and then checks the table
// contents through SQL. The subtests cover columns moving between NULL and
// non-NULL as well as deletes of rows with some or all columns NULL.
func TestKVWriterUpdateEncoding(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)
ctx := context.Background()

srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{})
defer srv.Stopper().Stop(ctx)
s := srv.ApplicationLayer()

runner := sqlutils.MakeSQLRunner(sqlDB)

// Schema template; the %s is filled in with a per-subtest table name.
tableWithNullableColumns := `CREATE TABLE %s (pk INT PRIMARY KEY, payload1 STRING NULL, payload2 STRING NULL)`

// createTable creates a uniquely named table ("tab0", "tab1", ...) from the
// schema template and then applies the lwwColumnAdd ALTER TABLE clause
// (defined elsewhere in this package) to it. tableNumber is shared by all
// subtests so that each one works on its own table.
tableNumber := 0
createTable := func(t *testing.T, schema string) string {
tableName := fmt.Sprintf("tab%d", tableNumber)
runner.Exec(t, fmt.Sprintf(schema, tableName))
runner.Exec(t, fmt.Sprintf(
"ALTER TABLE %s "+lwwColumnAdd,
tableName))
tableNumber++
return tableName
}

// encoderFn turns a list of column datums into an encoded KeyValue for the
// table it was created for.
type encoderFn func(datums ...interface{}) roachpb.KeyValue

// setup creates a new table, builds a KV row processor configured with that
// table's descriptor as the source descriptor, and returns the table name,
// the processor and an encoder bound to the descriptor. The encoder stamps
// every value it produces with the current wall-clock time.
setup := func(t *testing.T, schema string) (string, RowProcessor, encoderFn) {
tableName := createTable(t, schema)
desc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), s.Codec(), "defaultdb", tableName)

rp, err := newKVRowProcessor(ctx,
&execinfra.ServerConfig{
DB: s.InternalDB().(descs.DB),
LeaseManager: s.LeaseManager(),
}, &eval.Context{
Codec: s.Codec(),
Settings: s.ClusterSettings(),
}, map[descpb.ID]sqlProcessorTableConfig{
desc.GetID(): {
srcDesc: desc,
},
})
require.NoError(t, err)
return tableName, rp, func(datums ...interface{}) roachpb.KeyValue {
kv := replicationtestutils.EncodeKV(t, s.Codec(), desc, datums...)
kv.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()}
return kv
}
}

// insertRow hands one row event to the processor. Despite its name it is
// also used for delete events (a keyValue whose RawBytes were cleared). The
// first result of ProcessRow is discarded; only the error is returned.
insertRow := func(rp RowProcessor, keyValue roachpb.KeyValue, prevValue roachpb.Value) error {
_, err := rp.ProcessRow(ctx, nil, keyValue, prevValue)
return err
}

// Decide once per test run whether the "previous value" handed to the
// processor is the accurate one or a randomly generated row for pk=1. The
// choice is logged so that a failing run can be interpreted.
// NOTE(review): presumably this checks that the processor still applies the
// event when the supplied previous value does not match the stored row;
// confirm against the processor's handling of stale previous values.
rng, _ := randutil.NewTestRand()
useRandomPrevValue := rng.Float32() < 0.5
t.Logf("using random previous values = %v", useRandomPrevValue)

// maybeRandomPrevValue returns expectedPrev unchanged, or, when random
// previous values are enabled, a freshly encoded row for pk=1 whose two
// payload columns hold random non-NULL strings.
maybeRandomPrevValue := func(expectedPrev roachpb.KeyValue, encoder encoderFn) roachpb.KeyValue {
if useRandomPrevValue {
return encoder(1, fmt.Sprintf("rand-%d", rng.Int63()), fmt.Sprintf("rand-%d", rng.Int63()))
}
return expectedPrev
}

// In the subtests below keyValue1 is the incoming (new) row and keyValue2
// supplies the previous value, except where noted.

// Stored row has payload2 NULL; the update sets it to a non-NULL value.
t.Run("one-NULL-to-not-NULL", func(t *testing.T) {
tableNameDst, rp, encoder := setup(t, tableWithNullableColumns)

runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2, NULL)", tableNameDst), 1, "not null")

keyValue1 := encoder(1, "not null", "not null")
keyValue2 := maybeRandomPrevValue(encoder(1, "not null", tree.DNull), encoder)
require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value))
expectedRows := [][]string{
{"1", "not null", "not null"},
}
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
})
// Stored row has both payloads set; the update sets payload2 to NULL.
t.Run("one-not-NULL-to-NULL", func(t *testing.T) {
tableNameDst, rp, encoder := setup(t, tableWithNullableColumns)

runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2, $3)", tableNameDst), 1, "not null", "not null")

keyValue1 := encoder(1, "not null", tree.DNull)
keyValue2 := maybeRandomPrevValue(encoder(1, "not null", "not null"), encoder)
require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value))
expectedRows := [][]string{
{"1", "not null", "NULL"},
}
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
})
// Stored row has every payload column NULL; the update sets both.
t.Run("all-NULL-to-not-NULL", func(t *testing.T) {
tableNameDst, rp, encoder := setup(t, tableWithNullableColumns)

runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, NULL, NULL)", tableNameDst), 1)

keyValue1 := encoder(1, "not null", "not null")
keyValue2 := maybeRandomPrevValue(encoder(1, tree.DNull, tree.DNull), encoder)
require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value))
expectedRows := [][]string{
{"1", "not null", "not null"},
}
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
})
// Stored row has both payloads set; the update sets both to NULL.
t.Run("all-not-NULL-to-NULL", func(t *testing.T) {
tableNameDst, rp, encoder := setup(t, tableWithNullableColumns)

runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2, $3)", tableNameDst), 1, "not null", "not null")

keyValue1 := encoder(1, tree.DNull, tree.DNull)
keyValue2 := maybeRandomPrevValue(encoder(1, "not null", "not null"), encoder)
require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value))
expectedRows := [][]string{
{"1", "NULL", "NULL"},
}
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
})
// Delete of a stored row that has one NULL payload column. The delete event
// is built by encoding the row (to obtain its key) and clearing RawBytes.
// NOTE(review): assumes a value with nil RawBytes is what the processor
// treats as a deletion; confirm against ProcessRow.
t.Run("deleted-one-NULL", func(t *testing.T) {
tableNameDst, rp, encoder := setup(t, tableWithNullableColumns)
runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2, NULL)", tableNameDst), 1, "not null")

keyValue1 := encoder(1, "not null", tree.DNull)
keyValue1.Value.RawBytes = nil
keyValue2 := maybeRandomPrevValue(encoder(1, "not null", tree.DNull), encoder)
require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value))

expectedRows := [][]string{}
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
})
// Delete of a stored row whose payload columns are all NULL.
// NOTE(review): unlike the other subtests, the possibly-random value is
// used here for the delete event (keyValue1), whose RawBytes are then
// cleared, while the previous value (keyValue2) is always the accurate
// all-NULL encoding. The random-previous-value path is therefore not
// exercised by this subtest; confirm whether that is intended.
t.Run("deleted-all-NULL", func(t *testing.T) {
tableNameDst, rp, encoder := setup(t, tableWithNullableColumns)
runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, NULL, NULL)", tableNameDst), 1)

keyValue1 := maybeRandomPrevValue(encoder(1, tree.DNull, tree.DNull), encoder)
keyValue1.Value.RawBytes = nil
keyValue2 := encoder(1, tree.DNull, tree.DNull)
require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value))

expectedRows := [][]string{}
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
})
// Delete of a row that was never inserted: both the event and its previous
// value have nil RawBytes. Processing must succeed and the table must stay
// empty.
t.Run("phantom-delete", func(t *testing.T) {
tableNameDst, rp, encoder := setup(t, tableWithNullableColumns)

keyValue1 := encoder(1, tree.DNull, tree.DNull)
keyValue1.Value.RawBytes = nil
require.NoError(t, insertRow(rp, keyValue1, keyValue1.Value))

expectedRows := [][]string{}
runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows)
})
}
43 changes: 15 additions & 28 deletions pkg/sql/row/deleter.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,13 +160,15 @@ func (rd *Deleter) DeleteRow(
rd.key = keys.MakeFamilyKey(primaryIndexKey, uint32(familyID))

if oth.IsSet() {
prevValue, err := rd.encodeValueForPrimaryIndexFamily(family, values)
if err != nil {
return err
}
var expValue []byte
if prevValue.IsPresent() {
expValue = prevValue.TagAndDataBytes()
if !oth.PreviousWasDeleted {
prevValue, err := rd.encodeValueForPrimaryIndexFamily(family, values)
if err != nil {
return err
}
if prevValue.IsPresent() {
expValue = prevValue.TagAndDataBytes()
}
}
oth.DelWithCPut(ctx, &KVBatchAdapter{b}, &rd.key, expValue, traceKV)
} else {
Expand Down Expand Up @@ -207,35 +209,20 @@ func (rd *Deleter) encodeValueForPrimaryIndexFamily(
}

rd.rawValueBuf = rd.rawValueBuf[:0]
var lastColID descpb.ColumnID
familySortedColumnIDs, ok := rd.Helper.SortedColumnFamily(family.ID)
if !ok {
return roachpb.Value{}, errors.AssertionFailedf("invalid family sorted column id map")
}
for _, colID := range familySortedColumnIDs {
idx, ok := rd.FetchColIDtoRowIndex.Get(colID)
if !ok || values[idx] == tree.DNull {
continue
}

if skip, _ := rd.Helper.SkipColumnNotInPrimaryIndexValue(colID, values[idx]); skip {
continue
}

col := rd.FetchCols[idx]
if lastColID > col.GetID() {
return roachpb.Value{}, errors.AssertionFailedf("cannot write column id %d after %d", col.GetID(), lastColID)
}
colIDDelta := valueside.MakeColumnIDDelta(lastColID, col.GetID())
lastColID = col.GetID()
var err error
rd.rawValueBuf, err = valueside.Encode(rd.rawValueBuf, colIDDelta, values[idx], nil)
if err != nil {
return roachpb.Value{}, err
}
var err error
rd.rawValueBuf, err = rd.Helper.encodePrimaryIndexValuesToBuf(values, rd.FetchColIDtoRowIndex, familySortedColumnIDs, rd.FetchCols, rd.rawValueBuf)
if err != nil {
return roachpb.Value{}, err
}
ret := roachpb.Value{}
if len(rd.rawValueBuf) > 0 {
// For family 0, we expect a value even when no columns have
// been encoded to rawValueBuf.
if family.ID == 0 || len(rd.rawValueBuf) > 0 {
ret.SetTuple(rd.rawValueBuf)
}
return ret, nil
Expand Down
47 changes: 47 additions & 0 deletions pkg/sql/row/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/rowencpb"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside"
"github.com/cockroachdb/cockroach/pkg/sql/rowinfra"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util/encoding"
Expand All @@ -31,6 +32,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/log/logpb"
"github.com/cockroachdb/cockroach/pkg/util/log/severity"
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
"github.com/cockroachdb/errors"
)

const (
Expand Down Expand Up @@ -207,6 +209,42 @@ func (rh *RowHelper) encodeSecondaryIndexes(
return rh.indexEntries, nil
}

// encodePrimaryIndexValuesToBuf appends the value-side encoding of vals to
// buf and returns the extended buffer.
//
// Columns are visited in the order given by sortedColumnIDs. A column is
// left out of the encoding when it has no entry in valColIDMapping, when its
// datum is tree.DNull, or when SkipColumnNotInPrimaryIndexValue reports that
// it should be skipped. Every remaining datum is encoded with
// valueside.Encode using a column-ID delta relative to the previously
// written column.
//
// An assertion error is returned if a column's ID is lower than the ID of
// the column written before it. On any error the returned buffer is nil.
func (rh *RowHelper) encodePrimaryIndexValuesToBuf(
	vals []tree.Datum,
	valColIDMapping catalog.TableColMap,
	sortedColumnIDs []descpb.ColumnID,
	fetchedCols []catalog.Column,
	buf []byte,
) ([]byte, error) {
	// prevID is the ID of the most recently encoded column; zero until the
	// first column has been written.
	prevID := descpb.ColumnID(0)
	for i := range sortedColumnIDs {
		colID := sortedColumnIDs[i]

		pos, found := valColIDMapping.Get(colID)
		if !found {
			// No value was supplied for this column.
			continue
		}
		datum := vals[pos]
		if datum == tree.DNull {
			// NULLs are represented by the absence of the column.
			continue
		}
		if skip, _ := rh.SkipColumnNotInPrimaryIndexValue(colID, datum); skip {
			continue
		}

		// The ID is read from the fetched column rather than from colID,
		// matching how the delta is computed below.
		curID := fetchedCols[pos].GetID()
		if curID < prevID {
			return nil, errors.AssertionFailedf("cannot write column id %d after %d", curID, prevID)
		}
		delta := valueside.MakeColumnIDDelta(prevID, curID)
		prevID = curID

		encoded, err := valueside.Encode(buf, delta, datum, nil)
		if err != nil {
			return nil, err
		}
		buf = encoded
	}
	return buf, nil
}

// SkipColumnNotInPrimaryIndexValue returns true if the value at column colID
// does not need to be encoded, either because it is already part of the primary
// key, or because it is not part of the primary index altogether. Composite
Expand Down Expand Up @@ -323,9 +361,18 @@ func (rh *RowHelper) deleteIndexEntry(
return nil
}

// OriginTimestampCPutHelper is used by callers of Inserter, Updater,
// and Deleter when the caller wants updates to the primary key to be
// constructed using ConditionalPutRequests with the OriginTimestamp
// option set.
type OriginTimestampCPutHelper struct {
// OriginTimestamp is the timestamp supplied for the OriginTimestamp
// option of the conditional put.
OriginTimestamp hlc.Timestamp
// ShouldWinTie is passed along with the conditional put.
// NOTE(review): presumably it decides whether this write wins when
// its origin timestamp ties with the existing value's timestamp;
// confirm against the ConditionalPut implementation.
ShouldWinTie bool
// PreviousWasDeleted is used to indicate that the expected
// value is non-existent. This is helpful in Deleter to
// distinguish between a delete of a value that had no columns
// in the value vs a delete of a non-existent value.
PreviousWasDeleted bool
}

func (oh *OriginTimestampCPutHelper) IsSet() bool {
Expand Down
Loading

0 comments on commit dcce4ca

Please sign in to comment.