Disclaimer: some code and commands have not been tested.
Sooner or later, a computation becomes too expensive to run. If you are lucky, you can adjust the parameters of the numerical method or find a better one. If not, it is time to look at what parallel computing promises.
Often, numerical tasks can be reduced to a master-worker model, meaning that a single master process hands out work to the computing workers. Most programming languages offer many tools for this; however, their complexity can discourage us from using them. This is another area where Julia shines by offering a simpler and more elegant way, which is the subject of this blog post.
The most straightforward problem to parallelise is one where the domain is known in advance. On that domain, you want to calculate the function values or an aggregate of them, such as an integral or a stochastic average. Another class of problems that parallel computing can tackle is one where the domain elements are generated by some function learnt from the previous evaluation history. For that, I have written a small package, TaskMaster.jl, inspired by my colleague's work on the Python adaptive package, which I plan to discuss in another post.
Domain -> f -> Values
Domain -> f -> Sum(Values)
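The two patterns above map directly onto Distributed primitives. Below is a minimal sketch. It assumes a local session with two worker processes, which the snippet starts itself with addprocs so that it runs on its own, and a toy function f. Here, pmap returns every value, while a @distributed loop with the (+) reducer returns only their aggregate.

```julia
using Distributed

# Assumption: a local machine; start two workers if none are running yet
nprocs() == 1 && addprocs(2)

@everywhere f(x) = x^2   # toy function standing in for an expensive one

domain = 1:10

# Domain -> f -> Values: every value is sent back to the master
vals = pmap(f, domain)

# Domain -> f -> Sum(Values): each worker sums its own share of the domain,
# and only the partial sums are combined on the master
total = @distributed (+) for x in domain
    f(x)
end

println(vals)
println(total)   # 385
```

The reduction variant is preferable when only the aggregate is needed, since the individual values never travel back to the master.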
Before we can use parallel computing, we need to discuss how to make code parallel with the abstractions Julia offers. Fortunately, the code is the same whether it runs on a cluster or on a local machine. On the latter, starting Julia from the command line with two worker processes is as simple as julia -p 2.
This command opens the REPL (the interactive Read-Eval-Print Loop) on the master process, so in the end we have three Julia processes. From the REPL, we can run the function evaluation:
using Distributed # The package containing abstractions
@everywhere f(x) = x^2 # defines the function on all processes, including the workers
res = pmap(f, 1:10) # applies f to every element, handing elements to workers as they become free
As one can see, parallelism took us only three lines of code. To check that the code has been executed on all workers (if you are suspicious ;)), you can add a print statement to the function and see that the work has been split.
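Instead of printing, one can also return the id of the process that did the work, using myid(). Below is a small sketch. The name whoami_square is made up for illustration, and two local workers are assumed.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# Hypothetical helper: returns (process id, result) so we can see who did the work
@everywhere whoami_square(x) = (myid(), x^2)

res = pmap(whoami_square, 1:10)
ids = first.(res)       # process id that handled each element
squares = last.(res)    # the actual results

println("work was done by processes ", sort(unique(ids)))
```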
In most cases, pmap is enough. However, sometimes we need more. Let's say we would like to evaluate a numerical derivative of an expensive function at a few points, and the number of points is smaller than the number of cores we can get.
On the first try, one would do the following:
using Distributed
@everywhere f(x) = x^2
h = 0.1
y0 = pmap(f, 1:10)
y1 = pmap(f, (1:10) .+ h) # shifted points; 1:10 + h would parse as 1:(10 + h)
derivative = (y1-y0)./h
However, the second pmap starts only after the first one has finished. Since there are fewer points than cores, some workers would stay idle. To overcome that, we can start both maps asynchronously, which works well with parallel processing.
The @async macro starts an expression as a task that runs concurrently with the other commands on the master process. The @sync macro, in turn, waits for all tasks started within its block to finish before proceeding further. Thus, to resolve the shortcoming of the previous code, we could modify it as follows:
using Distributed
@everywhere f(x) = x^2
h = 0.1
@sync begin
    # `global` is needed because @sync and @async wrap their code in new scopes
    @async global y0 = pmap(f, 1:10)
    @async global y1 = pmap(f, (1:10) .+ h)
end
derivative = (y1 - y0) ./ h
A nice aspect of the code above is that the two pmap calls do not fight over the same workers. That is because pmap draws its workers from a shared pool, Distributed.default_worker_pool(). A worker is taken out of the pool while it evaluates an element and is returned when it finishes, so a busy worker is never handed a second task.
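If we prefer to control the split ourselves rather than rely on the shared default pool, pmap also accepts a WorkerPool as its second argument. Below is a sketch that assumes at least two workers. Each map gets a dedicated worker, so the two derivative evaluations cannot compete for the same process.

```julia
using Distributed
nprocs() < 3 && addprocs(3 - nprocs())   # make sure there are at least two workers

@everywhere f(x) = x^2
h = 0.1

w = workers()
pool0 = WorkerPool([w[1]])   # dedicated worker for the first map
pool1 = WorkerPool([w[2]])   # dedicated worker for the second map

t0 = @async pmap(f, pool0, 1:10)
t1 = @async pmap(f, pool1, (1:10) .+ h)
y0, y1 = fetch(t0), fetch(t1)   # wait for both maps to finish

derivative = (y1 .- y0) ./ h    # forward difference, close to 2x + h for f(x) = x^2
```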
So far, we have not considered how to allocate a task to a particular worker. The @spawnat macro is designed for that. There is also the @spawn macro, which uses the worker given by Distributed.nextproc(); since Julia 1.3 it has been deprecated in favour of @spawnat :any. To execute a function call on the second worker, one does the following:
using Distributed
@everywhere f(x) = x^2
ref = @spawnat 2 f(2) # returns a reference
isready(ref) # would return true if the task finishes.
result = fetch(ref)
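The @spawnat macro returns a Future, a remote reference that lets us check whether the task has finished (isready) and fetch the result from the worker (fetch). It does not let us cancel the task; to stop a running computation, one has to interrupt the whole worker with interrupt(pid).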
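A few related calls are shown in the sketch below, which assumes at least one worker. remotecall is the function-call form of @spawnat, @spawnat :any lets Distributed pick the worker, and wait blocks until a Future is ready. The function slow_square is a made-up stand-in for an expensive computation.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

# Hypothetical stand-in for an expensive computation
@everywhere slow_square(x) = (sleep(0.5); x^2)

w = first(workers())
ref1 = @spawnat w slow_square(3)        # run on a chosen worker
ref2 = remotecall(slow_square, w, 4)    # the same, written as a function call
ref3 = @spawnat :any slow_square(5)     # let Distributed pick the worker

wait(ref1)                 # block until the first result is available
println(isready(ref1))     # true: wait has returned, so the value is there
results = fetch.([ref1, ref2, ref3])   # fetch blocks for the remaining ones
```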
That is the minimum one needs to know to feel comfortable running calculations on multiple cores, or even writing one's own pmap (I encourage looking at the example in Julia's manual). The next step is to learn how to run parallelised code on a cluster.
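The sketch below, in the spirit of the manual's example, gives an idea of what such an implementation involves. The name mypmap is ours, and there is no error handling or retry logic. One @async task per worker keeps pulling the next index and evaluates it remotely with remotecall_fetch, so faster workers simply take more items.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

@everywhere f(x) = x^2

# Hypothetical name; a bare-bones dynamic scheduler without error handling
function mypmap(f, lst)
    n = length(lst)
    results = Vector{Any}(undef, n)
    i = 0
    nextidx() = (i += 1; i)   # @async tasks share one thread and do not yield here, so this is safe
    @sync for p in workers()
        @async while true
            idx = nextidx()
            idx > n && break
            # evaluate on worker p and wait for the answer; p is then free for the next index
            results[idx] = remotecall_fetch(f, p, lst[idx])
        end
    end
    return results
end

mypmap(f, 1:10)
```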
Now you know how to make your programs parallel, but at some point, the resources of a local computer are not enough. In such cases, you might consider using a cluster. A cluster is usually a farm of servers joined together by fast network connections. If ssh access to each node is possible, it is relatively easy to start using it.
For example, assume that Julia is installed on both machines and that we have ssh access to two PCs, "user@A" and "user@B". We can then initiate the cluster as follows:
using Distributed
addprocs([("user@A", 2)]) # creates two workers on machine A
addprocs([("user@B", 3)]) # creates three workers on machine B
@everywhere f(x) = readchomp(`hostname`) # returns the name of the host running f
pmap(f, 1:10)
Voila, that was all we needed! Since such an initialisation is relatively common, Julia offers a command-line option for it: julia --machine-file MachineFile. Here, MachineFile contains the list of hosts, one per line, each optionally prefixed with the number of workers to start there (for example, 2*user@A). That makes it reasonably convenient to initialise Julia in a cluster environment.
The one thing every cluster is going to have is a job scheduler: a program that stands in your way before you can ssh to the nodes (unless you own the cluster). A job scheduler allocates resources according to the cluster owner's policies, which define how many computing resources you can use and how long you need to wait for them. Although the task sounds simple, there is no universal standard for job scheduling; to name a few, there are PBS, Torque, SLURM, etc. Fortunately, the basics apply to any cluster.
Here I consider the PBS scheduling system. First, let's run a simple test script to see how things work:
#!/bin/bash
cat $PBS_NODEFILE >> test.log # PBS lists the allocated nodes in this file
To make the script executable, we run chmod +x ./test (assuming test is the filename). At that point, we can schedule this script to run on the cluster:
qsub -l nodes=2:ppn=2 ./test
That tells the scheduler to run the job on two different nodes with two cores each. To see the job status, we can run qstat in the terminal, which also shows how busy the cluster is.
The -l option of qsub takes a resource list, in which one specifies what kind of nodes one wants. For example, one can request nodes with the newest processors, or nodes with GPUs or TPUs. In the resource list, one also sets the maximum time a job may run (the walltime) and how much memory it may use. Users of proprietary software also specify the necessary licences there (Mathematica, Matlab, etc.).
If the previous script successfully produced a test.log file, we are only a small step away from running Julia on the cluster. Instead of writing the nodes to test.log, we only need to pass them to Julia.
As an example, let's consider a Julia program code.jl:
using Distributed
@everywhere f(x) = (println(x); x^2) # print the argument, return its square
res = pmap(f,1:10)
# saving result
# ...
It calculates the squares of the numbers 1 to 10 and prints each argument the function receives. The result is saved to the file code.log.
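The saving step is elided above. One simple possibility is shown below as a sketch of a complete code.jl. The plain-text format of code.log, one input and its square per line, is our own assumption.

```julia
using Distributed   # harmless if it has already been loaded

@everywhere f(x) = (println(x); x^2)   # print the argument, return its square

res = pmap(f, 1:10)

# Assumed output format: one "input square" pair per line
open("code.log", "w") do io
    for (x, y) in zip(1:10, res)
        println(io, x, " ", y)
    end
end
```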
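To execute the code above on the cluster, we use Julia's --machine-file option. Execution is then as simple as piping a one-line job script to qsub. The single quotes keep $PBS_NODEFILE and $PBS_O_WORKDIR from being expanded before the job starts:
echo 'cd $PBS_O_WORKDIR && julia --machine-file $PBS_NODEFILE code.jl' | qsub -l nodes=2:ppn=2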
If that works, the file code.log is produced after the job has run. Those are the bare essentials of using a cluster.
Sometimes things go wrong, and one needs to find out why. The causes can be numerous: the code may have a bug, or the cluster may not be configured correctly. To track them down, one can use log files, the qstat command, the job's standard output, interactive mode and sometimes a debug node.
Interactive mode gives access to the shell environment in which the job is executed, which makes it the most powerful of these tools. To use it, one submits an ordinary job, except that instead of a script file we pass the -I flag:
qsub -l nodes=2:ppn=2 -I
That tells the scheduler to open a shell on the master node (the first allocated node) when the job starts.
In the opened shell, we can then execute our script:
julia --machine-file $PBS_NODEFILE code.jl
Alternatively, one can start Julia with the same --machine-file option but without the file argument, and then run the code with include("code.jl"). This is particularly useful when the code has a bug and needs a quick fix to save the output. One can also use it to watch the progress of the calculation and estimate how long it will take to finish. Although it is unproductive, I admit that I am guilty of doing, and enjoying, that.
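For watching progress from the interactive shell, one option is to let the workers report to a RemoteChannel that the master reads. This is our own suggestion, not something the scheduler provides. Below is a sketch with a made-up work function and 20 made-up inputs.

```julia
using Distributed
nprocs() == 1 && addprocs(2)

n_items = 20
# A channel living on the master; workers push a message after each evaluation
progress = RemoteChannel(() -> Channel{Int}(n_items))

@everywhere function work(x, progress)
    sleep(0.1)              # stand-in for an expensive computation
    put!(progress, x)       # tell the master that x is done
    return x^2
end

# Run the map in the background so the master can keep reading the reports
job = @async pmap(x -> work(x, progress), 1:n_items)

for done in 1:n_items
    take!(progress)
    done % 5 == 0 && println("$done / $n_items evaluations finished")
end
res = fetch(job)
```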
Previously, we used the master process allocated to us by the scheduler to initiate the cluster. That is not always convenient. For example, one may wish to release the allocated resources as soon as a particular worker has finished its work. Also, requests for many nodes and processors per node can become too bulky to schedule. Typically, scheduling many single-core jobs is easier, so some schedulers offer an array job option.
In contrast to an ordinary job, an array job is a collection of similar jobs, each submitted individually to run on a single core. To demonstrate this, we can write a script test which prints the hostname of the machine where it was executed (again, it needs to be made executable with chmod +x test).
#!/bin/bash
hostname >> test.log
To start an array job with the PBS scheduler, we pass the -t flag to qsub together with a range of array indices (this is the Torque syntax; PBS Pro uses -J instead). That tells the scheduler to run the script as independent jobs, one per index. In other words, n indices literally mean n single-core jobs. To schedule the script above as four jobs, we can do the following:
qsub -t 1-4 ./test
In the resource list, we do not need to specify the nodes and processes per node (although that might still work).
However, while this array job was easy to schedule, it is not very useful on its own, since we still need to coordinate which computation each job performs. For that, the PBS scheduler exposes the array index (an integer) in each job's environment (PBS_ARRAYID in Torque), which tells the script which computations to execute. That is a somewhat archaic way of doing things and requires thinking (which, as intelligent human beings, we want to avoid) when setting up calculations.
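In Julia, the index can be read from the environment and turned into a chunk of work. The sketch below assumes Torque's PBS_ARRAYID variable (PBS Pro calls it PBS_ARRAY_INDEX). It also assumes a hypothetical split of the inputs 1:100 over four array jobs. The helper name chunk and the result file names are made up for illustration.

```julia
# Hypothetical helper: the part of 1:total handled by job `idx` out of `njobs`
function chunk(idx::Int, njobs::Int, total::Int)
    per_job = cld(total, njobs)            # ceiling division
    first_i = (idx - 1) * per_job + 1
    last_i = min(idx * per_job, total)
    return first_i:last_i
end

# The array index set by the scheduler (defaults to 1 when run outside PBS)
idx = parse(Int, get(ENV, "PBS_ARRAYID", "1"))
inputs = chunk(idx, 4, 100)

res = [x^2 for x in inputs]
open("result_$idx.log", "w") do io
    foreach(y -> println(io, y), res)
end
```

Each job then writes its own result file, and the pieces have to be collected afterwards, which is exactly the bookkeeping the next approach avoids.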
Fortunately, we can use Julia to manage array jobs with the fabulous ClusterManagers.jl package. It gives us two options for initiating the cluster with array jobs. The first option is to start a TCP/IP server on the cluster's login node and submit array jobs that start Julia processes, which then connect to the master process over TCP/IP. The second option is to use the filesystem to connect the master and the workers.
First, consider the TCP/IP connection between the master and workers. Before submitting an array job, we need to set up the listening process to which the workers connect. The second step is to submit an array job which initiates the workers, and then we need to wait until the scheduler puts it in the running state:
using Distributed
using ClusterManagers
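# Listen for workers on port 9000 of the login node (here called hpc05)
em = ElasticManager(addr=:auto, port=9000, cookie="foobar")
# Initiate workers: an array job of 5 single-core jobs, each starting a Julia
# worker that connects back to the listening master process
worker_script = "julia -e 'using ClusterManagers; ClusterManagers.elastic_worker(\"foobar\", \"hpc05\", 9000)'"
run(pipeline(`echo $worker_script`, `qsub -t 1-5`))
# Wait until the scheduler has started all 5 workers
while nworkers() != 5
    sleep(1)
end
pmap(x -> x^2, 1:10)
# To end the job, remove the workers
rmprocs(workers())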
The last step, after the calculations, removes all workers, so the array jobs end and the resources allocated by the scheduler are released.
Another possibility is to use the filesystem to connect to the workers. For that, ClusterManagers.jl offers the simple addprocs_pbs function, which schedules the jobs and initiates the workers in a single call.
using Distributed
using ClusterManagers
addprocs_pbs(5)
pmap(x->x^2,1:10)
Finally, to finish the job, one can exit the Julia process or remove all allocated workers.
In my last project, I relied extensively on communication through the filesystem because that was the only approach I knew. I found it annoyingly slow to start, so I began thinking about better ways of initiating the cluster, and that is how I gathered the experience described in this post.
Looking back on this blog post, I would say that Julia excels at providing tools for parallel computing. Moreover, I hope this post has shown that it is relatively easy to start using a cluster. Nevertheless, the tools could be more comfortable.
In particular, ClusterManagers.jl could make it as easy to initiate a PBS cluster with a TCP/IP connection as it is with the filesystem. Also, not every scheduler supports array jobs; thus, it would be nice to have the option of ordinary scheduling (specifying nodes and processes per node).
Moreover, we could significantly improve how we interact with the cluster. We are accustomed to ssh-ing into the cluster, copying our code there, scheduling the job and transferring the data back. A better way, as I envision it, would be a macro which, from the user's perspective, works similarly to @spawn and returns a remote reference, something like:
N = 100 # The @cluster macro would see that
@cluster hpc05 10 begin # .juliarc could host cluster information
using Distributed
using MyModule # Modules would be synchronised with the cluster automatically, as also the necessary source files.
res = pmap(x->(println(x); f(x)), 1:N) # Text output would be redirected locally
end
res # All variables in the global scope of the cluster would be visible locally after the execution, and possibly also during it.