diff options
Diffstat (limited to 'cpp/src/Ice/SelectorThread.cpp')
-rw-r--r-- | cpp/src/Ice/SelectorThread.cpp | 177 |
1 files changed, 79 insertions, 98 deletions
diff --git a/cpp/src/Ice/SelectorThread.cpp b/cpp/src/Ice/SelectorThread.cpp index abf1834542d..0045abf2cf9 100644 --- a/cpp/src/Ice/SelectorThread.cpp +++ b/cpp/src/Ice/SelectorThread.cpp @@ -57,7 +57,7 @@ IceInternal::SelectorThread::~SelectorThread() void IceInternal::SelectorThread::destroy() { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + Lock sync(*this); assert(!_destroyed); _destroyed = true; _selector.setInterrupt(); @@ -66,11 +66,8 @@ IceInternal::SelectorThread::destroy() void IceInternal::SelectorThread::incFdsInUse() { - // This is windows specific since every other platform uses an API - // that doesn't have a specific FD limit. -#ifdef _WIN32 - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(!_destroyed); +#ifdef ICE_USE_SELECT + Lock sync(*this); _selector.incFdsInUse(); #endif } @@ -78,11 +75,8 @@ IceInternal::SelectorThread::incFdsInUse() void IceInternal::SelectorThread::decFdsInUse() { - // This is windows specific since every other platform uses an API - // that doesn't have a specific FD limit. -#ifdef _WIN32 - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(!_destroyed); +#ifdef ICE_USE_SELECT + Lock sync(*this); _selector.decFdsInUse(); #endif } @@ -90,24 +84,43 @@ IceInternal::SelectorThread::decFdsInUse() void IceInternal::SelectorThread::_register(SOCKET fd, const SocketReadyCallbackPtr& cb, SocketStatus status, int timeout) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + Lock sync(*this); assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. assert(status != Finished); - SocketInfo info(fd, cb, status, timeout); - _changes.push_back(info); - if(info.timeout >= 0) + + cb->_fd = fd; + cb->_status = status; + cb->_timeout = timeout; + if(cb->_timeout >= 0) { - _timer->schedule(info.cb, IceUtil::Time::milliSeconds(info.timeout)); + _timer->schedule(cb, IceUtil::Time::milliSeconds(cb->_timeout)); } - _selector.setInterrupt(); + + _selector.add(cb.get(), cb->_status); +} + +void +IceInternal::SelectorThread::unregister(const SocketReadyCallbackPtr& cb) +{ + // Note: unregister should only be called from the socketReady() call-back. + Lock sync(*this); + assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. + assert(cb->_fd != INVALID_SOCKET); + + _selector.remove(cb.get(), cb->_status, true); // No interrupt needed, it's always called from the selector thread. + cb->_status = Finished; } void -IceInternal::SelectorThread::unregister(SOCKET fd) +IceInternal::SelectorThread::finish(const SocketReadyCallbackPtr& cb) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + Lock sync(*this); assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories. - _changes.push_back(SocketInfo(fd, 0, Finished, 0)); + assert(cb->_fd != INVALID_SOCKET); + + _selector.remove(cb.get(), cb->_status); + + _finished.push_back(cb); _selector.setInterrupt(); } @@ -124,9 +137,6 @@ IceInternal::SelectorThread::joinWithThread() void IceInternal::SelectorThread::run() { - std::map<SOCKET, SocketInfo> socketMap; - vector<SocketInfo*> readyList; - vector<SocketInfo*> finishedList; while(true) { try @@ -140,20 +150,25 @@ IceInternal::SelectorThread::run() continue; } - assert(readyList.empty() && finishedList.empty()); - + vector<SocketReadyCallbackPtr> readyList; + bool finished = false; + { + Lock sync(*this); if(_selector.isInterrupted()) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_selector.processInterrupt()) + { + continue; + } // // There are two possiblities for an interrupt: // // 1. The selector thread has been destroyed. - // 2. A socket was registered or unregistered. + // 2. A callback is being finished (closed). // - + // // Thread destroyed? // @@ -161,65 +176,49 @@ IceInternal::SelectorThread::run() { break; } - - _selector.clearInterrupt(); - - SocketInfo& change = _changes.front(); - if(change.cb) // Registration + + do { - _selector.add(change.fd, change.status); - assert(socketMap.find(change.fd) == socketMap.end()); - socketMap.insert(make_pair(change.fd, change)); - _maxFd = max(_maxFd, change.fd); - _minFd = min(_minFd, change.fd); + SocketReadyCallbackPtr cb = _finished.front(); + cb->_status = Finished; + _finished.pop_front(); + readyList.push_back(cb); } - else // Unregistration - { - map<SOCKET, SocketInfo>::iterator r = socketMap.find(change.fd); - if(r != socketMap.end() && r->second.status != Finished) - { - if(r->second.timeout >= 0) - { - _timer->cancel(r->second.cb); - } - assert(r->second.status != Finished); - _selector.remove(r->second.fd, r->second.status); - r->second.status = Finished; - readyList.push_back(&(r->second)); - } - } - _changes.pop_front(); + while(_selector.clearInterrupt()); // As long as there are interrupts. + finished = true; } else { - // // Examine the selection key set. // - SOCKET fd; - while((fd = _selector.getNextSelected()) != INVALID_SOCKET) + SocketReadyCallbackPtr cb; + while(cb = _selector.getNextSelected()) { - map<SOCKET, SocketInfo>::iterator r = socketMap.find(fd); - if(r != socketMap.end()) - { - if(r->second.timeout >= 0) - { - _timer->cancel(r->second.cb); - } - - readyList.push_back(&(r->second)); - } + readyList.push_back(cb); } } } - for(vector<SocketInfo*>::iterator p = readyList.begin(); p != readyList.end(); ++p) + for(vector<SocketReadyCallbackPtr>::iterator p = readyList.begin(); p != readyList.end(); ++p) { - SocketInfo* info = *p; - SocketStatus status; + SocketStatus status = Finished; + SocketReadyCallbackPtr cb = *p; try { - status = info->cb->socketReady(info->status == Finished); + if(cb->_timeout >= 0) + { + _timer->cancel(cb); + } + + if(finished) + { + cb->socketFinished(); + } + else + { + status = cb->socketReady(); + } } catch(const std::exception& ex) { @@ -234,39 +233,21 @@ IceInternal::SelectorThread::run() status = Finished; } - if(status == Finished) + if(status != Finished) { - finishedList.push_back(info); - } - else if(status != info->status) - { - assert(info->status != Finished); - _selector.remove(info->fd, info->status); - info->status = status; - _selector.add(info->fd, info->status); - if(info->timeout >= 0) + if(status != cb->_status) { - _timer->schedule(info->cb, IceUtil::Time::milliSeconds(info->timeout)); + Lock sync(*this); + _selector.update(cb.get(), cb->_status, status); + cb->_status = status; } - } - } - - readyList.clear(); - - if(finishedList.empty()) - { - continue; - } - for(vector<SocketInfo*>::const_iterator q = finishedList.begin(); q != finishedList.end(); ++q) - { - if((*q)->status != Finished) - { - _selector.remove((*q)->fd, (*q)->status); + if(cb->_timeout >= 0) + { + _timer->schedule(cb, IceUtil::Time::milliSeconds(cb->_timeout)); + } } - socketMap.erase((*q)->fd); } - finishedList.clear(); } assert(_destroyed); |