diff --git a/cmd/monitor/main.go b/cmd/monitor/main.go
index 402e8717..a11f48b7 100644
--- a/cmd/monitor/main.go
+++ b/cmd/monitor/main.go
@@ -4,9 +4,11 @@ import (
 	"context"
 	"fmt"
 	"log"
+	"os"
 	"time"
 
 	"github.com/fly-apps/postgres-flex/internal/flypg"
+	"github.com/fly-apps/postgres-flex/internal/flypg/state"
 )
 
 var (
@@ -16,8 +18,13 @@ var (
 	defaultDeadMemberRemovalThreshold   = time.Hour * 12
 	defaultInactiveSlotRemovalThreshold = time.Hour * 12
+
+	defaultBackupRetentionEvalFrequency = time.Hour * 12
+	defaultFullBackupSchedule           = time.Hour * 24
 )
 
+// TODO - Harden this so one failure doesn't take down the whole monitor
+
 func main() {
 	ctx := context.Background()
 
@@ -28,19 +35,69 @@ func main() {
 		panic(fmt.Sprintf("failed to reference node: %s\n", err))
 	}
 
+	// Wait for postgres to boot and become accessible.
+	log.Println("Waiting for Postgres to be ready...")
+	waitOnPostgres(ctx, node)
+	log.Println("Postgres is ready to accept connections. Starting monitor...")
+
 	// Dead member monitor
-	log.Println("Monitoring dead members")
 	go func() {
 		if err := monitorDeadMembers(ctx, node); err != nil {
 			panic(err)
 		}
 	}()
 
+	if os.Getenv("S3_ARCHIVE_CONFIG") != "" {
+		store, err := state.NewStore()
+		if err != nil {
+			panic(fmt.Errorf("failed to initialize cluster state store: %s", err))
+		}
+
+		barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
+		if err != nil {
+			panic(err)
+		}
+
+		if err := barman.LoadConfig(flypg.DefaultBarmanConfigDir); err != nil {
+			panic(err)
+		}
+
+		// Backup scheduler
+		go monitorBackupSchedule(ctx, barman)
+
+		// Backup retention monitor
+		go monitorBackupRetention(ctx, barman)
+	}
+
 	// Readonly monitor
-	log.Println("Monitoring cluster state")
 	go monitorClusterState(ctx, node)
 
 	// Replication slot monitor
-	log.Println("Monitoring replication slots")
 	monitorReplicationSlots(ctx, node)
 }
+
+// waitOnPostgres blocks until the local postgres accepts and answers a ping,
+// polling every 5 seconds, or until ctx is cancelled.
+func waitOnPostgres(ctx context.Context, node *flypg.Node) {
+	ticker := time.NewTicker(5 * time.Second)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		case <-ticker.C:
+			conn, err := node.NewLocalConnection(ctx, "postgres", node.SUCredentials)
+			if err != nil {
+				log.Printf("failed to open local connection: %s", err)
+				continue
+			}
+			if err := conn.Ping(ctx); err != nil {
+				_ = conn.Close(ctx)
+				log.Printf("failed to ping local connection: %s", err)
+				continue
+			}
+
+			_ = conn.Close(ctx)
+			return
+		}
+	}
+}
diff --git a/cmd/monitor/monitor_backup_retention.go b/cmd/monitor/monitor_backup_retention.go
new file mode 100644
index 00000000..c2b5915c
--- /dev/null
+++ b/cmd/monitor/monitor_backup_retention.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+	"context"
+	"log"
+	"time"
+
+	"github.com/fly-apps/postgres-flex/internal/flypg"
+)
+
+func monitorBackupRetention(ctx context.Context, barman *flypg.Barman) {
+	ticker := time.NewTicker(defaultBackupRetentionEvalFrequency)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Println("Shutting down backup retention monitor")
+			return
+		case <-ticker.C:
+			result, err := barman.WALArchiveDelete(ctx)
+			if err != nil {
+				log.Printf("Backup retention failed with: %s", err)
+			}
+
+			if len(result) > 0 {
+				log.Printf("Backup retention response: %s", result)
+			}
+		}
+	}
+}
diff --git a/cmd/monitor/monitor_backup_schedule.go b/cmd/monitor/monitor_backup_schedule.go
new file mode 100644
index 00000000..f26b7a28
--- /dev/null
+++ b/cmd/monitor/monitor_backup_schedule.go
@@ -0,0 +1,123 @@
+package main
+
+import (
+	"context"
+	"fmt"
+	"log"
+	"time"
+
+	"github.com/fly-apps/postgres-flex/internal/flypg"
+)
+
+func monitorBackupSchedule(ctx context.Context, barman *flypg.Barman) {
+	// Determine when the last backup was taken.
+	lastBackupTime, err := barman.LastCompletedBackup(ctx)
+	if err != nil {
+		log.Printf("Failed to resolve the last backup taken: %s", err)
+	}
+
+	fullBackupSchedule := defaultFullBackupSchedule
+
+	// Set the full backup schedule if it is defined in the configuration.
+	if barman.Settings.FullBackupFrequency != "" {
+		fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency)
+		switch {
+		case err != nil:
+			log.Printf("Failed to parse full backup frequency: %s", err)
+		default:
+			fullBackupSchedule = fullBackupDur
+		}
+	}
+
+	// Ensure we have at least one backup before proceeding.
+	if lastBackupTime.IsZero() {
+		log.Println("No backups found! Performing the initial base backup.")
+
+		if err := performInitialBaseBackup(ctx, barman); err != nil {
+			log.Printf("Failed to perform the initial full backup: %s", err)
+			log.Printf("Backup scheduler will re-attempt in %s.", fullBackupSchedule)
+		}
+
+		lastBackupTime = time.Now()
+	}
+
+	log.Printf("Full backup schedule set to: %s", fullBackupSchedule)
+
+	// Calculate the time until the next backup is due.
+	timeUntilNextBackup := time.Until(lastBackupTime.Add(fullBackupSchedule))
+
+	// Perform backup immediately if the time until the next backup is negative.
+	if timeUntilNextBackup < 0 {
+		log.Println("Performing full backup now")
+		_, err := barman.Backup(ctx, false)
+		if err != nil {
+			log.Printf("Full backup failed with: %s", err)
+		}
+
+		timeUntilNextBackup = fullBackupSchedule
+	}
+
+	log.Printf("Next full backup due in: %s", timeUntilNextBackup)
+
+	ticker := time.NewTicker(timeUntilNextBackup)
+	defer ticker.Stop()
+
+	for {
+		select {
+		case <-ctx.Done():
+			log.Println("Shutting down backup scheduler")
+			return
+		case <-ticker.C:
+			// Perform a backup while passively waiting for the checkpoint process to complete.
+			// This could actually take a while, so we should be prepared to wait.
+			log.Println("Performing full backup")
+			_, err := barman.Backup(ctx, false)
+			if err != nil {
+				// TODO - Implement a back-off strategy.
+				timeUntilNextBackup = time.Hour * 1
+				ticker.Reset(timeUntilNextBackup)
+
+				log.Printf("Full backup failed with: %s.", err)
+				log.Printf("Backup will be re-attempted in %s.", timeUntilNextBackup)
+
+				continue
+			}
+
+			log.Println("Full backup completed successfully")
+			ticker.Reset(fullBackupSchedule)
+		}
+	}
+}
+
+func performInitialBaseBackup(ctx context.Context, barman *flypg.Barman) error {
+	maxRetries := 10
+	retryCount := 0
+	for {
+		select {
+		case <-ctx.Done():
+			return nil
+		default:
+			_, err := barman.Backup(ctx, true)
+			if err != nil {
+				log.Printf("Failed to perform the initial full backup: %s. Retrying in 30 seconds.", err)
+
+				// If we've exceeded the maximum number of retries, we should return an error.
+				if retryCount >= maxRetries {
+					return fmt.Errorf("failed to perform the initial full backup after %d retries", maxRetries)
+				}
+
+				retryCount++
+
+				select {
+				case <-ctx.Done():
+					return ctx.Err()
+				case <-time.After(time.Second * 30):
+					continue
+				}
+			}
+
+			log.Println("Initial full backup completed successfully")
+			return nil
+		}
+	}
+}
diff --git a/cmd/monitor/monitor_dead_members.go b/cmd/monitor/monitor_dead_members.go
index 78370ff9..5602cc06 100644
--- a/cmd/monitor/monitor_dead_members.go
+++ b/cmd/monitor/monitor_dead_members.go
@@ -37,8 +37,6 @@ func monitorDeadMembers(ctx context.Context, node *flypg.Node) error {
 	ticker := time.NewTicker(deadMemberMonitorFrequency)
 	defer ticker.Stop()
 
-	log.Printf("Pruning every %s...\n", removalThreshold)
-
 	for range ticker.C {
 		err := deadMemberMonitorTick(ctx, node, seenAt, removalThreshold)
 		if err != nil {
diff --git a/cmd/start/main.go b/cmd/start/main.go
index abe8a8c3..e9b7005f 100644
--- a/cmd/start/main.go
+++ b/cmd/start/main.go
@@ -23,6 +23,7 @@ func main() {
 		}
 	}
 
+	// Deprecated - We are moving away from having a separate barman Machine
 	if os.Getenv("IS_BARMAN") != "" {
 		node, err := flybarman.NewNode()
 		if err != nil {
@@ -54,6 +55,14 @@ func main() {
 		return
 	}
 
+	// TODO - Find a better
way to handle this + if os.Getenv("S3_ARCHIVE_CONFIG") != "" || os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG") != "" { + if err := os.Setenv("AWS_SHARED_CREDENTIALS_FILE", "/data/.aws/credentials"); err != nil { + panicHandler(err) + return + } + } + node, err := flypg.NewNode() if err != nil { panicHandler(err) diff --git a/internal/api/handle_admin.go b/internal/api/handle_admin.go index 89ff78be..7148dcf1 100644 --- a/internal/api/handle_admin.go +++ b/internal/api/handle_admin.go @@ -4,6 +4,7 @@ import ( "encoding/json" "fmt" "net/http" + "os" "strings" "github.com/fly-apps/postgres-flex/internal/flypg" @@ -298,3 +299,100 @@ func handleViewRepmgrSettings(w http.ResponseWriter, r *http.Request) { resp := &Response{Result: out} renderJSON(w, resp, http.StatusOK) } + +func handleViewBarmanSettings(w http.ResponseWriter, _ *http.Request) { + if os.Getenv("S3_ARCHIVE_CONFIG") == "" { + renderErr(w, fmt.Errorf("barman is not enabled")) + return + } + + store, err := state.NewStore() + if err != nil { + renderErr(w, err) + return + } + + barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile) + if err != nil { + renderErr(w, err) + return + } + + if err := barman.LoadConfig(flypg.DefaultBarmanConfigDir); err != nil { + renderErr(w, err) + return + } + + all, err := barman.CurrentConfig() + if err != nil { + renderErr(w, err) + return + } + + resp := &Response{Result: all} + renderJSON(w, resp, http.StatusOK) +} + +func handleUpdateBarmanSettings(w http.ResponseWriter, r *http.Request) { + if os.Getenv("S3_ARCHIVE_CONFIG") == "" { + renderErr(w, fmt.Errorf("barman is not enabled")) + return + } + + store, err := state.NewStore() + if err != nil { + renderErr(w, err) + return + } + + barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile) + if err != nil { + renderErr(w, err) + return + } + + if err := barman.LoadConfig(flypg.DefaultBarmanConfigDir); err != nil { + renderErr(w, err) + return 
+ } + + cfg, err := flypg.ReadFromFile(barman.UserConfigFile()) + if err != nil { + renderErr(w, err) + return + } + + var requestedChanges map[string]interface{} + if err := json.NewDecoder(r.Body).Decode(&requestedChanges); err != nil { + renderErr(w, err) + return + } + + if err := barman.Validate(requestedChanges); err != nil { + renderErr(w, err) + return + } + + for k, v := range requestedChanges { + cfg[k] = v + } + + barman.SetUserConfig(cfg) + + if err := flypg.PushUserConfig(barman, store); err != nil { + renderErr(w, err) + return + } + + if err := flypg.SyncUserConfig(barman, store); err != nil { + renderErr(w, err) + return + } + + res := &Response{Result: SettingsUpdate{ + Message: "Updated", + RestartRequired: true, + }} + + renderJSON(w, res, http.StatusOK) +} diff --git a/internal/api/handler.go b/internal/api/handler.go index 204147b0..ee37b77a 100644 --- a/internal/api/handler.go +++ b/internal/api/handler.go @@ -60,7 +60,11 @@ func Handler() http.Handler { r.Get("/role", handleRole) r.Get("/settings/view/postgres", handleViewPostgresSettings) r.Get("/settings/view/repmgr", handleViewRepmgrSettings) + r.Get("/settings/view/barman", handleViewBarmanSettings) + r.Post("/settings/update/postgres", handleUpdatePostgresSettings) + r.Post("/settings/update/barman", handleUpdateBarmanSettings) + r.Post("/settings/apply", handleApplyConfig) }) diff --git a/internal/flybarman/node.go b/internal/flybarman/node.go index 455d9cd5..2c1fe0d4 100644 --- a/internal/flybarman/node.go +++ b/internal/flybarman/node.go @@ -139,8 +139,8 @@ wal_retention_policy = main return fmt.Errorf("failed to write file %s: %s", n.RootPasswordConfigPath, err) } - barmanCronFileContent := `* * * * * /usr/local/bin/barman_cron -` + barmanCronFileContent := `* * * * * /usr/local/bin/barman_cron` + if err := os.WriteFile(n.BarmanCronFile, []byte(barmanCronFileContent), 0644); err != nil { return fmt.Errorf("failed write %s: %s", n.BarmanCronFile, err) } diff --git 
a/internal/flypg/barman.go b/internal/flypg/barman.go new file mode 100644 index 00000000..176a3959 --- /dev/null +++ b/internal/flypg/barman.go @@ -0,0 +1,265 @@ +package flypg + +import ( + "context" + "encoding/json" + "fmt" + "net/url" + "os" + "strings" + "time" + + "github.com/fly-apps/postgres-flex/internal/flypg/state" + "github.com/fly-apps/postgres-flex/internal/utils" +) + +const ( + providerDefault = "aws-s3" + // DefaultAuthProfile is the default AWS profile used for barman operations. + DefaultAuthProfile = "barman" + // RestoreAuthProfile is the AWS profile used for barman restore operations. + RestoreAuthProfile = "restore" +) + +type Barman struct { + appName string + provider string + endpoint string + bucket string + bucketDirectory string + authProfile string + store *state.Store + + *BarmanConfig +} + +type Backup struct { + Status string `json:"status"` + BackupID string `json:"backup_id"` + StartTime string `json:"begin_time"` + EndTime string `json:"end_time"` + BeginWal string `json:"begin_wal"` + EndWal string `json:"end_wal"` +} + +type BackupList struct { + Backups []Backup `json:"backups_list"` +} + +// NewBarman creates a new Barman instance. 
+// The configURL is expected to be in the format: +// https://s3-access-key:s3-secret-key@s3-endpoint/bucket/bucket-directory +func NewBarman(store *state.Store, configURL, authProfile string) (*Barman, error) { + parsedURL, err := url.Parse(configURL) + if err != nil { + return nil, fmt.Errorf("invalid credential url: %w", err) + } + + endpoint := parsedURL.Host + if endpoint == "" { + return nil, fmt.Errorf("object storage endpoint missing") + } + + path := strings.TrimLeft(parsedURL.Path, "/") + if path == "" { + return nil, fmt.Errorf("bucket and directory missing") + } + + pathSlice := strings.Split(path, "/") + if len(pathSlice) != 2 { + return nil, fmt.Errorf("invalid bucket and directory format") + } + + // Extract user information for credentials (not used here but necessary for the complete parsing) + username := parsedURL.User.Username() + password, _ := parsedURL.User.Password() + + // Ensure the credentials are not empty + if username == "" || password == "" { + return nil, fmt.Errorf("access key or secret key is missing") + } + + return &Barman{ + appName: os.Getenv("FLY_APP_NAME"), + provider: providerDefault, + endpoint: fmt.Sprintf("https://%s", endpoint), + bucket: pathSlice[0], + bucketDirectory: pathSlice[1], + authProfile: authProfile, + store: store, + }, nil +} + +func (b *Barman) LoadConfig(configDir string) error { + barCfg, err := NewBarmanConfig(b.store, configDir) + if err != nil { + return err + } + + b.BarmanConfig = barCfg + + return nil +} + +func (b *Barman) BucketURL() string { + return fmt.Sprintf("s3://%s", b.bucket) +} + +// Backup performs a base backup of the database. +// immediateCheckpoint - forces the initial checkpoint to be done as quickly as possible. 
+func (b *Barman) Backup(ctx context.Context, immediateCheckpoint bool) ([]byte, error) { + args := []string{ + "--cloud-provider", providerDefault, + "--endpoint-url", b.endpoint, + "--profile", b.authProfile, + "--host", fmt.Sprintf("%s.internal", b.appName), + "--user", "repmgr", + b.BucketURL(), + b.bucketDirectory, + } + + if immediateCheckpoint { + args = append(args, "--immediate-checkpoint") + } + + return utils.RunCmd(ctx, "postgres", "barman-cloud-backup", args...) +} + +// RestoreBackup returns the command string used to restore a base backup. +func (b *Barman) RestoreBackup(ctx context.Context, backupID string) ([]byte, error) { + args := []string{ + "--cloud-provider", providerDefault, + "--endpoint-url", b.endpoint, + "--profile", b.authProfile, + b.BucketURL(), + b.bucketDirectory, + backupID, + defaultRestoreDir, + } + + return utils.RunCmd(ctx, "postgres", "barman-cloud-restore", args...) +} + +func (b *Barman) ListBackups(ctx context.Context) (BackupList, error) { + args := []string{ + "--cloud-provider", providerDefault, + "--endpoint-url", b.endpoint, + "--profile", b.authProfile, + "--format", "json", + b.BucketURL(), + b.bucketDirectory, + } + + backupsBytes, err := utils.RunCmd(ctx, "postgres", "barman-cloud-backup-list", args...) + if err != nil { + return BackupList{}, fmt.Errorf("failed to list backups: %s", err) + } + + return b.parseBackups(backupsBytes) +} + +func (b *Barman) WALArchiveDelete(ctx context.Context) ([]byte, error) { + args := []string{ + "--cloud-provider", providerDefault, + "--endpoint-url", b.endpoint, + "--profile", b.authProfile, + "--retention", b.Settings.RecoveryWindow, + "--minimum-redundancy", b.Settings.MinimumRedundancy, + b.BucketURL(), + b.bucketDirectory, + } + + return utils.RunCmd(ctx, "postgres", "barman-cloud-backup-delete", args...) 
+} + +func (b *Barman) ListCompletedBackups(ctx context.Context) (BackupList, error) { + backups, err := b.ListBackups(ctx) + if err != nil { + return BackupList{}, fmt.Errorf("failed to list backups: %s", err) + } + + var completedBackups BackupList + + for _, backup := range backups.Backups { + if backup.Status == "DONE" { + completedBackups.Backups = append(completedBackups.Backups, backup) + } + } + + return completedBackups, nil +} + +func (b *Barman) LastCompletedBackup(ctx context.Context) (time.Time, error) { + backups, err := b.ListCompletedBackups(ctx) + if err != nil { + return time.Time{}, fmt.Errorf("failed to list backups: %s", err) + } + + if len(backups.Backups) == 0 { + return time.Time{}, nil + } + + // Layout used by barman. + layout := "Mon Jan 2 15:04:05 2006" + + var latestBackupTime time.Time + + // Sort the backups start time + for _, backup := range backups.Backups { + startTime, err := time.Parse(layout, backup.StartTime) + if err != nil { + return time.Time{}, fmt.Errorf("failed to parse backup start time: %s", err) + } + + if latestBackupTime.IsZero() || startTime.After(latestBackupTime) { + latestBackupTime = startTime + } + } + + return latestBackupTime, nil +} + +func (b *Barman) walArchiveCommand() string { + // TODO - Make compression configurable + return fmt.Sprintf("barman-cloud-wal-archive --cloud-provider %s --gzip --endpoint-url %s --profile %s %s %s %%p", + b.provider, + b.endpoint, + b.authProfile, + b.BucketURL(), + b.bucketDirectory, + ) +} + +// walRestoreCommand returns the command string used to restore WAL files. +// The %f and %p placeholders are replaced with the file path and file name respectively. 
+func (b *Barman) walRestoreCommand() string { + return fmt.Sprintf("barman-cloud-wal-restore --cloud-provider %s --endpoint-url %s --profile %s %s %s %%f %%p", + b.provider, + b.endpoint, + b.authProfile, + b.BucketURL(), + b.bucketDirectory, + ) +} + +func (*Barman) parseBackups(backupBytes []byte) (BackupList, error) { + var backupList BackupList + + if err := json.Unmarshal(backupBytes, &backupList); err != nil { + return BackupList{}, fmt.Errorf("failed to parse backups: %s", err) + } + + return backupList, nil +} + +func formatTimestamp(timestamp string) (string, error) { + if strings.HasSuffix(timestamp, "Z") { + timestamp = strings.TrimSuffix(timestamp, "Z") + "+00:00" + } + parsedTime, err := time.Parse(time.RFC3339, timestamp) + if err != nil { + return "", fmt.Errorf("failed to parse timestamp: %s", err) + } + + return parsedTime.Format("2006-01-02T15:04:05-07:00"), nil +} diff --git a/internal/flypg/barman_config.go b/internal/flypg/barman_config.go new file mode 100644 index 00000000..48fa4c55 --- /dev/null +++ b/internal/flypg/barman_config.go @@ -0,0 +1,206 @@ +package flypg + +import ( + "fmt" + "log" + "os" + "regexp" + "strconv" + "strings" + "time" + + "github.com/fly-apps/postgres-flex/internal/flypg/state" +) + +type BarmanSettings struct { + ArchiveTimeout string + RecoveryWindow string + FullBackupFrequency string + MinimumRedundancy string +} + +type BarmanConfig struct { + internalConfigFilePath string + userConfigFilePath string + internalConfig ConfigMap + userConfig ConfigMap + + Settings BarmanSettings +} + +const ( + barmanConsulKey = "BarmanConfig" + DefaultBarmanConfigDir = "/data/barman/" +) + +// type assertion +var _ Config = &BarmanConfig{} + +func (c *BarmanConfig) InternalConfig() ConfigMap { return c.internalConfig } +func (c *BarmanConfig) UserConfig() ConfigMap { return c.userConfig } +func (*BarmanConfig) ConsulKey() string { return barmanConsulKey } +func (c *BarmanConfig) SetUserConfig(newConfig ConfigMap) { c.userConfig 
= newConfig }
+func (c *BarmanConfig) InternalConfigFile() string   { return c.internalConfigFilePath }
+func (c *BarmanConfig) UserConfigFile() string       { return c.userConfigFilePath }
+
+func NewBarmanConfig(store *state.Store, configDir string) (*BarmanConfig, error) {
+	cfg := &BarmanConfig{
+		internalConfigFilePath: configDir + "barman.internal.conf",
+		userConfigFilePath:     configDir + "barman.user.conf",
+	}
+
+	if err := cfg.initialize(store, configDir); err != nil {
+		return nil, err
+	}
+
+	return cfg, nil
+}
+
+func (c *BarmanConfig) SetDefaults() {
+	c.internalConfig = ConfigMap{
+		"archive_timeout":       "60s",
+		"recovery_window":       "7d",
+		"full_backup_frequency": "24h",
+		"minimum_redundancy":    "3",
+	}
+}
+
+func (c *BarmanConfig) CurrentConfig() (ConfigMap, error) {
+	internal, err := ReadFromFile(c.InternalConfigFile())
+	if err != nil {
+		return nil, err
+	}
+
+	user, err := ReadFromFile(c.UserConfigFile())
+	if err != nil {
+		return nil, err
+	}
+
+	all := ConfigMap{}
+
+	for k, v := range internal {
+		all[k] = v
+	}
+	for k, v := range user {
+		all[k] = v
+	}
+
+	return all, nil
+}
+
+// ParseSettings reads the current config and returns the settings in a structured format.
+func (c *BarmanConfig) ParseSettings() (BarmanSettings, error) {
+	cfg, err := c.CurrentConfig()
+	if err != nil {
+		return BarmanSettings{}, fmt.Errorf("failed to read current config: %s", err)
+	}
+
+	recoveryWindow := fmt.Sprintf("RECOVERY WINDOW OF %s",
+		convertRecoveryWindowDuration(cfg["recovery_window"].(string)))
+
+	return BarmanSettings{
+		ArchiveTimeout:      cfg["archive_timeout"].(string),
+		RecoveryWindow:      recoveryWindow,
+		FullBackupFrequency: cfg["full_backup_frequency"].(string),
+		MinimumRedundancy:   cfg["minimum_redundancy"].(string),
+	}, nil
+}
+
+func (c *BarmanConfig) Validate(requestedChanges map[string]interface{}) error {
+	// Verify that the keys provided are valid
+	for k := range requestedChanges {
+		if _, ok := c.internalConfig[k]; !ok {
+			return fmt.Errorf("invalid key: %s", k)
+		}
+	}
+
+	for k, v := range requestedChanges {
+		switch k {
+		case "archive_timeout":
+			// Ensure that the value is a valid duration
+			if _, err := time.ParseDuration(v.(string)); err != nil {
+				return fmt.Errorf("invalid value for archive_timeout: %v", v)
+			}
+		case "recovery_window":
+			// Only days/weeks are accepted; convertRecoveryWindowDuration has no
+			// mapping for "y", so accepting it would emit an invalid retention policy.
+			re := regexp.MustCompile(`^(\d+)([dw])$`)
+			matches := re.FindStringSubmatch(v.(string))
+			if len(matches) != 3 {
+				return fmt.Errorf("invalid value for recovery_window: %v", v)
+			}
+
+			num, err := strconv.Atoi(matches[1])
+			if err != nil {
+				return fmt.Errorf("failed to parse recovery_window: %w", err)
+			}
+
+			if num < 1 {
+				return fmt.Errorf("invalid value for recovery_window (expected to be >= 1, got %v)", num)
+			}
+
+		case "full_backup_frequency":
+			dur, err := time.ParseDuration(v.(string))
+			if err != nil {
+				return fmt.Errorf("invalid value for full_backup_frequency: %v", v)
+			}
+
+			if dur.Hours() < 1 {
+				return fmt.Errorf("invalid value for full_backup_frequency (expected to be >= 1h, got %v)", dur)
+			}
+		case "minimum_redundancy":
+			val, err := strconv.Atoi(v.(string))
+			if err != nil {
+				return fmt.Errorf("invalid value for minimum_redundancy: %v", v)
+			}
+
+			if val < 0 {
+				return fmt.Errorf("invalid value for minimum_redundancy (expected be >= 0, got %v)", val)
+			}
+		}
+	}
+
+	return nil
+}
+
+func (c *BarmanConfig) initialize(store *state.Store, configDir string) error {
+	// Ensure directory exists (0700: the execute bit is required to traverse a directory)
+	if err := os.MkdirAll(configDir, 0700); err != nil {
+		return fmt.Errorf("failed to create barman config directory: %s", err)
+	}
+
+	c.SetDefaults()
+
+	if err := SyncUserConfig(c, store); err != nil {
+		log.Printf("[WARN] Failed to sync user config from consul for barman: %s\n", err.Error())
+		if err := writeInternalConfigFile(c); err != nil {
+			return fmt.Errorf("failed to write barman config files: %s", err)
+		}
+	} else {
+		if err := WriteConfigFiles(c); err != nil {
+			return fmt.Errorf("failed to write barman config files: %s", err)
+		}
+	}
+
+	settings, err := c.ParseSettings()
+	if err != nil {
+		return fmt.Errorf("failed to parse barman config: %w", err)
+	}
+
+	c.Settings = settings
+
+	return nil
+}
+
+func convertRecoveryWindowDuration(durationStr string) string {
+	unitMap := map[string]string{
+		"m": "MONTHS",
+		"w": "WEEKS",
+		"d": "DAYS",
+	}
+	for unit, text := range unitMap {
+		if strings.HasSuffix(durationStr, unit) {
+			return strings.TrimSuffix(durationStr, unit) + " " + text
+		}
+	}
+	return durationStr
+}
diff --git a/internal/flypg/barman_config_test.go b/internal/flypg/barman_config_test.go
new file mode 100644
index 00000000..edeaf759
--- /dev/null
+++ b/internal/flypg/barman_config_test.go
@@ -0,0 +1,134 @@
+package flypg
+
+import (
+	"testing"
+
+	"github.com/fly-apps/postgres-flex/internal/flypg/state"
+)
+
+const (
+	barmanConfigTestDir = "./test_results/barman"
+)
+
+func TestValidateBarmanConfig(t *testing.T) {
+	if err := setup(t); err != nil {
+		t.Fatal(err)
+	}
+	defer cleanup()
+
+	store, _ := state.NewStore()
+
+	b, err := NewBarmanConfig(store, barmanConfigTestDir)
+	if err != nil {
+		t.Fatalf("unexpected error: %v", err)
+	}
+
+	t.Run("valid-config", func(t *testing.T) {
+
+		conf := ConfigMap{
+			"archive_timeout":       "120s",
+			"recovery_window":       "7d",
+			"full_backup_frequency": "24h",
+			"minimum_redundancy":    "3",
+		}
+
+		if err := b.Validate(conf); err != nil {
+			t.Fatalf("unexpected error: %v", err)
+		}
+	})
+
+	t.Run("invalid-archive-timeout", func(t *testing.T) {
+		conf := ConfigMap{
+			"archive_timeout": "120seconds",
+		}
+
+		if err := b.Validate(conf); err == nil {
+			t.Fatalf("expected error, got nil")
+		}
+	})
+
+	t.Run("invalid-recovery-window", func(t *testing.T) {
+		conf := ConfigMap{
+			"recovery_window": "10seconds",
+		}
+
+		if err := b.Validate(conf); err == nil {
+			t.Fatalf("expected error, got nil")
+		}
+
+		conf = ConfigMap{
+			"recovery_window": "1m",
+		}
+
+		if err := b.Validate(conf); err == nil {
+			t.Fatalf("expected error, got nil")
+		}
+
+		conf = ConfigMap{
+			"recovery_window": "0w",
+		}
+		if err := b.Validate(conf); err == nil {
+			t.Fatalf("expected error, got nil")
+		}
+	})
+
+	t.Run("invalid-full-backup-frequency", func(t *testing.T) {
+		conf := ConfigMap{
+			"full_backup_frequency": "10seconds",
+		}
+
+		if err := b.Validate(conf); err == nil {
+			t.Fatalf("expected error, got nil")
+		}
+
+		conf = ConfigMap{
+			"full_backup_frequency": "1m",
+		}
+
+		if err := b.Validate(conf); err == nil {
+			t.Fatalf("expected error, got nil")
+		}
+
+		conf = ConfigMap{
+			"full_backup_frequency": "0w",
+		}
+		if err := b.Validate(conf); err == nil {
+			t.Fatalf("expected error, got nil")
+		}
+	})
+
+	t.Run("invalid-minimum-redundancy", func(t *testing.T) {
+		conf := ConfigMap{
+			"minimum_redundancy": "-1",
+		}
+
+		if err := b.Validate(conf); err == nil {
+			t.Fatalf("expected error, got nil")
+		}
+	})
+}
+
+func TestBarmanConfigSettings(t *testing.T) {
+	if err := setup(t); err != nil {
+		t.Fatal(err)
+	}
+	defer cleanup()
+
+	store, _ := state.NewStore()
+
+	t.Run("defaults", func(t *testing.T) {
+		b, err := NewBarmanConfig(store,
barmanConfigTestDir) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if b.Settings.MinimumRedundancy != "3" { + t.Fatalf("expected minimumRedundancy to be 3, but got %s", b.Settings.MinimumRedundancy) + } + + if b.Settings.FullBackupFrequency != "24h" { + t.Fatalf("expected fullBackupFrequency to be 24h, but got %s", b.Settings.FullBackupFrequency) + } + + }) +} diff --git a/internal/flypg/barman_restore.go b/internal/flypg/barman_restore.go new file mode 100644 index 00000000..8ec6d51a --- /dev/null +++ b/internal/flypg/barman_restore.go @@ -0,0 +1,302 @@ +package flypg + +import ( + "context" + "fmt" + "log" + "net/url" + "os" + "time" + + "github.com/fly-apps/postgres-flex/internal/supervisor" +) + +type BarmanRestore struct { + *Barman + + // Target parameters + // For more information on these parameters, see: + // https://www.postgresql.org/docs/current/runtime-config-wal.html#RUNTIME-CONFIG-WAL-RECOVERY-TARGET + recoveryTarget string + recoveryTargetName string + recoveryTargetTime string + + recoveryTargetTimeline string + recoveryTargetAction string + recoveryTargetInclusive string +} + +const ( + defaultRestoreDir = "/data/postgresql" + waitOnRecoveryTimeout = 10 * time.Minute +) + +func NewBarmanRestore(configURL string) (*BarmanRestore, error) { + // We only need access to the barman endpoints + barman, err := NewBarman(nil, configURL, RestoreAuthProfile) + if err != nil { + return nil, fmt.Errorf("failed to create barman client: %s", err) + } + + url, err := url.Parse(configURL) + if err != nil { + return nil, fmt.Errorf("invalid restore config url: %w", err) + } + + restore := &BarmanRestore{Barman: barman} + + for key, value := range url.Query() { + v := value[0] + + switch key { + case "target": + restore.recoveryTarget = v + case "targetName": + restore.recoveryTargetName = v + case "targetInclusive": + restore.recoveryTargetInclusive = v + case "targetAction": + restore.recoveryTargetAction = v + case "targetTime": + ts, err := 
formatTimestamp(v) + if err != nil { + return nil, fmt.Errorf("failed to parse target time: %s", err) + } + restore.recoveryTargetTime = ts + case "targetTimeline": + restore.recoveryTargetTimeline = v + default: + log.Printf("[WARN] unknown query parameter: %s. ignoring.", key) + } + } + + if restore.recoveryTargetAction == "" { + restore.recoveryTargetAction = "promote" + } + + return restore, nil +} + +func (*BarmanRestore) walReplayAndReset(ctx context.Context, node *Node) error { + // create a copy of the pg_hba.conf file so we can revert back to it when needed. + if err := backupHBAFile(); err != nil { + if os.IsNotExist(err) { + return nil + } + return fmt.Errorf("failed backing up pg_hba.conf: %s", err) + } + + // Grant local access so we can update internal credentials to match the environment. + if err := grantLocalAccess(); err != nil { + return fmt.Errorf("failed to grant local access: %s", err) + } + + // Boot postgres and wait for WAL replay to complete + svisor := supervisor.New("flypg", 5*time.Minute) + svisor.AddProcess("restore", fmt.Sprintf("gosu postgres postgres -D /data/postgresql -p 5433 -h %s", node.PrivateIP)) + + // Start the postgres process in the background. + go func() { + if err := svisor.Run(); err != nil { + log.Printf("[ERROR] failed to boot postgres in the background: %s", err) + } + }() + + // Wait for the WAL replay to complete. + if err := waitOnRecovery(ctx, node.PrivateIP); err != nil { + return fmt.Errorf("failed to monitor recovery mode: %s", err) + } + + // Open read/write connection + conn, err := openConn(ctx, node.PrivateIP) + if err != nil { + return fmt.Errorf("failed to establish connection to local node: %s", err) + } + defer func() { _ = conn.Close(ctx) }() + + // Drop repmgr database to clear any metadata that belonged to the old cluster. 
+ if _, err = conn.Exec(ctx, "DROP DATABASE repmgr;"); err != nil { + return fmt.Errorf("failed to drop repmgr database: %s", err) + } + + // Ensure auth is configured to match the environment. + if err := node.setupCredentials(ctx, conn); err != nil { + return fmt.Errorf("failed creating required users: %s", err) + } + + // Stop the postgres process + svisor.Stop() + + // Revert back to the original config file + if err := restoreHBAFile(); err != nil { + return fmt.Errorf("failed to restore original pg_hba.conf: %s", err) + } + + if err := conn.Close(ctx); err != nil { + return fmt.Errorf("failed to close connection: %s", err) + } + + return nil +} + +func (b *BarmanRestore) restoreFromBackup(ctx context.Context) error { + backups, err := b.ListCompletedBackups(ctx) + if err != nil { + return fmt.Errorf("failed to list backups: %s", err) + } + + if len(backups.Backups) == 0 { + return fmt.Errorf("no backups found") + } + + var backupID string + + switch { + case b.recoveryTarget != "": + backupID, err = b.resolveBackupFromTime(backups, time.Now().Format(time.RFC3339)) + if err != nil { + return fmt.Errorf("failed to resolve backup target by time: %s", err) + } + case b.recoveryTargetTime != "": + backupID, err = b.resolveBackupFromTime(backups, b.recoveryTargetTime) + if err != nil { + return fmt.Errorf("failed to resolve backup target by time: %s", err) + } + case b.recoveryTargetName != "": + // Resolve the target base backup + backupID, err = b.resolveBackupFromID(backups, b.recoveryTargetName) + if err != nil { + return fmt.Errorf("failed to resolve backup target by id: %s", err) + } + default: + backupID, err = b.resolveBackupFromTime(backups, time.Now().Format(time.RFC3339)) + if err != nil { + return fmt.Errorf("failed to resolve backup target by time: %s", err) + } + } + + if backupID == "" { + return fmt.Errorf("no backup found") + } + + // Download and restore the base backup + if _, err := b.RestoreBackup(ctx, backupID); err != nil { + return 
fmt.Errorf("failed to restore backup: %s", err) + } + + // Write the recovery.signal file + if err := os.WriteFile("/data/postgresql/recovery.signal", []byte(""), 0600); err != nil { + return fmt.Errorf("failed to write recovery.signal: %s", err) + } + + return nil +} + +func (*BarmanRestore) resolveBackupFromID(backupList BackupList, id string) (string, error) { + if len(backupList.Backups) == 0 { + return "", fmt.Errorf("no backups found") + } + + for _, backup := range backupList.Backups { + if backup.BackupID == id { + return backup.BackupID, nil + } + } + + return "", fmt.Errorf("no backup found with id %s", id) +} + +func (*BarmanRestore) resolveBackupFromTime(backupList BackupList, restoreStr string) (string, error) { + if len(backupList.Backups) == 0 { + return "", fmt.Errorf("no backups found") + } + + var restoreTime time.Time + + // Parse the restore string + restoreTime, err := time.Parse(time.RFC3339, restoreStr) + if err != nil { + return "", fmt.Errorf("failed to parse restore time: %s", err) + } + + latestBackupID := "" + latestStartTime := time.Time{} + + earliestBackupID := "" + earliestEndTime := time.Time{} + + // This is the layout presented by barman + layout := "Mon Jan 2 15:04:05 2006" + + for _, backup := range backupList.Backups { + // Skip backups that failed or are in progress + // TODO - This shouldn't be needed, but will keep it around until we can improve tests. 
+ if backup.Status != "DONE" { + continue + } + + // Parse the backup start time + startTime, err := time.Parse(layout, backup.StartTime) + if err != nil { + return "", fmt.Errorf("failed to parse backup start time: %s", err) + } + // Parse the backup end time + endTime, err := time.Parse(layout, backup.EndTime) + if err != nil { + return "", fmt.Errorf("failed to parse backup end time: %s", err) + } + // Check if the restore time falls within the backup window + if restoreTime.After(startTime) && restoreTime.Before(endTime) { + return backup.BackupID, nil + } + + // Track the latest and earliest backups in case the restore time is outside + // the available backup windows + if latestBackupID == "" || startTime.After(latestStartTime) { + latestBackupID = backup.BackupID + latestStartTime = startTime + } + + if earliestBackupID == "" || endTime.Before(earliestEndTime) { + earliestBackupID = backup.BackupID + earliestEndTime = endTime + } + } + + // if the restore time is before the earliest backup, restore the earliest backup + if restoreTime.Before(earliestEndTime) { + return earliestBackupID, nil + } + + return latestBackupID, nil +} + +func waitOnRecovery(ctx context.Context, privateIP string) error { + conn, err := openConn(ctx, privateIP) + if err != nil { + return fmt.Errorf("failed to establish connection to local node: %s", err) + } + defer func() { _ = conn.Close(ctx) }() + + // TODO - Figure out a more reasonable timeout to use here. 
+ timeout := time.After(waitOnRecoveryTimeout) + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return ctx.Err() + case <-timeout: + return fmt.Errorf("timed out waiting for PG to exit recovery mode") + case <-ticker.C: + var inRecovery bool + if err := conn.QueryRow(ctx, "SELECT pg_is_in_recovery();").Scan(&inRecovery); err != nil { + return fmt.Errorf("failed to check recovery status: %w", err) + } + if !inRecovery { + return nil + } + } + } +} diff --git a/internal/flypg/barman_restore_test.go b/internal/flypg/barman_restore_test.go new file mode 100644 index 00000000..b6f3f32f --- /dev/null +++ b/internal/flypg/barman_restore_test.go @@ -0,0 +1,368 @@ +package flypg + +import ( + "fmt" + "os" + "testing" +) + +const backupsResponse = `{ + "backups_list": [ + { + "backup_label": null, + "begin_offset": 216, + "begin_time": "Tue Jun 24 19:44:20 2024", + "begin_wal": "00000005000000000000002A", + "begin_xlog": "0/2A0000D8", + "compression": null, + "config_file": "/data/postgresql/postgresql.conf", + "copy_stats": null, + "deduplicated_size": null, + "end_offset": null, + "end_time": null, + "end_wal": null, + "end_xlog": null, + "error": "failure uploading data (connection already closed)", + "hba_file": "/data/postgresql/pg_hba.conf", + "ident_file": "/data/postgresql/pg_ident.conf", + "included_files": [ + "/data/postgresql/postgresql.auto.conf", + "/data/postgresql/postgresql.internal.conf" + ], + "mode": null, + "pgdata": "/data/postgresql", + "server_name": "cloud", + "size": null, + "status": "FAILED", + "systemid": "7332222271544570189", + "tablespaces": null, + "timeline": 5, + "version": 150006, + "xlog_segment_size": 16777216, + "backup_id": "20240702T210544" + }, + { + "backup_label": "'START WAL LOCATION: 0/8000028 (file 000000010000000000000008)\\nCHECKPOINT LOCATION: 0/8000098\\nBACKUP METHOD: streamed\\nBACKUP FROM: primary\\nSTART TIME: 2024-06-25 19:44:13 UTC\\nLABEL: Barman backup 
cloud 20240625T194412\\nSTART TIMELINE: 1\\n'", + "backup_name": "test-backup-2", + "begin_offset": 40, + "begin_time": "Tue Jun 25 19:44:12 2024", + "begin_wal": "000000010000000000000008", + "begin_xlog": "0/8000028", + "compression": null, + "config_file": "/data/postgresql/postgresql.conf", + "copy_stats": { + "total_time": 8.527192, + "number_of_workers": 2, + "analysis_time": 0, + "analysis_time_per_item": { + "data": 0 + }, + "copy_time_per_item": { + "data": 0.624873 + }, + "serialized_copy_time_per_item": { + "data": 0.430501 + }, + "copy_time": 0.624873, + "serialized_copy_time": 0.430501 + }, + "deduplicated_size": null, + "end_offset": 312, + "end_time": "Tue Jun 25 19:44:18 2024", + "end_wal": "000000010000000000000008", + "end_xlog": "0/8000138", + "error": null, + "hba_file": "/data/postgresql/pg_hba.conf", + "ident_file": "/data/postgresql/pg_ident.conf", + "included_files": [ + "/data/postgresql/postgresql.internal.conf" + ], + "mode": null, + "pgdata": "/data/postgresql", + "server_name": "cloud", + "size": null, + "status": "DONE", + "systemid": "7384497274230341974", + "tablespaces": null, + "timeline": 1, + "version": 150006, + "xlog_segment_size": 16777216, + "backup_id": "20240625T194412" + }, + { + "backup_label": "'START WAL LOCATION: 0/11000238 (file 000000010000000000000011)\\nCHECKPOINT LOCATION: 0/11000270\\nBACKUP METHOD: streamed\\nBACKUP FROM: primary\\nSTART TIME: 2024-06-26 17:26:53 UTC\\nLABEL: Barman backup cloud 20240626T172443\\nSTART TIMELINE: 1\\n'", + "begin_offset": 568, + "begin_time": "Wed Jun 26 17:24:43 2024", + "begin_wal": "000000010000000000000011", + "begin_xlog": "0/11000238", + "compression": null, + "config_file": "/data/postgresql/postgresql.conf", + "copy_stats": { + "total_time": 142.572774, + "number_of_workers": 2, + "analysis_time": 0, + "analysis_time_per_item": { + "data": 0 + }, + "copy_time_per_item": { + "data": 0.845993 + }, + "serialized_copy_time_per_item": { + "data": 0.545768 + }, + "copy_time": 
0.845993, + "serialized_copy_time": 0.545768 + }, + "deduplicated_size": null, + "end_offset": 840, + "end_time": "Wed Jun 26 17:27:02 2024", + "end_wal": "000000010000000000000011", + "end_xlog": "0/11000348", + "error": null, + "hba_file": "/data/postgresql/pg_hba.conf", + "ident_file": "/data/postgresql/pg_ident.conf", + "included_files": [ + "/data/postgresql/postgresql.internal.conf" + ], + "mode": null, + "pgdata": "/data/postgresql", + "server_name": "cloud", + "size": null, + "status": "DONE", + "systemid": "7384497274230341974", + "tablespaces": null, + "timeline": 1, + "version": 150006, + "xlog_segment_size": 16777216, + "backup_id": "20240626T172443" + } + ] +}` + +func TestNewBarmanRestore(t *testing.T) { + setRestoreDefaultEnv(t) + t.Run("defaults", func(t *testing.T) { + restore, err := NewBarmanRestore(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG")) + if err != nil { + t.Fatalf("NewBarmanRestore failed with: %v", err) + } + + if restore.bucket != "my-bucket" { + t.Fatalf("expected bucket to be my-bucket, got %s", restore.bucket) + } + + if restore.BucketURL() != "s3://my-bucket" { + t.Fatalf("expected bucket to be my-bucket, got %s", restore.bucket) + } + + if restore.bucketDirectory != "my-directory" { + t.Fatalf("expected bucket directory to be my-directory, got %s", restore.bucketDirectory) + } + + if restore.appName != "postgres-flex" { + t.Fatalf("expected app name to be postgres-flex, got %s", restore.appName) + } + + if restore.provider != "aws-s3" { + t.Fatalf("expected provider to be aws-s3, got %s", restore.provider) + } + + if restore.endpoint != "https://fly.storage.tigris.dev" { + t.Fatalf("expected endpoint to be https://fly.storage.tigris.dev, got %s", restore.endpoint) + } + + }) + + t.Run("target", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?target=immediate") + + restore, err := 
NewBarmanRestore(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG")) + if err != nil { + t.Fatalf("NewBarmanRestore failed with: %v", err) + } + + if restore.recoveryTarget != "immediate" { + t.Fatalf("expected recovery target to be 'immediate', got %s", restore.recoveryTarget) + } + }) + + t.Run("target-time", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetTime=2024-07-03T17:55:22Z") + + restore, err := NewBarmanRestore(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG")) + if err != nil { + t.Fatalf("NewBarmanRestore failed with: %v", err) + } + + if restore.recoveryTargetTime != "2024-07-03T17:55:22+00:00" { + t.Fatalf("expected recovery target time to be 2024-07-03T17:55:22+00:00, got %s", restore.recoveryTargetTime) + } + }) + + t.Run("target-name", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetName=20240705T051010") + + restore, err := NewBarmanRestore(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG")) + if err != nil { + t.Fatalf("NewBarmanRestore failed with: %v", err) + } + + if restore.recoveryTargetName != "20240705T051010" { + t.Fatalf("expected recovery target name to be 20240705T051010, got %s", restore.recoveryTargetName) + } + }) + + t.Run("target-name-with-options", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetName=20240705T051010&targetAction=shutdown&targetTimeline=2&targetInclusive=false") + + restore, err := NewBarmanRestore(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG")) + if err != nil { + t.Fatalf("NewBarmanRestore failed with: %v", err) + } + + if restore.recoveryTargetName != "20240705T051010" { + t.Fatalf("expected recovery target name to be 20240705T051010, got %s", restore.recoveryTargetName) + } + + if restore.recoveryTargetAction != "shutdown" { + 
t.Fatalf("expected recovery target action to be shutdown, got %s", restore.recoveryTargetAction) + } + + if restore.recoveryTargetTimeline != "2" { + t.Fatalf("expected recovery target timeline to be 2, got %s", restore.recoveryTargetTimeline) + } + + }) +} + +func TestWALRestoreCommand(t *testing.T) { + if err := setup(t); err != nil { + t.Fatal(err) + } + defer cleanup() + + setRestoreDefaultEnv(t) + restore, err := NewBarmanRestore(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG")) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := fmt.Sprintf("barman-cloud-wal-restore --cloud-provider aws-s3 --endpoint-url https://fly.storage.tigris.dev --profile restore s3://my-bucket my-directory %%f %%p") + + if restore.walRestoreCommand() != expected { + t.Fatalf("expected WALRestoreCommand to be %s, but got %s", expected, restore.walRestoreCommand()) + } +} + +func TestParseBackups(t *testing.T) { + if err := setup(t); err != nil { + t.Fatal(err) + } + defer cleanup() + + t.Run("parseBackups", func(t *testing.T) { + setRestoreDefaultEnv(t) + + restore, err := NewBarmanRestore(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG")) + if err != nil { + t.Fatalf("NewBarmanRestore failed with: %v", err) + } + + list, err := restore.parseBackups([]byte(backupsResponse)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if len(list.Backups) != 3 { + t.Fatalf("expected 2 backups, got %d", len(list.Backups)) + } + + firstBackup := list.Backups[0] + if firstBackup.BackupID != "20240702T210544" { + t.Fatalf("expected backup ID to be 20240625T194412, got %s", firstBackup.BackupID) + } + + if firstBackup.StartTime != "Tue Jun 24 19:44:20 2024" { + t.Fatalf("expected start time to be Tue Jun 24 19:44:20 2024, got %s", firstBackup.StartTime) + } + + if firstBackup.EndTime != "" { + t.Fatalf("expected end time to be empty, but got %s", firstBackup.EndTime) + } + + if firstBackup.Status != "FAILED" { + t.Fatalf("expected status to be FAILED, got %s", 
firstBackup.Status) + } + + secondBackup := list.Backups[2] + + if secondBackup.BackupID != "20240626T172443" { + t.Fatalf("expected backup ID to be 20240626T172443, got %s", secondBackup.BackupID) + } + + if secondBackup.StartTime != "Wed Jun 26 17:24:43 2024" { + t.Fatalf("expected start time to be Wed Jun 26 17:24:43 2024, got %s", secondBackup.StartTime) + } + + if secondBackup.EndTime != "Wed Jun 26 17:27:02 2024" { + t.Fatalf("expected end time to be Wed Jun 26 17:27:02 2024, got %s", secondBackup.EndTime) + } + }) +} + +func TestResolveBackupTarget(t *testing.T) { + if err := setup(t); err != nil { + t.Fatal(err) + } + defer cleanup() + + setRestoreDefaultEnv(t) + + restore, err := NewBarmanRestore(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG")) + if err != nil { + t.Fatalf("NewBarmanRestore failed with: %v", err) + } + + list, err := restore.parseBackups([]byte(backupsResponse)) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + t.Run("resolve-earliest-backup-target", func(t *testing.T) { + backupID, err := restore.resolveBackupFromTime(list, "2024-06-25T19:44:12-00:00") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if backupID != "20240625T194412" { + t.Fatalf("expected backup ID to be 20240625T194412, got %s", backupID) + } + }) + + t.Run("resolve-backup-within-first-window", func(t *testing.T) { + backupID, err := restore.resolveBackupFromTime(list, "2024-06-25T19:44:15-00:00") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if backupID != "20240625T194412" { + t.Fatalf("expected backup ID to be 20240625T194412, got %s", backupID) + } + }) + + t.Run("resolve-backup-within-second-window", func(t *testing.T) { + backupID, err := restore.resolveBackupFromTime(list, "2024-06-26T17:25:15-00:00") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if backupID != "20240626T172443" { + t.Fatalf("expected backup ID to be 20240626T172443, got %s", backupID) + } + }) +} + +func setRestoreDefaultEnv(t 
*testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory") + t.Setenv("FLY_APP_NAME", "postgres-flex") +} diff --git a/internal/flypg/barman_test.go b/internal/flypg/barman_test.go new file mode 100644 index 00000000..3e800186 --- /dev/null +++ b/internal/flypg/barman_test.go @@ -0,0 +1,109 @@ +package flypg + +import ( + "os" + "testing" + + "github.com/fly-apps/postgres-flex/internal/flypg/state" +) + +const ( + testBarmanConfigDir = "./test_results/barman" +) + +func TestNewBarman(t *testing.T) { + if err := setup(t); err != nil { + t.Fatal(err) + } + defer cleanup() + + store, _ := state.NewStore() + + t.Run("defaults", func(t *testing.T) { + setDefaultEnv(t) + + configURL := os.Getenv("S3_ARCHIVE_CONFIG") + barman, err := NewBarman(store, configURL, DefaultAuthProfile) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if err := barman.LoadConfig(testBarmanConfigDir); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if barman.provider != "aws-s3" { + t.Fatalf("expected provider to be aws, but got %s", barman.provider) + } + + if barman.endpoint != "https://fly.storage.tigris.dev" { + t.Fatalf("expected endpoint to be https://fly.storage.tigris.dev, but got %s", barman.endpoint) + } + + if barman.bucket != "my-bucket" { + t.Fatalf("expected bucket to be my-bucket, but got %s", barman.bucket) + } + + if barman.BucketURL() != "s3://my-bucket" { + t.Fatalf("expected bucket to be s3://my-bucket, but got %s", barman.bucket) + } + + if barman.bucketDirectory != "my-directory" { + t.Fatalf("expected directory to be my-directory, but got %s", barman.bucketDirectory) + } + + if barman.appName != "postgres-flex" { + t.Fatalf("expected appName to be postgres-flex, but got %s", barman.appName) + } + + // Defaults + if barman.Settings.MinimumRedundancy != "3" { + t.Fatalf("expected minimumRedundancy to be 3, but got %s", barman.Settings.MinimumRedundancy) + } + + if 
barman.Settings.RecoveryWindow != "RECOVERY WINDOW OF 7 DAYS" { + t.Fatalf("expected recovery_window to be 'RECOVERY WINDOW OF 7 DAYS', but got %s", barman.Settings.RecoveryWindow) + } + + if barman.Settings.FullBackupFrequency != "24h" { + t.Fatalf("expected fullBackupFrequency to be 24, but got %s", barman.Settings.FullBackupFrequency) + } + + if barman.Settings.ArchiveTimeout != "60s" { + t.Fatalf("expected archiveTimeout to be 60s, but got %s", barman.Settings.ArchiveTimeout) + } + }) +} + +func TestFormatTimestamp(t *testing.T) { + t.Run("valid", func(t *testing.T) { + ts, err := formatTimestamp("2024-07-03T17:55:22Z") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if ts != "2024-07-03T17:55:22+00:00" { + t.Fatalf("expected timestamp to be 2024-07-03T17:55:22+00:00, but got %s", ts) + } + + ts, err = formatTimestamp("2024-07-03T17:55:22-07:00") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if ts != "2024-07-03T17:55:22-07:00" { + t.Fatalf("expected timestamp to be 2024-07-03T17:55:22-07:00, but got %s", ts) + } + }) + + t.Run("invalid", func(t *testing.T) { + _, err := formatTimestamp("2024-07-03T17:55:22Z07:00") + if err == nil { + t.Fatalf("unexpected error, but not nil") + } + }) +} + +func setDefaultEnv(t *testing.T) { + t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory") + t.Setenv("FLY_APP_NAME", "postgres-flex") + +} diff --git a/internal/flypg/config.go b/internal/flypg/config.go index 2bd10bcd..5eac91d2 100644 --- a/internal/flypg/config.go +++ b/internal/flypg/config.go @@ -63,6 +63,7 @@ func SyncUserConfig(c Config, consul *state.Store) error { if cfg == nil { return nil } + c.SetUserConfig(cfg) if err := writeUserConfigFile(c); err != nil { @@ -134,7 +135,7 @@ func ReadFromFile(path string) (ConfigMap, error) { conf[key] = value } - return conf, file.Sync() + return conf, nil } func writeInternalConfigFile(c Config) error { @@ -159,6 +160,10 @@ func 
writeInternalConfigFile(c Config) error { return fmt.Errorf("failed to close file: %s", err) } + if os.Getenv("UNIT_TESTING") != "" { + return nil + } + if err := utils.SetFileOwnership(c.InternalConfigFile(), "postgres"); err != nil { return fmt.Errorf("failed to set file ownership on %s: %s", c.InternalConfigFile(), err) } @@ -186,6 +191,10 @@ func writeUserConfigFile(c Config) error { return fmt.Errorf("failed to close file: %s", err) } + if os.Getenv("UNIT_TESTING") != "" { + return nil + } + if err := utils.SetFileOwnership(c.UserConfigFile(), "postgres"); err != nil { return fmt.Errorf("failed to set file ownership on %s: %s", c.UserConfigFile(), err) } diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 5a9364b2..f309092a 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -7,7 +7,6 @@ import ( "log" "net" "os" - "os/exec" "strconv" "strings" "time" @@ -50,6 +49,7 @@ func NewNode() (*Node, error) { if err != nil { return nil, fmt.Errorf("failed getting private ip: %s", err) } + node.PrivateIP = ipv6.String() node.PrimaryRegion = os.Getenv("PRIMARY_REGION") @@ -109,6 +109,11 @@ func NewNode() (*Node, error) { repmgrDatabase: node.RepMgr.DatabaseName, } + if os.Getenv("S3_ARCHIVE_CONFIG") != "" { + // Specific PG configuration is injected when Barman is enabled. 
+ node.PGConfig.barmanConfigPath = DefaultBarmanConfigDir + } + node.FlyConfig = FlyPGConfig{ internalConfigFilePath: "/data/flypg.internal.conf", userConfigFilePath: "/data/flypg.user.conf", @@ -119,21 +124,24 @@ func NewNode() (*Node, error) { func (n *Node) Init(ctx context.Context) error { // Ensure directory and files have proper permissions - if err := setDirOwnership(); err != nil { + if err := setDirOwnership(ctx, "/data"); err != nil { return fmt.Errorf("failed to set directory ownership: %s", err) } - // Check to see if we were just restored - if os.Getenv("FLY_RESTORED_FROM") != "" { - active, err := isRestoreActive() - if err != nil { - return fmt.Errorf("failed to verify active restore: %s", err) - } + store, err := state.NewStore() + if err != nil { + return fmt.Errorf("failed initialize cluster state store: %s", err) + } - if active { - if err := Restore(ctx, n); err != nil { - return fmt.Errorf("failed to issue restore: %s", err) - } + // Determine if we are performing a remote restore. + if err := n.handleRemoteRestore(ctx, store); err != nil { + return fmt.Errorf("failed to handle remote restore: %s", err) + } + + // Ensure we have the required s3 credentials set. 
+ if os.Getenv("S3_ARCHIVE_CONFIG") != "" || os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG") != "" { + if err := writeS3Credentials(ctx, s3AuthDir); err != nil { + return fmt.Errorf("failed to write s3 credentials: %s", err) } } @@ -144,16 +152,10 @@ func (n *Node) Init(ctx context.Context) error { } } - err := WriteSSHKey() - if err != nil { + if err = WriteSSHKey(); err != nil { return fmt.Errorf("failed write ssh keys: %s", err) } - store, err := state.NewStore() - if err != nil { - return fmt.Errorf("failed initialize cluster state store: %s", err) - } - if err := n.RepMgr.initialize(); err != nil { return fmt.Errorf("failed to initialize repmgr: %s", err) } @@ -211,7 +213,7 @@ func (n *Node) Init(ctx context.Context) error { return fmt.Errorf("failed to initialize pg config: %s", err) } - if err := setDirOwnership(); err != nil { + if err := setDirOwnership(ctx, "/data"); err != nil { return fmt.Errorf("failed to set directory ownership: %s", err) } @@ -457,16 +459,67 @@ func openConnection(parentCtx context.Context, host string, database string, cre return pgx.ConnectConfig(ctx, conf) } -func setDirOwnership() error { +func setDirOwnership(ctx context.Context, pathToDir string) error { + if os.Getenv("UNIT_TESTING") == "true" { + return nil + } + pgUID, pgGID, err := utils.SystemUserIDs("postgres") if err != nil { return fmt.Errorf("failed to find postgres user ids: %s", err) } - cmdStr := fmt.Sprintf("chown -R %d:%d %s", pgUID, pgGID, "/data") - cmd := exec.Command("sh", "-c", cmdStr) - if _, err = cmd.Output(); err != nil { - return err + args := []string{"-R", fmt.Sprintf("%d:%d", pgUID, pgGID), pathToDir} + + if _, err := utils.RunCmd(ctx, "root", "chown", args...); err != nil { + return fmt.Errorf("failed to set directory ownership: %s", err) + } + + return nil +} + +func (n *Node) handleRemoteRestore(ctx context.Context, store *state.Store) error { + // Handle snapshot restore + if os.Getenv("FLY_RESTORED_FROM") != "" { + active, err := isRestoreActive() 
+ if err != nil { + return fmt.Errorf("failed to verify active restore: %s", err) + } + + if !active { + return nil + } + + return prepareRemoteRestore(ctx, n) + } + + // WAL-based Restore + if os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG") != "" { + if n.PGConfig.isInitialized() { + log.Println("[INFO] Postgres directory present, ignoring `S3_ARCHIVE_REMOTE_RESTORE_CONFIG` ") + return nil + } + + log.Println("[INFO] Postgres directory not present, proceeding with remote restore.") + + // Initialize barman restore + restore, err := NewBarmanRestore(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG")) + if err != nil { + return fmt.Errorf("failed to initialize barman restore: %s", err) + } + + // Restore base backup + if err := restore.restoreFromBackup(ctx); err != nil { + return fmt.Errorf("failed to restore base backup: %s", err) + } + + // Set restore configuration + if err := n.PGConfig.initialize(store); err != nil { + return fmt.Errorf("failed to initialize pg config: %s", err) + } + + // Replay WAL and reset the cluster + return restore.walReplayAndReset(ctx, n) } return nil diff --git a/internal/flypg/pg.go b/internal/flypg/pg.go index f67f9f33..730cbd5b 100644 --- a/internal/flypg/pg.go +++ b/internal/flypg/pg.go @@ -25,6 +25,8 @@ type PGConfig struct { Port int DataDir string + barmanConfigPath string + passwordFilePath string repmgrUsername string repmgrDatabase string @@ -110,7 +112,7 @@ func (c *PGConfig) Print(w io.Writer) error { return e.Encode(cfg) } -func (c *PGConfig) SetDefaults() error { +func (c *PGConfig) SetDefaults(store *state.Store) error { // The default wal_segment_size in mb const walSegmentSize = 16 @@ -171,6 +173,72 @@ func (c *PGConfig) SetDefaults() error { "shared_preload_libraries": fmt.Sprintf("'%s'", strings.Join(sharedPreloadLibraries, ",")), } + // Set WAL Archive specific settings + if err := c.setArchiveConfig(store); err != nil { + return fmt.Errorf("failed to set archive config: %s", err) + } + + // Set recovery target settings 
+ if err := c.setRecoveryTargetConfig(); err != nil { + return fmt.Errorf("failed to set recovery target config: %s", err) + } + + return nil +} + +func (c *PGConfig) setArchiveConfig(store *state.Store) error { + if os.Getenv("S3_ARCHIVE_CONFIG") == "" { + c.internalConfig["archive_mode"] = "off" + return nil + } + + barman, err := NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), DefaultAuthProfile) + if err != nil { + return fmt.Errorf("failed to initialize barman instance: %s", err) + } + + if err := barman.LoadConfig(c.barmanConfigPath); err != nil { + return fmt.Errorf("failed to load barman config: %s", err) + } + + c.internalConfig["archive_mode"] = "on" + c.internalConfig["archive_command"] = fmt.Sprintf("'%s'", barman.walArchiveCommand()) + c.internalConfig["archive_timeout"] = barman.Settings.ArchiveTimeout + + return nil +} + +func (c *PGConfig) setRecoveryTargetConfig() error { + if os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG") == "" { + return nil + } + + barmanRestore, err := NewBarmanRestore(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG")) + if err != nil { + return err + } + + // Set restore command and associated recovery target settings + c.internalConfig["restore_command"] = fmt.Sprintf("'%s'", barmanRestore.walRestoreCommand()) + c.internalConfig["recovery_target_action"] = barmanRestore.recoveryTargetAction + + if barmanRestore.recoveryTargetTimeline != "" { + c.internalConfig["recovery_target_timeline"] = barmanRestore.recoveryTargetTimeline + } + + if barmanRestore.recoveryTargetInclusive != "" { + c.internalConfig["recovery_target_inclusive"] = barmanRestore.recoveryTargetInclusive + } + + switch { + case barmanRestore.recoveryTarget != "": + c.internalConfig["recovery_target"] = barmanRestore.recoveryTarget + case barmanRestore.recoveryTargetName != "": + c.internalConfig["recovery_target_name"] = fmt.Sprintf("barman_%s", barmanRestore.recoveryTargetName) + case barmanRestore.recoveryTargetTime != "": + 
c.internalConfig["recovery_target_time"] = fmt.Sprintf("'%s'", barmanRestore.recoveryTargetTime) + } + return nil } @@ -225,7 +293,7 @@ func (c *PGConfig) initialize(store *state.Store) error { } } - if err := c.SetDefaults(); err != nil { + if err := c.SetDefaults(store); err != nil { return fmt.Errorf("failed to set pg defaults: %s", err) } diff --git a/internal/flypg/pg_test.go b/internal/flypg/pg_test.go index bee86622..a6d603d1 100644 --- a/internal/flypg/pg_test.go +++ b/internal/flypg/pg_test.go @@ -16,6 +16,7 @@ const ( pgInternalConfigFilePath = "./test_results/postgresql.internal.conf" pgUserConfigFilePath = "./test_results/postgresql.user.conf" pgPasswordFilePath = "./test_results/default_password" + barmanConfigDir = "./test_results/barman" pgHBAFilePath = "./test_results/pg_hba.conf" ) @@ -33,6 +34,7 @@ func TestPGConfigInitialization(t *testing.T) { InternalConfigFilePath: pgInternalConfigFilePath, UserConfigFilePath: pgUserConfigFilePath, passwordFilePath: pgPasswordFilePath, + barmanConfigPath: barmanConfigDir, } if err := stubPGConfigFile(); err != nil { @@ -112,7 +114,169 @@ func TestPGConfigInitialization(t *testing.T) { if cfg["shared_preload_libraries"] != expected { t.Fatalf("expected %s, got %s", expected, cfg["shared_preload_libraries"]) } + }) + + t.Run("barman-enabled", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory") + + store, _ := state.NewStore() + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + if cfg["archive_mode"] != "on" { + t.Fatalf("expected archive_mode to be on, got %v", cfg["archive_mode"]) + } + + barman, err := NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), DefaultAuthProfile) + if err != nil { + t.Fatal(err) + } + + if err := barman.LoadConfig(testBarmanConfigDir); err != nil { + t.Fatal(err) + } + + expected := fmt.Sprintf("'%s'", 
barman.walArchiveCommand()) + if cfg["archive_command"] != expected { + t.Fatalf("expected %s, got %s", expected, cfg["archive_command"]) + } + }) + + t.Run("barman-disabled", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory") + store, _ := state.NewStore() + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["archive_mode"] != "on" { + t.Fatalf("expected archive_mode to be on, got %v", cfg["archive_mode"]) + } + + t.Setenv("S3_ARCHIVE_CONFIG", "") + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err = pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["archive_mode"] != "off" { + t.Fatalf("expected archive_mode to be off, got %v", cfg["archive_mode"]) + } + }) + + t.Run("barman-restore-from-time", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetTime=2024-06-30T11:15:00-06:00") + store, _ := state.NewStore() + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["recovery_target_time"] != "'2024-06-30T11:15:00-06:00'" { + t.Fatalf("expected recovery_target_time to be 2024-06-30T11:15:00-06:00, got %v", cfg["recovery_target_time"]) + } + }) + + t.Run("barman-restore-from-name", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetName=20240626T172443") + store, _ := state.NewStore() + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["recovery_target_name"] != "barman_20240626T172443" { + t.Fatalf("expected recovery_target_name to be 
barman_20240626T172443, got %v", cfg["recovery_target_name"]) + } + }) + + t.Run("barman-restore-from-target", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?target=immediate") + store, _ := state.NewStore() + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["recovery_target"] != "immediate" { + t.Fatalf("expected recovery_target to be immediate, got %v", cfg["recovery_target_name"]) + } + }) + + t.Run("barman-restore-from-target-time-non-inclusive", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetTime=2024-06-30T11:15:00Z&targetInclusive=false") + store, _ := state.NewStore() + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["recovery_target_time"] != "'2024-06-30T11:15:00+00:00'" { + t.Fatalf("expected recovery_target_time to be 2024-06-30T11:15:00+00:00, got %v", cfg["recovery_target_time"]) + } + + if cfg["recovery_target_inclusive"] != "false" { + t.Fatalf("expected recovery_target_inclusive to be false, got %v", cfg["recovery_target_inclusive"]) + } + }) + + t.Run("barman-restore-from-target-time-custom-timeline", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory?targetTime=2024-06-30T11:15:00-06:00&targetTimeline=2") + store, _ := state.NewStore() + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["recovery_target_time"] != "'2024-06-30T11:15:00-06:00'" { + t.Fatalf("expected recovery_target_time to be 2024-06-30T11:15:00-06:00, got %v", 
cfg["recovery_target_time"]) + } + + if cfg["recovery_target_timeline"] != "2" { + t.Fatalf("expected recovery_target_timeline to be 2, got %v", cfg["recovery_target_timeline"]) + } }) } diff --git a/internal/flypg/restore.go b/internal/flypg/restore.go index 70dc5160..aa927062 100644 --- a/internal/flypg/restore.go +++ b/internal/flypg/restore.go @@ -17,7 +17,12 @@ const ( restoreLockFile = "/data/restore.lock" ) -func Restore(ctx context.Context, node *Node) error { +// prepareRemoteRestore will reset the environment to a state where it can be restored +// from a remote backup. This process includes: +// * Clearing any locks that may have been set on the original cluster. +// * Dropping the repmgr database to clear any metadata that belonged to the old cluster. +// * Ensuring the internal user credentials match the environment. +func prepareRemoteRestore(ctx context.Context, node *Node) error { // Clear any locks that may have been set on the original cluster if err := clearLocks(); err != nil { return fmt.Errorf("failed to clear locks: %s", err) @@ -56,15 +61,14 @@ func Restore(ctx context.Context, node *Node) error { } }() - conn, err := openConn(ctx, node) + conn, err := openConn(ctx, node.PrivateIP) if err != nil { return fmt.Errorf("failed to establish connection to local node: %s", err) } defer func() { _ = conn.Close(ctx) }() // Drop repmgr database to clear any metadata that belonged to the old cluster. 
- sql := "DROP DATABASE repmgr;" - _, err = conn.Exec(ctx, sql) + _, err = conn.Exec(ctx, "DROP DATABASE repmgr;") if err != nil { return fmt.Errorf("failed to drop repmgr database: %s", err) } @@ -155,8 +159,7 @@ func restoreHBAFile() error { defer func() { _ = file.Close() }() // revert back to our original config - _, err = file.Write(data) - if err != nil { + if _, err = file.Write(data); err != nil { return err } @@ -175,24 +178,24 @@ func setRestoreLock() error { } defer func() { _ = file.Close() }() - _, err = file.WriteString(os.Getenv("FLY_APP_NAME")) - if err != nil { + if _, err = file.WriteString(os.Getenv("FLY_APP_NAME")); err != nil { return err } return file.Sync() } -func openConn(ctx context.Context, n *Node) (*pgx.Conn, error) { - url := fmt.Sprintf("postgres://[%s]:5433?target_session_attrs=any", n.PrivateIP) +func openConn(ctx context.Context, privateIP string) (*pgx.Conn, error) { + url := fmt.Sprintf("postgres://[%s]:5433?target_session_attrs=any", privateIP) + conf, err := pgx.ParseConfig(url) if err != nil { return nil, err } conf.User = "postgres" - // Allow up to 30 seconds for PG to boot and accept connections. - timeout := time.After(30 * time.Second) + // Allow up to 60 seconds for PG to boot and accept connections. 
+ timeout := time.After(60 * time.Second) tick := time.NewTicker(1 * time.Second) defer tick.Stop() for { diff --git a/internal/flypg/s3_credentials.go b/internal/flypg/s3_credentials.go new file mode 100644 index 00000000..3b65da24 --- /dev/null +++ b/internal/flypg/s3_credentials.go @@ -0,0 +1,110 @@ +package flypg + +import ( + "context" + "fmt" + "net/url" + "os" + "path/filepath" +) + +const ( + s3AuthDir = "/data/.aws" + s3AuthFileName = "credentials" +) + +type s3Credentials struct { + profile string + accessKeyID string + secretAccessKey string +} + +func writeS3Credentials(ctx context.Context, s3AuthDir string) error { + var creds []*s3Credentials + + if os.Getenv("S3_ARCHIVE_CONFIG") != "" { + cred, err := parseCredentialsFromConfigURL(os.Getenv("S3_ARCHIVE_CONFIG"), DefaultAuthProfile) + if err != nil { + return fmt.Errorf("failed to parse credentials from barman configURL: %v", err) + } + creds = append(creds, cred) + } + + if os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG") != "" { + cred, err := parseCredentialsFromConfigURL(os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG"), RestoreAuthProfile) + if err != nil { + return fmt.Errorf("failed to parse credentials from barman restore configURL: %v", err) + } + creds = append(creds, cred) + } + + if len(creds) == 0 { + return nil + } + + s3AuthFilePath := filepath.Join(s3AuthDir, s3AuthFileName) + + // Ensure the directory exists + if err := os.MkdirAll(s3AuthDir, 0700); err != nil { + return fmt.Errorf("failed to create AWS credentials directory: %w", err) + } + + // Write the credentials to disk + if err := writeCredentialsToFile(creds, s3AuthFilePath); err != nil { + return fmt.Errorf("failed to write credentials to file: %w", err) + } + + // Set file permissions + if err := os.Chmod(s3AuthFilePath, 0644); err != nil { + return fmt.Errorf("failed to set file permissions: %w", err) + } + + // Ensure the directory has the correct ownership + if err := setDirOwnership(ctx, s3AuthDir); err != nil { + return 
fmt.Errorf("failed to set directory ownership: %w", err) + } + + return nil +} + +func writeCredentialsToFile(credentials []*s3Credentials, pathToCredentialFile string) error { + file, err := os.Create(pathToCredentialFile) + if err != nil { + return fmt.Errorf("failed to create file: %w", err) + } + defer func() { _ = file.Close() }() + + // Write the credentials to disk + for _, cred := range credentials { + _, err := file.WriteString(fmt.Sprintf("[%s]\naws_access_key_id=%s\naws_secret_access_key=%s\n\n", + cred.profile, cred.accessKeyID, cred.secretAccessKey)) + if err != nil { + return fmt.Errorf("failed to write s3 profile %s: %w", cred.profile, err) + } + } + + return file.Sync() +} + +func parseCredentialsFromConfigURL(configURL, assignedProfile string) (*s3Credentials, error) { + barmanURL, err := url.Parse(configURL) + if err != nil { + return nil, fmt.Errorf("invalid configURL: %w", err) + } + + accessKey := barmanURL.User.Username() + if accessKey == "" { + return nil, fmt.Errorf("AWS ACCESS KEY is missing") + } + + secretAccessKey, _ := barmanURL.User.Password() + if secretAccessKey == "" { + return nil, fmt.Errorf("AWS SECRET KEY is missing") + } + + return &s3Credentials{ + profile: assignedProfile, + accessKeyID: accessKey, + secretAccessKey: secretAccessKey, + }, nil +} diff --git a/internal/flypg/s3_credentials_test.go b/internal/flypg/s3_credentials_test.go new file mode 100644 index 00000000..68ea5f7e --- /dev/null +++ b/internal/flypg/s3_credentials_test.go @@ -0,0 +1,94 @@ +package flypg + +import ( + "context" + "fmt" + "os" + "testing" + + "github.com/fly-apps/postgres-flex/internal/utils" +) + +func TestWriteAWSCredentials(t *testing.T) { + if err := setup(t); err != nil { + t.Fatal(err) + } + defer cleanup() + + authDir := "./test_results/.aws" + pathToCredentials := fmt.Sprintf("%s/credentials", authDir) + + t.Run("write-barman-credentials", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_CONFIG", 
"https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory") + + if err := writeS3Credentials(context.TODO(), authDir); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !utils.FileExists(pathToCredentials) { + t.Fatalf("expected %s to exist, but doesn't", pathToCredentials) + } + + // Check contents + contents, err := os.ReadFile(pathToCredentials) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := "[barman]\naws_access_key_id=my-key\naws_secret_access_key=my-secret\n\n" + + if string(contents) != expected { + t.Fatalf("expected contents to be %s, but got %s", expected, string(contents)) + } + }) + + t.Run("write-restore-credentials", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://source-key:source-secret@fly.storage.tigris.dev/my-bucket/my-directory") + + if err := writeS3Credentials(context.TODO(), authDir); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !utils.FileExists(pathToCredentials) { + t.Fatalf("expected %s to exist, but doesn't", pathToCredentials) + } + + // Check contents + contents, err := os.ReadFile(pathToCredentials) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := "[restore]\naws_access_key_id=source-key\naws_secret_access_key=source-secret\n\n" + + if string(contents) != expected { + t.Fatalf("expected contents to be %s, but got %s", expected, string(contents)) + } + }) + + t.Run("write-barman-and-restore-credentials", func(t *testing.T) { + t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory") + t.Setenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG", "https://source-key:source-secret@fly.storage.tigris.dev/source-bucket/source-directory") + + if err := writeS3Credentials(context.TODO(), authDir); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if !utils.FileExists(pathToCredentials) { + t.Fatalf("expected %s to exist, but doesn't", pathToCredentials) + } + + // 
Check contents + contents, err := os.ReadFile(pathToCredentials) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + expected := "[barman]\naws_access_key_id=my-key\naws_secret_access_key=my-secret\n\n[restore]\naws_access_key_id=source-key\naws_secret_access_key=source-secret\n\n" + + if string(contents) != expected { + t.Fatalf("expected contents to be %s, but got %s", expected, string(contents)) + } + }) + +} diff --git a/internal/utils/shell.go b/internal/utils/shell.go index ca79c1e9..0241f58e 100644 --- a/internal/utils/shell.go +++ b/internal/utils/shell.go @@ -2,6 +2,7 @@ package utils import ( "bytes" + "context" "fmt" "io" "log" @@ -12,18 +13,51 @@ import ( "syscall" ) -func RunCommand(cmdStr, usr string) ([]byte, error) { +func RunCmd(ctx context.Context, usr string, name string, args ...string) ([]byte, error) { uid, gid, err := SystemUserIDs(usr) if err != nil { return nil, err } - log.Printf("> Running command as %s: %s\n", usr, cmdStr) + cmd := exec.CommandContext(ctx, name, args...) 
+ cmd.SysProcAttr = &syscall.SysProcAttr{} + cmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)} + + if os.Getenv("DEBUG") != "" { + log.Printf("> Running command as %s: %s\n", usr, cmd.String()) + + var stdoutBuf, stderrBuf bytes.Buffer + cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf) + cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf) + + err = cmd.Run() + if err != nil { + if ee, ok := err.(*exec.ExitError); ok { + ee.Stderr = stderrBuf.Bytes() + } + } + + return stdoutBuf.Bytes(), err + } + + return cmd.Output() +} + +// Deprecated, use RunCmd instead +func RunCommand(cmdStr, usr string) ([]byte, error) { + uid, gid, err := SystemUserIDs(usr) + if err != nil { + return nil, err + } cmd := exec.Command("sh", "-c", cmdStr) cmd.SysProcAttr = &syscall.SysProcAttr{} cmd.SysProcAttr.Credential = &syscall.Credential{Uid: uint32(uid), Gid: uint32(gid)} + if os.Getenv("DEBUG") != "" { + log.Printf("> Running command as %s: %s\n", usr, cmdStr) + } + var stdoutBuf, stderrBuf bytes.Buffer cmd.Stdout = io.MultiWriter(os.Stdout, &stdoutBuf) cmd.Stderr = io.MultiWriter(os.Stderr, &stderrBuf)