Skip to content

Commit

Permalink
Change setting up triggers
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko committed Jul 17, 2024
1 parent 52c8727 commit 6e6a11b
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 48 deletions.
30 changes: 14 additions & 16 deletions clients/mist_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,29 +37,27 @@ type MistAPIClient interface {
DeleteStream(streamName string) error
NukeStream(streamName string) error
StopSessions(streamName string) error
AddTrigger(streamName []string, triggerName string, sync bool) error
AddTrigger(streamName []string, triggerName, triggerCallback string, sync bool) error
DeleteTrigger(streamName []string, triggerName string) error
GetStreamInfo(streamName string) (MistStreamInfo, error)
GetState() (MistState, error)
}

type MistClient struct {
ApiUrl string
Username string
Password string
HttpReqUrl string
TriggerCallback string
configMu sync.Mutex
cache *cache.Cache
ApiUrl string
Username string
Password string
HttpReqUrl string
configMu sync.Mutex
cache *cache.Cache
}

func NewMistAPIClient(user, password, host string, port int, ownURL string) MistAPIClient {
func NewMistAPIClient(user, password, host string, port int) MistAPIClient {
mist := &MistClient{
ApiUrl: fmt.Sprintf("http://%s:%d", host, port),
Username: user,
Password: password,
TriggerCallback: ownURL,
cache: cache.New(defaultCacheExpiration, cacheCleanupInterval),
ApiUrl: fmt.Sprintf("http://%s:%d", host, port),
Username: user,
Password: password,
cache: cache.New(defaultCacheExpiration, cacheCleanupInterval),
}
return mist
}
Expand Down Expand Up @@ -310,15 +308,15 @@ func (mc *MistClient) StopSessions(streamName string) error {
// 3. Add a new trigger (or update the existing one)
// 4. Override the triggers
// 5. Release the lock
func (mc *MistClient) AddTrigger(streamNames []string, triggerName string, sync bool) error {
func (mc *MistClient) AddTrigger(streamNames []string, triggerName, triggerCallback string, sync bool) error {
mc.configMu.Lock()
defer mc.configMu.Unlock()

triggers, err := mc.getCurrentTriggers()
if err != nil {
return err
}
c := commandAddTrigger(streamNames, triggerName, mc.TriggerCallback, triggers, sync)
c := commandAddTrigger(streamNames, triggerName, triggerCallback, triggers, sync)
resp, err := mc.sendCommand(c)
return validateAddTrigger(streamNames, triggerName, resp, err, sync)
}
Expand Down
11 changes: 1 addition & 10 deletions config/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ type Cli struct {
MistCleanup bool
LogSysUsage bool
AMQPURL string
OwnHost string
OwnRegion string
OwnRegionTagAdjust int
APIToken string
Expand Down Expand Up @@ -88,7 +87,6 @@ type Cli struct {
SerfQueueSize int
SerfEventBuffer int
SerfMaxQueueDepth int
SerfUserEventCallback string
}

// Return our own URL for callback trigger purposes
Expand All @@ -99,9 +97,6 @@ func (cli *Cli) OwnInternalURL() string {
if ip.IsUnspecified() {
host = "127.0.0.1"
}
if cli.OwnHost != "" {
host = cli.OwnHost
}
addr := net.JoinHostPort(host, port)
return fmt.Sprintf("http://%s", addr)
}
Expand Down Expand Up @@ -169,14 +164,10 @@ func (cli *Cli) ParseLegacyEnv() {
func AddrFlag(fs *flag.FlagSet, dest *string, name, value, usage string) {
*dest = value
fs.Func(name, usage, func(s string) error {
host, _, err := net.SplitHostPort(s)
_, _, err := net.SplitHostPort(s)
if err != nil {
return err
}
ip := net.ParseIP(host)
if ip == nil {
return fmt.Errorf("invalid address: %s", s)
}
*dest = s
return nil
})
Expand Down
6 changes: 3 additions & 3 deletions handlers/misttriggers/trigger_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import (
// handler for these sorts of triggers.

type TriggerBroker interface {
SetupMistTriggers(clients.MistAPIClient) error
SetupMistTriggers(clients.MistAPIClient, string) error

OnStreamBuffer(func(context.Context, *StreamBufferPayload) error)
TriggerStreamBuffer(context.Context, *StreamBufferPayload)
Expand Down Expand Up @@ -87,9 +87,9 @@ var triggers = map[string]bool{
TRIGGER_STREAM_SOURCE: true,
}

func (b *triggerBroker) SetupMistTriggers(mist clients.MistAPIClient) error {
func (b *triggerBroker) SetupMistTriggers(mist clients.MistAPIClient, triggerCallback string) error {
for name, sync := range triggers {
err := mist.AddTrigger([]string{}, name, sync)
err := mist.AddTrigger([]string{}, name, triggerCallback, sync)
if err != nil {
return fmt.Errorf("error setting up mist trigger trigger=%s error=%w", name, err)
}
Expand Down
37 changes: 18 additions & 19 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func main() {

// listen addresses
config.AddrFlag(fs, &cli.HTTPAddress, "http-addr", "0.0.0.0:8989", "Address to bind for external-facing Catalyst HTTP handling")
config.AddrFlag(fs, &cli.HTTPInternalAddress, "http-internal-addr", "127.0.0.1:7979", "Address to bind for internal privileged HTTP commands")
config.AddrFlag(fs, &cli.HTTPInternalAddress, "http-internal-addr", "127.0.0.1:7979", "Address to bind for internal privileged HTTP commands or the address of catalyst-api service if run separately ('api-only' mode)")
config.AddrFlag(fs, &cli.ClusterAddress, "cluster-addr", "0.0.0.0:9935", "Address to bind Serf network listeners to. To use an IPv6 address, specify [::1] or [::1]:7946.")
fs.StringVar(&cli.ClusterAdvertiseAddress, "cluster-advertise-addr", "", "Address to advertise to the other cluster members")

Expand Down Expand Up @@ -97,7 +97,6 @@ func main() {
fs.StringVar(&cli.MistBaseStreamName, "mist-base-stream-name", "video", "Base stream name to be used in wildcard-based routing scheme")
fs.StringVar(&cli.APIServer, "api-server", "", "Livepeer API server to use")
fs.StringVar(&cli.AMQPURL, "amqp-url", "", "RabbitMQ url")
fs.StringVar(&cli.OwnHost, "own-host", "", "Own URL under which the given catalyst-api is accessible")
fs.StringVar(&cli.OwnRegion, "own-region", "", "Identifier of the region where the service is running, used for mapping external data back to current region")
fs.IntVar(&cli.OwnRegionTagAdjust, "own-region-tag-adjust", 1000, "Bonus weight for 'own-region' to minimise cross-region redirects done by mist load balancer (MistUtilLoad)")
fs.StringVar(&cli.StreamHealthHookURL, "stream-health-hook-url", "http://localhost:3004/api/stream/hook/health", "Address to POST stream health payloads to (response is ignored)")
Expand Down Expand Up @@ -126,7 +125,6 @@ func main() {
fs.IntVar(&cli.SerfQueueSize, "serf-queue-size", 100000, "Size of internal serf queue before messages are dropped")
fs.IntVar(&cli.SerfEventBuffer, "serf-event-buffer", 100000, "Size of serf 'recent event' buffer, outside of which things are dropped")
fs.IntVar(&cli.SerfMaxQueueDepth, "serf-max-queue-depth", 100000, "Size of Serf queue, outside of which things are dropped")
fs.StringVar(&cli.SerfUserEventCallback, "serf-user-event-callback", "http://127.0.0.1:7979/api/serf/receiveUserEvent", "URL to forward serf user events")
fs.StringVar(&cli.EnableAnalytics, "analytics", "disabled", "Enables analytics API: enabled or disabled")
fs.StringVar(&cli.KafkaBootstrapServers, "kafka-bootstrap-servers", "", "URL of Kafka Bootstrap Servers")
fs.StringVar(&cli.KafkaUser, "kafka-user", "", "Kafka Username")
Expand Down Expand Up @@ -260,21 +258,6 @@ func main() {

broker = misttriggers.NewTriggerBroker()

if cli.MistEnabled {
ownURL := fmt.Sprintf("%s/api/mist/trigger", cli.OwnInternalURL())
mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort, ownURL)
if cli.MistTriggerSetup {
err := broker.SetupMistTriggers(mist)
if err != nil {
glog.Error("catalyst-api was unable to communicate with MistServer to set up its triggers.")
glog.Error("hint: are you trying to boot catalyst-api without Mist for development purposes? use the flag -no-mist")
glog.Fatalf("error setting up Mist triggers err=%s", err)
}
}
} else {
glog.Info("-no-mist flag detected, not initializing Mist stream triggers")
}

if cli.ShouldMapic() {
mapic = mistapiconnector.NewMapic(&cli, broker, mist)
group.Go(func() error {
Expand All @@ -285,6 +268,21 @@ func main() {
mistBalancer = mist_balancer.NewRemoteBalancer(mistBalancerConfig)
}

if cli.MistEnabled {
mist = clients.NewMistAPIClient(cli.MistUser, cli.MistPassword, cli.MistHost, cli.MistPort)
if cli.IsClusterMode() && cli.MistTriggerSetup {
receiveMistTriggerURL := fmt.Sprintf("%s/api/mist/trigger", cli.OwnInternalURL())
err := broker.SetupMistTriggers(mist, receiveMistTriggerURL)
if err != nil {
glog.Error("catalyst-api was unable to communicate with MistServer to set up its triggers.")
glog.Error("hint: are you trying to boot catalyst-api without Mist for development purposes? use the flag -no-mist")
glog.Fatalf("error setting up Mist triggers err=%s", err)
}
}
} else {
glog.Info("-no-mist flag detected, not initializing Mist stream triggers")
}

if cli.IsClusterMode() {
// Start cron style apps to run periodically
if cli.ShouldMistCleanup() {
Expand Down Expand Up @@ -350,7 +348,8 @@ func main() {

if cli.IsClusterMode() {
group.Go(func() error {
return handleClusterEvents(ctx, cli.SerfUserEventCallback, c)
serfUserEventCallbackEndpoint := fmt.Sprintf("http://%s/api/serf/receiveUserEvent", cli.OwnInternalURL())
return handleClusterEvents(ctx, serfUserEventCallbackEndpoint, c)
})
}

Expand Down

0 comments on commit 6e6a11b

Please sign in to comment.