diff options
author | Dwayne Boone <dwayne@zeroc.com> | 2005-12-13 17:03:20 +0000 |
---|---|---|
committer | Dwayne Boone <dwayne@zeroc.com> | 2005-12-13 17:03:20 +0000 |
commit | 60858a33bbce82f8af38fa70d26fc46d771aece4 (patch) | |
tree | ccd38b1dd37803de6dffbf889053fe894db62188 /cppe/src | |
parent | Added replica group tests (diff) | |
download | ice-60858a33bbce82f8af38fa70d26fc46d771aece4.tar.bz2 ice-60858a33bbce82f8af38fa70d26fc46d771aece4.tar.xz ice-60858a33bbce82f8af38fa70d26fc46d771aece4.zip |
Removed sync from Outgoing for blocking sends
Diffstat (limited to 'cppe/src')
-rwxr-xr-x | cppe/src/IceE/Connection.cpp | 26 | ||||
-rw-r--r-- | cppe/src/IceE/Instance.cpp | 21 | ||||
-rw-r--r-- | cppe/src/IceE/Instance.h | 12 | ||||
-rw-r--r-- | cppe/src/IceE/Outgoing.cpp | 143 |
4 files changed, 107 insertions, 95 deletions
diff --git a/cppe/src/IceE/Connection.cpp b/cppe/src/IceE/Connection.cpp index f6df199a2c3..ef329c27daa 100755 --- a/cppe/src/IceE/Connection.cpp +++ b/cppe/src/IceE/Connection.cpp @@ -302,7 +302,7 @@ Ice::Connection::prepareRequest(BasicStream* os) } void -Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) +Ice::Connection::sendRequest(BasicStream* os, BasicStream* is, Outgoing* out) { Int requestId; @@ -391,8 +391,7 @@ Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) #endif ) { - BasicStream stream(_instance.get()); - readStream(stream); + readStream(*is); #ifndef ICEE_PURE_CLIENT Int invokeNum = 0; @@ -406,9 +405,9 @@ Ice::Connection::sendRequest(BasicStream* os, Outgoing* out) if(_state != StateClosed) { #ifndef ICEE_PURE_CLIENT - parseMessage(stream, requestId, out, invokeNum, servantManager, adapter); + parseMessage(*is, requestId, invokeNum, servantManager, adapter); #else - parseMessage(stream, requestId, out); + parseMessage(*is, requestId); #endif } @@ -762,6 +761,14 @@ Ice::Connection::endpoint() const return _endpoint; // No mutex protection necessary, _endpoint is immutable. } +#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) +bool +Ice::Connection::blocking() const +{ + return _blocking; +} +#endif + #ifndef ICEE_PURE_CLIENT void @@ -882,7 +889,7 @@ Ice::Connection::Connection(const InstancePtr& instance, { #ifndef ICEE_PURE_BLOCKING_CLIENT # ifdef ICEE_BLOCKING_CLIENT - _blocking = _instance->blocking() + _blocking = _instance->properties()->getPropertyAsInt("Ice.Blocking") > 0 # ifndef ICEE_PURE_CLIENT && !_adapter # endif @@ -1356,7 +1363,7 @@ Ice::Connection::initiateShutdown() const } void -Ice::Connection::parseMessage(BasicStream& stream, Int& requestId, Outgoing* out +Ice::Connection::parseMessage(BasicStream& stream, Int& requestId #ifndef ICEE_PURE_CLIENT ,Int& invokeNum, ServantManagerPtr& servantManager, ObjectAdapterPtr& adapter #endif @@ -1410,7 +1417,6 @@ Ice::Connection::parseMessage(BasicStream& stream, Int& requestId, Outgoing* out { throw UnknownRequestIdException(__FILE__, __LINE__); } - out->finished(stream); break; } @@ -1813,9 +1819,9 @@ Ice::Connection::run() if(_state != StateClosed) { #ifndef ICEE_PURE_CLIENT - parseMessage(stream, requestId, 0, invokeNum, servantManager, adapter); + parseMessage(stream, requestId, invokeNum, servantManager, adapter); #else - parseMessage(stream, requestId, 0); + parseMessage(stream, requestId); #endif } diff --git a/cppe/src/IceE/Instance.cpp b/cppe/src/IceE/Instance.cpp index 0f3f59a89a2..c30b41df662 100644 --- a/cppe/src/IceE/Instance.cpp +++ b/cppe/src/IceE/Instance.cpp @@ -202,20 +202,14 @@ IceInternal::Instance::objectAdapterFactory() const } #endif -#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) -bool -IceInternal::Instance::blocking() const -{ - return _blocking; -} -#endif - +#ifndef ICEE_PURE_BLOCKING_CLIENT size_t IceInternal::Instance::threadPerConnectionStackSize() const { // No mutex lock, immutable. return _threadPerConnectionStackSize; } +#endif EndpointFactoryPtr IceInternal::Instance::endpointFactory() const @@ -298,10 +292,9 @@ IceInternal::Instance::getDefaultContext() const IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const PropertiesPtr& properties) : _state(StateActive), _properties(properties), - _messageSizeMax(0), - _threadPerConnectionStackSize(0) -#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) - , _blocking(false) + _messageSizeMax(0) +#ifndef ICEE_PURE_BLOCKING_CLIENT + , _threadPerConnectionStackSize(0) #endif { try @@ -450,10 +443,6 @@ IceInternal::Instance::Instance(const CommunicatorPtr& communicator, const Prope } #ifndef ICEE_PURE_BLOCKING_CLIENT -# ifdef ICEE_BLOCKING_CLIENT - const_cast<bool&>(_blocking) = _properties->getPropertyAsInt("Ice.Blocking") > 0; -# endif - { Int stackSize = _properties->getPropertyAsInt("Ice.ThreadPerConnection.StackSize"); if(stackSize < 0) diff --git a/cppe/src/IceE/Instance.h b/cppe/src/IceE/Instance.h index a9af7a54819..5971b5b94ca 100644 --- a/cppe/src/IceE/Instance.h +++ b/cppe/src/IceE/Instance.h @@ -58,16 +58,14 @@ public: #endif void setDefaultContext(const ::Ice::Context&); ::Ice::Context getDefaultContext() const; +#ifndef ICEE_PURE_BLOCKING_CLIENT size_t threadPerConnectionStackSize() const; +#endif #ifndef ICEE_PURE_CLIENT ObjectAdapterFactoryPtr objectAdapterFactory() const; #endif -#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) - bool blocking() const; -#endif - private: Instance(const Ice::CommunicatorPtr&, const Ice::PropertiesPtr&); @@ -89,7 +87,9 @@ private: const TraceLevelsPtr _traceLevels; // Immutable, not reset by destroy(). const DefaultsAndOverridesPtr _defaultsAndOverrides; // Immutable, not reset by destroy(). const size_t _messageSizeMax; // Immutable, not reset by destroy(). +#ifndef ICEE_PURE_BLOCKING_CLIENT const size_t _threadPerConnectionStackSize; +#endif #ifdef ICEE_HAS_ROUTER RouterManagerPtr _routerManager; #endif @@ -105,10 +105,6 @@ private: #ifndef ICEE_PURE_CLIENT ObjectAdapterFactoryPtr _objectAdapterFactory; #endif - -#if defined(ICEE_BLOCKING_CLIENT) && !defined(ICEE_PURE_BLOCKING_CLIENT) - const bool _blocking; -#endif }; } diff --git a/cppe/src/IceE/Outgoing.cpp b/cppe/src/IceE/Outgoing.cpp index c75ded2a4cc..c5561b62c7d 100644 --- a/cppe/src/IceE/Outgoing.cpp +++ b/cppe/src/IceE/Outgoing.cpp @@ -113,78 +113,94 @@ IceInternal::Outgoing::invoke() { case Reference::ModeTwoway: { - // - // We let all exceptions raised by sending directly - // propagate to the caller, because they can be retried - // without violating "at-most-once". In case of such - // exceptions, the connection object does not call back on - // this object, so we don't need to lock the mutex, keep - // track of state, or save exceptions. - // - _connection->sendRequest(&_os, this); +#ifndef ICEE_PURE_BLOCKING_CLIENT +#ifdef ICEE_BLOCKING_CLIENT + if(!_connection->blocking()) + { +#endif + // + // We let all exceptions raised by sending directly + // propagate to the caller, because they can be retried + // without violating "at-most-once". In case of such + // exceptions, the connection object does not call back on + // this object, so we don't need to lock the mutex, keep + // track of state, or save exceptions. + // + _connection->sendRequest(&_os, 0, this); - // - // Wait until the request has completed, or until the - // request times out. - // + // + // Wait until the request has completed, or until the + // request times out. + // - bool timedOut = false; + bool timedOut = false; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - // - // It's possible that the request has already - // completed, due to a regular response, or because of - // an exception. So we only change the state to "in - // progress" if it is still "unsent". - // - if(_state == StateUnsent) - { - _state = StateInProgress; - } + // + // It's possible that the request has already + // completed, due to a regular response, or because of + // an exception. So we only change the state to "in + // progress" if it is still "unsent". + // + if(_state == StateUnsent) + { + _state = StateInProgress; + } - Int timeout = _connection->timeout(); - while(_state == StateInProgress && !timedOut) - { - if(timeout >= 0) - { - timedWait(IceUtil::Time::milliSeconds(timeout)); + Int timeout = _connection->timeout(); + while(_state == StateInProgress && !timedOut) + { + if(timeout >= 0) + { + timedWait(IceUtil::Time::milliSeconds(timeout)); - if(_state == StateInProgress) - { - timedOut = true; - } + if(_state == StateInProgress) + { + timedOut = true; + } + } + else + { + wait(); + } } - else + } + + if(timedOut) + { + // + // Must be called outside the synchronization of this + // object. + // + _connection->exception(TimeoutException(__FILE__, __LINE__)); + + // + // We must wait until the exception set above has + // propagated to this Outgoing object. + // { - wait(); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + + while(_state == StateInProgress) + { + wait(); + } } - } + } +#ifdef ICEE_BLOCKING_CLIENT } - - if(timedOut) + else { - // - // Must be called outside the synchronization of this - // object. - // - _connection->exception(TimeoutException(__FILE__, __LINE__)); - - // - // We must wait until the exception set above has - // propagated to this Outgoing object. - // + _connection->sendRequest(&_os, &_is, this); + if(!_exception.get()) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); - - while(_state == StateInProgress) - { - wait(); - } + finishedInternal(); } } - +#endif +#endif if(_exception.get()) { // @@ -232,7 +248,7 @@ IceInternal::Outgoing::invoke() // violating "at-most-once". // _state = StateInProgress; - _connection->sendRequest(&_os, 0); + _connection->sendRequest(&_os, 0, 0); break; } @@ -297,6 +313,13 @@ IceInternal::Outgoing::finished(BasicStream& is) assert(_state <= StateInProgress); _is.swap(is); + finishedInternal(); + notify(); +} + +void +IceInternal::Outgoing::finishedInternal() +{ Byte status; _is.read(status); @@ -449,8 +472,6 @@ IceInternal::Outgoing::finished(BasicStream& is) break; } } - - notify(); } void |