Skip to content

Commit

Permalink
Add basic async task support to PACTA (#32)
Browse files Browse the repository at this point in the history
* Add basic async task support to PACTA

Add a new `runner` binary, which will be invoked either locally (via Docker) or remotely (via `aztasks`/Azure Container Apps Jobs). Currently, none of the PACTA-specific handling is here, like passing portfolio information, loading blobs, etc, it's just the basic infra.

Signed-off-by: Brandon Sprague <[email protected]>

* Update deployment config and scripts

Signed-off-by: Brandon Sprague <[email protected]>

---------

Signed-off-by: Brandon Sprague <[email protected]>
  • Loading branch information
bcspragu authored Oct 4, 2023
1 parent 45c4920 commit 5930dea
Show file tree
Hide file tree
Showing 30 changed files with 1,182 additions and 13 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/deploy.yml
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ jobs:
with:
disableTelemetry: true
containerAppName: pactasrv-dev
containerAppEnvironment: pactasrv-dev
containerAppEnvironment: pacta-dev
resourceGroup: rmi-pacta-dev
imageToDeploy: rmipacta.azurecr.io/pacta:dev
location: centalus
Expand Down
10 changes: 9 additions & 1 deletion WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,15 @@ load("@rules_oci//oci:pull.bzl", "oci_pull")

oci_pull(
name = "distroless_base",
digest = "sha256:73deaaf6a207c1a33850257ba74e0f196bc418636cada9943a03d7abea980d6d",
digest = "sha256:46c5b9bd3e3efff512e28350766b54355fce6337a0b44ba3f822ab918eca4520",
image = "gcr.io/distroless/base",
platforms = ["linux/amd64"],
)

# TODO: Replace this with the base image provided by RMI
oci_pull(
name = "runner_base",
digest = "sha256:46c5b9bd3e3efff512e28350766b54355fce6337a0b44ba3f822ab918eca4520",
image = "gcr.io/distroless/base",
platforms = ["linux/amd64"],
)
12 changes: 12 additions & 0 deletions azure/azlog/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "azlog",
srcs = ["azlog.go"],
importpath = "github.com/RMI/pacta/azure/azlog",
visibility = ["//visibility:public"],
deps = [
"@org_uber_go_zap//:zap",
"@org_uber_go_zap//zapcore",
],
)
79 changes: 79 additions & 0 deletions azure/azlog/azlog.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
// Package azlog provides a thin wrapper around the zap logging package to
// enable structured logging on Azure. This is still a work in progress and
// is untested.
package azlog

import (
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
)

type Config struct {
Local bool
MinLogLevel zapcore.Level
}

func New(cfg *Config) (*zap.Logger, error) {
if cfg.Local {
zCfg := zap.NewDevelopmentConfig()
zCfg.Level = zap.NewAtomicLevelAt(cfg.MinLogLevel)
return zCfg.Build()
}

zCfg := &zap.Config{
Level: zap.NewAtomicLevelAt(cfg.MinLogLevel),
Development: false,
Sampling: &zap.SamplingConfig{
Initial: 100,
Thereafter: 100,
},
Encoding: "json",
EncoderConfig: zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "severity",
NameKey: "logger",
CallerKey: "caller",
FunctionKey: zapcore.OmitKey,
MessageKey: "message",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: encodeLevel,
EncodeTime: zapcore.RFC3339TimeEncoder,
EncodeDuration: zapcore.MillisDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
},
OutputPaths: []string{"stdout"},
ErrorOutputPaths: []string{"stderr"},
}
return zCfg.Build(zap.AddStacktrace(zap.DPanicLevel))
}

