summaryrefslogtreecommitdiff
path: root/cpp/src/IceGrid/Database.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-09-13 10:09:50 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-09-13 10:09:50 +0000
commite65e272d55bfe9f9d26de43b73721c04b683234a (patch)
tree0b0e04bbaf26fe357a1620e4725986d5d750c003 /cpp/src/IceGrid/Database.cpp
parentFixed for VC6 (STLport bug?) (diff)
downloadice-e65e272d55bfe9f9d26de43b73721c04b683234a.tar.bz2
ice-e65e272d55bfe9f9d26de43b73721c04b683234a.tar.xz
ice-e65e272d55bfe9f9d26de43b73721c04b683234a.zip
Fixes
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp136
1 files changed, 76 insertions, 60 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 42082b7fddc..457660751c9 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -230,10 +230,9 @@ Database::getObserverTopic(TopicName name) const
case ObjectObserverTopicName:
return _objectObserverTopic;
default:
- assert(false);
break;
}
- return 0; // Keep the compiler happy.
+ return 0;
}
void
@@ -392,9 +391,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
ApplicationHelper helper(_communicator, info.descriptor);
checkForAddition(helper);
load(helper, entries, info.uuid, info.revision);
- _updating.insert(info.descriptor.name);
-
- _replicaCache.startApplicationReplication(info.uuid, info.revision);
+ startUpdating(info.descriptor.name);
}
if(_master)
@@ -410,7 +407,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
Lock sync(*this);
entries.clear();
unload(ApplicationHelper(_communicator, info.descriptor), entries);
- _updating.erase(info.descriptor.name);
+ finishUpdating(info.descriptor.name);
notifyAll();
}
catch(const DeploymentException& ex)
@@ -422,21 +419,20 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
}
}
- //
- // Save the application descriptor.
- //
int serial;
{
Lock sync(*this);
- _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
serial = ++_applicationSerial;
- _updating.erase(info.descriptor.name);
- notifyAll();
+ _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
}
_applicationObserverTopic->applicationAdded(serial, info);
- _replicaCache.waitForUpdateReplication("application", serial);
- _replicaCache.finishApplicationReplication(info.uuid, info.revision);
+
+ {
+ Lock sync(*this);
+ finishUpdating(info.descriptor.name);
+ notifyAll();
+ }
if(_traceLevels->application > 0)
{
@@ -481,11 +477,10 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se
newDesc = helper.getDefinition();
- _updating.insert(update.descriptor.name);
- _replicaCache.startApplicationReplication(oldApp.uuid, update.revision);
+ startUpdating(update.descriptor.name);
}
- finishUpdate(entries, update, oldApp, newDesc, session);
+ finishApplicationUpdate(entries, update, oldApp, newDesc, session);
}
void
@@ -523,11 +518,10 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS
checkForUpdate(previous, helper);
reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
- _updating.insert(update.descriptor.name);
- _replicaCache.startApplicationReplication(oldApp.uuid, update.revision);
+ startUpdating(update.descriptor.name);
}
- finishUpdate(entries, update, oldApp, newDesc, session);
+ finishApplicationUpdate(entries, update, oldApp, newDesc, session);
}
void
@@ -571,11 +565,10 @@ Database::instantiateServer(const string& application,
newDesc = helper.getDefinition();
- _updating.insert(update.descriptor.name);
- _replicaCache.startApplicationReplication(oldApp.uuid, update.revision);
+ startUpdating(update.descriptor.name);
}
- finishUpdate(entries, update, oldApp, newDesc, session);
+ finishApplicationUpdate(entries, update, oldApp, newDesc, session);
}
void
@@ -618,7 +611,6 @@ Database::removeApplication(const string& name, AdminSessionI* session)
}
_applicationObserverTopic->applicationRemoved(serial, name);
- _replicaCache.waitForUpdateReplication("application", serial);
if(_traceLevels->application > 0)
{
@@ -729,15 +721,11 @@ Database::getAllNodes(const string& expression)
void
Database::addReplica(const string& name, const ReplicaSessionIPtr& session)
{
- //
- // NOTE: this must be done before we add the replica to the cache
- // in order for ReplicaCache::waitForUpdateReplication to work.
- //
- _applicationObserverTopic->subscribeAndWaitForSubscription(session->getObserver());
- _adapterObserverTopic->subscribeAndWaitForSubscription(session->getObserver());
- _objectObserverTopic->subscribeAndWaitForSubscription(session->getObserver());
-
_replicaCache.add(name, session);
+
+ _applicationObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name);
+ _adapterObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name);
+ _objectObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name);
}
InternalRegistryPrx
@@ -753,25 +741,40 @@ Database::getReplicaInfo(const string& name) const
}
void
-Database::replicaReceivedUpdate(const string& name, const string& update, int serial, const string& failure)
+Database::replicaReceivedUpdate(const string& replica, TopicName name, int serial, const string& failure)
{
- _replicaCache.replicaReceivedUpdate(name, update, serial, failure);
+ ObserverTopicPtr topic = getObserverTopic(name);
+ if(topic)
+ {
+ topic->receivedUpdate(replica, serial, failure);
+ }
}
void
-Database::waitForApplicationReplication(const string& application, int revision)
+Database::waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr& cb,
+ const string& application,
+ int revision)
{
- _replicaCache.waitForApplicationReplication(application, revision);
+ Lock sync(*this);
+ map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(application);
+ if(p != _updating.end())
+ {
+ p->second.push_back(cb);
+ }
+ else
+ {
+ cb->ice_response();
+ }
}
void
Database::removeReplica(const string& name, const ReplicaSessionIPtr& session)
{
- _replicaCache.remove(name);
+ _applicationObserverTopic->unsubscribe(session->getObserver(), name);
+ _adapterObserverTopic->unsubscribe(session->getObserver(), name);
+ _objectObserverTopic->unsubscribe(session->getObserver(), name);
- _applicationObserverTopic->unsubscribe(session->getObserver());
- _adapterObserverTopic->unsubscribe(session->getObserver());
- _objectObserverTopic->unsubscribe(session->getObserver());
+ _replicaCache.remove(name);
}
Ice::StringSeq
@@ -881,7 +884,6 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
{
_adapterObserverTopic->adapterRemoved(serial, adapterId);
}
- _replicaCache.waitForUpdateReplication("adapter", serial);
return true;
}
@@ -961,7 +963,6 @@ Database::removeAdapter(const string& adapterId)
if(infos.empty())
{
_adapterObserverTopic->adapterRemoved(serial, adapterId);
- _replicaCache.waitForUpdateReplication("adapter", serial);
}
else
{
@@ -971,11 +972,6 @@ Database::removeAdapter(const string& adapterId)
{
_adapterObserverTopic->adapterUpdated(serial + i, *p);
}
- i = 0;
- for(p = infos.begin(); p != infos.end(); ++p, ++i)
- {
- _replicaCache.waitForUpdateReplication("adapter", serial + i);
- }
}
}
@@ -1166,7 +1162,6 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase)
{
_objectObserverTopic->objectUpdated(serial, info);
}
- _replicaCache.waitForUpdateReplication("object", serial);
if(_traceLevels->object > 0)
{
@@ -1211,7 +1206,6 @@ Database::removeObject(const Ice::Identity& id)
}
_objectObserverTopic->objectRemoved(serial, id);
- _replicaCache.waitForUpdateReplication("object", serial);
if(_traceLevels->object > 0)
{
@@ -1254,7 +1248,6 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
}
_objectObserverTopic->objectUpdated(serial, info);
- _replicaCache.waitForUpdateReplication("object", serial);
if(_traceLevels->object > 0)
{
@@ -1755,11 +1748,11 @@ Database::reload(const ApplicationHelper& oldApp,
}
void
-Database::finishUpdate(ServerEntrySeq& entries,
- const ApplicationUpdateInfo& update,
- const ApplicationInfo& oldApp,
- const ApplicationDescriptor& newDesc,
- AdminSessionI* session)
+Database::finishApplicationUpdate(ServerEntrySeq& entries,
+ const ApplicationUpdateInfo& update,
+ const ApplicationInfo& oldApp,
+ const ApplicationDescriptor& newDesc,
+ AdminSessionI* session)
{
if(_master)
{
@@ -1780,7 +1773,7 @@ Database::finishUpdate(ServerEntrySeq& entries,
ApplicationHelper previous(_communicator, newDesc);
ApplicationHelper helper(_communicator, oldApp.descriptor);
reload(previous, helper, entries, oldApp.uuid, oldApp.revision);
- _updating.erase(newDesc.name);
+ finishUpdating(newDesc.name);
notifyAll();
}
@@ -1812,13 +1805,15 @@ Database::finishUpdate(ServerEntrySeq& entries,
_applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info));
serial = ++_applicationSerial;
- _updating.erase(update.descriptor.name);
- notifyAll();
}
_applicationObserverTopic->applicationUpdated(serial, update);
- _replicaCache.waitForUpdateReplication("application", serial);
- _replicaCache.finishApplicationReplication(oldApp.uuid, update.revision);
+
+ {
+ Lock sync(*this);
+ finishUpdating(update.descriptor.name);
+ notifyAll();
+ }
if(_traceLevels->application > 0)
{
@@ -1826,3 +1821,24 @@ Database::finishUpdate(ServerEntrySeq& entries,
out << "updated application `" << update.descriptor.name << "'";
}
}
+
+void
+Database::startUpdating(const string& name)
+{
+ // Must be called within the synchronization.
+ _updating.insert(make_pair(name, vector<AMD_NodeSession_waitForApplicationReplicationPtr>()));
+}
+
+void
+Database::finishUpdating(const string& name)
+{
+ // Must be called within the synchronization.
+ map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(name);
+ assert(p != _updating.end());
+ for(vector<AMD_NodeSession_waitForApplicationReplicationPtr>::const_iterator q = p->second.begin();
+ q != p->second.end(); ++q)
+ {
+ (*q)->ice_response();
+ }
+ _updating.erase(p);
+}