diff options
Diffstat (limited to 'cpp')
33 files changed, 1638 insertions, 184 deletions
diff --git a/cpp/include/Ice/ConnectionAsync.h b/cpp/include/Ice/ConnectionAsync.h index bc3f6064525..58f54cc3492 100644 --- a/cpp/include/Ice/ConnectionAsync.h +++ b/cpp/include/Ice/ConnectionAsync.h @@ -112,6 +112,99 @@ newCallback_Connection_flushBatchRequests(T* instance, void (T::*excb)(const ::I return new Callback_Connection_flushBatchRequests<T, CT>(instance, excb, sentcb); } +template<class T> +class CallbackNC_Connection_heartbeat : public Callback_Connection_heartbeat_Base, + public ::IceInternal::OnewayCallbackNC<T> +{ +public: + + typedef IceUtil::Handle<T> TPtr; + + typedef void (T::*Exception)(const ::Ice::Exception&); + typedef void (T::*Sent)(bool); + + CallbackNC_Connection_heartbeat(const TPtr& obj, Exception excb, Sent sentcb) + : ::IceInternal::OnewayCallbackNC<T>(obj, 0, excb, sentcb) + { + } + + virtual void completed(const ::Ice::AsyncResultPtr& __result) const + { + ::Ice::ConnectionPtr __con = __result->getConnection(); + assert(__con); + try + { + __con->end_heartbeat(__result); + assert(false); + } + catch(const ::Ice::Exception& ex) + { + ::IceInternal::CallbackNC<T>::exception(__result, ex); + } + } +}; + +template<class T> Callback_Connection_heartbeatPtr +newCallback_Connection_heartbeat(const IceUtil::Handle<T>& instance, + void (T::*excb)(const ::Ice::Exception&), + void (T::*sentcb)(bool) = 0) +{ + return new CallbackNC_Connection_heartbeat<T>(instance, excb, sentcb); +} + +template<class T> Callback_Connection_heartbeatPtr +newCallback_Connection_heartbeat(T* instance, void (T::*excb)(const ::Ice::Exception&), void (T::*sentcb)(bool) = 0) +{ + return new CallbackNC_Connection_heartbeat<T>(instance, excb, sentcb); +} + +template<class T, typename CT> +class Callback_Connection_heartbeat : public Callback_Connection_heartbeat_Base, + public ::IceInternal::OnewayCallback<T, CT> +{ +public: + + typedef IceUtil::Handle<T> TPtr; + + typedef void (T::*Exception)(const ::Ice::Exception& , const CT&); + typedef void (T::*Sent)(bool , const CT&); + + Callback_Connection_heartbeat(const TPtr& obj, Exception excb, Sent sentcb) + : ::IceInternal::OnewayCallback<T, CT>(obj, 0, excb, sentcb) + { + } + + virtual void completed(const ::Ice::AsyncResultPtr& __result) const + { + ::Ice::ConnectionPtr __con = __result->getConnection(); + assert(__con); + try + { + __con->end_heartbeat(__result); + assert(false); + } + catch(const ::Ice::Exception& ex) + { + ::IceInternal::Callback<T, CT>::exception(__result, ex); + } + } +}; + +template<class T, typename CT> Callback_Connection_heartbeatPtr +newCallback_Connection_heartbeat(const IceUtil::Handle<T>& instance, + void (T::*excb)(const ::Ice::Exception&, const CT&), + void (T::*sentcb)(bool, const CT&) = 0) +{ + return new Callback_Connection_heartbeat<T, CT>(instance, excb, sentcb); +} + +template<class T, typename CT> Callback_Connection_heartbeatPtr +newCallback_Connection_heartbeat(T* instance, void (T::*excb)(const ::Ice::Exception&, const CT&), + void (T::*sentcb)(bool, const CT&) = 0) +{ + return new Callback_Connection_heartbeat<T, CT>(instance, excb, sentcb); +} + } #endif diff --git a/cpp/src/Glacier2/RoutingTable.cpp b/cpp/src/Glacier2/RoutingTable.cpp index fde938fba00..e6b090de4cb 100644 --- a/cpp/src/Glacier2/RoutingTable.cpp +++ b/cpp/src/Glacier2/RoutingTable.cpp @@ -65,7 +65,7 @@ Glacier2::RoutingTable::add(const ObjectProxySeq& unfiltered, const Current& cur if(!_verifier->verify(*prx)) { - current.con->close(true); + current.con->close(CloseForcefully); throw ObjectNotExistException(__FILE__, __LINE__); } ObjectPrx proxy = (*prx)->ice_twoway()->ice_secure(false)->ice_facet(""); // We add proxies in default form. diff --git a/cpp/src/Glacier2/SessionRouterI.cpp b/cpp/src/Glacier2/SessionRouterI.cpp index 9f774b79a8e..512d371611f 100644 --- a/cpp/src/Glacier2/SessionRouterI.cpp +++ b/cpp/src/Glacier2/SessionRouterI.cpp @@ -80,7 +80,7 @@ public: // Close the connection otherwise the peer has no way to know that // the session has gone. // - _connection->close(true); + _connection->close(CloseForcefully); _router->destroySession(_connection); } } @@ -922,7 +922,7 @@ SessionRouterI::refreshSession(const Ice::ConnectionPtr& con) // Close the connection otherwise the peer has no way to know that the // session has gone. // - con->close(true); + con->close(CloseForcefully); throw SessionNotExistException(); } } @@ -1149,10 +1149,10 @@ SessionRouterI::getRouterImpl(const ConnectionPtr& connection, const Ice::Identi if(_rejectTraceLevel >= 1) { Trace out(_instance->logger(), "Glacier2"); - out << "rejecting request. no session is associated with the connection.\n"; - out << "identity: " << _instance->communicator()->identityToString(id); + out << "rejecting request, no session is associated with the connection.\n"; + out << "identity: " << identityToString(id); } - connection->close(true); + connection->close(CloseForcefully); throw ObjectNotExistException(__FILE__, __LINE__); } return 0; diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 7845259855c..23388d399c6 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -409,29 +409,31 @@ Ice::ConnectionI::destroy(DestructionReason reason) } void -Ice::ConnectionI::close(bool force) +Ice::ConnectionI::close(ConnectionClose mode) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(force) + if(mode == CloseForcefully) { - setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__)); + setState(StateClosed, ConnectionManuallyClosedException(__FILE__, __LINE__, false)); + } + else if(mode == CloseGracefully) + { + setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true)); } else { + assert(mode == CloseGracefullyAndWait); + // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, the - // CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the server - // has processed them or not. + // Wait until all outstanding requests have been completed. // while(!_asyncRequests.empty()) { wait(); } - setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); + setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true)); } } @@ -550,13 +552,13 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) // // We send a heartbeat if there was no activity in the last // (timeout / 4) period. Sending a heartbeat sooner than really - // needed is safer to ensure that the receiver will receive in - // time the heartbeat. Sending the heartbeat if there was no + // needed is safer to ensure that the receiver will receive the + // heartbeat in time. Sending the heartbeat if there was no // activity in the last (timeout / 2) period isn't enough since // monitor() is called only every (timeout / 2) period. // // Note that this doesn't imply that we are sending 4 heartbeats - // per timeout period because the monitor() method is sill only + // per timeout period because the monitor() method is still only // called every (timeout / 2) period. // if(acm.heartbeat == HeartbeatAlways || @@ -564,7 +566,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) { if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0) { - heartbeat(); + sendHeartbeatNow(); } } @@ -802,6 +804,174 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) } #endif +namespace +{ + +const ::std::string __heartbeat_name = "heartbeat"; + +class HeartbeatAsync : public OutgoingAsyncBase +{ +public: + + HeartbeatAsync(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance) : + OutgoingAsyncBase(instance), + _communicator(communicator), + _connection(connection) + { + } + + virtual CommunicatorPtr getCommunicator() const + { + return _communicator; + } + + virtual ConnectionPtr getConnection() const + { + return _connection; + } + + virtual const string& getOperation() const + { + return __heartbeat_name; + } + + void invoke() + { + _observer.attach(_instance.get(), __heartbeat_name); + try + { + _os.write(magic[0]); + _os.write(magic[1]); + _os.write(magic[2]); + _os.write(magic[3]); + _os.write(currentProtocol); + _os.write(currentProtocolEncoding); + _os.write(validateConnectionMsg); + _os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). + _os.write(headerSize); // Message size. + _os.i = _os.b.begin(); + + AsyncStatus status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, 0); + if(status & AsyncStatusSent) + { + _sentSynchronously = true; + if(status & AsyncStatusInvokeSentCallback) + { + invokeSent(); + } + } + } + catch(const RetryException& ex) + { + if(exception(*ex.get())) + { + invokeExceptionAsync(); + } + } + catch(const Exception& ex) + { + if(exception(ex)) + { + invokeExceptionAsync(); + } + } + } + +private: + + CommunicatorPtr _communicator; + ConnectionIPtr _connection; +}; +typedef IceUtil::Handle<HeartbeatAsync> HeartbeatAsyncPtr; + +} + +#ifdef ICE_CPP11_MAPPING +void +Ice::ConnectionI::heartbeat() +{ + Connection::heartbeatAsync().get(); +} + +std::function<void()> +Ice::ConnectionI::heartbeatAsync(::std::function<void(::std::exception_ptr)> ex, ::std::function<void(bool)> sent) +{ + class HeartbeatLambda : public HeartbeatAsync, public LambdaInvoke + { + public: + + HeartbeatLambda(std::shared_ptr<Ice::ConnectionI>&& connection, + std::shared_ptr<Ice::Communicator>& communicator, + const InstancePtr& instance, + std::function<void(std::exception_ptr)> ex, + std::function<void(bool)> sent) : + HeartbeatAsync(connection, communicator, instance), LambdaInvoke(std::move(ex), std::move(sent)) + { + } + }; + auto outAsync = make_shared<HeartbeatLambda>(ICE_SHARED_FROM_THIS, _communicator, _instance, ex, sent); + outAsync->invoke(); + return [outAsync]() { outAsync->cancel(); }; +} +#else +void +Ice::ConnectionI::heartbeat() +{ + end_heartbeat(begin_heartbeat()); +} + +AsyncResultPtr +Ice::ConnectionI::begin_heartbeat() +{ + return _iceI_begin_heartbeat(dummyCallback, 0); +} + +AsyncResultPtr +Ice::ConnectionI::begin_heartbeat(const CallbackPtr& cb, const LocalObjectPtr& cookie) +{ + return _iceI_begin_heartbeat(cb, cookie); +} + +AsyncResultPtr +Ice::ConnectionI::begin_heartbeat(const Callback_Connection_heartbeatPtr& cb, const LocalObjectPtr& cookie) +{ + return _iceI_begin_heartbeat(cb, cookie); +} + +AsyncResultPtr +Ice::ConnectionI::_iceI_begin_heartbeat(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) +{ + class HeartbeatCallback : public HeartbeatAsync, public CallbackCompletion + { + public: + + HeartbeatCallback(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance, + const CallbackBasePtr& callback, + const LocalObjectPtr& cookie) : + HeartbeatAsync(connection, communicator, instance), + CallbackCompletion(callback, cookie) + { + _cookie = cookie; + } + }; + + HeartbeatAsyncPtr result = new HeartbeatCallback(this, _communicator, _instance, cb, cookie); + result->invoke(); + return result; +} + +void +Ice::ConnectionI::end_heartbeat(const AsyncResultPtr& r) +{ + AsyncResult::check(r, this, __heartbeat_name); + r->waitForResponse(); +} +#endif + void Ice::ConnectionI::setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK) callback) { @@ -1755,7 +1925,7 @@ Ice::ConnectionI::finish(bool close) out << "closed " << _endpoint->protocol() << " connection\n" << toString(); if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) || dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()))) @@ -2076,7 +2246,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) // Don't warn about certain expected exceptions. // if(!(dynamic_cast<const CloseConnectionException*>(&ex) || - dynamic_cast<const ForcedCloseConnectionException*>(&ex) || + dynamic_cast<const ConnectionManuallyClosedException*>(&ex) || dynamic_cast<const ConnectionTimeoutException*>(&ex) || dynamic_cast<const CommunicatorDestroyedException*>(&ex) || dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) || @@ -2255,7 +2425,7 @@ Ice::ConnectionI::setState(State state) if(_observer && state == StateClosed && _exception) { if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) || dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || @@ -2285,8 +2455,7 @@ Ice::ConnectionI::setState(State state) void Ice::ConnectionI::initiateShutdown() { - assert(_state == StateClosing); - assert(_dispatchCount == 0); + assert(_state == StateClosing && _dispatchCount == 0); if(_shutdownInitiated) { @@ -2316,7 +2485,7 @@ Ice::ConnectionI::initiateShutdown() setState(StateClosingPending); // - // Notify the the transceiver of the graceful connection closure. + // Notify the transceiver of the graceful connection closure. // SocketOperation op = _transceiver->closing(true, *_exception); if(op) @@ -2329,7 +2498,7 @@ Ice::ConnectionI::initiateShutdown() } void -Ice::ConnectionI::heartbeat() +Ice::ConnectionI::sendHeartbeatNow() { assert(_state == StateActive); @@ -3016,7 +3185,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request setState(StateClosingPending, CloseConnectionException(__FILE__, __LINE__)); // - // Notify the the transceiver of the graceful connection closure. + // Notify the transceiver of the graceful connection closure. // SocketOperation op = _transceiver->closing(false, *_exception); if(op) diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 72652518b94..7fb3781e880 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -157,12 +157,12 @@ public: void activate(); void hold(); void destroy(DestructionReason); - virtual void close(bool); // From Connection. + virtual void close(ConnectionClose); // From Connection. bool isActiveOrHolding() const; bool isFinished() const; - void throwException() const; // Throws the connection exception if destroyed. + virtual void throwException() const; // From Connection. Throws the connection exception if destroyed. void waitUntilHolding() const; void waitUntilFinished(); // Not const, as this might close the connection upon timeout. @@ -193,6 +193,19 @@ public: virtual void setCloseCallback(ICE_IN(ICE_CLOSE_CALLBACK)); virtual void setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK)); + virtual void heartbeat(); + +#ifdef ICE_CPP11_MAPPING + virtual std::function<void()> + heartbeatAsync(::std::function<void(::std::exception_ptr)>, ::std::function<void(bool)> = nullptr); +#else + virtual AsyncResultPtr begin_heartbeat(); + virtual AsyncResultPtr begin_heartbeat(const CallbackPtr&, const LocalObjectPtr& = 0); + virtual AsyncResultPtr begin_heartbeat(const Callback_Connection_heartbeatPtr&, const LocalObjectPtr& = 0); + + virtual void end_heartbeat(const AsyncResultPtr&); +#endif + virtual void setACM(const IceUtil::Optional<int>&, const IceUtil::Optional<ACMClose>&, const IceUtil::Optional<ACMHeartbeat>&); @@ -276,7 +289,7 @@ private: void setState(State); void initiateShutdown(); - void heartbeat(); + void sendHeartbeatNow(); bool initialize(IceInternal::SocketOperation = IceInternal::SocketOperationNone); bool validate(IceInternal::SocketOperation = IceInternal::SocketOperationNone); @@ -308,6 +321,7 @@ private: #ifndef ICE_CPP11_MAPPING AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); + AsyncResultPtr _iceI_begin_heartbeat(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&); #endif Ice::CommunicatorPtr _communicator; diff --git a/cpp/src/Ice/Exception.cpp b/cpp/src/Ice/Exception.cpp index 4ac7d66cdd3..a20c564101e 100644 --- a/cpp/src/Ice/Exception.cpp +++ b/cpp/src/Ice/Exception.cpp @@ -632,14 +632,10 @@ Ice::CloseConnectionException::ice_print(ostream& out) const } void -Ice::ForcedCloseConnectionException::ice_print(ostream& out) const +Ice::ConnectionManuallyClosedException::ice_print(ostream& out) const { Exception::ice_print(out); - out << ":\nprotocol error: connection forcefully closed"; - if(!reason.empty()) - { - out << ":\n" << reason; - } + out << ":\nprotocol error: connection manually closed (" << (graceful ? "gracefully" : "forcefully") << ")"; } void diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp index eba949508b6..34503ec961b 100644 --- a/cpp/src/Ice/ProxyFactory.cpp +++ b/cpp/src/Ice/ProxyFactory.cpp @@ -201,11 +201,12 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, co } // - // Don't retry if the communicator is destroyed or object adapter - // deactivated. + // Don't retry if the communicator is destroyed, object adapter is deactivated, + // or connection is manually closed. // if(dynamic_cast<const CommunicatorDestroyedException*>(&ex) || - dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex)) + dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) || + dynamic_cast<const ConnectionManuallyClosedException*>(&ex)) { ex.ice_throw(); } diff --git a/cpp/src/IceBridge/IceBridge.cpp b/cpp/src/IceBridge/IceBridge.cpp new file mode 100644 index 00000000000..31ec58aae6c --- /dev/null +++ b/cpp/src/IceBridge/IceBridge.cpp @@ -0,0 +1,837 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <Ice/Connection.h> +#include <Ice/ObjectAdapter.h> +#include <Ice/Service.h> +#include <Ice/UUID.h> +#include <IceUtil/Options.h> + +using namespace std; +using namespace Ice; + +namespace +{ + +// +// Represents a pending twoway invocation. +// +class Invocation : public IceUtil::Shared +{ +public: + + Invocation(const AMD_Object_ice_invokePtr& cb) : + _cb(cb) + { + } + + void success(bool ok, const pair<const Byte*, const Byte*>& results) + { + _cb->ice_response(ok, results); + } + + void exception(const Exception& ex) + { + _cb->ice_exception(ex); + } + +private: + + AMD_Object_ice_invokePtr _cb; +}; +typedef IceUtil::Handle<Invocation> InvocationPtr; + +// +// Represents a pending oneway invocation. +// +class OnewayInvocation : public IceUtil::Shared +{ +public: + + OnewayInvocation(const AMD_Object_ice_invokePtr& cb) : + _cb(cb) + { + } + + void success(bool, const pair<const Byte*, const Byte*>&) + { + assert(false); + } + + void exception(const Exception& ex) + { + _cb->ice_exception(ex); + } + + void sent(bool sentSynchronously) + { + _cb->ice_response(true, vector<Byte>()); + } + +private: + + AMD_Object_ice_invokePtr _cb; +}; +typedef IceUtil::Handle<OnewayInvocation> OnewayInvocationPtr; + +// +// Holds information about an incoming invocation that's been queued until an outgoing connection has +// been established. +// +struct QueuedInvocation : public IceUtil::Shared +{ + QueuedInvocation(const AMD_Object_ice_invokePtr& c, const pair<const Byte*, const Byte*>& p, const Current& curr) : + cb(c), current(curr) + { + if(p.first) + { + // + // The pointers in paramData refer to the Ice marshaling buffer and won't remain valid after + // ice_invoke_async completes, so we have to make a copy of the parameter data. + // + vector<Byte> tmp(p.first, p.second); + paramData.swap(tmp); + } + } + + const AMD_Object_ice_invokePtr cb; + vector<Byte> paramData; + const Current current; +}; +typedef IceUtil::Handle<QueuedInvocation> QueuedInvocationPtr; +typedef vector<QueuedInvocationPtr> InvocationList; + +// +// Relays heartbeat messages. +// +class HeartbeatCallbackI : public HeartbeatCallback +{ +public: + + HeartbeatCallbackI(const ConnectionPtr& con) : + _connection(con) + { + } + + virtual void heartbeat(const ConnectionPtr&) + { + // + // When a connection receives a heartbeat message, we send one over its corresponding connection. + // + try + { + _connection->begin_heartbeat(); + } + catch(...) + { + } + } + +private: + + const ConnectionPtr _connection; +}; + +// +// Represents a pair of bridged connections. +// +class BridgeConnection : public IceUtil::Shared +{ +public: + + BridgeConnection(const ObjectAdapterPtr& adapter, const ObjectPrx& target, const ConnectionPtr& incoming) : + _adapter(adapter), _target(target), _incoming(incoming), _closed(false) + { + } + + void setOutgoing(const ConnectionPtr& outgoing) + { + if(outgoing) + { + InvocationList queuedInvocations; + + { + IceUtil::Mutex::Lock lock(_lock); + + assert(!_outgoing); + + if(_closed) + { + // + // The incoming connection is already closed. There's no point in leaving the outgoing + // connection open. + // + outgoing->close(CloseGracefully); + } + else + { + _outgoing = outgoing; + + // + // Save the queued invocations. + // + queuedInvocations.swap(_queue); + + // + // Register hearbeat callbacks on both connections. + // + _incoming->setHeartbeatCallback(new HeartbeatCallbackI(_outgoing)); + _outgoing->setHeartbeatCallback(new HeartbeatCallbackI(_incoming)); + + // + // Configure the outgoing connection for bidirectional requests. + // + _outgoing->setAdapter(_adapter); + } + } + + // + // Flush any queued invocations. + // + flush(outgoing, queuedInvocations); + } + else + { + IceUtil::Mutex::Lock lock(_lock); + + // + // The outgoing connection failed so we close the incoming connection. closed() will eventually + // be called for it. + // + if(_incoming) + { + _incoming->close(CloseGracefully); + } + } + } + + void closed(const ConnectionPtr& con) + { + InvocationList queuedInvocations; + ConnectionPtr toBeClosed; + + { + IceUtil::Mutex::Lock lock(_lock); + + if(con == _incoming) + { + // + // The incoming connection was closed. + // + toBeClosed = _outgoing; + queuedInvocations.swap(_queue); + _closed = true; + _incoming = 0; + } + else + { + assert(con == _outgoing); + + // + // An outgoing connection was closed. + // + toBeClosed = _incoming; + _outgoing = 0; + } + } + + // + // Close the corresponding connection. + // + if(toBeClosed) + { + // + // Examine the exception that caused the connection to be closed. A CloseConnectionException + // indicates the connection was closed gracefully and we do the same. We also look for + // ConnectionManuallyClosedException, which indicates the connection was closed locally. + // We close forcefully for any other exception. + // + try + { + con->throwException(); + } + catch(const Ice::CloseConnectionException&) + { + toBeClosed->close(CloseGracefully); + } + catch(const Ice::ConnectionManuallyClosedException& ex) + { + // + // Connection was manually closed by the bridge. + // + toBeClosed->close(ex.graceful ? CloseGracefully : CloseForcefully); + } + catch(...) + { + toBeClosed->close(CloseForcefully); + } + } + + // + // Even though the connection is already closed, we still need to "complete" the pending invocations so + // that the connection's dispatch count is updated correctly. + // + for(InvocationList::iterator p = queuedInvocations.begin(); p != queuedInvocations.end(); ++p) + { + (*p)->cb->ice_exception(ConnectionLostException(__FILE__, __LINE__)); + } + } + + void dispatch(const AMD_Object_ice_invokePtr& cb, const pair<const Byte*, const Byte*>& paramData, + const Current& current) + { + // + // We've received an invocations, either from the client via the incoming connection, or from + // the server via the outgoing (bidirectional) connection. The current.con member tells us + // the connection over which the request arrived. + // + + ConnectionPtr dest; + + { + IceUtil::Mutex::Lock lock(_lock); + + if(_closed) + { + // + // If the incoming connection has already closed, we're done. + // + cb->ice_exception(ConnectionLostException(__FILE__, __LINE__)); + } + else if(!_outgoing) + { + // + // Queue the invocation until the outgoing connection is established. + // + _queue.push_back(new QueuedInvocation(cb, paramData, current)); + } + else if(current.con == _incoming) + { + // + // Forward to the outgoing connection. + // + dest = _outgoing; + } + else + { + // + // Forward to the incoming connection. + // + assert(current.con == _outgoing); + dest = _incoming; + } + } + + if(dest) + { + send(dest, cb, paramData, current); + } + } + +private: + + void flush(const ConnectionPtr& outgoing, const InvocationList& invocations) + { + for(InvocationList::const_iterator p = invocations.begin(); p != invocations.end(); ++p) + { + QueuedInvocationPtr i = *p; + + pair<const Byte*, const Byte*> paramData; + if(i->paramData.empty()) + { + paramData.first = paramData.second = 0; + } + else + { + paramData.first = &i->paramData[0]; + paramData.second = paramData.first + i->paramData.size(); + } + + send(outgoing, i->cb, paramData, i->current); + } + } + + void send(const ConnectionPtr& dest, const AMD_Object_ice_invokePtr& cb, + const pair<const Byte*, const Byte*>& paramData, const Current& current) + { + try + { + // + // Create a proxy having the same identity as the request. + // + ObjectPrx prx = dest->createProxy(current.id); + + // + // Examine the proxy and the request to determine whether it should be forwarded as a oneway or a twoway. + // + if(!prx->ice_isTwoway() || !current.requestId) + { + if(prx->ice_isTwoway()) + { + prx = prx->ice_oneway(); + } + OnewayInvocationPtr i = new OnewayInvocation(cb); + Callback_Object_ice_invokePtr d = + newCallback_Object_ice_invoke(i, &OnewayInvocation::success, &OnewayInvocation::exception, + &OnewayInvocation::sent); + prx->begin_ice_invoke(current.operation, current.mode, paramData, current.ctx, d); + } + else + { + InvocationPtr i = new Invocation(cb); + Callback_Object_ice_invokePtr d = + newCallback_Object_ice_invoke(i, &Invocation::success, &Invocation::exception); + prx->begin_ice_invoke(current.operation, current.mode, paramData, current.ctx, d); + } + } + catch(const std::exception& ex) + { + cb->ice_exception(ex); + } + } + + const ObjectAdapterPtr _adapter; + const ObjectPrx _target; + ConnectionPtr _incoming; + + IceUtil::Mutex _lock; + + bool _closed; + ConnectionPtr _outgoing; + + // + // We maintain our own queue for invocations that arrive on the incoming connection before the outgoing + // connection has been established. We don't want to forward these to proxies and let the proxies handle + // the queuing because then the invocations could be sent out of order (e.g., when invocations are split + // among twoway/oneway/datagram proxies). + // + InvocationList _queue; +}; +typedef IceUtil::Handle<BridgeConnection> BridgeConnectionPtr; + +// +// Allows the bridge to be used as an Ice router. +// +class RouterI : public Router +{ +public: + + virtual ObjectPrx getClientProxy(const Current&) const + { + return 0; + } + + virtual ObjectPrx getServerProxy(const Current&) const + { + return 0; + } + + virtual ObjectProxySeq addProxies(const ObjectProxySeq&, const Current&) + { + return ObjectProxySeq(); + } +}; + +class FinderI : public RouterFinder +{ +public: + + FinderI(const RouterPrx& router) : + _router(router) + { + } + + virtual RouterPrx getRouter(const Current&) + { + return _router; + } + +private: + + const RouterPrx _router; +}; + +class BridgeI; +typedef IceUtil::Handle<BridgeI> BridgeIPtr; + +class CloseCallbackI : public CloseCallback +{ +public: + + CloseCallbackI(const BridgeIPtr& bridge) : + _bridge(bridge) + { + } + + virtual void closed(const ConnectionPtr&); + +private: + + const BridgeIPtr _bridge; +}; + +class GetConnectionCallback : public IceUtil::Shared +{ +public: + + GetConnectionCallback(const BridgeIPtr& bridge, const BridgeConnectionPtr& bc) : + _bridge(bridge), _bc(bc) + { + } + + void success(const ConnectionPtr&); + void exception(const Exception&); + +private: + + const BridgeIPtr _bridge; + const BridgeConnectionPtr _bc; +}; +typedef IceUtil::Handle<GetConnectionCallback> GetConnectionCallbackPtr; + +// +// The main bridge servant. +// +class BridgeI : public Ice::BlobjectArrayAsync +{ +public: + + BridgeI(const ObjectAdapterPtr& adapter, const ObjectPrx& target) : + _adapter(adapter), _target(target) + { + } + + virtual void ice_invoke_async(const AMD_Object_ice_invokePtr& cb, + const std::pair<const Byte*, const Byte*>& paramData, + const Current& current) + { + BridgeConnectionPtr bc; + + { + IceUtil::Mutex::Lock lock(_lock); + + ConnectionMap::iterator p = _connections.find(current.con); + if(p == _connections.end()) + { + // + // The connection is unknown to us, it must be a new incoming connection. + // + + EndpointInfoPtr info = current.con->getEndpoint()->getInfo(); + + // + // Create a target proxy that matches the configuration of the incoming connection. + // + ObjectPrx target; + if(info->datagram()) + { + target = _target->ice_datagram(); + } + else if(info->secure()) + { + target = _target->ice_secure(true); + } + else + { + target = _target; + } + + // + // Force the proxy to establish a new connection by using a unique connection ID. + // + target = target->ice_connectionId(Ice::generateUUID()); + + bc = new BridgeConnection(_adapter, target, current.con); + _connections.insert(make_pair(current.con, bc)); + current.con->setCloseCallback(new CloseCallbackI(this)); + + // + // Try to establish the outgoing connection. + // + try + { + // + // Begin the connection establishment process asynchronously. This can take a while to complete, + // especially when using Bluetooth. + // + GetConnectionCallbackPtr gc = new GetConnectionCallback(this, bc); + Callback_Object_ice_getConnectionPtr d = + newCallback_Object_ice_getConnection(gc, &GetConnectionCallback::success, + &GetConnectionCallback::exception); + target->begin_ice_getConnection(d); + } + catch(const Exception& ex) + { + cb->ice_exception(ex); + return; + } + } + else + { + bc = p->second; + } + } + + // + // Delegate the invocation to the BridgeConnection object. + // + bc->dispatch(cb, paramData, current); + } + + void closed(const ConnectionPtr& con) + { + // + // Notify the BridgeConnection that a connection has closed. We also need to remove it from our map. + // + + BridgeConnectionPtr bc; + + { + IceUtil::Mutex::Lock lock(_lock); + + ConnectionMap::iterator p = _connections.find(con); + assert(p != _connections.end()); + bc = p->second; + _connections.erase(p); + } + + bc->closed(con); + } + + void outgoingSuccess(const BridgeConnectionPtr& bc, const ConnectionPtr& outgoing) + { + // + // An outgoing connection was established. Notify the BridgeConnection object. + // + IceUtil::Mutex::Lock lock(_lock); + _connections.insert(make_pair(outgoing, bc)); + outgoing->setCloseCallback(new CloseCallbackI(this)); + bc->setOutgoing(outgoing); + } + + void outgoingException(const BridgeConnectionPtr& bc, const Exception& ex) + { + // + // An outgoing connection attempt failed. Notify the BridgeConnection object. + // + bc->setOutgoing(0); + } + +private: + + const ObjectAdapterPtr _adapter; + const ObjectPrx _target; + + IceUtil::Mutex _lock; + + typedef map<ConnectionPtr, BridgeConnectionPtr> ConnectionMap; + ConnectionMap _connections; +}; + +void +CloseCallbackI::closed(const ConnectionPtr& con) +{ + _bridge->closed(con); +} + +void +GetConnectionCallback::success(const ConnectionPtr& outgoing) +{ + _bridge->outgoingSuccess(_bc, outgoing); +} + +void +GetConnectionCallback::exception(const Exception& ex) +{ + _bridge->outgoingException(_bc, ex); +} + +class BridgeService : public Service +{ +public: + + BridgeService(); + +protected: + + virtual bool start(int, char*[], int&); + virtual bool stop(); + virtual CommunicatorPtr initializeCommunicator(int&, char*[], const InitializationData&); + +private: + + void usage(const std::string&); +}; + +} + +BridgeService::BridgeService() +{ +} + +bool +BridgeService::start(int argc, char* argv[], int& status) +{ + IceUtilInternal::Options opts; + opts.addOpt("h", "help"); + opts.addOpt("v", "version"); + + vector<string> args; + try + { + args = opts.parse(argc, const_cast<const char**>(argv)); + } + catch(const IceUtilInternal::BadOptException& e) + { + error(e.reason); + usage(argv[0]); + return false; + } + + if(opts.isSet("help")) + { + usage(argv[0]); + status = EXIT_SUCCESS; + return false; + } + if(opts.isSet("version")) + { + print(ICE_STRING_VERSION); + status = EXIT_SUCCESS; + return false; + } + + if(!args.empty()) + { + cerr << argv[0] << ": too many arguments" << endl; + usage(argv[0]); + return false; + } + + PropertiesPtr properties = communicator()->getProperties(); + + const string targetProperty = "IceBridge.Target.Endpoints"; + const string targetEndpoints = properties->getProperty(targetProperty); + if(targetEndpoints.empty()) + { + error("property '" + targetProperty + "' is not set"); + return false; + } + + Ice::ObjectPrx target; + + try + { + target = communicator()->stringToProxy("dummy:" + targetEndpoints); + } + catch(const std::exception& ex) + { + ServiceError err(this); + err << "setting for target endpoints '" << targetEndpoints << "' is invalid:\n" << ex; + return false; + } + + // + // Initialize the object adapter. + // + const string endpointsProperty = "IceBridge.Source.Endpoints"; + if(properties->getProperty(endpointsProperty).empty()) + { + error("property '" + endpointsProperty + "' is not set"); + return false; + } + + ObjectAdapterPtr adapter = communicator()->createObjectAdapter("IceBridge.Source"); + + adapter->addDefaultServant(new BridgeI(adapter, target), ""); + + if(properties->getPropertyAsIntWithDefault("IceBridge.Router", 0) > 0) + { + RouterPrx router = RouterPrx::uncheckedCast(adapter->add(new RouterI, stringToIdentity("IceBridge/router"))); + adapter->add(new FinderI(router), stringToIdentity("Ice/RouterFinder")); + } + + try + { + adapter->activate(); + } + catch(const std::exception& ex) + { + { + ServiceError err(this); + err << "caught exception activating object adapter\n" << ex; + } + + stop(); + return false; + } + + return true; +} + +bool +BridgeService::stop() +{ + return true; +} + +CommunicatorPtr +BridgeService::initializeCommunicator(int& argc, char* argv[], const InitializationData& initializationData) +{ + InitializationData initData = initializationData; + initData.properties = createProperties(argc, argv, initializationData.properties); + + StringSeq args = argsToStringSeq(argc, argv); + args = initData.properties->parseCommandLineOptions("IceBridge", args); + stringSeqToArgs(args, argc, argv); + + // + // Disable automatic retry by default. + // + if(initData.properties->getProperty("Ice.RetryIntervals").empty()) + { + initData.properties->setProperty("Ice.RetryIntervals", "-1"); + } + + return Service::initializeCommunicator(argc, argv, initData); +} + +void +BridgeService::usage(const string& appName) +{ + string options = + "Options:\n" + "-h, --help Show this message.\n" + "-v, --version Display the Ice version.\n"; +#ifndef _WIN32 + options.append( + "--daemon Run as a daemon.\n" + "--pidfile FILE Write process ID into FILE.\n" + "--noclose Do not close open file descriptors.\n" + "--nochdir Do not change the current working directory.\n" + ); +#endif + print("Usage: " + appName + " [options]\n" + options); +} + +#ifdef _WIN32 + +int +wmain(int argc, wchar_t* argv[]) + +#else + +int +main(int argc, char* argv[]) + +#endif +{ + BridgeService svc; + return svc.main(argc, argv); +} diff --git a/cpp/src/IceBridge/Makefile.mk b/cpp/src/IceBridge/Makefile.mk new file mode 100644 index 00000000000..b3c282b0c39 --- /dev/null +++ b/cpp/src/IceBridge/Makefile.mk @@ -0,0 +1,15 @@ +# ********************************************************************** +# +# Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. +# +# This copy of Ice is licensed to you under the terms described in the +# ICE_LICENSE file included in this distribution. +# +# ********************************************************************** + +$(project)_programs := icebridge +$(project)_dependencies := Ice + +icebridge_targetdir := $(bindir) + +projects += $(project) diff --git a/cpp/src/IceGrid/SessionManager.cpp b/cpp/src/IceGrid/SessionManager.cpp index 34caf1f9bd2..57ded43015f 100644 --- a/cpp/src/IceGrid/SessionManager.cpp +++ b/cpp/src/IceGrid/SessionManager.cpp @@ -58,7 +58,7 @@ SessionManager::findAllQueryObjects(bool cached) { try { - connection->close(false); + connection->close(Ice::CloseGracefullyAndWait); } catch(const Ice::LocalException&) { diff --git a/cpp/test/Ice/acm/AllTests.cpp b/cpp/test/Ice/acm/AllTests.cpp index 52c4922d98b..8e6563ede85 100644 --- a/cpp/test/Ice/acm/AllTests.cpp +++ b/cpp/test/Ice/acm/AllTests.cpp @@ -535,6 +535,32 @@ public: } }; +class HeartbeatManualTest : public TestCase +{ +public: + + HeartbeatManualTest(const RemoteCommunicatorPrxPtr& com) : TestCase("manual heartbeats", com) + { + // + // Disable heartbeats. + // + setClientACM(10, -1, 0); + setServerACM(10, -1, 0); + } + + virtual void runTestCase(const RemoteObjectAdapterPrxPtr& adapter, const TestIntfPrxPtr& proxy) + { + proxy->startHeartbeatCount(); + Ice::ConnectionPtr con = proxy->ice_getConnection(); + con->heartbeat(); + con->heartbeat(); + con->heartbeat(); + con->heartbeat(); + con->heartbeat(); + proxy->waitForHeartbeatCount(5); + } +}; + class SetACMTest : public TestCase { public: @@ -564,8 +590,9 @@ public: test(acm.close == Ice::CloseOnInvocationAndIdle); test(acm.heartbeat == Ice::HeartbeatAlways); - // Make sure the client sends few heartbeats to the server - proxy->waitForHeartbeat(2); + // Make sure the client sends a few heartbeats to the server. + proxy->startHeartbeatCount(); + proxy->waitForHeartbeatCount(2); } }; @@ -591,6 +618,7 @@ allTests(const Ice::CommunicatorPtr& communicator) tests.push_back(ICE_MAKE_SHARED(HeartbeatOnIdleTest, com)); tests.push_back(ICE_MAKE_SHARED(HeartbeatAlwaysTest, com)); + tests.push_back(ICE_MAKE_SHARED(HeartbeatManualTest, com)); tests.push_back(ICE_MAKE_SHARED(SetACMTest, com)); for(vector<TestCasePtr>::const_iterator p = tests.begin(); p != tests.end(); ++p) diff --git a/cpp/test/Ice/acm/Test.ice b/cpp/test/Ice/acm/Test.ice index 5ab98180dd3..d78abd6eb0f 100644 --- a/cpp/test/Ice/acm/Test.ice +++ b/cpp/test/Ice/acm/Test.ice @@ -17,7 +17,8 @@ interface TestIntf void sleep(int seconds); void sleepAndHold(int seconds); void interruptSleep(); - void waitForHeartbeat(int count); + void startHeartbeatCount(); + void waitForHeartbeatCount(int count); }; interface RemoteObjectAdapter diff --git a/cpp/test/Ice/acm/TestI.cpp b/cpp/test/Ice/acm/TestI.cpp index d7742680a70..f4987856421 100644 --- a/cpp/test/Ice/acm/TestI.cpp +++ b/cpp/test/Ice/acm/TestI.cpp @@ -26,41 +26,6 @@ toString(int value) return os.str(); } -class HeartbeatCallbackI : -#ifdef ICE_CPP11_MAPPING - public enable_shared_from_this<HeartbeatCallbackI>, -#else - public Ice::HeartbeatCallback, -#endif - private IceUtil::Monitor<IceUtil::Mutex> -{ -public: - - void - waitForCount(int count) - { - Lock sync(*this); - _count = count; - while(_count > 0) - { - wait(); - } - } - - virtual void - heartbeat(const Ice::ConnectionPtr&) - { - Lock sync(*this); - --_count; - notifyAll(); - } - -private: - - int _count; -}; -ICE_DEFINE_PTR(HeartbeatCallbackIPtr, HeartbeatCallbackI); - } RemoteObjectAdapterPrxPtr @@ -162,16 +127,23 @@ TestI::interruptSleep(const Ice::Current& current) } void -TestI::waitForHeartbeat(int count, const Ice::Current& current) +TestI::startHeartbeatCount(const Ice::Current& current) { - HeartbeatCallbackIPtr callback = ICE_MAKE_SHARED(HeartbeatCallbackI); + _callback = ICE_MAKE_SHARED(HeartbeatCallbackI); #ifdef ICE_CPP11_MAPPING + HeartbeatCallbackIPtr callback = _callback; current.con->setHeartbeatCallback([callback](Ice::ConnectionPtr connection) { callback->heartbeat(move(connection)); }); #else - current.con->setHeartbeatCallback(callback); + current.con->setHeartbeatCallback(_callback); #endif - callback->waitForCount(count); +} + +void +TestI::waitForHeartbeatCount(int count, const Ice::Current&) +{ + assert(_callback); + _callback->waitForCount(count); } diff --git a/cpp/test/Ice/acm/TestI.h b/cpp/test/Ice/acm/TestI.h index be031e3799d..c45415022a6 100644 --- a/cpp/test/Ice/acm/TestI.h +++ b/cpp/test/Ice/acm/TestI.h @@ -44,7 +44,51 @@ public: virtual void sleep(int, const Ice::Current&); virtual void sleepAndHold(int, const Ice::Current&); virtual void interruptSleep(const Ice::Current&); - virtual void waitForHeartbeat(int, const Ice::Current&); + virtual void startHeartbeatCount(const Ice::Current&); + virtual void waitForHeartbeatCount(int, const Ice::Current&); + +private: + + class HeartbeatCallbackI : +#ifdef ICE_CPP11_MAPPING + public std::enable_shared_from_this<HeartbeatCallbackI>, +#else + public Ice::HeartbeatCallback, +#endif + private IceUtil::Monitor<IceUtil::Mutex> + { + public: + + HeartbeatCallbackI() : + _count(0) + { + } + + void + waitForCount(int count) + { + Lock sync(*this); + while(_count < count) + { + wait(); + } + } + + virtual void + heartbeat(const Ice::ConnectionPtr&) + { + Lock sync(*this); + ++_count; + notifyAll(); + } + + private: + + int _count; + }; + ICE_DEFINE_PTR(HeartbeatCallbackIPtr, HeartbeatCallbackI); + + HeartbeatCallbackIPtr _callback; }; #endif diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp index 439f8a7792f..3366c4fdc7e 100644 --- a/cpp/test/Ice/ami/AllTests.cpp +++ b/cpp/test/Ice/ami/AllTests.cpp @@ -773,6 +773,21 @@ private: }; typedef IceUtil::Handle<FlushExCallback> FlushExCallbackPtr; +class CloseCallback : virtual public CallbackBase, virtual public Ice::CloseCallback +{ +public: + + CloseCallback() + { + } + + virtual void closed(const Ice::ConnectionPtr& con) + { + called(); + } +}; +typedef IceUtil::Handle<CloseCallback> CloseCallbackPtr; + class Thrower : public CallbackBase { public: @@ -1850,7 +1865,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(p->opBatchCount() == 0); auto b1 = p->ice_batchOneway(); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); auto id = this_thread::get_id(); promise<void> promise; @@ -1924,7 +1939,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) auto b1 = Ice::uncheckedCast<Test::TestIntfPrx>( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); promise<void> promise; b1->ice_getConnection()->flushBatchRequestsAsync( @@ -2002,7 +2017,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) auto b1 = Ice::uncheckedCast<Test::TestIntfPrx>( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); promise<void> promise; auto id = this_thread::get_id(); @@ -2072,8 +2087,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->ice_getConnection(); // Ensure connection is established. b1->opBatch(); b2->opBatch(); - b1->ice_getConnection()->close(false); - b2->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait); promise<void> promise; auto id = this_thread::get_id(); @@ -2161,8 +2176,38 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) if(p->ice_getConnection() && protocol != "bt") { - cout << "testing close connection with sending queue... " << flush; + cout << "testing graceful close connection with wait... " << flush; + { + // + // Local case: begin several requests, close the connection gracefully, and make sure it waits + // for the requests to complete. + // + vector<future<void>> results; + for(int i = 0; i < 3; ++i) + { + auto s = make_shared<promise<void>>(); + p->sleepAsync(50, + [s]() { s->set_value(); }, + [s](exception_ptr ex) { s->set_exception(ex); }); + results.push_back(s->get_future()); + } + p->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + for(vector<future<void>>::iterator p = results.begin(); p != results.end(); ++p) + { + try + { + p->get(); + } + catch(const Ice::LocalException&) + { + test(false); + } + } + } { + // + // Remote case. + // Ice::ByteSeq seq; seq.resize(1024 * 10); for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q) @@ -2191,7 +2236,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) results.push_back(s->get_future()); } atomic_flag sent = ATOMIC_FLAG_INIT; - p->closeAsync(false, nullptr, nullptr, [&sent](bool) { sent.test_and_set(); }); + p->closeAsync(Test::CloseMode::CloseGracefullyAndWait, nullptr, nullptr, + [&sent](bool) { sent.test_and_set(); }); if(!sent.test_and_set()) { for(int i = 0; i < maxQueue; i++) @@ -2231,8 +2277,111 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) } } cout << "ok" << endl; - } + cout << "testing graceful close connection without wait... " << flush; + { + // + // Local case: start a lengthy operation and then close the connection gracefully on the client side + // without waiting for the pending invocation to complete. There will be no retry and we expect the + // invocation to fail with ConnectionManuallyClosedException. + // + // This test requires two threads in the server's thread pool: one will block in sleep() and the other + // will process the CloseConnection message. + // + p->ice_ping(); + auto con = p->ice_getConnection(); + auto s = make_shared<promise<void>>(); + p->sleepAsync(100, + [s]() { s->set_value(); }, + [s](exception_ptr ex) { s->set_exception(ex); }); + future<void> f = s->get_future(); + con->close(Ice::ConnectionClose::CloseGracefully); + try + { + f.get(); + test(false); + } + catch(const Ice::ConnectionManuallyClosedException& ex) + { + test(ex.graceful); + } + + // + // Remote case: the server closes the connection gracefully. Our call to TestIntf::close() + // completes successfully and then the connection should be closed immediately afterward, + // despite the fact that there's a pending call to sleep(). The call to sleep() should be + // automatically retried and complete successfully with a new connection. + // + p->ice_ping(); + con = p->ice_getConnection(); + auto sc = make_shared<promise<void>>(); + con->setCloseCallback( + [sc](Ice::ConnectionPtr connection) + { + sc->set_value(); + }); + future<void> fc = sc->get_future(); + s = make_shared<promise<void>>(); + p->sleepAsync(100, + [s]() { s->set_value(); }, + [s](exception_ptr ex) { s->set_exception(ex); }); + f = s->get_future(); + p->close(Test::CloseMode::CloseGracefully); + fc.get(); + try + { + f.get(); + } + catch(const Ice::LocalException&) + { + test(false); + } + p->ice_ping(); + test(p->ice_getConnection() != con); + } + cout << "ok" << endl; + + cout << "testing forceful close connection... " << flush; + { + // + // Local case: start a lengthy operation and then close the connection forcefully on the client side. + // There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException. + // + p->ice_ping(); + auto con = p->ice_getConnection(); + auto s = make_shared<promise<void>>(); + p->sleepAsync(100, + [s]() { s->set_value(); }, + [s](exception_ptr ex) { s->set_exception(ex); }); + future<void> f = s->get_future(); + con->close(Ice::ConnectionClose::CloseForcefully); + try + { + f.get(); + test(false); + } + catch(const Ice::ConnectionManuallyClosedException& ex) + { + test(!ex.graceful); + } + + // + // Remote case: the server closes the connection forcefully. This causes the request to fail + // with a ConnectionLostException. Since the close() operation is not idempotent, the client + // will not retry. + // + try + { + p->close(Test::CloseMode::CloseForcefully); + test(false); + } + catch(const Ice::ConnectionLostException&) + { + // Expected. + } + } + cout << "ok" << endl; + } } p->shutdown(); @@ -2944,7 +3093,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(p->opBatchCount() == 0); Test::TestIntfPrx b1 = p->ice_batchOneway(); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = b1->begin_ice_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); @@ -2962,7 +3111,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(p->opBatchCount() == 0); Test::TestIntfPrx b1 = p->ice_batchOneway(); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(cookie); b1->begin_ice_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie); @@ -3013,7 +3162,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(p->opBatchCount() == 0); Test::TestIntfPrx b1 = p->ice_batchOneway(); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = b1->begin_ice_flushBatchRequests( Ice::newCallback_Object_ice_flushBatchRequests(cb, &FlushCallback::exception, @@ -3031,7 +3180,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(p->opBatchCount() == 0); Test::TestIntfPrx b1 = p->ice_batchOneway(); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(cookie); b1->begin_ice_flushBatchRequests( Ice::newCallback_Object_ice_flushBatchRequests(cb, &FlushCallback::exceptionWC, @@ -3100,7 +3249,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(); Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests( Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync)); @@ -3119,7 +3268,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(cookie); b1->ice_getConnection()->begin_flushBatchRequests( Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync), cookie); @@ -3171,7 +3320,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(); Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests( Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exception, @@ -3191,7 +3340,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(cookie); b1->ice_getConnection()->begin_flushBatchRequests( Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exceptionWC, @@ -3249,7 +3398,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); @@ -3268,7 +3417,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(cookie); communicator->begin_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie); @@ -3319,7 +3468,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->ice_getConnection(); // Ensure connection is established. b1->opBatch(); b2->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); @@ -3346,8 +3495,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->ice_getConnection(); // Ensure connection is established. b1->opBatch(); b2->opBatch(); - b1->ice_getConnection()->close(false); - b2->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); @@ -3402,7 +3551,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, @@ -3422,7 +3571,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(cookie); communicator->begin_flushBatchRequests( Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exceptionWC, @@ -3476,7 +3625,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->ice_getConnection(); // Ensure connection is established. b1->opBatch(); b2->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, @@ -3504,8 +3653,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->ice_getConnection(); // Ensure connection is established. b1->opBatch(); b2->opBatch(); - b1->ice_getConnection()->close(false); - b2->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, @@ -3715,8 +3864,35 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) if(p->ice_getConnection() && protocol != "bt") { - cout << "testing close connection with sending queue... " << flush; + cout << "testing graceful close connection with wait... " << flush; + { + // + // Local case: begin several requests, close the connection gracefully, and make sure it waits + // for the requests to complete. + // + vector<Ice::AsyncResultPtr> results; + for(int i = 0; i < 3; ++i) + { + results.push_back(p->begin_sleep(50)); + } + p->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + for(vector<Ice::AsyncResultPtr>::const_iterator q = results.begin(); q != results.end(); ++q) + { + (*q)->waitForCompleted(); + try + { + (*q)->throwLocalException(); + } + catch(const Ice::LocalException&) + { + test(false); + } + } + } { + // + // Remote case. + // Ice::ByteSeq seq; seq.resize(1024 * 10); for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q) @@ -3740,7 +3916,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) { results.push_back(p->begin_opWithPayload(seq)); } - if(!p->begin_close(false)->isSent()) + if(!p->begin_close(Test::CloseGracefullyAndWait)->isSent()) { for(int i = 0; i < maxQueue; i++) { @@ -3775,6 +3951,96 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) } } cout << "ok" << endl; + + cout << "testing graceful close connection without wait... " << flush; + { + // + // Local case: start a lengthy operation and then close the connection gracefully on the client side + // without waiting for the pending invocation to complete. There will be no retry and we expect the + // invocation to fail with ConnectionManuallyClosedException. + // + // This test requires two threads in the server's thread pool: one will block in sleep() and the other + // will process the CloseConnection message. + // + p->ice_ping(); + Ice::ConnectionPtr con = p->ice_getConnection(); + Ice::AsyncResultPtr r = p->begin_sleep(100); + con->close(Ice::CloseGracefully); + r->waitForCompleted(); + try + { + r->throwLocalException(); + test(false); + } + catch(const Ice::ConnectionManuallyClosedException& ex) + { + test(ex.graceful); + } + + // + // Remote case: the server closes the connection gracefully. Our call to TestIntf::close() + // completes successfully and then the connection should be closed immediately afterward, + // despite the fact that there's a pending call to sleep(). The call to sleep() should be + // automatically retried and complete successfully. + // + p->ice_ping(); + con = p->ice_getConnection(); + CloseCallbackPtr cb = new CloseCallback; + con->setCloseCallback(cb); + r = p->begin_sleep(100); + p->close(Test::CloseGracefully); + cb->check(); + r->waitForCompleted(); + try + { + r->throwLocalException(); + } + catch(const Ice::LocalException&) + { + test(false); + } + p->ice_ping(); + test(p->ice_getConnection() != con); + } + cout << "ok" << endl; + + cout << "testing forceful close connection... " << flush; + { + // + // Local case: start a lengthy operation and then close the connection forcefully on the client side. + // There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException. + // + p->ice_ping(); + Ice::ConnectionPtr con = p->ice_getConnection(); + Ice::AsyncResultPtr r = p->begin_sleep(100); + con->close(Ice::CloseForcefully); + r->waitForCompleted(); + try + { + r->throwLocalException(); + test(false); + } + catch(const Ice::ConnectionManuallyClosedException& ex) + { + test(!ex.graceful); + } + + // + // Remote case: the server closes the connection forcefully. This causes the request to fail + // with a ConnectionLostException. Since the close() operation is not idempotent, the client + // will not retry. + // + try + { + p->close(Test::CloseForcefully); + test(false); + } + catch(const Ice::ConnectionLostException&) + { + // Expected. + } + } + cout << "ok" << endl; } p->shutdown(); diff --git a/cpp/test/Ice/ami/Client.cpp b/cpp/test/Ice/ami/Client.cpp index 18fb26c44bb..c6915bc2557 100644 --- a/cpp/test/Ice/ami/Client.cpp +++ b/cpp/test/Ice/ami/Client.cpp @@ -37,6 +37,7 @@ main(int argc, char* argv[]) { Ice::InitializationData initData = getTestInitData(argc, argv); initData.properties->setProperty("Ice.Warn.AMICallback", "0"); + initData.properties->setProperty("Ice.Warn.Connections", "0"); // // Limit the send buffer size, this test relies on the socket diff --git a/cpp/test/Ice/ami/Test.ice b/cpp/test/Ice/ami/Test.ice index 289750a3eb2..c82910e4620 100644 --- a/cpp/test/Ice/ami/Test.ice +++ b/cpp/test/Ice/ami/Test.ice @@ -19,6 +19,13 @@ exception TestIntfException { }; +enum CloseMode +{ + CloseForcefully, + CloseGracefully, + CloseGracefullyAndWait +}; + interface TestIntf { void op(); @@ -34,7 +41,8 @@ interface TestIntf out int eight, out int nine, out int ten, out int eleven); int opBatchCount(); bool waitForBatch(int count); - void close(bool force); + void close(CloseMode mode); + void sleep(int ms); void shutdown(); bool supportsFunctionalTests(); diff --git a/cpp/test/Ice/ami/TestI.cpp b/cpp/test/Ice/ami/TestI.cpp index f2dab5afd70..d7aa1d44762 100644 --- a/cpp/test/Ice/ami/TestI.cpp +++ b/cpp/test/Ice/ami/TestI.cpp @@ -93,9 +93,16 @@ TestIntfI::waitForBatch(Ice::Int count, const Ice::Current&) } void -TestIntfI::close(bool force, const Ice::Current& current) +TestIntfI::close(Test::CloseMode mode, const Ice::Current& current) { - current.con->close(force); + current.con->close(static_cast<ConnectionClose>(mode)); +} + +void +TestIntfI::sleep(Ice::Int ms, const Ice::Current& current) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + timedWait(IceUtil::Time::milliSeconds(ms)); } void diff --git a/cpp/test/Ice/ami/TestI.h b/cpp/test/Ice/ami/TestI.h index bee3571b74b..edde85f6008 100644 --- a/cpp/test/Ice/ami/TestI.h +++ b/cpp/test/Ice/ami/TestI.h @@ -32,7 +32,8 @@ public: virtual void opWithArgs(Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, const Ice::Current&); virtual bool waitForBatch(Ice::Int, const Ice::Current&); - virtual void close(bool, const Ice::Current&); + virtual void close(Test::CloseMode, const Ice::Current&); + virtual void sleep(Ice::Int, const Ice::Current&); virtual void shutdown(const Ice::Current&); virtual bool supportsFunctionalTests(const Ice::Current&); diff --git a/cpp/test/Ice/background/AllTests.cpp b/cpp/test/Ice/background/AllTests.cpp index f5fdc059870..4d6b82904d6 100644 --- a/cpp/test/Ice/background/AllTests.cpp +++ b/cpp/test/Ice/background/AllTests.cpp @@ -379,7 +379,7 @@ allTests(const Ice::CommunicatorPtr& communicator) #ifdef ICE_CPP11_MAPPING background->opAsync(); - background->ice_getCachedConnection()->close(true); + background->ice_getCachedConnection()->close(Ice::CloseForcefully); background->opAsync(); vector<future<void>> results; @@ -407,7 +407,7 @@ allTests(const Ice::CommunicatorPtr& communicator) } #else background->begin_op(); - background->ice_getCachedConnection()->close(true); + background->ice_getCachedConnection()->close(Ice::CloseForcefully); background->begin_op(); vector<Ice::AsyncResultPtr> results; @@ -452,7 +452,7 @@ connectTests(const ConfigurationPtr& configuration, const Test::BackgroundPrxPtr { test(false); } - background->ice_getConnection()->close(false); + background->ice_getConnection()->close(Ice::CloseGracefullyAndWait); int i; for(i = 0; i < 4; ++i) @@ -560,7 +560,7 @@ connectTests(const ConfigurationPtr& configuration, const Test::BackgroundPrxPtr } configuration->connectException(new Ice::SocketException(__FILE__, __LINE__)); - background->ice_getCachedConnection()->close(true); + background->ice_getCachedConnection()->close(Ice::CloseForcefully); IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10)); configuration->connectException(0); try @@ -592,7 +592,7 @@ initializeTests(const ConfigurationPtr& configuration, { test(false); } - background->ice_getConnection()->close(false); + background->ice_getConnection()->close(Ice::CloseGracefullyAndWait); int i; for(i = 0; i < 4; i++) @@ -682,7 +682,7 @@ initializeTests(const ConfigurationPtr& configuration, cerr << ex << endl; test(false); } - background->ice_getConnection()->close(false); + background->ice_getConnection()->close(Ice::CloseGracefullyAndWait); try { @@ -695,7 +695,7 @@ initializeTests(const ConfigurationPtr& configuration, cerr << ex << endl; test(false); } - background->ice_getConnection()->close(false); + background->ice_getConnection()->close(Ice::CloseGracefullyAndWait); #endif // @@ -728,7 +728,7 @@ initializeTests(const ConfigurationPtr& configuration, { test(false); } - background->ice_getConnection()->close(false); + background->ice_getConnection()->close(Ice::CloseGracefullyAndWait); try { @@ -764,7 +764,7 @@ initializeTests(const ConfigurationPtr& configuration, } configuration->initializeException(new Ice::SocketException(__FILE__, __LINE__)); - background->ice_getCachedConnection()->close(true); + background->ice_getCachedConnection()->close(Ice::CloseForcefully); IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10)); configuration->initializeException(0); try @@ -784,12 +784,12 @@ initializeTests(const ConfigurationPtr& configuration, } configuration->initializeSocketOperation(IceInternal::SocketOperationWrite); - background->ice_getCachedConnection()->close(true); + background->ice_getCachedConnection()->close(Ice::CloseForcefully); background->ice_ping(); configuration->initializeSocketOperation(IceInternal::SocketOperationNone); ctl->initializeException(true); - background->ice_getCachedConnection()->close(true); + background->ice_getCachedConnection()->close(Ice::CloseForcefully); IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10)); ctl->initializeException(false); try @@ -812,11 +812,11 @@ initializeTests(const ConfigurationPtr& configuration, { #if !defined(ICE_USE_IOCP) && !defined(ICE_USE_CFSTREAM) ctl->initializeSocketOperation(IceInternal::SocketOperationWrite); - background->ice_getCachedConnection()->close(true); + background->ice_getCachedConnection()->close(Ice::CloseForcefully); background->op(); ctl->initializeSocketOperation(IceInternal::SocketOperationNone); #else - background->ice_getCachedConnection()->close(true); + background->ice_getCachedConnection()->close(Ice::CloseForcefully); background->op(); #endif } @@ -847,7 +847,7 @@ validationTests(const ConfigurationPtr& configuration, { test(false); } - background->ice_getConnection()->close(false); + background->ice_getConnection()->close(Ice::CloseGracefullyAndWait); try { @@ -921,7 +921,7 @@ validationTests(const ConfigurationPtr& configuration, cerr << ex << endl; test(false); } - background->ice_getConnection()->close(false); + background->ice_getConnection()->close(Ice::CloseGracefullyAndWait); try { @@ -1081,7 +1081,7 @@ validationTests(const ConfigurationPtr& configuration, cerr << ex << endl; test(false); } - background->ice_getConnection()->close(false); + background->ice_getConnection()->close(Ice::CloseGracefullyAndWait); try { @@ -1163,7 +1163,7 @@ validationTests(const ConfigurationPtr& configuration, #else backgroundBatchOneway->begin_ice_flushBatchRequests(); #endif - backgroundBatchOneway->ice_getConnection()->close(false); + backgroundBatchOneway->ice_getConnection()->close(Ice::CloseGracefullyAndWait); ctl->holdAdapter(); backgroundBatchOneway->opWithPayload(seq); @@ -1183,10 +1183,10 @@ validationTests(const ConfigurationPtr& configuration, // in the flush to report a CloseConnectionException). Instead we // wait for the first flush to complete. // - //backgroundBatchOneway->ice_getConnection()->close(false); + //backgroundBatchOneway->ice_getConnection()->close(Ice::CloseGracefullyAndWait); backgroundBatchOneway->end_ice_flushBatchRequests(r); #endif - backgroundBatchOneway->ice_getConnection()->close(false); + backgroundBatchOneway->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } void @@ -1775,10 +1775,10 @@ readWriteTests(const ConfigurationPtr& configuration, IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10)); background->ice_ping(); - background->ice_getCachedConnection()->close(true); + background->ice_getCachedConnection()->close(Ice::CloseForcefully); IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10)); - background->ice_getCachedConnection()->close(true); + background->ice_getCachedConnection()->close(Ice::CloseForcefully); } thread1->destroy(); diff --git a/cpp/test/Ice/binding/AllTests.cpp b/cpp/test/Ice/binding/AllTests.cpp index a8d5f454a74..26e7fa48aa6 100644 --- a/cpp/test/Ice/binding/AllTests.cpp +++ b/cpp/test/Ice/binding/AllTests.cpp @@ -170,7 +170,7 @@ allTests(const Ice::CommunicatorPtr& communicator) test(test2->ice_getConnection() == test3->ice_getConnection()); names.erase(test1->getAdapterName()); - test1->ice_getConnection()->close(false); + test1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } // @@ -192,7 +192,7 @@ allTests(const Ice::CommunicatorPtr& communicator) for(vector<RemoteObjectAdapterPrxPtr>::const_iterator q = adapters.begin(); q != adapters.end(); ++q) { - (*q)->getTestIntf()->ice_getConnection()->close(false); + (*q)->getTestIntf()->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } } @@ -217,7 +217,7 @@ allTests(const Ice::CommunicatorPtr& communicator) test(test2->ice_getConnection() == test3->ice_getConnection()); names.erase(test1->getAdapterName()); - test1->ice_getConnection()->close(false); + test1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } // @@ -314,7 +314,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { try { - (*q)->getTestIntf()->ice_getConnection()->close(false); + (*q)->getTestIntf()->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } catch(const Ice::LocalException&) { @@ -354,7 +354,7 @@ allTests(const Ice::CommunicatorPtr& communicator) test(test2->ice_getConnection() == test3->ice_getConnection()); names.erase(getAdapterNameWithAMI(test1)); - test1->ice_getConnection()->close(false); + test1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } // @@ -376,7 +376,7 @@ allTests(const Ice::CommunicatorPtr& communicator) for(vector<RemoteObjectAdapterPrxPtr>::const_iterator q = adapters.begin(); q != adapters.end(); ++q) { - (*q)->getTestIntf()->ice_getConnection()->close(false); + (*q)->getTestIntf()->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } } @@ -401,7 +401,7 @@ allTests(const Ice::CommunicatorPtr& communicator) test(test2->ice_getConnection() == test3->ice_getConnection()); names.erase(test1->getAdapterName()); - test1->ice_getConnection()->close(false); + test1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } // @@ -433,7 +433,7 @@ allTests(const Ice::CommunicatorPtr& communicator) while(!names.empty()) { names.erase(test->getAdapterName()); - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } test = ICE_UNCHECKED_CAST(TestIntfPrx, test->ice_endpointSelection(Ice::Random)); @@ -445,7 +445,7 @@ allTests(const Ice::CommunicatorPtr& communicator) while(!names.empty()) { names.erase(test->getAdapterName()); - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } deactivate(com, adapters); @@ -473,7 +473,7 @@ allTests(const Ice::CommunicatorPtr& communicator) #if TARGET_OS_IPHONE > 0 if(i != nRetry) { - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); for(i = 0; i < nRetry && test->getAdapterName() == "Adapter31"; i++); } #endif @@ -483,7 +483,7 @@ allTests(const Ice::CommunicatorPtr& communicator) #if TARGET_OS_IPHONE > 0 if(i != nRetry) { - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); for(i = 0; i < nRetry && test->getAdapterName() == "Adapter32"; i++); } #endif @@ -493,7 +493,7 @@ allTests(const Ice::CommunicatorPtr& communicator) #if TARGET_OS_IPHONE > 0 if(i != nRetry) { - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); for(i = 0; i < nRetry && test->getAdapterName() == "Adapter33"; i++); } #endif @@ -525,29 +525,29 @@ allTests(const Ice::CommunicatorPtr& communicator) #if TARGET_OS_IPHONE > 0 if(i != nRetry) { - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); for(i = 0; i < nRetry && test->getAdapterName() == "Adapter36"; i++); } #endif test(i == nRetry); - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); adapters.push_back(com->createObjectAdapter("Adapter35", endpoints[1]->toString())); for(i = 0; i < nRetry && test->getAdapterName() == "Adapter35"; i++); #if TARGET_OS_IPHONE > 0 if(i != nRetry) { - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); for(i = 0; i < nRetry && test->getAdapterName() == "Adapter35"; i++); } #endif test(i == nRetry); - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); adapters.push_back(com->createObjectAdapter("Adapter34", endpoints[0]->toString())); for(i = 0; i < nRetry && test->getAdapterName() == "Adapter34"; i++); #if TARGET_OS_IPHONE > 0 if(i != nRetry) { - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); for(i = 0; i < nRetry && test->getAdapterName() == "Adapter34"; i++); } #endif @@ -865,7 +865,7 @@ allTests(const Ice::CommunicatorPtr& communicator) for(i = 0; i < 5; i++) { test(test->getAdapterName() == "Adapter82"); - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } TestIntfPrxPtr testSecure = ICE_UNCHECKED_CAST(TestIntfPrx, test->ice_secure(true)); @@ -881,7 +881,7 @@ allTests(const Ice::CommunicatorPtr& communicator) for(i = 0; i < 5; i++) { test(test->getAdapterName() == "Adapter81"); - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } com->createObjectAdapter("Adapter83", (test->ice_getEndpoints()[1])->toString()); // Reactive tcp OA. @@ -889,7 +889,7 @@ allTests(const Ice::CommunicatorPtr& communicator) for(i = 0; i < 5; i++) { test(test->getAdapterName() == "Adapter83"); - test->ice_getConnection()->close(false); + test->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } com->deactivateObjectAdapter(adapters[0]); @@ -1098,7 +1098,7 @@ allTests(const Ice::CommunicatorPtr& communicator) // Close the connection now to free a FD (it could be done after the sleep but // there could be race condiutation since the connection might not be closed // immediately due to threading). - test->ice_connectionId("0")->ice_getConnection()->close(false); + test->ice_connectionId("0")->ice_getConnection()->close(Ice::CloseGracefullyAndWait); // // The server closed the acceptor, wait one second and retry after freeing a FD. diff --git a/cpp/test/Ice/hold/AllTests.cpp b/cpp/test/Ice/hold/AllTests.cpp index d08764d57d8..7cb0e59a53c 100644 --- a/cpp/test/Ice/hold/AllTests.cpp +++ b/cpp/test/Ice/hold/AllTests.cpp @@ -290,7 +290,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { completed->get_future().get(); holdSerialized->ice_ping(); // Ensure everything's dispatched - holdSerialized->ice_getConnection()->close(false); + holdSerialized->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } } completed->get_future().get(); @@ -305,7 +305,7 @@ allTests(const Ice::CommunicatorPtr& communicator) { result->waitForSent(); holdSerialized->ice_ping(); // Ensure everything's dispatched - holdSerialized->ice_getConnection()->close(false); + holdSerialized->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } } result->waitForCompleted(); diff --git a/cpp/test/Ice/location/AllTests.cpp b/cpp/test/Ice/location/AllTests.cpp index 148230c9fbf..4e96a75a0d5 100644 --- a/cpp/test/Ice/location/AllTests.cpp +++ b/cpp/test/Ice/location/AllTests.cpp @@ -637,7 +637,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const string& ref) cout << "testing object migration... " << flush; hello = ICE_CHECKED_CAST(HelloPrx, communicator->stringToProxy("hello")); obj->migrateHello(); - hello->ice_getConnection()->close(false); + hello->ice_getConnection()->close(Ice::CloseGracefullyAndWait); hello->sayHello(); obj->migrateHello(); hello->sayHello(); diff --git a/cpp/test/Ice/metrics/AllTests.cpp b/cpp/test/Ice/metrics/AllTests.cpp index c880af199ce..8d30231b520 100644 --- a/cpp/test/Ice/metrics/AllTests.cpp +++ b/cpp/test/Ice/metrics/AllTests.cpp @@ -287,7 +287,7 @@ struct Connect { if(proxy->ice_getCachedConnection()) { - proxy->ice_getCachedConnection()->close(false); + proxy->ice_getCachedConnection()->close(Ice::CloseGracefullyAndWait); } try { @@ -298,7 +298,7 @@ struct Connect } if(proxy->ice_getCachedConnection()) { - proxy->ice_getCachedConnection()->close(false); + proxy->ice_getCachedConnection()->close(Ice::CloseGracefullyAndWait); } } @@ -534,8 +534,8 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt if(!collocated) { - metrics->ice_getConnection()->close(false); - metrics->ice_connectionId("Con1")->ice_getConnection()->close(false); + metrics->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + metrics->ice_connectionId("Con1")->ice_getConnection()->close(Ice::CloseGracefullyAndWait); waitForCurrent(clientMetrics, "View", "Connection", 0); waitForCurrent(serverMetrics, "View", "Connection", 0); @@ -645,7 +645,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt map = toMap(serverMetrics->getMetricsView("View", timestamp)["Connection"]); test(map["holding"]->current == 1); - metrics->ice_getConnection()->close(false); + metrics->ice_getConnection()->close(Ice::CloseGracefullyAndWait); map = toMap(clientMetrics->getMetricsView("View", timestamp)["Connection"]); test(map["closing"]->current == 1); @@ -660,7 +660,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt props["IceMX.Metrics.View.Map.Connection.GroupBy"] = "none"; updateProps(clientProps, serverProps, update.get(), props, "Connection"); - metrics->ice_getConnection()->close(false); + metrics->ice_getConnection()->close(Ice::CloseGracefullyAndWait); metrics->ice_timeout(500)->ice_ping(); controller->hold(); @@ -717,7 +717,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt testAttribute(clientMetrics, clientProps, update.get(), "Connection", "mcastHost", ""); testAttribute(clientMetrics, clientProps, update.get(), "Connection", "mcastPort", ""); - m->ice_getConnection()->close(false); + m->ice_getConnection()->close(Ice::CloseGracefullyAndWait); waitForCurrent(clientMetrics, "View", "Connection", 0); waitForCurrent(serverMetrics, "View", "Connection", 0); @@ -736,7 +736,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt IceMX::MetricsPtr m1 = clientMetrics->getMetricsView("View", timestamp)["ConnectionEstablishment"][0]; test(m1->current == 0 && m1->total == 1 && m1->id == hostAndPort); - metrics->ice_getConnection()->close(false); + metrics->ice_getConnection()->close(Ice::CloseGracefullyAndWait); controller->hold(); try { @@ -788,7 +788,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt try { prx->ice_ping(); - prx->ice_getConnection()->close(false); + prx->ice_getConnection()->close(Ice::CloseGracefullyAndWait); } catch(const Ice::LocalException&) { diff --git a/cpp/test/Ice/metrics/TestAMDI.cpp b/cpp/test/Ice/metrics/TestAMDI.cpp index 621bc976c1d..d5173e4380b 100644 --- a/cpp/test/Ice/metrics/TestAMDI.cpp +++ b/cpp/test/Ice/metrics/TestAMDI.cpp @@ -22,7 +22,7 @@ MetricsI::opAsync(function<void()> response, function<void(exception_ptr)>, cons void MetricsI::failAsync(function<void()> response, function<void(exception_ptr)>, const Ice::Current& current) { - current.con->close(true); + current.con->close(Ice::CloseForcefully); response(); } @@ -87,7 +87,7 @@ MetricsI::op_async(const Test::AMD_Metrics_opPtr& cb, const Ice::Current&) void MetricsI::fail_async(const Test::AMD_Metrics_failPtr& cb, const Ice::Current& current) { - current.con->close(true); + current.con->close(Ice::CloseForcefully); cb->ice_response(); } diff --git a/cpp/test/Ice/metrics/TestI.cpp b/cpp/test/Ice/metrics/TestI.cpp index 24e7b9b8ec3..30f7a98bf35 100644 --- a/cpp/test/Ice/metrics/TestI.cpp +++ b/cpp/test/Ice/metrics/TestI.cpp @@ -18,7 +18,7 @@ MetricsI::op(const Ice::Current&) void MetricsI::fail(const Ice::Current& current) { - current.con->close(true); + current.con->close(Ice::CloseForcefully); } void diff --git a/cpp/test/Ice/operations/BatchOneways.cpp b/cpp/test/Ice/operations/BatchOneways.cpp index cc86c14a951..e3d261cf7e3 100644 --- a/cpp/test/Ice/operations/BatchOneways.cpp +++ b/cpp/test/Ice/operations/BatchOneways.cpp @@ -121,7 +121,7 @@ batchOneways(const Test::MyClassPrxPtr& p) batch1->ice_ping(); batch2->ice_ping(); batch1->ice_flushBatchRequests(); - batch1->ice_getConnection()->close(false); + batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); batch1->ice_ping(); batch2->ice_ping(); @@ -129,7 +129,7 @@ batchOneways(const Test::MyClassPrxPtr& p) batch2->ice_getConnection(); batch1->ice_ping(); - batch1->ice_getConnection()->close(false); + batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); batch1->ice_ping(); batch2->ice_ping(); } diff --git a/cpp/test/Ice/operations/BatchOnewaysAMI.cpp b/cpp/test/Ice/operations/BatchOnewaysAMI.cpp index 9adb2be5c7b..4545d8581cf 100644 --- a/cpp/test/Ice/operations/BatchOnewaysAMI.cpp +++ b/cpp/test/Ice/operations/BatchOnewaysAMI.cpp @@ -127,7 +127,7 @@ batchOnewaysAMI(const Test::MyClassPrxPtr& p) batch1->ice_pingAsync().get(); batch2->ice_pingAsync().get(); batch1->ice_flushBatchRequestsAsync().get(); - batch1->ice_getConnection()->close(false); + batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); batch1->ice_pingAsync().get(); batch2->ice_pingAsync().get(); @@ -135,7 +135,7 @@ batchOnewaysAMI(const Test::MyClassPrxPtr& p) batch2->ice_getConnection(); batch1->ice_pingAsync().get(); - batch1->ice_getConnection()->close(false); + batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); batch1->ice_pingAsync().get(); batch2->ice_pingAsync().get(); @@ -182,7 +182,7 @@ batchOnewaysAMI(const Test::MyClassPrxPtr& p) batch1->end_ice_ping(batch1->begin_ice_ping()); batch2->end_ice_ping(batch2->begin_ice_ping()); batch1->end_ice_flushBatchRequests(batch1->begin_ice_flushBatchRequests()); - batch1->ice_getConnection()->close(false); + batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); batch1->end_ice_ping(batch1->begin_ice_ping()); batch2->end_ice_ping(batch2->begin_ice_ping()); @@ -190,7 +190,7 @@ batchOnewaysAMI(const Test::MyClassPrxPtr& p) batch2->ice_getConnection(); batch1->end_ice_ping(batch1->begin_ice_ping()); - batch1->ice_getConnection()->close(false); + batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); batch1->end_ice_ping(batch1->begin_ice_ping()); batch2->end_ice_ping(batch2->begin_ice_ping()); diff --git a/cpp/test/Ice/retry/TestI.cpp b/cpp/test/Ice/retry/TestI.cpp index f721a3b6ab7..7fd3c0d4dd4 100644 --- a/cpp/test/Ice/retry/TestI.cpp +++ b/cpp/test/Ice/retry/TestI.cpp @@ -22,7 +22,7 @@ RetryI::op(bool kill, const Ice::Current& current) { if(current.con) { - current.con->close(true); + current.con->close(Ice::CloseForcefully); } else { diff --git a/cpp/test/Ice/timeout/AllTests.cpp b/cpp/test/Ice/timeout/AllTests.cpp index 6681a8f56df..1947f4e9aa1 100644 --- a/cpp/test/Ice/timeout/AllTests.cpp +++ b/cpp/test/Ice/timeout/AllTests.cpp @@ -294,7 +294,7 @@ allTests(const Ice::CommunicatorPtr& communicator) TimeoutPrxPtr to = ICE_CHECKED_CAST(TimeoutPrx, obj->ice_timeout(250)); Ice::ConnectionPtr connection = to->ice_getConnection(); timeout->holdAdapter(600); - connection->close(false); + connection->close(Ice::CloseGracefullyAndWait); try { connection->getInfo(); // getInfo() doesn't throw in the closing state. @@ -309,9 +309,10 @@ allTests(const Ice::CommunicatorPtr& communicator) connection->getInfo(); test(false); } - catch(const Ice::CloseConnectionException&) + catch(const Ice::ConnectionManuallyClosedException& ex) { // Expected. + test(ex.graceful); } timeout->op(); // Ensure adapter is active. } diff --git a/cpp/test/Ice/udp/AllTests.cpp b/cpp/test/Ice/udp/AllTests.cpp index e2e00f19da2..f16b10baa70 100644 --- a/cpp/test/Ice/udp/AllTests.cpp +++ b/cpp/test/Ice/udp/AllTests.cpp @@ -116,7 +116,7 @@ allTests(const CommunicatorPtr& communicator) { test(seq.size() > 16384); } - obj->ice_getConnection()->close(false); + obj->ice_getConnection()->close(CloseGracefullyAndWait); communicator->getProperties()->setProperty("Ice.UDP.SndSize", "64000"); seq.resize(50000); try diff --git a/cpp/test/IceSSL/configuration/AllTests.cpp b/cpp/test/IceSSL/configuration/AllTests.cpp index 96d14174b36..96c1e527cb7 100644 --- a/cpp/test/IceSSL/configuration/AllTests.cpp +++ b/cpp/test/IceSSL/configuration/AllTests.cpp @@ -1585,7 +1585,7 @@ allTests(const CommunicatorPtr& communicator, const string& testDir, bool p12) // verifier->reset(); verifier->returnValue(false); - server->ice_getConnection()->close(false); + server->ice_getConnection()->close(Ice::CloseGracefullyAndWait); try { server->ice_ping(); diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp index d43c2469b28..4722fe93caf 100644 --- a/cpp/test/IceStorm/stress/Subscriber.cpp +++ b/cpp/test/IceStorm/stress/Subscriber.cpp @@ -163,7 +163,7 @@ public: if(!_done && (IceUtilInternal::random(10) == 1 || ++_count == _total)) { _done = true; - current.con->close(true); + current.con->close(CloseForcefully); // Deactivate the OA. This ensures that the subscribers // that have subscribed with oneway QoS will be booted. current.adapter->deactivate(); |