Simple Network of Workstations for R on UIowa Beowulf

Luke Tierney
Department of Statistics and Actuarial Science
University of Iowa

The snow package

The snow (Simple Network of Workstations) package implements a simple mechanism for using a collection of workstations or a Beowulf cluster for ``embarrassingly parallel'' computations in R. The interface, which is based in part on the Python CoW (Cluster of Workstations) package, is intended to be quite simple, and is designed so that it can be implemented on top of several different lower level communication mechanisms. Three low level interfaces have been implemented, one using PVM via the rpvm package by Li and Rossini, one using MPI via the Rmpi package by Hao Yu, and one using raw sockets that may be useful if PVM and MPI are not available. This note describes how to start up a cluster, the basic functions for computing with a cluster, and an example of using the cluster for parallel bootstrapping.

Two notes of caution:

Starting and Stopping a Cluster

Starting a workstation cluster is the only step in using a cluster that is dependent on the underlying communication mechanism.

Starting a PVM Cluster

To start a PVM (Parallel Virtual Machine) cluster you first need to start PVM. I prefer to do this using the PVM console pvm:

<starting PVM using the pvm console>=
[luke@node00 ~]$ pvm
pvm> conf      
conf
1 host, 1 data format
                    HOST     DTID     ARCH   SPEED       DSIG
node00.beowulf.stat.uiowa.edu    40000  LINUX64    1000 0x00408c41
pvm> add node01 node02
add node01 node02
2 successful
                    HOST     DTID
                  node01    80000
                  node02    c0000
pvm> quit
quit

Console: exit handler called
pvmd still running.
[luke@node00 ~]$ 

You can also use a list of nodes in a file, say pvmhosts, and start the pvm console with pvm pvmhosts.

If you prefer, you can use xpvm to start PVM. xpvm also provides some nice monitoring tools. If you use xpvm you may want to also place a .xpvm_hosts file in your home directory on the master. My .xpvm_hosts file looks like

<.xpvm_hosts>=
node00
&node01
&node02
...
&node21

Once PVM is running, to use a cluster of two worker nodes you would start R on the master, load the snow package, and call makeCluster with argument 2, the number of worker nodes, to start the worker processes:

<starting a PVM cluster>=
library(snow)
cl <- makeCluster(2)

The returned value is a list of references to these two processes. You can also specify type="PVM" in the makeCluster call, but PVM is the default if PVM is available.

Once you are done using PVM remember to shut it down with the halt command in the console:

<stopping PVM using the pvm console>=
[luke@node00 ~]$ pvm
pvmd already running.
pvm> halt
halt
Terminated
[luke@node00 ~]$ 

Starting an MPI Cluster

Currently only LAM-MPI is supported for use with snow on our cluster. Support for MPICH or MPICH2 will be added eventually.

Avoid using ^C as I believe it will kill all your R processes once R is connected to MPI.

You can start LAM-MPI with

lamboot <hostfile>
A simple host file might look like

<simple lam host file>=
node00
node01
node02
node03

There are two approaches to running snow under LAM-MPI. One uses process spawning and is similar to the PVM approach. The other uses mpirun to set up the processes. The mpirun approach may work better with batch scheduling. To use process spawning, start R and run

<starting an MPI cluster with process spawning>=
library(snow)
cl<-makeCluster(type="MPI",3)

To use mpirun you use the shell script RMPISNOW to start R. This script should allow the same command line arguments as R; only the master process will be given these arguments. The number of process you specify to mpirun includes the master process, so -np 4 gives you a master and three workers:

<starting an MPI cluster with mpirun>=
mpirun -np 4 RMPISNOW

This starts up a master and three worker processes and loads snow into each. It uses the R_PROFILE variable to do this. In the master R process use

<getting the MPI cluster>=
cl <- getMPIcluster()

to get a reference to the running cluster of worker processes.

Once you are done with LAM-MPI remember to shut it down with lamhalt. If this has difficulty lamwipe may be useful. The man pages give more details.

Starting a Socket Cluster

To start a socket cluster you use makeCluster and specify type="SOCK" and a list of machines to use. For example, to create a socket cluster with two nodes you would start R on the master, load the snow package with

