Skip to content

Commit

Permalink
Move startup to event based arch
Browse files Browse the repository at this point in the history
  • Loading branch information
NHAS committed Apr 28, 2024
1 parent 6ec5dff commit fa4bde5
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 90 deletions.
119 changes: 80 additions & 39 deletions commands/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"os"
"os/signal"
"strings"
"sync"
"syscall"

"github.com/NHAS/wag/internal/config"
Expand Down Expand Up @@ -97,15 +98,87 @@ func (g *start) Check() error {

}

// teardown stops the services this node may have started: it calls
// router.TearDown(false) first and server.TearDown second. The meaning of
// the false argument is defined by router.TearDown.
func teardown() {
router.TearDown(false)
// Tear down the Unix socket owned by the server package.
server.TearDown()

}

// clusterState returns a cluster-health listener that starts and stops this
// node's services as the reported state changes.
//
// noIptables is negated and passed to router.Setup as its second argument.
// errorChan is handed to the services that are started and also receives any
// startup failure. After a failed start the node is left marked as not
// running, so a later "healthy" notification attempts the startup again.
//
// The returned function serialises calls with a mutex so that overlapping
// notifications cannot interleave a setup with a teardown. Note that the send
// on errorChan happens while that mutex is held.
func clusterState(noIptables bool, errorChan chan<- error) func(string) {
	var (
		lck       sync.Mutex
		wasDead   = true // nothing is running until the first "healthy" state
		lastState string
	)

	return func(stateText string) {
		lck.Lock()
		defer lck.Unlock()

		// Log transitions only, not every repeated notification.
		if lastState != stateText {
			log.Println("node entered state: ", stateText)
			lastState = stateText
		}

		switch stateText {
		case "dead":
			// Only tear down if we were at one point alive.
			if wasDead {
				return
			}

			log.Println("Tearing down node")
			teardown()
			log.Println("Tear down complete")

			wasDead = true

		case "healthy":
			if !wasDead {
				return
			}

			// Witness nodes skip service startup but are still marked alive.
			if !config.Values.Clustering.Witness {
				if err := startNodeServices(noIptables, errorChan); err != nil {
					errorChan <- err
					return
				}
			}

			wasDead = false
		}
	}
}

// startNodeServices starts the router, the control socket, the webserver and
// the management web server, in that order, and stops at the first failure.
// The returned error wraps the underlying cause with %w so callers can still
// inspect it with errors.Is or errors.As.
func startNodeServices(noIptables bool, errorChan chan<- error) error {
	if err := router.Setup(errorChan, !noIptables); err != nil {
		return fmt.Errorf("unable to start router: %w", err)
	}

	if err := server.StartControlSocket(); err != nil {
		return fmt.Errorf("unable to create control socket: %w", err)
	}

	if err := webserver.Start(errorChan); err != nil {
		return fmt.Errorf("unable to start webserver: %w", err)
	}

	if err := ui.StartWebServer(errorChan); err != nil {
		return fmt.Errorf("unable to start management web server: %w", err)
	}

	return nil
}

