[Rd] [parallel] fixes load balancing of parLapplyLB
Christian Krause
christian.krause at idiv.de
Mon Feb 12 20:08:15 CET 2018
Dear R-Devel List,
**TL;DR:** The function **parLapplyLB** of the parallel package has [reportedly][1] (see also attached RRD output) not
been doing its job, i.e. not actually balancing the load. My colleague Dirk Sarpe and I found the cause of the problem
and we also have a patch to fix it (attached). A similar fix has also been provided [here][2].
[1]: https://stackoverflow.com/questions/38230831/why-does-parlapplylb-not-actually-balance-load
[2]: https://bugs.r-project.org/bugzilla3/show_bug.cgi?id=16792
## The Call Chain
First, we traced the relevant R function calls through the code, beginning with `parLapplyLB`:
1. **parLapplyLB:** clusterApply.R:177, calls **splitList**, then **clusterApplyLB**
2. **splitList:** clusterApply.R:157
3. **clusterApplyLB:** clusterApply.R:87, calls **dynamicClusterApply**
4. **dynamicClusterApply:** clusterApply.R:39
## splitList
We used both our whiteboard and an R session to manually *run* a few examples. We were using lists of 100 elements and 5
workers. First, lets take a look at **splitList**:
```r
> sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20
> sapply(parallel:::splitList(1:97, 5), length)
[1] 20 19 19 19 20
> sapply(parallel:::splitList(1:97, 20), length)
[1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
```
As we can see in the examples, the work is distributed as equally as possible.
## dynamicClusterApply
**dynamicClusterApply** works this way (simplified):
1. it first gives a chunk to each worker
2. once a worker comes back with the result, it is given the next chunk
**This is the important part:** As long as there are **more** chunks than workers, there will be load balancing. If
there are fewer chunks than workers, each worker will get **at most one chunk** and there is **no** load balancing.
## parLapplyLB
This is how **parLapplyLB** splits the input list (with a bit of refactoring, for readability):
```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
cl <- defaultCluster(cl)
chunks <- splitList(X, length(cl))
do.call(c,
clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
quote = TRUE)
}
```
For our examples, the chunks have these sizes:
```r
> sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20
```
There we have it: 5 chunks. 5 workers. With this work distribution, there can't possibly be any load balancing, because
each worker is given a single chunk and then it stops working because there are no more chunks.
Instead, **parLapplyLB** should look like this (patch is attached):
```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
cl <- defaultCluster(cl)
chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))
chunks <- splitList(X, chunkSize)
do.call(c,
clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
quote = TRUE)
}
```
Examples with a cluster of 5 workers:
```r
# length(cl) < length(X)
> sapply(parallel:::splitList(1:100, ceiling(100 / 5)), length)
[1] 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5
# length(cl) >= length(X)
> sapply(parallel:::splitList(1:4, 4), length)
[1] 1 1 1 1
# one worker idles here, but we can't do better than that
```
With this patch, the number of chunks is larger than the number of workers, if possible at all, and then load balancing
should work.
Best Regards
--
Christian Krause
Scientific Computing Administration and Support
------------------------------------------------------------------------------------------------------------------------
Phone: +49 341 97 33144
Email: christian.krause at idiv.de
------------------------------------------------------------------------------------------------------------------------
German Centre for Integrative Biodiversity Research (iDiv) Halle-Jena-Leipzig
Deutscher Platz 5e
04103 Leipzig
Germany
------------------------------------------------------------------------------------------------------------------------
iDiv is a research centre of the DFG – Deutsche Forschungsgemeinschaft
iDiv ist eine zentrale Einrichtung der Universität Leipzig im Sinne des § 92 Abs. 1 SächsHSFG und wird zusammen mit der
Martin-Luther-Universität Halle-Wittenberg und der Friedrich-Schiller-Universität Jena betrieben sowie in Kooperation
mit dem Helmholtz-Zentrum für Umweltforschung GmbH – UFZ. Beteiligte Kooperationspartner sind die folgenden
außeruniversitären Forschungseinrichtungen: das Helmholtz-Zentrum für Umweltforschung GmbH - UFZ, das
Max-Planck-Institut für Biogeochemie (MPI BGC), das Max-Planck-Institut für chemische Ökologie (MPI CE), das
Max-Planck-Institut für evolutionäre Anthropologie (MPI EVA), das Leibniz-Institut Deutsche Sammlung von Mikroorganismen
und Zellkulturen (DSMZ), das Leibniz-Institut für Pflanzenbiochemie (IPB), das Leibniz-Institut für Pflanzengenetik und
Kulturpflanzenforschung (IPK) und das Leibniz-Institut Senckenberg Museum für Naturkunde Görlitz (SMNG). USt-IdNr. DE
141510383
-------------- next part --------------
A non-text attachment was scrubbed...
Name: fixes-parLapplyLB.patch
Type: text/x-patch
Size: 676 bytes
Desc: not available
URL: <https://stat.ethz.ch/pipermail/r-devel/attachments/20180212/927bc7d1/attachment.bin>
-------------- next part --------------
A non-text attachment was scrubbed...
Name: r-parallel-load-balancing.png
Type: image/png
Size: 47263 bytes
Desc: not available
URL: <https://stat.ethz.ch/pipermail/r-devel/attachments/20180212/927bc7d1/attachment.png>
More information about the R-devel
mailing list