diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 77 |
1 files changed, 56 insertions, 21 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 6a2d185c3b3..1d00d73b1ea 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -57,6 +57,9 @@ IceInternal::Connection::validate() { if(_adapter) { + IceUtil::Mutex::Lock sendSync(_sendMutex); + assert(_threadPool); // The transceiver cannot be closed already. + // // Incoming connections play the active role with respect to // connection validation. @@ -70,8 +73,6 @@ IceInternal::Connection::validate() os.write(validateConnectionMsg); os.write((Byte)1); // Compression status. os.write(headerSize); // Message size. - - IceUtil::Mutex::Lock sendSync(_sendMutex); os.i = os.b.begin(); traceHeader("sending validate connection", os, _logger, _traceLevels); _transceiver->write(os, _endpoint->timeout()); @@ -457,6 +458,14 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) try { + IceUtil::Mutex::Lock sendSync(_sendMutex); + if(!_threadPool) // Has the transceiver already been closed? + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_exception.get()); + _exception->ice_throw(); + } + BasicStream* os = out->os(); // @@ -471,7 +480,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) copy(p, p + sizeof(Int), os->b.begin() + headerSize); #endif } - + bool compress; if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes. { @@ -481,24 +490,23 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) { compress = _endpoint->compress(); } - + if(compress) { // // Set compression status. // os->b[9] = 2; // Message is compressed. - + // // Do compression. // BasicStream cstream(_instance.get()); doCompress(*os, cstream); - + // // Send the request. // - IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceRequest("sending request", *os, _logger, _traceLevels); cstream.i = cstream.b.begin(); @@ -516,11 +524,10 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) #else copy(p, p + sizeof(Int), os->b.begin() + 10); #endif - + // // Send the request. // - IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceRequest("sending request", *os, _logger, _traceLevels); _transceiver->write(*os, _endpoint->timeout()); @@ -597,6 +604,14 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) try { + IceUtil::Mutex::Lock sendSync(_sendMutex); + if(!_threadPool) // Has the transceiver already been closed? + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_exception.get()); + _exception->ice_throw(); + } + BasicStream* os = out->__os(); // @@ -635,7 +650,6 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) // // Send the request. // - IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceRequest("sending asynchronous request", *os, _logger, _traceLevels); cstream.i = cstream.b.begin(); @@ -657,7 +671,6 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) // // Send the request. // - IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceRequest("sending asynchronous request", *os, _logger, _traceLevels); _transceiver->write(*os, _endpoint->timeout()); @@ -788,6 +801,14 @@ IceInternal::Connection::flushBatchRequest() try { + IceUtil::Mutex::Lock sendSync(_sendMutex); + if(!_threadPool) // Has the transceiver already been closed? + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_exception.get()); + _exception->ice_throw(); + } + // // Fill in the number of requests in the batch. // @@ -824,7 +845,6 @@ IceInternal::Connection::flushBatchRequest() // // Send the batch request. // - IceUtil::Mutex::Lock sendSync(_sendMutex); _batchStream.i = _batchStream.b.begin(); traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); cstream.i = cstream.b.begin(); @@ -846,7 +866,6 @@ IceInternal::Connection::flushBatchRequest() // // Send the batch request. // - IceUtil::Mutex::Lock sendSync(_sendMutex); _batchStream.i = _batchStream.b.begin(); traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); _transceiver->write(_batchStream, _endpoint->timeout()); @@ -900,6 +919,14 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) { try { + IceUtil::Mutex::Lock sendSync(_sendMutex); + if(!_threadPool) // Has the transceiver already been closed? + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_exception.get()); + _exception->ice_throw(); + } + bool compress; if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes. { @@ -926,7 +953,6 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) // // Send the reply. // - IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceReply("sending reply", *os, _logger, _traceLevels); cstream.i = cstream.b.begin(); @@ -948,7 +974,6 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) // // Send the reply. // - IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceReply("sending reply", *os, _logger, _traceLevels); _transceiver->write(*os, _endpoint->timeout()); @@ -1419,6 +1444,12 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) } else if(_state == StateClosed) { + // + // We must make sure that nobody is sending when we close + // the transceiver. + // + IceUtil::Mutex::Lock sendSync(_sendMutex); + try { _transceiver->close(); @@ -1723,6 +1754,12 @@ IceInternal::Connection::setState(State state) if(_state == StateNotValidated) { assert(!_registeredWithPool); + + // + // We must make sure that nobody is sending when we + // close the transceiver. + // + IceUtil::Mutex::Lock sendSync(_sendMutex); try { @@ -1735,6 +1772,7 @@ IceInternal::Connection::setState(State state) assert(_threadPool); _threadPool = 0; // We don't need the thread pool anymore. + //notifyAll(); // We notify already below. } else { @@ -1754,11 +1792,6 @@ IceInternal::Connection::setState(State state) { try { - // - // Locking of _sendMutex is not necessary here, because if - // we are in closing state, there are no sending threads - // anymore. - // initiateShutdown(); } catch(const LocalException& ex) @@ -1776,6 +1809,9 @@ IceInternal::Connection::initiateShutdown() const if(!_endpoint->datagram()) { + IceUtil::Mutex::Lock sendSync(_sendMutex); + assert(_threadPool); // The transceiver cannot be closed already. + // // Before we shut down, we send a close connection message. // @@ -1792,7 +1828,6 @@ IceInternal::Connection::initiateShutdown() const // // Send the message. // - IceUtil::Mutex::Lock sendSync(_sendMutex); os.i = os.b.begin(); traceHeader("sending close connection", os, _logger, _traceLevels); _transceiver->write(os, _endpoint->timeout()); |