diff options
author | Benoit Foucher <benoit@zeroc.com> | 2012-12-12 12:24:27 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2012-12-12 12:24:27 +0100 |
commit | edd426f3667aadc58764ac83bfb12a9899e59176 (patch) | |
tree | c7c20eed4a11b792cedd763ce7035c952032c106 | |
parent | Android demo SSL certificate fixes (diff) | |
download | ice-edd426f3667aadc58764ac83bfb12a9899e59176.tar.bz2 ice-edd426f3667aadc58764ac83bfb12a9899e59176.tar.xz ice-edd426f3667aadc58764ac83bfb12a9899e59176.zip |
Fixed ICE-5129 - thread safety issue when updating thread observers
Fixed bug where "ice_invoke" was used instead of the operation name for blobject invocations.
-rw-r--r-- | cpp/src/Ice/Proxy.cpp | 2 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 33 | ||||
-rw-r--r-- | cs/src/Ice/Proxy.cs | 2 | ||||
-rw-r--r-- | cs/src/Ice/ThreadPool.cs | 14 | ||||
-rw-r--r-- | java/src/Ice/ObjectPrxHelperBase.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 118 |
6 files changed, 94 insertions, 77 deletions
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 64f8b238d70..5d930c0310b 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -535,7 +535,7 @@ IceProxy::Ice::Object::ice_invoke(const string& operation, vector<Byte>& outEncaps, const Context* context) { - InvocationObserver __observer(this, ice_invoke_name, context); + InvocationObserver __observer(this, operation, context); int __cnt = 0; while(true) { diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index d6114a45071..055174f3f84 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -640,7 +640,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) { if(current._handler) { - thread->setState(ThreadStateInUseForIO); try { current._handler->message(current); @@ -663,7 +662,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) } else if(select) { - thread->setState(ThreadStateIdle); try { _selector.select(handlers, _serverIdleTime); @@ -731,6 +729,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) current._handler = _nextHandler->first; current.operation = _nextHandler->second; ++_nextHandler; + thread->setState(ThreadStateInUseForIO); } else { @@ -753,6 +752,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) { _selector.startSelect(); select = true; + thread->setState(ThreadStateIdle); } } else if(_sizeMax > 1) @@ -834,17 +834,15 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) } } - assert(current._handler); + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + thread->setState(ThreadStateInUseForIO); + } + try { + assert(current._handler); current._handler->message(current); - - if(_sizeMax > 1 && current._ioCompleted) - { - Lock sync(*this); - assert(_inUse > 0); - --_inUse; - } } catch(ThreadPoolDestroyedException&) { @@ -868,6 +866,16 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) Error out(_instance->initializationData().logger); out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString(); } + + { + Lock sync(*this); + if(_sizeMax > 1 && current._ioCompleted) + { + assert(_inUse > 0); + --_inUse; + } + thread->setState(ThreadStateIdle); + } } #endif } @@ -875,13 +883,14 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread) bool IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current) { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called. current._thread->setState(ThreadStateInUseForUser); if(_sizeMax > 1) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); #if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT) --_inUseIO; @@ -1120,6 +1129,7 @@ IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPool void IceInternal::ThreadPool::EventHandlerThread::updateObserver() { + // Must be called with the thread pool mutex locked const CommunicatorObserverPtr& obsv = _pool->_instance->initializationData().observer; if(obsv) { @@ -1130,6 +1140,7 @@ IceInternal::ThreadPool::EventHandlerThread::updateObserver() void IceInternal::ThreadPool::EventHandlerThread::setState(Ice::Instrumentation::ThreadState s) { + // Must be called with the thread pool mutex locked if(_observer) { if(_state != s) diff --git a/cs/src/Ice/Proxy.cs b/cs/src/Ice/Proxy.cs index ff72abe575c..218973a6fe1 100644 --- a/cs/src/Ice/Proxy.cs +++ b/cs/src/Ice/Proxy.cs @@ -1422,7 +1422,7 @@ namespace Ice context = emptyContext_; } - InvocationObserver observer = IceInternal.ObserverHelper.get(this, __ice_invoke_name, context); + InvocationObserver observer = IceInternal.ObserverHelper.get(this, operation, context); int cnt__ = 0; try { diff --git a/cs/src/Ice/ThreadPool.cs b/cs/src/Ice/ThreadPool.cs index 89819e11c34..7d6b79f6093 100644 --- a/cs/src/Ice/ThreadPool.cs +++ b/cs/src/Ice/ThreadPool.cs @@ -253,9 +253,17 @@ namespace IceInternal public void updateObservers() { - foreach(WorkerThread t in _threads) + _m.Lock(); + try + { + foreach(WorkerThread t in _threads) + { + t.updateObserver(); + } + } + finally { - t.updateObserver(); + _m.Unlock(); } } @@ -752,6 +760,7 @@ namespace IceInternal public void updateObserver() { + // Must be called with the thread pool mutex locked Ice.Instrumentation.CommunicatorObserver obsv = _threadPool._instance.initializationData().observer; if(obsv != null) { @@ -766,6 +775,7 @@ namespace IceInternal public void setState(Ice.Instrumentation.ThreadState s) { + // Must be called with the thread pool mutex locked if(_observer != null) { if(_state != s) diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java index d1fc853f6c1..db85242366a 100644 --- a/java/src/Ice/ObjectPrxHelperBase.java +++ b/java/src/Ice/ObjectPrxHelperBase.java @@ -872,7 +872,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable context = _emptyContext; } - final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, __ice_invoke_name, context); + final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, operation, context); int __cnt = 0; try { diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index e3eecdd817a..5c70893d61f 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -316,7 +316,6 @@ public final class ThreadPool { if(current._handler != null) { - thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO); try { current._handler.message(current); @@ -334,7 +333,6 @@ public final class ThreadPool } else if(select) { - thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); try { _selector.select(_serverIdleTime); @@ -412,6 +410,7 @@ public final class ThreadPool current._ioCompleted = false; current._handler = _nextHandler.next(); current.operation = current._handler._ready; + thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO); } else { @@ -434,6 +433,7 @@ public final class ThreadPool { _selector.startSelect(); select = true; + thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); } } else if(_sizeMax > 1) @@ -452,7 +452,7 @@ public final class ThreadPool } } - void + synchronized void ioCompleted(ThreadPoolCurrent current) { current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called. @@ -461,81 +461,75 @@ public final class ThreadPool if(_sizeMax > 1) { - synchronized(this) + --_inUseIO; + + if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) { - --_inUseIO; + _selector.hasMoreData(current._handler); + } + + if(_serialize && !_destroyed) + { + _selector.disable(current._handler, current.operation); + } - if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) - { - _selector.hasMoreData(current._handler); - } + if(current._leader) + { + // + // If this thread is still the leader, it's time to promote a new leader. + // + promoteFollower(current); + } + else if(_promote && (_nextHandler.hasNext() || _inUseIO == 0)) + { + notify(); + } - if(_serialize && !_destroyed) - { - _selector.disable(current._handler, current.operation); - } + assert(_inUse >= 0); + ++_inUse; - if(current._leader) - { - // - // If this thread is still the leader, it's time to promote a new leader. - // - promoteFollower(current); - } - else if(_promote && (_nextHandler.hasNext() || _inUseIO == 0)) - { - notify(); - } - - assert(_inUse >= 0); - ++_inUse; + if(_inUse == _sizeWarn) + { + String s = "thread pool `" + _prefix + "' is running low on threads\n" + + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; + _instance.initializationData().logger.warning(s); + } - if(_inUse == _sizeWarn) + if(!_destroyed) + { + assert(_inUse <= _threads.size()); + if(_inUse < _sizeMax && _inUse == _threads.size()) { - String s = "thread pool `" + _prefix + "' is running low on threads\n" - + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn; - _instance.initializationData().logger.warning(s); - } - - if(!_destroyed) - { - assert(_inUse <= _threads.size()); - if(_inUse < _sizeMax && _inUse == _threads.size()) + if(_instance.traceLevels().threadPool >= 1) { - if(_instance.traceLevels().threadPool >= 1) - { - String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1); - _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); - } + String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1); + _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s); + } - try + try + { + EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++); + _threads.add(thread); + if(_hasPriority) { - EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++); - _threads.add(thread); - if(_hasPriority) - { - thread.start(_priority); - } - else - { - thread.start(java.lang.Thread.NORM_PRIORITY); - } + thread.start(_priority); } - catch(RuntimeException ex) + else { - String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex); - _instance.initializationData().logger.error(s); + thread.start(java.lang.Thread.NORM_PRIORITY); } } + catch(RuntimeException ex) + { + String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex); + _instance.initializationData().logger.error(s); + } } } } else if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) { - synchronized(this) - { - _selector.hasMoreData(current._handler); - } + _selector.hasMoreData(current._handler); } } @@ -627,6 +621,7 @@ public final class ThreadPool public void updateObserver() { + // Must be called with the thread pool mutex locked Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer; if(obsv != null) { @@ -638,9 +633,10 @@ public final class ThreadPool } } - void + public void setState(Ice.Instrumentation.ThreadState s) { + // Must be called with the thread pool mutex locked if(_observer != null) { if(_state != s) |