[Bioc-devel] Parallel processing of reads in a single fastq file
Ryan
rct at thompsonclan.org
Wed Aug 6 21:18:31 CEST 2014
Hi Jeff,
See my replies below inline.
On 8/6/14, 7:16 AM, Johnston, Jeffrey wrote:
> Hi,
>
> I have been using FastqStreamer() and yield() to process a large fastq file in chunks, modifying both the read and name and then appending the output to a new fastq file as each chunk is processed. This works great, but would benefit greatly from being parallelized.
>
> As far as I can tell, this problem isn’t easily solved with the existing parallel tools because you can’t determine how many jobs you’ll need in advance (you just call yield() until it stops returning reads).
That's right, we don't currently have anything in R like e.g. Python's
multiprocessing.Pool:
https://docs.python.org/2/library/multiprocessing.html#using-a-pool-of-workers
Although we wouldn't want something exactly like that, because Python's
implementation exhausts the input stream as fast as possible and buffers
all the results in memory if they are not consumed quickly enough, as
described here:
http://stackoverflow.com/questions/5318936/python-multiprocessing-pool-lazy-iteration
I think it would be ideal to have a function that takes an input stream,
a BPPARAM, and a maximum number of chunks to buffer, and returns another
input stream of the results.
>
> After some digging, I found the sclapply() function in the HTSeqGenie package by Gregoire Pau, which he describes as a “multicore dispatcher”:
>
> https://stat.ethz.ch/pipermail/bioc-devel/2013-October/004754.html
>
> I wasn’t able to get the package to install from source due to some dependencies (there are no binaries for Mac), but I did extract the function and adapt it slightly for my use case. Here’s an example:
>
> processChunk <- function(fq_chunk) {
> # manipulate fastq reads here
> }
>
> yieldHelper <- function() {
> fq <- yield(fqstream)
> if(length(fq) == 0) return(NULL)
> fq
> }
>
> fqstream <- FastqStreamer(“…”, n=1e6)
> sclapply(yieldHelper, processChunk, max.parallel.jobs=4)
> close(fqstream)
>
> Based on the discussion linked above, it seems like there was some interest in integrating this idea into BiocParallel. I would find that very useful as it improves performance quite a bit and can likely be applied to numerous stream-based processing tasks.
>
> I will point out that in my case above, the processChunk() function doesn’t return anything. Instead it appends the modified fastq records to a new file. I have to use the Unix lockfile command to ensure that only one child process appends to the output file at a time. I am not certain if there is a more elegant solution to this (perhaps a queue that is emptied by a dedicated writer process?).
This is the problem, I think. A general solution would allow you to
stream the processed results back into R, where you could pass them into
another stream filter, or finally consume them. That was the idea behind
the example code that I demonstrated, but my code worked a little
differently, in that the task of *reading* the fastq file was delegated
to a subprocess. So my solution also doesn't generalize to multi-step
parallel pipelines.
>
> Thanks,
> Jeff
>
-Ryan
More information about the Bioc-devel
mailing list