summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp1358
1 files changed, 679 insertions, 679 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index aa4803f9269..15dafc7bf4d 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -38,17 +38,17 @@ struct ObjectLoadCI : binary_function<pair<Ice::ObjectPrx, float>&, pair<Ice::Ob
{
bool operator()(const pair<Ice::ObjectPrx, float>& lhs, const pair<Ice::ObjectPrx, float>& rhs)
{
- return lhs.second < rhs.second;
+ return lhs.second < rhs.second;
}
};
}
Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
- const IceStorm::TopicManagerPrx& topicManager,
- const string& instanceName,
- const TraceLevelsPtr& traceLevels,
- const RegistryInfo& info) :
+ const IceStorm::TopicManagerPrx& topicManager,
+ const string& instanceName,
+ const TraceLevelsPtr& traceLevels,
+ const RegistryInfo& info) :
_communicator(registryAdapter->getCommunicator()),
_internalAdapter(registryAdapter),
_topicManager(topicManager),
@@ -72,31 +72,31 @@ Database::Database(const Ice::ObjectAdapterPtr& registryAdapter,
ServerEntrySeq entries;
for(StringApplicationInfoDict::iterator p = _applications.begin(); p != _applications.end(); ++p)
{
- try
- {
- //
- // Create an application helper for the application
- // without instantiating. The application might be invalid
- // if we need to upgrade it.
- //
- ApplicationInfo info = p->second;
-
- ApplicationHelper helper(_communicator, p->second.descriptor, false, false);
- if(helper.upgrade(info.descriptor))
- {
- ++info.revision;
- info.updateUser = "IceGrid Registry (database upgrade)";
- info.updateTime = IceUtil::Time::now().toMilliSeconds();
- p.set(info);
- }
-
- load(ApplicationHelper(_communicator, info.descriptor), entries, info.uuid, info.revision);
- }
- catch(const DeploymentException& ex)
- {
- Ice::Error err(_traceLevels->logger);
- err << "invalid application `" << p->first << "':\n" << ex.reason;
- }
+ try
+ {
+ //
+ // Create an application helper for the application
+ // without instantiating. The application might be invalid
+ // if we need to upgrade it.
+ //
+ ApplicationInfo info = p->second;
+
+ ApplicationHelper helper(_communicator, p->second.descriptor, false, false);
+ if(helper.upgrade(info.descriptor))
+ {
+ ++info.revision;
+ info.updateUser = "IceGrid Registry (database upgrade)";
+ info.updateTime = IceUtil::Time::now().toMilliSeconds();
+ p.set(info);
+ }
+
+ load(ApplicationHelper(_communicator, info.descriptor), entries, info.uuid, info.revision);
+ }
+ catch(const DeploymentException& ex)
+ {
+ Ice::Error err(_traceLevels->logger);
+ err << "invalid application `" << p->first << "':\n" << ex.reason;
+ }
}
_serverCache.setTraceLevels(_traceLevels);
@@ -141,17 +141,17 @@ Database::getObserverTopic(TopicName name) const
switch(name)
{
case RegistryObserverTopicName:
- return _registryObserverTopic;
+ return _registryObserverTopic;
case NodeObserverTopicName:
- return _nodeObserverTopic;
+ return _nodeObserverTopic;
case ApplicationObserverTopicName:
- return _applicationObserverTopic;
+ return _applicationObserverTopic;
case AdapterObserverTopicName:
- return _adapterObserverTopic;
+ return _adapterObserverTopic;
case ObjectObserverTopicName:
- return _objectObserverTopic;
+ return _objectObserverTopic;
default:
- break;
+ break;
}
return 0;
}
@@ -161,7 +161,7 @@ Database::checkSessionLock(AdminSessionI* session)
{
if(_lock != 0 && session != _lock)
{
- throw AccessDeniedException(_lockUserId); // Lock held by another session.
+ throw AccessDeniedException(_lockUserId); // Lock held by another session.
}
}
@@ -172,7 +172,7 @@ Database::lock(AdminSessionI* session, const string& userId)
if(_lock != 0 && session != _lock)
{
- throw AccessDeniedException(_lockUserId); // Lock held by another session.
+ throw AccessDeniedException(_lockUserId); // Lock held by another session.
}
assert(_lock == 0 || _lock == session);
@@ -188,7 +188,7 @@ Database::unlock(AdminSessionI* session)
Lock sync(*this);
if(_lock != session)
{
- throw AccessDeniedException();
+ throw AccessDeniedException();
}
_lock = 0;
@@ -200,54 +200,54 @@ Database::syncApplications(const ApplicationInfoSeq& applications)
{
int serial;
{
- Lock sync(*this);
+ Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
- ServerEntrySeq entries;
- set<string> names;
- for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p)
- {
- try
- {
- StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name);
- if(s != _applications.end())
- {
- ApplicationHelper previous(_communicator, s->second.descriptor);
- ApplicationHelper helper(_communicator, p->descriptor);
- reload(previous, helper, entries, p->uuid, p->revision);
- }
- else
- {
- load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision);
- }
- }
- catch(const DeploymentException& ex)
- {
- Ice::Warning warn(_traceLevels->logger);
- warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason;
- }
- _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p));
- names.insert(p->descriptor.name);
- }
-
- StringApplicationInfoDict::iterator s = _applications.begin();
- while(s != _applications.end())
- {
- if(names.find(s->first) == names.end())
- {
- unload(ApplicationHelper(_communicator, s->second.descriptor), entries);
- _applications.erase(s++);
- }
- else
- {
- ++s;
- }
- }
- ++_applicationSerial;
+ Freeze::TransactionHolder txHolder(_connection);
+ ServerEntrySeq entries;
+ set<string> names;
+ for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p)
+ {
+ try
+ {
+ StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name);
+ if(s != _applications.end())
+ {
+ ApplicationHelper previous(_communicator, s->second.descriptor);
+ ApplicationHelper helper(_communicator, p->descriptor);
+ reload(previous, helper, entries, p->uuid, p->revision);
+ }
+ else
+ {
+ load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision);
+ }
+ }
+ catch(const DeploymentException& ex)
+ {
+ Ice::Warning warn(_traceLevels->logger);
+ warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason;
+ }
+ _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p));
+ names.insert(p->descriptor.name);
+ }
+
+ StringApplicationInfoDict::iterator s = _applications.begin();
+ while(s != _applications.end())
+ {
+ if(names.find(s->first) == names.end())
+ {
+ unload(ApplicationHelper(_communicator, s->second.descriptor), entries);
+ _applications.erase(s++);
+ }
+ else
+ {
+ ++s;
+ }
+ }
+ ++_applicationSerial;
- serial = _applicationObserverTopic->applicationInit(_applicationSerial, applications);
+ serial = _applicationObserverTopic->applicationInit(_applicationSerial, applications);
- txHolder.commit();
+ txHolder.commit();
}
_applicationObserverTopic->waitForSyncedSubscribers(serial);
}
@@ -257,15 +257,15 @@ Database::syncAdapters(const AdapterInfoSeq& adapters)
{
int serial;
{
- Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
- _adapters.clear();
- for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
- {
- _adapters.put(StringAdapterInfoDict::value_type(r->id, *r));
- }
- serial = _adapterObserverTopic->adapterInit(adapters);
- txHolder.commit();
+ Lock sync(*this);
+ Freeze::TransactionHolder txHolder(_connection);
+ _adapters.clear();
+ for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
+ {
+ _adapters.put(StringAdapterInfoDict::value_type(r->id, *r));
+ }
+ serial = _adapterObserverTopic->adapterInit(adapters);
+ txHolder.commit();
}
_adapterObserverTopic->waitForSyncedSubscribers(serial);
}
@@ -275,15 +275,15 @@ Database::syncObjects(const ObjectInfoSeq& objects)
{
int serial;
{
- Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
- _objects.clear();
- for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
- {
- _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q));
- }
- serial = _objectObserverTopic->objectInit(objects);
- txHolder.commit();
+ Lock sync(*this);
+ Freeze::TransactionHolder txHolder(_connection);
+ _objects.clear();
+ for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
+ {
+ _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q));
+ }
+ serial = _objectObserverTopic->objectInit(objects);
+ txHolder.commit();
}
_objectObserverTopic->waitForSyncedSubscribers(serial);
}
@@ -293,62 +293,62 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
{
ServerEntrySeq entries;
{
- Lock sync(*this);
- checkSessionLock(session);
+ Lock sync(*this);
+ checkSessionLock(session);
- while(_updating.find(info.descriptor.name) != _updating.end())
- {
- wait();
- }
+ while(_updating.find(info.descriptor.name) != _updating.end())
+ {
+ wait();
+ }
- if(_applications.find(info.descriptor.name) != _applications.end())
- {
- throw DeploymentException("application `" + info.descriptor.name + "' already exists");
- }
+ if(_applications.find(info.descriptor.name) != _applications.end())
+ {
+ throw DeploymentException("application `" + info.descriptor.name + "' already exists");
+ }
- ApplicationHelper helper(_communicator, info.descriptor, true);
- checkForAddition(helper);
- load(helper, entries, info.uuid, info.revision);
- startUpdating(info.descriptor.name);
+ ApplicationHelper helper(_communicator, info.descriptor, true);
+ checkForAddition(helper);
+ load(helper, entries, info.uuid, info.revision);
+ startUpdating(info.descriptor.name);
}
if(_master)
{
- try
- {
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
- }
- catch(const DeploymentException& ex)
- {
- try
- {
- Lock sync(*this);
- entries.clear();
- unload(ApplicationHelper(_communicator, info.descriptor), entries);
- }
- catch(const DeploymentException& ex)
- {
- Ice::Error err(_traceLevels->logger);
- err << "failed to rollback previous application `" << info.descriptor.name << "':\n" << ex.reason;
- }
- finishUpdating(info.descriptor.name);
- throw ex;
- }
+ try
+ {
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
+ }
+ catch(const DeploymentException& ex)
+ {
+ try
+ {
+ Lock sync(*this);
+ entries.clear();
+ unload(ApplicationHelper(_communicator, info.descriptor), entries);
+ }
+ catch(const DeploymentException& ex)
+ {
+ Ice::Error err(_traceLevels->logger);
+ err << "failed to rollback previous application `" << info.descriptor.name << "':\n" << ex.reason;
+ }
+ finishUpdating(info.descriptor.name);
+ throw ex;
+ }
}
int serial;
{
- Lock sync(*this);
- ++_applicationSerial;
- _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
+ Lock sync(*this);
+ ++_applicationSerial;
+ _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
- serial = _applicationObserverTopic->applicationAdded(_applicationSerial, info);
+ serial = _applicationObserverTopic->applicationAdded(_applicationSerial, info);
- if(_traceLevels->application > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "added application `" << info.descriptor.name << "'";
- }
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "added application `" << info.descriptor.name << "'";
+ }
}
_applicationObserverTopic->waitForSyncedSubscribers(serial);
@@ -364,35 +364,35 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se
ApplicationDescriptor newDesc;
ApplicationUpdateInfo update = updt;
{
- Lock sync(*this);
- checkSessionLock(session);
+ Lock sync(*this);
+ checkSessionLock(session);
- while(_updating.find(update.descriptor.name) != _updating.end())
- {
- wait();
- }
+ while(_updating.find(update.descriptor.name) != _updating.end())
+ {
+ wait();
+ }
- StringApplicationInfoDict::const_iterator p = _applications.find(update.descriptor.name);
- if(p == _applications.end())
- {
- throw ApplicationNotExistException(update.descriptor.name);
- }
- oldApp = p->second;
+ StringApplicationInfoDict::const_iterator p = _applications.find(update.descriptor.name);
+ if(p == _applications.end())
+ {
+ throw ApplicationNotExistException(update.descriptor.name);
+ }
+ oldApp = p->second;
- if(update.revision < 0)
- {
- update.revision = oldApp.revision + 1;
- }
+ if(update.revision < 0)
+ {
+ update.revision = oldApp.revision + 1;
+ }
- ApplicationHelper previous(_communicator, oldApp.descriptor);
- ApplicationHelper helper(_communicator, previous.update(update.descriptor), true);
+ ApplicationHelper previous(_communicator, oldApp.descriptor);
+ ApplicationHelper helper(_communicator, previous.update(update.descriptor), true);
- checkForUpdate(previous, helper);
- reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
+ checkForUpdate(previous, helper);
+ reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
- newDesc = helper.getDefinition();
+ newDesc = helper.getDefinition();
- startUpdating(update.descriptor.name);
+ startUpdating(update.descriptor.name);
}
finishApplicationUpdate(entries, update, oldApp, newDesc, session);
@@ -405,33 +405,33 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS
ApplicationUpdateInfo update;
ApplicationInfo oldApp;
{
- Lock sync(*this);
- checkSessionLock(session);
+ Lock sync(*this);
+ checkSessionLock(session);
- while(_updating.find(update.descriptor.name) != _updating.end())
- {
- wait();
- }
+ while(_updating.find(update.descriptor.name) != _updating.end())
+ {
+ wait();
+ }
- StringApplicationInfoDict::const_iterator p = _applications.find(newDesc.name);
- if(p == _applications.end())
- {
- throw ApplicationNotExistException(newDesc.name);
- }
- oldApp = p->second;
+ StringApplicationInfoDict::const_iterator p = _applications.find(newDesc.name);
+ if(p == _applications.end())
+ {
+ throw ApplicationNotExistException(newDesc.name);
+ }
+ oldApp = p->second;
- ApplicationHelper previous(_communicator, oldApp.descriptor);
- ApplicationHelper helper(_communicator, newDesc, true);
+ ApplicationHelper previous(_communicator, oldApp.descriptor);
+ ApplicationHelper helper(_communicator, newDesc, true);
- update.updateTime = IceUtil::Time::now().toMilliSeconds();
- update.updateUser = _lockUserId;
- update.revision = oldApp.revision + 1;
- update.descriptor = helper.diff(previous);
-
- checkForUpdate(previous, helper);
- reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
+ update.updateTime = IceUtil::Time::now().toMilliSeconds();
+ update.updateUser = _lockUserId;
+ update.revision = oldApp.revision + 1;
+ update.descriptor = helper.diff(previous);
+
+ checkForUpdate(previous, helper);
+ reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
- startUpdating(update.descriptor.name);
+ startUpdating(update.descriptor.name);
}
finishApplicationUpdate(entries, update, oldApp, newDesc, session);
@@ -439,44 +439,44 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS
void
Database::instantiateServer(const string& application,
- const string& node,
- const ServerInstanceDescriptor& instance,
- AdminSessionI* session)
+ const string& node,
+ const ServerInstanceDescriptor& instance,
+ AdminSessionI* session)
{
ServerEntrySeq entries;
ApplicationUpdateInfo update;
ApplicationInfo oldApp;
ApplicationDescriptor newDesc;
{
- Lock sync(*this);
- checkSessionLock(session);
+ Lock sync(*this);
+ checkSessionLock(session);
- while(_updating.find(application) != _updating.end())
- {
- wait();
- }
+ while(_updating.find(application) != _updating.end())
+ {
+ wait();
+ }
- StringApplicationInfoDict::const_iterator p = _applications.find(application);
- if(p == _applications.end())
- {
- throw ApplicationNotExistException(application);
- }
- oldApp = p->second;
+ StringApplicationInfoDict::const_iterator p = _applications.find(application);
+ if(p == _applications.end())
+ {
+ throw ApplicationNotExistException(application);
+ }
+ oldApp = p->second;
- ApplicationHelper previous(_communicator, oldApp.descriptor);
- ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance), true);
+ ApplicationHelper previous(_communicator, oldApp.descriptor);
+ ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance), true);
- update.updateTime = IceUtil::Time::now().toMilliSeconds();
- update.updateUser = _lockUserId;
- update.revision = oldApp.revision + 1;
- update.descriptor = helper.diff(previous);
+ update.updateTime = IceUtil::Time::now().toMilliSeconds();
+ update.updateUser = _lockUserId;
+ update.revision = oldApp.revision + 1;
+ update.descriptor = helper.diff(previous);
- checkForUpdate(previous, helper);
- reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
+ checkForUpdate(previous, helper);
+ reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
- newDesc = helper.getDefinition();
+ newDesc = helper.getDefinition();
- startUpdating(update.descriptor.name);
+ startUpdating(update.descriptor.name);
}
finishApplicationUpdate(entries, update, oldApp, newDesc, session);
@@ -488,55 +488,55 @@ Database::removeApplication(const string& name, AdminSessionI* session)
ServerEntrySeq entries;
int serial;
{
- Lock sync(*this);
- checkSessionLock(session);
-
- while(_updating.find(name) != _updating.end())
- {
- wait();
- }
-
- StringApplicationInfoDict::iterator p = _applications.find(name);
- if(p == _applications.end())
- {
- throw ApplicationNotExistException(name);
- }
-
- try
- {
- ApplicationHelper helper(_communicator, p->second.descriptor);
- unload(helper, entries);
- }
- catch(const DeploymentException&)
- {
- //
- // For some reasons the application became invalid. If
- // it's invalid, it's most likely not loaded either. So we
- // ignore the error and erase the descriptor.
- //
- }
-
- startUpdating(name);
+ Lock sync(*this);
+ checkSessionLock(session);
+
+ while(_updating.find(name) != _updating.end())
+ {
+ wait();
+ }
+
+ StringApplicationInfoDict::iterator p = _applications.find(name);
+ if(p == _applications.end())
+ {
+ throw ApplicationNotExistException(name);
+ }
+
+ try
+ {
+ ApplicationHelper helper(_communicator, p->second.descriptor);
+ unload(helper, entries);
+ }
+ catch(const DeploymentException&)
+ {
+ //
+ // For some reasons the application became invalid. If
+ // it's invalid, it's most likely not loaded either. So we
+ // ignore the error and erase the descriptor.
+ //
+ }
+
+ startUpdating(name);
}
if(_master)
{
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitNoThrow));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::waitNoThrow));
}
{
- Lock sync(*this);
- _applications.erase(name);
- ++_applicationSerial;
+ Lock sync(*this);
+ _applications.erase(name);
+ ++_applicationSerial;
- serial = _applicationObserverTopic->applicationRemoved(_applicationSerial, name);
+ serial = _applicationObserverTopic->applicationRemoved(_applicationSerial, name);
- if(_traceLevels->application > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "removed application `" << name << "'";
- }
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "removed application `" << name << "'";
+ }
}
_applicationObserverTopic->waitForSyncedSubscribers(serial);
@@ -553,7 +553,7 @@ Database::getApplicationInfo(const std::string& name)
StringApplicationInfoDict::const_iterator p = descriptors.find(name);
if(p == descriptors.end())
{
- throw ApplicationNotExistException(name);
+ throw ApplicationNotExistException(name);
}
return p->second;
@@ -569,18 +569,18 @@ Database::getAllApplications(const string& expression)
void
Database::waitForApplicationUpdate(const AMD_NodeSession_waitForApplicationUpdatePtr& cb,
- const string& application,
- int revision)
+ const string& application,
+ int revision)
{
Lock sync(*this);
map<string, vector<AMD_NodeSession_waitForApplicationUpdatePtr> >::iterator p = _updating.find(application);
if(p != _updating.end())
{
- p->second.push_back(cb);
+ p->second.push_back(cb);
}
else
{
- cb->ice_response();
+ cb->ice_response();
}
}
@@ -637,67 +637,67 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
{
int serial;
{
- Lock sync(*this);
- if(_adapterCache.has(adapterId))
- {
- throw AdapterExistsException(adapterId);
- }
-
- StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
- AdapterInfo info;
- bool updated = false;
- if(proxy)
- {
- if(p != _adapters.end())
- {
- info = p->second;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
- p.set(info);
- updated = true;
- }
- else
- {
- info.id = adapterId;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
- _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
- }
- }
- else
- {
- if(p == _adapters.end())
- {
- return;
- }
- _adapters.erase(p);
- }
-
- if(_traceLevels->adapter > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'";
- if(!replicaGroupId.empty())
- {
- out << " with replica group `" << replicaGroupId << "'";
- }
- }
+ Lock sync(*this);
+ if(_adapterCache.has(adapterId))
+ {
+ throw AdapterExistsException(adapterId);
+ }
+
+ StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
+ AdapterInfo info;
+ bool updated = false;
+ if(proxy)
+ {
+ if(p != _adapters.end())
+ {
+ info = p->second;
+ info.proxy = proxy;
+ info.replicaGroupId = replicaGroupId;
+ p.set(info);
+ updated = true;
+ }
+ else
+ {
+ info.id = adapterId;
+ info.proxy = proxy;
+ info.replicaGroupId = replicaGroupId;
+ _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
+ }
+ }
+ else
+ {
+ if(p == _adapters.end())
+ {
+ return;
+ }
+ _adapters.erase(p);
+ }
+
+ if(_traceLevels->adapter > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
+ out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'";
+ if(!replicaGroupId.empty())
+ {
+ out << " with replica group `" << replicaGroupId << "'";
+ }
+ }
- if(proxy)
- {
- if(updated)
- {
- serial = _adapterObserverTopic->adapterUpdated(info);
- }
- else
- {
- serial = _adapterObserverTopic->adapterAdded(info);
- }
- }
- else
- {
- serial = _adapterObserverTopic->adapterRemoved(adapterId);
- }
+ if(proxy)
+ {
+ if(updated)
+ {
+ serial = _adapterObserverTopic->adapterUpdated(info);
+ }
+ else
+ {
+ serial = _adapterObserverTopic->adapterAdded(info);
+ }
+ }
+ else
+ {
+ serial = _adapterObserverTopic->adapterRemoved(adapterId);
+ }
}
_adapterObserverTopic->waitForSyncedSubscribers(serial);
}
@@ -710,18 +710,18 @@ Database::getAdapterDirectProxy(const string& id)
StringAdapterInfoDict::const_iterator p = adapters.find(id);
if(p != adapters.end())
{
- return p->second.proxy;
+ return p->second.proxy;
}
Ice::EndpointSeq endpoints;
for(p = adapters.findByReplicaGroupId(id, true); p != adapters.end(); ++p)
{
- Ice::EndpointSeq edpts = p->second.proxy->ice_getEndpoints();
- endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
+ Ice::EndpointSeq edpts = p->second.proxy->ice_getEndpoints();
+ endpoints.insert(endpoints.end(), edpts.begin(), edpts.end());
}
if(!endpoints.empty())
{
- return _communicator->stringToProxy("dummy:default")->ice_endpoints(endpoints);
+ return _communicator->stringToProxy("dummy:default")->ice_endpoints(endpoints);
}
throw AdapterNotExistException(id);
@@ -732,61 +732,61 @@ Database::removeAdapter(const string& adapterId)
{
int serial;
{
- Lock sync(*this);
- if(_adapterCache.has(adapterId))
- {
- AdapterEntryPtr adpt = _adapterCache.get(adapterId);
- DeploymentException ex;
- ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n";
- ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
- throw ex;
- }
-
- Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator
-
- StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
- AdapterInfoSeq infos;
- if(p != _adapters.end())
- {
- _adapters.erase(p);
- }
- else
- {
- p = _adapters.findByReplicaGroupId(adapterId, true);
- if(p == _adapters.end())
- {
- throw AdapterNotExistException(adapterId);
- }
-
- while(p != _adapters.end())
- {
- AdapterInfo info = p->second;
- info.replicaGroupId = "";
- infos.push_back(info);
- _adapters.put(StringAdapterInfoDict::value_type(p->first, info));
- ++p;
- }
- }
-
- if(_traceLevels->adapter > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'";
- }
-
- if(infos.empty())
- {
- serial = _adapterObserverTopic->adapterRemoved(adapterId);
- }
- else
- {
- for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
- {
- serial = _adapterObserverTopic->adapterUpdated(*p);
- }
- }
-
- txHolder.commit();
+ Lock sync(*this);
+ if(_adapterCache.has(adapterId))
+ {
+ AdapterEntryPtr adpt = _adapterCache.get(adapterId);
+ DeploymentException ex;
+ ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n";
+ ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
+ throw ex;
+ }
+
+ Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator
+
+ StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
+ AdapterInfoSeq infos;
+ if(p != _adapters.end())
+ {
+ _adapters.erase(p);
+ }
+ else
+ {
+ p = _adapters.findByReplicaGroupId(adapterId, true);
+ if(p == _adapters.end())
+ {
+ throw AdapterNotExistException(adapterId);
+ }
+
+ while(p != _adapters.end())
+ {
+ AdapterInfo info = p->second;
+ info.replicaGroupId = "";
+ infos.push_back(info);
+ _adapters.put(StringAdapterInfoDict::value_type(p->first, info));
+ ++p;
+ }
+ }
+
+ if(_traceLevels->adapter > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
+ out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'";
+ }
+
+ if(infos.empty())
+ {
+ serial = _adapterObserverTopic->adapterRemoved(adapterId);
+ }
+ else
+ {
+ for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ {
+ serial = _adapterObserverTopic->adapterUpdated(*p);
+ }
+ }
+
+ txHolder.commit();
}
_adapterObserverTopic->waitForSyncedSubscribers(serial);
}
@@ -807,7 +807,7 @@ Database::getAdapterInfo(const string& id)
//
try
{
- return _adapterCache.get(id)->getAdapterInfo();
+ return _adapterCache.get(id)->getAdapterInfo();
}
catch(AdapterNotExistException&)
{
@@ -822,9 +822,9 @@ Database::getAdapterInfo(const string& id)
StringAdapterInfoDict::const_iterator p = adapters.find(id);
if(p != adapters.end())
{
- AdapterInfoSeq infos;
- infos.push_back(p->second);
- return infos;
+ AdapterInfoSeq infos;
+ infos.push_back(p->second);
+ return infos;
}
//
@@ -834,13 +834,13 @@ Database::getAdapterInfo(const string& id)
p = adapters.findByReplicaGroupId(id, true);
if(p != adapters.end())
{
- AdapterInfoSeq infos;
- while(p != adapters.end())
- {
- infos.push_back(p->second);
- ++p;
- }
- return infos;
+ AdapterInfoSeq infos;
+ while(p != adapters.end())
+ {
+ infos.push_back(p->second);
+ ++p;
+ }
+ return infos;
}
throw AdapterNotExistException(id);
@@ -857,15 +857,15 @@ Database::getAllAdapters(const string& expression)
set<string> groups;
for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
{
- if(expression.empty() || IceUtil::match(p->first, expression, true))
- {
- result.push_back(p->first);
- }
- string replicaGroupId = p->second.replicaGroupId;
- if(!replicaGroupId.empty() && (expression.empty() || IceUtil::match(replicaGroupId, expression, true)))
- {
- groups.insert(replicaGroupId);
- }
+ if(expression.empty() || IceUtil::match(p->first, expression, true))
+ {
+ result.push_back(p->first);
+ }
+ string replicaGroupId = p->second.replicaGroupId;
+ if(!replicaGroupId.empty() && (expression.empty() || IceUtil::match(replicaGroupId, expression, true)))
+ {
+ groups.insert(replicaGroupId);
+ }
}
//
// COMPILERFIX: We're not using result.insert() here, this doesn't compile on Sun.
@@ -873,7 +873,7 @@ Database::getAllAdapters(const string& expression)
//result.insert(result.end(), groups.begin(), groups.end())
for(set<string>::const_iterator q = groups.begin(); q != groups.end(); ++q)
{
- result.push_back(*q);
+ result.push_back(*q);
}
return result;
}
@@ -883,27 +883,27 @@ Database::addObject(const ObjectInfo& info)
{
int serial;
{
- Lock sync(*this);
- const Ice::Identity id = info.proxy->ice_getIdentity();
-
- if(_objectCache.has(id))
- {
- throw ObjectExistsException(id);
- }
-
- if(_objects.find(id) != _objects.end())
- {
- throw ObjectExistsException(id);
- }
- _objects.put(IdentityObjectInfoDict::value_type(id, info));
-
- serial = _objectObserverTopic->objectAdded(info);
-
- if(_traceLevels->object > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << "added object `" << _communicator->identityToString(id) << "'";
- }
+ Lock sync(*this);
+ const Ice::Identity id = info.proxy->ice_getIdentity();
+
+ if(_objectCache.has(id))
+ {
+ throw ObjectExistsException(id);
+ }
+
+ if(_objects.find(id) != _objects.end())
+ {
+ throw ObjectExistsException(id);
+ }
+ _objects.put(IdentityObjectInfoDict::value_type(id, info));
+
+ serial = _objectObserverTopic->objectAdded(info);
+
+ if(_traceLevels->object > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << "added object `" << _communicator->identityToString(id) << "'";
+ }
}
_objectObserverTopic->waitForSyncedSubscribers(serial);
}
@@ -913,31 +913,31 @@ Database::addOrUpdateObject(const ObjectInfo& info)
{
int serial;
{
- Lock sync(*this);
- const Ice::Identity id = info.proxy->ice_getIdentity();
-
- if(_objectCache.has(id))
- {
- throw ObjectExistsException(id);
- }
-
- bool update = _objects.find(id) != _objects.end();
- _objects.put(IdentityObjectInfoDict::value_type(id, info));
-
- if(update)
- {
- serial = _objectObserverTopic->objectUpdated(info);
- }
- else
- {
- serial = _objectObserverTopic->objectAdded(info);
- }
-
- if(_traceLevels->object > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << (!update ? "added" : "updated") << " object `" << _communicator->identityToString(id) << "'";
- }
+ Lock sync(*this);
+ const Ice::Identity id = info.proxy->ice_getIdentity();
+
+ if(_objectCache.has(id))
+ {
+ throw ObjectExistsException(id);
+ }
+
+ bool update = _objects.find(id) != _objects.end();
+ _objects.put(IdentityObjectInfoDict::value_type(id, info));
+
+ if(update)
+ {
+ serial = _objectObserverTopic->objectUpdated(info);
+ }
+ else
+ {
+ serial = _objectObserverTopic->objectAdded(info);
+ }
+
+ if(_traceLevels->object > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << (!update ? "added" : "updated") << " object `" << _communicator->identityToString(id) << "'";
+ }
}
_objectObserverTopic->waitForSyncedSubscribers(serial);
}
@@ -947,33 +947,33 @@ Database::removeObject(const Ice::Identity& id)
{
int serial;
{
- Lock sync(*this);
- if(_objectCache.has(id))
- {
- DeploymentException ex;
- ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n";
- ex.reason += "the object was added with the application descriptor `";
- ex.reason += _objectCache.get(id)->getApplication();
- ex.reason += "'";
- throw ex;
- }
-
- IdentityObjectInfoDict::iterator p = _objects.find(id);
- if(p == _objects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
- _objects.erase(p);
-
- serial = _objectObserverTopic->objectRemoved(id);
-
- if(_traceLevels->object > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << "removed object `" << _communicator->identityToString(id) << "'";
- }
+ Lock sync(*this);
+ if(_objectCache.has(id))
+ {
+ DeploymentException ex;
+ ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n";
+ ex.reason += "the object was added with the application descriptor `";
+ ex.reason += _objectCache.get(id)->getApplication();
+ ex.reason += "'";
+ throw ex;
+ }
+
+ IdentityObjectInfoDict::iterator p = _objects.find(id);
+ if(p == _objects.end())
+ {
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
+ }
+ _objects.erase(p);
+
+ serial = _objectObserverTopic->objectRemoved(id);
+
+ if(_traceLevels->object > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << "removed object `" << _communicator->identityToString(id) << "'";
+ }
}
_objectObserverTopic->waitForSyncedSubscribers(serial);
}
@@ -983,39 +983,39 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
{
int serial;
{
- Lock sync(*this);
-
- const Ice::Identity id = proxy->ice_getIdentity();
- if(_objectCache.has(id))
- {
- DeploymentException ex;
- ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n";
- ex.reason += "the object was added with the application descriptor `";
- ex.reason += _objectCache.get(id)->getApplication();
- ex.reason += "'";
- throw ex;
- }
+ Lock sync(*this);
+
+ const Ice::Identity id = proxy->ice_getIdentity();
+ if(_objectCache.has(id))
+ {
+ DeploymentException ex;
+ ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n";
+ ex.reason += "the object was added with the application descriptor `";
+ ex.reason += _objectCache.get(id)->getApplication();
+ ex.reason += "'";
+ throw ex;
+ }
- IdentityObjectInfoDict::iterator p = _objects.find(id);
- if(p == _objects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
-
- ObjectInfo info;
- info = p->second;
- info.proxy = proxy;
- p.set(info);
+ IdentityObjectInfoDict::iterator p = _objects.find(id);
+ if(p == _objects.end())
+ {
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
+ }
+
+ ObjectInfo info;
+ info = p->second;
+ info.proxy = proxy;
+ p.set(info);
- serial = _objectObserverTopic->objectUpdated(info);
-
- if(_traceLevels->object > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << "updated object `" << _communicator->identityToString(id) << "'";
- }
+ serial = _objectObserverTopic->objectUpdated(info);
+
+ if(_traceLevels->object > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << "updated object `" << _communicator->identityToString(id) << "'";
+ }
}
_objectObserverTopic->waitForSyncedSubscribers(serial);
}
@@ -1027,7 +1027,7 @@ Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects)
Freeze::TransactionHolder txHolder(_connection);
for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p));
+ _objects.put(IdentityObjectInfoDict::value_type(p->proxy->ice_getIdentity(), *p));
}
int serial = _objectObserverTopic->objectsAddedOrUpdated(objects);
txHolder.commit();
@@ -1041,7 +1041,7 @@ Database::removeObjectsInDatabase(const ObjectInfoSeq& objects)
Freeze::TransactionHolder txHolder(_connection);
for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- _objects.erase(p->proxy->ice_getIdentity());
+ _objects.erase(p->proxy->ice_getIdentity());
}
_objectObserverTopic->objectsRemoved(objects);
txHolder.commit();
@@ -1052,10 +1052,10 @@ Database::getObjectProxy(const Ice::Identity& id)
{
try
{
- //
- // Only return proxies for non allocatable objects.
- //
- return _objectCache.get(id)->getProxy();
+ //
+ // Only return proxies for non allocatable objects.
+ //
+ return _objectCache.get(id)->getProxy();
}
catch(ObjectNotRegisteredException&)
{
@@ -1066,9 +1066,9 @@ Database::getObjectProxy(const Ice::Identity& id)
IdentityObjectInfoDict::const_iterator p = objects.find(id);
if(p == objects.end())
{
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
}
return p->second.proxy;
}
@@ -1079,7 +1079,7 @@ Database::getObjectByType(const string& type)
Ice::ObjectProxySeq objs = getObjectsByType(type);
if(objs.empty())
{
- return 0;
+ return 0;
}
return objs[IceUtil::random(static_cast<int>(objs.size()))];
}
@@ -1090,7 +1090,7 @@ Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample
Ice::ObjectProxySeq objs = getObjectsByType(type);
if(objs.empty())
{
- return 0;
+ return 0;
}
RandomNumberGenerator rng;
@@ -1099,18 +1099,18 @@ Database::getObjectByTypeOnLeastLoadedNode(const string& type, LoadSample sample
objectsWithLoad.reserve(objs.size());
for(Ice::ObjectProxySeq::const_iterator p = objs.begin(); p != objs.end(); ++p)
{
- float load = 1.0f;
- if(!(*p)->ice_getAdapterId().empty())
- {
- try
- {
- load = _adapterCache.get((*p)->ice_getAdapterId())->getLeastLoadedNodeLoad(sample);
- }
- catch(const AdapterNotExistException&)
- {
- }
- }
- objectsWithLoad.push_back(make_pair(*p, load));
+ float load = 1.0f;
+ if(!(*p)->ice_getAdapterId().empty())
+ {
+ try
+ {
+ load = _adapterCache.get((*p)->ice_getAdapterId())->getLeastLoadedNodeLoad(sample);
+ }
+ catch(const AdapterNotExistException&)
+ {
+ }
+ }
+ objectsWithLoad.push_back(make_pair(*p, load));
}
return min_element(objectsWithLoad.begin(), objectsWithLoad.end(), ObjectLoadCI())->first;
}
@@ -1124,7 +1124,7 @@ Database::getObjectsByType(const string& type)
IdentityObjectInfoDict objects(connection, _objectDbName);
for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p)
{
- proxies.push_back(p->second.proxy);
+ proxies.push_back(p->second.proxy);
}
return proxies;
}
@@ -1134,8 +1134,8 @@ Database::getObjectInfo(const Ice::Identity& id)
{
try
{
- ObjectEntryPtr object = _objectCache.get(id);
- return object->getObjectInfo();
+ ObjectEntryPtr object = _objectCache.get(id);
+ return object->getObjectInfo();
}
catch(ObjectNotRegisteredException&)
{
@@ -1146,7 +1146,7 @@ Database::getObjectInfo(const Ice::Identity& id)
IdentityObjectInfoDict::const_iterator p = objects.find(id);
if(p == objects.end())
{
- throw ObjectNotRegisteredException(id);
+ throw ObjectNotRegisteredException(id);
}
return p->second;
}
@@ -1159,10 +1159,10 @@ Database::getAllObjectInfos(const string& expression)
IdentityObjectInfoDict objects(connection, _objectDbName);
for(IdentityObjectInfoDict::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
- if(expression.empty() || IceUtil::match(_communicator->identityToString(p->first), expression, true))
- {
- infos.push_back(p->second);
- }
+ if(expression.empty() || IceUtil::match(_communicator->identityToString(p->first), expression, true))
+ {
+ infos.push_back(p->second);
+ }
}
return infos;
}
@@ -1175,7 +1175,7 @@ Database::getObjectInfosByType(const string& type)
IdentityObjectInfoDict objects(connection, _objectDbName);
for(IdentityObjectInfoDict::const_iterator p = objects.findByType(type); p != objects.end(); ++p)
{
- infos.push_back(p->second);
+ infos.push_back(p->second);
}
return infos;
}
@@ -1183,11 +1183,11 @@ Database::getObjectInfosByType(const string& type)
void
Database::addInternalObject(const ObjectInfo& info, bool replace)
{
- Lock sync(*this);
+ Lock sync(*this);
const Ice::Identity id = info.proxy->ice_getIdentity();
if(!replace && _internalObjects.find(id) != _internalObjects.end())
{
- throw ObjectExistsException(id);
+ throw ObjectExistsException(id);
}
_internalObjects.put(IdentityObjectInfoDict::value_type(id, info));
}
@@ -1199,9 +1199,9 @@ Database::removeInternalObject(const Ice::Identity& id)
IdentityObjectInfoDict::iterator p = _internalObjects.find(id);
if(p == _internalObjects.end())
{
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
}
_internalObjects.erase(p);
}
@@ -1214,7 +1214,7 @@ Database::getInternalObjectsByType(const string& type)
Ice::ObjectProxySeq proxies;
for(IdentityObjectInfoDict::const_iterator p = internalObjects.findByType(type); p != internalObjects.end(); ++p)
{
- proxies.push_back(p->second.proxy);
+ proxies.push_back(p->second.proxy);
}
return proxies;
}
@@ -1261,9 +1261,9 @@ Database::checkServerForAddition(const string& id)
{
if(_serverCache.has(id))
{
- DeploymentException ex;
- ex.reason = "server `" + id + "' is already registered";
- throw ex;
+ DeploymentException ex;
+ ex.reason = "server `" + id + "' is already registered";
+ throw ex;
}
}
@@ -1274,9 +1274,9 @@ Database::checkAdapterForAddition(const string& id)
_adapters.find(id) != _adapters.end() ||
_adapters.findByReplicaGroupId(id) != _adapters.end())
{
- DeploymentException ex;
- ex.reason = "adapter `" + id + "' is already registered";
- throw ex;
+ DeploymentException ex;
+ ex.reason = "adapter `" + id + "' is already registered";
+ throw ex;
}
}
@@ -1287,9 +1287,9 @@ Database::checkObjectForAddition(const Ice::Identity& objectId)
_allocatableObjectCache.has(objectId) ||
_objects.find(objectId) != _objects.end())
{
- DeploymentException ex;
- ex.reason = "object `" + _communicator->identityToString(objectId) + "' is already registered";
- throw ex;
+ DeploymentException ex;
+ ex.reason = "object `" + _communicator->identityToString(objectId) + "' is already registered";
+ throw ex;
}
}
@@ -1300,27 +1300,27 @@ Database::load(const ApplicationHelper& app, ServerEntrySeq& entries, const stri
const string application = app.getInstance().name;
for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
{
- _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
+ _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
}
const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
{
- assert(!r->id.empty());
- _adapterCache.addReplicaGroup(*r, application);
- for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
- {
- ObjectInfo info;
- info.type = o->type;
- info.proxy = _communicator->stringToProxy("\"" + _communicator->identityToString(o->id) + "\" @ " + r->id);
- _objectCache.add(info, application);
- }
+ assert(!r->id.empty());
+ _adapterCache.addReplicaGroup(*r, application);
+ for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
+ {
+ ObjectInfo info;
+ info.type = o->type;
+ info.proxy = _communicator->stringToProxy("\"" + _communicator->identityToString(o->id) + "\" @ " + r->id);
+ _objectCache.add(info, application);
+ }
}
map<string, ServerInfo> servers = app.getServerInfos(uuid, revision);
for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
{
- entries.push_back(_serverCache.add(p->second));
+ entries.push_back(_serverCache.add(p->second));
}
}
@@ -1330,33 +1330,33 @@ Database::unload(const ApplicationHelper& app, ServerEntrySeq& entries)
map<string, ServerInfo> servers = app.getServerInfos("", 0);
for(map<string, ServerInfo>::const_iterator p = servers.begin(); p != servers.end(); ++p)
{
- entries.push_back(_serverCache.remove(p->first));
+ entries.push_back(_serverCache.remove(p->first));
}
const ReplicaGroupDescriptorSeq& adpts = app.getInstance().replicaGroups;
for(ReplicaGroupDescriptorSeq::const_iterator r = adpts.begin(); r != adpts.end(); ++r)
{
- for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
- {
- _objectCache.remove(o->id);
- }
- _adapterCache.removeReplicaGroup(r->id);
+ for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
+ {
+ _objectCache.remove(o->id);
+ }
+ _adapterCache.removeReplicaGroup(r->id);
}
const NodeDescriptorDict& nodes = app.getInstance().nodes;
const string application = app.getInstance().name;
for(NodeDescriptorDict::const_iterator n = nodes.begin(); n != nodes.end(); ++n)
{
- _nodeCache.get(n->first)->removeDescriptor(application);
+ _nodeCache.get(n->first)->removeDescriptor(application);
}
}
void
Database::reload(const ApplicationHelper& oldApp,
- const ApplicationHelper& newApp,
- ServerEntrySeq& entries,
- const string& uuid,
- int revision)
+ const ApplicationHelper& newApp,
+ ServerEntrySeq& entries,
+ const string& uuid,
+ int revision)
{
const string application = oldApp.getInstance().name;
@@ -1369,24 +1369,24 @@ Database::reload(const ApplicationHelper& oldApp,
map<string, ServerInfo>::const_iterator p;
for(p = newServers.begin(); p != newServers.end(); ++p)
{
- map<string, ServerInfo>::const_iterator q = oldServers.find(p->first);
- if(q == oldServers.end())
- {
- load.push_back(p->second);
- }
- else
- {
- _serverCache.remove(p->first, false); // Don't destroy the server if it was updated.
- load.push_back(p->second);
- }
+ map<string, ServerInfo>::const_iterator q = oldServers.find(p->first);
+ if(q == oldServers.end())
+ {
+ load.push_back(p->second);
+ }
+ else
+ {
+ _serverCache.remove(p->first, false); // Don't destroy the server if it was updated.
+ load.push_back(p->second);
+ }
}
for(p = oldServers.begin(); p != oldServers.end(); ++p)
{
- map<string, ServerInfo>::const_iterator q = newServers.find(p->first);
- if(q == newServers.end())
- {
- entries.push_back(_serverCache.remove(p->first));
- }
+ map<string, ServerInfo>::const_iterator q = newServers.find(p->first);
+ if(q == newServers.end())
+ {
+ entries.push_back(_serverCache.remove(p->first));
+ }
}
//
@@ -1397,22 +1397,22 @@ Database::reload(const ApplicationHelper& oldApp,
ReplicaGroupDescriptorSeq::const_iterator r;
for(r = oldAdpts.begin(); r != oldAdpts.end(); ++r)
{
- ReplicaGroupDescriptorSeq::const_iterator t;
- for(t = newAdpts.begin(); t != newAdpts.end(); ++t)
- {
- if(t->id == r->id)
- {
- break;
- }
- }
- for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
- {
- _objectCache.remove(o->id);
- }
- if(t == newAdpts.end())
- {
- _adapterCache.removeReplicaGroup(r->id);
- }
+ ReplicaGroupDescriptorSeq::const_iterator t;
+ for(t = newAdpts.begin(); t != newAdpts.end(); ++t)
+ {
+ if(t->id == r->id)
+ {
+ break;
+ }
+ }
+ for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
+ {
+ _objectCache.remove(o->id);
+ }
+ if(t == newAdpts.end())
+ {
+ _adapterCache.removeReplicaGroup(r->id);
+ }
}
//
@@ -1422,7 +1422,7 @@ Database::reload(const ApplicationHelper& oldApp,
NodeDescriptorDict::const_iterator n;
for(n = oldNodes.begin(); n != oldNodes.end(); ++n)
{
- _nodeCache.get(n->first)->removeDescriptor(application);
+ _nodeCache.get(n->first)->removeDescriptor(application);
}
//
@@ -1431,7 +1431,7 @@ Database::reload(const ApplicationHelper& oldApp,
const NodeDescriptorDict& newNodes = newApp.getInstance().nodes;
for(n = newNodes.begin(); n != newNodes.end(); ++n)
{
- _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
+ _nodeCache.get(n->first, true)->addDescriptor(application, n->second);
}
//
@@ -1439,24 +1439,24 @@ Database::reload(const ApplicationHelper& oldApp,
//
for(r = newAdpts.begin(); r != newAdpts.end(); ++r)
{
- try
- {
- ReplicaGroupEntryPtr entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(r->id));
- assert(entry);
- entry->update(r->loadBalancing);
- }
- catch(const AdapterNotExistException&)
- {
- _adapterCache.addReplicaGroup(*r, application);
- }
-
- for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
- {
- ObjectInfo info;
- info.type = o->type;
- info.proxy = _communicator->stringToProxy(_communicator->identityToString(o->id) + "@" + r->id);
- _objectCache.add(info, application);
- }
+ try
+ {
+ ReplicaGroupEntryPtr entry = ReplicaGroupEntryPtr::dynamicCast(_adapterCache.get(r->id));
+ assert(entry);
+ entry->update(r->loadBalancing);
+ }
+ catch(const AdapterNotExistException&)
+ {
+ _adapterCache.addReplicaGroup(*r, application);
+ }
+
+ for(ObjectDescriptorSeq::const_iterator o = r->objects.begin(); o != r->objects.end(); ++o)
+ {
+ ObjectInfo info;
+ info.type = o->type;
+ info.proxy = _communicator->stringToProxy(_communicator->identityToString(o->id) + "@" + r->id);
+ _objectCache.add(info, application);
+ }
}
//
@@ -1464,51 +1464,51 @@ Database::reload(const ApplicationHelper& oldApp,
//
for(vector<ServerInfo>::const_iterator q = load.begin(); q != load.end(); ++q)
{
- entries.push_back(_serverCache.add(*q));
+ entries.push_back(_serverCache.add(*q));
}
}
void
Database::finishApplicationUpdate(ServerEntrySeq& entries,
- const ApplicationUpdateInfo& update,
- const ApplicationInfo& oldApp,
- const ApplicationDescriptor& newDesc,
- AdminSessionI* session)
+ const ApplicationUpdateInfo& update,
+ const ApplicationInfo& oldApp,
+ const ApplicationDescriptor& newDesc,
+ AdminSessionI* session)
{
if(_master)
{
- //
- // Load the servers on the nodes. If a server couldn't be
- // deployed we unload the application and throw.
- //
- try
- {
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
- }
- catch(const DeploymentException& ex)
- {
- ApplicationUpdateInfo newUpdate;
- {
- Lock sync(*this);
- entries.clear();
- ApplicationHelper previous(_communicator, newDesc);
- ApplicationHelper helper(_communicator, oldApp.descriptor);
- reload(previous, helper, entries, oldApp.uuid, oldApp.revision);
- }
-
- try
- {
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
- }
- catch(const DeploymentException& ex)
- {
- Ice::Error err(_traceLevels->logger);
- err << "failed to rollback previous application `" << oldApp.descriptor.name << "':\n" << ex.reason;
- }
-
- finishUpdating(newDesc.name);
- throw ex;
- }
+ //
+ // Load the servers on the nodes. If a server couldn't be
+ // deployed we unload the application and throw.
+ //
+ try
+ {
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
+ }
+ catch(const DeploymentException& ex)
+ {
+ ApplicationUpdateInfo newUpdate;
+ {
+ Lock sync(*this);
+ entries.clear();
+ ApplicationHelper previous(_communicator, newDesc);
+ ApplicationHelper helper(_communicator, oldApp.descriptor);
+ reload(previous, helper, entries, oldApp.uuid, oldApp.revision);
+ }
+
+ try
+ {
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::syncAndWait));
+ }
+ catch(const DeploymentException& ex)
+ {
+ Ice::Error err(_traceLevels->logger);
+ err << "failed to rollback previous application `" << oldApp.descriptor.name << "':\n" << ex.reason;
+ }
+
+ finishUpdating(newDesc.name);
+ throw ex;
+ }
}
//
@@ -1516,24 +1516,24 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries,
//
int serial;
{
- Lock sync(*this);
-
- ApplicationInfo info = oldApp;
- info.updateTime = update.updateTime;
- info.updateUser = update.updateUser;
- info.revision = update.revision;
- info.descriptor = newDesc;
-
- _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info));
- ++_applicationSerial;
+ Lock sync(*this);
+
+ ApplicationInfo info = oldApp;
+ info.updateTime = update.updateTime;
+ info.updateUser = update.updateUser;
+ info.revision = update.revision;
+ info.descriptor = newDesc;
+
+ _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info));
+ ++_applicationSerial;
- if(_traceLevels->application > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "updated application `" << update.descriptor.name << "'";
- }
-
- serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update);
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "updated application `" << update.descriptor.name << "'";
+ }
+
+ serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update);
}
_applicationObserverTopic->waitForSyncedSubscribers(serial);
@@ -1556,9 +1556,9 @@ Database::finishUpdating(const string& name)
map<string, vector<AMD_NodeSession_waitForApplicationUpdatePtr> >::iterator p = _updating.find(name);
assert(p != _updating.end());
for(vector<AMD_NodeSession_waitForApplicationUpdatePtr>::const_iterator q = p->second.begin();
- q != p->second.end(); ++q)
+ q != p->second.end(); ++q)
{
- (*q)->ice_response();
+ (*q)->ice_response();
}
_updating.erase(p);