summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2012-12-12 12:24:27 +0100
committerBenoit Foucher <benoit@zeroc.com>2012-12-12 12:24:27 +0100
commitedd426f3667aadc58764ac83bfb12a9899e59176 (patch)
treec7c20eed4a11b792cedd763ce7035c952032c106
parentAndroid demo SSL certificate fixes (diff)
downloadice-edd426f3667aadc58764ac83bfb12a9899e59176.tar.bz2
ice-edd426f3667aadc58764ac83bfb12a9899e59176.tar.xz
ice-edd426f3667aadc58764ac83bfb12a9899e59176.zip
Fixed ICE-5129 - thread safety issue when updating thread observers
Fixed bug where "ice_invoke" was used instead of the operation name for blobject invocations.
-rw-r--r--cpp/src/Ice/Proxy.cpp2
-rw-r--r--cpp/src/Ice/ThreadPool.cpp33
-rw-r--r--cs/src/Ice/Proxy.cs2
-rw-r--r--cs/src/Ice/ThreadPool.cs14
-rw-r--r--java/src/Ice/ObjectPrxHelperBase.java2
-rw-r--r--java/src/IceInternal/ThreadPool.java118
6 files changed, 94 insertions, 77 deletions
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp
index 64f8b238d70..5d930c0310b 100644
--- a/cpp/src/Ice/Proxy.cpp
+++ b/cpp/src/Ice/Proxy.cpp
@@ -535,7 +535,7 @@ IceProxy::Ice::Object::ice_invoke(const string& operation,
vector<Byte>& outEncaps,
const Context* context)
{
- InvocationObserver __observer(this, ice_invoke_name, context);
+ InvocationObserver __observer(this, operation, context);
int __cnt = 0;
while(true)
{
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index d6114a45071..055174f3f84 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -640,7 +640,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
{
if(current._handler)
{
- thread->setState(ThreadStateInUseForIO);
try
{
current._handler->message(current);
@@ -663,7 +662,6 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
}
else if(select)
{
- thread->setState(ThreadStateIdle);
try
{
_selector.select(handlers, _serverIdleTime);
@@ -731,6 +729,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
current._handler = _nextHandler->first;
current.operation = _nextHandler->second;
++_nextHandler;
+ thread->setState(ThreadStateInUseForIO);
}
else
{
@@ -753,6 +752,7 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
{
_selector.startSelect();
select = true;
+ thread->setState(ThreadStateIdle);
}
}
else if(_sizeMax > 1)
@@ -834,17 +834,15 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
}
}
- assert(current._handler);
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ thread->setState(ThreadStateInUseForIO);
+ }
+
try
{
+ assert(current._handler);
current._handler->message(current);
-
- if(_sizeMax > 1 && current._ioCompleted)
- {
- Lock sync(*this);
- assert(_inUse > 0);
- --_inUse;
- }
}
catch(ThreadPoolDestroyedException&)
{
@@ -868,6 +866,16 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
Error out(_instance->initializationData().logger);
out << "exception in `" << _prefix << "':\nevent handler: " << current._handler->toString();
}
+
+ {
+ Lock sync(*this);
+ if(_sizeMax > 1 && current._ioCompleted)
+ {
+ assert(_inUse > 0);
+ --_inUse;
+ }
+ thread->setState(ThreadStateIdle);
+ }
}
#endif
}
@@ -875,13 +883,14 @@ IceInternal::ThreadPool::run(const EventHandlerThreadPtr& thread)
bool
IceInternal::ThreadPool::ioCompleted(ThreadPoolCurrent& current)
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
current._ioCompleted = true; // Set the IO completed flag to specifiy that ioCompleted() has been called.
current._thread->setState(ThreadStateInUseForUser);
if(_sizeMax > 1)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
#if !defined(ICE_USE_IOCP) && !defined(ICE_OS_WINRT)
--_inUseIO;
@@ -1120,6 +1129,7 @@ IceInternal::ThreadPool::EventHandlerThread::EventHandlerThread(const ThreadPool
void
IceInternal::ThreadPool::EventHandlerThread::updateObserver()
{
+ // Must be called with the thread pool mutex locked
const CommunicatorObserverPtr& obsv = _pool->_instance->initializationData().observer;
if(obsv)
{
@@ -1130,6 +1140,7 @@ IceInternal::ThreadPool::EventHandlerThread::updateObserver()
void
IceInternal::ThreadPool::EventHandlerThread::setState(Ice::Instrumentation::ThreadState s)
{
+ // Must be called with the thread pool mutex locked
if(_observer)
{
if(_state != s)
diff --git a/cs/src/Ice/Proxy.cs b/cs/src/Ice/Proxy.cs
index ff72abe575c..218973a6fe1 100644
--- a/cs/src/Ice/Proxy.cs
+++ b/cs/src/Ice/Proxy.cs
@@ -1422,7 +1422,7 @@ namespace Ice
context = emptyContext_;
}
- InvocationObserver observer = IceInternal.ObserverHelper.get(this, __ice_invoke_name, context);
+ InvocationObserver observer = IceInternal.ObserverHelper.get(this, operation, context);
int cnt__ = 0;
try
{
diff --git a/cs/src/Ice/ThreadPool.cs b/cs/src/Ice/ThreadPool.cs
index 89819e11c34..7d6b79f6093 100644
--- a/cs/src/Ice/ThreadPool.cs
+++ b/cs/src/Ice/ThreadPool.cs
@@ -253,9 +253,17 @@ namespace IceInternal
public void updateObservers()
{
- foreach(WorkerThread t in _threads)
+ _m.Lock();
+ try
+ {
+ foreach(WorkerThread t in _threads)
+ {
+ t.updateObserver();
+ }
+ }
+ finally
{
- t.updateObserver();
+ _m.Unlock();
}
}
@@ -752,6 +760,7 @@ namespace IceInternal
public void updateObserver()
{
+ // Must be called with the thread pool mutex locked
Ice.Instrumentation.CommunicatorObserver obsv = _threadPool._instance.initializationData().observer;
if(obsv != null)
{
@@ -766,6 +775,7 @@ namespace IceInternal
public void
setState(Ice.Instrumentation.ThreadState s)
{
+ // Must be called with the thread pool mutex locked
if(_observer != null)
{
if(_state != s)
diff --git a/java/src/Ice/ObjectPrxHelperBase.java b/java/src/Ice/ObjectPrxHelperBase.java
index d1fc853f6c1..db85242366a 100644
--- a/java/src/Ice/ObjectPrxHelperBase.java
+++ b/java/src/Ice/ObjectPrxHelperBase.java
@@ -872,7 +872,7 @@ public class ObjectPrxHelperBase implements ObjectPrx, java.io.Serializable
context = _emptyContext;
}
- final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, __ice_invoke_name, context);
+ final InvocationObserver __observer = IceInternal.ObserverHelper.get(this, operation, context);
int __cnt = 0;
try
{
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index e3eecdd817a..5c70893d61f 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -316,7 +316,6 @@ public final class ThreadPool
{
if(current._handler != null)
{
- thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO);
try
{
current._handler.message(current);
@@ -334,7 +333,6 @@ public final class ThreadPool
}
else if(select)
{
- thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
try
{
_selector.select(_serverIdleTime);
@@ -412,6 +410,7 @@ public final class ThreadPool
current._ioCompleted = false;
current._handler = _nextHandler.next();
current.operation = current._handler._ready;
+ thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO);
}
else
{
@@ -434,6 +433,7 @@ public final class ThreadPool
{
_selector.startSelect();
select = true;
+ thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle);
}
}
else if(_sizeMax > 1)
@@ -452,7 +452,7 @@ public final class ThreadPool
}
}
- void
+ synchronized void
ioCompleted(ThreadPoolCurrent current)
{
current._ioCompleted = true; // Set the IO completed flag to specify that ioCompleted() has been called.
@@ -461,81 +461,75 @@ public final class ThreadPool
if(_sizeMax > 1)
{
- synchronized(this)
+ --_inUseIO;
+
+ if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
{
- --_inUseIO;
+ _selector.hasMoreData(current._handler);
+ }
+
+ if(_serialize && !_destroyed)
+ {
+ _selector.disable(current._handler, current.operation);
+ }
- if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
- {
- _selector.hasMoreData(current._handler);
- }
+ if(current._leader)
+ {
+ //
+ // If this thread is still the leader, it's time to promote a new leader.
+ //
+ promoteFollower(current);
+ }
+ else if(_promote && (_nextHandler.hasNext() || _inUseIO == 0))
+ {
+ notify();
+ }
- if(_serialize && !_destroyed)
- {
- _selector.disable(current._handler, current.operation);
- }
+ assert(_inUse >= 0);
+ ++_inUse;
- if(current._leader)
- {
- //
- // If this thread is still the leader, it's time to promote a new leader.
- //
- promoteFollower(current);
- }
- else if(_promote && (_nextHandler.hasNext() || _inUseIO == 0))
- {
- notify();
- }
-
- assert(_inUse >= 0);
- ++_inUse;
+ if(_inUse == _sizeWarn)
+ {
+ String s = "thread pool `" + _prefix + "' is running low on threads\n"
+ + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
+ _instance.initializationData().logger.warning(s);
+ }
- if(_inUse == _sizeWarn)
+ if(!_destroyed)
+ {
+ assert(_inUse <= _threads.size());
+ if(_inUse < _sizeMax && _inUse == _threads.size())
{
- String s = "thread pool `" + _prefix + "' is running low on threads\n"
- + "Size=" + _size + ", " + "SizeMax=" + _sizeMax + ", " + "SizeWarn=" + _sizeWarn;
- _instance.initializationData().logger.warning(s);
- }
-
- if(!_destroyed)
- {
- assert(_inUse <= _threads.size());
- if(_inUse < _sizeMax && _inUse == _threads.size())
+ if(_instance.traceLevels().threadPool >= 1)
{
- if(_instance.traceLevels().threadPool >= 1)
- {
- String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1);
- _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
- }
+ String s = "growing " + _prefix + ": Size=" + (_threads.size() + 1);
+ _instance.initializationData().logger.trace(_instance.traceLevels().threadPoolCat, s);
+ }
- try
+ try
+ {
+ EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
+ _threads.add(thread);
+ if(_hasPriority)
{
- EventHandlerThread thread = new EventHandlerThread(_threadPrefix + "-" + _threadIndex++);
- _threads.add(thread);
- if(_hasPriority)
- {
- thread.start(_priority);
- }
- else
- {
- thread.start(java.lang.Thread.NORM_PRIORITY);
- }
+ thread.start(_priority);
}
- catch(RuntimeException ex)
+ else
{
- String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
- _instance.initializationData().logger.error(s);
+ thread.start(java.lang.Thread.NORM_PRIORITY);
}
}
+ catch(RuntimeException ex)
+ {
+ String s = "cannot create thread for `" + _prefix + "':\n" + Ex.toString(ex);
+ _instance.initializationData().logger.error(s);
+ }
}
}
}
else if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData())
{
- synchronized(this)
- {
- _selector.hasMoreData(current._handler);
- }
+ _selector.hasMoreData(current._handler);
}
}
@@ -627,6 +621,7 @@ public final class ThreadPool
public void
updateObserver()
{
+ // Must be called with the thread pool mutex locked
Ice.Instrumentation.CommunicatorObserver obsv = _instance.initializationData().observer;
if(obsv != null)
{
@@ -638,9 +633,10 @@ public final class ThreadPool
}
}
- void
+ public void
setState(Ice.Instrumentation.ThreadState s)
{
+ // Must be called with the thread pool mutex locked
if(_observer != null)
{
if(_state != s)