From 2135554a5d49f5069d80dd2d8d9cfe1d875e8276 Mon Sep 17 00:00:00 2001 From: Raphael 'kena' Poss Date: Wed, 2 Aug 2023 17:06:36 +0200 Subject: [PATCH 1/8] rpc: fix various test bugs TLDR: Prior to this patch, multiple unit tests in the `rpc` package were misconfiguring their heartbeat interval and timeout. This patch fixes this. The issue was that there were two separate locations for these tunable parameters: - `RPCHeartbeat{Interval,Timeout}` fields in `ContextOptions` (previously via `*base.Config`, recently as direct fields in `ContextOptions`) - also, two private fields `heartbeat{Interval,Timeout}` The former was copied into the latter during `NewContext()`. Only the latter (the private fields) matter for the actual behavior. Prior to this commit, the tests would call `NewContext()` and then afterwards would override the fields in `ContextOptions`. These overrides were thus ineffective because the value was already copied by that time. In addition to this design bug, there were a couple of additional bugs lurking: the overrides defined by the tests were problematic in some cases. If these overrides had been effective, the tests would break. The problem was discovered after 8d2d2bf47cb3da1c27baa140a5afb5d28a5ed8f6 merged, because in that commit some of the overrides from broken tests were taken on board and became effective. This started causing flakes. The fix is to remove the ambiguity in configuration and remove the problematic overrides from offending tests. 
Release note: None --- pkg/rpc/context.go | 15 +++++---------- pkg/rpc/context_test.go | 8 +------- pkg/rpc/datadriven_test.go | 1 - pkg/rpc/peer.go | 4 ++-- 4 files changed, 8 insertions(+), 20 deletions(-) diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index df0986cd54e2..e18cf789bacb 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -213,9 +213,6 @@ type Context struct { RemoteClocks *RemoteClockMonitor MasterCtx context.Context // cancel on stopper quiesce - heartbeatInterval time.Duration - heartbeatTimeout time.Duration - rpcCompression bool localInternalClient RestrictedInternalClient @@ -531,13 +528,11 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context { secCtx.useNodeAuth = opts.UseNodeAuth rpcCtx := &Context{ - ContextOptions: opts, - SecurityContext: secCtx, - rpcCompression: enableRPCCompression, - MasterCtx: masterCtx, - metrics: makeMetrics(), - heartbeatInterval: opts.RPCHeartbeatInterval, - heartbeatTimeout: opts.RPCHeartbeatTimeout, + ContextOptions: opts, + SecurityContext: secCtx, + rpcCompression: enableRPCCompression, + MasterCtx: masterCtx, + metrics: makeMetrics(), } rpcCtx.dialbackMu.Lock() diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 6bce3dbb7bd8..ee69efc21c6f 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -876,9 +876,6 @@ func TestOffsetMeasurement(t *testing.T) { clientClock := &AdvancingClock{time: timeutil.Unix(0, 10)} clientMaxOffset := time.Duration(0) clientCtx := newTestContext(clusterID, clientClock, clientMaxOffset, stopper) - // Make the interval shorter to speed up the test. 
- clientCtx.RPCHeartbeatInterval = 1 * time.Millisecond - clientCtx.RPCHeartbeatTimeout = 1 * time.Millisecond clientCtx.RemoteClocks.offsetTTL = 5 * clientClock.getAdvancementInterval() if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(ctx); err != nil { t.Fatal(err) @@ -952,7 +949,7 @@ func TestFailedOffsetMeasurement(t *testing.T) { clientCtx := newTestContext(clusterID, clock, maxOffset, stopper) // Remove the timeout so that failure arises from exceeding the maximum // clock reading delay, not the timeout. - clientCtx.heartbeatTimeout = 0 + clientCtx.RPCHeartbeatTimeout = 0 // Allow two heartbeat for initialization. The first ping doesn't report an offset, // the second one thus doesn't have an offset to work with, so it's only on the // third one that's fully configured. @@ -1022,9 +1019,6 @@ func TestLatencyInfoCleanupOnClosedConnection(t *testing.T) { clientClock := &AdvancingClock{time: timeutil.Unix(0, 10)} clientMaxOffset := time.Duration(0) clientCtx := newTestContext(clusterID, clientClock, clientMaxOffset, stopper) - // Make the interval shorter to speed up the test. 
- clientCtx.RPCHeartbeatInterval = 1 * time.Millisecond - clientCtx.RPCHeartbeatTimeout = 1 * time.Millisecond var hbDecommission atomic.Value hbDecommission.Store(false) diff --git a/pkg/rpc/datadriven_test.go b/pkg/rpc/datadriven_test.go index 4ebfbdc201da..70d5c01cc343 100644 --- a/pkg/rpc/datadriven_test.go +++ b/pkg/rpc/datadriven_test.go @@ -148,7 +148,6 @@ func setupEnv(t *testing.T) *ddEnv { maxOffset := time.Duration(250) clientCtx := newTestContext(clusterID, clock, maxOffset, stopper) - clientCtx.heartbeatInterval = 10 * time.Millisecond env := &ddEnv{ clusterID: clusterID, diff --git a/pkg/rpc/peer.go b/pkg/rpc/peer.go index 5ff1372af218..e137d8ba61c4 100644 --- a/pkg/rpc/peer.go +++ b/pkg/rpc/peer.go @@ -184,8 +184,8 @@ func (rpcCtx *Context) newPeer(k peerKey) *peer { dial: func(ctx context.Context, target string, class ConnectionClass) (*grpc.ClientConn, error) { return rpcCtx.grpcDialRaw(ctx, target, class, rpcCtx.testingDialOpts...) }, - heartbeatInterval: rpcCtx.heartbeatInterval, - heartbeatTimeout: rpcCtx.heartbeatTimeout, + heartbeatInterval: rpcCtx.RPCHeartbeatInterval, + heartbeatTimeout: rpcCtx.RPCHeartbeatTimeout, } var b *circuit.Breaker From 53835a5a01fed643a552a1df11be18cf2f29c71f Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Mon, 24 Jul 2023 14:50:17 -0400 Subject: [PATCH 2/8] sql: only use public indexes when fingerprinting Previously, running `SHOW EXPERIMENTAL_FINGERPRINTS` on a table in the middle of a schema change would sometimes cause an error due to missing indexes. The root cause of this error is that fingerprinting attempted to read non-public indexes which exist during the schema change: ``` 1 foo_pkey 2 crdb_internal_index_2_name_placeholder 3 crdb_internal_index_3_name_placeholder ``` The fix is a one-line change which ensures fingerprinting reads active indexes only. This change also adds a regression test for this scenario. 
There was a test util method in `changefeedccl` which was useful for this test. This change moves that util method to `schematestutils` so it can be called outside of `changefeedccl`. Epic: none Release note: None --- pkg/ccl/changefeedccl/BUILD.bazel | 3 +- pkg/ccl/changefeedccl/changefeed_test.go | 95 ++---------------- .../schemafeed/schematestutils/BUILD.bazel | 11 +++ .../schematestutils/schema_test_utils.go | 97 +++++++++++++++++++ pkg/sql/BUILD.bazel | 1 + pkg/sql/show_fingerprints.go | 2 +- pkg/sql/show_fingerprints_test.go | 34 +++++++ 7 files changed, 153 insertions(+), 90 deletions(-) diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 6b26b812349b..958dfe52ebb9 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -218,6 +218,7 @@ go_test( "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/ccl/changefeedccl/changefeedpb", "//pkg/ccl/changefeedccl/kvevent", + "//pkg/ccl/changefeedccl/schemafeed/schematestutils", "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/multiregionccl", "//pkg/ccl/multiregionccl/multiregionccltestutils", @@ -254,7 +255,6 @@ go_test( "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/colinfo", - "//pkg/sql/catalog/descbuilder", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/desctestutils", "//pkg/sql/catalog/schemaexpr", @@ -279,7 +279,6 @@ go_test( "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", "//pkg/sql/types", - "//pkg/storage", "//pkg/testutils", "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index cd235a0ee31f..ac8a04ac3761 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" 
"github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed/schematestutils" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // multi-tenant tests _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" // locality-related table mutations _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" @@ -55,8 +56,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -68,7 +67,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -77,7 +75,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -2151,7 +2148,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { `add_column_def: [2]->{"after": {"a": 2}}`, }) sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`) - ts := fetchDescVersionModificationTime(t, s, `add_column_def`, 7) + ts := schematestutils.FetchDescVersionModificationTime(t, 
s.TestServer.Server, `d`, `public`, `add_column_def`, 7) assertPayloads(t, addColumnDef, []string{ fmt.Sprintf(`add_column_def: [1]->{"after": {"a": 1, "b": "d"}, "updated": "%s"}`, ts.AsOfSystemTime()), @@ -2171,7 +2168,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { `add_col_comp: [2]->{"after": {"a": 2, "b": 7}}`, }) sqlDB.Exec(t, `ALTER TABLE add_col_comp ADD COLUMN c INT AS (a + 10) STORED`) - ts := fetchDescVersionModificationTime(t, s, `add_col_comp`, 7) + ts := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `add_col_comp`, 7) assertPayloads(t, addColComp, []string{ fmt.Sprintf(`add_col_comp: [1]->{"after": {"a": 1, "b": 6, "c": 11}, "updated": "%s"}`, ts.AsOfSystemTime()), @@ -2192,7 +2189,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { }) sqlDB.Exec(t, `ALTER TABLE drop_column DROP COLUMN b`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (3)`) - ts := fetchDescVersionModificationTime(t, s, `drop_column`, 2) + ts := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `drop_column`, 2) // Backfill for DROP COLUMN b. assertPayloads(t, dropColumn, []string{ @@ -2243,9 +2240,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { // version 2. Then, when adding column c, it goes from 9->17, with the schema change being visible at // the 7th step (version 15). Finally, when adding column d, it goes from 17->25 ith the schema change // being visible at the 7th step (version 23). 
- dropTS := fetchDescVersionModificationTime(t, s, `multiple_alters`, 2) - addTS := fetchDescVersionModificationTime(t, s, `multiple_alters`, 15) - addTS2 := fetchDescVersionModificationTime(t, s, `multiple_alters`, 23) + dropTS := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `multiple_alters`, 2) + addTS := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `multiple_alters`, 15) + addTS2 := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `multiple_alters`, 23) assertPayloads(t, multipleAlters, []string{ fmt.Sprintf(`multiple_alters: [1]->{"after": {"a": 1}, "updated": "%s"}`, dropTS.AsOfSystemTime()), @@ -2296,7 +2293,7 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) { sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`) // The primary index swap occurs at version 7. - ts := fetchDescVersionModificationTime(t, s, `add_column_def`, 7) + ts := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `add_column_def`, 7) assertPayloads(t, combinedFeed, []string{ fmt.Sprintf(`add_column_def: [1]->{"after": {"a": 1, "b": "d"}, "updated": "%s"}`, ts.AsOfSystemTime()), @@ -2319,82 +2316,6 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) { } } -// fetchDescVersionModificationTime fetches the `ModificationTime` of the specified -// `version` of `tableName`'s table descriptor. 
-func fetchDescVersionModificationTime( - t testing.TB, s TestServerWithSystem, tableName string, version int, -) hlc.Timestamp { - tblKey := s.Codec.TablePrefix(keys.DescriptorTableID) - header := kvpb.RequestHeader{ - Key: tblKey, - EndKey: tblKey.PrefixEnd(), - } - dropColTblID := sqlutils.QueryTableID(t, s.DB, `d`, "public", tableName) - req := &kvpb.ExportRequest{ - RequestHeader: header, - MVCCFilter: kvpb.MVCCFilter_All, - StartTime: hlc.Timestamp{}, - } - hh := kvpb.Header{Timestamp: hlc.NewClockForTesting(nil).Now()} - res, pErr := kv.SendWrappedWith(context.Background(), - s.SystemServer.DB().NonTransactionalSender(), hh, req) - if pErr != nil { - t.Fatal(pErr.GoError()) - } - for _, file := range res.(*kvpb.ExportResponse).Files { - it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, storage.IterOptions{ - KeyTypes: storage.IterKeyTypePointsAndRanges, - LowerBound: keys.MinKey, - UpperBound: keys.MaxKey, - }) - if err != nil { - t.Fatal(err) - } - defer it.Close() - for it.SeekGE(storage.NilKey); ; it.Next() { - if ok, err := it.Valid(); err != nil { - t.Fatal(err) - } else if !ok { - continue - } - k := it.UnsafeKey() - if _, hasRange := it.HasPointAndRange(); hasRange { - t.Fatalf("unexpected MVCC range key at %s", k) - } - remaining, _, _, err := s.Codec.DecodeIndexPrefix(k.Key) - if err != nil { - t.Fatal(err) - } - _, tableID, err := encoding.DecodeUvarintAscending(remaining) - if err != nil { - t.Fatal(err) - } - if tableID != uint64(dropColTblID) { - continue - } - unsafeValue, err := it.UnsafeValue() - require.NoError(t, err) - if unsafeValue == nil { - t.Fatal(errors.New(`value was dropped or truncated`)) - } - value := roachpb.Value{RawBytes: unsafeValue, Timestamp: k.Timestamp} - b, err := descbuilder.FromSerializedValue(&value) - if err != nil { - t.Fatal(err) - } - require.NotNil(t, b) - if b.DescriptorType() == catalog.Table { - tbl := b.BuildImmutable().(catalog.TableDescriptor) - if int(tbl.GetVersion()) == version { - 
return tbl.GetModificationTime() - } - } - } - } - t.Fatal(errors.New(`couldn't find table desc for given version`)) - return hlc.Timestamp{} -} - // Regression test for #34314 func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/ccl/changefeedccl/schemafeed/schematestutils/BUILD.bazel b/pkg/ccl/changefeedccl/schemafeed/schematestutils/BUILD.bazel index 7832057de020..0856e8378683 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schematestutils/BUILD.bazel +++ b/pkg/ccl/changefeedccl/schemafeed/schematestutils/BUILD.bazel @@ -6,12 +6,23 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed/schematestutils", visibility = ["//visibility:public"], deps = [ + "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvpb", + "//pkg/roachpb", "//pkg/sql/catalog", "//pkg/sql/catalog/catpb", + "//pkg/sql/catalog/descbuilder", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/tabledesc", "//pkg/sql/types", + "//pkg/storage", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/util/encoding", "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//proto", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go b/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go index b76dd3e22cf1..fce08e9365b6 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go +++ b/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go @@ -11,15 +11,28 @@ package schematestutils import ( + "context" "strconv" + "testing" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" + 
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" ) // MakeTableDesc makes a generic table descriptor with the provided properties. @@ -124,3 +137,87 @@ func AddDropIndexMutation(desc catalog.TableDescriptor) catalog.TableDescriptor }) return tabledesc.NewBuilder(desc.TableDesc()).BuildImmutableTable() } + +// FetchDescVersionModificationTime fetches the `ModificationTime` of the +// specified `version` of `tableName`'s table descriptor. 
+func FetchDescVersionModificationTime( + t testing.TB, + s serverutils.ApplicationLayerInterface, + dbName string, + schemaName string, + tableName string, + version int, +) hlc.Timestamp { + db := serverutils.OpenDBConn( + t, s.SQLAddr(), dbName, false, s.Stopper()) + + tblKey := s.Codec().TablePrefix(keys.DescriptorTableID) + header := kvpb.RequestHeader{ + Key: tblKey, + EndKey: tblKey.PrefixEnd(), + } + dropColTblID := sqlutils.QueryTableID(t, db, dbName, schemaName, tableName) + req := &kvpb.ExportRequest{ + RequestHeader: header, + MVCCFilter: kvpb.MVCCFilter_All, + StartTime: hlc.Timestamp{}, + } + hh := kvpb.Header{Timestamp: hlc.NewClockForTesting(nil).Now()} + res, pErr := kv.SendWrappedWith(context.Background(), + s.DB().NonTransactionalSender(), hh, req) + if pErr != nil { + t.Fatal(pErr.GoError()) + } + for _, file := range res.(*kvpb.ExportResponse).Files { + it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.MinKey, + UpperBound: keys.MaxKey, + }) + if err != nil { + t.Fatal(err) + } + defer it.Close() + for it.SeekGE(storage.NilKey); ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + continue + } + k := it.UnsafeKey() + if _, hasRange := it.HasPointAndRange(); hasRange { + t.Fatalf("unexpected MVCC range key at %s", k) + } + remaining, _, _, err := s.Codec().DecodeIndexPrefix(k.Key) + if err != nil { + t.Fatal(err) + } + _, tableID, err := encoding.DecodeUvarintAscending(remaining) + if err != nil { + t.Fatal(err) + } + if tableID != uint64(dropColTblID) { + continue + } + unsafeValue, err := it.UnsafeValue() + require.NoError(t, err) + if unsafeValue == nil { + t.Fatal(errors.New(`value was dropped or truncated`)) + } + value := roachpb.Value{RawBytes: unsafeValue, Timestamp: k.Timestamp} + b, err := descbuilder.FromSerializedValue(&value) + if err != nil { + t.Fatal(err) + } + require.NotNil(t, b) + if 
b.DescriptorType() == catalog.Table { + tbl := b.BuildImmutable().(catalog.TableDescriptor) + if int(tbl.GetVersion()) == version { + return tbl.GetModificationTime() + } + } + } + } + t.Fatal(errors.New(`couldn't find table desc for given version`)) + return hlc.Timestamp{} +} diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index d6ad4bdfbb30..1d028efd5974 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -732,6 +732,7 @@ go_test( "//pkg/base", "//pkg/build/bazel", "//pkg/ccl", + "//pkg/ccl/changefeedccl/schemafeed/schematestutils", "//pkg/cloud/impl:cloudimpl", "//pkg/clusterversion", "//pkg/col/coldata", diff --git a/pkg/sql/show_fingerprints.go b/pkg/sql/show_fingerprints.go index acff3d23f80a..780418ad2109 100644 --- a/pkg/sql/show_fingerprints.go +++ b/pkg/sql/show_fingerprints.go @@ -66,7 +66,7 @@ func (p *planner) ShowFingerprints( return &showFingerprintsNode{ tableDesc: tableDesc, - indexes: tableDesc.NonDropIndexes(), + indexes: tableDesc.ActiveIndexes(), }, nil } diff --git a/pkg/sql/show_fingerprints_test.go b/pkg/sql/show_fingerprints_test.go index c3f8cf1e899b..ec307eba8f61 100644 --- a/pkg/sql/show_fingerprints_test.go +++ b/pkg/sql/show_fingerprints_test.go @@ -17,6 +17,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed/schematestutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -86,3 +87,36 @@ func TestShowFingerprintsColumnNames(t *testing.T) { t.Errorf("expected different fingerprints: %v vs %v", fprint1, fprint2) } } + +// TestShowFingerprintsDuringSchemaChange is a regression test that asserts that +// fingerprinting does not fail when done in the middle of a schema change using +// an AOST query. In the middle of a schema change such as `ADD COLUMN ... 
+// DEFAULT`, there may be non-public indexes in the descriptor. Prior to the +// change which introduced this test, fingerprinting would attempt to read these +// non-public indexes and fail. +func TestShowFingerprintsDuringSchemaChange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE DATABASE d`) + sqlDB.Exec(t, `USE d`) + sqlDB.Exec(t, `CREATE TABLE foo ( + a INT PRIMARY KEY, + b INT + )`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 0)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 1)`) + + // At version 5, there are non-public indexes. + sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN c INT DEFAULT -1`) + ts := schematestutils.FetchDescVersionModificationTime( + t, s, "d", "public", "foo", 5) + sqlDB.Exec(t, fmt.Sprintf( + `SELECT * FROM [SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE foo] AS OF SYSTEM TIME %s`, + ts.AsOfSystemTime())) +} From b65ea135f923c69df95b275dea9eb95620b7628c Mon Sep 17 00:00:00 2001 From: Jayant Shrivastava Date: Tue, 25 Jul 2023 10:54:27 -0400 Subject: [PATCH 3/8] changefeedccl: update nemeses test to use the declarative schema changer This change updates the nemeses test, which asserts the behavior of changefeeds under a randomized DML/DDL workload. Previously, the test would only run with the legacy schema changer. This is not ideal because, by default, the declarative schema changer is used in production. This change makes it so that the declarative schema changer is used 90% of the time instead. 
Informs: #106906 Epic: none Release note: None --- pkg/ccl/changefeedccl/cdctest/BUILD.bazel | 1 - pkg/ccl/changefeedccl/cdctest/nemeses.go | 6 +++--- pkg/ccl/changefeedccl/helpers_test.go | 7 ------- pkg/ccl/changefeedccl/nemeses_test.go | 8 ++++++-- 4 files changed, 9 insertions(+), 13 deletions(-) diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index da98d1c85cf0..9fc078027ba0 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -31,7 +31,6 @@ go_library( "//pkg/util/fsm", "//pkg/util/hlc", "//pkg/util/log", - "//pkg/util/randutil", "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index 93b0e498693b..234a47ad3ef0 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors" ) @@ -30,7 +29,9 @@ import ( // duplicates, which the two cdctest.Validator implementations verify for the // real output of a changefeed. The output rows and resolved timestamps of the // tested feed are fed into them to check for anomalies. 
-func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, error) { +func RunNemesis( + f TestFeedFactory, db *gosql.DB, isSinkless bool, rng *rand.Rand, +) (Validator, error) { // possible additional nemeses: // - schema changes // - merges @@ -42,7 +43,6 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er // - sink chaos ctx := context.Background() - rng, _ := randutil.NewPseudoRand() eventPauseCount := 10 if isSinkless { diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 1fb1aef33bc7..0cdea8ce55d9 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -73,13 +73,6 @@ func maybeDisableDeclarativeSchemaChangesForTest( return disable } -// disableDeclarativeSchemaChangesForTest tests that are disabled due to differences -// in changefeed behaviour and are tracked by issue #80545. -func disableDeclarativeSchemaChangesForTest(t testing.TB, sqlDB *sqlutils.SQLRunner) { - sqlDB.Exec(t, "SET use_declarative_schema_changer='off'") - sqlDB.Exec(t, "SET CLUSTER SETTING sql.defaults.use_declarative_schema_changer='off'") -} - func waitForSchemaChange( t testing.TB, sqlDB *sqlutils.SQLRunner, stmt string, arguments ...interface{}, ) { diff --git a/pkg/ccl/changefeedccl/nemeses_test.go b/pkg/ccl/changefeedccl/nemeses_test.go index 907bdd169b85..9302c522cf18 100644 --- a/pkg/ccl/changefeedccl/nemeses_test.go +++ b/pkg/ccl/changefeedccl/nemeses_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" ) func TestChangefeedNemeses(t *testing.T) { @@ -27,12 +28,15 @@ func TestChangefeedNemeses(t *testing.T) { skip.UnderRace(t, "takes >1 min under race") testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + rng, seed := 
randutil.NewPseudoRand() + t.Logf("random seed: %d", seed) + sqlDB := sqlutils.MakeSQLRunner(s.DB) - disableDeclarativeSchemaChangesForTest(t, sqlDB) + _ = maybeDisableDeclarativeSchemaChangesForTest(t, sqlDB, rng) // TODO(dan): Ugly hack to disable `eventPause` in sinkless feeds. See comment in // `RunNemesis` for details. isSinkless := strings.Contains(t.Name(), "sinkless") - v, err := cdctest.RunNemesis(f, s.DB, isSinkless) + v, err := cdctest.RunNemesis(f, s.DB, isSinkless, rng) if err != nil { t.Fatalf("%+v", err) } From f8e7dea25fa2fa52fe8cd5ce057bf26423605c04 Mon Sep 17 00:00:00 2001 From: Josh Imhoff Date: Wed, 2 Aug 2023 18:34:49 -0400 Subject: [PATCH 4/8] storage: fix overwrite RemoteStorage factory bug As of this commit, if both cfg.SharedStorage and cfg.RemoteStorageFactory are set, CRDB uses cfg.SharedStorage. Note that eventually we will enable using both at the same time, but with the abstractions available today, that is not easy. We prefer cfg.SharedStorage, since the Locator -> Storage mapping contained in it is needed for CRDB to function properly. Release note: None. Epic: None. --- pkg/storage/pebble.go | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index be726c051a5c..63705f2264d1 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1148,6 +1148,13 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { opts.EventListener = &el p.wrappedIntentWriter = wrapIntentWriter(p) + // If both cfg.SharedStorage and cfg.RemoteStorageFactory are set, CRDB uses + // cfg.SharedStorage. Note that eventually we will enable using both at the + // same time, but we don't have the right abstractions in place to do that + // today. + // + // We prefer cfg.SharedStorage, since the Locator -> Storage mapping contained + // in it is needed for CRDB to function properly. 
if cfg.SharedStorage != nil { esWrapper := &externalStorageWrapper{p: p, es: cfg.SharedStorage, ctx: ctx} opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{ @@ -1155,10 +1162,10 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { }) opts.Experimental.CreateOnShared = true opts.Experimental.CreateOnSharedLocator = "" - } - - if cfg.RemoteStorageFactory != nil { - opts.Experimental.RemoteStorage = remoteStorageAdaptor{p: p, ctx: ctx, factory: cfg.RemoteStorageFactory} + } else { + if cfg.RemoteStorageFactory != nil { + opts.Experimental.RemoteStorage = remoteStorageAdaptor{p: p, ctx: ctx, factory: cfg.RemoteStorageFactory} + } } // Read the current store cluster version. From bdca2a4476444ed04b9850ee8faabf307d1d4558 Mon Sep 17 00:00:00 2001 From: Chengxiong Ruan Date: Wed, 2 Aug 2023 17:31:59 -0400 Subject: [PATCH 5/8] sql: unskip TestCancelSchemaChange Informs #51796 This commit unskips TestCancelSchemaChange by fixing several wrong assumptions about schema changer jobs. It also removes some variables which had no effect on the test. Release note: None Release justification: test only change. 
--- pkg/sql/schema_changer_test.go | 75 +++++++++++++--------------------- 1 file changed, 29 insertions(+), 46 deletions(-) diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index b1662a466fe7..f7d62b23d866 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -4699,18 +4699,14 @@ ALTER TABLE t.test ADD COLUMN c INT AS (v + 4) STORED, ADD COLUMN d INT DEFAULT func TestCancelSchemaChange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 51796) const ( - numNodes = 3 maxValue = 100 ) var sqlDB *sqlutils.SQLRunner - var db *gosql.DB params, _ := createTestServerParams() doCancel := false - var enableAsyncSchemaChanges uint32 params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ WriteCheckpointInterval: time.Nanosecond, // checkpoint after every chunk. @@ -4721,52 +4717,41 @@ func TestCancelSchemaChange(t *testing.T) { if !doCancel { return nil } - if _, err := db.Exec(`CANCEL JOB ( + sqlDB.Exec(t, `CANCEL JOB ( SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' AND status = $1 AND description NOT LIKE 'ROLL BACK%' - )`, jobs.StatusRunning); err != nil { - panic(err) - } + )`, jobs.StatusRunning) return nil }, }, } - tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgs: params, - }) - defer tc.Stopper().Stop(context.Background()) - db = tc.ServerConn(0) - kvDB := tc.Server(0).DB() - codec := tc.Server(0).ApplicationLayer().Codec() + server, db, kvDB := serverutils.StartServer(t, params) sqlDB = sqlutils.MakeSQLRunner(db) + defer server.Stopper().Stop(context.Background()) + codec := server.ApplicationLayer().Codec() // Disable strict GC TTL enforcement because we're going to shove a zero-value // TTL into the system with AddImmediateGCZoneConfig. 
defer sqltestutils.DisableGCTTLStrictEnforcement(t, db)() sqlDB.Exec(t, ` + SET use_declarative_schema_changer = 'off'; CREATE DATABASE t; CREATE TABLE t.test (k INT PRIMARY KEY, v INT, pi DECIMAL DEFAULT (DECIMAL '3.14')); `) + sqlDB.Exec(t, `SET CLUSTER SETTING jobs.registry.interval.adopt = '1s';`) + // Bulk insert. if err := sqltestutils.BulkInsertIntoTable(db, maxValue); err != nil { t.Fatal(err) } tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, "t", "test") - // Split the table into multiple ranges. - var sps []serverutils.SplitPoint - const numSplits = numNodes * 2 - for i := 1; i <= numSplits-1; i++ { - sps = append(sps, serverutils.SplitPoint{TargetNodeIdx: i % numNodes, Vals: []interface{}{maxValue / numSplits * i}}) - } - tc.SplitTable(t, tableDesc, sps) ctx := context.Background() if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 1, maxValue); err != nil { @@ -4780,49 +4765,48 @@ func TestCancelSchemaChange(t *testing.T) { // Set to true if the rollback returns in a running, waiting status. 
isGC bool }{ - {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.4::DECIMAL CREATE FAMILY f2, ADD CHECK (x >= 0)`, + {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.4 CREATE FAMILY f2, ADD CHECK (x >= 0)`, true, false}, {`CREATE INDEX foo ON t.public.test (v)`, true, true}, - {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.2::DECIMAL CREATE FAMILY f3, ADD CHECK (x >= 0)`, + {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.2 CREATE FAMILY f3, ADD CHECK (x >= 0)`, false, false}, {`CREATE INDEX foo ON t.public.test (v)`, false, true}, } idx := 0 + gcIdx := 0 for _, tc := range testCases { doCancel = tc.cancel if doCancel { if _, err := db.Exec(tc.sql); !testutils.IsError(err, "job canceled") { t.Fatalf("unexpected %v", err) } - if err := jobutils.VerifySystemJob(t, sqlDB, idx, jobspb.TypeSchemaChange, jobs.StatusCanceled, jobs.Record{ - Username: username.RootUserName(), - Description: tc.sql, - DescriptorIDs: descpb.IDs{ - tableDesc.GetID(), - }, - }); err != nil { - t.Fatal(err) - } - jobID := jobutils.GetJobID(t, sqlDB, idx) - idx++ + testutils.SucceedsSoon(t, func() error { + return jobutils.VerifySystemJob( + t, sqlDB, idx, jobspb.TypeSchemaChange, jobs.StatusCanceled, + jobs.Record{ + Username: username.RootUserName(), + Description: tc.sql, + DescriptorIDs: descpb.IDs{ + tableDesc.GetID(), + }, + }, + ) + }) jobRecord := jobs.Record{ Username: username.RootUserName(), - Description: fmt.Sprintf("ROLL BACK JOB %d: %s", jobID, tc.sql), + Description: fmt.Sprintf("GC for ROLLBACK of %s", tc.sql), DescriptorIDs: descpb.IDs{ tableDesc.GetID(), }, } - var err error if tc.isGC { - err = jobutils.VerifyRunningSystemJob(t, sqlDB, idx, jobspb.TypeSchemaChange, sql.RunningStatusWaitingGC, jobRecord) - } else { - err = jobutils.VerifySystemJob(t, sqlDB, idx, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobRecord) - } - if err != nil { - t.Fatal(err) + testutils.SucceedsSoon(t, func() error { + return 
jobutils.VerifyRunningSystemJob(t, sqlDB, gcIdx*2, jobspb.TypeSchemaChangeGC, sql.RunningStatusWaitingForMVCCGC, jobRecord) + }) + gcIdx++ } } else { sqlDB.Exec(t, tc.sql) @@ -4841,7 +4825,7 @@ func TestCancelSchemaChange(t *testing.T) { // Verify that the index foo over v is consistent, and that column x has // been backfilled properly. - rows, err := db.Query(`SELECT v, x from t.test@foo`) + rows, err := db.Query(`SELECT v, x from t.test@foo ORDER BY v`) if err != nil { t.Fatal(err) } @@ -4871,7 +4855,6 @@ func TestCancelSchemaChange(t *testing.T) { } // Verify that the data from the canceled CREATE INDEX is cleaned up. - atomic.StoreUint32(&enableAsyncSchemaChanges, 1) // TODO (lucy): when this test is no longer canceled, have it correctly handle doing GC immediately if _, err := sqltestutils.AddImmediateGCZoneConfig(db, tableDesc.GetID()); err != nil { t.Fatal(err) From 83ee351a0139350976ae9d324deebbbaebc1402c Mon Sep 17 00:00:00 2001 From: Jay Date: Thu, 3 Aug 2023 07:25:09 -0400 Subject: [PATCH 6/8] ccl/sqlproxyccl: avoid holding onto the lock in ListenForDenied Previously, ListenForDenied would grab the watcher's lock (which is tied to the scope of the proxy) before calling checkConnection. As part of the updated ACL work, checkConnection now will grab a lock when it calls Initialize on a given tenant. Given that checkConnection can be blocking, grabbing onto the global watcher lock isn't ideal as it could hold up new connections, leading to timeout issues. This commit updates ListenForDenied such that the watcher's lock gets released before invoking checkConnection. 
Release note: None Epic: none --- pkg/ccl/sqlproxyccl/acl/watcher.go | 49 ++++++++++++++++-------------- 1 file changed, 27 insertions(+), 22 deletions(-) diff --git a/pkg/ccl/sqlproxyccl/acl/watcher.go b/pkg/ccl/sqlproxyccl/acl/watcher.go index f541163f9f48..c3bc9a6e3eeb 100644 --- a/pkg/ccl/sqlproxyccl/acl/watcher.go +++ b/pkg/ccl/sqlproxyccl/acl/watcher.go @@ -244,42 +244,45 @@ func (w *Watcher) updateAccessController( checkListeners(ctx, copy, controllers) } -// ListenForDenied Adds a listener to the watcher for the given connection. If the -// connection is already blocked a nil remove function is returned and an error -// is returned immediately. +// ListenForDenied Adds a listener to the watcher for the given connection. If +// the connection is already blocked a nil remove function is returned and an +// error is returned immediately. // // Example Usage: // -// remove, err := w.ListenForDenied(ctx, connection, func(err error) { +// removeFn, err := w.ListenForDenied(ctx, connection, func(err error) { // /* connection was blocked by change */ // }) // -// if err != nil { /*connection already blocked*/ } -// defer remove() +// if err != nil { /*connection already blocked*/ } +// defer removeFn() // -// Warning: -// Do not call remove() within the error callback. It would deadlock. +// WARNING: Do not call removeFn() within the error callback. It would deadlock. 
func (w *Watcher) ListenForDenied( ctx context.Context, connection ConnectionTags, callback func(error), ) (func(), error) { - w.mu.Lock() - defer w.mu.Unlock() + lst, controllers := func() (*listener, []AccessController) { + w.mu.Lock() + defer w.mu.Unlock() - if err := checkConnection(ctx, connection, w.controllers); err != nil { - return nil, err - } + id := w.nextID + w.nextID++ - id := w.nextID - w.nextID++ + l := &listener{ + id: id, + connection: connection, + } + l.mu.denied = callback - l := &listener{ - id: id, - connection: connection, - } - l.mu.denied = callback + w.listeners.ReplaceOrInsert(l) - w.listeners.ReplaceOrInsert(l) - return func() { w.removeListener(l) }, nil + return l, w.controllers + }() + if err := checkConnection(ctx, connection, controllers); err != nil { + w.removeListener(lst) + return nil, err + } + return func() { w.removeListener(lst) }, nil } func (w *Watcher) removeListener(l *listener) { @@ -315,6 +318,8 @@ func checkListeners(ctx context.Context, listeners *btree.BTree, controllers []A }) } +// NOTE: checkConnection may grab a lock, and could be blocked waiting for it, +// so callers should not hold a global lock while calling this. func checkConnection( ctx context.Context, connection ConnectionTags, controllers []AccessController, ) error { From 8d6e4c917876e397e87134f74bb97046d37ab905 Mon Sep 17 00:00:00 2001 From: Jay Date: Thu, 3 Aug 2023 08:07:22 -0400 Subject: [PATCH 7/8] ccl/sqlproxyccl: avoid tenant lookups if we know the type of connection Previously, we were performing a tenant lookup call before checking on the type of connection. This can be unnecessary (e.g. doing a lookup call for the private endpoints ACL, even if we knew that the connection was a public one). This commit addresses that. 
Release note: None Epic: none --- pkg/ccl/sqlproxyccl/acl/cidr_ranges.go | 10 +++++----- pkg/ccl/sqlproxyccl/acl/private_endpoints.go | 10 +++++----- pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go | 2 +- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/pkg/ccl/sqlproxyccl/acl/cidr_ranges.go b/pkg/ccl/sqlproxyccl/acl/cidr_ranges.go index 69c8fe573e80..073172486f5b 100644 --- a/pkg/ccl/sqlproxyccl/acl/cidr_ranges.go +++ b/pkg/ccl/sqlproxyccl/acl/cidr_ranges.go @@ -28,16 +28,16 @@ var _ AccessController = &CIDRRanges{} // CheckConnection implements the AccessController interface. func (p *CIDRRanges) CheckConnection(ctx context.Context, conn ConnectionTags) error { - tenantObj, err := p.LookupTenantFn(ctx, conn.TenantID) - if err != nil { - return err - } - // Private connections. This ACL is only responsible for public CIDR ranges. if conn.EndpointID != "" { return nil } + tenantObj, err := p.LookupTenantFn(ctx, conn.TenantID) + if err != nil { + return err + } + // Cluster allows public connections, so we'll check allowed CIDR ranges. if tenantObj.AllowPublicConn() { ip := net.ParseIP(conn.IP) diff --git a/pkg/ccl/sqlproxyccl/acl/private_endpoints.go b/pkg/ccl/sqlproxyccl/acl/private_endpoints.go index 7e4e1fe55b0a..8665e26544cd 100644 --- a/pkg/ccl/sqlproxyccl/acl/private_endpoints.go +++ b/pkg/ccl/sqlproxyccl/acl/private_endpoints.go @@ -37,16 +37,16 @@ var _ AccessController = &PrivateEndpoints{} // CheckConnection implements the AccessController interface. func (p *PrivateEndpoints) CheckConnection(ctx context.Context, conn ConnectionTags) error { - tenantObj, err := p.LookupTenantFn(ctx, conn.TenantID) - if err != nil { - return err - } - // Public connections. This ACL is only responsible for private endpoints. if conn.EndpointID == "" { return nil } + tenantObj, err := p.LookupTenantFn(ctx, conn.TenantID) + if err != nil { + return err + } + // Cluster allows private connections, so we'll check allowed endpoints. 
if tenantObj.AllowPrivateConn() { for _, endpoints := range tenantObj.AllowedPrivateEndpoints { diff --git a/pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go b/pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go index d986b558a13b..888528b3ecd2 100644 --- a/pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go +++ b/pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go @@ -41,7 +41,7 @@ func TestPrivateEndpoints(t *testing.T) { return nil, errors.New("foo") }, } - err := p.CheckConnection(ctx, makeConn("")) + err := p.CheckConnection(ctx, makeConn("foo")) require.EqualError(t, err, "foo") }) From 2f8cd099752aadbe500e16feaaacdc8ade258cd8 Mon Sep 17 00:00:00 2001 From: Pavel Kalinnikov Date: Thu, 3 Aug 2023 11:50:47 +0100 Subject: [PATCH 8/8] timeutil/ptp: fix conversion from seconds to Time Epic: none Release note (bug fix): since 22.2.0, using a PTP clock device (enabled by the --clock-device flag) would generate timestamps in the far future. It now generates the correct time. This could cause nodes to crash due to incorrect timestamps, or in the worst case irreversibly advance the cluster's HLC clock into the far future. 
--- pkg/BUILD.bazel | 2 ++ pkg/util/timeutil/ptp/BUILD.bazel | 18 ++++++++++- pkg/util/timeutil/ptp/ptp_clock_linux.go | 7 ++++- pkg/util/timeutil/ptp/ptp_clock_linux_test.go | 30 +++++++++++++++++++ 4 files changed, 55 insertions(+), 2 deletions(-) create mode 100644 pkg/util/timeutil/ptp/ptp_clock_linux_test.go diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index ca3461b29dff..43065203844e 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -694,6 +694,7 @@ ALL_TESTS = [ "//pkg/util/timeofday:timeofday_test", "//pkg/util/timetz:timetz_test", "//pkg/util/timeutil/pgdate:pgdate_test", + "//pkg/util/timeutil/ptp:ptp_test", "//pkg/util/timeutil:timeutil_test", "//pkg/util/tochar:tochar_test", "//pkg/util/tracing/collector:collector_test", @@ -2416,6 +2417,7 @@ GO_TARGETS = [ "//pkg/util/timeutil/pgdate:pgdate", "//pkg/util/timeutil/pgdate:pgdate_test", "//pkg/util/timeutil/ptp:ptp", + "//pkg/util/timeutil/ptp:ptp_test", "//pkg/util/timeutil:timeutil", "//pkg/util/timeutil:timeutil_test", "//pkg/util/tochar:tochar", diff --git a/pkg/util/timeutil/ptp/BUILD.bazel b/pkg/util/timeutil/ptp/BUILD.bazel index 76eb9d5a1e9a..c1505a597fc1 100644 --- a/pkg/util/timeutil/ptp/BUILD.bazel +++ b/pkg/util/timeutil/ptp/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "ptp", @@ -59,3 +59,19 @@ go_library( "//conditions:default": [], }), ) + +go_test( + name = "ptp_test", + srcs = ["ptp_clock_linux_test.go"], + args = ["-test.timeout=295s"], + embed = [":ptp"], + deps = select({ + "@io_bazel_rules_go//go/platform:android": [ + "//pkg/util/timeutil", + ], + "@io_bazel_rules_go//go/platform:linux": [ + "//pkg/util/timeutil", + ], + "//conditions:default": [], + }), +) diff --git a/pkg/util/timeutil/ptp/ptp_clock_linux.go b/pkg/util/timeutil/ptp/ptp_clock_linux.go index 3f2a880a28b5..4bbbcea6782f 100644 --- a/pkg/util/timeutil/ptp/ptp_clock_linux.go +++ 
b/pkg/util/timeutil/ptp/ptp_clock_linux.go @@ -79,5 +79,10 @@ func (p Clock) Now() time.Time { panic(err) } - return timeutil.Unix(int64(ts.tv_sec)*1e9, int64(ts.tv_nsec)) + return timeutil.Unix(int64(ts.tv_sec), int64(ts.tv_nsec)) +} + +// realtime returns a clock using the system CLOCK_REALTIME device. For testing. +func realtime() Clock { + return Clock{clockDeviceID: uintptr(C.CLOCK_REALTIME)} } diff --git a/pkg/util/timeutil/ptp/ptp_clock_linux_test.go b/pkg/util/timeutil/ptp/ptp_clock_linux_test.go new file mode 100644 index 000000000000..01e915bf9983 --- /dev/null +++ b/pkg/util/timeutil/ptp/ptp_clock_linux_test.go @@ -0,0 +1,30 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +//go:build linux +// +build linux + +package ptp + +import ( + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// TestClockNow sanity checks that Clock.Now() sourced from the CLOCK_REALTIME +// device returns time close to timeutil.Now(). This ensures that the conversion +// from the time returned by a clock device to Go's time.Time is correct. +func TestClockNow(t *testing.T) { + if got, want := realtime().Now(), timeutil.Now(); want.Sub(got).Abs() > 10*time.Second { + t.Errorf("clock mismatch: got %v; timeutil says %v", got, want) + } +}