Skip to content

Commit

Permalink
record-tester: Remove all fatal logs from m3utester2
Browse files Browse the repository at this point in the history
  • Loading branch information
victorges committed Dec 20, 2021
1 parent 85d96ba commit 7febd3e
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 61 deletions.
113 changes: 65 additions & 48 deletions internal/testers/m3utester2.go
Original file line number Diff line number Diff line change
Expand Up @@ -190,13 +190,11 @@ func (mut *m3utester2) VODStats() model.VODStats {
return vs
}

func (mut *m3utester2) doSavePlaylist() {
func (mut *m3utester2) doSavePlaylist() error {
if mut.savePlayList != nil {
err := ioutil.WriteFile(mut.savePlayListName, mut.savePlayList.Encode().Bytes(), 0644)
if err != nil {
glog.Fatal(err)
}
return ioutil.WriteFile(mut.savePlayListName, mut.savePlayList.Encode().Bytes(), 0644)
}
return nil
}

func (mut *m3utester2) initSave(streamURL, mediaURL string) {
Expand All @@ -221,7 +219,7 @@ func (mut *m3utester2) initSave(streamURL, mediaURL string) {
}

func newM3uMediaStream(ctx context.Context, cancel context.CancelFunc, name, resolution string, u *url.URL, wowzaMode bool, masterDR chan *downloadResult,
sm *segmentsMatcher, latencyResults chan *latencyResult, save, failIfTranscodingStops, statsOnly bool) *m3uMediaStream {
sm *segmentsMatcher, latencyResults chan *latencyResult, save, failIfTranscodingStops, statsOnly bool) (*m3uMediaStream, error) {

ms := &m3uMediaStream{
finite: finite{
Expand All @@ -247,7 +245,7 @@ func newM3uMediaStream(ctx context.Context, cancel context.CancelFunc, name, res
if ms.save {
streamName, mediaStreamName, err := parseMediaURL(u.String())
if err != nil {
glog.Fatal(err)
return nil, err
}
ms.saveDirName = filepath.Join(streamName, mediaStreamName)
ms.savePlayListName = filepath.Join(ms.saveDirName, mediaStreamName+".m3u8")
Expand All @@ -256,14 +254,14 @@ func newM3uMediaStream(ctx context.Context, cancel context.CancelFunc, name, res
}
glog.Infof("Save dir name: '%s', main playlist save name %s", ms.saveDirName, ms.savePlayListName)
mpl, err := m3u8.NewMediaPlaylist(0, 1024)
mpl.MediaType = m3u8.VOD
mpl.Live = false
if err != nil {
panic(err)
return nil, err
}
mpl.MediaType = m3u8.VOD
mpl.Live = false
ms.savePlayList = mpl
}
return ms
return ms, nil
}

func (mut *m3utester2) Stats() model.Stats1 {
Expand Down Expand Up @@ -636,10 +634,17 @@ func (mut *m3utester2) manifestPullerLoop(waitForTarget time.Duration) {
if mut.save {
mut.initSave("", surl)
mut.savePlayList.Append(mediaName+"/"+mediaName+".m3u8", nil, m3u8.VariantParams{Name: mediaName})
mut.doSavePlaylist()
if err := mut.doSavePlaylist(); err != nil {
mut.fatalEnd(err)
return
}
}
stream := newM3uMediaStream(mut.ctx, mut.cancel, mediaName, mres, mut.initialURL, mut.wowzaMode, mut.driftCheckResults, mut.segmentsMatcher, mut.latencyResults,
stream, err := newM3uMediaStream(mut.ctx, mut.cancel, mediaName, mres, mut.initialURL, mut.wowzaMode, mut.driftCheckResults, mut.segmentsMatcher, mut.latencyResults,
mut.save, mut.failIfTranscodingStops, mut.statsOnly)
if err != nil {
mut.fatalEnd(err)
return
}
mut.streams[resolution(mres)] = stream
return
}
Expand Down Expand Up @@ -717,20 +722,28 @@ func (mut *m3utester2) manifestPullerLoop(waitForTarget time.Duration) {
if mut.sourceRes == "" {
mut.sourceRes = ress
}
stream := newM3uMediaStream(mut.ctx, mut.cancel, variant.URI, ress, pvrui, mut.wowzaMode, mut.driftCheckResults,
stream, err := newM3uMediaStream(mut.ctx, mut.cancel, variant.URI, ress, pvrui, mut.wowzaMode, mut.driftCheckResults,
mut.segmentsMatcher, mut.latencyResults, mut.save, mut.failIfTranscodingStops, mut.statsOnly)
if err != nil {
mut.fatalEnd(err)
return
}
mut.streams[res] = stream
if mut.save {
needSavePlaylist = true
_, mediaName, err := parseMediaURL(pvrui.String())
if err != nil {
glog.Fatal(err)
mut.fatalEnd(err)
return
}
mut.savePlayList.Append(mediaName+"/"+mediaName+".m3u8", nil, variant.VariantParams)
}
}
if needSavePlaylist {
mut.doSavePlaylist()
if err := mut.doSavePlaylist(); err != nil {
mut.fatalEnd(err)
return
}
}
time.Sleep(2 * time.Second)
}
Expand Down Expand Up @@ -827,10 +840,12 @@ func (ms *m3uMediaStream) workerLoop(masterDR chan *downloadResult, latencyResul
seg.Duration = dres.task.duration
seg.Title = dres.task.title
if err := ms.insertSegmentToSavePlaylist(dres.task.seqNo, seg); err != nil {
glog.Fatal(err)
ms.fatalEnd(err)
return
}
if err := ioutil.WriteFile(ms.savePlayListName, ms.savePlayList.Encode().Bytes(), 0644); err != nil {
glog.Fatal(err)
ms.fatalEnd(err)
return
}
go func(segFileName, fullpath string, b []byte) {
if err := ioutil.WriteFile(fullpath, b, 0644); err != nil {
Expand Down Expand Up @@ -1002,10 +1017,12 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) {

gpl, plt, err := m3u8.Decode(*bytes.NewBuffer(b), true)
if err != nil {
glog.Fatal(err)
ms.fatalEnd(err)
return
}
if plt != m3u8.MEDIA {
glog.Fatalf("Expecting media playlist, got %d (url=%s)", plt, surl)
ms.fatalEnd(fmt.Errorf("Expecting media playlist, got %d (url=%s)", plt, surl))
return
}
pl := gpl.(*m3u8.MediaPlaylist)
// pl, err := m3u8.NewMediaPlaylist(100, 100)
Expand Down Expand Up @@ -1055,7 +1072,12 @@ func (ms *m3uMediaStream) manifestPullerLoop(wowzaMode bool) {
ms.downloadResults <- &downloadResult{name: segment.URI, seqNo: segSeqNo, status: "200 OK",
duration: time.Duration(segment.Duration * float64(time.Second))}
} else {
ms.downTasks <- downloadTask{baseURL: ms.u, url: segment.URI, seqNo: segSeqNo, title: segment.Title, duration: segment.Duration, appTime: now}
segUrl, err := url.Parse(segment.URI)
if err != nil {
ms.fatalEnd(err)
return
}
ms.downTasks <- downloadTask{baseURL: ms.u, url: segUrl, seqNo: segSeqNo, title: segment.Title, duration: segment.Duration, appTime: now}
ms.segmentsToDownload++
metrics.Census.IncSegmentsToDownload()
}
Expand Down Expand Up @@ -1085,37 +1107,32 @@ func (ms *m3uMediaStream) isFiniteDownloadsFinished() bool {
}

func (ms *m3uMediaStream) insertSegmentToSavePlaylist(seqNo uint64, seg *m3u8.MediaSegment) error {
var err error
err = ms.savePlayList.InsertSegment(seqNo, seg)
if err == m3u8.ErrPlaylistFull {
mpl, err := m3u8.NewMediaPlaylist(0, uint(len(ms.savePlayList.Segments)*2))
if err != nil {
glog.Fatal(err)
}
mpl.TargetDuration = ms.savePlayList.TargetDuration
mpl.SeqNo = ms.savePlayList.SeqNo
mpl.MediaType = m3u8.VOD
mpl.Live = false
for _, oseg := range ms.savePlayList.Segments {
if oseg != nil {
if err = mpl.InsertSegment(oseg.SeqId, oseg); err != nil {
glog.Fatal(err)
}
err := ms.savePlayList.InsertSegment(seqNo, seg)
if err != m3u8.ErrPlaylistFull {
return err
}
mpl, err := m3u8.NewMediaPlaylist(0, uint(len(ms.savePlayList.Segments)*2))
if err != nil {
return err
}
mpl.TargetDuration = ms.savePlayList.TargetDuration
mpl.SeqNo = ms.savePlayList.SeqNo
mpl.MediaType = m3u8.VOD
mpl.Live = false
for _, oseg := range ms.savePlayList.Segments {
if oseg != nil {
if err = mpl.InsertSegment(oseg.SeqId, oseg); err != nil {
return err
}
}
err = ms.savePlayList.InsertSegment(seqNo, seg)
}
return err
return ms.savePlayList.InsertSegment(seqNo, seg)
}

func downloadSegment(task *downloadTask, res chan *downloadResult) {
purl, err := url.Parse(task.url)
if err != nil {
glog.Fatal(err)
}
fsurl := task.url
if !purl.IsAbs() {
fsurl = task.baseURL.ResolveReference(purl).String()
fsurl := task.url.String()
if !task.url.IsAbs() {
fsurl = task.baseURL.ResolveReference(task.url).String()
}
try := 0
for {
Expand Down Expand Up @@ -1166,7 +1183,7 @@ func downloadSegment(task *downloadTask, res chan *downloadResult) {
glog.V(model.DEBUG).Infof("==============>>>>>>>>>>>>> Saving segment %s", sn)
ioutil.WriteFile(sn, b, 0644)
sid := strconv.FormatInt(time.Now().Unix(), 10)
if savedName, service, serr := SaveToExternalStorage(sid+"_"+task.url, b); serr != nil {
if savedName, service, serr := SaveToExternalStorage(sid+"_"+task.url.String(), b); serr != nil {
messenger.SendFatalMessage(fmt.Sprintf("Failure to save segment to %s %v", service, serr))
} else {
messenger.SendMessage(fmt.Sprintf("Segment %s (which can't be parsed) saved to %s %s", task.url, service, savedName))
Expand All @@ -1178,7 +1195,7 @@ func downloadSegment(task *downloadTask, res chan *downloadResult) {
// ioutil.WriteFile(sn, b, 0644)
// glog.V(model.DEBUG).Infof("Download %s result: %s len %d timeStart %s segment duration %s", fsurl, resp.Status, len(b), fsttim, dur)
glog.V(model.DEBUG).Infof("Download %s result: %s len %d timeStart %s segment duration %s took=%s", fsurl, resp.Status, len(b), fsttim, dur, time.Since(start))
res <- &downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url, seqNo: task.seqNo,
res <- &downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url.String(), seqNo: task.seqNo,
videoParseError: verr, startTime: fsttim, duration: dur, mySeqNo: task.mySeqNo, appTime: task.appTime, downloadCompetedAt: completedAt,
downloadStartedAt: start, data: b, task: task,
}
Expand Down
26 changes: 13 additions & 13 deletions internal/testers/mediadownloader.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ type downloadStats struct {

type downloadTask struct {
baseURL *url.URL
url string
url *url.URL
seqNo uint64
title string
duration float64
Expand Down Expand Up @@ -186,13 +186,9 @@ func (md *mediaDownloader) statsFormatted() string {
}

func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan downloadResult) {
purl, err := url.Parse(task.url)
if err != nil {
glog.Fatal(err)
}
fsurl := task.url
if !purl.IsAbs() {
fsurl = md.u.ResolveReference(purl).String()
fsurl := task.url.String()
if !task.url.IsAbs() {
fsurl = md.u.ResolveReference(task.url).String()
}
try := 0
for {
Expand Down Expand Up @@ -250,7 +246,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download
}
}
if md.picartoMode {
fsttim = time.Duration(mistGetTimeFromSegURI(task.url)) * time.Millisecond
fsttim = time.Duration(mistGetTimeFromSegURI(task.url.String())) * time.Millisecond
}
} else {
// add keys
Expand Down Expand Up @@ -319,7 +315,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download

if md.saveSegmentsToDisk {
seg := new(m3u8.MediaSegment)
seg.URI = task.url
seg.URI = task.url.String()
seg.SeqId = task.seqNo
seg.Duration = task.duration
seg.Title = task.title
Expand All @@ -331,7 +327,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download
upts := strings.Split(fsurl, "/")
// fn := upts[len(upts)-2] + "-" + path.Base(task.url)
ind := len(upts) - 2
fn := path.Base(task.url)
fn := path.Base(task.url.String())
if !md.livepeerNameSchema {
// ind = 0
// fn = upts[0]
Expand Down Expand Up @@ -378,7 +374,7 @@ func (md *mediaDownloader) downloadSegment(task *downloadTask, res chan download
glog.V(model.DEBUG).Infof("Segment %s saved to %s", seg.URI, filepath.Join(md.saveDir, fn))
}(fn, fullpath, b)
}
res <- downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url, seqNo: task.seqNo, downloadCompetedAt: now,
res <- downloadResult{status: resp.Status, bytes: len(b), try: try, name: task.url.String(), seqNo: task.seqNo, downloadCompetedAt: now,
videoParseError: verr, startTime: fsttim, duration: dur, mySeqNo: task.mySeqNo, appTime: task.appTime, keyFrames: keyFrames}
return
}
Expand Down Expand Up @@ -560,7 +556,11 @@ func (md *mediaDownloader) manifestDownloadLoop() {
if err == nil {
seqNo = parsedSeq
}
md.downTasks <- downloadTask{url: segment.URI, seqNo: seqNo, title: segment.Title, duration: segment.Duration, mySeqNo: mySeqNo, appTime: now}
segUrl, err := url.Parse(segment.URI)
if err != nil {
glog.Fatal(err)
}
md.downTasks <- downloadTask{url: segUrl, seqNo: seqNo, title: segment.Title, duration: segment.Duration, mySeqNo: mySeqNo, appTime: now}
md.segmentsToDownload++
now = now.Add(time.Millisecond)
// glog.V(model.VERBOSE).Infof("segment %s is of length %f seqId=%d", segment.URI, segment.Duration, segment.SeqId)
Expand Down

0 comments on commit 7febd3e

Please sign in to comment.