summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorBernard Normier <bernard@zeroc.com>2007-02-01 17:09:49 +0000
committerBernard Normier <bernard@zeroc.com>2007-02-01 17:09:49 +0000
commitabada90e3f84dc703b8ddc9efcbed8a946fadead (patch)
tree2c6f9dccd510ea97cb927a7bd635422efaae547a /cpp/src/Ice/ThreadPool.cpp
parentremoving trace message (diff)
downloadice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.bz2
ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.tar.xz
ice-abada90e3f84dc703b8ddc9efcbed8a946fadead.zip
Expanded tabs into spaces
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp1192
1 files changed, 596 insertions, 596 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 87b315b9cba..6e122f575f0 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -61,9 +61,9 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_epollFd = epoll_create(1);
if(_epollFd < 0)
{
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
_events.resize(1);
epoll_event event;
@@ -71,9 +71,9 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
event.data.fd = _fdIntrRead;
if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, _fdIntrRead, &event) != 0)
{
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
#else
_pollFdSet.resize(1);
@@ -90,8 +90,8 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
if(size < 1)
{
Warning out(_instance->initializationData().logger);
- out << _prefix << ".Size < 1; Size adjusted to 1";
- size = 1;
+ out << _prefix << ".Size < 1; Size adjusted to 1";
+ size = 1;
}
int sizeMax =
@@ -99,17 +99,17 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
if(sizeMax < size)
{
Warning out(_instance->initializationData().logger);
- out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")";
- sizeMax = size;
- }
+ out << _prefix << ".SizeMax < " << _prefix << ".Size; SizeMax adjusted to Size (" << size << ")";
+ sizeMax = size;
+ }
int sizeWarn = _instance->initializationData().properties->
- getPropertyAsIntWithDefault(_prefix + ".SizeWarn", sizeMax * 80 / 100);
+ getPropertyAsIntWithDefault(_prefix + ".SizeWarn", sizeMax * 80 / 100);
if(sizeWarn > sizeMax)
{
- Warning out(_instance->initializationData().logger);
- out << _prefix << ".SizeWarn > " << _prefix << ".SizeMax; adjusted SizeWarn to SizeMax (" << sizeMax << ")";
- sizeWarn = sizeMax;
+ Warning out(_instance->initializationData().logger);
+ out << _prefix << ".SizeWarn > " << _prefix << ".SizeMax; adjusted SizeWarn to SizeMax (" << sizeMax << ")";
+ sizeWarn = sizeMax;
}
const_cast<int&>(_size) = size;
@@ -120,38 +120,38 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
if(stackSize < 0)
{
Warning out(_instance->initializationData().logger);
- out << _prefix << ".StackSize < 0; Size adjusted to OS default";
- stackSize = 0;
+ out << _prefix << ".StackSize < 0; Size adjusted to OS default";
+ stackSize = 0;
}
const_cast<size_t&>(_stackSize) = static_cast<size_t>(stackSize);
__setNoDelete(true);
try
{
- for(int i = 0 ; i < _size ; ++i)
- {
- IceUtil::ThreadPtr thread = new EventHandlerThread(this);
- thread->start(_stackSize);
- _threads.push_back(thread);
- ++_running;
- }
+ for(int i = 0 ; i < _size ; ++i)
+ {
+ IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ thread->start(_stackSize);
+ _threads.push_back(thread);
+ ++_running;
+ }
}
catch(const IceUtil::Exception& ex)
{
- {
- Error out(_instance->initializationData().logger);
- out << "cannot create thread for `" << _prefix << "':\n" << ex;
- }
+ {
+ Error out(_instance->initializationData().logger);
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
+ }
- destroy();
- joinWithAllThreads();
- __setNoDelete(false);
- throw;
+ destroy();
+ joinWithAllThreads();
+ __setNoDelete(false);
+ throw;
}
catch(...)
{
- __setNoDelete(false);
- throw;
+ __setNoDelete(false);
+ throw;
}
__setNoDelete(false);
}
@@ -162,33 +162,33 @@ IceInternal::ThreadPool::~ThreadPool()
try
{
- closeSocket(_fdIntrWrite);
+ closeSocket(_fdIntrWrite);
}
catch(const LocalException& ex)
{
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
}
try
{
- closeSocket(_fdIntrRead);
+ closeSocket(_fdIntrRead);
}
catch(const LocalException& ex)
{
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
}
#ifdef __linux
try
{
- closeSocket(_epollFd);
+ closeSocket(_epollFd);
}
catch(const LocalException& ex)
{
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "' while calling closeSocket():\n" << ex;
}
#endif
}
@@ -214,15 +214,15 @@ IceInternal::ThreadPool::incFdsInUse()
assert(!_destroyed);
if(_fdsInUse + 1 > FD_SETSIZE)
{
- Warning warn(_instance->initializationData().logger);
- warn << "maximum number of connections exceeded";
+ Warning warn(_instance->initializationData().logger);
+ warn << "maximum number of connections exceeded";
- //
- // No appropriate errno.
- //
- SocketException ex(__FILE__, __LINE__);
- ex.error = 0;
- throw ex;
+ //
+ // No appropriate errno.
+ //
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = 0;
+ throw ex;
}
++_fdsInUse;
#endif
@@ -238,8 +238,8 @@ IceInternal::ThreadPool::decFdsInUse()
assert(!_destroyed);
if(_fdsInUse <= 1)
{
- Trace trace(_instance->initializationData().logger, "ThreadPool");
- trace << _prefix << ": about to assert";
+ Trace trace(_instance->initializationData().logger, "ThreadPool");
+ trace << _prefix << ": about to assert";
}
assert(_fdsInUse > 1); // _fdIntrRead is always in use.
--_fdsInUse;
@@ -269,41 +269,41 @@ IceInternal::ThreadPool::promoteFollower()
{
if(_sizeMax > 1)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(!_promote);
- _promote = true;
- notify();
+ assert(!_promote);
+ _promote = true;
+ notify();
- if(!_destroyed)
- {
- assert(_inUse >= 0);
- ++_inUse;
-
- if(_inUse == _sizeWarn)
- {
- Warning out(_instance->initializationData().logger);
- out << "thread pool `" << _prefix << "' is running low on threads\n"
- << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
- }
-
- assert(_inUse <= _running);
- if(_inUse < _sizeMax && _inUse == _running)
- {
- try
- {
- IceUtil::ThreadPtr thread = new EventHandlerThread(this);
- thread->start(_stackSize);
- _threads.push_back(thread);
- ++_running;
- }
- catch(const IceUtil::Exception& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "cannot create thread for `" << _prefix << "':\n" << ex;
- }
- }
- }
+ if(!_destroyed)
+ {
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "thread pool `" << _prefix << "' is running low on threads\n"
+ << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
+ }
+
+ assert(_inUse <= _running);
+ if(_inUse < _sizeMax && _inUse == _running)
+ {
+ try
+ {
+ IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ thread->start(_stackSize);
+ _threads.push_back(thread);
+ ++_running;
+ }
+ catch(const IceUtil::Exception& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
+ }
+ }
+ }
}
}
@@ -319,7 +319,7 @@ IceInternal::ThreadPool::joinWithAllThreads()
assert(_destroyed);
for(vector<IceUtil::ThreadPtr>::iterator p = _threads.begin(); p != _threads.end(); ++p)
{
- (*p)->getThreadControl().join();
+ (*p)->getThreadControl().join();
}
}
@@ -339,26 +339,26 @@ repeat:
#ifdef _WIN32
if(::recv(_fdIntrRead, &c, 1, 0) == SOCKET_ERROR)
{
- if(interrupted())
- {
- goto repeat;
- }
+ if(interrupted())
+ {
+ goto repeat;
+ }
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
#else
if(::read(_fdIntrRead, &c, 1) == -1)
{
- if(interrupted())
- {
- goto repeat;
- }
+ if(interrupted())
+ {
+ goto repeat;
+ }
- SyscallException ex(__FILE__, __LINE__);
- ex.error = getSystemErrno();
- throw ex;
+ SyscallException ex(__FILE__, __LINE__);
+ ex.error = getSystemErrno();
+ throw ex;
}
#endif
}
@@ -373,26 +373,26 @@ repeat:
#ifdef _WIN32
if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
{
- if(interrupted())
- {
- goto repeat;
- }
+ if(interrupted())
+ {
+ goto repeat;
+ }
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ throw ex;
}
#else
if(::write(_fdIntrWrite, &c, 1) == -1)
{
- if(interrupted())
- {
- goto repeat;
- }
+ if(interrupted())
+ {
+ goto repeat;
+ }
- SyscallException ex(__FILE__, __LINE__);
- ex.error = getSystemErrno();
- throw ex;
+ SyscallException ex(__FILE__, __LINE__);
+ ex.error = getSystemErrno();
+ throw ex;
}
#endif
}
@@ -404,467 +404,467 @@ IceInternal::ThreadPool::run()
if(_sizeMax > 1)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- while(!_promote)
- {
- wait();
- }
+ while(!_promote)
+ {
+ wait();
+ }
- _promote = false;
+ _promote = false;
}
while(true)
{
- int ret;
+ int ret;
#if defined(_WIN32)
- fd_set fdSet;
- memcpy(&fdSet, &_fdSet, sizeof(fd_set));
- if(_timeout > 0)
- {
- struct timeval tv;
- tv.tv_sec = _timeout;
- tv.tv_usec = 0;
- ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, &tv);
- }
- else
- {
- ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, 0);
- }
+ fd_set fdSet;
+ memcpy(&fdSet, &_fdSet, sizeof(fd_set));
+ if(_timeout > 0)
+ {
+ struct timeval tv;
+ tv.tv_sec = _timeout;
+ tv.tv_usec = 0;
+ ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, &tv);
+ }
+ else
+ {
+ ret = ::select(static_cast<int>(_maxFd + 1), &fdSet, 0, 0, 0);
+ }
#elif defined(__linux)
- ret = epoll_wait(_epollFd, &_events[0], _events.size(), _timeout > 0 ? _timeout * 1000 : -1);
+ ret = epoll_wait(_epollFd, &_events[0], _events.size(), _timeout > 0 ? _timeout * 1000 : -1);
#else
- ret = poll(&_pollFdSet[0], _pollFdSet.size(), _timeout > 0 ? _timeout * 1000 : -1);
+ ret = poll(&_pollFdSet[0], _pollFdSet.size(), _timeout > 0 ? _timeout * 1000 : -1);
#endif
- if(ret == SOCKET_ERROR)
- {
- if(interrupted())
- {
- continue;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- //throw ex;
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "':\n" << ex;
- continue;
- }
-
- EventHandlerPtr handler;
- bool finished = false;
- bool shutdown = false;
+ if(ret == SOCKET_ERROR)
+ {
+ if(interrupted())
+ {
+ continue;
+ }
+
+ SocketException ex(__FILE__, __LINE__);
+ ex.error = getSocketErrno();
+ //throw ex;
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "':\n" << ex;
+ continue;
+ }
+
+ EventHandlerPtr handler;
+ bool finished = false;
+ bool shutdown = false;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(ret == 0) // We initiate a shutdown if there is a thread pool timeout.
- {
- assert(_timeout > 0);
- _timeout = 0;
- shutdown = true;
- }
- else
- {
- bool interrupted = false;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(ret == 0) // We initiate a shutdown if there is a thread pool timeout.
+ {
+ assert(_timeout > 0);
+ _timeout = 0;
+ shutdown = true;
+ }
+ else
+ {
+ bool interrupted = false;
#if defined(_WIN32)
- interrupted = FD_ISSET(_fdIntrRead, &fdSet);
+ interrupted = FD_ISSET(_fdIntrRead, &fdSet);
#elif defined(__linux)
for(int i = 0; i < ret; ++i)
- {
- if(_events[i].data.fd == _fdIntrRead)
- {
- interrupted = true;
- break;
- }
- }
+ {
+ if(_events[i].data.fd == _fdIntrRead)
+ {
+ interrupted = true;
+ break;
+ }
+ }
#else
- assert(_pollFdSet[0].fd == _fdIntrRead);
+ assert(_pollFdSet[0].fd == _fdIntrRead);
interrupted = _pollFdSet[0].revents != 0;
#endif
- if(interrupted)
- {
- //
- // There are two possiblities for an interrupt:
- //
- // 1. The thread pool has been destroyed.
- //
- // 2. An event handler was registered or unregistered.
- //
+ if(interrupted)
+ {
+ //
+ // There are two possiblities for an interrupt:
+ //
+ // 1. The thread pool has been destroyed.
+ //
+ // 2. An event handler was registered or unregistered.
+ //
- //
- // Thread pool destroyed?
- //
- if(_destroyed)
- {
- //
- // Don't clear the interrupt if destroyed, so that
- // the other threads exit as well.
- //
- return true;
- }
-
- clearInterrupt();
-
- //
- // An event handler must have been registered or
- // unregistered.
- //
- assert(!_changes.empty());
- pair<SOCKET, EventHandlerPtr> change = _changes.front();
- _changes.pop_front();
-
- if(change.second) // Addition if handler is set.
- {
- _handlerMap.insert(change);
+ //
+ // Thread pool destroyed?
+ //
+ if(_destroyed)
+ {
+ //
+ // Don't clear the interrupt if destroyed, so that
+ // the other threads exit as well.
+ //
+ return true;
+ }
+
+ clearInterrupt();
+
+ //
+ // An event handler must have been registered or
+ // unregistered.
+ //
+ assert(!_changes.empty());
+ pair<SOCKET, EventHandlerPtr> change = _changes.front();
+ _changes.pop_front();
+
+ if(change.second) // Addition if handler is set.
+ {
+ _handlerMap.insert(change);
#if defined(_WIN32)
- FD_SET(change.first, &_fdSet);
+ FD_SET(change.first, &_fdSet);
#elif defined(__linux)
- epoll_event event;
- event.events = EPOLLIN;
- event.data.fd = change.first;
- if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, change.first, &event) != 0)
- {
- Error out(_instance->initializationData().logger);
- out << "error while adding filedescriptor to epoll set:\n";
- out << errorToString(getSocketErrno());
- continue;
- }
- _events.resize(_handlerMap.size() + 1);
+ epoll_event event;
+ event.events = EPOLLIN;
+ event.data.fd = change.first;
+ if(epoll_ctl(_epollFd, EPOLL_CTL_ADD, change.first, &event) != 0)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "error while adding filedescriptor to epoll set:\n";
+ out << errorToString(getSocketErrno());
+ continue;
+ }
+ _events.resize(_handlerMap.size() + 1);
#else
- struct pollfd pollFd;
- pollFd.fd = change.first;
- pollFd.events = POLLIN;
- _pollFdSet.push_back(pollFd);
+ struct pollfd pollFd;
+ pollFd.fd = change.first;
+ pollFd.events = POLLIN;
+ _pollFdSet.push_back(pollFd);
#endif
- _maxFd = max(_maxFd, change.first);
- _minFd = min(_minFd, change.first);
- continue;
- }
- else // Removal if handler is not set.
- {
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first);
- assert(p != _handlerMap.end());
- handler = p->second;
- finished = true;
- _handlerMap.erase(p);
+ _maxFd = max(_maxFd, change.first);
+ _minFd = min(_minFd, change.first);
+ continue;
+ }
+ else // Removal if handler is not set.
+ {
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first);
+ assert(p != _handlerMap.end());
+ handler = p->second;
+ finished = true;
+ _handlerMap.erase(p);
#if defined(_WIN32)
- FD_CLR(change.first, &_fdSet);
+ FD_CLR(change.first, &_fdSet);
#elif defined(__linux)
- epoll_event event;
- event.events = 0;
- if(epoll_ctl(_epollFd, EPOLL_CTL_DEL, change.first, &event) != 0)
- {
- Error out(_instance->initializationData().logger);
- out << "error while adding filedescriptor to epoll set:\n";
- out << errorToString(getSocketErrno());
- continue;
- }
- _events.resize(_handlerMap.size() + 1);
+ epoll_event event;
+ event.events = 0;
+ if(epoll_ctl(_epollFd, EPOLL_CTL_DEL, change.first, &event) != 0)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "error while adding filedescriptor to epoll set:\n";
+ out << errorToString(getSocketErrno());
+ continue;
+ }
+ _events.resize(_handlerMap.size() + 1);
#else
- for(vector<struct pollfd>::iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p)
- {
- if(p->fd == change.first)
- {
- _pollFdSet.erase(p);
- break;
- }
- }
+ for(vector<struct pollfd>::iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p)
+ {
+ if(p->fd == change.first)
+ {
+ _pollFdSet.erase(p);
+ break;
+ }
+ }
#endif
- _maxFd = _fdIntrRead;
- _minFd = _fdIntrRead;
- if(!_handlerMap.empty())
- {
- _maxFd = max(_maxFd, (--_handlerMap.end())->first);
- _minFd = min(_minFd, _handlerMap.begin()->first);
- }
- // Don't continue; we have to call
- // finished() on the event handler below, outside
- // the thread synchronization.
- }
- }
- else
- {
- //
- // Round robin for the filedescriptors.
- //
- SOCKET largerFd = _maxFd + 1;
- SOCKET smallestFd = _maxFd + 1;
+ _maxFd = _fdIntrRead;
+ _minFd = _fdIntrRead;
+ if(!_handlerMap.empty())
+ {
+ _maxFd = max(_maxFd, (--_handlerMap.end())->first);
+ _minFd = min(_minFd, _handlerMap.begin()->first);
+ }
+ // Don't continue; we have to call
+ // finished() on the event handler below, outside
+ // the thread synchronization.
+ }
+ }
+ else
+ {
+ //
+ // Round robin for the filedescriptors.
+ //
+ SOCKET largerFd = _maxFd + 1;
+ SOCKET smallestFd = _maxFd + 1;
#if defined(_WIN32)
- if(fdSet.fd_count == 0)
- {
- Error out(_instance->initializationData().logger);
- out << "select() in `" << _prefix << "' returned " << ret
- << " but no filedescriptor is readable";
- continue;
- }
- for(u_short i = 0; i < fdSet.fd_count; ++i)
- {
- SOCKET fd = fdSet.fd_array[i];
+ if(fdSet.fd_count == 0)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "select() in `" << _prefix << "' returned " << ret
+ << " but no filedescriptor is readable";
+ continue;
+ }
+ for(u_short i = 0; i < fdSet.fd_count; ++i)
+ {
+ SOCKET fd = fdSet.fd_array[i];
#elif defined(__linux)
- for(int i = 0; i < ret; ++i)
- {
- SOCKET fd = _events[i].data.fd;
+ for(int i = 0; i < ret; ++i)
+ {
+ SOCKET fd = _events[i].data.fd;
#else
for(vector<struct pollfd>::const_iterator p = _pollFdSet.begin(); p != _pollFdSet.end(); ++p)
- {
- if(p->revents == 0)
- {
- continue;
- }
- SOCKET fd = p->fd;
+ {
+ if(p->revents == 0)
+ {
+ continue;
+ }
+ SOCKET fd = p->fd;
#endif
- assert(fd != INVALID_SOCKET);
- if(fd > _lastFd || _lastFd == INVALID_SOCKET)
- {
- largerFd = min(largerFd, fd);
- }
-
- smallestFd = min(smallestFd, fd);
- }
-
- if(largerFd <= _maxFd)
- {
- assert(largerFd >= _minFd);
- _lastFd = largerFd;
- }
- else
- {
- assert(smallestFd >= _minFd && smallestFd <= _maxFd);
- _lastFd = smallestFd;
- }
- assert(_lastFd != _fdIntrRead);
- map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
- if(p == _handlerMap.end())
- {
- Error out(_instance->initializationData().logger);
- out << "filedescriptor " << _lastFd << " not registered with `" << _prefix << "'";
- continue;
- }
-
- handler = p->second;
- }
- }
- }
-
- //
- // Now we are outside the thread synchronization.
- //
+ assert(fd != INVALID_SOCKET);
+ if(fd > _lastFd || _lastFd == INVALID_SOCKET)
+ {
+ largerFd = min(largerFd, fd);
+ }
+
+ smallestFd = min(smallestFd, fd);
+ }
+
+ if(largerFd <= _maxFd)
+ {
+ assert(largerFd >= _minFd);
+ _lastFd = largerFd;
+ }
+ else
+ {
+ assert(smallestFd >= _minFd && smallestFd <= _maxFd);
+ _lastFd = smallestFd;
+ }
+ assert(_lastFd != _fdIntrRead);
+ map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(_lastFd);
+ if(p == _handlerMap.end())
+ {
+ Error out(_instance->initializationData().logger);
+ out << "filedescriptor " << _lastFd << " not registered with `" << _prefix << "'";
+ continue;
+ }
+
+ handler = p->second;
+ }
+ }
+ }
+
+ //
+ // Now we are outside the thread synchronization.
+ //
- if(shutdown)
- {
- //
- // Initiate server shutdown.
- //
- ObjectAdapterFactoryPtr factory;
- try
- {
- factory = _instance->objectAdapterFactory();
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- continue;
- }
+ if(shutdown)
+ {
+ //
+ // Initiate server shutdown.
+ //
+ ObjectAdapterFactoryPtr factory;
+ try
+ {
+ factory = _instance->objectAdapterFactory();
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ continue;
+ }
- promoteFollower();
- factory->shutdown();
+ promoteFollower();
+ factory->shutdown();
- //
- // No "continue", because we want shutdown to be done in
- // its own thread from this pool. Therefore we called
- // promoteFollower().
- //
- }
- else
- {
- assert(handler);
-
- if(finished)
- {
- //
- // Notify a handler about it's removal from the thread
- // pool.
- //
- try
- {
- //
- // "self" is faster than "this", as the reference
- // count is not modified.
- //
- handler->finished(self);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling finished():\n"
- << ex << '\n' << handler->toString();
- }
+ //
+ // No "continue", because we want shutdown to be done in
+ // its own thread from this pool. Therefore we called
+ // promoteFollower().
+ //
+ }
+ else
+ {
+ assert(handler);
+
+ if(finished)
+ {
+ //
+ // Notify a handler about it's removal from the thread
+ // pool.
+ //
+ try
+ {
+ //
+ // "self" is faster than "this", as the reference
+ // count is not modified.
+ //
+ handler->finished(self);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "' while calling finished():\n"
+ << ex << '\n' << handler->toString();
+ }
- //
- // No "continue", because we want finished() to be
- // called in its own thread from this pool. Note that
- // this means that finished() must call
- // promoteFollower().
- //
- }
- else
- {
- //
- // If the handler is "readable", try to read a
- // message.
- //
- BasicStream stream(_instance.get());
- if(handler->readable())
- {
- try
- {
- read(handler);
- }
- catch(const TimeoutException&) // Expected.
- {
- continue;
- }
- catch(const DatagramLimitException&) // Expected.
- {
- continue;
- }
- catch(const SocketException& ex)
- {
- handler->exception(ex);
- continue;
- }
- catch(const LocalException& ex)
- {
- if(handler->datagram())
- {
- if(_instance->initializationData().properties->
- getPropertyAsInt("Ice.Warn.Connections") > 0)
- {
- Warning out(_instance->initializationData().logger);
- out << "datagram connection exception:\n" << ex << '\n' << handler->toString();
- }
- }
- else
- {
- handler->exception(ex);
- }
- continue;
- }
-
- stream.swap(handler->_stream);
- assert(stream.i == stream.b.end());
- }
-
- //
- // Provide a new mesage to the handler.
- //
- try
- {
- //
- // "self" is faster than "this", as the reference
- // count is not modified.
- //
- handler->message(stream, self);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in `" << _prefix << "' while calling message():\n"
- << ex << '\n' << handler->toString();
- }
-
- //
- // No "continue", because we want message() to be
- // called in its own thread from this pool. Note that
- // this means that message() must call
- // promoteFollower().
- //
- }
- }
+ //
+ // No "continue", because we want finished() to be
+ // called in its own thread from this pool. Note that
+ // this means that finished() must call
+ // promoteFollower().
+ //
+ }
+ else
+ {
+ //
+ // If the handler is "readable", try to read a
+ // message.
+ //
+ BasicStream stream(_instance.get());
+ if(handler->readable())
+ {
+ try
+ {
+ read(handler);
+ }
+ catch(const TimeoutException&) // Expected.
+ {
+ continue;
+ }
+ catch(const DatagramLimitException&) // Expected.
+ {
+ continue;
+ }
+ catch(const SocketException& ex)
+ {
+ handler->exception(ex);
+ continue;
+ }
+ catch(const LocalException& ex)
+ {
+ if(handler->datagram())
+ {
+ if(_instance->initializationData().properties->
+ getPropertyAsInt("Ice.Warn.Connections") > 0)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "datagram connection exception:\n" << ex << '\n' << handler->toString();
+ }
+ }
+ else
+ {
+ handler->exception(ex);
+ }
+ continue;
+ }
+
+ stream.swap(handler->_stream);
+ assert(stream.i == stream.b.end());
+ }
+
+ //
+ // Provide a new mesage to the handler.
+ //
+ try
+ {
+ //
+ // "self" is faster than "this", as the reference
+ // count is not modified.
+ //
+ handler->message(stream, self);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->initializationData().logger);
+ out << "exception in `" << _prefix << "' while calling message():\n"
+ << ex << '\n' << handler->toString();
+ }
+
+ //
+ // No "continue", because we want message() to be
+ // called in its own thread from this pool. Note that
+ // this means that message() must call
+ // promoteFollower().
+ //
+ }
+ }
- if(_sizeMax > 1)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- if(!_destroyed)
- {
- //
- // First we reap threads that have been destroyed before.
- //
- int sz = static_cast<int>(_threads.size());
- assert(_running <= sz);
- if(_running < sz)
- {
- vector<IceUtil::ThreadPtr>::iterator start =
- partition(_threads.begin(), _threads.end(), IceUtil::constMemFun(&IceUtil::Thread::isAlive));
+ if(_sizeMax > 1)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(!_destroyed)
+ {
+ //
+ // First we reap threads that have been destroyed before.
+ //
+ int sz = static_cast<int>(_threads.size());
+ assert(_running <= sz);
+ if(_running < sz)
+ {
+ vector<IceUtil::ThreadPtr>::iterator start =
+ partition(_threads.begin(), _threads.end(), IceUtil::constMemFun(&IceUtil::Thread::isAlive));
- for(vector<IceUtil::ThreadPtr>::iterator p = start; p != _threads.end(); ++p)
- {
- (*p)->getThreadControl().join();
- }
+ for(vector<IceUtil::ThreadPtr>::iterator p = start; p != _threads.end(); ++p)
+ {
+ (*p)->getThreadControl().join();
+ }
- _threads.erase(start, _threads.end());
- }
-
- //
- // Now we check if this thread can be destroyed, based
- // on a load factor.
- //
+ _threads.erase(start, _threads.end());
+ }
+
+ //
+ // Now we check if this thread can be destroyed, based
+ // on a load factor.
+ //
- //
- // The load factor jumps immediately to the number of
- // threads that are currently in use, but decays
- // exponentially if the number of threads in use is
- // smaller than the load factor. This reflects that we
- // create threads immediately when they are needed,
- // but want the number of threads to slowly decline to
- // the configured minimum.
- //
- double inUse = static_cast<double>(_inUse);
- if(_load < inUse)
- {
- _load = inUse;
- }
- else
- {
- const double loadFactor = 0.05; // TODO: Configurable?
- const double oneMinusLoadFactor = 1 - loadFactor;
- _load = _load * oneMinusLoadFactor + inUse * loadFactor;
- }
-
- if(_running > _size)
- {
- int load = static_cast<int>(_load + 0.5);
+ //
+ // The load factor jumps immediately to the number of
+ // threads that are currently in use, but decays
+ // exponentially if the number of threads in use is
+ // smaller than the load factor. This reflects that we
+ // create threads immediately when they are needed,
+ // but want the number of threads to slowly decline to
+ // the configured minimum.
+ //
+ double inUse = static_cast<double>(_inUse);
+ if(_load < inUse)
+ {
+ _load = inUse;
+ }
+ else
+ {
+ const double loadFactor = 0.05; // TODO: Configurable?
+ const double oneMinusLoadFactor = 1 - loadFactor;
+ _load = _load * oneMinusLoadFactor + inUse * loadFactor;
+ }
+
+ if(_running > _size)
+ {
+ int load = static_cast<int>(_load + 0.5);
- //
- // We add one to the load factor because on
- // additional thread is needed for select().
- //
- if(load + 1 < _running)
- {
- assert(_inUse > 0);
- --_inUse;
-
- assert(_running > 0);
- --_running;
-
- return false;
- }
- }
-
- assert(_inUse > 0);
- --_inUse;
- }
+ //
+ // We add one to the load factor because on
+ // additional thread is needed for select().
+ //
+ if(load + 1 < _running)
+ {
+ assert(_inUse > 0);
+ --_inUse;
+
+ assert(_running > 0);
+ --_running;
+
+ return false;
+ }
+ }
+
+ assert(_inUse > 0);
+ --_inUse;
+ }
- while(!_promote)
- {
- wait();
- }
-
- _promote = false;
- }
+ while(!_promote)
+ {
+ wait();
+ }
+
+ _promote = false;
+ }
}
}
@@ -875,32 +875,32 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler)
if(stream.b.size() == 0)
{
- stream.b.resize(headerSize);
- stream.i = stream.b.begin();
+ stream.b.resize(headerSize);
+ stream.i = stream.b.begin();
}
if(stream.i != stream.b.end())
{
- handler->read(stream);
- assert(stream.i == stream.b.end());
+ handler->read(stream);
+ assert(stream.i == stream.b.end());
}
ptrdiff_t pos = stream.i - stream.b.begin();
if(pos < headerSize)
{
- //
- // This situation is possible for small UDP packets.
- //
- throw IllegalMessageSizeException(__FILE__, __LINE__);
+ //
+ // This situation is possible for small UDP packets.
+ //
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
}
stream.i = stream.b.begin();
const Byte* m;
stream.readBlob(m, static_cast<Int>(sizeof(magic)));
if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3])
{
- BadMagicException ex(__FILE__, __LINE__);
- ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
- throw ex;
+ BadMagicException ex(__FILE__, __LINE__);
+ ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic));
+ throw ex;
}
Byte pMajor;
Byte pMinor;
@@ -909,12 +909,12 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler)
if(pMajor != protocolMajor
|| static_cast<unsigned char>(pMinor) > static_cast<unsigned char>(protocolMinor))
{
- UnsupportedProtocolException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(pMajor);
- ex.badMinor = static_cast<unsigned char>(pMinor);
- ex.major = static_cast<unsigned char>(protocolMajor);
- ex.minor = static_cast<unsigned char>(protocolMinor);
- throw ex;
+ UnsupportedProtocolException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(pMajor);
+ ex.badMinor = static_cast<unsigned char>(pMinor);
+ ex.major = static_cast<unsigned char>(protocolMajor);
+ ex.minor = static_cast<unsigned char>(protocolMinor);
+ throw ex;
}
Byte eMajor;
Byte eMinor;
@@ -923,12 +923,12 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler)
if(eMajor != encodingMajor
|| static_cast<unsigned char>(eMinor) > static_cast<unsigned char>(encodingMinor))
{
- UnsupportedEncodingException ex(__FILE__, __LINE__);
- ex.badMajor = static_cast<unsigned char>(eMajor);
- ex.badMinor = static_cast<unsigned char>(eMinor);
- ex.major = static_cast<unsigned char>(encodingMajor);
- ex.minor = static_cast<unsigned char>(encodingMinor);
- throw ex;
+ UnsupportedEncodingException ex(__FILE__, __LINE__);
+ ex.badMajor = static_cast<unsigned char>(eMajor);
+ ex.badMinor = static_cast<unsigned char>(eMinor);
+ ex.major = static_cast<unsigned char>(encodingMajor);
+ ex.minor = static_cast<unsigned char>(encodingMinor);
+ throw ex;
}
Byte messageType;
stream.read(messageType);
@@ -938,36 +938,36 @@ IceInternal::ThreadPool::read(const EventHandlerPtr& handler)
stream.read(size);
if(size < headerSize)
{
- throw IllegalMessageSizeException(__FILE__, __LINE__);
+ throw IllegalMessageSizeException(__FILE__, __LINE__);
}
if(size > static_cast<Int>(_instance->messageSizeMax()))
{
- throw MemoryLimitException(__FILE__, __LINE__);
+ throw MemoryLimitException(__FILE__, __LINE__);
}
if(size > static_cast<Int>(stream.b.size()))
{
- stream.b.resize(size);
+ stream.b.resize(size);
}
stream.i = stream.b.begin() + pos;
if(stream.i != stream.b.end())
{
- if(handler->datagram())
- {
- if(_warnUdp)
- {
- Warning out(_instance->initializationData().logger);
- out << "DatagramLimitException: maximum size of " << pos << " exceeded";
- stream.resize(0);
- stream.i = stream.b.begin();
- }
- throw DatagramLimitException(__FILE__, __LINE__);
- }
- else
- {
- handler->read(stream);
- assert(stream.i == stream.b.end());
- }
+ if(handler->datagram())
+ {
+ if(_warnUdp)
+ {
+ Warning out(_instance->initializationData().logger);
+ out << "DatagramLimitException: maximum size of " << pos << " exceeded";
+ stream.resize(0);
+ stream.i = stream.b.begin();
+ }
+ throw DatagramLimitException(__FILE__, __LINE__);
+ }
+ else
+ {
+ handler->read(stream);
+ assert(stream.i == stream.b.end());
+ }
}
}
@@ -988,39 +988,39 @@ IceInternal::ThreadPool::EventHandlerThread::run()
try
{
- promote = _pool->run();
+ promote = _pool->run();
}
catch(const Exception& ex)
- {
- Error out(_pool->_instance->initializationData().logger);
- out << "exception in `" << _pool->_prefix << "':\n" << ex;
- promote = true;
+ {
+ Error out(_pool->_instance->initializationData().logger);
+ out << "exception in `" << _pool->_prefix << "':\n" << ex;
+ promote = true;
}
catch(const std::exception& ex)
{
- Error out(_pool->_instance->initializationData().logger);
- out << "std::exception in `" << _pool->_prefix << "':\n" << ex.what();
- promote = true;
+ Error out(_pool->_instance->initializationData().logger);
+ out << "std::exception in `" << _pool->_prefix << "':\n" << ex.what();
+ promote = true;
}
catch(...)
{
- Error out(_pool->_instance->initializationData().logger);
- out << "unknown exception in `" << _pool->_prefix << "'";
- promote = true;
+ Error out(_pool->_instance->initializationData().logger);
+ out << "unknown exception in `" << _pool->_prefix << "'";
+ promote = true;
}
if(promote && _pool->_sizeMax > 1)
{
- //
- // Promote a follower, but w/o modifying _inUse or creating
- // new threads.
- //
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get());
- assert(!_pool->_promote);
- _pool->_promote = true;
- _pool->notify();
- }
+ //
+ // Promote a follower, but w/o modifying _inUse or creating
+ // new threads.
+ //
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get());
+ assert(!_pool->_promote);
+ _pool->_promote = true;
+ _pool->notify();
+ }
}
if(_pool->_instance->initializationData().threadHook)