diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-12-04 19:43:10 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-12-04 19:43:10 +0000 |
commit | 996be326b98f2c5bb776ba68dd2b3d8b2861a12a (patch) | |
tree | 446bb318a97f7e64b35f299407f03868942c7c0f /cpp/src/IceGrid/ReplicaCache.cpp | |
parent | Removed -single_module, it doesn't seem to be neccessary anymore (diff) | |
download | ice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.tar.bz2 ice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.tar.xz ice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.zip |
More cleanup
Diffstat (limited to 'cpp/src/IceGrid/ReplicaCache.cpp')
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.cpp | 70 |
1 files changed, 47 insertions, 23 deletions
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index cb4ace333da..9e21cf98d5b 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -23,15 +23,15 @@ ReplicaCache::ReplicaCache(const Ice::CommunicatorPtr& communicator, const IceSt IceStorm::TopicPrx t; try { - t = topicManager->create("NodeNotifier"); + t = topicManager->create("ReplicaObserverTopic"); } catch(const IceStorm::TopicExists&) { - t = topicManager->retrieve("NodeNotifier"); + t = topicManager->retrieve("ReplicaObserverTopic"); } - const_cast<IceStorm::TopicPrx&>(_topic) = t; - const_cast<NodePrx&>(_nodes) = NodePrx::uncheckedCast(_topic->getPublisher()); + const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true)); + const_cast<ReplicaObserverPrx&>(_observers) = ReplicaObserverPrx::uncheckedCast(_topic->getPublisher()); } ReplicaEntryPtr @@ -39,22 +39,38 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session) { Lock sync(*this); - while(true) + ReplicaEntryPtr entry; + while(entry = getImpl(name)) { - ReplicaEntryPtr entry = getImpl(name); - if(entry) + ReplicaSessionIPtr session = entry->getSession(); + if(session->isDestroyed()) { - if(entry->getSession()->isDestroyed()) + wait(); // Wait for the session to be removed. + } + else + { + // + // Check if the replica is still reachable, if not, we + // destroy its session. + // + sync.release(); + try { - wait(); - continue; + session->getInternalRegistry()->ice_ping(); + throw ReplicaActiveException(); } - else + catch(const Ice::LocalException&) { - throw ReplicaActiveException(); + try + { + session->destroy(); + } + catch(const Ice::LocalException&) + { + } } + sync.acquire(); } - break; } if(_traceLevels && _traceLevels->replica > 0) @@ -65,7 +81,7 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session) try { - _nodes->replicaAdded(session->getInternalRegistry()); + _observers->replicaAdded(session->getInternalRegistry()); } catch(const Ice::ConnectionRefusedException&) { @@ -104,7 +120,7 @@ ReplicaCache::remove(const string& name, bool shutdown) { try { - _nodes->replicaRemoved(entry->getProxy()); + _observers->replicaRemoved(entry->getProxy()); } catch(const Ice::ConnectionRefusedException&) { @@ -139,13 +155,21 @@ ReplicaCache::get(const string& name) const } void -ReplicaCache::nodeAdded(const NodePrx& node) +ReplicaCache::subscribe(const ReplicaObserverPrx& observer) { - IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; try { - _topic->subscribe(qos, node); + Lock sync(*this); + InternalRegistryPrxSeq replicas; + for(map<string, ReplicaEntryPtr>::const_iterator p = _entries.begin(); p != _entries.end(); ++p) + { + replicas.push_back(p->second->getProxy()); + } + + IceStorm::QoS qos; + qos["reliability"] = "twoway ordered"; + Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer); + ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas); } catch(const Ice::ConnectionRefusedException&) { @@ -157,17 +181,17 @@ ReplicaCache::nodeAdded(const NodePrx& node) if(traceLevels) { Ice::Warning out(traceLevels->logger); - out << "unexpected exception while subscribing node from replica observer topic:\n" << ex; + out << "unexpected exception while subscribing observer from replica observer topic:\n" << ex; } } } void -ReplicaCache::nodeRemoved(const NodePrx& node) +ReplicaCache::unsubscribe(const ReplicaObserverPrx& observer) { try { - _topic->unsubscribe(node); + _topic->unsubscribe(observer); } catch(const Ice::ConnectionRefusedException&) { @@ -179,7 +203,7 @@ ReplicaCache::nodeRemoved(const NodePrx& node) if(traceLevels) { Ice::Warning out(traceLevels->logger); - out << "unexpected exception while unsubscribing node from replica observer topic:\n" << ex; + out << "unexpected exception while unsubscribing observer from replica observer topic:\n" << ex; } } } |