Skip to content

Commit

Permalink
NATS is now always attempted
Browse files Browse the repository at this point in the history
  • Loading branch information
nosuchtim committed Dec 25, 2023
1 parent 5dce5ad commit 9e704a6
Show file tree
Hide file tree
Showing 13 changed files with 82 additions and 90 deletions.
3 changes: 1 addition & 2 deletions data/config/paramdefs.json
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@
"misc.volstyle": {"valuetype":"string", "min":"volstyle", "max":"volstyle", "init":"pressure", "comment":"# Velocity style" },

"global.attractenabled": {"valuetype":"bool", "min":"false", "max":"true", "init":"false", "comment":"# enable attract mode" },
"global.attractidlesecs": {"valuetype":"int", "min":"10", "max":"3600", "init":"0", "comment":"# Attract timeout" },
"global.attractidlesecs": {"valuetype":"int", "min":"20", "max":"3600", "init":"40", "comment":"# Attract timeout" },
"global.attractchangeinterval": {"valuetype":"int", "min":"10", "max":"3600", "init":"30", "comment":"# Attract change interval" },
"global.attractgesturenumsteps": {"valuetype":"int", "min":"1", "max":"256", "init":"32", "comment":"#" },
"global.attractgestureduration": {"valuetype": "float", "min": "0.1", "max": "20.0", "init": "2.0", "comment": "# in seconds" },
Expand Down Expand Up @@ -196,7 +196,6 @@
"global.mmtt_xexpand": {"valuetype": "float", "min": "0.5", "max": "2.0", "init": "1.25", "comment": "#" },
"global.mmtt_yexpand": {"valuetype": "float", "min": "0.5", "max": "2.0", "init": "1.25", "comment": "#" },
"global.mmtt_zexpand": {"valuetype": "float", "min": "0.5", "max": "10.0", "init": "4.0", "comment": "#" },
"global.nats": {"valuetype":"bool", "min":"false", "max":"true", "init":"false", "comment":"#" },
"global.notifygui": {"valuetype":"bool", "min":"false", "max":"true", "init":"true", "comment":"#" },
"global.obspath": {"valuetype":"string", "min":"", "max":"", "init":"C:/Program Files/obs-studio/bin/64bit/obs64.exe", "comment":"# OBS executable path" },
"global.obsstream": {"valuetype":"bool", "min":"false", "max":"true", "init":"false", "comment":"# turn on OBS streaming" },
Expand Down
7 changes: 5 additions & 2 deletions kit/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ func InitEngine() {

TheEngine = e

EngineSubscribeNats()

for name := range(ParamDefs) {
if strings.HasPrefix(name,"global.") {
ActivateGlobalParam(name)
Expand All @@ -106,11 +108,12 @@ func EngineSubscribeNats() {
err := TheNats.Connect()
LogIfError(err)
if err == nil {
subscribeTo := "to_palette.>"
subscribeTo := fmt.Sprintf("to_palette.%s.>",Hostname())
err = TheNats.Subscribe(subscribeTo, natsRequestHandler)
LogIfError(err)
}
}

func EngineCloseNats() {
TheNats.Disconnect()
}
Expand Down Expand Up @@ -406,7 +409,7 @@ func (e *Engine) NewRecordingPath() (string, error) {
if err != nil {
if os.IsNotExist(err) {
// Try to create it
LogInfo("NewRecordingPath: Creating %s", recdir)
LogInfo("NewRecordingPath: Creating", "recdir", recdir)
err = os.MkdirAll(recdir, os.FileMode(0777))
}
if err != nil {
Expand Down
40 changes: 21 additions & 19 deletions kit/engineapi.go
Original file line number Diff line number Diff line change
Expand Up @@ -292,9 +292,9 @@ func GetInt(value string, i *int64) bool {
func ActivateGlobalParam(name string) {
val, err := GetParam(name)
LogIfError(err)
err = ApplyGlobalParam(name,val)
err = ApplyGlobalParam(name, val)
LogIfError(err)

}

func ApplyGlobalParam(name string, value string) (err error) {
Expand All @@ -313,14 +313,16 @@ func ApplyGlobalParam(name string, value string) (err error) {

if strings.HasPrefix(name, "global.process.") {
process := strings.TrimPrefix(name, "global.process.")
if IsTrueValue(value) {
err = TheProcessManager.StartRunning(process)
} else {
err = TheProcessManager.StopRunning(process)
}
if err != nil {
LogError(err)
return err
if TheProcessManager.IsAvailable(process) {
if IsTrueValue(value) {
err = TheProcessManager.StartRunning(process)
} else {
err = TheProcessManager.StopRunning(process)
}
if err != nil {
LogError(err)
return err
}
}
}

Expand Down Expand Up @@ -358,13 +360,13 @@ func ApplyGlobalParam(name string, value string) (err error) {
}

case "global.looping_override":
LogInfo("global.looping_override needs handling")
LogOfType("loop","global.looping_override needs handling")

case "global.looping_fade":
LogInfo("global.looping_fade needs handling")
LogOfType("loop","global.looping_fade needs handling")

case "global.looping_beats":
LogInfo("global.looping_beats needs handling")
LogOfType("loop","global.looping_beats needs handling")

case "global.midithru":
TheRouter.midithru = IsTrueValue(value)
Expand Down Expand Up @@ -400,12 +402,12 @@ func ApplyGlobalParam(name string, value string) (err error) {
TheProcessManager.processCheckSecs = f
}

case "global.nats":
if IsTrueValue(value) {
EngineSubscribeNats()
} else {
EngineCloseNats()
}
// case "global.nats":
// if IsTrueValue(value) {
// EngineSubscribeNats()
// } else {
// EngineCloseNats()
// }

case "global.obsstream":
if IsTrueValue(value) {
Expand Down
2 changes: 1 addition & 1 deletion kit/logic.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (logic *PatchLogic) cursorToNoteOn(ce CursorEvent) *NoteOn {
velocity := logic.cursorToVelocity(ce)
pitch, err := logic.cursorToPitch(ce)
if err != nil {
LogIfError(fmt.Errorf("cursorToNoteOn: no pitch for cursor, ce=%v", ce))
LogError(fmt.Errorf("cursorToNoteOn: no pitch for cursor"),"ce",ce)
return nil
}
LogOfType("cursor", "cursorToNoteOn", "pitch", pitch, "velocity", velocity)
Expand Down
7 changes: 3 additions & 4 deletions kit/misc.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,7 +270,7 @@ func IsTrueValue(value string) bool {
case "off":
return false
default:
LogIfError(fmt.Errorf("IsTrueValue: invalid boolean value (%s), assuming false", value))
LogError(fmt.Errorf("IsTrueValue: invalid boolean value"), "value", value)
return false
}
}
Expand All @@ -297,13 +297,12 @@ func (w *FileWriter) Write(p []byte) (n int, err error) {
s = string(p)
}
// Hack to avoid logging things that resolume always logs
if w.Exe == "resolume" && (
strings.Contains(s, "Logging input channels") ||
if w.Exe == "resolume" && (strings.Contains(s, "Logging input channels") ||
strings.Contains(s, "Logging output channels") ||
strings.Contains(s, "Logging midi input devices") ||
strings.Contains(s, "Internal MIDI") ||
strings.Contains(s, "Sensel MIDI") ||
strings.Contains(s, "Could not find preset") ) {
strings.Contains(s, "Could not find preset")) {
// don't log it
} else {
LogInfo("ExecutableOutput", "exe", w.Exe, "output", s)
Expand Down
80 changes: 26 additions & 54 deletions kit/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package kit
import (
"encoding/json"
"fmt"
"log"
"os"
"time"

Expand Down Expand Up @@ -32,9 +31,10 @@ func NatsApi(cmd string) (result string, err error) {
return retdata, err
}

// Publish sends an asynchronous message via NATS
func Publish(subject string, msg string) {
err := TheNats.Publish(subject, msg)
// PublishFromEngine sends an asynchronous message via NATS
func PublishFromEngine(subject string, msg string) {
fullsubject := fmt.Sprintf("from_engine.%s.%s", Hostname(), subject)
err := TheNats.Publish(fullsubject, msg)
LogIfError(err)
}

Expand All @@ -56,8 +56,7 @@ func PublishCursorEvent(ce CursorEvent) {
"\"z\": \"" + fmt.Sprintf("%f", ce.Pos.Z) + "\", " +
"\"area\": \"" + fmt.Sprintf("%f", ce.Area) + "\" }"

subject := "fromengine.event.cursor"
Publish(subject, params)
PublishFromEngine("event.cursor", params)
}

// PublishMIDIDeviceEvent xxx
Expand All @@ -73,8 +72,7 @@ func PublishMIDIDeviceEvent(me MidiEvent) {
"\"millisecs\": \"" + fmt.Sprintf("%d", dt.Milliseconds()) + "\", " +
"\"bytes\": \"" + fmt.Sprintf("%v", me.Msg.Bytes()) + "\" }"

subject := "fromengine.event.midi"
Publish(subject, params)
PublishFromEngine("event.midi", params)
}

// PublishSpriteEvent xxx
Expand All @@ -85,22 +83,8 @@ func PublishSpriteEvent(x, y, z float32) {
"\"y\": \"" + fmt.Sprintf("%f", y) + "\", " +
"\"z\": \"" + fmt.Sprintf("%f", z) + "\" }"

subject := "fromengine.event.sprite"
log.Printf("Publishing %s %s\n", subject, params)

Publish(subject, params)
}

/*
// StartVizNats xxx
func StartVizNats() {
err := TheNats.Connect()
if err != nil {
log.Printf("VizNats.Connect: err=%s\n", err)
TheNats.natsConn = nil
}
PublishFromEngine("event.sprite", params)
}
*/

// NewVizNats xxx
func NewNats() *VizNats {
Expand All @@ -119,8 +103,10 @@ func (vn *VizNats) Connect() error {

if vn.natsConn != nil {
// Already connected
LogInfo("VisNats.Connect: Already connected!")
return nil
}
LogInfo("VisNats.Connect: about to try to connect")
user := os.Getenv("NATS_USER")
password := os.Getenv("NATS_PASSWORD")
url := os.Getenv("NATS_URL")
Expand All @@ -146,13 +132,16 @@ func (vn *VizNats) Connect() error {
// Connect to NATS
nc, err := nats.Connect(fullurl, opts...)
if err != nil {
return fmt.Errorf("nats.Connect failed, user=%s err=%s", user, err)
vn.natsConn = nil
return fmt.Errorf("nats.Connect failed, user=%s url=%s err=%s", user, url, err)
}
vn.natsConn = nc
LogInfo("Successful connect to NATS")

msg := fmt.Sprintf("Successful connection from hostname = %s", Hostname())
return vn.Publish("palette.info", msg)
date := time.Now().Format("2006-01-02 15:04:05")
msg := fmt.Sprintf("Successful connection from hostname=%s date=%s", Hostname(), date)
PublishFromEngine("connect.info", msg)
return nil
}

func natsRequestHandler(msg *nats.Msg) {
Expand All @@ -174,12 +163,10 @@ func natsRequestHandler(msg *nats.Msg) {

// Request is used for APIs - it blocks waiting for a response and returns the response
func (vn *VizNats) Request(subj, data string, timeout time.Duration) (retdata string, err error) {
if IsLogging("nats") {
log.Printf("VizNats.Request: %s %s\n", subj, data)
}
LogOfType("nats", "VizNats.Request", "subject", subj, "data", data)
nc := vn.natsConn
if nc == nil {
return "", fmt.Errorf("unable to communicate with NATS")
return "", fmt.Errorf("Viznats.Request: no NATS connection")
}
bytes := []byte(data)
msg, err := nc.Request(subj, bytes, timeout)
Expand All @@ -196,13 +183,11 @@ func (vn *VizNats) Publish(subj string, msg string) error {

nc := vn.natsConn
if nc == nil {
return fmt.Errorf("Publish: subject=%s, no connection to nats-server", subj)
return fmt.Errorf("Viznats.Publish: no NATS connection, subject=%s", subj)
}
bytes := []byte(msg)

if IsLogging("nats") {
log.Printf("Nats.Publish: %s %s\n", subj, msg)
}
LogInfo("Nats.Publish", "subject", subj, "msg", msg)

err := nc.Publish(subj, bytes)
LogIfError(err)
Expand All @@ -217,13 +202,11 @@ func (vn *VizNats) Publish(subj string, msg string) error {
// Subscribe xxx
func (vn *VizNats) Subscribe(subj string, callback nats.MsgHandler) error {

// if IsLogging("nats") {
LogInfo("VizNats.Subscribe: %s\n", subj)
// }
LogInfo("VizNats.Subscribe", "subject", subj)

nc := vn.natsConn
if nc == nil {
return fmt.Errorf("Subscribe: subject=%s, no connection to nats-server", subj)
return fmt.Errorf("VizNats.Subscribe: subject=%s, no connection to NATS server", subj)
}
_, err := nc.Subscribe(subj, callback)
LogIfError(err)
Expand Down Expand Up @@ -273,13 +256,13 @@ func GetNUID() string {
/*
file, err := os.OpenFile(nuidpath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
log.Printf("InitLogs: Unable to open %s err=%s", nuidpath, err)
LogWarn("InitLogs: Unable to open", "nuidpath", nuidpath, "err", err)
return "UnableToOpenNUIDFile"
}
nuid := nuid.Next()
file.WriteString("{\n\t\"nuid\": \"" + nuid + "\"\n}\n")
file.Close()
log.Printf("GetNUID: generated nuid.json for %s\n", nuid)
LogInfo("GetNUID: generated nuid.json", "nuid", nuid)
return nuid
*/
}
Expand All @@ -291,25 +274,14 @@ func setupConnOptions(opts []nats.Option) []nats.Option {
opts = append(opts, nats.ReconnectWait(reconnectDelay))
opts = append(opts, nats.MaxReconnects(int(totalWait/reconnectDelay)))
opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) {
log.Printf("Disconnected due to:%s, will attempt reconnects for %.0fm", err, totalWait.Minutes())
LogWarn("nats.Disconnected", "rr", err, "waitminutes", totalWait.Minutes())
}))
opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) {
log.Printf("Reconnected [%s]", nc.ConnectedUrl())
LogWarn("nats.Reconnected", "connecturl", nc.ConnectedUrl())
}))
opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) {
log.Printf("nats.ClosedHandler, Exiting: %v", nc.LastError())
LogWarn("nats.ClosedHandler, Exiting", "lasterror", nc.LastError())
TheNats.natsConn = nil
}))
return opts
}

func handleDiscover(msg *nats.Msg) {
response := MyNUID()
if IsLogging("api") {
log.Printf("handleDiscover: data=%s reply=%s response=%s\n", string(msg.Data), msg.Reply, response)
}
err := msg.Respond([]byte(response))
LogIfError(err)
}

var _ = handleDiscover // to avoid unused error from go-staticcheck
2 changes: 1 addition & 1 deletion kit/patch.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ func (patch *Patch) noticeValueChange(paramName string, paramValue string) {
synth := GetSynth(paramValue)
patch.mutex.Lock()
patch.synth = synth
LogInfo("Patch.noticeValueChange: changed synth", "patch", patch.name, "synth.name", synth.name)
// LogInfo("Patch.noticeValueChange: changed synth", "patch", patch.name, "synth.name", synth.name)
patch.mutex.Unlock()
}
}
Expand Down
17 changes: 14 additions & 3 deletions kit/process.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,6 +145,9 @@ func (pm *ProcessManager) CheckAutorestartProcesses() {
defer pm.mutex.Unlock()

for _, process := range ProcessList() {
if ! pm.IsAvailable(process) {
continue
}
runit, err := GetParamBool("global.process." + process)
if err != nil {
LogError(err)
Expand Down Expand Up @@ -304,15 +307,23 @@ func (pm *ProcessManager) GetProcessInfo(process string) (*ProcessInfo, error) {
p, ok := pm.info[process]
if !ok || p == nil {
err := fmt.Errorf("GetProcessInfo: no process info for %s", process)
LogIfError(err)
LogError(err)
return nil, err
}
if p.Exe == "" {
err := fmt.Errorf("GetProcessInfo: no executable info for %s", process)
LogError(err)
}
if p.FullPath == "" {
err := fmt.Errorf("GetProcessInfo: no fullpath info for %s", process)
LogError(err)
}
return p, nil
}

func (pm *ProcessManager) IsAvailable(process string) bool {
p, err := pm.GetProcessInfo(process)
return err == nil && p != nil && p.FullPath != ""
p, ok := pm.info[process]
return ok && p != nil && p.Exe != "" && p.FullPath != ""
}

func (pm *ProcessManager) IsRunning(process string) bool {
Expand Down
Loading

0 comments on commit 9e704a6

Please sign in to comment.