summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp271
1 files changed, 16 insertions, 255 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 7af8fad5996..170c7b95d1e 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -20,7 +20,6 @@
#include <Ice/ACM.h>
#include <Ice/ObjectAdapterI.h> // For getThreadPool() and getServantManager().
#include <Ice/EndpointI.h>
-#include <Ice/Outgoing.h>
#include <Ice/OutgoingAsync.h>
#include <Ice/Incoming.h>
#include <Ice/LocalException.h>
@@ -229,7 +228,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(OutputStream* str)
}
else if(!str)
{
- if(out || outAsync)
+ if(outAsync)
{
return; // Adopting request stream is not necessary.
}
@@ -249,8 +248,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(OutputStream* str)
void
Ice::ConnectionI::OutgoingMessage::canceled(bool adoptStream)
{
- assert((out || outAsync)); // Only requests can timeout.
- out = 0;
+ assert(outAsync); // Only requests can timeout.
outAsync = 0;
if(adoptStream)
{
@@ -271,11 +269,7 @@ Ice::ConnectionI::OutgoingMessage::sent()
}
stream = 0;
- if(out)
- {
- out->sent();
- }
- else if(outAsync)
+ if(outAsync)
{
#if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT)
invokeSent = outAsync->sent();
@@ -290,11 +284,7 @@ Ice::ConnectionI::OutgoingMessage::sent()
void
Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex)
{
- if(out)
- {
- out->completed(ex);
- }
- else if(outAsync)
+ if(outAsync)
{
if(outAsync->exception(ex))
{
@@ -436,7 +426,7 @@ Ice::ConnectionI::close(bool force)
// requests to be retried, regardless of whether the server
// has processed them or not.
//
- while(!_requests.empty() || !_asyncRequests.empty())
+ while(!_asyncRequests.empty())
{
wait();
}
@@ -591,8 +581,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
if(acm.close != CloseOff && now >= (_acmLastActivity + acm.timeout))
{
- if(acm.close == CloseOnIdleForceful ||
- (acm.close != CloseOnIdle && (!_requests.empty() || !_asyncRequests.empty())))
+ if(acm.close == CloseOnIdleForceful || (acm.close != CloseOnIdle && !_asyncRequests.empty()))
{
//
// Close the connection if we didn't receive a heartbeat in
@@ -600,8 +589,8 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
//
setState(StateClosed, ConnectionTimeoutException(__FILE__, __LINE__));
}
- else if(acm.close != CloseOnInvocation &&
- _dispatchCount == 0 && _batchRequestQueue->isEmpty() && _requests.empty() && _asyncRequests.empty())
+ else if(acm.close != CloseOnInvocation && _dispatchCount == 0 && _batchRequestQueue->isEmpty() &&
+ _asyncRequests.empty())
{
//
// The connection is idle, close it.
@@ -611,94 +600,6 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
}
}
-bool
-Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, int batchRequestNum)
-{
- OutputStream* os = out->os();
-
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- //
- // If the connection is closed before we even have a chance
- // to send our request, we always try to send the request
- // again.
- //
- if(_exception)
- {
- throw RetryException(*_exception);
- }
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- //
- // Ensure the message isn't bigger than what we can send with the
- // transport.
- //
- _transceiver->checkSendSize(*os);
-
- Int requestId = 0;
- if(response)
- {
- //
- // Create a new unique request ID.
- //
- requestId = _nextRequestId++;
- if(requestId <= 0)
- {
- _nextRequestId = 1;
- requestId = _nextRequestId++;
- }
-
- //
- // Fill in the request ID.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&requestId);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#endif
- }
- else if(batchRequestNum > 0)
- {
- const Byte* p = reinterpret_cast<const Byte*>(&batchRequestNum);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), os->b.begin() + headerSize);
-#endif
- }
-
- out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
-
- //
- // Send the message. If it can't be sent without blocking the message is added
- // to _sendStreams and it will be sent by the selector thread.
- //
- bool sent = false;
- try
- {
- OutgoingMessage message(out, os, compress, requestId);
- sent = sendMessage(message) & AsyncStatusSent;
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- assert(_exception);
- _exception->ice_throw();
- }
-
- if(response)
- {
- //
- // Add to the requests map.
- //
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, OutgoingBase*>(requestId, out));
- }
-
- return sent;
-}
-
AsyncStatus
Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compress, bool response, int batchRequestNum)
{
@@ -824,8 +725,7 @@ Ice::ConnectionI::flushBatchRequestsAsync(::std::function<void(::std::exception_
void
Ice::ConnectionI::flushBatchRequests()
{
- ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name);
- out.invoke();
+ end_flushBatchRequests(begin_flushBatchRequests());
}
AsyncResultPtr
@@ -1021,96 +921,6 @@ Ice::ConnectionI::getACM()
}
void
-Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- if(_state >= StateClosed)
- {
- return; // The request has already been or will be shortly notified of the failure.
- }
-
- for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o)
- {
- if(o->out == out)
- {
- if(o->requestId)
- {
- if(_requestsHint != _requests.end() && _requestsHint->second == out)
- {
- _requests.erase(_requestsHint);
- _requestsHint = _requests.end();
- }
- else
- {
- _requests.erase(o->requestId);
- }
- }
-
- if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
- {
- setState(StateClosed, ex);
- }
- else
- {
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
- if(o == _sendStreams.begin())
- {
- o->canceled(true); // true = adopt the stream.
- }
- else
- {
- o->canceled(false);
- _sendStreams.erase(o);
- }
- out->completed(ex);
- }
- return;
- }
- }
-
- if(dynamic_cast<Outgoing*>(out))
- {
- if(_requestsHint != _requests.end() && _requestsHint->second == out)
- {
- if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
- {
- setState(StateClosed, ex);
- }
- else
- {
- out->completed(ex);
- _requests.erase(_requestsHint);
- _requestsHint = _requests.end();
- }
- return;
- }
- else
- {
- for(map<Int, OutgoingBase*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- if(p->second == out)
- {
- if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
- {
- setState(StateClosed, ex);
- }
- else
- {
- p->second->completed(ex);
- assert(p != _requestsHint);
- _requests.erase(p);
- }
- return; // We're done.
- }
- }
- }
- }
-}
-
-void
Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex)
{
//
@@ -2018,28 +1828,13 @@ Ice::ConnectionI::finish(bool close)
o->completed(*_exception);
if(o->requestId) // Make sure finished isn't called twice.
{
- if(o->out)
- {
- _requests.erase(o->requestId);
- }
- else
- {
- _asyncRequests.erase(o->requestId);
- }
+ _asyncRequests.erase(o->requestId);
}
}
- _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
+ _sendStreams.clear();
}
-
- for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- p->second->completed(*_exception);
- }
-
- _requests.clear();
-
for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
if(q->second->exception(*_exception))
@@ -2181,7 +1976,6 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_warnUdp(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0),
_compressionLevel(1),
_nextRequestId(1),
- _requestsHint(_requests.end()),
_asyncRequestsHint(_asyncRequests.end()),
_messageSizeMax(adapter ? adapter->messageSizeMax() : _instance->messageSizeMax()),
_batchRequestQueue(new BatchRequestQueue(instance, endpoint->datagram())),
@@ -2249,7 +2043,6 @@ Ice::ConnectionI::~ConnectionI()
assert(_state == StateFinished);
assert(_dispatchCount == 0);
assert(_sendStreams.empty());
- assert(_requests.empty());
assert(_asyncRequests.empty());
}
@@ -3310,54 +3103,22 @@ Ice::ConnectionI::parseMessage(InputStream& stream, Int& invokeNum, Int& request
stream.read(requestId);
- map<Int, OutgoingBase*>::iterator p = _requests.end();
map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.end();
- if(_requestsHint != _requests.end())
+ if(_asyncRequestsHint != _asyncRequests.end())
{
- if(_requestsHint->first == requestId)
+ if(_asyncRequestsHint->first == requestId)
{
- p = _requestsHint;
+ q = _asyncRequestsHint;
}
}
- if(p == _requests.end())
- {
- if(_asyncRequestsHint != _asyncRequests.end())
- {
- if(_asyncRequestsHint->first == requestId)
- {
- q = _asyncRequestsHint;
- }
- }
- }
-
- if(p == _requests.end() && q == _asyncRequests.end())
- {
- p = _requests.find(requestId);
- }
-
- if(p == _requests.end() && q == _asyncRequests.end())
+ if(q == _asyncRequests.end())
{
q = _asyncRequests.find(requestId);
}
- if(p != _requests.end())
- {
- p->second->completed(stream);
-
- if(p == _requestsHint)
- {
- _requests.erase(p++);
- _requestsHint = p;
- }
- else
- {
- _requests.erase(p);
- }
- notifyAll(); // Notify threads blocked in close(false)
- }
- else if(q != _asyncRequests.end())
+ if(q != _asyncRequests.end())
{
outAsync = q->second;