[Rd] most robust way to call R API functions from a secondary thread

Andreas Kersting r-deve| @end|ng |rom @ker@t|ng@de
Sun May 19 11:31:35 CEST 2019


Hi,

As the subject suggests, I am looking for the most robust way to call an (arbitrary) function from the R API from another but the main POSIX thread in a package's code.

I know that, "[c]alling any of the R API from threaded code is ‘for experts only’ and strongly discouraged. Many functions in the R API modify internal R data structures and might corrupt these data structures if called simultaneously from multiple threads. Most R API functions can signal errors, which must only happen on the R main thread." (https://cran.r-project.org/doc/manuals/r-release/R-exts.html#OpenMP-support)

Let me start with my understanding of the related issues and possible solutions:

1) R API functions are generally not thread-safe and hence one must ensure, e.g. by using mutexes, that no two threads use the R API simultaneously

2) R uses longjmps on error and interrupts as well as for condition handling and it is undefined behaviour to do a longjmp from one thread to another; interrupts can be suspended before creating the threads by setting R_interrupts_suspended = TRUE; by wrapping the calls to functions from the R API with R_ToplevelExec(), longjmps across thread boundaries can be avoided; the only reason for R_ToplevelExec() itself to fail with an R-style error (longjmp) is a pointer protection stack overflow

3) R_CheckStack() might be executed (indirectly), which will (probably) signal a stack overflow because it only works correctly when called form the main thread (see https://cran.r-project.org/doc/manuals/r-release/R-exts.html#Threading-issues); in particular, any function that does allocations, e.g. via allocVector3() might end up calling it via GC -> finalizer -> ... -> eval; the only way around this problem which I could find is to adjust R_CStackLimit, which is outside of the official API; it can be set to -1 to disable the check or be changed to a value appropriate for the current thread

4) R sets signal handlers for several signals and some of them make use of the R API; hence, issues 1) - 3) apply; signal masks can be used to block delivery of signals to secondary threads in general and to the main thread while other threads are using the R API


I basically have the following questions:

a) Is my understanding of the issues accurate?
b) Are there more things to consider when calling the R API from secondary threads?
c) Are the solutions proposed appropriate? Are there scenarios in which they will fail to solve the issue? Or might they even cause new problems?
d) Are there alternative/better solutions?

Any feedback on this is highly appreciated.

Below you can find a template which, combines the proposed solutions (and skips all non-illustrative checks of return values). Additionally, R_CheckUserInterrupt() is used in combination with R_UnwindProtect() to regularly check for interrupts from the main thread, while still being able to cleanly cancel the threads before fun_running_in_main_thread() is left via a longjmp. This is e.g. required if the secondary threads use memory which was allocated in fun_running_in_main_thread() using e.g. R_alloc().

Best regards,
Andreas Kersting



#include <Rinternals.h>
#include <pthread.h>
#include <signal.h>
#include <stdint.h>

extern uintptr_t R_CStackLimit;
extern int R_PPStackTop;
extern int R_PPStackSize;

#include <R_ext/libextern.h>
LibExtern Rboolean R_interrupts_suspended;
LibExtern int R_interrupts_pending;
extern void Rf_onintr(void);

// mutex for exclusive access to the R API:
static pthread_mutex_t r_api_mutex = PTHREAD_MUTEX_INITIALIZER;

// a wrapper arround R_CheckUserInterrupt() which can be passed to R_UnwindProtect():
SEXP check_interrupt(void *data) {
  R_CheckUserInterrupt();
  return R_NilValue;
}

// a wrapper arround Rf_onintr() which can be passed to R_UnwindProtect():
SEXP my_onintr(void *data) {
  Rf_onintr();
  return R_NilValue;
}

// function called by R_UnwindProtect() to cleanup on interrupt
void cleanfun(void *data, Rboolean jump) {
  if (jump) {
    // terminate threads cleanly ...
  }
}

void fun_calling_R_API(void *data) {
  // call some R API function, e.g. mkCharCE() ...
}

void *threaded_fun(void *td) {

  // ...

  pthread_mutex_lock(&r_api_mutex);

  // avoid false stack overflow error:
  intptr_t R_CStackLimit_old = R_CStackLimit;
  R_CStackLimit = -1;


  // R_ToplevelExec() below will call PROTECT 4x:
  if (R_PPStackTop > R_PPStackSize - 4) {
    // ppstack would overflow in R_ToplevelExec() -> handle this ...
  }

  // avoid longjmp to different thread:
  Rboolean ok = R_ToplevelExec(fun_calling_R_API, (void *) &some_data);

  // re-enable stack size checking:
  R_CStackLimit = R_CStackLimit_old;
  pthread_mutex_unlock(&r_api_mutex);

  if (!ok) {
    // handle error ...
  }

  // ...
}

SEXP fun_running_in_main_thread() {

  // ...

  /* create continuation token for R_UnwindProtect():
   *
   * do this explicitly here before the threads are created because this might
   * fail in allocation or with pointer protection stack overflow
   */
  SEXP cont = PROTECT(R_MakeUnwindCont());

  /* block all signals:
   *
   * do this before the threads are created such that they inherit the mask
   */
  sigset_t block_set, prev_mask;
  sigfillset(&block_set);
  pthread_sigmask(SIG_SETMASK, &block_set, &prev_mask);

  // suspend interrupts:
  Rboolean __oldsusp__ = R_interrupts_suspended;
  R_interrupts_suspended = TRUE;

  // create threads running threaded_fun() ...

  for(;;) {
    // timed blocking check if threads are done ...

    // unblock signals, check for interrupts and run cleanfun if there is one:
    pthread_mutex_lock(&r_api_mutex);
    pthread_sigmask(SIG_SETMASK, &prev_mask, NULL);

    R_interrupts_suspended = __oldsusp__;
    if (R_interrupts_pending && ! R_interrupts_suspended) {
      R_UnwindProtect(my_onintr, NULL, cleanfun, (void *) clean_data, cont);
    }

    R_UnwindProtect(check_interrupt, NULL, cleanfun, (void *) clean_data, cont);

    R_interrupts_suspended = TRUE;

    pthread_sigmask(SIG_SETMASK, &block_set, NULL);
    pthread_mutex_unlock(&r_api_mutex);
  }

  // now all threads are dead

  UNPROTECT(1);  // continuation token

  // unblock signals:
  pthread_sigmask(SIG_SETMASK, &prev_mask, NULL);

  // reset interrupt-suspension:
  R_interrupts_suspended = __oldsusp__;
  if (R_interrupts_pending && ! R_interrupts_suspended) {
    Rf_onintr();
  }

  // ...
}


More information about the R-devel mailing list