diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 67 | ||||
-rw-r--r-- | cpp/test/Ice/ami/AllTests.cpp | 60 | ||||
-rw-r--r-- | cpp/test/Ice/ami/Test.ice | 1 | ||||
-rw-r--r-- | cpp/test/Ice/ami/TestI.cpp | 6 | ||||
-rw-r--r-- | cpp/test/Ice/ami/TestI.h | 1 |
5 files changed, 108 insertions, 27 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 310b9874f02..da9cf31f1fa 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -1348,14 +1348,18 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) { assert(_state <= StateClosing); - if(current.operation & SocketOperationWrite) + // + // We parse messages first, if we receive a close + // connection message we won't send more messages. + // + if(current.operation & SocketOperationRead) { - sendNextMessage(sentCBs); + parseMessage(current.stream, invokeNum, requestId, compress, servantManager, adapter, outAsync); } - if(current.operation & SocketOperationRead) + if(current.operation & SocketOperationWrite) { - parseMessage(current.stream, invokeNum, requestId, compress, servantManager, adapter, outAsync); + sendNextMessage(sentCBs); } // @@ -1586,33 +1590,34 @@ Ice::ConnectionI::finish() if(!_sendStreams.empty()) { - assert(!_writeStream.b.empty()); - - // - // Return the stream to the outgoing call. This is important for - // retriable AMI calls which are not marshalled again. - // - OutgoingMessage* message = &_sendStreams.front(); - _writeStream.swap(*message->stream); - -#ifdef ICE_USE_IOCP - // - // The current message might be sent but not yet removed from _sendStreams. If - // the response has been received in the meantime, we remove the message from - // _sendStreams to not call finished on a message which is already done. - // - if(message->requestId > 0 && - (message->out && _requests.find(message->requestId) == _requests.end() || - message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end())) + if(!_writeStream.b.empty()) { - if(message->sent(this, true)) + // + // Return the stream to the outgoing call. This is important for + // retriable AMI calls which are not marshalled again. + // + OutgoingMessage* message = &_sendStreams.front(); + _writeStream.swap(*message->stream); + +#ifdef ICE_USE_IOCP + // + // The current message might be sent but not yet removed from _sendStreams. If + // the response has been received in the meantime, we remove the message from + // _sendStreams to not call finished on a message which is already done. + // + if(message->requestId > 0 && + (message->out && _requests.find(message->requestId) == _requests.end() || + message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end())) { - assert(message->outAsync); - message->outAsync->__sent(); + if(message->sent(this, true)) + { + assert(message->outAsync); + message->outAsync->__sent(); + } + _sendStreams.pop_front(); } - _sendStreams.pop_front(); - } #endif + } for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { @@ -2285,6 +2290,14 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb } // + // If we are in the closed state, don't continue sending. + // + if(_state >= StateClosed) + { + break; + } + + // // Otherwise, prepare the next message stream for writing. // message = &_sendStreams.front(); diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp index de4ebee8efb..8fd66031174 100644 --- a/cpp/test/Ice/ami/AllTests.cpp +++ b/cpp/test/Ice/ami/AllTests.cpp @@ -2100,5 +2100,65 @@ allTests(const Ice::CommunicatorPtr& communicator) } cout << "ok" << endl; + cout << "testing close connection with sending queue... " << flush; + { + Ice::ByteSeq seq; + seq.resize(1024 * 10); // Make sure the request doesn't compress too well. + for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q) + { + *q = static_cast<Ice::Byte>(IceUtilInternal::random(255)); + } + + // + // Send multiple opWithPayload, followed by a close and followed by multiple opWithPaylod. + // The goal is to make sure that none of the opWithPayload fail even if the server closes + // the connection gracefully in between. + // + int maxQueue = 2; + bool done = false; + while(!done && maxQueue < 50) + { + done = true; + p->ice_ping(); + vector<Ice::AsyncResultPtr> results; + for(int i = 0; i < maxQueue; ++i) + { + results.push_back(p->begin_opWithPayload(seq)); + } + if(!p->begin_close(false)->isSent()) + { + for(int i = 0; i < maxQueue; i++) + { + Ice::AsyncResultPtr r = p->begin_opWithPayload(seq); + results.push_back(r); + if(r->isSent()) + { + done = false; + maxQueue *= 2; + break; + } + } + } + else + { + maxQueue *= 2; + done = false; + } + for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p) + { + (*p)->waitForCompleted(); + try + { + (*p)->throwLocalException(); + } + catch(const Ice::LocalException&) + { + test(false); + } + } + } + } + cout << "ok" << endl; + p->shutdown(); } diff --git a/cpp/test/Ice/ami/Test.ice b/cpp/test/Ice/ami/Test.ice index 60f55c1ca8d..243a996db8f 100644 --- a/cpp/test/Ice/ami/Test.ice +++ b/cpp/test/Ice/ami/Test.ice @@ -29,6 +29,7 @@ interface TestIntf void opBatch(); int opBatchCount(); bool waitForBatch(int count); + void close(bool force); void shutdown(); }; diff --git a/cpp/test/Ice/ami/TestI.cpp b/cpp/test/Ice/ami/TestI.cpp index 8ed3b7c9498..a4cc8a8ef1f 100644 --- a/cpp/test/Ice/ami/TestI.cpp +++ b/cpp/test/Ice/ami/TestI.cpp @@ -69,6 +69,12 @@ TestIntfI::waitForBatch(Ice::Int count, const Ice::Current&) } void +TestIntfI::close(bool force, const Ice::Current& current) +{ + current.con->close(force); +} + +void TestIntfI::shutdown(const Ice::Current& current) { current.adapter->getCommunicator()->shutdown(); diff --git a/cpp/test/Ice/ami/TestI.h b/cpp/test/Ice/ami/TestI.h index e582a6d6ab0..5b33fc1eaae 100644 --- a/cpp/test/Ice/ami/TestI.h +++ b/cpp/test/Ice/ami/TestI.h @@ -28,6 +28,7 @@ public: virtual void opBatch(const Ice::Current&); virtual Ice::Int opBatchCount(const Ice::Current&); virtual bool waitForBatch(Ice::Int, const Ice::Current&); + virtual void close(bool, const Ice::Current&); virtual void shutdown(const Ice::Current&); private: |