summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp30
-rw-r--r--cpp/src/IceDiscovery/LookupI.cpp110
-rw-r--r--cpp/src/IceLocatorDiscovery/PluginI.cpp64
-rw-r--r--csharp/src/Ice/CollocatedRequestHandler.cs23
-rw-r--r--csharp/src/IceDiscovery/LookupI.cs119
-rw-r--r--csharp/src/IceLocatorDiscovery/PluginI.cs111
-rw-r--r--java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java41
-rw-r--r--java/src/IceDiscovery/src/main/java/IceDiscovery/LookupI.java81
-rw-r--r--java/src/IceLocatorDiscovery/src/main/java/IceLocatorDiscovery/PluginI.java120
9 files changed, 496 insertions, 203 deletions
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index d2e78fcd4a9..6a641c38037 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -149,6 +149,7 @@ CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalExceptio
InvocationTimeoutException ex(__FILE__, __LINE__);
out->completed(ex);
_sendRequests.erase(p);
+ _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count.
return;
}
@@ -185,6 +186,7 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
{
outAsync->invokeCompletedAsync();
}
+ _adapter->decDirectCount(); // invokeAll won't be called, decrease the direct count.
return;
}
@@ -209,6 +211,12 @@ CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAs
void
CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum)
{
+ //
+ // Increase the direct count to prevent the thread pool from being destroyed before
+ // invokeAll is called. This will also throw if the object adapter has been deactivated.
+ //
+ _adapter->incDirectCount();
+
int requestId = 0;
{
Lock sync(*this);
@@ -243,7 +251,14 @@ CollocatedRequestHandler::invokeRequest(OutgoingBase* out, int batchRequestNum)
AsyncStatus
CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int batchRequestNum)
{
+ //
+ // Increase the direct count to prevent the thread pool from being destroyed before
+ // invokeAll is called. This will also throw if the object adapter has been deactivated.
+ //
+ _adapter->incDirectCount();
+
int requestId = 0;
+ try
{
Lock sync(*this);
@@ -257,6 +272,11 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsyncBase* outAsync, int ba
_sendAsyncRequests.insert(make_pair(outAsync, requestId));
}
+ catch(...)
+ {
+ _adapter->decDirectCount();
+ throw;
+ }
outAsync->attachCollocatedObserver(_adapter, requestId);
@@ -418,6 +438,12 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq
{
while(invokeNum > 0)
{
+ //
+ // Increase the direct count for the dispatch. We increase it again here for
+ // each dispatch. It's important for the direct count to be > 0 until the last
+ // collocated request response is sent to make sure the thread pool isn't
+ // destroyed before.
+ //
try
{
_adapter->incDirectCount();
@@ -425,7 +451,7 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq
catch(const ObjectAdapterDeactivatedException& ex)
{
handleException(requestId, ex, false);
- return;
+ break;
}
Incoming in(_reference->getInstance().get(), this, 0, _adapter, _response, 0, requestId);
@@ -437,6 +463,8 @@ CollocatedRequestHandler::invokeAll(BasicStream* os, Int requestId, Int batchReq
{
invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception
}
+
+ _adapter->decDirectCount();
}
void
diff --git a/cpp/src/IceDiscovery/LookupI.cpp b/cpp/src/IceDiscovery/LookupI.cpp
index 292edf91c1b..c1778a6eb92 100644
--- a/cpp/src/IceDiscovery/LookupI.cpp
+++ b/cpp/src/IceDiscovery/LookupI.cpp
@@ -99,9 +99,9 @@ ObjectRequest::runTimerTask()
_lookup->objectRequestTimedOut(this);
}
-LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrx& lookup, const Ice::PropertiesPtr& properties) :
- _registry(registry),
- _lookup(lookup),
+LookupI::LookupI(const LocatorRegistryIPtr& registry, const LookupPrx& lookup, const Ice::PropertiesPtr& properties) :
+ _registry(registry),
+ _lookup(lookup),
_timeout(IceUtil::Time::milliSeconds(properties->getPropertyAsIntWithDefault("IceDiscovery.Timeout", 300))),
_retryCount(properties->getPropertyAsIntWithDefault("IceDiscovery.RetryCount", 3)),
_latencyMultiplier(properties->getPropertyAsIntWithDefault("IceDiscovery.LatencyMultiplier", 1)),
@@ -138,8 +138,8 @@ LookupI::setLookupReply(const LookupReplyPrx& lookupReply)
_lookupReply = lookupReply;
}
-void
-LookupI::findObjectById(const string& domainId, const Ice::Identity& id, const IceDiscovery::LookupReplyPrx& reply,
+void
+LookupI::findObjectById(const string& domainId, const Ice::Identity& id, const IceDiscovery::LookupReplyPrx& reply,
const Ice::Current&)
{
if(domainId != _domainId)
@@ -153,12 +153,19 @@ LookupI::findObjectById(const string& domainId, const Ice::Identity& id, const I
//
// Reply to the mulicast request using the given proxy.
//
- reply->begin_foundObjectById(id, proxy);
+ try
+ {
+ reply->begin_foundObjectById(id, proxy);
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Ignore.
+ }
}
}
-void
-LookupI::findAdapterById(const string& domainId, const std::string& adapterId,
+void
+LookupI::findAdapterById(const string& domainId, const std::string& adapterId,
const IceDiscovery::LookupReplyPrx& reply, const Ice::Current&)
{
if(domainId != _domainId)
@@ -173,15 +180,22 @@ LookupI::findAdapterById(const string& domainId, const std::string& adapterId,
//
// Reply to the multicast request using the given proxy.
//
- reply->begin_foundAdapterById(adapterId, proxy, isReplicaGroup);
+ try
+ {
+ reply->begin_foundAdapterById(adapterId, proxy, isReplicaGroup);
+ }
+ catch(const Ice::LocalException&)
+ {
+ // Ignore.
+ }
}
}
-void
+void
LookupI::findObject(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::Identity& id)
{
Lock sync(*this);
- map<Ice::Identity, ObjectRequestPtr>::const_iterator p = _objectRequests.find(id);
+ map<Ice::Identity, ObjectRequestPtr>::iterator p = _objectRequests.find(id);
if(p == _objectRequests.end())
{
p = _objectRequests.insert(make_pair(id, new ObjectRequest(this, id, _retryCount))).first;
@@ -189,16 +203,24 @@ LookupI::findObject(const Ice::AMD_Locator_findObjectByIdPtr& cb, const Ice::Ide
if(p->second->addCallback(cb))
{
- _lookup->begin_findObjectById(_domainId, id, _lookupReply);
- _timer->schedule(p->second, _timeout);
+ try
+ {
+ _lookup->begin_findObjectById(_domainId, id, _lookupReply);
+ _timer->schedule(p->second, _timeout);
+ }
+ catch(const Ice::LocalException&)
+ {
+ p->second->finished(0);
+ _objectRequests.erase(p);
+ }
}
}
-void
+void
LookupI::findAdapter(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const std::string& adapterId)
{
Lock sync(*this);
- map<string, AdapterRequestPtr>::const_iterator p = _adapterRequests.find(adapterId);
+ map<string, AdapterRequestPtr>::iterator p = _adapterRequests.find(adapterId);
if(p == _adapterRequests.end())
{
p = _adapterRequests.insert(make_pair(adapterId, new AdapterRequest(this, adapterId, _retryCount))).first;
@@ -206,8 +228,16 @@ LookupI::findAdapter(const Ice::AMD_Locator_findAdapterByIdPtr& cb, const std::s
if(p->second->addCallback(cb))
{
- _lookup->begin_findAdapterById(_domainId, adapterId, _lookupReply);
- _timer->schedule(p->second, _timeout);
+ try
+ {
+ _lookup->begin_findAdapterById(_domainId, adapterId, _lookupReply);
+ _timer->schedule(p->second, _timeout);
+ }
+ catch(const Ice::LocalException&)
+ {
+ p->second->finished(0);
+ _adapterRequests.erase(p);
+ }
}
}
@@ -255,15 +285,20 @@ LookupI::objectRequestTimedOut(const ObjectRequestPtr& request)
if(request->retry())
{
- _lookup->begin_findObjectById(_domainId, request->getId(), _lookupReply);
- _timer->schedule(p->second, _timeout);
- }
- else
- {
- request->finished(0);
- _objectRequests.erase(p);
- _timer->cancel(request);
+ try
+ {
+ _lookup->begin_findObjectById(_domainId, request->getId(), _lookupReply);
+ _timer->schedule(p->second, _timeout);
+ return;
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
}
+
+ request->finished(0);
+ _objectRequests.erase(p);
+ _timer->cancel(request);
}
void
@@ -278,28 +313,33 @@ LookupI::adapterRequestTimedOut(const AdapterRequestPtr& request)
if(request->retry())
{
- _lookup->begin_findAdapterById(_domainId, request->getId(), _lookupReply);
- _timer->schedule(p->second, _timeout);
- }
- else
- {
- request->finished(0);
- _adapterRequests.erase(p);
- _timer->cancel(request);
+ try
+ {
+ _lookup->begin_findAdapterById(_domainId, request->getId(), _lookupReply);
+ _timer->schedule(p->second, _timeout);
+ return;
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
}
+
+ request->finished(0);
+ _adapterRequests.erase(p);
+ _timer->cancel(request);
}
LookupReplyI::LookupReplyI(const LookupIPtr& lookup) : _lookup(lookup)
{
}
-void
+void
LookupReplyI::foundObjectById(const Ice::Identity& id, const Ice::ObjectPrx& proxy, const Ice::Current&)
{
_lookup->foundObject(id, proxy);
}
-void
+void
LookupReplyI::foundAdapterById(const std::string& adapterId, const Ice::ObjectPrx& proxy, bool isReplicaGroup,
const Ice::Current&)
{
diff --git a/cpp/src/IceLocatorDiscovery/PluginI.cpp b/cpp/src/IceLocatorDiscovery/PluginI.cpp
index d364fa7157c..3c3c22ca89e 100644
--- a/cpp/src/IceLocatorDiscovery/PluginI.cpp
+++ b/cpp/src/IceLocatorDiscovery/PluginI.cpp
@@ -276,8 +276,15 @@ Request::invoke(const Ice::LocatorPrx& l)
if(l != _locatorPrx)
{
_locatorPrx = l;
- l->begin_ice_invoke(_operation, _mode, _inParams, _context,
- Ice::newCallback_Object_ice_invoke(this, &Request::response, &Request::exception));
+ try
+ {
+ l->begin_ice_invoke(_operation, _mode, _inParams, _context,
+ Ice::newCallback_Object_ice_invoke(this, &Request::response, &Request::exception));
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception(ex);
+ }
}
else
{
@@ -307,6 +314,18 @@ Request::exception(const Ice::Exception& ex)
{
_amdCB->ice_exception(ex);
}
+ catch(const Ice::NoEndpointException&)
+ {
+ _amdCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__));
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ _amdCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__));
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ _amdCB->ice_exception(Ice::ObjectNotExistException(__FILE__, __LINE__));
+ }
catch(const Ice::Exception&)
{
_exception.reset(ex.ice_clone());
@@ -450,8 +469,20 @@ LocatorI::invoke(const Ice::LocatorPrx& locator, const RequestPtr& request)
if(_pendingRetryCount == 0) // No request in progress
{
_pendingRetryCount = _retryCount;
- _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
- _timer->schedule(this, _timeout);
+ try
+ {
+ _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
+ _timer->schedule(this, _timeout);
+ }
+ catch(const Ice::LocalException&)
+ {
+ for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p)
+ {
+ (*p)->invoke(_voidLocator);
+ }
+ _pendingRequests.clear();
+ _pendingRetryCount = 0;
+ }
}
}
}
@@ -462,19 +493,24 @@ LocatorI::runTimerTask()
Lock sync(*this);
if(--_pendingRetryCount > 0)
{
- _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
- _timer->schedule(this, _timeout);
- }
- else
- {
- assert(!_pendingRequests.empty());
- for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p)
+ try
{
- (*p)->invoke(_voidLocator); // Send pending requests on void locator.
+ _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
+ _timer->schedule(this, _timeout);
+ return;
}
- _pendingRequests.clear();
- _nextRetry = IceUtil::Time::now() + _retryDelay; // Only retry when the retry delay expires
+ catch(const Ice::LocalException&)
+ {
+ }
+ _pendingRetryCount = 0;
}
+
+ for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p)
+ {
+ (*p)->invoke(_voidLocator); // Send pending requests on void locator.
+ }
+ _pendingRequests.clear();
+ _nextRetry = IceUtil::Time::now() + _retryDelay; // Only retry when the retry delay expires
}
void
diff --git a/csharp/src/Ice/CollocatedRequestHandler.cs b/csharp/src/Ice/CollocatedRequestHandler.cs
index 4fcd670003c..21d4a7dc198 100644
--- a/csharp/src/Ice/CollocatedRequestHandler.cs
+++ b/csharp/src/Ice/CollocatedRequestHandler.cs
@@ -64,6 +64,7 @@ namespace IceInternal
{
outAsync.invokeCompletedAsync(cb);
}
+ _adapter.decDirectCount(); // invokeAll won't be called, decrease the direct count.
return;
}
if(outAsync is OutgoingAsync)
@@ -161,7 +162,14 @@ namespace IceInternal
public bool invokeAsyncRequest(OutgoingAsyncBase outAsync, int batchRequestNum, bool synchronous,
out Ice.AsyncCallback sentCallback)
{
+ //
+ // Increase the direct count to prevent the thread pool from being destroyed before
+ // invokeAll is called. This will also throw if the object adapter has been deactivated.
+ //
+ _adapter.incDirectCount();
+
int requestId = 0;
+ try
{
lock(this)
{
@@ -176,6 +184,11 @@ namespace IceInternal
_sendAsyncRequests.Add(outAsync, requestId);
}
}
+ catch(System.Exception ex)
+ {
+ _adapter.decDirectCount();
+ throw ex;
+ }
outAsync.attachCollocatedObserver(_adapter, requestId);
@@ -277,6 +290,12 @@ namespace IceInternal
{
while(invokeNum > 0)
{
+ //
+ // Increase the direct count for the dispatch. We increase it again here for
+ // each dispatch. It's important for the direct count to be > 0 until the last
+ // collocated request response is sent to make sure the thread pool isn't
+ // destroyed before.
+ //
try
{
_adapter.incDirectCount();
@@ -284,7 +303,7 @@ namespace IceInternal
catch(Ice.ObjectAdapterDeactivatedException ex)
{
handleException(requestId, ex, false);
- return;
+ break;
}
Incoming @in = new Incoming(_reference.getInstance(), this, null, _adapter, _response, (byte)0,
@@ -297,6 +316,8 @@ namespace IceInternal
{
invokeException(requestId, ex, invokeNum, false); // Fatal invocation exception
}
+
+ _adapter.decDirectCount();
}
void
diff --git a/csharp/src/IceDiscovery/LookupI.cs b/csharp/src/IceDiscovery/LookupI.cs
index 27a143f3d9d..d516eb8c16a 100644
--- a/csharp/src/IceDiscovery/LookupI.cs
+++ b/csharp/src/IceDiscovery/LookupI.cs
@@ -25,7 +25,7 @@ namespace IceDiscovery
{
return _id;
}
-
+
public bool addCallback(AmdCB cb)
{
callbacks_.Add(cb);
@@ -55,7 +55,7 @@ namespace IceDiscovery
{
return _proxies.Count == 0 && --nRetry_ >= 0;
}
-
+
public bool response(Ice.ObjectPrx proxy, bool isReplicaGroup)
{
if(isReplicaGroup)
@@ -102,7 +102,7 @@ namespace IceDiscovery
}
sendResponse(result.ice_endpoints(endpoints.ToArray()));
}
-
+
public void runTimerTask()
{
lookup_.adapterRequestTimedOut(this);
@@ -116,7 +116,7 @@ namespace IceDiscovery
}
callbacks_.Clear();
}
-
+
private List<Ice.ObjectPrx> _proxies = new List<Ice.ObjectPrx>();
private long _start;
private long _latency;
@@ -132,7 +132,7 @@ namespace IceDiscovery
{
finished(proxy);
}
-
+
public void finished(Ice.ObjectPrx proxy)
{
foreach(Ice.AMD_Locator_findObjectById cb in callbacks_)
@@ -160,12 +160,12 @@ namespace IceDiscovery
_domainId = properties.getProperty("IceDiscovery.DomainId");
_timer = IceInternal.Util.getInstance(lookup.ice_getCommunicator()).timer();
}
-
+
public void setLookupReply(LookupReplyPrx lookupReply)
{
_lookupReply = lookupReply;
}
-
+
public override void findObjectById(string domainId, Ice.Identity id, IceDiscovery.LookupReplyPrx reply,
Ice.Current c)
{
@@ -180,11 +180,18 @@ namespace IceDiscovery
//
// Reply to the mulicast request using the given proxy.
//
- getLookupReply(reply, c).begin_foundObjectById(id, proxy);
+ try
+ {
+ reply.begin_foundObjectById(id, proxy);
+ }
+ catch(Ice.LocalException)
+ {
+ // Ignore.
+ }
}
}
- public override void findAdapterById(string domainId, string adapterId, IceDiscovery.LookupReplyPrx reply,
+ public override void findAdapterById(string domainId, string adapterId, IceDiscovery.LookupReplyPrx reply,
Ice.Current c)
{
if(!domainId.Equals(_domainId))
@@ -199,7 +206,14 @@ namespace IceDiscovery
//
// Reply to the multicast request using the given proxy.
//
- getLookupReply(reply, c).begin_foundAdapterById(adapterId, proxy, isReplicaGroup);
+ try
+ {
+ reply.begin_foundAdapterById(adapterId, proxy, isReplicaGroup);
+ }
+ catch(Ice.LocalException)
+ {
+ // Ignore.
+ }
}
}
@@ -215,8 +229,16 @@ namespace IceDiscovery
}
if(request.addCallback(cb))
{
- _lookup.begin_findObjectById(_domainId, id, _lookupReply);
- _timer.schedule(request, _timeout);
+ try
+ {
+ _lookup.begin_findObjectById(_domainId, id, _lookupReply);
+ _timer.schedule(request, _timeout);
+ }
+ catch(Ice.LocalException)
+ {
+ request.finished(null);
+ _objectRequests.Remove(id);
+ }
}
}
}
@@ -233,8 +255,16 @@ namespace IceDiscovery
}
if(request.addCallback(cb))
{
- _lookup.begin_findAdapterById(_domainId, adapterId, _lookupReply);
- _timer.schedule(request, _timeout);
+ try
+ {
+ _lookup.begin_findAdapterById(_domainId, adapterId, _lookupReply);
+ _timer.schedule(request, _timeout);
+ }
+ catch(Ice.LocalException)
+ {
+ request.finished(null);
+ _adapterRequests.Remove(adapterId);
+ }
}
}
}
@@ -284,15 +314,20 @@ namespace IceDiscovery
if(request.retry())
{
- _lookup.begin_findObjectById(_domainId, request.getId(), _lookupReply);
- _timer.schedule(request, _timeout);
- }
- else
- {
- request.finished(null);
- _objectRequests.Remove(request.getId());
- _timer.cancel(request);
+ try
+ {
+ _lookup.begin_findObjectById(_domainId, request.getId(), _lookupReply);
+ _timer.schedule(request, _timeout);
+ return;
+ }
+ catch(Ice.LocalException)
+ {
+ }
}
+
+ request.finished(null);
+ _objectRequests.Remove(request.getId());
+ _timer.cancel(request);
}
}
@@ -308,15 +343,20 @@ namespace IceDiscovery
if(request.retry())
{
- _lookup.begin_findAdapterById(_domainId, request.getId(), _lookupReply);
- _timer.schedule(request, _timeout);
- }
- else
- {
- request.finished(null);
- _adapterRequests.Remove(request.getId());
- _timer.cancel(request);
+ try
+ {
+ _lookup.begin_findAdapterById(_domainId, request.getId(), _lookupReply);
+ _timer.schedule(request, _timeout);
+ return;
+ }
+ catch(Ice.LocalException)
+ {
+ }
}
+
+ request.finished(null);
+ _adapterRequests.Remove(request.getId());
+ _timer.cancel(request);
}
}
@@ -330,23 +370,6 @@ namespace IceDiscovery
return _latencyMultiplier;
}
- private LookupReplyPrx getLookupReply(LookupReplyPrx reply, Ice.Current current)
- {
- // Ice.UDPConnectionInfo info = Ice.UDPConnectionInfoPtr.dynamicCast(current.con.getInfo());
- // if(info)
- // {
- // Ice.Communicator com = current.adapter.getCommunicator();
- // ostringstream os;
- // os << "\"" << com.identityToString(reply.ice_getIdentity()) << "\"";
- // os << ":udp -h " << info.remoteAddress << " -p " << info.remotePort;
- // return LookupReplyPrx.uncheckedCast(com.stringToProxy(os.str()).ice_datagram());
- // }
- // else
- {
- return reply;
- }
- }
-
private LocatorRegistryI _registry;
private readonly LookupPrx _lookup;
private LookupReplyPrx _lookupReply;
@@ -367,7 +390,7 @@ namespace IceDiscovery
{
_lookup = lookup;
}
-
+
public override void foundObjectById(Ice.Identity id, Ice.ObjectPrx proxy, Ice.Current c)
{
_lookup.foundObject(id, proxy);
diff --git a/csharp/src/IceLocatorDiscovery/PluginI.cs b/csharp/src/IceLocatorDiscovery/PluginI.cs
index f349eaa560c..ccf9b266850 100644
--- a/csharp/src/IceLocatorDiscovery/PluginI.cs
+++ b/csharp/src/IceLocatorDiscovery/PluginI.cs
@@ -44,31 +44,55 @@ namespace IceLocatorDiscovery
invoke(Ice.LocatorPrx l)
{
_locatorPrx = l;
- Request self = this;
- l.begin_ice_invoke(_operation, _mode, _inParams, _context).whenCompleted(
- (bool ok, byte[] outParams) =>
- {
- _amdCB.ice_response(ok, outParams);
- },
- (Ice.Exception ex) =>
- {
- try
- {
- throw ex;
- }
- catch(Ice.RequestFailedException exc)
- {
- _amdCB.ice_exception(exc);
- }
- catch(Ice.UnknownException exc)
+ try
+ {
+ l.begin_ice_invoke(_operation, _mode, _inParams, _context).whenCompleted(
+ (bool ok, byte[] outParams) =>
{
- _amdCB.ice_exception(exc);
- }
- catch(Ice.Exception)
+ _amdCB.ice_response(ok, outParams);
+ },
+ (Ice.Exception ex) =>
{
- _locator.invoke(_locatorPrx, self); // Retry with new locator proxy
- }
- });
+ exception(ex);
+ });
+ }
+ catch(Ice.LocalException ex)
+ {
+ exception(ex);
+ }
+ }
+
+ private void
+ exception(Ice.Exception ex)
+ {
+ try
+ {
+ throw ex;
+ }
+ catch(Ice.RequestFailedException exc)
+ {
+ _amdCB.ice_exception(exc);
+ }
+ catch(Ice.UnknownException exc)
+ {
+ _amdCB.ice_exception(exc);
+ }
+ catch(Ice.NoEndpointException)
+ {
+ _amdCB.ice_exception(new Ice.ObjectNotExistException());
+ }
+ catch(Ice.ObjectAdapterDeactivatedException)
+ {
+ _amdCB.ice_exception(new Ice.ObjectNotExistException());
+ }
+ catch(Ice.CommunicatorDestroyedException)
+ {
+ _amdCB.ice_exception(new Ice.ObjectNotExistException());
+ }
+ catch(Ice.Exception)
+ {
+ _locator.invoke(_locatorPrx, this); // Retry with new locator proxy
+ }
}
private readonly LocatorI _locator;
@@ -243,8 +267,20 @@ namespace IceLocatorDiscovery
if(_pendingRetryCount == 0) // No request in progress
{
_pendingRetryCount = _retryCount;
- _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
- _timer.schedule(this, _timeout);
+ try
+ {
+ _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
+ _timer.schedule(this, _timeout);
+ }
+ catch(Ice.LocalException)
+ {
+ foreach(Request req in _pendingRequests)
+ {
+ req.invoke(_voidLocator);
+ }
+ _pendingRequests.Clear();
+ _pendingRetryCount = 0;
+ }
}
}
}
@@ -257,19 +293,24 @@ namespace IceLocatorDiscovery
{
if(--_pendingRetryCount > 0)
{
- _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request
- _timer.schedule(this, _timeout);
- }
- else
- {
- Debug.Assert(_pendingRequests.Count > 0, "Pending requests is not empty");
- foreach(Request req in _pendingRequests)
+ try
+ {
+ _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request
+ _timer.schedule(this, _timeout);
+ return;
+ }
+ catch(Ice.LocalException)
{
- req.invoke(_voidLocator);
}
- _pendingRequests.Clear();
- _nextRetry = IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay;
+ _pendingRetryCount = 0;
+ }
+
+ foreach(Request req in _pendingRequests)
+ {
+ req.invoke(_voidLocator);
}
+ _pendingRequests.Clear();
+ _nextRetry = IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay;
}
}
diff --git a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
index 48b82937f11..55c2c69e7d3 100644
--- a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
@@ -78,6 +78,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
outAsync.invokeCompletedAsync();
}
+ _adapter.decDirectCount(); // invokeAll won't be called, decrease the direct count.
return;
}
@@ -183,18 +184,32 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
int invokeAsyncRequest(OutgoingAsyncBase outAsync, int batchRequestNum, boolean synchronous)
{
+ //
+ // Increase the direct count to prevent the thread pool from being destroyed before
+ // invokeAll is called. This will also throw if the object adapter has been deactivated.
+ //
+ _adapter.incDirectCount();
+
int requestId = 0;
- synchronized(this)
+ try
{
- outAsync.cancelable(this); // This will throw if the request is canceled
-
- if(_response)
+ synchronized(this)
{
- requestId = ++_requestId;
- _asyncRequests.put(requestId, outAsync);
- }
+ outAsync.cancelable(this); // This will throw if the request is canceled
+
+ if(_response)
+ {
+ requestId = ++_requestId;
+ _asyncRequests.put(requestId, outAsync);
+ }
- _sendAsyncRequests.put(outAsync, requestId);
+ _sendAsyncRequests.put(outAsync, requestId);
+ }
+ }
+ catch(Exception ex)
+ {
+ _adapter.decDirectCount();
+ throw ex;
}
outAsync.attachCollocatedObserver(_adapter, requestId);
@@ -288,6 +303,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
while(invokeNum > 0)
{
+ //
+ // Increase the direct count for the dispatch. We increase it again here for
+ // each dispatch. It's important for the direct count to be > 0 until the last
+ // collocated request response is sent to make sure the thread pool isn't
+ // destroyed before.
+ //
try
{
_adapter.incDirectCount();
@@ -295,7 +316,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
catch(Ice.ObjectAdapterDeactivatedException ex)
{
handleException(requestId, ex, false);
- return;
+ break;
}
Incoming in = new Incoming(_reference.getInstance(), this, null, _adapter, _response, (byte)0,
@@ -348,6 +369,8 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
throw ex;
}
}
+
+ _adapter.decDirectCount();
}
private void
diff --git a/java/src/IceDiscovery/src/main/java/IceDiscovery/LookupI.java b/java/src/IceDiscovery/src/main/java/IceDiscovery/LookupI.java
index 386abe3c5f4..4d2aff7c370 100644
--- a/java/src/IceDiscovery/src/main/java/IceDiscovery/LookupI.java
+++ b/java/src/IceDiscovery/src/main/java/IceDiscovery/LookupI.java
@@ -213,7 +213,14 @@ class LookupI extends _LookupDisp
//
// Reply to the mulicast request using the given proxy.
//
- reply.begin_foundObjectById(id, proxy);
+ try
+ {
+ reply.begin_foundObjectById(id, proxy);
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Ignore
+ }
}
}
@@ -233,7 +240,14 @@ class LookupI extends _LookupDisp
//
// Reply to the multicast request using the given proxy.
//
- reply.begin_foundAdapterById(adapterId, proxy, isReplicaGroup.value);
+ try
+ {
+ reply.begin_foundAdapterById(adapterId, proxy, isReplicaGroup.value);
+ }
+ catch(Ice.LocalException ex)
+ {
+ // Ignore
+ }
}
}
@@ -249,8 +263,16 @@ class LookupI extends _LookupDisp
if(request.addCallback(cb))
{
- _lookup.begin_findObjectById(_domainId, id, _lookupReply);
- request.scheduleTimer(_timeout);
+ try
+ {
+ _lookup.begin_findObjectById(_domainId, id, _lookupReply);
+ request.scheduleTimer(_timeout);
+ }
+ catch(Ice.LocalException ex)
+ {
+ request.finished(null);
+ _objectRequests.remove(id);
+ }
}
}
@@ -266,8 +288,16 @@ class LookupI extends _LookupDisp
if(request.addCallback(cb))
{
- _lookup.begin_findAdapterById(_domainId, adapterId, _lookupReply);
- request.scheduleTimer(_timeout);
+ try
+ {
+ _lookup.begin_findAdapterById(_domainId, adapterId, _lookupReply);
+ request.scheduleTimer(_timeout);
+ }
+ catch(Ice.LocalException ex)
+ {
+ request.finished(null);
+ _adapterRequests.remove(adapterId);
+ }
}
}
@@ -312,14 +342,19 @@ class LookupI extends _LookupDisp
if(request.retry())
{
- _lookup.begin_findObjectById(_domainId, request.getId(), _lookupReply);
- request.scheduleTimer(_timeout);
- }
- else
- {
- request.finished(null);
- _objectRequests.remove(request.getId());
+ try
+ {
+ _lookup.begin_findObjectById(_domainId, request.getId(), _lookupReply);
+ request.scheduleTimer(_timeout);
+ return;
+ }
+ catch(Ice.LocalException ex)
+ {
+ }
}
+
+ request.finished(null);
+ _objectRequests.remove(request.getId());
}
synchronized void
@@ -333,14 +368,19 @@ class LookupI extends _LookupDisp
if(request.retry())
{
- _lookup.begin_findAdapterById(_domainId, request.getId(), _lookupReply);
- request.scheduleTimer(_timeout);
- }
- else
- {
- request.finished(null);
- _adapterRequests.remove(request.getId());
+ try
+ {
+ _lookup.begin_findAdapterById(_domainId, request.getId(), _lookupReply);
+ request.scheduleTimer(_timeout);
+ return;
+ }
+ catch(Ice.LocalException ex)
+ {
+ }
}
+
+ request.finished(null);
+ _adapterRequests.remove(request.getId());
}
private LocatorRegistryI _registry;
@@ -355,6 +395,5 @@ class LookupI extends _LookupDisp
private Map<Ice.Identity, ObjectRequest> _objectRequests = new HashMap<Ice.Identity, ObjectRequest>();
private Map<String, AdapterRequest> _adapterRequests = new HashMap<String, AdapterRequest>();
-
}
diff --git a/java/src/IceLocatorDiscovery/src/main/java/IceLocatorDiscovery/PluginI.java b/java/src/IceLocatorDiscovery/src/main/java/IceLocatorDiscovery/PluginI.java
index c93d4f19df3..b19bc6d59f8 100644
--- a/java/src/IceLocatorDiscovery/src/main/java/IceLocatorDiscovery/PluginI.java
+++ b/java/src/IceLocatorDiscovery/src/main/java/IceLocatorDiscovery/PluginI.java
@@ -36,38 +36,63 @@ class PluginI implements Ice.Plugin
invoke(Ice.LocatorPrx l)
{
_locatorPrx = l;
- l.begin_ice_invoke(_operation, _mode, _inParams, _context,
- new Ice.Callback_Object_ice_invoke()
- {
- @Override
- public void
- response(boolean ok, byte[] outParams)
- {
- _amdCB.ice_response(ok, outParams);
- }
-
- @Override
- public void
- exception(Ice.LocalException ex)
+ try
+ {
+ l.begin_ice_invoke(_operation, _mode, _inParams, _context,
+ new Ice.Callback_Object_ice_invoke()
{
- try
- {
- throw ex;
- }
- catch(Ice.RequestFailedException exc)
- {
- _amdCB.ice_exception(ex);
- }
- catch(Ice.UnknownException exc)
+ @Override
+ public void
+ response(boolean ok, byte[] outParams)
{
- _amdCB.ice_exception(ex);
+ _amdCB.ice_response(ok, outParams);
}
- catch(Ice.LocalException exc)
+
+ @Override
+ public void
+ exception(Ice.LocalException ex)
{
- _locator.invoke(_locatorPrx, Request.this); // Retry with new locator proxy
+ Request.this.exception(ex);
}
- }
- });
+ });
+ }
+ catch(Ice.LocalException ex)
+ {
+ exception(ex);
+ }
+ }
+
+ private void
+ exception(Ice.LocalException ex)
+ {
+ try
+ {
+ throw ex;
+ }
+ catch(Ice.RequestFailedException exc)
+ {
+ _amdCB.ice_exception(ex);
+ }
+ catch(Ice.UnknownException exc)
+ {
+ _amdCB.ice_exception(ex);
+ }
+ catch(Ice.NoEndpointException exc)
+ {
+ _amdCB.ice_exception(new Ice.ObjectNotExistException());
+ }
+ catch(Ice.ObjectAdapterDeactivatedException exc)
+ {
+ _amdCB.ice_exception(new Ice.ObjectNotExistException());
+ }
+ catch(Ice.CommunicatorDestroyedException exc)
+ {
+ _amdCB.ice_exception(new Ice.ObjectNotExistException());
+ }
+ catch(Ice.LocalException exc)
+ {
+ _locator.invoke(_locatorPrx, Request.this); // Retry with new locator proxy
+ }
}
private final LocatorI _locator;
@@ -242,8 +267,20 @@ class PluginI implements Ice.Plugin
if(_pendingRetryCount == 0) // No request in progress
{
_pendingRetryCount = _retryCount;
- _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
- _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS);
+ try
+ {
+ _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
+ _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS);
+ }
+ catch(Ice.LocalException ex)
+ {
+ for(Request req : _pendingRequests)
+ {
+ req.invoke(_voidLocator);
+ }
+ _pendingRequests.clear();
+ _pendingRetryCount = 0;
+ }
}
}
}
@@ -257,19 +294,24 @@ class PluginI implements Ice.Plugin
{
if(--_pendingRetryCount > 0)
{
- _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
- _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS);
- }
- else
- {
- assert !_pendingRequests.isEmpty();
- for(Request req : _pendingRequests)
+ try
{
- req.invoke(_voidLocator);
+ _lookup.begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
+ _future = _timer.schedule(_retryTask, _timeout, java.util.concurrent.TimeUnit.MILLISECONDS);
+ return;
}
- _pendingRequests.clear();
- _nextRetry = IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay;
+ catch(Ice.LocalException ex)
+ {
+ }
+ _pendingRetryCount = 0;
+ }
+
+ for(Request req : _pendingRequests)
+ {
+ req.invoke(_voidLocator);
}
+ _pendingRequests.clear();
+ _nextRetry = IceInternal.Time.currentMonotonicTimeMillis() + _retryDelay;
}
}