diff options
author | Benoit Foucher <benoit@zeroc.com> | 2008-03-06 10:13:42 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2008-03-06 10:13:42 +0100 |
commit | c6dbd090d9691cc0116a2967b2827b858b184dfe (patch) | |
tree | 6d2ad80c98665c9090b16f97c400ab4b33c7ab73 /cpp/src/Ice/ThreadPool.cpp | |
parent | Merge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff) | |
download | ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.bz2 ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.xz ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.zip |
Removed thread-per-connection and added serialize mode
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 205 |
1 files changed, 111 insertions, 94 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index d0db3c20276..07d7b08492e 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -33,6 +33,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _size(0), _sizeMax(0), _sizeWarn(0), + _serialize(_instance->initializationData().properties->getPropertyAsInt(_prefix + ".Serialize") > 0), _stackSize(0), _running(0), _inUse(0), @@ -125,7 +126,6 @@ IceInternal::ThreadPool::destroy() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); - assert(_handlerMap.empty()); _destroyed = true; _selector.setInterrupt(); } @@ -133,9 +133,7 @@ IceInternal::ThreadPool::destroy() void IceInternal::ThreadPool::incFdsInUse() { - // This is windows specific since every other platform uses an API - // that doesn't have a specific FD limit. -#ifdef _WIN32 +#ifdef ICE_USE_SELECT IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); _selector.incFdsInUse(); #endif @@ -144,29 +142,56 @@ IceInternal::ThreadPool::incFdsInUse() void IceInternal::ThreadPool::decFdsInUse() { - // This is windows specific since every other platform uses an API - // that doesn't have a specific FD limit. -#ifdef _WIN32 +#ifdef ICE_USE_SELECT IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); _selector.decFdsInUse(); #endif } void -IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) +IceInternal::ThreadPool::_register(const EventHandlerPtr& handler) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(!_destroyed); - _changes.push_back(make_pair(fd, handler)); - _selector.setInterrupt(); + assert(!_destroyed && handler->_fd != INVALID_SOCKET); + if(!handler->_registered) + { + if(!handler->_serializing) + { + _selector.add(handler.get(), NeedRead); + } + handler->_registered = true; + } } void -IceInternal::ThreadPool::unregister(SOCKET fd) +IceInternal::ThreadPool::unregister(const EventHandlerPtr& handler) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(!_destroyed); - _changes.push_back(make_pair(fd, EventHandlerPtr(0))); + assert(!_destroyed && handler->_fd != INVALID_SOCKET); + if(handler->_registered) + { + if(!handler->_serializing) + { + _selector.remove(handler.get(), NeedRead); + } + handler->_registered = false; + } +} + +void +IceInternal::ThreadPool::finish(const EventHandlerPtr& handler) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!_destroyed && handler->_fd != INVALID_SOCKET); + if(handler->_registered) + { + if(!handler->_serializing) + { + _selector.remove(handler.get(), NeedRead); + } + handler->_registered = false; + } + _finished.push_back(handler); _selector.setInterrupt(); } @@ -183,12 +208,21 @@ IceInternal::ThreadPool::execute(const ThreadPoolWorkItemPtr& workItem) } void -IceInternal::ThreadPool::promoteFollower() +IceInternal::ThreadPool::promoteFollower(EventHandler* handler) { if(_sizeMax > 1) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_serialize && handler) + { + handler->_serializing = true; + if(handler->_registered) + { + _selector.remove(handler, NeedRead, true); // No interrupt, no thread is blocked on select(). + } + } + assert(!_promote); _promote = true; notify(); @@ -283,85 +317,63 @@ IceInternal::ThreadPool::run() bool finished = false; bool shutdown = false; + if(ret == 0) // We initiate a shutdown if there is a thread pool timeout. + { + shutdown = true; + } + else { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(ret == 0) // We initiate a shutdown if there is a thread pool timeout. + if(_selector.isInterrupted()) { - shutdown = true; - } - else - { - if(_selector.isInterrupted()) + if(_selector.processInterrupt()) + { + continue; + } + + // + // There are three possiblities for an interrupt: + // + // 1. An event handler is being finished (closed). + // + // 2. The thread pool has been destroyed. + // + // 3. A work item has been schedulded. + // + if(!_finished.empty()) + { + _selector.clearInterrupt(); + handler = _finished.front(); + _finished.pop_front(); + finished = true; + } + else if(!_workItems.empty()) { // - // There are two possiblities for an interrupt: - // - // 1. The thread pool has been destroyed. + // Work items must be executed first even if the thread pool is destroyed. // - // 2. An event handler was registered or unregistered. + _selector.clearInterrupt(); + workItem = _workItems.front(); + _workItems.pop_front(); + } + else if(_destroyed) + { // - // 3. A work item has been schedulded. + // Don't clear the interrupt if destroyed, so that the other threads exit as well. // - - if(!_workItems.empty()) - { - // - // Work items must be executed first even if the thread pool is destroyed. - // - _selector.clearInterrupt(); - workItem = _workItems.front(); - _workItems.pop_front(); - } - else if(_destroyed) - { - // - // Don't clear the interrupt if destroyed, so that the other threads exit as well. - // - return true; - } - else - { - // - // An event handler must have been registered or unregistered. - // - _selector.clearInterrupt(); - - assert(!_changes.empty()); - pair<SOCKET, EventHandlerPtr> change = _changes.front(); - _changes.pop_front(); - - if(change.second) // Addition if handler is set. - { - _handlerMap.insert(change); - _selector.add(change.first, NeedRead); - continue; - } - else // Removal if handler is not set. - { - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(change.first); - assert(p != _handlerMap.end()); - handler = p->second; - finished = true; - _handlerMap.erase(p); - _selector.remove(change.first, NeedRead); - // Don't continue; we have to call finished() on the event handler below, - // outside the thread synchronization. - } - } + return true; } else { - SOCKET fd = _selector.getNextSelected(); - map<SOCKET, EventHandlerPtr>::iterator p = _handlerMap.find(fd); - if(p == _handlerMap.end()) - { - Error out(_instance->initializationData().logger); - out << "filedescriptor " << fd << " not registered with `" << _prefix << "'"; - continue; - } - - handler = p->second; + assert(false); + } + } + else + { + handler = _selector.getNextSelected(); + if(!handler) + { + continue; } } } @@ -423,14 +435,12 @@ IceInternal::ThreadPool::run() if(finished) { // - // Notify a handler about it's removal from the thread - // pool. + // Notify a handler about it's removal from the thread pool. // try { // - // "self" is faster than "this", as the reference - // count is not modified. + // "self" is faster than "this", as the reference count is not modified. // handler->finished(self); } @@ -451,8 +461,7 @@ IceInternal::ThreadPool::run() else { // - // If the handler is "readable", try to read a - // message. + // If the handler is "readable", try to read a message. // BasicStream stream(_instance.get()); if(handler->readable()) @@ -505,8 +514,7 @@ IceInternal::ThreadPool::run() try { // - // "self" is faster than "this", as the reference - // count is not modified. + // "self" is faster than "this", as the reference count is not modified. // handler->message(stream, self); } @@ -529,9 +537,19 @@ IceInternal::ThreadPool::run() if(_sizeMax > 1) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - + if(!_destroyed) { + if(_serialize && handler && handler->_serializing) + { + if(handler->_registered) + { + // No interrupt if no thread is blocked on select (_promote == true) + _selector.add(handler.get(), NeedRead, _promote); + } + handler->_serializing = false; + } + // // First we reap threads that have been destroyed before. // @@ -599,8 +617,7 @@ IceInternal::ThreadPool::run() assert(_inUse > 0); --_inUse; } - - + while(!_promote) { wait(); |