Skip to content

Commit

Permalink
remove gcp timeouts (#684)
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 authored May 30, 2024
1 parent f17766c commit f3ef4d4
Show file tree
Hide file tree
Showing 2 changed files with 17 additions and 36 deletions.
14 changes: 14 additions & 0 deletions pkg/info/info.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,17 @@
// Copyright 2023 LiveKit, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package info

import (
Expand Down
39 changes: 3 additions & 36 deletions pkg/pipeline/sink/uploader/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,25 +19,18 @@ import (
"encoding/base64"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"syscall"
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
)

const gcpTimeout = time.Minute

type GCPUploader struct {
conf *livekit.GCPUpload
client *storage.Client
Expand All @@ -56,21 +49,6 @@ func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) {
defaultTransport := http.DefaultTransport.(*http.Transport)
transportClone := defaultTransport.Clone()

// override default transport
defaultTransport.DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
return (&net.Dialer{
Timeout: time.Second * 30,
KeepAlive: time.Second * 30,
FallbackDelay: -1,
ControlContext: func(ctx context.Context, network, address string, c syscall.RawConn) error {
// force ipv4 to avoid "service not available in your location, forbidden" errors from Google
if network == "tcp6" {
return errors.New("tcp6 disabled")
}
return nil
},
}).DialContext(ctx, network, addr)
}
if conf.Proxy != nil {
proxyUrl, err := url.Parse(conf.Proxy.Url)
if err != nil {
Expand Down Expand Up @@ -110,27 +88,16 @@ func (u *GCPUploader) upload(localFilepath, storageFilepath string, _ types.Outp
return "", 0, wrap("GCP", err)
}

// In case where the total amount of data to upload is larger than googleapi.DefaultUploadChunkSize, each upload request will have a timeout of
// ChunkRetryDeadline, which is 32s by default. If the request payload is smaller than googleapi.DefaultUploadChunkSize, use a context deadline
// to apply the same timeout
var ctx context.Context
if stat.Size() <= googleapi.DefaultUploadChunkSize {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(context.Background(), gcpTimeout)
defer cancel()
} else {
ctx = context.Background()
}

wc := u.client.Bucket(u.conf.Bucket).Object(storageFilepath).Retryer(
storage.WithBackoff(gax.Backoff{
Initial: minDelay,
Max: maxDelay,
Multiplier: 2,
}),
storage.WithMaxAttempts(maxRetries),
storage.WithPolicy(storage.RetryAlways),
).NewWriter(ctx)
wc.ChunkRetryDeadline = gcpTimeout
).NewWriter(context.Background())
wc.ChunkRetryDeadline = 0

if _, err = io.Copy(wc, file); err != nil {
return "", 0, wrap("GCP", err)
Expand Down

0 comments on commit f3ef4d4

Please sign in to comment.