Skip to content

Commit

Permalink
run: Parse query files early
Browse files Browse the repository at this point in the history
  • Loading branch information
ethanyzhang committed Jun 19, 2024
1 parent 2f4156f commit 89f35ec
Show file tree
Hide file tree
Showing 4 changed files with 29 additions and 17 deletions.
2 changes: 2 additions & 0 deletions cmd/run/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ func Run(_ *cobra.Command, args []string) {
}
defaultRunNameBuilder.WriteString(st.Id)
}
} else {
log.Fatal().Str("path", path).Err(err).Msg("failed to process stage path")
}
}
if defaultRunNameBuilder != nil {
Expand Down
17 changes: 9 additions & 8 deletions stage/map.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,15 @@ func ReadStageFromFile(filePath string) (*Stage, error) {
if err = json.Unmarshal(bytes, stage); err != nil {
return nil, fmt.Errorf("failed to parse json %s: %w", filePath, err)
}
for i, queryFile := range stage.QueryFiles {
if !filepath.IsAbs(queryFile) {
queryFile = filepath.Join(stage.BaseDir, queryFile)
stage.QueryFiles[i] = queryFile
}
if _, err = os.Stat(queryFile); err != nil {
return nil, fmt.Errorf("%s links to an invalid query file %s: %w", stage.Id, queryFile, err)
}
}
log.Debug().Str("id", stage.Id).Str("path", filePath).Msg("read stage file")
return stage, nil
}
Expand All @@ -80,14 +89,6 @@ func ParseStage(stage *Stage, stages Map) (*Stage, error) {
log.Debug().Msgf("%s already parsed, returned", stage.Id)
return stageFound, nil
}
for _, queryFile := range stage.QueryFiles {
if !filepath.IsAbs(queryFile) {
queryFile = filepath.Join(stage.BaseDir, queryFile)
}
if _, err := os.Stat(queryFile); err != nil {
return nil, fmt.Errorf("%s links to an invalid query file %s: %w", stage.Id, queryFile, err)
}
}
for i, nextStagePath := range stage.NextStagePaths {
if !filepath.IsAbs(nextStagePath) {
nextStagePath = filepath.Join(stage.BaseDir, nextStagePath)
Expand Down
5 changes: 4 additions & 1 deletion stage/no_influx.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,10 @@ import (
"errors"
)

func NewInfluxRunRecorder(_ string) RunRecorder {
func NewInfluxRunRecorder(cfgPath string) RunRecorder {
if cfgPath == "" {
return nil
}
return &NotSupportedRecorder{}
}

Expand Down
22 changes: 14 additions & 8 deletions stage/stage.go
Original file line number Diff line number Diff line change
Expand Up @@ -271,11 +271,15 @@ func (s *Stage) runSequentially(ctx context.Context) (returnErr error) {
}

func (s *Stage) runQueryFile(ctx context.Context, queryFile string, expectedRowCountStartIndex *int, fileAlias *string) error {
queryFileAbsPath := queryFile
if !filepath.IsAbs(queryFileAbsPath) {
queryFileAbsPath = filepath.Join(s.BaseDir, queryFileAbsPath)
file, err := os.Open(queryFile)
if fileAlias == nil {
if relPath, relErr := filepath.Rel(s.BaseDir, queryFile); relErr == nil {
fileAlias = &relPath
} else {
fileAlias = &queryFile
}
}
file, err := os.Open(queryFileAbsPath)

var queries []string
if err == nil {
queries, err = presto.SplitQueries(file)
Expand All @@ -292,9 +296,7 @@ func (s *Stage) runQueryFile(ctx context.Context, queryFile string, expectedRowC
}
return err
}
if fileAlias == nil {
fileAlias = &queryFile
}

if expectedRowCountStartIndex != nil {
err = s.runQueries(ctx, queries, fileAlias, *expectedRowCountStartIndex)
*expectedRowCountStartIndex += len(queries)
Expand Down Expand Up @@ -345,7 +347,11 @@ func (s *Stage) runRandomly(ctx context.Context) error {
}
} else {
queryFile := s.QueryFiles[idx-len(s.Queries)]
fileAlias := fmt.Sprintf("rand_%d/%s", i, queryFile)
fileAlias := queryFile
if relPath, relErr := filepath.Rel(s.BaseDir, queryFile); relErr == nil {
fileAlias = relPath
}
fileAlias = fmt.Sprintf("rand_%d_%s", i, fileAlias)
if err := s.runQueryFile(ctx, queryFile, nil, &fileAlias); err != nil {
return err
}
Expand Down

0 comments on commit 89f35ec

Please sign in to comment.