Skip to content

Commit

Permalink
Refactor cluster abstraction: remove Member() and ResolveNodeURL() (#…
Browse files Browse the repository at this point in the history
…1320)

This change is refactoring-only, it does not introduce any functional changes.

It improves the Cluster interface abstraction, two of its functions were used only inside geolocation, so it's better to move it there and keep it package-private.
  • Loading branch information
leszko authored Jul 16, 2024
1 parent ca7f48f commit 10a437c
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 65 deletions.
56 changes: 0 additions & 56 deletions cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,6 @@ import (
"fmt"
"math/rand"
"net"
"net/url"
"path/filepath"
"strconv"
"strings"
"time"
Expand All @@ -23,9 +21,7 @@ import (
type Cluster interface {
Start(ctx context.Context) error
MembersFiltered(filter map[string]string, status, name string) ([]Member, error)
Member(filter map[string]string, status, name string) (Member, error)
MemberChan() chan []Member
ResolveNodeURL(streamURL string) (string, error)
EventChan() <-chan serf.UserEvent
BroadcastEvent(serf.UserEvent) error
}
Expand Down Expand Up @@ -214,63 +210,11 @@ func (c *ClusterImpl) MembersFiltered(filter map[string]string, status, name str
return nodes, nil
}

func (c *ClusterImpl) Member(filter map[string]string, status, name string) (Member, error) {
members, err := c.MembersFiltered(filter, "", name)
if err != nil {
return Member{}, err
}
if len(members) < 1 {
return Member{}, fmt.Errorf("could not find serf member name=%s", name)
}
if len(members) > 1 {
glog.Errorf("found multiple serf members with the same name! this shouldn't happen! name=%s count=%d", name, len(members))
}
if members[0].Status != status {
return Member{}, fmt.Errorf("found serf member name=%s but status=%s (wanted %s)", name, members[0].Status, status)
}

return members[0], nil
}

// Subscribe to changes in the member list. Please only call me once. I only have one channel internally.
func (c *ClusterImpl) MemberChan() chan []Member {
return c.memberCh
}

// Given a dtsc:// or https:// url, resolve the proper address of the node via serf tags
func (c *ClusterImpl) ResolveNodeURL(streamURL string) (string, error) {
return ResolveNodeURL(c, streamURL)
}

// Separated here to be more easily fed mocks for testing
func ResolveNodeURL(c Cluster, streamURL string) (string, error) {
u, err := url.Parse(streamURL)
if err != nil {
return "", err
}
nodeName := u.Host
protocol := u.Scheme

member, err := c.Member(map[string]string{}, "alive", nodeName)
if err != nil {
return "", err
}
addr, has := member.Tags[protocol]
if !has {
glog.V(7).Infof("no tag found, not tag resolving protocol=%s nodeName=%s", protocol, nodeName)
return streamURL, nil
}
u2, err := url.Parse(addr)
if err != nil {
err = fmt.Errorf("node has unparsable tag!! nodeName=%s protocol=%s tag=%s", nodeName, protocol, addr)
glog.Error(err)
return "", err
}
u2.Path = filepath.Join(u2.Path, u.Path)
u2.RawQuery = u.RawQuery
return u2.String(), nil
}

// Subscribe to events broadcaster in the serf cluster. Please only call me once. I only have one channel internally.
func (c *ClusterImpl) EventChan() <-chan serf.UserEvent {
return c.eventCh
Expand Down
52 changes: 50 additions & 2 deletions handlers/geolocation/geolocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"math/rand"
"net/http"
"net/url"
"path/filepath"
"regexp"
"strconv"
"strings"
Expand Down Expand Up @@ -189,7 +190,7 @@ func (c *GeolocationHandlersCollection) RedirectHandler() httprouter.Handle {

rPath := fmt.Sprintf(pathTmpl, fullPlaybackID)
rURL := fmt.Sprintf("%s://%s%s?%s", protocol(r), bestNode, rPath, r.URL.RawQuery)
rURL, err = c.Cluster.ResolveNodeURL(rURL)
rURL, err = c.resolveNodeURL(rURL)
if err != nil {
glog.Errorf("failed to resolve node URL playbackID=%s err=%s", playbackID, err)
w.WriteHeader(http.StatusInternalServerError)
Expand All @@ -213,6 +214,53 @@ func (c *GeolocationHandlersCollection) RedirectHandler() httprouter.Handle {
}
}

// Given a dtsc:// or https:// url, resolve the proper address of the node via serf tags
func (c *GeolocationHandlersCollection) resolveNodeURL(streamURL string) (string, error) {
u, err := url.Parse(streamURL)
if err != nil {
return "", err
}
nodeName := u.Host
protocol := u.Scheme

member, err := c.clusterMember(map[string]string{}, "alive", nodeName)
if err != nil {
return "", err
}
addr, has := member.Tags[protocol]
if !has {
glog.V(7).Infof("no tag found, not tag resolving protocol=%s nodeName=%s", protocol, nodeName)
return streamURL, nil
}
u2, err := url.Parse(addr)
if err != nil {
err = fmt.Errorf("node has unparsable tag!! nodeName=%s protocol=%s tag=%s", nodeName, protocol, addr)
glog.Error(err)
return "", err
}
u2.Path = filepath.Join(u2.Path, u.Path)
u2.RawQuery = u.RawQuery
return u2.String(), nil
}

func (c *GeolocationHandlersCollection) clusterMember(filter map[string]string, status, name string) (cluster.Member, error) {
members, err := c.Cluster.MembersFiltered(filter, "", name)
if err != nil {
return cluster.Member{}, err
}
if len(members) < 1 {
return cluster.Member{}, fmt.Errorf("could not find serf member name=%s", name)
}
if len(members) > 1 {
glog.Errorf("found multiple serf members with the same name! this shouldn't happen! name=%s count=%d", name, len(members))
}
if members[0].Status != status {
return cluster.Member{}, fmt.Errorf("found serf member name=%s but status=%s (wanted %s)", name, members[0].Status, status)
}

return members[0], nil
}

// RedirectConstPathHandler redirects const path into the self catalyst node if it was not yet redirected.
func (c *GeolocationHandlersCollection) RedirectConstPathHandler() httprouter.Handle {
return func(w http.ResponseWriter, r *http.Request, params httprouter.Params) {
Expand Down Expand Up @@ -283,7 +331,7 @@ func playbackIdFor(streamName string) string {
}

func (c *GeolocationHandlersCollection) resolveReplicatedStream(dtscURL string, streamName string) (string, error) {
outURL, err := c.Cluster.ResolveNodeURL(dtscURL)
outURL, err := c.resolveNodeURL(dtscURL)
if err != nil {
glog.Errorf("error finding STREAM_SOURCE: %s", err)
return "push://", nil
Expand Down
10 changes: 3 additions & 7 deletions handlers/geolocation/geolocation_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ var fakeSerfMember = cluster.Member{
"https": fmt.Sprintf("https://%s", closestNodeAddr),
"dtsc": fmt.Sprintf("dtsc://%s", closestNodeAddr),
},
Status: "alive",
}

var prefixes = [...]string{"video", "videorec", "stream", "playback", "vod"}
Expand Down Expand Up @@ -158,15 +159,10 @@ func mockHandlers(t *testing.T) *GeolocationHandlersCollection {
Return("", "", errors.New(""))

mc.EXPECT().
Member(map[string]string{}, "alive", closestNodeAddr).
MembersFiltered(map[string]string{}, gomock.Any(), closestNodeAddr).
AnyTimes().
Return(fakeSerfMember, nil)
Return([]cluster.Member{fakeSerfMember}, nil)

mc.EXPECT().
ResolveNodeURL(gomock.Any()).DoAndReturn(func(streamURL string) (string, error) {
return cluster.ResolveNodeURL(mc, streamURL)
}).
AnyTimes()
coll := GeolocationHandlersCollection{
Balancer: mb,
Cluster: mc,
Expand Down

0 comments on commit 10a437c

Please sign in to comment.