summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-07-25 11:54:05 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-07-25 11:54:05 +0200
commit78aa45dcf75316fc2db511938b7c8249511e54ca (patch)
treeb68cda42fe86a4b868b7e72de55aea4d00dba26b /cpp/src
parentBuild fixes for MinGW 4.7.2-32 (diff)
downloadice-78aa45dcf75316fc2db511938b7c8249511e54ca.tar.bz2
ice-78aa45dcf75316fc2db511938b7c8249511e54ca.tar.xz
ice-78aa45dcf75316fc2db511938b7c8249511e54ca.zip
Improved IceGrid discovery, it now works with icegrid registry slaves and nodes
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/Ice/LocatorInfo.cpp6
-rw-r--r--cpp/src/IceGrid/IceGridNode.cpp32
-rw-r--r--cpp/src/IceGrid/NodeI.cpp11
-rw-r--r--cpp/src/IceGrid/NodeI.h3
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp50
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.h17
-rw-r--r--cpp/src/IceGrid/RegistryI.cpp60
-rw-r--r--cpp/src/IceGrid/RegistryI.h4
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp9
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.h2
-rw-r--r--cpp/src/IceGrid/ServerI.cpp17
-rw-r--r--cpp/src/IceGrid/SessionManager.cpp93
-rw-r--r--cpp/src/IceGrid/SessionManager.h36
-rw-r--r--cpp/src/IceGridLib/DiscoveryPluginI.cpp220
14 files changed, 315 insertions, 245 deletions
diff --git a/cpp/src/Ice/LocatorInfo.cpp b/cpp/src/Ice/LocatorInfo.cpp
index dfdadb06c86..e668dc3a248 100644
--- a/cpp/src/Ice/LocatorInfo.cpp
+++ b/cpp/src/Ice/LocatorInfo.cpp
@@ -540,7 +540,11 @@ IceInternal::LocatorInfo::getLocatorRegistry()
// Do not make locator calls from within sync.
//
LocatorRegistryPrx locatorRegistry = _locator->getRegistry();
-
+ if(!locatorRegistry)
+ {
+ return 0;
+ }
+
{
IceUtil::Mutex::Lock sync(*this);
diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp
index 183de011e10..089f2d49c26 100644
--- a/cpp/src/IceGrid/IceGridNode.cpp
+++ b/cpp/src/IceGrid/IceGridNode.cpp
@@ -331,6 +331,8 @@ NodeService::startImpl(int argc, char* argv[], int& status)
return false;
}
+ communicator()->setDefaultLocator(_registry->getLocator());
+
//
// Set the default locator property to point to the collocated
// locator (this property is passed by the activator to each
@@ -339,16 +341,10 @@ NodeService::startImpl(int argc, char* argv[], int& status)
//
if(properties->getProperty("Ice.Default.Locator").empty())
{
- Identity locatorId;
- locatorId.category = properties->getPropertyWithDefault("IceGrid.InstanceName", "IceGrid");
- locatorId.name = "Locator";
- string endpoints = properties->getProperty("IceGrid.Registry.Client.Endpoints");
- string locPrx = "\"" + communicator()->identityToString(locatorId) + "\" :" + endpoints;
- communicator()->setDefaultLocator(Ice::LocatorPrx::uncheckedCast(communicator()->stringToProxy(locPrx)));
- properties->setProperty("Ice.Default.Locator", locPrx);
+ properties->setProperty("Ice.Default.Locator", communicator()->getDefaultLocator()->ice_toString());
}
}
- else if(properties->getProperty("Ice.Default.Locator").empty())
+ else if(!communicator()->getDefaultLocator())
{
error("property `Ice.Default.Locator' is not set");
return false;
@@ -485,9 +481,21 @@ NodeService::startImpl(int argc, char* argv[], int& status)
//
// The IceGrid instance name.
//
- const string instanceName = communicator()->getDefaultLocator()->ice_getIdentity().category;
+ string instanceName = properties->getProperty("IceGrid.InstanceName");
+ if(instanceName.empty())
+ {
+ instanceName = properties->getProperty("IceGridDiscovery.InstanceName");
+ }
+ if(instanceName.empty())
+ {
+ instanceName = communicator()->getDefaultLocator()->ice_getIdentity().category;
+ }
+ if(instanceName.empty())
+ {
+ instanceName = "IceGrid";
+ }
- _sessions.reset(new NodeSessionManager(communicator()));
+ _sessions.reset(new NodeSessionManager(communicator(), instanceName));
//
// Create the server factory. The server factory creates persistent objects
@@ -496,7 +504,7 @@ NodeService::startImpl(int argc, char* argv[], int& status)
//
Identity id = communicator()->stringToIdentity(instanceName + "/Node-" + name);
NodePrx nodeProxy = NodePrx::uncheckedCast(_adapter->createProxy(id));
- _node = new NodeI(_adapter, *_sessions, _activator, _timer, traceLevels, nodeProxy, name, mapper);
+ _node = new NodeI(_adapter, *_sessions, _activator, _timer, traceLevels, nodeProxy, name, mapper, instanceName);
_adapter->add(_node, nodeProxy->ice_getIdentity());
_adapter->addServantLocator(new DefaultServantLocator(new NodeServerAdminRouter(_node)),
@@ -516,7 +524,7 @@ NodeService::startImpl(int argc, char* argv[], int& status)
{
communicator()->getDefaultLocator()->ice_timeout(1000)->ice_ping();
}
- catch(const Ice::LocalException& ex)
+ catch(const Ice::Exception& ex)
{
Warning out(communicator()->getLogger());
out << "couldn't reach the IceGrid registry (this is expected ";
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 5e3fdacf906..97c4c077b7e 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -299,7 +299,8 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
const TraceLevelsPtr& traceLevels,
const NodePrx& proxy,
const string& name,
- const UserAccountMapperPrx& mapper) :
+ const UserAccountMapperPrx& mapper,
+ const string& instanceName) :
_communicator(adapter->getCommunicator()),
_adapter(adapter),
_sessions(sessions),
@@ -311,6 +312,7 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
_redirectErrToOut(false),
_allowEndpointsOverride(false),
_waitTime(0),
+ _instanceName(instanceName),
_userAccountMapper(mapper),
_platform("IceGrid.Node", _communicator, _traceLevels),
_fileCache(new FileCache(_communicator)),
@@ -322,7 +324,6 @@ NodeI::NodeI(const Ice::ObjectAdapterPtr& adapter,
const_cast<string&>(_dataDir) = _platform.getDataDir();
const_cast<string&>(_serversDir) = _dataDir + "/servers";
const_cast<string&>(_tmpDir) = _dataDir + "/tmp";
- const_cast<string&>(_instanceName) = _communicator->getDefaultLocator()->ice_getIdentity().category;
const_cast<Ice::Int&>(_waitTime) = props->getPropertyAsIntWithDefault("IceGrid.Node.WaitTime", 60);
const_cast<string&>(_outputDir) = props->getProperty("IceGrid.Node.Output");
const_cast<bool&>(_redirectErrToOut) = props->getPropertyAsInt("IceGrid.Node.RedirectErrToOut") > 0;
@@ -903,6 +904,12 @@ NodeI::getPropertiesOverride() const
return _propertiesOverride;
}
+const string&
+NodeI::getInstanceName() const
+{
+ return _instanceName;
+}
+
string
NodeI::getOutputDir() const
{
diff --git a/cpp/src/IceGrid/NodeI.h b/cpp/src/IceGrid/NodeI.h
index f212a71b4c6..b438af7ffef 100644
--- a/cpp/src/IceGrid/NodeI.h
+++ b/cpp/src/IceGrid/NodeI.h
@@ -63,7 +63,7 @@ public:
typedef IceUtil::Handle<Update> UpdatePtr;
NodeI(const Ice::ObjectAdapterPtr&, NodeSessionManager&, const ActivatorPtr&, const IceUtil::TimerPtr&,
- const TraceLevelsPtr&, const NodePrx&, const std::string&, const UserAccountMapperPrx&);
+ const TraceLevelsPtr&, const NodePrx&, const std::string&, const UserAccountMapperPrx&, const std::string&);
virtual ~NodeI();
virtual void loadServer_async(const AMD_Node_loadServerPtr&,
@@ -119,6 +119,7 @@ public:
FileCachePtr getFileCache() const;
NodePrx getProxy() const;
const PropertyDescriptorSeq& getPropertiesOverride() const;
+ const std::string& getInstanceName() const;
std::string getOutputDir() const;
bool getRedirectErrToOut() const;
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index 5a71048a511..5d361f5ce7e 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -16,14 +16,12 @@
using namespace std;
using namespace IceGrid;
-NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry,
+NodeSessionKeepAliveThread::NodeSessionKeepAliveThread(const InternalRegistryPrx& registry,
const NodeIPtr& node,
- const vector<QueryPrx>& queryObjects) :
- SessionKeepAliveThread<NodeSessionPrx>(registry, node->getTraceLevels()->logger),
- _node(node),
- _queryObjects(queryObjects)
+ NodeSessionManager& manager) :
+ SessionKeepAliveThread<NodeSessionPrx>(registry, node->getTraceLevels()->logger), _node(node), _manager(manager)
{
- assert(registry && node && !_queryObjects.empty());
+ assert(registry && node);
string name = registry->ice_getIdentity().name;
const string prefix("InternalRegistry-");
string::size_type pos = name.find(prefix);
@@ -66,7 +64,8 @@ NodeSessionKeepAliveThread::createSession(InternalRegistryPrx& registry, IceUtil
if(!session)
{
vector<Ice::AsyncResultPtr> results;
- for(vector<QueryPrx>::const_iterator q = _queryObjects.begin(); q != _queryObjects.end(); ++q)
+ vector<QueryPrx> queryObjects = _manager.getQueryObjects();
+ for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
{
results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity()));
}
@@ -227,8 +226,8 @@ NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session)
}
}
-NodeSessionManager::NodeSessionManager(const Ice::CommunicatorPtr& communicator) :
- SessionManager(communicator),
+NodeSessionManager::NodeSessionManager(const Ice::CommunicatorPtr& communicator, const string& instanceName) :
+ SessionManager(communicator, instanceName),
_destroyed(false),
_activated(false)
{
@@ -251,7 +250,8 @@ NodeSessionManager::create(const NodeIPtr& node)
// with replicas (see createdSession below) and this must be done
// before the node is activated.
//
- _thread->tryCreateSession(true, IceUtil::Time::seconds(3));
+ _thread->tryCreateSession();
+ _thread->waitTryCreateSession(IceUtil::Time::seconds(3));
}
void
@@ -273,6 +273,7 @@ NodeSessionManager::create(const InternalRegistryPrx& replica)
if(thread)
{
thread->tryCreateSession();
+ thread->waitTryCreateSession();
}
}
@@ -370,7 +371,7 @@ NodeSessionManager::replicaInit(const InternalRegistryPrxSeq& replicas)
for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
_replicas.insert((*p)->ice_getIdentity());
- addReplicaSession(*p)->tryCreateSession(false);
+ addReplicaSession(*p)->tryCreateSession();
}
}
@@ -383,7 +384,7 @@ NodeSessionManager::replicaAdded(const InternalRegistryPrx& replica)
return;
}
_replicas.insert(replica->ice_getIdentity());
- addReplicaSession(replica)->tryCreateSession(false);
+ addReplicaSession(replica)->tryCreateSession();
}
void
@@ -408,7 +409,6 @@ NodeSessionKeepAliveThreadPtr
NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica)
{
assert(!_destroyed);
-
NodeSessionMap::const_iterator p = _sessions.find(replica->ice_getIdentity());
NodeSessionKeepAliveThreadPtr thread;
if(p != _sessions.end())
@@ -418,7 +418,7 @@ NodeSessionManager::addReplicaSession(const InternalRegistryPrx& replica)
}
else
{
- thread = new NodeSessionKeepAliveThread(replica, _node, _queryObjects);
+ thread = new NodeSessionKeepAliveThread(replica, _node, *this);
_sessions.insert(make_pair(replica->ice_getIdentity(), thread));
thread->start();
}
@@ -535,7 +535,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
{
vector<Ice::AsyncResultPtr> results1;
vector<Ice::AsyncResultPtr> results2;
- vector<QueryPrx> queryObjects = findAllQueryObjects();
+ vector<QueryPrx> queryObjects = findAllQueryObjects(false);
//
// Below we try to retrieve internal registry proxies either
@@ -576,7 +576,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
}
catch(const Ice::LocalException&)
{
- // IGNORE
+ // Ignore.
}
}
for(vector<Ice::AsyncResultPtr>::const_iterator p = results2.begin(); p != results2.end(); ++p)
@@ -609,7 +609,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
}
catch(const Ice::LocalException&)
{
- // IGNORE
+ // Ignore
}
}
@@ -639,9 +639,17 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
if((*p)->ice_getIdentity() != _master->ice_getIdentity())
{
_replicas.insert((*p)->ice_getIdentity());
- NodeSessionKeepAliveThreadPtr session = addReplicaSession(*p);
- session->tryCreateSession(false);
- sessions.push_back(session);
+
+ if(_sessions.find((*p)->ice_getIdentity()) == _sessions.end())
+ {
+ NodeSessionKeepAliveThreadPtr session = addReplicaSession(*p);
+ session->tryCreateSession();
+ sessions.push_back(session);
+ }
+ else
+ {
+ addReplicaSession(*p); // Update the session
+ }
}
}
}
@@ -664,7 +672,7 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
{
break;
}
- (*p)->tryCreateSession(true, timeout);
+ (*p)->waitTryCreateSession(timeout);
}
}
diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h
index 7abe10554cf..bd8552ddb8e 100644
--- a/cpp/src/IceGrid/NodeSessionManager.h
+++ b/cpp/src/IceGrid/NodeSessionManager.h
@@ -25,11 +25,13 @@ namespace IceGrid
class NodeI;
typedef IceUtil::Handle<NodeI> NodeIPtr;
+class NodeSessionManager;
+
class NodeSessionKeepAliveThread : public SessionKeepAliveThread<NodeSessionPrx>
{
public:
- NodeSessionKeepAliveThread(const InternalRegistryPrx&, const NodeIPtr&, const std::vector<QueryPrx>&);
+ NodeSessionKeepAliveThread(const InternalRegistryPrx&, const NodeIPtr&, NodeSessionManager&);
virtual NodeSessionPrx createSession(InternalRegistryPrx&, IceUtil::Time&);
virtual void destroySession(const NodeSessionPrx&);
@@ -43,7 +45,7 @@ protected:
const NodeIPtr _node;
const std::string _name;
- const std::vector<QueryPrx> _queryObjects;
+ NodeSessionManager& _manager;
};
typedef IceUtil::Handle<NodeSessionKeepAliveThread> NodeSessionKeepAliveThreadPtr;
@@ -51,7 +53,7 @@ class NodeSessionManager : public SessionManager
{
public:
- NodeSessionManager(const Ice::CommunicatorPtr&);
+ NodeSessionManager(const Ice::CommunicatorPtr&, const std::string&);
void create(const NodeIPtr&);
void create(const InternalRegistryPrx&);
@@ -65,6 +67,7 @@ public:
void replicaRemoved(const InternalRegistryPrx&);
NodeSessionPrx getMasterNodeSession() const { return _thread->getSession(); }
+ std::vector<IceGrid::QueryPrx> getQueryObjects() { return findAllQueryObjects(true); }
private:
@@ -84,9 +87,7 @@ private:
{
public:
- Thread(NodeSessionManager& manager) :
- NodeSessionKeepAliveThread(manager._master, manager._node, manager._queryObjects),
- _manager(manager)
+ Thread(NodeSessionManager& manager) : NodeSessionKeepAliveThread(manager._master, manager._node, manager)
{
}
@@ -113,10 +114,6 @@ private:
_manager.reapReplicas();
return alive;
}
-
- private:
-
- NodeSessionManager& _manager;
};
typedef IceUtil::Handle<Thread> ThreadPtr;
friend class Thread;
diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp
index 8ab2ed0bd92..071942cd8f5 100644
--- a/cpp/src/IceGrid/RegistryI.cpp
+++ b/cpp/src/IceGrid/RegistryI.cpp
@@ -205,7 +205,6 @@ RegistryI::RegistryI(const CommunicatorPtr& communicator,
_nowarn(nowarn),
_readonly(readonly),
_initFromReplica(initFromReplica),
- _session(communicator),
_platform("IceGrid.Registry", communicator, traceLevels)
{
}
@@ -337,36 +336,38 @@ RegistryI::startImpl()
properties->setProperty("Ice.ACM.Server.Close", "3"); // Close on invocation and idle.
- if(!_master && properties->getProperty("Ice.Default.Locator").empty() &&
- properties->getProperty("Ice.Default.Locator").empty())
+ if(!_master && !_communicator->getDefaultLocator())
{
Error out(_communicator->getLogger());
out << "property `Ice.Default.Locator' is not set";
return false;
}
+ else if(_master)
+ {
+ _communicator->setDefaultLocator(0); // Clear the default locator in case it's set.
+ }
//
// Get the instance name
//
- if(_master)
+ _instanceName = properties->getProperty("IceGrid.InstanceName");
+ if(_instanceName.empty())
{
- _instanceName = properties->getProperty("IceGrid.InstanceName");
- if(_instanceName.empty())
- {
- if(_communicator->getDefaultLocator())
- {
- _instanceName = _communicator->getDefaultLocator()->ice_getIdentity().category;
- }
- else
- {
- _instanceName = "IceGrid";
- }
- }
+ _instanceName = properties->getProperty("IceGridDiscovery.InstanceName");
}
- else
+ if(_instanceName.empty() && _communicator->getDefaultLocator())
{
_instanceName = _communicator->getDefaultLocator()->ice_getIdentity().category;
}
+ if(_instanceName.empty())
+ {
+ _instanceName = "IceGrid";
+ }
+
+ //
+ // Create the replica session manager
+ //
+ _session.reset(new ReplicaSessionManager(_communicator, _instanceName));
//
// Create the registry database.
@@ -485,7 +486,7 @@ RegistryI::startImpl()
if(!proxy)
{
id.name = "InternalRegistry-" + (_initFromReplica.empty() ? "Master" : _initFromReplica);
- proxy = _session.findInternalRegistryForReplica(id);
+ proxy = _session->findInternalRegistryForReplica(id);
}
if(!proxy)
@@ -550,8 +551,8 @@ RegistryI::startImpl()
else
{
InternalReplicaInfoPtr info = _platform.getInternalReplicaInfo();
- _session.create(_replicaName, info, _database, _wellKnownObjects, internalRegistry);
- registerNodes(internalRegistry, _session.getNodes(nodes));
+ _session->create(_replicaName, info, _database, _wellKnownObjects, internalRegistry);
+ registerNodes(internalRegistry, _session->getNodes(nodes));
}
_serverAdapter = _communicator->createObjectAdapter("IceGrid.Registry.Server");
@@ -600,7 +601,7 @@ RegistryI::startImpl()
}
else
{
- _session.registerAllWellKnownObjects();
+ _session->registerAllWellKnownObjects();
}
//
@@ -685,7 +686,7 @@ RegistryI::setupLocatorRegistry()
Identity locatorRegId;
locatorRegId.category = _instanceName;
locatorRegId.name = "LocatorRegistry-" + _replicaName;
- ObjectPrx obj = _serverAdapter->add(new LocatorRegistryI(_database, dynReg, _master, _session), locatorRegId);
+ ObjectPrx obj = _serverAdapter->add(new LocatorRegistryI(_database, dynReg, _master, *_session), locatorRegId);
return LocatorRegistryPrx::uncheckedCast(obj);
}
@@ -738,7 +739,7 @@ RegistryI::setupInternalRegistry()
internalRegistryId.category = _instanceName;
internalRegistryId.name = "InternalRegistry-" + _replicaName;
assert(_reaper);
- ObjectPtr internalRegistry = new InternalRegistryI(this, _database, _reaper, _wellKnownObjects, _session);
+ ObjectPtr internalRegistry = new InternalRegistryI(this, _database, _reaper, _wellKnownObjects, *_session);
Ice::ObjectPrx proxy = _registryAdapter->add(internalRegistry, internalRegistryId);
_wellKnownObjects->add(proxy, InternalRegistry::ice_staticId());
@@ -903,8 +904,11 @@ RegistryI::setupAdminSessionFactory(const Ice::ObjectPtr& router, const IceGrid:
void
RegistryI::stop()
{
- _session.destroy();
-
+ if(_session.get())
+ {
+ _session->destroy();
+ }
+
//
// We destroy the topics before to shutdown the communicator to
// ensure that there will be no more invocations on IceStorm once
@@ -1233,6 +1237,12 @@ RegistryI::createAdminCallbackProxy(const Identity& id) const
return _serverAdapter->createProxy(id);
}
+Ice::LocatorPrx
+RegistryI::getLocator()
+{
+ return _wellKnownObjects->getLocator();
+}
+
Glacier2::PermissionsVerifierPrx
RegistryI::getPermissionsVerifier(const IceGrid::LocatorPrx& locator,
const string& verifierProperty,
diff --git a/cpp/src/IceGrid/RegistryI.h b/cpp/src/IceGrid/RegistryI.h
index 75da1fa1259..e4a303e533b 100644
--- a/cpp/src/IceGrid/RegistryI.h
+++ b/cpp/src/IceGrid/RegistryI.h
@@ -79,6 +79,8 @@ public:
const Ice::ObjectAdapterPtr& getRegistryAdapter() { return _registryAdapter; }
+ Ice::LocatorPrx getLocator();
+
private:
Ice::LocatorRegistryPrx setupLocatorRegistry();
@@ -117,7 +119,7 @@ private:
IceUtil::TimerPtr _timer;
SessionServantManagerPtr _servantManager;
int _sessionTimeout;
- ReplicaSessionManager _session;
+ IceUtil::UniquePtr<ReplicaSessionManager> _session;
mutable PlatformInfo _platform;
Glacier2::PermissionsVerifierPrx _nullPermissionsVerifier;
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index b053b21a988..fbce90c0816 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -282,7 +282,8 @@ private:
};
-ReplicaSessionManager::ReplicaSessionManager(const Ice::CommunicatorPtr& communicator) : SessionManager(communicator)
+ReplicaSessionManager::ReplicaSessionManager(const Ice::CommunicatorPtr& communicator, const string& instanceName) :
+ SessionManager(communicator, instanceName)
{
}
@@ -309,6 +310,7 @@ ReplicaSessionManager::create(const string& name,
}
_thread->tryCreateSession();
+ _thread->waitTryCreateSession();
}
void
@@ -330,6 +332,7 @@ ReplicaSessionManager::create(const InternalRegistryPrx& replica)
_thread->setRegistry(replica);
_thread->tryCreateSession();
+ _thread->waitTryCreateSession();
}
NodePrxSeq
@@ -412,7 +415,7 @@ IceGrid::InternalRegistryPrx
ReplicaSessionManager::findInternalRegistryForReplica(const Ice::Identity& id)
{
vector<Ice::AsyncResultPtr> results;
- vector<QueryPrx> queryObjects = findAllQueryObjects();
+ vector<QueryPrx> queryObjects = findAllQueryObjects(true);
for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
{
results.push_back((*q)->begin_findObjectById(id));
@@ -489,7 +492,7 @@ ReplicaSessionManager::createSession(InternalRegistryPrx& registry, IceUtil::Tim
if(!session)
{
vector<Ice::AsyncResultPtr> results;
- vector<QueryPrx> queryObjects = findAllQueryObjects();
+ vector<QueryPrx> queryObjects = findAllQueryObjects(false);
for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
{
results.push_back((*q)->begin_findObjectById(registry->ice_getIdentity()));
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.h b/cpp/src/IceGrid/ReplicaSessionManager.h
index 8e5c7308dce..31a9de77ae2 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.h
+++ b/cpp/src/IceGrid/ReplicaSessionManager.h
@@ -69,7 +69,7 @@ public:
};
typedef IceUtil::Handle<Thread> ThreadPtr;
- ReplicaSessionManager(const Ice::CommunicatorPtr&);
+ ReplicaSessionManager(const Ice::CommunicatorPtr&, const std::string&);
void create(const std::string&, const InternalReplicaInfoPtr&, const DatabasePtr&,
const WellKnownObjectsManagerPtr&, const InternalRegistryPrx&);
void create(const InternalRegistryPrx&);
diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp
index 33df2ed49e1..0234020b553 100644
--- a/cpp/src/IceGrid/ServerI.cpp
+++ b/cpp/src/IceGrid/ServerI.cpp
@@ -3174,9 +3174,20 @@ ServerI::getProperties(const InternalServerDescriptorPtr& desc)
{
if(getProperty(p->second, "Ice.Default.Locator").empty())
{
- p->second.push_back(
- createProperty("Ice.Default.Locator",
- _node->getCommunicator()->getProperties()->getProperty("Ice.Default.Locator")));
+ Ice::PropertiesPtr properties = _node->getCommunicator()->getProperties();
+
+ string locator = properties->getProperty("Ice.Default.Locator");
+ if(!locator.empty())
+ {
+ p->second.push_back(createProperty("Ice.Default.Locator", locator));
+ }
+
+ string discoveryPlugin = properties->getProperty("Ice.Plugin.IceGridDiscovery");
+ if(!discoveryPlugin.empty())
+ {
+ p->second.push_back(createProperty("Ice.Plugin.IceGridDiscovery", discoveryPlugin));
+ p->second.push_back(createProperty("IceGridDiscovery.InstanceName", _node->getInstanceName()));
+ }
}
if(!overrides.empty())
diff --git a/cpp/src/IceGrid/SessionManager.cpp b/cpp/src/IceGrid/SessionManager.cpp
index 826195ed4ac..dcc4d500c2f 100644
--- a/cpp/src/IceGrid/SessionManager.cpp
+++ b/cpp/src/IceGrid/SessionManager.cpp
@@ -13,26 +13,14 @@
using namespace std;
using namespace IceGrid;
-SessionManager::SessionManager(const Ice::CommunicatorPtr& communicator) : _communicator(communicator)
+SessionManager::SessionManager(const Ice::CommunicatorPtr& communicator, const string& instanceName) :
+ _communicator(communicator), _instanceName(instanceName)
{
- if(communicator->getDefaultLocator())
+ Ice::LocatorPrx prx = communicator->getDefaultLocator();
+ if(prx)
{
- Ice::ObjectPrx prx = communicator->getDefaultLocator();
-
- //
- // Derive the query objects from the locator proxy endpoints.
- //
- Ice::EndpointSeq endpoints = prx->ice_getEndpoints();
- Ice::Identity id = prx->ice_getIdentity();
- id.name = "Query";
- QueryPrx query = QueryPrx::uncheckedCast(prx->ice_identity(id));
- for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
- {
- Ice::EndpointSeq singleEndpoint;
- singleEndpoint.push_back(*p);
- _queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint)));
- }
-
+ Ice::Identity id;
+ id.category = instanceName;
id.name = "InternalRegistry-Master";
_master = InternalRegistryPrx::uncheckedCast(prx->ice_identity(id)->ice_endpoints(Ice::EndpointSeq()));
}
@@ -43,20 +31,68 @@ SessionManager::~SessionManager()
}
vector<QueryPrx>
-SessionManager::findAllQueryObjects()
+SessionManager::findAllQueryObjects(bool cached)
{
- vector<QueryPrx> queryObjects = _queryObjects;
- for(vector<QueryPrx>::const_iterator q = _queryObjects.begin(); q != _queryObjects.end(); ++q)
+ vector<QueryPrx> queryObjects;
{
- Ice::ConnectionPtr connection = (*q)->ice_getCachedConnection();
- if(connection)
+ Lock sync(*this);
+ if(cached && !_queryObjects.empty())
{
- try
+ return _queryObjects;
+ }
+ queryObjects = _queryObjects;
+ }
+
+ if(!cached)
+ {
+ for(vector<QueryPrx>::const_iterator q = queryObjects.begin(); q != queryObjects.end(); ++q)
+ {
+ Ice::ConnectionPtr connection = (*q)->ice_getCachedConnection();
+ if(connection)
+ {
+ try
+ {
+ connection->close(false);
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
+ }
+ }
+ queryObjects.clear();
+ }
+
+ if(queryObjects.empty())
+ {
+ Ice::LocatorPrx locator = _communicator->getDefaultLocator();
+ if(locator)
+ {
+ Ice::Identity id;
+ id.category = _instanceName;
+ id.name = "Query";
+ QueryPrx query = QueryPrx::uncheckedCast(locator->ice_identity(id));
+ Ice::EndpointSeq endpoints = query->ice_getEndpoints();
+ if(endpoints.empty())
{
- connection->close(false);
+ try
+ {
+ Ice::ObjectPrx r = locator->findObjectById(id);
+ if(r)
+ {
+ endpoints = r->ice_getEndpoints();
+ }
+ }
+ catch(const Ice::Exception&)
+ {
+ // Ignore.
+ }
}
- catch(const Ice::LocalException&)
+
+ for(Ice::EndpointSeq::const_iterator p = endpoints.begin(); p != endpoints.end(); ++p)
{
+ Ice::EndpointSeq singleEndpoint;
+ singleEndpoint.push_back(*p);
+ queryObjects.push_back(QueryPrx::uncheckedCast(query->ice_endpoints(singleEndpoint)));
}
}
}
@@ -103,5 +139,8 @@ SessionManager::findAllQueryObjects()
}
}
while(proxies.size() != previousSize);
- return queryObjects;
+
+ Lock sync(*this);
+ _queryObjects.swap(queryObjects);
+ return _queryObjects;
}
diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h
index e1d3d6e90de..2ff28178032 100644
--- a/cpp/src/IceGrid/SessionManager.h
+++ b/cpp/src/IceGrid/SessionManager.h
@@ -192,8 +192,8 @@ public:
return _state != Destroyed;
}
- virtual void
- tryCreateSession(bool waitForTry = true, const IceUtil::Time& timeout = IceUtil::Time())
+ void
+ tryCreateSession()
{
{
Lock sync(*this);
@@ -212,23 +212,24 @@ public:
}
notifyAll();
}
+ }
- if(waitForTry)
+ void
+ waitTryCreateSession(const IceUtil::Time& timeout = IceUtil::Time())
+ {
+ Lock sync(*this);
+ // Wait until the action is executed and the state changes.
+ while(_nextAction == Connect || _nextAction == KeepAlive || _state == InProgress)
{
- Lock sync(*this);
- // Wait until the action is executed and the state changes.
- while(_nextAction == Connect || _nextAction == KeepAlive || _state == InProgress)
+ if(timeout == IceUtil::Time())
{
- if(timeout == IceUtil::Time())
- {
- wait();
- }
- else
+ wait();
+ }
+ else
+ {
+ if(!timedWait(timeout))
{
- if(!timedWait(timeout))
- {
- break;
- }
+ break;
}
}
}
@@ -320,16 +321,17 @@ class SessionManager : public IceUtil::Monitor<IceUtil::Mutex>
{
public:
- SessionManager(const Ice::CommunicatorPtr&);
+ SessionManager(const Ice::CommunicatorPtr&, const std::string&);
virtual ~SessionManager();
virtual bool isDestroyed() = 0;
protected:
- std::vector<IceGrid::QueryPrx> findAllQueryObjects();
+ std::vector<IceGrid::QueryPrx> findAllQueryObjects(bool);
Ice::CommunicatorPtr _communicator;
+ std::string _instanceName;
InternalRegistryPrx _master;
std::vector<IceGrid::QueryPrx> _queryObjects;
};
diff --git a/cpp/src/IceGridLib/DiscoveryPluginI.cpp b/cpp/src/IceGridLib/DiscoveryPluginI.cpp
index 02b9327da37..521bda66f22 100644
--- a/cpp/src/IceGridLib/DiscoveryPluginI.cpp
+++ b/cpp/src/IceGridLib/DiscoveryPluginI.cpp
@@ -29,34 +29,47 @@ class Request : public IceUtil::Shared
{
public:
- Request(LocatorI* locator) : _locator(locator)
+ Request(LocatorI* locator,
+ const string& operation,
+ ::Ice::OperationMode mode,
+ const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
+ const ::Ice::Context& ctx,
+ const Ice::AMD_Object_ice_invokePtr& amdCB) :
+ _locator(locator),
+ _operation(operation),
+ _mode(mode),
+ _context(ctx),
+ _inParams(inParams.first, inParams.second),
+ _amdCB(amdCB)
{
}
- virtual void invoke(const Ice::LocatorPrx&) = 0;
- virtual void response(const Ice::ObjectPrx&) = 0;
+ void invoke(const Ice::LocatorPrx&);
+ void response(const bool, const pair<const Ice::Byte*, const Ice::Byte*>&);
+ void exception(const Ice::Exception&);
protected:
LocatorI* _locator;
+ const string _operation;
+ const Ice::OperationMode _mode;
+ const Ice::Context _context;
+ const Ice::ByteSeq _inParams;
+ const Ice::AMD_Object_ice_invokePtr _amdCB;
+
Ice::LocatorPrx _locatorPrx;
};
typedef IceUtil::Handle<Request> RequestPtr;
-class LocatorI : public Ice::Locator, private IceUtil::TimerTask, private IceUtil::Monitor<IceUtil::Mutex>
+class LocatorI : public Ice::BlobjectArrayAsync, private IceUtil::TimerTask, private IceUtil::Monitor<IceUtil::Mutex>
{
public:
- LocatorI(const LookupPrx&, const Ice::PropertiesPtr&);
+ LocatorI(const LookupPrx&, const Ice::PropertiesPtr&, const string&, const LocatorPrx&);
void setLookupReply(const LookupReplyPrx&);
- virtual void findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr&, const Ice::Identity&,
- const Ice::Current&) const;
-
- virtual void findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr&, const string&,
- const Ice::Current&) const;
-
- virtual Ice::LocatorRegistryPrx getRegistry(const Ice::Current&) const;
+ virtual void ice_invoke_async(const Ice::AMD_Object_ice_invokePtr&, const pair<const Ice::Byte*, const Ice::Byte*>&,
+ const Ice::Current&);
void foundLocator(const LocatorPrx&);
void invoke(const Ice::LocatorPrx&, const RequestPtr&);
@@ -64,18 +77,20 @@ public:
private:
virtual void runTimerTask();
- void queueRequest(const RequestPtr&);
const LookupPrx _lookup;
const IceUtil::Time _timeout;
const int _retryCount;
+ const IceUtil::Time _retryDelay;
const IceUtil::TimerPtr _timer;
string _instanceName;
bool _warned;
LookupReplyPrx _lookupReply;
Ice::LocatorPrx _locator;
+ IceGrid::LocatorPrx _voidLocator;
+ IceUtil::Time _nextRetry;
int _pendingRetryCount;
vector<RequestPtr> _pendingRequests;
};
@@ -96,52 +111,46 @@ private:
const LocatorIPtr _locator;
};
-class ObjectRequest : public Request
+//
+// The void locator implementation below is used when no locator is found.
+//
+class VoidLocatorI : public IceGrid::Locator
{
public:
-
- ObjectRequest(LocatorI* locator, const Ice::Identity& id, const Ice::AMD_Locator_findObjectByIdPtr& amdCB) :
- Request(locator), _id(id), _amdCB(amdCB)
+
+ virtual void
+ findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& amdCB,
+ const Ice::Identity&,
+ const Ice::Current&) const
{
+ amdCB->ice_response(0);
}
- virtual void invoke(const Ice::LocatorPrx&);
- virtual void response(const Ice::ObjectPrx&);
-
- void
- exception(const Ice::Exception&)
+ virtual void
+ findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
+ const string&,
+ const Ice::Current&) const
{
- _locator->invoke(_locatorPrx, this); // Retry with new locator proxy
+ amdCB->ice_response(0);
}
-private:
-
- const Ice::Identity _id;
- Ice::AMD_Locator_findObjectByIdPtr _amdCB;
-};
-
-class AdapterRequest : public Request
-{
-public:
-
- AdapterRequest(LocatorI* locator, const string& adapterId, const Ice::AMD_Locator_findAdapterByIdPtr& amdCB) :
- Request(locator), _adapterId(adapterId), _amdCB(amdCB)
+ virtual Ice::LocatorRegistryPrx
+ getRegistry(const Ice::Current&) const
{
+ return 0;
}
- virtual void invoke(const Ice::LocatorPrx&);
- virtual void response(const Ice::ObjectPrx&);
-
- void
- exception(const Ice::Exception&)
+ virtual IceGrid::RegistryPrx
+ getLocalRegistry(const Ice::Current&) const
{
- _locator->invoke(_locatorPrx, this); // Retry with new locator proxy.
+ return 0;
}
-private:
-
- const string _adapterId;
- const Ice::AMD_Locator_findAdapterByIdPtr _amdCB;
+ virtual IceGrid::QueryPrx
+ getLocalQuery(const Ice::Current&) const
+ {
+ return 0;
+ }
};
}
@@ -231,8 +240,14 @@ DiscoveryPluginI::initialize()
throw Ice::PluginInitializationException(__FILE__, __LINE__, os.str());
}
- LocatorIPtr locator = new LocatorI(LookupPrx::uncheckedCast(lookupPrx), properties);
- _communicator->setDefaultLocator(Ice::LocatorPrx::uncheckedCast(_locatorAdapter->addWithUUID(locator)));
+ LocatorPrx voidLocator = LocatorPrx::uncheckedCast(_locatorAdapter->addWithUUID(new VoidLocatorI()));
+
+ string instanceName = properties->getProperty("IceGridDiscovery.InstanceName");
+ Ice::Identity id;
+ id.name = "Locator";
+ id.category = !instanceName.empty() ? instanceName : IceUtil::generateUUID();
+ LocatorIPtr locator = new LocatorI(LookupPrx::uncheckedCast(lookupPrx), properties, instanceName, voidLocator);
+ _communicator->setDefaultLocator(Ice::LocatorPrx::uncheckedCast(_locatorAdapter->add(locator, id)));
Ice::ObjectPrx lookupReply = _replyAdapter->addWithUUID(new LookupReplyI(locator))->ice_datagram();
locator->setLookupReply(LookupReplyPrx::uncheckedCast(lookupReply));
@@ -249,43 +264,38 @@ DiscoveryPluginI::destroy()
}
void
-AdapterRequest::invoke(const Ice::LocatorPrx& l)
+Request::invoke(const Ice::LocatorPrx& l)
{
_locatorPrx = l;
- l->begin_findAdapterById(_adapterId, Ice::newCallback_Locator_findAdapterById(this,
- &AdapterRequest::response,
- &AdapterRequest::exception));
-}
-
-void
-AdapterRequest::response(const Ice::ObjectPrx& prx)
-{
- _amdCB->ice_response(prx);
+ l->begin_ice_invoke(_operation, _mode, _inParams, _context,
+ Ice::newCallback_Object_ice_invoke(this, &Request::response, &Request::exception));
}
void
-ObjectRequest::invoke(const Ice::LocatorPrx& l)
+Request::response(bool ok, const pair<const Ice::Byte*, const Ice::Byte*>& outParams)
{
- _locatorPrx = l;
- l->begin_findObjectById(_id, Ice::newCallback_Locator_findObjectById(this,
- &ObjectRequest::response,
- &ObjectRequest::exception));
+ _amdCB->ice_response(ok, outParams);
}
void
-ObjectRequest::response(const Ice::ObjectPrx& prx)
+Request::exception(const Ice::Exception& ex)
{
- _amdCB->ice_response(prx);
+ _locator->invoke(_locatorPrx, this); // Retry with new locator proxy
}
-LocatorI::LocatorI(const LookupPrx& lookup, const Ice::PropertiesPtr& properties) :
+LocatorI::LocatorI(const LookupPrx& lookup,
+ const Ice::PropertiesPtr& props,
+ const string& instanceName,
+ const IceGrid::LocatorPrx& voidLocator) :
_lookup(lookup),
- _timeout(IceUtil::Time::milliSeconds(properties->getPropertyAsIntWithDefault("IceGridDiscovery.Timeout", 300))),
- _retryCount(properties->getPropertyAsIntWithDefault("IceGridDiscovery.RetryCount", 3)),
+ _timeout(IceUtil::Time::milliSeconds(props->getPropertyAsIntWithDefault("IceGridDiscovery.Timeout", 300))),
+ _retryCount(props->getPropertyAsIntWithDefault("IceGridDiscovery.RetryCount", 3)),
+ _retryDelay(IceUtil::Time::milliSeconds(props->getPropertyAsIntWithDefault("IceGridDiscovery.RetryDelay", 2000))),
_timer(IceInternal::getInstanceTimer(lookup->ice_getCommunicator())),
- _instanceName(properties->getProperty("IceGridDiscovery.InstanceName")),
+ _instanceName(instanceName),
_warned(false),
_locator(lookup->ice_getCommunicator()->getDefaultLocator()),
+ _voidLocator(voidLocator),
_pendingRetryCount(0)
{
}
@@ -297,45 +307,18 @@ LocatorI::setLookupReply(const LookupReplyPrx& lookupReply)
}
void
-LocatorI::findObjectById_async(const Ice::AMD_Locator_findObjectByIdPtr& amdCB,
- const Ice::Identity& id,
- const Ice::Current&) const
+LocatorI::ice_invoke_async(const Ice::AMD_Object_ice_invokePtr& amdCB,
+ const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
+ const Ice::Current& current)
{
- const_cast<LocatorI*>(this)->invoke(0, new ObjectRequest(const_cast<LocatorI*>(this), id, amdCB));
-}
-
-void
-LocatorI::findAdapterById_async(const Ice::AMD_Locator_findAdapterByIdPtr& amdCB,
- const string& adapterId,
- const Ice::Current&) const
-{
- const_cast<LocatorI*>(this)->invoke(0, new AdapterRequest(const_cast<LocatorI*>(this), adapterId, amdCB));
-}
-
-Ice::LocatorRegistryPrx
-LocatorI::getRegistry(const Ice::Current&) const
-{
- Ice::LocatorPrx locator;
- {
- Lock sync(*this);
- if(!_locator)
- {
- const_cast<LocatorI*>(this)->queueRequest(0); // Search for locator if not already doing so.
- while(_pendingRetryCount > 0)
- {
- wait();
- }
- }
- locator = _locator;
- }
- return locator ? locator->getRegistry() : Ice::LocatorRegistryPrx();
+ invoke(0, new Request(this, current.operation, current.mode, inParams, current.ctx, amdCB));
}
void
LocatorI::foundLocator(const LocatorPrx& locator)
{
Lock sync(*this);
- if(!locator)
+ if(!locator || (!_instanceName.empty() && locator->ice_getIdentity().category != _instanceName))
{
return;
}
@@ -401,7 +384,7 @@ LocatorI::foundLocator(const LocatorPrx& locator)
_locator = locator;
if(_instanceName.empty())
{
- _instanceName = _locator->ice_getIdentity().category;
+ _instanceName = _locator->ice_getIdentity().category; // Stick to the first discovered locator.
}
}
@@ -413,7 +396,6 @@ LocatorI::foundLocator(const LocatorPrx& locator)
(*p)->invoke(_locator);
}
_pendingRequests.clear();
- notifyAll();
}
void
@@ -424,10 +406,22 @@ LocatorI::invoke(const Ice::LocatorPrx& locator, const RequestPtr& request)
{
request->invoke(_locator);
}
+ else if(IceUtil::Time::now() < _nextRetry)
+ {
+ request->invoke(_voidLocator); // Don't retry to find a locator before the retry delay expires
+ }
else
{
_locator = 0;
- queueRequest(request);
+
+ _pendingRequests.push_back(request);
+
+ if(_pendingRetryCount == 0) // No request in progress
+ {
+ _pendingRetryCount = _retryCount;
+ _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
+ _timer->schedule(this, _timeout);
+ }
}
}
@@ -445,26 +439,10 @@ LocatorI::runTimerTask()
assert(!_pendingRequests.empty());
for(vector<RequestPtr>::const_iterator p = _pendingRequests.begin(); p != _pendingRequests.end(); ++p)
{
- (*p)->response(0);
+ (*p)->invoke(_voidLocator); // Send pending requests on void locator.
}
_pendingRequests.clear();
- notifyAll();
- }
-}
-
-void
-LocatorI::queueRequest(const RequestPtr& request)
-{
- if(request)
- {
- _pendingRequests.push_back(request);
- }
-
- if(_pendingRetryCount == 0) // No request in progress
- {
- _pendingRetryCount = _retryCount;
- _lookup->begin_findLocator(_instanceName, _lookupReply); // Send multicast request.
- _timer->schedule(this, _timeout);
+ _nextRetry = IceUtil::Time::now() + _retryDelay; // Only retry when the retry delay expires
}
}