[R-sig-hpc] apparent failure of mpi.isend.Robj (Rmpi)

Ross Boylan ross at biostat.ucsf.edu
Thu Mar 6 23:15:37 CET 2014

I have a distributed application that includes 2 simulators and 1
assembler.  The simulators send results to the assembler.

Except that they don't.  Communication between these and other
components mostly works, but not to the assembler (messages from master
also don't reach the assembler).  Thinking the assembler had some
problem, I replaced it with a stub that simply prints a message when it
gets a message.

Standard scenario: no messages received by assembler.

Scenario 2: I replaced the send function in one simulator with a
defective function (that caused the process to fail when it tried to
send).  In this case, messages from the other simulator were received.
(I didn't mean to have a broken function).

Scenario 3: I captured the objects being sent (that was what the
replacement send function was supposed to do) and then tried to send
them manually.  First I used mpi.send; that worked.  Then I used
mpi.isend; that failed.  Then I did mpi.send and 2 messages were received
by the assembler (because of some double echoing it's hard to be sure
there really were 2, but I think so.)  So it appeared as if mpi.send
unstuck the previous message.

Thinking there might be buffering going on, I did 20 isends.  Nothing
was received.  Then I did a send, and I never got the command prompt
back.  The objects being sent are a bit involved (lists of lists of
reference class instances), but small (the entire log capture of sends
was only about 70k on disk).

I suspected I might have overwritten the standard mpi.isend.Robj, but
can find no evidence of that (it's not in the global namespace, when I
print it the display says in namespace:Rmpi, and no active code
reassigns it).
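
A small base-R sketch of this kind of masking check follows. The helper
name where_defined is hypothetical, and sd from the stats package stands
in for mpi.isend.Robj so that the snippet runs without MPI; it only
illustrates the check and is not part of the application.

```r
# Hypothetical helper: report where a function name resolves from and
# whether a copy in the global environment could be shadowing the
# package version.
where_defined <- function(name) {
  fn <- get(name, mode = "function")
  list(
    # name of the environment the function was defined in (e.g. a namespace)
    resolves_in = environmentName(environment(fn)),
    # TRUE if an object of that name sits directly in the global environment
    in_global = exists(name, envir = globalenv(), inherits = FALSE),
    # every place on the search path that provides a function of that name
    found_in = utils::find(name, mode = "function")
  )
}

# sd is a stand-in (assumption) for mpi.isend.Robj
info <- where_defined("sd")
print(info)
```

If a stray copy existed in the workspace, in_global would be TRUE and
found_in would list .GlobalEnv ahead of the package.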

I have also tried to isolate components outside of the distributed
system, but they behave properly when I do so.

At this point I think I'll try building a current mpi library and see if
that helps.

If anyone has any thoughts, I'm all ears.

R 3.0.2, Debian squeeze, openmpi-bin 1.4.2-4 (I think that's the
standard one with squeeze), Rmpi 0.6.3 built in my personal directory.

# r is the object the simulator was trying to send
> mpi.send.Robj(r, 1, 4)
# ESS is echoing every command
mpi.send.Robj(r, 1, 4)
# The "Fake Assembler" message is emitted by the fake assembler
# (at rank 1) when it receives a message.  It prints sender, tag, class of object
> Fake Assembler: 0 4 list
mpi.isend.Robj(r, 1, 4)
mpi.isend.Robj(r, 1, 4)
# nothing happened but I thought that might have been because
# cursor was not on the command line.  So I tried again.
> mpi.isend.Robj(r, 1, 4)
mpi.isend.Robj(r, 1, 4)
# still no sign message received
# try to verify I'm using the real mpi.isend.Robj
> mpi.isend.Robj
function (obj, dest, tag, comm = 1, request = 0)
mpi.isend(x = serialize(obj, NULL), type = 4, dest = dest, tag = tag,
    comm = comm, request = request)
<environment: namespace:Rmpi>
# now back to send
> mpi.send.Robj(r, 1, 4)
mpi.send.Robj(r, 1, 4)
Fake Assembler: 0 4 list
> Fake Assembler: 0 4 list
Fake Assembler: 0 4 list

# apparently 2 or 3 messages received
# try to fill buffer and force transmission
> for (i in 1:20) mpi.isend.Robj(r, 1, 4)
for (i in 1:20) mpi.isend.Robj(r, 1, 4)
# no joy.  Try send to flush it out
> mpi.send.Robj(r, 1, 4)
mpi.send.Robj(r, 1, 4)
# never came back
  C-c C-c
orterun: killing job...

The child processes are called by distributing the following function to
them and then invoking it:

stemcell is the startup function to initialize the child processes.
Though it's far from self-contained, it might be illuminating.  In
particular, there are some games with the sending and receiving
functions the simulators use.  My guess/hope is that when the stemcell
function is exported the "mpi.isend.Robj" becomes a reference to the
function in the Rmpi namespace on the remote machine.
# to the appropriate type
# rSim <integer> ranks of the simulator processes
# rCoef <list>
#    rCoef[[m]] is <integer> ranks of coefficient servers for model m <integer>
# rAssembler <integer> rank of Assembler (singleton) (the one not getting messages)
# iParams indices of parameter sets to analyze
stemcell <- function(rSim, rCoef, rAssembler, iParams) {
    nSet <- length(iParams)
    nChunk <- nSet/dbox.option$chunk
    r <- mpi.comm.rank()
    #if (! r %in% unlist(rCoef)) return(0)  # hack for testing
    if (r %in% rSim) {
        # temporary for debugging
        recvfn <- mpi.recv.Robj
        sendfn <- mpi.isend.Robj
        if (r == 3L) {
            recvfn <- makeLoggingReceive()
            sendfn <- makeLoggingSend()
        }
        db <- dbox.sim(rCoef, rAssembler,
                       list(b=sim.b5.3.gen3, c=sim.c1.gen3, d=sim.d1.gen3),
                       maxExpectedLog = ceiling((2*nChunk+3.2*nSet)/length(rSim) + 15),
    } else if (r %in% rAssembler) {
        # assume each result is communicated separately
        recvfn <- makeLoggingReceive("recv-assembler.rdata")
        db <- fake.assembler("/local/ross/KHC/sunbelt/testsim", ceiling(2.5*nSet),
    } else {
        nlog <- function(servers) {
            # return estimated log size for each coefficient server
            # current design asks for each coefficient set separately
            # since servers are allocated randomly to simulators
            # we allow (via length(servers)-2) for some unevenness
            ceiling(2.2*nSet/max(length(servers)-2, 1) + 3)
        }
        if (r %in% rCoef[[1]])
            # args are class source file, data file, coefficients
            # we are now calling this b1 because that is the outcome variable
            # but the files are called a1
            db <- dbox.coef("simulator.b5.3.R", "s.b5.3", nlog(rCoef[[1]]))
        else if (r %in% rCoef[[2]])
            db <- dbox.coef("simulator.multinomial.R", "s.multi", nlog(rCoef[[2]]))
        else if (r %in% rCoef[[3]])
            db <- dbox.coef("simulator.d1.R", "s.d1", nlog(rCoef[[3]]))
        else
            stop(paste("Rank", r, "has no assigned task"))
    }
}
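
The guess above about "mpi.isend.Robj" becoming a reference on the remote
side can be illustrated with plain serialize()/unserialize(), under the
assumption that the function is shipped to the workers in serialized
form.  The sketch uses sd from the stats package as a stand-in for
mpi.isend.Robj so that it runs without MPI; caller and sendfn_standin
are made-up names.

```r
# sd is a stand-in (assumption) for a function living in a package namespace
fn <- stats::sd
bytes <- serialize(fn, NULL)    # what would travel over the wire
fn2 <- unserialize(bytes)       # what the receiving process rebuilds

# The namespace is stored by reference, so the rebuilt function is
# attached to the receiving session's own copy of that namespace.
restored_env <- environment(fn2)
same_namespace <- identical(restored_env, asNamespace("stats"))

# Inside a function body, a name such as the right-hand side of
# sendfn <- mpi.isend.Robj is kept only as a symbol; it is looked up
# when the function runs, in whichever process runs it.
caller <- function(x) sendfn_standin(x)   # sendfn_standin is never defined here
body_symbols <- all.names(body(caller))

print(same_namespace)
print(body_symbols)
```

On this reading, a symbol inside stemcell would be resolved on each worker
against that worker's loaded Rmpi, rather than being copied from the
master.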

When dbox.master initializes it does
  mpi.bcast.cmd(stemcell, rSim=mySim, rCoef=myCoef, rAssembler=myAssembler, iParams=myParams)
Ross Boylan