diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 69 | ||||
-rw-r--r-- | cpp/src/Ice/Connection.h | 1 | ||||
-rw-r--r-- | cpp/src/Ice/Proxy.cpp | 5 |
3 files changed, 50 insertions, 25 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index f684f3525b2..c7b08b24487 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -666,6 +666,11 @@ IceInternal::Connection::prepareBatchRequest(BasicStream* os) assert(_state > StateNotValidated); assert(_state < StateClosing); + while(_batchFlushInProgress) + { + wait(); + } + if(_batchStream.b.empty()) { try @@ -714,8 +719,6 @@ IceInternal::Connection::abortBatchRequest() void IceInternal::Connection::flushBatchRequest() { - BasicStream batchStream(_instance.get()); - { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); @@ -736,38 +739,34 @@ IceInternal::Connection::flushBatchRequest() return; // Nothing to do. } + _batchStream.i = _batchStream.b.begin(); + if(_acmTimeout > 0) { _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); } // - // Reset _batchStream and _batchRequestNum, so that new batch - // messages can be sent. + // Prevent that new batch requests are added while we are + // flushing. // - _batchStream.swap(batchStream); - assert(_batchStream.b.empty()); - _batchRequestNum = 0; + _batchFlushInProgress = true; } - assert(!batchStream.b.empty()); - try { - batchStream.i = batchStream.b.begin(); - // // Fill in the number of requests in the batch. // const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), batchStream.b.begin() + headerSize); + reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #else - copy(p, p + sizeof(Int), batchStream.b.begin() + headerSize); + copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif bool compress; - if(batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes. + if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes. { compress = false; } @@ -781,20 +780,20 @@ IceInternal::Connection::flushBatchRequest() // // Set compression status. // - batchStream.b[9] = 2; // Message is compressed. + _batchStream.b[9] = 2; // Message is compressed. // // Do compression. // BasicStream cstream(_instance.get()); - doCompress(batchStream, cstream); + doCompress(_batchStream, cstream); // // Send the batch request. // IceUtil::Mutex::Lock sendSync(_sendMutex); - batchStream.i = batchStream.b.begin(); - traceBatchRequest("sending batch request", batchStream, _logger, _traceLevels); + _batchStream.i = _batchStream.b.begin(); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); cstream.i = cstream.b.begin(); _transceiver->write(cstream, _endpoint->timeout()); } @@ -803,21 +802,21 @@ IceInternal::Connection::flushBatchRequest() // // No compression, just fill in the message size. // - Int sz = static_cast<Int>(batchStream.b.size()); + Int sz = static_cast<Int>(_batchStream.b.size()); p = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), batchStream.b.begin() + 10); + reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); #else - copy(p, p + sizeof(Int), batchStream.b.begin() + 10); + copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); #endif // // Send the batch request. // IceUtil::Mutex::Lock sendSync(_sendMutex); - batchStream.i = batchStream.b.begin(); - traceBatchRequest("sending batch request", batchStream, _logger, _traceLevels); - _transceiver->write(batchStream, _endpoint->timeout()); + _batchStream.i = _batchStream.b.begin(); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + _transceiver->write(_batchStream, _endpoint->timeout()); } } catch(const LocalException& ex) @@ -827,6 +826,21 @@ IceInternal::Connection::flushBatchRequest() assert(_exception.get()); // + // Reset _batchStream and _batchRequestNum, so that new batch + // messages can be sent. + // + BasicStream dummy(_instance.get()); + _batchStream.swap(dummy); + assert(_batchStream.b.empty()); + _batchRequestNum = 0; + + // + // Notify that flushing is over. + // + _batchFlushInProgress = false; + notifyAll(); + + // // Since batch requests are all oneways (or datagrams), we // must report the exception to the caller. // @@ -836,6 +850,12 @@ IceInternal::Connection::flushBatchRequest() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + // + // Notify that flushing is over. + // + _batchFlushInProgress = false; + notifyAll(); + if(_proxyCount == 0 && !_adapter && closingOK()) { setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); @@ -1459,6 +1479,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance, _asyncRequestsHint(_asyncRequests.end()), _batchStream(_instance.get()), _batchRequestNum(0), + _batchFlushInProgress(false), _dispatchCount(0), _proxyCount(0), _state(StateNotValidated), diff --git a/cpp/src/Ice/Connection.h b/cpp/src/Ice/Connection.h index 40afa3726e7..50aaf1f8ed8 100644 --- a/cpp/src/Ice/Connection.h +++ b/cpp/src/Ice/Connection.h @@ -165,6 +165,7 @@ private: BasicStream _batchStream; int _batchRequestNum; + bool _batchFlushInProgress; int _dispatchCount; diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 1f6eb92aec0..19341ccd5b4 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -770,7 +770,10 @@ IceProxy::Ice::Object::__rethrowException(const LocalException& ex) void IceProxy::Ice::Object::__checkTwowayOnly(const char* name) const { - IceUtil::Mutex::Lock sync(*this); + // + // No mutex lock necessary, there is nothing mutable in this + // operation. + // if(!ice_isTwoway()) { |