summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/ConnectionI.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2015-03-10 12:12:10 +0100
committerBenoit Foucher <benoit@zeroc.com>2015-03-10 12:12:10 +0100
commitc6ca68d97aa5bbc2a172e3e35171b5452657fa22 (patch)
tree46edcca4c8e313285a205bf6fad7c56c452c0cc0 /cpp/src/Ice/ConnectionI.cpp
parentMinor JS style fixes (diff)
downloadice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.tar.bz2
ice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.tar.xz
ice-c6ca68d97aa5bbc2a172e3e35171b5452657fa22.zip
ICE-6170 - fixed behavior of batch requests
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r--cpp/src/Ice/ConnectionI.cpp420
1 files changed, 57 insertions, 363 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp
index 1c22af06bb8..734e9c48695 100644
--- a/cpp/src/Ice/ConnectionI.cpp
+++ b/cpp/src/Ice/ConnectionI.cpp
@@ -27,6 +27,7 @@
#include <Ice/RequestHandler.h> // For RetryException
#include <Ice/ReferenceFactory.h> // For createProxy().
#include <Ice/ProxyFactory.h> // For createProxy().
+#include <Ice/BatchRequestQueue.h>
#ifdef ICE_HAS_BZIP2
# include <bzlib.h>
@@ -71,7 +72,7 @@ public:
DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB,
const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId,
Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter,
- const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback,
+ const OutgoingAsyncBasePtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback,
BasicStream& stream) :
DispatchWorkItem(connection),
_connection(connection),
@@ -106,7 +107,7 @@ private:
const Int _invokeNum;
const ServantManagerPtr _servantManager;
const ObjectAdapterPtr _adapter;
- const OutgoingAsyncPtr _outAsync;
+ const OutgoingAsyncBasePtr _outAsync;
const ConnectionCallbackPtr _heartbeatCallback;
BasicStream _stream;
};
@@ -597,7 +598,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
setState(StateClosed, ConnectionTimeoutException(__FILE__, __LINE__));
}
else if(acm.close != CloseOnInvocation &&
- _dispatchCount == 0 && _batchStream.b.empty() && _requests.empty() && _asyncRequests.empty())
+ _dispatchCount == 0 && _batchRequestQueue->isEmpty() && _requests.empty() && _asyncRequests.empty())
{
//
// The connection is idle, close it.
@@ -608,7 +609,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm)
}
bool
-Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
+Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, int batchRequestNum)
{
BasicStream* os = out->os();
@@ -655,6 +656,15 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#endif
}
+ else if(batchRequestNum > 0)
+ {
+ const Byte* p = reinterpret_cast<const Byte*>(&batchRequestNum);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#endif
+ }
out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
@@ -680,14 +690,14 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response)
//
// Add to the requests map.
//
- _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out));
+ _requestsHint = _requests.insert(_requests.end(), pair<const Int, OutgoingBase*>(requestId, out));
}
return sent;
}
AsyncStatus
-Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response)
+Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compress, bool response, int batchRequestNum)
{
BasicStream* os = out->getOs();
@@ -740,6 +750,15 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#endif
}
+ else if(batchRequestNum > 0)
+ {
+ const Byte* p = reinterpret_cast<const Byte*>(&batchRequestNum);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), os->b.begin() + headerSize);
+#endif
+ }
out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
@@ -762,211 +781,21 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b
// Add to the async requests map.
//
_asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(),
- pair<const Int, OutgoingAsyncPtr>(requestId, out));
+ pair<const Int, OutgoingAsyncBasePtr>(requestId, out));
}
return status;
}
-void
-Ice::ConnectionI::prepareBatchRequest(BasicStream* os)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // Wait if flushing is currently in progress.
- //
- while(_batchStreamInUse && !_exception.get())
- {
- wait();
- }
-
- if(_exception.get())
- {
- //
- // If there were no batch requests queued when the connection failed, we can safely
- // retry with a new connection. Otherwise, we must throw to notify the caller that
- // some previous batch requests were not sent.
- //
- if(_batchStream.b.empty())
- {
- throw RetryException(*_exception.get());
- }
- else
- {
- _exception->ice_throw();
- }
- }
-
- assert(_state > StateNotValidated);
- assert(_state < StateClosing);
-
- if(_batchStream.b.empty())
- {
- try
- {
- _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- ex.ice_throw();
- }
- }
-
- _batchStreamInUse = true;
- _batchMarker = _batchStream.b.size();
- _batchStream.swap(*os);
-
- //
- // The batch stream now belongs to the caller, until
- // finishBatchRequest() or abortBatchRequest() is called.
- //
-}
-
-void
-Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress)
-{
- try
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- //
- // Get the batch stream back.
- //
- _batchStream.swap(*os);
-
- if(_exception.get())
- {
- return;
- }
-
- bool flush = false;
- if(_batchAutoFlushSize > 0)
- {
- if(_batchStream.b.size() > _batchAutoFlushSize)
- {
- flush = true;
- }
-
- //
- // Throw memory limit exception if the first message added causes us to
- // go over limit. Otherwise put aside the marshalled message that caused
- // limit to be exceeded and rollback stream to the marker.
- //
- try
- {
- _transceiver->checkSendSize(_batchStream);
- }
- catch(const Ice::Exception&)
- {
- if(_batchRequestNum > 0)
- {
- flush = true;
- }
- else
- {
- throw;
- }
- }
- }
-
- if(flush)
- {
- //
- // Temporarily save the last request.
- //
- vector<Ice::Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end());
- _batchStream.b.resize(_batchMarker);
-
- //
- // Send the batch stream without the last request.
- //
- try
- {
- //
- // Fill in the number of requests in the batch.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
-#endif
-
- OutgoingMessage message(&_batchStream, _batchRequestCompress);
- sendMessage(message);
- }
- catch(const Ice::LocalException& ex)
- {
- setState(StateClosed, ex);
- assert(_exception.get());
- _exception->ice_throw();
- }
-
- //
- // Reset the batch.
- //
- BasicStream dummy(_instance.get(), currentProtocolEncoding);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
-
- //
- // Start a new batch with the last message that caused us to go over the limit.
- //
- _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr));
- _batchStream.writeBlob(&lastRequest[0], lastRequest.size());
- }
-
- //
- // Increment the number of requests in the batch.
- //
- ++_batchRequestNum;
-
- //
- // We compress the whole batch if there is at least one compressed
- // message.
- //
- if(compress)
- {
- _batchRequestCompress = true;
- }
-
- //
- // Notify about the batch stream not being in use anymore.
- //
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
- }
- catch(const Ice::LocalException&)
- {
- abortBatchRequest();
- throw;
- }
-}
-
-void
-Ice::ConnectionI::abortBatchRequest()
+BatchRequestQueuePtr
+Ice::ConnectionI::getBatchRequestQueue() const
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
-
- BasicStream dummy(_instance.get(), currentProtocolEncoding);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
-
- assert(_batchStreamInUse);
- _batchStreamInUse = false;
- notifyAll();
+ return _batchRequestQueue;
}
void
Ice::ConnectionI::flushBatchRequests()
{
- FlushBatch out(this, _instance.get(), __flushBatchRequests_name);
+ ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name);
out.invoke();
}
@@ -1032,12 +861,12 @@ Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (con
AsyncResultPtr
Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie)
{
- ConnectionFlushBatchPtr result = new ConnectionFlushBatch(this,
- _communicator,
- _instance,
- __flushBatchRequests_name,
- cb,
- cookie);
+ ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsync(this,
+ _communicator,
+ _instance,
+ __flushBatchRequests_name,
+ cb,
+ cookie);
result->invoke();
return result;
}
@@ -1049,136 +878,6 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r)
r->__wait();
}
-bool
-Ice::ConnectionI::flushBatchRequests(OutgoingBase* out)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- while(_batchStreamInUse && !_exception.get())
- {
- wait();
- }
-
- if(_exception.get())
- {
- _exception->ice_throw();
- }
-
- if(_batchRequestNum == 0)
- {
- out->sent();
- return true;
- }
-
- //
- // Fill in the number of requests in the batch.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
-#endif
- _batchStream.swap(*out->os());
-
- out->attachRemoteObserver(initConnectionInfo(), _endpoint, 0);
-
- //
- // Send the batch stream.
- //
- bool sent = false;
- try
- {
- OutgoingMessage message(out, out->os(), _batchRequestCompress, 0);
- sent = sendMessage(message) & AsyncStatusSent;
- }
- catch(const Ice::LocalException& ex)
- {
- setState(StateClosed, ex);
- assert(_exception.get());
- _exception->ice_throw();
- }
-
- //
- // Reset the batch stream.
- //
- BasicStream dummy(_instance.get(), Ice::currentProtocolEncoding);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
- return sent;
-}
-
-AsyncStatus
-Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
- while(_batchStreamInUse && !_exception.get())
- {
- wait();
- }
-
- if(_exception.get())
- {
- _exception->ice_throw();
- }
-
- if(_batchRequestNum == 0)
- {
- AsyncStatus status = AsyncStatusSent;
- if(outAsync->sent())
- {
- status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback);
- }
- return status;
- }
-
- //
- // Notify the request that it's cancelable with this connection.
- // This will throw if the request is canceled.
- //
- outAsync->cancelable(this);
-
- //
- // Fill in the number of requests in the batch.
- //
- const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
-#endif
- _batchStream.swap(*outAsync->getOs());
-
- outAsync->attachRemoteObserver(initConnectionInfo(), _endpoint, 0);
-
- //
- // Send the batch stream.
- //
- AsyncStatus status = AsyncStatusQueued;
- try
- {
- OutgoingMessage message(outAsync, outAsync->getOs(), _batchRequestCompress, 0);
- status = sendMessage(message);
- }
- catch(const Ice::LocalException& ex)
- {
- setState(StateClosed, ex);
- assert(_exception.get());
- _exception->ice_throw();
- }
-
- //
- // Reset the batch stream.
- //
- BasicStream dummy(_instance.get(), Ice::currentProtocolEncoding);
- _batchStream.swap(dummy);
- _batchRequestNum = 0;
- _batchRequestCompress = false;
- _batchMarker = 0;
- return status;
-}
-
void
Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback)
{
@@ -1295,7 +994,7 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException&
{
if(o->requestId)
{
- if(_requestsHint != _requests.end() && _requestsHint->second == dynamic_cast<Outgoing*>(out))
+ if(_requestsHint != _requests.end() && _requestsHint->second == out)
{
_requests.erase(_requestsHint);
_requestsHint = _requests.end();
@@ -1331,10 +1030,9 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException&
}
}
- Outgoing* o = dynamic_cast<Outgoing*>(out);
- if(o)
+ if(dynamic_cast<Outgoing*>(out))
{
- if(_requestsHint != _requests.end() && _requestsHint->second == o)
+ if(_requestsHint != _requests.end() && _requestsHint->second == out)
{
if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
{
@@ -1342,7 +1040,7 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException&
}
else
{
- o->completed(ex);
+ out->completed(ex);
_requests.erase(_requestsHint);
_requestsHint = _requests.end();
}
@@ -1350,9 +1048,9 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException&
}
else
{
- for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
+ for(map<Int, OutgoingBase*>::iterator p = _requests.begin(); p != _requests.end(); ++p)
{
- if(p->second == o)
+ if(p->second == out)
{
if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
{
@@ -1360,7 +1058,7 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException&
}
else
{
- o->completed(ex);
+ p->second->completed(ex);
assert(p != _requestsHint);
_requests.erase(p);
}
@@ -1430,12 +1128,11 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
}
}
- OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync);
- if(o)
+ if(OutgoingAsyncPtr::dynamicCast(outAsync))
{
if(_asyncRequestsHint != _asyncRequests.end())
{
- if(_asyncRequestsHint->second == o)
+ if(_asyncRequestsHint->second == outAsync)
{
if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
{
@@ -1454,9 +1151,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con
}
}
- for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
+ for(map<Int, OutgoingAsyncBasePtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p)
{
- if(p->second.get() == o.get())
+ if(p->second.get() == outAsync.get())
{
if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex))
{
@@ -1769,7 +1466,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
Int invokeNum = 0;
ServantManagerPtr servantManager;
ObjectAdapterPtr adapter;
- OutgoingAsyncPtr outAsync;
+ OutgoingAsyncBasePtr outAsync;
ConnectionCallbackPtr heartbeatCallback;
int dispatchCount = 0;
@@ -2039,7 +1736,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current)
void
ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs,
Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager,
- const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync,
+ const ObjectAdapterPtr& adapter, const OutgoingAsyncBasePtr& outAsync,
const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream)
{
int dispatchedCount = 0;
@@ -2290,13 +1987,13 @@ Ice::ConnectionI::finish(bool close)
_sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage
}
- for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
{
p->second->completed(*_exception.get());
}
_requests.clear();
- for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
+ for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q)
{
if(q->second->completed(*_exception.get()))
{
@@ -2429,12 +2126,7 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator,
_requestsHint(_requests.end()),
_asyncRequestsHint(_asyncRequests.end()),
_messageSizeMax(adapter ? adapter->messageSizeMax() : _instance->messageSizeMax()),
- _batchAutoFlushSize(_instance->batchAutoFlushSize()),
- _batchStream(_instance.get(), Ice::currentProtocolEncoding),
- _batchStreamInUse(false),
- _batchRequestNum(0),
- _batchRequestCompress(false),
- _batchMarker(0),
+ _batchRequestQueue(new BatchRequestQueue(instance, endpoint->datagram())),
_readStream(_instance.get(), Ice::currentProtocolEncoding),
_readHeader(false),
_writeStream(_instance.get(), Ice::currentProtocolEncoding),
@@ -2647,6 +2339,8 @@ Ice::ConnectionI::setState(State state)
return;
}
+ _batchRequestQueue->destroy(*_exception.get());
+
//
// Don't need to close now for connections so only close the transceiver
// if the selector request it.
@@ -3431,7 +3125,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse
SocketOperation
Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress,
ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter,
- OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback,
+ OutgoingAsyncBasePtr& outAsync, ConnectionCallbackPtr& heartbeatCallback,
int& dispatchCount)
{
assert(_state > StateNotValidated && _state < StateClosed);
@@ -3556,8 +3250,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request
stream.read(requestId);
- map<Int, Outgoing*>::iterator p = _requests.end();
- map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.end();
+ map<Int, OutgoingBase*>::iterator p = _requests.end();
+ map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.end();
if(_requestsHint != _requests.end())
{