summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-09-06 15:39:41 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-09-06 15:39:41 +0000
commit6f46bb760b30ef883b386dfa8e695c8d5004f05f (patch)
treec3dabd2d404b72a8e4ad16996a913ceee963815e /cpp/src/IceGrid/Database.cpp
parentFixed bug 1209 (diff)
downloadice-6f46bb760b30ef883b386dfa8e695c8d5004f05f.tar.bz2
ice-6f46bb760b30ef883b386dfa8e695c8d5004f05f.tar.xz
ice-6f46bb760b30ef883b386dfa8e695c8d5004f05f.zip
The master now waits for the replicas to be updated before to return.
Added support for dynamic registration of adapters in the replicas.
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp287
1 files changed, 157 insertions, 130 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index d0f64b6e7b8..ac63076458c 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -9,7 +9,6 @@
#include <IceUtil/StringUtil.h>
#include <IceUtil/Random.h>
-#include <IceUtil/UUID.h>
#include <Freeze/Freeze.h>
#include <IceGrid/Database.h>
#include <IceGrid/TraceLevels.h>
@@ -371,138 +370,145 @@ Database::getReplicatedEndpoints(const string& name, const Ice::ObjectPrx& proxy
}
void
-Database::addApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& desc)
+Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
{
ServerEntrySeq entries;
- string uuid = IceUtil::generateUUID();
{
Lock sync(*this);
checkSessionLock(session);
- while(_updating.find(desc.name) != _updating.end())
+ while(_updating.find(info.descriptor.name) != _updating.end())
{
wait();
}
- if(_applications.find(desc.name) != _applications.end())
+ if(_applications.find(info.descriptor.name) != _applications.end())
{
- throw DeploymentException("application `" + desc.name + "' already exists");
+ throw DeploymentException("application `" + info.descriptor.name + "' already exists");
}
- ApplicationHelper helper(_communicator, desc);
+ ApplicationHelper helper(_communicator, info.descriptor);
checkForAddition(helper);
- load(helper, entries, uuid, 1);
- _updating.insert(desc.name);
+ load(helper, entries, info.uuid, info.revision);
+ _updating.insert(info.descriptor.name);
}
//
- // Synchronize the servers on the nodes. If a server couldn't be
- // deployed we unload the application and throw.
+ // If the update is from an admin session, we synchronize the
+ // servers and throw if there's errors.
//
- try
- {
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
- }
- catch(const DeploymentException& ex)
+ if(session)
{
- {
- Lock sync(*this);
- entries.clear();
- unload(ApplicationHelper(_communicator, desc), entries);
- _updating.erase(desc.name);
- notifyAll();
- }
+ //
+ // Synchronize the servers on the nodes. If a server couldn't be
+ // deployed we unload the application and throw.
+ //
try
{
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
- catch(const DeploymentException&)
+ catch(const DeploymentException& ex)
{
- // TODO: warning?
+ {
+ Lock sync(*this);
+ entries.clear();
+ unload(ApplicationHelper(_communicator, info.descriptor), entries);
+ _updating.erase(info.descriptor.name);
+ notifyAll();
+ }
+ try
+ {
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ }
+ catch(const DeploymentException&)
+ {
+ // TODO: warning?
+ }
+ throw ex;
}
- throw ex;
+ }
+ else
+ {
+ // TODO: XXX: Synchronize the servers here?!
}
//
// Save the application descriptor.
//
int serial;
- ApplicationInfo info;
{
- Lock sync(*this);
-
- info.createTime = info.updateTime = IceUtil::Time::now().toMilliSeconds();
- info.createUser = info.updateUser = _lockUserId;
- info.descriptor = desc;
- info.revision = 1;
- info.uuid = uuid;
-
- _applications.put(StringApplicationInfoDict::value_type(desc.name, info));
-
+ Lock sync(*this);
+ _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
serial = ++_applicationSerial;
- _updating.erase(desc.name);
+ _updating.erase(info.descriptor.name);
notifyAll();
}
- //
- // Notify the observers.
- //
_applicationObserverTopic->applicationAdded(serial, info);
+ _replicaCache.waitForUpdateReplication("application", serial);
if(_traceLevels->application > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "added application `" << desc.name << "'";
+ out << "added application `" << info.descriptor.name << "'";
}
}
void
-Database::updateApplicationDescriptor(AdminSessionI* session, const ApplicationUpdateDescriptor& update)
+Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* session)
{
ServerEntrySeq entries;
ApplicationInfo oldApp;
ApplicationDescriptor newDesc;
+ ApplicationUpdateInfo update = updt;
{
Lock sync(*this);
checkSessionLock(session);
- while(_updating.find(update.name) != _updating.end())
+ while(_updating.find(update.descriptor.name) != _updating.end())
{
wait();
}
- StringApplicationInfoDict::const_iterator p = _applications.find(update.name);
+ StringApplicationInfoDict::const_iterator p = _applications.find(update.descriptor.name);
if(p == _applications.end())
{
- throw ApplicationNotExistException(update.name);
+ throw ApplicationNotExistException(update.descriptor.name);
}
oldApp = p->second;
+ if(update.revision < 0)
+ {
+ update.revision = oldApp.revision + 1;
+ }
+
ApplicationHelper previous(_communicator, oldApp.descriptor);
- ApplicationHelper helper(_communicator, previous.update(update));
+ ApplicationHelper helper(_communicator, previous.update(update.descriptor));
checkForUpdate(previous, helper);
reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
newDesc = helper.getDefinition();
- _updating.insert(update.name);
+ _updating.insert(update.descriptor.name);
}
- finishUpdate(entries, update, oldApp, newDesc);
+ finishUpdate(entries, update, oldApp, newDesc, session);
}
void
-Database::syncApplicationDescriptor(AdminSessionI* session, const ApplicationDescriptor& newDesc)
+Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminSessionI* session)
{
+ assert(session);
+
ServerEntrySeq entries;
- ApplicationUpdateDescriptor update;
+ ApplicationUpdateInfo update;
ApplicationInfo oldApp;
{
Lock sync(*this);
checkSessionLock(session);
- while(_updating.find(update.name) != _updating.end())
+ while(_updating.find(update.descriptor.name) != _updating.end())
{
wait();
}
@@ -516,32 +522,38 @@ Database::syncApplicationDescriptor(AdminSessionI* session, const ApplicationDes
ApplicationHelper previous(_communicator, oldApp.descriptor);
ApplicationHelper helper(_communicator, newDesc);
- update = helper.diff(previous);
+
+ update.updateTime = IceUtil::Time::now().toMilliSeconds();
+ update.updateUser = _lockUserId;
+ update.revision = oldApp.revision + 1;
+ update.descriptor = helper.diff(previous);
checkForUpdate(previous, helper);
reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
- _updating.insert(update.name);
+ _updating.insert(update.descriptor.name);
}
- finishUpdate(entries, update, oldApp, newDesc);
+ finishUpdate(entries, update, oldApp, newDesc, session);
}
void
-Database::instantiateServer(AdminSessionI* session,
- const string& application,
+Database::instantiateServer(const string& application,
const string& node,
- const ServerInstanceDescriptor& instance)
+ const ServerInstanceDescriptor& instance,
+ AdminSessionI* session)
{
+ assert(session);
+
ServerEntrySeq entries;
- ApplicationUpdateDescriptor update;
+ ApplicationUpdateInfo update;
ApplicationInfo oldApp;
ApplicationDescriptor newDesc;
{
Lock sync(*this);
checkSessionLock(session);
- while(_updating.find(update.name) != _updating.end())
+ while(_updating.find(application) != _updating.end())
{
wait();
}
@@ -555,21 +567,25 @@ Database::instantiateServer(AdminSessionI* session,
ApplicationHelper previous(_communicator, oldApp.descriptor);
ApplicationHelper helper(_communicator, previous.instantiateServer(node, instance));
- update = helper.diff(previous);
+
+ update.updateTime = IceUtil::Time::now().toMilliSeconds();
+ update.updateUser = _lockUserId;
+ update.revision = oldApp.revision + 1;
+ update.descriptor = helper.diff(previous);
checkForUpdate(previous, helper);
reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
newDesc = helper.getDefinition();
- _updating.insert(update.name);
+ _updating.insert(update.descriptor.name);
}
- finishUpdate(entries, update, oldApp, newDesc);
+ finishUpdate(entries, update, oldApp, newDesc, session);
}
void
-Database::removeApplicationDescriptor(AdminSessionI* session, const std::string& name)
+Database::removeApplication(const string& name, AdminSessionI* session)
{
ServerEntrySeq entries;
int serial;
@@ -607,10 +623,8 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string&
serial = ++_applicationSerial;
}
- //
- // Notify the observers
- //
_applicationObserverTopic->applicationRemoved(serial, name);
+ _replicaCache.waitForUpdateReplication("application", serial);
if(_traceLevels->application > 0)
{
@@ -618,7 +632,14 @@ Database::removeApplicationDescriptor(AdminSessionI* session, const std::string&
out << "removed application `" << name << "'";
}
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ if(session)
+ {
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ }
+ else
+ {
+ // TODO: XXX: synchronize the servers
+ }
}
ApplicationInfo
@@ -707,11 +728,15 @@ Database::getAllNodes(const string& expression)
void
Database::addReplica(const string& name, const ReplicaSessionIPtr& session)
{
- _replicaCache.add(name, session);
+ //
+ // NOTE: this must be done before we add the replica to the cache
+ // in order for ReplicaCache::waitForUpdateReplication to work.
+ //
+ _applicationObserverTopic->subscribeAndWaitForSubscription(session->getObserver());
+ _adapterObserverTopic->subscribeAndWaitForSubscription(session->getObserver());
+ _objectObserverTopic->subscribeAndWaitForSubscription(session->getObserver());
- _applicationObserverTopic->subscribe(session->getObserver());
- _adapterObserverTopic->subscribe(session->getObserver());
- _objectObserverTopic->subscribe(session->getObserver());
+ _replicaCache.add(name, session);
}
InternalRegistryPrx
@@ -727,13 +752,19 @@ Database::getReplicaInfo(const string& name) const
}
void
+Database::replicaReceivedUpdate(const string& name, const string& update, int serial)
+{
+ _replicaCache.replicaReceivedUpdate(name, update, serial);
+}
+
+void
Database::removeReplica(const string& name, const ReplicaSessionIPtr& session)
{
+ _replicaCache.remove(name);
+
_applicationObserverTopic->unsubscribe(session->getObserver());
_adapterObserverTopic->unsubscribe(session->getObserver());
_objectObserverTopic->unsubscribe(session->getObserver());
-
- _replicaCache.remove(name);
}
Ice::StringSeq
@@ -843,6 +874,7 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
{
_adapterObserverTopic->adapterRemoved(serial, adapterId);
}
+ _replicaCache.waitForUpdateReplication("adapter", serial);
return true;
}
@@ -908,7 +940,7 @@ Database::removeAdapter(const string& adapterId)
}
else
{
- serial = _adapterSerial;
+ serial = _adapterSerial + 1;
_adapterSerial += static_cast<int>(static_cast<int>(infos.size()));
}
}
@@ -922,13 +954,21 @@ Database::removeAdapter(const string& adapterId)
if(infos.empty())
{
_adapterObserverTopic->adapterRemoved(serial, adapterId);
+ _replicaCache.waitForUpdateReplication("adapter", serial);
}
else
{
- for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ int i = 0;
+ AdapterInfoSeq::const_iterator p;
+ for(p = infos.begin(); p != infos.end(); ++p, ++i)
{
- _adapterObserverTopic->adapterUpdated(++serial, *p);
+ _adapterObserverTopic->adapterUpdated(serial + i, *p);
}
+ i = 0;
+ for(p = infos.begin(); p != infos.end(); ++p, ++i)
+ {
+ _replicaCache.waitForUpdateReplication("adapter", serial + i);
+ }
}
}
@@ -1110,10 +1150,7 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase)
serial = ++_objectSerial;
}
-
- //
- // Notify the observers.
- //
+
if(!update)
{
_objectObserverTopic->objectAdded(serial, info);
@@ -1122,6 +1159,7 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase)
{
_objectObserverTopic->objectUpdated(serial, info);
}
+ _replicaCache.waitForUpdateReplication("object", serial);
if(_traceLevels->object > 0)
{
@@ -1165,10 +1203,8 @@ Database::removeObject(const Ice::Identity& id)
serial = ++_objectSerial;
}
- //
- // Notify the observers.
- //
_objectObserverTopic->objectRemoved(serial, id);
+ _replicaCache.waitForUpdateReplication("object", serial);
if(_traceLevels->object > 0)
{
@@ -1210,10 +1246,8 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
serial = ++_objectSerial;
}
- //
- // Notify the observers.
- //
_objectObserverTopic->objectUpdated(serial, info);
+ _replicaCache.waitForUpdateReplication("object", serial);
if(_traceLevels->object > 0)
{
@@ -1241,9 +1275,6 @@ Database::addOrUpdateObjectsInDatabase(const ObjectInfoSeq& objects)
txHolder.commit();
}
- //
- // Notify the observers.
- //
vector<bool>::const_iterator q = updated.begin();
for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p, ++q)
{
@@ -1288,10 +1319,7 @@ Database::removeObjectsInDatabase(const ObjectInfoSeq& objects)
_objectSerial += static_cast<int>(static_cast<int>(objects.size()));
txHolder.commit();
}
-
- //
- // Notify the observers.
- //
+
for(ObjectInfoSeq::const_iterator p = objects.begin(); p != objects.end(); ++p)
{
_objectObserverTopic->objectRemoved(++serial, p->proxy->ice_getIdentity());
@@ -1721,70 +1749,69 @@ Database::reload(const ApplicationHelper& oldApp,
void
Database::finishUpdate(ServerEntrySeq& entries,
- const ApplicationUpdateDescriptor& update,
+ const ApplicationUpdateInfo& update,
const ApplicationInfo& oldApp,
- const ApplicationDescriptor& newDesc)
+ const ApplicationDescriptor& newDesc,
+ AdminSessionI* session)
{
-
- //
- // Synchronize the servers on the nodes. If a server couldn't be
- // deployed we unload the application and throw.
- //
- try
- {
- for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
- }
- catch(const DeploymentException& ex)
+ if(session)
{
- {
- Lock sync(*this);
- entries.clear();
- ApplicationHelper previous(_communicator, newDesc);
- ApplicationHelper helper(_communicator, oldApp.descriptor);
- reload(previous, helper, entries, oldApp.uuid, oldApp.revision);
- _updating.erase(newDesc.name);
- notifyAll();
- }
+ //
+ // Synchronize the servers on the nodes. If a server couldn't be
+ // deployed we unload the application and throw.
+ //
try
{
for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
}
- catch(const DeploymentException&)
+ catch(const DeploymentException& ex)
{
- // TODO: warning?
+ {
+ Lock sync(*this);
+ entries.clear();
+ ApplicationHelper previous(_communicator, newDesc);
+ ApplicationHelper helper(_communicator, oldApp.descriptor);
+ reload(previous, helper, entries, oldApp.uuid, oldApp.revision);
+ _updating.erase(newDesc.name);
+ notifyAll();
+ }
+ try
+ {
+ for_each(entries.begin(), entries.end(), IceUtil::voidMemFun(&ServerEntry::sync));
+ }
+ catch(const DeploymentException&)
+ {
+ // TODO: warning?
+ }
+ throw ex;
}
- throw ex;
}
//
// Save the application descriptor.
//
int serial;
- ApplicationUpdateInfo updateInfo;
{
Lock sync(*this);
ApplicationInfo info = oldApp;
- info.updateTime = updateInfo.updateTime = IceUtil::Time::now().toMilliSeconds();
- info.updateUser = updateInfo.updateUser = _lockUserId;
- info.revision = updateInfo.revision = oldApp.revision + 1;
+ info.updateTime = update.updateTime;
+ info.updateUser = update.updateUser;
+ info.revision = update.revision;
info.descriptor = newDesc;
- updateInfo.descriptor = update;
- _applications.put(StringApplicationInfoDict::value_type(update.name, info));
+ _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info));
serial = ++_applicationSerial;
- _updating.erase(update.name);
+ _updating.erase(update.descriptor.name);
notifyAll();
}
- //
- // Notify the observers.
- //
- _applicationObserverTopic->applicationUpdated(serial, updateInfo);
+ _applicationObserverTopic->applicationUpdated(serial, update);
+ _replicaCache.waitForUpdateReplication("application", serial);
if(_traceLevels->application > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "updated application `" << update.name << "'";
+ out << "updated application `" << update.descriptor.name << "'";
}
}