diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-11-23 14:44:51 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-11-23 14:44:51 +0000 |
commit | ad476fdda5c9a9f23b9e65fc3c4b3016f7704848 (patch) | |
tree | 329b46efefd33095fe3ed8dc92ef3d8726cd0857 /cpp/src/IceGrid/NodeCache.cpp | |
parent | Remove deprecated methods (diff) | |
download | ice-ad476fdda5c9a9f23b9e65fc3c4b3016f7704848.tar.bz2 ice-ad476fdda5c9a9f23b9e65fc3c4b3016f7704848.tar.xz ice-ad476fdda5c9a9f23b9e65fc3c4b3016f7704848.zip |
Code cleanup and fixed startup scalability issue of the registry (it no
longer contacts all the nodes).
Diffstat (limited to 'cpp/src/IceGrid/NodeCache.cpp')
-rw-r--r-- | cpp/src/IceGrid/NodeCache.cpp | 238 |
1 files changed, 178 insertions, 60 deletions
diff --git a/cpp/src/IceGrid/NodeCache.cpp b/cpp/src/IceGrid/NodeCache.cpp index 21619cc5be7..a43e3335798 100644 --- a/cpp/src/IceGrid/NodeCache.cpp +++ b/cpp/src/IceGrid/NodeCache.cpp @@ -66,17 +66,6 @@ public: { ex.ice_throw(); } - catch(const NodeNotExistException& ex) - { - if(_traceLevels && _traceLevels->server > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); - out << "couldn't load `" << _id << "' on node `" << _node << "':\n" << ex; - } - ostringstream os; - os << ex; - _server->exception(NodeUnreachableException(_node, os.str())); - } catch(const DeploymentException& ex) { if(_traceLevels && _traceLevels->server > 1) @@ -139,17 +128,6 @@ public: { ex.ice_throw(); } - catch(const NodeNotExistException& ex) - { - if(_traceLevels && _traceLevels->server > 1) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->serverCat); - out << "couldn't unload `" << _id << "' on node `" << _node << "':\n" << ex; - } - ostringstream os; - os << ex; - _server->exception(NodeUnreachableException(_node, os.str())); - } catch(const DeploymentException& ex) { if(_traceLevels && _traceLevels->server > 1) @@ -183,6 +161,30 @@ private: const string _node; }; +class RegisterCB : public AMI_Node_registerWithReplica +{ +public: + + RegisterCB(const NodeEntryPtr& node) : _node(node) + { + } + + void + ice_response() + { + _node->finishedRegistration(); + } + + void + ice_exception(const Ice::Exception& ex) + { + _node->finishedRegistration(ex); + } + +private: + const NodeEntryPtr _node; +}; + }; NodeCache::NodeCache(const Ice::CommunicatorPtr& communicator, ReplicaCache& replicaCache, bool master) : @@ -215,7 +217,8 @@ NodeCache::get(const string& name, bool create) const NodeEntry::NodeEntry(NodeCache& cache, const std::string& name) : _cache(cache), _ref(0), - _name(name) + _name(name), + _registering(false) { } @@ -256,19 +259,21 @@ NodeEntry::setSession(const NodeSessionIPtr& session) { Lock sync(*this); - if(session) + if(session && _session) { - // If the current session has just been destroyed, wait for the setSession(0) call. - assert(session != _session); - while(_session && _session->isDestroyed()) + if(_session->isDestroyed()) { - wait(); + // If the current session has just been destroyed, wait for the setSession(0) call. + assert(session != _session); + while(_session) + { + wait(); + } + } + else + { + throw NodeActiveException(); } - } - - if(session && _session) - { - throw NodeActiveException(); } else if(!session && !_session) { @@ -287,7 +292,20 @@ NodeEntry::setSession(const NodeSessionIPtr& session) { _cache.getReplicaCache().nodeAdded(session->getNode()); } - + + // + // Clear the saved proxy, the node has established a session + // so we won't need anymore to try to register it with this + // registry. + // + _proxy = 0; + + if(_registering) + { + _registering = false; + notifyAll(); + } + if(session) { if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0) @@ -310,10 +328,7 @@ NodePrx NodeEntry::getProxy() const { Lock sync(*this); - if(!_session) - { - throw NodeUnreachableException(_name, "the node is not active"); - } + checkSession(); return _session->getNode(); } @@ -321,10 +336,7 @@ NodeInfo NodeEntry::getInfo() const { Lock sync(*this); - if(!_session) - { - throw NodeUnreachableException(_name, "the node is not active"); - } + checkSession(); return _session->getInfo(); } @@ -344,19 +356,23 @@ LoadInfo NodeEntry::getLoadInfoAndLoadFactor(const string& application, float& loadFactor) const { Lock sync(*this); - if(!_session) + checkSession(); + + map<string, NodeDescriptor>::const_iterator p = _descriptors.find(application); + if(p == _descriptors.end()) { - throw NodeUnreachableException(_name, "the node is not active"); + throw NodeNotExistException(); // The node doesn't exist in the given application. } - map<string, NodeDescriptor>::const_iterator p = _descriptors.find(application); + + // + // TODO: Cache the load factors? Parsing the load factor for each + // call could be costly. + // loadFactor = -1.0f; - if(p != _descriptors.end()) + if(!p->second.loadFactor.empty()) { - if(!p->second.loadFactor.empty()) - { - istringstream is(p->second.loadFactor); - is >> loadFactor; - } + istringstream is(p->second.loadFactor); + is >> loadFactor; } if(loadFactor < 0.0f) { @@ -380,6 +396,7 @@ NodeEntry::getLoadInfoAndLoadFactor(const string& application, float& loadFactor loadFactor = 1.0f; } } + return _session->getLoadInfo(); } @@ -400,10 +417,7 @@ NodeEntry::loadServer(const ServerEntryPtr& entry, const ServerInfo& server, con ServerDescriptorPtr desc; { Lock sync(*this); - if(!_session) - { - throw NodeUnreachableException(_name, "the node is not active"); - } + checkSession(); node = _session->getNode(); timeout = _session->getTimeout(); try @@ -472,10 +486,8 @@ ServerInfo NodeEntry::getServerInfo(const ServerInfo& server, const SessionIPtr& session) { Lock sync(*this); - if(!_session) - { - throw NodeUnreachableException(_name, "the node is not active"); - } + checkSession(); + ServerInfo info = server; info.descriptor = getServerDescriptor(server, session); assert(info.descriptor); @@ -530,7 +542,8 @@ NodeEntry::__decRef() bool doRemove = false; bool doDelete = false; { - Lock sync(*this); + Lock sync(*this); // We use a recursive mutex so it's fine to + // create Ptr with the mutex locked. assert(_ref > 0); --_ref; @@ -553,3 +566,108 @@ NodeEntry::__decRef() delete this; } } + +void +NodeEntry::checkSession() const +{ + if(_session && !_session->isDestroyed()) + { + return; + } + else if(!_proxy && !_registering) + { + throw NodeUnreachableException(_name, "the node is not active"); + } + else if(_proxy) + { + // + // If the node proxy is set, we attempt to get the node to + // register with this registry. + // + assert(!_registering); + + if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0) + { + Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat); + out << "creating node `" << _name << "' session"; + } + + NodeEntry* self = const_cast<NodeEntry*>(this); + // + // NOTE: setting _registering to true must be done before the + // call otherwise if the callback is call immediately we'll + // hang in the while loop. + // + _registering = true; + _proxy->registerWithReplica_async(new RegisterCB(self), _cache.getReplicaCache().getInternalRegistry()); + _proxy = 0; // Registration with the proxy is only attempted once. + } + + while(_registering) + { + wait(); + } + + if(!_session) + { + throw NodeUnreachableException(_name, "the node is not active"); + } +} + +void +NodeEntry::setProxy(const NodePrx& node) +{ + Lock sync(*this); + + // + // If the node has already established a session with the + // registry, no need to remember its proxy, we don't need to get + // it to register with this registry since it's already + // registered. + // + if(!_session) + { + _proxy = node; + } +} + +void +NodeEntry::finishedRegistration() +{ + Lock sync(*this); + if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0) + { + Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat); + if(_session) + { + out << "node `" << _name << "' session created"; + } + else + { + out << "node `" << _name << "' session creation failed"; + } + } + + if(_registering) + { + _registering = false; + notifyAll(); + } +} + +void +NodeEntry::finishedRegistration(const Ice::Exception& ex) +{ + Lock sync(*this); + if(_cache.getTraceLevels() && _cache.getTraceLevels()->node > 0) + { + Ice::Trace out(_cache.getTraceLevels()->logger, _cache.getTraceLevels()->nodeCat); + out << "node `" << _name << "' session creation failed:\n" << ex; + } + + if(_registering) + { + _registering = false; + notifyAll(); + } +} |