diff options
author | Benoit Foucher <benoit@zeroc.com> | 2010-02-25 11:29:46 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2010-02-25 11:29:46 +0100 |
commit | 47a5c2f2a2ea021b960b8da8709670cac07af5a4 (patch) | |
tree | 008b7225e18472f81b1380aab287800051e14689 /cpp/src/IceGrid/AdapterCache.cpp | |
parent | bug 4071 - revert renaming of createStringConverter (diff) | |
download | ice-47a5c2f2a2ea021b960b8da8709670cac07af5a4.tar.bz2 ice-47a5c2f2a2ea021b960b8da8709670cac07af5a4.tar.xz ice-47a5c2f2a2ea021b960b8da8709670cac07af5a4.zip |
Fixed bug 4695 - IceStorm/repgrid test failure
Diffstat (limited to 'cpp/src/IceGrid/AdapterCache.cpp')
-rw-r--r-- | cpp/src/IceGrid/AdapterCache.cpp | 167 |
1 files changed, 163 insertions, 4 deletions
diff --git a/cpp/src/IceGrid/AdapterCache.cpp b/cpp/src/IceGrid/AdapterCache.cpp index 421d841d465..837a6386584 100644 --- a/cpp/src/IceGrid/AdapterCache.cpp +++ b/cpp/src/IceGrid/AdapterCache.cpp @@ -66,6 +66,113 @@ struct TransformToReplica : public unary_function<const pair<string, ServerAdapt } }; +class ReplicaGroupSyncCallback : public SynchronizationCallback, public IceUtil::Mutex +{ +public: + + ReplicaGroupSyncCallback(const SynchronizationCallbackPtr& callback, int count, int nReplicas) : + _callback(callback), + _responseCalled(false), + _synchronizeCount(count), + _synchronizedCount(0), + _nReplicas(nReplicas > count ? count : nReplicas) + { + } + + bool + response() + { + Lock sync(*this); + _responseCalled = true; + if(_synchronizedCount >= _nReplicas) + { + _callback = 0; + return false; + } + else if(_synchronizeCount == 0) + { + if(_synchronizedCount == 0 && _exception.get()) + { + _exception->ice_throw(); + } + _callback = 0; + return false; + } + return true; + } + + void + synchronized() + { + SynchronizationCallbackPtr callback; + { + Lock sync(*this); + ++_synchronizedCount; + --_synchronizeCount; + + if(!_responseCalled) + { + return; + } + + if(_synchronizedCount < _nReplicas && _synchronizeCount > 0) + { + return; + } + + callback = _callback; + _callback = 0; + } + + if(callback) + { + callback->synchronized(); + } + } + + void + synchronized(const Ice::Exception& ex) + { + SynchronizationCallbackPtr callback; + { + Lock sync(*this); + if(!_exception.get()) + { + _exception.reset(ex.ice_clone()); + } + + --_synchronizeCount; + if(!_responseCalled) + { + return; + } + + if(_synchronizeCount > 0) + { + return; + } + + callback = _callback; + _callback = 0; + } + + if(callback) + { + callback->synchronized(ex); + } + } + +private: + + SynchronizationCallbackPtr _callback; + bool _responseCalled; + int _synchronizeCount; + int _synchronizedCount; + int _nReplicas; + std::auto_ptr<Ice::Exception> _exception; +}; +typedef IceUtil::Handle<ReplicaGroupSyncCallback> ReplicaGroupSyncCallbackPtr; + } AdapterCache::AdapterCache(const Ice::CommunicatorPtr& communicator) : _communicator(communicator) @@ -230,7 +337,7 @@ ServerAdapterEntry::ServerAdapterEntry(AdapterCache& cache, } bool -ServerAdapterEntry::addSyncCallback(const SynchronizationCallbackPtr& callback) +ServerAdapterEntry::addSyncCallback(const SynchronizationCallbackPtr& callback, const set<string>&) { try { @@ -336,9 +443,53 @@ ReplicaGroupEntry::ReplicaGroupEntry(AdapterCache& cache, } bool -ReplicaGroupEntry::addSyncCallback(const SynchronizationCallbackPtr&) +ReplicaGroupEntry::addSyncCallback(const SynchronizationCallbackPtr& callback, const set<string>& excludes) { - return false; + vector<ServerAdapterEntryPtr> replicas; + int nReplicas; + int roundRobin = false; + { + Lock sync(*this); + nReplicas = _loadBalancingNReplicas > 0 ? _loadBalancingNReplicas : static_cast<int>(_replicas.size()); + roundRobin = RoundRobinLoadBalancingPolicyPtr::dynamicCast(_loadBalancing); + if(!roundRobin) + { + replicas = _replicas; + } + else + { + for(vector<ServerAdapterEntryPtr>::const_iterator p = _replicas.begin(); p != _replicas.end(); ++p) + { + if(excludes.find((*p)->getId()) == excludes.end()) + { + replicas.push_back(*p); + } + } + } + + if(replicas.empty()) + { + return false; + } + } + + ReplicaGroupSyncCallbackPtr cb = new ReplicaGroupSyncCallback(callback, replicas.size(), nReplicas); + set<string> emptyExcludes; + for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p) + { + try + { + if(!(*p)->addSyncCallback(cb, emptyExcludes)) + { + cb->synchronized(); + } + } + catch(const Ice::Exception& ex) + { + cb->synchronized(ex); + } + } + return cb->response(); } void @@ -468,6 +619,8 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n // might not exist anymore at this time or the node might not be // reachable. // + bool synchronizing = false; + set<string> emptyExcludes; for(vector<ServerAdapterEntryPtr>::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { if(!roundRobin || excludes.find((*p)->getId()) == excludes.end()) @@ -477,7 +630,7 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n int dummy; bool dummy2; bool dummy3; - (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2, dummy3, set<string>()); + (*p)->getLocatorAdapterInfo(adapters, dummy, dummy2, dummy3, emptyExcludes); } catch(const AdapterNotExistException&) { @@ -490,9 +643,15 @@ ReplicaGroupEntry::getLocatorAdapterInfo(LocatorAdapterInfoSeq& adapters, int& n } catch(const SynchronizationException&) { + synchronizing = true; } } } + + if(adapters.empty() && synchronizing) + { + throw SynchronizationException(__FILE__, __LINE__); + } } float |