[Rd] RFC: "loop connections"

dhinds@sonic.net dhinds at sonic.net
Tue Aug 23 01:34:15 CEST 2005


I've just implemented a generalization of R's text connections, to
also support reading/writing raw binary data.  There is very little
new code to speak of.  For input connections, I wrote code to populate
the old text connection buffer from a raw vector, and provided a new
raw_read() method.  For output connections, I wrote a raw_write() to
append to a raw vector.  On input, the mode (text or binary) is
determined by the data type of the input object; on output, I use the
requested output mode (i.e. "w" / "wb").  For example:

 > con <- loopConnection("r", "wb")
 > a <- c(10,100,1000)
 > writeBin(a, con, size=4)
 > r
  [1] 00 00 20 41 00 00 c8 42 00 00 7a 44
 > close(con)
 > con <- loopConnection(r)
 > readBin(con, "double", n=3, size=4)
 [1]   10  100 1000
 > close(con)

I think "loop connection" is a better name for this sort of connection
than "text connection" was even for the old version; that confuses the
mode of the connection (text vs binary) with the mechanism (file,
socket, etc).

I've appended a patch to the end of this message.  As implemented
here, textConnection is replaced by loopConnection but functionally
this is a superset of the old textConnection.  For compatibility, one
could add:

  textConnection <- function(...) loopConnection(...)

The patch is against R-2.1.1.  I can investigate whether any changes
are required for the current development tree.  I can also update the
documentation files as required.  I thought I'd first check whether
anyone else thought this was worth inclusion before spending more time
on it.

The raw_write() code could be improved with smarter memory allocation
(grabbing bigger chunks rather than reallocating the raw vector for
every write), but this is at least a proof of principle.

-- David Hinds



--- src/main/connections.c.orig	2005-06-17 19:05:02.000000000 -0700
+++ src/main/connections.c	2005-08-22 15:54:03.156038200 -0700
@@ -1644,13 +1644,13 @@
     return ans;
 }
 
-/* ------------------- text connections --------------------- */
+/* ------------------- loop connections --------------------- */
 
 /* read a R character vector into a buffer */
 static void text_init(Rconnection con, SEXP text)
 {
     int i, nlines = length(text), nchars = 0;
-    Rtextconn this = (Rtextconn)con->private;
+    Rloopconn this = (Rloopconn)con->private;
 
     for(i = 0; i < nlines; i++)
 	nchars += strlen(CHAR(STRING_ELT(text, i))) + 1;
@@ -1668,19 +1668,35 @@
     this->cur = this->save = 0;
 }
 
-static Rboolean text_open(Rconnection con)
+/* read a R raw vector into a buffer */
+static void raw_init(Rconnection con, SEXP raw)
+{
+    int nbytes = length(raw);
+    Rloopconn this = (Rloopconn)con->private;
+
+    this->data = (char *) malloc(nbytes);
+    if(!this->data) {
+	free(this); free(con->description); free(con->class); free(con);
+	error(_("cannot allocate memory for raw connection"));
+    }
+    memcpy(this->data, RAW(raw), nbytes);
+    this->nchars = nbytes;
+    this->cur = this->save = 0;
+}
+
+static Rboolean loop_open(Rconnection con)
 {
     con->save = -1000;
     return TRUE;
 }
 
-static void text_close(Rconnection con)
+static void loop_close(Rconnection con)
 {
 }
 
