diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/BasicStream.cpp | 4 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 180 | ||||
-rw-r--r-- | cpp/src/Ice/ConnectionI.h | 4 |
3 files changed, 143 insertions, 45 deletions
diff --git a/cpp/src/Ice/BasicStream.cpp b/cpp/src/Ice/BasicStream.cpp index 061a8ed0981..e3048ec7cac 100644 --- a/cpp/src/Ice/BasicStream.cpp +++ b/cpp/src/Ice/BasicStream.cpp @@ -30,7 +30,7 @@ using namespace std; using namespace Ice; using namespace IceInternal; -IceInternal::BasicStream::BasicStream(Instance* instance) : +IceInternal::BasicStream::BasicStream(Instance* instance, bool unlimited) : IceInternal::Buffer(instance->messageSizeMax()), _instance(instance), _currentReadEncaps(0), @@ -38,6 +38,7 @@ IceInternal::BasicStream::BasicStream(Instance* instance) : _traceSlicing(-1), _sliceObjects(true), _messageSizeMax(_instance->messageSizeMax()), // Cached for efficiency. + _unlimited(unlimited), _stringConverter(instance->initializationData().stringConverter), _wstringConverter(instance->initializationData().wstringConverter), _seqDataStack(0), @@ -122,6 +123,7 @@ IceInternal::BasicStream::swap(BasicStream& other) std::swap(_seqDataStack, other._seqDataStack); std::swap(_objectList, other._objectList); + std::swap(_unlimited, other._unlimited); } // diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index c66e096b376..95a6fb66267 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -878,6 +878,7 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os) } _batchStreamInUse = true; + _batchMarker = _batchStream.b.size(); _batchStream.swap(*os); // @@ -889,30 +890,106 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os) void Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + bool autoflush = false; + vector<Ice::Byte> lastRequest; - // - // Get the batch stream back and increment the number of requests - // in the batch. - // - _batchStream.swap(*os); - ++_batchRequestNum; + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // We compress the whole batch if there is at least one compressed - // message. - // - if(compress) + // + // Get the batch stream back. + // + _batchStream.swap(*os); + + if(_batchStream.b.size() > _instance->messageSizeMax()) + { + // + // Throw memory limit exception if the first message added causes us to + // go over limit. Otherwise put aside the marshalled message that caused + // limit to be exceeded and rollback stream to the marker. + // + if(_batchRequestNum == 0) + { + resetBatch(true); + throw MemoryLimitException(__FILE__, __LINE__); + } + + vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest); + _batchStream.b.resize(_batchMarker); + autoflush = true; + } + else + { + // + // Increment the number of requests in the batch. + // + ++_batchRequestNum; + + // + // We compress the whole batch if there is at least one compressed + // message. + // + if(compress) + { + _batchRequestCompress = true; + } + + // + // Notify about the batch stream not being in use anymore. + // + assert(_batchStreamInUse); + _batchStreamInUse = false; + notifyAll(); + } + } + + if(autoflush) { - _batchRequestCompress = true; + // + // We have to keep _batchStreamInUse set until after we insert the + // saved marshalled data into a new stream. + // + flushBatchRequestsInternal(true); + + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // Throw memory limit exception if the message that caused us to go over + // limit causes us to exceed the limit by itself. + // + if(sizeof(requestBatchHdr) + lastRequest.size() > _instance->messageSizeMax()) + { + resetBatch(true); + throw MemoryLimitException(__FILE__, __LINE__); + } + + // + // Start a new batch with the last message that caused us to + // go over the limit. + // + try + { + _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); + _batchStream.writeBlob(&lastRequest[0], lastRequest.size()); + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + ex.ice_throw(); + } + + if(compress) + { + _batchRequestCompress = true; + } + + // + // Notify that the batch stream not in use anymore. + // + ++_batchRequestNum; + _batchStreamInUse = false; + notifyAll(); } - - // - // Notify about the batch stream not being in use anymore. - // - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); } void @@ -921,33 +998,31 @@ Ice::ConnectionI::abortBatchRequest() IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); // - // Destroy and reset the batch stream and batch count. We cannot - // safe old requests in the batch stream, as they might be - // corrupted due to incomplete marshaling. + // Reset the batch stream. We cannot save old requests + // in the batch stream, as they might be corrupted due to + // incomplete marshaling. // - BasicStream dummy(_instance.get()); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchRequestCompress = false; - - // - // Notify about the batch stream not being in use - // anymore. - // - assert(_batchStreamInUse); - _batchStreamInUse = false; - notifyAll(); + resetBatch(true); } void Ice::ConnectionI::flushBatchRequests() { + flushBatchRequestsInternal(false); +} + +void +Ice::ConnectionI::flushBatchRequestsInternal(bool ignoreInUse) +{ { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - while(_batchStreamInUse && !_exception.get()) + if(!ignoreInUse) { - wait(); + while(_batchStreamInUse && !_exception.get()) + { + wait(); + } } if(_exception.get()) @@ -1066,12 +1141,28 @@ Ice::ConnectionI::flushBatchRequests() // // Reset the batch stream, and notify that flushing is over. // - BasicStream dummy(_instance.get()); - _batchStream.swap(dummy); - _batchRequestNum = 0; - _batchRequestCompress = false; - _batchStreamInUse = false; - notifyAll(); + resetBatch(!ignoreInUse); + } +} + +void +Ice::ConnectionI::resetBatch(bool resetInUse) +{ + BasicStream dummy(_instance.get(), true); + _batchStream.swap(dummy); + _batchRequestNum = 0; + _batchRequestCompress = false; + _batchMarker = 0; + + // + // Notify about the batch stream not being in use + // anymore. + // + if(resetInUse) + { + assert(_batchStreamInUse); + _batchStreamInUse = false; + notifyAll(); } } @@ -1465,10 +1556,11 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, _nextRequestId(1), _requestsHint(_requests.end()), _asyncRequestsHint(_asyncRequests.end()), - _batchStream(_instance.get()), + _batchStream(_instance.get(), true), _batchStreamInUse(false), _batchRequestNum(0), _batchRequestCompress(false), + _batchMarker(0), _dispatchCount(0), _state(StateNotValidated), _stateTime(IceUtil::Time::now()) diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 7bbfa011d1b..4c694e0fac1 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -115,6 +115,9 @@ private: StateClosed }; + void resetBatch(bool); + void flushBatchRequestsInternal(bool); + void setState(State, const LocalException&); void setState(State); @@ -192,6 +195,7 @@ private: bool _batchStreamInUse; int _batchRequestNum; bool _batchRequestCompress; + size_t _batchMarker; int _dispatchCount; |