[Rd] Bug report: parLapply with capture.output(type="message") produces an error

Travers Ching tr@ver@c @end|ng |rom gm@||@com
Fri Oct 6 21:04:54 CEST 2023


Hi Henrik,

Thanks for the detailed technical explanation! I ended up using the
withCallingHandlers solution to achieve what I needed (thanks to stack
overflow). If this is not technically a bug I think it is unintuitive and
unexpected behavior from a user perspective. So take this as a feature
request rather than a bug report.

The error message at the end of the script doesn't inform the user what
part of the script is wrong (using sink or capture.output in parallel). It
is difficult to understand what's going on.

The "correct" solution using withCallingHandlers is esoteric, and I think
most users would not code that up naturally much less understand what it is
doing. Could capture.output(type="messages") be rewritten using this
approach?

Lastly, the help file for stopCluster says

"the workers will terminate themselves once the socket on which they are
listening for commands becomes unavailable, which it should if the master R
session is completed"

To me, this implies that I shouldn't need to call stopCluster and that the
workers are automatically stopped at the end. The place where I first saw
the error was using future_lapply and following the vignette there's no
call to stopCluster there either.

Best,
Travers


On Thu, Oct 5, 2023 at 6:15 PM Henrik Bengtsson <henrik.bengtsson using gmail.com>
wrote:

