summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/ReplicaSessionManager.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/ReplicaSessionManager.cpp')
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp139
1 files changed, 109 insertions, 30 deletions
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index e121a307386..8413261f8af 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -264,6 +264,22 @@ ReplicaSessionManager::create(const string& name,
_wellKnownObjects = wellKnownObjects;
_traceLevels = _database->getTraceLevels();
+ //
+ // Initialize the IceGrid::Query objects. The IceGrid::Query
+ // interface is used to lookup the registry proxy in case it
+ // becomes unavailable. Since replicas might not always have
+ // an up to date registry proxy, we need to query all the
+ // replicas.
+ //
+ Ice::EndpointSeq endpoints = comm->getDefaultLocator()->ice_getEndpoints();
+ QueryPrx query = QueryPrx::uncheckedCast(comm->stringToProxy(instName + "/Query"));
+ for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
+ Ice::EndpointSeq singleEndpoint;
+ singleEndpoint.push_back(*p);
+ _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint)));
+ }
+
_thread = new Thread(*this, _master);
_thread->start();
notifyAll();
@@ -293,7 +309,7 @@ ReplicaSessionManager::create(const InternalRegistryPrx& replica)
}
NodePrxSeq
-ReplicaSessionManager::getNodes() const
+ReplicaSessionManager::getNodes(const NodePrxSeq& nodes) const
{
try
{
@@ -301,7 +317,7 @@ ReplicaSessionManager::getNodes() const
}
catch(const Ice::LocalException&)
{
- return _internalRegistry->getNodes();
+ return nodes;
}
}
@@ -324,11 +340,6 @@ void
ReplicaSessionManager::registerAllWellKnownObjects()
{
//
- // Try to create the session if it doesn't already exists.
- //
- _thread->tryCreateSession(0);
-
- //
// If there's an active session, register the well-known objects
// with the session.
//
@@ -374,6 +385,8 @@ ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session)
ReplicaSessionPrx
ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout)
{
+ ReplicaSessionPrx session;
+ auto_ptr<Ice::Exception> exception;
try
{
if(_traceLevels && _traceLevels->replica > 1)
@@ -381,7 +394,94 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
out << "trying to establish session with master replica";
}
+
+ set<InternalRegistryPrx> used;
+ if(!registry->ice_getEndpoints().empty())
+ {
+ try
+ {
+ session = createSessionImpl(registry, timeout);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception.reset(ex.ice_clone());
+ used.insert(registry);
+ _thread->setRegistry(InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq())));
+ }
+ }
+
+ if(!session)
+ {
+ for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
+ {
+ InternalRegistryPrx newRegistry;
+ try
+ {
+ Ice::ObjectPrx obj = (*p)->findObjectById(registry->ice_getIdentity());
+ newRegistry = InternalRegistryPrx::uncheckedCast(obj);
+ if(newRegistry && used.find(newRegistry) == used.end())
+ {
+ session = createSessionImpl(newRegistry, timeout);
+ _thread->setRegistry(newRegistry);
+ break;
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception.reset(ex.ice_clone());
+ if(newRegistry)
+ {
+ used.insert(newRegistry);
+ }
+ }
+ }
+ }
+ }
+ catch(const ReplicaActiveException& ex)
+ {
+ if(_traceLevels)
+ {
+ _traceLevels->logger->error("a replica with the same name is already registered and active");
+ }
+ exception.reset(ex.ice_clone());
+ }
+ catch(const Ice::Exception& ex)
+ {
+ exception.reset(ex.ice_clone());
+ }
+
+ if(session)
+ {
+ if(_traceLevels && _traceLevels->replica > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
+ out << "established session with master replica";
+ }
+ }
+ else
+ {
+ if(_traceLevels && _traceLevels->replica > 1)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
+ out << "failed to establish session with master replica:\n";
+ if(exception.get())
+ {
+ out << *exception.get();
+ }
+ else
+ {
+ out << "failed to get replica proxy";
+ }
+ }
+ }
+ return session;
+}
+ReplicaSessionPrx
+ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, IceUtil::Time& timeout)
+{
+ try
+ {
ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry);
int t = session->getTimeout();
if(t > 0)
@@ -402,24 +502,9 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
// Register all the well-known objects with the replica session.
//
_wellKnownObjects->registerAll(session);
-
- if(_traceLevels && _traceLevels->replica > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
- out << "established session with master replica";
- }
-
return session;
}
- catch(const ReplicaActiveException&)
- {
- if(_traceLevels)
- {
- _traceLevels->logger->error("a replica with the same name is already registered and active");
- }
- return 0;
- }
- catch(const Ice::LocalException& ex)
+ catch(const Ice::LocalException&)
{
if(_observer)
{
@@ -437,13 +522,7 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
// Re-register all the well known objects with the local database.
//
_wellKnownObjects->registerAll();
-
- if(_traceLevels && _traceLevels->replica > 1)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
- out << "failed to establish session with master replica:\n" << ex;
- }
- return 0;
+ throw;
}
}