[R-sig-hpc] Rmpi and ncdf4
clement
clement.tisseuil at gmail.com
Fri Sep 10 18:12:42 CEST 2010
Dear members,
I have started to adapt the "task pull" approach with Rmpi to
parallelize my calculations on a NetCDF file of dimension lat[1:6],
lon[1:6] and time[1:11323]. The basic idea is to parallelize the
calculation by splitting the process according to each latitudinal
coordinate i.e. for lat=1, lat=2, ..., lat=6. Please, see the code below
and the link to the NetCDF file if you want to try :
https://www.transferbigfiles.com/e4d7acd2-c186-4867-b4bf-82fe5aee4023?rid=jIcbZN%2blDcQz3kMeEHCQog%3d%3d
In the given example, the function works perfectly because the NetCDF
file is quite small (~2 MB). However, some difficulties occur when I
try the function with larger NetCDF files (~8 GB), covering the entire
globe, of dimensions lat[1:576], lon[1:1152] and time[1:11323]. The
problem is that each processor (slave) needs to "connect" the NetCDF
file before extracting the data required according to the latitudinal
coordinate of interest. This connection takes more than 2 minutes.
I wondered if there was a possibility to avoid this problem of "time
connection" when a process is sent to a new slave. The best solution
would be to open the connection to the NetCDF file in the master
process, then to "send this connection" to the slaves. I do not know how
it works exactly, but maybe there is something to check around the ncdf4
package (which is based on the NetCDF4 library) to allow multiple
connections to a single file... If you have ideas, I am open to suggestions!
Thanks in advance.
## Job directory and log file setup.
##path_job <- "/scratch/ctisseuil/job/Test_DWS_Rmpi"
path_job <- "/media/partage/Test_DWS_Rmpi"
## Log file collecting progress messages from the master process.
file.log <- file.path(path_job, "Test_DWS_Rmpi.res")
## unlink() is portable and avoids shelling out to "rm -rf".
unlink(file.log, recursive = TRUE, force = TRUE)
write("file.log was initialized", file = file.log, append = TRUE)
## Path to the input NetCDF file.
ObsRp <- file.path(path_job, "ObsRp.nc")
## Load libraries
library(Rmpi)   # MPI bindings for R (master/slave task-pull pattern)
library(ncdf4)  # NetCDF-4 file access
write("Libraries are loaded",file=file.log,append=T)
## Spawn one R slave process per available MPI slot.
mpi.spawn.Rslaves()
write("Available slaves are spawn",file=file.log,append=T)
## Check the number of slaves: the task-pull pattern needs at least one
## slave in addition to the master (comm size >= 2). Retry the spawn once
## before giving up and quitting.
if (mpi.comm.size() < 2) {
  mpi.close.Rslaves()
  mpi.spawn.Rslaves()
  if (mpi.comm.size() < 2) {
    write("More slave processes are required.", file = file.log, append = TRUE)
    mpi.quit()
  }
}
## Exit hook: ensure MPI is shut down cleanly if the user quits R
## without closing the slaves first. The wrapped string literal from the
## original (which embedded a newline in the log message) is repaired.
.Last <- function() {
  if (is.loaded("mpi_initialize")) {
    if (mpi.comm.size(1) > 0) {
      ## Slaves are still attached to communicator 1: close them first.
      write("Please use mpi.close.Rslaves() to close slaves.",
            file = file.log, append = TRUE)
      mpi.close.Rslaves()
    }
    write("Please use mpi.quit() to quit R", file = file.log, append = TRUE)
    .Call("mpi_finalize")
  }
}
write("The number of slaves was checked", file = file.log, append = TRUE)
## Slave worker: repeatedly request a task from the master (rank 0),
## extract one latitude band from the NetCDF file, and report a status
## message back, until the master signals that all tasks are done.
## Sent-message tags:     1 = ready_for_task, 2 = done_task, 3 = exiting
## Received-message tags: 1 = task, 2 = done_tasks
slavefunction <- function() {
  junk <- 0
  done <- 0
  while (done != 1) {
    ## Signal being ready to receive a new task
    mpi.send.Robj(junk, 0, 1)
    ## Receive a task
    task <- mpi.recv.Robj(mpi.any.source(), mpi.any.tag())
    task_info <- mpi.get.sourcetag()
    tag <- task_info[2]
    if (tag == 1) {
      ## Build the hyperslab for a single latitude band. `start`, `count`
      ## and `file.nc` are broadcast to the slaves by the master.
      lat.sel <- task$lat.sel
      start.sel <- start
      count.sel <- count
      start.sel["lat"] <- lat.sel
      count.sel["lat"] <- 1
      ## NOTE(review): `var.name` is never defined or broadcast anywhere
      ## in this script -- confirm it is set before slavefunction() runs.
      data <- ncvar_get(file.nc, varid = var.name,
                        start = start.sel, count = count.sel)
      ## data <- get.var.ncdf(file.nc, varid = var.name,
      ##                      start = start.lat, count = count.lat)
      file.save <- paste(file.log, lat.sel, "res", sep = ".")
      ## unlink() is portable and avoids shelling out to "rm -rf".
      unlink(file.save, recursive = TRUE, force = TRUE)
      ## write(lat.sel, file = file.save)
      results <- list(lat.sel = lat.sel, result = "done")
      mpi.send.Robj(results, 0, 2)
      write(paste("Process done for", lat.sel, sep = " : "),
            file = file.log, append = TRUE)
    } else if (tag == 2) {
      done <- 1
    }
    ## We'll just ignore any unknown messages
  }
  ## Tell the master this slave is exiting.
  mpi.send.Robj(junk, 0, 3)
}
write("Slave function created", file = file.log, append = TRUE)
## Record spatial characteristics of the NetCDF file.
## Open read-only on the master; each slave opens its own connection later.
file.nc <- nc_open(ObsRp, write = FALSE, readunlim = TRUE)
lat <- file.nc$dim$lat$vals
lat.n <- length(lat)
lon.n <- length(file.nc$dim$lon$vals)
tim.n <- length(file.nc$dim$time$vals)
lat.id <- seq_len(lat.n)
## Default hyperslab: full lon/time extent over all latitudes. Each task
## overrides the "lat" entries to read a single latitude band.
start <- rep(1, 3)
names(start) <- c("lon", "lat", "time")
count <- start
count["lon"] <- lon.n
count["time"] <- tim.n
count["lat"] <- lat.n
write("Record spatial characteristics", file = file.log, append = TRUE)
## Create the vector of per-latitude completion statuses.
status <- character(lat.n)
names(status) <- lat.id
write("Create the table to store results", file = file.log, append = TRUE)
## Call the function in all the slaves to get them ready to
## undertake tasks. Send the data and function to the slaves.
write("Send functions and data to slaves", file = file.log, append = TRUE)
mpi.bcast.Robj2slave(file.log)
mpi.bcast.Robj2slave(ObsRp)
## NOTE(review): an open ncdf4 handle wraps external state and is unlikely
## to survive serialization -- each slave reopens the file itself below.
mpi.bcast.Robj2slave(file.nc)
mpi.bcast.Robj2slave(count)
mpi.bcast.Robj2slave(start)
mpi.bcast.Robj2slave(slavefunction)
mpi.bcast.cmd(library(ncdf4))
mpi.bcast.cmd(library(udunits))
mpi.bcast.cmd(library(CDFt))
## Each slave opens its own connection to the NetCDF file.
## (Original French comment: "C'est la bonne !!!" -- "this is the right one".)
mpi.bcast.cmd(file.nc <- nc_open(ObsRp, write = TRUE, readunlim = TRUE))
mpi.bcast.cmd(slavefunction())
## write("Call functions in all slaves", file = file.log, append = TRUE)
## Create task list: one task per latitude index. seq_along() is safe
## when lat.id is empty (1:length(x) would yield c(1, 0)), and the list
## is preallocated instead of grown element by element.
tasks <- vector("list", length(lat.id))
for (i in seq_along(lat.id)) {
  tasks[[i]] <- list(lat.sel = i)
}
## Master dispatch loop (task-pull pattern): hand out one latitude task
## per ready slave until the task list is empty, then tell each slave to
## exit; stop once every slave has confirmed shutdown (tag 3).
junk=0
closed_slaves=0
n_slaves=mpi.comm.size()-1
while (closed_slaves < n_slaves) {
## Receive a message from a slave
message=mpi.recv.Robj(mpi.any.source(),mpi.any.tag())
message_info=mpi.get.sourcetag()
slave_id=message_info[1]
tag=message_info[2]
if (tag == 1) {
## slave is ready for a task. Give it the next task, or tell it tasks
## are done if there are none.
if (length(tasks) > 0) {
## Send a task, and then remove it from the task list
mpi.send.Robj(tasks[[1]], slave_id, 1)
tasks[[1]]=NULL
} else {
## No tasks left: tag 2 tells the slave to stop its work loop.
mpi.send.Robj(junk, slave_id, 2)
}
} else if (tag == 2) {
## The message contains results. Do something with the results.
## Store them in the data structure
lat.sel=message$lat.sel
status[lat.sel]=message$result
} else if (tag == 3) {
## A slave has closed down.
closed_slaves=closed_slaves + 1
}
}
## Shut down: close the NetCDF connection on every slave, then the slaves.
mpi.bcast.cmd(nc_close(file.nc))
write("Close NetCDF connections in all slaves", file = file.log, append = TRUE)
mpi.close.Rslaves()
write("Close slaves", file = file.log, append = TRUE)
On 9/8/2010 9:11 AM, clement wrote:
> Dear List Members,
>
> I am using the ncdf4 package to work on General Circulation Model
> (GCM) data (NetCDF file format) and I would like to parallelize some
> calculations using Rmpi. Does anyone have any experience or advice on
> using Rmpi and ncdf4 packages?
>
--
Clément Tisseuil
More information about the R-sig-hpc
mailing list