> This is actually not a bug. If we really want to identify a bug, then
> it's actually a bug in your code. We'll get to that at the very end.
> Either way, it's an interesting report that reveals a lot of things.
>
> First, here's a slightly simpler version of your example:
>
> $ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1); x <-
> clusterEvalQ(cl, { capture.output(NULL, type = "message") })'
> Error in unserialize(node$con) : error reading from connection
> Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> unserialize
> Execution halted
>
> There are lots of things going on here, but before we get to the
> answer, the most important take-home message here is:
>
>  Never ever use capture.output(..., type = "message") in R.
>
> Second comment is:
>
>  No, really, do not do that!
>
> Now, towards what is going on in your example. First, I don't think
> help("capture.output") is too "kind" here, when it says:
>
> 'Messages sent to stderr() (including those from message, warning and
> stop) are captured by type = "message". Note that this can be “unsafe”
> and should only be used with care.'
>
> To understand why you shouldn't do this, you have to know that
> capture.output() uses sink() internally, and its help page says:
>
> "Sink-ing the messages stream should be done only with great care. For
> that stream file must be an already open connection, and there is no
> stack of connections."
>
> The "[When] Sink-ing the messages stream ... there is no stack of
> connections" is the reason for your the problem you're experiencing.
> What happens is that, the background workers that you launch with
> parallel::makeCluster() will use sink(..., type = "message")
> internally and that's active throughout all parallel evaluation.  Now,
> when you add another one of, via your capture.output(..., type =
> "message"), you are stealing the "message" sink from the parallel
> worker.  Our simplified example can be reproduced using only sink():s
> as:
>
> $ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1); x <-
> clusterEvalQ(cl, { sink(file(nullfile(), open = "a"), type =
> "message"); sink(type = "message") })'
> Error in unserialize(node$con) : error reading from connection
> Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> unserialize
> Execution halted
>
> Back to the "message" sink that parallel sets up. By default, it sinks
> to the "null" file.  This is done to avoid output on parallel workers
> from cluttering up the terminal.  The default is controlled by
> argument 'outfile' of makeCluster(), i.e. our example does:
>
> cl <- makeCluster(1, outfile = "/dev/null")
>
> Now, since we're stealing the "message" sink from the worker when we
> call sink(..., type = "message") on the parallel worker, any output on
> the workers is no longer sent to the "null" file, but instead straight
> out to the terminal. So, after stealing the sink, we're effectively
> running as if we had told the parallel workers to not sink output.  We
> can manually do this by:
>
> cl <- makeCluster(1, outfile = "")
>
> We're almost there.  If we use the latter, we will see all output from
> the parallel worker(s).  Let's try that:
>
> $ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1,
> outfile = ""); x <- clusterEvalQ(cl, { })'
> starting worker pid=349252 on localhost:11036 at 17:45:05.125
> Error in unserialize(node$con) : error reading from connection
> Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> unserialize
> Execution halted
>
> You see. There's a "starting worker ..." output that we now see.  But
> more importantly, we now also see that "error reading from connection"
> message.  So, as you see, that error message is there regardless of us
> capturing or sinking the "message" output.  Instead, what it tells us
> is that there is an error taking place at the very end, but we
> normally don't see it.
>
> This error is because when the main R session shuts down, the parallel
> workers are still running and trying to listen to the socket
> connection that they use to communicate with the main R session.  But
> that is now broken, so each parallel worker will fail when it tries to
> communicate.
>
> How to fix it? Make sure to close the 'cl' cluster before exiting the
> main R session, i.e.
>
> $ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1,
> outfile = ""); x <- clusterEvalQ(cl, { }); stopCluster(cl)'
> starting worker pid=349703 on localhost:11011 at 17:50:20.357
>
> The error is no longer there, because the main R session will tell the
> parallel workers to shut down *before* terminating itself. This means
> there are no stray parallel workers trying to reach a non-existing
> main R session.
>
> In a way, your example revealed that you forgot to call
> stopCluster(cl) at the end.
>
> But, the real message here is: Do not mess with the "message" output in R!
>
> I'll take the moment to rant about this: I think sink(..., type =
> "message") should not be part of the public R API; it's simply
> impossible to use safely, because there is no one owner controlling
> it. To prevent it being used by mistake, at least it could throw an
> error if there's already an active "message" sink.  Oh, well ...
>
>
> Almost finally, do what you're probably trying to achieve here, when you
> call:
>
>  out <- capture.output({ message("hello"); message("world") }, type =
> "message")
>
> What you really want to do is:
>
> capture_messages <- function(expr, envir = parent.frame()) {
>   msgs <- list()
>   withCallingHandlers({
>     eval(expr, envir = envir)
>   }, message = function(m) {
>     msgs <<- c(msgs, list(m))
>     invokeRestart("muffleMessage")
>   })
>   msgs
> }
>
> msgs <- capture_messages({ message("hello"); message("world") })
>
> When you capture 'message' conditions this way, you can decide to
> resignal then later, e.g.
>
> > void <- lapply(msgs, message)
> hello
> world
>
> You can capture 'warning' conditions in the same way.
>
>
>
> Finally, if you've got to this because you wanted to
> capture/see/display/view output that is taking place on parallel
> workers, I recommend using the Futureverse (https://futureverse.org)
> for parallelization. Disclaimer, I'm the author.  The Futureverse
> takes care of relaying stdout, messages, warnings, errors, and other
> types of conditions automatically. Here's an example that resembles
> your original example:
>
> > cl <- parallel::makeCluster(2)
> > future::plan("cluster", workers = cl)
> > y <- future.apply::future_lapply(1:3, function(i) message("hello"))
> hello
> hello
> hello
> > parallel::stopCluster(cl)
>
> Note that those "hello" messages are truly relayed versions of the
> original 'message' conditions. Warnings works the same way.
>
> A cleaner and slightly better version of the above example is:
>
> > library(future.apply)
> > plan(multisession, workers = 2)
> > y <- future.apply::future_lapply(1:3, function(i) message("hello"))
> hello
> hello
> hello
> > plan(sequential)
>
> Over and out,
>
> Henrik
>
> On Thu, Oct 5, 2023 at 4:07 PM Travers Ching <traversc using gmail.com> wrote:
> >
> > Hello, I have tested this on a fresh ubuntu image with R 4.3.1.
> >
> > Rscript -e 'library(parallel)
> > cl = makeCluster(2)
> > x = parLapply(cl, 1:100, function(i) {
> >   capture.output(message("hello"), type = "message")
> > })
> > print("bye")'
> >
> > This produces the following output:
> >
> > [1] "bye"
> > Error in unserialize(node$con) : error reading from connection
> > Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> > unserialize
> > Execution halted
> > Error in unserialize(node$con) : error reading from connection
> > Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> > unserialize
> > Execution halted
> >
> > The error does not occur interactively or if stopCluster gets called at
> the
> > end.
> >
> >         [[alternative HTML version deleted]]
> >
> > ______________________________________________
> > R-devel using r-project.org mailing list
> > https://stat.ethz.ch/mailman/listinfo/r-devel
>

	[[alternative HTML version deleted]]



More information about the R-devel mailing list