diff options
Diffstat (limited to 'cppe/src/IceE/Connection.cpp')
-rwxr-xr-x | cppe/src/IceE/Connection.cpp | 201 |
1 files changed, 118 insertions, 83 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp index a926dfe5fb1..8899ce2bee7 100755 --- a/cppe/src/IceE/Connection.cpp +++ b/cppe/src/IceE/Connection.cpp @@ -301,8 +301,62 @@ Ice::Connection::prepareRequest(BasicStream* os) os->writeBlob(_requestHdr); } + +Int +Ice::Connection::fillRequestId(BasicStream* os) +{ + // + // Create a new unique request ID. + // + Int requestId = _nextRequestId++; + if(requestId <= 0) + { + _nextRequestId = 1; + requestId = _nextRequestId++; + } + + // + // Fill in the request ID. + // + const Byte* p = reinterpret_cast<const Byte*>(&requestId); +#ifdef ICE_BIG_ENDIAN + reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); +#else + copy(p, p + sizeof(Int), os->b.begin() + headerSize); +#endif + + return requestId; +} + +void +Ice::Connection::sendRequest(BasicStream* os) +{ + if(!_transceiver) // Has the transceiver already been closed? + { + assert(_exception.get()); + _exception->ice_throw(); // The exception is immutable at this point. + } + + Int sz = static_cast<Int>(os->b.size()); + const Byte* p = reinterpret_cast<const Byte*>(&sz); +#ifdef ICE_BIG_ENDIAN + reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); +#else + copy(p, p + sizeof(Int), os->b.begin() + 10); +#endif + + // + // Send the request. + // + os->i = os->b.begin(); + traceRequest("sending request", *os, _logger, _traceLevels); + _transceiver->write(*os, _endpoint->timeout()); +} + +#ifdef ICEE_BLOCKING_CLIENT + void -Ice::Connection::sendRequest(BasicStream* os, BasicStream* is, Outgoing* out) +Ice::Connection::sendBlockingRequest(BasicStream* os, BasicStream* is, Outgoing* out) { Int requestId; @@ -318,78 +372,20 @@ Ice::Connection::sendRequest(BasicStream* os, BasicStream* is, Outgoing* out) assert(_state < StateClosing); // - // Only add to the request map if this is a twoway call. + // Fill in request id if it is a twoway call. // if(out) { - // - // Create a new unique request ID. - // - requestId = _nextRequestId++; - if(requestId <= 0) - { - _nextRequestId = 1; - requestId = _nextRequestId++; - } - - // - // Fill in the request ID. - // - const Byte* p = reinterpret_cast<const Byte*>(&requestId); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#else - copy(p, p + sizeof(Int), os->b.begin() + headerSize); -#endif - - // - // Add to the requests map if not blocking. - // -#ifndef ICEE_PURE_BLOCKING_CLIENT -# ifdef ICEE_BLOCKING_CLIENT - if(!_blocking) -# endif - { - _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); - } -#endif + requestId = fillRequestId(os); } } try { IceUtil::Mutex::Lock sendSync(_sendMutex); + sendRequest(os); - if(!_transceiver) // Has the transceiver already been closed? - { - assert(_exception.get()); - _exception->ice_throw(); // The exception is immutable at this point. - } - - Int sz = static_cast<Int>(os->b.size()); - const Byte* p = reinterpret_cast<const Byte*>(&sz); -#ifdef ICE_BIG_ENDIAN - reverse_copy(p, p + sizeof(Int), os->b.begin() + 10); -#else - copy(p, p + sizeof(Int), os->b.begin() + 10); -#endif - - // - // Send the request. - // - os->i = os->b.begin(); - traceRequest("sending request", *os, _logger, _traceLevels); - _transceiver->write(*os, _endpoint->timeout()); - -#ifdef ICEE_BLOCKING_CLIENT - // - // If blocking client, we wait for the response from the server. - // - if(out -#ifndef ICEE_PURE_BLOCKING_CLIENT - && _blocking -#endif - ) + if(out) { readStream(*is); @@ -430,20 +426,62 @@ Ice::Connection::sendRequest(BasicStream* os, BasicStream* is, Outgoing* out) } } } -#endif } catch(const LocalException& ex) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); setState(StateClosed, ex); assert(_exception.get()); + _exception->ice_throw(); + } +} + +#endif #ifndef ICEE_PURE_BLOCKING_CLIENT - if(out -# ifdef ICEE_BLOCKING_CLIENT - && !_blocking -# endif - ) + +void +Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) +{ + Int requestId; + + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + if(_exception.get()) + { + _exception->ice_throw(); + } + + assert(_state > StateNotValidated); + assert(_state < StateClosing); + + // + // Only add to the request map if this is a twoway call. + // + if(out) + { + requestId = fillRequestId(os); + + // + // Add to the requests map. + // + _requestsHint = _requests.insert(_requests.end(), pair<const Int, Outgoing*>(requestId, out)); + } + } + + try + { + IceUtil::Mutex::Lock sendSync(_sendMutex); + sendRequest(os); + } + catch(const LocalException& ex) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + setState(StateClosed, ex); + assert(_exception.get()); + + if(out) { // // If the request has already been removed from the @@ -476,13 +514,14 @@ Ice::Connection::sendRequest(BasicStream* os, BasicStream* is, Outgoing* out) } } else -#endif { _exception->ice_throw(); } } } +#endif + #ifdef ICEE_HAS_BATCH void @@ -887,13 +926,11 @@ Ice::Connection::Connection(const InstancePtr& instance, _state(StateNotValidated), _stateTime(IceUtil::Time::now()) { -#ifndef ICEE_PURE_BLOCKING_CLIENT -# ifdef ICEE_BLOCKING_CLIENT - _blocking = _instance->properties()->getPropertyAsInt("Ice.Blocking") > 0 -# ifndef ICEE_PURE_CLIENT - && !_adapter -# endif - ; +#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) +# ifdef ICEE_PURE_CLIENT + _blocking = _instance->properties()->getPropertyAsInt("Ice.Blocking") > 0; +# else + _blocking = _instance->properties()->getPropertyAsInt("Ice.Blocking") > 0 && !_adapter; # endif #endif @@ -942,18 +979,16 @@ Ice::Connection::Connection(const InstancePtr& instance, } #endif -#ifdef ICEE_BLOCKING_CLIENT -# ifndef ICEE_PURE_BLOCKING_CLIENT +#ifdef ICEE_PURE_BLOCKING_CLIENT + validate(); +#else +# ifdef ICEE_BLOCKING_CLIENT if(_blocking) { -# endif validate(); -# ifndef ICEE_PURE_BLOCKING_CLIENT } else # endif -#endif -#ifndef ICEE_PURE_BLOCKING_CLIENT { __setNoDelete(true); try |