Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HA SDK terminators test #2527

Merged
merged 3 commits into from
Nov 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,25 @@
# Release 1.2.1

## What's New

* Bug fixes and continuing progress on controller HA

## Component Updates and Bug Fixes

* github.com/openziti/agent: [v1.0.19 -> v1.0.20](https://github.com/openziti/agent/compare/v1.0.19...v1.0.20)
* github.com/openziti/channel/v3: [v3.0.10 -> v3.0.13](https://github.com/openziti/channel/compare/v3.0.10...v3.0.13)
* github.com/openziti/foundation/v2: [v2.0.50 -> v2.0.52](https://github.com/openziti/foundation/compare/v2.0.50...v2.0.52)
* github.com/openziti/identity: [v1.0.88 -> v1.0.90](https://github.com/openziti/identity/compare/v1.0.88...v1.0.90)
* github.com/openziti/metrics: [v1.2.59 -> v1.2.61](https://github.com/openziti/metrics/compare/v1.2.59...v1.2.61)
* github.com/openziti/runzmd: [v1.0.53 -> v1.0.55](https://github.com/openziti/runzmd/compare/v1.0.53...v1.0.55)
* github.com/openziti/storage: [v0.3.2 -> v0.3.6](https://github.com/openziti/storage/compare/v0.3.2...v0.3.6)
* github.com/openziti/transport/v2: [v2.0.150 -> v2.0.153](https://github.com/openziti/transport/compare/v2.0.150...v2.0.153)
* github.com/openziti/ziti: [v1.2.0 -> v1.2.1](https://github.com/openziti/ziti/compare/v1.2.0...v1.2.1)
* [Issue #2532](https://github.com/openziti/ziti/issues/2532) - When adding an existing HA cluster member, remove/add if suffrage has changed
* [Issue #2217](https://github.com/openziti/ziti/issues/2217) - Controller list is empty until peers connect
* [Issue #2533](https://github.com/openziti/ziti/issues/2533) - Handle concurrent raft connections
* [Issue #2528](https://github.com/openziti/ziti/issues/2528) - Updated router costs are not use when evaluating current path cost in the context of smart rerouting

# Release 1.2.0

## What's New
Expand Down
16 changes: 13 additions & 3 deletions common/pb/cmd_pb/cmd.pb.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions common/pb/cmd_pb/cmd.proto
Original file line number Diff line number Diff line change
Expand Up @@ -123,4 +123,5 @@ message Terminator {
string hostId = 12;
bool isSystem = 13;
uint32 savedPrecedence = 14;
string sourceCtrl = 15;
}
31 changes: 30 additions & 1 deletion common/pb/ctrl_pb/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@

package ctrl_pb

import "github.com/openziti/ziti/controller/xt"
import (
"github.com/openziti/channel/v3/protobufs"
"github.com/openziti/ziti/controller/xt"
)

func (request *CircuitConfirmation) GetContentType() int32 {
return int32(ContentType_CircuitConfirmationType)
Expand Down Expand Up @@ -107,3 +110,29 @@ func (request *UpdateCtrlAddresses) GetContentType() int32 {
func (request *PeerStateChanges) GetContentType() int32 {
return int32(ContentType_PeerStateChangeRequestType)
}

type FilterableValidateTerminatorsRequest interface {
protobufs.TypedMessage
FilterTerminators(f func(terminator *Terminator) bool)
GetTerminators() []*Terminator
}

func (request *ValidateTerminatorsRequest) FilterTerminators(f func(terminator *Terminator) bool) {
var terminators []*Terminator
for _, terminator := range request.Terminators {
if f(terminator) {
terminators = append(terminators, terminator)
}
}
request.Terminators = terminators
}

func (request *ValidateTerminatorsV2Request) FilterTerminators(f func(terminator *Terminator) bool) {
var terminators []*Terminator
for _, terminator := range request.Terminators {
if f(terminator) {
terminators = append(terminators, terminator)
}
}
request.Terminators = terminators
}
50 changes: 40 additions & 10 deletions common/router_data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package common

import (
"compress/gzip"
"crypto"
"crypto/x509"
"encoding/json"
"fmt"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
cmap "github.com/orcaman/concurrent-map/v2"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -93,14 +96,15 @@ type RouterDataModel struct {
EventCache
listeners map[chan *edge_ctrl_pb.DataState_ChangeSet]struct{}

ConfigTypes cmap.ConcurrentMap[string, *ConfigType] `json:"configTypes"`
Configs cmap.ConcurrentMap[string, *Config] `json:"configs"`
Identities cmap.ConcurrentMap[string, *Identity] `json:"identities"`
Services cmap.ConcurrentMap[string, *Service] `json:"services"`
ServicePolicies cmap.ConcurrentMap[string, *ServicePolicy] `json:"servicePolicies"`
PostureChecks cmap.ConcurrentMap[string, *PostureCheck] `json:"postureChecks"`
PublicKeys cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_PublicKey] `json:"publicKeys"`
Revocations cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_Revocation] `json:"revocations"`
ConfigTypes cmap.ConcurrentMap[string, *ConfigType] `json:"configTypes"`
Configs cmap.ConcurrentMap[string, *Config] `json:"configs"`
Identities cmap.ConcurrentMap[string, *Identity] `json:"identities"`
Services cmap.ConcurrentMap[string, *Service] `json:"services"`
ServicePolicies cmap.ConcurrentMap[string, *ServicePolicy] `json:"servicePolicies"`
PostureChecks cmap.ConcurrentMap[string, *PostureCheck] `json:"postureChecks"`
PublicKeys cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_PublicKey] `json:"publicKeys"`
Revocations cmap.ConcurrentMap[string, *edge_ctrl_pb.DataState_Revocation] `json:"revocations"`
CachedPublicKeys concurrenz.AtomicValue[map[string]crypto.PublicKey]

listenerBufferSize uint
lastSaveIndex *uint64
Expand Down Expand Up @@ -433,6 +437,7 @@ func (rdm *RouterDataModel) HandlePublicKeyEvent(event *edge_ctrl_pb.DataState_E
} else {
rdm.PublicKeys.Set(model.PublicKey.Kid, model.PublicKey)
}
rdm.recalculateCachedPublicKeys()
}

// HandleRevocationEvent will apply the delta event to the router data model. It is not restricted by index calculations.
Expand Down Expand Up @@ -507,8 +512,33 @@ func (rdm *RouterDataModel) HandleServicePolicyChange(index uint64, model *edge_
})
}

func (rdm *RouterDataModel) GetPublicKeys() map[string]*edge_ctrl_pb.DataState_PublicKey {
return rdm.PublicKeys.Items()
func (rdm *RouterDataModel) GetPublicKeys() map[string]crypto.PublicKey {
return rdm.CachedPublicKeys.Load()
}

func (rdm *RouterDataModel) recalculateCachedPublicKeys() {
publicKeys := map[string]crypto.PublicKey{}
rdm.PublicKeys.IterCb(func(kid string, pubKey *edge_ctrl_pb.DataState_PublicKey) {
log := pfxlog.Logger().WithField("format", pubKey.Format).WithField("kid", kid)

switch pubKey.Format {
case edge_ctrl_pb.DataState_PublicKey_X509CertDer:
if cert, err := x509.ParseCertificate(pubKey.GetData()); err != nil {
log.WithError(err).Error("error parsing x509 certificate DER")
} else {
publicKeys[kid] = cert.PublicKey
}
case edge_ctrl_pb.DataState_PublicKey_PKIXPublicKey:
if pub, err := x509.ParsePKIXPublicKey(pubKey.GetData()); err != nil {
log.WithError(err).Error("error parsing PKIX public key DER")
} else {
publicKeys[kid] = pub
}
default:
log.Error("unknown public key format")
}
})
rdm.CachedPublicKeys.Store(publicKeys)
}

func (rdm *RouterDataModel) GetDataState() *edge_ctrl_pb.DataState {
Expand Down
5 changes: 5 additions & 0 deletions controller/command/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ type Dispatcher interface {
Dispatch(command Command) error
IsLeaderOrLeaderless() bool
IsLeaderless() bool
IsLeader() bool
GetPeers() map[string]channel.Channel
GetRateLimiter() rate.RateLimiter
Bootstrap() error
Expand All @@ -67,6 +68,10 @@ func (self *LocalDispatcher) Bootstrap() error {
return nil
}

func (self *LocalDispatcher) IsLeader() bool {
return true
}

func (self *LocalDispatcher) IsLeaderOrLeaderless() bool {
return true
}
Expand Down
12 changes: 8 additions & 4 deletions controller/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ const (
DefaultHealthChecksBoltCheckTimeout = 20 * time.Second
DefaultHealthChecksBoltCheckInitialDelay = 30 * time.Second

DefaultRaftCommandHandlerMaxQueueSize = 1000
DefaultRaftCommandHandlerMaxQueueSize = 250

// DefaultTlsHandshakeRateLimiterEnabled is whether the tls handshake rate limiter is enabled by default
DefaultTlsHandshakeRateLimiterEnabled = false
Expand Down Expand Up @@ -204,6 +204,10 @@ func LoadConfig(path string) (*Config, error) {
if value, found := cfgmap["raft"]; found {
if submap, ok := value.(map[interface{}]interface{}); ok {
controllerConfig.Raft = &RaftConfig{}

controllerConfig.Raft.ElectionTimeout = 5 * time.Second
controllerConfig.Raft.HeartbeatTimeout = 3 * time.Second
controllerConfig.Raft.LeaderLeaseTimeout = 3 * time.Second
controllerConfig.Raft.CommandHandlerOptions.MaxQueueSize = DefaultRaftCommandHandlerMaxQueueSize

if value, found := submap["dataDir"]; found {
Expand Down Expand Up @@ -243,23 +247,23 @@ func LoadConfig(path string) (*Config, error) {

if value, found := submap["electionTimeout"]; found {
if val, err := time.ParseDuration(fmt.Sprintf("%v", value)); err == nil {
controllerConfig.Raft.ElectionTimeout = &val
controllerConfig.Raft.ElectionTimeout = val
} else {
return nil, errors.Wrapf(err, "failed to parse raft.electionTimeout value '%v", value)
}
}

if value, found := submap["heartbeatTimeout"]; found {
if val, err := time.ParseDuration(fmt.Sprintf("%v", value)); err == nil {
controllerConfig.Raft.HeartbeatTimeout = &val
controllerConfig.Raft.HeartbeatTimeout = val
} else {
return nil, errors.Wrapf(err, "failed to parse raft.heartbeatTimeout value '%v", value)
}
}

if value, found := submap["leaderLeaseTimeout"]; found {
if val, err := time.ParseDuration(fmt.Sprintf("%v", value)); err == nil {
controllerConfig.Raft.LeaderLeaseTimeout = &val
controllerConfig.Raft.LeaderLeaseTimeout = val
} else {
return nil, errors.Wrapf(err, "failed to parse raft.leaderLeaseTimeout value '%v", value)
}
Expand Down
6 changes: 3 additions & 3 deletions controller/config/config_raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,10 @@ type RaftConfig struct {
TrailingLogs *uint32
MaxAppendEntries *uint32

ElectionTimeout *time.Duration
ElectionTimeout time.Duration
CommitTimeout *time.Duration
HeartbeatTimeout *time.Duration
LeaderLeaseTimeout *time.Duration
HeartbeatTimeout time.Duration
LeaderLeaseTimeout time.Duration

LogLevel *string
Logger hclog.Logger
Expand Down
2 changes: 1 addition & 1 deletion controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -540,7 +540,7 @@ func (c *Controller) GetEventDispatcher() event.Dispatcher {
}

func (c *Controller) routerDispatchCallback(evt *event.ClusterEvent) {
if evt.EventType == event.ClusterMembersChanged {
if evt.EventType == event.ClusterMembersChanged || evt.EventType == event.ClusterLeadershipGained {
var endpoints []string
for _, peer := range evt.Peers {
endpoints = append(endpoints, peer.Addr)
Expand Down
16 changes: 8 additions & 8 deletions controller/db/controller_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,12 @@ const (

type Controller struct {
boltz.BaseExtEntity
Name string `json:"name"`
CtrlAddress string `json:"address"`
CertPem string `json:"certPem"`
Fingerprint string `json:"fingerprint"`
IsOnline bool `json:"isOnline"`
LastJoinedAt *time.Time `json:"lastJoinedAt"`
Name string `json:"name"`
CtrlAddress string `json:"address"`
CertPem string `json:"certPem"`
Fingerprint string `json:"fingerprint"`
IsOnline bool `json:"isOnline"`
LastJoinedAt time.Time `json:"lastJoinedAt"`
ApiAddresses map[string][]ApiAddress
}

Expand Down Expand Up @@ -105,7 +105,7 @@ func (store *controllerStoreImpl) FillEntity(entity *Controller, bucket *boltz.T
entity.CertPem = bucket.GetStringOrError(FieldControllerCertPem)
entity.Fingerprint = bucket.GetStringOrError(FieldControllerFingerprint)
entity.IsOnline = bucket.GetBoolWithDefault(FieldControllerIsOnline, false)
entity.LastJoinedAt = bucket.GetTime(FieldControllerLastJoinedAt)
entity.LastJoinedAt = bucket.GetTimeOrError(FieldControllerLastJoinedAt)
entity.ApiAddresses = map[string][]ApiAddress{}

apiListBucket := bucket.GetBucket(FieldControllerApiAddresses)
Expand Down Expand Up @@ -142,7 +142,7 @@ func (store *controllerStoreImpl) PersistEntity(entity *Controller, ctx *boltz.P
ctx.SetString(FieldControllerCertPem, entity.CertPem)
ctx.SetString(FieldControllerFingerprint, entity.Fingerprint)
ctx.SetBool(FieldControllerIsOnline, entity.IsOnline)
ctx.SetTimeP(FieldControllerLastJoinedAt, entity.LastJoinedAt)
ctx.SetTimeP(FieldControllerLastJoinedAt, &entity.LastJoinedAt)

apiListBucket := ctx.Bucket.GetOrCreateBucket(FieldControllerApiAddresses)

Expand Down
11 changes: 10 additions & 1 deletion controller/db/terminator_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ package db
import (
"encoding/binary"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/ziti/controller/xt"
"github.com/openziti/foundation/v2/sequence"
"github.com/openziti/storage/ast"
"github.com/openziti/storage/boltz"
"github.com/openziti/ziti/controller/xt"
"go.etcd.io/bbolt"
)

Expand All @@ -39,6 +39,7 @@ const (
FieldServerPeerData = "peerData"
FieldTerminatorHostId = "hostId"
FieldTerminatorSavedPrecedence = "savedPrecedence"
FieldTerminatorsSourceCtrl = "sourceCtrl"
)

type Terminator struct {
Expand All @@ -54,6 +55,7 @@ type Terminator struct {
PeerData xt.PeerData `json:"peerData"`
HostId string `json:"hostId"`
SavedPrecedence *string `json:"savedPrecedence"`
SourceCtrl string `json:"sourceCtrl"`
}

func (entity *Terminator) GetCost() uint16 {
Expand Down Expand Up @@ -100,6 +102,10 @@ func (entity *Terminator) GetEntityType() string {
return EntityTypeTerminators
}

func (entity *Terminator) GetSourceCtrl() string {
return entity.SourceCtrl
}

type TerminatorStore interface {
boltz.EntityStore[*Terminator]
GetTerminatorsInIdentityGroup(tx *bbolt.Tx, terminatorId string) ([]*Terminator, error)
Expand Down Expand Up @@ -133,6 +139,7 @@ func (store *terminatorStoreImpl) initializeLocal() {
store.AddSymbol(FieldTerminatorAddress, ast.NodeTypeString)
store.AddSymbol(FieldTerminatorInstanceId, ast.NodeTypeString)
store.AddSymbol(FieldTerminatorHostId, ast.NodeTypeString)
store.AddSymbol(FieldTerminatorsSourceCtrl, ast.NodeTypeString)

store.serviceSymbol = store.AddFkSymbol(FieldTerminatorService, store.stores.service)
store.routerSymbol = store.AddFkSymbol(FieldTerminatorRouter, store.stores.router)
Expand Down Expand Up @@ -164,6 +171,7 @@ func (store *terminatorStoreImpl) FillEntity(entity *Terminator, bucket *boltz.T
entity.Precedence = bucket.GetStringWithDefault(FieldTerminatorPrecedence, xt.Precedences.Default.String())
entity.HostId = bucket.GetStringWithDefault(FieldTerminatorHostId, "")
entity.SavedPrecedence = bucket.GetString(FieldTerminatorSavedPrecedence)
entity.SourceCtrl = bucket.GetStringWithDefault(FieldTerminatorsSourceCtrl, "")

data := bucket.GetBucket(FieldServerPeerData)
if data != nil {
Expand Down Expand Up @@ -201,6 +209,7 @@ func (store *terminatorStoreImpl) PersistEntity(entity *Terminator, ctx *boltz.P
ctx.SetRequiredString(FieldTerminatorPrecedence, entity.Precedence)
ctx.SetString(FieldTerminatorHostId, entity.HostId)
ctx.SetStringP(FieldTerminatorSavedPrecedence, entity.SavedPrecedence)
ctx.SetString(FieldTerminatorsSourceCtrl, entity.SourceCtrl)

if ctx.ProceedWithSet(FieldServerPeerData) {
_ = ctx.Bucket.DeleteBucket([]byte(FieldServerPeerData))
Expand Down
4 changes: 4 additions & 0 deletions controller/env/appenv.go
Original file line number Diff line number Diff line change
Expand Up @@ -413,6 +413,10 @@ func (ae *AppEnv) AddRouterPresenceHandler(h model.RouterPresenceHandler) {
ae.HostController.GetNetwork().AddRouterPresenceHandler(h)
}

func (ae *AppEnv) GetId() string {
return ae.HostController.GetNetwork().GetAppId()
}

type HostController interface {
GetConfig() *config.Config
GetEnv() *AppEnv
Expand Down
Loading
Loading