Skip to content

Commit

Permalink
Don't retry NATS by default
Browse files Browse the repository at this point in the history
  • Loading branch information
nosuchtim committed Dec 26, 2023
1 parent 0bf4b58 commit 9adb05d
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 10 deletions.
2 changes: 1 addition & 1 deletion VERSION
Original file line number Diff line number Diff line change
@@ -1 +1 @@
7.90
7.91
86 changes: 77 additions & 9 deletions kit/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,11 @@ import (

// VizNats xxx
type VizNats struct {
natsConn *nats.Conn
natsConn *nats.Conn
enabled bool
isConnected bool
doReconnect bool
attempts int
}

// TheNats is the only one
Expand All @@ -20,6 +24,9 @@ var TheNats *VizNats
var time0 = time.Now()

func NatsApi(cmd string) (result string, err error) {
if !TheNats.enabled {
return "", fmt.Errorf("NatsAPI: NATS not enabled")
}
err = TheNats.Connect()
if err != nil {
LogIfError(err)
Expand All @@ -33,13 +40,20 @@ func NatsApi(cmd string) (result string, err error) {

// PublishFromEngine sends an asynchronous message via NATS
func PublishFromEngine(subject string, msg string) {
if !TheNats.enabled {
// silent, but perhaps you could log it every once in a while
return
}
fullsubject := fmt.Sprintf("from_engine.%s.%s", Hostname(), subject)
err := TheNats.Publish(fullsubject, msg)
LogIfError(err)
}

// PublishCursorEvent xxx
func PublishCursorEvent(ce CursorEvent) {
if !TheNats.enabled {
return // silent
}
dt := time.Since(time0)
regionvalue := ""
if ce.Tag != "" {
Expand All @@ -61,6 +75,9 @@ func PublishCursorEvent(ce CursorEvent) {

// PublishMIDIDeviceEvent xxx
func PublishMIDIDeviceEvent(me MidiEvent) {
if !TheNats.enabled {
return // silent
}
dt := time.Since(time0)
// NOTE: we ignore the Timestamp on the MIDIDeviceEvent
// and use our own, so the timestamps are consistent with
Expand All @@ -77,6 +94,9 @@ func PublishMIDIDeviceEvent(me MidiEvent) {

// PublishSpriteEvent xxx
func PublishSpriteEvent(x, y, z float32) {
if !TheNats.enabled {
return // silent
}
params := "{ " +
"\"nuid\": \"" + MyNUID() + "\", " +
"\"x\": \"" + fmt.Sprintf("%f", x) + "\", " +
Expand All @@ -89,7 +109,11 @@ func PublishSpriteEvent(x, y, z float32) {
// NewNats xxx
func NewNats() *VizNats {
return &VizNats{
natsConn: nil,
natsConn: nil,
enabled: true,
isConnected: false,
doReconnect: false,
attempts: 0,
}
}

Expand All @@ -101,12 +125,21 @@ func (vn *VizNats) Disconnect() {
// Connect xxx
func (vn *VizNats) Connect() error {

if !TheNats.enabled {
return fmt.Errorf("VizNats.Connect: called when NATS not enabled")
}
if vn.natsConn != nil {
// Already connected
LogInfo("VisNats.Connect: Already connected!")
return nil
}
LogInfo("VisNats.Connect: about to try to connect")
if !vn.doReconnect && vn.attempts > 0 {
err := fmt.Errorf("VisNats.Connect: doReconnect is false, not attempting another connect")
LogError(err)
return err
}
vn.attempts++
LogInfo("VisNats.Connect: about to try to connect", "attempt", vn.attempts)
user := os.Getenv("NATS_USER")
password := os.Getenv("NATS_PASSWORD")
url := os.Getenv("NATS_URL")
Expand All @@ -115,19 +148,22 @@ func (vn *VizNats) Connect() error {
}
fullurl := fmt.Sprintf("%s:%s@%s", user, password, url)

var userCreds = "" // User Credentials File

// Connect Options.
opts := []nats.Option{nats.Name("Palette hostwin Subscriber")}
opts = setupConnOptions(opts)

// Use UserCredentials
var userCreds = "" // User Credentials File
if userCreds != "" {
opts = append(opts, nats.UserCredentials(userCreds))
}

// Keep reconnecting forever
opts = append(opts, nats.MaxReconnects(-1))
reconnects := -1 // Keep reconnecting forever
if ! vn.doReconnect {
LogInfo("VizNats.Connect: will not attempt to reconnect")
reconnects = 0
}
opts = append(opts, nats.MaxReconnects(reconnects))

// Connect to NATS
nc, err := nats.Connect(fullurl, opts...)
Expand Down Expand Up @@ -163,6 +199,9 @@ func natsRequestHandler(msg *nats.Msg) {

// Request is used for APIs - it blocks waiting for a response and returns the response
func (vn *VizNats) Request(subj, data string, timeout time.Duration) (retdata string, err error) {
if !TheNats.enabled {
return "", fmt.Errorf("VizNats.Request: called when NATS not enabled")
}
LogOfType("nats", "VizNats.Request", "subject", subj, "data", data)
nc := vn.natsConn
if nc == nil {
Expand All @@ -181,6 +220,10 @@ func (vn *VizNats) Request(subj, data string, timeout time.Duration) (retdata st
// Publish xxx
func (vn *VizNats) Publish(subj string, msg string) error {

if !TheNats.enabled {
return fmt.Errorf("VizNats.Publish: called when NATS not enabled")
}

nc := vn.natsConn
if nc == nil {
return fmt.Errorf("Viznats.Publish: no NATS connection, subject=%s", subj)
Expand All @@ -202,6 +245,10 @@ func (vn *VizNats) Publish(subj string, msg string) error {
// Subscribe xxx
func (vn *VizNats) Subscribe(subj string, callback nats.MsgHandler) error {

if !TheNats.enabled {
return fmt.Errorf("VizNats.Subscribe: called when NATS not enabled")
}

LogInfo("VizNats.Subscribe", "subject", subj)

nc := vn.natsConn
Expand All @@ -216,9 +263,21 @@ func (vn *VizNats) Subscribe(subj string, callback nats.MsgHandler) error {
}

func (vn *VizNats) Close() {
if !vn.enabled {
LogError(fmt.Errorf("VisNats.Close: called with NATS not enabled"))
return
}
if vn.doReconnect && vn.attempts > 0 {
LogInfo("VizNats.CLose called, should NOT attempt another connection, NATS is being disabled")
vn.enabled = false
return
}
if vn.natsConn != nil {
vn.natsConn.Close()
vn.natsConn = nil
LogError(fmt.Errorf("VizNats.CLose called"))
} else {
LogError(fmt.Errorf("VizNats.CLose called when natsConn is nil"))
}
}

Expand Down Expand Up @@ -274,14 +333,23 @@ func setupConnOptions(opts []nats.Option) []nats.Option {
opts = append(opts, nats.ReconnectWait(reconnectDelay))
opts = append(opts, nats.MaxReconnects(int(totalWait/reconnectDelay)))
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
LogWarn("nats.Disconnected", "rr", err, "waitminutes", totalWait.Minutes())
TheNats.natsConn = nil
TheNats.enabled = TheNats.doReconnect
LogWarn("nats.Disconnected",
"err", err,
"waitminutes", totalWait.Minutes(),
"doReconnect", TheNats.doReconnect)
}))
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
LogWarn("nats.Reconnected", "connecturl", nc.ConnectedUrl())
}))
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
LogWarn("nats.ClosedHandler, Exiting", "lasterror", nc.LastError())
TheNats.natsConn = nil
TheNats.enabled = TheNats.doReconnect
LogWarn("nats.ClosedHandler",
"lasterror", nc.LastError(),
"doReconnect", TheNats.doReconnect)

}))
return opts
}
Binary file added release/palette_7.91_win_setup.exe
Binary file not shown.

0 comments on commit 9adb05d

Please sign in to comment.