Skip to content

Commit

Permalink
Integrate portfolio processing into runner
Browse files Browse the repository at this point in the history
This PR integrates the runner with the PACTA portfolio processing code, integrating with Azure blob storage to read + write portfolios and reports.

I've tested the `process_portfolio` piece against local Docker (i.e. `dockertask`), but I haven't tested the `create_report` flow yet in its current incarnation. This PR is already large enough that I'll add the relevant async task stuff in a follow-up PR.
  • Loading branch information
bcspragu committed Oct 28, 2023
1 parent 94b42db commit d3e3dfb
Show file tree
Hide file tree
Showing 44 changed files with 1,482 additions and 406 deletions.
5 changes: 5 additions & 0 deletions .sops.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
# creation rules are evaluated sequentially, the first match wins
creation_rules:
- path_regex: secrets/local\.enc\.json$
azure_keyvault: https://rmipactalocalsops.vault.azure.net/keys/sops/d670bcbc510f456d821306913b4c55c6

8 changes: 4 additions & 4 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -99,10 +99,10 @@ oci_pull(
platforms = ["linux/amd64"],
)

# TODO: Replace this with the base image provided by RMI
oci_pull(
name = "runner_base",
digest = "sha256:46c5b9bd3e3efff512e28350766b54355fce6337a0b44ba3f822ab918eca4520",
image = "gcr.io/distroless/base",
platforms = ["linux/amd64"],
digest = "sha256:d0b2922dc48cb6acb7c767f89f0c92ccbe1a043166971bac0b585b3851a9b720",
# TODO: Replace this with a real one
image = "docker.io/curfewreplica/pactatest",
# platforms = ["linux/amd64"],
)
16 changes: 16 additions & 0 deletions azure/azblob/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "azblob",
srcs = ["azblob.go"],
importpath = "github.com/RMI/pacta/azure/azblob",
visibility = ["//visibility:public"],
deps = [
"//blob",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//to",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//:azblob",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//sas",
"@com_github_azure_azure_sdk_for_go_sdk_storage_azblob//service",
],
)
185 changes: 185 additions & 0 deletions azure/azblob/azblob.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,185 @@
// Package azblob wraps the existing Azure blob library to provide basic upload,
// download, and URL signing functionality against a standardized interface.
package azblob

import (
"context"
"fmt"
"io"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
"github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas"
azservice "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/service"
"github.com/RMI/pacta/blob"
)

const (
// Scheme is the URI scheme ("az") that this package uses when splitting and
// joining blob URIs.
Scheme = blob.Scheme("az")
)

// Client uploads, downloads, lists, and signs URLs for blobs in a single Azure
// storage account. Construct it with NewClient.
type Client struct {
// storageAccount is the account name used when building signed blob URLs.
storageAccount string
// now returns the current time; NewClient sets it to time.Now in UTC.
now func() time.Time

// client performs blob uploads, downloads, and listing.
client *azblob.Client
// svcClient is used to request user delegation credentials for signing.
svcClient *azservice.Client

// cachedUDCMu is held while reading or replacing cachedUDC and
// cachedUDCExpiry.
cachedUDCMu *sync.Mutex
// cachedUDC is the most recently fetched user delegation credential, if any.
cachedUDC *azservice.UserDelegationCredential
// cachedUDCExpiry is the expiry requested for cachedUDC; zero until the first
// credential has been fetched.
cachedUDCExpiry time.Time
}

// NewClient returns a Client for the named storage account, authenticating
// with creds. It creates both a blob client and a blob service client against
// https://<storageAccount>.blob.core.windows.net/ and returns an error if
// either cannot be initialized.
func NewClient(creds azcore.TokenCredential, storageAccount string) (*Client, error) {
	endpoint := fmt.Sprintf("https://%s.blob.core.windows.net/", storageAccount)

	blobClient, err := azblob.NewClient(endpoint, creds, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to init Azure blob client: %w", err)
	}

	serviceClient, err := azservice.NewClient(endpoint, creds, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to init Azure blob service client: %w", err)
	}

	c := &Client{
		storageAccount: storageAccount,
		client:         blobClient,
		svcClient:      serviceClient,
		cachedUDCMu:    &sync.Mutex{},
	}
	// All timestamps produced by the client are in UTC.
	c.now = func() time.Time { return time.Now().UTC() }
	return c, nil
}

