Skip to content

Commit

Permalink
mist-api-connector: Recover etcd session when lease is lost (#90)
Browse files Browse the repository at this point in the history
* mac: Recover etcd session logic

* mac: Shutdown after 2 minutes unable to recover

* mac: Create helper newEtcdSession

* mac: Replace etcdSession while still with the lock

* mac: s/ETCD/etcd/g

It's not an acronym, so I think it makes more sense
for it to be consistently lowercase.

Actually just found out it stands for "/etc distributed"
:mindblown:

* mac: Stop background loop when app shutdown

* Update internal/app/mistapiconnector/mistapiconnector_app.go

Co-authored-by: Ivan Tivonenko <[email protected]>

* Update internal/app/mistapiconnector/mistapiconnector_app.go

Co-authored-by: Ivan Tivonenko <[email protected]>

* Update internal/app/mistapiconnector/mistapiconnector_app.go

Co-authored-by: Ivan Tivonenko <[email protected]>

* mac: Sync etcd endpoints on startup

* mac: Add timeout to etcd session creation

* mac: Remove shutdown on unrecoverable etcd sess

We had better keep trying, because shutting down is not
really an option in the end.

* Revert "mac: Add timeout to etcd session creation"

This reverts commit 0476527.

* mac: Add log before/after timeout-less function

* mac: Fix missing return after handling etcd error

The bug probably came from the consul code, which also
didn't have it. Fixed it there too.

Co-authored-by: Ivan Tivonenko <[email protected]>
  • Loading branch information
victorges and darkdarkdragon authored Jul 21, 2021
1 parent cf59755 commit ee31a8e
Showing 1 changed file with 118 additions and 25 deletions.
143 changes: 118 additions & 25 deletions internal/app/mistapiconnector/mistapiconnector_app.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ const audioEnabledStreamSuffix = "rec"
// Timing parameters for the etcd client and the etcd session.
const (
	// etcdDialTimeout bounds individual etcd requests made by this file
	// (endpoint sync and key transactions).
	etcdDialTimeout = 5 * time.Second
	// etcdAutoSyncInterval is presumably handed to the etcd client
	// configuration; its use is not visible in this part of the file.
	etcdAutoSyncInterval = 5 * time.Minute
	// etcdSessionTTL is the TTL requested for the etcd session.
	etcdSessionTTL = 10 // in seconds
	// etcdSessionRecoverBackoff is the pause between two attempts to
	// recover a lost etcd session.
	etcdSessionRecoverBackoff = 3 * time.Second
	// etcdSessionRecoverTimeout is the overall deadline given to one
	// recovery of a lost etcd session.
	etcdSessionRecoverTimeout = 2 * time.Minute
)

type (
// IMac creates new Mist API Connector application
Expand All @@ -62,6 +64,11 @@ type (
APIServer string
}

etcdRevData struct {
revision int64
entries []string
}

mac struct {
opts *MacOptions
mapi *mist.API
Expand All @@ -82,7 +89,7 @@ type (
useEtcd bool
etcdClient *clientv3.Client
etcdSession *concurrency.Session
etcdPub2rev map[string]int64 // public key to revision of ETCD keys
etcdPub2rev map[string]etcdRevData // public key to revision of etcd keys
}
)

Expand Down Expand Up @@ -119,17 +126,22 @@ func NewMac(mistHost string, mapi *mist.API, lapi *livepeer.API, balancerHost st
DialOptions: []grpc.DialOption{grpc.WithBlock()},
})
if err != nil {
err = fmt.Errorf("mist-api-connector: Error connecting ETCD err=%w", err)
err = fmt.Errorf("mist-api-connector: Error connecting etcd err=%w", err)
return nil, err
}
ctx, cancel := context.WithTimeout(context.Background(), etcdDialTimeout)
err = cli.Sync(ctx)
cancel()
if err != nil {
err = fmt.Errorf("mist-api-connector: Error syncing etcd endpoints err=%w", err)
return nil, err
}
sess, err = concurrency.NewSession(cli, concurrency.WithTTL(etcdSessionTTL))
sess, err = newEtcdSession(cli)
if err != nil {
err = fmt.Errorf("mist-api-connector: Error creating ETCD session err=%w", err)
return nil, err
}
glog.Info("etcd got lease %d", sess.Lease())
}
return &mac{
mc := &mac{
mistHot: mistHost,
mapi: mapi,
lapi: lapi,
Expand All @@ -145,9 +157,11 @@ func NewMac(mistHost string, mapi *mist.API, lapi *livepeer.API, balancerHost st
useEtcd: useEtcd,
etcdClient: cli,
etcdSession: sess,
etcdPub2rev: make(map[string]int64), // public key to revision of ETCD keys
etcdPub2rev: make(map[string]etcdRevData), // public key to revision of etcd keys
srvShutCh: make(chan error),
}, nil
}
go mc.recoverSessionLoop()
return mc, nil
}

