diff --git a/cmd/keeper/cmd/keeper.go b/cmd/keeper/cmd/keeper.go index 585a43a5e..ba2f1352e 100644 --- a/cmd/keeper/cmd/keeper.go +++ b/cmd/keeper/cmd/keeper.go @@ -427,6 +427,8 @@ type PostgresKeeper struct { pgStateMutex sync.Mutex getPGStateMutex sync.Mutex lastPGState *cluster.PostgresState + + waitSyncStandbysSynced bool } func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) { @@ -992,8 +994,8 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { return } - // Dynamicly generate hba auth from clusterData - pgm.SetHba(p.generateHBA(cd, db)) + // Generate hba auth from clusterData + pgm.SetHba(p.generateHBA(cd, db, p.waitSyncStandbysSynced)) var pgParameters common.Parameters @@ -1035,6 +1037,7 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { log.Infow("current db UID different than cluster data db UID", "db", p.dbLocalState.UID, "cdDB", db.UID) pgm.SetRecoveryParameters(nil) + p.waitSyncStandbysSynced = false switch db.Spec.InitMode { case cluster.DBInitModeNew: @@ -1394,6 +1397,13 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { return } if !started { + // if we have syncrepl enabled and the postgres instance is stopped, before opening connections to normal users wait for having the defined synchronousStandbys in sync state. + if db.Spec.SynchronousReplication { + p.waitSyncStandbysSynced = true + log.Infow("not allowing connection as normal users since synchronous replication is enabled and instance was down") + pgm.SetHba(p.generateHBA(cd, db, true)) + } + if err = pgm.Start(); err != nil { log.Errorw("failed to start postgres", zap.Error(err)) return @@ -1547,8 +1557,24 @@ func (p *PostgresKeeper) postgresKeeperSM(pctx context.Context) { log.Infow("postgres parameters not changed") } - // Dynamicly generate hba auth from clusterData - newHBA := p.generateHBA(cd, db) + // Generate hba auth from clusterData + + // if we have syncrepl enabled and the postgres instance is stopped, before opening connections to normal users wait for having the defined synchronousStandbys in sync state. + if db.Spec.SynchronousReplication && p.waitSyncStandbysSynced { + inSyncStandbys, err := p.GetInSyncStandbys() + if err != nil { + log.Errorw("failed to retrieve current in sync standbys from instance", zap.Error(err)) + return + } + if !util.CompareStringSliceNoOrder(inSyncStandbys, db.Spec.SynchronousStandbys) { + log.Infow("not allowing connection as normal users since synchronous replication is enabled, instance was down and not all sync standbys are synced") + } else { + p.waitSyncStandbysSynced = false + } + } else { + p.waitSyncStandbysSynced = false + } + newHBA := p.generateHBA(cd, db, p.waitSyncStandbysSynced) if !reflect.DeepEqual(newHBA, pgm.CurHba()) { log.Infow("postgres hba entries changed, reloading postgres instance") pgm.SetHba(newHBA) @@ -1653,8 +1679,12 @@ func IsMaster(db *cluster.DB) bool { } } -// generateHBA generates the instance hba entries depending on the value of DefaultSUReplAccessMode. -func (p *PostgresKeeper) generateHBA(cd *cluster.ClusterData, db *cluster.DB) []string { +// generateHBA generates the instance hba entries depending on the value of +// DefaultSUReplAccessMode. +// When onlyInternal is true only rules needed for replication will be setup +// and the traffic should be permitted only for pgSUUsername standard +// connections and pgReplUsername replication connections. +func (p *PostgresKeeper) generateHBA(cd *cluster.ClusterData, db *cluster.DB, onlyInternal bool) []string { // Minimal entries for local normal and replication connections needed by the stolon keeper // Matched local connections are for postgres database and suUsername user with md5 auth // Matched local replication connections are for replUsername user with md5 auth @@ -1693,16 +1723,18 @@ func (p *PostgresKeeper) generateHBA(cd *cluster.ClusterData, db *cluster.DB) [] } } - // By default, if no custom pg_hba entries are provided, accept - // connections for all databases and users with md5 auth - if db.Spec.PGHBA != nil { - computedHBA = append(computedHBA, db.Spec.PGHBA...) - } else { - computedHBA = append( - computedHBA, - "host all all 0.0.0.0/0 md5", - "host all all ::0/0 md5", - ) + if !onlyInternal { + // By default, if no custom pg_hba entries are provided, accept + // connections for all databases and users with md5 auth + if db.Spec.PGHBA != nil { + computedHBA = append(computedHBA, db.Spec.PGHBA...) + } else { + computedHBA = append( + computedHBA, + "host all all 0.0.0.0/0 md5", + "host all all ::0/0 md5", + ) + } } // return generated Hba merged with user Hba diff --git a/cmd/keeper/cmd/keeper_test.go b/cmd/keeper/cmd/keeper_test.go index cba013225..13cc3935d 100644 --- a/cmd/keeper/cmd/keeper_test.go +++ b/cmd/keeper/cmd/keeper_test.go @@ -232,7 +232,7 @@ func TestGenerateHBA(t *testing.T) { db := cd.DBs[tt.dbUID] db.Spec.PGHBA = tt.pgHBA - out := p.generateHBA(cd, db) + out := p.generateHBA(cd, db, false) if !reflect.DeepEqual(out, tt.out) { var b bytes.Buffer diff --git a/doc/faq.md b/doc/faq.md index 4171c8dbb..6911beb07 100644 --- a/doc/faq.md +++ b/doc/faq.md @@ -58,7 +58,7 @@ stolon let you easily integrate with any backup/restore solution. See the [point When using async replication the leader sentinel tries to find the best standby using a valid standby with the (last reported) nearest xlog location to the master latest knows xlog location. If a master is down there's no way to know its latest xlog position (stolon get and save it at some intervals) so there's no way to guarantee that the standby is not behind but just that the best standby of the ones available will be choosen. -When using synchronous replication only synchronous standbys will be choosen so standbys behind the master won't be choosen (be aware of postgresql synchronous replication limits explaned in the [postgresql documentation](https://www.postgresql.org/docs/9.6/static/warm-standby.html#SYNCHRONOUS-REPLICATION), for example, when a master restarts while no synchronous standbys are available, the transactions waiting for acknowledgement on the master will be marked as fully committed. We are thinking of a way to avoid this using stolon). +When using synchronous replication only synchronous standbys will be choosen so standbys behind the master won't be choosen (be aware of postgresql synchronous replication limits explaned in the [postgresql documentation](https://www.postgresql.org/docs/9.6/static/warm-standby.html#SYNCHRONOUS-REPLICATION), for example, when a master restarts while no synchronous standbys are available, the transactions waiting for acknowledgement on the master will be marked as fully committed. This is "fixed" by stolon. See the [synchronous replication doc](syncrepl.md). ## Does stolon uses postgres sync replication [quorum methods](https://www.postgresql.org/docs/10/static/runtime-config-replication.html#RUNTIME-CONFIG-REPLICATION-MASTER) (FIRST or ANY)? diff --git a/doc/syncrepl.md b/doc/syncrepl.md index 4552178e0..a427513b6 100644 --- a/doc/syncrepl.md +++ b/doc/syncrepl.md @@ -29,3 +29,32 @@ Set MinSynchronousStandbys/MaxSynchronousStandbys to a value greater than 1 (onl ``` stolonctl --cluster-name=mycluster --store-backend=etcd update --patch '{ "synchronousReplication" : true, "minSynchronousStandbys": 2, "maxSynchronousStandbys": 3 }' ``` + +## Handling postgresql sync repl limits under such circumstances + +Postgres synchronous replication has a downside explained in the [docs](https://www.postgresql.org/docs/current/static/warm-standby.html) + +`If primary restarts while commits are waiting for acknowledgement, those waiting transactions will be marked fully committed once the primary database recovers. There is no way to be certain that all standbys have received all outstanding WAL data at time of the crash of the primary. Some transactions may not show as committed on the standby, even though they show as committed on the primary. The guarantee we offer is that the application will not receive explicit acknowledgement of the successful commit of a transaction until the WAL data is known to be safely received by all the synchronous standbys.` + +Under some events this will cause lost transactions. For example: + +* Sync standby goes down. +* A client commits a transaction, it blocks waiting for acknowledgement. +* Primary restart, it'll mark the above transaction as fully committed. All the +clients will now see that transaction. +* Primary dies +* Standby comes back. +* The sentinel will elect the standby as the new master since it's in the +synchronous_standby_names list. +* The above transaction will be lost despite synchronous replication being +enabled. + +So there can be some conditions where a syncstandby could be elected also if it's missing the last transactions if it was down at the commit time. + +It's not easy to fix this issue since these events cannot be resolved by the sentinel because it's not possible to know if a sync standby is really in sync when the master is down (since we cannot query its last wal position and the reporting from the keeper is asynchronous). + +But with stolon we have the power to overcome this issue by noticing when a primary restarts (since we control it), allow only "internal" connections until all the defined synchronous standbys are really in sync. + +Allowing only "internal" connections means not adding the default rules or the user defined pgHBA rules but only the rules needed for replication (and local communication from the keeper). + +Since "internal" rules accepts the defined superuser and replication users, client should not use these roles for normal operation or the above solution won't work (but they shouldn't do it anyway since this could cause exhaustion of reserved superuser connections needed by the keeper to check the instance). diff --git a/tests/integration/ha_test.go b/tests/integration/ha_test.go index c2055ff7b..1fbe3beef 100644 --- a/tests/integration/ha_test.go +++ b/tests/integration/ha_test.go @@ -16,10 +16,12 @@ package integration import ( "context" + "database/sql" "fmt" "io/ioutil" "os" "path/filepath" + "strings" "syscall" "testing" "time" @@ -27,6 +29,7 @@ import ( "github.com/satori/go.uuid" "github.com/sorintlab/stolon/internal/cluster" "github.com/sorintlab/stolon/internal/common" + pg "github.com/sorintlab/stolon/internal/postgresql" "github.com/sorintlab/stolon/internal/store" ) @@ -1929,3 +1932,112 @@ func TestForceFailSyncReplStandbyCluster(t *testing.T) { t.Parallel() testForceFail(t, false, true) } + +// TestSyncStandbyNotInSync tests that, when using synchronous replication, a +// normal user cannot connect to primary db after it has restarted until all +// defined synchronous standbys are in sync. +func TestSyncStandbyNotInSync(t *testing.T) { + t.Parallel() + dir, err := ioutil.TempDir("", "stolon") + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + defer os.RemoveAll(dir) + clusterName := uuid.NewV4().String() + tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, true, false, nil) + defer shutdown(tks, tss, tp, tstore) + storePath := filepath.Join(common.StorePrefix, clusterName) + sm := store.NewKVBackedStore(tstore.store, storePath) + master, standbys := waitMasterStandbysReady(t, sm, tks) + standby := standbys[0] + if err := WaitClusterDataSynchronousStandbys([]string{standby.uid}, sm, 30*time.Second); err != nil { + t.Fatalf("expected synchronous standby on keeper %q in cluster data", standby.uid) + } + if err := populate(t, master); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if err := write(t, master, 1, 1); err != nil { + t.Fatalf("unexpected err: %v", err) + } + // create a normal user + if _, err := master.Exec("CREATE USER user01 PASSWORD 'password'"); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if _, err := master.Exec("GRANT ALL ON DATABASE postgres TO user01"); err != nil { + t.Fatalf("unexpected err: %v", err) + } + if _, err := master.Exec("GRANT ALL ON TABLE table01 TO user01"); err != nil { + t.Fatalf("unexpected err: %v", err) + } + connParams := pg.ConnParams{ + "user": "user01", + "password": "password", + "host": master.pgListenAddress, + "port": master.pgPort, + "dbname": "postgres", + "sslmode": "disable", + } + connString := connParams.ConnString() + user01db, err := sql.Open("postgres", connString) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if _, err := user01db.Exec("SELECT * from table01"); err != nil { + t.Fatalf("unexpected err: %v", err) + } + // get the master XLogPos + xLogPos, err := GetXLogPos(master) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + // wait for the keepers to have reported their state + if err := WaitClusterSyncedXLogPos([]*TestKeeper{master, standby}, xLogPos, sm, 20*time.Second); err != nil { + t.Fatalf("unexpected err: %v", err) + } + // the proxy should connect to the right master + if err := tp.WaitRightMaster(master, 3*cluster.DefaultProxyCheckInterval); err != nil { + t.Fatalf("unexpected err: %v", err) + } + // Stop the standby keeper, should also stop the database + t.Logf("Stopping current standby keeper: %s", standby.uid) + standby.Stop() + // this call will block and then exit with an error when the master is restarted + go func() { + write(t, master, 2, 2) + }() + time.Sleep(1 * time.Second) + // restart master + t.Logf("Restarting current master keeper: %s", master.uid) + master.Stop() + master.Start() + waitKeeperReady(t, sm, master) + // The transaction should be fully committed on master + c, err := getLines(t, master) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if c != 2 { + t.Fatalf("wrong number of lines, want: %d, got: %d", 2, c) + } + // The normal user shouldn't be able to connect + if _, err := user01db.Exec("SELECT * from table01"); err != nil { + exp := `pq: no pg_hba.conf entry for host "127.0.0.1", user "user01", database "postgres"` + if !strings.HasPrefix(err.Error(), exp) { + t.Fatalf("expected error when connecting to db as user01 starting with %q, got err: %q", exp, err.Error()) + } + } else { + t.Fatalf("expected error connecting to db as user01, got no err") + } + // Starting the standby keeper + t.Logf("Starting current standby keeper: %s", standby.uid) + standby.Start() + time.Sleep(10 * time.Second) + // The normal user should now be able to connect and see 2 lines + c, err = getLines(t, user01db) + if err != nil { + t.Fatalf("unexpected err: %v", err) + } + if c != 2 { + t.Fatalf("wrong number of lines, want: %d, got: %d", 2, c) + } +} diff --git a/tests/integration/utils.go b/tests/integration/utils.go index b16869e4c..2aa2486f8 100644 --- a/tests/integration/utils.go +++ b/tests/integration/utils.go @@ -72,6 +72,9 @@ func pgParametersWithDefaults(p cluster.PGParameters) cluster.PGParameters { type Querier interface { Exec(query string, args ...interface{}) (sql.Result, error) Query(query string, args ...interface{}) (*sql.Rows, error) +} + +type ReplQuerier interface { ReplQuery(query string, args ...interface{}) (*sql.Rows, error) } @@ -94,7 +97,7 @@ func GetPGParameters(q Querier) (common.Parameters, error) { return pgParameters, nil } -func GetSystemData(q Querier) (*pg.SystemData, error) { +func GetSystemData(q ReplQuerier) (*pg.SystemData, error) { rows, err := q.ReplQuery("IDENTIFY_SYSTEM") if err != nil { return nil, err @@ -116,7 +119,7 @@ func GetSystemData(q Querier) (*pg.SystemData, error) { return nil, fmt.Errorf("query returned 0 rows") } -func GetXLogPos(q Querier) (uint64, error) { +func GetXLogPos(q ReplQuerier) (uint64, error) { // get the current master XLogPos systemData, err := GetSystemData(q) if err != nil {