diff options
Diffstat (limited to 'cppe/src/IceE/Connection.cpp')
-rwxr-xr-x | cppe/src/IceE/Connection.cpp | 1332 |
1 files changed, 574 insertions, 758 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp index 7e493dbfbda..fd34a1308eb 100755 --- a/cppe/src/IceE/Connection.cpp +++ b/cppe/src/IceE/Connection.cpp @@ -22,6 +22,7 @@ #include <IceE/ReferenceFactory.h> // For createProxy(). #include <IceE/ProxyFactory.h> // For createProxy(). #include <IceE/BasicStream.h> +#include <IceE/TraceLevels.h> #ifndef ICEE_PURE_CLIENT # include <IceE/Incoming.h> @@ -69,7 +70,6 @@ Ice::Connection::waitForValidation() } } -#ifndef ICEE_PURE_CLIENT void Ice::Connection::activate() { @@ -77,6 +77,7 @@ Ice::Connection::activate() setState(StateActive); } +#ifndef ICEE_PURE_CLIENT void Ice::Connection::hold() { @@ -134,6 +135,11 @@ Ice::Connection::close(bool force) #endif setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); + + // + // TODO: If blocking model we should call readStream() to wait for + // the connection closure from the server? + // } } @@ -292,302 +298,209 @@ Ice::Connection::waitUntilFinished() #endif } - -Int -Ice::Connection::fillRequestId(BasicStream* os) +void +Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) { - // - // Create a new unique request ID. - // - Int requestId = _nextRequestId++; - if(requestId <= 0) + bool requestSent = false; + try { - _nextRequestId = 1; - requestId = _nextRequestId++; - } + Lock sendSync(_sendMonitor); + if(!_transceiver) + { + assert(_exception.get()); + _exception->ice_throw(); + } - // - // Fill in the request ID. - // - Byte* dest = &(os->b[0]) + headerSize; + Int requestId; + if(out) + { + // + // Create a new unique request ID. + // + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + // + // Fill in the request ID. + // + Byte* dest = &(os->b[0]) + headerSize; #ifdef ICE_BIG_ENDIAN - const Byte* src = reinterpret_cast<const Byte*>(&requestId) + sizeof(Ice::Int) - 1; - *dest++ = *src--; - *dest++ = *src--; - *dest++ = *src--; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&requestId) + sizeof(Ice::Int) - 1; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest = *src; #else - const Byte* src = reinterpret_cast<const Byte*>(&requestId); - *dest++ = *src++; - *dest++ = *src++; - *dest++ = *src++; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&requestId); + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest = *src; #endif - return requestId; -} - -void -Ice::Connection::sendRequest(BasicStream* os) -{ - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } +#ifndef ICEE_PURE_BLOCKING_CLIENT +#ifdef ICEE_BLOCKING_CLIENT + if(!_blocking) + { +#endif + _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); +#ifdef ICEE_BLOCKING_CLIENT + } +#endif +#endif + } - const Int sz = static_cast<Int>(os->b.size()); - Byte* dest = &(os->b[0]) + 10; + const Int sz = static_cast<Int>(os->b.size()); + Byte* dest = &(os->b[0]) + 10; #ifdef ICE_BIG_ENDIAN - const Byte* src = reinterpret_cast<const Byte*>(&sz) + sizeof(Ice::Int) - 1; - *dest++ = *src--; - *dest++ = *src--; - *dest++ = *src--; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&sz) + sizeof(Ice::Int) - 1; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest = *src; #else - const Byte* src = reinterpret_cast<const Byte*>(&sz); - *dest++ = *src++; - *dest++ = *src++; - *dest++ = *src++; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&sz); + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest = *src; #endif - // - // Send the request. - // - os->i = os->b.begin(); - traceRequest("sending request", *os, _logger, _traceLevels); - _transceiver->write(*os); -} - -#ifdef ICEE_BLOCKING_CLIENT - -void -Ice::Connection::sendBlockingRequest(BasicStream* os, Outgoing* out) -{ - Int requestId; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(_exception.get()) - { - _exception->ice_throw(); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Fill in request id if it is a twoway call. - // - if(out) - { - requestId = fillRequestId(os); - } - } - - try - { - { - IceUtil::Mutex::Lock sendSync(_sendMutex); - sendRequest(os); - - if(out) - { - os->reset(); - readStream(*os); - } + // + // Send the request. + // + os->i = os->b.begin(); + if(_traceLevels->protocol >= 1) + { + traceRequest("sending request", *os, _logger, _traceLevels); } + _transceiver->write(*os); + requestSent = true; - if(out) + if(!out) + { + return; + } + +#ifdef ICEE_BLOCKING_CLIENT +#ifndef ICEE_PURE_BLOCKING_CLIENT + if(_blocking) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(_state != StateClosed) - { -#ifndef ICEE_PURE_CLIENT - Int invokeNum = 0; - parseMessage(*os, requestId, invokeNum); -#else - parseMessage(*os, requestId); #endif - } - // - // parseMessage() can close the connection, so we must - // check for closed state again. + // Re-use the stream for reading the reply. // - if(_state == StateClosed) + os->reset(); + + Int receivedRequestId = 0; +#ifndef ICEE_PURE_CLIENT + Int invokeNum = 0; + readStreamAndParseMessage(*os, receivedRequestId, invokeNum); + if(invokeNum > 0) { - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - } - - _transceiver = 0; - _exception->ice_throw(); + throw UnknownMessageException(__FILE__, __LINE__); + } + else if(requestId != receivedRequestId) + { + throw UnknownRequestIdException(__FILE__, __LINE__); + } +#else + readStreamAndParseMessage(*os, receivedRequestId); + if(requestId != receivedRequestId) + { + throw UnknownRequestIdException(__FILE__, __LINE__); } - } - } - catch(const LocalException& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - assert(_exception.get()); - _exception->ice_throw(); - } -} - #endif - + out->finished(*os); #ifndef ICEE_PURE_BLOCKING_CLIENT - -void -Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) -{ - Int requestId; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - if(_exception.get()) - { - _exception->ice_throw(); - } - - assert(_state > StateNotValidated); - assert(_state < StateClosing); - - // - // Only add to the request map if this is a twoway call. - // - if(out) - { - requestId = fillRequestId(os); - - // - // Add to the requests map. - // - _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); - } - } - - bool timedOut = false; - try - { - { - IceUtil::Mutex::Lock sendSync(_sendMutex); - sendRequest(os); } - - if(out) + else { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // Wait until the request has completed, or until the - // request times out. - // - Int tout = timeout(); +#endif +#endif +#ifndef ICEE_PURE_BLOCKING_CLIENT + // + // Wait until the request has completed, or until the request times out. + // + Int tout = timeout(); IceUtil::Time expireTime; if(tout > 0) { - expireTime = IceUtil::Time::now() + IceUtil::Time::milliSeconds(tout); + expireTime = IceUtil::Time::now() + IceUtil::Time::milliSeconds(tout); } - while(out->state() == Outgoing::StateInProgress && !timedOut) - { - if(tout > 0) - { + + while(out->state() == Outgoing::StateInProgress) + { + if(tout > 0) + { IceUtil::Time now = IceUtil::Time::now(); if(now < expireTime) { - timedWait(expireTime - now); + _sendMonitor.timedWait(expireTime - now); } - - // + + // // Make sure we woke up because of timeout and not another response. // - if(out->state() == Outgoing::StateInProgress && IceUtil::Time::now() > expireTime) - { - timedOut = true; - } - } - else - { - wait(); - } - } + if(out->state() == Outgoing::StateInProgress && IceUtil::Time::now() > expireTime) + { + break; + } + } + else + { + _sendMonitor.wait(); + } + } + + // + // If the outgoing is still not finished, there was a timeout + // so we close the connection and wait until the outgoing gets + // notified of the connection closure. + // + if(out->state() == Outgoing::StateInProgress) + { + setState(StateClosed, TimeoutException(__FILE__, __LINE__)); + while(out->state() == Outgoing::StateInProgress) + { + _sendMonitor.wait(); + } + } +#endif +#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) } +#endif } catch(const LocalException& ex) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); assert(_exception.get()); - - if(out) + if(requestSent) { // - // If the request has already been removed from the - // request map, we are out of luck. It would mean that - // finished() has been called already, and therefore the - // exception has been set using the Outgoing::finished() - // callback. In this case, we cannot throw the exception - // here, because we must not both raise an exception and - // have Outgoing::finished() called with an - // exception. This means that in some rare cases, a - // request will not be retried even though it could. But I - // honestly don't know how I could avoid this, without a - // very elaborate and complex design, which would be bad - // for performance. + // If the request has been sent we don't throw but instead + // notify the outgoing of the connection. Throwing + // directly would cause the client to retry and would + // violate the "at-most-once" semantics. // - map<Int, Outgoing*>::iterator p = _requests.find(requestId); - if(p != _requests.end()) - { - if(p == _requestsHint) - { - _requests.erase(p++); - _requestsHint = p; - } - else - { - _requests.erase(p); - } - - _exception->ice_throw(); - } + out->finished(*_exception.get()); } else { + // + // The request wasn't sent, we can safely retry the invocation + // without violating "at-most-once". + // _exception->ice_throw(); } } - - if(timedOut) - { - // - // Must be called outside the synchronization of this - // object. - // - exception(TimeoutException(__FILE__, __LINE__)); - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // We must wait until the exception has propagted - // back to the Outgoing object. - // - while(out->state() == Outgoing::StateInProgress) - { - wait(); - } - } } -#endif - #ifdef ICEE_HAS_BATCH void @@ -711,7 +624,7 @@ Ice::Connection::flushBatchRequests() try { - IceUtil::Mutex::Lock sendSync(_sendMutex); + Lock sendSync(_sendMonitor); if(!_transceiver) // Has the transceiver already been closed? { @@ -757,7 +670,10 @@ Ice::Connection::flushBatchRequests() // Send the batch request. // _batchStream.i = _batchStream.b.begin(); - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + if(_traceLevels->protocol >= 1) + { + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + } _transceiver->write(_batchStream); } catch(const LocalException& ex) @@ -796,7 +712,7 @@ Ice::Connection::sendResponse(BasicStream* os) { try { - IceUtil::Mutex::Lock sendSync(_sendMutex); + Lock sendSync(_sendMonitor); if(!_transceiver) // Has the transceiver already been closed? { @@ -825,7 +741,10 @@ Ice::Connection::sendResponse(BasicStream* os) // Send the reply. // os->i = os->b.begin(); - traceReply("sending reply", *os, _logger, _traceLevels); + if(_traceLevels->protocol >= 1) + { + traceReply("sending reply", *os, _logger, _traceLevels); + } _transceiver->write(*os); } catch(const LocalException& ex) @@ -841,6 +760,7 @@ Ice::Connection::sendResponse(BasicStream* os) try { + assert(_dispatchCount > 0); if(--_dispatchCount == 0) { notifyAll(); @@ -867,6 +787,7 @@ Ice::Connection::sendNoResponse() try { + assert(_dispatchCount > 0); if(--_dispatchCount == 0) { notifyAll(); @@ -891,14 +812,6 @@ Ice::Connection::endpoint() const return _endpoint; // No mutex protection necessary, _endpoint is immutable. } -#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) -bool -Ice::Connection::blocking() const -{ - return _blocking; -} -#endif - #ifndef ICEE_PURE_CLIENT void @@ -937,8 +850,6 @@ Ice::Connection::getAdapter() const return _in.getAdapter(); } -#endif - ObjectPrx Ice::Connection::createProxy(const Identity& ident) const { @@ -953,12 +864,7 @@ Ice::Connection::createProxy(const Identity& ident) const return _instance->proxyFactory()->referenceToProxy(ref); } -void -Ice::Connection::exception(const LocalException& ex) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); -} +#endif string Ice::Connection::type() const @@ -984,40 +890,40 @@ Ice::Connection::Connection(const InstancePtr& instance, const EndpointPtr& endpoint, const ObjectAdapterPtr& adapter) : #else -Ice::Connection::Connection(const InstancePtr& instance, - const TransceiverPtr& transceiver, - const EndpointPtr& endpoint) : + Ice::Connection::Connection(const InstancePtr& instance, + const TransceiverPtr& transceiver, + const EndpointPtr& endpoint) : #endif - _instance(instance), - _transceiver(transceiver), - _desc(transceiver->toString()), - _type(transceiver->type()), - _endpoint(endpoint), - _logger(_instance->logger()), // Cached for better performance. - _traceLevels(_instance->traceLevels()), // Cached for better performance. - _warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0), - _requestHdr(headerSize + sizeof(Int), 0), + _instance(instance), + _transceiver(transceiver), + _desc(transceiver->toString()), + _type(transceiver->type()), + _endpoint(endpoint), + _logger(_instance->logger()), // Cached for better performance. + _traceLevels(_instance->traceLevels()), // Cached for better performance. + _warn(_instance->properties()->getPropertyAsInt("Ice.Warn.Connections") > 0), + _requestHdr(headerSize + sizeof(Int), 0), #ifndef ICEE_PURE_CLIENT - _replyHdr(headerSize, 0), + _replyHdr(headerSize, 0), + _in(_instance.get(), this, _stream, adapter), #endif #ifndef ICEE_PURE_BLOCKING_CLIENT - _nextRequestId(1), - _requestsHint(_requests.end()), - _stream(_instance.get(), _instance->messageSizeMax()), -#endif -#ifndef ICEE_PURE_CLIENT - _in(_instance.get(), this, _stream, adapter), + _stream(_instance.get(), _instance->messageSizeMax()), #endif #ifdef ICEE_HAS_BATCH - _requestBatchHdr(headerSize + sizeof(Int), 0), - _batchStream(_instance.get(), _instance->messageSizeMax()), - _batchStreamInUse(false), - _batchRequestNum(0), + _requestBatchHdr(headerSize + sizeof(Int), 0), + _batchStream(_instance.get(), _instance->messageSizeMax()), + _batchStreamInUse(false), + _batchRequestNum(0), +#endif + _dispatchCount(0), + _state(StateNotValidated), + _stateTime(IceUtil::Time::now()), + _nextRequestId(1) +#ifndef ICEE_PURE_BLOCKING_CLIENT + , _requestsHint(_requests.end()) #endif - _dispatchCount(0), - _state(StateNotValidated), - _stateTime(IceUtil::Time::now()) { #if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) # ifdef ICEE_PURE_CLIENT @@ -1131,8 +1037,8 @@ Ice::Connection::~Connection() assert(_state == StateClosed); assert(!_transceiver); -#ifndef ICEE_PURE_BLOCKING_CLIENT assert(_dispatchCount == 0); +#ifndef ICEE_PURE_BLOCKING_CLIENT assert(!_threadPerConnection); #endif } @@ -1196,7 +1102,10 @@ Ice::Connection::validate() os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). os.write(headerSize); // Message size. os.i = os.b.begin(); - traceHeader("sending validate connection", os, _logger, _traceLevels); + if(_traceLevels->protocol >= 1) + { + traceHeader("sending validate connection", os, _logger, _traceLevels); + } try { _transceiver->writeWithTimeout(os, timeout); @@ -1273,7 +1182,10 @@ Ice::Connection::validate() { throw IllegalMessageSizeException(__FILE__, __LINE__); } - traceHeader("received validate connection", is, _logger, _traceLevels); + if(_traceLevels->protocol >= 1) + { + traceHeader("received validate connection", is, _logger, _traceLevels); + } } } catch(const LocalException& ex) @@ -1285,23 +1197,9 @@ Ice::Connection::validate() } #ifdef ICEE_PURE_CLIENT - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // We start out in active state. - // - setState(StateActive); - } + activate(); #else - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // We start out in holding state. - // - setState(StateHolding); - } + hold(); #endif } @@ -1371,70 +1269,91 @@ Ice::Connection::setState(State state) switch(state) { - case StateNotValidated: - { - assert(false); - break; - } + case StateNotValidated: + { + assert(false); + break; + } - case StateActive: - { - // - // Can only switch from holding or not validated to - // active. - // + case StateActive: + { + // + // Can only switch from holding or not validated to + // active. + // #ifdef ICEE_PURE_CLIENT - if(_state != StateNotValidated) - { - return; - } + if(_state != StateNotValidated) + { + return; + } #else - if(_state != StateHolding && _state != StateNotValidated) - { - return; - } -#endif - break; + if(_state != StateHolding && _state != StateNotValidated) + { + return; } +#endif + break; + } #ifndef ICEE_PURE_CLIENT - case StateHolding: + case StateHolding: + { + // + // Can only switch from active or not validated to + // holding. + // + if(_state != StateActive && _state != StateNotValidated) { - // - // Can only switch from active or not validated to - // holding. - // - if(_state != StateActive && _state != StateNotValidated) - { - return; - } - break; + return; } + break; + } #endif - case StateClosing: + case StateClosing: + { + // + // Can't change back from closed. + // + if(_state == StateClosed) { - // - // Can't change back from closed. - // - if(_state == StateClosed) - { - return; - } - break; + return; } + break; + } - case StateClosed: + case StateClosed: + { + // + // We shutdown both for reading and writing. This will + // unblock and read call with an exception. The thread + // per connection then closes the transceiver. + // + _transceiver->shutdownReadWrite(); + + // + // In blocking mode, we close the transceiver now. + // +#ifdef ICEE_BLOCKING_CLIENT +# ifndef ICEE_PURE_BLOCKING_CLIENT + if(_blocking) { - // - // If we are in thread per connection mode, we - // shutdown both for reading and writing. This will - // unblock and read call with an exception. The thread - // per connection then closes the transceiver. - // - _transceiver->shutdownReadWrite(); - break; +# endif + Lock sync(_sendMonitor); + try + { + _transceiver->close(); + } + catch(const Ice::LocalException&) + { + } + _transceiver = 0; +# ifndef ICEE_PURE_BLOCKING_CLIENT } +# endif +#endif + break; + } } _state = state; @@ -1447,30 +1366,20 @@ Ice::Connection::setState(State state) try { initiateShutdown(); - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - } #ifdef ICEE_BLOCKING_CLIENT - if(_state != StateClosed # ifndef ICEE_PURE_BLOCKING_CLIENT - && _blocking + if(_blocking) # endif - ) - { - try - { - _transceiver->close(); - } - catch(const LocalException&) { + setState(StateClosed); } - _transceiver = 0; - _state = StateClosed; - } #endif + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + } } } @@ -1480,7 +1389,7 @@ Ice::Connection::initiateShutdown() const assert(_state == StateClosing); assert(_dispatchCount == 0); - IceUtil::Mutex::Lock sendSync(_sendMutex); + Lock sendSync(_sendMonitor); // // Before we shut down, we send a close connection message. @@ -1502,7 +1411,10 @@ Ice::Connection::initiateShutdown() const // Send the message. // os.i = os.b.begin(); - traceHeader("sending close connection", os, _logger, _traceLevels); + if(_traceLevels->protocol >= 1) + { + traceHeader("sending close connection", os, _logger, _traceLevels); + } _transceiver->write(os); // @@ -1517,361 +1429,151 @@ Ice::Connection::initiateShutdown() const } void -Ice::Connection::parseMessage(BasicStream& stream, Int& requestId #ifndef ICEE_PURE_CLIENT - ,Int& invokeNum +Ice::Connection::readStreamAndParseMessage(IceInternal::BasicStream& stream, Int& requestId, Int& invokeNum) +#else +Ice::Connection::readStreamAndParseMessage(IceInternal::BasicStream& stream, Int& requestId) #endif -) { - assert(_state > StateNotValidated && _state < StateClosed); + stream.b.resize(headerSize); + stream.i = stream.b.begin(); + _transceiver->read(stream); - try + ptrdiff_t pos = stream.i - stream.b.begin(); + assert(pos >= headerSize); + stream.i = stream.b.begin(); + const Ice::Byte* header; + stream.readBlob(header, headerSize); + if(header[0] != magic[0] || header[1] != magic[1] || header[2] != magic[2] || header[3] != magic[3]) { - // - // We don't need to check magic and version here. This has - // already been done by the ThreadPerConnection, - // which provides us with the stream. - // - assert(stream.i == stream.b.end()); - stream.i = stream.b.begin() + 8; - Byte messageType; - stream.read(messageType); - - Byte compress; - stream.read(compress); - if(compress == 2) - { - FeatureNotSupportedException ex(__FILE__, __LINE__); - ex.unsupportedFeature = "compression"; - throw ex; - } - stream.i = stream.b.begin() + headerSize; - -#ifdef ICEE_BLOCKING_CLIENT -# ifndef ICEE_PURE_BLOCKING_CLIENT - if(_blocking) - { -# endif - switch(messageType) - { - case closeConnectionMsg: - { - traceHeader("received close connection", stream, _logger, _traceLevels); - setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); - break; - } - - case replyMsg: - { - traceReply("received reply", stream, _logger, _traceLevels); - - Int reqId; - stream.read(reqId); - if(reqId != requestId) - { - throw UnknownRequestIdException(__FILE__, __LINE__); - } - break; - } - - - default: - { - traceHeader("received unexpected message\n" - "(invalid, closing connection)", - stream, _logger, _traceLevels); - throw UnknownMessageException(__FILE__, __LINE__); - break; - } - } -# ifndef ICEE_PURE_BLOCKING_CLIENT - } - else -# endif -#endif -#ifndef ICEE_PURE_BLOCKING_CLIENT - { - switch(messageType) - { - case closeConnectionMsg: - { - traceHeader("received close connection", stream, _logger, _traceLevels); - setState(StateClosed, CloseConnectionException(__FILE__, __LINE__)); - break; - } - -#ifndef ICEE_PURE_CLIENT - case requestMsg: - { - if(_state == StateClosing) - { - traceRequest("received request during closing\n" - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); - } - else - { - traceRequest("received request", stream, _logger, _traceLevels); - stream.read(requestId); - invokeNum = 1; - ++_dispatchCount; - } - break; - } - - case requestBatchMsg: - { - if(_state == StateClosing) - { - traceBatchRequest("received batch request during closing\n" - "(ignored by server, client will retry)", - stream, _logger, _traceLevels); - } - else - { - traceBatchRequest("received batch request", stream, _logger, _traceLevels); - stream.read(invokeNum); - if(invokeNum < 0) - { - invokeNum = 0; - throw NegativeSizeException(__FILE__, __LINE__); - } - _dispatchCount += invokeNum; - } - break; - } -#endif - - case replyMsg: - { - traceReply("received reply", stream, _logger, _traceLevels); - - stream.read(requestId); - - map<Int, Outgoing*>::iterator p = _requests.end(); - - if(_requestsHint != _requests.end()) - { - if(_requestsHint->first == requestId) - { - p = _requestsHint; - } - } - - if(p == _requests.end()) - { - p = _requests.find(requestId); - } - - if(p == _requests.end()) - { - throw UnknownRequestIdException(__FILE__, __LINE__); - } - - if(p != _requests.end()) - { - p->second->finished(stream); - - if(p == _requestsHint) - { - _requests.erase(p++); - _requestsHint = p; - } - else - { - _requests.erase(p); - } - notifyAll(); // Wake up threads waiting in sendRequest() - } - - break; - } - - case validateConnectionMsg: - { - traceHeader("received validate connection", stream, _logger, _traceLevels); - if(_warn) - { - Warning out(_logger); - out << "ignoring unexpected validate connection message:\n" << _desc; - } - break; - } - - default: - { - traceHeader("received unknown message\n" - "(invalid, closing connection)", - stream, _logger, _traceLevels); - throw UnknownMessageException(__FILE__, __LINE__); - break; - } - } - } -#endif + BadMagicException ex(__FILE__, __LINE__); + ex.badMagic = Ice::ByteSeq(&header[0], &header[0] + sizeof(magic)); + throw ex; } - catch(const LocalException& ex) + if(header[4] != protocolMajor) { - setState(StateClosed, ex); + UnsupportedProtocolException ex(__FILE__, __LINE__); + ex.badMajor = static_cast<unsigned char>(header[4]); + ex.badMinor = static_cast<unsigned char>(header[5]); + ex.major = static_cast<unsigned char>(protocolMajor); + ex.minor = static_cast<unsigned char>(protocolMinor); + throw ex; + } + if(header[6] != encodingMajor) + { + UnsupportedEncodingException ex(__FILE__, __LINE__); + ex.badMajor = static_cast<unsigned char>(header[6]); + ex.badMinor = static_cast<unsigned char>(header[7]); + ex.major = static_cast<unsigned char>(encodingMajor); + ex.minor = static_cast<unsigned char>(encodingMinor); + throw ex; + } + const Byte messageType = header[8]; + if(header[9] == 2) + { + FeatureNotSupportedException ex(__FILE__, __LINE__); + ex.unsupportedFeature = "compression"; + throw ex; } -} - -#ifndef ICEE_PURE_CLIENT -void -Ice::Connection::invokeAll(Int invokeNum, Int requestId) -{ - // - // Note: In contrast to other private or protected methods, this - // operation must be called *without* the mutex locked. - // - - try + + Int size; + stream.i -= sizeof(Int); + stream.read(size); + if(size < headerSize) { - while(invokeNum > 0) - { - // - // Prepare the invocation. - // - bool response = requestId != 0; - - // - // Prepare the response if necessary. - // - if(response) - { - assert(invokeNum == 1); // No further invocations if a response is expected. - - BasicStream* os = _in.os(); - os->writeBlob(&_replyHdr[0], headerSize); - - // - // Add the request ID. - // - os->write(requestId); - } - - _in.invoke(response); - - --invokeNum; - } + throw IllegalMessageSizeException(__FILE__, __LINE__); } - catch(const LocalException& ex) + if(size > static_cast<Int>(_instance->messageSizeMax())) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); + throw MemoryLimitException(__FILE__, __LINE__); } - catch(const std::exception& ex) + if(size > static_cast<Int>(stream.b.size())) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - UnknownException uex(__FILE__, __LINE__); - uex.unknown = string("std::exception: ") + ex.what(); - setState(StateClosed, uex); + stream.b.resize(size); } - catch(...) + stream.i = stream.b.begin() + pos; + + if(stream.i != stream.b.end()) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - UnknownException uex(__FILE__, __LINE__); - uex.unknown = "unknown c++ exception"; - setState(StateClosed, uex); + _transceiver->read(stream); } - // - // If invoke() above raised an exception, and therefore neither - // sendResponse() nor sendNoResponse() has been called, then we - // must decrement _dispatchCount here. - // - if(invokeNum > 0) + assert(stream.i == stream.b.end()); + stream.i = stream.b.begin() + headerSize; + + switch(messageType) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(_dispatchCount > 0); - _dispatchCount -= invokeNum; - assert(_dispatchCount >= 0); - if(_dispatchCount == 0) + case closeConnectionMsg: + { + if(_traceLevels->protocol >= 1) { - notifyAll(); + traceHeader("received close connection", stream, _logger, _traceLevels); } + throw CloseConnectionException(__FILE__, __LINE__); + break; } -} -#endif - -void -Ice::Connection::readStream(IceInternal::BasicStream& stream) -{ - try + + case replyMsg: { - stream.b.resize(headerSize); - stream.i = stream.b.begin(); - _transceiver->read(stream); - - ptrdiff_t pos = stream.i - stream.b.begin(); - assert(pos >= headerSize); - stream.i = stream.b.begin(); - Ice::Byte m[4]; - stream.read(m[0]); - stream.read(m[1]); - stream.read(m[2]); - stream.read(m[3]); - if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3]) + if(_traceLevels->protocol >= 1) { - BadMagicException ex(__FILE__, __LINE__); - ex.badMagic = Ice::ByteSeq(&m[0], &m[0] + sizeof(m)); - throw ex; + traceReply("received reply", stream, _logger, _traceLevels); } - Byte pMajor; - Byte pMinor; - stream.read(pMajor); - stream.read(pMinor); - if(pMajor != protocolMajor) + stream.read(requestId); + break; + } + +#ifndef ICEE_PURE_CLIENT + case requestMsg: + { + if(_traceLevels->protocol >= 1) { - UnsupportedProtocolException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(pMajor); - ex.badMinor = static_cast<unsigned char>(pMinor); - ex.major = static_cast<unsigned char>(protocolMajor); - ex.minor = static_cast<unsigned char>(protocolMinor); - throw ex; + traceRequest("received request", stream, _logger, _traceLevels); } - Byte eMajor; - Byte eMinor; - stream.read(eMajor); - stream.read(eMinor); - if(eMajor != encodingMajor) + stream.read(requestId); + invokeNum = 1; + break; + } + + case requestBatchMsg: + { + if(_traceLevels->protocol >= 1) { - UnsupportedEncodingException ex(__FILE__, __LINE__); - ex.badMajor = static_cast<unsigned char>(eMajor); - ex.badMinor = static_cast<unsigned char>(eMinor); - ex.major = static_cast<unsigned char>(encodingMajor); - ex.minor = static_cast<unsigned char>(encodingMinor); - throw ex; + traceBatchRequest("received batch request", stream, _logger, _traceLevels); } - Byte messageType; - stream.read(messageType); - Byte compress; - stream.read(compress); - Int size; - stream.read(size); - if(size < headerSize) + stream.read(invokeNum); + if(invokeNum < 0) { - throw IllegalMessageSizeException(__FILE__, __LINE__); + invokeNum = 0; + throw NegativeSizeException(__FILE__, __LINE__); } - if(size > static_cast<Int>(_instance->messageSizeMax())) + break; + } +#endif + + case validateConnectionMsg: + { + if(_traceLevels->protocol >= 1) { - throw MemoryLimitException(__FILE__, __LINE__); + traceHeader("received validate connection", stream, _logger, _traceLevels); } - if(size > static_cast<Int>(stream.b.size())) + if(_warn) { - stream.b.resize(size); + Warning out(_logger); + out << "ignoring unexpected validate connection message:\n" << _desc; } - stream.i = stream.b.begin() + pos; - - if(stream.i != stream.b.end()) + break; + } + + default: + { + if(_traceLevels->protocol >= 1) { - _transceiver->read(stream); - assert(stream.i == stream.b.end()); + traceHeader("received unknown message\n(invalid, closing connection)", stream, _logger, _traceLevels); } + throw UnknownMessageException(__FILE__, __LINE__); + break; } - catch(const Ice::LocalException& ex) - { - exception(ex); } } @@ -1891,16 +1593,10 @@ Ice::Connection::run() } catch(const LocalException&) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - + Lock sync(*this); assert(_state == StateClosed); - - // - // We must make sure that nobody is sending when we close - // the transceiver. - // - IceUtil::Mutex::Lock sendSync(_sendMutex); - + + Lock sendSync(_sendMonitor); try { _transceiver->close(); @@ -1914,11 +1610,8 @@ Ice::Connection::run() notifyAll(); return; } - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateActive); - } + + activate(); bool closed = false; @@ -1927,17 +1620,114 @@ Ice::Connection::run() Int requestId = 0; #ifndef ICEE_PURE_CLIENT Int invokeNum = 0; - _in.os()->resize(0); - _in.is()->resize(0); + _in.os()->reset(); + _in.is()->reset(); #endif - readStream(_stream); - - auto_ptr<LocalException> exception; - map<Int, Outgoing*> requests; + // + // Read and parse the next message. We don't need to lock the + // send monitor here as we have the guarantee that + // _transceiver won't be set to 0 by another thread, the + // thread per connection is the only thread that can set + // _transceiver to 0. + // + try + { +#ifndef ICEE_PURE_CLIENT + readStreamAndParseMessage(_stream, requestId, invokeNum); +#else + readStreamAndParseMessage(_stream, requestId); +#endif + } + catch(const Ice::LocalException& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + } { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state != StateClosed) + { +#ifndef ICEE_PURE_CLIENT + if(invokeNum > 0) // We received a request or a batch request + { + if(_state < StateClosing) + { + _dispatchCount += invokeNum; + } + else if(invokeNum == 1) + { + invokeNum = 0; + if(_traceLevels->protocol >= 1) + { + traceRequest("received request during closing\n" + "(ignored by server, client will retry)", + _stream, _logger, _traceLevels); + } + } + else if(invokeNum > 1) + { + invokeNum = 0; + if(_traceLevels->protocol >= 1) + { + traceBatchRequest("received batch request during closing\n" + "(ignored by server, client will retry)", + _stream, _logger, _traceLevels); + } + } + } + else +#endif + if(requestId > 0) + { + // + // The message is a reply, we search the Outgoing object waiting + // for this reply and pass it the stream before to notify the + // send monitor to wake up threads waiting for replies. + // + try + { + Lock sync(_sendMonitor); + + map<Int, Outgoing*>::iterator p = _requests.end(); + if(p != _requestsHint) + { + if(_requestsHint->first == requestId) + { + p = _requestsHint; + } + } + + if(p == _requests.end()) + { + p = _requests.find(requestId); + } + + if(p == _requests.end()) + { + throw UnknownRequestIdException(__FILE__, __LINE__); + } + + p->second->finished(_stream); + + if(p == _requestsHint) + { + _requests.erase(p++); + _requestsHint = p; + } + else + { + _requests.erase(p); + } + _sendMonitor.notifyAll(); // Wake up threads waiting in sendRequest() + } + catch(const Ice::LocalException& ex) + { + setState(StateClosed, ex); + } + } + } #ifndef ICEE_PURE_CLIENT while(_state == StateHolding) @@ -1945,51 +1735,32 @@ Ice::Connection::run() wait(); } #endif - if(_state != StateClosed) - { -#ifndef ICEE_PURE_CLIENT - parseMessage(_stream, requestId, invokeNum); -#else - parseMessage(_stream, requestId); -#endif - } - // - // parseMessage() can close the connection, so we must - // check for closed state again. - // if(_state == StateClosed) { - // - // We must make sure that nobody is sending when we close - // the transceiver. - // - IceUtil::Mutex::Lock sendSync(_sendMutex); - + Lock sync(_sendMonitor); try { _transceiver->close(); } - catch(const LocalException& ex) + catch(const LocalException&) { - exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); } - _transceiver = 0; notifyAll(); - - // - // We cannot simply return here. We have to make sure - // that all requests are notified about the closed - // connection below. - // - closed = true; + closed = true; } if(_state == StateClosed || _state == StateClosing) { - requests.swap(_requests); - _requestsHint = _requests.end(); + Lock sync(_sendMonitor); + assert(_exception.get()); + for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p) + { + p->second->finished(*_exception.get()); // The exception is immutable at this point. + } + _requests.clear(); + _sendMonitor.notifyAll(); // Wake up threads waiting in sendRequest() } } @@ -1999,24 +1770,69 @@ Ice::Connection::run() // so that nested calls are possible. // #ifndef ICEE_PURE_CLIENT - invokeAll(invokeNum, requestId); -#endif - if(requests.size() != 0) + try + { + for(;invokeNum > 0; --invokeNum) + { + // + // Prepare the response if necessary. + // + const bool response = requestId != 0; + if(response) + { + assert(invokeNum == 1); // No further invocations if a response is expected. + + // + // Add the reply header and request id. + // + BasicStream* os = _in.os(); + os->writeBlob(&_replyHdr[0], headerSize); + os->write(requestId); + } + + // + // Dispatch the incoming request. + // + _in.invoke(response); + } + } + catch(const LocalException& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + } + catch(const std::exception& ex) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + UnknownException uex(__FILE__, __LINE__); + uex.unknown = string("std::exception: ") + ex.what(); + setState(StateClosed, uex); + } + catch(...) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + UnknownException uex(__FILE__, __LINE__); + uex.unknown = "unknown c++ exception"; + setState(StateClosed, uex); + } - for(map<Int, Outgoing*>::iterator p = requests.begin(); p != requests.end(); ++p) + // + // If invoke() above raised an exception, and therefore neither + // sendResponse() nor sendNoResponse() has been called, then we + // must decrement _dispatchCount here. + // + if(invokeNum > 0) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_dispatchCount > 0); + _dispatchCount -= invokeNum; + assert(_dispatchCount >= 0); + if(_dispatchCount == 0) { - p->second->finished(*_exception.get()); // The exception is immutable at this point. + notifyAll(); } - notifyAll(); // Wake up threads waiting in sendRequest() } - - if(exception.get()) - { - assert(closed); - exception->ice_throw(); - } +#endif } } |