Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/dev' into shift-left6-test
Browse files — browse the repository at this point in the history
  • Loading branch information
gailazar300 committed Dec 21, 2023
2 parents 24524ed + 3343867 commit 0d5f92a
Show file tree
Hide file tree
Showing 30 changed files with 583 additions and 597 deletions.
29 changes: 12 additions & 17 deletions artifactory/commands/buildinfo/addgit.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,16 +189,15 @@ func (config *BuildAddGitCommand) collectBuildIssues(vcsUrl string) ([]buildinfo
return config.DoCollect(config.issuesConfig, lastVcsRevision)
}

func (config *BuildAddGitCommand) DoCollect(issuesConfig *IssuesConfiguration, lastVcsRevision string) ([]buildinfo.AffectedIssue, error) {
var foundIssues []buildinfo.AffectedIssue
func (config *BuildAddGitCommand) DoCollect(issuesConfig *IssuesConfiguration, lastVcsRevision string) (foundIssues []buildinfo.AffectedIssue, err error) {
logRegExp, err := createLogRegExpHandler(issuesConfig, &foundIssues)
if err != nil {
return nil, err
return
}

errRegExp, err := createErrRegExpHandler(lastVcsRevision)
if err != nil {
return nil, err
return
}

// Get log with limit, starting from the latest commit.
Expand All @@ -207,36 +206,32 @@ func (config *BuildAddGitCommand) DoCollect(issuesConfig *IssuesConfiguration, l
// Change working dir to where .git is.
wd, err := os.Getwd()
if errorutils.CheckError(err) != nil {
return nil, err
return
}
defer func() {
e := os.Chdir(wd)
if err == nil {
err = errorutils.CheckError(e)
}
err = errors.Join(err, errorutils.CheckError(os.Chdir(wd)))
}()
err = os.Chdir(config.dotGitPath)
if errorutils.CheckError(err) != nil {
return nil, err
return
}

// Run git command.
_, _, exitOk, err := gofrogcmd.RunCmdWithOutputParser(logCmd, false, logRegExp, errRegExp)
if err != nil {
if _, ok := err.(RevisionRangeError); ok {
if errorutils.CheckError(err) != nil {
var revisionRangeError RevisionRangeError
if errors.As(err, &revisionRangeError) {
// Revision not found in range. Ignore and don't collect new issues.
log.Info(err.Error())
return []buildinfo.AffectedIssue{}, nil
}
return nil, errorutils.CheckError(err)
return
}
if !exitOk {
// May happen when trying to run git log for non-existing revision.
return nil, errorutils.CheckErrorf("failed executing git log command")
err = errorutils.CheckErrorf("failed executing git log command")
}

// Return found issues.
return foundIssues, nil
return
}

// Creates a regexp handler to parse and fetch issues from the output of the git log command.
Expand Down
3 changes: 1 addition & 2 deletions artifactory/commands/golang/go.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,7 @@ func getPackageVersion(repoName, packageName string, details auth.ServiceDetails
}
// Extract version from response
var version PackageVersionResponseContent
err = json.Unmarshal(body, &version)
if err != nil {
if err = json.Unmarshal(body, &version); err != nil {
return "", errorutils.CheckError(err)
}
return version.Version, nil
Expand Down
4 changes: 2 additions & 2 deletions artifactory/commands/gradle/gradle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

type GradleCommand struct {
tasks string
tasks []string
configPath string
configuration *utils.BuildConfiguration
serverDetails *config.ServerDetails
Expand Down Expand Up @@ -179,7 +179,7 @@ func (gc *GradleCommand) SetConfigPath(configPath string) *GradleCommand {
return gc
}

func (gc *GradleCommand) SetTasks(tasks string) *GradleCommand {
func (gc *GradleCommand) SetTasks(tasks []string) *GradleCommand {
gc.tasks = tasks
return gc
}
Expand Down
1 change: 1 addition & 0 deletions artifactory/commands/transferfiles/api/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
type ProcessStatusType string
type ChunkFileStatusType string
type ChunkId string
type NodeId string

const (
Done ProcessStatusType = "DONE"
Expand Down
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/fileserror.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (e *errorsRetryPhase) handleErrorsFile(errFilePath string, pcWrapper *produ
// Since we're about to handle the transfer retry of the failed files,
// we should now decrement the failures counter view.
e.progressBar.changeNumberOfFailuresBy(-1 * len(failedFiles.Errors))
err = e.stateManager.ChangeTransferFailureCountBy(uint(len(failedFiles.Errors)), false)
err = e.stateManager.ChangeTransferFailureCountBy(uint64(len(failedFiles.Errors)), false)
if err != nil {
return err
}
Expand Down
45 changes: 27 additions & 18 deletions artifactory/commands/transferfiles/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package transferfiles
import (
"context"
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -255,7 +256,7 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa
curTokensBatch := api.UploadChunksStatusBody{}
chunksLifeCycleManager := ChunksLifeCycleManager{
deletedChunksSet: datastructures.MakeSet[api.ChunkId](),
nodeToChunksMap: make(map[nodeId]map[api.ChunkId]UploadedChunkData),
nodeToChunksMap: make(map[api.NodeId]map[api.ChunkId]UploadedChunkData),
}
curProcessedUploadChunks = 0
var timeEstMng *state.TimeEstimationManager
Expand All @@ -269,28 +270,31 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa
}
time.Sleep(waitTimeBetweenChunkStatusSeconds * time.Second)

// Run once per 3 minutes
if i%60 == 0 {
// Run once per 5 minutes
if i%100 == 0 {
// 'Working threads' are determined by how many upload chunks are currently being processed by the source Artifactory instance.
if err := phaseBase.stateManager.SetWorkingThreads(curProcessedUploadChunks); err != nil {
log.Error("Couldn't set the current number of working threads:", err.Error())
}
log.Debug("There are", len(phaseBase.stateManager.StaleChunks), "chunks in transit for more than 30 minutes")
log.Debug(fmt.Sprintf("Chunks in transit: %v", chunksLifeCycleManager.GetNodeIdToChunkIdsMap()))
}

// Each uploading thread receives a token and a node id from the source via the uploadChunkChan, so this go routine can poll on its status.
fillChunkDataBatch(&chunksLifeCycleManager, uploadChunkChan)
activeChunks := fillChunkDataBatch(&chunksLifeCycleManager, uploadChunkChan)
if err := chunksLifeCycleManager.StoreStaleChunks(phaseBase.stateManager); err != nil {
log.Error("Couldn't store the stale chunks:", err.Error())
}
// When totalChunks size is zero, it means that all the tokens are uploaded,
// we received 'DONE' for all of them, and we notified the source that they can be deleted from the memory.
// If during the polling some chunks data were lost due to network issues, either on the client or on the source,
// it will be written to the error channel
if chunksLifeCycleManager.totalChunks == 0 {
if activeChunks == 0 {
if shouldStopPolling(doneChan) {
log.Debug("Stopping to poll on uploads...")
return
}
log.Debug("Active chunks counter is 0, but the 'done' signal hasn't been received yet")
continue
}

Expand All @@ -309,22 +313,25 @@ func pollUploads(phaseBase *phaseBase, srcUpService *srcUserPluginService, uploa
}

// Fill chunk data batch till full. Return if no new chunk data is available.
func fillChunkDataBatch(chunksLifeCycleManager *ChunksLifeCycleManager, uploadChunkChan chan UploadedChunk) {
for chunksLifeCycleManager.totalChunks < GetChunkUploaderThreads() {
func fillChunkDataBatch(chunksLifeCycleManager *ChunksLifeCycleManager, uploadChunkChan chan UploadedChunk) (activeChunks int) {
for _, activeNodeChunks := range chunksLifeCycleManager.nodeToChunksMap {
activeChunks += len(activeNodeChunks)
}
for ; activeChunks < GetChunkUploaderThreads(); activeChunks++ {
select {
case data := <-uploadChunkChan:
currentNodeId := nodeId(data.NodeId)
currentNodeId := api.NodeId(data.NodeId)
currentChunkId := api.ChunkId(data.UuidToken)
if _, exist := chunksLifeCycleManager.nodeToChunksMap[currentNodeId]; !exist {
chunksLifeCycleManager.nodeToChunksMap[currentNodeId] = make(map[api.ChunkId]UploadedChunkData)
}
chunksLifeCycleManager.nodeToChunksMap[currentNodeId][currentChunkId] = data.UploadedChunkData
chunksLifeCycleManager.totalChunks++
default:
// No new tokens are waiting.
return
}
}
return
}

func shouldStopPolling(doneChan chan bool) bool {
Expand Down Expand Up @@ -378,14 +385,13 @@ func handleChunksStatuses(phase *phaseBase, chunksStatus *api.UploadChunksStatus
reduceCurProcessedChunks()
log.Debug("Received status DONE for chunk '" + chunk.UuidToken + "'")

chunkSentTime := chunksLifeCycleManager.nodeToChunksMap[nodeId(chunksStatus.NodeId)][api.ChunkId(chunk.UuidToken)].TimeSent
chunkSentTime := chunksLifeCycleManager.nodeToChunksMap[api.NodeId(chunksStatus.NodeId)][api.ChunkId(chunk.UuidToken)].TimeSent
err := updateProgress(phase, timeEstMng, chunk, chunkSentTime)
if err != nil {
log.Error("Unexpected error in progress update: " + err.Error())
continue
}
delete(chunksLifeCycleManager.nodeToChunksMap[nodeId(chunksStatus.NodeId)], api.ChunkId(chunk.UuidToken))
chunksLifeCycleManager.totalChunks--
delete(chunksLifeCycleManager.nodeToChunksMap[api.NodeId(chunksStatus.NodeId)], api.ChunkId(chunk.UuidToken))
// Using the deletedChunksSet, we inform the source that the 'DONE' message has been received, and it no longer has to keep those chunks UUIDs.
chunksLifeCycleManager.deletedChunksSet.Add(api.ChunkId(chunk.UuidToken))
stopped := handleFilesOfCompletedChunk(chunk.Files, errorsChannelMng)
Expand Down Expand Up @@ -422,17 +428,21 @@ func updateProgress(phase *phaseBase, timeEstMng *state.TimeEstimationManager,
}

// Verify and handle in progress chunks synchronization between the CLI and the Source Artifactory instance
func checkChunkStatusSync(chunkStatus *api.UploadChunksStatusResponse, manager *ChunksLifeCycleManager, errorsChannelMng *ErrorsChannelMng) {
func checkChunkStatusSync(chunkStatus *api.UploadChunksStatusResponse, chunksLifeCycleManager *ChunksLifeCycleManager, errorsChannelMng *ErrorsChannelMng) {
// Compare between the number of chunks received from the latest syncChunks request to the chunks data we handle locally in nodeToChunksMap.
// If the number of the in progress chunks of a node within nodeToChunksMap differs from the chunkStatus received, there is missing data on the source side.
if len(chunkStatus.ChunksStatus) != len(manager.nodeToChunksMap[nodeId(chunkStatus.NodeId)]) {
expectedChunksInNode := len(chunksLifeCycleManager.nodeToChunksMap[api.NodeId(chunkStatus.NodeId)])
actualChunksInNode := len(chunkStatus.ChunksStatus)
if actualChunksInNode != expectedChunksInNode {
log.Info(fmt.Printf("NodeID %s: Missing chunks detected. Expected: %d, Received: %d. Storing absent chunks in error channels for later retry.",
chunkStatus.NodeId, expectedChunksInNode, actualChunksInNode))
// Get all the chunks uuids on the Artifactory side in a set of uuids
chunksUuidsSetFromResponse := datastructures.MakeSet[api.ChunkId]()
for _, chunk := range chunkStatus.ChunksStatus {
chunksUuidsSetFromResponse.Add(api.ChunkId(chunk.UuidToken))
}
// Get all the chunks uuids on the CLI side
chunksUuidsSliceFromMap := manager.GetInProgressTokensSliceByNodeId(nodeId(chunkStatus.NodeId))
chunksUuidsSliceFromMap := chunksLifeCycleManager.GetInProgressTokensSliceByNodeId(api.NodeId(chunkStatus.NodeId))
failedFile := api.FileUploadStatusResponse{
Status: api.Fail,
StatusCode: SyncErrorStatusCode,
Expand All @@ -442,13 +452,12 @@ func checkChunkStatusSync(chunkStatus *api.UploadChunksStatusResponse, manager *
// Missing chunks are those that are inside chunksUuidsSliceFromMap but not in chunksUuidsSetFromResponse
for _, chunkUuid := range chunksUuidsSliceFromMap {
if !chunksUuidsSetFromResponse.Exists(chunkUuid) {
for _, file := range manager.nodeToChunksMap[nodeId(chunkStatus.NodeId)][chunkUuid].ChunkFiles {
for _, file := range chunksLifeCycleManager.nodeToChunksMap[api.NodeId(chunkStatus.NodeId)][chunkUuid].ChunkFiles {
failedFile.FileRepresentation = file
// errorsChannelMng will upload failed files again in phase 3 or in an additional transfer file run.
addErrorToChannel(errorsChannelMng, failedFile)
}
delete(manager.nodeToChunksMap[nodeId(chunkStatus.NodeId)], chunkUuid)
manager.totalChunks--
delete(chunksLifeCycleManager.nodeToChunksMap[api.NodeId(chunkStatus.NodeId)], chunkUuid)
reduceCurProcessedChunks()
}
}
Expand Down
2 changes: 1 addition & 1 deletion artifactory/commands/transferfiles/state/runstatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ type TransferRunStatus struct {
WorkingThreads int `json:"working_threads,omitempty"`
VisitedFolders uint64 `json:"visited_folders,omitempty"`
DelayedFiles uint64 `json:"delayed_files,omitempty"`
TransferFailures uint `json:"transfer_failures,omitempty"`
TransferFailures uint64 `json:"transfer_failures,omitempty"`
TimeEstimationManager `json:"time_estimation,omitempty"`
StaleChunks []StaleChunks `json:"stale_chunks,omitempty"`
}
Expand Down
45 changes: 20 additions & 25 deletions artifactory/commands/transferfiles/state/statemanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,53 +136,56 @@ func (ts *TransferStateManager) SetRepoFullTransferCompleted() error {
// Increasing Transferred Diff files (modified files) and SizeByBytes value in suitable repository progress state
func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase1(chunkTotalFiles, chunkTotalSizeInBytes int64) error {
err := ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase1Info.TransferredSizeBytes += chunkTotalSizeInBytes
state.CurrentRepo.Phase1Info.TransferredUnits += chunkTotalFiles
atomicallyAddInt64(&state.CurrentRepo.Phase1Info.TransferredSizeBytes, chunkTotalSizeInBytes)
atomicallyAddInt64(&state.CurrentRepo.Phase1Info.TransferredUnits, chunkTotalFiles)
return nil
})
if err != nil {
return err
}
return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error {
transferRunStatus.OverallTransfer.TransferredSizeBytes += chunkTotalSizeInBytes
transferRunStatus.OverallTransfer.TransferredUnits += chunkTotalFiles
atomicallyAddInt64(&transferRunStatus.OverallTransfer.TransferredSizeBytes, chunkTotalSizeInBytes)
atomicallyAddInt64(&transferRunStatus.OverallTransfer.TransferredUnits, chunkTotalFiles)

if transferRunStatus.BuildInfoRepo {
transferRunStatus.OverallBiFiles.TransferredUnits += chunkTotalFiles
atomicallyAddInt64(&transferRunStatus.OverallBiFiles.TransferredUnits, chunkTotalFiles)
}
return nil
})
}

func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase2(chunkTotalFiles, chunkTotalSizeInBytes int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase2Info.TransferredSizeBytes += chunkTotalSizeInBytes
state.CurrentRepo.Phase2Info.TransferredUnits += chunkTotalFiles
atomicallyAddInt64(&state.CurrentRepo.Phase2Info.TransferredSizeBytes, chunkTotalSizeInBytes)
atomicallyAddInt64(&state.CurrentRepo.Phase2Info.TransferredUnits, chunkTotalFiles)
return nil
})
}

func (ts *TransferStateManager) IncTotalSizeAndFilesPhase2(filesNumber, totalSize int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase2Info.TotalSizeBytes += totalSize
state.CurrentRepo.Phase2Info.TotalUnits += filesNumber
atomicallyAddInt64(&state.CurrentRepo.Phase2Info.TotalSizeBytes, totalSize)
atomicallyAddInt64(&state.CurrentRepo.Phase2Info.TotalUnits, filesNumber)
return nil
})
}

// Set relevant information of files and storage we need to transfer in phase3
func (ts *TransferStateManager) SetTotalSizeAndFilesPhase3(filesNumber, totalSize int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase3Info.TotalSizeBytes = totalSize
state.CurrentRepo.Phase3Info.TotalUnits = filesNumber
state.CurrentRepo.Phase3Info.TransferredUnits = 0
state.CurrentRepo.Phase3Info.TransferredSizeBytes = 0
atomicallyAddInt64(&state.CurrentRepo.Phase3Info.TotalSizeBytes, totalSize)
atomicallyAddInt64(&state.CurrentRepo.Phase3Info.TotalUnits, filesNumber)
return nil
})
}

// Increase transferred storage and files in phase 3
func (ts *TransferStateManager) IncTransferredSizeAndFilesPhase3(chunkTotalFiles, chunkTotalSizeInBytes int64) error {
return ts.TransferState.Action(func(state *TransferState) error {
state.CurrentRepo.Phase3Info.TransferredSizeBytes += chunkTotalSizeInBytes
state.CurrentRepo.Phase3Info.TransferredUnits += chunkTotalFiles
atomicallyAddInt64(&state.CurrentRepo.Phase3Info.TransferredSizeBytes, chunkTotalSizeInBytes)
atomicallyAddInt64(&state.CurrentRepo.Phase3Info.TransferredUnits, chunkTotalFiles)
return nil
})
}
Expand Down Expand Up @@ -288,29 +291,21 @@ func (ts *TransferStateManager) GetDiffHandlingRange() (start, end time.Time, er

func (ts *TransferStateManager) IncVisitedFolders() error {
return ts.action(func(transferRunStatus *TransferRunStatus) error {
transferRunStatus.VisitedFolders++
atomicallyAddUint64(&transferRunStatus.VisitedFolders, 1, true)
return nil
})
}

func (ts *TransferStateManager) ChangeDelayedFilesCountBy(count uint64, increase bool) error {
return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error {
if increase {
transferRunStatus.DelayedFiles += count
} else {
transferRunStatus.DelayedFiles -= count
}
atomicallyAddUint64(&transferRunStatus.DelayedFiles, count, increase)
return nil
})
}

func (ts *TransferStateManager) ChangeTransferFailureCountBy(count uint, increase bool) error {
func (ts *TransferStateManager) ChangeTransferFailureCountBy(count uint64, increase bool) error {
return ts.TransferRunStatus.action(func(transferRunStatus *TransferRunStatus) error {
if increase {
transferRunStatus.TransferFailures += count
} else {
transferRunStatus.TransferFailures -= count
}
atomicallyAddUint64(&transferRunStatus.TransferFailures, count, increase)
return nil
})
}
Expand Down
Loading

0 comments on commit 0d5f92a

Please sign in to comment.