diff options
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 272 |
1 files changed, 139 insertions, 133 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 8acf39d6a6c..58b31d02626 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -21,6 +21,7 @@ #include <Ice/Properties.h> #include <Ice/LoggerUtil.h> #include <Ice/LocatorInfo.h> +#include <Ice/ProxyFactory.h> using namespace std; using namespace Ice; @@ -34,7 +35,8 @@ void IceInternal::decRef(AMI_Object_ice_invoke* p) { p->__decRef(); } IceInternal::OutgoingAsync::OutgoingAsync() : __is(0), - __os(0) + __os(0), + _cnt(0) { } @@ -47,134 +49,17 @@ IceInternal::OutgoingAsync::~OutgoingAsync() } void -IceInternal::OutgoingAsync::__finished(const LocalException& exc) -{ - if(_reference->locatorInfo) - { - _reference->locatorInfo->clearObjectCache(_reference); - } - -/* - ProxyFactoryPtr proxyFactory = _reference->instance->proxyFactory(); - if(proxyFactory) - { - proxyFactory->checkRetryAfterException(ex, cnt); - } - else - { - ex.ice_throw(); // The communicator is already destroyed, so we cannot retry. - } -*/ - - try - { - ice_exception(exc); - } - catch(const Exception& ex) - { - warning(ex); - } - catch(const std::exception& ex) - { - warning(ex); - } - catch(...) - { - warning(); - } - - assert(_connection); - _connection->decProxyCount(); - _connection = 0; - - assert(__is); - delete __is; - __is = 0; - - assert(__os); - delete __os; - __os = 0; -} - -bool -IceInternal::OutgoingAsync::__timedOut() const -{ - if(_connection->timeout() >= 0) - { - return IceUtil::Time::now() >= _absoluteTimeout; - } - else - { - return false; - } -} - -void -IceInternal::OutgoingAsync::__prepare(const ReferencePtr& ref, const string& operation, OperationMode mode, - const Context& context) -{ - assert(!_reference); - _reference = ref; - - assert(!_connection); - _connection = _reference->getConnection(); - _connection->incProxyCount(); - - assert(!__is); - __is = new BasicStream(_reference->instance.get()); - - assert(!__os); - __os = new BasicStream(_reference->instance.get()); - - _connection->prepareRequest(__os); - _reference->identity.__write(__os); - __os->write(_reference->facet); - __os->write(operation); - __os->write(static_cast<Byte>(mode)); - __os->writeSize(Int(context.size())); - Context::const_iterator p; - for(p = context.begin(); p != context.end(); ++p) - { - __os->write(p->first); - __os->write(p->second); - } - - __os->startWriteEncaps(); -} - -void -IceInternal::OutgoingAsync::__send() -{ - if(_connection->timeout() >= 0) - { - _absoluteTimeout = IceUtil::Time::now() + IceUtil::Time::milliSeconds(_connection->timeout()); - } - - try - { - _connection->sendAsyncRequest(__os, this); - } - catch(const LocalException&) - { - // - // Twoway requests report exceptions using finished(). - // - assert(false); - } -} - -void IceInternal::OutgoingAsync::__finished(BasicStream& is) { DispatchStatus status; - + try { __is->swap(is); Byte b; __is->read(b); status = static_cast<DispatchStatus>(b); - + switch(status) { case DispatchOK: @@ -192,7 +77,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) __is->read(ex.operation); throw ex; } - + case DispatchFacetNotExist: { FacetNotExistException ex(__FILE__, __LINE__); @@ -264,20 +149,120 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) warning(); } - assert(_reference); - _reference = 0; + cleanup(); +} - assert(_connection); - _connection->decProxyCount(); - _connection = 0; +void +IceInternal::OutgoingAsync::__finished(const LocalException& exc) +{ + try + { + ice_exception(exc); + } + catch(const Exception& ex) + { + warning(ex); + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + cleanup(); +} + +bool +IceInternal::OutgoingAsync::__timedOut() const +{ + if(_connection && _connection->timeout() >= 0) + { + return IceUtil::Time::now() >= _absoluteTimeout; + } + else + { + return false; + } +} + +void +IceInternal::OutgoingAsync::__prepare(const ReferencePtr& ref, const string& operation, OperationMode mode, + const Context& context) +{ + assert(!_reference); + _reference = ref; + + assert(!_connection); + _connection = _reference->getConnection(); + _connection->incProxyCount(); + + assert(_cnt == 0); - assert(__is); - delete __is; - __is = 0; - - assert(__os); - delete __os; - __os = 0; + assert(!__is); + __is = new BasicStream(_reference->instance.get()); + + assert(!__os); + __os = new BasicStream(_reference->instance.get()); + + _connection->prepareRequest(__os); + _reference->identity.__write(__os); + __os->write(_reference->facet); + __os->write(operation); + __os->write(static_cast<Byte>(mode)); + __os->writeSize(Int(context.size())); + Context::const_iterator p; + for(p = context.begin(); p != context.end(); ++p) + { + __os->write(p->first); + __os->write(p->second); + } + + __os->startWriteEncaps(); +} + +void +IceInternal::OutgoingAsync::__send() +{ + if(_connection->timeout() >= 0) + { + _absoluteTimeout = IceUtil::Time::now() + IceUtil::Time::milliSeconds(_connection->timeout()); + } + + try + { + while(true) + { + try + { + _connection->sendAsyncRequest(__os, this); + break; + } + catch(const LocalException& ex) + { + if(_reference->locatorInfo) + { + _reference->locatorInfo->clearObjectCache(_reference); + } + + ProxyFactoryPtr proxyFactory = _reference->instance->proxyFactory(); + if(proxyFactory) + { + proxyFactory->checkRetryAfterException(ex, _cnt); + } + else + { + ex.ice_throw(); // The communicator is already destroyed, so we cannot retry. + } + } + } + } + catch(const LocalException& ex) + { + __finished(ex); + } } void @@ -311,6 +296,27 @@ IceInternal::OutgoingAsync::warning() const } void +IceInternal::OutgoingAsync::cleanup() +{ + assert(_reference); + _reference = 0; + + assert(_connection); + _connection->decProxyCount(); + _connection = 0; + + _cnt = 0; + + assert(__is); + delete __is; + __is = 0; + + assert(__os); + delete __os; + __os = 0; +} + +void Ice::AMI_Object_ice_invoke::__invoke(const IceInternal::ReferencePtr& ref, const string& operation, OperationMode mode, const vector<Byte>& inParams, const Context& context) { |