diff options
author | Marc Laukien <marc@zeroc.com> | 2004-01-23 17:50:31 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-01-23 17:50:31 +0000 |
commit | d558a3af64c1b1cbc8e188d7f13705dad3677ced (patch) | |
tree | 176be43bf5b11db91f9778b3df709d99d7a81ed6 /cpp/src/Ice/Connection.cpp | |
parent | comment edit (diff) | |
download | ice-d558a3af64c1b1cbc8e188d7f13705dad3677ced.tar.bz2 ice-d558a3af64c1b1cbc8e188d7f13705dad3677ced.tar.xz ice-d558a3af64c1b1cbc8e188d7f13705dad3677ced.zip |
fixes
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 69 |
1 files changed, 45 insertions, 24 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index f684f3525b2..c7b08b24487 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -666,6 +666,11 @@ IceInternal::Connection::prepareBatchRequest(BasicStream* os) assert(_state > StateNotValidated); assert(_state < StateClosing); + while(_batchFlushInProgress) + { + wait(); + } + if(_batchStream.b.empty()) { try @@ -714,8 +719,6 @@ IceInternal::Connection::abortBatchRequest() void IceInternal::Connection::flushBatchRequest() { - BasicStream batchStream(_instance.get()); - { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); @@ -736,38 +739,34 @@ IceInternal::Connection::flushBatchRequest() return; // Nothing to do. } + _batchStream.i = _batchStream.b.begin(); + if(_acmTimeout > 0) { _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); } // - // Reset _batchStream and _batchRequestNum, so that new batch - // messages can be sent. + // Prevent that new batch requests are added while we are + // flushing. // - _batchStream.swap(batchStream); - assert(_batchStream.b.empty()); - _batchRequestNum = 0; + _batchFlushInProgress = true; } - assert(!batchStream.b.empty()); - try { - batchStream.i = batchStream.b.begin(); - // // Fill in the number of requests in the batch. // const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), batchStream.b.begin() + headerSize); + reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #else - copy(p, p + sizeof(Int), batchStream.b.begin() + headerSize); + copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif bool compress; - if(batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes. + if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes. { compress = false; } @@ -781,20 +780,20 @@ IceInternal::Connection::flushBatchRequest() // // Set compression status. // - batchStream.b[9] = 2; // Message is compressed. + _batchStream.b[9] = 2; // Message is compressed. // // Do compression. // BasicStream cstream(_instance.get()); - doCompress(batchStream, cstream); + doCompress(_batchStream, cstream); // // Send the batch request. // IceUtil::Mutex::Lock sendSync(_sendMutex); - batchStream.i = batchStream.b.begin(); - traceBatchRequest("sending batch request", batchStream, _logger, _traceLevels); + _batchStream.i = _batchStream.b.begin(); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); cstream.i = cstream.b.begin(); _transceiver->write(cstream, _endpoint->timeout()); } @@ -803,21 +802,21 @@ IceInternal::Connection::flushBatchRequest() // // No compression, just fill in the message size. // - Int sz = static_cast<Int>(batchStream.b.size()); + Int sz = static_cast<Int>(_batchStream.b.size()); p = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), batchStream.b.begin() + 10); + reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); #else - copy(p, p + sizeof(Int), batchStream.b.begin() + 10); + copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); #endif // // Send the batch request. // IceUtil::Mutex::Lock sendSync(_sendMutex); - batchStream.i = batchStream.b.begin(); - traceBatchRequest("sending batch request", batchStream, _logger, _traceLevels); - _transceiver->write(batchStream, _endpoint->timeout()); + _batchStream.i = _batchStream.b.begin(); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + _transceiver->write(_batchStream, _endpoint->timeout()); } } catch(const LocalException& ex) @@ -827,6 +826,21 @@ IceInternal::Connection::flushBatchRequest() assert(_exception.get()); // + // Reset _batchStream and _batchRequestNum, so that new batch + // messages can be sent. + // + BasicStream dummy(_instance.get()); + _batchStream.swap(dummy); + assert(_batchStream.b.empty()); + _batchRequestNum = 0; + + // + // Notify that flushing is over. + // + _batchFlushInProgress = false; + notifyAll(); + + // // Since batch requests are all oneways (or datagrams), we // must report the exception to the caller. // @@ -836,6 +850,12 @@ IceInternal::Connection::flushBatchRequest() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + // + // Notify that flushing is over. + // + _batchFlushInProgress = false; + notifyAll(); + if(_proxyCount == 0 && !_adapter && closingOK()) { setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); @@ -1459,6 +1479,7 @@ IceInternal::Connection::Connection(const InstancePtr& instance, _asyncRequestsHint(_asyncRequests.end()), _batchStream(_instance.get()), _batchRequestNum(0), + _batchFlushInProgress(false), _dispatchCount(0), _proxyCount(0), _state(StateNotValidated), |