[R-sig-hpc] Rmpi and ncdf4

clement clement.tisseuil at gmail.com
Fri Sep 10 18:12:42 CEST 2010


Dear members,

I have started to adapt the "task pull" approach with Rmpi to 
parallelize my calculations on a NetCDF file of dimensions lat[1:6], 
lon[1:6] and time[1:11323]. The basic idea is to parallelize the 
calculation by splitting the work by latitudinal coordinate, i.e. for 
lat=1, lat=2, ..., lat=6. Please see the code below and the link to 
the NetCDF file if you want to try it: 
https://www.transferbigfiles.com/e4d7acd2-c186-4867-b4bf-82fe5aee4023?rid=jIcbZN%2blDcQz3kMeEHCQog%3d%3d

In the given example, the function works perfectly because the NetCDF 
file is quite small (~2 MB). However, difficulties occur when I try 
the function on larger NetCDF files (~8 GB) covering the entire globe, 
of dimensions lat[1:576], lon[1:1152] and time[1:11323]. The problem 
is that each processor (slave) needs to "connect" to the NetCDF file 
before extracting the data for the latitudinal coordinate of interest, 
and this connection alone takes more than 2 minutes.

I wondered whether there is a way to avoid this connection overhead 
when a task is sent to a new slave. The best solution would be to open 
the connection to the NetCDF file in the master process and then "send 
this connection" to the slaves. I do not know exactly how this works, 
but maybe there is something in the ncdf4 package (which is based on 
the NetCDF4 library) that allows multiple connections to a single 
file... If you have ideas, I am open to them!
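One lead I am looking at is the readunlim argument of nc_open(): 
opening with readunlim=FALSE skips reading the coordinate values of 
the unlimited (time) dimension, which may be exactly what makes the 
open so slow with time[1:11323]. A minimal sketch (untested on the 
8 GB file):

file.nc=nc_open(ObsRp,write=F,readunlim=F) ## skip the time coordinate values at open
tim.vals=ncvar_get(file.nc,"time") ## read them explicitly only when needed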

Thanks in advance.

##path_job="/scratch/ctisseuil/job/Test_DWS_Rmpi"
path_job="/media/partage/Test_DWS_Rmpi"
file.log=paste(path_job,"Test_DWS_Rmpi.res",sep="/")
unlink(file.log) ## remove any previous log file
write("file.log was initialized",file=file.log,append=T)
ObsRp=paste(path_job,"ObsRp.nc",sep="/")

## Load libraries
library(Rmpi)
library(ncdf4)
write("Libraries are loaded",file=file.log,append=T)

mpi.spawn.Rslaves()
write("Available slaves are spawn",file=file.log,append=T)

## Check the number of slaves (mpi.comm.size() counts the master too, so < 2 means no slaves)
if (mpi.comm.size() < 2) {
   mpi.close.Rslaves(); mpi.spawn.Rslaves()
   if (mpi.comm.size() < 2) {
     write("More slave processes are required.",file=file.log,append=T)
     mpi.quit()
   }
}
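## .Last runs when R exits; it makes sure slaves are closed and MPI is finalized cleanly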
.Last= function(){
   if (is.loaded("mpi_initialize")){
     if (mpi.comm.size(1) > 0){
       write("Please use mpi.close.Rslaves() to close 
slaves.",file=file.log,append=T)
       mpi.close.Rslaves()
     }
     write("Please use mpi.quit() to quit R",file=file.log,append=T)
     .Call("mpi_finalize")
   }
}
write("The number of slaves was checked",file=file.log,append=T)

slavefunction=function() {
   ## Note the use of the tag for sent messages:
   ##     1=ready_for_task, 2=done_task, 3=exiting
   ## Note the use of the tag for received messages:
   ##     1=task, 2=done_tasks
   junk= 0
   done=0
   while (done != 1) {
     ## Signal being ready to receive a new task
     mpi.send.Robj(junk,0,1)
     ## Receive a task
     task=mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
     task_info=mpi.get.sourcetag()
     tag=task_info[2]
     if (tag == 1) {
       lat.sel=task$lat.sel
       ## Extract the hyperslab for this latitude only
       start.sel=start
       count.sel=count
       start.sel["lat"]=lat.sel
       count.sel["lat"]=1
       data=ncvar_get(file.nc,varid=var.name,start=start.sel,count=count.sel)
       ##data=get.var.ncdf(file.nc,varid=var.name,start=start.sel,count=count.sel) ## old ncdf package equivalent
       file.save=paste(file.log,lat.sel,"res",sep=".")
       unlink(file.save) ## remove any previous result file
       ##write(lat.sel,file=file.save)
       results=list(lat.sel=lat.sel,result="done")
       mpi.send.Robj(results,0,2)
       write(paste("Process done for",lat.sel,sep=" : "),file=file.log,append=T)
     } else if (tag == 2) {
       done=1
     }
     ## We'll just ignore any unknown messages
   }
   mpi.send.Robj(junk,0,3)
}
write("Slave function created",file=file.log,append=T)

## Record spatial characteristics of the NetCDF file
file.nc=nc_open(ObsRp,write=F,readunlim=T)
var.name=names(file.nc$var)[1] ## variable to extract; assumed here to be the first variable in the file
lat=file.nc$dim$lat$vals
lat.n=length(lat)
lon.n=length(file.nc$dim$lon$vals)
tim.n=length(file.nc$dim$time$vals)
lat.id=c(1:lat.n)
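## Template start/count vectors for ncvar_get(); each slave overrides the
## "lat" entry to extract a single-latitude slab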
start=rep(1,3)
names(start)=c("lon","lat","time")
count=start
count["lon"]=lon.n
count["time"]=tim.n
count["lat"]=lat.n
write("Record spatial characteristics",file=file.log,append=T)

## Create the vector of per-latitude status flags
status=character(lat.n)
names(status)=lat.id
write("Create the table to store results",file=file.log,append=T)

## Call the function in all the slaves to get them ready to
## undertake tasks
## Now, send the data and function to the slaves

write("Send functions and data to slaves",file=file.log,append=T)
mpi.bcast.Robj2slave(file.log)
mpi.bcast.Robj2slave(ObsRp)
## NB: this broadcasts a serialized copy of the R object describing the file,
## not a live file handle; each slave still has to re-open the file itself below
mpi.bcast.Robj2slave(file.nc)
mpi.bcast.Robj2slave(var.name)
mpi.bcast.Robj2slave(count)
mpi.bcast.Robj2slave(start)
mpi.bcast.Robj2slave(slavefunction)

mpi.bcast.cmd(library(ncdf4))
mpi.bcast.cmd(library(udunits))
mpi.bcast.cmd(library(CDFt))
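## Each slave pays the nc_open cost once here, not once per task. Since the
## slaves only read, write=F would be safer (no concurrent-write hazard), and
## readunlim=F (see the note above) may cut the open time considerably.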
mpi.bcast.cmd(file.nc <- nc_open(ObsRp,write=T,readunlim=T)) ## This is the right one!!!
mpi.bcast.cmd(slavefunction())
##write("Call functions in all slaves",file=file.log,append=T)

## Create task list
tasks= vector('list')
for (i in 1:length(lat.id)) {
   tasks[[i]]=list(lat.sel=i)
}

junk=0
closed_slaves=0
n_slaves=mpi.comm.size()-1

while (closed_slaves < n_slaves) {
   ## Receive a message from a slave
   message=mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
   message_info=mpi.get.sourcetag()
   slave_id=message_info[1]
   tag=message_info[2]
   if (tag == 1) {
     ## slave is ready for a task. Give it the next task, or tell it tasks
     ## are done if there are none.
     if (length(tasks) > 0) {
       ## Send a task, and then remove it from the task list
       mpi.send.Robj(tasks[[1]], slave_id, 1)
       tasks[[1]]=NULL
     } else {
       mpi.send.Robj(junk, slave_id, 2)
     }
   } else if (tag == 2) {
     ## The message contains results. Do something with the results.
     ## Store them in the data structure
     lat.sel=message$lat.sel
     status[lat.sel]=message$result

   } else if (tag == 3) {
     ## A slave has closed down.
     closed_slaves=closed_slaves + 1
   }
}
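
## Sketch: record the collected per-latitude statuses in the log
## (the status vector is not otherwise saved)
write(paste("lat",names(status),":",status),file=file.log,append=T)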

mpi.bcast.cmd(nc_close(file.nc))
write("Close NetCDF connections in all slaves",file=file.log,append=T)
mpi.close.Rslaves()
write("Close slaves",file=file.log,append=T)


On 9/8/2010 9:11 AM, clement wrote:
> Dear List Members,
>
> I am using the ncdf4 package to work on General Circulation Model 
> (GCM) data (NetCDF file format) and I would like to parallelize some 
> calculations using Rmpi. Does anyone have experience or advice in 
> using the Rmpi and ncdf4 packages?
>

-- 
Clément Tisseuil


