Finish integrating new parser into pipeline
This PR finishes integrating the new parser logic from [1] into our pipeline.

The task now parses the `processed_portfolios.json` manifest from the output directory (in this case `/home/portfolio-parser/output`) and uses it both to correlate input and output files and to upload the output CSV files. Since the R code now reports a row count for each output portfolio, we no longer need to count lines in the files ourselves.
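
For context, here's a rough sketch of what that manifest looks like, based on the `parsed` types added in this PR (all values below are illustrative, not real output):

```json
[
  {
    "input_filename": "9a7f4f21-1d9c-4c9e-8f27-3f1f0d2c4b5a.csv",
    "input_md5": "d41d8cd98f00b204e9800998ecf8427e",
    "system_info": {
      "timestamp": "2024-02-14T00:00:00Z",
      "package": "workflow.portfolio.parsing",
      "packageVersion": "0.1.0",
      "RVersion": "4.3.2",
      "dependencies": [{ "package": "dplyr", "version": "1.1.4" }]
    },
    "input_entries": 250,
    "group_cols": ["portfolio_name"],
    "subportfolios_count": 1,
    "portfolios": [
      {
        "output_md5": "0cc175b9c0f1b6a831c399e269772661",
        "output_filename": "5b1e2c3d-6f70-4a81-92b3-c4d5e6f70819.csv",
        "output_rows": 250,
        "portfolio_name": "Example Portfolio",
        "investor_name": "Example Investor"
      }
    ],
    "errors": []
  }
]
```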

This all mostly works as expected. A few sharp edges (e.g. relying on the R code to generate truly random UUIDs for output filenames) are noted inline, and the new code produces metadata (at both the input file level and the output file level) that we aren't currently recording anywhere.
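
On the UUID sharp edge specifically, one possible mitigation (sketched below, not implemented in this PR; the `namespacedBlobPath` helper and `runID` are hypothetical names) is to prefix each R-generated output filename with a UUID our own code generates, so uploaded blob paths stay unique even if the R code's filenames collide:

```go
package main

import (
	"fmt"

	"github.com/google/uuid"
)

// namespacedBlobPath sketches one way to stop depending on the R code
// producing truly random UUIDs: prefix every output filename from the
// manifest with a UUID that our code generates once per parse run.
func namespacedBlobPath(runID, outputFilename string) string {
	return runID + "/" + outputFilename
}

func main() {
	runID := uuid.New().String() // generated by us, per task
	// outputFilename would come from parsed.Portfolio.OutputFilename.
	fmt.Println(namespacedBlobPath(runID, "5b1e2c3d-6f70-4a81-92b3-c4d5e6f70819.csv"))
}
```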

Adjacent changes:
- In creating the `parser`, I initially duplicated the `taskrunner` package; it has since been hoisted to the top level and de-duped
- Assorted refactorings and renamings to make sure the `pactaparser` image gets invoked correctly

[1] https://github.com/RMI-PACTA/workflow.portfolio.parsing
bcspragu committed Feb 14, 2024
1 parent ffe410c commit 83b748a
Showing 14 changed files with 143 additions and 276 deletions.
1 change: 1 addition & 0 deletions async/BUILD.bazel
@@ -9,6 +9,7 @@ go_library(
importpath = "github.com/RMI/pacta/async",
visibility = ["//visibility:public"],
deps = [
"//async/parsed",
"//blob",
"//pacta",
"//task",
138 changes: 76 additions & 62 deletions async/async.go
Expand Up @@ -2,9 +2,9 @@
package async

import (
"bufio"
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
@@ -17,6 +17,7 @@ import (

"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventgrid/publisher"
"github.com/RMI/pacta/async/parsed"
"github.com/RMI/pacta/blob"
"github.com/RMI/pacta/pacta"
"github.com/RMI/pacta/task"
@@ -70,69 +71,101 @@ func New(cfg *Config) (*Handler, error) {

// TODO: Send a notification when parsing fails.
func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest, destPortfolioContainer string) error {
// Load the portfolio from blob storage, place it in /mnt/raw_portfolios, where

// Make the directories we require first. We use these instead of
// /mnt/{input,output} because the base image (quite reasonably) uses a non-root
// user, so we can't go creating directories in the root filesystem willy-nilly.
inputDir := filepath.Join("/", "home", "portfolio-parser", "input")
outputDir := filepath.Join("/", "home", "portfolio-parser", "output")

if err := os.MkdirAll(inputDir, 0700); err != nil {
return fmt.Errorf("failed to create input dir to store input CSVs: %w", err)
}
if err := os.MkdirAll(outputDir, 0700); err != nil {
return fmt.Errorf("failed to create output dir to store output CSVs: %w", err)
}

// Load the portfolio from blob storage and place it in the input directory
// created above, where the parser's `process_directory` call expects to find it.
localCSVToBlob := make(map[string]pacta.BlobURI)
for _, srcURI := range req.BlobURIs {
id := uuid.New().String()
// TODO: Probably set the CSV extension in the signed upload URL instead.
destPath := filepath.Join("/", "mnt", "raw_portfolios", fmt.Sprintf("%s.csv", id))
fn := fmt.Sprintf("%s.csv", id)
destPath := filepath.Join(inputDir, fn)
if err := h.downloadBlob(ctx, string(srcURI), destPath); err != nil {
return fmt.Errorf("failed to download raw portfolio blob: %w", err)
}
localCSVToBlob[fn] = srcURI
}

processedDir := filepath.Join("/", "mnt", "processed_portfolios")
if err := os.MkdirAll(processedDir, 0600); err != nil {
return fmt.Errorf("failed to create directory to download blob to: %w", err)
}
cmd := exec.CommandContext(ctx,
"/usr/local/bin/Rscript",
"-e", "logger::log_threshold(Sys.getenv('LOG_LEVEL', 'INFO'));workflow.portfolio.parsing::process_directory('"+inputDir+"', '"+outputDir+"')",
)

// We don't expect log output to be particularly large, so it's fine to write it to an in-memory buffer.
// TODO(#185): Find a good place to put these in storage, such that it can be correlated with the input file(s)
var stdout, stderr bytes.Buffer
cmd := exec.CommandContext(ctx, "/usr/local/bin/Rscript", "/app/process_portfolios.R")
cmd.Stdout = io.MultiWriter(os.Stdout, &stdout)
cmd.Stderr = io.MultiWriter(os.Stderr, &stderr)
cmd.Stdout = &stdout
cmd.Stderr = &stderr

if err := cmd.Run(); err != nil {
return fmt.Errorf("failed to run process_portfolios script: %w", err)
}

sc := bufio.NewScanner(&stderr)
// After successful execution, the API contract is that there should be a 'processed_portfolios.json' file in the output directory.
outManifestPath := filepath.Join(outputDir, "processed_portfolios.json")
omf, err := os.Open(outManifestPath)
if err != nil {
return fmt.Errorf("failed to open output processed_portfolios.json file: %w", err)
}
defer omf.Close()

// TODO: Load from the output database file (or similar, like reading the processed_portfolios dir) instead of parsing stderr
var paths []string
for sc.Scan() {
line := sc.Text()
idx := strings.Index(line, "writing to file: " /* 17 chars */)
if idx == -1 {
continue
}
paths = append(paths, strings.TrimSpace(line[idx+17:]))
var sourceFiles []parsed.SourceFile
if err := json.NewDecoder(omf).Decode(&sourceFiles); err != nil {
return fmt.Errorf("failed to decode processed_portfolios.json as JSON: %w", err)
}

// NOTE: This code could benefit from some concurrency, but I'm opting not to prematurely optimize.
var out []*task.ParsePortfolioResponseItem
for _, p := range paths {
lineCount, err := countCSVLines(p)
if err != nil {
return fmt.Errorf("failed to count lines in file %q: %w", p, err)
}
fileName := filepath.Base(p)
blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), destPortfolioContainer, fileName))
if err := h.uploadBlob(ctx, p, string(blobURI)); err != nil {
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err)
}
extension := filepath.Ext(fileName)
fileType, err := pacta.ParseFileType(extension)
if err != nil {
return fmt.Errorf("failed to parse file type from file name %q: %w", fileName, err)
for _, sf := range sourceFiles {
// TODO: There's a lot of metadata associated with the input files (e.g.
// sf.Errors, sf.GroupCols, etc.) that we should likely store somewhere.

for _, p := range sf.Portfolios {
outPath := filepath.Join(outputDir, p.OutputFilename)

// XXX: One risk here is that we're depending on the R code to generate truly
// random UUIDs; we likely want some sort of namespacing here.
blobURI := pacta.BlobURI(blob.Join(h.blob.Scheme(), destPortfolioContainer, p.OutputFilename))

if err := h.uploadBlob(ctx, outPath, string(blobURI)); err != nil {
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, blobURI, err)
}
extension := filepath.Ext(p.OutputFilename)
fileType, err := pacta.ParseFileType(extension)
if err != nil {
return fmt.Errorf("failed to parse file type from file name %q: %w", p.OutputFilename, err)
}

sourceURI, ok := localCSVToBlob[sf.InputFilename]
if !ok {
return fmt.Errorf("parse output mentioned input file %q, which wasn't found in our input -> blob URI map %+v", sf.InputFilename, localCSVToBlob)
}

out = append(out, &task.ParsePortfolioResponseItem{
Source: sourceURI,
Blob: pacta.Blob{
FileName: p.OutputFilename,
FileType: fileType,
BlobURI: blobURI,
},
Portfolio: p,
})
}
out = append(out, &task.ParsePortfolioResponseItem{
Blob: pacta.Blob{
FileName: fileName,
FileType: fileType,
BlobURI: blobURI,
},
LineCount: lineCount,
})
}

events := []publisher.Event{
@@ -159,25 +192,6 @@ func (h *Handler) ParsePortfolio(ctx context.Context, taskID task.ID, req *task.
return nil
}

// TODO(grady): Move this line counting into the image to prevent having our code do any read of the actual underlying data.
func countCSVLines(path string) (int, error) {
file, err := os.Open(path)
if err != nil {
return 0, fmt.Errorf("opening file failed: %w", err)
}
defer file.Close()
scanner := bufio.NewScanner(file)
lineCount := 0
for scanner.Scan() {
lineCount++
}
if err := scanner.Err(); err != nil {
return 0, fmt.Errorf("scanner.error returned: %w", err)
}
// Subtract 1 for the header row
return lineCount - 1, nil
}

func (h *Handler) CreateAudit(ctx context.Context, taskID task.ID, req *task.CreateAuditRequest) error {
return errors.New("not implemented")
}
@@ -261,7 +275,7 @@ func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.Cr

func (h *Handler) downloadBlob(ctx context.Context, srcURI, destPath string) error {
// Make sure the destination exists
if err := os.MkdirAll(filepath.Dir(destPath), 0600); err != nil {
if err := os.MkdirAll(filepath.Dir(destPath), 0700); err != nil {
return fmt.Errorf("failed to create directory to download blob to: %w", err)
}

8 changes: 8 additions & 0 deletions async/parsed/BUILD.bazel
@@ -0,0 +1,8 @@
load("@io_bazel_rules_go//go:def.bzl", "go_library")

go_library(
name = "parsed",
srcs = ["parsed.go"],
importpath = "github.com/RMI/pacta/async/parsed",
visibility = ["//visibility:public"],
)
35 changes: 35 additions & 0 deletions async/parsed/parsed.go
@@ -0,0 +1,35 @@
// Package parsed just holds the domain types for dealing with the output of the
// ParsePortfolio async task.
package parsed

type SourceFile struct {
InputFilename string `json:"input_filename"`
InputMD5 string `json:"input_md5"`
SystemInfo SystemInfo `json:"system_info"`
InputEntries int `json:"input_entries"`
GroupCols []string `json:"group_cols"`
SubportfoliosCount int `json:"subportfolios_count"`
Portfolios []Portfolio `json:"portfolios"`
Errors [][]string `json:"errors"`
}

type SystemInfo struct {
Timestamp string `json:"timestamp"`
Package string `json:"package"`
PackageVersion string `json:"packageVersion"`
RVersion string `json:"RVersion"`
Dependencies []Dependency `json:"dependencies"`
}

type Dependency struct {
Package string `json:"package"`
Version string `json:"version"`
}

type Portfolio struct {
OutputMD5 string `json:"output_md5"`
OutputFilename string `json:"output_filename"`
OutputRows int `json:"output_rows"`
PortfolioName string `json:"portfolio_name"`
InvestorName string `json:"investor_name"`
}
5 changes: 4 additions & 1 deletion azure/azevents/azevents.go
@@ -327,10 +327,13 @@ func (s *Server) handleParsedPortfolio(id string, resp *task.ParsePortfolioRespo
if err != nil {
return fmt.Errorf("creating blob %d: %w", i, err)
}

// TODO: There's other metadata in output.Portfolio, like `InvestorName`, that
// we aren't currently storing.
portfolioID, err := s.db.CreatePortfolio(tx, &pacta.Portfolio{
Owner: &pacta.Owner{ID: ownerID},
Name: output.Blob.FileName,
NumberOfRows: output.LineCount,
NumberOfRows: output.Portfolio.OutputRows,
Blob: &pacta.Blob{ID: blobID},
Properties: *properties,
})
13 changes: 0 additions & 13 deletions cmd/runner/taskrunner/BUILD.bazel

This file was deleted.
