[R-sig-hpc] ff and parallel computing (multiple nodes, shared disk)

Ramon Diaz-Uriarte rdiaz02 at gmail.com
Wed Nov 11 17:43:18 CET 2009


Dear All,

I am considering using ff for processing medium-sized data sets in a
computer cluster (distributed memory), with shared disk space. The
presentation by the ff authors in

http://ff.r-forge.r-project.org/ff&bit_UseR!2009.pdf

shows examples of using ff with snowfall on a multicore machine but
not over several machines.

I have been playing with ff and it seems possible to read ff and ffdf
objects over several nodes. It seems it is also possible to safely put
together an ffdf object which has been created by columns in different
nodes. I am pasting code below (you'd only need to replace the name of
the shared directory).


I'd appreciate comments, suggestions, and alternatives (on P.S.2 I
provide some further details on intended usage and other alternatives
I've looked at).

Best,

R.

P.S. I am CC'ing Jens Oehlschlaegel, the maintainer of ff, since I do
not know if he reads this list.

P.S.2. The data, in a single RData, can take from 500 MB to 4 GB,
which is more than any one process should use. I plan to use between
20 and 30 nodes, each with two dual-core Opteron CPUs.

I do not think a DB solution is warranted here: the original data
(floats) are in a flat text file, where columns are samples, and we do
the processing by sample (or in chunks withing sample). The data are
used twice: first for the analysis and then for the plotting (which
uses the original data + the output from analysis).  And we are done.
Almost always, processing is carried out non-interactively.

Up to now I've been using papply, but I am running into large memory
consumption because of the creation of intermediate lists. Moreover,
I'd rather avoid sending a lot of data via MPI (our network is less
than ideal) ---yes, sure, the shared space also needs to use a switch.
Finally, if intermediate results are stored in the disk, I can recover
from network errors and MPI crashes.

One approach would be to break down the original data set in
individual "RData" files, and load each one as needed in the slaves.
But this seems like reinventing the wheel, I doubt I'd write efficient
code, it clutters the disk, and "load"ing adds time. I also looked at
the package "BufferedMatrix" (in BioC), which seemd ideal, but
currently "you can not save and reload a BufferedMatrix between R
sessions", which I think excludes it for my intended usage.

*********************************************************

## LAM-MPI universe booted, etc.
## 4 nodes, each in a different machine


library(ff)
library(snowfall)

## Names for dirs and files
mydir <- "/http/tmp"
fn1 <- "t1"
fn2 <- "t2"
fn3 <- "t3"

fullname1 <- paste(mydir, fn1, sep = "/")
fullname2 <- paste(mydir, fn2, sep = "/")
fullname3 <- paste(mydir, fn3, sep = "/")


sfInit(parallel = TRUE, cpus = 4, type = "MPI")
sfLibrary(ff)
sfClusterSetupRNG(type = "SPRNG")
sfExport("mydir")
setwd(mydir)
sfClusterEval(setwd(mydir))

## To know where things happen
sfClusterApply(1:4, function(i) assign("nodenum", i, env = .GlobalEnv))



##   Getting values of ff object in slaves
x1 <- ff(1:60, dim = c(4, 15), filename = fullname1)
close(x1)
save(file = "x1.RData", x1)

## Acessing values; read-only
sfClusterEval(load("x1.RData"))
sfClusterEval(open(x1, readonly = TRUE))
sfClusterApplyLB(1:15, function(i) {list(nodenum = nodenum, values = x1[, i])})
sfClusterEval(close(x1))



##### I think it is OK to change in place if stored as ffdf:
### each column is a different file

## a) Create ffdf in master, send to nodes, modify there

## I use pattern here, and not latter, to avoid 'Invalid cross-device link'
## if R binary and filename are in different disks

x2 <- ffdf(c1 = ff(length = 4, vmode = "double", pattern = fullname2),
           c2 = ff(length = 4, vmode = "double", pattern = fullname2),
           c3 = ff(length = 4, vmode = "double", pattern = fullname2),
           c4 = ff(length = 4, vmode = "double", pattern = fullname2),
           c5 = ff(length = 4, vmode = "double", pattern = fullname2),
           c4 = ff(length = 4, vmode = "double", pattern = fullname2),
           c7 = ff(length = 4, vmode = "double", pattern = fullname2),
           c8 = ff(length = 4, vmode = "double", pattern = fullname2),
           c9 = ff(length = 4, vmode = "double", pattern = fullname2),
           c10 = ff(length = 4, vmode = "double", pattern = fullname2),
           c11 = ff(length = 4, vmode = "double", pattern = fullname2),
           c12 = ff(length = 4, vmode = "double", pattern = fullname2),
           c13 = ff(length = 4, vmode = "double", pattern = fullname2),
           c14 = ff(length = 4, vmode = "double", pattern = fullname2),
           c15 = ff(length = 4, vmode = "double", pattern = fullname2))

close(x2)
save(file = "x2.RData", x2)

sfClusterEval(load("x2.RData"))
sfClusterEval(open(x2))
## The following is, of course, non-deterministic. Just to see it
## working.
sfClusterApplyLB(1:15, function(i) x2[, i] <- rep(nodenum, 4))
sfClusterEval(close(x2))

rm(x2)
load("x2.RData")
open(x2)
x2[, ] ## changes are stored



## b) Create ff object in nodes, and put together as ffdf in master
##    Note that we pass the object back as the return of sfClusterApplyLB
##    In contrast, in a) we do not pass anything back from
##    sfClusterApplyLB, but leave it in the disk.

sfExport("fullname3")

createAndClose <- function(i) {
  nameobj <- paste("ffpiece", i, sep = "")
  assign(nameobj,
         ff(rep(nodenum, 4), pattern = fullname3))
  close(get(nameobj))
  return(get(nameobj))
}


list.ff <- sfClusterApplyLB(1:15, createAndClose)

## put together ffdf
## x3 <- ffdf(list.ff[[1]], list.ff[[2]], list.ff[[3]]) ## etc ...

eval(parse(text = paste("x3 <- ffdf(", paste("list.ff[[", 1:15, "]]",
             sep = "", collapse = ", "), ")")))

open(x3)
x3






-- 
Ramon Diaz-Uriarte
Structural Biology and Biocomputing Programme
Spanish National Cancer Centre (CNIO)
http://ligarto.org/rdiaz
Phone: +34-91-732-8000 ext. 3019



More information about the R-sig-hpc mailing list