merge ringpop services (#5080)
**What changed?**

I merged the ringpop `service` with the membership monitor.

**Why?**

It's not a useful abstraction.


**How did you test it?**


**Potential risks**


**Is hotfix candidate?**

No.
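
For context, a rough before/after sketch of the wiring in `factory.getMonitor()`, condensed from the factory.go diff below (all surrounding setup elided): the `*ringpop.Ringpop` handle and `MaxJoinDuration` now go straight to `newMonitor`, with no intermediate `service` value.

```go
// Before (sketch): the ringpop handle was wrapped before handing it to the monitor.
//   mrp := newService(rp, factory.Config.MaxJoinDuration, factory.Logger)
//   factory.monitor = newMonitor(factory.ServiceName, factory.ServicePortMap, mrp,
//       factory.Logger, factory.MetadataManager, factory.broadcastAddressResolver)

// After (sketch): the monitor takes the raw handle and the join duration itself.
rp, err := ringpop.New(appName,
	ringpop.Channel(factory.getTChannel()),
	ringpop.AddressResolverFunc(factory.broadcastAddressResolver),
)
if err != nil {
	factory.Logger.Fatal("Failed to get new ringpop", tag.Error(err))
}

factory.monitor = newMonitor(
	factory.ServiceName,              // primitives.ServiceName
	factory.ServicePortMap,           // config.ServicePortMap
	rp,                               // *ringpop.Ringpop, no wrapper type anymore
	factory.Logger,
	factory.MetadataManager,          // persistence.ClusterMetadataManager
	factory.broadcastAddressResolver, // func() (string, error)
	factory.Config.MaxJoinDuration,   // time.Duration, previously owned by the wrapper
)
```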
tdeebswihart authored Nov 8, 2023
1 parent 5986b4f commit 1da93ba
Showing 6 changed files with 75 additions and 169 deletions.
27 changes: 14 additions & 13 deletions common/membership/ringpop/factory.go
@@ -115,30 +115,31 @@ func (factory *factory) getMonitor() *monitor {
factory.monOnce.Do(func() {
ctx, cancel := context.WithTimeout(context.Background(), persistenceOperationTimeout)
defer cancel()
ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo)

ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo)
currentClusterMetadata, err := factory.MetadataManager.GetCurrentClusterMetadata(ctx)
if err != nil {
factory.Logger.Fatal("Failed to get current cluster ID", tag.Error(err))
}

appName := "temporal"
if currentClusterMetadata.UseClusterIdMembership {
appName = fmt.Sprintf("temporal-%s", currentClusterMetadata.GetClusterId())
}
if rp, err := ringpop.New(appName, ringpop.Channel(factory.getTChannel()), ringpop.AddressResolverFunc(factory.broadcastAddressResolver)); err != nil {
rp, err := ringpop.New(appName, ringpop.Channel(factory.getTChannel()), ringpop.AddressResolverFunc(factory.broadcastAddressResolver))
if err != nil {
factory.Logger.Fatal("Failed to get new ringpop", tag.Error(err))
} else {
mrp := newService(rp, factory.Config.MaxJoinDuration, factory.Logger)

factory.monitor = newMonitor(
factory.ServiceName,
factory.ServicePortMap,
mrp,
factory.Logger,
factory.MetadataManager,
factory.broadcastAddressResolver,
)
}

factory.monitor = newMonitor(
factory.ServiceName,
factory.ServicePortMap,
rp,
factory.Logger,
factory.MetadataManager,
factory.broadcastAddressResolver,
factory.Config.MaxJoinDuration,
)
})

return factory.monitor
70 changes: 53 additions & 17 deletions common/membership/ringpop/monitor.go
@@ -35,27 +35,32 @@ import (
"sync"
"time"

"github.com/pborman/uuid"
"github.com/temporalio/ringpop-go"
"github.com/temporalio/ringpop-go/discovery/statichosts"
"github.com/temporalio/ringpop-go/swim"

"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/future"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/primitives"

"github.com/pborman/uuid"

"go.temporal.io/server/common/persistence"

"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives"
)

const (
upsertMembershipRecordExpiryDefault = time.Hour * 48

// 10 second base reporting frequency + 5 second jitter + 5 second acceptable time skew
healthyHostLastHeartbeatCutoff = time.Second * 20

// Number of times we retry refreshing the bootstrap list and try to join the Ringpop cluster before giving up
maxBootstrapRetries = 5
)

type monitor struct {
@@ -67,7 +72,8 @@ type monitor struct {

serviceName primitives.ServiceName
services config.ServicePortMap
rp *service
rp *ringpop.Ringpop
maxJoinDuration time.Duration
rings map[primitives.ServiceName]*serviceResolver
logger log.Logger
metadataManager persistence.ClusterMetadataManager
@@ -82,10 +88,11 @@ var _ membership.Monitor = (*monitor)(nil)
func newMonitor(
serviceName primitives.ServiceName,
services config.ServicePortMap,
rp *service,
rp *ringpop.Ringpop,
logger log.Logger,
metadataManager persistence.ClusterMetadataManager,
broadcastHostPortResolver func() (string, error),
maxJoinDuration time.Duration,
) *monitor {
lifecycleCtx, lifecycleCancel := context.WithCancel(context.Background())
lifecycleCtx = headers.SetCallerInfo(
@@ -108,6 +115,7 @@ func newMonitor(
broadcastHostPortResolver: broadcastHostPortResolver,
hostID: uuid.NewUUID(),
initialized: future.NewFuture[struct{}](),
maxJoinDuration: maxJoinDuration,
}
for service, port := range services {
rpo.rings[service] = newServiceResolver(service, port, rp, logger)
@@ -135,15 +143,12 @@ func (rpo *monitor) Start() {
// TODO - Note this presents a small race condition as we write our identity before we bootstrap ringpop.
// This is a current limitation of the current structure of the ringpop library as
// we must know our seed nodes before bootstrapping
err = rpo.startHeartbeat(broadcastAddress)
if err != nil {

if err = rpo.startHeartbeat(broadcastAddress); err != nil {
rpo.logger.Fatal("unable to initialize membership heartbeats", tag.Error(err))
}

err = rpo.rp.start(
func() ([]string, error) { return rpo.fetchCurrentBootstrapHostports() },
healthyHostLastHeartbeatCutoff/2)
if err != nil {
if err = rpo.bootstrapRingPop(); err != nil {
// Stop() called during Start()'s execution. This is ok
if strings.Contains(err.Error(), "destroyed while attempting to join") {
return
@@ -174,6 +179,37 @@ func (rpo *monitor) Start() {
rpo.initialized.Set(struct{}{}, nil)
}

// bootstrap ring pop service by discovering the bootstrap hosts and joining the ring pop cluster
func (rpo *monitor) bootstrapRingPop() error {
policy := backoff.NewExponentialRetryPolicy(healthyHostLastHeartbeatCutoff / 2).
WithBackoffCoefficient(1).
WithMaximumAttempts(maxBootstrapRetries)
op := func() error {
hostPorts, err := rpo.fetchCurrentBootstrapHostports()
if err != nil {
return err
}

bootParams := &swim.BootstrapOptions{
ParallelismFactor: 10,
JoinSize: 1,
MaxJoinDuration: rpo.maxJoinDuration,
DiscoverProvider: statichosts.New(hostPorts...),
}

_, err = rpo.rp.Bootstrap(bootParams)
if err != nil {
rpo.logger.Warn("unable to bootstrap ringpop. retrying", tag.Error(err))
}
return err
}

if err := backoff.ThrottleRetry(op, policy, nil); err != nil {
return fmt.Errorf("exhausted all retries: %w", err)
}
return nil
}

func (rpo *monitor) WaitUntilInitialized(ctx context.Context) error {
_, err := rpo.initialized.Get(ctx)
return err
@@ -354,7 +390,7 @@ func (rpo *monitor) Stop() {
ring.Stop()
}

rpo.rp.stop()
rpo.rp.Destroy()
}

func (rpo *monitor) EvictSelf() error {
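A note on the retry shape in the new `bootstrapRingPop` above: with a backoff coefficient of 1, the "exponential" policy is roughly a fixed `healthyHostLastHeartbeatCutoff/2` (10 second) interval, bounded by `maxBootstrapRetries` (5). Below is a small self-contained sketch of that behavior using the same `backoff` helpers; the always-failing op is just a stand-in for a ringpop bootstrap that never manages to join.

```go
package main

import (
	"errors"
	"fmt"
	"time"

	"go.temporal.io/server/common/backoff"
)

func main() {
	// Mirrors bootstrapRingPop: constant 10s interval (coefficient 1), at most 5 attempts.
	policy := backoff.NewExponentialRetryPolicy(10 * time.Second).
		WithBackoffCoefficient(1).
		WithMaximumAttempts(5)

	attempts := 0
	op := func() error {
		attempts++
		return errors.New("join failed") // stand-in for a ringpop Bootstrap error
	}

	// Same call shape as the monitor (nil retryable predicate). Note this sleeps
	// between attempts, so it takes several tens of seconds to finish.
	err := backoff.ThrottleRetry(op, policy, nil)
	fmt.Printf("gave up after %d attempts: %v\n", attempts, err)
}
```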
133 changes: 0 additions & 133 deletions common/membership/ringpop/ringpop.go

This file was deleted.

8 changes: 6 additions & 2 deletions common/membership/ringpop/service_resolver.go
@@ -62,10 +62,14 @@ const (
replicaPoints = 100
)

type membershipManager interface {
AddListener()
}

type serviceResolver struct {
service primitives.ServiceName
port int
rp *service
rp *ringpop.Ringpop
refreshChan chan struct{}
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
@@ -86,7 +90,7 @@ var _ membership.ServiceResolver = (*serviceResolver)(nil)
func newServiceResolver(
service primitives.ServiceName,
port int,
rp *service,
rp *ringpop.Ringpop,
logger log.Logger,
) *serviceResolver {
resolver := &serviceResolver{
4 changes: 2 additions & 2 deletions common/membership/ringpop/test_cluster.go
@@ -156,15 +156,15 @@ func newTestCluster(
logger.Error("failed to create ringpop instance", tag.Error(err))
return nil
}
rpWrapper := newService(ringPop, time.Second*2, logger)
_, port, _ := splitHostPortTyped(cluster.hostAddrs[i])
cluster.rings[i] = newMonitor(
serviceName,
config.ServicePortMap{serviceName: int(port)}, // use same port for "grpc" port
rpWrapper,
ringPop,
logger,
mockMgr,
resolver,
2*time.Second,
)
cluster.rings[i].Start()
}
2 changes: 0 additions & 2 deletions proto/buf.yaml
@@ -1,6 +1,4 @@
version: v1beta1
deps:
- buf.build/googleapis/googleapis
build:
roots:
- internal