Skip to content

Commit

Permalink
Required nodes shall not schedule nodes if dependencies are not avail…
Browse files Browse the repository at this point in the history
…able

This reverts commit 0aa7856.
  • Loading branch information
pchalamet committed Jun 6, 2024
1 parent 0aa7856 commit e717ef0
Showing 1 changed file with 158 additions and 159 deletions.
317 changes: 158 additions & 159 deletions src/Terrabuild/Core/Build.fs
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ let run (configuration: Configuration.Workspace) (graph: Graph.Workspace) (cache
| _ -> NodeStatus.Unfulfilled nodeInfo

let buildNode (node: Graph.Node) =
// determine if step node can be reused or not
let useRemoteCache = Cacheability.Never <> (node.Cache &&& cacheMode)
let cacheEntryId = $"{node.ProjectHash}/{node.Target}/{node.Hash}"

notification.NodeDownloading node
let isAllSatisfied =
node.Dependencies
Expand All @@ -135,152 +139,150 @@ let run (configuration: Configuration.Workspace) (graph: Graph.Workspace) (cache
|> Seq.isEmpty

if isAllSatisfied then
let projectDirectory =
match node.Project with
| FS.Directory projectDirectory -> projectDirectory
| FS.File projectFile -> FS.parentDirectory projectFile
| _ -> "."

let cacheEntryId = $"{node.ProjectHash}/{node.Target}/{node.Hash}"

// check first if it's possible to restore previously built state
let summary =
if options.Force || node.Cache = Cacheability.Never then None
else
// determine if step node can be reused or not
let useRemoteCache = Cacheability.Never <> (node.Cache &&& cacheMode)

// get task execution summary & take care of retrying failed tasks
match cache.TryGetSummary useRemoteCache cacheEntryId with
| Some summary when summary.Status = Cache.TaskStatus.Failure && options.Retry -> None
| Some summary -> Some summary
| _ -> None

match summary with
| Some summary ->
Log.Debug("{Hash}: Restoring '{Project}/{Target}' from cache", node.Hash, node.Project, node.Target)
match summary.Outputs with
| Some outputs ->
let files = IO.enumerateFiles outputs
IO.copyFiles projectDirectory outputs files |> ignore
| _ -> ()
Some summary, true

| _ ->
Log.Debug("{Hash}: Building '{Project}/{Target}'", node.Hash, node.Project, node.Target)
let cacheEntry = cache.CreateEntry configuration.SourceControl.CI cacheEntryId
notification.NodeBuilding node

// NOTE:
// we use ProjectHash here because it's interesting from a cache perspective
// some binaries could have been cached in homedir, let's reuse them if available
let homeDir = cache.CreateHomeDir node.ProjectHash

let allCommands =
node.CommandLines
|> List.collect (fun batch ->
batch.Actions |> List.mapi (fun index commandLine ->
let cmd = "docker"
let wsDir = Environment.CurrentDirectory

let getContainerUser (container: string) =
match containerInfos.TryGetValue(container) with
| true, whoami ->
Log.Debug("Reusing USER {whoami} for {container}", whoami, container)
whoami
| _ ->
// discover USER
let args = $"run --rm --name {node.Hash} --entrypoint whoami {container}"
let whoami =
Log.Debug("Identifying USER for {container}", container)
match Exec.execCaptureOutput workspaceDir cmd args with
| Exec.Success (whoami, 0) -> whoami.Trim()
| _ ->
Log.Debug("USER identification failed for {container}: using root", container)
"root"

Log.Debug("Using USER {whoami} for {container}", whoami, container)
containerInfos.TryAdd(container, whoami) |> ignore
whoami

let metaCommand =
if index = 0 then batch.MetaCommand
else "+++"

match batch.Container with
| None -> metaCommand, projectDirectory, commandLine.Command, commandLine.Arguments, batch.Container
| Some container ->
let whoami = getContainerUser container
let envs =
batch.ContainerVariables
|> Seq.map (fun var -> $"-e {var}")
|> String.join " "
let args = $"run --rm --net=host --name {node.Hash} -v /var/run/docker.sock:/var/run/docker.sock -v {homeDir}:/{whoami} -v {wsDir}:/terrabuild -w /terrabuild/{projectDirectory} --entrypoint {commandLine.Command} {envs} {container} {commandLine.Arguments}"
metaCommand, workspaceDir, cmd, args, batch.Container))

let beforeFiles =
if node.IsLeaf then IO.Snapshot.Empty // FileSystem.createSnapshot projectDirectory node.Outputs
else IO.createSnapshot node.Outputs projectDirectory

let stepLogs = List<Cache.StepSummary>()
let mutable lastExitCode = 0
let mutable cmdLineIndex = 0
let cmdFirstStartedAt = DateTime.UtcNow
let mutable cmdLastEndedAt = cmdFirstStartedAt

while cmdLineIndex < allCommands.Length && lastExitCode = 0 do
let startedAt =
if cmdLineIndex > 0 then DateTime.UtcNow
else cmdFirstStartedAt
let metaCommand, workDir, cmd, args, container = allCommands[cmdLineIndex]
let logFile = cacheEntry.NextLogFile()
cmdLineIndex <- cmdLineIndex + 1

Log.Debug("{Hash}: Running '{Command}' with '{Arguments}'", node.Hash, cmd, args)
let exitCode = Exec.execCaptureTimestampedOutput workDir cmd args logFile
cmdLastEndedAt <- DateTime.UtcNow
let endedAt = cmdLastEndedAt
let duration = endedAt - startedAt
let stepLog = { Cache.StepSummary.MetaCommand = metaCommand
Cache.StepSummary.Command = cmd
Cache.StepSummary.Arguments = args
Cache.StepSummary.Container = container
Cache.StepSummary.StartedAt = startedAt
Cache.StepSummary.EndedAt = endedAt
Cache.StepSummary.Duration = duration
Cache.StepSummary.Log = logFile
Cache.StepSummary.ExitCode = exitCode }
stepLog |> stepLogs.Add
lastExitCode <- exitCode
Log.Debug("{Hash}: Execution completed with '{Code}'", node.Hash, exitCode)

notification.NodeUploading node
let afterFiles = IO.createSnapshot node.Outputs projectDirectory

// keep only new or modified files
let newFiles = afterFiles - beforeFiles

// create an archive with new files
let outputs = IO.copyFiles cacheEntry.Outputs projectDirectory newFiles

let status =
if lastExitCode = 0 then
Log.Debug("{Hash}: Marking as success", node.Hash)
Cache.TaskStatus.Success
if node.Required then
let projectDirectory =
match node.Project with
| FS.Directory projectDirectory -> projectDirectory
| FS.File projectFile -> FS.parentDirectory projectFile
| _ -> "."

// check first if it's possible to restore previously built state
let summary =
if options.Force || node.Cache = Cacheability.Never then None
else
Log.Debug("{Hash}: Marking as failed", node.Hash)
Cache.TaskStatus.Failure

let summary = { Cache.TargetSummary.Project = node.Project
Cache.TargetSummary.Target = node.Target
Cache.TargetSummary.Steps = stepLogs |> List.ofSeq
Cache.TargetSummary.Outputs = outputs
Cache.TargetSummary.Status = status
Cache.TargetSummary.StartedAt = cmdFirstStartedAt
Cache.TargetSummary.EndedAt = cmdLastEndedAt }
let files, size = cacheEntry.Complete summary
api |> Option.iter (fun api -> api.BuildAddArtifact buildId node.Project node.Target node.ProjectHash node.Hash files size (status = Cache.TaskStatus.Success))
Some summary, false
// get task execution summary & take care of retrying failed tasks
match cache.TryGetSummary useRemoteCache cacheEntryId with
| Some summary when summary.Status = Cache.TaskStatus.Failure && options.Retry -> None
| Some summary -> Some summary
| _ -> None

match summary with
| Some summary ->
Log.Debug("{Hash}: Restoring '{Project}/{Target}' from cache", node.Hash, node.Project, node.Target)
match summary.Outputs with
| Some outputs ->
let files = IO.enumerateFiles outputs
IO.copyFiles projectDirectory outputs files |> ignore
| _ -> ()
Some summary, true

| _ ->
Log.Debug("{Hash}: Building '{Project}/{Target}'", node.Hash, node.Project, node.Target)
let cacheEntry = cache.CreateEntry configuration.SourceControl.CI cacheEntryId
notification.NodeBuilding node

// NOTE:
// we use ProjectHash here because it's interesting from a cache perspective
// some binaries could have been cached in homedir, let's reuse them if available
let homeDir = cache.CreateHomeDir node.ProjectHash

let allCommands =
node.CommandLines
|> List.collect (fun batch ->
batch.Actions |> List.mapi (fun index commandLine ->
let cmd = "docker"
let wsDir = Environment.CurrentDirectory

let getContainerUser (container: string) =
match containerInfos.TryGetValue(container) with
| true, whoami ->
Log.Debug("Reusing USER {whoami} for {container}", whoami, container)
whoami
| _ ->
// discover USER
let args = $"run --rm --name {node.Hash} --entrypoint whoami {container}"
let whoami =
Log.Debug("Identifying USER for {container}", container)
match Exec.execCaptureOutput workspaceDir cmd args with
| Exec.Success (whoami, 0) -> whoami.Trim()
| _ ->
Log.Debug("USER identification failed for {container}: using root", container)
"root"

Log.Debug("Using USER {whoami} for {container}", whoami, container)
containerInfos.TryAdd(container, whoami) |> ignore
whoami

let metaCommand =
if index = 0 then batch.MetaCommand
else "+++"

match batch.Container with
| None -> metaCommand, projectDirectory, commandLine.Command, commandLine.Arguments, batch.Container
| Some container ->
let whoami = getContainerUser container
let envs =
batch.ContainerVariables
|> Seq.map (fun var -> $"-e {var}")
|> String.join " "
let args = $"run --rm --net=host --name {node.Hash} -v /var/run/docker.sock:/var/run/docker.sock -v {homeDir}:/{whoami} -v {wsDir}:/terrabuild -w /terrabuild/{projectDirectory} --entrypoint {commandLine.Command} {envs} {container} {commandLine.Arguments}"
metaCommand, workspaceDir, cmd, args, batch.Container))

let beforeFiles =
if node.IsLeaf then IO.Snapshot.Empty // FileSystem.createSnapshot projectDirectory node.Outputs
else IO.createSnapshot node.Outputs projectDirectory

let stepLogs = List<Cache.StepSummary>()
let mutable lastExitCode = 0
let mutable cmdLineIndex = 0
let cmdFirstStartedAt = DateTime.UtcNow
let mutable cmdLastEndedAt = cmdFirstStartedAt

while cmdLineIndex < allCommands.Length && lastExitCode = 0 do
let startedAt =
if cmdLineIndex > 0 then DateTime.UtcNow
else cmdFirstStartedAt
let metaCommand, workDir, cmd, args, container = allCommands[cmdLineIndex]
let logFile = cacheEntry.NextLogFile()
cmdLineIndex <- cmdLineIndex + 1

Log.Debug("{Hash}: Running '{Command}' with '{Arguments}'", node.Hash, cmd, args)
let exitCode = Exec.execCaptureTimestampedOutput workDir cmd args logFile
cmdLastEndedAt <- DateTime.UtcNow
let endedAt = cmdLastEndedAt
let duration = endedAt - startedAt
let stepLog = { Cache.StepSummary.MetaCommand = metaCommand
Cache.StepSummary.Command = cmd
Cache.StepSummary.Arguments = args
Cache.StepSummary.Container = container
Cache.StepSummary.StartedAt = startedAt
Cache.StepSummary.EndedAt = endedAt
Cache.StepSummary.Duration = duration
Cache.StepSummary.Log = logFile
Cache.StepSummary.ExitCode = exitCode }
stepLog |> stepLogs.Add
lastExitCode <- exitCode
Log.Debug("{Hash}: Execution completed with '{Code}'", node.Hash, exitCode)

notification.NodeUploading node
let afterFiles = IO.createSnapshot node.Outputs projectDirectory

// keep only new or modified files
let newFiles = afterFiles - beforeFiles

// create an archive with new files
let outputs = IO.copyFiles cacheEntry.Outputs projectDirectory newFiles

let status =
if lastExitCode = 0 then
Log.Debug("{Hash}: Marking as success", node.Hash)
Cache.TaskStatus.Success
else
Log.Debug("{Hash}: Marking as failed", node.Hash)
Cache.TaskStatus.Failure

let summary = { Cache.TargetSummary.Project = node.Project
Cache.TargetSummary.Target = node.Target
Cache.TargetSummary.Steps = stepLogs |> List.ofSeq
Cache.TargetSummary.Outputs = outputs
Cache.TargetSummary.Status = status
Cache.TargetSummary.StartedAt = cmdFirstStartedAt
Cache.TargetSummary.EndedAt = cmdLastEndedAt }
let files, size = cacheEntry.Complete summary
api |> Option.iter (fun api -> api.BuildAddArtifact buildId node.Project node.Target node.ProjectHash node.Hash files size (status = Cache.TaskStatus.Success))
Some summary, false
else
cache.TryGetSummaryOnly useRemoteCache cacheEntryId, false
else
None, false

Expand All @@ -292,20 +294,17 @@ let run (configuration: Configuration.Workspace) (graph: Graph.Workspace) (cache
let rec queueAction (nodeId: string) =
let node = graph.Nodes[nodeId]

if node.Required then
let summary, restored = buildNode node
notification.NodeCompleted node restored summary
restoredNodes.TryAdd(nodeId, restored) |> ignore

// schedule children nodes if ready
let triggers = reverseIncomings[nodeId]
for trigger in triggers do
let newValue = System.Threading.Interlocked.Decrement(readyNodes[trigger])
if newValue = 0 then
readyNodes[trigger].Value <- -1 // mark node as scheduled
buildQueue.Enqueue (fun () -> queueAction trigger)
else
restoredNodes.TryAdd(nodeId, false) |> ignore
let summary, restored = buildNode node
notification.NodeCompleted node restored summary
restoredNodes.TryAdd(nodeId, restored) |> ignore

// schedule children nodes if ready
let triggers = reverseIncomings[nodeId]
for trigger in triggers do
let newValue = System.Threading.Interlocked.Decrement(readyNodes[trigger])
if newValue = 0 then
readyNodes[trigger].Value <- -1 // mark node as scheduled
buildQueue.Enqueue (fun () -> queueAction trigger)


readyNodes
Expand Down

0 comments on commit e717ef0

Please sign in to comment.