summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/Ice/ConnectionAsync.h93
-rw-r--r--cpp/src/Glacier2/RoutingTable.cpp2
-rw-r--r--cpp/src/Glacier2/SessionRouterI.cpp10
-rw-r--r--cpp/src/Ice/ConnectionI.cpp211
-rw-r--r--cpp/src/Ice/ConnectionI.h20
-rw-r--r--cpp/src/Ice/Exception.cpp8
-rw-r--r--cpp/src/Ice/ProxyFactory.cpp7
-rw-r--r--cpp/src/IceBridge/IceBridge.cpp837
-rw-r--r--cpp/src/IceBridge/Makefile.mk15
-rw-r--r--cpp/src/IceGrid/SessionManager.cpp2
-rw-r--r--cpp/test/Ice/acm/AllTests.cpp32
-rw-r--r--cpp/test/Ice/acm/Test.ice3
-rw-r--r--cpp/test/Ice/acm/TestI.cpp50
-rw-r--r--cpp/test/Ice/acm/TestI.h46
-rw-r--r--cpp/test/Ice/ami/AllTests.cpp322
-rw-r--r--cpp/test/Ice/ami/Client.cpp1
-rw-r--r--cpp/test/Ice/ami/Test.ice10
-rw-r--r--cpp/test/Ice/ami/TestI.cpp11
-rw-r--r--cpp/test/Ice/ami/TestI.h3
-rw-r--r--cpp/test/Ice/background/AllTests.cpp42
-rw-r--r--cpp/test/Ice/binding/AllTests.cpp42
-rw-r--r--cpp/test/Ice/hold/AllTests.cpp4
-rw-r--r--cpp/test/Ice/location/AllTests.cpp2
-rw-r--r--cpp/test/Ice/metrics/AllTests.cpp18
-rw-r--r--cpp/test/Ice/metrics/TestAMDI.cpp4
-rw-r--r--cpp/test/Ice/metrics/TestI.cpp2
-rw-r--r--cpp/test/Ice/operations/BatchOneways.cpp4
-rw-r--r--cpp/test/Ice/operations/BatchOnewaysAMI.cpp8
-rw-r--r--cpp/test/Ice/retry/TestI.cpp2
-rw-r--r--cpp/test/Ice/timeout/AllTests.cpp5
-rw-r--r--cpp/test/Ice/udp/AllTests.cpp2
-rw-r--r--cpp/test/IceSSL/configuration/AllTests.cpp2
-rw-r--r--cpp/test/IceStorm/stress/Subscriber.cpp2
33 files changed, 1638 insertions, 184 deletions
diff --git a/cpp/include/Ice/ConnectionAsync.h b/cpp/include/Ice/ConnectionAsync.h
index bc3f6064525..58f54cc3492 100644
--- a/cpp/include/Ice/ConnectionAsync.h
+++ b/cpp/include/Ice/ConnectionAsync.h
@@ -112,6 +112,99 @@ newCallback_Connection_flushBatchRequests(T* instance, void (T::*excb)(const ::I
return new Callback_Connection_flushBatchRequests<T, CT>(instance, excb, sentcb);
}
+template<class T>
+class CallbackNC_Connection_heartbeat : public Callback_Connection_heartbeat_Base,
+ public ::IceInternal::OnewayCallbackNC<T>
+{
+public:
+
+ typedef IceUtil::Handle<T> TPtr;
+
+ typedef void (T::*Exception)(const ::Ice::Exception&);
+ typedef void (T::*Sent)(bool);
+
+ CallbackNC_Connection_heartbeat(const TPtr& obj, Exception excb, Sent sentcb)
+ : ::IceInternal::OnewayCallbackNC<T>(obj, 0, excb, sentcb)
+ {
+ }
+
+ virtual void completed(const ::Ice::AsyncResultPtr& __result) const
+ {
+ ::Ice::ConnectionPtr __con = __result->getConnection();
+ assert(__con);
+ try
+ {
+ __con->end_heartbeat(__result);
+ assert(false);
+ }
+ catch(const ::Ice::Exception& ex)
+ {
+ ::IceInternal::CallbackNC<T>::exception(__result, ex);
+ }
+ }
+};
+
+template<class T> Callback_Connection_heartbeatPtr
+newCallback_Connection_heartbeat(const IceUtil::Handle<T>& instance,
+ void (T::*excb)(const ::Ice::Exception&),
+ void (T::*sentcb)(bool) = 0)
+{
+ return new CallbackNC_Connection_heartbeat<T>(instance, excb, sentcb);
+}
+
+template<class T> Callback_Connection_heartbeatPtr
+newCallback_Connection_heartbeat(T* instance, void (T::*excb)(const ::Ice::Exception&), void (T::*sentcb)(bool) = 0)
+{
+ return new CallbackNC_Connection_heartbeat<T>(instance, excb, sentcb);
+}
+
+template<class T, typename CT>
+class Callback_Connection_heartbeat : public Callback_Connection_heartbeat_Base,
+ public ::IceInternal::OnewayCallback<T, CT>
+{
+public:
+
+ typedef IceUtil::Handle<T> TPtr;
+
+ typedef void (T::*Exception)(const ::Ice::Exception& , const CT&);
+ typedef void (T::*Sent)(bool , const CT&);
+
+ Callback_Connection_heartbeat(const TPtr& obj, Exception excb, Sent sentcb)
+ : ::IceInternal::OnewayCallback<T, CT>(obj, 0, excb, sentcb)
+ {
+ }
+
+ virtual void completed(const ::Ice::AsyncResultPtr& __result) const
+ {
+ ::Ice::ConnectionPtr __con = __result->getConnection();
+ assert(__con);
+ try
+ {
+ __con->end_heartbeat(__result);
+ assert(false);
+ }
+ catch(const ::Ice::Exception& ex)
+ {
+ ::IceInternal::Callback<T, CT>::exception(__result, ex);
+ }
+ }
+};
+
+template<class T, typename CT> Callback_Connection_heartbeatPtr
+newCallback_Connection_heartbeat(const IceUtil::Handle<T>& instance,
+ void (T::*excb)(const ::Ice::Exception&, const CT&),
+ void (T::*sentcb)(bool, const CT&) = 0)
+{
+ return new Callback_Connection_heartbeat<T, CT>(instance, excb, sentcb);
+}
+
+template<class T, typename CT> Callback_Connection_heartbeatPtr
+newCallback_Connection_heartbeat(T* instance, void (T::*excb)(const ::Ice::Exception&, const CT&),
+ void (T::*sentcb)(bool, const CT&) = 0)
+{
+ return new Callback_Connection_heartbeat<T, CT>(instance, excb, sentcb);
+}
+
}
#endif
diff --git a/cpp/src/Glacier2/RoutingTable.cpp b/cpp/src/Glacier2/RoutingTable.cpp
index fde938fba00..e6b090de4cb 100644
--- a/cpp/src/Glacier2/RoutingTable.cpp
+++ b/cpp/src/Glacier2/RoutingTable.cpp
@@ -65,7 +65,7 @@ Glacier2::RoutingTable::add(const ObjectProxySeq& unfiltered, const Current& cur
if(!_verifier->verify(*prx))
{
- current.con->close(true);
+ current.con->close(CloseForcefully);
throw ObjectNotExistException(__FILE__, __LINE__);
}
ObjectPrx proxy = (*prx)->ice_twoway()->ice_secure(false)->ice_facet(""); // We add proxies in default form.
diff --git a/cpp/src/Glacier2/SessionRouterI.cpp b/cpp/src/Glacier2/SessionRouterI.cpp
index 9f774b79a8e..512d371611f 100644
--- a/cpp/src/Glacier2/SessionRouterI.cpp
+++ b/cpp/src/Glacier2/SessionRouterI.cpp
@@ -80,7 +80,7 @@ public:
// Close the connection otherwise the peer has no way to know that
// the session has gone.
//
- _connection->close(true);
+ _connection->close(CloseForcefully);
_router->destroySession(_connection);
}
}
@@ -922,7 +922,7 @@ SessionRouterI::refreshSession(const Ice::ConnectionPtr& con)
// Close the connection otherwise the peer has no way to know that the
// session has gone.
//
- con->close(true);
+ con->close(CloseForcefully);
throw SessionNotExistException();
}
}
@@ -1149,10 +1149,10 @@ SessionRouterI::getRouterImpl(const ConnectionPtr& connection, const Ice::Identi
if(_rejectTraceLevel >= 1)
{
Trace out(_instance->logger(), "Glacier2");
- out << "rejecting request. no session is associated with the connection.\n";
- out << "identity: " << _instance->communicator()->identityToString(id);
+ out << "rejecting request, no session is associated with the connection.\n";
+ out << "identity: " << identityToString(id);
}
- connection->close(true);
+ connection->close(CloseForcefully);
throw ObjectNotExistException(__FILE__, __LINE__);
}
return 0;
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 7845259855c..23388d399c6 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -409,29 +409,31 @@ Ice::ConnectionI::destroy(DestructionReason reason)
}
void
-Ice::ConnectionI::close(bool force)
+Ice::ConnectionI::close(ConnectionClose mode)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(force)
+ if(mode == CloseForcefully)
{
- setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__));
+ setState(StateClosed, ConnectionManuallyClosedException(__FILE__, __LINE__, false));
+ }
+ else if(mode == CloseGracefully)
+ {
+ setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true));
}
else
{
+ assert(mode == CloseGracefullyAndWait);
+
//
- // If we do a graceful shutdown, then we wait until all
- // outstanding requests have been completed. Otherwise, the
- // CloseConnectionException will cause all outstanding
- // requests to be retried, regardless of whether the server
- // has processed them or not.
+ // Wait until all outstanding requests have been completed.
//
while(!_asyncRequests.empty())
{
wait();
}
- setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
+ setState(StateClosing, ConnectionManuallyClosedException(__FILE__, __LINE__, true));
}
}
@@ -550,13 +552,13 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
//
// We send a heartbeat if there was no activity in the last
// (timeout / 4) period. Sending a heartbeat sooner than really
- // needed is safer to ensure that the receiver will receive in
- // time the heartbeat. Sending the heartbeat if there was no
+ // needed is safer to ensure that the receiver will receive the
+ // heartbeat in time. Sending the heartbeat if there was no
// activity in the last (timeout / 2) period isn't enough since
// monitor() is called only every (timeout / 2) period.
//
// Note that this doesn't imply that we are sending 4 heartbeats
- // per timeout period because the monitor() method is sill only
+ // per timeout period because the monitor() method is still only
// called every (timeout / 2) period.
//
if(acm.heartbeat == HeartbeatAlways ||
@@ -564,7 +566,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
{
if(acm.heartbeat != HeartbeatOnInvocation || _dispatchCount > 0)
{
- heartbeat();
+ sendHeartbeatNow();
}
}
@@ -802,6 +804,174 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
}
#endif
+namespace
+{
+
+const ::std::string __heartbeat_name = "heartbeat";
+
+class HeartbeatAsync : public OutgoingAsyncBase
+{
+public:
+
+ HeartbeatAsync(const ConnectionIPtr& connection,
+ const CommunicatorPtr& communicator,
+ const InstancePtr& instance) :
+ OutgoingAsyncBase(instance),
+ _communicator(communicator),
+ _connection(connection)
+ {
+ }
+
+ virtual CommunicatorPtr getCommunicator() const
+ {
+ return _communicator;
+ }
+
+ virtual ConnectionPtr getConnection() const
+ {
+ return _connection;
+ }
+
+ virtual const string& getOperation() const
+ {
+ return __heartbeat_name;
+ }
+
+ void invoke()
+ {
+ _observer.attach(_instance.get(), __heartbeat_name);
+ try
+ {
+ _os.write(magic[0]);
+ _os.write(magic[1]);
+ _os.write(magic[2]);
+ _os.write(magic[3]);
+ _os.write(currentProtocol);
+ _os.write(currentProtocolEncoding);
+ _os.write(validateConnectionMsg);
+ _os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection).
+ _os.write(headerSize); // Message size.
+ _os.i = _os.b.begin();
+
+ AsyncStatus status = _connection->sendAsyncRequest(ICE_SHARED_FROM_THIS, false, false, 0);
+ if(status & AsyncStatusSent)
+ {
+ _sentSynchronously = true;
+ if(status & AsyncStatusInvokeSentCallback)
+ {
+ invokeSent();
+ }
+ }
+ }
+ catch(const RetryException& ex)
+ {
+ if(exception(*ex.get()))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ catch(const Exception& ex)
+ {
+ if(exception(ex))
+ {
+ invokeExceptionAsync();
+ }
+ }
+ }
+
+private:
+
+ CommunicatorPtr _communicator;
+ ConnectionIPtr _connection;
+};
+typedef IceUtil::Handle<HeartbeatAsync> HeartbeatAsyncPtr;
+
+}
+
+#ifdef ICE_CPP11_MAPPING
+void
+Ice::ConnectionI::heartbeat()
+{
+ Connection::heartbeatAsync().get();
+}
+
+std::function<void()>
+Ice::ConnectionI::heartbeatAsync(::std::function<void(::std::exception_ptr)> ex, ::std::function<void(bool)> sent)
+{
+ class HeartbeatLambda : public HeartbeatAsync, public LambdaInvoke
+ {
+ public:
+
+ HeartbeatLambda(std::shared_ptr<Ice::ConnectionI>&& connection,
+ std::shared_ptr<Ice::Communicator>& communicator,
+ const InstancePtr& instance,
+ std::function<void(std::exception_ptr)> ex,
+ std::function<void(bool)> sent) :
+ HeartbeatAsync(connection, communicator, instance), LambdaInvoke(std::move(ex), std::move(sent))
+ {
+ }
+ };
+ auto outAsync = make_shared<HeartbeatLambda>(ICE_SHARED_FROM_THIS, _communicator, _instance, ex, sent);
+ outAsync->invoke();
+ return [outAsync]() { outAsync->cancel(); };
+}
+#else
+void
+Ice::ConnectionI::heartbeat()
+{
+ end_heartbeat(begin_heartbeat());
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_heartbeat()
+{
+ return _iceI_begin_heartbeat(dummyCallback, 0);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_heartbeat(const CallbackPtr& cb, const LocalObjectPtr& cookie)
+{
+ return _iceI_begin_heartbeat(cb, cookie);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::begin_heartbeat(const Callback_Connection_heartbeatPtr& cb, const LocalObjectPtr& cookie)
+{
+ return _iceI_begin_heartbeat(cb, cookie);
+}
+
+AsyncResultPtr
+Ice::ConnectionI::_iceI_begin_heartbeat(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
+{
+ class HeartbeatCallback : public HeartbeatAsync, public CallbackCompletion
+ {
+ public:
+
+ HeartbeatCallback(const ConnectionIPtr& connection,
+ const CommunicatorPtr& communicator,
+ const InstancePtr& instance,
+ const CallbackBasePtr& callback,
+ const LocalObjectPtr& cookie) :
+ HeartbeatAsync(connection, communicator, instance),
+ CallbackCompletion(callback, cookie)
+ {
+ _cookie = cookie;
+ }
+ };
+
+ HeartbeatAsyncPtr result = new HeartbeatCallback(this, _communicator, _instance, cb, cookie);
+ result->invoke();
+ return result;
+}
+
+void
+Ice::ConnectionI::end_heartbeat(const AsyncResultPtr& r)
+{
+ AsyncResult::check(r, this, __heartbeat_name);
+ r->waitForResponse();
+}
+#endif
+
void
Ice::ConnectionI::setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK) callback)
{
@@ -1755,7 +1925,7 @@ Ice::ConnectionI::finish(bool close)
out << "closed " << _endpoint->protocol() << " connection\n" << toString();
if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) ||
dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get())))
@@ -2076,7 +2246,7 @@ Ice::ConnectionI::setState(State state, const LocalException& ex)
// Don't warn about certain expected exceptions.
//
if(!(dynamic_cast<const CloseConnectionException*>(&ex) ||
- dynamic_cast<const ForcedCloseConnectionException*>(&ex) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(&ex) ||
dynamic_cast<const ConnectionTimeoutException*>(&ex) ||
dynamic_cast<const CommunicatorDestroyedException*>(&ex) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) ||
@@ -2255,7 +2425,7 @@ Ice::ConnectionI::setState(State state)
if(_observer && state == StateClosed && _exception)
{
if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(_exception.get()) ||
dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) ||
dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) ||
dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) ||
@@ -2285,8 +2455,7 @@ Ice::ConnectionI::setState(State state)
void
Ice::ConnectionI::initiateShutdown()
{
- assert(_state == StateClosing);
- assert(_dispatchCount == 0);
+ assert(_state == StateClosing && _dispatchCount == 0);
if(_shutdownInitiated)
{
@@ -2316,7 +2485,7 @@ Ice::ConnectionI::initiateShutdown()
setState(StateClosingPending);
//
- // Notify the the transceiver of the graceful connection closure.
+ // Notify the transceiver of the graceful connection closure.
//
SocketOperation op = _transceiver->closing(true, *_exception);
if(op)
@@ -2329,7 +2498,7 @@ Ice::ConnectionI::initiateShutdown()
}
void
-Ice::ConnectionI::heartbeat()
+Ice::ConnectionI::sendHeartbeatNow()
{
assert(_state == StateActive);
@@ -3016,7 +3185,7 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request
setState(StateClosingPending, CloseConnectionException(__FILE__, __LINE__));
//
- // Notify the the transceiver of the graceful connection closure.
+ // Notify the transceiver of the graceful connection closure.
//
SocketOperation op = _transceiver->closing(false, *_exception);
if(op)
diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h
index 72652518b94..7fb3781e880 100644
--- a/cpp/src/Ice/ConnectionI.h
+++ b/cpp/src/Ice/ConnectionI.h
@@ -157,12 +157,12 @@ public:
void activate();
void hold();
void destroy(DestructionReason);
- virtual void close(bool); // From Connection.
+ virtual void close(ConnectionClose); // From Connection.
bool isActiveOrHolding() const;
bool isFinished() const;
- void throwException() const; // Throws the connection exception if destroyed.
+ virtual void throwException() const; // From Connection. Throws the connection exception if destroyed.
void waitUntilHolding() const;
void waitUntilFinished(); // Not const, as this might close the connection upon timeout.
@@ -193,6 +193,19 @@ public:
virtual void setCloseCallback(ICE_IN(ICE_CLOSE_CALLBACK));
virtual void setHeartbeatCallback(ICE_IN(ICE_HEARTBEAT_CALLBACK));
+ virtual void heartbeat();
+
+#ifdef ICE_CPP11_MAPPING
+ virtual std::function<void()>
+ heartbeatAsync(::std::function<void(::std::exception_ptr)>, ::std::function<void(bool)> = nullptr);
+#else
+ virtual AsyncResultPtr begin_heartbeat();
+ virtual AsyncResultPtr begin_heartbeat(const CallbackPtr&, const LocalObjectPtr& = 0);
+ virtual AsyncResultPtr begin_heartbeat(const Callback_Connection_heartbeatPtr&, const LocalObjectPtr& = 0);
+
+ virtual void end_heartbeat(const AsyncResultPtr&);
+#endif
+
virtual void setACM(const IceUtil::Optional<int>&,
const IceUtil::Optional<ACMClose>&,
const IceUtil::Optional<ACMHeartbeat>&);
@@ -276,7 +289,7 @@ private:
void setState(State);
void initiateShutdown();
- void heartbeat();
+ void sendHeartbeatNow();
bool initialize(IceInternal::SocketOperation = IceInternal::SocketOperationNone);
bool validate(IceInternal::SocketOperation = IceInternal::SocketOperationNone);
@@ -308,6 +321,7 @@ private:
#ifndef ICE_CPP11_MAPPING
AsyncResultPtr _iceI_begin_flushBatchRequests(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
+ AsyncResultPtr _iceI_begin_heartbeat(const IceInternal::CallbackBasePtr&, const LocalObjectPtr&);
#endif
Ice::CommunicatorPtr _communicator;
diff --git a/cpp/src/Ice/Exception.cpp b/cpp/src/Ice/Exception.cpp
index 4ac7d66cdd3..a20c564101e 100644
--- a/cpp/src/Ice/Exception.cpp
+++ b/cpp/src/Ice/Exception.cpp
@@ -632,14 +632,10 @@ Ice::CloseConnectionException::ice_print(ostream& out) const
}
void
-Ice::ForcedCloseConnectionException::ice_print(ostream& out) const
+Ice::ConnectionManuallyClosedException::ice_print(ostream& out) const
{
Exception::ice_print(out);
- out << ":\nprotocol error: connection forcefully closed";
- if(!reason.empty())
- {
- out << ":\n" << reason;
- }
+ out << ":\nprotocol error: connection manually closed (" << (graceful ? "gracefully" : "forcefully") << ")";
}
void
diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp
index eba949508b6..34503ec961b 100644
--- a/cpp/src/Ice/ProxyFactory.cpp
+++ b/cpp/src/Ice/ProxyFactory.cpp
@@ -201,11 +201,12 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, co
}
//
- // Don't retry if the communicator is destroyed or object adapter
- // deactivated.
+ // Don't retry if the communicator is destroyed, object adapter is deactivated,
+ // or connection is manually closed.
//
if(dynamic_cast<const CommunicatorDestroyedException*>(&ex) ||
- dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex))
+ dynamic_cast<const ObjectAdapterDeactivatedException*>(&ex) ||
+ dynamic_cast<const ConnectionManuallyClosedException*>(&ex))
{
ex.ice_throw();
}
diff --git a/cpp/src/IceBridge/IceBridge.cpp b/cpp/src/IceBridge/IceBridge.cpp
new file mode 100644
index 00000000000..31ec58aae6c
--- /dev/null
+++ b/cpp/src/IceBridge/IceBridge.cpp
@@ -0,0 +1,837 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+#include <Ice/Connection.h>
+#include <Ice/ObjectAdapter.h>
+#include <Ice/Service.h>
+#include <Ice/UUID.h>
+#include <IceUtil/Options.h>
+
+using namespace std;
+using namespace Ice;
+
+namespace
+{
+
+//
+// Represents a pending twoway invocation.
+//
+class Invocation : public IceUtil::Shared
+{
+public:
+
+ Invocation(const AMD_Object_ice_invokePtr& cb) :
+ _cb(cb)
+ {
+ }
+
+ void success(bool ok, const pair<const Byte*, const Byte*>& results)
+ {
+ _cb->ice_response(ok, results);
+ }
+
+ void exception(const Exception& ex)
+ {
+ _cb->ice_exception(ex);
+ }
+
+private:
+
+ AMD_Object_ice_invokePtr _cb;
+};
+typedef IceUtil::Handle<Invocation> InvocationPtr;
+
+//
+// Represents a pending oneway invocation.
+//
+class OnewayInvocation : public IceUtil::Shared
+{
+public:
+
+ OnewayInvocation(const AMD_Object_ice_invokePtr& cb) :
+ _cb(cb)
+ {
+ }
+
+ void success(bool, const pair<const Byte*, const Byte*>&)
+ {
+ assert(false);
+ }
+
+ void exception(const Exception& ex)
+ {
+ _cb->ice_exception(ex);
+ }
+
+ void sent(bool sentSynchronously)
+ {
+ _cb->ice_response(true, vector<Byte>());
+ }
+
+private:
+
+ AMD_Object_ice_invokePtr _cb;
+};
+typedef IceUtil::Handle<OnewayInvocation> OnewayInvocationPtr;
+
+//
+// Holds information about an incoming invocation that's been queued until an outgoing connection has
+// been established.
+//
+struct QueuedInvocation : public IceUtil::Shared
+{
+ QueuedInvocation(const AMD_Object_ice_invokePtr& c, const pair<const Byte*, const Byte*>& p, const Current& curr) :
+ cb(c), current(curr)
+ {
+ if(p.first)
+ {
+ //
+ // The pointers in paramData refer to the Ice marshaling buffer and won't remain valid after
+ // ice_invoke_async completes, so we have to make a copy of the parameter data.
+ //
+ vector<Byte> tmp(p.first, p.second);
+ paramData.swap(tmp);
+ }
+ }
+
+ const AMD_Object_ice_invokePtr cb;
+ vector<Byte> paramData;
+ const Current current;
+};
+typedef IceUtil::Handle<QueuedInvocation> QueuedInvocationPtr;
+typedef vector<QueuedInvocationPtr> InvocationList;
+
+//
+// Relays heartbeat messages.
+//
+class HeartbeatCallbackI : public HeartbeatCallback
+{
+public:
+
+ HeartbeatCallbackI(const ConnectionPtr& con) :
+ _connection(con)
+ {
+ }
+
+ virtual void heartbeat(const ConnectionPtr&)
+ {
+ //
+ // When a connection receives a heartbeat message, we send one over its corresponding connection.
+ //
+ try
+ {
+ _connection->begin_heartbeat();
+ }
+ catch(...)
+ {
+ }
+ }
+
+private:
+
+ const ConnectionPtr _connection;
+};
+
+//
+// Represents a pair of bridged connections.
+//
+class BridgeConnection : public IceUtil::Shared
+{
+public:
+
+ BridgeConnection(const ObjectAdapterPtr& adapter, const ObjectPrx& target, const ConnectionPtr& incoming) :
+ _adapter(adapter), _target(target), _incoming(incoming), _closed(false)
+ {
+ }
+
+ void setOutgoing(const ConnectionPtr& outgoing)
+ {
+ if(outgoing)
+ {
+ InvocationList queuedInvocations;
+
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ assert(!_outgoing);
+
+ if(_closed)
+ {
+ //
+ // The incoming connection is already closed. There's no point in leaving the outgoing
+ // connection open.
+ //
+ outgoing->close(CloseGracefully);
+ }
+ else
+ {
+ _outgoing = outgoing;
+
+ //
+ // Save the queued invocations.
+ //
+ queuedInvocations.swap(_queue);
+
+ //
+ // Register hearbeat callbacks on both connections.
+ //
+ _incoming->setHeartbeatCallback(new HeartbeatCallbackI(_outgoing));
+ _outgoing->setHeartbeatCallback(new HeartbeatCallbackI(_incoming));
+
+ //
+ // Configure the outgoing connection for bidirectional requests.
+ //
+ _outgoing->setAdapter(_adapter);
+ }
+ }
+
+ //
+ // Flush any queued invocations.
+ //
+ flush(outgoing, queuedInvocations);
+ }
+ else
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ //
+ // The outgoing connection failed so we close the incoming connection. closed() will eventually
+ // be called for it.
+ //
+ if(_incoming)
+ {
+ _incoming->close(CloseGracefully);
+ }
+ }
+ }
+
+ void closed(const ConnectionPtr& con)
+ {
+ InvocationList queuedInvocations;
+ ConnectionPtr toBeClosed;
+
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ if(con == _incoming)
+ {
+ //
+ // The incoming connection was closed.
+ //
+ toBeClosed = _outgoing;
+ queuedInvocations.swap(_queue);
+ _closed = true;
+ _incoming = 0;
+ }
+ else
+ {
+ assert(con == _outgoing);
+
+ //
+ // An outgoing connection was closed.
+ //
+ toBeClosed = _incoming;
+ _outgoing = 0;
+ }
+ }
+
+ //
+ // Close the corresponding connection.
+ //
+ if(toBeClosed)
+ {
+ //
+ // Examine the exception that caused the connection to be closed. A CloseConnectionException
+ // indicates the connection was closed gracefully and we do the same. We also look for
+ // ConnectionManuallyClosedException, which indicates the connection was closed locally.
+ // We close forcefully for any other exception.
+ //
+ try
+ {
+ con->throwException();
+ }
+ catch(const Ice::CloseConnectionException&)
+ {
+ toBeClosed->close(CloseGracefully);
+ }
+ catch(const Ice::ConnectionManuallyClosedException& ex)
+ {
+ //
+ // Connection was manually closed by the bridge.
+ //
+ toBeClosed->close(ex.graceful ? CloseGracefully : CloseForcefully);
+ }
+ catch(...)
+ {
+ toBeClosed->close(CloseForcefully);
+ }
+ }
+
+ //
+ // Even though the connection is already closed, we still need to "complete" the pending invocations so
+ // that the connection's dispatch count is updated correctly.
+ //
+ for(InvocationList::iterator p = queuedInvocations.begin(); p != queuedInvocations.end(); ++p)
+ {
+ (*p)->cb->ice_exception(ConnectionLostException(__FILE__, __LINE__));
+ }
+ }
+
+ void dispatch(const AMD_Object_ice_invokePtr& cb, const pair<const Byte*, const Byte*>& paramData,
+ const Current& current)
+ {
+ //
+ // We've received an invocations, either from the client via the incoming connection, or from
+ // the server via the outgoing (bidirectional) connection. The current.con member tells us
+ // the connection over which the request arrived.
+ //
+
+ ConnectionPtr dest;
+
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ if(_closed)
+ {
+ //
+ // If the incoming connection has already closed, we're done.
+ //
+ cb->ice_exception(ConnectionLostException(__FILE__, __LINE__));
+ }
+ else if(!_outgoing)
+ {
+ //
+ // Queue the invocation until the outgoing connection is established.
+ //
+ _queue.push_back(new QueuedInvocation(cb, paramData, current));
+ }
+ else if(current.con == _incoming)
+ {
+ //
+ // Forward to the outgoing connection.
+ //
+ dest = _outgoing;
+ }
+ else
+ {
+ //
+ // Forward to the incoming connection.
+ //
+ assert(current.con == _outgoing);
+ dest = _incoming;
+ }
+ }
+
+ if(dest)
+ {
+ send(dest, cb, paramData, current);
+ }
+ }
+
+private:
+
+ void flush(const ConnectionPtr& outgoing, const InvocationList& invocations)
+ {
+ for(InvocationList::const_iterator p = invocations.begin(); p != invocations.end(); ++p)
+ {
+ QueuedInvocationPtr i = *p;
+
+ pair<const Byte*, const Byte*> paramData;
+ if(i->paramData.empty())
+ {
+ paramData.first = paramData.second = 0;
+ }
+ else
+ {
+ paramData.first = &i->paramData[0];
+ paramData.second = paramData.first + i->paramData.size();
+ }
+
+ send(outgoing, i->cb, paramData, i->current);
+ }
+ }
+
+ void send(const ConnectionPtr& dest, const AMD_Object_ice_invokePtr& cb,
+ const pair<const Byte*, const Byte*>& paramData, const Current& current)
+ {
+ try
+ {
+ //
+ // Create a proxy having the same identity as the request.
+ //
+ ObjectPrx prx = dest->createProxy(current.id);
+
+ //
+ // Examine the proxy and the request to determine whether it should be forwarded as a oneway or a twoway.
+ //
+ if(!prx->ice_isTwoway() || !current.requestId)
+ {
+ if(prx->ice_isTwoway())
+ {
+ prx = prx->ice_oneway();
+ }
+ OnewayInvocationPtr i = new OnewayInvocation(cb);
+ Callback_Object_ice_invokePtr d =
+ newCallback_Object_ice_invoke(i, &OnewayInvocation::success, &OnewayInvocation::exception,
+ &OnewayInvocation::sent);
+ prx->begin_ice_invoke(current.operation, current.mode, paramData, current.ctx, d);
+ }
+ else
+ {
+ InvocationPtr i = new Invocation(cb);
+ Callback_Object_ice_invokePtr d =
+ newCallback_Object_ice_invoke(i, &Invocation::success, &Invocation::exception);
+ prx->begin_ice_invoke(current.operation, current.mode, paramData, current.ctx, d);
+ }
+ }
+ catch(const std::exception& ex)
+ {
+ cb->ice_exception(ex);
+ }
+ }
+
+ const ObjectAdapterPtr _adapter;
+ const ObjectPrx _target;
+ ConnectionPtr _incoming;
+
+ IceUtil::Mutex _lock;
+
+ bool _closed;
+ ConnectionPtr _outgoing;
+
+ //
+ // We maintain our own queue for invocations that arrive on the incoming connection before the outgoing
+ // connection has been established. We don't want to forward these to proxies and let the proxies handle
+ // the queuing because then the invocations could be sent out of order (e.g., when invocations are split
+ // among twoway/oneway/datagram proxies).
+ //
+ InvocationList _queue;
+};
+typedef IceUtil::Handle<BridgeConnection> BridgeConnectionPtr;
+
+//
+// Allows the bridge to be used as an Ice router.
+//
+class RouterI : public Router
+{
+public:
+
+ virtual ObjectPrx getClientProxy(const Current&) const
+ {
+ return 0;
+ }
+
+ virtual ObjectPrx getServerProxy(const Current&) const
+ {
+ return 0;
+ }
+
+ virtual ObjectProxySeq addProxies(const ObjectProxySeq&, const Current&)
+ {
+ return ObjectProxySeq();
+ }
+};
+
+class FinderI : public RouterFinder
+{
+public:
+
+ FinderI(const RouterPrx& router) :
+ _router(router)
+ {
+ }
+
+ virtual RouterPrx getRouter(const Current&)
+ {
+ return _router;
+ }
+
+private:
+
+ const RouterPrx _router;
+};
+
+class BridgeI;
+typedef IceUtil::Handle<BridgeI> BridgeIPtr;
+
+class CloseCallbackI : public CloseCallback
+{
+public:
+
+ CloseCallbackI(const BridgeIPtr& bridge) :
+ _bridge(bridge)
+ {
+ }
+
+ virtual void closed(const ConnectionPtr&);
+
+private:
+
+ const BridgeIPtr _bridge;
+};
+
+class GetConnectionCallback : public IceUtil::Shared
+{
+public:
+
+ GetConnectionCallback(const BridgeIPtr& bridge, const BridgeConnectionPtr& bc) :
+ _bridge(bridge), _bc(bc)
+ {
+ }
+
+ void success(const ConnectionPtr&);
+ void exception(const Exception&);
+
+private:
+
+ const BridgeIPtr _bridge;
+ const BridgeConnectionPtr _bc;
+};
+typedef IceUtil::Handle<GetConnectionCallback> GetConnectionCallbackPtr;
+
+//
+// The main bridge servant.
+//
+class BridgeI : public Ice::BlobjectArrayAsync
+{
+public:
+
+ BridgeI(const ObjectAdapterPtr& adapter, const ObjectPrx& target) :
+ _adapter(adapter), _target(target)
+ {
+ }
+
+ virtual void ice_invoke_async(const AMD_Object_ice_invokePtr& cb,
+ const std::pair<const Byte*, const Byte*>& paramData,
+ const Current& current)
+ {
+ BridgeConnectionPtr bc;
+
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ ConnectionMap::iterator p = _connections.find(current.con);
+ if(p == _connections.end())
+ {
+ //
+ // The connection is unknown to us, it must be a new incoming connection.
+ //
+
+ EndpointInfoPtr info = current.con->getEndpoint()->getInfo();
+
+ //
+ // Create a target proxy that matches the configuration of the incoming connection.
+ //
+ ObjectPrx target;
+ if(info->datagram())
+ {
+ target = _target->ice_datagram();
+ }
+ else if(info->secure())
+ {
+ target = _target->ice_secure(true);
+ }
+ else
+ {
+ target = _target;
+ }
+
+ //
+ // Force the proxy to establish a new connection by using a unique connection ID.
+ //
+ target = target->ice_connectionId(Ice::generateUUID());
+
+ bc = new BridgeConnection(_adapter, target, current.con);
+ _connections.insert(make_pair(current.con, bc));
+ current.con->setCloseCallback(new CloseCallbackI(this));
+
+ //
+ // Try to establish the outgoing connection.
+ //
+ try
+ {
+ //
+ // Begin the connection establishment process asynchronously. This can take a while to complete,
+ // especially when using Bluetooth.
+ //
+ GetConnectionCallbackPtr gc = new GetConnectionCallback(this, bc);
+ Callback_Object_ice_getConnectionPtr d =
+ newCallback_Object_ice_getConnection(gc, &GetConnectionCallback::success,
+ &GetConnectionCallback::exception);
+ target->begin_ice_getConnection(d);
+ }
+ catch(const Exception& ex)
+ {
+ cb->ice_exception(ex);
+ return;
+ }
+ }
+ else
+ {
+ bc = p->second;
+ }
+ }
+
+ //
+ // Delegate the invocation to the BridgeConnection object.
+ //
+ bc->dispatch(cb, paramData, current);
+ }
+
+ void closed(const ConnectionPtr& con)
+ {
+ //
+ // Notify the BridgeConnection that a connection has closed. We also need to remove it from our map.
+ //
+
+ BridgeConnectionPtr bc;
+
+ {
+ IceUtil::Mutex::Lock lock(_lock);
+
+ ConnectionMap::iterator p = _connections.find(con);
+ assert(p != _connections.end());
+ bc = p->second;
+ _connections.erase(p);
+ }
+
+ bc->closed(con);
+ }
+
+ void outgoingSuccess(const BridgeConnectionPtr& bc, const ConnectionPtr& outgoing)
+ {
+ //
+ // An outgoing connection was established. Notify the BridgeConnection object.
+ //
+ IceUtil::Mutex::Lock lock(_lock);
+ _connections.insert(make_pair(outgoing, bc));
+ outgoing->setCloseCallback(new CloseCallbackI(this));
+ bc->setOutgoing(outgoing);
+ }
+
+ void outgoingException(const BridgeConnectionPtr& bc, const Exception& ex)
+ {
+ //
+ // An outgoing connection attempt failed. Notify the BridgeConnection object.
+ //
+ bc->setOutgoing(0);
+ }
+
+private:
+
+ const ObjectAdapterPtr _adapter;
+ const ObjectPrx _target;
+
+ IceUtil::Mutex _lock;
+
+ typedef map<ConnectionPtr, BridgeConnectionPtr> ConnectionMap;
+ ConnectionMap _connections;
+};
+
+void
+CloseCallbackI::closed(const ConnectionPtr& con)
+{
+ _bridge->closed(con);
+}
+
+void
+GetConnectionCallback::success(const ConnectionPtr& outgoing)
+{
+ _bridge->outgoingSuccess(_bc, outgoing);
+}
+
+void
+GetConnectionCallback::exception(const Exception& ex)
+{
+ _bridge->outgoingException(_bc, ex);
+}
+
+class BridgeService : public Service
+{
+public:
+
+ BridgeService();
+
+protected:
+
+ virtual bool start(int, char*[], int&);
+ virtual bool stop();
+ virtual CommunicatorPtr initializeCommunicator(int&, char*[], const InitializationData&);
+
+private:
+
+ void usage(const std::string&);
+};
+
+}
+
+BridgeService::BridgeService()
+{
+}
+
+bool
+BridgeService::start(int argc, char* argv[], int& status)
+{
+ IceUtilInternal::Options opts;
+ opts.addOpt("h", "help");
+ opts.addOpt("v", "version");
+
+ vector<string> args;
+ try
+ {
+ args = opts.parse(argc, const_cast<const char**>(argv));
+ }
+ catch(const IceUtilInternal::BadOptException& e)
+ {
+ error(e.reason);
+ usage(argv[0]);
+ return false;
+ }
+
+ if(opts.isSet("help"))
+ {
+ usage(argv[0]);
+ status = EXIT_SUCCESS;
+ return false;
+ }
+ if(opts.isSet("version"))
+ {
+ print(ICE_STRING_VERSION);
+ status = EXIT_SUCCESS;
+ return false;
+ }
+
+ if(!args.empty())
+ {
+ cerr << argv[0] << ": too many arguments" << endl;
+ usage(argv[0]);
+ return false;
+ }
+
+ PropertiesPtr properties = communicator()->getProperties();
+
+ const string targetProperty = "IceBridge.Target.Endpoints";
+ const string targetEndpoints = properties->getProperty(targetProperty);
+ if(targetEndpoints.empty())
+ {
+ error("property '" + targetProperty + "' is not set");
+ return false;
+ }
+
+ Ice::ObjectPrx target;
+
+ try
+ {
+ target = communicator()->stringToProxy("dummy:" + targetEndpoints);
+ }
+ catch(const std::exception& ex)
+ {
+ ServiceError err(this);
+ err << "setting for target endpoints '" << targetEndpoints << "' is invalid:\n" << ex;
+ return false;
+ }
+
+ //
+ // Initialize the object adapter.
+ //
+ const string endpointsProperty = "IceBridge.Source.Endpoints";
+ if(properties->getProperty(endpointsProperty).empty())
+ {
+ error("property '" + endpointsProperty + "' is not set");
+ return false;
+ }
+
+ ObjectAdapterPtr adapter = communicator()->createObjectAdapter("IceBridge.Source");
+
+ adapter->addDefaultServant(new BridgeI(adapter, target), "");
+
+ if(properties->getPropertyAsIntWithDefault("IceBridge.Router", 0) > 0)
+ {
+ RouterPrx router = RouterPrx::uncheckedCast(adapter->add(new RouterI, stringToIdentity("IceBridge/router")));
+ adapter->add(new FinderI(router), stringToIdentity("Ice/RouterFinder"));
+ }
+
+ try
+ {
+ adapter->activate();
+ }
+ catch(const std::exception& ex)
+ {
+ {
+ ServiceError err(this);
+ err << "caught exception activating object adapter\n" << ex;
+ }
+
+ stop();
+ return false;
+ }
+
+ return true;
+}
+
+bool
+BridgeService::stop()
+{
+ return true;
+}
+
+CommunicatorPtr
+BridgeService::initializeCommunicator(int& argc, char* argv[], const InitializationData& initializationData)
+{
+ InitializationData initData = initializationData;
+ initData.properties = createProperties(argc, argv, initializationData.properties);
+
+ StringSeq args = argsToStringSeq(argc, argv);
+ args = initData.properties->parseCommandLineOptions("IceBridge", args);
+ stringSeqToArgs(args, argc, argv);
+
+ //
+ // Disable automatic retry by default.
+ //
+ if(initData.properties->getProperty("Ice.RetryIntervals").empty())
+ {
+ initData.properties->setProperty("Ice.RetryIntervals", "-1");
+ }
+
+ return Service::initializeCommunicator(argc, argv, initData);
+}
+
+void
+BridgeService::usage(const string& appName)
+{
+ string options =
+ "Options:\n"
+ "-h, --help Show this message.\n"
+ "-v, --version Display the Ice version.\n";
+#ifndef _WIN32
+ options.append(
+ "--daemon Run as a daemon.\n"
+ "--pidfile FILE Write process ID into FILE.\n"
+ "--noclose Do not close open file descriptors.\n"
+ "--nochdir Do not change the current working directory.\n"
+ );
+#endif
+ print("Usage: " + appName + " [options]\n" + options);
+}
+
+#ifdef _WIN32
+
+int
+wmain(int argc, wchar_t* argv[])
+
+#else
+
+int
+main(int argc, char* argv[])
+
+#endif
+{
+ BridgeService svc;
+ return svc.main(argc, argv);
+}
diff --git a/cpp/src/IceBridge/Makefile.mk b/cpp/src/IceBridge/Makefile.mk
new file mode 100644
index 00000000000..b3c282b0c39
--- /dev/null
+++ b/cpp/src/IceBridge/Makefile.mk
@@ -0,0 +1,15 @@
+# **********************************************************************
+#
+# Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
+#
+# This copy of Ice is licensed to you under the terms described in the
+# ICE_LICENSE file included in this distribution.
+#
+# **********************************************************************
+
+$(project)_programs := icebridge
+$(project)_dependencies := Ice
+
+icebridge_targetdir := $(bindir)
+
+projects += $(project)
diff --git a/cpp/src/IceGrid/SessionManager.cpp b/cpp/src/IceGrid/SessionManager.cpp
index 34caf1f9bd2..57ded43015f 100644
--- a/cpp/src/IceGrid/SessionManager.cpp
+++ b/cpp/src/IceGrid/SessionManager.cpp
@@ -58,7 +58,7 @@ SessionManager::findAllQueryObjects(bool cached)
{
try
{
- connection->close(false);
+ connection->close(Ice::CloseGracefullyAndWait);
}
catch(const Ice::LocalException&)
{
diff --git a/cpp/test/Ice/acm/AllTests.cpp b/cpp/test/Ice/acm/AllTests.cpp
index 52c4922d98b..8e6563ede85 100644
--- a/cpp/test/Ice/acm/AllTests.cpp
+++ b/cpp/test/Ice/acm/AllTests.cpp
@@ -535,6 +535,32 @@ public:
}
};
+class HeartbeatManualTest : public TestCase
+{
+public:
+
+ HeartbeatManualTest(const RemoteCommunicatorPrxPtr& com) : TestCase("manual heartbeats", com)
+ {
+ //
+ // Disable heartbeats.
+ //
+ setClientACM(10, -1, 0);
+ setServerACM(10, -1, 0);
+ }
+
+ virtual void runTestCase(const RemoteObjectAdapterPrxPtr& adapter, const TestIntfPrxPtr& proxy)
+ {
+ proxy->startHeartbeatCount();
+ Ice::ConnectionPtr con = proxy->ice_getConnection();
+ con->heartbeat();
+ con->heartbeat();
+ con->heartbeat();
+ con->heartbeat();
+ con->heartbeat();
+ proxy->waitForHeartbeatCount(5);
+ }
+};
+
class SetACMTest : public TestCase
{
public:
@@ -564,8 +590,9 @@ public:
test(acm.close == Ice::CloseOnInvocationAndIdle);
test(acm.heartbeat == Ice::HeartbeatAlways);
- // Make sure the client sends few heartbeats to the server
- proxy->waitForHeartbeat(2);
+ // Make sure the client sends a few heartbeats to the server.
+ proxy->startHeartbeatCount();
+ proxy->waitForHeartbeatCount(2);
}
};
@@ -591,6 +618,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
tests.push_back(ICE_MAKE_SHARED(HeartbeatOnIdleTest, com));
tests.push_back(ICE_MAKE_SHARED(HeartbeatAlwaysTest, com));
+ tests.push_back(ICE_MAKE_SHARED(HeartbeatManualTest, com));
tests.push_back(ICE_MAKE_SHARED(SetACMTest, com));
for(vector<TestCasePtr>::const_iterator p = tests.begin(); p != tests.end(); ++p)
diff --git a/cpp/test/Ice/acm/Test.ice b/cpp/test/Ice/acm/Test.ice
index 5ab98180dd3..d78abd6eb0f 100644
--- a/cpp/test/Ice/acm/Test.ice
+++ b/cpp/test/Ice/acm/Test.ice
@@ -17,7 +17,8 @@ interface TestIntf
void sleep(int seconds);
void sleepAndHold(int seconds);
void interruptSleep();
- void waitForHeartbeat(int count);
+ void startHeartbeatCount();
+ void waitForHeartbeatCount(int count);
};
interface RemoteObjectAdapter
diff --git a/cpp/test/Ice/acm/TestI.cpp b/cpp/test/Ice/acm/TestI.cpp
index d7742680a70..f4987856421 100644
--- a/cpp/test/Ice/acm/TestI.cpp
+++ b/cpp/test/Ice/acm/TestI.cpp
@@ -26,41 +26,6 @@ toString(int value)
return os.str();
}
-class HeartbeatCallbackI :
-#ifdef ICE_CPP11_MAPPING
- public enable_shared_from_this<HeartbeatCallbackI>,
-#else
- public Ice::HeartbeatCallback,
-#endif
- private IceUtil::Monitor<IceUtil::Mutex>
-{
-public:
-
- void
- waitForCount(int count)
- {
- Lock sync(*this);
- _count = count;
- while(_count > 0)
- {
- wait();
- }
- }
-
- virtual void
- heartbeat(const Ice::ConnectionPtr&)
- {
- Lock sync(*this);
- --_count;
- notifyAll();
- }
-
-private:
-
- int _count;
-};
-ICE_DEFINE_PTR(HeartbeatCallbackIPtr, HeartbeatCallbackI);
-
}
RemoteObjectAdapterPrxPtr
@@ -162,16 +127,23 @@ TestI::interruptSleep(const Ice::Current& current)
}
void
-TestI::waitForHeartbeat(int count, const Ice::Current& current)
+TestI::startHeartbeatCount(const Ice::Current& current)
{
- HeartbeatCallbackIPtr callback = ICE_MAKE_SHARED(HeartbeatCallbackI);
+ _callback = ICE_MAKE_SHARED(HeartbeatCallbackI);
#ifdef ICE_CPP11_MAPPING
+ HeartbeatCallbackIPtr callback = _callback;
current.con->setHeartbeatCallback([callback](Ice::ConnectionPtr connection)
{
callback->heartbeat(move(connection));
});
#else
- current.con->setHeartbeatCallback(callback);
+ current.con->setHeartbeatCallback(_callback);
#endif
- callback->waitForCount(count);
+}
+
+void
+TestI::waitForHeartbeatCount(int count, const Ice::Current&)
+{
+ assert(_callback);
+ _callback->waitForCount(count);
}
diff --git a/cpp/test/Ice/acm/TestI.h b/cpp/test/Ice/acm/TestI.h
index be031e3799d..c45415022a6 100644
--- a/cpp/test/Ice/acm/TestI.h
+++ b/cpp/test/Ice/acm/TestI.h
@@ -44,7 +44,51 @@ public:
virtual void sleep(int, const Ice::Current&);
virtual void sleepAndHold(int, const Ice::Current&);
virtual void interruptSleep(const Ice::Current&);
- virtual void waitForHeartbeat(int, const Ice::Current&);
+ virtual void startHeartbeatCount(const Ice::Current&);
+ virtual void waitForHeartbeatCount(int, const Ice::Current&);
+
+private:
+
+ class HeartbeatCallbackI :
+#ifdef ICE_CPP11_MAPPING
+ public std::enable_shared_from_this<HeartbeatCallbackI>,
+#else
+ public Ice::HeartbeatCallback,
+#endif
+ private IceUtil::Monitor<IceUtil::Mutex>
+ {
+ public:
+
+ HeartbeatCallbackI() :
+ _count(0)
+ {
+ }
+
+ void
+ waitForCount(int count)
+ {
+ Lock sync(*this);
+ while(_count < count)
+ {
+ wait();
+ }
+ }
+
+ virtual void
+ heartbeat(const Ice::ConnectionPtr&)
+ {
+ Lock sync(*this);
+ ++_count;
+ notifyAll();
+ }
+
+ private:
+
+ int _count;
+ };
+ ICE_DEFINE_PTR(HeartbeatCallbackIPtr, HeartbeatCallbackI);
+
+ HeartbeatCallbackIPtr _callback;
};
#endif
diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp
index 439f8a7792f..3366c4fdc7e 100644
--- a/cpp/test/Ice/ami/AllTests.cpp
+++ b/cpp/test/Ice/ami/AllTests.cpp
@@ -773,6 +773,21 @@ private:
};
typedef IceUtil::Handle<FlushExCallback> FlushExCallbackPtr;
+class CloseCallback : virtual public CallbackBase, virtual public Ice::CloseCallback
+{
+public:
+
+ CloseCallback()
+ {
+ }
+
+ virtual void closed(const Ice::ConnectionPtr& con)
+ {
+ called();
+ }
+};
+typedef IceUtil::Handle<CloseCallback> CloseCallbackPtr;
+
class Thrower : public CallbackBase
{
public:
@@ -1850,7 +1865,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
test(p->opBatchCount() == 0);
auto b1 = p->ice_batchOneway();
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
auto id = this_thread::get_id();
promise<void> promise;
@@ -1924,7 +1939,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
auto b1 = Ice::uncheckedCast<Test::TestIntfPrx>(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
promise<void> promise;
b1->ice_getConnection()->flushBatchRequestsAsync(
@@ -2002,7 +2017,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
auto b1 = Ice::uncheckedCast<Test::TestIntfPrx>(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
promise<void> promise;
auto id = this_thread::get_id();
@@ -2072,8 +2087,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->ice_getConnection(); // Ensure connection is established.
b1->opBatch();
b2->opBatch();
- b1->ice_getConnection()->close(false);
- b2->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
promise<void> promise;
auto id = this_thread::get_id();
@@ -2161,8 +2176,38 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
if(p->ice_getConnection() && protocol != "bt")
{
- cout << "testing close connection with sending queue... " << flush;
+ cout << "testing graceful close connection with wait... " << flush;
+ {
+ //
+ // Local case: begin several requests, close the connection gracefully, and make sure it waits
+ // for the requests to complete.
+ //
+ vector<future<void>> results;
+ for(int i = 0; i < 3; ++i)
+ {
+ auto s = make_shared<promise<void>>();
+ p->sleepAsync(50,
+ [s]() { s->set_value(); },
+ [s](exception_ptr ex) { s->set_exception(ex); });
+ results.push_back(s->get_future());
+ }
+ p->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ for(vector<future<void>>::iterator p = results.begin(); p != results.end(); ++p)
+ {
+ try
+ {
+ p->get();
+ }
+ catch(const Ice::LocalException&)
+ {
+ test(false);
+ }
+ }
+ }
{
+ //
+ // Remote case.
+ //
Ice::ByteSeq seq;
seq.resize(1024 * 10);
for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q)
@@ -2191,7 +2236,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
results.push_back(s->get_future());
}
atomic_flag sent = ATOMIC_FLAG_INIT;
- p->closeAsync(false, nullptr, nullptr, [&sent](bool) { sent.test_and_set(); });
+ p->closeAsync(Test::CloseMode::CloseGracefullyAndWait, nullptr, nullptr,
+ [&sent](bool) { sent.test_and_set(); });
if(!sent.test_and_set())
{
for(int i = 0; i < maxQueue; i++)
@@ -2231,8 +2277,111 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
}
}
cout << "ok" << endl;
- }
+ cout << "testing graceful close connection without wait... " << flush;
+ {
+ //
+ // Local case: start a lengthy operation and then close the connection gracefully on the client side
+ // without waiting for the pending invocation to complete. There will be no retry and we expect the
+ // invocation to fail with ConnectionManuallyClosedException.
+ //
+ // This test requires two threads in the server's thread pool: one will block in sleep() and the other
+ // will process the CloseConnection message.
+ //
+ p->ice_ping();
+ auto con = p->ice_getConnection();
+ auto s = make_shared<promise<void>>();
+ p->sleepAsync(100,
+ [s]() { s->set_value(); },
+ [s](exception_ptr ex) { s->set_exception(ex); });
+ future<void> f = s->get_future();
+ con->close(Ice::ConnectionClose::CloseGracefully);
+ try
+ {
+ f.get();
+ test(false);
+ }
+ catch(const Ice::ConnectionManuallyClosedException& ex)
+ {
+ test(ex.graceful);
+ }
+
+ //
+ // Remote case: the server closes the connection gracefully. Our call to TestIntf::close()
+ // completes successfully and then the connection should be closed immediately afterward,
+ // despite the fact that there's a pending call to sleep(). The call to sleep() should be
+ // automatically retried and complete successfully with a new connection.
+ //
+ p->ice_ping();
+ con = p->ice_getConnection();
+ auto sc = make_shared<promise<void>>();
+ con->setCloseCallback(
+ [sc](Ice::ConnectionPtr connection)
+ {
+ sc->set_value();
+ });
+ future<void> fc = sc->get_future();
+ s = make_shared<promise<void>>();
+ p->sleepAsync(100,
+ [s]() { s->set_value(); },
+ [s](exception_ptr ex) { s->set_exception(ex); });
+ f = s->get_future();
+ p->close(Test::CloseMode::CloseGracefully);
+ fc.get();
+ try
+ {
+ f.get();
+ }
+ catch(const Ice::LocalException&)
+ {
+ test(false);
+ }
+ p->ice_ping();
+ test(p->ice_getConnection() != con);
+ }
+ cout << "ok" << endl;
+
+ cout << "testing forceful close connection... " << flush;
+ {
+ //
+ // Local case: start a lengthy operation and then close the connection forcefully on the client side.
+ // There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException.
+ //
+ p->ice_ping();
+ auto con = p->ice_getConnection();
+ auto s = make_shared<promise<void>>();
+ p->sleepAsync(100,
+ [s]() { s->set_value(); },
+ [s](exception_ptr ex) { s->set_exception(ex); });
+ future<void> f = s->get_future();
+ con->close(Ice::ConnectionClose::CloseForcefully);
+ try
+ {
+ f.get();
+ test(false);
+ }
+ catch(const Ice::ConnectionManuallyClosedException& ex)
+ {
+ test(!ex.graceful);
+ }
+
+ //
+ // Remote case: the server closes the connection forcefully. This causes the request to fail
+ // with a ConnectionLostException. Since the close() operation is not idempotent, the client
+ // will not retry.
+ //
+ try
+ {
+ p->close(Test::CloseMode::CloseForcefully);
+ test(false);
+ }
+ catch(const Ice::ConnectionLostException&)
+ {
+ // Expected.
+ }
+ }
+ cout << "ok" << endl;
+ }
}
p->shutdown();
@@ -2944,7 +3093,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
test(p->opBatchCount() == 0);
Test::TestIntfPrx b1 = p->ice_batchOneway();
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = b1->begin_ice_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
@@ -2962,7 +3111,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
test(p->opBatchCount() == 0);
Test::TestIntfPrx b1 = p->ice_batchOneway();
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback(cookie);
b1->begin_ice_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie);
@@ -3013,7 +3162,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
test(p->opBatchCount() == 0);
Test::TestIntfPrx b1 = p->ice_batchOneway();
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = b1->begin_ice_flushBatchRequests(
Ice::newCallback_Object_ice_flushBatchRequests(cb, &FlushCallback::exception,
@@ -3031,7 +3180,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
test(p->opBatchCount() == 0);
Test::TestIntfPrx b1 = p->ice_batchOneway();
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback(cookie);
b1->begin_ice_flushBatchRequests(
Ice::newCallback_Object_ice_flushBatchRequests(cb, &FlushCallback::exceptionWC,
@@ -3100,7 +3249,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback();
Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync));
@@ -3119,7 +3268,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback(cookie);
b1->ice_getConnection()->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync), cookie);
@@ -3171,7 +3320,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback();
Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(
Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exception,
@@ -3191,7 +3340,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback(cookie);
b1->ice_getConnection()->begin_flushBatchRequests(
Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exceptionWC,
@@ -3249,7 +3398,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
@@ -3268,7 +3417,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback(cookie);
communicator->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie);
@@ -3319,7 +3468,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->ice_getConnection(); // Ensure connection is established.
b1->opBatch();
b2->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
@@ -3346,8 +3495,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->ice_getConnection(); // Ensure connection is established.
b1->opBatch();
b2->opBatch();
- b1->ice_getConnection()->close(false);
- b2->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
@@ -3402,7 +3551,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
@@ -3422,7 +3571,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback(cookie);
communicator->begin_flushBatchRequests(
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exceptionWC,
@@ -3476,7 +3625,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->ice_getConnection(); // Ensure connection is established.
b1->opBatch();
b2->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
@@ -3504,8 +3653,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->ice_getConnection(); // Ensure connection is established.
b1->opBatch();
b2->opBatch();
- b1->ice_getConnection()->close(false);
- b2->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
@@ -3715,8 +3864,35 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
if(p->ice_getConnection() && protocol != "bt")
{
- cout << "testing close connection with sending queue... " << flush;
+ cout << "testing graceful close connection with wait... " << flush;
+ {
+ //
+ // Local case: begin several requests, close the connection gracefully, and make sure it waits
+ // for the requests to complete.
+ //
+ vector<Ice::AsyncResultPtr> results;
+ for(int i = 0; i < 3; ++i)
+ {
+ results.push_back(p->begin_sleep(50));
+ }
+ p->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ for(vector<Ice::AsyncResultPtr>::const_iterator q = results.begin(); q != results.end(); ++q)
+ {
+ (*q)->waitForCompleted();
+ try
+ {
+ (*q)->throwLocalException();
+ }
+ catch(const Ice::LocalException&)
+ {
+ test(false);
+ }
+ }
+ }
{
+ //
+ // Remote case.
+ //
Ice::ByteSeq seq;
seq.resize(1024 * 10);
for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q)
@@ -3740,7 +3916,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
{
results.push_back(p->begin_opWithPayload(seq));
}
- if(!p->begin_close(false)->isSent())
+ if(!p->begin_close(Test::CloseGracefullyAndWait)->isSent())
{
for(int i = 0; i < maxQueue; i++)
{
@@ -3775,6 +3951,96 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
}
}
cout << "ok" << endl;
+
+ cout << "testing graceful close connection without wait... " << flush;
+ {
+ //
+ // Local case: start a lengthy operation and then close the connection gracefully on the client side
+ // without waiting for the pending invocation to complete. There will be no retry and we expect the
+ // invocation to fail with ConnectionManuallyClosedException.
+ //
+ // This test requires two threads in the server's thread pool: one will block in sleep() and the other
+ // will process the CloseConnection message.
+ //
+ p->ice_ping();
+ Ice::ConnectionPtr con = p->ice_getConnection();
+ Ice::AsyncResultPtr r = p->begin_sleep(100);
+ con->close(Ice::CloseGracefully);
+ r->waitForCompleted();
+ try
+ {
+ r->throwLocalException();
+ test(false);
+ }
+ catch(const Ice::ConnectionManuallyClosedException& ex)
+ {
+ test(ex.graceful);
+ }
+
+ //
+ // Remote case: the server closes the connection gracefully. Our call to TestIntf::close()
+ // completes successfully and then the connection should be closed immediately afterward,
+ // despite the fact that there's a pending call to sleep(). The call to sleep() should be
+ // automatically retried and complete successfully.
+ //
+ p->ice_ping();
+ con = p->ice_getConnection();
+ CloseCallbackPtr cb = new CloseCallback;
+ con->setCloseCallback(cb);
+ r = p->begin_sleep(100);
+ p->close(Test::CloseGracefully);
+ cb->check();
+ r->waitForCompleted();
+ try
+ {
+ r->throwLocalException();
+ }
+ catch(const Ice::LocalException&)
+ {
+ test(false);
+ }
+ p->ice_ping();
+ test(p->ice_getConnection() != con);
+ }
+ cout << "ok" << endl;
+
+ cout << "testing forceful close connection... " << flush;
+ {
+ //
+ // Local case: start a lengthy operation and then close the connection forcefully on the client side.
+ // There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException.
+ //
+ p->ice_ping();
+ Ice::ConnectionPtr con = p->ice_getConnection();
+ Ice::AsyncResultPtr r = p->begin_sleep(100);
+ con->close(Ice::CloseForcefully);
+ r->waitForCompleted();
+ try
+ {
+ r->throwLocalException();
+ test(false);
+ }
+ catch(const Ice::ConnectionManuallyClosedException& ex)
+ {
+ test(!ex.graceful);
+ }
+
+ //
+ // Remote case: the server closes the connection forcefully. This causes the request to fail
+ // with a ConnectionLostException. Since the close() operation is not idempotent, the client
+ // will not retry.
+ //
+ try
+ {
+ p->close(Test::CloseForcefully);
+ test(false);
+ }
+ catch(const Ice::ConnectionLostException&)
+ {
+ // Expected.
+ }
+ }
+ cout << "ok" << endl;
}
p->shutdown();
diff --git a/cpp/test/Ice/ami/Client.cpp b/cpp/test/Ice/ami/Client.cpp
index 18fb26c44bb..c6915bc2557 100644
--- a/cpp/test/Ice/ami/Client.cpp
+++ b/cpp/test/Ice/ami/Client.cpp
@@ -37,6 +37,7 @@ main(int argc, char* argv[])
{
Ice::InitializationData initData = getTestInitData(argc, argv);
initData.properties->setProperty("Ice.Warn.AMICallback", "0");
+ initData.properties->setProperty("Ice.Warn.Connections", "0");
//
// Limit the send buffer size, this test relies on the socket
diff --git a/cpp/test/Ice/ami/Test.ice b/cpp/test/Ice/ami/Test.ice
index 289750a3eb2..c82910e4620 100644
--- a/cpp/test/Ice/ami/Test.ice
+++ b/cpp/test/Ice/ami/Test.ice
@@ -19,6 +19,13 @@ exception TestIntfException
{
};
+enum CloseMode
+{
+ CloseForcefully,
+ CloseGracefully,
+ CloseGracefullyAndWait
+};
+
interface TestIntf
{
void op();
@@ -34,7 +41,8 @@ interface TestIntf
out int eight, out int nine, out int ten, out int eleven);
int opBatchCount();
bool waitForBatch(int count);
- void close(bool force);
+ void close(CloseMode mode);
+ void sleep(int ms);
void shutdown();
bool supportsFunctionalTests();
diff --git a/cpp/test/Ice/ami/TestI.cpp b/cpp/test/Ice/ami/TestI.cpp
index f2dab5afd70..d7aa1d44762 100644
--- a/cpp/test/Ice/ami/TestI.cpp
+++ b/cpp/test/Ice/ami/TestI.cpp
@@ -93,9 +93,16 @@ TestIntfI::waitForBatch(Ice::Int count, const Ice::Current&)
}
void
-TestIntfI::close(bool force, const Ice::Current& current)
+TestIntfI::close(Test::CloseMode mode, const Ice::Current& current)
{
- current.con->close(force);
+ current.con->close(static_cast<ConnectionClose>(mode));
+}
+
+void
+TestIntfI::sleep(Ice::Int ms, const Ice::Current& current)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ timedWait(IceUtil::Time::milliSeconds(ms));
}
void
diff --git a/cpp/test/Ice/ami/TestI.h b/cpp/test/Ice/ami/TestI.h
index bee3571b74b..edde85f6008 100644
--- a/cpp/test/Ice/ami/TestI.h
+++ b/cpp/test/Ice/ami/TestI.h
@@ -32,7 +32,8 @@ public:
virtual void opWithArgs(Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&,
Ice::Int&, Ice::Int&, Ice::Int&, Ice::Int&, const Ice::Current&);
virtual bool waitForBatch(Ice::Int, const Ice::Current&);
- virtual void close(bool, const Ice::Current&);
+ virtual void close(Test::CloseMode, const Ice::Current&);
+ virtual void sleep(Ice::Int, const Ice::Current&);
virtual void shutdown(const Ice::Current&);
virtual bool supportsFunctionalTests(const Ice::Current&);
diff --git a/cpp/test/Ice/background/AllTests.cpp b/cpp/test/Ice/background/AllTests.cpp
index f5fdc059870..4d6b82904d6 100644
--- a/cpp/test/Ice/background/AllTests.cpp
+++ b/cpp/test/Ice/background/AllTests.cpp
@@ -379,7 +379,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
#ifdef ICE_CPP11_MAPPING
background->opAsync();
- background->ice_getCachedConnection()->close(true);
+ background->ice_getCachedConnection()->close(Ice::CloseForcefully);
background->opAsync();
vector<future<void>> results;
@@ -407,7 +407,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
}
#else
background->begin_op();
- background->ice_getCachedConnection()->close(true);
+ background->ice_getCachedConnection()->close(Ice::CloseForcefully);
background->begin_op();
vector<Ice::AsyncResultPtr> results;
@@ -452,7 +452,7 @@ connectTests(const ConfigurationPtr& configuration, const Test::BackgroundPrxPtr
{
test(false);
}
- background->ice_getConnection()->close(false);
+ background->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
int i;
for(i = 0; i < 4; ++i)
@@ -560,7 +560,7 @@ connectTests(const ConfigurationPtr& configuration, const Test::BackgroundPrxPtr
}
configuration->connectException(new Ice::SocketException(__FILE__, __LINE__));
- background->ice_getCachedConnection()->close(true);
+ background->ice_getCachedConnection()->close(Ice::CloseForcefully);
IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10));
configuration->connectException(0);
try
@@ -592,7 +592,7 @@ initializeTests(const ConfigurationPtr& configuration,
{
test(false);
}
- background->ice_getConnection()->close(false);
+ background->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
int i;
for(i = 0; i < 4; i++)
@@ -682,7 +682,7 @@ initializeTests(const ConfigurationPtr& configuration,
cerr << ex << endl;
test(false);
}
- background->ice_getConnection()->close(false);
+ background->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
try
{
@@ -695,7 +695,7 @@ initializeTests(const ConfigurationPtr& configuration,
cerr << ex << endl;
test(false);
}
- background->ice_getConnection()->close(false);
+ background->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
#endif
//
@@ -728,7 +728,7 @@ initializeTests(const ConfigurationPtr& configuration,
{
test(false);
}
- background->ice_getConnection()->close(false);
+ background->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
try
{
@@ -764,7 +764,7 @@ initializeTests(const ConfigurationPtr& configuration,
}
configuration->initializeException(new Ice::SocketException(__FILE__, __LINE__));
- background->ice_getCachedConnection()->close(true);
+ background->ice_getCachedConnection()->close(Ice::CloseForcefully);
IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10));
configuration->initializeException(0);
try
@@ -784,12 +784,12 @@ initializeTests(const ConfigurationPtr& configuration,
}
configuration->initializeSocketOperation(IceInternal::SocketOperationWrite);
- background->ice_getCachedConnection()->close(true);
+ background->ice_getCachedConnection()->close(Ice::CloseForcefully);
background->ice_ping();
configuration->initializeSocketOperation(IceInternal::SocketOperationNone);
ctl->initializeException(true);
- background->ice_getCachedConnection()->close(true);
+ background->ice_getCachedConnection()->close(Ice::CloseForcefully);
IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10));
ctl->initializeException(false);
try
@@ -812,11 +812,11 @@ initializeTests(const ConfigurationPtr& configuration,
{
#if !defined(ICE_USE_IOCP) && !defined(ICE_USE_CFSTREAM)
ctl->initializeSocketOperation(IceInternal::SocketOperationWrite);
- background->ice_getCachedConnection()->close(true);
+ background->ice_getCachedConnection()->close(Ice::CloseForcefully);
background->op();
ctl->initializeSocketOperation(IceInternal::SocketOperationNone);
#else
- background->ice_getCachedConnection()->close(true);
+ background->ice_getCachedConnection()->close(Ice::CloseForcefully);
background->op();
#endif
}
@@ -847,7 +847,7 @@ validationTests(const ConfigurationPtr& configuration,
{
test(false);
}
- background->ice_getConnection()->close(false);
+ background->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
try
{
@@ -921,7 +921,7 @@ validationTests(const ConfigurationPtr& configuration,
cerr << ex << endl;
test(false);
}
- background->ice_getConnection()->close(false);
+ background->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
try
{
@@ -1081,7 +1081,7 @@ validationTests(const ConfigurationPtr& configuration,
cerr << ex << endl;
test(false);
}
- background->ice_getConnection()->close(false);
+ background->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
try
{
@@ -1163,7 +1163,7 @@ validationTests(const ConfigurationPtr& configuration,
#else
backgroundBatchOneway->begin_ice_flushBatchRequests();
#endif
- backgroundBatchOneway->ice_getConnection()->close(false);
+ backgroundBatchOneway->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
ctl->holdAdapter();
backgroundBatchOneway->opWithPayload(seq);
@@ -1183,10 +1183,10 @@ validationTests(const ConfigurationPtr& configuration,
// in the flush to report a CloseConnectionException). Instead we
// wait for the first flush to complete.
//
- //backgroundBatchOneway->ice_getConnection()->close(false);
+ //backgroundBatchOneway->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
backgroundBatchOneway->end_ice_flushBatchRequests(r);
#endif
- backgroundBatchOneway->ice_getConnection()->close(false);
+ backgroundBatchOneway->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
void
@@ -1775,10 +1775,10 @@ readWriteTests(const ConfigurationPtr& configuration,
IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10));
background->ice_ping();
- background->ice_getCachedConnection()->close(true);
+ background->ice_getCachedConnection()->close(Ice::CloseForcefully);
IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(10));
- background->ice_getCachedConnection()->close(true);
+ background->ice_getCachedConnection()->close(Ice::CloseForcefully);
}
thread1->destroy();
diff --git a/cpp/test/Ice/binding/AllTests.cpp b/cpp/test/Ice/binding/AllTests.cpp
index a8d5f454a74..26e7fa48aa6 100644
--- a/cpp/test/Ice/binding/AllTests.cpp
+++ b/cpp/test/Ice/binding/AllTests.cpp
@@ -170,7 +170,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
test(test2->ice_getConnection() == test3->ice_getConnection());
names.erase(test1->getAdapterName());
- test1->ice_getConnection()->close(false);
+ test1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
//
@@ -192,7 +192,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
for(vector<RemoteObjectAdapterPrxPtr>::const_iterator q = adapters.begin(); q != adapters.end(); ++q)
{
- (*q)->getTestIntf()->ice_getConnection()->close(false);
+ (*q)->getTestIntf()->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
}
@@ -217,7 +217,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
test(test2->ice_getConnection() == test3->ice_getConnection());
names.erase(test1->getAdapterName());
- test1->ice_getConnection()->close(false);
+ test1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
//
@@ -314,7 +314,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
{
try
{
- (*q)->getTestIntf()->ice_getConnection()->close(false);
+ (*q)->getTestIntf()->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
catch(const Ice::LocalException&)
{
@@ -354,7 +354,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
test(test2->ice_getConnection() == test3->ice_getConnection());
names.erase(getAdapterNameWithAMI(test1));
- test1->ice_getConnection()->close(false);
+ test1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
//
@@ -376,7 +376,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
for(vector<RemoteObjectAdapterPrxPtr>::const_iterator q = adapters.begin(); q != adapters.end(); ++q)
{
- (*q)->getTestIntf()->ice_getConnection()->close(false);
+ (*q)->getTestIntf()->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
}
@@ -401,7 +401,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
test(test2->ice_getConnection() == test3->ice_getConnection());
names.erase(test1->getAdapterName());
- test1->ice_getConnection()->close(false);
+ test1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
//
@@ -433,7 +433,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
while(!names.empty())
{
names.erase(test->getAdapterName());
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
test = ICE_UNCHECKED_CAST(TestIntfPrx, test->ice_endpointSelection(Ice::Random));
@@ -445,7 +445,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
while(!names.empty())
{
names.erase(test->getAdapterName());
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
deactivate(com, adapters);
@@ -473,7 +473,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
#if TARGET_OS_IPHONE > 0
if(i != nRetry)
{
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
for(i = 0; i < nRetry && test->getAdapterName() == "Adapter31"; i++);
}
#endif
@@ -483,7 +483,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
#if TARGET_OS_IPHONE > 0
if(i != nRetry)
{
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
for(i = 0; i < nRetry && test->getAdapterName() == "Adapter32"; i++);
}
#endif
@@ -493,7 +493,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
#if TARGET_OS_IPHONE > 0
if(i != nRetry)
{
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
for(i = 0; i < nRetry && test->getAdapterName() == "Adapter33"; i++);
}
#endif
@@ -525,29 +525,29 @@ allTests(const Ice::CommunicatorPtr& communicator)
#if TARGET_OS_IPHONE > 0
if(i != nRetry)
{
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
for(i = 0; i < nRetry && test->getAdapterName() == "Adapter36"; i++);
}
#endif
test(i == nRetry);
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
adapters.push_back(com->createObjectAdapter("Adapter35", endpoints[1]->toString()));
for(i = 0; i < nRetry && test->getAdapterName() == "Adapter35"; i++);
#if TARGET_OS_IPHONE > 0
if(i != nRetry)
{
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
for(i = 0; i < nRetry && test->getAdapterName() == "Adapter35"; i++);
}
#endif
test(i == nRetry);
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
adapters.push_back(com->createObjectAdapter("Adapter34", endpoints[0]->toString()));
for(i = 0; i < nRetry && test->getAdapterName() == "Adapter34"; i++);
#if TARGET_OS_IPHONE > 0
if(i != nRetry)
{
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
for(i = 0; i < nRetry && test->getAdapterName() == "Adapter34"; i++);
}
#endif
@@ -865,7 +865,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
for(i = 0; i < 5; i++)
{
test(test->getAdapterName() == "Adapter82");
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
TestIntfPrxPtr testSecure = ICE_UNCHECKED_CAST(TestIntfPrx, test->ice_secure(true));
@@ -881,7 +881,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
for(i = 0; i < 5; i++)
{
test(test->getAdapterName() == "Adapter81");
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
com->createObjectAdapter("Adapter83", (test->ice_getEndpoints()[1])->toString()); // Reactive tcp OA.
@@ -889,7 +889,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
for(i = 0; i < 5; i++)
{
test(test->getAdapterName() == "Adapter83");
- test->ice_getConnection()->close(false);
+ test->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
com->deactivateObjectAdapter(adapters[0]);
@@ -1098,7 +1098,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
// Close the connection now to free a FD (it could be done after the sleep but
// there could be race condiutation since the connection might not be closed
// immediately due to threading).
- test->ice_connectionId("0")->ice_getConnection()->close(false);
+ test->ice_connectionId("0")->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
//
// The server closed the acceptor, wait one second and retry after freeing a FD.
diff --git a/cpp/test/Ice/hold/AllTests.cpp b/cpp/test/Ice/hold/AllTests.cpp
index d08764d57d8..7cb0e59a53c 100644
--- a/cpp/test/Ice/hold/AllTests.cpp
+++ b/cpp/test/Ice/hold/AllTests.cpp
@@ -290,7 +290,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
{
completed->get_future().get();
holdSerialized->ice_ping(); // Ensure everything's dispatched
- holdSerialized->ice_getConnection()->close(false);
+ holdSerialized->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
}
completed->get_future().get();
@@ -305,7 +305,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
{
result->waitForSent();
holdSerialized->ice_ping(); // Ensure everything's dispatched
- holdSerialized->ice_getConnection()->close(false);
+ holdSerialized->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
}
result->waitForCompleted();
diff --git a/cpp/test/Ice/location/AllTests.cpp b/cpp/test/Ice/location/AllTests.cpp
index 148230c9fbf..4e96a75a0d5 100644
--- a/cpp/test/Ice/location/AllTests.cpp
+++ b/cpp/test/Ice/location/AllTests.cpp
@@ -637,7 +637,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const string& ref)
cout << "testing object migration... " << flush;
hello = ICE_CHECKED_CAST(HelloPrx, communicator->stringToProxy("hello"));
obj->migrateHello();
- hello->ice_getConnection()->close(false);
+ hello->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
hello->sayHello();
obj->migrateHello();
hello->sayHello();
diff --git a/cpp/test/Ice/metrics/AllTests.cpp b/cpp/test/Ice/metrics/AllTests.cpp
index c880af199ce..8d30231b520 100644
--- a/cpp/test/Ice/metrics/AllTests.cpp
+++ b/cpp/test/Ice/metrics/AllTests.cpp
@@ -287,7 +287,7 @@ struct Connect
{
if(proxy->ice_getCachedConnection())
{
- proxy->ice_getCachedConnection()->close(false);
+ proxy->ice_getCachedConnection()->close(Ice::CloseGracefullyAndWait);
}
try
{
@@ -298,7 +298,7 @@ struct Connect
}
if(proxy->ice_getCachedConnection())
{
- proxy->ice_getCachedConnection()->close(false);
+ proxy->ice_getCachedConnection()->close(Ice::CloseGracefullyAndWait);
}
}
@@ -534,8 +534,8 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt
if(!collocated)
{
- metrics->ice_getConnection()->close(false);
- metrics->ice_connectionId("Con1")->ice_getConnection()->close(false);
+ metrics->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ metrics->ice_connectionId("Con1")->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
waitForCurrent(clientMetrics, "View", "Connection", 0);
waitForCurrent(serverMetrics, "View", "Connection", 0);
@@ -645,7 +645,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt
map = toMap(serverMetrics->getMetricsView("View", timestamp)["Connection"]);
test(map["holding"]->current == 1);
- metrics->ice_getConnection()->close(false);
+ metrics->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
map = toMap(clientMetrics->getMetricsView("View", timestamp)["Connection"]);
test(map["closing"]->current == 1);
@@ -660,7 +660,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt
props["IceMX.Metrics.View.Map.Connection.GroupBy"] = "none";
updateProps(clientProps, serverProps, update.get(), props, "Connection");
- metrics->ice_getConnection()->close(false);
+ metrics->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
metrics->ice_timeout(500)->ice_ping();
controller->hold();
@@ -717,7 +717,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt
testAttribute(clientMetrics, clientProps, update.get(), "Connection", "mcastHost", "");
testAttribute(clientMetrics, clientProps, update.get(), "Connection", "mcastPort", "");
- m->ice_getConnection()->close(false);
+ m->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
waitForCurrent(clientMetrics, "View", "Connection", 0);
waitForCurrent(serverMetrics, "View", "Connection", 0);
@@ -736,7 +736,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt
IceMX::MetricsPtr m1 = clientMetrics->getMetricsView("View", timestamp)["ConnectionEstablishment"][0];
test(m1->current == 0 && m1->total == 1 && m1->id == hostAndPort);
- metrics->ice_getConnection()->close(false);
+ metrics->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
controller->hold();
try
{
@@ -788,7 +788,7 @@ allTests(const Ice::CommunicatorPtr& communicator, const CommunicatorObserverIPt
try
{
prx->ice_ping();
- prx->ice_getConnection()->close(false);
+ prx->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
}
catch(const Ice::LocalException&)
{
diff --git a/cpp/test/Ice/metrics/TestAMDI.cpp b/cpp/test/Ice/metrics/TestAMDI.cpp
index 621bc976c1d..d5173e4380b 100644
--- a/cpp/test/Ice/metrics/TestAMDI.cpp
+++ b/cpp/test/Ice/metrics/TestAMDI.cpp
@@ -22,7 +22,7 @@ MetricsI::opAsync(function<void()> response, function<void(exception_ptr)>, cons
void
MetricsI::failAsync(function<void()> response, function<void(exception_ptr)>, const Ice::Current& current)
{
- current.con->close(true);
+ current.con->close(Ice::CloseForcefully);
response();
}
@@ -87,7 +87,7 @@ MetricsI::op_async(const Test::AMD_Metrics_opPtr& cb, const Ice::Current&)
void
MetricsI::fail_async(const Test::AMD_Metrics_failPtr& cb, const Ice::Current& current)
{
- current.con->close(true);
+ current.con->close(Ice::CloseForcefully);
cb->ice_response();
}
diff --git a/cpp/test/Ice/metrics/TestI.cpp b/cpp/test/Ice/metrics/TestI.cpp
index 24e7b9b8ec3..30f7a98bf35 100644
--- a/cpp/test/Ice/metrics/TestI.cpp
+++ b/cpp/test/Ice/metrics/TestI.cpp
@@ -18,7 +18,7 @@ MetricsI::op(const Ice::Current&)
void
MetricsI::fail(const Ice::Current& current)
{
- current.con->close(true);
+ current.con->close(Ice::CloseForcefully);
}
void
diff --git a/cpp/test/Ice/operations/BatchOneways.cpp b/cpp/test/Ice/operations/BatchOneways.cpp
index cc86c14a951..e3d261cf7e3 100644
--- a/cpp/test/Ice/operations/BatchOneways.cpp
+++ b/cpp/test/Ice/operations/BatchOneways.cpp
@@ -121,7 +121,7 @@ batchOneways(const Test::MyClassPrxPtr& p)
batch1->ice_ping();
batch2->ice_ping();
batch1->ice_flushBatchRequests();
- batch1->ice_getConnection()->close(false);
+ batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
batch1->ice_ping();
batch2->ice_ping();
@@ -129,7 +129,7 @@ batchOneways(const Test::MyClassPrxPtr& p)
batch2->ice_getConnection();
batch1->ice_ping();
- batch1->ice_getConnection()->close(false);
+ batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
batch1->ice_ping();
batch2->ice_ping();
}
diff --git a/cpp/test/Ice/operations/BatchOnewaysAMI.cpp b/cpp/test/Ice/operations/BatchOnewaysAMI.cpp
index 9adb2be5c7b..4545d8581cf 100644
--- a/cpp/test/Ice/operations/BatchOnewaysAMI.cpp
+++ b/cpp/test/Ice/operations/BatchOnewaysAMI.cpp
@@ -127,7 +127,7 @@ batchOnewaysAMI(const Test::MyClassPrxPtr& p)
batch1->ice_pingAsync().get();
batch2->ice_pingAsync().get();
batch1->ice_flushBatchRequestsAsync().get();
- batch1->ice_getConnection()->close(false);
+ batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
batch1->ice_pingAsync().get();
batch2->ice_pingAsync().get();
@@ -135,7 +135,7 @@ batchOnewaysAMI(const Test::MyClassPrxPtr& p)
batch2->ice_getConnection();
batch1->ice_pingAsync().get();
- batch1->ice_getConnection()->close(false);
+ batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
batch1->ice_pingAsync().get();
batch2->ice_pingAsync().get();
@@ -182,7 +182,7 @@ batchOnewaysAMI(const Test::MyClassPrxPtr& p)
batch1->end_ice_ping(batch1->begin_ice_ping());
batch2->end_ice_ping(batch2->begin_ice_ping());
batch1->end_ice_flushBatchRequests(batch1->begin_ice_flushBatchRequests());
- batch1->ice_getConnection()->close(false);
+ batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
batch1->end_ice_ping(batch1->begin_ice_ping());
batch2->end_ice_ping(batch2->begin_ice_ping());
@@ -190,7 +190,7 @@ batchOnewaysAMI(const Test::MyClassPrxPtr& p)
batch2->ice_getConnection();
batch1->end_ice_ping(batch1->begin_ice_ping());
- batch1->ice_getConnection()->close(false);
+ batch1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
batch1->end_ice_ping(batch1->begin_ice_ping());
batch2->end_ice_ping(batch2->begin_ice_ping());
diff --git a/cpp/test/Ice/retry/TestI.cpp b/cpp/test/Ice/retry/TestI.cpp
index f721a3b6ab7..7fd3c0d4dd4 100644
--- a/cpp/test/Ice/retry/TestI.cpp
+++ b/cpp/test/Ice/retry/TestI.cpp
@@ -22,7 +22,7 @@ RetryI::op(bool kill, const Ice::Current& current)
{
if(current.con)
{
- current.con->close(true);
+ current.con->close(Ice::CloseForcefully);
}
else
{
diff --git a/cpp/test/Ice/timeout/AllTests.cpp b/cpp/test/Ice/timeout/AllTests.cpp
index 6681a8f56df..1947f4e9aa1 100644
--- a/cpp/test/Ice/timeout/AllTests.cpp
+++ b/cpp/test/Ice/timeout/AllTests.cpp
@@ -294,7 +294,7 @@ allTests(const Ice::CommunicatorPtr& communicator)
TimeoutPrxPtr to = ICE_CHECKED_CAST(TimeoutPrx, obj->ice_timeout(250));
Ice::ConnectionPtr connection = to->ice_getConnection();
timeout->holdAdapter(600);
- connection->close(false);
+ connection->close(Ice::CloseGracefullyAndWait);
try
{
connection->getInfo(); // getInfo() doesn't throw in the closing state.
@@ -309,9 +309,10 @@ allTests(const Ice::CommunicatorPtr& communicator)
connection->getInfo();
test(false);
}
- catch(const Ice::CloseConnectionException&)
+ catch(const Ice::ConnectionManuallyClosedException& ex)
{
// Expected.
+ test(ex.graceful);
}
timeout->op(); // Ensure adapter is active.
}
diff --git a/cpp/test/Ice/udp/AllTests.cpp b/cpp/test/Ice/udp/AllTests.cpp
index e2e00f19da2..f16b10baa70 100644
--- a/cpp/test/Ice/udp/AllTests.cpp
+++ b/cpp/test/Ice/udp/AllTests.cpp
@@ -116,7 +116,7 @@ allTests(const CommunicatorPtr& communicator)
{
test(seq.size() > 16384);
}
- obj->ice_getConnection()->close(false);
+ obj->ice_getConnection()->close(CloseGracefullyAndWait);
communicator->getProperties()->setProperty("Ice.UDP.SndSize", "64000");
seq.resize(50000);
try
diff --git a/cpp/test/IceSSL/configuration/AllTests.cpp b/cpp/test/IceSSL/configuration/AllTests.cpp
index 96d14174b36..96c1e527cb7 100644
--- a/cpp/test/IceSSL/configuration/AllTests.cpp
+++ b/cpp/test/IceSSL/configuration/AllTests.cpp
@@ -1585,7 +1585,7 @@ allTests(const CommunicatorPtr& communicator, const string& testDir, bool p12)
//
verifier->reset();
verifier->returnValue(false);
- server->ice_getConnection()->close(false);
+ server->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
try
{
server->ice_ping();
diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp
index d43c2469b28..4722fe93caf 100644
--- a/cpp/test/IceStorm/stress/Subscriber.cpp
+++ b/cpp/test/IceStorm/stress/Subscriber.cpp
@@ -163,7 +163,7 @@ public:
if(!_done && (IceUtilInternal::random(10) == 1 || ++_count == _total))
{
_done = true;
- current.con->close(true);
+ current.con->close(CloseForcefully);
// Deactivate the OA. This ensures that the subscribers
// that have subscribed with oneway QoS will be booted.
current.adapter->deactivate();