Skip to content

Commit

Permalink
s/process/parse/g
Browse files Browse the repository at this point in the history
  • Loading branch information
gbdubs committed Nov 16, 2023
1 parent 09a2315 commit 9273aee
Show file tree
Hide file tree
Showing 19 changed files with 76 additions and 74 deletions.
32 changes: 17 additions & 15 deletions azure/azevents/azevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ type Config struct {
// prevent random unauthenticated internet requests from triggering webhooks.
AllowedAuthSecrets []string

ProcessedPortfolioTopicName string
ParsedPortfolioTopicName string
}

const parsedPortfolioPath = "/events/parsed_portfolio"

func (c *Config) validate() error {
if c.Logger == nil {
return errors.New("no logger was given")
Expand All @@ -44,8 +46,8 @@ func (c *Config) validate() error {
if len(c.AllowedAuthSecrets) == 0 {
return errors.New("no auth secrets were given")
}
if c.ProcessedPortfolioTopicName == "" {
return errors.New("no resource group given")
if c.ParsedPortfolioTopicName == "" {
return errors.New("no parsed portfolio topic name given")
}
return nil
}
Expand All @@ -72,7 +74,7 @@ func NewServer(cfg *Config) (*Server, error) {
subscription: cfg.Subscription,
resourceGroup: cfg.ResourceGroup,
pathToTopic: map[string]string{
"/events/processed_portfolio": cfg.ProcessedPortfolioTopicName,
parsedPortfolioPath: cfg.ParsedPortfolioTopicName,
},
}, nil
}
Expand Down Expand Up @@ -182,16 +184,16 @@ func (s *Server) verifyWebhook(next http.Handler) http.Handler {

func (s *Server) RegisterHandlers(r chi.Router) {
r.Use(s.verifyWebhook)
r.Post("/events/processed_portfolio", func(w http.ResponseWriter, r *http.Request) {
r.Post(parsedPortfolioPath, func(w http.ResponseWriter, r *http.Request) {
var reqs []struct {
Data *task.ProcessPortfolioResponse `json:"data"`
EventType string `json:"eventType"`
ID string `json:"id"`
Subject string `json:"subject"`
DataVersion string `json:"dataVersion"`
MetadataVersion string `json:"metadataVersion"`
EventTime time.Time `json:"eventTime"`
Topic string `json:"topic"`
Data *task.ParsePortfolioResponse `json:"data"`
EventType string `json:"eventType"`
ID string `json:"id"`
Subject string `json:"subject"`
DataVersion string `json:"dataVersion"`
MetadataVersion string `json:"metadataVersion"`
EventTime time.Time `json:"eventTime"`
Topic string `json:"topic"`
}
if err := json.NewDecoder(r.Body).Decode(&reqs); err != nil {
s.logger.Error("failed to parse webhook request body", zap.Error(err))
Expand All @@ -211,7 +213,7 @@ func (s *Server) RegisterHandlers(r chi.Router) {
return
}

// TODO: Add any database persistence and other things we'd want to do after a portfolio was processed.
s.logger.Info("processed portfolio", zap.String("task_id", string(req.Data.TaskID)), zap.Strings("outputs", req.Data.Outputs))
// TODO: Add any database persistence and other things we'd want to do after a portfolio was parsed.
s.logger.Info("parsed portfolio", zap.String("task_id", string(req.Data.TaskID)), zap.Strings("outputs", req.Data.Outputs))
})
}
4 changes: 2 additions & 2 deletions cmd/runner/configs/dev.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
env dev
min_log_level warn

azure_processed_portfolio_topic processed-portfolios-dev
azure_event_parsed_portfolio_topic parsed-portfolios-dev
azure_topic_location centralus-1

azure_storage_account rmipactadev
azure_source_portfolio_container uploadedportfolios
azure_dest_portfolio_container processedportfolios
azure_dest_portfolio_container parsedportfolios
azure_report_container reports
4 changes: 2 additions & 2 deletions cmd/runner/configs/local.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
env local
min_log_level debug

azure_processed_portfolio_topic processed-portfolios-local
azure_event_parsed_portfolio_topic parsed-portfolios-local
azure_topic_location centralus-1

azure_storage_account rmipactalocal
azure_source_portfolio_container uploadedportfolios
azure_dest_portfolio_container processedportfolios
azure_dest_portfolio_container parsedportfolios
azure_report_container reports
26 changes: 13 additions & 13 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func run(args []string) error {
var (
env = fs.String("env", "", "The environment we're running in.")

azProcessedPortfolioTopic = fs.String("azure_processed_portfolio_topic", "", "The EventGrid topic to send notifications of processed portfolios")
azTopicLocation = fs.String("azure_topic_location", "", "The location (like 'centralus-1') where our EventGrid topics are hosted")
azEventParsePortfolioCompleteTopic = fs.String("azure_event_parse_portfolio_complete_topic", "", "The EventGrid topic to send notifications when processing of portfolio(s) has finished")
azTopicLocation = fs.String("azure_topic_location", "", "The location (like 'centralus-1') where our EventGrid topics are hosted")

azStorageAccount = fs.String("azure_storage_account", "", "The storage account to authenticate against for blob operations")
azSourcePortfolioContainer = fs.String("azure_source_portfolio_container", "", "The container in the storage account where we read raw portfolios from")
Expand Down Expand Up @@ -99,7 +99,7 @@ func run(args []string) error {
}
}

pubsubClient, err := publisher.NewClient(fmt.Sprintf("https://%s.%s.eventgrid.azure.net/api/events", *azProcessedPortfolioTopic, *azTopicLocation), creds, nil)
pubsubClient, err := publisher.NewClient(fmt.Sprintf("https://%s.%s.eventgrid.azure.net/api/events", *azEventParsePortfolioCompleteTopic, *azTopicLocation), creds, nil)
if err != nil {
return fmt.Errorf("failed to init pub/sub client: %w", err)
}
Expand All @@ -120,8 +120,8 @@ func run(args []string) error {
}

validTasks := map[task.Type]func(context.Context, task.ID) error{
task.ProcessPortfolio: toRunFn(processPortfolioReq, h.processPortfolio),
task.CreateReport: toRunFn(createReportReq, h.createReport),
task.ParsePortfolio: toRunFn(parsePortfolioReq, h.parsePortfolio),
task.CreateReport: toRunFn(createReportReq, h.createReport),
}

taskID := task.ID(os.Getenv("TASK_ID"))
Expand Down Expand Up @@ -166,7 +166,7 @@ type handler struct {
reportContainer string
}

func processPortfolioReq() (*task.ProcessPortfolioRequest, error) {
func parsePortfolioReq() (*task.ParsePortfolioRequest, error) {
rawAssetIDs := os.Getenv("ASSET_IDS")
if rawAssetIDs == "" {
return nil, errors.New("no ASSET_IDS given")
Expand All @@ -177,7 +177,7 @@ func processPortfolioReq() (*task.ProcessPortfolioRequest, error) {
return nil, fmt.Errorf("failed to load asset IDs: %w", err)
}

return &task.ProcessPortfolioRequest{
return &task.ParsePortfolioRequest{
AssetIDs: assetIDs,
}, nil
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func (h *handler) downloadBlob(ctx context.Context, srcURI, destPath string) err
return nil
}

func (h *handler) processPortfolio(ctx context.Context, taskID task.ID, req *task.ProcessPortfolioRequest) error {
func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest) error {
// Load the portfolio from blob storage, place it in /mnt/raw_portfolios, where
// the `process_portfolios.R` script expects it to be.
for _, assetID := range req.AssetIDs {
Expand Down Expand Up @@ -295,20 +295,20 @@ func (h *handler) processPortfolio(ctx context.Context, taskID task.ID, req *tas
for _, p := range paths {
destURI := blob.Join(h.blob.Scheme(), h.destPortfolioContainer, filepath.Base(p))
if err := h.uploadBlob(ctx, p, destURI); err != nil {
return fmt.Errorf("failed to copy processed portfolio from %q to %q: %w", p, destURI, err)
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, destURI, err)
}
out = append(out, destURI)
}

events := []publisher.Event{
{
Data: task.ProcessPortfolioResponse{
Data: task.ParsePortfolioResponse{
TaskID: taskID,
AssetIDs: req.AssetIDs,
Outputs: out,
},
DataVersion: to.Ptr("1.0"),
EventType: to.Ptr("processed-portfolio"),
EventType: to.Ptr("parse-portfolio-complete"),
EventTime: to.Ptr(time.Now()),
ID: to.Ptr(string(taskID)),
Subject: to.Ptr("subject"),
Expand All @@ -319,7 +319,7 @@ func (h *handler) processPortfolio(ctx context.Context, taskID task.ID, req *tas
return fmt.Errorf("failed to publish event: %w", err)
}

h.logger.Info("processed portfolio", zap.String("task_id", string(taskID)))
h.logger.Info("parsed portfolio", zap.String("task_id", string(taskID)))

return nil
}
Expand All @@ -338,7 +338,7 @@ func createReportReq() (*task.CreateReportRequest, error) {
func (h *handler) createReport(ctx context.Context, taskID task.ID, req *task.CreateReportRequest) error {
baseName := string(req.PortfolioID) + ".json"

// Load the processed portfolio from blob storage, place it in /mnt/
// Load the parsed portfolio from blob storage, place it in /mnt/
// processed_portfolios, where the `create_report.R` script expects it
// to be.
srcURI := blob.Join(h.blob.Scheme(), h.destPortfolioContainer, baseName)
Expand Down
4 changes: 2 additions & 2 deletions cmd/runner/taskrunner/taskrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ func New(cfg *Config) (*TaskRunner, error) {
}, nil
}

func (tr *TaskRunner) ProcessPortfolio(ctx context.Context, req *task.ProcessPortfolioRequest) (task.ID, task.RunnerID, error) {
func (tr *TaskRunner) ParsePortfolio(ctx context.Context, req *task.ParsePortfolioRequest) (task.ID, task.RunnerID, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(req.AssetIDs); err != nil {
return "", "", fmt.Errorf("failed to encode asset IDs: %w", err)
}
return tr.run(ctx, []task.EnvVar{
{
Key: "TASK_TYPE",
Value: string(task.ProcessPortfolio),
Value: string(task.ParsePortfolio),
},
{
Key: "ASSET_IDS",
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/configs/dev.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ use_azure_runner true

azure_event_subscription 69b6db12-37e3-4e1f-b48c-aa41dba612a9
azure_event_resource_group rmi-pacta-dev
azure_event_processed_portfolio_topic processed-portfolios-dev
azure_event_parsed_portfolio_topic parsed-portfolios-dev
2 changes: 1 addition & 1 deletion cmd/server/configs/local.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ allowed_cors_origin http://localhost:3000

azure_event_subscription 69b6db12-37e3-4e1f-b48c-aa41dba612a9
azure_event_resource_group rmi-pacta-local
azure_event_processed_portfolio_topic processed-portfolios-local
azure_event_parsed_portfolio_topic parsed-portfolios-local

secret_postgres_host UNUSED
# Also unused
Expand Down
16 changes: 8 additions & 8 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func run(args []string) error {
env = fs.String("env", "", "The environment that we're running in.")
localDSN = fs.String("local_dsn", "", "If set, override the DB addresses retrieved from the secret configuration. Can only be used when running locally.")

azEventSubscription = fs.String("azure_event_subscription", "", "The Azure Subscription ID to allow webhook registrations from")
azEventResourceGroup = fs.String("azure_event_resource_group", "", "The Azure resource group to allow webhook registrations from")
azEventProcessedPortfolioTopic = fs.String("azure_event_processed_portfolio_topic", "", "The name of the topic for webhooks about processed portfolios")
azEventSubscription = fs.String("azure_event_subscription", "", "The Azure Subscription ID to allow webhook registrations from")
azEventResourceGroup = fs.String("azure_event_resource_group", "", "The Azure resource group to allow webhook registrations from")
azEventParsedPortfolioTopic = fs.String("azure_event_parsed_portfolio_topic", "", "The name of the topic for webhooks about parsed portfolios")

// Only when running locally because the Dockerized runner can't use local `az` CLI credentials
localDockerTenantID = fs.String("local_docker_tenant_id", "", "The Azure Tenant ID the localdocker service principal lives in")
Expand Down Expand Up @@ -285,11 +285,11 @@ func run(args []string) error {
})

eventSrv, err := azevents.NewServer(&azevents.Config{
Logger: logger,
AllowedAuthSecrets: strings.Split(*azEventWebhookSecrets, ","),
Subscription: *azEventSubscription,
ResourceGroup: *azEventResourceGroup,
ProcessedPortfolioTopicName: *azEventProcessedPortfolioTopic,
Logger: logger,
AllowedAuthSecrets: strings.Split(*azEventWebhookSecrets, ","),
Subscription: *azEventSubscription,
ResourceGroup: *azEventResourceGroup,
ParsedPortfolioTopicName: *azEventParsedPortfolioTopic,
})
if err != nil {
return fmt.Errorf("failed to init Azure Event Grid handler: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/pactasrv/pactasrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (
)

type TaskRunner interface {
ProcessPortfolio(ctx context.Context, req *task.ProcessPortfolioRequest) (task.ID, task.RunnerID, error)
ParsePortfolio(ctx context.Context, req *task.ParsePortfolioRequest) (task.ID, task.RunnerID, error)
CreateReport(ctx context.Context, req *task.CreateReportRequest) (task.ID, task.RunnerID, error)
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/server/pactasrv/portfolio.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ func (s *Server) CreatePortfolioAsset(ctx context.Context, req api.CreatePortfol
}, nil
}

func (s *Server) ProcessPortfolio(ctx context.Context, req api.ProcessPortfolioRequestObject) (api.ProcessPortfolioResponseObject, error) {
taskID, runnerID, err := s.TaskRunner.ProcessPortfolio(ctx, &task.ProcessPortfolioRequest{
func (s *Server) ParsePortfolio(ctx context.Context, req api.ParsePortfolioRequestObject) (api.ParsePortfolioResponseObject, error) {
taskID, runnerID, err := s.TaskRunner.ParsePortfolio(ctx, &task.ParsePortfolioRequest{
AssetIDs: req.Body.AssetIds,
// PortfolioID: req.Body.PortfolioID,
})
if err != nil {
return nil, oapierr.Internal("failed to start task", zap.Error(err))
}
s.Logger.Info("triggered process portfolio task",
s.Logger.Info("triggered parse portfolio task",
zap.String("task_id", string(taskID)),
zap.String("task_runner_id", string(runnerID)))
return api.ProcessPortfolio200JSONResponse{
return api.ParsePortfolio200JSONResponse{
TaskId: string(taskID),
}, nil
}
4 changes: 2 additions & 2 deletions frontend/openapi/generated/pacta/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ export type { NewPortfolioAsset } from './models/NewPortfolioAsset';
export type { PactaVersion } from './models/PactaVersion';
export type { PactaVersionChanges } from './models/PactaVersionChanges';
export type { PactaVersionCreate } from './models/PactaVersionCreate';
export type { ProcessPortfolioReq } from './models/ProcessPortfolioReq';
export type { ProcessPortfolioResp } from './models/ProcessPortfolioResp';
export type { ParsePortfolioReq } from './models/ParsePortfolioReq';
export type { ParsePortfolioResp } from './models/ParsePortfolioResp';
export { User } from './models/User';
export { UserChanges } from './models/UserChanges';

Expand Down
1 change: 0 additions & 1 deletion frontend/openapi/generated/pacta/models/Error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export type Error = {
* An enum-like type indicating a more specific type of error.
*
* An example might be getting a 401 Unauthorized because you're logged in with multiple emails and haven't selected one, the error_id could be 'multiple_emails'.
*
*/
error_id: string;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/* tslint:disable */
/* eslint-disable */

export type ProcessPortfolioReq = {
export type ParsePortfolioReq = {
asset_ids: Array<string>;
};

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/* tslint:disable */
/* eslint-disable */

export type ProcessPortfolioResp = {
export type ParsePortfolioResp = {
/**
* The ID of the async task for processing the portfoio
*/
Expand Down
14 changes: 7 additions & 7 deletions frontend/openapi/generated/pacta/services/DefaultService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import type { NewPortfolioAsset } from '../models/NewPortfolioAsset';
import type { PactaVersion } from '../models/PactaVersion';
import type { PactaVersionChanges } from '../models/PactaVersionChanges';
import type { PactaVersionCreate } from '../models/PactaVersionCreate';
import type { ProcessPortfolioReq } from '../models/ProcessPortfolioReq';
import type { ProcessPortfolioResp } from '../models/ProcessPortfolioResp';
import type { ParsePortfolioReq } from '../models/ParsePortfolioReq';
import type { ParsePortfolioResp } from '../models/ParsePortfolioResp';
import type { User } from '../models/User';
import type { UserChanges } from '../models/UserChanges';

Expand Down Expand Up @@ -508,15 +508,15 @@ export class DefaultService {
* Starts processing raw uploaded files
*
* @param requestBody The raw portfolio files to process
* @returns ProcessPortfolioResp The task has been started successfully
* @returns ParsePortfolioResp The task has been started successfully
* @throws ApiError
*/
public processPortfolio(
requestBody: ProcessPortfolioReq,
): CancelablePromise<ProcessPortfolioResp> {
public parsePortfolio(
requestBody: ParsePortfolioReq,
): CancelablePromise<ParsePortfolioResp> {
return this.httpRequest.request({
method: 'POST',
url: '/test:processPortfolio',
url: '/test:parsePortfolio',
body: requestBody,
mediaType: 'application/json',
});
Expand Down
2 changes: 1 addition & 1 deletion frontend/pages/admin/portfolio_test.vue
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const startProcessing = async () => {
if (assetIDs.value?.length === 0) {
return
}
const resp = await pactaClient.processPortfolio({ asset_ids: assetIDs.value })
const resp = await pactaClient.parsePortfolio({ asset_ids: assetIDs.value })
alert(`TASK ID: ${resp.task_id}`)
}
Expand Down
Loading

0 comments on commit 9273aee

Please sign in to comment.