summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp163
1 files changed, 71 insertions, 92 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index dbd3c058a65..22fd031b6f9 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -13,6 +13,7 @@
#include <IceGrid/TraceLevels.h>
#include <IceGrid/Util.h>
#include <IceGrid/DescriptorHelper.h>
+#include <IceGrid/NodeSessionI.h>
#include <algorithm>
#include <functional>
@@ -22,7 +23,6 @@ using namespace std;
using namespace IceGrid;
const string Database::_descriptorDbName = "applications";
-const string Database::_nodeDbName = "nodes";
const string Database::_adapterDbName = "adapters";
const string Database::_objectDbName = "objects";
@@ -186,14 +186,15 @@ private:
Database::Database(const Ice::CommunicatorPtr& communicator,
const Ice::ObjectAdapterPtr& adapter,
const string& envName,
+ int nodeSessionTimeout,
const TraceLevelsPtr& traceLevels) :
_communicator(communicator),
_internalAdapter(adapter),
_envName(envName),
+ _nodeSessionTimeout(nodeSessionTimeout),
_traceLevels(traceLevels),
_connection(Freeze::createConnection(communicator, envName)),
_descriptors(_connection, _descriptorDbName),
- _nodes(_connection, _nodeDbName),
_objects(_connection, _objectDbName),
_adapters(_connection, _adapterDbName)
{
@@ -466,130 +467,90 @@ Database::getAllApplications(const string& expression)
}
void
-Database::addNode(const string& name, const NodePrx& node)
+Database::addNode(const string& name, const NodeSessionIPtr& node)
{
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringApplicationDescriptorDict descriptors(connection, _descriptorDbName);
-
- while(true)
+ ServerEntrySeq entries;
{
- NodePrx oldNode;
- try
+ Lock sync(*this);
+
+ if(_nodes.find(name) != _nodes.end())
{
- oldNode = getNode(name);
- oldNode->ice_ping();
throw NodeActiveException();
}
- catch(const NodeNotExistException&)
- {
- }
- catch(const Ice::LocalException&)
+
+ if(_traceLevels->node > 0)
{
+ Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat);
+ out << "added node `" << name << "'";
}
- IceUtil::Mutex::Lock sync(*this);
-
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringObjectProxyDict dict(connection, _nodeDbName);
+ _nodes.insert(make_pair(name, node));
- StringObjectProxyDict::iterator p = dict.find(name);
- if(p != dict.end())
+ //
+ // Get all the node servers and see if they need to be synced.
+ //
+ map<string, set<string> >::const_iterator p = _serversByNode.find(name);
+ if(p == _serversByNode.end())
{
- if(oldNode && oldNode != p->second)
- {
- continue;
- }
-
- p.set(node);
-
- if(_traceLevels->node > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat);
- out << "updated node `" << name << "' proxy";
- }
+ return;
}
- else
+ for(set<string>::const_iterator q = p->second.begin() ; q != p->second.end(); ++q)
{
- dict.put(make_pair(name, node));
-
- if(_traceLevels->node > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat);
- out << "added node `" << name << "'";
- }
+ ServerEntryPtr entry = _servers[*q];
+ assert(entry);
+ if(entry->needsSync())
+ {
+ entries.push_back(entry);
+ }
}
-
- setAdapterDirectProxy("IceGrid.Node." + name, node);
- break;
}
+
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync));
}
NodePrx
Database::getNode(const string& name) const
{
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringObjectProxyDict dict(connection, _nodeDbName);
+ Lock sync(*this);
- StringObjectProxyDict::iterator p = dict.find(name);
- if(p != dict.end())
+ map<string, NodeSessionIPtr>::const_iterator p = _nodes.find(name);
+ if(p == _nodes.end())
{
- try
- {
- return NodePrx::checkedCast(p->second);
- }
- catch(const Ice::ObjectNotExistException&)
+ if(_serversByNode.find(name) == _serversByNode.end())
{
+ throw NodeNotExistException();
}
- catch(const Ice::LocalException&)
+ else
{
- return NodePrx::uncheckedCast(p->second);
+ throw NodeUnreachableException();
}
}
- throw NodeNotExistException();
+ return p->second->getNode();
}
void
Database::removeNode(const string& name)
{
- //
- // TODO: Remove the node servers
- //
-
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringObjectProxyDict dict(connection, _nodeDbName);
-
- StringObjectProxyDict::iterator p = dict.find(name);
- if(p == dict.end())
- {
- throw NodeNotExistException();
- }
-
- dict.erase(p);
-
- setAdapterDirectProxy("IceGrid.Node." + name, 0);
-
- if(_traceLevels->node > 0)
+ Lock sync(*this);
+ if(_nodes.erase(name) > 0)
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat);
- out << "removed node `" << name << "'";
+ if(_traceLevels->node > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat);
+ out << "removed node `" << name << "'";
+ }
}
}
Ice::StringSeq
Database::getAllNodes(const string& expression)
{
+ Lock sync(*this);
set<string> nodes;
- {
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringObjectProxyDict dict(connection, _nodeDbName);
- Ice::StringSeq r = getMatchingKeys<StringObjectProxyDict>(dict, expression);
- nodes.insert(r.begin(), r.end());
- }
- {
- Lock sync(*this);
- Ice::StringSeq r = getMatchingKeys<map<string, set<string> > >(_serversByNode, expression);
- nodes.insert(r.begin(), r.end());
- }
+ Ice::StringSeq r = getMatchingKeys<map<string, set<string> > >(_serversByNode, expression);
+ nodes.insert(r.begin(), r.end());
+ r = getMatchingKeys<map<string, NodeSessionIPtr> >(_nodes, expression);
+ nodes.insert(r.begin(), r.end());
return Ice::StringSeq(nodes.begin(), nodes.end());
}
@@ -1146,6 +1107,13 @@ Database::ServerEntry::sync()
}
}
+bool
+Database::ServerEntry::needsSync() const
+{
+ Lock sync(*this);
+ return _failed;
+}
+
void
Database::ServerEntry::update(const ServerDescriptorPtr& descriptor)
{
@@ -1256,6 +1224,7 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters)
}
_synchronizing = true;
+ _failed = false;
load = _load;
destroy = _destroy;
}
@@ -1267,7 +1236,7 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters)
{
try
{
- _database.getNode(destroy->node)->destroyServer(destroy);
+ _database.getNode(destroy->node)->destroyServer(destroy->name);
}
catch(const NodeNotExistException& ex)
{
@@ -1276,7 +1245,7 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters)
throw NodeUnreachableException();
}
}
- catch(Ice::LocalException& ex)
+ catch(const Ice::LocalException& ex)
{
if(!load)
{
@@ -1295,7 +1264,11 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters)
{
throw NodeUnreachableException();
}
- catch(Ice::LocalException& ex)
+ catch(const DeploymentException& ex)
+ {
+ // TODO: Warning
+ }
+ catch(const Ice::LocalException& ex)
{
throw NodeUnreachableException();
}
@@ -1307,6 +1280,7 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters)
Lock sync(*this);
_synchronizing = false;
_destroy = 0;
+ _failed = true;
notifyAll();
}
if(!load && destroy)
@@ -1322,8 +1296,13 @@ Database::ServerEntry::sync(map<string, AdapterPrx>& adapters)
_loaded = _load;
_load = 0;
_destroy = 0;
- _proxy = proxy;
- _adapters = adapters;
+ _proxy = proxy ? ServerPrx::uncheckedCast(proxy->ice_timeout(_database._nodeSessionTimeout)) : ServerPrx();
+ _adapters.clear();
+ for(StringAdapterPrxDict::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
+ {
+ AdapterPrx adapter = AdapterPrx::uncheckedCast(p->second->ice_timeout(_database._nodeSessionTimeout));
+ _adapters.insert(make_pair(p->first, adapter));
+ }
notifyAll();
}
if(!load && destroy)