Skip to content

Commit

Permalink
Merge pull request #531 from sgotti/namespace_additional_master_repli…
Browse files Browse the repository at this point in the history
…cation_slots

keeper: prefix additional repl slots with `stolon_`
  • Loading branch information
sgotti committed Jul 3, 2018
2 parents 07bdac8 + 54f20a5 commit df6d4ee
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 67 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
### v0.12.0

## Upgrades notes.

* Replication slots declared in the clusterspec `additionalMasterReplicationSlots` option will now be prefixed with the `stolon_` string to let users be able to manually create/drop custom replication slots (they shouldn't start with `stolon_`). Users of these feature should upgrade all the references to these replication slots adding the `stolon_` prefix.

### v0.11.0

#### New features
Expand Down
82 changes: 27 additions & 55 deletions cmd/keeper/cmd/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -886,68 +886,46 @@ func (p *PostgresKeeper) isDifferentTimelineBranch(followedDB *cluster.DB, pgSta
return false
}

func (p *PostgresKeeper) updateReplSlots(curReplSlots []string, uid string, followersUIDs []string) error {
// Drop internal replication slots
for _, slot := range curReplSlots {
if !common.IsStolonName(slot) {
continue
}
if !util.StringInSlice(followersUIDs, common.NameFromStolonName(slot)) {
log.Infow("dropping replication slot since db not marked as follower", "slot", slot, "db", common.NameFromStolonName(slot))
if err := p.pgm.DropReplicationSlot(slot); err != nil {
log.Errorw("failed to drop replication slot", "slot", slot, "err", err)
// don't return the error but continue also if drop failed (standby still connected)
}
}
}
// Create internal replication slots
func (p *PostgresKeeper) updateReplSlots(curReplSlots []string, uid string, followersUIDs, additionalReplSlots []string) error {
internalReplSlots := map[string]struct{}{}

// Create a list of the wanted internal replication slots
for _, followerUID := range followersUIDs {
if followerUID == uid {
continue
}
replSlot := common.StolonName(followerUID)
if !util.StringInSlice(curReplSlots, replSlot) {
log.Infow("creating replication slot", "slot", replSlot, "db", followerUID)
if err := p.pgm.CreateReplicationSlot(replSlot); err != nil {
log.Errorw("failed to create replication slot", "slot", replSlot, zap.Error(err))
return err
}
}
internalReplSlots[common.StolonName(followerUID)] = struct{}{}
}
return nil
}

func (p *PostgresKeeper) updateAdditionalReplSlots(curReplSlots []string, additionalReplSlots []string) error {
// detect not stolon replication slots
notStolonSlots := []string{}
for _, curReplSlot := range curReplSlots {
if !common.IsStolonName(curReplSlot) {
notStolonSlots = append(notStolonSlots, curReplSlot)
}
// Add AdditionalReplicationSlots
for _, slot := range additionalReplSlots {
internalReplSlots[common.StolonName(slot)] = struct{}{}
}

// drop unnecessary slots
for _, slot := range notStolonSlots {
if !util.StringInSlice(additionalReplSlots, slot) {
// Drop internal replication slots
for _, slot := range curReplSlots {
if !common.IsStolonName(slot) {
continue
}
if _, ok := internalReplSlots[slot]; !ok {
log.Infow("dropping replication slot", "slot", slot)
if err := p.pgm.DropReplicationSlot(slot); err != nil {
log.Errorw("failed to drop replication slot", "slot", slot, zap.Error(err))
return err
log.Errorw("failed to drop replication slot", "slot", slot, "err", err)
// don't return the error but continue also if drop failed (standby still connected)
}
}
}

// create required slots
for _, slot := range additionalReplSlots {
if !util.StringInSlice(notStolonSlots, slot) {
// Create internal replication slots
for slot := range internalReplSlots {
if !util.StringInSlice(curReplSlots, slot) {
log.Infow("creating replication slot", "slot", slot)
if err := p.pgm.CreateReplicationSlot(slot); err != nil {
log.Errorw("failed to create replication slot", "slot", slot, zap.Error(err))
return err
}
}
}

return nil
}

Expand All @@ -961,14 +939,10 @@ func (p *PostgresKeeper) refreshReplicationSlots(cd *cluster.ClusterData, db *cl

followersUIDs := db.Spec.Followers

if err = p.updateReplSlots(currentReplicationSlots, db.UID, followersUIDs); err != nil {
if err = p.updateReplSlots(currentReplicationSlots, db.UID, followersUIDs, db.Spec.AdditionalReplicationSlots); err != nil {
log.Errorw("error updating replication slots", zap.Error(err))
return err
}
if err = p.updateAdditionalReplSlots(currentReplicationSlots, db.Spec.AdditionalReplicationSlots); err != nil {
log.Errorw("error updating additional replication slots", zap.Error(err))
return err
}

return nil
}
Expand Down Expand Up @@ -1449,7 +1423,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Infow("already master")
}

if err = p.refreshReplicationSlots(cd, db); err != nil {
if err := p.refreshReplicationSlots(cd, db); err != nil {
log.Errorw("error updating replication slots", zap.Error(err))
return
}
Expand Down Expand Up @@ -1499,7 +1473,8 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
// TODO(sgotti) Check that the followed instance has all the needed WAL segments

// Update our primary_conninfo if replConnString changed
if db.Spec.FollowConfig.Type == cluster.FollowTypeInternal {
switch db.Spec.FollowConfig.Type {
case cluster.FollowTypeInternal:
var curReplConnParams postgresql.ConnParams

curReplConnParams, err = pgm.GetPrimaryConninfo()
Expand Down Expand Up @@ -1531,13 +1506,11 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
}
}

// TODO(sgotti) currently we ignore DBSpec.AdditionalReplicationSlots on standbys
// So we don't touch replication slots and manually created
// slots are kept. If the instance becomes master then they'll
// be dropped.
}
if err = p.refreshReplicationSlots(cd, db); err != nil {
log.Errorw("error updating replication slots", zap.Error(err))
}

if db.Spec.FollowConfig.Type == cluster.FollowTypeExternal {
case cluster.FollowTypeExternal:
// Update recovery conf if our FollowConfig has changed
curReplConnParams, err := pgm.GetPrimaryConninfo()
if err != nil {
Expand Down Expand Up @@ -1573,7 +1546,6 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {

if err = p.refreshReplicationSlots(cd, db); err != nil {
log.Errorw("error updating replication slots", zap.Error(err))
return
}
}

Expand Down
2 changes: 1 addition & 1 deletion doc/cluster_spec.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ Some options in a running cluster specification can be changed to update the des
| minSynchronousStandbys | minimum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | uint16 | 1 |
| maxSynchronousStandbys | maximum number of required synchronous standbys when synchronous replication is enabled (only set this to a value > 1 when using PostgreSQL >= 9.6) | no | uint16 | 1 |
| additionalWalSenders | number of additional wal_senders in addition to the ones internally defined by stolon, useful to provide enough wal senders for external standbys (changing this value requires an instance restart) | no | uint16 | 5 |
| additionalMasterReplicationSlots | a list of additional replication slots to be created on the master postgres instance. Replication slots not defined here will be dropped from the master instance (i.e. manually created replication slots will be removed). | no | []string | null |
| additionalMasterReplicationSlots | a list of additional physical replication slots to be created on the master postgres instance. They will be prefixed with `stolon_` (like internal replication slots used for standby replication) to make them "namespaced" from other replication slots. Replication slots starting with `stolon_` and not defined here (and not used for standby replication) will be dropped from the master instance. | no | []string | null |
| usePgrewind | try to use pg_rewind for faster instance resyncronization. | no | bool | false |
| initMode | The cluster initialization mode. Can be *new* or *existing*. *new* means that a new db cluster will be created on a random keeper and the other keepers will sync with it. *existing* means that a keeper (that needs to have an already created db cluster) will be choosed as the initial master and the other keepers will sync with it. In this case the `existingConfig` object needs to be populated. | yes | string | |
| existingConfig | configuration for initMode of type "existing" | if initMode is "existing" | ExistingConfig | |
Expand Down
43 changes: 32 additions & 11 deletions tests/integration/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package integration

import (
"context"
"fmt"
"io/ioutil"
"os"
Expand Down Expand Up @@ -364,16 +365,28 @@ func TestAdditionalReplicationSlots(t *testing.T) {
t.Fatalf("unexpected err: %v", err)
}

cd, _, err := sm.GetClusterData(context.TODO())
if err != nil {
t.Fatalf("unexpected err: %v", err)
}

var standbyDBUID string
for _, db := range cd.DBs {
if db.Spec.KeeperUID == standby.uid {
standbyDBUID = db.UID
}
}

// create additional replslots on master
err = StolonCtl(clusterName, tstore.storeBackend, storeEndpoints, "update", "--patch", `{ "additionalMasterReplicationSlots" : [ "replslot01", "replslot02" ] }`)
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := waitNotStolonReplicationSlots(master, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01", "replslot02"}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// no repl slot on standby
if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}

Expand All @@ -382,11 +395,11 @@ func TestAdditionalReplicationSlots(t *testing.T) {
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := waitNotStolonReplicationSlots(master, []string{"replslot01"}, 30*time.Second); err != nil {
if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01"}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// no repl slot on standby
if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}

Expand All @@ -395,11 +408,11 @@ func TestAdditionalReplicationSlots(t *testing.T) {
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := waitNotStolonReplicationSlots(master, []string{}, 30*time.Second); err != nil {
if err := waitStolonReplicationSlots(master, []string{standbyDBUID}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// no repl slot on standby
if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}

Expand All @@ -408,19 +421,27 @@ func TestAdditionalReplicationSlots(t *testing.T) {
if err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := waitNotStolonReplicationSlots(master, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01", "replslot02"}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// no repl slot on standby
if err := waitNotStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
if err := waitStolonReplicationSlots(standby, []string{}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}

// Manually create a replication slot. It should be dropped.
// Manually create a replication slot. It should not be dropped.
if _, err := master.Exec("select pg_create_physical_replication_slot('manualreplslot')"); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := waitNotStolonReplicationSlots(master, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
// Manually create a replication slot starting with stolon_ . It should be dropped.
if _, err := master.Exec("select pg_create_physical_replication_slot('stolon_manualreplslot')"); err != nil {
t.Fatalf("unexpected err: %v", err)
}
if err := waitStolonReplicationSlots(master, []string{standbyDBUID, "replslot01", "replslot02"}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}
// check it here so we are sure the refresh slots function has already been called
if err := waitNotStolonReplicationSlots(master, []string{"manualreplslot"}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}

Expand All @@ -437,7 +458,7 @@ func TestAdditionalReplicationSlots(t *testing.T) {
}

// repl slot on standby which is the new master
if err := waitNotStolonReplicationSlots(standby, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
if err := waitStolonReplicationSlots(standby, []string{"replslot01", "replslot02"}, 30*time.Second); err != nil {
t.Fatalf("unexpected err: %v", err)
}
}
31 changes: 31 additions & 0 deletions tests/integration/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,37 @@ func waitReplicationSlots(q Querier, replSlots []string, timeout time.Duration)
return fmt.Errorf("timeout waiting for replSlots %v, got: %v, last err: %v", replSlots, curReplSlots, err)
}

func waitStolonReplicationSlots(q Querier, replSlots []string, timeout time.Duration) error {
// prefix with stolon_
for i, slot := range replSlots {
replSlots[i] = common.StolonName(slot)
}
sort.Sort(sort.StringSlice(replSlots))

start := time.Now()
var curReplSlots []string
var err error
for time.Now().Add(-timeout).Before(start) {
allReplSlots, err := getReplicationSlots(q)
if err != nil {
goto end
}
curReplSlots = []string{}
for _, s := range allReplSlots {
if common.IsStolonName(s) {
curReplSlots = append(curReplSlots, s)
}
}
sort.Sort(sort.StringSlice(curReplSlots))
if reflect.DeepEqual(replSlots, curReplSlots) {
return nil
}
end:
time.Sleep(2 * time.Second)
}
return fmt.Errorf("timeout waiting for replSlots %v, got: %v, last err: %v", replSlots, curReplSlots, err)
}

func waitNotStolonReplicationSlots(q Querier, replSlots []string, timeout time.Duration) error {
sort.Sort(sort.StringSlice(replSlots))

Expand Down

0 comments on commit df6d4ee

Please sign in to comment.