Skip to content

Commit

Permalink
HA SDK terminators test. Fixes #2217. Fixes #2533
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Nov 12, 2024
1 parent 361be18 commit 52a2efb
Show file tree
Hide file tree
Showing 52 changed files with 844 additions and 475 deletions.
16 changes: 13 additions & 3 deletions common/pb/cmd_pb/cmd.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/pb/cmd_pb/cmd.proto
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,5 @@ message Terminator {
string hostId = 12;
bool isSystem = 13;
uint32 savedPrecedence = 14;
string sourceCtrl = 15;
}
31 changes: 30 additions & 1 deletion common/pb/ctrl_pb/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package ctrl_pb

import "github.com/openziti/ziti/controller/xt"
import (
"github.com/openziti/channel/v3/protobufs"
"github.com/openziti/ziti/controller/xt"
)

func (request *CircuitConfirmation) GetContentType() int32 {
return int32(ContentType_CircuitConfirmationType)
Expand Down Expand Up @@ -107,3 +110,29 @@ func (request *UpdateCtrlAddresses) GetContentType() int32 {
func (request *PeerStateChanges) GetContentType() int32 {
return int32(ContentType_PeerStateChangeRequestType)
}

type FilterableValidateTerminatorsRequest interface {
protobufs.TypedMessage
FilterTerminators(f func(terminator *Terminator) bool)
GetTerminators() []*Terminator
}

func (request *ValidateTerminatorsRequest) FilterTerminators(f func(terminator *Terminator) bool) {
var terminators []*Terminator
for _, terminator := range request.Terminators {
if f(terminator) {
terminators = append(terminators, terminator)
}
}
request.Terminators = terminators
}

func (request *ValidateTerminatorsV2Request) FilterTerminators(f func(terminator *Terminator) bool) {
var terminators []*Terminator
for _, terminator := range request.Terminators {
if f(terminator) {
terminators = append(terminators, terminator)
}
}
request.Terminators = terminators
}
50 changes: 40 additions & 10 deletions common/router_data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package common

import (
"compress/gzip"
"crypto"
"crypto/x509"
"encoding/json"
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -93,14 +96,15 @@ type RouterDataModel struct {
EventCache
listeners map[chan *edge_ctrl_pb.DataState_ChangeSet]struct{}

ConfigTypes cmap.ConcurrentMap[string, *ConfigType] `json:"configTypes"`
Configs cmap.ConcurrentMap[string, *Config] `json:"configs"`
Identities cmap.ConcurrentMap[string, *Identity] `json:"identities"`
Services cmap.ConcurrentMap[string, *Service] `json:"services"`
ServicePolicies cmap.ConcurrentMap[string, *ServicePolicy] `json:"servicePolicies"`
PostureChecks cmap.ConcurrentMap[string, *PostureCheck] `json:"postureChecks"`
PublicKeys cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_PublicKey] `json:"publicKeys"`
Revocations cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_Revocation] `json:"revocations"`
ConfigTypes cmap.ConcurrentMap[string, *ConfigType] `json:"configTypes"`
Configs cmap.ConcurrentMap[string, *Config] `json:"configs"`
Identities cmap.ConcurrentMap[string, *Identity] `json:"identities"`
Services cmap.ConcurrentMap[string, *Service] `json:"services"`
ServicePolicies cmap.ConcurrentMap[string, *ServicePolicy] `json:"servicePolicies"`
PostureChecks cmap.ConcurrentMap[string, *PostureCheck] `json:"postureChecks"`
PublicKeys cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_PublicKey] `json:"publicKeys"`
Revocations cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_Revocation] `json:"revocations"`
CachedPublicKeys concurrenz.AtomicValue[map[string]crypto.PublicKey]

listenerBufferSize uint
lastSaveIndex *uint64
Expand Down Expand Up @@ -433,6 +437,7 @@ func (rdm *RouterDataModel) HandlePublicKeyEvent(event *edge_ctrl_pb.DataState_E
} else {
rdm.PublicKeys.Set(model.PublicKey.Kid, model.PublicKey)
}
rdm.recalculateCachedPublicKeys()
}

