Diffstat (limited to 'cpp/src/Ice/ConnectionI.cpp')
-rw-r--r-- | cpp/src/Ice/ConnectionI.cpp | 3414
1 file changed, 1707 insertions, 1707 deletions
diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index d832f87a886..22b4c28cf23 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -41,189 +41,189 @@ Ice::ConnectionI::validate() if(!_endpoint->datagram()) // Datagram connections are always implicitly validated. { - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(_thread && _thread->getThreadControl() != IceUtil::ThreadControl()) - { - // - // In thread per connection mode, this connection's thread - // will take care of connection validation. Therefore all we - // have to do here is to wait until this thread has completed - // validation. - // - while(_state == StateNotValidated) - { - wait(); - } - - if(_state >= StateClosing) - { - assert(_exception.get()); - _exception->ice_throw(); - } - - return; - } - - // - // The connection might already be closed (e.g.: the communicator - // was destroyed or object adapter deactivated.) - // - assert(_state == StateNotValidated || _state == StateClosed); - if(_state == StateClosed) - { - assert(_exception.get()); - _exception->ice_throw(); - } - - if(_adapter) - { - active = true; // The server side has the active role for connection validation. - } - else - { - active = false; // The client side has the passive role for connection validation. - } - } - - try - { - Int timeout; - if(_instance->defaultsAndOverrides()->overrideConnectTimeout) - { - timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; - } - else - { - timeout = _endpoint->timeout(); - } - - if(active) - { - IceUtil::Mutex::Lock sendSync(_sendMutex); - - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } - - BasicStream os(_instance.get()); - os.write(magic[0]); - os.write(magic[1]); - os.write(magic[2]); - os.write(magic[3]); - os.write(protocolMajor); - os.write(protocolMinor); - os.write(encodingMajor); - os.write(encodingMinor); - os.write(validateConnectionMsg); - os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). - os.write(headerSize); // Message size. 
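For context on the hand-shake above: the active side assembles a fixed 14-byte Ice protocol header, which the passive side reads back and checks field by field. A minimal sketch of that layout, assuming the byte values of the published Ice protocol ('I', 'c', 'e', 'P' magic, protocol and encoding version 1.0) rather than anything defined in this diff:

#include <cstdint>

// Field order of the wire header written above; values are illustrative.
// On the wire the fields are packed back to back (14 bytes total), so this
// struct documents layout only and is not intended for direct memcpy.
struct IceMessageHeader
{
    std::uint8_t magic[4];       // 'I', 'c', 'e', 'P'
    std::uint8_t protocolMajor;  // 1
    std::uint8_t protocolMinor;  // 0
    std::uint8_t encodingMajor;  // 1
    std::uint8_t encodingMinor;  // 0
    std::uint8_t messageType;    // e.g. validate connection, close connection, request
    std::uint8_t compressStatus; // always 0 for validate connection
    std::int32_t messageSize;    // little-endian; equals headerSize (14) when there is no body
};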
- os.i = os.b.begin(); - traceHeader("sending validate connection", os, _logger, _traceLevels); - try - { - _transceiver->initialize(timeout); - _transceiver->write(os, timeout); - } - catch(const TimeoutException&) - { - throw ConnectTimeoutException(__FILE__, __LINE__); - } - } - else - { - BasicStream is(_instance.get()); - is.b.resize(headerSize); - is.i = is.b.begin(); - try - { - _transceiver->initialize(timeout); - _transceiver->read(is, timeout); - } - catch(const TimeoutException&) - { - throw ConnectTimeoutException(__FILE__, __LINE__); - } - assert(is.i == is.b.end()); - is.i = is.b.begin(); - Byte m[4]; - is.read(m[0]); - is.read(m[1]); - is.read(m[2]); - is.read(m[3]); - if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3]) - { - BadMagicException ex(__FILE__, __LINE__); - ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic)); - throw ex; - } - Byte pMajor; - Byte pMinor; - is.read(pMajor); - is.read(pMinor); - if(pMajor != protocolMajor) - { - UnsupportedProtocolException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(pMajor); - ex.badMinor = static_cast<unsigned char>(pMinor); - ex.major = static_cast<unsigned char>(protocolMajor); - ex.minor = static_cast<unsigned char>(protocolMinor); - throw ex; - } - Byte eMajor; - Byte eMinor; - is.read(eMajor); - is.read(eMinor); - if(eMajor != encodingMajor) - { - UnsupportedEncodingException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(eMajor); - ex.badMinor = static_cast<unsigned char>(eMinor); - ex.major = static_cast<unsigned char>(encodingMajor); - ex.minor = static_cast<unsigned char>(encodingMinor); - throw ex; - } - Byte messageType; - is.read(messageType); - if(messageType != validateConnectionMsg) - { - throw ConnectionNotValidatedException(__FILE__, __LINE__); - } + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_thread && _thread->getThreadControl() != IceUtil::ThreadControl()) + { + // + // In thread per connection mode, this connection's thread + // will take care of connection validation. Therefore all we + // have to do here is to wait until this thread has completed + // validation. + // + while(_state == StateNotValidated) + { + wait(); + } + + if(_state >= StateClosing) + { + assert(_exception.get()); + _exception->ice_throw(); + } + + return; + } + + // + // The connection might already be closed (e.g.: the communicator + // was destroyed or object adapter deactivated.) + // + assert(_state == StateNotValidated || _state == StateClosed); + if(_state == StateClosed) + { + assert(_exception.get()); + _exception->ice_throw(); + } + + if(_adapter) + { + active = true; // The server side has the active role for connection validation. + } + else + { + active = false; // The client side has the passive role for connection validation. + } + } + + try + { + Int timeout; + if(_instance->defaultsAndOverrides()->overrideConnectTimeout) + { + timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; + } + else + { + timeout = _endpoint->timeout(); + } + + if(active) + { + IceUtil::Mutex::Lock sendSync(_sendMutex); + + if(!_transceiver) // Has the transceiver already been closed? + { + assert(_exception.get()); + _exception->ice_throw(); // The exception is immutable at this point. 
+ } + + BasicStream os(_instance.get()); + os.write(magic[0]); + os.write(magic[1]); + os.write(magic[2]); + os.write(magic[3]); + os.write(protocolMajor); + os.write(protocolMinor); + os.write(encodingMajor); + os.write(encodingMinor); + os.write(validateConnectionMsg); + os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). + os.write(headerSize); // Message size. + os.i = os.b.begin(); + traceHeader("sending validate connection", os, _logger, _traceLevels); + try + { + _transceiver->initialize(timeout); + _transceiver->write(os, timeout); + } + catch(const TimeoutException&) + { + throw ConnectTimeoutException(__FILE__, __LINE__); + } + } + else + { + BasicStream is(_instance.get()); + is.b.resize(headerSize); + is.i = is.b.begin(); + try + { + _transceiver->initialize(timeout); + _transceiver->read(is, timeout); + } + catch(const TimeoutException&) + { + throw ConnectTimeoutException(__FILE__, __LINE__); + } + assert(is.i == is.b.end()); + is.i = is.b.begin(); + Byte m[4]; + is.read(m[0]); + is.read(m[1]); + is.read(m[2]); + is.read(m[3]); + if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3]) + { + BadMagicException ex(__FILE__, __LINE__); + ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(magic)); + throw ex; + } + Byte pMajor; + Byte pMinor; + is.read(pMajor); + is.read(pMinor); + if(pMajor != protocolMajor) + { + UnsupportedProtocolException ex(__FILE__, __LINE__); + ex.badMajor = static_cast<unsigned char>(pMajor); + ex.badMinor = static_cast<unsigned char>(pMinor); + ex.major = static_cast<unsigned char>(protocolMajor); + ex.minor = static_cast<unsigned char>(protocolMinor); + throw ex; + } + Byte eMajor; + Byte eMinor; + is.read(eMajor); + is.read(eMinor); + if(eMajor != encodingMajor) + { + UnsupportedEncodingException ex(__FILE__, __LINE__); + ex.badMajor = static_cast<unsigned char>(eMajor); + ex.badMinor = static_cast<unsigned char>(eMinor); + ex.major = static_cast<unsigned char>(encodingMajor); + ex.minor = static_cast<unsigned char>(encodingMinor); + throw ex; + } + Byte messageType; + is.read(messageType); + if(messageType != validateConnectionMsg) + { + throw ConnectionNotValidatedException(__FILE__, __LINE__); + } Byte compress; is.read(compress); // Ignore compression status for validate connection. - Int size; - is.read(size); - if(size != headerSize) - { - throw IllegalMessageSizeException(__FILE__, __LINE__); - } - traceHeader("received validate connection", is, _logger, _traceLevels); - } - } - catch(const LocalException& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - assert(_exception.get()); - _exception->ice_throw(); - } - } - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); - } - - // - // We start out in holding state. 
- // - setState(StateHolding); + Int size; + is.read(size); + if(size != headerSize) + { + throw IllegalMessageSizeException(__FILE__, __LINE__); + } + traceHeader("received validate connection", is, _logger, _traceLevels); + } + } + catch(const LocalException& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + assert(_exception.get()); + _exception->ice_throw(); + } + } + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } + + // + // We start out in holding state. + // + setState(StateHolding); } } @@ -234,7 +234,7 @@ Ice::ConnectionI::activate() while(_state == StateNotValidated) { - wait(); + wait(); } setState(StateActive); @@ -247,7 +247,7 @@ Ice::ConnectionI::hold() while(_state == StateNotValidated) { - wait(); + wait(); } setState(StateHolding); @@ -260,17 +260,17 @@ Ice::ConnectionI::destroy(DestructionReason reason) switch(reason) { - case ObjectAdapterDeactivated: - { - setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); - break; - } + case ObjectAdapterDeactivated: + { + setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); + break; + } - case CommunicatorDestroyed: - { - setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); - break; - } + case CommunicatorDestroyed: + { + setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); + break; + } } } @@ -281,23 +281,23 @@ Ice::ConnectionI::close(bool force) if(force) { - setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__)); + setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__)); } else { - // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, the - // CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the server - // has processed them or not. - // - while(!_requests.empty() || !_asyncRequests.empty()) - { - wait(); - } - - setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); + // + // If we do a graceful shutdown, then we wait until all + // outstanding requests have been completed. Otherwise, the + // CloseConnectionException will cause all outstanding + // requests to be retried, regardless of whether the server + // has processed them or not. + // + while(!_requests.empty() || !_asyncRequests.empty()) + { + wait(); + } + + setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } } @@ -320,37 +320,37 @@ Ice::ConnectionI::isFinished() const IceUtil::ThreadPtr threadPerConnection; { - // - // We can use trylock here, because as long as there are still - // threads operating in this connection object, connection - // destruction is considered as not yet finished. - // - IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); - - if(!sync.acquired()) - { - return false; - } + // + // We can use trylock here, because as long as there are still + // threads operating in this connection object, connection + // destruction is considered as not yet finished. 
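The graceful close() path above waits until both request maps are empty before sending CloseConnection, so that no completed-but-unanswered request gets retried; isFinished(), by contrast, only try-locks and treats a busy connection as not yet finished. A standalone sketch of the wait-until-drained pattern, with invented names:

#include <condition_variable>
#include <map>
#include <mutex>

struct PendingRequests
{
    std::mutex mutex;
    std::condition_variable cond;
    std::map<int, int> requests;      // outstanding twoway requests, keyed by request ID
    std::map<int, int> asyncRequests; // outstanding AMI requests, keyed by request ID

    // Block until every outstanding request has been answered or has failed;
    // whoever erases the last entry calls cond.notify_all().
    void waitUntilDrained()
    {
        std::unique_lock<std::mutex> lock(mutex);
        cond.wait(lock, [this]{ return requests.empty() && asyncRequests.empty(); });
    }
};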
+ // + IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); + + if(!sync.acquired()) + { + return false; + } - if(_transceiver || _dispatchCount != 0) - { - return false; - } + if(_transceiver || _dispatchCount != 0) + { + return false; + } - if(_thread && _thread->isAlive()) - { - return false; - } + if(_thread && _thread->isAlive()) + { + return false; + } - assert(_state == StateClosed); + assert(_state == StateClosed); - threadPerConnection = _thread; - _thread = 0; + threadPerConnection = _thread; + _thread = 0; } if(threadPerConnection) { - threadPerConnection->getThreadControl().join(); + threadPerConnection->getThreadControl().join(); } return true; @@ -363,8 +363,8 @@ Ice::ConnectionI::throwException() const if(_exception.get()) { - assert(_state >= StateClosing); - _exception->ice_throw(); + assert(_state >= StateClosing); + _exception->ice_throw(); } } @@ -375,7 +375,7 @@ Ice::ConnectionI::waitUntilHolding() const while(_state < StateHolding || _dispatchCount > 0) { - wait(); + wait(); } } @@ -385,76 +385,76 @@ Ice::ConnectionI::waitUntilFinished() IceUtil::ThreadPtr threadPerConnection; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // We wait indefinitely until connection closing has been - // initiated. We also wait indefinitely until all outstanding - // requests are completed. Otherwise we couldn't guarantee - // that there are no outstanding calls when deactivate() is - // called on the servant locators. - // - while(_state < StateClosing || _dispatchCount > 0) - { - wait(); - } - - // - // Now we must wait until close() has been called on the - // transceiver. - // - while(_transceiver) - { - if(_state != StateClosed && _endpoint->timeout() >= 0) - { - IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout()); - IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(); - - if(waitTime > IceUtil::Time()) - { - // - // We must wait a bit longer until we close this - // connection. - // - if(!timedWait(waitTime)) - { - setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); - } - } - else - { - // - // We already waited long enough, so let's close this - // connection! - // - setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); - } - - // - // No return here, we must still wait until close() is - // called on the _transceiver. - // - } - else - { - wait(); - } - } - - assert(_state == StateClosed); - - threadPerConnection = _thread; - _thread = 0; - - // - // Clear the OA. See bug 1673 for the details of why this is necessary. - // - _adapter = 0; + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // We wait indefinitely until connection closing has been + // initiated. We also wait indefinitely until all outstanding + // requests are completed. Otherwise we couldn't guarantee + // that there are no outstanding calls when deactivate() is + // called on the servant locators. + // + while(_state < StateClosing || _dispatchCount > 0) + { + wait(); + } + + // + // Now we must wait until close() has been called on the + // transceiver. + // + while(_transceiver) + { + if(_state != StateClosed && _endpoint->timeout() >= 0) + { + IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout()); + IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(); + + if(waitTime > IceUtil::Time()) + { + // + // We must wait a bit longer until we close this + // connection. 
+ // + if(!timedWait(waitTime)) + { + setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + } + } + else + { + // + // We already waited long enough, so let's close this + // connection! + // + setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + } + + // + // No return here, we must still wait until close() is + // called on the _transceiver. + // + } + else + { + wait(); + } + } + + assert(_state == StateClosed); + + threadPerConnection = _thread; + _thread = 0; + + // + // Clear the OA. See bug 1673 for the details of why this is necessary. + // + _adapter = 0; } if(threadPerConnection) { - threadPerConnection->getThreadControl().join(); + threadPerConnection->getThreadControl().join(); } } @@ -465,12 +465,12 @@ Ice::ConnectionI::monitor() if(!sync.acquired()) { - return; + return; } if(_state != StateActive) { - return; + return; } // @@ -478,11 +478,11 @@ Ice::ConnectionI::monitor() // for(map<Int, AsyncRequest>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) { - if(p->second.t > IceUtil::Time() && p->second.t <= IceUtil::Time::now()) - { - setState(StateClosed, TimeoutException(__FILE__, __LINE__)); - return; - } + if(p->second.t > IceUtil::Time() && p->second.t <= IceUtil::Time::now()) + { + setState(StateClosed, TimeoutException(__FILE__, __LINE__)); + return; + } } // @@ -493,11 +493,11 @@ Ice::ConnectionI::monitor() !_batchStreamInUse && _batchStream.b.empty() && _dispatchCount == 0) { - if(IceUtil::Time::now() >= _acmAbsoluteTimeout) - { - setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); - return; - } + if(IceUtil::Time::now() >= _acmAbsoluteTimeout) + { + setState(StateClosing, ConnectionTimeoutException(__FILE__, __LINE__)); + return; + } } } @@ -507,162 +507,162 @@ Ice::ConnectionI::sendRequest(BasicStream* os, Outgoing* out, bool compress) Int requestId; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - assert(!(out && _endpoint->datagram())); // Twoway requests cannot be datagrams. - - if(_exception.get()) - { - // - // If the connection is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - throw LocalExceptionWrapper(*_exception.get(), true); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Only add to the request map if this is a twoway call. - // - if(out) - { - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - const Byte* p = reinterpret_cast<const Byte*>(&requestId); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + assert(!(out && _endpoint->datagram())); // Twoway requests cannot be datagrams. + + if(_exception.get()) + { + // + // If the connection is closed before we even have a chance + // to send our request, we always try to send the request + // again. + // + throw LocalExceptionWrapper(*_exception.get(), true); + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + // + // Only add to the request map if this is a twoway call. + // + if(out) + { + // + // Create a new unique request ID. + // + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + // + // Fill in the request ID. 
+ // + const Byte* p = reinterpret_cast<const Byte*>(&requestId); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); + reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); #else - copy(p, p + sizeof(Int), os->b.begin() + headerSize); + copy(p, p + sizeof(Int), os->b.begin() + headerSize); #endif - // - // Add to the requests map. - // - _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); - } - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); - } + // + // Add to the requests map. + // + _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } } try { - IceUtil::Mutex::Lock sendSync(_sendMutex); - - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } - - if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes. - { - // - // Message compressed. Request compressed response, if any. - // - os->b[9] = 2; - - // - // Do compression. - // - BasicStream cstream(_instance.get()); - doCompress(*os, cstream); - - // - // Send the request. - // - os->i = os->b.begin(); - traceRequest("sending request", *os, _logger, _traceLevels); - cstream.i = cstream.b.begin(); - _transceiver->write(cstream, _endpoint->timeout()); - } - else - { - if(compress) - { - // - // Message not compressed. Request compressed response, if any. - // - os->b[9] = 1; - } - - // - // No compression, just fill in the message size. - // - Int sz = static_cast<Int>(os->b.size()); - const Byte* p = reinterpret_cast<const Byte*>(&sz); + IceUtil::Mutex::Lock sendSync(_sendMutex); + + if(!_transceiver) // Has the transceiver already been closed? + { + assert(_exception.get()); + _exception->ice_throw(); // The exception is immutable at this point. + } + + if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes. + { + // + // Message compressed. Request compressed response, if any. + // + os->b[9] = 2; + + // + // Do compression. + // + BasicStream cstream(_instance.get()); + doCompress(*os, cstream); + + // + // Send the request. + // + os->i = os->b.begin(); + traceRequest("sending request", *os, _logger, _traceLevels); + cstream.i = cstream.b.begin(); + _transceiver->write(cstream, _endpoint->timeout()); + } + else + { + if(compress) + { + // + // Message not compressed. Request compressed response, if any. + // + os->b[9] = 1; + } + + // + // No compression, just fill in the message size. + // + Int sz = static_cast<Int>(os->b.size()); + const Byte* p = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); + reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); #else - copy(p, p + sizeof(Int), os->b.begin() + 10); + copy(p, p + sizeof(Int), os->b.begin() + 10); #endif - - // - // Send the request. - // - os->i = os->b.begin(); - traceRequest("sending request", *os, _logger, _traceLevels); - _transceiver->write(*os, _endpoint->timeout()); - } + + // + // Send the request. 
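The copy()/reverse_copy() pairs guarded by ICE_BIG_ENDIAN above patch the request ID (at offset headerSize) and the message size (at offset 10) into an already-marshalled buffer in little-endian byte order. A hedged equivalent without the preprocessor branch; the helper name is made up:

#include <cstddef>
#include <cstdint>
#include <vector>

// Write a 32-bit value into buf at the given offset, least significant byte
// first, independent of host byte order.
void patchInt32LE(std::vector<std::uint8_t>& buf, std::size_t offset, std::int32_t value)
{
    const std::uint32_t v = static_cast<std::uint32_t>(value);
    for(std::size_t i = 0; i < 4; ++i)
    {
        buf[offset + i] = static_cast<std::uint8_t>((v >> (8 * i)) & 0xFF);
    }
}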
+ // + os->i = os->b.begin(); + traceRequest("sending request", *os, _logger, _traceLevels); + _transceiver->write(*os, _endpoint->timeout()); + } } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - assert(_exception.get()); - - if(out) - { - // - // If the request has already been removed from the - // request map, we are out of luck. It would mean that - // finished() has been called already, and therefore the - // exception has been set using the Outgoing::finished() - // callback. In this case, we cannot throw the exception - // here, because we must not both raise an exception and - // have Outgoing::finished() called with an - // exception. This means that in some rare cases, a - // request will not be retried even though it could. But I - // honestly don't know how I could avoid this, without a - // very elaborate and complex design, which would be bad - // for performance. - // - map<Int, Outgoing*>::iterator p = _requests.find(requestId); - if(p != _requests.end()) - { - if(p == _requestsHint) - { - _requests.erase(p++); - _requestsHint = p; - } - else - { - _requests.erase(p); - } - - _exception->ice_throw(); - } - } - else - { - _exception->ice_throw(); - } + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + assert(_exception.get()); + + if(out) + { + // + // If the request has already been removed from the + // request map, we are out of luck. It would mean that + // finished() has been called already, and therefore the + // exception has been set using the Outgoing::finished() + // callback. In this case, we cannot throw the exception + // here, because we must not both raise an exception and + // have Outgoing::finished() called with an + // exception. This means that in some rare cases, a + // request will not be retried even though it could. But I + // honestly don't know how I could avoid this, without a + // very elaborate and complex design, which would be bad + // for performance. + // + map<Int, Outgoing*>::iterator p = _requests.find(requestId); + if(p != _requests.end()) + { + if(p == _requestsHint) + { + _requests.erase(p++); + _requestsHint = p; + } + else + { + _requests.erase(p); + } + + _exception->ice_throw(); + } + } + else + { + _exception->ice_throw(); + } } } @@ -672,156 +672,156 @@ Ice::ConnectionI::sendAsyncRequest(BasicStream* os, const OutgoingAsyncPtr& out, Int requestId; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - assert(!_endpoint->datagram()); // Twoway requests cannot be datagrams, and async implies twoway. - - if(_exception.get()) - { - // - // If the exception is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - throw LocalExceptionWrapper(*_exception.get(), true); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - const Byte* p = reinterpret_cast<const Byte*>(&requestId); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + assert(!_endpoint->datagram()); // Twoway requests cannot be datagrams, and async implies twoway. + + if(_exception.get()) + { + // + // If the exception is closed before we even have a chance + // to send our request, we always try to send the request + // again. 
+ // + throw LocalExceptionWrapper(*_exception.get(), true); + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + // + // Create a new unique request ID. + // + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + // + // Fill in the request ID. + // + const Byte* p = reinterpret_cast<const Byte*>(&requestId); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); + reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); #else - copy(p, p + sizeof(Int), os->b.begin() + headerSize); + copy(p, p + sizeof(Int), os->b.begin() + headerSize); #endif - - // - // Add to the async requests map. - // - struct AsyncRequest asyncRequest; - asyncRequest.p = out; - if(_endpoint->timeout() > 0) - { - asyncRequest.t = IceUtil::Time::now() + IceUtil::Time::milliSeconds(_endpoint->timeout()); - } - _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), - pair<const Int, AsyncRequest>(requestId, asyncRequest)); - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); - } + + // + // Add to the async requests map. + // + struct AsyncRequest asyncRequest; + asyncRequest.p = out; + if(_endpoint->timeout() > 0) + { + asyncRequest.t = IceUtil::Time::now() + IceUtil::Time::milliSeconds(_endpoint->timeout()); + } + _asyncRequestsHint = _asyncRequests.insert(_asyncRequests.end(), + pair<const Int, AsyncRequest>(requestId, asyncRequest)); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } } try { - IceUtil::Mutex::Lock sendSync(_sendMutex); - - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } - - if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes. - { - // - // Message compressed. Request compressed response, if any. - // - os->b[9] = 2; - - // - // Do compression. - // - BasicStream cstream(_instance.get()); - doCompress(*os, cstream); - - // - // Send the request. - // - os->i = os->b.begin(); - traceRequest("sending asynchronous request", *os, _logger, _traceLevels); - cstream.i = cstream.b.begin(); - _transceiver->write(cstream, _endpoint->timeout()); - } - else - { - if(compress) - { - // - // Message not compressed. Request compressed response, if any. - // - os->b[9] = 1; - } - - // - // No compression, just fill in the message size. - // - Int sz = static_cast<Int>(os->b.size()); - const Byte* p = reinterpret_cast<const Byte*>(&sz); + IceUtil::Mutex::Lock sendSync(_sendMutex); + + if(!_transceiver) // Has the transceiver already been closed? + { + assert(_exception.get()); + _exception->ice_throw(); // The exception is immutable at this point. + } + + if(compress && os->b.size() >= 100) // Only compress messages larger than 100 bytes. + { + // + // Message compressed. Request compressed response, if any. + // + os->b[9] = 2; + + // + // Do compression. + // + BasicStream cstream(_instance.get()); + doCompress(*os, cstream); + + // + // Send the request. + // + os->i = os->b.begin(); + traceRequest("sending asynchronous request", *os, _logger, _traceLevels); + cstream.i = cstream.b.begin(); + _transceiver->write(cstream, _endpoint->timeout()); + } + else + { + if(compress) + { + // + // Message not compressed. Request compressed response, if any. 
+ // + os->b[9] = 1; + } + + // + // No compression, just fill in the message size. + // + Int sz = static_cast<Int>(os->b.size()); + const Byte* p = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); + reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); #else - copy(p, p + sizeof(Int), os->b.begin() + 10); + copy(p, p + sizeof(Int), os->b.begin() + 10); #endif - // - // Send the request. - // - os->i = os->b.begin(); - traceRequest("sending asynchronous request", *os, _logger, _traceLevels); - _transceiver->write(*os, _endpoint->timeout()); - } + // + // Send the request. + // + os->i = os->b.begin(); + traceRequest("sending asynchronous request", *os, _logger, _traceLevels); + _transceiver->write(*os, _endpoint->timeout()); + } } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - assert(_exception.get()); - - // - // If the request has already been removed from the async - // request map, we are out of luck. It would mean that - // finished() has been called already, and therefore the - // exception has been set using the - // OutgoingAsync::__finished() callback. In this case, we - // cannot throw the exception here, because we must not both - // raise an exception and have OutgoingAsync::__finished() - // called with an exception. This means that in some rare - // cases, a request will not be retried even though it - // could. But I honestly don't know how I could avoid this, - // without a very elaborate and complex design, which would be - // bad for performance. - // - map<Int, AsyncRequest>::iterator p = _asyncRequests.find(requestId); - if(p != _asyncRequests.end()) - { - if(p == _asyncRequestsHint) - { - _asyncRequests.erase(p++); - _asyncRequestsHint = p; - } - else - { - _asyncRequests.erase(p); - } - - _exception->ice_throw(); - } + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + assert(_exception.get()); + + // + // If the request has already been removed from the async + // request map, we are out of luck. It would mean that + // finished() has been called already, and therefore the + // exception has been set using the + // OutgoingAsync::__finished() callback. In this case, we + // cannot throw the exception here, because we must not both + // raise an exception and have OutgoingAsync::__finished() + // called with an exception. This means that in some rare + // cases, a request will not be retried even though it + // could. But I honestly don't know how I could avoid this, + // without a very elaborate and complex design, which would be + // bad for performance. 
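Both the synchronous and asynchronous send paths above use the same convention for the compression status byte at header offset 9: 2 when the body is actually compressed (only attempted for messages of at least 100 bytes), 1 when compression is negotiated but this message goes out uncompressed, and 0 otherwise. A small sketch of that decision, assuming those semantics:

#include <cstddef>

inline unsigned char compressionStatus(bool compressEnabled, std::size_t messageSize)
{
    if(!compressEnabled)
    {
        return 0; // compression not in use for this message
    }
    return messageSize >= 100 ? 2 : 1; // 2 = compressed body, 1 = supported but not used
}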
+ // + map<Int, AsyncRequest>::iterator p = _asyncRequests.find(requestId); + if(p != _asyncRequests.end()) + { + if(p == _asyncRequestsHint) + { + _asyncRequests.erase(p++); + _asyncRequestsHint = p; + } + else + { + _asyncRequests.erase(p); + } + + _exception->ice_throw(); + } } } @@ -835,12 +835,12 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os) // while(_batchStreamInUse && !_exception.get()) { - wait(); + wait(); } if(_exception.get()) { - _exception->ice_throw(); + _exception->ice_throw(); } assert(_state > StateNotValidated); @@ -848,15 +848,15 @@ Ice::ConnectionI::prepareBatchRequest(BasicStream* os) if(_batchStream.b.empty()) { - try - { - _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - ex.ice_throw(); - } + try + { + _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + ex.ice_throw(); + } } _batchStreamInUse = true; @@ -884,41 +884,41 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) _batchStream.swap(*os); if(_batchAutoFlush) - { - IceUtil::Mutex::Lock sendSync(_sendMutex); - if(!_transceiver) - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } - - // - // Throw memory limit exception if the first message added causes us to - // go over limit. Otherwise put aside the marshalled message that caused - // limit to be exceeded and rollback stream to the marker. - // - try - { - _transceiver->checkSendSize(_batchStream, _instance->messageSizeMax()); - } - catch(const Ice::Exception&) - { - if(_batchRequestNum == 0) - { - resetBatch(true); - throw; - } - vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest); - _batchStream.b.resize(_batchMarker); - autoflush = true; - } - } - - if(!autoflush) - { - // - // Increment the number of requests in the batch. - // + { + IceUtil::Mutex::Lock sendSync(_sendMutex); + if(!_transceiver) + { + assert(_exception.get()); + _exception->ice_throw(); // The exception is immutable at this point. + } + + // + // Throw memory limit exception if the first message added causes us to + // go over limit. Otherwise put aside the marshalled message that caused + // limit to be exceeded and rollback stream to the marker. + // + try + { + _transceiver->checkSendSize(_batchStream, _instance->messageSizeMax()); + } + catch(const Ice::Exception&) + { + if(_batchRequestNum == 0) + { + resetBatch(true); + throw; + } + vector<Ice::Byte>(_batchStream.b.begin() + _batchMarker, _batchStream.b.end()).swap(lastRequest); + _batchStream.b.resize(_batchMarker); + autoflush = true; + } + } + + if(!autoflush) + { + // + // Increment the number of requests in the batch. + // ++_batchRequestNum; // @@ -927,7 +927,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) // if(compress) { - _batchRequestCompress = true; + _batchRequestCompress = true; } // @@ -942,41 +942,41 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) if(autoflush) { // - // We have to keep _batchStreamInUse set until after we insert the - // saved marshalled data into a new stream. - // + // We have to keep _batchStreamInUse set until after we insert the + // saved marshalled data into a new stream. 
+ // flushBatchRequestsInternal(true); IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // Throw memory limit exception if the message that caused us to go over - // limit causes us to exceed the limit by itself. - // + // + // Throw memory limit exception if the message that caused us to go over + // limit causes us to exceed the limit by itself. + // if(sizeof(requestBatchHdr) + lastRequest.size() > _instance->messageSizeMax()) - { - resetBatch(true); - throw MemoryLimitException(__FILE__, __LINE__); - } - - // - // Start a new batch with the last message that caused us to - // go over the limit. - // - try - { - _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); - _batchStream.writeBlob(&lastRequest[0], lastRequest.size()); - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - ex.ice_throw(); - } + { + resetBatch(true); + throw MemoryLimitException(__FILE__, __LINE__); + } + + // + // Start a new batch with the last message that caused us to + // go over the limit. + // + try + { + _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); + _batchStream.writeBlob(&lastRequest[0], lastRequest.size()); + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + ex.ice_throw(); + } if(compress) { - _batchRequestCompress = true; + _batchRequestCompress = true; } // @@ -984,7 +984,7 @@ Ice::ConnectionI::finishBatchRequest(BasicStream* os, bool compress) // ++_batchRequestNum; _batchStreamInUse = false; - notifyAll(); + notifyAll(); } } @@ -1011,133 +1011,133 @@ void Ice::ConnectionI::flushBatchRequestsInternal(bool ignoreInUse) { { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(!ignoreInUse) - { - while(_batchStreamInUse && !_exception.get()) - { - wait(); - } - } - - if(_exception.get()) - { - _exception->ice_throw(); - } - - if(_batchStream.b.empty()) - { - return; // Nothing to do. - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - _batchStream.i = _batchStream.b.begin(); - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); - } - - // - // Prevent that new batch requests are added while we are - // flushing. - // - _batchStreamInUse = true; + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(!ignoreInUse) + { + while(_batchStreamInUse && !_exception.get()) + { + wait(); + } + } + + if(_exception.get()) + { + _exception->ice_throw(); + } + + if(_batchStream.b.empty()) + { + return; // Nothing to do. + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + _batchStream.i = _batchStream.b.begin(); + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } + + // + // Prevent that new batch requests are added while we are + // flushing. + // + _batchStreamInUse = true; } try { - IceUtil::Mutex::Lock sendSync(_sendMutex); + IceUtil::Mutex::Lock sendSync(_sendMutex); - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } + if(!_transceiver) // Has the transceiver already been closed? + { + assert(_exception.get()); + _exception->ice_throw(); // The exception is immutable at this point. + } - // - // Fill in the number of requests in the batch. - // - const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); + // + // Fill in the number of requests in the batch. 
+ // + const Byte* p = reinterpret_cast<const Byte*>(&_batchRequestNum); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); + reverse_copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #else - copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); + copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif - - if(_batchRequestCompress && _batchStream.b.size() >= 100) // Only compress messages larger than 100 bytes. - { - // - // Message compressed. Request compressed response, if any. - // - _batchStream.b[9] = 2; - - // - // Do compression. - // - BasicStream cstream(_instance.get()); - doCompress(_batchStream, cstream); - - // - // Send the batch request. - // - _batchStream.i = _batchStream.b.begin(); - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - cstream.i = cstream.b.begin(); - _transceiver->write(cstream, _endpoint->timeout()); - } - else - { - if(_batchRequestCompress) - { - // - // Message not compressed. Request compressed response, if any. - // - _batchStream.b[9] = 1; - } - - // - // No compression, just fill in the message size. - // - Int sz = static_cast<Int>(_batchStream.b.size()); - const Byte* q = reinterpret_cast<const Byte*>(&sz); + + if(_batchRequestCompress && _batchStream.b.size() >= 100) // Only compress messages larger than 100 bytes. + { + // + // Message compressed. Request compressed response, if any. + // + _batchStream.b[9] = 2; + + // + // Do compression. + // + BasicStream cstream(_instance.get()); + doCompress(_batchStream, cstream); + + // + // Send the batch request. + // + _batchStream.i = _batchStream.b.begin(); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + cstream.i = cstream.b.begin(); + _transceiver->write(cstream, _endpoint->timeout()); + } + else + { + if(_batchRequestCompress) + { + // + // Message not compressed. Request compressed response, if any. + // + _batchStream.b[9] = 1; + } + + // + // No compression, just fill in the message size. + // + Int sz = static_cast<Int>(_batchStream.b.size()); + const Byte* q = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN - reverse_copy(q, q + sizeof(Int), _batchStream.b.begin() + 10); + reverse_copy(q, q + sizeof(Int), _batchStream.b.begin() + 10); #else - copy(q, q + sizeof(Int), _batchStream.b.begin() + 10); + copy(q, q + sizeof(Int), _batchStream.b.begin() + 10); #endif - - // - // Send the batch request. - // - _batchStream.i = _batchStream.b.begin(); - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - _transceiver->write(_batchStream, _endpoint->timeout()); - } + + // + // Send the batch request. + // + _batchStream.i = _batchStream.b.begin(); + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + _transceiver->write(_batchStream, _endpoint->timeout()); + } } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - assert(_exception.get()); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + assert(_exception.get()); - // - // Since batch requests are all oneways (or datagrams), we - // must report the exception to the caller. - // - _exception->ice_throw(); + // + // Since batch requests are all oneways (or datagrams), we + // must report the exception to the caller. 
+ // + _exception->ice_throw(); } { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // Reset the batch stream, and notify that flushing is over. - // - resetBatch(!ignoreInUse); + // + // Reset the batch stream, and notify that flushing is over. + // + resetBatch(!ignoreInUse); } } @@ -1167,100 +1167,100 @@ Ice::ConnectionI::sendResponse(BasicStream* os, Byte compressFlag) { try { - IceUtil::Mutex::Lock sendSync(_sendMutex); - - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } - - // - // Only compress if compression was requested by the client, - // and if the message is larger than 100 bytes. - // - if(compressFlag > 0 && os->b.size() >= 100) - { - // - // Message compressed. Request compressed response, if any. - // - os->b[9] = 2; - - // - // Do compression. - // - BasicStream cstream(_instance.get()); - doCompress(*os, cstream); - - // - // Send the reply. - // - os->i = os->b.begin(); - traceReply("sending reply", *os, _logger, _traceLevels); - cstream.i = cstream.b.begin(); - _transceiver->write(cstream, _endpoint->timeout()); - } - else - { - if(compressFlag > 0) - { - // - // Message not compressed. Request compressed response, if any. - // - os->b[9] = 1; - } - - // - // No compression, just fill in the message size. - // - Int sz = static_cast<Int>(os->b.size()); - const Byte* p = reinterpret_cast<const Byte*>(&sz); + IceUtil::Mutex::Lock sendSync(_sendMutex); + + if(!_transceiver) // Has the transceiver already been closed? + { + assert(_exception.get()); + _exception->ice_throw(); // The exception is immutable at this point. + } + + // + // Only compress if compression was requested by the client, + // and if the message is larger than 100 bytes. + // + if(compressFlag > 0 && os->b.size() >= 100) + { + // + // Message compressed. Request compressed response, if any. + // + os->b[9] = 2; + + // + // Do compression. + // + BasicStream cstream(_instance.get()); + doCompress(*os, cstream); + + // + // Send the reply. + // + os->i = os->b.begin(); + traceReply("sending reply", *os, _logger, _traceLevels); + cstream.i = cstream.b.begin(); + _transceiver->write(cstream, _endpoint->timeout()); + } + else + { + if(compressFlag > 0) + { + // + // Message not compressed. Request compressed response, if any. + // + os->b[9] = 1; + } + + // + // No compression, just fill in the message size. + // + Int sz = static_cast<Int>(os->b.size()); + const Byte* p = reinterpret_cast<const Byte*>(&sz); #ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); + reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); #else - copy(p, p + sizeof(Int), os->b.begin() + 10); + copy(p, p + sizeof(Int), os->b.begin() + 10); #endif - - // - // Send the reply. - // - os->i = os->b.begin(); - traceReply("sending reply", *os, _logger, _traceLevels); - _transceiver->write(*os, _endpoint->timeout()); - } + + // + // Send the reply. 
+ // + os->i = os->b.begin(); + traceReply("sending reply", *os, _logger, _traceLevels); + _transceiver->write(*os, _endpoint->timeout()); + } } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - } - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - assert(_state > StateNotValidated); - - try - { - if(--_dispatchCount == 0) - { - notifyAll(); - } - - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); - } - - if(_acmTimeout > 0) - { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); - } - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - } + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + } + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + assert(_state > StateNotValidated); + + try + { + if(--_dispatchCount == 0) + { + notifyAll(); + } + + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } + + if(_acmTimeout > 0) + { + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + } + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + } } } @@ -1273,19 +1273,19 @@ Ice::ConnectionI::sendNoResponse() try { - if(--_dispatchCount == 0) - { - notifyAll(); - } + if(--_dispatchCount == 0) + { + notifyAll(); + } - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); - } + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } } catch(const LocalException& ex) { - setState(StateClosed, ex); + setState(StateClosed, ex); } } @@ -1308,7 +1308,7 @@ Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter) if(_exception.get()) { - _exception->ice_throw(); + _exception->ice_throw(); } assert(_state < StateClosing); @@ -1317,15 +1317,15 @@ Ice::ConnectionI::setAdapter(const ObjectAdapterPtr& adapter) if(_adapter) { - _servantManager = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getServantManager(); - if(!_servantManager) - { - _adapter = 0; - } + _servantManager = dynamic_cast<ObjectAdapterI*>(_adapter.get())->getServantManager(); + if(!_servantManager) + { + _adapter = 0; + } } else { - _servantManager = 0; + _servantManager = 0; } // @@ -1351,7 +1351,7 @@ Ice::ConnectionI::createProxy(const Identity& ident) const vector<ConnectionIPtr> connections; connections.push_back(const_cast<ConnectionI*>(this)); ReferencePtr ref = _instance->referenceFactory()->create(ident, _instance->getDefaultContext(), - "", Reference::ModeTwoway, connections); + "", Reference::ModeTwoway, connections); return _instance->proxyFactory()->referenceToProxy(ref); } @@ -1396,28 +1396,28 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool) OutgoingAsyncPtr outAsync; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // We must promote within the synchronization, otherwise there - // could be various race conditions with close connection - // messages and other messages. - // - threadPool->promoteFollower(); + // + // We must promote within the synchronization, otherwise there + // could be various race conditions with close connection + // messages and other messages. 
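The repeated _acmAbsoluteTimeout assignments above are the bookkeeping half of active connection management: every request, reply, or batch flush pushes an absolute idle deadline forward, and monitor() (earlier in this file) closes the connection gracefully once the deadline passes with nothing in flight. A standalone sketch of that idea, with an invented class name:

#include <chrono>

class AcmDeadline
{
public:
    explicit AcmDeadline(int timeoutSeconds) : _timeout(timeoutSeconds) {}

    // Called after each request, reply, or batch flush.
    void refresh()
    {
        if(_timeout.count() > 0)
        {
            _deadline = std::chrono::steady_clock::now() + _timeout;
        }
    }

    // Polled periodically by the connection monitor.
    bool expired() const
    {
        return _timeout.count() > 0 && std::chrono::steady_clock::now() >= _deadline;
    }

private:
    std::chrono::seconds _timeout;
    std::chrono::steady_clock::time_point _deadline;
};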
+ // + threadPool->promoteFollower(); - if(_state != StateClosed) - { - parseMessage(stream, invokeNum, requestId, compress, servantManager, adapter, outAsync); - } + if(_state != StateClosed) + { + parseMessage(stream, invokeNum, requestId, compress, servantManager, adapter, outAsync); + } - // - // parseMessage() can close the connection, so we must check - // for closed state again. - // - if(_state == StateClosed) - { - return; - } + // + // parseMessage() can close the connection, so we must check + // for closed state again. + // + if(_state == StateClosed) + { + return; + } } // @@ -1426,7 +1426,7 @@ Ice::ConnectionI::message(BasicStream& stream, const ThreadPoolPtr& threadPool) // if(outAsync) { - outAsync->__finished(stream); + outAsync->__finished(stream); } // @@ -1450,57 +1450,57 @@ Ice::ConnectionI::finished(const ThreadPoolPtr& threadPool) map<Int, AsyncRequest> asyncRequests; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - --_finishedCount; - assert(threadPool.get() == _threadPool.get()); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + --_finishedCount; + assert(threadPool.get() == _threadPool.get()); - if(_finishedCount == 0 && _state == StateClosed) - { - _threadPool->decFdsInUse(); + if(_finishedCount == 0 && _state == StateClosed) + { + _threadPool->decFdsInUse(); - // - // We must make sure that nobody is sending when we close - // the transceiver. - // - IceUtil::Mutex::Lock sendSync(_sendMutex); + // + // We must make sure that nobody is sending when we close + // the transceiver. + // + IceUtil::Mutex::Lock sendSync(_sendMutex); - try - { - _transceiver->close(); - } - catch(const LocalException& ex) - { - localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone())); - } + try + { + _transceiver->close(); + } + catch(const LocalException& ex) + { + localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone())); + } - _transceiver = 0; - notifyAll(); - } + _transceiver = 0; + notifyAll(); + } - if(_state == StateClosed || _state == StateClosing) - { - requests.swap(_requests); - _requestsHint = _requests.end(); + if(_state == StateClosed || _state == StateClosing) + { + requests.swap(_requests); + _requestsHint = _requests.end(); - asyncRequests.swap(_asyncRequests); - _asyncRequestsHint = _asyncRequests.end(); - } + asyncRequests.swap(_asyncRequests); + _asyncRequestsHint = _asyncRequests.end(); + } } for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p) { - p->second->finished(*_exception.get()); // The exception is immutable at this point. + p->second->finished(*_exception.get()); // The exception is immutable at this point. } for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q) { - q->second.p->__finished(*_exception.get()); // The exception is immutable at this point. + q->second.p->__finished(*_exception.get()); // The exception is immutable at this point. 
} if(localEx.get()) { - localEx->ice_throw(); + localEx->ice_throw(); } } @@ -1524,13 +1524,13 @@ Ice::ConnectionI::invokeException(const LocalException& ex, int invokeNum) if(invokeNum > 0) { - assert(_dispatchCount > 0); - _dispatchCount -= invokeNum; - assert(_dispatchCount >= 0); - if(_dispatchCount == 0) - { - notifyAll(); - } + assert(_dispatchCount > 0); + _dispatchCount -= invokeNum; + assert(_dispatchCount >= 0); + if(_dispatchCount == 0) + { + notifyAll(); + } } } @@ -1565,9 +1565,9 @@ Ice::ConnectionI::getTransceiver() const } Ice::ConnectionI::ConnectionI(const InstancePtr& instance, - const TransceiverPtr& transceiver, - const EndpointIPtr& endpoint, - const ObjectAdapterPtr& adapter, + const TransceiverPtr& transceiver, + const EndpointIPtr& endpoint, + const ObjectAdapterPtr& adapter, bool threadPerConnection, size_t threadPerConnectionStackSize) : EventHandler(instance), @@ -1601,92 +1601,92 @@ Ice::ConnectionI::ConnectionI(const InstancePtr& instance, Int& acmTimeout = const_cast<Int&>(_acmTimeout); if(_endpoint->datagram()) { - acmTimeout = 0; + acmTimeout = 0; } else { - if(_adapter) - { - acmTimeout = _instance->serverACM(); - } - else - { - acmTimeout = _instance->clientACM(); - } + if(_adapter) + { + acmTimeout = _instance->serverACM(); + } + else + { + acmTimeout = _instance->clientACM(); + } } int& compressionLevel = const_cast<int&>(_compressionLevel); compressionLevel = _instance->initializationData().properties->getPropertyAsIntWithDefault( - "Ice.Compression.Level", 1); + "Ice.Compression.Level", 1); if(compressionLevel < 1) { - compressionLevel = 1; + compressionLevel = 1; } else if(compressionLevel > 9) { - compressionLevel = 9; + compressionLevel = 9; } ObjectAdapterI* adapterImpl = _adapter ? dynamic_cast<ObjectAdapterI*>(_adapter.get()) : 0; if(adapterImpl) { - _servantManager = adapterImpl->getServantManager(); + _servantManager = adapterImpl->getServantManager(); } __setNoDelete(true); try { - if(!threadPerConnection) - { - // - // Only set _threadPool if we really need it, i.e., if we are - // not in thread per connection mode. Thread pools have lazy - // initialization in Instance, and we don't want them to be - // created if they are not needed. - // - if(adapterImpl) - { - const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool(); - } - else - { - const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool(); - } - _threadPool->incFdsInUse(); - } - else - { - // - // If we are in thread per connection mode, create the - // thread for this connection. - // - _thread = new ThreadPerConnection(this); - _thread->start(threadPerConnectionStackSize); - } + if(!threadPerConnection) + { + // + // Only set _threadPool if we really need it, i.e., if we are + // not in thread per connection mode. Thread pools have lazy + // initialization in Instance, and we don't want them to be + // created if they are not needed. + // + if(adapterImpl) + { + const_cast<ThreadPoolPtr&>(_threadPool) = adapterImpl->getThreadPool(); + } + else + { + const_cast<ThreadPoolPtr&>(_threadPool) = _instance->clientThreadPool(); + } + _threadPool->incFdsInUse(); + } + else + { + // + // If we are in thread per connection mode, create the + // thread for this connection. 
+ // + _thread = new ThreadPerConnection(this); + _thread->start(threadPerConnectionStackSize); + } } catch(const IceUtil::Exception& ex) { - { - Error out(_logger); - if(threadPerConnection) - { - out << "cannot create thread for connection:\n" << ex; - } - // Otherwise with thread pool the thread pool itself - // prints a warning if the threads cannot be created. - } - - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - - __setNoDelete(false); - ex.ice_throw(); + { + Error out(_logger); + if(threadPerConnection) + { + out << "cannot create thread for connection:\n" << ex; + } + // Otherwise with thread pool the thread pool itself + // prints a warning if the threads cannot be created. + } + + try + { + _transceiver->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + + __setNoDelete(false); + ex.ice_throw(); } __setNoDelete(false); } @@ -1710,40 +1710,40 @@ Ice::ConnectionI::setState(State state, const LocalException& ex) if(_state == state) // Don't switch twice. { - return; + return; } if(!_exception.get()) { - // - // If we are in closed state, an exception must be set. - // - assert(_state != StateClosed); - - _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); - - if(_warn) - { - // - // We don't warn if we are not validated. - // - if(_state > StateNotValidated) - { - // - // Don't warn about certain expected exceptions. - // - if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || - dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || - dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || - dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || - (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing))) - { - Warning out(_logger); - out << "connection exception:\n" << *_exception.get() << '\n' << _desc; - } - } - } + // + // If we are in closed state, an exception must be set. + // + assert(_state != StateClosed); + + _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); + + if(_warn) + { + // + // We don't warn if we are not validated. + // + if(_state > StateNotValidated) + { + // + // Don't warn about certain expected exceptions. + // + if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || + dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const ConnectionTimeoutException*>(_exception.get()) || + dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || + dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || + (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing))) + { + Warning out(_logger); + out << "connection exception:\n" << *_exception.get() << '\n' << _desc; + } + } + } } // @@ -1763,7 +1763,7 @@ Ice::ConnectionI::setState(State state) // if(_endpoint->datagram() && state == StateClosing) { - state = StateClosed; + state = StateClosed; } // @@ -1771,133 +1771,133 @@ Ice::ConnectionI::setState(State state) // if(_state == StateNotValidated && state == StateClosing) { - state = StateClosed; + state = StateClosed; } if(_state == state) // Don't switch twice. 
{ - return; + return; } switch(state) { - case StateNotValidated: - { - assert(false); - break; - } + case StateNotValidated: + { + assert(false); + break; + } - case StateActive: - { - // + case StateActive: + { + // // Can only switch from holding or not validated to // active. - // - if(_state != StateHolding && _state != StateNotValidated) - { - return; - } - if(!_threadPerConnection) - { - registerWithPool(); - } - break; - } - - case StateHolding: - { - // - // Can only switch from active or not validated to - // holding. - // - if(_state != StateActive && _state != StateNotValidated) - { - return; - } - if(!_threadPerConnection) - { - unregisterWithPool(); - } - break; - } - - case StateClosing: - { - // - // Can't change back from closed. - // - if(_state == StateClosed) - { - return; - } - if(!_threadPerConnection) - { - registerWithPool(); // We need to continue to read in closing state. - } - break; - } - - case StateClosed: - { - if(_threadPerConnection) - { - // - // If we are in thread per connection mode, we - // shutdown both for reading and writing. This will - // unblock and read call with an exception. The thread - // per connection then closes the transceiver. - // - _transceiver->shutdownReadWrite(); - } - else if(_state == StateNotValidated) - { - // - // If we change from not validated we can close right - // away. - // - assert(!_registeredWithPool); - - _threadPool->decFdsInUse(); - - // - // We must make sure that nobody is sending when we - // close the transceiver. - // - IceUtil::Mutex::Lock sendSync(_sendMutex); - - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - - _transceiver = 0; - //notifyAll(); // We notify already below. - } - else - { - // - // Otherwise we first must make sure that we are - // registered, then we unregister, and let finished() - // do the close. - // - registerWithPool(); - unregisterWithPool(); - - // - // We must prevent any further writes when _state == StateClosed. - // However, functions such as sendResponse cannot acquire the main - // mutex in order to check _state. Therefore we shut down the write - // end of the transceiver, which causes subsequent write attempts - // to fail with an exception. - // - _transceiver->shutdownWrite(); - } - break; - } + // + if(_state != StateHolding && _state != StateNotValidated) + { + return; + } + if(!_threadPerConnection) + { + registerWithPool(); + } + break; + } + + case StateHolding: + { + // + // Can only switch from active or not validated to + // holding. + // + if(_state != StateActive && _state != StateNotValidated) + { + return; + } + if(!_threadPerConnection) + { + unregisterWithPool(); + } + break; + } + + case StateClosing: + { + // + // Can't change back from closed. + // + if(_state == StateClosed) + { + return; + } + if(!_threadPerConnection) + { + registerWithPool(); // We need to continue to read in closing state. + } + break; + } + + case StateClosed: + { + if(_threadPerConnection) + { + // + // If we are in thread per connection mode, we + // shutdown both for reading and writing. This will + // unblock and read call with an exception. The thread + // per connection then closes the transceiver. + // + _transceiver->shutdownReadWrite(); + } + else if(_state == StateNotValidated) + { + // + // If we change from not validated we can close right + // away. 
+ // + assert(!_registeredWithPool); + + _threadPool->decFdsInUse(); + + // + // We must make sure that nobody is sending when we + // close the transceiver. + // + IceUtil::Mutex::Lock sendSync(_sendMutex); + + try + { + _transceiver->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + + _transceiver = 0; + //notifyAll(); // We notify already below. + } + else + { + // + // Otherwise we first must make sure that we are + // registered, then we unregister, and let finished() + // do the close. + // + registerWithPool(); + unregisterWithPool(); + + // + // We must prevent any further writes when _state == StateClosed. + // However, functions such as sendResponse cannot acquire the main + // mutex in order to check _state. Therefore we shut down the write + // end of the transceiver, which causes subsequent write attempts + // to fail with an exception. + // + _transceiver->shutdownWrite(); + } + break; + } } // @@ -1909,14 +1909,14 @@ Ice::ConnectionI::setState(State state) ConnectionMonitorPtr connectionMonitor = _instance->connectionMonitor(); if(connectionMonitor) { - if(state == StateActive) - { - connectionMonitor->add(this); - } - else if(_state == StateActive) - { - connectionMonitor->remove(this); - } + if(state == StateActive) + { + connectionMonitor->add(this); + } + else if(_state == StateActive) + { + connectionMonitor->remove(this); + } } _state = state; @@ -1926,14 +1926,14 @@ Ice::ConnectionI::setState(State state) if(_state == StateClosing && _dispatchCount == 0) { - try - { - initiateShutdown(); - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - } + try + { + initiateShutdown(); + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + } } } @@ -1945,39 +1945,39 @@ Ice::ConnectionI::initiateShutdown() const if(!_endpoint->datagram()) { - IceUtil::Mutex::Lock sendSync(_sendMutex); - - // - // Before we shut down, we send a close connection message. - // - BasicStream os(_instance.get()); - os.write(magic[0]); - os.write(magic[1]); - os.write(magic[2]); - os.write(magic[3]); - os.write(protocolMajor); - os.write(protocolMinor); - os.write(encodingMajor); - os.write(encodingMinor); - os.write(closeConnectionMsg); - os.write((Byte)1); // Compression status: compression supported but not used. - os.write(headerSize); // Message size. - - // - // Send the message. - // - os.i = os.b.begin(); - traceHeader("sending close connection", os, _logger, _traceLevels); - _transceiver->write(os, _endpoint->timeout()); - // - // The CloseConnection message should be sufficient. Closing the write - // end of the socket is probably an artifact of how things were done - // in IIOP. In fact, shutting down the write end of the socket causes - // problems on Windows by preventing the peer from using the socket. - // For example, the peer is no longer able to continue writing a large - // message after the socket is shutdown. - // - //_transceiver->shutdownWrite(); + IceUtil::Mutex::Lock sendSync(_sendMutex); + + // + // Before we shut down, we send a close connection message. + // + BasicStream os(_instance.get()); + os.write(magic[0]); + os.write(magic[1]); + os.write(magic[2]); + os.write(magic[3]); + os.write(protocolMajor); + os.write(protocolMinor); + os.write(encodingMajor); + os.write(encodingMinor); + os.write(closeConnectionMsg); + os.write((Byte)1); // Compression status: compression supported but not used. + os.write(headerSize); // Message size. + + // + // Send the message. 
+ // + os.i = os.b.begin(); + traceHeader("sending close connection", os, _logger, _traceLevels); + _transceiver->write(os, _endpoint->timeout()); + // + // The CloseConnection message should be sufficient. Closing the write + // end of the socket is probably an artifact of how things were done + // in IIOP. In fact, shutting down the write end of the socket causes + // problems on Windows by preventing the peer from using the socket. + // For example, the peer is no longer able to continue writing a large + // message after the socket is shutdown. + // + //_transceiver->shutdownWrite(); } } @@ -1988,8 +1988,8 @@ Ice::ConnectionI::registerWithPool() if(!_registeredWithPool) { - _threadPool->_register(_transceiver->fd(), this); - _registeredWithPool = true; + _threadPool->_register(_transceiver->fd(), this); + _registeredWithPool = true; } } @@ -2000,9 +2000,9 @@ Ice::ConnectionI::unregisterWithPool() if(_registeredWithPool) { - _threadPool->unregister(_transceiver->fd()); - _registeredWithPool = false; - ++_finishedCount; // For each unregistration, finished() is called once. + _threadPool->unregister(_transceiver->fd()); + _registeredWithPool = false; + ++_finishedCount; // For each unregistration, finished() is called once. } } @@ -2011,59 +2011,59 @@ getBZ2Error(int bzError) { if(bzError == BZ_RUN_OK) { - return ": BZ_RUN_OK"; + return ": BZ_RUN_OK"; } else if(bzError == BZ_FLUSH_OK) { - return ": BZ_FLUSH_OK"; + return ": BZ_FLUSH_OK"; } else if(bzError == BZ_FINISH_OK) { - return ": BZ_FINISH_OK"; + return ": BZ_FINISH_OK"; } else if(bzError == BZ_STREAM_END) { - return ": BZ_STREAM_END"; + return ": BZ_STREAM_END"; } else if(bzError == BZ_CONFIG_ERROR) { - return ": BZ_CONFIG_ERROR"; + return ": BZ_CONFIG_ERROR"; } else if(bzError == BZ_SEQUENCE_ERROR) { - return ": BZ_SEQUENCE_ERROR"; + return ": BZ_SEQUENCE_ERROR"; } else if(bzError == BZ_PARAM_ERROR) { - return ": BZ_PARAM_ERROR"; + return ": BZ_PARAM_ERROR"; } else if(bzError == BZ_MEM_ERROR) { - return ": BZ_MEM_ERROR"; + return ": BZ_MEM_ERROR"; } else if(bzError == BZ_DATA_ERROR) { - return ": BZ_DATA_ERROR"; + return ": BZ_DATA_ERROR"; } else if(bzError == BZ_DATA_ERROR_MAGIC) { - return ": BZ_DATA_ERROR_MAGIC"; + return ": BZ_DATA_ERROR_MAGIC"; } else if(bzError == BZ_IO_ERROR) { - return ": BZ_IO_ERROR"; + return ": BZ_IO_ERROR"; } else if(bzError == BZ_UNEXPECTED_EOF) { - return ": BZ_UNEXPECTED_EOF"; + return ": BZ_UNEXPECTED_EOF"; } else if(bzError == BZ_OUTBUFF_FULL) { - return ": BZ_OUTBUFF_FULL"; + return ": BZ_OUTBUFF_FULL"; } else { - return ""; + return ""; } } @@ -2079,15 +2079,15 @@ Ice::ConnectionI::doCompress(BasicStream& uncompressed, BasicStream& compressed) unsigned int compressedLen = static_cast<unsigned int>(uncompressedLen * 1.01 + 600); compressed.b.resize(headerSize + sizeof(Int) + compressedLen); int bzError = BZ2_bzBuffToBuffCompress(reinterpret_cast<char*>(&compressed.b[0]) + headerSize + sizeof(Int), - &compressedLen, - reinterpret_cast<char*>(&uncompressed.b[0]) + headerSize, - uncompressedLen, - _compressionLevel, 0, 0); + &compressedLen, + reinterpret_cast<char*>(&uncompressed.b[0]) + headerSize, + uncompressedLen, + _compressionLevel, 0, 0); if(bzError != BZ_OK) { - CompressionException ex(__FILE__, __LINE__); - ex.reason = "BZ2_bzBuffToBuffCompress failed" + getBZ2Error(bzError); - throw ex; + CompressionException ex(__FILE__, __LINE__); + ex.reason = "BZ2_bzBuffToBuffCompress failed" + getBZ2Error(bzError); + throw ex; } compressed.b.resize(headerSize + sizeof(Int) + compressedLen); @@ 
-2130,22 +2130,22 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse compressed.read(uncompressedSize); if(uncompressedSize <= headerSize) { - throw IllegalMessageSizeException(__FILE__, __LINE__); + throw IllegalMessageSizeException(__FILE__, __LINE__); } uncompressed.resize(uncompressedSize); unsigned int uncompressedLen = uncompressedSize - headerSize; unsigned int compressedLen = static_cast<unsigned int>(compressed.b.size() - headerSize - sizeof(Int)); int bzError = BZ2_bzBuffToBuffDecompress(reinterpret_cast<char*>(&uncompressed.b[0]) + headerSize, - &uncompressedLen, - reinterpret_cast<char*>(&compressed.b[0]) + headerSize + sizeof(Int), - compressedLen, - 0, 0); + &uncompressedLen, + reinterpret_cast<char*>(&compressed.b[0]) + headerSize + sizeof(Int), + compressedLen, + 0, 0); if(bzError != BZ_OK) { - CompressionException ex(__FILE__, __LINE__); - ex.reason = "BZ2_bzBuffToBuffCompress failed" + getBZ2Error(bzError); - throw ex; + CompressionException ex(__FILE__, __LINE__); + ex.reason = "BZ2_bzBuffToBuffCompress failed" + getBZ2Error(bzError); + throw ex; } copy(compressed.b.begin(), compressed.b.begin() + headerSize, uncompressed.b.begin()); @@ -2153,222 +2153,222 @@ Ice::ConnectionI::doUncompress(BasicStream& compressed, BasicStream& uncompresse void Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& requestId, Byte& compress, - ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, - OutgoingAsyncPtr& outAsync) + ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter, + OutgoingAsyncPtr& outAsync) { assert(_state > StateNotValidated && _state < StateClosed); if(_acmTimeout > 0) { - _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); + _acmAbsoluteTimeout = IceUtil::Time::now() + IceUtil::Time::seconds(_acmTimeout); } try { - // - // We don't need to check magic and version here. This has - // already been done by the ThreadPool or the - // ThreadPerConnection, which provides us with the stream. - // - assert(stream.i == stream.b.end()); - stream.i = stream.b.begin() + 8; - Byte messageType; - stream.read(messageType); - stream.read(compress); - if(compress == 2) - { - BasicStream ustream(_instance.get()); - doUncompress(stream, ustream); - stream.b.swap(ustream.b); - } - stream.i = stream.b.begin() + headerSize; + // + // We don't need to check magic and version here. This has + // already been done by the ThreadPool or the + // ThreadPerConnection, which provides us with the stream. 
+ // + assert(stream.i == stream.b.end()); + stream.i = stream.b.begin() + 8; + Byte messageType; + stream.read(messageType); + stream.read(compress); + if(compress == 2) + { + BasicStream ustream(_instance.get()); + doUncompress(stream, ustream); + stream.b.swap(ustream.b); + } + stream.i = stream.b.begin() + headerSize; - switch(messageType) - { - case closeConnectionMsg: - { - traceHeader("received close connection", stream, _logger, _traceLevels); - if(_endpoint->datagram()) - { - if(_warn) - { - Warning out(_logger); - out << "ignoring close connection message for datagram connection:\n" << _desc; - } - } - else - { - setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); - } - break; - } - - case requestMsg: - { - if(_state == StateClosing) - { - traceRequest("received request during closing\n" - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); - } - else - { - traceRequest("received request", stream, _logger, _traceLevels); - stream.read(requestId); - invokeNum = 1; - servantManager = _servantManager; - adapter = _adapter; - ++_dispatchCount; - } - break; - } - - case requestBatchMsg: - { - if(_state == StateClosing) - { - traceBatchRequest("received batch request during closing\n" - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); - } - else - { - traceBatchRequest("received batch request", stream, _logger, _traceLevels); - stream.read(invokeNum); - if(invokeNum < 0) - { - invokeNum = 0; - throw NegativeSizeException(__FILE__, __LINE__); - } - servantManager = _servantManager; - adapter = _adapter; - _dispatchCount += invokeNum; - } - break; - } - - case replyMsg: - { - traceReply("received reply", stream, _logger, _traceLevels); - - stream.read(requestId); - - map<Int, Outgoing*>::iterator p = _requests.end(); - map<Int, AsyncRequest>::iterator q = _asyncRequests.end(); - - if(_requestsHint != _requests.end()) - { - if(_requestsHint->first == requestId) - { - p = _requestsHint; - } - } - - if(p == _requests.end()) - { - if(_asyncRequestsHint != _asyncRequests.end()) - { - if(_asyncRequestsHint->first == requestId) - { - q = _asyncRequestsHint; - } - } - } - - if(p == _requests.end() && q == _asyncRequests.end()) - { - p = _requests.find(requestId); - } - - if(p == _requests.end() && q == _asyncRequests.end()) - { - q = _asyncRequests.find(requestId); - } - - if(p == _requests.end() && q == _asyncRequests.end()) - { - throw UnknownRequestIdException(__FILE__, __LINE__); - } - - if(p != _requests.end()) - { - p->second->finished(stream); - - if(p == _requestsHint) - { - _requests.erase(p++); - _requestsHint = p; - } - else - { - _requests.erase(p); - } - } - else - { - assert(q != _asyncRequests.end()); - - outAsync = q->second.p; - - if(q == _asyncRequestsHint) - { - _asyncRequests.erase(q++); - _asyncRequestsHint = q; - } - else - { - _asyncRequests.erase(q); - } - } - - break; - } - - case validateConnectionMsg: - { - traceHeader("received validate connection", stream, _logger, _traceLevels); - if(_warn) - { - Warning out(_logger); - out << "ignoring unexpected validate connection message:\n" << _desc; - } - break; - } - - default: - { - traceHeader("received unknown message\n" - "(invalid, closing connection)", - stream, _logger, _traceLevels); - throw UnknownMessageException(__FILE__, __LINE__); - break; - } - } + switch(messageType) + { + case closeConnectionMsg: + { + traceHeader("received close connection", stream, _logger, _traceLevels); + if(_endpoint->datagram()) + { + if(_warn) + { + Warning 
out(_logger); + out << "ignoring close connection message for datagram connection:\n" << _desc; + } + } + else + { + setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); + } + break; + } + + case requestMsg: + { + if(_state == StateClosing) + { + traceRequest("received request during closing\n" + "(ignored by server, client will retry)", + stream, _logger, _traceLevels); + } + else + { + traceRequest("received request", stream, _logger, _traceLevels); + stream.read(requestId); + invokeNum = 1; + servantManager = _servantManager; + adapter = _adapter; + ++_dispatchCount; + } + break; + } + + case requestBatchMsg: + { + if(_state == StateClosing) + { + traceBatchRequest("received batch request during closing\n" + "(ignored by server, client will retry)", + stream, _logger, _traceLevels); + } + else + { + traceBatchRequest("received batch request", stream, _logger, _traceLevels); + stream.read(invokeNum); + if(invokeNum < 0) + { + invokeNum = 0; + throw NegativeSizeException(__FILE__, __LINE__); + } + servantManager = _servantManager; + adapter = _adapter; + _dispatchCount += invokeNum; + } + break; + } + + case replyMsg: + { + traceReply("received reply", stream, _logger, _traceLevels); + + stream.read(requestId); + + map<Int, Outgoing*>::iterator p = _requests.end(); + map<Int, AsyncRequest>::iterator q = _asyncRequests.end(); + + if(_requestsHint != _requests.end()) + { + if(_requestsHint->first == requestId) + { + p = _requestsHint; + } + } + + if(p == _requests.end()) + { + if(_asyncRequestsHint != _asyncRequests.end()) + { + if(_asyncRequestsHint->first == requestId) + { + q = _asyncRequestsHint; + } + } + } + + if(p == _requests.end() && q == _asyncRequests.end()) + { + p = _requests.find(requestId); + } + + if(p == _requests.end() && q == _asyncRequests.end()) + { + q = _asyncRequests.find(requestId); + } + + if(p == _requests.end() && q == _asyncRequests.end()) + { + throw UnknownRequestIdException(__FILE__, __LINE__); + } + + if(p != _requests.end()) + { + p->second->finished(stream); + + if(p == _requestsHint) + { + _requests.erase(p++); + _requestsHint = p; + } + else + { + _requests.erase(p); + } + } + else + { + assert(q != _asyncRequests.end()); + + outAsync = q->second.p; + + if(q == _asyncRequestsHint) + { + _asyncRequests.erase(q++); + _asyncRequestsHint = q; + } + else + { + _asyncRequests.erase(q); + } + } + + break; + } + + case validateConnectionMsg: + { + traceHeader("received validate connection", stream, _logger, _traceLevels); + if(_warn) + { + Warning out(_logger); + out << "ignoring unexpected validate connection message:\n" << _desc; + } + break; + } + + default: + { + traceHeader("received unknown message\n" + "(invalid, closing connection)", + stream, _logger, _traceLevels); + throw UnknownMessageException(__FILE__, __LINE__); + break; + } + } } catch(const SocketException& ex) { - exception(ex); + exception(ex); } catch(const LocalException& ex) { - if(_endpoint->datagram()) - { - if(_warn) - { - Warning out(_logger); - out << "datagram connection exception:\n" << ex << '\n' << _desc; - } - } - else - { - setState(StateClosed, ex); - } + if(_endpoint->datagram()) + { + if(_warn) + { + Warning out(_logger); + out << "datagram connection exception:\n" << ex << '\n' << _desc; + } + } + else + { + setState(StateClosed, ex); + } } } void Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, Byte compress, - const ServantManagerPtr& servantManager, const ObjectAdapterPtr& adapter) + const ServantManagerPtr& servantManager, 
const ObjectAdapterPtr& adapter) { // // Note: In contrast to other private or protected methods, this @@ -2377,45 +2377,45 @@ Ice::ConnectionI::invokeAll(BasicStream& stream, Int invokeNum, Int requestId, B try { - while(invokeNum > 0) - { - // - // Prepare the invocation. - // - bool response = !_endpoint->datagram() && requestId != 0; - Incoming in(_instance.get(), this, adapter, response, compress, requestId); - BasicStream* is = in.is(); - stream.swap(*is); - BasicStream* os = in.os(); - - // - // Prepare the response if necessary. - // - if(response) - { - assert(invokeNum == 1); // No further invocations if a response is expected. - os->writeBlob(replyHdr, sizeof(replyHdr)); - - // - // Add the request ID. - // - os->write(requestId); - } - - in.invoke(servantManager); - - // - // If there are more invocations, we need the stream back. - // - if(--invokeNum > 0) - { - stream.swap(*is); - } - } + while(invokeNum > 0) + { + // + // Prepare the invocation. + // + bool response = !_endpoint->datagram() && requestId != 0; + Incoming in(_instance.get(), this, adapter, response, compress, requestId); + BasicStream* is = in.is(); + stream.swap(*is); + BasicStream* os = in.os(); + + // + // Prepare the response if necessary. + // + if(response) + { + assert(invokeNum == 1); // No further invocations if a response is expected. + os->writeBlob(replyHdr, sizeof(replyHdr)); + + // + // Add the request ID. + // + os->write(requestId); + } + + in.invoke(servantManager); + + // + // If there are more invocations, we need the stream back. + // + if(--invokeNum > 0) + { + stream.swap(*is); + } + } } catch(const LocalException& ex) { - invokeException(ex, invokeNum); // Fatal invocation exception + invokeException(ex, invokeNum); // Fatal invocation exception } } @@ -2430,40 +2430,40 @@ Ice::ConnectionI::run() // if(!_endpoint->datagram()) { - try - { - validate(); - } - catch(const LocalException&) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - assert(_state == StateClosed); - - // - // We must make sure that nobody is sending when we close - // the transceiver. - // - IceUtil::Mutex::Lock sendSync(_sendMutex); - - if(_transceiver) - { - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - - _transceiver = 0; - } - notifyAll(); - return; - } - - activate(); + try + { + validate(); + } + catch(const LocalException&) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + assert(_state == StateClosed); + + // + // We must make sure that nobody is sending when we close + // the transceiver. + // + IceUtil::Mutex::Lock sendSync(_sendMutex); + + if(_transceiver) + { + try + { + _transceiver->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + + _transceiver = 0; + } + notifyAll(); + return; + } + + activate(); } const bool warnUdp = _instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Datagrams") > 0; @@ -2472,213 +2472,213 @@ Ice::ConnectionI::run() while(!closed) { - // - // We must accept new connections outside the thread - // synchronization, because we use blocking accept. - // - - BasicStream stream(_instance.get()); - - try - { - stream.b.resize(headerSize); - stream.i = stream.b.begin(); - _transceiver->read(stream, -1); - - ptrdiff_t pos = stream.i - stream.b.begin(); - if(pos < headerSize) - { - // - // This situation is possible for small UDP packets. 
- // - throw IllegalMessageSizeException(__FILE__, __LINE__); - } - stream.i = stream.b.begin(); - const Byte* header; - stream.readBlob(header, headerSize); - if(header[0] != magic[0] || header[1] != magic[1] || header[2] != magic[2] || header[3] != magic[3]) - { - BadMagicException ex(__FILE__, __LINE__); - ex.badMagic = Ice::ByteSeq(&header[0], &header[0] + sizeof(magic)); - throw ex; - } - if(header[4] != protocolMajor) - { - UnsupportedProtocolException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(header[4]); - ex.badMinor = static_cast<unsigned char>(header[5]); - ex.major = static_cast<unsigned char>(protocolMajor); - ex.minor = static_cast<unsigned char>(protocolMinor); - throw ex; - } - if(header[6] != encodingMajor) - { - UnsupportedEncodingException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(header[6]); - ex.badMinor = static_cast<unsigned char>(header[7]); - ex.major = static_cast<unsigned char>(encodingMajor); - ex.minor = static_cast<unsigned char>(encodingMinor); - throw ex; - } - - Int size; - stream.i -= sizeof(Int); - stream.read(size); - if(size < headerSize) - { - throw IllegalMessageSizeException(__FILE__, __LINE__); - } - if(size > static_cast<Int>(_instance->messageSizeMax())) - { - throw MemoryLimitException(__FILE__, __LINE__); - } - if(size > static_cast<Int>(stream.b.size())) - { - stream.b.resize(size); - } - stream.i = stream.b.begin() + pos; - - if(stream.i != stream.b.end()) - { - if(_endpoint->datagram()) - { - if(warnUdp) - { - Warning out(_logger); - out << "DatagramLimitException: maximum size of " << pos << " exceeded"; - } - throw DatagramLimitException(__FILE__, __LINE__); - } - else - { - _transceiver->read(stream, -1); - assert(stream.i == stream.b.end()); - } - } - } - catch(const DatagramLimitException&) // Expected. - { - continue; - } - catch(const SocketException& ex) - { - exception(ex); - } - catch(const LocalException& ex) - { - if(_endpoint->datagram()) - { - if(_warn) - { - Warning out(_logger); - out << "datagram connection exception:\n" << ex << '\n' << _desc; - } - continue; - } - else - { - exception(ex); - } - } - - Byte compress = 0; - Int requestId = 0; - Int invokeNum = 0; - ServantManagerPtr servantManager; - ObjectAdapterPtr adapter; - OutgoingAsyncPtr outAsync; - - auto_ptr<LocalException> localEx; - - map<Int, Outgoing*> requests; - map<Int, AsyncRequest> asyncRequests; - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(_state == StateHolding) - { - wait(); - } - - if(_state != StateClosed) - { - parseMessage(stream, invokeNum, requestId, compress, servantManager, adapter, outAsync); - } - - // + // + // We must accept new connections outside the thread + // synchronization, because we use blocking accept. + // + + BasicStream stream(_instance.get()); + + try + { + stream.b.resize(headerSize); + stream.i = stream.b.begin(); + _transceiver->read(stream, -1); + + ptrdiff_t pos = stream.i - stream.b.begin(); + if(pos < headerSize) + { + // + // This situation is possible for small UDP packets. 
+ // + throw IllegalMessageSizeException(__FILE__, __LINE__); + } + stream.i = stream.b.begin(); + const Byte* header; + stream.readBlob(header, headerSize); + if(header[0] != magic[0] || header[1] != magic[1] || header[2] != magic[2] || header[3] != magic[3]) + { + BadMagicException ex(__FILE__, __LINE__); + ex.badMagic = Ice::ByteSeq(&header[0], &header[0] + sizeof(magic)); + throw ex; + } + if(header[4] != protocolMajor) + { + UnsupportedProtocolException ex(__FILE__, __LINE__); + ex.badMajor = static_cast<unsigned char>(header[4]); + ex.badMinor = static_cast<unsigned char>(header[5]); + ex.major = static_cast<unsigned char>(protocolMajor); + ex.minor = static_cast<unsigned char>(protocolMinor); + throw ex; + } + if(header[6] != encodingMajor) + { + UnsupportedEncodingException ex(__FILE__, __LINE__); + ex.badMajor = static_cast<unsigned char>(header[6]); + ex.badMinor = static_cast<unsigned char>(header[7]); + ex.major = static_cast<unsigned char>(encodingMajor); + ex.minor = static_cast<unsigned char>(encodingMinor); + throw ex; + } + + Int size; + stream.i -= sizeof(Int); + stream.read(size); + if(size < headerSize) + { + throw IllegalMessageSizeException(__FILE__, __LINE__); + } + if(size > static_cast<Int>(_instance->messageSizeMax())) + { + throw MemoryLimitException(__FILE__, __LINE__); + } + if(size > static_cast<Int>(stream.b.size())) + { + stream.b.resize(size); + } + stream.i = stream.b.begin() + pos; + + if(stream.i != stream.b.end()) + { + if(_endpoint->datagram()) + { + if(warnUdp) + { + Warning out(_logger); + out << "DatagramLimitException: maximum size of " << pos << " exceeded"; + } + throw DatagramLimitException(__FILE__, __LINE__); + } + else + { + _transceiver->read(stream, -1); + assert(stream.i == stream.b.end()); + } + } + } + catch(const DatagramLimitException&) // Expected. + { + continue; + } + catch(const SocketException& ex) + { + exception(ex); + } + catch(const LocalException& ex) + { + if(_endpoint->datagram()) + { + if(_warn) + { + Warning out(_logger); + out << "datagram connection exception:\n" << ex << '\n' << _desc; + } + continue; + } + else + { + exception(ex); + } + } + + Byte compress = 0; + Int requestId = 0; + Int invokeNum = 0; + ServantManagerPtr servantManager; + ObjectAdapterPtr adapter; + OutgoingAsyncPtr outAsync; + + auto_ptr<LocalException> localEx; + + map<Int, Outgoing*> requests; + map<Int, AsyncRequest> asyncRequests; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + while(_state == StateHolding) + { + wait(); + } + + if(_state != StateClosed) + { + parseMessage(stream, invokeNum, requestId, compress, servantManager, adapter, outAsync); + } + + // // parseMessage() can close the connection, so we must // check for closed state again. - // - if(_state == StateClosed) - { - // - // We must make sure that nobody is sending when we close - // the transceiver. - // - IceUtil::Mutex::Lock sendSync(_sendMutex); - - try - { - _transceiver->close(); - } - catch(const LocalException& ex) - { - localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone())); - } - - _transceiver = 0; - notifyAll(); - - // - // We cannot simply return here. We have to make sure - // that all requests (regular and async) are notified - // about the closed connection below. 
- // - closed = true; - } - - if(_state == StateClosed || _state == StateClosing) - { - requests.swap(_requests); - _requestsHint = _requests.end(); - - asyncRequests.swap(_asyncRequests); - _asyncRequestsHint = _asyncRequests.end(); - } - } - - // - // Asynchronous replies must be handled outside the thread - // synchronization, so that nested calls are possible. - // - if(outAsync) - { - outAsync->__finished(stream); - } - - // - // Method invocation (or multiple invocations for batch messages) - // must be done outside the thread synchronization, so that nested - // calls are possible. - // - invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); - - for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p) - { - p->second->finished(*_exception.get()); // The exception is immutable at this point. - } - - for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q) - { - q->second.p->__finished(*_exception.get()); // The exception is immutable at this point. - } - - if(localEx.get()) - { - assert(closed); - localEx->ice_throw(); - } + // + if(_state == StateClosed) + { + // + // We must make sure that nobody is sending when we close + // the transceiver. + // + IceUtil::Mutex::Lock sendSync(_sendMutex); + + try + { + _transceiver->close(); + } + catch(const LocalException& ex) + { + localEx.reset(dynamic_cast<LocalException*>(ex.ice_clone())); + } + + _transceiver = 0; + notifyAll(); + + // + // We cannot simply return here. We have to make sure + // that all requests (regular and async) are notified + // about the closed connection below. + // + closed = true; + } + + if(_state == StateClosed || _state == StateClosing) + { + requests.swap(_requests); + _requestsHint = _requests.end(); + + asyncRequests.swap(_asyncRequests); + _asyncRequestsHint = _asyncRequests.end(); + } + } + + // + // Asynchronous replies must be handled outside the thread + // synchronization, so that nested calls are possible. + // + if(outAsync) + { + outAsync->__finished(stream); + } + + // + // Method invocation (or multiple invocations for batch messages) + // must be done outside the thread synchronization, so that nested + // calls are possible. + // + invokeAll(stream, invokeNum, requestId, compress, servantManager, adapter); + + for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p) + { + p->second->finished(*_exception.get()); // The exception is immutable at this point. + } + + for(map<Int, AsyncRequest>::iterator q = asyncRequests.begin(); q != asyncRequests.end(); ++q) + { + q->second.p->__finished(*_exception.get()); // The exception is immutable at this point. + } + + if(localEx.get()) + { + assert(closed); + localEx->ice_throw(); + } } } @@ -2700,19 +2700,19 @@ Ice::ConnectionI::ThreadPerConnection::run() _connection->run(); } catch(const Exception& ex) - { - Error out(_connection->_logger); - out << "exception in thread per connection:\n" << _connection->toString() << ex; + { + Error out(_connection->_logger); + out << "exception in thread per connection:\n" << _connection->toString() << ex; } catch(const std::exception& ex) { - Error out(_connection->_logger); - out << "std::exception in thread per connection:\n" << _connection->toString() << ex.what(); + Error out(_connection->_logger); + out << "std::exception in thread per connection:\n" << _connection->toString() << ex.what(); } catch(...) 
{ - Error out(_connection->_logger); - out << "unknown exception in thread per connection:\n" << _connection->toString(); + Error out(_connection->_logger); + out << "unknown exception in thread per connection:\n" << _connection->toString(); } if(_connection->_instance->initializationData().threadHook) |
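
The header checks in ConnectionI::run() above (and in validate()) all operate on the same fixed-size message header: four magic bytes, the protocol and encoding versions, a message type, a compression status byte, and the total message size as a 32-bit integer, which together account for the headerSize bytes the code reads up front. The standalone sketch below mirrors that validation using plain types so it compiles without the Ice headers; the concrete constant values (the "IceP" magic, version 1.0, a little-endian size field) are assumptions inferred from the checks visible in this diff, and the real definitions live elsewhere in the Ice sources.

// Sketch only: mirrors the header validation performed in ConnectionI::run().
// The constants below are assumptions for illustration, not taken from the patch.
#include <cstring>
#include <stdexcept>

namespace sketch
{

const int headerSize = 14; // magic(4) + versions(4) + type(1) + compress(1) + size(4)
const unsigned char magic[4] = { 'I', 'c', 'e', 'P' };
const unsigned char protocolMajor = 1;
const unsigned char encodingMajor = 1;

//
// Validates a raw header buffer and returns the announced message size.
// ConnectionI::run() performs the same checks, then resizes its stream to
// that size before reading the remainder of the message.
//
int
checkHeader(const unsigned char* header, int len)
{
    if(len < headerSize)
    {
        throw std::runtime_error("truncated header"); // IllegalMessageSizeException in ConnectionI
    }
    if(std::memcmp(header, magic, sizeof(magic)) != 0)
    {
        throw std::runtime_error("bad magic"); // BadMagicException
    }
    if(header[4] != protocolMajor)
    {
        throw std::runtime_error("unsupported protocol version"); // UnsupportedProtocolException
    }
    if(header[6] != encodingMajor)
    {
        throw std::runtime_error("unsupported encoding version"); // UnsupportedEncodingException
    }
    // header[8] is the message type, header[9] the compression status.
    // Bytes 10..13 hold the message size, assumed little-endian here.
    int size = int(header[10]) | (int(header[11]) << 8) | (int(header[12]) << 16) | (int(header[13]) << 24);
    if(size < headerSize)
    {
        throw std::runtime_error("illegal message size"); // IllegalMessageSizeException
    }
    return size;
}

}

Only the protocol and encoding major versions are compared, which matches the code above: minor-version differences are tolerated, while a mismatched major version or a size smaller than the header itself closes the connection.
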
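doCompress() and doUncompress() above wrap bzip2's one-shot buffer API. The destination buffer for BZ2_bzBuffToBuffCompress is sized at uncompressedLen * 1.01 + 600 bytes, which is the worst-case expansion bzlib documents for incompressible input, and the Ice.Compression.Level value (clamped to 1..9 in the constructor above) is passed through as the bzip2 block-size parameter. A minimal, self-contained version of the compression half, with an illustrative function name and without the Ice message-header bookkeeping, might look like this:

// Sketch only: the sizing and error handling used by ConnectionI::doCompress(),
// minus the Ice header handling. compressPayload is an illustrative name, not an Ice API.
#include <bzlib.h>
#include <vector>
#include <stdexcept>

std::vector<char>
compressPayload(std::vector<char>& payload, int level) // level in [1, 9]
{
    if(payload.empty())
    {
        return std::vector<char>();
    }

    // bzlib guarantees the compressed output fits in sourceLen * 1.01 + 600 bytes.
    unsigned int destLen = static_cast<unsigned int>(payload.size() * 1.01 + 600);
    std::vector<char> dest(destLen);

    int bzError = BZ2_bzBuffToBuffCompress(&dest[0], &destLen,
                                           &payload[0], static_cast<unsigned int>(payload.size()),
                                           level, 0, 0); // verbosity 0, default work factor
    if(bzError != BZ_OK)
    {
        throw std::runtime_error("BZ2_bzBuffToBuffCompress failed");
    }

    dest.resize(destLen); // bzlib updated destLen to the actual compressed size.
    return dest;
}

As in the patch, the buffer is trimmed afterwards because bzlib rewrites destLen with the real compressed length; the oversized allocation exists only to satisfy the worst-case bound.
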
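The replyMsg branch above matches each incoming reply against the pending synchronous and asynchronous requests using cached "hint" iterators (_requestsHint, _asyncRequestsHint, maintained where requests are registered, outside this hunk): when a reply arrives for the hinted entry, no map lookup is needed, and erasing through a post-increment keeps the hint pointing at a valid element. The sketch below shows the same pattern on a plain std::map with illustrative names, not the ones used by ConnectionI.

// Sketch only: the hint-iterator lookup/erase pattern from the replyMsg handling.
#include <map>
#include <string>

typedef std::map<int, std::string> RequestMap;

std::string
takeReply(RequestMap& requests, RequestMap::iterator& hint, int requestId)
{
    RequestMap::iterator p = requests.end();
    if(hint != requests.end() && hint->first == requestId)
    {
        p = hint;                     // Common case: the reply matches the hinted entry.
    }
    else
    {
        p = requests.find(requestId); // Fall back to a regular lookup.
    }
    if(p == requests.end())
    {
        return std::string();         // Unknown request id; ConnectionI throws here instead.
    }

    std::string result = p->second;
    if(p == hint)
    {
        requests.erase(p++);          // Post-increment: advance past the erased entry...
        hint = p;                     // ...so the hint stays valid for the next reply.
    }
    else
    {
        requests.erase(p);
    }
    return result;
}
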