diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 310 |
1 files changed, 132 insertions, 178 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 82e1b48d6aa..97d69ed297e 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -317,8 +317,8 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed. { - assert(ICE_EXCEPTION_ISSET(_exception)); - ICE_RETHROW_EXCEPTION(_exception); + assert(_exception); + _exception->ice_throw(); } if(!initialize() || !validate()) @@ -339,8 +339,8 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) if(_state >= StateClosing) { - assert(ICE_EXCEPTION_ISSET(_exception)); - ICE_RETHROW_EXCEPTION(_exception); + assert(_exception); + _exception->ice_throw(); } } @@ -487,10 +487,10 @@ Ice::ConnectionI::throwException() const { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(ICE_EXCEPTION_ISSET(_exception)) + if(_exception) { assert(_state >= StateClosing); - ICE_RETHROW_EXCEPTION(_exception); + _exception->ice_throw(); } } @@ -622,9 +622,9 @@ Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, i // to send our request, we always try to send the request // again. // - if(ICE_EXCEPTION_ISSET(_exception)) + if(_exception) { - throw RetryException(*_exception.get()); + throw RetryException(*_exception); } assert(_state > StateNotValidated); @@ -684,8 +684,8 @@ Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, i catch(const LocalException& ex) { setState(StateClosed, ex); - assert(ICE_EXCEPTION_ISSET(_exception)); - ICE_RETHROW_EXCEPTION(_exception); + assert(_exception); + _exception->ice_throw(); } if(response) @@ -710,9 +710,9 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres // to send our request, we always try to send the request // again. // - if(ICE_EXCEPTION_ISSET(_exception)) + if(_exception) { - throw RetryException(*_exception.get()); + throw RetryException(*_exception); } assert(_state > StateNotValidated); assert(_state < StateClosing); @@ -772,8 +772,8 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres catch(const LocalException& ex) { setState(StateClosed, ex); - assert(ICE_EXCEPTION_ISSET(_exception)); - ICE_RETHROW_EXCEPTION(_exception); + assert(_exception); + _exception->ice_throw(); } if(response) @@ -802,7 +802,7 @@ Ice::ConnectionI::flushBatchRequests() std::function<void()> Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_ptr)> ex, - ::std::function<void(bool)> sent) + ::std::function<void(bool)> sent) { class ConnectionFlushBatchLambda : public ConnectionFlushBatchAsync, public LambdaInvoke { @@ -1234,8 +1234,8 @@ Ice::ConnectionI::sendResponse(Int, OutputStream* os, Byte compressFlag, bool /* if(_state >= StateClosed) { - assert(ICE_EXCEPTION_ISSET(_exception)); - ICE_RETHROW_EXCEPTION(_exception); + assert(_exception); + _exception->ice_throw(); } OutgoingMessage message(os, compressFlag > 0); @@ -1273,8 +1273,8 @@ Ice::ConnectionI::sendNoResponse() if(_state >= StateClosed) { - assert(ICE_EXCEPTION_ISSET(_exception)); - ICE_RETHROW_EXCEPTION(_exception); + assert(_exception); + _exception->ice_throw(); } if(_state == StateClosing && _dispatchCount == 0) @@ -1932,15 +1932,9 @@ Ice::ConnectionI::finish(bool close) { string verb = _connector ? "establish" : "accept"; Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); - try - { - ICE_RETHROW_EXCEPTION(_exception); - } - catch(const Ice::Exception& ex) - { - out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() - << "\n" << ex; - } + + out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() + << "\n" << *_exception; } } else @@ -1950,23 +1944,13 @@ Ice::ConnectionI::finish(bool close) Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); out << "closed " << _endpoint->protocol() << " connection\n" << toString(); - // - // Trace the cause of unexpected connection closures - // - try - { - ICE_RETHROW_EXCEPTION(_exception); - } - catch(const Ice::LocalException& ex) + if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || + dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || + dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || + dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()))) { - if(!(dynamic_cast<const CloseConnectionException*>(&ex) || - dynamic_cast<const ForcedCloseConnectionException*>(&ex) || - dynamic_cast<const ConnectionTimeoutException*>(&ex) || - dynamic_cast<const CommunicatorDestroyedException*>(&ex) || - dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex))) - { - out << "\n" << ex; - } + out << "\n" << *_exception; } } } @@ -1986,14 +1970,9 @@ Ice::ConnectionI::finish(bool close) if(_startCallback) { - try - { - ICE_RETHROW_EXCEPTION(_exception); - } - catch(const LocalException& ex) - { - _startCallback->connectionStartFailed(shared_from_this(), ex); - } + assert(_exception); + + _startCallback->connectionStartFailed(shared_from_this(), *_exception); _startCallback = 0; } @@ -2033,50 +2012,39 @@ Ice::ConnectionI::finish(bool close) #endif } - try - { - ICE_RETHROW_EXCEPTION(_exception); - } - catch(const Ice::LocalException& ex) + + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { - for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) + o->completed(*_exception); + if(o->requestId) // Make sure finished isn't called twice. { - o->completed(ex); - if(o->requestId) // Make sure finished isn't called twice. + if(o->out) { - if(o->out) - { - _requests.erase(o->requestId); - } - else - { - _asyncRequests.erase(o->requestId); - } + _requests.erase(o->requestId); + } + else + { + _asyncRequests.erase(o->requestId); } } } + _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage } - try + + for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { - ICE_RETHROW_EXCEPTION(_exception); + p->second->completed(*_exception); } - catch(const Ice::LocalException& ex) - { - for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - p->second->completed(ex); - } - _requests.clear(); + _requests.clear(); - for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) + for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) + { + if(q->second->exception(*_exception)) { - if(q->second->exception(ex)) - { - q->second->invokeException(); - } + q->second->invokeException(); } } @@ -2161,7 +2129,7 @@ Ice::ConnectionI::getInfo() const IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_state >= StateClosed) { - ICE_RETHROW_EXCEPTION(_exception); + _exception->ice_throw(); } return initConnectionInfo(); } @@ -2172,7 +2140,7 @@ Ice::ConnectionI::setBufferSize(Ice::Int rcvSize, Ice::Int sndSize) IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_state >= StateClosed) { - ICE_RETHROW_EXCEPTION(_exception); + _exception->ice_throw(); } _transceiver->setBufferSize(rcvSize, sndSize); _info = 0; // Invalidate the cached connection info @@ -2299,13 +2267,13 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) return; } - if(!ICE_EXCEPTION_ISSET(_exception)) + if(!_exception) { // // If we are in closed state, an exception must be set. // assert(_state != StateClosed); - ICE_RESET_EXCEPTION(_exception, ex.ice_clone()); + ICE_SET_EXCEPTION_FROM_CLONE(_exception, ex.ice_clone()); // // We don't warn if we are not validated. // @@ -2364,99 +2332,92 @@ Ice::ConnectionI::setState(State state) { switch(state) { - case StateNotInitialized: - { - assert(false); - break; - } - - case StateNotValidated: - { - if(_state != StateNotInitialized) + case StateNotInitialized: { - assert(_state == StateClosed); - return; + assert(false); + break; } - break; - } - case StateActive: - { - // - // Can only switch from holding or not validated to - // active. - // - if(_state != StateHolding && _state != StateNotValidated) + case StateNotValidated: { - return; + if(_state != StateNotInitialized) + { + assert(_state == StateClosed); + return; + } + break; } - _threadPool->_register(shared_from_this(), SocketOperationRead); - break; - } - case StateHolding: - { - // - // Can only switch from active or not validated to - // holding. - // - if(_state != StateActive && _state != StateNotValidated) - { - return; - } - if(_state == StateActive) + case StateActive: { - _threadPool->unregister(shared_from_this(), SocketOperationRead); + // + // Can only switch from holding or not validated to + // active. + // + if(_state != StateHolding && _state != StateNotValidated) + { + return; + } + _threadPool->_register(shared_from_this(), SocketOperationRead); + break; } - break; - } - case StateClosing: - case StateClosingPending: - { - // - // Can't change back from closing pending. - // - if(_state >= StateClosingPending) + case StateHolding: { - return; + // + // Can only switch from active or not validated to + // holding. + // + if(_state != StateActive && _state != StateNotValidated) + { + return; + } + if(_state == StateActive) + { + _threadPool->unregister(shared_from_this(), SocketOperationRead); + } + break; } - break; - } - case StateClosed: - { - if(_state == StateFinished) + case StateClosing: + case StateClosingPending: { - return; + // + // Can't change back from closing pending. + // + if(_state >= StateClosingPending) + { + return; + } + break; } - try - { - ICE_RETHROW_EXCEPTION(_exception); - } - catch(const Ice::LocalException& ex) + case StateClosed: { - _batchRequestQueue->destroy(ex); + if(_state == StateFinished) + { + return; + } + + _batchRequestQueue->destroy(*_exception); + + // + // Don't need to close now for connections so only close the transceiver + // if the selector request it. + // + if(_threadPool->finish(shared_from_this(), false)) + { + _transceiver->close(); + } + break; } - // - // Don't need to close now for connections so only close the transceiver - // if the selector request it. - // - if(_threadPool->finish(shared_from_this(), false)) + case StateFinished: { - _transceiver->close(); + assert(_state == StateClosed); + _communicator = 0; + break; } - break; - } - - case StateFinished: - { - assert(_state == StateClosed); - _communicator = 0; - break; - } } } catch(const Ice::LocalException& ex) @@ -2498,23 +2459,16 @@ Ice::ConnectionI::setState(State state) newState, _observer.get())); } - if(_observer && state == StateClosed && ICE_EXCEPTION_ISSET(_exception)) + if(_observer && state == StateClosed && _exception) { - try + if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || + dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || + dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || + dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || + (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing))) { - ICE_RETHROW_EXCEPTION(_exception); - } - catch(const Ice::LocalException& ex) - { - if(!(dynamic_cast<const CloseConnectionException*>(&ex) || - dynamic_cast<const ForcedCloseConnectionException*>(&ex) || - dynamic_cast<const ConnectionTimeoutException*>(&ex) || - dynamic_cast<const CommunicatorDestroyedException*>(&ex) || - dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) || - (dynamic_cast<const ConnectionLostException*>(&ex) && _state >= StateClosing))) - { - _observer->failed(ex.ice_id()); - } + _observer->failed(_exception->ice_id()); } } } @@ -2571,7 +2525,7 @@ Ice::ConnectionI::initiateShutdown() // // Notify the the transceiver of the graceful connection closure. // - SocketOperation op = _transceiver->closing(true, *_exception.get()); + SocketOperation op = _transceiver->closing(true, *_exception); if(op) { scheduleTimeout(op); @@ -2607,7 +2561,7 @@ Ice::ConnectionI::heartbeat() catch(const LocalException& ex) { setState(StateClosed, ex); - assert(ICE_EXCEPTION_ISSET(_exception)); + assert(_exception); } } } @@ -2917,7 +2871,7 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingMessage>& callbacks) if(_state == StateClosing && _shutdownInitiated) { setState(StateClosingPending); - SocketOperation op = _transceiver->closing(true, *_exception.get()); + SocketOperation op = _transceiver->closing(true, *_exception); if(op) { return op; @@ -3298,7 +3252,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request // // Notify the the transceiver of the graceful connection closure. // - SocketOperation op = _transceiver->closing(false, *_exception.get()); + SocketOperation op = _transceiver->closing(false, *_exception); if(op) { return op; |