diff options
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 20 | ||||
-rw-r--r-- | cpp/src/IceGrid/InternalRegistryI.cpp | 24 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 199 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.h | 36 | ||||
-rw-r--r-- | cpp/src/IceGrid/QueryI.cpp | 27 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 66 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 139 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.h | 9 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionManager.h | 67 |
10 files changed, 304 insertions, 285 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index c000b00b455..df7c4130c29 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -1337,6 +1337,10 @@ Ice::ObjectPrx Database::getObjectByType(const string& type) { Ice::ObjectProxySeq objs = getObjectsByType(type); + if(objs.empty()) + { + return 0; + } return objs[IceUtil::random(static_cast<int>(objs.size()))]; } @@ -1344,6 +1348,11 @@ Ice::ObjectPrx Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample) { Ice::ObjectProxySeq objs = getObjectsByType(type); + if(objs.empty()) + { + return 0; + } + RandomNumberGenerator rng; random_shuffle(objs.begin(), objs.end(), rng); vector<pair<Ice::ObjectPrx, float> > objectsWithLoad; @@ -1371,17 +1380,12 @@ Ice::ObjectProxySeq Database::getObjectsByType(const string& type) { Ice::ObjectProxySeq proxies = _objectCache.getObjectsByType(type); - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); IdentityObjectInfoDict objects(connection, _objectDbName); for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p) { proxies.push_back(p->second.proxy); } - if(proxies.empty()) - { - throw ObjectNotRegisteredException(); - } return proxies; } @@ -1411,7 +1415,6 @@ ObjectInfoSeq Database::getAllObjectInfos(const string& expression) { ObjectInfoSeq infos = _objectCache.getAll(expression); - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); IdentityObjectInfoDict objects(connection, _objectDbName); for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p) @@ -1428,7 +1431,6 @@ ObjectInfoSeq Database::getObjectInfosByType(const string& type) { ObjectInfoSeq infos = _objectCache.getAllByType(type); - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); IdentityObjectInfoDict objects(connection, _objectDbName); for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p) @@ -1474,10 +1476,6 @@ Database::getInternalObjectsByType(const string& type) { proxies.push_back(p->second.proxy); } - if(proxies.empty()) - { - throw ObjectNotRegisteredException(); - } return proxies; } diff --git a/cpp/src/IceGrid/InternalRegistryI.cpp b/cpp/src/IceGrid/InternalRegistryI.cpp index 0b2fd6cd504..bdf24718a8f 100644 --- a/cpp/src/IceGrid/InternalRegistryI.cpp +++ b/cpp/src/IceGrid/InternalRegistryI.cpp @@ -158,16 +158,10 @@ NodePrxSeq InternalRegistryI::getNodes(const Ice::Current&) const { NodePrxSeq nodes; - try - { - Ice::ObjectProxySeq proxies = _database->getInternalObjectsByType(Node::ice_staticId()); - for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) - { - nodes.push_back(NodePrx::uncheckedCast(*p)); - } - } - catch(const ObjectNotRegisteredException&) + Ice::ObjectProxySeq proxies = _database->getInternalObjectsByType(Node::ice_staticId()); + for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) { + nodes.push_back(NodePrx::uncheckedCast(*p)); } return nodes; } @@ -176,16 +170,10 @@ InternalRegistryPrxSeq InternalRegistryI::getReplicas(const Ice::Current&) const { InternalRegistryPrxSeq replicas; - try - { - Ice::ObjectProxySeq proxies = _database->getObjectsByType(InternalRegistry::ice_staticId()); - for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) - { - replicas.push_back(InternalRegistryPrx::uncheckedCast(*p)); - } - } - catch(const ObjectNotRegisteredException&) + Ice::ObjectProxySeq proxies = _database->getObjectsByType(InternalRegistry::ice_staticId()); + for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) { + replicas.push_back(InternalRegistryPrx::uncheckedCast(*p)); } return replicas; } diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index bde9c1c6b24..c6742056400 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -16,10 +16,14 @@ using namespace std; using namespace IceGrid; -NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, const NodeIPtr& node) : - SessionKeepAliveThread<NodeSessionPrx, InternalRegistryPrx>(registry), - _node(node) +NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, + const NodeIPtr& node, + const IceGrid::QueryPrx& query) : + SessionKeepAliveThread<NodeSessionPrx>(registry), + _node(node), + _query(query) { + assert(registry && node && query); string name = registry->ice_getIdentity().name; const string prefix("InternalRegistry-"); string::size_type pos = name.find(prefix); @@ -33,6 +37,8 @@ NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx NodeSessionPrx NodeSessionKeepAliveThread::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout) { + NodeSessionPrx session; + auto_ptr<Ice::Exception> exception; TraceLevelsPtr traceLevels = _node->getTraceLevels(); try { @@ -42,39 +48,88 @@ NodeSessionKeepAliveThread::createSession(const InternalRegistryPrx& registry, I out << "trying to establish session with replica `" << _name << "'"; } - NodeSessionPrx session = _node->registerWithRegistry(registry); - - int t = session->getTimeout(); - if(t > 0) + if(!registry->ice_getEndpoints().empty()) { - timeout = IceUtil::Time::seconds(t / 2); + try + { + session = createSessionImpl(registry, timeout); + } + catch(const Ice::LocalException& ex) + { + exception.reset(ex.ice_clone()); + setRegistry(InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq()))); + } } - - if(traceLevels && traceLevels->replica > 0) + + if(!session) { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "established session with replica `" << _name << "'"; + try + { + Ice::ObjectPrx obj = _query->findObjectById(registry->ice_getIdentity()); + InternalRegistryPrx newRegistry = InternalRegistryPrx::uncheckedCast(obj); + if(newRegistry && newRegistry != registry) + { + session = createSessionImpl(newRegistry, timeout); + setRegistry(newRegistry); + } + } + catch(const Ice::LocalException& ex) + { + exception.reset(ex.ice_clone()); + } } - - return session; } - catch(const NodeActiveException&) + catch(const NodeActiveException& ex) { if(traceLevels) { traceLevels->logger->error("a node with the same name is already active with the replica `" + _name + "'"); } - return 0; + exception.reset(ex.ice_clone()); } - catch(const Ice::LocalException& ex) + catch(const Ice::Exception& ex) + { + exception.reset(ex.ice_clone()); + } + + if(session) + { + if(traceLevels && traceLevels->replica > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); + out << "established session with replica `" << _name << "'"; + } + } + else { if(traceLevels && traceLevels->replica > 1) { Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "failed to establish session with replica `" << _name << "':\n" << ex; + out << "failed to establish session with replica `" << _name << "':\n"; + if(exception.get()) + { + out << *exception.get(); + } + else + { + out << "failed to get replica proxy"; + } } - return 0; } + + return session; +} + +NodeSessionPrx +NodeSessionKeepAliveThread::createSessionImpl(const InternalRegistryPrx& registry, IceUtil::Time& timeout) +{ + NodeSessionPrx session = _node->registerWithRegistry(registry); + int t = session->getTimeout(); + if(t > 0) + { + timeout = IceUtil::Time::seconds(t / 2); + } + return session; } void @@ -148,7 +203,7 @@ NodeSessionManager::create(const NodeIPtr& node) id.name = "InternalRegistry-Master"; _master = InternalRegistryPrx::uncheckedCast(communicator->stringToProxy(communicator->identityToString(id))); - _thread = new Thread(*this, _master); + _thread = new Thread(*this); _thread->start(); // @@ -226,7 +281,7 @@ NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) return p->second; } - NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node); + NodeSessionKeepAliveThreadPtr thread = new NodeSessionKeepAliveThread(replica, _node, _query); _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); thread->start(); return thread; @@ -279,7 +334,7 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) } else { - thread = new NodeSessionKeepAliveThread(*p, _node); + thread = new NodeSessionKeepAliveThread(*p, _node, _query); thread->start(); thread->tryCreateSession(*p); } @@ -297,53 +352,10 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) } } -NodeSessionPrx -NodeSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout) +void +NodeSessionManager::createdSession(const NodeSessionPrx& session) { // - // Establish a session with the master IceGrid registry. - // - NodeSessionPrx session; - TraceLevelsPtr traceLevels = _node->getTraceLevels(); - try - { - if(traceLevels && traceLevels->replica > 1) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "trying to establish session with master replica"; - } - - session = _node->registerWithRegistry(registry); - - int t = session->getTimeout(); - if(t > 0) - { - timeout = IceUtil::Time::seconds(t / 2); - } - - if(traceLevels && traceLevels->replica > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "established session with master replica"; - } - } - catch(const NodeActiveException&) - { - if(traceLevels) - { - traceLevels->logger->error("a node with the same name is already active with the master replica"); - } - } - catch(const Ice::LocalException& ex) - { - if(traceLevels && traceLevels->replica > 1) - { - Ice::Trace out(traceLevels->logger, traceLevels->replicaCat); - out << "failed to establish session with master replica:\n" << ex; - } - } - - // // Get the list of replicas (either with the master session or the // IceGrid::Query interface) and make sure we have sessions opened // to these replicas. @@ -368,7 +380,7 @@ NodeSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil:: if(session) { - replicas = registry->getReplicas(); + replicas = _thread->getRegistry()->getReplicas(); } else { @@ -414,56 +426,5 @@ NodeSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil:: { // IGNORE } - - return session; -} - -bool -NodeSessionManager::keepAlive(const NodeSessionPrx& session) -{ - if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 2) - { - Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); - out << "sending keep alive message to master replica"; - } - - try - { - session->keepAlive(_node->getPlatformInfo().getLoadInfo()); - return true; - } - catch(const Ice::LocalException& ex) - { - if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) - { - Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); - out << "lost session with master replica:\n" << ex; - } - _node->setObserver(0); - return false; - } -} - -void -NodeSessionManager::destroySession(const NodeSessionPrx& session) -{ - try - { - session->destroy(); - - if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 0) - { - Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); - out << "destroyed master replica session"; - } - } - catch(const Ice::LocalException& ex) - { - if(_node->getTraceLevels() && _node->getTraceLevels()->replica > 1) - { - Ice::Trace out(_node->getTraceLevels()->logger, _node->getTraceLevels()->replicaCat); - out << "couldn't destroy master replica session:\n" << ex; - } - } } diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h index a06d1f4e2c2..2988c869573 100644 --- a/cpp/src/IceGrid/NodeSessionManager.h +++ b/cpp/src/IceGrid/NodeSessionManager.h @@ -24,20 +24,23 @@ namespace IceGrid class NodeI; typedef IceUtil::Handle<NodeI> NodeIPtr; -class NodeSessionKeepAliveThread : public SessionKeepAliveThread<NodeSessionPrx, InternalRegistryPrx> +class NodeSessionKeepAliveThread : public SessionKeepAliveThread<NodeSessionPrx> { public: - NodeSessionKeepAliveThread(const InternalRegistryPrx&, const NodeIPtr&); + NodeSessionKeepAliveThread(const InternalRegistryPrx&, const NodeIPtr&, const IceGrid::QueryPrx&); virtual NodeSessionPrx createSession(const InternalRegistryPrx&, IceUtil::Time&); virtual void destroySession(const NodeSessionPrx&); virtual bool keepAlive(const NodeSessionPrx&); -private: +protected: + + virtual NodeSessionPrx createSessionImpl(const InternalRegistryPrx&, IceUtil::Time&); const NodeIPtr _node; const std::string _name; + const IceGrid::QueryPrx _query; }; typedef IceUtil::Handle<NodeSessionKeepAliveThread> NodeSessionKeepAliveThreadPtr; @@ -61,12 +64,12 @@ private: void syncReplicas(const InternalRegistryPrxSeq&); - class Thread : public SessionKeepAliveThread<NodeSessionPrx, InternalRegistryPrx> + class Thread : public NodeSessionKeepAliveThread { public: - Thread(NodeSessionManager& manager, const InternalRegistryPrx& master) : - SessionKeepAliveThread<NodeSessionPrx, InternalRegistryPrx>(master), + Thread(NodeSessionManager& manager) : + NodeSessionKeepAliveThread(manager._master, manager._node, manager._query), _manager(manager) { } @@ -74,19 +77,9 @@ private: virtual NodeSessionPrx createSession(const InternalRegistryPrx& master, IceUtil::Time& timeout) { - return _manager.createSession(master, timeout); - } - - virtual void - destroySession(const NodeSessionPrx& session) - { - _manager.destroySession(session); - } - - virtual bool - keepAlive(const NodeSessionPrx& session) - { - return _manager.keepAlive(session); + NodeSessionPrx session = NodeSessionKeepAliveThread::createSession(master, timeout); + _manager.createdSession(session); + return session; } private: @@ -94,12 +87,9 @@ private: NodeSessionManager& _manager; }; typedef IceUtil::Handle<Thread> ThreadPtr; - friend class Thread; - NodeSessionPrx createSession(const InternalRegistryPrx&, IceUtil::Time&); - void destroySession(const NodeSessionPrx&); - bool keepAlive(const NodeSessionPrx&); + void createdSession(const NodeSessionPrx&); const NodeIPtr _node; ThreadPtr _thread; diff --git a/cpp/src/IceGrid/QueryI.cpp b/cpp/src/IceGrid/QueryI.cpp index 7df0c437a78..efa395a0c67 100644 --- a/cpp/src/IceGrid/QueryI.cpp +++ b/cpp/src/IceGrid/QueryI.cpp @@ -41,14 +41,7 @@ QueryI::findObjectById_async(const AMD_Query_findObjectByIdPtr& cb, const Ice::I void QueryI::findObjectByType_async(const AMD_Query_findObjectByTypePtr& cb, const string& type, const Ice::Current&) const { - try - { - cb->ice_response(_database->getObjectByType(type)); - } - catch(const ObjectNotRegisteredException&) - { - cb->ice_response(0); - } + cb->ice_response(_database->getObjectByType(type)); } void @@ -57,14 +50,7 @@ QueryI::findObjectByTypeOnLeastLoadedNode_async(const AMD_Query_findObjectByType LoadSample sample, const Ice::Current&) const { - try - { - cb->ice_response(_database->getObjectByTypeOnLeastLoadedNode(type, sample)); - } - catch(const ObjectNotRegisteredException&) - { - cb->ice_response(0); - } + cb->ice_response(_database->getObjectByTypeOnLeastLoadedNode(type, sample)); } void @@ -72,14 +58,7 @@ QueryI::findAllObjectsByType_async(const AMD_Query_findAllObjectsByTypePtr& cb, const string& type, const Ice::Current&) const { - try - { - cb->ice_response(_database->getObjectsByType(type)); - } - catch(const ObjectNotRegisteredException&) - { - cb->ice_response(Ice::ObjectProxySeq()); - } + cb->ice_response(_database->getObjectsByType(type)); } diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index f1417d1a176..1cc700491eb 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -206,12 +206,6 @@ RegistryI::start(bool nowarn) _replicaName = properties->getPropertyWithDefault("IceGrid.Registry.ReplicaName", "Master"); _master = _replicaName == "Master"; - // - // Create the internal registry object adapter and activate it. - // - ObjectAdapterPtr registryAdapter = _communicator->createObjectAdapter("IceGrid.Registry.Internal"); - registryAdapter->activate(); - _reaper = new ReapThread(); _reaper->start(); @@ -222,7 +216,18 @@ RegistryI::start(bool nowarn) // if(_master) { - _instanceName = properties->getPropertyWithDefault("IceGrid.InstanceName", "IceGrid"); + _instanceName = properties->getProperty("IceGrid.InstanceName"); + if(_instanceName.empty()) + { + if(_communicator->getDefaultLocator()) + { + _instanceName = _communicator->getDefaultLocator()->ice_getIdentity().category; + } + else + { + _instanceName = "IceGrid"; + } + } } else { @@ -242,6 +247,12 @@ RegistryI::start(bool nowarn) properties->setProperty("Freeze.DbEnv.Registry.DbPrivate", "0"); // + // Create the internal registry object adapter. + // + ObjectAdapterPtr registryAdapter = _communicator->createObjectAdapter("IceGrid.Registry.Internal"); + registryAdapter->activate(); + + // // Create the internal IceStorm service. // Identity registryTopicManagerId; @@ -258,18 +269,44 @@ RegistryI::start(bool nowarn) _database = new Database(registryAdapter, topicManager, _instanceName, _traceLevels, getInfo(), _master); _wellKnownObjects = new WellKnownObjectsManager(_database); - InternalRegistryPrx internalRegistry; + // + // Get the saved replica/node proxies and remove them from the + // database. + // + Ice::ObjectProxySeq proxies; + + NodePrxSeq nodes; + proxies = _database->getInternalObjectsByType(Node::ice_staticId()); + for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + { + nodes.push_back(NodePrx::uncheckedCast(*p)); + _database->removeInternalObject((*p)->ice_getIdentity()); + } + + InternalRegistryPrxSeq replicas; + proxies = _database->getObjectsByType(InternalRegistry::ice_staticId()); + for(Ice::ObjectProxySeq::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + { + replicas.push_back(InternalRegistryPrx::uncheckedCast(*p)); + _database->removeObject((*p)->ice_getIdentity()); + } + + // + // NOTE: The internal registry object must be added only once the + // node/replica proxies are retrieved and removed from the + // database. Otherwise, if some replica/node register as soon as + // the internal registry is setup we might clear valid proxies. + // + InternalRegistryPrx internalRegistry = setupInternalRegistry(registryAdapter); if(_master) { - internalRegistry = setupInternalRegistry(registryAdapter); - NodePrxSeq nodes = registerReplicas(internalRegistry); + NodePrxSeq nodes = registerReplicas(internalRegistry, replicas); registerNodes(internalRegistry, nodes); } else { - internalRegistry = setupInternalRegistry(registryAdapter); _session.create(_replicaName, getInfo(), _database, _wellKnownObjects, internalRegistry); - registerNodes(internalRegistry, _session.getNodes()); + registerNodes(internalRegistry, _session.getNodes(nodes)); } ObjectAdapterPtr serverAdapter = _communicator->createObjectAdapter("IceGrid.Registry.Server"); @@ -1062,10 +1099,9 @@ RegistryI::getSSLInfo(const ConnectionPtr& connection, string& userDN) } NodePrxSeq -RegistryI::registerReplicas(const InternalRegistryPrx& internalRegistry) +RegistryI::registerReplicas(const InternalRegistryPrx& internalRegistry, const InternalRegistryPrxSeq& replicas) { set<NodePrx> nodes; - InternalRegistryPrxSeq replicas = internalRegistry->getReplicas(); for(InternalRegistryPrxSeq::const_iterator r = replicas.begin(); r != replicas.end(); ++r) { if((*r)->ice_getIdentity() != internalRegistry->ice_getIdentity()) @@ -1078,7 +1114,6 @@ RegistryI::registerReplicas(const InternalRegistryPrx& internalRegistry) } catch(const Ice::LocalException&) { - // TODO: Cleanup the database? } } } @@ -1112,7 +1147,6 @@ RegistryI::registerNodes(const InternalRegistryPrx& internalRegistry, const Node } catch(const Ice::LocalException&) { - // TODO: Cleanup the database? } } } diff --git a/cpp/src/IceGrid/RegistryI.h b/cpp/src/IceGrid/RegistryI.h index 39f6072ade2..b1b0e9463f0 100644 --- a/cpp/src/IceGrid/RegistryI.h +++ b/cpp/src/IceGrid/RegistryI.h @@ -90,7 +90,7 @@ private: Glacier2::SSLPermissionsVerifierPrx getSSLPermissionsVerifier(const Ice::LocatorPrx&, const std::string&, bool); Glacier2::SSLInfo getSSLInfo(const Ice::ConnectionPtr&, std::string&); - NodePrxSeq registerReplicas(const InternalRegistryPrx&); + NodePrxSeq registerReplicas(const InternalRegistryPrx&, const InternalRegistryPrxSeq&); void registerNodes(const InternalRegistryPrx&, const NodePrxSeq&); const Ice::CommunicatorPtr _communicator; diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index e121a307386..8413261f8af 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -264,6 +264,22 @@ ReplicaSessionManager::create(const string& name, _wellKnownObjects = wellKnownObjects; _traceLevels = _database->getTraceLevels(); + // + // Initialize the IceGrid::Query objects. The IceGrid::Query + // interface is used to lookup the registry proxy in case it + // becomes unavailable. Since replicas might not always have + // an up to date registry proxy, we need to query all the + // replicas. + // + Ice::EndpointSeq endpoints = comm->getDefaultLocator()->ice_getEndpoints(); + QueryPrx query = QueryPrx::uncheckedCast(comm->stringToProxy(instName + "/Query")); + for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) + { + Ice::EndpointSeq singleEndpoint; + singleEndpoint.push_back(*p); + _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint))); + } + _thread = new Thread(*this, _master); _thread->start(); notifyAll(); @@ -293,7 +309,7 @@ ReplicaSessionManager::create(const InternalRegistryPrx& replica) } NodePrxSeq -ReplicaSessionManager::getNodes() const +ReplicaSessionManager::getNodes(const NodePrxSeq& nodes) const { try { @@ -301,7 +317,7 @@ ReplicaSessionManager::getNodes() const } catch(const Ice::LocalException&) { - return _internalRegistry->getNodes(); + return nodes; } } @@ -324,11 +340,6 @@ void ReplicaSessionManager::registerAllWellKnownObjects() { // - // Try to create the session if it doesn't already exists. - // - _thread->tryCreateSession(0); - - // // If there's an active session, register the well-known objects // with the session. // @@ -374,6 +385,8 @@ ReplicaSessionManager::keepAlive(const ReplicaSessionPrx& session) ReplicaSessionPrx ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUtil::Time& timeout) { + ReplicaSessionPrx session; + auto_ptr<Ice::Exception> exception; try { if(_traceLevels && _traceLevels->replica > 1) @@ -381,7 +394,94 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat); out << "trying to establish session with master replica"; } + + set<InternalRegistryPrx> used; + if(!registry->ice_getEndpoints().empty()) + { + try + { + session = createSessionImpl(registry, timeout); + } + catch(const Ice::LocalException& ex) + { + exception.reset(ex.ice_clone()); + used.insert(registry); + _thread->setRegistry(InternalRegistryPrx::uncheckedCast(registry->ice_endpoints(Ice::EndpointSeq()))); + } + } + + if(!session) + { + for(vector<QueryPrx>::const_iterator p = _queryObjects.begin(); p != _queryObjects.end(); ++p) + { + InternalRegistryPrx newRegistry; + try + { + Ice::ObjectPrx obj = (*p)->findObjectById(registry->ice_getIdentity()); + newRegistry = InternalRegistryPrx::uncheckedCast(obj); + if(newRegistry && used.find(newRegistry) == used.end()) + { + session = createSessionImpl(newRegistry, timeout); + _thread->setRegistry(newRegistry); + break; + } + } + catch(const Ice::LocalException& ex) + { + exception.reset(ex.ice_clone()); + if(newRegistry) + { + used.insert(newRegistry); + } + } + } + } + } + catch(const ReplicaActiveException& ex) + { + if(_traceLevels) + { + _traceLevels->logger->error("a replica with the same name is already registered and active"); + } + exception.reset(ex.ice_clone()); + } + catch(const Ice::Exception& ex) + { + exception.reset(ex.ice_clone()); + } + + if(session) + { + if(_traceLevels && _traceLevels->replica > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat); + out << "established session with master replica"; + } + } + else + { + if(_traceLevels && _traceLevels->replica > 1) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat); + out << "failed to establish session with master replica:\n"; + if(exception.get()) + { + out << *exception.get(); + } + else + { + out << "failed to get replica proxy"; + } + } + } + return session; +} +ReplicaSessionPrx +ReplicaSessionManager::createSessionImpl(const InternalRegistryPrx& registry, IceUtil::Time& timeout) +{ + try + { ReplicaSessionPrx session = registry->registerReplica(_name, _info, _internalRegistry); int t = session->getTimeout(); if(t > 0) @@ -402,24 +502,9 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti // Register all the well-known objects with the replica session. // _wellKnownObjects->registerAll(session); - - if(_traceLevels && _traceLevels->replica > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat); - out << "established session with master replica"; - } - return session; } - catch(const ReplicaActiveException&) - { - if(_traceLevels) - { - _traceLevels->logger->error("a replica with the same name is already registered and active"); - } - return 0; - } - catch(const Ice::LocalException& ex) + catch(const Ice::LocalException&) { if(_observer) { @@ -437,13 +522,7 @@ ReplicaSessionManager::createSession(const InternalRegistryPrx& registry, IceUti // Re-register all the well known objects with the local database. // _wellKnownObjects->registerAll(); - - if(_traceLevels && _traceLevels->replica > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat); - out << "failed to establish session with master replica:\n" << ex; - } - return 0; + throw; } } diff --git a/cpp/src/IceGrid/ReplicaSessionManager.h b/cpp/src/IceGrid/ReplicaSessionManager.h index 15c918b8bbd..ec7f6ca500c 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.h +++ b/cpp/src/IceGrid/ReplicaSessionManager.h @@ -32,12 +32,13 @@ typedef IceUtil::Handle<TraceLevels> TraceLevelsPtr; class ReplicaSessionManager : public IceUtil::Monitor<IceUtil::Mutex> { public: - class Thread : public SessionKeepAliveThread<ReplicaSessionPrx, InternalRegistryPrx> + + class Thread : public SessionKeepAliveThread<ReplicaSessionPrx> { public: Thread(ReplicaSessionManager& manager, const InternalRegistryPrx& master) : - SessionKeepAliveThread<ReplicaSessionPrx, InternalRegistryPrx>(master), + SessionKeepAliveThread<ReplicaSessionPrx>(master), _manager(manager) { } @@ -73,7 +74,7 @@ public: void create(const std::string&, const RegistryInfo&, const DatabasePtr&, const WellKnownObjectsManagerPtr&, const InternalRegistryPrx&); void create(const InternalRegistryPrx&); - NodePrxSeq getNodes() const; + NodePrxSeq getNodes(const NodePrxSeq&) const; void destroy(); void registerAllWellKnownObjects(); @@ -84,6 +85,7 @@ private: friend class Thread; ReplicaSessionPrx createSession(const InternalRegistryPrx&, IceUtil::Time&); + ReplicaSessionPrx createSessionImpl(const InternalRegistryPrx&, IceUtil::Time&); void destroySession(const ReplicaSessionPrx&); bool keepAlive(const ReplicaSessionPrx&); @@ -97,6 +99,7 @@ private: DatabasePtr _database; WellKnownObjectsManagerPtr _wellKnownObjects; TraceLevelsPtr _traceLevels; + std::vector<QueryPrx> _queryObjects; }; } diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h index eb96c868265..0dcb3b006e2 100644 --- a/cpp/src/IceGrid/SessionManager.h +++ b/cpp/src/IceGrid/SessionManager.h @@ -16,11 +16,12 @@ #include <IceUtil/Thread.h> #include <IceGrid/Query.h> +#include <IceGrid/Internal.h> namespace IceGrid { -template<class TPrx, class FPrx> +template<class TPrx> class SessionKeepAliveThread : public IceUtil::Thread, public IceUtil::Monitor<IceUtil::Mutex> { enum State @@ -34,8 +35,8 @@ class SessionKeepAliveThread : public IceUtil::Thread, public IceUtil::Monitor<I public: - SessionKeepAliveThread(const FPrx& factory) : - _factory(factory), + SessionKeepAliveThread(const InternalRegistryPrx& registry) : + _registry(registry), _state(Disconnected), _destroySession(false) { @@ -45,7 +46,7 @@ public: run() { TPrx session; - FPrx factory = _factory; + InternalRegistryPrx registry = _registry; bool updateState = false; IceUtil::Time timeout = IceUtil::Time::seconds(10); bool destroy = false; @@ -70,37 +71,10 @@ public: // if(!session) { - session = createSession(factory, timeout); + session = createSession(registry, timeout); updateState |= session; - } - - // - // If we failed to create the session with the factory and - // the factory proxy is a direct proxy, we check with the - // Query interface if the factory proxy was updated. It's - // possible that the factory was restarted for example. - // - if(!session && !factory->ice_getEndpoints().empty()) - { - std::string instanceName = factory->ice_getIdentity().category; - try - { - QueryPrx query = QueryPrx::uncheckedCast( - factory->ice_getCommunicator()->stringToProxy(instanceName + "/Query")); - Ice::ObjectPrx obj = query->findObjectById(factory->ice_getIdentity()); - FPrx newFactory = FPrx::uncheckedCast(obj); - if(newFactory != factory) - { - session = createSession(newFactory, timeout); - factory = newFactory; - updateState |= session; - } - } - catch(const Ice::LocalException&) - { - } } - + if(updateState) { Lock sync(*this); @@ -108,7 +82,6 @@ public: { _state = session ? Connected : Disconnected; } - _factory = factory; _session = session; notifyAll(); } @@ -137,7 +110,7 @@ public: } updateState = _state == Retry || _state == DestroySession; - factory = _factory; + registry = _registry; } if(destroy) @@ -170,7 +143,7 @@ public: } virtual bool - tryCreateSession(FPrx factory) + tryCreateSession(InternalRegistryPrx registry) { { Lock sync(*this); @@ -185,9 +158,9 @@ public: } _state = Retry; - if(factory) + if(registry) { - _factory = factory; + _registry = registry; } notifyAll(); } @@ -226,13 +199,27 @@ public: return _session; } - virtual TPrx createSession(const FPrx&, IceUtil::Time&) = 0; + void + setRegistry(const InternalRegistryPrx& registry) + { + Lock sync(*this); + _registry = registry; + } + + InternalRegistryPrx + getRegistry() const + { + Lock sync(*this); + return _registry; + } + + virtual TPrx createSession(const InternalRegistryPrx&, IceUtil::Time&) = 0; virtual void destroySession(const TPrx&) = 0; virtual bool keepAlive(const TPrx&) = 0; protected: - FPrx _factory; + InternalRegistryPrx _registry; TPrx _session; State _state; bool _destroySession; |