summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/NodeCache.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-11-23 14:44:51 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-11-23 14:44:51 +0000
commitad476fdda5c9a9f23b9e65fc3c4b3016f7704848 (patch)
tree329b46efefd33095fe3ed8dc92ef3d8726cd0857 /cpp/src/IceGrid/NodeCache.cpp
parentRemove deprecated methods (diff)
downloadice-ad476fdda5c9a9f23b9e65fc3c4b3016f7704848.tar.bz2
ice-ad476fdda5c9a9f23b9e65fc3c4b3016f7704848.tar.xz
ice-ad476fdda5c9a9f23b9e65fc3c4b3016f7704848.zip
Code cleanup and fixed startup scalability issue of the registry (it no
longer contacts all the nodes).
Diffstat (limited to 'cpp/src/IceGrid/NodeCache.cpp')
-rw-r--r--cpp/src/IceGrid/NodeCache.cpp238
1 files changed, 178 insertions, 60 deletions
diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp
index 21619cc5be7..a43e3335798 100644
--- a/cpp/src/IceGrid/NodeCache.cpp
+++ b/cpp/src/IceGrid/NodeCache.cpp
@@ -66,17 +66,6 @@ public:
{
ex.ice_throw();
}
- catch(const NodeNotExistException& ex)
- {
- if(_traceLevels && _traceLevels->server > 1)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
- out << "couldn't load `" << _id << "' on node `" << _node << "':\n" << ex;
- }
- ostringstream os;
- os << ex;
- _server->exception(NodeUnreachableException(_node, os.str()));
- }
catch(const DeploymentException& ex)
{
if(_traceLevels && _traceLevels->server > 1)
@@ -139,17 +128,6 @@ public:
{
ex.ice_throw();
}
- catch(const NodeNotExistException& ex)
- {
- if(_traceLevels && _traceLevels->server > 1)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat);
- out << "couldn't unload `" << _id << "' on node `" << _node << "':\n" << ex;
- }
- ostringstream os;
- os << ex;
- _server->exception(NodeUnreachableException(_node, os.str()));
- }
catch(const DeploymentException& ex)
{
if(_traceLevels && _traceLevels->server > 1)
@@ -183,6 +161,30 @@ private:
const string _node;
};
+class RegisterCB : public AMI_Node_registerWithReplica
+{
+public:
+
+ RegisterCB(const NodeEntryPtr& node) : _node(node)
+ {
+ }
+
+ void
+ ice_response()
+ {
+ _node->finishedRegistration();
+ }
+
+ void
+ ice_exception(const Ice::Exception& ex)
+ {
+ _node->finishedRegistration(ex);
+ }
+
+private:
+ const NodeEntryPtr _node;
+};
+
};
NodeCache::NodeCache(const Ice::CommunicatorPtr& communicator, ReplicaCache& replicaCache, bool master) :
@@ -215,7 +217,8 @@ NodeCache::get(const string& name, bool create) const
NodeEntry::NodeEntry(NodeCache& cache, const std::string& name) :
_cache(cache),
_ref(0),
- _name(name)
+ _name(name),
+ _registering(false)
{
}
@@ -256,19 +259,21 @@ NodeEntry::setSession(const NodeSessionIPtr& session)
{
Lock sync(*this);
- if(session)
+ if(session && _session)
{
- // If the current session has just been destroyed, wait for the setSession(0) call.
- assert(session != _session);
- while(_session && _session->isDestroyed())
+ if(_session->isDestroyed())
{
- wait();
+ // If the current session has just been destroyed, wait for the setSession(0) call.
+ assert(session != _session);
+ while(_session)
+ {
+ wait();
+ }
+ }
+ else
+ {
+ throw NodeActiveException();
}
- }
-
- if(session && _session)
- {
- throw NodeActiveException();
}
else if(!session && !_session)
{
@@ -287,7 +292,20 @@ NodeEntry::setSession(const NodeSessionIPtr& session)
{
_cache.getReplicaCache().nodeAdded(session->getNode());
}
-
+
+ //
+ // Clear the saved proxy, the node has established a session
+ // so we won't need anymore to try to register it with this
+ // registry.
+ //
+ _proxy = 0;
+
+ if(_registering)
+ {
+ _registering = false;
+ notifyAll();
+ }
+
if(session)
{
if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0)
@@ -310,10 +328,7 @@ NodePrx
NodeEntry::getProxy() const
{
Lock sync(*this);
- if(!_session)
- {
- throw NodeUnreachableException(_name, "the node is not active");
- }
+ checkSession();
return _session->getNode();
}
@@ -321,10 +336,7 @@ NodeInfo
NodeEntry::getInfo() const
{
Lock sync(*this);
- if(!_session)
- {
- throw NodeUnreachableException(_name, "the node is not active");
- }
+ checkSession();
return _session->getInfo();
}
@@ -344,19 +356,23 @@ LoadInfo
NodeEntry::getLoadInfoAndLoadFactor(const string& application, float& loadFactor) const
{
Lock sync(*this);
- if(!_session)
+ checkSession();
+
+ map<string, NodeDescriptor>::const_iterator p = _descriptors.find(application);
+ if(p == _descriptors.end())
{
- throw NodeUnreachableException(_name, "the node is not active");
+ throw NodeNotExistException(); // The node doesn't exist in the given application.
}
- map<string, NodeDescriptor>::const_iterator p = _descriptors.find(application);
+
+ //
+ // TODO: Cache the load factors? Parsing the load factor for each
+ // call could be costly.
+ //
loadFactor = -1.0f;
- if(p != _descriptors.end())
+ if(!p->second.loadFactor.empty())
{
- if(!p->second.loadFactor.empty())
- {
- istringstream is(p->second.loadFactor);
- is >> loadFactor;
- }
+ istringstream is(p->second.loadFactor);
+ is >> loadFactor;
}
if(loadFactor < 0.0f)
{
@@ -380,6 +396,7 @@ NodeEntry::getLoadInfoAndLoadFactor(const string& application, float& loadFactor
loadFactor = 1.0f;
}
}
+
return _session->getLoadInfo();
}
@@ -400,10 +417,7 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con
ServerDescriptorPtr desc;
{
Lock sync(*this);
- if(!_session)
- {
- throw NodeUnreachableException(_name, "the node is not active");
- }
+ checkSession();
node = _session->getNode();
timeout = _session->getTimeout();
try
@@ -472,10 +486,8 @@ ServerInfo
NodeEntry::getServerInfo(const ServerInfo& server, const SessionIPtr& session)
{
Lock sync(*this);
- if(!_session)
- {
- throw NodeUnreachableException(_name, "the node is not active");
- }
+ checkSession();
+
ServerInfo info = server;
info.descriptor = getServerDescriptor(server, session);
assert(info.descriptor);
@@ -530,7 +542,8 @@ NodeEntry::__decRef()
bool doRemove = false;
bool doDelete = false;
{
- Lock sync(*this);
+ Lock sync(*this); // We use a recursive mutex so it's fine to
+ // create Ptr with the mutex locked.
assert(_ref > 0);
--_ref;
@@ -553,3 +566,108 @@ NodeEntry::__decRef()
delete this;
}
}
+
+void
+NodeEntry::checkSession() const
+{
+ if(_session && !_session->isDestroyed())
+ {
+ return;
+ }
+ else if(!_proxy && !_registering)
+ {
+ throw NodeUnreachableException(_name, "the node is not active");
+ }
+ else if(_proxy)
+ {
+ //
+ // If the node proxy is set, we attempt to get the node to
+ // register with this registry.
+ //
+ assert(!_registering);
+
+ if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0)
+ {
+ Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat);
+ out << "creating node `" << _name << "' session";
+ }
+
+ NodeEntry* self = const_cast<NodeEntry*>(this);
+ //
+ // NOTE: setting _registering to true must be done before the
+ // call otherwise if the callback is call immediately we'll
+ // hang in the while loop.
+ //
+ _registering = true;
+ _proxy->registerWithReplica_async(new RegisterCB(self), _cache.getReplicaCache().getInternalRegistry());
+ _proxy = 0; // Registration with the proxy is only attempted once.
+ }
+
+ while(_registering)
+ {
+ wait();
+ }
+
+ if(!_session)
+ {
+ throw NodeUnreachableException(_name, "the node is not active");
+ }
+}
+
+void
+NodeEntry::setProxy(const NodePrx& node)
+{
+ Lock sync(*this);
+
+ //
+ // If the node has already established a session with the
+ // registry, no need to remember its proxy, we don't need to get
+ // it to register with this registry since it's already
+ // registered.
+ //
+ if(!_session)
+ {
+ _proxy = node;
+ }
+}
+
+void
+NodeEntry::finishedRegistration()
+{
+ Lock sync(*this);
+ if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0)
+ {
+ Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat);
+ if(_session)
+ {
+ out << "node `" << _name << "' session created";
+ }
+ else
+ {
+ out << "node `" << _name << "' session creation failed";
+ }
+ }
+
+ if(_registering)
+ {
+ _registering = false;
+ notifyAll();
+ }
+}
+
+void
+NodeEntry::finishedRegistration(const Ice::Exception& ex)
+{
+ Lock sync(*this);
+ if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0)
+ {
+ Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat);
+ out << "node `" << _name << "' session creation failed:\n" << ex;
+ }
+
+ if(_registering)
+ {
+ _registering = false;
+ notifyAll();
+ }
+}