diff --git a/async/BUILD.bazel b/async/BUILD.bazel index c2f2170..0a7ec8b 100644 --- a/async/BUILD.bazel +++ b/async/BUILD.bazel @@ -17,5 +17,6 @@ go_library( "@com_github_azure_azure_sdk_for_go_sdk_messaging_azeventgrid//publisher", "@com_github_google_uuid//:uuid", "@org_uber_go_zap//:zap", + "@org_uber_go_zap_exp//zapfield", ], ) diff --git a/async/async.go b/async/async.go index 8d5dd24..8a5cc8b 100644 --- a/async/async.go +++ b/async/async.go @@ -23,6 +23,7 @@ import ( "github.com/RMI/pacta/task" "github.com/google/uuid" "go.uber.org/zap" + "go.uber.org/zap/exp/zapfield" ) type Config struct { @@ -128,34 +129,46 @@ func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task. return fmt.Errorf("failed to decode processed_portfolios.json as JSON: %w", err) } + // We keep track of the outputs we processed, and then check if there are any files in the output directory that we weren't expecting. + knownOutputFiles := map[string]bool{ + "processed_portfolios.json": true, + } + // NOTE: This code could benefit from some concurrency, but I'm opting not to prematurely optimize. var out []*task.ParsePortfolioResponseItem for _, sf := range sourceFiles { + sourceURI, ok := localCSVToBlob[sf.InputFilename] + if !ok { + return fmt.Errorf("parse output mentioned input file %q, which wasn't found in our input -> blob URI map %+v", sf.InputFilename, localCSVToBlob) + } - // TODO: There's lots of metadata associated with the input files (e.g. + // TODO(#187): There's lots of metadata associated with the input files (e.g. // sf.Errors, sf.GroupCols, etc), we should likely store that info somewhere. for _, p := range sf.Portfolios { outPath := filepath.Join(outputDir, p.OutputFilename) - // XXX: One risk here is that we're depending on the R code to generate truly - // random UUIDs, we likely want some sort of namespacing here. 
- blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), destPortfolioContainer, p.OutputFilename)) + // We generate a fresh UUID here for uploading the file to blob storage, so that + // we don't depend on the R code generating truly unique UUIDs. + uploadName := fmt.Sprintf("%s.csv", uuid.New().String()) + + blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), destPortfolioContainer, uploadName)) if err := h.uploadBlob(ctx, outPath, string(blobURI)); err != nil { return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err) } + h.logger.Info("uploaded output CSV to blob storage", zap.Any("portfolio", p), zapfield.Str("blob_uri", blobURI)) + extension := filepath.Ext(p.OutputFilename) fileType, err := pacta.ParseFileType(extension) if err != nil { return fmt.Errorf("failed to parse file type from file name %q: %w", p.OutputFilename, err) } - - sourceURI, ok := localCSVToBlob[sf.InputFilename] - if !ok { - return fmt.Errorf("parse output mentioned input file %q, which wasn't found in our input -> blob URI map %+v", sf.InputFilename, localCSVToBlob) + if fileType != pacta.FileType_CSV { + return fmt.Errorf("output portfolio %q was not of type CSV, was %q", p.OutputFilename, fileType) } + knownOutputFiles[p.OutputFilename] = true out = append(out, &task.ParsePortfolioResponseItem{ Source: sourceURI, Blob: pacta.Blob{ @@ -168,6 +181,18 @@ func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task. } } + // Now that we're done uploading files, check the output directory and make sure + // there aren't any unaccounted-for files. 
+ dirEntries, err := os.ReadDir(outputDir) + if err != nil { + return fmt.Errorf("failed to read the output directory: %w", err) + } + for _, de := range dirEntries { + if !knownOutputFiles[de.Name()] { + h.logger.Error("output directory contained files not present in the generated 'processed_portfolios.json' manifest", zap.String("filename", de.Name())) + } + } + events := []publisher.Event{ { Data: task.ParsePortfolioResponse{ diff --git a/async/parsed/parsed.go b/async/parsed/parsed.go index d2f6caa..0d8aa91 100644 --- a/async/parsed/parsed.go +++ b/async/parsed/parsed.go @@ -1,5 +1,8 @@ // Package parsed just holds the domain types for dealing with the output of the -// ParsePortfolio async task. +// ParsePortfolio async task. The code that generates output in this structure +// lives in [1], which provides the base image for our parser binary. +// +// [1] https://github.com/RMI-PACTA/workflow.portfolio.parsing package parsed type SourceFile struct { diff --git a/azure/azevents/azevents.go b/azure/azevents/azevents.go index 69d2a99..aff3cb2 100644 --- a/azure/azevents/azevents.go +++ b/azure/azevents/azevents.go @@ -328,7 +328,7 @@ func (s *Server) handleParsedPortfolio(id string, resp *task.ParsePortfolioRespo return fmt.Errorf("creating blob %d: %w", i, err) } - // TODO: There's other metadata in output.Portfolio, like `InvestorName`, that + // TODO(#187): There's other metadata in output.Portfolio, like `InvestorName`, that // we aren't currently storing. portfolioID, err := s.db.CreatePortfolio(tx, &pacta.Portfolio{ Owner: &pacta.Owner{ID: ownerID},