diff options
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 238 |
1 files changed, 121 insertions, 117 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 30fa07e37cc..8162665728c 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -110,6 +110,7 @@ IceInternal::Connection::waitUntilFinished() if(!timedWait(IceUtil::Time::milliSeconds(_endpoint->timeout()))) { setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + assert(_dispatchCount == 0); // No return here, we must still wait until _transceiver becomes null. } } @@ -149,10 +150,7 @@ IceInternal::Connection::monitor() // // Active connection management for idle connections. // - if(_acmTimeout > 0 && - _requests.empty() && _asyncRequests.empty() && - _batchStream.b.empty() && - _dispatchCount == 0) + if(_acmTimeout > 0 && closeOK()) { if(IceUtil::Time::now() >= _acmAbsoluteTimeout) { @@ -342,15 +340,8 @@ IceInternal::Connection::decProxyCount() assert(_proxyCount > 0); --_proxyCount; - // - // We close the connection if - // - no proxy uses this connection anymore; and - // - there are not outstanding asynchronous requests; and - // - this is an outgoing connection only. - // - if(_proxyCount == 0 && _asyncRequests.empty() && !_adapter) + if(_proxyCount == 0 && !_adapter && closeOK()) { - assert(_requests.empty()); setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } } @@ -389,8 +380,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) _nextRequestId = 1; requestId = _nextRequestId++; } - const Byte* p; - p = reinterpret_cast<const Byte*>(&requestId); + const Byte* p = reinterpret_cast<const Byte*>(&requestId); #ifdef ICE_BIG_ENDIAN reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); #else @@ -434,14 +424,14 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) // // No compression, just fill in the message size. // - const Byte* p; Int sz = static_cast<Int>(os->b.size()); - p = reinterpret_cast<const Byte*>(&sz); + const Byte* p = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); #else copy(p, p + sizeof(Int), os->b.begin() + 10); #endif + // // Send the request. // @@ -498,9 +488,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) _nextRequestId = 1; requestId = _nextRequestId++; } - const Byte* p; - p = reinterpret_cast<const Byte*>(&requestId); - + const Byte* p = reinterpret_cast<const Byte*>(&requestId); #ifdef ICE_BIG_ENDIAN reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); #else @@ -544,12 +532,13 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) // No compression, just fill in the message size. // Int sz = static_cast<Int>(os->b.size()); - p = reinterpret_cast<const Byte*>(&sz); + const Byte* p = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); #else copy(p, p + sizeof(Int), os->b.begin() + 10); #endif + // // Send the request. // @@ -637,104 +626,105 @@ IceInternal::Connection::flushBatchRequest() { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - if(_batchStream.b.empty()) - { - return; // Nothing to send. - } - if(_exception.get()) { _exception->ice_throw(); } assert(_state > StateNotValidated && _state < StateClosing); - try + if(!_batchStream.b.empty()) { - _batchStream.i = _batchStream.b.begin(); - - // - // Fill in the number of requests in the batch. - // - const Byte* p; - p = reinterpret_cast<const Byte*>(&_batchRequestNum); - -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); -#endif - - bool compress; - if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes. - { - compress = false; - } - else - { - compress = _endpoint->compress(); - } - - if(compress) + try { - // - // Set compression status. - // - _batchStream.b[9] = 2; // Message is compressed. - - // - // Do compression. - // - BasicStream cstream(_instance.get()); - doCompress(_batchStream, cstream); - - // - // Send the batch request. - // _batchStream.i = _batchStream.b.begin(); - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - cstream.i = cstream.b.begin(); - _transceiver->write(cstream, _endpoint->timeout()); - } - else - { + // - // No compression, just fill in the message size. + // Fill in the number of requests in the batch. // - Int sz = static_cast<Int>(_batchStream.b.size()); - p = reinterpret_cast<const Byte*>(&sz); + const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); +#ifdef ICE_BIG_ENDIAN + reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); +#else + copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); +#endif + bool compress; + if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes. + { + compress = false; + } + else + { + compress = _endpoint->compress(); + } + + if(compress) + { + // + // Set compression status. + // + _batchStream.b[9] = 2; // Message is compressed. + + // + // Do compression. + // + BasicStream cstream(_instance.get()); + doCompress(_batchStream, cstream); + + // + // Send the batch request. + // + _batchStream.i = _batchStream.b.begin(); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + cstream.i = cstream.b.begin(); + _transceiver->write(cstream, _endpoint->timeout()); + } + else + { + // + // No compression, just fill in the message size. + // + Int sz = static_cast<Int>(_batchStream.b.size()); + p = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); + reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); #else - copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); + copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); #endif + + // + // Send the batch request. + // + _batchStream.i = _batchStream.b.begin(); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + _transceiver->write(_batchStream, _endpoint->timeout()); + } + // - // Send the batch request. + // Reset _batchStream and _batchRequestNum, so that new batch + // messages can be sent. // - _batchStream.i = _batchStream.b.begin(); - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - _transceiver->write(_batchStream, _endpoint->timeout()); + BasicStream dummy(_instance.get()); + _batchStream.swap(dummy); + assert(_batchStream.b.empty()); + _batchRequestNum = 0; + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + assert(_exception.get()); + _exception->ice_throw(); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); } - - // - // Reset _batchStream and _batchRequestNum, so that new batch - // messages can be sent. - // - BasicStream dummy(_instance.get()); - _batchStream.swap(dummy); - assert(_batchStream.b.empty()); - _batchRequestNum = 0; - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - assert(_exception.get()); - _exception->ice_throw(); } - - if(_acmTimeout > 0) + + if(_proxyCount == 0 && !_adapter && closeOK()) { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } } @@ -745,16 +735,17 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) try { - if(--_dispatchCount == 0) - { - notifyAll(); - } - if(_state == StateClosed) { + assert(_dispatchCount == 0); return; } + if(--_dispatchCount == 0) + { + notifyAll(); + } + bool compress; if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes. { @@ -791,15 +782,14 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) // // No compression, just fill in the message size. // - const Byte* p; Int sz = static_cast<Int>(os->b.size()); - p = reinterpret_cast<const Byte*>(&sz); - + const Byte* p = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); #else copy(p, p + sizeof(Int), os->b.begin() + 10); #endif + // // Send the reply. // @@ -831,6 +821,12 @@ IceInternal::Connection::sendNoResponse() try { + if(_state == StateClosed) + { + assert(_dispatchCount == 0); + return; + } + if(--_dispatchCount == 0) { notifyAll(); @@ -1127,15 +1123,8 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa _asyncRequests.erase(q); } - // - // We close the connection if - // - no proxy uses this connection anymore; and - // - there are not outstanding asynchronous requests; and - // - this is an outgoing connection only. - // - if(_proxyCount == 0 && _asyncRequests.empty() && !_adapter) + if(_proxyCount == 0 && !_adapter && closeOK()) { - assert(_requests.empty()); setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } } @@ -1494,6 +1483,12 @@ IceInternal::Connection::setState(State state) case StateClosed: { // + // If we do a hard close, all outstanding requests are + // disregarded. + // + _dispatchCount = 0; + + // // If we change from not validated, we can close right // away. Otherwise we first must make sure that we are // registered, then we unregister, and let finished() do @@ -1695,25 +1690,24 @@ IceInternal::Connection::doCompress(BasicStream& uncompressed, BasicStream& comp // Int compressedSize = static_cast<Int>(compressed.b.size()); p = reinterpret_cast<const Byte*>(&compressedSize); - #ifdef ICE_BIG_ENDIAN reverse_copy(p, p + sizeof(Int), uncompressed.b.begin() + 10); #else copy(p, p + sizeof(Int), uncompressed.b.begin() + 10); #endif + // // Add the size of the uncompressed stream before the message body // of the compressed stream. // Int uncompressedSize = static_cast<Int>(uncompressed.b.size()); p = reinterpret_cast<const Byte*>(&uncompressedSize); - #ifdef ICE_BIG_ENDIAN reverse_copy(p, p + sizeof(Int), compressed.b.begin() + headerSize); #else copy(p, p + sizeof(Int), compressed.b.begin() + headerSize); #endif - + // // Copy the header from the uncompressed stream to the compressed one. // @@ -1748,3 +1742,13 @@ IceInternal::Connection::doUncompress(BasicStream& compressed, BasicStream& unco copy(compressed.b.begin(), compressed.b.begin() + headerSize, uncompressed.b.begin()); } + +bool +IceInternal::Connection::closeOK() const +{ + return + _requests.empty() && + _asyncRequests.empty() && + _batchStream.b.empty() && + _dispatchCount == 0; +} |