[Rd] Bug report: parLapply with capture.output(type="message") produces an error

Henrik Bengtsson henr|k@bengt@@on @end|ng |rom gm@||@com
Fri Oct 6 03:15:09 CEST 2023


This is actually not a bug. If we really want to identify a bug, then
it's actually a bug in your code. We'll get to that at the very end.
Either way, it's an interesting report that reveals a lot of things.

First, here's a slightly simpler version of your example:

$ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1); x <-
clusterEvalQ(cl, { capture.output(NULL, type = "message") })'
Error in unserialize(node$con) : error reading from connection
Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
unserialize
Execution halted

There are lots of things going on here, but before we get to the
answer, the most important take-home message here is:

 Never ever use capture.output(..., type = "message") in R.

Second comment is:

 No, really, do not do that!

Now, towards what is going on in your example. First, I think
help("capture.output") is too "kind" here when it says:

'Messages sent to stderr() (including those from message, warning and
stop) are captured by type = "message". Note that this can be “unsafe”
and should only be used with care.'

To understand why you shouldn't do this, you have to know that
capture.output() uses sink() internally, and its help page says:

"Sink-ing the messages stream should be done only with great care. For
that stream file must be an already open connection, and there is no
stack of connections."

The "[When] Sink-ing the messages stream ... there is no stack of
connections" part is the reason for the problem you're experiencing.
What happens is that the background workers that you launch with
parallel::makeCluster() use sink(..., type = "message") internally,
and that sink is active throughout all parallel evaluation.  Now,
when you add another one via your capture.output(..., type =
"message"), you are stealing the "message" sink from the parallel
worker.  Our simplified example can be reproduced using only sink()
calls:

$ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1); x <-
clusterEvalQ(cl, { sink(file(nullfile(), open = "a"), type =
"message"); sink(type = "message") })'
Error in unserialize(node$con) : error reading from connection
Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
unserialize
Execution halted
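
The missing stack can also be seen in a single R session, without any
cluster. This is only a minimal sketch: it assumes R >= 3.6.0 for
nullfile(), and the variable names are illustrative. It sets a
"message" sink, replaces it with a second one, and then resets. The
reset goes back to stderr (connection number 2), not to the first
sink:

```r
# Two throw-away connections standing in for "the worker's sink" (con1)
# and "the sink set up by capture.output()" (con2).
con1 <- file(nullfile(), open = "a")
con2 <- file(nullfile(), open = "a")
id1 <- as.integer(con1)

sink(con1, type = "message")            # first owner of the message sink
first <- sink.number(type = "message")  # connection number of con1

sink(con2, type = "message")            # second owner replaces the first
sink(type = "message")                  # "undo": resets to stderr, not con1
after <- sink.number(type = "message")  # 2, i.e. stderr()

close(con1)
close(con2)
```

After the reset, nothing is left of the first sink, which is what
happens to the sink that the parallel worker set up.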

Back to the "message" sink that parallel sets up. By default, it sinks
to the "null" file.  This is done to avoid output on parallel workers
from cluttering up the terminal.  The default is controlled by
argument 'outfile' of makeCluster(), i.e. our example does:

cl <- makeCluster(1, outfile = "/dev/null")

Now, since we're stealing the "message" sink from the worker when we
call sink(..., type = "message") on the parallel worker, any output on
the workers is no longer sent to the "null" file, but instead straight
out to the terminal. So, after stealing the sink, we're effectively
running as if we had told the parallel workers to not sink output.  We
can manually do this by:

cl <- makeCluster(1, outfile = "")

We're almost there.  If we use the latter, we will see all output from
the parallel worker(s).  Let's try that:

$ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1,
outfile = ""); x <- clusterEvalQ(cl, { })'
starting worker pid=349252 on localhost:11036 at 17:45:05.125
Error in unserialize(node$con) : error reading from connection
Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
unserialize
Execution halted

There's now a "starting worker ..." line in the output.  But more
importantly, we still see the "error reading from connection"
message.  So that error is there regardless of whether we capture or
sink the "message" output.  What it tells us is that an error takes
place at the very end, one that we normally don't see because the
workers' output is sunk to the "null" file.

This error is because when the main R session shuts down, the parallel
workers are still running and trying to listen to the socket
connection that they use to communicate with the main R session.  But
that is now broken, so each parallel worker will fail when it tries to
communicate.

How to fix it? Make sure to close the 'cl' cluster before exiting the
main R session, i.e.

$ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1,
outfile = ""); x <- clusterEvalQ(cl, { }); stopCluster(cl)'
starting worker pid=349703 on localhost:11011 at 17:50:20.357

The error is no longer there, because the main R session will tell the
parallel workers to shut down *before* terminating itself. This means
there are no stray parallel workers trying to reach a non-existing
main R session.

In a way, your example revealed that you forgot to call
stopCluster(cl) at the end.
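
One way to make it hard to forget is to tie the cluster's lifetime to
a function call with on.exit(). This is a sketch, not the only way to
do it; run_on_cluster() is a hypothetical helper name, and the squaring
function is just a placeholder workload:

```r
library(parallel)

# Hypothetical helper: creates a cluster, uses it, and always stops it,
# even if the parallel call fails with an error.
run_on_cluster <- function(n_workers = 1L) {
  cl <- makeCluster(n_workers)
  on.exit(stopCluster(cl), add = TRUE)
  parLapply(cl, 1:3, function(i) i^2)
}

res <- run_on_cluster()
```

Because stopCluster() runs when the function exits, the workers are
told to shut down before the main R session can terminate.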

But, the real message here is: Do not mess with the "message" output in R!

I'll take the moment to rant about this: I think sink(..., type =
"message") should not be part of the public R API; it's simply
impossible to use safely, because there is no one owner controlling
it. To prevent it being used by mistake, at least it could throw an
error if there's already an active "message" sink.  Oh, well ...
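
To illustrate the kind of check I mean, here is a user-level sketch.
sink_messages_once() is a hypothetical helper, not something that
exists in R. It relies on sink.number(type = "message") returning 2
when messages still go to stderr, and it assumes R >= 3.6.0 for
nullfile():

```r
# Hypothetical guard: refuse to replace an already active "message" sink.
sink_messages_once <- function(con) {
  if (sink.number(type = "message") != 2L) {
    stop("a \"message\" sink is already active; refusing to replace it")
  }
  sink(con, type = "message")
  invisible(con)
}

con <- file(nullfile(), open = "a")
sink_messages_once(con)                      # first call succeeds
second <- tryCatch(sink_messages_once(con),  # second call is refused
                   error = function(e) "refused")
sink(type = "message")                       # reset to stderr
close(con)
```

This only protects code that goes through the helper; anything calling
sink() directly can still steal the sink.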


Almost finally, here's how to do what you're probably trying to achieve.  When you call:

 out <- capture.output({ message("hello"); message("world") }, type = "message")

What you really want to do is:

capture_messages <- function(expr, envir = parent.frame()) {
  expr <- substitute(expr)  # keep the expression unevaluated for eval()
  msgs <- list()
  withCallingHandlers({
    eval(expr, envir = envir)
  }, message = function(m) {
    msgs <<- c(msgs, list(m))       # record the 'message' condition
    invokeRestart("muffleMessage")  # keep it from reaching stderr
  })
  msgs
}

msgs <- capture_messages({ message("hello"); message("world") })

When you capture 'message' conditions this way, you can decide to
resignal them later, e.g.

> void <- lapply(msgs, message)
hello
world
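
Because the captured objects are ordinary condition objects, their
text can also be pulled out as a character vector, similar to what
capture.output() would have returned. A small self-contained sketch
(the two conditions are created with tryCatch() here just so the
snippet runs on its own):

```r
# Two 'message' conditions, caught as objects instead of being printed.
msgs <- list(
  tryCatch(message("hello"), message = function(m) m),
  tryCatch(message("world"), message = function(m) m)
)

# message() appends a newline to the text; strip it to get plain strings.
txt <- vapply(msgs, conditionMessage, character(1))
txt <- sub("\n$", "", txt)
```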

You can capture 'warning' conditions in the same way.
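
A sketch of the 'warning' counterpart, under the same approach.
capture_warnings() is a hypothetical name; the only real difference is
the handler class and the "muffleWarning" restart. This variant also
returns the value of the expression:

```r
# Hypothetical helper: collect 'warning' conditions while evaluating expr.
capture_warnings <- function(expr) {
  warns <- list()
  value <- withCallingHandlers(expr, warning = function(w) {
    warns <<- c(warns, list(w))     # record the 'warning' condition
    invokeRestart("muffleWarning")  # continue without emitting it
  })
  list(value = value, warnings = warns)
}

res <- capture_warnings({ warning("careful"); 42 })
```

Here res$value holds the result of the expression and res$warnings
the captured conditions, which can be resignaled later with warning().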



Finally, if you got here because you wanted to
capture/see/display/view output that is taking place on parallel
workers, I recommend using the Futureverse (https://futureverse.org)
for parallelization. Disclaimer: I'm the author.  The Futureverse
takes care of relaying stdout, messages, warnings, errors, and other
types of conditions automatically. Here's an example that resembles
your original example:

> cl <- parallel::makeCluster(2)
> future::plan("cluster", workers = cl)
> y <- future.apply::future_lapply(1:3, function(i) message("hello"))
hello
hello
hello
> parallel::stopCluster(cl)

Note that those "hello" messages are truly relayed versions of the
original 'message' conditions. Warnings work the same way.

A cleaner and slightly better version of the above example is:

> library(future.apply)
> plan(multisession, workers = 2)
> y <- future.apply::future_lapply(1:3, function(i) message("hello"))
hello
hello
hello
> plan(sequential)

Over and out,

Henrik

On Thu, Oct 5, 2023 at 4:07 PM Travers Ching <traversc using gmail.com> wrote:
>
> Hello, I have tested this on a fresh ubuntu image with R 4.3.1.
>
> Rscript -e 'library(parallel)
> cl = makeCluster(2)
> x = parLapply(cl, 1:100, function(i) {
>   capture.output(message("hello"), type = "message")
> })
> print("bye")'
>
> This produces the following output:
>
> [1] "bye"
> Error in unserialize(node$con) : error reading from connection
> Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> unserialize
> Execution halted
> Error in unserialize(node$con) : error reading from connection
> Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> unserialize
> Execution halted
>
> The error does not occur interactively or if stopCluster gets called at the
> end.
>
> ______________________________________________
> R-devel using r-project.org mailing list
> https://stat.ethz.ch/mailman/listinfo/r-devel


