From 6bd99b7f8ed9b32071c9d79151b2352eed6b863b Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Tue, 1 Oct 2024 06:01:55 +0100 Subject: [PATCH 1/2] sql,crosscluster: correctly handle NULLABLE columns in KV writer Previously, we were skipping encoding values based on whether the _new_ value was NULL, producing an incorrect _previous_ value. Further, if _all_ columns were NULL we were not correctly writing a sentinel value. Epic: none Release note: None --- pkg/ccl/crosscluster/logical/BUILD.bazel | 1 + .../crosscluster/logical/lww_kv_processor.go | 4 +- .../logical/lww_kv_processor_test.go | 184 ++++++++++++++++++ pkg/sql/row/deleter.go | 43 ++-- pkg/sql/row/helper.go | 47 +++++ pkg/sql/row/writer.go | 57 ++---- 6 files changed, 268 insertions(+), 68 deletions(-) create mode 100644 pkg/ccl/crosscluster/logical/lww_kv_processor_test.go diff --git a/pkg/ccl/crosscluster/logical/BUILD.bazel b/pkg/ccl/crosscluster/logical/BUILD.bazel index c70b627ec50f..605acb261e09 100644 --- a/pkg/ccl/crosscluster/logical/BUILD.bazel +++ b/pkg/ccl/crosscluster/logical/BUILD.bazel @@ -97,6 +97,7 @@ go_test( srcs = [ "dead_letter_queue_test.go", "logical_replication_job_test.go", + "lww_kv_processor_test.go", "lww_row_processor_test.go", "main_test.go", "purgatory_test.go", diff --git a/pkg/ccl/crosscluster/logical/lww_kv_processor.go b/pkg/ccl/crosscluster/logical/lww_kv_processor.go index 74e7854402fc..eb3758a960e1 100644 --- a/pkg/ccl/crosscluster/logical/lww_kv_processor.go +++ b/pkg/ccl/crosscluster/logical/lww_kv_processor.go @@ -425,11 +425,13 @@ func (p *kvTableWriter) deleteRow( var ph row.PartialIndexUpdateHelper // TODO(dt): support partial indexes. oth := &row.OriginTimestampCPutHelper{ - OriginTimestamp: after.MvccTimestamp, + PreviousWasDeleted: before.IsDeleted(), + OriginTimestamp: after.MvccTimestamp, // TODO(ssd): We should choose this based by comparing the cluster IDs of the source // and destination clusters. ShouldWinTie: true, } + return p.rd.DeleteRow(ctx, b, p.oldVals, ph, oth, false) } diff --git a/pkg/ccl/crosscluster/logical/lww_kv_processor_test.go b/pkg/ccl/crosscluster/logical/lww_kv_processor_test.go new file mode 100644 index 000000000000..bed8539518d3 --- /dev/null +++ b/pkg/ccl/crosscluster/logical/lww_kv_processor_test.go @@ -0,0 +1,184 @@ +// Copyright 2024 The Cockroach Authors. +// +// Use of this software is governed by the CockroachDB Software License +// included in the /LICENSE file. + +package logical + +import ( + "context" + "fmt" + "testing" + + "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/crosscluster/replicationtestutils" + "github.com/cockroachdb/cockroach/pkg/roachpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descs" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" + "github.com/cockroachdb/cockroach/pkg/sql/execinfra" + "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" + "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/cockroach/pkg/util/leaktest" + "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" + "github.com/cockroachdb/cockroach/pkg/util/timeutil" + "github.com/stretchr/testify/require" +) + +func TestKVWriterUpdateEncoding(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + ctx := context.Background() + + srv, sqlDB, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer srv.Stopper().Stop(ctx) + s := srv.ApplicationLayer() + + runner := sqlutils.MakeSQLRunner(sqlDB) + + tableWithNullableColumns := `CREATE TABLE %s (pk INT PRIMARY KEY, payload1 STRING NULL, payload2 STRING NULL)` + + tableNumber := 0 + createTable := func(t *testing.T, schema string) string { + tableName := fmt.Sprintf("tab%d", tableNumber) + runner.Exec(t, fmt.Sprintf(schema, tableName)) + runner.Exec(t, fmt.Sprintf( + "ALTER TABLE %s "+lwwColumnAdd, + tableName)) + tableNumber++ + return tableName + } + + type encoderFn func(datums ...interface{}) roachpb.KeyValue + + setup := func(t *testing.T, schema string) (string, RowProcessor, encoderFn) { + tableName := createTable(t, schema) + desc := desctestutils.TestingGetPublicTableDescriptor(s.DB(), s.Codec(), "defaultdb", tableName) + + rp, err := newKVRowProcessor(ctx, + &execinfra.ServerConfig{ + DB: s.InternalDB().(descs.DB), + LeaseManager: s.LeaseManager(), + }, &eval.Context{ + Codec: s.Codec(), + Settings: s.ClusterSettings(), + }, map[descpb.ID]sqlProcessorTableConfig{ + desc.GetID(): { + srcDesc: desc, + }, + }) + require.NoError(t, err) + return tableName, rp, func(datums ...interface{}) roachpb.KeyValue { + kv := replicationtestutils.EncodeKV(t, s.Codec(), desc, datums...) + kv.Value.Timestamp = hlc.Timestamp{WallTime: timeutil.Now().UnixNano()} + return kv + } + } + + insertRow := func(rp RowProcessor, keyValue roachpb.KeyValue, prevValue roachpb.Value) error { + _, err := rp.ProcessRow(ctx, nil, keyValue, prevValue) + return err + } + + rng, _ := randutil.NewTestRand() + useRandomPrevValue := rng.Float32() < 0.5 + t.Logf("using random previous values = %v", useRandomPrevValue) + + maybeRandomPrevValue := func(expectedPrev roachpb.KeyValue, encoder encoderFn) roachpb.KeyValue { + if useRandomPrevValue { + return encoder(1, fmt.Sprintf("rand-%d", rng.Int63()), fmt.Sprintf("rand-%d", rng.Int63())) + } + return expectedPrev + } + + t.Run("one-NULL-to-not-NULL", func(t *testing.T) { + tableNameDst, rp, encoder := setup(t, tableWithNullableColumns) + + runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2, NULL)", tableNameDst), 1, "not null") + + keyValue1 := encoder(1, "not null", "not null") + keyValue2 := maybeRandomPrevValue(encoder(1, "not null", tree.DNull), encoder) + require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value)) + expectedRows := [][]string{ + {"1", "not null", "not null"}, + } + runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows) + }) + t.Run("one-not-NULL-to-NULL", func(t *testing.T) { + tableNameDst, rp, encoder := setup(t, tableWithNullableColumns) + + runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2, $3)", tableNameDst), 1, "not null", "not null") + + keyValue1 := encoder(1, "not null", tree.DNull) + keyValue2 := maybeRandomPrevValue(encoder(1, "not null", "not null"), encoder) + require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value)) + expectedRows := [][]string{ + {"1", "not null", "NULL"}, + } + runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows) + }) + t.Run("all-NULL-to-not-NULL", func(t *testing.T) { + tableNameDst, rp, encoder := setup(t, tableWithNullableColumns) + + runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, NULL, NULL)", tableNameDst), 1) + + keyValue1 := encoder(1, "not null", "not null") + keyValue2 := maybeRandomPrevValue(encoder(1, tree.DNull, tree.DNull), encoder) + require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value)) + expectedRows := [][]string{ + {"1", "not null", "not null"}, + } + runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows) + }) + t.Run("all-not-NULL-to-NULL", func(t *testing.T) { + tableNameDst, rp, encoder := setup(t, tableWithNullableColumns) + + runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2, $3)", tableNameDst), 1, "not null", "not null") + + keyValue1 := encoder(1, tree.DNull, tree.DNull) + keyValue2 := maybeRandomPrevValue(encoder(1, "not null", "not null"), encoder) + require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value)) + expectedRows := [][]string{ + {"1", "NULL", "NULL"}, + } + runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows) + }) + t.Run("deleted-one-NULL", func(t *testing.T) { + tableNameDst, rp, encoder := setup(t, tableWithNullableColumns) + runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, $2, NULL)", tableNameDst), 1, "not null") + + keyValue1 := encoder(1, "not null", tree.DNull) + keyValue1.Value.RawBytes = nil + keyValue2 := maybeRandomPrevValue(encoder(1, "not null", tree.DNull), encoder) + require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value)) + + expectedRows := [][]string{} + runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows) + }) + t.Run("deleted-all-NULL", func(t *testing.T) { + tableNameDst, rp, encoder := setup(t, tableWithNullableColumns) + runner.Exec(t, fmt.Sprintf("INSERT INTO %s VALUES ($1, NULL, NULL)", tableNameDst), 1) + + keyValue1 := maybeRandomPrevValue(encoder(1, tree.DNull, tree.DNull), encoder) + keyValue1.Value.RawBytes = nil + keyValue2 := encoder(1, tree.DNull, tree.DNull) + require.NoError(t, insertRow(rp, keyValue1, keyValue2.Value)) + + expectedRows := [][]string{} + runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows) + }) + t.Run("phantom-delete", func(t *testing.T) { + tableNameDst, rp, encoder := setup(t, tableWithNullableColumns) + + keyValue1 := encoder(1, tree.DNull, tree.DNull) + keyValue1.Value.RawBytes = nil + require.NoError(t, insertRow(rp, keyValue1, keyValue1.Value)) + + expectedRows := [][]string{} + runner.CheckQueryResults(t, fmt.Sprintf("SELECT * from %s", tableNameDst), expectedRows) + }) +} diff --git a/pkg/sql/row/deleter.go b/pkg/sql/row/deleter.go index 2a2ae38d1bb6..0028302e63a8 100644 --- a/pkg/sql/row/deleter.go +++ b/pkg/sql/row/deleter.go @@ -160,13 +160,15 @@ func (rd *Deleter) DeleteRow( rd.key = keys.MakeFamilyKey(primaryIndexKey, uint32(familyID)) if oth.IsSet() { - prevValue, err := rd.encodeValueForPrimaryIndexFamily(family, values) - if err != nil { - return err - } var expValue []byte - if prevValue.IsPresent() { - expValue = prevValue.TagAndDataBytes() + if !oth.PreviousWasDeleted { + prevValue, err := rd.encodeValueForPrimaryIndexFamily(family, values) + if err != nil { + return err + } + if prevValue.IsPresent() { + expValue = prevValue.TagAndDataBytes() + } } oth.DelWithCPut(ctx, &KVBatchAdapter{b}, &rd.key, expValue, traceKV) } else { @@ -207,35 +209,20 @@ func (rd *Deleter) encodeValueForPrimaryIndexFamily( } rd.rawValueBuf = rd.rawValueBuf[:0] - var lastColID descpb.ColumnID familySortedColumnIDs, ok := rd.Helper.SortedColumnFamily(family.ID) if !ok { return roachpb.Value{}, errors.AssertionFailedf("invalid family sorted column id map") } - for _, colID := range familySortedColumnIDs { - idx, ok := rd.FetchColIDtoRowIndex.Get(colID) - if !ok || values[idx] == tree.DNull { - continue - } - if skip, _ := rd.Helper.SkipColumnNotInPrimaryIndexValue(colID, values[idx]); skip { - continue - } - - col := rd.FetchCols[idx] - if lastColID > col.GetID() { - return roachpb.Value{}, errors.AssertionFailedf("cannot write column id %d after %d", col.GetID(), lastColID) - } - colIDDelta := valueside.MakeColumnIDDelta(lastColID, col.GetID()) - lastColID = col.GetID() - var err error - rd.rawValueBuf, err = valueside.Encode(rd.rawValueBuf, colIDDelta, values[idx], nil) - if err != nil { - return roachpb.Value{}, err - } + var err error + rd.rawValueBuf, err = rd.Helper.encodePrimaryIndexValuesToBuf(values, rd.FetchColIDtoRowIndex, familySortedColumnIDs, rd.FetchCols, rd.rawValueBuf) + if err != nil { + return roachpb.Value{}, err } ret := roachpb.Value{} - if len(rd.rawValueBuf) > 0 { + // For family 0, we expect a value even when no columns have + // been encoded to oldBytes. + if family.ID == 0 || len(rd.rawValueBuf) > 0 { ret.SetTuple(rd.rawValueBuf) } return ret, nil diff --git a/pkg/sql/row/helper.go b/pkg/sql/row/helper.go index 0a61d80a17ce..daeefd5b60fa 100644 --- a/pkg/sql/row/helper.go +++ b/pkg/sql/row/helper.go @@ -21,6 +21,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgerror" "github.com/cockroachdb/cockroach/pkg/sql/rowenc" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/rowencpb" + "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/rowinfra" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util/encoding" @@ -31,6 +32,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/log/logpb" "github.com/cockroachdb/cockroach/pkg/util/log/severity" "github.com/cockroachdb/cockroach/pkg/util/protoutil" + "github.com/cockroachdb/errors" ) const ( @@ -207,6 +209,42 @@ func (rh *RowHelper) encodeSecondaryIndexes( return rh.indexEntries, nil } +// encodePrimaryIndexValuesToBuf encodes the given values, writing +// into the given buffer. +func (rh *RowHelper) encodePrimaryIndexValuesToBuf( + vals []tree.Datum, + valColIDMapping catalog.TableColMap, + sortedColumnIDs []descpb.ColumnID, + fetchedCols []catalog.Column, + buf []byte, +) ([]byte, error) { + var lastColID descpb.ColumnID + for _, colID := range sortedColumnIDs { + idx, ok := valColIDMapping.Get(colID) + if !ok || vals[idx] == tree.DNull { + // Column not being updated or inserted. + continue + } + + if skip, _ := rh.SkipColumnNotInPrimaryIndexValue(colID, vals[idx]); skip { + continue + } + + col := fetchedCols[idx] + if lastColID > col.GetID() { + return nil, errors.AssertionFailedf("cannot write column id %d after %d", col.GetID(), lastColID) + } + colIDDelta := valueside.MakeColumnIDDelta(lastColID, col.GetID()) + lastColID = col.GetID() + var err error + buf, err = valueside.Encode(buf, colIDDelta, vals[idx], nil) + if err != nil { + return nil, err + } + } + return buf, nil +} + // SkipColumnNotInPrimaryIndexValue returns true if the value at column colID // does not need to be encoded, either because it is already part of the primary // key, or because it is not part of the primary index altogether. Composite @@ -323,9 +361,18 @@ func (rh *RowHelper) deleteIndexEntry( return nil } +// OriginTimetampCPutHelper is used by callers of Inserter, Updater, +// and Deleter when the caller wants updates to the primary key to be +// constructed using ConditionalPutRequests with the OriginTimestamp +// option set. type OriginTimestampCPutHelper struct { OriginTimestamp hlc.Timestamp ShouldWinTie bool + // PreviousWasDeleted is used to indicate that the expected + // value is non-existent. This is helpful in Deleter to + // distinguish between a delete of a value that had no columns + // in the value vs a delete of a non-existent value. + PreviousWasDeleted bool } func (oh *OriginTimestampCPutHelper) IsSet() bool { diff --git a/pkg/sql/row/writer.go b/pkg/sql/row/writer.go index e23f0ca63235..a0a345916272 100644 --- a/pkg/sql/row/writer.go +++ b/pkg/sql/row/writer.go @@ -11,7 +11,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/keys" "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" "github.com/cockroachdb/cockroach/pkg/util" @@ -157,9 +156,6 @@ func prepareInsertOrUpdateBatch( } } - // TODO(ssd): Here and below investigate reducing the - // number of allocations required to marshal the old - // value. var oldVal []byte if oth.IsSet() && len(oldValues) > 0 { // If the column could be composite, we only encode the old value if it @@ -203,53 +199,36 @@ func prepareInsertOrUpdateBatch( continue } - rawValueBuf = rawValueBuf[:0] - - var lastColID descpb.ColumnID - var oldBytes []byte - familySortedColumnIDs, ok := helper.SortedColumnFamily(family.ID) if !ok { return nil, errors.AssertionFailedf("invalid family sorted column id map") } - for _, colID := range familySortedColumnIDs { - idx, ok := valColIDMapping.Get(colID) - if !ok || values[idx] == tree.DNull { - // Column not being updated or inserted. - continue - } - if skip, _ := helper.SkipColumnNotInPrimaryIndexValue(colID, values[idx]); skip { - continue - } + rawValueBuf = rawValueBuf[:0] + var err error + rawValueBuf, err = helper.encodePrimaryIndexValuesToBuf(values, valColIDMapping, familySortedColumnIDs, fetchedCols, rawValueBuf) + if err != nil { + return nil, err + } - col := fetchedCols[idx] - if lastColID > col.GetID() { - return nil, errors.AssertionFailedf("cannot write column id %d after %d", col.GetID(), lastColID) - } - colIDDelta := valueside.MakeColumnIDDelta(lastColID, col.GetID()) - lastColID = col.GetID() - var err error - rawValueBuf, err = valueside.Encode(rawValueBuf, colIDDelta, values[idx], nil) + // TODO(ssd): Here and below investigate reducing the number of + // allocations required to marshal the old value. + var expBytes []byte + if oth.IsSet() && len(oldValues) > 0 { + var oldBytes []byte + oldBytes, err = helper.encodePrimaryIndexValuesToBuf(oldValues, valColIDMapping, familySortedColumnIDs, fetchedCols, oldBytes) if err != nil { return nil, err } - if oth.IsSet() && len(oldValues) > 0 { - var err error - oldBytes, err = valueside.Encode(oldBytes, colIDDelta, oldValues[idx], nil) - if err != nil { - return nil, err - } + // For family 0, we expect a value even when + // no columns have been encoded to oldBytes. + if family.ID == 0 || len(oldBytes) > 0 { + old := &roachpb.Value{} + old.SetTuple(oldBytes) + expBytes = old.TagAndDataBytes() } } - var expBytes []byte - if oth.IsSet() && len(oldBytes) > 0 { - old := &roachpb.Value{} - old.SetTuple(oldBytes) - expBytes = old.TagAndDataBytes() - } - if family.ID != 0 && len(rawValueBuf) == 0 { if overwrite { // The family might have already existed but every column in it is being From dc9291c7935becf65b9a98159891f058ee2ded7b Mon Sep 17 00:00:00 2001 From: Steven Danna Date: Fri, 4 Oct 2024 13:50:45 +0100 Subject: [PATCH 2/2] sql: add assertion failure to prevent use of untested code path We need to write tests for multi-column families as these will not be hit by the end-to-end tests. Epic: none Release note: None --- pkg/sql/row/writer.go | 24 ++++++++++++++++++------ 1 file changed, 18 insertions(+), 6 deletions(-) diff --git a/pkg/sql/row/writer.go b/pkg/sql/row/writer.go index a0a345916272..164c8c4c5c02 100644 --- a/pkg/sql/row/writer.go +++ b/pkg/sql/row/writer.go @@ -98,6 +98,15 @@ func prepareInsertOrUpdateBatch( overwrite, traceKV bool, ) ([]byte, error) { families := helper.TableDesc.GetFamilies() + // TODO(ssd): We don't currently support multiple column + // families on the LDR write path. As a result, we don't have + // good end-to-end testing of multi-column family writes with + // the origin timestamp helper set. Until we write such tests, + // we error if we ever see such writes. + if oth.IsSet() && len(families) > 1 { + return nil, errors.AssertionFailedf("OriginTimestampCPutHelper is not yet testing with multi-column family writes") + } + for i := range families { family := &families[i] update := false @@ -213,6 +222,9 @@ func prepareInsertOrUpdateBatch( // TODO(ssd): Here and below investigate reducing the number of // allocations required to marshal the old value. + // + // If we are using OriginTimestamp ConditionalPuts, calculate the expected + // value. var expBytes []byte if oth.IsSet() && len(oldValues) > 0 { var oldBytes []byte @@ -230,14 +242,14 @@ func prepareInsertOrUpdateBatch( } if family.ID != 0 && len(rawValueBuf) == 0 { - if overwrite { + if oth.IsSet() { + // If using OriginTimestamp'd CPuts, we _always_ want to issue a Delete + // so that we can confirm our expected bytes were correct. + oth.DelWithCPut(ctx, batch, kvKey, expBytes, traceKV) + } else if overwrite { // The family might have already existed but every column in it is being // set to NULL, so delete it. - if oth.IsSet() { - oth.DelWithCPut(ctx, batch, kvKey, expBytes, traceKV) - } else { - insertDelFn(ctx, batch, kvKey, traceKV) - } + insertDelFn(ctx, batch, kvKey, traceKV) } } else { // Copy the contents of rawValueBuf into the roachpb.Value. This is