summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2005-07-08 13:47:30 +0000
committerBenoit Foucher <benoit@zeroc.com>2005-07-08 13:47:30 +0000
commit99894938dbde9a0bb10fc1998d9863cae52b8977 (patch)
tree5b68e52e9a62792f11d8a4e2c844d394cd39f496 /cpp/src/IceGrid/Database.cpp
parentFixed Ice interoperability issue (diff)
downloadice-99894938dbde9a0bb10fc1998d9863cae52b8977.tar.bz2
ice-99894938dbde9a0bb10fc1998d9863cae52b8977.tar.xz
ice-99894938dbde9a0bb10fc1998d9863cae52b8977.zip
More adapter replication changes.
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp831
1 files changed, 217 insertions, 614 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index ffe0ba34210..67d4c9aab9c 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -29,25 +29,9 @@ const string Database::_objectDbName = "objects";
namespace IceGrid
{
-struct AddComponent : std::unary_function<ComponentDescriptorPtr&, void>
-{
- AddComponent(Database& database, const Database::ServerEntryPtr& entry) : _database(database), _entry(entry)
- {
- }
-
- void
- operator()(const ComponentDescriptorPtr& desc)
- {
- _database.addComponent(_entry, desc);
- }
-
- Database& _database;
- const Database::ServerEntryPtr _entry;
-};
-
struct AddAdapterId : std::unary_function<ComponentDescriptorPtr&, void>
{
- AddAdapterId(set<string>& ids) : _ids(ids)
+ AddAdapterId(set<string>& ids, set<string>& replicatedIds) : _ids(ids), _replicatedIds(replicatedIds)
{
}
@@ -59,20 +43,40 @@ struct AddAdapterId : std::unary_function<ComponentDescriptorPtr&, void>
if(p->id.empty())
{
DeploymentException ex;
- ex.reason = "invalid descriptor: empty adapter id for adapter `" + p->name + "' in `" + desc->name
- + "'";
+ ex.reason = "empty adapter id for adapter `" + p->name + "' in `" + desc->name + "'";
throw ex;
}
- if(!_ids.insert(p->id).second)
+ if(!_ids.insert(p->id).second && _replicatedIds.find(p->id) == _replicatedIds.end())
{
DeploymentException ex;
- ex.reason = "invalid descriptor: duplicated adapter id `" + p->id + "'";
+ ex.reason = "duplicated adapter id `" + p->id + "'";
throw ex;
}
}
}
set<string>& _ids;
+ const set<string>& _replicatedIds;
+};
+
+struct AddReplicatedAdapterId : std::unary_function<ReplicatedAdapterDescriptor&, void>
+{
+ AddReplicatedAdapterId(set<string>& ids) : _ids(ids)
+ {
+ }
+
+ void
+ operator()(const ReplicatedAdapterDescriptor& desc)
+ {
+ if(!_ids.insert(desc.id).second)
+ {
+ DeploymentException ex;
+ ex.reason = "duplicated replicated adapter id `" + desc.id + "'";
+ throw ex;
+ }
+ }
+
+ set<string>& _ids;
};
struct AddObjectId : std::unary_function<ComponentDescriptorPtr&, void>
@@ -88,17 +92,10 @@ struct AddObjectId : std::unary_function<ComponentDescriptorPtr&, void>
{
for(ObjectDescriptorSeq::const_iterator q = p->objects.begin(); q != p->objects.end(); ++q)
{
- if(!q->proxy)
- {
- DeploymentException ex;
- ex.reason = "invalid descriptor: object proxy is null in `" + desc->name + "'";
- throw ex;
- }
- if(!_ids.insert(q->proxy->ice_getIdentity()).second)
+ if(!_ids.insert(q->id).second)
{
DeploymentException ex;
- ex.reason = "invalid descriptor: duplicated object id `" +
- Ice::identityToString(q->proxy->ice_getIdentity()) + "'";
+ ex.reason = "duplicated object id `" + Ice::identityToString(q->id) + "'";
throw ex;
}
}
@@ -192,6 +189,8 @@ Database::Database(const Ice::ObjectAdapterPtr& adapter,
_envName(envName),
_nodeSessionTimeout(nodeSessionTimeout),
_traceLevels(traceLevels),
+ _objectCache(_communicator),
+ _serverCache(*this, _nodeCache, _adapterCache, _objectCache),
_connection(Freeze::createConnection(adapter->getCommunicator(), envName)),
_descriptors(_connection, _descriptorDbName),
_objects(_connection, _objectDbName),
@@ -204,10 +203,16 @@ Database::Database(const Ice::ObjectAdapterPtr& adapter,
_internalAdapter->addServantLocator(new AdapterServantLocator(this), "IceGridAdapter");
//
- // Cache the servers.
+ // Cache the servers & adapters.
//
for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p)
{
+ for(ReplicatedAdapterDescriptorSeq::const_iterator r = p->second->replicatedAdapters.begin();
+ r != p->second->replicatedAdapters.end(); ++r)
+ {
+ _adapterCache.get(r->id, true)->enableReplication(r->loadBalancing);
+ }
+
ServerInstanceDescriptorSeq::const_iterator q;
for(q = p->second->servers.begin(); q != p->second->servers.end(); ++q)
{
@@ -329,9 +334,25 @@ Database::addApplicationDescriptor(ObserverSessionI* session, const ApplicationD
ex.reason = "server `" + e.name + "' is already registered";
throw ex;
}
+
+ set<string> replicatedAdapterIds;
+ AddReplicatedAdapterId addReplicatedAdpt(replicatedAdapterIds);
+ for_each(descriptor->replicatedAdapters.begin(), descriptor->replicatedAdapters.end(), addReplicatedAdpt);
+ try
+ {
+ ObjFunc<Database, const string&> func = objFunc(*this, &Database::checkAdapterForAddition);
+ for_each(replicatedAdapterIds.begin(), replicatedAdapterIds.end(), func);
+ }
+ catch(const AdapterExistsException& e)
+ {
+ DeploymentException ex;
+ ex.reason = "replicated adapter `" + e.id + "' is already registered";
+ throw ex;
+ }
set<string> adapterIds;
- for_each(descriptor->servers.begin(), descriptor->servers.end(), forEachComponent(AddAdapterId(adapterIds)));
+ AddAdapterId addAdpt(adapterIds, replicatedAdapterIds);
+ for_each(descriptor->servers.begin(), descriptor->servers.end(), forEachComponent(addAdpt));
try
{
for_each(adapterIds.begin(), adapterIds.end(), objFunc(*this, &Database::checkAdapterForAddition));
@@ -357,6 +378,15 @@ Database::addApplicationDescriptor(ObserverSessionI* session, const ApplicationD
}
//
+ // Register the replicated adapters.
+ //
+ for(ReplicatedAdapterDescriptorSeq::const_iterator p = descriptor->replicatedAdapters.begin();
+ p != descriptor->replicatedAdapters.end(); ++p)
+ {
+ _adapterCache.get(p->id, true)->enableReplication(p->loadBalancing);
+ }
+
+ //
// Register the application servers.
//
addServers(descriptor->name, descriptor->servers, servers, entries);
@@ -383,7 +413,7 @@ Database::addApplicationDescriptor(ObserverSessionI* session, const ApplicationD
//
// Synchronize the servers on the nodes.
//
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
void
@@ -440,7 +470,7 @@ Database::updateApplicationDescriptor(ObserverSessionI* session, const Applicati
out << "updated application `" << update.name << "'";
}
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
void
@@ -495,7 +525,7 @@ Database::syncApplicationDescriptor(ObserverSessionI* session, const Application
out << "synced application `" << newDesc->name << "'";
}
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
void
@@ -528,13 +558,41 @@ Database::syncApplicationDescriptorNoSync(const ApplicationDescriptorPtr& origDe
}
//
+ // Ensure that the new application replicated adapters aren't
+ // already registered.
+ //
+ set<string> oldReplicatedAdapterIds;
+ set<string> newReplicatedAdapterIds;
+ AddReplicatedAdapterId addOldReplicatedAdpt(oldReplicatedAdapterIds);
+ for_each(origDesc->replicatedAdapters.begin(), origDesc->replicatedAdapters.end(), addOldReplicatedAdpt);
+ AddReplicatedAdapterId addNewReplicatedAdpt(newReplicatedAdapterIds);
+ for_each(newDesc->replicatedAdapters.begin(), newDesc->replicatedAdapters.end(), addNewReplicatedAdpt);
+
+ set<string> addedReplicatedAdpts;
+ set_difference(newReplicatedAdapterIds.begin(), newReplicatedAdapterIds.end(), oldReplicatedAdapterIds.begin(),
+ oldReplicatedAdapterIds.end(), set_inserter(addedReplicatedAdpts));
+ try
+ {
+ ObjFunc<Database, const string&> func = objFunc(*this, &Database::checkAdapterForAddition);
+ for_each(addedReplicatedAdpts.begin(), addedReplicatedAdpts.end(), func);
+ }
+ catch(const AdapterExistsException& e)
+ {
+ DeploymentException ex;
+ ex.reason = "replicated adapter `" + e.id + "' is already registered";
+ throw ex;
+ }
+
+ //
// Ensure that the new application adapters aren't already
// registered.
//
set<string> oldAdpts;
set<string> newAdpts;
- for_each(origDesc->servers.begin(), origDesc->servers.end(), forEachComponent(AddAdapterId(oldAdpts)));
- for_each(newDesc->servers.begin(), newDesc->servers.end(), forEachComponent(AddAdapterId(newAdpts)));
+ AddAdapterId addOldAdpt(oldAdpts, oldReplicatedAdapterIds);
+ for_each(origDesc->servers.begin(), origDesc->servers.end(), forEachComponent(addOldAdpt));
+ AddAdapterId addNewAdpt(newAdpts, newReplicatedAdapterIds);
+ for_each(newDesc->servers.begin(), newDesc->servers.end(), forEachComponent(addNewAdpt));
set<string> addedAdpts;
set_difference(newAdpts.begin(), newAdpts.end(), oldAdpts.begin(), oldAdpts.end(), set_inserter(addedAdpts));
@@ -572,6 +630,21 @@ Database::syncApplicationDescriptorNoSync(const ApplicationDescriptorPtr& origDe
}
//
+ // Update the replicated adapters.
+ //
+ for(ReplicatedAdapterDescriptorSeq::const_iterator p = origDesc->replicatedAdapters.begin();
+ p != origDesc->replicatedAdapters.end(); ++p)
+ {
+ _adapterCache.get(p->id)->disableReplication();
+ }
+ for(ReplicatedAdapterDescriptorSeq::const_iterator p = newDesc->replicatedAdapters.begin();
+ p != newDesc->replicatedAdapters.end(); ++p)
+ {
+ _adapterCache.get(p->id, true)->enableReplication(p->loadBalancing);
+ }
+
+
+ //
// Register the new servers, unregister the old ones and
// update the updated ones.
//
@@ -601,6 +674,12 @@ Database::removeApplicationDescriptor(ObserverSessionI* session, const std::stri
descriptor = p->second;
_descriptors.erase(p);
+ for(ReplicatedAdapterDescriptorSeq::const_iterator q = descriptor->replicatedAdapters.begin();
+ q != descriptor->replicatedAdapters.end(); ++q)
+ {
+ _adapterCache.get(q->id)->disableReplication();
+ }
+
set<string> servers;
for_each(descriptor->servers.begin(), descriptor->servers.end(), AddServerName(servers));
removeServers(descriptor->name, descriptor->servers, servers, entries);
@@ -619,7 +698,7 @@ Database::removeApplicationDescriptor(ObserverSessionI* session, const std::stri
out << "removed application `" << name << "'";
}
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
ApplicationDescriptorPtr
@@ -648,81 +727,21 @@ Database::getAllApplications(const string& expression)
}
void
-Database::addNode(const string& name, const NodeSessionIPtr& node)
+Database::addNode(const string& name, const NodeSessionIPtr& session)
{
- ServerEntrySeq entries;
- {
- Lock sync(*this);
-
- if(_nodes.find(name) != _nodes.end())
- {
- throw NodeActiveException();
- }
-
- if(_traceLevels->node > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat);
- out << "added node `" << name << "'";
- }
-
- _nodes.insert(make_pair(name, node));
-
- //
- // Get all the node servers and see if they need to be synced.
- //
- map<string, set<string> >::const_iterator p = _serversByNode.find(name);
- if(p == _serversByNode.end())
- {
- return;
- }
- for(set<string>::const_iterator q = p->second.begin() ; q != p->second.end(); ++q)
- {
- ServerEntryPtr entry = _servers[*q];
- assert(entry);
- if(entry->needsSync())
- {
- entries.push_back(entry);
- }
- }
- }
-
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&Database::ServerEntry::sync));
+ _nodeCache.get(name, true)->setSession(session);
}
NodePrx
Database::getNode(const string& name) const
{
- Lock sync(*this);
-
- map<string, NodeSessionIPtr>::const_iterator p = _nodes.find(name);
- if(p == _nodes.end())
- {
- if(_serversByNode.find(name) == _serversByNode.end())
- {
- throw NodeNotExistException();
- }
- else
- {
- throw NodeUnreachableException();
- }
- }
- return p->second->getNode();
+ return _nodeCache.get(name)->getProxy();
}
void
Database::removeNode(const string& name)
{
- {
- Lock sync(*this);
- if(_nodes.erase(name) > 0)
- {
- if(_traceLevels->node > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->nodeCat);
- out << "removed node `" << name << "'";
- }
- }
- }
+ _nodeCache.get(name)->setSession(0);
try
{
@@ -737,13 +756,7 @@ Database::removeNode(const string& name)
Ice::StringSeq
Database::getAllNodes(const string& expression)
{
- Lock sync(*this);
- set<string> nodes;
- Ice::StringSeq r = getMatchingKeys<map<string, set<string> > >(_serversByNode, expression);
- nodes.insert(r.begin(), r.end());
- r = getMatchingKeys<map<string, NodeSessionIPtr> >(_nodes, expression);
- nodes.insert(r.begin(), r.end());
- return Ice::StringSeq(nodes.begin(), nodes.end());
+ return _nodeCache.getAll(expression);
}
ServerInstanceDescriptor
@@ -767,16 +780,7 @@ Database::getServerDescriptor(const std::string& name)
string
Database::getServerApplication(const string& name)
{
- Lock sync(*this);
- map<string, string>::const_iterator p = _applicationsByServerName.find(name);
- if(p == _applicationsByServerName.end())
- {
- ServerNotExistException ex;
- ex.name = name;
- throw ex;
- }
-
- return p->second;
+ return _serverCache.get(name)->getApplication();
}
ServerPrx
@@ -789,41 +793,19 @@ Database::getServer(const string& name)
ServerPrx
Database::getServerWithTimeouts(const string& name, int& activationTimeout, int& deactivationTimeout)
{
- ServerEntryPtr entry;
- {
- Lock sync(*this);
- map<string, ServerEntryPtr>::const_iterator p = _servers.find(name);
- if(p != _servers.end())
- {
- entry = p->second;
- }
- }
- if(!entry)
- {
- ServerNotExistException ex;
- ex.name = name;
- throw ex;
- }
- return entry->getProxy(activationTimeout, deactivationTimeout);
+ return _serverCache.get(name)->getProxy(activationTimeout, deactivationTimeout);
}
Ice::StringSeq
Database::getAllServers(const string& expression)
{
- Lock sync(*this);
- return getMatchingKeys<map<string, ServerEntryPtr> >(_servers, expression);
+ return _serverCache.getAll(expression);
}
Ice::StringSeq
Database::getAllNodeServers(const string& node)
{
- Lock sync(*this);
- map<string, set<string> >::const_iterator p = _serversByNode.find(node);
- if(p == _serversByNode.end())
- {
- return Ice::StringSeq();
- }
- return Ice::StringSeq(p->second.begin(), p->second.end());
+ return _nodeCache.get(node)->getServers();
}
void
@@ -875,7 +857,7 @@ Database::getAdapterDirectProxy(const string& id)
}
AdapterPrx
-Database::getAdapter(const string& id)
+Database::getAdapter(const string& id, const string& serverId)
{
//
// TODO: Perhaps we should also cache the adapter proxies here
@@ -887,18 +869,12 @@ Database::getAdapter(const string& id)
// server, if that's the case we get the adapter proxy from the
// server.
//
- ServerEntryPtr entry;
+ try
{
- Lock sync(*this);
- map<string, ServerEntryPtr>::const_iterator p = _serversByAdapterId.find(id);
- if(p != _serversByAdapterId.end())
- {
- entry = p->second;
- }
+ return _adapterCache.get(id)->getProxy(serverId);
}
- if(entry)
+ catch(const AdapterNotExistException&)
{
- return entry->getAdapter(id);
}
//
@@ -925,8 +901,7 @@ Database::getAllAdapters(const string& expression)
{
Lock sync(*this);
vector<string> result;
- vector<string> ids;
- ids = getMatchingKeys<map<string, ServerEntryPtr> >(_serversByAdapterId, expression);
+ vector<string> ids = _adapterCache.getAll(expression);
result.swap(ids);
ids = getMatchingKeys<StringObjectProxyDict>(_adapters, expression);
result.insert(result.end(), ids.begin(), ids.end());
@@ -934,18 +909,18 @@ Database::getAllAdapters(const string& expression)
}
void
-Database::addObjectDescriptor(const ObjectDescriptor& object)
+Database::addObject(const ObjectInfo& info)
{
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectDescDict objects(connection, _objectDbName);
- const Ice::Identity id = object.proxy->ice_getIdentity();
+ IdentityObjectInfoDict objects(connection, _objectDbName);
+ const Ice::Identity id = info.proxy->ice_getIdentity();
if(objects.find(id) != objects.end())
{
ObjectExistsException ex;
ex.id = id;
throw ex;
}
- objects.put(make_pair(id, object));
+ objects.put(make_pair(id, info));
if(_traceLevels->object > 0)
{
@@ -955,10 +930,10 @@ Database::addObjectDescriptor(const ObjectDescriptor& object)
}
void
-Database::removeObjectDescriptor(const Ice::Identity& id)
+Database::removeObject(const Ice::Identity& id)
{
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectDescDict objects(connection, _objectDbName);
+ IdentityObjectInfoDict objects(connection, _objectDbName);
if(objects.find(id) == objects.end())
{
ObjectNotExistException ex;
@@ -975,21 +950,21 @@ Database::removeObjectDescriptor(const Ice::Identity& id)
}
void
-Database::updateObjectDescriptor(const Ice::ObjectPrx& proxy)
+Database::updateObject(const Ice::ObjectPrx& proxy)
{
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectDescDict objects(connection, _objectDbName);
+ IdentityObjectInfoDict objects(connection, _objectDbName);
const Ice::Identity id = proxy->ice_getIdentity();
- IdentityObjectDescDict::iterator p = objects.find(id);
+ IdentityObjectInfoDict::iterator p = objects.find(id);
if(p == objects.end())
{
ObjectNotExistException ex;
ex.id = id;
throw ex;
}
- ObjectDescriptor desc = p->second;
- desc.proxy = proxy;
- p.set(desc);
+ ObjectInfo info = p->second;
+ info.proxy = proxy;
+ p.set(info);
if(_traceLevels->object > 0)
{
@@ -998,19 +973,30 @@ Database::updateObjectDescriptor(const Ice::ObjectPrx& proxy)
}
}
-ObjectDescriptor
-Database::getObjectDescriptor(const Ice::Identity& id)
+Ice::ObjectPrx
+Database::getObjectProxy(const Ice::Identity& id, string& adapterId)
{
+ try
+ {
+ ObjectEntryPtr object = _objectCache.get(id);
+ adapterId = object->getAdapterId();
+ return object->getProxy();
+ }
+ catch(ObjectNotExistException&)
+ {
+ }
+
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectDescDict objects(connection, _objectDbName);
- IdentityObjectDescDict::const_iterator p = objects.find(id);
+ IdentityObjectInfoDict objects(connection, _objectDbName);
+ IdentityObjectInfoDict::const_iterator p = objects.find(id);
if(p == objects.end())
{
ObjectNotExistException ex;
ex.id = id;
throw ex;
}
- return p->second;
+ adapterId = "";
+ return p->second.proxy;
}
Ice::ObjectPrx
@@ -1023,10 +1009,11 @@ Database::getObjectByType(const string& type)
Ice::ObjectProxySeq
Database::getObjectsWithType(const string& type)
{
+ Ice::ObjectProxySeq proxies = _objectCache.getObjectsWithType(type);
+
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectDescDict objects(connection, _objectDbName);
- Ice::ObjectProxySeq proxies;
- for(IdentityObjectDescDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p)
+ IdentityObjectInfoDict objects(connection, _objectDbName);
+ for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p)
{
proxies.push_back(p->second.proxy);
}
@@ -1037,27 +1024,63 @@ Database::getObjectsWithType(const string& type)
return proxies;
}
-ObjectDescriptorSeq
-Database::getAllObjectDescriptors(const string& expression)
+ObjectInfo
+Database::getObjectInfo(const Ice::Identity& id)
+{
+ try
+ {
+ ObjectEntryPtr object = _objectCache.get(id);
+ return object->getObjectInfo();
+ }
+ catch(ObjectNotExistException&)
+ {
+ }
+
+ Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ IdentityObjectInfoDict objects(connection, _objectDbName);
+ IdentityObjectInfoDict::const_iterator p = objects.find(id);
+ if(p == objects.end())
+ {
+ ObjectNotExistException ex;
+ ex.id = id;
+ throw ex;
+ }
+ return p->second;
+}
+
+ObjectInfoSeq
+Database::getAllObjectInfos(const string& expression)
{
+ ObjectInfoSeq infos = _objectCache.getAll(expression);
+
Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectDescDict objects(connection, _objectDbName);
- ObjectDescriptorSeq descriptors;
- for(IdentityObjectDescDict::const_iterator p = objects.begin(); p != objects.end(); ++p)
+ IdentityObjectInfoDict objects(connection, _objectDbName);
+ for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- if(expression.empty() ||
- IceUtil::match(Ice::identityToString(p->second.proxy->ice_getIdentity()), expression, true))
+ if(expression.empty() || IceUtil::match(Ice::identityToString(p->first), expression, true))
{
- descriptors.push_back(p->second);
+ infos.push_back(p->second);
}
}
- return descriptors;
+ return infos;
+}
+
+const TraceLevelsPtr&
+Database::getTraceLevels() const
+{
+ return _traceLevels;
+}
+
+int
+Database::getNodeSessionTimeout() const
+{
+ return _nodeSessionTimeout;
}
void
Database::checkServerForAddition(const string& name)
{
- if(_servers.find(name) != _servers.end())
+ if(_serverCache.has(name))
{
ServerExistsException ex;
ex.name = name;
@@ -1068,7 +1091,7 @@ Database::checkServerForAddition(const string& name)
void
Database::checkAdapterForAddition(const string& id)
{
- if(_serversByAdapterId.find(id) != _serversByAdapterId.end() || _adapters.find(id) != _adapters.end())
+ if(_adapterCache.has(id) || _adapters.find(id) != _adapters.end())
{
AdapterExistsException ex;
ex.id = id;
@@ -1079,7 +1102,7 @@ Database::checkAdapterForAddition(const string& id)
void
Database::checkObjectForAddition(const Ice::Identity& objectId)
{
- if(_objects.find(objectId) != _objects.end())
+ if(_objectCache.has(objectId) || _objects.find(objectId) != _objects.end())
{
ObjectExistsException ex;
ex.id = objectId;
@@ -1147,440 +1170,20 @@ Database::removeServers(const string& application, const ServerInstanceDescripto
}
}
-Database::ServerEntryPtr
+ServerEntryPtr
Database::addServer(const string& application, const ServerInstanceDescriptor& instance)
{
- ServerEntryPtr entry;
- map<string, ServerEntryPtr>::const_iterator q = _servers.find(instance.descriptor->name);
- if(q != _servers.end())
- {
- entry = q->second;
- entry->update(instance);
- }
- else
- {
- entry = new ServerEntry(*this, instance);
- _servers.insert(make_pair(instance.descriptor->name, entry));
- }
-
- map<string, set<string> >::iterator p = _serversByNode.find(instance.node);
- if(p == _serversByNode.end())
- {
- p = _serversByNode.insert(make_pair(instance.node, set<string>())).first;
- }
- p->second.insert(p->second.begin(), instance.descriptor->name);
-
- _applicationsByServerName.insert(make_pair(instance.descriptor->name, application));
-
- forEachComponent(AddComponent(*this, entry))(instance);
- return entry;
+ return _serverCache.add(instance.descriptor->name, instance, application);
}
-Database::ServerEntryPtr
+ServerEntryPtr
Database::updateServer(const ServerInstanceDescriptor& instance)
{
- //
- // Get the server entry and the current descriptor then check
- // if the server descriptor really changed.
- //
- ServerEntryPtr entry;
- map<string, ServerEntryPtr>::const_iterator q = _servers.find(instance.descriptor->name);
- assert(q != _servers.end());
-
- entry = q->second;
- ServerInstanceDescriptor old = entry->getDescriptor();
-
- //
- // If the node changed, move the server from the old node to the
- // new one.
- //
- if(old.node != instance.node)
- {
- map<string, set<string> >::iterator p = _serversByNode.find(old.node);
- assert(p != _serversByNode.end());
- p->second.erase(instance.descriptor->name);
- if(p->second.empty())
- {
- _serversByNode.erase(p);
- }
- p = _serversByNode.find(instance.node);
- if(p == _serversByNode.end())
- {
- p = _serversByNode.insert(make_pair(instance.node, set<string>())).first;
- }
- p->second.insert(p->second.begin(), instance.descriptor->name);
- }
-
- //
- // Remove the object adapters and objects from the old descriptor.
- //
- forEachComponent(objFunc(*this, &Database::removeComponent))(old);
-
- //
- // Update the server entry.
- //
- entry->update(instance);
-
- //
- // Add the new object adapters and objects.
- //
- forEachComponent(AddComponent(*this, entry))(instance);
- return entry;
+ return _serverCache.update(instance);
}
-Database::ServerEntryPtr
+ServerEntryPtr
Database::removeServer(const string& application, const ServerInstanceDescriptor& instance)
{
- ServerEntryPtr entry;
- map<string, ServerEntryPtr>::iterator q = _servers.find(instance.descriptor->name);
- assert(q != _servers.end());
-
- map<string, set<string> >::iterator p = _serversByNode.find(instance.node);
- assert(p != _serversByNode.end());
- p->second.erase(instance.descriptor->name);
- if(p->second.empty())
- {
- _serversByNode.erase(p);
- }
-
- entry = q->second;
- entry->destroy();
-
- _applicationsByServerName.erase(instance.descriptor->name);
-
- //
- // Remove the object adapters and objects.
- //
- forEachComponent(objFunc(*this, &Database::removeComponent))(instance);
- return entry;
-}
-
-void
-Database::clearServer(const std::string& name)
-{
- Lock sync(*this);
- map<string, ServerEntryPtr>::iterator p = _servers.find(name);
- if(p != _servers.end())
- {
- if(p->second->canRemove())
- {
- _servers.erase(p);
- }
- }
-}
-
-void
-Database::addComponent(const ServerEntryPtr& entry, const ComponentDescriptorPtr& component)
-{
- for(AdapterDescriptorSeq::const_iterator q = component->adapters.begin() ; q != component->adapters.end(); ++q)
- {
- _serversByAdapterId.insert(make_pair(q->id, entry));
- for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r)
- {
- _objects.put(make_pair(r->proxy->ice_getIdentity(), *r));
- }
- }
-}
-
-void
-Database::removeComponent(const ComponentDescriptorPtr& component)
-{
- for(AdapterDescriptorSeq::const_iterator q = component->adapters.begin() ; q != component->adapters.end(); ++q)
- {
- _serversByAdapterId.erase(q->id);
- for(ObjectDescriptorSeq::const_iterator r = q->objects.begin(); r != q->objects.end(); ++r)
- {
- _objects.erase(r->proxy->ice_getIdentity());
- }
- }
-}
-
-Database::ServerEntry::ServerEntry(Database& database, const ServerInstanceDescriptor& descriptor) :
- _database(database),
- _synchronizing(false)
-{
- _load.reset(new ServerInstanceDescriptor());
- *_load = descriptor;
-}
-
-void
-Database::ServerEntry::sync()
-{
- map<string, AdapterPrx> adapters;
- int at, dt;
- try
- {
- sync(adapters, at, dt);
- }
- catch(const NodeUnreachableException&)
- {
- }
-}
-
-bool
-Database::ServerEntry::needsSync() const
-{
- Lock sync(*this);
- return _failed;
-}
-
-void
-Database::ServerEntry::update(const ServerInstanceDescriptor& instance)
-{
- Lock sync(*this);
-
- auto_ptr<ServerInstanceDescriptor> descriptor(new ServerInstanceDescriptor());
- *descriptor = instance;
-
- if(_loaded.get() && descriptor->node != _loaded->node)
- {
- assert(!_destroy.get());
- _destroy = _loaded;
- }
- else if(_load.get() && descriptor->node != _load->node)
- {
- assert(!_destroy.get());
- _destroy = _load;
- }
-
- _load = descriptor;
- _loaded.reset(0);
- _proxy = 0;
- _adapters.clear();
-}
-
-void
-Database::ServerEntry::destroy()
-{
- Lock sync(*this);
- if(_loaded.get())
- {
- assert(!_destroy.get());
- _destroy = _loaded;
- }
- else if(_load.get())
- {
- assert(!_destroy.get());
- _destroy = _load;
- }
-
- _load.reset(0);
- _loaded.reset(0);
- _proxy = 0;
- _adapters.clear();
-}
-
-ServerInstanceDescriptor
-Database::ServerEntry::getDescriptor()
-{
- Lock sync(*this);
- if(_proxy)
- {
- return *_loaded.get();
- }
- else
- {
- return *_load.get();
- }
-}
-
-ServerPrx
-Database::ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout)
-{
- ServerPrx proxy;
- {
- Lock sync(*this);
- if(_proxy) // Synced
- {
- proxy = _proxy;
- activationTimeout = _activationTimeout;
- deactivationTimeout = _deactivationTimeout;
- }
- }
-
- if(proxy)
- {
- try
- {
- proxy->ice_ping();
- return proxy;
- }
- catch(const Ice::LocalException& ex)
- {
- }
- }
-
- StringAdapterPrxDict adapters;
- return sync(adapters, activationTimeout, deactivationTimeout);
-}
-
-AdapterPrx
-Database::ServerEntry::getAdapter(const string& id)
-{
- AdapterPrx proxy;
- {
- Lock sync(*this);
- if(_proxy) // Synced
- {
- proxy = _adapters[id];
- }
- }
-
- if(proxy)
- {
- try
- {
- proxy->ice_ping();
- return proxy;
- }
- catch(const Ice::LocalException& ex)
- {
- }
- }
-
- StringAdapterPrxDict adapters;
- int activationTimeout, deactivationTimeout;
- sync(adapters, activationTimeout, deactivationTimeout);
- return adapters[id];
-}
-
-ServerPrx
-Database::ServerEntry::sync(map<string, AdapterPrx>& adapters, int& activationTimeout, int& deactivationTimeout)
-{
- ServerDescriptorPtr load;
- string loadNode;
- ServerDescriptorPtr destroy;
- string destroyNode;
- {
- Lock sync(*this);
- while(_synchronizing)
- {
- wait();
- }
-
- if(!_load.get() && !_destroy.get())
- {
- _load = _loaded; // Re-load the current server.
- }
-
- _synchronizing = true;
- _failed = false;
- if(_load.get())
- {
- load = _load->descriptor;
- loadNode = _load->node;
- }
- if(_destroy.get())
- {
- destroy = _destroy->descriptor;
- destroyNode = _destroy->node;
- }
- }
-
- ServerPrx proxy;
- try
- {
- if(destroy)
- {
- try
- {
- _database.getNode(destroyNode)->destroyServer(destroy->name);
- }
- catch(const NodeNotExistException& ex)
- {
- if(!load)
- {
- throw NodeUnreachableException();
- }
- }
- catch(const Ice::LocalException& ex)
- {
- if(!load)
- {
- throw NodeUnreachableException();
- }
- }
- }
-
- if(load)
- {
- try
- {
- proxy = _database.getNode(loadNode)->loadServer(load, adapters, activationTimeout, deactivationTimeout);
- proxy = ServerPrx::uncheckedCast(proxy->ice_collocationOptimization(false));
- }
- catch(const NodeNotExistException& ex)
- {
- throw NodeUnreachableException();
- }
- catch(const DeploymentException& ex)
- {
- Ice::Warning out(_database._traceLevels->logger);
- out << "failed to load server on node `" << loadNode << "':\n" << ex;
- throw NodeUnreachableException();
- }
- catch(const Ice::LocalException& ex)
- {
- Ice::Warning out(_database._traceLevels->logger);
- out << "unexpected exception while loading on node `" << loadNode << "':\n" << ex;
- throw NodeUnreachableException();
- }
- }
- }
- catch(const NodeUnreachableException& ex)
- {
- {
- Lock sync(*this);
- _synchronizing = false;
- _destroy.reset(0);
- _failed = true;
- notifyAll();
- }
- if(!load && destroy)
- {
- _database.clearServer(destroy->name);
- }
- throw;
- }
-
- {
- Lock sync(*this);
- _synchronizing = false;
- _loaded = _load;
- _load.reset(0);
- _destroy.reset(0);
-
- //
- // Set timeout on server and adapter proxies. Most of the
- // calls on the proxies shouldn't block for longer than the
- // node session timeout. Calls that might block for a longer
- // time should set the correct timeout before invoking on the
- // proxy (e.g.: server start/stop, adapter activate).
- //
- int timeout = _database._nodeSessionTimeout * 1000; // sec to ms
- _proxy = proxy ? ServerPrx::uncheckedCast(proxy->ice_timeout(timeout)) : ServerPrx();
- _adapters.clear();
- for(StringAdapterPrxDict::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
- {
- AdapterPrx adapter = AdapterPrx::uncheckedCast(p->second->ice_timeout(timeout));
- _adapters.insert(make_pair(p->first, adapter));
- }
- activationTimeout += _database._nodeSessionTimeout;
- deactivationTimeout += _database._nodeSessionTimeout;
- _activationTimeout = activationTimeout;
- _deactivationTimeout = deactivationTimeout;
- notifyAll();
- }
- if(!load && destroy)
- {
- _database.clearServer(destroy->name);
- }
- return proxy;
-}
-
-bool
-Database::ServerEntry::canRemove()
-{
- Lock sync(*this);
- return !_loaded.get() && !_load.get() && !_destroy.get();
+ return _serverCache.remove(instance.descriptor->name);
}