Skip to content

Commit

Permalink
Start working on runner blob integration
Browse files Browse the repository at this point in the history
  • Loading branch information
bcspragu committed Oct 21, 2023
1 parent 94b42db commit 8d07abb
Show file tree
Hide file tree
Showing 37 changed files with 1,250 additions and 405 deletions.
8 changes: 4 additions & 4 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ oci_pull(
platforms = ["linux/amd64"],
)

# TODO: Replace this with the base image provided by RMI
oci_pull(
name = "runner_base",
digest = "sha256:46c5b9bd3e3efff512e28350766b54355fce6337a0b44ba3f822ab918eca4520",
image = "gcr.io/distroless/base",
platforms = ["linux/amd64"],
digest = "sha256:d0b2922dc48cb6acb7c767f89f0c92ccbe1a043166971bac0b585b3851a9b720",
# TODO: Replace this with a real one
image = "docker.io/curfewreplica/pactatest",
# platforms = ["linux/amd64"],
)
16 changes: 16 additions & 0 deletions azure/azblob/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "azblob",
srcs = ["azblob.go"],
importpath = "github.com/RMI/pacta/azure/azblob",
visibility = ["//visibility:public"],
deps = [
"//blob",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//to",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//sas",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//service",
],
)
185 changes: 185 additions & 0 deletions azure/azblob/azblob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Package azblob wraps the existing Azure blob library to provide basic upload,
// download, and URL signing functionality against a standardized interface.
package azblob

import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
azservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/RMI/pacta/blob"
)

const (
Scheme = blob.Scheme("az")
)

type Client struct {
storageAccount string
now func() time.Time

client *azblob.Client
svcClient *azservice.Client

cachedUDCMu *sync.Mutex
cachedUDC *azservice.UserDelegationCredential
cachedUDCExpiry time.Time
}

func NewClient(creds azcore.TokenCredential, storageAccount string) (*Client, error) {
serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", storageAccount)

client, err := azblob.NewClient(serviceURL, creds, nil)
if err != nil {
return nil, fmt.Errorf("failed to init Azure blob client: %w", err)
}

svcClient, err := azservice.NewClient(serviceURL, creds, nil)
if err != nil {
return nil, fmt.Errorf("failed to init Azure blob service client: %w", err)
}

return &Client{
storageAccount: storageAccount,
now: func() time.Time { return time.Now().UTC() },

client: client,
svcClient: svcClient,

cachedUDCMu: &sync.Mutex{},
}, nil
}

func (c *Client) Scheme() blob.Scheme {
return Scheme
}

func (c *Client) WriteBlob(ctx context.Context, uri string, r io.Reader) error {
ctr, blb, ok := blob.SplitURI(Scheme, uri)
if !ok {
return fmt.Errorf("malformed URI %q is not for Azure", uri)
}

if _, err := c.client.UploadStream(ctx, ctr, blb, r, nil); err != nil {
return fmt.Errorf("failed to upload blob: %w", err)
}
return nil
}

func (c *Client) ReadBlob(ctx context.Context, uri string) (io.ReadCloser, error) {
ctr, blb, ok := blob.SplitURI(Scheme, uri)
if !ok {
return nil, fmt.Errorf("malformed URI %q is not for Azure", uri)
}

resp, err := c.client.DownloadStream(ctx, ctr, blb, nil)
if err != nil {
return nil, fmt.Errorf("failed to read blob: %w", err)
}

return resp.Body, nil
}

// SignedUploadURL returns a URL that is allowed to upload to the given URI.
// See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/[email protected]/sas#example-package-UserDelegationSAS
func (c *Client) SignedUploadURL(ctx context.Context, uri string) (string, error) {
return c.signBlob(ctx, uri, &sas.BlobPermissions{Create: true, Write: true})
}

// SignedDownloadURL returns a URL that is allowed to download the file at the given URI.
// See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/[email protected]/sas#example-package-UserDelegationSAS
func (c *Client) SignedDownloadURL(ctx context.Context, uri string) (string, error) {
return c.signBlob(ctx, uri, &sas.BlobPermissions{Read: true})
}

