summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp507
1 files changed, 218 insertions, 289 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 9e7debbaaac..9b33ce7917a 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -140,9 +140,7 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
_objects(_connection, _objectDbName),
_internalObjects(_connection, _internalObjectDbName),
_lock(0),
- _applicationSerial(0),
- _adapterSerial(0),
- _objectSerial(0)
+ _applicationSerial(0)
{
ServerEntrySeq entries;
for(StringApplicationInfoDict::const_iterator p = _applications.begin(); p != _applications.end(); ++p)
@@ -272,95 +270,82 @@ Database::unlock(AdminSessionI* session)
void
Database::syncApplications(const ApplicationInfoSeq& applications)
{
- int serial;
+ Lock sync(*this);
+
+ Freeze::TransactionHolder txHolder(_connection);
+ ServerEntrySeq entries;
+ set<string> names;
+ for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p)
{
- Lock sync(*this);
-
- Freeze::TransactionHolder txHolder(_connection);
- ServerEntrySeq entries;
- set<string> names;
- for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p)
+ try
{
- try
+ StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name);
+ if(s != _applications.end())
{
- StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name);
- if(s != _applications.end())
- {
- ApplicationHelper previous(_communicator, s->second.descriptor);
- ApplicationHelper helper(_communicator, p->descriptor);
- reload(previous, helper, entries, p->uuid, p->revision);
- }
- else
- {
- load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision);
- }
+ ApplicationHelper previous(_communicator, s->second.descriptor);
+ ApplicationHelper helper(_communicator, p->descriptor);
+ reload(previous, helper, entries, p->uuid, p->revision);
}
- catch(const DeploymentException& ex)
+ else
{
- Ice::Warning warn(_traceLevels->logger);
- warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason;
+ load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision);
}
- _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p));
- names.insert(p->descriptor.name);
}
- StringApplicationInfoDict::iterator s = _applications.begin();
- while(s != _applications.end())
+ catch(const DeploymentException& ex)
{
- if(names.find(s->first) == names.end())
- {
- unload(ApplicationHelper(_communicator, s->second.descriptor), entries);
- _applications.erase(s++);
- }
- else
- {
- ++s;
- }
+ Ice::Warning warn(_traceLevels->logger);
+ warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason;
}
- serial = ++_applicationSerial;
- txHolder.commit();
+ _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p));
+ names.insert(p->descriptor.name);
}
- _applicationObserverTopic->applicationInit(serial, applications);
+ StringApplicationInfoDict::iterator s = _applications.begin();
+ while(s != _applications.end())
+ {
+ if(names.find(s->first) == names.end())
+ {
+ unload(ApplicationHelper(_communicator, s->second.descriptor), entries);
+ _applications.erase(s++);
+ }
+ else
+ {
+ ++s;
+ }
+ }
+ ++_applicationSerial;
+
+ _applicationObserverTopic->applicationInit(_applicationSerial, applications);
+
+ txHolder.commit();
}
void
Database::syncAdapters(const AdapterInfoSeq& adapters)
{
- int serial;
+ Lock sync(*this);
+ Freeze::TransactionHolder txHolder(_connection);
+ _adapters.clear();
+ for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
{
- Lock sync(*this);
-
- Freeze::TransactionHolder txHolder(_connection);
- _adapters.clear();
- for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
- {
- _adapters.put(StringAdapterInfoDict::value_type(r->id, *r));
- }
- serial = ++_adapterSerial;
- txHolder.commit();
- }
-
- _adapterObserverTopic->adapterInit(serial, adapters);
+ _adapters.put(StringAdapterInfoDict::value_type(r->id, *r));
+ }
+ _adapterObserverTopic->adapterInit(adapters);
+ txHolder.commit();
}
void
Database::syncObjects(const ObjectInfoSeq& objects)
{
- int serial;
+ Lock sync(*this);
+ Freeze::TransactionHolder txHolder(_connection);
+ _objects.clear();
+ for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
{
- Lock sync(*this);
-
- Freeze::TransactionHolder txHolder(_connection);
- _objects.clear();
- for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
- {
- _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q));
- }
- serial = ++_objectSerial;
- txHolder.commit();
+ _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q));
}
-
- _objectObserverTopic->objectInit(serial, objects);
+ _objectObserverTopic->objectInit(objects);
+ txHolder.commit();
}
Ice::ObjectPrx
@@ -418,25 +403,20 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
}
}
- int serial;
{
Lock sync(*this);
- serial = ++_applicationSerial;
+ ++_applicationSerial;
_applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
- }
-
- _applicationObserverTopic->applicationAdded(serial, info);
-
- {
- Lock sync(*this);
finishUpdating(info.descriptor.name);
- notifyAll();
- }
- if(_traceLevels->application > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "added application `" << info.descriptor.name << "'";
+ _applicationObserverTopic->applicationAdded(_applicationSerial, info);
+
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "added application `" << info.descriptor.name << "'";
+ }
+ notifyAll();
}
}
@@ -570,7 +550,6 @@ void
Database::removeApplication(const string& name, AdminSessionI* session)
{
ServerEntrySeq entries;
- int serial;
{
Lock sync(*this);
checkSessionLock(session);
@@ -601,16 +580,15 @@ Database::removeApplication(const string& name, AdminSessionI* session)
}
_applications.erase(p);
+ ++_applicationSerial;
- serial = ++_applicationSerial;
- }
-
- _applicationObserverTopic->applicationRemoved(serial, name);
+ _applicationObserverTopic->applicationRemoved(_applicationSerial, name);
- if(_traceLevels->application > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "removed application `" << name << "'";
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "removed application `" << name << "'";
+ }
}
if(_master)
@@ -799,45 +777,40 @@ Database::getAllNodeServers(const string& node)
bool
Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy)
{
+ Lock sync(*this);
+ if(_adapterCache.has(adapterId))
+ {
+ return false;
+ }
+
+ StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
AdapterInfo info;
- int serial;
bool updated = false;
+ if(proxy)
{
- Lock sync(*this);
- if(_adapterCache.has(adapterId))
+ if(p != _adapters.end())
{
- return false;
- }
-
- StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
- if(proxy)
- {
- if(p != _adapters.end())
- {
- info = p->second;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
- p.set(info);
- updated = true;
- }
- else
- {
- info.id = adapterId;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
- _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
- }
+ info = p->second;
+ info.proxy = proxy;
+ info.replicaGroupId = replicaGroupId;
+ p.set(info);
+ updated = true;
}
else
{
- if(p == _adapters.end())
- {
- return true;
- }
- _adapters.erase(p);
+ info.id = adapterId;
+ info.proxy = proxy;
+ info.replicaGroupId = replicaGroupId;
+ _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
+ }
+ }
+ else
+ {
+ if(p == _adapters.end())
+ {
+ return true;
}
-
- serial = ++_adapterSerial;
+ _adapters.erase(p);
}
if(_traceLevels->adapter > 0)
@@ -849,22 +822,23 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
out << " with replica group `" << replicaGroupId << "'";
}
}
-
+
if(proxy)
{
if(updated)
{
- _adapterObserverTopic->adapterUpdated(serial, info);
+ _adapterObserverTopic->adapterUpdated(info);
}
else
{
- _adapterObserverTopic->adapterAdded(serial, info);
+ _adapterObserverTopic->adapterAdded(info);
}
}
else
{
- _adapterObserverTopic->adapterRemoved(serial, adapterId);
+ _adapterObserverTopic->adapterRemoved(adapterId);
}
+
return true;
}
@@ -884,57 +858,42 @@ Database::getAdapterDirectProxy(const string& adapterId)
void
Database::removeAdapter(const string& adapterId)
{
- AdapterInfoSeq infos;
- int serial;
+ Lock sync(*this);
+ if(_adapterCache.has(adapterId))
{
- Lock sync(*this);
- if(_adapterCache.has(adapterId))
- {
- AdapterEntryPtr adpt = _adapterCache.get(adapterId);
- DeploymentException ex;
- ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n";
- ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
- throw ex;
- }
-
- Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator
+ AdapterEntryPtr adpt = _adapterCache.get(adapterId);
+ DeploymentException ex;
+ ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n";
+ ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
+ throw ex;
+ }
+
+ Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator
- StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
- if(p != _adapters.end())
+ StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
+ AdapterInfoSeq infos;
+ if(p != _adapters.end())
+ {
+ _adapters.erase(p);
+ }
+ else
+ {
+ p = _adapters.findByReplicaGroupId(adapterId, true);
+ if(p == _adapters.end())
{
- _adapters.erase(p);
+ throw AdapterNotExistException(adapterId);
}
- else
- {
- p = _adapters.findByReplicaGroupId(adapterId, true);
- if(p == _adapters.end())
- {
- throw AdapterNotExistException(adapterId);
- }
- while(p != _adapters.end())
- {
- AdapterInfo info = p->second;
- info.replicaGroupId = "";
- infos.push_back(info);
- _adapters.put(StringAdapterInfoDict::value_type(p->first, info));
- ++p;
- }
- }
-
- txHolder.commit();
-
- if(infos.empty())
+ while(p != _adapters.end())
{
- serial = ++_adapterSerial;
- }
- else
- {
- serial = _adapterSerial + 1;
- _adapterSerial += static_cast<int>(static_cast<int>(infos.size()));
+ AdapterInfo info = p->second;
+ info.replicaGroupId = "";
+ infos.push_back(info);
+ _adapters.put(StringAdapterInfoDict::value_type(p->first, info));
+ ++p;
}
}
-
+
if(_traceLevels->adapter > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
@@ -943,17 +902,17 @@ Database::removeAdapter(const string& adapterId)
if(infos.empty())
{
- _adapterObserverTopic->adapterRemoved(serial, adapterId);
+ _adapterObserverTopic->adapterRemoved(adapterId);
}
else
{
- int i = 0;
- AdapterInfoSeq::const_iterator p;
- for(p = infos.begin(); p != infos.end(); ++p, ++i)
+ for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
{
- _adapterObserverTopic->adapterUpdated(serial + i, *p);
+ _adapterObserverTopic->adapterUpdated(*p);
}
}
+
+ txHolder.commit();
}
AdapterPrx
@@ -1109,41 +1068,37 @@ Database::getAllAdapters(const string& expression)
void
Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase)
{
+ Lock sync(*this);
const Ice::Identity id = info.proxy->ice_getIdentity();
- int serial;
+
+ if(_objectCache.has(id))
+ {
+ throw ObjectExistsException(id);
+ }
+
bool update = false;
+ if(_objects.find(id) != _objects.end())
{
- Lock sync(*this);
- if(_objectCache.has(id))
+ if(!replaceIfExistsInDatabase)
{
throw ObjectExistsException(id);
}
-
- if(_objects.find(id) != _objects.end())
+ else
{
- if(!replaceIfExistsInDatabase)
- {
- throw ObjectExistsException(id);
- }
- else
- {
- update = true;
- }
+ update = true;
}
- _objects.put(IdentityObjectInfoDict::value_type(id, info));
-
- serial = ++_objectSerial;
}
+ _objects.put(IdentityObjectInfoDict::value_type(id, info));
if(!update)
{
- _objectObserverTopic->objectAdded(serial, info);
+ _objectObserverTopic->objectAdded(info);
}
else
{
- _objectObserverTopic->objectUpdated(serial, info);
+ _objectObserverTopic->objectUpdated(info);
}
-
+
if(_traceLevels->object > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
@@ -1161,32 +1116,28 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase)
void
Database::removeObject(const Ice::Identity& id)
{
- int serial;
+ Lock sync(*this);
+ if(_objectCache.has(id))
{
- Lock sync(*this);
- if(_objectCache.has(id))
- {
- DeploymentException ex;
- ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n";
- ex.reason += "the object was added with the application descriptor `";
- ex.reason += _objectCache.get(id)->getApplication();
- ex.reason += "'";
- throw ex;
- }
-
- IdentityObjectInfoDict::iterator p = _objects.find(id);
- if(p == _objects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
- _objects.erase(p);
+ DeploymentException ex;
+ ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n";
+ ex.reason += "the object was added with the application descriptor `";
+ ex.reason += _objectCache.get(id)->getApplication();
+ ex.reason += "'";
+ throw ex;
+ }
- serial = ++_objectSerial;
+ IdentityObjectInfoDict::iterator p = _objects.find(id);
+ if(p == _objects.end())
+ {
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
}
+ _objects.erase(p);
- _objectObserverTopic->objectRemoved(serial, id);
+
+ _objectObserverTopic->objectRemoved(id);
if(_traceLevels->object > 0)
{
@@ -1198,38 +1149,34 @@ Database::removeObject(const Ice::Identity& id)
void
Database::updateObject(const Ice::ObjectPrx& proxy)
{
+ Lock sync(*this);
+
const Ice::Identity id = proxy->ice_getIdentity();
- int serial;
- ObjectInfo info;
+ if(_objectCache.has(id))
{
- Lock sync(*this);
- if(_objectCache.has(id))
- {
- DeploymentException ex;
- ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n";
- ex.reason += "the object was added with the application descriptor `";
- ex.reason += _objectCache.get(id)->getApplication();
- ex.reason += "'";
- throw ex;
- }
-
- IdentityObjectInfoDict::iterator p = _objects.find(id);
- if(p == _objects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
-
- info = p->second;
- info.proxy = proxy;
- p.set(info);
-
- serial = ++_objectSerial;
+ DeploymentException ex;
+ ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n";
+ ex.reason += "the object was added with the application descriptor `";
+ ex.reason += _objectCache.get(id)->getApplication();
+ ex.reason += "'";
+ throw ex;
+ }
+
+ IdentityObjectInfoDict::iterator p = _objects.find(id);
+ if(p == _objects.end())
+ {
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
}
- _objectObserverTopic->objectUpdated(serial, info);
-
+ ObjectInfo info;
+ info = p->second;
+ info.proxy = proxy;
+ p.set(info);
+
+ _objectObserverTopic->objectUpdated(info);
+
if(_traceLevels->object > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
@@ -1240,40 +1187,28 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
int
Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects)
{
- int serial;
- vector<bool> updated;
+ Lock sync(*this);
+ Freeze::TransactionHolder txHolder(_connection);
+ for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
- for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
- {
- updated.push_back(_objects.find(p->proxy->ice_getIdentity()) != _objects.end());
- _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p));
- }
- serial = ++_objectSerial;
- txHolder.commit();
+ _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p));
}
-
- _objectObserverTopic->objectsAddedOrUpdated(serial, objects);
+ int serial = _objectObserverTopic->objectsAddedOrUpdated(objects);
+ txHolder.commit();
return serial;
}
void
Database::removeObjectsInDatabase(const ObjectInfoSeq& objects)
{
- int serial;
+ Lock sync(*this);
+ Freeze::TransactionHolder txHolder(_connection);
+ for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
- for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
- {
- _objects.erase(p->proxy->ice_getIdentity());
- }
- serial = ++_objectSerial;
- txHolder.commit();
+ _objects.erase(p->proxy->ice_getIdentity());
}
-
- _objectObserverTopic->objectsRemoved(serial, objects);
+ _objectObserverTopic->objectsRemoved(objects);
+ txHolder.commit();
}
void
@@ -1760,33 +1695,27 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries,
//
// Save the application descriptor.
//
- int serial;
- {
- Lock sync(*this);
-
- ApplicationInfo info = oldApp;
- info.updateTime = update.updateTime;
- info.updateUser = update.updateUser;
- info.revision = update.revision;
- info.descriptor = newDesc;
-
- _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info));
- serial = ++_applicationSerial;
- }
-
- _applicationObserverTopic->applicationUpdated(serial, update);
-
- {
- Lock sync(*this);
- finishUpdating(update.descriptor.name);
- notifyAll();
- }
-
+ Lock sync(*this);
+
+ ApplicationInfo info = oldApp;
+ info.updateTime = update.updateTime;
+ info.updateUser = update.updateUser;
+ info.revision = update.revision;
+ info.descriptor = newDesc;
+
+ _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info));
+ ++_applicationSerial;
+ finishUpdating(update.descriptor.name);
+
if(_traceLevels->application > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
out << "updated application `" << update.descriptor.name << "'";
}
+
+ _applicationObserverTopic->applicationUpdated(_applicationSerial, update);
+
+ notifyAll();
}
void