summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/include/Ice/OutgoingAsync.h6
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp4
-rw-r--r--cpp/src/Ice/ConnectionI.cpp109
-rw-r--r--cpp/src/Ice/ConnectionI.h2
-rw-r--r--cpp/src/Ice/Instance.cpp8
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp83
-rw-r--r--cs/src/Ice/CollocatedRequestHandler.cs6
-rw-r--r--cs/src/Ice/ConnectionI.cs128
-rw-r--r--cs/src/Ice/Instance.cs7
-rw-r--r--cs/src/Ice/OutgoingAsync.cs100
-rw-r--r--java/src/Ice/ConnectionI.java93
-rw-r--r--java/src/IceInternal/AsyncResultI.java2
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java17
-rw-r--r--java/src/IceInternal/OutgoingAsync.java105
14 files changed, 417 insertions, 253 deletions
diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h
index 13185c386b4..b7a14f2dd39 100644
--- a/cpp/include/Ice/OutgoingAsync.h
+++ b/cpp/include/Ice/OutgoingAsync.h
@@ -113,6 +113,7 @@ public:
bool __wait();
void __throwUserException();
virtual void __invokeExceptionAsync(const Exception&);
+ void __invokeCompleted();
static void __check(const AsyncResultPtr&, const ::IceProxy::Ice::Object*, const ::std::string&);
static void __check(const AsyncResultPtr&, const Connection*, const ::std::string&);
@@ -143,7 +144,6 @@ protected:
const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
void __invokeSentAsync();
- void __invokeCompleted();
void runTimerTask(); // Implementation of TimerTask::runTimerTask()
@@ -257,7 +257,7 @@ public:
virtual void __finished(const Ice::Exception&);
virtual void __invokeExceptionAsync(const Ice::Exception&);
- void __finished();
+ bool __finished();
bool __invoke(bool);
BasicStream* __startWriteParams(Ice::FormatType format)
@@ -297,7 +297,7 @@ protected:
private:
- bool handleException(const Ice::Exception&);
+ void handleException(const Ice::Exception&);
RequestHandlerPtr _handler;
Ice::EncodingVersion _encoding;
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 452511591a5..95c151ff6e2 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -527,9 +527,9 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte)
}
}
- if(outAsync)
+ if(outAsync && outAsync->__finished())
{
- outAsync->__finished();
+ outAsync->__invokeCompleted();
}
_adapter->decDirectCount();
}
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index b42b8558a7f..fcf05ef9db0 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1601,6 +1601,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
ObjectAdapterPtr adapter;
OutgoingAsyncPtr outAsync;
ConnectionCallbackPtr heartbeatCallback;
+ int dispatchCount = 0;
ThreadPoolMessage<ConnectionI> msg(current, *this);
@@ -1760,7 +1761,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
swap(_startCallback, startCB);
if(startCB)
{
- ++_dispatchCount;
+ ++dispatchCount;
}
}
}
@@ -1781,7 +1782,8 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
servantManager,
adapter,
outAsync,
- heartbeatCallback));
+ heartbeatCallback,
+ dispatchCount));
}
if(readyOp & SocketOperationWrite)
@@ -1789,7 +1791,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
newOp = static_cast<SocketOperation>(newOp | sendNextMessage(sentCBs));
if(!sentCBs.empty())
{
- ++_dispatchCount;
+ ++dispatchCount;
}
}
@@ -1801,9 +1803,23 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
if(!readyOp)
{
+ assert(dispatchCount == 0);
return;
}
}
+
+ if(_acmLastActivity != IceUtil::Time())
+ {
+ _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
+ }
+
+ if(dispatchCount == 0)
+ {
+ return; // Nothing to dispatch we're done!
+ }
+
+ _dispatchCount += dispatchCount;
+ io.completed();
}
catch(const DatagramLimitException&) // Expected.
{
@@ -1841,11 +1857,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
}
return;
}
- if(_acmLastActivity != IceUtil::Time())
- {
- _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic);
- }
- io.completed();
}
if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher.
@@ -1867,7 +1878,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync,
const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream)
{
- int count = 0;
+ int dispatchedCount = 0;
//
// Notify the factory that the connection establishment and
@@ -1876,7 +1887,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
if(startCB)
{
startCB->connectionStartCompleted(this);
- ++count;
+ ++dispatchedCount;
}
//
@@ -1893,13 +1904,17 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
}
if(p->receivedReply)
{
- OutgoingAsyncPtr::dynamicCast(p->outAsync)->__finished();
+ OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync);
+ if(outAsync->__finished())
+ {
+ outAsync->__invokeCompleted();
+ }
}
#else
p->outAsync->__invokeSent();
#endif
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1908,8 +1923,8 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
if(outAsync)
{
- outAsync->__finished();
- ++count;
+ outAsync->__invokeCompleted();
+ ++dispatchedCount;
}
if(heartbeatCallback)
@@ -1928,7 +1943,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
Error out(_instance->initializationData().logger);
out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc;
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1949,10 +1964,10 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess
//
// Decrease dispatch count.
//
- if(count > 0)
+ if(dispatchedCount > 0)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- _dispatchCount -= count;
+ _dispatchCount -= dispatchedCount;
if(_dispatchCount == 0)
{
//
@@ -2046,7 +2061,11 @@ Ice::ConnectionI::finish()
}
if(message->receivedReply)
{
- OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished();
+ OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync);
+ if(outAsync->__finished())
+ {
+ outAsync->__invokeCompleted();
+ }
}
_sendStreams.pop_front();
}
@@ -2896,27 +2915,25 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks)
_observer.finishWrite(_writeStream);
}
}
+
+ //
+ // If all the messages were sent and we are in the closing state, we schedule
+ // the close timeout to wait for the peer to close the connection.
+ //
+ if(_state == StateClosing && _shutdownInitiated)
+ {
+ setState(StateClosingPending);
+ SocketOperation op = _transceiver->closing(true, *_exception.get());
+ if(op)
+ {
+ return op;
+ }
+ }
}
catch(const Ice::LocalException& ex)
{
setState(StateClosed, ex);
- return SocketOperationNone;
- }
-
- //
- // If all the messages were sent and we are in the closing state, we schedule
- // the close timeout to wait for the peer to close the connection.
- //
- if(_state == StateClosing && _dispatchCount == 0)
- {
- setState(StateClosingPending);
- SocketOperation op = _transceiver->closing(true, *_exception.get());
- if(op)
- {
- return op;
- }
}
-
return SocketOperationNone;
}
@@ -3215,7 +3232,8 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
SocketOperation
Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
- OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback)
+ OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback,
+ int& dispatchCount)
{
assert(_state > StateNotValidated && _state < StateClosed);
@@ -3301,7 +3319,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
invokeNum = 1;
servantManager = _servantManager;
adapter = _adapter;
- ++_dispatchCount;
+ ++dispatchCount;
}
break;
}
@@ -3324,7 +3342,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
}
servantManager = _servantManager;
adapter = _adapter;
- _dispatchCount += invokeNum;
+ dispatchCount += invokeNum;
}
break;
}
@@ -3410,12 +3428,23 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
message->receivedReply = true;
outAsync = 0;
}
+ else if(outAsync->__finished())
+ {
+ ++dispatchCount;
+ }
else
{
- ++_dispatchCount;
+ outAsync = 0;
}
#else
- ++_dispatchCount;
+ if(outAsync->__finished())
+ {
+ ++dispatchCount;
+ }
+ else
+ {
+ outAsync = 0;
+ }
#endif
notifyAll(); // Notify threads blocked in close(false)
}
@@ -3429,7 +3458,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
if(_callback)
{
heartbeatCallback = _callback;
- ++_dispatchCount;
+ ++dispatchCount;
}
break;
}
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index b7254df51ba..d9ff91c5f53 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -268,7 +268,7 @@ private:
#endif
IceInternal::SocketOperation parseMessage(IceInternal::BasicStream&, Int&, Int&, Byte&,
IceInternal::ServantManagerPtr&, ObjectAdapterPtr&,
- IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&);
+ IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&, int&);
void invokeAll(IceInternal::BasicStream&, Int, Int, Byte,
const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&);
diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp
index a223683cc6b..f4b0d89c90c 100644
--- a/cpp/src/Ice/Instance.cpp
+++ b/cpp/src/Ice/Instance.cpp
@@ -1531,6 +1531,7 @@ IceInternal::Instance::destroy()
ThreadPoolPtr serverThreadPool;
ThreadPoolPtr clientThreadPool;
EndpointHostResolverPtr endpointHostResolver;
+ IceUtil::TimerPtr timer;
{
IceUtil::RecMutex::Lock sync(*this);
@@ -1557,8 +1558,7 @@ IceInternal::Instance::destroy()
if(_timer)
{
- _timer->destroy();
- _timer = 0;
+ std::swap(_timer, timer);
}
if(_servantFactoryManager)
@@ -1610,6 +1610,10 @@ IceInternal::Instance::destroy()
//
// Join with the thread pool threads outside the synchronization.
//
+ if(timer)
+ {
+ timer->destroy();
+ }
if(clientThreadPool)
{
clientThreadPool->joinWithAllThreads();
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index c9d0ac28234..509ba18ad06 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -608,12 +608,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc)
//
try
{
- if(!handleException(exc)) // This will throw if the invocation can't be retried.
- {
- return; // Can't be retried immediately.
- }
-
- __invoke(false); // Retry the invocation
+ handleException(exc);
}
catch(const Ice::Exception& ex)
{
@@ -640,9 +635,14 @@ IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex)
AsyncResult::__invokeExceptionAsync(ex);
}
-void
+bool
IceInternal::OutgoingAsync::__finished()
{
+ //
+ // NOTE: this method is called from ConnectionI.parseMessage
+ // with the connection locked. Therefore, it must not invoke
+ // any user callbacks.
+ //
assert(_proxy->ice_isTwoway()); // Can only be called for twoways.
Ice::Byte replyStatus;
@@ -789,15 +789,43 @@ IceInternal::OutgoingAsync::__finished()
_state |= OK;
}
_monitor.notifyAll();
+
+ if(!_callback)
+ {
+ _observer.detach();
+ return false;
+ }
+ return true;
}
- catch(const LocalException& ex)
+ catch(const LocalException& exc)
{
- __finished(ex);
- return;
- }
+ //
+ // We don't call finished(exc) here because we don't want
+ // to invoke the completion callback. The completion
+ // callback is invoked by the connection is this method
+ // returns true.
+ //
+ try
+ {
+ handleException(exc);
+ return false; // Invocation will be retried.
+ }
+ catch(const Ice::Exception& ex)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _state |= Done;
+ _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
+ _exception.reset(ex.ice_clone());
+ _monitor.notifyAll();
- assert(replyStatus == replyOK || replyStatus == replyUserException);
- __invokeCompleted();
+ if(!_callback)
+ {
+ _observer.detach();
+ return false;
+ }
+ return true;
+ }
+ }
}
bool
@@ -851,39 +879,38 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous)
}
}
}
- break;
}
catch(const RetryException&)
{
_proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
+ continue;
}
catch(const Ice::Exception& ex)
{
- if(!handleException(ex)) // This will throw if the invocation can't be retried.
- {
- break; // Can't be retried immediately.
- }
+ handleException(ex);
}
+ break;
}
return _sentSynchronously;
}
-bool
+void
IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc)
{
try
{
int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt);
_observer.retried(); // Invocation is being retried.
- if(interval > 0)
- {
- _instance->retryQueue()->add(this, interval);
- return false; // Don't retry immediately, the retry queue will take care of the retry.
- }
- else
- {
- return true; // Retry immediately.
- }
+
+ //
+ // Schedule the retry. Note that we always schedule the retry
+ // on the retry queue even if the invocation can be retried
+ // immediately. This is required because it might not be safe
+ // to retry from this thread (this is for instance called by
+ // finished(BasicStream) which is called with the connection
+ // locked.
+ //
+ _instance->retryQueue()->add(this, interval);
}
catch(const Ice::Exception& ex)
{
diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs
index dc78cf97f98..0e6b3f84b16 100644
--- a/cs/src/Ice/CollocatedRequestHandler.cs
+++ b/cs/src/Ice/CollocatedRequestHandler.cs
@@ -216,7 +216,11 @@ namespace IceInternal
if(outAsync != null)
{
- outAsync.finished();
+ Ice.AsyncCallback cb = outAsync.finished();
+ if(cb != null)
+ {
+ outAsync.invokeCompleted(cb);
+ }
}
_adapter.decDirectCount();
}
diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs
index 637599d07e5..7e6999b2611 100644
--- a/cs/src/Ice/ConnectionI.cs
+++ b/cs/src/Ice/ConnectionI.cs
@@ -1073,6 +1073,7 @@ namespace Ice
StartCallback startCB = null;
Queue<OutgoingMessage> sentCBs = null;
MessageInfo info = new MessageInfo();
+ int dispatchCount = 0;
IceInternal.ThreadPoolMessage msg = new IceInternal.ThreadPoolMessage(this);
lock(this)
@@ -1234,7 +1235,7 @@ namespace Ice
_startCallback = null;
if(startCB != null)
{
- ++_dispatchCount;
+ ++dispatchCount;
}
}
}
@@ -1249,19 +1250,15 @@ namespace Ice
if((readyOp & IceInternal.SocketOperation.Read) != 0)
{
newOp |= parseMessage(ref info);
+ dispatchCount += info.messageDispatchCount;
}
if((readyOp & IceInternal.SocketOperation.Write) != 0)
{
- sentCBs = new Queue<OutgoingMessage>();
- newOp |= sendNextMessage(sentCBs);
- if(sentCBs.Count > 0)
+ newOp |= sendNextMessage(out sentCBs);
+ if(sentCBs != null)
{
- ++_dispatchCount;
- }
- else
- {
- sentCBs = null;
+ ++dispatchCount;
}
}
@@ -1273,11 +1270,24 @@ namespace Ice
if(readyOp == 0)
{
+ Debug.Assert(dispatchCount == 0);
return;
}
+ }
- msg.completed(ref current);
+ if(_acmLastActivity > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
}
+
+ if(dispatchCount == 0)
+ {
+ return; // Nothing to dispatch we're done!
+ }
+
+ _dispatchCount += dispatchCount;
+
+ msg.completed(ref current);
}
catch(DatagramLimitException) // Expected.
{
@@ -1319,11 +1329,6 @@ namespace Ice
msg.finishIOScope(ref current);
}
- if(_acmLastActivity > 0)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
-
//
// Unlike C++/Java, this method is called from an IO thread of the .NET thread
// pool or from the communicator async IO thread. While it's fine to handle the
@@ -1341,7 +1346,7 @@ namespace Ice
private void dispatch(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info)
{
- int count = 0;
+ int dispatchedCount = 0;
//
// Notify the factory that the connection establishment and
@@ -1350,7 +1355,7 @@ namespace Ice
if(startCB != null)
{
startCB.connectionStartCompleted(this);
- ++count;
+ ++dispatchedCount;
}
//
@@ -1366,10 +1371,15 @@ namespace Ice
}
if(m.receivedReply)
{
- ((IceInternal.OutgoingAsync)m.outAsync).finished();
+ IceInternal.OutgoingAsync outAsync = (IceInternal.OutgoingAsync)m.outAsync;
+ Ice.AsyncCallback cb = outAsync.finished();
+ if(cb != null)
+ {
+ outAsync.invokeCompleted(cb);
+ }
}
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1378,8 +1388,8 @@ namespace Ice
//
if(info.outAsync != null)
{
- info.outAsync.finished();
- ++count;
+ info.outAsync.invokeCompleted(info.completedCallback);
+ ++dispatchedCount;
}
if(info.heartbeatCallback != null)
@@ -1392,7 +1402,7 @@ namespace Ice
{
_logger.error("connection callback exception:\n" + ex + '\n' + _desc);
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1406,7 +1416,7 @@ namespace Ice
info.adapter);
//
- // Don't increase count, the dispatch count is
+ // Don't increase dispatchedCount, the dispatch count is
// decreased when the incoming reply is sent.
//
}
@@ -1414,11 +1424,11 @@ namespace Ice
//
// Decrease dispatch count.
//
- if(count > 0)
+ if(dispatchedCount > 0)
{
lock(this)
{
- _dispatchCount -= count;
+ _dispatchCount -= dispatchedCount;
if(_dispatchCount == 0)
{
//
@@ -1508,7 +1518,12 @@ namespace Ice
}
if(message.receivedReply)
{
- ((IceInternal.OutgoingAsync)message.outAsync).finished();
+ IceInternal.OutgoingAsync outAsync = (IceInternal.OutgoingAsync)message.outAsync;
+ Ice.AsyncCallback cb = outAsync.finished();
+ if(cb != null)
+ {
+ outAsync.invokeCompleted(cb);
+ }
}
_sendStreams.RemoveFirst();
}
@@ -2179,8 +2194,10 @@ namespace Ice
return true;
}
- private int sendNextMessage(Queue<OutgoingMessage> callbacks)
+ private int sendNextMessage(out Queue<OutgoingMessage> callbacks)
{
+ callbacks = null;
+
if(_sendStreams.Count == 0)
{
return IceInternal.SocketOperation.None;
@@ -2205,6 +2222,10 @@ namespace Ice
_writeStream.swap(message.stream);
if(message.sent())
{
+ if(callbacks == null)
+ {
+ callbacks = new Queue<OutgoingMessage>();
+ }
callbacks.Enqueue(message);
}
_sendStreams.RemoveFirst();
@@ -2270,27 +2291,25 @@ namespace Ice
observerFinishWrite(_writeStream.getBuffer());
}
}
+
+ //
+ // If all the messages were sent and we are in the closing state, we schedule
+ // the close timeout to wait for the peer to close the connection.
+ //
+ if(_state == StateClosing && _shutdownInitiated)
+ {
+ setState(StateClosingPending);
+ int op = _transceiver.closing(true, _exception);
+ if(op != 0)
+ {
+ return op;
+ }
+ }
}
catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
- return IceInternal.SocketOperation.None;
}
-
- //
- // If all the messages were sent and we are in the closing state, we schedule
- // the close timeout to wait for the peer to close the connection.
- //
- if(_state == StateClosing && _dispatchCount == 0)
- {
- setState(StateClosingPending);
- int op = _transceiver.closing(true, _exception);
- if(op != 0)
- {
- return op;
- }
- }
-
return IceInternal.SocketOperation.None;
}
@@ -2419,7 +2438,9 @@ namespace Ice
public IceInternal.ServantManager servantManager;
public ObjectAdapter adapter;
public IceInternal.OutgoingAsync outAsync;
+ public Ice.AsyncCallback completedCallback;
public ConnectionCallback heartbeatCallback;
+ public int messageDispatchCount;
}
private int parseMessage(ref MessageInfo info)
@@ -2509,7 +2530,7 @@ namespace Ice
info.invokeNum = 1;
info.servantManager = _servantManager;
info.adapter = _adapter;
- ++_dispatchCount;
+ ++info.messageDispatchCount;
}
break;
}
@@ -2533,7 +2554,7 @@ namespace Ice
}
info.servantManager = _servantManager;
info.adapter = _adapter;
- _dispatchCount += info.invokeNum;
+ info.messageDispatchCount += info.invokeNum;
}
break;
}
@@ -2542,11 +2563,12 @@ namespace Ice
{
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
info.requestId = info.stream.readInt();
- if(_asyncRequests.TryGetValue(info.requestId, out info.outAsync))
+ IceInternal.OutgoingAsync outAsync = null;
+ if(_asyncRequests.TryGetValue(info.requestId, out outAsync))
{
_asyncRequests.Remove(info.requestId);
- info.outAsync.istr__.swap(info.stream);
+ outAsync.istr__.swap(info.stream);
//
// If we just received the reply for a request which isn't acknowledge as
@@ -2554,14 +2576,18 @@ namespace Ice
// will be processed once the write callback is invoked for the message.
//
OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null;
- if(message != null && message.outAsync == info.outAsync)
+ if(message != null && message.outAsync == outAsync)
{
message.receivedReply = true;
- info.outAsync = null;
}
else
{
- ++_dispatchCount;
+ info.completedCallback = outAsync.finished();
+ if(info.completedCallback != null)
+ {
+ info.outAsync = outAsync;
+ ++info.messageDispatchCount;
+ }
}
System.Threading.Monitor.PulseAll(this); // Notify threads blocked in close(false)
}
@@ -2574,7 +2600,7 @@ namespace Ice
if(_callback != null)
{
info.heartbeatCallback = _callback;
- ++_dispatchCount;
+ ++info.messageDispatchCount;
}
break;
}
diff --git a/cs/src/Ice/Instance.cs b/cs/src/Ice/Instance.cs
index ac6962b26f4..14fcce1f716 100644
--- a/cs/src/Ice/Instance.cs
+++ b/cs/src/Ice/Instance.cs
@@ -1083,6 +1083,7 @@ namespace IceInternal
ThreadPool serverThreadPool = null;
ThreadPool clientThreadPool = null;
AsyncIOThread asyncIOThread = null;
+ IceInternal.Timer timer = null;
#if !SILVERLIGHT
EndpointHostResolver endpointHostResolver = null;
@@ -1125,7 +1126,7 @@ namespace IceInternal
if(_timer != null)
{
- _timer.destroy();
+ timer = _timer;
_timer = null;
}
@@ -1176,6 +1177,10 @@ namespace IceInternal
//
// Join with threads outside the synchronization.
//
+ if(timer != null)
+ {
+ timer.destroy();
+ }
if(clientThreadPool != null)
{
clientThreadPool.joinWithAllThreads();
diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs
index 524990f0ed4..5a7e9a11f8b 100644
--- a/cs/src/Ice/OutgoingAsync.cs
+++ b/cs/src/Ice/OutgoingAsync.cs
@@ -982,6 +982,11 @@ namespace IceInternal
base.invokeSent(cb);
}
+ public new void invokeCompleted(Ice.AsyncCallback cb)
+ {
+ base.invokeCompleted(cb);
+ }
+
public void finished(Ice.Exception exc)
{
lock(monitor_)
@@ -1004,15 +1009,9 @@ namespace IceInternal
// NOTE: at this point, synchronization isn't needed, no other threads should be
// calling on the callback.
//
-
try
{
- if(!handleException(exc))
- {
- return; // Can't be retried immediately.
- }
-
- invoke(false); // Retry the invocation
+ handleException(exc);
}
catch(Ice.Exception ex)
{
@@ -1030,12 +1029,11 @@ namespace IceInternal
}, connection);
}
- public void finished()
+ public Ice.AsyncCallback finished()
{
Debug.Assert(proxy_.ice_isTwoway()); // Can only be called for twoways.
byte replyStatus;
- Ice.AsyncCallback cb = null;
try
{
lock(monitor_)
@@ -1185,18 +1183,58 @@ namespace IceInternal
{
state_ |= StateOK;
}
- cb = completedCallback_;
System.Threading.Monitor.PulseAll(monitor_);
+
+ if(completedCallback_ == null)
+ {
+ if(observer_ != null)
+ {
+ observer_.detach();
+ observer_ = null;
+ }
+ return null;
+ }
+ return completedCallback_;
}
}
- catch(Ice.LocalException ex)
+ catch(Ice.LocalException exc)
{
- finished(ex);
- return;
+ //
+ // We don't call finished(exc) here because we don't want
+ // to invoke the completion callback. The completion
+ // callback is invoked by the connection is this method
+ // returns true.
+ //
+ try
+ {
+ handleException(exc);
+ return null;
+ }
+ catch(Ice.LocalException ex)
+ {
+ lock(monitor_)
+ {
+ state_ |= StateDone;
+ exception_ = ex;
+ if(waitHandle_ != null)
+ {
+ waitHandle_.Set();
+ }
+ System.Threading.Monitor.PulseAll(monitor_);
+
+ if(completedCallback_ == null)
+ {
+ if(observer_ != null)
+ {
+ observer_.detach();
+ observer_ = null;
+ }
+ return null;
+ }
+ return completedCallback_;
+ }
+ }
}
-
- Debug.Assert(replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException);
- invokeCompleted(cb);
}
public bool invoke(bool synchronous)
@@ -1250,20 +1288,18 @@ namespace IceInternal
}
}
}
- break;
}
catch(RetryException)
{
proxy_.setRequestHandler__(_handler, null); // Clear request handler and retry.
+ continue;
}
catch(Ice.Exception ex)
{
- if(!handleException(ex)) // This will throw if the invocation can't be retried.
- {
- break; // Can't be retried immediately.
- }
+ handleException(ex);
}
+ break;
}
return sentSynchronously_;
}
@@ -1376,7 +1412,7 @@ namespace IceInternal
base.invokeExceptionAsync(ex);
}
- private bool handleException(Ice.Exception exc)
+ private void handleException(Ice.Exception exc)
{
try
{
@@ -1385,15 +1421,16 @@ namespace IceInternal
{
observer_.retried(); // Invocation is being retried.
}
- if(interval > 0)
- {
- instance_.retryQueue().add(this, interval);
- return false; // Don't retry immediately, the retry queue will take care of the retry.
- }
- else
- {
- return true; // Retry immediately.
- }
+
+ //
+ // Schedule the retry. Note that we always schedule the retry
+ // on the retry queue even if the invocation can be retried
+ // immediately. This is required because it might not be safe
+ // to retry from this thread (this is for instance called by
+ // finished(BasicStream) which is called with the connection
+ // locked.
+ //
+ instance_.retryQueue().add(this, interval);
}
catch(Ice.Exception ex)
{
@@ -1464,7 +1501,6 @@ namespace IceInternal
}
}
-
instance_.clientThreadPool().dispatch(() =>
{
try
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 00645ce8dc9..5cc8188ef09 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -953,7 +953,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
StartCallback startCB = null;
java.util.List<OutgoingMessage> sentCBs = null;
MessageInfo info = null;
-
+ int dispatchCount = 0;
+
synchronized(this)
{
if(_state >= StateClosed)
@@ -1114,7 +1115,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_startCallback = null;
if(startCB != null)
{
- ++_dispatchCount;
+ ++dispatchCount;
}
}
}
@@ -1131,6 +1132,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// Optimization: use the thread's stream.
info = new MessageInfo(current.stream);
newOp |= parseMessage(info);
+ dispatchCount += info.messageDispatchCount;
}
if((readyOp & IceInternal.SocketOperation.Write) != 0)
@@ -1139,7 +1141,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
newOp |= sendNextMessage(sentCBs);
if(!sentCBs.isEmpty())
{
- ++_dispatchCount;
+ ++dispatchCount;
}
else
{
@@ -1155,9 +1157,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(readyOp == 0)
{
+ assert(dispatchCount == 0);
return;
}
}
+
+ if(_acmLastActivity > 0)
+ {
+ _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
+ }
+
+ if(dispatchCount == 0)
+ {
+ return; // Nothing to dispatch we're done!
+ }
+
+ _dispatchCount += dispatchCount;
+ current.ioCompleted();
}
catch(DatagramLimitException ex) // Expected.
{
@@ -1194,13 +1210,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
return;
}
-
- if(_acmLastActivity > 0)
- {
- _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis();
- }
-
- current.ioCompleted();
}
if(!_dispatcher) // Optimization, call dispatch() directly if there's no
@@ -1239,7 +1248,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info)
{
- int count = 0;
+ int dispatchedCount = 0;
//
// Notify the factory that the connection establishment and
@@ -1248,7 +1257,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(startCB != null)
{
startCB.connectionStartCompleted(this);
- ++count;
+ ++dispatchedCount;
}
//
@@ -1260,7 +1269,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
msg.outAsync.invokeSent();
}
- ++count;
+ ++dispatchedCount;
}
if(info != null)
@@ -1271,8 +1280,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
if(info.outAsync != null)
{
- info.outAsync.finished(info.stream);
- ++count;
+ info.outAsync.invokeCompleted();
+ ++dispatchedCount;
}
if(info.heartbeatCallback != null)
@@ -1285,7 +1294,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
_logger.error("connection callback exception:\n" + ex + '\n' + _desc);
}
- ++count;
+ ++dispatchedCount;
}
//
@@ -1298,7 +1307,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter);
//
- // Don't increase count, the dispatch count is
+ // Don't increase dispatchedCount, the dispatch count is
// decreased when the incoming reply is sent.
//
}
@@ -1307,11 +1316,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
//
// Decrease dispatch count.
//
- if(count > 0)
+ if(dispatchedCount > 0)
{
synchronized(this)
{
- _dispatchCount -= count;
+ _dispatchCount -= dispatchedCount;
if(_dispatchCount == 0)
{
//
@@ -2194,28 +2203,26 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
observerFinishWrite(_writeStream.getBuffer());
}
}
+
+ //
+ // If all the messages were sent and we are in the closing state, we
+ // schedule the close timeout to wait for the peer to close the
+ // connection.
+ //
+ if(_state == StateClosing && _shutdownInitiated)
+ {
+ setState(StateClosingPending);
+ int op = _transceiver.closing(true, _exception);
+ if(op != 0)
+ {
+ return op;
+ }
+ }
}
catch(Ice.LocalException ex)
{
setState(StateClosed, ex);
- return IceInternal.SocketOperation.None;
}
-
- //
- // If all the messages were sent and we are in the closing state, we
- // schedule the close timeout to wait for the peer to close the
- // connection.
- //
- if(_state == StateClosing && _dispatchCount == 0)
- {
- setState(StateClosingPending);
- int op = _transceiver.closing(true, _exception);
- if(op != 0)
- {
- return op;
- }
- }
-
return IceInternal.SocketOperation.None;
}
@@ -2362,6 +2369,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
ObjectAdapter adapter;
IceInternal.OutgoingAsync outAsync;
ConnectionCallback heartbeatCallback;
+ int messageDispatchCount;
}
private int parseMessage(MessageInfo info)
@@ -2453,7 +2461,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
info.invokeNum = 1;
info.servantManager = _servantManager;
info.adapter = _adapter;
- ++_dispatchCount;
+ ++info.messageDispatchCount;
}
break;
}
@@ -2477,7 +2485,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
}
info.servantManager = _servantManager;
info.adapter = _adapter;
- _dispatchCount += info.invokeNum;
+ info.messageDispatchCount += info.invokeNum;
}
break;
}
@@ -2486,10 +2494,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
info.requestId = info.stream.readInt();
- info.outAsync = _asyncRequests.remove(info.requestId);
- if(info.outAsync != null)
+ IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId);
+ if(outAsync != null && outAsync.finished(info.stream))
{
- ++_dispatchCount;
+ info.outAsync = outAsync;
+ ++info.messageDispatchCount;
}
notifyAll(); // Notify threads blocked in close(false)
break;
@@ -2501,7 +2510,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(_callback != null)
{
info.heartbeatCallback = _callback;
- ++_dispatchCount;
+ ++info.messageDispatchCount;
}
break;
}
diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java
index 9b211f0bb72..e2ca0b7c466 100644
--- a/java/src/IceInternal/AsyncResultI.java
+++ b/java/src/IceInternal/AsyncResultI.java
@@ -456,7 +456,7 @@ public class AsyncResultI implements Ice.AsyncResult
{
}
- protected final void invokeCompleted()
+ public final void invokeCompleted()
{
//
// Note: no need to change the _state here, specializations are responsible for
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 1d97dc9a8d6..3c94e5d93a8 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -227,16 +227,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
TraceUtil.traceRecv(os, _logger, _traceLevels);
}
- outAsync = _asyncRequests.get(requestId);
- if(outAsync != null)
- {
- _asyncRequests.remove(requestId);
- }
+ outAsync = _asyncRequests.remove(requestId);
}
- if(outAsync != null)
+ if(outAsync != null && outAsync.finished(os))
{
- outAsync.finished(os);
+ outAsync.invokeCompleted();
}
_adapter.decDirectCount();
}
@@ -500,13 +496,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
OutgoingAsync outAsync = null;
synchronized(this)
{
- outAsync = _asyncRequests.get(requestId);
- if(outAsync != null)
- {
- _asyncRequests.remove(requestId);
- }
+ outAsync = _asyncRequests.remove(requestId);
}
-
if(outAsync != null)
{
outAsync.finished(ex);
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 0618ffb93e9..84bfc477768 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -236,12 +236,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
//
try
{
- if(!handleException(exc))
- {
- return; // Can't be retried immediately.
- }
-
- invoke(false); // Retry the invocation
+ handleException(exc);
}
catch(Ice.Exception ex)
{
@@ -262,8 +257,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
});
}
- public final void finished(BasicStream is)
+ public final boolean finished(BasicStream is)
{
+ //
+ // NOTE: this method is called from ConnectionI.parseMessage
+ // with the connection locked. Therefore, it must not invoke
+ // any user callbacks.
+ //
+
assert (_proxy.ice_isTwoway()); // Can only be called for twoways.
byte replyStatus;
@@ -278,14 +279,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
_childObserver.detach();
_childObserver = null;
}
-
+
if(_timeoutRequestHandler != null)
{
_future.cancel(false);
_future = null;
_timeoutRequestHandler = null;
}
-
+
// _is can already be initialized if the invocation is retried
if(_is == null)
{
@@ -293,7 +294,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
_is.swap(is);
replyStatus = _is.readByte();
-
+
switch(replyStatus)
{
case ReplyStatus.replyOK:
@@ -415,25 +416,59 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
}
- _state |= StateDone;
- // Clear buffer now, instead of waiting for AsyncResult
- // deallocation
- // _os.resize(0, false);
if(replyStatus == ReplyStatus.replyOK)
{
_state |= StateOK;
}
+ _state |= StateDone;
_monitor.notifyAll();
+
+ if(_callback == null)
+ {
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ return false;
+ }
+ return true;
}
}
- catch(Ice.LocalException ex)
+ catch(Ice.Exception exc)
{
- finished(ex);
- return;
- }
+ //
+ // We don't call finished(exc) here because we don't want
+ // to invoke the completion callback. The completion
+ // callback is invoked by the connection is this method
+ // returns true.
+ //
+ try
+ {
+ handleException(exc);
+ return false;
+ }
+ catch(Ice.LocalException ex)
+ {
+ synchronized(_monitor)
+ {
+ _state |= StateDone;
+ _exception = ex;
+ _monitor.notifyAll();
- assert (replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException);
- invokeCompleted();
+ if(_callback == null)
+ {
+ if(_observer != null)
+ {
+ _observer.detach();
+ _observer = null;
+ }
+ return false;
+ }
+ return true;
+ }
+ }
+ }
}
public final boolean invoke(boolean synchronous)
@@ -500,21 +535,19 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
}
}
}
- break;
}
catch(RetryException ex)
{
// Clear request handler and retry.
_proxy.__setRequestHandler(_handler, null);
+ continue;
}
catch(Ice.Exception ex)
{
// This will throw if the invocation can't be retried.
- if(!handleException(ex))
- {
- break; // Can't be retried immediately.
- }
+ handleException(ex);
}
+ break;
}
return _sentSynchronously;
}
@@ -589,7 +622,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
super.invokeExceptionAsync(ex);
}
- private boolean handleException(Ice.Exception exc)
+ private void handleException(Ice.Exception exc)
{
try
{
@@ -599,16 +632,16 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC
{
_observer.retried(); // Invocation is being retried.
}
- if(interval.value > 0)
- {
- _instance.retryQueue().add(this, interval.value);
- return false; // Don't retry immediately, the retry queue will
- // take care of the retry.
- }
- else
- {
- return true; // Retry immediately.
- }
+
+ //
+ // Schedule the retry. Note that we always schedule the retry
+ // on the retry queue even if the invocation can be retried
+ // immediately. This is required because it might not be safe
+ // to retry from this thread (this is for instance called by
+ // finished(BasicStream) which is called with the connection
+ // locked.
+ //
+ _instance.retryQueue().add(this, interval.value);
}
catch(Ice.Exception ex)
{