Lock-free many-producer/single-consumer patterns: A work queue of identical non-coalescable events


Onward with our miniseries on lock-free many-producer/single-consumer patterns. Today, we're going to look at the case where you have a work queue where there can be multiple work items, and you need to perform them all, but each item is identical.

For example, you may have a Buy It button. Each time the user clicks the button, you want to run a transaction. But each button press is equivalent; all that's important is the number of times the user pushed the button.

Okay, that's not a very good example, but it'll have to do.

One way of doing this is with a semaphore, where the number of tokens in the semaphore is the number of work items left to be done. But let's stick with our current pattern where the producers need to manually wake the consumer, say with a message, and we want to minimize the number of times we need to perform the wake ritual.

LONG WorkCount;

// Defined below.
bool InterlockedDecrementToZero(LONG volatile* value);

void RequestWork()
{
 if (InterlockedIncrement(&WorkCount) == 1) {
  // You provide the WakeConsumer() function.
  WakeConsumer();
 }
}

// You call this function when the consumer receives the
// signal raised by WakeConsumer().
void ConsumeWork()
{
 while (InterlockedDecrementToZero(&WorkCount)) {
  DoSomeWork();
 }
}

bool InterlockedDecrementToZero(LONG volatile* value)
{
 LONG original, result;
 do {
  original = *value;
  if (original == 0) return false;
  result = original - 1;
 } while (InterlockedCompareExchange(value, result,
                               original) != original);
 return true;
}

The InterlockedDecrementToZero function follows the pattern for building complex interlocked operations, in this case, decrementing a number, but not decrementing it below zero. We check if the value is zero; if so, then stop and return false. Otherwise, try to swap it with the value one less than the current value. If that fails, then it means that another thread changed the WorkCount while we were busy thinking, so we start over. If we successfully decremented, then return true.
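For readers who want to try the retry loop away from Windows, here is a minimal sketch using `std::atomic<long>` as a stand-in for the Interlocked functions. The names `DecrementToZero` and `CountSuccessfulDecrements` are made up for illustration and are not part of the original code.

```cpp
#include <atomic>
#include <cassert>

// Portable stand-in for the Win32 version (an assumption of this sketch):
// decrement the counter unless it is already zero.
// Returns true if a decrement happened.
bool DecrementToZero(std::atomic<long>& value)
{
    long original = value.load();
    do {
        if (original == 0) return false;  // nothing left to claim
        // If the exchange fails, compare_exchange_weak reloads `original`
        // with the current value, so the next pass checks fresh data.
    } while (!value.compare_exchange_weak(original, original - 1));
    return true;
}

// Hypothetical helper: starting from `start`, try to decrement `attempts`
// times and report how many of those attempts succeeded.
int CountSuccessfulDecrements(long start, int attempts)
{
    std::atomic<long> counter(start);
    int successes = 0;
    for (int i = 0; i < attempts; i++) {
        if (DecrementToZero(counter)) successes++;
    }
    assert(counter.load() >= 0);  // the counter never goes below zero
    return successes;
}
```

Calling `CountSuccessfulDecrements(3, 5)` reports 3: the first three attempts each claim one unit, and the last two find the counter already at zero and back off.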

The WorkCount variable remembers how much work there is for the consumer to do. When the first piece of work arrives, we wake the consumer, and the consumer keeps draining the work until it's all done.

Remember, there is only one consumer, so if WakeConsumer is called while ConsumeWork is still running, the wake will not start a new consumer immediately. It will wait for the existing ConsumeWork to complete before starting a new ConsumeWork.
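To watch the wake-minimizing behavior in action, here is a single-threaded simulation of the pattern, again using `std::atomic<long>` in place of the Interlocked functions. The counters `g_wakeCount` and `g_workDone`, the stub bodies of `WakeConsumer` and `DoSomeWork`, and the helper `WakesForBurst` are all assumptions made for the demonstration. A real program would have producers on other threads and a real wake mechanism.

```cpp
#include <atomic>
#include <cassert>

std::atomic<long> g_workCount(0);
int g_wakeCount = 0;  // how many times the wake ritual was performed
int g_workDone = 0;   // how many work items the consumer processed

void WakeConsumer() { g_wakeCount++; }  // stub: just count the wakes
void DoSomeWork() { g_workDone++; }     // stub: just count the items

void RequestWork()
{
    // fetch_add returns the previous value, so "previous == 0" matches
    // "InterlockedIncrement returned 1" in the original code.
    if (g_workCount.fetch_add(1) == 0) {
        WakeConsumer();
    }
}

bool DecrementToZero(std::atomic<long>& value)
{
    long original = value.load();
    do {
        if (original == 0) return false;
    } while (!value.compare_exchange_weak(original, original - 1));
    return true;
}

void ConsumeWork()
{
    while (DecrementToZero(g_workCount)) {
        DoSomeWork();
    }
}

// Hypothetical helper: issue `requests` back-to-back requests, drain them
// once, and report how many wakes that burst cost.
int WakesForBurst(int requests)
{
    g_workCount = 0;
    g_wakeCount = 0;
    g_workDone = 0;
    for (int i = 0; i < requests; i++) {
        RequestWork();
    }
    ConsumeWork();
    assert(g_workDone == requests);  // every request was performed
    return g_wakeCount;
}
```

A burst of five requests costs one wake, not five: only the request that moves the count from zero to one performs the wake ritual, and the single drain handles all five items.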

Although this specific pattern may not be all that interesting, it can be viewed as a building block on top of which other patterns are built. We'll look at one such next time.

Exercise: Why couldn't the InterlockedDecrementToZero function have been written like this?

// This code is wrong.
bool InterlockedDecrementToZero(LONG volatile* value)
{
 LONG original = *value;
 if (original == 0) return false;
 InterlockedDecrement(value);
 return true;
}

Bonus chatter: We could have avoided having to write the InterlockedDecrementToZero function by writing this instead:

void ConsumeWork()
{
 LONG count = InterlockedExchange(&WorkCount, 0);
 for (LONG i = 0; i < count; i++) {
  DoSomeWork();
 }
}
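As a starting point for the discussion, here is a single-threaded simulation of the exchange-based variant in which one extra request arrives while the consumer is in the middle of its batch. It uses `std::atomic<long>` in place of the Interlocked functions. The names `ConsumeWorkBatch`, `g_lateRequests`, and `WakesWithLateArrival`, along with the stub bodies, are assumptions made for the demonstration.

```cpp
#include <atomic>
#include <cassert>

std::atomic<long> g_workCount(0);
int g_wakeCount = 0;     // how many times the wake ritual was performed
int g_workDone = 0;      // how many work items were processed
int g_lateRequests = 0;  // requests to inject while the consumer is busy

void WakeConsumer() { g_wakeCount++; }  // stub: just count the wakes

void RequestWork()
{
    if (g_workCount.fetch_add(1) == 0) {
        WakeConsumer();
    }
}

void DoSomeWork()
{
    g_workDone++;
    // Simulate a producer that shows up while this item is being processed.
    if (g_lateRequests > 0) {
        g_lateRequests--;
        RequestWork();
    }
}

// The exchange-based variant: claim everything at once, then work it off.
void ConsumeWorkBatch()
{
    long count = g_workCount.exchange(0);
    for (long i = 0; i < count; i++) {
        DoSomeWork();
    }
}

// Hypothetical scenario: three requests, then a drain during which one
// more request arrives, then the drain triggered by the second wake.
int WakesWithLateArrival()
{
    g_workCount = 0;
    g_wakeCount = 0;
    g_workDone = 0;
    g_lateRequests = 1;
    RequestWork();
    RequestWork();
    RequestWork();
    ConsumeWorkBatch();  // claims 3; the late request finds the count at 0
    assert(g_workDone == 3);
    ConsumeWorkBatch();  // the drain for the second wake picks up 1 item
    assert(g_workDone == 4);
    return g_wakeCount;
}
```

In this scenario the batch version performs two wakes, because the late request finds the count already reset to zero. With the one-at-a-time version from the article, the same late request would find a nonzero count and skip the wake, and the running consumer would pick up the extra item in the same pass. Whether that extra wake matters depends on how expensive the wake ritual is compared to the work itself.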

Discuss.

Comments (21)
  1. Doug says:

    Exercise: Is the answer that if two threads call (the bad version of) InterlockedDecrementToZero() at the same time, you could 'skip' 0? If value is currently 1, both threads pass the check against 0, then both threads call InterlockedDecrement(), and you end up with value being -1.
    The correct implementation avoids this with the loop & guard - 'only apply the decrement if it will result in one less than the value when I started my thinking.'

  2. Chris B says:

    For the exercise, the write is atomic, but the read & decrement are not an atomic unit. This leaves the possibility that thread A and thread B both read 1 into "original" , so the "return false" is skipped, and both execute the decrement, leaving -1 in *value.

    Side note - shouldn't the InterlockedExchange call in the bonus chatter be:
    LONG count = InterlockedExchange(&WorkCount, 0);

    1. PetSerAl says:

      I agree with you about a general-purpose InterlockedDecrementToZero. But in the given example we have only one consumer, so InterlockedDecrementToZero is invoked on only one thread at a time.

      1. I was writing a general purpose InterlockedDecrementToZero, because somebody is going to take this function and use it in a general-purpose way, unaware that there is an implicit assumption that there is only one consumer.

  3. Pierre B. says:

    As far as the alternative is concerned, the main difference depends on how long doing one work item takes vs. how long a sleep/wake cycle takes. (I'm assuming that the exchange functions are extremely fast compared to those two, as they should be.) If the work is short, then there is only a small probability that another work item gets queued, so fetching all the counts at once makes sense. If the work is longer, it might be better to decrement the value one by one so that if a producer comes in during the work, the worker will see the new work immediately, avoiding a sleep/wake (or post/get message).

    (OTOH, I find the second version simpler, which given how hard multi-threaded code is, is a net win in my opinion. That DecrementToZero function is easy to get wrong.)

    1. I think you could avoid the redundant sleep/wake cycle by having a second interlocked variable using the "refresh" model. But the overhead of doing the multiple decrements is probably trivial in almost all cases, so the added complexity would only be worth it if you can combine multiple operations together, e.g., a `DoSomeWork(n)` function which is faster than calling `DoSomeWork(1)` n times.

  4. DohyunShin says:

    Hi Chen,
    I have read your post "Querying information from an Explorer window" and it was helpful for me. Thank you.

    I wanted to ask you a question, but I couldn't write a comment on the post "Querying information from an Explorer window" because its comments are closed.
    Furthermore, I couldn't find your e-mail address, so I am writing this comment on this post. Sorry.

    Anyway, my question is: How can I get the full paths of the files selected (or focused) by the user's click or keyboard in all currently open File Open dialogs?
    This is the same as the content of the post "Querying information from an Explorer window". The difference is that the subject is the File Open dialog instead of Explorer windows.
    What should I do for this?

    I'm trying to implement it.
    Currently, I get the window handles of all currently open File Open dialogs using EnumWindows().
    I plan to get the item ListView handle of each File Open dialog using EnumChildWindows().
    Then I'll get the name of the selected (or focused) file using ListView macros (like ListView_GetItemText or ListView_GetSelectedCount).
    But my plan has a problem: it gets only the file name, not the full path.

    Do you have other solutions for this?

  5. Max says:

    Could it have been written like this ?

    if(InterlockedDecrement(value) == 0) return false;
    else return true;

  6. laonianren says:

    If there's only one consumer, how can WorkCount be decremented by two threads simultaneously?

    Also, the consumer may see the WorkCount as zero because the read isn't synchronized, so the work won't get done. The variable is volatile but the meaning of that depends on the language (C or C++, which version?), the compiler and even the compiler options!

    https://msdn.microsoft.com/en-us/library/jj204392.aspx

    1. Typo. Should be "if another thread changed the WorkCount while we were busy thinking." Fixed.

  7. alegr1 says:

    >Each time the user clicks the button, you want to run a transaction

    "Customer service! I wanted to only buy one piece, but double-clicked the button by mistake, and it charged me for two!"

  8. Since there is only a single consumer, I don't see why you couldn't use a more efficient decrement-to-zero function:

    LONG SingleConsumerInterlockedDecrementToZero(LONG volatile* value)
    {
     LONG original = *value;
     if (original == 0) return false;
     return (InterlockedDecrement(value) == 0);
    }

    1. Neil says:

      I can't see how InterlockedDecrement can return zero - you just checked for it, and nothing else decrements it. (But then that reduces to the case which Raymond says is wrong, so I'm none the wiser.)

      1. InterlockedDecrement returns the new value, not the original value, so it will return zero if the counter started out as one and doesn't get incremented in the meantime. Of course, checking the new value is logically incorrect, so my version of the code is wrong - not because of a threading issue, it just does the wrong thing, skipping one work item every sleep/wake cycle.

        If I understand Raymond's latest comment correctly, the code posted as wrong is only wrong because it was intended to be a general-purpose InterlockedDecrementToZero. It will work perfectly in the single-consumer case.

    2. MG says:

      This won't work because (InterlockedDecrement(value) == 0) returns false when the value is larger than one.

      However, Raymond's InterlockedDecrementToZero with InterlockedDecrement should work correctly, if there is a guarantee that there exists only one consumer and WakeConsumer() just signals the existing consumer. The consumer is the only thread to decrement the value and it knows that the value is larger than zero so it can safely do atomic decrement. It does not matter that the read and decrement are separate because there is no one else to decrement the value and the decrement itself is atomic and does not use the original value that was read. The code would be wrong if the decrementation were done by writing "value = (original - 1);", because the original value may have been incremented after it was read.

      The first InterlockedDecrementToZero with InterlockedCompareExchange is needed when there is more than one consumer.

      I would write ConsumeWork() a bit differently, though. In ConsumeWork() I would ensure that the WorkCount is not zero (although it should never be) and then I would first DoSomeWork() and then decrement the WorkCount. If there were any work left (i.e. WorkCount > 0 after decrementation), I would loop back to DoSomeWork() again. In InterlockedDecrementToZero() I would check the value after the decrementation and return true if the value were non-zero.

      void ConsumeWork()
      {
       if (InterlockedCompareExchange(&WorkCount, 0, 0) > 0) {
        do {
         DoSomeWork();
        } while (InterlockedDecrement(&WorkCount) > 0);
       }
      }

      This implementation decreases the number of unneeded WakeConsumer() calls, because the WorkCount is always non-zero when a consumer is running and doing work. The consumer decrements the WorkCount to zero just before it is about to exit so even if WakeConsumer() were to start a new consumer thread there would be no more than one active consumer running at a time.

    3. True, we can simplify if there is a single consumer, but I was looking at a generalized InterlockedDecrementToZero function.

      1. Karellen says:

        Ah, that's why I couldn't figure out the exercise! That was really bugging me. Thanks for clarifying.

      2. Pierre B. says:

        Actually, I thought the bug was that reading a variable without a read/write barrier is not guaranteed to be correct? The Interlocked functions act as the barriers and prevent weird things happening between multiple processors with incoherent caches.

  9. ismo says:

    Why limit consumers to one? I would limit them to the number of cores (or one fewer) if performance is wanted. Or monitor CPU load and aim for near 100%. But that would complicate things, and perhaps this would use a thread pool.

    1. I believe Raymond is assuming a scenario where multi-threading the consumer doesn't make sense. For example, the consumer might be updating the GUI, which is something you pretty much never want to multi-thread; if the consumer deals with a memory structure that needs to be kept consistent, and does not require a lot of CPU, single-threading can often be more efficient than multi-threading; or perhaps the consumer communicates with a remote service which only allows you a single connection (for whatever reason).

      In at least some cases, the whole point of using the producer-consumer pattern is to interface between a task that is naturally multi-threaded and a task that is naturally single-threaded. Often, if the consumer could sensibly be multi-threaded, there'd be no need for the queue in the first place - you could just consume the data in the same thread that produced it.

  10. I'm tempting fate here after yesterday's blunder, but I think this even simpler approach would also work:

    void ConsumeWork()
    {
     if (WorkCount > 0) do
     {
      DoSomeWork();
     } while (InterlockedDecrement(&WorkCount) > 0);
    }

Comments are closed.
