Skip to content

Commit

Permalink
SharedArrays
Browse files Browse the repository at this point in the history
  • Loading branch information
montyvesselinov committed Feb 2, 2024
1 parent 380a8ad commit 62a71fa
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 20 deletions.
2 changes: 1 addition & 1 deletion examples/parallel/distributed_test.jl
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ sol .= 0
@Distributed.everywhere function_test(x) = rand(8) .+ sum(x)

@sync @Distributed.distributed for i = 1:5
so[i, :] .= function_test(sil[i, :])
so[i, :] .= function_test(allparameters[i, :])
end
45 changes: 26 additions & 19 deletions src/MadsForward.jl
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import OrderedCollections
import DocumentFunction
import DelimitedFiles
import Distributed
import SharedArrays

function forward(madsdata::AbstractDict; all::Bool=false)
paramdict = Mads.getparamdict(madsdata)
Expand Down Expand Up @@ -61,17 +62,13 @@ function forward(madsdata::AbstractDict, paramarray::AbstractArray; parallel::Bo
Mads.madswarn("Incorrect array size: size(paramarray) = $(size(paramarray))")
return
elseif nrow == np
nr = ncol
if ncol == np
Mads.madswarn("Matrix columns assumed to represent the parameters!")
end
ncases = ncol
elseif ncol == np
np = ncol
nr = nrow
ncases = nrow
end
else
np = s[1]
nr = 1
ncases = 1
end
if all
madsdata_c = deepcopy(madsdata)
Expand All @@ -83,37 +80,47 @@ function forward(madsdata::AbstractDict, paramarray::AbstractArray; parallel::Bo
else
madsdata_c = madsdata
end
f = Mads.makearrayfunction(madsdata_c)
func_forward = Mads.makearrayfunction(madsdata_c)
local r
if length(s) == 2
local rv
restartdir = getrestartdir(madsdata_c)
if checkpointfrequency != 0 && restartdir != ""
@info("RobustPmap for parallel execution of forward runs ...")
if s[2] == np
rv = RobustPmap.crpmap(i->f(vec(paramarray[i, :])), checkpointfrequency, joinpath(restartdir, checkpointfilename), 1:nr)
rv = RobustPmap.crpmap(i->func_forward(vec(paramarray[i, :])), checkpointfrequency, joinpath(restartdir, checkpointfilename), 1:nr)
else
rv = RobustPmap.crpmap(i->f(vec(paramarray[:, i])), checkpointfrequency, joinpath(restartdir, checkpointfilename), 1:nr)
rv = RobustPmap.crpmap(i->func_forward(vec(paramarray[:, i])), checkpointfrequency, joinpath(restartdir, checkpointfilename), 1:nr)
end
r = hcat(collect.(values.(rv))...)
elseif parallel && Distributed.nprocs() > 1
@info("Distributed.pmap for parallel execution of forward runs ...")
@info("Parallel execution of forward runs ...")
Distributed.@everywhere madsdata_c = $madsdata_c
Distributed.@everywhere func_forward = Mads.makearrayfunction(madsdata_c)
psa = SharedArrays.SharedArray{Float64}(ncases, np)
if s[2] == np
rv = Distributed.pmap(i->f(vec(paramarray[i, :])), 1:nr)
rv1 = collect(values(func_forward(vec(paramarray[1, :]))))
psa .= collect(paramarray) # collect to avoid issues if paramarray is a SharedArray
else
rv = Distributed.pmap(i->f(vec(paramarray[:, i])), 1:nr)
rv1 = collect(values(func_forward(vec(paramarray[:, 1]))))
psa .= permutedims(collect(paramarray)) # collect to avoid issues if paramarray is a SharedArray
end
r = SharedArrays.SharedArray{Float64}(length(rv1), ncases)
r[:, 1] = rv1
@sync @Distributed.distributed for i = 2:ncases
func_forward = Mads.makearrayfunction(madsdata_c)
r[:, i] = collect(values(func_forward(vec(psa[i, :]))))
end
r = hcat(collect.(values.(rv))...)
else
@info("Serial execution of forward runs ...")
rv = Array{Array{Float64}}(undef, nr)
rv = Array{Array{Float64}}(undef, ncases)
if s[2] == np
@ProgressMeter.showprogress 4 for i = 1:nr
rv[i] = collect(values(f(vec(paramarray[i, :]))))
@ProgressMeter.showprogress 4 for i = 1:ncases
rv[i] = collect(values(func_forward(vec(paramarray[i, :]))))
end
else
@ProgressMeter.showprogress 4 for i = 1:nr
rv[i] = collect(values(f(vec(paramarray[:, i]))))
@ProgressMeter.showprogress 4 for i = 1:ncases
rv[i] = collect(values(func_forward(vec(paramarray[:, i]))))
end
end
r = hcat(rv...)
Expand Down

0 comments on commit 62a71fa

Please sign in to comment.