summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorMark Spruiell <mes@zeroc.com>2014-03-19 12:45:55 -0700
committerMark Spruiell <mes@zeroc.com>2014-03-19 12:45:55 -0700
commitcdcffbcc3c3c052afdeb772ff0167e7a90b525bb (patch)
tree4f16ee41ef7d33394c44e9db81e4d6cd89908250 /cpp/src/Ice/ThreadPool.cpp
parentfixing testicedist.py for 5487 (diff)
downloadice-cdcffbcc3c3c052afdeb772ff0167e7a90b525bb.tar.bz2
ice-cdcffbcc3c3c052afdeb772ff0167e7a90b525bb.tar.xz
ice-cdcffbcc3c3c052afdeb772ff0167e7a90b525bb.zip
merging javascript branch
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp138
1 files changed, 128 insertions, 10 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 2234e8c7ceb..bdfe1a095ac 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -1,6 +1,6 @@
// **********************************************************************
//
-// Copyright (c) 2003-2013 ZeroC, Inc. All rights reserved.
+// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
@@ -108,6 +108,29 @@ private:
IceUtil::ThreadPtr _thread;
};
+class InterruptWorkItem : public ThreadPoolWorkItem
+{
+public:
+
+ virtual void
+ execute(ThreadPoolCurrent& current)
+ {
+ // Nothing to do, this is just used to interrupt the thread pool selector.
+ }
+};
+ThreadPoolWorkItemPtr interruptWorkItem;
+
+class InterruptWorkItemInit
+{
+public:
+
+ InterruptWorkItemInit()
+ {
+ interruptWorkItem = new InterruptWorkItem;
+ }
+};
+InterruptWorkItemInit init;
+
//
// Exception raised by the thread pool work queue when the thread pool
// is destroyed.
@@ -561,7 +584,33 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation
{
Lock sync(*this);
assert(!_destroyed);
+
+ // Don't remove what needs to be added
+ remove = static_cast<SocketOperation>(remove & ~add);
+
+ // Don't remove/add if already un-registered or registered
+ remove = static_cast<SocketOperation>(handler->_registered & remove);
+ add = static_cast<SocketOperation>(~handler->_registered & add);
+ if(remove == add)
+ {
+ return;
+ }
+
_selector.update(handler.get(), remove, add);
+#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
+ if(add & SocketOperationRead && handler->_hasMoreData && !(handler->_disabled & SocketOperationRead))
+ {
+ if(_pendingHandlers.empty())
+ {
+ _workQueue->queue(interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.insert(handler.get());
+ }
+ else if(remove & SocketOperationRead)
+ {
+ _pendingHandlers.erase(handler.get());
+ }
+#endif
}
void
@@ -577,8 +626,8 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler)
// Clear the current ready handlers. The handlers from this vector can't be
// reference counted and a handler might get destroyed once it's finished.
//
- _handlers.clear();
- _nextHandler = _handlers.end();
+ //_handlers.clear();
+ //_nextHandler = _handlers.end();
#else
// If there are no pending asynchronous operations, we can call finish on the handler now.
if(!handler->_pending)
@@ -682,6 +731,19 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
if(select)
{
_handlers.swap(handlers);
+ if(!_pendingHandlers.empty())
+ {
+ for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler)
+ {
+ _pendingHandlers.erase(_nextHandler->first);
+ }
+ set<EventHandler*>::const_iterator p;
+ for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p)
+ {
+ _handlers.push_back(make_pair(*p, SocketOperationRead));
+ }
+ _pendingHandlers.clear();
+ }
_nextHandler = _handlers.begin();
_selector.finishSelect();
select = false;
@@ -700,6 +762,14 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
// the IO thread count now.
//
--_inUseIO;
+ if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
+ {
+ if(_pendingHandlers.empty())
+ {
+ _workQueue->queue(interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.insert(current._handler.get());
+ }
}
else
{
@@ -707,7 +777,18 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
// If the handler called ioCompleted(), we re-enable the handler in
// case it was disabled and we decrease the number of thread in use.
//
- _selector.enable(current._handler.get(), current.operation);
+ if(_serialize)
+ {
+ _selector.enable(current._handler.get(), current.operation);
+ if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
+ {
+ if(_pendingHandlers.empty())
+ {
+ _workQueue->queue(interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.insert(current._handler.get());
+ }
+ }
assert(_inUse > 0);
--_inUse;
}
@@ -717,10 +798,22 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
return; // Wait timed-out.
}
}
+ else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
+ {
+ if(_pendingHandlers.empty())
+ {
+ _workQueue->queue(interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.insert(current._handler.get());
+ }
//
// Get the next ready handler.
//
+ while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered))
+ {
+ ++_nextHandler;
+ }
if(_nextHandler != _handlers.end())
{
current._ioCompleted = false;
@@ -748,6 +841,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
}
else
{
+ _handlers.clear();
_selector.startSelect();
select = true;
thread->setState(ThreadStateIdle);
@@ -774,7 +868,8 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
try
{
current._ioCompleted = false;
- current._handler = _selector.getNextHandler(current.operation, _threadIdleTime);
+ current._handler = _selector.getNextHandler(current.operation, current._count, current._error,
+ _threadIdleTime);
}
catch(const SelectorTimeoutException&)
{
@@ -819,7 +914,8 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
try
{
- current._handler = _selector.getNextHandler(current.operation, _serverIdleTime);
+ current._handler = _selector.getNextHandler(current.operation, current._count, current._error,
+ _serverIdleTime);
}
catch(const SelectorTimeoutException&)
{
@@ -889,13 +985,30 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
if(_sizeMax > 1)
{
+
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
--_inUseIO;
-
- if(_serialize && !_destroyed)
+
+ if(!_destroyed)
{
- _selector.disable(current._handler.get(), current.operation);
- }
+ if(_serialize)
+ {
+ _selector.disable(current._handler.get(), current.operation);
+
+ // Make sure the handler isn't in the set of pending handlers (this can
+ // for example occur if the handler is has more data and its added by
+ // ThreadPool::update while we were processing IO).
+ _pendingHandlers.erase(current._handler.get());
+ }
+ else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
+ {
+ if(_pendingHandlers.empty())
+ {
+ _workQueue->queue(interruptWorkItem); // Interrupt select()
+ }
+ _pendingHandlers.insert(current._handler.get());
+ }
+ }
if(current._leader)
{
@@ -967,6 +1080,11 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
assert(!(current._handler->_ready & current.operation));
current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready | current.operation);
current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation);
+
+ AsyncInfo* info = current._handler->getNativeInfo()->getAsyncInfo(current.operation);
+ info->count = current._count;
+ info->error = current._error;
+
if(!current._handler->finishAsync(current.operation)) // Returns false if the handler is finished.
{
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);