// Scheme returns the package-level Scheme constant, i.e. the URI scheme this
// client handles.
func (c *Client) Scheme() blob.Scheme {
return Scheme
}

// WriteBlob streams the contents of r to the blob identified by uri.
// It returns an error if uri cannot be split with the Azure scheme or if the
// upload fails.
func (c *Client) WriteBlob(ctx context.Context, uri string, r io.Reader) error {
	container, name, ok := blob.SplitURI(Scheme, uri)
	if !ok {
		return fmt.Errorf("malformed URI %q is not for Azure", uri)
	}

	_, err := c.client.UploadStream(ctx, container, name, r, nil)
	if err != nil {
		return fmt.Errorf("failed to upload blob: %w", err)
	}
	return nil
}

// ReadBlob opens the blob identified by uri for reading and returns the
// download response body. The caller is responsible for closing the returned
// reader. It returns an error if uri cannot be split with the Azure scheme or
// if the download cannot be started.
func (c *Client) ReadBlob(ctx context.Context, uri string) (io.ReadCloser, error) {
	container, name, ok := blob.SplitURI(Scheme, uri)
	if !ok {
		return nil, fmt.Errorf("malformed URI %q is not for Azure", uri)
	}

	download, err := c.client.DownloadStream(ctx, container, name, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to read blob: %w", err)
	}
	body := download.Body
	return body, nil
}

// SignedUploadURL returns a URL that is allowed to upload to the given URI.
// The URL is signed with Create and Write blob permissions via signBlob.
// See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas#example-package-UserDelegationSAS
func (c *Client) SignedUploadURL(ctx context.Context, uri string) (string, error) {
return c.signBlob(ctx, uri, &sas.BlobPermissions{Create: true, Write: true})
}

// SignedDownloadURL returns a URL that is allowed to download the file at the given URI.
// The URL is signed with only the Read blob permission via signBlob.
// See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/sas#example-package-UserDelegationSAS
func (c *Client) SignedDownloadURL(ctx context.Context, uri string) (string, error) {
return c.signBlob(ctx, uri, &sas.BlobPermissions{Read: true})
}

// signBlob returns an HTTPS URL for the blob at uri, carrying a user
// delegation SAS that grants perms for 15 minutes.
//
// It returns an error if uri cannot be split with the Azure scheme, if uri has
// no blob component, if a user delegation credential cannot be obtained, or if
// signing fails.
func (c *Client) signBlob(ctx context.Context, uri string, perms *sas.BlobPermissions) (string, error) {
	container, name, ok := blob.SplitURI(Scheme, uri)
	if !ok {
		return "", fmt.Errorf("malformed URI %q is not for Azure", uri)
	}
	if name == "" {
		// Without a blob name, the signature would apply to the whole container.
		return "", fmt.Errorf("uri %q did not contain a blob component", uri)
	}

	// The validity window starts ten seconds in the past (presumably to
	// tolerate clock skew between this host and Azure).
	start := c.now().UTC().Add(-10 * time.Second)
	cred, err := c.getUserDelegationCredential(ctx, start)
	if err != nil {
		return "", fmt.Errorf("failed to get udc: %w", err)
	}

	// Describe the grant, then sign it with the user delegation credential.
	values := sas.BlobSignatureValues{
		Protocol:      sas.ProtocolHTTPS,
		StartTime:     start,
		ExpiryTime:    start.Add(15 * time.Minute),
		Permissions:   perms.String(),
		ContainerName: container,
		BlobName:      name,
	}
	query, err := values.SignWithUserDelegation(cred)
	if err != nil {
		return "", fmt.Errorf("failed to sign blob: %w", err)
	}

	signed := fmt.Sprintf("https://%s.blob.core.windows.net/%s/%s?%s", c.storageAccount, container, name, query.Encode())
	return signed, nil
}

