WAL-Archiving + PITR (#231)
* Allow cloud archiving to run next to the primary

* fix test

* Retention work

* Cleanup

* fix log

* Fix deletion call

* Move the log under the debug flag

* Adding tests

* Checkin

* Checkin

* Add todo

* Update AWS_ENDPOINT_URL -> AWS_ENDPOINT_URL_S3

* Refactor

* Checkin

* Working PITR, but there's quite a bit of cleanup to do

* cleanup

* cleanup

* Didn't mean to remove this

* Set archive_timeout

* More cleanup

* fixes and cleanup

* Fixing up recovery time logic

* Adding TODOs and removing unused code

* Unexport functions

* Separate credentials into separate profiles

* Fix errcheck

* Add todo

* static check fixes

* Make archive settings configurable

* Removing unnecessary change

* Cleanup

* Adding tests

* Bug fix

* Wait for PG to be up before monitoring. Also removing some unnecessary logging

* Fix compile error

* Adding log indicating the monitor is starting

* Rename the env vars to something more appropriate

* Cleanup

* Fix that ensures the user-configured full_backup_frequency is honored

* Cleanup

* Fixes issue where a failed backup was being selected as a base restore target

* Remove support for 'latest' string, since this should be controlled with the restore target

* Don't allow full_backup_frequencies lower than 1h

* Cleaning up the config validations

* Adding support for targetTimeline

* Do not perform a remote restore if there's an existing postgresql directory

* Revert back to RFC3339

* Postgres doesn't support 'Z', so trim it and convert to -00:00

* Moving back to RFC3339; however, 'Z' needs to be stripped and replaced with +00:00 (see the sketch after this list)

* Allow target to be left out

* Cleanup
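
As context for the two timestamp commits above: Postgres rejects the trailing 'Z' zone designator in a recovery target time, so an RFC3339 value such as 2024-07-05T12:00:00Z has to be rewritten with a numeric UTC offset before it reaches recovery_target_time. A minimal sketch of that normalization in Go (the helper name is illustrative, not taken from this commit):

package main

import (
    "fmt"
    "time"
)

// normalizeRestoreTarget rewrites an RFC3339 timestamp so Postgres accepts it:
// a trailing 'Z' becomes the equivalent numeric offset +00:00.
func normalizeRestoreTarget(ts string) (string, error) {
    t, err := time.Parse(time.RFC3339, ts)
    if err != nil {
        return "", fmt.Errorf("invalid RFC3339 timestamp %q: %w", ts, err)
    }
    // Formatting with a numeric zone layout renders UTC as +00:00 rather than Z.
    return t.Format("2006-01-02T15:04:05-07:00"), nil
}

For example, normalizeRestoreTarget("2024-07-05T12:00:00Z") returns "2024-07-05T12:00:00+00:00".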

---------

Co-authored-by: Ben Iofel <[email protected]>
davissp14 and benwaffle authored Jul 5, 2024
1 parent 84d5715 commit 135085d
Showing 22 changed files with 2,288 additions and 49 deletions.
63 changes: 60 additions & 3 deletions cmd/monitor/main.go
@@ -4,9 +4,11 @@
import (
    "context"
    "fmt"
    "log"
    "os"
    "time"

    "github.com/fly-apps/postgres-flex/internal/flypg"
    "github.com/fly-apps/postgres-flex/internal/flypg/state"
)

@@ -16,8 +18,13 @@ var (
    defaultDeadMemberRemovalThreshold   = time.Hour * 12
    defaultInactiveSlotRemovalThreshold = time.Hour * 12

    defaultBackupRetentionEvalFrequency = time.Hour * 12
    defaultFullBackupSchedule           = time.Hour * 24
)

// TODO - Harden this so one failure doesn't take down the whole monitor

func main() {
    ctx := context.Background()

@@ -28,19 +35,69 @@ func main() {
        panic(fmt.Sprintf("failed to reference node: %s\n", err))
    }

    // Wait for postgres to boot and become accessible.
    log.Println("Waiting for Postgres to be ready...")
    waitOnPostgres(ctx, node)
    log.Println("Postgres is ready to accept connections. Starting monitor...")

    // Dead member monitor
    log.Println("Monitoring dead members")
    go func() {
        if err := monitorDeadMembers(ctx, node); err != nil {
            panic(err)
        }
    }()

    if os.Getenv("S3_ARCHIVE_CONFIG") != "" {
        store, err := state.NewStore()
        if err != nil {
            panic(fmt.Errorf("failed to initialize cluster state store: %s", err))
        }

        barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
        if err != nil {
            panic(err)
        }

        if err := barman.LoadConfig(flypg.DefaultBarmanConfigDir); err != nil {
            panic(err)
        }

        // Backup scheduler
        go monitorBackupSchedule(ctx, barman)

        // Backup retention monitor
        go monitorBackupRetention(ctx, barman)
    }

    // Readonly monitor
    log.Println("Monitoring cluster state")
    go monitorClusterState(ctx, node)

    // Replication slot monitor
    log.Println("Monitoring replication slots")
    monitorReplicationSlots(ctx, node)
}

func waitOnPostgres(ctx context.Context, node *flypg.Node) {
    ticker := time.NewTicker(5 * time.Second)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            return
        case <-ticker.C:
            conn, err := node.NewLocalConnection(ctx, "postgres", node.SUCredentials)
            if err != nil {
                log.Printf("failed to open local connection: %s", err)
                continue
            }
            defer func() { _ = conn.Close(ctx) }()

            if err := conn.Ping(ctx); err != nil {
                log.Printf("failed to ping local connection: %s", err)
                continue
            }

            return
        }
    }
}
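
Archiving is opt-in: the backup scheduler and retention monitor above only start when S3_ARCHIVE_CONFIG is present in the environment. The exact value format is parsed by flypg.NewBarman, which is outside this diff, so the line below is a placeholder shape rather than a documented format:

S3_ARCHIVE_CONFIG=<object-storage-URL-and-credentials>
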
31 changes: 31 additions & 0 deletions cmd/monitor/monitor_backup_retention.go
@@ -0,0 +1,31 @@
package main

import (
    "context"
    "log"
    "time"

    "github.com/fly-apps/postgres-flex/internal/flypg"
)

func monitorBackupRetention(ctx context.Context, barman *flypg.Barman) {
    ticker := time.NewTicker(defaultBackupRetentionEvalFrequency)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            log.Println("Shutting down backup retention monitor")
            return
        case <-ticker.C:
            result, err := barman.WALArchiveDelete(ctx)
            if err != nil {
                log.Printf("Backup retention failed with: %s", err)
            }

            if len(result) > 0 {
                log.Printf("Backup retention response: %s", result)
            }
        }
    }
}
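
barman.WALArchiveDelete is defined outside this excerpt. In barman-cloud terms, retention-driven cleanup is normally an invocation of barman-cloud-backup-delete with a retention policy, so a wrapper in that spirit might look like the sketch below; the function name, arguments, and policy string are illustrative, not the repository's actual implementation:

package main

import (
    "context"
    "os/exec"
)

// deleteByRetention prunes backups (and the WAL segments that depend on them)
// that fall outside the given retention policy.
func deleteByRetention(ctx context.Context, destinationURL, serverName, policy string) ([]byte, error) {
    cmd := exec.CommandContext(ctx, "barman-cloud-backup-delete",
        "--retention-policy", policy, // e.g. "RECOVERY WINDOW OF 7 DAYS"
        destinationURL, serverName,
    )
    // Combined output mirrors how the monitor above logs the 'result' value.
    return cmd.CombinedOutput()
}
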
123 changes: 123 additions & 0 deletions cmd/monitor/monitor_backup_schedule.go
@@ -0,0 +1,123 @@
package main

import (
    "context"
    "fmt"
    "log"
    "time"

    "github.com/fly-apps/postgres-flex/internal/flypg"
)

func monitorBackupSchedule(ctx context.Context, barman *flypg.Barman) {
    // Determine when the last backup was taken.
    lastBackupTime, err := barman.LastCompletedBackup(ctx)
    if err != nil {
        log.Printf("Failed to resolve the last backup taken: %s", err)
    }

    fullBackupSchedule := defaultFullBackupSchedule

    // Set the full backup schedule if it is defined in the configuration.
    if barman.Settings.FullBackupFrequency != "" {
        fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency)
        switch {
        case err != nil:
            log.Printf("Failed to parse full backup frequency: %s", err)
        default:
            fullBackupSchedule = fullBackupDur
        }
    }

    // Ensure we have at least one backup before proceeding.
    if lastBackupTime.IsZero() {
        log.Println("No backups found! Performing the initial base backup.")

        if err := performInitialBaseBackup(ctx, barman); err != nil {
            log.Printf("Failed to perform the initial full backup: %s", err)
            log.Printf("Backup scheduler will re-attempt in %s.", fullBackupSchedule)
        }

        lastBackupTime = time.Now()
    }

    log.Printf("Full backup schedule set to: %s", fullBackupSchedule)

    // Calculate the time until the next backup is due.
    timeUntilNextBackup := time.Until(lastBackupTime.Add(fullBackupSchedule))

    // Perform backup immediately if the time until the next backup is negative.
    if timeUntilNextBackup < 0 {
        log.Println("Performing full backup now")
        _, err := barman.Backup(ctx, false)
        if err != nil {
            log.Printf("Full backup failed with: %s", err)
        }

        timeUntilNextBackup = fullBackupSchedule
    }

    log.Printf("Next full backup due in: %s", timeUntilNextBackup)

    ticker := time.NewTicker(timeUntilNextBackup)
    defer ticker.Stop()

    for {
        select {
        case <-ctx.Done():
            log.Println("Shutting down backup scheduler")
            return
        case <-ticker.C:
            // Perform a backup while passively waiting for the checkpoint process to complete.
            // This could actually take a while, so we should be prepared to wait.
            log.Println("Performing full backup")
            _, err := barman.Backup(ctx, false)
            if err != nil {
                // TODO - Implement a back-off strategy.
                timeUntilNextBackup = time.Hour * 1
                ticker.Reset(timeUntilNextBackup)

                log.Printf("Full backup failed with: %s.", err)
                log.Printf("Backup will be re-attempted in %s.", timeUntilNextBackup)

                continue
            }

            log.Printf("Full backup completed successfully")
            ticker.Reset(fullBackupSchedule)
        }
    }
}

func performInitialBaseBackup(ctx context.Context, barman *flypg.Barman) error {
    maxRetries := 10
    retryCount := 0
    for {
        select {
        case <-ctx.Done():
            return nil
        default:
            _, err := barman.Backup(ctx, true)
            if err != nil {
                log.Printf("Failed to perform the initial full backup: %s. Retrying in 30 seconds.", err)

                // If we've exceeded the maximum number of retries, we should return an error.
                if retryCount >= maxRetries {
                    return fmt.Errorf("failed to perform the initial full backup after %d retries", maxRetries)
                }

                retryCount++

                select {
                case <-ctx.Done():
                    return ctx.Err()
                case <-time.After(time.Second * 30):
                    continue
                }
            }

            log.Println("Initial full backup completed successfully")
            return nil
        }
    }
}
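
The scheduler accepts whatever time.ParseDuration can parse for FullBackupFrequency, and the commit list notes that frequencies below 1h are rejected during config validation. A minimal sketch of that check, assuming the same duration semantics (the helper is illustrative, not the repo's validator):

package main

import (
    "fmt"
    "time"
)

// validateFullBackupFrequency parses a user-supplied frequency such as "24h"
// or "90m" and enforces the 1h floor described in the commit messages.
func validateFullBackupFrequency(raw string) (time.Duration, error) {
    d, err := time.ParseDuration(raw)
    if err != nil {
        return 0, fmt.Errorf("invalid full_backup_frequency %q: %w", raw, err)
    }
    if d < time.Hour {
        return 0, fmt.Errorf("full_backup_frequency %s is below the 1h minimum", d)
    }
    return d, nil
}
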
2 changes: 0 additions & 2 deletions cmd/monitor/monitor_dead_members.go
@@ -37,8 +37,6 @@ func monitorDeadMembers(ctx context.Context, node *flypg.Node) error {
    ticker := time.NewTicker(deadMemberMonitorFrequency)
    defer ticker.Stop()

-   log.Printf("Pruning every %s...\n", removalThreshold)
-
    for range ticker.C {
        err := deadMemberMonitorTick(ctx, node, seenAt, removalThreshold)
        if err != nil {
9 changes: 9 additions & 0 deletions cmd/start/main.go
@@ -23,6 +23,7 @@ func main() {
        }
    }

    // Deprecated - We are moving away from having a separate barman Machine
    if os.Getenv("IS_BARMAN") != "" {
        node, err := flybarman.NewNode()
        if err != nil {
@@ -54,6 +55,14 @@ func main() {
        return
    }

    // TODO - Find a better way to handle this
    if os.Getenv("S3_ARCHIVE_CONFIG") != "" || os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG") != "" {
        if err := os.Setenv("AWS_SHARED_CREDENTIALS_FILE", "/data/.aws/credentials"); err != nil {
            panicHandler(err)
            return
        }
    }

    node, err := flypg.NewNode()
    if err != nil {
        panicHandler(err)
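
Setting AWS_SHARED_CREDENTIALS_FILE to /data/.aws/credentials makes the AWS SDK read a standard INI-style credentials file from the data volume, and the 'Separate credentials into separate profiles' commit together with the flypg.DefaultAuthProfile reference in the monitor suggests one profile per role. The layout and profile names below are illustrative only:

[<archive-profile>]
aws_access_key_id     = <archive access key>
aws_secret_access_key = <archive secret key>

[<restore-profile>]
aws_access_key_id     = <restore access key>
aws_secret_access_key = <restore secret key>
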
98 changes: 98 additions & 0 deletions internal/api/handle_admin.go
@@ -4,6 +4,7 @@ import (
    "encoding/json"
    "fmt"
    "net/http"
    "os"
    "strings"

    "github.com/fly-apps/postgres-flex/internal/flypg"
@@ -298,3 +299,100 @@ func handleViewRepmgrSettings(w http.ResponseWriter, r *http.Request) {
    resp := &Response{Result: out}
    renderJSON(w, resp, http.StatusOK)
}

func handleViewBarmanSettings(w http.ResponseWriter, _ *http.Request) {
    if os.Getenv("S3_ARCHIVE_CONFIG") == "" {
        renderErr(w, fmt.Errorf("barman is not enabled"))
        return
    }

    store, err := state.NewStore()
    if err != nil {
        renderErr(w, err)
        return
    }

    barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
    if err != nil {
        renderErr(w, err)
        return
    }

    if err := barman.LoadConfig(flypg.DefaultBarmanConfigDir); err != nil {
        renderErr(w, err)
        return
    }

    all, err := barman.CurrentConfig()
    if err != nil {
        renderErr(w, err)
        return
    }

    resp := &Response{Result: all}
    renderJSON(w, resp, http.StatusOK)
}

func handleUpdateBarmanSettings(w http.ResponseWriter, r *http.Request) {
    if os.Getenv("S3_ARCHIVE_CONFIG") == "" {
        renderErr(w, fmt.Errorf("barman is not enabled"))
        return
    }

    store, err := state.NewStore()
    if err != nil {
        renderErr(w, err)
        return
    }

    barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
    if err != nil {
        renderErr(w, err)
        return
    }

    if err := barman.LoadConfig(flypg.DefaultBarmanConfigDir); err != nil {
        renderErr(w, err)
        return
    }

    cfg, err := flypg.ReadFromFile(barman.UserConfigFile())
    if err != nil {
        renderErr(w, err)
        return
    }

    var requestedChanges map[string]interface{}
    if err := json.NewDecoder(r.Body).Decode(&requestedChanges); err != nil {
        renderErr(w, err)
        return
    }

    if err := barman.Validate(requestedChanges); err != nil {
        renderErr(w, err)
        return
    }

    for k, v := range requestedChanges {
        cfg[k] = v
    }

    barman.SetUserConfig(cfg)

    if err := flypg.PushUserConfig(barman, store); err != nil {
        renderErr(w, err)
        return
    }

    if err := flypg.SyncUserConfig(barman, store); err != nil {
        renderErr(w, err)
        return
    }

    res := &Response{Result: SettingsUpdate{
        Message:         "Updated",
        RestartRequired: true,
    }}

    renderJSON(w, res, http.StatusOK)
}
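
handleUpdateBarmanSettings decodes the request body as a flat JSON object of setting overrides, validates them, merges them into the persisted user config, and signals that a restart is required. The route registration is outside this excerpt, so only the body shape below is grounded in the diff; full_backup_frequency is the one setting named in the commit messages, and the value here is illustrative:

{
  "full_backup_frequency": "12h"
}

On success the handler wraps SettingsUpdate{Message: "Updated", RestartRequired: true} in the response's Result field, as shown above.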