diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-09-13 10:09:50 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-09-13 10:09:50 +0000 |
commit | e65e272d55bfe9f9d26de43b73721c04b683234a (patch) | |
tree | 0b0e04bbaf26fe357a1620e4725986d5d750c003 /cpp/src/IceGrid/Database.cpp | |
parent | Fixed for VC6 (STLport bug?) (diff) | |
download | ice-e65e272d55bfe9f9d26de43b73721c04b683234a.tar.bz2 ice-e65e272d55bfe9f9d26de43b73721c04b683234a.tar.xz ice-e65e272d55bfe9f9d26de43b73721c04b683234a.zip |
Fixes
Diffstat (limited to 'cpp/src/IceGrid/Database.cpp')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 136 |
1 files changed, 76 insertions, 60 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 42082b7fddc..457660751c9 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -230,10 +230,9 @@ Database::getObserverTopic(TopicName name) const case ObjectObserverTopicName: return _objectObserverTopic; default: - assert(false); break; } - return 0; // Keep the compiler happy. + return 0; } void @@ -392,9 +391,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) ApplicationHelper helper(_communicator, info.descriptor); checkForAddition(helper); load(helper, entries, info.uuid, info.revision); - _updating.insert(info.descriptor.name); - - _replicaCache.startApplicationReplication(info.uuid, info.revision); + startUpdating(info.descriptor.name); } if(_master) @@ -410,7 +407,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) Lock sync(*this); entries.clear(); unload(ApplicationHelper(_communicator, info.descriptor), entries); - _updating.erase(info.descriptor.name); + finishUpdating(info.descriptor.name); notifyAll(); } catch(const DeploymentException& ex) @@ -422,21 +419,20 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) } } - // - // Save the application descriptor. - // int serial; { Lock sync(*this); - _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info)); serial = ++_applicationSerial; - _updating.erase(info.descriptor.name); - notifyAll(); + _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info)); } _applicationObserverTopic->applicationAdded(serial, info); - _replicaCache.waitForUpdateReplication("application", serial); - _replicaCache.finishApplicationReplication(info.uuid, info.revision); + + { + Lock sync(*this); + finishUpdating(info.descriptor.name); + notifyAll(); + } if(_traceLevels->application > 0) { @@ -481,11 +477,10 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se newDesc = helper.getDefinition(); - _updating.insert(update.descriptor.name); - _replicaCache.startApplicationReplication(oldApp.uuid, update.revision); + startUpdating(update.descriptor.name); } - finishUpdate(entries, update, oldApp, newDesc, session); + finishApplicationUpdate(entries, update, oldApp, newDesc, session); } void @@ -523,11 +518,10 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS checkForUpdate(previous, helper); reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1); - _updating.insert(update.descriptor.name); - _replicaCache.startApplicationReplication(oldApp.uuid, update.revision); + startUpdating(update.descriptor.name); } - finishUpdate(entries, update, oldApp, newDesc, session); + finishApplicationUpdate(entries, update, oldApp, newDesc, session); } void @@ -571,11 +565,10 @@ Database::instantiateServer(const string& application, newDesc = helper.getDefinition(); - _updating.insert(update.descriptor.name); - _replicaCache.startApplicationReplication(oldApp.uuid, update.revision); + startUpdating(update.descriptor.name); } - finishUpdate(entries, update, oldApp, newDesc, session); + finishApplicationUpdate(entries, update, oldApp, newDesc, session); } void @@ -618,7 +611,6 @@ Database::removeApplication(const string& name, AdminSessionI* session) } _applicationObserverTopic->applicationRemoved(serial, name); - _replicaCache.waitForUpdateReplication("application", serial); if(_traceLevels->application > 0) { @@ -729,15 +721,11 @@ Database::getAllNodes(const string& expression) void Database::addReplica(const string& name, const ReplicaSessionIPtr& session) { - // - // NOTE: this must be done before we add the replica to the cache - // in order for ReplicaCache::waitForUpdateReplication to work. - // - _applicationObserverTopic->subscribeAndWaitForSubscription(session->getObserver()); - _adapterObserverTopic->subscribeAndWaitForSubscription(session->getObserver()); - _objectObserverTopic->subscribeAndWaitForSubscription(session->getObserver()); - _replicaCache.add(name, session); + + _applicationObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name); + _adapterObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name); + _objectObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name); } InternalRegistryPrx @@ -753,25 +741,40 @@ Database::getReplicaInfo(const string& name) const } void -Database::replicaReceivedUpdate(const string& name, const string& update, int serial, const string& failure) +Database::replicaReceivedUpdate(const string& replica, TopicName name, int serial, const string& failure) { - _replicaCache.replicaReceivedUpdate(name, update, serial, failure); + ObserverTopicPtr topic = getObserverTopic(name); + if(topic) + { + topic->receivedUpdate(replica, serial, failure); + } } void -Database::waitForApplicationReplication(const string& application, int revision) +Database::waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr& cb, + const string& application, + int revision) { - _replicaCache.waitForApplicationReplication(application, revision); + Lock sync(*this); + map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(application); + if(p != _updating.end()) + { + p->second.push_back(cb); + } + else + { + cb->ice_response(); + } } void Database::removeReplica(const string& name, const ReplicaSessionIPtr& session) { - _replicaCache.remove(name); + _applicationObserverTopic->unsubscribe(session->getObserver(), name); + _adapterObserverTopic->unsubscribe(session->getObserver(), name); + _objectObserverTopic->unsubscribe(session->getObserver(), name); - _applicationObserverTopic->unsubscribe(session->getObserver()); - _adapterObserverTopic->unsubscribe(session->getObserver()); - _objectObserverTopic->unsubscribe(session->getObserver()); + _replicaCache.remove(name); } Ice::StringSeq @@ -881,7 +884,6 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr { _adapterObserverTopic->adapterRemoved(serial, adapterId); } - _replicaCache.waitForUpdateReplication("adapter", serial); return true; } @@ -961,7 +963,6 @@ Database::removeAdapter(const string& adapterId) if(infos.empty()) { _adapterObserverTopic->adapterRemoved(serial, adapterId); - _replicaCache.waitForUpdateReplication("adapter", serial); } else { @@ -971,11 +972,6 @@ Database::removeAdapter(const string& adapterId) { _adapterObserverTopic->adapterUpdated(serial + i, *p); } - i = 0; - for(p = infos.begin(); p != infos.end(); ++p, ++i) - { - _replicaCache.waitForUpdateReplication("adapter", serial + i); - } } } @@ -1166,7 +1162,6 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase) { _objectObserverTopic->objectUpdated(serial, info); } - _replicaCache.waitForUpdateReplication("object", serial); if(_traceLevels->object > 0) { @@ -1211,7 +1206,6 @@ Database::removeObject(const Ice::Identity& id) } _objectObserverTopic->objectRemoved(serial, id); - _replicaCache.waitForUpdateReplication("object", serial); if(_traceLevels->object > 0) { @@ -1254,7 +1248,6 @@ Database::updateObject(const Ice::ObjectPrx& proxy) } _objectObserverTopic->objectUpdated(serial, info); - _replicaCache.waitForUpdateReplication("object", serial); if(_traceLevels->object > 0) { @@ -1755,11 +1748,11 @@ Database::reload(const ApplicationHelper& oldApp, } void -Database::finishUpdate(ServerEntrySeq& entries, - const ApplicationUpdateInfo& update, - const ApplicationInfo& oldApp, - const ApplicationDescriptor& newDesc, - AdminSessionI* session) +Database::finishApplicationUpdate(ServerEntrySeq& entries, + const ApplicationUpdateInfo& update, + const ApplicationInfo& oldApp, + const ApplicationDescriptor& newDesc, + AdminSessionI* session) { if(_master) { @@ -1780,7 +1773,7 @@ Database::finishUpdate(ServerEntrySeq& entries, ApplicationHelper previous(_communicator, newDesc); ApplicationHelper helper(_communicator, oldApp.descriptor); reload(previous, helper, entries, oldApp.uuid, oldApp.revision); - _updating.erase(newDesc.name); + finishUpdating(newDesc.name); notifyAll(); } @@ -1812,13 +1805,15 @@ Database::finishUpdate(ServerEntrySeq& entries, _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info)); serial = ++_applicationSerial; - _updating.erase(update.descriptor.name); - notifyAll(); } _applicationObserverTopic->applicationUpdated(serial, update); - _replicaCache.waitForUpdateReplication("application", serial); - _replicaCache.finishApplicationReplication(oldApp.uuid, update.revision); + + { + Lock sync(*this); + finishUpdating(update.descriptor.name); + notifyAll(); + } if(_traceLevels->application > 0) { @@ -1826,3 +1821,24 @@ Database::finishUpdate(ServerEntrySeq& entries, out << "updated application `" << update.descriptor.name << "'"; } } + +void +Database::startUpdating(const string& name) +{ + // Must be called within the synchronization. + _updating.insert(make_pair(name, vector<AMD_NodeSession_waitForApplicationReplicationPtr>())); +} + +void +Database::finishUpdating(const string& name) +{ + // Must be called within the synchronization. + map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(name); + assert(p != _updating.end()); + for(vector<AMD_NodeSession_waitForApplicationReplicationPtr>::const_iterator q = p->second.begin(); + q != p->second.end(); ++q) + { + (*q)->ice_response(); + } + _updating.erase(p); +} |