Astro 528, Lab 6, Exercise 2¶
Parallelization for Multi-Core Workstations (using multiple processes)¶
Hardware configuration for this exercise¶
In this lab exercise, we'll explore a few different ways that we can parallelize calculations across multiple cores of a single workstation or server. Some of the syntax and programming patterns are very similar to what you'll use later when we parallelize our code for distributed computing (i.e., over a number of processors that do not have direct access to the same memory).
Fortunately, most modern workstations and even laptops have multiple cores. As of Fall 2023, most of Roar Collab's "standard" compute nodes have 48 cores. (The high-memory nodes have 40 cores. The basic nodes have 80 cores. Various GPU nodes have 24-48 CPU cores.)
For this exercise, it's important that you actually have access to multiple processor cores. If you're using Julia installed on your own machine, then you don't need to do anything special before starting the lab. However, if you're using the Roar Collab portal to access the Jupyter Lab server, then you will need to have requested multiple processor cores when you first submitted the request for the BYOE JupyterLab server using the box labeled "Number of Cores", i.e. before you start executing cells in this notebook. Here's a screenshot showing the screen where you specify multiple cores.
While we're in class, please ask for just 4 cores, since there are many of us using the system at once.
If you return to working on the lab outside of class (and not right before the deadline), then feel free to try benchmarking the code using 8, 12, or even 16 cores. If you do ask for several cores, then please be extra diligent about deleting your session via the Roar Collab Portal when you're done, so they become available to other members of the class.
Why Jupyter?¶
For this exercise, we're using a Jupyter notebook, instead of a Pluto notebook. Why? In the previous exercise, we parallelized our code using one process that organized its computation into multiple threads, all within a Pluto notebook. Pluto worked well for that kind of parallelism. In contrast, for this exercise, we're going to parallelize our code using multiple processes. Pluto already uses multiple processes to analyze, run and display the Pluto notebook, so you can't (easily) parallelize your code over multiple processes within a Pluto notebook environment.
Since each Jupyter notebook is executed by its own Julia kernel, it is more amenable to parallelization via multiple processes.
Multi-process vs Multi-threading¶
As discussed in the previous exercise, both approaches are viable ways to parallelize code over the multiple processor cores contained within one workstation/compute node/server. Multiple threads within a single process all have access to the memory that the operating system (OS) has assigned to that process. In contrast, different processes are assigned separate memory spaces. While it is possible to send data from one process to another, it needs to be done explicitly. For simple codes, I usually start with multi-threading, as it tends to be a little easier to implement and is often more efficient. For more complex codes (e.g., integrating PDEs), there can be advantages to using multiple processes (e.g., reducing the need for memory locks). However, the main reason I want students to get experience parallelizing code using multiple processes is that this approach is necessary to scale up the level of parallelism to use more compute cores than exist in a single compute node. If we write (and test) our code to run in parallel using multiple processes on one workstation, then relatively little additional work is needed to implement a distributed-memory parallelization.
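As a minimal, hypothetical sketch of that explicit data movement (it assumes the Distributed package is loaded and that worker processes have already been added with addprocs, steps we won't actually take until later in this notebook), sending an array to a worker and computing on it there could look like this:
using Distributed
data = rand(1_000)                       # this array lives in the delegator's memory
w = first(workers())                     # pick one of the worker processes
# remotecall_fetch serializes `data`, ships it to worker w, runs `sum` there, and returns the result
total_on_worker = remotecall_fetch(sum, w, data)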
Installing packages¶
First, let's make sure that the packages we'll use are installed.
using Pkg
Pkg.UPDATED_REGISTRY_THIS_SESSION[] = true # Tell Julia package manager not to download an updated registry for speed's sake
Pkg.activate(".") # Project.toml specifies the packages required for this lab
Pkg.instantiate() # Install any packages needed
Activating project at `~/labs/lab6` Precompiling project... 1308.9 ms ✓ lab6 1 dependency successfully precompiled in 6 seconds. 260 already precompiled.
We'll want the packages that we plan to use for our computations below to be loaded and in scope on each worker process. In order to make sure that we don't trigger multiple worker processes trying to precompile packages at once, we'll first have the delegator process precompile all the packages used in this exercise.
Pkg.precompile()
Getting setup for parallel computation¶
First, let us load Julia's Distributed
module that provides much of the needed functionality for communicating across processes (that could be on the same node or different nodes).
using Distributed
nworkers()
1
Note that even if you have a Jupyter notebook server (or remote desktop or PBS job) that has been allocated multiple cores, that doesn't mean that you'll use them. By default, Julia starts as a single process running on a single core; that one process counts as the lone worker reported above.
To enable parallel computing, we need to tell Julia how many processor cores to use. By default, we're just using one. How many should we add?
Let's get some information about the hardware that we're running on.
Sys.cpu_summary()
Intel(R) Xeon(R) Gold 6248R CPU @ 3.00GHz: speed user nice sys idle irq #1-48 1795 MHz 200506 s 0 s 74314 s 362309376 s 9825 s
As discussed in exercise 1, just because the compute node we've been assigned has many cores, doesn't mean that we're allowed to use all of them.
We'll check if this notebook is running inside a Slurm (or PBS) job, and if so, use the number of cores that we've been assigned. If we're not inside a Slurm (or PBS) job, then we'll use the number of physical cores.
using CpuId
if haskey(ENV,"PBS_NUM_PPN") # Check PBS env variable if running on Roar Restricted
pbs_procs_per_node = parse(Int64,ENV["PBS_NUM_PPN"])
num_cores_to_use = pbs_procs_per_node
elseif haskey(ENV,"SLURM_CPUS_PER_TASK") # Check Slurm env variable if running on Roar Collab
num_cores_to_use = parse(Int64,ENV["SLURM_CPUS_PER_TASK"])
else # Presumably running locally and can use all cores
num_physical_cores = cpucores()
num_cores_to_use = num_physical_cores
end
4
2a. How many cores will you be using for this exercise (i.e., the result of cell above)?
response_2a = missing # Explicitly set to an integer (i.e., don't set it to num_cores_to_use, so we'll be able to provide appropriate feedback on your future responses.)
missing
Next, we'll tell Julia to add several worker processes.
if num_cores_to_use>1
workers_ids = addprocs(num_cores_to_use, exeflags="--threads=1")
end
4-element Vector{Int64}: 2 3 4 5
addprocs(N)
returned a list of ids that you can use to refer to specific workers within Julia. (We won't be using these, but they can be useful if you want finer-grained control.)
If you're ever unsure of how many workers you currently have available, you can run nworkers()
.
nworkers()
4
if length(workers_ids) >1
map(proc->fetch(@spawnat proc Threads.nthreads()) , workers_ids)
end
4-element Vector{Int64}: 1 1 1 1
Note that the list of workers now excludes 1 and the number of workers is equal to the number you added, rather than being one plus that number. Why? The original process is now in charge of delegating the work amongst the worker processes. (Historically, this process was referred to as the "master process", and you're likely to still see that terminology in online documentation and questions. I'll try to use "delegator process" instead.) When you have a lot of workers, it's often advantageous to let one process focus on managing all the communications and other overhead associated with parallelization. That way the worker processes are likely to stay more nearly in sync, rather than having to repeatedly wait for the delegator process to catch up. On the other hand, if there is relatively little overhead work for the delegator process, then it is often useful to set the number of workers equal to the number of available CPU cores, even though that means one process will have to do a little more work than the others. We'll try both below.
Loading Packages and modules¶
Now that we have loaded the Distributed
package, we can use the functions and macros that it provides to load the code and packages that we want to use during this exercise onto the other processes. Unlike when we were using multi-threading, with distributed computing each worker has its own chunk of memory, and the programmer can control whether code is loaded on every process or on just one/some.
When running on Roar Collab, we don't need to install or instantiate the packages on each worker, since we already installed them above and each worker has access to the same filesystem on Roar Collab.
Again, we'll use the @everywhere
macro in front of the using
statement to load packages on both the delegator and worker processes.
@everywhere using Distributions
Typing @everywhere
in front of every line of code would quickly get annoying. So it's better to put the common code that you want every worker to load into a separate file (and even better is to organize the code into one or more modules or packages). Then, we can load a whole bunch of functions that we'll want to use with one line.
In this exercise, we'll load the same code as in exercise 1 of this lab. (As before, you don't need to review all the code in src
unless you find it interesting or useful to see the programming patterns being demonstrated.) The main difference in this exercise is that we'll include the file src/lab6.jl
instead of the file model_spectrum.jl
. lab6.jl
includes the code from model_spectrum.jl
(and the other files that it includes) and wraps them in a module named lab6
. (In contrast, Pluto's @ingredients
macro automatically wraps code from a file into a module and returns it, so that Pluto can keep track of dependencies, which it needs to do to make a reactive notebook.)
We preface the include
with @everywhere
, since we want all of the processes to be able to make use of these functions and types.
@everywhere include("src/lab6.jl")
Now, we'll bring that module into scope. Note that since this is not a package, we need to include a .
to tell Julia that it can find the module in the current namespace, rather than needing to load a package.
using .lab6
Initialize data to be analyzed¶
In this exercise, we're going to create a model spectrum consisting of a continuum, stellar absorption lines, and telluric absorption lines.
The ModelSpectrum
module provides a SimulatedSpectrum
type, but we'll need to initialize a variable with some specific parameter values. The function below will do that for us.
"Create an object that provides a model for the raw spetrum (i.e., before entering the telescope)"
function make_spectrum_object(;lambda_min = 4500, lambda_max = 7500, flux_scale = 1.0,
num_star_lines = 200, num_telluric_lines = 100, limit_line_effect = 10.0)
continuum_param = flux_scale .* [1.0, 1e-5, -2e-8]
star_line_locs = rand(Uniform(lambda_min,lambda_max),num_star_lines)
star_line_widths = fill(1.0,num_star_lines)
star_line_depths = rand(Uniform(0,1.0),num_star_lines)
telluric_line_locs = rand(Uniform(lambda_min,lambda_max),num_telluric_lines)
telluric_line_widths = fill(0.2,num_telluric_lines)
telluric_line_depths = rand(Uniform(0,0.4),num_telluric_lines)
SimulatedSpectrum(star_line_locs,star_line_widths,star_line_depths,telluric_line_locs,telluric_line_widths,telluric_line_depths,continuum_param=continuum_param,lambda_mid=0.5*(lambda_min+lambda_max),limit_line_effect=limit_line_effect)
end
make_spectrum_object
Next, we:
- create a set of wavelengths to observe the spectrum at,
- call the function above to create a spectrum object,
- create an object containing a model for the point spread function, and
- create an object that can compute the convolution of our spectral model with the point spread function model.
# 1. Pick the range of wavelengths to work on.
lambda_min = 5000
lambda_max = 6000
# You may want to adjust the num_lambda to make things more/less computationally intensive
num_lambda = 8*1024
lambdas = collect(range(lambda_min,stop=lambda_max, length=num_lambda));
# 2. Create a model spectrum that we'll analyze below
raw_spectrum = make_spectrum_object(lambda_min=lambda_min,lambda_max=lambda_max)
# 3. Create a model for the point spread function (PSF)
psf_widths = [0.5, 1.0, 2.0]
psf_weights = [0.8, 0.15, 0.05]
psf_model = GaussianMixtureConvolutionKernel(psf_widths,psf_weights)
# 4. Create a model for the convolution of the raw spectrum with the PSF model
conv_spectrum = ConvolvedSpectrum(raw_spectrum,psf_model)
ConvolvedSpectrum{SimulatedSpectrum{Float64, Float64, Float64, Float64, Float64, Float64, Float64, Float64, Float64}, GaussianMixtureConvolutionKernel{Float64, Float64}}(SimulatedSpectrum{Float64, Float64, Float64, Float64, Float64, Float64, Float64, Float64, Float64}([5770.553225728153, 5607.15810933441, 5886.934929507959, 5404.8424397678145, 5152.345946396189, 5528.425877959072, 5949.519729991444, 5752.804201720949, 5610.66089431819, 5599.4907838496 … 5282.730585553011, 5123.8955500888005, 5665.07523731238, 5620.113230675114, 5968.010522769329, 5079.448801997934, 5799.954293260468, 5434.633641297694, 5422.814258456717, 5375.76690503832], [1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0 … 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0, 1.0], [0.955506364215705, 0.22601301634067938, 0.13975723892484626, 0.49418766831405037, 0.738964284036851, 0.327856416075766, 0.29133013676298325, 0.31087732038225324, 0.7923961240412921, 0.9047610421475163 … 0.8852079408625151, 0.6503247346189907, 0.0731220689366977, 0.6557691248370282, 0.33149159411338536, 0.503513722832821, 0.023760961784749512, 0.4641489031167999, 0.827409243862838, 0.8531992759868878], [5642.957625382872, 5289.0208955339185, 5068.003029117258, 5839.904397286735, 5343.0443383335705, 5019.283355679138, 5793.869939681162, 5010.989757531252, 5470.205486738164, 5653.627728770876 … 5622.0251314883635, 5449.820246424847, 5860.948054023382, 5494.57568015532, 5705.086796840091, 5481.312160152376, 5134.535133885085, 5098.562756769589, 5014.528465894271, 5249.0579740460225], [0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2 … 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2, 0.2], [0.057706582850278125, 0.07292674680309212, 0.21234851653109563, 0.2800970676079748, 0.287903661191806, 0.23427761028734603, 0.18324222723736458, 0.28372917399360614, 0.17408331834378132, 0.06857233969450767 … 0.005910997639139427, 0.2913095054220587, 0.22429062300076247, 0.3822851670174109, 0.09928785417648958, 0.30855309338064413, 0.120693476529642, 0.258057949866857, 0.19135538967555796, 0.2987421951429149], [1.0, 1.0e-5, -2.0e-8], 0.0, 5500.0, 10.0), GaussianMixtureConvolutionKernel{Float64, Float64}(Normal{Float64}[Normal{Float64}(μ=0.0, σ=0.5), Normal{Float64}(μ=0.0, σ=1.0), Normal{Float64}(μ=0.0, σ=2.0)], [0.8, 0.15, 0.05], 1.0000000143325827, 10.0), 10.0)
result_ref = conv_spectrum.(lambdas);
Visualize what we've created¶
This is very similar to exercise 1. If you'd like to plot the raw spectrum and the convolved spectrum to see what's going on, there's some plotting code below. The cells with plots below require that you first load the Plots
package. If you are confident you know what it looks like, then you can save some time, by not loading the Plots package.
If you'd like to plot the spectrum model we'll be generating, then go ahead and load the Plots package now.
Since we don't need the worker processes to plot things, we don't need to load the Plots code on all the worker processes.
using Plots
[ Info: Precompiling IJuliaExt [2f4121a4-3b3a-5ce6-9c5e-1f2673ce168a] (cache misses: wrong dep version loaded (2))
plot(lambdas,raw_spectrum.(lambdas),xlabel="λ", ylabel="Flux", label="Raw spectrum", legend=:bottomright)
plot!(lambdas,conv_spectrum.(lambdas), label="Convolved spectrum")
That's fairly crowded, so it may be useful to zoom in on a narrower range.
idx_plot = 1:min(1024,length(lambdas))
plot(lambdas[idx_plot],raw_spectrum.(lambdas[idx_plot]),xlabel="λ", ylabel="Flux", label="Raw spectrum", legend=:bottomright)
plot!(lambdas[idx_plot],result_ref[idx_plot], label="Convolved spectrum")
Benchmarking serial code¶
First, let's benchmark the calculation of this spectrum on a single processor. To keep things speedy, we'll time each method just a few times, rather than using @btime
.
num_runs = 3;
for i in 1:num_runs @time conv_spectrum(lambdas); end
3.566547 seconds (12.96 M allocations: 743.196 MiB, 2.79% gc time, 25.43% compilation time) 2.661580 seconds (11.56 M allocations: 671.352 MiB, 2.98% gc time) 2.652269 seconds (11.56 M allocations: 671.352 MiB, 2.71% gc time)
Thinking back to exercise 1, you may remember that we improved the performance by reducing memory allocations. There were a few ways to do that, and we'll repeat just a few of them here for reference.
function compute_using_for_loop(x::AbstractArray, spectrum::T) where T<:AbstractSpectrum
out = zeros(length(x))
for i in 1:length(x)
out[i] = spectrum(x[i])
end
return out
end
compute_using_for_loop (generic function with 1 method)
for i in 1:num_runs @time compute_using_for_loop(lambdas,conv_spectrum) end
1.261160 seconds (5.49 M allocations: 220.663 MiB, 1.73% gc time, 1.61% compilation time) 1.242864 seconds (5.49 M allocations: 220.305 MiB, 1.81% gc time) 1.236438 seconds (5.49 M allocations: 220.305 MiB, 1.46% gc time)
As before, writing the code as an explicit for loop significantly decreased the number of allocations and improved performance.
Broadcasting¶
Thinking back to some previous labs, we made use of Julia's dot syntax to "broadcast" and "fuse" array operations.
2b. How do you expect the performance and memory allocations to compare to our for loop above?
response_2b = missing # INSERT response as String, e.g. "The same" or "50% faster".
missing
for i in 1:num_runs @time conv_spectrum.(lambdas); end
1.247787 seconds (5.46 M allocations: 219.938 MiB, 2.07% gc time) 1.240960 seconds (5.46 M allocations: 219.938 MiB, 1.64% gc time) 1.239431 seconds (5.46 M allocations: 219.938 MiB, 1.72% gc time)
2c. How did the results compare to your prediction? If different, what could explain the difference(s)?
response_2c = missing # INSERT response as String
missing
for i in 1:num_runs @time map(conv_spectrum,lambdas) end
1.277212 seconds (5.50 M allocations: 221.954 MiB, 2.14% gc time, 2.17% compilation time) 1.239454 seconds (5.46 M allocations: 219.938 MiB, 1.51% gc time) 1.236560 seconds (5.46 M allocations: 219.938 MiB, 1.47% gc time)
The performance of map should be very similar to that of the broadcasted and fused version above.
Before we start parallelizing code, let's increase the number of times we'll benchmark each function.
num_runs = 5;
Notice that num_runs
has now been set to two different values at different parts of this notebook. The value of num_runs
will depend on which of the cells has been run most recently.
Sometimes, it can be convenient to change a variable like this. But it also makes it very easy to confuse yourself. E.g., the code that you see in the notebook at any time is no longer guaranteed to be a complete representation of what the computer has computed.
(In previous semesters, when I used Jupyter notebooks, by far the most common problem students encountered was running cells out of order and getting into some unintended state. So my first suggestion was nearly always "Restart your kernel and step through the notebook in order." While that's not a big deal for small notebooks, it can quickly become tedious and time consuming. Pluto solved that problem! That's why I'm only using Jupyter for a few exercises that wouldn't work well with a Pluto notebook.)
Parallelizing with pmap¶
If you can write your computations in terms of calling map
, then one easy way to parallelize your code is to replace the call to map
with a call to pmap
, a parallel map.
If you only have one worker process, then it will still run in serial. But if you have multiple workers, then pmap
will parallelize your code. When parallelizing code, it's always good to double check that your results are consistent with the reference implementation.
using Test
@test isapprox(pmap(x->conv_spectrum.(x),lambdas), result_ref)
[ Info: Precompiling DistributionsTestExt [ffbe0ea5-a612-5ff7-aaf5-cac02eef3019] (cache misses: wrong dep version loaded (4), mismatched flags (4))
Test Passed
2d. How much faster do you expect the code to run using pmap
?
for i in 1:num_runs @time pmap(x->conv_spectrum.(x),lambdas) end
0.763455 seconds (837.20 k allocations: 38.810 MiB, 1.90% gc time, 8 lock conflicts, 19.47% compilation time) 0.645425 seconds (850.57 k allocations: 44.595 MiB, 1.89% gc time, 34 lock conflicts) 0.606545 seconds (615.42 k allocations: 27.593 MiB) 0.637959 seconds (808.51 k allocations: 41.698 MiB, 2.11% gc time, 20 lock conflicts) 0.626136 seconds (811.42 k allocations: 42.017 MiB, 1.15% gc time, 96 lock conflicts)
2e. By what speed-up factor did the wall time decrease (ignoring the first call affected by compilation)? How can you explain the difference?
response_2e = missing # INSERT response as String
missing
You were likely a little disappointed in the speed-up factor. What could have gone wrong? In this case, we have a non-trivial, but still modest amount of work to do for each wavelength. pmap
distributes the work one element at a time. The overhead in distributing the work and assembling the pieces likely ate into the potential performance gains. To improve on this, we can tell pmap
to distribute the work in batches. Below, we'll specify a batch_size via an optional named parameter.
#for i in 1:num_runs @time pmap(conv_spectrum,lambdas,batch_size=256) end
for i in 1:num_runs @time pmap(x->conv_spectrum.(x),lambdas,batch_size=256) end
1.495097 seconds (649.93 k allocations: 31.733 MiB, 24.55% compilation time) 0.370933 seconds (52.76 k allocations: 2.093 MiB) 0.366837 seconds (52.76 k allocations: 2.093 MiB) 0.365923 seconds (52.76 k allocations: 2.092 MiB) 0.362787 seconds (52.76 k allocations: 2.093 MiB)
2f. Was pmap
faster when using batches than the serial version (focusing on the version using broadcasting)? If so, how much? How does this compare to your original expectations?
response_2f = missing # INSERT response as String
missing
Remember that we originally added one fewer worker process than the number of physical cores available to us. Let's try adding one more worker process and comparing the performance.
if nworkers() < num_cores_to_use
procs_without_code = addprocs(num_cores_to_use-nworkers(), exeflags="--threads=1")
else
procs_without_code = Int64[]
end
Int64[]
2g. How long do you expect our function to take using pmap
(using same batch size) now that we have one more worker process?
response_2g = missing # INSERT RESPONSE
missing
Let's test that. But before we can, we'll need to load the code onto any recently added worker processes.
@everywhere using Distributions
We don't need/want to re-read and recompile our code on every worker process. Instead, we'll use the @spawnat
macro to tell julia to include
our code only on specific processors.
@spawnat
sends the code to the worker(s), but doesn't wait for the result. Instead it returns a Future that can be used to retrieve the result when we need it by calling fetch
. Here, we want to wait until the worker has finished loading the code, so we'll just do both in one line.
map(proc->fetch(@spawnat proc include("src/lab6.jl") ), procs_without_code);
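As an aside, here's a minimal, hypothetical illustration of the Future/fetch pattern on its own (not needed for the rest of this exercise):
future = @spawnat :any sum(abs2, 1:1_000)   # returns a Future immediately, without waiting for the worker
# ... the delegator process could do other useful work here ...
result = fetch(future)                      # blocks until the worker finishes, then returns the value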
@everywhere using .lab6
@test isapprox(pmap(conv_spectrum,lambdas,batch_size=512), result_ref)
Test Passed
for i in 1:num_runs @time pmap(conv_spectrum,lambdas,batch_size=256) end
0.369926 seconds (52.78 k allocations: 2.199 MiB) 0.362360 seconds (52.91 k allocations: 2.208 MiB) 0.358881 seconds (52.91 k allocations: 2.208 MiB) 0.360506 seconds (52.91 k allocations: 2.208 MiB) 0.359541 seconds (52.91 k allocations: 2.207 MiB)
2h. How did the runtime compare when we used pmap
with as many worker processes as physical cores available, compared to when we used one fewer worker process than the number of physical cores available?
How does this compare to your original expectations? Try to explain any differences.
response_2h = missing # INSERT RESPONSE
missing
Shared Arrays¶
Sometimes the map programming pattern is limiting, inefficient, or just too cumbersome to use. In these cases, it can be useful to define a SharedArray
. A SharedArray is only possible when using a shared memory system, i.e., one computer has multiple processor cores that are all able to read and write data stored in a common memory system. Data stored in a SharedArray is visible and accessible to every process. A SharedArray also specifies which indices have been assigned to each worker process. When an operation is to be parallelized, this information can be used to spread the work over the worker processes.
(Note that code using SharedArrays cannot run with processes split across multiple compute nodes, as we'll do in the next lab.)
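As a hypothetical sketch of how those assigned indices can be used (the built-in initialization options demonstrated later in this section handle this for you), each participating process can fill in just its own chunk of a SharedArray:
@everywhere using SharedArrays
S = SharedArray{Float64}(16)    # allocated in shared memory, visible to every process on this node
@sync for p in procs(S)
    # each participating process writes only to the indices that were assigned to it
    @spawnat p (S[SharedArrays.localindices(S)] .= p)
end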
First, we'll create a SharedArray from our existing array of wavelengths.
using SharedArrays
lambdas_shared = SharedArray(lambdas)
typeof(lambdas_shared)
SharedVector{Float64} (alias for SharedArray{Float64, 1})
We apply map
to a SharedArray just like to a regular Array, but the calculation is still performed in serial.
@test isapprox(map(conv_spectrum,lambdas_shared), result_ref)
Test Passed
for i in 1:num_runs @time map(conv_spectrum,lambdas_shared) end
1.249385 seconds (5.46 M allocations: 219.938 MiB, 1.93% gc time) 1.240897 seconds (5.46 M allocations: 219.938 MiB, 1.60% gc time) 1.240714 seconds (5.46 M allocations: 219.938 MiB, 1.62% gc time) 1.243878 seconds (5.46 M allocations: 219.938 MiB, 1.88% gc time) 1.244661 seconds (5.46 M allocations: 219.938 MiB, 1.79% gc time)
Similarly, we can apply pmap
to a SharedArray, and the calculation will be parallelized just as it is for a regular Array.
for i in 1:num_runs @time pmap(x->conv_spectrum(x),lambdas_shared) end
0.856420 seconds (1.12 M allocations: 55.293 MiB, 1.77% gc time, 24 lock conflicts, 19.96% compilation time) 0.581693 seconds (615.44 k allocations: 27.594 MiB) 0.654527 seconds (770.18 k allocations: 39.027 MiB, 1.59% gc time, 20 lock conflicts) 0.653290 seconds (848.87 k allocations: 44.509 MiB, 1.86% gc time, 22 lock conflicts) 0.612801 seconds (729.39 k allocations: 35.828 MiB, 1.19% gc time, 12 lock conflicts)
As before, the performance wasn't particularly impressive.
2i. What do you suggest we do to improve the performance of pmap
applied to a SharedArray?
response_2i = missing # INSERT RESPONSE
missing
Try implementing and benchmarking your suggestion in the cell below.
# TODO: Insert your code
2j. How did the performance compare to using pmap
on a regular Array (with similar optimizations)? Is there anything fundamentally different about what the computer is doing that should affect the performance?
response_2j = missing # INSERT RESPONSE
missing
Pro tip: There are fancy ways of initializing a SharedArray that can help minimize the number of allocations and spread the work of initializing the array over all the worker processes. In case you're curious, I'll demonstrate below. I find the syntax a little confusing, so I suggest skipping over this for now. But you may want to return to it later if you want to initialize a SharedArray efficiently for your project.
# If we are initializing the array using values stored in a generic Array
function compute_by_initializing_shared_array(x::AbstractArray, spectrum::T) where T<:AbstractSpectrum
SharedArray{Float64}(size(x), init = S-> S[SharedArrays.localindices(S)] .= spectrum.(view(x,SharedArrays.localindices(S))) )
end
# If we are initializing the array using values already stored in a SharedArray, then assign work based on the input SharedArray
function compute_by_initializing_shared_array(x::SharedArray, spectrum::T) where T<:AbstractSpectrum
SharedArray{Float64}(size(x), init = S-> S[SharedArrays.localindices(x)] .= spectrum.(view(x,SharedArrays.localindices(x))) )
end
compute_by_initializing_shared_array (generic function with 2 methods)
for i in 1:num_runs @time compute_by_initializing_shared_array(lambdas_shared,conv_spectrum) end
0.995799 seconds (649.27 k allocations: 32.452 MiB, 21.21% compilation time) 0.354443 seconds (1.12 k allocations: 157.125 KiB) 0.350485 seconds (1.12 k allocations: 156.453 KiB) 0.346233 seconds (1.12 k allocations: 156.734 KiB) 0.346001 seconds (1.12 k allocations: 156.453 KiB)
Distributed Arrays¶
Sometimes, you want to spread the work over more processor cores than are available on a single workstation. Or maybe you don't actually need the features of a SharedArray and want to write your code in a more general way, so that it could be parallelized on a distributed memory system. This can be useful even if your job could in principle be run on a single node with $N$ processor cores. If you submit a job that requires $N$ processor cores on the same node, then your job is likely to wait in the queue longer than if you submit a job that requires $N$ processor cores that could be spread over multiple compute nodes. If you want to make use of a very large number of cores (e.g., using a cloud provider like Amazon AWS, Google Compute Engine, Microsoft Azure), then you'll need to use a distributed memory system. For a long time, MPI was the most common way to use distributed memory systems. If you're programming in C/C++ or Fortran, then you will likely still be using MPI. (MPI.jl provides a wrapper for Julia to use most of the common MPI functions.) Julia's DistributedArrays.jl package is an attempt to make programming for distributed memory systems a little easier.
Like for SharedArrays, there are several ways to initialize a DistributedArray
(DArray
for short) efficiently, where each worker initializes its own data. Functions like dzeros
, dones
, drand
, drandn
and dfill
act similarly to their counterparts without a d
prefix, but create DArrays instead of regular Arrays.
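For instance (a hypothetical sketch; we won't use these constructors below), creating a few small DArrays directly on the workers could look like this:
@everywhere using DistributedArrays
d1 = dzeros(8192)          # distributed vector of zeros, split across the workers
d2 = dfill(1.0, 8192)      # distributed vector in which every element is 1.0
d3 = drand(8192)           # each worker fills its own chunk with random values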
Here we'll create a distributed array by simply applying distribute
to our existing array of wavelengths.
@everywhere using DistributedArrays
lambdas_dist = distribute(lambdas)
typeof(lambdas_dist)
size(lambdas_dist)
(8192,)
As usual, the first time we call a function, it takes some extra time and memory to compile it. So let's do that again, this time benchmarking the distribute
operation.
@time lambdas_dist = distribute(lambdas);
typeof(lambdas_dist)
0.000718 seconds (551 allocations: 357.828 KiB)
DArray{Float64, 1, Vector{Float64}}
In this case, distribute should be quite fast. That's because we're creating a DArray
on a shared memory system. So the computer doesn't actually have to send communications over a network to access the data.
When we apply map to a DArray
, map parallelizes the calculation and returns the results in a DArray
. Each worker only operates on the subset of the array that is local to that worker process.
We can view the part of the array that is local to each worker with localpart()
and localindices()
. Note that none of the array was assigned to the delegator process.
fetch(@spawnat 1 (;indices=DistributedArrays.localindices(lambdas_dist), values=localpart(lambdas_dist)) )
(indices = (1:0,), values = Float64[])
fetch(@spawnat first(workers()) (;indices=DistributedArrays.localindices(lambdas_dist), values=localpart(lambdas_dist)) )
(indices = (1:2048,), values = [5000.0, 5000.12208521548, 5000.24417043096, 5000.3662556464415, 5000.488340861922, 5000.610426077402, 5000.732511292882, 5000.854596508363, 5000.976681723843, 5001.0987669393235 … 5248.809669149066, 5248.931754364547, 5249.053839580027, 5249.175924795507, 5249.298010010988, 5249.420095226468, 5249.542180441948, 5249.664265657429, 5249.78635087291, 5249.90843608839])
2k. Before we benchmark this, what do you expect for the performance of map
applied to a DArray?
response_2k = missing # INSERT response
missing
@test isapprox(map(conv_spectrum,lambdas_dist), result_ref)
Test Passed
for i in 1:num_runs @time map(conv_spectrum,lambdas_dist) end
typeof(map(conv_spectrum,lambdas_dist))
0.362717 seconds (720 allocations: 133.984 KiB) 0.346740 seconds (859 allocations: 150.812 KiB) 0.345992 seconds (854 allocations: 143.000 KiB) 0.348022 seconds (860 allocations: 151.344 KiB) 0.347576 seconds (861 allocations: 150.844 KiB)
DArray{Float64, 1, Vector{Float64}}
2l. How did the actual performance compare to your expectations?
response_2l = missing # INSERT response
missing
Since map returned a DArray
, we shouldn't assume that all the data is available to the delegator process.
In this case, we've only added worker processes that are on the same node, so we know that the data is actually all on one compute node.
But code using a DArray
can't count on that being true. Indeed, in a future exercise, we'll use multiple cores that are spread over multiple nodes.
In order to make all the data in the DArray accessible to the delegator process, we could collect
the data.
2m. How do you expect the performance to compare when we run map on a DArray
with and without collecting the data at the end?
response_2m = missing # INSERT response
missing
for i in 1:num_runs @time collect(map(conv_spectrum,lambdas_dist)) end
0.859669 seconds (816.66 k allocations: 42.419 MiB, 11.19% compilation time) 0.742140 seconds (481.53 k allocations: 25.705 MiB, 1.57% gc time, 30 lock conflicts, 0.82% compilation time) 0.712144 seconds (478.79 k allocations: 25.204 MiB) 0.730331 seconds (479.11 k allocations: 25.210 MiB, 1.27% gc time, 5 lock conflicts) 0.718886 seconds (478.99 k allocations: 25.216 MiB, 0.53% gc time)
2n. How did the actual performance compare to your expectations?
response_2n = missing # INSERT response
missing
Reducing Data Transport¶
Copying all that data back into one array for the delegator process to use added a significant amount to the total time. If the data had to move over the network, then it would have taken even longer.
Fortunately, often you don't actually need to bring all the data back to the delegator process. For example, you might have several calculations that can be done, each leaving the data distributed across many workers, until the very end.
Another common scenario is that you want to perform a task that can be phrased as a mapreduce
programming pattern. For example, imagine that we only wanted to compute the mean squared error between two models.
For testing purposes, it'll be good to first implement it as a serial for loop.
function calc_mse_loop(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, v::Number)
c = lab6.speed_of_light
z = v/c
spec2_shifted = doppler_shifted_spectrum(spec2,z)
tmp1 = spec1(first(lambdas))
tmp2 = spec2_shifted(first(lambdas))
mse = zero(promote_type(typeof(tmp1),typeof(tmp2)))
for i in eachindex(lambdas)
@inbounds l = lambdas[i]
flux1 = spec1(l)
flux2 = spec2_shifted(l)
mse += (flux1-flux2)^2
end
mse /= length(lambdas)
return mse
end
calc_mse_loop (generic function with 1 method)
Now we'll implement the same calculation using mapreduce.
function calc_mse_mapreduce(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, v::Number)
c = lab6.speed_of_light
z = v/c
spec2_shifted = doppler_shifted_spectrum(spec2,z)
mse = mapreduce(x->(spec1(x)-spec2_shifted(x))^2, +, lambdas)
mse /= length(lambdas)
end
calc_mse_mapreduce (generic function with 1 method)
When writing parallel code, it's always good to check that the results are consistent regardless of whether you compute in serial or parallel for some test cases.
@test calc_mse_loop(lambdas,conv_spectrum,conv_spectrum,10.0) ≈ calc_mse_mapreduce(lambdas,conv_spectrum,conv_spectrum,10.0)
Test Passed
2o. How do you expect the performance of calc_mse_mapreduce
version to compare to twice the cost of applying map
to conv_spectrum
and the wavelengths stored as a DArray
?
How do you expect the performance of calc_mse_mapreduce
version to compare to twice the cost of applying map
to conv_spectrum
and the wavelengths stored as a DArray
and collecting the results to the delegator process?
(Remember that calculating the mean squared error between two models involves evaluating two spectral models for each wavelength.)
response_2o = missing # INSERT response
missing
@time calc_mse_loop(lambdas,conv_spectrum,conv_spectrum,10.0)
@time calc_mse_mapreduce(lambdas,conv_spectrum,conv_spectrum,10.0)
2.477157 seconds (10.93 M allocations: 439.792 MiB, 1.84% gc time) 2.553367 seconds (10.93 M allocations: 439.757 MiB, 1.84% gc time)
8.363489155945859e-10
for i in 1:num_runs @time calc_mse_mapreduce(lambdas_dist,conv_spectrum,conv_spectrum,10.0) end
1.039272 seconds (376.85 k allocations: 18.763 MiB, 22.24% compilation time) 0.705927 seconds (769 allocations: 100.234 KiB) 0.701545 seconds (769 allocations: 100.125 KiB) 0.708945 seconds (769 allocations: 100.125 KiB) 0.693477 seconds (769 allocations: 100.117 KiB)
2o. How did the actual performance compare to your expectations?
response_2o = missing # INSERT response
missing
Distributed for loops¶
While map, pmap and mapreduce can be very convenient, sometimes it's more natural to express your calculation in terms of a for loop. We can do that with Julia's @distributed
macro (see the Julia manual's distributed computing documentation for more explanation). We'll try that below.
function compute_using_distributed_for_loop_unsynced(x::AbstractArray, spectrum::T) where T<:AbstractSpectrum
out = zeros(length(x)) # Problematic since workers won't write to out on delegator
@distributed for i in 1:length(x)
out[i] = spectrum(x[i])
end
return out
end
compute_using_distributed_for_loop_unsynced (generic function with 1 method)
for i in 1:num_runs @time res= compute_using_distributed_for_loop_unsynced(lambdas_dist,conv_spectrum) end
0.012007 seconds (12.95 k allocations: 715.258 KiB, 99.77% compilation time) 0.000011 seconds (13 allocations: 65.062 KiB) 0.000011 seconds (13 allocations: 65.062 KiB) 0.000011 seconds (13 allocations: 65.062 KiB) 0.000010 seconds (13 allocations: 65.062 KiB)
Wow, that was fast! Or was it? No. Actually, what happened is that Julia started the computations on the workers, and let the delegator process keep going, without waiting for the workers to finish.
Sometimes we want to do this, so the delegator process can do other work while waiting on the workers to finish. Also, each worker wrote to its own copy of the out array, so we didn't even get the output back.
For timing purposes, we want to make sure all the work is complete and the workers have synchronized.
We can do this by adding the @sync
macro.
function compute_using_distributed_for_loop(x::AbstractArray, spectrum::T) where T<:AbstractSpectrum
out = SharedArray(zeros(length(x)))
@sync @distributed for i in 1:length(x)
out[i] = spectrum(x[i])
end
return out
end
compute_using_distributed_for_loop (generic function with 1 method)
@test isapprox( compute_using_distributed_for_loop(lambdas,conv_spectrum) , result_ref)
Test Passed
2p. How do you expect the performance of compute_using_distributed_for_loop
to compare to the performance of using pmap
to compute the spectrum (after using a good batch size)?
response_2p = missing # INSERT response
missing
for i in 1:num_runs @time compute_using_distributed_for_loop(lambdas_dist,conv_spectrum) end
0.417040 seconds (71.49 k allocations: 3.674 MiB, 11.01% compilation time) 0.352252 seconds (1.15 k allocations: 127.562 KiB) 0.355852 seconds (1.15 k allocations: 127.141 KiB) 0.350527 seconds (1.15 k allocations: 127.453 KiB) 0.354305 seconds (1.15 k allocations: 127.141 KiB)
The @distributed
macro also works with data stored in a SharedArray or even a regular Array. For example...
for i in 1:num_runs @time compute_using_distributed_for_loop(lambdas_shared,conv_spectrum) end
0.422129 seconds (71.54 k allocations: 3.675 MiB, 10.90% compilation time) 0.348949 seconds (1.20 k allocations: 129.562 KiB) 0.355185 seconds (1.20 k allocations: 129.172 KiB) 0.352396 seconds (1.20 k allocations: 131.109 KiB) 0.352532 seconds (1.20 k allocations: 129.172 KiB)
for i in 1:num_runs @time compute_using_distributed_for_loop(lambdas,conv_spectrum) end
0.353326 seconds (1.04 k allocations: 123.031 KiB) 0.350838 seconds (1.19 k allocations: 132.383 KiB) 0.350677 seconds (1.18 k allocations: 131.484 KiB) 0.349806 seconds (1.18 k allocations: 131.797 KiB) 0.351097 seconds (1.18 k allocations: 131.766 KiB)
2q. How did the benchmarking results compare to your predictions?
response_2q = missing # INSERT response
missing
In this case, we don't notice a significant difference in performance with the distributed for loop depending on how the lambdas were stored, since all three are being stored in a shared memory system. Once we use multiple compute nodes and communications must travel over a network, differences in performance will become more apparent.
There's also an option to specify a reducing operation that is performed on the result of each pass through the @distributed
for loop. (The function to perform the reduction is provided in parentheses before the for
.)
function calc_mse_distributed_loop(lambdas::DArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, v::Number)
c = lab6.speed_of_light
z = v/c
spec2_shifted = doppler_shifted_spectrum(spec2,z)
mse = @distributed (+) for i in eachindex(lambdas)
@inbounds l = lambdas[i]
flux1 = spec1(l)
flux2 = spec2_shifted(l)
(flux1-flux2)^2 # result of last line will be reduced by + operator
end
mse /= length(lambdas)
return mse
end
calc_mse_distributed_loop (generic function with 1 method)
@test calc_mse_loop(lambdas,conv_spectrum,conv_spectrum,10.0) ≈ calc_mse_distributed_loop(lambdas_dist, conv_spectrum, conv_spectrum, 10.0)
Test Passed
2r. How do you expect that the performance of the @distributed
for loop with a reduction operator will perform compared to mapreduce
on a distributed array?
response_2r = missing # RESPONSE
missing
for i in 1:num_runs @time calc_mse_distributed_loop(lambdas_dist, conv_spectrum, conv_spectrum, 10.0) end
0.691082 seconds (660 allocations: 86.969 KiB) 0.690844 seconds (793 allocations: 95.453 KiB) 0.693466 seconds (793 allocations: 95.703 KiB) 0.695591 seconds (793 allocations: 95.453 KiB) 0.692840 seconds (793 allocations: 95.703 KiB)
2s. How did the results compare to your expectations?
response_2s = missing # RESPONSE
missing
Distributed for loop using FLoops¶
Here we'll demonstrate one more way to perform the same calculation in parallel, using the FLoops.jl package. This package is still young, experimental, and likely to change. But it seems to hold a lot of promise for letting programmers implement more complex parallel calculations with relatively little effort, and hopefully, eventually, with good performance as well.
using FLoops
[ Info: Precompiling StatsFunsInverseFunctionsExt [da3fed98-1718-55bb-8128-3e4a2e454b06] (cache misses: wrong dep version loaded (2), mismatched flags (2))
function calc_mse_flloop(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, v::Number; ex = ThreadedEx())
c = lab6.speed_of_light
z = v/c
spec2_shifted = doppler_shifted_spectrum(spec2,z)
tmp1 = spec1(first(lambdas))
tmp2 = spec2_shifted(first(lambdas))
mse = zero(promote_type(typeof(tmp1),typeof(tmp2)))
@sync @floop ex for i in eachindex(lambdas)
@inbounds l = lambdas[i]
flux1 = spec1(l)
flux2 = spec2_shifted(l)
@reduce(mse += (flux1-flux2)^2)
end
mse /= length(lambdas)
return mse
end
calc_mse_flloop (generic function with 1 method)
@test calc_mse_loop(lambdas,conv_spectrum,conv_spectrum,10.0) ≈ calc_mse_flloop(lambdas, conv_spectrum, conv_spectrum, 10.0, ex=DistributedEx() )
Test Passed
for i in 1:num_runs @time calc_mse_flloop(lambdas, conv_spectrum, conv_spectrum, 10.0, ex=DistributedEx() ) end
1.071059 seconds (2.48 k allocations: 276.414 KiB, 1 lock conflict) 1.113734 seconds (2.51 k allocations: 279.523 KiB) 0.989851 seconds (2.51 k allocations: 279.461 KiB) 0.734157 seconds (2.50 k allocations: 281.398 KiB) 0.695179 seconds (2.50 k allocations: 281.398 KiB)
Comparing Multi-threading vs Multi-processing performance more easily¶
Wouldn't it be nice if we didn't have to rewrite our code for each architecture? There are a few packages that make this possible (e.g., FLoops.jl and KernelAbstractions.jl). We'll compare the performance of serial, multi-threaded, and multi-process versions of our function by simply specifying a different executor, as shown below.
for i in 1:num_runs @time calc_mse_flloop(lambdas, conv_spectrum, conv_spectrum, 10.0, ex = SequentialEx(simd = Val(true)) ) end
2.593608 seconds (11.14 M allocations: 450.995 MiB, 2.00% gc time, 2 lock conflicts, 3.50% compilation time) 2.497195 seconds (10.93 M allocations: 439.795 MiB, 1.78% gc time) 2.484134 seconds (10.93 M allocations: 439.794 MiB, 1.54% gc time) 2.489245 seconds (10.93 M allocations: 439.792 MiB, 1.70% gc time) 2.485698 seconds (10.93 M allocations: 439.792 MiB, 1.54% gc time)
for i in 1:num_runs @time calc_mse_flloop(lambdas, conv_spectrum, conv_spectrum, 10.0, ex = ThreadedEx() ) end # default if we don't specify ex explicitly
1.005949 seconds (11.56 M allocations: 471.898 MiB, 9.41% gc time, 21.33% compilation time) 0.783500 seconds (10.93 M allocations: 439.804 MiB, 9.81% gc time) 0.783498 seconds (10.93 M allocations: 439.804 MiB, 9.98% gc time) 0.769955 seconds (10.93 M allocations: 439.804 MiB, 9.23% gc time) 0.783891 seconds (10.93 M allocations: 439.804 MiB, 9.47% gc time)
for i in 1:num_runs @time calc_mse_flloop(lambdas, conv_spectrum, conv_spectrum, 10.0, ex=DistributedEx() ) end
0.710807 seconds (2.37 k allocations: 270.633 KiB) 0.693898 seconds (2.50 k allocations: 277.055 KiB, 1 lock conflict) 0.713572 seconds (2.80 k allocations: 300.898 KiB, 0.50% gc time) 0.697226 seconds (2.57 k allocations: 283.242 KiB) 0.710380 seconds (2.50 k allocations: 281.398 KiB)
2t. Think about your class project and the parallel programming patterns demonstrated in this exercise.
Could your key calculation be phrased as a mapreduce problem? Could it be phrased as one or more map calculations? Is it advantageous to use a distributed for loop or FLoops?
Do you see a reason to implement different parts of the problem using different programming patterns?
If you were to parallelize your code using multi-processing, which of the methods above would you choose? Why?
response_2t = missing #=
"""
Example
multi-line
response
"""
=#;
2u. If you're planning to parallelize your class project for distributed memory, then get some practice first by parallelizing the function calc_χ²
(at bottom of notebook) using whichever parallelization strategy that you plan to use for your project.
# response_2u:
function calc_χ²_my_way(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, σ1::AbstractArray, σ2::AbstractArray, v::Number; #= any optional parameters? =# )
# INSERT YOUR CODE HERE
return NaN
end
calc_χ²_my_way (generic function with 1 method)
σ_obs1 = 0.02*ones(size(lambdas))
σ_obs2 = 0.02*ones(size(lambdas));
function calc_χ²_loop(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, σ1::AbstractArray, σ2::AbstractArray, v::Number )
@assert size(lambdas) == size(σ1) == size(σ2)
c = lab6.speed_of_light
z = v/c
spec2_shifted = doppler_shifted_spectrum(spec2,z)
tmp1 = spec1(first(lambdas))
tmp2 = spec2_shifted(first(lambdas))
χ² = zero(promote_type(typeof(tmp1),typeof(tmp2),eltype(σ1),eltype(σ2)))
for i in eachindex(lambdas)
@inbounds l = lambdas[i]
flux1 = spec1(l)
flux2 = spec2_shifted(l)
@inbounds χ² += (flux1-flux2)^2/(σ1[i]^2+σ2[i]^2)
end
return χ²
end
calc_χ²_loop (generic function with 1 method)
@test abs(calc_χ²_my_way(lambdas,conv_spectrum, conv_spectrum, σ_obs1, σ_obs2, 0.0 )) < 1e-8
Test Failed at In[96]:1 Expression: abs(calc_χ²_my_way(lambdas, conv_spectrum, conv_spectrum, σ_obs1, σ_obs2, 0.0)) < 1.0e-8 Evaluated: NaN < 1.0e-8
There was an error during testing Stacktrace: [1] record(ts::Test.FallbackTestSet, t::Union{Test.Error, Test.Fail}) @ Test /storage/icds/RISE/sw8/julia/julia-1.11.2/share/julia/stdlib/v1.11/Test/src/Test.jl:1026 [2] do_test(result::Test.ExecutionResult, orig_expr::Any) @ Test /storage/icds/RISE/sw8/julia/julia-1.11.2/share/julia/stdlib/v1.11/Test/src/Test.jl:712 [3] macro expansion @ /storage/icds/RISE/sw8/julia/julia-1.11.2/share/julia/stdlib/v1.11/Test/src/Test.jl:679 [inlined] [4] top-level scope @ In[96]:1
@test calc_χ²_my_way(lambdas,conv_spectrum, conv_spectrum, σ_obs1, σ_obs2, 10.0 ) ≈ calc_χ²_loop(lambdas,conv_spectrum, conv_spectrum, σ_obs1, σ_obs2, 10.0 )
Test Failed at In[97]:1 Expression: calc_χ²_my_way(lambdas, conv_spectrum, conv_spectrum, σ_obs1, σ_obs2, 10.0) ≈ calc_χ²_loop(lambdas, conv_spectrum, conv_spectrum, σ_obs1, σ_obs2, 10.0) Evaluated: NaN ≈ 0.008564212895688572
There was an error during testing Stacktrace: [1] record(ts::Test.FallbackTestSet, t::Union{Test.Error, Test.Fail}) @ Test /storage/icds/RISE/sw8/julia/julia-1.11.2/share/julia/stdlib/v1.11/Test/src/Test.jl:1026 [2] do_test(result::Test.ExecutionResult, orig_expr::Any) @ Test /storage/icds/RISE/sw8/julia/julia-1.11.2/share/julia/stdlib/v1.11/Test/src/Test.jl:712 [3] macro expansion @ /storage/icds/RISE/sw8/julia/julia-1.11.2/share/julia/stdlib/v1.11/Test/src/Test.jl:679 [inlined] [4] top-level scope @ In[97]:1