summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-04-05 12:54:15 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-04-05 12:54:15 +0000
commit872fd9e5dfb743a648c64bf774f6c9a76a45b651 (patch)
treec071c5dc5bd1177c0913e82cb8a3233f0da50ea6 /cpp/src/IceGrid/Database.cpp
parentadding timeout test (diff)
downloadice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.bz2
ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.xz
ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.zip
- Added support for observing adapters and objects.
- Lots of cleanup in the IceGrid registry initilization method.
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp330
1 files changed, 189 insertions, 141 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 8a62f3c0ef0..65f963b25f8 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -182,6 +182,8 @@ Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeOb
{
int serial;
ApplicationDescriptorSeq applications;
+ AdapterInfoSeq adapters;
+ ObjectInfoSeq objects;
{
Lock sync(*this);
_registryObserver = registryObserver;
@@ -192,12 +194,26 @@ Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeOb
{
applications.push_back(p->second);
}
+
+ for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ adapters.push_back(p->second);
+ if(adapters.back().id.empty())
+ {
+ adapters.back().id = p->first;
+ }
+ }
+
+ for(IdentityObjectInfoDict::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
+ {
+ objects.push_back(p->second);
+ }
}
//
// Notify the observers.
//
- _registryObserver->init(serial, applications);
+ _registryObserver->init(serial, applications, adapters, objects);
}
void
@@ -563,9 +579,9 @@ Database::getAllNodeServers(const string& node)
bool
Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy)
{
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringAdapterInfoDict adapters(connection, _adapterDbName);
- if(proxy)
+ AdapterInfo info;
+ int serial;
+ bool updated = false;
{
Lock sync(*this);
if(_adapterCache.has(adapterId))
@@ -573,65 +589,64 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
return false;
}
- StringAdapterInfoDict::iterator p = adapters.find(adapterId);
- if(p != adapters.end())
+ StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
+ if(proxy)
{
- AdapterInfo info = p->second;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
- p.set(info);
- if(_traceLevels->adapter > 0)
+ if(p != _adapters.end())
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "updated adapter `" << adapterId << "'";
- if(!replicaGroupId.empty())
- {
- out << " with replica group `" << replicaGroupId << "'";
- }
+ info = p->second;
+ info.proxy = proxy;
+ info.replicaGroupId = replicaGroupId;
+ p.set(info);
+ updated = true;
}
+ else
+ {
+ info.id = adapterId;
+ info.proxy = proxy;
+ info.replicaGroupId = replicaGroupId;
+ _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
+ }
}
else
{
- AdapterInfo info;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
- adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
- if(_traceLevels->adapter > 0)
+ if(p == _adapters.end())
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "added adapter `" << adapterId << "'";
- if(!replicaGroupId.empty())
- {
- out << " with replica group `" << replicaGroupId << "'";
- }
+ return true;
}
- }
- return true;
+ _adapters.erase(p);
+ }
+
+ serial = ++_serial;
}
- else
+
+
+ if(_traceLevels->adapter > 0)
{
- Lock sync(*this);
- if(_adapterCache.has(adapterId))
+ Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
+ out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'";
+ if(!replicaGroupId.empty())
{
- return false;
- }
+ out << " with replica group `" << replicaGroupId << "'";
+ }
+ }
- StringAdapterInfoDict::iterator p = adapters.find(adapterId);
- if(p == adapters.end())
+ if(proxy)
+ {
+ if(updated)
{
- return true;
+ _registryObserver->adapterUpdated(serial, info);
}
- AdapterInfo info = p->second;
- adapters.erase(p);
-
- if(_traceLevels->adapter > 0)
+ else
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "removed adapter `" << adapterId << "'";
+ _registryObserver->adapterAdded(serial, info);
}
-
- return true;
}
+ else
+ {
+ _registryObserver->adapterRemoved(serial, adapterId);
+ }
+ return true;
}
Ice::ObjectPrx
@@ -650,62 +665,73 @@ Database::getAdapterDirectProxy(const string& adapterId)
void
Database::removeAdapter(const string& adapterId)
{
- try
- {
- AdapterEntryPtr adpt = _adapterCache.get(adapterId);
- DeploymentException ex;
- ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n";
- ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
- throw ex;
- }
- catch(const AdapterNotExistException&)
+ AdapterInfoSeq infos;
+ int serial;
{
- }
+ Lock sync(*this);
+ if(_adapterCache.has(adapterId))
+ {
+ AdapterEntryPtr adpt = _adapterCache.get(adapterId);
+ DeploymentException ex;
+ ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n";
+ ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
+ throw ex;
+ }
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ Freeze::TransactionHolder txHolder(_connection);
- {
- StringAdapterInfoDict adapters(connection, _adapterDbName);
- StringAdapterInfoDict::iterator p = adapters.find(adapterId);
- if(p != adapters.end())
+ StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
+ if(p != _adapters.end())
{
- AdapterInfo info = p->second;
- adapters.erase(p);
-
- if(_traceLevels->adapter > 0)
+ _adapters.erase(p);
+ }
+ else
+ {
+ p = _adapters.findByReplicaGroupId(adapterId, true);
+ if(p == _adapters.end())
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "removed adapter `" << adapterId << "'";
+ throw AdapterNotExistException(adapterId);
+ }
+
+ while(p != _adapters.end())
+ {
+ AdapterInfo info = p->second;
+ info.replicaGroupId = "";
+ infos.push_back(info);
+ _adapters.put(StringAdapterInfoDict::value_type(p->first, info));
+ ++p;
}
- return;
}
- }
- {
- Freeze::TransactionHolder txHolder(connection);
- StringAdapterInfoDict adapters(connection, _adapterDbName);
+ txHolder.commit();
- StringAdapterInfoDict::iterator p = adapters.findByReplicaGroupId(adapterId, true);
- if(p == adapters.end())
+ if(infos.empty())
{
- throw AdapterNotExistException(adapterId);
+ serial = ++_serial;
}
-
- while(p != adapters.end())
+ else
{
- AdapterInfo info = p->second;
- info.replicaGroupId = "";
- adapters.put(StringAdapterInfoDict::value_type(p->first, info));
- ++p;
+ serial = _serial;
+ _serial += infos.size();
}
-
- txHolder.commit();
}
if(_traceLevels->adapter > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "removed replica group `" << adapterId << "'";
+ out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'";
+ }
+
+ if(infos.empty())
+ {
+ _registryObserver->adapterRemoved(serial, adapterId);
+ }
+ else
+ {
+ for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ {
+ _registryObserver->adapterUpdated(++serial, *p);
+ }
}
}
@@ -744,7 +770,7 @@ Database::getAdapters(const string& id, bool allRegistered, int& endpointCount)
Ice::Identity identity;
identity.category = "IceGridAdapter";
identity.name = id;
- Ice::ObjectPrx adpt = _internalAdapter->createDirectProxy(identity)->ice_collocationOptimization(false);
+ Ice::ObjectPrx adpt = _internalAdapter->createDirectProxy(identity);
adpts.push_back(make_pair(id, AdapterPrx::uncheckedCast(adpt)));
endpointCount = 1;
return adpts;
@@ -809,21 +835,28 @@ Database::getAllAdapters(const string& expression)
void
Database::addObject(const ObjectInfo& info)
{
- Lock sync(*this);
-
+ int serial;
const Ice::Identity id = info.proxy->ice_getIdentity();
- if(_objectCache.has(id))
{
- throw ObjectExistsException(id);
- }
-
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
- if(objects.find(id) != objects.end())
- {
- throw ObjectExistsException(id);
+ Lock sync(*this);
+ if(_objectCache.has(id))
+ {
+ throw ObjectExistsException(id);
+ }
+
+ if(_objects.find(id) != _objects.end())
+ {
+ throw ObjectExistsException(id);
+ }
+ _objects.put(IdentityObjectInfoDict::value_type(id, info));
+
+ serial = ++_serial;
}
- objects.put(IdentityObjectInfoDict::value_type(id, info));
+
+ //
+ // Notify the observers.
+ //
+ _registryObserver->objectAdded(serial, info);
if(_traceLevels->object > 0)
{
@@ -835,33 +868,40 @@ Database::addObject(const ObjectInfo& info)
void
Database::removeObject(const Ice::Identity& id)
{
- try
- {
- ObjectEntryPtr obj = _objectCache.get(id);
- DeploymentException ex;
- ex.reason = "removing object `" + Ice::identityToString(id) + "' is not allowed:\n";
- ex.reason += "the object was added with the application descriptor `" + obj->getApplication() + "'";
- throw ex;
- }
- catch(const ObjectNotRegisteredException&)
+ int serial;
{
- }
+ Lock sync(*this);
+ if(_objectCache.has(id))
+ {
+ DeploymentException ex;
+ ex.reason = "removing object `" + Ice::identityToString(id) + "' is not allowed:\n";
+ ex.reason += "the object was added with the application descriptor `";
+ ex.reason += _objectCache.get(id)->getApplication();
+ ex.reason += "'";
+ throw ex;
+ }
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
- if(objects.find(id) == objects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
- if(objects.erase(id) > 0)
- {
- if(_traceLevels->object > 0)
+ IdentityObjectInfoDict::iterator p = _objects.find(id);
+ if(p == _objects.end())
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << "removed object `" << Ice::identityToString(id) << "'";
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
}
+ _objects.erase(p);
+
+ serial = ++_serial;
+ }
+
+ //
+ // Notify the observers.
+ //
+ _registryObserver->objectRemoved(serial, id);
+
+ if(_traceLevels->object > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << "removed object `" << Ice::identityToString(id) << "'";
}
}
@@ -869,31 +909,39 @@ void
Database::updateObject(const Ice::ObjectPrx& proxy)
{
const Ice::Identity id = proxy->ice_getIdentity();
- try
- {
- ObjectEntryPtr obj = _objectCache.get(id);
- DeploymentException ex;
- ex.reason = "updating object `" + Ice::identityToString(id) + "' is not allowed:\n";
- ex.reason += "the object was added with the application descriptor `" + obj->getApplication() + "'";
- throw ex;
- }
- catch(const ObjectNotRegisteredException&)
+ int serial;
+ ObjectInfo info;
{
- }
+ Lock sync(*this);
+ if(_objectCache.has(id))
+ {
+ DeploymentException ex;
+ ex.reason = "updating object `" + Ice::identityToString(id) + "' is not allowed:\n";
+ ex.reason += "the object was added with the application descriptor `";
+ ex.reason += _objectCache.get(id)->getApplication();
+ ex.reason += "'";
+ throw ex;
+ }
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
- IdentityObjectInfoDict::iterator p = objects.find(id);
- if(p == objects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
+ IdentityObjectInfoDict::iterator p = _objects.find(id);
+ if(p == _objects.end())
+ {
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
+ }
+
+ info = p->second;
+ info.proxy = proxy;
+ p.set(info);
+
+ serial = ++_serial;
}
- ObjectInfo info = p->second;
- info.proxy = proxy;
- p.set(info);
+ //
+ // Notify the observers.
+ //
+ _registryObserver->objectUpdated(serial, info);
if(_traceLevels->object > 0)
{