Skip to content

Commit

Permalink
Refactor
Browse files Browse the repository at this point in the history
  • Loading branch information
leszko committed Jul 17, 2024
1 parent 289f822 commit 52c8727
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 35 deletions.
2 changes: 1 addition & 1 deletion api/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewCatalystAPIRouter(cli config.Cli, vodEngine *pipeline.Coordinator, bal b
AccessToken: cli.APIToken,
})
catalystApiHandlers := &handlers.CatalystAPIHandlersCollection{VODEngine: vodEngine}
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi)

router.GET("/ok", withLogging(catalystApiHandlers.Ok()))
router.GET("/healthcheck", withLogging(catalystApiHandlers.Healthcheck()))
Expand Down
2 changes: 1 addition & 1 deletion api/http_internal.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewCatalystAPIRouterInternal(cli config.Cli, vodEngine *pipeline.Coordinato
Server: cli.APIServer,
AccessToken: cli.APIToken,
})
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, c, cli, lapi)
geoHandlers := geolocation.NewGeolocationHandlersCollection(bal, cli, lapi)

spkiPublicKey, _ := crypto.ConvertToSpki(cli.VodDecryptPublicKey)

Expand Down
27 changes: 19 additions & 8 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,10 +182,25 @@ func (c *ClusterImpl) retryJoin(ctx context.Context) {
}

func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name string) ([]Member, error) {
all := c.serf.Members()
nodes := []Member{}
return FilterMembers(toClusterMembers(c.serf.Members()), filter, status, name)
}

func toClusterMembers(members []serf.Member) []Member {
var nodes []Member
for _, member := range members {
nodes = append(nodes, Member{
Name: member.Name,
Tags: member.Tags,
Status: member.Status.String(),
})
}
return nodes
}

func FilterMembers(all []Member, filter map[string]string, status string, name string) ([]Member, error) {
var nodes []Member
for _, member := range all {
if status != "" && status != member.Status.String() {
if status != "" && status != member.Status {
continue
}
if name != "" && name != member.Name {
Expand All @@ -200,11 +215,7 @@ func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name str
}
}
if matches {
nodes = append(nodes, Member{
Name: member.Name,
Tags: member.Tags,
Status: member.Status.String(),
})
nodes = append(nodes, member)
}
}
return nodes, nil
Expand Down
30 changes: 5 additions & 25 deletions handlers/geolocation/geolocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ type GeolocationHandlersCollection struct {
streamPullRateLimit *streamPullRateLimit
}

func NewGeolocationHandlersCollection(balancer balancer.Balancer, cluster cluster.Cluster, config config.Cli, lapi *api.Client) *GeolocationHandlersCollection {
func NewGeolocationHandlersCollection(balancer balancer.Balancer, config config.Cli, lapi *api.Client) *GeolocationHandlersCollection {
return &GeolocationHandlersCollection{
Balancer: balancer,
Config: config,
Expand Down Expand Up @@ -422,41 +422,21 @@ func (c *GeolocationHandlersCollection) sendPlaybackRequestAsync(playbackID stri

func (c *GeolocationHandlersCollection) membersFiltered(filter map[string]string, status, name string) ([]cluster.Member, error) {
membersEndpoint := fmt.Sprintf("http://%s:7979/api/serf/members", c.Config.MistHost)
resMembers := []cluster.Member{}

resp, err := http.Get(membersEndpoint)
if err != nil {
return resMembers, err
return []cluster.Member{}, err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return resMembers, fmt.Errorf("failed to get members: %s", resp.Status)
return []cluster.Member{}, fmt.Errorf("failed to get members: %s", resp.Status)
}
var members []cluster.Member
if err := json.NewDecoder(resp.Body).Decode(&members); err != nil {
return resMembers, err
return []cluster.Member{}, err
}
for _, member := range members {
if status != "" && status != member.Status {
continue
}
if name != "" && name != member.Name {
continue
}
matches := true
for k, v := range filter {
val, ok := member.Tags[k]
if !ok || val != v {
matches = false
break
}
}
if matches {
members = append(members, member)
}

}
return members, nil
return cluster.FilterMembers(members, filter, status, name)
}

func parsePlus(plusString string) (string, string) {
Expand Down

0 comments on commit 52c8727

Please sign in to comment.