From 09167358c630c60b66abcf3f6fc95d380a157d6b Mon Sep 17 00:00:00 2001 From: Ben Iofel Date: Fri, 2 Aug 2024 00:18:39 -0400 Subject: [PATCH] move to function --- internal/flypg/node.go | 90 +++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 41 deletions(-) diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 7f852b0..4726865 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -318,48 +318,9 @@ func (n *Node) PostInit(ctx context.Context) error { } } case StandbyRoleName: - // This section handles migration from 6pn as repmgr node name to machine ID as repmgr node name - primary, err := n.RepMgr.PrimaryMember(ctx, repConn) - if err != nil { - return fmt.Errorf("failed to resolve primary member when updating standby: %s", err) - } - - primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName) - if err != nil { - return fmt.Errorf("failed to establish connection to primary: %s", err) - } - defer func() { _ = primaryConn.Close(ctx) }() - - rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication") - if err != nil { - return fmt.Errorf("failed to query pg_stat_replication: %s", err) - } - defer rows.Close() - - var applicationNames []string - for rows.Next() { - var applicationName string - if err := rows.Scan(&applicationName); err != nil { - return fmt.Errorf("failed to scan application_name: %s", err) - } - applicationNames = append(applicationNames, applicationName) - } - if err := rows.Err(); err != nil { - return fmt.Errorf("failed to iterate over rows: %s", err) - } - - if slices.Contains(applicationNames, n.PrivateIP) { - log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...") - - if err := n.RepMgr.regenReplicationConf(ctx); err != nil { - return fmt.Errorf("failed to clone standby: %s", err) - } - - if err := n.PGConfig.reload(ctx); err != nil { - return fmt.Errorf("failed to reload postgresql: %s", err) - } + if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil { + return fmt.Errorf("failed to migrate node name: %s", err) } - // end of 6pn -> machine ID migration stuff // Register existing standby to apply any configuration changes. if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil { @@ -577,3 +538,50 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro return nil } + +// migrate node name from 6pn to machine ID if needed +func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error { + primary, err := n.RepMgr.PrimaryMember(ctx, repConn) + if err != nil { + return fmt.Errorf("failed to resolve primary member when updating standby: %s", err) + } + + primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.NodeName) + if err != nil { + return fmt.Errorf("failed to establish connection to primary: %s", err) + } + defer func() { _ = primaryConn.Close(ctx) }() + + rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication") + if err != nil { + return fmt.Errorf("failed to query pg_stat_replication: %s", err) + } + defer rows.Close() + + var applicationNames []string + for rows.Next() { + var applicationName string + if err := rows.Scan(&applicationName); err != nil { + return fmt.Errorf("failed to scan application_name: %s", err) + } + applicationNames = append(applicationNames, applicationName) + } + if err := rows.Err(); err != nil { + return fmt.Errorf("failed to iterate over rows: %s", err) + } + + // if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql + if slices.Contains(applicationNames, n.PrivateIP) { + log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...") + + if err := n.RepMgr.regenReplicationConf(ctx); err != nil { + return fmt.Errorf("failed to clone standby: %s", err) + } + + if err := n.PGConfig.reload(ctx); err != nil { + return fmt.Errorf("failed to reload postgresql: %s", err) + } + } + + return nil +}