summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/CHANGES10
-rw-r--r--cpp/src/Ice/Connection.cpp71
-rw-r--r--cpp/src/Ice/ThreadPool.cpp211
-rw-r--r--cpp/src/Ice/ThreadPool.h6
-rw-r--r--cpp/test/IceStorm/federation/Subscriber.cpp17
-rw-r--r--cpp/test/IceStorm/single/Subscriber.cpp17
-rw-r--r--java/CHANGES4
-rw-r--r--java/src/IceInternal/Connection.java160
-rw-r--r--java/src/IceInternal/ThreadPool.java38
9 files changed, 350 insertions, 184 deletions
diff --git a/cpp/CHANGES b/cpp/CHANGES
index ad483c0ef25..c812e81888d 100644
--- a/cpp/CHANGES
+++ b/cpp/CHANGES
@@ -1,9 +1,13 @@
Changes since version 1.2.0
---------------------------
-- Fixed a race condition that could lead to a deadlock if
- a thread tried to join with another thread from within
- the destructor of a servant or class.
+- Fixed a rare connection deadlock that could happen if lots of long
+ messages are sent rapidly in parallel, using separate threads or
+ AMI.
+
+- Fixed a race condition that could lead to a deadlock if a thread
+ tried to join with another thread from within the destructor of a
+ servant or class.
- Added support for IcePack and Glacier on Windows platforms.
The IcePack Registry, IcePack Node and Glacier Starter programs
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 56c9f90245e..0bd263a26b6 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -996,19 +996,63 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
return;
}
- OutgoingAsyncPtr outAsync;
+ //
+ // We need a special handling for close connection messages. If we
+ // get a close connection message, we must *first* set the state
+ // to closed, and *then* promote a follower thread. Otherwise we get
+ // lots of bogus warnings about connections being lost.
+ //
+ if(messageType == closeConnectionMsg)
+ {
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
+
+ if(_state == StateClosed)
+ {
+ threadPool->promoteFollower();
+ return;
+ }
+ try
+ {
+ traceHeader("received close connection", stream, _logger, _traceLevels);
+ if(_endpoint->datagram())
+ {
+ if(_warn)
+ {
+ Warning out(_logger);
+ out << "ignoring close connection message for datagram connection:\n"
+ << _transceiver->toString();
+ }
+ }
+ else
+ {
+ setState(StateClosed, CloseConnectionException(__FILE__, __LINE__));
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ }
+
+ threadPool->promoteFollower();
+ return;
+ }
+
+ //
+ // For all other messages, we can promote a follower right away,
+ // without setting the state first, or holding the mutex lock.
+ //
+ threadPool->promoteFollower();
+
+ OutgoingAsyncPtr outAsync;
Int invoke = 0;
Int requestId = 0;
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
- threadPool->promoteFollower();
-
if(_state == StateClosed)
{
- IceUtil::ThreadControl::yield();
return;
}
@@ -1165,20 +1209,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
case closeConnectionMsg:
{
- traceHeader("received close connection", stream, _logger, _traceLevels);
- if(_endpoint->datagram())
- {
- if(_warn)
- {
- Warning out(_logger);
- out << "ignoring close connection message for datagram connection:\n"
- << _transceiver->toString();
- }
- }
- else
- {
- throw CloseConnectionException(__FILE__, __LINE__);
- }
+ assert(false); // Message has special handling above.
break;
}
@@ -1261,6 +1292,8 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
void
IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
{
+ threadPool->promoteFollower();
+
auto_ptr<LocalException> closeException;
map<Int, Outgoing*> requests;
@@ -1269,8 +1302,6 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
- threadPool->promoteFollower();
-
if(_state == StateActive || _state == StateClosing)
{
registerWithPool();
diff --git a/cpp/src/Ice/ThreadPool.cpp b/cpp/src/Ice/ThreadPool.cpp
index 98812026aa7..3bce62e5bc7 100644
--- a/cpp/src/Ice/ThreadPool.cpp
+++ b/cpp/src/Ice/ThreadPool.cpp
@@ -43,6 +43,7 @@ IceInternal::ThreadPool::ThreadPool(const InstancePtr& instance, const string& p
_running(0),
_inUse(0),
_load(0),
+ _promote(true),
_warnUdp(_instance->properties()->getPropertyAsInt("Ice.Warn.Datagrams") > 0)
{
SOCKET fds[2];
@@ -116,7 +117,7 @@ IceInternal::ThreadPool::~ThreadPool()
void
IceInternal::ThreadPool::destroy()
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
assert(_handlerMap.empty());
assert(_changes.empty());
@@ -127,7 +128,7 @@ IceInternal::ThreadPool::destroy()
void
IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_changes.push_back(make_pair(fd, handler));
setInterrupt(0);
@@ -136,7 +137,7 @@ IceInternal::ThreadPool::_register(SOCKET fd, const EventHandlerPtr& handler)
void
IceInternal::ThreadPool::unregister(SOCKET fd)
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(!_destroyed);
_changes.push_back(make_pair(fd, EventHandlerPtr(0)));
setInterrupt(0);
@@ -147,37 +148,37 @@ IceInternal::ThreadPool::promoteFollower()
{
if(_sizeMax > 1)
{
- _promoteMutex.unlock();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- {
- IceUtil::Mutex::Lock sync(*this);
+ assert(!_promote);
+ _promote = true;
+ notify();
- if(!_destroyed)
+ if(!_destroyed)
+ {
+ assert(_inUse >= 0);
+ ++_inUse;
+
+ if(_inUse == _sizeWarn)
{
- assert(_inUse >= 0);
- ++_inUse;
-
- if(_inUse == _sizeWarn)
+ Warning out(_instance->logger());
+ out << "thread pool `" << _prefix << "' is running low on threads\n"
+ << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
+ }
+
+ assert(_inUse <= _running);
+ if(_inUse < _sizeMax && _inUse == _running)
+ {
+ try
{
- Warning out(_instance->logger());
- out << "thread pool `" << _prefix << "' is running low on threads\n"
- << "Size=" << _size << ", " << "SizeMax=" << _sizeMax << ", " << "SizeWarn=" << _sizeWarn;
+ IceUtil::ThreadPtr thread = new EventHandlerThread(this);
+ _threads.push_back(thread->start());
+ ++_running;
}
-
- assert(_inUse <= _running);
- if(_inUse < _sizeMax && _inUse == _running)
+ catch(const IceUtil::Exception& ex)
{
- try
- {
- IceUtil::ThreadPtr thread = new EventHandlerThread(this);
- _threads.push_back(thread->start());
- ++_running;
- }
- catch(const IceUtil::Exception& ex)
- {
- Error out(_instance->logger());
- out << "cannot create thread for `" << _prefix << "':\n" << ex;
- }
+ Error out(_instance->logger());
+ out << "cannot create thread for `" << _prefix << "':\n" << ex;
}
}
}
@@ -297,7 +298,14 @@ IceInternal::ThreadPool::run()
if(_sizeMax > 1)
{
- _promoteMutex.lock();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ while(!_promote)
+ {
+ wait();
+ }
+
+ _promote = false;
}
while(true)
@@ -344,7 +352,7 @@ IceInternal::ThreadPool::run()
bool shutdown = false;
{
- IceUtil::Mutex::Lock sync(*this);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
if(FD_ISSET(_fdIntrRead, &fdSet))
{
@@ -514,6 +522,12 @@ IceInternal::ThreadPool::run()
promoteFollower();
factory->shutdown();
+
+ //
+ // No "continue", because we want shutdown to be done in
+ // its own thread from this pool. Therefore we called
+ // promoteFollower().
+ //
}
else
{
@@ -527,7 +541,11 @@ IceInternal::ThreadPool::run()
//
try
{
- handler->finished(self); // "self" is faster than "this", as the reference count is not modified.
+ //
+ // "self" is faster than "this", as the reference
+ // count is not modified.
+ //
+ handler->finished(self);
}
catch(const LocalException& ex)
{
@@ -535,6 +553,13 @@ IceInternal::ThreadPool::run()
out << "exception in `" << _prefix << "' while calling finished():\n"
<< ex << '\n' << handler->toString();
}
+
+ //
+ // No "continue", because we want finished() to be
+ // called in its own thread from this pool. Note that
+ // this means that finished() must call
+ // promoteFollower().
+ //
}
else
{
@@ -568,69 +593,92 @@ IceInternal::ThreadPool::run()
}
//
- // "self" is faster than "this", as the reference
- // count is not modified.
+ // Provide a new message to the handler.
+ //
+ try
+ {
+ //
+ // "self" is faster than "this", as the reference
+ // count is not modified.
+ //
+ handler->message(stream, self);
+ }
+ catch(const LocalException& ex)
+ {
+ Error out(_instance->logger());
+ out << "exception in `" << _prefix << "' while calling message():\n"
+ << ex << '\n' << handler->toString();
+ }
+
+ //
+ // No "continue", because we want message() to be
+ // called in its own thread from this pool. Note that
+ // this means that message() must call
+ // promoteFollower().
//
- handler->message(stream, self);
}
}
if(_sizeMax > 1)
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ if(!_destroyed)
{
- IceUtil::Mutex::Lock sync(*this);
-
- if(!_destroyed)
+ //
+ // First we reap threads that have been destroyed before.
+ //
+ int sz = static_cast<int>(_threads.size());
+ assert(_running <= sz);
+ if(_running < sz)
{
- //
- // First we reap threads that have been destroyed before.
- //
- int sz = static_cast<int>(_threads.size());
- assert(_running <= sz);
- if(_running < sz)
- {
- vector<IceUtil::ThreadControl>::iterator start =
- partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive));
+ vector<IceUtil::ThreadControl>::iterator start =
+ partition(_threads.begin(), _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::isAlive));
#if defined(_MSC_VER) && _MSC_VER <= 1200 // The mem_fun_ref below does not work with VC++ 6.0
- for(vector<IceUtil::ThreadControl>::iterator p = start; p != _threads.end(); ++p)
- {
- p->join();
- }
+ for(vector<IceUtil::ThreadControl>::iterator p = start; p != _threads.end(); ++p)
+ {
+ p->join();
+ }
#else
- for_each(start, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join));
+ for_each(start, _threads.end(), mem_fun_ref(&IceUtil::ThreadControl::join));
#endif
- _threads.erase(start, _threads.end());
- }
-
- //
- // Now we check if this thread can be destroyed, based
- // on a load factor.
- //
- const double loadFactor = 0.05; // TODO: Configurable?
- const double oneMinusLoadFactor = 1 - loadFactor;
- _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
-
- if(_running > _size)
+ _threads.erase(start, _threads.end());
+ }
+
+ //
+ // Now we check if this thread can be destroyed, based
+ // on a load factor.
+ //
+ const double loadFactor = 0.05; // TODO: Configurable?
+ const double oneMinusLoadFactor = 1 - loadFactor;
+ _load = _load * oneMinusLoadFactor + _inUse * loadFactor;
+
+ if(_running > _size)
+ {
+ int load = static_cast<int>(_load + 1);
+ if(load < _running)
{
- int load = static_cast<int>(_load + 1);
- if(load < _running)
- {
- assert(_inUse > 0);
- --_inUse;
-
- assert(_running > 0);
- --_running;
-
- return false;
- }
- }
+ assert(_inUse > 0);
+ --_inUse;
+
+ assert(_running > 0);
+ --_running;
- assert(_inUse > 0);
- --_inUse;
+ return false;
+ }
}
+
+ assert(_inUse > 0);
+ --_inUse;
}
- _promoteMutex.lock();
+
+ while(!_promote)
+ {
+ wait();
+ }
+
+ _promote = false;
}
}
}
@@ -771,7 +819,12 @@ IceInternal::ThreadPool::EventHandlerThread::run()
// Promote a follower, but w/o modifying _inUse or creating
// new threads.
//
- _pool->_promoteMutex.unlock();
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*_pool.get());
+ assert(!_pool->_promote);
+ _pool->_promote = true;
+ _pool->notify();
+ }
}
_pool = 0; // Break cyclic dependency.
diff --git a/cpp/src/Ice/ThreadPool.h b/cpp/src/Ice/ThreadPool.h
index 35b445b667b..35b2a82977b 100644
--- a/cpp/src/Ice/ThreadPool.h
+++ b/cpp/src/Ice/ThreadPool.h
@@ -36,7 +36,7 @@ namespace IceInternal
class BasicStream;
-class ThreadPool : public IceUtil::Shared, public IceUtil::Mutex
+class ThreadPool : public IceUtil::Shared, public IceUtil::Monitor<IceUtil::Mutex>
{
public:
@@ -100,9 +100,9 @@ private:
int _inUse; // Number of threads that are currently in use.
double _load; // Current load in number of threads.
- const bool _warnUdp;
+ bool _promote;
- IceUtil::Mutex _promoteMutex;
+ const bool _warnUdp;
};
}
diff --git a/cpp/test/IceStorm/federation/Subscriber.cpp b/cpp/test/IceStorm/federation/Subscriber.cpp
index 8c2dcb1151f..8a08d43ed36 100644
--- a/cpp/test/IceStorm/federation/Subscriber.cpp
+++ b/cpp/test/IceStorm/federation/Subscriber.cpp
@@ -15,12 +15,15 @@
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Event.h>
-#include <fstream>
#include <TestCommon.h>
#ifdef _WIN32
# include <io.h>
+#else
+# include <sys/types.h>
+# include <sys/stat.h>
+# include <fcntl.h>
#endif
using namespace std;
@@ -60,17 +63,23 @@ typedef IceUtil::Handle<EventI> EventIPtr;
void
createLock(const string& name)
{
- ofstream f(name.c_str());
+#ifdef _WIN32
+ int ret = _open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL);
+#else
+ int ret = open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL, 0777);
+#endif
+ assert(ret != -1);
}
void
deleteLock(const string& name)
{
#ifdef _WIN32
- _unlink(name.c_str());
+ int ret = _unlink(name.c_str());
#else
- unlink(name.c_str());
+ int ret = unlink(name.c_str());
#endif
+ assert(ret != -1);
}
void
diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp
index d955e965340..0767819523d 100644
--- a/cpp/test/IceStorm/single/Subscriber.cpp
+++ b/cpp/test/IceStorm/single/Subscriber.cpp
@@ -15,10 +15,13 @@
#include <Ice/Ice.h>
#include <IceStorm/IceStorm.h>
#include <Single.h>
-#include <fstream>
#ifdef _WIN32
# include <io.h>
+#else
+# include <sys/types.h>
+# include <sys/stat.h>
+# include <fcntl.h>
#endif
using namespace std;
@@ -53,17 +56,23 @@ private:
void
createLock(const string& name)
{
- ofstream f(name.c_str());
+#ifdef _WIN32
+ int ret = _open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL);
+#else
+ int ret = open(name.c_str(), O_CREAT | O_WRONLY | O_EXCL, 0777);
+#endif
+ assert(ret != -1);
}
void
deleteLock(const string& name)
{
#ifdef _WIN32
- _unlink(name.c_str());
+ int ret = _unlink(name.c_str());
#else
- unlink(name.c_str());
+ int ret = unlink(name.c_str());
#endif
+ assert(ret != -1);
}
int
diff --git a/java/CHANGES b/java/CHANGES
index a6de4791618..2c578505ad7 100644
--- a/java/CHANGES
+++ b/java/CHANGES
@@ -1,6 +1,10 @@
Changes since version 1.2.0
---------------------------
+- Fixed a rare connection deadlock that could happen if lots of long
+ messages are sent rapidly in parallel, using separate threads or
+ AMI.
+
- Ported IcePack demo from C++.
- Added new mechanism for generating Java code into packages.
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java
index c0344de79b1..9d7bc899cd1 100644
--- a/java/src/IceInternal/Connection.java
+++ b/java/src/IceInternal/Connection.java
@@ -837,19 +837,101 @@ public final class Connection extends EventHandler
public void
message(BasicStream stream, ThreadPool threadPool)
{
- OutgoingAsync outAsync = null;
+ byte messageType;
+ byte compress;
+
+ //
+ // Read the message type, and uncompress the message if
+ // necessary.
+ //
+ try
+ {
+ assert(stream.pos() == stream.size());
+
+ //
+ // We don't need to check magic and version here. This has
+ // already been done by the ThreadPool, which provides us the
+ // stream.
+ //
+
+ stream.pos(8);
+ messageType = stream.readByte();
+ compress = stream.readByte();
+ if(compress == (byte)2)
+ {
+ throw new Ice.CompressionNotSupportedException();
+ }
+
+ stream.pos(Protocol.headerSize);
+ }
+ catch(Ice.LocalException ex)
+ {
+ synchronized(this)
+ {
+ threadPool.promoteFollower();
+ setState(StateClosed, ex);
+ return;
+ }
+ }
+ //
+ // We need a special handling for close connection
+ // messages. If we get a close connection message, we must
+ // *first* set the state to closed, and *then* promote a
+ // follower thread. Otherwise we get lots of bogus warnings
+ // about connections being lost.
+ //
+ if(messageType == Protocol.closeConnectionMsg)
+ {
+ synchronized(this)
+ {
+ if(_state == StateClosed)
+ {
+ threadPool.promoteFollower();
+ return;
+ }
+
+ try
+ {
+ TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels);
+ if(_endpoint.datagram())
+ {
+ if(_warn)
+ {
+ _logger.warning("ignoring close connection message for datagram connection:\n" +
+ _transceiver.toString());
+ }
+ }
+ else
+ {
+ setState(StateClosed, new Ice.CloseConnectionException());
+ }
+ }
+ catch(Ice.LocalException ex)
+ {
+ setState(StateClosed, ex);
+ }
+
+ threadPool.promoteFollower();
+ return;
+ }
+ }
+
+ //
+ // For all other messages, we can promote a follower right
+ // away, without setting the state first, or holding the mutex
+ // lock.
+ //
+ threadPool.promoteFollower();
+
+ OutgoingAsync outAsync = null;
int invoke = 0;
int requestId = 0;
- byte compress = 0;
synchronized(this)
{
- threadPool.promoteFollower();
-
if(_state == StateClosed)
{
- Thread.yield();
return;
}
@@ -860,56 +942,6 @@ public final class Connection extends EventHandler
try
{
- assert(stream.pos() == stream.size());
- stream.pos(0);
-
- byte[] m = new byte[4];
- m[0] = stream.readByte();
- m[1] = stream.readByte();
- m[2] = stream.readByte();
- m[3] = stream.readByte();
- if(m[0] != Protocol.magic[0] || m[1] != Protocol.magic[1]
- || m[2] != Protocol.magic[2] || m[3] != Protocol.magic[3])
- {
- Ice.BadMagicException ex = new Ice.BadMagicException();
- ex.badMagic = m;
- throw ex;
- }
-
- byte pMajor = stream.readByte();
- byte pMinor = stream.readByte();
- if(pMajor != Protocol.protocolMajor || pMinor > Protocol.protocolMinor)
- {
- Ice.UnsupportedProtocolException e = new Ice.UnsupportedProtocolException();
- e.badMajor = pMajor < 0 ? pMajor + 255 : pMajor;
- e.badMinor = pMinor < 0 ? pMinor + 255 : pMinor;
- e.major = Protocol.protocolMajor;
- e.minor = Protocol.protocolMinor;
- throw e;
- }
-
- byte eMajor = stream.readByte();
- byte eMinor = stream.readByte();
- if(eMajor != Protocol.encodingMajor || eMinor > Protocol.encodingMinor)
- {
- Ice.UnsupportedEncodingException e = new Ice.UnsupportedEncodingException();
- e.badMajor = eMajor < 0 ? eMajor + 255 : eMajor;
- e.badMinor = eMinor < 0 ? eMinor + 255 : eMinor;
- e.major = Protocol.encodingMajor;
- e.minor = Protocol.encodingMinor;
- throw e;
- }
-
- byte messageType = stream.readByte();
- compress = stream.readByte();
-
- if(compress == (byte)2)
- {
- throw new Ice.CompressionNotSupportedException();
- }
-
- stream.pos(Protocol.headerSize);
-
switch(messageType)
{
case Protocol.requestMsg:
@@ -989,19 +1021,7 @@ public final class Connection extends EventHandler
case Protocol.closeConnectionMsg:
{
- TraceUtil.traceHeader("received close connection", stream, _logger, _traceLevels);
- if(_endpoint.datagram())
- {
- if(_warn)
- {
- _logger.warning("ignoring close connection message for datagram connection:\n" +
- _transceiver.toString());
- }
- }
- else
- {
- throw new Ice.CloseConnectionException();
- }
+ assert(false); // Message has special handling above.
break;
}
@@ -1110,6 +1130,8 @@ public final class Connection extends EventHandler
public void
finished(ThreadPool threadPool)
{
+ threadPool.promoteFollower();
+
Ice.LocalException closeException = null;
IntMap requests = null;
@@ -1117,8 +1139,6 @@ public final class Connection extends EventHandler
synchronized(this)
{
- threadPool.promoteFollower();
-
if(_state == StateActive || _state == StateClosing)
{
registerWithPool();
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 813c6ac8816..0256c015b0f 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -627,6 +627,12 @@ public final class ThreadPool
promoteFollower();
factory.shutdown();
+
+ //
+ // No "continue", because we want shutdown to be done in
+ // its own thread from this pool. Therefore we called
+ // promoteFollower().
+ //
}
else
{
@@ -650,6 +656,13 @@ public final class ThreadPool
sw.toString() + "\n" + handler.toString();
_instance.logger().error(s);
}
+
+ //
+ // No "continue", because we want finished() to be
+ // called in its own thread from this pool. Note
+ // that this means that finished() must call
+ // promoteFollower().
+ //
}
else
{
@@ -690,7 +703,30 @@ public final class ThreadPool
assert(stream.pos() == stream.size());
}
- handler.message(stream, this);
+ //
+ // Provide a new message to the handler.
+ //
+ try
+ {
+ handler.message(stream, this);
+ }
+ catch(Ice.LocalException ex)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ String s = "exception in `" + _prefix + "' while calling message():\n" +
+ sw.toString() + "\n" + handler.toString();
+ _instance.logger().error(s);
+ }
+
+ //
+ // No "continue", because we want message() to
+ // be called in its own thread from this
+ // pool. Note that this means that message()
+ // must call promoteFollower().
+ //
}
finally
{