Julia for Cluster

 2019-02-25

Disclaimer: some code and commands have not been tested.

There comes a time when a computation becomes too expensive to run. If you are lucky, you can adjust the parameters of the numerical method or find a better one. If that is not the case, you are now looking at what the phrase parallel computing promises.

Often numerical tasks can be reduced to an actor model, meaning that a single master process gives orders to the computing workers. Most programming languages provide tools for this, but the way they do it and the complexity involved can put us off using them. I believe this is another strength of Julia, which shows us a better and more elegant way, and that is what this blog post is about.

Minimal set of Julia tools for parallel computing

The most straightforward problem to parallelise is one where you know the domain in advance and want to calculate function values on it, or some aggregate of them such as an integral or a stochastic average. Another set of problems parallel computing can tackle is when the domain elements are generated by some function learnt from the previous evaluation history. For that, I have written a small package, TaskMaster.jl, inspired by my colleagues' work on the Python adaptive package, which I plan to discuss in another post.

Domain -> f -> Values
Domain -> f -> Sum(Values)

Before one can use the parallel computing abilities of the code, we need to discuss how to make the code parallel with the abstractions offered. Fortunately, the code is the same whether it is run on a cluster or on a local machine. On the latter, we can start Julia from the command line with two worker processes as simply as julia -p 2.

With such a command, the REPL (an interactive shell) opens on the master process, so in the end we have 3 Julia processes. From the REPL we can run the simplest parallel function evaluation:

using Distributed # the standard library containing the parallel abstractions
@everywhere f(x) = x^2 # defines the function on the master and on all workers
res = pmap(f, 1:10) # runs the function, distributing the work between the workers

As one can see, parallelism took us only 3 lines of code. To see that the code was indeed executed on all workers (if you are suspicious ;), you can add a print statement to the function and observe how the work was split.
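For example, a quick sanity check could look like this (myid() returns the id of the process evaluating the call, and the workers' printed output is forwarded to the master):

@everywhere using Distributed # makes myid() available on the workers as well
@everywhere g(x) = (println("worker $(myid()) received x = $x"); x^2)
pmap(g, 1:10) # the printed worker ids show how the work was distributed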

In most cases, pmap is enough. However, sometimes we need a little bit more. Let's say we would like to evaluate a numerical derivative of an expensive function at a few points, and that the number of points is smaller than the number of workers we can get.

As a first try, one would do:

using Distributed
@everywhere f(x) = x^2
h = 0.1
y0 = pmap(f, 1:10)          # runs first ...
y1 = pmap(f, (1:10) .+ h)   # ... and only then this one
derivative = (y1 .- y0) ./ h

However, in this particular situation that would mean that some workers are idle while each pmap runs. To overcome that, we can start the maps asynchronously, which plays well with parallel processing.

The @async macro makes an expression run concurrently with the rest of the commands on the master process, whereas the @sync macro marks the point at which we wait for the enclosed tasks to finish before proceeding further. Thus, to resolve the shortcoming of the previous code, we can modify it as:

using Distributed
@everywhere f(x) = x^2
h = 0.1
@sync begin
    @async global y0 = pmap(f, 1:10)        # global so the results are visible outside the tasks
    @async global y1 = pmap(f, (1:10) .+ h)
end
derivative = (y1 .- y0) ./ h

A pleasant aspect of the code above is that the two pmap calls do not compete for the same workers while leaving others idle. That is achieved because, by default, both calls draw workers from the same shared worker pool: a worker is taken out of the pool for the duration of a single evaluation and put back afterwards.

So far we have not considered how to allocate a task to a particular worker. The @spawnat macro is designed for that (there is also the @spawn macro, which simply uses the worker given by Distributed.nextproc()). To execute a function call on the second worker one does:

using Distributed
@everywhere f(x) = x^2
ref = @spawnat 2 f(2) # returns a Future (a remote reference)
isready(ref) # returns true once the task has finished
result = fetch(ref) # waits for the task and retrieves the result

The @spawnat macro returns a remote reference (a Future), which allows us to check whether the task has already finished and to fetch the result from the worker.

That, in my opinion, is the bare minimum one needs to know to feel comfortable running calculations on multiple cores, or even to implement your own pmap function (I encourage you to look at the example in the Julia manual). The next step is to learn how to use parallelised code on a cluster.
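For reference, a hand-rolled pmap in the spirit of the manual's example (a sketch, not the actual implementation) could look as follows: each worker is driven by its own asynchronous task on the master, which keeps feeding it the next unprocessed index.

using Distributed

function my_pmap(f, xs)
    n = length(xs)
    results = Vector{Any}(undef, n)
    i = 0
    nextidx() = (i += 1)   # tasks on the master run cooperatively, so this counter is safe
    @sync for p in workers()
        @async while true
            idx = nextidx()
            idx > n && break
            results[idx] = remotecall_fetch(f, p, xs[idx])
        end
    end
    return results
end

Calling my_pmap(f, 1:10) should then give the same result as pmap(f, 1:10).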

The cluster

Now you know how to make your programs parallel, but at some point the resources of a local computer are not enough. In such cases, you might look into using a cluster, which usually is a farm of servers joined together by fast network connections. If ssh access to each of the nodes is possible, then it is rather easy to start using it.

For example, assuming that Julia has been set up on two PCs with ssh access "user@A" and "user@B", they can be used to initiate a cluster:

using Distributed
addprocs("user@A", 2) # creates two workers on machine A
addprocs("user@B", 3) # creates three workers on machine B
@everywhere f(x) = run(`hostname`)
pmap(f, 1:10)

Voilà, that was all we needed! Since such initialisation is rather common, Julia offers a command line option for it, julia --machine-file MachineFile, where MachineFile contains the list of hosts. That is reasonably convenient for initialising Julia in a cluster environment.
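For illustration, a machine file for the two machines above could look roughly like this, with one host per line and an optional worker count prefix (see the Julia manual for the exact format accepted by your version):

2*user@A
3*user@B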

Scheduling

The one thing every cluster is going to have is a job scheduler: a program which probably stands in your way before you can ssh to the nodes (unless you own the cluster ;). A job scheduler delegates resources according to the cluster owner's policies, which define how many computing resources you can use and how long you need to wait for them. Although it sounds like a simple task, there is no universal standard for scheduling jobs; to list a few, we have PBS, Torque, SLURM, etc. Fortunately, the basics should apply to any cluster.

Here I am considering the PBS scheduling system. First, let's run a small test script to see how things work:

#!/bin/bash
# PBS stores the list of allocated nodes in the file $PBS_NODEFILE
cat $PBS_NODEFILE >> test.log

To make it executable, we give the file execute permission with chmod +x ./test (assuming test is the filename). At that point we can schedule the script to run on the cluster:

qsub -l nodes=2:ppn=2 ./test

That tells the scheduler to run the job on two different nodes with two cores on each of them. To see the status of the job, we can run qstat in the terminal, which also allows us to see how occupied the cluster is with other jobs.

The -l argument of qsub is a resource list. There one can say what kind of nodes one wants; for example, nodes with the newest processors or ones with a GPU or other accelerators. In the resource list one also specifies the maximum time a job is going to run and how much memory it can take. Moreover, and this is relevant for users of proprietary software, one specifies the licences needed (Mathematica, Matlab, etc.).
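For example, a request for one node with four cores, a 24-hour walltime limit and 16 GB of memory could look roughly like the line below; the exact resource names depend on the scheduler flavour and the cluster's configuration:

qsub -l nodes=1:ppn=4,walltime=24:00:00,mem=16gb ./test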

Running Julia

If the previous script succeeded in producing a test.log file, then only a small step remains to run Julia on the cluster. Instead of printing the nodes to test.log, we only need to pass them to Julia.

As an example, let's consider a Julia program code.jl:

using Distributed
@everywhere f(x) = (println(x); x^2)
res = pmap(f, 1:10)

# saving result
# ...

which calculates the squares of the numbers 1 to 10 and interactively prints which argument the function receives. The result is then saved to a file code.log.
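The saving part is elided above; one minimal possibility (just a sketch, writing plain text) would be:

open("code.log", "w") do io
    println(io, res)
end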

To execute the code above on the cluster, we use Julia's machine file option. Since qsub expects a job script rather than a bare command, the simplest way is to pipe the command to it:

echo 'julia --machine-file $PBS_NODEFILE code.jl' | qsub -l nodes=2:ppn=2

If that works, a code.log file is produced after the job has run. These are the bare essentials of using the cluster.

Interactive mode

Sometimes things go wrong, and one needs to find out why. The issues can be numerous, from a bug in the code up to a cluster not being configured correctly. To find them, one can use log files, the qstat command to see the standard output, or the interactive mode, sometimes also called a debug node.

Interactive mode allows interacting with the shell environment where the job is being executed, which is why I find it the most powerful option of them all. To use it, one submits an ordinary job, except that in place of the script file we pass the -I argument:

qsub -l nodes=2:ppn=2 -I

That tells the scheduler to open a shell on the master node when the job has started.

In the opened shell we can then execute our desired script:

julia --machine-file $PBS_NODEFILE code.jl

or even start a Julia process without the file argument and then run the code with include("code.jl"). I find that particularly useful when the code has a bug and needs a quick fix in order to save the output. One can also use it to watch the progress of the calculation and see how long it will take to finish, an unproductive activity which I am admittedly guilty of doing and enjoying ;).

Array jobs

Previously we used a master process delegated to us by the scheduler to initiate the cluster. That is not always convenient. For example, one could wish to release allocated resources the moment work has finished on a particular worker. Also, a request for specific numbers of nodes and processors per node can become too bulky to schedule. Typically, it is easier to schedule multiple single-core jobs, and that is why some schedulers offer an array job option.

In contrast to an ordinary job, an array job is a collection of similar jobs, each submitted to a single core individually. To demonstrate that, we can write a script test which prints out the hostname of the node where it is executed (again it needs chmod +x test):

#!/bin/bash
hostname >> test.log

To start an array job with a PBS scheduler, we give the -t flag to qsub together with a range of task ids. That tells the scheduler to execute the file once per task id, or in other words literally to run n single-core jobs. To execute the script above as four such jobs, we can do:

qsub -t 1-4 ./test

In the resource list we no longer need to specify the nodes and processors per node (although that might still work).

However, while this array job was quite easy to schedule, it is often useless on its own, since we did not coordinate what computation each single-core job needs to do. For that, the PBS scheduler makes an integer task id available in the execution environment, which tells the script which computations to execute. That is, however, a somewhat archaic way to do these things and requires thinking (which, as intelligent human beings, we want to avoid) to set up calculations.
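For instance, a Julia script run as an array job could read the task id from the environment and pick its own slice of the work; the environment variable name (here assumed to be PBS_ARRAYID) differs between PBS flavours:

ntasks = 10                             # size of the array job, assumed to be known
id = parse(Int, ENV["PBS_ARRAYID"])     # id of this particular task
xs = 1:100
res = [x^2 for x in xs[id:ntasks:end]]  # every ntasks-th element, starting at id
# save res to a file tagged with id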

ClusterManagers.jl

Fortunately, we can use Julia to manage array jobs for us with the fabulous ClusterManagers.jl package. It gives us two options for initiating the cluster with array jobs. The first is to start a TCP/IP server on the cluster login node and submit an array job whose tasks start Julia processes that connect to the master over TCP/IP. The second is to use the filesystem to establish the connection between master and workers.

First, let's consider the TCP/IP connection between master and workers. Before submitting an array job, we need to set up the listening process to which the workers connect. The second step is to submit the array job which initiates the workers, and then we need to wait until the scheduler puts it in the running state:

using Distributed
using ClusterManagers

# Start the manager which listens for incoming worker connections.
# The address and port it listens on may need to be configured so that
# the compute nodes can reach the login node.
em = ElasticManager(cookie="foobar")

# Initiate the workers: submit an array job of 5 tasks, each starting a Julia
# worker that connects back to the master over TCP/IP. Here "hpc05" stands for
# the hostname of the login node where this script runs.
write("worker.sh", """
#!/bin/bash
echo 'using ClusterManagers; ClusterManagers.elastic_worker("foobar", "hpc05")' | julia
""")
run(`qsub -t 1-5 worker.sh`)

# Wait until all workers have connected
while nworkers() != 5
    sleep(1)
end

pmap(x -> x^2, 1:10)

# To end the job, release the workers
rmprocs(workers())

The last step, after the calculations, removes all workers and so releases the resources given by the scheduler.

Another possibility is to use the filesystem for the communication between master and workers. For that, ClusterManagers.jl offers a simple addprocs_pbs function which schedules the job and initiates the workers at once:

using Distributed
using ClusterManagers
addprocs_pbs(5)
pmap(x->x^2,1:10)

At the end, to finish the job, one can exit the Julia process or remove all allocated workers with rmprocs(workers()).

For my last project, I extensively used communication through the filesystem because that was the only thing I knew. I found it, however, annoyingly slow to initiate, so I started to think about better ways, and that is how I eventually gained the experience of initiating the cluster described here.

Conclusion

Reflecting on this blog post, I think Julia excels at providing excellent tools for parallel computing. Moreover, with this post I think I have accomplished the goal of showing that it is rather easy to start using a cluster. Nevertheless, the tools could be more comfortable.

In particular, ClusterManagers.jl could make it as easy to initiate a PBS cluster over a TCP/IP connection as it is with the filesystem. Also, array jobs are not always supported by the scheduler; thus it would be nice to optionally fall back on ordinary scheduling (giving nodes and processes per node).

Moreover, I think there could be a significant improvement in how we interact with the cluster. At the moment we are accustomed to ssh-ing into the cluster, sending our code there, scheduling the job and transferring the data back. A better way, as I envision it, would be a macro which from the user's perspective works similarly to @spawn and returns a remote reference. Something like:

N = 100 # the @cluster macro would see this variable

@cluster hpc05 10 begin # .juliarc could host the cluster information
    using Distributed
    using MyModule # modules, as well as the necessary source files, would be synchronised with the cluster automatically
    res = pmap(x -> (println(x); f(x)), 1:N) # text output would be redirected locally
end

res # all variables in the global scope of the cluster would be visible locally after execution, and possibly also during it
