302 lines
6.9 KiB
C++
302 lines
6.9 KiB
C++
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
//
|
||
|
// Copyright (c) 1997, Microsoft Corp. All rights reserved.
|
||
|
//
|
||
|
// FILE
|
||
|
//
|
||
|
// Dispatcher.cpp
|
||
|
//
|
||
|
// SYNOPSIS
|
||
|
//
|
||
|
// This file implements the class Dispatcher.
|
||
|
//
|
||
|
// MODIFICATION HISTORY
|
||
|
//
|
||
|
// 07/31/1997 Original version.
|
||
|
// 12/04/1997 Check return value of _beginthreadex.
|
||
|
// 02/24/1998 Initialize COM run-time for all threads.
|
||
|
// 04/16/1998 Block in Finalize until all the threads have returned.
|
||
|
// 05/20/1998 GetQueuedCompletionStatus signature changed.
|
||
|
// 08/07/1998 Wait on thread handle to ensure all threads have exited.
|
||
|
//
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
#include <nt.h>
|
||
|
#include <ntrtl.h>
|
||
|
#include <nturtl.h>
|
||
|
#include <windows.h>
|
||
|
|
||
|
#include <iascore.h>
|
||
|
#include <process.h>
|
||
|
#include <cstddef>
|
||
|
|
||
|
#include <dispatcher.h>
|
||
|
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
//
|
||
|
// METHOD
|
||
|
//
|
||
|
// Dispatcher::initialize
|
||
|
//
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
BOOL Dispatcher::initialize(DWORD dwMaxThreads, DWORD dwMaxIdle) throw ()
|
||
|
{
|
||
|
// Initialize the various parameters.
|
||
|
numThreads = 0;
|
||
|
maxThreads = dwMaxThreads;
|
||
|
available = 0;
|
||
|
maxIdle = dwMaxIdle;
|
||
|
|
||
|
// If maxThreads == 0, then we compute a suitable default.
|
||
|
if (maxThreads == 0)
|
||
|
{
|
||
|
// Threads defaults to 64 times the number of processors.
|
||
|
SYSTEM_INFO sinf;
|
||
|
::GetSystemInfo(&sinf);
|
||
|
maxThreads = sinf.dwNumberOfProcessors * 64;
|
||
|
}
|
||
|
|
||
|
// Initialize the handles.
|
||
|
hPort = CreateIoCompletionPort(INVALID_HANDLE_VALUE, NULL, 0, 0);
|
||
|
if (hPort == NULL)
|
||
|
{
|
||
|
return FALSE;
|
||
|
}
|
||
|
|
||
|
hEmpty = CreateEvent(NULL, TRUE, TRUE, NULL);
|
||
|
if (hEmpty == NULL)
|
||
|
{
|
||
|
CloseHandle(hPort);
|
||
|
hPort = NULL;
|
||
|
return FALSE;
|
||
|
}
|
||
|
|
||
|
hLastOut = NULL;
|
||
|
|
||
|
return TRUE;
|
||
|
}
|
||
|
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
//
|
||
|
// METHOD
|
||
|
//
|
||
|
// Dispatcher::finalize
|
||
|
//
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
void Dispatcher::finalize()
|
||
|
{
|
||
|
Lock();
|
||
|
|
||
|
// Block any new threads from being created.
|
||
|
maxThreads = 0;
|
||
|
|
||
|
// How many threads are still in the pool?
|
||
|
DWORD remaining = numThreads;
|
||
|
|
||
|
Unlock();
|
||
|
|
||
|
// Post a null request for each existing thread.
|
||
|
while (remaining--)
|
||
|
{
|
||
|
PostQueuedCompletionStatus(hPort, 0, 0, NULL);
|
||
|
}
|
||
|
|
||
|
// Wait until the pool is empty.
|
||
|
WaitForSingleObject(hEmpty, INFINITE);
|
||
|
|
||
|
if (hLastOut != NULL)
|
||
|
{
|
||
|
// Wait for the last thread to exit.
|
||
|
WaitForSingleObject(hLastOut, INFINITE);
|
||
|
}
|
||
|
|
||
|
//////////
|
||
|
// Clean-up the handles.
|
||
|
//////////
|
||
|
|
||
|
CloseHandle(hLastOut);
|
||
|
hLastOut = NULL;
|
||
|
|
||
|
CloseHandle(hEmpty);
|
||
|
hEmpty = NULL;
|
||
|
|
||
|
CloseHandle(hPort);
|
||
|
hPort = NULL;
|
||
|
}
|
||
|
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
//
|
||
|
// METHOD
|
||
|
//
|
||
|
// Dispatcher::Dispatch
|
||
|
//
|
||
|
// DESCRIPTION
|
||
|
//
|
||
|
// This is the main loop for all the threads in the pool.
|
||
|
//
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
inline void Dispatcher::fillRequests() throw ()
|
||
|
{
|
||
|
DWORD dwNumBytes;
|
||
|
ULONG_PTR ulKey;
|
||
|
PIAS_CALLBACK pRequest;
|
||
|
|
||
|
//////////
|
||
|
// Loop until we either timeout or get a null request.
|
||
|
//////////
|
||
|
|
||
|
next:
|
||
|
BOOL success = GetQueuedCompletionStatus(hPort,
|
||
|
&dwNumBytes,
|
||
|
&ulKey,
|
||
|
(OVERLAPPED**)&pRequest,
|
||
|
maxIdle);
|
||
|
|
||
|
if (pRequest)
|
||
|
{
|
||
|
pRequest->CallbackRoutine(pRequest);
|
||
|
|
||
|
Lock();
|
||
|
|
||
|
++available;
|
||
|
|
||
|
Unlock();
|
||
|
|
||
|
goto next;
|
||
|
}
|
||
|
|
||
|
Lock();
|
||
|
|
||
|
// We never want to timeout a thread while there's a backlog.
|
||
|
if (available <= 0 && success == FALSE && GetLastError() == WAIT_TIMEOUT)
|
||
|
{
|
||
|
Unlock();
|
||
|
|
||
|
goto next;
|
||
|
}
|
||
|
|
||
|
// Save the current value of 'last out' and replace it with our handle.
|
||
|
HANDLE previousThread = hLastOut;
|
||
|
hLastOut = NULL;
|
||
|
DuplicateHandle(
|
||
|
NtCurrentProcess(),
|
||
|
NtCurrentThread(),
|
||
|
NtCurrentProcess(),
|
||
|
&hLastOut,
|
||
|
0,
|
||
|
FALSE,
|
||
|
DUPLICATE_SAME_ACCESS
|
||
|
);
|
||
|
|
||
|
// We're removing a thread from the pool, so update our state.
|
||
|
--available;
|
||
|
--numThreads;
|
||
|
|
||
|
// If there are no threads left, set the 'empty' event.
|
||
|
if (numThreads == 0) { SetEvent(hEmpty); }
|
||
|
|
||
|
Unlock();
|
||
|
|
||
|
// Wait until the previous thread exits. This guarantees that when the
|
||
|
// 'last out' thread exits, all threads have exited.
|
||
|
WaitForSingleObject(previousThread, INFINITE);
|
||
|
CloseHandle(previousThread);
|
||
|
}
|
||
|
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
//
|
||
|
// METHOD
|
||
|
//
|
||
|
// Dispatcher::RequestThread
|
||
|
//
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
BOOL Dispatcher::requestThread(PIAS_CALLBACK OnStart) throw ()
|
||
|
{
|
||
|
Lock();
|
||
|
|
||
|
// If there are no threads available AND we're below our limit,
|
||
|
// create a new thread.
|
||
|
if (--available < 0 && numThreads < maxThreads)
|
||
|
{
|
||
|
unsigned nThreadID;
|
||
|
HANDLE hThread = (HANDLE)_beginthreadex(NULL,
|
||
|
0,
|
||
|
startRoutine,
|
||
|
(void*)this,
|
||
|
0,
|
||
|
&nThreadID);
|
||
|
|
||
|
if (hThread)
|
||
|
{
|
||
|
// We don't need the thread handle.
|
||
|
CloseHandle(hThread);
|
||
|
|
||
|
// We added a thread to the pool, so update our state.
|
||
|
if (numThreads == 0) { ResetEvent(hEmpty); }
|
||
|
++numThreads;
|
||
|
++available;
|
||
|
}
|
||
|
}
|
||
|
|
||
|
Unlock();
|
||
|
|
||
|
//////////
|
||
|
// Post it to the I/O Completion Port.
|
||
|
//////////
|
||
|
|
||
|
return PostQueuedCompletionStatus(hPort, 0, 0, (OVERLAPPED*)OnStart);
|
||
|
}
|
||
|
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
//
|
||
|
// METHOD
|
||
|
//
|
||
|
// Dispatcher::setMaxNumberOfThreads
|
||
|
//
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
DWORD Dispatcher::setMaxNumberOfThreads(DWORD dwMaxThreads) throw ()
|
||
|
{
|
||
|
Lock();
|
||
|
|
||
|
DWORD oldval = maxThreads;
|
||
|
|
||
|
maxThreads = dwMaxThreads;
|
||
|
|
||
|
Unlock();
|
||
|
|
||
|
return oldval;
|
||
|
}
|
||
|
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
//
|
||
|
// METHOD
|
||
|
//
|
||
|
// Dispatcher::setMaxThreadIdle
|
||
|
//
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
DWORD Dispatcher::setMaxThreadIdle(DWORD dwMilliseconds)
|
||
|
{
|
||
|
Lock();
|
||
|
|
||
|
DWORD oldval = maxIdle;
|
||
|
|
||
|
maxIdle = dwMilliseconds;
|
||
|
|
||
|
Unlock();
|
||
|
|
||
|
return oldval;
|
||
|
}
|
||
|
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
//
|
||
|
// METHOD
|
||
|
//
|
||
|
// Dispatcher::StartRoutine
|
||
|
//
|
||
|
///////////////////////////////////////////////////////////////////////////////
|
||
|
unsigned __stdcall Dispatcher::startRoutine(void* pArg) throw ()
|
||
|
{
|
||
|
((Dispatcher*)pArg)->fillRequests();
|
||
|
|
||
|
return 0;
|
||
|
}
|