From fc5379efa1c9de7c4d876315dfd4395bf55f5bf5 Mon Sep 17 00:00:00 2001 From: David Colburn Date: Thu, 30 May 2024 02:48:10 +0200 Subject: [PATCH] remove gcp timeouts --- pkg/info/info.go | 14 +++++++++++ pkg/pipeline/sink/uploader/gcp.go | 39 +++---------------------------- 2 files changed, 17 insertions(+), 36 deletions(-) diff --git a/pkg/info/info.go b/pkg/info/info.go index 3c0745b5..16d0bb8a 100644 --- a/pkg/info/info.go +++ b/pkg/info/info.go @@ -1,3 +1,17 @@ +// Copyright 2023 LiveKit, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + package info import ( diff --git a/pkg/pipeline/sink/uploader/gcp.go b/pkg/pipeline/sink/uploader/gcp.go index af5705a1..a4e11259 100644 --- a/pkg/pipeline/sink/uploader/gcp.go +++ b/pkg/pipeline/sink/uploader/gcp.go @@ -19,25 +19,18 @@ import ( "encoding/base64" "fmt" "io" - "net" "net/http" "net/url" "os" - "syscall" - "time" "cloud.google.com/go/storage" "github.com/googleapis/gax-go/v2" - "google.golang.org/api/googleapi" "google.golang.org/api/option" - "github.com/livekit/egress/pkg/errors" "github.com/livekit/egress/pkg/types" "github.com/livekit/protocol/livekit" ) -const gcpTimeout = time.Minute - type GCPUploader struct { conf *livekit.GCPUpload client *storage.Client @@ -56,21 +49,6 @@ func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) { defaultTransport := http.DefaultTransport.(*http.Transport) transportClone := defaultTransport.Clone() - // override default transport - defaultTransport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) { - return (&net.Dialer{ - Timeout: time.Second * 30, - KeepAlive: time.Second * 30, - FallbackDelay: -1, - ControlContext: func(ctx context.Context, network, address string, c syscall.RawConn) error { - // force ipv4 to avoid "service not available in your location, forbidden" errors from Google - if network == "tcp6" { - return errors.New("tcp6 disabled") - } - return nil - }, - }).DialContext(ctx, network, addr) - } if conf.Proxy != nil { proxyUrl, err := url.Parse(conf.Proxy.Url) if err != nil { @@ -110,27 +88,16 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp return "", 0, wrap("GCP", err) } - // In case where the total amount of data to upload is larger than googleapi.DefaultUploadChunkSize, each upload request will have a timeout of - // ChunkRetryDeadline, which is 32s by default. If the request payload is smaller than googleapi.DefaultUploadChunkSize, use a context deadline - // to apply the same timeout - var ctx context.Context - if stat.Size() <= googleapi.DefaultUploadChunkSize { - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(context.Background(), gcpTimeout) - defer cancel() - } else { - ctx = context.Background() - } - wc := u.client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer( storage.WithBackoff(gax.Backoff{ Initial: minDelay, Max: maxDelay, Multiplier: 2, }), + storage.WithMaxAttempts(maxRetries), storage.WithPolicy(storage.RetryAlways), - ).NewWriter(ctx) - wc.ChunkRetryDeadline = gcpTimeout + ).NewWriter(context.Background()) + wc.ChunkRetryDeadline = 0 if _, err = io.Copy(wc, file); err != nil { return "", 0, wrap("GCP", err)