// ListBlobs returns the URIs of all blobs whose names start with the blob
// component of uriPrefix, within the container named by uriPrefix.
//
// It returns an error if uriPrefix cannot be split with the Azure scheme, if
// it has no blob component, or if any page of results fails to load. The
// result is nil when no blobs match.
func (c *Client) ListBlobs(ctx context.Context, uriPrefix string) ([]string, error) {
	container, prefix, ok := blob.SplitURI(Scheme, uriPrefix)
	if !ok {
		return nil, fmt.Errorf("malformed URI prefix %q is not for Azure", uriPrefix)
	}
	if prefix == "" {
		return nil, fmt.Errorf("uri prefix %q did not contain a blob component", uriPrefix)
	}

	opts := &azblob.ListBlobsFlatOptions{Prefix: &prefix}
	pager := c.client.NewListBlobsFlatPager(container, opts)

	var uris []string
	for pager.More() {
		page, err := pager.NextPage(ctx)
		if err != nil {
			return nil, fmt.Errorf("failed to load page of blobs: %w", err)
		}
		for _, item := range page.Segment.BlobItems {
			uris = append(uris, blob.Join(Scheme, container, *item.Name))
		}
	}
	return uris, nil
}

// getUserDelegationCredential returns a user delegation credential usable for
// signing SAS tokens starting at now. A previously fetched credential is
// reused while it has enough validity left; otherwise a new one, valid for 48
// hours from now, is requested from the service client and cached.
//
// The cache is guarded by cachedUDCMu, which is held for the whole call,
// including the request to Azure.
func (c *Client) getUserDelegationCredential(ctx context.Context, now time.Time) (*azservice.UserDelegationCredential, error) {
	const (
		// keyLifetime is how long a newly requested credential is valid for.
		keyLifetime = 48 * time.Hour
		// minRemaining is the least validity a cached credential must have
		// left to be reused. A user delegation SAS is only honored while the
		// key that signed it is valid, so the key has to outlive the SAS
		// tokens signed with it. signBlob issues 15-minute tokens at the time
		// of writing; one extra minute is kept as a safety margin.
		minRemaining = 15*time.Minute + 1*time.Minute
	)

	c.cachedUDCMu.Lock()
	defer c.cachedUDCMu.Unlock()

	if !c.cachedUDCExpiry.IsZero() && c.cachedUDCExpiry.Sub(now) > minRemaining {
		return c.cachedUDC, nil
	}

	// Only build the request once we know a new credential is needed.
	expiry := now.Add(keyLifetime)
	info := azservice.KeyInfo{
		Start:  to.Ptr(now.UTC().Format(sas.TimeFormat)),
		Expiry: to.Ptr(expiry.UTC().Format(sas.TimeFormat)),
	}

	udc, err := c.svcClient.GetUserDelegationCredential(ctx, info, nil)
	if err != nil {
		return nil, fmt.Errorf("failed to get delegated credentials: %w", err)
	}
	c.cachedUDC = udc
	c.cachedUDCExpiry = expiry

	return udc, nil
}
109 changes: 40 additions & 69 deletions azure/aztask/aztask.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
package aztask

import (
"bytes"
"context"
"errors"
"fmt"
Expand All @@ -27,16 +26,9 @@ type Config struct {
// Location is the location to run the runner, like centralus
Location string

// ConfigPath should be a full path to a config file in the runner image,
// like: /configs/{local,dev}.conf
ConfigPath string

// Identity is the account the runner should act as.
Identity *RunnerIdentity

// Image the runner image to execute
Image *RunnerImage

Rand *rand.Rand
}

Expand All @@ -45,16 +37,12 @@ func (c *Config) validate() error {
return errors.New("no container location given")
}

if c.ConfigPath == "" {
return errors.New("no runner config path given")
}

if err := c.Identity.validate(); err != nil {
return fmt.Errorf("invalid identity config: %w", err)
}

if err := c.Image.validate(); err != nil {
return fmt.Errorf("invalid image config: %w", err)
if c.Rand == nil {
return errors.New("no random number generator given")
}

return nil
Expand Down Expand Up @@ -99,35 +87,7 @@ func (r *RunnerIdentity) EnvironmentID() string {
return fmt.Sprintf(tmpl, r.SubscriptionID, r.ResourceGroup, r.ManagedEnvironment)
}

type RunnerImage struct {
// Like rmipacta.azurecr.io
Registry string
// Like runner
Name string
}

func (ri *RunnerImage) validate() error {
if ri.Registry == "" {
return errors.New("no runner image registry given")
}
if ri.Name == "" {
return errors.New("no runner image name given")
}
return nil
}

func (r *RunnerImage) WithTag(tag string) string {
var buf bytes.Buffer
// <registry>/<name>:<tag>
buf.WriteString(r.Registry)
buf.WriteRune('/')
buf.WriteString(r.Name)
buf.WriteRune(':')
buf.WriteString(tag)
return buf.String()
}

func NewTaskRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) {
func NewRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) {
if err := cfg.validate(); err != nil {
return nil, fmt.Errorf("invalid task runner config: %w", err)
}
Expand All @@ -149,11 +109,29 @@ func NewTaskRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) {
}, nil
}