func encodeLevel(l zapcore.Level, enc zapcore.PrimitiveArrayEncoder) {
/*
TODO: Figure out what the appropriate logging method is here.
See https://learn.microsoft.com/en-us/dotnet/api/microsoft.extensions.logging.loglevel?view=dotnet-plat-ext-7.0
Critical 5 Logs that describe an unrecoverable application or system crash, or a catastrophic failure that requires immediate attention.
Debug 1 Logs that are used for interactive investigation during development. These logs should primarily contain information useful for debugging and have no long-term value.
Error 4 Logs that highlight when the current flow of execution is stopped due to a failure. These should indicate a failure in the current activity, not an application-wide failure.
Information 2 Logs that track the general flow of the application. These logs should have long-term value.
None 6 Not used for writing log messages. Specifies that a logging category should not write any messages.
Trace 0 Logs that contain the most detailed messages. These messages may contain sensitive application data. These messages are disabled by default and should never be enabled in a production environment.
Warning 3 Logs that highlight an abnormal or unexpected event in the application flow, but do not otherwise cause the application execution to stop.
*/
switch l {
case zapcore.DebugLevel:
enc.AppendString("DEBUG")
case zapcore.InfoLevel:
enc.AppendString("INFO")
case zapcore.WarnLevel:
enc.AppendString("WARNING")
case zapcore.ErrorLevel:
enc.AppendString("ERROR")
case zapcore.DPanicLevel:
enc.AppendString("CRITICAL")
case zapcore.PanicLevel:
enc.AppendString("ALERT")
case zapcore.FatalLevel:
enc.AppendString("EMERGENCY")
}
}
15 changes: 15 additions & 0 deletions azure/aztask/BUILD.bazel
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "aztask",
srcs = ["aztask.go"],
importpath = "github.com/RMI/pacta/azure/aztask",
visibility = ["//visibility:public"],
deps = [
"//task",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore",
"@com_github_azure_azure_sdk_for_go_sdk_azcore//to",
"@com_github_azure_azure_sdk_for_go_sdk_resourcemanager_appcontainers_armappcontainers_v2//:armappcontainers",
"@com_github_silicon_ally_idgen//:idgen",
],
)
248 changes: 248 additions & 0 deletions azure/aztask/aztask.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,248 @@
// Package aztask wraps Azure's Container Apps Jobs API to provide basic async
// task execution for the PACTA ecosystem.
package aztask

import (
"bytes"
"context"
"errors"
"fmt"
"math/rand"

"github.com/Azure/azure-sdk-for-go/sdk/azcore"
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
armappcontainers "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/appcontainers/armappcontainers/v2"
"github.com/RMI/pacta/task"
"github.com/Silicon-Ally/idgen"
)

type Runner struct {
client *armappcontainers.JobsClient

cfg *Config
gen *idgen.Generator
}

type Config struct {
// Location is the location to run the runner, like centralus
Location string

// ConfigPath should be a full path to a config file in the runner image,
// like: /configs/{local,dev}.conf
ConfigPath string

// Identity is the account the runner should act as.
Identity *RunnerIdentity

// Image the runner image to execute
Image *RunnerImage

Rand *rand.Rand
}

func (c *Config) validate() error {
if c.Location == "" {
return errors.New("no container location given")
}

if c.ConfigPath == "" {
return errors.New("no runner config path given")
}

if err := c.Identity.validate(); err != nil {
return fmt.Errorf("invalid identity config: %w", err)
}

if err := c.Image.validate(); err != nil {
return fmt.Errorf("invalid image config: %w", err)
}

return nil
}

type RunnerIdentity struct {
// Like runner-local
Name string
// Like aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee
SubscriptionID string
// Like rmi-pacta-{local,dev}
ResourceGroup string
// Like ffffffff-0000-1111-2222-333333333333
ClientID string
// Like pacta-{local,dev}, the name of the Container Apps Environment
ManagedEnvironment string
}

func (ri *RunnerIdentity) validate() error {
if ri.Name == "" {
return errors.New("no identity name given")
}
if ri.SubscriptionID == "" {
return errors.New("no identity subscription ID given")
}
if ri.ResourceGroup == "" {
return errors.New("no identity resource group given")
}
if ri.ClientID == "" {
return errors.New("no identity client ID given")
}
return nil
}

func (r *RunnerIdentity) String() string {
tmpl := "/subscriptions/%s/resourcegroups/%s/providers/Microsoft.ManagedIdentity/userAssignedIdentities/%s"
return fmt.Sprintf(tmpl, r.SubscriptionID, r.ResourceGroup, r.Name)
}

func (r *RunnerIdentity) EnvironmentID() string {
tmpl := "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.App/managedEnvironments/%s"
return fmt.Sprintf(tmpl, r.SubscriptionID, r.ResourceGroup, r.ManagedEnvironment)
}

type RunnerImage struct {
// Like rmipacta.azurecr.io
Registry string
// Like runner
Name string
}

func (ri *RunnerImage) validate() error {
if ri.Registry == "" {
return errors.New("no runner image registry given")
}
if ri.Name == "" {
return errors.New("no runner image name given")
}
return nil
}

