summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/IceGrid/LocatorI.cpp229
-rw-r--r--cpp/src/IceGrid/LocatorI.h34
-rw-r--r--cpp/test/IceGrid/replicaGroup/AllTests.cpp13
3 files changed, 175 insertions, 101 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp
index 9158def55ff..702eded7cd7 100644
--- a/cpp/src/IceGrid/LocatorI.cpp
+++ b/cpp/src/IceGrid/LocatorI.cpp
@@ -26,53 +26,67 @@ class AMI_Adapter_getDirectProxyI : public AMI_Adapter_getDirectProxy
{
public:
- AMI_Adapter_getDirectProxyI(const LocatorIPtr& locator, const string& id, const LocatorAdapterInfo& adapter) :
- _locator(locator), _id(id), _adapter(adapter)
+ AMI_Adapter_getDirectProxyI(const LocatorI::RequestPtr& request, const string& id) :
+ _request(request), _id(id)
{
}
virtual void ice_response(const ::Ice::ObjectPrx& obj)
{
assert(obj);
- _locator->getDirectProxyCallback(_adapter.proxy->ice_getIdentity(), obj);
+ _request->response(_id, obj);
}
- virtual void ice_exception(const ::Ice::Exception& ex)
- {
- _locator->getDirectProxyException(_adapter, _id, ex);
+ virtual void ice_exception(const ::Ice::Exception& e)
+ {
+ try
+ {
+ e.ice_throw();
+ }
+ catch(const AdapterNotActiveException& ex)
+ {
+ if(ex.activatable)
+ {
+ _request->activate(_id);
+ return;
+ }
+ }
+ catch(const Ice::Exception& ex)
+ {
+ }
+
+ _request->exception(_id, e);
}
private:
- const LocatorIPtr _locator;
+ const LocatorI::RequestPtr _request;
const string _id;
- const LocatorAdapterInfo _adapter;
};
class AMI_Adapter_activateI : public AMI_Adapter_activate
{
public:
- AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id, const LocatorAdapterInfo& adapter) :
- _locator(locator), _id(id), _adapter(adapter)
+ AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id) :
+ _locator(locator), _id(id)
{
}
virtual void ice_response(const ::Ice::ObjectPrx& obj)
{
- _locator->getDirectProxyCallback(_adapter.proxy->ice_getIdentity(), obj);
+ _locator->activateFinished(_id, obj);
}
virtual void ice_exception(const ::Ice::Exception& ex)
{
- _locator->getDirectProxyException(_adapter, _id, ex);
+ _locator->activateException(_id, ex);
}
private:
const LocatorIPtr _locator;
const string _id;
- const LocatorAdapterInfo _adapter;
};
//
@@ -205,29 +219,70 @@ LocatorI::Request::execute()
++_lastAdapter;
}
}
- assert(!adapters.empty());
+
for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
{
- requestAdapter(*p);
+ p->proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, p->id));
+ }
+}
+
+void
+LocatorI::Request::activate(const string& id)
+{
+ //
+ // Activate the adapter
+ //
+ // NOTE: we use a timeout large enough to ensure that the activate() call won't
+ // timeout if the server hangs in deactivation and/or activation.
+ //
+ for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ if(p->id == id)
+ {
+ _locator->activate(*p, this);
+ _activating.insert(id);
+ }
+ }
+
+ //
+ // If this is a request for a replica group, don't wait for the activation to
+ // complete. Instead, we query the next adapter which might be already active.
+ //
+ if(_replicaGroup)
+ {
+ LocatorAdapterInfo adapter;
+ {
+ Lock sync(*this);
+ if(_lastAdapter != _adapters.end())
+ {
+ adapter = *_lastAdapter;
+ ++_lastAdapter;
+ }
+ }
+ if(adapter.proxy)
+ {
+ adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id));
+ }
}
}
void
-LocatorI::Request::exception(const Ice::Exception& ex)
+LocatorI::Request::exception(const string& id, const Ice::Exception& ex)
{
LocatorAdapterInfo adapter;
{
Lock sync(*this);
-
if(!_exception.get())
{
_exception.reset(ex.ice_clone());
}
+
+ _activating.erase(id);
if(_lastAdapter == _adapters.end())
{
--_count; // Expect one less adapter proxy if there's no more adapters to query.
-
+
//
// If we received all the required proxies, it's time to send the
// answer back to the client.
@@ -236,7 +291,6 @@ LocatorI::Request::exception(const Ice::Exception& ex)
{
sendResponse();
}
- return;
}
else
{
@@ -244,16 +298,28 @@ LocatorI::Request::exception(const Ice::Exception& ex)
++_lastAdapter;
}
}
- requestAdapter(adapter);
+
+ if(adapter.proxy)
+ {
+ adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id));
+ }
}
void
-LocatorI::Request::response(const Ice::ObjectPrx& proxy)
+LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy)
{
+ if(!proxy)
+ {
+ exception(id, AdapterNotActiveException());
+ return;
+ }
+
Lock sync(*this);
assert(proxy);
- _proxies.push_back(proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy")));
+ _activating.erase(id);
+
+ _proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"));
//
// If we received all the required proxies, it's time to send the
@@ -266,22 +332,11 @@ LocatorI::Request::response(const Ice::ObjectPrx& proxy)
}
void
-LocatorI::Request::requestAdapter(const LocatorAdapterInfo& adapter)
-{
- assert(adapter.proxy);
- if(_locator->getDirectProxyRequest(this, adapter))
- {
- AMI_Adapter_getDirectProxyPtr amiCB = new AMI_Adapter_getDirectProxyI(_locator, _id, adapter);
- adapter.proxy->getDirectProxy_async(amiCB);
- }
-}
-
-void
LocatorI::Request::sendResponse()
{
if(_proxies.size() == 1)
{
- _amdCB->ice_response(_proxies.back());
+ _amdCB->ice_response(_proxies.begin()->second);
}
else if(_proxies.empty())
{
@@ -304,10 +359,19 @@ LocatorI::Request::sendResponse()
{
Ice::EndpointSeq endpoints;
endpoints.reserve(_proxies.size());
- for(vector<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
+ for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
{
- Ice::EndpointSeq edpts = (*p)->ice_getEndpoints();
- endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
+ map<string, Ice::ObjectPrx>::const_iterator q = _proxies.find(p->id);
+ if(q != _proxies.end())
+ {
+ Ice::EndpointSeq edpts = q->second->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
+ }
+ }
+
+ for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q)
+ {
+ _locator->cancelActivate(*q, this);
}
Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
@@ -442,54 +506,55 @@ LocatorI::getLocalQuery(const Ice::Current&) const
return _localQuery;
}
-bool
-LocatorI::getDirectProxyRequest(const RequestPtr& request, const LocatorAdapterInfo& adapter)
+const Ice::CommunicatorPtr&
+LocatorI::getCommunicator() const
{
- Lock sync(*this);
-
- //
- // Check if there's already pending requests for this adapter. If that's the case,
- // we just add this one to the queue. If not, we add it to the queue and initiate
- // a call on the adapter to get its direct proxy.
- //
- PendingRequestsMap::iterator p;
- p = _pendingRequests.insert(make_pair(adapter.proxy->ice_getIdentity(), PendingRequests())).first;
- p->second.push_back(request);
- return p->second.size() == 1;
+ return _communicator;
}
void
-LocatorI::getDirectProxyException(const LocatorAdapterInfo& adpt, const string& id, const Ice::Exception& ex)
+LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request)
{
- try
- {
- ex.ice_throw();
- }
- catch(const AdapterNotActiveException& ex)
{
- if(ex.activatable)
+ Lock sync(*this);
+
+ //
+ // Check if there's already pending requests for this adapter. If that's the case,
+ // we just add this one to the queue. If not, we add it to the queue and initiate
+ // a call on the adapter to get its direct proxy.
+ //
+ PendingRequestsMap::iterator p;
+ p = _pendingRequests.insert(make_pair(adapter.id, PendingRequests())).first;
+ p->second.insert(request);
+ if(p->second.size() != 1)
{
- //
- // Activate the adapter if it can be activated on demand.
- //
- // NOTE: we use a timeout large enough to ensure that the
- // activate() call won't timeout if the server hangs in
- // deactivation and/or activation.
- //
- AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, id, adpt);
- int timeout = adpt.activationTimeout + adpt.deactivationTimeout;
- AdapterPrx::uncheckedCast(adpt.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB);
return;
}
}
- catch(const Ice::Exception&)
+
+ AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter.id);
+ int timeout = adapter.activationTimeout + adapter.deactivationTimeout;
+ AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB);
+}
+
+void
+LocatorI::cancelActivate(const string& id, const RequestPtr& request)
+{
+ Lock sync(*this);
+ PendingRequestsMap::iterator p = _pendingRequests.find(id);
+ if(p != _pendingRequests.end())
{
+ p->second.erase(request);
}
+}
+void
+LocatorI::activateFinished(const string& id, const Ice::ObjectPrx& proxy)
+{
PendingRequests requests;
{
Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(adpt.proxy->ice_getIdentity());
+ PendingRequestsMap::iterator p = _pendingRequests.find(id);
assert(p != _pendingRequests.end());
requests.swap(p->second);
_pendingRequests.erase(p);
@@ -497,40 +562,24 @@ LocatorI::getDirectProxyException(const LocatorAdapterInfo& adpt, const string&
for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
{
- (*q)->exception(ex);
+ (*q)->response(id, proxy);
}
}
void
-LocatorI::getDirectProxyCallback(const Ice::Identity& adapterId, const Ice::ObjectPrx& proxy)
+LocatorI::activateException(const string& id, const Ice::Exception& ex)
{
PendingRequests requests;
{
Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(adapterId);
+ PendingRequestsMap::iterator p = _pendingRequests.find(id);
assert(p != _pendingRequests.end());
requests.swap(p->second);
_pendingRequests.erase(p);
}
- if(proxy)
- {
- for(PendingRequests::const_iterator q = requests.begin(); q != requests.end(); ++q)
- {
- (*q)->response(proxy);
- }
- }
- else
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
{
- for(PendingRequests::const_iterator q = requests.begin(); q != requests.end(); ++q)
- {
- (*q)->exception(AdapterNotActiveException());
- }
+ (*q)->exception(id, ex);
}
}
-
-const Ice::CommunicatorPtr&
-LocatorI::getCommunicator() const
-{
- return _communicator;
-}
diff --git a/cpp/src/IceGrid/LocatorI.h b/cpp/src/IceGrid/LocatorI.h
index e36741631f9..f0f97239c83 100644
--- a/cpp/src/IceGrid/LocatorI.h
+++ b/cpp/src/IceGrid/LocatorI.h
@@ -13,6 +13,8 @@
#include <IceGrid/Internal.h>
#include <IceGrid/Locator.h>
+#include <set>
+
namespace IceGrid
{
@@ -30,6 +32,8 @@ typedef std::vector<LocatorAdapterInfo> LocatorAdapterInfoSeq;
class LocatorI : public Locator, public IceUtil::Mutex
{
+public:
+
class Request : public IceUtil::Mutex, public IceUtil::Shared
{
public:
@@ -38,8 +42,15 @@ class LocatorI : public Locator, public IceUtil::Mutex
const LocatorAdapterInfoSeq&, int, const TraceLevelsPtr&);
void execute();
- void response(const Ice::ObjectPrx&);
- void exception(const Ice::Exception&);
+ void response(const std::string&, const Ice::ObjectPrx&);
+ void activate(const std::string&);
+ void exception(const std::string&, const Ice::Exception&);
+
+ virtual bool
+ operator<(const Request& r) const
+ {
+ return this < &r;
+ }
private:
@@ -54,13 +65,12 @@ class LocatorI : public Locator, public IceUtil::Mutex
const TraceLevelsPtr _traceLevels;
unsigned int _count;
LocatorAdapterInfoSeq::const_iterator _lastAdapter;
- std::vector<Ice::ObjectPrx> _proxies;
+ std::map<std::string, Ice::ObjectPrx> _proxies;
std::auto_ptr<Ice::Exception> _exception;
+ std::set<std::string> _activating;
};
typedef IceUtil::Handle<Request> RequestPtr;
-public:
-
LocatorI(const Ice::CommunicatorPtr&, const DatabasePtr&, const Ice::LocatorRegistryPrx&, const RegistryPrx&,
const QueryPrx&);
@@ -73,13 +83,15 @@ public:
virtual Ice::LocatorRegistryPrx getRegistry(const Ice::Current&) const;
virtual RegistryPrx getLocalRegistry(const Ice::Current&) const;
virtual QueryPrx getLocalQuery(const Ice::Current&) const;
-
- bool getDirectProxyRequest(const RequestPtr&, const LocatorAdapterInfo&);
- void getDirectProxyException(const LocatorAdapterInfo&, const std::string&, const Ice::Exception&);
- void getDirectProxyCallback(const Ice::Identity&, const Ice::ObjectPrx&);
const Ice::CommunicatorPtr& getCommunicator() const;
+ void activate(const LocatorAdapterInfo&, const RequestPtr&);
+ void cancelActivate(const std::string&, const RequestPtr&);
+
+ void activateFinished(const std::string&, const Ice::ObjectPrx&);
+ void activateException(const std::string&, const Ice::Exception&);
+
protected:
const Ice::CommunicatorPtr _communicator;
@@ -88,8 +100,8 @@ protected:
const RegistryPrx _localRegistry;
const QueryPrx _localQuery;
- typedef std::vector<RequestPtr> PendingRequests;
- typedef std::map<Ice::Identity, PendingRequests> PendingRequestsMap;
+ typedef std::set<RequestPtr> PendingRequests;
+ typedef std::map<std::string, PendingRequests> PendingRequestsMap;
PendingRequestsMap _pendingRequests;
};
diff --git a/cpp/test/IceGrid/replicaGroup/AllTests.cpp b/cpp/test/IceGrid/replicaGroup/AllTests.cpp
index 1bef85f3b3d..866b5f23580 100644
--- a/cpp/test/IceGrid/replicaGroup/AllTests.cpp
+++ b/cpp/test/IceGrid/replicaGroup/AllTests.cpp
@@ -99,6 +99,19 @@ instantiateServer(const AdminPrx& admin, const string& templ, const string& node
cerr << ex << endl;
test(false);
}
+
+ assert(params.find("id") != params.end());
+ try
+ {
+ admin->startServer(params.find("id")->second);
+ }
+ catch(const NodeUnreachableException&)
+ {
+ }
+ catch(const Ice::Exception&)
+ {
+ test(false);
+ }
}
void