Waiting on Events from within Threadpool

Download source code from MSDN Code Gallery.

So far in this series we have seen how to submit work items to the default process-wide threadpool, how to create your own private threadpool, and how to create timer objects that execute a callback when the timer fires. In this post (the final one in the threadpool series), we will see how to create wait objects so that your callback runs when an event is signaled.

Both the windowsthreadpool::SimpleThreadPool and windowsthreadpool::PrivateThreadPool classes provide three member functions for working with events: RegisterEvent, ReRegisterEvent and DestroyEvent. To wait on an event, call RegisterEvent; once that event fires and the callback has run, call ReRegisterEvent to wait on it again. DestroyEvent closes the wait object in the threadpool so that you no longer wait on that event.

Example Code:

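#include <windows.h>
#include <tchar.h>
#include <iostream>
// The windowsthreadpool classes come from the source download for this series.

using std::cout;
using std::cerr;
using std::endl;

void CALLBACK HelloWorld2(PVOID state)
{
    UNREFERENCED_PARAMETER(state);

    cout << "Hello World2." << endl;
}

void CALLBACK HelloWorld(PVOID state)
{
    UNREFERENCED_PARAMETER(state);

    cout << "Hello World." << endl;
}

int _cdecl _tmain()
{
    using namespace windowsthreadpool;

    SimpleThreadPool stp;
    PrivateThreadPool ptp;

    // Auto-reset event, initially non-signaled.
    HANDLE hEvent = CreateEvent(NULL, FALSE, FALSE, NULL);
    if (NULL == hEvent)
    {
        cerr << "Could not create event." << endl;
        return 1;
    }

    // Process-wide threadpool: run HelloWorld when hEvent is set (100-second timeout).
    PVOID handle = stp.RegisterEvent(HelloWorld, hEvent, 100000);

    SetEvent(hEvent);
    Sleep(100); // give the callback a chance to run, as in the private-pool case below

    stp.DestroyEvent<THREADPOOLCALLBACK>(handle);
    stp.WaitForAll();

    // Private threadpool: run HelloWorld2 on the same event.
    PVOID phandle = ptp.RegisterEvent(HelloWorld2, hEvent, 100000);

    SetEvent(hEvent);
    Sleep(100);

    // A registration fires once, so re-register before signaling again.
    ptp.ReRegisterEvent<THREADPOOLCALLBACK>(phandle);

    SetEvent(hEvent);
    Sleep(100);

    ptp.DestroyEvent<THREADPOOLCALLBACK>(phandle);
    ptp.WaitForAll();

    CloseHandle(hEvent);
    return 0;
}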

In this example, we create an auto-reset event and register interest in it with the process-wide threadpool, passing a pointer to our callback (HelloWorld). When the event is set, the callback HelloWorld executes. A different callback (HelloWorld2) is registered with the private threadpool for the same event. Because that wait is re-registered after the first signal and the event is then set a second time, HelloWorld2 runs twice.

The RegisterEvent method takes a pointer to the callback function, an event handle and a timeout value in milliseconds. If the timeout is -1, it waits indefinitely for the event. RegisterEvent returns an opaque handle, which you pass to the DestroyEvent and ReRegisterEvent methods. A registration is good for a single callback: once a raised event has been handled in the callback, you need to re-register interest in the event for the threadpool to call your callback again.
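
To make the one-shot behaviour concrete, here is a toy model in portable C++. It is only a sketch: it does not call the Win32 API, it runs the callback inline rather than on a pool thread, and OneShotWait, arm and setEvent are hypothetical names invented for this illustration. It assumes an auto-reset event, as in the example above, so delivering a callback consumes both the signal and the registration.

```cpp
#include <cassert>
#include <functional>
#include <utility>

// Toy model only: a single wait registration on an auto-reset event.
// Real threadpool waits run the callback on a pool thread; this model calls it inline.
class OneShotWait {
public:
    explicit OneShotWait(std::function<void()> callback)
        : callback_(std::move(callback)) {
        assert(static_cast<bool>(callback_)); // a callable is required
    }

    // Models RegisterEvent / ReRegisterEvent: start waiting on the event.
    void arm() {
        armed_ = true;
        deliverIfReady();
    }

    // Models SetEvent on an auto-reset event.
    void setEvent() {
        signaled_ = true;
        deliverIfReady();
    }

private:
    // The callback runs only when the wait is armed and the event is signaled.
    // Delivery consumes both: the event resets and the registration is used up.
    void deliverIfReady() {
        if (armed_ && signaled_) {
            armed_ = false;
            signaled_ = false;
            callback_();
        }
    }

    std::function<void()> callback_;
    bool armed_ = false;
    bool signaled_ = false;
};

// Replays the private-threadpool sequence from the example:
// register, set, re-register, set. Returns the number of callback runs.
inline int runsWithReRegister() {
    int runs = 0;
    OneShotWait wait([&runs] { ++runs; });
    wait.arm();
    wait.setEvent();
    wait.arm();
    wait.setEvent();
    return runs;
}

// Same sequence without the re-registration step.
inline int runsWithoutReRegister() {
    int runs = 0;
    OneShotWait wait([&runs] { ++runs; });
    wait.arm();
    wait.setEvent();
    wait.setEvent(); // the event stays signaled, but nothing is waiting on it
    return runs;
}
```

In this model runsWithReRegister() returns 2 and runsWithoutReRegister() returns 1, which matches the point that HelloWorld2 runs twice only because the wait is re-registered between the two SetEvent calls.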

 

Most of the work for these methods happens in the internal class WaitCallBack. When you register an event, it first creates a TP_WAIT object by calling CreateThreadpoolWait, using the same environment block for all work items queued via the same threadpool. It then passes this wait object, along with the event handle and a timeout value, to SetThreadpoolWait. The timeout is a FILETIME; a negative value is interpreted as a time relative to now, in 100-nanosecond units, which is why the code below negates the converted value. To wait indefinitely for the event to fire, pass NULL as the timeout to SetThreadpoolWait.

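WaitCallBack(const Function Func, PVOID st, PTP_CALLBACK_ENVIRON pEnv, HANDLE Event, int timeout) : m_Func(Func), state(st), m_Event(Event), TimeOut(timeout)
{
    wait = CreateThreadpoolWait(callback, this, pEnv);
    if (!wait)
    {
        throw "Error: Could not create threadpool wait object.";
    }

    if (timeout > -1)
    {
        // A negative FILETIME value is a relative timeout in 100-nanosecond units.
        // Copy through ULARGE_INTEGER rather than casting the FILETIME pointer,
        // which can cause alignment faults on 64-bit Windows.
        ULARGE_INTEGER relative;
        relative.QuadPart = static_cast<ULONGLONG>(-static_cast<LONGLONG>(MILLI_SECOND_TO_NANO100(timeout)));

        FILETIME dueTime;
        dueTime.dwLowDateTime = relative.LowPart;
        dueTime.dwHighDateTime = relative.HighPart;
        SetThreadpoolWait(wait, Event, &dueTime);
    }
    else
    {
        SetThreadpoolWait(wait, Event, NULL); // a negative timeout (normally -1) means wait indefinitely
    }
}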

The ReRegisterEvent method simply calls SetThreadpoolWait again, as shown above.
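
The timeout handling can be isolated into a small helper. The sketch below is an illustration under stated assumptions rather than the library's code: toRelativeDueTime and armWait are hypothetical names, and the Windows-specific part is compiled only when _WIN32 is defined. It shows the conversion from milliseconds to a negative count of 100-nanosecond intervals, and how a re-registration could reuse that conversion when calling SetThreadpoolWait.

```cpp
#include <cassert>
#include <cstdint>

#ifdef _WIN32
#include <windows.h>
#endif

// One millisecond is 10,000 intervals of 100 nanoseconds.
const std::int64_t kHundredNsPerMillisecond = 10000;

// Converts a non-negative timeout in milliseconds to the relative form
// SetThreadpoolWait expects: a negative count of 100-nanosecond intervals.
inline std::int64_t toRelativeDueTime(std::int64_t timeoutMs) {
    assert(timeoutMs >= 0);
    return -(timeoutMs * kHundredNsPerMillisecond);
}

#ifdef _WIN32
// Hypothetical helper: (re)arms an existing wait object on an event.
// A negative timeoutMs means "no timeout", mirroring the -1 convention above.
inline void armWait(PTP_WAIT wait, HANDLE event, int timeoutMs) {
    if (timeoutMs < 0) {
        SetThreadpoolWait(wait, event, nullptr);
        return;
    }
    ULARGE_INTEGER relative;
    relative.QuadPart = static_cast<ULONGLONG>(toRelativeDueTime(timeoutMs));

    FILETIME dueTime;
    dueTime.dwLowDateTime = relative.LowPart;
    dueTime.dwHighDateTime = relative.HighPart;
    SetThreadpoolWait(wait, event, &dueTime);
}
#endif
```

For the 100000 ms timeout used in the example, toRelativeDueTime returns -1000000000, that is, 100 seconds from the moment the wait is set.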

The DestroyEvent method indicates that you are no longer interested in this event. Recall from the first post that calling CloseThreadpoolCleanupGroupMembers frees all work objects, wait objects and timers associated with the cleanup group in one call. That is why the DestroyEvent method checks whether WaitForAll has already been called (which would imply the wait object is already freed) and calls the internal DestroyEvent method only if it has not.

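template <class Function>
bool DestroyEvent(PVOID EventHandle)
{
    windowsthreadpool::internal::WaitCallBack<Function> *wcb = reinterpret_cast<windowsthreadpool::internal::WaitCallBack<Function>*>(EventHandle);

    // Close the wait object only if WaitForAll has not already freed it
    // together with the rest of the cleanup group.
    if (InfraInitialized)
    {
        wcb->DestroyEvent();
    }

    // The wrapper object itself is always ours to delete.
    delete wcb;
    return true;
}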
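
The reason for the guard can be shown with a small portable model. This is a sketch only: ModelPool and its members are hypothetical names, no Win32 calls are made, and it assumes that WaitForAll clears the InfraInitialized flag after the cleanup group has closed its members. The point is that a wait object ends up closed exactly once, whichever of DestroyEvent and WaitForAll runs first.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Toy model only (no Win32 calls): a pool whose cleanup group owns its wait objects.
class ModelPool {
public:
    // Stand-in for RegisterEvent: adds a wait object to the cleanup group
    // and returns its index as an opaque handle.
    std::size_t registerEvent() {
        closeCounts_.push_back(0);
        return closeCounts_.size() - 1;
    }

    // Stand-in for DestroyEvent: close the wait object only while the pool
    // infrastructure is still alive.
    void destroyEvent(std::size_t handle) {
        assert(handle < closeCounts_.size());
        if (infraInitialized_) {
            ++closeCounts_[handle];
        }
    }

    // Stand-in for WaitForAll: the cleanup group closes every member that is
    // still open, in one call, and the infrastructure is torn down.
    void waitForAll() {
        if (!infraInitialized_) {
            return;
        }
        for (int& count : closeCounts_) {
            if (count == 0) {
                ++count;
            }
        }
        infraInitialized_ = false;
    }

    // Number of times the wait object behind `handle` has been closed.
    int closeCount(std::size_t handle) const { return closeCounts_.at(handle); }

private:
    std::vector<int> closeCounts_;
    bool infraInitialized_ = true;
};

// Closes one wait in either order and reports how often it was closed.
inline int closeCountFor(bool destroyFirst) {
    ModelPool pool;
    const std::size_t handle = pool.registerEvent();
    if (destroyFirst) {
        pool.destroyEvent(handle);
        pool.waitForAll();
    } else {
        pool.waitForAll();
        pool.destroyEvent(handle);
    }
    return pool.closeCount(handle);
}
```

Both closeCountFor(true) and closeCountFor(false) return 1 in this model. Without the infraInitialized_ check in destroyEvent, the second ordering would close the same object twice.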

 

And that’s it! Over the course of this series on the threadpool we have learnt how to queue work items to the default process-wide threadpool, create a private threadpool and queue work items to it, create a background threadpool that runs low-priority work items, create timer objects, and wait on events to run code when the events are signaled.

The only functionality not covered in this series is using the threadpool to perform asynchronous I/O. Maybe I will cover that in a later series. In the meantime, if you have any questions on the Win32 threadpool, please leave a comment and I will try to get it answered.