func (r *RunnerImage) WithTag(tag string) string {
var buf bytes.Buffer
// <registry>/<name>:<tag>
buf.WriteString(r.Registry)
buf.WriteRune('/')
buf.WriteString(r.Name)
buf.WriteRune(':')
buf.WriteString(tag)
return buf.String()
}

func NewTaskRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) {
if err := cfg.validate(); err != nil {
return nil, fmt.Errorf("invalid task runner config: %w", err)
}

clientFactory, err := armappcontainers.NewClientFactory(cfg.Identity.SubscriptionID, creds, nil)
if err != nil {
return nil, fmt.Errorf("failed to create client: %w", err)
}

gen, err := idgen.New(cfg.Rand, idgen.WithDefaultLength(32), idgen.WithCharSet([]rune("abcdefghijklmnopqrstuvwxyz")))
if err != nil {
return nil, fmt.Errorf("failed to init ID generator: %w", err)
}

return &Runner{
client: clientFactory.NewJobsClient(),
cfg: cfg,
gen: gen,
}, nil
}

func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.ID, error) {
name := r.gen.NewID()
identity := r.cfg.Identity.String()
envID := r.cfg.Identity.EnvironmentID()

job := armappcontainers.Job{
Location: &r.cfg.Location,
Identity: &armappcontainers.ManagedServiceIdentity{
Type: to.Ptr(armappcontainers.ManagedServiceIdentityTypeUserAssigned),
UserAssignedIdentities: map[string]*armappcontainers.UserAssignedIdentity{
identity: {},
},
},
Properties: &armappcontainers.JobProperties{
Configuration: &armappcontainers.JobConfiguration{
ReplicaTimeout: to.Ptr(int32(60 * 60 * 2 /* two hours */)),
TriggerType: to.Ptr(armappcontainers.TriggerTypeManual),
ManualTriggerConfig: &armappcontainers.JobConfigurationManualTriggerConfig{
// Run one copy.
Parallelism: to.Ptr(int32(1)),
ReplicaCompletionCount: to.Ptr(int32(1)),
},
// Don't retry, if it failed once, it'll probably fail again. We might relax
// this in the future if we identify "transient" errors.
ReplicaRetryLimit: to.Ptr(int32(0)),
Registries: []*armappcontainers.RegistryCredentials{
{
Server: to.Ptr(r.cfg.Image.Registry),
Identity: to.Ptr(identity),
},
},
Secrets: []*armappcontainers.Secret{
// TODO: Put any useful configuration here.
},
},
EnvironmentID: to.Ptr(envID),
Template: &armappcontainers.JobTemplate{
Containers: []*armappcontainers.Container{
{
Args: []*string{
to.Ptr("--config=" + r.cfg.ConfigPath),
},
Command: []*string{
to.Ptr("/runner"),
},
Env: []*armappcontainers.EnvironmentVar{
{
Name: to.Ptr("AZURE_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("MANAGED_IDENTITY_CLIENT_ID"),
Value: to.Ptr(r.cfg.Identity.ClientID),
},
{
Name: to.Ptr("PORTFOLIO_ID"),
Value: to.Ptr(string(req.PortfolioID)),
},
},
// TODO: Take in the image digest as part of the task definition, as this can change per request.
Image: to.Ptr(r.cfg.Image.WithTag("latest")),
Name: to.Ptr(name),
Probes: []*armappcontainers.ContainerAppProbe{},
Resources: &armappcontainers.ContainerResources{
CPU: to.Ptr(1.0),
Memory: to.Ptr("2Gi"),
},
VolumeMounts: []*armappcontainers.VolumeMount{},
},
},
Volumes: []*armappcontainers.Volume{
// TODO: Mount any sources here.
},
},
},
Tags: map[string]*string{},
}
poller, err := r.client.BeginCreateOrUpdate(ctx, r.cfg.Identity.ResourceGroup, name, job, nil)
if err != nil {
return "", fmt.Errorf("failed to create container app job: %w", err)
}

res, err := poller.PollUntilDone(ctx, nil)
if err != nil {
return "", fmt.Errorf("failed to poll container group creation: %w", err)
}

poller2, err := r.client.BeginStart(ctx, r.cfg.Identity.ResourceGroup, name, nil)
if err != nil {
return "", fmt.Errorf("failed to start container app job: %w", err)
}
if _, err := poller2.PollUntilDone(ctx, nil); err != nil {
return "", fmt.Errorf("failed to poll for container app start: %w", err)
}

return task.ID(*res.ID), nil
}
Loading

0 comments on commit 5930dea

Please sign in to comment.