Skip to content

Commit

Permalink
Merge pull request #169 from sgotti/repl_slot_prefix
Browse files Browse the repository at this point in the history
keeper: prefix replication slot names.
  • Loading branch information
sgotti committed Oct 21, 2016
2 parents 35466fe + 58e5e17 commit c2598a1
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 15 deletions.
59 changes: 46 additions & 13 deletions cmd/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,22 @@ func readPasswordFromFile(filepath string) (string, error) {
return strings.TrimSpace(string(pwBytes)), nil
}

// replSlotPrefix is the string prepended to a keeper id to build the name of
// its replication slot (see replSlotName). Slot names that start with this
// prefix are the ones reported as owned by ownedReplSlotName.
const (
replSlotPrefix = "stolon_"
)

// replSlotName builds the replication slot name for the keeper identified by
// id by putting replSlotPrefix in front of it.
func replSlotName(id string) string {
	name := replSlotPrefix + id
	return name
}

// idFromReplSlotName recovers the keeper id from a replication slot name by
// removing a leading replSlotPrefix. A name that does not start with the
// prefix is returned unchanged.
func idFromReplSlotName(replSlotName string) string {
	if !strings.HasPrefix(replSlotName, replSlotPrefix) {
		// Nothing to strip: hand the name back as-is.
		return replSlotName
	}
	return replSlotName[len(replSlotPrefix):]
}

// ownedReplSlotName reports whether the given replication slot name starts
// with replSlotPrefix, i.e. whether it follows the prefixed naming scheme
// produced by replSlotName.
func ownedReplSlotName(replSlotName string) bool {
	owned := strings.HasPrefix(replSlotName, replSlotPrefix)
	return owned
}

func (p *PostgresKeeper) getSUConnParams(keeperState *cluster.KeeperState) pg.ConnParams {
cp := pg.ConnParams{
"user": p.pgSUUsername,
Expand Down Expand Up @@ -544,7 +560,7 @@ func (p *PostgresKeeper) resync(followed *cluster.KeeperState, initialized, star
// log pg_rewind error and fallback to pg_basebackup
log.Errorf("error syncing with pg_rewind: %v", err)
} else {
if err := pgm.WriteRecoveryConf(replConnParams); err != nil {
if err := pgm.WriteRecoveryConf(replConnParams, replSlotName(p.id)); err != nil {
return fmt.Errorf("err: %v", err)
}
return nil
Expand All @@ -561,7 +577,7 @@ func (p *PostgresKeeper) resync(followed *cluster.KeeperState, initialized, star
}
log.Infof("sync from followed instance %q successfully finished", followed.ID)

if err := pgm.WriteRecoveryConf(replConnParams); err != nil {
if err := pgm.WriteRecoveryConf(replConnParams, replSlotName(p.id)); err != nil {
return fmt.Errorf("err: %v", err)
}
return nil
Expand Down Expand Up @@ -742,31 +758,43 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
log.Errorf("err: %v", err)
return
}
// Create replication slots
// Drop replication slots
for _, slotName := range replSlots {
if !util.StringInSlice(followersIDs, slotName) {
log.Infof("dropping replication slot for keeper %q not marked as follower", slotName)
if !ownedReplSlotName(slotName) {
// TODO(sgotti) remove this in the next release
// remove old format replication slots
if util.StringInSlice(followersIDs, slotName) {
log.Infof("dropping old format replication slot %q", slotName)
if err = pgm.DropReplicationSlot(slotName); err != nil {
log.Errorf("err: %v", err)
}
}
continue
}
if !util.StringInSlice(followersIDs, idFromReplSlotName(slotName)) {
log.Infof("dropping replication slot %q since keeper %q is not marked as follower", slotName, idFromReplSlotName(slotName))
if err = pgm.DropReplicationSlot(slotName); err != nil {
log.Errorf("err: %v", err)
}
}
}

// Create replication slots
for _, followerID := range followersIDs {
if followerID == p.id {
continue
}
if !util.StringInSlice(replSlots, followerID) {
if err = pgm.CreateReplicationSlot(followerID); err != nil {
replSlot := replSlotName(followerID)
if !util.StringInSlice(replSlots, replSlot) {
if err = pgm.CreateReplicationSlot(replSlot); err != nil {
log.Errorf("err: %v", err)
}
}
}

}
} else if keeperRole.Follow != "" {
// We are a standby
log.Infof("our cluster requested state is standby following %q", keeperRole.Follow)
followedID := keeperRole.Follow
log.Infof("our cluster requested state is standby following %q", followedID)
followed, ok := keepersState[keeperRole.Follow]
if !ok {
log.Errorf("no keeper state data for %q", keeperRole.Follow)
Expand All @@ -781,7 +809,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
// check if it's on the same branch or force a
// resync
replConnParams := p.getReplConnParams(followed)
if err = pgm.WriteRecoveryConf(replConnParams); err != nil {
if err = pgm.WriteRecoveryConf(replConnParams, replSlotName(p.id)); err != nil {
log.Errorf("err: %v", err)
return
}
Expand Down Expand Up @@ -842,6 +870,11 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
return
}
if !started {
replConnParams := p.getReplConnParams(followed)
if err = pgm.WriteRecoveryConf(replConnParams, replSlotName(p.id)); err != nil {
log.Errorf("err: %v", err)
return
}
if err = pgm.Start(); err != nil {
log.Errorf("failed to start postgres: %v", err)
return
Expand Down Expand Up @@ -893,7 +926,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
if !curConnParams.Equals(newConnParams) {
log.Infof("followed instance connection parameters changed. Reconfiguring...")
log.Infof("following %s with connection parameters %v", keeperRole.Follow, newConnParams)
if err = pgm.WriteRecoveryConf(newConnParams); err != nil {
if err = pgm.WriteRecoveryConf(newConnParams, replSlotName(p.id)); err != nil {
log.Errorf("err: %v", err)
return
}
Expand All @@ -905,7 +938,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) {
}
} else if keeperRole.Follow == "" {
log.Infof("our cluster requested state is standby following no one")
if err = pgm.WriteRecoveryConf(nil); err != nil {
if err = pgm.WriteRecoveryConf(nil, ""); err != nil {
log.Errorf("err: %v", err)
return
}
Expand Down
6 changes: 4 additions & 2 deletions pkg/postgresql/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -411,15 +411,17 @@ func (p *Manager) WriteConf() error {
return nil
}

func (p *Manager) WriteRecoveryConf(followedConnParams ConnParams) error {
func (p *Manager) WriteRecoveryConf(followedConnParams ConnParams, replSlotName string) error {
f, err := ioutil.TempFile(p.dataDir, "recovery.conf")
if err != nil {
return err
}
defer f.Close()

f.WriteString("standby_mode = 'on'\n")
f.WriteString(fmt.Sprintf("primary_slot_name = '%s'\n", p.name))
if replSlotName != "" {
f.WriteString(fmt.Sprintf("primary_slot_name = '%s'\n", replSlotName))
}
f.WriteString("recovery_target_timeline = 'latest'\n")

if followedConnParams != nil {
Expand Down

0 comments on commit c2598a1

Please sign in to comment.