// HandleRevocationEvent will apply the delta event to the router data model. It is not restricted by index calculations.
Expand Down Expand Up @@ -507,8 +512,33 @@ func (rdm *RouterDataModel) HandleServicePolicyChange(index uint64, model *edge_
})
}

func (rdm *RouterDataModel) GetPublicKeys() map[string]*edge_ctrl_pb.DataState_PublicKey {
return rdm.PublicKeys.Items()
func (rdm *RouterDataModel) GetPublicKeys() map[string]crypto.PublicKey {
return rdm.CachedPublicKeys.Load()
}

func (rdm *RouterDataModel) recalculateCachedPublicKeys() {
publicKeys := map[string]crypto.PublicKey{}
rdm.PublicKeys.IterCb(func(kid string, pubKey *edge_ctrl_pb.DataState_PublicKey) {
log := pfxlog.Logger().WithField("format", pubKey.Format).WithField("kid", kid)

switch pubKey.Format {
case edge_ctrl_pb.DataState_PublicKey_X509CertDer:
if cert, err := x509.ParseCertificate(pubKey.GetData()); err != nil {
log.WithError(err).Error("error parsing x509 certificate DER")
} else {
publicKeys[kid] = cert.PublicKey
}
case edge_ctrl_pb.DataState_PublicKey_PKIXPublicKey:
if pub, err := x509.ParsePKIXPublicKey(pubKey.GetData()); err != nil {
log.WithError(err).Error("error parsing PKIX public key DER")
} else {
publicKeys[kid] = pub
}
default:
log.Error("unknown public key format")
}
})
rdm.CachedPublicKeys.Store(publicKeys)
}

func (rdm *RouterDataModel) GetDataState() *edge_ctrl_pb.DataState {
Expand Down
5 changes: 5 additions & 0 deletions controller/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Dispatcher interface {
Dispatch(command Command) error
IsLeaderOrLeaderless() bool
IsLeaderless() bool
IsLeader() bool
GetPeers() map[string]channel.Channel
GetRateLimiter() rate.RateLimiter
Bootstrap() error
Expand All @@ -67,6 +68,10 @@ func (self *LocalDispatcher) Bootstrap() error {
return nil
}

func (self *LocalDispatcher) IsLeader() bool {
return true
}

func (self *LocalDispatcher) IsLeaderOrLeaderless() bool {
return true
}
Expand Down
12 changes: 8 additions & 4 deletions controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
DefaultHealthChecksBoltCheckTimeout = 20 * time.Second
DefaultHealthChecksBoltCheckInitialDelay = 30 * time.Second

DefaultRaftCommandHandlerMaxQueueSize = 1000
DefaultRaftCommandHandlerMaxQueueSize = 250

// DefaultTlsHandshakeRateLimiterEnabled is whether the tls handshake rate limiter is enabled by default
DefaultTlsHandshakeRateLimiterEnabled = false
Expand Down Expand Up @@ -204,6 +204,10 @@ func LoadConfig(path string) (*Config, error) {
if value, found := cfgmap["raft"]; found {
if submap, ok := value.(map[interface{}]interface{}); ok {
controllerConfig.Raft = &RaftConfig{}

controllerConfig.Raft.ElectionTimeout = 5 * time.Second
controllerConfig.Raft.HeartbeatTimeout = 3 * time.Second
controllerConfig.Raft.LeaderLeaseTimeout = 3 * time.Second
controllerConfig.Raft.CommandHandlerOptions.MaxQueueSize = DefaultRaftCommandHandlerMaxQueueSize

if value, found := submap["dataDir"]; found {
Expand Down Expand Up @@ -243,23 +247,23 @@ func LoadConfig(path string) (*Config, error) {

if value, found := submap["electionTimeout"]; found {
if val, err := time.ParseDuration(fmt.Sprintf("%v", value)); err == nil {
controllerConfig.Raft.ElectionTimeout = &val
controllerConfig.Raft.ElectionTimeout = val
} else {
return nil, errors.Wrapf(err, "failed to parse raft.electionTimeout value '%v", value)
}
}

if value, found := submap["heartbeatTimeout"]; found {
if val, err := time.ParseDuration(fmt.Sprintf("%v", value)); err == nil {
controllerConfig.Raft.HeartbeatTimeout = &val
controllerConfig.Raft.HeartbeatTimeout = val
} else {
return nil, errors.Wrapf(err, "failed to parse raft.heartbeatTimeout value '%v", value)
}
}

if value, found := submap["leaderLeaseTimeout"]; found {
if val, err := time.ParseDuration(fmt.Sprintf("%v", value)); err == nil {
controllerConfig.Raft.LeaderLeaseTimeout = &val
controllerConfig.Raft.LeaderLeaseTimeout = val
} else {
return nil, errors.Wrapf(err, "failed to parse raft.leaderLeaseTimeout value '%v", value)
}
Expand Down
6 changes: 3 additions & 3 deletions controller/config/config_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ type RaftConfig struct {
TrailingLogs *uint32
MaxAppendEntries *uint32

ElectionTimeout *time.Duration
ElectionTimeout time.Duration
CommitTimeout *time.Duration
HeartbeatTimeout *time.Duration
LeaderLeaseTimeout *time.Duration
HeartbeatTimeout time.Duration
LeaderLeaseTimeout time.Duration

LogLevel *string
Logger hclog.Logger
Expand Down
2 changes: 1 addition & 1 deletion controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func (c *Controller) GetEventDispatcher() event.Dispatcher {
}

func (c *Controller) routerDispatchCallback(evt *event.ClusterEvent) {
if evt.EventType == event.ClusterMembersChanged {
if evt.EventType == event.ClusterMembersChanged || evt.EventType == event.ClusterLeadershipGained {
var endpoints []string
for _, peer := range evt.Peers {
endpoints = append(endpoints, peer.Addr)
Expand Down
16 changes: 8 additions & 8 deletions controller/db/controller_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ const (

type Controller struct {
boltz.BaseExtEntity
Name string `json:"name"`
CtrlAddress string `json:"address"`
CertPem string `json:"certPem"`
Fingerprint string `json:"fingerprint"`
IsOnline bool `json:"isOnline"`
LastJoinedAt *time.Time `json:"lastJoinedAt"`
Name string `json:"name"`
CtrlAddress string `json:"address"`
CertPem string `json:"certPem"`
Fingerprint string `json:"fingerprint"`
IsOnline bool `json:"isOnline"`
LastJoinedAt time.Time `json:"lastJoinedAt"`
ApiAddresses map[string][]ApiAddress
}

Expand Down Expand Up @@ -105,7 +105,7 @@ func (store *controllerStoreImpl) FillEntity(entity *Controller, bucket *boltz.T
entity.CertPem = bucket.GetStringOrError(FieldControllerCertPem)
entity.Fingerprint = bucket.GetStringOrError(FieldControllerFingerprint)
entity.IsOnline = bucket.GetBoolWithDefault(FieldControllerIsOnline, false)
entity.LastJoinedAt = bucket.GetTime(FieldControllerLastJoinedAt)
entity.LastJoinedAt = bucket.GetTimeOrError(FieldControllerLastJoinedAt)
entity.ApiAddresses = map[string][]ApiAddress{}

apiListBucket := bucket.GetBucket(FieldControllerApiAddresses)
Expand Down Expand Up @@ -142,7 +142,7 @@ func (store *controllerStoreImpl) PersistEntity(entity *Controller, ctx *boltz.P
ctx.SetString(FieldControllerCertPem, entity.CertPem)
ctx.SetString(FieldControllerFingerprint, entity.Fingerprint)
ctx.SetBool(FieldControllerIsOnline, entity.IsOnline)
ctx.SetTimeP(FieldControllerLastJoinedAt, entity.LastJoinedAt)
ctx.SetTimeP(FieldControllerLastJoinedAt, &entity.LastJoinedAt)

apiListBucket := ctx.Bucket.GetOrCreateBucket(FieldControllerApiAddresses)

Expand Down
11 changes: 10 additions & 1 deletion controller/db/terminator_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package db
import (
"encoding/binary"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/ziti/controller/xt"
"github.com/openziti/foundation/v2/sequence"
"github.com/openziti/storage/ast"
"github.com/openziti/storage/boltz"
"github.com/openziti/ziti/controller/xt"
"go.etcd.io/bbolt"
)

Expand All @@ -39,6 +39,7 @@ const (
FieldServerPeerData = "peerData"
FieldTerminatorHostId = "hostId"
FieldTerminatorSavedPrecedence = "savedPrecedence"
FieldTerminatorsSourceCtrl = "sourceCtrl"
)

type Terminator struct {
Expand All @@ -54,6 +55,7 @@ type Terminator struct {
PeerData xt.PeerData `json:"peerData"`
HostId string `json:"hostId"`
SavedPrecedence *string `json:"savedPrecedence"`
SourceCtrl string `json:"sourceCtrl"`
}

func (entity *Terminator) GetCost() uint16 {
Expand Down Expand Up @@ -100,6 +102,10 @@ func (entity *Terminator) GetEntityType() string {
return EntityTypeTerminators
}

func (entity *Terminator) GetSourceCtrl() string {
return entity.SourceCtrl
}

type TerminatorStore interface {
boltz.EntityStore[*Terminator]
GetTerminatorsInIdentityGroup(tx *bbolt.Tx, terminatorId string) ([]*Terminator, error)
Expand Down Expand Up @@ -133,6 +139,7 @@ func (store *terminatorStoreImpl) initializeLocal() {
store.AddSymbol(FieldTerminatorAddress, ast.NodeTypeString)
store.AddSymbol(FieldTerminatorInstanceId, ast.NodeTypeString)
store.AddSymbol(FieldTerminatorHostId, ast.NodeTypeString)
store.AddSymbol(FieldTerminatorsSourceCtrl, ast.NodeTypeString)

store.serviceSymbol = store.AddFkSymbol(FieldTerminatorService, store.stores.service)
store.routerSymbol = store.AddFkSymbol(FieldTerminatorRouter, store.stores.router)
Expand Down Expand Up @@ -164,6 +171,7 @@ func (store *terminatorStoreImpl) FillEntity(entity *Terminator, bucket *boltz.T
entity.Precedence = bucket.GetStringWithDefault(FieldTerminatorPrecedence, xt.Precedences.Default.String())
entity.HostId = bucket.GetStringWithDefault(FieldTerminatorHostId, "")
entity.SavedPrecedence = bucket.GetString(FieldTerminatorSavedPrecedence)
entity.SourceCtrl = bucket.GetStringWithDefault(FieldTerminatorsSourceCtrl, "")

data := bucket.GetBucket(FieldServerPeerData)
if data != nil {
Expand Down Expand Up @@ -201,6 +209,7 @@ func (store *terminatorStoreImpl) PersistEntity(entity *Terminator, ctx *boltz.P
ctx.SetRequiredString(FieldTerminatorPrecedence, entity.Precedence)
ctx.SetString(FieldTerminatorHostId, entity.HostId)
ctx.SetStringP(FieldTerminatorSavedPrecedence, entity.SavedPrecedence)
ctx.SetString(FieldTerminatorsSourceCtrl, entity.SourceCtrl)

if ctx.ProceedWithSet(FieldServerPeerData) {
_ = ctx.Bucket.DeleteBucket([]byte(FieldServerPeerData))
Expand Down
4 changes: 4 additions & 0 deletions controller/env/appenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ func (ae *AppEnv) AddRouterPresenceHandler(h model.RouterPresenceHandler) {
ae.HostController.GetNetwork().AddRouterPresenceHandler(h)
}

func (ae *AppEnv) GetId() string {
return ae.HostController.GetNetwork().GetAppId()
}

type HostController interface {
GetConfig() *config.Config
GetEnv() *AppEnv
Expand Down
Loading

0 comments on commit 52a2efb

Please sign in to comment.