diff options
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/demo/IceGrid/replication/config.node1 | 8 | ||||
-rw-r--r-- | cpp/demo/IceGrid/replication/config.node2 | 8 | ||||
-rw-r--r-- | cpp/demo/IceGrid/replication/config.replica1 | 8 | ||||
-rw-r--r-- | cpp/demo/IceGrid/replication/config.replica2 | 8 | ||||
-rwxr-xr-x | cpp/demo/IceGrid/replication/expect.py | 136 | ||||
-rw-r--r-- | cpp/src/Ice/LocatorInfo.cpp | 6 | ||||
-rw-r--r-- | cpp/src/IceGrid/IceGridNode.cpp | 32 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 11 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 50 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.h | 17 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 60 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.h | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 9 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerI.cpp | 17 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionManager.cpp | 93 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionManager.h | 36 | ||||
-rw-r--r-- | cpp/src/IceGridLib/DiscoveryPluginI.cpp | 220 | ||||
-rw-r--r-- | cpp/test/IceGrid/simple/AllTests.cpp | 58 | ||||
-rwxr-xr-x | cpp/test/IceGrid/simple/run.py | 3 |
21 files changed, 484 insertions, 305 deletions
diff --git a/cpp/demo/IceGrid/replication/config.node1 b/cpp/demo/IceGrid/replication/config.node1 index 42c70b30bca..007b3e1081c 100644 --- a/cpp/demo/IceGrid/replication/config.node1 +++ b/cpp/demo/IceGrid/replication/config.node1 @@ -4,6 +4,14 @@ Ice.Default.Locator=ReplicationDemoIceGrid/Locator:default -h localhost -p 12000 -t 10000:default -h localhost -p 12001 -t 10000:default -h localhost -p 12002 -t 10000 # +# To use the IceGrid discovery plugin comment out the default locator +# property above and un-comment the two properties below. The discovery +# plugin uses multicast to discover IceGrid. +# +#Ice.Plugin.IceGridDiscovery=IceGrid:createIceGridDiscovery +#IceGrid.InstanceName=ReplicationDemoIceGrid + +# # IceGrid node configuration. # IceGrid.Node.Name=node1 diff --git a/cpp/demo/IceGrid/replication/config.node2 b/cpp/demo/IceGrid/replication/config.node2 index a8def29fc15..b787aa62edb 100644 --- a/cpp/demo/IceGrid/replication/config.node2 +++ b/cpp/demo/IceGrid/replication/config.node2 @@ -4,6 +4,14 @@ Ice.Default.Locator=ReplicationDemoIceGrid/Locator:default -h localhost -p 12000 -t 10000:default -h localhost -p 12001 -t 10000:default -h localhost -p 12002 -t 10000 # +# To use the IceGrid discovery plugin comment out the default locator +# property above and un-comment the two properties below. The discovery +# plugin uses multicast to discover IceGrid. +# +#Ice.Plugin.IceGridDiscovery=IceGrid:createIceGridDiscovery +#IceGrid.InstanceName=ReplicationDemoIceGrid + +# # IceGrid node configuration. # IceGrid.Node.Name=node2 diff --git a/cpp/demo/IceGrid/replication/config.replica1 b/cpp/demo/IceGrid/replication/config.replica1 index 7d3d886061f..e23bd32f2a5 100644 --- a/cpp/demo/IceGrid/replication/config.replica1 +++ b/cpp/demo/IceGrid/replication/config.replica1 @@ -4,6 +4,14 @@ Ice.Default.Locator=ReplicationDemoIceGrid/Locator:default -h localhost -p 12000 -t 10000:default -h localhost -p 12002 -t 10000 # +# To use the IceGrid discovery plugin comment out the default locator +# property above and un-comment the two properties below. The discovery +# plugin uses multicast to discover IceGrid. +# +#Ice.Plugin.IceGridDiscovery=IceGrid:createIceGridDiscovery +#IceGrid.InstanceName=ReplicationDemoIceGrid + +# # IceGrid registry configuration. # IceGrid.Registry.Client.Endpoints=default -h localhost -p 12001 -t 10000 diff --git a/cpp/demo/IceGrid/replication/config.replica2 b/cpp/demo/IceGrid/replication/config.replica2 index 11c1f03dd48..e122e707a47 100644 --- a/cpp/demo/IceGrid/replication/config.replica2 +++ b/cpp/demo/IceGrid/replication/config.replica2 @@ -4,6 +4,14 @@ Ice.Default.Locator=ReplicationDemoIceGrid/Locator:default -h localhost -p 12000 -t 10000:default -h localhost -p 12001 -t 10000 # +# To use the IceGrid discovery plugin comment out the default locator +# property above and un-comment the two properties below. The discovery +# plugin uses multicast to discover IceGrid. +# +#Ice.Plugin.IceGridDiscovery=IceGrid:createIceGridDiscovery +#IceGrid.InstanceName=ReplicationDemoIceGrid + +# # IceGrid registry configuration. # IceGrid.Registry.Client.Endpoints=default -h localhost -p 12002 -t 10000 diff --git a/cpp/demo/IceGrid/replication/expect.py b/cpp/demo/IceGrid/replication/expect.py index ed1f93c1b5e..d28f664c6d1 100755 --- a/cpp/demo/IceGrid/replication/expect.py +++ b/cpp/demo/IceGrid/replication/expect.py @@ -35,72 +35,88 @@ if Util.defaultHost: else: args = '' -sys.stdout.write("starting icegridnodes... ") -sys.stdout.flush() -master = Util.spawn(Util.getIceGridRegistry() + ' --Ice.Config=config.master --Ice.PrintAdapterReady --Ice.StdErr= --Ice.StdOut=') -master.expect('IceGrid.Registry.Internal ready\nIceGrid.Registry.Server ready\nIceGrid.Registry.Client ready') -replica1 = Util.spawn(Util.getIceGridRegistry() + ' --Ice.Config=config.replica1 --Ice.PrintAdapterReady --Ice.StdErr= --Ice.StdOut=') -replica1.expect('IceGrid.Registry.Server ready\nIceGrid.Registry.Client ready') -replica2 = Util.spawn(Util.getIceGridRegistry() + ' --Ice.Config=config.replica2 --Ice.PrintAdapterReady --Ice.StdErr= --Ice.StdOut=') -replica2.expect('IceGrid.Registry.Server ready\nIceGrid.Registry.Client ready') -node1 = Util.spawn(Util.getIceGridNode() + ' --Ice.Config=config.node1 --Ice.PrintAdapterReady --Ice.StdErr= --Ice.StdOut= %s' % (args)) -node1.expect('IceGrid.Node ready') -node2 = Util.spawn(Util.getIceGridNode() + ' --Ice.Config=config.node2 --Ice.PrintAdapterReady --Ice.StdErr= --Ice.StdOut= %s' % (args)) -node2.expect('IceGrid.Node ready') -print("ok") +def runDemo(properties, clientProperties): -sys.stdout.write("deploying application... ") -sys.stdout.flush() -admin = Util.spawn(Util.getIceGridAdmin() + ' --Ice.Config=config.client') -admin.expect('>>>') -admin.sendline("application add \'application.xml\'") -admin.expect('>>>') -print("ok") + sys.stdout.write("starting icegridnodes... ") + sys.stdout.flush() + master = Util.spawn(Util.getIceGridRegistry() + properties + ' --Ice.Config=config.master ') + master.expect('IceGrid.Registry.Internal ready\nIceGrid.Registry.Server ready\nIceGrid.Registry.Client ready') + replica1 = Util.spawn(Util.getIceGridRegistry() + properties + ' --Ice.Config=config.replica1') + replica1.expect('IceGrid.Registry.Server ready\nIceGrid.Registry.Client ready') + replica2 = Util.spawn(Util.getIceGridRegistry() + properties + ' --Ice.Config=config.replica2') + replica2.expect('IceGrid.Registry.Server ready\nIceGrid.Registry.Client ready') + node1 = Util.spawn(Util.getIceGridNode() + properties + ' --Ice.Config=config.node1 %s' % (args)) + node1.expect('IceGrid.Node ready') + node2 = Util.spawn(Util.getIceGridNode() + properties + ' --Ice.Config=config.node2 %s' % (args)) + node2.expect('IceGrid.Node ready') + print("ok") -def runtest(): - client = Util.spawn('./client') - client.expect('iterations:') - client.sendline('5') - client.expect('\(in ms\):') - client.sendline('0') - for i in range(1, 5): - client.expect("Hello World!") - client.sendline('x') - client.sendline('x') + sys.stdout.write("deploying application... ") + sys.stdout.flush() + admin = Util.spawn(Util.getIceGridAdmin() + ' --Ice.Config=config.client') + admin.expect('>>>') + admin.sendline("application add \'application.xml\'") + admin.expect('>>>') + print("ok") - client.waitTestSuccess(timeout=1) + def runtest(): + client = Util.spawn('./client ' + clientProperties) + client.expect('iterations:') + client.sendline('5') + client.expect('\(in ms\):') + client.sendline('0') + for i in range(1, 5): + client.expect("Hello World!") + client.sendline('x') + client.sendline('x') -sys.stdout.write("testing client... ") -sys.stdout.flush() -runtest() -print("ok") + client.waitTestSuccess(timeout=1) -sys.stdout.write("testing replication... ") -sys.stdout.flush() -admin.sendline('registry shutdown Replica1') -admin.expect('>>>') -replica1.waitTestSuccess() -runtest() -admin.sendline('registry shutdown Replica2') -admin.expect('>>>') -replica2.waitTestSuccess() -runtest() -print("ok") + sys.stdout.write("testing client... ") + sys.stdout.flush() + runtest() + print("ok") -sys.stdout.write("completing shutdown... ") -sys.stdout.flush() -admin.sendline('node shutdown node1') -admin.expect('>>>') -node1.waitTestSuccess(timeout=120) + sys.stdout.write("testing replication... ") + sys.stdout.flush() + admin.sendline('registry shutdown Replica1') + admin.expect('>>>') + replica1.waitTestSuccess() + runtest() + admin.sendline('registry shutdown Replica2') + admin.expect('>>>') + replica2.waitTestSuccess() + runtest() + print("ok") -admin.sendline('node shutdown node2') -admin.expect('>>>') -node2.waitTestSuccess(timeout=120) + sys.stdout.write("completing shutdown... ") + sys.stdout.flush() + admin.sendline('node shutdown node1') + admin.expect('>>>') + node1.waitTestSuccess(timeout=120) -admin.sendline('registry shutdown Master') -admin.expect('>>>') -master.waitTestSuccess() + admin.sendline('node shutdown node2') + admin.expect('>>>') + node2.waitTestSuccess(timeout=120) -admin.sendline('exit') -admin.waitTestSuccess(timeout=120) -print("ok") + admin.sendline("application remove Simple") + admin.expect('>>>') + + admin.sendline('registry shutdown Master') + admin.expect('>>>') + master.waitTestSuccess() + + admin.sendline('exit') + admin.waitTestSuccess(timeout=120) + print("ok") + +print("running with Ice.Default.Locator set") +properties = ' --Ice.PrintAdapterReady --Ice.StdErr= --Ice.StdOut=' +runDemo(properties, '') + +print("running with IceGridDiscovery") +discovery = ' --Ice.Plugin.IceGridDiscovery=IceGrid:createIceGridDiscovery' + \ + ' --IceGridDiscovery.InstanceName=ReplicationDemoIceGrid' + \ + ' --Ice.Default.Locator=' +properties = ' --Ice.PrintAdapterReady --Ice.StdErr= --Ice.StdOut= --Ice.Default.Locator=' +runDemo(properties + discovery, discovery) diff --git a/cpp/src/Ice/LocatorInfo.cpp b/cpp/src/Ice/LocatorInfo.cpp index dfdadb06c86..e668dc3a248 100644 --- a/cpp/src/Ice/LocatorInfo.cpp +++ b/cpp/src/Ice/LocatorInfo.cpp @@ -540,7 +540,11 @@ IceInternal::LocatorInfo::getLocatorRegistry() // Do not make locator calls from within sync. // LocatorRegistryPrx locatorRegistry = _locator->getRegistry(); - + if(!locatorRegistry) + { + return 0; + } + { IceUtil::Mutex::Lock sync(*this); diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp index 183de011e10..089f2d49c26 100644 --- a/cpp/src/IceGrid/IceGridNode.cpp +++ b/cpp/src/IceGrid/IceGridNode.cpp @@ -331,6 +331,8 @@ NodeService::startImpl(int argc, char* argv[], int& status) return false; } + communicator()->setDefaultLocator(_registry->getLocator()); + // // Set the default locator property to point to the collocated // locator (this property is passed by the activator to each @@ -339,16 +341,10 @@ NodeService::startImpl(int argc, char* argv[], int& status) // if(properties->getProperty("Ice.Default.Locator").empty()) { - Identity locatorId; - locatorId.category = properties->getPropertyWithDefault("IceGrid.InstanceName", "IceGrid"); - locatorId.name = "Locator"; - string endpoints = properties->getProperty("IceGrid.Registry.Client.Endpoints"); - string locPrx = "\"" + communicator()->identityToString(locatorId) + "\" :" + endpoints; - communicator()->setDefaultLocator(Ice::LocatorPrx::uncheckedCast(communicator()->stringToProxy(locPrx))); - properties->setProperty("Ice.Default.Locator", locPrx); + properties->setProperty("Ice.Default.Locator", communicator()->getDefaultLocator()->ice_toString()); } } - else if(properties->getProperty("Ice.Default.Locator").empty()) + else if(!communicator()->getDefaultLocator()) { error("property `Ice.Default.Locator' is not set"); return false; @@ -485,9 +481,21 @@ NodeService::startImpl(int argc, char* argv[], int& status) // // The IceGrid instance name. // - const string instanceName = communicator()->getDefaultLocator()->ice_getIdentity().category; + string instanceName = properties->getProperty("IceGrid.InstanceName"); + if(instanceName.empty()) + { + instanceName = properties->getProperty("IceGridDiscovery.InstanceName"); + } + if(instanceName.empty()) + { + instanceName = communicator()->getDefaultLocator()->ice_getIdentity().category; + } + if(instanceName.empty()) + { + instanceName = "IceGrid"; + } - _sessions.reset(new NodeSessionManager(communicator())); + _sessions.reset(new NodeSessionManager(communicator(), instanceName)); // // Create the server factory. The server factory creates persistent objects @@ -496,7 +504,7 @@ NodeService::startImpl(int argc, char* argv[], int& status) // Identity id = communicator()->stringToIdentity(instanceName + "/Node-" + name); NodePrx nodeProxy = NodePrx::uncheckedCast(_adapter->createProxy(id)); - _node = new NodeI(_adapter, *_sessions, _activator, _timer, traceLevels, nodeProxy, name, mapper); + _node = new NodeI(_adapter, *_sessions, _activator, _timer, traceLevels, nodeProxy, name, mapper, instanceName); _adapter->add(_node, nodeProxy->ice_getIdentity()); _adapter->addServantLocator(new DefaultServantLocator(new NodeServerAdminRouter(_node)), @@ -516,7 +524,7 @@ NodeService::startImpl(int argc, char* argv[], int& status) { communicator()->getDefaultLocator()->ice_timeout(1000)->ice_ping(); } - catch(const Ice::LocalException& ex) + catch(const Ice::Exception& ex) { Warning out(communicator()->getLogger()); out << "couldn't reach the IceGrid registry (this is expected "; diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 5e3fdacf906..97c4c077b7e 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -299,7 +299,8 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, const TraceLevelsPtr& traceLevels, const NodePrx& proxy, const string& name, - const UserAccountMapperPrx& mapper) : + const UserAccountMapperPrx& mapper, + const string& instanceName) : _communicator(adapter->getCommunicator()), _adapter(adapter), _sessions(sessions), @@ -311,6 +312,7 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, _redirectErrToOut(false), _allowEndpointsOverride(false), _waitTime(0), + _instanceName(instanceName), _userAccountMapper(mapper), _platform("IceGrid.Node", _communicator, _traceLevels), _fileCache(new FileCache(_communicator)), @@ -322,7 +324,6 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter, const_cast<string&>(_dataDir) = _platform.getDataDir(); const_cast<string&>(_serversDir) = _dataDir + "/servers"; const_cast<string&>(_tmpDir) = _dataDir + "/tmp"; - const_cast<string&>(_instanceName) = _communicator->getDefaultLocator()->ice_getIdentity().category; const_cast<Ice::Int&>(_waitTime) = props->getPropertyAsIntWithDefault("IceGrid.Node.WaitTime", 60); const_cast<string&>(_outputDir) = props->getProperty("IceGrid.Node.Output"); const_cast<bool&>(_redirectErrToOut) = props->getPropertyAsInt("IceGrid.Node.RedirectErrToOut") > 0; @@ -903,6 +904,12 @@ NodeI::getPropertiesOverride() const return _propertiesOverride; } +const string& +NodeI::getInstanceName() const +{ + return _instanceName; +} + string NodeI::getOutputDir() const { diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h index f212a71b4c6..b438af7ffef 100644 --- a/cpp/src/IceGrid/NodeI.h +++ b/cpp/src/IceGrid/NodeI.h @@ -63,7 +63,7 @@ public: typedef IceUtil::Handle<Update> UpdatePtr; NodeI(const Ice::ObjectAdapterPtr&, NodeSessionManager&, const ActivatorPtr&, const IceUtil::TimerPtr&, - const TraceLevelsPtr&, const NodePrx&, const std::string&, const UserAccountMapperPrx&); + const TraceLevelsPtr&, const NodePrx&, const std::string&, const UserAccountMapperPrx&, const std::string&); virtual ~NodeI(); virtual void loadServer_async(const AMD_Node_loadServerPtr&, @@ -119,6 +119,7 @@ public: FileCachePtr getFileCache() const; NodePrx getProxy() const; const PropertyDescriptorSeq& getPropertiesOverride() const; + const std::string& getInstanceName() const; std::string getOutputDir() const; bool getRedirectErrToOut() const; diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index 5a71048a511..5d361f5ce7e 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -16,14 +16,12 @@ using namespace std; using namespace IceGrid; -NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, +NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry, const NodeIPtr& node, - const vector<QueryPrx>& queryObjects) : - SessionKeepAliveThread<NodeSessionPrx>(registry, node->getTraceLevels()->logger), - _node(node), - _queryObjects(queryObjects) + NodeSessionManager& manager) : + SessionKeepAliveThread<NodeSessionPrx>(registry, node->getTraceLevels()->logger), _node(node), _manager(manager) { - assert(registry && node && !_queryObjects.empty()); + assert(registry && node); string name = registry->ice_getIdentity().name; const string prefix("InternalRegistry-"); string::size_type pos = name.find(prefix); @@ -66,7 +64,8 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil if(!session) { vector<Ice::AsyncResultPtr> results; - for(vector<QueryPrx>::const_iterator q = _queryObjects.begin(); q != _queryObjects.end(); ++q) + vector<QueryPrx> queryObjects = _manager.getQueryObjects(); + for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) { results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity())); } @@ -227,8 +226,8 @@ NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) } } -NodeSessionManager::NodeSessionManager(const Ice::CommunicatorPtr& communicator) : - SessionManager(communicator), +NodeSessionManager::NodeSessionManager(const Ice::CommunicatorPtr& communicator, const string& instanceName) : + SessionManager(communicator, instanceName), _destroyed(false), _activated(false) { @@ -251,7 +250,8 @@ NodeSessionManager::create(const NodeIPtr& node) // with replicas (see createdSession below) and this must be done // before the node is activated. // - _thread->tryCreateSession(true, IceUtil::Time::seconds(3)); + _thread->tryCreateSession(); + _thread->waitTryCreateSession(IceUtil::Time::seconds(3)); } void @@ -273,6 +273,7 @@ NodeSessionManager::create(const InternalRegistryPrx& replica) if(thread) { thread->tryCreateSession(); + thread->waitTryCreateSession(); } } @@ -370,7 +371,7 @@ NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas) for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { _replicas.insert((*p)->ice_getIdentity()); - addReplicaSession(*p)->tryCreateSession(false); + addReplicaSession(*p)->tryCreateSession(); } } @@ -383,7 +384,7 @@ NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica) return; } _replicas.insert(replica->ice_getIdentity()); - addReplicaSession(replica)->tryCreateSession(false); + addReplicaSession(replica)->tryCreateSession(); } void @@ -408,7 +409,6 @@ NodeSessionKeepAliveThreadPtr NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica) { assert(!_destroyed); - NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity()); NodeSessionKeepAliveThreadPtr thread; if(p != _sessions.end()) @@ -418,7 +418,7 @@ NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica) } else { - thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects); + thread = new NodeSessionKeepAliveThread(replica, _node, *this); _sessions.insert(make_pair(replica->ice_getIdentity(), thread)); thread->start(); } @@ -535,7 +535,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) { vector<Ice::AsyncResultPtr> results1; vector<Ice::AsyncResultPtr> results2; - vector<QueryPrx> queryObjects = findAllQueryObjects(); + vector<QueryPrx> queryObjects = findAllQueryObjects(false); // // Below we try to retrieve internal registry proxies either @@ -576,7 +576,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) } catch(const Ice::LocalException&) { - // IGNORE + // Ignore. } } for(vector<Ice::AsyncResultPtr>::const_iterator p = results2.begin(); p != results2.end(); ++p) @@ -609,7 +609,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) } catch(const Ice::LocalException&) { - // IGNORE + // Ignore } } @@ -639,9 +639,17 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) if((*p)->ice_getIdentity() != _master->ice_getIdentity()) { _replicas.insert((*p)->ice_getIdentity()); - NodeSessionKeepAliveThreadPtr session = addReplicaSession(*p); - session->tryCreateSession(false); - sessions.push_back(session); + + if(_sessions.find((*p)->ice_getIdentity()) == _sessions.end()) + { + NodeSessionKeepAliveThreadPtr session = addReplicaSession(*p); + session->tryCreateSession(); + sessions.push_back(session); + } + else + { + addReplicaSession(*p); // Update the session + } } } } @@ -664,7 +672,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) { break; } - (*p)->tryCreateSession(true, timeout); + (*p)->waitTryCreateSession(timeout); } } diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h index 7abe10554cf..bd8552ddb8e 100644 --- a/cpp/src/IceGrid/NodeSessionManager.h +++ b/cpp/src/IceGrid/NodeSessionManager.h @@ -25,11 +25,13 @@ namespace IceGrid class NodeI; typedef IceUtil::Handle<NodeI> NodeIPtr; +class NodeSessionManager; + class NodeSessionKeepAliveThread : public SessionKeepAliveThread<NodeSessionPrx> { public: - NodeSessionKeepAliveThread(const InternalRegistryPrx&, const NodeIPtr&, const std::vector<QueryPrx>&); + NodeSessionKeepAliveThread(const InternalRegistryPrx&, const NodeIPtr&, NodeSessionManager&); virtual NodeSessionPrx createSession(InternalRegistryPrx&, IceUtil::Time&); virtual void destroySession(const NodeSessionPrx&); @@ -43,7 +45,7 @@ protected: const NodeIPtr _node; const std::string _name; - const std::vector<QueryPrx> _queryObjects; + NodeSessionManager& _manager; }; typedef IceUtil::Handle<NodeSessionKeepAliveThread> NodeSessionKeepAliveThreadPtr; @@ -51,7 +53,7 @@ class NodeSessionManager : public SessionManager { public: - NodeSessionManager(const Ice::CommunicatorPtr&); + NodeSessionManager(const Ice::CommunicatorPtr&, const std::string&); void create(const NodeIPtr&); void create(const InternalRegistryPrx&); @@ -65,6 +67,7 @@ public: void replicaRemoved(const InternalRegistryPrx&); NodeSessionPrx getMasterNodeSession() const { return _thread->getSession(); } + std::vector<IceGrid::QueryPrx> getQueryObjects() { return findAllQueryObjects(true); } private: @@ -84,9 +87,7 @@ private: { public: - Thread(NodeSessionManager& manager) : - NodeSessionKeepAliveThread(manager._master, manager._node, manager._queryObjects), - _manager(manager) + Thread(NodeSessionManager& manager) : NodeSessionKeepAliveThread(manager._master, manager._node, manager) { } @@ -113,10 +114,6 @@ private: _manager.reapReplicas(); return alive; } - - private: - - NodeSessionManager& _manager; }; typedef IceUtil::Handle<Thread> ThreadPtr; friend class Thread; diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index 8ab2ed0bd92..071942cd8f5 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -205,7 +205,6 @@ RegistryI::RegistryI(const CommunicatorPtr& communicator, _nowarn(nowarn), _readonly(readonly), _initFromReplica(initFromReplica), - _session(communicator), _platform("IceGrid.Registry", communicator, traceLevels) { } @@ -337,36 +336,38 @@ RegistryI::startImpl() properties->setProperty("Ice.ACM.Server.Close", "3"); // Close on invocation and idle. - if(!_master && properties->getProperty("Ice.Default.Locator").empty() && - properties->getProperty("Ice.Default.Locator").empty()) + if(!_master && !_communicator->getDefaultLocator()) { Error out(_communicator->getLogger()); out << "property `Ice.Default.Locator' is not set"; return false; } + else if(_master) + { + _communicator->setDefaultLocator(0); // Clear the default locator in case it's set. + } // // Get the instance name // - if(_master) + _instanceName = properties->getProperty("IceGrid.InstanceName"); + if(_instanceName.empty()) { - _instanceName = properties->getProperty("IceGrid.InstanceName"); - if(_instanceName.empty()) - { - if(_communicator->getDefaultLocator()) - { - _instanceName = _communicator->getDefaultLocator()->ice_getIdentity().category; - } - else - { - _instanceName = "IceGrid"; - } - } + _instanceName = properties->getProperty("IceGridDiscovery.InstanceName"); } - else + if(_instanceName.empty() && _communicator->getDefaultLocator()) { _instanceName = _communicator->getDefaultLocator()->ice_getIdentity().category; } + if(_instanceName.empty()) + { + _instanceName = "IceGrid"; + } + + // + // Create the replica session manager + // + _session.reset(new ReplicaSessionManager(_communicator, _instanceName)); // // Create the registry database. @@ -485,7 +486,7 @@ RegistryI::startImpl() if(!proxy) { id.name = "InternalRegistry-" + (_initFromReplica.empty() ? "Master" : _initFromReplica); - proxy = _session.findInternalRegistryForReplica(id); + proxy = _session->findInternalRegistryForReplica(id); } if(!proxy) @@ -550,8 +551,8 @@ RegistryI::startImpl() else { InternalReplicaInfoPtr info = _platform.getInternalReplicaInfo(); - _session.create(_replicaName, info, _database, _wellKnownObjects, internalRegistry); - registerNodes(internalRegistry, _session.getNodes(nodes)); + _session->create(_replicaName, info, _database, _wellKnownObjects, internalRegistry); + registerNodes(internalRegistry, _session->getNodes(nodes)); } _serverAdapter = _communicator->createObjectAdapter("IceGrid.Registry.Server"); @@ -600,7 +601,7 @@ RegistryI::startImpl() } else { - _session.registerAllWellKnownObjects(); + _session->registerAllWellKnownObjects(); } // @@ -685,7 +686,7 @@ RegistryI::setupLocatorRegistry() Identity locatorRegId; locatorRegId.category = _instanceName; locatorRegId.name = "LocatorRegistry-" + _replicaName; - ObjectPrx obj = _serverAdapter->add(new LocatorRegistryI(_database, dynReg, _master, _session), locatorRegId); + ObjectPrx obj = _serverAdapter->add(new LocatorRegistryI(_database, dynReg, _master, *_session), locatorRegId); return LocatorRegistryPrx::uncheckedCast(obj); } @@ -738,7 +739,7 @@ RegistryI::setupInternalRegistry() internalRegistryId.category = _instanceName; internalRegistryId.name = "InternalRegistry-" + _replicaName; assert(_reaper); - ObjectPtr internalRegistry = new InternalRegistryI(this, _database, _reaper, _wellKnownObjects, _session); + ObjectPtr internalRegistry = new InternalRegistryI(this, _database, _reaper, _wellKnownObjects, *_session); Ice::ObjectPrx proxy = _registryAdapter->add(internalRegistry, internalRegistryId); _wellKnownObjects->add(proxy, InternalRegistry::ice_staticId()); @@ -903,8 +904,11 @@ RegistryI::setupAdminSessionFactory(const Ice::ObjectPtr& router, const IceGrid: void RegistryI::stop() { - _session.destroy(); - + if(_session.get()) + { + _session->destroy(); + } + // // We destroy the topics before to shutdown the communicator to // ensure that there will be no more invocations on IceStorm once @@ -1233,6 +1237,12 @@ RegistryI::createAdminCallbackProxy(const Identity& id) const return _serverAdapter->createProxy(id); } +Ice::LocatorPrx +RegistryI::getLocator() +{ + return _wellKnownObjects->getLocator(); +} + Glacier2::PermissionsVerifierPrx RegistryI::getPermissionsVerifier(const IceGrid::LocatorPrx& locator, const string& verifierProperty, diff --git a/cpp/src/IceGrid/RegistryI.h b/cpp/src/IceGrid/RegistryI.h index 75da1fa1259..e4a303e533b 100644 --- a/cpp/src/IceGrid/RegistryI.h +++ b/cpp/src/IceGrid/RegistryI.h @@ -79,6 +79,8 @@ public: const Ice::ObjectAdapterPtr& getRegistryAdapter() { return _registryAdapter; } + Ice::LocatorPrx getLocator(); + private: Ice::LocatorRegistryPrx setupLocatorRegistry(); @@ -117,7 +119,7 @@ private: IceUtil::TimerPtr _timer; SessionServantManagerPtr _servantManager; int _sessionTimeout; - ReplicaSessionManager _session; + IceUtil::UniquePtr<ReplicaSessionManager> _session; mutable PlatformInfo _platform; Glacier2::PermissionsVerifierPrx _nullPermissionsVerifier; diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index b053b21a988..fbce90c0816 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -282,7 +282,8 @@ private: }; -ReplicaSessionManager::ReplicaSessionManager(const Ice::CommunicatorPtr& communicator) : SessionManager(communicator) +ReplicaSessionManager::ReplicaSessionManager(const Ice::CommunicatorPtr& communicator, const string& instanceName) : + SessionManager(communicator, instanceName) { } @@ -309,6 +310,7 @@ ReplicaSessionManager::create(const string& name, } _thread->tryCreateSession(); + _thread->waitTryCreateSession(); } void @@ -330,6 +332,7 @@ ReplicaSessionManager::create(const InternalRegistryPrx& replica) _thread->setRegistry(replica); _thread->tryCreateSession(); + _thread->waitTryCreateSession(); } NodePrxSeq @@ -412,7 +415,7 @@ IceGrid::InternalRegistryPrx ReplicaSessionManager::findInternalRegistryForReplica(const Ice::Identity& id) { vector<Ice::AsyncResultPtr> results; - vector<QueryPrx> queryObjects = findAllQueryObjects(); + vector<QueryPrx> queryObjects = findAllQueryObjects(true); for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) { results.push_back((*q)->begin_findObjectById(id)); @@ -489,7 +492,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim if(!session) { vector<Ice::AsyncResultPtr> results; - vector<QueryPrx> queryObjects = findAllQueryObjects(); + vector<QueryPrx> queryObjects = findAllQueryObjects(false); for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) { results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity())); diff --git a/cpp/src/IceGrid/ReplicaSessionManager.h b/cpp/src/IceGrid/ReplicaSessionManager.h index 8e5c7308dce..31a9de77ae2 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.h +++ b/cpp/src/IceGrid/ReplicaSessionManager.h @@ -69,7 +69,7 @@ public: }; typedef IceUtil::Handle<Thread> ThreadPtr; - ReplicaSessionManager(const Ice::CommunicatorPtr&); + ReplicaSessionManager(const Ice::CommunicatorPtr&, const std::string&); void create(const std::string&, const InternalReplicaInfoPtr&, const DatabasePtr&, const WellKnownObjectsManagerPtr&, const InternalRegistryPrx&); void create(const InternalRegistryPrx&); diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index 33df2ed49e1..0234020b553 100644 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -3174,9 +3174,20 @@ ServerI::getProperties(const InternalServerDescriptorPtr& desc) { if(getProperty(p->second, "Ice.Default.Locator").empty()) { - p->second.push_back( - createProperty("Ice.Default.Locator", - _node->getCommunicator()->getProperties()->getProperty("Ice.Default.Locator"))); + Ice::PropertiesPtr properties = _node->getCommunicator()->getProperties(); + + string locator = properties->getProperty("Ice.Default.Locator"); + if(!locator.empty()) + { + p->second.push_back(createProperty("Ice.Default.Locator", locator)); + } + + string discoveryPlugin = properties->getProperty("Ice.Plugin.IceGridDiscovery"); + if(!discoveryPlugin.empty()) + { + p->second.push_back(createProperty("Ice.Plugin.IceGridDiscovery", discoveryPlugin)); + p->second.push_back(createProperty("IceGridDiscovery.InstanceName", _node->getInstanceName())); + } } if(!overrides.empty()) diff --git a/cpp/src/IceGrid/SessionManager.cpp b/cpp/src/IceGrid/SessionManager.cpp index 826195ed4ac..dcc4d500c2f 100644 --- a/cpp/src/IceGrid/SessionManager.cpp +++ b/cpp/src/IceGrid/SessionManager.cpp @@ -13,26 +13,14 @@ using namespace std; using namespace IceGrid; -SessionManager::SessionManager(const Ice::CommunicatorPtr& communicator) : _communicator(communicator) +SessionManager::SessionManager(const Ice::CommunicatorPtr& communicator, const string& instanceName) : + _communicator(communicator), _instanceName(instanceName) { - if(communicator->getDefaultLocator()) + Ice::LocatorPrx prx = communicator->getDefaultLocator(); + if(prx) { - Ice::ObjectPrx prx = communicator->getDefaultLocator(); - - // - // Derive the query objects from the locator proxy endpoints. - // - Ice::EndpointSeq endpoints = prx->ice_getEndpoints(); - Ice::Identity id = prx->ice_getIdentity(); - id.name = "Query"; - QueryPrx query = QueryPrx::uncheckedCast(prx->ice_identity(id)); - for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) - { - Ice::EndpointSeq singleEndpoint; - singleEndpoint.push_back(*p); - _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint))); - } - + Ice::Identity id; + id.category = instanceName; id.name = "InternalRegistry-Master"; _master = InternalRegistryPrx::uncheckedCast(prx->ice_identity(id)->ice_endpoints(Ice::EndpointSeq())); } @@ -43,20 +31,68 @@ SessionManager::~SessionManager() } vector<QueryPrx> -SessionManager::findAllQueryObjects() +SessionManager::findAllQueryObjects(bool cached) { - vector<QueryPrx> queryObjects = _queryObjects; - for(vector<QueryPrx>::const_iterator q = _queryObjects.begin(); q != _queryObjects.end(); ++q) + vector<QueryPrx> queryObjects; { - Ice::ConnectionPtr connection = (*q)->ice_getCachedConnection(); - if(connection) + Lock sync(*this); + if(cached && !_queryObjects.empty()) { - try + return _queryObjects; + } + queryObjects = _queryObjects; + } + + if(!cached) + { + for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q) + { + Ice::ConnectionPtr connection = (*q)->ice_getCachedConnection(); + if(connection) + { + try + { + connection->close(false); + } + catch(const Ice::LocalException&) + { + } + } + } + queryObjects.clear(); + } + + if(queryObjects.empty()) + { + Ice::LocatorPrx locator = _communicator->getDefaultLocator(); + if(locator) + { + Ice::Identity id; + id.category = _instanceName; + id.name = "Query"; + QueryPrx query = QueryPrx::uncheckedCast(locator->ice_identity(id)); + Ice::EndpointSeq endpoints = query->ice_getEndpoints(); + if(endpoints.empty()) { - connection->close(false); + try + { + Ice::ObjectPrx r = locator->findObjectById(id); + if(r) + { + endpoints = r->ice_getEndpoints(); + } + } + catch(const Ice::Exception&) + { + // Ignore. + } } - catch(const Ice::LocalException&) + + for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p) { + Ice::EndpointSeq singleEndpoint; + singleEndpoint.push_back(*p); + queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint))); } } } @@ -103,5 +139,8 @@ SessionManager::findAllQueryObjects() } } while(proxies.size() != previousSize); - return queryObjects; + + Lock sync(*this); + _queryObjects.swap(queryObjects); + return _queryObjects; } diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h index e1d3d6e90de..2ff28178032 100644 --- a/cpp/src/IceGrid/SessionManager.h +++ b/cpp/src/IceGrid/SessionManager.h @@ -192,8 +192,8 @@ public: return _state != Destroyed; } - virtual void - tryCreateSession(bool waitForTry = true, const IceUtil::Time& timeout = IceUtil::Time()) + void + tryCreateSession() { { Lock sync(*this); @@ -212,23 +212,24 @@ public: } notifyAll(); } + } - if(waitForTry) + void + waitTryCreateSession(const IceUtil::Time& timeout = IceUtil::Time()) + { + Lock sync(*this); + // Wait until the action is executed and the state changes. + while(_nextAction == Connect || _nextAction == KeepAlive || _state == InProgress) { - Lock sync(*this); - // Wait until the action is executed and the state changes. - while(_nextAction == Connect || _nextAction == KeepAlive || _state == InProgress) + if(timeout == IceUtil::Time()) { - if(timeout == IceUtil::Time()) - { - wait(); - } - else + wait(); + } + else + { + if(!timedWait(timeout)) { - if(!timedWait(timeout)) - { - break; - } + break; } } } @@ -320,16 +321,17 @@ class SessionManager : public IceUtil::Monitor<IceUtil::Mutex> { public: - SessionManager(const Ice::CommunicatorPtr&); + SessionManager(const Ice::CommunicatorPtr&, const std::string&); virtual ~SessionManager(); virtual bool isDestroyed() = 0; protected: - std::vector<IceGrid::QueryPrx> findAllQueryObjects(); + std::vector<IceGrid::QueryPrx> findAllQueryObjects(bool); Ice::CommunicatorPtr _communicator; + std::string _instanceName; InternalRegistryPrx _master; std::vector<IceGrid::QueryPrx> _queryObjects; }; diff --git a/cpp/src/IceGridLib/DiscoveryPluginI.cpp b/cpp/src/IceGridLib/DiscoveryPluginI.cpp index 02b9327da37..521bda66f22 100644 --- a/cpp/src/IceGridLib/DiscoveryPluginI.cpp +++ b/cpp/src/IceGridLib/DiscoveryPluginI.cpp @@ -29,34 +29,47 @@ class Request : public IceUtil::Shared { public: - Request(LocatorI* locator) : _locator(locator) + Request(LocatorI* locator, + const string& operation, + ::Ice::OperationMode mode, + const pair<const Ice::Byte*, const Ice::Byte*>& inParams, + const ::Ice::Context& ctx, + const Ice::AMD_Object_ice_invokePtr& amdCB) : + _locator(locator), + _operation(operation), + _mode(mode), + _context(ctx), + _inParams(inParams.first, inParams.second), + _amdCB(amdCB) { } - virtual void invoke(const Ice::LocatorPrx&) = 0; - virtual void response(const Ice::ObjectPrx&) = 0; + void invoke(const Ice::LocatorPrx&); + void response(const bool, const pair<const Ice::Byte*, const Ice::Byte*>&); + void exception(const Ice::Exception&); protected: LocatorI* _locator; + const string _operation; + const Ice::OperationMode _mode; + const Ice::Context _context; + const Ice::ByteSeq _inParams; + const Ice::AMD_Object_ice_invokePtr _amdCB; + Ice::LocatorPrx _locatorPrx; }; typedef IceUtil::Handle<Request> RequestPtr; -class LocatorI : public Ice::Locator, private IceUtil::TimerTask, private IceUtil::Monitor<IceUtil::Mutex> +class LocatorI : public Ice::BlobjectArrayAsync, private IceUtil::TimerTask, private IceUtil::Monitor<IceUtil::Mutex> { public: - LocatorI(const LookupPrx&, const Ice::PropertiesPtr&); + LocatorI(const LookupPrx&, const Ice::PropertiesPtr&, const string&, const LocatorPrx&); void setLookupReply(const LookupReplyPrx&); - virtual void findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr&, const Ice::Identity&, - const Ice::Current&) const; - - virtual void findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr&, const string&, - const Ice::Current&) const; - - virtual Ice::LocatorRegistryPrx getRegistry(const Ice::Current&) const; + virtual void ice_invoke_async(const Ice::AMD_Object_ice_invokePtr&, const pair<const Ice::Byte*, const Ice::Byte*>&, + const Ice::Current&); void foundLocator(const LocatorPrx&); void invoke(const Ice::LocatorPrx&, const RequestPtr&); @@ -64,18 +77,20 @@ public: private: virtual void runTimerTask(); - void queueRequest(const RequestPtr&); const LookupPrx _lookup; const IceUtil::Time _timeout; const int _retryCount; + const IceUtil::Time _retryDelay; const IceUtil::TimerPtr _timer; string _instanceName; bool _warned; LookupReplyPrx _lookupReply; Ice::LocatorPrx _locator; + IceGrid::LocatorPrx _voidLocator; + IceUtil::Time _nextRetry; int _pendingRetryCount; vector<RequestPtr> _pendingRequests; }; @@ -96,52 +111,46 @@ private: const LocatorIPtr _locator; }; -class ObjectRequest : public Request +// +// The void locator implementation below is used when no locator is found. +// +class VoidLocatorI : public IceGrid::Locator { public: - - ObjectRequest(LocatorI* locator, const Ice::Identity& id, const Ice::AMD_Locator_findObjectByIdPtr& amdCB) : - Request(locator), _id(id), _amdCB(amdCB) + + virtual void + findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& amdCB, + const Ice::Identity&, + const Ice::Current&) const { + amdCB->ice_response(0); } - virtual void invoke(const Ice::LocatorPrx&); - virtual void response(const Ice::ObjectPrx&); - - void - exception(const Ice::Exception&) + virtual void + findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, + const string&, + const Ice::Current&) const { - _locator->invoke(_locatorPrx, this); // Retry with new locator proxy + amdCB->ice_response(0); } -private: - - const Ice::Identity _id; - Ice::AMD_Locator_findObjectByIdPtr _amdCB; -}; - -class AdapterRequest : public Request -{ -public: - - AdapterRequest(LocatorI* locator, const string& adapterId, const Ice::AMD_Locator_findAdapterByIdPtr& amdCB) : - Request(locator), _adapterId(adapterId), _amdCB(amdCB) + virtual Ice::LocatorRegistryPrx + getRegistry(const Ice::Current&) const { + return 0; } - virtual void invoke(const Ice::LocatorPrx&); - virtual void response(const Ice::ObjectPrx&); - - void - exception(const Ice::Exception&) + virtual IceGrid::RegistryPrx + getLocalRegistry(const Ice::Current&) const { - _locator->invoke(_locatorPrx, this); // Retry with new locator proxy. + return 0; } -private: - - const string _adapterId; - const Ice::AMD_Locator_findAdapterByIdPtr _amdCB; + virtual IceGrid::QueryPrx + getLocalQuery(const Ice::Current&) const + { + return 0; + } }; } @@ -231,8 +240,14 @@ DiscoveryPluginI::initialize() throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str()); } - LocatorIPtr locator = new LocatorI(LookupPrx::uncheckedCast(lookupPrx), properties); - _communicator->setDefaultLocator(Ice::LocatorPrx::uncheckedCast(_locatorAdapter->addWithUUID(locator))); + LocatorPrx voidLocator = LocatorPrx::uncheckedCast(_locatorAdapter->addWithUUID(new VoidLocatorI())); + + string instanceName = properties->getProperty("IceGridDiscovery.InstanceName"); + Ice::Identity id; + id.name = "Locator"; + id.category = !instanceName.empty() ? instanceName : IceUtil::generateUUID(); + LocatorIPtr locator = new LocatorI(LookupPrx::uncheckedCast(lookupPrx), properties, instanceName, voidLocator); + _communicator->setDefaultLocator(Ice::LocatorPrx::uncheckedCast(_locatorAdapter->add(locator, id))); Ice::ObjectPrx lookupReply = _replyAdapter->addWithUUID(new LookupReplyI(locator))->ice_datagram(); locator->setLookupReply(LookupReplyPrx::uncheckedCast(lookupReply)); @@ -249,43 +264,38 @@ DiscoveryPluginI::destroy() } void -AdapterRequest::invoke(const Ice::LocatorPrx& l) +Request::invoke(const Ice::LocatorPrx& l) { _locatorPrx = l; - l->begin_findAdapterById(_adapterId, Ice::newCallback_Locator_findAdapterById(this, - &AdapterRequest::response, - &AdapterRequest::exception)); -} - -void -AdapterRequest::response(const Ice::ObjectPrx& prx) -{ - _amdCB->ice_response(prx); + l->begin_ice_invoke(_operation, _mode, _inParams, _context, + Ice::newCallback_Object_ice_invoke(this, &Request::response, &Request::exception)); } void -ObjectRequest::invoke(const Ice::LocatorPrx& l) +Request::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& outParams) { - _locatorPrx = l; - l->begin_findObjectById(_id, Ice::newCallback_Locator_findObjectById(this, - &ObjectRequest::response, - &ObjectRequest::exception)); + _amdCB->ice_response(ok, outParams); } void -ObjectRequest::response(const Ice::ObjectPrx& prx) +Request::exception(const Ice::Exception& ex) { - _amdCB->ice_response(prx); + _locator->invoke(_locatorPrx, this); // Retry with new locator proxy } -LocatorI::LocatorI(const LookupPrx& lookup, const Ice::PropertiesPtr& properties) : +LocatorI::LocatorI(const LookupPrx& lookup, + const Ice::PropertiesPtr& props, + const string& instanceName, + const IceGrid::LocatorPrx& voidLocator) : _lookup(lookup), - _timeout(IceUtil::Time::milliSeconds(properties->getPropertyAsIntWithDefault("IceGridDiscovery.Timeout", 300))), - _retryCount(properties->getPropertyAsIntWithDefault("IceGridDiscovery.RetryCount", 3)), + _timeout(IceUtil::Time::milliSeconds(props->getPropertyAsIntWithDefault("IceGridDiscovery.Timeout", 300))), + _retryCount(props->getPropertyAsIntWithDefault("IceGridDiscovery.RetryCount", 3)), + _retryDelay(IceUtil::Time::milliSeconds(props->getPropertyAsIntWithDefault("IceGridDiscovery.RetryDelay", 2000))), _timer(IceInternal::getInstanceTimer(lookup->ice_getCommunicator())), - _instanceName(properties->getProperty("IceGridDiscovery.InstanceName")), + _instanceName(instanceName), _warned(false), _locator(lookup->ice_getCommunicator()->getDefaultLocator()), + _voidLocator(voidLocator), _pendingRetryCount(0) { } @@ -297,45 +307,18 @@ LocatorI::setLookupReply(const LookupReplyPrx& lookupReply) } void -LocatorI::findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& amdCB, - const Ice::Identity& id, - const Ice::Current&) const +LocatorI::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB, + const pair<const Ice::Byte*, const Ice::Byte*>& inParams, + const Ice::Current& current) { - const_cast<LocatorI*>(this)->invoke(0, new ObjectRequest(const_cast<LocatorI*>(this), id, amdCB)); -} - -void -LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB, - const string& adapterId, - const Ice::Current&) const -{ - const_cast<LocatorI*>(this)->invoke(0, new AdapterRequest(const_cast<LocatorI*>(this), adapterId, amdCB)); -} - -Ice::LocatorRegistryPrx -LocatorI::getRegistry(const Ice::Current&) const -{ - Ice::LocatorPrx locator; - { - Lock sync(*this); - if(!_locator) - { - const_cast<LocatorI*>(this)->queueRequest(0); // Search for locator if not already doing so. - while(_pendingRetryCount > 0) - { - wait(); - } - } - locator = _locator; - } - return locator ? locator->getRegistry() : Ice::LocatorRegistryPrx(); + invoke(0, new Request(this, current.operation, current.mode, inParams, current.ctx, amdCB)); } void LocatorI::foundLocator(const LocatorPrx& locator) { Lock sync(*this); - if(!locator) + if(!locator || (!_instanceName.empty() && locator->ice_getIdentity().category != _instanceName)) { return; } @@ -401,7 +384,7 @@ LocatorI::foundLocator(const LocatorPrx& locator) _locator = locator; if(_instanceName.empty()) { - _instanceName = _locator->ice_getIdentity().category; + _instanceName = _locator->ice_getIdentity().category; // Stick to the first discovered locator. } } @@ -413,7 +396,6 @@ LocatorI::foundLocator(const LocatorPrx& locator) (*p)->invoke(_locator); } _pendingRequests.clear(); - notifyAll(); } void @@ -424,10 +406,22 @@ LocatorI::invoke(const Ice::LocatorPrx& locator, const RequestPtr& request) { request->invoke(_locator); } + else if(IceUtil::Time::now() < _nextRetry) + { + request->invoke(_voidLocator); // Don't retry to find a locator before the retry delay expires + } else { _locator = 0; - queueRequest(request); + + _pendingRequests.push_back(request); + + if(_pendingRetryCount == 0) // No request in progress + { + _pendingRetryCount = _retryCount; + _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. + _timer->schedule(this, _timeout); + } } } @@ -445,26 +439,10 @@ LocatorI::runTimerTask() assert(!_pendingRequests.empty()); for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p) { - (*p)->response(0); + (*p)->invoke(_voidLocator); // Send pending requests on void locator. } _pendingRequests.clear(); - notifyAll(); - } -} - -void -LocatorI::queueRequest(const RequestPtr& request) -{ - if(request) - { - _pendingRequests.push_back(request); - } - - if(_pendingRetryCount == 0) // No request in progress - { - _pendingRetryCount = _retryCount; - _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request. - _timer->schedule(this, _timeout); + _nextRetry = IceUtil::Time::now() + _retryDelay; // Only retry when the retry delay expires } } diff --git a/cpp/test/IceGrid/simple/AllTests.cpp b/cpp/test/IceGrid/simple/AllTests.cpp index 5ee95f3dad4..184e18adac7 100644 --- a/cpp/test/IceGrid/simple/AllTests.cpp +++ b/cpp/test/IceGrid/simple/AllTests.cpp @@ -50,14 +50,72 @@ allTests(const Ice::CommunicatorPtr& communicator) cout << "testing discovery... " << flush; { + // Add test well-known object + IceGrid::RegistryPrx registry = IceGrid::RegistryPrx::checkedCast( + communicator->stringToProxy(communicator->getDefaultLocator()->ice_getIdentity().category + "/Registry")); + test(registry); + + IceGrid::AdminSessionPrx session = registry->createAdminSession("foo", "bar"); + session->getAdmin()->addObjectWithType(base, "::Test"); + session->destroy(); + + // + // Ensure the IceGrid discovery locator can discover the + // registries and make sure locator requests are forwarded. + // Ice::InitializationData initData; initData.properties = communicator->getProperties()->clone(); initData.properties->setProperty("Ice.Default.Locator", ""); initData.properties->setProperty("Ice.Plugin.IceGridDiscovery", "IceGrid:createIceGridDiscovery"); + initData.properties->setProperty("AdapterForDiscoveryTest.AdapterId", "discoveryAdapter"); + initData.properties->setProperty("AdapterForDiscoveryTest.Endpoints", "default"); Ice::CommunicatorPtr com = Ice::initialize(initData); test(com->getDefaultLocator()); com->stringToProxy("test @ TestAdapter")->ice_ping(); + com->stringToProxy("test")->ice_ping(); + + test(com->getDefaultLocator()->getRegistry()); + test(IceGrid::LocatorPrx::uncheckedCast(com->getDefaultLocator())->getLocalRegistry()); + test(IceGrid::LocatorPrx::uncheckedCast(com->getDefaultLocator())->getLocalQuery()); + + Ice::ObjectAdapterPtr adapter = com->createObjectAdapter("AdapterForDiscoveryTest"); + adapter->activate(); + adapter->deactivate(); + + com->destroy(); + + // + // Now, ensure that the IceGrid discovery locator correctly + // handles failure to find a locator. + // + initData.properties->setProperty("IceGridDiscovery.InstanceName", "unknown"); + initData.properties->setProperty("IceGridDiscovery.RetryCount", "1"); + initData.properties->setProperty("IceGridDiscovery.Timeout", "100"); + com = Ice::initialize(initData); + test(com->getDefaultLocator()); + try + { + com->stringToProxy("test @ TestAdapter")->ice_ping(); + } + catch(const Ice::NoEndpointException&) + { + } + try + { + com->stringToProxy("test")->ice_ping(); + } + catch(const Ice::NoEndpointException&) + { + } + test(!com->getDefaultLocator()->getRegistry()); + test(!IceGrid::LocatorPrx::uncheckedCast(com->getDefaultLocator())->getLocalRegistry()); + test(!IceGrid::LocatorPrx::uncheckedCast(com->getDefaultLocator())->getLocalQuery()); + + adapter = com->createObjectAdapter("AdapterForDiscoveryTest"); + adapter->activate(); + adapter->deactivate(); + com->destroy(); } cout << "ok" << endl; diff --git a/cpp/test/IceGrid/simple/run.py b/cpp/test/IceGrid/simple/run.py index 80f594895d9..14478af1931 100755 --- a/cpp/test/IceGrid/simple/run.py +++ b/cpp/test/IceGrid/simple/run.py @@ -20,6 +20,9 @@ if len(path) == 0: sys.path.append(os.path.join(path[0], "scripts")) import TestUtil, IceGridAdmin +# Test IceGrid discovery with multiple replicas +IceGridAdmin.nreplicas=2 + # # Test client/server without on demand activation. # |