<starting a socket cluster>= [D->]
library(snow)

and evaluate

<starting a socket cluster>+= [<-D]
cl <- makeCluster(type="SOCK", c("node01", "node02"))

to create a socket cluster consisting of R processes running on the machines node01 and node02. The returned value is a list of references to these two processes.

Stopping a Cluster

To stop a cluster you should use

<stopping a cluster>=
stopCluster(cl)

Socket clusters should stop automatically when the process that created them terminates, but calling stopCluster is still a good idea. If you are using PVM you should also make sure that PVM is shut down as well by issuing the halt command in the text console or using the halt item in the xpvm File menu. Similarly, for LAM-MPI make sure to use lamhalt to shut down LAM-MPI.

Using the Cluster

Some Basic Functions

The basic functions for using a cluster are clusterCall and clusterApply. clusterCall calls a specified function with identical arguments on each member of the cluster and returns a list of the results. The calls are executed in parallel. For example, we can ask the nodes for their names and machine types:

<R session>= [D->]
> clusterCall(cl, function() Sys.info()[c("nodename","machine")])
[[1]]
                       nodename                         machine 
"node01.beowulf.stat.uiowa.edu"                        "x86_64" 

[[2]]
                       nodename                         machine 
"node02.beowulf.stat.uiowa.edu"                        "x86_64" 

[[3]]
                       nodename                         machine 
"node03.beowulf.stat.uiowa.edu"                        "x86_64" 

A useful variation of clusterCall is clusterEvalQ, defined as

<definition of clusterEvalQ>=
clusterEvalQ<-function(cl, expr)
    clusterCall(cl, eval, substitute(expr), env=.GlobalEnv)

This can be used, for example, to load a package on each cluster node with

<loading a library on all nodes>=
clusterEvalQ(cl, library(boot))

clusterApply is a version of lapply that evaluates each call on a different member of the cluster. It requires that the number of nodes in the cluster be at least as large as the number of elements in the list argument. A simple example:

<R session>+= [<-D->]
> clusterApply(cl, 1:3, get("+"), 3)
[[1]]
[1] 4

[[2]]
[1] 5

[[3]]
[1] 6

Random Number Generation

The default random number generators are likely to be quite correlated:

<R session>+= [<-D->]
> clusterCall(cl, runif, 3)
[[1]]
[1] 0.7176320 0.1036995 0.8112116

[[2]]
[1] 0.7176320 0.1036995 0.8112116

[[3]]
[1] 0.7176320 0.1036995 0.8112116

A quick and (very) dirty way of addressing this is random seeding, using something like

<random seeding of cluster generators>=
clusterApply(cl, runif(length(cl),max=10000000), set.seed)

A better approach is to use a parallel random number genarator package. Several are available. By default snow uses therlecuyer package, which provides an interface to a parallel random number generator package of L'Ecuyer, Simard, Chen, and Kelton. The function clusterSetupRNG handles the initialization. The default call with no additional arguments uses a random seed; named arguments described in the help page can be used for more control:

<R session>+= [<-D->]
> clusterSetupRNG(cl)
> clusterCall(cl, runif, 3)
[[1]]
[1] 0.1270111 0.3185276 0.3091860

[[2]]
[1] 0.7595819 0.9783106 0.6851358

[[3]]
[1] 0.7285098 0.9655873 0.9961841

A Bootstrap Example

The boot package includes an example using the data nuclear. The setup code, given in the boot help page, is

<bootstrap setup>=
library(boot)
#  In this example we show the use of boot in a prediction from 
#  regression based on the nuclear data.  This example is taken 
#  from Example 6.8 of Davison and Hinkley (1997).  Notice also 
#  that two extra arguments to statistic are passed through boot.
data(nuclear)
nuke <- nuclear[,c(1,2,5,7,8,10,11)]
nuke.lm <- glm(log(cost)~date+log(cap)+ne+ ct+log(cum.n)+pt, data=nuke)
nuke.diag <- glm.diag(nuke.lm)
nuke.res <- nuke.diag$res*nuke.diag$sd
nuke.res <- nuke.res-mean(nuke.res)

