diff --git a/pkg/BUILD.bazel b/pkg/BUILD.bazel index ca3461b29dff..43065203844e 100644 --- a/pkg/BUILD.bazel +++ b/pkg/BUILD.bazel @@ -694,6 +694,7 @@ ALL_TESTS = [ "//pkg/util/timeofday:timeofday_test", "//pkg/util/timetz:timetz_test", "//pkg/util/timeutil/pgdate:pgdate_test", + "//pkg/util/timeutil/ptp:ptp_test", "//pkg/util/timeutil:timeutil_test", "//pkg/util/tochar:tochar_test", "//pkg/util/tracing/collector:collector_test", @@ -2416,6 +2417,7 @@ GO_TARGETS = [ "//pkg/util/timeutil/pgdate:pgdate", "//pkg/util/timeutil/pgdate:pgdate_test", "//pkg/util/timeutil/ptp:ptp", + "//pkg/util/timeutil/ptp:ptp_test", "//pkg/util/timeutil:timeutil", "//pkg/util/timeutil:timeutil_test", "//pkg/util/tochar:tochar", diff --git a/pkg/ccl/changefeedccl/BUILD.bazel b/pkg/ccl/changefeedccl/BUILD.bazel index 6b26b812349b..958dfe52ebb9 100644 --- a/pkg/ccl/changefeedccl/BUILD.bazel +++ b/pkg/ccl/changefeedccl/BUILD.bazel @@ -218,6 +218,7 @@ go_test( "//pkg/ccl/changefeedccl/changefeedbase", "//pkg/ccl/changefeedccl/changefeedpb", "//pkg/ccl/changefeedccl/kvevent", + "//pkg/ccl/changefeedccl/schemafeed/schematestutils", "//pkg/ccl/kvccl/kvtenantccl", "//pkg/ccl/multiregionccl", "//pkg/ccl/multiregionccl/multiregionccltestutils", @@ -254,7 +255,6 @@ go_test( "//pkg/sql/catalog/bootstrap", "//pkg/sql/catalog/catpb", "//pkg/sql/catalog/colinfo", - "//pkg/sql/catalog/descbuilder", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/desctestutils", "//pkg/sql/catalog/schemaexpr", @@ -279,7 +279,6 @@ go_test( "//pkg/sql/sessiondata", "//pkg/sql/sessiondatapb", "//pkg/sql/types", - "//pkg/storage", "//pkg/testutils", "//pkg/testutils/jobutils", "//pkg/testutils/serverutils", diff --git a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel index da98d1c85cf0..9fc078027ba0 100644 --- a/pkg/ccl/changefeedccl/cdctest/BUILD.bazel +++ b/pkg/ccl/changefeedccl/cdctest/BUILD.bazel @@ -31,7 +31,6 @@ go_library( "//pkg/util/fsm", "//pkg/util/hlc", "//pkg/util/log", - "//pkg/util/randutil", "//pkg/util/syncutil", "//pkg/util/timeutil", "@com_github_cockroachdb_errors//:errors", diff --git a/pkg/ccl/changefeedccl/cdctest/nemeses.go b/pkg/ccl/changefeedccl/cdctest/nemeses.go index 93b0e498693b..234a47ad3ef0 100644 --- a/pkg/ccl/changefeedccl/cdctest/nemeses.go +++ b/pkg/ccl/changefeedccl/cdctest/nemeses.go @@ -18,7 +18,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/util/fsm" "github.com/cockroachdb/cockroach/pkg/util/log" - "github.com/cockroachdb/cockroach/pkg/util/randutil" "github.com/cockroachdb/errors" ) @@ -30,7 +29,9 @@ import ( // duplicates, which the two cdctest.Validator implementations verify for the // real output of a changefeed. The output rows and resolved timestamps of the // tested feed are fed into them to check for anomalies. -func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, error) { +func RunNemesis( + f TestFeedFactory, db *gosql.DB, isSinkless bool, rng *rand.Rand, +) (Validator, error) { // possible additional nemeses: // - schema changes // - merges @@ -42,7 +43,6 @@ func RunNemesis(f TestFeedFactory, db *gosql.DB, isSinkless bool) (Validator, er // - sink chaos ctx := context.Background() - rng, _ := randutil.NewPseudoRand() eventPauseCount := 10 if isSinkless { diff --git a/pkg/ccl/changefeedccl/changefeed_test.go b/pkg/ccl/changefeedccl/changefeed_test.go index 759c559353b4..98f228e0bdd0 100644 --- a/pkg/ccl/changefeedccl/changefeed_test.go +++ b/pkg/ccl/changefeedccl/changefeed_test.go @@ -37,6 +37,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/cdctest" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/changefeedbase" "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/kvevent" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed/schematestutils" _ "github.com/cockroachdb/cockroach/pkg/ccl/kvccl/kvtenantccl" // multi-tenant tests _ "github.com/cockroachdb/cockroach/pkg/ccl/multiregionccl" // locality-related table mutations _ "github.com/cockroachdb/cockroach/pkg/ccl/partitionccl" @@ -55,8 +56,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/server/telemetry" "github.com/cockroachdb/cockroach/pkg/settings/cluster" "github.com/cockroachdb/cockroach/pkg/sql" - "github.com/cockroachdb/cockroach/pkg/sql/catalog" - "github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder" "github.com/cockroachdb/cockroach/pkg/sql/catalog/desctestutils" "github.com/cockroachdb/cockroach/pkg/sql/execinfra" "github.com/cockroachdb/cockroach/pkg/sql/execinfrapb" @@ -68,7 +67,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/sql/randgen" "github.com/cockroachdb/cockroach/pkg/sql/sem/eval" "github.com/cockroachdb/cockroach/pkg/sql/sem/tree" - "github.com/cockroachdb/cockroach/pkg/storage" "github.com/cockroachdb/cockroach/pkg/testutils" "github.com/cockroachdb/cockroach/pkg/testutils/jobutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" @@ -77,7 +75,6 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/testcluster" "github.com/cockroachdb/cockroach/pkg/util" "github.com/cockroachdb/cockroach/pkg/util/ctxgroup" - "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" @@ -2151,7 +2148,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { `add_column_def: [2]->{"after": {"a": 2}}`, }) sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`) - ts := fetchDescVersionModificationTime(t, s, `add_column_def`, 7) + ts := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `add_column_def`, 7) assertPayloads(t, addColumnDef, []string{ fmt.Sprintf(`add_column_def: [1]->{"after": {"a": 1, "b": "d"}, "updated": "%s"}`, ts.AsOfSystemTime()), @@ -2171,7 +2168,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { `add_col_comp: [2]->{"after": {"a": 2, "b": 7}}`, }) sqlDB.Exec(t, `ALTER TABLE add_col_comp ADD COLUMN c INT AS (a + 10) STORED`) - ts := fetchDescVersionModificationTime(t, s, `add_col_comp`, 7) + ts := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `add_col_comp`, 7) assertPayloads(t, addColComp, []string{ fmt.Sprintf(`add_col_comp: [1]->{"after": {"a": 1, "b": 6, "c": 11}, "updated": "%s"}`, ts.AsOfSystemTime()), @@ -2192,7 +2189,7 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { }) sqlDB.Exec(t, `ALTER TABLE drop_column DROP COLUMN b`) sqlDB.Exec(t, `INSERT INTO drop_column VALUES (3)`) - ts := fetchDescVersionModificationTime(t, s, `drop_column`, 2) + ts := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `drop_column`, 2) // Backfill for DROP COLUMN b. assertPayloads(t, dropColumn, []string{ @@ -2243,9 +2240,9 @@ func TestChangefeedSchemaChangeAllowBackfill(t *testing.T) { // version 2. Then, when adding column c, it goes from 9->17, with the schema change being visible at // the 7th step (version 15). Finally, when adding column d, it goes from 17->25 ith the schema change // being visible at the 7th step (version 23). - dropTS := fetchDescVersionModificationTime(t, s, `multiple_alters`, 2) - addTS := fetchDescVersionModificationTime(t, s, `multiple_alters`, 15) - addTS2 := fetchDescVersionModificationTime(t, s, `multiple_alters`, 23) + dropTS := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `multiple_alters`, 2) + addTS := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `multiple_alters`, 15) + addTS2 := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `multiple_alters`, 23) assertPayloads(t, multipleAlters, []string{ fmt.Sprintf(`multiple_alters: [1]->{"after": {"a": 1}, "updated": "%s"}`, dropTS.AsOfSystemTime()), @@ -2296,7 +2293,7 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) { sqlDB.Exec(t, `ALTER TABLE add_column_def ADD COLUMN b STRING DEFAULT 'd'`) // The primary index swap occurs at version 7. - ts := fetchDescVersionModificationTime(t, s, `add_column_def`, 7) + ts := schematestutils.FetchDescVersionModificationTime(t, s.TestServer.Server, `d`, `public`, `add_column_def`, 7) assertPayloads(t, combinedFeed, []string{ fmt.Sprintf(`add_column_def: [1]->{"after": {"a": 1, "b": "d"}, "updated": "%s"}`, ts.AsOfSystemTime()), @@ -2319,82 +2316,6 @@ func TestChangefeedSchemaChangeBackfillScope(t *testing.T) { } } -// fetchDescVersionModificationTime fetches the `ModificationTime` of the specified -// `version` of `tableName`'s table descriptor. -func fetchDescVersionModificationTime( - t testing.TB, s TestServerWithSystem, tableName string, version int, -) hlc.Timestamp { - tblKey := s.Codec.TablePrefix(keys.DescriptorTableID) - header := kvpb.RequestHeader{ - Key: tblKey, - EndKey: tblKey.PrefixEnd(), - } - dropColTblID := sqlutils.QueryTableID(t, s.DB, `d`, "public", tableName) - req := &kvpb.ExportRequest{ - RequestHeader: header, - MVCCFilter: kvpb.MVCCFilter_All, - StartTime: hlc.Timestamp{}, - } - hh := kvpb.Header{Timestamp: hlc.NewClockForTesting(nil).Now()} - res, pErr := kv.SendWrappedWith(context.Background(), - s.SystemServer.DB().NonTransactionalSender(), hh, req) - if pErr != nil { - t.Fatal(pErr.GoError()) - } - for _, file := range res.(*kvpb.ExportResponse).Files { - it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, storage.IterOptions{ - KeyTypes: storage.IterKeyTypePointsAndRanges, - LowerBound: keys.MinKey, - UpperBound: keys.MaxKey, - }) - if err != nil { - t.Fatal(err) - } - defer it.Close() - for it.SeekGE(storage.NilKey); ; it.Next() { - if ok, err := it.Valid(); err != nil { - t.Fatal(err) - } else if !ok { - continue - } - k := it.UnsafeKey() - if _, hasRange := it.HasPointAndRange(); hasRange { - t.Fatalf("unexpected MVCC range key at %s", k) - } - remaining, _, _, err := s.Codec.DecodeIndexPrefix(k.Key) - if err != nil { - t.Fatal(err) - } - _, tableID, err := encoding.DecodeUvarintAscending(remaining) - if err != nil { - t.Fatal(err) - } - if tableID != uint64(dropColTblID) { - continue - } - unsafeValue, err := it.UnsafeValue() - require.NoError(t, err) - if unsafeValue == nil { - t.Fatal(errors.New(`value was dropped or truncated`)) - } - value := roachpb.Value{RawBytes: unsafeValue, Timestamp: k.Timestamp} - b, err := descbuilder.FromSerializedValue(&value) - if err != nil { - t.Fatal(err) - } - require.NotNil(t, b) - if b.DescriptorType() == catalog.Table { - tbl := b.BuildImmutable().(catalog.TableDescriptor) - if int(tbl.GetVersion()) == version { - return tbl.GetModificationTime() - } - } - } - } - t.Fatal(errors.New(`couldn't find table desc for given version`)) - return hlc.Timestamp{} -} - // Regression test for #34314 func TestChangefeedAfterSchemaChangeBackfill(t *testing.T) { defer leaktest.AfterTest(t)() diff --git a/pkg/ccl/changefeedccl/helpers_test.go b/pkg/ccl/changefeedccl/helpers_test.go index 1fb1aef33bc7..0cdea8ce55d9 100644 --- a/pkg/ccl/changefeedccl/helpers_test.go +++ b/pkg/ccl/changefeedccl/helpers_test.go @@ -73,13 +73,6 @@ func maybeDisableDeclarativeSchemaChangesForTest( return disable } -// disableDeclarativeSchemaChangesForTest tests that are disabled due to differences -// in changefeed behaviour and are tracked by issue #80545. -func disableDeclarativeSchemaChangesForTest(t testing.TB, sqlDB *sqlutils.SQLRunner) { - sqlDB.Exec(t, "SET use_declarative_schema_changer='off'") - sqlDB.Exec(t, "SET CLUSTER SETTING sql.defaults.use_declarative_schema_changer='off'") -} - func waitForSchemaChange( t testing.TB, sqlDB *sqlutils.SQLRunner, stmt string, arguments ...interface{}, ) { diff --git a/pkg/ccl/changefeedccl/nemeses_test.go b/pkg/ccl/changefeedccl/nemeses_test.go index 907bdd169b85..9302c522cf18 100644 --- a/pkg/ccl/changefeedccl/nemeses_test.go +++ b/pkg/ccl/changefeedccl/nemeses_test.go @@ -19,6 +19,7 @@ import ( "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" "github.com/cockroachdb/cockroach/pkg/util/log" + "github.com/cockroachdb/cockroach/pkg/util/randutil" ) func TestChangefeedNemeses(t *testing.T) { @@ -27,12 +28,15 @@ func TestChangefeedNemeses(t *testing.T) { skip.UnderRace(t, "takes >1 min under race") testFn := func(t *testing.T, s TestServer, f cdctest.TestFeedFactory) { + rng, seed := randutil.NewPseudoRand() + t.Logf("random seed: %d", seed) + sqlDB := sqlutils.MakeSQLRunner(s.DB) - disableDeclarativeSchemaChangesForTest(t, sqlDB) + _ = maybeDisableDeclarativeSchemaChangesForTest(t, sqlDB, rng) // TODO(dan): Ugly hack to disable `eventPause` in sinkless feeds. See comment in // `RunNemesis` for details. isSinkless := strings.Contains(t.Name(), "sinkless") - v, err := cdctest.RunNemesis(f, s.DB, isSinkless) + v, err := cdctest.RunNemesis(f, s.DB, isSinkless, rng) if err != nil { t.Fatalf("%+v", err) } diff --git a/pkg/ccl/changefeedccl/schemafeed/schematestutils/BUILD.bazel b/pkg/ccl/changefeedccl/schemafeed/schematestutils/BUILD.bazel index 7832057de020..0856e8378683 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schematestutils/BUILD.bazel +++ b/pkg/ccl/changefeedccl/schemafeed/schematestutils/BUILD.bazel @@ -6,12 +6,23 @@ go_library( importpath = "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed/schematestutils", visibility = ["//visibility:public"], deps = [ + "//pkg/keys", + "//pkg/kv", + "//pkg/kv/kvpb", + "//pkg/roachpb", "//pkg/sql/catalog", "//pkg/sql/catalog/catpb", + "//pkg/sql/catalog/descbuilder", "//pkg/sql/catalog/descpb", "//pkg/sql/catalog/tabledesc", "//pkg/sql/types", + "//pkg/storage", + "//pkg/testutils/serverutils", + "//pkg/testutils/sqlutils", + "//pkg/util/encoding", "//pkg/util/hlc", + "@com_github_cockroachdb_errors//:errors", "@com_github_gogo_protobuf//proto", + "@com_github_stretchr_testify//require", ], ) diff --git a/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go b/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go index b76dd3e22cf1..fce08e9365b6 100644 --- a/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go +++ b/pkg/ccl/changefeedccl/schemafeed/schematestutils/schema_test_utils.go @@ -11,15 +11,28 @@ package schematestutils import ( + "context" "strconv" + "testing" + "github.com/cockroachdb/cockroach/pkg/keys" + "github.com/cockroachdb/cockroach/pkg/kv" + "github.com/cockroachdb/cockroach/pkg/kv/kvpb" + "github.com/cockroachdb/cockroach/pkg/roachpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog" "github.com/cockroachdb/cockroach/pkg/sql/catalog/catpb" + "github.com/cockroachdb/cockroach/pkg/sql/catalog/descbuilder" "github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb" "github.com/cockroachdb/cockroach/pkg/sql/catalog/tabledesc" "github.com/cockroachdb/cockroach/pkg/sql/types" + "github.com/cockroachdb/cockroach/pkg/storage" + "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" + "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" + "github.com/cockroachdb/cockroach/pkg/util/encoding" "github.com/cockroachdb/cockroach/pkg/util/hlc" + "github.com/cockroachdb/errors" "github.com/gogo/protobuf/proto" + "github.com/stretchr/testify/require" ) // MakeTableDesc makes a generic table descriptor with the provided properties. @@ -124,3 +137,87 @@ func AddDropIndexMutation(desc catalog.TableDescriptor) catalog.TableDescriptor }) return tabledesc.NewBuilder(desc.TableDesc()).BuildImmutableTable() } + +// FetchDescVersionModificationTime fetches the `ModificationTime` of the +// specified `version` of `tableName`'s table descriptor. +func FetchDescVersionModificationTime( + t testing.TB, + s serverutils.ApplicationLayerInterface, + dbName string, + schemaName string, + tableName string, + version int, +) hlc.Timestamp { + db := serverutils.OpenDBConn( + t, s.SQLAddr(), dbName, false, s.Stopper()) + + tblKey := s.Codec().TablePrefix(keys.DescriptorTableID) + header := kvpb.RequestHeader{ + Key: tblKey, + EndKey: tblKey.PrefixEnd(), + } + dropColTblID := sqlutils.QueryTableID(t, db, dbName, schemaName, tableName) + req := &kvpb.ExportRequest{ + RequestHeader: header, + MVCCFilter: kvpb.MVCCFilter_All, + StartTime: hlc.Timestamp{}, + } + hh := kvpb.Header{Timestamp: hlc.NewClockForTesting(nil).Now()} + res, pErr := kv.SendWrappedWith(context.Background(), + s.DB().NonTransactionalSender(), hh, req) + if pErr != nil { + t.Fatal(pErr.GoError()) + } + for _, file := range res.(*kvpb.ExportResponse).Files { + it, err := storage.NewMemSSTIterator(file.SST, false /* verify */, storage.IterOptions{ + KeyTypes: storage.IterKeyTypePointsAndRanges, + LowerBound: keys.MinKey, + UpperBound: keys.MaxKey, + }) + if err != nil { + t.Fatal(err) + } + defer it.Close() + for it.SeekGE(storage.NilKey); ; it.Next() { + if ok, err := it.Valid(); err != nil { + t.Fatal(err) + } else if !ok { + continue + } + k := it.UnsafeKey() + if _, hasRange := it.HasPointAndRange(); hasRange { + t.Fatalf("unexpected MVCC range key at %s", k) + } + remaining, _, _, err := s.Codec().DecodeIndexPrefix(k.Key) + if err != nil { + t.Fatal(err) + } + _, tableID, err := encoding.DecodeUvarintAscending(remaining) + if err != nil { + t.Fatal(err) + } + if tableID != uint64(dropColTblID) { + continue + } + unsafeValue, err := it.UnsafeValue() + require.NoError(t, err) + if unsafeValue == nil { + t.Fatal(errors.New(`value was dropped or truncated`)) + } + value := roachpb.Value{RawBytes: unsafeValue, Timestamp: k.Timestamp} + b, err := descbuilder.FromSerializedValue(&value) + if err != nil { + t.Fatal(err) + } + require.NotNil(t, b) + if b.DescriptorType() == catalog.Table { + tbl := b.BuildImmutable().(catalog.TableDescriptor) + if int(tbl.GetVersion()) == version { + return tbl.GetModificationTime() + } + } + } + } + t.Fatal(errors.New(`couldn't find table desc for given version`)) + return hlc.Timestamp{} +} diff --git a/pkg/ccl/sqlproxyccl/acl/cidr_ranges.go b/pkg/ccl/sqlproxyccl/acl/cidr_ranges.go index 69c8fe573e80..073172486f5b 100644 --- a/pkg/ccl/sqlproxyccl/acl/cidr_ranges.go +++ b/pkg/ccl/sqlproxyccl/acl/cidr_ranges.go @@ -28,16 +28,16 @@ var _ AccessController = &CIDRRanges{} // CheckConnection implements the AccessController interface. func (p *CIDRRanges) CheckConnection(ctx context.Context, conn ConnectionTags) error { - tenantObj, err := p.LookupTenantFn(ctx, conn.TenantID) - if err != nil { - return err - } - // Private connections. This ACL is only responsible for public CIDR ranges. if conn.EndpointID != "" { return nil } + tenantObj, err := p.LookupTenantFn(ctx, conn.TenantID) + if err != nil { + return err + } + // Cluster allows public connections, so we'll check allowed CIDR ranges. if tenantObj.AllowPublicConn() { ip := net.ParseIP(conn.IP) diff --git a/pkg/ccl/sqlproxyccl/acl/private_endpoints.go b/pkg/ccl/sqlproxyccl/acl/private_endpoints.go index 7e4e1fe55b0a..8665e26544cd 100644 --- a/pkg/ccl/sqlproxyccl/acl/private_endpoints.go +++ b/pkg/ccl/sqlproxyccl/acl/private_endpoints.go @@ -37,16 +37,16 @@ var _ AccessController = &PrivateEndpoints{} // CheckConnection implements the AccessController interface. func (p *PrivateEndpoints) CheckConnection(ctx context.Context, conn ConnectionTags) error { - tenantObj, err := p.LookupTenantFn(ctx, conn.TenantID) - if err != nil { - return err - } - // Public connections. This ACL is only responsible for private endpoints. if conn.EndpointID == "" { return nil } + tenantObj, err := p.LookupTenantFn(ctx, conn.TenantID) + if err != nil { + return err + } + // Cluster allows private connections, so we'll check allowed endpoints. if tenantObj.AllowPrivateConn() { for _, endpoints := range tenantObj.AllowedPrivateEndpoints { diff --git a/pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go b/pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go index d986b558a13b..888528b3ecd2 100644 --- a/pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go +++ b/pkg/ccl/sqlproxyccl/acl/private_endpoints_test.go @@ -41,7 +41,7 @@ func TestPrivateEndpoints(t *testing.T) { return nil, errors.New("foo") }, } - err := p.CheckConnection(ctx, makeConn("")) + err := p.CheckConnection(ctx, makeConn("foo")) require.EqualError(t, err, "foo") }) diff --git a/pkg/ccl/sqlproxyccl/acl/watcher.go b/pkg/ccl/sqlproxyccl/acl/watcher.go index f541163f9f48..c3bc9a6e3eeb 100644 --- a/pkg/ccl/sqlproxyccl/acl/watcher.go +++ b/pkg/ccl/sqlproxyccl/acl/watcher.go @@ -244,42 +244,45 @@ func (w *Watcher) updateAccessController( checkListeners(ctx, copy, controllers) } -// ListenForDenied Adds a listener to the watcher for the given connection. If the -// connection is already blocked a nil remove function is returned and an error -// is returned immediately. +// ListenForDenied Adds a listener to the watcher for the given connection. If +// the connection is already blocked a nil remove function is returned and an +// error is returned immediately. // // Example Usage: // -// remove, err := w.ListenForDenied(ctx, connection, func(err error) { +// removeFn, err := w.ListenForDenied(ctx, connection, func(err error) { // /* connection was blocked by change */ // }) // -// if err != nil { /*connection already blocked*/ } -// defer remove() +// if err != nil { /*connection already blocked*/ } +// defer removeFn() // -// Warning: -// Do not call remove() within the error callback. It would deadlock. +// WARNING: Do not call removeFn() within the error callback. It would deadlock. func (w *Watcher) ListenForDenied( ctx context.Context, connection ConnectionTags, callback func(error), ) (func(), error) { - w.mu.Lock() - defer w.mu.Unlock() + lst, controllers := func() (*listener, []AccessController) { + w.mu.Lock() + defer w.mu.Unlock() - if err := checkConnection(ctx, connection, w.controllers); err != nil { - return nil, err - } + id := w.nextID + w.nextID++ - id := w.nextID - w.nextID++ + l := &listener{ + id: id, + connection: connection, + } + l.mu.denied = callback - l := &listener{ - id: id, - connection: connection, - } - l.mu.denied = callback + w.listeners.ReplaceOrInsert(l) - w.listeners.ReplaceOrInsert(l) - return func() { w.removeListener(l) }, nil + return l, w.controllers + }() + if err := checkConnection(ctx, connection, controllers); err != nil { + w.removeListener(lst) + return nil, err + } + return func() { w.removeListener(lst) }, nil } func (w *Watcher) removeListener(l *listener) { @@ -315,6 +318,8 @@ func checkListeners(ctx context.Context, listeners *btree.BTree, controllers []A }) } +// NOTE: checkConnection may grab a lock, and could be blocked waiting for it, +// so callers should not hold a global lock while calling this. func checkConnection( ctx context.Context, connection ConnectionTags, controllers []AccessController, ) error { diff --git a/pkg/rpc/context.go b/pkg/rpc/context.go index df0986cd54e2..e18cf789bacb 100644 --- a/pkg/rpc/context.go +++ b/pkg/rpc/context.go @@ -213,9 +213,6 @@ type Context struct { RemoteClocks *RemoteClockMonitor MasterCtx context.Context // cancel on stopper quiesce - heartbeatInterval time.Duration - heartbeatTimeout time.Duration - rpcCompression bool localInternalClient RestrictedInternalClient @@ -531,13 +528,11 @@ func NewContext(ctx context.Context, opts ContextOptions) *Context { secCtx.useNodeAuth = opts.UseNodeAuth rpcCtx := &Context{ - ContextOptions: opts, - SecurityContext: secCtx, - rpcCompression: enableRPCCompression, - MasterCtx: masterCtx, - metrics: makeMetrics(), - heartbeatInterval: opts.RPCHeartbeatInterval, - heartbeatTimeout: opts.RPCHeartbeatTimeout, + ContextOptions: opts, + SecurityContext: secCtx, + rpcCompression: enableRPCCompression, + MasterCtx: masterCtx, + metrics: makeMetrics(), } rpcCtx.dialbackMu.Lock() diff --git a/pkg/rpc/context_test.go b/pkg/rpc/context_test.go index 6bce3dbb7bd8..ee69efc21c6f 100644 --- a/pkg/rpc/context_test.go +++ b/pkg/rpc/context_test.go @@ -876,9 +876,6 @@ func TestOffsetMeasurement(t *testing.T) { clientClock := &AdvancingClock{time: timeutil.Unix(0, 10)} clientMaxOffset := time.Duration(0) clientCtx := newTestContext(clusterID, clientClock, clientMaxOffset, stopper) - // Make the interval shorter to speed up the test. - clientCtx.RPCHeartbeatInterval = 1 * time.Millisecond - clientCtx.RPCHeartbeatTimeout = 1 * time.Millisecond clientCtx.RemoteClocks.offsetTTL = 5 * clientClock.getAdvancementInterval() if _, err := clientCtx.GRPCDialNode(remoteAddr, serverNodeID, DefaultClass).Connect(ctx); err != nil { t.Fatal(err) @@ -952,7 +949,7 @@ func TestFailedOffsetMeasurement(t *testing.T) { clientCtx := newTestContext(clusterID, clock, maxOffset, stopper) // Remove the timeout so that failure arises from exceeding the maximum // clock reading delay, not the timeout. - clientCtx.heartbeatTimeout = 0 + clientCtx.RPCHeartbeatTimeout = 0 // Allow two heartbeat for initialization. The first ping doesn't report an offset, // the second one thus doesn't have an offset to work with, so it's only on the // third one that's fully configured. @@ -1022,9 +1019,6 @@ func TestLatencyInfoCleanupOnClosedConnection(t *testing.T) { clientClock := &AdvancingClock{time: timeutil.Unix(0, 10)} clientMaxOffset := time.Duration(0) clientCtx := newTestContext(clusterID, clientClock, clientMaxOffset, stopper) - // Make the interval shorter to speed up the test. - clientCtx.RPCHeartbeatInterval = 1 * time.Millisecond - clientCtx.RPCHeartbeatTimeout = 1 * time.Millisecond var hbDecommission atomic.Value hbDecommission.Store(false) diff --git a/pkg/rpc/datadriven_test.go b/pkg/rpc/datadriven_test.go index 4ebfbdc201da..70d5c01cc343 100644 --- a/pkg/rpc/datadriven_test.go +++ b/pkg/rpc/datadriven_test.go @@ -148,7 +148,6 @@ func setupEnv(t *testing.T) *ddEnv { maxOffset := time.Duration(250) clientCtx := newTestContext(clusterID, clock, maxOffset, stopper) - clientCtx.heartbeatInterval = 10 * time.Millisecond env := &ddEnv{ clusterID: clusterID, diff --git a/pkg/rpc/peer.go b/pkg/rpc/peer.go index 5ff1372af218..e137d8ba61c4 100644 --- a/pkg/rpc/peer.go +++ b/pkg/rpc/peer.go @@ -184,8 +184,8 @@ func (rpcCtx *Context) newPeer(k peerKey) *peer { dial: func(ctx context.Context, target string, class ConnectionClass) (*grpc.ClientConn, error) { return rpcCtx.grpcDialRaw(ctx, target, class, rpcCtx.testingDialOpts...) }, - heartbeatInterval: rpcCtx.heartbeatInterval, - heartbeatTimeout: rpcCtx.heartbeatTimeout, + heartbeatInterval: rpcCtx.RPCHeartbeatInterval, + heartbeatTimeout: rpcCtx.RPCHeartbeatTimeout, } var b *circuit.Breaker diff --git a/pkg/sql/BUILD.bazel b/pkg/sql/BUILD.bazel index 4092dedd4ca3..588a83494a33 100644 --- a/pkg/sql/BUILD.bazel +++ b/pkg/sql/BUILD.bazel @@ -733,6 +733,7 @@ go_test( "//pkg/base", "//pkg/build/bazel", "//pkg/ccl", + "//pkg/ccl/changefeedccl/schemafeed/schematestutils", "//pkg/cloud/impl:cloudimpl", "//pkg/clusterversion", "//pkg/col/coldata", diff --git a/pkg/sql/schema_changer_test.go b/pkg/sql/schema_changer_test.go index 1a5637485530..f445ee3990c8 100644 --- a/pkg/sql/schema_changer_test.go +++ b/pkg/sql/schema_changer_test.go @@ -4423,18 +4423,14 @@ ALTER TABLE t.test ADD COLUMN c INT AS (v + 4) STORED, ADD COLUMN d INT DEFAULT func TestCancelSchemaChange(t *testing.T) { defer leaktest.AfterTest(t)() defer log.Scope(t).Close(t) - skip.WithIssue(t, 51796) const ( - numNodes = 3 maxValue = 100 ) var sqlDB *sqlutils.SQLRunner - var db *gosql.DB params, _ := createTestServerParams() doCancel := false - var enableAsyncSchemaChanges uint32 params.Knobs = base.TestingKnobs{ SQLSchemaChanger: &sql.SchemaChangerTestingKnobs{ WriteCheckpointInterval: time.Nanosecond, // checkpoint after every chunk. @@ -4445,52 +4441,41 @@ func TestCancelSchemaChange(t *testing.T) { if !doCancel { return nil } - if _, err := db.Exec(`CANCEL JOB ( + sqlDB.Exec(t, `CANCEL JOB ( SELECT job_id FROM [SHOW JOBS] WHERE job_type = 'SCHEMA CHANGE' AND status = $1 AND description NOT LIKE 'ROLL BACK%' - )`, jobs.StatusRunning); err != nil { - panic(err) - } + )`, jobs.StatusRunning) return nil }, }, } - tc := serverutils.StartNewTestCluster(t, numNodes, base.TestClusterArgs{ - ReplicationMode: base.ReplicationManual, - ServerArgs: params, - }) - defer tc.Stopper().Stop(context.Background()) - db = tc.ServerConn(0) - kvDB := tc.Server(0).DB() - codec := tc.Server(0).ApplicationLayer().Codec() + server, db, kvDB := serverutils.StartServer(t, params) sqlDB = sqlutils.MakeSQLRunner(db) + defer server.Stopper().Stop(context.Background()) + codec := server.ApplicationLayer().Codec() // Disable strict GC TTL enforcement because we're going to shove a zero-value // TTL into the system with AddImmediateGCZoneConfig. defer sqltestutils.DisableGCTTLStrictEnforcement(t, db)() sqlDB.Exec(t, ` + SET use_declarative_schema_changer = 'off'; CREATE DATABASE t; CREATE TABLE t.test (k INT PRIMARY KEY, v INT, pi DECIMAL DEFAULT (DECIMAL '3.14')); `) + sqlDB.Exec(t, `SET CLUSTER SETTING jobs.registry.interval.adopt = '1s';`) + // Bulk insert. if err := sqltestutils.BulkInsertIntoTable(db, maxValue); err != nil { t.Fatal(err) } tableDesc := desctestutils.TestingGetPublicTableDescriptor(kvDB, codec, "t", "test") - // Split the table into multiple ranges. - var sps []serverutils.SplitPoint - const numSplits = numNodes * 2 - for i := 1; i <= numSplits-1; i++ { - sps = append(sps, serverutils.SplitPoint{TargetNodeIdx: i % numNodes, Vals: []interface{}{maxValue / numSplits * i}}) - } - tc.SplitTable(t, tableDesc, sps) ctx := context.Background() if err := sqltestutils.CheckTableKeyCount(ctx, kvDB, codec, 1, maxValue); err != nil { @@ -4504,49 +4489,48 @@ func TestCancelSchemaChange(t *testing.T) { // Set to true if the rollback returns in a running, waiting status. isGC bool }{ - {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.4::DECIMAL CREATE FAMILY f2, ADD CHECK (x >= 0)`, + {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.4 CREATE FAMILY f2, ADD CHECK (x >= 0)`, true, false}, {`CREATE INDEX foo ON t.public.test (v)`, true, true}, - {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.2::DECIMAL CREATE FAMILY f3, ADD CHECK (x >= 0)`, + {`ALTER TABLE t.public.test ADD COLUMN x DECIMAL DEFAULT 1.2 CREATE FAMILY f3, ADD CHECK (x >= 0)`, false, false}, {`CREATE INDEX foo ON t.public.test (v)`, false, true}, } idx := 0 + gcIdx := 0 for _, tc := range testCases { doCancel = tc.cancel if doCancel { if _, err := db.Exec(tc.sql); !testutils.IsError(err, "job canceled") { t.Fatalf("unexpected %v", err) } - if err := jobutils.VerifySystemJob(t, sqlDB, idx, jobspb.TypeSchemaChange, jobs.StatusCanceled, jobs.Record{ - Username: username.RootUserName(), - Description: tc.sql, - DescriptorIDs: descpb.IDs{ - tableDesc.GetID(), - }, - }); err != nil { - t.Fatal(err) - } - jobID := jobutils.GetJobID(t, sqlDB, idx) - idx++ + testutils.SucceedsSoon(t, func() error { + return jobutils.VerifySystemJob( + t, sqlDB, idx, jobspb.TypeSchemaChange, jobs.StatusCanceled, + jobs.Record{ + Username: username.RootUserName(), + Description: tc.sql, + DescriptorIDs: descpb.IDs{ + tableDesc.GetID(), + }, + }, + ) + }) jobRecord := jobs.Record{ Username: username.RootUserName(), - Description: fmt.Sprintf("ROLL BACK JOB %d: %s", jobID, tc.sql), + Description: fmt.Sprintf("GC for ROLLBACK of %s", tc.sql), DescriptorIDs: descpb.IDs{ tableDesc.GetID(), }, } - var err error if tc.isGC { - err = jobutils.VerifyRunningSystemJob(t, sqlDB, idx, jobspb.TypeSchemaChange, sql.RunningStatusWaitingGC, jobRecord) - } else { - err = jobutils.VerifySystemJob(t, sqlDB, idx, jobspb.TypeSchemaChange, jobs.StatusSucceeded, jobRecord) - } - if err != nil { - t.Fatal(err) + testutils.SucceedsSoon(t, func() error { + return jobutils.VerifyRunningSystemJob(t, sqlDB, gcIdx*2, jobspb.TypeSchemaChangeGC, sql.RunningStatusWaitingForMVCCGC, jobRecord) + }) + gcIdx++ } } else { sqlDB.Exec(t, tc.sql) @@ -4565,7 +4549,7 @@ func TestCancelSchemaChange(t *testing.T) { // Verify that the index foo over v is consistent, and that column x has // been backfilled properly. - rows, err := db.Query(`SELECT v, x from t.test@foo`) + rows, err := db.Query(`SELECT v, x from t.test@foo ORDER BY v`) if err != nil { t.Fatal(err) } @@ -4595,7 +4579,6 @@ func TestCancelSchemaChange(t *testing.T) { } // Verify that the data from the canceled CREATE INDEX is cleaned up. - atomic.StoreUint32(&enableAsyncSchemaChanges, 1) // TODO (lucy): when this test is no longer canceled, have it correctly handle doing GC immediately if _, err := sqltestutils.AddImmediateGCZoneConfig(db, tableDesc.GetID()); err != nil { t.Fatal(err) diff --git a/pkg/sql/show_fingerprints.go b/pkg/sql/show_fingerprints.go index acff3d23f80a..780418ad2109 100644 --- a/pkg/sql/show_fingerprints.go +++ b/pkg/sql/show_fingerprints.go @@ -66,7 +66,7 @@ func (p *planner) ShowFingerprints( return &showFingerprintsNode{ tableDesc: tableDesc, - indexes: tableDesc.NonDropIndexes(), + indexes: tableDesc.ActiveIndexes(), }, nil } diff --git a/pkg/sql/show_fingerprints_test.go b/pkg/sql/show_fingerprints_test.go index c3f8cf1e899b..ec307eba8f61 100644 --- a/pkg/sql/show_fingerprints_test.go +++ b/pkg/sql/show_fingerprints_test.go @@ -17,6 +17,7 @@ import ( "testing" "github.com/cockroachdb/cockroach/pkg/base" + "github.com/cockroachdb/cockroach/pkg/ccl/changefeedccl/schemafeed/schematestutils" "github.com/cockroachdb/cockroach/pkg/testutils/serverutils" "github.com/cockroachdb/cockroach/pkg/testutils/sqlutils" "github.com/cockroachdb/cockroach/pkg/util/leaktest" @@ -86,3 +87,36 @@ func TestShowFingerprintsColumnNames(t *testing.T) { t.Errorf("expected different fingerprints: %v vs %v", fprint1, fprint2) } } + +// TestShowFingerprintsDuringSchemaChange is a regression test that asserts that +// fingerprinting does not fail when done in the middle of a schema change using +// an AOST query. In the middle of a schema change such as `ADD COLUMN ... +// DEFAULT`, there may be non-public indexes in the descriptor. Prior to the +// change which introduced this test, fingerprinting would attempt to read these +// non-public indexes and fail. +func TestShowFingerprintsDuringSchemaChange(t *testing.T) { + defer leaktest.AfterTest(t)() + defer log.Scope(t).Close(t) + + ctx := context.Background() + s, db, _ := serverutils.StartServer(t, base.TestServerArgs{}) + defer s.Stopper().Stop(ctx) + + sqlDB := sqlutils.MakeSQLRunner(db) + sqlDB.Exec(t, `CREATE DATABASE d`) + sqlDB.Exec(t, `USE d`) + sqlDB.Exec(t, `CREATE TABLE foo ( + a INT PRIMARY KEY, + b INT + )`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (0, 0)`) + sqlDB.Exec(t, `INSERT INTO foo VALUES (1, 1)`) + + // At version 5, there are non-public indexes. + sqlDB.Exec(t, `ALTER TABLE foo ADD COLUMN c INT DEFAULT -1`) + ts := schematestutils.FetchDescVersionModificationTime( + t, s, "d", "public", "foo", 5) + sqlDB.Exec(t, fmt.Sprintf( + `SELECT * FROM [SHOW EXPERIMENTAL_FINGERPRINTS FROM TABLE foo] AS OF SYSTEM TIME %s`, + ts.AsOfSystemTime())) +} diff --git a/pkg/storage/pebble.go b/pkg/storage/pebble.go index be726c051a5c..63705f2264d1 100644 --- a/pkg/storage/pebble.go +++ b/pkg/storage/pebble.go @@ -1148,6 +1148,13 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { opts.EventListener = &el p.wrappedIntentWriter = wrapIntentWriter(p) + // If both cfg.SharedStorage and cfg.RemoteStorageFactory are set, CRDB uses + // cfg.SharedStorage. Note that eventually we will enable using both at the + // same time, but we don't have the right abstractions in place to do that + // today. + // + // We prefer cfg.SharedStorage, since the Locator -> Storage mapping contained + // in it is needed for CRDB to function properly. if cfg.SharedStorage != nil { esWrapper := &externalStorageWrapper{p: p, es: cfg.SharedStorage, ctx: ctx} opts.Experimental.RemoteStorage = remote.MakeSimpleFactory(map[remote.Locator]remote.Storage{ @@ -1155,10 +1162,10 @@ func NewPebble(ctx context.Context, cfg PebbleConfig) (p *Pebble, err error) { }) opts.Experimental.CreateOnShared = true opts.Experimental.CreateOnSharedLocator = "" - } - - if cfg.RemoteStorageFactory != nil { - opts.Experimental.RemoteStorage = remoteStorageAdaptor{p: p, ctx: ctx, factory: cfg.RemoteStorageFactory} + } else { + if cfg.RemoteStorageFactory != nil { + opts.Experimental.RemoteStorage = remoteStorageAdaptor{p: p, ctx: ctx, factory: cfg.RemoteStorageFactory} + } } // Read the current store cluster version. diff --git a/pkg/util/timeutil/ptp/BUILD.bazel b/pkg/util/timeutil/ptp/BUILD.bazel index 76eb9d5a1e9a..c1505a597fc1 100644 --- a/pkg/util/timeutil/ptp/BUILD.bazel +++ b/pkg/util/timeutil/ptp/BUILD.bazel @@ -1,4 +1,4 @@ -load("@io_bazel_rules_go//go:def.bzl", "go_library") +load("@io_bazel_rules_go//go:def.bzl", "go_library", "go_test") go_library( name = "ptp", @@ -59,3 +59,19 @@ go_library( "//conditions:default": [], }), ) + +go_test( + name = "ptp_test", + srcs = ["ptp_clock_linux_test.go"], + args = ["-test.timeout=295s"], + embed = [":ptp"], + deps = select({ + "@io_bazel_rules_go//go/platform:android": [ + "//pkg/util/timeutil", + ], + "@io_bazel_rules_go//go/platform:linux": [ + "//pkg/util/timeutil", + ], + "//conditions:default": [], + }), +) diff --git a/pkg/util/timeutil/ptp/ptp_clock_linux.go b/pkg/util/timeutil/ptp/ptp_clock_linux.go index 3f2a880a28b5..4bbbcea6782f 100644 --- a/pkg/util/timeutil/ptp/ptp_clock_linux.go +++ b/pkg/util/timeutil/ptp/ptp_clock_linux.go @@ -79,5 +79,10 @@ func (p Clock) Now() time.Time { panic(err) } - return timeutil.Unix(int64(ts.tv_sec)*1e9, int64(ts.tv_nsec)) + return timeutil.Unix(int64(ts.tv_sec), int64(ts.tv_nsec)) +} + +// realtime returns a clock using the system CLOCK_REALTIME device. For testing. +func realtime() Clock { + return Clock{clockDeviceID: uintptr(C.CLOCK_REALTIME)} } diff --git a/pkg/util/timeutil/ptp/ptp_clock_linux_test.go b/pkg/util/timeutil/ptp/ptp_clock_linux_test.go new file mode 100644 index 000000000000..01e915bf9983 --- /dev/null +++ b/pkg/util/timeutil/ptp/ptp_clock_linux_test.go @@ -0,0 +1,30 @@ +// Copyright 2023 The Cockroach Authors. +// +// Use of this software is governed by the Business Source License +// included in the file licenses/BSL.txt. +// +// As of the Change Date specified in that file, in accordance with +// the Business Source License, use of this software will be governed +// by the Apache License, Version 2.0, included in the file +// licenses/APL.txt. + +//go:build linux +// +build linux + +package ptp + +import ( + "testing" + "time" + + "github.com/cockroachdb/cockroach/pkg/util/timeutil" +) + +// TestClockNow sanity checks that Clock.Now() sourced from the CLOCK_REALTIME +// device returns time close to timeutil.Now(). This ensures that the conversion +// from the time returned by a clock device to Go's time.Time is correct. +func TestClockNow(t *testing.T) { + if got, want := realtime().Now(), timeutil.Now(); want.Sub(got).Abs() > 10*time.Second { + t.Errorf("clock mismatch: got %v; timeutil says %v", got, want) + } +}