// LivepeerProfiles2MistProfiles converts Livepeer's API profiles to Mist's ones
Expand Down Expand Up @@ -495,15 +509,16 @@ func (mc *mac) handleDefaultStreamTrigger(w http.ResponseWriter, r *http.Request
glog.Errorf("Error creating Traefik rule err=%v", err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("false"))
return
}
}
if mc.useEtcd {
// now create routing rule in the ETCD for HLS playback
// now create routing rule in the etcd for HLS playback
if mc.baseStreamName != "" {
wildcardPlaybackID := mc.wildcardPlaybackID(stream)
playbackID := mc.consulPrefix + stream.PlaybackID
serviceName := mc.consulPrefix + serviceNameFromMistURL(mc.mistURL)
err = mc.putEtcdKeys(stream.PlaybackID,
err = mc.putEtcdKeys(mc.etcdSession, stream.PlaybackID,
traefikKeyPathRouters+playbackID+"/rule",
fmt.Sprintf(traefikRuleTemplateDouble, mc.playbackDomain, stream.PlaybackID, wildcardPlaybackID),
traefikKeyPathRouters+playbackID+"/service",
Expand All @@ -526,7 +541,7 @@ func (mc *mac) handleDefaultStreamTrigger(w http.ResponseWriter, r *http.Request
"false",
)
} else {
err = mc.putEtcdKeys(
err = mc.putEtcdKeys(mc.etcdSession,
stream.PlaybackID,
traefikKeyPathRouters+streamKey+"/rule",
fmt.Sprintf(traefikRuleTemplate, mc.playbackDomain, streamKey),
Expand All @@ -539,9 +554,10 @@ func (mc *mac) handleDefaultStreamTrigger(w http.ResponseWriter, r *http.Request
)
}
if err != nil {
glog.Errorf("Error creating ETCD Traefik rule for playbackID=%s streamID=%s err=%v", stream.PlaybackID, stream.ID, err)
glog.Errorf("Error creating etcd Traefik rule for playbackID=%s streamID=%s err=%v", stream.PlaybackID, stream.ID, err)
w.WriteHeader(http.StatusInternalServerError)
w.Write([]byte("false"))
return
}
}
w.Write([]byte(responseURL))
Expand All @@ -551,15 +567,15 @@ func (mc *mac) handleDefaultStreamTrigger(w http.ResponseWriter, r *http.Request
}

// putEtcdKeys puts keys in one transaction
func (mc *mac) putEtcdKeys(playbackID string, kvs ...string) error {
func (mc *mac) putEtcdKeys(sess *concurrency.Session, playbackID string, kvs ...string) error {
if len(kvs) == 0 || len(kvs)%2 != 0 {
return errors.New("number of arguments should be even")
}
cmp := clientv3.Compare(clientv3.CreateRevision(kvs[0]), ">", -1) // basically noop - will always be true
thn := make([]clientv3.Op, 0, len(kvs)/2)
get := clientv3.OpGet(kvs[0])
for i := 0; i < len(kvs); i += 2 {
thn = append(thn, clientv3.OpPut(kvs[i], kvs[i+1], clientv3.WithLease(mc.etcdSession.Lease())))
thn = append(thn, clientv3.OpPut(kvs[i], kvs[i+1], clientv3.WithLease(sess.Lease())))
}
ctx, cancel := context.WithTimeout(context.Background(), etcdDialTimeout)
resp, err := mc.etcdClient.Txn(ctx).If(cmp).Then(thn...).Else(get).Commit()
Expand All @@ -571,8 +587,8 @@ func (mc *mac) putEtcdKeys(playbackID string, kvs ...string) error {
if !resp.Succeeded {
panic("unexpected")
}
glog.Infof("for playbackID=%s created %d keys in ETCD revision=%d", playbackID, len(kvs)/2, resp.Header.Revision)
mc.etcdPub2rev[playbackID] = resp.Header.Revision
glog.Infof("for playbackID=%s created %d keys in etcd revision=%d", playbackID, len(kvs)/2, resp.Header.Revision)
mc.etcdPub2rev[playbackID] = etcdRevData{resp.Header.Revision, kvs}
return nil
}

Expand All @@ -581,7 +597,7 @@ func (mc *mac) deleteEtcdKeys(playbackID string) {
if rev, ok := mc.etcdPub2rev[playbackID]; ok {
pathKey := traefikKeyPathRouters + etcdPlaybackID
ruleKey := pathKey + "/rule"
cmp := clientv3.Compare(clientv3.ModRevision(ruleKey), "=", rev)
cmp := clientv3.Compare(clientv3.ModRevision(ruleKey), "=", rev.revision)
ctx, cancel := context.WithTimeout(context.Background(), etcdDialTimeout)
thn := []clientv3.Op{
clientv3.OpDelete(pathKey, clientv3.WithRange(pathKey+"~")),
Expand Down Expand Up @@ -610,10 +626,82 @@ func (mc *mac) deleteEtcdKeys(playbackID string) {
playbackID, rev, curRev, pathKey)
}
} else {
glog.Errorf("mist-api-connector: ETCD revision for stream playbackID=%s not found", playbackID)
glog.Errorf("mist-api-connector: etcd revision for stream playbackID=%s not found", playbackID)
}
}

// recoverSessionLoop waits for the current etcd session to be lost and then
// tries to replace it with a fresh one, repeating for as long as the etcd
// client's context is alive. It is meant to run in its own goroutine.
//
// The loop ends when the etcd client's context is done, or when a single
// recovery does not succeed within etcdSessionRecoverTimeout. It returns
// immediately when the connector has no etcd client or session, so it is safe
// to start even when etcd is not in use.
func (mc *mac) recoverSessionLoop() {
	if mc.etcdClient == nil || mc.etcdSession == nil {
		// Nothing to watch; dereferencing a nil client or session below
		// would panic.
		return
	}
	clientCtx := mc.etcdClient.Ctx()
	for clientCtx.Err() == nil {
		select {
		case <-clientCtx.Done():
			// client closed, which means the app has shut down
			return
		case <-mc.etcdSession.Done():
		}
		glog.Infof("etcd session with lease=%d is lost, trying to recover", mc.etcdSession.Lease())

		// Derive the deadline from the client context so that closing the
		// client also aborts an in-progress recovery.
		ctx, cancel := context.WithTimeout(clientCtx, etcdSessionRecoverTimeout)
		err := mc.recoverEtcdSession(ctx)
		cancel()

		// A failure caused by the client context ending is a normal shutdown
		// and is not reported; the loop condition then ends the loop.
		if err != nil && clientCtx.Err() == nil {
			glog.Errorf("mist-api-connector: unrecoverable etcd session. err=%q.", err)
			return
		}
	}
}

// recoverEtcdSession keeps calling recoverEtcdSessionOnce until it succeeds,
// sleeping etcdSessionRecoverBackoff between attempts. At least one attempt is
// always made. It returns nil on success; if ctx is done while waiting for the
// next attempt, it returns an error wrapping the last attempt's failure.
func (mc *mac) recoverEtcdSession(ctx context.Context) error {
	lastErr := mc.recoverEtcdSessionOnce()
	for lastErr != nil {
		select {
		case <-ctx.Done():
			return fmt.Errorf("mist-api-connector: Timeout recovering etcd session err=%w", lastErr)
		case <-time.After(etcdSessionRecoverBackoff):
			// The previous failure is logged only once the backoff has
			// elapsed, right before the next attempt.
			glog.Errorf("mist-api-connector: Retrying etcd session recover. err=%q", lastErr)
		}
		lastErr = mc.recoverEtcdSessionOnce()
	}
	return nil
}

// recoverEtcdSessionOnce makes a single attempt to replace the etcd session.
//
// It opens a new session and then, while holding mc.mu, re-puts the stored
// entries of every playback ID in etcdPub2rev using the new session. If any
// put fails, the new session is closed and an error is returned, leaving
// mc.etcdSession untouched. On success the old session is closed and
// mc.etcdSession is swapped to the new one before the lock is released.
func (mc *mac) recoverEtcdSessionOnce() error {
	fresh, err := newEtcdSession(mc.etcdClient)
	if err != nil {
		return err
	}

	mc.mu.Lock()
	defer mc.mu.Unlock()

	for playbackID, data := range mc.etcdPub2rev {
		if putErr := mc.putEtcdKeys(fresh, playbackID, data.entries...); putErr != nil {
			// Give up on this attempt and discard the half-populated session.
			fresh.Close()
			return fmt.Errorf("mist-api-connector: Error re-creating etcd keys. playbackId=%q, err=%w", playbackID, putErr)
		}
	}

	mc.etcdSession.Close()
	mc.etcdSession = fresh
	glog.Infof("Recovered etcd session. lease=%d", fresh.Lease())
	return nil
}

// newEtcdSession opens a concurrency session on etcdClient with a TTL of
// etcdSessionTTL seconds. The attempt is logged before it starts and again
// when it finishes, because session creation is called without a timeout.
// On failure it returns a nil session and a wrapped error.
func newEtcdSession(etcdClient *clientv3.Client) (*concurrency.Session, error) {
	glog.Infof("Starting new etcd session ttl=%d", etcdSessionTTL)

	ttlOpt := concurrency.WithTTL(etcdSessionTTL)
	session, err := concurrency.NewSession(etcdClient, ttlOpt)
	if err == nil {
		glog.Infof("etcd got lease %d", session.Lease())
		return session, nil
	}

	glog.Errorf("Failed to start etcd session err=%q", err)
	return nil, fmt.Errorf("mist-api-connector: Error creating etcd session err=%w", err)
}

// wildcardPlaybackID returns the stream's base name and its playback ID
// joined by a "+" sign.
func (mc *mac) wildcardPlaybackID(stream *livepeer.CreateStreamResp) string {
	base := mc.baseNameForStream(stream)
	return base + "+" + stream.PlaybackID
}
Expand Down Expand Up @@ -802,16 +890,21 @@ func (mc *mac) startSignalHandler() {
default:
glog.Infof("Got signal %d, shutting down", gotSig)
}
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
err := mc.srv.Shutdown(ctx)
cancel()
glog.Infof("Done shutting down server with err=%v", err)
// now call /setactve/false on active connections
mc.deactiveAllStreams()
mc.srvShutCh <- err
mc.shutdown()
}()
}

// shutdown stops the connector: it shuts the HTTP server down (waiting at most
// 15 seconds), closes the etcd client when there is one, deactivates all
// streams, and finally sends the server shutdown result on srvShutCh.
//
// The send on srvShutCh blocks until a receiver takes the value.
func (mc *mac) shutdown() {
	ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
	err := mc.srv.Shutdown(ctx)
	cancel()
	glog.Infof("Done shutting down server with err=%v", err)
	// The etcd client may be nil when etcd is not in use; calling Close on a
	// nil client would panic.
	if mc.etcdClient != nil {
		if closeErr := mc.etcdClient.Close(); closeErr != nil {
			glog.Errorf("Error closing etcd client err=%v", closeErr)
		}
	}
	// now call /setactve/false on active connections
	mc.deactiveAllStreams()
	mc.srvShutCh <- err
}

// deactiveAllStreams sends /setactive/false for all the active streams
func (mc *mac) deactiveAllStreams() {
mc.mu.Lock()
Expand Down

0 comments on commit ee31a8e

Please sign in to comment.