#  We set up a new dataframe with the data, the standardized 
#  residuals and the fitted values for use in the bootstrap.
nuke.data <- data.frame(nuke,resid=nuke.res,fit=fitted(nuke.lm))

#  Now we want a prediction of plant number 32 but at date 73.00
new.data <- data.frame(cost=1, date=73.00, cap=886, ne=0,
                       ct=0, cum.n=11, pt=1)
new.fit <- predict(nuke.lm, new.data)

nuke.fun <- function(dat, inds, i.pred, fit.pred, x.pred) {
     assign(".inds", inds, envir=.GlobalEnv)
     lm.b <- glm(fit+resid[.inds] ~date+log(cap)+ne+ct+
                 log(cum.n)+pt, data=dat)
     pred.b <- predict(lm.b,x.pred)
     remove(".inds", envir=.GlobalEnv)
     c(coef(lm.b), pred.b-(fit.pred+dat$resid[i.pred]))
}

Running just on the master the actual bootstrap example takes approximately 12 seconds:

<R session>+= [<-D->]
> system.time(nuke.boot <-
+             boot(nuke.data, nuke.fun, R=999, m=1,
+                  fit.pred=new.fit, x.pred=new.data))
   user  system elapsed 
 12.345   0.012  12.369 

Using a three-worker cluster is approximately three times as fast:

<R session>+= [<-D->]
> clusterSetupRNG(cl)
> clusterEvalQ(cl,library(boot))
> system.time(cl.nuke.boot <-
+             clusterCall(cl,boot,nuke.data, nuke.fun, R=500, m=1,
+                         fit.pred=new.fit, x.pred=new.data))
   user  system elapsed 
  0.010   0.001   3.793 

The value to use comparing speeds is the third one, the elapsed time.

Some Higher Level Functions

Simple higher level utilities can be defined in terms of the basic cluster functions. For example,

<higher level functions>= [D->]
parLapply <- function(cl, x, fun, ...)
    docall(c, clusterApply(cl, splitList(x, length(cl)), lapply, fun, ...))

defines a parallel version of Lapply; it uses splitList to split the list arguments approximately evenly across the cluster. An example, using a cluster of size 5:

<R session>+= [<-D->]
> x<-1:100/101
> system.time(qtukey(x, 2, df=2))
   user  system elapsed 
  3.656   0.001   3.663 
> system.time(unlist(parLapply(cl, x, qtukey, 2, df=2)))
   user  system elapsed 
  0.018   0.000   1.061 

The functions parCapply and parRapply apply a function to the columns or rows of a matrix and return the results:

<R session>+= [<-D->]
> A<-matrix(c(1,2,3,4,5,6),nrow=2)
> A
     [,1] [,2] [,3]
[1,]    1    3    5
[2,]    2    4    6
> parCapply(cl, A, sum)
[1]  3  7 11
> parRapply(cl, A, sum)
[1]  9 12

These functions are defined as

<higher level functions>+= [<-D->]
parRapply <- function(cl, x, fun, ...)
    docall(c, clusterApply(cl, splitRows(x,length(cl)), apply, 1, fun, ...))

parCapply <- function(cl, x, fun, ...)
    docall(c, clusterApply(cl, splitCols(x,length(cl)), apply, 2, fun, ...))

A very simple version of a parallel matrix multiplier is defined as

<higher level functions>+= [<-D]
parMM <- function(cl, A, B)
    docall(rbind,clusterApply(cl, splitRows(A, length(cl)), get("%*%"), B))

For small matrices the parallel version is slower due to the communication overhead, but for large ones the parallel version can do better:

<R session>+= [<-D]
> A<-matrix(rnorm(10000),100)
> system.time(A %*% A)
   user  system elapsed 
  0.003   0.000   0.003 
> system.time(parMM(cl,A , A))
   user  system elapsed 
  0.021   0.006   0.039 
> A<-matrix(rnorm(1000000),1000)
> system.time(A %*% A)
   user  system elapsed 
  4.415   0.013   4.432 
> system.time(parMM(cl,A , A))
   user  system elapsed 
  2.122   0.390   3.495 

Comments

Some points to keep in mind:

Some open issues: