Skip to content

Commit

Permalink
Remove HA config from router. Fixes #2566
Browse files Browse the repository at this point in the history
  • Loading branch information
plorenz committed Jan 6, 2025
1 parent f3b692c commit 6191645
Show file tree
Hide file tree
Showing 25 changed files with 531 additions and 171 deletions.
91 changes: 91 additions & 0 deletions common/config/value.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
Copyright NetFoundry Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
https://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package config

import (
"github.com/openziti/foundation/v2/concurrenz"
"sync"
)

type Listener[T any] interface {
NotifyChanged(init bool, old T, new T)
}

type ListenerFunc[T any] func(init bool, old T, new T)

func (f ListenerFunc[T]) NotifyChanged(init bool, old T, new T) {
f(init, old, new)
}

func NewConfigValue[T comparable]() *Value[T] {
return &Value[T]{
notifyInitialized: make(chan struct{}),
}
}

type Value[T comparable] struct {
lock sync.Mutex
initialized bool
notifyInitialized chan struct{}
value concurrenz.AtomicValue[T]
listeners concurrenz.CopyOnWriteSlice[Listener[T]]
}

func (self *Value[T]) Store(value T) {
self.lock.Lock()
defer self.lock.Unlock()

first := !self.initialized
old := self.value.Swap(value)

if first || old != value {
for _, l := range self.listeners.Value() {
l.NotifyChanged(first, old, value)
}
}

if first {
self.initialized = true
close(self.notifyInitialized)
}
}

func (self *Value[T]) Load() T {
return self.value.Load()
}

func (self *Value[T]) AddListener(listener Listener[T]) {
self.lock.Lock()
defer self.lock.Unlock()

self.listeners.Append(listener)

if self.initialized {
listener.NotifyChanged(true, self.Load(), self.Load())
}
}

func (self *Value[T]) RemoveListener(listener Listener[T]) {
self.lock.Lock()
defer self.lock.Unlock()

self.listeners.Delete(listener)
}

func (self *Value[T]) GetInitNotifyChannel() <-chan struct{} {
return self.notifyInitialized
}
112 changes: 79 additions & 33 deletions common/router_data_model.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"encoding/json"
"fmt"
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/michaelquigley/pfxlog"
"github.com/openziti/foundation/v2/concurrenz"
"github.com/openziti/ziti/common/pb/edge_ctrl_pb"
Expand All @@ -49,36 +50,36 @@ type DataStateIdentity = edge_ctrl_pb.DataState_Identity
type Identity struct {
*DataStateIdentity
ServicePolicies map[string]struct{} `json:"servicePolicies"`
IdentityIndex uint64
ServiceSetIndex uint64
identityIndex uint64
serviceSetIndex uint64
}

type DataStateConfigType = edge_ctrl_pb.DataState_ConfigType

type ConfigType struct {
*DataStateConfigType
Index uint64
index uint64
}

type DataStateConfig = edge_ctrl_pb.DataState_Config

type Config struct {
*DataStateConfig
Index uint64
index uint64
}

type DataStateService = edge_ctrl_pb.DataState_Service

type Service struct {
*DataStateService
Index uint64
index uint64
}

type DataStatePostureCheck = edge_ctrl_pb.DataState_PostureCheck

type PostureCheck struct {
*DataStatePostureCheck
Index uint64
index uint64
}

type DataStateServicePolicy = edge_ctrl_pb.DataState_ServicePolicy
Expand Down Expand Up @@ -354,14 +355,14 @@ func (rdm *RouterDataModel) HandleIdentityEvent(index uint64, event *edge_ctrl_p
identity = &Identity{
DataStateIdentity: model.Identity,
ServicePolicies: map[string]struct{}{},
IdentityIndex: index,
identityIndex: index,
}
} else {
identity = &Identity{
DataStateIdentity: model.Identity,
ServicePolicies: valueInMap.ServicePolicies,
IdentityIndex: index,
ServiceSetIndex: valueInMap.ServiceSetIndex,
identityIndex: index,
serviceSetIndex: valueInMap.serviceSetIndex,
}
}
return identity
Expand All @@ -381,10 +382,13 @@ func (rdm *RouterDataModel) HandleIdentityEvent(index uint64, event *edge_ctrl_p
func (rdm *RouterDataModel) HandleServiceEvent(index uint64, event *edge_ctrl_pb.DataState_Event, model *edge_ctrl_pb.DataState_Event_Service) {
if event.Action == edge_ctrl_pb.DataState_Delete {
rdm.Services.Remove(model.Service.Id)
rdm.ServicePolicies.IterCb(func(key string, v *ServicePolicy) {
delete(v.Services, model.Service.Id)
})
} else {
rdm.Services.Set(model.Service.Id, &Service{
DataStateService: model.Service,
Index: index,
index: index,
})
}
}
Expand All @@ -398,7 +402,7 @@ func (rdm *RouterDataModel) HandleConfigTypeEvent(index uint64, event *edge_ctrl
} else {
rdm.ConfigTypes.Set(model.ConfigType.Id, &ConfigType{
DataStateConfigType: model.ConfigType,
Index: index,
index: index,
})
}
}
Expand All @@ -412,12 +416,12 @@ func (rdm *RouterDataModel) HandleConfigEvent(index uint64, event *edge_ctrl_pb.
} else {
rdm.Configs.Set(model.Config.Id, &Config{
DataStateConfig: model.Config,
Index: index,
index: index,
})
}
}

func (rdm *RouterDataModel) applyUpdateServicePolicyEvent(event *edge_ctrl_pb.DataState_Event, model *edge_ctrl_pb.DataState_Event_ServicePolicy) {
func (rdm *RouterDataModel) applyUpdateServicePolicyEvent(model *edge_ctrl_pb.DataState_Event_ServicePolicy) {
servicePolicy := model.ServicePolicy
rdm.ServicePolicies.Upsert(servicePolicy.Id, nil, func(exist bool, valueInMap *ServicePolicy, newValue *ServicePolicy) *ServicePolicy {
if valueInMap == nil {
Expand All @@ -436,7 +440,7 @@ func (rdm *RouterDataModel) applyUpdateServicePolicyEvent(event *edge_ctrl_pb.Da
})
}

func (rdm *RouterDataModel) applyDeleteServicePolicyEvent(_ *edge_ctrl_pb.DataState_Event, model *edge_ctrl_pb.DataState_Event_ServicePolicy) {
func (rdm *RouterDataModel) applyDeleteServicePolicyEvent(model *edge_ctrl_pb.DataState_Event_ServicePolicy) {
rdm.ServicePolicies.Remove(model.ServicePolicy.Id)
}

Expand All @@ -447,11 +451,11 @@ func (rdm *RouterDataModel) HandleServicePolicyEvent(event *edge_ctrl_pb.DataSta
pfxlog.Logger().WithField("policyId", model.ServicePolicy.Id).WithField("action", event.Action).Debug("applying service policy event")
switch event.Action {
case edge_ctrl_pb.DataState_Create:
rdm.applyUpdateServicePolicyEvent(event, model)
rdm.applyUpdateServicePolicyEvent(model)
case edge_ctrl_pb.DataState_Update:
rdm.applyUpdateServicePolicyEvent(event, model)
rdm.applyUpdateServicePolicyEvent(model)
case edge_ctrl_pb.DataState_Delete:
rdm.applyDeleteServicePolicyEvent(event, model)
rdm.applyDeleteServicePolicyEvent(model)
}
}

Expand All @@ -464,7 +468,7 @@ func (rdm *RouterDataModel) HandlePostureCheckEvent(index uint64, event *edge_ct
} else {
rdm.PostureChecks.Set(model.PostureCheck.Id, &PostureCheck{
DataStatePostureCheck: model.PostureCheck,
Index: index,
index: index,
})
}
}
Expand Down Expand Up @@ -509,7 +513,7 @@ func (rdm *RouterDataModel) HandleServicePolicyChange(index uint64, model *edge_
} else {
delete(valueInMap.ServicePolicies, model.PolicyId)
}
valueInMap.ServiceSetIndex = index
valueInMap.serviceSetIndex = index
}
return valueInMap
})
Expand Down Expand Up @@ -593,7 +597,9 @@ func (rdm *RouterDataModel) recalculateCachedPublicKeys() {
func (rdm *RouterDataModel) GetDataState() *edge_ctrl_pb.DataState {
var events []*edge_ctrl_pb.DataState_Event

rdm.EventCache.WhileLocked(func(_ uint64, _ bool) {
var index uint64
rdm.EventCache.WhileLocked(func(currentIndex uint64, _ bool) {
index = currentIndex
rdm.ConfigTypes.IterCb(func(key string, v *ConfigType) {
newEvent := &edge_ctrl_pb.DataState_Event{
Action: edge_ctrl_pb.DataState_Create,
Expand Down Expand Up @@ -717,7 +723,8 @@ func (rdm *RouterDataModel) GetDataState() *edge_ctrl_pb.DataState {
})

return &edge_ctrl_pb.DataState{
Events: events,
Events: events,
EndIndex: index,
}
}

Expand Down Expand Up @@ -1005,18 +1012,48 @@ func (rdm *RouterDataModel) Diff(o *RouterDataModel, sink DiffSink) {
return
}

diffType("configType", rdm.ConfigTypes, o.ConfigTypes, sink)
diffType("config", rdm.Configs, o.Configs, sink)
diffType("identity", rdm.Identities, o.Identities, sink)
diffType("service", rdm.Services, o.Services, sink)
diffType("service-policy", rdm.ServicePolicies, o.ServicePolicies, sink)
diffType("posture-check", rdm.PostureChecks, o.PostureChecks, sink)
diffType("public-keys", rdm.PublicKeys, o.PublicKeys, sink)
diffType("revocations", rdm.Revocations, o.Revocations, sink)
diffType("cached-public-keys", rdm.getPublicKeysAsCmap(), o.getPublicKeysAsCmap(), sink)
diffType("configType", rdm.ConfigTypes, o.ConfigTypes, sink, ConfigType{}, DataStateConfigType{})
diffType("config", rdm.Configs, o.Configs, sink, Config{}, DataStateConfig{})
diffType("identity", rdm.Identities, o.Identities, sink, Identity{}, DataStateIdentity{})
diffType("service", rdm.Services, o.Services, sink, Service{}, DataStateService{})
diffType("service-policy", rdm.ServicePolicies, o.ServicePolicies, sink, ServicePolicy{}, DataStateServicePolicy{})
diffType("posture-check", rdm.PostureChecks, o.PostureChecks, sink, PostureCheck{}, DataStatePostureCheck{})
diffType("public-keys", rdm.PublicKeys, o.PublicKeys, sink, edge_ctrl_pb.DataState_PublicKey{})
diffType("revocations", rdm.Revocations, o.Revocations, sink, edge_ctrl_pb.DataState_Revocation{})
diffMaps("cached-public-keys", rdm.getPublicKeysAsCmap(), o.getPublicKeysAsCmap(), sink, func(a, b crypto.PublicKey) []string {
if a == nil || b == nil {
return []string{fmt.Sprintf("cached public key is nil: orig: %v, dest: %v", a, a)}
}
return nil
})
}

type diffF[T any] func(a, b T) []string

func diffMaps[T any](entityType string, m1, m2 cmap.ConcurrentMap[string, T], sink DiffSink, differ diffF[T]) {
hasMissing := false
m1.IterCb(func(key string, v T) {
v2, exists := m2.Get(key)
if !exists {
sink(entityType, key, DiffTypeSub, "entity missing")
hasMissing = true
} else {
for _, diff := range differ(v, v2) {
sink(entityType, key, DiffTypeMod, diff)
}
}
})

if m1.Count() != m2.Count() || hasMissing {
m2.IterCb(func(key string, v2 T) {
if _, exists := m1.Get(key); !exists {
sink(entityType, key, DiffTypeAdd, "entity unexpected")
}
})
}
}

func diffType[T any](entityType string, m1 cmap.ConcurrentMap[string, T], m2 cmap.ConcurrentMap[string, T], sink DiffSink) {
func diffType[P any, T *P](entityType string, m1 cmap.ConcurrentMap[string, T], m2 cmap.ConcurrentMap[string, T], sink DiffSink, ignoreTypes ...any) {
diffReporter := &compareReporter{
f: func(key string, detail string) {
sink(entityType, key, DiffTypeMod, detail)
Expand All @@ -1032,7 +1069,7 @@ func diffType[T any](entityType string, m1 cmap.ConcurrentMap[string, T], m2 cma
hasMissing = true
} else {
diffReporter.key = key
cmp.Diff(v, v2, adapter)
cmp.Diff(v, v2, cmpopts.IgnoreUnexported(ignoreTypes...), adapter)
}
})

Expand Down Expand Up @@ -1065,7 +1102,16 @@ func (self *compareReporter) Report(result cmp.Result) {
}
if step != nil {
vx, vy := step.Values()
err := fmt.Sprintf("%s mismatch. orig: %s, copy: %s", path.String(), vx.String(), vy.String())
var x any
var y any

if vx.IsValid() {
x = vx.Interface()
}
if vy.IsValid() {
y = vy.Interface()
}
err := fmt.Sprintf("%s mismatch. orig: %v, copy: %v", path.String(), x, y)
self.f(self.key, err)
} else {
self.f(self.key, "programming error, empty path stack")
Expand Down
Loading

0 comments on commit 6191645

Please sign in to comment.