Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Thread pool #138

Open
wants to merge 8 commits into
base: develop
Choose a base branch
from
66 changes: 63 additions & 3 deletions source/dhtslib/file/file.d
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import std.array : array;
import dhtslib.memory;
import dhtslib.file.iterator;
import dhtslib : initKstring;
import dhtslib.threadpool;
import htslib;

enum HtslibFileFormatMode
Expand Down Expand Up @@ -108,6 +109,8 @@ struct HtslibFile
/// if it has been loaded
off_t headerOffset = -1;

ThreadPool pool;

/// allow HtslibFile to be used as
/// underlying ptr type
alias getFilePtr this;
Expand Down Expand Up @@ -172,10 +175,19 @@ struct HtslibFile
return newFile;
}

/// set extra multithreading
void setExtraThreads(int extra)
/// sets number of threads via global thread pool
/// if number is -1 sets to number of CPUs
void setThreads(int threads)
{
enforceGlobalThreadPool(threads);
this.setThreadPool(globalPool);
}

/// set multithreading pool
void setThreadPool(ThreadPool pool)
{
hts_set_threads(this.fp, extra);
this.pool = pool;
hts_set_thread_pool(this.fp, &this.pool.htsPool);
}

/// get file offset
Expand Down Expand Up @@ -577,6 +589,7 @@ debug(dhtslib_unittest) unittest

auto f = HtslibFile("/tmp/htslibfile.test.sam");
f.loadHeader;
f.setThreads(4);
auto h = f.bamHdr;
auto read = f.readRecord!Bam1();
assert(fromStringz(bam_get_qname(read)) == "HS18_09653:4:1315:19857:61712");
Expand Down Expand Up @@ -613,32 +626,39 @@ debug(dhtslib_unittest) unittest
import std.path:buildPath,dirName;
auto fn = buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf.gz");
{
ThreadPool pool = ThreadPool(8);
auto f = HtslibFile(fn);
f.loadHeader;
f.setThreadPool(pool);
auto h = f.bcfHdr;
auto read = f.readRecord!Bcf1();

f = HtslibFile("/tmp/htslibfile.test.vcf", HtslibFileWriteMode.Vcf);
f.setThreadPool(pool);
f.setHeader(h);
f.writeHeader;
f.write(read);

f = HtslibFile("/tmp/htslibfile.test.bcf", HtslibFileWriteMode.Bcf);
f.setThreadPool(pool);
f.setHeader(h);
f.writeHeader;
f.write(read);

f = HtslibFile("/tmp/htslibfile.test.ubcf", HtslibFileWriteMode.UncompressedBcf);
f.setThreadPool(pool);
f.setHeader(h);
f.writeHeader;
f.write(read);

f = HtslibFile("/tmp/htslibfile.test.vcf.gz", HtslibFileWriteMode.GzippedVcf);
f.setThreadPool(pool);
f.setHeader(h);
f.writeHeader;
f.write(read);

f = HtslibFile("/tmp/htslibfile.test.vcf.bgz", HtslibFileWriteMode.BgzippedVcf);
f.setThreadPool(pool);
f.setHeader(h);
f.writeHeader;
f.write(read);
Expand Down Expand Up @@ -676,4 +696,44 @@ debug(dhtslib_unittest) unittest
read = f.readRecord!Bcf1();
assert(read.pos == 3000149);
}
}

