diff options
Diffstat (limited to 'cpp/src/Ice/Proxy.cpp')
-rw-r--r-- | cpp/src/Ice/Proxy.cpp | 150 |
1 files changed, 103 insertions, 47 deletions
diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index 319c12aa623..ee704a8b569 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -8,6 +8,8 @@ // // ********************************************************************** +#include <IceUtil/Thread.h> + #include <Ice/Proxy.h> #include <Ice/ProxyFactory.h> #include <Ice/Object.h> @@ -23,6 +25,7 @@ #include <Ice/ConnectionFactory.h> #include <Ice/Connection.h> #include <Ice/RouterInfo.h> +#include <Ice/LocatorInfo.h> #include <Ice/BasicStream.h> #include <Ice/LocalException.h> #include <Ice/Functional.h> @@ -290,7 +293,7 @@ IceProxy::Ice::Object::ice_newIdentity(const Identity& newIdentity) const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(_reference->changeIdentity(newIdentity)); + proxy->setup(_reference->changeIdentity(newIdentity), _retryIntervals); return proxy; } } @@ -311,7 +314,7 @@ IceProxy::Ice::Object::ice_newFacet(const string& newFacet) const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(_reference->changeFacet(newFacet)); + proxy->setup(_reference->changeFacet(newFacet), _retryIntervals); return proxy; } } @@ -327,7 +330,7 @@ IceProxy::Ice::Object::ice_twoway() const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(ref); + proxy->setup(ref, _retryIntervals); return proxy; } } @@ -343,7 +346,7 @@ IceProxy::Ice::Object::ice_oneway() const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(ref); + proxy->setup(ref, _retryIntervals); return proxy; } } @@ -359,7 +362,7 @@ IceProxy::Ice::Object::ice_batchOneway() const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(ref); + proxy->setup(ref, _retryIntervals); return proxy; } } @@ -375,7 +378,7 @@ IceProxy::Ice::Object::ice_datagram() const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(ref); + proxy->setup(ref, _retryIntervals); return proxy; } } @@ -391,7 +394,7 @@ IceProxy::Ice::Object::ice_batchDatagram() const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(ref); + proxy->setup(ref, _retryIntervals); return proxy; } } @@ -407,7 +410,7 @@ IceProxy::Ice::Object::ice_secure(bool b) const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(ref); + proxy->setup(ref, _retryIntervals); return proxy; } } @@ -423,7 +426,7 @@ IceProxy::Ice::Object::ice_compress(bool b) const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(ref); + proxy->setup(ref, _retryIntervals); return proxy; } } @@ -439,7 +442,7 @@ IceProxy::Ice::Object::ice_timeout(int t) const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(ref); + proxy->setup(ref, _retryIntervals); return proxy; } } @@ -455,7 +458,7 @@ IceProxy::Ice::Object::ice_router(const RouterPrx& router) const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(ref); + proxy->setup(ref, _retryIntervals); return proxy; } } @@ -471,7 +474,7 @@ IceProxy::Ice::Object::ice_default() const else { ObjectPrx proxy(new ::IceProxy::Ice::Object()); - proxy->setup(ref); + proxy->setup(ref, _retryIntervals); return proxy; } } @@ -495,11 +498,13 @@ IceProxy::Ice::Object::__copyFrom(const ObjectPrx& from) ReferencePtr ref; Handle< ::IceDelegateD::Ice::Object> delegateD; Handle< ::IceDelegateM::Ice::Object> delegateM; + vector<int> retryIntervals; { IceUtil::Mutex::Lock sync(*from.get()); ref = from->_reference; + retryIntervals = from->_retryIntervals; delegateD = dynamic_cast< ::IceDelegateD::Ice::Object*>(from->_delegate.get()); delegateM = dynamic_cast< ::IceDelegateM::Ice::Object*>(from->_delegate.get()); } @@ -510,7 +515,8 @@ IceProxy::Ice::Object::__copyFrom(const ObjectPrx& from) // _reference = ref; - + _retryIntervals = retryIntervals; + if(delegateD) { Handle< ::IceDelegateD::Ice::Object> delegate = __createDelegateD(); @@ -532,8 +538,6 @@ IceProxy::Ice::Object::__handleException(const LocalException& ex, int& cnt) _delegate = 0; - static const int max = 1; // TODO: Make number of retries configurable - try { ex.ice_throw(); @@ -544,7 +548,6 @@ IceProxy::Ice::Object::__handleException(const LocalException& ex, int& cnt) // We always retry on a close connection exception, as this // indicates graceful server shutdown. // - // TODO: configurable timeout before we try again? } catch(const SocketException&) { @@ -561,8 +564,8 @@ IceProxy::Ice::Object::__handleException(const LocalException& ex, int& cnt) TraceLevelsPtr traceLevels = _reference->instance->traceLevels(); LoggerPtr logger = _reference->instance->logger(); - - if(cnt > max) + + if(cnt > (int)_retryIntervals.size()) { if(traceLevels->retry >= 1) { @@ -575,13 +578,22 @@ IceProxy::Ice::Object::__handleException(const LocalException& ex, int& cnt) if(traceLevels->retry >= 1) { Trace out(logger, traceLevels->retryCat); - out << "re-trying operation call because of exception\n" << ex; + out << "re-trying operation call"; + if(cnt > 0 && _retryIntervals[cnt - 1] > 0) + { + out << " in " << _retryIntervals[cnt - 1] << " (ms)"; + } + out << " because of exception\n" << ex; } - // - // Reset the endpoints to the original endpoints upon retry - // - _reference = _reference->changeEndpoints(_reference->origEndpoints); + if(cnt > 0) + { + // + // Sleep before retrying. TODO: is it safe to sleep here + // with the mutex locked? + // + IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(_retryIntervals[cnt - 1])); + } } void @@ -596,6 +608,7 @@ IceProxy::Ice::Object::__locationForward(const LocationForward& ex) throw LocationForwardIdentityException(__FILE__, __LINE__); } + _reference = _reference->changeAdapterId(ex._prx->_reference->adapterId); _reference = _reference->changeEndpoints(ex._prx->_reference->endpoints); /* @@ -668,13 +681,14 @@ IceProxy::Ice::Object::__createDelegateD() } void -IceProxy::Ice::Object::setup(const ReferencePtr& ref) +IceProxy::Ice::Object::setup(const ReferencePtr& ref, const vector<int>& retryIntervals) { // // No need to synchronize "*this", as this operation is only // called upon initialization. // _reference = ref; + _retryIntervals = retryIntervals; } IceDelegateM::Ice::Object::~Object() @@ -856,31 +870,73 @@ IceDelegateM::Ice::Object::setup(const ReferencePtr& ref) } else { - vector<EndpointPtr> endpoints; - if(__reference->routerInfo) - { - // - // If we route, we send everything to the router's client - // proxy endpoints. - // - ObjectPrx proxy = __reference->routerInfo->getClientProxy(); - endpoints = filterEndpoints(proxy->__reference()->endpoints); - } - else - { - endpoints = filterEndpoints(__reference->endpoints); - } - - if(endpoints.empty()) + while(true) { - throw NoEndpointException(__FILE__, __LINE__); + vector<EndpointPtr> endpoints; + bool cached = false; + if(__reference->routerInfo) + { + // + // If we route, we send everything to the router's client + // proxy endpoints. + // + ObjectPrx proxy = ref->routerInfo->getClientProxy(); + endpoints = proxy->__reference()->endpoints; + } + else if(!__reference->endpoints.empty()) + { + endpoints = __reference->endpoints; + } + else if(__reference->locatorInfo) + { + cached = __reference->locatorInfo->getEndpoints(__reference, endpoints); + } + + + vector<EndpointPtr> filteredEndpoints = filterEndpoints(endpoints); + if(filteredEndpoints.empty()) + { + throw NoEndpointException(__FILE__, __LINE__); + } + + try + { + OutgoingConnectionFactoryPtr factory = __reference->instance->outgoingConnectionFactory(); + __connection = factory->create(filteredEndpoints); + assert(__connection); + __connection->incProxyUsageCount(); + } + catch(const LocalException& ex) + { + if(cached) + { + TraceLevelsPtr traceLevels = __reference->instance->traceLevels(); + LoggerPtr logger = __reference->instance->logger(); + + if(traceLevels->retry >= 1) + { + Trace out(logger, traceLevels->retryCat); + out << "connection to cached endpoint failed, removing endpoint from cache\n" + << "and trying one more time\n" << ex; + } + + assert(__reference->locatorInfo); + __reference->locatorInfo->removeEndpoints(__reference); + continue; + } + else + { + throw; + } + } + + if(__reference->locatorInfo && !cached) + { + __reference->locatorInfo->addEndpoints(__reference, endpoints); + } + break; } - - OutgoingConnectionFactoryPtr factory = __reference->instance->outgoingConnectionFactory(); - __connection = factory->create(endpoints); - assert(__connection); - __connection->incProxyUsageCount(); - + // // If we have a router, set the object adapter for this router // (if any) to the new connection, so that callbacks from the |