diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9d2460be0..09a34406b 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,3 +1,9 @@
+### v0.12.0
+
+#### Upgrade notes
+
+* Replication slots declared in the clusterspec `additionalMasterReplicationSlots` option will now be prefixed with the `stolon_` string so that users can manually create/drop their own custom replication slots (these must not start with `stolon_`). Users of this feature should update all references to these replication slots, adding the `stolon_` prefix.
+
 ### v0.11.0
 
 #### New features
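The upgrade note above, and the keeper changes that follow, rely on the slot-naming helpers in the `common` package (`StolonName`, `IsStolonName`, `NameFromStolonName`). A minimal sketch of how these helpers are assumed to behave (the exact location and implementation in the repository may differ):

```go
package common

import "strings"

// stolonPrefix is assumed to be the namespace prefix for every
// replication slot managed by stolon.
const stolonPrefix = "stolon_"

// StolonName turns a follower UID or user-provided slot name into a
// stolon-managed slot name, e.g. "replslot01" -> "stolon_replslot01".
func StolonName(name string) string { return stolonPrefix + name }

// IsStolonName reports whether a slot name carries the stolon_ prefix.
func IsStolonName(name string) bool { return strings.HasPrefix(name, stolonPrefix) }

// NameFromStolonName strips the prefix to recover the original name.
func NameFromStolonName(name string) string { return strings.TrimPrefix(name, stolonPrefix) }
```

Under that assumption, a slot listed as `replslot01` in `additionalMasterReplicationSlots` shows up as `stolon_replslot01` in `pg_replication_slots`, which is the rename the upgrade note asks existing users to account for.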
diff --git a/cmd/keeper/cmd/keeper.go b/cmd/keeper/cmd/keeper.go
index 277062e7b..a3786f81b 100644
--- a/cmd/keeper/cmd/keeper.go
+++ b/cmd/keeper/cmd/keeper.go
@@ -886,60 +886,39 @@ func (p *PostgresKeeper) isDifferentTimelineBranch(followedDB *cluster.DB, pgSta
 	return false
 }
 
-func (p *PostgresKeeper) updateReplSlots(curReplSlots []string, uid string, followersUIDs []string) error {
-	// Drop internal replication slots
-	for _, slot := range curReplSlots {
-		if !common.IsStolonName(slot) {
-			continue
-		}
-		if !util.StringInSlice(followersUIDs, common.NameFromStolonName(slot)) {
-			log.Infow("dropping replication slot since db not marked as follower", "slot", slot, "db", common.NameFromStolonName(slot))
-			if err := p.pgm.DropReplicationSlot(slot); err != nil {
-				log.Errorw("failed to drop replication slot", "slot", slot, "err", err)
-				// don't return the error but continue also if drop failed (standby still connected)
-			}
-		}
-	}
-	// Create internal replication slots
+func (p *PostgresKeeper) updateReplSlots(curReplSlots []string, uid string, followersUIDs, additionalReplSlots []string) error {
+	internalReplSlots := map[string]struct{}{}
+
+	// Create a list of the wanted internal replication slots
 	for _, followerUID := range followersUIDs {
 		if followerUID == uid {
 			continue
 		}
-		replSlot := common.StolonName(followerUID)
-		if !util.StringInSlice(curReplSlots, replSlot) {
-			log.Infow("creating replication slot", "slot", replSlot, "db", followerUID)
-			if err := p.pgm.CreateReplicationSlot(replSlot); err != nil {
-				log.Errorw("failed to create replication slot", "slot", replSlot, zap.Error(err))
-				return err
-			}
-		}
+		internalReplSlots[common.StolonName(followerUID)] = struct{}{}
 	}
-	return nil
-}
 
-func (p *PostgresKeeper) updateAdditionalReplSlots(curReplSlots []string, additionalReplSlots []string) error {
-	// detect not stolon replication slots
-	notStolonSlots := []string{}
-	for _, curReplSlot := range curReplSlots {
-		if !common.IsStolonName(curReplSlot) {
-			notStolonSlots = append(notStolonSlots, curReplSlot)
-		}
+	// Add AdditionalReplicationSlots
+	for _, slot := range additionalReplSlots {
+		internalReplSlots[common.StolonName(slot)] = struct{}{}
 	}
 
-	// drop unnecessary slots
-	for _, slot := range notStolonSlots {
-		if !util.StringInSlice(additionalReplSlots, slot) {
+	// Drop internal replication slots
+	for _, slot := range curReplSlots {
+		if !common.IsStolonName(slot) {
+			continue
+		}
+		if _, ok := internalReplSlots[slot]; !ok {
 			log.Infow("dropping replication slot", "slot", slot)
 			if err := p.pgm.DropReplicationSlot(slot); err != nil {
-				log.Errorw("failed to drop replication slot", "slot", slot, zap.Error(err))
-				return err
+				log.Errorw("failed to drop replication slot", "slot", slot, "err", err)
+				// don't return the error but continue also if drop failed (standby still connected)
 			}
 		}
 	}
 
-	// create required slots
-	for _, slot := range additionalReplSlots {
-		if !util.StringInSlice(notStolonSlots, slot) {
+	// Create internal replication slots
+	for slot := range internalReplSlots {
+		if !util.StringInSlice(curReplSlots, slot) {
 			log.Infow("creating replication slot", "slot", slot)
 			if err := p.pgm.CreateReplicationSlot(slot); err != nil {
 				log.Errorw("failed to create replication slot", "slot", slot, zap.Error(err))
@@ -947,7 +926,6 @@ func (p *PostgresKeeper) updateAdditionalReplSlots(curReplSlots []string, additi
 			}
 		}
 	}
-
 	return nil
 }
 
@@ -961,14 +939,10 @@ func (p *PostgresKeeper) refreshReplicationSlots(cd *cluster.ClusterData, db *cl
 
 	followersUIDs := db.Spec.Followers
 
-	if err = p.updateReplSlots(currentReplicationSlots, db.UID, followersUIDs); err != nil {
+	if err = p.updateReplSlots(currentReplicationSlots, db.UID, followersUIDs, db.Spec.AdditionalReplicationSlots); err != nil {
 		log.Errorw("error updating replication slots", zap.Error(err))
 		return err
 	}
-	if err = p.updateAdditionalReplSlots(currentReplicationSlots, db.Spec.AdditionalReplicationSlots); err != nil {
-		log.Errorw("error updating additional replication slots", zap.Error(err))
-		return err
-	}
 
 	return nil
 }
@@ -1449,7 +1423,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
 			log.Infow("already master")
 		}
 
-		if err = p.refreshReplicationSlots(cd, db); err != nil {
+		if err := p.refreshReplicationSlots(cd, db); err != nil {
 			log.Errorw("error updating replication slots", zap.Error(err))
 			return
 		}
@@ -1499,7 +1473,8 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
 			// TODO(sgotti) Check that the followed instance has all the needed WAL segments
 
 			// Update our primary_conninfo if replConnString changed
-			if db.Spec.FollowConfig.Type == cluster.FollowTypeInternal {
+			switch db.Spec.FollowConfig.Type {
+			case cluster.FollowTypeInternal:
 				var curReplConnParams postgresql.ConnParams
 
 				curReplConnParams, err = pgm.GetPrimaryConninfo()
@@ -1531,13 +1506,11 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
 					}
 				}
 
-				// TODO(sgotti) currently we ignore DBSpec.AdditionalReplicationSlots on standbys
-				// So we don't touch replication slots and manually created
-				// slots are kept. If the instance becomes master then they'll
-				// be dropped.
-			}
+				if err = p.refreshReplicationSlots(cd, db); err != nil {
+					log.Errorw("error updating replication slots", zap.Error(err))
+				}
 
-			if db.Spec.FollowConfig.Type == cluster.FollowTypeExternal {
+			case cluster.FollowTypeExternal:
 				// Update recovery conf if our FollowConfig has changed
 				curReplConnParams, err := pgm.GetPrimaryConninfo()
 				if err != nil {
@@ -1573,7 +1546,6 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
 
 				if err = p.refreshReplicationSlots(cd, db); err != nil {
 					log.Errorw("error updating replication slots", zap.Error(err))
-					return
 				}
 			}
 
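Net effect of the keeper changes: the two reconciliation passes (internal follower slots vs. additional slots) collapse into one. The wanted set is built once (follower UIDs plus `additionalReplSlots`, all run through `common.StolonName`); every existing `stolon_`-prefixed slot outside that set is dropped, every missing one is created, and slots without the prefix are never touched. An illustrative, standalone sketch of the set-building step (not the actual keeper code):

```go
// wantedInternalSlots mirrors the first half of the new updateReplSlots:
// one slot per follower (skipping the instance itself) plus one slot per
// entry in additionalReplSlots, all namespaced with the stolon_ prefix.
func wantedInternalSlots(uid string, followersUIDs, additionalReplSlots []string) map[string]struct{} {
	wanted := map[string]struct{}{}
	for _, followerUID := range followersUIDs {
		if followerUID == uid {
			continue // a db never keeps a slot for itself
		}
		wanted[common.StolonName(followerUID)] = struct{}{}
	}
	for _, slot := range additionalReplSlots {
		wanted[common.StolonName(slot)] = struct{}{}
	}
	return wanted
}
```

For a master with followers `a` and `b` and `additionalMasterReplicationSlots: ["replslot01"]`, this yields `{stolon_a, stolon_b, stolon_replslot01}`: a stale `stolon_c` would be dropped, while a hand-made `myslot` is left alone.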
diff --git a/doc/cluster_spec.md b/doc/cluster_spec.md
index 2382e94ee..07ed02a41 100644
--- a/doc/cluster_spec.md
+++ b/doc/cluster_spec.md
@@ -25,7 +25,7 @@ Some options in a running cluster specification can be changed to update the des
 | minSynchronousStandbys | minimum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | uint16 | 1 |
 | maxSynchronousStandbys | maximum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | uint16 | 1 |
 | additionalWalSenders | number of additional wal_senders in addition to the ones internally defined by stolon, useful to provide enough wal senders for external standbys (changing this value requires an instance restart) | no | uint16 | 5 |
-| additionalMasterReplicationSlots | a list of additional replication slots to be created on the master postgres instance. Replication slots not defined here will be dropped from the master instance (i.e. manually created replication slots will be removed). | no | []string | null |
+| additionalMasterReplicationSlots | a list of additional physical replication slots to be created on the master postgres instance. They will be prefixed with `stolon_` (like the internal replication slots used for standby replication) so they are namespaced apart from other replication slots. Replication slots starting with `stolon_` that are neither defined here nor used for standby replication will be dropped from the master instance. | no | []string | null |
 | usePgrewind | try to use pg_rewind for faster instance resyncronization. | no | bool | false |
 | initMode | The cluster initialization mode. Can be *new* or *existing*. *new* means that a new db cluster will be created on a random keeper and the other keepers will sync with it. *existing* means that a keeper (that needs to have an already created db cluster) will be choosed as the initial master and the other keepers will sync with it. In this case the `existingConfig` object needs to be populated. | yes | string | |
 | existingConfig | configuration for initMode of type "existing" | if initMode is "existing" | ExistingConfig | |
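Worked example of the documented behaviour (values are illustrative): declaring `"additionalMasterReplicationSlots": ["rs1"]` in the cluster specification produces a physical slot named `stolon_rs1` on the master; a slot created by hand as `rs2` is left untouched because it lacks the prefix, while a hand-created `stolon_rs3` is dropped at the next reconciliation since it is neither declared nor used for standby replication. The integration test below exercises exactly these cases.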
diff --git a/tests/integration/config_test.go b/tests/integration/config_test.go
index cedbbc40d..047a5690e 100644
--- a/tests/integration/config_test.go
+++ b/tests/integration/config_test.go
@@ -15,6 +15,7 @@
 package integration
 
 import (
+	"context"
 	"fmt"
 	"io/ioutil"
 	"os"
@@ -364,16 +365,28 @@ func TestAdditionalReplicationSlots(t *testing.T) {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
+	cd, _, err := sm.GetClusterData(context.TODO())
+	if err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+
+	var standbyDBUID string
+	for _, db := range cd.DBs {
+		if db.Spec.KeeperUID == standby.uid {
+			standbyDBUID = db.UID
+		}
+	}
+
 	// create additional replslots on master
 	err = StolonCtl(clusterName, tstore.storeBackend, storeEndpoints, "update", "--patch", `{ "additionalMasterReplicationSlots" : [ "replslot01", "replslot02" ] }`)
 	if err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
-	if err := waitNotStolonReplicationSlots(master, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
+	if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01", "replslot02"}, 30*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 	// no repl slot on standby
-	if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
+	if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
@@ -382,11 +395,11 @@ func TestAdditionalReplicationSlots(t *testing.T) {
 	if err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
-	if err := waitNotStolonReplicationSlots(master, []string{"replslot01"}, 30*time.Second); err != nil {
+	if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01"}, 30*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 	// no repl slot on standby
-	if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
+	if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
@@ -395,11 +408,11 @@ func TestAdditionalReplicationSlots(t *testing.T) {
 	if err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
-	if err := waitNotStolonReplicationSlots(master, []string{}, 30*time.Second); err != nil {
+	if err := waitStolonReplicationSlots(master, []string{standbyDBUID}, 30*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 	// no repl slot on standby
-	if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
+	if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
@@ -408,19 +421,27 @@ func TestAdditionalReplicationSlots(t *testing.T) {
 	if err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
-	if err := waitNotStolonReplicationSlots(master, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
+	if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01", "replslot02"}, 30*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 	// no repl slot on standby
-	if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
+	if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
-	// Manually create a replication slot. It should be dropped.
+	// Manually create a replication slot. It should not be dropped.
 	if _, err := master.Exec("select pg_create_physical_replication_slot('manualreplslot')"); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
-	if err := waitNotStolonReplicationSlots(master, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
+	// Manually create a replication slot starting with stolon_ . It should be dropped.
+	if _, err := master.Exec("select pg_create_physical_replication_slot('stolon_manualreplslot')"); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01", "replslot02"}, 30*time.Second); err != nil {
+		t.Fatalf("unexpected err: %v", err)
+	}
+	// check it here so we are sure the refresh slots function has already been called
+	if err := waitNotStolonReplicationSlots(master, []string{"manualreplslot"}, 30*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 
@@ -437,7 +458,7 @@ func TestAdditionalReplicationSlots(t *testing.T) {
 	}
 
 	// repl slot on standby which is the new master
-	if err := waitNotStolonReplicationSlots(standby, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
+	if err := waitStolonReplicationSlots(standby, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
 		t.Fatalf("unexpected err: %v", err)
 	}
 }
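Two details worth noting about the test changes: internal follower slots are named after the DB UID rather than the keeper UID, which is why the test first resolves `standbyDBUID` from the cluster data; and the new `waitStolonReplicationSlots` helper (added in utils.go below) rewrites its `replSlots` argument in place to add the `stolon_` prefix, which works here because every caller passes a fresh slice literal. A non-mutating way to build the same expectation, purely as a sketch and not part of the patch:

```go
// stolonSlotNames returns a sorted copy of names with the stolon_ prefix
// applied, leaving the caller's slice untouched.
func stolonSlotNames(names []string) []string {
	out := make([]string, 0, len(names))
	for _, n := range names {
		out = append(out, common.StolonName(n))
	}
	sort.Strings(out)
	return out
}
```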
diff --git a/tests/integration/utils.go b/tests/integration/utils.go
index e90a83cbb..b16869e4c 100644
--- a/tests/integration/utils.go
+++ b/tests/integration/utils.go
@@ -167,6 +167,37 @@ func waitReplicationSlots(q Querier, replSlots []string, timeout time.Duration)
 	return fmt.Errorf("timeout waiting for replSlots %v, got: %v, last err: %v", replSlots, curReplSlots, err)
 }
 
+func waitStolonReplicationSlots(q Querier, replSlots []string, timeout time.Duration) error {
+	// prefix with stolon_
+	for i, slot := range replSlots {
+		replSlots[i] = common.StolonName(slot)
+	}
+	sort.Sort(sort.StringSlice(replSlots))
+
+	start := time.Now()
+	var curReplSlots []string
+	var err error
+	for time.Now().Add(-timeout).Before(start) {
+		allReplSlots, err := getReplicationSlots(q)
+		if err != nil {
+			goto end
+		}
+		curReplSlots = []string{}
+		for _, s := range allReplSlots {
+			if common.IsStolonName(s) {
+				curReplSlots = append(curReplSlots, s)
+			}
+		}
+		sort.Sort(sort.StringSlice(curReplSlots))
+		if reflect.DeepEqual(replSlots, curReplSlots) {
+			return nil
+		}
+	end:
+		time.Sleep(2 * time.Second)
+	}
+	return fmt.Errorf("timeout waiting for replSlots %v, got: %v, last err: %v", replSlots, curReplSlots, err)
+}
+
 func waitNotStolonReplicationSlots(q Querier, replSlots []string, timeout time.Duration) error {
 	sort.Sort(sort.StringSlice(replSlots))