debug(dhtslib_unittest) unittest
{
hts_log_info(__FUNCTION__, "Testing HtslibFile parallel processing");
import std.path:buildPath,dirName;
auto fn = buildPath(dirName(dirName(dirName(dirName(__FILE__)))),"htslib","test","tabix","vcf_file.vcf.gz");
{

ThreadPool pool = ThreadPool(8);
auto rdr = HtslibFile(fn);
rdr.setThreadPool(pool);
rdr.loadHeader;
auto h = rdr.bcfHdr;

auto wtr = HtslibFile("/tmp/htslibfile.test_parallel.vcf", HtslibFileWriteMode.Vcf);
wtr.setThreadPool(pool);
wtr.setHeader(h);
wtr.writeHeader;

foreach(x; rdr.byRecord!Bcf1().parallelMap!((x) { x.pos++; return x;})(&pool)) {
wtr.write(x);
}
}

{
auto f = HtslibFile(fn);
f.loadHeader;
auto h = f.bcfHdr;
auto read = f.readRecord!Bcf1();
assert(read.pos == 3000149);

auto f2 = HtslibFile("/tmp/htslibfile.test_parallel.vcf");
f2.loadHeader;
h = f2.bcfHdr;
auto read2 = f2.readRecord!Bcf1();
assert(read2.pos == 3000150);

}

}
2 changes: 1 addition & 1 deletion source/dhtslib/file/iterator.d
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ if(is(T == Bam1) || is(T == Bcf1) || is(T == Kstring))
/// If you keep the result around it should be duplicated
T front()
{
return rec;
return rec.dup;
charlesgregory marked this conversation as resolved.
Show resolved Hide resolved
}

/// popFront to move range forward
Expand Down
26 changes: 20 additions & 6 deletions source/dhtslib/memory.d
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ void hts_log_errorNoGC(const(char)[] ctx)( string msg) @trusted @nogc nothrow
/// pointer and reference counts it and then
/// destroys with destroyFun when it goes
/// truly out of scope
struct SafeHtslibPtr(T, alias destroyFun)
struct SafeHtslibPtr(T, alias destroyFun, alias dupFun = void)
if(!isPointer!T && isSomeFunction!destroyFun)
{
@safe @nogc nothrow:
Expand Down Expand Up @@ -77,6 +77,12 @@ if(!isPointer!T && isSomeFunction!destroyFun)
return ptr;
}

static if(isSomeFunction!dupFun) {
auto dup() @trusted return scope {
return typeof(this)(dupFun(this.ptr));
}
}

/// dtor that respects scope
~this() @trusted return scope
{
Expand Down Expand Up @@ -106,19 +112,19 @@ if(!isPointer!T && isSomeFunction!destroyFun)

/// reference counted bam1_t wrapper
/// can be used directly as a bam1_t *
alias Bam1 = SafeHtslibPtr!(bam1_t, bam_destroy1);
alias Bam1 = SafeHtslibPtr!(bam1_t, bam_destroy1, bam_dup1);

/// reference counted bam_hdr_t wrapper
/// can be used directly as a bam_hdr_t *
alias BamHdr = SafeHtslibPtr!(bam_hdr_t, bam_hdr_destroy);
alias BamHdr = SafeHtslibPtr!(bam_hdr_t, bam_hdr_destroy, bam_hdr_dup);

/// reference counted bcf1_t wrapper
/// can be used directly as a bcf1_t *
alias Bcf1 = SafeHtslibPtr!(bcf1_t, bcf_destroy);
alias Bcf1 = SafeHtslibPtr!(bcf1_t, bcf_destroy, bcf_dup);

/// reference counted bcf_hdr_t wrapper
/// can be used directly as a bcf_hdr_t *
alias BcfHdr = SafeHtslibPtr!(bcf_hdr_t, bcf_hdr_destroy);
alias BcfHdr = SafeHtslibPtr!(bcf_hdr_t, bcf_hdr_destroy, bcf_hdr_dup);

/// reference counted htsFile wrapper
/// can be used directly as a htsFile *
Expand Down Expand Up @@ -150,10 +156,18 @@ alias Faidx = SafeHtslibPtr!(faidx_t, fai_destroy);

/// reference counted Kstring wrapper
/// can be used directly as a kstring_t *
alias Kstring = SafeHtslibPtr!(kstring_t, ks_free);
alias Kstring = SafeHtslibPtr!(kstring_t, ks_free, ks_dup);

alias HtsItrMulti = HtsItr;

/// reference counted hts_tpool wrapper
/// can be used directly as a hts_tpool *
alias HtsTPool = SafeHtslibPtr!(hts_tpool, hts_tpool_destroy);

/// reference counted hts_tpool_process wrapper
/// can be used directly as a hts_tpool_process *
alias HtsProcess = SafeHtslibPtr!(hts_tpool_process, hts_tpool_process_destroy);

debug(dhtslib_unittest) unittest
{
auto rc1 = Bam1(bam_init1);
Expand Down
Loading