Skip to content

Commit

Permalink
Address review comments
Browse files (browse the repository at this point in the history)
  • Loading branch information
bcspragu committed Feb 14, 2024
1 parent e2e1c33 commit 5cac6c8
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 10 deletions.
1 change: 1 addition & 0 deletions async/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,6 @@ go_library(
"@com_github_azure_azure_sdk_for_go_sdk_messaging_azeventgrid//publisher",
"@com_github_google_uuid//:uuid",
"@org_uber_go_zap//:zap",
"@org_uber_go_zap_exp//zapfield",
],
)
41 changes: 33 additions & 8 deletions async/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"github.com/RMI/pacta/task"
"github.com/google/uuid"
"go.uber.org/zap"
"go.uber.org/zap/exp/zapfield"
)

type Config struct {
Expand Down Expand Up @@ -128,34 +129,46 @@ func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task.
return fmt.Errorf("failed to decode processed_portfolios.json as JSON: %w", err)
}

// We keep track of the outputs we processed, and then check if there are any files in the output directory that we weren't expecting.
knownOutputFiles := map[string]bool{
"processed_portfolios.json": true,
}

// NOTE: This code could benefit from some concurrency, but I'm opting not to prematurely optimize.
var out []*task.ParsePortfolioResponseItem
for _, sf := range sourceFiles {
sourceURI, ok := localCSVToBlob[sf.InputFilename]
if !ok {
return fmt.Errorf("parse output mentioned input file %q, which wasn't found in our input -> blob URI map %+v", sf.InputFilename, localCSVToBlob)
}

// TODO: There's lots of metadata associated with the input files (e.g.
// TODO(#187): There's lots of metadata associated with the input files (e.g.
// sf.Errors, sf.GroupCols, etc), we should likely store that info somewhere.

for _, p := range sf.Portfolios {
outPath := filepath.Join(outputDir, p.OutputFilename)

// XXX: One risk here is that we're depending on the R code to generate truly
// random UUIDs, we likely want some sort of namespacing here.
blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), destPortfolioContainer, p.OutputFilename))
// We generate a fresh UUID here for uploading the file to blob storage, so that
// we don't depend on the R code generating truly unique UUIDs.
uploadName := fmt.Sprintf("%s.csv", uuid.New().String())

blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), destPortfolioContainer, uploadName))

if err := h.uploadBlob(ctx, outPath, string(blobURI)); err != nil {
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err)
}
h.logger.Info("uploaded output CSV to blob storage", zap.Any("portfolio", p), zapfield.Str("blob_uri", blobURI))

extension := filepath.Ext(p.OutputFilename)
fileType, err := pacta.ParseFileType(extension)
if err != nil {
return fmt.Errorf("failed to parse file type from file name %q: %w", p.OutputFilename, err)
}

sourceURI, ok := localCSVToBlob[sf.InputFilename]
if !ok {
return fmt.Errorf("parse output mentioned input file %q, which wasn't found in our input -> blob URI map %+v", sf.InputFilename, localCSVToBlob)
if fileType != pacta.FileType_CSV {
return fmt.Errorf("output portfolio %q was not of type CSV, was %q", p.OutputFilename, fileType)
}

knownOutputFiles[p.OutputFilename] = true
out = append(out, &task.ParsePortfolioResponseItem{
Source: sourceURI,
Blob: pacta.Blob{
Expand All @@ -168,6 +181,18 @@ func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task.
}
}

// Now that we're done uploading files, check the output directory and make sure
// there aren't any unaccounted-for files.
dirEntries, err := os.ReadDir(outputDir)
if err != nil {
return fmt.Errorf("failed to read the output directory: %w", err)
}
for _, de := range dirEntries {
if !knownOutputFiles[de.Name()] {
h.logger.Error("output directory contained files not present in the generated 'processed_portfolios.json' manifest", zap.String("filename", de.Name()))
}
}

events := []publisher.Event{
{
Data: task.ParsePortfolioResponse{
Expand Down
5 changes: 4 additions & 1 deletion async/parsed/parsed.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
// Package parsed just holds the domain types for dealing with the output of the
// ParsePortfolio async task.
// ParsePortfolio async task. The code that generates output in this structure
// lives in [1], which provides the base image for our parser binary.
//
// [1] https://github.com/RMI-PACTA/workflow.portfolio.parsing
package parsed

type SourceFile struct {
Expand Down
2 changes: 1 addition & 1 deletion azure/azevents/azevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func (s *Server) handleParsedPortfolio(id string, resp *task.ParsePortfolioRespo
return fmt.Errorf("creating blob %d: %w", i, err)
}

// TODO: There's other metadata in output.Portfolio, like `InvestorName`, that
// TODO(#187): There's other metadata in output.Portfolio, like `InvestorName`, that
// we aren't currently storing.
portfolioID, err := s.db.CreatePortfolio(tx, &pacta.Portfolio{
Owner: &pacta.Owner{ID: ownerID},
Expand Down

0 comments on commit 5cac6c8

Please sign in to comment.