-static void text_destroy(Rconnection con)
+static void loop_destroy(Rconnection con)
 {
-    Rtextconn this = (Rtextconn)con->private;
+    Rloopconn this = (Rloopconn)con->private;
 
     free(this->data);
     /* this->cur = this->nchars = 0; */
@@ -1689,7 +1705,7 @@
 
 static int text_fgetc(Rconnection con)
 {
-    Rtextconn this = (Rtextconn)con->private;
+    Rloopconn this = (Rloopconn)con->private;
     if(this->save) {
 	int c;
 	c = this->save;
@@ -1700,48 +1716,69 @@
     else return (int) (this->data[this->cur++]);
 }
 
-static double text_seek(Rconnection con, double where, int origin, int rw)
+static double loop_seek(Rconnection con, double where, int origin, int rw)
 {
-    if(where >= 0) error(_("seek is not relevant for text connection"));
+    if(where >= 0) error(_("seek is not relevant for loop connection"));
     return 0; /* if just asking, always at the beginning */
 }
 
-static Rconnection newtext(char *description, SEXP text)
+static size_t raw_read(void *ptr, size_t size, size_t nitems,
+		       Rconnection con)
+{
+    Rloopconn this = (Rloopconn)con->private;
+    if (this->cur + size*nitems > this->nchars) {
+	nitems = (this->nchars - this->cur)/size;
+	memcpy(ptr, this->data+this->cur, size*nitems);
+	this->cur = this->nchars;
+    } else {
+	memcpy(ptr, this->data+this->cur, size*nitems);
+	this->cur += size*nitems;
+    }
+    return nitems;
+}
+
+static Rconnection newloop(char *description, SEXP data)
 {
     Rconnection new;
     new = (Rconnection) malloc(sizeof(struct Rconn));
-    if(!new) error(_("allocation of text connection failed"));
-    new->class = (char *) malloc(strlen("textConnection") + 1);
+    if(!new) error(_("allocation of loop connection failed"));
+    new->class = (char *) malloc(strlen("loopConnection") + 1);
     if(!new->class) {
 	free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }
-    strcpy(new->class, "textConnection");
+    strcpy(new->class, "loopConnection");
     new->description = (char *) malloc(strlen(description) + 1);
     if(!new->description) {
 	free(new->class); free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }
     init_con(new, description, "r");
     new->isopen = TRUE;
     new->canwrite = FALSE;
-    new->open = &text_open;
-    new->close = &text_close;
-    new->destroy = &text_destroy;
-    new->fgetc = &text_fgetc;
-    new->seek = &text_seek;
-    new->private = (void*) malloc(sizeof(struct textconn));
+    new->open = &loop_open;
+    new->close = &loop_close;
+    new->destroy = &loop_destroy;
+    new->seek = &loop_seek;
+    new->private = (void*) malloc(sizeof(struct loopconn));
     if(!new->private) {
 	free(new->description); free(new->class); free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
+    }
+    new->text = isString(data);
+    if (new->text) {
+	new->fgetc = &text_fgetc;
+	text_init(new, data);
+    } else {
+	new->read = &raw_read;
+	raw_init(new, data);
     }
-    text_init(new, text);
     return new;
 }
 
-static void outtext_close(Rconnection con)
+static void outloop_close(Rconnection con)
 {
-    Routtextconn this = (Routtextconn)con->private;
+    Routloopconn this = (Routloopconn)con->private;
     SEXP tmp;
     int idx = ConnIndex(con);
 
@@ -1755,9 +1792,9 @@
     SET_VECTOR_ELT(OutTextData, idx, R_NilValue);
 }
 
-static void outtext_destroy(Rconnection con)
+static void outloop_destroy(Rconnection con)
 {
-    Routtextconn this = (Routtextconn)con->private;
+    Routloopconn this = (Routloopconn)con->private;
     free(this->lastline); free(this);
 }
 
@@ -1765,7 +1802,7 @@
 
 static int text_vfprintf(Rconnection con, const char *format, va_list ap)
 {
-    Routtextconn this = (Routtextconn)con->private;
+    Routloopconn this = (Routloopconn)con->private;
     char buf[BUFSIZE], *b = buf, *p, *q, *vmax = vmaxget();
     int res = 0, usedRalloc = FALSE, buffree,
 	already = strlen(this->lastline);
@@ -1830,24 +1867,41 @@
     return res;
 }
 
-static void outtext_init(Rconnection con, char *mode, int idx)
+static size_t raw_write(const void *ptr, size_t size, size_t nitems,
+			Rconnection con)
+{
+    Routloopconn this = (Routloopconn)con->private;
+    SEXP tmp;
+    int idx = ConnIndex(con);
+
+    PROTECT(tmp = lengthgets(this->data, this->len + size*nitems));
+    memcpy(RAW(tmp)+this->len, ptr, size*nitems);
+    this->len += size*nitems;
+    defineVar(this->namesymbol, tmp, VECTOR_ELT(OutTextData, idx));
+    this->data = tmp;
+    UNPROTECT(1);
+    return nitems;
+}
+
+static void outloop_init(Rconnection con, char *mode, int idx)
 {
-    Routtextconn this = (Routtextconn)con->private;
+    Routloopconn this = (Routloopconn)con->private;
+    int st = (con->text ? STRSXP : RAWSXP);
     SEXP val;
 
     this->namesymbol = install(con->description);
-    if(strcmp(mode, "w") == 0) {
+    if(strncmp(mode, "w", 1) == 0) {
 	/* create variable pointed to by con->description */
-	PROTECT(val = allocVector(STRSXP, 0));
+	PROTECT(val = allocVector(st, 0));
 	defineVar(this->namesymbol, val, VECTOR_ELT(OutTextData, idx));
 	UNPROTECT(1);
     } else {
 	/* take over existing variable */
 	val = findVar1(this->namesymbol, VECTOR_ELT(OutTextData, idx),
-		       STRSXP, FALSE);
+		       st, FALSE);
 	if(val == R_UnboundValue) {
-	    warning(_("text connection: appending to a non-existent char vector"));
-	    PROTECT(val = allocVector(STRSXP, 0));
+	    warning(_("loop connection: appending to a non-existent vector"));
+	    PROTECT(val = allocVector(st, 0));
 	    defineVar(this->namesymbol, val, VECTOR_ELT(OutTextData, idx));
 	    UNPROTECT(1);
 	}
@@ -1859,49 +1913,55 @@
 }
 
 
-static Rconnection newouttext(char *description, SEXP sfile, char *mode,
+static Rconnection newoutloop(char *description, SEXP sfile, char *mode,
 			      int idx)
 {
+    int isText = (mode[1] != 'b');
     Rconnection new;
     void *tmp;
 
     new = (Rconnection) malloc(sizeof(struct Rconn));
-    if(!new) error(_("allocation of text connection failed"));
-    new->class = (char *) malloc(strlen("textConnection") + 1);
+    if(!new) error(_("allocation of loop connection failed"));
+    new->class = (char *) malloc(strlen("loopConnection") + 1);
     if(!new->class) {
 	free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }
-    strcpy(new->class, "textConnection");
+    strcpy(new->class, "loopConnection");
     new->description = (char *) malloc(strlen(description) + 1);
     if(!new->description) {
 	free(new->class); free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }
     init_con(new, description, mode);
+    new->text = isText;
     new->isopen = TRUE;
     new->canread = FALSE;
-    new->open = &text_open;
-    new->close = &outtext_close;
-    new->destroy = &outtext_destroy;
-    new->vfprintf = &text_vfprintf;
-    new->seek = &text_seek;
-    new->private = (void*) malloc(sizeof(struct outtextconn));
+    new->open = &loop_open;
+    new->close = &outloop_close;
+    new->destroy = &outloop_destroy;
+    new->seek = &loop_seek;
+    new->private = (void*) malloc(sizeof(struct outloopconn));
     if(!new->private) {
 	free(new->description); free(new->class); free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }
-    ((Routtextconn)new->private)->lastline = tmp = malloc(LAST_LINE_LEN);
+    ((Routloopconn)new->private)->lastline = tmp = malloc(LAST_LINE_LEN);
     if(!tmp) {
 	free(new->private);
 	free(new->description); free(new->class); free(new);
-	error(_("allocation of text connection failed"));
+	error(_("allocation of loop connection failed"));
     }
-    outtext_init(new, mode, idx);
+    if (isText) {
+	new->vfprintf = &text_vfprintf;
+    } else {
+	new->write = &raw_write;
+    }
+    outloop_init(new, mode, idx);
     return new;
 }
 
-SEXP do_textconnection(SEXP call, SEXP op, SEXP args, SEXP env)
+SEXP do_loopconnection(SEXP call, SEXP op, SEXP args, SEXP env)
 {
     SEXP sfile, stext, sopen, ans, class, venv;
     char *desc, *open;
@@ -1914,8 +1974,6 @@
 	error(_("invalid 'description' argument"));
     desc = CHAR(STRING_ELT(sfile, 0));
     stext = CADR(args);
-    if(!isString(stext))
-	error(_("invalid 'text' argument"));
     sopen = CADDR(args);
     if(!isString(sopen) || length(sopen) != 1)
     error(_("invalid 'open' argument"));
@@ -1924,16 +1982,20 @@
     if (!isEnvironment(venv) && venv != R_NilValue)
 	error(_("invalid 'environment' argument"));
     ncon = NextConnection();
-    if(!strlen(open) || strncmp(open, "r", 1) == 0)
-	con = Connections[ncon] = newtext(desc, stext);
-    else if (strncmp(open, "w", 1) == 0 || strncmp(open, "a", 1) == 0) {
+    if(!strlen(open) || strncmp(open, "r", 1) == 0) {
+	if(!isString(stext) && (TYPEOF(stext) != RAWSXP))
+	    error(_("invalid 'object' argument"));
+	con = Connections[ncon] = newloop(desc, stext);
+    } else if (strncmp(open, "w", 1) == 0 || strncmp(open, "a", 1) == 0) {
+	if(!isString(stext))
+	    error(_("invalid 'object' argument"));
 	if (OutTextData == NULL) {
 	    OutTextData = allocVector(VECSXP, NCONNECTIONS);
 	    R_PreserveObject(OutTextData);
 	}
 	SET_VECTOR_ELT(OutTextData, ncon, venv);
 	con = Connections[ncon] =
-	    newouttext(CHAR(STRING_ELT(stext, 0)), sfile, open, ncon);
+	    newoutloop(CHAR(STRING_ELT(stext, 0)), sfile, open, ncon);
     }
     else
 	errorcall(call, _("unsupported mode"));
@@ -1942,7 +2004,7 @@
     PROTECT(ans = allocVector(INTSXP, 1));
     INTEGER(ans)[0] = ncon;
     PROTECT(class = allocVector(STRSXP, 2));
-    SET_STRING_ELT(class, 0, mkChar("textConnection"));
+    SET_STRING_ELT(class, 0, mkChar("loopConnection"));
     SET_STRING_ELT(class, 1, mkChar("connection"));
     classgets(ans, class);
     UNPROTECT(2);
--- src/main/names.c.orig	2005-05-20 05:51:46.000000000 -0700
+++ src/main/names.c	2005-08-22 15:59:47.968828400 -0700
@@ -866,7 +866,7 @@
 {"pushBack", 	do_pushback,	0,      11,     3,      {PP_FUNCALL, PREC_FN,	0}},
 {"clearPushBackLength",do_clearpushback,0,  11,     1,      {PP_FUNCALL, PREC_FN,	0}},
 {"pushBackLength",do_pushbacklength,0,  11,     1,      {PP_FUNCALL, PREC_FN,	0}},
-{"textConnection",do_textconnection,0,	11,     4,      {PP_FUNCALL, PREC_FN,	0}},
+{"loopConnection",do_loopconnection,0,	11,     4,      {PP_FUNCALL, PREC_FN,	0}},
 {"socketConnection",do_sockconn,0,	11,     6,      {PP_FUNCALL, PREC_FN,	0}},
 {"sockSelect",do_sockselect,0,	11,     3,      {PP_FUNCALL, PREC_FN,	0}},
 {"getAllConnections",do_getallconnections,0,11, 0,      {PP_FUNCALL, PREC_FN,	0}},
--- src/include/Rconnections.h.orig	2005-04-18 04:34:02.000000000 -0700
+++ src/include/Rconnections.h	2005-08-22 15:40:02.582767400 -0700
@@ -82,19 +82,19 @@
     int cp;
 } *Rgzfileconn;
 
-typedef struct textconn {
+typedef struct loopconn {
     char *data;  /* all the data */
     int cur, nchars; /* current pos and number of chars */
     char save; /* pushback */
-} *Rtextconn;
+} *Rloopconn;
 
-typedef struct outtextconn {
+typedef struct outloopconn {
     int len;  /* number of lines */
     SEXP namesymbol;
     SEXP data;
     char *lastline;
     int lastlinelength; /* buffer size */
-} *Routtextconn;
+} *Routloopconn;
 
 typedef enum {HTTPsh, FTPsh} UrlScheme;
 
--- src/library/base/R/connections.R.orig	2005-04-18 04:34:17.000000000 -0700
+++ src/library/base/R/connections.R	2005-08-22 16:18:22.095231400 -0700
@@ -83,10 +83,10 @@
                              encoding = getOption("encoding"))
     .Internal(socketConnection(host, port, server, blocking, open, encoding))
 
-textConnection <- function(object, open = "r", local = FALSE) {
+loopConnection <- function(object, open = "r", local = FALSE) {
     if (local) env <- parent.frame()
     else env <- .GlobalEnv
-    .Internal(textConnection(deparse(substitute(object)), object, open, env))
+    .Internal(loopConnection(deparse(substitute(object)), object, open, env))
 }
 
 seek <- function(con, ...)



More information about the R-devel mailing list