diff options
author | Mark Spruiell <mes@zeroc.com> | 2017-01-30 13:45:21 -0800 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2017-01-30 13:45:21 -0800 |
commit | 61270a10f980933cf582edb766f10c8ac6d86e8a (patch) | |
tree | 45ab4a7c2986954054fce613bc3c8f7967e7951e /cpp/src/Ice | |
parent | Fix slice2cpp build failure (diff) | |
download | ice-61270a10f980933cf582edb766f10c8ac6d86e8a.tar.bz2 ice-61270a10f980933cf582edb766f10c8ac6d86e8a.tar.xz ice-61270a10f980933cf582edb766f10c8ac6d86e8a.zip |
merging IceBridge into master
Diffstat (limited to 'cpp/src/Ice')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 211 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 20 | ||||
-rw-r--r-- | cpp/src/Ice/Exception.cpp | 8 | ||||
-rw-r--r-- | cpp/src/Ice/ProxyFactory.cpp | 7 |
4 files changed, 213 insertions, 33 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 7845259855c..23388d399c6 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -409,29 +409,31 @@ Ice::ConnectionI::destroy(DestructionReason reason) } void -Ice::ConnectionI::close(bool force) +Ice::ConnectionI::close(ConnectionClose mode) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(force) + if(mode == CloseForcefully) { - setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__)); + setState(StateClosed, ConnectionManuallyClosedException(__FILE__, __LINE__, false)); + } + else if(mode == CloseGracefully) + { + setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true)); } else { + assert(mode == CloseGracefullyAndWait); + // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, the - // CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the server - // has processed them or not. + // Wait until all outstanding requests have been completed. // while(!_asyncRequests.empty()) { wait(); } - setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); + setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true)); } } @@ -550,13 +552,13 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) // // We send a heartbeat if there was no activity in the last // (timeout / 4) period. Sending a heartbeat sooner than really - // needed is safer to ensure that the receiver will receive in - // time the heartbeat. Sending the heartbeat if there was no + // needed is safer to ensure that the receiver will receive the + // heartbeat in time. Sending the heartbeat if there was no // activity in the last (timeout / 2) period isn't enough since // monitor() is called only every (timeout / 2) period. // // Note that this doesn't imply that we are sending 4 heartbeats - // per timeout period because the monitor() method is sill only + // per timeout period because the monitor() method is still only // called every (timeout / 2) period. // if(acm.heartbeat == HeartbeatAlways || @@ -564,7 +566,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) { if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0) { - heartbeat(); + sendHeartbeatNow(); } } @@ -802,6 +804,174 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) } #endif +namespace +{ + +const ::std::string __heartbeat_name = "heartbeat"; + +class HeartbeatAsync : public OutgoingAsyncBase +{ +public: + + HeartbeatAsync(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance) : + OutgoingAsyncBase(instance), + _communicator(communicator), + _connection(connection) + { + } + + virtual CommunicatorPtr getCommunicator() const + { + return _communicator; + } + + virtual ConnectionPtr getConnection() const + { + return _connection; + } + + virtual const string& getOperation() const + { + return __heartbeat_name; + } + + void invoke() + { + _observer.attach(_instance.get(), __heartbeat_name); + try + { + _os.write(magic[0]); + _os.write(magic[1]); + _os.write(magic[2]); + _os.write(magic[3]); + _os.write(currentProtocol); + _os.write(currentProtocolEncoding); + _os.write(validateConnectionMsg); + _os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). + _os.write(headerSize); // Message size. + _os.i = _os.b.begin(); + + AsyncStatus status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, 0); + if(status & AsyncStatusSent) + { + _sentSynchronously = true; + if(status & AsyncStatusInvokeSentCallback) + { + invokeSent(); + } + } + } + catch(const RetryException& ex) + { + if(exception(*ex.get())) + { + invokeExceptionAsync(); + } + } + catch(const Exception& ex) + { + if(exception(ex)) + { + invokeExceptionAsync(); + } + } + } + +private: + + CommunicatorPtr _communicator; + ConnectionIPtr _connection; +}; +typedef IceUtil::Handle<HeartbeatAsync> HeartbeatAsyncPtr; + +} + +#ifdef ICE_CPP11_MAPPING +void +Ice::ConnectionI::heartbeat() +{ + Connection::heartbeatAsync().get(); +} + +std::function<void()> +Ice::ConnectionI::heartbeatAsync(::std::function<void(::std::exception_ptr)> ex, ::std::function<void(bool)> sent) +{ + class HeartbeatLambda : public HeartbeatAsync, public LambdaInvoke + { + public: + + HeartbeatLambda(std::shared_ptr<Ice::ConnectionI>&& connection, + std::shared_ptr<Ice::Communicator>& communicator, + const InstancePtr& instance, + std::function<void(std::exception_ptr)> ex, + std::function<void(bool)> sent) : + HeartbeatAsync(connection, communicator, instance), LambdaInvoke(std::move(ex), std::move(sent)) + { + } + }; + auto outAsync = make_shared<HeartbeatLambda>(ICE_SHARED_FROM_THIS, _communicator, _instance, ex, sent); + outAsync->invoke(); + return [outAsync]() { outAsync->cancel(); }; +} +#else +void +Ice::ConnectionI::heartbeat() +{ + end_heartbeat(begin_heartbeat()); +} + +AsyncResultPtr +Ice::ConnectionI::begin_heartbeat() +{ + return _iceI_begin_heartbeat(dummyCallback, 0); +} + +AsyncResultPtr +Ice::ConnectionI::begin_heartbeat(const CallbackPtr& cb, const LocalObjectPtr& cookie) +{ + return _iceI_begin_heartbeat(cb, cookie); +} + +AsyncResultPtr +Ice::ConnectionI::begin_heartbeat(const Callback_Connection_heartbeatPtr& cb, const LocalObjectPtr& cookie) +{ + return _iceI_begin_heartbeat(cb, cookie); +} + +AsyncResultPtr +Ice::ConnectionI::_iceI_begin_heartbeat(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) +{ + class HeartbeatCallback : public HeartbeatAsync, public CallbackCompletion + { + public: + + HeartbeatCallback(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance, + const CallbackBasePtr& callback, + const LocalObjectPtr& cookie) : + HeartbeatAsync(connection, communicator, instance), + CallbackCompletion(callback, cookie) + { + _cookie = cookie; + } + }; + + HeartbeatAsyncPtr result = new HeartbeatCallback(this, _communicator, _instance, cb, cookie); + result->invoke(); + return result; +} + +void +Ice::ConnectionI::end_heartbeat(const AsyncResultPtr& r) +{ + AsyncResult::check(r, this, __heartbeat_name); + r->waitForResponse(); +} +#endif + void Ice::ConnectionI::setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK) callback) { @@ -1755,7 +1925,7 @@ Ice::ConnectionI::finish(bool close) out << "closed " << _endpoint->protocol() << " connection\n" << toString(); if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) || dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()))) @@ -2076,7 +2246,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) // Don't warn about certain expected exceptions. // if(!(dynamic_cast<const CloseConnectionException*>(&ex) || - dynamic_cast<const ForcedCloseConnectionException*>(&ex) || + dynamic_cast<const ConnectionManuallyClosedException*>(&ex) || dynamic_cast<const ConnectionTimeoutException*>(&ex) || dynamic_cast<const CommunicatorDestroyedException*>(&ex) || dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) || @@ -2255,7 +2425,7 @@ Ice::ConnectionI::setState(State state) if(_observer && state == StateClosed && _exception) { if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) || dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || @@ -2285,8 +2455,7 @@ Ice::ConnectionI::setState(State state) void Ice::ConnectionI::initiateShutdown() { - assert(_state == StateClosing); - assert(_dispatchCount == 0); + assert(_state == StateClosing && _dispatchCount == 0); if(_shutdownInitiated) { @@ -2316,7 +2485,7 @@ Ice::ConnectionI::initiateShutdown() setState(StateClosingPending); // - // Notify the the transceiver of the graceful connection closure. + // Notify the transceiver of the graceful connection closure. // SocketOperation op = _transceiver->closing(true, *_exception); if(op) @@ -2329,7 +2498,7 @@ Ice::ConnectionI::initiateShutdown() } void -Ice::ConnectionI::heartbeat() +Ice::ConnectionI::sendHeartbeatNow() { assert(_state == StateActive); @@ -3016,7 +3185,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request setState(StateClosingPending, CloseConnectionException(__FILE__, __LINE__)); // - // Notify the the transceiver of the graceful connection closure. + // Notify the transceiver of the graceful connection closure. // SocketOperation op = _transceiver->closing(false, *_exception); if(op) diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 72652518b94..7fb3781e880 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -157,12 +157,12 @@ public: void activate(); void hold(); void destroy(DestructionReason); - virtual void close(bool); // From Connection. + virtual void close(ConnectionClose); // From Connection. bool isActiveOrHolding() const; bool isFinished() const; - void throwException() const; // Throws the connection exception if destroyed. + virtual void throwException() const; // From Connection. Throws the connection exception if destroyed. void waitUntilHolding() const; void waitUntilFinished(); // Not const, as this might close the connection upon timeout. @@ -193,6 +193,19 @@ public: virtual void setCloseCallback(ICE_IN(ICE_CLOSE_CALLBACK)); virtual void setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK)); + virtual void heartbeat(); + +#ifdef ICE_CPP11_MAPPING + virtual std::function<void()> + heartbeatAsync(::std::function<void(::std::exception_ptr)>, ::std::function<void(bool)> = nullptr); +#else + virtual AsyncResultPtr begin_heartbeat(); + virtual AsyncResultPtr begin_heartbeat(const CallbackPtr&, const LocalObjectPtr& = 0); + virtual AsyncResultPtr begin_heartbeat(const Callback_Connection_heartbeatPtr&, const LocalObjectPtr& = 0); + + virtual void end_heartbeat(const AsyncResultPtr&); +#endif + virtual void setACM(const IceUtil::Optional<int>&, const IceUtil::Optional<ACMClose>&, const IceUtil::Optional<ACMHeartbeat>&); @@ -276,7 +289,7 @@ private: void setState(State); void initiateShutdown(); - void heartbeat(); + void sendHeartbeatNow(); bool initialize(IceInternal::SocketOperation = IceInternal::SocketOperationNone); bool validate(IceInternal::SocketOperation = IceInternal::SocketOperationNone); @@ -308,6 +321,7 @@ private: #ifndef ICE_CPP11_MAPPING AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); + AsyncResultPtr _iceI_begin_heartbeat(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); #endif Ice::CommunicatorPtr _communicator; diff --git a/cpp/src/Ice/Exception.cpp b/cpp/src/Ice/Exception.cpp index 4ac7d66cdd3..a20c564101e 100644 --- a/cpp/src/Ice/Exception.cpp +++ b/cpp/src/Ice/Exception.cpp @@ -632,14 +632,10 @@ Ice::CloseConnectionException::ice_print(ostream& out) const } void -Ice::ForcedCloseConnectionException::ice_print(ostream& out) const +Ice::ConnectionManuallyClosedException::ice_print(ostream& out) const { Exception::ice_print(out); - out << ":\nprotocol error: connection forcefully closed"; - if(!reason.empty()) - { - out << ":\n" << reason; - } + out << ":\nprotocol error: connection manually closed (" << (graceful ? "gracefully" : "forcefully") << ")"; } void diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp index eba949508b6..34503ec961b 100644 --- a/cpp/src/Ice/ProxyFactory.cpp +++ b/cpp/src/Ice/ProxyFactory.cpp @@ -201,11 +201,12 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, co } // - // Don't retry if the communicator is destroyed or object adapter - // deactivated. + // Don't retry if the communicator is destroyed, object adapter is deactivated, + // or connection is manually closed. // if(dynamic_cast<const CommunicatorDestroyedException*>(&ex) || - dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex)) + dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) || + dynamic_cast<const ConnectionManuallyClosedException*>(&ex)) { ex.ice_throw(); } |