summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/LocatorI.cpp297
-rw-r--r--cpp/src/IceGrid/LocatorI.h19
2 files changed, 152 insertions, 164 deletions
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp
index 7cbcec60d68..d8162b85709 100644
--- a/cpp/src/IceGrid/LocatorI.cpp
+++ b/cpp/src/IceGrid/LocatorI.cpp
@@ -26,67 +26,51 @@ class AMI_Adapter_getDirectProxyI : public AMI_Adapter_getDirectProxy
{
public:
- AMI_Adapter_getDirectProxyI(const LocatorI::RequestPtr& request, const string& id) :
- _request(request), _id(id)
+ AMI_Adapter_getDirectProxyI(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) :
+ _locator(locator), _adapter(adapter)
{
}
virtual void ice_response(const ::Ice::ObjectPrx& obj)
{
assert(obj);
- _request->response(_id, obj);
+ _locator->getDirectProxyResponse(_adapter, obj);
}
virtual void ice_exception(const ::Ice::Exception& e)
{
- try
- {
- e.ice_throw();
- }
- catch(const AdapterNotActiveException& ex)
- {
- if(ex.activatable)
- {
- _request->activate(_id);
- return;
- }
- }
- catch(const Ice::Exception&)
- {
- }
-
- _request->exception(_id, e);
+ _locator->getDirectProxyException(_adapter, e);
}
private:
- const LocatorI::RequestPtr _request;
- const string _id;
+ const LocatorIPtr _locator;
+ const LocatorAdapterInfo _adapter;
};
class AMI_Adapter_activateI : public AMI_Adapter_activate
{
public:
- AMI_Adapter_activateI(const LocatorIPtr& locator, const string& id) :
- _locator(locator), _id(id)
+ AMI_Adapter_activateI(const LocatorIPtr& locator, const LocatorAdapterInfo& adapter) :
+ _locator(locator), _adapter(adapter)
{
}
virtual void ice_response(const ::Ice::ObjectPrx& obj)
{
- _locator->activateFinished(_id, obj);
+ _locator->getDirectProxyResponse(_adapter, obj);
}
virtual void ice_exception(const ::Ice::Exception& ex)
{
- _locator->activateException(_id, ex);
+ _locator->getDirectProxyException(_adapter, ex);
}
private:
const LocatorIPtr _locator;
- const string _id;
+ const LocatorAdapterInfo _adapter;
};
//
@@ -223,50 +207,35 @@ LocatorI::Request::execute()
for(LocatorAdapterInfoSeq::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
{
- p->proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, p->id));
+ if(_locator->getDirectProxy(*p, this))
+ {
+ activating();
+ }
}
}
void
-LocatorI::Request::activate(const string& id)
+LocatorI::Request::activating()
{
//
- // Activate the adapter
- //
- // NOTE: we use a timeout large enough to ensure that the activate() call won't
- // timeout if the server hangs in deactivation and/or activation.
- //
- {
- Lock sync(*this);
- for(LocatorAdapterInfoSeq::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
- {
- if(p->id == id)
- {
- _locator->activate(*p, this);
- _activating.insert(id);
- }
- }
- }
-
- //
- // If this is a request for a replica group, don't wait for the activation to
- // complete. Instead, we query the next adapter which might be already active.
+ // An adapter is being activated. If this is a request for a replica group, don't
+ // wait for the activation to complete. Instead, we query the next adapter which
+ // might be already active.
//
if(_replicaGroup)
{
LocatorAdapterInfo adapter;
+ do
{
Lock sync(*this);
- if(_lastAdapter != _adapters.end())
+ if(_lastAdapter == _adapters.end())
{
- adapter = *_lastAdapter;
- ++_lastAdapter;
+ break;
}
+ adapter = *_lastAdapter;
+ ++_lastAdapter;
}
- if(adapter.proxy)
- {
- adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id));
- }
+ while(_locator->getDirectProxy(adapter, this));
}
}
@@ -276,13 +245,15 @@ LocatorI::Request::exception(const string& id, const Ice::Exception& ex)
LocatorAdapterInfo adapter;
{
Lock sync(*this);
+ if(_proxies.size() == _count) // Nothing to do if we already sent the response.
+ {
+ return;
+ }
if(!_exception.get())
{
_exception.reset(ex.ice_clone());
}
- _activating.erase(id);
-
if(_lastAdapter == _adapters.end())
{
--_count; // Expect one less adapter proxy if there's no more adapters to query.
@@ -305,7 +276,10 @@ LocatorI::Request::exception(const string& id, const Ice::Exception& ex)
if(adapter.proxy)
{
- adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter.id));
+ if(_locator->getDirectProxy(adapter, this))
+ {
+ activating();
+ }
}
}
@@ -319,10 +293,11 @@ LocatorI::Request::response(const string& id, const Ice::ObjectPrx& proxy)
}
Lock sync(*this);
- assert(proxy);
-
- _activating.erase(id);
-
+ if(_proxies.size() == _count) // Nothing to do if we already sent the response.
+ {
+ return;
+ }
+
_proxies[id] = proxy->ice_identity(_locator->getCommunicator()->stringToIdentity("dummy"));
//
@@ -397,18 +372,13 @@ LocatorI::Request::sendResponse()
}
}
- for(set<string>::const_iterator q = _activating.begin(); q != _activating.end(); ++q)
- {
- _locator->cancelActivate(*q, this);
- }
-
Ice::ObjectPrx proxy = _locator->getCommunicator()->stringToProxy("dummy:default");
_amdCB->ice_response(proxy->ice_endpoints(endpoints));
}
if(_roundRobin)
{
- _locator->removePendingResolve(_id, roundRobinCount);
+ _locator->removePendingRoundRobinRequest(_id, roundRobinCount);
}
}
@@ -475,7 +445,8 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
{
LocatorIPtr self = const_cast<LocatorI*>(this);
bool pending = false;
- if(self->addPendingResolve(id, cb, true, pending)) // Add only if there's already requests pending.
+ if(self->addPendingRoundRobinRequest(id, cb, true, pending)) // Add only if there's already round robin requests
+ // pending.
{
//
// Another request is currently resolving the adapter endpoints. We'll
@@ -511,14 +482,14 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
//
if(roundRobin)
{
- if(self->addPendingResolve(id, cb, false, pending))
+ if(self->addPendingRoundRobinRequest(id, cb, false, pending))
{
return;
}
}
else if(pending)
{
- self->removePendingResolve(id, 0);
+ self->removePendingRoundRobinRequest(id, 0);
}
RequestPtr request = new Request(cb, self, id, replicaGroup, roundRobin, adapters, count);
@@ -528,7 +499,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
{
if(pending)
{
- self->removePendingResolve(id, 0);
+ self->removePendingRoundRobinRequest(id, 0);
}
try
@@ -545,7 +516,7 @@ LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& cb,
{
if(pending)
{
- self->removePendingResolve(id, 0);
+ self->removePendingRoundRobinRequest(id, 0);
}
const TraceLevelsPtr traceLevels = _database->getTraceLevels();
@@ -596,85 +567,11 @@ LocatorI::getTraceLevels() const
return _database->getTraceLevels();
}
-void
-LocatorI::activate(const LocatorAdapterInfo& adapter, const RequestPtr& request)
-{
- {
- Lock sync(*this);
-
- //
- // Check if there's already pending requests for this adapter. If that's the case,
- // we just add this one to the queue. If not, we add it to the queue and initiate
- // a call on the adapter to get its direct proxy.
- //
- PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id);
- if(p != _pendingRequests.end())
- {
- p->second.insert(request);
- return;
- }
-
- p = _pendingRequests.insert(make_pair(adapter.id, PendingRequests())).first;
- p->second.insert(request);
- }
-
- AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter.id);
- int timeout = adapter.activationTimeout + adapter.deactivationTimeout;
- AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB);
-}
-
-void
-LocatorI::cancelActivate(const string& id, const RequestPtr& request)
-{
- Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(id);
- if(p != _pendingRequests.end())
- {
- p->second.erase(request);
- }
-}
-
-void
-LocatorI::activateFinished(const string& id, const Ice::ObjectPrx& proxy)
-{
- PendingRequests requests;
- {
- Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(id);
- assert(p != _pendingRequests.end());
- requests.swap(p->second);
- _pendingRequests.erase(p);
- }
-
- for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
- {
- (*q)->response(id, proxy);
- }
-}
-
-void
-LocatorI::activateException(const string& id, const Ice::Exception& ex)
-{
- PendingRequests requests;
- {
- Lock sync(*this);
- PendingRequestsMap::iterator p = _pendingRequests.find(id);
- assert(p != _pendingRequests.end());
- requests.swap(p->second);
- _pendingRequests.erase(p);
- }
-
- for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
- {
- (*q)->exception(id, ex);
- }
-}
-
bool
-LocatorI::addPendingResolve(const string& adapterId,
- const Ice::AMD_Locator_findAdapterByIdPtr& cb,
- bool addIfExists,
- bool& pending)
+LocatorI::addPendingRoundRobinRequest(const string& adapterId,
+ const Ice::AMD_Locator_findAdapterByIdPtr& cb,
+ bool addIfExists,
+ bool& pending)
{
Lock sync(*this);
pending = false;
@@ -698,7 +595,7 @@ LocatorI::addPendingResolve(const string& adapterId,
}
void
-LocatorI::removePendingResolve(const string& adapterId, int roundRobinCount)
+LocatorI::removePendingRoundRobinRequest(const string& adapterId, int roundRobinCount)
{
Ice::AMD_Locator_findAdapterByIdPtr cb;
{
@@ -741,3 +638,97 @@ LocatorI::removePendingResolve(const string& adapterId, int roundRobinCount)
findAdapterById_async(cb, adapterId);
}
}
+
+bool
+LocatorI::getDirectProxy(const LocatorAdapterInfo& adapter, const RequestPtr& request)
+{
+ {
+ Lock sync(*this);
+ PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id);
+ if(p != _pendingRequests.end())
+ {
+ p->second.push_back(request);
+ return _activating.find(adapter.id) != _activating.end();
+ }
+
+ PendingRequests requests;
+ requests.push_back(request);
+ _pendingRequests.insert(make_pair(adapter.id, requests));
+ }
+
+ adapter.proxy->getDirectProxy_async(new AMI_Adapter_getDirectProxyI(this, adapter));
+ return false;
+}
+
+void
+LocatorI::getDirectProxyResponse(const LocatorAdapterInfo& adapter, const Ice::ObjectPrx& proxy)
+{
+ PendingRequests requests;
+ {
+ Lock sync(*this);
+ PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id);
+ assert(p != _pendingRequests.end());
+ requests.swap(p->second);
+ _pendingRequests.erase(p);
+ _activating.erase(adapter.id);
+ }
+
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ {
+ (*q)->response(adapter.id, proxy);
+ }
+}
+
+void
+LocatorI::getDirectProxyException(const LocatorAdapterInfo& adapter, const Ice::Exception& ex)
+{
+ bool activate = false;
+ try
+ {
+ ex.ice_throw();
+ }
+ catch(const AdapterNotActiveException& e)
+ {
+ activate = e.activatable;
+ }
+ catch(const Ice::Exception&)
+ {
+ }
+
+ PendingRequests requests;
+ {
+ Lock sync(*this);
+ PendingRequestsMap::iterator p = _pendingRequests.find(adapter.id);
+ assert(p != _pendingRequests.end());
+ if(activate)
+ {
+ _activating.insert(adapter.id);
+ requests = p->second;
+ }
+ else
+ {
+ requests.swap(p->second);
+ _pendingRequests.erase(p);
+ _activating.erase(adapter.id);
+ }
+ }
+
+ if(activate)
+ {
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ {
+ (*q)->activating();
+ }
+
+ AMI_Adapter_activatePtr amiCB = new AMI_Adapter_activateI(this, adapter);
+ int timeout = adapter.activationTimeout + adapter.deactivationTimeout;
+ AdapterPrx::uncheckedCast(adapter.proxy->ice_timeout(timeout * 1000))->activate_async(amiCB);
+ }
+ else
+ {
+ for(PendingRequests::iterator q = requests.begin(); q != requests.end(); ++q)
+ {
+ (*q)->exception(adapter.id, ex);
+ }
+ }
+}
diff --git a/cpp/src/IceGrid/LocatorI.h b/cpp/src/IceGrid/LocatorI.h
index 9b66c51ac99..452afd2c03b 100644
--- a/cpp/src/IceGrid/LocatorI.h
+++ b/cpp/src/IceGrid/LocatorI.h
@@ -43,7 +43,7 @@ public:
void execute();
void response(const std::string&, const Ice::ObjectPrx&);
- void activate(const std::string&);
+ void activating();
void exception(const std::string&, const Ice::Exception&);
virtual bool
@@ -68,7 +68,6 @@ public:
LocatorAdapterInfoSeq::const_iterator _lastAdapter;
std::map<std::string, Ice::ObjectPrx> _proxies;
std::auto_ptr<Ice::Exception> _exception;
- std::set<std::string> _activating;
};
typedef IceUtil::Handle<Request> RequestPtr;
@@ -88,14 +87,12 @@ public:
const Ice::CommunicatorPtr& getCommunicator() const;
const TraceLevelsPtr& getTraceLevels() const;
- void activate(const LocatorAdapterInfo&, const RequestPtr&);
- void cancelActivate(const std::string&, const RequestPtr&);
+ bool addPendingRoundRobinRequest(const std::string&, const Ice::AMD_Locator_findAdapterByIdPtr&, bool, bool&);
+ void removePendingRoundRobinRequest(const std::string&, int);
- void activateFinished(const std::string&, const Ice::ObjectPrx&);
- void activateException(const std::string&, const Ice::Exception&);
-
- bool addPendingResolve(const std::string&, const Ice::AMD_Locator_findAdapterByIdPtr&, bool, bool&);
- void removePendingResolve(const std::string&, int);
+ bool getDirectProxy(const LocatorAdapterInfo&, const RequestPtr&);
+ void getDirectProxyResponse(const LocatorAdapterInfo&, const Ice::ObjectPrx&);
+ void getDirectProxyException(const LocatorAdapterInfo&, const Ice::Exception&);
protected:
@@ -105,10 +102,10 @@ protected:
const RegistryPrx _localRegistry;
const QueryPrx _localQuery;
- typedef std::set<RequestPtr> PendingRequests;
+ typedef std::vector<RequestPtr> PendingRequests;
typedef std::map<std::string, PendingRequests> PendingRequestsMap;
-
PendingRequestsMap _pendingRequests;
+ std::set<std::string> _activating;
std::map<std::string, std::deque<Ice::AMD_Locator_findAdapterByIdPtr> > _resolves;
};