diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 30 | ||||
-rw-r--r-- | cpp/src/IceDiscovery/LookupI.cpp | 110 | ||||
-rw-r--r-- | cpp/src/IceLocatorDiscovery/PluginI.cpp | 64 |
3 files changed, 154 insertions, 50 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index d2e78fcd4a9..6a641c38037 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -149,6 +149,7 @@ CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalExceptio InvocationTimeoutException ex(__FILE__, __LINE__); out->completed(ex); _sendRequests.erase(p); + _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count. return; } @@ -185,6 +186,7 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs { outAsync->invokeCompletedAsync(); } + _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count. return; } @@ -209,6 +211,12 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs void CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum) { + // + // Increase the direct count to prevent the thread pool from being destroyed before + // invokeAll is called. This will also throw if the object adapter has been deactivated. + // + _adapter->incDirectCount(); + int requestId = 0; { Lock sync(*this); @@ -243,7 +251,14 @@ CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum) AsyncStatus CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum) { + // + // Increase the direct count to prevent the thread pool from being destroyed before + // invokeAll is called. This will also throw if the object adapter has been deactivated. + // + _adapter->incDirectCount(); + int requestId = 0; + try { Lock sync(*this); @@ -257,6 +272,11 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int ba _sendAsyncRequests.insert(make_pair(outAsync, requestId)); } + catch(...) + { + _adapter->decDirectCount(); + throw; + } outAsync->attachCollocatedObserver(_adapter, requestId); @@ -418,6 +438,12 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq { while(invokeNum > 0) { + // + // Increase the direct count for the dispatch. We increase it again here for + // each dispatch. It's important for the direct count to be > 0 until the last + // collocated request response is sent to make sure the thread pool isn't + // destroyed before. + // try { _adapter->incDirectCount(); @@ -425,7 +451,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq catch(const ObjectAdapterDeactivatedException& ex) { handleException(requestId, ex, false); - return; + break; } Incoming in(_reference->getInstance().get(), this, 0, _adapter, _response, 0, requestId); @@ -437,6 +463,8 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq { invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception } + + _adapter->decDirectCount(); } void diff --git a/cpp/src/IceDiscovery/LookupI.cpp b/cpp/src/IceDiscovery/LookupI.cpp index 292edf91c1b..c1778a6eb92 100644 --- a/cpp/src/IceDiscovery/LookupI.cpp +++ b/cpp/src/IceDiscovery/LookupI.cpp @@ -99,9 +99,9 @@ ObjectRequest::runTimerTask() _lookup->objectRequestTimedOut(this); } -LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrx& lookup, const Ice::PropertiesPtr& properties) : - _registry(registry), - _lookup(lookup), +LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrx& lookup, const Ice::PropertiesPtr& properties) : + _registry(registry), + _lookup(lookup), _timeout(IceUtil::Time::milliSeconds(properties->getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300))), _retryCount(properties->getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3)), _latencyMultiplier(properties->getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1)), @@ -138,8 +138,8 @@ LookupI::setLookupReply(const LookupReplyPrx& lookupReply) _lookupReply = lookupReply; } -void -LookupI::findObjectById(const string& domainId, const Ice::Identity& id, const IceDiscovery::LookupReplyPrx& reply, +void +LookupI::findObjectById(const string& domainId, const Ice::Identity& id, const IceDiscovery::LookupReplyPrx& reply, const Ice::Current&) { if(domainId != _domainId) @@ -153,12 +153,19 @@ LookupI::findObjectById(const string& domainId, const Ice::Identity& id, const I // // Reply to the mulicast request using the given proxy. // - reply->begin_foundObjectById(id, proxy); + try + { + reply->begin_foundObjectById(id, proxy); + } + catch(const Ice::LocalException&) + { + // Ignore. + } } } -void -LookupI::findAdapterById(const string& domainId, const std::string& adapterId, +void +LookupI::findAdapterById(const string& domainId, const std::string& adapterId, const IceDiscovery::LookupReplyPrx& reply, const Ice::Current&) { if(domainId != _domainId) @@ -173,15 +180,22 @@ LookupI::findAdapterById(const string& domainId, const std::string& adapterId, // // Reply to the multicast request using the given proxy. // - reply->begin_foundAdapterById(adapterId, proxy, isReplicaGroup); + try + { + reply->begin_foundAdapterById(adapterId, proxy, isReplicaGroup); + } + catch(const Ice::LocalException&) + { + // Ignore. + } } } -void +void LookupI::findObject(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::Identity& id) { Lock sync(*this); - map<Ice::Identity, ObjectRequestPtr>::const_iterator p = _objectRequests.find(id); + map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id); if(p == _objectRequests.end()) { p = _objectRequests.insert(make_pair(id, new ObjectRequest(this, id, _retryCount))).first; @@ -189,16 +203,24 @@ LookupI::findObject(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::Ide if(p->second->addCallback(cb)) { - _lookup->begin_findObjectById(_domainId, id, _lookupReply); - _timer->schedule(p->second, _timeout); + try + { + _lookup->begin_findObjectById(_domainId, id, _lookupReply); + _timer->schedule(p->second, _timeout); + } + catch(const Ice::LocalException&) + { + p->second->finished(0); + _objectRequests.erase(p); + } } } -void +void LookupI::findAdapter(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const std::string& adapterId) { Lock sync(*this); - map<string, AdapterRequestPtr>::const_iterator p = _adapterRequests.find(adapterId); + map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId); if(p == _adapterRequests.end()) { p = _adapterRequests.insert(make_pair(adapterId, new AdapterRequest(this, adapterId, _retryCount))).first; @@ -206,8 +228,16 @@ LookupI::findAdapter(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const std::s if(p->second->addCallback(cb)) { - _lookup->begin_findAdapterById(_domainId, adapterId, _lookupReply); - _timer->schedule(p->second, _timeout); + try + { + _lookup->begin_findAdapterById(_domainId, adapterId, _lookupReply); + _timer->schedule(p->second, _timeout); + } + catch(const Ice::LocalException&) + { + p->second->finished(0); + _adapterRequests.erase(p); + } } } @@ -255,15 +285,20 @@ LookupI::objectRequestTimedOut(const ObjectRequestPtr& request) if(request->retry()) { - _lookup->begin_findObjectById(_domainId, request->getId(), _lookupReply); - _timer->schedule(p->second, _timeout); - } - else - { - request->finished(0); - _objectRequests.erase(p); - _timer->cancel(request); + try + { + _lookup->begin_findObjectById(_domainId, request->getId(), _lookupReply); + _timer->schedule(p->second, _timeout); + return; + } + catch(const Ice::LocalException&) + { + } } + + request->finished(0); + _objectRequests.erase(p); + _timer->cancel(request); } void @@ -278,28 +313,33 @@ LookupI::adapterRequestTimedOut(const AdapterRequestPtr& request) if(request->retry()) { - _lookup->begin_findAdapterById(_domainId, request->getId(), _lookupReply); - _timer->schedule(p->second, _timeout); - } - else - { - request->finished(0); - _adapterRequests.erase(p); - _timer->cancel(request); + try + { + _lookup->begin_findAdapterById(_domainId, request->getId(), _lookupReply); + _timer->schedule(p->second, _timeout); + return; + } + catch(const Ice::LocalException&) + { + } } + + request->finished(0); + _adapterRequests.erase(p); + _timer->cancel(request); } LookupReplyI::LookupReplyI(const LookupIPtr& lookup) : _lookup(lookup) { } -void +void LookupReplyI::foundObjectById(const Ice::Identity& id, const Ice::ObjectPrx& proxy, const Ice::Current&) { _lookup->foundObject(id, proxy); } -void +void LookupReplyI::foundAdapterById(const std::string& adapterId, const Ice::ObjectPrx& proxy, bool isReplicaGroup, const Ice::Current&) { diff --git a/cpp/src/IceLocatorDiscovery/PluginI.cpp b/cpp/src/IceLocatorDiscovery/PluginI.cpp index d364fa7157c..3c3c22ca89e 100644 --- a/cpp/src/IceLocatorDiscovery/PluginI.cpp +++ b/cpp/src/IceLocatorDiscovery/PluginI.cpp @@ -276,8 +276,15 @@ Request::invoke(const Ice::LocatorPrx& l) if(l != _locatorPrx) { _locatorPrx = l; - l->begin_ice_invoke(_operation, _mode, _inParams, _context, - Ice::newCallback_Object_ice_invoke(this, &Request::response, &Request::exception)); + try + { + l->begin_ice_invoke(_operation, _mode, _inParams, _context, + Ice::newCallback_Object_ice_invoke(this, &Request::response, &Request::exception)); + } + catch(const Ice::LocalException& ex) + { + exception(ex); + } } else { @@ -307,6 +314,18 @@ Request::exception(const Ice::Exception& ex) { _amdCB->ice_exception(ex); } + catch(const Ice::NoEndpointException&) + { + _amdCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__)); + } + catch(const Ice::CommunicatorDestroyedException&) + { + _amdCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__)); + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + _amdCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__)); + } catch(const Ice::Exception&) { _exception.reset(ex.ice_clone()); @@ -450,8 +469,20 @@ LocatorI::invoke(const Ice::LocatorPrx& locator, const RequestPtr& request) if(_pendingRetryCount == 0) // No request in progress { _pendingRetryCount = _retryCount; - _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _timer->schedule(this, _timeout); + try + { + _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + _timer->schedule(this, _timeout); + } + catch(const Ice::LocalException&) + { + for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) + { + (*p)->invoke(_voidLocator); + } + _pendingRequests.clear(); + _pendingRetryCount = 0; + } } } } @@ -462,19 +493,24 @@ LocatorI::runTimerTask() Lock sync(*this); if(--_pendingRetryCount > 0) { - _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _timer->schedule(this, _timeout); - } - else - { - assert(!_pendingRequests.empty()); - for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) + try { - (*p)->invoke(_voidLocator); // Send pending requests on void locator. + _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + _timer->schedule(this, _timeout); + return; } - _pendingRequests.clear(); - _nextRetry = IceUtil::Time::now() + _retryDelay; // Only retry when the retry delay expires + catch(const Ice::LocalException&) + { + } + _pendingRetryCount = 0; } + + for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) + { + (*p)->invoke(_voidLocator); // Send pending requests on void locator. + } + _pendingRequests.clear(); + _nextRetry = IceUtil::Time::now() + _retryDelay; // Only retry when the retry delay expires } void |