summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-09-26 14:12:55 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-09-26 14:12:55 +0000
commita22bf87583396f5720e3eb2c6fe555ca2da9cb89 (patch)
tree3afc24e9347693e57f01db2b6fcc2a9142baca66 /cpp/src
parentBug 1379. (diff)
downloadice-a22bf87583396f5720e3eb2c6fe555ca2da9cb89.tar.bz2
ice-a22bf87583396f5720e3eb2c6fe555ca2da9cb89.tar.xz
ice-a22bf87583396f5720e3eb2c6fe555ca2da9cb89.zip
Fixes to allow upgrade of a slave to a master
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/Database.cpp20
-rw-r--r--cpp/src/IceGrid/InternalRegistryI.cpp24
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp199
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.h36
-rw-r--r--cpp/src/IceGrid/QueryI.cpp27
-rw-r--r--cpp/src/IceGrid/RegistryI.cpp66
-rw-r--r--cpp/src/IceGrid/RegistryI.h2
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp139
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.h9
-rw-r--r--cpp/src/IceGrid/SessionManager.h67
10 files changed, 304 insertions, 285 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index c000b00b455..df7c4130c29 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -1337,6 +1337,10 @@ Ice::ObjectPrx
Database::getObjectByType(const string& type)
{
Ice::ObjectProxySeq objs = getObjectsByType(type);
+ if(objs.empty())
+ {
+ return 0;
+ }
return objs[IceUtil::random(static_cast<int>(objs.size()))];
}
@@ -1344,6 +1348,11 @@ Ice::ObjectPrx
Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample)
{
Ice::ObjectProxySeq objs = getObjectsByType(type);
+ if(objs.empty())
+ {
+ return 0;
+ }
+
RandomNumberGenerator rng;
random_shuffle(objs.begin(), objs.end(), rng);
vector<pair<Ice::ObjectPrx, float> > objectsWithLoad;
@@ -1371,17 +1380,12 @@ Ice::ObjectProxySeq
Database::getObjectsByType(const string& type)
{
Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type);
-
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
IdentityObjectInfoDict objects(connection, _objectDbName);
for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p)
{
proxies.push_back(p->second.proxy);
}
- if(proxies.empty())
- {
- throw ObjectNotRegisteredException();
- }
return proxies;
}
@@ -1411,7 +1415,6 @@ ObjectInfoSeq
Database::getAllObjectInfos(const string& expression)
{
ObjectInfoSeq infos = _objectCache.getAll(expression);
-
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
IdentityObjectInfoDict objects(connection, _objectDbName);
for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p)
@@ -1428,7 +1431,6 @@ ObjectInfoSeq
Database::getObjectInfosByType(const string& type)
{
ObjectInfoSeq infos = _objectCache.getAllByType(type);
-
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
IdentityObjectInfoDict objects(connection, _objectDbName);
for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p)
@@ -1474,10 +1476,6 @@ Database::getInternalObjectsByType(const string& type)
{
proxies.push_back(p->second.proxy);
}
- if(proxies.empty())
- {
- throw ObjectNotRegisteredException();
- }
return proxies;
}
diff --git a/cpp/src/IceGrid/InternalRegistryI.cpp b/cpp/src/IceGrid/InternalRegistryI.cpp
index 0b2fd6cd504..bdf24718a8f 100644
--- a/cpp/src/IceGrid/InternalRegistryI.cpp
+++ b/cpp/src/IceGrid/InternalRegistryI.cpp
@@ -158,16 +158,10 @@ NodePrxSeq
InternalRegistryI::getNodes(const Ice::Current&) const
{
NodePrxSeq nodes;
- try
- {
- Ice::ObjectProxySeq proxies = _database->getInternalObjectsByType(Node::ice_staticId());
- for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
- {
- nodes.push_back(NodePrx::uncheckedCast(*p));
- }
- }
- catch(const ObjectNotRegisteredException&)
+ Ice::ObjectProxySeq proxies = _database->getInternalObjectsByType(Node::ice_staticId());
+ for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
{
+ nodes.push_back(NodePrx::uncheckedCast(*p));
}
return nodes;
}
@@ -176,16 +170,10 @@ InternalRegistryPrxSeq
InternalRegistryI::getReplicas(const Ice::Current&) const
{
InternalRegistryPrxSeq replicas;
- try
- {
- Ice::ObjectProxySeq proxies = _database->getObjectsByType(InternalRegistry::ice_staticId());
- for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
- {
- replicas.push_back(InternalRegistryPrx::uncheckedCast(*p));
- }
- }
- catch(const ObjectNotRegisteredException&)
+ Ice::ObjectProxySeq proxies = _database->getObjectsByType(InternalRegistry::ice_staticId());
+ for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
{
+ replicas.push_back(InternalRegistryPrx::uncheckedCast(*p));
}
return replicas;
}
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index bde9c1c6b24..c6742056400 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -16,10 +16,14 @@
using namespace std;
using namespace IceGrid;
-NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, const NodeIPtr& node) :
- SessionKeepAliveThread<NodeSessionPrx, InternalRegistryPrx>(registry),
- _node(node)
+NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry,
+ const NodeIPtr& node,
+ const IceGrid::QueryPrx& query) :
+ SessionKeepAliveThread<NodeSessionPrx>(registry),
+ _node(node),
+ _query(query)
{
+ assert(registry && node && query);
string name = registry->ice_getIdentity().name;
const string prefix("InternalRegistry-");
string::size_type pos = name.find(prefix);
@@ -33,6 +37,8 @@ NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx
NodeSessionPrx
NodeSessionKeepAliveThread::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout)
{
+ NodeSessionPrx session;
+ auto_ptr<Ice::Exception> exception;
TraceLevelsPtr traceLevels = _node->getTraceLevels();
try
{
@@ -42,39 +48,88 @@ NodeSessionKeepAliveThread::createSession(const InternalRegistryPrx& registry, I
out << "trying to establish session with replica `" << _name << "'";
}
- NodeSessionPrx session = _node->registerWithRegistry(registry);
-
- int t = session->getTimeout();
- if(t > 0)
+ if(!registry->ice_getEndpoints().empty())
{
- timeout = IceUtil::Time::seconds(t / 2);
+ try
+ {
+ session = createSessionImpl(registry, timeout);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception.reset(ex.ice_clone());
+ setRegistry(InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq())));
+ }
}
-
- if(traceLevels && traceLevels->replica > 0)
+
+ if(!session)
{
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "established session with replica `" << _name << "'";
+ try
+ {
+ Ice::ObjectPrx obj = _query->findObjectById(registry->ice_getIdentity());
+ InternalRegistryPrx newRegistry = InternalRegistryPrx::uncheckedCast(obj);
+ if(newRegistry && newRegistry != registry)
+ {
+ session = createSessionImpl(newRegistry, timeout);
+ setRegistry(newRegistry);
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception.reset(ex.ice_clone());
+ }
}
-
- return session;
}
- catch(const NodeActiveException&)
+ catch(const NodeActiveException& ex)
{
if(traceLevels)
{
traceLevels->logger->error("a node with the same name is already active with the replica `" + _name + "'");
}
- return 0;
+ exception.reset(ex.ice_clone());
}
- catch(const Ice::LocalException& ex)
+ catch(const Ice::Exception& ex)
+ {
+ exception.reset(ex.ice_clone());
+ }
+
+ if(session)
+ {
+ if(traceLevels && traceLevels->replica > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
+ out << "established session with replica `" << _name << "'";
+ }
+ }
+ else
{
if(traceLevels && traceLevels->replica > 1)
{
Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "failed to establish session with replica `" << _name << "':\n" << ex;
+ out << "failed to establish session with replica `" << _name << "':\n";
+ if(exception.get())
+ {
+ out << *exception.get();
+ }
+ else
+ {
+ out << "failed to get replica proxy";
+ }
}
- return 0;
}
+
+ return session;
+}
+
+NodeSessionPrx
+NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registry, IceUtil::Time& timeout)
+{
+ NodeSessionPrx session = _node->registerWithRegistry(registry);
+ int t = session->getTimeout();
+ if(t > 0)
+ {
+ timeout = IceUtil::Time::seconds(t / 2);
+ }
+ return session;
}
void
@@ -148,7 +203,7 @@ NodeSessionManager::create(const NodeIPtr& node)
id.name = "InternalRegistry-Master";
_master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id)));
- _thread = new Thread(*this, _master);
+ _thread = new Thread(*this);
_thread->start();
//
@@ -226,7 +281,7 @@ NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica)
return p->second;
}
- NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node);
+ NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node, _query);
_sessions.insert(make_pair(replica->ice_getIdentity(), thread));
thread->start();
return thread;
@@ -279,7 +334,7 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas)
}
else
{
- thread = new NodeSessionKeepAliveThread(*p, _node);
+ thread = new NodeSessionKeepAliveThread(*p, _node, _query);
thread->start();
thread->tryCreateSession(*p);
}
@@ -297,53 +352,10 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas)
}
}
-NodeSessionPrx
-NodeSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout)
+void
+NodeSessionManager::createdSession(const NodeSessionPrx& session)
{
//
- // Establish a session with the master IceGrid registry.
- //
- NodeSessionPrx session;
- TraceLevelsPtr traceLevels = _node->getTraceLevels();
- try
- {
- if(traceLevels && traceLevels->replica > 1)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "trying to establish session with master replica";
- }
-
- session = _node->registerWithRegistry(registry);
-
- int t = session->getTimeout();
- if(t > 0)
- {
- timeout = IceUtil::Time::seconds(t / 2);
- }
-
- if(traceLevels && traceLevels->replica > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "established session with master replica";
- }
- }
- catch(const NodeActiveException&)
- {
- if(traceLevels)
- {
- traceLevels->logger->error("a node with the same name is already active with the master replica");
- }
- }
- catch(const Ice::LocalException& ex)
- {
- if(traceLevels && traceLevels->replica > 1)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->replicaCat);
- out << "failed to establish session with master replica:\n" << ex;
- }
- }
-
- //
// Get the list of replicas (either with the master session or the
// IceGrid::Query interface) and make sure we have sessions opened
// to these replicas.
@@ -368,7 +380,7 @@ NodeSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::
if(session)
{
- replicas = registry->getReplicas();
+ replicas = _thread->getRegistry()->getReplicas();
}
else
{
@@ -414,56 +426,5 @@ NodeSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::
{
// IGNORE
}
-
- return session;
-}
-
-bool
-NodeSessionManager::keepAlive(const NodeSessionPrx& session)
-{
- if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2)
- {
- Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
- out << "sending keep alive message to master replica";
- }
-
- try
- {
- session->keepAlive(_node->getPlatformInfo().getLoadInfo());
- return true;
- }
- catch(const Ice::LocalException& ex)
- {
- if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
- {
- Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
- out << "lost session with master replica:\n" << ex;
- }
- _node->setObserver(0);
- return false;
- }
-}
-
-void
-NodeSessionManager::destroySession(const NodeSessionPrx& session)
-{
- try
- {
- session->destroy();
-
- if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0)
- {
- Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
- out << "destroyed master replica session";
- }
- }
- catch(const Ice::LocalException& ex)
- {
- if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1)
- {
- Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat);
- out << "couldn't destroy master replica session:\n" << ex;
- }
- }
}
diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h
index a06d1f4e2c2..2988c869573 100644
--- a/cpp/src/IceGrid/NodeSessionManager.h
+++ b/cpp/src/IceGrid/NodeSessionManager.h
@@ -24,20 +24,23 @@ namespace IceGrid
class NodeI;
typedef IceUtil::Handle<NodeI> NodeIPtr;
-class NodeSessionKeepAliveThread : public SessionKeepAliveThread<NodeSessionPrx, InternalRegistryPrx>
+class NodeSessionKeepAliveThread : public SessionKeepAliveThread<NodeSessionPrx>
{
public:
- NodeSessionKeepAliveThread(const InternalRegistryPrx&, const NodeIPtr&);
+ NodeSessionKeepAliveThread(const InternalRegistryPrx&, const NodeIPtr&, const IceGrid::QueryPrx&);
virtual NodeSessionPrx createSession(const InternalRegistryPrx&, IceUtil::Time&);
virtual void destroySession(const NodeSessionPrx&);
virtual bool keepAlive(const NodeSessionPrx&);
-private:
+protected:
+
+ virtual NodeSessionPrx createSessionImpl(const InternalRegistryPrx&, IceUtil::Time&);
const NodeIPtr _node;
const std::string _name;
+ const IceGrid::QueryPrx _query;
};
typedef IceUtil::Handle<NodeSessionKeepAliveThread> NodeSessionKeepAliveThreadPtr;
@@ -61,12 +64,12 @@ private:
void syncReplicas(const InternalRegistryPrxSeq&);
- class Thread : public SessionKeepAliveThread<NodeSessionPrx, InternalRegistryPrx>
+ class Thread : public NodeSessionKeepAliveThread
{
public:
- Thread(NodeSessionManager& manager, const InternalRegistryPrx& master) :
- SessionKeepAliveThread<NodeSessionPrx, InternalRegistryPrx>(master),
+ Thread(NodeSessionManager& manager) :
+ NodeSessionKeepAliveThread(manager._master, manager._node, manager._query),
_manager(manager)
{
}
@@ -74,19 +77,9 @@ private:
virtual NodeSessionPrx
createSession(const InternalRegistryPrx& master, IceUtil::Time& timeout)
{
- return _manager.createSession(master, timeout);
- }
-
- virtual void
- destroySession(const NodeSessionPrx& session)
- {
- _manager.destroySession(session);
- }
-
- virtual bool
- keepAlive(const NodeSessionPrx& session)
- {
- return _manager.keepAlive(session);
+ NodeSessionPrx session = NodeSessionKeepAliveThread::createSession(master, timeout);
+ _manager.createdSession(session);
+ return session;
}
private:
@@ -94,12 +87,9 @@ private:
NodeSessionManager& _manager;
};
typedef IceUtil::Handle<Thread> ThreadPtr;
-
friend class Thread;
- NodeSessionPrx createSession(const InternalRegistryPrx&, IceUtil::Time&);
- void destroySession(const NodeSessionPrx&);
- bool keepAlive(const NodeSessionPrx&);
+ void createdSession(const NodeSessionPrx&);
const NodeIPtr _node;
ThreadPtr _thread;
diff --git a/cpp/src/IceGrid/QueryI.cpp b/cpp/src/IceGrid/QueryI.cpp
index 7df0c437a78..efa395a0c67 100644
--- a/cpp/src/IceGrid/QueryI.cpp
+++ b/cpp/src/IceGrid/QueryI.cpp
@@ -41,14 +41,7 @@ QueryI::findObjectById_async(const AMD_Query_findObjectByIdPtr& cb, const Ice::I
void
QueryI::findObjectByType_async(const AMD_Query_findObjectByTypePtr& cb, const string& type, const Ice::Current&) const
{
- try
- {
- cb->ice_response(_database->getObjectByType(type));
- }
- catch(const ObjectNotRegisteredException&)
- {
- cb->ice_response(0);
- }
+ cb->ice_response(_database->getObjectByType(type));
}
void
@@ -57,14 +50,7 @@ QueryI::findObjectByTypeOnLeastLoadedNode_async(const AMD_Query_findObjectByType
LoadSample sample,
const Ice::Current&) const
{
- try
- {
- cb->ice_response(_database->getObjectByTypeOnLeastLoadedNode(type, sample));
- }
- catch(const ObjectNotRegisteredException&)
- {
- cb->ice_response(0);
- }
+ cb->ice_response(_database->getObjectByTypeOnLeastLoadedNode(type, sample));
}
void
@@ -72,14 +58,7 @@ QueryI::findAllObjectsByType_async(const AMD_Query_findAllObjectsByTypePtr& cb,
const string& type,
const Ice::Current&) const
{
- try
- {
- cb->ice_response(_database->getObjectsByType(type));
- }
- catch(const ObjectNotRegisteredException&)
- {
- cb->ice_response(Ice::ObjectProxySeq());
- }
+ cb->ice_response(_database->getObjectsByType(type));
}
diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp
index f1417d1a176..1cc700491eb 100644
--- a/cpp/src/IceGrid/RegistryI.cpp
+++ b/cpp/src/IceGrid/RegistryI.cpp
@@ -206,12 +206,6 @@ RegistryI::start(bool nowarn)
_replicaName = properties->getPropertyWithDefault("IceGrid.Registry.ReplicaName", "Master");
_master = _replicaName == "Master";
- //
- // Create the internal registry object adapter and activate it.
- //
- ObjectAdapterPtr registryAdapter = _communicator->createObjectAdapter("IceGrid.Registry.Internal");
- registryAdapter->activate();
-
_reaper = new ReapThread();
_reaper->start();
@@ -222,7 +216,18 @@ RegistryI::start(bool nowarn)
//
if(_master)
{
- _instanceName = properties->getPropertyWithDefault("IceGrid.InstanceName", "IceGrid");
+ _instanceName = properties->getProperty("IceGrid.InstanceName");
+ if(_instanceName.empty())
+ {
+ if(_communicator->getDefaultLocator())
+ {
+ _instanceName = _communicator->getDefaultLocator()->ice_getIdentity().category;
+ }
+ else
+ {
+ _instanceName = "IceGrid";
+ }
+ }
}
else
{
@@ -242,6 +247,12 @@ RegistryI::start(bool nowarn)
properties->setProperty("Freeze.DbEnv.Registry.DbPrivate", "0");
//
+ // Create the internal registry object adapter.
+ //
+ ObjectAdapterPtr registryAdapter = _communicator->createObjectAdapter("IceGrid.Registry.Internal");
+ registryAdapter->activate();
+
+ //
// Create the internal IceStorm service.
//
Identity registryTopicManagerId;
@@ -258,18 +269,44 @@ RegistryI::start(bool nowarn)
_database = new Database(registryAdapter, topicManager, _instanceName, _traceLevels, getInfo(), _master);
_wellKnownObjects = new WellKnownObjectsManager(_database);
- InternalRegistryPrx internalRegistry;
+ //
+ // Get the saved replica/node proxies and remove them from the
+ // database.
+ //
+ Ice::ObjectProxySeq proxies;
+
+ NodePrxSeq nodes;
+ proxies = _database->getInternalObjectsByType(Node::ice_staticId());
+ for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
+ {
+ nodes.push_back(NodePrx::uncheckedCast(*p));
+ _database->removeInternalObject((*p)->ice_getIdentity());
+ }
+
+ InternalRegistryPrxSeq replicas;
+ proxies = _database->getObjectsByType(InternalRegistry::ice_staticId());
+ for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
+ {
+ replicas.push_back(InternalRegistryPrx::uncheckedCast(*p));
+ _database->removeObject((*p)->ice_getIdentity());
+ }
+
+ //
+ // NOTE: The internal registry object must be added only once the
+ // node/replica proxies are retrieved and removed from the
+ // database. Otherwise, if some replica/node register as soon as
+ // the internal registry is setup we might clear valid proxies.
+ //
+ InternalRegistryPrx internalRegistry = setupInternalRegistry(registryAdapter);
if(_master)
{
- internalRegistry = setupInternalRegistry(registryAdapter);
- NodePrxSeq nodes = registerReplicas(internalRegistry);
+ NodePrxSeq nodes = registerReplicas(internalRegistry, replicas);
registerNodes(internalRegistry, nodes);
}
else
{
- internalRegistry = setupInternalRegistry(registryAdapter);
_session.create(_replicaName, getInfo(), _database, _wellKnownObjects, internalRegistry);
- registerNodes(internalRegistry, _session.getNodes());
+ registerNodes(internalRegistry, _session.getNodes(nodes));
}
ObjectAdapterPtr serverAdapter = _communicator->createObjectAdapter("IceGrid.Registry.Server");
@@ -1062,10 +1099,9 @@ RegistryI::getSSLInfo(const ConnectionPtr& connection, string& userDN)
}
NodePrxSeq
-RegistryI::registerReplicas(const InternalRegistryPrx& internalRegistry)
+RegistryI::registerReplicas(const InternalRegistryPrx& internalRegistry, const InternalRegistryPrxSeq& replicas)
{
set<NodePrx> nodes;
- InternalRegistryPrxSeq replicas = internalRegistry->getReplicas();
for(InternalRegistryPrxSeq::const_iterator r = replicas.begin(); r != replicas.end(); ++r)
{
if((*r)->ice_getIdentity() != internalRegistry->ice_getIdentity())
@@ -1078,7 +1114,6 @@ RegistryI::registerReplicas(const InternalRegistryPrx& internalRegistry)
}
catch(const Ice::LocalException&)
{
- // TODO: Cleanup the database?
}
}
}
@@ -1112,7 +1147,6 @@ RegistryI::registerNodes(const InternalRegistryPrx& internalRegistry, const Node
}
catch(const Ice::LocalException&)
{
- // TODO: Cleanup the database?
}
}
}
diff --git a/cpp/src/IceGrid/RegistryI.h b/cpp/src/IceGrid/RegistryI.h
index 39f6072ade2..b1b0e9463f0 100644
--- a/cpp/src/IceGrid/RegistryI.h
+++ b/cpp/src/IceGrid/RegistryI.h
@@ -90,7 +90,7 @@ private:
Glacier2::SSLPermissionsVerifierPrx getSSLPermissionsVerifier(const Ice::LocatorPrx&, const std::string&, bool);
Glacier2::SSLInfo getSSLInfo(const Ice::ConnectionPtr&, std::string&);
- NodePrxSeq registerReplicas(const InternalRegistryPrx&);
+ NodePrxSeq registerReplicas(const InternalRegistryPrx&, const InternalRegistryPrxSeq&);
void registerNodes(const InternalRegistryPrx&, const NodePrxSeq&);
const Ice::CommunicatorPtr _communicator;
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index e121a307386..8413261f8af 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -264,6 +264,22 @@ ReplicaSessionManager::create(const string& name,
_wellKnownObjects = wellKnownObjects;
_traceLevels = _database->getTraceLevels();
+ //
+ // Initialize the IceGrid::Query objects. The IceGrid::Query
+ // interface is used to lookup the registry proxy in case it
+ // becomes unavailable. Since replicas might not always have
+ // an up to date registry proxy, we need to query all the
+ // replicas.
+ //
+ Ice::EndpointSeq endpoints = comm->getDefaultLocator()->ice_getEndpoints();
+ QueryPrx query = QueryPrx::uncheckedCast(comm->stringToProxy(instName + "/Query"));
+ for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
+ {
+ Ice::EndpointSeq singleEndpoint;
+ singleEndpoint.push_back(*p);
+ _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint)));
+ }
+
_thread = new Thread(*this, _master);
_thread->start();
notifyAll();
@@ -293,7 +309,7 @@ ReplicaSessionManager::create(const InternalRegistryPrx& replica)
}
NodePrxSeq
-ReplicaSessionManager::getNodes() const
+ReplicaSessionManager::getNodes(const NodePrxSeq& nodes) const
{
try
{
@@ -301,7 +317,7 @@ ReplicaSessionManager::getNodes() const
}
catch(const Ice::LocalException&)
{
- return _internalRegistry->getNodes();
+ return nodes;
}
}
@@ -324,11 +340,6 @@ void
ReplicaSessionManager::registerAllWellKnownObjects()
{
//
- // Try to create the session if it doesn't already exists.
- //
- _thread->tryCreateSession(0);
-
- //
// If there's an active session, register the well-known objects
// with the session.
//
@@ -374,6 +385,8 @@ ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session)
ReplicaSessionPrx
ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout)
{
+ ReplicaSessionPrx session;
+ auto_ptr<Ice::Exception> exception;
try
{
if(_traceLevels && _traceLevels->replica > 1)
@@ -381,7 +394,94 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
out << "trying to establish session with master replica";
}
+
+ set<InternalRegistryPrx> used;
+ if(!registry->ice_getEndpoints().empty())
+ {
+ try
+ {
+ session = createSessionImpl(registry, timeout);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception.reset(ex.ice_clone());
+ used.insert(registry);
+ _thread->setRegistry(InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq())));
+ }
+ }
+
+ if(!session)
+ {
+ for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p)
+ {
+ InternalRegistryPrx newRegistry;
+ try
+ {
+ Ice::ObjectPrx obj = (*p)->findObjectById(registry->ice_getIdentity());
+ newRegistry = InternalRegistryPrx::uncheckedCast(obj);
+ if(newRegistry && used.find(newRegistry) == used.end())
+ {
+ session = createSessionImpl(newRegistry, timeout);
+ _thread->setRegistry(newRegistry);
+ break;
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ exception.reset(ex.ice_clone());
+ if(newRegistry)
+ {
+ used.insert(newRegistry);
+ }
+ }
+ }
+ }
+ }
+ catch(const ReplicaActiveException& ex)
+ {
+ if(_traceLevels)
+ {
+ _traceLevels->logger->error("a replica with the same name is already registered and active");
+ }
+ exception.reset(ex.ice_clone());
+ }
+ catch(const Ice::Exception& ex)
+ {
+ exception.reset(ex.ice_clone());
+ }
+
+ if(session)
+ {
+ if(_traceLevels && _traceLevels->replica > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
+ out << "established session with master replica";
+ }
+ }
+ else
+ {
+ if(_traceLevels && _traceLevels->replica > 1)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
+ out << "failed to establish session with master replica:\n";
+ if(exception.get())
+ {
+ out << *exception.get();
+ }
+ else
+ {
+ out << "failed to get replica proxy";
+ }
+ }
+ }
+ return session;
+}
+ReplicaSessionPrx
+ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, IceUtil::Time& timeout)
+{
+ try
+ {
ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry);
int t = session->getTimeout();
if(t > 0)
@@ -402,24 +502,9 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
// Register all the well-known objects with the replica session.
//
_wellKnownObjects->registerAll(session);
-
- if(_traceLevels && _traceLevels->replica > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
- out << "established session with master replica";
- }
-
return session;
}
- catch(const ReplicaActiveException&)
- {
- if(_traceLevels)
- {
- _traceLevels->logger->error("a replica with the same name is already registered and active");
- }
- return 0;
- }
- catch(const Ice::LocalException& ex)
+ catch(const Ice::LocalException&)
{
if(_observer)
{
@@ -437,13 +522,7 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti
// Re-register all the well known objects with the local database.
//
_wellKnownObjects->registerAll();
-
- if(_traceLevels && _traceLevels->replica > 1)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
- out << "failed to establish session with master replica:\n" << ex;
- }
- return 0;
+ throw;
}
}
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.h b/cpp/src/IceGrid/ReplicaSessionManager.h
index 15c918b8bbd..ec7f6ca500c 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.h
+++ b/cpp/src/IceGrid/ReplicaSessionManager.h
@@ -32,12 +32,13 @@ typedef IceUtil::Handle<TraceLevels> TraceLevelsPtr;
class ReplicaSessionManager : public IceUtil::Monitor<IceUtil::Mutex>
{
public:
- class Thread : public SessionKeepAliveThread<ReplicaSessionPrx, InternalRegistryPrx>
+
+ class Thread : public SessionKeepAliveThread<ReplicaSessionPrx>
{
public:
Thread(ReplicaSessionManager& manager, const InternalRegistryPrx& master) :
- SessionKeepAliveThread<ReplicaSessionPrx, InternalRegistryPrx>(master),
+ SessionKeepAliveThread<ReplicaSessionPrx>(master),
_manager(manager)
{
}
@@ -73,7 +74,7 @@ public:
void create(const std::string&, const RegistryInfo&, const DatabasePtr&, const WellKnownObjectsManagerPtr&,
const InternalRegistryPrx&);
void create(const InternalRegistryPrx&);
- NodePrxSeq getNodes() const;
+ NodePrxSeq getNodes(const NodePrxSeq&) const;
void destroy();
void registerAllWellKnownObjects();
@@ -84,6 +85,7 @@ private:
friend class Thread;
ReplicaSessionPrx createSession(const InternalRegistryPrx&, IceUtil::Time&);
+ ReplicaSessionPrx createSessionImpl(const InternalRegistryPrx&, IceUtil::Time&);
void destroySession(const ReplicaSessionPrx&);
bool keepAlive(const ReplicaSessionPrx&);
@@ -97,6 +99,7 @@ private:
DatabasePtr _database;
WellKnownObjectsManagerPtr _wellKnownObjects;
TraceLevelsPtr _traceLevels;
+ std::vector<QueryPrx> _queryObjects;
};
}
diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h
index eb96c868265..0dcb3b006e2 100644
--- a/cpp/src/IceGrid/SessionManager.h
+++ b/cpp/src/IceGrid/SessionManager.h
@@ -16,11 +16,12 @@
#include <IceUtil/Thread.h>
#include <IceGrid/Query.h>
+#include <IceGrid/Internal.h>
namespace IceGrid
{
-template<class TPrx, class FPrx>
+template<class TPrx>
class SessionKeepAliveThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex>
{
enum State
@@ -34,8 +35,8 @@ class SessionKeepAliveThread : public IceUtil::Thread, public IceUtil::Monitor<I
public:
- SessionKeepAliveThread(const FPrx& factory) :
- _factory(factory),
+ SessionKeepAliveThread(const InternalRegistryPrx& registry) :
+ _registry(registry),
_state(Disconnected),
_destroySession(false)
{
@@ -45,7 +46,7 @@ public:
run()
{
TPrx session;
- FPrx factory = _factory;
+ InternalRegistryPrx registry = _registry;
bool updateState = false;
IceUtil::Time timeout = IceUtil::Time::seconds(10);
bool destroy = false;
@@ -70,37 +71,10 @@ public:
//
if(!session)
{
- session = createSession(factory, timeout);
+ session = createSession(registry, timeout);
updateState |= session;
- }
-
- //
- // If we failed to create the session with the factory and
- // the factory proxy is a direct proxy, we check with the
- // Query interface if the factory proxy was updated. It's
- // possible that the factory was restarted for example.
- //
- if(!session && !factory->ice_getEndpoints().empty())
- {
- std::string instanceName = factory->ice_getIdentity().category;
- try
- {
- QueryPrx query = QueryPrx::uncheckedCast(
- factory->ice_getCommunicator()->stringToProxy(instanceName + "/Query"));
- Ice::ObjectPrx obj = query->findObjectById(factory->ice_getIdentity());
- FPrx newFactory = FPrx::uncheckedCast(obj);
- if(newFactory != factory)
- {
- session = createSession(newFactory, timeout);
- factory = newFactory;
- updateState |= session;
- }
- }
- catch(const Ice::LocalException&)
- {
- }
}
-
+
if(updateState)
{
Lock sync(*this);
@@ -108,7 +82,6 @@ public:
{
_state = session ? Connected : Disconnected;
}
- _factory = factory;
_session = session;
notifyAll();
}
@@ -137,7 +110,7 @@ public:
}
updateState = _state == Retry || _state == DestroySession;
- factory = _factory;
+ registry = _registry;
}
if(destroy)
@@ -170,7 +143,7 @@ public:
}
virtual bool
- tryCreateSession(FPrx factory)
+ tryCreateSession(InternalRegistryPrx registry)
{
{
Lock sync(*this);
@@ -185,9 +158,9 @@ public:
}
_state = Retry;
- if(factory)
+ if(registry)
{
- _factory = factory;
+ _registry = registry;
}
notifyAll();
}
@@ -226,13 +199,27 @@ public:
return _session;
}
- virtual TPrx createSession(const FPrx&, IceUtil::Time&) = 0;
+ void
+ setRegistry(const InternalRegistryPrx& registry)
+ {
+ Lock sync(*this);
+ _registry = registry;
+ }
+
+ InternalRegistryPrx
+ getRegistry() const
+ {
+ Lock sync(*this);
+ return _registry;
+ }
+
+ virtual TPrx createSession(const InternalRegistryPrx&, IceUtil::Time&) = 0;
virtual void destroySession(const TPrx&) = 0;
virtual bool keepAlive(const TPrx&) = 0;
protected:
- FPrx _factory;
+ InternalRegistryPrx _registry;
TPrx _session;
State _state;
bool _destroySession;