diff --git a/cmd/palette_monitor/palette_monitor.go b/cmd/palette_monitor/palette_monitor.go index 91271b42..0eb50675 100644 --- a/cmd/palette_monitor/palette_monitor.go +++ b/cmd/palette_monitor/palette_monitor.go @@ -1,6 +1,7 @@ package main import ( + "context" "flag" "fmt" "os/exec" @@ -10,6 +11,8 @@ import ( "github.com/0xcafed00d/joystick" "github.com/vizicist/palette/engine" + "github.com/reugn/go-quartz/quartz" + midi "gitlab.com/gomidi/midi/v2" _ "gitlab.com/gomidi/midi/v2/drivers/rtmididrv" // autoregisters driver ) @@ -18,22 +21,67 @@ func main() { engine.InitLog("monitor") - pcheck := flag.Bool("engine", true, "Check Engine") + // pcheck := flag.Bool("engine", true, "Check Engine") pjsid := flag.Int("joystick", -1, "Joystick ID") flag.Parse() + engine.LogInfo("HACK - palette_monitor is not checking engine") + /* if *pcheck { engine.LogInfo("monitor is checking the engine.") go checkEngine() } else { engine.LogInfo("monitor is NOT checking the engine.") } + */ go joystickMonitor(*pjsid) go midiMonitor("Logidy UMI3") + go scheduler() + + select {} +} + +func scheduler() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // create scheduler + sched := quartz.NewStdScheduler() + + // async start scheduler + sched.Start(ctx) + + // create jobs + // "0 15 10 * * ?" Fire at 10:15am every day + cronTrigger, _ := quartz.NewCronTrigger("0,15,30,45 * * * * *") + shellJob := quartz.NewShellJob("palette status") + fjob := quartz.NewFunctionJob(func(_ context.Context) (int, error) { + fmt.Printf("hi from function!\n") + return 0, nil + }) + + // request, _ := http.NewRequest(http.MethodGet, "https://worldtimeapi.org/api/timezone/utc", nil) + // curlJob := quartz.NewCurlJob(request) + + // functionJob := quartz.NewFunctionJob(func(_ context.Context) (int, error) { return 42, nil }) + + // register jobs to scheduler + // sched.ScheduleJob(ctx, shellJob, cronTrigger) + sched.ScheduleJob(ctx, fjob, cronTrigger) + sched.ScheduleJob(ctx, shellJob, cronTrigger) + // sched.ScheduleJob(ctx, curlJob, quartz.NewSimpleTrigger(time.Second*7)) + // sched.ScheduleJob(ctx, functionJob, quartz.NewSimpleTrigger(time.Second*5)) + + // stop scheduler + // sched.Stop() + + // wait for all workers to exit + // sched.Wait(ctx) + select {} } diff --git a/data/config/paramdefs.json b/data/config/paramdefs.json index e29c0b76..f9cdfb07 100644 --- a/data/config/paramdefs.json +++ b/data/config/paramdefs.json @@ -198,6 +198,7 @@ "engine.mmtt_xexpand": {"valuetype": "float", "min": "0.5", "max": "2.0", "init": "1.25", "comment": "#" }, "engine.mmtt_yexpand": {"valuetype": "float", "min": "0.5", "max": "2.0", "init": "1.25", "comment": "#" }, "engine.mmtt_zexpand": {"valuetype": "float", "min": "0.5", "max": "10.0", "init": "4.0", "comment": "#" }, +"engine.nats": {"valuetype":"bool", "min":"false", "max":"true", "init":"false", "comment":"#" }, "engine.notifygui": {"valuetype":"bool", "min":"false", "max":"true", "init":"true", "comment":"#" }, "engine.obspath": {"valuetype":"string", "min":"", "max":"", "init":"C:/Program Files/obs-studio/bin/64bit/obs64.exe", "comment":"# OBS executable path" }, "engine.obsstream": {"valuetype":"bool", "min":"false", "max":"true", "init":"false", "comment":"# turn on OBS streaming" }, diff --git a/engine/engine.go b/engine/engine.go index 0428d2f2..963525a2 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -76,6 +76,7 @@ func InitEngine() { TheQuadPro = NewQuadPro() TheMidiIO = NewMidiIO() TheErae = NewErae() + TheNats = NewNats() InitLogTypes() diff --git a/engine/engineapi.go b/engine/engineapi.go index c2a9f119..cfcaf4b1 100644 --- a/engine/engineapi.go +++ b/engine/engineapi.go @@ -368,6 +368,15 @@ func (e *Engine) Set(name string, value string) (err error) { TheProcessManager.processCheckSecs = f } + case "engine.nats": + if !e.loading { + if IsTrueValue(value) { + TheNats.Connect() + } else { + TheNats.Disconnect() + } + } + case "engine.obsstream": if !e.loading { if IsTrueValue(value) { diff --git a/engine/misc.go b/engine/misc.go index 2889d23c..d17fa999 100644 --- a/engine/misc.go +++ b/engine/misc.go @@ -128,6 +128,11 @@ func ConfigDir() string { return filepath.Join(PaletteDataPath(), "config") } +func GetConfigFileData(filename string) ([]byte, error) { + path := ConfigFilePath(filename) + return os.ReadFile(path) +} + func ConfigFilePath(nm string) string { return filepath.Join(ConfigDir(), nm) } @@ -550,7 +555,7 @@ func ArchiveLogs() error { err = ziplogs(logsdir, zippath) if err != nil { - return fmt.Errorf("Archivelogs: err=%s", err) + return fmt.Errorf("archivelogs: err=%s", err) } else { // If archiving is successful, clear the logs return ClearLogs() diff --git a/engine/nats.go b/engine/nats.go new file mode 100644 index 00000000..db867275 --- /dev/null +++ b/engine/nats.go @@ -0,0 +1,309 @@ +package engine + +import ( + "encoding/json" + "fmt" + "log" + "os" + "time" + + "github.com/nats-io/nats.go" +) + +// VizNats xxx +type VizNats struct { + natsConn *nats.Conn +} + +// TheNats is the only one +var TheNats *VizNats + +// PaletteAPISubject xxx +var PaletteAPISubject = "palette.api" + +var time0 = time.Now() + +// Publish sends an asynchronous message via NATS +func Publish(subject string, msg string) { + err := TheNats.Publish(subject, msg) + LogIfError(err) +} + +// PublishCursorEvent xxx +func PublishCursorEvent(ce CursorEvent) { + dt := time.Since(time0) + regionvalue := "" + if ce.Tag != "" { + regionvalue = "\"tag\": \"" + ce.Tag + "\", " + } + params := "{ " + + // "\"tag\": \"" + ce.Tag + "\", " + + "\"cid\": \"" + fmt.Sprintf("%d", ce.Gid) + "\", " + + regionvalue + + "\"ddu\": \"" + ce.Ddu + "\", " + + "\"millisecs\": \"" + fmt.Sprintf("%d", dt.Milliseconds()) + "\", " + + "\"x\": \"" + fmt.Sprintf("%f", ce.Pos.X) + "\", " + + "\"y\": \"" + fmt.Sprintf("%f", ce.Pos.Y) + "\", " + + "\"z\": \"" + fmt.Sprintf("%f", ce.Pos.Z) + "\", " + + "\"area\": \"" + fmt.Sprintf("%f", ce.Area) + "\" }" + + subject := "fromengine.event.cursor" + Publish(subject, params) +} + +// PublishMIDIDeviceEvent xxx +func PublishMIDIDeviceEvent(me MidiEvent) { + dt := time.Since(time0) + // NOTE: we ignore the Timestamp on the MIDIDeviceEvent + // and use our own, so the timestamps are consistent with + // the ones on Cursor events + params := "{ " + + "\"nuid\": \"" + MyNUID() + "\", " + + "\"event\": \"" + "midi" + "\", " + + // "\"timestamp\": \"" + fmt.Sprintf("%d", me.Timestamp) + "\", " + + "\"millisecs\": \"" + fmt.Sprintf("%d", dt.Milliseconds()) + "\", " + + "\"bytes\": \"" + fmt.Sprintf("%v", me.Msg.Bytes()) + "\" }" + + subject := "fromengine.event.midi" + Publish(subject, params) +} + +// PublishSpriteEvent xxx +func PublishSpriteEvent(x, y, z float32) { + params := "{ " + + "\"nuid\": \"" + MyNUID() + "\", " + + "\"x\": \"" + fmt.Sprintf("%f", x) + "\", " + + "\"y\": \"" + fmt.Sprintf("%f", y) + "\", " + + "\"z\": \"" + fmt.Sprintf("%f", z) + "\" }" + + subject := "fromengine.event.sprite" + log.Printf("Publishing %s %s\n", subject, params) + + Publish(subject, params) +} + +/* +// StartVizNats xxx +func StartVizNats() { + err := TheNats.Connect() + if err != nil { + log.Printf("VizNats.Connect: err=%s\n", err) + TheNats.natsConn = nil + } +} +*/ + +// NewVizNats xxx +func NewNats() *VizNats { + return &VizNats{ + natsConn: nil, + } +} + +func (vn *VizNats) Disconnect() { + vn.natsConn = nil +} + +// Connect xxx +func (vn *VizNats) Connect() error { + + if vn.natsConn != nil { + // Already connected + return nil + } + user := os.Getenv("NATS_USER") + password := os.Getenv("NATS_PASSWORD") + url := os.Getenv("NATS_URL") + if url == "" { + url = LocalAddress + } + fullurl := fmt.Sprintf("%s:%s@%s", user, password, url) + + var userCreds = "" // User Credentials File + + // Connect Options. + opts := []nats.Option{nats.Name("Palette hostwin Subscriber")} + opts = setupConnOptions(opts) + + // Use UserCredentials + if userCreds != "" { + opts = append(opts, nats.UserCredentials(userCreds)) + } + + // Keep reconnecting forever + opts = append(opts, nats.MaxReconnects(-1)) + + // Connect to NATS + nc, err := nats.Connect(fullurl, opts...) + if err != nil { + return fmt.Errorf("nats.Connect failed, user=%s err=%s", user, err) + } + vn.natsConn = nc + LogInfo("Successful connect to NATS") + + subscribeTo := "toengine.>" + err = TheNats.Subscribe(subscribeTo, natsRequestHandler) + if err != nil { + return fmt.Errorf("nats.Connect failed, unable to subscribe err=%s",err.Error()) + } + + return vn.Publish("palette.info", "nats.Connect has succeeded") +} + +func natsRequestHandler(msg *nats.Msg) { + data := string(msg.Data) + var err error + var result string + LogInfo("NatsHandler", "subject", msg.Subject, "data", data) + // result, err := executeApiFromJson(data) + response := "" + if err != nil { + LogError(fmt.Errorf("unable to nats api data: %s", data)) + response = ErrorResponse(err) + } else { + response = ResultResponse(result) + } + bytes := []byte(response) + // Send the response. + err = msg.Respond(bytes) + LogIfError(err) +} + +// Request is used for APIs - it blocks waiting for a response and returns the response +func (vn *VizNats) Request(subj, data string, timeout time.Duration) (retdata string, err error) { + if IsLogging("nats") { + log.Printf("VizNats.Request: %s %s\n", subj, data) + } + nc := vn.natsConn + if nc == nil { + return "", fmt.Errorf("unable to communicate with NATS") + } + bytes := []byte(data) + msg, err := nc.Request(subj, bytes, timeout) + if err != nil { + return "", fmt.Errorf("Request: subj=%s err=%s", subj, err) + } + return string(msg.Data), nil +} + +// Publish xxx +func (vn *VizNats) Publish(subj string, msg string) error { + + nc := vn.natsConn + if nc == nil { + return fmt.Errorf("Publish: subject=%s, no connection to nats-server", subj) + } + bytes := []byte(msg) + + if IsLogging("nats") { + log.Printf("Nats.Publish: %s %s\n", subj, msg) + } + + err := nc.Publish(subj, bytes) + LogIfError(err) + nc.Flush() + + if err := nc.LastError(); err != nil { + return err + } + return nil +} + +// Subscribe xxx +func (vn *VizNats) Subscribe(subj string, callback nats.MsgHandler) error { + + if IsLogging("nats") { + log.Printf("VizNats.Subscribe: %s\n", subj) + } + nc := vn.natsConn + if nc == nil { + return fmt.Errorf("Subscribe: subject=%s, no connection to nats-server", subj) + } + _, err := nc.Subscribe(subj, callback) + LogIfError(err) + nc.Flush() + + return nc.LastError() +} + +func (vn *VizNats) Close() { + if vn.natsConn != nil { + vn.natsConn.Close() + vn.natsConn = nil + } +} + +var myNUID = "" + +// MyNUID xxx +func MyNUID() string { + if myNUID == "" { + myNUID = GetNUID() + } + return myNUID +} + +// GetNUID xxx +func GetNUID() string { + bytes, err := GetConfigFileData("nuid.json") + if err != nil { + LogError(err) + return "FakeNUID" + } + var f any + err = json.Unmarshal(bytes, &f) + if err != nil { + LogError(err) + return "FakeNUID" + } + toplevel := f.(map[string]any) + t, ok := toplevel["nuid"] + nuid, ok2 := t.(string) + if !ok || !ok2 { + LogWarn("No nuid in nuid.json") + return "FakeNUID" + } + return nuid + /* + file, err := os.OpenFile(nuidpath, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644) + if err != nil { + log.Printf("InitLogs: Unable to open %s err=%s", nuidpath, err) + return "UnableToOpenNUIDFile" + } + nuid := nuid.Next() + file.WriteString("{\n\t\"nuid\": \"" + nuid + "\"\n}\n") + file.Close() + log.Printf("GetNUID: generated nuid.json for %s\n", nuid) + return nuid + */ +} + +func setupConnOptions(opts []nats.Option) []nats.Option { + totalWait := 10 * time.Minute + reconnectDelay := time.Second + + opts = append(opts, nats.ReconnectWait(reconnectDelay)) + opts = append(opts, nats.MaxReconnects(int(totalWait/reconnectDelay))) + opts = append(opts, nats.DisconnectErrHandler(func(nc *nats.Conn, err error) { + log.Printf("Disconnected due to:%s, will attempt reconnects for %.0fm", err, totalWait.Minutes()) + })) + opts = append(opts, nats.ReconnectHandler(func(nc *nats.Conn) { + log.Printf("Reconnected [%s]", nc.ConnectedUrl()) + })) + opts = append(opts, nats.ClosedHandler(func(nc *nats.Conn) { + log.Printf("nats.ClosedHandler, Exiting: %v", nc.LastError()) + TheNats.natsConn = nil + })) + return opts +} + +func handleDiscover(msg *nats.Msg) { + response := MyNUID() + if IsLogging("api") { + log.Printf("handleDiscover: data=%s reply=%s response=%s\n", string(msg.Data), msg.Reply, response) + } + err := msg.Respond([]byte(response)) + LogIfError(err) +} + +var _ = handleDiscover // to avoid unused error from go-staticcheck diff --git a/go.mod b/go.mod index a2910b88..94c796b1 100644 --- a/go.mod +++ b/go.mod @@ -9,7 +9,9 @@ require ( github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0 github.com/hajimehoshi/ebiten/v2 v2.5.4 github.com/hypebeast/go-osc v0.0.0-20220308234300-cec5a8a1e5f5 + github.com/nats-io/nats.go v1.31.0 github.com/pkg/profile v1.7.0 + github.com/reugn/go-quartz v0.9.0 gitlab.com/gomidi/midi/v2 v2.0.30 go.uber.org/zap v1.23.0 golang.org/x/image v0.6.0 @@ -25,13 +27,17 @@ require ( github.com/gorilla/websocket v1.5.0 // indirect github.com/hashicorp/logutils v1.0.0 // indirect github.com/jezek/xgb v1.1.0 // indirect + github.com/klauspost/compress v1.17.0 // indirect + github.com/nats-io/nkeys v0.4.5 // indirect + github.com/nats-io/nuid v1.0.1 // indirect github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d // indirect go.uber.org/atomic v1.10.0 // indirect go.uber.org/multierr v1.8.0 // indirect + golang.org/x/crypto v0.6.0 // indirect golang.org/x/exp/shiny v0.0.0-20230210204819-062eb4c674ab // indirect golang.org/x/mobile v0.0.0-20230301163155-e0f57694e12c // indirect golang.org/x/sync v0.2.0 // indirect golang.org/x/sys v0.9.0 // indirect - golang.org/x/text v0.9.0 // indirect + golang.org/x/text v0.13.0 // indirect gopkg.in/alexcesaro/quotedprintable.v3 v3.0.0-20150716171945-2caba252f4dc // indirect ) diff --git a/go.sum b/go.sum index b2a80b4e..e8ee68fa 100644 --- a/go.sum +++ b/go.sum @@ -36,6 +36,14 @@ github.com/hypebeast/go-osc v0.0.0-20220308234300-cec5a8a1e5f5/go.mod h1:lqMjoCs github.com/ianlancetaylor/demangle v0.0.0-20210905161508-09a460cdf81d/go.mod h1:aYm2/VgdVmcIU8iMfdMvDMsRAQjcfZSKFby6HOFvi/w= github.com/jezek/xgb v1.1.0 h1:wnpxJzP1+rkbGclEkmwpVFQWpuE2PUGNUzP8SbfFobk= github.com/jezek/xgb v1.1.0/go.mod h1:nrhwO0FX/enq75I7Y7G8iN1ubpSGZEiA3v9e9GyRFlk= +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d h1:VhgPp6v9qf9Agr/56bj7Y/xa04UccTW04VP0Qed4vnQ= github.com/nu7hatch/gouuid v0.0.0-20131221200532-179d4d0c4d8d/go.mod h1:YUTz3bUH2ZwIWBy3CJBeOBEugqcmXREj14T+iG/4k4U= github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I= @@ -43,6 +51,8 @@ github.com/pkg/profile v1.7.0 h1:hnbDkaNWPCLMO9wGLdBFTIZvzDrDfBM2072E1S9gJkA= github.com/pkg/profile v1.7.0/go.mod h1:8Uer0jas47ZQMJ7VD+OHknK4YDY07LPUC6dEvqDjvNo= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= +github.com/reugn/go-quartz v0.9.0 h1:JPaxyWx6YtNG3020DuT/LXRqishMokWwpMiLfiTWGJg= +github.com/reugn/go-quartz v0.9.0/go.mod h1:no4ktgYbAAuY0E1SchR8cTx1LF4jYIzdgaQhzRPSkpk= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= @@ -63,6 +73,8 @@ go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= golang.org/x/exp/shiny v0.0.0-20230210204819-062eb4c674ab h1:y2uli2x184LeiyUzNzvO4v2mrfH6T77bf4M/Ef7H9m8= golang.org/x/exp/shiny v0.0.0-20230210204819-062eb4c674ab/go.mod h1:UH99kUObWAZkDnWqppdQe5ZhPYESUw8I0zVV1uWBR+0= golang.org/x/image v0.6.0 h1:bR8b5okrPI3g/gyZakLZHeWxAR8Dn5CyxXv1hLH5g/4= @@ -97,8 +109,8 @@ golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ= golang.org/x/text v0.3.7/go.mod h1:u+2+/6zg+i71rQMx5EYifcz6MCKuco9NR6JIITiCfzQ= golang.org/x/text v0.7.0/go.mod h1:mrYo+phRRbMaCq/xk9113O4dZlRixOauAjOtrjsXDZ8= golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= -golang.org/x/text v0.9.0 h1:2sjJmO8cDvYveuX97RDLsxlyUxLl+GHoLxBiRdHllBE= -golang.org/x/text v0.9.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= +golang.org/x/text v0.13.0 h1:ablQoSUd0tRdKxZewP80B+BaqeKJuVhuRxj/dkrun3k= +golang.org/x/text v0.13.0/go.mod h1:TvPlkZtksWOMsz7fbANvkp4WM8x/WCo/om8BMLbz+aE= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo= golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc= diff --git a/release/palette_7.81_win_setup.exe b/release/palette_7.81_win_setup.exe new file mode 100644 index 00000000..f3353a61 Binary files /dev/null and b/release/palette_7.81_win_setup.exe differ diff --git a/scripts/natsmon.bat b/scripts/natsmon.bat index 05030ac8..71188541 100644 --- a/scripts/natsmon.bat +++ b/scripts/natsmon.bat @@ -1 +1 @@ -"%PALETTE%\bin\nats-sub" ">" +"%PALETTE%\bin\nats" subscribe ">"