[Rd] most robust way to call R API functions from a secondary thread

Stepan @tep@n@@|nde|@r @end|ng |rom or@c|e@com
Mon May 20 11:45:47 CEST 2019


Hi Andreas,

note that with the introduction of ALTREP, as far as I understand, calls 
as "simple" as DATAPTR can execute arbitrary code (R or native). Even 
without ALTREP, if you execute user-provided R code via Rf_eval and such 
on some custom thread, you may end up executing native code of some 
package, which may assume it is executed only from the R main thread.

Could you (1) decompose your problem in a way that in some initial phase 
you pull all the necessary data from R, then start the parallel 
computation, and then again in the R main thread "submit" the results 
back to the R world?

If you wanted something really robust, you can (2) "send" the requests 
for R API usage to the R main thread and pause the worker thread until 
it receives the results back. This looks similar to what the "later" 
package does. Maybe you can even use that package for your purposes?

Do you want to parallelize your code to achieve better performance? Even 
with your proposed solution, you need synchronization and chances are 
that excessive synchronization will severely affect the expected 
performance benefits of parallelization. If you do not need to 
synchronize that much, then the question is if you can do with (1) or (2).

Best regards,
Stepan

On 19/05/2019 11:31, Andreas Kersting wrote:
> Hi,
> 
> As the subject suggests, I am looking for the most robust way to call an (arbitrary) function from the R API from another but the main POSIX thread in a package's code.
> 
> I know that, "[c]alling any of the R API from threaded code is ‘for experts only’ and strongly discouraged. Many functions in the R API modify internal R data structures and might corrupt these data structures if called simultaneously from multiple threads. Most R API functions can signal errors, which must only happen on the R main thread." (https://urldefense.proofpoint.com/v2/url?u=https-3A__cran.r-2Dproject.org_doc_manuals_r-2Drelease_R-2Dexts.html-23OpenMP-2Dsupport&d=DwIFaQ&c=RoP1YumCXCgaWHvlZYR8PZh8Bv7qIrMUB65eapI_JnE&r=neKFCw86thQe2E2-61NAgpDMw4cC7oD_tUTTzraOkQM&m=d1r2raD4w0FF7spOVuz2IVEo0P_II3ZtSbw0TU2NmaE&s=JaadZR_m-QiJ3BQzzQ_fJPYt034tM5Ts6vKhdi6f__A&e=)
> 
> Let me start with my understanding of the related issues and possible solutions:
> 
> 1) R API functions are generally not thread-safe and hence one must ensure, e.g. by using mutexes, that no two threads use the R API simultaneously
> 
> 2) R uses longjmps on error and interrupts as well as for condition handling and it is undefined behaviour to do a longjmp from one thread to another; interrupts can be suspended before creating the threads by setting R_interrupts_suspended = TRUE; by wrapping the calls to functions from the R API with R_ToplevelExec(), longjmps across thread boundaries can be avoided; the only reason for R_ToplevelExec() itself to fail with an R-style error (longjmp) is a pointer protection stack overflow
> 
> 3) R_CheckStack() might be executed (indirectly), which will (probably) signal a stack overflow because it only works correctly when called form the main thread (see https://urldefense.proofpoint.com/v2/url?u=https-3A__cran.r-2Dproject.org_doc_manuals_r-2Drelease_R-2Dexts.html-23Threading-2Dissues&d=DwIFaQ&c=RoP1YumCXCgaWHvlZYR8PZh8Bv7qIrMUB65eapI_JnE&r=neKFCw86thQe2E2-61NAgpDMw4cC7oD_tUTTzraOkQM&m=d1r2raD4w0FF7spOVuz2IVEo0P_II3ZtSbw0TU2NmaE&s=J_TMw2gu43dxB_EX2vHbtF4Zr4bIAFR8RSFzvbRV6jE&e=); in particular, any function that does allocations, e.g. via allocVector3() might end up calling it via GC -> finalizer -> ... -> eval; the only way around this problem which I could find is to adjust R_CStackLimit, which is outside of the official API; it can be set to -1 to disable the check or be changed to a value appropriate for the current thread
> 
> 4) R sets signal handlers for several signals and some of them make use of the R API; hence, issues 1) - 3) apply; signal masks can be used to block delivery of signals to secondary threads in general and to the main thread while other threads are using the R API
> 
> 
> I basically have the following questions:
> 
> a) Is my understanding of the issues accurate?
> b) Are there more things to consider when calling the R API from secondary threads?
> c) Are the solutions proposed appropriate? Are there scenarios in which they will fail to solve the issue? Or might they even cause new problems?
> d) Are there alternative/better solutions?
> 
> Any feedback on this is highly appreciated.
> 
> Below you can find a template which, combines the proposed solutions (and skips all non-illustrative checks of return values). Additionally, R_CheckUserInterrupt() is used in combination with R_UnwindProtect() to regularly check for interrupts from the main thread, while still being able to cleanly cancel the threads before fun_running_in_main_thread() is left via a longjmp. This is e.g. required if the secondary threads use memory which was allocated in fun_running_in_main_thread() using e.g. R_alloc().
> 
> Best regards,
> Andreas Kersting
> 
> 
> 
> #include <Rinternals.h>
> #include <pthread.h>
> #include <signal.h>
> #include <stdint.h>
> 
> extern uintptr_t R_CStackLimit;
> extern int R_PPStackTop;
> extern int R_PPStackSize;
> 
> #include <R_ext/libextern.h>
> LibExtern Rboolean R_interrupts_suspended;
> LibExtern int R_interrupts_pending;
> extern void Rf_onintr(void);
> 
> // mutex for exclusive access to the R API:
> static pthread_mutex_t r_api_mutex = PTHREAD_MUTEX_INITIALIZER;
> 
> // a wrapper arround R_CheckUserInterrupt() which can be passed to R_UnwindProtect():
> SEXP check_interrupt(void *data) {
>    R_CheckUserInterrupt();
>    return R_NilValue;
> }
> 
> // a wrapper arround Rf_onintr() which can be passed to R_UnwindProtect():
> SEXP my_onintr(void *data) {
>    Rf_onintr();
>    return R_NilValue;
> }
> 
> // function called by R_UnwindProtect() to cleanup on interrupt
> void cleanfun(void *data, Rboolean jump) {
>    if (jump) {
>      // terminate threads cleanly ...
>    }
> }
> 
> void fun_calling_R_API(void *data) {
>    // call some R API function, e.g. mkCharCE() ...
> }
> 
> void *threaded_fun(void *td) {
> 
>    // ...
> 
>    pthread_mutex_lock(&r_api_mutex);
> 
>    // avoid false stack overflow error:
>    intptr_t R_CStackLimit_old = R_CStackLimit;
>    R_CStackLimit = -1;
> 
> 
>    // R_ToplevelExec() below will call PROTECT 4x:
>    if (R_PPStackTop > R_PPStackSize - 4) {
>      // ppstack would overflow in R_ToplevelExec() -> handle this ...
>    }
> 
>    // avoid longjmp to different thread:
>    Rboolean ok = R_ToplevelExec(fun_calling_R_API, (void *) &some_data);
> 
>    // re-enable stack size checking:
>    R_CStackLimit = R_CStackLimit_old;
>    pthread_mutex_unlock(&r_api_mutex);
> 
>    if (!ok) {
>      // handle error ...
>    }
> 
>    // ...
> }
> 
> SEXP fun_running_in_main_thread() {
> 
>    // ...
> 
>    /* create continuation token for R_UnwindProtect():
>     *
>     * do this explicitly here before the threads are created because this might
>     * fail in allocation or with pointer protection stack overflow
>     */
>    SEXP cont = PROTECT(R_MakeUnwindCont());
> 
>    /* block all signals:
>     *
>     * do this before the threads are created such that they inherit the mask
>     */
>    sigset_t block_set, prev_mask;
>    sigfillset(&block_set);
>    pthread_sigmask(SIG_SETMASK, &block_set, &prev_mask);
> 
>    // suspend interrupts:
>    Rboolean __oldsusp__ = R_interrupts_suspended;
>    R_interrupts_suspended = TRUE;
> 
>    // create threads running threaded_fun() ...
> 
>    for(;;) {
>      // timed blocking check if threads are done ...
> 
>      // unblock signals, check for interrupts and run cleanfun if there is one:
>      pthread_mutex_lock(&r_api_mutex);
>      pthread_sigmask(SIG_SETMASK, &prev_mask, NULL);
> 
>      R_interrupts_suspended = __oldsusp__;
>      if (R_interrupts_pending && ! R_interrupts_suspended) {
>        R_UnwindProtect(my_onintr, NULL, cleanfun, (void *) clean_data, cont);
>      }
> 
>      R_UnwindProtect(check_interrupt, NULL, cleanfun, (void *) clean_data, cont);
> 
>      R_interrupts_suspended = TRUE;
> 
>      pthread_sigmask(SIG_SETMASK, &block_set, NULL);
>      pthread_mutex_unlock(&r_api_mutex);
>    }
> 
>    // now all threads are dead
> 
>    UNPROTECT(1);  // continuation token
> 
>    // unblock signals:
>    pthread_sigmask(SIG_SETMASK, &prev_mask, NULL);
> 
>    // reset interrupt-suspension:
>    R_interrupts_suspended = __oldsusp__;
>    if (R_interrupts_pending && ! R_interrupts_suspended) {
>      Rf_onintr();
>    }
> 
>    // ...
> }
> ______________________________________________
> R-devel using r-project.org mailing list
> https://urldefense.proofpoint.com/v2/url?u=https-3A__stat.ethz.ch_mailman_listinfo_r-2Ddevel&d=DwIFaQ&c=RoP1YumCXCgaWHvlZYR8PZh8Bv7qIrMUB65eapI_JnE&r=neKFCw86thQe2E2-61NAgpDMw4cC7oD_tUTTzraOkQM&m=d1r2raD4w0FF7spOVuz2IVEo0P_II3ZtSbw0TU2NmaE&s=Cv18jzHzhslwwzRcd4ztMRE_4xqphJz7bQ8lprgol6I&e=
>



More information about the R-devel mailing list