Skip to content

Commit

Permalink
Merge pull request #425 from sgotti/config_storeprefix
Browse files Browse the repository at this point in the history
*: Make the store prefix configurable
  • Loading branch information
sgotti committed Jan 29, 2018
2 parents 5fbf4d9 + 0802a61 commit f4a7881
Show file tree
Hide file tree
Showing 14 changed files with 39 additions and 33 deletions.
4 changes: 4 additions & 0 deletions cmd/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ import (
"fmt"
"os"

"github.com/sorintlab/stolon/common"

"github.com/mattn/go-isatty"
"github.com/spf13/cobra"
)

type CommonConfig struct {
StoreBackend string
StoreEndpoints string
StorePrefix string
StoreCertFile string
StoreKeyFile string
StoreCAFile string
Expand All @@ -39,6 +42,7 @@ type CommonConfig struct {
func AddCommonFlags(cmd *cobra.Command, cfg *CommonConfig, hasLogger bool) {
cmd.PersistentFlags().StringVar(&cfg.StoreBackend, "store-backend", "", "store backend type (etcdv2/etcd, etcdv3 or consul)")
cmd.PersistentFlags().StringVar(&cfg.StoreEndpoints, "store-endpoints", "", "a comma-delimited list of store endpoints (use https scheme for tls communication) (defaults: http://127.0.0.1:2379 for etcd, http://127.0.0.1:8500 for consul)")
cmd.PersistentFlags().StringVar(&cfg.StorePrefix, "store-prefix", common.StorePrefix, "the store base prefix")
cmd.PersistentFlags().StringVar(&cfg.StoreCertFile, "store-cert-file", "", "certificate file for client identification to the store")
cmd.PersistentFlags().StringVar(&cfg.StoreKeyFile, "store-key", "", "private key file for client identification to the store")
cmd.PersistentFlags().BoolVar(&cfg.StoreSkipTlsVerify, "store-skip-tls-verify", false, "skip store certificate verification (insecure!!!)")
Expand Down
2 changes: 1 addition & 1 deletion cmd/keeper/keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -387,7 +387,7 @@ type PostgresKeeper struct {
}

func NewPostgresKeeper(cfg *config, end chan error) (*PostgresKeeper, error) {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
storePath := filepath.Join(cfg.StorePrefix, cfg.ClusterName)

kvstore, err := store.NewKVStore(store.Config{
Backend: store.Backend(cfg.StoreBackend),
Expand Down
2 changes: 1 addition & 1 deletion cmd/proxy/proxy.go
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ type ClusterChecker struct {
}

func NewClusterChecker(uid string, cfg config) (*ClusterChecker, error) {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
storePath := filepath.Join(cfg.StorePrefix, cfg.ClusterName)

kvstore, err := store.NewKVStore(store.Config{
Backend: store.Backend(cfg.StoreBackend),
Expand Down
2 changes: 1 addition & 1 deletion cmd/sentinel/sentinel.go
Original file line number Diff line number Diff line change
Expand Up @@ -1484,7 +1484,7 @@ func NewSentinel(uid string, cfg *config, end chan bool) (*Sentinel, error) {
}
}

storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
storePath := filepath.Join(cfg.StorePrefix, cfg.ClusterName)

kvstore, err := store.NewKVStore(store.Config{
Backend: store.Backend(cfg.StoreBackend),
Expand Down
4 changes: 2 additions & 2 deletions cmd/stolonctl/stolonctl.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,12 +107,12 @@ func NewKVStore() (store.KVStore, error) {
}

func NewStore(kvStore store.KVStore) *store.Store {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
storePath := filepath.Join(cfg.StorePrefix, cfg.ClusterName)
return store.NewStore(kvStore, storePath)
}

func NewElection(kvStore store.KVStore) store.Election {
storePath := filepath.Join(common.StoreBasePath, cfg.ClusterName)
storePath := filepath.Join(cfg.StorePrefix, cfg.ClusterName)
return store.NewElection(kvStore, filepath.Join(storePath, common.SentinelLeaderKey), "")
}

Expand Down
2 changes: 1 addition & 1 deletion common/common.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ import (
)

const (
StoreBasePath = "stolon/cluster"
StorePrefix = "stolon/cluster"

SentinelLeaderKey = "sentinel-leader"
)
Expand Down
1 change: 1 addition & 0 deletions doc/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ When this happens the stolon components not able to read or write to a quorate p

In addition, the stolon-proxy, if not able to talk with the store, to avoid sending client connections to a paritioned master, will drop all the connections since it cannot know if the cluster data has changed (for example if the proxy has problems reading from the store but the sentinel can write to it).

Every stolon executable has a `--store-prefix` option (defaulting to `stolon/cluster`) to set the store path prefix. For etcdv3 and consul, if not provided, a starting `/` will be automatically added since they have a directory based layout. Instead, for etcdv3, the prefix will be kept as provided (etcdv3 has a flat namespace and for this reason two prefixes with and without a starting `/` are different and both valid).

##### Handling permanent loss of the store.

Expand Down
1 change: 1 addition & 0 deletions pkg/store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ var URLSchemeRegexp = regexp.MustCompile(`^([a-zA-Z][a-zA-Z0-9+-.]*)://`)
type Config struct {
Backend Backend
Endpoints string
BasePath string
CertFile string
KeyFile string
CAFile string
Expand Down
4 changes: 2 additions & 2 deletions tests/integration/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func TestServerParameters(t *testing.T) {

clusterName := uuid.NewV4().String()

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)

sm := store.NewStore(tstore.store, storePath)

Expand Down Expand Up @@ -150,7 +150,7 @@ func TestAlterSystem(t *testing.T) {

clusterName := uuid.NewV4().String()

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)

sm := store.NewStore(tstore.store, storePath)

Expand Down
28 changes: 14 additions & 14 deletions tests/integration/ha_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestInitWithMultipleKeepers(t *testing.T) {

clusterName := uuid.NewV4().String()

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)

sm := store.NewStore(tstore.store, storePath)

Expand Down Expand Up @@ -314,7 +314,7 @@ func testMasterStandby(t *testing.T, syncRepl bool) {
tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, syncRepl, false, nil)
defer shutdown(tks, tss, tp, tstore)

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down Expand Up @@ -378,7 +378,7 @@ func testFailover(t *testing.T, syncRepl bool, standbyCluster bool) {
tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, syncRepl, false, ptk)
defer shutdown(tks, tss, tp, tstore)

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down Expand Up @@ -492,7 +492,7 @@ func testFailoverFailed(t *testing.T, syncRepl bool, standbyCluster bool) {
tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, syncRepl, false, ptk)
defer shutdown(tks, tss, tp, tstore)

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down Expand Up @@ -597,7 +597,7 @@ func testFailoverTooMuchLag(t *testing.T, standbyCluster bool) {
tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, false, false, ptk)
defer shutdown(tks, tss, tp, tstore)

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down Expand Up @@ -675,7 +675,7 @@ func testOldMasterRestart(t *testing.T, syncRepl, usePgrewind bool, standbyClust
defer shutdown(tks, tss, tp, tstore)

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down Expand Up @@ -814,7 +814,7 @@ func testPartition1(t *testing.T, syncRepl, usePgrewind bool, standbyCluster boo
defer shutdown(tks, tss, tp, tstore)

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down Expand Up @@ -956,7 +956,7 @@ func testTimelineFork(t *testing.T, syncRepl, usePgrewind bool) {
defer shutdown(tks, tss, tp, tstore)

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down Expand Up @@ -1152,7 +1152,7 @@ func testMasterChangedAddress(t *testing.T, standbyCluster bool) {
tks, tss, tp, tstore := setupServers(t, clusterName, dir, 2, 1, false, false, ptk)
defer shutdown(tks, tss, tp, tstore)

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down Expand Up @@ -1245,7 +1245,7 @@ func TestFailedStandby(t *testing.T) {
tks, tss, tp, tstore := setupServersCustom(t, clusterName, dir, 3, 1, initialClusterSpec)
defer shutdown(tks, tss, tp, tstore)

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

// Wait for clusterView containing a master
Expand Down Expand Up @@ -1337,7 +1337,7 @@ func TestLoweredMaxStandbysPerSender(t *testing.T) {
defer shutdown(tks, tss, tp, tstore)

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

// Wait for clusterView containing a master
Expand Down Expand Up @@ -1404,7 +1404,7 @@ func TestKeeperRemoval(t *testing.T) {
defer shutdown(tks, tss, tp, tstore)

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down Expand Up @@ -1507,7 +1507,7 @@ func testKeeperRemovalStolonCtl(t *testing.T, syncRepl bool) {
defer shutdown(tks, tss, tp, tstore)

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down Expand Up @@ -1613,7 +1613,7 @@ func TestStandbyCantSync(t *testing.T) {
tks, tss, tp, tstore := setupServersCustom(t, clusterName, dir, 3, 1, initialClusterSpec)
defer shutdown(tks, tss, tp, tstore)

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

master, standbys := waitMasterStandbysReady(t, sm, tks)
Expand Down
10 changes: 5 additions & 5 deletions tests/integration/init_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ func testInitNew(t *testing.T, merge bool) {
defer tstore.Stop()

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)

sm := store.NewStore(tstore.store, storePath)

Expand Down Expand Up @@ -181,7 +181,7 @@ func testInitExisting(t *testing.T, merge bool) {
defer tstore.Stop()

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)

sm := store.NewStore(tstore.store, storePath)

Expand Down Expand Up @@ -327,7 +327,7 @@ func TestInitUsers(t *testing.T) {

// Test pg-repl-username == pg-su-username
clusterName = uuid.NewV4().String()
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)

sm := store.NewStore(tstore.store, storePath)

Expand Down Expand Up @@ -370,7 +370,7 @@ func TestInitUsers(t *testing.T) {

// Test pg-repl-username != pg-su-username and pg-su-password defined
clusterName = uuid.NewV4().String()
storePath = filepath.Join(common.StoreBasePath, clusterName)
storePath = filepath.Join(common.StorePrefix, clusterName)

sm = store.NewStore(tstore.store, storePath)

Expand Down Expand Up @@ -418,7 +418,7 @@ func TestInitialClusterSpec(t *testing.T) {
clusterName := uuid.NewV4().String()

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)

sm := store.NewStore(tstore.store, storePath)

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/pitr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ func TestPITR(t *testing.T) {

clusterName := uuid.NewV4().String()

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)

sm := store.NewStore(tstore.store, storePath)

Expand Down
2 changes: 1 addition & 1 deletion tests/integration/proxy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func TestProxyListening(t *testing.T) {
}
}()

storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)

sm := store.NewStore(tstore.store, storePath)

Expand Down
8 changes: 4 additions & 4 deletions tests/integration/standby_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func TestInitStandbyCluster(t *testing.T) {
defer ptstore.Stop()

primaryStoreEndpoints := fmt.Sprintf("%s:%s", ptstore.listenAddress, ptstore.port)
pStorePath := filepath.Join(common.StoreBasePath, primaryClusterName)
pStorePath := filepath.Join(common.StorePrefix, primaryClusterName)
psm := store.NewStore(ptstore.store, pStorePath)

initialClusterSpec := &cluster.ClusterSpec{
Expand Down Expand Up @@ -94,7 +94,7 @@ func TestInitStandbyCluster(t *testing.T) {
defer tstore.Stop()

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

pgpass, err := ioutil.TempFile(dir, "pgpass")
Expand Down Expand Up @@ -174,7 +174,7 @@ func TestPromoteStandbyCluster(t *testing.T) {
defer ptstore.Stop()

primaryStoreEndpoints := fmt.Sprintf("%s:%s", ptstore.listenAddress, ptstore.port)
pStorePath := filepath.Join(common.StoreBasePath, primaryClusterName)
pStorePath := filepath.Join(common.StorePrefix, primaryClusterName)
psm := store.NewStore(ptstore.store, pStorePath)

initialClusterSpec := &cluster.ClusterSpec{
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestPromoteStandbyCluster(t *testing.T) {
defer tstore.Stop()

storeEndpoints := fmt.Sprintf("%s:%s", tstore.listenAddress, tstore.port)
storePath := filepath.Join(common.StoreBasePath, clusterName)
storePath := filepath.Join(common.StorePrefix, clusterName)
sm := store.NewStore(tstore.store, storePath)

pgpass, err := ioutil.TempFile(dir, "pgpass")
Expand Down

0 comments on commit f4a7881

Please sign in to comment.