In my last post, we looked at building an AsyncLock in terms of an AsyncSemaphore. In this post, we’ll build a more advanced construct, an asynchronous reader/writer lock.
An asynchronous reader/writer lock is more complicated than any of the previous coordination primitives we’ve created. It also involves more policy, meaning there are more decisions to be made about how exactly the type should behave. For the purposes of this example, I’ve made a few decisions. First, writers take precedence over readers: if a writer is waiting, it gets priority over any number of waiting readers, even if those readers arrived first. Second, I’ve decided not to throttle readers, meaning that all waiting readers are released as soon as there are no writers outstanding or waiting. Both of these points could be debated based on the intended usage of the type, so you might choose to modify the implementation based on your needs.
Here’s the shape of the type we’ll build:
public class AsyncReaderWriterLock
{
    public AsyncReaderWriterLock();
    public Task<Releaser> ReaderLockAsync();
    public Task<Releaser> WriterLockAsync();

    public struct Releaser : IDisposable
    {
        public void Dispose();
    }
}
As with the AsyncLock, we’ll utilize a disposable Releaser to make it easy to use this type in a scoped manner, e.g.
private readonly AsyncReaderWriterLock m_lock = new AsyncReaderWriterLock();
…
using (var releaser = await m_lock.ReaderLockAsync())
{
    … // protected code here
}
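The writer lock is acquired the same way; a minimal sketch mirroring the reader usage above:
using (var releaser = await m_lock.WriterLockAsync())
{
    … // exclusive code here
}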
This Releaser is almost identical to that used in AsyncLock, except that we’re using the same type to represent both readers and writers, and since we need to behave differently based on which kind of lock is being released, I’ve parameterized the Releaser accordingly:
public struct Releaser : IDisposable
{
    private readonly AsyncReaderWriterLock m_toRelease;
    private readonly bool m_writer;

    internal Releaser(AsyncReaderWriterLock toRelease, bool writer)
    {
        m_toRelease = toRelease;
        m_writer = writer;
    }

    public void Dispose()
    {
        if (m_toRelease != null)
        {
            if (m_writer) m_toRelease.WriterRelease();
            else m_toRelease.ReaderRelease();
        }
    }
}
In terms of member variables, I need several more for this type than for the other data structures previously discussed. First, we will have fast paths in this type, so I want to cache a Task<Releaser> for reader waits that complete immediately, and one for writer waits that complete immediately.
private readonly Task<Releaser> m_readerReleaser;
private readonly Task<Releaser> m_writerReleaser;
These members will be initialized in the constructor:
public AsyncReaderWriterLock()
{
    m_readerReleaser = Task.FromResult(new Releaser(this, false));
    m_writerReleaser = Task.FromResult(new Releaser(this, true));
}
Next, I need to maintain a queue of writer waiters, one TaskCompletionSource<Releaser> for each, since I need to be able to wake them individually. I also need a TaskCompletionSource<Releaser> for my readers; however, for the readers, per our previously discussed design, when it’s time to allow a reader to run, I can allow them all to run, and therefore I just need a single TaskCompletionSource<Releaser> that all of the readers in a given group will wait on. However, since I’m maintaining a single TaskCompletionSource<Releaser> for all readers, I also need to maintain a count of how many readers are waiting, so that when I eventually wake them all, I can keep track of all of their releases and know when there are no more outstanding readers.
private readonly Queue<TaskCompletionSource<Releaser>> m_waitingWriters =
    new Queue<TaskCompletionSource<Releaser>>();
private TaskCompletionSource<Releaser> m_waitingReader =
    new TaskCompletionSource<Releaser>();
private int m_readersWaiting;
Finally, I need a variable to maintain the current status of the lock. This will be an integer, where the value of 0 means that no one has acquired the lock, a value of –1 means that a writer has acquired the lock, and a positive value means that one or more readers have acquired the lock, where the positive value indicates how many.
private int m_status;
We now have four methods to implement: ReaderLockAsync, ReaderRelease, WriterLockAsync, and WriterRelease.
ReaderLockAsync is used when a new reader wants in. After acquiring the lock on m_waitingWriters (which we’ll use across all four of these methods to ensure data consistency), we need to determine whether the reader should be allowed in immediately or should be forced to wait. Based on the policy described earlier, if there are currently no writers active or waiting, then this reader can be allowed in immediately; in that case, we increment the status (which would have either been 0, meaning no activity on the lock, or positive, meaning there are currently readers) and we return the cached reader releaser. If, however, there was an active or waiting writer, then we need to force the reader to wait, which we do by incrementing the count of the number of readers waiting, and return the m_waitingReader task (or, rather, a continuation off of the reader task, ensuring that all awaiters will be able to run concurrently rather than getting serialized).
public Task<Releaser> ReaderLockAsync()
{
    lock (m_waitingWriters)
    {
        if (m_status >= 0 && m_waitingWriters.Count == 0)
        {
            ++m_status;
            return m_readerReleaser;
        }
        else
        {
            ++m_readersWaiting;
            return m_waitingReader.Task.ContinueWith(t => t.Result);
        }
    }
}
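To see the fast path in action, here’s a minimal sketch (the test harness is my own, not part of the original post; Debug.Assert assumes a using for System.Diagnostics):
var rwl = new AsyncReaderWriterLock();

// Fast path: no writer is active or waiting, so both readers are admitted
// immediately via the cached, already-completed task (m_status goes 0 -> 1 -> 2).
Task<AsyncReaderWriterLock.Releaser> r1 = rwl.ReaderLockAsync();
Task<AsyncReaderWriterLock.Releaser> r2 = rwl.ReaderLockAsync();
Debug.Assert(r1.IsCompleted && r2.IsCompleted);
r1.Result.Dispose();
r2.Result.Dispose();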
WriterLockAsync is used when a new writer wants in. As with ReaderLockAsync, there are two cases to deal with: when the writer can be allowed in immediately, and when the writer must be forced to wait. The only time a writer can be allowed in immediately is when the lock is not currently being used at all; since a writer must be exclusive, it can’t run when there are any active readers or an active writer. So, if m_status is 0, we change the status to indicate that there’s now an active writer, and we return the cached writer releaser. Otherwise, we create a new TaskCompletionSource<Releaser> for this writer, queue it, and return its Task.
public Task<Releaser> WriterLockAsync()
{
    lock (m_waitingWriters)
    {
        if (m_status == 0)
        {
            m_status = -1;
            return m_writerReleaser;
        }
        else
        {
            var waiter = new TaskCompletionSource<Releaser>();
            m_waitingWriters.Enqueue(waiter);
            return waiter.Task;
        }
    }
}
Now we need to write the release functions, which are called when an active reader or writer completes its work and wants to release its hold on the lock. ReaderRelease needs to decrement the count of active readers and then check the current state of the lock. If it was the last active reader and there are writers waiting, then it needs to wake one of those writers and mark that the lock now has an active writer. We don’t need to check for any pending readers: if there are any waiting writers, then those writers would take priority anyway, and if there aren’t any waiting writers, then any readers that had arrived would have been allowed in immediately.
private void ReaderRelease()
{
    TaskCompletionSource<Releaser> toWake = null;

    lock (m_waitingWriters)
    {
        --m_status;
        if (m_status == 0 && m_waitingWriters.Count > 0)
        {
            m_status = -1;
            toWake = m_waitingWriters.Dequeue();
        }
    }

    if (toWake != null)
        toWake.SetResult(new Releaser(this, true));
}
Finally, we need our WriterRelease method. When a writer completes, if there are any pending writers waiting to get in, we simply dequeue and complete one of their tasks (we don’t need to update the lock’s status, since there will still be a single active writer, with one having completed and a new one having taken its place). If there aren’t any writers, but there are readers waiting, then we can complete the single task on which all of those readers are waiting; in that case, we also need to create a new single task for all subsequent readers to wait on, and we need to update our status accordingly to now indicate how many active readers there are. If there weren’t any writers or readers waiting, then we can simply reset the lock’s status.
private void WriterRelease()
{
    TaskCompletionSource<Releaser> toWake = null;
    bool toWakeIsWriter = false;

    lock (m_waitingWriters)
    {
        if (m_waitingWriters.Count > 0)
        {
            toWake = m_waitingWriters.Dequeue();
            toWakeIsWriter = true;
        }
        else if (m_readersWaiting > 0)
        {
            toWake = m_waitingReader;
            m_status = m_readersWaiting;
            m_readersWaiting = 0;
            m_waitingReader = new TaskCompletionSource<Releaser>();
        }
        else m_status = 0;
    }

    if (toWake != null)
        toWake.SetResult(new Releaser(this, toWakeIsWriter));
}
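Putting the four methods together, here’s a rough sketch (again, the harness is my own) showing the writer-priority policy in action:
var rwl = new AsyncReaderWriterLock();

var reader = await rwl.ReaderLockAsync();                                // reader active; m_status == 1
Task<AsyncReaderWriterLock.Releaser> writer = rwl.WriterLockAsync();     // queued: a reader holds the lock
Task<AsyncReaderWriterLock.Releaser> lateReader = rwl.ReaderLockAsync(); // also queued: a writer is waiting

reader.Dispose();            // ReaderRelease wakes the writer, not the later reader
(await writer).Dispose();    // WriterRelease then releases the waiting reader
(await lateReader).Dispose();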
That’s it. Now, a production implementation of such a lock would likely want to be better instrumented, throw exceptions for erroneous usage (e.g. releasing when there wasn’t anything to be released), and so forth, but this should give you a basic sense of how such an asynchronous reader/writer lock could be implemented.
Before I conclude, it’s worth highlighting that .NET 4.5 includes a related type: ConcurrentExclusiveSchedulerPair. I briefly discussed this type when describing what was new for parallelism in .NET 4.5 Developer Preview, but in short, it provides reader/writer-like scheduling for tasks (and it’s robust and has been well-tested, unlike the code in this post). Hanging off of an instance of ConcurrentExclusiveSchedulerPair are two TaskScheduler instances: ConcurrentScheduler and ExclusiveScheduler. These two schedulers collude to ensure that an “exclusive” (or writer) task may only run when no other task associated with the schedulers is running, and that one or more “concurrent” (or reader) tasks may run concurrently as long as there are no exclusive tasks. The type includes more advanced capabilities than what I’ve implemented in this post, for example being able to throttle readers (whereas in my AsyncReaderWriterLock in this post, all readers are allowed in as long as there are no writers).
You can use ConcurrentExclusiveSchedulerPair to build solutions similar to those for which you might use AsyncReaderWriterLock. For example, instead of wrapping the protected code in a block that on entrance awaits AsyncReaderWriterLock.WriterLockAsync and on exit calls the returned Releaser’s Dispose, you could instead await a task queued to the ConcurrentExclusiveSchedulerPair’s ExclusiveScheduler. ConcurrentExclusiveSchedulerPair also works well with systems layered on top of the TPL and expressed in terms of TaskScheduler, like TPL Dataflow. You can, for example, create multiple ActionBlocks that all target the same ConcurrentScheduler instance configured with a maximum concurrency level, and all of those blocks will collude to ensure they don’t go above that maximum.
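As a rough sketch of that last point (the concurrency limit of 4 and the Process method are placeholders of mine, and this assumes a reference to the TPL Dataflow library):
var pair = new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, maxConcurrencyLevel: 4);
var options = new ExecutionDataflowBlockOptions { TaskScheduler = pair.ConcurrentScheduler };

// Both blocks target the pair's ConcurrentScheduler, so together they'll
// never process more than 4 items at a time.
var blockA = new ActionBlock<int>(i => Process(i), options);
var blockB = new ActionBlock<int>(i => Process(i), options);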
However, there is a key behavior aspect of ConcurrentExclusiveSchedulerPair to keep in mind: it works at the level of a Task’s execution, and this may or may not be what you want. If you write code like the following:
Task.Factory.StartNew(async () =>
{
    … // code #1
    await SomethingAsync();
    … // code #2
}, CancellationToken.None, TaskCreationOptions.None, myExclusiveScheduler);
that could either result in one Task queued to the scheduler (if the Task returned from SomethingAsync was already complete by the time we awaited it), or in two Tasks queued to the scheduler (if the Task returned from SomethingAsync wasn’t yet complete by the time we awaited it). If it results in two tasks, then the atomicity provided by the exclusive scheduler applies to each task individually, not across them… in effect, the exclusive lock can be released while awaiting. For certain scenarios this is exactly what you want, in particular for cases where you’re using the atomicity to provide consistency while accessing in-memory data structures and where you ensure that awaits do not come in the middle of such modifications. To achieve the same behavior with AsyncReaderWriterLock, you’d need to do something like the following:
Task t = null;
using (var releaser = await m_lock.WriterLockAsync())
{
    … // code #1
    t = SomethingAsync();
}
await t;
using (var releaser = await m_lock.WriterLockAsync())
{
    … // code #2
}
That concludes my short series on building async coordination primitives. I hope you enjoyed it.
Hey, there are missing await keywords here too. The same typo as in the previous post. Unfortunately the Task class implements IDisposable, so the compiler interprets this as valid code. Would it be a reliable solution to wrap the returned Task instances with lightweight objects that have only a GetAwaiter method?
Thanks, Tamas. I'd typed that example at the end directly into Live Writer, so no compiler was involved. I've fixed the typo.
Regarding wrapping Task, you could do that if you wanted to. If you remembered to do it, it would help flag missing awaits, but then again, if you remember to do it, you might as well remember to use awaits 😉 I expect an FxCop rule or some similar analysis tool would be more reliable.
Great series, thanks. Are there plans to include any of these types natively in .NET 4.5? (Perhaps other than AsyncReaderWriterLock, which seems redundant compared to ConcurrentExclusiveSchedulerPair.)
Hi Dave-
I'm glad you enjoyed the posts. Regarding .NET 4.5 plans, stay tuned 🙂
Great series! Very interesting.
I was wondering about memory barriers.
In these sample posts, you're using lock during each acquire/release – which includes a memory barrier – so a volatile qualifier wouldn't be necessary for any data protected by AsyncReaderWriterLock (I think).
Question: what memory barrier guarantees does ConcurrentExclusiveSchedulerPair make?
Hi Stephen-
I'm glad you like the series.
CESP doesn't itself need to make any such guarantees, because it only schedules Tasks, and tasks themselves have their own guarantees. Any data published prior to a Task getting scheduled will be visible to that Task's body, and any modifications by a Task will be visible to anyone joining with the Task when it completes.
Hi Stephen,
I have a very general question regarding the async context 'behind' await – I was not able to clear this up from existing documents.
In order to have lock-free code I use the actor model and single-threaded synchronization contexts (like WinForms or Stephen Cleary's Nito library).
In this case, it seems that when continuing after 'await' it is guaranteed to be on the same thread as before – therefore no locks are needed to protect data structures accessed before and after await.
But – what thread is used inside the async method2 called through 'await'?
Do I have to protect shared data structures when accessing them from e.g. the UI thread as well as from a task generated by the compiler through await?
In my opinion this would be *VERY* dangerous.
But on the other hand, how can these auto-generated tasks be scheduled onto other threads and CPUs in order to speed up long-running tasks?
e.g.:
List<string> list; // shared, not thread-safe data

async Task method1()
{
    list.Add("Start");
    await method2();
    list.Add("Stop"); // ok, same thread (when NOT using the default threadpool synchronization context)
}

async Task method2()
{
    for (int i = 0; i < 1000; i++)
    {
        list.Add("Doing " + i); // ???? what thread ????
        DoSomethingComputeIntense();
    }
}
Hi Stefan-
Asynchronous methods are invoked synchronously. When you call method2, you're calling it on the current thread. If method2 never awaits something that's not yet completed, method2 will run to completion on the current thread. Since you have no awaits in your method2, method2 will run entirely synchronously on the current thread.
If you want to run code on a thread other than the current one, you can use Task.Run, e.g.
… // on UI thread
await Task.Run(() =>
{
… // on ThreadPool thread
});
… // back on UI thread
I hope that helps.
Sorry, my example was not so good; let me formulate my question differently:
1) Is it true that I will never run on another thread unless I use "await Task.Run(…)" or explicitly start a thread?
Reason:
– the start of an async function runs synchronously
– the completion after await runs on the same thread as before the await, since I use a single-threaded synchronization context.
2) Is it true that, using this programming model, the non-interruptible (atomic) code portions span from one await to the next, regardless of how deeply nested the async functions are?
Many thanks for your response!
Hi Stefan-
Regarding (1), no, that's not true. There are two initial cases to think through here: whether or not the task being awaited has completed by the time you await it. If it has completed, then the await is effectively a nop, and the current thread just continues running what comes after the await. The second case, where the task is not yet complete, is the more interesting one, and again there are two cases here, based on whether or not there's a custom SynchronizationContext or TaskScheduler on the current thread. If there is such a context/scheduler set, then the code that comes after the await will be scheduled back to that context/scheduler when the awaited task completes (e.g. if you were running on the UI thread of a WPF app when you awaited the task, then when the task completes, the continuation will be queued back to the Dispatcher for the UI thread). If there is no custom context, then there's nowhere to marshal the continuation back to, and as such, it'll typically just run on whatever thread is completing the task… that's likely to be a thread pool thread.
Now, for your specific example under "Reason", yes, that's true. Since you were using a single-threaded synchronization context, all of the awaits will return back to that same thread, so unless you explicitly change that, you'll only execute on that one thread. Such "explicit" means could include using Task.Run(…) to put a delegate onto a ThreadPool thread. It could also include using "await task.ConfigureAwait(false);" to ignore the current context if there is one.
Regarding (2), yes, that's true. The only points at which an async method will potentially yield are the await sites within the method.
Now I got it – many thanks for your detailed answer!
Coming back to async coordination primitives …
I understand the possibility of doing such things, and it is instructive – but they look very scary to me.
C# async/await seems to be a concept to bring asynchronous, multitasked applications to the mainstream.
Wouldn't it be nice to have a small set of rules that enables 90% of programmers to write deadlock- and race-condition-free multitasked applications without ever thinking about locks of any kind?
I think an actor-based model could come near to that goal, and the rules would have to include…
– Always have a single-threaded synchronization context, and be warned when falling back to a default multithreaded threadpool context.
– Data belongs to a sync context; do something against foreign threads trying to access your data.
– Guarantee that data sent to another sync context (e.g. posted messages, posted lambdas+parameters…) is not accessible by the originating sync context anymore.
It would be interesting to hear what you think about an actor-based model and what you would do to enforce such guiding rules.
Sorry for being a bit off topic here.
I recently updated AsyncWcfLib (sourceforge.net/…/asyncwcflib) to support async/await and an actor-based programming model (sourceforge.net/…/index.php).
It is a library implementation of the ideas I mentioned in my last post.
The library user can write lock-free code even for a multithreaded and asynchronous application.
But I could not make it completely foolproof. The user has to be careful not to fall back on a threadpool thread, and he may not modify an already-sent message. I'm doing some thread-safety checking at runtime (like WinForms or WPF) to fight against these programming errors.
Any chance of an awaitable (global) Mutex, or is that not possible?
@ Richard Szalay: By "global", you mean across Windows processes?
I do, though since Mutexes have thread identity I ended up using them synchronously on a background thread. (Semaphores aren't available on WP7.)
Is there a specific reason to keep creating new Releasers instead of caching just 2, a reader one and a writer one, just like the cached tasks?
or, rather, a continuation off of the reader task, ensuring that all awaiters will be able to run concurrently rather than getting serialized
What does this mean? Can you explain it in detail, thanks.
@Bar Arnon: Releaser is a struct; it's cheap to create and doesn't involve any allocations.
@Bin Du: Continuations can be either asynchronous or synchronous with respect to the antecedent task completing; when a task completes, it'll queue any continuations registered as asynchronous, and it'll execute any continuations registered as synchronous. Obviously if it's executing a continuation, it can only run one at a time, so if there are multiple synchronous continuations registered, they'll be executed serially with respect to each other. ContinueWith creates asynchronous continuations by default, but you can ask for a synchronous continuation via the TaskContinuationOptions.ExecuteSynchronously option. The await keyword by default creates synchronous continuations (because they have less overhead and the most common case is that a Task is created and then immediately awaited). So, if m_waitingReader.Task were returned directly, it's likely that multiple pieces of code would end up awaiting it with synchronous continuations that would end up getting executed serially with respect to each other. By tacking on a ContinueWith call, the continuations on m_waitingReader.Task will all be asynchronous.
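To illustrate the difference, here's a minimal sketch (LongRunningWork is a placeholder of mine):
var tcs = new TaskCompletionSource<int>();

// Synchronous continuations: run inline by the completing thread,
// serially with respect to each other.
tcs.Task.ContinueWith(t => LongRunningWork("A"), TaskContinuationOptions.ExecuteSynchronously);
tcs.Task.ContinueWith(t => LongRunningWork("B"), TaskContinuationOptions.ExecuteSynchronously);

// Asynchronous continuations (ContinueWith's default): queued to the
// scheduler, so they may run in parallel with each other.
tcs.Task.ContinueWith(t => LongRunningWork("C"));
tcs.Task.ContinueWith(t => LongRunningWork("D"));

tcs.SetResult(0); // executes A and B inline; queues C and D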
Regarding the latest issue about synchronous continuations and your answer, I am a little confused, so I need to clarify some things.
Is it true, in general, for the C# 5.0 async/await functionality, that when we await one Task instance across multiple threads:
1) If we have SynchronizationContext.Current set, then all continuations are serialized and executed on the same context (i.e. on the same UI thread) serially? This case is rather obvious, I hope.
2) If we have a custom TaskScheduler implemented and deployed (TaskScheduler.Current != TaskScheduler.Default), then all continuations are scheduled to that scheduler, and may or may not be executed serially or in parallel, depending on the internal implementation of our custom TaskScheduler?
3) If we have no custom TaskScheduler and we use the default one, System.Threading.Tasks.ThreadPoolTaskScheduler (TaskScheduler.Current == TaskScheduler.Default), then all continuations are serialized, because for optimization purposes they are executed on the thread associated with the awaited and already-completed Task? Even though the default TaskScheduler supports parallelism via ThreadPool threads?
4) If we have configured awaiting by using ConfigureAwait(false), then continuations are marshalled back to the default TaskScheduler (TaskScheduler.Default), so it is the same situation as above (case 3), thus continuations are serialized?
Thank you for your attention,
Best regards,
Sgn
@sgn:
Most of your questions are answered at blogs.msdn.com/…/10293335.aspx, but answers below as well…
re: "If we have SynchronizationContent.Current set, then all continuations are serialized and executed on the same Context ( i.e on the same UI thread ) serially? This case is rather obvious I hope."
When you await a task (assuming you don't use ConfigureAwait(false)), the system looks to see if there's a current SynchronizationContext. If there is, it ensures the continuation executes there. Whether all continuations on the same task and associated with the same SynchronizationContext are executed serially depends on the SynchronizationContext. Most, but not all, SynchronizationContexts will execute queued work serially, such as ones for UI contexts, but it's not a requirement, and in fact the base SynchronizationContext class targets the ThreadPool and has no such guarantee… its Post implementation just does ThreadPool.QueueUserWorkItem.
re: "If we have custom TaskScheduler implemented and deployed ( TaskScheduler.Current != TaskScheduler.Default), then all continuations are scheduled to that scheduler, and may or may not be executed serially or paralelly, depending on internal implementation of our custom TaskScheduler?"
When you await a task (assuming you don't use ConfigureAwait(false)), the system first looks to see if there's a custom SynchronizationContext. If there isn't, it then looks to see if there's a custom TaskScheduler. As with SynchronizationContext, it's up to the scheduler how the queued work runs. Some TaskSchedulers will ensure queued work runs serialized, others will let work run in parallel.
re: "if we have no custom TaskScheduler and we use default one System.Threading.Tasks.ThreadPoolTaskScheduler ( TaskScheduler.Current == TaskScheduler.Default), then all continuations are serialized, because for optimization purposes they are executed on the thread associated with awaited and already completed Task? Despite of that default TaskScheduler support paralellism via ThreadPool thread?"
Usually the case but not guaranteed. In many cases the system will iterate through such continuations on the task and just execute them serially rather than queueing them, but there are situations in which they'll be queued instead.
re: "If we have configured awaiting by using ConfigureAwait( false ), then continuations are marshalled back to the default TaskScheduler ( TaskScheduler.Default ), so is is the same situation as above ( Case 3rd ), thus continuations are serialized?"
It's the same as above, as if TaskScheduler.Default was being used.
Thank you for your explanation. I will definitely read your FAQ blog post, but I would like to ask one last question about the async/await (multiple) continuations issue:
Let's assume that we have a TaskScheduler context. If we have the case that continuations are executed serially, in sequence, does it mean that it is done via invocations of the TryExecuteTaskInline method? If so, are these invocations done on the thread that executed the awaited Task (or on the thread that invoked the SetResult method of the TaskCompletionSource)?
Thank you for your answers and wonderful blog posts!
Best regards,
Sgn
@Sgn:
re: "Let's assume we that have TaskScheduler context. If we have the case that continuation are executed serially, in sequence, does it mean that it is done via invocations of TryExecuteTaskInline method? If so, are these invocations done on the thread that was executed awaited Task (or on thread that has invoked SetResult method of TaskCompletionSource)?"
Yes, TryExecuteTaskInline is used from the thread that's completing the Task's execution.
You can trace through all of this yourself in the reference source, e.g. here's the method invoked to run all of a task's continuations:
referencesource.microsoft.com