[Rd] There was a problem with the use of snow.

Prof Brian Ripley ripley at stats.ox.ac.uk
Mon Jun 25 14:49:52 CEST 2007


We don't need to change as much as this.  The OutBytes function is only 
called for small objects apart from CHARSXPs:  the latter are limited to
2^31 - 1 bytes.  So 'length' in a read/write will never exceed INT_MAX.

In the current usage, the in-memory code is only used to write to a RAWSXP
which is also limited to 2^31 - 1 bytes, so we only need to deal with 
signed overflows if we do things like double the buffer size, and that's 
pointless as the result will overflow the RAWSXP.

On Mon, 25 Jun 2007, Ei-ji Nakama wrote:

> Thank you for advice.
>
> library(snow)
> cl <- makeSOCKcluster(c("localhost","localhost"))
> clusterCall(cl, function(x){Sys.sleep(10);1})
> clusterCall(cl, function(x){Sys.sleep(60);1}) # Timeout Code.
> stopCluster(cl)
>
> I worked in AIX and Linux.
>
> diff -ruN R-devel.orig/src/include/Rinternals.h R-devel/src/include/Rinternals.h
> --- R-devel.orig/src/include/Rinternals.h       2007-06-18 00:50:00.000000000 +0900
> +++ R-devel/src/include/Rinternals.h    2007-06-25 13:08:00.000000000 +0900
> @@ -725,7 +725,7 @@
>    R_pstream_format_t type;
>    int version;
>    void (*OutChar)(R_outpstream_t, int);
> -    void (*OutBytes)(R_outpstream_t, void *, int);
> +    void (*OutBytes)(R_outpstream_t, void *, size_t);
>    SEXP (*OutPersistHookFunc)(SEXP, SEXP);
>    SEXP OutPersistHookData;
> };
> @@ -735,7 +735,7 @@
>    R_pstream_data_t data;
>    R_pstream_format_t type;
>    int (*InChar)(R_inpstream_t);
> -    void (*InBytes)(R_inpstream_t, void *, int);
> +    void (*InBytes)(R_inpstream_t, void *, size_t);
>    SEXP (*InPersistHookFunc)(SEXP, SEXP);
>    SEXP InPersistHookData;
> };
> @@ -743,12 +743,12 @@
> void R_InitInPStream(R_inpstream_t stream, R_pstream_data_t data,
>                    R_pstream_format_t type,
>                    int (*inchar)(R_inpstream_t),
> -                    void (*inbytes)(R_inpstream_t, void *, int),
> +                    void (*inbytes)(R_inpstream_t, void *, size_t),
>                    SEXP (*phook)(SEXP, SEXP), SEXP pdata);
> void R_InitOutPStream(R_outpstream_t stream, R_pstream_data_t data,
>                     R_pstream_format_t type, int version,
>                     void (*outchar)(R_outpstream_t, int),
> -                     void (*outbytes)(R_outpstream_t, void *, int),
> +                     void (*outbytes)(R_outpstream_t, void *, size_t),
>                     SEXP (*phook)(SEXP, SEXP), SEXP pdata);
>
> void R_InitFileInPStream(R_inpstream_t stream, FILE *fp,
> diff -ruN R-devel.orig/src/main/serialize.c R-devel/src/main/serialize.c
> --- R-devel.orig/src/main/serialize.c   2007-06-18 00:50:02.000000000 +0900
> +++ R-devel/src/main/serialize.c        2007-06-25 19:32:05.000000000 +0900
> @@ -1529,7 +1529,7 @@
> R_InitInPStream(R_inpstream_t stream, R_pstream_data_t data,
>                    R_pstream_format_t type,
>                    int (*inchar)(R_inpstream_t),
> -                    void (*inbytes)(R_inpstream_t, void *, int),
> +                    void (*inbytes)(R_inpstream_t, void *, size_t),
>                    SEXP (*phook)(SEXP, SEXP), SEXP pdata)
> {
>    stream->data = data;
> @@ -1544,7 +1544,7 @@
> R_InitOutPStream(R_outpstream_t stream, R_pstream_data_t data,
>                     R_pstream_format_t type, int version,
>                     void (*outchar)(R_outpstream_t, int),
> -                     void (*outbytes)(R_outpstream_t, void *, int),
> +                     void (*outbytes)(R_outpstream_t, void *, size_t),
>                     SEXP (*phook)(SEXP, SEXP), SEXP pdata)
> {
>    stream->data = data;
> @@ -1574,14 +1574,14 @@
>    return fgetc(fp);
> }
>
> -static void OutBytesFile(R_outpstream_t stream, void *buf, int length)
> +static void OutBytesFile(R_outpstream_t stream, void *buf, size_t length)
> {
>    FILE *fp = stream->data;
>    size_t out = fwrite(buf, 1, length, fp);
>    if (out != length) error(_("write failed"));
> }
>
> -static void InBytesFile(R_inpstream_t stream, void *buf, int length)
> +static void InBytesFile(R_inpstream_t stream, void *buf, size_t length)
> {
>    FILE *fp = stream->data;
>    size_t in = fread(buf, 1, length, fp);
> @@ -1629,12 +1629,12 @@
>       error(_("cannot write to this connection"));
> }
>
> -static void InBytesConn(R_inpstream_t stream, void *buf, int length)
> +static void InBytesConn(R_inpstream_t stream, void *buf, size_t length)
> {
>    Rconnection con = (Rconnection) stream->data;
>    CheckInConn(con);
>    if (con->text) {
> -       int i;
> +       size_t i;
>       char *p = buf;
>       for (i = 0; i < length; i++)
>           p[i] = Rconn_fgetc(con);
> @@ -1659,7 +1659,7 @@
>    }
> }
>
> -static void OutBytesConn(R_outpstream_t stream, void *buf, int length)
> +static void OutBytesConn(R_outpstream_t stream, void *buf, size_t length)
> {
>    Rconnection con = (Rconnection) stream->data;
>    CheckOutConn(con);
> @@ -1817,7 +1817,7 @@
>    bb->buf[bb->count++] = c;
> }
>
> -static void OutBytesBB(R_outpstream_t stream, void *buf, int length)
> +static void OutBytesBB(R_outpstream_t stream, void *buf, size_t length)
> {
>    bconbuf_t bb = stream->data;
>    if (bb->count + length > BCONBUFSIZ)
> @@ -1863,14 +1863,14 @@
> */
>
> typedef struct membuf_st {
> -    int size;
> -    int count;
> +    size_t size;
> +    size_t count;
>    unsigned char *buf;
> } *membuf_t;
>
> -static void resize_buffer(membuf_t mb, int needed)
> +static void resize_buffer(membuf_t mb, size_t needed)
> {
> -    int newsize = 2 * needed;
> +    size_t newsize = 2 * needed;
>    mb->buf = realloc(mb->buf, newsize);
>    if (mb->buf == NULL)
>       error(_("cannot allocate buffer"));
> @@ -1885,7 +1885,7 @@
>    mb->buf[mb->count++] = c;
> }
>
> -static void OutBytesMem(R_outpstream_t stream, void *buf, int length)
> +static void OutBytesMem(R_outpstream_t stream, void *buf, size_t length)
> {
>    membuf_t mb = stream->data;
>    if (mb->count + length > mb->size)
> @@ -1902,7 +1902,7 @@
>    return mb->buf[mb->count++];
> }
>
> -static void InBytesMem(R_inpstream_t stream, void *buf, int length)
> +static void InBytesMem(R_inpstream_t stream, void *buf, size_t length)
> {
>    membuf_t mb = stream->data;
>    if (mb->count + length > mb->size)
> @@ -1912,7 +1912,7 @@
> }
>
> static void InitMemInPStream(R_inpstream_t stream, membuf_t mb,
> -                            void *buf, int length,
> +                            void *buf, size_t length,
>                            SEXP (*phook)(SEXP, SEXP), SEXP pdata)
> {
>    mb->count = 0;
> diff -ruN R-devel.orig/src/modules/internet/sockconn.c R-devel/src/modules/internet/sockconn.c
> --- R-devel.orig/src/modules/internet/sockconn.c        2007-06-03 00:50:32.000000000 +0900
> +++ R-devel/src/modules/internet/sockconn.c     2007-06-25 19:41:32.000000000 +0900
> @@ -155,14 +155,19 @@
> static size_t sock_read(void *ptr, size_t size, size_t nitems,
>                       Rconnection con)
> {
> +    int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
> +
> +    R_SockTimeout(timeout);
>    return sock_read_helper(con, ptr, size * nitems)/size;
> }
>
> static size_t sock_write(const void *ptr, size_t size, size_t nitems,
>                        Rconnection con)
> {
> +    int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
>    Rsockconn this = (Rsockconn)con->private;
>
> +    R_SockTimeout(timeout);
>    return R_SockWrite(this->fd, ptr, size * nitems)/size;
> }
>
>
>
> 2007/6/25, Prof Brian Ripley <ripley at stats.ox.ac.uk>:
>> On Mon, 25 Jun 2007, Ei-ji Nakama wrote:
>> 
>> > There is a problem of a very large memory request caused by sign extension.
>> >
>> > --- R-2.5.0.orig/src/main/serialize.c   2007-03-27 01:42:08.000000000 +0900
>> > +++ R-2.5.0/src/main/serialize.c        2007-06-25 00:48:58.000000000 +0900
>> > @@ -1866,7 +1866,7 @@
>> >
>> > static void resize_buffer(membuf_t mb, int needed)
>> > {
>> > -    int newsize = 2 * needed;
>> > +    size_t newsize = 2 * needed;
>> >     mb->buf = realloc(mb->buf, newsize);
>> >     if (mb->buf == NULL)
>> >        error(_("cannot allocate buffer"));
>> 
>> Yes, thanks, but the structure also needs to be changed as the next line
>> is
>>
>>      mb->size = newsize;
>> 
>> and so this would set mb->size to a negative value.
>> 
>> Could you please tell us where you encountered this?
>> 
>> As far as I can see, this code is only used via R_serialize for
>> serializing to a raw vector, in which case 'size' can safely be 'int' and
>> the re-allocation should be up to 2^31-1 bytes at most (and allocating
>> twice what you are asked for seems undesirable).  But there is potential
>> overflow at
>>
>>      if (mb->count + length > mb->size)
>> 
>> and possibly elsewhere.
>> 
>> 
>> > The time-out of read and write was not set.
>> >
>> > 51:sendData.SOCKnode <- function(node, data) {
>> > 52:     timeout <- getClusterOption("timeout")
>> > 53:     old <- options(timeout = timeout);
>> > 54:     on.exit(options(old))
>> > 55:     serialize(data, node$con)
>> > 56: }
>> > 57:
>> > 58:recvData.SOCKnode <- function(node) {
>> > 59:     timeout <- getClusterOption("timeout")
>> > 60:     old <- options(timeout = timeout);
>> > 61:     on.exit(options(old))
>> > 62:     unserialize(node$con)
>> > 63: }
>> 
>> I don't think sock_read/sock_write is the right place to make that
>> setting.  Ideally it would be set when the option is set, but as this is
>> in a module, that needs an extension to the interface.
>> 
>> Looking at the code, we read from a socket in blocks of 4096, but we write
>> in a single block.  The latter is likely to be the problem, and I think
>> some redesigning is necessary here.
>> 
>> Perhaps you and Luke Tierney can comment on exactly what the problem is
>> and how best to work around it.
>> 
>> >
>> > --- R-2.5.0.orig/src/modules/internet/sockconn.c 2006-09-04 23:20:59.000000000 +0900
>> > +++ R-2.5.0/src/modules/internet/sockconn.c     2007-06-25 00:51:38.000000000 +0900
>> > @@ -155,14 +155,19 @@
>> > static size_t sock_read(void *ptr, size_t size, size_t nitems,
>> >                        Rconnection con)
>> > {
>> > +    int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
>> > +
>> > +    R_SockTimeout(timeout);
>> >     return sock_read_helper(con, ptr, size * nitems)/size;
>> > }
>> >
>> > static size_t sock_write(const void *ptr, size_t size, size_t nitems,
>> >                         Rconnection con)
>> > {
>> > +    int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
>> >     Rsockconn this = (Rsockconn)con->private;
>> >
>> > +    R_SockTimeout(timeout);
>> >     return R_SockWrite(this->fd, ptr, size * nitems)/size;
>> > }
>> >
>> >
>> 
>> --
>> Brian D. Ripley,                  ripley at stats.ox.ac.uk
>> Professor of Applied Statistics,  http://www.stats.ox.ac.uk/~ripley/
>> University of Oxford,             Tel:  +44 1865 272861 (self)
>> 1 South Parks Road,                     +44 1865 272866 (PA)
>> Oxford OX1 3TG, UK                Fax:  +44 1865 272595
>> 
>> 
>> 
>
>
>

-- 
Brian D. Ripley,                  ripley at stats.ox.ac.uk
Professor of Applied Statistics,  http://www.stats.ox.ac.uk/~ripley/
University of Oxford,             Tel:  +44 1865 272861 (self)
1 South Parks Road,                     +44 1865 272866 (PA)
Oxford OX1 3TG, UK                Fax:  +44 1865 272595



More information about the R-devel mailing list