summaryrefslogtreecommitdiff
path: root/cpp/test/Ice/ami/AllTests.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/test/Ice/ami/AllTests.cpp')
-rw-r--r--cpp/test/Ice/ami/AllTests.cpp322
1 files changed, 294 insertions, 28 deletions
diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp
index 439f8a7792f..3366c4fdc7e 100644
--- a/cpp/test/Ice/ami/AllTests.cpp
+++ b/cpp/test/Ice/ami/AllTests.cpp
@@ -773,6 +773,21 @@ private:
};
typedef IceUtil::Handle<FlushExCallback> FlushExCallbackPtr;
+class CloseCallback : virtual public CallbackBase, virtual public Ice::CloseCallback
+{
+public:
+
+ CloseCallback()
+ {
+ }
+
+ virtual void closed(const Ice::ConnectionPtr& con)
+ {
+ called();
+ }
+};
+typedef IceUtil::Handle<CloseCallback> CloseCallbackPtr;
+
class Thrower : public CallbackBase
{
public:
@@ -1850,7 +1865,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
test(p->opBatchCount() == 0);
auto b1 = p->ice_batchOneway();
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
auto id = this_thread::get_id();
promise<void> promise;
@@ -1924,7 +1939,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
auto b1 = Ice::uncheckedCast<Test::TestIntfPrx>(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
promise<void> promise;
b1->ice_getConnection()->flushBatchRequestsAsync(
@@ -2002,7 +2017,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
auto b1 = Ice::uncheckedCast<Test::TestIntfPrx>(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
promise<void> promise;
auto id = this_thread::get_id();
@@ -2072,8 +2087,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->ice_getConnection(); // Ensure connection is established.
b1->opBatch();
b2->opBatch();
- b1->ice_getConnection()->close(false);
- b2->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
promise<void> promise;
auto id = this_thread::get_id();
@@ -2161,8 +2176,38 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
if(p->ice_getConnection() && protocol != "bt")
{
- cout << "testing close connection with sending queue... " << flush;
+ cout << "testing graceful close connection with wait... " << flush;
+ {
+ //
+ // Local case: begin several requests, close the connection gracefully, and make sure it waits
+ // for the requests to complete.
+ //
+ vector<future<void>> results;
+ for(int i = 0; i < 3; ++i)
+ {
+ auto s = make_shared<promise<void>>();
+ p->sleepAsync(50,
+ [s]() { s->set_value(); },
+ [s](exception_ptr ex) { s->set_exception(ex); });
+ results.push_back(s->get_future());
+ }
+ p->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ for(vector<future<void>>::iterator p = results.begin(); p != results.end(); ++p)
+ {
+ try
+ {
+ p->get();
+ }
+ catch(const Ice::LocalException&)
+ {
+ test(false);
+ }
+ }
+ }
{
+ //
+ // Remote case.
+ //
Ice::ByteSeq seq;
seq.resize(1024 * 10);
for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q)
@@ -2191,7 +2236,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
results.push_back(s->get_future());
}
atomic_flag sent = ATOMIC_FLAG_INIT;
- p->closeAsync(false, nullptr, nullptr, [&sent](bool) { sent.test_and_set(); });
+ p->closeAsync(Test::CloseMode::CloseGracefullyAndWait, nullptr, nullptr,
+ [&sent](bool) { sent.test_and_set(); });
if(!sent.test_and_set())
{
for(int i = 0; i < maxQueue; i++)
@@ -2231,8 +2277,111 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
}
}
cout << "ok" << endl;
- }
+ cout << "testing graceful close connection without wait... " << flush;
+ {
+ //
+ // Local case: start a lengthy operation and then close the connection gracefully on the client side
+ // without waiting for the pending invocation to complete. There will be no retry and we expect the
+ // invocation to fail with ConnectionManuallyClosedException.
+ //
+ // This test requires two threads in the server's thread pool: one will block in sleep() and the other
+ // will process the CloseConnection message.
+ //
+ p->ice_ping();
+ auto con = p->ice_getConnection();
+ auto s = make_shared<promise<void>>();
+ p->sleepAsync(100,
+ [s]() { s->set_value(); },
+ [s](exception_ptr ex) { s->set_exception(ex); });
+ future<void> f = s->get_future();
+ con->close(Ice::ConnectionClose::CloseGracefully);
+ try
+ {
+ f.get();
+ test(false);
+ }
+ catch(const Ice::ConnectionManuallyClosedException& ex)
+ {
+ test(ex.graceful);
+ }
+
+ //
+ // Remote case: the server closes the connection gracefully. Our call to TestIntf::close()
+ // completes successfully and then the connection should be closed immediately afterward,
+ // despite the fact that there's a pending call to sleep(). The call to sleep() should be
+ // automatically retried and complete successfully with a new connection.
+ //
+ p->ice_ping();
+ con = p->ice_getConnection();
+ auto sc = make_shared<promise<void>>();
+ con->setCloseCallback(
+ [sc](Ice::ConnectionPtr connection)
+ {
+ sc->set_value();
+ });
+ future<void> fc = sc->get_future();
+ s = make_shared<promise<void>>();
+ p->sleepAsync(100,
+ [s]() { s->set_value(); },
+ [s](exception_ptr ex) { s->set_exception(ex); });
+ f = s->get_future();
+ p->close(Test::CloseMode::CloseGracefully);
+ fc.get();
+ try
+ {
+ f.get();
+ }
+ catch(const Ice::LocalException&)
+ {
+ test(false);
+ }
+ p->ice_ping();
+ test(p->ice_getConnection() != con);
+ }
+ cout << "ok" << endl;
+
+ cout << "testing forceful close connection... " << flush;
+ {
+ //
+ // Local case: start a lengthy operation and then close the connection forcefully on the client side.
+ // There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException.
+ //
+ p->ice_ping();
+ auto con = p->ice_getConnection();
+ auto s = make_shared<promise<void>>();
+ p->sleepAsync(100,
+ [s]() { s->set_value(); },
+ [s](exception_ptr ex) { s->set_exception(ex); });
+ future<void> f = s->get_future();
+ con->close(Ice::ConnectionClose::CloseForcefully);
+ try
+ {
+ f.get();
+ test(false);
+ }
+ catch(const Ice::ConnectionManuallyClosedException& ex)
+ {
+ test(!ex.graceful);
+ }
+
+ //
+ // Remote case: the server closes the connection forcefully. This causes the request to fail
+ // with a ConnectionLostException. Since the close() operation is not idempotent, the client
+ // will not retry.
+ //
+ try
+ {
+ p->close(Test::CloseMode::CloseForcefully);
+ test(false);
+ }
+ catch(const Ice::ConnectionLostException&)
+ {
+ // Expected.
+ }
+ }
+ cout << "ok" << endl;
+ }
}
p->shutdown();
@@ -2944,7 +3093,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
test(p->opBatchCount() == 0);
Test::TestIntfPrx b1 = p->ice_batchOneway();
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = b1->begin_ice_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
@@ -2962,7 +3111,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
test(p->opBatchCount() == 0);
Test::TestIntfPrx b1 = p->ice_batchOneway();
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback(cookie);
b1->begin_ice_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie);
@@ -3013,7 +3162,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
test(p->opBatchCount() == 0);
Test::TestIntfPrx b1 = p->ice_batchOneway();
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = b1->begin_ice_flushBatchRequests(
Ice::newCallback_Object_ice_flushBatchRequests(cb, &FlushCallback::exception,
@@ -3031,7 +3180,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
test(p->opBatchCount() == 0);
Test::TestIntfPrx b1 = p->ice_batchOneway();
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback(cookie);
b1->begin_ice_flushBatchRequests(
Ice::newCallback_Object_ice_flushBatchRequests(cb, &FlushCallback::exceptionWC,
@@ -3100,7 +3249,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback();
Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync));
@@ -3119,7 +3268,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback(cookie);
b1->ice_getConnection()->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync), cookie);
@@ -3171,7 +3320,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback();
Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests(
Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exception,
@@ -3191,7 +3340,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushExCallbackPtr cb = new FlushExCallback(cookie);
b1->ice_getConnection()->begin_flushBatchRequests(
Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exceptionWC,
@@ -3249,7 +3398,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
@@ -3268,7 +3417,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback(cookie);
communicator->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie);
@@ -3319,7 +3468,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->ice_getConnection(); // Ensure connection is established.
b1->opBatch();
b2->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
@@ -3346,8 +3495,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->ice_getConnection(); // Ensure connection is established.
b1->opBatch();
b2->opBatch();
- b1->ice_getConnection()->close(false);
- b2->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync));
@@ -3402,7 +3551,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
@@ -3422,7 +3571,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast(
p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway());
b1->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback(cookie);
communicator->begin_flushBatchRequests(
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exceptionWC,
@@ -3476,7 +3625,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->ice_getConnection(); // Ensure connection is established.
b1->opBatch();
b2->opBatch();
- b1->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
@@ -3504,8 +3653,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
b2->ice_getConnection(); // Ensure connection is established.
b1->opBatch();
b2->opBatch();
- b1->ice_getConnection()->close(false);
- b2->ice_getConnection()->close(false);
+ b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
FlushCallbackPtr cb = new FlushCallback();
Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests(
Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception,
@@ -3715,8 +3864,35 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
if(p->ice_getConnection() && protocol != "bt")
{
- cout << "testing close connection with sending queue... " << flush;
+ cout << "testing graceful close connection with wait... " << flush;
+ {
+ //
+ // Local case: begin several requests, close the connection gracefully, and make sure it waits
+ // for the requests to complete.
+ //
+ vector<Ice::AsyncResultPtr> results;
+ for(int i = 0; i < 3; ++i)
+ {
+ results.push_back(p->begin_sleep(50));
+ }
+ p->ice_getConnection()->close(Ice::CloseGracefullyAndWait);
+ for(vector<Ice::AsyncResultPtr>::const_iterator q = results.begin(); q != results.end(); ++q)
+ {
+ (*q)->waitForCompleted();
+ try
+ {
+ (*q)->throwLocalException();
+ }
+ catch(const Ice::LocalException&)
+ {
+ test(false);
+ }
+ }
+ }
{
+ //
+ // Remote case.
+ //
Ice::ByteSeq seq;
seq.resize(1024 * 10);
for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q)
@@ -3740,7 +3916,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
{
results.push_back(p->begin_opWithPayload(seq));
}
- if(!p->begin_close(false)->isSent())
+ if(!p->begin_close(Test::CloseGracefullyAndWait)->isSent())
{
for(int i = 0; i < maxQueue; i++)
{
@@ -3775,6 +3951,96 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated)
}
}
cout << "ok" << endl;
+
+ cout << "testing graceful close connection without wait... " << flush;
+ {
+ //
+ // Local case: start a lengthy operation and then close the connection gracefully on the client side
+ // without waiting for the pending invocation to complete. There will be no retry and we expect the
+ // invocation to fail with ConnectionManuallyClosedException.
+ //
+ // This test requires two threads in the server's thread pool: one will block in sleep() and the other
+ // will process the CloseConnection message.
+ //
+ p->ice_ping();
+ Ice::ConnectionPtr con = p->ice_getConnection();
+ Ice::AsyncResultPtr r = p->begin_sleep(100);
+ con->close(Ice::CloseGracefully);
+ r->waitForCompleted();
+ try
+ {
+ r->throwLocalException();
+ test(false);
+ }
+ catch(const Ice::ConnectionManuallyClosedException& ex)
+ {
+ test(ex.graceful);
+ }
+
+ //
+ // Remote case: the server closes the connection gracefully. Our call to TestIntf::close()
+ // completes successfully and then the connection should be closed immediately afterward,
+ // despite the fact that there's a pending call to sleep(). The call to sleep() should be
+ // automatically retried and complete successfully.
+ //
+ p->ice_ping();
+ con = p->ice_getConnection();
+ CloseCallbackPtr cb = new CloseCallback;
+ con->setCloseCallback(cb);
+ r = p->begin_sleep(100);
+ p->close(Test::CloseGracefully);
+ cb->check();
+ r->waitForCompleted();
+ try
+ {
+ r->throwLocalException();
+ }
+ catch(const Ice::LocalException&)
+ {
+ test(false);
+ }
+ p->ice_ping();
+ test(p->ice_getConnection() != con);
+ }
+ cout << "ok" << endl;
+
+ cout << "testing forceful close connection... " << flush;
+ {
+ //
+ // Local case: start a lengthy operation and then close the connection forcefully on the client side.
+ // There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException.
+ //
+ p->ice_ping();
+ Ice::ConnectionPtr con = p->ice_getConnection();
+ Ice::AsyncResultPtr r = p->begin_sleep(100);
+ con->close(Ice::CloseForcefully);
+ r->waitForCompleted();
+ try
+ {
+ r->throwLocalException();
+ test(false);
+ }
+ catch(const Ice::ConnectionManuallyClosedException& ex)
+ {
+ test(!ex.graceful);
+ }
+
+ //
+ // Remote case: the server closes the connection forcefully. This causes the request to fail
+ // with a ConnectionLostException. Since the close() operation is not idempotent, the client
+ // will not retry.
+ //
+ try
+ {
+ p->close(Test::CloseForcefully);
+ test(false);
+ }
+ catch(const Ice::ConnectionLostException&)
+ {
+ // Expected.
+ }
+ }
+ cout << "ok" << endl;
}
p->shutdown();