summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
commita4f93259dc3494d98addf38e69b87eb557d432b3 (patch)
treed2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/Ice/ConnectionI.cpp
parentFix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff)
downloadice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp148
1 files changed, 85 insertions, 63 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index f36c83f62fa..850f245f163 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -23,6 +23,7 @@
#include <Ice/OutgoingAsync.h>
#include <Ice/Incoming.h>
#include <Ice/LocalException.h>
+#include <Ice/RequestHandler.h> // For RetryException
#include <Ice/ReferenceFactory.h> // For createProxy().
#include <Ice/ProxyFactory.h> // For createProxy().
#ifndef ICE_OS_WINRT
@@ -239,13 +240,21 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str)
adopted = true;
}
-void
-Ice::ConnectionI::OutgoingMessage::timedOut()
+bool
+Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream)
{
assert((out || outAsync) && !isSent); // Only requests can timeout.
out = 0;
outAsync = 0;
- adopt(0); // Adopt the request stream
+ if(adoptStream)
+ {
+ adopt(0); // Adopt the request stream
+ }
+ else
+ {
+ assert(!adopted && !stream);
+ }
+ return isSent;
}
bool
@@ -612,7 +621,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
// to send our request, we always try to send the request
// again.
//
- throw LocalExceptionWrapper(*_exception.get(), true);
+ throw RetryException(*_exception.get());
}
assert(_state > StateNotValidated);
@@ -692,7 +701,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
// to send our request, we always try to send the request
// again.
//
- throw LocalExceptionWrapper(*_exception.get(), true);
+ throw RetryException(*_exception.get());
}
assert(_state > StateNotValidated);
@@ -777,7 +786,7 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os)
//
if(_batchStream.b.empty())
{
- throw LocalExceptionWrapper(*_exception.get(), true);
+ throw RetryException(*_exception.get());
}
else
{
@@ -825,7 +834,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
if(_exception.get())
{
- _exception->ice_throw();
+ return;
}
bool flush = false;
@@ -958,8 +967,7 @@ Ice::ConnectionI::abortBatchRequest()
void
Ice::ConnectionI::flushBatchRequests()
{
- IceInternal::InvocationObserver observer(_instance.get(), __flushBatchRequests_name);
- BatchOutgoing out(this, _instance.get(), observer);
+ BatchOutgoing out(this, _instance.get(), __flushBatchRequests_name);
out.invoke();
}
@@ -1209,17 +1217,19 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
// If the request is being sent, don't remove it from the send streams,
// it will be removed once the sending is finished.
//
+ bool isSent;
if(o == _sendStreams.begin())
{
- o->timedOut();
+ isSent = o->timedOut(true); // true = adopt the stream.
}
else
{
+ isSent = o->timedOut(false);
_sendStreams.erase(o);
}
InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex);
+ out->finished(ex, isSent);
return;
}
}
@@ -1254,75 +1264,87 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out)
void
Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
+ bool isSent;
+ bool finished = false;
{
- if(o->outAsync.get() == outAsync.get())
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+
+ for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
{
- if(o->requestId)
+ if(o->outAsync.get() == outAsync.get())
{
- if(_asyncRequestsHint != _asyncRequests.end() &&
- _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync))
+ if(o->requestId)
{
- _asyncRequests.erase(_asyncRequestsHint);
- _asyncRequestsHint = _asyncRequests.end();
+ if(_asyncRequestsHint != _asyncRequests.end() &&
+ _asyncRequestsHint->second == OutgoingAsyncPtr::dynamicCast(outAsync))
+ {
+ _asyncRequests.erase(_asyncRequestsHint);
+ _asyncRequestsHint = _asyncRequests.end();
+ }
+ else
+ {
+ _asyncRequests.erase(o->requestId);
+ }
+ }
+
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ if(o == _sendStreams.begin())
+ {
+ isSent = o->timedOut(true); // true = adopt the stream
}
else
{
- _asyncRequests.erase(o->requestId);
+ isSent = o->timedOut(false);
+ _sendStreams.erase(o);
}
+ finished = true;
+ break; // We're done
}
-
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
- if(o == _sendStreams.begin())
- {
- o->timedOut();
- }
- else
- {
- _sendStreams.erase(o);
- }
-
- InvocationTimeoutException ex(__FILE__, __LINE__);
- o->finished(ex);
- return; // We're done.
}
- }
- OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
- if(o)
- {
- if(_asyncRequestsHint != _asyncRequests.end())
+ if(!finished)
{
- if(_asyncRequestsHint->second == o)
+ OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
+ if(o)
{
- InvocationTimeoutException ex(__FILE__, __LINE__);
- o->__finished(ex, true);
- _asyncRequests.erase(_asyncRequestsHint);
- _asyncRequestsHint = _asyncRequests.end();
- }
- }
+ if(_asyncRequestsHint != _asyncRequests.end())
+ {
+ if(_asyncRequestsHint->second == o)
+ {
+ InvocationTimeoutException ex(__FILE__, __LINE__);
+ o->__finished(ex, true);
+ _asyncRequests.erase(_asyncRequestsHint);
+ _asyncRequestsHint = _asyncRequests.end();
+ }
+ }
- for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
- {
- if(p->second.get() == o.get())
- {
- InvocationTimeoutException ex(__FILE__, __LINE__);
- o->__finished(ex, true);
- assert(p != _asyncRequestsHint);
- _asyncRequests.erase(p);
- return; // We're done.
+ for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
+ {
+ if(p->second.get() == o.get())
+ {
+ assert(p != _asyncRequestsHint);
+ _asyncRequests.erase(p);
+ finished = true;
+ isSent = true;
+ break;
+ }
+ }
}
}
}
+
+ if(finished)
+ {
+ InvocationTimeoutException ex(__FILE__, __LINE__);
+ outAsync->__finished(ex, isSent);
+ }
}
void
-Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag)
+Ice::ConnectionI::sendResponse(Int, BasicStream* os, Byte compressFlag)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
assert(_state > StateNotValidated);
@@ -2156,7 +2178,7 @@ Ice::ConnectionI::exception(const LocalException& ex)
}
void
-Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum)
+Ice::ConnectionI::invokeException(Ice::Int, const LocalException& ex, int invokeNum)
{
//
// Fatal exception while invoking a request. Since sendResponse/sendNoResponse isn't
@@ -3458,7 +3480,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
bool response = !_endpoint->datagram() && requestId != 0;
assert(!response || invokeNum == 1);
- Incoming in(_instance.get(), this, adapter, response, compress, requestId);
+ Incoming in(_instance.get(), this, this, adapter, response, compress, requestId);
//
// Dispatch the invocation.
@@ -3472,7 +3494,7 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B
}
catch(const LocalException& ex)
{
- invokeException(ex, invokeNum); // Fatal invocation exception
+ invokeException(requestId, ex, invokeNum); // Fatal invocation exception
}
}