[R-sig-hpc] ff and parallel computing (multiple nodes, shared disk)

Benilton Carvalho bcarvalh at jhsph.edu
Thu Nov 12 19:29:34 CET 2009


Ramon,

I've been investigating solutions for something similar.

I wrote my own code to use NetCDF, which doesn't perform well when I  
need random access to the data.

A while back, I also tried something using SQLite, which didn't work
well either, but I can't remember the specifics.

I'm currently "playing" with ff and bigmemory. The limitations I
observed were that ff objects have a maximum length of
.Machine$integer.max (2^31 - 1) elements, and that, currently,
bigmemory does not behave as expected on my Leopard machine when I
run "R --arch x86_64" and create a huge object.

About the 2^31-1 limit: each column of the datasets I handle is often
6.5e6 elements long; therefore, I'd get "only" about 330 columns per
ff object (my situation requires me to be ready for thousands of
columns). This would force me to split the data across several ff
objects, which is not currently an option.
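
For concreteness, the arithmetic behind that 330-column figure (the
6.5e6 is just my column length; adjust as needed):

## ff indexes with 32-bit integers, so length <= .Machine$integer.max
.Machine$integer.max              # 2147483647, i.e. 2^31 - 1
.Machine$integer.max %/% 6.5e6    # 330 full columns of 6.5e6 doubles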

So, let me know if you come up with a compromise and I'll let you know  
if I come across anything relevant.

Cheers,

b

On Nov 11, 2009, at 2:43 PM, Ramon Diaz-Uriarte wrote:

> Dear All,
>
> I am considering using ff for processing medium-sized data sets in a
> computer cluster (distributed memory), with shared disk space. The
> presentation by the ff authors in
>
> http://ff.r-forge.r-project.org/ff&bit_UseR!2009.pdf
>
> shows examples of using ff with snowfall on a multicore machine but
> not over several machines.
>
> I have been playing with ff and it seems possible to read ff and ffdf
> objects over several nodes. It seems it is also possible to safely put
> together an ffdf object which has been created by columns in different
> nodes. I am pasting code below (you'd only need to replace the name of
> the shared directory).
>
>
> I'd appreciate comments, suggestions, and alternatives (on P.S.2 I
> provide some further details on intended usage and other alternatives
> I've looked at).
>
> Best,
>
> R.
>
> P.S. I am CC'ing Jens Oehlschlaegel, the maintainer of ff, since I do
> not know if he reads this list.
>
> P.S.2. The data, in a single RData, can take from 500 MB to 4 GB,
> which is more than any one process should use. I plan to use between
> 20 and 30 nodes, each with two dual-core Opteron CPUs.
>
> I do not think a DB solution is warranted here: the original data
> (floats) are in a flat text file, where columns are samples, and we do
> the processing by sample (or in chunks within a sample). The data are
> used twice: first for the analysis and then for the plotting (which
> uses the original data + the output from analysis).  And we are done.
> Almost always, processing is carried out non-interactively.
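>
> As an aside, ff can ingest such a flat file directly; a rough,
> untested sketch (the file name, separator, and header are
> assumptions on my part):
>
> ## read the flat text file chunk-wise into an on-disk ffdf
> samples <- read.table.ffdf(file = "samples.txt", header = TRUE,
>                            sep = "", colClasses = "numeric")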
>
> Up to now I've been using papply, but I am running into large memory
> consumption because of the creation of intermediate lists. Moreover,
> I'd rather avoid sending a lot of data via MPI (our network is less
> than ideal) ---yes, sure, the shared space also needs to use a switch.
> Finally, if intermediate results are stored in the disk, I can recover
> from network errors and MPI crashes.
>
> One approach would be to break down the original data set in
> individual "RData" files, and load each one as needed in the slaves.
> But this seems like reinventing the wheel, I doubt I'd write efficient
> code, it clutters the disk, and "load"ing adds time. I also looked at
> the package "BufferedMatrix" (in BioC), which seemed ideal, but
> currently "you can not save and reload a BufferedMatrix between R
> sessions", which I think excludes it for my intended usage.
>
> *********************************************************
>
> ## LAM-MPI universe booted, etc.
> ## 4 nodes, each in a different machine
>
>
> library(ff)
> library(snowfall)
>
> ## Names for dirs and files
> mydir <- "/http/tmp"
> fn1 <- "t1"
> fn2 <- "t2"
> fn3 <- "t3"
>
> fullname1 <- paste(mydir, fn1, sep = "/")
> fullname2 <- paste(mydir, fn2, sep = "/")
> fullname3 <- paste(mydir, fn3, sep = "/")
>
>
> sfInit(parallel = TRUE, cpus = 4, type = "MPI")
> sfLibrary(ff)
> sfClusterSetupRNG(type = "SPRNG")
> sfExport("mydir")
> setwd(mydir)
> sfClusterEval(setwd(mydir))
>
> ## To know where things happen
> sfClusterApply(1:4, function(i) assign("nodenum", i, envir = .GlobalEnv))
>
>
>
> ##   Getting values of ff object in slaves
> x1 <- ff(1:60, dim = c(4, 15), filename = fullname1)
> close(x1)
> save(file = "x1.RData", x1)
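> ## (I believe the close() before save() matters: the .RData then
> ## stores only the ff metadata, while the data themselves stay in
> ## the ff file on disk.)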
>
> ## Accessing values; read-only
> sfClusterEval(load("x1.RData"))
> sfClusterEval(open(x1, readonly = TRUE))
> sfClusterApplyLB(1:15, function(i) {list(nodenum = nodenum, values = x1[, i])})
> sfClusterEval(close(x1))
>
>
>
> ##### I think it is OK to change in place if stored as ffdf:
> ### each column is a different file
>
> ## a) Create ffdf in master, send to nodes, modify there
>
> ## I use pattern here, and not later, to avoid 'Invalid cross-device link'
> ## if the R binary and the filename are on different disks
>
> x2 <- ffdf(c1 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c2 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c3 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c4 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c5 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c6 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c7 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c8 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c9 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c10 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c11 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c12 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c13 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c14 = ff(length = 4, vmode = "double", pattern = fullname2),
>           c15 = ff(length = 4, vmode = "double", pattern = fullname2))
>
> close(x2)
> save(file = "x2.RData", x2)
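>
> ## The 15 near-identical lines above could also be generated
> ## programmatically; an equivalent (untested) sketch:
> ## cols <- lapply(1:15, function(i)
> ##   ff(length = 4, vmode = "double", pattern = fullname2))
> ## names(cols) <- paste("c", 1:15, sep = "")
> ## x2 <- do.call("ffdf", cols)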
>
> sfClusterEval(load("x2.RData"))
> sfClusterEval(open(x2))
> ## The following is, of course, non-deterministic. Just to see it
> ## working.
> sfClusterApplyLB(1:15, function(i) x2[, i] <- rep(nodenum, 4))
> sfClusterEval(close(x2))
>
> rm(x2)
> load("x2.RData")
> open(x2)
> x2[, ] ## changes are stored
>
>
>
> ## b) Create ff objects in the nodes, and put them together as an
> ##    ffdf in the master. Note that we pass the objects back as the
> ##    return value of sfClusterApplyLB. In contrast, in a) we do not
> ##    pass anything back from sfClusterApplyLB, but leave everything
> ##    on disk.
>
> sfExport("fullname3")
>
> createAndClose <- function(i) {
>  nameobj <- paste("ffpiece", i, sep = "")
>  assign(nameobj,
>         ff(rep(nodenum, 4), pattern = fullname3))
>  close(get(nameobj))
>  return(get(nameobj))
> }
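>
> ## Side note: since the file name comes from 'pattern' and the object
> ## is returned anyway, the assign()/get() pair is not strictly
> ## needed; an equivalent, simpler version would be:
> ## createAndClose <- function(i) {
> ##   obj <- ff(rep(nodenum, 4), pattern = fullname3)
> ##   close(obj)
> ##   obj
> ## }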
>
>
> list.ff <- sfClusterApplyLB(1:15, createAndClose)
>
> ## put together ffdf
> ## x3 <- ffdf(list.ff[[1]], list.ff[[2]], list.ff[[3]]) ## etc ...
>
> eval(parse(text = paste("x3 <- ffdf(", paste("list.ff[[", 1:15, "]]",
>             sep = "", collapse = ", "), ")")))
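>
> ## do.call avoids the eval(parse()) construction and should be
> ## equivalent here (modulo automatic column names):
> ## x3 <- do.call("ffdf", list.ff)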
>
> open(x3)
> x3
>
>
>
>
>
>
> --
> Ramon Diaz-Uriarte
> Structural Biology and Biocomputing Programme
> Spanish National Cancer Centre (CNIO)
> http://ligarto.org/rdiaz
> Phone: +34-91-732-8000 ext. 3019
>
> _______________________________________________
> R-sig-hpc mailing list
> R-sig-hpc at r-project.org
> https://stat.ethz.ch/mailman/listinfo/r-sig-hpc


