diff options
Diffstat (limited to 'cpp/src/Ice/SelectorThread.cpp')
-rw-r--r-- | cpp/src/Ice/SelectorThread.cpp | 309 |
1 files changed, 309 insertions, 0 deletions
diff --git a/cpp/src/Ice/SelectorThread.cpp b/cpp/src/Ice/SelectorThread.cpp new file mode 100644 index 00000000000..b0eb5007257 --- /dev/null +++ b/cpp/src/Ice/SelectorThread.cpp @@ -0,0 +1,309 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2007 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <IceUtil/DisableWarnings.h> +#include <Ice/SelectorThread.h> +#include <Ice/Network.h> +#include <Ice/Instance.h> +#include <Ice/LoggerUtil.h> + +using namespace std; +using namespace Ice; +using namespace IceInternal; + +IceUtil::Shared* IceInternal::upCast(SelectorThread* p) { return p; } + +IceInternal::SelectorThread::SelectorThread(const InstancePtr& instance) : + _instance(instance), + _destroyed(false), + _selector(instance), + _timer(_instance->timer()) +{ + + __setNoDelete(true); + try + { + _thread = new HelperThread(this); + _thread->start(); + } + catch(const IceUtil::Exception& ex) + { + { + Error out(_instance->initializationData().logger); + out << "cannot create thread for selector thread:\n" << ex; + } + __setNoDelete(false); + throw; + } + catch(...) + { + __setNoDelete(false); + throw; + } + __setNoDelete(false); +} + +IceInternal::SelectorThread::~SelectorThread() +{ + assert(_destroyed); +} + +void +IceInternal::SelectorThread::destroy() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!_destroyed); + _destroyed = true; + _selector.setInterrupt(); +} + +void +IceInternal::SelectorThread::incFdsInUse() +{ + // This is windows specific since every other platform uses an API + // that doesn't have a specific FD limit. +#ifdef _WIN32 + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!_destroyed); + _selector.incFdsInUse(); +#endif +} + +void +IceInternal::SelectorThread::decFdsInUse() +{ + // This is windows specific since every other platform uses an API + // that doesn't have a specific FD limit. +#ifdef _WIN32 + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!_destroyed); + _selector.decFdsInUse(); +#endif +} + +void +IceInternal::SelectorThread::_register(SOCKET fd, const SocketReadyCallbackPtr& cb, SocketStatus status, int timeout) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. + assert(status != Finished); + SocketInfo info(fd, cb, status, timeout); + _changes.push_back(info); + if(info.timeout >= 0) + { + _timer->schedule(info.cb, IceUtil::Time::milliSeconds(info.timeout)); + } + _selector.setInterrupt(); +} + +void +IceInternal::SelectorThread::unregister(SOCKET fd) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. + _changes.push_back(SocketInfo(fd, 0, Finished, 0)); + _selector.setInterrupt(); +} + +void +IceInternal::SelectorThread::joinWithThread() +{ + assert(_destroyed); + if(_thread) + { + _thread->getThreadControl().join(); + } +} + +void +IceInternal::SelectorThread::run() +{ + std::map<SOCKET, SocketInfo> socketMap; + vector<SocketInfo*> readyList; + vector<SocketInfo*> finishedList; + while(true) + { + try + { + _selector.select(); + } + catch(const Ice::LocalException& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in selector thread:\n" << ex; + continue; + } + + assert(readyList.empty() && finishedList.empty()); + + { + if(_selector.isInterrupted()) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // There are two possiblities for an interrupt: + // + // 1. The selector thread has been destroyed. + // 2. A socket was registered or unregistered. + // + + // + // Thread destroyed? + // + if(_destroyed) + { + break; + } + + _selector.clearInterrupt(); + + SocketInfo& change = _changes.front(); + if(change.cb) // Registration + { + _selector.add(change.fd, change.status); + assert(socketMap.find(change.fd) == socketMap.end()); + socketMap.insert(make_pair(change.fd, change)); + _maxFd = max(_maxFd, change.fd); + _minFd = min(_minFd, change.fd); + } + else // Unregistration + { + map<SOCKET, SocketInfo>::iterator r = socketMap.find(change.fd); + if(r != socketMap.end() && r->second.status != Finished) + { + if(r->second.timeout >= 0) + { + _timer->cancel(r->second.cb); + } + assert(r->second.status != Finished); + _selector.remove(r->second.fd, r->second.status); + r->second.status = Finished; + readyList.push_back(&(r->second)); + } + } + _changes.pop_front(); + } + else + { + + // + // Examine the selection key set. + // + SOCKET fd; + while((fd = _selector.getNextSelected()) != INVALID_SOCKET) + { + map<SOCKET, SocketInfo>::iterator r = socketMap.find(fd); + if(r != socketMap.end()) + { + if(r->second.timeout >= 0) + { + _timer->cancel(r->second.cb); + } + + readyList.push_back(&(r->second)); + } + } + } + } + + for(vector<SocketInfo*>::iterator p = readyList.begin(); p != readyList.end(); ++p) + { + SocketInfo* info = *p; + SocketStatus status; + try + { + status = info->cb->socketReady(info->status == Finished); + } + catch(const std::exception& ex) + { + Error out(_instance->initializationData().logger); + out << "exception in selector thread while calling socketReady():\n" << ex.what(); + status = Finished; + } + catch(...) + { + Error out(_instance->initializationData().logger); + out << "unknown exception in selector thread while calling socketReady()"; + status = Finished; + } + + if(status == Finished) + { + finishedList.push_back(info); + } + else if(status != info->status) + { + assert(info->status != Finished); + _selector.remove(info->fd, info->status); + info->status = status; + _selector.add(info->fd, info->status); + if(info->timeout >= 0) + { + _timer->schedule(info->cb, IceUtil::Time::milliSeconds(info->timeout)); + } + } + } + + readyList.clear(); + + if(finishedList.empty()) + { + continue; + } + + for(vector<SocketInfo*>::const_iterator p = finishedList.begin(); p != finishedList.end(); ++p) + { + if((*p)->status != Finished) + { + _selector.remove((*p)->fd, (*p)->status); + } + socketMap.erase((*p)->fd); + } + finishedList.clear(); + } + + assert(_destroyed); +} + +IceInternal::SelectorThread::HelperThread::HelperThread(const SelectorThreadPtr& selectorThread) : + _selectorThread(selectorThread) +{ +} + +void +IceInternal::SelectorThread::HelperThread::run() +{ + if(_selectorThread->_instance->initializationData().threadHook) + { + _selectorThread->_instance->initializationData().threadHook->start(); + } + + try + { + _selectorThread->run(); + } + catch(const std::exception& ex) + { + Error out(_selectorThread->_instance->initializationData().logger); + out << "exception in selector thread:\n" << ex.what(); + } + catch(...) + { + Error out(_selectorThread->_instance->initializationData().logger); + out << "unknown exception in selector thread"; + } + + if(_selectorThread->_instance->initializationData().threadHook) + { + _selectorThread->_instance->initializationData().threadHook->stop(); + } + + _selectorThread = 0; // Break cyclic dependency. +} |