[Rd] There was a problem by the use of snow.

Ei-ji Nakama nakama at ki.rim.or.jp
Mon Jun 25 12:52:43 CEST 2007


Thank you for advice.

library(snow)
cl <- makeSOCKcluster(c("localhost","localhost"))
clusterCall(cl, function(x){Sys.sleep(10);1})
clusterCall(cl, function(x){Sys.sleep(60);1}) # Timeout Code.
stopCluster(cl)

I worked in AIX and Linux.

diff -ruN R-devel.orig/src/include/Rinternals.h R-devel/src/include/Rinternals.h
--- R-devel.orig/src/include/Rinternals.h       2007-06-18
00:50:00.000000000 +0900
+++ R-devel/src/include/Rinternals.h    2007-06-25 13:08:00.000000000 +0900
@@ -725,7 +725,7 @@
     R_pstream_format_t type;
     int version;
     void (*OutChar)(R_outpstream_t, int);
-    void (*OutBytes)(R_outpstream_t, void *, int);
+    void (*OutBytes)(R_outpstream_t, void *, size_t);
     SEXP (*OutPersistHookFunc)(SEXP, SEXP);
     SEXP OutPersistHookData;
 };
@@ -735,7 +735,7 @@
     R_pstream_data_t data;
     R_pstream_format_t type;
     int (*InChar)(R_inpstream_t);
-    void (*InBytes)(R_inpstream_t, void *, int);
+    void (*InBytes)(R_inpstream_t, void *, size_t);
     SEXP (*InPersistHookFunc)(SEXP, SEXP);
     SEXP InPersistHookData;
 };
