Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Renames "process" step to "parse" #60

Merged
merged 2 commits into from
Nov 17, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions azure/azevents/azevents.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,11 @@ type Config struct {
// prevent random unauthenticated internet requests from triggering webhooks.
AllowedAuthSecrets []string

ProcessedPortfolioTopicName string
ParsedPortfolioTopicName string
}

const parsedPortfolioPath = "/events/parsed_portfolio"
gbdubs marked this conversation as resolved.
Show resolved Hide resolved

func (c *Config) validate() error {
if c.Logger == nil {
return errors.New("no logger was given")
Expand All @@ -44,8 +46,8 @@ func (c *Config) validate() error {
if len(c.AllowedAuthSecrets) == 0 {
return errors.New("no auth secrets were given")
}
if c.ProcessedPortfolioTopicName == "" {
return errors.New("no resource group given")
if c.ParsedPortfolioTopicName == "" {
return errors.New("no parsed portfolio topic name given")
}
return nil
}
Expand All @@ -72,7 +74,7 @@ func NewServer(cfg *Config) (*Server, error) {
subscription: cfg.Subscription,
resourceGroup: cfg.ResourceGroup,
pathToTopic: map[string]string{
"/events/processed_portfolio": cfg.ProcessedPortfolioTopicName,
parsedPortfolioPath: cfg.ParsedPortfolioTopicName,
},
}, nil
}
Expand Down Expand Up @@ -182,16 +184,16 @@ func (s *Server) verifyWebhook(next http.Handler) http.Handler {

func (s *Server) RegisterHandlers(r chi.Router) {
r.Use(s.verifyWebhook)
r.Post("/events/processed_portfolio", func(w http.ResponseWriter, r *http.Request) {
r.Post(parsedPortfolioPath, func(w http.ResponseWriter, r *http.Request) {
var reqs []struct {
Data *task.ProcessPortfolioResponse `json:"data"`
EventType string `json:"eventType"`
ID string `json:"id"`
Subject string `json:"subject"`
DataVersion string `json:"dataVersion"`
MetadataVersion string `json:"metadataVersion"`
EventTime time.Time `json:"eventTime"`
Topic string `json:"topic"`
Data *task.ParsePortfolioResponse `json:"data"`
EventType string `json:"eventType"`
ID string `json:"id"`
Subject string `json:"subject"`
DataVersion string `json:"dataVersion"`
MetadataVersion string `json:"metadataVersion"`
EventTime time.Time `json:"eventTime"`
Topic string `json:"topic"`
}
if err := json.NewDecoder(r.Body).Decode(&reqs); err != nil {
s.logger.Error("failed to parse webhook request body", zap.Error(err))
Expand All @@ -211,7 +213,7 @@ func (s *Server) RegisterHandlers(r chi.Router) {
return
}

// TODO: Add any database persistence and other things we'd want to do after a portfolio was processed.
s.logger.Info("processed portfolio", zap.String("task_id", string(req.Data.TaskID)), zap.Strings("outputs", req.Data.Outputs))
// TODO: Add any database persistence and other things we'd want to do after a portfolio was parsed.
s.logger.Info("parsed portfolio", zap.String("task_id", string(req.Data.TaskID)), zap.Strings("outputs", req.Data.Outputs))
})
}
4 changes: 2 additions & 2 deletions cmd/runner/configs/dev.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
env dev
min_log_level warn

azure_processed_portfolio_topic processed-portfolios-dev
azure_event_parsed_portfolio_topic parsed-portfolios-dev
azure_topic_location centralus-1

azure_storage_account rmipactadev
azure_source_portfolio_container uploadedportfolios
azure_dest_portfolio_container processedportfolios
azure_dest_portfolio_container parsedportfolios
azure_report_container reports
4 changes: 2 additions & 2 deletions cmd/runner/configs/local.conf
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
env local
min_log_level debug

azure_processed_portfolio_topic processed-portfolios-local
azure_event_parsed_portfolio_topic parsed-portfolios-local
azure_topic_location centralus-1

azure_storage_account rmipactalocal
azure_source_portfolio_container uploadedportfolios
azure_dest_portfolio_container processedportfolios
azure_dest_portfolio_container parsedportfolios
azure_report_container reports
26 changes: 13 additions & 13 deletions cmd/runner/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ func run(args []string) error {
var (
env = fs.String("env", "", "The environment we're running in.")

azProcessedPortfolioTopic = fs.String("azure_processed_portfolio_topic", "", "The EventGrid topic to send notifications of processed portfolios")
azTopicLocation = fs.String("azure_topic_location", "", "The location (like 'centralus-1') where our EventGrid topics are hosted")
azEventParsePortfolioCompleteTopic = fs.String("azure_event_parse_portfolio_complete_topic", "", "The EventGrid topic to send notifications when parsing of portfolio(s) has finished")
azTopicLocation = fs.String("azure_topic_location", "", "The location (like 'centralus-1') where our EventGrid topics are hosted")

azStorageAccount = fs.String("azure_storage_account", "", "The storage account to authenticate against for blob operations")
azSourcePortfolioContainer = fs.String("azure_source_portfolio_container", "", "The container in the storage account where we read raw portfolios from")
Expand Down Expand Up @@ -99,7 +99,7 @@ func run(args []string) error {
}
}

pubsubClient, err := publisher.NewClient(fmt.Sprintf("https://%s.%s.eventgrid.azure.net/api/events", *azProcessedPortfolioTopic, *azTopicLocation), creds, nil)
pubsubClient, err := publisher.NewClient(fmt.Sprintf("https://%s.%s.eventgrid.azure.net/api/events", *azEventParsePortfolioCompleteTopic, *azTopicLocation), creds, nil)
if err != nil {
return fmt.Errorf("failed to init pub/sub client: %w", err)
}
Expand All @@ -120,8 +120,8 @@ func run(args []string) error {
}

validTasks := map[task.Type]func(context.Context, task.ID) error{
task.ProcessPortfolio: toRunFn(processPortfolioReq, h.processPortfolio),
task.CreateReport: toRunFn(createReportReq, h.createReport),
task.ParsePortfolio: toRunFn(parsePortfolioReq, h.parsePortfolio),
task.CreateReport: toRunFn(createReportReq, h.createReport),
}

taskID := task.ID(os.Getenv("TASK_ID"))
Expand Down Expand Up @@ -166,7 +166,7 @@ type handler struct {
reportContainer string
}

func processPortfolioReq() (*task.ProcessPortfolioRequest, error) {
func parsePortfolioReq() (*task.ParsePortfolioRequest, error) {
rawAssetIDs := os.Getenv("ASSET_IDS")
if rawAssetIDs == "" {
return nil, errors.New("no ASSET_IDS given")
Expand All @@ -177,7 +177,7 @@ func processPortfolioReq() (*task.ProcessPortfolioRequest, error) {
return nil, fmt.Errorf("failed to load asset IDs: %w", err)
}

return &task.ProcessPortfolioRequest{
return &task.ParsePortfolioRequest{
AssetIDs: assetIDs,
}, nil
}
Expand Down Expand Up @@ -250,7 +250,7 @@ func (h *handler) downloadBlob(ctx context.Context, srcURI, destPath string) err
return nil
}

func (h *handler) processPortfolio(ctx context.Context, taskID task.ID, req *task.ProcessPortfolioRequest) error {
func (h *handler) parsePortfolio(ctx context.Context, taskID task.ID, req *task.ParsePortfolioRequest) error {
// Load the portfolio from blob storage, place it in /mnt/raw_portfolios, where
// the `process_portfolios.R` script expects it to be.
for _, assetID := range req.AssetIDs {
Expand Down Expand Up @@ -295,20 +295,20 @@ func (h *handler) processPortfolio(ctx context.Context, taskID task.ID, req *tas
for _, p := range paths {
destURI := blob.Join(h.blob.Scheme(), h.destPortfolioContainer, filepath.Base(p))
if err := h.uploadBlob(ctx, p, destURI); err != nil {
return fmt.Errorf("failed to copy processed portfolio from %q to %q: %w", p, destURI, err)
return fmt.Errorf("failed to copy parsed portfolio from %q to %q: %w", p, destURI, err)
}
out = append(out, destURI)
}

events := []publisher.Event{
{
Data: task.ProcessPortfolioResponse{
Data: task.ParsePortfolioResponse{
TaskID: taskID,
AssetIDs: req.AssetIDs,
Outputs: out,
},
DataVersion: to.Ptr("1.0"),
EventType: to.Ptr("processed-portfolio"),
EventType: to.Ptr("parse-portfolio-complete"),
EventTime: to.Ptr(time.Now()),
ID: to.Ptr(string(taskID)),
Subject: to.Ptr("subject"),
Expand All @@ -319,7 +319,7 @@ func (h *handler) processPortfolio(ctx context.Context, taskID task.ID, req *tas
return fmt.Errorf("failed to publish event: %w", err)
}

h.logger.Info("processed portfolio", zap.String("task_id", string(taskID)))
h.logger.Info("parsed portfolio", zap.String("task_id", string(taskID)))

return nil
}
Expand All @@ -338,7 +338,7 @@ func createReportReq() (*task.CreateReportRequest, error) {
func (h *handler) createReport(ctx context.Context, taskID task.ID, req *task.CreateReportRequest) error {
baseName := string(req.PortfolioID) + ".json"

// Load the processed portfolio from blob storage, place it in /mnt/
// Load the parsed portfolio from blob storage, place it in /mnt/
// processed_portfolios, where the `create_report.R` script expects it
// to be.
srcURI := blob.Join(h.blob.Scheme(), h.destPortfolioContainer, baseName)
Expand Down
4 changes: 2 additions & 2 deletions cmd/runner/taskrunner/taskrunner.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ func New(cfg *Config) (*TaskRunner, error) {
}, nil
}

func (tr *TaskRunner) ProcessPortfolio(ctx context.Context, req *task.ProcessPortfolioRequest) (task.ID, task.RunnerID, error) {
func (tr *TaskRunner) ParsePortfolio(ctx context.Context, req *task.ParsePortfolioRequest) (task.ID, task.RunnerID, error) {
var buf bytes.Buffer
if err := json.NewEncoder(&buf).Encode(req.AssetIDs); err != nil {
return "", "", fmt.Errorf("failed to encode asset IDs: %w", err)
}
return tr.run(ctx, []task.EnvVar{
{
Key: "TASK_TYPE",
Value: string(task.ProcessPortfolio),
Value: string(task.ParsePortfolio),
},
{
Key: "ASSET_IDS",
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/configs/dev.conf
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,4 @@ use_azure_runner true

azure_event_subscription 69b6db12-37e3-4e1f-b48c-aa41dba612a9
azure_event_resource_group rmi-pacta-dev
azure_event_processed_portfolio_topic processed-portfolios-dev
azure_event_parsed_portfolio_topic parsed-portfolios-dev
2 changes: 1 addition & 1 deletion cmd/server/configs/local.conf
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ allowed_cors_origin http://localhost:3000

azure_event_subscription 69b6db12-37e3-4e1f-b48c-aa41dba612a9
azure_event_resource_group rmi-pacta-local
azure_event_processed_portfolio_topic processed-portfolios-local
azure_event_parsed_portfolio_topic parsed-portfolios-local

secret_postgres_host UNUSED
# Also unused
Expand Down
16 changes: 8 additions & 8 deletions cmd/server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,9 @@ func run(args []string) error {
env = fs.String("env", "", "The environment that we're running in.")
localDSN = fs.String("local_dsn", "", "If set, override the DB addresses retrieved from the secret configuration. Can only be used when running locally.")

azEventSubscription = fs.String("azure_event_subscription", "", "The Azure Subscription ID to allow webhook registrations from")
azEventResourceGroup = fs.String("azure_event_resource_group", "", "The Azure resource group to allow webhook registrations from")
azEventProcessedPortfolioTopic = fs.String("azure_event_processed_portfolio_topic", "", "The name of the topic for webhooks about processed portfolios")
azEventSubscription = fs.String("azure_event_subscription", "", "The Azure Subscription ID to allow webhook registrations from")
azEventResourceGroup = fs.String("azure_event_resource_group", "", "The Azure resource group to allow webhook registrations from")
azEventParsedPortfolioTopic = fs.String("azure_event_parsed_portfolio_topic", "", "The name of the topic for webhooks about parsed portfolios")

// Only when running locally because the Dockerized runner can't use local `az` CLI credentials
localDockerTenantID = fs.String("local_docker_tenant_id", "", "The Azure Tenant ID the localdocker service principal lives in")
Expand Down Expand Up @@ -285,11 +285,11 @@ func run(args []string) error {
})

eventSrv, err := azevents.NewServer(&azevents.Config{
Logger: logger,
AllowedAuthSecrets: strings.Split(*azEventWebhookSecrets, ","),
Subscription: *azEventSubscription,
ResourceGroup: *azEventResourceGroup,
ProcessedPortfolioTopicName: *azEventProcessedPortfolioTopic,
Logger: logger,
AllowedAuthSecrets: strings.Split(*azEventWebhookSecrets, ","),
Subscription: *azEventSubscription,
ResourceGroup: *azEventResourceGroup,
ParsedPortfolioTopicName: *azEventParsedPortfolioTopic,
})
if err != nil {
return fmt.Errorf("failed to init Azure Event Grid handler: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/pactasrv/pactasrv.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ var (
)

type TaskRunner interface {
ProcessPortfolio(ctx context.Context, req *task.ProcessPortfolioRequest) (task.ID, task.RunnerID, error)
ParsePortfolio(ctx context.Context, req *task.ParsePortfolioRequest) (task.ID, task.RunnerID, error)
CreateReport(ctx context.Context, req *task.CreateReportRequest) (task.ID, task.RunnerID, error)
}

Expand Down
8 changes: 4 additions & 4 deletions cmd/server/pactasrv/portfolio.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,18 @@ func (s *Server) CreatePortfolioAsset(ctx context.Context, req api.CreatePortfol
}, nil
}

func (s *Server) ProcessPortfolio(ctx context.Context, req api.ProcessPortfolioRequestObject) (api.ProcessPortfolioResponseObject, error) {
taskID, runnerID, err := s.TaskRunner.ProcessPortfolio(ctx, &task.ProcessPortfolioRequest{
func (s *Server) ParsePortfolio(ctx context.Context, req api.ParsePortfolioRequestObject) (api.ParsePortfolioResponseObject, error) {
taskID, runnerID, err := s.TaskRunner.ParsePortfolio(ctx, &task.ParsePortfolioRequest{
AssetIDs: req.Body.AssetIds,
// PortfolioID: req.Body.PortfolioID,
})
if err != nil {
return nil, oapierr.Internal("failed to start task", zap.Error(err))
}
s.Logger.Info("triggered process portfolio task",
s.Logger.Info("triggered parse portfolio task",
zap.String("task_id", string(taskID)),
zap.String("task_runner_id", string(runnerID)))
return api.ProcessPortfolio200JSONResponse{
return api.ParsePortfolio200JSONResponse{
TaskId: string(taskID),
}, nil
}
4 changes: 2 additions & 2 deletions frontend/openapi/generated/pacta/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@ export type { NewPortfolioAsset } from './models/NewPortfolioAsset';
export type { PactaVersion } from './models/PactaVersion';
export type { PactaVersionChanges } from './models/PactaVersionChanges';
export type { PactaVersionCreate } from './models/PactaVersionCreate';
export type { ProcessPortfolioReq } from './models/ProcessPortfolioReq';
export type { ProcessPortfolioResp } from './models/ProcessPortfolioResp';
export type { ParsePortfolioReq } from './models/ParsePortfolioReq';
export type { ParsePortfolioResp } from './models/ParsePortfolioResp';
export { User } from './models/User';
export { UserChanges } from './models/UserChanges';

Expand Down
1 change: 0 additions & 1 deletion frontend/openapi/generated/pacta/models/Error.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ export type Error = {
* An enum-like type indicating a more specific type of error.
*
* An example might be getting a 401 Unauthorized because you're logged in with multiple emails and haven't selected one, the error_id could be 'multiple_emails'.
*
*/
error_id: string;
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/* tslint:disable */
/* eslint-disable */

export type ProcessPortfolioReq = {
export type ParsePortfolioReq = {
asset_ids: Array<string>;
};

Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/* tslint:disable */
/* eslint-disable */

export type ProcessPortfolioResp = {
export type ParsePortfolioResp = {
/**
* The ID of the async task for processing the portfoio
*/
Expand Down
14 changes: 7 additions & 7 deletions frontend/openapi/generated/pacta/services/DefaultService.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ import type { NewPortfolioAsset } from '../models/NewPortfolioAsset';
import type { PactaVersion } from '../models/PactaVersion';
import type { PactaVersionChanges } from '../models/PactaVersionChanges';
import type { PactaVersionCreate } from '../models/PactaVersionCreate';
import type { ProcessPortfolioReq } from '../models/ProcessPortfolioReq';
import type { ProcessPortfolioResp } from '../models/ProcessPortfolioResp';
import type { ParsePortfolioReq } from '../models/ParsePortfolioReq';
import type { ParsePortfolioResp } from '../models/ParsePortfolioResp';
import type { User } from '../models/User';
import type { UserChanges } from '../models/UserChanges';

Expand Down Expand Up @@ -508,15 +508,15 @@ export class DefaultService {
* Starts processing raw uploaded files
*
* @param requestBody The raw portfolio files to process
* @returns ProcessPortfolioResp The task has been started successfully
* @returns ParsePortfolioResp The task has been started successfully
* @throws ApiError
*/
public processPortfolio(
requestBody: ProcessPortfolioReq,
): CancelablePromise<ProcessPortfolioResp> {
public parsePortfolio(
requestBody: ParsePortfolioReq,
): CancelablePromise<ParsePortfolioResp> {
return this.httpRequest.request({
method: 'POST',
url: '/test:processPortfolio',
url: '/test:parsePortfolio',
body: requestBody,
mediaType: 'application/json',
});
Expand Down
2 changes: 1 addition & 1 deletion frontend/pages/admin/portfolio_test.vue
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ const startProcessing = async () => {
if (assetIDs.value?.length === 0) {
return
}
const resp = await pactaClient.processPortfolio({ asset_ids: assetIDs.value })
const resp = await pactaClient.parsePortfolio({ asset_ids: assetIDs.value })
alert(`TASK ID: ${resp.task_id}`)
}

Expand Down
Loading