func (c *Client) signBlob(ctx context.Context, uri string, perms *sas.BlobPermissions) (string, error) {
ctr, blb, ok := blob.SplitURI(Scheme, uri)
if !ok {
return "", fmt.Errorf("malformed URI %q is not for Azure", uri)
}

// The blob component is important, otherwise the signed URL is applicable to the whole container.
if blb == "" {
return "", fmt.Errorf("uri %q did not contain a blob component", uri)
}

now := c.now().UTC().Add(-10 * time.Second)
udc, err := c.getUserDelegationCredential(ctx, now)
if err != nil {
return "", fmt.Errorf("failed to get udc: %w", err)
}

// Create Blob Signature Values with desired permissions and sign with user delegation credential
sasQueryParams, err := sas.BlobSignatureValues{
Protocol: sas.ProtocolHTTPS,
StartTime: now,
ExpiryTime: now.Add(15 * time.Minute),
Permissions: perms.String(),
ContainerName: ctr,
BlobName: blb,
}.SignWithUserDelegation(udc)
if err != nil {
return "", fmt.Errorf("failed to sign blob: %w", err)
}

return fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", c.storageAccount, ctr, blb, sasQueryParams.Encode()), nil
}

func (c *Client) ListBlobs(ctx context.Context, uriPrefix string) ([]string, error) {
ctr, blobPrefix, ok := blob.SplitURI(Scheme, uriPrefix)
if !ok {
return nil, fmt.Errorf("malformed URI prefix %q is not for Azure", uriPrefix)
}

if blobPrefix == "" {
return nil, fmt.Errorf("uri prefix %q did not contain a blob component", uriPrefix)
}

pager := c.client.NewListBlobsFlatPager(ctr, &azblob.ListBlobsFlatOptions{
Prefix: &blobPrefix,
})

var blobs []string
for pager.More() {
resp, err := pager.NextPage(ctx)
if err != nil {
return nil, fmt.Errorf("failed to load page of blobs: %w", err)
}
for _, bi := range resp.Segment.BlobItems {
blobs = append(blobs, blob.Join(Scheme, ctr, *bi.Name))
}
}

return blobs, nil
}

func (c *Client) getUserDelegationCredential(ctx context.Context, now time.Time) (*azservice.UserDelegationCredential, error) {
c.cachedUDCMu.Lock()
defer c.cachedUDCMu.Unlock()

expiry := now.Add(48 * time.Hour)
info := azservice.KeyInfo{
Start: to.Ptr(now.UTC().Format(sas.TimeFormat)),
Expiry: to.Ptr(expiry.UTC().Format(sas.TimeFormat)),
}

if !c.cachedUDCExpiry.IsZero() && c.cachedUDCExpiry.Sub(now) > 1*time.Minute {
return c.cachedUDC, nil
}

udc, err := c.svcClient.GetUserDelegationCredential(ctx, info, nil)
if err != nil {
return nil, fmt.Errorf("failed to get delegated credentials: %w", err)
}
c.cachedUDC = udc
c.cachedUDCExpiry = expiry

return udc, nil
}
109 changes: 40 additions & 69 deletions azure/aztask/aztask.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package aztask

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -27,16 +26,9 @@ type Config struct {
// Location is the location to run the runner, like centralus
Location string

// ConfigPath should be a full path to a config file in the runner image,
// like: /configs/{local,dev}.conf
ConfigPath string

// Identity is the account the runner should act as.
Identity *RunnerIdentity

// Image the runner image to execute
Image *RunnerImage

Rand *rand.Rand
}

Expand All @@ -45,16 +37,12 @@ func (c *Config) validate() error {
return errors.New("no container location given")
}

if c.ConfigPath == "" {
return errors.New("no runner config path given")
}

if err := c.Identity.validate(); err != nil {
return fmt.Errorf("invalid identity config: %w", err)
}

