summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/LocatorI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/LocatorI.cpp')
-rw-r--r--cpp/src/IceGrid/LocatorI.cpp229
1 files changed, 139 insertions, 90 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp
index 9158def55ff..702eded7cd7 100644
--- a/cpp/src/IceGrid/LocatorI.cpp
+++ b/cpp/src/IceGrid/LocatorI.cpp
@@ -26,53 +26,67 @@ class AMI_Adapter_getDirectProxyI : public AMI_Adapter_getDirectProxy
{
public:
- AMI_Adapter_getDirectProxyI(const LocatorIPtr& locator, const string& id, const LocatorAdapterInfo& adapter) :
- _locator(locator), _id(id), _adapter(adapter)
+ AMI_Adapter_getDirectProxyI(const LocatorI::RequestPtr& request, const string& id) :
+ _request(request), _id(id)
{
}
virtual void ice_response(const ::Ice::ObjectPrx& obj)
{
assert(obj);
- _locator->getDirectProxyCallback(_adapter.proxy->ice_getIdentity(), obj);
+ _request->response(_id, obj);
}
- virtual void ice_exception(const ::Ice::Exception& ex)
- {
- _locator->getDirectProxyException(_adapter, _id, ex);
+ virtual void ice_exception(const ::Ice::Exception& e)
+ {
+ try
+ {
+ e.ice_throw();
+ }
+ catch(const AdapterNotActiveException& ex)
+ {
+ if(ex.activatable)
+ {
+ _request->activate(_id);
+ return;
+ }
+ }
+ catch(const Ice::Exception& ex)
+ {
+ }
+
+ _request->exception(_id, e);
}
private:
- const LocatorIPtr _locator;
+ const LocatorI::RequestPtr _request;
const string _id;
- const LocatorAdapterInfo _adapter;
};
class AMI_Adapter_activateI : public AMI_Adapter_activate
{
public:
- AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id, const LocatorAdapterInfo& adapter) :
- _locator(locator), _id(id), _adapter(adapter)
+ AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id) :
+ _locator(locator), _id(id)
{
}
virtual void ice_response(const ::Ice::ObjectPrx& obj)
{
- _locator->getDirectProxyCallback(_adapter.proxy->ice_getIdentity(), obj);
+ _locator->activateFinished(_id, obj);
}
virtual void ice_exception(const ::Ice::Exception& ex)
{
- _locator->getDirectProxyException(_adapter, _id, ex);
+ _locator->activateException(_id, ex);
}
private:
const LocatorIPtr _locator;
const string _id;
- const LocatorAdapterInfo _adapter;
};
//
@@ -205,29 +219,70 @@ LocatorI::Request::execute()
++_lastAdapter;
}
}
- assert(!adapters.empty());
+
for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
{
- requestAdapter(*p);
+ p->proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, p->id));
+ }
+}
+
+void
+LocatorI::Request::activate(const string& id)
+{
+ //
+ // Activate the adapter
+ //
+ // NOTE: we use a timeout large enough to ensure that the activate() call won't
+ // timeout if the server hangs in deactivation and/or activation.
+ //
+ for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ if(p->id == id)
+ {
+ _locator->activate(*p, this);
+ _activating.insert(id);
+ }
+ }
+
+ //
+ // If this is a request for a replica group, don't wait for the activation to
+ // complete. Instead, we query the next adapter which might be already active.
+ //
+ if(_replicaGroup)
+ {
+ LocatorAdapterInfo adapter;
+ {
+ Lock sync(*this);
+ if(_lastAdapter != _adapters.end())
+ {
+ adapter = *_lastAdapter;
+ ++_lastAdapter;
+ }
+ }
+ if(adapter.proxy)
+ {
+ adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id));
+ }
}
}
void
-LocatorI::Request::exception(const Ice::Exception& ex)
+LocatorI::Request::exception(const string& id, const Ice::Exception& ex)
{
LocatorAdapterInfo adapter;
{
Lock sync(*this);
-
if(!_exception.get())
{
_exception.reset(ex.ice_clone());
}
+
+ _activating.erase(id);
if(_lastAdapter == _adapters.end())
{
--_count; // Expect one less adapter proxy if there's no more adapters to query.
-
+
//
// If we received all the required proxies, it's time to send the
// answer back to the client.
@@ -236,7 +291,6 @@ LocatorI::Request::exception(const Ice::Exception& ex)
{
sendResponse();
}
- return;
}
else
{
@@ -244,16 +298,28 @@ LocatorI::Request::exception(const Ice::Exception& ex)
++_lastAdapter;
}
}
- requestAdapter(adapter);
+
+ if(adapter.proxy)
+ {
+ adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id));
+ }
}
void
-LocatorI::Request::response(const Ice::ObjectPrx& proxy)
+LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy)
{
+ if(!proxy)
+ {
+ exception(id, AdapterNotActiveException());
+ return;
+ }
+
Lock sync(*this);
assert(proxy);
- _proxies.push_back(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")));
+ _activating.erase(id);
+
+ _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"));
//
// If we received all the required proxies, it's time to send the
@@ -266,22 +332,11 @@ LocatorI::Request::response(const Ice::ObjectPrx& proxy)
}
void
-LocatorI::Request::requestAdapter(const LocatorAdapterInfo& adapter)
-{
- assert(adapter.proxy);
- if(_locator->getDirectProxyRequest(this, adapter))
- {
- AMI_Adapter_getDirectProxyPtr amiCB = new AMI_Adapter_getDirectProxyI(_locator, _id, adapter);
- adapter.proxy->getDirectProxy_async(amiCB);
- }
-}
-
-void
LocatorI::Request::sendResponse()
{
if(_proxies.size() == 1)
{
- _amdCB->ice_response(_proxies.back());
+ _amdCB->ice_response(_proxies.begin()->second);
}
else if(_proxies.empty())
{
@@ -304,10 +359,19 @@ LocatorI::Request::sendResponse()
{
Ice::EndpointSeq endpoints;
endpoints.reserve(_proxies.size());
- for(vector<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
+ for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
{
- Ice::EndpointSeq edpts = (*p)->ice_getEndpoints();
- endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
+ map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id);
+ if(q != _proxies.end())
+ {
+ Ice::EndpointSeq edpts = q->second->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
+ }
+ }
+
+ for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q)
+ {
+ _locator->cancelActivate(*q, this);
}
Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
@@ -442,54 +506,55 @@ LocatorI::getLocalQuery(const Ice::Current&) const
return _localQuery;
}
-bool
-LocatorI::getDirectProxyRequest(const RequestPtr& request, const LocatorAdapterInfo& adapter)
+const Ice::CommunicatorPtr&
+LocatorI::getCommunicator() const
{
- Lock sync(*this);
-
- //
- // Check if there's already pending requests for this adapter. If that's the case,
- // we just add this one to the queue. If not, we add it to the queue and initiate
- // a call on the adapter to get its direct proxy.
- //
- PendingRequestsMap::iterator p;
- p = _pendingRequests.insert(make_pair(adapter.proxy->ice_getIdentity(), PendingRequests())).first;
- p->second.push_back(request);
- return p->second.size() == 1;
+ return _communicator;
}
void
-LocatorI::getDirectProxyException(const LocatorAdapterInfo& adpt, const string& id, const Ice::Exception& ex)
+LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request)
{
- try
- {
- ex.ice_throw();
- }
- catch(const AdapterNotActiveException& ex)
{
- if(ex.activatable)
+ Lock sync(*this);
+
+ //
+ // Check if there's already pending requests for this adapter. If that's the case,
+ // we just add this one to the queue. If not, we add it to the queue and initiate
+ // a call on the adapter to get its direct proxy.
+ //
+ PendingRequestsMap::iterator p;
+ p = _pendingRequests.insert(make_pair(adapter.id, PendingRequests())).first;
+ p->second.insert(request);
+ if(p->second.size() != 1)
{
- //
- // Activate the adapter if it can be activated on demand.
- //
- // NOTE: we use a timeout large enough to ensure that the
- // activate() call won't timeout if the server hangs in
- // deactivation and/or activation.
- //
- AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, id, adpt);
- int timeout = adpt.activationTimeout + adpt.deactivationTimeout;
- AdapterPrx::uncheckedCast(adpt.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB);
return;
}
}
- catch(const Ice::Exception&)
+
+ AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter.id);
+ int timeout = adapter.activationTimeout + adapter.deactivationTimeout;
+ AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB);
+}
+
+void
+LocatorI::cancelActivate(const string& id, const RequestPtr& request)
+{
+ Lock sync(*this);
+ PendingRequestsMap::iterator p = _pendingRequests.find(id);
+ if(p != _pendingRequests.end())
{
+ p->second.erase(request);
}
+}
+void
+LocatorI::activateFinished(const string& id, const Ice::ObjectPrx& proxy)
+{
PendingRequests requests;
{
Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(adpt.proxy->ice_getIdentity());
+ PendingRequestsMap::iterator p = _pendingRequests.find(id);
assert(p != _pendingRequests.end());
requests.swap(p->second);
_pendingRequests.erase(p);
@@ -497,40 +562,24 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adpt, const string&
for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
{
- (*q)->exception(ex);
+ (*q)->response(id, proxy);
}
}
void
-LocatorI::getDirectProxyCallback(const Ice::Identity& adapterId, const Ice::ObjectPrx& proxy)
+LocatorI::activateException(const string& id, const Ice::Exception& ex)
{
PendingRequests requests;
{
Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(adapterId);
+ PendingRequestsMap::iterator p = _pendingRequests.find(id);
assert(p != _pendingRequests.end());
requests.swap(p->second);
_pendingRequests.erase(p);
}
- if(proxy)
- {
- for(PendingRequests::const_iterator q = requests.begin(); q != requests.end(); ++q)
- {
- (*q)->response(proxy);
- }
- }
- else
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
{
- for(PendingRequests::const_iterator q = requests.begin(); q != requests.end(); ++q)
- {
- (*q)->exception(AdapterNotActiveException());
- }
+ (*q)->exception(id, ex);
}
}
-
-const Ice::CommunicatorPtr&
-LocatorI::getCommunicator() const
-{
- return _communicator;
-}