The Many-to-One Parallel Signal Sending Optimization
This blog post discusses the parallel signal sending optimization that recently got merged into the master branch (scheduled to be included in Erlang/OTP 25). The optimization improves signal sending throughput when several processes send signals to a single process simultaneously on multicore machines. At the moment, the optimization is only active when one configures the receiving process with the `{message_queue_data, off_heap}` setting. The following figure gives an idea of what type of scalability improvement the optimization can give in extreme scenarios (number of Erlang processes sending signals on the x-axis and throughput on the y-axis):
This blog post aims to give you an understanding of how signal sending on a single node is implemented in Erlang and how the new optimization can yield the impressive scalability improvement illustrated in the figure above. Let us begin with a brief introduction to what Erlang signals are.
Erlang Signals #
All concurrently executing entities (processes, ports, etc.) in an Erlang system communicate using asynchronous signals. The most common signal is the normal message, typically sent between processes with the bang (`!`) operator. As Erlang takes pride in being a concurrent programming language, it is, of course, essential that signals are sent efficiently between different entities. Let us now discuss the guarantees Erlang programmers get about signal ordering, as this will help when learning how the new optimization works.
The Signal Ordering Guarantee #
The signal ordering guarantee is described in the Erlang documentation like this:
“The only signal ordering guarantee given is the following: if an entity sends multiple signals to the same destination entity, the order is preserved; that is, if `A` sends a signal `S1` to `B`, and later sends signal `S2` to `B`, `S1` is guaranteed not to arrive after `S2`.”
This guarantee means that if multiple processes send signals to a single process, all signals from the same process are received in the order they were sent. Still, there is no ordering guarantee for two signals coming from two distinct processes. One should not think of signal sending as instantaneous: there can be an arbitrary delay between when a signal is sent and when it reaches its destination, but all signals from `A` to `B` travel on the same path and cannot pass each other.
The guarantee has deliberately been designed to allow for efficient implementations and future optimizations. However, as we will see in the next section, before the optimization presented in this blog post, the implementation did not take advantage of the permissive ordering guarantee for signals sent between processes running on the same node.
Single-Node Process-to-Process Implementation before the Optimization #
Before the optimization, the Erlang VM conceptually organized the data structure for an Erlang process as in the following figure:
Of course, this is an extreme simplification of the Erlang process structure, but it is enough for our explanation. When a process has the `{message_queue_data, off_heap}` setting activated, the following algorithm is executed to send a signal (a C sketch follows the list):
- Allocate a new linked list node containing the signal data
- Acquire the `OuterSignalQueueLock` in the receiving process
- Insert the new node at the end of the `OuterSignalQueue`
- Release the `OuterSignalQueueLock`
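To make these steps concrete, here is a minimal C sketch of this send path. The types, field names, and the use of `pthread` mutexes are illustrative assumptions for this post; the real ERTS code differs considerably.

```c
#include <pthread.h>
#include <stdlib.h>

/* Hypothetical, simplified process structure. The field names follow the
 * figures in this post, not the actual ERTS source. Both tail pointers are
 * assumed to be initialized to point at their head field. */
typedef struct SignalNode {
    struct SignalNode *next;
    void *signal_data;              /* payload stored outside the main heap */
} SignalNode;

typedef struct {
    pthread_mutex_t outer_lock;     /* OuterSignalQueueLock */
    SignalNode *outer_head;         /* OuterSignalQueue */
    SignalNode **outer_tail;        /* points at the last next field */
    SignalNode *inner_head;         /* InnerSignalQueue (receiver-private) */
    SignalNode **inner_tail;
} ProcessSig;

void send_signal_off_heap(ProcessSig *receiver, void *signal_data)
{
    SignalNode *node = malloc(sizeof(SignalNode)); /* 1. allocate the node */
    node->next = NULL;
    node->signal_data = signal_data;

    pthread_mutex_lock(&receiver->outer_lock);     /* 2. acquire the lock  */
    *receiver->outer_tail = node;                  /* 3. append to the end */
    receiver->outer_tail = &node->next;
    pthread_mutex_unlock(&receiver->outer_lock);   /* 4. release the lock  */
}
```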
When a receiving process has run out of signals in its `InnerSignalQueue` and/or wants to check if there are more signals in the outer queue, the following algorithm is executed (sketched in code after the list):

- Acquire the `OuterSignalQueueLock`
- Append the `OuterSignalQueue` to the end of the `InnerSignalQueue`
- Release the `OuterSignalQueueLock`
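Continuing the hypothetical types from the previous sketch, fetching amounts to an O(1) splice of the outer queue onto the inner one:

```c
/* Sketch only, continuing the types above: move everything from the shared
 * outer queue to the receiver-private inner queue in constant time. */
void fetch_signals(ProcessSig *p)
{
    pthread_mutex_lock(&p->outer_lock);
    if (p->outer_head != NULL) {
        *p->inner_tail = p->outer_head;  /* splice the outer list onto inner */
        p->inner_tail = p->outer_tail;
        p->outer_head = NULL;            /* reset the outer queue */
        p->outer_tail = &p->outer_head;
    }
    pthread_mutex_unlock(&p->outer_lock);
}
```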
How signal sending works when the receiving process is configured with `{message_queue_data, on_heap}` is not so relevant to the main topic of this blog post. Still, understanding how `{message_queue_data, on_heap}` works will also give you an understanding of why the parallel signal queue optimization is not enabled when a process is configured with `{message_queue_data, on_heap}` (which is the default setting), so here is the algorithm for sending a signal to such a process (again with a sketch after the list):
- Try to acquire the `MainProcessLock` with a `try_lock` call
- If the `try_lock` call succeeded:
  - Allocate space for the signal data on the process’ main heap area and copy the signal data there
  - Allocate a linked list node containing a pointer to the process-heap-allocated signal data
  - Acquire the `OuterSignalQueueLock`
  - Insert the linked list node at the end of the `OuterSignalQueue`
  - Release the `OuterSignalQueueLock`
  - Release the `MainProcessLock`
- Else:
  - Allocate a new linked list node containing the signal data
  - Acquire the `OuterSignalQueueLock`
  - Insert the new node at the end of the `OuterSignalQueue`
  - Release the `OuterSignalQueueLock`
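The shape of that algorithm, in the same hypothetical C style as before (`copy_to_receiver_heap` and the `main_lock` parameter are stand-ins; the actual ERTS locking and copying machinery is more involved):

```c
#include <stddef.h>

/* Hypothetical helper: copies the payload onto the receiver's main heap.
 * Stands in for the real copying machinery in ERTS. */
void *copy_to_receiver_heap(ProcessSig *receiver, void *data, size_t size);

void send_signal_on_heap(ProcessSig *receiver, pthread_mutex_t *main_lock,
                         void *signal_data, size_t size)
{
    if (pthread_mutex_trylock(main_lock) == 0) {
        /* Fast path: copy the payload straight onto the receiver's heap. */
        void *on_heap = copy_to_receiver_heap(receiver, signal_data, size);
        SignalNode *node = malloc(sizeof(SignalNode));
        node->next = NULL;
        node->signal_data = on_heap;

        pthread_mutex_lock(&receiver->outer_lock);
        *receiver->outer_tail = node;
        receiver->outer_tail = &node->next;
        pthread_mutex_unlock(&receiver->outer_lock);

        /* The MainProcessLock must be held until the node is enqueued;
         * otherwise a garbage collection could reclaim the copied data. */
        pthread_mutex_unlock(main_lock);
    } else {
        /* Slow path: behave like the off_heap send above. */
        send_signal_off_heap(receiver, signal_data);
    }
}
```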
The advantage of `{message_queue_data, on_heap}` compared to `{message_queue_data, off_heap}` is that the signal data is copied directly to the receiving process’ main heap (when the `try_lock` call for the `MainProcessLock` succeeds). The disadvantage of `{message_queue_data, on_heap}` is that the sender creates extra contention on the receiver’s `MainProcessLock`. Notice that we cannot simply release the `MainProcessLock` directly after allocating the data on the receiver’s process heap. If a garbage collection happened before the signal had been inserted into the signal queue, the signal data would be lost (holding the `MainProcessLock` prevents a garbage collection from happening). Therefore, `{message_queue_data, off_heap}` provides much better scalability than `{message_queue_data, on_heap}` when multiple processes send signals to the same process concurrently on a multicore system.
However, even though `{message_queue_data, off_heap}` scales better than `{message_queue_data, on_heap}` with the old implementation, signal senders still had to acquire the `OuterSignalQueueLock` for a short time. This lock can become a scalability bottleneck and a contended hot spot when there are enough parallel senders. This is why we saw very poor scalability and even a slowdown for the old implementation in the benchmark figure above. Now, we are ready to look at the new optimization.
The Parallel Signal Sending Optimization #
The optimization takes advantage of Erlang’s permissive signal ordering guarantee discussed above. It is enough to keep the order of signals coming from the same entity to ensure that the signal ordering guarantee holds. So there is no need for different senders to synchronize with each other! In theory, signal sending could therefore be parallelized perfectly. In practice, however, there is only one thread of execution that handles incoming signals, so we also have to keep in mind that we do not want to slow down the receiver, and ideally we want to make receiving signals faster. As signal queue data is stored outside the process’ main heap area when the `{message_queue_data, off_heap}` setting is enabled, the garbage collector does not need to go through the whole signal queue, giving better performance for processes with a lot of signals in their signal queue. Therefore, it is also important for the optimization not to add unnecessary overhead when the `OuterSignalQueueLock` is uncontended, so that we do not slow down existing use cases for `{message_queue_data, off_heap}` too much.
Data Structure and Bird’s-Eye View of the Optimized Implementation #
We decided to go for a design that enables the parallel signal sending optimization on demand, when contention on the `OuterSignalQueueLock` appears to be high, to avoid as much overhead as possible when the optimization is unnecessary. Here is a conceptual view of the process structure when the optimization is not active (which is the initial state when creating a process with `{message_queue_data, off_heap}`):
The following figure shows a conceptual view of the process structure when the parallel signal sending optimization is turned on. The only difference from the previous figure is that the `OuterSignalQueueBufferArray` field now points to a structure containing an array of buffers.
When the parallel signal sending optimization is active, senders do not need to acquire the `OuterSignalQueueLock` anymore. Senders are mapped to a slot in the `OuterSignalQueueBufferArray` by a simple hash function applied to the process ID (senders without a process ID are currently mapped to the same slot). Before a sender takes the `OuterSignalQueueLock` in the receiving process’ structure, it tries to enqueue in its slot in the `OuterSignalQueueBufferArray` (if the array exists). If the enqueue attempt succeeds, the sender can continue without even touching the `OuterSignalQueueLock`! The order of signals coming from the same sender is maintained because the same sender is always mapped to the same slot in the buffer array. Now you probably have an idea of why the signal sending throughput can increase so much with the new optimization, as we saw in the benchmark figure presented earlier. Essentially, the contention on the `OuterSignalQueueLock` gets distributed among the slots in the `OuterSignalQueueBufferArray`. The rest of the subsections in this section cover details of the implementation, so you can skip them if you do not want to dig deeper.
Adaptively Activating the Outer Signal Queue Buffers #
As the figure above tries to illustrate, the `OuterSignalQueueLock` carries a statistics counter. When that statistics counter reaches a certain threshold, the new parallel signal sending optimization is activated by installing the `OuterSignalQueueBufferArray` in the process structure. The statistics counter for the lock is updated in a simple way: when a thread tries to acquire the `OuterSignalQueueLock` and the lock is already taken, the counter is increased; otherwise, it is decreased, as the following code snippet illustrates:
```c
void erts_proc_sig_queue_lock(Process* proc)
{
    if (EBUSY == erts_proc_trylock(proc, ERTS_PROC_LOCK_MSGQ)) {
        /* The lock was taken: block on it and count the contention event. */
        erts_proc_lock(proc, ERTS_PROC_LOCK_MSGQ);
        proc->sig_inq_contention_counter += 1;
    } else if (proc->sig_inq_contention_counter > 0) {
        /* Uncontended acquisition: let the counter decay toward zero. */
        proc->sig_inq_contention_counter -= 1;
    }
}
```
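Note how this makes the activation adaptive: the counter climbs only under sustained contention, since every uncontended acquisition lets it decay again, so the buffer array ends up being installed only for processes whose signal queue lock stays contended over time.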
The Outer Signal Queue Buffer Array Structure #
Currently, the number of slots in the `OuterSignalQueueBufferArray` is fixed to 64. Sixty-four slots should go a long way toward reducing signal queue contention in most practical applications that exist today: few servers have more than 100 cores, and typical applications spend a lot of time doing other things than sending signals. Using 64 slots also allows us to implement a very efficient atomically updatable bitset containing information about which slots are currently non-empty (the `NonEmptySlots` field in the figure above). This bitset makes flushing the buffer array into the `OuterSignalQueue` more efficient, since only the non-empty slots in the buffer array need to be visited and updated to perform the flush.
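A hypothetical C layout of this structure, with field names taken from the figures in this post rather than from the actual ERTS source, could look as follows:

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <stdint.h>

#define NUM_SLOTS 64  /* one bit per slot fits in a single 64-bit word */

typedef struct {
    pthread_mutex_t lock;      /* SlotLock */
    bool is_alive;             /* IsAlive: false once the array is deactivated */
    SignalNode *head;          /* BufferQueue */
    SignalNode **tail;
    uint64_t num_enqueues;     /* NumberOfEnqueues since the last flush */
} Slot;

typedef struct {
    _Atomic uint64_t non_empty_slots;  /* NonEmptySlots bitset */
    uint64_t num_flushes;              /* NumberOfFlushes */
    uint64_t tot_num_enqueues;         /* TotNumberOfEnqueues */
    Slot slots[NUM_SLOTS];
} BufferArray;                         /* OuterSignalQueueBufferArray */

/* Map a sender's process ID to a slot index. A multiplicative hash is one
 * simple choice; the hash function used by ERTS may differ. */
static inline int slot_for(uint64_t sender_pid)
{
    return (int)((sender_pid * UINT64_C(0x9E3779B97F4A7C15)) >> 58);
}
```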
Sending Signals with the Optimization Activated #
Pseudo-code for the algorithm that is executed when a process sends a signal to another process that has the `OuterSignalQueueBufferArray` installed can be seen below, followed by a C sketch of the same steps:
- Allocate a new linked list node containing the signal data
- Map the process ID of the sender to the right slot `I` with the hash function
- Acquire the `SlotLock` for slot `I`
- Check the `IsAlive` field for slot `I`
- If the `IsAlive` field’s value is `true`:
  - Set the appropriate bit in the `NonEmptySlots` field, if the buffer is empty
  - Insert the allocated signal node at the end of the `BufferQueue` for slot `I`
  - Increase the `NumberOfEnqueues` in slot `I` by 1
  - Release the `SlotLock` for slot `I`
  - The signal is enqueued, and the thread can continue with the next task
- Else (the `OuterSignalQueueBufferArray` has been deactivated):
  - Release the lock for slot `I`
  - Do the insert into the `OuterSignalQueue` in the same way as the signal sending algorithm did it prior to the optimization
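Using the hypothetical structures sketched earlier, the fast path could look like this; when it returns `false`, the caller falls back to the pre-optimization `OuterSignalQueueLock` path:

```c
/* Sketch of the optimized send path using the structures above: enqueue in
 * the sender's slot without touching the OuterSignalQueueLock. */
bool try_buffer_enqueue(BufferArray *arr, uint64_t sender_pid, SignalNode *node)
{
    int i = slot_for(sender_pid);
    Slot *s = &arr->slots[i];

    pthread_mutex_lock(&s->lock);
    if (!s->is_alive) {                  /* array has been deactivated */
        pthread_mutex_unlock(&s->lock);
        return false;
    }
    if (s->head == NULL)                 /* buffer was empty: publish the slot */
        atomic_fetch_or(&arr->non_empty_slots, UINT64_C(1) << i);
    *s->tail = node;                     /* append to the BufferQueue */
    s->tail = &node->next;
    s->num_enqueues++;                   /* NumberOfEnqueues += 1 */
    pthread_mutex_unlock(&s->lock);
    return true;                         /* enqueued; sender can continue */
}
```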
Fetching Signals from the Outer Signal Queue Buffer Array and Deactivation of the Optimization #
The algorithm for fetching signals from the outer signal queue uses the `NonEmptySlots` field in the `OuterSignalQueueBufferArray`, so it only needs to check slots that are guaranteed to be non-empty. At a high level, the routine works according to the following pseudo-code (a code sketch of the flush loop follows the list):
- Acquire the `OuterSignalQueueLock`
- For each non-empty slot in the buffer array:
  - Lock the slot
  - Append the signals in the slot to the end of the `OuterSignalQueue`
  - Add the value of the slot’s `NumberOfEnqueues` field to the `TotNumberOfEnqueues` field in the `OuterSignalQueueBufferArray`
  - Reset the slot’s `BufferQueue` and `NumberOfEnqueues` fields
  - Unlock the slot
- Increase the value of the `NumberOfFlushes` field in the `OuterSignalQueueBufferArray` by one
- If the value of the `NumberOfFlushes` field has reached a certain threshold `T`:
  - Calculate the average number of enqueues per flush (`EnqPerFlush`) during the last `T` flushes (`TotNumberOfEnqueues / T`)
  - If `EnqPerFlush` is below a certain threshold `Q`:
    - Deactivate the parallel signal sending optimization:
      - For each slot in the `OuterSignalQueueBufferArray`:
        - Acquire the `SlotLock`
        - Append the signals in the slot (if any) to the end of the `OuterSignalQueue`
        - Set the slot’s `IsAlive` field to `false`
        - Release the `SlotLock`
      - Set the `OuterSignalQueueBufferArray` field in the process structure to `NULL`
      - Schedule deallocation of the buffer array structure
  - Else if the average is equal to or above the threshold `Q`:
    - Set the `NumberOfFlushes` and the `TotNumberOfEnqueues` fields in the buffer array struct to 0
- Append the `OuterSignalQueue` to the end of the `InnerSignalQueue`
- Reset the `OuterSignalQueue`
- Release the `OuterSignalQueueLock`
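Here is a sketch of the flush loop in the same hypothetical C style. `__builtin_ctzll` is a GCC/Clang intrinsic that finds the lowest set bit, and the adaptive deactivation bookkeeping is only hinted at:

```c
/* Sketch: splice every non-empty slot into the OuterSignalQueue, visiting
 * only the slots whose bit is set. The caller holds the OuterSignalQueueLock. */
void flush_buffers(ProcessSig *p, BufferArray *arr)
{
    uint64_t bits = atomic_exchange(&arr->non_empty_slots, 0);
    while (bits != 0) {
        int i = __builtin_ctzll(bits);   /* index of the lowest set bit */
        bits &= bits - 1;                /* clear that bit */
        Slot *s = &arr->slots[i];

        pthread_mutex_lock(&s->lock);
        if (s->head != NULL) {
            *p->outer_tail = s->head;    /* append the slot's BufferQueue */
            p->outer_tail = s->tail;
            arr->tot_num_enqueues += s->num_enqueues;
            s->head = NULL;              /* reset the slot */
            s->tail = &s->head;
            s->num_enqueues = 0;
        }
        pthread_mutex_unlock(&s->lock);
    }
    arr->num_flushes++;
    /* The adaptive deactivation check against thresholds T and Q is omitted. */
}
```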
For simplicity, many details have been left out from the pseudo-code snippets above. However, if you have understood them, you have an excellent understanding of how signal sending in Erlang works, how the new optimization is implemented, and how it automatically activates and deactivates itself. Let us now dive a little bit deeper into benchmark results for the new implementation.
Benchmark #
A configurable benchmark to measure the performance of both signal sending processes and receiving processes has been created. The benchmark lets `N` Erlang processes send signals (of configurable types and sizes) to a single process during a period of `T` seconds. Both `N` and `T` are configurable variables. A signal with size `S` has a payload consisting of a list of length `S` with word-sized (64-bit) items. The send throughput is calculated by dividing the number of signals that are sent by `T`. The receive throughput is calculated by waiting until all sent signals have been received and then dividing the total number of signals sent by the time between when the first signal was sent and when the last signal was received. The benchmark machine has 32 cores and two hardware threads per core (giving 64 hardware threads). You can find a detailed benchmark description on the signal queue benchmark page.
First, let us look at the results for very small messages (a list containing a single integer) below. The graph for the receive throughput is the same as the one we saw at the beginning of this blog post. Not surprisingly, the scalability of sending messages is much better after the optimization. More surprising is that the performance of receiving messages is also substantially improved. For example, with 16 processes, the receive throughput is 520 times better with the optimization! The improved receive throughput can be explained by the fact that, in this scenario, the receiver has to fetch messages from the outer signal queue much less often. Sending is much faster after the optimization, so the receiver brings more messages from the outer signal queue to the inner one every time it runs out of messages. The receiver can thus process messages from the inner queue for a longer time before it needs to fetch messages from the outer queue again. We cannot expect any improvement for the receiver beyond a certain point, as there is only a single hardware thread that can work on processing messages at the same time.
Below are the results for larger messages (a list containing 100 integers). We do not see as large an improvement in this scenario with the larger message size. With larger messages, the benchmark spends more time on work other than sending and receiving messages, so things like the speed of the memory system and memory allocation may become limiting factors. Still, we get a decent improvement in both send throughput and receive throughput, as seen below.
You can find results for even larger messages, as well as for non-message signals, on the benchmark page. Real Erlang applications do much more than message and signal sending, so this benchmark is, of course, not representative of the improvements real applications will see. However, the benchmarks show that we have pushed up the threshold at which parallel message sending to a single process becomes a problem. Perhaps the new optimization opens up interesting new ways of writing software that were previously impractical for performance reasons.
Possible Future Work #
Users can configure processes with `{message_queue_data, off_heap}` or `{message_queue_data, on_heap}`. This configurability increases the burden on Erlang programmers, as it can be difficult to figure out which option is better for a particular process. It would therefore make sense to also have a `{message_queue_data, auto}` option that would automatically detect lock contention even in `on_heap` mode and seamlessly switch between `on_heap` and `off_heap` based on how much contention is detected.
As discussed previously, 64 slots in the signal queue buffer array is a good start but might not be enough when servers have thousands of cores. A possible way to make the implementation even more scalable would be to make the signal queue buffer array expandable. For example, one could have contention detecting locks for each slot in the array. If the contention is high in a particular slot, one could expand this slot by creating a link to a subarray with buffers where senders can use another hash function (similar to how the HAMT data structure works).
Conclusion #
The new parallel signal queue optimization that affects processes configured with `{message_queue_data, off_heap}` yields much better scalability when multiple processes send signals to the same process in parallel. The optimization has a very low overhead when contention is low, as it is only activated when its contention detection mechanism indicates that the contention is high.