diff options
Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 420 |
1 files changed, 57 insertions, 363 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 1c22af06bb8..734e9c48695 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -27,6 +27,7 @@ #include <Ice/RequestHandler.h> // For RetryException #include <Ice/ReferenceFactory.h> // For createProxy(). #include <Ice/ProxyFactory.h> // For createProxy(). +#include <Ice/BatchRequestQueue.h> #ifdef ICE_HAS_BZIP2 # include <bzlib.h> @@ -71,7 +72,7 @@ public: DispatchCall(const ConnectionIPtr& connection, const ConnectionI::StartCallbackPtr& startCB, const vector<ConnectionI::OutgoingMessage>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter, - const OutgoingAsyncPtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, + const OutgoingAsyncBasePtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) : DispatchWorkItem(connection), _connection(connection), @@ -106,7 +107,7 @@ private: const Int _invokeNum; const ServantManagerPtr _servantManager; const ObjectAdapterPtr _adapter; - const OutgoingAsyncPtr _outAsync; + const OutgoingAsyncBasePtr _outAsync; const ConnectionCallbackPtr _heartbeatCallback; BasicStream _stream; }; @@ -597,7 +598,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) setState(StateClosed, ConnectionTimeoutException(__FILE__, __LINE__)); } else if(acm.close != CloseOnInvocation && - _dispatchCount == 0 && _batchStream.b.empty() && _requests.empty() && _asyncRequests.empty()) + _dispatchCount == 0 && _batchRequestQueue->isEmpty() && _requests.empty() && _asyncRequests.empty()) { // // The connection is idle, close it. @@ -608,7 +609,7 @@ Ice::ConnectionI::monitor(const IceUtil::Time& now, const ACMConfig& acm) } bool -Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) +Ice::ConnectionI::sendRequest(OutgoingBase* out, bool compress, bool response, int batchRequestNum) { BasicStream* os = out->os(); @@ -655,6 +656,15 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) copy(p, p + sizeof(Int), os->b.begin() + headerSize); #endif } + else if(batchRequestNum > 0) + { + const Byte* p = reinterpret_cast<const Byte*>(&batchRequestNum); +#ifdef ICE_BIG_ENDIAN + reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); +#else + copy(p, p + sizeof(Int), os->b.begin() + headerSize); +#endif + } out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); @@ -680,14 +690,14 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) // // Add to the requests map. // - _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); + _requestsHint = _requests.insert(_requests.end(), pair<const Int, OutgoingBase*>(requestId, out)); } return sent; } AsyncStatus -Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response) +Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncBasePtr& out, bool compress, bool response, int batchRequestNum) { BasicStream* os = out->getOs(); @@ -740,6 +750,15 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b copy(p, p + sizeof(Int), os->b.begin() + headerSize); #endif } + else if(batchRequestNum > 0) + { + const Byte* p = reinterpret_cast<const Byte*>(&batchRequestNum); +#ifdef ICE_BIG_ENDIAN + reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); +#else + copy(p, p + sizeof(Int), os->b.begin() + headerSize); +#endif + } out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); @@ -762,211 +781,21 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b // Add to the async requests map. // _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), - pair<const Int, OutgoingAsyncPtr>(requestId, out)); + pair<const Int, OutgoingAsyncBasePtr>(requestId, out)); } return status; } -void -Ice::ConnectionI::prepareBatchRequest(BasicStream* os) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // Wait if flushing is currently in progress. - // - while(_batchStreamInUse && !_exception.get()) - { - wait(); - } - - if(_exception.get()) - { - // - // If there were no batch requests queued when the connection failed, we can safely - // retry with a new connection. Otherwise, we must throw to notify the caller that - // some previous batch requests were not sent. - // - if(_batchStream.b.empty()) - { - throw RetryException(*_exception.get()); - } - else - { - _exception->ice_throw(); - } - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - if(_batchStream.b.empty()) - { - try - { - _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - ex.ice_throw(); - } - } - - _batchStreamInUse = true; - _batchMarker = _batchStream.b.size(); - _batchStream.swap(*os); - - // - // The batch stream now belongs to the caller, until - // finishBatchRequest() or abortBatchRequest() is called. - // -} - -void -Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) -{ - try - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // Get the batch stream back. - // - _batchStream.swap(*os); - - if(_exception.get()) - { - return; - } - - bool flush = false; - if(_batchAutoFlushSize > 0) - { - if(_batchStream.b.size() > _batchAutoFlushSize) - { - flush = true; - } - - // - // Throw memory limit exception if the first message added causes us to - // go over limit. Otherwise put aside the marshalled message that caused - // limit to be exceeded and rollback stream to the marker. - // - try - { - _transceiver->checkSendSize(_batchStream); - } - catch(const Ice::Exception&) - { - if(_batchRequestNum > 0) - { - flush = true; - } - else - { - throw; - } - } - } - - if(flush) - { - // - // Temporarily save the last request. - // - vector<Ice::Byte> lastRequest(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()); - _batchStream.b.resize(_batchMarker); - - // - // Send the batch stream without the last request. - // - try - { - // - // Fill in the number of requests in the batch. - // - const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); -#endif - - OutgoingMessage message(&_batchStream, _batchRequestCompress); - sendMessage(message); - } - catch(const Ice::LocalException& ex) - { - setState(StateClosed, ex); - assert(_exception.get()); - _exception->ice_throw(); - } - - // - // Reset the batch. - // - BasicStream dummy(_instance.get(), currentProtocolEncoding); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; - - // - // Start a new batch with the last message that caused us to go over the limit. - // - _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); - _batchStream.writeBlob(&lastRequest[0], lastRequest.size()); - } - - // - // Increment the number of requests in the batch. - // - ++_batchRequestNum; - - // - // We compress the whole batch if there is at least one compressed - // message. - // - if(compress) - { - _batchRequestCompress = true; - } - - // - // Notify about the batch stream not being in use anymore. - // - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); - } - catch(const Ice::LocalException&) - { - abortBatchRequest(); - throw; - } -} - -void -Ice::ConnectionI::abortBatchRequest() +BatchRequestQueuePtr +Ice::ConnectionI::getBatchRequestQueue() const { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - BasicStream dummy(_instance.get(), currentProtocolEncoding); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; - - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); + return _batchRequestQueue; } void Ice::ConnectionI::flushBatchRequests() { - FlushBatch out(this, _instance.get(), __flushBatchRequests_name); + ConnectionFlushBatch out(this, _instance.get(), __flushBatchRequests_name); out.invoke(); } @@ -1032,12 +861,12 @@ Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (con AsyncResultPtr Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) { - ConnectionFlushBatchPtr result = new ConnectionFlushBatch(this, - _communicator, - _instance, - __flushBatchRequests_name, - cb, - cookie); + ConnectionFlushBatchAsyncPtr result = new ConnectionFlushBatchAsync(this, + _communicator, + _instance, + __flushBatchRequests_name, + cb, + cookie); result->invoke(); return result; } @@ -1049,136 +878,6 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) r->__wait(); } -bool -Ice::ConnectionI::flushBatchRequests(OutgoingBase* out) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - while(_batchStreamInUse && !_exception.get()) - { - wait(); - } - - if(_exception.get()) - { - _exception->ice_throw(); - } - - if(_batchRequestNum == 0) - { - out->sent(); - return true; - } - - // - // Fill in the number of requests in the batch. - // - const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); -#endif - _batchStream.swap(*out->os()); - - out->attachRemoteObserver(initConnectionInfo(), _endpoint, 0); - - // - // Send the batch stream. - // - bool sent = false; - try - { - OutgoingMessage message(out, out->os(), _batchRequestCompress, 0); - sent = sendMessage(message) & AsyncStatusSent; - } - catch(const Ice::LocalException& ex) - { - setState(StateClosed, ex); - assert(_exception.get()); - _exception->ice_throw(); - } - - // - // Reset the batch stream. - // - BasicStream dummy(_instance.get(), Ice::currentProtocolEncoding); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; - return sent; -} - -AsyncStatus -Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - while(_batchStreamInUse && !_exception.get()) - { - wait(); - } - - if(_exception.get()) - { - _exception->ice_throw(); - } - - if(_batchRequestNum == 0) - { - AsyncStatus status = AsyncStatusSent; - if(outAsync->sent()) - { - status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); - } - return status; - } - - // - // Notify the request that it's cancelable with this connection. - // This will throw if the request is canceled. - // - outAsync->cancelable(this); - - // - // Fill in the number of requests in the batch. - // - const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); -#endif - _batchStream.swap(*outAsync->getOs()); - - outAsync->attachRemoteObserver(initConnectionInfo(), _endpoint, 0); - - // - // Send the batch stream. - // - AsyncStatus status = AsyncStatusQueued; - try - { - OutgoingMessage message(outAsync, outAsync->getOs(), _batchRequestCompress, 0); - status = sendMessage(message); - } - catch(const Ice::LocalException& ex) - { - setState(StateClosed, ex); - assert(_exception.get()); - _exception->ice_throw(); - } - - // - // Reset the batch stream. - // - BasicStream dummy(_instance.get(), Ice::currentProtocolEncoding); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchMarker = 0; - return status; -} - void Ice::ConnectionI::setCallback(const ConnectionCallbackPtr& callback) { @@ -1295,7 +994,7 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& { if(o->requestId) { - if(_requestsHint != _requests.end() && _requestsHint->second == dynamic_cast<Outgoing*>(out)) + if(_requestsHint != _requests.end() && _requestsHint->second == out) { _requests.erase(_requestsHint); _requestsHint = _requests.end(); @@ -1331,10 +1030,9 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& } } - Outgoing* o = dynamic_cast<Outgoing*>(out); - if(o) + if(dynamic_cast<Outgoing*>(out)) { - if(_requestsHint != _requests.end() && _requestsHint->second == o) + if(_requestsHint != _requests.end() && _requestsHint->second == out) { if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { @@ -1342,7 +1040,7 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& } else { - o->completed(ex); + out->completed(ex); _requests.erase(_requestsHint); _requestsHint = _requests.end(); } @@ -1350,9 +1048,9 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& } else { - for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p) + for(map<Int, OutgoingBase*>::iterator p = _requests.begin(); p != _requests.end(); ++p) { - if(p->second == o) + if(p->second == out) { if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { @@ -1360,7 +1058,7 @@ Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& } else { - o->completed(ex); + p->second->completed(ex); assert(p != _requestsHint); _requests.erase(p); } @@ -1430,12 +1128,11 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con } } - OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); - if(o) + if(OutgoingAsyncPtr::dynamicCast(outAsync)) { if(_asyncRequestsHint != _asyncRequests.end()) { - if(_asyncRequestsHint->second == o) + if(_asyncRequestsHint->second == outAsync) { if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { @@ -1454,9 +1151,9 @@ Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, con } } - for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) + for(map<Int, OutgoingAsyncBasePtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) { - if(p->second.get() == o.get()) + if(p->second.get() == outAsync.get()) { if(dynamic_cast<const Ice::ConnectionTimeoutException*>(&ex)) { @@ -1769,7 +1466,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) Int invokeNum = 0; ServantManagerPtr servantManager; ObjectAdapterPtr adapter; - OutgoingAsyncPtr outAsync; + OutgoingAsyncBasePtr outAsync; ConnectionCallbackPtr heartbeatCallback; int dispatchCount = 0; @@ -2039,7 +1736,7 @@ Ice::ConnectionI::message(ThreadPoolCurrent& current) void ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMessage>& sentCBs, Byte compress, Int requestId, Int invokeNum, const ServantManagerPtr& servantManager, - const ObjectAdapterPtr& adapter, const OutgoingAsyncPtr& outAsync, + const ObjectAdapterPtr& adapter, const OutgoingAsyncBasePtr& outAsync, const ConnectionCallbackPtr& heartbeatCallback, BasicStream& stream) { int dispatchedCount = 0; @@ -2290,13 +1987,13 @@ Ice::ConnectionI::finish(bool close) _sendStreams.clear(); // Must be cleared before _requests because of Outgoing* references in OutgoingMessage } - for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) + for(map<Int, OutgoingBase*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { p->second->completed(*_exception.get()); } _requests.clear(); - for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) + for(map<Int, OutgoingAsyncBasePtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { if(q->second->completed(*_exception.get())) { @@ -2429,12 +2126,7 @@ Ice::ConnectionI::ConnectionI(const CommunicatorPtr& communicator, _requestsHint(_requests.end()), _asyncRequestsHint(_asyncRequests.end()), _messageSizeMax(adapter ? adapter->messageSizeMax() : _instance->messageSizeMax()), - _batchAutoFlushSize(_instance->batchAutoFlushSize()), - _batchStream(_instance.get(), Ice::currentProtocolEncoding), - _batchStreamInUse(false), - _batchRequestNum(0), - _batchRequestCompress(false), - _batchMarker(0), + _batchRequestQueue(new BatchRequestQueue(instance, endpoint->datagram())), _readStream(_instance.get(), Ice::currentProtocolEncoding), _readHeader(false), _writeStream(_instance.get(), Ice::currentProtocolEncoding), @@ -2647,6 +2339,8 @@ Ice::ConnectionI::setState(State state) return; } + _batchRequestQueue->destroy(*_exception.get()); + // // Don't need to close now for connections so only close the transceiver // if the selector request it. @@ -3431,7 +3125,7 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse SocketOperation Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncPtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, + OutgoingAsyncBasePtr& outAsync, ConnectionCallbackPtr& heartbeatCallback, int& dispatchCount) { assert(_state > StateNotValidated && _state < StateClosed); @@ -3556,8 +3250,8 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request stream.read(requestId); - map<Int, Outgoing*>::iterator p = _requests.end(); - map<Int, OutgoingAsyncPtr>::iterator q = _asyncRequests.end(); + map<Int, OutgoingBase*>::iterator p = _requests.end(); + map<Int, OutgoingAsyncBasePtr>::iterator q = _asyncRequests.end(); if(_requestsHint != _requests.end()) { |