summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Connection.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r--cpp/src/Ice/Connection.cpp238
1 files changed, 121 insertions, 117 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 30fa07e37cc..8162665728c 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -110,6 +110,7 @@ IceInternal::Connection::waitUntilFinished()
if(!timedWait(IceUtil::Time::milliSeconds(_endpoint->timeout())))
{
setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__));
+ assert(_dispatchCount == 0);
// No return here, we must still wait until _transceiver becomes null.
}
}
@@ -149,10 +150,7 @@ IceInternal::Connection::monitor()
//
// Active connection management for idle connections.
//
- if(_acmTimeout > 0 &&
- _requests.empty() && _asyncRequests.empty() &&
- _batchStream.b.empty() &&
- _dispatchCount == 0)
+ if(_acmTimeout > 0 && closeOK())
{
if(IceUtil::Time::now() >= _acmAbsoluteTimeout)
{
@@ -342,15 +340,8 @@ IceInternal::Connection::decProxyCount()
assert(_proxyCount > 0);
--_proxyCount;
- //
- // We close the connection if
- // - no proxy uses this connection anymore; and
- // - there are not outstanding asynchronous requests; and
- // - this is an outgoing connection only.
- //
- if(_proxyCount == 0 && _asyncRequests.empty() && !_adapter)
+ if(_proxyCount == 0 && !_adapter && closeOK())
{
- assert(_requests.empty());
setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
}
}
@@ -389,8 +380,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
_nextRequestId = 1;
requestId = _nextRequestId++;
}
- const Byte* p;
- p = reinterpret_cast<const Byte*>(&requestId);
+ const Byte* p = reinterpret_cast<const Byte*>(&requestId);
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#else
@@ -434,14 +424,14 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
//
// No compression, just fill in the message size.
//
- const Byte* p;
Int sz = static_cast<Int>(os->b.size());
- p = reinterpret_cast<const Byte*>(&sz);
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
#else
copy(p, p + sizeof(Int), os->b.begin() + 10);
#endif
+
//
// Send the request.
//
@@ -498,9 +488,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
_nextRequestId = 1;
requestId = _nextRequestId++;
}
- const Byte* p;
- p = reinterpret_cast<const Byte*>(&requestId);
-
+ const Byte* p = reinterpret_cast<const Byte*>(&requestId);
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#else
@@ -544,12 +532,13 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
// No compression, just fill in the message size.
//
Int sz = static_cast<Int>(os->b.size());
- p = reinterpret_cast<const Byte*>(&sz);
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
#else
copy(p, p + sizeof(Int), os->b.begin() + 10);
#endif
+
//
// Send the request.
//
@@ -637,104 +626,105 @@ IceInternal::Connection::flushBatchRequest()
{
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this);
- if(_batchStream.b.empty())
- {
- return; // Nothing to send.
- }
-
if(_exception.get())
{
_exception->ice_throw();
}
assert(_state > StateNotValidated && _state < StateClosing);
- try
+ if(!_batchStream.b.empty())
{
- _batchStream.i = _batchStream.b.begin();
-
- //
- // Fill in the number of requests in the batch.
- //
- const Byte* p;
- p = reinterpret_cast<const Byte*>(&_batchRequestNum);
-
-#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
-#else
- copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
-#endif
-
- bool compress;
- if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
- {
- compress = false;
- }
- else
- {
- compress = _endpoint->compress();
- }
-
- if(compress)
+ try
{
- //
- // Set compression status.
- //
- _batchStream.b[9] = 2; // Message is compressed.
-
- //
- // Do compression.
- //
- BasicStream cstream(_instance.get());
- doCompress(_batchStream, cstream);
-
- //
- // Send the batch request.
- //
_batchStream.i = _batchStream.b.begin();
- traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- cstream.i = cstream.b.begin();
- _transceiver->write(cstream, _endpoint->timeout());
- }
- else
- {
+
//
- // No compression, just fill in the message size.
+ // Fill in the number of requests in the batch.
//
- Int sz = static_cast<Int>(_batchStream.b.size());
- p = reinterpret_cast<const Byte*>(&sz);
+ const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum);
+#ifdef ICE_BIG_ENDIAN
+ reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#else
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize);
+#endif
+ bool compress;
+ if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
+ {
+ compress = false;
+ }
+ else
+ {
+ compress = _endpoint->compress();
+ }
+
+ if(compress)
+ {
+ //
+ // Set compression status.
+ //
+ _batchStream.b[9] = 2; // Message is compressed.
+
+ //
+ // Do compression.
+ //
+ BasicStream cstream(_instance.get());
+ doCompress(_batchStream, cstream);
+
+ //
+ // Send the batch request.
+ //
+ _batchStream.i = _batchStream.b.begin();
+ traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ cstream.i = cstream.b.begin();
+ _transceiver->write(cstream, _endpoint->timeout());
+ }
+ else
+ {
+ //
+ // No compression, just fill in the message size.
+ //
+ Int sz = static_cast<Int>(_batchStream.b.size());
+ p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
- reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
+ reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
#else
- copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
+ copy(p, p + sizeof(Int), _batchStream.b.begin() + 10);
#endif
+
+ //
+ // Send the batch request.
+ //
+ _batchStream.i = _batchStream.b.begin();
+ traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
+ _transceiver->write(_batchStream, _endpoint->timeout());
+ }
+
//
- // Send the batch request.
+ // Reset _batchStream and _batchRequestNum, so that new batch
+ // messages can be sent.
//
- _batchStream.i = _batchStream.b.begin();
- traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
- _transceiver->write(_batchStream, _endpoint->timeout());
+ BasicStream dummy(_instance.get());
+ _batchStream.swap(dummy);
+ assert(_batchStream.b.empty());
+ _batchRequestNum = 0;
+ }
+ catch(const LocalException& ex)
+ {
+ setState(StateClosed, ex);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
+ if(_acmTimeout > 0)
+ {
+ _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
}
-
- //
- // Reset _batchStream and _batchRequestNum, so that new batch
- // messages can be sent.
- //
- BasicStream dummy(_instance.get());
- _batchStream.swap(dummy);
- assert(_batchStream.b.empty());
- _batchRequestNum = 0;
- }
- catch(const LocalException& ex)
- {
- setState(StateClosed, ex);
- assert(_exception.get());
- _exception->ice_throw();
}
-
- if(_acmTimeout > 0)
+
+ if(_proxyCount == 0 && !_adapter && closeOK())
{
- _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout);
+ setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
}
}
@@ -745,16 +735,17 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
try
{
- if(--_dispatchCount == 0)
- {
- notifyAll();
- }
-
if(_state == StateClosed)
{
+ assert(_dispatchCount == 0);
return;
}
+ if(--_dispatchCount == 0)
+ {
+ notifyAll();
+ }
+
bool compress;
if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
@@ -791,15 +782,14 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
//
// No compression, just fill in the message size.
//
- const Byte* p;
Int sz = static_cast<Int>(os->b.size());
- p = reinterpret_cast<const Byte*>(&sz);
-
+ const Byte* p = reinterpret_cast<const Byte*>(&sz);
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), os->b.begin() + 10);
#else
copy(p, p + sizeof(Int), os->b.begin() + 10);
#endif
+
//
// Send the reply.
//
@@ -831,6 +821,12 @@ IceInternal::Connection::sendNoResponse()
try
{
+ if(_state == StateClosed)
+ {
+ assert(_dispatchCount == 0);
+ return;
+ }
+
if(--_dispatchCount == 0)
{
notifyAll();
@@ -1127,15 +1123,8 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa
_asyncRequests.erase(q);
}
- //
- // We close the connection if
- // - no proxy uses this connection anymore; and
- // - there are not outstanding asynchronous requests; and
- // - this is an outgoing connection only.
- //
- if(_proxyCount == 0 && _asyncRequests.empty() && !_adapter)
+ if(_proxyCount == 0 && !_adapter && closeOK())
{
- assert(_requests.empty());
setState(StateClosing, CloseConnectionException(__FILE__, __LINE__));
}
}
@@ -1494,6 +1483,12 @@ IceInternal::Connection::setState(State state)
case StateClosed:
{
//
+ // If we do a hard close, all outstanding requests are
+ // disregarded.
+ //
+ _dispatchCount = 0;
+
+ //
// If we change from not validated, we can close right
// away. Otherwise we first must make sure that we are
// registered, then we unregister, and let finished() do
@@ -1695,25 +1690,24 @@ IceInternal::Connection::doCompress(BasicStream& uncompressed, BasicStream& comp
//
Int compressedSize = static_cast<Int>(compressed.b.size());
p = reinterpret_cast<const Byte*>(&compressedSize);
-
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), uncompressed.b.begin() + 10);
#else
copy(p, p + sizeof(Int), uncompressed.b.begin() + 10);
#endif
+
//
// Add the size of the uncompressed stream before the message body
// of the compressed stream.
//
Int uncompressedSize = static_cast<Int>(uncompressed.b.size());
p = reinterpret_cast<const Byte*>(&uncompressedSize);
-
#ifdef ICE_BIG_ENDIAN
reverse_copy(p, p + sizeof(Int), compressed.b.begin() + headerSize);
#else
copy(p, p + sizeof(Int), compressed.b.begin() + headerSize);
#endif
-
+
//
// Copy the header from the uncompressed stream to the compressed one.
//
@@ -1748,3 +1742,13 @@ IceInternal::Connection::doUncompress(BasicStream& compressed, BasicStream& unco
copy(compressed.b.begin(), compressed.b.begin() + headerSize, uncompressed.b.begin());
}
+
+bool
+IceInternal::Connection::closeOK() const
+{
+ return
+ _requests.empty() &&
+ _asyncRequests.empty() &&
+ _batchStream.b.empty() &&
+ _dispatchCount == 0;
+}