[Rd] [parallel] fixes load balancing of parLapplyLB

Christian Krause christian.krause at idiv.de
Mon Feb 12 20:08:15 CET 2018


Dear R-Devel List,

**TL;DR:** The function **parLapplyLB** of the parallel package has [reportedly][1] (see also attached RRD output) not
been doing its job, i.e. not actually balancing the load. My colleague Dirk Sarpe and I found the cause of the problem
and we also have a patch to fix it (attached). A similar fix has also been provided [here][2].

[1]: https://stackoverflow.com/questions/38230831/why-does-parlapplylb-not-actually-balance-load
[2]: https://bugs.r-project.org/bugzilla3/show_bug.cgi?id=16792


## The Call Chain

First, we traced the relevant R function calls through the code, beginning with `parLapplyLB`:

1.  **parLapplyLB:** clusterApply.R:177, calls **splitList**, then **clusterApplyLB**
2.  **splitList:** clusterApply.R:157
3.  **clusterApplyLB:** clusterApply.R:87, calls **dynamicClusterApply**
4.  **dynamicClusterApply:** clusterApply.R:39


## splitList

We used both our whiteboard and an R session to manually *run* a few examples. We were using lists of 100 elements and 5
workers. First, lets take a look at **splitList**:

```r
> sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20

> sapply(parallel:::splitList(1:97, 5), length)
[1] 20 19 19 19 20

> sapply(parallel:::splitList(1:97, 20), length)
 [1] 5 5 5 5 4 5 5 5 5 5 4 5 5 5 5 4 5 5 5 5
```

As we can see in the examples, the work is distributed as equally as possible.


## dynamicClusterApply

**dynamicClusterApply** works this way (simplified):

1.  it first gives a chunk to each worker
2.  once a worker comes back with the result, it is given the next chunk

**This is the important part:** As long as there are **more** chunks than workers, there will be load balancing. If
there are fewer chunks than workers, each worker will get **at most one chunk** and there is **no** load balancing.


## parLapplyLB

This is how **parLapplyLB** splits the input list (with a bit of refactoring, for readability):

```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)

    chunks <- splitList(X, length(cl))

    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}
```

For our examples, the chunks have these sizes:

```r
> sapply(parallel:::splitList(1:100, 5), length)
[1] 20 20 20 20 20
```

There we have it: 5 chunks. 5 workers. With this work distribution, there can't possibly be any load balancing, because
each worker is given a single chunk and then it stops working because there are no more chunks.

Instead, **parLapplyLB** should look like this (patch is attached):

```r
parLapplyLB <- function(cl = NULL, X, fun, ...)
{
    cl <- defaultCluster(cl)

    chunkSize <- max(length(cl), ceiling(length(X) / length(cl)))

    chunks <- splitList(X, chunkSize)

    do.call(c,
            clusterApplyLB(cl, x = chunks, fun = lapply, fun, ...),
            quote = TRUE)
}
```

Examples with a cluster of 5 workers:

```r
# length(cl) < length(X)
> sapply(parallel:::splitList(1:100, ceiling(100 / 5)), length)
 [1] 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5 5

# length(cl) >= length(X)
> sapply(parallel:::splitList(1:4, 4), length)
[1] 1 1 1 1
# one worker idles here, but we can't do better than that
```

With this patch, the number of chunks is larger than the number of workers, if possible at all, and then load balancing
should work.

Best Regards

-- 

Christian Krause

Scientific Computing Administration and Support

------------------------------------------------------------------------------------------------------------------------

Phone: +49 341 97 33144

Email: christian.krause at idiv.de

------------------------------------------------------------------------------------------------------------------------

German Centre for Integrative Biodiversity Research (iDiv) Halle-Jena-Leipzig

Deutscher Platz 5e

04103 Leipzig

Germany

------------------------------------------------------------------------------------------------------------------------

iDiv is a research centre of the DFG – Deutsche Forschungsgemeinschaft

iDiv ist eine zentrale Einrichtung der Universität Leipzig im Sinne des § 92 Abs. 1 SächsHSFG und wird zusammen mit der
Martin-Luther-Universität Halle-Wittenberg und der Friedrich-Schiller-Universität Jena betrieben sowie in Kooperation
mit dem Helmholtz-Zentrum für Umweltforschung GmbH – UFZ. Beteiligte Kooperationspartner sind die folgenden
außeruniversitären Forschungseinrichtungen: das Helmholtz-Zentrum für Umweltforschung GmbH - UFZ, das
Max-Planck-Institut für Biogeochemie (MPI BGC), das Max-Planck-Institut für chemische Ökologie (MPI CE), das
Max-Planck-Institut für evolutionäre Anthropologie (MPI EVA), das Leibniz-Institut Deutsche Sammlung von Mikroorganismen
und Zellkulturen (DSMZ), das Leibniz-Institut für Pflanzenbiochemie (IPB), das Leibniz-Institut für Pflanzengenetik und
Kulturpflanzenforschung (IPK) und das Leibniz-Institut Senckenberg Museum für Naturkunde Görlitz (SMNG). USt-IdNr. DE
141510383


-------------- next part --------------
A non-text attachment was scrubbed...
Name: fixes-parLapplyLB.patch
Type: text/x-patch
Size: 676 bytes
Desc: not available
URL: <https://stat.ethz.ch/pipermail/r-devel/attachments/20180212/927bc7d1/attachment.bin>

-------------- next part --------------
A non-text attachment was scrubbed...
Name: r-parallel-load-balancing.png
Type: image/png
Size: 47263 bytes
Desc: not available
URL: <https://stat.ethz.ch/pipermail/r-devel/attachments/20180212/927bc7d1/attachment.png>


More information about the R-devel mailing list