[Rd] There was a problem by the use of snow.
Ei-ji Nakama
nakama at ki.rim.or.jp
Mon Jun 25 12:52:43 CEST 2007
Thank you for your advice.
library(snow)
cl <- makeSOCKcluster(c("localhost","localhost"))
clusterCall(cl, function(x){Sys.sleep(10);1})
clusterCall(cl, function(x){Sys.sleep(60);1}) # Timeout Code.
stopCluster(cl)
I tested this on AIX and Linux.
diff -ruN R-devel.orig/src/include/Rinternals.h R-devel/src/include/Rinternals.h
--- R-devel.orig/src/include/Rinternals.h 2007-06-18 00:50:00.000000000 +0900
+++ R-devel/src/include/Rinternals.h 2007-06-25 13:08:00.000000000 +0900
@@ -725,7 +725,7 @@
R_pstream_format_t type;
int version;
void (*OutChar)(R_outpstream_t, int);
- void (*OutBytes)(R_outpstream_t, void *, int);
+ void (*OutBytes)(R_outpstream_t, void *, size_t);
SEXP (*OutPersistHookFunc)(SEXP, SEXP);
SEXP OutPersistHookData;
};
@@ -735,7 +735,7 @@
R_pstream_data_t data;
R_pstream_format_t type;
int (*InChar)(R_inpstream_t);
- void (*InBytes)(R_inpstream_t, void *, int);
+ void (*InBytes)(R_inpstream_t, void *, size_t);
SEXP (*InPersistHookFunc)(SEXP, SEXP);
SEXP InPersistHookData;
};
@@ -743,12 +743,12 @@
void R_InitInPStream(R_inpstream_t stream, R_pstream_data_t data,
R_pstream_format_t type,
int (*inchar)(R_inpstream_t),
- void (*inbytes)(R_inpstream_t, void *, int),
+ void (*inbytes)(R_inpstream_t, void *, size_t),
SEXP (*phook)(SEXP, SEXP), SEXP pdata);
void R_InitOutPStream(R_outpstream_t stream, R_pstream_data_t data,
R_pstream_format_t type, int version,
void (*outchar)(R_outpstream_t, int),
- void (*outbytes)(R_outpstream_t, void *, int),
+ void (*outbytes)(R_outpstream_t, void *, size_t),
SEXP (*phook)(SEXP, SEXP), SEXP pdata);
void R_InitFileInPStream(R_inpstream_t stream, FILE *fp,
diff -ruN R-devel.orig/src/main/serialize.c R-devel/src/main/serialize.c
--- R-devel.orig/src/main/serialize.c 2007-06-18 00:50:02.000000000 +0900
+++ R-devel/src/main/serialize.c 2007-06-25 19:32:05.000000000 +0900
@@ -1529,7 +1529,7 @@
R_InitInPStream(R_inpstream_t stream, R_pstream_data_t data,
R_pstream_format_t type,
int (*inchar)(R_inpstream_t),
- void (*inbytes)(R_inpstream_t, void *, int),
+ void (*inbytes)(R_inpstream_t, void *, size_t),
SEXP (*phook)(SEXP, SEXP), SEXP pdata)
{
stream->data = data;
@@ -1544,7 +1544,7 @@
R_InitOutPStream(R_outpstream_t stream, R_pstream_data_t data,
R_pstream_format_t type, int version,
void (*outchar)(R_outpstream_t, int),
- void (*outbytes)(R_outpstream_t, void *, int),
+ void (*outbytes)(R_outpstream_t, void *, size_t),
SEXP (*phook)(SEXP, SEXP), SEXP pdata)
{
stream->data = data;
@@ -1574,14 +1574,14 @@
return fgetc(fp);
}
-static void OutBytesFile(R_outpstream_t stream, void *buf, int length)
+static void OutBytesFile(R_outpstream_t stream, void *buf, size_t length)
{
FILE *fp = stream->data;
size_t out = fwrite(buf, 1, length, fp);
if (out != length) error(_("write failed"));
}
-static void InBytesFile(R_inpstream_t stream, void *buf, int length)
+static void InBytesFile(R_inpstream_t stream, void *buf, size_t length)
{
FILE *fp = stream->data;
size_t in = fread(buf, 1, length, fp);
@@ -1629,12 +1629,12 @@
error(_("cannot write to this connection"));
}
-static void InBytesConn(R_inpstream_t stream, void *buf, int length)
+static void InBytesConn(R_inpstream_t stream, void *buf, size_t length)
{
Rconnection con = (Rconnection) stream->data;
CheckInConn(con);
if (con->text) {
- int i;
+ size_t i;
char *p = buf;
for (i = 0; i < length; i++)
p[i] = Rconn_fgetc(con);
@@ -1659,7 +1659,7 @@
}
}
-static void OutBytesConn(R_outpstream_t stream, void *buf, int length)
+static void OutBytesConn(R_outpstream_t stream, void *buf, size_t length)
{
Rconnection con = (Rconnection) stream->data;
CheckOutConn(con);
@@ -1817,7 +1817,7 @@
bb->buf[bb->count++] = c;
}
-static void OutBytesBB(R_outpstream_t stream, void *buf, int length)
+static void OutBytesBB(R_outpstream_t stream, void *buf, size_t length)
{
bconbuf_t bb = stream->data;
if (bb->count + length > BCONBUFSIZ)
@@ -1863,14 +1863,14 @@
*/
typedef struct membuf_st {
- int size;
- int count;
+ size_t size;
+ size_t count;
unsigned char *buf;
} *membuf_t;
-static void resize_buffer(membuf_t mb, int needed)
+static void resize_buffer(membuf_t mb, size_t needed)
{
- int newsize = 2 * needed;
+ size_t newsize = 2 * needed;
mb->buf = realloc(mb->buf, newsize);
if (mb->buf == NULL)
error(_("cannot allocate buffer"));
@@ -1885,7 +1885,7 @@
mb->buf[mb->count++] = c;
}
-static void OutBytesMem(R_outpstream_t stream, void *buf, int length)
+static void OutBytesMem(R_outpstream_t stream, void *buf, size_t length)
{
membuf_t mb = stream->data;
if (mb->count + length > mb->size)
@@ -1902,7 +1902,7 @@
return mb->buf[mb->count++];
}
-static void InBytesMem(R_inpstream_t stream, void *buf, int length)
+static void InBytesMem(R_inpstream_t stream, void *buf, size_t length)
{
membuf_t mb = stream->data;
if (mb->count + length > mb->size)
@@ -1912,7 +1912,7 @@
}
static void InitMemInPStream(R_inpstream_t stream, membuf_t mb,
- void *buf, int length,
+ void *buf, size_t length,
SEXP (*phook)(SEXP, SEXP), SEXP pdata)
{
mb->count = 0;
diff -ruN R-devel.orig/src/modules/internet/sockconn.c R-devel/src/modules/internet/sockconn.c
--- R-devel.orig/src/modules/internet/sockconn.c 2007-06-03 00:50:32.000000000 +0900
+++ R-devel/src/modules/internet/sockconn.c 2007-06-25 19:41:32.000000000 +0900
@@ -155,14 +155,19 @@
static size_t sock_read(void *ptr, size_t size, size_t nitems,
Rconnection con)
{
+ int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
+
+ R_SockTimeout(timeout);
return sock_read_helper(con, ptr, size * nitems)/size;
}
static size_t sock_write(const void *ptr, size_t size, size_t nitems,
Rconnection con)
{
+ int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
Rsockconn this = (Rsockconn)con->private;
+ R_SockTimeout(timeout);
return R_SockWrite(this->fd, ptr, size * nitems)/size;
}
2007/6/25, Prof Brian Ripley <ripley at stats.ox.ac.uk>:
> On Mon, 25 Jun 2007, Ei-ji Nakama wrote:
>
> > There is a problem of a very large memory request caused by sign extension.
> >
> > --- R-2.5.0.orig/src/main/serialize.c 2007-03-27 01:42:08.000000000 +0900
> > +++ R-2.5.0/src/main/serialize.c 2007-06-25 00:48:58.000000000 +0900
> > @@ -1866,7 +1866,7 @@
> >
> > static void resize_buffer(membuf_t mb, int needed)
> > {
> > - int newsize = 2 * needed;
> > + size_t newsize = 2 * needed;
> > mb->buf = realloc(mb->buf, newsize);
> > if (mb->buf == NULL)
> > error(_("cannot allocate buffer"));
>
> Yes, thanks, but the structure also needs to be changed as the next line
> is
>
> mb->size = newsize;
>
> and so this would set mb->size to a negative value.
>
> Could you please tell us where you encountered this?
>
> As far as I can see, this code is only used via R_serialize for
> serializing to a raw vector, in which case 'size' can safely be 'int' and
> the re-allocation should be up to 2^31-1 bytes at most (and allocating
> twice what you are asked for seems undesirable). But there is potential
> overflow at
>
> if (mb->count + length > mb->size)
>
> and possibly elsewhere.
>
>
> > The time-out of read and write was not set.
> >
> > 51:sendData.SOCKnode <- function(node, data) {
> > 52: timeout <- getClusterOption("timeout")
> > 53: old <- options(timeout = timeout);
> > 54: on.exit(options(old))
> > 55: serialize(data, node$con)
> > 56: }
> > 57:
> > 58:recvData.SOCKnode <- function(node) {
> > 59: timeout <- getClusterOption("timeout")
> > 60: old <- options(timeout = timeout);
> > 61: on.exit(options(old))
> > 62: unserialize(node$con)
> > 63: }
>
> I don't think sock_read/sock_write is the right place to make that
> setting. Ideally it would be set when the option is set, but as this is
> in a module that needs an extension to the interface.
>
> Looking at the code, we read from a socket in blocks of 4096, but we write
> in a single block. The latter is likely to be the problem, and I think
> some redesigning is necessary here.
>
> Perhaps you and Luke Tierney can comment on exactly what the problem is
> and how best to work around it.
>
> >
> > --- R-2.5.0.orig/src/modules/internet/sockconn.c 2006-09-04 23:20:59.000000000 +0900
> > +++ R-2.5.0/src/modules/internet/sockconn.c 2007-06-25 00:51:38.000000000 +0900
> > @@ -155,14 +155,19 @@
> > static size_t sock_read(void *ptr, size_t size, size_t nitems,
> > Rconnection con)
> > {
> > + int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
> > +
> > + R_SockTimeout(timeout);
> > return sock_read_helper(con, ptr, size * nitems)/size;
> > }
> >
> > static size_t sock_write(const void *ptr, size_t size, size_t nitems,
> > Rconnection con)
> > {
> > + int timeout = asInteger(GetOption(install("timeout"), R_BaseEnv));
> > Rsockconn this = (Rsockconn)con->private;
> >
> > + R_SockTimeout(timeout);
> > return R_SockWrite(this->fd, ptr, size * nitems)/size;
> > }
> >
> >
>
> --
> Brian D. Ripley, ripley at stats.ox.ac.uk
> Professor of Applied Statistics, http://www.stats.ox.ac.uk/~ripley/
> University of Oxford, Tel: +44 1865 272861 (self)
> 1 South Parks Road, +44 1865 272866 (PA)
> Oxford OX1 3TG, UK Fax: +44 1865 272595
>
>
>
--
EI-JI Nakama <nakama at ki.rim.or.jp>
"中間栄治" <nakama at ki.rim.or.jp>
More information about the R-devel
mailing list