From 89f35ec9314ba5737f94d090c7c3493adedbad66 Mon Sep 17 00:00:00 2001 From: "Yiqun (Ethan) Zhang" Date: Wed, 19 Jun 2024 02:15:21 -0500 Subject: [PATCH] run: Parse query files early --- cmd/run/main.go | 2 ++ stage/map.go | 17 +++++++++-------- stage/no_influx.go | 5 ++++- stage/stage.go | 22 ++++++++++++++-------- 4 files changed, 29 insertions(+), 17 deletions(-) diff --git a/cmd/run/main.go b/cmd/run/main.go index 92bf4620..ad8b3005 100644 --- a/cmd/run/main.go +++ b/cmd/run/main.go @@ -59,6 +59,8 @@ func Run(_ *cobra.Command, args []string) { } defaultRunNameBuilder.WriteString(st.Id) } + } else { + log.Fatal().Str("path", path).Err(err).Msg("failed to process stage path") } } if defaultRunNameBuilder != nil { diff --git a/stage/map.go b/stage/map.go index 60621e2e..62c85ba0 100644 --- a/stage/map.go +++ b/stage/map.go @@ -57,6 +57,15 @@ func ReadStageFromFile(filePath string) (*Stage, error) { if err = json.Unmarshal(bytes, stage); err != nil { return nil, fmt.Errorf("failed to parse json %s: %w", filePath, err) } + for i, queryFile := range stage.QueryFiles { + if !filepath.IsAbs(queryFile) { + queryFile = filepath.Join(stage.BaseDir, queryFile) + stage.QueryFiles[i] = queryFile + } + if _, err = os.Stat(queryFile); err != nil { + return nil, fmt.Errorf("%s links to an invalid query file %s: %w", stage.Id, queryFile, err) + } + } log.Debug().Str("id", stage.Id).Str("path", filePath).Msg("read stage file") return stage, nil } @@ -80,14 +89,6 @@ func ParseStage(stage *Stage, stages Map) (*Stage, error) { log.Debug().Msgf("%s already parsed, returned", stage.Id) return stageFound, nil } - for _, queryFile := range stage.QueryFiles { - if !filepath.IsAbs(queryFile) { - queryFile = filepath.Join(stage.BaseDir, queryFile) - } - if _, err := os.Stat(queryFile); err != nil { - return nil, fmt.Errorf("%s links to an invalid query file %s: %w", stage.Id, queryFile, err) - } - } for i, nextStagePath := range stage.NextStagePaths { if !filepath.IsAbs(nextStagePath) { nextStagePath = filepath.Join(stage.BaseDir, nextStagePath) diff --git a/stage/no_influx.go b/stage/no_influx.go index f2d2cc58..8e56a47b 100644 --- a/stage/no_influx.go +++ b/stage/no_influx.go @@ -7,7 +7,10 @@ import ( "errors" ) -func NewInfluxRunRecorder(_ string) RunRecorder { +func NewInfluxRunRecorder(cfgPath string) RunRecorder { + if cfgPath == "" { + return nil + } return &NotSupportedRecorder{} } diff --git a/stage/stage.go b/stage/stage.go index 4bd22e29..bff37752 100644 --- a/stage/stage.go +++ b/stage/stage.go @@ -271,11 +271,15 @@ func (s *Stage) runSequentially(ctx context.Context) (returnErr error) { } func (s *Stage) runQueryFile(ctx context.Context, queryFile string, expectedRowCountStartIndex *int, fileAlias *string) error { - queryFileAbsPath := queryFile - if !filepath.IsAbs(queryFileAbsPath) { - queryFileAbsPath = filepath.Join(s.BaseDir, queryFileAbsPath) + file, err := os.Open(queryFile) + if fileAlias == nil { + if relPath, relErr := filepath.Rel(s.BaseDir, queryFile); relErr == nil { + fileAlias = &relPath + } else { + fileAlias = &queryFile + } } - file, err := os.Open(queryFileAbsPath) + var queries []string if err == nil { queries, err = presto.SplitQueries(file) @@ -292,9 +296,7 @@ func (s *Stage) runQueryFile(ctx context.Context, queryFile string, expectedRowC } return err } - if fileAlias == nil { - fileAlias = &queryFile - } + if expectedRowCountStartIndex != nil { err = s.runQueries(ctx, queries, fileAlias, *expectedRowCountStartIndex) *expectedRowCountStartIndex += len(queries) @@ -345,7 +347,11 @@ func (s *Stage) runRandomly(ctx context.Context) error { } } else { queryFile := s.QueryFiles[idx-len(s.Queries)] - fileAlias := fmt.Sprintf("rand_%d/%s", i, queryFile) + fileAlias := queryFile + if relPath, relErr := filepath.Rel(s.BaseDir, queryFile); relErr == nil { + fileAlias = relPath + } + fileAlias = fmt.Sprintf("rand_%d_%s", i, fileAlias) if err := s.runQueryFile(ctx, queryFile, nil, &fileAlias); err != nil { return err }