-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Add basic async task support to PACTA
Add a new `runner` binary, which will be invoked either locally (via Docker) or remotely (via `aztasks`/Azure Container Apps Jobs). Currently, none of the PACTA-specific handling is here, like passing portfolio information, loading blobs, etc, it's just the basic infra. Signed-off-by: Brandon Sprague <[email protected]>
- Loading branch information
Showing
27 changed files
with
1,139 additions
and
11 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,12 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library") | ||
|
||
go_library( | ||
name = "azlog", | ||
srcs = ["azlog.go"], | ||
importpath = "github.com/RMI/pacta/azure/azlog", | ||
visibility = ["//visibility:public"], | ||
deps = [ | ||
"@org_uber_go_zap//:zap", | ||
"@org_uber_go_zap//zapcore", | ||
], | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,79 @@ | ||
// Package azlog provides a thin wrapper around the zap logging package to | ||
// enable structured logging on Azure. This is still a work in progress and | ||
// is untested. | ||
package azlog | ||
|
||
import ( | ||
"go.uber.org/zap" | ||
"go.uber.org/zap/zapcore" | ||
) | ||
|
||
type Config struct { | ||
Local bool | ||
MinLogLevel zapcore.Level | ||
} | ||
|
||
func New(cfg *Config) (*zap.Logger, error) { | ||
if cfg.Local { | ||
zCfg := zap.NewDevelopmentConfig() | ||
zCfg.Level = zap.NewAtomicLevelAt(cfg.MinLogLevel) | ||
return zCfg.Build() | ||
} | ||
|
||
zCfg := &zap.Config{ | ||
Level: zap.NewAtomicLevelAt(cfg.MinLogLevel), | ||
Development: false, | ||
Sampling: &zap.SamplingConfig{ | ||
Initial: 100, | ||
Thereafter: 100, | ||
}, | ||
Encoding: "json", | ||
EncoderConfig: zapcore.EncoderConfig{ | ||
TimeKey: "time", | ||
LevelKey: "severity", | ||
NameKey: "logger", | ||
CallerKey: "caller", | ||
FunctionKey: zapcore.OmitKey, | ||
MessageKey: "message", | ||
StacktraceKey: "stacktrace", | ||
LineEnding: zapcore.DefaultLineEnding, | ||
EncodeLevel: encodeLevel, | ||
EncodeTime: zapcore.RFC3339TimeEncoder, | ||
EncodeDuration: zapcore.MillisDurationEncoder, | ||
EncodeCaller: zapcore.ShortCallerEncoder, | ||
}, | ||
OutputPaths: []string{"stdout"}, | ||
ErrorOutputPaths: []string{"stderr"}, | ||
} | ||
return zCfg.Build(zap.AddStacktrace(zap.DPanicLevel)) | ||
} | ||
|
||
func encodeLevel(l zapcore.Level, enc zapcore.PrimitiveArrayEncoder) { | ||
/* | ||
TODO: Figure out what the appropriate logging method is here. | ||
See https://learn.microsoft.com/en-us/dotnet/api/microsoft.extensions.logging.loglevel?view=dotnet-plat-ext-7.0 | ||
Critical 5 Logs that describe an unrecoverable application or system crash, or a catastrophic failure that requires immediate attention. | ||
Debug 1 Logs that are used for interactive investigation during development. These logs should primarily contain information useful for debugging and have no long-term value. | ||
Error 4 Logs that highlight when the current flow of execution is stopped due to a failure. These should indicate a failure in the current activity, not an application-wide failure. | ||
Information 2 Logs that track the general flow of the application. These logs should have long-term value. | ||
None 6 Not used for writing log messages. Specifies that a logging category should not write any messages. | ||
Trace 0 Logs that contain the most detailed messages. These messages may contain sensitive application data. These messages are disabled by default and should never be enabled in a production environment. | ||
Warning 3 Logs that highlight an abnormal or unexpected event in the application flow, but do not otherwise cause the application execution to stop. | ||
*/ | ||
switch l { | ||
case zapcore.DebugLevel: | ||
enc.AppendString("DEBUG") | ||
case zapcore.InfoLevel: | ||
enc.AppendString("INFO") | ||
case zapcore.WarnLevel: | ||
enc.AppendString("WARNING") | ||
case zapcore.ErrorLevel: | ||
enc.AppendString("ERROR") | ||
case zapcore.DPanicLevel: | ||
enc.AppendString("CRITICAL") | ||
case zapcore.PanicLevel: | ||
enc.AppendString("ALERT") | ||
case zapcore.FatalLevel: | ||
enc.AppendString("EMERGENCY") | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,15 @@ | ||
load("@io_bazel_rules_go//go:def.bzl", "go_library") | ||
|
||
go_library( | ||
name = "aztask", | ||
srcs = ["aztask.go"], | ||
importpath = "github.com/RMI/pacta/azure/aztask", | ||
visibility = ["//visibility:public"], | ||
deps = [ | ||
"//task", | ||
"@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore", | ||
"@com_github_azure_azure_sdk_for_go_sdk_azcore//to", | ||
"@com_github_azure_azure_sdk_for_go_sdk_resourcemanager_appcontainers_armappcontainers_v2//:armappcontainers", | ||
"@com_github_silicon_ally_idgen//:idgen", | ||
], | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,247 @@ | ||
// Package aztask wraps Azure's Container Apps Jobs API to provide basic async | ||
// task execution for the PACTA ecosystem. | ||
package aztask | ||
|
||
import ( | ||
"bytes" | ||
"context" | ||
"errors" | ||
"fmt" | ||
"math/rand" | ||
|
||
"github.com/Azure/azure-sdk-for-go/sdk/azcore" | ||
"github.com/Azure/azure-sdk-for-go/sdk/azcore/to" | ||
armappcontainers "github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/appcontainers/armappcontainers/v2" | ||
"github.com/RMI/pacta/task" | ||
"github.com/Silicon-Ally/idgen" | ||
) | ||
|
||
type Runner struct { | ||
client *armappcontainers.JobsClient | ||
|
||
cfg *Config | ||
gen *idgen.Generator | ||
} | ||
|
||
type Config struct { | ||
// Location is the location to run the runner, like centralus | ||
Location string | ||
|
||
// ConfigPath should be a full path to a config file in the runner image, | ||
// like: /configs/{local,dev}.conf | ||
ConfigPath string | ||
|
||
// Identity is the account the runner should act as. | ||
Identity *RunnerIdentity | ||
|
||
// Image the runner image to execute | ||
Image *RunnerImage | ||
|
||
Rand *rand.Rand | ||
} | ||
|
||
func (c *Config) validate() error { | ||
if c.Location == "" { | ||
return errors.New("no container location given") | ||
} | ||
|
||
if c.ConfigPath == "" { | ||
return errors.New("no runner config path given") | ||
} | ||
|
||
if err := c.Identity.validate(); err != nil { | ||
return fmt.Errorf("invalid identity config: %w", err) | ||
} | ||
|
||
if err := c.Image.validate(); err != nil { | ||
return fmt.Errorf("invalid image config: %w", err) | ||
} | ||
|
||
return nil | ||
} | ||
|
||
type RunnerIdentity struct { | ||
// Like runner-local | ||
Name string | ||
// Like aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee | ||
SubscriptionID string | ||
// Like rmi-pacta-{local,dev} | ||
ResourceGroup string | ||
// Like ffffffff-0000-1111-2222-333333333333 | ||
ClientID string | ||
// Like pacta-{local,dev}, the name of the Container Apps Environment | ||
ManagedEnvironment string | ||
} | ||
|
||
func (ri *RunnerIdentity) validate() error { | ||
if ri.Name == "" { | ||
return errors.New("no identity name given") | ||
} | ||
if ri.SubscriptionID == "" { | ||
return errors.New("no identity subscription ID given") | ||
} | ||
if ri.ResourceGroup == "" { | ||
return errors.New("no identity resource group given") | ||
} | ||
if ri.ClientID == "" { | ||
return errors.New("no identity client ID given") | ||
} | ||
return nil | ||
} | ||
|
||
func (r *RunnerIdentity) String() string { | ||
tmpl := "/subscriptions/%s/resourcegroups/%s/providers/Microsoft.ManagedIdentity/userAssignedIdentities/%s" | ||
return fmt.Sprintf(tmpl, r.SubscriptionID, r.ResourceGroup, r.Name) | ||
} | ||
|
||
func (r *RunnerIdentity) EnvironmentID() string { | ||
tmpl := "/subscriptions/%s/resourceGroups/%s/providers/Microsoft.App/managedEnvironments/%s" | ||
return fmt.Sprintf(tmpl, r.SubscriptionID, r.ResourceGroup, r.ManagedEnvironment) | ||
} | ||
|
||
type RunnerImage struct { | ||
// Like rmipacta.azurecr.io | ||
Registry string | ||
// Like runner | ||
Name string | ||
} | ||
|
||
func (ri *RunnerImage) validate() error { | ||
if ri.Registry == "" { | ||
return errors.New("no runner image registry given") | ||
} | ||
if ri.Name == "" { | ||
return errors.New("no runner image name given") | ||
} | ||
return nil | ||
} | ||
|
||
func (r *RunnerImage) WithTag(tag string) string { | ||
var buf bytes.Buffer | ||
// <registry>/<name>:<tag> | ||
buf.WriteString(r.Registry) | ||
buf.WriteRune('/') | ||
buf.WriteString(r.Name) | ||
buf.WriteRune(':') | ||
buf.WriteString(tag) | ||
return buf.String() | ||
} | ||
|
||
func NewTaskRunner(creds azcore.TokenCredential, cfg *Config) (*Runner, error) { | ||
if err := cfg.validate(); err != nil { | ||
return nil, fmt.Errorf("invalid task runner config: %w", err) | ||
} | ||
|
||
clientFactory, err := armappcontainers.NewClientFactory(cfg.Identity.SubscriptionID, creds, nil) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to create client: %w", err) | ||
} | ||
|
||
gen, err := idgen.New(cfg.Rand, idgen.WithDefaultLength(32), idgen.WithCharSet([]rune("abcdefghijklmnopqrstuvwxyz"))) | ||
if err != nil { | ||
return nil, fmt.Errorf("failed to init ID generator: %w", err) | ||
} | ||
|
||
return &Runner{ | ||
client: clientFactory.NewJobsClient(), | ||
cfg: cfg, | ||
gen: gen, | ||
}, nil | ||
} | ||
|
||
func (r *Runner) StartRun(ctx context.Context, req *task.StartRunRequest) (task.ID, error) { | ||
name := r.gen.NewID() | ||
identity := r.cfg.Identity.String() | ||
envID := r.cfg.Identity.EnvironmentID() | ||
|
||
job := armappcontainers.Job{ | ||
Location: &r.cfg.Location, | ||
Identity: &armappcontainers.ManagedServiceIdentity{ | ||
Type: to.Ptr(armappcontainers.ManagedServiceIdentityTypeUserAssigned), | ||
UserAssignedIdentities: map[string]*armappcontainers.UserAssignedIdentity{ | ||
identity: {}, | ||
}, | ||
}, | ||
Properties: &armappcontainers.JobProperties{ | ||
Configuration: &armappcontainers.JobConfiguration{ | ||
ReplicaTimeout: to.Ptr(int32(60 * 60 * 2 /* two hours */)), | ||
TriggerType: to.Ptr(armappcontainers.TriggerTypeManual), | ||
ManualTriggerConfig: &armappcontainers.JobConfigurationManualTriggerConfig{ | ||
// Run one copy. | ||
Parallelism: to.Ptr(int32(1)), | ||
ReplicaCompletionCount: to.Ptr(int32(1)), | ||
}, | ||
// Don't retry, if it failed once, it'll probably fail again. We might relax | ||
// this in the future if we identify "transient" errors. | ||
ReplicaRetryLimit: to.Ptr(int32(0)), | ||
Registries: []*armappcontainers.RegistryCredentials{ | ||
{ | ||
Server: to.Ptr(r.cfg.Image.Registry), | ||
Identity: to.Ptr(identity), | ||
}, | ||
}, | ||
Secrets: []*armappcontainers.Secret{ | ||
// TODO: Put any useful configuration here. | ||
}, | ||
}, | ||
EnvironmentID: to.Ptr(envID), | ||
Template: &armappcontainers.JobTemplate{ | ||
Containers: []*armappcontainers.Container{ | ||
{ | ||
Args: []*string{ | ||
to.Ptr("--config=" + r.cfg.ConfigPath), | ||
}, | ||
Command: []*string{ | ||
to.Ptr("/runner"), | ||
}, | ||
Env: []*armappcontainers.EnvironmentVar{ | ||
{ | ||
Name: to.Ptr("AZURE_CLIENT_ID"), | ||
Value: to.Ptr(r.cfg.Identity.ClientID), | ||
}, | ||
{ | ||
Name: to.Ptr("MANAGED_IDENTITY_CLIENT_ID"), | ||
Value: to.Ptr(r.cfg.Identity.ClientID), | ||
}, | ||
{ | ||
Name: to.Ptr("PORTFOLIO_ID"), | ||
Value: to.Ptr(string(req.PortfolioID)), | ||
}, | ||
}, | ||
Image: to.Ptr(r.cfg.Image.WithTag("latest")), | ||
Name: to.Ptr(name), | ||
Probes: []*armappcontainers.ContainerAppProbe{}, | ||
Resources: &armappcontainers.ContainerResources{ | ||
CPU: to.Ptr(1.0), | ||
Memory: to.Ptr("2Gi"), | ||
}, | ||
VolumeMounts: []*armappcontainers.VolumeMount{}, | ||
}, | ||
}, | ||
Volumes: []*armappcontainers.Volume{ | ||
// TODO: Mount any sources here. | ||
}, | ||
}, | ||
}, | ||
Tags: map[string]*string{}, | ||
} | ||
poller, err := r.client.BeginCreateOrUpdate(ctx, r.cfg.Identity.ResourceGroup, name, job, nil) | ||
if err != nil { | ||
return "", fmt.Errorf("failed to create container app job: %w", err) | ||
} | ||
|
||
res, err := poller.PollUntilDone(ctx, nil) | ||
if err != nil { | ||
return "", fmt.Errorf("failed to poll container group creation: %w", err) | ||
} | ||
|
||
poller2, err := r.client.BeginStart(ctx, r.cfg.Identity.ResourceGroup, name, nil) | ||
if err != nil { | ||
return "", fmt.Errorf("failed to start container app job: %w", err) | ||
} | ||
if _, err := poller2.PollUntilDone(ctx, nil); err != nil { | ||
return "", fmt.Errorf("failed to poll for container app start: %w", err) | ||
} | ||
|
||
return task.ID(*res.ID), nil | ||
} |
Oops, something went wrong.