if err := c.Image.validate(); err != nil {
return fmt.Errorf("invalid image config: %w", err)
if c.Rand == nil {
return errors.New("no random number generator given")
}

return nil
Expand Down Expand Up @@ -99,35 +87,7 @@ func (r *RunnerIdentity) EnvironmentID() string {
return fmt.Sprintf(tmpl, r.SubscriptionID, r.ResourceGroup, r.ManagedEnvironment)
}

type RunnerImage struct {
// Like rmipacta.azurecr.io
Registry string
// Like runner
Name string
}

func (ri *RunnerImage) validate() error {
if ri.Registry == "" {
return errors.New("no runner image registry given")
}
if ri.Name == "" {
return errors.New("no runner image name given")
}
return nil
}

func (r *RunnerImage) WithTag(tag string) string {
var buf bytes.Buffer
// <registry>/<name>:<tag>
buf.WriteString(r.Registry)
buf.WriteRune('/')
buf.WriteString(r.Name)
buf.WriteRune(':')
buf.WriteString(tag)
return buf.String()
}

func NewTaskRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) {
func NewRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) {
if err := cfg.validate(); err != nil {
return nil, fmt.Errorf("invalid task runner config: %w", err)
}
Expand All @@ -149,11 +109,29 @@ func NewTaskRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) {
}, nil
}

func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.ID, error) {
func (r *Runner) Run(ctx context.Context, cfg *task.Config) (task.ID, error) {

name := r.gen.NewID()
identity := r.cfg.Identity.String()
envID := r.cfg.Identity.EnvironmentID()

envVars := []*armappcontainers.EnvironmentVar{
{
Name: to.Ptr("AZURE_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("MANAGED_IDENTITY_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
}
for _, v := range cfg.Env {
envVars = append(envVars, &armappcontainers.EnvironmentVar{
Name: to.Ptr(v.Key),
Value: to.Ptr(v.Value),
})
}

job := armappcontainers.Job{
Location: &r.cfg.Location,
Identity: &armappcontainers.ManagedServiceIdentity{
Expand All @@ -176,7 +154,7 @@ func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.
ReplicaRetryLimit: to.Ptr(int32(0)),
Registries: []*armappcontainers.RegistryCredentials{
{
Server: to.Ptr(r.cfg.Image.Registry),
Server: to.Ptr(cfg.Image.Base.Registry),
Identity: to.Ptr(identity),
},
},
Expand All @@ -188,30 +166,12 @@ func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.
Template: &armappcontainers.JobTemplate{
Containers: []*armappcontainers.Container{
{
Args: []*string{
to.Ptr("--config=" + r.cfg.ConfigPath),
},
Command: []*string{
to.Ptr("/runner"),
},
Env: []*armappcontainers.EnvironmentVar{
{
Name: to.Ptr("AZURE_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("MANAGED_IDENTITY_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("PORTFOLIO_ID"),
Value: to.Ptr(string(req.PortfolioID)),
},
},
// TODO: Take in the image digest as part of the task definition, as this can change per request.
Image: to.Ptr(r.cfg.Image.WithTag("latest")),
Name: to.Ptr(name),
Probes: []*armappcontainers.ContainerAppProbe{},
Args: toPtrs(cfg.Flags),
Command: toPtrs(cfg.Command),
Env: envVars,
Image: to.Ptr(cfg.Image.String()),
Name: to.Ptr(name),
Probes: []*armappcontainers.ContainerAppProbe{},
Resources: &armappcontainers.ContainerResources{
CPU: to.Ptr(1.0),
Memory: to.Ptr("2Gi"),
Expand Down Expand Up @@ -246,3 +206,14 @@ func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.

return task.ID(*res.ID), nil
}

func toPtrs[T any](in []T) []*T {
if in == nil {
return nil
}
out := make([]*T, len(in))
for i, v := range in {
out[i] = &v
}
return out
}
Loading

0 comments on commit 8d07abb

Please sign in to comment.