diff --git a/bin/restart-repmgrd b/bin/restart-repmgrd index 9b3d4db0..10ef6bef 100755 --- a/bin/restart-repmgrd +++ b/bin/restart-repmgrd @@ -1,10 +1,3 @@ #!/bin/bash -if [ -f /tmp/repmgrd.pid ]; then - PID=$(cat /tmp/repmgrd.pid) - - # Check if the process is running - if ps -p $PID > /dev/null 2>&1; then - kill $PID - fi -fi \ No newline at end of file +kill `cat /tmp/repmgrd.pid` diff --git a/cmd/pg_unregister/main.go b/cmd/pg_unregister/main.go index 049ab09a..c1ac9517 100644 --- a/cmd/pg_unregister/main.go +++ b/cmd/pg_unregister/main.go @@ -3,7 +3,6 @@ package main import ( "context" "encoding/base64" - "errors" "fmt" "log" "os" @@ -45,15 +44,7 @@ func processUnregistration(ctx context.Context) error { defer func() { _ = conn.Close(ctx) }() member, err := node.RepMgr.MemberByHostname(ctx, conn, string(hostnameBytes)) - if errors.Is(err, pgx.ErrNoRows) { - // for historical reasons, old versions of flyctl passes in the 6pn as the hostname - // most likely this won't work because the hostname does not resolve if the machine is stopped, - // but we try anyway - member, err = node.RepMgr.MemberBy6PN(ctx, conn, string(hostnameBytes)) - if err != nil { - return fmt.Errorf("failed to resolve member by hostname and 6pn: %s", err) - } - } else if err != nil { + if err != nil { return fmt.Errorf("failed to resolve member: %s", err) } diff --git a/go.mod b/go.mod index 9cf3f9ad..b1b1e9fb 100644 --- a/go.mod +++ b/go.mod @@ -8,10 +8,8 @@ require ( github.com/hashicorp/consul/api v1.18.0 github.com/jackc/pgconn v1.14.3 github.com/jackc/pgx/v5 v5.5.4 - github.com/olekukonko/tablewriter v0.0.5 github.com/pkg/errors v0.9.1 github.com/pkg/term v1.1.0 - github.com/spf13/cobra v1.8.1 github.com/superfly/fly-checks v0.0.0-20230510154016-d189351293f2 golang.org/x/exp v0.0.0-20230105202349-8879d0199aa3 golang.org/x/sync v0.1.0 @@ -38,6 +36,8 @@ require ( github.com/mattn/go-runewidth v0.0.9 // indirect github.com/mitchellh/go-homedir v1.1.0 // indirect github.com/mitchellh/mapstructure v1.4.1 // indirect + github.com/olekukonko/tablewriter v0.0.5 // indirect + github.com/spf13/cobra v1.8.1 // indirect github.com/spf13/pflag v1.0.5 // indirect github.com/stretchr/objx v0.5.0 // indirect golang.org/x/crypto v0.20.0 // indirect diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 28d832c0..3b099218 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -16,12 +16,10 @@ import ( "github.com/fly-apps/postgres-flex/internal/privnet" "github.com/fly-apps/postgres-flex/internal/utils" "github.com/jackc/pgx/v5" - "golang.org/x/exp/slices" ) type Node struct { AppName string - MachineID string PrivateIP string PrimaryRegion string DataDir string @@ -54,8 +52,6 @@ func NewNode() (*Node, error) { node.PrivateIP = ipv6.String() - node.MachineID = os.Getenv("FLY_MACHINE_ID") - node.PrimaryRegion = os.Getenv("PRIMARY_REGION") if node.PrimaryRegion == "" { return nil, fmt.Errorf("PRIMARY_REGION environment variable must be set") @@ -93,7 +89,6 @@ func NewNode() (*Node, error) { PasswordConfigPath: "/data/.pgpass", DataDir: node.DataDir, PrivateIP: node.PrivateIP, - MachineID: node.MachineID, Port: 5433, DatabaseName: "repmgr", Credentials: node.ReplCredentials, @@ -270,7 +265,7 @@ func (n *Node) PostInit(ctx context.Context) error { return fmt.Errorf("failed to resolve member role: %s", err) } - // Restart repmgrd in the event the machine ID changes for an already registered node. + // Restart repmgrd in the event the IP changes for an already registered node. // This can happen if the underlying volume is moved to a different node. daemonRestartRequired := n.RepMgr.daemonRestartRequired(member) @@ -284,8 +279,6 @@ func (n *Node) PostInit(ctx context.Context) error { if err := Quarantine(ctx, n, primary); err != nil { return fmt.Errorf("failed to quarantine failed primary: %s", err) } - - panic(err) } else if errors.Is(err, ErrZombieDiscovered) { log.Printf("[ERROR] The majority of registered members agree that '%s' is the real primary.\n", primary) // Turn member read-only @@ -299,10 +292,10 @@ func (n *Node) PostInit(ctx context.Context) error { } // This should never happen - if primary != n.RepMgr.machineIdToDNS(n.MachineID) { + if primary != n.PrivateIP { return fmt.Errorf("resolved primary '%s' does not match ourself '%s'. this should not happen", primary, - n.RepMgr.machineIdToDNS(n.MachineID), + n.PrivateIP, ) } @@ -318,11 +311,6 @@ func (n *Node) PostInit(ctx context.Context) error { } } case StandbyRoleName: - if err := n.migrateNodeNameIfNeeded(ctx, repConn); err != nil { - log.Printf("[ERROR] failed to migrate node name: %s", err) - // We try to bring the standby up anyway - } - // Register existing standby to apply any configuration changes. if err := n.RepMgr.registerStandby(daemonRestartRequired); err != nil { return fmt.Errorf("failed to register existing standby: %s", err) @@ -539,50 +527,3 @@ func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) erro return nil } - -// migrate node name from 6pn to machine ID if needed -func (n *Node) migrateNodeNameIfNeeded(ctx context.Context, repConn *pgx.Conn) error { - primary, err := n.RepMgr.PrimaryMember(ctx, repConn) - if err != nil { - return fmt.Errorf("failed to resolve primary member when updating standby: %s", err) - } - - primaryConn, err := n.RepMgr.NewRemoteConnection(ctx, primary.Hostname) - if err != nil { - return fmt.Errorf("failed to establish connection to primary: %s", err) - } - defer func() { _ = primaryConn.Close(ctx) }() - - rows, err := primaryConn.Query(ctx, "select application_name from pg_stat_replication") - if err != nil { - return fmt.Errorf("failed to query pg_stat_replication: %s", err) - } - defer rows.Close() - - var applicationNames []string - for rows.Next() { - var applicationName string - if err := rows.Scan(&applicationName); err != nil { - return fmt.Errorf("failed to scan application_name: %s", err) - } - applicationNames = append(applicationNames, applicationName) - } - if err := rows.Err(); err != nil { - return fmt.Errorf("failed to iterate over rows: %s", err) - } - - // if we find our 6pn as application_name, we need to regenerate postgresql.auto.conf and reload postgresql - if slices.Contains(applicationNames, n.PrivateIP) { - log.Printf("pg_stat_replication on the primary has our ipv6 address as application_name, converting to machine ID...") - - if err := n.RepMgr.regenReplicationConf(ctx); err != nil { - return fmt.Errorf("failed to clone standby: %s", err) - } - - if err := admin.ReloadPostgresConfig(ctx, repConn); err != nil { - return fmt.Errorf("failed to reload postgresql: %s", err) - } - } - - return nil -} diff --git a/internal/flypg/readonly.go b/internal/flypg/readonly.go index 1c075538..a714c254 100644 --- a/internal/flypg/readonly.go +++ b/internal/flypg/readonly.go @@ -70,7 +70,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { for _, member := range members { if member.Role == PrimaryRoleName { - endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, target) + endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, target) resp, err := http.Get(endpoint) if err != nil { log.Printf("[WARN] Failed to broadcast readonly state change to member %s: %s", member.Hostname, err) @@ -85,7 +85,7 @@ func BroadcastReadonlyChange(ctx context.Context, n *Node, enabled bool) error { } for _, member := range members { - endpoint := fmt.Sprintf("http://%s:5500/%s", member.Hostname, RestartHaproxyEndpoint) + endpoint := fmt.Sprintf("http://[%s]:5500/%s", member.Hostname, RestartHaproxyEndpoint) resp, err := http.Get(endpoint) if err != nil { log.Printf("[WARN] Failed to restart haproxy on member %s: %s", member.Hostname, err) diff --git a/internal/flypg/repmgr.go b/internal/flypg/repmgr.go index f6687169..46ceab8c 100644 --- a/internal/flypg/repmgr.go +++ b/internal/flypg/repmgr.go @@ -34,7 +34,6 @@ type RepMgr struct { PrimaryRegion string Region string PrivateIP string - MachineID string DataDir string DatabaseName string Credentials admin.Credential @@ -162,12 +161,10 @@ func (r *RepMgr) setDefaults() error { return err } - hostname := r.machineIdToDNS(r.MachineID) - conf := ConfigMap{ "node_id": nodeID, - "node_name": fmt.Sprintf("'%s'", hostname), - "conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", hostname, r.Port, r.Credentials.Username, r.DatabaseName), + "node_name": fmt.Sprintf("'%s'", r.PrivateIP), + "conninfo": fmt.Sprintf("'host=%s port=%d user=%s dbname=%s connect_timeout=5'", r.PrivateIP, r.Port, r.Credentials.Username, r.DatabaseName), "data_directory": fmt.Sprintf("'%s'", r.DataDir), "failover": "'automatic'", "use_replication_slots": "yes", @@ -279,7 +276,7 @@ func (*RepMgr) restartDaemon() error { } func (r *RepMgr) daemonRestartRequired(m *Member) bool { - return m.Hostname != r.MachineID + return m.Hostname != r.PrivateIP } func (r *RepMgr) unregisterWitness(id int) error { @@ -304,14 +301,14 @@ func (r *RepMgr) rejoinCluster(hostname string) error { return err } -func (r *RepMgr) clonePrimary(hostname string) error { +func (r *RepMgr) clonePrimary(ipStr string) error { cmdStr := fmt.Sprintf("mkdir -p %s", r.DataDir) if _, err := utils.RunCommand(cmdStr, "postgres"); err != nil { return fmt.Errorf("failed to create pg directory: %s", err) } cmdStr = fmt.Sprintf("repmgr -h %s -p %d -d %s -U %s -f %s standby clone -c -F", - hostname, + ipStr, r.Port, r.DatabaseName, r.Credentials.Username, @@ -325,21 +322,6 @@ func (r *RepMgr) clonePrimary(hostname string) error { return nil } -func (r *RepMgr) regenReplicationConf(ctx context.Context) error { - // TODO: do we need -c? - if _, err := utils.RunCmd(ctx, "postgres", - "repmgr", "--replication-conf-only", - "-h", "", - "-p", fmt.Sprint(r.Port), - "-d", r.DatabaseName, - "-U", r.Credentials.Username, - "-f", r.ConfigPath, - "standby", "clone", "-F"); err != nil { - return fmt.Errorf("failed to regenerate replication conf: %s", err) - } - return nil -} - type Member struct { ID int Hostname string @@ -449,56 +431,26 @@ func (*RepMgr) MemberByHostname(ctx context.Context, pg *pgx.Conn, hostname stri return &member, nil } -// MemberBy6PN returns a member by its 6PN address. -func (r *RepMgr) MemberBy6PN(ctx context.Context, pg *pgx.Conn, ip string) (*Member, error) { - members, err := r.Members(ctx, pg) - if err != nil { - return nil, err - } - - resolver := privnet.GetResolver() - var lastErr error - for _, member := range members { - ips, err := resolver.LookupIPAddr(ctx, member.Hostname) - if err != nil { - lastErr = err - continue - } - - for _, addr := range ips { - if addr.IP.String() == ip { - return &member, nil - } - } - } - - if lastErr != nil { - return nil, fmt.Errorf("no matches found for %s, and error encountered: %s", ip, lastErr) - } - - return nil, nil -} - func (r *RepMgr) ResolveMemberOverDNS(ctx context.Context) (*Member, error) { - machineIds, err := r.InRegionPeerMachines(ctx) + ips, err := r.InRegionPeerIPs(ctx) if err != nil { return nil, err } var target *Member - for _, machineId := range machineIds { - if machineId == r.MachineID { + for _, ip := range ips { + if ip.String() == r.PrivateIP { continue } - conn, err := r.NewRemoteConnection(ctx, r.machineIdToDNS(machineId)) + conn, err := r.NewRemoteConnection(ctx, ip.String()) if err != nil { continue } defer func() { _ = conn.Close(ctx) }() - member, err := r.MemberByHostname(ctx, conn, r.machineIdToDNS(machineId)) + member, err := r.MemberByHostname(ctx, conn, ip.String()) if err != nil { continue } @@ -525,21 +477,6 @@ func (r *RepMgr) InRegionPeerIPs(ctx context.Context) ([]net.IPAddr, error) { return privnet.AllPeers(ctx, targets) } -func (r *RepMgr) InRegionPeerMachines(ctx context.Context) ([]string, error) { - machines, err := privnet.AllMachines(ctx, r.AppName) - if err != nil { - return nil, err - } - - var machineIDs []string - for _, machine := range machines { - if machine.Region == r.PrimaryRegion { - machineIDs = append(machineIDs, machine.Id) - } - } - return machineIDs, nil -} - func (r *RepMgr) HostInRegion(ctx context.Context, hostname string) (bool, error) { ips, err := r.InRegionPeerIPs(ctx) if err != nil { @@ -577,11 +514,3 @@ func (r *RepMgr) UnregisterMember(member Member) error { func (r *RepMgr) eligiblePrimary() bool { return r.Region == r.PrimaryRegion } - -func (r *RepMgr) machineIdToDNS(nodeName string) string { - if len(nodeName) != 14 { - panic("invalid machine id") - } - - return fmt.Sprintf("%s.vm.%s.internal", nodeName, r.AppName) -} diff --git a/internal/flypg/repmgr_test.go b/internal/flypg/repmgr_test.go index d5233805..8251c7d4 100644 --- a/internal/flypg/repmgr_test.go +++ b/internal/flypg/repmgr_test.go @@ -33,7 +33,6 @@ func TestRepmgrInitialization(t *testing.T) { UserConfigPath: repgmrUserConfigFilePath, PasswordConfigPath: repgmrPasswordConfigFilePath, DataDir: repmgrTestDirectory, - MachineID: "abcdefg1234567", PrivateIP: "127.0.0.1", Credentials: admin.Credential{ Username: "user", @@ -92,8 +91,8 @@ func TestRepmgrInitialization(t *testing.T) { t.Fatal(err) } - if config["node_name"] != "'abcdefg1234567.vm.test-app.internal'" { - t.Fatalf("expected node_name to be 'abcdefg1234567.vm.test-app.internal', got %v", config["node_name"]) + if config["node_name"] != "'127.0.0.1'" { + t.Fatalf("expected node_name to be '127.0.0.1', got %v", config["node_name"]) } if config["location"] != "'dev'" { @@ -123,7 +122,6 @@ func TestRepmgrNodeIDGeneration(t *testing.T) { DataDir: repmgrTestDirectory, PrivateIP: "127.0.0.1", - MachineID: "abcdefg1234567", Port: 5433, DatabaseName: "repmgr", Credentials: admin.Credential{ diff --git a/internal/flypg/zombie.go b/internal/flypg/zombie.go index 6380338e..7204846d 100644 --- a/internal/flypg/zombie.go +++ b/internal/flypg/zombie.go @@ -5,6 +5,7 @@ import ( "errors" "fmt" "log" + "net" "os" "github.com/fly-apps/postgres-flex/internal/utils" @@ -84,7 +85,7 @@ type DNASample struct { func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASample, error) { sample := &DNASample{ - hostname: node.RepMgr.machineIdToDNS(node.MachineID), + hostname: node.PrivateIP, totalMembers: len(standbys) + 1, totalActive: 1, totalInactive: 0, @@ -117,8 +118,7 @@ func TakeDNASample(ctx context.Context, node *Node, standbys []Member) (*DNASamp sample.totalActive++ // Record conflict when primary doesn't match. - // We're checking PrivateIP here for backwards compatibility - if primary.Hostname != node.RepMgr.machineIdToDNS(node.MachineID) && primary.Hostname != node.PrivateIP { + if primary.Hostname != node.PrivateIP { sample.totalConflicts++ sample.conflictMap[primary.Hostname]++ } @@ -199,19 +199,24 @@ func handleZombieLock(ctx context.Context, n *Node) error { // If the zombie lock contains a hostname, it means we were able to // resolve the real primary and will attempt to rejoin it. if primaryStr != "" { - conn, err := n.RepMgr.NewRemoteConnection(ctx, primaryStr) + ip := net.ParseIP(primaryStr) + if ip == nil { + return fmt.Errorf("zombie.lock file contains an invalid ipv6 address") + } + + conn, err := n.RepMgr.NewRemoteConnection(ctx, ip.String()) if err != nil { - return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", primaryStr, err) + return fmt.Errorf("failed to establish a connection to our rejoin target %s: %s", ip.String(), err) } defer func() { _ = conn.Close(ctx) }() primary, err := n.RepMgr.PrimaryMember(ctx, conn) if err != nil { - return fmt.Errorf("failed to confirm primary on recover target %s: %s", primaryStr, err) + return fmt.Errorf("failed to confirm primary on recover target %s: %s", ip.String(), err) } // Confirm that our rejoin target still identifies itself as the primary. - if primary.Hostname != primaryStr { + if primary.Hostname != ip.String() { // Clear the zombie.lock file so we can attempt to re-resolve the correct primary. if err := RemoveZombieLock(); err != nil { return fmt.Errorf("failed to remove zombie lock: %s", err) diff --git a/internal/privnet/sixpn.go b/internal/privnet/sixpn.go index 5232b9b8..85b1eaeb 100644 --- a/internal/privnet/sixpn.go +++ b/internal/privnet/sixpn.go @@ -14,7 +14,20 @@ func AllPeers(ctx context.Context, appName string) ([]net.IPAddr, error) { } func Get6PN(ctx context.Context, hostname string) ([]net.IPAddr, error) { - r := GetResolver() + nameserver := os.Getenv("FLY_NAMESERVER") + if nameserver == "" { + nameserver = "fdaa::3" + } + nameserver = net.JoinHostPort(nameserver, "53") + r := &net.Resolver{ + PreferGo: true, + Dial: func(ctx context.Context, network, address string) (net.Conn, error) { + d := net.Dialer{ + Timeout: 1 * time.Second, + } + return d.DialContext(ctx, "udp6", nameserver) + }, + } ips, err := r.LookupIPAddr(ctx, hostname) if err != nil { @@ -41,49 +54,6 @@ func Get6PN(ctx context.Context, hostname string) ([]net.IPAddr, error) { return ips, err } -type Machine struct { - Id string - Region string -} - -func AllMachines(ctx context.Context, appName string) ([]Machine, error) { - r := GetResolver() - txts, err := r.LookupTXT(ctx, fmt.Sprintf("vms.%s.internal", appName)) - if err != nil { - return nil, err - } - - machines := make([]Machine, 0) - for _, txt := range txts { - parts := strings.Split(txt, ",") - for _, part := range parts { - parts := strings.Split(part, " ") - if len(parts) != 2 { - return nil, fmt.Errorf("invalid machine DNS TXT format: %s", txt) - } - machines = append(machines, Machine{Id: parts[0], Region: parts[1]}) - } - } - return machines, nil -} - -func GetResolver() *net.Resolver { - nameserver := os.Getenv("FLY_NAMESERVER") - if nameserver == "" { - nameserver = "fdaa::3" - } - nameserver = net.JoinHostPort(nameserver, "53") - return &net.Resolver{ - PreferGo: true, - Dial: func(ctx context.Context, _, _ string) (net.Conn, error) { - d := net.Dialer{ - Timeout: 1 * time.Second, - } - return d.DialContext(ctx, "udp6", nameserver) - }, - } -} - func PrivateIPv6() (net.IP, error) { ips, err := net.LookupIP("fly-local-6pn") if err != nil && !strings.HasSuffix(err.Error(), "no such host") && !strings.HasSuffix(err.Error(), "server misbehaving") {