diff options
-rw-r--r-- | cpp/src/Ice/CollocatedRequestHandler.cpp | 30 | ||||
-rw-r--r-- | cpp/src/IceDiscovery/LookupI.cpp | 110 | ||||
-rw-r--r-- | cpp/src/IceLocatorDiscovery/PluginI.cpp | 64 | ||||
-rw-r--r-- | csharp/src/Ice/CollocatedRequestHandler.cs | 23 | ||||
-rw-r--r-- | csharp/src/IceDiscovery/LookupI.cs | 119 | ||||
-rw-r--r-- | csharp/src/IceLocatorDiscovery/PluginI.cs | 111 | ||||
-rw-r--r-- | java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java | 41 | ||||
-rw-r--r-- | java/src/IceDiscovery/src/main/java/IceDiscovery/LookupI.java | 81 | ||||
-rw-r--r-- | java/src/IceLocatorDiscovery/src/main/java/IceLocatorDiscovery/PluginI.java | 120 |
9 files changed, 496 insertions, 203 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index d2e78fcd4a9..6a641c38037 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -149,6 +149,7 @@ CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalExceptio InvocationTimeoutException ex(__FILE__, __LINE__); out->completed(ex); _sendRequests.erase(p); + _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count. return; } @@ -185,6 +186,7 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs { outAsync->invokeCompletedAsync(); } + _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count. return; } @@ -209,6 +211,12 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs void CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum) { + // + // Increase the direct count to prevent the thread pool from being destroyed before + // invokeAll is called. This will also throw if the object adapter has been deactivated. + // + _adapter->incDirectCount(); + int requestId = 0; { Lock sync(*this); @@ -243,7 +251,14 @@ CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum) AsyncStatus CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum) { + // + // Increase the direct count to prevent the thread pool from being destroyed before + // invokeAll is called. This will also throw if the object adapter has been deactivated. + // + _adapter->incDirectCount(); + int requestId = 0; + try { Lock sync(*this); @@ -257,6 +272,11 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int ba _sendAsyncRequests.insert(make_pair(outAsync, requestId)); } + catch(...) + { + _adapter->decDirectCount(); + throw; + } outAsync->attachCollocatedObserver(_adapter, requestId); @@ -418,6 +438,12 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq { while(invokeNum > 0) { + // + // Increase the direct count for the dispatch. We increase it again here for + // each dispatch. It's important for the direct count to be > 0 until the last + // collocated request response is sent to make sure the thread pool isn't + // destroyed before. + // try { _adapter->incDirectCount(); @@ -425,7 +451,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq catch(const ObjectAdapterDeactivatedException& ex) { handleException(requestId, ex, false); - return; + break; } Incoming in(_reference->getInstance().get(), this, 0, _adapter, _response, 0, requestId); @@ -437,6 +463,8 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq { invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception } + + _adapter->decDirectCount(); } void diff --git a/cpp/src/IceDiscovery/LookupI.cpp b/cpp/src/IceDiscovery/LookupI.cpp index 292edf91c1b..c1778a6eb92 100644 --- a/cpp/src/IceDiscovery/LookupI.cpp +++ b/cpp/src/IceDiscovery/LookupI.cpp @@ -99,9 +99,9 @@ ObjectRequest::runTimerTask() _lookup->objectRequestTimedOut(this); } -LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrx& lookup, const Ice::PropertiesPtr& properties) : - _registry(registry), - _lookup(lookup), +LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrx& lookup, const Ice::PropertiesPtr& properties) : + _registry(registry), + _lookup(lookup), _timeout(IceUtil::Time::milliSeconds(properties->getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300))), _retryCount(properties->getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3)), _latencyMultiplier(properties->getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1)), @@ -138,8 +138,8 @@ LookupI::setLookupReply(const LookupReplyPrx& lookupReply) _lookupReply = lookupReply; } -void -LookupI::findObjectById(const string& domainId, const Ice::Identity& id, const IceDiscovery::LookupReplyPrx& reply, +void +LookupI::findObjectById(const string& domainId, const Ice::Identity& id, const IceDiscovery::LookupReplyPrx& reply, const Ice::Current&) { if(domainId != _domainId) @@ -153,12 +153,19 @@ LookupI::findObjectById(const string& domainId, const Ice::Identity& id, const I // // Reply to the mulicast request using the given proxy. // - reply->begin_foundObjectById(id, proxy); + try + { + reply->begin_foundObjectById(id, proxy); + } + catch(const Ice::LocalException&) + { + // Ignore. + } } } -void -LookupI::findAdapterById(const string& domainId, const std::string& adapterId, +void +LookupI::findAdapterById(const string& domainId, const std::string& adapterId, const IceDiscovery::LookupReplyPrx& reply, const Ice::Current&) { if(domainId != _domainId) @@ -173,15 +180,22 @@ LookupI::findAdapterById(const string& domainId, const std::string& adapterId, // // Reply to the multicast request using the given proxy. // - reply->begin_foundAdapterById(adapterId, proxy, isReplicaGroup); + try + { + reply->begin_foundAdapterById(adapterId, proxy, isReplicaGroup); + } + catch(const Ice::LocalException&) + { + // Ignore. + } } } -void +void LookupI::findObject(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::Identity& id) { Lock sync(*this); - map<Ice::Identity, ObjectRequestPtr>::const_iterator p = _objectRequests.find(id); + map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id); if(p == _objectRequests.end()) { p = _objectRequests.insert(make_pair(id, new ObjectRequest(this, id, _retryCount))).first; @@ -189,16 +203,24 @@ LookupI::findObject(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::Ide if(p->second->addCallback(cb)) { - _lookup->begin_findObjectById(_domainId, id, _lookupReply); - _timer->schedule(p->second, _timeout); + try + { + _lookup->begin_findObjectById(_domainId, id, _lookupReply); + _timer->schedule(p->second, _timeout); + } + catch(const Ice::LocalException&) + { + p->second->finished(0); + _objectRequests.erase(p); + } } } -void +void LookupI::findAdapter(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const std::string& adapterId) { Lock sync(*this); - map<string, AdapterRequestPtr>::const_iterator p = _adapterRequests.find(adapterId); + map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId); if(p == _adapterRequests.end()) { p = _adapterRequests.insert(make_pair(adapterId, new AdapterRequest(this, adapterId, _retryCount))).first; @@ -206,8 +228,16 @@ LookupI::findAdapter(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const std::s if(p->second->addCallback(cb)) { - _lookup->begin_findAdapterById(_domainId, adapterId, _lookupReply); - _timer->schedule(p->second, _timeout); + try + { + _lookup->begin_findAdapterById(_domainId, adapterId, _lookupReply); + _timer->schedule(p->second, _timeout); + } + catch(const Ice::LocalException&) + { + p->second->finished(0); + _adapterRequests.erase(p); + } } } @@ -255,15 +285,20 @@ LookupI::objectRequestTimedOut(const ObjectRequestPtr& request) if(request->retry()) { - _lookup->begin_findObjectById(_domainId, request->getId(), _lookupReply); - _timer->schedule(p->second, _timeout); - } - else - { - request->finished(0); - _objectRequests.erase(p); - _timer->cancel(request); + try + { + _lookup->begin_findObjectById(_domainId, request->getId(), _lookupReply); + _timer->schedule(p->second, _timeout); + return; + } + catch(const Ice::LocalException&) + { + } } + + request->finished(0); + _objectRequests.erase(p); + _timer->cancel(request); } void @@ -278,28 +313,33 @@ LookupI::adapterRequestTimedOut(const AdapterRequestPtr& request) if(request->retry()) { - _lookup->begin_findAdapterById(_domainId, request->getId(), _lookupReply); - _timer->schedule(p->second, _timeout); - } - else - { - request->finished(0); - _adapterRequests.erase(p); - _timer->cancel(request); + try + { + _lookup->begin_findAdapterById(_domainId, request->getId(), _lookupReply); + _timer->schedule(p->second, _timeout); + return; + } + catch(const Ice::LocalException&) + { + } } + + request->finished(0); + _adapterRequests.erase(p); + _timer->cancel(request); } LookupReplyI::LookupReplyI(const LookupIPtr& lookup) : _lookup(lookup) { } -void +void LookupReplyI::foundObjectById(const Ice::Identity& id, const Ice::ObjectPrx& proxy, const Ice::Current&) { _lookup->foundObject(id, proxy); } -void +void LookupReplyI::foundAdapterById(const std::string& adapterId, const Ice::ObjectPrx& proxy, bool isReplicaGroup, const Ice::Current&) { diff --git a/cpp/src/IceLocatorDiscovery/PluginI.cpp b/cpp/src/IceLocatorDiscovery/PluginI.cpp index d364fa7157c..3c3c22ca89e 100644 --- a/cpp/src/IceLocatorDiscovery/PluginI.cpp +++ b/cpp/src/IceLocatorDiscovery/PluginI.cpp @@ -276,8 +276,15 @@ Request::invoke(const Ice::LocatorPrx& l) if(l != _locatorPrx) { _locatorPrx = l; - l->begin_ice_invoke(_operation, _mode, _inParams, _context, - Ice::newCallback_Object_ice_invoke(this, &Request::response, &Request::exception)); + try + { + l->begin_ice_invoke(_operation, _mode, _inParams, _context, + Ice::newCallback_Object_ice_invoke(this, &Request::response, &Request::exception)); + } + catch(const Ice::LocalException& ex) + { + exception(ex); + } } else { @@ -307,6 +314,18 @@ Request::exception(const Ice::Exception& ex) { _amdCB->ice_exception(ex); } + catch(const Ice::NoEndpointException&) + { + _amdCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__)); + } + catch(const Ice::CommunicatorDestroyedException&) + { + _amdCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__)); + } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + _amdCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__)); + } catch(const Ice::Exception&) { _exception.reset(ex.ice_clone()); @@ -450,8 +469,20 @@ LocatorI::invoke(const Ice::LocatorPrx& locator, const RequestPtr& request) if(_pendingRetryCount == 0) // No request in progress { _pendingRetryCount = _retryCount; - _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _timer->schedule(this, _timeout); + try + { + _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + _timer->schedule(this, _timeout); + } + catch(const Ice::LocalException&) + { + for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) + { + (*p)->invoke(_voidLocator); + } + _pendingRequests.clear(); + _pendingRetryCount = 0; + } } } } @@ -462,19 +493,24 @@ LocatorI::runTimerTask() Lock sync(*this); if(--_pendingRetryCount > 0) { - _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _timer->schedule(this, _timeout); - } - else - { - assert(!_pendingRequests.empty()); - for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) + try { - (*p)->invoke(_voidLocator); // Send pending requests on void locator. + _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + _timer->schedule(this, _timeout); + return; } - _pendingRequests.clear(); - _nextRetry = IceUtil::Time::now() + _retryDelay; // Only retry when the retry delay expires + catch(const Ice::LocalException&) + { + } + _pendingRetryCount = 0; } + + for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) + { + (*p)->invoke(_voidLocator); // Send pending requests on void locator. + } + _pendingRequests.clear(); + _nextRetry = IceUtil::Time::now() + _retryDelay; // Only retry when the retry delay expires } void diff --git a/csharp/src/Ice/CollocatedRequestHandler.cs b/csharp/src/Ice/CollocatedRequestHandler.cs index 4fcd670003c..21d4a7dc198 100644 --- a/csharp/src/Ice/CollocatedRequestHandler.cs +++ b/csharp/src/Ice/CollocatedRequestHandler.cs @@ -64,6 +64,7 @@ namespace IceInternal { outAsync.invokeCompletedAsync(cb); } + _adapter.decDirectCount(); // invokeAll won't be called, decrease the direct count. return; } if(outAsync is OutgoingAsync) @@ -161,7 +162,14 @@ namespace IceInternal public bool invokeAsyncRequest(OutgoingAsyncBase outAsync, int batchRequestNum, bool synchronous, out Ice.AsyncCallback sentCallback) { + // + // Increase the direct count to prevent the thread pool from being destroyed before + // invokeAll is called. This will also throw if the object adapter has been deactivated. + // + _adapter.incDirectCount(); + int requestId = 0; + try { lock(this) { @@ -176,6 +184,11 @@ namespace IceInternal _sendAsyncRequests.Add(outAsync, requestId); } } + catch(System.Exception ex) + { + _adapter.decDirectCount(); + throw ex; + } outAsync.attachCollocatedObserver(_adapter, requestId); @@ -277,6 +290,12 @@ namespace IceInternal { while(invokeNum > 0) { + // + // Increase the direct count for the dispatch. We increase it again here for + // each dispatch. It's important for the direct count to be > 0 until the last + // collocated request response is sent to make sure the thread pool isn't + // destroyed before. + // try { _adapter.incDirectCount(); @@ -284,7 +303,7 @@ namespace IceInternal catch(Ice.ObjectAdapterDeactivatedException ex) { handleException(requestId, ex, false); - return; + break; } Incoming @in = new Incoming(_reference.getInstance(), this, null, _adapter, _response, (byte)0, @@ -297,6 +316,8 @@ namespace IceInternal { invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception } + + _adapter.decDirectCount(); } void diff --git a/csharp/src/IceDiscovery/LookupI.cs b/csharp/src/IceDiscovery/LookupI.cs index 27a143f3d9d..d516eb8c16a 100644 --- a/csharp/src/IceDiscovery/LookupI.cs +++ b/csharp/src/IceDiscovery/LookupI.cs @@ -25,7 +25,7 @@ namespace IceDiscovery { return _id; } - + public bool addCallback(AmdCB cb) { callbacks_.Add(cb); @@ -55,7 +55,7 @@ namespace IceDiscovery { return _proxies.Count == 0 && --nRetry_ >= 0; } - + public bool response(Ice.ObjectPrx proxy, bool isReplicaGroup) { if(isReplicaGroup) @@ -102,7 +102,7 @@ namespace IceDiscovery } sendResponse(result.ice_endpoints(endpoints.ToArray())); } - + public void runTimerTask() { lookup_.adapterRequestTimedOut(this); @@ -116,7 +116,7 @@ namespace IceDiscovery } callbacks_.Clear(); } - + private List<Ice.ObjectPrx> _proxies = new List<Ice.ObjectPrx>(); private long _start; private long _latency; @@ -132,7 +132,7 @@ namespace IceDiscovery { finished(proxy); } - + public void finished(Ice.ObjectPrx proxy) { foreach(Ice.AMD_Locator_findObjectById cb in callbacks_) @@ -160,12 +160,12 @@ namespace IceDiscovery _domainId = properties.getProperty("IceDiscovery.DomainId"); _timer = IceInternal.Util.getInstance(lookup.ice_getCommunicator()).timer(); } - + public void setLookupReply(LookupReplyPrx lookupReply) { _lookupReply = lookupReply; } - + public override void findObjectById(string domainId, Ice.Identity id, IceDiscovery.LookupReplyPrx reply, Ice.Current c) { @@ -180,11 +180,18 @@ namespace IceDiscovery // // Reply to the mulicast request using the given proxy. // - getLookupReply(reply, c).begin_foundObjectById(id, proxy); + try + { + reply.begin_foundObjectById(id, proxy); + } + catch(Ice.LocalException) + { + // Ignore. + } } } - public override void findAdapterById(string domainId, string adapterId, IceDiscovery.LookupReplyPrx reply, + public override void findAdapterById(string domainId, string adapterId, IceDiscovery.LookupReplyPrx reply, Ice.Current c) { if(!domainId.Equals(_domainId)) @@ -199,7 +206,14 @@ namespace IceDiscovery // // Reply to the multicast request using the given proxy. // - getLookupReply(reply, c).begin_foundAdapterById(adapterId, proxy, isReplicaGroup); + try + { + reply.begin_foundAdapterById(adapterId, proxy, isReplicaGroup); + } + catch(Ice.LocalException) + { + // Ignore. + } } } @@ -215,8 +229,16 @@ namespace IceDiscovery } if(request.addCallback(cb)) { - _lookup.begin_findObjectById(_domainId, id, _lookupReply); - _timer.schedule(request, _timeout); + try + { + _lookup.begin_findObjectById(_domainId, id, _lookupReply); + _timer.schedule(request, _timeout); + } + catch(Ice.LocalException) + { + request.finished(null); + _objectRequests.Remove(id); + } } } } @@ -233,8 +255,16 @@ namespace IceDiscovery } if(request.addCallback(cb)) { - _lookup.begin_findAdapterById(_domainId, adapterId, _lookupReply); - _timer.schedule(request, _timeout); + try + { + _lookup.begin_findAdapterById(_domainId, adapterId, _lookupReply); + _timer.schedule(request, _timeout); + } + catch(Ice.LocalException) + { + request.finished(null); + _adapterRequests.Remove(adapterId); + } } } } @@ -284,15 +314,20 @@ namespace IceDiscovery if(request.retry()) { - _lookup.begin_findObjectById(_domainId, request.getId(), _lookupReply); - _timer.schedule(request, _timeout); - } - else - { - request.finished(null); - _objectRequests.Remove(request.getId()); - _timer.cancel(request); + try + { + _lookup.begin_findObjectById(_domainId, request.getId(), _lookupReply); + _timer.schedule(request, _timeout); + return; + } + catch(Ice.LocalException) + { + } } + + request.finished(null); + _objectRequests.Remove(request.getId()); + _timer.cancel(request); } } @@ -308,15 +343,20 @@ namespace IceDiscovery if(request.retry()) { - _lookup.begin_findAdapterById(_domainId, request.getId(), _lookupReply); - _timer.schedule(request, _timeout); - } - else - { - request.finished(null); - _adapterRequests.Remove(request.getId()); - _timer.cancel(request); + try + { + _lookup.begin_findAdapterById(_domainId, request.getId(), _lookupReply); + _timer.schedule(request, _timeout); + return; + } + catch(Ice.LocalException) + { + } } + + request.finished(null); + _adapterRequests.Remove(request.getId()); + _timer.cancel(request); } } @@ -330,23 +370,6 @@ namespace IceDiscovery return _latencyMultiplier; } - private LookupReplyPrx getLookupReply(LookupReplyPrx reply, Ice.Current current) - { - // Ice.UDPConnectionInfo info = Ice.UDPConnectionInfoPtr.dynamicCast(current.con.getInfo()); - // if(info) - // { - // Ice.Communicator com = current.adapter.getCommunicator(); - // ostringstream os; - // os << "\"" << com.identityToString(reply.ice_getIdentity()) << "\""; - // os << ":udp -h " << info.remoteAddress << " -p " << info.remotePort; - // return LookupReplyPrx.uncheckedCast(com.stringToProxy(os.str()).ice_datagram()); - // } - // else - { - return reply; - } - } - private LocatorRegistryI _registry; private readonly LookupPrx _lookup; private LookupReplyPrx _lookupReply; @@ -367,7 +390,7 @@ namespace IceDiscovery { _lookup = lookup; } - + public override void foundObjectById(Ice.Identity id, Ice.ObjectPrx proxy, Ice.Current c) { _lookup.foundObject(id, proxy); diff --git a/csharp/src/IceLocatorDiscovery/PluginI.cs b/csharp/src/IceLocatorDiscovery/PluginI.cs index f349eaa560c..ccf9b266850 100644 --- a/csharp/src/IceLocatorDiscovery/PluginI.cs +++ b/csharp/src/IceLocatorDiscovery/PluginI.cs @@ -44,31 +44,55 @@ namespace IceLocatorDiscovery invoke(Ice.LocatorPrx l) { _locatorPrx = l; - Request self = this; - l.begin_ice_invoke(_operation, _mode, _inParams, _context).whenCompleted( - (bool ok, byte[] outParams) => - { - _amdCB.ice_response(ok, outParams); - }, - (Ice.Exception ex) => - { - try - { - throw ex; - } - catch(Ice.RequestFailedException exc) - { - _amdCB.ice_exception(exc); - } - catch(Ice.UnknownException exc) + try + { + l.begin_ice_invoke(_operation, _mode, _inParams, _context).whenCompleted( + (bool ok, byte[] outParams) => { - _amdCB.ice_exception(exc); - } - catch(Ice.Exception) + _amdCB.ice_response(ok, outParams); + }, + (Ice.Exception ex) => { - _locator.invoke(_locatorPrx, self); // Retry with new locator proxy - } - }); + exception(ex); + }); + } + catch(Ice.LocalException ex) + { + exception(ex); + } + } + + private void + exception(Ice.Exception ex) + { + try + { + throw ex; + } + catch(Ice.RequestFailedException exc) + { + _amdCB.ice_exception(exc); + } + catch(Ice.UnknownException exc) + { + _amdCB.ice_exception(exc); + } + catch(Ice.NoEndpointException) + { + _amdCB.ice_exception(new Ice.ObjectNotExistException()); + } + catch(Ice.ObjectAdapterDeactivatedException) + { + _amdCB.ice_exception(new Ice.ObjectNotExistException()); + } + catch(Ice.CommunicatorDestroyedException) + { + _amdCB.ice_exception(new Ice.ObjectNotExistException()); + } + catch(Ice.Exception) + { + _locator.invoke(_locatorPrx, this); // Retry with new locator proxy + } } private readonly LocatorI _locator; @@ -243,8 +267,20 @@ namespace IceLocatorDiscovery if(_pendingRetryCount == 0) // No request in progress { _pendingRetryCount = _retryCount; - _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _timer.schedule(this, _timeout); + try + { + _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + _timer.schedule(this, _timeout); + } + catch(Ice.LocalException) + { + foreach(Request req in _pendingRequests) + { + req.invoke(_voidLocator); + } + _pendingRequests.Clear(); + _pendingRetryCount = 0; + } } } } @@ -257,19 +293,24 @@ namespace IceLocatorDiscovery { if(--_pendingRetryCount > 0) { - _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request - _timer.schedule(this, _timeout); - } - else - { - Debug.Assert(_pendingRequests.Count > 0, "Pending requests is not empty"); - foreach(Request req in _pendingRequests) + try + { + _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request + _timer.schedule(this, _timeout); + return; + } + catch(Ice.LocalException) { - req.invoke(_voidLocator); } - _pendingRequests.Clear(); - _nextRetry = IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay; + _pendingRetryCount = 0; + } + + foreach(Request req in _pendingRequests) + { + req.invoke(_voidLocator); } + _pendingRequests.Clear(); + _nextRetry = IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay; } } diff --git a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java index 48b82937f11..55c2c69e7d3 100644 --- a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java +++ b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java @@ -78,6 +78,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { outAsync.invokeCompletedAsync(); } + _adapter.decDirectCount(); // invokeAll won't be called, decrease the direct count. return; } @@ -183,18 +184,32 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler int invokeAsyncRequest(OutgoingAsyncBase outAsync, int batchRequestNum, boolean synchronous) { + // + // Increase the direct count to prevent the thread pool from being destroyed before + // invokeAll is called. This will also throw if the object adapter has been deactivated. + // + _adapter.incDirectCount(); + int requestId = 0; - synchronized(this) + try { - outAsync.cancelable(this); // This will throw if the request is canceled - - if(_response) + synchronized(this) { - requestId = ++_requestId; - _asyncRequests.put(requestId, outAsync); - } + outAsync.cancelable(this); // This will throw if the request is canceled + + if(_response) + { + requestId = ++_requestId; + _asyncRequests.put(requestId, outAsync); + } - _sendAsyncRequests.put(outAsync, requestId); + _sendAsyncRequests.put(outAsync, requestId); + } + } + catch(Exception ex) + { + _adapter.decDirectCount(); + throw ex; } outAsync.attachCollocatedObserver(_adapter, requestId); @@ -288,6 +303,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler { while(invokeNum > 0) { + // + // Increase the direct count for the dispatch. We increase it again here for + // each dispatch. It's important for the direct count to be > 0 until the last + // collocated request response is sent to make sure the thread pool isn't + // destroyed before. + // try { _adapter.incDirectCount(); @@ -295,7 +316,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler catch(Ice.ObjectAdapterDeactivatedException ex) { handleException(requestId, ex, false); - return; + break; } Incoming in = new Incoming(_reference.getInstance(), this, null, _adapter, _response, (byte)0, @@ -348,6 +369,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler throw ex; } } + + _adapter.decDirectCount(); } private void diff --git a/java/src/IceDiscovery/src/main/java/IceDiscovery/LookupI.java b/java/src/IceDiscovery/src/main/java/IceDiscovery/LookupI.java index 386abe3c5f4..4d2aff7c370 100644 --- a/java/src/IceDiscovery/src/main/java/IceDiscovery/LookupI.java +++ b/java/src/IceDiscovery/src/main/java/IceDiscovery/LookupI.java @@ -213,7 +213,14 @@ class LookupI extends _LookupDisp // // Reply to the mulicast request using the given proxy. // - reply.begin_foundObjectById(id, proxy); + try + { + reply.begin_foundObjectById(id, proxy); + } + catch(Ice.LocalException ex) + { + // Ignore + } } } @@ -233,7 +240,14 @@ class LookupI extends _LookupDisp // // Reply to the multicast request using the given proxy. // - reply.begin_foundAdapterById(adapterId, proxy, isReplicaGroup.value); + try + { + reply.begin_foundAdapterById(adapterId, proxy, isReplicaGroup.value); + } + catch(Ice.LocalException ex) + { + // Ignore + } } } @@ -249,8 +263,16 @@ class LookupI extends _LookupDisp if(request.addCallback(cb)) { - _lookup.begin_findObjectById(_domainId, id, _lookupReply); - request.scheduleTimer(_timeout); + try + { + _lookup.begin_findObjectById(_domainId, id, _lookupReply); + request.scheduleTimer(_timeout); + } + catch(Ice.LocalException ex) + { + request.finished(null); + _objectRequests.remove(id); + } } } @@ -266,8 +288,16 @@ class LookupI extends _LookupDisp if(request.addCallback(cb)) { - _lookup.begin_findAdapterById(_domainId, adapterId, _lookupReply); - request.scheduleTimer(_timeout); + try + { + _lookup.begin_findAdapterById(_domainId, adapterId, _lookupReply); + request.scheduleTimer(_timeout); + } + catch(Ice.LocalException ex) + { + request.finished(null); + _adapterRequests.remove(adapterId); + } } } @@ -312,14 +342,19 @@ class LookupI extends _LookupDisp if(request.retry()) { - _lookup.begin_findObjectById(_domainId, request.getId(), _lookupReply); - request.scheduleTimer(_timeout); - } - else - { - request.finished(null); - _objectRequests.remove(request.getId()); + try + { + _lookup.begin_findObjectById(_domainId, request.getId(), _lookupReply); + request.scheduleTimer(_timeout); + return; + } + catch(Ice.LocalException ex) + { + } } + + request.finished(null); + _objectRequests.remove(request.getId()); } synchronized void @@ -333,14 +368,19 @@ class LookupI extends _LookupDisp if(request.retry()) { - _lookup.begin_findAdapterById(_domainId, request.getId(), _lookupReply); - request.scheduleTimer(_timeout); - } - else - { - request.finished(null); - _adapterRequests.remove(request.getId()); + try + { + _lookup.begin_findAdapterById(_domainId, request.getId(), _lookupReply); + request.scheduleTimer(_timeout); + return; + } + catch(Ice.LocalException ex) + { + } } + + request.finished(null); + _adapterRequests.remove(request.getId()); } private LocatorRegistryI _registry; @@ -355,6 +395,5 @@ class LookupI extends _LookupDisp private Map<Ice.Identity, ObjectRequest> _objectRequests = new HashMap<Ice.Identity, ObjectRequest>(); private Map<String, AdapterRequest> _adapterRequests = new HashMap<String, AdapterRequest>(); - } diff --git a/java/src/IceLocatorDiscovery/src/main/java/IceLocatorDiscovery/PluginI.java b/java/src/IceLocatorDiscovery/src/main/java/IceLocatorDiscovery/PluginI.java index c93d4f19df3..b19bc6d59f8 100644 --- a/java/src/IceLocatorDiscovery/src/main/java/IceLocatorDiscovery/PluginI.java +++ b/java/src/IceLocatorDiscovery/src/main/java/IceLocatorDiscovery/PluginI.java @@ -36,38 +36,63 @@ class PluginI implements Ice.Plugin invoke(Ice.LocatorPrx l) { _locatorPrx = l; - l.begin_ice_invoke(_operation, _mode, _inParams, _context, - new Ice.Callback_Object_ice_invoke() - { - @Override - public void - response(boolean ok, byte[] outParams) - { - _amdCB.ice_response(ok, outParams); - } - - @Override - public void - exception(Ice.LocalException ex) + try + { + l.begin_ice_invoke(_operation, _mode, _inParams, _context, + new Ice.Callback_Object_ice_invoke() { - try - { - throw ex; - } - catch(Ice.RequestFailedException exc) - { - _amdCB.ice_exception(ex); - } - catch(Ice.UnknownException exc) + @Override + public void + response(boolean ok, byte[] outParams) { - _amdCB.ice_exception(ex); + _amdCB.ice_response(ok, outParams); } - catch(Ice.LocalException exc) + + @Override + public void + exception(Ice.LocalException ex) { - _locator.invoke(_locatorPrx, Request.this); // Retry with new locator proxy + Request.this.exception(ex); } - } - }); + }); + } + catch(Ice.LocalException ex) + { + exception(ex); + } + } + + private void + exception(Ice.LocalException ex) + { + try + { + throw ex; + } + catch(Ice.RequestFailedException exc) + { + _amdCB.ice_exception(ex); + } + catch(Ice.UnknownException exc) + { + _amdCB.ice_exception(ex); + } + catch(Ice.NoEndpointException exc) + { + _amdCB.ice_exception(new Ice.ObjectNotExistException()); + } + catch(Ice.ObjectAdapterDeactivatedException exc) + { + _amdCB.ice_exception(new Ice.ObjectNotExistException()); + } + catch(Ice.CommunicatorDestroyedException exc) + { + _amdCB.ice_exception(new Ice.ObjectNotExistException()); + } + catch(Ice.LocalException exc) + { + _locator.invoke(_locatorPrx, Request.this); // Retry with new locator proxy + } } private final LocatorI _locator; @@ -242,8 +267,20 @@ class PluginI implements Ice.Plugin if(_pendingRetryCount == 0) // No request in progress { _pendingRetryCount = _retryCount; - _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS); + try + { + _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS); + } + catch(Ice.LocalException ex) + { + for(Request req : _pendingRequests) + { + req.invoke(_voidLocator); + } + _pendingRequests.clear(); + _pendingRetryCount = 0; + } } } } @@ -257,19 +294,24 @@ class PluginI implements Ice.Plugin { if(--_pendingRetryCount > 0) { - _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS); - } - else - { - assert !_pendingRequests.isEmpty(); - for(Request req : _pendingRequests) + try { - req.invoke(_voidLocator); + _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS); + return; } - _pendingRequests.clear(); - _nextRetry = IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay; + catch(Ice.LocalException ex) + { + } + _pendingRetryCount = 0; + } + + for(Request req : _pendingRequests) + { + req.invoke(_voidLocator); } + _pendingRequests.clear(); + _nextRetry = IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay; } } |