Skip to content

Commit

Permalink
Integrate latest report generation Docker image into site
Browse files Browse the repository at this point in the history
This PR updates the async task runner to support the new base image (which comes from [1]).

This includes things like:
- Uploading the new output directories (summaries + analysis)
- Fixing report paths so we can render the new reports
- Adding support for the new file types that show up in these new reports
- Supporting the new input formats (the benchmark + pacta-data dirs, input JSON, env vars, etc.)

Importantly, this makes the new create-report image work **locally**, but doesn't do the Azure bits. That'll come in the next PR.

[1] https://github.com/rmi-pacta/workflow.pacta.webapp/pkgs/container/workflow.pacta.webapp
  • Loading branch information
bcspragu committed Jul 24, 2024
1 parent cd18e5c commit 2715e70
Show file tree
Hide file tree
Showing 15 changed files with 334 additions and 58 deletions.
8 changes: 4 additions & 4 deletions WORKSPACE
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ oci_pull(

oci_pull(
name = "runner_base",
digest = "sha256:d0b2922dc48cb6acb7c767f89f0c92ccbe1a043166971bac0b585b3851a9b720",
# TODO(#44): Replace this base image with a more permanent one.
image = "docker.io/curfewreplica/pactatest",
# platforms = ["linux/amd64"],
# This digest is of the nightly/main tag as of 2024-07-22
digest = "sha256:7adec544294b5cb9e11c6bb4c43d0b2de646e5f933639f86c85f3f03c99f650e",
image = "ghcr.io/rmi-pacta/workflow.pacta.webapp",
platforms = ["linux/amd64"],
)

oci_pull(
Expand Down
248 changes: 210 additions & 38 deletions async/async.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ type Config struct {
Blob Blob
PubSub *publisher.Client
Logger *zap.Logger

BenchmarkDir string
PACTADataDir string
}

func (c *Config) validate() error {
Expand All @@ -43,6 +46,12 @@ func (c *Config) validate() error {
return errors.New("no logger given")
}

if c.BenchmarkDir == "" {
return errors.New("no benchmark dir specified")
}
if c.PACTADataDir == "" {
return errors.New("no PACTA data dir specified")
}
return nil
}

Expand All @@ -56,6 +65,9 @@ type Handler struct {
blob Blob
pubsub *publisher.Client
logger *zap.Logger

// Mounted directories with data needed for report generation.
benchmarkDir, pactaDataDir string
}

func New(cfg *Config) (*Handler, error) {
Expand All @@ -64,9 +76,11 @@ func New(cfg *Config) (*Handler, error) {
}

return &Handler{
blob: cfg.Blob,
pubsub: cfg.PubSub,
logger: cfg.Logger,
blob: cfg.Blob,
pubsub: cfg.PubSub,
logger: cfg.Logger,
benchmarkDir: cfg.BenchmarkDir,
pactaDataDir: cfg.PACTADataDir,
}, nil
}

Expand Down Expand Up @@ -221,31 +235,169 @@ func (h *Handler) CreateAudit(ctx context.Context, taskID task.ID, req *task.Cre
return errors.New("not implemented")
}

func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.CreateReportRequest, reportContainer string) error {
fileNames := []string{}
for _, blobURI := range req.BlobURIs {
// Load the parsed portfolio from blob storage, place it in /mnt/
// processed_portfolios, where the `create_report.R` script expects it
// to be.
fileNameWithExt := filepath.Base(string(blobURI))
if !strings.HasSuffix(fileNameWithExt, ".json") {
return fmt.Errorf("given blob wasn't a JSON-formatted portfolio, %q", fileNameWithExt)
// ReportInput is the JSON document describing one report run. CreateReport
// encodes it and passes the resulting string as a command-line argument to the
// run_pacta.R script.
type ReportInput struct {
// Portfolio describes the portfolio file the report is generated for.
Portfolio ReportInputPortfolio `json:"portfolio"`
// Inherit is currently always set to "GENERAL_2023Q4" by CreateReport.
// NOTE(review): presumably names a base parameter set the run inherits
// from — confirm against the run_pacta.R documentation.
Inherit string `json:"inherit"`
}

// ReportInputPortfolio is the "portfolio" section of a ReportInput.
type ReportInputPortfolio struct {
// Files is the portfolio file name. CreateReport sets it to the base name
// (including extension) of the CSV it downloads into the portfolios dir.
Files string `json:"files"`
// HoldingsDate is currently hard-coded to "2023-12-31" by CreateReport.
HoldingsDate string `json:"holdingsDate"`
// Name is currently hard-coded to "FooPortfolio" by CreateReport.
Name string `json:"name"`
}

// ReportEnv describes the directories used by a single report-generation run.
// It is constructed by initReportEnv and handed to the report script as
// environment variables via asEnvVars.
type ReportEnv struct {
// rootDir is the per-run temp directory; every ReportDir is resolved
// relative to it by pathForDir.
rootDir string

// These are mounted in from externally.
benchmarksDir string
pactaDataDir string
}

// initReportEnv prepares the on-disk layout for a single report run. It makes
// sure baseDir exists, creates a fresh temp directory beneath it, and then
// creates the input and output subdirectories inside that temp directory.
//
// benchmarkDir and pactaDataDir are recorded as given; they are neither
// created nor checked here.
//
// On success the returned ReportEnv is rooted at the new temp directory. On
// failure the returned ReportEnv is nil, and if the temp directory had already
// been created it is removed again so that failed runs don't leave partially
// populated directory trees behind.
func initReportEnv(benchmarkDir, pactaDataDir, baseDir string) (*ReportEnv, error) {
	// Make sure the base directory exists first.
	if err := os.MkdirAll(baseDir, 0700); err != nil {
		return nil, fmt.Errorf("failed to create base input dir: %w", err)
	}
	// We create temp subdirectories, because while this code currently executes in
	// a new container for each invocation, that might not always be the case.
	rootDir, err := os.MkdirTemp(baseDir, "create-report")
	if err != nil {
		return nil, fmt.Errorf("failed to create temp dir for input CSVs: %w", err)
	}

	re := &ReportEnv{
		rootDir:       rootDir,
		benchmarksDir: benchmarkDir,
		pactaDataDir:  pactaDataDir,
	}

	if err := re.makeDirectories(); err != nil {
		// Best-effort cleanup of the directory we just created. The mkdir
		// failure is the error worth reporting, so a failure to clean up is
		// deliberately ignored.
		_ = os.RemoveAll(rootDir)
		return nil, fmt.Errorf("failed to create directories: %w", err)
	}

	return re, nil
}

// ReportDir is the name of a subdirectory within a report run's root
// directory.
type ReportDir string

// The subdirectories created for every report run.
const (
	// Inputs
	PortfoliosDir ReportDir = "portfolios"
	RealEstateDir ReportDir = "real-estate"
	ScoreCardDir  ReportDir = "score-card"
	SurveyDir     ReportDir = "survey"

	// Outputs
	AnalysisOutputDir ReportDir = "analysis-output"
	ReportOutputDir   ReportDir = "report-output"
	SummaryOutputDir  ReportDir = "summary-output"
)

// outputDirs returns the full paths of the directories the report run writes
// its results to, in the order: analysis, report, summary.
func (r *ReportEnv) outputDirs() []string {
	outputs := []ReportDir{AnalysisOutputDir, ReportOutputDir, SummaryOutputDir}

	paths := make([]string, 0, len(outputs))
	for _, d := range outputs {
		paths = append(paths, r.pathForDir(d))
	}
	return paths
}

// asEnvVars renders the report environment as KEY=value strings, one per
// directory, in the form expected by exec.Cmd's Env field. The externally
// mounted directories come first, followed by the per-run input and output
// directories.
func (r *ReportEnv) asEnvVars() []string {
	// Directories that live under the per-run root, paired with the
	// environment variable that points at each of them.
	managed := []struct {
		key string
		dir ReportDir
	}{
		{"PORTFOLIO_DIR", PortfoliosDir},
		{"REAL_ESTATE_DIR", RealEstateDir},
		{"SCORE_CARD_DIR", ScoreCardDir},
		{"SURVEY_DIR", SurveyDir},
		{"ANALYSIS_OUTPUT_DIR", AnalysisOutputDir},
		{"REPORT_OUTPUT_DIR", ReportOutputDir},
		{"SUMMARY_OUTPUT_DIR", SummaryOutputDir},
	}

	env := make([]string, 0, 2+len(managed))
	env = append(env,
		"BENCHMARKS_DIR="+r.benchmarksDir,
		"PACTA_DATA_DIR="+r.pactaDataDir,
	)
	for _, m := range managed {
		env = append(env, m.key+"="+r.pathForDir(m.dir))
	}
	return env
}

// pathForDir resolves d to a path beneath this run's root directory.
func (r *ReportEnv) pathForDir(d ReportDir) string {
	name := string(d)
	return filepath.Join(r.rootDir, name)
}

func (r *ReportEnv) makeDirectories() error {
var rErr error
makeDir := func(reportDir ReportDir) {
if rErr != nil {
return
}
fileNames = append(fileNames, strings.TrimSuffix(fileNameWithExt, ".json"))
destPath := filepath.Join("/", "mnt", "processed_portfolios", fileNameWithExt)
if err := h.downloadBlob(ctx, string(blobURI), destPath); err != nil {
return fmt.Errorf("failed to download processed portfolio blob: %w", err)
dir := r.pathForDir(reportDir)
if err := os.Mkdir(dir, 0700); err != nil {
rErr = fmt.Errorf("failed to create dir %q: %w", dir, err)
return
}
}

reportDir := filepath.Join("/", "mnt", "reports")
if err := os.MkdirAll(reportDir, 0600); err != nil {
return fmt.Errorf("failed to create directory for reports to get copied to: %w", err)
// Inputs
makeDir(PortfoliosDir)
makeDir(RealEstateDir) // Used as part of specific projects, empty for now.
makeDir(ScoreCardDir) // Used as part of specific projects, empty for now.
makeDir(SurveyDir) // Used as part of specific projects, empty for now.

// Outputs
makeDir(AnalysisOutputDir)
makeDir(ReportOutputDir)
makeDir(SummaryOutputDir)

if rErr != nil {
return rErr
}
return nil
}

func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.CreateReportRequest, reportContainer string) error {
if n := len(req.BlobURIs); n != 1 {
return fmt.Errorf("expected exactly one blob URI as input, got %d", n)
}
blobURI := req.BlobURIs[0]

// We use this instead of /mnt/... because the base image (quite
// reasonably) uses a non-root user, so we can't be creating directories in the
// root filesystem all willy nilly.
baseDir := filepath.Join("/", "home", "workflow-pacta-webapp")

reportEnv, err := initReportEnv(h.benchmarkDir, h.pactaDataDir, baseDir)
if err != nil {
return fmt.Errorf("failed to init report env: %w", err)
}

// Load the parsed portfolio from blob storage, place it in our PORTFOLIO_DIR,
// where the `run_pacta.R` script expects it to be.
fileNameWithExt := filepath.Base(string(blobURI))
if !strings.HasSuffix(fileNameWithExt, ".csv") {
return fmt.Errorf("given blob wasn't a CSV-formatted portfolio, %q", fileNameWithExt)
}
destPath := filepath.Join(reportEnv.pathForDir(PortfoliosDir), fileNameWithExt)
if err := h.downloadBlob(ctx, string(blobURI), destPath); err != nil {
return fmt.Errorf("failed to download processed portfolio blob: %w", err)
}

inp := ReportInput{
Portfolio: ReportInputPortfolio{
Files: fileNameWithExt,
HoldingsDate: "2023-12-31", // TODO(#206)
Name: "FooPortfolio", // TODO(#206)
},
Inherit: "GENERAL_2023Q4", // TODO(#206): Should this be configurable
}

cmd := exec.CommandContext(ctx, "/usr/local/bin/Rscript", "/app/create_report.R")
var inpJSON bytes.Buffer
if err := json.NewEncoder(&inpJSON).Encode(inp); err != nil {
return fmt.Errorf("failed to encode report input as JSON: %w", err)
}

cmd := exec.CommandContext(ctx,
"/usr/local/bin/Rscript",
"--vanilla", "/workflow.pacta.webapp/inst/extdata/scripts/run_pacta.R",
inpJSON.String())

cmd.Env = append(cmd.Env, reportEnv.asEnvVars()...)
cmd.Env = append(cmd.Env,
"PORTFOLIO="+strings.Join(fileNames, ","),
"LOG_LEVEL=DEBUG",
"HOME=/root", /* Required by pandoc */
)
cmd.Stdout = os.Stdout
Expand All @@ -255,23 +407,20 @@ func (h *Handler) CreateReport(ctx context.Context, taskID task.ID, req *task.Cr
return fmt.Errorf("failed to run pacta test CLI: %w", err)
}

// Download outputs from from /out and upload them to Azure
dirEntries, err := os.ReadDir(reportDir)
if err != nil {
return fmt.Errorf("failed to read report directory: %w", err)
}

var artifacts []*task.AnalysisArtifact
for _, dirEntry := range dirEntries {
if !dirEntry.IsDir() {
continue
}
dirPath := filepath.Join(reportDir, dirEntry.Name())
tmp, err := h.uploadDirectory(ctx, dirPath, reportContainer)
uploadDir := func(dir string) error {
aas, err := h.uploadDirectory(ctx, dir, reportContainer, req.AnalysisID)
if err != nil {
return fmt.Errorf("failed to upload report directory: %w", err)
}
artifacts = tmp
artifacts = append(artifacts, aas...)
return nil
}

for _, outDir := range reportEnv.outputDirs() {
if err := uploadDir(outDir); err != nil {
return fmt.Errorf("failed to upload artifacts %q: %w", outDir, err)
}
}

events := []publisher.Event{
Expand Down Expand Up @@ -331,7 +480,7 @@ func (h *Handler) downloadBlob(ctx context.Context, srcURI, destPath string) err
return nil
}

func (h *Handler) uploadDirectory(ctx context.Context, dirPath, container string) ([]*task.AnalysisArtifact, error) {
func (h *Handler) uploadDirectory(ctx context.Context, dirPath, container string, analysisID pacta.AnalysisID) ([]*task.AnalysisArtifact, error) {
base := filepath.Base(dirPath)

var artifacts []*task.AnalysisArtifact
Expand All @@ -341,14 +490,14 @@ func (h *Handler) uploadDirectory(ctx context.Context, dirPath, container string
}

// This is a file, let's upload it to the container
uri := blob.Join(h.blob.Scheme(), container, base, strings.TrimPrefix(path, dirPath+"/"))
uri := blob.Join(h.blob.Scheme(), container, string(analysisID), base, strings.TrimPrefix(path, dirPath+"/"))
if err := h.uploadBlob(ctx, path, uri); err != nil {
return fmt.Errorf("failed to upload blob: %w", err)
}

fn := filepath.Base(path)
// Returns pacta.FileType_UNKNOWN for unrecognized extensions, which we'll serve as binary blobs.
ft := fileTypeFromExt(filepath.Ext(fn))
ft := fileTypeFromFilename(fn)
if ft == pacta.FileType_UNKNOWN {
h.logger.Error("unhandled file extension", zap.String("dir", dirPath), zap.String("file_ext", filepath.Ext(fn)))
}
Expand All @@ -365,7 +514,9 @@ func (h *Handler) uploadDirectory(ctx context.Context, dirPath, container string
return artifacts, nil
}

func fileTypeFromExt(ext string) pacta.FileType {
func fileTypeFromFilename(fn string) pacta.FileType {
ext := filepath.Ext(fn)

switch ext {
case ".csv":
return pacta.FileType_CSV
Expand All @@ -383,8 +534,29 @@ func fileTypeFromExt(ext string) pacta.FileType {
return pacta.FileType_CSS
case ".js":
return pacta.FileType_JS
case ".map":
switch ext2 := filepath.Ext(strings.TrimSuffix(fn, ext)); ext2 {
case ".js":
return pacta.FileType_JS_MAP
default:
return pacta.FileType_UNKNOWN
}
case ".ttf":
return pacta.FileType_TTF
case ".woff":
return pacta.FileType_WOFF
case ".woff2":
return pacta.FileType_WOFF2
case ".eot":
return pacta.FileType_EOT
case ".svg":
return pacta.FileType_SVG
case ".png":
return pacta.FileType_PNG
case ".jpg":
return pacta.FileType_JPG
case ".pdf":
return pacta.FileType_TTF
default:
return pacta.FileType_UNKNOWN
}
Expand Down
3 changes: 3 additions & 0 deletions cmd/runner/configs/local.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,6 @@ azure_topic_location centralus-1

azure_storage_account rmipactalocal
azure_report_container reports

benchmark_dir /mnt/workflow-data/benchmarks/2023Q4_20240529T002355Z
pacta_data_dir /mnt/workflow-data/pacta-data/2023Q4_20240424T120055Z
11 changes: 8 additions & 3 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@ func run(args []string) error {
var (
env = fs.String("env", "", "The environment we're running in.")

benchmarkDir = fs.String("benchmark_dir", "", "The path to the benchmark data for report generation")
pactaDataDir = fs.String("pacta_data_dir", "", "The path to the PACTA data for report generation")

azEventTopic = fs.String("azure_event_topic", "", "The EventGrid topic to send notifications when tasks have finished")
azTopicLocation = fs.String("azure_topic_location", "", "The location (like 'centralus-1') where our EventGrid topics are hosted")

Expand Down Expand Up @@ -80,9 +83,11 @@ func run(args []string) error {
}

h, err := async.New(&async.Config{
Blob: blobClient,
PubSub: pubsubClient,
Logger: logger,
Blob: blobClient,
PubSub: pubsubClient,
Logger: logger,
BenchmarkDir: *benchmarkDir,
PACTADataDir: *pactaDataDir,
})
if err != nil {
return fmt.Errorf("failed to init async biz logic handler: %w", err)
Expand Down
Loading

0 comments on commit 2715e70

Please sign in to comment.