[R-sig-hpc] Rmpi and ncdf4

Kasper Daniel Hansen kasperdanielhansen at gmail.com
Fri Sep 10 18:57:18 CEST 2010


I suggest you start by looking through the netcdf documentation and
ask about this on their mailing lists.  It seems to me that the
problem is one of multiple processes accessing the same file
simultaneously.  I would guess that the netcdf people know whether it
is possible or not, and also what the interface should be (my guess is
that there is an interface).  The next step is then to figure out
whether ncdf4 implements this interface.

Kasper

On Fri, Sep 10, 2010 at 12:12 PM, clement <clement.tisseuil at gmail.com> wrote:
> Dear members,
>
> I have started to adapt the "task pull" approach with Rmpi to parallelize my
> calculations on a NetCDF file of dimension lat[1:6], lon[1:6] and
> time[1:11323]. The basic idea is to parallelize the calculation by splitting
> the process according to each latitudinal coordinate i.e. for lat=1, lat=2,
> ..., lat=6. Please, see the code below and the link to the NetCDF file if
> you want to try :
> https://www.transferbigfiles.com/e4d7acd2-c186-4867-b4bf-82fe5aee4023?rid=jIcbZN%2blDcQz3kMeEHCQog%3d%3d
>
> In the given example, the function works perfectly because the NetCDF file
> is quite small (~ 2 Mo). However, some difficulties occur when I try the
> function with larger NetCDF files (~ 8 Go), covering the entire globe, of
> dimensions lat[1:576], lon[1:1152] and time[1:11323]. The problem is that
> each processor (slave) needs to "connect" the NetCDF file before extracting
> the data required according to the latitudinal coordinate of interest. This
> connection takes more than 2 minutes.
>
> I wondered if there was a possibility to avoid this problem of "time
> connection"  when a process is sent to a new slave. The best solution would
> be to open the connection to the NetCDF file in the master process, then to
> "send this connection" to the slaves. I do not know how it works exactly,
> but maybe there is something to check around the ncdf4 package (which is
> based on the NetCDF4 library) to allow multiple connections to a single
> file... If you have ideas, I am opened !
>
> Thanks in advance.
>
> ##path_job="/scratch/ctisseuil/job/Test_DWS_Rmpi"
> path_job="/media/partage/Test_DWS_Rmpi"
> file.log=paste(path_job,"Test_DWS_Rmpi.res",sep="/")
> system(paste("rm -rf",file.log, sep=" "))
> write("file.log was initialized",file=file.log,append=T)
> ObsRp=paste(path_job,"ObsRp.nc",sep="/")
>
> ## Load libraries
> library(Rmpi)
> library(ncdf4)
> write("Libraries are loaded",file=file.log,append=T)
>
> mpi.spawn.Rslaves()
> write("Available slaves are spawn",file=file.log,append=T)
>
> ## Check the number of slaves
> if (mpi.comm.size() < 2) {
>  mpi.close.Rslaves(); mpi.spawn.Rslaves()
>  if (mpi.comm.size() < 2) {
>    write("More slave processes are required.",file=file.log,append=T)
>    mpi.quit()
>  }
> }
> .Last= function(){
>  if (is.loaded("mpi_initialize")){
>    if (mpi.comm.size(1) > 0){
>      write("Please use mpi.close.Rslaves() to close
> slaves.",file=file.log,append=T)
>      mpi.close.Rslaves()
>    }
>    write("Please use mpi.quit() to quit R",file=file.log,append=T)
>    .Call("mpi_finalize")
>  }
> }
> write("The number of slaves was checked",file=file.log,append=T)
>
> slavefunction=function() {
>  ## Note the use of the tag for sent messages:
>  ##     1=ready_for_task, 2=done_task, 3=exiting
>  ## Note the use of the tag for received messages:
>  ##     1=task, 2=done_tasks
>  junk= 0
>  done=0
>  while (done != 1) {
>    ## Signal being ready to receive a new task
>    mpi.send.Robj(junk,0,1)
>    ## Receive a task
>    task=mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
>    task_info=mpi.get.sourcetag()
>    tag=task_info[2]
>    if (tag == 1) {
>      lat.sel=task$lat.sel
>      start.sel=start
>      count.sel=count
>      start.sel["lat"]=lat.sel
>      count.sel["lat"]=1
>      data=ncvar_get(file.nc,varid=var.name,start=start.sel, count=count.sel)
>      ##data=get.var.ncdf(file.nc,varid=var.name,start=start.lat,
> count=count.lat)
>      file.save=paste(file.log,lat.sel,"res",sep=".")
>      system(paste("rm -rf",file.save,sep=" "))
>      ##write(lat.sel,file=file.save)
>      results=list(lat.sel=lat.sel,result="done")
>      mpi.send.Robj(results,0,2)
>      write(paste("Process done for",lat.sel,sep=" :
> "),file=file.log,append=T)
>    } else if (tag == 2) {
>      done=1
>    }
>    ## We'll just ignore any unknown messages
>  }
>  mpi.send.Robj(junk,0,3)
> }
> write("Slave function created",file=file.log,append=T)
>
> ## Record spatial characteristics of the NetCDF file
> file.nc=nc_open(ObsRp,write=F,readunlim=T)
> lat=file.nc$dim$lat$vals
> lat.n=length(lat)
> lon.n=length(file.nc$dim$lon$vals)
> tim.n=length(file.nc$dim$time$vals)
> lat.id=c(1:lat.n)
> start=rep(1,3)
> names(start)=c("lon","lat","time")
> count=start
> count["lon"]=lon.n
> count["time"]=tim.n
> count["lat"]=lat.n
> write("Record spatial characteristics",file=file.log,append=T)
>
> ## Create the vector of
> status=character(lat.n)
> names(status)=lat.id
> write("Create the table to store results",file=file.log,append=T)
>
> ## Call the function in all the slaves to get them ready to
> ## undertake tasks
> ## Now, send the data and function to the slaves
>
> write("Send functions and data to slaves",file=file.log,append=T)
> mpi.bcast.Robj2slave(file.log)
> mpi.bcast.Robj2slave(ObsRp)
> mpi.bcast.Robj2slave(file.nc)
> mpi.bcast.Robj2slave(count)
> mpi.bcast.Robj2slave(start)
> mpi.bcast.Robj2slave(slavefunction)
>
> mpi.bcast.cmd(library(ncdf4))
> mpi.bcast.cmd(library(udunits))
> mpi.bcast.cmd(library(CDFt))
> mpi.bcast.cmd(file.nc <- nc_open(ObsRp,write=T,readunlim=T)) ## C'est la
> bonne !!!
> mpi.bcast.cmd(slavefunction())
> ##write("Call functions in all slaves",file=file.log,append=T)
>
> ## Create task list
> tasks= vector('list')
> for (i in 1:length(lat.id)) {
>  tasks[[i]]=list(lat.sel=i)
> }
>
> junk=0
> closed_slaves=0
> n_slaves=mpi.comm.size()-1
>
> while (closed_slaves < n_slaves) {
>  ## Receive a message from a slave
>  message=mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
>  message_info=mpi.get.sourcetag()
>  slave_id=message_info[1]
>  tag=message_info[2]
>  if (tag == 1) {
>    ## slave is ready for a task. Give it the next task, or tell it tasks
>    ## are done if there are none.
>    if (length(tasks) > 0) {
>      ## Send a task, and then remove it from the task list
>      mpi.send.Robj(tasks[[1]], slave_id, 1)
>      tasks[[1]]=NULL
>    } else {
>      mpi.send.Robj(junk, slave_id, 2)
>    }
>  } else if (tag == 2) {
>    ## The message contains results. Do something with the results.
>    ## Store them in the data structure
>    lat.sel=message$lat.sel
>    status[lat.sel]=message$result
>
>  } else if (tag == 3) {
>    ## A slave has closed down.
>    closed_slaves=closed_slaves + 1
>  }
> }
>
> mpi.bcast.cmd(nc_close(file.nc))
> write("Close NetCDF connections in all slaves",file=file.log,append=T)
> mpi.close.Rslaves()
> write("Close slaves",file=file.log,append=T)
>
>
> On 9/8/2010 9:11 AM, clement wrote:
>>
>> Dear List Members,
>>
>> I am using the ncdf4 package to work on General Circulation Model (GCM)
>> data (NetCDF file format) and I would like to parallelize some calculations
>> using Rmpi. Does anyone have an experience or advices in using Rmpi and
>> ncdf4 packages?
>>
>
> --
> Clément Tisseuil
>
> _______________________________________________
> R-sig-hpc mailing list
> R-sig-hpc at r-project.org
> https://stat.ethz.ch/mailman/listinfo/r-sig-hpc
>



More information about the R-sig-hpc mailing list