add multi-threading support for mul!, add! and tsvd! #100

Open
wants to merge 4 commits into master

Conversation

Qiaoyi-Li

I modified three methods, mul!, add! and _compute_svddata!, by adding a keyword argument numthreads::Int64 to support multi-threaded parallel computation. The behavior is unchanged when numthreads == 1.

I ran some benchmarks. The behavior differs depending on whether OpenBLAS (Julia's default) or MKL (via MKL.jl on an Intel CPU) is used as the backend.

  • openblas: nested multi-threading (i.e. both distributing sectors over tasks and using multi-threaded BLAS functions) seems to be forbidden by Julia. Nevertheless, parallelizing over sectors and parallelizing the underlying BLAS operations give similar performance.
  • mkl: nested multi-threading is supported, which significantly accelerates the computation when enough CPU cores are used.

Note added: I found that the parallelized add! seems to perform worse, presumably because it is not expensive enough to amortize the multi-threading overhead (just a guess). Therefore, I set its default to numthreads == 1.
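For concreteness, a usage sketch (hedged: the exact way the keyword is exposed is defined by this PR's diff; the graded space and the 5-argument mul! form below are only for illustration):

using TensorKit, LinearAlgebra

V = Z2Space(0 => 500, 1 => 500)               # a graded space with two blocks to distribute
A = TensorMap(randn, V ⊗ V, V)
B = TensorMap(randn, V, V)
C = TensorMap(randn, V ⊗ V, V)

# hypothetical calls; `numthreads` is the keyword described above, assumed to be
# forwarded from mul! and tsvd! down to the modified kernels
mul!(C, A, B, true, false; numthreads=Threads.nthreads())
U, S, Vd, ϵ = tsvd!(copy(C); numthreads=2)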

@Jutho
Owner

Jutho commented Jan 11, 2024

Thanks for this; we were actually just discussing this today. I will have to study the implementation a bit. It seems that it is certainly breaking the tests. Maybe it is just type inference that is broken as a result of this, but a quick glance also reveals other error messages.

One major question here is indeed how the different levels of parallelism combine (as you point out, it already depends on things like which BLAS library is being used), and what the best interface is. If you add this as a keyword argument to each of the functions, it can only be used by higher-level algorithms if they explicitly set this argument. Another approach would be to set the number of tasks that can be spawned/threads that can be used as a global setting in the package. This is what Strided.jl currently does. Then a user can change this and it would directly affect any algorithm written using TensorKit.jl, such as all the algorithms in MPSKit.jl (which also use multithreading at a higher level).

@Qiaoyi-Li
Author

--- About implementation. The simplest way to implement multi-threading is to use Threads.@sync and Threads.@spawn. I noticed that TensorKit.jl already uses this solution to parallelize some functions, e.g. _add_abelian_kernel!. However, it submits all tasks at the same time and leads to a memory cost proportional to the problem size. Therefore I chose to implement a producer-consumer mode manually with Channels, which results in a memory cost proportional to the number of threads used. It indeed works, although I am not sure whether there are more elegant approaches.
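For readers unfamiliar with that pattern, a minimal producer-consumer sketch with a Channel (illustrative only; the actual PR code feeds sentinel values instead of closing the channel, and the names here are placeholders):

# At most `ntasks` consumer tasks exist at any time, so peak memory scales with
# `ntasks` rather than with the number of work items.
function run_producer_consumer(work, items; ntasks::Int=Threads.nthreads())
    ch = Channel{eltype(items)}(Inf)
    for item in items          # producer: enqueue all work items
        put!(ch, item)
    end
    close(ch)                  # once drained, iteration over the channel terminates
    @sync for _ in 1:ntasks    # consumers: a fixed number of tasks pull from the channel
        Threads.@spawn for item in ch
            work(item)
        end
    end
    return nothing
end

# usage sketch: run_producer_consumer(c -> handle_block!(c), collect(blocksectors(t)); ntasks=4)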

Moreover, sorting by size improves efficiency if the cost of the different tasks fluctuates a lot. However, my implementation needs to collect the iterator of blocksectors, which leads to a 'no method' error for tensors over elementary spaces. I quickly fixed it by modifying the condition under which multi-threading is applied, which is natural as these tensors have a single block and cannot be parallelized anyway.

--- About interface. I agree that a keyword argument is not a convenient way to control multi-threading. Binding it to Threads.nthreads() is a possible solution, which is what _add_abelian_kernel! does. Going further, a global variable plus functions analogous to BLAS.set_num_threads would be better, in my opinion.
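A sketch of what such a package-level switch could look like (the names are invented for illustration; TensorKit does not currently define them):

module BlockThreading

const _NTASKS = Ref(1)

set_num_blockthreads(n::Integer) = (_NTASKS[] = max(Int(n), 1); nothing)
get_num_blockthreads() = _NTASKS[]

end # module

# A block-parallel kernel could then default to BlockThreading.get_num_blockthreads()
# instead of requiring an explicit numthreads keyword at every call site.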

@Jutho
Owner

Jutho commented Jan 12, 2024

@Qiaoyi-Li , maybe we should indeed discuss the direction we want to go here, before you invest more time in this. I very much welcome your efforts and help with this, and I agree that we definitely want to support multithreading over the different blocks, and that simply binding with Threads.nthreads() is too limited.

I have been looking at ThreadPools.jl recently, which seems like it could be very useful exactly for this purpose. Maybe we can schedule an online meeting sometime soon. I think it would be useful to include @lkdvos and @maartenvd to also discuss the interaction with the threading going on in the higher-level algorithms (MPSKit.jl). Would all of you be available, e.g. next Friday (January 19th), and if so, during which time slots?

@Jutho
Owner

Jutho commented Jan 19, 2024

Linking this here, as this discussion is also very relevant when doing benchmarking with mul! and svd!:

https://carstenbauer.github.io/ThreadPinning.jl/stable/explanations/blas/

@Jutho
Owner

Jutho commented Jan 19, 2024

@Qiaoyi-Li , as a follow-up of the discussion we had, could you share the space information of the tensors that you were multiplying in your benchmark? Simply space(tensor1) and space(tensor2) should be all we need to benchmark this particular use case. The same for svd, where it is just a single tensor.

@Qiaoyi-Li
Author


Sure, the following shows the results of space(A) and space(B), where A and B are the tensors used to benchmark A*B:

space(A)
(Rep[U₁ × SU₂]((-1, 0)=>15, (-3, 0)=>214, (-5, 0)=>525, (-7, 0)=>334, (-9, 0)=>38, (-2, 1/2)=>144, (-4, 1/2)=>765, (-6, 1/2)=>950, (-8, 1/2)=>293, (-10, 1/2)=>7, (-1, 1)=>30, (-3, 1)=>459, (-5, 1)=>1134, (-7, 1)=>721, (-9, 1)=>79, (-2, 3/2)=>121, (-4, 3/2)=>696, (-6, 3/2)=>868, (-8, 3/2)=>248, (-10, 3/2)=>3, (-1, 2)=>7, (-3, 2)=>211, (-5, 2)=>548, (-7, 2)=>332, (-9, 2)=>23, (-2, 5/2)=>24, (-4, 5/2)=>174, (-6, 5/2)=>227, (-8, 5/2)=>48, (-3, 3)=>20, (-5, 3)=>73, (-7, 3)=>39, (-4, 7/2)=>10, (-6, 7/2)=>12) ⊗ Rep[U₁ × SU₂]((1, 0)=>1, (-1, 0)=>1, (0, 1/2)=>1)) ← Rep[U₁ × SU₂]((0, 0)=>1, (-2, 0)=>98, (-4, 0)=>455, (-6, 0)=>510, (-8, 0)=>133, (-10, 0)=>2, (-1, 1/2)=>40, (-3, 1/2)=>487, (-5, 1/2)=>1042, (-7, 1/2)=>597, (-9, 1/2)=>57, (0, 1)=>1, (-2, 1)=>206, (-4, 1)=>936, (-6, 1)=>1062, (-8, 1)=>276, (-10, 1)=>2, (-1, 3/2)=>31, (-3, 3/2)=>409, (-5, 3/2)=>940, (-7, 3/2)=>519, (-9, 3/2)=>34, (-2, 2)=>72, (-4, 2)=>437, (-6, 2)=>485, (-8, 2)=>100, (-3, 5/2)=>100, (-5, 5/2)=>239, (-7, 5/2)=>115, (-9, 5/2)=>2, (-2, 3)=>4, (-4, 3)=>55, (-6, 3)=>55, (-8, 3)=>6, (-3, 7/2)=>4, (-5, 7/2)=>10, (-7, 7/2)=>2)

space(B)
Rep[U₁ × SU₂]((0, 0)=>1, (-2, 0)=>98, (-4, 0)=>455, (-6, 0)=>510, (-8, 0)=>133, (-10, 0)=>2, (-1, 1/2)=>40, (-3, 1/2)=>487, (-5, 1/2)=>1042, (-7, 1/2)=>597, (-9, 1/2)=>57, (0, 1)=>1, (-2, 1)=>206, (-4, 1)=>936, (-6, 1)=>1062, (-8, 1)=>276, (-10, 1)=>2, (-1, 3/2)=>31, (-3, 3/2)=>409, (-5, 3/2)=>940, (-7, 3/2)=>519, (-9, 3/2)=>34, (-2, 2)=>72, (-4, 2)=>437, (-6, 2)=>485, (-8, 2)=>100, (-3, 5/2)=>100, (-5, 5/2)=>239, (-7, 5/2)=>115, (-9, 5/2)=>2, (-2, 3)=>4, (-4, 3)=>55, (-6, 3)=>55, (-8, 3)=>6, (-3, 7/2)=>4, (-5, 7/2)=>10, (-7, 7/2)=>2) ← (Rep[U₁ × SU₂]((1, 0)=>1, (-1, 0)=>1, (0, 1/2)=>1)' ⊗ Rep[U₁ × SU₂]((-1, 0)=>26, (-3, 0)=>261, (-5, 0)=>530, (-7, 0)=>272, (-9, 0)=>20, (0, 1/2)=>2, (-2, 1/2)=>200, (-4, 1/2)=>850, (-6, 1/2)=>875, (-8, 1/2)=>207, (-10, 1/2)=>2, (-1, 1)=>47, (-3, 1)=>567, (-5, 1)=>1154, (-7, 1)=>590, (-9, 1)=>40, (-2, 3/2)=>172, (-4, 3/2)=>783, (-6, 3/2)=>807, (-8, 3/2)=>172, (-1, 2)=>16, (-3, 2)=>265, (-5, 2)=>566, (-7, 2)=>275, (-9, 2)=>11, (-2, 5/2)=>35, (-4, 5/2)=>201, (-6, 5/2)=>213, (-8, 5/2)=>31, (-3, 3)=>26, (-5, 3)=>79, (-7, 3)=>31, (-4, 7/2)=>12, (-6, 7/2)=>12))

The tensor used in tsvd is just A*B, so its spaces are codomain(A) and domain(B).

@Jutho
Owner

Jutho commented Jan 30, 2024

@Qiaoyi-Li, I have already played a little bit with your implementation for multithreading mul!, and compared it with alternatives. I very much like your implementation: it is very flexible in allowing one to configure the number of threads used, and at the same time it performs very favourably in comparison to the alternatives. In fact, the ThreadPools.jl solution seems to come with a substantial overhead, which is something about which I will file a report.

However, so far I did not manage to actually make this multithreaded mul! version faster than the single-threaded version, where all the multithreading is spent in BLAS/MKL. I have only tested this on a rather small (10-core) workstation. Could you share again the benchmarks you were showing?

Irrespective of this, I am likely to merge this PR, and then modify it a bit more, so that this infrastructure is in place and can more easily be experimented with.

@Qiaoyi-Li
Author


Here are the two pages from my slides:
benchmark_mul
benchmark_svd

Yes, it seems that giving more cores to BLAS/MKL is more efficient when the number of CPU cores is limited to a small value [see the comparison between the first column and the first row in the above tables]. However, the speed-up from MKL saturates, so nested multithreading becomes important when enough cores are used [see 2 $\times$ 4 vs 1 $\times$ 8 for svd]. In fact, I am still completing the benchmark tests, e.g. using more cores for mul! and trying more multi-threading implementations. I will update this PR as soon as I obtain some meaningful results.

@lkdvos
Collaborator

lkdvos commented Jan 30, 2024

It might be interesting to try this for the cases where the number of blocks is rather large, but the blocks themselves are on the smaller side. In that case, I'd expect the BLAS threads to saturate faster too.

@Jutho
Owner

Jutho commented Jan 30, 2024

Great. I also have another question. I am very much intrigued by your way of distributing jobs across a fixed number of tasks, namely with the channel which you then feed a number of nothing entries to terminate the tasks. Is this an approach you found somewhere, or is it something you came up with yourself?

@Qiaoyi-Li
Author


It is just a naive solution of my own, and it carries the risk of deadlock. However, as a Julia beginner, I don't know how to implement this in a more robust way that guarantees the tasks always terminate, even when an error happens.

@Qiaoyi-Li
Author

I tried more implementations and did some preliminary benchmarks, in which I found that using channels together with a descending sort is indeed the fastest. Moreover, feeding nothing is not necessary: changing the loop condition from while true to for c in ch works. No deadlock occurs, at least in my local tests.

Interestingly, using atomic_add to get the index of the next sector [for details, see the attached code] also guarantees that each sector is handled by one and only one task, and it has similar performance. This may also be a possible solution.
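For illustration, a minimal sketch of that atomic-counter distribution (placeholder names; the attached scripts contain the actual benchmark code):

# Each task atomically claims the next index, so every sector is processed exactly once.
function foreach_block_atomic(handle_block!, lsc::AbstractVector; ntasks::Int=Threads.nthreads())
    counter = Threads.Atomic{Int}(1)
    @sync for _ in 1:ntasks
        Threads.@spawn while true
            i = Threads.atomic_add!(counter, 1)   # returns the previous value
            i > length(lsc) && break              # no work left
            handle_block!(lsc[i])
        end
    end
    return nothing
end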

The additional implementations and benchmark scripts I used are attached below; please feel free to use them if interested.
benchmark_scripts.zip

task = Threads.@spawn for c in ch
    U, Σ, V = MatrixAlgebra.svd!(blocks(t)[c], alg)

    # note inserting keys to dict is not thread safe
Collaborator

Is there a way to avoid the lock? In principle it should be possible to "pre-allocate" the dictionaries, as in this case we know all keys in advance, and also that every entry will be visited just once

Author

Is there a way to avoid the lock? In principle it should be possible to "pre-allocate" the dictionaries, as in this case we know all keys in advance, and also that every entry will be visited just once

This is a good idea for avoiding locks. I implemented it as in the following code:

    # preallocate
    for c in lsc
        sz = size(block(t, c))
        dims[c] = min(sz[1], sz[2])
        Udata[c] = similar(block(t, c), sz[1], dims[c])
        Vdata[c] = similar(block(t, c), dims[c], sz[2])
        Σdata[c] = similar(block(t, c), dims[c])
    end

    # producer
    taskref = Ref{Task}()
    ch = Channel(; taskref=taskref, spawn=true) do ch
        for c in lsc
            put!(ch, c)
        end
    end

    # consumers
    tasks = map(1:ntasks) do _
        task = Threads.@spawn for c in ch
            U, Σ, V = MatrixAlgebra.svd!(block(t, c), alg)
            copyto!(Udata[c], U)
            copyto!(Vdata[c], V)
            copyto!(Σdata[c], Σ)
        end
        return errormonitor(task)
    end

However, I found that it does not change the time cost, but it does lead to some additional memory allocations due to the temporary local variables U, Σ, V. I also tried

      Udata[c][:], Σdata[c][:], Vdata[c][:] = MatrixAlgebra.svd!(block(t, c), alg)

Unfortunately, this naive attempt does not solve the problem, because the compiler does not turn this into an in-place operation; see the following lowered code,

julia> Meta.@lower U[:], S[:], V[:] = svd!(A)
:($(Expr(:thunk, CodeInfo(
    @ none within `top-level scope`
1 ─ %1 = svd!(A)
│   %2 = Base.indexed_iterate(%1, 1)
│   %3 = Core.getfield(%2, 1)
│        #s49 = Core.getfield(%2, 2)
│   %5 = Base.indexed_iterate(%1, 2, #s49)
│   %6 = Core.getfield(%5, 1)
│        #s49 = Core.getfield(%5, 2)
│   %8 = Base.indexed_iterate(%1, 3, #s49)
│   %9 = Core.getfield(%8, 1)
│        Base.setindex!(U, %3, :)
│        Base.setindex!(S, %6, :)
│        Base.setindex!(V, %9, :)
└──      return %1
))))

In conclusion, I think the original implementation is better. If you have any ideas for avoiding the additional allocations, please let me know; I would be happy to learn.

@Jutho
Owner

Jutho commented Feb 8, 2024

I have a different implementation without the lock. I am not behind my computer right now but will send it later today.

@lkdvos
Collaborator

lkdvos commented Feb 8, 2024

Thanks for sending the benchmark files, by the way!
Below is a version without the lock, making use of the fact that the dictionaries hold just a sorted vector of keys and values, and of the knowledge that each entry is visited only once. However, it does not seem to make any noticeable difference in my (rather limited) testing.
In any case, I would guess that the majority of the time is spent in the SVD algorithm anyway, so the amount of time that the lock is actually blocking seems negligible.

function _compute_svddata_channel!(t::TensorMap, alg::Union{SVD,SDD};
                                   ntasks::Int64=Threads.nthreads(),
                                   sort::Bool=true)
    InnerProductStyle(t) === EuclideanProduct() || throw_invalid_innerproduct(:tsvd!)
    I = sectortype(t)
    A = storagetype(t)
    A′ = Vector{real(scalartype(t))}
    lsc = blocksectors(t)
    
    Udata = SectorDict{I,A}(lsc, similar(lsc, A))
    Vdata = SectorDict{I,A}(lsc, similar(lsc, A))
    dims = SectorDict{I,Int}(lsc, similar(lsc, Int))
    Σdata = SectorDict{I,A′}(lsc, similar(lsc, A′))
    
    # sort sectors by size
    if sort && lsc isa AbstractVector
        lsD3 = map(lsc) do c
            # O(D1^2D2) or O(D1D2^2)
            return min(size(block(t, c), 1)^2 * size(block(t, c), 2),
                       size(block(t, c), 1) * size(block(t, c), 2)^2)
        end
        lsc = lsc[sortperm(lsD3; rev=true)]
    end

    # producer
    taskref = Ref{Task}()
    ch = Channel(; taskref=taskref, spawn=true) do ch
        for c in lsc
            put!(ch, c)
        end
    end

    # consumers
    tasks = map(1:ntasks) do _
        task = Threads.@spawn for c in ch
            U, Σ, V = MatrixAlgebra.svd!(block(t, c), alg)

            Udata[c] = U
            Vdata[c] = V
            Σdata[c] = Σ
            dims[c] = length(Σ)
        end
        return errormonitor(task)
    end

    wait.(tasks)
    wait(taskref[])
    return Udata, Σdata, Vdata, dims
end

@Jutho
Owner

Jutho commented Feb 9, 2024

So, some quick comments, although it is already quite late here, so I hope I don't make any mistakes that lead to confusion.

scripts.zip

So the attachment contains two scripts, namely mul.jl and svd.jl.

For mul!, there are 5 multithreaded implementations:

  • mul_threaded1! is essentially your original implementation, slightly rewritten.
  • mul_threaded2! is more or less the same, but removes the step which adds blocksectors to the channel in order of decreasing cost, so that they are handled just in the order returned by blocksectors; this assesses the effect of that extra step.
  • mul_threaded3! uses the costs, but statically distributes the blocksectors across the different tasks so as to even out the total (theoretical) cost per task as well as possible.
  • mul_threaded4! does the same as 3 but uses @threads instead of @spawn and @sync.
  • mul_threaded5! does the same as 3 and 4 but uses @batch from Polyester.jl instead of @threads.

For the specific problem and on my computer, for a fixed value of nthreads * num_blas_threads, I found that 2 is the worst (so the sorting does help), while 3 and 4 are slightly faster than 1 (so not using the channel to dynamically distribute blocks reduces the overhead). However, all of those were always slower than just sequentially spending all threads in BLAS. The only approach with which I managed to sometimes beat the sequential approach, where BLAS uses all the threads, is mul_threaded5!, which uses the lightweight tasks from Polyester.jl. BLAS/MKL gemm! is so well optimised that any reasonable amount of overhead immediately has a significant effect.
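For reference, a minimal sketch of how such a static, cost-balanced distribution (as in mul_threaded3!-5!) could be constructed; this is only illustrative, the attached scripts are the reference implementation:

# Sort by cost and greedily assign each block to the currently least-loaded task.
function partition_by_cost(lsc::AbstractVector, cost::AbstractVector{<:Real}, ntasks::Int)
    order = sortperm(cost; rev=true)          # largest blocks first
    buckets = [eltype(lsc)[] for _ in 1:ntasks]
    totals = zeros(Float64, ntasks)
    for i in order
        k = argmin(totals)                    # least-loaded task so far
        push!(buckets[k], lsc[i])
        totals[k] += cost[i]
    end
    return buckets                            # buckets[k] is then handled by task k
end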

For svd!, the situation is very different: the time spent in BLAS/LAPACK/MKL is a lot bigger, so it is a lot easier to actually get some benefit from multithreading over the blocks. There, there is only a single implementation, _compute_svddata_threaded1! (because I was still working on this), which is basically based on your original implementation but modified so as to not need locks etc.

Also, since the svd! method on a single block allocates space for the U, S and V arrays, you want to use those, rather than preallocating space and copying the results into it, since that would mean allocating twice. This is avoided in my implementation.

@Qiaoyi-Li
Author

I further tested these newest implementations on my Linux machine. Moreover, I assume the sector sizes follow a Gaussian distribution, so I can use $\sigma$ to select between the two situations, i.e. a few quite large sectors ($\sigma \ll 1$) or many relatively small sectors ($\sigma \gg 1$). The results are consistent with our previous guess: parallelizing within BLAS is preferable in the former case and distributing sectors in the latter; see the following two tables.

  • Results obtained via TimerOutputs.jl, total D = 2^16, $\sigma = 0.5$. It is clear that the more BLAS threads (and thus the fewer tasks) are used, the better the performance.
 atomic(16×1)            4     355s   17.1%   88.7s   45.5KiB   11.6%  11.4KiB
 channel(16×1)           4     349s   16.8%   87.1s   62.2KiB   15.8%  15.6KiB
 static(16×1)            4     342s   16.5%   85.5s   31.8KiB    8.1%  7.95KiB
 atomic(8×2)             4     172s    8.3%   42.9s   26.3KiB    6.7%  6.58KiB
 static(8×2)             4     172s    8.3%   42.9s   23.8KiB    6.1%  5.95KiB
 channel(8×2)            4     171s    8.3%   42.9s   51.5KiB   13.1%  12.9KiB
 channel(4×4)            4    86.0s    4.1%   21.5s   44.2KiB   11.2%  11.0KiB
 static(4×4)             4    86.0s    4.1%   21.5s   19.8KiB    5.0%  4.95KiB
 atomic(4×4)             4    85.5s    4.1%   21.4s   17.3KiB    4.4%  4.33KiB
 static(2×8)             4    64.5s    3.1%   16.1s   17.8KiB    4.5%  4.45KiB
 channel(2×8)            4    64.4s    3.1%   16.1s   39.7KiB   10.1%  9.91KiB
 atomic(2×8)             4    64.2s    3.1%   16.1s   12.8KiB    3.3%  3.21KiB
 sequential(1×16)        4    64.1s    3.1%   16.0s     0.00B    0.0%    0.00B
  • The other situation, total D = 2^16, $\sigma = 1.5$. Now distributing sectors does help.
 channel(2×8)            4    482ms    9.3%   121ms    131KiB   11.3%  32.7KiB
 channel(4×4)            4    436ms    8.4%   109ms    135KiB   11.7%  33.8KiB
 channel(16×1)           4    422ms    8.2%   106ms    137KiB   11.9%  34.3KiB
 sequential(1×16)        4    420ms    8.1%   105ms     0.00B    0.0%    0.00B
 atomic(2×8)             4    414ms    8.0%   104ms   34.4KiB    3.0%  8.61KiB
 atomic(16×1)            4    412ms    8.0%   103ms   67.1KiB    5.8%  16.8KiB
 static(16×1)            4    405ms    7.8%   101ms    110KiB    9.5%  27.5KiB
 atomic(4×4)             4    391ms    7.6%  97.8ms   38.9KiB    3.4%  9.73KiB
 channel(8×2)            4    381ms    7.4%  95.3ms    146KiB   12.6%  36.4KiB
 static(2×8)             4    381ms    7.4%  95.2ms   92.4KiB    8.0%  23.1KiB
 static(4×4)             4    341ms    6.6%  85.3ms    122KiB   10.6%  30.5KiB
 atomic(8×2)             4    339ms    6.6%  84.7ms   47.9KiB    4.2%  12.0KiB
 static(8×2)             4    338ms    6.5%  84.5ms   90.9KiB    7.9%  22.7KiB

Benchmark scripts and further results obtained with $ntasks*nmkl = 16, 32, 64$ can be found below. However, the differences become less obvious when using more cores. The reason may be relatively larger fluctuations, or NUMA effects showing up.
benchmark_mul.zip

Moreover, I found that using Threads.Atomic instead of a Channel for the dynamic version seems better. Maybe we can choose it, together with the static one (mul_threaded5!), as the candidates for the final solution to support multi-threading in TensorKit.

@lkdvos
Collaborator

lkdvos commented Feb 9, 2024

Some more thoughts I just wanted to share here:

  • For the atomic approach, is there any difference between the @sync-@spawn combination, or could you just use @threads? I think using @threads automatically performs some kind of error handling as well, and might benefit more from future Julia updates.
  • The benchmark case you originally sent has 37 different blocks, and thus it is not unreasonable that using 16 tasks to handle the matrix multiplications separately does not perform all that well on large blocks, even if all blocks had been the same size. Mostly what I am trying to say here is that it is super hard to come up with a one-size-fits-all algorithm, and perhaps we should consider expanding the benchmark cases a bit to also include e.g. a typical DMRG contraction?
  • Has anyone considered the @threads :greedy approach that is coming up in Julia 1.11? linked here. It is very similar to the channel approach, except that it uses a channel of length 0, which automatically blocks putting new items in the channel until a spot is available, avoiding some extra boilerplate (see the sketch after this list).
  • We could consider first handling large blocks with all threads sent to BLAS, and only then, if enough small blocks survive, turning down the number of BLAS threads and manually spawning some tasks to handle them.
  • It would be nice to have an idea of the "thread efficiency", i.e. how optimally the different threads are actually being used.
  • It is probably also a good idea to come up with a "base size", i.e. to avoid spawning tasks in the first place when the problem is not large enough. I'm thinking of cases like when the symmetry is Z2, where (2x8) or (1x16) are the only thread divisions to consider.
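As a reference for the :greedy point above: on Julia ≥ 1.11 that approach would reduce to something like the following sketch (lsc and handle_block! are placeholder names):

# :greedy scheduling hands each iteration to whichever task is free next,
# which suits blocks of very unequal cost.
Threads.@threads :greedy for c in lsc
    handle_block!(c)
end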

@Qiaoyi-Li
Author

  • For the atomic approach, is there any difference between the @sync-@spawn combination, or could you just use @threads? I think using @threads automatically performs some kind of error handling as well, and might benefit more from future julia updates.

The for loop here is only used to submit tasks, which is quite cheap, so I think the difference is negligible.

  • The benchmark case you originally sent has 37 different blocks, and thus it is not unreasonable that using 16 tasks to handle the matrix multiplications separately does not perform all that well on large blocks, even if all blocks would have been the same size. Mostly what I am trying to say here is that it is super hard to come up with a one-size-fits-all algorithm, and perhaps we should consider expanding the benchmark cases a bit to also include e.g. a typical DMRG contraction?

I strongly agree that it is quite hard to find a fixed plan that suits all cases. Maybe it is a good idea to provide a simple default usage together with a highly controllable expert mode. For my own requirements as a TensorKit user, I would disable the multi-threaded mul! while leaving the svd parallelized, as I already distribute the interactions in my own DMRG code (an idea from MPSKit), which would conflict with a multi-threaded mul!.

  • Has anyone considered the @threads :greedy approach that is coming up in Julia 1.11? linked here. It is very similar to the channel approach, however it uses a channel of length 0, which automatically blocks putting new items in the channel until a spot is available, avoiding some extra boilerplate.

I am not familiar with this new feature. However, the original implementation I provided just uses unbuffered channels, so I guess the behavior will be quite similar.

  • We could consider first handling large blocks with all threads sent to BLAS, and only then, if enough small blocks survive, turning down the number of BLAS threads and manually spawning some tasks to handle them.
  • It is probably also a good idea to also come up with a "base size", i.e. to avoid spawning tasks in the first place when the problem is not large enough. I'm thinking of cases like when the symmetry is Z2, (2x8) or (1x16) are the only thread divisions to consider.

It seems both functionalities need a way to control the number of threads used by each BLAS call manually and dynamically [mkl_set_num_threads_local may be helpful, linked here]. However, a practical implementation is probably not straightforward. At least, neither MKL.jl nor ThreadPinning.jl provides a corresponding API, as far as I know.

@Jutho
Owner

Jutho commented Feb 11, 2024

Just a small comment that I also think the :greedy approach is very similar. I think the length 0 argument to the Channel is the implicit default value, so I believe there is no difference here. The Channel approach is very general in that it can work with any iterable that does not need to be indexable.

But after collecting and sorting the sectors, they become indexable, and then the atomic index seems to have much less overhead (comparing the atomic to the channel benchmarks in the table above for the same thread choices). So I am in favor of having this approach as the dynamic approach, together with the static approach. As it seems further experimentation will lead to additional threading strategies, we need an interface that is sufficiently flexible and versatile to accommodate this. Furthermore, as with the number of threads, it would be good if this could be controlled globally at the package level.

@lkdvos
Collaborator

lkdvos commented Apr 18, 2024

As an update on this matter, the following comments:

I think it has become clear that the most important things are some form of flexibility, as well as a minimal amount of maintenance on our part. To that end, I propose the following as a final solution: #117, which makes use of OhMyThreads. The hope is to have a system which can be extended when needed, without overly complicating the base implementation.
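To give an idea of the kind of interface OhMyThreads provides (a sketch only; the actual design lives in #117), the per-block loop can be written once and the scheduler swapped out:

using OhMyThreads: tforeach, DynamicScheduler, GreedyScheduler, SerialScheduler

# Dispatch a per-block kernel with a pluggable scheduler (illustrative names).
function foreach_block(handle_block!, lsc; scheduler=DynamicScheduler())
    tforeach(handle_block!, lsc; scheduler)
    return nothing
end

# e.g. foreach_block(handle_block!, collect(blocksectors(t)); scheduler=GreedyScheduler(; ntasks=4))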

I played around with some benchmarks to compare to the approaches here, and have found the following promising results. I will still try and get a more extensive set of benchmarks on some different machines, and maybe for some different symmetry types as well:
benchmark_mul_ohmythreads.zip

────────────────────────────────────────────────────────────────────────────────
  D = 4096, σc = 1.0, σs = 0.5          Time                    Allocations
                               ───────────────────────   ────────────────────────
       Tot / % measured:            93.4s /  14.8%           20.0GiB /   0.0%
Section               ncalls     time    %tot     avg     alloc    %tot      avg
 ────────────────────────────────────────────────────────────────────────────────
 channel(2×4)              20    1.49s   10.8%  74.3ms    114KiB    8.0%  5.72KiB
 ohmydynamic(8×1)          20    1.28s    9.3%  64.0ms    101KiB    7.1%  5.06KiB
 ohmystatic(8×1)           20    1.27s    9.2%  63.7ms    130KiB    9.1%  6.52KiB
 ohmystatic(4×2)           20    996ms    7.2%  49.8ms   69.7KiB    4.9%  3.48KiB
 ohmydynamic(4×2)          20    964ms    7.0%  48.2ms   54.4KiB    3.8%  2.72KiB
 atomic(8×1)               20    932ms    6.7%  46.6ms    123KiB    8.6%  6.14KiB
 ohmygreedy(8×1)           20    861ms    6.2%  43.0ms    143KiB   10.0%  7.17KiB
 ohmydynamic(2×4)          20    824ms    6.0%  41.2ms   30.9KiB    2.2%  1.55KiB
 channel(8×1)              20    764ms    5.5%  38.2ms    176KiB   12.3%  8.81KiB
 channel(4×2)              20    741ms    5.4%  37.1ms    136KiB    9.5%  6.81KiB
 ohmygreedy(2×4)           20    626ms    4.5%  31.3ms   73.9KiB    5.2%  3.70KiB
 atomic(2×4)               20    594ms    4.3%  29.7ms   59.1KiB    4.1%  2.95KiB
 atomic(4×2)               20    587ms    4.2%  29.3ms   80.3KiB    5.6%  4.02KiB
 ohmystatic(2×4)           20    563ms    4.1%  28.2ms   39.4KiB    2.8%  1.97KiB
 ohmygreedy(4×2)           20    562ms    4.1%  28.1ms   96.8KiB    6.8%  4.84KiB
 sequential(1×8)           20    431ms    3.1%  21.6ms     0.00B    0.0%    0.00B
 ohmysequential(1×8)       20    330ms    2.4%  16.5ms     0.00B    0.0%    0.00B
 ────────────────────────────────────────────────────────────────────────────────
 ────────────────────────────────────────────────────────────────────────────────
  D = 4096, σc = 3.0, σs = 1.5          Time                    Allocations
                               ───────────────────────   ────────────────────────
       Tot / % measured:            20.8s /   5.0%           1.07GiB /   0.2%

 Section               ncalls     time    %tot     avg     alloc    %tot      avg
 ────────────────────────────────────────────────────────────────────────────────
 channel(2×4)              20    653ms   62.6%  32.6ms    260KiB   11.4%  13.0KiB
 channel(4×2)              20   70.5ms    6.8%  3.52ms    286KiB   12.5%  14.3KiB
 ohmygreedy(2×4)           20   52.6ms    5.0%  2.63ms    130KiB    5.7%  6.50KiB
 atomic(2×4)               20   44.5ms    4.3%  2.23ms    133KiB    5.8%  6.64KiB
 channel(8×1)              20   32.9ms    3.2%  1.64ms    337KiB   14.7%  16.8KiB
 ohmydynamic(2×4)          20   28.0ms    2.7%  1.40ms   30.9KiB    1.4%  1.55KiB
 atomic(4×2)               20   25.0ms    2.4%  1.25ms    154KiB    6.7%  7.70KiB
 atomic(8×1)               20   25.0ms    2.4%  1.25ms    197KiB    8.6%  9.83KiB
 ohmygreedy(4×2)           20   16.5ms    1.6%   824μs    156KiB    6.8%  7.78KiB
 ohmystatic(2×4)           20   16.0ms    1.5%   799μs   39.4KiB    1.7%  1.97KiB
 ohmydynamic(4×2)          20   15.3ms    1.5%   764μs   54.4KiB    2.4%  2.72KiB
 sequential(1×8)           20   13.6ms    1.3%   681μs     0.00B    0.0%    0.00B
 ohmysequential(1×8)       20   13.2ms    1.3%   660μs     0.00B    0.0%    0.00B
 ohmystatic(4×2)           20   10.4ms    1.0%   518μs   69.7KiB    3.0%  3.48KiB
 ohmydynamic(8×1)          20   9.51ms    0.9%   475μs    101KiB    4.4%  5.06KiB
 ohmygreedy(8×1)           20   8.49ms    0.8%   425μs    207KiB    9.1%  10.4KiB
 ohmystatic(8×1)           20   8.46ms    0.8%   423μs    130KiB    5.7%  6.52KiB
 ────────────────────────────────────────────────────────────────────────────────

@xiangjianqian

I am also very interested in the multi-threading support for the mul! function. While the direct parallelization across symmetry blocks shows promise, its efficiency tends to wane when dealing with larger sectors. I'm curious if we could optimize performance by initially decomposing tensors, followed by executing the multiplication process in parallel. Here is a simple example:

using LinearAlgebra
using MKL
BLAS.set_num_threads(4)

using BenchmarkTools
using TensorKit

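# factorize_tensor_left splits t into `divid` TensorMaps by slicing the rows (codomain side)
# of each block; factorize_tensor_right below does the same along the columns (domain side).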
function factorize_tensor_left(t::TensorMap,divid::Int64)
    tt=Vector{TensorMap}(undef,divid)
    for i in 1:divid
        I = sectortype(t)
        S = spacetype(t)
        A = storagetype(t)
        LCdata = Dict{I, A}()
        sector_dims = Dict{I, Int}()
        for c in blocksectors(codomain(t))
            LC=block(t,c)
            LC_Size=size(LC, 1)
            Len=fld(LC_Size,divid)
            if (i-1)*Len+1<=LC_Size
                if i<divid
                    LCdata[c]=LC[(i-1)*Len+1:i*Len,:]
                else
                    LCdata[c]=LC[(i-1)*Len+1:end,:]
                end
                sector_dims[c] = size(LCdata[c], 1)
            end
        end
        V = S(sector_dims)
        W = ProductSpace(V)
        tt[i]=TensorMap(LCdata,W,domain(t))
    end
    return tt
end

function factorize_tensor_right(t::TensorMap,divid::Int64)
    tt=Vector{TensorMap}(undef,divid)
    for i in 1:divid
        I = sectortype(t)
        S = spacetype(t)
        A = storagetype(t)
        LCdata = Dict{I, A}()
        sector_dims = Dict{I, Int}()
        for c in blocksectors(domain(t))
            LC=block(t,c)
            LC_Size=size(LC, 2)
            Len=fld(LC_Size,divid)
            if (i-1)*Len+1<=LC_Size
                if i<divid
                    LCdata[c]=LC[:,(i-1)*Len+1:i*Len]
                else
                    LCdata[c]=LC[:,(i-1)*Len+1:end]
                end
                sector_dims[c] = size(LCdata[c], 2)
            end
        end
        V = S(sector_dims)
        W = ProductSpace(V)
        tt[i]=TensorMap(LCdata,domain(t),W)
    end
    return tt
end

function Summation(t::Vector{TensorMap})
    for i in 2:length(t)
        t[1]=t[1]+t[i]
    end
    return t[1]
end

V=ProductSpace(Vect[(Irrep[U₁] ⊠ Irrep[SU₂])]((72, 0)=>180, (74, 0)=>928, (76, 0)=>427, (71, 1/2)=>41, (73, 1/2)=>1099, (75, 1/2)=>1622, (77, 1/2)=>185, (72, 1)=>397, (74, 1)=>2104, (76, 1)=>969, (71, 3/2)=>31, (73, 3/2)=>1104, (75, 3/2)=>1694, (77, 3/2)=>169, (72, 2)=>197, (74, 2)=>1256, (76, 2)=>556, (71, 5/2)=>2, (73, 5/2)=>366, (75, 5/2)=>620, (77, 5/2)=>45, (72, 3)=>26, (74, 3)=>271, (76, 3)=>108, (73, 7/2)=>35, (75, 7/2)=>76, (77, 7/2)=>1, (74, 4)=>16, (76, 4)=>3, (73, 9/2)=>1, (75, 9/2)=>1))

B=TensorMap(randn,V,V)
C=TensorMap(randn,V,V)
Divid=2

Bs=factorize_tensor_right(B,Divid)   # Factorize B into a list of tensors #

Cs=factorize_tensor_left(C,Divid)    # Factorize C into a list of tensors #

# B*C test     Cost: D^3 #

function Sequential_BC()    #cost: D^3#
    return B*C
end

function Sequential_BC2()    #cost: D^3#
    D=TensorMap(zeros,V,V)
    for i in 1:length(Bs)
        D+=Bs[i]*Cs[i]
    end
    return 0
end

function Parallel_BC()
    D=Vector{TensorMap}(undef,Divid)
    Threads.@threads for i in 1:length(Bs)  #cost: D^3/Divid for each thread#
        D[i]=Bs[i]*Cs[i]
    end
    return Summation(D)
end
        
@btime Sequential_BC()

println("********")

@btime Parallel_BC()

println("********")

@btime Sequential_BC2()

@lkdvos
Collaborator

lkdvos commented Apr 21, 2024

I am not entirely sure that's a very fair comparison though; I would assume that you have to include the time to factorize in the benchmark, while this is now done in advance.

As an aside, I think you could speed up that process quite a bit by using view instead of slicing, as this would reduce the amount of allocations. Additionally, OhMyThreads has some functionality for working with parallel reductions, so you might even be able to have a look at tmapreduce to also implement the summation in parallel.
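For example, the final summation over the partial products in the snippet above could be expressed with tmapreduce (a sketch, reusing the Bs, Cs and Divid names defined there):

using OhMyThreads: tmapreduce

# parallel map-reduce over the slices; plays the role of Summation(D) above
D = tmapreduce(i -> Bs[i] * Cs[i], +, 1:Divid)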

@xiangjianqian

Here is an improved version of the naive one (very similar to _threaded_blas_mul! in Strided.jl). However, it only keeps pace with naively parallelizing over BLAS threads.

## test start with julia -t 4

using LinearAlgebra
using MKL
# if Sys.islinux()
#     using ThreadPinning
#     pinthreads_opt = :compact
#     if pinthreads_opt != :none
#         ThreadPinning.mkl_set_dynamic(0)
#         ThreadPinning.pinthreads(pinthreads_opt)
#         threadinfo(; color=false, blas=true)
#     end
# else
#     pinthreads_opt = :none
# end
BLAS.set_num_threads(2)
using BenchmarkTools
using OhMyThreads
using TensorKit
import TensorKit: hasblock, StridedView

function _mul_decompose!(C::StridedView{T,2}, A::StridedView{T,2}, B::StridedView{T,2}, α, β, ndivid,position) where {T<:LinearAlgebra.BlasFloat}
    m=cal_interval(position,size(C,1),size(C,2),ndivid)
    if m[1]!=0
        mul!(C[m[1]:m[2], m[3]:m[4]], A[m[1]:m[2], :], B[:, m[3]:m[4]], α, β)
    end
end

function _rmul_decompose!(C::StridedView{T,2}, α , β, ndivid, position) where {T<:LinearAlgebra.BlasFloat}
    m=cal_interval(position,size(C,1),size(C,2),ndivid)
    if m[1]!=0
        rmul!(C[m[1]:m[2], m[3]:m[4]], β)
    end
end

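# cal_interval returns [i1, i2, j1, j2]: the sub-range of an (len_i × len_j) block assigned
# to slice `i`, splitting along the longer dimension into chunks of at least 128*nBLASthreads
# rows/columns; [0, 0, 0, 0] signals that slice `i` has no work.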
function cal_interval(i::Int64, len_i::Int64, len_j::Int64, ndivid::Int64)
    n=BLAS.get_num_threads()
    if len_i>len_j
        interval=max(128*n,fld(len_i,ndivid))
        if i*interval<=len_i
            return [(i-1)*interval+1,i*interval,1,len_j]
        elseif (i-1)*interval+1<=len_i
            return [(i-1)*interval+1,len_i,1,len_j]
        else
            return [0,0,0,0]
        end
    else
        interval=max(128*n,fld(len_j,ndivid))
        if i*interval<=len_j
            return [1,len_i,(i-1)*interval+1,i*interval]
        elseif (i-1)*interval+1<=len_j
            return [1,len_i,(i-1)*interval+1,len_j]
        else
            return [0,0,0,0]
        end
    end
    return [0,0,0,0]
end

## symmetry_block_with_decompose

function blocktask_decompose!(position::Int64, tA, tB, tC, α, β,ndivid)
    c=blocksectors(tC)[fld(position-1,ndivid)+1]
    if hasblock(tA, c)
        _mul_decompose!(StridedView(block(tC, c)),StridedView(block(tA, c)),StridedView(block(tB, c)), α, β, ndivid, (position-1)%(ndivid)+1)
    elseif β != one(β)
        _rmul_decompose!(block(tC, c), β, ndivid, (position-1)%ndivid+1)
    end
    return nothing
end

function mul_decompose!(tC, tA, tB, α=true, β=true; M=Threads.nthreads())
    if !(codomain(tC) == codomain(tA) && domain(tC) == domain(tB) &&
         domain(tA) == codomain(tB))
        throw(SpaceMismatch("$(space(tC))$(space(tA)) * $(space(tB))"))
    end
    ndivid = M
    ntasks = Threads.nthreads()
    scheduler = GreedyScheduler(; ntasks)
    blocktask_decompose(i) = blocktask_decompose!(i, tA, tB, tC, α, β,ndivid)
    tforeach(blocktask_decompose, [i for i in 1:ndivid*length(blocksectors(tC))]; scheduler)
    # Threads.@threads for i in 1:ndivid*length(blocksectors(tC))
    #     blocktask_decompose!(i, tA, tB, tC, α, β,ndivid)
    # end
    return nothing
end

## decompose only

function blocktask_decompose2!(position::Int64, tA, tB, tC, α, β,ndivid)
    for c in blocksectors(tC)
        if hasblock(tA, c)
            _mul_decompose!(StridedView(block(tC, c)),StridedView(block(tA, c)),StridedView(block(tB, c)), α, β, ndivid, position)
        elseif β != one(β)
            _rmul_decompose!(block(tC, c), β, ndivid, position)
        end
    end
    return nothing
end

function mul_decompose2!(tC, tA, tB, α=true, β=true; M=Threads.nthreads())
    if !(codomain(tC) == codomain(tA) && domain(tC) == domain(tB) &&
         domain(tA) == codomain(tB))
        throw(SpaceMismatch("$(space(tC))$(space(tA)) * $(space(tB))"))
    end
    ndivid = M
    ntasks = Threads.nthreads()
    scheduler =GreedyScheduler(; ntasks)
    blocktask_decompose(i) = blocktask_decompose2!(i, tA, tB, tC, α, β,ndivid)
    tforeach(blocktask_decompose, [i for i in 1:ndivid]; scheduler)
    # Threads.@threads for i in 1:ndivid
    #     blocktask_decompose2!(i, tA, tB, tC, α, β,ndivid)
    # end
    return nothing
end

function mul_sequential!(tC, tA, tB, α=true, β=true)
    if !(codomain(tC) == codomain(tA) && domain(tC) == domain(tB) &&
         domain(tA) == codomain(tB))
        throw(SpaceMismatch("$(space(tC))$(space(tA)) * $(space(tB))"))
    end
    for c in blocksectors(tC)
        blocktask!(c, tA, tB, tC, α, β)
    end
    return nothing
end

function blocktask!(c, tA, tB, tC, α, β)
    if hasblock(tA, c)
        mul!(StridedView(block(tC, c)),
             StridedView(block(tA, c)),
             StridedView(block(tB, c)),
             α, β)
    elseif β != one(β)
        rmul!(block(tC, c), β)
    end
    return nothing
end

function Space_Gaussian(D::Int64, σc::Real, σs::Real)
    lsq = [(c, s) for c in (-Int64(round(3 * σc))):Int64(round(3 * σc))
           for
           s in 0:(1 // 2):(Int64(round(6 * σs)) // 2)]
    lsdim = map(lsq) do (c, s)
        return exp(-c^2 / (2 * σc^2) - s^2 / (2 * σs^2))
    end
    Dtot = sum(map(lsq, lsdim) do (_, s), D
                   return (2s + 1) * D
               end)
    lsdim = map(lsdim / Dtot * D) do x
        return round(Int64, x)
    end

    return ProductSpace(Vect[(Irrep[U₁] ⊠ Irrep[SU₂])]((c, s) => d for ((c, s), d) in zip(lsq, lsdim)))
end
V1=ProductSpace(Vect[(Irrep[U₁] ⊠ Irrep[SU₂])]((1, 1/2)=>1, (0, 0)=>1))
V = Space_Gaussian(2^12, 1.0 , 0.5)

B=TensorMap(randn,V*V1,V)
C=TensorMap(randn,V,V)
D = B * C


#B*C test     Cost: D^3 #

println("***Serial 1*2***")
tC = zerovector(D)
@btime mul_sequential!(tC, B, C)

println("---------------")
println("ntasks*n_MKL: 4*2")
println("---------------")

println("***Parallel_symmetry_block-only***")
tC = zerovector(D)
@btime mul_decompose!(tC, B, C, M=1)

println("***Parallel_symmetry_block_with_decompose***")
tC = zerovector(D)
@btime mul_decompose!(tC, B, C, M=Threads.nthreads())

println("***Parallel_symmetry_block_with_decompose (heavily)***")
tC = zerovector(D)
@btime mul_decompose!(tC, B, C, M=2*Threads.nthreads())

println("***Parallel_no-decompose (same with serial)***")
tC = zerovector(D)
@btime mul_decompose2!(tC, B, C , M=1)


println("****Parallel_decompose-only***")
tC = zerovector(D)
@btime mul_decompose2!(tC, B, C , M=Threads.nthreads())

println("****Parallel_decompose-only (heavily)***")
tC = zerovector(D)
@btime mul_decompose2!(tC, B, C , M=2*Threads.nthreads())

BLAS.set_num_threads(8)

println("***Serial 1*8***")
tC = zerovector(D)
@btime mul_sequential!(tC, B, C)

***Serial 1*2***
  24.590 ms (0 allocations: 0 bytes)
---------------
ntasks*n_MKL: 4*2
---------------
***Parallel_symmetry_block-only***
  13.052 ms (104 allocations: 8.75 KiB)
***Parallel_symmetry_block_with_decompose***
  8.288 ms (201 allocations: 19.20 KiB)
***Parallel_symmetry_block_with_decompose (heavily)***
  8.779 ms (305 allocations: 29.84 KiB)
***Parallel_no-decompose (same with serial)***
  25.556 ms (85 allocations: 6.51 KiB)
****Parallel_decompose-only***
  10.963 ms (199 allocations: 16.64 KiB)
****Parallel_decompose-only (heavily)***
  11.192 ms (303 allocations: 26.42 KiB)
***Serial 1*8***
  8.370 ms (0 allocations: 0 bytes)

@lkdvos
Collaborator

lkdvos commented Apr 22, 2024

To be honest, I am not sure there is much performance to be gained there, as this is effectively trying to compete with BLAS. This is definitely possible in Julia, but I don't think it is something we necessarily want to do in TensorKit. If you want to achieve maximal performance within a single block, I'd recommend having a look at the different approaches already in the ecosystem, e.g. the Strided.jl approach you already mentioned, or LoopVectorization, or Octavian, or Tullio, or ...
