diff options
author | Benoit Foucher <benoit@zeroc.com> | 2005-06-08 14:29:54 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2005-06-08 14:29:54 +0000 |
commit | bd4c28900c1e5306bea0a4cb2448f83d29dfa5e9 (patch) | |
tree | cabdd6919e7747b08ee6f076fea6cbf549c8c069 /cpp/src/IceGrid/Database.cpp | |
parent | Fixed but reported in http://www.zeroc.com/vbulletin/showthread.php?t=1480 (diff) | |
download | ice-bd4c28900c1e5306bea0a4cb2448f83d29dfa5e9.tar.bz2 ice-bd4c28900c1e5306bea0a4cb2448f83d29dfa5e9.tar.xz ice-bd4c28900c1e5306bea0a4cb2448f83d29dfa5e9.zip |
Added node session support.
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 163 |
1 files changed, 71 insertions, 92 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index dbd3c058a65..22fd031b6f9 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -13,6 +13,7 @@ #include <IceGrid/TraceLevels.h> #include <IceGrid/Util.h> #include <IceGrid/DescriptorHelper.h> +#include <IceGrid/NodeSessionI.h> #include <algorithm> #include <functional> @@ -22,7 +23,6 @@ using namespace std; using namespace IceGrid; const string Database::_descriptorDbName = "applications"; -const string Database::_nodeDbName = "nodes"; const string Database::_adapterDbName = "adapters"; const string Database::_objectDbName = "objects"; @@ -186,14 +186,15 @@ private: Database::Database(const Ice::CommunicatorPtr& communicator, const Ice::ObjectAdapterPtr& adapter, const string& envName, + int nodeSessionTimeout, const TraceLevelsPtr& traceLevels) : _communicator(communicator), _internalAdapter(adapter), _envName(envName), + _nodeSessionTimeout(nodeSessionTimeout), _traceLevels(traceLevels), _connection(Freeze::createConnection(communicator, envName)), _descriptors(_connection, _descriptorDbName), - _nodes(_connection, _nodeDbName), _objects(_connection, _objectDbName), _adapters(_connection, _adapterDbName) { @@ -466,130 +467,90 @@ Database::getAllApplications(const string& expression) } void -Database::addNode(const string& name, const NodePrx& node) +Database::addNode(const string& name, const NodeSessionIPtr& node) { - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringApplicationDescriptorDict descriptors(connection, _descriptorDbName); - - while(true) + ServerEntrySeq entries; { - NodePrx oldNode; - try + Lock sync(*this); + + if(_nodes.find(name) != _nodes.end()) { - oldNode = getNode(name); - oldNode->ice_ping(); throw NodeActiveException(); } - catch(const NodeNotExistException&) - { - } - catch(const Ice::LocalException&) + + if(_traceLevels->node > 0) { + Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); + out << "added node `" << name << "'"; } - IceUtil::Mutex::Lock sync(*this); - - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxyDict dict(connection, _nodeDbName); + _nodes.insert(make_pair(name, node)); - StringObjectProxyDict::iterator p = dict.find(name); - if(p != dict.end()) + // + // Get all the node servers and see if they need to be synced. + // + map<string, set<string> >::const_iterator p = _serversByNode.find(name); + if(p == _serversByNode.end()) { - if(oldNode && oldNode != p->second) - { - continue; - } - - p.set(node); - - if(_traceLevels->node > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); - out << "updated node `" << name << "' proxy"; - } + return; } - else + for(set<string>::const_iterator q = p->second.begin() ; q != p->second.end(); ++q) { - dict.put(make_pair(name, node)); - - if(_traceLevels->node > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); - out << "added node `" << name << "'"; - } + ServerEntryPtr entry = _servers[*q]; + assert(entry); + if(entry->needsSync()) + { + entries.push_back(entry); + } } - - setAdapterDirectProxy("IceGrid.Node." + name, node); - break; } + + for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync)); } NodePrx Database::getNode(const string& name) const { - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxyDict dict(connection, _nodeDbName); + Lock sync(*this); - StringObjectProxyDict::iterator p = dict.find(name); - if(p != dict.end()) + map<string, NodeSessionIPtr>::const_iterator p = _nodes.find(name); + if(p == _nodes.end()) { - try - { - return NodePrx::checkedCast(p->second); - } - catch(const Ice::ObjectNotExistException&) + if(_serversByNode.find(name) == _serversByNode.end()) { + throw NodeNotExistException(); } - catch(const Ice::LocalException&) + else { - return NodePrx::uncheckedCast(p->second); + throw NodeUnreachableException(); } } - throw NodeNotExistException(); + return p->second->getNode(); } void Database::removeNode(const string& name) { - // - // TODO: Remove the node servers - // - - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxyDict dict(connection, _nodeDbName); - - StringObjectProxyDict::iterator p = dict.find(name); - if(p == dict.end()) - { - throw NodeNotExistException(); - } - - dict.erase(p); - - setAdapterDirectProxy("IceGrid.Node." + name, 0); - - if(_traceLevels->node > 0) + Lock sync(*this); + if(_nodes.erase(name) > 0) { - Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); - out << "removed node `" << name << "'"; + if(_traceLevels->node > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat); + out << "removed node `" << name << "'"; + } } } Ice::StringSeq Database::getAllNodes(const string& expression) { + Lock sync(*this); set<string> nodes; - { - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringObjectProxyDict dict(connection, _nodeDbName); - Ice::StringSeq r = getMatchingKeys<StringObjectProxyDict>(dict, expression); - nodes.insert(r.begin(), r.end()); - } - { - Lock sync(*this); - Ice::StringSeq r = getMatchingKeys<map<string, set<string> > >(_serversByNode, expression); - nodes.insert(r.begin(), r.end()); - } + Ice::StringSeq r = getMatchingKeys<map<string, set<string> > >(_serversByNode, expression); + nodes.insert(r.begin(), r.end()); + r = getMatchingKeys<map<string, NodeSessionIPtr> >(_nodes, expression); + nodes.insert(r.begin(), r.end()); return Ice::StringSeq(nodes.begin(), nodes.end()); } @@ -1146,6 +1107,13 @@ Database::ServerEntry::sync() } } +bool +Database::ServerEntry::needsSync() const +{ + Lock sync(*this); + return _failed; +} + void Database::ServerEntry::update(const ServerDescriptorPtr& descriptor) { @@ -1256,6 +1224,7 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters) } _synchronizing = true; + _failed = false; load = _load; destroy = _destroy; } @@ -1267,7 +1236,7 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters) { try { - _database.getNode(destroy->node)->destroyServer(destroy); + _database.getNode(destroy->node)->destroyServer(destroy->name); } catch(const NodeNotExistException& ex) { @@ -1276,7 +1245,7 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters) throw NodeUnreachableException(); } } - catch(Ice::LocalException& ex) + catch(const Ice::LocalException& ex) { if(!load) { @@ -1295,7 +1264,11 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters) { throw NodeUnreachableException(); } - catch(Ice::LocalException& ex) + catch(const DeploymentException& ex) + { + // TODO: Warning + } + catch(const Ice::LocalException& ex) { throw NodeUnreachableException(); } @@ -1307,6 +1280,7 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters) Lock sync(*this); _synchronizing = false; _destroy = 0; + _failed = true; notifyAll(); } if(!load && destroy) @@ -1322,8 +1296,13 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters) _loaded = _load; _load = 0; _destroy = 0; - _proxy = proxy; - _adapters = adapters; + _proxy = proxy ? ServerPrx::uncheckedCast(proxy->ice_timeout(_database._nodeSessionTimeout)) : ServerPrx(); + _adapters.clear(); + for(StringAdapterPrxDict::const_iterator p = adapters.begin(); p != adapters.end(); ++p) + { + AdapterPrx adapter = AdapterPrx::uncheckedCast(p->second->ice_timeout(_database._nodeSessionTimeout)); + _adapters.insert(make_pair(p->first, adapter)); + } notifyAll(); } if(!load && destroy) |