diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-09-13 10:09:50 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-09-13 10:09:50 +0000 |
commit | e65e272d55bfe9f9d26de43b73721c04b683234a (patch) | |
tree | 0b0e04bbaf26fe357a1620e4725986d5d750c003 /cpp/src | |
parent | Fixed for VC6 (STLport bug?) (diff) | |
download | ice-e65e272d55bfe9f9d26de43b73721c04b683234a.tar.bz2 ice-e65e272d55bfe9f9d26de43b73721c04b683234a.tar.xz ice-e65e272d55bfe9f9d26de43b73721c04b683234a.zip |
Fixes
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 136 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.h | 14 | ||||
-rw-r--r-- | cpp/src/IceGrid/Internal.ice | 13 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.cpp | 7 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionI.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.cpp | 153 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.h | 13 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionI.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionI.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.cpp | 26 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaSessionManager.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 185 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 39 |
13 files changed, 289 insertions, 308 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 42082b7fddc..457660751c9 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -230,10 +230,9 @@ Database::getObserverTopic(TopicName name) const case ObjectObserverTopicName: return _objectObserverTopic; default: - assert(false); break; } - return 0; // Keep the compiler happy. + return 0; } void @@ -392,9 +391,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) ApplicationHelper helper(_communicator, info.descriptor); checkForAddition(helper); load(helper, entries, info.uuid, info.revision); - _updating.insert(info.descriptor.name); - - _replicaCache.startApplicationReplication(info.uuid, info.revision); + startUpdating(info.descriptor.name); } if(_master) @@ -410,7 +407,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) Lock sync(*this); entries.clear(); unload(ApplicationHelper(_communicator, info.descriptor), entries); - _updating.erase(info.descriptor.name); + finishUpdating(info.descriptor.name); notifyAll(); } catch(const DeploymentException& ex) @@ -422,21 +419,20 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) } } - // - // Save the application descriptor. - // int serial; { Lock sync(*this); - _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info)); serial = ++_applicationSerial; - _updating.erase(info.descriptor.name); - notifyAll(); + _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info)); } _applicationObserverTopic->applicationAdded(serial, info); - _replicaCache.waitForUpdateReplication("application", serial); - _replicaCache.finishApplicationReplication(info.uuid, info.revision); + + { + Lock sync(*this); + finishUpdating(info.descriptor.name); + notifyAll(); + } if(_traceLevels->application > 0) { @@ -481,11 +477,10 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se newDesc = helper.getDefinition(); - _updating.insert(update.descriptor.name); - _replicaCache.startApplicationReplication(oldApp.uuid, update.revision); + startUpdating(update.descriptor.name); } - finishUpdate(entries, update, oldApp, newDesc, session); + finishApplicationUpdate(entries, update, oldApp, newDesc, session); } void @@ -523,11 +518,10 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS checkForUpdate(previous, helper); reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1); - _updating.insert(update.descriptor.name); - _replicaCache.startApplicationReplication(oldApp.uuid, update.revision); + startUpdating(update.descriptor.name); } - finishUpdate(entries, update, oldApp, newDesc, session); + finishApplicationUpdate(entries, update, oldApp, newDesc, session); } void @@ -571,11 +565,10 @@ Database::instantiateServer(const string& application, newDesc = helper.getDefinition(); - _updating.insert(update.descriptor.name); - _replicaCache.startApplicationReplication(oldApp.uuid, update.revision); + startUpdating(update.descriptor.name); } - finishUpdate(entries, update, oldApp, newDesc, session); + finishApplicationUpdate(entries, update, oldApp, newDesc, session); } void @@ -618,7 +611,6 @@ Database::removeApplication(const string& name, AdminSessionI* session) } _applicationObserverTopic->applicationRemoved(serial, name); - _replicaCache.waitForUpdateReplication("application", serial); if(_traceLevels->application > 0) { @@ -729,15 +721,11 @@ Database::getAllNodes(const string& expression) void Database::addReplica(const string& name, const ReplicaSessionIPtr& session) { - // - // NOTE: this must be done before we add the replica to the cache - // in order for ReplicaCache::waitForUpdateReplication to work. - // - _applicationObserverTopic->subscribeAndWaitForSubscription(session->getObserver()); - _adapterObserverTopic->subscribeAndWaitForSubscription(session->getObserver()); - _objectObserverTopic->subscribeAndWaitForSubscription(session->getObserver()); - _replicaCache.add(name, session); + + _applicationObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name); + _adapterObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name); + _objectObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name); } InternalRegistryPrx @@ -753,25 +741,40 @@ Database::getReplicaInfo(const string& name) const } void -Database::replicaReceivedUpdate(const string& name, const string& update, int serial, const string& failure) +Database::replicaReceivedUpdate(const string& replica, TopicName name, int serial, const string& failure) { - _replicaCache.replicaReceivedUpdate(name, update, serial, failure); + ObserverTopicPtr topic = getObserverTopic(name); + if(topic) + { + topic->receivedUpdate(replica, serial, failure); + } } void -Database::waitForApplicationReplication(const string& application, int revision) +Database::waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr& cb, + const string& application, + int revision) { - _replicaCache.waitForApplicationReplication(application, revision); + Lock sync(*this); + map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(application); + if(p != _updating.end()) + { + p->second.push_back(cb); + } + else + { + cb->ice_response(); + } } void Database::removeReplica(const string& name, const ReplicaSessionIPtr& session) { - _replicaCache.remove(name); + _applicationObserverTopic->unsubscribe(session->getObserver(), name); + _adapterObserverTopic->unsubscribe(session->getObserver(), name); + _objectObserverTopic->unsubscribe(session->getObserver(), name); - _applicationObserverTopic->unsubscribe(session->getObserver()); - _adapterObserverTopic->unsubscribe(session->getObserver()); - _objectObserverTopic->unsubscribe(session->getObserver()); + _replicaCache.remove(name); } Ice::StringSeq @@ -881,7 +884,6 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr { _adapterObserverTopic->adapterRemoved(serial, adapterId); } - _replicaCache.waitForUpdateReplication("adapter", serial); return true; } @@ -961,7 +963,6 @@ Database::removeAdapter(const string& adapterId) if(infos.empty()) { _adapterObserverTopic->adapterRemoved(serial, adapterId); - _replicaCache.waitForUpdateReplication("adapter", serial); } else { @@ -971,11 +972,6 @@ Database::removeAdapter(const string& adapterId) { _adapterObserverTopic->adapterUpdated(serial + i, *p); } - i = 0; - for(p = infos.begin(); p != infos.end(); ++p, ++i) - { - _replicaCache.waitForUpdateReplication("adapter", serial + i); - } } } @@ -1166,7 +1162,6 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase) { _objectObserverTopic->objectUpdated(serial, info); } - _replicaCache.waitForUpdateReplication("object", serial); if(_traceLevels->object > 0) { @@ -1211,7 +1206,6 @@ Database::removeObject(const Ice::Identity& id) } _objectObserverTopic->objectRemoved(serial, id); - _replicaCache.waitForUpdateReplication("object", serial); if(_traceLevels->object > 0) { @@ -1254,7 +1248,6 @@ Database::updateObject(const Ice::ObjectPrx& proxy) } _objectObserverTopic->objectUpdated(serial, info); - _replicaCache.waitForUpdateReplication("object", serial); if(_traceLevels->object > 0) { @@ -1755,11 +1748,11 @@ Database::reload(const ApplicationHelper& oldApp, } void -Database::finishUpdate(ServerEntrySeq& entries, - const ApplicationUpdateInfo& update, - const ApplicationInfo& oldApp, - const ApplicationDescriptor& newDesc, - AdminSessionI* session) +Database::finishApplicationUpdate(ServerEntrySeq& entries, + const ApplicationUpdateInfo& update, + const ApplicationInfo& oldApp, + const ApplicationDescriptor& newDesc, + AdminSessionI* session) { if(_master) { @@ -1780,7 +1773,7 @@ Database::finishUpdate(ServerEntrySeq& entries, ApplicationHelper previous(_communicator, newDesc); ApplicationHelper helper(_communicator, oldApp.descriptor); reload(previous, helper, entries, oldApp.uuid, oldApp.revision); - _updating.erase(newDesc.name); + finishUpdating(newDesc.name); notifyAll(); } @@ -1812,13 +1805,15 @@ Database::finishUpdate(ServerEntrySeq& entries, _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info)); serial = ++_applicationSerial; - _updating.erase(update.descriptor.name); - notifyAll(); } _applicationObserverTopic->applicationUpdated(serial, update); - _replicaCache.waitForUpdateReplication("application", serial); - _replicaCache.finishApplicationReplication(oldApp.uuid, update.revision); + + { + Lock sync(*this); + finishUpdating(update.descriptor.name); + notifyAll(); + } if(_traceLevels->application > 0) { @@ -1826,3 +1821,24 @@ Database::finishUpdate(ServerEntrySeq& entries, out << "updated application `" << update.descriptor.name << "'"; } } + +void +Database::startUpdating(const string& name) +{ + // Must be called within the synchronization. + _updating.insert(make_pair(name, vector<AMD_NodeSession_waitForApplicationReplicationPtr>())); +} + +void +Database::finishUpdating(const string& name) +{ + // Must be called within the synchronization. + map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(name); + assert(p != _updating.end()); + for(vector<AMD_NodeSession_waitForApplicationReplicationPtr>::const_iterator q = p->second.begin(); + q != p->second.end(); ++q) + { + (*q)->ice_response(); + } + _updating.erase(p); +} diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h index e8226d776cd..4056c8005f6 100644 --- a/cpp/src/IceGrid/Database.h +++ b/cpp/src/IceGrid/Database.h @@ -92,8 +92,9 @@ public: void addReplica(const std::string&, const ReplicaSessionIPtr&); RegistryInfo getReplicaInfo(const std::string&) const; InternalRegistryPrx getReplica(const std::string&) const; - void replicaReceivedUpdate(const std::string&, const std::string&, int, const std::string&); - void waitForApplicationReplication(const std::string&, int); + void replicaReceivedUpdate(const std::string&, TopicName, int, const std::string&); + void waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr&, const std::string&, + int); void removeReplica(const std::string&, const ReplicaSessionIPtr&); Ice::StringSeq getAllReplicas(const std::string& = std::string()); @@ -142,11 +143,14 @@ private: void load(const ApplicationHelper&, ServerEntrySeq&, const std::string&, int); void unload(const ApplicationHelper&, ServerEntrySeq&); void reload(const ApplicationHelper&, const ApplicationHelper&, ServerEntrySeq&, const std::string&, int); - void finishUpdate(ServerEntrySeq&, const ApplicationUpdateInfo&, const ApplicationInfo&, - const ApplicationDescriptor&, AdminSessionI*); + void finishApplicationUpdate(ServerEntrySeq&, const ApplicationUpdateInfo&, const ApplicationInfo&, + const ApplicationDescriptor&, AdminSessionI*); void checkSessionLock(AdminSessionI*); + void startUpdating(const std::string&); + void finishUpdating(const std::string&); + friend struct AddComponent; static const std::string _applicationDbName; @@ -187,7 +191,7 @@ private: int _replicaApplicationSerial; int _adapterSerial; int _objectSerial; - std::set<std::string> _updating; + std::map<std::string, std::vector<AMD_NodeSession_waitForApplicationReplicationPtr> > _updating; }; typedef IceUtil::Handle<Database> DatabasePtr; diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice index 7ebf0e02e56..163b0c87931 100644 --- a/cpp/src/IceGrid/Internal.ice +++ b/cpp/src/IceGrid/Internal.ice @@ -322,7 +322,7 @@ interface NodeSession * Wait for the replication of the given application to be done. * **/ - ["ami", "cpp:const"] void waitForApplicationReplication(string application, int revision); + ["amd", "ami", "cpp:const"] void waitForApplicationReplication(string application, int revision); /** * @@ -342,6 +342,15 @@ exception ReplicaActiveException { }; +enum TopicName +{ + RegistryObserverTopicName, + NodeObserverTopicName, + ApplicationObserverTopicName, + AdapterObserverTopicName, + ObjectObserverTopicName +}; + interface ReplicaSession { /** @@ -391,7 +400,7 @@ interface ReplicaSession * before to continue. * **/ - void receivedUpdate(string name, int serial, string failure); + void receivedUpdate(TopicName name, int serial, string failure); /** * diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp index fcffcc71324..e5ca8dfef6d 100644 --- a/cpp/src/IceGrid/NodeSessionI.cpp +++ b/cpp/src/IceGrid/NodeSessionI.cpp @@ -108,9 +108,12 @@ NodeSessionI::getServers(const Ice::Current& current) const } void -NodeSessionI::waitForApplicationReplication(const std::string& application, int revision, const Ice::Current&) const +NodeSessionI::waitForApplicationReplication_async(const AMD_NodeSession_waitForApplicationReplicationPtr& cb, + const std::string& application, + int revision, + const Ice::Current&) const { - _database->waitForApplicationReplication(application, revision); + _database->waitForApplicationReplication(cb, application, revision); } void diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h index e5e012cfc43..a21c86d8b17 100644 --- a/cpp/src/IceGrid/NodeSessionI.h +++ b/cpp/src/IceGrid/NodeSessionI.h @@ -32,7 +32,8 @@ public: virtual NodeObserverPrx getObserver(const Ice::Current&) const; virtual void loadServers(const Ice::Current&) const; virtual Ice::StringSeq getServers(const Ice::Current&) const; - virtual void waitForApplicationReplication(const std::string&, int, const Ice::Current&) const; + virtual void waitForApplicationReplication_async(const AMD_NodeSession_waitForApplicationReplicationPtr&, + const std::string&, int, const Ice::Current&) const; virtual void destroy(const Ice::Current&); const NodePrx& getNode() const; diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index 8eb940fa669..f411e170730 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -88,11 +88,6 @@ ReplicaCache::remove(const string& name) Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat); out << "replica `" << name << "' down"; } - - // - // Remove the replica expected updates. - // - removeReplicaUpdates(name); } try @@ -177,154 +172,6 @@ ReplicaCache::getEndpoints(const string& name, const Ice::ObjectPrx& proxy) cons return _communicator->stringToProxy("dummy")->ice_endpoints(endpoints); } -void -ReplicaCache::waitForUpdateReplication(const string& name, int serial) -{ - Lock sync(*this); - if(_entries.empty()) - { - return; - } - - vector<string> replicas; - for(map<string, ReplicaEntryPtr>::const_iterator s = _entries.begin(); s != _entries.end(); ++s) - { - replicas.push_back(s->first); - } - - ostringstream os; - os << name << "-" << serial; - const string key = os.str(); - - _waitForUpdates.insert(make_pair(key, set<string>(replicas.begin(), replicas.end()))); - - // - // Wait until all the updates are received. - // - while(true) - { - map<string, set<string> >::const_iterator p = _waitForUpdates.find(key); - if(p == _waitForUpdates.end()) - { - map<string, map<string, string> >::iterator q = _updateFailures.find(key); - if(q != _updateFailures.end()) - { - map<string, string> failures = q->second; - _updateFailures.erase(q); - - ostringstream os; - for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r) - { - os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n"; - } - Ice::Error err(_traceLevels->logger); - err << os.str(); - } - return; - } - else - { - wait(); - } - } -} - -void -ReplicaCache::replicaReceivedUpdate(const string& name, const string& update, int serial, const string& failure) -{ - Lock sync(*this); - - ostringstream os; - os << update << "-" << serial; - const string key = os.str(); - - map<string, set<string> >::iterator p = _waitForUpdates.find(key); - if(p != _waitForUpdates.end()) - { - p->second.erase(name); - - if(!failure.empty()) - { - map<string, map<string, string> >::iterator q = _updateFailures.find(key); - if(q == _updateFailures.end()) - { - q = _updateFailures.insert(make_pair(key, map<string ,string>())).first; - } - q->second.insert(make_pair(name, failure)); - } - - if(p->second.empty()) - { - _waitForUpdates.erase(p); - notifyAll(); - } - } -} - -void -ReplicaCache::startApplicationReplication(const string& application, int revision) -{ - // - // Add the given application to the set of application being - // replicated. - // - Lock sync(*this); - _applicationReplication.insert(application); -} - - -void -ReplicaCache::finishApplicationReplication(const string& application, int revision) -{ - // - // Notify waiting threads that the given application replication - // is completed. - // - Lock sync(*this); - _applicationReplication.erase(application); - notifyAll(); -} - -void -ReplicaCache::waitForApplicationReplication(const string& application, int revision) -{ - // - // Wait for the given application to be replicated. - // - Lock sync(*this); - while(_applicationReplication.find(application) != _applicationReplication.end()) - { - wait(); - } -} - -void -ReplicaCache::removeReplicaUpdates(const string& name) -{ - // Must b called within the synchronization. - - map<string, set<string> >::iterator p = _waitForUpdates.begin(); - bool notifyMonitor = false; - while(p != _waitForUpdates.end()) - { - p->second.erase(name); - if(p->second.empty()) - { - _waitForUpdates.erase(p++); - notifyMonitor = true; - } - else - { - ++p; - } - } - - if(notifyMonitor) - { - notifyAll(); - } -} - ReplicaEntry::ReplicaEntry(const std::string& name, const ReplicaSessionIPtr& session) : _name(name), _session(session) diff --git a/cpp/src/IceGrid/ReplicaCache.h b/cpp/src/IceGrid/ReplicaCache.h index 837a472c093..2bc776ca108 100644 --- a/cpp/src/IceGrid/ReplicaCache.h +++ b/cpp/src/IceGrid/ReplicaCache.h @@ -58,24 +58,11 @@ public: Ice::ObjectPrx getEndpoints(const std::string&, const Ice::ObjectPrx&) const; - void waitForUpdateReplication(const std::string&, int); - void replicaReceivedUpdate(const std::string&, const std::string&, int, const std::string&); - - void startApplicationReplication(const std::string&, int); - void finishApplicationReplication(const std::string&, int); - void waitForApplicationReplication(const std::string&, int); - private: - void removeReplicaUpdates(const std::string&); - const Ice::CommunicatorPtr _communicator; const IceStorm::TopicPrx _topic; const NodePrx _nodes; - - std::map<std::string, std::set<std::string> > _waitForUpdates; - std::map<std::string, std::map<std::string, std::string> > _updateFailures; - std::set<std::string> _applicationReplication; }; }; diff --git a/cpp/src/IceGrid/ReplicaSessionI.cpp b/cpp/src/IceGrid/ReplicaSessionI.cpp index ecc9789c6f4..605d0e77595 100644 --- a/cpp/src/IceGrid/ReplicaSessionI.cpp +++ b/cpp/src/IceGrid/ReplicaSessionI.cpp @@ -107,9 +107,9 @@ ReplicaSessionI::setAdapterDirectProxy(const string& adapterId, } void -ReplicaSessionI::receivedUpdate(const string& update, int serial, const string& failure, const Ice::Current&) +ReplicaSessionI::receivedUpdate(TopicName topic, int serial, const string& failure, const Ice::Current&) { - _database->replicaReceivedUpdate(_name, update, serial, failure); + _database->replicaReceivedUpdate(_name, topic, serial, failure); } void diff --git a/cpp/src/IceGrid/ReplicaSessionI.h b/cpp/src/IceGrid/ReplicaSessionI.h index 737ff4aa656..aa737b91b06 100644 --- a/cpp/src/IceGrid/ReplicaSessionI.h +++ b/cpp/src/IceGrid/ReplicaSessionI.h @@ -38,7 +38,7 @@ public: virtual void registerWellKnownObjects(const ObjectInfoSeq&, const Ice::Current&); virtual void setAdapterDirectProxy(const std::string&, const std::string&, const Ice::ObjectPrx&, const Ice::Current&); - virtual void receivedUpdate(const std::string&, int, const std::string&, const Ice::Current&); + virtual void receivedUpdate(TopicName, int, const std::string&, const Ice::Current&); virtual void destroy(const Ice::Current&); virtual IceUtil::Time timestamp() const; diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp index b1045c49a75..3ebc345c5d1 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.cpp +++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp @@ -49,7 +49,7 @@ public: os << ex << ":\n" << ex.reason; failure = os.str(); } - _manager.receivedUpdate("application", getSerial(current.ctx, "application"), failure); + _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -66,7 +66,7 @@ public: os << ex << ":\napplication: " << ex.name; failure = os.str(); } - _manager.receivedUpdate("application", getSerial(current.ctx, "application"), failure); + _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -89,7 +89,7 @@ public: os << ex << ":\napplication: " << ex.name; failure = os.str(); } - _manager.receivedUpdate("application", getSerial(current.ctx, "application"), failure); + _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -106,7 +106,7 @@ public: { failure = "adapter `" + info.id + "' already exists and belongs to an application"; } - _manager.receivedUpdate("adapter", getSerial(current.ctx, "adapter"), failure); + _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -117,7 +117,7 @@ public: { failure = "adapter `" + info.id + "' already exists and belongs to an application"; } - _manager.receivedUpdate("adapter", getSerial(current.ctx, "adapter"), failure); + _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -128,7 +128,7 @@ public: { failure = "adapter `" + id + "' already exists and belongs to an application"; } - _manager.receivedUpdate("adapter", getSerial(current.ctx, "adapter"), failure); + _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -152,7 +152,7 @@ public: os << "id: " << info.proxy->ice_getCommunicator()->identityToString(info.proxy->ice_getIdentity()); failure = os.str(); } - _manager.receivedUpdate("object", getSerial(current.ctx, "object"), failure); + _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -169,7 +169,7 @@ public: os << ex << ":\n" << ex.reason; failure = os.str(); } - _manager.receivedUpdate("object", getSerial(current.ctx, "object"), failure); + _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); } virtual void @@ -189,15 +189,15 @@ public: catch(const ObjectNotRegisteredException&) { } - _manager.receivedUpdate("object", getSerial(current.ctx, "object"), failure); + _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure); } private: int - getSerial(const Ice::Context& context, const string& name) + getSerial(const Ice::Context& context) { - Ice::Context::const_iterator p = context.find(name); + Ice::Context::const_iterator p = context.find("serial"); if(p != context.end()) { int serial; @@ -305,14 +305,14 @@ ReplicaSessionManager::destroy() } void -ReplicaSessionManager::receivedUpdate(const string& update, int serial, const string& failure) +ReplicaSessionManager::receivedUpdate(TopicName name, int serial, const string& failure) { ReplicaSessionPrx session = _thread->getSession(); if(session) { try { - session->receivedUpdate(update, serial, failure); + session->receivedUpdate(name, serial, failure); } catch(const Ice::LocalException&) { diff --git a/cpp/src/IceGrid/ReplicaSessionManager.h b/cpp/src/IceGrid/ReplicaSessionManager.h index a1bc90bf326..6f895755ec6 100644 --- a/cpp/src/IceGrid/ReplicaSessionManager.h +++ b/cpp/src/IceGrid/ReplicaSessionManager.h @@ -41,7 +41,7 @@ public: NodePrxSeq getNodes() const; void destroy(); - void receivedUpdate(const std::string&, int, const std::string&); + void receivedUpdate(TopicName, int, const std::string&); void registerAllWellKnownObjects(); ReplicaSessionPrx getSession() const { return _thread->getSession(); } diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 6eebdae97bc..5c896132f1b 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -22,9 +22,11 @@ class InitCB : public T { public: - InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& name, int serial) : + InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& subscriberName, + const string& name, int serial) : _topic(topic), _observer(observer), + _subscriberName(subscriberName), _name(name), _serial(serial) { @@ -33,7 +35,7 @@ public: void ice_response() { - _topic->subscribe(_observer, _serial); + _topic->subscribe(_observer, _subscriberName, _serial); } void @@ -47,6 +49,7 @@ private: const ObserverTopicPtr _topic; const Ice::ObjectPrx _observer; + const string _subscriberName; const string _name; const int _serial; }; @@ -80,13 +83,13 @@ ObserverTopic::~ObserverTopic() } void -ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial) +ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int serial) { while(true) { if(serial == -1) { - initObserver(obsv); + initObserver(obsv, name); return; } @@ -97,19 +100,19 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial) continue; } - subscribeImpl(obsv); + subscribeImpl(obsv, name); break; } } void -ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv) +ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv, const string& name) { { Lock sync(*this); _waitForSubscribe.insert(obsv->ice_getIdentity()); } - subscribe(obsv); + subscribe(obsv, name); { Lock sync(*this); while(_topic && _waitForSubscribe.find(obsv->ice_getIdentity()) != _waitForSubscribe.end()) @@ -120,13 +123,39 @@ ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv) } void -ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer) +ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name) { Lock sync(*this); if(_topic) { _topic->unsubscribe(observer); } + + if(!name.empty()) + { + _syncSubscribers.erase(name); + + map<int, set<string> >::iterator p = _waitForUpdates.begin(); + bool notifyMonitor = false; + while(p != _waitForUpdates.end()) + { + p->second.erase(name); + if(p->second.empty()) + { + _waitForUpdates.erase(p++); + notifyMonitor = true; + } + else + { + ++p; + } + } + + if(notifyMonitor) + { + notifyAll(); + } + } } void @@ -137,8 +166,78 @@ ObserverTopic::destroy() notifyAll(); } +void +ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure) +{ + Lock sync(*this); + + map<int, set<string> >::iterator p = _waitForUpdates.find(serial); + if(p != _waitForUpdates.end()) + { + p->second.erase(name); + + if(!failure.empty()) + { + map<int, map<string, string> >::iterator q = _updateFailures.find(serial); + if(q == _updateFailures.end()) + { + q = _updateFailures.insert(make_pair(serial, map<string ,string>())).first; + } + q->second.insert(make_pair(name, failure)); + } + + if(p->second.empty()) + { + _waitForUpdates.erase(p); + notifyAll(); + } + } +} + +void +ObserverTopic::waitForSyncedSubscribers(int serial) +{ + if(_syncSubscribers.empty()) + { + return; + } + + _waitForUpdates.insert(make_pair(serial, _syncSubscribers)); + + // + // Wait until all the updates are received. + // + while(true) + { + map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial); + if(p == _waitForUpdates.end()) + { + map<int, map<string, string> >::iterator q = _updateFailures.find(serial); + if(q != _updateFailures.end()) + { + map<string, string> failures = q->second; + _updateFailures.erase(q); + + ostringstream os; + for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r) + { + os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n"; + } + // TODO: XXX +// Ice::Error err(_traceLevels->logger); +// err << os.str(); + } + return; + } + else + { + wait(); + } + } +} + void -ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer) +ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer, const string& name) { // This must be called with the mutex locked. if(!_topic) @@ -151,6 +250,12 @@ ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer) _topic->subscribe(qos, observer); _waitForSubscribe.erase(observer->ice_getIdentity()); + + if(!name.empty()) + { + _syncSubscribers.insert(name); + } + notifyAll(); } @@ -171,13 +276,13 @@ ObserverTopic::updateSerial(int serial) } Ice::Context -ObserverTopic::getContext(const string& name, int serial) const +ObserverTopic::getContext(int serial) const { ostringstream os; os << serial; Ice::Context context; - context[name] = os.str(); + context["serial"] = os.str(); return context; } @@ -233,7 +338,7 @@ RegistryObserverTopic::registryDown(const string& name) } void -RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) { RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv); RegistryInfoSeq registries; @@ -247,8 +352,8 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv) } serial = _serial; } - observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, "registry", serial), - registries); + observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, name, "registry", + serial), registries); } NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -431,7 +536,7 @@ NodeObserverTopic::nodeDown(const string& name) } void -NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) { NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv); int serial; @@ -445,7 +550,7 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv) } serial = _serial; } - observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, "node", serial), nodes); + observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, name, "node", serial), nodes); } ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager, @@ -472,7 +577,7 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& } try { - _publisher->applicationInit(serial, apps, getContext("application", serial)); + _publisher->applicationInit(serial, apps, getContext(serial)); } catch(const Ice::LocalException& ex) { @@ -493,13 +598,14 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in _applications.insert(make_pair(info.descriptor.name, info)); try { - _publisher->applicationAdded(serial, info, getContext("application", serial)); + _publisher->applicationAdded(serial, info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -514,13 +620,14 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name) _applications.erase(name); try { - _publisher->applicationRemoved(serial, name, getContext("application", serial)); + _publisher->applicationRemoved(serial, name, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -566,17 +673,18 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate } try { - _publisher->applicationUpdated(serial, info, getContext("application", serial)); + _publisher->applicationUpdated(serial, info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void -ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) { ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv); int serial; @@ -590,8 +698,8 @@ ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv) applications.push_back(p->second); } } - observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, "application", - serial), + observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, name, + "application", serial), serial, applications); } @@ -619,7 +727,7 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts) } try { - _publisher->adapterInit(adpts, getContext("adapter", serial)); + _publisher->adapterInit(adpts, getContext(serial)); } catch(const Ice::LocalException& ex) { @@ -640,13 +748,14 @@ AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info) _adapters.insert(make_pair(info.id, info)); try { - _publisher->adapterAdded(info, getContext("adapter", serial)); + _publisher->adapterAdded(info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -661,13 +770,14 @@ AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info) _adapters[info.id] = info; try { - _publisher->adapterUpdated(info, getContext("adapter", serial)); + _publisher->adapterUpdated(info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -682,17 +792,18 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id) _adapters.erase(id); try { - _publisher->adapterRemoved(id, getContext("adapter", serial)); + _publisher->adapterRemoved(id, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void -AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) { AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv); int serial; @@ -706,7 +817,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv) adapters.push_back(p->second); } } - observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, "adapter", serial), + observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, name, "adapter", serial), adapters); } @@ -734,7 +845,7 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects) } try { - _publisher->objectInit(objects, getContext("object", serial)); + _publisher->objectInit(objects, getContext(serial)); } catch(const Ice::LocalException& ex) { @@ -755,13 +866,14 @@ ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info) _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); try { - _publisher->objectAdded(info, getContext("object", serial)); + _publisher->objectAdded(info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -776,13 +888,14 @@ ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info) _objects[info.proxy->ice_getIdentity()] = info; try { - _publisher->objectUpdated(info, getContext("object", serial)); + _publisher->objectUpdated(info, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void @@ -797,17 +910,18 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id) _objects.erase(id); try { - _publisher->objectRemoved(id, getContext("object", serial)); + _publisher->objectRemoved(id, getContext(serial)); } catch(const Ice::LocalException& ex) { Ice::Warning out(_publisher->ice_getCommunicator()->getLogger()); out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; } + waitForSyncedSubscribers(serial); } void -ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) +ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name) { ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv); int serial; @@ -820,7 +934,8 @@ ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv) objects.push_back(p->second); } } - observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, "object", serial), objects); + observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, name, "object", serial), + objects); } diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index e17f3c9a438..c561dfa5862 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -20,14 +20,6 @@ namespace IceGrid { -enum TopicName -{ - RegistryObserverTopicName, - NodeObserverTopicName, - ApplicationObserverTopicName, - AdapterObserverTopicName, - ObjectObserverTopicName -}; class ObserverTopic : public IceUtil::Monitor<IceUtil::Mutex>, virtual public Ice::Object { @@ -36,23 +28,30 @@ public: ObserverTopic(const IceStorm::TopicManagerPrx&, const std::string&); virtual ~ObserverTopic(); - void subscribe(const Ice::ObjectPrx&, int = -1); - void subscribeAndWaitForSubscription(const Ice::ObjectPrx&); - void unsubscribe(const Ice::ObjectPrx&); + void subscribe(const Ice::ObjectPrx&, const std::string& = std::string(), int = -1); + void subscribeAndWaitForSubscription(const Ice::ObjectPrx&, const std::string& = std::string()); + void unsubscribe(const Ice::ObjectPrx&, const std::string& = std::string()); void destroy(); - virtual void initObserver(const Ice::ObjectPrx&) = 0; + void receivedUpdate(const std::string&, int, const std::string&); + + virtual void initObserver(const Ice::ObjectPrx&, const std::string&) = 0; protected: - void subscribeImpl(const Ice::ObjectPrx&); + void waitForSyncedSubscribers(int); + void subscribeImpl(const Ice::ObjectPrx&, const std::string&); void updateSerial(int); - Ice::Context getContext(const std::string&, int) const; + Ice::Context getContext(int) const; IceStorm::TopicPrx _topic; Ice::ObjectPrx _basePublisher; std::set<Ice::Identity> _waitForSubscribe; - int _serial; + int _serial; + + std::set<std::string> _syncSubscribers; + std::map<int, std::set<std::string> > _waitForUpdates; + std::map<int, std::map<std::string, std::string> > _updateFailures; }; typedef IceUtil::Handle<ObserverTopic> ObserverTopicPtr; @@ -65,7 +64,7 @@ public: void registryUp(const RegistryInfo&); void registryDown(const std::string&); - virtual void initObserver(const Ice::ObjectPrx&); + virtual void initObserver(const Ice::ObjectPrx&, const std::string&); private: @@ -89,7 +88,7 @@ public: const NodeObserverPrx& getPublisher() { return _externalPublisher; } void nodeDown(const std::string&); - virtual void initObserver(const Ice::ObjectPrx&); + virtual void initObserver(const Ice::ObjectPrx&, const std::string&); private: @@ -110,7 +109,7 @@ public: void applicationRemoved(int, const std::string&); void applicationUpdated(int, const ApplicationUpdateInfo&); - virtual void initObserver(const Ice::ObjectPrx&); + virtual void initObserver(const Ice::ObjectPrx&, const std::string&); private: @@ -130,7 +129,7 @@ public: void adapterUpdated(int, const AdapterInfo&); void adapterRemoved(int, const std::string&); - virtual void initObserver(const Ice::ObjectPrx&); + virtual void initObserver(const Ice::ObjectPrx&, const std::string&); private: @@ -150,7 +149,7 @@ public: void objectUpdated(int, const ObjectInfo&); void objectRemoved(int, const Ice::Identity&); - virtual void initObserver(const Ice::ObjectPrx&); + virtual void initObserver(const Ice::ObjectPrx&, const std::string&); private: |