summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-11-29 09:25:19 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-11-29 09:25:19 +0000
commit55d4301510cd67fc11638255df6b94574f9d49b5 (patch)
treef4a631a5e8de10c511c6bc3ee9d8aad3c6b577d2 /cpp
parentvarious file chooser enhancements (diff)
downloadice-55d4301510cd67fc11638255df6b94574f9d49b5.tar.bz2
ice-55d4301510cd67fc11638255df6b94574f9d49b5.tar.xz
ice-55d4301510cd67fc11638255df6b94574f9d49b5.zip
Fixes
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/IceGrid/Database.cpp566
-rw-r--r--cpp/src/IceGrid/IceGridNode.cpp6
-rw-r--r--cpp/src/IceGrid/NodeI.cpp113
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.cpp92
-rw-r--r--cpp/src/IceGrid/NodeSessionManager.h3
-rw-r--r--cpp/src/IceGrid/ServerCache.cpp24
-rw-r--r--cpp/src/IceGrid/ServerI.cpp225
-rw-r--r--cpp/src/IceGrid/ServerI.h4
-rw-r--r--cpp/src/IceGrid/SessionManager.h2
-rw-r--r--cpp/src/IceGrid/Topics.cpp101
-rw-r--r--cpp/src/IceGrid/Topics.h26
11 files changed, 642 insertions, 520 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 824308ad5a2..b7f26779a04 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -183,82 +183,94 @@ Database::unlock(AdminSessionI* session)
void
Database::syncApplications(const ApplicationInfoSeq& applications)
{
- Lock sync(*this);
-
- Freeze::TransactionHolder txHolder(_connection);
- ServerEntrySeq entries;
- set<string> names;
- for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p)
+ int serial;
{
- try
+ Lock sync(*this);
+
+ Freeze::TransactionHolder txHolder(_connection);
+ ServerEntrySeq entries;
+ set<string> names;
+ for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p)
{
- StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name);
- if(s != _applications.end())
+ try
{
- ApplicationHelper previous(_communicator, s->second.descriptor);
- ApplicationHelper helper(_communicator, p->descriptor);
- reload(previous, helper, entries, p->uuid, p->revision);
+ StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name);
+ if(s != _applications.end())
+ {
+ ApplicationHelper previous(_communicator, s->second.descriptor);
+ ApplicationHelper helper(_communicator, p->descriptor);
+ reload(previous, helper, entries, p->uuid, p->revision);
+ }
+ else
+ {
+ load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision);
+ }
}
- else
+ catch(const DeploymentException& ex)
{
- load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision);
+ Ice::Warning warn(_traceLevels->logger);
+ warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason;
}
+ _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p));
+ names.insert(p->descriptor.name);
}
- catch(const DeploymentException& ex)
- {
- Ice::Warning warn(_traceLevels->logger);
- warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason;
- }
- _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p));
- names.insert(p->descriptor.name);
- }
- StringApplicationInfoDict::iterator s = _applications.begin();
- while(s != _applications.end())
- {
- if(names.find(s->first) == names.end())
+ StringApplicationInfoDict::iterator s = _applications.begin();
+ while(s != _applications.end())
{
- unload(ApplicationHelper(_communicator, s->second.descriptor), entries);
- _applications.erase(s++);
- }
- else
- {
- ++s;
+ if(names.find(s->first) == names.end())
+ {
+ unload(ApplicationHelper(_communicator, s->second.descriptor), entries);
+ _applications.erase(s++);
+ }
+ else
+ {
+ ++s;
+ }
}
- }
- ++_applicationSerial;
+ ++_applicationSerial;
- _applicationObserverTopic->applicationInit(_applicationSerial, applications);
+ serial = _applicationObserverTopic->applicationInit(_applicationSerial, applications);
- txHolder.commit();
+ txHolder.commit();
+ }
+ _applicationObserverTopic->waitForSyncedSubscribers(serial);
}
void
Database::syncAdapters(const AdapterInfoSeq& adapters)
{
- Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
- _adapters.clear();
- for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
+ int serial;
{
- _adapters.put(StringAdapterInfoDict::value_type(r->id, *r));
+ Lock sync(*this);
+ Freeze::TransactionHolder txHolder(_connection);
+ _adapters.clear();
+ for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r)
+ {
+ _adapters.put(StringAdapterInfoDict::value_type(r->id, *r));
+ }
+ serial = _adapterObserverTopic->adapterInit(adapters);
+ txHolder.commit();
}
- _adapterObserverTopic->adapterInit(adapters);
- txHolder.commit();
+ _adapterObserverTopic->waitForSyncedSubscribers(serial);
}
void
Database::syncObjects(const ObjectInfoSeq& objects)
{
- Lock sync(*this);
- Freeze::TransactionHolder txHolder(_connection);
- _objects.clear();
- for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
+ int serial;
{
- _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q));
+ Lock sync(*this);
+ Freeze::TransactionHolder txHolder(_connection);
+ _objects.clear();
+ for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q)
+ {
+ _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q));
+ }
+ serial = _objectObserverTopic->objectInit(objects);
+ txHolder.commit();
}
- _objectObserverTopic->objectInit(objects);
- txHolder.commit();
+ _objectObserverTopic->waitForSyncedSubscribers(serial);
}
void
@@ -310,21 +322,24 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session)
}
}
+ int serial;
{
Lock sync(*this);
++_applicationSerial;
_applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info));
- finishUpdating(info.descriptor.name);
- _applicationObserverTopic->applicationAdded(_applicationSerial, info);
+ serial = _applicationObserverTopic->applicationAdded(_applicationSerial, info);
if(_traceLevels->application > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
out << "added application `" << info.descriptor.name << "'";
}
- notifyAll();
}
+
+ _applicationObserverTopic->waitForSyncedSubscribers(serial);
+
+ finishUpdating(info.descriptor.name);
}
void
@@ -457,6 +472,7 @@ void
Database::removeApplication(const string& name, AdminSessionI* session)
{
ServerEntrySeq entries;
+ int serial;
{
Lock sync(*this);
checkSessionLock(session);
@@ -489,7 +505,7 @@ Database::removeApplication(const string& name, AdminSessionI* session)
_applications.erase(p);
++_applicationSerial;
- _applicationObserverTopic->applicationRemoved(_applicationSerial, name);
+ serial = _applicationObserverTopic->applicationRemoved(_applicationSerial, name);
if(_traceLevels->application > 0)
{
@@ -509,6 +525,8 @@ Database::removeApplication(const string& name, AdminSessionI* session)
// Ignore, this is traced by the node cache.
}
}
+
+ _applicationObserverTopic->waitForSyncedSubscribers(serial);
}
ApplicationInfo
@@ -602,67 +620,71 @@ Database::getAllocatableObject(const Ice::Identity& id) const
void
Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy)
{
- Lock sync(*this);
- if(_adapterCache.has(adapterId))
+ int serial;
{
- throw AdapterExistsException(adapterId);
- }
+ Lock sync(*this);
+ if(_adapterCache.has(adapterId))
+ {
+ throw AdapterExistsException(adapterId);
+ }
- StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
- AdapterInfo info;
- bool updated = false;
- if(proxy)
- {
- if(p != _adapters.end())
+ StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
+ AdapterInfo info;
+ bool updated = false;
+ if(proxy)
{
- info = p->second;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
- p.set(info);
- updated = true;
+ if(p != _adapters.end())
+ {
+ info = p->second;
+ info.proxy = proxy;
+ info.replicaGroupId = replicaGroupId;
+ p.set(info);
+ updated = true;
+ }
+ else
+ {
+ info.id = adapterId;
+ info.proxy = proxy;
+ info.replicaGroupId = replicaGroupId;
+ _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
+ }
}
else
{
- info.id = adapterId;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
- _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
- }
- }
- else
- {
- if(p == _adapters.end())
- {
- return;
+ if(p == _adapters.end())
+ {
+ return;
+ }
+ _adapters.erase(p);
}
- _adapters.erase(p);
- }
- if(_traceLevels->adapter > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'";
- if(!replicaGroupId.empty())
+ if(_traceLevels->adapter > 0)
{
- out << " with replica group `" << replicaGroupId << "'";
+ Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
+ out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'";
+ if(!replicaGroupId.empty())
+ {
+ out << " with replica group `" << replicaGroupId << "'";
+ }
}
- }
- if(proxy)
- {
- if(updated)
+ if(proxy)
{
- _adapterObserverTopic->adapterUpdated(info);
+ if(updated)
+ {
+ serial = _adapterObserverTopic->adapterUpdated(info);
+ }
+ else
+ {
+ serial = _adapterObserverTopic->adapterAdded(info);
+ }
}
else
{
- _adapterObserverTopic->adapterAdded(info);
+ serial = _adapterObserverTopic->adapterRemoved(adapterId);
}
}
- else
- {
- _adapterObserverTopic->adapterRemoved(adapterId);
- }
+ _adapterObserverTopic->waitForSyncedSubscribers(serial);
}
Ice::ObjectPrx
@@ -693,61 +715,65 @@ Database::getAdapterDirectProxy(const string& id)
void
Database::removeAdapter(const string& adapterId)
{
- Lock sync(*this);
- if(_adapterCache.has(adapterId))
- {
- AdapterEntryPtr adpt = _adapterCache.get(adapterId);
- DeploymentException ex;
- ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n";
- ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
- throw ex;
- }
-
- Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator
-
- StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
- AdapterInfoSeq infos;
- if(p != _adapters.end())
- {
- _adapters.erase(p);
- }
- else
+ int serial;
{
- p = _adapters.findByReplicaGroupId(adapterId, true);
- if(p == _adapters.end())
+ Lock sync(*this);
+ if(_adapterCache.has(adapterId))
{
- throw AdapterNotExistException(adapterId);
+ AdapterEntryPtr adpt = _adapterCache.get(adapterId);
+ DeploymentException ex;
+ ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n";
+ ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
+ throw ex;
}
- while(p != _adapters.end())
+ Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator
+
+ StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
+ AdapterInfoSeq infos;
+ if(p != _adapters.end())
{
- AdapterInfo info = p->second;
- info.replicaGroupId = "";
- infos.push_back(info);
- _adapters.put(StringAdapterInfoDict::value_type(p->first, info));
- ++p;
+ _adapters.erase(p);
}
- }
-
- if(_traceLevels->adapter > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'";
- }
-
- if(infos.empty())
- {
- _adapterObserverTopic->adapterRemoved(adapterId);
- }
- else
- {
- for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ else
+ {
+ p = _adapters.findByReplicaGroupId(adapterId, true);
+ if(p == _adapters.end())
+ {
+ throw AdapterNotExistException(adapterId);
+ }
+
+ while(p != _adapters.end())
+ {
+ AdapterInfo info = p->second;
+ info.replicaGroupId = "";
+ infos.push_back(info);
+ _adapters.put(StringAdapterInfoDict::value_type(p->first, info));
+ ++p;
+ }
+ }
+
+ if(_traceLevels->adapter > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
+ out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'";
+ }
+
+ if(infos.empty())
{
- _adapterObserverTopic->adapterUpdated(*p);
+ serial = _adapterObserverTopic->adapterRemoved(adapterId);
}
+ else
+ {
+ for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ {
+ serial = _adapterObserverTopic->adapterUpdated(*p);
+ }
+ }
+
+ txHolder.commit();
}
-
- txHolder.commit();
+ _adapterObserverTopic->waitForSyncedSubscribers(serial);
}
AdapterEntryPtr
@@ -840,128 +866,143 @@ Database::getAllAdapters(const string& expression)
void
Database::addObject(const ObjectInfo& info)
{
- Lock sync(*this);
- const Ice::Identity id = info.proxy->ice_getIdentity();
-
- if(_objectCache.has(id))
+ int serial;
{
- throw ObjectExistsException(id);
- }
-
- if(_objects.find(id) != _objects.end())
- {
- throw ObjectExistsException(id);
- }
- _objects.put(IdentityObjectInfoDict::value_type(id, info));
-
- _objectObserverTopic->objectAdded(info);
+ Lock sync(*this);
+ const Ice::Identity id = info.proxy->ice_getIdentity();
+
+ if(_objectCache.has(id))
+ {
+ throw ObjectExistsException(id);
+ }
+
+ if(_objects.find(id) != _objects.end())
+ {
+ throw ObjectExistsException(id);
+ }
+ _objects.put(IdentityObjectInfoDict::value_type(id, info));
+
+ serial = _objectObserverTopic->objectAdded(info);
- if(_traceLevels->object > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << "added object `" << _communicator->identityToString(id) << "'";
+ if(_traceLevels->object > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << "added object `" << _communicator->identityToString(id) << "'";
+ }
}
+ _objectObserverTopic->waitForSyncedSubscribers(serial);
}
void
Database::addOrUpdateObject(const ObjectInfo& info)
{
- Lock sync(*this);
- const Ice::Identity id = info.proxy->ice_getIdentity();
-
- if(_objectCache.has(id))
+ int serial;
{
- throw ObjectExistsException(id);
- }
-
- bool update = _objects.find(id) != _objects.end();
- _objects.put(IdentityObjectInfoDict::value_type(id, info));
-
- if(update)
- {
- _objectObserverTopic->objectUpdated(info);
- }
- else
- {
- _objectObserverTopic->objectAdded(info);
- }
-
- if(_traceLevels->object > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << (!update ? "added" : "updated") << " object `" << _communicator->identityToString(id) << "'";
+ Lock sync(*this);
+ const Ice::Identity id = info.proxy->ice_getIdentity();
+
+ if(_objectCache.has(id))
+ {
+ throw ObjectExistsException(id);
+ }
+
+ bool update = _objects.find(id) != _objects.end();
+ _objects.put(IdentityObjectInfoDict::value_type(id, info));
+
+ if(update)
+ {
+ serial = _objectObserverTopic->objectUpdated(info);
+ }
+ else
+ {
+ serial = _objectObserverTopic->objectAdded(info);
+ }
+
+ if(_traceLevels->object > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << (!update ? "added" : "updated") << " object `" << _communicator->identityToString(id) << "'";
+ }
}
+ _objectObserverTopic->waitForSyncedSubscribers(serial);
}
void
Database::removeObject(const Ice::Identity& id)
{
- Lock sync(*this);
- if(_objectCache.has(id))
- {
- DeploymentException ex;
- ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n";
- ex.reason += "the object was added with the application descriptor `";
- ex.reason += _objectCache.get(id)->getApplication();
- ex.reason += "'";
- throw ex;
- }
-
- IdentityObjectInfoDict::iterator p = _objects.find(id);
- if(p == _objects.end())
+ int serial;
{
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
- _objects.erase(p);
-
+ Lock sync(*this);
+ if(_objectCache.has(id))
+ {
+ DeploymentException ex;
+ ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n";
+ ex.reason += "the object was added with the application descriptor `";
+ ex.reason += _objectCache.get(id)->getApplication();
+ ex.reason += "'";
+ throw ex;
+ }
- _objectObserverTopic->objectRemoved(id);
-
- if(_traceLevels->object > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << "removed object `" << _communicator->identityToString(id) << "'";
+ IdentityObjectInfoDict::iterator p = _objects.find(id);
+ if(p == _objects.end())
+ {
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
+ }
+ _objects.erase(p);
+
+ serial = _objectObserverTopic->objectRemoved(id);
+
+ if(_traceLevels->object > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << "removed object `" << _communicator->identityToString(id) << "'";
+ }
}
+ _objectObserverTopic->waitForSyncedSubscribers(serial);
}
void
Database::updateObject(const Ice::ObjectPrx& proxy)
{
- Lock sync(*this);
-
- const Ice::Identity id = proxy->ice_getIdentity();
- if(_objectCache.has(id))
- {
- DeploymentException ex;
- ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n";
- ex.reason += "the object was added with the application descriptor `";
- ex.reason += _objectCache.get(id)->getApplication();
- ex.reason += "'";
- throw ex;
- }
-
- IdentityObjectInfoDict::iterator p = _objects.find(id);
- if(p == _objects.end())
+ int serial;
{
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
-
- ObjectInfo info;
- info = p->second;
- info.proxy = proxy;
- p.set(info);
+ Lock sync(*this);
+
+ const Ice::Identity id = proxy->ice_getIdentity();
+ if(_objectCache.has(id))
+ {
+ DeploymentException ex;
+ ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n";
+ ex.reason += "the object was added with the application descriptor `";
+ ex.reason += _objectCache.get(id)->getApplication();
+ ex.reason += "'";
+ throw ex;
+ }
- _objectObserverTopic->objectUpdated(info);
+ IdentityObjectInfoDict::iterator p = _objects.find(id);
+ if(p == _objects.end())
+ {
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
+ }
+
+ ObjectInfo info;
+ info = p->second;
+ info.proxy = proxy;
+ p.set(info);
- if(_traceLevels->object > 0)
- {
- Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << "updated object `" << _communicator->identityToString(id) << "'";
+ serial = _objectObserverTopic->objectUpdated(info);
+
+ if(_traceLevels->object > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << "updated object `" << _communicator->identityToString(id) << "'";
+ }
}
+ _objectObserverTopic->waitForSyncedSubscribers(serial);
}
int
@@ -1458,27 +1499,31 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries,
//
// Save the application descriptor.
//
- Lock sync(*this);
-
- ApplicationInfo info = oldApp;
- info.updateTime = update.updateTime;
- info.updateUser = update.updateUser;
- info.revision = update.revision;
- info.descriptor = newDesc;
-
- _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info));
- ++_applicationSerial;
- finishUpdating(update.descriptor.name);
-
- if(_traceLevels->application > 0)
+ int serial;
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
- out << "updated application `" << update.descriptor.name << "'";
- }
-
- _applicationObserverTopic->applicationUpdated(_applicationSerial, update);
+ Lock sync(*this);
+
+ ApplicationInfo info = oldApp;
+ info.updateTime = update.updateTime;
+ info.updateUser = update.updateUser;
+ info.revision = update.revision;
+ info.descriptor = newDesc;
+
+ _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info));
+ ++_applicationSerial;
- notifyAll();
+ if(_traceLevels->application > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat);
+ out << "updated application `" << update.descriptor.name << "'";
+ }
+
+ serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update);
+ }
+
+ _applicationObserverTopic->waitForSyncedSubscribers(serial);
+
+ finishUpdating(update.descriptor.name);
}
void
@@ -1491,7 +1536,8 @@ Database::startUpdating(const string& name)
void
Database::finishUpdating(const string& name)
{
- // Must be called within the synchronization.
+ Lock sync(*this);
+
map<string, vector<AMD_NodeSession_waitForApplicationUpdatePtr> >::iterator p = _updating.find(name);
assert(p != _updating.end());
for(vector<AMD_NodeSession_waitForApplicationUpdatePtr>::const_iterator q = p->second.begin();
@@ -1500,4 +1546,6 @@ Database::finishUpdating(const string& name)
(*q)->ice_response();
}
_updating.erase(p);
+
+ notifyAll();
}
diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp
index cd314f84bd7..337db3a32ec 100644
--- a/cpp/src/IceGrid/IceGridNode.cpp
+++ b/cpp/src/IceGrid/IceGridNode.cpp
@@ -482,6 +482,12 @@ NodeService::start(int argc, char* argv[])
//
_adapter->activate();
+ //
+ // Notify the node session manager that the node can start
+ // accepting incoming connections.
+ //
+ _sessions.activated();
+
string bundleName = properties->getProperty("IceGrid.Node.PrintServersReady");
if(!bundleName.empty() || !desc.empty())
{
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index aae67c97964..81a29709767 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -218,46 +218,53 @@ NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB,
bool fromMaster,
const Ice::Current& current)
{
- Lock sync(*this);
- ++_serial;
-
- Ice::Identity id = createServerIdentity(info.descriptor->id);
-
- //
- // Check if we already have a servant for this server. If that's
- // the case, the server is already loaded and we just need to
- // update it.
- //
- while(true)
+ ServerCommandPtr command;
{
- bool added = false;
- ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(id));
- if(!server)
- {
- ServerPrx proxy = ServerPrx::uncheckedCast(_adapter->createProxy(id));
- server = new ServerI(this, proxy, _serversDir, info.descriptor->id, _waitTime);
- _adapter->add(server, id);
- added = true;
- }
+ Lock sync(*this);
+ ++_serial;
- try
- {
- server->load(amdCB, info, fromMaster);
- }
- catch(const Ice::ObjectNotExistException&)
- {
- assert(!added);
- continue;
- }
- catch(const Ice::Exception&)
+ Ice::Identity id = createServerIdentity(info.descriptor->id);
+
+ //
+ // Check if we already have a servant for this server. If that's
+ // the case, the server is already loaded and we just need to
+ // update it.
+ //
+ while(true)
{
- if(added)
+ bool added = false;
+ ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(id));
+ if(!server)
+ {
+ ServerPrx proxy = ServerPrx::uncheckedCast(_adapter->createProxy(id));
+ server = new ServerI(this, proxy, _serversDir, info.descriptor->id, _waitTime);
+ _adapter->add(server, id);
+ added = true;
+ }
+
+ try
+ {
+ command = server->load(amdCB, info, fromMaster);
+ }
+ catch(const Ice::ObjectNotExistException&)
+ {
+ assert(!added);
+ continue;
+ }
+ catch(const Ice::Exception&)
{
- _adapter->remove(id);
+ if(added)
+ {
+ _adapter->remove(id);
+ }
+ throw;
}
- throw;
+ break;
}
- break;
+ }
+ if(command)
+ {
+ command->execute();
}
}
@@ -268,24 +275,31 @@ NodeI::destroyServer_async(const AMD_Node_destroyServerPtr& amdCB,
int revision,
const Ice::Current& current)
{
- Lock sync(*this);
- ++_serial;
-
- ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(serverId)));
- if(!server)
+ ServerCommandPtr command;
{
- server = new ServerI(this, 0, _serversDir, serverId, _waitTime);
- }
-
- //
- // Destroy the server object if it's loaded.
- //
- try
- {
- server->destroy(amdCB, uuid, revision);
+ Lock sync(*this);
+ ++_serial;
+
+ ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(serverId)));
+ if(!server)
+ {
+ server = new ServerI(this, 0, _serversDir, serverId, _waitTime);
+ }
+
+ //
+ // Destroy the server object if it's loaded.
+ //
+ try
+ {
+ command = server->destroy(amdCB, uuid, revision);
+ }
+ catch(const Ice::ObjectNotExistException&)
+ {
+ }
}
- catch(const Ice::ObjectNotExistException&)
+ if(command)
{
+ command->execute();
}
}
@@ -697,7 +711,6 @@ void
NodeI::removeObserver(const NodeSessionPrx& session)
{
IceUtil::Mutex::Lock sync(_observerMutex);
- assert(_observers.find(session) != _observers.end());
_observers.erase(session);
}
diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp
index 87b9ae17eac..6ee09c7fa3c 100644
--- a/cpp/src/IceGrid/NodeSessionManager.cpp
+++ b/cpp/src/IceGrid/NodeSessionManager.cpp
@@ -197,7 +197,8 @@ NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session)
NodeSessionManager::NodeSessionManager() :
_serial(1),
- _destroyed(false)
+ _destroyed(false),
+ _activated(false)
{
}
@@ -241,7 +242,7 @@ NodeSessionManager::create(const NodeIPtr& node)
// to load the servers on the node (when createSession invokes
// loadServers() on the session).
//
- _thread->tryCreateSession(false);
+ _thread->tryCreateSession(true);
}
void
@@ -261,6 +262,25 @@ NodeSessionManager::create(const InternalRegistryPrx& replica)
}
}
+void
+NodeSessionManager::activated()
+{
+ {
+ Lock sync(*this);
+ _activated = true;
+ }
+ NodeSessionPrx session = _thread->getSession();
+ if(!session)
+ {
+ _thread->tryCreateSession(true);
+ session = _thread->getSession();
+ }
+ if(session)
+ {
+ syncServers(session);
+ }
+}
+
bool
NodeSessionManager::waitForCreate()
{
@@ -360,6 +380,7 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas)
_sessions.swap(sessions);
NodeSessionKeepAliveThreadPtr thread;
+ vector<NodeSessionKeepAliveThreadPtr> newSessions;
for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p)
{
if((*p)->ice_getIdentity() == _master->ice_getIdentity())
@@ -377,6 +398,7 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas)
thread = new NodeSessionKeepAliveThread(*p, _node, _queryObjects);
thread->start();
thread->tryCreateSession(false);
+ newSessions.push_back(thread);
}
_sessions.insert(make_pair((*p)->ice_getIdentity(), thread));
}
@@ -402,6 +424,44 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas)
{
q->second->getThreadControl().join();
}
+
+ //
+ // If the node is being started, we wait for the new sessions to
+ // be created. This ensures that once the node is activated all
+ // the known replicas are connected.
+ //
+ if(!_activated)
+ {
+ for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator t = newSessions.begin(); t != newSessions.end(); ++t)
+ {
+ (*t)->tryCreateSession(true);
+ }
+ }
+}
+
+void
+NodeSessionManager::syncServers(const NodeSessionPrx& session)
+{
+ //
+ // Ask the session to load the servers on the node. Once this is
+ // done we check the consistency of the node to make sure old
+ // servers are removed.
+ //
+ // NOTE: it's important for this to be done after trying to
+ // register with the replicas. When the master loads the server
+ // some server might get activated and it's better if at that time
+ // the registry replicas (at least the ones which are up) have all
+ // established their session with the node.
+ //
+ assert(session);
+ try
+ {
+ session->loadServers();
+ _node->checkConsistency(session);
+ }
+ catch(const Ice::LocalException&)
+ {
+ }
}
void
@@ -473,27 +533,21 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session)
}
//
- // Ask the master to load the servers on the node. Once this is
- // done we check the consistency of the node to make sure old
- // servers are removed.
+ // Synchronize the servers if the session is active and if the
+ // node adapter has been activated (otherwise, the servers will be
+ // synced after the node adapter activation, see activated()).
//
- // NOTE: it's important for this to be done after trying to
- // register with the replicas. When the master loads the server
- // some server might get activated and it's better if at that time
- // the registry replicas (at least the ones which are up) have all
- // established their session with the node.
- //
- try
+ if(session)
{
- if(session)
+ bool activated;
{
- session->loadServers();
- _node->checkConsistency(session);
+ Lock sync(*this);
+ activated = _activated;
+ }
+ if(activated)
+ {
+ syncServers(session);
}
- }
- catch(const Ice::LocalException&)
- {
- // IGNORE
}
}
diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h
index 6c546b7da8c..5d28f060aca 100644
--- a/cpp/src/IceGrid/NodeSessionManager.h
+++ b/cpp/src/IceGrid/NodeSessionManager.h
@@ -54,6 +54,7 @@ public:
void create(const NodeIPtr&);
void create(const InternalRegistryPrx&);
+ void activated();
bool waitForCreate();
void terminate();
void destroy();
@@ -66,6 +67,7 @@ public:
private:
void syncReplicas(const InternalRegistryPrxSeq&);
+ void syncServers(const NodeSessionPrx&);
class Thread : public NodeSessionKeepAliveThread
{
@@ -100,6 +102,7 @@ private:
InternalRegistryPrx _master;
unsigned long _serial;
bool _destroyed;
+ bool _activated;
typedef std::map<Ice::Identity, NodeSessionKeepAliveThreadPtr> NodeSessionMap;
NodeSessionMap _sessions;
diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp
index 774b8b074f3..8acb32f19da 100644
--- a/cpp/src/IceGrid/ServerCache.cpp
+++ b/cpp/src/IceGrid/ServerCache.cpp
@@ -258,8 +258,6 @@ ServerEntry::update(const ServerInfo& info)
_load = descriptor;
_loaded.reset(0);
-// _proxy = 0;
-// _adapters.clear();
//
// Update the allocatable flag.
@@ -291,8 +289,6 @@ ServerEntry::destroy()
_load.reset(0);
_loaded.reset(0);
- _proxy = 0;
- _adapters.clear();
}
ServerInfo
@@ -348,7 +344,7 @@ ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string&
{
{
Lock sync(*this);
- if(_loaded.get() || _proxy && !upToDate) // Synced or if not up to date is fine
+ if(_loaded.get() || _proxy && _synchronizing && !upToDate) // Synced or if not up to date is fine
{
assert(_loaded.get() || _load.get());
activationTimeout = _activationTimeout;
@@ -364,7 +360,7 @@ ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string&
{
Lock sync(*this);
- if(_loaded.get() || _proxy && !upToDate) // Synced or if not up to date is fine
+ if(_loaded.get() || _proxy && _synchronizing && !upToDate) // Synced or if not up to date is fine
{
assert(_loaded.get() || _load.get());
activationTimeout = _activationTimeout;
@@ -389,7 +385,7 @@ ServerEntry::getAdapter(const string& id, bool upToDate)
{
{
Lock sync(*this);
- if(_loaded.get() || _proxy && !upToDate) // Synced or if not up to date is fine
+ if(_loaded.get() || _proxy && _synchronizing && !upToDate) // Synced or if not up to date is fine
{
AdapterPrxDict::const_iterator p = _adapters.find(id);
if(p != _adapters.end())
@@ -410,7 +406,7 @@ ServerEntry::getAdapter(const string& id, bool upToDate)
{
Lock sync(*this);
- if(_loaded.get() || _proxy && !upToDate) // Synced or if not up to date is fine
+ if(_loaded.get() || _proxy && _synchronizing && !upToDate) // Synced or if not up to date is fine
{
AdapterPrxDict::const_iterator p = _adapters.find(id);
if(p != _adapters.end())
@@ -500,8 +496,6 @@ ServerEntry::syncImpl(bool waitForUpdate)
if(!_load.get() && !_destroy.get())
{
_load = _loaded; // Re-load the current server.
-// _proxy = 0;
-// _adapters.clear();
}
_updated = false;
@@ -657,6 +651,8 @@ ServerEntry::destroyCallback()
{
Lock sync(*this);
_destroy.reset(0);
+ _proxy = 0;
+ _adapters.clear();
if(!_load.get())
{
@@ -703,6 +699,8 @@ ServerEntry::exception(const Ice::Exception& ex)
remove = _destroy.get();
_destroy.reset(0);
_exception.reset(ex.ice_clone());
+ _proxy = 0;
+ _adapters.clear();
_synchronizing = false;
notifyAll();
}
@@ -773,8 +771,6 @@ ServerEntry::allocated(const SessionIPtr& session)
{
_load = _loaded;
}
-// _proxy = 0;
-// _adapters.clear();
_session = session;
_load->sessionId = session->getId();
}
@@ -828,7 +824,7 @@ ServerEntry::allocatedNoSync(const SessionIPtr& session)
_loaded.get() && _loaded->descriptor->activation != "session" ||
_load.get() && _load->descriptor->activation != "session")
{
- return;
+ return;
}
}
@@ -855,8 +851,6 @@ ServerEntry::released(const SessionIPtr& session)
{
_load = _loaded;
}
-// _proxy = 0;
-// _adapters.clear();
_load->sessionId = "";
_session = 0;
}
diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp
index 6ff039f625b..c47f91bf792 100644
--- a/cpp/src/IceGrid/ServerI.cpp
+++ b/cpp/src/IceGrid/ServerI.cpp
@@ -939,156 +939,139 @@ ServerI::start(ServerActivation activation, const AMD_Server_startPtr& amdCB)
}
}
-void
+ServerCommandPtr
ServerI::load(const AMD_Node_loadServerPtr& amdCB, const ServerInfo& info, bool fromMaster)
{
- ServerCommandPtr command;
+ Lock sync(*this);
+ checkDestroyed();
+
+ //
+ // Don't reload the server if:
+ //
+ // - the application uuid, the application revision and the
+ // session id didn't change.
+ //
+ // - the load command if from a slave and the given descriptor
+ // is from another application or doesn't have the same
+ // version.
+ //
+ // - the descriptor and the session id didn't change.
+ //
+ // In any case, we update the server application revision if
+ // it needs to be updated.
+ //
+ if(!_info.uuid.empty() && !fromMaster)
{
- Lock sync(*this);
- checkDestroyed();
-
- //
- // Don't reload the server if:
- //
- // - the application uuid, the application revision and the
- // session id didn't change.
- //
- // - the load command if from a slave and the given descriptor
- // is from another application or doesn't have the same
- // version.
- //
- // - the descriptor and the session id didn't change.
- //
- // In any case, we update the server application revision if
- // it needs to be updated.
- //
- if(!_info.uuid.empty() && !fromMaster)
- {
- if(_info.uuid != info.uuid)
- {
- DeploymentException ex;
- ex.reason = "server descriptor from replica is from another application (`" + info.uuid + "')";
- throw ex;
- }
- else if(_info.revision != info.revision)
- {
- ostringstream os;
- os << "server descriptor from replica has different version:\n";
- os << "current revision: " << _info.revision << "\n";
- os << "replica revision: " << info.revision;
- throw DeploymentException(os.str());
- }
- }
-
- //
- // Otherwise, if the following conditions are met:
- //
- // - the server is already loaded.
- // - the descriptor is from the master and the session id didn't change or it's coming from a slave.
- // - the descriptor is the same as the one loaded.
- //
- // we don't re-load the server. We just return the server
- // proxy and the proxies of its adapters.
- //
- if(_info.descriptor &&
- (!fromMaster || _info.sessionId == info.sessionId) &&
- (_info.uuid == info.uuid && _info.revision == info.revision ||
- descriptorEqual(_node->getCommunicator(), _info.descriptor, info.descriptor)))
- {
- if(_info.uuid == info.uuid && _info.revision < info.revision)
- {
- //
- // If the application was updated but the server didn't change
- // we just update the application revision.
- //
- _info.revision = info.revision;
- updateRevisionFile();
- }
-
- if(amdCB)
- {
- AdapterPrxDict adapters;
- for(ServerAdapterDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
- {
- adapters.insert(make_pair(p->first, p->second->getProxy()));
- }
- amdCB->ice_response(_this, adapters, _activationTimeout, _deactivationTimeout);
- }
- return;
- }
-
- assert(fromMaster || _info.uuid.empty() || _info.uuid == info.uuid && _info.revision <= info.revision);
- if(!StopCommand::isStopped(_state) && !_stop)
+ if(_info.uuid != info.uuid)
{
- _stop = new StopCommand(this, _node->getWaitQueue(), _deactivationTimeout);
+ DeploymentException ex;
+ ex.reason = "server descriptor from replica is from another application (`" + info.uuid + "')";
+ throw ex;
}
- if(!_load)
+ else if(_info.revision != info.revision)
{
- _load = new LoadCommand(this);
+ ostringstream os;
+ os << "server descriptor from replica has different version:\n";
+ os << "current revision: " << _info.revision << "\n";
+ os << "replica revision: " << info.revision;
+ throw DeploymentException(os.str());
}
- _load->setUpdate(info, _destroy);
- if(_destroy && _state != Destroying)
+ }
+
+ //
+ // Otherwise, if the following conditions are met:
+ //
+ // - the server is already loaded.
+ // - the descriptor is from the master and the session id didn't change or it's coming from a slave.
+ // - the descriptor is the same as the one loaded.
+ //
+ // we don't re-load the server. We just return the server
+ // proxy and the proxies of its adapters.
+ //
+ if(_info.descriptor &&
+ (!fromMaster || _info.sessionId == info.sessionId) &&
+ (_info.uuid == info.uuid && _info.revision == info.revision ||
+ descriptorEqual(_node->getCommunicator(), _info.descriptor, info.descriptor)))
+ {
+ if(_info.uuid != info.uuid || _info.revision != info.revision)
{
- _destroy->finished();
- _destroy = 0;
+ _info.uuid = info.uuid;
+ _info.revision = info.revision;
+ updateRevisionFile();
}
+
if(amdCB)
{
- _load->addCallback(amdCB);
+ AdapterPrxDict adapters;
+ for(ServerAdapterDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ adapters.insert(make_pair(p->first, p->second->getProxy()));
+ }
+ amdCB->ice_response(_this, adapters, _activationTimeout, _deactivationTimeout);
}
- command = nextCommand();
+ return 0;
}
- if(command)
+
+ assert(fromMaster || _info.uuid.empty() || _info.uuid == info.uuid && _info.revision == info.revision);
+ if(!StopCommand::isStopped(_state) && !_stop)
{
- command->execute();
+ _stop = new StopCommand(this, _node->getWaitQueue(), _deactivationTimeout);
+ }
+ if(!_load)
+ {
+ _load = new LoadCommand(this);
+ }
+ _load->setUpdate(info, _destroy);
+ if(_destroy && _state != Destroying)
+ {
+ _destroy->finished();
+ _destroy = 0;
+ }
+ if(amdCB)
+ {
+ _load->addCallback(amdCB);
}
+ return nextCommand();
}
-void
+ServerCommandPtr
ServerI::destroy(const AMD_Node_destroyServerPtr& amdCB, const string& uuid, int revision)
{
- ServerCommandPtr command;
- {
- Lock sync(*this);
- checkDestroyed();
+ Lock sync(*this);
+ checkDestroyed();
- if(!uuid.empty()) // Empty if from checkConsistency.
+ if(!uuid.empty()) // Empty if from checkConsistency.
+ {
+ if(_info.uuid.empty())
{
- if(_info.uuid.empty())
- {
- amdCB->ice_response();
- return; // Server doesn't exist.
- }
- else if(_info.uuid != uuid)
- {
- DeploymentException ex;
- ex.reason = "server descriptor from replica is from another application (`" + uuid + "')";
- throw ex;
- }
- else if(_info.revision > revision)
- {
- ostringstream os;
- os << "server descriptor from replica is too old:\n";
- os << "current revision: " << _info.revision << "\n";
- os << "replica revision: " << revision;
- throw DeploymentException(os.str());
- }
+ amdCB->ice_response();
+ return 0; // Server doesn't exist.
}
-
- if(!_destroy)
+ else if(_info.uuid != uuid)
{
- _destroy = new DestroyCommand(this, _state != Inactive && _state != Loading && _state != Patching);
+ DeploymentException ex;
+ ex.reason = "server descriptor from replica is from another application (`" + uuid + "')";
+ throw ex;
}
- if(amdCB)
+ else if(_info.revision > revision)
{
- _destroy->addCallback(amdCB);
+ ostringstream os;
+ os << "server descriptor from replica is too old:\n";
+ os << "current revision: " << _info.revision << "\n";
+ os << "replica revision: " << revision;
+ throw DeploymentException(os.str());
}
- command = nextCommand();
}
- if(command)
+
+ if(!_destroy)
{
- command->execute();
+ _destroy = new DestroyCommand(this, _state != Inactive && _state != Loading && _state != Patching);
+ }
+ if(amdCB)
+ {
+ _destroy->addCallback(amdCB);
}
+ return nextCommand();
}
bool
diff --git a/cpp/src/IceGrid/ServerI.h b/cpp/src/IceGrid/ServerI.h
index 38b21d67933..194e63a8745 100644
--- a/cpp/src/IceGrid/ServerI.h
+++ b/cpp/src/IceGrid/ServerI.h
@@ -90,8 +90,8 @@ public:
DistributionDescriptor getDistribution() const;
void start(ServerActivation, const AMD_Server_startPtr& = AMD_Server_startPtr());
- void load(const AMD_Node_loadServerPtr&, const ServerInfo&, bool);
- void destroy(const AMD_Node_destroyServerPtr&, const std::string&, int);
+ ServerCommandPtr load(const AMD_Node_loadServerPtr&, const ServerInfo&, bool);
+ ServerCommandPtr destroy(const AMD_Node_destroyServerPtr&, const std::string&, int);
bool startPatch(bool);
bool waitForPatch();
void finishPatch();
diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h
index 30326f6a79c..7d659d593ea 100644
--- a/cpp/src/IceGrid/SessionManager.h
+++ b/cpp/src/IceGrid/SessionManager.h
@@ -196,7 +196,7 @@ public:
{
Lock sync(*this);
// Wait until the action is executed and the state changes.
- while((_nextAction == Connect || _nextAction == KeepAlive) || _state == InProgress)
+ while(_nextAction == Connect || _nextAction == KeepAlive || _state == InProgress)
{
wait();
}
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 34d80ab18a9..968a733ed45 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -72,6 +72,7 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
{
assert(_syncSubscribers.find(name) == _syncSubscribers.end());
_syncSubscribers.insert(name);
+ addExpectedUpdate(_serial, name);
waitForSyncedSubscribersNoSync(_serial, name);
}
}
@@ -146,8 +147,9 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail
if(p->second.empty())
{
_waitForUpdates.erase(p);
- notifyAll();
}
+
+ notifyAll();
}
}
@@ -159,21 +161,33 @@ ObserverTopic::waitForSyncedSubscribers(int serial, const string& name)
}
void
-ObserverTopic::addExpectedUpdate(int serial)
+ObserverTopic::addExpectedUpdate(int serial, const string& name)
{
- if(_syncSubscribers.empty())
+ if(_syncSubscribers.empty() && name.empty())
{
return;
}
// Must be called with the lock held.
- assert(_waitForUpdates[serial].empty());
- _waitForUpdates[serial] = _syncSubscribers;
+ if(name.empty())
+ {
+ assert(_waitForUpdates[serial].empty());
+ _waitForUpdates[serial] = _syncSubscribers;
+ }
+ else
+ {
+ _waitForUpdates[serial].insert(name);
+ }
}
void
ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
{
+ if(serial < 0)
+ {
+ return;
+ }
+
//
// Wait until all the updates are received.
//
@@ -500,13 +514,13 @@ ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerP
const_cast<ApplicationObserverPrx&>(_publisher) = ApplicationObserverPrx::uncheckedCast(_basePublisher);
}
-void
+int
ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(serial);
_applications.clear();
@@ -523,16 +537,19 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq&
Ice::Warning out(_logger);
out << "unexpected exception while publishing `applicationInit' update:\n" << ex;
}
+ addExpectedUpdate(serial);
+ return serial;
}
-void
+int
ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
+
updateSerial(serial);
_applications.insert(make_pair(info.descriptor.name, info));
try
@@ -545,16 +562,16 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in
out << "unexpected exception while publishing `applicationAdded' update:\n" << ex;
}
addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ return serial;
}
-void
+int
ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(serial);
_applications.erase(name);
@@ -568,16 +585,16 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex;
}
addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ return serial;
}
-void
+int
ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(serial);
@@ -622,7 +639,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex;
}
addExpectedUpdate(serial);
- waitForSyncedSubscribersNoSync(serial);
+ return serial;
}
void
@@ -654,13 +671,13 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi
const_cast<AdapterObserverPrx&>(_publisher) = AdapterObserverPrx::uncheckedCast(_basePublisher);
}
-void
+int
AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_adapters.clear();
@@ -677,15 +694,17 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `adapterInit' update:\n" << ex;
}
+ addExpectedUpdate(_serial);
+ return _serial;
}
-void
+int
AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_adapters.insert(make_pair(info.id, info));
@@ -699,16 +718,16 @@ AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
out << "unexpected exception while publishing `adapterAdded' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
-void
+int
AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_adapters[info.id] = info;
@@ -722,16 +741,16 @@ AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
-void
+int
AdapterObserverTopic::adapterRemoved(const string& id)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_adapters.erase(id);
@@ -745,7 +764,7 @@ AdapterObserverTopic::adapterRemoved(const string& id)
out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
void
@@ -777,13 +796,13 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM
const_cast<ObjectObserverPrx&>(_publisher) = ObjectObserverPrx::uncheckedCast(_basePublisher);
}
-void
+int
ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_objects.clear();
@@ -800,15 +819,17 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
Ice::Warning out(_logger);
out << "unexpected exception while publishing `objectInit' update:\n" << ex;
}
+ addExpectedUpdate(_serial);
+ return _serial;
}
-void
+int
ObjectObserverTopic::objectAdded(const ObjectInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
@@ -822,16 +843,16 @@ ObjectObserverTopic::objectAdded(const ObjectInfo& info)
out << "unexpected exception while publishing `objectAdded' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
-void
+int
ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_objects[info.proxy->ice_getIdentity()] = info;
@@ -845,16 +866,16 @@ ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
out << "unexpected exception while publishing `objectUpdated' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
-void
+int
ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
{
Lock sync(*this);
if(!_topic)
{
- return;
+ return -1;
}
updateSerial(_serial + 1);
_objects.erase(id);
@@ -868,7 +889,7 @@ ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
out << "unexpected exception while publishing `objectRemoved' update:\n" << ex;
}
addExpectedUpdate(_serial);
- waitForSyncedSubscribersNoSync(_serial);
+ return _serial;
}
int
@@ -877,7 +898,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
Lock sync(*this);
if(!_topic)
{
- return _serial;
+ return -1;
}
updateSerial(_serial + 1);
@@ -927,7 +948,7 @@ ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos)
Lock sync(*this);
if(!_topic)
{
- return _serial;
+ return -1;
}
updateSerial(_serial + 1);
diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h
index 52748aafb3e..28d5b002b98 100644
--- a/cpp/src/IceGrid/Topics.h
+++ b/cpp/src/IceGrid/Topics.h
@@ -40,7 +40,7 @@ public:
protected:
- void addExpectedUpdate(int);
+ void addExpectedUpdate(int, const std::string& = std::string());
void waitForSyncedSubscribersNoSync(int, const std::string& = std::string());
void updateSerial(int);
Ice::Context getContext(int) const;
@@ -106,10 +106,10 @@ public:
ApplicationObserverTopic(const IceStorm::TopicManagerPrx&, const StringApplicationInfoDict&);
- void applicationInit(int, const ApplicationInfoSeq&);
- void applicationAdded(int, const ApplicationInfo&);
- void applicationRemoved(int, const std::string&);
- void applicationUpdated(int, const ApplicationUpdateInfo&);
+ int applicationInit(int, const ApplicationInfoSeq&);
+ int applicationAdded(int, const ApplicationInfo&);
+ int applicationRemoved(int, const std::string&);
+ int applicationUpdated(int, const ApplicationUpdateInfo&);
virtual void initObserver(const Ice::ObjectPrx&);
@@ -126,10 +126,10 @@ public:
AdapterObserverTopic(const IceStorm::TopicManagerPrx&, const StringAdapterInfoDict&);
- void adapterInit(const AdapterInfoSeq&);
- void adapterAdded(const AdapterInfo&);
- void adapterUpdated(const AdapterInfo&);
- void adapterRemoved(const std::string&);
+ int adapterInit(const AdapterInfoSeq&);
+ int adapterAdded(const AdapterInfo&);
+ int adapterUpdated(const AdapterInfo&);
+ int adapterRemoved(const std::string&);
virtual void initObserver(const Ice::ObjectPrx&);
@@ -146,10 +146,10 @@ public:
ObjectObserverTopic(const IceStorm::TopicManagerPrx&, const IdentityObjectInfoDict&);
- void objectInit(const ObjectInfoSeq&);
- void objectAdded(const ObjectInfo&);
- void objectUpdated(const ObjectInfo&);
- void objectRemoved(const Ice::Identity&);
+ int objectInit(const ObjectInfoSeq&);
+ int objectAdded(const ObjectInfo&);
+ int objectUpdated(const ObjectInfo&);
+ int objectRemoved(const Ice::Identity&);
int objectsAddedOrUpdated(const ObjectInfoSeq&);
int objectsRemoved(const ObjectInfoSeq&);