summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReplicaCache.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-12-04 19:43:10 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-12-04 19:43:10 +0000
commit996be326b98f2c5bb776ba68dd2b3d8b2861a12a (patch)
tree446bb318a97f7e64b35f299407f03868942c7c0f /cpp/src/IceGrid/ReplicaCache.cpp
parentRemoved -single_module, it doesn't seem to be neccessary anymore (diff)
downloadice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.tar.bz2
ice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.tar.xz
ice-996be326b98f2c5bb776ba68dd2b3d8b2861a12a.zip
More cleanup
Diffstat (limited to 'cpp/src/IceGrid/ReplicaCache.cpp')
-rw-r--r--cpp/src/IceGrid/ReplicaCache.cpp70
1 files changed, 47 insertions, 23 deletions
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp
index cb4ace333da..9e21cf98d5b 100644
--- a/cpp/src/IceGrid/ReplicaCache.cpp
+++ b/cpp/src/IceGrid/ReplicaCache.cpp
@@ -23,15 +23,15 @@ ReplicaCache::ReplicaCache(const Ice::CommunicatorPtr& communicator, const IceSt
IceStorm::TopicPrx t;
try
{
- t = topicManager->create("NodeNotifier");
+ t = topicManager->create("ReplicaObserverTopic");
}
catch(const IceStorm::TopicExists&)
{
- t = topicManager->retrieve("NodeNotifier");
+ t = topicManager->retrieve("ReplicaObserverTopic");
}
- const_cast<IceStorm::TopicPrx&>(_topic) = t;
- const_cast<NodePrx&>(_nodes) = NodePrx::uncheckedCast(_topic->getPublisher());
+ const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
+ const_cast<ReplicaObserverPrx&>(_observers) = ReplicaObserverPrx::uncheckedCast(_topic->getPublisher());
}
ReplicaEntryPtr
@@ -39,22 +39,38 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session)
{
Lock sync(*this);
- while(true)
+ ReplicaEntryPtr entry;
+ while(entry = getImpl(name))
{
- ReplicaEntryPtr entry = getImpl(name);
- if(entry)
+ ReplicaSessionIPtr session = entry->getSession();
+ if(session->isDestroyed())
{
- if(entry->getSession()->isDestroyed())
+ wait(); // Wait for the session to be removed.
+ }
+ else
+ {
+ //
+ // Check if the replica is still reachable, if not, we
+ // destroy its session.
+ //
+ sync.release();
+ try
{
- wait();
- continue;
+ session->getInternalRegistry()->ice_ping();
+ throw ReplicaActiveException();
}
- else
+ catch(const Ice::LocalException&)
{
- throw ReplicaActiveException();
+ try
+ {
+ session->destroy();
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
}
+ sync.acquire();
}
- break;
}
if(_traceLevels && _traceLevels->replica > 0)
@@ -65,7 +81,7 @@ ReplicaCache::add(const string& name, const ReplicaSessionIPtr& session)
try
{
- _nodes->replicaAdded(session->getInternalRegistry());
+ _observers->replicaAdded(session->getInternalRegistry());
}
catch(const Ice::ConnectionRefusedException&)
{
@@ -104,7 +120,7 @@ ReplicaCache::remove(const string& name, bool shutdown)
{
try
{
- _nodes->replicaRemoved(entry->getProxy());
+ _observers->replicaRemoved(entry->getProxy());
}
catch(const Ice::ConnectionRefusedException&)
{
@@ -139,13 +155,21 @@ ReplicaCache::get(const string& name) const
}
void
-ReplicaCache::nodeAdded(const NodePrx& node)
+ReplicaCache::subscribe(const ReplicaObserverPrx& observer)
{
- IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
try
{
- _topic->subscribe(qos, node);
+ Lock sync(*this);
+ InternalRegistryPrxSeq replicas;
+ for(map<string, ReplicaEntryPtr>::const_iterator p = _entries.begin(); p != _entries.end(); ++p)
+ {
+ replicas.push_back(p->second->getProxy());
+ }
+
+ IceStorm::QoS qos;
+ qos["reliability"] = "twoway ordered";
+ Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer);
+ ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas);
}
catch(const Ice::ConnectionRefusedException&)
{
@@ -157,17 +181,17 @@ ReplicaCache::nodeAdded(const NodePrx& node)
if(traceLevels)
{
Ice::Warning out(traceLevels->logger);
- out << "unexpected exception while subscribing node from replica observer topic:\n" << ex;
+ out << "unexpected exception while subscribing observer from replica observer topic:\n" << ex;
}
}
}
void
-ReplicaCache::nodeRemoved(const NodePrx& node)
+ReplicaCache::unsubscribe(const ReplicaObserverPrx& observer)
{
try
{
- _topic->unsubscribe(node);
+ _topic->unsubscribe(observer);
}
catch(const Ice::ConnectionRefusedException&)
{
@@ -179,7 +203,7 @@ ReplicaCache::nodeRemoved(const NodePrx& node)
if(traceLevels)
{
Ice::Warning out(traceLevels->logger);
- out << "unexpected exception while unsubscribing node from replica observer topic:\n" << ex;
+ out << "unexpected exception while unsubscribing observer from replica observer topic:\n" << ex;
}
}
}