diff options
author | Marc Laukien <marc@zeroc.com> | 2004-01-23 17:07:18 +0000 |
---|---|---|
committer | Marc Laukien <marc@zeroc.com> | 2004-01-23 17:07:18 +0000 |
commit | 19f638094baa01b7a776cad39345aef167ff7f3e (patch) | |
tree | 818afb5e0101c0b1dd10c693d5fe20081f5118f1 /cpp/src/Ice/Connection.cpp | |
parent | for Matthew (diff) | |
download | ice-19f638094baa01b7a776cad39345aef167ff7f3e.tar.bz2 ice-19f638094baa01b7a776cad39345aef167ff7f3e.tar.xz ice-19f638094baa01b7a776cad39345aef167ff7f3e.zip |
added sendMutex
Diffstat (limited to 'cpp/src/Ice/Connection.cpp')
-rw-r--r-- | cpp/src/Ice/Connection.cpp | 461 |
1 files changed, 244 insertions, 217 deletions
diff --git a/cpp/src/Ice/Connection.cpp b/cpp/src/Ice/Connection.cpp index 683237d9466..f684f3525b2 100644 --- a/cpp/src/Ice/Connection.cpp +++ b/cpp/src/Ice/Connection.cpp @@ -70,6 +70,8 @@ IceInternal::Connection::validate() os.write(validateConnectionMsg); os.write((Byte)1); // Compression status. os.write(headerSize); // Message size. + + IceUtil::Mutex::Lock sendSync(_sendMutex); os.i = os.b.begin(); traceHeader("sending validate connection", os, _logger, _traceLevels); _transceiver->write(os, _endpoint->timeout()); @@ -240,25 +242,22 @@ IceInternal::Connection::destroy(DestructionReason reason) bool IceInternal::Connection::isValidated() const { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock sync(_queryMutex); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); return _state > StateNotValidated; } bool IceInternal::Connection::isDestroyed() const { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock sync(_queryMutex); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); return _state >= StateClosing; } bool IceInternal::Connection::isFinished() const { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock sync(_queryMutex); - return _transceiver == 0 && _dispatchCount == 0; + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + return _threadPool == 0 && _dispatchCount == 0; } void @@ -293,7 +292,7 @@ IceInternal::Connection::waitUntilFinished() // Now we must wait for connection closure. If there is a timeout, // we force the connection closure. // - while(_transceiver) + while(_threadPool) { if(_state != StateClosed && _endpoint->timeout() >= 0) { @@ -407,16 +406,52 @@ IceInternal::Connection::prepareRequest(BasicStream* os) void IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + Int requestId; - if(_exception.get()) { - _exception->ice_throw(); - } - assert(_state > StateNotValidated); - assert(_state < StateClosing); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - Int requestId; + if(_exception.get()) + { + // + // Only raise an exception if this is a datagram or oneway + // call. For twoway calls, the exception will be provided + // using finished(). + // + if(_endpoint->datagram() || oneway) + { + _exception->ice_throw(); + } + else + { + out->finished(*_exception.get()); + return; + } + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + // + // Only add to the request map if this is a twoway call. + // + if(!_endpoint->datagram() && !oneway) + { + _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } + } try { @@ -427,12 +462,6 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) // if(!_endpoint->datagram() && !oneway) { - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } const Byte* p = reinterpret_cast<const Byte*>(&requestId); #ifdef ICE_BIG_ENDIAN reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); @@ -467,6 +496,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) // // Send the request. // + IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceRequest("sending request", *os, _logger, _traceLevels); cstream.i = cstream.b.begin(); @@ -488,6 +518,7 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) // // Send the request. // + IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceRequest("sending request", *os, _logger, _traceLevels); _transceiver->write(*os, _endpoint->timeout()); @@ -495,39 +526,56 @@ IceInternal::Connection::sendRequest(Outgoing* out, bool oneway) } catch(const LocalException& ex) { + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); setState(StateClosed, ex); assert(_exception.get()); - _exception->ice_throw(); - } - - // - // Only add to the request map if there was no exception, and if - // the operation is not oneway. - // - if(!_endpoint->datagram() && !oneway) - { - _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); - } - if(_acmTimeout > 0) - { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + // + // Only raise an exception if this is a datagram or oneway + // call. For twoway calls, the exception will be provided + // using the finished() callbacks. + // + if(_endpoint->datagram() || oneway) + { + _exception->ice_throw(); + } } } void IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + assert(!_endpoint->datagram()); // Async requests are always twoway. + + Int requestId; - if(_exception.get()) { - _exception->ice_throw(); - } - assert(_state > StateNotValidated); - assert(_state < StateClosing); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - Int requestId; + if(_exception.get()) + { + out->__finished(*_exception.get()); + return; + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), + pair<const Int, OutgoingAsyncPtr>(requestId, out)); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } + } try { @@ -536,12 +584,6 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) // // Fill in the request ID. // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } const Byte* p = reinterpret_cast<const Byte*>(&requestId); #ifdef ICE_BIG_ENDIAN reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); @@ -575,6 +617,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) // // Send the request. // + IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceRequest("sending asynchronous request", *os, _logger, _traceLevels); cstream.i = cstream.b.begin(); @@ -596,6 +639,7 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) // // Send the request. // + IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceRequest("sending asynchronous request", *os, _logger, _traceLevels); _transceiver->write(*os, _endpoint->timeout()); @@ -603,20 +647,9 @@ IceInternal::Connection::sendAsyncRequest(const OutgoingAsyncPtr& out) } catch(const LocalException& ex) { + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); setState(StateClosed, ex); assert(_exception.get()); - _exception->ice_throw(); - } - - // - // Only add to the request map if there was no exception. - // - _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), - pair<const Int, OutgoingAsyncPtr>(requestId, out)); - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); } } @@ -681,134 +714,140 @@ IceInternal::Connection::abortBatchRequest() void IceInternal::Connection::flushBatchRequest() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + BasicStream batchStream(_instance.get()); - if(_exception.get()) { - _exception->ice_throw(); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + if(_exception.get()) + { + // + // Since batch requests are all oneways (or datagrams), we + // must report the exception to the caller. + // + _exception->ice_throw(); + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + if(_batchStream.b.empty()) + { + return; // Nothing to do. + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } + + // + // Reset _batchStream and _batchRequestNum, so that new batch + // messages can be sent. + // + _batchStream.swap(batchStream); + assert(_batchStream.b.empty()); + _batchRequestNum = 0; } - assert(_state > StateNotValidated); - assert(_state < StateClosing); - if(!_batchStream.b.empty()) + assert(!batchStream.b.empty()); + + try { - try + batchStream.i = batchStream.b.begin(); + + // + // Fill in the number of requests in the batch. + // + const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); +#ifdef ICE_BIG_ENDIAN + reverse_copy(p, p + sizeof(Int), batchStream.b.begin() + headerSize); +#else + copy(p, p + sizeof(Int), batchStream.b.begin() + headerSize); +#endif + + bool compress; + if(batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes. + { + compress = false; + } + else + { + compress = _endpoint->compress(); + } + + if(compress) { - _batchStream.i = _batchStream.b.begin(); + // + // Set compression status. + // + batchStream.b[9] = 2; // Message is compressed. // - // Fill in the number of requests in the batch. + // Do compression. // - const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); -#endif - - bool compress; - if(_batchStream.b.size() < 100) // Don't compress if message size is smaller than 100 bytes. - { - compress = false; - } - else - { - compress = _endpoint->compress(); - } + BasicStream cstream(_instance.get()); + doCompress(batchStream, cstream); - if(compress) - { - // - // Set compression status. - // - _batchStream.b[9] = 2; // Message is compressed. - - // - // Do compression. - // - BasicStream cstream(_instance.get()); - doCompress(_batchStream, cstream); - - // - // Send the batch request. - // - _batchStream.i = _batchStream.b.begin(); - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - cstream.i = cstream.b.begin(); - _transceiver->write(cstream, _endpoint->timeout()); - } - else - { - // - // No compression, just fill in the message size. - // - Int sz = static_cast<Int>(_batchStream.b.size()); - p = reinterpret_cast<const Byte*>(&sz); + // + // Send the batch request. + // + IceUtil::Mutex::Lock sendSync(_sendMutex); + batchStream.i = batchStream.b.begin(); + traceBatchRequest("sending batch request", batchStream, _logger, _traceLevels); + cstream.i = cstream.b.begin(); + _transceiver->write(cstream, _endpoint->timeout()); + } + else + { + // + // No compression, just fill in the message size. + // + Int sz = static_cast<Int>(batchStream.b.size()); + p = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); + reverse_copy(p, p + sizeof(Int), batchStream.b.begin() + 10); #else - copy(p, p + sizeof(Int), _batchStream.b.begin() + 10); + copy(p, p + sizeof(Int), batchStream.b.begin() + 10); #endif - - // - // Send the batch request. - // - _batchStream.i = _batchStream.b.begin(); - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - _transceiver->write(_batchStream, _endpoint->timeout()); - } // - // Reset _batchStream and _batchRequestNum, so that new batch - // messages can be sent. + // Send the batch request. // - BasicStream dummy(_instance.get()); - _batchStream.swap(dummy); - assert(_batchStream.b.empty()); - _batchRequestNum = 0; - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - assert(_exception.get()); - _exception->ice_throw(); - } - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + IceUtil::Mutex::Lock sendSync(_sendMutex); + batchStream.i = batchStream.b.begin(); + traceBatchRequest("sending batch request", batchStream, _logger, _traceLevels); + _transceiver->write(batchStream, _endpoint->timeout()); } } - - if(_proxyCount == 0 && !_adapter && closingOK()) + catch(const LocalException& ex) { - setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + setState(StateClosed, ex); + assert(_exception.get()); + + // + // Since batch requests are all oneways (or datagrams), we + // must report the exception to the caller. + // + _exception->ice_throw(); + } + + { + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + if(_proxyCount == 0 && !_adapter && closingOK()) + { + setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); + } } } void IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - try { - { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock s(_queryMutex); - --_dispatchCount; - } - - if(_dispatchCount == 0) - { - notifyAll(); - } - - if(_state == StateClosed) - { - return; - } - bool compress; if(os->b.size() < 100) // Don't compress if message size is smaller than 100 bytes. { @@ -825,7 +864,7 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) // Set compression status. // os->b[9] = 2; // Message is compressed. - + // // Do compression. // @@ -835,6 +874,7 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) // // Send the reply. // + IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceReply("sending reply", *os, _logger, _traceLevels); cstream.i = cstream.b.begin(); @@ -856,24 +896,42 @@ IceInternal::Connection::sendResponse(BasicStream* os, Byte compressFlag) // // Send the reply. // + IceUtil::Mutex::Lock sendSync(_sendMutex); os->i = os->b.begin(); traceReply("sending reply", *os, _logger, _traceLevels); _transceiver->write(*os, _endpoint->timeout()); } - - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); - } } catch(const LocalException& ex) { + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); setState(StateClosed, ex); } - if(_acmTimeout > 0) { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); + + try + { + if(--_dispatchCount == 0) + { + notifyAll(); + } + + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + } } } @@ -884,22 +942,11 @@ IceInternal::Connection::sendNoResponse() try { - { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock s(_queryMutex); - --_dispatchCount; - } - - if(_dispatchCount == 0) + if(--_dispatchCount == 0) { notifyAll(); } - if(_state == StateClosed) - { - return; - } - if(_state == StateClosing && _dispatchCount == 0) { initiateShutdown(); @@ -968,10 +1015,7 @@ IceInternal::Connection::readable() const void IceInternal::Connection::read(BasicStream& stream) { - if(_transceiver) - { - _transceiver->read(stream, 0); - } + _transceiver->read(stream, 0); // // Updating _acmAbsoluteTimeout is too expensive here, because we @@ -1102,11 +1146,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa traceRequest("received request", stream, _logger, _traceLevels); stream.read(requestId); invoke = 1; - { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock s(_queryMutex); - ++_dispatchCount; - } + ++_dispatchCount; } break; } @@ -1127,11 +1167,7 @@ IceInternal::Connection::message(BasicStream& stream, const ThreadPoolPtr& threa { throw NegativeSizeException(__FILE__, __LINE__); } - { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock s(_queryMutex); - _dispatchCount += invoke; - } + _dispatchCount += invoke; } break; } @@ -1340,12 +1376,6 @@ IceInternal::Connection::finished(const ThreadPoolPtr& threadPool) closeException = auto_ptr<LocalException>(dynamic_cast<LocalException*>(ex.ice_clone())); } - { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock s(_queryMutex); - _transceiver = 0; - } - _threadPool = 0; // We don't need the thread pool anymore. notifyAll(); } @@ -1387,7 +1417,6 @@ string IceInternal::Connection::toString() const { IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(*this); - assert(_transceiver); return _transceiver->toString(); } @@ -1485,10 +1514,8 @@ IceInternal::Connection::Connection(const InstancePtr& instance, IceInternal::Connection::~Connection() { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock sync(_queryMutex); assert(_state == StateClosed); - assert(!_transceiver); + assert(!_threadPool); assert(_dispatchCount == 0); assert(_proxyCount == 0); } @@ -1652,12 +1679,6 @@ IceInternal::Connection::setState(State state) // Here we ignore any exceptions in close(). } - { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock sync(_queryMutex); - _transceiver = 0; - } - _threadPool = 0; // We don't need the thread pool anymore. } else @@ -1669,12 +1690,8 @@ IceInternal::Connection::setState(State state) } } - { - // See _queryMutex comment in header file. - IceUtil::Mutex::Lock sync(_queryMutex); - _state = state; - _stateTime = IceUtil::Time::now(); - } + _state = state; + _stateTime = IceUtil::Time::now(); notifyAll(); @@ -1682,6 +1699,11 @@ IceInternal::Connection::setState(State state) { try { + // + // Locking of _sendMutex is not necessary here, because if + // we are in closing state, there are no sending threads + // anymore. + // initiateShutdown(); } catch(const LocalException& ex) @@ -1711,6 +1733,11 @@ IceInternal::Connection::initiateShutdown() const os.write(closeConnectionMsg); os.write((Byte)1); // Compression status: compression supported but not used. os.write(headerSize); // Message size. + + // + // Send the message. + // + IceUtil::Mutex::Lock sendSync(_sendMutex); os.i = os.b.begin(); traceHeader("sending close connection", os, _logger, _traceLevels); _transceiver->write(os, _endpoint->timeout()); |