diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-11-29 09:25:19 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-11-29 09:25:19 +0000 |
commit | 55d4301510cd67fc11638255df6b94574f9d49b5 (patch) | |
tree | f4a631a5e8de10c511c6bc3ee9d8aad3c6b577d2 /cpp/src | |
parent | various file chooser enhancements (diff) | |
download | ice-55d4301510cd67fc11638255df6b94574f9d49b5.tar.bz2 ice-55d4301510cd67fc11638255df6b94574f9d49b5.tar.xz ice-55d4301510cd67fc11638255df6b94574f9d49b5.zip |
Fixes
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 566 | ||||
-rw-r--r-- | cpp/src/IceGrid/IceGridNode.cpp | 6 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 113 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.cpp | 92 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeSessionManager.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerCache.cpp | 24 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerI.cpp | 225 | ||||
-rw-r--r-- | cpp/src/IceGrid/ServerI.h | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/SessionManager.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 101 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 26 |
11 files changed, 642 insertions, 520 deletions
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 824308ad5a2..b7f26779a04 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -183,82 +183,94 @@ Database::unlock(AdminSessionI* session) void Database::syncApplications(const ApplicationInfoSeq& applications) { - Lock sync(*this); - - Freeze::TransactionHolder txHolder(_connection); - ServerEntrySeq entries; - set<string> names; - for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p) + int serial; { - try + Lock sync(*this); + + Freeze::TransactionHolder txHolder(_connection); + ServerEntrySeq entries; + set<string> names; + for(ApplicationInfoSeq::const_iterator p = applications.begin(); p != applications.end(); ++p) { - StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name); - if(s != _applications.end()) + try { - ApplicationHelper previous(_communicator, s->second.descriptor); - ApplicationHelper helper(_communicator, p->descriptor); - reload(previous, helper, entries, p->uuid, p->revision); + StringApplicationInfoDict::const_iterator s = _applications.find(p->descriptor.name); + if(s != _applications.end()) + { + ApplicationHelper previous(_communicator, s->second.descriptor); + ApplicationHelper helper(_communicator, p->descriptor); + reload(previous, helper, entries, p->uuid, p->revision); + } + else + { + load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision); + } } - else + catch(const DeploymentException& ex) { - load(ApplicationHelper(_communicator, p->descriptor), entries, p->uuid, p->revision); + Ice::Warning warn(_traceLevels->logger); + warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason; } + _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p)); + names.insert(p->descriptor.name); } - catch(const DeploymentException& ex) - { - Ice::Warning warn(_traceLevels->logger); - warn << "invalid application `" << p->descriptor.name << "':\n" << ex.reason; - } - _applications.put(StringApplicationInfoDict::value_type(p->descriptor.name, *p)); - names.insert(p->descriptor.name); - } - StringApplicationInfoDict::iterator s = _applications.begin(); - while(s != _applications.end()) - { - if(names.find(s->first) == names.end()) + StringApplicationInfoDict::iterator s = _applications.begin(); + while(s != _applications.end()) { - unload(ApplicationHelper(_communicator, s->second.descriptor), entries); - _applications.erase(s++); - } - else - { - ++s; + if(names.find(s->first) == names.end()) + { + unload(ApplicationHelper(_communicator, s->second.descriptor), entries); + _applications.erase(s++); + } + else + { + ++s; + } } - } - ++_applicationSerial; + ++_applicationSerial; - _applicationObserverTopic->applicationInit(_applicationSerial, applications); + serial = _applicationObserverTopic->applicationInit(_applicationSerial, applications); - txHolder.commit(); + txHolder.commit(); + } + _applicationObserverTopic->waitForSyncedSubscribers(serial); } void Database::syncAdapters(const AdapterInfoSeq& adapters) { - Lock sync(*this); - Freeze::TransactionHolder txHolder(_connection); - _adapters.clear(); - for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r) + int serial; { - _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); + Lock sync(*this); + Freeze::TransactionHolder txHolder(_connection); + _adapters.clear(); + for(AdapterInfoSeq::const_iterator r = adapters.begin(); r != adapters.end(); ++r) + { + _adapters.put(StringAdapterInfoDict::value_type(r->id, *r)); + } + serial = _adapterObserverTopic->adapterInit(adapters); + txHolder.commit(); } - _adapterObserverTopic->adapterInit(adapters); - txHolder.commit(); + _adapterObserverTopic->waitForSyncedSubscribers(serial); } void Database::syncObjects(const ObjectInfoSeq& objects) { - Lock sync(*this); - Freeze::TransactionHolder txHolder(_connection); - _objects.clear(); - for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) + int serial; { - _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); + Lock sync(*this); + Freeze::TransactionHolder txHolder(_connection); + _objects.clear(); + for(ObjectInfoSeq::const_iterator q = objects.begin(); q != objects.end(); ++q) + { + _objects.put(IdentityObjectInfoDict::value_type(q->proxy->ice_getIdentity(), *q)); + } + serial = _objectObserverTopic->objectInit(objects); + txHolder.commit(); } - _objectObserverTopic->objectInit(objects); - txHolder.commit(); + _objectObserverTopic->waitForSyncedSubscribers(serial); } void @@ -310,21 +322,24 @@ Database::addApplication(const ApplicationInfo& info, AdminSessionI* session) } } + int serial; { Lock sync(*this); ++_applicationSerial; _applications.put(StringApplicationInfoDict::value_type(info.descriptor.name, info)); - finishUpdating(info.descriptor.name); - _applicationObserverTopic->applicationAdded(_applicationSerial, info); + serial = _applicationObserverTopic->applicationAdded(_applicationSerial, info); if(_traceLevels->application > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); out << "added application `" << info.descriptor.name << "'"; } - notifyAll(); } + + _applicationObserverTopic->waitForSyncedSubscribers(serial); + + finishUpdating(info.descriptor.name); } void @@ -457,6 +472,7 @@ void Database::removeApplication(const string& name, AdminSessionI* session) { ServerEntrySeq entries; + int serial; { Lock sync(*this); checkSessionLock(session); @@ -489,7 +505,7 @@ Database::removeApplication(const string& name, AdminSessionI* session) _applications.erase(p); ++_applicationSerial; - _applicationObserverTopic->applicationRemoved(_applicationSerial, name); + serial = _applicationObserverTopic->applicationRemoved(_applicationSerial, name); if(_traceLevels->application > 0) { @@ -509,6 +525,8 @@ Database::removeApplication(const string& name, AdminSessionI* session) // Ignore, this is traced by the node cache. } } + + _applicationObserverTopic->waitForSyncedSubscribers(serial); } ApplicationInfo @@ -602,67 +620,71 @@ Database::getAllocatableObject(const Ice::Identity& id) const void Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) { - Lock sync(*this); - if(_adapterCache.has(adapterId)) + int serial; { - throw AdapterExistsException(adapterId); - } + Lock sync(*this); + if(_adapterCache.has(adapterId)) + { + throw AdapterExistsException(adapterId); + } - StringAdapterInfoDict::iterator p = _adapters.find(adapterId); - AdapterInfo info; - bool updated = false; - if(proxy) - { - if(p != _adapters.end()) + StringAdapterInfoDict::iterator p = _adapters.find(adapterId); + AdapterInfo info; + bool updated = false; + if(proxy) { - info = p->second; - info.proxy = proxy; - info.replicaGroupId = replicaGroupId; - p.set(info); - updated = true; + if(p != _adapters.end()) + { + info = p->second; + info.proxy = proxy; + info.replicaGroupId = replicaGroupId; + p.set(info); + updated = true; + } + else + { + info.id = adapterId; + info.proxy = proxy; + info.replicaGroupId = replicaGroupId; + _adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); + } } else { - info.id = adapterId; - info.proxy = proxy; - info.replicaGroupId = replicaGroupId; - _adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); - } - } - else - { - if(p == _adapters.end()) - { - return; + if(p == _adapters.end()) + { + return; + } + _adapters.erase(p); } - _adapters.erase(p); - } - if(_traceLevels->adapter > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'"; - if(!replicaGroupId.empty()) + if(_traceLevels->adapter > 0) { - out << " with replica group `" << replicaGroupId << "'"; + Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); + out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'"; + if(!replicaGroupId.empty()) + { + out << " with replica group `" << replicaGroupId << "'"; + } } - } - if(proxy) - { - if(updated) + if(proxy) { - _adapterObserverTopic->adapterUpdated(info); + if(updated) + { + serial = _adapterObserverTopic->adapterUpdated(info); + } + else + { + serial = _adapterObserverTopic->adapterAdded(info); + } } else { - _adapterObserverTopic->adapterAdded(info); + serial = _adapterObserverTopic->adapterRemoved(adapterId); } } - else - { - _adapterObserverTopic->adapterRemoved(adapterId); - } + _adapterObserverTopic->waitForSyncedSubscribers(serial); } Ice::ObjectPrx @@ -693,61 +715,65 @@ Database::getAdapterDirectProxy(const string& id) void Database::removeAdapter(const string& adapterId) { - Lock sync(*this); - if(_adapterCache.has(adapterId)) - { - AdapterEntryPtr adpt = _adapterCache.get(adapterId); - DeploymentException ex; - ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n"; - ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'"; - throw ex; - } - - Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator - - StringAdapterInfoDict::iterator p = _adapters.find(adapterId); - AdapterInfoSeq infos; - if(p != _adapters.end()) - { - _adapters.erase(p); - } - else + int serial; { - p = _adapters.findByReplicaGroupId(adapterId, true); - if(p == _adapters.end()) + Lock sync(*this); + if(_adapterCache.has(adapterId)) { - throw AdapterNotExistException(adapterId); + AdapterEntryPtr adpt = _adapterCache.get(adapterId); + DeploymentException ex; + ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n"; + ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'"; + throw ex; } - while(p != _adapters.end()) + Freeze::TransactionHolder txHolder(_connection); // Required because of the iterator + + StringAdapterInfoDict::iterator p = _adapters.find(adapterId); + AdapterInfoSeq infos; + if(p != _adapters.end()) { - AdapterInfo info = p->second; - info.replicaGroupId = ""; - infos.push_back(info); - _adapters.put(StringAdapterInfoDict::value_type(p->first, info)); - ++p; + _adapters.erase(p); } - } - - if(_traceLevels->adapter > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'"; - } - - if(infos.empty()) - { - _adapterObserverTopic->adapterRemoved(adapterId); - } - else - { - for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + else + { + p = _adapters.findByReplicaGroupId(adapterId, true); + if(p == _adapters.end()) + { + throw AdapterNotExistException(adapterId); + } + + while(p != _adapters.end()) + { + AdapterInfo info = p->second; + info.replicaGroupId = ""; + infos.push_back(info); + _adapters.put(StringAdapterInfoDict::value_type(p->first, info)); + ++p; + } + } + + if(_traceLevels->adapter > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); + out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'"; + } + + if(infos.empty()) { - _adapterObserverTopic->adapterUpdated(*p); + serial = _adapterObserverTopic->adapterRemoved(adapterId); } + else + { + for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + { + serial = _adapterObserverTopic->adapterUpdated(*p); + } + } + + txHolder.commit(); } - - txHolder.commit(); + _adapterObserverTopic->waitForSyncedSubscribers(serial); } AdapterEntryPtr @@ -840,128 +866,143 @@ Database::getAllAdapters(const string& expression) void Database::addObject(const ObjectInfo& info) { - Lock sync(*this); - const Ice::Identity id = info.proxy->ice_getIdentity(); - - if(_objectCache.has(id)) + int serial; { - throw ObjectExistsException(id); - } - - if(_objects.find(id) != _objects.end()) - { - throw ObjectExistsException(id); - } - _objects.put(IdentityObjectInfoDict::value_type(id, info)); - - _objectObserverTopic->objectAdded(info); + Lock sync(*this); + const Ice::Identity id = info.proxy->ice_getIdentity(); + + if(_objectCache.has(id)) + { + throw ObjectExistsException(id); + } + + if(_objects.find(id) != _objects.end()) + { + throw ObjectExistsException(id); + } + _objects.put(IdentityObjectInfoDict::value_type(id, info)); + + serial = _objectObserverTopic->objectAdded(info); - if(_traceLevels->object > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); - out << "added object `" << _communicator->identityToString(id) << "'"; + if(_traceLevels->object > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + out << "added object `" << _communicator->identityToString(id) << "'"; + } } + _objectObserverTopic->waitForSyncedSubscribers(serial); } void Database::addOrUpdateObject(const ObjectInfo& info) { - Lock sync(*this); - const Ice::Identity id = info.proxy->ice_getIdentity(); - - if(_objectCache.has(id)) + int serial; { - throw ObjectExistsException(id); - } - - bool update = _objects.find(id) != _objects.end(); - _objects.put(IdentityObjectInfoDict::value_type(id, info)); - - if(update) - { - _objectObserverTopic->objectUpdated(info); - } - else - { - _objectObserverTopic->objectAdded(info); - } - - if(_traceLevels->object > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); - out << (!update ? "added" : "updated") << " object `" << _communicator->identityToString(id) << "'"; + Lock sync(*this); + const Ice::Identity id = info.proxy->ice_getIdentity(); + + if(_objectCache.has(id)) + { + throw ObjectExistsException(id); + } + + bool update = _objects.find(id) != _objects.end(); + _objects.put(IdentityObjectInfoDict::value_type(id, info)); + + if(update) + { + serial = _objectObserverTopic->objectUpdated(info); + } + else + { + serial = _objectObserverTopic->objectAdded(info); + } + + if(_traceLevels->object > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + out << (!update ? "added" : "updated") << " object `" << _communicator->identityToString(id) << "'"; + } } + _objectObserverTopic->waitForSyncedSubscribers(serial); } void Database::removeObject(const Ice::Identity& id) { - Lock sync(*this); - if(_objectCache.has(id)) - { - DeploymentException ex; - ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n"; - ex.reason += "the object was added with the application descriptor `"; - ex.reason += _objectCache.get(id)->getApplication(); - ex.reason += "'"; - throw ex; - } - - IdentityObjectInfoDict::iterator p = _objects.find(id); - if(p == _objects.end()) + int serial; { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; - } - _objects.erase(p); - + Lock sync(*this); + if(_objectCache.has(id)) + { + DeploymentException ex; + ex.reason = "removing object `" + _communicator->identityToString(id) + "' is not allowed:\n"; + ex.reason += "the object was added with the application descriptor `"; + ex.reason += _objectCache.get(id)->getApplication(); + ex.reason += "'"; + throw ex; + } - _objectObserverTopic->objectRemoved(id); - - if(_traceLevels->object > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); - out << "removed object `" << _communicator->identityToString(id) << "'"; + IdentityObjectInfoDict::iterator p = _objects.find(id); + if(p == _objects.end()) + { + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; + } + _objects.erase(p); + + serial = _objectObserverTopic->objectRemoved(id); + + if(_traceLevels->object > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + out << "removed object `" << _communicator->identityToString(id) << "'"; + } } + _objectObserverTopic->waitForSyncedSubscribers(serial); } void Database::updateObject(const Ice::ObjectPrx& proxy) { - Lock sync(*this); - - const Ice::Identity id = proxy->ice_getIdentity(); - if(_objectCache.has(id)) - { - DeploymentException ex; - ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n"; - ex.reason += "the object was added with the application descriptor `"; - ex.reason += _objectCache.get(id)->getApplication(); - ex.reason += "'"; - throw ex; - } - - IdentityObjectInfoDict::iterator p = _objects.find(id); - if(p == _objects.end()) + int serial; { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; - } - - ObjectInfo info; - info = p->second; - info.proxy = proxy; - p.set(info); + Lock sync(*this); + + const Ice::Identity id = proxy->ice_getIdentity(); + if(_objectCache.has(id)) + { + DeploymentException ex; + ex.reason = "updating object `" + _communicator->identityToString(id) + "' is not allowed:\n"; + ex.reason += "the object was added with the application descriptor `"; + ex.reason += _objectCache.get(id)->getApplication(); + ex.reason += "'"; + throw ex; + } - _objectObserverTopic->objectUpdated(info); + IdentityObjectInfoDict::iterator p = _objects.find(id); + if(p == _objects.end()) + { + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; + } + + ObjectInfo info; + info = p->second; + info.proxy = proxy; + p.set(info); - if(_traceLevels->object > 0) - { - Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); - out << "updated object `" << _communicator->identityToString(id) << "'"; + serial = _objectObserverTopic->objectUpdated(info); + + if(_traceLevels->object > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + out << "updated object `" << _communicator->identityToString(id) << "'"; + } } + _objectObserverTopic->waitForSyncedSubscribers(serial); } int @@ -1458,27 +1499,31 @@ Database::finishApplicationUpdate(ServerEntrySeq& entries, // // Save the application descriptor. // - Lock sync(*this); - - ApplicationInfo info = oldApp; - info.updateTime = update.updateTime; - info.updateUser = update.updateUser; - info.revision = update.revision; - info.descriptor = newDesc; - - _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info)); - ++_applicationSerial; - finishUpdating(update.descriptor.name); - - if(_traceLevels->application > 0) + int serial; { - Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); - out << "updated application `" << update.descriptor.name << "'"; - } - - _applicationObserverTopic->applicationUpdated(_applicationSerial, update); + Lock sync(*this); + + ApplicationInfo info = oldApp; + info.updateTime = update.updateTime; + info.updateUser = update.updateUser; + info.revision = update.revision; + info.descriptor = newDesc; + + _applications.put(StringApplicationInfoDict::value_type(update.descriptor.name, info)); + ++_applicationSerial; - notifyAll(); + if(_traceLevels->application > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->applicationCat); + out << "updated application `" << update.descriptor.name << "'"; + } + + serial = _applicationObserverTopic->applicationUpdated(_applicationSerial, update); + } + + _applicationObserverTopic->waitForSyncedSubscribers(serial); + + finishUpdating(update.descriptor.name); } void @@ -1491,7 +1536,8 @@ Database::startUpdating(const string& name) void Database::finishUpdating(const string& name) { - // Must be called within the synchronization. + Lock sync(*this); + map<string, vector<AMD_NodeSession_waitForApplicationUpdatePtr> >::iterator p = _updating.find(name); assert(p != _updating.end()); for(vector<AMD_NodeSession_waitForApplicationUpdatePtr>::const_iterator q = p->second.begin(); @@ -1500,4 +1546,6 @@ Database::finishUpdating(const string& name) (*q)->ice_response(); } _updating.erase(p); + + notifyAll(); } diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp index cd314f84bd7..337db3a32ec 100644 --- a/cpp/src/IceGrid/IceGridNode.cpp +++ b/cpp/src/IceGrid/IceGridNode.cpp @@ -482,6 +482,12 @@ NodeService::start(int argc, char* argv[]) // _adapter->activate(); + // + // Notify the node session manager that the node can start + // accepting incoming connections. + // + _sessions.activated(); + string bundleName = properties->getProperty("IceGrid.Node.PrintServersReady"); if(!bundleName.empty() || !desc.empty()) { diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index aae67c97964..81a29709767 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -218,46 +218,53 @@ NodeI::loadServer_async(const AMD_Node_loadServerPtr& amdCB, bool fromMaster, const Ice::Current& current) { - Lock sync(*this); - ++_serial; - - Ice::Identity id = createServerIdentity(info.descriptor->id); - - // - // Check if we already have a servant for this server. If that's - // the case, the server is already loaded and we just need to - // update it. - // - while(true) + ServerCommandPtr command; { - bool added = false; - ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(id)); - if(!server) - { - ServerPrx proxy = ServerPrx::uncheckedCast(_adapter->createProxy(id)); - server = new ServerI(this, proxy, _serversDir, info.descriptor->id, _waitTime); - _adapter->add(server, id); - added = true; - } + Lock sync(*this); + ++_serial; - try - { - server->load(amdCB, info, fromMaster); - } - catch(const Ice::ObjectNotExistException&) - { - assert(!added); - continue; - } - catch(const Ice::Exception&) + Ice::Identity id = createServerIdentity(info.descriptor->id); + + // + // Check if we already have a servant for this server. If that's + // the case, the server is already loaded and we just need to + // update it. + // + while(true) { - if(added) + bool added = false; + ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(id)); + if(!server) + { + ServerPrx proxy = ServerPrx::uncheckedCast(_adapter->createProxy(id)); + server = new ServerI(this, proxy, _serversDir, info.descriptor->id, _waitTime); + _adapter->add(server, id); + added = true; + } + + try + { + command = server->load(amdCB, info, fromMaster); + } + catch(const Ice::ObjectNotExistException&) + { + assert(!added); + continue; + } + catch(const Ice::Exception&) { - _adapter->remove(id); + if(added) + { + _adapter->remove(id); + } + throw; } - throw; + break; } - break; + } + if(command) + { + command->execute(); } } @@ -268,24 +275,31 @@ NodeI::destroyServer_async(const AMD_Node_destroyServerPtr& amdCB, int revision, const Ice::Current& current) { - Lock sync(*this); - ++_serial; - - ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(serverId))); - if(!server) + ServerCommandPtr command; { - server = new ServerI(this, 0, _serversDir, serverId, _waitTime); - } - - // - // Destroy the server object if it's loaded. - // - try - { - server->destroy(amdCB, uuid, revision); + Lock sync(*this); + ++_serial; + + ServerIPtr server = ServerIPtr::dynamicCast(_adapter->find(createServerIdentity(serverId))); + if(!server) + { + server = new ServerI(this, 0, _serversDir, serverId, _waitTime); + } + + // + // Destroy the server object if it's loaded. + // + try + { + command = server->destroy(amdCB, uuid, revision); + } + catch(const Ice::ObjectNotExistException&) + { + } } - catch(const Ice::ObjectNotExistException&) + if(command) { + command->execute(); } } @@ -697,7 +711,6 @@ void NodeI::removeObserver(const NodeSessionPrx& session) { IceUtil::Mutex::Lock sync(_observerMutex); - assert(_observers.find(session) != _observers.end()); _observers.erase(session); } diff --git a/cpp/src/IceGrid/NodeSessionManager.cpp b/cpp/src/IceGrid/NodeSessionManager.cpp index 87b9ae17eac..6ee09c7fa3c 100644 --- a/cpp/src/IceGrid/NodeSessionManager.cpp +++ b/cpp/src/IceGrid/NodeSessionManager.cpp @@ -197,7 +197,8 @@ NodeSessionKeepAliveThread::keepAlive(const NodeSessionPrx& session) NodeSessionManager::NodeSessionManager() : _serial(1), - _destroyed(false) + _destroyed(false), + _activated(false) { } @@ -241,7 +242,7 @@ NodeSessionManager::create(const NodeIPtr& node) // to load the servers on the node (when createSession invokes // loadServers() on the session). // - _thread->tryCreateSession(false); + _thread->tryCreateSession(true); } void @@ -261,6 +262,25 @@ NodeSessionManager::create(const InternalRegistryPrx& replica) } } +void +NodeSessionManager::activated() +{ + { + Lock sync(*this); + _activated = true; + } + NodeSessionPrx session = _thread->getSession(); + if(!session) + { + _thread->tryCreateSession(true); + session = _thread->getSession(); + } + if(session) + { + syncServers(session); + } +} + bool NodeSessionManager::waitForCreate() { @@ -360,6 +380,7 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) _sessions.swap(sessions); NodeSessionKeepAliveThreadPtr thread; + vector<NodeSessionKeepAliveThreadPtr> newSessions; for(InternalRegistryPrxSeq::const_iterator p = replicas.begin(); p != replicas.end(); ++p) { if((*p)->ice_getIdentity() == _master->ice_getIdentity()) @@ -377,6 +398,7 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) thread = new NodeSessionKeepAliveThread(*p, _node, _queryObjects); thread->start(); thread->tryCreateSession(false); + newSessions.push_back(thread); } _sessions.insert(make_pair((*p)->ice_getIdentity(), thread)); } @@ -402,6 +424,44 @@ NodeSessionManager::syncReplicas(const InternalRegistryPrxSeq& replicas) { q->second->getThreadControl().join(); } + + // + // If the node is being started, we wait for the new sessions to + // be created. This ensures that once the node is activated all + // the known replicas are connected. + // + if(!_activated) + { + for(vector<NodeSessionKeepAliveThreadPtr>::const_iterator t = newSessions.begin(); t != newSessions.end(); ++t) + { + (*t)->tryCreateSession(true); + } + } +} + +void +NodeSessionManager::syncServers(const NodeSessionPrx& session) +{ + // + // Ask the session to load the servers on the node. Once this is + // done we check the consistency of the node to make sure old + // servers are removed. + // + // NOTE: it's important for this to be done after trying to + // register with the replicas. When the master loads the server + // some server might get activated and it's better if at that time + // the registry replicas (at least the ones which are up) have all + // established their session with the node. + // + assert(session); + try + { + session->loadServers(); + _node->checkConsistency(session); + } + catch(const Ice::LocalException&) + { + } } void @@ -473,27 +533,21 @@ NodeSessionManager::createdSession(const NodeSessionPrx& session) } // - // Ask the master to load the servers on the node. Once this is - // done we check the consistency of the node to make sure old - // servers are removed. + // Synchronize the servers if the session is active and if the + // node adapter has been activated (otherwise, the servers will be + // synced after the node adapter activation, see activated()). // - // NOTE: it's important for this to be done after trying to - // register with the replicas. When the master loads the server - // some server might get activated and it's better if at that time - // the registry replicas (at least the ones which are up) have all - // established their session with the node. - // - try + if(session) { - if(session) + bool activated; { - session->loadServers(); - _node->checkConsistency(session); + Lock sync(*this); + activated = _activated; + } + if(activated) + { + syncServers(session); } - } - catch(const Ice::LocalException&) - { - // IGNORE } } diff --git a/cpp/src/IceGrid/NodeSessionManager.h b/cpp/src/IceGrid/NodeSessionManager.h index 6c546b7da8c..5d28f060aca 100644 --- a/cpp/src/IceGrid/NodeSessionManager.h +++ b/cpp/src/IceGrid/NodeSessionManager.h @@ -54,6 +54,7 @@ public: void create(const NodeIPtr&); void create(const InternalRegistryPrx&); + void activated(); bool waitForCreate(); void terminate(); void destroy(); @@ -66,6 +67,7 @@ public: private: void syncReplicas(const InternalRegistryPrxSeq&); + void syncServers(const NodeSessionPrx&); class Thread : public NodeSessionKeepAliveThread { @@ -100,6 +102,7 @@ private: InternalRegistryPrx _master; unsigned long _serial; bool _destroyed; + bool _activated; typedef std::map<Ice::Identity, NodeSessionKeepAliveThreadPtr> NodeSessionMap; NodeSessionMap _sessions; diff --git a/cpp/src/IceGrid/ServerCache.cpp b/cpp/src/IceGrid/ServerCache.cpp index 774b8b074f3..8acb32f19da 100644 --- a/cpp/src/IceGrid/ServerCache.cpp +++ b/cpp/src/IceGrid/ServerCache.cpp @@ -258,8 +258,6 @@ ServerEntry::update(const ServerInfo& info) _load = descriptor; _loaded.reset(0); -// _proxy = 0; -// _adapters.clear(); // // Update the allocatable flag. @@ -291,8 +289,6 @@ ServerEntry::destroy() _load.reset(0); _loaded.reset(0); - _proxy = 0; - _adapters.clear(); } ServerInfo @@ -348,7 +344,7 @@ ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string& { { Lock sync(*this); - if(_loaded.get() || _proxy && !upToDate) // Synced or if not up to date is fine + if(_loaded.get() || _proxy && _synchronizing && !upToDate) // Synced or if not up to date is fine { assert(_loaded.get() || _load.get()); activationTimeout = _activationTimeout; @@ -364,7 +360,7 @@ ServerEntry::getProxy(int& activationTimeout, int& deactivationTimeout, string& { Lock sync(*this); - if(_loaded.get() || _proxy && !upToDate) // Synced or if not up to date is fine + if(_loaded.get() || _proxy && _synchronizing && !upToDate) // Synced or if not up to date is fine { assert(_loaded.get() || _load.get()); activationTimeout = _activationTimeout; @@ -389,7 +385,7 @@ ServerEntry::getAdapter(const string& id, bool upToDate) { { Lock sync(*this); - if(_loaded.get() || _proxy && !upToDate) // Synced or if not up to date is fine + if(_loaded.get() || _proxy && _synchronizing && !upToDate) // Synced or if not up to date is fine { AdapterPrxDict::const_iterator p = _adapters.find(id); if(p != _adapters.end()) @@ -410,7 +406,7 @@ ServerEntry::getAdapter(const string& id, bool upToDate) { Lock sync(*this); - if(_loaded.get() || _proxy && !upToDate) // Synced or if not up to date is fine + if(_loaded.get() || _proxy && _synchronizing && !upToDate) // Synced or if not up to date is fine { AdapterPrxDict::const_iterator p = _adapters.find(id); if(p != _adapters.end()) @@ -500,8 +496,6 @@ ServerEntry::syncImpl(bool waitForUpdate) if(!_load.get() && !_destroy.get()) { _load = _loaded; // Re-load the current server. -// _proxy = 0; -// _adapters.clear(); } _updated = false; @@ -657,6 +651,8 @@ ServerEntry::destroyCallback() { Lock sync(*this); _destroy.reset(0); + _proxy = 0; + _adapters.clear(); if(!_load.get()) { @@ -703,6 +699,8 @@ ServerEntry::exception(const Ice::Exception& ex) remove = _destroy.get(); _destroy.reset(0); _exception.reset(ex.ice_clone()); + _proxy = 0; + _adapters.clear(); _synchronizing = false; notifyAll(); } @@ -773,8 +771,6 @@ ServerEntry::allocated(const SessionIPtr& session) { _load = _loaded; } -// _proxy = 0; -// _adapters.clear(); _session = session; _load->sessionId = session->getId(); } @@ -828,7 +824,7 @@ ServerEntry::allocatedNoSync(const SessionIPtr& session) _loaded.get() && _loaded->descriptor->activation != "session" || _load.get() && _load->descriptor->activation != "session") { - return; + return; } } @@ -855,8 +851,6 @@ ServerEntry::released(const SessionIPtr& session) { _load = _loaded; } -// _proxy = 0; -// _adapters.clear(); _load->sessionId = ""; _session = 0; } diff --git a/cpp/src/IceGrid/ServerI.cpp b/cpp/src/IceGrid/ServerI.cpp index 6ff039f625b..c47f91bf792 100644 --- a/cpp/src/IceGrid/ServerI.cpp +++ b/cpp/src/IceGrid/ServerI.cpp @@ -939,156 +939,139 @@ ServerI::start(ServerActivation activation, const AMD_Server_startPtr& amdCB) } } -void +ServerCommandPtr ServerI::load(const AMD_Node_loadServerPtr& amdCB, const ServerInfo& info, bool fromMaster) { - ServerCommandPtr command; + Lock sync(*this); + checkDestroyed(); + + // + // Don't reload the server if: + // + // - the application uuid, the application revision and the + // session id didn't change. + // + // - the load command if from a slave and the given descriptor + // is from another application or doesn't have the same + // version. + // + // - the descriptor and the session id didn't change. + // + // In any case, we update the server application revision if + // it needs to be updated. + // + if(!_info.uuid.empty() && !fromMaster) { - Lock sync(*this); - checkDestroyed(); - - // - // Don't reload the server if: - // - // - the application uuid, the application revision and the - // session id didn't change. - // - // - the load command if from a slave and the given descriptor - // is from another application or doesn't have the same - // version. - // - // - the descriptor and the session id didn't change. - // - // In any case, we update the server application revision if - // it needs to be updated. - // - if(!_info.uuid.empty() && !fromMaster) - { - if(_info.uuid != info.uuid) - { - DeploymentException ex; - ex.reason = "server descriptor from replica is from another application (`" + info.uuid + "')"; - throw ex; - } - else if(_info.revision != info.revision) - { - ostringstream os; - os << "server descriptor from replica has different version:\n"; - os << "current revision: " << _info.revision << "\n"; - os << "replica revision: " << info.revision; - throw DeploymentException(os.str()); - } - } - - // - // Otherwise, if the following conditions are met: - // - // - the server is already loaded. - // - the descriptor is from the master and the session id didn't change or it's coming from a slave. - // - the descriptor is the same as the one loaded. - // - // we don't re-load the server. We just return the server - // proxy and the proxies of its adapters. - // - if(_info.descriptor && - (!fromMaster || _info.sessionId == info.sessionId) && - (_info.uuid == info.uuid && _info.revision == info.revision || - descriptorEqual(_node->getCommunicator(), _info.descriptor, info.descriptor))) - { - if(_info.uuid == info.uuid && _info.revision < info.revision) - { - // - // If the application was updated but the server didn't change - // we just update the application revision. - // - _info.revision = info.revision; - updateRevisionFile(); - } - - if(amdCB) - { - AdapterPrxDict adapters; - for(ServerAdapterDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) - { - adapters.insert(make_pair(p->first, p->second->getProxy())); - } - amdCB->ice_response(_this, adapters, _activationTimeout, _deactivationTimeout); - } - return; - } - - assert(fromMaster || _info.uuid.empty() || _info.uuid == info.uuid && _info.revision <= info.revision); - if(!StopCommand::isStopped(_state) && !_stop) + if(_info.uuid != info.uuid) { - _stop = new StopCommand(this, _node->getWaitQueue(), _deactivationTimeout); + DeploymentException ex; + ex.reason = "server descriptor from replica is from another application (`" + info.uuid + "')"; + throw ex; } - if(!_load) + else if(_info.revision != info.revision) { - _load = new LoadCommand(this); + ostringstream os; + os << "server descriptor from replica has different version:\n"; + os << "current revision: " << _info.revision << "\n"; + os << "replica revision: " << info.revision; + throw DeploymentException(os.str()); } - _load->setUpdate(info, _destroy); - if(_destroy && _state != Destroying) + } + + // + // Otherwise, if the following conditions are met: + // + // - the server is already loaded. + // - the descriptor is from the master and the session id didn't change or it's coming from a slave. + // - the descriptor is the same as the one loaded. + // + // we don't re-load the server. We just return the server + // proxy and the proxies of its adapters. + // + if(_info.descriptor && + (!fromMaster || _info.sessionId == info.sessionId) && + (_info.uuid == info.uuid && _info.revision == info.revision || + descriptorEqual(_node->getCommunicator(), _info.descriptor, info.descriptor))) + { + if(_info.uuid != info.uuid || _info.revision != info.revision) { - _destroy->finished(); - _destroy = 0; + _info.uuid = info.uuid; + _info.revision = info.revision; + updateRevisionFile(); } + if(amdCB) { - _load->addCallback(amdCB); + AdapterPrxDict adapters; + for(ServerAdapterDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + adapters.insert(make_pair(p->first, p->second->getProxy())); + } + amdCB->ice_response(_this, adapters, _activationTimeout, _deactivationTimeout); } - command = nextCommand(); + return 0; } - if(command) + + assert(fromMaster || _info.uuid.empty() || _info.uuid == info.uuid && _info.revision == info.revision); + if(!StopCommand::isStopped(_state) && !_stop) { - command->execute(); + _stop = new StopCommand(this, _node->getWaitQueue(), _deactivationTimeout); + } + if(!_load) + { + _load = new LoadCommand(this); + } + _load->setUpdate(info, _destroy); + if(_destroy && _state != Destroying) + { + _destroy->finished(); + _destroy = 0; + } + if(amdCB) + { + _load->addCallback(amdCB); } + return nextCommand(); } -void +ServerCommandPtr ServerI::destroy(const AMD_Node_destroyServerPtr& amdCB, const string& uuid, int revision) { - ServerCommandPtr command; - { - Lock sync(*this); - checkDestroyed(); + Lock sync(*this); + checkDestroyed(); - if(!uuid.empty()) // Empty if from checkConsistency. + if(!uuid.empty()) // Empty if from checkConsistency. + { + if(_info.uuid.empty()) { - if(_info.uuid.empty()) - { - amdCB->ice_response(); - return; // Server doesn't exist. - } - else if(_info.uuid != uuid) - { - DeploymentException ex; - ex.reason = "server descriptor from replica is from another application (`" + uuid + "')"; - throw ex; - } - else if(_info.revision > revision) - { - ostringstream os; - os << "server descriptor from replica is too old:\n"; - os << "current revision: " << _info.revision << "\n"; - os << "replica revision: " << revision; - throw DeploymentException(os.str()); - } + amdCB->ice_response(); + return 0; // Server doesn't exist. } - - if(!_destroy) + else if(_info.uuid != uuid) { - _destroy = new DestroyCommand(this, _state != Inactive && _state != Loading && _state != Patching); + DeploymentException ex; + ex.reason = "server descriptor from replica is from another application (`" + uuid + "')"; + throw ex; } - if(amdCB) + else if(_info.revision > revision) { - _destroy->addCallback(amdCB); + ostringstream os; + os << "server descriptor from replica is too old:\n"; + os << "current revision: " << _info.revision << "\n"; + os << "replica revision: " << revision; + throw DeploymentException(os.str()); } - command = nextCommand(); } - if(command) + + if(!_destroy) { - command->execute(); + _destroy = new DestroyCommand(this, _state != Inactive && _state != Loading && _state != Patching); + } + if(amdCB) + { + _destroy->addCallback(amdCB); } + return nextCommand(); } bool diff --git a/cpp/src/IceGrid/ServerI.h b/cpp/src/IceGrid/ServerI.h index 38b21d67933..194e63a8745 100644 --- a/cpp/src/IceGrid/ServerI.h +++ b/cpp/src/IceGrid/ServerI.h @@ -90,8 +90,8 @@ public: DistributionDescriptor getDistribution() const; void start(ServerActivation, const AMD_Server_startPtr& = AMD_Server_startPtr()); - void load(const AMD_Node_loadServerPtr&, const ServerInfo&, bool); - void destroy(const AMD_Node_destroyServerPtr&, const std::string&, int); + ServerCommandPtr load(const AMD_Node_loadServerPtr&, const ServerInfo&, bool); + ServerCommandPtr destroy(const AMD_Node_destroyServerPtr&, const std::string&, int); bool startPatch(bool); bool waitForPatch(); void finishPatch(); diff --git a/cpp/src/IceGrid/SessionManager.h b/cpp/src/IceGrid/SessionManager.h index 30326f6a79c..7d659d593ea 100644 --- a/cpp/src/IceGrid/SessionManager.h +++ b/cpp/src/IceGrid/SessionManager.h @@ -196,7 +196,7 @@ public: { Lock sync(*this); // Wait until the action is executed and the state changes. - while((_nextAction == Connect || _nextAction == KeepAlive) || _state == InProgress) + while(_nextAction == Connect || _nextAction == KeepAlive || _state == InProgress) { wait(); } diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index 34d80ab18a9..968a733ed45 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -72,6 +72,7 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) { assert(_syncSubscribers.find(name) == _syncSubscribers.end()); _syncSubscribers.insert(name); + addExpectedUpdate(_serial, name); waitForSyncedSubscribersNoSync(_serial, name); } } @@ -146,8 +147,9 @@ ObserverTopic::receivedUpdate(const string& name, int serial, const string& fail if(p->second.empty()) { _waitForUpdates.erase(p); - notifyAll(); } + + notifyAll(); } } @@ -159,21 +161,33 @@ ObserverTopic::waitForSyncedSubscribers(int serial, const string& name) } void -ObserverTopic::addExpectedUpdate(int serial) +ObserverTopic::addExpectedUpdate(int serial, const string& name) { - if(_syncSubscribers.empty()) + if(_syncSubscribers.empty() && name.empty()) { return; } // Must be called with the lock held. - assert(_waitForUpdates[serial].empty()); - _waitForUpdates[serial] = _syncSubscribers; + if(name.empty()) + { + assert(_waitForUpdates[serial].empty()); + _waitForUpdates[serial] = _syncSubscribers; + } + else + { + _waitForUpdates[serial].insert(name); + } } void ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name) { + if(serial < 0) + { + return; + } + // // Wait until all the updates are received. // @@ -500,13 +514,13 @@ ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerP const_cast<ApplicationObserverPrx&>(_publisher) = ApplicationObserverPrx::uncheckedCast(_basePublisher); } -void +int ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(serial); _applications.clear(); @@ -523,16 +537,19 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& Ice::Warning out(_logger); out << "unexpected exception while publishing `applicationInit' update:\n" << ex; } + addExpectedUpdate(serial); + return serial; } -void +int ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } + updateSerial(serial); _applications.insert(make_pair(info.descriptor.name, info)); try @@ -545,16 +562,16 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in out << "unexpected exception while publishing `applicationAdded' update:\n" << ex; } addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + return serial; } -void +int ApplicationObserverTopic::applicationRemoved(int serial, const string& name) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(serial); _applications.erase(name); @@ -568,16 +585,16 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name) out << "unexpected exception while publishing `applicationRemoved' update:\n" << ex; } addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + return serial; } -void +int ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(serial); @@ -622,7 +639,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate out << "unexpected exception while publishing `applicationUpdated' update:\n" << ex; } addExpectedUpdate(serial); - waitForSyncedSubscribersNoSync(serial); + return serial; } void @@ -654,13 +671,13 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi const_cast<AdapterObserverPrx&>(_publisher) = AdapterObserverPrx::uncheckedCast(_basePublisher); } -void +int AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _adapters.clear(); @@ -677,15 +694,17 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts) Ice::Warning out(_logger); out << "unexpected exception while publishing `adapterInit' update:\n" << ex; } + addExpectedUpdate(_serial); + return _serial; } -void +int AdapterObserverTopic::adapterAdded(const AdapterInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _adapters.insert(make_pair(info.id, info)); @@ -699,16 +718,16 @@ AdapterObserverTopic::adapterAdded(const AdapterInfo& info) out << "unexpected exception while publishing `adapterAdded' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } -void +int AdapterObserverTopic::adapterUpdated(const AdapterInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _adapters[info.id] = info; @@ -722,16 +741,16 @@ AdapterObserverTopic::adapterUpdated(const AdapterInfo& info) out << "unexpected exception while publishing `adapterUpdated' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } -void +int AdapterObserverTopic::adapterRemoved(const string& id) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _adapters.erase(id); @@ -745,7 +764,7 @@ AdapterObserverTopic::adapterRemoved(const string& id) out << "unexpected exception while publishing `adapterRemoved' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } void @@ -777,13 +796,13 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM const_cast<ObjectObserverPrx&>(_publisher) = ObjectObserverPrx::uncheckedCast(_basePublisher); } -void +int ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _objects.clear(); @@ -800,15 +819,17 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects) Ice::Warning out(_logger); out << "unexpected exception while publishing `objectInit' update:\n" << ex; } + addExpectedUpdate(_serial); + return _serial; } -void +int ObjectObserverTopic::objectAdded(const ObjectInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); @@ -822,16 +843,16 @@ ObjectObserverTopic::objectAdded(const ObjectInfo& info) out << "unexpected exception while publishing `objectAdded' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } -void +int ObjectObserverTopic::objectUpdated(const ObjectInfo& info) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _objects[info.proxy->ice_getIdentity()] = info; @@ -845,16 +866,16 @@ ObjectObserverTopic::objectUpdated(const ObjectInfo& info) out << "unexpected exception while publishing `objectUpdated' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } -void +int ObjectObserverTopic::objectRemoved(const Ice::Identity& id) { Lock sync(*this); if(!_topic) { - return; + return -1; } updateSerial(_serial + 1); _objects.erase(id); @@ -868,7 +889,7 @@ ObjectObserverTopic::objectRemoved(const Ice::Identity& id) out << "unexpected exception while publishing `objectRemoved' update:\n" << ex; } addExpectedUpdate(_serial); - waitForSyncedSubscribersNoSync(_serial); + return _serial; } int @@ -877,7 +898,7 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos) Lock sync(*this); if(!_topic) { - return _serial; + return -1; } updateSerial(_serial + 1); @@ -927,7 +948,7 @@ ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos) Lock sync(*this); if(!_topic) { - return _serial; + return -1; } updateSerial(_serial + 1); diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index 52748aafb3e..28d5b002b98 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -40,7 +40,7 @@ public: protected: - void addExpectedUpdate(int); + void addExpectedUpdate(int, const std::string& = std::string()); void waitForSyncedSubscribersNoSync(int, const std::string& = std::string()); void updateSerial(int); Ice::Context getContext(int) const; @@ -106,10 +106,10 @@ public: ApplicationObserverTopic(const IceStorm::TopicManagerPrx&, const StringApplicationInfoDict&); - void applicationInit(int, const ApplicationInfoSeq&); - void applicationAdded(int, const ApplicationInfo&); - void applicationRemoved(int, const std::string&); - void applicationUpdated(int, const ApplicationUpdateInfo&); + int applicationInit(int, const ApplicationInfoSeq&); + int applicationAdded(int, const ApplicationInfo&); + int applicationRemoved(int, const std::string&); + int applicationUpdated(int, const ApplicationUpdateInfo&); virtual void initObserver(const Ice::ObjectPrx&); @@ -126,10 +126,10 @@ public: AdapterObserverTopic(const IceStorm::TopicManagerPrx&, const StringAdapterInfoDict&); - void adapterInit(const AdapterInfoSeq&); - void adapterAdded(const AdapterInfo&); - void adapterUpdated(const AdapterInfo&); - void adapterRemoved(const std::string&); + int adapterInit(const AdapterInfoSeq&); + int adapterAdded(const AdapterInfo&); + int adapterUpdated(const AdapterInfo&); + int adapterRemoved(const std::string&); virtual void initObserver(const Ice::ObjectPrx&); @@ -146,10 +146,10 @@ public: ObjectObserverTopic(const IceStorm::TopicManagerPrx&, const IdentityObjectInfoDict&); - void objectInit(const ObjectInfoSeq&); - void objectAdded(const ObjectInfo&); - void objectUpdated(const ObjectInfo&); - void objectRemoved(const Ice::Identity&); + int objectInit(const ObjectInfoSeq&); + int objectAdded(const ObjectInfo&); + int objectUpdated(const ObjectInfo&); + int objectRemoved(const Ice::Identity&); int objectsAddedOrUpdated(const ObjectInfoSeq&); int objectsRemoved(const ObjectInfoSeq&); |