diff options
Diffstat (limited to 'cpp/test/Ice/ami/AllTests.cpp')
-rw-r--r-- | cpp/test/Ice/ami/AllTests.cpp | 322 |
1 files changed, 294 insertions, 28 deletions
diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp index 439f8a7792f..3366c4fdc7e 100644 --- a/cpp/test/Ice/ami/AllTests.cpp +++ b/cpp/test/Ice/ami/AllTests.cpp @@ -773,6 +773,21 @@ private: }; typedef IceUtil::Handle<FlushExCallback> FlushExCallbackPtr; +class CloseCallback : virtual public CallbackBase, virtual public Ice::CloseCallback +{ +public: + + CloseCallback() + { + } + + virtual void closed(const Ice::ConnectionPtr& con) + { + called(); + } +}; +typedef IceUtil::Handle<CloseCallback> CloseCallbackPtr; + class Thrower : public CallbackBase { public: @@ -1850,7 +1865,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(p->opBatchCount() == 0); auto b1 = p->ice_batchOneway(); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); auto id = this_thread::get_id(); promise<void> promise; @@ -1924,7 +1939,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) auto b1 = Ice::uncheckedCast<Test::TestIntfPrx>( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); promise<void> promise; b1->ice_getConnection()->flushBatchRequestsAsync( @@ -2002,7 +2017,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) auto b1 = Ice::uncheckedCast<Test::TestIntfPrx>( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); promise<void> promise; auto id = this_thread::get_id(); @@ -2072,8 +2087,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->ice_getConnection(); // Ensure connection is established. b1->opBatch(); b2->opBatch(); - b1->ice_getConnection()->close(false); - b2->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait); promise<void> promise; auto id = this_thread::get_id(); @@ -2161,8 +2176,38 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) if(p->ice_getConnection() && protocol != "bt") { - cout << "testing close connection with sending queue... " << flush; + cout << "testing graceful close connection with wait... " << flush; + { + // + // Local case: begin several requests, close the connection gracefully, and make sure it waits + // for the requests to complete. + // + vector<future<void>> results; + for(int i = 0; i < 3; ++i) + { + auto s = make_shared<promise<void>>(); + p->sleepAsync(50, + [s]() { s->set_value(); }, + [s](exception_ptr ex) { s->set_exception(ex); }); + results.push_back(s->get_future()); + } + p->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + for(vector<future<void>>::iterator p = results.begin(); p != results.end(); ++p) + { + try + { + p->get(); + } + catch(const Ice::LocalException&) + { + test(false); + } + } + } { + // + // Remote case. + // Ice::ByteSeq seq; seq.resize(1024 * 10); for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q) @@ -2191,7 +2236,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) results.push_back(s->get_future()); } atomic_flag sent = ATOMIC_FLAG_INIT; - p->closeAsync(false, nullptr, nullptr, [&sent](bool) { sent.test_and_set(); }); + p->closeAsync(Test::CloseMode::CloseGracefullyAndWait, nullptr, nullptr, + [&sent](bool) { sent.test_and_set(); }); if(!sent.test_and_set()) { for(int i = 0; i < maxQueue; i++) @@ -2231,8 +2277,111 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) } } cout << "ok" << endl; - } + cout << "testing graceful close connection without wait... " << flush; + { + // + // Local case: start a lengthy operation and then close the connection gracefully on the client side + // without waiting for the pending invocation to complete. There will be no retry and we expect the + // invocation to fail with ConnectionManuallyClosedException. + // + // This test requires two threads in the server's thread pool: one will block in sleep() and the other + // will process the CloseConnection message. + // + p->ice_ping(); + auto con = p->ice_getConnection(); + auto s = make_shared<promise<void>>(); + p->sleepAsync(100, + [s]() { s->set_value(); }, + [s](exception_ptr ex) { s->set_exception(ex); }); + future<void> f = s->get_future(); + con->close(Ice::ConnectionClose::CloseGracefully); + try + { + f.get(); + test(false); + } + catch(const Ice::ConnectionManuallyClosedException& ex) + { + test(ex.graceful); + } + + // + // Remote case: the server closes the connection gracefully. Our call to TestIntf::close() + // completes successfully and then the connection should be closed immediately afterward, + // despite the fact that there's a pending call to sleep(). The call to sleep() should be + // automatically retried and complete successfully with a new connection. + // + p->ice_ping(); + con = p->ice_getConnection(); + auto sc = make_shared<promise<void>>(); + con->setCloseCallback( + [sc](Ice::ConnectionPtr connection) + { + sc->set_value(); + }); + future<void> fc = sc->get_future(); + s = make_shared<promise<void>>(); + p->sleepAsync(100, + [s]() { s->set_value(); }, + [s](exception_ptr ex) { s->set_exception(ex); }); + f = s->get_future(); + p->close(Test::CloseMode::CloseGracefully); + fc.get(); + try + { + f.get(); + } + catch(const Ice::LocalException&) + { + test(false); + } + p->ice_ping(); + test(p->ice_getConnection() != con); + } + cout << "ok" << endl; + + cout << "testing forceful close connection... " << flush; + { + // + // Local case: start a lengthy operation and then close the connection forcefully on the client side. + // There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException. + // + p->ice_ping(); + auto con = p->ice_getConnection(); + auto s = make_shared<promise<void>>(); + p->sleepAsync(100, + [s]() { s->set_value(); }, + [s](exception_ptr ex) { s->set_exception(ex); }); + future<void> f = s->get_future(); + con->close(Ice::ConnectionClose::CloseForcefully); + try + { + f.get(); + test(false); + } + catch(const Ice::ConnectionManuallyClosedException& ex) + { + test(!ex.graceful); + } + + // + // Remote case: the server closes the connection forcefully. This causes the request to fail + // with a ConnectionLostException. Since the close() operation is not idempotent, the client + // will not retry. + // + try + { + p->close(Test::CloseMode::CloseForcefully); + test(false); + } + catch(const Ice::ConnectionLostException&) + { + // Expected. + } + } + cout << "ok" << endl; + } } p->shutdown(); @@ -2944,7 +3093,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(p->opBatchCount() == 0); Test::TestIntfPrx b1 = p->ice_batchOneway(); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = b1->begin_ice_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); @@ -2962,7 +3111,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(p->opBatchCount() == 0); Test::TestIntfPrx b1 = p->ice_batchOneway(); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(cookie); b1->begin_ice_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie); @@ -3013,7 +3162,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(p->opBatchCount() == 0); Test::TestIntfPrx b1 = p->ice_batchOneway(); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = b1->begin_ice_flushBatchRequests( Ice::newCallback_Object_ice_flushBatchRequests(cb, &FlushCallback::exception, @@ -3031,7 +3180,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) test(p->opBatchCount() == 0); Test::TestIntfPrx b1 = p->ice_batchOneway(); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(cookie); b1->begin_ice_flushBatchRequests( Ice::newCallback_Object_ice_flushBatchRequests(cb, &FlushCallback::exceptionWC, @@ -3100,7 +3249,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(); Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests( Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync)); @@ -3119,7 +3268,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(cookie); b1->ice_getConnection()->begin_flushBatchRequests( Ice::newCallback(cb, &FlushExCallback::completedAsync, &FlushExCallback::sentAsync), cookie); @@ -3171,7 +3320,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(); Ice::AsyncResultPtr r = b1->ice_getConnection()->begin_flushBatchRequests( Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exception, @@ -3191,7 +3340,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushExCallbackPtr cb = new FlushExCallback(cookie); b1->ice_getConnection()->begin_flushBatchRequests( Ice::newCallback_Connection_flushBatchRequests(cb, &FlushExCallback::exceptionWC, @@ -3249,7 +3398,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); @@ -3268,7 +3417,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(cookie); communicator->begin_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync), cookie); @@ -3319,7 +3468,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->ice_getConnection(); // Ensure connection is established. b1->opBatch(); b2->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); @@ -3346,8 +3495,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->ice_getConnection(); // Ensure connection is established. b1->opBatch(); b2->opBatch(); - b1->ice_getConnection()->close(false); - b2->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback(cb, &FlushCallback::completedAsync, &FlushCallback::sentAsync)); @@ -3402,7 +3551,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, @@ -3422,7 +3571,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) Test::TestIntfPrx b1 = Test::TestIntfPrx::uncheckedCast( p->ice_getConnection()->createProxy(p->ice_getIdentity())->ice_batchOneway()); b1->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(cookie); communicator->begin_flushBatchRequests( Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exceptionWC, @@ -3476,7 +3625,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->ice_getConnection(); // Ensure connection is established. b1->opBatch(); b2->opBatch(); - b1->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, @@ -3504,8 +3653,8 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) b2->ice_getConnection(); // Ensure connection is established. b1->opBatch(); b2->opBatch(); - b1->ice_getConnection()->close(false); - b2->ice_getConnection()->close(false); + b1->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + b2->ice_getConnection()->close(Ice::CloseGracefullyAndWait); FlushCallbackPtr cb = new FlushCallback(); Ice::AsyncResultPtr r = communicator->begin_flushBatchRequests( Ice::newCallback_Communicator_flushBatchRequests(cb, &FlushCallback::exception, @@ -3715,8 +3864,35 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) if(p->ice_getConnection() && protocol != "bt") { - cout << "testing close connection with sending queue... " << flush; + cout << "testing graceful close connection with wait... " << flush; + { + // + // Local case: begin several requests, close the connection gracefully, and make sure it waits + // for the requests to complete. + // + vector<Ice::AsyncResultPtr> results; + for(int i = 0; i < 3; ++i) + { + results.push_back(p->begin_sleep(50)); + } + p->ice_getConnection()->close(Ice::CloseGracefullyAndWait); + for(vector<Ice::AsyncResultPtr>::const_iterator q = results.begin(); q != results.end(); ++q) + { + (*q)->waitForCompleted(); + try + { + (*q)->throwLocalException(); + } + catch(const Ice::LocalException&) + { + test(false); + } + } + } { + // + // Remote case. + // Ice::ByteSeq seq; seq.resize(1024 * 10); for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q) @@ -3740,7 +3916,7 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) { results.push_back(p->begin_opWithPayload(seq)); } - if(!p->begin_close(false)->isSent()) + if(!p->begin_close(Test::CloseGracefullyAndWait)->isSent()) { for(int i = 0; i < maxQueue; i++) { @@ -3775,6 +3951,96 @@ allTests(const Ice::CommunicatorPtr& communicator, bool collocated) } } cout << "ok" << endl; + + cout << "testing graceful close connection without wait... " << flush; + { + // + // Local case: start a lengthy operation and then close the connection gracefully on the client side + // without waiting for the pending invocation to complete. There will be no retry and we expect the + // invocation to fail with ConnectionManuallyClosedException. + // + // This test requires two threads in the server's thread pool: one will block in sleep() and the other + // will process the CloseConnection message. + // + p->ice_ping(); + Ice::ConnectionPtr con = p->ice_getConnection(); + Ice::AsyncResultPtr r = p->begin_sleep(100); + con->close(Ice::CloseGracefully); + r->waitForCompleted(); + try + { + r->throwLocalException(); + test(false); + } + catch(const Ice::ConnectionManuallyClosedException& ex) + { + test(ex.graceful); + } + + // + // Remote case: the server closes the connection gracefully. Our call to TestIntf::close() + // completes successfully and then the connection should be closed immediately afterward, + // despite the fact that there's a pending call to sleep(). The call to sleep() should be + // automatically retried and complete successfully. + // + p->ice_ping(); + con = p->ice_getConnection(); + CloseCallbackPtr cb = new CloseCallback; + con->setCloseCallback(cb); + r = p->begin_sleep(100); + p->close(Test::CloseGracefully); + cb->check(); + r->waitForCompleted(); + try + { + r->throwLocalException(); + } + catch(const Ice::LocalException&) + { + test(false); + } + p->ice_ping(); + test(p->ice_getConnection() != con); + } + cout << "ok" << endl; + + cout << "testing forceful close connection... " << flush; + { + // + // Local case: start a lengthy operation and then close the connection forcefully on the client side. + // There will be no retry and we expect the invocation to fail with ConnectionManuallyClosedException. + // + p->ice_ping(); + Ice::ConnectionPtr con = p->ice_getConnection(); + Ice::AsyncResultPtr r = p->begin_sleep(100); + con->close(Ice::CloseForcefully); + r->waitForCompleted(); + try + { + r->throwLocalException(); + test(false); + } + catch(const Ice::ConnectionManuallyClosedException& ex) + { + test(!ex.graceful); + } + + // + // Remote case: the server closes the connection forcefully. This causes the request to fail + // with a ConnectionLostException. Since the close() operation is not idempotent, the client + // will not retry. + // + try + { + p->close(Test::CloseForcefully); + test(false); + } + catch(const Ice::ConnectionLostException&) + { + // Expected. + } + } + cout << "ok" << endl; } p->shutdown(); |