# Astro 528, Lab 6, Exercise 2
## Parallelization for Multi-Core Workstations (using multiple processes)

### Hardware configuration for this exercise

In this lab exercise, we'll explore a few different ways that we can parallelize calculations across multiple cores of a single workstation or server. Some of the syntax and programming patterns are very similar to what you'll use later when we parallelize our code for distributed computing (i.e., over a number of processors that do not have direct access to the same memory).

Fortunately, most modern workstations and even laptops have multiple cores. Our class has an allocation on Lynx consisting of two standard compute nodes, each with 48 cores. As of Fall 2025, Roar Collab's "standard" compute nodes typically have 48 cores (some standard nodes have as few as 24 cores and some as many as 64). You can find [up-to-date information](https://docs.icds.psu.edu/getting-started/compute-hardware/) in the Roar Collab manual.

For this exercise, it's important that you actually have access to multiple processor cores. However, if you're using the Lynx portal to access the Jupyter Lab server, then you will need to have requested multiple processor cores when you first submitted the request for the JupyterLab server using the box labeled "Number of Cores", i.e., _before you start executing cells in this notebook_. Here's a [screenshot](images/portal_screenshot.png) showing the screen where you specify multiple cores. While we're in class, please ask for just 4 cores, since there are many of us using the system at once. If you return to working on the lab outside of class (and not right before the deadline), then feel free to try benchmarking the code using 8 cores. If you do ask for several cores, then please be extra diligent about deleting your session via the Roar Collab Portal when you're done, so the cores become available to other members of the class.

### Why Jupyter?

For this exercise, we're using a Jupyter notebook, instead of a Pluto notebook. Why? In the previous exercise, we parallelized our code using one process that organized its computation into multiple threads, all within a Pluto notebook. Pluto worked well for that kind of parallelism. In contrast, for this exercise, we're going to parallelize our code using multiple processes. Pluto already uses multiple processes to analyze, run and display the Pluto notebook, so you can't (easily) parallelize your code over multiple processes within a Pluto notebook environment. Since each Jupyter notebook is executed using a separate Julia kernel, Jupyter notebooks are more amenable to parallelization via multiple processes.

### Multi-process vs Multi-threading

As discussed in [the previous exercise](https://psuastro528.github.io/lab6/ex1.html), both approaches are viable ways to parallelize code over multiple processors contained within one workstation/compute node/server. Multiple threads within a single process all have access to the memory that the operating system (OS) has assigned to that process. In contrast, different processes are assigned separate memory locations. While it is possible to send data from one process to another, it needs to be done explicitly. For simple codes, I usually start with multi-threading, as it tends to be a little easier to implement and is often more efficient. For more complex codes (e.g., integrating PDEs), there can be advantages to using multiple processes (e.g., reducing the need for memory locks).
However, the main reason I want students to get experience parallelizing code using multiple processes is that this approach is necessary to scale up the level of parallelism to use more compute cores than exist within a single compute node. If we write (and test) our code to run in parallel using multiple processes on one workstation, then there will be relatively little work left to implement a distributed memory parallelization.

### Installing packages

First, let's make sure that the packages we'll use are installed.

<<>>=
using Pkg
Pkg.UPDATED_REGISTRY_THIS_SESSION[] = true  # Tell Julia's package manager not to download an updated registry, for speed's sake
Pkg.activate(".")    # Project.toml specifies the packages required for this lab
Pkg.instantiate()    # Install any packages needed
@

We'll want the packages that we plan to use for our computations below to be loaded and in scope on each worker process. To make sure that we don't trigger multiple worker processes trying to precompile packages at once, we'll first have the delegator process precompile all the packages used in this exercise.

<<>>=
Pkg.precompile()
@

### Getting setup for parallel computation

First, let's load Julia's `Distributed` module, which provides much of the functionality needed for communicating across processes (that could be on the same node or on different nodes).

<<>>=
using Distributed
nworkers()
@

Note that even if you have a Jupyter notebook server (or remote desktop or Slurm, PBS or HTCondor job) that has been allocated multiple cores, that doesn't mean that you'll use them. By default, Julia starts with a single process running on a single core, and that single process is the only worker. To enable parallel computing, we need to tell Julia how many processor cores to use. By default, we're just using one. How many should we add? Let's get some information about the hardware that we're running on.

<<>>=
Sys.cpu_summary()
@

As discussed in exercise 1, just because the compute node we've been assigned has many cores doesn't mean that we're allowed to use all of them. We'll check whether this notebook is running inside a Slurm (or PBS) job, and if so, use the number of cores that we've been assigned. If we're not inside a Slurm (or PBS) job, then we'll use the number of physical cores.

<<>>=
using CpuId
@

<<>>=
if haskey(ENV,"PBS_NUM_PPN")             # Check PBS env variable if running on Roar Restricted
    num_cores_to_use = parse(Int64,ENV["PBS_NUM_PPN"])
elseif haskey(ENV,"SLURM_CPUS_PER_TASK") # Check Slurm env variable if running on Roar Collab or Lynx
    num_cores_to_use = parse(Int64,ENV["SLURM_CPUS_PER_TASK"])
else                                     # Presumably running locally and can use all cores
    num_physical_cores = cpucores()
    num_cores_to_use = num_physical_cores
end
@

2a. How many cores will you be using for this exercise (i.e., the result of the cell above)?

<<>>=
response_2a = missing # Explicitly set to an integer (i.e., don't set to num_cores_to_use, so we'll be able to provide appropriate feedback on your future responses.)
@

Next, we'll tell Julia to add several worker processes.

<<>>=
if num_cores_to_use>1
    workers_ids = addprocs(num_cores_to_use-1, exeflags="--threads=1")
end
@

`addprocs(N)` returned a list of ids that you can use to refer to specific workers within Julia. (We won't be using these, but they can be useful if you want finer-grained control — see the short sketch below.)
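For example, here's a minimal sketch (purely illustrative; nothing later in the exercise depends on it) of explicitly asking one specific worker to run a function and send back the result. `remotecall_fetch` performs the remote call and waits for the answer, and `myid()` simply reports which process executed it, so you can verify the work really happened on a worker rather than on the delegator.

<<>>=
if nworkers() > 1
    # Explicitly run myid() on the first worker process and fetch the result back to the delegator
    remotecall_fetch(myid, first(workers()))
end
@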
If you're ever unsure of how many workers you currently have available, you can run `nworkers()`.

<<>>=
nworkers()
@

<<>>=
if length(workers_ids) > 1
    map(proc->fetch(@spawnat proc Threads.nthreads()), workers_ids)
end
@

Note that the list of workers now excludes 1 and the number of workers is equal to the number you added, rather than being one plus that number. Why? The original process is now in charge of delegating the work amongst the worker processes. (Historically, this process was referred to as the "master process", and you're likely to still see that terminology in online documentation and questions. I'll try to use "delegator process" instead.) When you have a lot of workers, it's often advantageous to let one process focus on managing all the communications and other overhead associated with parallelization. That way the worker processes are likely to stay more nearly in sync, rather than having to repeatedly wait for the delegator process to catch up. On the other hand, if there is relatively little overhead work for the delegator process, then it is often useful to set the number of workers equal to the number of available CPU cores, even though that means one process will have to do a little more work than the others. We'll try both below.

### Loading Packages and modules

Now that we have loaded the `Distributed` package, we can use the functions and macros that it provides to load the code and packages that we want to use during this exercise onto the other processes. Unlike when we were using multi-threading, with distributed computing each worker has its own chunk of memory, and the programmer controls whether code is loaded on every process or just one/some. When running on Lynx (or Roar Collab), we don't need to install or instantiate the packages on each worker, since we already installed them above and each worker has access to the same filesystem. Again, we'll use the `@everywhere` macro in front of the `using` statement to load packages on both the delegator and worker processes.

<<>>=
@everywhere using Distributions
@

Typing `@everywhere` in front of every line of code would get very annoying fast. So it's better to put the common code that you want every worker to load into a separate file (and even better is to organize the code into one or more modules or packages). Then, we can load a whole bunch of functions that we'll want to use with one line.

In this exercise, we'll load the same code as in exercise 1 of this lab. (As before, you don't need to review all the code in `src` unless you find it interesting or useful to see the programming patterns being demonstrated.) The main difference in this exercise is that we'll include the file `src/lab6.jl` instead of the file `model_spectrum.jl`. `lab6.jl` includes the code from `model_spectrum.jl` (and the other files that it includes) and wraps them in a module named `lab6`. (In contrast, Pluto's `@ingredients` macro automatically wraps code from a file into a module and returns it, so that Pluto can keep track of dependencies, which it needs to do to make a reactive notebook.) We preface the `include` with `@everywhere`, since we want all of the processes to be able to make use of these functions and types.

<<>>=
@everywhere include("src/lab6.jl")
@

Now, we'll bring the variables and functions that are exported by the lab6 module into scope. Note that we include a `.` to tell Julia that the module is already in the current namespace and it doesn't need to load a package.
<<>>=
using .lab6
@

### Initialize data to be analyzed

In this exercise, we're going to create a model spectrum consisting of continuum, stellar absorption lines, and telluric absorption lines. The `ModelSpectrum` module provides a `SimulatedSpectrum` type, but we'll need to initialize a variable with some specific parameter values. The function below will do that for us.

<<>>=
"Create an object that provides a model for the raw spectrum (i.e., before entering the telescope)"
function make_spectrum_object(;lambda_min = 4500, lambda_max = 7500, flux_scale = 1.0,
        num_star_lines = 200, num_telluric_lines = 100, limit_line_effect = 10.0)
    continuum_param = flux_scale .* [1.0, 1e-5, -2e-8]
    star_line_locs = rand(Uniform(lambda_min,lambda_max),num_star_lines)
    star_line_widths = fill(1.0,num_star_lines)
    star_line_depths = rand(Uniform(0,1.0),num_star_lines)
    telluric_line_locs = rand(Uniform(lambda_min,lambda_max),num_telluric_lines)
    telluric_line_widths = fill(0.2,num_telluric_lines)
    telluric_line_depths = rand(Uniform(0,0.4),num_telluric_lines)
    SimulatedSpectrum(star_line_locs,star_line_widths,star_line_depths,telluric_line_locs,telluric_line_widths,telluric_line_depths,continuum_param=continuum_param,lambda_mid=0.5*(lambda_min+lambda_max),limit_line_effect=limit_line_effect)
end
@

Next, we:
1. create a set of wavelengths at which to observe the spectrum,
2. call the function above to create a spectrum object,
3. create an object containing a model for the point spread function, and
4. create an object that can compute the convolution of our spectral model with the point spread function model.

<<>>=
# 1. Pick the range of wavelengths to work on.
lambda_min = 5000
lambda_max = 6000
# You may want to adjust num_lambda to make things more/less computationally intensive
num_lambda = 8*1024
lambdas = collect(range(lambda_min,stop=lambda_max, length=num_lambda));

# 2. Create a model spectrum that we'll analyze below
raw_spectrum = make_spectrum_object(lambda_min=lambda_min,lambda_max=lambda_max)

# 3. Create a model for the point spread function (PSF)
psf_widths  = [0.5, 1.0, 2.0]
psf_weights = [0.8, 0.15, 0.05]
psf_model = GaussianMixtureConvolutionKernel(psf_widths,psf_weights)

# 4. Create a model for the convolution of the raw spectrum with the PSF model
conv_spectrum = ConvolvedSpectrum(raw_spectrum,psf_model)
@

<<>>=
result_ref = conv_spectrum.(lambdas);
@

### Visualize what we've created

This is very similar to exercise 1. If you'd like to plot the raw spectrum and the convolved spectrum to see what's going on, there's some plotting code below. The cells with plots below require that you first load the `Plots` package. If you are confident you know what the spectra look like, then you can save some time by not loading the Plots package. If you'd like to plot the spectrum model we'll be generating, then go ahead and load the Plots package now. Since we don't need the worker processes to plot things, we don't need to load the Plots code on all the worker processes.

<<>>=
using Plots
@

<<>>=
plot(lambdas,raw_spectrum.(lambdas),xlabel="λ", ylabel="Flux", label="Raw spectrum", legend=:bottomright)
plot!(lambdas,conv_spectrum.(lambdas), label="Convolved spectrum")
@

That's fairly crowded, so it may be useful to zoom in on a narrower range.
<<>>=
idx_plot = 1:min(1024,length(lambdas))
plot(lambdas[idx_plot],raw_spectrum.(lambdas[idx_plot]),xlabel="λ", ylabel="Flux", label="Raw spectrum", legend=:bottomright)
plot!(lambdas[idx_plot],result_ref[idx_plot], label="Convolved spectrum")
@

### Benchmarking serial code

First, let's benchmark the calculation of this spectrum on a single processor. To keep things speedy, we'll time each method just a few times, rather than using `@btime`.

<<>>=
num_runs = 3;
@

<<>>=
for i in 1:num_runs
    @time conv_spectrum(lambdas);
end
@

Thinking back to [exercise 1](https://psuastro528.github.io/lab6/ex1.html), you may remember that we improved the performance by reducing memory allocations. There were a few ways to do that, and we'll repeat just a few of them here for reference.

<<>>=
function compute_using_for_loop(x::AbstractArray, spectrum::T) where T<:AbstractSpectrum
    out = zeros(length(x))
    for i in 1:length(x)
        out[i] = spectrum(x[i])
    end
    return out
end
@

<<>>=
for i in 1:num_runs
    @time compute_using_for_loop(lambdas,conv_spectrum)
end
@

As before, writing the code as an explicit for loop significantly decreased the number of allocations and improved performance.

### Broadcasting

Thinking back to previous labs, we made use of Julia's [dot syntax](https://docs.julialang.org/en/v1/manual/functions/#man-vectorized-1) to ["broadcast" and "fuse"](https://docs.julialang.org/en/v1/base/arrays/#Broadcast-and-vectorization-1) the array operation.

2b. How do you expect the performance and memory allocations when using broadcasting to compare to the performance and memory allocations when we used a for loop above?

<<>>=
response_2b = missing # INSERT response as String, e.g. "The same" or "50% faster".
@

<<>>=
for i in 1:num_runs
    @time conv_spectrum.(lambdas);
end
@

2c. How did the results compare to your prediction? If different, what could explain the difference(s)?

<<>>=
response_2c = missing # INSERT response as String
@

### map

Again, thinking back to the last exercise, a very useful and closely related programming pattern is [map](https://docs.julialang.org/en/v1/base/collections/#Base.map) (or [mapreduce](https://docs.julialang.org/en/v1/base/collections/#Base.mapreduce-Tuple{Any,Any,Any})). `map(func,collection)` applies `func` to every element of the collection and returns a collection similar in size to the input collection.

<<>>=
for i in 1:num_runs
    @time map(conv_spectrum,lambdas)
end
@

The performance of the map version should be very similar to the broadcasted and fused version above.

Before we start parallelizing code, let's increase the number of times we'll benchmark each function.

<<>>=
num_runs = 5;
@

Notice that `num_runs` has now been set to two different values at different parts of this notebook. The value of `num_runs` will depend on which of the cells has been run most recently. Sometimes, it can be convenient to change a variable like this. But it also makes it very easy to confuse yourself. E.g., the code that you see in the notebook at any time is no longer guaranteed to be a complete representation of what the computer has computed. (When I first created this course, I used Jupyter notebooks. By far the most common problem students encountered was running cells out of order and getting into some unintended state. So my first suggestion was nearly always "Restart your kernel and step through the notebook in order." While that's not a big deal for small notebooks, it can quickly become tedious and time consuming. Pluto solved that problem!
That's why I'm using Jupyter only for the few exercises that wouldn't work well within a Pluto notebook.)

### Parallelizing with pmap

If you can write your computations in terms of calling `map`, then one easy way to parallelize your code is to replace the call to `map` with a call to `pmap`, a parallel map. If you only have one worker process, then it will still run in serial. But if you have multiple workers, then `pmap` will parallelize your code.

When parallelizing code, it's always good to double check that your results are consistent with the reference implementation.

<<>>=
using Test
@test isapprox(pmap(x->conv_spectrum.(x),lambdas), result_ref)
@

2d. How much faster do you expect the code to run using `pmap`?

<<>>=
response_2d = missing # INSERT response as String
@

<<>>=
for i in 1:num_runs
    @time pmap(x->conv_spectrum.(x),lambdas)
end
@

2e. By what speed-up factor did the wall time decrease (ignoring the first call, which is affected by compilation)? How can you explain the difference?

<<>>=
response_2e = missing # INSERT response as String
@

You were likely a little disappointed in the speed-up factor. What could have gone wrong? In this case, we have a non-trivial, but still modest, amount of work to do for each wavelength. `pmap` distributes the work one element at a time. The overhead in distributing the work and assembling the pieces likely ate into the potential performance gains. To improve on this, we can tell `pmap` to distribute the work in batches. Below, we'll specify a batch_size via an optional named parameter.

<<>>=
for i in 1:num_runs
    @time pmap(conv_spectrum,lambdas,batch_size=256)
end
@

<<>>=
for i in 1:num_runs
    @time pmap(x->conv_spectrum.(x),lambdas,batch_size=256)
end
@

2f. Was `pmap` faster when using batches than the serial version (focusing on the version using broadcasting)? If so, by how much? How does this compare to your original expectations?

<<>>=
response_2f = missing # INSERT response as String
@

Remember that we originally added one fewer worker process than the number of physical cores available to us. Let's try adding one more worker process and comparing the performance.

<<>>=
if nworkers() < num_cores_to_use
    procs_without_code = addprocs(num_cores_to_use-nworkers(), exeflags="--threads=1")
else
    procs_without_code = Int64[]
end
@

<<>>=
nworkers()
@

2g. How long do you expect our function to take using `pmap` (using the same batch size) now that we have one more worker process?

<<>>=
response_2g = missing # INSERT RESPONSE
@

Let's test that. But before we can, we'll need to load the code onto the recently added worker process.

<<>>=
@everywhere using Distributions
@

We don't need/want to re-read and recompile our code on every worker process. Instead, we'll use the [`@spawnat` macro](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.@spawnat) to tell Julia to `include` our code only on specific processes. `@spawnat` sends the code to the worker(s), but doesn't wait for the result. Instead, it returns a [Future](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.Future) that can be used to retrieve the result when we need it by calling [`fetch`](https://docs.julialang.org/en/v1/stdlib/Distributed/#Base.fetch-Tuple{Distributed.Future}). Here, we want to wait until the worker has finished loading the code, so we'll just do both in one line.
<<>>=
map(proc->fetch(@spawnat proc include("src/lab6.jl")), procs_without_code);
@

<<>>=
@everywhere using .lab6
@

<<>>=
@test isapprox(pmap(conv_spectrum,lambdas,batch_size=256), result_ref)
@

<<>>=
for i in 1:num_runs
    @time pmap(conv_spectrum,lambdas,batch_size=256)
end
@

2h. How did the runtime compare when we used `pmap` with as many worker processes as physical cores available, compared to when we used one fewer worker process than the number of physical cores available? How does this compare to your original expectations? Try to explain any differences.

<<>>=
response_2h = missing # INSERT RESPONSE
@

### Shared Arrays

Sometimes the map programming pattern is limiting, inefficient, or just too cumbersome to use. In these cases, it can be useful to define a [`SharedArray`](https://docs.julialang.org/en/v1/manual/distributed-computing/#man-shared-arrays). A SharedArray is only possible when using a [shared memory system](https://en.wikipedia.org/wiki/Shared_memory), i.e., one computer with multiple processor cores that are all able to read and write data stored in a common memory system. Data stored in a SharedArray is visible and accessible to every process. A SharedArray also specifies which indices have been assigned to each worker process (we'll peek at that assignment in a short sketch at the end of this section). When an operation is to be parallelized, this information can be used to spread the work over the worker processes. (Note that code using SharedArrays cannot run using processes split across multiple compute nodes, as we'll do in the next lab.)

First, we'll create a SharedArray from our existing array of wavelengths.

<<>>=
using SharedArrays
@

<<>>=
lambdas_shared = SharedArray(lambdas)
typeof(lambdas_shared)
@

We can apply `map` to a SharedArray just like to a regular Array, but the calculation is still performed in serial.

<<>>=
@test isapprox(map(conv_spectrum,lambdas_shared), result_ref)
@

<<>>=
for i in 1:num_runs
    @time map(conv_spectrum,lambdas_shared)
end
@

Similarly, we can apply `pmap` to a SharedArray, and the calculation will be parallelized just as for a regular Array.

<<>>=
for i in 1:num_runs
    @time pmap(x->conv_spectrum(x),lambdas_shared)
end
@

As before, the performance wasn't particularly impressive.

2i. What do you suggest we do to improve the performance of `pmap` applied to a SharedArray?

<<>>=
response_2i = missing # INSERT RESPONSE
@

Try implementing and benchmarking your suggestion in the cell below.

<<>>=
# TODO: Insert your code
@

2j. How did the performance compare to using `pmap` on a regular Array (with similar optimizations)? Is there anything fundamentally different about what the computer is doing that should affect the performance?

<<>>=
response_2j = missing # INSERT RESPONSE
@
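As mentioned above, a SharedArray keeps track of which indices have been assigned to each worker process. Here's a small illustrative sketch (not needed for the questions above) that asks each worker for the chunk of `lambdas_shared` it was assigned; the fancy initialization in the pro tip below relies on exactly this information. Loading SharedArrays on the workers with `@everywhere` is an extra step I'm adding here so that `localindices` can be called remotely.

<<>>=
@everywhere using SharedArrays   # make localindices available on the workers for this sketch
# Ask each worker which indices of lambdas_shared it "owns"
map(proc->fetch(@spawnat proc localindices(lambdas_shared)), workers())
@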
Pro tip: There are fancy ways of initializing a SharedArray that can help minimize the number of allocations and spread the work of initializing the array over all the worker processes. In case you're curious, I'll demonstrate below. I find the syntax a little confusing, so I suggest skipping over this for now. But you may want to return to it later if you want to initialize a SharedArray efficiently for your project.

<<>>=
# If we are initializing the array using values stored in a generic Array
function compute_by_initializing_shared_array(x::AbstractArray, spectrum::T) where T<:AbstractSpectrum
    SharedArray{Float64}(size(x), init = S-> S[SharedArrays.localindices(S)] .= spectrum.(view(x,SharedArrays.localindices(S))) )
end

# If we are initializing the array using values already stored in a SharedArray, then assign work based on the input SharedArray
function compute_by_initializing_shared_array(x::SharedArray, spectrum::T) where T<:AbstractSpectrum
    SharedArray{Float64}(size(x), init = S-> S[SharedArrays.localindices(x)] .= spectrum.(view(x,SharedArrays.localindices(x))) )
end
@

<<>>=
for i in 1:num_runs
    @time compute_by_initializing_shared_array(lambdas_shared,conv_spectrum)
end
@

## Distributed Arrays

Sometimes, you want to spread the work over more processor cores than are available on a single workstation. Or maybe you don't actually need the features of a SharedArray, so you want to write your code in a more general way, so that it could be parallelized on a [distributed memory system](https://en.wikipedia.org/wiki/Distributed_memory) in the future. This can be useful even if your job could in principle be run on a single node with $N$ processor cores. If you submit a job that requires $N$ processor cores on the same node, then your job is likely to wait in the queue longer than if you submit a job that requires $N$ processor cores that could be spread over multiple compute nodes. And if you want to make use of a very large number of cores (e.g., using a cloud provider like Amazon AWS, Google Compute Engine, or Microsoft Azure), then you'll need to use a distributed memory system.

For a long time, [MPI](https://en.wikipedia.org/wiki/Message_Passing_Interface) was the most common way to use distributed memory systems. If you're programming in C/C++ or Fortran, then you will likely still be using MPI. ([MPI.jl](https://github.com/JuliaParallel/MPI.jl) provides a wrapper for Julia to use most of the common MPI functions.) Julia's [DistributedArrays.jl](https://juliaparallel.github.io/DistributedArrays.jl/latest/index.html) package is an attempt to make programming for distributed memory systems a little easier. Now there are higher-level packages layered on top of DistributedArrays.jl, such as [Dagger.jl](https://juliaparallel.org/Dagger.jl/dev/), that can make it even easier.

As for SharedArrays, there are several ways to initialize a `DistributedArray` (`DArray` for short) efficiently, where each worker initializes its own data. Functions like `dzeros`, `dones`, `drand`, `drandn` and `dfill` act similarly to their counterparts without a `d` prefix, but create DArrays instead of regular Arrays (there's a short sketch of these after the benchmark below). Here we'll create a distributed array by simply applying `distribute` to our existing array of wavelengths.

<<>>=
@everywhere using DistributedArrays
lambdas_dist = distribute(lambdas)
typeof(lambdas_dist)
size(lambdas_dist)
@

As usual, the first time we call a function, it takes some extra time and memory to compile it. So let's do that again, this time benchmarking the `distribute` operation.

<<>>=
@time lambdas_dist = distribute(lambdas);
typeof(lambdas_dist)
@

In this case, `distribute` should be quite fast. That's because we're creating a `DArray` on a shared memory system, so the computer doesn't actually have to send communications over a network to access the data.
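As an aside, here's a minimal sketch (illustrative only) of the constructor-style initializers mentioned above. They build a `DArray` directly on the workers, so each worker allocates and fills only its own local block and nothing needs to be copied from the delegator process.

<<>>=
d_zeros_example = dzeros(length(lambdas))      # DArray of zeros, spread over the workers
d_fill_example  = dfill(1.0, length(lambdas))  # DArray where every element is 1.0
typeof(d_zeros_example), size(d_fill_example)
@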
When we apply `map` to a `DArray`, `map` parallelizes the calculation and returns the results in a `DArray`. Each worker only operates on the subset of the array that is local to that worker process. We can view the part of the array that is local to each worker with `localpart()` and `localindices()`. Note that none of the array was assigned to the delegator process.

<<>>=
fetch(@spawnat 1 (;indices=DistributedArrays.localindices(lambdas_dist), values=localpart(lambdas_dist)) )
@

<<>>=
fetch(@spawnat first(workers()) (;indices=DistributedArrays.localindices(lambdas_dist), values=localpart(lambdas_dist)) )
@

2k. Before we benchmark this, what do you expect for the performance of `map` applied to a DArray?

<<>>=
response_2k = missing # INSERT response
@

<<>>=
@test isapprox(map(conv_spectrum,lambdas_dist), result_ref)
@

<<>>=
for i in 1:num_runs
    @time map(conv_spectrum,lambdas_dist)
end
typeof(map(conv_spectrum,lambdas_dist))
@

2l. How did the actual performance compare to your expectations?

<<>>=
response_2l = missing # INSERT response
@

Since map returned a `DArray`, we shouldn't assume that all of the data is available to the delegator process. In this case, we've only added processes that are on the same node, so we know that the data is actually all on one compute node. But code using a `DArray` can't count on that being true. Indeed, in a future exercise, we'll use multiple cores spread over multiple nodes. In order to make all the data in the DArray accessible to the delegator process, we can `collect` the data.

2m. How do you expect the performance to compare when we run map on a `DArray` with and without collecting the data at the end?

<<>>=
response_2m = missing # INSERT response
@

<<>>=
for i in 1:num_runs
    @time collect(map(conv_spectrum,lambdas_dist))
end
@

2n. How did the actual performance compare to your expectations?

<<>>=
response_2n = missing # INSERT response
@

## Reducing Data Transport

Copying all that data back into one array for the delegator process to use added a significant amount to the total time. If the data had to move over the network, then it would have taken even longer. Fortunately, you often don't actually need to bring all the data back to the delegator process. For example, you might have several calculations that can be done, each leaving the data distributed across many workers, until the very end. Another common scenario is that you want to perform a task that can be phrased as a [`mapreduce`](https://en.wikipedia.org/wiki/MapReduce) programming pattern. For example, imagine that we only wanted to compute the mean squared error between two models. For testing purposes, it'll be good to first implement it as a serial for loop.

<<>>=
function calc_mse_loop(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, v::Number)
    c = lab6.speed_of_light
    z = v/c
    spec2_shifted = doppler_shifted_spectrum(spec2,z)
    eltype_spec1 = first(Base.return_types(spec1,(eltype(lambdas),)))
    eltype_spec2_shifted = first(Base.return_types(spec2_shifted,(eltype(lambdas),)))
    mse = zero(promote_type(eltype_spec1,eltype_spec2_shifted))
    for i in eachindex(lambdas)
        @inbounds l = lambdas[i]
        flux1 = spec1(l)
        flux2 = spec2_shifted(l)
        mse += (flux1-flux2)^2
    end
    mse /= length(lambdas)
    return mse
end
@

Now we'll implement the same calculation using mapreduce.
<<>>=
function calc_mse_mapreduce(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, v::Number)
    c = lab6.speed_of_light
    z = v/c
    spec2_shifted = doppler_shifted_spectrum(spec2,z)
    mse = mapreduce(x->(spec1(x)-spec2_shifted(x))^2, +, lambdas)
    mse /= length(lambdas)
end
@

When writing parallel code, it's always good to check that the results are consistent regardless of whether you compute in serial or parallel for some test cases.

<<>>=
@test calc_mse_loop(lambdas,conv_spectrum,conv_spectrum,10.0) ≈ calc_mse_mapreduce(lambdas,conv_spectrum,conv_spectrum,10.0)
@

2o. How do you expect the performance of the `calc_mse_mapreduce` version to compare to twice the cost of applying `map` to `conv_spectrum` and the wavelengths stored as a `DArray`? How do you expect it to compare to twice the cost of applying `map` to `conv_spectrum` and the wavelengths stored as a `DArray` *and* collecting the results to the delegator process? (Remember that calculating the mean squared error between two models involves evaluating two spectral models for each wavelength.)

<<>>=
response_2o = missing # INSERT response
@

<<>>=
@time calc_mse_loop(lambdas,conv_spectrum,conv_spectrum,10.0)
@time calc_mse_mapreduce(lambdas,conv_spectrum,conv_spectrum,10.0)
@

<<>>=
for i in 1:num_runs
    @time calc_mse_mapreduce(lambdas_dist,conv_spectrum,conv_spectrum,10.0)
end
@

2o. How did the actual performance compare to your expectations?

<<>>=
response_2o = missing # INSERT response
@

### Distributed for loops

While map, pmap and mapreduce can be very convenient, sometimes it's more natural to express your calculation in terms of a for loop. We can do that with Julia's [`@distributed`](https://docs.julialang.org/en/v1/stdlib/Distributed/#Distributed.@distributed) macro (with more explanation [here](https://docs.julialang.org/en/v1/manual/parallel-computing/#Parallel-Map-and-Loops-1)). We'll try that below.

<<>>=
function compute_using_distributed_for_loop_unsynced(x::AbstractArray, spectrum::T) where T<:AbstractSpectrum
    out = zeros(length(x))   # Problematic since workers won't write to out on the delegator
    @distributed for i in 1:length(x)
        out[i] = spectrum(x[i])
    end
    return out
end
@

<<>>=
for i in 1:num_runs
    @time res = compute_using_distributed_for_loop_unsynced(lambdas_dist,conv_spectrum)
end
@

Wow, that was fast! Or was it? No. Actually, what happened is that Julia started the computations on the workers and let the delegator process keep going, without waiting for the workers to finish. Sometimes we want to do this, so the delegator process can do other work while waiting on the workers to finish. Also, each worker wrote to its own copy of the `out` array, so we didn't even get the output.

For timing purposes, we want to make sure all the work is complete and the workers have synchronized. We can do this by adding the [`@sync` macro](https://docs.julialang.org/en/v1/stdlib/Distributed/#Base.@sync).

<<>>=
function compute_using_distributed_for_loop(x::AbstractArray, spectrum::T) where T<:AbstractSpectrum
    out = SharedArray(zeros(length(x)))
    @sync @distributed for i in 1:length(x)
        out[i] = spectrum(x[i])
    end
    return out
end
@

<<>>=
@test isapprox( compute_using_distributed_for_loop(lambdas,conv_spectrum), result_ref)
@

2p. How do you expect the performance of `compute_using_distributed_for_loop` to compare to the performance of using `pmap` to compute the spectrum (after using a good batch size)?
<<>>=
response_2p = missing # INSERT response
@

<<>>=
for i in 1:num_runs
    @time compute_using_distributed_for_loop(lambdas_dist,conv_spectrum)
end
@

The `@distributed` macro also works with data stored in a SharedArray or even a regular Array. For example...

<<>>=
for i in 1:num_runs
    @time compute_using_distributed_for_loop(lambdas_shared,conv_spectrum)
end
@

<<>>=
for i in 1:num_runs
    @time compute_using_distributed_for_loop(lambdas,conv_spectrum)
end
@

2q. How did the benchmarking results compare to your predictions?

<<>>=
response_2q = missing # INSERT response
@

In this case, we don't notice a significant difference in performance with the distributed for loop depending on how the lambdas were stored, since all three versions are stored in a shared memory system. Once we use multiple compute nodes and communications must travel over a network, differences in performance will become more apparent.

There's also an option to specify a reducing operation that is performed on the result of each pass through the `@distributed` for loop. (The function to perform the reduction is provided in parentheses before the `for`.)

<<>>=
function calc_mse_distributed_loop(lambdas::DArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, v::Number)
    c = lab6.speed_of_light
    z = v/c
    spec2_shifted = doppler_shifted_spectrum(spec2,z)
    mse = @distributed (+) for i in eachindex(lambdas)
        @inbounds l = lambdas[i]
        flux1 = spec1(l)
        flux2 = spec2_shifted(l)
        (flux1-flux2)^2   # result of the last line will be reduced by the + operator
    end
    mse /= length(lambdas)
    return mse
end
@

<<>>=
@test calc_mse_loop(lambdas,conv_spectrum,conv_spectrum,10.0) ≈ calc_mse_distributed_loop(lambdas_dist, conv_spectrum, conv_spectrum, 10.0)
@

2r. How do you expect the performance of the `@distributed` for loop with a reduction operator to compare to `mapreduce` on a distributed array?

<<>>=
response_2r = missing # RESPONSE
@

<<>>=
for i in 1:num_runs
    @time calc_mse_distributed_loop(lambdas_dist, conv_spectrum, conv_spectrum, 10.0)
end
@

2s. How did the results compare to your expectations?

<<>>=
response_2s = missing # RESPONSE
@

## Distributed for loop using FLoops

Here we'll demonstrate one more way to perform the same calculation in parallel, using the [FLoops.jl](https://juliafolds.github.io/FLoops.jl/dev/) package. This package is powerful, but it seems to have remained a bit experimental rather than becoming well documented. Still, it is nice because it lets programmers implement more complex parallel calculations efficiently in terms of programmer effort, and hopefully eventually also efficiently for the computer.
<<>>=
using FLoops
@

<<>>=
function calc_mse_flloop(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, v::Number; ex = ThreadedEx())
    c = lab6.speed_of_light
    z = v/c
    spec2_shifted = doppler_shifted_spectrum(spec2,z)
    eltype_spec1 = first(Base.return_types(spec1,(eltype(lambdas),)))
    eltype_spec2_shifted = first(Base.return_types(spec2_shifted,(eltype(lambdas),)))
    mse = zero(promote_type(eltype_spec1,eltype_spec2_shifted))
    @sync @floop ex for i in eachindex(lambdas)
        @inbounds l = lambdas[i]
        flux1 = spec1(l)
        flux2 = spec2_shifted(l)
        @reduce(mse += (flux1-flux2)^2)
    end
    mse /= length(lambdas)
    return mse
end
@

<<>>=
@test calc_mse_loop(lambdas,conv_spectrum,conv_spectrum,10.0) ≈ calc_mse_flloop(lambdas, conv_spectrum, conv_spectrum, 10.0, ex=DistributedEx() )
@

<<>>=
for i in 1:num_runs
    @time calc_mse_flloop(lambdas, conv_spectrum, conv_spectrum, 10.0, ex=DistributedEx() )
end
@

## Comparing Multi-threading vs Multi-processing performance more easily

Wouldn't it be nice if we didn't have to rewrite our code for each architecture? There are a few packages that make this possible (e.g., [FLoops.jl](https://juliafolds.github.io/FLoops.jl/dev/) and [KernelAbstractions.jl](https://juliagpu.github.io/KernelAbstractions.jl/stable/)). Below we'll compare the performance of serial, multi-threaded and multi-processing versions of our function by simply specifying a different *executor*, as shown below.

<<>>=
for i in 1:num_runs
    @time calc_mse_flloop(lambdas, conv_spectrum, conv_spectrum, 10.0, ex = SequentialEx(simd = Val(true)) )
end
@

<<>>=
for i in 1:num_runs
    @time calc_mse_flloop(lambdas, conv_spectrum, conv_spectrum, 10.0, ex = ThreadedEx() )
end   # ThreadedEx() is the default if we don't specify ex explicitly
@

<<>>=
for i in 1:num_runs
    @time calc_mse_flloop(lambdas, conv_spectrum, conv_spectrum, 10.0, ex = DistributedEx() )
end
@

2t. Think about your class project and the parallel programming patterns demonstrated in this exercise. Could your key calculation be phrased as a mapreduce problem? Could it be phrased as one or more map calculations? Would it be advantageous to use a distributed for loop or FLoops? Do you see a reason to implement different parts of the problem using different programming patterns? If you were to parallelize your code using multi-processing, which of the methods above would you choose? Why?

<<>>=
response_2t = missing
#= """
Example multi-line response
"""
=#;
@

2u. *If* you're planning to parallelize your class project for distributed memory, then before you parallelize your class project, get some practice by parallelizing the function `calc_χ²` (at the bottom of the notebook) using whichever parallelization strategy you plan to use for your project.

<<>>=
# response_2u:
function calc_χ²_my_way(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum,
                        σ1::AbstractArray, σ2::AbstractArray, v::Number;
                        #= any optional parameters?
                        =# )
    # INSERT YOUR CODE HERE
    return NaN
end
@

<<>>=
σ_obs1 = 0.02*ones(size(lambdas))
σ_obs2 = 0.02*ones(size(lambdas));
@

<<>>=
function calc_χ²_loop(lambdas::AbstractArray, spec1::AbstractSpectrum, spec2::AbstractSpectrum, σ1::AbstractArray, σ2::AbstractArray, v::Number )
    @assert size(lambdas) == size(σ1) == size(σ2)
    c = lab6.speed_of_light
    z = v/c
    spec2_shifted = doppler_shifted_spectrum(spec2,z)
    eltype_spec1 = first(Base.return_types(spec1,(eltype(lambdas),)))
    eltype_spec2_shifted = first(Base.return_types(spec2_shifted,(eltype(lambdas),)))
    χ² = zero(promote_type(eltype_spec1,eltype_spec2_shifted))
    for i in eachindex(lambdas)
        @inbounds l = lambdas[i]
        flux1 = spec1(l)
        flux2 = spec2_shifted(l)
        @inbounds χ² += (flux1-flux2)^2/(σ1[i]^2+σ2[i]^2)
    end
    return χ²
end
@

<<>>=
@test abs(calc_χ²_my_way(lambdas,conv_spectrum, conv_spectrum, σ_obs1, σ_obs2, 0.0 )) < 1e-8
@

<<>>=
@test calc_χ²_my_way(lambdas,conv_spectrum, conv_spectrum, σ_obs1, σ_obs2, 10.0 ) ≈ calc_χ²_loop(lambdas,conv_spectrum, conv_spectrum, σ_obs1, σ_obs2, 10.0 )
@

<<>>=
@