Multi-Threading

Base.Threads.@threads — Macro

    Threads.@threads [schedule] for ... end

A macro to execute a for loop in parallel. The iteration space is distributed to coarse-grained tasks. The scheduling policy can be specified by the schedule argument. The execution of the loop waits for the evaluation of all iterations.

See also: @spawn and pmap in Distributed.

Extended help

Semantics

Unless stronger guarantees are specified by the scheduling option, a loop executed by the @threads macro has the following semantics.

The @threads macro executes the loop body in an unspecified order and potentially concurrently. It does not specify the exact assignments of the tasks and the worker threads. The assignments can be different for each execution. The loop body code (including any code transitively called from it) must not make any assumptions about the distribution of iterations to tasks or the worker thread in which they are executed. The loop body for each iteration must be able to make forward progress independent of other iterations and be free from data races. As such, invalid synchronizations across iterations may deadlock while unsynchronized memory accesses may result in undefined behavior.

For example, the above conditions imply that:

  • A lock taken in an iteration must be released within the same iteration.
  • Communicating between iterations using blocking primitives like Channels is incorrect.
  • Write only to locations not shared across iterations (unless a lock or atomic operation is used).
  • Unless the :static schedule is used, the value of threadid() may change even within a single iteration. See Task Migration.
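The conditions listed here can be sketched in a small example. The helper name squares_and_sum is hypothetical and not part of Julia. It writes each result to its own slot and guards the one shared accumulator with a lock that is taken and released within the same iteration.

```julia
# Hypothetical helper for illustration; not part of Base.
function squares_and_sum(xs::Vector{Int})
    out = similar(xs)        # iteration i writes only to out[i]
    total = Ref(0)           # shared across iterations, so it needs protection
    lk = ReentrantLock()
    Threads.@threads for i in eachindex(xs)
        sq = xs[i]^2
        out[i] = sq          # safe: no other iteration touches this slot
        lock(lk) do          # acquired and released within this iteration
            total[] += sq
        end
    end
    return out, total[]
end

out, total = squares_and_sum(collect(1:100))
```

The result does not depend on how iterations are assigned to tasks or threads, which is the property the semantics require.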

Schedulers

Without the scheduler argument, the exact scheduling is unspecified and varies across Julia releases. Currently, :dynamic is used when the scheduler is not specified.

Julia 1.5

The schedule argument is available as of Julia 1.5.

:dynamic (default)

The :dynamic scheduler distributes iterations dynamically across the available worker threads. The current implementation assumes that the workload for each iteration is uniform. However, this assumption may be removed in the future.

This scheduling option is merely a hint to the underlying execution mechanism. However, a few properties can be expected. The number of Tasks used by :dynamic scheduler is bounded by a small constant multiple of the number of available worker threads (Threads.threadpoolsize()). Each task processes contiguous regions of the iteration space. Thus, @threads :dynamic for x in xs; f(x); end is typically more efficient than @sync for x in xs; @spawn f(x); end if length(xs) is significantly larger than the number of the worker threads and the run-time of f(x) is relatively smaller than the cost of spawning and synchronizing a task (typically less than 10 microseconds).

Julia 1.8

The :dynamic option for the schedule argument is available and the default as of Julia 1.8.
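As a rough illustration of the comparison between @threads :dynamic and @sync with @spawn, the sketch below runs the same cheap per-item function both ways. The names bump, with_threads, and with_spawn are assumptions made for this example, and Julia 1.8 or later is assumed for the :dynamic option.

```julia
using Base.Threads: @threads, @spawn

bump(x) = x + 1   # stand-in for a cheap per-item function (hypothetical)

function with_threads(xs)
    ys = similar(xs)
    # A bounded number of tasks, each processing a contiguous chunk of indices.
    @threads :dynamic for i in eachindex(xs)
        ys[i] = bump(xs[i])
    end
    return ys
end

function with_spawn(xs)
    ys = similar(xs)
    # One task per element: length(xs) tasks are created and synchronized.
    @sync for i in eachindex(xs)
        @spawn ys[i] = bump(xs[i])
    end
    return ys
end

xs = collect(1:10_000)
ys_threads = with_threads(xs)
ys_spawn = with_spawn(xs)
```

Both functions compute the same array. They differ only in how many tasks are created, which is where the overhead difference comes from when the per-item work is small.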

:static

:static scheduler creates one task per thread and divides the iterations equally among them, assigning each task specifically to each thread. In particular, the value of threadid() is guaranteed to be constant within one iteration. Specifying :static is an error if used from inside another @threads loop or from a thread other than 1.

Note

:static scheduling exists to support the transition of code written before Julia 1.3. In newly written library functions, :static scheduling is discouraged because functions using this option cannot be called from arbitrary worker threads.

Example

To illustrate the different scheduling strategies, consider the following function busywait containing a non-yielding timed loop that runs for a given number of seconds.

    julia> function busywait(seconds)
               tstart = time_ns()
               while (time_ns() - tstart) / 1e9 < seconds
               end
           end

    julia> @time begin
               Threads.@spawn busywait(5)
               Threads.@threads :static for i in 1:Threads.threadpoolsize()
                   busywait(1)
               end
           end
    6.003001 seconds (16.33 k allocations: 899.255 KiB, 0.25% compilation time)

    julia> @time begin
               Threads.@spawn busywait(5)
               Threads.@threads :dynamic for i in 1:Threads.threadpoolsize()
                   busywait(1)
               end
           end
    2.012056 seconds (16.05 k allocations: 883.919 KiB, 0.66% compilation time)

The :dynamic example takes 2 seconds since one of the non-occupied threads is able to run two of the 1-second iterations to complete the for loop.


Base.Threads.foreach — Function

    Threads.foreach(f, channel::Channel;
                    schedule::Threads.AbstractSchedule=Threads.FairSchedule(),
                    ntasks=Threads.threadpoolsize())

Similar to foreach(f, channel), but iteration over channel and calls to f are split across ntasks tasks spawned by Threads.@spawn. This function will wait for all internally spawned tasks to complete before returning.

If schedule isa FairSchedule, Threads.foreach will attempt to spawn tasks in a manner that enables Julia’s scheduler to more freely load-balance work items across threads. This approach generally has higher per-item overhead, but may perform better than StaticSchedule in concurrence with other multithreaded workloads.

If schedule isa StaticSchedule, Threads.foreach will spawn tasks in a manner that incurs lower per-item overhead than FairSchedule, but is less amenable to load-balancing. This approach thus may be more suitable for fine-grained, uniform workloads, but may perform worse than FairSchedule in concurrence with other multithreaded workloads.

Examples

    julia> n = 20

    julia> c = Channel{Int}(ch -> foreach(i -> put!(ch, i), 1:n), 1)

    julia> d = Channel{Int}(n) do ch
               f = i -> put!(ch, i^2)
               Threads.foreach(f, c)
           end

    julia> collect(d)
    collect(d) = [1, 4, 9, 16, 25, 36, 49, 64, 81, 100, 121, 144, 169, 196, 225, 256, 289, 324, 361, 400]
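The schedule and ntasks keyword arguments can be passed explicitly. The following is a minimal sketch that assumes a uniform, fine-grained workload (doubling each item), which is the case StaticSchedule is described as suiting. The channel names src and results are chosen for this example only.

```julia
n = 100

# Producer channel; it is closed automatically when the producer task finishes.
src = Channel{Int}(n) do ch
    for i in 1:n
        put!(ch, i)
    end
end

# Buffered output channel, large enough that put! never blocks here.
results = Channel{Int}(n)

Threads.foreach(src; schedule=Threads.StaticSchedule(), ntasks=2) do i
    put!(results, 2i)
end
close(results)   # no more writers; the buffered items can still be collected

# Items may arrive in any order, so sort before inspecting them.
sorted = sort!(collect(results))
```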

Julia 1.6

This function requires Julia 1.6 or later.


Base.Threads.@spawn — Macro

    Threads.@spawn [:default|:interactive] expr

Create a Task and schedule it to run on any available thread in the specified threadpool (:default if unspecified). The task is allocated to a thread once one becomes available. To wait for the task to finish, call wait on the result of this macro, or call fetch to wait and then obtain its return value.

Values can be interpolated into @spawn via $, which copies the value directly into the constructed underlying closure. This allows you to insert the value of a variable, isolating the asynchronous code from changes to the variable’s value in the current task.

Note

The thread that the task runs on may change if the task yields, therefore threadid() should not be treated as constant for a task. See Task Migration, and the broader multi-threading manual for further important caveats. See also the chapter on threadpools.

Julia 1.3

This macro is available as of Julia 1.3.

Julia 1.4

Interpolating values via $ is available as of Julia 1.4.

Julia 1.9

A threadpool may be specified as of Julia 1.9.

Examples

    julia> t() = println("Hello from ", Threads.threadid());

    julia> tasks = fetch.([Threads.@spawn t() for i in 1:4]);
    Hello from 1
    Hello from 1
    Hello from 3
    Hello from 4
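The effect of interpolating with $ can be seen by changing a variable after the tasks are spawned. In this sketch (Julia 1.4 or later, per the compatibility note), the function name demo_interpolation and the use of Base.Event as a gate are choices made for the example. The gate ensures both tasks read only after the variable has been reassigned.

```julia
function demo_interpolation()
    gate = Base.Event()
    x = 1
    t_captured = Threads.@spawn (wait(gate); x)    # refers to the variable x
    t_copied   = Threads.@spawn (wait(gate); $x)   # value of x copied at spawn time
    x = 2
    notify(gate)                                   # let both tasks proceed
    return fetch(t_captured), fetch(t_copied)
end

captured, copied = demo_interpolation()
```

Here captured reflects the later assignment, while copied keeps the value that x had when @spawn was evaluated.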


Base.Threads.threadid — Function

    Threads.threadid() -> Int

Get the ID number of the current thread of execution. The master thread has ID 1.

Examples

    julia> Threads.threadid()
    1

    julia> Threads.@threads for i in 1:4
               println(Threads.threadid())
           end
    4
    2
    5
    4

Note

The thread that a task runs on may change if the task yields, which is known as Task Migration. For this reason, in most cases it is not safe to use threadid() to index into, say, a vector of buffers or stateful objects.
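One alternative to indexing by threadid() is to give each task its own local state and combine the results at the end. The sketch below is one possible pattern under that assumption, not the only one. The function chunked_sum and its ntasks keyword are hypothetical names.

```julia
using Base.Threads: @spawn

# Hypothetical helper: sum a vector using one local accumulator per task.
function chunked_sum(xs::Vector{Float64}; ntasks::Int = Threads.nthreads())
    isempty(xs) && return 0.0
    chunk_len = max(1, cld(length(xs), ntasks))
    chunks = Iterators.partition(eachindex(xs), chunk_len)
    tasks = map(chunks) do idxs
        @spawn begin
            acc = 0.0              # state owned by this task, not by a thread
            for i in idxs
                acc += xs[i]
            end
            acc
        end
    end
    return sum(fetch, tasks)       # combine the per-task results
end

s = chunked_sum(collect(1.0:100.0))
```

Because the accumulator belongs to the task, the result stays correct even if a task migrates between threads.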


Base.Threads.maxthreadid — Function

    Threads.maxthreadid() -> Int

Get a lower bound on the number of threads (across all thread pools) available to the Julia process, with atomic-acquire semantics. The result will always be greater than or equal to threadid() as well as threadid(task) for any task you were able to observe before calling maxthreadid.


Base.Threads.nthreads — Function

    Threads.nthreads(:default | :interactive) -> Int

Get the current number of threads within the specified thread pool. The threads in default have id numbers 1:nthreads(:default).

See also BLAS.get_num_threads and BLAS.set_num_threads in the LinearAlgebra standard library, nprocs() in the Distributed standard library, and Threads.maxthreadid().


Base.Threads.threadpool — Function

    Threads.threadpool(tid = threadid()) -> Symbol

Returns the specified thread’s threadpool; either :default, :interactive, or :foreign.


Base.Threads.nthreadpools — Function

    Threads.nthreadpools() -> Int

Returns the number of threadpools currently configured.


Base.Threads.threadpoolsize — Function

    Threads.threadpoolsize(pool::Symbol = :default) -> Int

Get the number of threads available to the default thread pool (or to the specified thread pool).

See also: BLAS.get_num_threads and BLAS.set_num_threads in the LinearAlgebra standard library, and nprocs() in the Distributed standard library.
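The thread and thread pool queries can be combined to inspect the current configuration. This is a small sketch that assumes Julia 1.9 or later, where thread pools are available. The printed values depend on how the Julia process was started, so no particular output is shown.

```julia
pool = Threads.threadpool()                            # pool of the current thread
n_default = Threads.threadpoolsize()                   # same as threadpoolsize(:default)
n_interactive = Threads.threadpoolsize(:interactive)   # 0 if no interactive threads

println("current thread ", Threads.threadid(), " is in pool :", pool)
println(n_default, " default and ", n_interactive, " interactive threads")
println("maxthreadid() = ", Threads.maxthreadid())
```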


Base.Threads.ngcthreads — Function

    Threads.ngcthreads() -> Int

Returns the number of GC threads currently configured. This includes both mark threads and concurrent sweep threads.


See also Multi-Threading.

Atomic operations

atomic — Keyword

Unsafe pointer operations are compatible with loading and storing pointers declared with _Atomic and std::atomic type in C11 and C++23 respectively. An error may be thrown if there is not support for atomically loading the Julia type T.

See also: unsafe_load, unsafe_modify!, unsafe_replace!, unsafe_store!, unsafe_swap!


Base.@atomic — Macro

    @atomic var
    @atomic order ex

Mark var or ex as being performed atomically, if ex is a supported expression. If no order is specified it defaults to :sequentially_consistent.

    @atomic a.b.x = new
    @atomic a.b.x += addend
    @atomic :release a.b.x = new
    @atomic :acquire_release a.b.x += addend

Perform the store operation expressed on the right atomically and return the new value.

With =, this operation translates to a setproperty!(a.b, :x, new) call. With an updating operator such as +=, it translates to a modifyproperty!(a.b, :x, +, addend)[2] call.

    @atomic a.b.x max arg2
    @atomic a.b.x + arg2
    @atomic max(a.b.x, arg2)
    @atomic :acquire_release max(a.b.x, arg2)
    @atomic :acquire_release a.b.x + arg2
    @atomic :acquire_release a.b.x max arg2

Perform the binary operation expressed on the right atomically. Store the result into the field in the first argument and return the values (old, new).

This operation translates to a modifyproperty!(a.b, :x, func, arg2) call.

See Per-field atomics section in the manual for more details.

Examples

    julia> mutable struct Atomic{T}; @atomic x::T; end

    julia> a = Atomic(1)
    Atomic{Int64}(1)

    julia> @atomic a.x # fetch field x of a, with sequential consistency
    1

    julia> @atomic :sequentially_consistent a.x = 2 # set field x of a, with sequential consistency
    2

    julia> @atomic a.x += 1 # increment field x of a, with sequential consistency
    3

    julia> @atomic a.x + 1 # increment field x of a, with sequential consistency
    3 => 4

    julia> @atomic a.x # fetch field x of a, with sequential consistency
    4

    julia> @atomic max(a.x, 10) # change field x of a to the max value, with sequential consistency
    4 => 10

    julia> @atomic a.x max 5 # again change field x of a to the max value, with sequential consistency
    10 => 10

Julia 1.7

This functionality requires at least Julia 1.7.
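A common use of @atomic fields is a shared counter updated from a threaded loop. The sketch below assumes a hypothetical Counter type and count_evens helper, neither of which is part of Julia.

```julia
# Hypothetical type with one atomic field.
mutable struct Counter
    @atomic n::Int
end

function count_evens(xs)
    c = Counter(0)
    Threads.@threads for x in xs
        if iseven(x)
            @atomic c.n += 1     # atomic read-modify-write, safe across iterations
        end
    end
    return @atomic c.n           # atomic load of the final value
end

evens = count_evens(1:1000)
```

Without the @atomic annotations, concurrent increments of the field would be a data race and could lose updates.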


Base.@atomicswap — Macro

    @atomicswap a.b.x = new
    @atomicswap :sequentially_consistent a.b.x = new

Stores new into a.b.x and returns the old value of a.b.x.

This operation translates to a swapproperty!(a.b, :x, new) call.

See Per-field atomics section in the manual for more details.

Examples

    julia> mutable struct Atomic{T}; @atomic x::T; end

    julia> a = Atomic(1)
    Atomic{Int64}(1)

    julia> @atomicswap a.x = 2+2 # replace field x of a with 4, with sequential consistency
    1

    julia> @atomic a.x # fetch field x of a, with sequential consistency
    4

Julia 1.7

This functionality requires at least Julia 1.7.


Base.@atomicreplace — Macro

    @atomicreplace a.b.x expected => desired
    @atomicreplace :sequentially_consistent a.b.x expected => desired
    @atomicreplace :sequentially_consistent :monotonic a.b.x expected => desired

Perform the conditional replacement expressed by the pair atomically, returning the values (old, success::Bool), where success indicates whether the replacement was completed.

This operation translates to a replaceproperty!(a.b, :x, expected, desired) call.

See Per-field atomics section in the manual for more details.

Examples

    julia> mutable struct Atomic{T}; @atomic x::T; end

    julia> a = Atomic(1)
    Atomic{Int64}(1)

    julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
    (old = 1, success = true)

    julia> @atomic a.x # fetch field x of a, with sequential consistency
    2

    julia> @atomicreplace a.x 1 => 2 # replace field x of a with 2 if it was 1, with sequential consistency
    (old = 2, success = false)

    julia> xchg = 2 => 0; # replace field x of a with 0 if it was 2, with sequential consistency

    julia> @atomicreplace a.x xchg
    (old = 2, success = true)

    julia> @atomic a.x # fetch field x of a, with sequential consistency
    0

Julia 1.7

This functionality requires at least Julia 1.7.
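The (old, success) result makes it possible to build a compare-and-swap retry loop that applies an arbitrary function to a field. The following sketch assumes a hypothetical Cell type and cas_update! helper. For simple operators, @atomic with a modifying expression is usually the more direct choice.

```julia
# Hypothetical type with one atomic field.
mutable struct Cell
    @atomic value::Int
end

# Hypothetical helper: apply f to the field atomically by retrying until the
# conditional replacement succeeds. Returns the value that was stored.
function cas_update!(f, cell::Cell)
    expected = @atomic cell.value
    while true
        desired = f(expected)
        result = @atomicreplace cell.value expected => desired
        result.success && return desired
        expected = result.old    # another task changed the field; retry with its value
    end
end

cell = Cell(0)
Threads.@threads for i in 1:1000
    cas_update!(v -> v + 2, cell)
end
final = @atomic cell.value
```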


Note

The following APIs are fairly primitive, and will likely be exposed through an unsafe_*-like wrapper.

    Core.Intrinsics.atomic_pointerref(pointer::Ptr{T}, order::Symbol) --> T
    Core.Intrinsics.atomic_pointerset(pointer::Ptr{T}, new::T, order::Symbol) --> pointer
    Core.Intrinsics.atomic_pointerswap(pointer::Ptr{T}, new::T, order::Symbol) --> old
    Core.Intrinsics.atomic_pointermodify(pointer::Ptr{T}, function::(old::T,arg::S)->T, arg::S, order::Symbol) --> old
    Core.Intrinsics.atomic_pointerreplace(pointer::Ptr{T}, expected::Any, new::T, success_order::Symbol, failure_order::Symbol) --> (old, cmp)

Warning

The following APIs are deprecated, though support for them is likely to remain for several releases.

Base.Threads.Atomic — Type

    Threads.Atomic{T}

Holds a reference to an object of type T, ensuring that it is only accessed atomically, i.e. in a thread-safe manner.

Only certain “simple” types can be used atomically, namely the primitive boolean, integer, and floating-point types. These are Bool, Int8...Int128, UInt8...UInt128, and Float16...Float64.

New atomic objects can be created from non-atomic values; if none is specified, the atomic object is initialized with zero.

Atomic objects can be accessed using the [] notation:

Examples

    julia> x = Threads.Atomic{Int}(3)
    Base.Threads.Atomic{Int64}(3)

    julia> x[] = 1
    1

    julia> x[]
    1

Atomic operations use an atomic_ prefix, such as atomic_add!, atomic_xchg!, etc.


Base.Threads.atomic_cas! — Function

    Threads.atomic_cas!(x::Atomic{T}, cmp::T, newval::T) where T

Atomically compare-and-set x

Atomically compares the value in x with cmp. If equal, write newval to x. Otherwise, leaves x unmodified. Returns the old value in x. By comparing the returned value to cmp (via ===) one knows whether x was modified and now holds the new value newval.

For further details, see LLVM’s cmpxchg instruction.

This function can be used to implement transactional semantics. Before the transaction, one records the value in x. After the transaction, the new value is stored only if x has not been modified in the meantime.

Examples

    julia> x = Threads.Atomic{Int}(3)
    Base.Threads.Atomic{Int64}(3)

    julia> Threads.atomic_cas!(x, 4, 2);

    julia> x
    Base.Threads.Atomic{Int64}(3)

    julia> Threads.atomic_cas!(x, 3, 2);

    julia> x
    Base.Threads.Atomic{Int64}(2)
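The transactional pattern described for atomic_cas! can be sketched as a retry loop. The helper atomic_mul! below is hypothetical; Threads has no such function. It records the current value, computes the update, and stores it only if x still holds the recorded value.

```julia
# Hypothetical helper: atomically multiply x by factor. Returns the old value.
function atomic_mul!(x::Threads.Atomic{Int}, factor::Int)
    while true
        seen = x[]                                          # record the current value
        prev = Threads.atomic_cas!(x, seen, seen * factor)  # store only if unchanged
        prev === seen && return seen                        # success; otherwise retry
    end
end

acc = Threads.Atomic{Int}(1)
Threads.@threads for i in 1:10
    atomic_mul!(acc, 2)
end
product = acc[]
```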


Base.Threads.atomic_xchg! — Function

    Threads.atomic_xchg!(x::Atomic{T}, newval::T) where T

Atomically exchange the value in x

Atomically exchanges the value in x with newval. Returns the old value.

For further details, see LLVM’s atomicrmw xchg instruction.

Examples

    julia> x = Threads.Atomic{Int}(3)
    Base.Threads.Atomic{Int64}(3)

    julia> Threads.atomic_xchg!(x, 2)
    3

    julia> x[]
    2


Base.Threads.atomic_add! — Function

    Threads.atomic_add!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

Atomically add val to x

Performs x[] += val atomically. Returns the old value. Not defined for Atomic{Bool}.

For further details, see LLVM’s atomicrmw add instruction.

Examples

    julia> x = Threads.Atomic{Int}(3)
    Base.Threads.Atomic{Int64}(3)

    julia> Threads.atomic_add!(x, 2)
    3

    julia> x[]
    5


Base.Threads.atomic_sub! — Function

    Threads.atomic_sub!(x::Atomic{T}, val::T) where T <: ArithmeticTypes

Atomically subtract val from x

Performs x[] -= val atomically. Returns the old value. Not defined for Atomic{Bool}.

For further details, see LLVM’s atomicrmw sub instruction.

Examples

    julia> x = Threads.Atomic{Int}(3)
    Base.Threads.Atomic{Int64}(3)

    julia> Threads.atomic_sub!(x, 2)
    3

    julia> x[]
    1


Base.Threads.atomic_and! — Function

    Threads.atomic_and!(x::Atomic{T}, val::T) where T

Atomically bitwise-and x with val

Performs x[] &= val atomically. Returns the old value.

For further details, see LLVM’s atomicrmw and instruction.

Examples

    julia> x = Threads.Atomic{Int}(3)
    Base.Threads.Atomic{Int64}(3)

    julia> Threads.atomic_and!(x, 2)
    3

    julia> x[]
    2


Base.Threads.atomic_nand! — Function

    Threads.atomic_nand!(x::Atomic{T}, val::T) where T

Atomically bitwise-nand (not-and) x with val

Performs x[] = ~(x[] & val) atomically. Returns the old value.

For further details, see LLVM’s atomicrmw nand instruction.

Examples

    julia> x = Threads.Atomic{Int}(3)
    Base.Threads.Atomic{Int64}(3)

    julia> Threads.atomic_nand!(x, 2)
    3

    julia> x[]
    -3


Base.Threads.atomic_or! — Function

    Threads.atomic_or!(x::Atomic{T}, val::T) where T

Atomically bitwise-or x with val

Performs x[] |= val atomically. Returns the old value.

For further details, see LLVM’s atomicrmw or instruction.

Examples

    julia> x = Threads.Atomic{Int}(5)
    Base.Threads.Atomic{Int64}(5)

    julia> Threads.atomic_or!(x, 7)
    5

    julia> x[]
    7


Base.Threads.atomic_xor! — Function

    Threads.atomic_xor!(x::Atomic{T}, val::T) where T

Atomically bitwise-xor (exclusive-or) x with val

Performs x[] ⊻= val atomically. Returns the old value.

For further details, see LLVM’s atomicrmw xor instruction.

Examples

    julia> x = Threads.Atomic{Int}(5)
    Base.Threads.Atomic{Int64}(5)

    julia> Threads.atomic_xor!(x, 7)
    5

    julia> x[]
    2


Base.Threads.atomic_max! — Function

    Threads.atomic_max!(x::Atomic{T}, val::T) where T

Atomically store the maximum of x and val in x

Performs x[] = max(x[], val) atomically. Returns the old value.

For further details, see LLVM’s atomicrmw max instruction.

Examples

    julia> x = Threads.Atomic{Int}(5)
    Base.Threads.Atomic{Int64}(5)

    julia> Threads.atomic_max!(x, 7)
    5

    julia> x[]
    7


Base.Threads.atomic_min! — Function

    Threads.atomic_min!(x::Atomic{T}, val::T) where T

Atomically store the minimum of x and val in x

Performs x[] = min(x[], val) atomically. Returns the old value.

For further details, see LLVM’s atomicrmw min instruction.

Examples

    julia> x = Threads.Atomic{Int}(7)
    Base.Threads.Atomic{Int64}(7)

    julia> Threads.atomic_min!(x, 5)
    7

    julia> x[]
    5


Base.Threads.atomic_fence — Function

    Threads.atomic_fence()

Insert a sequential-consistency memory fence

Inserts a memory fence with sequentially-consistent ordering semantics. There are algorithms where this is needed, i.e. where an acquire/release ordering is insufficient.

This is likely a very expensive operation. Given that all other atomic operations in Julia already have acquire/release semantics, explicit fences should not be necessary in most cases.

For further details, see LLVM’s fence instruction.


ccall using a libuv threadpool (Experimental)

Base.@threadcall — Macro

    @threadcall((cfunc, clib), rettype, (argtypes...), argvals...)

The @threadcall macro is called in the same way as ccall but does the work in a different thread. This is useful when you want to call a blocking C function without causing the current Julia thread to become blocked. Concurrency is limited by the size of the libuv thread pool, which defaults to 4 threads but can be increased by setting the UV_THREADPOOL_SIZE environment variable and restarting the Julia process.

Note that the called function should never call back into Julia.
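As a rough sketch of the blocking-call use case, the example below calls the POSIX usleep function through @threadcall while the calling task keeps running. It assumes a POSIX system where usleep is available in the process's C library. The function name blocking_call_demo is hypothetical.

```julia
# POSIX-only sketch: usleep(3) blocks its calling thread for the given number
# of microseconds and returns 0 on success.
function blocking_call_demo()
    t = @async @threadcall(:usleep, Cint, (Cuint,), 200_000)
    ticks = 0
    while !istaskdone(t)   # this task keeps running while usleep blocks elsewhere
        ticks += 1
        sleep(0.01)
    end
    return fetch(t), ticks
end

status, ticks = blocking_call_demo()
```

With a plain ccall, the calling thread would be blocked inside usleep for the full duration instead of being free to run other tasks.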


Low-level synchronization primitives

These building blocks are used to create the regular synchronization objects.

Base.Threads.SpinLock — Type

    SpinLock()

Create a non-reentrant, test-and-test-and-set spin lock. Recursive use will result in a deadlock. This kind of lock should only be used around code that takes little time to execute and does not block (e.g. by performing I/O). In general, ReentrantLock should be used instead.

Each lock must be matched with an unlock. If !islocked(lck::SpinLock) holds, trylock(lck) succeeds unless there are other tasks attempting to hold the lock “at the same time.”

Test-and-test-and-set spin locks are quickest up to about 30 contending threads. If you have more contention than that, different synchronization approaches should be considered.
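A minimal usage sketch follows, assuming a very short, non-blocking critical section, which is the case a spin lock is intended for. The function name spin_sum is hypothetical. Each lock call is matched with an unlock in a finally clause.

```julia
# Hypothetical helper: sum 1:n under a spin lock.
function spin_sum(n::Int)
    lk = Threads.SpinLock()
    total = Ref(0)
    Threads.@threads for i in 1:n
        lock(lk)
        try
            total[] += i     # short critical section: no I/O, no yielding
        finally
            unlock(lk)       # every lock is matched with an unlock
        end
    end
    return total[]
end

spin_result = spin_sum(1000)
```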
