Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
 -rw-r--r--  cpp/src/Ice/Connection.cpp | 77
 1 file changed, 56 insertions(+), 21 deletions(-)
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp
index 6a2d185c3b3..1d00d73b1ea 100644
--- a/cpp/src/Ice/Connection.cpp
+++ b/cpp/src/Ice/Connection.cpp
@@ -57,6 +57,9 @@ IceInternal::Connection::validate()
{
if(_adapter)
{
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ assert(_threadPool); // The transceiver cannot be closed already.
+
//
// Incoming connections play the active role with respect to
// connection validation.
@@ -70,8 +73,6 @@ IceInternal::Connection::validate()
os.write(validateConnectionMsg);
os.write((Byte)1); // Compression status.
os.write(headerSize); // Message size.
-
- IceUtil::Mutex::Lock sendSync(_sendMutex);
os.i = os.b.begin();
traceHeader("sending validate connection", os, _logger, _traceLevels);
_transceiver->write(os, _endpoint->timeout());
@@ -457,6 +458,14 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
try
{
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ if(!_threadPool) // Has the transceiver already been closed?
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
BasicStream* os = out->os();
//
@@ -471,7 +480,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
copy(p, p + sizeof(Int), os->b.begin() + headerSize);
#endif
}
-
+
bool compress;
if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
@@ -481,24 +490,23 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
{
compress = _endpoint->compress();
}
-
+
if(compress)
{
//
// Set compression status.
//
os->b[9] = 2; // Message is compressed.
-
+
//
// Do compression.
//
BasicStream cstream(_instance.get());
doCompress(*os, cstream);
-
+
//
// Send the request.
//
- IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceRequest("sending request", *os, _logger, _traceLevels);
cstream.i = cstream.b.begin();
@@ -516,11 +524,10 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway)
#else
copy(p, p + sizeof(Int), os->b.begin() + 10);
#endif
-
+
//
// Send the request.
//
- IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceRequest("sending request", *os, _logger, _traceLevels);
_transceiver->write(*os, _endpoint->timeout());
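The hunks above move the `_sendMutex` acquisition to the top of the send path and add a fail-fast check before anything is written. Below is a minimal, self-contained sketch of that pattern, not the Ice sources: `std::mutex` and a plain `bool` stand in for `IceUtil::Mutex` and the `!_threadPool` test, and all names (`SketchConnection`, `sendRequest`, `_closed`) are hypothetical.

```cpp
// Hypothetical, simplified sketch of the send-side guard added above.
#include <mutex>
#include <stdexcept>
#include <string>

class SketchConnection
{
public:
    void sendRequest(const std::string& marshaled)
    {
        std::lock_guard<std::mutex> sendSync(_sendMutex); // Lock before touching the transceiver.
        if(_closed)                                       // Plays the role of "if(!_threadPool)".
        {
            // Plays the role of _exception->ice_throw(): report why the connection went away.
            throw std::runtime_error(_closeReason);
        }
        write(marshaled);                                 // Still holding _sendMutex during the write.
    }

private:
    void write(const std::string&) { /* socket write elided */ }

    std::mutex _sendMutex;
    bool _closed = false;
    std::string _closeReason = "connection closed";
};
```

The hunks that follow add the same guard to sendAsyncRequest, flushBatchRequest, and sendResponse, and delete the later, narrower `_sendMutex` acquisitions that used to sit immediately before each `_transceiver->write` call.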
@@ -597,6 +604,14 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
try
{
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ if(!_threadPool) // Has the transceiver already been closed?
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
BasicStream* os = out->__os();
//
@@ -635,7 +650,6 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
//
// Send the request.
//
- IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
cstream.i = cstream.b.begin();
@@ -657,7 +671,6 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out)
//
// Send the request.
//
- IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceRequest("sending asynchronous request", *os, _logger, _traceLevels);
_transceiver->write(*os, _endpoint->timeout());
@@ -788,6 +801,14 @@ IceInternal::Connection::flushBatchRequest()
try
{
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ if(!_threadPool) // Has the transceiver already been closed?
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
//
// Fill in the number of requests in the batch.
//
@@ -824,7 +845,6 @@ IceInternal::Connection::flushBatchRequest()
//
// Send the batch request.
//
- IceUtil::Mutex::Lock sendSync(_sendMutex);
_batchStream.i = _batchStream.b.begin();
traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
cstream.i = cstream.b.begin();
@@ -846,7 +866,6 @@ IceInternal::Connection::flushBatchRequest()
//
// Send the batch request.
//
- IceUtil::Mutex::Lock sendSync(_sendMutex);
_batchStream.i = _batchStream.b.begin();
traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels);
_transceiver->write(_batchStream, _endpoint->timeout());
@@ -900,6 +919,14 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
{
try
{
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ if(!_threadPool) // Has the transceiver already been closed?
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this);
+ assert(_exception.get());
+ _exception->ice_throw();
+ }
+
bool compress;
if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes.
{
@@ -926,7 +953,6 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
//
// Send the reply.
//
- IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceReply("sending reply", *os, _logger, _traceLevels);
cstream.i = cstream.b.begin();
@@ -948,7 +974,6 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag)
//
// Send the reply.
//
- IceUtil::Mutex::Lock sendSync(_sendMutex);
os->i = os->b.begin();
traceReply("sending reply", *os, _logger, _traceLevels);
_transceiver->write(*os, _endpoint->timeout());
@@ -1419,6 +1444,12 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool)
}
else if(_state == StateClosed)
{
+ //
+ // We must make sure that nobody is sending when we close
+ // the transceiver.
+ //
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+
try
{
_transceiver->close();
@@ -1723,6 +1754,12 @@ IceInternal::Connection::setState(State state)
if(_state == StateNotValidated)
{
assert(!_registeredWithPool);
+
+ //
+ // We must make sure that nobody is sending when we
+ // close the transceiver.
+ //
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
try
{
@@ -1735,6 +1772,7 @@ IceInternal::Connection::setState(State state)
assert(_threadPool);
_threadPool = 0; // We don't need the thread pool anymore.
+ //notifyAll(); // We notify already below.
}
else
{
@@ -1754,11 +1792,6 @@ IceInternal::Connection::setState(State state)
{
try
{
- //
- // Locking of _sendMutex is not necessary here, because if
- // we are in closing state, there are no sending threads
- // anymore.
- //
initiateShutdown();
}
catch(const LocalException& ex)
@@ -1776,6 +1809,9 @@ IceInternal::Connection::initiateShutdown() const
if(!_endpoint->datagram())
{
+ IceUtil::Mutex::Lock sendSync(_sendMutex);
+ assert(_threadPool); // The transceiver cannot be closed already.
+
//
// Before we shut down, we send a close connection message.
//
@@ -1792,7 +1828,6 @@ IceInternal::Connection::initiateShutdown() const
//
// Send the message.
//
- IceUtil::Mutex::Lock sendSync(_sendMutex);
os.i = os.b.begin();
traceHeader("sending close connection", os, _logger, _traceLevels);
_transceiver->write(os, _endpoint->timeout());
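The closing-side hunks (finished(), setState(), and initiateShutdown()) are the other half of the change: they now take the same send mutex before closing the transceiver or writing the close-connection message, so a concurrent sender either completes its write before the socket disappears or observes the closed state and throws. A companion sketch under the same assumptions as above (hypothetical names, `std::mutex` and a `bool` in place of `IceUtil::Mutex` and the `_threadPool` test):

```cpp
// Hypothetical companion sketch of the closing side -- not the Ice sources.
#include <cassert>
#include <mutex>
#include <string>

class SketchConnection
{
public:
    // Mirrors finished()/setState(): nobody may be sending while we close.
    void close(const std::string& reason)
    {
        std::lock_guard<std::mutex> sendSync(_sendMutex);
        _closeReason = reason;
        _closed = true;       // Later sends now fail fast instead of writing to a dead socket.
        closeTransceiver();
    }

    // Mirrors initiateShutdown(): the close-connection message is written under the lock,
    // and the transceiver cannot have been closed yet.
    void initiateShutdown()
    {
        std::lock_guard<std::mutex> sendSync(_sendMutex);
        assert(!_closed);
        write("close connection");
    }

private:
    void closeTransceiver() { /* socket close elided */ }
    void write(const std::string&) { /* socket write elided */ }

    std::mutex _sendMutex;
    bool _closed = false;
    std::string _closeReason;
};
```

With both sides funneled through the one mutex, the window where the transceiver could be closed between a sender's state check and its write is gone; the trade-off is that a slow write also delays connection teardown until the lock is released.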