[Rd] Bug report: parLapply with capture.output(type="message") produces an error
ch@riie@g@o m@iii@g oii shikokuchuo@@et
Sat Oct 7 17:55:53 CEST 2023
Hi Travers,
This is an implementation detail for background workers in general, in that there must be some robust way for them to exit (either upon a signal from the main session, or if the main session ends / socket disconnects). As these are background workers, their error messages are usually not seen, and hence it has been deemed good enough that they exit in this case through error. However, you do see them in your case as you have diverted the message stream as Henrik has highlighted. This may be inconvenient, but can safely be ignored.
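As a minimal sketch of the clean-exit path, assuming only the base `parallel` package: if the main session tells the workers to stop before it exits, they never reach the failed socket read. The helper name `with_cluster` is hypothetical and not part of `parallel`.

```r
library(parallel)

# Hypothetical helper: create a cluster, run `fun(cl)`, and always stop the
# workers afterwards, even if `fun` signals an error.
with_cluster <- function(n_workers, fun) {
  cl <- makeCluster(n_workers)
  on.exit(stopCluster(cl), add = TRUE)
  fun(cl)
}

res <- with_cluster(2, function(cl) {
  parLapply(cl, 1:4, function(i) i^2)
})
```

Registering `stopCluster()` via `on.exit()` keeps the shutdown tied to the scope that created the cluster, so it is not forgotten at the end of a script.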
If, however, clean output is important in your use case, there is a new solution that has only just become available. This is a direct outcome of the R Project Sprint in Warwick a month ago: Luke Tierney has opened up the `parallel` package to allow other packages to provide alternative communications backends. This is only possible with R-devel, but as of yesterday a new version of the `mirai` package was released to CRAN that provides one such backend.
You would simply replace your `makeCluster()` call with `mirai::make_cluster()`. That’s the only change.
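A rough sketch of that swap follows. The availability guard is an assumption added for illustration: it takes the R-devel of this message to correspond to R 4.4.0 or later and falls back to `makeCluster()` when `mirai` is not installed. The rest of the `parallel` code is unchanged.

```r
library(parallel)

# Assumption: the alternative-backend support needs R >= 4.4.0 and an
# installed 'mirai'; otherwise fall back to the default socket cluster.
use_mirai <- requireNamespace("mirai", quietly = TRUE) &&
  getRversion() >= "4.4.0"

cl <- if (use_mirai) mirai::make_cluster(2) else makeCluster(2)

# Same parLapply() call regardless of which backend created 'cl'.
res <- parLapply(cl, 1:4, function(i) i * 2)

stopCluster(cl)
```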
As this is the R-devel mailing list, I will not go into the details of this particular implementation, but it seems useful for users of `parallel` to know that this is now possible. As I am the author of `mirai`, please reach out to me directly with questions on the package rather than replying on the list.
I just want to highlight one other possibility - if you remove `capture.output()` in your evaluation and call `mirai::make_cluster(2, output = TRUE)` instead, you will then be able to see all the messages from the background workers in your main process. It’s probably not what you’re after, but just in case.
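For comparison, here is a sketch of getting worker messages back to the main process with the default `parallel` backend, using the `withCallingHandlers()` approach quoted further down the thread instead of `capture.output(type = "message")`. This variant of `capture_messages()` returns plain character strings rather than condition objects, which is a simplification made for this example.

```r
library(parallel)

# Collect the text of every 'message' condition signalled while evaluating
# `expr`, without touching the "message" sink on the worker.
capture_messages <- function(expr) {
  msgs <- character()
  withCallingHandlers(
    expr,
    message = function(m) {
      # message() appends a newline; strip trailing whitespace for readability
      msgs <<- c(msgs, trimws(conditionMessage(m), which = "right"))
      invokeRestart("muffleMessage")
    }
  )
  msgs
}

cl <- makeCluster(2)

# Pass the helper as an extra argument so it is shipped to the workers.
res <- parLapply(cl, 1:3, function(i, capture) {
  capture(message("hello from task ", i))
}, capture = capture_messages)

stopCluster(cl)
```

Each element of `res` holds the messages emitted by one task, so the main process can print, log, or re-signal them as needed.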
Thanks,
Charlie
On 6 October 2023 at 12:04, Travers Ching <traversc using gmail.com> wrote:
>
> Hi Henrik,
>
> Thanks for the detailed technical explanation! I ended up using the
> withCallingHandlers solution to achieve what I needed (thanks to stack
> overflow). If this is not technically a bug I think it is unintuitive and
> unexpected behavior from a user perspective. So take this as a feature
> request rather than a bug report.
>
> The error message at the end of the script doesn't inform the user what
> part of the script is wrong (using sink or capture.output in parallel). It
> is difficult to understand what's going on.
>
> The "correct" solution using withCallingHandlers is esoteric, and I think
> most users would not code that up naturally much less understand what it is
> doing. Could capture.output(type="message") be rewritten using this
> approach?
>
> Lastly, the help file for stopCluster says
>
> "the workers will terminate themselves once the socket on which they are
> listening for commands becomes unavailable, which it should if the master R
> session is completed"
>
> To me, this implies that I shouldn't need to call stopCluster and that the
> workers are automatically stopped at the end. The place where I first saw
> the error was using future_lapply and following the vignette there's no
> call to stopCluster there either.
>
> Best,
> Travers
>
> On Thu, Oct 5, 2023 at 6:15 PM Henrik Bengtsson <henrik.bengtsson using gmail.com>
> wrote:
>
> >
> > This is actually not a bug. If we really want to identify a bug, then
> > it's actually a bug in your code. We'll get to that at the very end.
> > Either way, it's an interesting report that reveals a lot of things.
> >
> > First, here's a slightly simpler version of your example:
> >
> > $ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1); x <-
> > clusterEvalQ(cl, { capture.output(NULL, type = "message") })'
> > Error in unserialize(node$con) : error reading from connection
> > Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> > unserialize
> > Execution halted
> >
> > There are lots of things going on here, but before we get to the
> > answer, the most important take-home message here is:
> >
> > Never ever use capture.output(..., type = "message") in R.
> >
> > Second comment is:
> >
> > No, really, do not do that!
> >
> > Now, towards what is going on in your example. First, I don't think
> > help("capture.output") is too "kind" here, when it says:
> >
> > 'Messages sent to stderr() (including those from message, warning and
> > stop) are captured by type = "message". Note that this can be “unsafe”
> > and should only be used with care.'
> >
> > To understand why you shouldn't do this, you have to know that
> > capture.output() uses sink() internally, and its help page says:
> >
> > "Sink-ing the messages stream should be done only with great care. For
> > that stream file must be an already open connection, and there is no
> > stack of connections."
> >
> > The "[When] Sink-ing the messages stream ... there is no stack of
> > connections" is the reason for the problem you're experiencing.
> > What happens is that the background workers that you launch with
> > parallel::makeCluster() use sink(..., type = "message")
> > internally, and that sink is active throughout all parallel
> > evaluation. Now, when you add another one, via your
> > capture.output(..., type = "message"), you are stealing the "message"
> > sink from the parallel worker. Our simplified example can be
> > reproduced using only sink() calls, as:
> >
> > $ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1); x <-
> > clusterEvalQ(cl, { sink(file(nullfile(), open = "a"), type =
> > "message"); sink(type = "message") })'
> > Error in unserialize(node$con) : error reading from connection
> > Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> > unserialize
> > Execution halted
> >
> > Back to the "message" sink that parallel sets up. By default, it sinks
> > to the "null" file. This is done to avoid output on parallel workers
> > from cluttering up the terminal. The default is controlled by
> > argument 'outfile' of makeCluster(), i.e. our example does:
> >
> > cl <- makeCluster(1, outfile = "/dev/null")
> >
> > Now, since we're stealing the "message" sink from the worker when we
> > call sink(..., type = "message") on the parallel worker, any output on
> > the workers is no longer sent to the "null" file, but instead straight
> > out to the terminal. So, after stealing the sink, we're effectively
> > running as if we had told the parallel workers to not sink output. We
> > can manually do this by:
> >
> > cl <- makeCluster(1, outfile = "")
> >
> > We're almost there. If we use the latter, we will see all output from
> > the parallel worker(s). Let's try that:
> >
> > $ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1,
> > outfile = ""); x <- clusterEvalQ(cl, { })'
> > starting worker pid=349252 on localhost:11036 at 17:45:05.125
> > Error in unserialize(node$con) : error reading from connection
> > Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> > unserialize
> > Execution halted
> >
> > You see. There's a "starting worker ..." output that we now see. But
> > more importantly, we now also see that "error reading from connection"
> > message. So, as you see, that error message is there regardless of us
> > capturing or sinking the "message" output. Instead, what it tells us
> > is that there is an error taking place at the very end, but we
> > normally don't see it.
> >
> > This error is because when the main R session shuts down, the parallel
> > workers are still running and trying to listen to the socket
> > connection that they use to communicate with the main R session. But
> > that is now broken, so each parallel worker will fail when it tries to
> > communicate.
> >
> > How to fix it? Make sure to close the 'cl' cluster before exiting the
> > main R session, i.e.
> >
> > $ Rscript --vanilla -e 'library(parallel); cl <- makeCluster(1,
> > outfile = ""); x <- clusterEvalQ(cl, { }); stopCluster(cl)'
> > starting worker pid=349703 on localhost:11011 at 17:50:20.357
> >
> > The error is no longer there, because the main R session will tell the
> > parallel workers to shut down *before* terminating itself. This means
> > there are no stray parallel workers trying to reach a non-existing
> > main R session.
> >
> > In a way, your example revealed that you forgot to call
> > stopCluster(cl) at the end.
> >
> > But, the real message here is: Do not mess with the "message" output in R!
> >
> > I'll take the moment to rant about this: I think sink(..., type =
> > "message") should not be part of the public R API; it's simply
> > impossible to use safely, because there is no one owner controlling
> > it. To prevent it being used by mistake, at least it could throw an
> > error if there's already an active "message" sink. Oh, well ...
> >
> > Almost finally, on to what you're probably trying to achieve here, when you
> > call:
> >
> > out <- capture.output({ message("hello"); message("world") }, type =
> > "message")
> >
> > What you really want to do is:
> >
> > capture_messages <- function(expr, envir = parent.frame()) {
> >   msgs <- list()
> >   withCallingHandlers({
> >     eval(expr, envir = envir)
> >   }, message = function(m) {
> >     msgs <<- c(msgs, list(m))
> >     invokeRestart("muffleMessage")
> >   })
> >   msgs
> > }
> >
> > msgs <- capture_messages({ message("hello"); message("world") })
> >
> > When you capture 'message' conditions this way, you can decide to
> > resignal them later, e.g.
> >
> > void <- lapply(msgs, message)
> > hello
> > world
> >
> > You can capture 'warning' conditions in the same way.
> >
> > Finally, if you've got to this because you wanted to
> > capture/see/display/view output that is taking place on parallel
> > workers, I recommend using the Futureverse (https://futureverse.org/)
> > for parallelization. Disclaimer, I'm the author. The Futureverse
> > takes care of relaying stdout, messages, warnings, errors, and other
> > types of conditions automatically. Here's an example that resembles
> > your original example:
> >
> > cl <- parallel::makeCluster(2)
> > future::plan("cluster", workers = cl)
> > y <- future.apply::future_lapply(1:3, function(i) message("hello"))
> > hello
> > hello
> > hello
> > parallel::stopCluster(cl)
> >
> > Note that those "hello" messages are truly relayed versions of the
> > original 'message' conditions. Warnings work the same way.
> >
> > A cleaner and slightly better version of the above example is:
> >
> > library(future.apply)
> > plan(multisession, workers = 2)
> > y <- future.apply::future_lapply(1:3, function(i) message("hello"))
> > hello
> > hello
> > hello
> > plan(sequential)
> >
> > Over and out,
> >
> > Henrik
> >
> > On Thu, Oct 5, 2023 at 4:07 PM Travers Ching <traversc using gmail.com> wrote:
> >
> > Hello, I have tested this on a fresh ubuntu image with R 4.3.1.
> >
> > Rscript -e 'library(parallel)
> > cl = makeCluster(2)
> > x = parLapply(cl, 1:100, function(i) {
> > capture.output(message("hello"), type = "message")
> > })
> > print("bye")'
> >
> > This produces the following output:
> >
> > [1] "bye"
> > Error in unserialize(node$con) : error reading from connection
> > Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> > unserialize
> > Execution halted
> > Error in unserialize(node$con) : error reading from connection
> > Calls: <Anonymous> ... doTryCatch -> recvData -> recvData.SOCKnode ->
> > unserialize
> > Execution halted
> >
> > The error does not occur interactively or if stopCluster gets called at
> > the end.
> >
>