summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/SelectorThread.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/SelectorThread.cpp')
-rw-r--r--cpp/src/Ice/SelectorThread.cpp177
1 files changed, 79 insertions, 98 deletions
diff --git a/cpp/src/Ice/SelectorThread.cpp b/cpp/src/Ice/SelectorThread.cpp
index abf1834542d..0045abf2cf9 100644
--- a/cpp/src/Ice/SelectorThread.cpp
+++ b/cpp/src/Ice/SelectorThread.cpp
@@ -57,7 +57,7 @@ IceInternal::SelectorThread::~SelectorThread()
void
IceInternal::SelectorThread::destroy()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ Lock sync(*this);
assert(!_destroyed);
_destroyed = true;
_selector.setInterrupt();
@@ -66,11 +66,8 @@ IceInternal::SelectorThread::destroy()
void
IceInternal::SelectorThread::incFdsInUse()
{
- // This is windows specific since every other platform uses an API
- // that doesn't have a specific FD limit.
-#ifdef _WIN32
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(!_destroyed);
+#ifdef ICE_USE_SELECT
+ Lock sync(*this);
_selector.incFdsInUse();
#endif
}
@@ -78,11 +75,8 @@ IceInternal::SelectorThread::incFdsInUse()
void
IceInternal::SelectorThread::decFdsInUse()
{
- // This is windows specific since every other platform uses an API
- // that doesn't have a specific FD limit.
-#ifdef _WIN32
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- assert(!_destroyed);
+#ifdef ICE_USE_SELECT
+ Lock sync(*this);
_selector.decFdsInUse();
#endif
}
@@ -90,24 +84,43 @@ IceInternal::SelectorThread::decFdsInUse()
void
IceInternal::SelectorThread::_register(SOCKET fd, const SocketReadyCallbackPtr& cb, SocketStatus status, int timeout)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ Lock sync(*this);
assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories.
assert(status != Finished);
- SocketInfo info(fd, cb, status, timeout);
- _changes.push_back(info);
- if(info.timeout >= 0)
+
+ cb->_fd = fd;
+ cb->_status = status;
+ cb->_timeout = timeout;
+ if(cb->_timeout >= 0)
{
- _timer->schedule(info.cb, IceUtil::Time::milliSeconds(info.timeout));
+ _timer->schedule(cb, IceUtil::Time::milliSeconds(cb->_timeout));
}
- _selector.setInterrupt();
+
+ _selector.add(cb.get(), cb->_status);
+}
+
+void
+IceInternal::SelectorThread::unregister(const SocketReadyCallbackPtr& cb)
+{
+ // Note: unregister should only be called from the socketReady() call-back.
+ Lock sync(*this);
+ assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories.
+ assert(cb->_fd != INVALID_SOCKET);
+
+ _selector.remove(cb.get(), cb->_status, true); // No interrupt needed, it's always called from the selector thread.
+ cb->_status = Finished;
}
void
-IceInternal::SelectorThread::unregister(SOCKET fd)
+IceInternal::SelectorThread::finish(const SocketReadyCallbackPtr& cb)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ Lock sync(*this);
assert(!_destroyed); // The selector thread is destroyed after the incoming/outgoing connection factories.
- _changes.push_back(SocketInfo(fd, 0, Finished, 0));
+ assert(cb->_fd != INVALID_SOCKET);
+
+ _selector.remove(cb.get(), cb->_status);
+
+ _finished.push_back(cb);
_selector.setInterrupt();
}
@@ -124,9 +137,6 @@ IceInternal::SelectorThread::joinWithThread()
void
IceInternal::SelectorThread::run()
{
- std::map<SOCKET, SocketInfo> socketMap;
- vector<SocketInfo*> readyList;
- vector<SocketInfo*> finishedList;
while(true)
{
try
@@ -140,20 +150,25 @@ IceInternal::SelectorThread::run()
continue;
}
- assert(readyList.empty() && finishedList.empty());
-
+ vector<SocketReadyCallbackPtr> readyList;
+ bool finished = false;
+
{
+ Lock sync(*this);
if(_selector.isInterrupted())
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ if(_selector.processInterrupt())
+ {
+ continue;
+ }
//
// There are two possiblities for an interrupt:
//
// 1. The selector thread has been destroyed.
- // 2. A socket was registered or unregistered.
+ // 2. A callback is being finished (closed).
//
-
+
//
// Thread destroyed?
//
@@ -161,65 +176,49 @@ IceInternal::SelectorThread::run()
{
break;
}
-
- _selector.clearInterrupt();
-
- SocketInfo& change = _changes.front();
- if(change.cb) // Registration
+
+ do
{
- _selector.add(change.fd, change.status);
- assert(socketMap.find(change.fd) == socketMap.end());
- socketMap.insert(make_pair(change.fd, change));
- _maxFd = max(_maxFd, change.fd);
- _minFd = min(_minFd, change.fd);
+ SocketReadyCallbackPtr cb = _finished.front();
+ cb->_status = Finished;
+ _finished.pop_front();
+ readyList.push_back(cb);
}
- else // Unregistration
- {
- map<SOCKET, SocketInfo>::iterator r = socketMap.find(change.fd);
- if(r != socketMap.end() && r->second.status != Finished)
- {
- if(r->second.timeout >= 0)
- {
- _timer->cancel(r->second.cb);
- }
- assert(r->second.status != Finished);
- _selector.remove(r->second.fd, r->second.status);
- r->second.status = Finished;
- readyList.push_back(&(r->second));
- }
- }
- _changes.pop_front();
+ while(_selector.clearInterrupt()); // As long as there are interrupts.
+ finished = true;
}
else
{
-
//
// Examine the selection key set.
//
- SOCKET fd;
- while((fd = _selector.getNextSelected()) != INVALID_SOCKET)
+ SocketReadyCallbackPtr cb;
+ while(cb = _selector.getNextSelected())
{
- map<SOCKET, SocketInfo>::iterator r = socketMap.find(fd);
- if(r != socketMap.end())
- {
- if(r->second.timeout >= 0)
- {
- _timer->cancel(r->second.cb);
- }
-
- readyList.push_back(&(r->second));
- }
+ readyList.push_back(cb);
}
}
}
- for(vector<SocketInfo*>::iterator p = readyList.begin(); p != readyList.end(); ++p)
+ for(vector<SocketReadyCallbackPtr>::iterator p = readyList.begin(); p != readyList.end(); ++p)
{
- SocketInfo* info = *p;
- SocketStatus status;
+ SocketStatus status = Finished;
+ SocketReadyCallbackPtr cb = *p;
try
{
- status = info->cb->socketReady(info->status == Finished);
+ if(cb->_timeout >= 0)
+ {
+ _timer->cancel(cb);
+ }
+
+ if(finished)
+ {
+ cb->socketFinished();
+ }
+ else
+ {
+ status = cb->socketReady();
+ }
}
catch(const std::exception& ex)
{
@@ -234,39 +233,21 @@ IceInternal::SelectorThread::run()
status = Finished;
}
- if(status == Finished)
+ if(status != Finished)
{
- finishedList.push_back(info);
- }
- else if(status != info->status)
- {
- assert(info->status != Finished);
- _selector.remove(info->fd, info->status);
- info->status = status;
- _selector.add(info->fd, info->status);
- if(info->timeout >= 0)
+ if(status != cb->_status)
{
- _timer->schedule(info->cb, IceUtil::Time::milliSeconds(info->timeout));
+ Lock sync(*this);
+ _selector.update(cb.get(), cb->_status, status);
+ cb->_status = status;
}
- }
- }
-
- readyList.clear();
-
- if(finishedList.empty())
- {
- continue;
- }
- for(vector<SocketInfo*>::const_iterator q = finishedList.begin(); q != finishedList.end(); ++q)
- {
- if((*q)->status != Finished)
- {
- _selector.remove((*q)->fd, (*q)->status);
+ if(cb->_timeout >= 0)
+ {
+ _timer->schedule(cb, IceUtil::Time::milliSeconds(cb->_timeout));
+ }
}
- socketMap.erase((*q)->fd);
}
- finishedList.clear();
}
assert(_destroyed);