diff --git a/clients/mist_client.go b/clients/mist_client.go index cd782f50d..314be7f25 100644 --- a/clients/mist_client.go +++ b/clients/mist_client.go @@ -37,29 +37,27 @@ type MistAPIClient interface { DeleteStream(streamName string) error NukeStream(streamName string) error StopSessions(streamName string) error - AddTrigger(streamName []string, triggerName string, sync bool) error + AddTrigger(streamName []string, triggerName, triggerCallback string, sync bool) error DeleteTrigger(streamName []string, triggerName string) error GetStreamInfo(streamName string) (MistStreamInfo, error) GetState() (MistState, error) } type MistClient struct { - ApiUrl string - Username string - Password string - HttpReqUrl string - TriggerCallback string - configMu sync.Mutex - cache *cache.Cache + ApiUrl string + Username string + Password string + HttpReqUrl string + configMu sync.Mutex + cache *cache.Cache } -func NewMistAPIClient(user, password, host string, port int, ownURL string) MistAPIClient { +func NewMistAPIClient(user, password, host string, port int) MistAPIClient { mist := &MistClient{ - ApiUrl: fmt.Sprintf("http://%s:%d", host, port), - Username: user, - Password: password, - TriggerCallback: ownURL, - cache: cache.New(defaultCacheExpiration, cacheCleanupInterval), + ApiUrl: fmt.Sprintf("http://%s:%d", host, port), + Username: user, + Password: password, + cache: cache.New(defaultCacheExpiration, cacheCleanupInterval), } return mist } @@ -310,7 +308,7 @@ func (mc *MistClient) StopSessions(streamName string) error { // 3. Add a new trigger (or update the existing one) // 4. Override the triggers // 5. Release the lock -func (mc *MistClient) AddTrigger(streamNames []string, triggerName string, sync bool) error { +func (mc *MistClient) AddTrigger(streamNames []string, triggerName, triggerCallback string, sync bool) error { mc.configMu.Lock() defer mc.configMu.Unlock() @@ -318,7 +316,7 @@ func (mc *MistClient) AddTrigger(streamNames []string, triggerName string, sync if err != nil { return err } - c := commandAddTrigger(streamNames, triggerName, mc.TriggerCallback, triggers, sync) + c := commandAddTrigger(streamNames, triggerName, triggerCallback, triggers, sync) resp, err := mc.sendCommand(c) return validateAddTrigger(streamNames, triggerName, resp, err, sync) } diff --git a/config/cli.go b/config/cli.go index f6550831a..04fefbf22 100644 --- a/config/cli.go +++ b/config/cli.go @@ -37,7 +37,6 @@ type Cli struct { MistCleanup bool LogSysUsage bool AMQPURL string - OwnHost string OwnRegion string OwnRegionTagAdjust int APIToken string @@ -88,7 +87,6 @@ type Cli struct { SerfQueueSize int SerfEventBuffer int SerfMaxQueueDepth int - SerfUserEventCallback string } // Return our own URL for callback trigger purposes @@ -99,9 +97,6 @@ func (cli *Cli) OwnInternalURL() string { if ip.IsUnspecified() { host = "127.0.0.1" } - if cli.OwnHost != "" { - host = cli.OwnHost - } addr := net.JoinHostPort(host, port) return fmt.Sprintf("http://%s", addr) } @@ -169,14 +164,10 @@ func (cli *Cli) ParseLegacyEnv() { func AddrFlag(fs *flag.FlagSet, dest *string, name, value, usage string) { *dest = value fs.Func(name, usage, func(s string) error { - host, _, err := net.SplitHostPort(s) + _, _, err := net.SplitHostPort(s) if err != nil { return err } - ip := net.ParseIP(host) - if ip == nil { - return fmt.Errorf("invalid address: %s", s) - } *dest = s return nil }) diff --git a/handlers/misttriggers/trigger_broker.go b/handlers/misttriggers/trigger_broker.go index 9eae85844..ff40cc7b8 100644 --- a/handlers/misttriggers/trigger_broker.go +++ b/handlers/misttriggers/trigger_broker.go @@ -29,7 +29,7 @@ import ( // handler for these sorts of triggers. type TriggerBroker interface { - SetupMistTriggers(clients.MistAPIClient) error + SetupMistTriggers(clients.MistAPIClient, string) error OnStreamBuffer(func(context.Context, *StreamBufferPayload) error) TriggerStreamBuffer(context.Context, *StreamBufferPayload) @@ -87,9 +87,9 @@ var triggers = map[string]bool{ TRIGGER_STREAM_SOURCE: true, } -func (b *triggerBroker) SetupMistTriggers(mist clients.MistAPIClient) error { +func (b *triggerBroker) SetupMistTriggers(mist clients.MistAPIClient, triggerCallback string) error { for name, sync := range triggers { - err := mist.AddTrigger([]string{}, name, sync) + err := mist.AddTrigger([]string{}, name, triggerCallback, sync) if err != nil { return fmt.Errorf("error setting up mist trigger trigger=%s error=%w", name, err) } diff --git a/main.go b/main.go index a96fa6497..a21008d9c 100644 --- a/main.go +++ b/main.go @@ -54,7 +54,7 @@ func main() { // listen addresses config.AddrFlag(fs, &cli.HTTPAddress, "http-addr", "0.0.0.0:8989", "Address to bind for external-facing Catalyst HTTP handling") - config.AddrFlag(fs, &cli.HTTPInternalAddress, "http-internal-addr", "127.0.0.1:7979", "Address to bind for internal privileged HTTP commands") + config.AddrFlag(fs, &cli.HTTPInternalAddress, "http-internal-addr", "127.0.0.1:7979", "Address to bind for internal privileged HTTP commands or the address of catalyst-api service if run separately ('api-only' mode)") config.AddrFlag(fs, &cli.ClusterAddress, "cluster-addr", "0.0.0.0:9935", "Address to bind Serf network listeners to. To use an IPv6 address, specify [::1] or [::1]:7946.") fs.StringVar(&cli.ClusterAdvertiseAddress, "cluster-advertise-addr", "", "Address to advertise to the other cluster members") @@ -97,7 +97,6 @@ func main() { fs.StringVar(&cli.MistBaseStreamName, "mist-base-stream-name", "video", "Base stream name to be used in wildcard-based routing scheme") fs.StringVar(&cli.APIServer, "api-server", "", "Livepeer API server to use") fs.StringVar(&cli.AMQPURL, "amqp-url", "", "RabbitMQ url") - fs.StringVar(&cli.OwnHost, "own-host", "", "Own URL under which the given catalyst-api is accessible") fs.StringVar(&cli.OwnRegion, "own-region", "", "Identifier of the region where the service is running, used for mapping external data back to current region") fs.IntVar(&cli.OwnRegionTagAdjust, "own-region-tag-adjust", 1000, "Bonus weight for 'own-region' to minimise cross-region redirects done by mist load balancer (MistUtilLoad)") fs.StringVar(&cli.StreamHealthHookURL, "stream-health-hook-url", "http://localhost:3004/api/stream/hook/health", "Address to POST stream health payloads to (response is ignored)") @@ -126,7 +125,6 @@ func main() { fs.IntVar(&cli.SerfQueueSize, "serf-queue-size", 100000, "Size of internal serf queue before messages are dropped") fs.IntVar(&cli.SerfEventBuffer, "serf-event-buffer", 100000, "Size of serf 'recent event' buffer, outside of which things are dropped") fs.IntVar(&cli.SerfMaxQueueDepth, "serf-max-queue-depth", 100000, "Size of Serf queue, outside of which things are dropped") - fs.StringVar(&cli.SerfUserEventCallback, "serf-user-event-callback", "http://127.0.0.1:7979/api/serf/receiveUserEvent", "URL to forward serf user events") fs.StringVar(&cli.EnableAnalytics, "analytics", "disabled", "Enables analytics API: enabled or disabled") fs.StringVar(&cli.KafkaBootstrapServers, "kafka-bootstrap-servers", "", "URL of Kafka Bootstrap Servers") fs.StringVar(&cli.KafkaUser, "kafka-user", "", "Kafka Username") @@ -260,21 +258,6 @@ func main() { broker = misttriggers.NewTriggerBroker() - if cli.MistEnabled { - ownURL := fmt.Sprintf("%s/api/mist/trigger", cli.OwnInternalURL()) - mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort, ownURL) - if cli.MistTriggerSetup { - err := broker.SetupMistTriggers(mist) - if err != nil { - glog.Error("catalyst-api was unable to communicate with MistServer to set up its triggers.") - glog.Error("hint: are you trying to boot catalyst-api without Mist for development purposes? use the flag -no-mist") - glog.Fatalf("error setting up Mist triggers err=%s", err) - } - } - } else { - glog.Info("-no-mist flag detected, not initializing Mist stream triggers") - } - if cli.ShouldMapic() { mapic = mistapiconnector.NewMapic(&cli, broker, mist) group.Go(func() error { @@ -285,6 +268,21 @@ func main() { mistBalancer = mist_balancer.NewRemoteBalancer(mistBalancerConfig) } + if cli.MistEnabled { + mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort) + if cli.IsClusterMode() && cli.MistTriggerSetup { + receiveMistTriggerURL := fmt.Sprintf("%s/api/mist/trigger", cli.OwnInternalURL()) + err := broker.SetupMistTriggers(mist, receiveMistTriggerURL) + if err != nil { + glog.Error("catalyst-api was unable to communicate with MistServer to set up its triggers.") + glog.Error("hint: are you trying to boot catalyst-api without Mist for development purposes? use the flag -no-mist") + glog.Fatalf("error setting up Mist triggers err=%s", err) + } + } + } else { + glog.Info("-no-mist flag detected, not initializing Mist stream triggers") + } + if cli.IsClusterMode() { // Start cron style apps to run periodically if cli.ShouldMistCleanup() { @@ -350,7 +348,8 @@ func main() { if cli.IsClusterMode() { group.Go(func() error { - return handleClusterEvents(ctx, cli.SerfUserEventCallback, c) + serfUserEventCallbackEndpoint := fmt.Sprintf("http://%s/api/serf/receiveUserEvent", cli.OwnInternalURL()) + return handleClusterEvents(ctx, serfUserEventCallbackEndpoint, c) }) }