summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp67
-rw-r--r--cpp/test/Ice/ami/AllTests.cpp60
-rw-r--r--cpp/test/Ice/ami/Test.ice1
-rw-r--r--cpp/test/Ice/ami/TestI.cpp6
-rw-r--r--cpp/test/Ice/ami/TestI.h1
5 files changed, 108 insertions, 27 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 310b9874f02..da9cf31f1fa 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -1348,14 +1348,18 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
{
assert(_state <= StateClosing);
- if(current.operation & SocketOperationWrite)
+ //
+ // We parse messages first, if we receive a close
+ // connection message we won't send more messages.
+ //
+ if(current.operation & SocketOperationRead)
{
- sendNextMessage(sentCBs);
+ parseMessage(current.stream, invokeNum, requestId, compress, servantManager, adapter, outAsync);
}
- if(current.operation & SocketOperationRead)
+ if(current.operation & SocketOperationWrite)
{
- parseMessage(current.stream, invokeNum, requestId, compress, servantManager, adapter, outAsync);
+ sendNextMessage(sentCBs);
}
//
@@ -1586,33 +1590,34 @@ Ice::ConnectionI::finish()
if(!_sendStreams.empty())
{
- assert(!_writeStream.b.empty());
-
- //
- // Return the stream to the outgoing call. This is important for
- // retriable AMI calls which are not marshalled again.
- //
- OutgoingMessage* message = &_sendStreams.front();
- _writeStream.swap(*message->stream);
-
-#ifdef ICE_USE_IOCP
- //
- // The current message might be sent but not yet removed from _sendStreams. If
- // the response has been received in the meantime, we remove the message from
- // _sendStreams to not call finished on a message which is already done.
- //
- if(message->requestId > 0 &&
- (message->out && _requests.find(message->requestId) == _requests.end() ||
- message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end()))
+ if(!_writeStream.b.empty())
{
- if(message->sent(this, true))
+ //
+ // Return the stream to the outgoing call. This is important for
+ // retriable AMI calls which are not marshalled again.
+ //
+ OutgoingMessage* message = &_sendStreams.front();
+ _writeStream.swap(*message->stream);
+
+#ifdef ICE_USE_IOCP
+ //
+ // The current message might be sent but not yet removed from _sendStreams. If
+ // the response has been received in the meantime, we remove the message from
+ // _sendStreams to not call finished on a message which is already done.
+ //
+ if(message->requestId > 0 &&
+ (message->out && _requests.find(message->requestId) == _requests.end() ||
+ message->outAsync && _asyncRequests.find(message->requestId) == _asyncRequests.end()))
{
- assert(message->outAsync);
- message->outAsync->__sent();
+ if(message->sent(this, true))
+ {
+ assert(message->outAsync);
+ message->outAsync->__sent();
+ }
+ _sendStreams.pop_front();
}
- _sendStreams.pop_front();
- }
#endif
+ }
for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
@@ -2285,6 +2290,14 @@ Ice::ConnectionI::sendNextMessage(vector<OutgoingAsyncMessageCallbackPtr>& callb
}
//
+ // If we are in the closed state, don't continue sending.
+ //
+ if(_state >= StateClosed)
+ {
+ break;
+ }
+
+ //
// Otherwise, prepare the next message stream for writing.
//
message = &_sendStreams.front();
diff --git a/cpp/test/Ice/ami/AllTests.cpp b/cpp/test/Ice/ami/AllTests.cpp
index de4ebee8efb..8fd66031174 100644
--- a/cpp/test/Ice/ami/AllTests.cpp
+++ b/cpp/test/Ice/ami/AllTests.cpp
@@ -2100,5 +2100,65 @@ allTests(const Ice::CommunicatorPtr& communicator)
}
cout << "ok" << endl;
+ cout << "testing close connection with sending queue... " << flush;
+ {
+ Ice::ByteSeq seq;
+ seq.resize(1024 * 10); // Make sure the request doesn't compress too well.
+ for(Ice::ByteSeq::iterator q = seq.begin(); q != seq.end(); ++q)
+ {
+ *q = static_cast<Ice::Byte>(IceUtilInternal::random(255));
+ }
+
+ //
+ // Send multiple opWithPayload, followed by a close and followed by multiple opWithPaylod.
+ // The goal is to make sure that none of the opWithPayload fail even if the server closes
+ // the connection gracefully in between.
+ //
+ int maxQueue = 2;
+ bool done = false;
+ while(!done && maxQueue < 50)
+ {
+ done = true;
+ p->ice_ping();
+ vector<Ice::AsyncResultPtr> results;
+ for(int i = 0; i < maxQueue; ++i)
+ {
+ results.push_back(p->begin_opWithPayload(seq));
+ }
+ if(!p->begin_close(false)->isSent())
+ {
+ for(int i = 0; i < maxQueue; i++)
+ {
+ Ice::AsyncResultPtr r = p->begin_opWithPayload(seq);
+ results.push_back(r);
+ if(r->isSent())
+ {
+ done = false;
+ maxQueue *= 2;
+ break;
+ }
+ }
+ }
+ else
+ {
+ maxQueue *= 2;
+ done = false;
+ }
+ for(vector<Ice::AsyncResultPtr>::const_iterator p = results.begin(); p != results.end(); ++p)
+ {
+ (*p)->waitForCompleted();
+ try
+ {
+ (*p)->throwLocalException();
+ }
+ catch(const Ice::LocalException&)
+ {
+ test(false);
+ }
+ }
+ }
+ }
+ cout << "ok" << endl;
+
p->shutdown();
}
diff --git a/cpp/test/Ice/ami/Test.ice b/cpp/test/Ice/ami/Test.ice
index 60f55c1ca8d..243a996db8f 100644
--- a/cpp/test/Ice/ami/Test.ice
+++ b/cpp/test/Ice/ami/Test.ice
@@ -29,6 +29,7 @@ interface TestIntf
void opBatch();
int opBatchCount();
bool waitForBatch(int count);
+ void close(bool force);
void shutdown();
};
diff --git a/cpp/test/Ice/ami/TestI.cpp b/cpp/test/Ice/ami/TestI.cpp
index 8ed3b7c9498..a4cc8a8ef1f 100644
--- a/cpp/test/Ice/ami/TestI.cpp
+++ b/cpp/test/Ice/ami/TestI.cpp
@@ -69,6 +69,12 @@ TestIntfI::waitForBatch(Ice::Int count, const Ice::Current&)
}
void
+TestIntfI::close(bool force, const Ice::Current& current)
+{
+ current.con->close(force);
+}
+
+void
TestIntfI::shutdown(const Ice::Current& current)
{
current.adapter->getCommunicator()->shutdown();
diff --git a/cpp/test/Ice/ami/TestI.h b/cpp/test/Ice/ami/TestI.h
index e582a6d6ab0..5b33fc1eaae 100644
--- a/cpp/test/Ice/ami/TestI.h
+++ b/cpp/test/Ice/ami/TestI.h
@@ -28,6 +28,7 @@ public:
virtual void opBatch(const Ice::Current&);
virtual Ice::Int opBatchCount(const Ice::Current&);
virtual bool waitForBatch(Ice::Int, const Ice::Current&);
+ virtual void close(bool, const Ice::Current&);
virtual void shutdown(const Ice::Current&);
private: