diff options
-rw-r--r-- | cpp/CHANGES | 10 | ||||
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 71 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.cpp | 211 | ||||
-rw-r--r-- | cpp/src/Ice/ThreadPool.h | 6 | ||||
-rw-r--r-- | cpp/test/IceStorm/federation/Subscriber.cpp | 17 | ||||
-rw-r--r-- | cpp/test/IceStorm/single/Subscriber.cpp | 17 | ||||
-rw-r--r-- | java/CHANGES | 4 | ||||
-rw-r--r-- | java/src/IceInternal/Connection.java | 160 | ||||
-rw-r--r-- | java/src/IceInternal/ThreadPool.java | 38 |
9 files changed, 350 insertions, 184 deletions
diff --git a/cpp/CHANGES b/cpp/CHANGES index ad483c0ef25..c812e81888d 100644 --- a/cpp/CHANGES +++ b/cpp/CHANGES @@ -1,9 +1,13 @@ Changes since version 1.2.0 --------------------------- -- Fixed a race condition that could lead to a deadlock if - a thread tried to join with another thread from within - the destructor of a servant or class. +- Fixed a rare connection deadlock, that could happen if lots of long + messages are sent rapidly in parallel, using separate threads or + AMI. + +- Fixed a race condition that could lead to a deadlock if a thread + tried to join with another thread from within the destructor of a + servant or class. - Added support for IcePack and Glacier on Windows platforms. The IcePack Registry, IcePack Node and Glacier Starter programs diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 56c9f90245e..0bd263a26b6 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -996,19 +996,63 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa return; } - OutgoingAsyncPtr outAsync; + // + // We need a special handling for close connection messages. If we + // get a close connection message, we must *first* set the state + // to closed, and *then* promote a follower thread. Otherwise we get + // lots of bogus warnings about connections being lost. 
+ // + if(messageType == closeConnectionMsg) + { + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + if(_state == StateClosed) + { + threadPool->promoteFollower(); + return; + } + try + { + traceHeader("received close connection", stream, _logger, _traceLevels); + if(_endpoint->datagram()) + { + if(_warn) + { + Warning out(_logger); + out << "ignoring close connection message for datagram connection:\n" + << _transceiver->toString(); + } + } + else + { + setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); + } + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + } + + threadPool->promoteFollower(); + return; + } + + // + // For all other messages, we can promote a follower right away, + // without setting the state first, or holding the mutex lock. + // + threadPool->promoteFollower(); + + OutgoingAsyncPtr outAsync; Int invoke = 0; Int requestId = 0; { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - threadPool->promoteFollower(); - if(_state == StateClosed) { - IceUtil::ThreadControl::yield(); return; } @@ -1165,20 +1209,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa case closeConnectionMsg: { - traceHeader("received close connection", stream, _logger, _traceLevels); - if(_endpoint->datagram()) - { - if(_warn) - { - Warning out(_logger); - out << "ignoring close connection message for datagram connection:\n" - << _transceiver->toString(); - } - } - else - { - throw CloseConnectionException(__FILE__, __LINE__); - } + assert(false); // Message has special handling above. 
break; } @@ -1261,6 +1292,8 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa void IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) { + threadPool->promoteFollower(); + auto_ptr<LocalException> closeException; map<Int, Outgoing*> requests; @@ -1269,8 +1302,6 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - threadPool->promoteFollower(); - if(_state == StateActive || _state == StateClosing) { registerWithPool(); diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp index 98812026aa7..3bce62e5bc7 100644 --- a/cpp/src/Ice/ThreadPool.cpp +++ b/cpp/src/Ice/ThreadPool.cpp @@ -43,6 +43,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p _running(0), _inUse(0), _load(0), + _promote(true), _warnUdp(_instance->properties()->getPropertyAsInt("Ice.Warn.Datagrams") > 0) { SOCKET fds[2]; @@ -116,7 +117,7 @@ IceInternal::ThreadPool::~ThreadPool() void IceInternal::ThreadPool::destroy() { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); assert(_handlerMap.empty()); assert(_changes.empty()); @@ -127,7 +128,7 @@ IceInternal::ThreadPool::destroy() void IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); _changes.push_back(make_pair(fd, handler)); setInterrupt(0); @@ -136,7 +137,7 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler) void IceInternal::ThreadPool::unregister(SOCKET fd) { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); assert(!_destroyed); _changes.push_back(make_pair(fd, EventHandlerPtr(0))); setInterrupt(0); @@ -147,37 +148,37 @@ IceInternal::ThreadPool::promoteFollower() { if(_sizeMax > 1) { - _promoteMutex.unlock(); + 
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - { - IceUtil::Mutex::Lock sync(*this); + assert(!_promote); + _promote = true; + notify(); - if(!_destroyed) + if(!_destroyed) + { + assert(_inUse >= 0); + ++_inUse; + + if(_inUse == _sizeWarn) { - assert(_inUse >= 0); - ++_inUse; - - if(_inUse == _sizeWarn) + Warning out(_instance->logger()); + out << "thread pool `" << _prefix << "' is running low on threads\n" + << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; + } + + assert(_inUse <= _running); + if(_inUse < _sizeMax && _inUse == _running) + { + try { - Warning out(_instance->logger()); - out << "thread pool `" << _prefix << "' is running low on threads\n" - << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn; + IceUtil::ThreadPtr thread = new EventHandlerThread(this); + _threads.push_back(thread->start()); + ++_running; } - - assert(_inUse <= _running); - if(_inUse < _sizeMax && _inUse == _running) + catch(const IceUtil::Exception& ex) { - try - { - IceUtil::ThreadPtr thread = new EventHandlerThread(this); - _threads.push_back(thread->start()); - ++_running; - } - catch(const IceUtil::Exception& ex) - { - Error out(_instance->logger()); - out << "cannot create thread for `" << _prefix << "':\n" << ex; - } + Error out(_instance->logger()); + out << "cannot create thread for `" << _prefix << "':\n" << ex; } } } @@ -297,7 +298,14 @@ IceInternal::ThreadPool::run() if(_sizeMax > 1) { - _promoteMutex.lock(); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + while(!_promote) + { + wait(); + } + + _promote = false; } while(true) @@ -344,7 +352,7 @@ IceInternal::ThreadPool::run() bool shutdown = false; { - IceUtil::Mutex::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(FD_ISSET(_fdIntrRead, &fdSet)) { @@ -514,6 +522,12 @@ IceInternal::ThreadPool::run() promoteFollower(); factory->shutdown(); + + // + // No "continue", because we want shutdown to be done 
in + // its own thread from this pool. Therefore we called + // promoteFollower(). + // } else { @@ -527,7 +541,11 @@ IceInternal::ThreadPool::run() // try { - handler->finished(self); // "self" is faster than "this", as the reference count is not modified. + // + // "self" is faster than "this", as the reference + // count is not modified. + // + handler->finished(self); } catch(const LocalException& ex) { @@ -535,6 +553,13 @@ IceInternal::ThreadPool::run() out << "exception in `" << _prefix << "' while calling finished():\n" << ex << '\n' << handler->toString(); } + + // + // No "continue", because we want finished() to be + // called in its own thread from this pool. Note that + // this means that finished() must call + // promoteFollower(). + // } else { @@ -568,69 +593,92 @@ IceInternal::ThreadPool::run() } // - // "self" is faster than "this", as the reference - // count is not modified. + // Provide a new message to the handler. + // + try + { + // + // "self" is faster than "this", as the reference + // count is not modified. + // + handler->message(stream, self); + } + catch(const LocalException& ex) + { + Error out(_instance->logger()); + out << "exception in `" << _prefix << "' while calling finished():\n" + << ex << '\n' << handler->toString(); + } + + // + // No "continue", because we want message() to be + // called in its own thread from this pool. Note that + // this means that message() must call + // promoteFollower(). + // - handler->message(stream, self); } } if(_sizeMax > 1) { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(!_destroyed) { - IceUtil::Mutex::Lock sync(*this); - - if(!_destroyed) + // + // First we reap threads that have been destroyed before. 
- // - int sz = static_cast<int>(_threads.size()); - assert(_running <= sz); - if(_running < sz) - { - vector<IceUtil::ThreadControl>::iterator start = - partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive)); + vector<IceUtil::ThreadControl>::iterator start = + partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive)); #if defined(_MSC_VER) && _MSC_VER <= 1200 // The mem_fun_ref below does not work with VC++ 6.0 - for(vector<IceUtil::ThreadControl>::iterator p = start; p != _threads.end(); ++p) - { - p->join(); - } + for(vector<IceUtil::ThreadControl>::iterator p = start; p != _threads.end(); ++p) + { + p->join(); + } #else - for_each(start, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join)); + for_each(start, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join)); #endif - _threads.erase(start, _threads.end()); - } - - // - // Now we check if this thread can be destroyed, based - // on a load factor. - // - const double loadFactor = 0.05; // TODO: Configurable? - const double oneMinusLoadFactor = 1 - loadFactor; - _load = _load * oneMinusLoadFactor + _inUse * loadFactor; - - if(_running > _size) + _threads.erase(start, _threads.end()); + } + + // + // Now we check if this thread can be destroyed, based + // on a load factor. + // + const double loadFactor = 0.05; // TODO: Configurable? 
+ const double oneMinusLoadFactor = 1 - loadFactor; + _load = _load * oneMinusLoadFactor + _inUse * loadFactor; + + if(_running > _size) + { + int load = static_cast<int>(_load + 1); + if(load < _running) { - int load = static_cast<int>(_load + 1); - if(load < _running) - { - assert(_inUse > 0); - --_inUse; - - assert(_running > 0); - --_running; - - return false; - } - } + assert(_inUse > 0); + --_inUse; + + assert(_running > 0); + --_running; - assert(_inUse > 0); - --_inUse; + return false; + } } + + assert(_inUse > 0); + --_inUse; } - _promoteMutex.lock(); + + while(!_promote) + { + wait(); + } + + _promote = false; } } } @@ -771,7 +819,12 @@ IceInternal::ThreadPool::EventHandlerThread::run() // Promote a follower, but w/o modifying _inUse or creating // new threads. // - _pool->_promoteMutex.unlock(); + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get()); + assert(!_pool->_promote); + _pool->_promote = true; + _pool->notify(); + } } _pool = 0; // Break cyclic dependency. diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h index 35b445b667b..35b2a82977b 100644 --- a/cpp/src/Ice/ThreadPool.h +++ b/cpp/src/Ice/ThreadPool.h @@ -36,7 +36,7 @@ namespace IceInternal class BasicStream; -class ThreadPool : public IceUtil::Shared, public IceUtil::Mutex +class ThreadPool : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex> { public: @@ -100,9 +100,9 @@ private: int _inUse; // Number of threads that are currently in use. double _load; // Current load in number of threads. 
- const bool _warnUdp; + bool _promote; - IceUtil::Mutex _promoteMutex; + const bool _warnUdp; }; } diff --git a/cpp/test/IceStorm/federation/Subscriber.cpp b/cpp/test/IceStorm/federation/Subscriber.cpp index 8c2dcb1151f..8a08d43ed36 100644 --- a/cpp/test/IceStorm/federation/Subscriber.cpp +++ b/cpp/test/IceStorm/federation/Subscriber.cpp @@ -15,12 +15,15 @@ #include <Ice/Ice.h> #include <IceStorm/IceStorm.h> #include <Event.h> -#include <fstream> #include <TestCommon.h> #ifdef _WIN32 # include <io.h> +#else +# include <sys/types.h> +# include <sys/stat.h> +# include <fcntl.h> #endif using namespace std; @@ -60,17 +63,23 @@ typedef IceUtil::Handle<EventI> EventIPtr; void createLock(const string& name) { - ofstream f(name.c_str()); +#ifdef _WIN32 + int ret = _open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL); +#else + int ret = open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL); +#endif + assert(ret != -1); } void deleteLock(const string& name) { #ifdef _WIN32 - _unlink(name.c_str()); + int ret = _unlink(name.c_str()); #else - unlink(name.c_str()); + int ret = unlink(name.c_str()); #endif + assert(ret != -1); } void diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp index d955e965340..0767819523d 100644 --- a/cpp/test/IceStorm/single/Subscriber.cpp +++ b/cpp/test/IceStorm/single/Subscriber.cpp @@ -15,10 +15,13 @@ #include <Ice/Ice.h> #include <IceStorm/IceStorm.h> #include <Single.h> -#include <fstream> #ifdef _WIN32 # include <io.h> +#else +# include <sys/types.h> +# include <sys/stat.h> +# include <fcntl.h> #endif using namespace std; @@ -53,17 +56,23 @@ private: void createLock(const string& name) { - ofstream f(name.c_str()); +#ifdef _WIN32 + int ret = _open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL); +#else + int ret = open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL); +#endif + assert(ret != -1); } void deleteLock(const string& name) { #ifdef _WIN32 - _unlink(name.c_str()); + int ret = _unlink(name.c_str()); #else - 
unlink(name.c_str()); + int ret = unlink(name.c_str()); #endif + assert(ret != -1); } int diff --git a/java/CHANGES b/java/CHANGES index a6de4791618..2c578505ad7 100644 --- a/java/CHANGES +++ b/java/CHANGES @@ -1,6 +1,10 @@ Changes since version 1.2.0 --------------------------- +- Fixed a rare connection deadlock, that could happen if lots of long + messages are sent rapidly in parallel, using separate threads or + AMI. + - Ported IcePack demo from C++. - Added new mechanism for generating Java code into packages. diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index c0344de79b1..9d7bc899cd1 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -837,19 +837,101 @@ public final class Connection extends EventHandler public void message(BasicStream stream, ThreadPool threadPool) { - OutgoingAsync outAsync = null; + byte messageType; + byte compress; + + // + // Read the message type, and uncompress the message if + // necessary. + // + try + { + assert(stream.pos() == stream.size()); + + // + // We don't need to check magic and version here. This has + // already been done by the ThreadPool, which provides us the + // stream. + // + + stream.pos(8); + messageType = stream.readByte(); + compress = stream.readByte(); + if(compress == (byte)2) + { + throw new Ice.CompressionNotSupportedException(); + } + + stream.pos(Protocol.headerSize); + } + catch(Ice.LocalException ex) + { + synchronized(this) + { + threadPool.promoteFollower(); + setState(StateClosed, ex); + return; + } + } + // + // We need a special handling for close connection + // messages. If we get a close connection message, we must + // *first* set the state to closed, and *then* promote a + // follower thread. Otherwise we get lots of bogus warnings + // about connections being lost. 
+ // + if(messageType == Protocol.closeConnectionMsg) + { + synchronized(this) + { + if(_state == StateClosed) + { + threadPool.promoteFollower(); + return; + } + + try + { + TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels); + if(_endpoint.datagram()) + { + if(_warn) + { + _logger.warning("ignoring close connection message for datagram connection:\n" + + _transceiver.toString()); + } + } + else + { + setState(StateClosed, new Ice.CloseConnectionException()); + } + } + catch(Ice.LocalException ex) + { + setState(StateClosed, ex); + } + + threadPool.promoteFollower(); + return; + } + } + + // + // For all other messages, we can promote a follower right + // away, without setting the state first, or holding the mutex + // lock. + // + threadPool.promoteFollower(); + + OutgoingAsync outAsync = null; int invoke = 0; int requestId = 0; - byte compress = 0; synchronized(this) { - threadPool.promoteFollower(); - if(_state == StateClosed) { - Thread.yield(); return; } @@ -860,56 +942,6 @@ public final class Connection extends EventHandler try { - assert(stream.pos() == stream.size()); - stream.pos(0); - - byte[] m = new byte[4]; - m[0] = stream.readByte(); - m[1] = stream.readByte(); - m[2] = stream.readByte(); - m[3] = stream.readByte(); - if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1] - || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3]) - { - Ice.BadMagicException ex = new Ice.BadMagicException(); - ex.badMagic = m; - throw ex; - } - - byte pMajor = stream.readByte(); - byte pMinor = stream.readByte(); - if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor) - { - Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException(); - e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor; - e.badMinor = pMinor < 0 ? 
pMinor + 255 : pMinor; - e.major = Protocol.protocolMajor; - e.minor = Protocol.protocolMinor; - throw e; - } - - byte eMajor = stream.readByte(); - byte eMinor = stream.readByte(); - if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor) - { - Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException(); - e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor; - e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor; - e.major = Protocol.encodingMajor; - e.minor = Protocol.encodingMinor; - throw e; - } - - byte messageType = stream.readByte(); - compress = stream.readByte(); - - if(compress == (byte)2) - { - throw new Ice.CompressionNotSupportedException(); - } - - stream.pos(Protocol.headerSize); - switch(messageType) { case Protocol.requestMsg: @@ -989,19 +1021,7 @@ public final class Connection extends EventHandler case Protocol.closeConnectionMsg: { - TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels); - if(_endpoint.datagram()) - { - if(_warn) - { - _logger.warning("ignoring close connection message for datagram connection:\n" + - _transceiver.toString()); - } - } - else - { - throw new Ice.CloseConnectionException(); - } + assert(false); // Message has special handling above. 
break; } @@ -1110,6 +1130,8 @@ public final class Connection extends EventHandler public void finished(ThreadPool threadPool) { + threadPool.promoteFollower(); + Ice.LocalException closeException = null; IntMap requests = null; @@ -1117,8 +1139,6 @@ public final class Connection extends EventHandler synchronized(this) { - threadPool.promoteFollower(); - if(_state == StateActive || _state == StateClosing) { registerWithPool(); diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 813c6ac8816..0256c015b0f 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -627,6 +627,12 @@ public final class ThreadPool promoteFollower(); factory.shutdown(); + + // + // No "continue", because we want shutdown to be done in + // its own thread from this pool. Therefore we called + // promoteFollower(). + // } else { @@ -650,6 +656,13 @@ public final class ThreadPool sw.toString() + "\n" + handler.toString(); _instance.logger().error(s); } + + // + // No "continue", because we want finished() to be + // called in its own thread from this pool. Note + // that this means that finished() must call + // promoteFollower(). + // } else { @@ -690,7 +703,30 @@ public final class ThreadPool assert(stream.pos() == stream.size()); } - handler.message(stream, this); + // + // Provide a new message to the handler. + // + try + { + handler.message(stream, this); + } + catch(Ice.LocalException ex) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + String s = "exception in `" + _prefix + "' while calling finished():\n" + + sw.toString() + "\n" + handler.toString(); + _instance.logger().error(s); + } + + // + // No "continue", because we want message() to + // be called in its own thread from this + // pool. Note that this means that message() + // must call promoteFollower(). + // } finally { |