diff --git a/cmd/api-transcoder/api-transcoder.go b/cmd/api-transcoder/api-transcoder.go index 85cec7ea..867516d8 100644 --- a/cmd/api-transcoder/api-transcoder.go +++ b/cmd/api-transcoder/api-transcoder.go @@ -205,7 +205,7 @@ func transcode(apiKey, apiHost, src, dst string, presets []string, lprofile *liv } } for _, outFile := range outFiles { - if err = outFile.Close(); err != nil { + if err := outFile.Close(); err != nil { return err } } diff --git a/internal/testers/segmenter.go b/internal/testers/segmenter.go index 345ae3a6..e61bfbef 100644 --- a/internal/testers/segmenter.go +++ b/internal/testers/segmenter.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "errors" - "fmt" "io" "time" @@ -34,9 +33,8 @@ func StartSegmentingR(ctx context.Context, reader io.ReadSeekCloser, stopAtFileE glog.Errorf("avutil.OpenRC err=%v", err) return err } - go segmentingLoop(ctx, "", inFile, stopAtFileEnd, stopAfter, skipFirst, segLen, useWallTime, out) - - return err + startSegmentingLoop(ctx, "", inFile, stopAtFileEnd, stopAfter, skipFirst, segLen, useWallTime, out) + return nil } func StartSegmenting(ctx context.Context, fileName string, stopAtFileEnd bool, stopAfter, skipFirst, segLen time.Duration, @@ -44,11 +42,11 @@ func StartSegmenting(ctx context.Context, fileName string, stopAtFileEnd bool, s glog.Infof("Starting segmenting file %s", fileName) inFile, err := avutil.Open(fileName) if err != nil { - glog.Fatal(err) + glog.Errorf("avutil.OpenRC err=%v", err) + return err } - go segmentingLoop(ctx, fileName, inFile, stopAtFileEnd, stopAfter, skipFirst, segLen, useWallTime, out) - - return err + startSegmentingLoop(ctx, fileName, inFile, stopAtFileEnd, stopAfter, skipFirst, segLen, useWallTime, out) + return nil } func createInMemoryTSMuxer() (av.Muxer, *bytes.Buffer) { @@ -97,12 +95,28 @@ func (wt *Walltime) ModifyPacket(pkt *av.Packet, streams []av.CodecData, videoid return } -func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxCloser, stopAtFileEnd bool, stopAfter, skipFirst, segLen time.Duration, +func startSegmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxCloser, stopAtFileEnd bool, stopAfter, skipFirst, segLen time.Duration, useWallTime bool, out chan<- *model.HlsSegment) { + go func() { + defer close(out) + err := segmentingLoop(ctx, fileName, inFileReal, stopAtFileEnd, stopAfter, skipFirst, segLen, useWallTime, out) + if err != nil { + glog.Errorf("Error in segmenting loop. err=%+v", err) + select { + case out <- &model.HlsSegment{Err: err}: + case <-ctx.Done(): + } + } + }() +} + +func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxCloser, + stopAtFileEnd bool, stopAfter, skipFirst, segLen time.Duration, + useWallTime bool, out chan<- *model.HlsSegment) error { var err error var streams []av.CodecData - var videoidx, audioidx int8 + streamTypes := map[int8]string{} ts := &timeShifter{} filters := pktque.Filters{ts, &pktque.FixTime{MakeIncrement: true}} @@ -111,22 +125,28 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo } inFile := &pktque.FilterDemuxer{Demuxer: inFileReal, Filter: filters} if streams, err = inFile.Streams(); err != nil { - msg := fmt.Sprintf("Can't get info about file: '%+v', isNoAudio %v isNoVideo %v", err, errors.Is(err, jerrors.ErrNoAudioInfoFound), errors.Is(err, jerrors.ErrNoVideoInfoFound)) - if !(errors.Is(err, jerrors.ErrNoAudioInfoFound) || errors.Is(err, jerrors.ErrNoVideoInfoFound)) { - glog.Fatal(msg) - } - fmt.Println(msg) - panic(msg) + glog.Errorf("Can't get info about file err=%q, isNoAudio=%v isNoVideo=%v stack=%+v", err, errors.Is(err, jerrors.ErrNoAudioInfoFound), errors.Is(err, jerrors.ErrNoVideoInfoFound), err) + return err } for i, st := range streams { - if st.Type().IsAudio() { - audioidx = int8(i) + codecType := "unknown" + if codec := st.Type(); codec.IsAudio() { + codecType = "audio" + } else if codec.IsVideo() { + codecType = "video" } - if st.Type().IsVideo() { - videoidx = int8(i) + streamTypes[int8(i)] = codecType + } + glog.V(model.VERBOSE).Infof("Stream types=%+v", streamTypes) + + sendSegment := func(seg *model.HlsSegment) bool { + select { + case out <- &model.HlsSegment{Err: err}: + return true + case <-ctx.Done(): + return false } } - glog.V(model.VERBOSE).Infof("Video stream index %d, audio stream index %d\n", videoidx, audioidx) seqNo := 0 // var curPTS time.Duration @@ -142,12 +162,12 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo segFile, buf := createInMemoryTSMuxer() err = segFile.WriteHeader(streams) if err != nil { - glog.Fatal(err) + return err } if firstFramePacket != nil { err = segFile.WritePacket(*firstFramePacket) if err != nil { - glog.Fatal(err) + return err } prevPTS = firstFramePacket.Time firstFramePacket = nil @@ -158,7 +178,8 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo for { select { case <-ctx.Done(): - return + // don't return ctx.Err() as that would send the error in the out channel + return nil default: } pkt, rerr = inFile.ReadPacket() @@ -169,7 +190,7 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo } break } - glog.Fatal(rerr) + return rerr } lastPacket = pkt @@ -182,7 +203,7 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo // This matches segmenter algorithm used in ffmpeg if pkt.IsKeyFrame && pkt.Time >= time.Duration(seqNo+1)*segLen { firstFramePacket = &pkt - glog.V(model.VERBOSE).Infof("Packet Is Keyframe %v Is Audio %v Is Video %v PTS %s sinc prev %s seqNo %d\n", pkt.IsKeyFrame, pkt.Idx == audioidx, pkt.Idx == videoidx, pkt.Time, + glog.V(model.VERBOSE).Infof("Packet isKeyframe=%v codecType=%v PTS=%s sincPrev=%s seqNo=%d", pkt.IsKeyFrame, streamTypes[pkt.Idx], pkt.Time, pkt.Time-prevPTS, seqNo+1) // prevPTS = pkt.Time curDur = pkt.Time - prevPTS @@ -190,12 +211,12 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo } err = segFile.WritePacket(pkt) if err != nil { - glog.Fatal(err) + return err } } err = segFile.WriteTrailer() if err != nil { - glog.Fatal(err) + return err } if rerr == io.EOF && stopAfter > 0 && (prevPTS+curDur) < stopAfter && len(fileName) > 0 { // re-open same file and stream it again @@ -203,8 +224,9 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo ts.timeShift = lastPacket.Time + 30*time.Millisecond inf, err := avutil.Open(fileName) if err != nil { - glog.Fatal(err) + return err } + defer inf.Close() inFile.Demuxer = inf // rs.counter.currentSegments = 0 inFile.Streams() @@ -221,7 +243,9 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo Duration: curDur, Data: buf.Bytes(), } - out <- hlsSeg + if !sendSegment(hlsSeg) { + return nil + } prevPTS = lastPacket.Time } else { seqNo-- @@ -241,7 +265,9 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo Duration: curDur, Data: buf.Bytes(), } - out <- hlsSeg + if !sendSegment(hlsSeg) { + return nil + } sent = 0 } } @@ -257,7 +283,9 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo SeqNo: seqNo + 1 + sent, Pts: prevPTS + curDur, } - out <- hlsSeg + if !sendSegment(hlsSeg) { + return nil + } break } } @@ -267,10 +295,12 @@ func segmentingLoop(ctx context.Context, fileName string, inFileReal av.DemuxClo SeqNo: seqNo + 1, Pts: prevPTS + curDur, } - out <- hlsSeg + if !sendSegment(hlsSeg) { + return nil + } break } seqNo++ } - return + return nil }