diff options
author | Mark Spruiell <mes@zeroc.com> | 2008-06-03 19:32:20 -0700 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2008-06-03 19:32:20 -0700 |
commit | 3d649bed4328992f41f567136025f58a019a5159 (patch) | |
tree | 470be901fbbfe5c6cd4269884412b0d36b48dc92 /cppe/src/IceE/Connection.cpp | |
parent | local interface fixes for slice2javae (diff) | |
download | ice-3d649bed4328992f41f567136025f58a019a5159.tar.bz2 ice-3d649bed4328992f41f567136025f58a019a5159.tar.xz ice-3d649bed4328992f41f567136025f58a019a5159.zip |
Various Ice-E fixes:
- Bug fix in slice2javae for local interfaces/classes
- Added Ice.LocalObjectHolder
- Reviewed Java/C++ demos and aligned with Ice
- Source code clean up (removed tabs, etc.)
Diffstat (limited to 'cppe/src/IceE/Connection.cpp')
-rw-r--r-- | cppe/src/IceE/Connection.cpp | 1888 |
1 files changed, 944 insertions, 944 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp index 122f61a6aaf..f133b99f572 100644 --- a/cppe/src/IceE/Connection.cpp +++ b/cppe/src/IceE/Connection.cpp @@ -41,13 +41,13 @@ Ice::Connection::waitForValidation() while(_state == StateNotValidated) { - wait(); + wait(); } if(_state >= StateClosing) { - assert(_exception.get()); - _exception->ice_throw(); + assert(_exception.get()); + _exception->ice_throw(); } } @@ -75,18 +75,18 @@ Ice::Connection::destroy(DestructionReason reason) switch(reason) { #ifndef ICEE_PURE_CLIENT - case ObjectAdapterDeactivated: - { - setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); - break; - } + case ObjectAdapterDeactivated: + { + setState(StateClosing, ObjectAdapterDeactivatedException(__FILE__, __LINE__)); + break; + } #endif - case CommunicatorDestroyed: - { - setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); - break; - } + case CommunicatorDestroyed: + { + setState(StateClosing, CommunicatorDestroyedException(__FILE__, __LINE__)); + break; + } } } @@ -97,25 +97,25 @@ Ice::Connection::close(bool force) if(force) { - setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__)); + setState(StateClosed, ForcedCloseConnectionException(__FILE__, __LINE__)); } else { #ifndef ICEE_PURE_BLOCKING_CLIENT - // - // If we do a graceful shutdown, then we wait until all - // outstanding requests have been completed. Otherwise, the - // CloseConnectionException will cause all outstanding - // requests to be retried, regardless of whether the server - // has processed them or not. - // - while(!_requests.empty()) - { - wait(); - } + // + // If we do a graceful shutdown, then we wait until all + // outstanding requests have been completed. Otherwise, the + // CloseConnectionException will cause all outstanding + // requests to be retried, regardless of whether the server + // has processed them or not. + // + while(!_requests.empty()) + { + wait(); + } #endif - setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); + setState(StateClosing, CloseConnectionException(__FILE__, __LINE__)); } } @@ -140,32 +140,32 @@ Ice::Connection::isFinished() const #endif { - // - // We can use trylock here, because as long as there are still - // threads operating in this connection object, connection - // destruction is considered as not yet finished. - // - IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); - - if(!sync.acquired()) - { - return false; - } + // + // We can use trylock here, because as long as there are still + // threads operating in this connection object, connection + // destruction is considered as not yet finished. + // + IceUtil::Monitor<IceUtil::Mutex>::TryLock sync(*this); + + if(!sync.acquired()) + { + return false; + } - if(_transceiver != 0 + if(_transceiver != 0 #ifndef ICEE_PURE_BLOCKING_CLIENT - || _dispatchCount != 0 || (_threadPerConnection && _threadPerConnection->isAlive()) + || _dispatchCount != 0 || (_threadPerConnection && _threadPerConnection->isAlive()) #endif - ) - { - return false; - } + ) + { + return false; + } - assert(_state == StateClosed); + assert(_state == StateClosed); #ifndef ICEE_PURE_BLOCKING_CLIENT - threadPerConnection = _threadPerConnection; - _threadPerConnection = 0; + threadPerConnection = _threadPerConnection; + _threadPerConnection = 0; #endif } @@ -200,7 +200,7 @@ Ice::Connection::waitUntilHolding() const while(_state < StateHolding || _dispatchCount > 0) { - wait(); + wait(); } } @@ -214,74 +214,74 @@ Ice::Connection::waitUntilFinished() #endif { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // We wait indefinitely until connection closing has been - // initiated. We also wait indefinitely until all outstanding - // requests are completed. Otherwise we couldn't guarantee - // that there are no outstanding calls when deactivate() is - // called on the servant locators. - // - while(_state < StateClosing || _dispatchCount > 0) - { - wait(); - } - - // - // Now we must wait until close() has been called on the - // transceiver. - // - while(_transceiver) - { - if(_state != StateClosed && _endpoint->timeout() >= 0) - { - IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout()); - IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(); - - if(waitTime > IceUtil::Time()) - { - // - // We must wait a bit longer until we close this - // connection. - // - if(!timedWait(waitTime)) - { - setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); - } - } - else - { - // - // We already waited long enough, so let's close this - // connection! - // - setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); - } - - // - // No return here, we must still wait until close() is - // called on the _transceiver. - // - } - else - { - wait(); - } - } - - assert(_state == StateClosed); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + // + // We wait indefinitely until connection closing has been + // initiated. We also wait indefinitely until all outstanding + // requests are completed. Otherwise we couldn't guarantee + // that there are no outstanding calls when deactivate() is + // called on the servant locators. + // + while(_state < StateClosing || _dispatchCount > 0) + { + wait(); + } + + // + // Now we must wait until close() has been called on the + // transceiver. + // + while(_transceiver) + { + if(_state != StateClosed && _endpoint->timeout() >= 0) + { + IceUtil::Time timeout = IceUtil::Time::milliSeconds(_endpoint->timeout()); + IceUtil::Time waitTime = _stateTime + timeout - IceUtil::Time::now(); + + if(waitTime > IceUtil::Time()) + { + // + // We must wait a bit longer until we close this + // connection. + // + if(!timedWait(waitTime)) + { + setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + } + } + else + { + // + // We already waited long enough, so let's close this + // connection! + // + setState(StateClosed, CloseTimeoutException(__FILE__, __LINE__)); + } + + // + // No return here, we must still wait until close() is + // called on the _transceiver. + // + } + else + { + wait(); + } + } + + assert(_state == StateClosed); #ifndef ICEE_PURE_BLOCKING_CLIENT - threadPerConnection = _threadPerConnection; - _threadPerConnection = 0; + threadPerConnection = _threadPerConnection; + _threadPerConnection = 0; #endif } #ifndef ICEE_PURE_BLOCKING_CLIENT if(threadPerConnection) { - threadPerConnection->getThreadControl().join(); + threadPerConnection->getThreadControl().join(); } #endif } @@ -292,194 +292,194 @@ Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) bool requestSent = false; try { - Lock sendSync(_sendMonitor); - if(!_transceiver) - { - assert(_exception.get()); - // - // If the connection is closed before we even have a chance - // to send our request, we always try to send the request - // again. - // - throw LocalExceptionWrapper(*_exception.get(), true); - } - - Int requestId; - if(out) - { - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - Byte* dest = &(os->b[0]) + headerSize; + Lock sendSync(_sendMonitor); + if(!_transceiver) + { + assert(_exception.get()); + // + // If the connection is closed before we even have a chance + // to send our request, we always try to send the request + // again. + // + throw LocalExceptionWrapper(*_exception.get(), true); + } + + Int requestId; + if(out) + { + // + // Create a new unique request ID. + // + requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + // + // Fill in the request ID. + // + Byte* dest = &(os->b[0]) + headerSize; #ifdef ICE_BIG_ENDIAN - const Byte* src = reinterpret_cast<const Byte*>(&requestId) + sizeof(Ice::Int) - 1; - *dest++ = *src--; - *dest++ = *src--; - *dest++ = *src--; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&requestId) + sizeof(Ice::Int) - 1; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest = *src; #else - const Byte* src = reinterpret_cast<const Byte*>(&requestId); - *dest++ = *src++; - *dest++ = *src++; - *dest++ = *src++; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&requestId); + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest = *src; #endif #ifndef ICEE_PURE_BLOCKING_CLIENT - if(!_blocking) - { - _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); - } + if(!_blocking) + { + _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); + } #endif - } + } - // - // Fill in the message size. - // - const Int sz = static_cast<Int>(os->b.size()); - Byte* dest = &(os->b[0]) + 10; + // + // Fill in the message size. + // + const Int sz = static_cast<Int>(os->b.size()); + Byte* dest = &(os->b[0]) + 10; #ifdef ICE_BIG_ENDIAN - const Byte* src = reinterpret_cast<const Byte*>(&sz) + sizeof(Ice::Int) - 1; - *dest++ = *src--; - *dest++ = *src--; - *dest++ = *src--; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&sz) + sizeof(Ice::Int) - 1; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest = *src; #else - const Byte* src = reinterpret_cast<const Byte*>(&sz); - *dest++ = *src++; - *dest++ = *src++; - *dest++ = *src++; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&sz); + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest = *src; #endif - - // - // Send the request. - // - os->i = os->b.begin(); - if(_traceLevels->protocol >= 1) - { - traceRequest("sending request", *os, _logger, _traceLevels); - } - _transceiver->write(*os); - requestSent = true; - - if(!out) - { - return; - } - + + // + // Send the request. + // + os->i = os->b.begin(); + if(_traceLevels->protocol >= 1) + { + traceRequest("sending request", *os, _logger, _traceLevels); + } + _transceiver->write(*os); + requestSent = true; + + if(!out) + { + return; + } + #ifndef ICEE_PURE_BLOCKING_CLIENT - if(_blocking) - { + if(_blocking) + { #endif - // - // Re-use the stream for reading the reply. - // - os->reset(); - - Int receivedRequestId = 0; + // + // Re-use the stream for reading the reply. + // + os->reset(); + + Int receivedRequestId = 0; #ifndef ICEE_PURE_CLIENT - Int invokeNum = 0; - readStreamAndParseMessage(*os, receivedRequestId, invokeNum); - if(invokeNum > 0) - { - throwUnknownMessageException(__FILE__, __LINE__); - } - else if(requestId != receivedRequestId) - { - throwUnknownRequestIdException(__FILE__, __LINE__); - } + Int invokeNum = 0; + readStreamAndParseMessage(*os, receivedRequestId, invokeNum); + if(invokeNum > 0) + { + throwUnknownMessageException(__FILE__, __LINE__); + } + else if(requestId != receivedRequestId) + { + throwUnknownRequestIdException(__FILE__, __LINE__); + } #else - readStreamAndParseMessage(*os, receivedRequestId); - if(requestId != receivedRequestId) - { - throwUnknownRequestIdException(__FILE__, __LINE__); - } + readStreamAndParseMessage(*os, receivedRequestId); + if(requestId != receivedRequestId) + { + throwUnknownRequestIdException(__FILE__, __LINE__); + } #endif - out->finished(*os); + out->finished(*os); #ifndef ICEE_PURE_BLOCKING_CLIENT - } - else - { - // - // Wait until the request has completed, or until the request times out. - // - Int tout = timeout(); - IceUtil::Time expireTime; - if(tout > 0) - { - expireTime = IceUtil::Time::now() + IceUtil::Time::milliSeconds(tout); - } - - while(out->state() == Outgoing::StateInProgress) - { - if(tout > 0) - { - IceUtil::Time now = IceUtil::Time::now(); - if(now < expireTime) - { - _sendMonitor.timedWait(expireTime - now); - } - - // - // Make sure we woke up because of timeout and not another response. - // - if(out->state() == Outgoing::StateInProgress && IceUtil::Time::now() > expireTime) - { - throw TimeoutException(__FILE__, __LINE__); - } - } - else - { - _sendMonitor.wait(); - } - } - } + } + else + { + // + // Wait until the request has completed, or until the request times out. + // + Int tout = timeout(); + IceUtil::Time expireTime; + if(tout > 0) + { + expireTime = IceUtil::Time::now() + IceUtil::Time::milliSeconds(tout); + } + + while(out->state() == Outgoing::StateInProgress) + { + if(tout > 0) + { + IceUtil::Time now = IceUtil::Time::now(); + if(now < expireTime) + { + _sendMonitor.timedWait(expireTime - now); + } + + // + // Make sure we woke up because of timeout and not another response. + // + if(out->state() == Outgoing::StateInProgress && IceUtil::Time::now() > expireTime) + { + throw TimeoutException(__FILE__, __LINE__); + } + } + else + { + _sendMonitor.wait(); + } + } + } #endif } catch(const LocalException& ex) { - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - assert(_exception.get()); - if(!requestSent) - { - _exception->ice_throw(); - } - } - - // - // If the request was already sent, we don't throw directly - // but instead we set the Outgoing object exception with - // finished(). Throwing directly would break "at-most-once" - // (see also comment in Outgoing.invoke()) - // - IceUtil::Monitor<IceUtil::Mutex>::Lock sendSync(_sendMonitor); + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + assert(_exception.get()); + if(!requestSent) + { + _exception->ice_throw(); + } + } + + // + // If the request was already sent, we don't throw directly + // but instead we set the Outgoing object exception with + // finished(). Throwing directly would break "at-most-once" + // (see also comment in Outgoing.invoke()) + // + IceUtil::Monitor<IceUtil::Mutex>::Lock sendSync(_sendMonitor); #ifndef ICEE_PURE_BLOCKING_CLIENT - if(_blocking) - { + if(_blocking) + { #endif - out->finished(ex); + out->finished(ex); #ifndef ICEE_PURE_BLOCKING_CLIENT - } - else - { - while(out->state() == Outgoing::StateInProgress) - { - _sendMonitor.wait(); // Wait for the thread to propagate the exception to the Outgoing object. - } - } + } + else + { + while(out->state() == Outgoing::StateInProgress) + { + _sendMonitor.wait(); // Wait for the thread to propagate the exception to the Outgoing object. + } + } #endif } } @@ -496,12 +496,12 @@ Ice::Connection::prepareBatchRequest(BasicStream* os) // while(_batchStreamInUse && !_exception.get()) { - wait(); + wait(); } if(_exception.get()) { - _exception->ice_throw(); + _exception->ice_throw(); } assert(_state > StateNotValidated); @@ -509,15 +509,15 @@ Ice::Connection::prepareBatchRequest(BasicStream* os) if(_batchStream.b.empty()) { - try - { - _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - ex.ice_throw(); - } + try + { + _batchStream.writeBlob(requestBatchHdr, sizeof(requestBatchHdr)); + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + ex.ice_throw(); + } } _batchStreamInUse = true; @@ -655,111 +655,111 @@ void Ice::Connection::flushBatchRequestsInternal(bool ignoreInUse) { { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(!ignoreInUse) { - while(_batchStreamInUse && !_exception.get()) - { - wait(); - } + while(_batchStreamInUse && !_exception.get()) + { + wait(); + } + } + + if(_exception.get()) + { + _exception->ice_throw(); } - - if(_exception.get()) - { - _exception->ice_throw(); - } - if(_batchStream.b.empty()) - { - return; // Nothing to do. - } + if(_batchStream.b.empty()) + { + return; // Nothing to do. + } - assert(_state > StateNotValidated); - assert(_state < StateClosing); + assert(_state > StateNotValidated); + assert(_state < StateClosing); - _batchStream.i = _batchStream.b.begin(); + _batchStream.i = _batchStream.b.begin(); - // - // Prevent that new batch requests are added while we are - // flushing. - // - _batchStreamInUse = true; + // + // Prevent that new batch requests are added while we are + // flushing. + // + _batchStreamInUse = true; } try { - Lock sendSync(_sendMonitor); + Lock sendSync(_sendMonitor); - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } + if(!_transceiver) // Has the transceiver already been closed? + { + assert(_exception.get()); + _exception->ice_throw(); // The exception is immutable at this point. + } - // - // Fill in the number of requests in the batch. - // - Byte* dest = &(_batchStream.b[0]) + headerSize; + // + // Fill in the number of requests in the batch. + // + Byte* dest = &(_batchStream.b[0]) + headerSize; #ifdef ICE_BIG_ENDIAN - const Byte* src = reinterpret_cast<const Byte*>(&_batchRequestNum) + sizeof(Ice::Int) - 1; - *dest++ = *src--; - *dest++ = *src--; - *dest++ = *src--; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&_batchRequestNum) + sizeof(Ice::Int) - 1; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest = *src; #else - const Byte* src = reinterpret_cast<const Byte*>(&_batchRequestNum); - *dest++ = *src++; - *dest++ = *src++; - *dest++ = *src++; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&_batchRequestNum); + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest = *src; #endif - - const Int sz = static_cast<Int>(_batchStream.b.size()); - dest = &(_batchStream.b[0]) + 10; + + const Int sz = static_cast<Int>(_batchStream.b.size()); + dest = &(_batchStream.b[0]) + 10; #ifdef ICE_BIG_ENDIAN - src = reinterpret_cast<const Byte*>(&sz) + sizeof(Ice::Int) - 1; - *dest++ = *src--; - *dest++ = *src--; - *dest++ = *src--; - *dest = *src; + src = reinterpret_cast<const Byte*>(&sz) + sizeof(Ice::Int) - 1; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest = *src; #else - src = reinterpret_cast<const Byte*>(&sz); - *dest++ = *src++; - *dest++ = *src++; - *dest++ = *src++; - *dest = *src; + src = reinterpret_cast<const Byte*>(&sz); + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest = *src; #endif - - // - // Send the batch request. - // - _batchStream.i = _batchStream.b.begin(); - if(_traceLevels->protocol >= 1) - { - traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); - } - _transceiver->write(_batchStream); + + // + // Send the batch request. + // + _batchStream.i = _batchStream.b.begin(); + if(_traceLevels->protocol >= 1) + { + traceBatchRequest("sending batch request", _batchStream, _logger, _traceLevels); + } + _transceiver->write(_batchStream); } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - assert(_exception.get()); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + assert(_exception.get()); - // - // Since batch requests are all oneways, we - // must report the exception to the caller. - // - _exception->ice_throw(); + // + // Since batch requests are all oneways, we + // must report the exception to the caller. + // + _exception->ice_throw(); } { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // Reset the batch stream, and notify that flushing is over. - // + // + // Reset the batch stream, and notify that flushing is over. + // resetBatch(!ignoreInUse); } } @@ -797,69 +797,69 @@ Ice::Connection::sendResponse(BasicStream* os) { try { - Lock sendSync(_sendMonitor); + Lock sendSync(_sendMonitor); - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } + if(!_transceiver) // Has the transceiver already been closed? + { + assert(_exception.get()); + _exception->ice_throw(); // The exception is immutable at this point. + } - const Int sz = static_cast<Int>(os->b.size()); - Byte* dest = &(os->b[0]) + 10; + const Int sz = static_cast<Int>(os->b.size()); + Byte* dest = &(os->b[0]) + 10; #ifdef ICE_BIG_ENDIAN - const Byte* src = reinterpret_cast<const Byte*>(&sz) + sizeof(Ice::Int) - 1; - *dest++ = *src--; - *dest++ = *src--; - *dest++ = *src--; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&sz) + sizeof(Ice::Int) - 1; + *dest++ = *src--; + *dest++ = *src--; + *dest++ = *src--; + *dest = *src; #else - const Byte* src = reinterpret_cast<const Byte*>(&sz); - *dest++ = *src++; - *dest++ = *src++; - *dest++ = *src++; - *dest = *src; + const Byte* src = reinterpret_cast<const Byte*>(&sz); + *dest++ = *src++; + *dest++ = *src++; + *dest++ = *src++; + *dest = *src; #endif - - // - // Send the reply. - // - os->i = os->b.begin(); - if(_traceLevels->protocol >= 1) - { - traceReply("sending reply", *os, _logger, _traceLevels); - } - _transceiver->write(*os); + + // + // Send the reply. + // + os->i = os->b.begin(); + if(_traceLevels->protocol >= 1) + { + traceReply("sending reply", *os, _logger, _traceLevels); + } + _transceiver->write(*os); } catch(const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); } { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(_state > StateNotValidated); + assert(_state > StateNotValidated); - try - { - assert(_dispatchCount > 0); - if(--_dispatchCount == 0) - { - notifyAll(); - } - - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); - } - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - } + try + { + assert(_dispatchCount > 0); + if(--_dispatchCount == 0) + { + notifyAll(); + } + + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + } } } @@ -872,20 +872,20 @@ Ice::Connection::sendNoResponse() try { - assert(_dispatchCount > 0); - if(--_dispatchCount == 0) - { - notifyAll(); - } + assert(_dispatchCount > 0); + if(--_dispatchCount == 0) + { + notifyAll(); + } - if(_state == StateClosing && _dispatchCount == 0) - { - initiateShutdown(); - } + if(_state == StateClosing && _dispatchCount == 0) + { + initiateShutdown(); + } } catch(const LocalException& ex) { - setState(StateClosed, ex); + setState(StateClosed, ex); } } @@ -916,12 +916,12 @@ Ice::Connection::setAdapter(const ObjectAdapterPtr& adapter) // while(_dispatchCount > 0) { - wait(); + wait(); } if(_exception.get()) { - _exception->ice_throw(); + _exception->ice_throw(); } assert(_state < StateClosing); @@ -946,7 +946,7 @@ Ice::Connection::createProxy(const Identity& ident) const vector<ConnectionPtr> connections; connections.push_back(const_cast<Connection*>(this)); ReferencePtr ref = _instance->referenceFactory()->create(ident, Ice::Context(), "", ReferenceModeTwoway, - connections); + connections); return _instance->proxyFactory()->referenceToProxy(ref); } @@ -972,28 +972,28 @@ Ice::Connection::toString() const #ifndef ICEE_PURE_CLIENT Ice::Connection::Connection(const InstancePtr& instance, - const TransceiverPtr& transceiver, - const EndpointPtr& endpoint, - const ObjectAdapterPtr& adapter) : + const TransceiverPtr& transceiver, + const EndpointPtr& endpoint, + const ObjectAdapterPtr& adapter) : #else Ice::Connection::Connection(const InstancePtr& instance, - const TransceiverPtr& transceiver, - const EndpointPtr& endpoint) : + const TransceiverPtr& transceiver, + const EndpointPtr& endpoint) : #endif - _instance(instance), - _transceiver(transceiver), - _desc(transceiver->toString()), - _type(transceiver->type()), - _endpoint(endpoint), - _logger(_instance->initializationData().logger), // Cached for better performance. - _traceLevels(_instance->traceLevels()), // Cached for better performance. - _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0), + _instance(instance), + _transceiver(transceiver), + _desc(transceiver->toString()), + _type(transceiver->type()), + _endpoint(endpoint), + _logger(_instance->initializationData().logger), // Cached for better performance. + _traceLevels(_instance->traceLevels()), // Cached for better performance. + _warn(_instance->initializationData().properties->getPropertyAsInt("Ice.Warn.Connections") > 0), #ifndef ICEE_PURE_CLIENT - _in(_instance.get(), this, _stream, adapter), + _in(_instance.get(), this, _stream, adapter), #endif #ifndef ICEE_PURE_BLOCKING_CLIENT - _stream(_instance.get(), _instance->messageSizeMax() + _stream(_instance.get(), _instance->messageSizeMax() #ifdef ICEE_HAS_WSTRING , _instance->initializationData().stringConverter, _instance->initializationData().wstringConverter #endif @@ -1002,19 +1002,19 @@ Ice::Connection::Connection(const InstancePtr& instance, #ifdef ICEE_HAS_BATCH _batchAutoFlush( _instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.BatchAutoFlush", 1) > 0), - _batchStream(_instance.get(), _instance->messageSizeMax(), + _batchStream(_instance.get(), _instance->messageSizeMax(), #ifdef ICEE_HAS_WSTRING _instance->initializationData().stringConverter, _instance->initializationData().wstringConverter, #endif _batchAutoFlush), - _batchStreamInUse(false), - _batchRequestNum(0), + _batchStreamInUse(false), + _batchRequestNum(0), _batchMarker(0), #endif - _dispatchCount(0), - _state(StateNotValidated), - _stateTime(IceUtil::Time::now()), - _nextRequestId(1) + _dispatchCount(0), + _state(StateNotValidated), + _stateTime(IceUtil::Time::now()), + _nextRequestId(1) #ifndef ICEE_PURE_BLOCKING_CLIENT , _requestsHint(_requests.end()) #endif @@ -1027,18 +1027,18 @@ Ice::Connection::Connection(const InstancePtr& instance, # endif if(_blocking) { - _transceiver->setTimeouts(_endpoint->timeout(), _endpoint->timeout()); + _transceiver->setTimeouts(_endpoint->timeout(), _endpoint->timeout()); } else { #ifdef _WIN32 - // - // On Windows, the recv() call doesn't return if the socket is - // shutdown. We use the timeout to not block indefinitely. - // - _transceiver->setTimeouts(_endpoint->timeout(), _endpoint->timeout()); + // + // On Windows, the recv() call doesn't return if the socket is + // shutdown. We use the timeout to not block indefinitely. + // + _transceiver->setTimeouts(_endpoint->timeout(), _endpoint->timeout()); #else - _transceiver->setTimeouts(-1, _endpoint->timeout()); + _transceiver->setTimeouts(-1, _endpoint->timeout()); #endif } #else @@ -1050,38 +1050,38 @@ Ice::Connection::Connection(const InstancePtr& instance, #else if(_blocking) { - validate(); + validate(); } else { __setNoDelete(true); try { - // - // If we are in thread per connection mode, create the thread - // for this connection. - // - _threadPerConnection = new ThreadPerConnection(this); - _threadPerConnection->start(_instance->threadPerConnectionStackSize()); + // + // If we are in thread per connection mode, create the thread + // for this connection. + // + _threadPerConnection = new ThreadPerConnection(this); + _threadPerConnection->start(_instance->threadPerConnectionStackSize()); } catch(const Ice::Exception& ex) { - { - Error out(_logger); - out << "cannot create thread for connection:\n" << ex.toString(); - } - - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - // Here we ignore any exceptions in close(). - } - - __setNoDelete(false); - ex.ice_throw(); + { + Error out(_logger); + out << "cannot create thread for connection:\n" << ex.toString(); + } + + try + { + _transceiver->close(); + } + catch(const LocalException&) + { + // Here we ignore any exceptions in close(). + } + + __setNoDelete(false); + ex.ice_throw(); } __setNoDelete(false); } @@ -1108,25 +1108,25 @@ Ice::Connection::validate() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - // - // The connection might already be closed (e.g.: the communicator - // was destroyed or object adapter deactivated.) - // - assert(_state == StateNotValidated || _state == StateClosed); - if(_state == StateClosed) - { - assert(_exception.get()); - _exception->ice_throw(); - } + + // + // The connection might already be closed (e.g.: the communicator + // was destroyed or object adapter deactivated.) + // + assert(_state == StateNotValidated || _state == StateClosed); + if(_state == StateClosed) + { + assert(_exception.get()); + _exception->ice_throw(); + } if(_in.getAdapter()) { - active = true; // The server side has the active role for connection validation. + active = true; // The server side has the active role for connection validation. } else { - active = false; // The client side has the passive role for connection validation. + active = false; // The client side has the passive role for connection validation. } } #endif @@ -1136,111 +1136,111 @@ Ice::Connection::validate() Int timeout; if(_instance->defaultsAndOverrides()->overrideConnectTimeout) { - timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; + timeout = _instance->defaultsAndOverrides()->overrideConnectTimeoutValue; } else { - timeout = _endpoint->timeout(); + timeout = _endpoint->timeout(); } #ifndef ICEE_PURE_CLIENT if(active) { - BasicStream os(_instance.get(), _instance->messageSizeMax() + BasicStream os(_instance.get(), _instance->messageSizeMax() #ifdef ICEE_HAS_WSTRING , _instance->initializationData().stringConverter, _instance->initializationData().wstringConverter #endif ); - os.write(magic[0]); - os.write(magic[1]); - os.write(magic[2]); - os.write(magic[3]); - os.write(protocolMajor); - os.write(protocolMinor); - os.write(encodingMajor); - os.write(encodingMinor); - os.write(validateConnectionMsg); - os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). - os.write(headerSize); // Message size. - os.i = os.b.begin(); - if(_traceLevels->protocol >= 1) - { - traceHeader("sending validate connection", os, _logger, _traceLevels); - } - try - { - _transceiver->writeWithTimeout(os, timeout); - } - catch(const TimeoutException&) - { - throw ConnectTimeoutException(__FILE__, __LINE__); - } + os.write(magic[0]); + os.write(magic[1]); + os.write(magic[2]); + os.write(magic[3]); + os.write(protocolMajor); + os.write(protocolMinor); + os.write(encodingMajor); + os.write(encodingMinor); + os.write(validateConnectionMsg); + os.write(static_cast<Byte>(0)); // Compression status (always zero for validate connection). + os.write(headerSize); // Message size. + os.i = os.b.begin(); + if(_traceLevels->protocol >= 1) + { + traceHeader("sending validate connection", os, _logger, _traceLevels); + } + try + { + _transceiver->writeWithTimeout(os, timeout); + } + catch(const TimeoutException&) + { + throw ConnectTimeoutException(__FILE__, __LINE__); + } } else #endif { - BasicStream is(_instance.get(), _instance->messageSizeMax() + BasicStream is(_instance.get(), _instance->messageSizeMax() #ifdef ICEE_HAS_WSTRING , _instance->initializationData().stringConverter, _instance->initializationData().wstringConverter #endif ); - is.b.resize(headerSize); - is.i = is.b.begin(); - try - { - _transceiver->readWithTimeout(is, timeout); - } - catch(const TimeoutException&) - { - throw ConnectTimeoutException(__FILE__, __LINE__); - } - assert(is.i == is.b.end()); - is.i = is.b.begin(); - Ice::Byte m[4]; - is.read(m[0]); - is.read(m[1]); - is.read(m[2]); - is.read(m[3]); - if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3]) - { - throwBadMagicException(__FILE__, __LINE__, Ice::ByteSeq(&m[0], &m[0] + sizeof(m))); - } - Byte pMajor; - Byte pMinor; - is.read(pMajor); - is.read(pMinor); - if(pMajor != protocolMajor) - { - throwUnsupportedProtocolException(__FILE__, __LINE__, pMajor, pMinor, protocolMajor, protocolMinor); - } - Byte eMajor; - Byte eMinor; - is.read(eMajor); - is.read(eMinor); - if(eMajor != encodingMajor) - { - throwUnsupportedEncodingException(__FILE__, __LINE__, eMajor, eMinor, encodingMajor, encodingMinor); - } - Byte messageType; - is.read(messageType); - if(messageType != validateConnectionMsg) - { - throwConnectionNotValidatedException(__FILE__, __LINE__); - } + is.b.resize(headerSize); + is.i = is.b.begin(); + try + { + _transceiver->readWithTimeout(is, timeout); + } + catch(const TimeoutException&) + { + throw ConnectTimeoutException(__FILE__, __LINE__); + } + assert(is.i == is.b.end()); + is.i = is.b.begin(); + Ice::Byte m[4]; + is.read(m[0]); + is.read(m[1]); + is.read(m[2]); + is.read(m[3]); + if(m[0] != magic[0] || m[1] != magic[1] || m[2] != magic[2] || m[3] != magic[3]) + { + throwBadMagicException(__FILE__, __LINE__, Ice::ByteSeq(&m[0], &m[0] + sizeof(m))); + } + Byte pMajor; + Byte pMinor; + is.read(pMajor); + is.read(pMinor); + if(pMajor != protocolMajor) + { + throwUnsupportedProtocolException(__FILE__, __LINE__, pMajor, pMinor, protocolMajor, protocolMinor); + } + Byte eMajor; + Byte eMinor; + is.read(eMajor); + is.read(eMinor); + if(eMajor != encodingMajor) + { + throwUnsupportedEncodingException(__FILE__, __LINE__, eMajor, eMinor, encodingMajor, encodingMinor); + } + Byte messageType; + is.read(messageType); + if(messageType != validateConnectionMsg) + { + throwConnectionNotValidatedException(__FILE__, __LINE__); + } Byte compress; is.read(compress); // Ignore compression status for validate connection. - Int size; - is.read(size); - if(size != headerSize) - { - throwIllegalMessageSizeException(__FILE__, __LINE__); - } - if(_traceLevels->protocol >= 1) - { - traceHeader("received validate connection", is, _logger, _traceLevels); - } + Int size; + is.read(size); + if(size != headerSize) + { + throwIllegalMessageSizeException(__FILE__, __LINE__); + } + if(_traceLevels->protocol >= 1) + { + traceHeader("received validate connection", is, _logger, _traceLevels); + } } } catch(const LocalException& ex) @@ -1269,41 +1269,41 @@ Ice::Connection::setState(State state, const LocalException& ex) if(_state == state) // Don't switch twice. { - return; + return; } if(!_exception.get()) { - // - // If we are in closed state, an exception must be set. - // - assert(_state != StateClosed); - - _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); - - if(_warn) - { - // - // We don't warn if we are not validated. - // - if(_state > StateNotValidated) - { - // - // Don't warn about certain expected exceptions. - // - if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || - dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || - dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || + // + // If we are in closed state, an exception must be set. + // + assert(_state != StateClosed); + + _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); + + if(_warn) + { + // + // We don't warn if we are not validated. + // + if(_state > StateNotValidated) + { + // + // Don't warn about certain expected exceptions. + // + if(!(dynamic_cast<const CloseConnectionException*>(_exception.get()) || + dynamic_cast<const ForcedCloseConnectionException*>(_exception.get()) || + dynamic_cast<const CommunicatorDestroyedException*>(_exception.get()) || #ifndef ICEE_PURE_CLIENT - dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || + dynamic_cast<const ObjectAdapterDeactivatedException*>(_exception.get()) || #endif - (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing))) - { - Warning out(_logger); - out << "connection exception:\n" << (*_exception.get()).toString() << "\n" << _desc; - } - } - } + (dynamic_cast<const ConnectionLostException*>(_exception.get()) && _state == StateClosing))) + { + Warning out(_logger); + out << "connection exception:\n" << (*_exception.get()).toString() << "\n" << _desc; + } + } + } } // @@ -1319,91 +1319,91 @@ Ice::Connection::setState(State state) { if(_state == state) // Don't switch twice. { - return; + return; } switch(state) { case StateNotValidated: { - assert(false); - break; + assert(false); + break; } case StateActive: { - // - // Can only switch from holding or not validated to - // active. - // + // + // Can only switch from holding or not validated to + // active. + // #ifdef ICEE_PURE_CLIENT - if(_state != StateNotValidated) - { - return; - } + if(_state != StateNotValidated) + { + return; + } #else - if(_state != StateHolding && _state != StateNotValidated) - { - return; - } + if(_state != StateHolding && _state != StateNotValidated) + { + return; + } #endif - break; + break; } - + #ifndef ICEE_PURE_CLIENT case StateHolding: { - // - // Can only switch from active or not validated to - // holding. - // - if(_state != StateActive && _state != StateNotValidated) - { - return; - } - break; + // + // Can only switch from active or not validated to + // holding. + // + if(_state != StateActive && _state != StateNotValidated) + { + return; + } + break; } #endif case StateClosing: { - // - // Can't change back from closed. - // - if(_state == StateClosed) - { - return; - } - break; + // + // Can't change back from closed. + // + if(_state == StateClosed) + { + return; + } + break; } - + case StateClosed: { - // - // We shutdown both for reading and writing. This will - // unblock and read call with an exception. The thread - // per connection then closes the transceiver. - // - _transceiver->shutdownReadWrite(); + // + // We shutdown both for reading and writing. This will + // unblock and read call with an exception. The thread + // per connection then closes the transceiver. + // + _transceiver->shutdownReadWrite(); - // - // In blocking mode, we close the transceiver now. - // + // + // In blocking mode, we close the transceiver now. + // #ifndef ICEE_PURE_BLOCKING_CLIENT - if(_blocking) + if(_blocking) #endif - { - Lock sync(_sendMonitor); - try - { - _transceiver->close(); - } - catch(const Ice::LocalException&) - { - } - _transceiver = 0; - } - break; + { + Lock sync(_sendMonitor); + try + { + _transceiver->close(); + } + catch(const Ice::LocalException&) + { + } + _transceiver = 0; + } + break; } } @@ -1414,21 +1414,21 @@ Ice::Connection::setState(State state) if(_state == StateClosing && _dispatchCount == 0) { - try - { - initiateShutdown(); + try + { + initiateShutdown(); #ifndef ICEE_PURE_BLOCKING_CLIENT - if(_blocking) + if(_blocking) #endif - { - setState(StateClosed); - } - } - catch(const LocalException& ex) - { - setState(StateClosed, ex); - } + { + setState(StateClosed); + } + } + catch(const LocalException& ex) + { + setState(StateClosed, ex); + } } } @@ -1467,7 +1467,7 @@ Ice::Connection::initiateShutdown() const os.i = os.b.begin(); if(_traceLevels->protocol >= 1) { - traceHeader("sending close connection", os, _logger, _traceLevels); + traceHeader("sending close connection", os, _logger, _traceLevels); } _transceiver->write(os); @@ -1503,20 +1503,20 @@ Ice::Connection::readStreamAndParseMessage(IceInternal::BasicStream& stream, Int stream.readBlob(header, headerSize); if(header[0] != magic[0] || header[1] != magic[1] || header[2] != magic[2] || header[3] != magic[3]) { - throwBadMagicException(__FILE__, __LINE__, Ice::ByteSeq(&header[0], &header[0] + sizeof(magic))); + throwBadMagicException(__FILE__, __LINE__, Ice::ByteSeq(&header[0], &header[0] + sizeof(magic))); } if(header[4] != protocolMajor) { - throwUnsupportedProtocolException(__FILE__, __LINE__, header[4], header[5], protocolMajor, protocolMinor); + throwUnsupportedProtocolException(__FILE__, __LINE__, header[4], header[5], protocolMajor, protocolMinor); } if(header[6] != encodingMajor) { - throwUnsupportedEncodingException(__FILE__, __LINE__, header[6], header[7], encodingMajor, encodingMinor); + throwUnsupportedEncodingException(__FILE__, __LINE__, header[6], header[7], encodingMajor, encodingMinor); } const Byte messageType = header[8]; if(header[9] == 2) { - throw FeatureNotSupportedException(__FILE__, __LINE__, "compression"); + throw FeatureNotSupportedException(__FILE__, __LINE__, "compression"); } Int size; @@ -1524,21 +1524,21 @@ Ice::Connection::readStreamAndParseMessage(IceInternal::BasicStream& stream, Int stream.read(size); if(size < headerSize) { - throwIllegalMessageSizeException(__FILE__, __LINE__); + throwIllegalMessageSizeException(__FILE__, __LINE__); } if(size > static_cast<Int>(_instance->messageSizeMax())) { - throwMemoryLimitException(__FILE__, __LINE__); + throwMemoryLimitException(__FILE__, __LINE__); } if(size > static_cast<Int>(stream.b.size())) { - stream.b.resize(size); + stream.b.resize(size); } stream.i = stream.b.begin() + pos; if(stream.i != stream.b.end()) { - _transceiver->read(stream); + _transceiver->read(stream); } assert(stream.i == stream.b.end()); @@ -1548,74 +1548,74 @@ Ice::Connection::readStreamAndParseMessage(IceInternal::BasicStream& stream, Int { case closeConnectionMsg: { - if(_traceLevels->protocol >= 1) - { - traceHeader("received close connection", stream, _logger, _traceLevels); - } - throw CloseConnectionException(__FILE__, __LINE__); - break; + if(_traceLevels->protocol >= 1) + { + traceHeader("received close connection", stream, _logger, _traceLevels); + } + throw CloseConnectionException(__FILE__, __LINE__); + break; } - + case replyMsg: { - if(_traceLevels->protocol >= 1) - { - traceReply("received reply", stream, _logger, _traceLevels); - } - stream.read(requestId); - break; - } - + if(_traceLevels->protocol >= 1) + { + traceReply("received reply", stream, _logger, _traceLevels); + } + stream.read(requestId); + break; + } + #ifndef ICEE_PURE_CLIENT case requestMsg: { - if(_traceLevels->protocol >= 1) - { - traceRequest("received request", stream, _logger, _traceLevels); - } - stream.read(requestId); - invokeNum = 1; - break; + if(_traceLevels->protocol >= 1) + { + traceRequest("received request", stream, _logger, _traceLevels); + } + stream.read(requestId); + invokeNum = 1; + break; } - + case requestBatchMsg: { - if(_traceLevels->protocol >= 1) - { - traceBatchRequest("received batch request", stream, _logger, _traceLevels); - } - stream.read(invokeNum); - if(invokeNum < 0) - { - invokeNum = 0; - throwNegativeSizeException(__FILE__, __LINE__); - } - break; + if(_traceLevels->protocol >= 1) + { + traceBatchRequest("received batch request", stream, _logger, _traceLevels); + } + stream.read(invokeNum); + if(invokeNum < 0) + { + invokeNum = 0; + throwNegativeSizeException(__FILE__, __LINE__); + } + break; } #endif - + case validateConnectionMsg: { - if(_traceLevels->protocol >= 1) - { - traceHeader("received validate connection", stream, _logger, _traceLevels); - } - if(_warn) - { - Warning out(_logger); - out << "ignoring unexpected validate connection message:\n" << _desc; - } - break; - } - + if(_traceLevels->protocol >= 1) + { + traceHeader("received validate connection", stream, _logger, _traceLevels); + } + if(_warn) + { + Warning out(_logger); + out << "ignoring unexpected validate connection message:\n" << _desc; + } + break; + } + default: { - if(_traceLevels->protocol >= 1) - { - traceHeader("received unknown message\n(invalid, closing connection)", stream, _logger, _traceLevels); - } - throwUnknownMessageException(__FILE__, __LINE__); - break; + if(_traceLevels->protocol >= 1) + { + traceHeader("received unknown message\n(invalid, closing connection)", stream, _logger, _traceLevels); + } + throwUnknownMessageException(__FILE__, __LINE__); + break; } } } @@ -1636,17 +1636,17 @@ Ice::Connection::run() } catch(const LocalException&) { - Lock sync(*this); + Lock sync(*this); assert(_state == StateClosed); - Lock sendSync(_sendMonitor); + Lock sendSync(_sendMonitor); try { - _transceiver->close(); + _transceiver->close(); } catch(const LocalException&) { - // Here we ignore any exceptions in close(). + // Here we ignore any exceptions in close(). } _transceiver = 0; @@ -1660,224 +1660,224 @@ Ice::Connection::run() while(!closed) { - Int requestId = 0; + Int requestId = 0; #ifndef ICEE_PURE_CLIENT - Int invokeNum = 0; - _in.os()->reset(); + Int invokeNum = 0; + _in.os()->reset(); #endif - _stream.reset(); - - // - // Read and parse the next message. We don't need to lock the - // send monitor here as we have the guarantee that - // _transceiver won't be set to 0 by another thread, the - // thread per connection is the only thread that can set - // _transceiver to 0. - // - try - { + _stream.reset(); + + // + // Read and parse the next message. We don't need to lock the + // send monitor here as we have the guarantee that + // _transceiver won't be set to 0 by another thread, the + // thread per connection is the only thread that can set + // _transceiver to 0. + // + try + { #ifndef ICEE_PURE_CLIENT - readStreamAndParseMessage(_stream, requestId, invokeNum); + readStreamAndParseMessage(_stream, requestId, invokeNum); #else - readStreamAndParseMessage(_stream, requestId); + readStreamAndParseMessage(_stream, requestId); #endif - } + } #ifdef _WIN32 - catch(const Ice::TimeoutException&) - { - // - // See the comment in the Connection constructor. This is - // necessary to not block in recv() indefinitely. - // - continue; - } + catch(const Ice::TimeoutException&) + { + // + // See the comment in the Connection constructor. This is + // necessary to not block in recv() indefinitely. + // + continue; + } #endif - catch(const Ice::LocalException& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - } - - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - if(_state != StateClosed) - { + catch(const Ice::LocalException& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + } + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state != StateClosed) + { #ifndef ICEE_PURE_CLIENT - if(invokeNum > 0) // We received a request or a batch request - { - if(_state == StateClosing) - { - if(_traceLevels->protocol >= 1) - { - string req = invokeNum > 1 ? "received batch request" : "received request"; - req += " during closing\n(ignored by server, client will retry)"; - traceRequest( req.c_str(), _stream, _logger, _traceLevels); - } - invokeNum = 0; - } - _dispatchCount += invokeNum; - } - else + if(invokeNum > 0) // We received a request or a batch request + { + if(_state == StateClosing) + { + if(_traceLevels->protocol >= 1) + { + string req = invokeNum > 1 ? "received batch request" : "received request"; + req += " during closing\n(ignored by server, client will retry)"; + traceRequest( req.c_str(), _stream, _logger, _traceLevels); + } + invokeNum = 0; + } + _dispatchCount += invokeNum; + } + else #endif - if(requestId > 0) - { - // - // The message is a reply, we search the Outgoing object waiting - // for this reply and pass it the stream before to notify the - // send monitor to wake up threads waiting for replies. - // - try - { - Lock sync(_sendMonitor); - - map<Int, Outgoing*>::iterator p = _requests.end(); - if(p != _requestsHint) - { - if(_requestsHint->first == requestId) - { - p = _requestsHint; - } - } - - if(p == _requests.end()) - { - p = _requests.find(requestId); - } - - if(p == _requests.end()) - { - throwUnknownRequestIdException(__FILE__, __LINE__); - } - - p->second->finished(_stream); - - if(p == _requestsHint) - { - _requests.erase(p++); - _requestsHint = p; - } - else - { - _requests.erase(p); - } - _sendMonitor.notifyAll(); // Wake up threads waiting in sendRequest() - } - catch(const Ice::LocalException& ex) - { - setState(StateClosed, ex); - } - } - } + if(requestId > 0) + { + // + // The message is a reply, we search the Outgoing object waiting + // for this reply and pass it the stream before to notify the + // send monitor to wake up threads waiting for replies. + // + try + { + Lock sync(_sendMonitor); + + map<Int, Outgoing*>::iterator p = _requests.end(); + if(p != _requestsHint) + { + if(_requestsHint->first == requestId) + { + p = _requestsHint; + } + } + + if(p == _requests.end()) + { + p = _requests.find(requestId); + } + + if(p == _requests.end()) + { + throwUnknownRequestIdException(__FILE__, __LINE__); + } + + p->second->finished(_stream); + + if(p == _requestsHint) + { + _requests.erase(p++); + _requestsHint = p; + } + else + { + _requests.erase(p); + } + _sendMonitor.notifyAll(); // Wake up threads waiting in sendRequest() + } + catch(const Ice::LocalException& ex) + { + setState(StateClosed, ex); + } + } + } #ifndef ICEE_PURE_CLIENT - while(_state == StateHolding) - { - wait(); - } + while(_state == StateHolding) + { + wait(); + } #endif - if(_state == StateClosed) - { - Lock sync(_sendMonitor); - try - { - _transceiver->close(); - } - catch(const LocalException&) - { - } - _transceiver = 0; - notifyAll(); - - // - // We cannot simply return here. We have to make sure - // that all requests are notified about the closed - // connection below. - // - closed = true; - } - - if(_state == StateClosed || _state == StateClosing) - { - Lock sync(_sendMonitor); - assert(_exception.get()); - for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p) - { - p->second->finished(*_exception.get()); // The exception is immutable at this point. - } - _requests.clear(); - _sendMonitor.notifyAll(); // Wake up threads waiting in sendRequest() - } - } - - // - // Method invocation (or multiple invocations for batch - // messages) must be done outside the thread synchronization, - // so that nested calls are possible. - // + if(_state == StateClosed) + { + Lock sync(_sendMonitor); + try + { + _transceiver->close(); + } + catch(const LocalException&) + { + } + _transceiver = 0; + notifyAll(); + + // + // We cannot simply return here. We have to make sure + // that all requests are notified about the closed + // connection below. + // + closed = true; + } + + if(_state == StateClosed || _state == StateClosing) + { + Lock sync(_sendMonitor); + assert(_exception.get()); + for(map<Int, Outgoing*>::iterator p = _requests.begin(); p != _requests.end(); ++p) + { + p->second->finished(*_exception.get()); // The exception is immutable at this point. + } + _requests.clear(); + _sendMonitor.notifyAll(); // Wake up threads waiting in sendRequest() + } + } + + // + // Method invocation (or multiple invocations for batch + // messages) must be done outside the thread synchronization, + // so that nested calls are possible. + // #ifndef ICEE_PURE_CLIENT - try - { - for(;invokeNum > 0; --invokeNum) - { - // - // Prepare the response if necessary. - // - const bool response = requestId != 0; - if(response) - { - assert(invokeNum == 1); // No further invocations if a response is expected. - - // - // Add the reply header and request id. - // - BasicStream* os = _in.os(); - os->writeBlob(replyHdr, sizeof(replyHdr)); - os->write(requestId); - } - - // - // Dispatch the incoming request. - // - _in.invoke(response, requestId); - } - } - catch(const LocalException& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - setState(StateClosed, ex); - } - catch(const std::exception& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - UnknownException uex(__FILE__, __LINE__); - uex.unknown = string("std::exception: ") + ex.what(); - setState(StateClosed, uex); - } - catch(...) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - UnknownException uex(__FILE__, __LINE__); - uex.unknown = "unknown c++ exception"; - setState(StateClosed, uex); - } - - // - // If invoke() above raised an exception, and therefore neither - // sendResponse() nor sendNoResponse() has been called, then we - // must decrement _dispatchCount here. - // - if(invokeNum > 0) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - assert(_dispatchCount > 0); - _dispatchCount -= invokeNum; - assert(_dispatchCount >= 0); - if(_dispatchCount == 0) - { - notifyAll(); - } - } + try + { + for(;invokeNum > 0; --invokeNum) + { + // + // Prepare the response if necessary. + // + const bool response = requestId != 0; + if(response) + { + assert(invokeNum == 1); // No further invocations if a response is expected. + + // + // Add the reply header and request id. + // + BasicStream* os = _in.os(); + os->writeBlob(replyHdr, sizeof(replyHdr)); + os->write(requestId); + } + + // + // Dispatch the incoming request. + // + _in.invoke(response, requestId); + } + } + catch(const LocalException& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + } + catch(const std::exception& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + UnknownException uex(__FILE__, __LINE__); + uex.unknown = string("std::exception: ") + ex.what(); + setState(StateClosed, uex); + } + catch(...) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + UnknownException uex(__FILE__, __LINE__); + uex.unknown = "unknown c++ exception"; + setState(StateClosed, uex); + } + + // + // If invoke() above raised an exception, and therefore neither + // sendResponse() nor sendNoResponse() has been called, then we + // must decrement _dispatchCount here. + // + if(invokeNum > 0) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + assert(_dispatchCount > 0); + _dispatchCount -= invokeNum; + assert(_dispatchCount >= 0); + if(_dispatchCount == 0) + { + notifyAll(); + } + } #endif } } @@ -1892,22 +1892,22 @@ Ice::Connection::ThreadPerConnection::run() { try { - _connection->run(); + _connection->run(); } catch(const Exception& ex) - { - Error out(_connection->_logger); - out << "exception in thread per connection:\n" << _connection->toString() << ex.toString(); + { + Error out(_connection->_logger); + out << "exception in thread per connection:\n" << _connection->toString() << ex.toString(); } catch(const std::exception& ex) { - Error out(_connection->_logger); - out << "std::exception in thread per connection:\n" << _connection->toString() << ex.what(); + Error out(_connection->_logger); + out << "std::exception in thread per connection:\n" << _connection->toString() << ex.what(); } catch(...) { - Error out(_connection->_logger); - out << "unknown exception in thread per connection:\n" << _connection->toString(); + Error out(_connection->_logger); + out << "unknown exception in thread per connection:\n" << _connection->toString(); } _connection = 0; // Resolve cyclic dependency. |