diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 307 |
1 files changed, 105 insertions, 202 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 2c8688b7a00..48542bf5aae 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -296,9 +296,9 @@ Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex) } else if(outAsync) { - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompleted(); + outAsync->invokeException(); } } @@ -317,7 +317,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_state >= StateClosed) // The connection might already be closed if the communicator was destroyed. { - assert(ICE_EXCEPTION_GET(_exception)); + assert(ICE_EXCEPTION_ISSET(_exception)); ICE_RETHROW_EXCEPTION(_exception); } @@ -339,7 +339,7 @@ Ice::ConnectionI::start(const StartCallbackPtr& callback) if(_state >= StateClosing) { - assert(ICE_EXCEPTION_GET(_exception)); + assert(ICE_EXCEPTION_ISSET(_exception)); ICE_RETHROW_EXCEPTION(_exception); } } @@ -487,7 +487,7 @@ Ice::ConnectionI::throwException() const { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(ICE_EXCEPTION_GET(_exception)) + if(ICE_EXCEPTION_ISSET(_exception)) { assert(_state >= StateClosing); ICE_RETHROW_EXCEPTION(_exception); @@ -622,7 +622,7 @@ Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, i // to send our request, we always try to send the request // again. // - if(ICE_EXCEPTION_GET(_exception)) + if(ICE_EXCEPTION_ISSET(_exception)) { #ifdef ICE_CPP11_MAPPING throw RetryException(_exception); @@ -688,7 +688,7 @@ Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, i catch(const LocalException& ex) { setState(StateClosed, ex); - assert(ICE_EXCEPTION_GET(_exception)); + assert(ICE_EXCEPTION_ISSET(_exception)); ICE_RETHROW_EXCEPTION(_exception); } @@ -714,7 +714,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres // to send our request, we always try to send the request // again. // - if(ICE_EXCEPTION_GET(_exception)) + if(ICE_EXCEPTION_ISSET(_exception)) { #ifdef ICE_CPP11_MAPPING throw RetryException(_exception); @@ -780,7 +780,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compres catch(const LocalException& ex) { setState(StateClosed, ex); - assert(ICE_EXCEPTION_GET(_exception)); + assert(ICE_EXCEPTION_ISSET(_exception)); ICE_RETHROW_EXCEPTION(_exception); } @@ -801,87 +801,40 @@ Ice::ConnectionI::getBatchRequestQueue() const return _batchRequestQueue; } +#ifdef ICE_CPP11_MAPPING void Ice::ConnectionI::flushBatchRequests() { - ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name); - out.invoke(); + Connection::flushBatchRequests_async().get(); } -#ifdef ICE_CPP11_MAPPING -function<void ()> -Ice::ConnectionI::flushBatchRequests_async(function<void (exception_ptr)> exception, - function<void (bool)> sent) +std::function<void ()> +Ice::ConnectionI::flushBatchRequests_async(::std::function<void (::std::exception_ptr)> ex, + ::std::function<void (bool)> sent) { - class FlushBatchRequestsCallback : public CallbackBase + class ConnectionFlushBatchLambda : public ConnectionFlushBatchAsync, public LambdaInvoke { public: - FlushBatchRequestsCallback(function<void (exception_ptr)> exception, - function<void (bool)> sent, - shared_ptr<Connection> connection) : - _exception(move(exception)), - _sent(move(sent)), - _connection(move(connection)) + ConnectionFlushBatchLambda(std::shared_ptr<Ice::ConnectionI>&& connection, + const InstancePtr& instance, + std::function<void (std::exception_ptr)> ex, + std::function<void (bool)> sent) : + ConnectionFlushBatchAsync(connection, instance), LambdaInvoke(std::move(ex), std::move(sent)) { } - - virtual void sent(const AsyncResultPtr& result) const - { - try - { - AsyncResult::__check(result, _connection.get(), __flushBatchRequests_name); - result->__wait(); - } - catch(const ::Ice::Exception&) - { - _exception(current_exception()); - } - - if(_sent) - { - _sent(result->sentSynchronously()); - } - } - - virtual bool hasSentCallback() const - { - return true; - } - - - virtual void - completed(const ::Ice::AsyncResultPtr& result) const - { - try - { - AsyncResult::__check(result, _connection.get(), __flushBatchRequests_name); - result->__wait(); - } - catch(const ::Ice::Exception&) - { - _exception(current_exception()); - } - } - - private: - - function<void (exception_ptr)> _exception; - function<void (bool)> _sent; - shared_ptr<Connection> _connection; }; - - auto self = dynamic_pointer_cast<ConnectionI>(shared_from_this()); - - auto result = make_shared<ConnectionFlushBatchAsync>(self, _communicator, _instance, __flushBatchRequests_name, - make_shared<FlushBatchRequestsCallback>(move(exception), move(sent), self)); - result->invoke(); - return [result]() - { - result->cancel(); - }; + auto outAsync = make_shared<ConnectionFlushBatchLambda>(shared_from_this(), _instance, ex, sent); + outAsync->invoke(__flushBatchRequests_name); + return [outAsync]() { outAsync->cancel(); }; } #else +void +Ice::ConnectionI::flushBatchRequests() +{ + ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name); + out.invoke(); +} AsyncResultPtr Ice::ConnectionI::begin_flushBatchRequests() @@ -905,14 +858,47 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR AsyncResultPtr Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) { - ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsync( - shared_from_this(), - _communicator, - _instance, - __flushBatchRequests_name, - cb, - cookie); - result->invoke(); + class ConnectionFlushBatchAsyncWithCallback : public ConnectionFlushBatchAsync, public CallbackCompletion + { + public: + + ConnectionFlushBatchAsyncWithCallback(const Ice::ConnectionIPtr& connection, + const Ice::CommunicatorPtr& communicator, + const InstancePtr& instance, + const CallbackBasePtr& callback, + const Ice::LocalObjectPtr& cookie) : + ConnectionFlushBatchAsync(connection, instance), + CallbackCompletion(callback, cookie), + _communicator(communicator), + _connection(connection) + { + _cookie = cookie; + } + + virtual Ice::CommunicatorPtr getCommunicator() const + { + return _communicator; + } + + virtual Ice::ConnectionPtr getConnection() const + { + return _connection; + } + + virtual const std::string& + getOperation() const + { + return __flushBatchRequests_name; + } + + private: + + Ice::CommunicatorPtr _communicator; + Ice::ConnectionPtr _connection; + }; + + ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsyncWithCallback(this, _communicator, _instance, cb, cookie); + result->invoke(__flushBatchRequests_name); return result; } @@ -925,21 +911,14 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) #endif void -#ifdef ICE_CPP11_MAPPING -Ice::ConnectionI::setHeartbeatCallback(std::function<void (std::shared_ptr<::Ice::Connection>)> callback) -#else -Ice::ConnectionI::setHeartbeatCallback(const Ice::HeartbeatCallbackPtr& callback) -#endif +Ice::ConnectionI::setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK) callback) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); _heartbeatCallback = callback; } + void -#ifdef ICE_CPP11_MAPPING -Ice::ConnectionI::setCloseCallback(std::function<void (std::shared_ptr<::Ice::Connection>)> callback) -#else -Ice::ConnectionI::setCloseCallback(const Ice::CloseCallbackPtr& callback) -#endif +Ice::ConnectionI::setCloseCallback(ICE_IN(ICE_CLOSE_CALLBACK) callback) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_state >= StateClosed) @@ -949,15 +928,12 @@ Ice::ConnectionI::setCloseCallback(const Ice::CloseCallbackPtr& callback) class CallbackWorkItem : public DispatchWorkItem { public: -#ifdef ICE_CPP11_MAPPING - CallbackWorkItem(const ConnectionIPtr& connection, - std::function<void (std::shared_ptr<Ice::Connection>)> callback) : + + CallbackWorkItem(const ConnectionIPtr& connection, ICE_IN(ICE_CLOSE_CALLBACK) callback) : _connection(connection), +#ifdef ICE_CPP11_MAPPING _callback(move(callback)) #else - CallbackWorkItem(const ConnectionIPtr& connection, - const Ice::CloseCallbackPtr& callback) : - _connection(connection), _callback(callback) #endif { @@ -1192,9 +1168,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con o->canceled(false); _sendStreams.erase(o); } - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } } return; @@ -1215,9 +1191,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con { _asyncRequests.erase(_asyncRequestsHint); _asyncRequestsHint = _asyncRequests.end(); - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } } return; @@ -1236,9 +1212,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con { assert(p != _asyncRequestsHint); _asyncRequests.erase(p); - if(outAsync->completed(ex)) + if(outAsync->exception(ex)) { - outAsync->invokeCompletedAsync(); + outAsync->invokeExceptionAsync(); } } return; @@ -1266,7 +1242,7 @@ Ice::ConnectionI::sendResponse(Int, OutputStream* os, Byte compressFlag, bool /* if(_state >= StateClosed) { - assert(ICE_EXCEPTION_GET(_exception)); + assert(ICE_EXCEPTION_ISSET(_exception)); ICE_RETHROW_EXCEPTION(_exception); } @@ -1305,7 +1281,7 @@ Ice::ConnectionI::sendNoResponse() if(_state >= StateClosed) { - assert(ICE_EXCEPTION_GET(_exception)); + assert(ICE_EXCEPTION_ISSET(_exception)); ICE_RETHROW_EXCEPTION(_exception); } @@ -1828,10 +1804,10 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess } if(p->receivedReply) { - OutgoingAsyncPtr outAsync = ICE_DYNAMIC_CAST(OutgoingAsync, p->outAsync); - if(outAsync->completed()) + OutgoingAsyncPtr o = ICE_DYNAMIC_CAST(OutgoingAsync, p->outAsync); + if(o->response()) { - outAsync->invokeCompleted(); + o->invokeResponse(); } } #else @@ -1847,7 +1823,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // if(outAsync) { - outAsync->invokeCompleted(); + outAsync->invokeResponse(); ++dispatchedCount; } @@ -1964,20 +1940,15 @@ Ice::ConnectionI::finish(bool close) { string verb = _connector ? "establish" : "accept"; Trace out(_instance->initializationData().logger, _instance->traceLevels()->networkCat); -#ifdef ICE_CPP11_MAPPING try { - rethrow_exception(_exception); + ICE_RETHROW_EXCEPTION(_exception); } catch(const Ice::Exception& ex) { out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() << "\n" << ex; } -#else - out << "failed to " << verb << " " << _endpoint->protocol() << " connection\n" << toString() - << "\n" << *_exception.get(); -#endif } } else @@ -1990,10 +1961,9 @@ Ice::ConnectionI::finish(bool close) // // Trace the cause of unexpected connection closures // -#ifdef ICE_CPP11_MAPPING try { - rethrow_exception(_exception); + ICE_RETHROW_EXCEPTION(_exception); } catch(const Ice::LocalException& ex) { @@ -2006,16 +1976,6 @@ Ice::ConnectionI::finish(bool close) out << "\n" << ex; } } -#else - if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || - dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || - dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || - dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()))) - { - out << "\n" << *_exception.get(); - } -#endif } } @@ -2026,18 +1986,14 @@ Ice::ConnectionI::finish(bool close) if(_startCallback) { -#ifdef ICE_CPP11_MAPPING try { - rethrow_exception(_exception); + ICE_RETHROW_EXCEPTION(_exception); } catch(const LocalException& ex) { _startCallback->connectionStartFailed(shared_from_this(), ex); } -#else - _startCallback->connectionStartFailed(shared_from_this(), *_exception.get()); -#endif _startCallback = 0; } @@ -2067,9 +2023,9 @@ Ice::ConnectionI::finish(bool close) if(message->receivedReply) { OutgoingAsyncPtr outAsync = ICE_DYNAMIC_CAST(OutgoingAsync, message->outAsync); - if(outAsync->completed()) + if(outAsync->response()) { - outAsync->invokeCompleted(); + outAsync->invokeResponse(); } } _sendStreams.pop_front(); @@ -2077,10 +2033,9 @@ Ice::ConnectionI::finish(bool close) #endif } -#ifdef ICE_CPP11_MAPPING try { - rethrow_exception(_exception); + ICE_RETHROW_EXCEPTION(_exception); } catch(const Ice::LocalException& ex) { @@ -2100,30 +2055,12 @@ Ice::ConnectionI::finish(bool close) } } } -#else - for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) - { - o->completed(*_exception.get()); - if(o->requestId) // Make sure finished isn't called twice. - { - if(o->out) - { - _requests.erase(o->requestId); - } - else - { - _asyncRequests.erase(o->requestId); - } - } - } -#endif _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage } -#ifdef ICE_CPP11_MAPPING try { - rethrow_exception(_exception); + ICE_RETHROW_EXCEPTION(_exception); } catch(const Ice::LocalException& ex) { @@ -2136,28 +2073,12 @@ Ice::ConnectionI::finish(bool close) for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { - if(q->second->completed(ex)) + if(q->second->exception(ex)) { - q->second->invokeCompleted(); + q->second->invokeException(); } } } -#else - for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) - { - p->second->completed(*_exception.get()); - } - - _requests.clear(); - - for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) - { - if(q->second->completed(*_exception.get())) - { - q->second->invokeCompleted(); - } - } -#endif _asyncRequests.clear(); @@ -2378,7 +2299,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) return; } - if(!ICE_EXCEPTION_GET(_exception)) + if(!ICE_EXCEPTION_ISSET(_exception)) { // // If we are in closed state, an exception must be set. @@ -2510,18 +2431,15 @@ Ice::ConnectionI::setState(State state) return; } -#ifdef ICE_CPP11_MAPPING try { - rethrow_exception(_exception); + ICE_RETHROW_EXCEPTION(_exception); } catch(const Ice::LocalException& ex) { _batchRequestQueue->destroy(ex); } -#else - _batchRequestQueue->destroy(*_exception.get()); -#endif + // // Don't need to close now for connections so only close the transceiver // if the selector request it. @@ -2580,12 +2498,11 @@ Ice::ConnectionI::setState(State state) newState, _observer.get())); } -#ifdef ICE_CPP11_MAPPING - if(_observer && state == StateClosed && _exception) + if(_observer && state == StateClosed && ICE_EXCEPTION_ISSET(_exception)) { try { - rethrow_exception(_exception); + ICE_RETHROW_EXCEPTION(_exception); } catch(const Ice::LocalException& ex) { @@ -2600,20 +2517,6 @@ Ice::ConnectionI::setState(State state) } } } -#else - if(_observer && state == StateClosed && _exception.get()) - { - if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || - dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || - dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || - dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || - (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state >= StateClosing))) - { - _observer->failed(_exception->ice_id()); - } - } -#endif } _state = state; @@ -2708,7 +2611,7 @@ Ice::ConnectionI::heartbeat() catch(const LocalException& ex) { setState(StateClosed, ex); - assert(ICE_EXCEPTION_GET(_exception)); + assert(ICE_EXCEPTION_ISSET(_exception)); } } } @@ -3540,7 +3443,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request message->receivedReply = true; outAsync = 0; } - else if(outAsync->completed()) + else if(outAsync->response()) { ++dispatchCount; } @@ -3549,7 +3452,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request outAsync = 0; } #else - if(outAsync->completed()) + if(outAsync->response()) { ++dispatchCount; } |