From d7afc0a4b0ec08bfa4983480d41c6ce2889faf74 Mon Sep 17 00:00:00 2001 From: Shaun Davis Date: Tue, 25 Jun 2024 11:38:04 -0500 Subject: [PATCH] Adding cloud archiving support that runs along-side the primary --- bin/barman-cron | 18 ++++++++++++++++++ internal/flybarman/node.go | 4 ++-- internal/flypg/node.go | 1 + internal/flypg/pg.go | 36 ++++++++++++++++++++++++++++++++++++ internal/flypg/pg_test.go | 31 +++++++++++++++++++++++++++++++ 5 files changed, 88 insertions(+), 2 deletions(-) create mode 100644 bin/barman-cron diff --git a/bin/barman-cron b/bin/barman-cron new file mode 100644 index 00000000..1a54438d --- /dev/null +++ b/bin/barman-cron @@ -0,0 +1,18 @@ +#!/bin/bash + +# Identify the process id of the cron supervisor process +cron_pid=$(pgrep -f '/usr/sbin/cron') +if [ -z "$cron_pid" ]; then + # Send the error message to main process's stderr + echo "Failed to resolve cron process id" >> /proc/1/fd/2 + exit 1 +fi + +barman_output=$(barman cron 2>&1) +barman_exit_code=$? + +# Log the result of barman cron +if [ $barman_exit_code -ne 0 ]; then + echo "Barman cron failed with exit code $barman_exit_code: $barman_output" >> /proc/$cron_pid/fd/2 + exit 1 +fi diff --git a/internal/flybarman/node.go b/internal/flybarman/node.go index 69dd7687..5c6f2d94 100644 --- a/internal/flybarman/node.go +++ b/internal/flybarman/node.go @@ -140,8 +140,8 @@ wal_retention_policy = main } if _, err := os.Stat(n.BarmanCronFile); os.IsNotExist(err) { - barmanCronFileContent := `* * * * * . $HOME/.profile; /usr/bin/barman cron >> /proc/1/fd/1 2>/proc/1/fd/2 -` + barmanCronFileContent := `* * * * * . $HOME/.profile; /usr/bin/barman-cron\n` + if err := os.WriteFile(n.BarmanCronFile, []byte(barmanCronFileContent), 0644); err != nil { return fmt.Errorf("failed write %s: %s", n.BarmanCronFile, err) } diff --git a/internal/flypg/node.go b/internal/flypg/node.go index 5a9364b2..9785b944 100644 --- a/internal/flypg/node.go +++ b/internal/flypg/node.go @@ -98,6 +98,7 @@ func NewNode() (*Node, error) { node.RepMgr.Witness = present node.PGConfig = PGConfig{ + AppName: node.AppName, DataDir: node.DataDir, Port: node.Port, ConfigFilePath: fmt.Sprintf("%s/postgresql.conf", node.DataDir), diff --git a/internal/flypg/pg.go b/internal/flypg/pg.go index f67f9f33..4eb9dc8d 100644 --- a/internal/flypg/pg.go +++ b/internal/flypg/pg.go @@ -19,6 +19,7 @@ import ( ) type PGConfig struct { + AppName string ConfigFilePath string InternalConfigFilePath string UserConfigFilePath string @@ -171,6 +172,20 @@ func (c *PGConfig) SetDefaults() error { "shared_preload_libraries": fmt.Sprintf("'%s'", strings.Join(sharedPreloadLibraries, ",")), } + // Works to configure archiving to object storage if enabled + if os.Getenv("CLOUD_ARCHIVING_ENABLED") == "true" { + if err := validateCloudArchiver(); err != nil { + return fmt.Errorf("failed to validate s3 archiver: %s", err) + } + + bucket := strings.TrimSpace(os.Getenv("AWS_BUCKET_NAME")) + endpoint := strings.TrimSpace(os.Getenv("AWS_ENDPOINT_URL")) + + archiveCommand := fmt.Sprintf("'barman-cloud-wal-archive --cloud-provider aws-s3 --endpoint-url %s s3://%s %s %%p'", endpoint, bucket, c.AppName) + c.internalConfig["archive_mode"] = "on" + c.internalConfig["archive_command"] = archiveCommand + } + return nil } @@ -336,6 +351,7 @@ func (c *PGConfig) validateCompatibility(requested ConfigMap) (ConfigMap, error) } archiveMode := resolveConfigValue(requested, current, "archive_mode", "off") + if archiveMode.(string) != "off" { return requested, errors.New("archive_mode must be set to `off` before wal_level can be set to `minimal`") } @@ -537,3 +553,23 @@ func diskSizeInBytes(dir string) (uint64, error) { } return stat.Blocks * uint64(stat.Bsize), nil } + +func validateCloudArchiver() error { + if os.Getenv("AWS_ACCESS_KEY_ID") == "" { + return fmt.Errorf("AWS_ACCESS_KEY_ID secret must be set") + } + + if os.Getenv("AWS_SECRET_ACCESS_KEY") == "" { + return fmt.Errorf("AWS_SECRET_ACCESS_KEY secret must be set") + } + + if os.Getenv("AWS_BUCKET_NAME") == "" { + return fmt.Errorf("AWS_BUCKET_NAME envvar must be set") + } + + if os.Getenv("AWS_ENDPOINT_URL") == "" { + return fmt.Errorf("AWS_ENDPOINT_URL envvar must be set") + } + + return nil +} diff --git a/internal/flypg/pg_test.go b/internal/flypg/pg_test.go index bee86622..ed5c720a 100644 --- a/internal/flypg/pg_test.go +++ b/internal/flypg/pg_test.go @@ -112,7 +112,36 @@ func TestPGConfigInitialization(t *testing.T) { if cfg["shared_preload_libraries"] != expected { t.Fatalf("expected %s, got %s", expected, cfg["shared_preload_libraries"]) } + }) + + t.Run("cloud-archiving", func(t *testing.T) { + t.Setenv("CLOUD_ARCHIVING_ENABLED", "true") + t.Setenv("AWS_ACCESS_KEY_ID", "my-key") + t.Setenv("AWS_SECRET_ACCESS_KEY", "my-secret") + t.Setenv("AWS_BUCKET_NAME", "my-bucket") + store, _ := state.NewStore() + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["archive_mode"] != "on" { + t.Fatalf("expected archive_mode to be on, got %v", cfg["archive_mode"]) + } + + expected := fmt.Sprintf("'barman-cloud-wal-archive --cloud-provider aws-s3 --endpoint-url https://fly.storage.tigris.dev s3://%s %s %%p'", + os.Getenv("AWS_BUCKET_NAME"), + pgConf.AppName) + + if cfg["archive_command"] != expected { + t.Fatalf("expected %s, got %s", expected, cfg["archive_command"]) + } }) } @@ -208,6 +237,7 @@ func TestPGDefaultPassword(t *testing.T) { defer cleanup() pgConf := &PGConfig{ + AppName: "my-app", DataDir: pgTestDirectory, Port: 5433, ConfigFilePath: pgConfigFilePath, @@ -262,6 +292,7 @@ func TestValidateCompatibility(t *testing.T) { defer cleanup() pgConf := &PGConfig{ + AppName: "my-app", DataDir: pgTestDirectory, Port: 5433, ConfigFilePath: pgConfigFilePath,