Skip to content

Commit

Permalink
gcp force ipv4
Browse files Browse the repository at this point in the history
  • Loading branch information
frostbyte73 committed Mar 5, 2024
1 parent bdbbd99 commit 2b4d75f
Showing 1 changed file with 43 additions and 4 deletions.
47 changes: 43 additions & 4 deletions pkg/pipeline/sink/uploader/gcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,21 @@ import (
"context"
"fmt"
"io"
"net"
"net/http"
"os"
"syscall"
"time"

"cloud.google.com/go/storage"
"github.com/googleapis/gax-go/v2"
"google.golang.org/api/googleapi"
"google.golang.org/api/option"
"google.golang.org/api/option/internaloption"
"google.golang.org/api/transport"
htransport "google.golang.org/api/transport/http"

"github.com/livekit/egress/pkg/errors"
"github.com/livekit/egress/pkg/types"
"github.com/livekit/protocol/livekit"
)
Expand All @@ -42,15 +49,47 @@ func newGCPUploader(conf *livekit.GCPUpload) (uploader, error) {
conf: conf,
}

var err error
var opts []option.ClientOption
if conf.Credentials != "" {
u.client, err = storage.NewClient(context.Background(), option.WithCredentialsJSON([]byte(u.conf.Credentials)))
} else {
u.client, err = storage.NewClient(context.Background())
opts = append(opts, option.WithCredentialsJSON([]byte(conf.Credentials)))
}
opts = append([]option.ClientOption{
option.WithScopes(storage.ScopeFullControl, "https://www.googleapis.com/auth/cloud-platform"),
option.WithUserAgent("gcloud-golang-storage/1.36.0")}, opts...)
opts = append(opts, internaloption.WithDefaultEndpoint("https://storage.googleapis.com/storage/v1/"))
opts = append(opts, internaloption.WithDefaultMTLSEndpoint("https://storage.mtls.googleapis.com/storage/v1/"))
creds, err := transport.Creds(context.Background(), opts...)
if err == nil {
opts = append(opts, internaloption.WithCredentials(creds))
}

// force ipv4 to avoid "service not available in your location, forbidden" errors from Google
defaultTransport := http.DefaultTransport.(*http.Transport).Clone()
// override default transport DialContext
http.DefaultTransport.(*http.Transport).DialContext = func(ctx context.Context, network, addr string) (net.Conn, error) {
return (&net.Dialer{
Timeout: time.Second * 30,
KeepAlive: time.Second * 30,
FallbackDelay: time.Nanosecond,
ControlContext: func(ctx context.Context, network, address string, c syscall.RawConn) error {
if network == "tcp6" {
return errors.New("tcp6 disabled")
}
return nil
},
}).DialContext(ctx, network, addr)
}
httpClient, _, err := htransport.NewClient(context.Background(), opts...)
// reset default transport
http.DefaultTransport = defaultTransport

opts = append(opts, option.WithHTTPClient(httpClient))

c, err := storage.NewClient(context.Background(), opts...)
if err != nil {
return nil, err
}
u.client = c

return u, nil
}
Expand Down

0 comments on commit 2b4d75f

Please sign in to comment.