summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorMarc Laukien <marc@zeroc.com>2004-01-13 16:07:56 +0000
committerMarc Laukien <marc@zeroc.com>2004-01-13 16:07:56 +0000
commit7db65aa3d43b934793aa7cc03c5adbe1f1876a20 (patch)
treefac7ee47f9b4d85a70b99df9c5f00d879a366206 /cpp
parentminor cleanup (diff)
downloadice-7db65aa3d43b934793aa7cc03c5adbe1f1876a20.tar.bz2
ice-7db65aa3d43b934793aa7cc03c5adbe1f1876a20.tar.xz
ice-7db65aa3d43b934793aa7cc03c5adbe1f1876a20.zip
promotion fix
Diffstat (limited to 'cpp')
-rw-r--r--cpp/CHANGES10
-rw-r--r--cpp/src/Ice/Connection.cpp71
-rw-r--r--cpp/src/Ice/ThreadPool.cpp211
-rw-r--r--cpp/src/Ice/ThreadPool.h6
-rw-r--r--cpp/test/IceStorm/federation/Subscriber.cpp17
-rw-r--r--cpp/test/IceStorm/single/Subscriber.cpp17
6 files changed, 219 insertions, 113 deletions
diff --git a/cpp/CHANGES b/cpp/CHANGES
index ad483c0ef25..c812e81888d 100644
--- a/cpp/CHANGES
+++ b/cpp/CHANGES
@@ -1,9 +1,13 @@
Changes since version 1.2.0
---------------------------
-- Fixed a race condition that could lead to a deadlock if
- a thread tried to join with another thread from within
- the destructor of a servant or class.
+- Fixed a rare connection deadlock, that could happen if lots of long
+ messages are sent rapidly in parallel, using separate threads or
+ AMI.
+
+- Fixed a race condition that could lead to a deadlock if a thread
+ tried to join with another thread from within the destructor of a
+ servant or class.
- Added support for IcePack and Glacier on Windows platforms.
The IcePack Registry, IcePack Node and Glacier Starter programs
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 56c9f90245e..0bd263a26b6 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -996,19 +996,63 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
return;
}
- OutgoingAsyncPtr outAsync;
+ //
+ // We need a special handling for close connection messages. If we
+ // get a close connection message, we must *first* set the state
+ // to closed, and *then* promote a follower thread. Otherwise we get
+ // lots of bogus warnings about connections being lost.
+ //
+ if(messageType == closeConnectionMsg)
+ {
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ if(_state == StateClosed)
+ {
+ threadPool->promoteFollower();
+ return;
+ }
+ try
+ {
+ traceHeader("received close connection", stream, _logger, _traceLevels);
+ if(_endpoint->datagram())
+ {
+ if(_warn)
+ {
+ Warning out(_logger);
+ out << "ignoring close connection message for datagram connection:\n"
+ << _transceiver->toString();
+ }
+ }
+ else
+ {
+ setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
+
+ threadPool->promoteFollower();
+ return;
+ }
+
+ //
+ // For all other messages, we can promote a follower right away,
+ // without setting the state first, or holding the mutex lock.
+ //
+ threadPool->promoteFollower();
+
+ OutgoingAsyncPtr outAsync;
Int invoke = 0;
Int requestId = 0;
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
- threadPool->promoteFollower();
-
if(_state == StateClosed)
{
- IceUtil::ThreadControl::yield();
return;
}
@@ -1165,20 +1209,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
case closeConnectionMsg:
{
- traceHeader("received close connection", stream, _logger, _traceLevels);
- if(_endpoint->datagram())
- {
- if(_warn)
- {
- Warning out(_logger);
- out << "ignoring close connection message for datagram connection:\n"
- << _transceiver->toString();
- }
- }
- else
- {
- throw CloseConnectionException(__FILE__, __LINE__);
- }
+ assert(false); // Message has special handling above.
break;
}
@@ -1261,6 +1292,8 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
void
IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
{
+ threadPool->promoteFollower();
+
auto_ptr<LocalException> closeException;
map<Int, Outgoing*> requests;
@@ -1269,8 +1302,6 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
- threadPool->promoteFollower();
-
if(_state == StateActive || _state == StateClosing)
{
registerWithPool();
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 98812026aa7..3bce62e5bc7 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -43,6 +43,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_running(0),
_inUse(0),
_load(0),
+ _promote(true),
_warnUdp(_instance->properties()->getPropertyAsInt("Ice.Warn.Datagrams") > 0)
{
SOCKET fds[2];
@@ -116,7 +117,7 @@ IceInternal::ThreadPool::~ThreadPool()
void
IceInternal::ThreadPool::destroy()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
assert(_handlerMap.empty());
assert(_changes.empty());
@@ -127,7 +128,7 @@ IceInternal::ThreadPool::destroy()
void
IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_changes.push_back(make_pair(fd, handler));
setInterrupt(0);
@@ -136,7 +137,7 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
void
IceInternal::ThreadPool::unregister(SOCKET fd)
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_changes.push_back(make_pair(fd, EventHandlerPtr(0)));
setInterrupt(0);
@@ -147,37 +148,37 @@ IceInternal::ThreadPool::promoteFollower()
{
if(_sizeMax > 1)
{
- _promoteMutex.unlock();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- {
- IceUtil::Mutex::Lock sync(*this);
+ assert(!_promote);
+ _promote = true;
+ notify();
- if(!_destroyed)
+ if(!_destroyed)
+ {
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
{
- assert(_inUse >= 0);
- ++_inUse;
-
- if(_inUse == _sizeWarn)
+ Warning out(_instance->logger());
+ out << "thread pool `" << _prefix << "' is running low on threads\n"
+ << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
+ }
+
+ assert(_inUse <= _running);
+ if(_inUse < _sizeMax && _inUse == _running)
+ {
+ try
{
- Warning out(_instance->logger());
- out << "thread pool `" << _prefix << "' is running low on threads\n"
- << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
+ IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ _threads.push_back(thread->start());
+ ++_running;
}
-
- assert(_inUse <= _running);
- if(_inUse < _sizeMax && _inUse == _running)
+ catch(const IceUtil::Exception& ex)
{
- try
- {
- IceUtil::ThreadPtr thread = new EventHandlerThread(this);
- _threads.push_back(thread->start());
- ++_running;
- }
- catch(const IceUtil::Exception& ex)
- {
- Error out(_instance->logger());
- out << "cannot create thread for `" << _prefix << "':\n" << ex;
- }
+ Error out(_instance->logger());
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
}
}
}
@@ -297,7 +298,14 @@ IceInternal::ThreadPool::run()
if(_sizeMax > 1)
{
- _promoteMutex.lock();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ while(!_promote)
+ {
+ wait();
+ }
+
+ _promote = false;
}
while(true)
@@ -344,7 +352,7 @@ IceInternal::ThreadPool::run()
bool shutdown = false;
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(FD_ISSET(_fdIntrRead, &fdSet))
{
@@ -514,6 +522,12 @@ IceInternal::ThreadPool::run()
promoteFollower();
factory->shutdown();
+
+ //
+ // No "continue", because we want shutdown to be done in
+ // its own thread from this pool. Therefore we called
+ // promoteFollower().
+ //
}
else
{
@@ -527,7 +541,11 @@ IceInternal::ThreadPool::run()
//
try
{
- handler->finished(self); // "self" is faster than "this", as the reference count is not modified.
+ //
+ // "self" is faster than "this", as the reference
+ // count is not modified.
+ //
+ handler->finished(self);
}
catch(const LocalException& ex)
{
@@ -535,6 +553,13 @@ IceInternal::ThreadPool::run()
out << "exception in `" << _prefix << "' while calling finished():\n"
<< ex << '\n' << handler->toString();
}
+
+ //
+ // No "continue", because we want finished() to be
+ // called in its own thread from this pool. Note that
+ // this means that finished() must call
+ // promoteFollower().
+ //
}
else
{
@@ -568,69 +593,92 @@ IceInternal::ThreadPool::run()
}
//
- // "self" is faster than "this", as the reference
- // count is not modified.
+ // Provide a new mesage to the handler.
+ //
+ try
+ {
+ //
+ // "self" is faster than "this", as the reference
+ // count is not modified.
+ //
+ handler->message(stream, self);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->logger());
+ out << "exception in `" << _prefix << "' while calling finished():\n"
+ << ex << '\n' << handler->toString();
+ }
+
+ //
+ // No "continue", because we want message() to be
+ // called in its own thread from this pool. Note that
+ // this means that message() must call
+ // promoteFollower().
//
- handler->message(stream, self);
}
}
if(_sizeMax > 1)
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(!_destroyed)
{
- IceUtil::Mutex::Lock sync(*this);
-
- if(!_destroyed)
+ //
+ // First we reap threads that have been destroyed before.
+ //
+ int sz = static_cast<int>(_threads.size());
+ assert(_running <= sz);
+ if(_running < sz)
{
- //
- // First we reap threads that have been destroyed before.
- //
- int sz = static_cast<int>(_threads.size());
- assert(_running <= sz);
- if(_running < sz)
- {
- vector<IceUtil::ThreadControl>::iterator start =
- partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive));
+ vector<IceUtil::ThreadControl>::iterator start =
+ partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive));
#if defined(_MSC_VER) && _MSC_VER <= 1200 // The mem_fun_ref below does not work with VC++ 6.0
- for(vector<IceUtil::ThreadControl>::iterator p = start; p != _threads.end(); ++p)
- {
- p->join();
- }
+ for(vector<IceUtil::ThreadControl>::iterator p = start; p != _threads.end(); ++p)
+ {
+ p->join();
+ }
#else
- for_each(start, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join));
+ for_each(start, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join));
#endif
- _threads.erase(start, _threads.end());
- }
-
- //
- // Now we check if this thread can be destroyed, based
- // on a load factor.
- //
- const double loadFactor = 0.05; // TODO: Configurable?
- const double oneMinusLoadFactor = 1 - loadFactor;
- _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
-
- if(_running > _size)
+ _threads.erase(start, _threads.end());
+ }
+
+ //
+ // Now we check if this thread can be destroyed, based
+ // on a load factor.
+ //
+ const double loadFactor = 0.05; // TODO: Configurable?
+ const double oneMinusLoadFactor = 1 - loadFactor;
+ _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
+
+ if(_running > _size)
+ {
+ int load = static_cast<int>(_load + 1);
+ if(load < _running)
{
- int load = static_cast<int>(_load + 1);
- if(load < _running)
- {
- assert(_inUse > 0);
- --_inUse;
-
- assert(_running > 0);
- --_running;
-
- return false;
- }
- }
+ assert(_inUse > 0);
+ --_inUse;
+
+ assert(_running > 0);
+ --_running;
- assert(_inUse > 0);
- --_inUse;
+ return false;
+ }
}
+
+ assert(_inUse > 0);
+ --_inUse;
}
- _promoteMutex.lock();
+
+ while(!_promote)
+ {
+ wait();
+ }
+
+ _promote = false;
}
}
}
@@ -771,7 +819,12 @@ IceInternal::ThreadPool::EventHandlerThread::run()
// Promote a follower, but w/o modifying _inUse or creating
// new threads.
//
- _pool->_promoteMutex.unlock();
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get());
+ assert(!_pool->_promote);
+ _pool->_promote = true;
+ _pool->notify();
+ }
}
_pool = 0; // Break cyclic dependency.
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index 35b445b667b..35b2a82977b 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -36,7 +36,7 @@ namespace IceInternal
class BasicStream;
-class ThreadPool : public IceUtil::Shared, public IceUtil::Mutex
+class ThreadPool : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex>
{
public:
@@ -100,9 +100,9 @@ private:
int _inUse; // Number of threads that are currently in use.
double _load; // Current load in number of threads.
- const bool _warnUdp;
+ bool _promote;
- IceUtil::Mutex _promoteMutex;
+ const bool _warnUdp;
};
}
diff --git a/cpp/test/IceStorm/federation/Subscriber.cpp b/cpp/test/IceStorm/federation/Subscriber.cpp
index 8c2dcb1151f..8a08d43ed36 100644
--- a/cpp/test/IceStorm/federation/Subscriber.cpp
+++ b/cpp/test/IceStorm/federation/Subscriber.cpp
@@ -15,12 +15,15 @@
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Event.h>
-#include <fstream>
#include <TestCommon.h>
#ifdef _WIN32
# include <io.h>
+#else
+# include <sys/types.h>
+# include <sys/stat.h>
+# include <fcntl.h>
#endif
using namespace std;
@@ -60,17 +63,23 @@ typedef IceUtil::Handle<EventI> EventIPtr;
void
createLock(const string& name)
{
- ofstream f(name.c_str());
+#ifdef _WIN32
+ int ret = _open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL);
+#else
+ int ret = open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL);
+#endif
+ assert(ret != -1);
}
void
deleteLock(const string& name)
{
#ifdef _WIN32
- _unlink(name.c_str());
+ int ret = _unlink(name.c_str());
#else
- unlink(name.c_str());
+ int ret = unlink(name.c_str());
#endif
+ assert(ret != -1);
}
void
diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp
index d955e965340..0767819523d 100644
--- a/cpp/test/IceStorm/single/Subscriber.cpp
+++ b/cpp/test/IceStorm/single/Subscriber.cpp
@@ -15,10 +15,13 @@
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Single.h>
-#include <fstream>
#ifdef _WIN32
# include <io.h>
+#else
+# include <sys/types.h>
+# include <sys/stat.h>
+# include <fcntl.h>
#endif
using namespace std;
@@ -53,17 +56,23 @@ private:
void
createLock(const string& name)
{
- ofstream f(name.c_str());
+#ifdef _WIN32
+ int ret = _open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL);
+#else
+ int ret = open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL);
+#endif
+ assert(ret != -1);
}
void
deleteLock(const string& name)
{
#ifdef _WIN32
- _unlink(name.c_str());
+ int ret = _unlink(name.c_str());
#else
- unlink(name.c_str());
+ int ret = unlink(name.c_str());
#endif
+ assert(ret != -1);
}
int