@@ -743,12 +743,12 @@
 void R_InitInPStream(R_inpstream_t stream, R_pstream_data_t data,
                     R_pstream_format_t type,
                     int (*inchar)(R_inpstream_t),
-                    void (*inbytes)(R_inpstream_t, void *, int),
+                    void (*inbytes)(R_inpstream_t, void *, size_t),
                     SEXP (*phook)(SEXP, SEXP), SEXP pdata);
 void R_InitOutPStream(R_outpstream_t stream, R_pstream_data_t data,
                      R_pstream_format_t type, int version,
                      void (*outchar)(R_outpstream_t, int),
-                     void (*outbytes)(R_outpstream_t, void *, int),
+                     void (*outbytes)(R_outpstream_t, void *, size_t),
                      SEXP (*phook)(SEXP, SEXP), SEXP pdata);

 void R_InitFileInPStream(R_inpstream_t stream, FILE *fp,
diff -ruN R-devel.orig/src/main/serialize.c R-devel/src/main/serialize.c
--- R-devel.orig/src/main/serialize.c   2007-06-18 00:50:02.000000000 +0900
+++ R-devel/src/main/serialize.c        2007-06-25 19:32:05.000000000 +0900
@@ -1529,7 +1529,7 @@
 R_InitInPStream(R_inpstream_t stream, R_pstream_data_t data,
                     R_pstream_format_t type,
                     int (*inchar)(R_inpstream_t),
-                    void (*inbytes)(R_inpstream_t, void *, int),
+                    void (*inbytes)(R_inpstream_t, void *, size_t),
                     SEXP (*phook)(SEXP, SEXP), SEXP pdata)
 {
     stream->data = data;
@@ -1544,7 +1544,7 @@
 R_InitOutPStream(R_outpstream_t stream, R_pstream_data_t data,
                      R_pstream_format_t type, int version,
                      void (*outchar)(R_outpstream_t, int),
-                     void (*outbytes)(R_outpstream_t, void *, int),
+                     void (*outbytes)(R_outpstream_t, void *, size_t),
                      SEXP (*phook)(SEXP, SEXP), SEXP pdata)
 {
     stream->data = data;
@@ -1574,14 +1574,14 @@
     return fgetc(fp);
 }

-static void OutBytesFile(R_outpstream_t stream, void *buf, int length)
+static void OutBytesFile(R_outpstream_t stream, void *buf, size_t length)
 {
     FILE *fp = stream->data;
     size_t out = fwrite(buf, 1, length, fp);
     if (out != length) error(_("write failed"));
 }

-static void InBytesFile(R_inpstream_t stream, void *buf, int length)
+static void InBytesFile(R_inpstream_t stream, void *buf, size_t length)
 {
     FILE *fp = stream->data;
     size_t in = fread(buf, 1, length, fp);
@@ -1629,12 +1629,12 @@
        error(_("cannot write to this connection"));
 }

-static void InBytesConn(R_inpstream_t stream, void *buf, int length)
+static void InBytesConn(R_inpstream_t stream, void *buf, size_t length)
 {
     Rconnection con = (Rconnection) stream->data;
     CheckInConn(con);
     if (con->text) {
-       int i;
+       size_t i;
        char *p = buf;
        for (i = 0; i < length; i++)
            p[i] = Rconn_fgetc(con);
@@ -1659,7 +1659,7 @@
     }
 }

-static void OutBytesConn(R_outpstream_t stream, void *buf, int length)
+static void OutBytesConn(R_outpstream_t stream, void *buf, size_t length)
 {
     Rconnection con = (Rconnection) stream->data;
     CheckOutConn(con);
@@ -1817,7 +1817,7 @@
     bb->buf[bb->count++] = c;
 }

-static void OutBytesBB(R_outpstream_t stream, void *buf, int length)
+static void OutBytesBB(R_outpstream_t stream, void *buf, size_t length)
 {
     bconbuf_t bb = stream->data;
     if (bb->count + length > BCONBUFSIZ)
@@ -1863,14 +1863,14 @@
  */

 typedef struct membuf_st {
-    int size;
-    int count;
+    size_t size;
+    size_t count;
     unsigned char *buf;
 } *membuf_t;

-static void resize_buffer(membuf_t mb, int needed)
+static void resize_buffer(membuf_t mb, size_t needed)
 {
-    int newsize = 2 * needed;
+    size_t newsize = 2 * needed;
     mb->buf = realloc(mb->buf, newsize);
     if (mb->buf == NULL)
        error(_("cannot allocate buffer"));
@@ -1885,7 +1885,7 @@
     mb->buf[mb->count++] = c;
 }

-static void OutBytesMem(R_outpstream_t stream, void *buf, int length)
+static void OutBytesMem(R_outpstream_t stream, void *buf, size_t length)
 {
     membuf_t mb = stream->data;
     if (mb->count + length > mb->size)
@@ -1902,7 +1902,7 @@
     return mb->buf[mb->count++];
 }

-static void InBytesMem(R_inpstream_t stream, void *buf, int length)
+static void InBytesMem(R_inpstream_t stream, void *buf, size_t length)
 {
     membuf_t mb = stream->data;
     if (mb->count + length > mb->size)
@@ -1912,7 +1912,7 @@
 }

 static void InitMemInPStream(R_inpstream_t stream, membuf_t mb,
-                            void *buf, int length,
+                            void *buf, size_t length,
                             SEXP (*phook)(SEXP, SEXP), SEXP pdata)
 {
     mb->count = 0;
diff -ruN R-devel.orig/src/modules/internet/sockconn.c
R-devel/src/modules/internet/sockconn.c
--- R-devel.orig/src/modules/internet/sockconn.c        2007-06-03
00:50:32.000000000 +0900
+++ R-devel/src/modules/internet/sockconn.c     2007-06-25
19:41:32.000000000 +0900
@@ -155,14 +155,19 @@
 static size_t sock_read(void *ptr, size_t size, size_t nitems,
                        Rconnection con)
 {
+    int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
+
+    R_SockTimeout(timeout);
     return sock_read_helper(con, ptr, size * nitems)/size;
 }

 static size_t sock_write(const void *ptr, size_t size, size_t nitems,
                         Rconnection con)
 {
+    int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
     Rsockconn this = (Rsockconn)con->private;

+    R_SockTimeout(timeout);
     return R_SockWrite(this->fd, ptr, size * nitems)/size;
 }



2007/6/25, Prof Brian Ripley <ripley at stats.ox.ac.uk>:
> On Mon, 25 Jun 2007, Ei-ji Nakama wrote:
>
> > problem of the very large memory require by the Sign extension.
> >
> > --- R-2.5.0.orig/src/main/serialize.c   2007-03-27 01:42:08.000000000 +0900
> > +++ R-2.5.0/src/main/serialize.c        2007-06-25 00:48:58.000000000 +0900
> > @@ -1866,7 +1866,7 @@
> >
> > static void resize_buffer(membuf_t mb, int needed)
> > {
> > -    int newsize = 2 * needed;
> > +    size_t newsize = 2 * needed;
> >     mb->buf = realloc(mb->buf, newsize);
> >     if (mb->buf == NULL)
> >        error(_("cannot allocate buffer"));
>
> Yes, thanks, but the structure also needs to be changed as the next line
> is
>
>      mb->size = newsize;
>
> and so this would set mb->size to a negative value.
>
> Could you please tell us where you encountered this?
>
> As far as I can see, this code is only used via R_serialize for
> serializing to a raw vector, in which case 'size' can safely be 'int' and
> the re-allocation should be up to 2^31-1 bytes at most (and allocating
> twice what you are asked for seems undesirable).  But there is potential
> overflow at
>
>      if (mb->count + length > mb->size)
>
> and possibly elsewhere.
>
>
> > The time-out of read and write was not set.
> >
> > 51:sendData.SOCKnode <- function(node, data) {
> > 52:     timeout <- getClusterOption("timeout")
> > 53:     old <- options(timeout = timeout);
> > 54:     on.exit(options(old))
> > 55:     serialize(data, node$con)
> > 56: }
> > 57:
> > 58:recvData.SOCKnode <- function(node) {
> > 59:     timeout <- getClusterOption("timeout")
> > 60:     old <- options(timeout = timeout);
> > 61:     on.exit(options(old))
> > 62:     unserialize(node$con)
> > 63: }
>
> I don't think sock_read/sock_write is the right place to make that
> setting.  Ideally it would be set when the option is set, but as this is
> in a module that needs an extension to the interface.
>
> Looking at the code, we read from a socket in blocks of 4096, but we write
> in a single block.  The latter is likely to be the problem, and I think
> some redesigning is necessary here.
>
> Perhaps you and Luke Tierney can comment on exactly what the problem is
> and how best to work around it.
>
> >
> > --- R-2.5.0.orig/src/modules/internet/sockconn.c 2006-09-04 23:20:59.000000000 +0900
> > +++ R-2.5.0/src/modules/internet/sockconn.c     2007-06-25
> > 00:51:38.000000000 +0900
> > @@ -155,14 +155,19 @@
> > static size_t sock_read(void *ptr, size_t size, size_t nitems,
> >                        Rconnection con)
> > {
> > +    int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
> > +
> > +    R_SockTimeout(timeout);
> >     return sock_read_helper(con, ptr, size * nitems)/size;
> > }
> >
> > static size_t sock_write(const void *ptr, size_t size, size_t nitems,
> >                         Rconnection con)
> > {
> > +    int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
> >     Rsockconn this = (Rsockconn)con->private;
> >
> > +    R_SockTimeout(timeout);
> >     return R_SockWrite(this->fd, ptr, size * nitems)/size;
> > }
> >
> >
>
> --
> Brian D. Ripley,                  ripley at stats.ox.ac.uk
> Professor of Applied Statistics,  http://www.stats.ox.ac.uk/~ripley/
> University of Oxford,             Tel:  +44 1865 272861 (self)
> 1 South Parks Road,                     +44 1865 272866 (PA)
> Oxford OX1 3TG, UK                Fax:  +44 1865 272595
>
>
>


-- 
EI-JI Nakama  <nakama at ki.rim.or.jp>
"\u4e2d\u9593\u6804\u6cbb"  <nakama at ki.rim.or.jp>



More information about the R-devel mailing list