diff options
-rw-r--r-- | cpp/include/Ice/OutgoingAsync.h | 6 | ||||
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 4 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 109 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 2 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 83 | ||||
-rw-r--r-- | cs/src/Ice/CollocatedRequestHandler.cs | 6 | ||||
-rw-r--r-- | cs/src/Ice/ConnectionI.cs | 128 | ||||
-rw-r--r-- | cs/src/Ice/Instance.cs | 7 | ||||
-rw-r--r-- | cs/src/Ice/OutgoingAsync.cs | 100 | ||||
-rw-r--r-- | java/src/Ice/ConnectionI.java | 93 | ||||
-rw-r--r-- | java/src/IceInternal/AsyncResultI.java | 2 | ||||
-rw-r--r-- | java/src/IceInternal/CollocatedRequestHandler.java | 17 | ||||
-rw-r--r-- | java/src/IceInternal/OutgoingAsync.java | 105 |
14 files changed, 417 insertions, 253 deletions
diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h index 13185c386b4..b7a14f2dd39 100644 --- a/cpp/include/Ice/OutgoingAsync.h +++ b/cpp/include/Ice/OutgoingAsync.h @@ -113,6 +113,7 @@ public: bool __wait(); void __throwUserException(); virtual void __invokeExceptionAsync(const Exception&); + void __invokeCompleted(); static void __check(const AsyncResultPtr&, const ::IceProxy::Ice::Object*, const ::std::string&); static void __check(const AsyncResultPtr&, const Connection*, const ::std::string&); @@ -143,7 +144,6 @@ protected: const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); void __invokeSentAsync(); - void __invokeCompleted(); void runTimerTask(); // Implementation of TimerTask::runTimerTask() @@ -257,7 +257,7 @@ public: virtual void __finished(const Ice::Exception&); virtual void __invokeExceptionAsync(const Ice::Exception&); - void __finished(); + bool __finished(); bool __invoke(bool); BasicStream* __startWriteParams(Ice::FormatType format) @@ -297,7 +297,7 @@ protected: private: - bool handleException(const Ice::Exception&); + void handleException(const Ice::Exception&); RequestHandlerPtr _handler; Ice::EncodingVersion _encoding; diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 452511591a5..95c151ff6e2 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -527,9 +527,9 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) } } - if(outAsync) + if(outAsync && outAsync->__finished()) { - outAsync->__finished(); + outAsync->__invokeCompleted(); } _adapter->decDirectCount(); } diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index b42b8558a7f..fcf05ef9db0 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1601,6 +1601,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) ObjectAdapterPtr adapter; OutgoingAsyncPtr outAsync; ConnectionCallbackPtr heartbeatCallback; + int dispatchCount = 0; ThreadPoolMessage<ConnectionI> msg(current, *this); @@ -1760,7 +1761,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) swap(_startCallback, startCB); if(startCB) { - ++_dispatchCount; + ++dispatchCount; } } } @@ -1781,7 +1782,8 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) servantManager, adapter, outAsync, - heartbeatCallback)); + heartbeatCallback, + dispatchCount)); } if(readyOp & SocketOperationWrite) @@ -1789,7 +1791,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) newOp = static_cast<SocketOperation>(newOp | sendNextMessage(sentCBs)); if(!sentCBs.empty()) { - ++_dispatchCount; + ++dispatchCount; } } @@ -1801,9 +1803,23 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) if(!readyOp) { + assert(dispatchCount == 0); return; } } + + if(_acmLastActivity != IceUtil::Time()) + { + _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); + } + + if(dispatchCount == 0) + { + return; // Nothing to dispatch we're done! + } + + _dispatchCount += dispatchCount; + io.completed(); } catch(const DatagramLimitException&) // Expected. { @@ -1841,11 +1857,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } return; } - if(_acmLastActivity != IceUtil::Time()) - { - _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); - } - io.completed(); } if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher. @@ -1867,7 +1878,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) { - int count = 0; + int dispatchedCount = 0; // // Notify the factory that the connection establishment and @@ -1876,7 +1887,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess if(startCB) { startCB->connectionStartCompleted(this); - ++count; + ++dispatchedCount; } // @@ -1893,13 +1904,17 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess } if(p->receivedReply) { - OutgoingAsyncPtr::dynamicCast(p->outAsync)->__finished(); + OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync); + if(outAsync->__finished()) + { + outAsync->__invokeCompleted(); + } } #else p->outAsync->__invokeSent(); #endif } - ++count; + ++dispatchedCount; } // @@ -1908,8 +1923,8 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // if(outAsync) { - outAsync->__finished(); - ++count; + outAsync->__invokeCompleted(); + ++dispatchedCount; } if(heartbeatCallback) @@ -1928,7 +1943,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess Error out(_instance->initializationData().logger); out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc; } - ++count; + ++dispatchedCount; } // @@ -1949,10 +1964,10 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // // Decrease dispatch count. // - if(count > 0) + if(dispatchedCount > 0) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _dispatchCount -= count; + _dispatchCount -= dispatchedCount; if(_dispatchCount == 0) { // @@ -2046,7 +2061,11 @@ Ice::ConnectionI::finish() } if(message->receivedReply) { - OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished(); + OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync); + if(outAsync->__finished()) + { + outAsync->__invokeCompleted(); + } } _sendStreams.pop_front(); } @@ -2896,27 +2915,25 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) _observer.finishWrite(_writeStream); } } + + // + // If all the messages were sent and we are in the closing state, we schedule + // the close timeout to wait for the peer to close the connection. + // + if(_state == StateClosing && _shutdownInitiated) + { + setState(StateClosingPending); + SocketOperation op = _transceiver->closing(true, *_exception.get()); + if(op) + { + return op; + } + } } catch(const Ice::LocalException& ex) { setState(StateClosed, ex); - return SocketOperationNone; - } - - // - // If all the messages were sent and we are in the closing state, we schedule - // the close timeout to wait for the peer to close the connection. - // - if(_state == StateClosing && _dispatchCount == 0) - { - setState(StateClosingPending); - SocketOperation op = _transceiver->closing(true, *_exception.get()); - if(op) - { - return op; - } } - return SocketOperationNone; } @@ -3215,7 +3232,8 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback) + OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, + int& dispatchCount) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3301,7 +3319,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request invokeNum = 1; servantManager = _servantManager; adapter = _adapter; - ++_dispatchCount; + ++dispatchCount; } break; } @@ -3324,7 +3342,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request } servantManager = _servantManager; adapter = _adapter; - _dispatchCount += invokeNum; + dispatchCount += invokeNum; } break; } @@ -3410,12 +3428,23 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request message->receivedReply = true; outAsync = 0; } + else if(outAsync->__finished()) + { + ++dispatchCount; + } else { - ++_dispatchCount; + outAsync = 0; } #else - ++_dispatchCount; + if(outAsync->__finished()) + { + ++dispatchCount; + } + else + { + outAsync = 0; + } #endif notifyAll(); // Notify threads blocked in close(false) } @@ -3429,7 +3458,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request if(_callback) { heartbeatCallback = _callback; - ++_dispatchCount; + ++dispatchCount; } break; } diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index b7254df51ba..d9ff91c5f53 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -268,7 +268,7 @@ private: #endif IceInternal::SocketOperation parseMessage(IceInternal::BasicStream&, Int&, Int&, Byte&, IceInternal::ServantManagerPtr&, ObjectAdapterPtr&, - IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&); + IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&, int&); void invokeAll(IceInternal::BasicStream&, Int, Int, Byte, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&); diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index a223683cc6b..f4b0d89c90c 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -1531,6 +1531,7 @@ IceInternal::Instance::destroy() ThreadPoolPtr serverThreadPool; ThreadPoolPtr clientThreadPool; EndpointHostResolverPtr endpointHostResolver; + IceUtil::TimerPtr timer; { IceUtil::RecMutex::Lock sync(*this); @@ -1557,8 +1558,7 @@ IceInternal::Instance::destroy() if(_timer) { - _timer->destroy(); - _timer = 0; + std::swap(_timer, timer); } if(_servantFactoryManager) @@ -1610,6 +1610,10 @@ IceInternal::Instance::destroy() // // Join with the thread pool threads outside the synchronization. // + if(timer) + { + timer->destroy(); + } if(clientThreadPool) { clientThreadPool->joinWithAllThreads(); diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index c9d0ac28234..509ba18ad06 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -608,12 +608,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc) // try { - if(!handleException(exc)) // This will throw if the invocation can't be retried. - { - return; // Can't be retried immediately. - } - - __invoke(false); // Retry the invocation + handleException(exc); } catch(const Ice::Exception& ex) { @@ -640,9 +635,14 @@ IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex) AsyncResult::__invokeExceptionAsync(ex); } -void +bool IceInternal::OutgoingAsync::__finished() { + // + // NOTE: this method is called from ConnectionI.parseMessage + // with the connection locked. Therefore, it must not invoke + // any user callbacks. + // assert(_proxy->ice_isTwoway()); // Can only be called for twoways. Ice::Byte replyStatus; @@ -789,15 +789,43 @@ IceInternal::OutgoingAsync::__finished() _state |= OK; } _monitor.notifyAll(); + + if(!_callback) + { + _observer.detach(); + return false; + } + return true; } - catch(const LocalException& ex) + catch(const LocalException& exc) { - __finished(ex); - return; - } + // + // We don't call finished(exc) here because we don't want + // to invoke the completion callback. The completion + // callback is invoked by the connection is this method + // returns true. + // + try + { + handleException(exc); + return false; // Invocation will be retried. + } + catch(const Ice::Exception& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _state |= Done; + _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation + _exception.reset(ex.ice_clone()); + _monitor.notifyAll(); - assert(replyStatus == replyOK || replyStatus == replyUserException); - __invokeCompleted(); + if(!_callback) + { + _observer.detach(); + return false; + } + return true; + } + } } bool @@ -851,39 +879,38 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous) } } } - break; } catch(const RetryException&) { _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. + continue; } catch(const Ice::Exception& ex) { - if(!handleException(ex)) // This will throw if the invocation can't be retried. - { - break; // Can't be retried immediately. - } + handleException(ex); } + break; } return _sentSynchronously; } -bool +void IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc) { try { int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); _observer.retried(); // Invocation is being retried. - if(interval > 0) - { - _instance->retryQueue()->add(this, interval); - return false; // Don't retry immediately, the retry queue will take care of the retry. - } - else - { - return true; // Retry immediately. - } + + // + // Schedule the retry. Note that we always schedule the retry + // on the retry queue even if the invocation can be retried + // immediately. This is required because it might not be safe + // to retry from this thread (this is for instance called by + // finished(BasicStream) which is called with the connection + // locked. + // + _instance->retryQueue()->add(this, interval); } catch(const Ice::Exception& ex) { diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs index dc78cf97f98..0e6b3f84b16 100644 --- a/cs/src/Ice/CollocatedRequestHandler.cs +++ b/cs/src/Ice/CollocatedRequestHandler.cs @@ -216,7 +216,11 @@ namespace IceInternal if(outAsync != null) { - outAsync.finished(); + Ice.AsyncCallback cb = outAsync.finished(); + if(cb != null) + { + outAsync.invokeCompleted(cb); + } } _adapter.decDirectCount(); } diff --git a/cs/src/Ice/ConnectionI.cs b/cs/src/Ice/ConnectionI.cs index 637599d07e5..7e6999b2611 100644 --- a/cs/src/Ice/ConnectionI.cs +++ b/cs/src/Ice/ConnectionI.cs @@ -1073,6 +1073,7 @@ namespace Ice StartCallback startCB = null; Queue<OutgoingMessage> sentCBs = null; MessageInfo info = new MessageInfo(); + int dispatchCount = 0; IceInternal.ThreadPoolMessage msg = new IceInternal.ThreadPoolMessage(this); lock(this) @@ -1234,7 +1235,7 @@ namespace Ice _startCallback = null; if(startCB != null) { - ++_dispatchCount; + ++dispatchCount; } } } @@ -1249,19 +1250,15 @@ namespace Ice if((readyOp & IceInternal.SocketOperation.Read) != 0) { newOp |= parseMessage(ref info); + dispatchCount += info.messageDispatchCount; } if((readyOp & IceInternal.SocketOperation.Write) != 0) { - sentCBs = new Queue<OutgoingMessage>(); - newOp |= sendNextMessage(sentCBs); - if(sentCBs.Count > 0) + newOp |= sendNextMessage(out sentCBs); + if(sentCBs != null) { - ++_dispatchCount; - } - else - { - sentCBs = null; + ++dispatchCount; } } @@ -1273,11 +1270,24 @@ namespace Ice if(readyOp == 0) { + Debug.Assert(dispatchCount == 0); return; } + } - msg.completed(ref current); + if(_acmLastActivity > 0) + { + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); } + + if(dispatchCount == 0) + { + return; // Nothing to dispatch we're done! + } + + _dispatchCount += dispatchCount; + + msg.completed(ref current); } catch(DatagramLimitException) // Expected. { @@ -1319,11 +1329,6 @@ namespace Ice msg.finishIOScope(ref current); } - if(_acmLastActivity > 0) - { - _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); - } - // // Unlike C++/Java, this method is called from an IO thread of the .NET thread // pool or from the communicator async IO thread. While it's fine to handle the @@ -1341,7 +1346,7 @@ namespace Ice private void dispatch(StartCallback startCB, Queue<OutgoingMessage> sentCBs, MessageInfo info) { - int count = 0; + int dispatchedCount = 0; // // Notify the factory that the connection establishment and @@ -1350,7 +1355,7 @@ namespace Ice if(startCB != null) { startCB.connectionStartCompleted(this); - ++count; + ++dispatchedCount; } // @@ -1366,10 +1371,15 @@ namespace Ice } if(m.receivedReply) { - ((IceInternal.OutgoingAsync)m.outAsync).finished(); + IceInternal.OutgoingAsync outAsync = (IceInternal.OutgoingAsync)m.outAsync; + Ice.AsyncCallback cb = outAsync.finished(); + if(cb != null) + { + outAsync.invokeCompleted(cb); + } } } - ++count; + ++dispatchedCount; } // @@ -1378,8 +1388,8 @@ namespace Ice // if(info.outAsync != null) { - info.outAsync.finished(); - ++count; + info.outAsync.invokeCompleted(info.completedCallback); + ++dispatchedCount; } if(info.heartbeatCallback != null) @@ -1392,7 +1402,7 @@ namespace Ice { _logger.error("connection callback exception:\n" + ex + '\n' + _desc); } - ++count; + ++dispatchedCount; } // @@ -1406,7 +1416,7 @@ namespace Ice info.adapter); // - // Don't increase count, the dispatch count is + // Don't increase dispatchedCount, the dispatch count is // decreased when the incoming reply is sent. // } @@ -1414,11 +1424,11 @@ namespace Ice // // Decrease dispatch count. // - if(count > 0) + if(dispatchedCount > 0) { lock(this) { - _dispatchCount -= count; + _dispatchCount -= dispatchedCount; if(_dispatchCount == 0) { // @@ -1508,7 +1518,12 @@ namespace Ice } if(message.receivedReply) { - ((IceInternal.OutgoingAsync)message.outAsync).finished(); + IceInternal.OutgoingAsync outAsync = (IceInternal.OutgoingAsync)message.outAsync; + Ice.AsyncCallback cb = outAsync.finished(); + if(cb != null) + { + outAsync.invokeCompleted(cb); + } } _sendStreams.RemoveFirst(); } @@ -2179,8 +2194,10 @@ namespace Ice return true; } - private int sendNextMessage(Queue<OutgoingMessage> callbacks) + private int sendNextMessage(out Queue<OutgoingMessage> callbacks) { + callbacks = null; + if(_sendStreams.Count == 0) { return IceInternal.SocketOperation.None; @@ -2205,6 +2222,10 @@ namespace Ice _writeStream.swap(message.stream); if(message.sent()) { + if(callbacks == null) + { + callbacks = new Queue<OutgoingMessage>(); + } callbacks.Enqueue(message); } _sendStreams.RemoveFirst(); @@ -2270,27 +2291,25 @@ namespace Ice observerFinishWrite(_writeStream.getBuffer()); } } + + // + // If all the messages were sent and we are in the closing state, we schedule + // the close timeout to wait for the peer to close the connection. + // + if(_state == StateClosing && _shutdownInitiated) + { + setState(StateClosingPending); + int op = _transceiver.closing(true, _exception); + if(op != 0) + { + return op; + } + } } catch(Ice.LocalException ex) { setState(StateClosed, ex); - return IceInternal.SocketOperation.None; } - - // - // If all the messages were sent and we are in the closing state, we schedule - // the close timeout to wait for the peer to close the connection. - // - if(_state == StateClosing && _dispatchCount == 0) - { - setState(StateClosingPending); - int op = _transceiver.closing(true, _exception); - if(op != 0) - { - return op; - } - } - return IceInternal.SocketOperation.None; } @@ -2419,7 +2438,9 @@ namespace Ice public IceInternal.ServantManager servantManager; public ObjectAdapter adapter; public IceInternal.OutgoingAsync outAsync; + public Ice.AsyncCallback completedCallback; public ConnectionCallback heartbeatCallback; + public int messageDispatchCount; } private int parseMessage(ref MessageInfo info) @@ -2509,7 +2530,7 @@ namespace Ice info.invokeNum = 1; info.servantManager = _servantManager; info.adapter = _adapter; - ++_dispatchCount; + ++info.messageDispatchCount; } break; } @@ -2533,7 +2554,7 @@ namespace Ice } info.servantManager = _servantManager; info.adapter = _adapter; - _dispatchCount += info.invokeNum; + info.messageDispatchCount += info.invokeNum; } break; } @@ -2542,11 +2563,12 @@ namespace Ice { IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); info.requestId = info.stream.readInt(); - if(_asyncRequests.TryGetValue(info.requestId, out info.outAsync)) + IceInternal.OutgoingAsync outAsync = null; + if(_asyncRequests.TryGetValue(info.requestId, out outAsync)) { _asyncRequests.Remove(info.requestId); - info.outAsync.istr__.swap(info.stream); + outAsync.istr__.swap(info.stream); // // If we just received the reply for a request which isn't acknowledge as @@ -2554,14 +2576,18 @@ namespace Ice // will be processed once the write callback is invoked for the message. // OutgoingMessage message = _sendStreams.Count > 0 ? _sendStreams.First.Value : null; - if(message != null && message.outAsync == info.outAsync) + if(message != null && message.outAsync == outAsync) { message.receivedReply = true; - info.outAsync = null; } else { - ++_dispatchCount; + info.completedCallback = outAsync.finished(); + if(info.completedCallback != null) + { + info.outAsync = outAsync; + ++info.messageDispatchCount; + } } System.Threading.Monitor.PulseAll(this); // Notify threads blocked in close(false) } @@ -2574,7 +2600,7 @@ namespace Ice if(_callback != null) { info.heartbeatCallback = _callback; - ++_dispatchCount; + ++info.messageDispatchCount; } break; } diff --git a/cs/src/Ice/Instance.cs b/cs/src/Ice/Instance.cs index ac6962b26f4..14fcce1f716 100644 --- a/cs/src/Ice/Instance.cs +++ b/cs/src/Ice/Instance.cs @@ -1083,6 +1083,7 @@ namespace IceInternal ThreadPool serverThreadPool = null; ThreadPool clientThreadPool = null; AsyncIOThread asyncIOThread = null; + IceInternal.Timer timer = null; #if !SILVERLIGHT EndpointHostResolver endpointHostResolver = null; @@ -1125,7 +1126,7 @@ namespace IceInternal if(_timer != null) { - _timer.destroy(); + timer = _timer; _timer = null; } @@ -1176,6 +1177,10 @@ namespace IceInternal // // Join with threads outside the synchronization. // + if(timer != null) + { + timer.destroy(); + } if(clientThreadPool != null) { clientThreadPool.joinWithAllThreads(); diff --git a/cs/src/Ice/OutgoingAsync.cs b/cs/src/Ice/OutgoingAsync.cs index 524990f0ed4..5a7e9a11f8b 100644 --- a/cs/src/Ice/OutgoingAsync.cs +++ b/cs/src/Ice/OutgoingAsync.cs @@ -982,6 +982,11 @@ namespace IceInternal base.invokeSent(cb); } + public new void invokeCompleted(Ice.AsyncCallback cb) + { + base.invokeCompleted(cb); + } + public void finished(Ice.Exception exc) { lock(monitor_) @@ -1004,15 +1009,9 @@ namespace IceInternal // NOTE: at this point, synchronization isn't needed, no other threads should be // calling on the callback. // - try { - if(!handleException(exc)) - { - return; // Can't be retried immediately. - } - - invoke(false); // Retry the invocation + handleException(exc); } catch(Ice.Exception ex) { @@ -1030,12 +1029,11 @@ namespace IceInternal }, connection); } - public void finished() + public Ice.AsyncCallback finished() { Debug.Assert(proxy_.ice_isTwoway()); // Can only be called for twoways. byte replyStatus; - Ice.AsyncCallback cb = null; try { lock(monitor_) @@ -1185,18 +1183,58 @@ namespace IceInternal { state_ |= StateOK; } - cb = completedCallback_; System.Threading.Monitor.PulseAll(monitor_); + + if(completedCallback_ == null) + { + if(observer_ != null) + { + observer_.detach(); + observer_ = null; + } + return null; + } + return completedCallback_; } } - catch(Ice.LocalException ex) + catch(Ice.LocalException exc) { - finished(ex); - return; + // + // We don't call finished(exc) here because we don't want + // to invoke the completion callback. The completion + // callback is invoked by the connection is this method + // returns true. + // + try + { + handleException(exc); + return null; + } + catch(Ice.LocalException ex) + { + lock(monitor_) + { + state_ |= StateDone; + exception_ = ex; + if(waitHandle_ != null) + { + waitHandle_.Set(); + } + System.Threading.Monitor.PulseAll(monitor_); + + if(completedCallback_ == null) + { + if(observer_ != null) + { + observer_.detach(); + observer_ = null; + } + return null; + } + return completedCallback_; + } + } } - - Debug.Assert(replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException); - invokeCompleted(cb); } public bool invoke(bool synchronous) @@ -1250,20 +1288,18 @@ namespace IceInternal } } } - break; } catch(RetryException) { proxy_.setRequestHandler__(_handler, null); // Clear request handler and retry. + continue; } catch(Ice.Exception ex) { - if(!handleException(ex)) // This will throw if the invocation can't be retried. - { - break; // Can't be retried immediately. - } + handleException(ex); } + break; } return sentSynchronously_; } @@ -1376,7 +1412,7 @@ namespace IceInternal base.invokeExceptionAsync(ex); } - private bool handleException(Ice.Exception exc) + private void handleException(Ice.Exception exc) { try { @@ -1385,15 +1421,16 @@ namespace IceInternal { observer_.retried(); // Invocation is being retried. } - if(interval > 0) - { - instance_.retryQueue().add(this, interval); - return false; // Don't retry immediately, the retry queue will take care of the retry. - } - else - { - return true; // Retry immediately. - } + + // + // Schedule the retry. Note that we always schedule the retry + // on the retry queue even if the invocation can be retried + // immediately. This is required because it might not be safe + // to retry from this thread (this is for instance called by + // finished(BasicStream) which is called with the connection + // locked. + // + instance_.retryQueue().add(this, interval); } catch(Ice.Exception ex) { @@ -1464,7 +1501,6 @@ namespace IceInternal } } - instance_.clientThreadPool().dispatch(() => { try diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 00645ce8dc9..5cc8188ef09 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -953,7 +953,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne StartCallback startCB = null; java.util.List<OutgoingMessage> sentCBs = null; MessageInfo info = null; - + int dispatchCount = 0; + synchronized(this) { if(_state >= StateClosed) @@ -1114,7 +1115,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _startCallback = null; if(startCB != null) { - ++_dispatchCount; + ++dispatchCount; } } } @@ -1131,6 +1132,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // Optimization: use the thread's stream. info = new MessageInfo(current.stream); newOp |= parseMessage(info); + dispatchCount += info.messageDispatchCount; } if((readyOp & IceInternal.SocketOperation.Write) != 0) @@ -1139,7 +1141,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne newOp |= sendNextMessage(sentCBs); if(!sentCBs.isEmpty()) { - ++_dispatchCount; + ++dispatchCount; } else { @@ -1155,9 +1157,23 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(readyOp == 0) { + assert(dispatchCount == 0); return; } } + + if(_acmLastActivity > 0) + { + _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); + } + + if(dispatchCount == 0) + { + return; // Nothing to dispatch we're done! + } + + _dispatchCount += dispatchCount; + current.ioCompleted(); } catch(DatagramLimitException ex) // Expected. { @@ -1194,13 +1210,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } return; } - - if(_acmLastActivity > 0) - { - _acmLastActivity = IceInternal.Time.currentMonotonicTimeMillis(); - } - - current.ioCompleted(); } if(!_dispatcher) // Optimization, call dispatch() directly if there's no @@ -1239,7 +1248,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne protected void dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info) { - int count = 0; + int dispatchedCount = 0; // // Notify the factory that the connection establishment and @@ -1248,7 +1257,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(startCB != null) { startCB.connectionStartCompleted(this); - ++count; + ++dispatchedCount; } // @@ -1260,7 +1269,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { msg.outAsync.invokeSent(); } - ++count; + ++dispatchedCount; } if(info != null) @@ -1271,8 +1280,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // if(info.outAsync != null) { - info.outAsync.finished(info.stream); - ++count; + info.outAsync.invokeCompleted(); + ++dispatchedCount; } if(info.heartbeatCallback != null) @@ -1285,7 +1294,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { _logger.error("connection callback exception:\n" + ex + '\n' + _desc); } - ++count; + ++dispatchedCount; } // @@ -1298,7 +1307,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne invokeAll(info.stream, info.invokeNum, info.requestId, info.compress, info.servantManager, info.adapter); // - // Don't increase count, the dispatch count is + // Don't increase dispatchedCount, the dispatch count is // decreased when the incoming reply is sent. // } @@ -1307,11 +1316,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne // // Decrease dispatch count. // - if(count > 0) + if(dispatchedCount > 0) { synchronized(this) { - _dispatchCount -= count; + _dispatchCount -= dispatchedCount; if(_dispatchCount == 0) { // @@ -2194,28 +2203,26 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne observerFinishWrite(_writeStream.getBuffer()); } } + + // + // If all the messages were sent and we are in the closing state, we + // schedule the close timeout to wait for the peer to close the + // connection. + // + if(_state == StateClosing && _shutdownInitiated) + { + setState(StateClosingPending); + int op = _transceiver.closing(true, _exception); + if(op != 0) + { + return op; + } + } } catch(Ice.LocalException ex) { setState(StateClosed, ex); - return IceInternal.SocketOperation.None; } - - // - // If all the messages were sent and we are in the closing state, we - // schedule the close timeout to wait for the peer to close the - // connection. - // - if(_state == StateClosing && _dispatchCount == 0) - { - setState(StateClosingPending); - int op = _transceiver.closing(true, _exception); - if(op != 0) - { - return op; - } - } - return IceInternal.SocketOperation.None; } @@ -2362,6 +2369,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne ObjectAdapter adapter; IceInternal.OutgoingAsync outAsync; ConnectionCallback heartbeatCallback; + int messageDispatchCount; } private int parseMessage(MessageInfo info) @@ -2453,7 +2461,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne info.invokeNum = 1; info.servantManager = _servantManager; info.adapter = _adapter; - ++_dispatchCount; + ++info.messageDispatchCount; } break; } @@ -2477,7 +2485,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } info.servantManager = _servantManager; info.adapter = _adapter; - _dispatchCount += info.invokeNum; + info.messageDispatchCount += info.invokeNum; } break; } @@ -2486,10 +2494,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels); info.requestId = info.stream.readInt(); - info.outAsync = _asyncRequests.remove(info.requestId); - if(info.outAsync != null) + IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId); + if(outAsync != null && outAsync.finished(info.stream)) { - ++_dispatchCount; + info.outAsync = outAsync; + ++info.messageDispatchCount; } notifyAll(); // Notify threads blocked in close(false) break; @@ -2501,7 +2510,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if(_callback != null) { info.heartbeatCallback = _callback; - ++_dispatchCount; + ++info.messageDispatchCount; } break; } diff --git a/java/src/IceInternal/AsyncResultI.java b/java/src/IceInternal/AsyncResultI.java index 9b211f0bb72..e2ca0b7c466 100644 --- a/java/src/IceInternal/AsyncResultI.java +++ b/java/src/IceInternal/AsyncResultI.java @@ -456,7 +456,7 @@ public class AsyncResultI implements Ice.AsyncResult { } - protected final void invokeCompleted() + public final void invokeCompleted() { // // Note: no need to change the _state here, specializations are responsible for diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java index 1d97dc9a8d6..3c94e5d93a8 100644 --- a/java/src/IceInternal/CollocatedRequestHandler.java +++ b/java/src/IceInternal/CollocatedRequestHandler.java @@ -227,16 +227,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler TraceUtil.traceRecv(os, _logger, _traceLevels); } - outAsync = _asyncRequests.get(requestId); - if(outAsync != null) - { - _asyncRequests.remove(requestId); - } + outAsync = _asyncRequests.remove(requestId); } - if(outAsync != null) + if(outAsync != null && outAsync.finished(os)) { - outAsync.finished(os); + outAsync.invokeCompleted(); } _adapter.decDirectCount(); } @@ -500,13 +496,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler OutgoingAsync outAsync = null; synchronized(this) { - outAsync = _asyncRequests.get(requestId); - if(outAsync != null) - { - _asyncRequests.remove(requestId); - } + outAsync = _asyncRequests.remove(requestId); } - if(outAsync != null) { outAsync.finished(ex); diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java index 0618ffb93e9..84bfc477768 100644 --- a/java/src/IceInternal/OutgoingAsync.java +++ b/java/src/IceInternal/OutgoingAsync.java @@ -236,12 +236,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC // try { - if(!handleException(exc)) - { - return; // Can't be retried immediately. - } - - invoke(false); // Retry the invocation + handleException(exc); } catch(Ice.Exception ex) { @@ -262,8 +257,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC }); } - public final void finished(BasicStream is) + public final boolean finished(BasicStream is) { + // + // NOTE: this method is called from ConnectionI.parseMessage + // with the connection locked. Therefore, it must not invoke + // any user callbacks. + // + assert (_proxy.ice_isTwoway()); // Can only be called for twoways. byte replyStatus; @@ -278,14 +279,14 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC _childObserver.detach(); _childObserver = null; } - + if(_timeoutRequestHandler != null) { _future.cancel(false); _future = null; _timeoutRequestHandler = null; } - + // _is can already be initialized if the invocation is retried if(_is == null) { @@ -293,7 +294,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } _is.swap(is); replyStatus = _is.readByte(); - + switch(replyStatus) { case ReplyStatus.replyOK: @@ -415,25 +416,59 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } } - _state |= StateDone; - // Clear buffer now, instead of waiting for AsyncResult - // deallocation - // _os.resize(0, false); if(replyStatus == ReplyStatus.replyOK) { _state |= StateOK; } + _state |= StateDone; _monitor.notifyAll(); + + if(_callback == null) + { + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + return false; + } + return true; } } - catch(Ice.LocalException ex) + catch(Ice.Exception exc) { - finished(ex); - return; - } + // + // We don't call finished(exc) here because we don't want + // to invoke the completion callback. The completion + // callback is invoked by the connection is this method + // returns true. + // + try + { + handleException(exc); + return false; + } + catch(Ice.LocalException ex) + { + synchronized(_monitor) + { + _state |= StateDone; + _exception = ex; + _monitor.notifyAll(); - assert (replyStatus == ReplyStatus.replyOK || replyStatus == ReplyStatus.replyUserException); - invokeCompleted(); + if(_callback == null) + { + if(_observer != null) + { + _observer.detach(); + _observer = null; + } + return false; + } + return true; + } + } + } } public final boolean invoke(boolean synchronous) @@ -500,21 +535,19 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC } } } - break; } catch(RetryException ex) { // Clear request handler and retry. _proxy.__setRequestHandler(_handler, null); + continue; } catch(Ice.Exception ex) { // This will throw if the invocation can't be retried. - if(!handleException(ex)) - { - break; // Can't be retried immediately. - } + handleException(ex); } + break; } return _sentSynchronously; } @@ -589,7 +622,7 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC super.invokeExceptionAsync(ex); } - private boolean handleException(Ice.Exception exc) + private void handleException(Ice.Exception exc) { try { @@ -599,16 +632,16 @@ public class OutgoingAsync extends AsyncResultI implements OutgoingAsyncMessageC { _observer.retried(); // Invocation is being retried. } - if(interval.value > 0) - { - _instance.retryQueue().add(this, interval.value); - return false; // Don't retry immediately, the retry queue will - // take care of the retry. - } - else - { - return true; // Retry immediately. - } + + // + // Schedule the retry. Note that we always schedule the retry + // on the retry queue even if the invocation can be retried + // immediately. This is required because it might not be safe + // to retry from this thread (this is for instance called by + // finished(BasicStream) which is called with the connection + // locked. + // + _instance.retryQueue().add(this, interval.value); } catch(Ice.Exception ex) { |