Developing with Thread Pool Enhancements
This article discusses building scalable applications that use the new thread pool APIs in Windows Server® 2008. A conceptual overview of the new thread pool architecture can be found in the companion topic, Understanding Thread Pool Enhancements.
Callback Objects and Behaviors
A thread pool is a collection of worker threads managed by the operating system to schedule and execute asynchronous operations, known as callbacks, on behalf of applications. Because the operating system is responsible for the worker threads and assigning queued work items to the threads, the developer is freed from handling the complexities of creating and destroying threads to optimize application throughput.
The new thread pool architecture is object-based. It includes the following object types:
Thread pools
Callback environments
Cleanup groups
Work callback objects
Wait callback objects
Timer callback objects
IO completion callback objects
For each of the objects that are used to schedule asynchronous callbacks for execution by a thread pool, it is important to ensure that the callback code makes no assumption about which worker thread it will execute on. Also, a worker thread must not be left in a modified state as a result of executing a callback. For example, if a callback modifies the worker thread’s priority, the callback must restore the original thread priority before exiting. It is also important to recognize that managing the lifecycle of a worker thread is the responsibility of the thread pool. Callbacks should never terminate the thread they are executing on.
Working with Thread Pools
When an application creates thread pool objects, it is responsible for freeing the thread pool resources when the thread pool is no longer needed. The thread pool APIs in Windows Server 2008 allow developers to:
Create a new thread pool.
Set the maximum and minimum number of threads managed by a thread pool.
Free a thread pool.
To create a new thread pool, applications call the CreateThreadpool function. The argument passed to this function is reserved for future use and must be NULL. To set the maximum and minimum number of threads used by a thread pool, call the SetThreadpoolThreadMaximum and SetThreadpoolThreadMinimum functions, respectively. Each of these functions takes a pointer to a thread pool and a numeric value. To create a pool of persistent threads, set the maximum and minimum to the same value. To free a thread pool that is no longer needed, call the CloseThreadpool function. For an example that demonstrates creating and using a thread pool, see the Work Callback Objects section of this topic.
Working with Callback Environments and Cleanup Groups
Callback environments allow an application to associate callback objects with a thread pool, associate a cleanup group with a thread pool's callback objects, and specify characteristics of the callbacks that are bound to the callback environment. An application allocates a TP_CALLBACK_ENVIRON callback environment object and initializes it by calling the InitializeThreadpoolEnvironment function. When the application no longer needs the callback environment, it must free the environment by calling the DestroyThreadpoolEnvironment function. To bind a callback environment to a thread pool, call the SetThreadpoolCallbackPool function. An application can use the SetThreadpoolCallbackRunsLong function to specify that the callbacks that share the callback environment do not return quickly. By calling the SetThreadpoolCallbackLibrary function, an application can associate the callbacks with a dynamic-link library (DLL) to prevent the library from unloading while there are callbacks that have not finished executing. However, callbacks that are still awaiting execution in the thread pool's queue will not prevent the library from unloading. Therefore, it is important that the library's DllMain function handles DLL_PROCESS_DETACH and cancels all callback objects that might still be queued.
A cleanup group provides a simple mechanism for freeing all of the callback objects that share a callback environment. An application creates a cleanup group by calling the CreateThreadpoolCleanupGroup function. When the cleanup group is no longer needed, the application must free it by calling the CloseThreadpoolCleanupGroup function. To associate a cleanup group with a callback environment, use the SetThreadpoolCallbackCleanupGroup function. When the callback objects associated with a callback environment are no longer needed, instead of freeing them individually, the application should free all of the objects as a group by calling the CloseThreadpoolCleanupGroupMembers function.
Working with Work, Wait, Timer, and IO Completion Callback Objects
Work, wait, timer, and IO completion callback objects function in very similar ways. Callback objects must be created and can be destroyed individually or as part of a cleanup group, as discussed in the previous section. A callback object can supply application-specific data to be passed to its callback function when the callback is executed on a worker thread. Callback objects are reusable; the same object can be used to queue multiple callback instances. Applications can block while waiting for the thread pool to process all pending callbacks for an object. Applications can cancel callbacks for an object if the thread pool has not assigned them to a worker thread. It is also possible to notify another thread when the callback object's callback function has finished executing by calling LeaveCriticalSectionWhenCallbackReturns, ReleaseMutexWhenCallbackReturns, ReleaseSemaphoreWhenCallbackReturns, or SetEventWhenCallbackReturns. Additionally, the FreeLibraryWhenCallbackReturns function directs the system to unload a DLL when a callback function has finished executing. This is useful when the callback function is implemented inside the DLL: the callback cannot safely unload the DLL itself, because doing so would unmap the code that is still executing.
Work Callback Objects
Applications use work callback objects to have the thread pool queue asynchronous callbacks that are executed in FIFO (first in, first out) order. Create a work object by calling the CreateThreadpoolWork function. If the work object is not part of a callback environment with a cleanup group, applications free work objects by calling the CloseThreadpoolWork function. To add a work callback to the thread pool's queue, call the SubmitThreadpoolWork function. Applications can block until all outstanding callbacks for a work object have finished executing by calling the WaitForThreadpoolWorkCallbacks function. Using this function, an application can optionally cancel pending work callbacks that have not yet started to execute.
The following code examples demonstrate creating a custom thread pool, callback environment, and cleanup group to manage a work object's callbacks. The following structure is used to pass application-specific information to the callback function.
struct CallbackContext
{
    // Points to a string literal in the example, so the pointee is const.
    const WCHAR* info;
};
The callback function that will be executed by a worker thread is shown in this example. This callback displays the thread ID and data passed to the callback.
static VOID CALLBACK MyWorkCallback(
    PTP_CALLBACK_INSTANCE Instance,
    PVOID Context,
    PTP_WORK Work)
{
    wostringstream oss;
    DWORD tid = GetCurrentThreadId();
    CallbackContext* pCallbackContext = (CallbackContext*) Context;
    // Simulate doing some work...
    SleepEx(1000, FALSE);
    // Build the whole message first so that output from different
    // worker threads is less likely to interleave.
    oss << L"TID:" << tid;
    oss << L",WorkCallback Instance:" << Instance;
    oss << L",Context:" << pCallbackContext->info;
    oss << L",Work: " << Work;
    oss << L"\n\n";
    wcout << oss.str();
}
The following code example demonstrates creating a custom thread pool, callback environment, and cleanup group for a work callback object. The work object's callback is queued for execution by the thread pool ten times before all objects are cleaned up.
void CallEnvironmentWorkDemo()
{
    PTP_POOL threadPool;
    PTP_WORK worker;
    CallbackContext wc;
    TP_CALLBACK_ENVIRON cbe;
    PTP_CLEANUP_GROUP cleanupGroup;
    // Create the thread pool.
    threadPool = CreateThreadpool(NULL);
    if (NULL == threadPool)
    {
        wcerr << L"Failed trying to create thread pool.";
        return;
    }
    // Set the minimum and maximum threads in the pool.
    SetThreadpoolThreadMinimum(threadPool, 1);
    SetThreadpoolThreadMaximum(threadPool, 3);
    // Create a callback environment.
    InitializeThreadpoolEnvironment(&cbe);
    // Bind the callback environment to the thread pool.
    SetThreadpoolCallbackPool(&cbe, threadPool);
    // Create a cleanup group.
    cleanupGroup = CreateThreadpoolCleanupGroup();
    if (NULL == cleanupGroup)
    {
        wcerr << L"Failed trying to create cleanup group.";
        // Close all previously allocated objects.
        DestroyThreadpoolEnvironment(&cbe);
        CloseThreadpool(threadPool);
        return;
    }
    // Associate the cleanup group with the
    // callback environment.
    SetThreadpoolCallbackCleanupGroup(&cbe,
                                      cleanupGroup,
                                      NULL);
    // Create a work object that uses the
    // callback environment.
    wc.info = L"some context data";
    worker = CreateThreadpoolWork(MyWorkCallback,
                                  &wc,
                                  &cbe);
    if (NULL == worker)
    {
        wcerr << L"Failed trying to create worker.";
        // Close all previously allocated objects.
        CloseThreadpoolCleanupGroup(cleanupGroup);
        DestroyThreadpoolEnvironment(&cbe);
        CloseThreadpool(threadPool);
        return;
    }
    // Submit a bunch of work items to the thread pool.
    for (int i = 0; i < 10; i++)
    {
        SubmitThreadpoolWork(worker);
    }
    // Wait for all the work callbacks to
    // finish executing.
    WaitForThreadpoolWorkCallbacks(worker, FALSE);
    // Close thread pool members. This releases the work object,
    // so CloseThreadpoolWork is not called separately.
    CloseThreadpoolCleanupGroupMembers(
        cleanupGroup,
        TRUE, // Cancel pending callbacks.
        NULL);
    // Close cleanup group.
    CloseThreadpoolCleanupGroup(cleanupGroup);
    // Destroy the callback environment.
    DestroyThreadpoolEnvironment(&cbe);
    // Close the thread pool.
    CloseThreadpool(threadPool);
}
Wait Callback Objects
Wait callback objects execute asynchronous callbacks when a kernel object is signaled or a specified time has elapsed. The CreateThreadpoolWait function creates new wait callback objects, and the CloseThreadpoolWait function frees them. The SetThreadpoolWait function specifies the kernel object to wait on and an optional timeout value after which the callback will be queued for execution. The timeout can be specified as an absolute or relative time. When the callback executes, it receives a value that indicates whether the kernel object was signaled or the wait timed out. Applications can block on a wait object's callbacks and cancel unscheduled callbacks using the WaitForThreadpoolWaitCallbacks function. Using wait callback objects with a mutual-exclusion object (mutex) should be avoided because the thread that waits for the mutex to be signaled is not the same thread where the callback executes, which means that there is no way for application code to release ownership of the mutex.
The following code example illustrates implementing a wait object's callback function that displays the reason the callback was executed.
static VOID CALLBACK MyWaitCallback(
    PTP_CALLBACK_INSTANCE Instance,
    PVOID Context,
    PTP_WAIT Wait,
    TP_WAIT_RESULT WaitResult)
{
    wostringstream oss;
    DWORD tid = GetCurrentThreadId();
    // Assigned from string literals, so the pointee is const.
    const WCHAR* why;
    // Capture why the wait callback was invoked.
    switch (WaitResult)
    {
    case WAIT_OBJECT_0:
        why = L"object signaled";
        break;
    case WAIT_ABANDONED:
        why = L"abandoned";
        break;
    case WAIT_TIMEOUT:
        why = L"timeout";
        break;
    default:
        why = L"unknown";
        break;
    }
    oss << L"TID:" << tid;
    oss << L",WaitCallback Instance:" << Instance;
    oss << L",WaitResult:" << why;
    oss << L"\n\n";
    wcout << oss.str();
}
In the following example, a wait object waits for an event to be signaled. The example shows the wait object's callback executing first due to the event and again due to a timeout.
void WaitDemo()
{
    PTP_WAIT waiter;
    HANDLE hEventCallback;
    FILETIME waitTime;
    ULARGE_INTEGER ulWaitTime;
    // Create an auto-reset event.
    hEventCallback = CreateEvent(NULL, FALSE, FALSE, NULL);
    if (NULL == hEventCallback)
    {
        wcerr << L"Failed trying to create event for callback.";
        return;
    }
    // Create a wait object that uses
    // the default thread pool.
    waiter = CreateThreadpoolWait(
        MyWaitCallback,
        NULL,
        NULL);
    if (NULL == waiter)
    {
        wcerr << L"Failed trying to create wait object.";
        // Close the event handle.
        CloseHandle(hEventCallback);
        return;
    }
    // Set the wait time to one second.
    // The time unit is in 100-nanosecond units and it is negative
    // because it is a relative time.
    ulWaitTime.QuadPart = (LONGLONG) -(1 * 10 * 1000 * 1000);
    waitTime.dwHighDateTime = ulWaitTime.HighPart;
    waitTime.dwLowDateTime = ulWaitTime.LowPart;
    // Place the wait object in the thread pool.
    SetThreadpoolWait(waiter, hEventCallback, &waitTime);
    // Demo wait callback due to object signal.
    // Signal the event to call the wait callback.
    SetEvent(hEventCallback);
    // Stall to allow the wait callback to be
    // added to the thread pool queue.
    SleepEx(2000, FALSE);
    // Block until the wait callback is executed.
    WaitForThreadpoolWaitCallbacks(waiter, FALSE);
    // Demo wait callback due to timeout.
    SetThreadpoolWait(waiter, hEventCallback, &waitTime);
    SleepEx(2000, FALSE);
    // Cleanup.
    CloseThreadpoolWait(waiter);
    CloseHandle(hEventCallback);
}
Timer Callback Objects
Timer callback objects queue asynchronous callbacks at a specified absolute or relative time. Timer objects are created by the CreateThreadpoolTimer function and are freed by the CloseThreadpoolTimer function. In addition to scheduling a callback at a specified time, timer objects can also schedule an additional callback each time a specified time period has elapsed. Applications use the SetThreadpoolTimer function to start and stop timer callback scheduling. To determine whether a timer callback object has been set, applications can call the IsThreadpoolTimerSet function. Applications can block on a timer object's callbacks and cancel unscheduled callbacks using the WaitForThreadpoolTimerCallbacks function.
The following code example illustrates implementing a timer object's callback function.
static VOID CALLBACK MyTimerCallback(
    PTP_CALLBACK_INSTANCE Instance,
    PVOID Context,
    PTP_TIMER Timer)
{
    wostringstream oss;
    DWORD tid = GetCurrentThreadId();
    // Simulate doing some work...
    SleepEx(1000, FALSE);
    oss << L"TID: " << tid;
    oss << L",TimerCallback Instance: " << Instance;
    oss << L",Timer: " << Timer;
    oss << L"\n\n";
    wcout << oss.str();
}
The following code example demonstrates creating and setting a timer callback using the default thread pool and callback environment.
void TimerDemo()
{
    PTP_TIMER timer;
    FILETIME waitTime;
    ULARGE_INTEGER ulWaitTime;
    timer = CreateThreadpoolTimer(MyTimerCallback,
                                  NULL,
                                  NULL);
    if (NULL == timer)
    {
        wcerr << L"Failed trying to create timer.";
        return;
    }
    // Set the wait time to one second. The value is in 100-nanosecond
    // units and is negative because it is a relative time.
    ulWaitTime.QuadPart = (LONGLONG) -(1 * 10 * 1000 * 1000);
    waitTime.dwHighDateTime = ulWaitTime.HighPart;
    waitTime.dwLowDateTime = ulWaitTime.LowPart;
    // Set a timer that expires after an initial one second interval,
    // and subsequently fires with a one second period.
    SetThreadpoolTimer(timer,
                       &waitTime,
                       1000, // Period, in milliseconds.
                       0);   // Window length.
    SleepEx(10000, FALSE);
    // Stop the periodic timer so that no new callbacks are queued.
    SetThreadpoolTimer(timer, NULL, 0, 0);
    // Wait for executing timer callbacks
    // and cancel any pending ones.
    WaitForThreadpoolTimerCallbacks(timer, TRUE);
    CloseThreadpoolTimer(timer);
}
IO Completion Callback Objects
An IO completion callback object executes asynchronous callbacks when an asynchronous IO operation completes. IO completion objects are created using the CreateThreadpoolIo function and are freed using the CloseThreadpoolIo function. Before issuing each asynchronous IO operation, applications must call the StartThreadpoolIo function. If an application does not call StartThreadpoolIo, the callback function supplied to CreateThreadpoolIo will not execute when the operation completes. Applications call CancelThreadpoolIo to cancel the notification that StartThreadpoolIo set up. This is required when the call that issues the IO operation fails, that is, when it returns FALSE and GetLastError returns a value other than ERROR_IO_PENDING.
Applications can block on an IO completion object's callbacks and cancel unscheduled callbacks using the WaitForThreadpoolIoCallbacks function.
For a sample application that demonstrates using IO completion objects, refer to the samples provided with the article titled Pooled Threads: Improve Scalability With New Thread Pool APIs, available on MSDN.