func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.ID, error) {
func (r *Runner) Run(ctx context.Context, cfg *task.Config) (task.ID, error) {

name := r.gen.NewID()
identity := r.cfg.Identity.String()
envID := r.cfg.Identity.EnvironmentID()

envVars := []*armappcontainers.EnvironmentVar{
{
Name: to.Ptr("AZURE_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("MANAGED_IDENTITY_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
}
for _, v := range cfg.Env {
envVars = append(envVars, &armappcontainers.EnvironmentVar{
Name: to.Ptr(v.Key),
Value: to.Ptr(v.Value),
})
}

job := armappcontainers.Job{
Location: &r.cfg.Location,
Identity: &armappcontainers.ManagedServiceIdentity{
Expand All @@ -176,7 +154,7 @@ func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.
ReplicaRetryLimit: to.Ptr(int32(0)),
Registries: []*armappcontainers.RegistryCredentials{
{
Server: to.Ptr(r.cfg.Image.Registry),
Server: to.Ptr(cfg.Image.Base.Registry),
Identity: to.Ptr(identity),
},
},
Expand All @@ -188,30 +166,12 @@ func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.
Template: &armappcontainers.JobTemplate{
Containers: []*armappcontainers.Container{
{
Args: []*string{
to.Ptr("--config=" + r.cfg.ConfigPath),
},
Command: []*string{
to.Ptr("/runner"),
},
Env: []*armappcontainers.EnvironmentVar{
{
Name: to.Ptr("AZURE_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("MANAGED_IDENTITY_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("PORTFOLIO_ID"),
Value: to.Ptr(string(req.PortfolioID)),
},
},
// TODO: Take in the image digest as part of the task definition, as this can change per request.
Image: to.Ptr(r.cfg.Image.WithTag("latest")),
Name: to.Ptr(name),
Probes: []*armappcontainers.ContainerAppProbe{},
Args: toPtrs(cfg.Flags),
Command: toPtrs(cfg.Command),
Env: envVars,
Image: to.Ptr(cfg.Image.String()),
Name: to.Ptr(name),
Probes: []*armappcontainers.ContainerAppProbe{},
Resources: &armappcontainers.ContainerResources{
CPU: to.Ptr(1.0),
Memory: to.Ptr("2Gi"),
Expand Down Expand Up @@ -246,3 +206,14 @@ func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.

return task.ID(*res.ID), nil
}

// toPtrs returns a slice of pointers, one per element of in, each pointing at
// its own copy of the corresponding value. A nil input yields a nil result;
// an empty non-nil input yields an empty non-nil result.
//
// The values are copied into a fresh backing slice rather than taking the
// address of the range variable: before Go 1.22 the range variable is shared
// across iterations, so &v would make every pointer refer to the last element.
// Copying also means writes through the returned pointers never modify in.
func toPtrs[T any](in []T) []*T {
	if in == nil {
		return nil
	}
	vals := make([]T, len(in))
	copy(vals, in)

	out := make([]*T, len(vals))
	for i := range vals {
		out[i] = &vals[i]
	}
	return out
}
Loading

0 comments on commit d3e3dfb

Please sign in to comment.