WAL-Archiving + PITR #231

Merged
merged 51 commits on Jul 5, 2024
Changes from 39 commits
Commits (51)
261167d
Allow cloud archiving to run next to the primary
davissp14 Jun 25, 2024
c7e764a
fix test
davissp14 Jun 25, 2024
479dd0f
Retention work
davissp14 Jun 26, 2024
8d497ea
Cleanup
davissp14 Jun 26, 2024
2b69519
fix log
davissp14 Jun 26, 2024
2cc0b45
Fix deletion call
davissp14 Jun 26, 2024
ba33e59
Move the log under the debug flag
davissp14 Jun 26, 2024
39bc76a
Adding tests
davissp14 Jun 26, 2024
78c3ac0
Checkin
davissp14 Jun 27, 2024
47a6f21
Checkin
davissp14 Jun 27, 2024
d91ea1a
Add todo
davissp14 Jun 27, 2024
96190c4
Update AWS_ENDPOINT_URL -> AWS_ENDPOINT_URL_S3
davissp14 Jun 27, 2024
1033d55
Refactor
davissp14 Jun 27, 2024
645e07d
Checkin
davissp14 Jun 27, 2024
49784fe
Working PITR, but there's quite a bit of cleanup to do
davissp14 Jun 29, 2024
f5e1353
cleanup
davissp14 Jun 29, 2024
c1b27e5
cleanup
davissp14 Jun 30, 2024
a3e3ced
Didn't mean to remove this
davissp14 Jun 30, 2024
f431edb
Set archive_timeout
davissp14 Jun 30, 2024
383da38
More cleanup
davissp14 Jul 1, 2024
b17193f
fixes and cleanup
davissp14 Jul 1, 2024
4d6c412
Fixing up recovery time logic
davissp14 Jul 1, 2024
900b8e6
Adding todos and removing unused code
davissp14 Jul 1, 2024
8fa510c
Unexport functions
davissp14 Jul 1, 2024
4ddcecc
Separate credentials into separate profiles
davissp14 Jul 1, 2024
61eedd0
Fix errcheck
davissp14 Jul 1, 2024
ea0c59e
Add todo
davissp14 Jul 1, 2024
fd001e5
static check fixes
davissp14 Jul 1, 2024
dc6c13c
Make archive settings configurable
davissp14 Jul 2, 2024
eba8aa7
Removing unnecessary change
davissp14 Jul 2, 2024
644c0d1
Cleanup
davissp14 Jul 2, 2024
d19644c
Adding tests
davissp14 Jul 2, 2024
2ac4327
Bug fix
davissp14 Jul 2, 2024
6284d76
Wait for PG to be up before monitoring. Also removing some logging t…
davissp14 Jul 2, 2024
6a76b7b
Fix compile error
benwaffle Jul 2, 2024
c37df40
Adding log indicating the monitor is starting
davissp14 Jul 2, 2024
b7802e3
Rename the env vars to something more appropriate
davissp14 Jul 2, 2024
06c4695
Cleanup
davissp14 Jul 2, 2024
2029c2c
Fix that ensures the user configured full_backup_frequency is honored
davissp14 Jul 2, 2024
589866b
Cleanup
davissp14 Jul 3, 2024
5eaa2c2
Fixes issue where a failed backup was being selected as a base restor…
davissp14 Jul 3, 2024
5734e7c
Remove support for 'latest' string, since this should be controlled w…
davissp14 Jul 3, 2024
010ffb0
Don't allow full_backup_frequencies lower than 1h
davissp14 Jul 3, 2024
884a782
Cleaning up the config validations
davissp14 Jul 3, 2024
53c1112
Adding support for targetTimeline
davissp14 Jul 3, 2024
b8fdd5c
Do not perform a remote restore if there's an existing postgresql dir…
davissp14 Jul 3, 2024
096e3ad
Revert back to RFC3339
davissp14 Jul 3, 2024
fdbbc12
Postgres doesn't support 'Z', so trim it and convert to -00:00
davissp14 Jul 3, 2024
6e3aa73
Moving back to RFC3339, however, 'Z' needs to be stripped and replace…
davissp14 Jul 3, 2024
0f58fd5
Allow target to be left out
davissp14 Jul 5, 2024
5a6eae6
Cleanup
davissp14 Jul 5, 2024
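
A note on the timestamp handling in commits 096e3ad, fdbbc12, and 6e3aa73: Postgres's recovery_target_time does not accept RFC3339's 'Z' suffix, so a UTC timestamp has to be rewritten with an explicit numeric offset before it lands in the recovery configuration. The sketch below shows one way that conversion might look in Go; it is illustrative only, and toRecoveryTargetTime is not the helper this PR actually adds.

package main

import (
	"fmt"
	"time"
)

// toRecoveryTargetTime reformats an RFC3339 timestamp so that a UTC 'Z'
// suffix is rendered as an explicit numeric offset (e.g. "+00:00"), which
// Postgres's recovery_target_time accepts.
func toRecoveryTargetTime(rfc3339 string) (string, error) {
	t, err := time.Parse(time.RFC3339, rfc3339)
	if err != nil {
		return "", fmt.Errorf("invalid RFC3339 timestamp %q: %w", rfc3339, err)
	}
	// The "-07:00" layout element always emits a numeric offset, never 'Z'.
	return t.Format("2006-01-02T15:04:05-07:00"), nil
}

func main() {
	out, _ := toRecoveryTargetTime("2024-07-05T10:00:00Z")
	fmt.Println(out) // 2024-07-05T10:00:00+00:00
}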
63 changes: 60 additions & 3 deletions cmd/monitor/main.go
@@ -4,9 +4,11 @@ import (
"context"
"fmt"
"log"
"os"
"time"

"github.com/fly-apps/postgres-flex/internal/flypg"
"github.com/fly-apps/postgres-flex/internal/flypg/state"
)

var (
@@ -16,8 +18,13 @@ var (

defaultDeadMemberRemovalThreshold = time.Hour * 12
defaultInactiveSlotRemovalThreshold = time.Hour * 12

defaultBackupRetentionEvalFrequency = time.Hour * 12
defaultFullBackupSchedule = time.Hour * 24
)

// TODO - Harden this so one failure doesn't take down the whole monitor

func main() {
ctx := context.Background()

@@ -28,19 +35,69 @@ func main() {
panic(fmt.Sprintf("failed to reference node: %s\n", err))
}

// Wait for postgres to boot and become accessible.
log.Println("Waiting for Postgres to be ready...")
waitOnPostgres(ctx, node)
log.Println("Postgres is ready to accept connections. Starting monitor...")

// Dead member monitor
log.Println("Monitoring dead members")
go func() {
if err := monitorDeadMembers(ctx, node); err != nil {
panic(err)
}
}()

if os.Getenv("S3_ARCHIVE_CONFIG") != "" {
store, err := state.NewStore()
if err != nil {
panic(fmt.Errorf("failed initialize cluster state store: %s", err))
}

barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
if err != nil {
panic(err)
}

if err := barman.LoadConfig(flypg.DefaultBarmanConfigDir); err != nil {
panic(err)
}

// Backup scheduler
go monitorBackupSchedule(ctx, barman)

// Backup retention monitor
go monitorBackupRetention(ctx, barman)
}

// Readonly monitor
log.Println("Monitoring cluster state")
go monitorClusterState(ctx, node)

// Replication slot monitor
log.Println("Monitoring replication slots")
monitorReplicationSlots(ctx, node)
}

func waitOnPostgres(ctx context.Context, node *flypg.Node) {
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
conn, err := node.NewLocalConnection(ctx, "postgres", node.SUCredentials)
if err != nil {
log.Printf("failed to open local connection: %s", err)
continue
}

if err := conn.Ping(ctx); err != nil {
log.Printf("failed to ping local connection: %s", err)
_ = conn.Close(ctx)
continue
}

// Close the probe connection before handing control back to the caller.
_ = conn.Close(ctx)
return
}
}
}
31 changes: 31 additions & 0 deletions cmd/monitor/monitor_backup_retention.go
@@ -0,0 +1,31 @@
package main

import (
"context"
"log"
"time"

"github.com/fly-apps/postgres-flex/internal/flypg"
)

func monitorBackupRetention(ctx context.Context, barman *flypg.Barman) {
ticker := time.NewTicker(defaultBackupRetentionEvalFrequency)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Println("Shutting down backup retention monitor")
return
case <-ticker.C:
result, err := barman.WALArchiveDelete(ctx)
if err != nil {
log.Printf("Backup retention failed with: %s", err)
}

if len(result) > 0 {
log.Printf("Backup retention response: %s", result)
}
}
}
}
123 changes: 123 additions & 0 deletions cmd/monitor/monitor_backup_schedule.go
@@ -0,0 +1,123 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/fly-apps/postgres-flex/internal/flypg"
)

func monitorBackupSchedule(ctx context.Context, barman *flypg.Barman) {
// Determine when the last backup was taken.
lastBackupTime, err := barman.LastBackupTaken(ctx)
if err != nil {
log.Printf("Failed to resolve the last backup taken: %s", err)
}

fullBackupSchedule := defaultFullBackupSchedule

// Set the full backup schedule if it is defined in the configuration.
if barman.Settings.FullBackupFrequency != "" {
fullBackupDur, err := time.ParseDuration(barman.Settings.FullBackupFrequency)
switch {
case err != nil:
log.Printf("Failed to parse full backup frequency: %s", err)
default:
fullBackupSchedule = fullBackupDur
}
}

// Ensure we have at least one backup before proceeding.
if lastBackupTime.IsZero() {
log.Println("No backups found! Performing the initial base backup.")

if err := performInitialBaseBackup(ctx, barman); err != nil {
log.Printf("Failed to perform the initial full backup: %s", err)
log.Printf("Backup scheduler will re-attempt in %s.", fullBackupSchedule)
}

lastBackupTime = time.Now()
}

log.Printf("Full backup schedule set to: %s", fullBackupSchedule)

// Calculate the time until the next backup is due.
timeUntilNextBackup := time.Until(lastBackupTime.Add(fullBackupSchedule))

// Perform backup immediately if the time until the next backup is negative.
if timeUntilNextBackup < 0 {
log.Println("Performing full backup now")
_, err := barman.Backup(ctx, false)
if err != nil {
log.Printf("Full backup failed with: %s", err)
}

timeUntilNextBackup = fullBackupSchedule
}

log.Printf("Next full backup due in: %s", timeUntilNextBackup)

ticker := time.NewTicker(timeUntilNextBackup)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
log.Println("Shutting down backup scheduler")
return
case <-ticker.C:
// Perform a backup while passively waiting for the checkpoint process to complete.
// This could actually take a while, so we should be prepared to wait.
log.Println("Performing full backup")
_, err := barman.Backup(ctx, false)
if err != nil {
// TODO - Implement a back-off strategy.
timeUntilNextBackup = time.Hour * 1
ticker.Reset(timeUntilNextBackup)

log.Printf("Backup retention failed with: %s.", err)
log.Printf("Backup will be re-attempted in %s.", timeUntilNextBackup)

continue
}

log.Printf("Full backup completed successfully")
ticker.Reset(fullBackupSchedule)
}
}
}

func performInitialBaseBackup(ctx context.Context, barman *flypg.Barman) error {
maxRetries := 10
retryCount := 0
for {
select {
case <-ctx.Done():
return nil
default:
_, err := barman.Backup(ctx, true)
if err != nil {
log.Printf("Failed to perform the initial full backup: %s. Retrying in 30 seconds.", err)

// If we've exceeded the maximum number of retries, we should return an error.
if retryCount >= maxRetries {
return fmt.Errorf("failed to perform the initial full backup after %d retries", maxRetries)
}

retryCount++

select {
case <-ctx.Done():
return ctx.Err()
case <-time.After(time.Second * 30):
continue
}
}

log.Println("Initial full backup completed successfully")
return nil
}
}
}
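
The scheduler above leans on a single time.Ticker: it starts with the time remaining until the next backup is due, then is Reset to the steady full-backup cadence after each run (or to a one-hour retry interval after a failure). A minimal, self-contained sketch of that pattern, with shortened durations for illustration:

package main

import (
	"fmt"
	"time"
)

func main() {
	initialDelay := 2 * time.Second  // stand-in for timeUntilNextBackup
	steadyCadence := 5 * time.Second // stand-in for fullBackupSchedule

	ticker := time.NewTicker(initialDelay)
	defer ticker.Stop()

	for i := 0; i < 3; i++ {
		<-ticker.C
		fmt.Println("backup tick:", time.Now().Format(time.RFC3339))
		// After the work completes, fall back to the regular cadence.
		ticker.Reset(steadyCadence)
	}
}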
2 changes: 0 additions & 2 deletions cmd/monitor/monitor_dead_members.go
@@ -37,8 +37,6 @@ func monitorDeadMembers(ctx context.Context, node *flypg.Node) error {
ticker := time.NewTicker(deadMemberMonitorFrequency)
defer ticker.Stop()

log.Printf("Pruning every %s...\n", removalThreshold)

for range ticker.C {
err := deadMemberMonitorTick(ctx, node, seenAt, removalThreshold)
if err != nil {
9 changes: 9 additions & 0 deletions cmd/start/main.go
@@ -23,6 +23,7 @@ func main() {
}
}

// Deprecated - We are moving away from having a separate barman Machine
if os.Getenv("IS_BARMAN") != "" {
node, err := flybarman.NewNode()
if err != nil {
@@ -54,6 +55,14 @@
return
}

// TODO - Find a better way to handle this
if os.Getenv("S3_ARCHIVE_CONFIG") != "" || os.Getenv("S3_ARCHIVE_REMOTE_RESTORE_CONFIG") != "" {
if err := os.Setenv("AWS_SHARED_CREDENTIALS_FILE", "/data/.aws/credentials"); err != nil {
panicHandler(err)
return
}
}

node, err := flypg.NewNode()
if err != nil {
panicHandler(err)
98 changes: 98 additions & 0 deletions internal/api/handle_admin.go
@@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"net/http"
"os"
"strings"

"github.com/fly-apps/postgres-flex/internal/flypg"
@@ -298,3 +299,100 @@ func handleViewRepmgrSettings(w http.ResponseWriter, r *http.Request) {
resp := &Response{Result: out}
renderJSON(w, resp, http.StatusOK)
}

func handleViewBarmanSettings(w http.ResponseWriter, _ *http.Request) {
if os.Getenv("S3_ARCHIVE_CONFIG") == "" {
renderErr(w, fmt.Errorf("barman is not enabled"))
return
}

store, err := state.NewStore()
if err != nil {
renderErr(w, err)
return
}

barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
if err != nil {
renderErr(w, err)
return
}

if err := barman.LoadConfig(flypg.DefaultBarmanConfigDir); err != nil {
renderErr(w, err)
return
}

all, err := barman.CurrentConfig()
if err != nil {
renderErr(w, err)
return
}

resp := &Response{Result: all}
renderJSON(w, resp, http.StatusOK)
}

func handleUpdateBarmanSettings(w http.ResponseWriter, r *http.Request) {
if os.Getenv("S3_ARCHIVE_CONFIG") == "" {
renderErr(w, fmt.Errorf("barman is not enabled"))
return
}

store, err := state.NewStore()
if err != nil {
renderErr(w, err)
return
}

barman, err := flypg.NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), flypg.DefaultAuthProfile)
if err != nil {
renderErr(w, err)
return
}

if err := barman.LoadConfig(flypg.DefaultBarmanConfigDir); err != nil {
renderErr(w, err)
return
}

cfg, err := flypg.ReadFromFile(barman.UserConfigFile())
if err != nil {
renderErr(w, err)
return
}

var requestedChanges map[string]interface{}
if err := json.NewDecoder(r.Body).Decode(&requestedChanges); err != nil {
renderErr(w, err)
return
}

if err := barman.Validate(requestedChanges); err != nil {
renderErr(w, err)
return
}

for k, v := range requestedChanges {
cfg[k] = v
}

barman.SetUserConfig(cfg)

if err := flypg.PushUserConfig(barman, store); err != nil {
renderErr(w, err)
return
}

if err := flypg.SyncUserConfig(barman, store); err != nil {
renderErr(w, err)
return
}

res := &Response{Result: SettingsUpdate{
Message: "Updated",
RestartRequired: true,
}}

renderJSON(w, res, http.StatusOK)
}
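
For reference, handleUpdateBarmanSettings expects a JSON object mapping setting names to values, which it validates and merges into the barman user config. The snippet below shows roughly what a client call might look like; the endpoint path, port, and setting values are assumptions for illustration (the setting names come from the commit messages above), so check the router and barman.Validate for what is actually accepted.

package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// Hypothetical payload: keys mirror settings mentioned in this PR
	// (full_backup_frequency, archive_timeout); accepted values may differ.
	body, _ := json.Marshal(map[string]interface{}{
		"full_backup_frequency": "24h",
		"archive_timeout":       "120s",
	})

	// The path and port below are assumptions, not confirmed by this diff.
	resp, err := http.Post(
		"http://localhost:5500/commands/admin/settings/update/barman",
		"application/json",
		bytes.NewReader(body),
	)
	if err != nil {
		fmt.Println("request failed:", err)
		return
	}
	defer resp.Body.Close()

	fmt.Println("status:", resp.Status)
}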