summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/AdapterCache.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2010-02-25 11:29:46 +0100
committerBenoit Foucher <benoit@zeroc.com>2010-02-25 11:29:46 +0100
commit47a5c2f2a2ea021b960b8da8709670cac07af5a4 (patch)
tree008b7225e18472f81b1380aab287800051e14689 /cpp/src/IceGrid/AdapterCache.cpp
parentbug 4071 - revert renaming of createStringConverter (diff)
downloadice-47a5c2f2a2ea021b960b8da8709670cac07af5a4.tar.bz2
ice-47a5c2f2a2ea021b960b8da8709670cac07af5a4.tar.xz
ice-47a5c2f2a2ea021b960b8da8709670cac07af5a4.zip
Fixed bug 4695 - IceStorm/repgrid test failure
Diffstat (limited to 'cpp/src/IceGrid/AdapterCache.cpp')
-rw-r--r--cpp/src/IceGrid/AdapterCache.cpp167
1 files changed, 163 insertions, 4 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp
index 421d841d465..837a6386584 100644
--- a/cpp/src/IceGrid/AdapterCache.cpp
+++ b/cpp/src/IceGrid/AdapterCache.cpp
@@ -66,6 +66,113 @@ struct TransformToReplica : public unary_function<const pair<string, ServerAdapt
}
};
+class ReplicaGroupSyncCallback : public SynchronizationCallback, public IceUtil::Mutex
+{
+public:
+
+ ReplicaGroupSyncCallback(const SynchronizationCallbackPtr& callback, int count, int nReplicas) :
+ _callback(callback),
+ _responseCalled(false),
+ _synchronizeCount(count),
+ _synchronizedCount(0),
+ _nReplicas(nReplicas > count ? count : nReplicas)
+ {
+ }
+
+ bool
+ response()
+ {
+ Lock sync(*this);
+ _responseCalled = true;
+ if(_synchronizedCount >= _nReplicas)
+ {
+ _callback = 0;
+ return false;
+ }
+ else if(_synchronizeCount == 0)
+ {
+ if(_synchronizedCount == 0 && _exception.get())
+ {
+ _exception->ice_throw();
+ }
+ _callback = 0;
+ return false;
+ }
+ return true;
+ }
+
+ void
+ synchronized()
+ {
+ SynchronizationCallbackPtr callback;
+ {
+ Lock sync(*this);
+ ++_synchronizedCount;
+ --_synchronizeCount;
+
+ if(!_responseCalled)
+ {
+ return;
+ }
+
+ if(_synchronizedCount < _nReplicas && _synchronizeCount > 0)
+ {
+ return;
+ }
+
+ callback = _callback;
+ _callback = 0;
+ }
+
+ if(callback)
+ {
+ callback->synchronized();
+ }
+ }
+
+ void
+ synchronized(const Ice::Exception& ex)
+ {
+ SynchronizationCallbackPtr callback;
+ {
+ Lock sync(*this);
+ if(!_exception.get())
+ {
+ _exception.reset(ex.ice_clone());
+ }
+
+ --_synchronizeCount;
+ if(!_responseCalled)
+ {
+ return;
+ }
+
+ if(_synchronizeCount > 0)
+ {
+ return;
+ }
+
+ callback = _callback;
+ _callback = 0;
+ }
+
+ if(callback)
+ {
+ callback->synchronized(ex);
+ }
+ }
+
+private:
+
+ SynchronizationCallbackPtr _callback;
+ bool _responseCalled;
+ int _synchronizeCount;
+ int _synchronizedCount;
+ int _nReplicas;
+ std::auto_ptr<Ice::Exception> _exception;
+};
+typedef IceUtil::Handle<ReplicaGroupSyncCallback> ReplicaGroupSyncCallbackPtr;
+
}
AdapterCache::AdapterCache(const Ice::CommunicatorPtr& communicator) : _communicator(communicator)
@@ -230,7 +337,7 @@ ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache,
}
bool
-ServerAdapterEntry::addSyncCallback(const SynchronizationCallbackPtr& callback)
+ServerAdapterEntry::addSyncCallback(const SynchronizationCallbackPtr& callback, const set<string>&)
{
try
{
@@ -336,9 +443,53 @@ ReplicaGroupEntry::ReplicaGroupEntry(AdapterCache& cache,
}
bool
-ReplicaGroupEntry::addSyncCallback(const SynchronizationCallbackPtr&)
+ReplicaGroupEntry::addSyncCallback(const SynchronizationCallbackPtr& callback, const set<string>& excludes)
{
- return false;
+ vector<ServerAdapterEntryPtr> replicas;
+ int nReplicas;
+ int roundRobin = false;
+ {
+ Lock sync(*this);
+ nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : static_cast<int>(_replicas.size());
+ roundRobin = RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing);
+ if(!roundRobin)
+ {
+ replicas = _replicas;
+ }
+ else
+ {
+ for(vector<ServerAdapterEntryPtr>::const_iterator p = _replicas.begin(); p != _replicas.end(); ++p)
+ {
+ if(excludes.find((*p)->getId()) == excludes.end())
+ {
+ replicas.push_back(*p);
+ }
+ }
+ }
+
+ if(replicas.empty())
+ {
+ return false;
+ }
+ }
+
+ ReplicaGroupSyncCallbackPtr cb = new ReplicaGroupSyncCallback(callback, replicas.size(), nReplicas);
+ set<string> emptyExcludes;
+ for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
+ {
+ try
+ {
+ if(!(*p)->addSyncCallback(cb, emptyExcludes))
+ {
+ cb->synchronized();
+ }
+ }
+ catch(const Ice::Exception& ex)
+ {
+ cb->synchronized(ex);
+ }
+ }
+ return cb->response();
}
void
@@ -468,6 +619,8 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
// might not exist anymore at this time or the node might not be
// reachable.
//
+ bool synchronizing = false;
+ set<string> emptyExcludes;
for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
if(!roundRobin || excludes.find((*p)->getId()) == excludes.end())
@@ -477,7 +630,7 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
int dummy;
bool dummy2;
bool dummy3;
- (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2, dummy3, set<string>());
+ (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2, dummy3, emptyExcludes);
}
catch(const AdapterNotExistException&)
{
@@ -490,9 +643,15 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n
}
catch(const SynchronizationException&)
{
+ synchronizing = true;
}
}
}
+
+ if(adapters.empty() && synchronizing)
+ {
+ throw SynchronizationException(__FILE__, __LINE__);
+ }
}
float