merge ringpop services #5080

Merged
27 changes: 14 additions & 13 deletions common/membership/ringpop/factory.go
@@ -115,30 +115,31 @@ func (factory *factory) getMonitor() *monitor {
factory.monOnce.Do(func() {
ctx, cancel := context.WithTimeout(context.Background(), persistenceOperationTimeout)
defer cancel()
- ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo)

+ ctx = headers.SetCallerInfo(ctx, headers.SystemBackgroundCallerInfo)
currentClusterMetadata, err := factory.MetadataManager.GetCurrentClusterMetadata(ctx)
if err != nil {
factory.Logger.Fatal("Failed to get current cluster ID", tag.Error(err))
}

appName := "temporal"
if currentClusterMetadata.UseClusterIdMembership {
appName = fmt.Sprintf("temporal-%s", currentClusterMetadata.GetClusterId())
}
- if rp, err := ringpop.New(appName, ringpop.Channel(factory.getTChannel()), ringpop.AddressResolverFunc(factory.broadcastAddressResolver)); err != nil {
+ rp, err := ringpop.New(appName, ringpop.Channel(factory.getTChannel()), ringpop.AddressResolverFunc(factory.broadcastAddressResolver))
+ if err != nil {
factory.Logger.Fatal("Failed to get new ringpop", tag.Error(err))
- } else {
- mrp := newService(rp, factory.Config.MaxJoinDuration, factory.Logger)
-
- factory.monitor = newMonitor(
- factory.ServiceName,
- factory.ServicePortMap,
- mrp,
- factory.Logger,
- factory.MetadataManager,
- factory.broadcastAddressResolver,
- )
}

+ factory.monitor = newMonitor(
+ factory.ServiceName,
+ factory.ServicePortMap,
+ rp,
+ factory.Logger,
+ factory.MetadataManager,
+ factory.broadcastAddressResolver,
+ factory.Config.MaxJoinDuration,
+ )
})

return factory.monitor
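The reshuffle of the ringpop.New call above is mostly a scoping change: with `if rp, err := ringpop.New(...); err != nil { ... } else { ... }`, rp only exists inside the if/else, so the monitor had to be constructed in the else branch. Declaring rp first lets the tail of the function use it in straight-line code, now that newMonitor absorbs what newService used to wrap. A minimal sketch of the before/after pattern, with newThing standing in for ringpop.New (all names here are illustrative):

```go
package main

import "fmt"

func newThing() (string, error) { return "thing", nil }

func main() {
	// Before: rp is scoped to the if/else, so every use must live in the else branch.
	if rp, err := newThing(); err != nil {
		fmt.Println("fatal:", err)
	} else {
		fmt.Println("usable only here:", rp)
	}

	// After: declare first, bail out on error, then use rp anywhere below.
	rp, err := newThing()
	if err != nil {
		fmt.Println("fatal:", err)
		return
	}
	fmt.Println("usable in straight-line code:", rp)
}
```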
70 changes: 53 additions & 17 deletions common/membership/ringpop/monitor.go
@@ -35,27 +35,32 @@ import (
"sync"
"time"

"github.com/pborman/uuid"
"github.com/temporalio/ringpop-go"
"github.com/temporalio/ringpop-go/discovery/statichosts"
"github.com/temporalio/ringpop-go/swim"

"go.temporal.io/server/common"
"go.temporal.io/server/common/backoff"
"go.temporal.io/server/common/config"
"go.temporal.io/server/common/convert"
"go.temporal.io/server/common/future"
"go.temporal.io/server/common/headers"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/primitives"

"github.com/pborman/uuid"

"go.temporal.io/server/common/persistence"

"go.temporal.io/server/common"
"go.temporal.io/server/common/log"
"go.temporal.io/server/common/log/tag"
"go.temporal.io/server/common/membership"
"go.temporal.io/server/common/persistence"
"go.temporal.io/server/common/primitives"
)

const (
upsertMembershipRecordExpiryDefault = time.Hour * 48

// 10 second base reporting frequency + 5 second jitter + 5 second acceptable time skew
healthyHostLastHeartbeatCutoff = time.Second * 20

+ // Number of times we retry refreshing the bootstrap list and try to join the Ringpop cluster before giving up
+ maxBootstrapRetries = 5
)

type monitor struct {
@@ -67,7 +72,8 @@ type monitor struct {

serviceName primitives.ServiceName
services config.ServicePortMap
- rp *service
+ rp *ringpop.Ringpop
+ maxJoinDuration time.Duration
rings map[primitives.ServiceName]*serviceResolver
logger log.Logger
metadataManager persistence.ClusterMetadataManager
@@ -82,10 +88,11 @@ var _ membership.Monitor = (*monitor)(nil)
func newMonitor(
serviceName primitives.ServiceName,
services config.ServicePortMap,
- rp *service,
+ rp *ringpop.Ringpop,
logger log.Logger,
metadataManager persistence.ClusterMetadataManager,
broadcastHostPortResolver func() (string, error),
+ maxJoinDuration time.Duration,
) *monitor {
lifecycleCtx, lifecycleCancel := context.WithCancel(context.Background())
lifecycleCtx = headers.SetCallerInfo(
@@ -108,6 +115,7 @@ func newMonitor(
broadcastHostPortResolver: broadcastHostPortResolver,
hostID: uuid.NewUUID(),
initialized: future.NewFuture[struct{}](),
+ maxJoinDuration: maxJoinDuration,
}
for service, port := range services {
rpo.rings[service] = newServiceResolver(service, port, rp, logger)
@@ -135,15 +143,12 @@ func (rpo *monitor) Start() {
// TODO - Note this presents a small race condition as we write our identity before we bootstrap ringpop.
// This is a current limitation of the current structure of the ringpop library as
// we must know our seed nodes before bootstrapping
- err = rpo.startHeartbeat(broadcastAddress)
- if err != nil {
+
+ if err = rpo.startHeartbeat(broadcastAddress); err != nil {
rpo.logger.Fatal("unable to initialize membership heartbeats", tag.Error(err))
}

- err = rpo.rp.start(
- func() ([]string, error) { return rpo.fetchCurrentBootstrapHostports() },
- healthyHostLastHeartbeatCutoff/2)
- if err != nil {
+ if err = rpo.bootstrapRingPop(); err != nil {
// Stop() called during Start()'s execution. This is ok
if strings.Contains(err.Error(), "destroyed while attempting to join") {
return
@@ -174,6 +179,37 @@ func (rpo *monitor) Start() {
rpo.initialized.Set(struct{}{}, nil)
}

+ // bootstrapRingPop bootstraps the ringpop service by discovering the bootstrap hosts and joining the ringpop cluster.
+ func (rpo *monitor) bootstrapRingPop() error {
+ policy := backoff.NewExponentialRetryPolicy(healthyHostLastHeartbeatCutoff / 2).
+ WithBackoffCoefficient(1).
+ WithMaximumAttempts(maxBootstrapRetries)
+ op := func() error {
+ hostPorts, err := rpo.fetchCurrentBootstrapHostports()
+ if err != nil {
+ return err
+ }
+
+ bootParams := &swim.BootstrapOptions{
+ ParallelismFactor: 10,
+ JoinSize: 1,
+ MaxJoinDuration: rpo.maxJoinDuration,
+ DiscoverProvider: statichosts.New(hostPorts...),
+ }
+
+ _, err = rpo.rp.Bootstrap(bootParams)
+ if err != nil {
+ rpo.logger.Warn("unable to bootstrap ringpop. retrying", tag.Error(err))
+ }
+ return err
+ }
+
+ if err := backoff.ThrottleRetry(op, policy, nil); err != nil {
+ return fmt.Errorf("exhausted all retries: %w", err)
+ }
+ return nil
+ }

func (rpo *monitor) WaitUntilInitialized(ctx context.Context) error {
_, err := rpo.initialized.Get(ctx)
return err
@@ -354,7 +390,7 @@ func (rpo *monitor) Stop() {
ring.Stop()
}

- rpo.rp.stop()
+ rpo.rp.Destroy()
}

func (rpo *monitor) EvictSelf() error {
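For context on bootstrapRingPop above: WithBackoffCoefficient(1) makes the "exponential" policy degenerate to a fixed interval, so the join is retried every healthyHostLastHeartbeatCutoff/2 (10 seconds), at most maxBootstrapRetries (5) times, and the nil IsRetryable argument to ThrottleRetry retries on every error. A runnable sketch of that retry shape, assuming only the go.temporal.io/server/common/backoff calls that appear in this diff (the failing operation is illustrative):

```go
package main

import (
	"fmt"
	"time"

	"go.temporal.io/server/common/backoff"
)

func main() {
	// Coefficient 1 keeps the interval flat: 10s, 10s, 10s, ... up to 5 attempts.
	policy := backoff.NewExponentialRetryPolicy(10 * time.Second).
		WithBackoffCoefficient(1).
		WithMaximumAttempts(5)

	attempts := 0
	op := func() error {
		attempts++
		if attempts < 3 {
			return fmt.Errorf("bootstrap not ready (attempt %d)", attempts) // retried after 10s
		}
		return nil // success stops the retry loop
	}

	// A nil IsRetryable predicate treats every error as retryable, as in the PR.
	if err := backoff.ThrottleRetry(op, policy, nil); err != nil {
		fmt.Println("exhausted all retries:", err) // only reached if all attempts fail
	}
}
```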
133 changes: 0 additions & 133 deletions common/membership/ringpop/ringpop.go

This file was deleted.

8 changes: 6 additions & 2 deletions common/membership/ringpop/service_resolver.go
@@ -62,10 +62,14 @@ const (
replicaPoints = 100
)

+ type membershipManager interface {
+ AddListener()
+ }
+
type serviceResolver struct {
service primitives.ServiceName
port int
- rp *service
+ rp *ringpop.Ringpop
refreshChan chan struct{}
shutdownCh chan struct{}
shutdownWG sync.WaitGroup
@@ -86,7 +90,7 @@ var _ membership.ServiceResolver = (*serviceResolver)(nil)
func newServiceResolver(
service primitives.ServiceName,
port int,
- rp *service,
+ rp *ringpop.Ringpop,
logger log.Logger,
) *serviceResolver {
resolver := &serviceResolver{
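The service_resolver.go hunk above also introduces a one-method membershipManager interface alongside the switch to a bare *ringpop.Ringpop. A hedged sketch of the decoupling such a narrow interface allows; the fake below is illustrative and not part of the PR:

```go
package main

import "fmt"

// membershipManager mirrors the one-method interface from the hunk above:
// a resolver only needs to register for membership-change callbacks.
type membershipManager interface {
	AddListener()
}

// fakeManager is an illustrative test double, not part of the PR.
type fakeManager struct{ listeners int }

func (f *fakeManager) AddListener() { f.listeners++ }

func main() {
	var mgr membershipManager = &fakeManager{}
	mgr.AddListener() // code under test can register without a real ringpop instance
	fmt.Println("registered listeners:", mgr.(*fakeManager).listeners)
}
```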
4 changes: 2 additions & 2 deletions common/membership/ringpop/test_cluster.go
@@ -156,15 +156,15 @@ func newTestCluster(
logger.Error("failed to create ringpop instance", tag.Error(err))
return nil
}
- rpWrapper := newService(ringPop, time.Second*2, logger)
_, port, _ := splitHostPortTyped(cluster.hostAddrs[i])
cluster.rings[i] = newMonitor(
serviceName,
config.ServicePortMap{serviceName: int(port)}, // use same port for "grpc" port
- rpWrapper,
+ ringPop,
logger,
mockMgr,
resolver,
+ 2*time.Second,
)
cluster.rings[i].Start()
}
2 changes: 0 additions & 2 deletions proto/buf.yaml
@@ -1,6 +1,4 @@
version: v1beta1
- deps:
- - buf.build/googleapis/googleapis
Member: 🙁 I think I needed to add this to get proto generation to work correctly on a branch

Contributor Author: That's weird: it broke suddenly in CI for me :/

build:
roots:
- internal