diff --git a/pkg/pipeline/sink/segments.go b/pkg/pipeline/sink/segments.go index 6cda02a59..6a4939b1b 100644 --- a/pkg/pipeline/sink/segments.go +++ b/pkg/pipeline/sink/segments.go @@ -61,7 +61,6 @@ type SegmentSink struct { closedSegments chan SegmentUpdate playlistUpdates chan SegmentUpdate - throttle core.Throttle done core.Fuse } @@ -103,7 +102,6 @@ func newSegmentSink(u uploader.Uploader, p *config.PipelineConfig, o *config.Seg openSegmentsStartTime: make(map[string]uint64), closedSegments: make(chan SegmentUpdate, maxPendingUploads), playlistUpdates: make(chan SegmentUpdate, maxPendingUploads), - throttle: core.NewThrottle(time.Second * 2), done: core.NewFuse(), } @@ -183,35 +181,22 @@ func (s *SegmentSink) handlePlaylistUpdates(update SegmentUpdate) error { <-update.uploadComplete s.playlistLock.Lock() + defer s.playlistLock.Unlock() + if err := s.playlist.Append(segmentStartTime, duration, update.filename); err != nil { - s.playlistLock.Unlock() return err } + if err := s.uploadPlaylist(); err != nil { + s.callbacks.OnError(err) + } if s.livePlaylist != nil { if err := s.livePlaylist.Append(segmentStartTime, duration, update.filename); err != nil { - s.playlistLock.Unlock() return err } - } - s.playlistLock.Unlock() - - // throttle playlist uploads - s.throttle(func() { - s.playlistLock.Lock() - defer s.playlistLock.Unlock() - if s.done.IsBroken() { - return - } - - if err := s.uploadPlaylist(); err != nil { + if err := s.uploadLivePlaylist(); err != nil { s.callbacks.OnError(err) } - if s.livePlaylist != nil { - if err := s.uploadLivePlaylist(); err != nil { - s.callbacks.OnError(err) - } - } - }) + } return nil } diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index 5c0e6c641..b9d42bca3 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -18,7 +18,10 @@ import ( "context" "fmt" "io" + "net" + "net/http" "os" + "syscall" "time" "cloud.google.com/go/storage" @@ -26,6 +29,7 @@ import ( "google.golang.org/api/googleapi" "google.golang.org/api/option" + "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" ) @@ -42,15 +46,35 @@ func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) { conf: conf, } - var err error + var opts []option.ClientOption if conf.Credentials != "" { - u.client, err = storage.NewClient(context.Background(), option.WithCredentialsJSON([]byte(u.conf.Credentials))) - } else { - u.client, err = storage.NewClient(context.Background()) + opts = append(opts, option.WithCredentialsJSON([]byte(conf.Credentials))) + } + + // override default transport DialContext + defaultTransport := http.DefaultTransport.(*http.Transport).Clone() + http.DefaultTransport.(*http.Transport).DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { + return (&net.Dialer{ + Timeout: time.Second * 30, + KeepAlive: time.Second * 30, + FallbackDelay: -1, + ControlContext: func(ctx context.Context, network, address string, c syscall.RawConn) error { + // force ipv4 to avoid "service not available in your location, forbidden" errors from Google + if network == "tcp6" { + return errors.New("tcp6 disabled") + } + return nil + }, + }).DialContext(ctx, network, addr) } + + c, err := storage.NewClient(context.Background(), opts...) + // restore default transport + http.DefaultTransport = defaultTransport if err != nil { return nil, err } + u.client = c return u, nil }