func (g *start) Run() error {

var err error
defer func() {
data.TearDown()
}()
defer data.TearDown()

errorChan := make(chan error)

_, err = data.RegisterClusterHealthListener(clusterState(g.noIptables, errorChan))
if err != nil {
return err
}

if config.Values.Clustering.Witness {
log.Println("this node is a witness, and will not start a wireguard device")
}
Expand All @@ -114,40 +187,6 @@ func (g *start) Run() error {
log.Println("Node has successfully joined cluster! This node is currently a learner, and needs to be promoted in the UI before wireguard device will start")
}

if !config.Values.Clustering.Witness && !data.IsLearner() {

err = router.Setup(errorChan, !g.noIptables)
if err != nil {
return fmt.Errorf("unable to start router: %v", err)
}
defer func() {
if !(strings.Contains(err.Error(), "listen unix") && strings.Contains(err.Error(), "address already in use")) {
router.TearDown(false)
}
}()

err = server.StartControlSocket()
if err != nil {
return fmt.Errorf("unable to create control socket: %v", err)
}
defer func() {

if !(strings.Contains(err.Error(), "listen unix") && strings.Contains(err.Error(), "address already in use")) {
server.TearDown()
}
}()

err = webserver.Start(errorChan)
if err != nil {
return fmt.Errorf("unable to start webserver: %v", err)
}

err = ui.StartWebServer(errorChan)
if err != nil {
return fmt.Errorf("unable to start management web server: %v", err)
}
}

go func() {
cancel := make(chan os.Signal, 1)
signal.Notify(cancel, syscall.SIGTERM, syscall.SIGINT, os.Interrupt, syscall.SIGQUIT)
Expand All @@ -159,9 +198,10 @@ func (g *start) Run() error {
os.Exit(1)
}(cancel)

errorChan <- errors.New("ignore me I am signal")

log.Printf("Got signal %s gracefully exiting\n", s)

errorChan <- errors.New("ignore me I am signal")
}()

wagType := "Wag"
Expand All @@ -173,9 +213,10 @@ func (g *start) Run() error {
wagType += " Learner"
}

log.Printf("%s started successfully, Ctrl + C to stop", wagType)
log.Printf("%s starting, Ctrl + C to stop", wagType)

err = <-errorChan
teardown()
if err != nil && !strings.Contains(err.Error(), "ignore me I am signal") {
return err
}
Expand Down
39 changes: 23 additions & 16 deletions internal/data/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ var (
clusterHealthListeners = map[string]func(string){}

EventsQueue = queue.NewQueue(40)

checkState chan bool
exit chan bool
)

func RegisterEventListener[T any](path string, isPrefix bool, f func(key string, current, previous T, et EventType) error) (string, error) {
Expand Down Expand Up @@ -147,7 +150,7 @@ func RegisterClusterHealthListener(f func(status string)) (string, error) {

key, err := generateRandomBytes(16)
if err != nil {
return "", nil
return "", err
}

clusterHealthListeners[key] = f
Expand Down Expand Up @@ -176,31 +179,35 @@ func checkClusterHealth() {
for {

select {

case <-exit:
notifyClusterHealthListeners("dead")
return
case <-etcdServer.Server.LeaderChangedNotify():
notifyHealthy()

case <-time.After(1 * time.Second):
if etcdServer == nil {
return
}

leader := etcdServer.Server.Leader()
if leader == 0 {
notifyClusterHealthListeners("electing")
<-time.After(etcdServer.Server.Cfg.ElectionTimeout() * 2)
leader = etcdServer.Server.Leader()
if leader == 0 {
notifyClusterHealthListeners("dead")
} else {
notifyHealthy()
}
}
testState()

}

}
}

// testState checks whether the etcd server currently reports a leader and
// notifies the cluster health listeners accordingly.
//
// If a leader is present it hands off to notifyHealthy. If not, it reports
// "electing", waits for twice the election timeout and checks again: still
// having no leader is reported as "dead", otherwise notifyHealthy runs.
func testState() {
	if etcdServer.Server.Leader() != 0 {
		notifyHealthy()
		return
	}

	notifyClusterHealthListeners("electing")

	// Allow two election timeouts for a leader to emerge before giving up.
	gracePeriod := etcdServer.Server.Cfg.ElectionTimeout() * 2
	time.Sleep(gracePeriod)

	if etcdServer.Server.Leader() == 0 {
		notifyClusterHealthListeners("dead")
		return
	}

	// A leader appeared during the grace period.
	notifyHealthy()
}

func notifyHealthy() {
if etcdServer.Server.IsLearner() {
notifyClusterHealthListeners("learner")
Expand Down
1 change: 1 addition & 0 deletions internal/router/bpf.go
Original file line number Diff line number Diff line change
Expand Up @@ -561,6 +561,7 @@ func RefreshConfiguration() []error {
// SetInactivityTimeout applies a new inactivity timeout (in minutes, going by
// the parameter name). It holds lock for the duration of the call and
// delegates the work to setInactivityTimeout, returning that helper's error.
func SetInactivityTimeout(inactivityTimeoutMinutes int) error {
lock.Lock()
defer lock.Unlock()

return setInactivityTimeout(inactivityTimeoutMinutes)
}

Expand Down
35 changes: 0 additions & 35 deletions internal/router/statemachine.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,6 @@ func handleEvents(erroChan chan<- error) {
return
}

_, err = data.RegisterClusterHealthListener(clusterState(erroChan))
if err != nil {
erroChan <- err
return
}

_, err = data.RegisterEventListener(data.InactivityTimeoutKey, true, inactivityTimeoutChanges)
if err != nil {
erroChan <- err
Expand Down Expand Up @@ -196,32 +190,3 @@ func groupChanges(key string, current []string, previous []string, et data.Event
}
return nil
}

// clusterState returns a cluster-health listener for the router.
//
// On the first "dead" state it calls TearDown(false). On a "healthy" state
// that follows a teardown it calls Setup(errorsChan, true). If that Setup
// fails, the error is logged and sent on errorsChan, and the router stays
// marked as torn down. Every notification is logged.
func clusterState(errorsChan chan<- error) func(string) {
	// tornDown records whether a "dead" state has already triggered TearDown.
	var tornDown bool

	return func(stateText string) {
		log.Println("entered state: ", stateText)

		if stateText == "dead" && !tornDown {
			tornDown = true
			log.Println("Cluster has entered dead state, tearing down: ", tornDown)
			TearDown(false)
			log.Println("cluster finished tearing down")
			return
		}

		if stateText == "healthy" && tornDown {
			if err := Setup(errorsChan, true); err != nil {
				log.Println("was unable to return wag member to healthy state, dying: ", err)
				errorsChan <- err
				return
			}

			tornDown = false
		}
	}
}

0 comments on commit fa4bde5

Please sign in to comment.