[R-sig-hpc] ff and parallel computing (multiple nodes, shared disk)

Jeff Ryan jeff.a.ryan at gmail.com
Thu Nov 12 20:40:28 CET 2009


Not that this is by any means a pre-rolled solution, but I have a
interface to BerkeleyDB on CRAN now called RBerkeley:

http://cran.r-project.org/web/packages/RBerkeley/index.html

In spite of its age (wisdom :) ) ... BDB might offer a possible
partial solution.

Across nodes gets tricky though, as there is not pre-built server per
se, as it is really a library.

HTH,
Jeff


On Thu, Nov 12, 2009 at 12:51 PM, Ramon Diaz-Uriarte <rdiaz02 at gmail.com> wrote:
> Dear Benilton,
>
> Thanks for your comments.
>
> On Thu, Nov 12, 2009 at 7:29 PM, Benilton Carvalho <bcarvalh at jhsph.edu> wrote:
>> Ramon,
>>
>> I've been investigating solutions for something similar.
>>
>> I wrote my own code to use NetCDF, which doesn't perform well when I need
>> random access to the data.
>>
>> I while back, I also tried something using SQLite, which didn't work well
>> either, but I fail to remember the specifics.
>>
>
> OK, I won't go through those paths, then.
>
>
>> I'm currently "playing" with ff and bigmemory. The limitations I observed
>> were that ff objects have max length limited to .Machine$integer.max,
>> 2^31-1, and that, currently, bigmemory does not behave as expected on my
>> Leopard machine when I run "R --arch x86_64" and create a huge object.
>>
>
> bigmemory is really neat, but does not fit what I am trying to do: it
> would not work if I want to acess the very same elements across
> machines.
>
>> About the 2^31-1 limit: each column of datasets I handle are often 6.5e6
>> elements long; therefore, I'd have "only" 330 columns per ff object (my
>> situation requires me to be ready for thousands columns). This would require
>> me to create several ff objects, which is not currently an option.
>>
>
> The limits are so far OK for me. In addition, I could also use an ffdf
> object, which, if I undersand, could be used to overcome those limits
> (as you can have an arbitrary number of ff components).
>
> Anyway, thanks for pointing that out. I need to make sure I do not
> accidentally cross the size limits of ff objects.
>
>
>> So, let me know if you come up with a compromise and I'll let you know if I
>> come across anything relevant.
>>
>
> I'll do.
>
> Best,
>
> R.
>
>> Cheers,
>>
>> b
>>
>> On Nov 11, 2009, at 2:43 PM, Ramon Diaz-Uriarte wrote:
>>
>>> Dear All,
>>>
>>> I am considering using ff for processing medium-sized data sets in a
>>> computer cluster (distributed memory), with shared disk space. The
>>> presentation by the ff authors in
>>>
>>> http://ff.r-forge.r-project.org/ff&bit_UseR!2009.pdf
>>>
>>> shows examples of using ff with snowfall on a multicore machine but
>>> not over several machines.
>>>
>>> I have been playing with ff and it seems possible to read ff and ffdf
>>> objects over several nodes. It seems it is also possible to safely put
>>> together an ffdf object which has been created by columns in different
>>> nodes. I am pasting code below (you'd only need to replace the name of
>>> the shared directory).
>>>
>>>
>>> I'd appreciate comments, suggestions, and alternatives (on P.S.2 I
>>> provide some further details on intended usage and other alternatives
>>> I've looked at).
>>>
>>> Best,
>>>
>>> R.
>>>
>>> P.S. I am CC'ing Jens Oehlschlaegel, the maintainer of ff, since I do
>>> not know if he reads this list.
>>>
>>> P.S.2. The data, in a single RData, can take from 500 MB to 4 GB,
>>> which is more than any one process should use. I plan to use between
>>> 20 and 30 nodes, each with two dual-core Opteron CPUs.
>>>
>>> I do not think a DB solution is warranted here: the original data
>>> (floats) are in a flat text file, where columns are samples, and we do
>>> the processing by sample (or in chunks withing sample). The data are
>>> used twice: first for the analysis and then for the plotting (which
>>> uses the original data + the output from analysis).  And we are done.
>>> Almost always, processing is carried out non-interactively.
>>>
>>> Up to now I've been using papply, but I am running into large memory
>>> consumption because of the creation of intermediate lists. Moreover,
>>> I'd rather avoid sending a lot of data via MPI (our network is less
>>> than ideal) ---yes, sure, the shared space also needs to use a switch.
>>> Finally, if intermediate results are stored in the disk, I can recover
>>> from network errors and MPI crashes.
>>>
>>> One approach would be to break down the original data set in
>>> individual "RData" files, and load each one as needed in the slaves.
>>> But this seems like reinventing the wheel, I doubt I'd write efficient
>>> code, it clutters the disk, and "load"ing adds time. I also looked at
>>> the package "BufferedMatrix" (in BioC), which seemd ideal, but
>>> currently "you can not save and reload a BufferedMatrix between R
>>> sessions", which I think excludes it for my intended usage.
>>>
>>> *********************************************************
>>>
>>> ## LAM-MPI universe booted, etc.
>>> ## 4 nodes, each in a different machine
>>>
>>>
>>> library(ff)
>>> library(snowfall)
>>>
>>> ## Names for dirs and files
>>> mydir <- "/http/tmp"
>>> fn1 <- "t1"
>>> fn2 <- "t2"
>>> fn3 <- "t3"
>>>
>>> fullname1 <- paste(mydir, fn1, sep = "/")
>>> fullname2 <- paste(mydir, fn2, sep = "/")
>>> fullname3 <- paste(mydir, fn3, sep = "/")
>>>
>>>
>>> sfInit(parallel = TRUE, cpus = 4, type = "MPI")
>>> sfLibrary(ff)
>>> sfClusterSetupRNG(type = "SPRNG")
>>> sfExport("mydir")
>>> setwd(mydir)
>>> sfClusterEval(setwd(mydir))
>>>
>>> ## To know where things happen
>>> sfClusterApply(1:4, function(i) assign("nodenum", i, env = .GlobalEnv))
>>>
>>>
>>>
>>> ##   Getting values of ff object in slaves
>>> x1 <- ff(1:60, dim = c(4, 15), filename = fullname1)
>>> close(x1)
>>> save(file = "x1.RData", x1)
>>>
>>> ## Acessing values; read-only
>>> sfClusterEval(load("x1.RData"))
>>> sfClusterEval(open(x1, readonly = TRUE))
>>> sfClusterApplyLB(1:15, function(i) {list(nodenum = nodenum, values = x1[,
>>> i])})
>>> sfClusterEval(close(x1))
>>>
>>>
>>>
>>> ##### I think it is OK to change in place if stored as ffdf:
>>> ### each column is a different file
>>>
>>> ## a) Create ffdf in master, send to nodes, modify there
>>>
>>> ## I use pattern here, and not latter, to avoid 'Invalid cross-device
>>> link'
>>> ## if R binary and filename are in different disks
>>>
>>> x2 <- ffdf(c1 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c2 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c3 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c4 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c5 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c4 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c7 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c8 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c9 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c10 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c11 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c12 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c13 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c14 = ff(length = 4, vmode = "double", pattern = fullname2),
>>>          c15 = ff(length = 4, vmode = "double", pattern = fullname2))
>>>
>>> close(x2)
>>> save(file = "x2.RData", x2)
>>>
>>> sfClusterEval(load("x2.RData"))
>>> sfClusterEval(open(x2))
>>> ## The following is, of course, non-deterministic. Just to see it
>>> ## working.
>>> sfClusterApplyLB(1:15, function(i) x2[, i] <- rep(nodenum, 4))
>>> sfClusterEval(close(x2))
>>>
>>> rm(x2)
>>> load("x2.RData")
>>> open(x2)
>>> x2[, ] ## changes are stored
>>>
>>>
>>>
>>> ## b) Create ff object in nodes, and put together as ffdf in master
>>> ##    Note that we pass the object back as the return of sfClusterApplyLB
>>> ##    In contrast, in a) we do not pass anything back from
>>> ##    sfClusterApplyLB, but leave it in the disk.
>>>
>>> sfExport("fullname3")
>>>
>>> createAndClose <- function(i) {
>>>  nameobj <- paste("ffpiece", i, sep = "")
>>>  assign(nameobj,
>>>        ff(rep(nodenum, 4), pattern = fullname3))
>>>  close(get(nameobj))
>>>  return(get(nameobj))
>>> }
>>>
>>>
>>> list.ff <- sfClusterApplyLB(1:15, createAndClose)
>>>
>>> ## put together ffdf
>>> ## x3 <- ffdf(list.ff[[1]], list.ff[[2]], list.ff[[3]]) ## etc ...
>>>
>>> eval(parse(text = paste("x3 <- ffdf(", paste("list.ff[[", 1:15, "]]",
>>>            sep = "", collapse = ", "), ")")))
>>>
>>> open(x3)
>>> x3
>>>
>>>
>>>
>>>
>>>
>>>
>>> --
>>> Ramon Diaz-Uriarte
>>> Structural Biology and Biocomputing Programme
>>> Spanish National Cancer Centre (CNIO)
>>> http://ligarto.org/rdiaz
>>> Phone: +34-91-732-8000 ext. 3019
>>>
>>> _______________________________________________
>>> R-sig-hpc mailing list
>>> R-sig-hpc at r-project.org
>>> https://stat.ethz.ch/mailman/listinfo/r-sig-hpc
>>
>>
>
>
>
> --
> Ramon Diaz-Uriarte
> Structural Biology and Biocomputing Programme
> Spanish National Cancer Centre (CNIO)
> http://ligarto.org/rdiaz
> Phone: +34-91-732-8000 ext. 3019
>
> _______________________________________________
> R-sig-hpc mailing list
> R-sig-hpc at r-project.org
> https://stat.ethz.ch/mailman/listinfo/r-sig-hpc
>



-- 
Jeffrey Ryan
jeffrey.ryan at insightalgo.com

ia: insight algorithmics
www.insightalgo.com



More information about the R-sig-hpc mailing list