From cc5721d584f138790ed885e0a8c2590729474ee8 Mon Sep 17 00:00:00 2001 From: Shaun Davis Date: Tue, 9 Jul 2024 16:53:02 -0500 Subject: [PATCH] Ensure units specified for archive_command are compatible with PG --- internal/flypg/barman_config.go | 63 +++++++-- internal/flypg/barman_config_test.go | 111 ++++++++++++++-- internal/flypg/barman_test.go | 2 +- internal/flypg/pg.go | 7 +- internal/flypg/pg_test.go | 187 +++++++++++++++++++++++---- 5 files changed, 323 insertions(+), 47 deletions(-) diff --git a/internal/flypg/barman_config.go b/internal/flypg/barman_config.go index 48fa4c55..903daf51 100644 --- a/internal/flypg/barman_config.go +++ b/internal/flypg/barman_config.go @@ -117,10 +117,12 @@ func (c *BarmanConfig) Validate(requestedChanges map[string]interface{}) error { for k, v := range requestedChanges { switch k { case "archive_timeout": - // Ensure that the value is a valid duration - if _, err := time.ParseDuration(v.(string)); err != nil { - return fmt.Errorf("invalid value for archive_timeout: %v", v) + val := v.(string) + // Ensure it can be converted to a Postgres duration + if _, err := convertToPostgresUnits(val); err != nil { + return fmt.Errorf("invalid value for archive_timeout: %v", err) } + case "recovery_window": // Ensure that the value is a valid duration re := regexp.MustCompile(`^(\d+)([dwy])$`) @@ -170,17 +172,24 @@ func (c *BarmanConfig) initialize(store *state.Store, configDir string) error { c.SetDefaults() + // Sync the user config from consul if err := SyncUserConfig(c, store); err != nil { log.Printf("[WARN] Failed to sync user config from consul for barman: %s\n", err.Error()) - if err := writeInternalConfigFile(c); err != nil { - return fmt.Errorf("failed to write barman config files: %s", err) - } - } else { - if err := WriteConfigFiles(c); err != nil { - return fmt.Errorf("failed to write barman config files: %s", err) + } + + // Write the internal defaults + if err := writeInternalConfigFile(c); err != nil { + return fmt.Errorf("failed to write barman config files: %s", err) + } + + // Create the user config file if it doesn't exist + if _, err := os.Stat(c.UserConfigFile()); os.IsNotExist(err) { + if _, err := os.Create(c.UserConfigFile()); err != nil { + return fmt.Errorf("failed to stub user config file: %s", err) } } + // Load the settings settings, err := c.ParseSettings() if err != nil { return fmt.Errorf("failed to parse barman config: %w", err) @@ -204,3 +213,39 @@ func convertRecoveryWindowDuration(durationStr string) string { } return durationStr } + +func convertToPostgresUnits(dStr string) (string, error) { + // Use regex to split the numeric part and the unit + re := regexp.MustCompile(`(\d+)([a-z]+)`) + matches := re.FindStringSubmatch(dStr) + if len(matches) != 3 { + return "", fmt.Errorf("invalid duration format: %s", dStr) + } + + // Parse the numeric value + num, err := strconv.Atoi(matches[1]) + if err != nil { + return "", err + } + + // Map the Go units to Postgres units + var postgresUnit string + switch matches[2] { + case "us": + postgresUnit = "us" + case "ms": + postgresUnit = "ms" + case "s": + postgresUnit = "s" + case "min", "m": + postgresUnit = "min" + case "h": + postgresUnit = "h" + case "d": + postgresUnit = "d" + default: + return "", fmt.Errorf("unsupported postgres unit: %s", matches[2]) + } + + return fmt.Sprintf("%d%s", num, postgresUnit), nil +} diff --git a/internal/flypg/barman_config_test.go b/internal/flypg/barman_config_test.go index edeaf759..8e585563 100644 --- a/internal/flypg/barman_config_test.go +++ b/internal/flypg/barman_config_test.go @@ -6,10 +6,6 @@ import ( "github.com/fly-apps/postgres-flex/internal/flypg/state" ) -const ( - barmanConfigTestDir = "./test_results/barman" -) - func TestValidateBarmanConfig(t *testing.T) { if err := setup(t); err != nil { t.Fatal(err) @@ -18,7 +14,7 @@ func TestValidateBarmanConfig(t *testing.T) { store, _ := state.NewStore() - b, err := NewBarmanConfig(store, barmanConfigTestDir) + b, err := NewBarmanConfig(store, testBarmanConfigDir) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -37,12 +33,61 @@ func TestValidateBarmanConfig(t *testing.T) { } }) - t.Run("invalid-archive-timeout", func(t *testing.T) { - conf := ConfigMap{ - "archive_timeout": "120seconds", + t.Run("valid-archive-timeout-us", func(t *testing.T) { + conf := ConfigMap{"archive_timeout": "120us"} + + if err := b.Validate(conf); err != nil { + t.Fatalf("unexpected error: %v", err) } + }) - if err := b.Validate(conf); err == nil { + t.Run("archive-timeout-with-ms-unit", func(t *testing.T) { + conf := ConfigMap{"archive_timeout": "120ms"} + + if err := b.Validate(conf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("archive-timeout-with-s-unit", func(t *testing.T) { + conf := ConfigMap{"archive_timeout": "120s"} + + if err := b.Validate(conf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("archive-timeout-with-m-unit", func(t *testing.T) { + conf := ConfigMap{"archive_timeout": "120m"} + + if err := b.Validate(conf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + conf = ConfigMap{"archive_timeout": "120min"} + if err := b.Validate(conf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("archive-timeout-with-h-unit", func(t *testing.T) { + conf := ConfigMap{"archive_timeout": "120h"} + if err := b.Validate(conf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("archive-timeout-with-d-unit", func(t *testing.T) { + conf := ConfigMap{"archive_timeout": "120d"} + if err := b.Validate(conf); err != nil { + t.Fatalf("unexpected error: %v", err) + } + }) + + t.Run("invalid-archive-timeout", func(t *testing.T) { + conf := ConfigMap{"archive_timeout": "120seconds"} + err := b.Validate(conf) + if err == nil { t.Fatalf("expected error, got nil") } }) @@ -106,6 +151,7 @@ func TestValidateBarmanConfig(t *testing.T) { t.Fatalf("expected error, got nil") } }) + } func TestBarmanConfigSettings(t *testing.T) { @@ -117,7 +163,7 @@ func TestBarmanConfigSettings(t *testing.T) { store, _ := state.NewStore() t.Run("defaults", func(t *testing.T) { - b, err := NewBarmanConfig(store, barmanConfigTestDir) + b, err := NewBarmanConfig(store, testBarmanConfigDir) if err != nil { t.Fatalf("unexpected error: %v", err) } @@ -130,5 +176,50 @@ func TestBarmanConfigSettings(t *testing.T) { t.Fatalf("expected fullBackupFrequency to be 24h, but got %s", b.Settings.FullBackupFrequency) } + if b.Settings.RecoveryWindow != "RECOVERY WINDOW OF 7 DAYS" { + t.Fatalf("expected recovery_window to be 'RECOVERY WINDOW OF 7 DAYS', but got %s", b.Settings.RecoveryWindow) + } + + if b.Settings.ArchiveTimeout != "60s" { + t.Fatalf("expected archive_timeout to be 60s, but got %s", b.Settings.ArchiveTimeout) + } + }) } + +func TestBarmanSettingUpdate(t *testing.T) { + if err := setup(t); err != nil { + t.Fatal(err) + } + defer cleanup() + + store, _ := state.NewStore() + + b, err := NewBarmanConfig(store, testBarmanConfigDir) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + usrCfg := ConfigMap{ + "archive_timeout": "60m", + } + + if err := b.Validate(usrCfg); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + b.SetUserConfig(usrCfg) + + if err := writeUserConfigFile(b); err != nil { + t.Fatalf("unexpected error: %v", err) + } + + cfg, err := b.CurrentConfig() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + if cfg["archive_timeout"] != "60m" { + t.Fatalf("expected archive_timeout to be 60m, but got %s", cfg["archive_timeout"]) + } +} diff --git a/internal/flypg/barman_test.go b/internal/flypg/barman_test.go index 3e800186..7d2c019b 100644 --- a/internal/flypg/barman_test.go +++ b/internal/flypg/barman_test.go @@ -8,7 +8,7 @@ import ( ) const ( - testBarmanConfigDir = "./test_results/barman" + testBarmanConfigDir = "./test_results/barman/" ) func TestNewBarman(t *testing.T) { diff --git a/internal/flypg/pg.go b/internal/flypg/pg.go index 730cbd5b..c668bcb0 100644 --- a/internal/flypg/pg.go +++ b/internal/flypg/pg.go @@ -201,9 +201,14 @@ func (c *PGConfig) setArchiveConfig(store *state.Store) error { return fmt.Errorf("failed to load barman config: %s", err) } + archiveTimeout, err := convertToPostgresUnits(barman.Settings.ArchiveTimeout) + if err != nil { + return fmt.Errorf("failed to convert archive_timeout to postgres units: %s", err) + } + c.internalConfig["archive_mode"] = "on" c.internalConfig["archive_command"] = fmt.Sprintf("'%s'", barman.walArchiveCommand()) - c.internalConfig["archive_timeout"] = barman.Settings.ArchiveTimeout + c.internalConfig["archive_timeout"] = archiveTimeout return nil } diff --git a/internal/flypg/pg_test.go b/internal/flypg/pg_test.go index a6d603d1..ac93aca2 100644 --- a/internal/flypg/pg_test.go +++ b/internal/flypg/pg_test.go @@ -16,9 +16,7 @@ const ( pgInternalConfigFilePath = "./test_results/postgresql.internal.conf" pgUserConfigFilePath = "./test_results/postgresql.user.conf" pgPasswordFilePath = "./test_results/default_password" - barmanConfigDir = "./test_results/barman" - - pgHBAFilePath = "./test_results/pg_hba.conf" + pgHBAFilePath = "./test_results/pg_hba.conf" ) func TestPGConfigInitialization(t *testing.T) { @@ -34,7 +32,7 @@ func TestPGConfigInitialization(t *testing.T) { InternalConfigFilePath: pgInternalConfigFilePath, UserConfigFilePath: pgUserConfigFilePath, passwordFilePath: pgPasswordFilePath, - barmanConfigPath: barmanConfigDir, + barmanConfigPath: testBarmanConfigDir, } if err := stubPGConfigFile(); err != nil { @@ -116,36 +114,163 @@ func TestPGConfigInitialization(t *testing.T) { } }) - t.Run("barman-enabled", func(t *testing.T) { + t.Run("archive-enabled", func(t *testing.T) { t.Setenv("S3_ARCHIVE_CONFIG", "https://my-key:my-secret@fly.storage.tigris.dev/my-bucket/my-directory") store, _ := state.NewStore() - if err := pgConf.initialize(store); err != nil { - t.Fatal(err) - } - cfg, err := pgConf.CurrentConfig() - if err != nil { - t.Fatal(err) - } + t.Run("defaults", func(t *testing.T) { + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } - if cfg["archive_mode"] != "on" { - t.Fatalf("expected archive_mode to be on, got %v", cfg["archive_mode"]) - } + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } - barman, err := NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), DefaultAuthProfile) - if err != nil { - t.Fatal(err) - } + if cfg["archive_mode"] != "on" { + t.Fatalf("expected archive_mode to be on, got %v", cfg["archive_mode"]) + } - if err := barman.LoadConfig(testBarmanConfigDir); err != nil { - t.Fatal(err) - } + barman, err := NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), DefaultAuthProfile) + if err != nil { + t.Fatal(err) + } - expected := fmt.Sprintf("'%s'", barman.walArchiveCommand()) - if cfg["archive_command"] != expected { - t.Fatalf("expected %s, got %s", expected, cfg["archive_command"]) - } + if err := barman.LoadConfig(testBarmanConfigDir); err != nil { + t.Fatal(err) + } + + expected := fmt.Sprintf("'%s'", barman.walArchiveCommand()) + if cfg["archive_command"] != expected { + t.Fatalf("expected %s, got %s", expected, cfg["archive_command"]) + } + + if cfg["archive_timeout"] != "60s" { + t.Fatalf("expected 60s, got %s", cfg["archive_timeout"]) + } + }) + + t.Run("custom-archive-timeout-with-m", func(t *testing.T) { + barman, err := NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), DefaultAuthProfile) + if err != nil { + t.Fatal(err) + } + + if err := barman.LoadConfig(testBarmanConfigDir); err != nil { + t.Fatal(err) + } + + barman.SetUserConfig(ConfigMap{"archive_timeout": "60m"}) + + if err := writeUserConfigFile(barman); err != nil { + t.Fatal(err) + } + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["archive_timeout"] != "60min" { + t.Fatalf("expected 60min, got %s", cfg["archive_timeout"]) + } + }) + + t.Run("custom-archive-timeout-with-min", func(t *testing.T) { + barman, err := NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), DefaultAuthProfile) + if err != nil { + t.Fatal(err) + } + + if err := barman.LoadConfig(testBarmanConfigDir); err != nil { + t.Fatal(err) + } + + barman.SetUserConfig(ConfigMap{"archive_timeout": "60min"}) + + if err := writeUserConfigFile(barman); err != nil { + t.Fatal(err) + } + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["archive_timeout"] != "60min" { + t.Fatalf("expected 60min, got %s", cfg["archive_timeout"]) + } + }) + + t.Run("custom-archive-timeout-with-s", func(t *testing.T) { + barman, err := NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), DefaultAuthProfile) + if err != nil { + t.Fatal(err) + } + + if err := barman.LoadConfig(testBarmanConfigDir); err != nil { + t.Fatal(err) + } + + barman.SetUserConfig(ConfigMap{"archive_timeout": "60s"}) + + if err := writeUserConfigFile(barman); err != nil { + t.Fatal(err) + } + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["archive_timeout"] != "60s" { + t.Fatalf("expected 60s, got %s", cfg["archive_timeout"]) + } + }) + + t.Run("custom-archive-timeout-w", func(t *testing.T) { + barman, err := NewBarman(store, os.Getenv("S3_ARCHIVE_CONFIG"), DefaultAuthProfile) + if err != nil { + t.Fatal(err) + } + + if err := barman.LoadConfig(testBarmanConfigDir); err != nil { + t.Fatal(err) + } + + barman.SetUserConfig(ConfigMap{"archive_timeout": "24h"}) + + if err := writeUserConfigFile(barman); err != nil { + t.Fatal(err) + } + + if err := pgConf.initialize(store); err != nil { + t.Fatal(err) + } + + cfg, err := pgConf.CurrentConfig() + if err != nil { + t.Fatal(err) + } + + if cfg["archive_timeout"] != "24h" { + t.Fatalf("expected 24h, got %s", cfg["archive_timeout"]) + } + }) }) t.Run("barman-disabled", func(t *testing.T) { @@ -416,6 +541,16 @@ func setup(t *testing.T) error { } } + if _, err := os.Stat(testBarmanConfigDir); err != nil { + if os.IsNotExist(err) { + if err := os.Mkdir(testBarmanConfigDir, 0750); err != nil { + return err + } + } else { + return err + } + } + return nil }