summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp136
-rw-r--r--cpp/src/IceGrid/Database.h14
-rw-r--r--cpp/src/IceGrid/Internal.ice13
-rw-r--r--cpp/src/IceGrid/NodeSessionI.cpp7
-rw-r--r--cpp/src/IceGrid/NodeSessionI.h3
-rw-r--r--cpp/src/IceGrid/ReplicaCache.cpp153
-rw-r--r--cpp/src/IceGrid/ReplicaCache.h13
-rw-r--r--cpp/src/IceGrid/ReplicaSessionI.cpp4
-rw-r--r--cpp/src/IceGrid/ReplicaSessionI.h2
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.cpp26
-rw-r--r--cpp/src/IceGrid/ReplicaSessionManager.h2
-rw-r--r--cpp/src/IceGrid/Topics.cpp185
-rw-r--r--cpp/src/IceGrid/Topics.h39
13 files changed, 289 insertions, 308 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 42082b7fddc..457660751c9 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -230,10 +230,9 @@ Database::getObserverTopic(TopicName name) const
case ObjectObserverTopicName:
return _objectObserverTopic;
default:
- assert(false);
break;
}
- return 0; // Keep the compiler happy.
+ return 0;
}
void
@@ -392,9 +391,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
ApplicationHelper helper(_communicator, info.descriptor);
checkForAddition(helper);
load(helper, entries, info.uuid, info.revision);
- _updating.insert(info.descriptor.name);
-
- _replicaCache.startApplicationReplication(info.uuid, info.revision);
+ startUpdating(info.descriptor.name);
}
if(_master)
@@ -410,7 +407,7 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
Lock sync(*this);
entries.clear();
unload(ApplicationHelper(_communicator, info.descriptor), entries);
- _updating.erase(info.descriptor.name);
+ finishUpdating(info.descriptor.name);
notifyAll();
}
catch(const DeploymentException& ex)
@@ -422,21 +419,20 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
}
}
- //
- // Save the application descriptor.
- //
int serial;
{
Lock sync(*this);
- _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
serial = ++_applicationSerial;
- _updating.erase(info.descriptor.name);
- notifyAll();
+ _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
}
_applicationObserverTopic->applicationAdded(serial, info);
- _replicaCache.waitForUpdateReplication("application", serial);
- _replicaCache.finishApplicationReplication(info.uuid, info.revision);
+
+ {
+ Lock sync(*this);
+ finishUpdating(info.descriptor.name);
+ notifyAll();
+ }
if(_traceLevels->application > 0)
{
@@ -481,11 +477,10 @@ Database::updateApplication(const ApplicationUpdateInfo& updt, AdminSessionI* se
newDesc = helper.getDefinition();
- _updating.insert(update.descriptor.name);
- _replicaCache.startApplicationReplication(oldApp.uuid, update.revision);
+ startUpdating(update.descriptor.name);
}
- finishUpdate(entries, update, oldApp, newDesc, session);
+ finishApplicationUpdate(entries, update, oldApp, newDesc, session);
}
void
@@ -523,11 +518,10 @@ Database::syncApplicationDescriptor(const ApplicationDescriptor& newDesc, AdminS
checkForUpdate(previous, helper);
reload(previous, helper, entries, oldApp.uuid, oldApp.revision + 1);
- _updating.insert(update.descriptor.name);
- _replicaCache.startApplicationReplication(oldApp.uuid, update.revision);
+ startUpdating(update.descriptor.name);
}
- finishUpdate(entries, update, oldApp, newDesc, session);
+ finishApplicationUpdate(entries, update, oldApp, newDesc, session);
}
void
@@ -571,11 +565,10 @@ Database::instantiateServer(const string& application,
newDesc = helper.getDefinition();
- _updating.insert(update.descriptor.name);
- _replicaCache.startApplicationReplication(oldApp.uuid, update.revision);
+ startUpdating(update.descriptor.name);
}
- finishUpdate(entries, update, oldApp, newDesc, session);
+ finishApplicationUpdate(entries, update, oldApp, newDesc, session);
}
void
@@ -618,7 +611,6 @@ Database::removeApplication(const string& name, AdminSessionI* session)
}
_applicationObserverTopic->applicationRemoved(serial, name);
- _replicaCache.waitForUpdateReplication("application", serial);
if(_traceLevels->application > 0)
{
@@ -729,15 +721,11 @@ Database::getAllNodes(const string& expression)
void
Database::addReplica(const string& name, const ReplicaSessionIPtr& session)
{
- //
- // NOTE: this must be done before we add the replica to the cache
- // in order for ReplicaCache::waitForUpdateReplication to work.
- //
- _applicationObserverTopic->subscribeAndWaitForSubscription(session->getObserver());
- _adapterObserverTopic->subscribeAndWaitForSubscription(session->getObserver());
- _objectObserverTopic->subscribeAndWaitForSubscription(session->getObserver());
-
_replicaCache.add(name, session);
+
+ _applicationObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name);
+ _adapterObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name);
+ _objectObserverTopic->subscribeAndWaitForSubscription(session->getObserver(), name);
}
InternalRegistryPrx
@@ -753,25 +741,40 @@ Database::getReplicaInfo(const string& name) const
}
void
-Database::replicaReceivedUpdate(const string& name, const string& update, int serial, const string& failure)
+Database::replicaReceivedUpdate(const string& replica, TopicName name, int serial, const string& failure)
{
- _replicaCache.replicaReceivedUpdate(name, update, serial, failure);
+ ObserverTopicPtr topic = getObserverTopic(name);
+ if(topic)
+ {
+ topic->receivedUpdate(replica, serial, failure);
+ }
}
void
-Database::waitForApplicationReplication(const string& application, int revision)
+Database::waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr& cb,
+ const string& application,
+ int revision)
{
- _replicaCache.waitForApplicationReplication(application, revision);
+ Lock sync(*this);
+ map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(application);
+ if(p != _updating.end())
+ {
+ p->second.push_back(cb);
+ }
+ else
+ {
+ cb->ice_response();
+ }
}
void
Database::removeReplica(const string& name, const ReplicaSessionIPtr& session)
{
- _replicaCache.remove(name);
+ _applicationObserverTopic->unsubscribe(session->getObserver(), name);
+ _adapterObserverTopic->unsubscribe(session->getObserver(), name);
+ _objectObserverTopic->unsubscribe(session->getObserver(), name);
- _applicationObserverTopic->unsubscribe(session->getObserver());
- _adapterObserverTopic->unsubscribe(session->getObserver());
- _objectObserverTopic->unsubscribe(session->getObserver());
+ _replicaCache.remove(name);
}
Ice::StringSeq
@@ -881,7 +884,6 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
{
_adapterObserverTopic->adapterRemoved(serial, adapterId);
}
- _replicaCache.waitForUpdateReplication("adapter", serial);
return true;
}
@@ -961,7 +963,6 @@ Database::removeAdapter(const string& adapterId)
if(infos.empty())
{
_adapterObserverTopic->adapterRemoved(serial, adapterId);
- _replicaCache.waitForUpdateReplication("adapter", serial);
}
else
{
@@ -971,11 +972,6 @@ Database::removeAdapter(const string& adapterId)
{
_adapterObserverTopic->adapterUpdated(serial + i, *p);
}
- i = 0;
- for(p = infos.begin(); p != infos.end(); ++p, ++i)
- {
- _replicaCache.waitForUpdateReplication("adapter", serial + i);
- }
}
}
@@ -1166,7 +1162,6 @@ Database::addObject(const ObjectInfo& info, bool replaceIfExistsInDatabase)
{
_objectObserverTopic->objectUpdated(serial, info);
}
- _replicaCache.waitForUpdateReplication("object", serial);
if(_traceLevels->object > 0)
{
@@ -1211,7 +1206,6 @@ Database::removeObject(const Ice::Identity& id)
}
_objectObserverTopic->objectRemoved(serial, id);
- _replicaCache.waitForUpdateReplication("object", serial);
if(_traceLevels->object > 0)
{
@@ -1254,7 +1248,6 @@ Database::updateObject(const Ice::ObjectPrx& proxy)
}
_objectObserverTopic->objectUpdated(serial, info);
- _replicaCache.waitForUpdateReplication("object", serial);
if(_traceLevels->object > 0)
{
@@ -1755,11 +1748,11 @@ Database::reload(const ApplicationHelper& oldApp,
}
void
-Database::finishUpdate(ServerEntrySeq& entries,
- const ApplicationUpdateInfo& update,
- const ApplicationInfo& oldApp,
- const ApplicationDescriptor& newDesc,
- AdminSessionI* session)
+Database::finishApplicationUpdate(ServerEntrySeq& entries,
+ const ApplicationUpdateInfo& update,
+ const ApplicationInfo& oldApp,
+ const ApplicationDescriptor& newDesc,
+ AdminSessionI* session)
{
if(_master)
{
@@ -1780,7 +1773,7 @@ Database::finishUpdate(ServerEntrySeq& entries,
ApplicationHelper previous(_communicator, newDesc);
ApplicationHelper helper(_communicator, oldApp.descriptor);
reload(previous, helper, entries, oldApp.uuid, oldApp.revision);
- _updating.erase(newDesc.name);
+ finishUpdating(newDesc.name);
notifyAll();
}
@@ -1812,13 +1805,15 @@ Database::finishUpdate(ServerEntrySeq& entries,
_applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info));
serial = ++_applicationSerial;
- _updating.erase(update.descriptor.name);
- notifyAll();
}
_applicationObserverTopic->applicationUpdated(serial, update);
- _replicaCache.waitForUpdateReplication("application", serial);
- _replicaCache.finishApplicationReplication(oldApp.uuid, update.revision);
+
+ {
+ Lock sync(*this);
+ finishUpdating(update.descriptor.name);
+ notifyAll();
+ }
if(_traceLevels->application > 0)
{
@@ -1826,3 +1821,24 @@ Database::finishUpdate(ServerEntrySeq& entries,
out << "updated application `" << update.descriptor.name << "'";
}
}
+
+void
+Database::startUpdating(const string& name)
+{
+ // Must be called within the synchronization.
+ _updating.insert(make_pair(name, vector<AMD_NodeSession_waitForApplicationReplicationPtr>()));
+}
+
+void
+Database::finishUpdating(const string& name)
+{
+ // Must be called within the synchronization.
+ map<string, vector<AMD_NodeSession_waitForApplicationReplicationPtr> >::iterator p = _updating.find(name);
+ assert(p != _updating.end());
+ for(vector<AMD_NodeSession_waitForApplicationReplicationPtr>::const_iterator q = p->second.begin();
+ q != p->second.end(); ++q)
+ {
+ (*q)->ice_response();
+ }
+ _updating.erase(p);
+}
diff --git a/cpp/src/IceGrid/Database.h b/cpp/src/IceGrid/Database.h
index e8226d776cd..4056c8005f6 100644
--- a/cpp/src/IceGrid/Database.h
+++ b/cpp/src/IceGrid/Database.h
@@ -92,8 +92,9 @@ public:
void addReplica(const std::string&, const ReplicaSessionIPtr&);
RegistryInfo getReplicaInfo(const std::string&) const;
InternalRegistryPrx getReplica(const std::string&) const;
- void replicaReceivedUpdate(const std::string&, const std::string&, int, const std::string&);
- void waitForApplicationReplication(const std::string&, int);
+ void replicaReceivedUpdate(const std::string&, TopicName, int, const std::string&);
+ void waitForApplicationReplication(const AMD_NodeSession_waitForApplicationReplicationPtr&, const std::string&,
+ int);
void removeReplica(const std::string&, const ReplicaSessionIPtr&);
Ice::StringSeq getAllReplicas(const std::string& = std::string());
@@ -142,11 +143,14 @@ private:
void load(const ApplicationHelper&, ServerEntrySeq&, const std::string&, int);
void unload(const ApplicationHelper&, ServerEntrySeq&);
void reload(const ApplicationHelper&, const ApplicationHelper&, ServerEntrySeq&, const std::string&, int);
- void finishUpdate(ServerEntrySeq&, const ApplicationUpdateInfo&, const ApplicationInfo&,
- const ApplicationDescriptor&, AdminSessionI*);
+ void finishApplicationUpdate(ServerEntrySeq&, const ApplicationUpdateInfo&, const ApplicationInfo&,
+ const ApplicationDescriptor&, AdminSessionI*);
void checkSessionLock(AdminSessionI*);
+ void startUpdating(const std::string&);
+ void finishUpdating(const std::string&);
+
friend struct AddComponent;
static const std::string _applicationDbName;
@@ -187,7 +191,7 @@ private:
int _replicaApplicationSerial;
int _adapterSerial;
int _objectSerial;
- std::set<std::string> _updating;
+ std::map<std::string, std::vector<AMD_NodeSession_waitForApplicationReplicationPtr> > _updating;
};
typedef IceUtil::Handle<Database> DatabasePtr;
diff --git a/cpp/src/IceGrid/Internal.ice b/cpp/src/IceGrid/Internal.ice
index 7ebf0e02e56..163b0c87931 100644
--- a/cpp/src/IceGrid/Internal.ice
+++ b/cpp/src/IceGrid/Internal.ice
@@ -322,7 +322,7 @@ interface NodeSession
* Wait for the replication of the given application to be done.
*
**/
- ["ami", "cpp:const"] void waitForApplicationReplication(string application, int revision);
+ ["amd", "ami", "cpp:const"] void waitForApplicationReplication(string application, int revision);
/**
*
@@ -342,6 +342,15 @@ exception ReplicaActiveException
{
};
+enum TopicName
+{
+ RegistryObserverTopicName,
+ NodeObserverTopicName,
+ ApplicationObserverTopicName,
+ AdapterObserverTopicName,
+ ObjectObserverTopicName
+};
+
interface ReplicaSession
{
/**
@@ -391,7 +400,7 @@ interface ReplicaSession
* before to continue.
*
**/
- void receivedUpdate(string name, int serial, string failure);
+ void receivedUpdate(TopicName name, int serial, string failure);
/**
*
diff --git a/cpp/src/IceGrid/NodeSessionI.cpp b/cpp/src/IceGrid/NodeSessionI.cpp
index fcffcc71324..e5ca8dfef6d 100644
--- a/cpp/src/IceGrid/NodeSessionI.cpp
+++ b/cpp/src/IceGrid/NodeSessionI.cpp
@@ -108,9 +108,12 @@ NodeSessionI::getServers(const Ice::Current& current) const
}
void
-NodeSessionI::waitForApplicationReplication(const std::string& application, int revision, const Ice::Current&) const
+NodeSessionI::waitForApplicationReplication_async(const AMD_NodeSession_waitForApplicationReplicationPtr& cb,
+ const std::string& application,
+ int revision,
+ const Ice::Current&) const
{
- _database->waitForApplicationReplication(application, revision);
+ _database->waitForApplicationReplication(cb, application, revision);
}
void
diff --git a/cpp/src/IceGrid/NodeSessionI.h b/cpp/src/IceGrid/NodeSessionI.h
index e5e012cfc43..a21c86d8b17 100644
--- a/cpp/src/IceGrid/NodeSessionI.h
+++ b/cpp/src/IceGrid/NodeSessionI.h
@@ -32,7 +32,8 @@ public:
virtual NodeObserverPrx getObserver(const Ice::Current&) const;
virtual void loadServers(const Ice::Current&) const;
virtual Ice::StringSeq getServers(const Ice::Current&) const;
- virtual void waitForApplicationReplication(const std::string&, int, const Ice::Current&) const;
+ virtual void waitForApplicationReplication_async(const AMD_NodeSession_waitForApplicationReplicationPtr&,
+ const std::string&, int, const Ice::Current&) const;
virtual void destroy(const Ice::Current&);
const NodePrx& getNode() const;
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp
index 8eb940fa669..f411e170730 100644
--- a/cpp/src/IceGrid/ReplicaCache.cpp
+++ b/cpp/src/IceGrid/ReplicaCache.cpp
@@ -88,11 +88,6 @@ ReplicaCache::remove(const string& name)
Ice::Trace out(_traceLevels->logger, _traceLevels->replicaCat);
out << "replica `" << name << "' down";
}
-
- //
- // Remove the replica expected updates.
- //
- removeReplicaUpdates(name);
}
try
@@ -177,154 +172,6 @@ ReplicaCache::getEndpoints(const string& name, const Ice::ObjectPrx& proxy) cons
return _communicator->stringToProxy("dummy")->ice_endpoints(endpoints);
}
-void
-ReplicaCache::waitForUpdateReplication(const string& name, int serial)
-{
- Lock sync(*this);
- if(_entries.empty())
- {
- return;
- }
-
- vector<string> replicas;
- for(map<string, ReplicaEntryPtr>::const_iterator s = _entries.begin(); s != _entries.end(); ++s)
- {
- replicas.push_back(s->first);
- }
-
- ostringstream os;
- os << name << "-" << serial;
- const string key = os.str();
-
- _waitForUpdates.insert(make_pair(key, set<string>(replicas.begin(), replicas.end())));
-
- //
- // Wait until all the updates are received.
- //
- while(true)
- {
- map<string, set<string> >::const_iterator p = _waitForUpdates.find(key);
- if(p == _waitForUpdates.end())
- {
- map<string, map<string, string> >::iterator q = _updateFailures.find(key);
- if(q != _updateFailures.end())
- {
- map<string, string> failures = q->second;
- _updateFailures.erase(q);
-
- ostringstream os;
- for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r)
- {
- os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n";
- }
- Ice::Error err(_traceLevels->logger);
- err << os.str();
- }
- return;
- }
- else
- {
- wait();
- }
- }
-}
-
-void
-ReplicaCache::replicaReceivedUpdate(const string& name, const string& update, int serial, const string& failure)
-{
- Lock sync(*this);
-
- ostringstream os;
- os << update << "-" << serial;
- const string key = os.str();
-
- map<string, set<string> >::iterator p = _waitForUpdates.find(key);
- if(p != _waitForUpdates.end())
- {
- p->second.erase(name);
-
- if(!failure.empty())
- {
- map<string, map<string, string> >::iterator q = _updateFailures.find(key);
- if(q == _updateFailures.end())
- {
- q = _updateFailures.insert(make_pair(key, map<string ,string>())).first;
- }
- q->second.insert(make_pair(name, failure));
- }
-
- if(p->second.empty())
- {
- _waitForUpdates.erase(p);
- notifyAll();
- }
- }
-}
-
-void
-ReplicaCache::startApplicationReplication(const string& application, int revision)
-{
- //
- // Add the given application to the set of application being
- // replicated.
- //
- Lock sync(*this);
- _applicationReplication.insert(application);
-}
-
-
-void
-ReplicaCache::finishApplicationReplication(const string& application, int revision)
-{
- //
- // Notify waiting threads that the given application replication
- // is completed.
- //
- Lock sync(*this);
- _applicationReplication.erase(application);
- notifyAll();
-}
-
-void
-ReplicaCache::waitForApplicationReplication(const string& application, int revision)
-{
- //
- // Wait for the given application to be replicated.
- //
- Lock sync(*this);
- while(_applicationReplication.find(application) != _applicationReplication.end())
- {
- wait();
- }
-}
-
-void
-ReplicaCache::removeReplicaUpdates(const string& name)
-{
- // Must b called within the synchronization.
-
- map<string, set<string> >::iterator p = _waitForUpdates.begin();
- bool notifyMonitor = false;
- while(p != _waitForUpdates.end())
- {
- p->second.erase(name);
- if(p->second.empty())
- {
- _waitForUpdates.erase(p++);
- notifyMonitor = true;
- }
- else
- {
- ++p;
- }
- }
-
- if(notifyMonitor)
- {
- notifyAll();
- }
-}
-
ReplicaEntry::ReplicaEntry(const std::string& name, const ReplicaSessionIPtr& session) :
_name(name),
_session(session)
diff --git a/cpp/src/IceGrid/ReplicaCache.h b/cpp/src/IceGrid/ReplicaCache.h
index 837a472c093..2bc776ca108 100644
--- a/cpp/src/IceGrid/ReplicaCache.h
+++ b/cpp/src/IceGrid/ReplicaCache.h
@@ -58,24 +58,11 @@ public:
Ice::ObjectPrx getEndpoints(const std::string&, const Ice::ObjectPrx&) const;
- void waitForUpdateReplication(const std::string&, int);
- void replicaReceivedUpdate(const std::string&, const std::string&, int, const std::string&);
-
- void startApplicationReplication(const std::string&, int);
- void finishApplicationReplication(const std::string&, int);
- void waitForApplicationReplication(const std::string&, int);
-
private:
- void removeReplicaUpdates(const std::string&);
-
const Ice::CommunicatorPtr _communicator;
const IceStorm::TopicPrx _topic;
const NodePrx _nodes;
-
- std::map<std::string, std::set<std::string> > _waitForUpdates;
- std::map<std::string, std::map<std::string, std::string> > _updateFailures;
- std::set<std::string> _applicationReplication;
};
};
diff --git a/cpp/src/IceGrid/ReplicaSessionI.cpp b/cpp/src/IceGrid/ReplicaSessionI.cpp
index ecc9789c6f4..605d0e77595 100644
--- a/cpp/src/IceGrid/ReplicaSessionI.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionI.cpp
@@ -107,9 +107,9 @@ ReplicaSessionI::setAdapterDirectProxy(const string& adapterId,
}
void
-ReplicaSessionI::receivedUpdate(const string& update, int serial, const string& failure, const Ice::Current&)
+ReplicaSessionI::receivedUpdate(TopicName topic, int serial, const string& failure, const Ice::Current&)
{
- _database->replicaReceivedUpdate(_name, update, serial, failure);
+ _database->replicaReceivedUpdate(_name, topic, serial, failure);
}
void
diff --git a/cpp/src/IceGrid/ReplicaSessionI.h b/cpp/src/IceGrid/ReplicaSessionI.h
index 737ff4aa656..aa737b91b06 100644
--- a/cpp/src/IceGrid/ReplicaSessionI.h
+++ b/cpp/src/IceGrid/ReplicaSessionI.h
@@ -38,7 +38,7 @@ public:
virtual void registerWellKnownObjects(const ObjectInfoSeq&, const Ice::Current&);
virtual void setAdapterDirectProxy(const std::string&, const std::string&, const Ice::ObjectPrx&,
const Ice::Current&);
- virtual void receivedUpdate(const std::string&, int, const std::string&, const Ice::Current&);
+ virtual void receivedUpdate(TopicName, int, const std::string&, const Ice::Current&);
virtual void destroy(const Ice::Current&);
virtual IceUtil::Time timestamp() const;
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.cpp b/cpp/src/IceGrid/ReplicaSessionManager.cpp
index b1045c49a75..3ebc345c5d1 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.cpp
+++ b/cpp/src/IceGrid/ReplicaSessionManager.cpp
@@ -49,7 +49,7 @@ public:
os << ex << ":\n" << ex.reason;
failure = os.str();
}
- _manager.receivedUpdate("application", getSerial(current.ctx, "application"), failure);
+ _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -66,7 +66,7 @@ public:
os << ex << ":\napplication: " << ex.name;
failure = os.str();
}
- _manager.receivedUpdate("application", getSerial(current.ctx, "application"), failure);
+ _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -89,7 +89,7 @@ public:
os << ex << ":\napplication: " << ex.name;
failure = os.str();
}
- _manager.receivedUpdate("application", getSerial(current.ctx, "application"), failure);
+ _manager.receivedUpdate(ApplicationObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -106,7 +106,7 @@ public:
{
failure = "adapter `" + info.id + "' already exists and belongs to an application";
}
- _manager.receivedUpdate("adapter", getSerial(current.ctx, "adapter"), failure);
+ _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -117,7 +117,7 @@ public:
{
failure = "adapter `" + info.id + "' already exists and belongs to an application";
}
- _manager.receivedUpdate("adapter", getSerial(current.ctx, "adapter"), failure);
+ _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -128,7 +128,7 @@ public:
{
failure = "adapter `" + id + "' already exists and belongs to an application";
}
- _manager.receivedUpdate("adapter", getSerial(current.ctx, "adapter"), failure);
+ _manager.receivedUpdate(AdapterObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -152,7 +152,7 @@ public:
os << "id: " << info.proxy->ice_getCommunicator()->identityToString(info.proxy->ice_getIdentity());
failure = os.str();
}
- _manager.receivedUpdate("object", getSerial(current.ctx, "object"), failure);
+ _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -169,7 +169,7 @@ public:
os << ex << ":\n" << ex.reason;
failure = os.str();
}
- _manager.receivedUpdate("object", getSerial(current.ctx, "object"), failure);
+ _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
}
virtual void
@@ -189,15 +189,15 @@ public:
catch(const ObjectNotRegisteredException&)
{
}
- _manager.receivedUpdate("object", getSerial(current.ctx, "object"), failure);
+ _manager.receivedUpdate(ObjectObserverTopicName, getSerial(current.ctx), failure);
}
private:
int
- getSerial(const Ice::Context& context, const string& name)
+ getSerial(const Ice::Context& context)
{
- Ice::Context::const_iterator p = context.find(name);
+ Ice::Context::const_iterator p = context.find("serial");
if(p != context.end())
{
int serial;
@@ -305,14 +305,14 @@ ReplicaSessionManager::destroy()
}
void
-ReplicaSessionManager::receivedUpdate(const string& update, int serial, const string& failure)
+ReplicaSessionManager::receivedUpdate(TopicName name, int serial, const string& failure)
{
ReplicaSessionPrx session = _thread->getSession();
if(session)
{
try
{
- session->receivedUpdate(update, serial, failure);
+ session->receivedUpdate(name, serial, failure);
}
catch(const Ice::LocalException&)
{
diff --git a/cpp/src/IceGrid/ReplicaSessionManager.h b/cpp/src/IceGrid/ReplicaSessionManager.h
index a1bc90bf326..6f895755ec6 100644
--- a/cpp/src/IceGrid/ReplicaSessionManager.h
+++ b/cpp/src/IceGrid/ReplicaSessionManager.h
@@ -41,7 +41,7 @@ public:
NodePrxSeq getNodes() const;
void destroy();
- void receivedUpdate(const std::string&, int, const std::string&);
+ void receivedUpdate(TopicName, int, const std::string&);
void registerAllWellKnownObjects();
ReplicaSessionPrx getSession() const { return _thread->getSession(); }
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 6eebdae97bc..5c896132f1b 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -22,9 +22,11 @@ class InitCB : public T
{
public:
- InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& name, int serial) :
+ InitCB(const ObserverTopicPtr& topic, const Ice::ObjectPrx& observer, const string& subscriberName,
+ const string& name, int serial) :
_topic(topic),
_observer(observer),
+ _subscriberName(subscriberName),
_name(name),
_serial(serial)
{
@@ -33,7 +35,7 @@ public:
void
ice_response()
{
- _topic->subscribe(_observer, _serial);
+ _topic->subscribe(_observer, _subscriberName, _serial);
}
void
@@ -47,6 +49,7 @@ private:
const ObserverTopicPtr _topic;
const Ice::ObjectPrx _observer;
+ const string _subscriberName;
const string _name;
const int _serial;
};
@@ -80,13 +83,13 @@ ObserverTopic::~ObserverTopic()
}
void
-ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial)
+ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name, int serial)
{
while(true)
{
if(serial == -1)
{
- initObserver(obsv);
+ initObserver(obsv, name);
return;
}
@@ -97,19 +100,19 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, int serial)
continue;
}
- subscribeImpl(obsv);
+ subscribeImpl(obsv, name);
break;
}
}
void
-ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv)
+ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv, const string& name)
{
{
Lock sync(*this);
_waitForSubscribe.insert(obsv->ice_getIdentity());
}
- subscribe(obsv);
+ subscribe(obsv, name);
{
Lock sync(*this);
while(_topic && _waitForSubscribe.find(obsv->ice_getIdentity()) != _waitForSubscribe.end())
@@ -120,13 +123,39 @@ ObserverTopic::subscribeAndWaitForSubscription(const Ice::ObjectPrx& obsv)
}
void
-ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer)
+ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
{
Lock sync(*this);
if(_topic)
{
_topic->unsubscribe(observer);
}
+
+ if(!name.empty())
+ {
+ _syncSubscribers.erase(name);
+
+ map<int, set<string> >::iterator p = _waitForUpdates.begin();
+ bool notifyMonitor = false;
+ while(p != _waitForUpdates.end())
+ {
+ p->second.erase(name);
+ if(p->second.empty())
+ {
+ _waitForUpdates.erase(p++);
+ notifyMonitor = true;
+ }
+ else
+ {
+ ++p;
+ }
+ }
+
+ if(notifyMonitor)
+ {
+ notifyAll();
+ }
+ }
}
void
@@ -137,8 +166,78 @@ ObserverTopic::destroy()
notifyAll();
}
+void
+ObserverTopic::receivedUpdate(const string& name, int serial, const string& failure)
+{
+ Lock sync(*this);
+
+ map<int, set<string> >::iterator p = _waitForUpdates.find(serial);
+ if(p != _waitForUpdates.end())
+ {
+ p->second.erase(name);
+
+ if(!failure.empty())
+ {
+ map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
+ if(q == _updateFailures.end())
+ {
+ q = _updateFailures.insert(make_pair(serial, map<string ,string>())).first;
+ }
+ q->second.insert(make_pair(name, failure));
+ }
+
+ if(p->second.empty())
+ {
+ _waitForUpdates.erase(p);
+ notifyAll();
+ }
+ }
+}
+
+void
+ObserverTopic::waitForSyncedSubscribers(int serial)
+{
+ if(_syncSubscribers.empty())
+ {
+ return;
+ }
+
+ _waitForUpdates.insert(make_pair(serial, _syncSubscribers));
+
+ //
+ // Wait until all the updates are received.
+ //
+ while(true)
+ {
+ map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial);
+ if(p == _waitForUpdates.end())
+ {
+ map<int, map<string, string> >::iterator q = _updateFailures.find(serial);
+ if(q != _updateFailures.end())
+ {
+ map<string, string> failures = q->second;
+ _updateFailures.erase(q);
+
+ ostringstream os;
+ for(map<string, string>::const_iterator r = failures.begin(); r != failures.end(); ++r)
+ {
+ os << "replication failed on replica `" << r->first << "':\n" << r->second << "\n";
+ }
+ // TODO: XXX
+// Ice::Error err(_traceLevels->logger);
+// err << os.str();
+ }
+ return;
+ }
+ else
+ {
+ wait();
+ }
+ }
+}
+
void
-ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer)
+ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer, const string& name)
{
// This must be called with the mutex locked.
if(!_topic)
@@ -151,6 +250,12 @@ ObserverTopic::subscribeImpl(const Ice::ObjectPrx& observer)
_topic->subscribe(qos, observer);
_waitForSubscribe.erase(observer->ice_getIdentity());
+
+ if(!name.empty())
+ {
+ _syncSubscribers.insert(name);
+ }
+
notifyAll();
}
@@ -171,13 +276,13 @@ ObserverTopic::updateSerial(int serial)
}
Ice::Context
-ObserverTopic::getContext(const string& name, int serial) const
+ObserverTopic::getContext(int serial) const
{
ostringstream os;
os << serial;
Ice::Context context;
- context[name] = os.str();
+ context["serial"] = os.str();
return context;
}
@@ -233,7 +338,7 @@ RegistryObserverTopic::registryDown(const string& name)
}
void
-RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
{
RegistryObserverPrx observer = RegistryObserverPrx::uncheckedCast(obsv);
RegistryInfoSeq registries;
@@ -247,8 +352,8 @@ RegistryObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
}
serial = _serial;
}
- observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, "registry", serial),
- registries);
+ observer->registryInit_async(new InitCB<AMI_RegistryObserver_registryInit>(this, observer, name, "registry",
+ serial), registries);
}
NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -431,7 +536,7 @@ NodeObserverTopic::nodeDown(const string& name)
}
void
-NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
{
NodeObserverPrx observer = NodeObserverPrx::uncheckedCast(obsv);
int serial;
@@ -445,7 +550,7 @@ NodeObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
}
serial = _serial;
}
- observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, "node", serial), nodes);
+ observer->nodeInit_async(new InitCB<AMI_NodeObserver_nodeInit>(this, observer, name, "node", serial), nodes);
}
ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerPrx& topicManager,
@@ -472,7 +577,7 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq&
}
try
{
- _publisher->applicationInit(serial, apps, getContext("application", serial));
+ _publisher->applicationInit(serial, apps, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
@@ -493,13 +598,14 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in
_applications.insert(make_pair(info.descriptor.name, info));
try
{
- _publisher->applicationAdded(serial, info, getContext("application", serial));
+ _publisher->applicationAdded(serial, info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -514,13 +620,14 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
_applications.erase(name);
try
{
- _publisher->applicationRemoved(serial, name, getContext("application", serial));
+ _publisher->applicationRemoved(serial, name, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -566,17 +673,18 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
}
try
{
- _publisher->applicationUpdated(serial, info, getContext("application", serial));
+ _publisher->applicationUpdated(serial, info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
-ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
{
ApplicationObserverPrx observer = ApplicationObserverPrx::uncheckedCast(obsv);
int serial;
@@ -590,8 +698,8 @@ ApplicationObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
applications.push_back(p->second);
}
}
- observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, "application",
- serial),
+ observer->applicationInit_async(new InitCB<AMI_ApplicationObserver_applicationInit>(this, observer, name,
+ "application", serial),
serial, applications);
}
@@ -619,7 +727,7 @@ AdapterObserverTopic::adapterInit(int serial, const AdapterInfoSeq& adpts)
}
try
{
- _publisher->adapterInit(adpts, getContext("adapter", serial));
+ _publisher->adapterInit(adpts, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
@@ -640,13 +748,14 @@ AdapterObserverTopic::adapterAdded(int serial, const AdapterInfo& info)
_adapters.insert(make_pair(info.id, info));
try
{
- _publisher->adapterAdded(info, getContext("adapter", serial));
+ _publisher->adapterAdded(info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -661,13 +770,14 @@ AdapterObserverTopic::adapterUpdated(int serial, const AdapterInfo& info)
_adapters[info.id] = info;
try
{
- _publisher->adapterUpdated(info, getContext("adapter", serial));
+ _publisher->adapterUpdated(info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -682,17 +792,18 @@ AdapterObserverTopic::adapterRemoved(int serial, const string& id)
_adapters.erase(id);
try
{
- _publisher->adapterRemoved(id, getContext("adapter", serial));
+ _publisher->adapterRemoved(id, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
-AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
{
AdapterObserverPrx observer = AdapterObserverPrx::uncheckedCast(obsv);
int serial;
@@ -706,7 +817,7 @@ AdapterObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
adapters.push_back(p->second);
}
}
- observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, "adapter", serial),
+ observer->adapterInit_async(new InitCB<AMI_AdapterObserver_adapterInit>(this, observer, name, "adapter", serial),
adapters);
}
@@ -734,7 +845,7 @@ ObjectObserverTopic::objectInit(int serial, const ObjectInfoSeq& objects)
}
try
{
- _publisher->objectInit(objects, getContext("object", serial));
+ _publisher->objectInit(objects, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
@@ -755,13 +866,14 @@ ObjectObserverTopic::objectAdded(int serial, const ObjectInfo& info)
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
try
{
- _publisher->objectAdded(info, getContext("object", serial));
+ _publisher->objectAdded(info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -776,13 +888,14 @@ ObjectObserverTopic::objectUpdated(int serial, const ObjectInfo& info)
_objects[info.proxy->ice_getIdentity()] = info;
try
{
- _publisher->objectUpdated(info, getContext("object", serial));
+ _publisher->objectUpdated(info, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
@@ -797,17 +910,18 @@ ObjectObserverTopic::objectRemoved(int serial, const Ice::Identity& id)
_objects.erase(id);
try
{
- _publisher->objectRemoved(id, getContext("object", serial));
+ _publisher->objectRemoved(id, getContext(serial));
}
catch(const Ice::LocalException& ex)
{
Ice::Warning out(_publisher->ice_getCommunicator()->getLogger());
out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
}
+ waitForSyncedSubscribers(serial);
}
void
-ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
+ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv, const string& name)
{
ObjectObserverPrx observer = ObjectObserverPrx::uncheckedCast(obsv);
int serial;
@@ -820,7 +934,8 @@ ObjectObserverTopic::initObserver(const Ice::ObjectPrx& obsv)
objects.push_back(p->second);
}
}
- observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, "object", serial), objects);
+ observer->objectInit_async(new InitCB<AMI_ObjectObserver_objectInit>(this, observer, name, "object", serial),
+ objects);
}
diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h
index e17f3c9a438..c561dfa5862 100644
--- a/cpp/src/IceGrid/Topics.h
+++ b/cpp/src/IceGrid/Topics.h
@@ -20,14 +20,6 @@
namespace IceGrid
{
-enum TopicName
-{
- RegistryObserverTopicName,
- NodeObserverTopicName,
- ApplicationObserverTopicName,
- AdapterObserverTopicName,
- ObjectObserverTopicName
-};
class ObserverTopic : public IceUtil::Monitor<IceUtil::Mutex>, virtual public Ice::Object
{
@@ -36,23 +28,30 @@ public:
ObserverTopic(const IceStorm::TopicManagerPrx&, const std::string&);
virtual ~ObserverTopic();
- void subscribe(const Ice::ObjectPrx&, int = -1);
- void subscribeAndWaitForSubscription(const Ice::ObjectPrx&);
- void unsubscribe(const Ice::ObjectPrx&);
+ void subscribe(const Ice::ObjectPrx&, const std::string& = std::string(), int = -1);
+ void subscribeAndWaitForSubscription(const Ice::ObjectPrx&, const std::string& = std::string());
+ void unsubscribe(const Ice::ObjectPrx&, const std::string& = std::string());
void destroy();
- virtual void initObserver(const Ice::ObjectPrx&) = 0;
+ void receivedUpdate(const std::string&, int, const std::string&);
+
+ virtual void initObserver(const Ice::ObjectPrx&, const std::string&) = 0;
protected:
- void subscribeImpl(const Ice::ObjectPrx&);
+ void waitForSyncedSubscribers(int);
+ void subscribeImpl(const Ice::ObjectPrx&, const std::string&);
void updateSerial(int);
- Ice::Context getContext(const std::string&, int) const;
+ Ice::Context getContext(int) const;
IceStorm::TopicPrx _topic;
Ice::ObjectPrx _basePublisher;
std::set<Ice::Identity> _waitForSubscribe;
- int _serial;
+ int _serial;
+
+ std::set<std::string> _syncSubscribers;
+ std::map<int, std::set<std::string> > _waitForUpdates;
+ std::map<int, std::map<std::string, std::string> > _updateFailures;
};
typedef IceUtil::Handle<ObserverTopic> ObserverTopicPtr;
@@ -65,7 +64,7 @@ public:
void registryUp(const RegistryInfo&);
void registryDown(const std::string&);
- virtual void initObserver(const Ice::ObjectPrx&);
+ virtual void initObserver(const Ice::ObjectPrx&, const std::string&);
private:
@@ -89,7 +88,7 @@ public:
const NodeObserverPrx& getPublisher() { return _externalPublisher; }
void nodeDown(const std::string&);
- virtual void initObserver(const Ice::ObjectPrx&);
+ virtual void initObserver(const Ice::ObjectPrx&, const std::string&);
private:
@@ -110,7 +109,7 @@ public:
void applicationRemoved(int, const std::string&);
void applicationUpdated(int, const ApplicationUpdateInfo&);
- virtual void initObserver(const Ice::ObjectPrx&);
+ virtual void initObserver(const Ice::ObjectPrx&, const std::string&);
private:
@@ -130,7 +129,7 @@ public:
void adapterUpdated(int, const AdapterInfo&);
void adapterRemoved(int, const std::string&);
- virtual void initObserver(const Ice::ObjectPrx&);
+ virtual void initObserver(const Ice::ObjectPrx&, const std::string&);
private:
@@ -150,7 +149,7 @@ public:
void objectUpdated(int, const ObjectInfo&);
void objectRemoved(int, const Ice::Identity&);
- virtual void initObserver(const Ice::ObjectPrx&);
+ virtual void initObserver(const Ice::ObjectPrx&, const std::string&);
private: