Skip to content

Commit

Permalink
move to function
Browse files Browse the repository at this point in the history
  • Loading branch information
benwaffle committed Aug 2, 2024
1 parent bbd55f0 commit 0916735
Showing 1 changed file with 49 additions and 41 deletions.
90 changes: 49 additions & 41 deletions internal/flypg/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -318,48 +318,9 @@ func (n *Node) PostInit(ctx context.Context) error {
}
}
case StandbyRoleName:
// This section handles migration from 6pn as repmgr node name to machine ID as repmgr node name
primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
if err != nil {
return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
}

primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName)
if err != nil {
return fmt.Errorf("failed to establish connection to primary: %s", err)
}
defer func() { _ = primaryConn.Close(ctx) }()

rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
if err != nil {
return fmt.Errorf("failed to query pg_stat_replication: %s", err)
}
defer rows.Close()

var applicationNames []string
for rows.Next() {
var applicationName string
if err := rows.Scan(&applicationName); err != nil {
return fmt.Errorf("failed to scan application_name: %s", err)
}
applicationNames = append(applicationNames, applicationName)
}
if err := rows.Err(); err != nil {
return fmt.Errorf("failed to iterate over rows: %s", err)
}

if slices.Contains(applicationNames, n.PrivateIP) {
log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")

if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
return fmt.Errorf("failed to clone standby: %s", err)
}

if err := n.PGConfig.reload(ctx); err != nil {
return fmt.Errorf("failed to reload postgresql: %s", err)
}
if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil {
return fmt.Errorf("failed to migrate node name: %s", err)
}
// end of 6pn -> machine ID migration stuff

// Register existing standby to apply any configuration changes.
if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil {
Expand Down Expand Up @@ -577,3 +538,50 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro

return nil
}

// migrate node name from 6pn to machine ID if needed
func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error {
primary, err := n.RepMgr.PrimaryMember(ctx, repConn)
if err != nil {
return fmt.Errorf("failed to resolve primary member when updating standby: %s", err)
}

primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName)
if err != nil {
return fmt.Errorf("failed to establish connection to primary: %s", err)
}
defer func() { _ = primaryConn.Close(ctx) }()

rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication")
if err != nil {
return fmt.Errorf("failed to query pg_stat_replication: %s", err)
}
defer rows.Close()

var applicationNames []string
for rows.Next() {
var applicationName string
if err := rows.Scan(&applicationName); err != nil {
return fmt.Errorf("failed to scan application_name: %s", err)
}
applicationNames = append(applicationNames, applicationName)
}
if err := rows.Err(); err != nil {
return fmt.Errorf("failed to iterate over rows: %s", err)
}

// if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql
if slices.Contains(applicationNames, n.PrivateIP) {
log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...")

if err := n.RepMgr.regenReplicationConf(ctx); err != nil {
return fmt.Errorf("failed to clone standby: %s", err)
}

if err := n.PGConfig.reload(ctx); err != nil {
return fmt.Errorf("failed to reload postgresql: %s", err)
}
}

return nil
}

0 comments on commit 0916735

Please sign in to comment.