summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp328
1 files changed, 196 insertions, 132 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 7ca95468137..7a3f32bd760 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -185,31 +185,29 @@ Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeOb
ApplicationDescriptorSeq applications;
AdapterInfoSeq adapters;
ObjectInfoSeq objects;
- {
- Lock sync(*this);
- _registryObserver = registryObserver;
- _nodeObserver = nodeObserver;
- serial = _serial;
-
- for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p)
- {
- applications.push_back(p->second);
- }
- for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q)
- {
- adapters.push_back(q->second);
- if(adapters.back().id.empty())
- {
- adapters.back().id = q->first;
- }
- }
-
- for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r)
+ _registryObserver = registryObserver;
+ _nodeObserver = nodeObserver;
+ serial = _serial;
+
+ for(StringApplicationDescriptorDict::const_iterator p = _descriptors.begin(); p != _descriptors.end(); ++p)
+ {
+ applications.push_back(p->second);
+ }
+
+ for(StringAdapterInfoDict::const_iterator q = _adapters.begin(); q != _adapters.end(); ++q)
+ {
+ adapters.push_back(q->second);
+ if(adapters.back().id.empty())
{
- objects.push_back(r->second);
+ adapters.back().id = q->first;
}
}
+
+ for(IdentityObjectInfoDict::const_iterator r = _objects.begin(); r != _objects.end(); ++r)
+ {
+ objects.push_back(r->second);
+ }
//
// Notify the observers.
@@ -264,43 +262,64 @@ void
Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc)
{
ServerEntrySeq entries;
- int serial;
- DistributionDescriptor appDistrib;
- map<string, DistributionDescriptorDict> nodeDistrib;
-
{
Lock sync(*this);
-
checkSessionLock(session);
- //
- // We first ensure that the application doesn't already exist
- // and that the application components don't already exist.
- //
+ while(_updating.find(desc.name) != _updating.end())
+ {
+ wait();
+ }
+
if(_descriptors.find(desc.name) != _descriptors.end())
{
throw DeploymentException("application `" + desc.name + "' already exists");
- }
+ }
ApplicationHelper helper(_communicator, desc);
-
- //
- // Ensure that the application servers, adapters and objects
- // aren't already registered.
- //
checkForAddition(helper);
-
- //
- // Register the application servers, adapters, objects.
- //
load(helper, entries);
-
- //
- // Save the application descriptor.
- //
- _descriptors.put(StringApplicationDescriptorDict::value_type(desc.name, desc));
-
- serial = ++_serial;
+ _updating.insert(desc.name);
+ }
+
+ //
+ // Synchronize the servers on the nodes. If a server couldn't be
+ // deployed we unload the application and throw.
+ //
+ try
+ {
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ }
+ catch(const DeploymentException& ex)
+ {
+ {
+ Lock sync(*this);
+ entries.clear();
+ unload(ApplicationHelper(_communicator, desc), entries);
+ _updating.erase(desc.name);
+ notifyAll();
+ }
+ try
+ {
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ }
+ catch(const DeploymentException& ex)
+ {
+ // TODO: warning?
+ }
+ throw ex;
+ }
+
+ //
+ // Save the application descriptor.
+ //
+ int serial;
+ {
+ Lock sync(*this);
+ _descriptors.put(StringApplicationDescriptorDict::value_type(desc.name, desc));
+ serial = ++_serial;
+ _updating.erase(desc.name);
+ notifyAll();
}
//
@@ -313,22 +332,23 @@ Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDesc
Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
out << "added application `" << desc.name << "'";
}
-
- //
- // Synchronize the servers on the nodes.
- //
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
void
Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationUpdateDescriptor& update)
{
ServerEntrySeq entries;
- int serial;
+ ApplicationDescriptor oldDesc;
+ ApplicationDescriptor newDesc;
{
Lock sync(*this);
checkSessionLock(session);
+ while(_updating.find(update.name) != _updating.end())
+ {
+ wait();
+ }
+
StringApplicationDescriptorDict::const_iterator p = _descriptors.find(update.name);
if(p == _descriptors.end())
{
@@ -337,40 +357,34 @@ Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationU
ApplicationHelper previous(_communicator, p->second);
ApplicationHelper helper(_communicator, previous.update(update));
-
+
checkForUpdate(previous, helper);
-
reload(previous, helper, entries);
-
- _descriptors.put(StringApplicationDescriptorDict::value_type(update.name, helper.getDefinition()));
-
- serial = ++_serial;
- }
- //
- // Notify the observers.
- //
- _registryObserver->applicationUpdated(serial, update);
+ oldDesc = previous.getDefinition();
+ newDesc = helper.getDefinition();
- if(_traceLevels->application > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "updated application `" << update.name << "'";
+ _updating.insert(update.name);
}
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ finishUpdate(entries, update, oldDesc, newDesc);
}
void
Database::syncApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& newDesc)
{
ServerEntrySeq entries;
- int serial;
ApplicationUpdateDescriptor update;
+ ApplicationDescriptor oldDesc;
{
Lock sync(*this);
checkSessionLock(session);
+ while(_updating.find(update.name) != _updating.end())
+ {
+ wait();
+ }
+
StringApplicationDescriptorDict::const_iterator p = _descriptors.find(newDesc.name);
if(p == _descriptors.end())
{
@@ -381,27 +395,56 @@ Database::syncApplicationDescriptor(AdminSessionI* session, const ApplicationDes
ApplicationHelper helper(_communicator, newDesc);
update = helper.diff(previous);
- checkForUpdate(previous, helper);
-
- reload(previous, helper, entries);
-
- _descriptors.put(StringApplicationDescriptorDict::value_type(newDesc.name, newDesc));
+ checkForUpdate(previous, helper);
+ reload(previous, helper, entries);
- serial = ++_serial;
+ oldDesc = previous.getDefinition();
+
+ _updating.insert(update.name);
}
- //
- // Notify the observers.
- //
- _registryObserver->applicationUpdated(serial, update);
+ finishUpdate(entries, update, oldDesc, newDesc);
+}
- if(_traceLevels->application > 0)
+void
+Database::instantiateServer(AdminSessionI* session,
+ const string& application,
+ const string& node,
+ const ServerInstanceDescriptor& instance)
+{
+ ServerEntrySeq entries;
+ ApplicationUpdateDescriptor update;
+ ApplicationDescriptor oldDesc;
+ ApplicationDescriptor newDesc;
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "synced application `" << newDesc.name << "'";
+ Lock sync(*this);
+ checkSessionLock(session);
+
+ while(_updating.find(update.name) != _updating.end())
+ {
+ wait();
+ }
+
+ StringApplicationDescriptorDict::const_iterator p = _descriptors.find(application);
+ if(p == _descriptors.end())
+ {
+ throw ApplicationNotExistException(application);
+ }
+
+ ApplicationHelper previous(_communicator, p->second);
+ ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance));
+ update = helper.diff(previous);
+
+ checkForUpdate(previous, helper);
+ reload(previous, helper, entries);
+
+ oldDesc = previous.getDefinition();
+ newDesc = helper.getDefinition();
+
+ _updating.insert(update.name);
}
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ finishUpdate(entries, update, oldDesc, newDesc);
}
void
@@ -413,6 +456,11 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string&
Lock sync(*this);
checkSessionLock(session);
+ while(_updating.find(name) != _updating.end())
+ {
+ wait();
+ }
+
StringApplicationDescriptorDict::iterator p = _descriptors.find(name);
if(p == _descriptors.end())
{
@@ -452,52 +500,6 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string&
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
-void
-Database::instantiateServer(AdminSessionI* session,
- const string& application,
- const string& node,
- const ServerInstanceDescriptor& instance)
-{
- ServerEntrySeq entries;
- int serial;
- ApplicationUpdateDescriptor update;
- {
- Lock sync(*this);
- checkSessionLock(session);
-
- StringApplicationDescriptorDict::const_iterator p = _descriptors.find(application);
- if(p == _descriptors.end())
- {
- throw ApplicationNotExistException(application);
- }
-
- ApplicationHelper previous(_communicator, p->second);
- ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance));
- update = helper.diff(previous);
-
- checkForUpdate(previous, helper);
-
- reload(previous, helper, entries);
-
- _descriptors.put(StringApplicationDescriptorDict::value_type(application, helper.getDefinition()));
-
- serial = ++_serial;
- }
-
- //
- // Notify the observers.
- //
- _registryObserver->applicationUpdated(serial, update);
-
- if(_traceLevels->application > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "updated application `" << update.name << "'";
- }
-
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
-}
-
ApplicationDescriptor
Database::getApplicationDescriptor(const std::string& name)
{
@@ -633,7 +635,6 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
serial = ++_serial;
}
-
if(_traceLevels->adapter > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
@@ -935,7 +936,7 @@ Database::removeObject(const Ice::Identity& id)
{
int serial;
{
- Lock sync(*this);
+ Lock sync(*this);
if(_objectCache.has(id))
{
DeploymentException ex;
@@ -1405,3 +1406,66 @@ Database::reload(const ApplicationHelper& oldApp, const ApplicationHelper& newAp
entries.push_back(_serverCache.add(*q));
}
}
+
+void
+Database::finishUpdate(ServerEntrySeq& entries,
+ const ApplicationUpdateDescriptor& update,
+ const ApplicationDescriptor& oldDesc,
+ const ApplicationDescriptor& newDesc)
+{
+
+ //
+ // Synchronize the servers on the nodes. If a server couldn't be
+ // deployed we unload the application and throw.
+ //
+ try
+ {
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ }
+ catch(const DeploymentException& ex)
+ {
+ {
+ Lock sync(*this);
+ entries.clear();
+ ApplicationHelper previous(_communicator, newDesc);
+ ApplicationHelper helper(_communicator, oldDesc);
+ reload(previous, helper, entries);
+ _updating.erase(newDesc.name);
+ notifyAll();
+ }
+ try
+ {
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ }
+ catch(const DeploymentException& ex)
+ {
+ // TODO: warning?
+ }
+ throw ex;
+ }
+
+ //
+ // Save the application descriptor.
+ //
+ int serial;
+ {
+ Lock sync(*this);
+ _descriptors.put(StringApplicationDescriptorDict::value_type(update.name, newDesc));
+ serial = ++_serial;
+ _updating.erase(update.name);
+ notifyAll();
+ }
+
+ //
+ // Notify the observers.
+ //
+ _registryObserver->applicationUpdated(serial, update);
+
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "updated application `" << update.name << "'";
+ }
+
+}
+