From 4b3f571bdf3a086b605fa9eee3931d96403c2cde Mon Sep 17 00:00:00 2001 From: Brandon Sprague Date: Wed, 14 Feb 2024 10:58:08 -0800 Subject: [PATCH] Split parser binary out from runner (#183) --- WORKSPACE | 8 + async/BUILD.bazel | 20 ++ async/async.go | 372 +++++++++++++++++++++++ async/req.go | 35 +++ azure/azcreds/BUILD.bazel | 12 + azure/azcreds/azcreds.go | 71 +++++ cmd/parser/BUILD.bazel | 71 +++++ cmd/parser/README.md | 32 ++ cmd/parser/configs/dev.conf | 8 + cmd/parser/configs/local.conf | 8 + cmd/parser/main.go | 130 ++++++++ cmd/parser/taskrunner/BUILD.bazel | 13 + cmd/parser/taskrunner/taskrunner.go | 186 ++++++++++++ cmd/runner/BUILD.bazel | 10 +- cmd/runner/README.md | 2 +- cmd/runner/configs/dev.conf | 2 - cmd/runner/configs/local.conf | 2 - cmd/runner/main.go | 446 ++-------------------------- cmd/server/BUILD.bazel | 5 +- cmd/server/main.go | 23 +- deps.bzl | 6 + go.mod | 3 + go.sum | 2 + reportsrv/BUILD.bazel | 1 + scripts/BUILD.bazel | 5 + scripts/build_and_load_parser.sh | 20 ++ 26 files changed, 1035 insertions(+), 458 deletions(-) create mode 100644 async/BUILD.bazel create mode 100644 async/async.go create mode 100644 async/req.go create mode 100644 azure/azcreds/BUILD.bazel create mode 100644 azure/azcreds/azcreds.go create mode 100644 cmd/parser/BUILD.bazel create mode 100644 cmd/parser/README.md create mode 100644 cmd/parser/configs/dev.conf create mode 100644 cmd/parser/configs/local.conf create mode 100644 cmd/parser/main.go create mode 100644 cmd/parser/taskrunner/BUILD.bazel create mode 100644 cmd/parser/taskrunner/taskrunner.go create mode 100755 scripts/build_and_load_parser.sh diff --git a/WORKSPACE b/WORKSPACE index fba14e9..b27a042 100644 --- a/WORKSPACE +++ b/WORKSPACE @@ -109,3 +109,11 @@ oci_pull( image = "docker.io/curfewreplica/pactatest", # platforms = ["linux/amd64"], ) + +oci_pull( + name = "parser_base", + # This digest is of the 'main' tag as of 2024-02-12 + digest = "sha256:fa206405d645d3ee6d1c84319c2724c81f2afe1d8559022edd981a0cfeb739c6", + image = "ghcr.io/rmi-pacta/workflow.portfolio.parsing", + platforms = ["linux/amd64"], +) diff --git a/async/BUILD.bazel b/async/BUILD.bazel new file mode 100644 index 0000000..3401abf --- /dev/null +++ b/async/BUILD.bazel @@ -0,0 +1,20 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "async", + srcs = [ + "async.go", + "req.go", + ], + importpath = "github.com/RMI/pacta/async", + visibility = ["//visibility:public"], + deps = [ + "//blob", + "//pacta", + "//task", + "@com_github_azure_azure_sdk_for_go_sdk_azcore//to", + "@com_github_azure_azure_sdk_for_go_sdk_messaging_azeventgrid//publisher", + "@com_github_google_uuid//:uuid", + "@org_uber_go_zap//:zap", + ], +) diff --git a/async/async.go b/async/async.go new file mode 100644 index 0000000..fff78c3 --- /dev/null +++ b/async/async.go @@ -0,0 +1,372 @@ +// Package async provides the business logic for our async tasks. 
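+//
+// Each handler follows the same shape: read a request that was passed in via
+// an environment variable (see req.go), download the relevant input blobs,
+// shell out to the appropriate R tooling, upload the outputs to blob storage,
+// and publish a completion event to Azure EventGrid.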
+package async + +import ( + "bufio" + "bytes" + "context" + "errors" + "fmt" + "io" + "io/fs" + "os" + "os/exec" + "path/filepath" + "strings" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventgrid/publisher" + "github.com/RMI/pacta/blob" + "github.com/RMI/pacta/pacta" + "github.com/RMI/pacta/task" + "github.com/google/uuid" + "go.uber.org/zap" +) + +type Config struct { + Blob Blob + PubSub *publisher.Client + Logger *zap.Logger +} + +func (c *Config) validate() error { + if c.Blob == nil { + return errors.New("no blob client given") + } + if c.PubSub == nil { + return errors.New("no pub/sub client given") + } + if c.Logger == nil { + return errors.New("no logger given") + } + + return nil +} + +type Blob interface { + ReadBlob(ctx context.Context, uri string) (io.ReadCloser, error) + WriteBlob(ctx context.Context, uri string, r io.Reader) error + Scheme() blob.Scheme +} + +type Handler struct { + blob Blob + pubsub *publisher.Client + logger *zap.Logger +} + +func New(cfg *Config) (*Handler, error) { + if err := cfg.validate(); err != nil { + return nil, fmt.Errorf("invalid config given: %w", err) + } + + return &Handler{ + blob: cfg.Blob, + pubsub: cfg.PubSub, + logger: cfg.Logger, + }, nil +} + +// TODO: Send a notification when parsing fails. +func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest, destPortfolioContainer string) error { + // Load the portfolio from blob storage, place it in /mnt/raw_portfolios, where + // the `process_portfolios.R` script expects it to be. + for _, srcURI := range req.BlobURIs { + id := uuid.New().String() + // TODO: Probably set the CSV extension in the signed upload URL instead. + destPath := filepath.Join("/", "mnt", "raw_portfolios", fmt.Sprintf("%s.csv", id)) + if err := h.downloadBlob(ctx, string(srcURI), destPath); err != nil { + return fmt.Errorf("failed to download raw portfolio blob: %w", err) + } + } + + processedDir := filepath.Join("/", "mnt", "processed_portfolios") + if err := os.MkdirAll(processedDir, 0600); err != nil { + return fmt.Errorf("failed to create directory to download blob to: %w", err) + } + + var stdout, stderr bytes.Buffer + cmd := exec.CommandContext(ctx, "/usr/local/bin/Rscript", "/app/process_portfolios.R") + cmd.Stdout = io.MultiWriter(os.Stdout, &stdout) + cmd.Stderr = io.MultiWriter(os.Stderr, &stderr) + + if err := cmd.Run(); err != nil { + return fmt.Errorf("failed to run process_portfolios script: %w", err) + } + + sc := bufio.NewScanner(&stderr) + + // TODO: Load from the output database file (or similar, like reading the processed_portfolios dir) instead of parsing stderr + var paths []string + for sc.Scan() { + line := sc.Text() + idx := strings.Index(line, "writing to file: " /* 17 chars */) + if idx == -1 { + continue + } + paths = append(paths, strings.TrimSpace(line[idx+17:])) + } + + // NOTE: This code could benefit from some concurrency, but I'm opting not to prematurely optimize. 
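+	// For each output file the R script reported writing, count its rows,
+	// upload it to the destination portfolio container, and record its
+	// metadata for the completion event published below.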
+ var out []*task.ParsePortfolioResponseItem + for _, p := range paths { + lineCount, err := countCSVLines(p) + if err != nil { + return fmt.Errorf("failed to count lines in file %q: %w", p, err) + } + fileName := filepath.Base(p) + blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), destPortfolioContainer, fileName)) + if err := h.uploadBlob(ctx, p, string(blobURI)); err != nil { + return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err) + } + extension := filepath.Ext(fileName) + fileType, err := pacta.ParseFileType(extension) + if err != nil { + return fmt.Errorf("failed to parse file type from file name %q: %w", fileName, err) + } + out = append(out, &task.ParsePortfolioResponseItem{ + Blob: pacta.Blob{ + FileName: fileName, + FileType: fileType, + BlobURI: blobURI, + }, + LineCount: lineCount, + }) + } + + events := []publisher.Event{ + { + Data: task.ParsePortfolioResponse{ + TaskID: taskID, + Request: req, + Outputs: out, + }, + DataVersion: to.Ptr("1.0"), + EventType: to.Ptr("parsed-portfolio"), + EventTime: to.Ptr(time.Now()), + ID: to.Ptr(string(taskID)), + Subject: to.Ptr(string(taskID)), + }, + } + + if _, err := h.pubsub.PublishEvents(ctx, events, nil); err != nil { + return fmt.Errorf("failed to publish event: %w", err) + } + + h.logger.Info("parsed portfolio", zap.String("task_id", string(taskID))) + + return nil +} + +// TODO(grady): Move this line counting into the image to prevent having our code do any read of the actual underlying data. +func countCSVLines(path string) (int, error) { + file, err := os.Open(path) + if err != nil { + return 0, fmt.Errorf("opening file failed: %w", err) + } + defer file.Close() + scanner := bufio.NewScanner(file) + lineCount := 0 + for scanner.Scan() { + lineCount++ + } + if err := scanner.Err(); err != nil { + return 0, fmt.Errorf("scanner.error returned: %w", err) + } + // Subtract 1 for the header row + return lineCount - 1, nil +} + +func (h *Handler) CreateAudit(ctx context.Context, taskID task.ID, req *task.CreateAuditRequest) error { + return errors.New("not implemented") +} + +func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.CreateReportRequest, reportContainer string) error { + fileNames := []string{} + for _, blobURI := range req.BlobURIs { + // Load the parsed portfolio from blob storage, place it in /mnt/ + // processed_portfolios, where the `create_report.R` script expects it + // to be. 
+		fileNameWithExt := filepath.Base(string(blobURI))
+		if !strings.HasSuffix(fileNameWithExt, ".json") {
+			return fmt.Errorf("given blob wasn't a JSON-formatted portfolio, %q", fileNameWithExt)
+		}
+		fileNames = append(fileNames, strings.TrimSuffix(fileNameWithExt, ".json"))
+		destPath := filepath.Join("/", "mnt", "processed_portfolios", fileNameWithExt)
+		if err := h.downloadBlob(ctx, string(blobURI), destPath); err != nil {
+			return fmt.Errorf("failed to download processed portfolio blob: %w", err)
+		}
+	}
+
+	reportDir := filepath.Join("/", "mnt", "reports")
+	if err := os.MkdirAll(reportDir, 0600); err != nil {
+		return fmt.Errorf("failed to create directory for reports to get copied to: %w", err)
+	}
+
+	cmd := exec.CommandContext(ctx, "/usr/local/bin/Rscript", "/app/create_report.R")
+	cmd.Env = append(cmd.Env,
+		"PORTFOLIO="+strings.Join(fileNames, ","),
+		"HOME=/root", /* Required by pandoc */
+	)
+	cmd.Stdout = os.Stdout
+	cmd.Stderr = os.Stderr
+
+	if err := cmd.Run(); err != nil {
+		return fmt.Errorf("failed to run pacta test CLI: %w", err)
+	}
+
+	// Collect the outputs from the reports directory and upload them to Azure
+	dirEntries, err := os.ReadDir(reportDir)
+	if err != nil {
+		return fmt.Errorf("failed to read report directory: %w", err)
+	}
+
+	var artifacts []*task.AnalysisArtifact
+	for _, dirEntry := range dirEntries {
+		if !dirEntry.IsDir() {
+			continue
+		}
+		dirPath := filepath.Join(reportDir, dirEntry.Name())
+		tmp, err := h.uploadDirectory(ctx, dirPath, reportContainer)
+		if err != nil {
+			return fmt.Errorf("failed to upload report directory: %w", err)
+		}
+		artifacts = tmp
+	}
+
+	events := []publisher.Event{
+		{
+			Data: task.CreateReportResponse{
+				TaskID:    taskID,
+				Request:   req,
+				Artifacts: artifacts,
+			},
+			DataVersion: to.Ptr("1.0"),
+			EventType:   to.Ptr("created-report"),
+			EventTime:   to.Ptr(time.Now()),
+			ID:          to.Ptr(string(taskID)),
+			Subject:     to.Ptr(string(taskID)),
+		},
+	}
+
+	if _, err := h.pubsub.PublishEvents(ctx, events, nil); err != nil {
+		return fmt.Errorf("failed to publish event: %w", err)
+	}
+
+	h.logger.Info("created report", zap.String("task_id", string(taskID)))
+
+	return nil
+}
+
+func (h *Handler) downloadBlob(ctx context.Context, srcURI, destPath string) error {
+	// Make sure the destination exists
+	if err := os.MkdirAll(filepath.Dir(destPath), 0600); err != nil {
+		return fmt.Errorf("failed to create directory to download blob to: %w", err)
+	}
+
+	destF, err := os.Create(destPath)
+	if err != nil {
+		return fmt.Errorf("failed to create dest file: %w", err)
+	}
+	defer destF.Close() // Best-effort in case something fails
+
+	br, err := h.blob.ReadBlob(ctx, srcURI)
+	if err != nil {
+		return fmt.Errorf("failed to read raw portfolio: %w", err)
+	}
+	defer br.Close() // Best-effort in case something fails
+
+	if _, err := io.Copy(destF, br); err != nil {
+		return fmt.Errorf("failed to load raw portfolio: %w", err)
+	}
+
+	if err := br.Close(); err != nil {
+		return fmt.Errorf("failed to close blob reader: %w", err)
+	}
+
+	if err := destF.Close(); err != nil {
+		return fmt.Errorf("failed to close dest file: %w", err)
+	}
+
+	return nil
+}
+
+func (h *Handler) uploadDirectory(ctx context.Context, dirPath, container string) ([]*task.AnalysisArtifact, error) {
+	base := filepath.Base(dirPath)
+
+	var artifacts []*task.AnalysisArtifact
+	err := filepath.WalkDir(dirPath, func(path string, info fs.DirEntry, err error) error {
+		if info.IsDir() {
+			return nil
+		}
+
+		// This is a file, let's upload it to the container
+		uri := blob.Join(h.blob.Scheme(), container, base, strings.TrimPrefix(path, dirPath+"/"))
+		if err := h.uploadBlob(ctx, path, uri); err != nil {
+			return fmt.Errorf("failed to upload blob: %w", err)
+		}
+
+		fn := filepath.Base(path)
+		// Returns pacta.FileType_UNKNOWN for unrecognized extensions, which we'll serve as binary blobs.
+		ft := fileTypeFromExt(filepath.Ext(fn))
+		if ft == pacta.FileType_UNKNOWN {
+			h.logger.Error("unhandled file extension", zap.String("dir", dirPath), zap.String("file_ext", filepath.Ext(fn)))
+		}
+		artifacts = append(artifacts, &task.AnalysisArtifact{
+			BlobURI:  pacta.BlobURI(uri),
+			FileName: fn,
+			FileType: ft,
+		})
+		return nil
+	})
+	if err != nil {
+		return nil, fmt.Errorf("error while walking dir/uploading blobs: %w", err)
+	}
+	return artifacts, nil
+}
+
+func fileTypeFromExt(ext string) pacta.FileType {
+	switch ext {
+	case ".csv":
+		return pacta.FileType_CSV
+	case ".yaml":
+		return pacta.FileType_YAML
+	case ".zip":
+		return pacta.FileType_ZIP
+	case ".html":
+		return pacta.FileType_HTML
+	case ".json":
+		return pacta.FileType_JSON
+	case ".txt":
+		return pacta.FileType_TEXT
+	case ".css":
+		return pacta.FileType_CSS
+	case ".js":
+		return pacta.FileType_JS
+	case ".ttf":
+		return pacta.FileType_TTF
+	default:
+		return pacta.FileType_UNKNOWN
+	}
+}
+
+func (h *Handler) uploadBlob(ctx context.Context, srcPath, destURI string) error {
+	h.logger.Info("uploading blob", zap.String("src", srcPath), zap.String("dest", destURI))
+
+	srcF, err := os.Open(srcPath)
+	if err != nil {
+		return fmt.Errorf("failed to open file for upload: %w", err)
+	}
+	defer srcF.Close() // Best-effort in case something fails
+
+	if err := h.blob.WriteBlob(ctx, destURI, srcF); err != nil {
+		return fmt.Errorf("failed to write file to blob storage: %w", err)
+	}
+
+	if err := srcF.Close(); err != nil {
+		return fmt.Errorf("failed to close source file: %w", err)
+	}
+
+	return nil
+}
diff --git a/async/req.go b/async/req.go
new file mode 100644
index 0000000..28bf8d4
--- /dev/null
+++ b/async/req.go
@@ -0,0 +1,35 @@
+package async
+
+import (
+	"encoding/json"
+	"fmt"
+	"os"
+	"strings"
+
+	"github.com/RMI/pacta/task"
+)
+
+func LoadParsePortfolioRequestFromEnv() (*task.ParsePortfolioRequest, error) {
+	return loadFromEnv[task.ParsePortfolioRequest]("PARSE_PORTFOLIO_REQUEST", "ParsePortfolioRequest")
+}
+
+func LoadCreateAuditRequestFromEnv() (*task.CreateAuditRequest, error) {
+	return loadFromEnv[task.CreateAuditRequest]("CREATE_AUDIT_REQUEST", "CreateAuditRequest")
+}
+
+func LoadCreateReportRequestFromEnv() (*task.CreateReportRequest, error) {
+	return loadFromEnv[task.CreateReportRequest]("CREATE_REPORT_REQUEST", "CreateReportRequest")
+}
+
+func loadFromEnv[T any](envVar string, entityName string) (*T, error) {
+	envStr := os.Getenv(envVar)
+	if envStr == "" {
+		return nil, fmt.Errorf("no %s was given", envVar)
+	}
+	var task T
+	if err := json.NewDecoder(strings.NewReader(envStr)).Decode(&task); err != nil {
+		return nil, fmt.Errorf("failed to load %q: %w", entityName, err)
+	}
+	return &task, nil
+}
diff --git a/azure/azcreds/BUILD.bazel b/azure/azcreds/BUILD.bazel
new file mode 100644
index 0000000..bea2828
--- /dev/null
+++ b/azure/azcreds/BUILD.bazel
@@ -0,0 +1,12 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "azcreds",
+    srcs = ["azcreds.go"],
+    importpath = "github.com/RMI/pacta/azure/azcreds",
+    visibility = ["//visibility:public"],
+    deps = [
+        "@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore",
"@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity", + ], +) diff --git a/azure/azcreds/azcreds.go b/azure/azcreds/azcreds.go new file mode 100644 index 0000000..bbe58dc --- /dev/null +++ b/azure/azcreds/azcreds.go @@ -0,0 +1,71 @@ +// Package azcreds provides helpers for getting environment-appropriate credentials. +// +// The context is that Azure has 3-4 ways to authenticate as an identity to +// their APIs (KMS, storage, etc): +// +// - When running locally, we use the "Environment" approach, which means we provide AZURE_* environment variables that authenticate against a local-only service account. +// - When running in Azure Container Apps Jobs, we use the "ManagedIdentitiy" approach, meaning we pull ambiently from the infrastructure we're running on (via a metadata service). +// +// See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types for more info +package azcreds + +import ( + "fmt" + "os" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azidentity" +) + +// Type identifies the auth method we used to authenticate with Azure. +type Type string + +const ( + Default = Type("DEFAULT") + Environment = Type("ENVIRONMENT") + ManagedIdentity = Type("MANAGED_IDENTITY") +) + +// New returns appropriate credentials for the environment we're running in. +func New() (azcore.TokenCredential, Type, error) { + if azClientSecret := os.Getenv("AZURE_CLIENT_SECRET"); azClientSecret != "" { + return newEnvCreds() + } + + if azClientID := os.Getenv("AZURE_CLIENT_ID"); azClientID != "" { + // We use "ManagedIdentity" instead of just "Default" because the default + // timeout is too low in azidentity.NewDefaultAzureCredentials, so it times out + // and fails to run. + return newManagedIdentityCreds(azClientID) + } + + // Default to, well, the default, which will try all the various methods available. 
+	return newDefaultCreds()
+}
+
+func newEnvCreds() (*azidentity.EnvironmentCredential, Type, error) {
+	creds, err := azidentity.NewEnvironmentCredential(nil)
+	if err != nil {
+		return nil, "", fmt.Errorf("failed to load Azure credentials from environment: %w", err)
+	}
+	return creds, Environment, nil
+}
+
+func newManagedIdentityCreds(azClientID string) (*azidentity.ManagedIdentityCredential, Type, error) {
+	creds, err := azidentity.NewManagedIdentityCredential(&azidentity.ManagedIdentityCredentialOptions{
+		ID: azidentity.ClientID(azClientID),
+	})
+	if err != nil {
+		return nil, "", fmt.Errorf("failed to load managed identity Azure credentials: %w", err)
+	}
+	return creds, ManagedIdentity, nil
+}
+
+func newDefaultCreds() (*azidentity.DefaultAzureCredential, Type, error) {
+	creds, err := azidentity.NewDefaultAzureCredential(nil)
+	if err != nil {
+		return nil, "", fmt.Errorf("failed to load default Azure credentials: %w", err)
+	}
+	return creds, Default, nil
+}
diff --git a/cmd/parser/BUILD.bazel b/cmd/parser/BUILD.bazel
new file mode 100644
index 0000000..0939003
--- /dev/null
+++ b/cmd/parser/BUILD.bazel
@@ -0,0 +1,71 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_binary", "go_library")
+load("@rules_pkg//:pkg.bzl", "pkg_tar")
+load("@rules_oci//oci:defs.bzl", "oci_image", "oci_push", "oci_tarball")
+
+go_library(
+    name = "parser_lib",
+    srcs = ["main.go"],
+    importpath = "github.com/RMI/pacta/cmd/parser",
+    visibility = ["//visibility:private"],
+    deps = [
+        "//async",
+        "//azure/azblob",
+        "//azure/azcreds",
+        "//azure/azlog",
+        "//task",
+        "@com_github_azure_azure_sdk_for_go_sdk_messaging_azeventgrid//publisher",
+        "@com_github_namsral_flag//:flag",
+        "@org_uber_go_zap//:zap",
+        "@org_uber_go_zap//zapcore",
+        "@org_uber_go_zap_exp//zapfield",
+    ],
+)
+
+go_binary(
+    name = "parser",
+    embed = [":parser_lib"],
+    visibility = ["//visibility:public"],
+)
+
+pkg_tar(
+    name = "parser_tar",
+    srcs = [":parser"],
+)
+
+filegroup(
+    name = "configs",
+    srcs = glob(["configs/**"]),
+    visibility = ["//visibility:public"],
+)
+
+pkg_tar(
+    name = "configs_tar",
+    srcs = [":configs"],
+    package_dir = "/configs",
+    strip_prefix = "/cmd/parser/configs",
+)
+
+oci_image(
+    name = "image",
+    base = "@parser_base",
+    entrypoint = ["/parser"],
+    tars = [
+        ":parser_tar",
+        ":configs_tar",
+    ],
+)
+
+oci_push(
+    name = "push_image",
+    image = ":image",
+    remote_tags = ["latest"],
+    repository = "rmisa.azurecr.io/pactaparser",
+)
+
+# Note: This tarball is provided for local testing of the Docker image, see the README.md for details on usage.
+oci_tarball(
+    name = "image_tarball",
+    image = ":image",
+    repo_tags = [],
+)
diff --git a/cmd/parser/README.md b/cmd/parser/README.md
new file mode 100644
index 0000000..6b707f5
--- /dev/null
+++ b/cmd/parser/README.md
@@ -0,0 +1,32 @@
+# Parser
+
+This directory contains the `parser` binary, which acts as a thin shim around the PACTA [`workflow.portfolio.parsing` tooling](https://github.com/RMI-PACTA/workflow.portfolio.parsing/pkgs/container/workflow.portfolio.parsing/175038238?tag=main), running tasks created via either Azure Container App Jobs (via the `aztask` package) or local Docker (`dockertask`), loading relevant blobs, and writing relevant outputs.
+
+## Running locally
+
+The `parser` binary doesn't need to be run locally in order to test PACTA processing.
By default, the backend API server will trigger PACTA parsing runs against a local Docker daemon, testing most of the run-handling code in the process (e.g. file handling, task execution, etc.).
+
+If you do want to actually run the full `parser` image on Azure, you can use:
+
+```bash
+# Run the backend, tell it to create tasks as real Azure Container Apps Jobs.
+bazel run //scripts:run_server -- --use_azure_runner
+```
+
+### Creating a new docker image to run locally
+
+When developing the parser, you have two options:
+
+* **Test against local Docker** - Run the server **without** the `--use_azure_runner` flag, which means async tasks will run locally, using `docker run ...`. To test local parser changes, you can build and tag a parser image locally with `bazel run //scripts:build_and_load_parser`.
+  * After running the script, the updated parser image will immediately be available, no need to restart the server.
+  * This is the option you'll want to use most of the time.
+* **Test against Azure Container Apps Jobs** - Run the server **with** the `--use_azure_runner` flag, which means async tasks will be run on Azure, created via the Azure API. To test changes here, you can build and tag a parser image locally with `bazel run //scripts:build_and_load_parser`, and then push it to Azure with `docker push rmisa.azurecr.io/pactaparser:latest`.
+  * You generally won't need to use this option unless you're testing something very specific about the parser's integration with Azure, as the parser code is identical whether run locally or on Azure.
+
+### Cleaning up old parser containers
+
+By default, we don't auto-remove stopped containers (i.e. finished parser tasks), to give developers a chance to review the logs (e.g. with `docker logs <container ID>`). To clean up all completed runs at once, run:
+
+```bash
+docker rm $(docker ps -a -q -f "status=exited" -f "ancestor=rmisa.azurecr.io/pactaparser:latest")
+```
diff --git a/cmd/parser/configs/dev.conf b/cmd/parser/configs/dev.conf
new file mode 100644
index 0000000..77ea49c
--- /dev/null
+++ b/cmd/parser/configs/dev.conf
@@ -0,0 +1,8 @@
+env dev
+min_log_level warn
+
+azure_event_topic pacta-events-dev
+azure_topic_location centralus-1
+
+azure_storage_account rmipactadev
+azure_dest_portfolio_container parsedportfolios
diff --git a/cmd/parser/configs/local.conf b/cmd/parser/configs/local.conf
new file mode 100644
index 0000000..0b7eecd
--- /dev/null
+++ b/cmd/parser/configs/local.conf
@@ -0,0 +1,8 @@
+env local
+min_log_level debug
+
+azure_event_topic pacta-events-local
+azure_topic_location centralus-1
+
+azure_storage_account rmipactalocal
+azure_dest_portfolio_container parsedportfolios
diff --git a/cmd/parser/main.go b/cmd/parser/main.go
new file mode 100644
index 0000000..bd2d91d
--- /dev/null
+++ b/cmd/parser/main.go
@@ -0,0 +1,130 @@
+// Command parser is our shim for parsing + validating incoming PACTA portfolios,
+// wrapping https://github.com/RMI-PACTA/workflow.portfolio.parsing
+package main
+
+import (
+	"context"
+	"errors"
+	"fmt"
+	"log"
+	"os"
+
+	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventgrid/publisher"
+	"github.com/RMI/pacta/async"
+	"github.com/RMI/pacta/azure/azblob"
+	"github.com/RMI/pacta/azure/azcreds"
+	"github.com/RMI/pacta/azure/azlog"
+	"github.com/RMI/pacta/task"
+	"github.com/namsral/flag"
+	"go.uber.org/zap"
+	"go.uber.org/zap/exp/zapfield"
+	"go.uber.org/zap/zapcore"
+)
+
+func main() {
+	if err := run(os.Args); err != nil {
+		log.Fatal(err)
+	}
+}
+
+func run(args []string) error {
+	if len(args) == 0 {
+		return errors.New("args cannot be empty")
+	}
+
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	fs := flag.NewFlagSet(args[0], flag.ContinueOnError)
+	var (
+		env = fs.String("env", "", "The environment we're running in.")
+
+		azEventTopic    = fs.String("azure_event_topic", "", "The EventGrid topic to send notifications when tasks have finished")
+		azTopicLocation = fs.String("azure_topic_location", "", "The location (like 'centralus-1') where our EventGrid topics are hosted")
+
+		azStorageAccount         = fs.String("azure_storage_account", "", "The storage account to authenticate against for blob operations")
+		azDestPortfolioContainer = fs.String("azure_dest_portfolio_container", "", "The container in the storage account where we write parsed portfolios")
+
+		minLogLevel zapcore.Level = zapcore.DebugLevel
+	)
+	fs.Var(&minLogLevel, "min_log_level", "If set, retains logs at the given level and above. Options: 'debug', 'info', 'warn', 'error', 'dpanic', 'panic', 'fatal' - default warn.")
+
+	// Allows for passing in configuration via a -config path/to/env-file.conf
+	// flag, see https://pkg.go.dev/github.com/namsral/flag#readme-usage
+	fs.String(flag.DefaultConfigFlagname, "", "path to config file")
+	if err := fs.Parse(args[1:]); err != nil {
+		return fmt.Errorf("failed to parse flags: %v", err)
+	}
+
+	logger, err := azlog.New(&azlog.Config{
+		Local:       *env == "local",
+		MinLogLevel: minLogLevel,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to init logger: %w", err)
+	}
+	defer logger.Sync()
+
+	creds, credType, err := azcreds.New()
+	if err != nil {
+		return fmt.Errorf("failed to load Azure credentials: %w", err)
+	}
+	logger.Info("authenticated with Azure", zapfield.Str("credential_type", credType))
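+
+	// The endpoint below is the standard EventGrid regional-topic URL,
+	// https://{topic}.{location}.eventgrid.azure.net/api/events.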
+	pubsubClient, err := publisher.NewClient(fmt.Sprintf("https://%s.%s.eventgrid.azure.net/api/events", *azEventTopic, *azTopicLocation), creds, nil)
+	if err != nil {
+		return fmt.Errorf("failed to init pub/sub client: %w", err)
+	}
+
+	blobClient, err := azblob.NewClient(creds, *azStorageAccount)
+	if err != nil {
+		return fmt.Errorf("failed to init blob client: %w", err)
+	}
+
+	h, err := async.New(&async.Config{
+		Blob:   blobClient,
+		PubSub: pubsubClient,
+		Logger: logger,
+	})
+	if err != nil {
+		return fmt.Errorf("failed to init async biz logic handler: %w", err)
+	}
+
+	taskID := task.ID(os.Getenv("TASK_ID"))
+	if taskID == "" {
+		return errors.New("no TASK_ID given")
+	}
+
+	req, err := async.LoadParsePortfolioRequestFromEnv()
+	if err != nil {
+		return fmt.Errorf("failed to parse portfolio request: %w", err)
+	}
+
+	logger.Info("running PACTA parsing task", zap.String("task_id", string(taskID)))
+
+	if err := h.ParsePortfolio(ctx, taskID, req, *azDestPortfolioContainer); err != nil {
+		return fmt.Errorf("error running task: %w", err)
+	}
+
+	logger.Info("ran PACTA parsing task successfully", zap.String("task_id", string(taskID)))
+
+	return nil
+}
diff --git a/cmd/parser/taskrunner/BUILD.bazel b/cmd/parser/taskrunner/BUILD.bazel
new file mode 100644
index 0000000..1f336b6
--- /dev/null
+++ b/cmd/parser/taskrunner/BUILD.bazel
@@ -0,0 +1,13 @@
+load("@io_bazel_rules_go//go:def.bzl", "go_library")
+
+go_library(
+    name = "taskrunner",
+    srcs = ["taskrunner.go"],
+    importpath = "github.com/RMI/pacta/cmd/parser/taskrunner",
+    visibility = ["//visibility:public"],
+    deps = [
+        "//task",
+        "@com_github_google_uuid//:uuid",
+        "@org_uber_go_zap//:zap",
+    ],
+)
diff --git a/cmd/parser/taskrunner/taskrunner.go b/cmd/parser/taskrunner/taskrunner.go
new file mode 100644
index 0000000..06ec4c6
--- /dev/null
+++ b/cmd/parser/taskrunner/taskrunner.go
@@ -0,0 +1,186 @@
+// Package taskrunner implements the logic for preparing a portfolio for
+// analysis, regardless of the underlying substrate we'll run the external
+// processing logic on (e.g. Docker or locally).
+//
+// TODO: We use the tag "latest" throughout. For most cases, we'll want to
+// version this and take in the image tag as part of the request.
+package taskrunner
+
+import (
+	"bytes"
+	"context"
+	"encoding/json"
+	"errors"
+	"fmt"
+
+	"github.com/RMI/pacta/task"
+	"github.com/google/uuid"
+	"go.uber.org/zap"
+)
+
+type Config struct {
+	// ConfigPath should be a full path to a config file in the runner image,
+	// like: /configs/{local,dev}.conf
+	ConfigPath string
+
+	// RunnerImage is the runner image to execute, not specifying a tag.
+	RunnerImage *task.BaseImage
+	// ParserImage is the parser image to execute, not specifying a tag.
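+	// (This is the image built from //cmd/parser, which layers the parser
+	// binary onto the workflow.portfolio.parsing base image pulled in the
+	// WORKSPACE file.)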
+ ParserImage *task.BaseImage + + Logger *zap.Logger + + Runner Runner +} + +func (c *Config) validate() error { + if c.ConfigPath == "" { + return errors.New("no runner config path given") + } + + if err := validateImage(c.RunnerImage); err != nil { + return fmt.Errorf("invalid runner image: %w", err) + } + + if err := validateImage(c.ParserImage); err != nil { + return fmt.Errorf("invalid parser image: %w", err) + } + + if c.Logger == nil { + return errors.New("no logger given") + } + + if c.Runner == nil { + return errors.New("no runner given") + } + + return nil +} + +func validateImage(bi *task.BaseImage) error { + if bi.Name == "" { + return errors.New("no name given") + } + if bi.Registry == "" { + return errors.New("no registry given") + } + return nil +} + +type Runner interface { + Run(ctx context.Context, cfg *task.Config) (task.RunnerID, error) +} + +type TaskRunner struct { + logger *zap.Logger + runner Runner + runnerImage *task.BaseImage + parserImage *task.BaseImage + configPath string +} + +func New(cfg *Config) (*TaskRunner, error) { + if err := cfg.validate(); err != nil { + return nil, fmt.Errorf("invalid config given: %w", err) + } + + return &TaskRunner{ + logger: cfg.Logger, + runner: cfg.Runner, + runnerImage: cfg.RunnerImage, + parserImage: cfg.ParserImage, + configPath: cfg.ConfigPath, + }, nil +} + +type TaskRequest interface { + task.ParsePortfolioRequest | task.CreateReportRequest | task.CreateAuditRequest +} + +func encodeRequest[T TaskRequest](req *T) (string, error) { + var buf bytes.Buffer + if err := json.NewEncoder(&buf).Encode(req); err != nil { + return "", fmt.Errorf("failed to encode request: %w", err) + } + value := buf.String() + if len(value) > 128*1024 { + return "", fmt.Errorf("request is too large: %d bytes > 128 kb", len(value)) + } + return value, nil +} + +func (tr *TaskRunner) ParsePortfolio(ctx context.Context, req *task.ParsePortfolioRequest) (task.ID, task.RunnerID, error) { + value, err := encodeRequest(req) + if err != nil { + return "", "", fmt.Errorf("failed to encode ParsePortfolioRequest: %w", err) + } + return tr.run(ctx, []task.EnvVar{ + { + Key: "TASK_TYPE", + Value: string(task.ParsePortfolio), + }, + { + Key: "PARSE_PORTFOLIO_REQUEST", + Value: value, + }, + }, withTag(tr.parserImage, "latest")) +} + +func (tr *TaskRunner) CreateAudit(ctx context.Context, req *task.CreateAuditRequest) (task.ID, task.RunnerID, error) { + value, err := encodeRequest(req) + if err != nil { + return "", "", fmt.Errorf("failed to encode CreateAuditRequest: %w", err) + } + return tr.run(ctx, []task.EnvVar{ + { + Key: "TASK_TYPE", + Value: string(task.CreateAudit), + }, + { + Key: "CREATE_AUDIT_REQUEST", + Value: value, + }, + }, withTag(tr.runnerImage, "latest")) +} + +func (tr *TaskRunner) CreateReport(ctx context.Context, req *task.CreateReportRequest) (task.ID, task.RunnerID, error) { + value, err := encodeRequest(req) + if err != nil { + return "", "", fmt.Errorf("failed to encode CreateReportRequest: %w", err) + } + return tr.run(ctx, []task.EnvVar{ + { + Key: "TASK_TYPE", + Value: string(task.CreateReport), + }, + { + Key: "CREATE_REPORT_REQUEST", + Value: value, + }, + }, withTag(tr.runnerImage, "latest")) +} + +func withTag(img *task.BaseImage, tag string) *task.Image { + return &task.Image{ + Base: *img, + Tag: tag, + } +} + +func (tr *TaskRunner) run(ctx context.Context, env []task.EnvVar, image *task.Image) (task.ID, task.RunnerID, error) { + tr.logger.Info("triggering task run", zap.Any("env", env)) + taskID := uuid.NewString() + 
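+	// The TASK_ID env var set below is echoed back as the ID and Subject of
+	// the completion event, which is how results get correlated with the task
+	// that produced them.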
runnerID, err := tr.runner.Run(ctx, &task.Config{ + Env: append(env, task.EnvVar{ + Key: "TASK_ID", + Value: taskID, + }), + Flags: []string{"--config=" + tr.configPath}, + Command: []string{"/runner"}, + Image: image, + }) + if err != nil { + return "", "", fmt.Errorf("failed to run task %q, %q: %w", taskID, runnerID, err) + } + return task.ID(taskID), runnerID, nil +} diff --git a/cmd/runner/BUILD.bazel b/cmd/runner/BUILD.bazel index 080d643..ed50415 100644 --- a/cmd/runner/BUILD.bazel +++ b/cmd/runner/BUILD.bazel @@ -8,20 +8,16 @@ go_library( importpath = "github.com/RMI/pacta/cmd/runner", visibility = ["//visibility:private"], deps = [ + "//async", "//azure/azblob", + "//azure/azcreds", "//azure/azlog", - "//blob", - "//pacta", "//task", - "@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore", - "@com_github_azure_azure_sdk_for_go_sdk_azcore//policy", - "@com_github_azure_azure_sdk_for_go_sdk_azcore//to", - "@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity", "@com_github_azure_azure_sdk_for_go_sdk_messaging_azeventgrid//publisher", - "@com_github_google_uuid//:uuid", "@com_github_namsral_flag//:flag", "@org_uber_go_zap//:zap", "@org_uber_go_zap//zapcore", + "@org_uber_go_zap_exp//zapfield", ], ) diff --git a/cmd/runner/README.md b/cmd/runner/README.md index 80ba6a0..df718ef 100644 --- a/cmd/runner/README.md +++ b/cmd/runner/README.md @@ -1,6 +1,6 @@ # Runner -This directory contains the `runner` binary, which acts as a thin shim around the PACTA portfolio analysis tooling, running tasks created via either Azure Container App Jobs (via the `aztask` package) or local Docker (`localRunner`), loading relevant blobs, and writing relevant outputs. +This directory contains the `runner` binary, which acts as a thin shim around the PACTA portfolio analysis tooling, running tasks created via either Azure Container App Jobs (via the `aztask` package) or local Docker (`dockertask`), loading relevant blobs, and writing relevant outputs. 
## Running locally diff --git a/cmd/runner/configs/dev.conf b/cmd/runner/configs/dev.conf index 5c7803a..7e3d41b 100644 --- a/cmd/runner/configs/dev.conf +++ b/cmd/runner/configs/dev.conf @@ -5,6 +5,4 @@ azure_event_topic pacta-events-dev azure_topic_location centralus-1 azure_storage_account rmipactadev -azure_source_portfolio_container uploadedportfolios -azure_dest_portfolio_container parsedportfolios azure_report_container reports diff --git a/cmd/runner/configs/local.conf b/cmd/runner/configs/local.conf index 99e0ab0..9a73adf 100644 --- a/cmd/runner/configs/local.conf +++ b/cmd/runner/configs/local.conf @@ -5,6 +5,4 @@ azure_event_topic pacta-events-local azure_topic_location centralus-1 azure_storage_account rmipactalocal -azure_source_portfolio_container uploadedportfolios -azure_dest_portfolio_container parsedportfolios azure_report_container reports diff --git a/cmd/runner/main.go b/cmd/runner/main.go index e555c09..664e029 100644 --- a/cmd/runner/main.go +++ b/cmd/runner/main.go @@ -1,34 +1,21 @@ package main import ( - "bufio" - "bytes" "context" - "encoding/json" "errors" "fmt" - "io" - "io/fs" "log" "os" - "os/exec" - "path/filepath" - "strings" - "time" - "github.com/Azure/azure-sdk-for-go/sdk/azcore" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy" - "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" - "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventgrid/publisher" + "github.com/RMI/pacta/async" "github.com/RMI/pacta/azure/azblob" + "github.com/RMI/pacta/azure/azcreds" "github.com/RMI/pacta/azure/azlog" - "github.com/RMI/pacta/blob" - "github.com/RMI/pacta/pacta" "github.com/RMI/pacta/task" - "github.com/google/uuid" "github.com/namsral/flag" "go.uber.org/zap" + "go.uber.org/zap/exp/zapfield" "go.uber.org/zap/zapcore" ) @@ -53,10 +40,8 @@ func run(args []string) error { azEventTopic = fs.String("azure_event_topic", "", "The EventGrid topic to send notifications when tasks have finished") azTopicLocation = fs.String("azure_topic_location", "", "The location (like 'centralus-1') where our EventGrid topics are hosted") - azStorageAccount = fs.String("azure_storage_account", "", "The storage account to authenticate against for blob operations") - azSourcePortfolioContainer = fs.String("azure_source_portfolio_container", "", "The container in the storage account where we read raw portfolios from") - azDestPortfolioContainer = fs.String("azure_dest_portfolio_container", "", "The container in the storage account where we read/write processed portfolios") - azReportContainer = fs.String("azure_report_container", "", "The container in the storage account where we write generated portfolio reports to") + azStorageAccount = fs.String("azure_storage_account", "", "The storage account to authenticate against for blob operations") + azReportContainer = fs.String("azure_report_container", "", "The container in the storage account where we write generated portfolio reports to") minLogLevel zapcore.Level = zapcore.DebugLevel ) @@ -78,27 +63,11 @@ func run(args []string) error { } defer logger.Sync() - var creds azcore.TokenCredential - // Azure has 3-4 ways to authenticate as an identity to their APIs (KMS, storage, etc). - // - When running locally, we use the "Environment" approach, which means we provide AZURE_* environment variables that authenticate against a local-only service account. 
- // - When running in Azure Container Apps Jobs, we use the "ManagedIdentitiy" approach, meaning we pull ambiently from the infrastructure we're running on (via a metadata service). - // See https://pkg.go.dev/github.com/Azure/azure-sdk-for-go/sdk/azidentity#readme-credential-types for more info - if azClientSecret := os.Getenv("AZURE_CLIENT_SECRET"); azClientSecret != "" { - if creds, err = azidentity.NewEnvironmentCredential(nil); err != nil { - return fmt.Errorf("failed to load Azure credentials from environment: %w", err) - } - } else { - // We use "ManagedIdentity" instead of just "Default" because the default - // timeout is too low in azidentity.NewDefaultAzureCredentials, so it times out - // and fails to run. - azClientID := os.Getenv("AZURE_CLIENT_ID") - logger.Info("Loading user managed credentials", zap.String("client_id", azClientID)) - if creds, err = azidentity.NewManagedIdentityCredential(&azidentity.ManagedIdentityCredentialOptions{ - ID: azidentity.ClientID(azClientID), - }); err != nil { - return fmt.Errorf("failed to load Azure credentials: %w", err) - } + creds, credType, err := azcreds.New() + if err != nil { + return fmt.Errorf("failed to load Azure credentials: %w", err) } + logger.Info("authenticated with Azure", zapfield.Str("credential_type", credType)) pubsubClient, err := publisher.NewClient(fmt.Sprintf("https://%s.%s.eventgrid.azure.net/api/events", *azEventTopic, *azTopicLocation), creds, nil) if err != nil { @@ -110,20 +79,20 @@ func run(args []string) error { return fmt.Errorf("failed to init blob client: %w", err) } - h := handler{ - blob: blobClient, - pubsub: pubsubClient, - logger: logger, - - sourcePortfolioContainer: *azSourcePortfolioContainer, - destPortfolioContainer: *azDestPortfolioContainer, - reportContainer: *azReportContainer, + h, err := async.New(&async.Config{ + Blob: blobClient, + PubSub: pubsubClient, + Logger: logger, + }) + if err != nil { + return fmt.Errorf("failed to init async biz logic handler: %w", err) } validTasks := map[task.Type]func(context.Context, task.ID) error{ - task.ParsePortfolio: toRunFn(parsePortfolioReq, h.parsePortfolio), - task.CreateReport: toRunFn(createReportReq, h.createReport), - task.CreateAudit: toRunFn(createAuditReq, h.createAudit), + task.CreateReport: toRunFn(async.LoadCreateReportRequestFromEnv, func(ctx context.Context, id task.ID, req *task.CreateReportRequest) error { + return h.CreateReport(ctx, id, req, *azReportContainer) + }), + task.CreateAudit: toRunFn(async.LoadCreateAuditRequestFromEnv, h.CreateAudit), } taskID := task.ID(os.Getenv("TASK_ID")) @@ -152,361 +121,6 @@ func run(args []string) error { return nil } -type Blob interface { - ReadBlob(ctx context.Context, uri string) (io.ReadCloser, error) - WriteBlob(ctx context.Context, uri string, r io.Reader) error - Scheme() blob.Scheme -} - -type handler struct { - blob Blob - pubsub *publisher.Client - logger *zap.Logger - - sourcePortfolioContainer string - destPortfolioContainer string - reportContainer string -} - -func parsePortfolioReq() (*task.ParsePortfolioRequest, error) { - taskStr := os.Getenv("PARSE_PORTFOLIO_REQUEST") - if taskStr == "" { - return nil, errors.New("no PARSE_PORTFOLIO_REQUEST given") - } - var task task.ParsePortfolioRequest - if err := json.NewDecoder(strings.NewReader(taskStr)).Decode(&task); err != nil { - return nil, fmt.Errorf("failed to load ParsePortfolioRequest: %w", err) - } - return &task, nil -} - -func (h *handler) uploadDirectory(ctx context.Context, dirPath, container string) 
([]*task.AnalysisArtifact, error) { - base := filepath.Base(dirPath) - - var artifacts []*task.AnalysisArtifact - err := filepath.WalkDir(dirPath, func(path string, info fs.DirEntry, err error) error { - if info.IsDir() { - return nil - } - - // This is a file, let's upload it to the container - uri := blob.Join(h.blob.Scheme(), container, base, strings.TrimPrefix(path, dirPath+"/")) - if err := h.uploadBlob(ctx, path, uri); err != nil { - return fmt.Errorf("failed to upload blob: %w", err) - } - - fn := filepath.Base(path) - // Returns pacta.FileType_UNKNOWN for unrecognized extensions, which we'll serve as binary blobs. - ft := fileTypeFromExt(filepath.Ext(fn)) - if ft == pacta.FileType_UNKNOWN { - h.logger.Error("unhandled file extension", zap.String("dir", dirPath), zap.String("file_ext", filepath.Ext(fn))) - } - artifacts = append(artifacts, &task.AnalysisArtifact{ - BlobURI: pacta.BlobURI(uri), - FileName: fn, - FileType: ft, - }) - return nil - }) - if err != nil { - return nil, fmt.Errorf("error while walking dir/uploading blobs: %w", err) - } - return artifacts, nil -} - -func fileTypeFromExt(ext string) pacta.FileType { - switch ext { - case ".csv": - return pacta.FileType_CSV - case ".yaml": - return pacta.FileType_YAML - case ".zip": - return pacta.FileType_ZIP - case ".html": - return pacta.FileType_HTML - case ".json": - return pacta.FileType_JSON - case ".txt": - return pacta.FileType_TEXT - case ".css": - return pacta.FileType_CSS - case ".js": - return pacta.FileType_JS - case ".ttf": - return pacta.FileType_TTF - default: - return pacta.FileType_UNKNOWN - } -} - -func (h *handler) uploadBlob(ctx context.Context, srcPath, destURI string) error { - h.logger.Info("uploading blob", zap.String("src", srcPath), zap.String("dest", destURI)) - - srcF, err := os.Open(srcPath) - if err != nil { - return fmt.Errorf("failed to open file for upload: %w", err) - } - defer srcF.Close() // Best-effort in case something fails - - if err := h.blob.WriteBlob(ctx, destURI, srcF); err != nil { - return fmt.Errorf("failed to write file to blob storage: %w", err) - } - - if err := srcF.Close(); err != nil { - return fmt.Errorf("failed to close source file: %w", err) - } - - return nil -} - -func (h *handler) downloadBlob(ctx context.Context, srcURI, destPath string) error { - // Make sure the destination exists - if err := os.MkdirAll(filepath.Dir(destPath), 0600); err != nil { - return fmt.Errorf("failed to create directory to download blob to: %w", err) - } - - destF, err := os.Create(destPath) - if err != nil { - return fmt.Errorf("failed to create dest file: %w", err) - } - defer destF.Close() // Best-effort in case something fails - - br, err := h.blob.ReadBlob(ctx, srcURI) - if err != nil { - return fmt.Errorf("failed to read raw portfolio: %w", err) - } - defer br.Close() // Best-effort in case something fails - - if _, err := io.Copy(destF, br); err != nil { - return fmt.Errorf("failed to load raw portfolio: %w", err) - } - - if err := br.Close(); err != nil { - return fmt.Errorf("failed to close blob reader: %w", err) - } - - if err := destF.Close(); err != nil { - return fmt.Errorf("failed to close dest file: %w", err) - } - - return nil -} - -// TODO: Send a notification when parsing fails. -func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest) error { - // Load the portfolio from blob storage, place it in /mnt/raw_portfolios, where - // the `process_portfolios.R` script expects it to be. 
- for _, srcURI := range req.BlobURIs { - id := uuid.New().String() - // TODO: Probably set the CSV extension in the signed upload URL instead. - destPath := filepath.Join("/", "mnt", "raw_portfolios", fmt.Sprintf("%s.csv", id)) - if err := h.downloadBlob(ctx, string(srcURI), destPath); err != nil { - return fmt.Errorf("failed to download raw portfolio blob: %w", err) - } - } - - processedDir := filepath.Join("/", "mnt", "processed_portfolios") - if err := os.MkdirAll(processedDir, 0600); err != nil { - return fmt.Errorf("failed to create directory to download blob to: %w", err) - } - - var stdout, stderr bytes.Buffer - cmd := exec.CommandContext(ctx, "/usr/local/bin/Rscript", "/app/process_portfolios.R") - cmd.Stdout = io.MultiWriter(os.Stdout, &stdout) - cmd.Stderr = io.MultiWriter(os.Stderr, &stderr) - - if err := cmd.Run(); err != nil { - return fmt.Errorf("failed to run process_portfolios script: %w", err) - } - - sc := bufio.NewScanner(&stderr) - - // TODO: Load from the output database file (or similar, like reading the processed_portfolios dir) instead of parsing stderr - var paths []string - for sc.Scan() { - line := sc.Text() - idx := strings.Index(line, "writing to file: " /* 17 chars */) - if idx == -1 { - continue - } - paths = append(paths, strings.TrimSpace(line[idx+17:])) - } - - // NOTE: This code could benefit from some concurrency, but I'm opting not to prematurely optimize. - var out []*task.ParsePortfolioResponseItem - for _, p := range paths { - lineCount, err := countCSVLines(p) - if err != nil { - return fmt.Errorf("failed to count lines in file %q: %w", p, err) - } - fileName := filepath.Base(p) - blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), h.destPortfolioContainer, fileName)) - if err := h.uploadBlob(ctx, p, string(blobURI)); err != nil { - return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err) - } - extension := filepath.Ext(fileName) - fileType, err := pacta.ParseFileType(extension) - if err != nil { - return fmt.Errorf("failed to parse file type from file name %q: %w", fileName, err) - } - out = append(out, &task.ParsePortfolioResponseItem{ - Blob: pacta.Blob{ - FileName: fileName, - FileType: fileType, - BlobURI: blobURI, - }, - LineCount: lineCount, - }) - } - - events := []publisher.Event{ - { - Data: task.ParsePortfolioResponse{ - TaskID: taskID, - Request: req, - Outputs: out, - }, - DataVersion: to.Ptr("1.0"), - EventType: to.Ptr("parsed-portfolio"), - EventTime: to.Ptr(time.Now()), - ID: to.Ptr(string(taskID)), - Subject: to.Ptr(string(taskID)), - }, - } - - if _, err := h.pubsub.PublishEvents(ctx, events, nil); err != nil { - return fmt.Errorf("failed to publish event: %w", err) - } - - h.logger.Info("parsed portfolio", zap.String("task_id", string(taskID))) - - return nil -} - -// TODO(grady): Move this line counting into the image to prevent having our code do any read of the actual underlying data. 
-func countCSVLines(path string) (int, error) { - file, err := os.Open(path) - if err != nil { - return 0, fmt.Errorf("opening file failed: %w", err) - } - defer file.Close() - scanner := bufio.NewScanner(file) - lineCount := 0 - for scanner.Scan() { - lineCount++ - } - if err := scanner.Err(); err != nil { - return 0, fmt.Errorf("scanner.error returned: %w", err) - } - // Subtract 1 for the header row - return lineCount - 1, nil -} - -func createAuditReq() (*task.CreateAuditRequest, error) { - car := os.Getenv("CREATE_AUDIT_REQUEST") - if car == "" { - return nil, errors.New("no CREATE_AUDIT_REQUEST was given") - } - var task task.CreateAuditRequest - if err := json.NewDecoder(strings.NewReader(car)).Decode(&task); err != nil { - return nil, fmt.Errorf("failed to load CreateAuditRequest: %w", err) - } - return &task, nil -} - -func createReportReq() (*task.CreateReportRequest, error) { - crr := os.Getenv("CREATE_REPORT_REQUEST") - if crr == "" { - return nil, errors.New("no CREATE_REPORT_REQUEST was given") - } - var task task.CreateReportRequest - if err := json.NewDecoder(strings.NewReader(crr)).Decode(&task); err != nil { - return nil, fmt.Errorf("failed to load CreateReportRequest: %w", err) - } - return &task, nil -} - -func (h *handler) createAudit(ctx context.Context, taskID task.ID, req *task.CreateAuditRequest) error { - return errors.New("not implemented") -} - -func (h *handler) createReport(ctx context.Context, taskID task.ID, req *task.CreateReportRequest) error { - fileNames := []string{} - for _, blobURI := range req.BlobURIs { - // Load the parsed portfolio from blob storage, place it in /mnt/ - // processed_portfolios, where the `create_report.R` script expects it - // to be. - fileNameWithExt := filepath.Base(string(blobURI)) - if !strings.HasSuffix(fileNameWithExt, ".json") { - return fmt.Errorf("given blob wasn't a JSON-formatted portfolio, %q", fileNameWithExt) - } - fileNames = append(fileNames, strings.TrimSuffix(fileNameWithExt, ".json")) - destPath := filepath.Join("/", "mnt", "processed_portfolios", fileNameWithExt) - if err := h.downloadBlob(ctx, string(blobURI), destPath); err != nil { - return fmt.Errorf("failed to download processed portfolio blob: %w", err) - } - } - - reportDir := filepath.Join("/", "mnt", "reports") - if err := os.MkdirAll(reportDir, 0600); err != nil { - return fmt.Errorf("failed to create directory for reports to get copied to: %w", err) - } - - cmd := exec.CommandContext(ctx, "/usr/local/bin/Rscript", "/app/create_report.R") - cmd.Env = append(cmd.Env, - "PORTFOLIO="+strings.Join(fileNames, ","), - "HOME=/root", /* Required by pandoc */ - ) - cmd.Stdout = os.Stdout - cmd.Stderr = os.Stderr - - if err := cmd.Run(); err != nil { - return fmt.Errorf("failed to run pacta test CLI: %w", err) - } - - // Download outputs from from /out and upload them to Azure - dirEntries, err := os.ReadDir(reportDir) - if err != nil { - return fmt.Errorf("failed to read report directory: %w", err) - } - - var artifacts []*task.AnalysisArtifact - for _, dirEntry := range dirEntries { - if !dirEntry.IsDir() { - continue - } - dirPath := filepath.Join(reportDir, dirEntry.Name()) - tmp, err := h.uploadDirectory(ctx, dirPath, h.reportContainer) - if err != nil { - return fmt.Errorf("failed to upload report directory: %w", err) - } - artifacts = tmp - } - - events := []publisher.Event{ - { - Data: task.CreateReportResponse{ - TaskID: taskID, - Request: req, - Artifacts: artifacts, - }, - DataVersion: to.Ptr("1.0"), - EventType: to.Ptr("created-report"), - 
EventTime: to.Ptr(time.Now()), - ID: to.Ptr(string(taskID)), - Subject: to.Ptr(string(taskID)), - }, - } - - if _, err := h.pubsub.PublishEvents(ctx, events, nil); err != nil { - return fmt.Errorf("failed to publish event: %w", err) - } - - h.logger.Info("created report", zap.String("task_id", string(taskID))) - - return nil -} - func toRunFn[T any](reqFn func() (T, error), runFn func(context.Context, task.ID, T) error) func(context.Context, task.ID) error { return func(ctx context.Context, taskID task.ID) error { req, err := reqFn() @@ -516,23 +130,3 @@ func toRunFn[T any](reqFn func() (T, error), runFn func(context.Context, task.ID return runFn(ctx, taskID, req) } } - -type azureTokenCredential struct { - accessToken string -} - -func (a *azureTokenCredential) GetToken(ctx context.Context, options policy.TokenRequestOptions) (azcore.AccessToken, error) { - return azcore.AccessToken{ - Token: a.accessToken, - // We just don't bother with expiration time - ExpiresOn: time.Now().AddDate(1, 0, 0), - }, nil -} - -func asStrs[T ~string](in []T) []string { - out := make([]string, len(in)) - for i, v := range in { - out[i] = string(v) - } - return out -} diff --git a/cmd/server/BUILD.bazel b/cmd/server/BUILD.bazel index 3075e9a..76c45f8 100644 --- a/cmd/server/BUILD.bazel +++ b/cmd/server/BUILD.bazel @@ -9,6 +9,7 @@ go_library( visibility = ["//visibility:private"], deps = [ "//azure/azblob", + "//azure/azcreds", "//azure/azevents", "//azure/aztask", "//cmd/runner/taskrunner", @@ -21,8 +22,6 @@ go_library( "//secrets", "//session", "//task", - "@com_github_azure_azure_sdk_for_go_sdk_azcore//:azcore", - "@com_github_azure_azure_sdk_for_go_sdk_azidentity//:azidentity", "@com_github_deepmap_oapi_codegen//pkg/chi-middleware", "@com_github_go_chi_chi_v5//:chi", "@com_github_go_chi_chi_v5//middleware", @@ -32,8 +31,8 @@ go_library( "@com_github_lestrrat_go_jwx_v2//jwk", "@com_github_namsral_flag//:flag", "@com_github_rs_cors//:cors", - "@com_github_silicon_ally_zaphttplog//:zaphttplog", "@org_uber_go_zap//:zap", + "@org_uber_go_zap_exp//zapfield", ], ) diff --git a/cmd/server/main.go b/cmd/server/main.go index 92b9b7c..6f959ea 100644 --- a/cmd/server/main.go +++ b/cmd/server/main.go @@ -12,9 +12,8 @@ import ( "strings" "time" - "github.com/Azure/azure-sdk-for-go/sdk/azcore" - "github.com/Azure/azure-sdk-for-go/sdk/azidentity" "github.com/RMI/pacta/azure/azblob" + "github.com/RMI/pacta/azure/azcreds" "github.com/RMI/pacta/azure/azevents" "github.com/RMI/pacta/azure/aztask" "github.com/RMI/pacta/cmd/runner/taskrunner" @@ -35,6 +34,7 @@ import ( "github.com/namsral/flag" "github.com/rs/cors" "go.uber.org/zap" + "go.uber.org/zap/exp/zapfield" oapimiddleware "github.com/deepmap/oapi-codegen/pkg/chi-middleware" chimiddleware "github.com/go-chi/chi/v5/middleware" @@ -193,22 +193,11 @@ func run(args []string) error { // that server names match. We don't know how this thing will be run. pactaSwagger.Servers = nil - var creds azcore.TokenCredential - // This is necessary because the default timeout is too low in - // azidentity.NewDefaultAzureCredentials, so it times out and fails to run. 
- if azClientID := os.Getenv("AZURE_CLIENT_ID"); azClientID != "" { - logger.Info("Loading user managed credentials", zap.String("client_id", azClientID)) - if creds, err = azidentity.NewManagedIdentityCredential(&azidentity.ManagedIdentityCredentialOptions{ - ID: azidentity.ClientID(azClientID), - }); err != nil { - return fmt.Errorf("failed to load Azure credentials: %w", err) - } - } else { - logger.Info("Loading default credentials") - if creds, err = azidentity.NewDefaultAzureCredential(nil); err != nil { - return fmt.Errorf("failed to load Azure credentials: %w", err) - } + creds, credType, err := azcreds.New() + if err != nil { + return fmt.Errorf("failed to load Azure credentials: %w", err) } + logger.Info("authenticated with Azure", zapfield.Str("credential_type", credType)) var runner taskrunner.Runner if *useAZRunner { diff --git a/deps.bzl b/deps.bzl index e754967..2667c63 100644 --- a/deps.bzl +++ b/deps.bzl @@ -2219,6 +2219,12 @@ def go_dependencies(): sum = "h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60=", version = "v1.24.0", ) + go_repository( + name = "org_uber_go_zap_exp", + importpath = "go.uber.org/zap/exp", + sum = "h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs=", + version = "v0.2.0", + ) go_repository( name = "tools_gotest_v3", importpath = "gotest.tools/v3", diff --git a/go.mod b/go.mod index 87a3e60..4ccb4cb 100644 --- a/go.mod +++ b/go.mod @@ -2,6 +2,8 @@ module github.com/RMI/pacta go 1.21 +toolchain go1.21.6 + require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.8.0 github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.1 @@ -76,6 +78,7 @@ require ( github.com/stretchr/testify v1.8.4 // indirect go.uber.org/atomic v1.11.0 // indirect go.uber.org/multierr v1.11.0 // indirect + go.uber.org/zap/exp v0.2.0 // indirect golang.org/x/crypto v0.18.0 // indirect golang.org/x/mod v0.14.0 // indirect golang.org/x/net v0.18.0 // indirect diff --git a/go.sum b/go.sum index 71b9dae..24eafc5 100644 --- a/go.sum +++ b/go.sum @@ -357,6 +357,8 @@ go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN8 go.uber.org/zap v1.10.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q= go.uber.org/zap v1.24.0 h1:FiJd5l1UOLj0wCgbSE0rwwXHzEdAZS6hiiSnxJN/D60= go.uber.org/zap v1.24.0/go.mod h1:2kMP+WWQ8aoFoedH3T2sq6iJ2yDWpHbP0f6MQbS9Gkg= +go.uber.org/zap/exp v0.2.0 h1:FtGenNNeCATRB3CmB/yEUnjEFeJWpB/pMcy7e2bKPYs= +go.uber.org/zap/exp v0.2.0/go.mod h1:t0gqAIdh1MfKv9EwN/dLwfZnJxe9ITAZN78HEWPFWDQ= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= diff --git a/reportsrv/BUILD.bazel b/reportsrv/BUILD.bazel index 31f2de8..ae231f5 100644 --- a/reportsrv/BUILD.bazel +++ b/reportsrv/BUILD.bazel @@ -23,6 +23,7 @@ go_test( "//blob", "//db", "//pacta", + "//session", "@com_github_go_chi_chi_v5//:chi", "@org_uber_go_zap//zaptest", ], diff --git a/scripts/BUILD.bazel b/scripts/BUILD.bazel index 3d48fc9..3acc3e2 100644 --- a/scripts/BUILD.bazel +++ b/scripts/BUILD.bazel @@ -52,3 +52,8 @@ sh_binary( name = "build_and_load_runner", srcs = ["build_and_load_runner.sh"], ) + +sh_binary( + name = "build_and_load_parser", + srcs = ["build_and_load_parser.sh"], +) diff --git a/scripts/build_and_load_parser.sh b/scripts/build_and_load_parser.sh new file mode 100755 index 0000000..cfbf7de --- /dev/null +++ 
b/scripts/build_and_load_parser.sh
@@ -0,0 +1,20 @@
+#!/bin/bash
+set -euo pipefail
+
+ROOT="$BUILD_WORKSPACE_DIRECTORY"
+cd "$ROOT"
+
+# Build the image
+bazel build --@io_bazel_rules_go//go/config:pure //cmd/parser:image_tarball
+
+# Load it into Docker, capture output
+LOAD_OUTPUT=$(docker load < bazel-bin/cmd/parser/image_tarball/tarball.tar)
+
+# Extract the SHA
+IMAGE_ID=$(echo "$LOAD_OUTPUT" | grep -oP 'sha256:\K\w+')
+
+# Tag the image
+docker tag "$IMAGE_ID" rmisa.azurecr.io/pactaparser:latest
+
+echo "Tagged $IMAGE_ID as rmisa.azurecr.io/pactaparser:latest"
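+
+# The newly tagged image is picked up automatically by the next parse task the
+# server schedules via the local Docker runner; no server restart is needed
+# (see cmd/parser/README.md).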