diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 4 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 109 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 2 | ||||
-rw-r--r-- | cpp/src/Ice/Instance.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 83 |
5 files changed, 133 insertions, 73 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 452511591a5..95c151ff6e2 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -527,9 +527,9 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) } } - if(outAsync) + if(outAsync && outAsync->__finished()) { - outAsync->__finished(); + outAsync->__invokeCompleted(); } _adapter->decDirectCount(); } diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index b42b8558a7f..fcf05ef9db0 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1601,6 +1601,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) ObjectAdapterPtr adapter; OutgoingAsyncPtr outAsync; ConnectionCallbackPtr heartbeatCallback; + int dispatchCount = 0; ThreadPoolMessage<ConnectionI> msg(current, *this); @@ -1760,7 +1761,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) swap(_startCallback, startCB); if(startCB) { - ++_dispatchCount; + ++dispatchCount; } } } @@ -1781,7 +1782,8 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) servantManager, adapter, outAsync, - heartbeatCallback)); + heartbeatCallback, + dispatchCount)); } if(readyOp & SocketOperationWrite) @@ -1789,7 +1791,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) newOp = static_cast<SocketOperation>(newOp | sendNextMessage(sentCBs)); if(!sentCBs.empty()) { - ++_dispatchCount; + ++dispatchCount; } } @@ -1801,9 +1803,23 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) if(!readyOp) { + assert(dispatchCount == 0); return; } } + + if(_acmLastActivity != IceUtil::Time()) + { + _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); + } + + if(dispatchCount == 0) + { + return; // Nothing to dispatch we're done! + } + + _dispatchCount += dispatchCount; + io.completed(); } catch(const DatagramLimitException&) // Expected. { @@ -1841,11 +1857,6 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) } return; } - if(_acmLastActivity != IceUtil::Time()) - { - _acmLastActivity = IceUtil::Time::now(IceUtil::Time::Monotonic); - } - io.completed(); } if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher. @@ -1867,7 +1878,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) { - int count = 0; + int dispatchedCount = 0; // // Notify the factory that the connection establishment and @@ -1876,7 +1887,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess if(startCB) { startCB->connectionStartCompleted(this); - ++count; + ++dispatchedCount; } // @@ -1893,13 +1904,17 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess } if(p->receivedReply) { - OutgoingAsyncPtr::dynamicCast(p->outAsync)->__finished(); + OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync); + if(outAsync->__finished()) + { + outAsync->__invokeCompleted(); + } } #else p->outAsync->__invokeSent(); #endif } - ++count; + ++dispatchedCount; } // @@ -1908,8 +1923,8 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // if(outAsync) { - outAsync->__finished(); - ++count; + outAsync->__invokeCompleted(); + ++dispatchedCount; } if(heartbeatCallback) @@ -1928,7 +1943,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess Error out(_instance->initializationData().logger); out << "connection callback exception:\nunknown c++ exception" << '\n' << _desc; } - ++count; + ++dispatchedCount; } // @@ -1949,10 +1964,10 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // // Decrease dispatch count. // - if(count > 0) + if(dispatchedCount > 0) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - _dispatchCount -= count; + _dispatchCount -= dispatchedCount; if(_dispatchCount == 0) { // @@ -2046,7 +2061,11 @@ Ice::ConnectionI::finish() } if(message->receivedReply) { - OutgoingAsyncPtr::dynamicCast(message->outAsync)->__finished(); + OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync); + if(outAsync->__finished()) + { + outAsync->__invokeCompleted(); + } } _sendStreams.pop_front(); } @@ -2896,27 +2915,25 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) _observer.finishWrite(_writeStream); } } + + // + // If all the messages were sent and we are in the closing state, we schedule + // the close timeout to wait for the peer to close the connection. + // + if(_state == StateClosing && _shutdownInitiated) + { + setState(StateClosingPending); + SocketOperation op = _transceiver->closing(true, *_exception.get()); + if(op) + { + return op; + } + } } catch(const Ice::LocalException& ex) { setState(StateClosed, ex); - return SocketOperationNone; - } - - // - // If all the messages were sent and we are in the closing state, we schedule - // the close timeout to wait for the peer to close the connection. - // - if(_state == StateClosing && _dispatchCount == 0) - { - setState(StateClosingPending); - SocketOperation op = _transceiver->closing(true, *_exception.get()); - if(op) - { - return op; - } } - return SocketOperationNone; } @@ -3215,7 +3232,8 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback) + OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, + int& dispatchCount) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3301,7 +3319,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request invokeNum = 1; servantManager = _servantManager; adapter = _adapter; - ++_dispatchCount; + ++dispatchCount; } break; } @@ -3324,7 +3342,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request } servantManager = _servantManager; adapter = _adapter; - _dispatchCount += invokeNum; + dispatchCount += invokeNum; } break; } @@ -3410,12 +3428,23 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request message->receivedReply = true; outAsync = 0; } + else if(outAsync->__finished()) + { + ++dispatchCount; + } else { - ++_dispatchCount; + outAsync = 0; } #else - ++_dispatchCount; + if(outAsync->__finished()) + { + ++dispatchCount; + } + else + { + outAsync = 0; + } #endif notifyAll(); // Notify threads blocked in close(false) } @@ -3429,7 +3458,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request if(_callback) { heartbeatCallback = _callback; - ++_dispatchCount; + ++dispatchCount; } break; } diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index b7254df51ba..d9ff91c5f53 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -268,7 +268,7 @@ private: #endif IceInternal::SocketOperation parseMessage(IceInternal::BasicStream&, Int&, Int&, Byte&, IceInternal::ServantManagerPtr&, ObjectAdapterPtr&, - IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&); + IceInternal::OutgoingAsyncPtr&, ConnectionCallbackPtr&, int&); void invokeAll(IceInternal::BasicStream&, Int, Int, Byte, const IceInternal::ServantManagerPtr&, const ObjectAdapterPtr&); diff --git a/cpp/src/Ice/Instance.cpp b/cpp/src/Ice/Instance.cpp index a223683cc6b..f4b0d89c90c 100644 --- a/cpp/src/Ice/Instance.cpp +++ b/cpp/src/Ice/Instance.cpp @@ -1531,6 +1531,7 @@ IceInternal::Instance::destroy() ThreadPoolPtr serverThreadPool; ThreadPoolPtr clientThreadPool; EndpointHostResolverPtr endpointHostResolver; + IceUtil::TimerPtr timer; { IceUtil::RecMutex::Lock sync(*this); @@ -1557,8 +1558,7 @@ IceInternal::Instance::destroy() if(_timer) { - _timer->destroy(); - _timer = 0; + std::swap(_timer, timer); } if(_servantFactoryManager) @@ -1610,6 +1610,10 @@ IceInternal::Instance::destroy() // // Join with the thread pool threads outside the synchronization. // + if(timer) + { + timer->destroy(); + } if(clientThreadPool) { clientThreadPool->joinWithAllThreads(); diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index c9d0ac28234..509ba18ad06 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -608,12 +608,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc) // try { - if(!handleException(exc)) // This will throw if the invocation can't be retried. - { - return; // Can't be retried immediately. - } - - __invoke(false); // Retry the invocation + handleException(exc); } catch(const Ice::Exception& ex) { @@ -640,9 +635,14 @@ IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex) AsyncResult::__invokeExceptionAsync(ex); } -void +bool IceInternal::OutgoingAsync::__finished() { + // + // NOTE: this method is called from ConnectionI.parseMessage + // with the connection locked. Therefore, it must not invoke + // any user callbacks. + // assert(_proxy->ice_isTwoway()); // Can only be called for twoways. Ice::Byte replyStatus; @@ -789,15 +789,43 @@ IceInternal::OutgoingAsync::__finished() _state |= OK; } _monitor.notifyAll(); + + if(!_callback) + { + _observer.detach(); + return false; + } + return true; } - catch(const LocalException& ex) + catch(const LocalException& exc) { - __finished(ex); - return; - } + // + // We don't call finished(exc) here because we don't want + // to invoke the completion callback. The completion + // callback is invoked by the connection is this method + // returns true. + // + try + { + handleException(exc); + return false; // Invocation will be retried. + } + catch(const Ice::Exception& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _state |= Done; + _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation + _exception.reset(ex.ice_clone()); + _monitor.notifyAll(); - assert(replyStatus == replyOK || replyStatus == replyUserException); - __invokeCompleted(); + if(!_callback) + { + _observer.detach(); + return false; + } + return true; + } + } } bool @@ -851,39 +879,38 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous) } } } - break; } catch(const RetryException&) { _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. + continue; } catch(const Ice::Exception& ex) { - if(!handleException(ex)) // This will throw if the invocation can't be retried. - { - break; // Can't be retried immediately. - } + handleException(ex); } + break; } return _sentSynchronously; } -bool +void IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc) { try { int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); _observer.retried(); // Invocation is being retried. - if(interval > 0) - { - _instance->retryQueue()->add(this, interval); - return false; // Don't retry immediately, the retry queue will take care of the retry. - } - else - { - return true; // Retry immediately. - } + + // + // Schedule the retry. Note that we always schedule the retry + // on the retry queue even if the invocation can be retried + // immediately. This is required because it might not be safe + // to retry from this thread (this is for instance called by + // finished(BasicStream) which is called with the connection + // locked. + // + _instance->retryQueue()->add(this, interval); } catch(const Ice::Exception& ex) { |