summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ThreadPool.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2015-10-09 15:00:57 +0200
committerBenoit Foucher <benoit@zeroc.com>2015-10-09 15:00:57 +0200
commit20b6c0ccb95118ffc685826904a8edd06a38ac1b (patch)
tree1b389964fa35ca9de23c548120ecedcc9d82074c /cpp/src/Ice/ThreadPool.cpp
parentMerge branch '3.6' (diff)
downloadice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.bz2
ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.tar.xz
ice-20b6c0ccb95118ffc685826904a8edd06a38ac1b.zip
Added ready callback to allow transports to signal readiness to the thread pool
Diffstat (limited to 'cpp/src/Ice/ThreadPool.cpp')
-rw-r--r--cpp/src/Ice/ThreadPool.cpp322
1 files changed, 89 insertions, 233 deletions
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 7ff3f57db4b..0de8b971559 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -99,29 +99,6 @@ private:
IceUtil::ThreadPtr _thread;
};
-class InterruptWorkItem : public ThreadPoolWorkItem
-{
-public:
-
- virtual void
- execute(ThreadPoolCurrent& current)
- {
- // Nothing to do, this is just used to interrupt the thread pool selector.
- }
-};
-ThreadPoolWorkItemPtr interruptWorkItem;
-
-class InterruptWorkItemInit
-{
-public:
-
- InterruptWorkItemInit()
- {
- interruptWorkItem = new InterruptWorkItem;
- }
-};
-InterruptWorkItemInit init;
-
//
// Exception raised by the thread pool work queue when the thread pool
// is destroyed.
@@ -147,77 +124,38 @@ IceInternal::DispatchWorkItem::execute(ThreadPoolCurrent& current)
current.dispatchFromThisThread(this);
}
-IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(const InstancePtr& instance, Selector& selector) :
- _instance(instance),
- _selector(selector),
+IceInternal::ThreadPoolWorkQueue::ThreadPoolWorkQueue(ThreadPool& threadPool) :
+ _threadPool(threadPool),
_destroyed(false)
-#ifdef ICE_USE_IOCP
- , _info(SocketOperationRead)
-#endif
{
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- SOCKET fds[2];
- createPipe(fds);
- _fdIntrRead = fds[0];
- _fdIntrWrite = fds[1];
-
- _selector.initialize(this);
- _selector.update(this, SocketOperationNone, SocketOperationRead);
-#endif
-}
-
-IceInternal::ThreadPoolWorkQueue::~ThreadPoolWorkQueue()
-{
- assert(_destroyed);
-
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- try
- {
- closeSocket(_fdIntrRead);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in selector while calling closeSocket():\n" << ex;
- }
-
- try
- {
- closeSocket(_fdIntrWrite);
- }
- catch(const LocalException& ex)
- {
- Error out(_instance->initializationData().logger);
- out << "exception in selector while calling closeSocket():\n" << ex;
- }
-#endif
+ _registered = SocketOperationRead;
}
void
IceInternal::ThreadPoolWorkQueue::destroy()
{
- Lock sync(*this);
+ //Lock sync(*this); Called with the thread pool locked
assert(!_destroyed);
_destroyed = true;
- postMessage();
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ _threadPool._selector.completed(this, SocketOperationRead);
+#else
+ _threadPool._selector.ready(this, SocketOperationRead, true);
+#endif
}
void
IceInternal::ThreadPoolWorkQueue::queue(const ThreadPoolWorkItemPtr& item)
{
- Lock sync(*this);
- if(_destroyed)
- {
- throw CommunicatorDestroyedException(__FILE__, __LINE__);
- }
+ //Lock sync(*this); Called with the thread pool locked
_workItems.push_back(item);
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
+ _threadPool._selector.completed(this, SocketOperationRead);
+#else
if(_workItems.size() == 1)
{
- postMessage();
+ _threadPool._selector.ready(this, SocketOperationRead, true);
}
-#else
- postMessage();
#endif
}
@@ -242,47 +180,24 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
{
ThreadPoolWorkItemPtr workItem;
{
- Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_threadPool);
if(!_workItems.empty())
{
workItem = _workItems.front();
_workItems.pop_front();
-
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- if(_workItems.empty())
- {
- char c;
- while(true)
- {
- ssize_t ret;
-# ifdef _WIN32
- ret = ::recv(_fdIntrRead, &c, 1, 0);
-# else
- ret = ::read(_fdIntrRead, &c, 1);
-# endif
- if(ret == SOCKET_ERROR)
- {
- if(interrupted())
- {
- continue;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- break;
- }
- }
-#endif
}
+#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
else
{
assert(_destroyed);
-#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
- postMessage();
-#endif
+ _threadPool._selector.completed(this, SocketOperationRead);
}
+#else
+ if(_workItems.empty() && !_destroyed)
+ {
+ _threadPool._selector.ready(this, SocketOperationRead, false);
+ }
+#endif
}
if(workItem)
@@ -291,6 +206,7 @@ IceInternal::ThreadPoolWorkQueue::message(ThreadPoolCurrent& current)
}
else
{
+ assert(_destroyed);
current.ioCompleted();
throw ThreadPoolDestroyedException();
}
@@ -311,47 +227,7 @@ IceInternal::ThreadPoolWorkQueue::toString() const
NativeInfoPtr
IceInternal::ThreadPoolWorkQueue::getNativeInfo()
{
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- return new NativeInfo(_fdIntrRead);
-#else
return 0;
-#endif
-}
-
-void
-IceInternal::ThreadPoolWorkQueue::postMessage()
-{
-#if defined(ICE_USE_IOCP)
- if(!PostQueuedCompletionStatus(_selector.getIOCPHandle(), 0, reinterpret_cast<ULONG_PTR>(this), &_info))
- {
- SocketException ex(__FILE__, __LINE__);
- ex.error = GetLastError();
- throw ex;
- }
-#elif defined(ICE_OS_WINRT)
- _selector.completed(this, SocketOperationRead);
-#else
- char c = 0;
- while(true)
- {
-# ifdef _WIN32
- if(::send(_fdIntrWrite, &c, 1, 0) == SOCKET_ERROR)
-# else
- if(::write(_fdIntrWrite, &c, 1) == SOCKET_ERROR)
-# endif
- {
- if(interrupted())
- {
- continue;
- }
-
- SocketException ex(__FILE__, __LINE__);
- ex.error = getSocketErrno();
- throw ex;
- }
- break;
- }
-#endif
}
IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& prefix, int timeout) :
@@ -469,7 +345,8 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
const_cast<int&>(_priority) = properties->getPropertyAsInt("Ice.ThreadPriority");
}
- _workQueue = new ThreadPoolWorkQueue(_instance, _selector);
+ _workQueue = new ThreadPoolWorkQueue(*this);
+ _selector.initialize(_workQueue.get());
if(_instance->traceLevels()->threadPool >= 1)
{
@@ -528,7 +405,6 @@ IceInternal::ThreadPool::destroy()
{
return;
}
-
_destroyed = true;
_workQueue->destroy();
}
@@ -549,6 +425,28 @@ IceInternal::ThreadPool::initialize(const EventHandlerPtr& handler)
Lock sync(*this);
assert(!_destroyed);
_selector.initialize(handler.get());
+
+ class ReadyCallbackI : public ReadyCallback
+ {
+ public:
+
+ ReadyCallbackI(const ThreadPoolPtr& threadPool, const EventHandlerPtr& handler) :
+ _threadPool(threadPool), _handler(handler)
+ {
+ }
+
+ virtual void
+ ready(SocketOperation op, bool value)
+ {
+ _threadPool->ready(_handler, op, value);
+ }
+
+ private:
+
+ const ThreadPoolPtr _threadPool;
+ const EventHandlerPtr _handler;
+ };
+ handler->getNativeInfo()->setReadyCallback(new ReadyCallbackI(this, handler));
}
void
@@ -569,20 +467,6 @@ IceInternal::ThreadPool::update(const EventHandlerPtr& handler, SocketOperation
}
_selector.update(handler.get(), remove, add);
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- if(add & SocketOperationRead && handler->_hasMoreData && !(handler->_disabled & SocketOperationRead))
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(handler.get());
- }
- else if(remove & SocketOperationRead)
- {
- _pendingHandlers.erase(handler.get());
- }
-#endif
}
bool
@@ -592,7 +476,6 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow)
assert(!_destroyed);
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
closeNow = _selector.finish(handler.get(), closeNow); // This must be called before!
- _pendingHandlers.erase(handler.get());
_workQueue->queue(new FinishedWorkItem(handler, !closeNow));
return closeNow;
#else
@@ -611,6 +494,17 @@ IceInternal::ThreadPool::finish(const EventHandlerPtr& handler, bool closeNow)
}
void
+IceInternal::ThreadPool::ready(const EventHandlerPtr& handler, SocketOperation op, bool value)
+{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ return;
+ }
+ _selector.ready(handler.get(), op, value);
+}
+
+void
IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workItem)
{
if(_dispatcher)
@@ -645,6 +539,11 @@ IceInternal::ThreadPool::dispatchFromThisThread(const DispatchWorkItemPtr& workI
void
IceInternal::ThreadPool::dispatch(const DispatchWorkItemPtr& workItem)
{
+ Lock sync(*this);
+ if(_destroyed)
+ {
+ throw CommunicatorDestroyedException(__FILE__, __LINE__);
+ }
_workQueue->queue(workItem);
}
@@ -663,10 +562,6 @@ IceInternal::ThreadPool::joinWithAllThreads()
{
(*p)->getThreadControl().join();
}
-
-#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
- _selector.finish(_workQueue.get(), true);
-#endif
_selector.destroy();
}
@@ -682,7 +577,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
ThreadPoolCurrent current(_instance, this, thread);
bool select = false;
- vector<pair<EventHandler*, SocketOperation> > handlers;
while(true)
{
if(current._handler)
@@ -711,7 +605,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
{
try
{
- _selector.select(handlers, _serverIdleTime);
+ _selector.select(_serverIdleTime);
}
catch(const SelectorTimeoutException&)
{
@@ -730,22 +624,8 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
{
if(select)
{
- _handlers.swap(handlers);
- if(!_pendingHandlers.empty())
- {
- for(_nextHandler = _handlers.begin(); _nextHandler != _handlers.end(); ++_nextHandler)
- {
- _pendingHandlers.erase(_nextHandler->first);
- }
- set<EventHandler*>::const_iterator p;
- for(p = _pendingHandlers.begin(); p != _pendingHandlers.end(); ++p)
- {
- _handlers.push_back(make_pair(*p, SocketOperationRead));
- }
- _pendingHandlers.clear();
- }
+ _selector.finishSelect(_handlers);
_nextHandler = _handlers.begin();
- _selector.finishSelect();
select = false;
}
else if(!current._leader && followerWait(current))
@@ -762,14 +642,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
// the IO thread count now.
//
--_inUseIO;
- if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
- }
}
else
{
@@ -780,14 +652,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
if(_serialize)
{
_selector.enable(current._handler.get(), current.operation);
- if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
- }
}
assert(_inUse > 0);
--_inUse;
@@ -798,19 +662,12 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
return; // Wait timed-out.
}
}
- else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
- }
//
// Get the next ready handler.
//
- while(_nextHandler != _handlers.end() && !(_nextHandler->second & _nextHandler->first->_registered))
+ while(_nextHandler != _handlers.end() &&
+ !(_nextHandler->second & ~_nextHandler->first->_disabled & _nextHandler->first->_registered))
{
++_nextHandler;
}
@@ -1007,19 +864,6 @@ IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
if(_serialize)
{
_selector.disable(current._handler.get(), current.operation);
-
- // Make sure the handler isn't in the set of pending handlers (this can
- // for example occur if the handler is has more data and its added by
- // ThreadPool::update while we were processing IO).
- _pendingHandlers.erase(current._handler.get());
- }
- else if(current._handler->_hasMoreData && current._handler->_registered & SocketOperationRead)
- {
- if(_pendingHandlers.empty())
- {
- _workQueue->queue(interruptWorkItem); // Interrupt select()
- }
- _pendingHandlers.insert(current._handler.get());
}
}
@@ -1090,8 +934,8 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
if(current._handler->_started & current.operation)
{
- assert(!(current._handler->_ready & current.operation));
- current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready | current.operation);
+ assert(!(current._handler->_completed & current.operation));
+ current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed | current.operation);
current._handler->_started = static_cast<SocketOperation>(current._handler->_started & ~current.operation);
#ifndef ICE_OS_WINRT
@@ -1105,20 +949,26 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
+ Lock sync(*this);
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
return false;
}
}
- else if(!(current._handler->_ready & current.operation) && (current._handler->_registered & current.operation))
+ else if(!(current._handler->_completed & current.operation) && (current._handler->_registered & current.operation))
{
assert(!(current._handler->_started & current.operation));
- if(!current._handler->startAsync(current.operation))
+ if(current._handler->_ready & current.operation)
+ {
+ return true;
+ }
+ else if(!current._handler->startAsync(current.operation))
{
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
+ Lock sync(*this);
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
@@ -1133,8 +983,8 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
if(current._handler->_registered & current.operation)
{
- assert(current._handler->_ready & current.operation);
- current._handler->_ready = static_cast<SocketOperation>(current._handler->_ready & ~current.operation);
+ assert(current._handler->_completed & current.operation);
+ current._handler->_completed = static_cast<SocketOperation>(current._handler->_completed & ~current.operation);
return true;
}
else
@@ -1142,6 +992,7 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
+ Lock sync(*this);
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}
@@ -1152,10 +1003,14 @@ IceInternal::ThreadPool::startMessage(ThreadPoolCurrent& current)
void
IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current)
{
- if(current._handler->_registered & current.operation)
+ if(current._handler->_registered & current.operation && !current._handler->_finish)
{
- assert(!(current._handler->_ready & current.operation));
- if(!current._handler->startAsync(current.operation))
+ assert(!(current._handler->_completed & current.operation));
+ if(current._handler->_ready & current.operation)
+ {
+ _selector.completed(current._handler.get(), current.operation);
+ }
+ else if(!current._handler->startAsync(current.operation))
{
current._handler->_pending = static_cast<SocketOperation>(current._handler->_pending & ~current.operation);
}
@@ -1173,6 +1028,7 @@ IceInternal::ThreadPool::finishMessage(ThreadPoolCurrent& current)
if(!(current._handler->_pending & SocketOperationWaitForClose) && current._handler->_finish)
{
// There are no more pending async operations, it's time to call finish.
+ Lock sync(*this);
_workQueue->queue(new FinishedWorkItem(current._handler, false));
_selector.finish(current._handler.get());
}