diff options
author | Benoit Foucher <benoit@zeroc.com> | 2006-04-05 12:54:15 +0000 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2006-04-05 12:54:15 +0000 |
commit | 872fd9e5dfb743a648c64bf774f6c9a76a45b651 (patch) | |
tree | c071c5dc5bd1177c0913e82cb8a3233f0da50ea6 /cpp/src | |
parent | adding timeout test (diff) | |
download | ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.bz2 ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.xz ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.zip |
- Added support for observing adapters and objects.
- Lots of cleanup in the IceGrid registry initilization method.
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/AdminI.cpp | 20 | ||||
-rw-r--r-- | cpp/src/IceGrid/AdminI.h | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/Database.cpp | 330 | ||||
-rw-r--r-- | cpp/src/IceGrid/IceGridNode.cpp | 48 | ||||
-rw-r--r-- | cpp/src/IceGrid/IceGridRegistry.cpp | 16 | ||||
-rw-r--r-- | cpp/src/IceGrid/LocatorI.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/NodeI.cpp | 2 | ||||
-rw-r--r-- | cpp/src/IceGrid/Parser.cpp | 12 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.cpp | 293 | ||||
-rw-r--r-- | cpp/src/IceGrid/RegistryI.h | 3 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 184 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.h | 20 |
12 files changed, 506 insertions, 428 deletions
diff --git a/cpp/src/IceGrid/AdminI.cpp b/cpp/src/IceGrid/AdminI.cpp index 30319133b76..25f76afaf03 100644 --- a/cpp/src/IceGrid/AdminI.cpp +++ b/cpp/src/IceGrid/AdminI.cpp @@ -574,32 +574,32 @@ AdminI::isServerEnabled(const ::std::string& id, const Ice::Current&) const } } -StringObjectProxyDict -AdminI::getAdapterEndpoints(const string& adapterId, const Current&) const +AdapterInfoSeq +AdminI::getAdapterInfo(const string& id, const Current&) const { int count; - vector<pair<string, AdapterPrx> > adapters = _database->getAdapters(adapterId, true, count); - StringObjectProxyDict adpts; + vector<pair<string, AdapterPrx> > adapters = _database->getAdapters(id, true, count); + AdapterInfoSeq adpts; for(vector<pair<string, AdapterPrx> >::const_iterator p = adapters.begin(); p != adapters.end(); ++p) { + AdapterInfo info; + info.id = p->first; + info.replicaGroupId = p->first != id ? id : ""; if(p->second) { try { - adpts[p->first] = p->second->getDirectProxy(); + info.proxy = p->second->getDirectProxy(); } catch(const Ice::ObjectNotExistException&) { + continue; } catch(const Ice::Exception&) { - adpts[p->first] = 0; } } - else - { - adpts[p->first] = 0; - } + adpts.push_back(info); } return adpts; } diff --git a/cpp/src/IceGrid/AdminI.h b/cpp/src/IceGrid/AdminI.h index 8c0b8e0d57f..095476f3cc0 100644 --- a/cpp/src/IceGrid/AdminI.h +++ b/cpp/src/IceGrid/AdminI.h @@ -52,7 +52,7 @@ public: virtual void enableServer(const ::std::string&, bool, const Ice::Current&); virtual bool isServerEnabled(const ::std::string&, const Ice::Current&) const; - virtual StringObjectProxyDict getAdapterEndpoints(const ::std::string&, const ::Ice::Current&) const; + virtual AdapterInfoSeq getAdapterInfo(const ::std::string&, const ::Ice::Current&) const; virtual void removeAdapter(const std::string&, const Ice::Current&); virtual Ice::StringSeq getAllAdapterIds(const ::Ice::Current&) const; diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp index 8a62f3c0ef0..65f963b25f8 100644 --- a/cpp/src/IceGrid/Database.cpp +++ b/cpp/src/IceGrid/Database.cpp @@ -182,6 +182,8 @@ Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeOb { int serial; ApplicationDescriptorSeq applications; + AdapterInfoSeq adapters; + ObjectInfoSeq objects; { Lock sync(*this); _registryObserver = registryObserver; @@ -192,12 +194,26 @@ Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeOb { applications.push_back(p->second); } + + for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p) + { + adapters.push_back(p->second); + if(adapters.back().id.empty()) + { + adapters.back().id = p->first; + } + } + + for(IdentityObjectInfoDict::const_iterator p = _objects.begin(); p != _objects.end(); ++p) + { + objects.push_back(p->second); + } } // // Notify the observers. // - _registryObserver->init(serial, applications); + _registryObserver->init(serial, applications, adapters, objects); } void @@ -563,9 +579,9 @@ Database::getAllNodeServers(const string& node) bool Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy) { - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - StringAdapterInfoDict adapters(connection, _adapterDbName); - if(proxy) + AdapterInfo info; + int serial; + bool updated = false; { Lock sync(*this); if(_adapterCache.has(adapterId)) @@ -573,65 +589,64 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr return false; } - StringAdapterInfoDict::iterator p = adapters.find(adapterId); - if(p != adapters.end()) + StringAdapterInfoDict::iterator p = _adapters.find(adapterId); + if(proxy) { - AdapterInfo info = p->second; - info.proxy = proxy; - info.replicaGroupId = replicaGroupId; - p.set(info); - if(_traceLevels->adapter > 0) + if(p != _adapters.end()) { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "updated adapter `" << adapterId << "'"; - if(!replicaGroupId.empty()) - { - out << " with replica group `" << replicaGroupId << "'"; - } + info = p->second; + info.proxy = proxy; + info.replicaGroupId = replicaGroupId; + p.set(info); + updated = true; } + else + { + info.id = adapterId; + info.proxy = proxy; + info.replicaGroupId = replicaGroupId; + _adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); + } } else { - AdapterInfo info; - info.proxy = proxy; - info.replicaGroupId = replicaGroupId; - adapters.put(StringAdapterInfoDict::value_type(adapterId, info)); - if(_traceLevels->adapter > 0) + if(p == _adapters.end()) { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "added adapter `" << adapterId << "'"; - if(!replicaGroupId.empty()) - { - out << " with replica group `" << replicaGroupId << "'"; - } + return true; } - } - return true; + _adapters.erase(p); + } + + serial = ++_serial; } - else + + + if(_traceLevels->adapter > 0) { - Lock sync(*this); - if(_adapterCache.has(adapterId)) + Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); + out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'"; + if(!replicaGroupId.empty()) { - return false; - } + out << " with replica group `" << replicaGroupId << "'"; + } + } - StringAdapterInfoDict::iterator p = adapters.find(adapterId); - if(p == adapters.end()) + if(proxy) + { + if(updated) { - return true; + _registryObserver->adapterUpdated(serial, info); } - AdapterInfo info = p->second; - adapters.erase(p); - - if(_traceLevels->adapter > 0) + else { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed adapter `" << adapterId << "'"; + _registryObserver->adapterAdded(serial, info); } - - return true; } + else + { + _registryObserver->adapterRemoved(serial, adapterId); + } + return true; } Ice::ObjectPrx @@ -650,62 +665,73 @@ Database::getAdapterDirectProxy(const string& adapterId) void Database::removeAdapter(const string& adapterId) { - try - { - AdapterEntryPtr adpt = _adapterCache.get(adapterId); - DeploymentException ex; - ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n"; - ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'"; - throw ex; - } - catch(const AdapterNotExistException&) + AdapterInfoSeq infos; + int serial; { - } + Lock sync(*this); + if(_adapterCache.has(adapterId)) + { + AdapterEntryPtr adpt = _adapterCache.get(adapterId); + DeploymentException ex; + ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n"; + ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'"; + throw ex; + } - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); + Freeze::TransactionHolder txHolder(_connection); - { - StringAdapterInfoDict adapters(connection, _adapterDbName); - StringAdapterInfoDict::iterator p = adapters.find(adapterId); - if(p != adapters.end()) + StringAdapterInfoDict::iterator p = _adapters.find(adapterId); + if(p != _adapters.end()) { - AdapterInfo info = p->second; - adapters.erase(p); - - if(_traceLevels->adapter > 0) + _adapters.erase(p); + } + else + { + p = _adapters.findByReplicaGroupId(adapterId, true); + if(p == _adapters.end()) { - Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed adapter `" << adapterId << "'"; + throw AdapterNotExistException(adapterId); + } + + while(p != _adapters.end()) + { + AdapterInfo info = p->second; + info.replicaGroupId = ""; + infos.push_back(info); + _adapters.put(StringAdapterInfoDict::value_type(p->first, info)); + ++p; } - return; } - } - { - Freeze::TransactionHolder txHolder(connection); - StringAdapterInfoDict adapters(connection, _adapterDbName); + txHolder.commit(); - StringAdapterInfoDict::iterator p = adapters.findByReplicaGroupId(adapterId, true); - if(p == adapters.end()) + if(infos.empty()) { - throw AdapterNotExistException(adapterId); + serial = ++_serial; } - - while(p != adapters.end()) + else { - AdapterInfo info = p->second; - info.replicaGroupId = ""; - adapters.put(StringAdapterInfoDict::value_type(p->first, info)); - ++p; + serial = _serial; + _serial += infos.size(); } - - txHolder.commit(); } if(_traceLevels->adapter > 0) { Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat); - out << "removed replica group `" << adapterId << "'"; + out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'"; + } + + if(infos.empty()) + { + _registryObserver->adapterRemoved(serial, adapterId); + } + else + { + for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p) + { + _registryObserver->adapterUpdated(++serial, *p); + } } } @@ -744,7 +770,7 @@ Database::getAdapters(const string& id, bool allRegistered, int& endpointCount) Ice::Identity identity; identity.category = "IceGridAdapter"; identity.name = id; - Ice::ObjectPrx adpt = _internalAdapter->createDirectProxy(identity)->ice_collocationOptimization(false); + Ice::ObjectPrx adpt = _internalAdapter->createDirectProxy(identity); adpts.push_back(make_pair(id, AdapterPrx::uncheckedCast(adpt))); endpointCount = 1; return adpts; @@ -809,21 +835,28 @@ Database::getAllAdapters(const string& expression) void Database::addObject(const ObjectInfo& info) { - Lock sync(*this); - + int serial; const Ice::Identity id = info.proxy->ice_getIdentity(); - if(_objectCache.has(id)) { - throw ObjectExistsException(id); - } - - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); - if(objects.find(id) != objects.end()) - { - throw ObjectExistsException(id); + Lock sync(*this); + if(_objectCache.has(id)) + { + throw ObjectExistsException(id); + } + + if(_objects.find(id) != _objects.end()) + { + throw ObjectExistsException(id); + } + _objects.put(IdentityObjectInfoDict::value_type(id, info)); + + serial = ++_serial; } - objects.put(IdentityObjectInfoDict::value_type(id, info)); + + // + // Notify the observers. + // + _registryObserver->objectAdded(serial, info); if(_traceLevels->object > 0) { @@ -835,33 +868,40 @@ Database::addObject(const ObjectInfo& info) void Database::removeObject(const Ice::Identity& id) { - try - { - ObjectEntryPtr obj = _objectCache.get(id); - DeploymentException ex; - ex.reason = "removing object `" + Ice::identityToString(id) + "' is not allowed:\n"; - ex.reason += "the object was added with the application descriptor `" + obj->getApplication() + "'"; - throw ex; - } - catch(const ObjectNotRegisteredException&) + int serial; { - } + Lock sync(*this); + if(_objectCache.has(id)) + { + DeploymentException ex; + ex.reason = "removing object `" + Ice::identityToString(id) + "' is not allowed:\n"; + ex.reason += "the object was added with the application descriptor `"; + ex.reason += _objectCache.get(id)->getApplication(); + ex.reason += "'"; + throw ex; + } - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); - if(objects.find(id) == objects.end()) - { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; - } - if(objects.erase(id) > 0) - { - if(_traceLevels->object > 0) + IdentityObjectInfoDict::iterator p = _objects.find(id); + if(p == _objects.end()) { - Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); - out << "removed object `" << Ice::identityToString(id) << "'"; + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; } + _objects.erase(p); + + serial = ++_serial; + } + + // + // Notify the observers. + // + _registryObserver->objectRemoved(serial, id); + + if(_traceLevels->object > 0) + { + Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat); + out << "removed object `" << Ice::identityToString(id) << "'"; } } @@ -869,31 +909,39 @@ void Database::updateObject(const Ice::ObjectPrx& proxy) { const Ice::Identity id = proxy->ice_getIdentity(); - try - { - ObjectEntryPtr obj = _objectCache.get(id); - DeploymentException ex; - ex.reason = "updating object `" + Ice::identityToString(id) + "' is not allowed:\n"; - ex.reason += "the object was added with the application descriptor `" + obj->getApplication() + "'"; - throw ex; - } - catch(const ObjectNotRegisteredException&) + int serial; + ObjectInfo info; { - } + Lock sync(*this); + if(_objectCache.has(id)) + { + DeploymentException ex; + ex.reason = "updating object `" + Ice::identityToString(id) + "' is not allowed:\n"; + ex.reason += "the object was added with the application descriptor `"; + ex.reason += _objectCache.get(id)->getApplication(); + ex.reason += "'"; + throw ex; + } - Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName); - IdentityObjectInfoDict objects(connection, _objectDbName); - IdentityObjectInfoDict::iterator p = objects.find(id); - if(p == objects.end()) - { - ObjectNotRegisteredException ex; - ex.id = id; - throw ex; + IdentityObjectInfoDict::iterator p = _objects.find(id); + if(p == _objects.end()) + { + ObjectNotRegisteredException ex; + ex.id = id; + throw ex; + } + + info = p->second; + info.proxy = proxy; + p.set(info); + + serial = ++_serial; } - ObjectInfo info = p->second; - info.proxy = proxy; - p.set(info); + // + // Notify the observers. + // + _registryObserver->objectUpdated(serial, info); if(_traceLevels->object > 0) { diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp index 9d84cc3899f..72aa45694f0 100644 --- a/cpp/src/IceGrid/IceGridNode.cpp +++ b/cpp/src/IceGrid/IceGridNode.cpp @@ -246,17 +246,33 @@ NodeService::start(int argc, char* argv[]) // termination listener instead? // properties->setProperty("Ice.ServerIdleTime", "0"); - if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10) <= 10) + + // + // Warn the user that setting Ice.ThreadPool.Server isn't useful. + // + if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 0) > 0) { - properties->setProperty("Ice.ThreadPool.Server.Size", "10"); + Warning out(communicator()->getLogger()); + out << "setting `Ice.ThreadPool.Server.Size' is not useful,\n"; + out << "you should set individual adapter thread pools instead."; } - if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.SizeWarn", 80) <= 80) + + int size = properties->getPropertyAsIntWithDefault("IceGrid.Node.ThreadPool.Size", 0); + if(size <= 0) { - properties->setProperty("Ice.ThreadPool.Server.SizeWarn", "80"); + properties->setProperty("IceGrid.Node.ThreadPool.Size", "1"); } - if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.SizeMax", 100) <= 100) + int sizeMax = properties->getPropertyAsIntWithDefault("IceGrid.Node.ThreadPool.SizeMax", 0); + if(sizeMax <= 0) { - properties->setProperty("Ice.ThreadPool.Server.SizeMax", "100"); + if(size >= sizeMax) + { + sizeMax = size * 10; + } + + ostringstream os; + os << sizeMax; + properties->setProperty("IceGrid.Node.ThreadPool.SizeMax", os.str()); } @@ -271,22 +287,6 @@ NodeService::start(int argc, char* argv[]) // if(properties->getPropertyAsInt("IceGrid.Node.CollocateRegistry") > 0) { - // - // The node needs a different thread pool. - // - if(properties->getPropertyAsIntWithDefault("IceGrid.Node.ThreadPool.Size", 5) <= 0) - { - properties->setProperty("IceGrid.Node.ThreadPool.Size", "5"); - } - if(properties->getPropertyAsIntWithDefault("IceGrid.Node.ThreadPool.SizeWarn", 80) <= 80) - { - properties->setProperty("IceGrid.Node.ThreadPool.SizeWarn", "80"); - } - if(properties->getPropertyAsIntWithDefault("IceGrid.Node.ThreadPool.SizeMax", 100) <= 100) - { - properties->setProperty("IceGrid.Node.ThreadPool.SizeMax", "100"); - } - _registry = new CollocatedRegistry(communicator(), _activator); if(!_registry->start(nowarn)) { @@ -586,9 +586,11 @@ NodeService::initializeCommunicator(int& argc, char* argv[]) PropertiesPtr defaultProperties = getDefaultProperties(argc, argv); // - // Make sure that IceGridNode doesn't use thread-per-connection. + // Make sure that IceGridNode doesn't use thread-per-connection or + // collocation optimization // defaultProperties->setProperty("Ice.ThreadPerConnection", ""); + defaultProperties->setProperty("Ice.Default.CollocationOptimization", "0"); return Service::initializeCommunicator(argc, argv); } diff --git a/cpp/src/IceGrid/IceGridRegistry.cpp b/cpp/src/IceGrid/IceGridRegistry.cpp index 80ec6a4180b..a44c2faac6b 100644 --- a/cpp/src/IceGrid/IceGridRegistry.cpp +++ b/cpp/src/IceGrid/IceGridRegistry.cpp @@ -84,12 +84,16 @@ RegistryService::start(int argc, char* argv[]) return false; } - PropertiesPtr properties = communicator()->getProperties(); - if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 5) <= 5) + // + // Warn the user that setting Ice.ThreadPool.Server isn't useful. + // + if(communicator()->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 0) > 0) { - properties->setProperty("Ice.ThreadPool.Server.Size", "5"); + Warning out(communicator()->getLogger()); + out << "setting `Ice.ThreadPool.Server.Size' is not useful,\n"; + out << "you should set individual adapter thread pools instead."; } - + _registry = new RegistryI(communicator()); if(!_registry->start(nowarn)) { @@ -112,9 +116,11 @@ RegistryService::initializeCommunicator(int& argc, char* argv[]) PropertiesPtr defaultProperties = getDefaultProperties(argc, argv); // - // Make sure that IceGridRegistry doesn't use thread-per-connection. + // Make sure that IceGridRegistry doesn't use + // thread-per-connection or collocation optimization. // defaultProperties->setProperty("Ice.ThreadPerConnection", ""); + defaultProperties->setProperty("Ice.Default.CollocationOptimization", "0"); return Service::initializeCommunicator(argc, argv); } diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp index 0284e0a95e7..84f7c002803 100644 --- a/cpp/src/IceGrid/LocatorI.cpp +++ b/cpp/src/IceGrid/LocatorI.cpp @@ -286,7 +286,7 @@ LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator, const Ice::LocatorRegistryPrx& locatorRegistry) : _communicator(communicator), _database(database), - _locatorRegistry(locatorRegistry) + _locatorRegistry(Ice::LocatorRegistryPrx::uncheckedCast(locatorRegistry->ice_collocationOptimization(false))) { } @@ -396,7 +396,7 @@ LocatorI::getDirectProxyException(const AdapterPrx& adapter, const string& id, c return; } } - catch(const Ice::Exception&) + catch(const Ice::Exception& ex) { // // TODO: Add a warning!!! diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp index 1c612f32ed1..fe1b4122de4 100644 --- a/cpp/src/IceGrid/NodeI.cpp +++ b/cpp/src/IceGrid/NodeI.cpp @@ -550,7 +550,7 @@ NodeI::keepAlive() { try { - Ice::ObjectPrx obj = getCommunicator()->stringToProxy(_instName + "/Registry@IceGrid.Registry.Internal"); + Ice::ObjectPrx obj = getCommunicator()->stringToProxy(_instName + "/Registry"); RegistryPrx registry = RegistryPrx::uncheckedCast(obj); NodeObserverPrx observer; setSession(registry->registerNode(_name, _proxy, _platform.getNodeInfo(), observer), observer); diff --git a/cpp/src/IceGrid/Parser.cpp b/cpp/src/IceGrid/Parser.cpp index 0224e3659a5..faeaa4fb9da 100644 --- a/cpp/src/IceGrid/Parser.cpp +++ b/cpp/src/IceGrid/Parser.cpp @@ -939,18 +939,18 @@ Parser::endpointsAdapter(const list<string>& args) try { string adapterId = args.front(); - StringObjectProxyDict proxies = _admin->getAdapterEndpoints(adapterId); - if(proxies.size() == 1 && proxies.begin()->first == adapterId) + AdapterInfoSeq adpts = _admin->getAdapterInfo(adapterId); + if(adpts.size() == 1 && adpts.begin()->id == adapterId) { - string endpoints = _communicator->proxyToString(proxies.begin()->second); + string endpoints = _communicator->proxyToString(adpts.begin()->proxy); cout << (endpoints.empty() ? "<inactive>" : endpoints) << endl; } else { - for(StringObjectProxyDict::const_iterator p = proxies.begin(); p != proxies.end(); ++p) + for(AdapterInfoSeq::const_iterator p = adpts.begin(); p != adpts.end(); ++p) { - cout << (p->first.empty() ? "<empty>" : p->first) << ": "; - string endpoints = _communicator->proxyToString(p->second); + cout << (p->id.empty() ? "<empty>" : p->id) << ": "; + string endpoints = _communicator->proxyToString(p->proxy); cout << (endpoints.empty() ? "<inactive>" : endpoints) << endl; } } diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp index 57b0efd85ca..d30c09e1af0 100644 --- a/cpp/src/IceGrid/RegistryI.cpp +++ b/cpp/src/IceGrid/RegistryI.cpp @@ -39,25 +39,7 @@ using namespace std; using namespace Ice; using namespace IceGrid; -namespace IceStorm -{ -}; - -namespace IceGrid -{ - -string -intToString(int v) -{ - ostringstream os; - os << v; - return os.str(); -} - -}; - -RegistryI::RegistryI(const CommunicatorPtr& communicator) : - _communicator(communicator) +RegistryI::RegistryI(const CommunicatorPtr& communicator) : _communicator(communicator) { } @@ -132,87 +114,12 @@ RegistryI::start(bool nowarn) properties->setProperty("IceGrid.Registry.Client.AdapterId", ""); properties->setProperty("IceGrid.Registry.Server.AdapterId", ""); properties->setProperty("IceGrid.Registry.Admin.AdapterId", ""); - properties->setProperty("IceGrid.Registry.Internal.AdapterId", "IceGrid.Registry.Internal"); + properties->setProperty("IceGrid.Registry.Internal.AdapterId", ""); - // - // Setup thread pool size for each thread pool. - // - int nThreadPool = 0; - const string clientThreadPool("IceGrid.Registry.Client.ThreadPool"); - if(properties->getPropertyAsInt(clientThreadPool + ".Size") == 0 && - properties->getPropertyAsInt(clientThreadPool + ".SizeMax") == 0) - { - ++nThreadPool; - } - const string serverThreadPool("IceGrid.Registry.Server.ThreadPool"); - if(properties->getPropertyAsInt(serverThreadPool + ".Size") == 0 && - properties->getPropertyAsInt(serverThreadPool + ".SizeMax") == 0) - { - ++nThreadPool; - } - const string adminThreadPool("IceGrid.Registry.Admin.ThreadPool"); - if(properties->getPropertyAsInt(adminThreadPool + ".Size") == 0 && - properties->getPropertyAsInt(adminThreadPool + ".SizeMax") == 0) - { - ++nThreadPool; - } - - int size = properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10); - if(size < nThreadPool) - { - size = nThreadPool; - } - int sizeMax = properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.SizeMax", size * 10); - if(sizeMax < size) - { - sizeMax = size; - } - int sizeWarn = properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.SizeWarn", sizeMax * 80 / 100); - - if(properties->getPropertyAsInt(clientThreadPool + ".Size") == 0 && - properties->getPropertyAsInt(clientThreadPool + ".SizeMax") == 0) - { - properties->setProperty(clientThreadPool + ".Size", IceGrid::intToString(size / nThreadPool)); - properties->setProperty(clientThreadPool + ".SizeMax", IceGrid::intToString(sizeMax / nThreadPool)); - properties->setProperty(clientThreadPool + ".SizeWarn", IceGrid::intToString(sizeWarn / nThreadPool)); - } - if(properties->getPropertyAsInt(serverThreadPool + ".Size") == 0 && - properties->getPropertyAsInt(serverThreadPool + ".SizeMax") == 0) - { - properties->setProperty(serverThreadPool + ".Size", IceGrid::intToString(size / nThreadPool)); - properties->setProperty(serverThreadPool + ".SizeMax", IceGrid::intToString(sizeMax / nThreadPool)); - properties->setProperty(serverThreadPool + ".SizeWarn", IceGrid::intToString(sizeWarn / nThreadPool)); - } - if(properties->getPropertyAsInt(adminThreadPool + ".Size") == 0 && - properties->getPropertyAsInt(adminThreadPool + ".SizeMax") == 0) - { - properties->setProperty(adminThreadPool + ".Size", IceGrid::intToString(size / nThreadPool)); - properties->setProperty(adminThreadPool + ".SizeMax", IceGrid::intToString(sizeMax / nThreadPool)); - properties->setProperty(adminThreadPool + ".SizeWarn", IceGrid::intToString(sizeWarn / nThreadPool)); - } - - int clientSize = properties->getPropertyAsInt(clientThreadPool + ".Size") * 2; - int serverSize = properties->getPropertyAsInt(serverThreadPool + ".Size") * 2; - const string internalThreadPool("IceGrid.Registry.Internal.ThreadPool"); - properties->setProperty(internalThreadPool + ".Size", IceGrid::intToString(clientSize + serverSize)); - - int clientSizeMax = properties->getPropertyAsInt(clientThreadPool + ".SizeMax") * 2; - if(clientSizeMax < clientSize) - { - clientSizeMax = clientSize; - } - int serverSizeMax = properties->getPropertyAsInt(serverThreadPool + ".SizeMax") * 2; - if(serverSizeMax < serverSize) - { - serverSizeMax = serverSize; - } - properties->setProperty(internalThreadPool + ".SizeMax", IceGrid::intToString(clientSizeMax + serverSizeMax)); - - int clientSizeWarn = - properties->getPropertyAsIntWithDefault(clientThreadPool + ".SizeWarn", clientSizeMax * 80 / 100) * 2; - int serverSizeWarn = - properties->getPropertyAsIntWithDefault(serverThreadPool + ".SizeWarn", serverSizeMax * 80 / 100) * 2; - properties->setProperty(internalThreadPool + ".SizeWarn", IceGrid::intToString(clientSizeWarn + serverSizeWarn)); + setupThreadPool(properties, "IceGrid.Registry.Client.ThreadPool", 1, 10); + setupThreadPool(properties, "IceGrid.Registry.Server.ThreadPool", 1, 10); + setupThreadPool(properties, "IceGrid.Registry.Admin.ThreadPool", 1, 10); + setupThreadPool(properties, "IceGrid.Registry.Internal.ThreadPool", 1, 100); TraceLevelsPtr traceLevels = new TraceLevels(properties, _communicator->getLogger(), false); @@ -259,138 +166,69 @@ RegistryI::start(bool nowarn) bool dynamicReg = properties->getPropertyAsInt("IceGrid.Registry.DynamicRegistration") > 0; ObjectPtr locatorRegistry = new LocatorRegistryI(_database, dynamicReg); ObjectPrx obj = serverAdapter->add(locatorRegistry, stringToIdentity(instanceName + "/"+ IceUtil::generateUUID())); - LocatorRegistryPrx locatorRegistryPrx = LocatorRegistryPrx::uncheckedCast(obj->ice_collocationOptimization(false)); + LocatorRegistryPrx locatorRegistryPrx = LocatorRegistryPrx::uncheckedCast(obj); ObjectPtr locator = new LocatorI(_communicator, _database, locatorRegistryPrx); Identity locatorId = stringToIdentity(instanceName + "/Locator"); clientAdapter->add(locator, locatorId); - // - // Create the query interface and register it with the object registry. - // - QueryPtr query = new QueryI(_communicator, _database); - Identity queryId = stringToIdentity(instanceName + "/Query"); - clientAdapter->add(query, queryId); - ObjectPrx queryPrx = clientAdapter->createDirectProxy(queryId); - try - { - _database->removeObject(queryPrx->ice_getIdentity()); - } - catch(const IceGrid::ObjectNotRegisteredException&) - { - } - ObjectInfo info; - info.proxy = queryPrx; - info.type = "::IceGrid::Query"; - _database->addObject(info); - - // - // Create the admin interface and register it with the object registry. - // - ObjectPtr admin = new AdminI(_database, this, traceLevels); - Identity adminId = stringToIdentity(instanceName + "/Admin"); - adminAdapter->add(admin, adminId); - ObjectPrx adminPrx = adminAdapter->createDirectProxy(adminId); - try - { - _database->removeObject(adminPrx->ice_getIdentity()); - } - catch(const IceGrid::ObjectNotRegisteredException&) - { - } - info.proxy = adminPrx; - info.type = "::IceGrid::Admin"; - _database->addObject(info); - // - // Set the IceGrid.Registry.Internal adapter direct proxy directly in the database. - // - registryAdapter->add(this, stringToIdentity(instanceName + "/Registry")); + Ice::Identity registryId = stringToIdentity(instanceName + "/Registry"); + registryAdapter->add(this, registryId); registryAdapter->activate(); - Ice::Identity dummy = Ice::stringToIdentity("dummy"); - _database->setAdapterDirectProxy("IceGrid.Registry.Internal", "", registryAdapter->createDirectProxy(dummy)); - + // // Setup a internal locator to be used by the IceGrid registry itself. This locator is - // registered with the registry object adapter which is using an independant threadpool. + // registered with the registry object adapter which is using its own threadpool. // - locator = new LocatorI(_communicator, _database, locatorRegistryPrx); - registryAdapter->add(locator, locatorId); - obj = registryAdapter->createDirectProxy(locatorId); - _communicator->setDefaultLocator(LocatorPrx::uncheckedCast(obj->ice_collocationOptimization(false))); +// obj = registryAdapter->addWithUUID(new LocatorI(_communicator, _database, locatorRegistryPrx)); +// _communicator->setDefaultLocator(LocatorPrx::uncheckedCast(obj->ice_collocationOptimization(false))); // - // Create the internal IceStorm service. + // Create the internal IceStorm service and the registry and node topics. // - Identity topicMgrId = stringToIdentity(instanceName + "/TopicManager"); - _iceStorm = IceStorm::Service::create(_communicator, registryAdapter, registryAdapter, "IceGrid.Registry", - topicMgrId, "Registry"); + _iceStorm = IceStorm::Service::create(_communicator, + registryAdapter, + registryAdapter, + "IceGrid.Registry", + stringToIdentity(instanceName + "/TopicManager"), + "Registry"); - // - // Create the registry and node observer topic. - // - IceStorm::TopicPrx nodeObserverTopic; - IceStorm::TopicPrx registryObserverTopic; - try - { - registryObserverTopic = _iceStorm->getTopicManager()->create("RegistryObserver"); - } - catch(const IceStorm::TopicExists&) - { - registryObserverTopic = _iceStorm->getTopicManager()->retrieve("RegistryObserver"); - } - try - { - nodeObserverTopic = _iceStorm->getTopicManager()->create("NodeObserver"); - } - catch(const IceStorm::TopicExists&) - { - nodeObserverTopic = _iceStorm->getTopicManager()->retrieve("NodeObserver"); - } + NodeObserverTopic* nodeTopic = new NodeObserverTopic(_iceStorm->getTopicManager()); + _nodeObserver = NodeObserverPrx::uncheckedCast(registryAdapter->addWithUUID(nodeTopic)); + + RegistryObserverTopic* regTopic = new RegistryObserverTopic(_iceStorm->getTopicManager()); + _registryObserver = RegistryObserverPrx::uncheckedCast(registryAdapter->addWithUUID(regTopic)); - obj = nodeObserverTopic->getPublisher()->ice_collocationOptimization(false); - obj = obj->ice_locator(_communicator->getDefaultLocator()); // TODO: Why is this necessary? - NodeObserverTopic* nodeTopic = new NodeObserverTopic(nodeObserverTopic, NodeObserverPrx::uncheckedCast(obj)); - obj = registryAdapter->addWithUUID(nodeTopic)->ice_collocationOptimization(false); - obj = obj->ice_locator(_communicator->getDefaultLocator()); - _nodeObserver = NodeObserverPrx::uncheckedCast(obj); - - obj = registryObserverTopic->getPublisher()->ice_collocationOptimization(false); - obj = obj->ice_locator(_communicator->getDefaultLocator()); // TODO: Why is this necessary? - RegistryObserverTopic* regTopic; - regTopic = new RegistryObserverTopic(registryObserverTopic, RegistryObserverPrx::uncheckedCast(obj), *nodeTopic); - obj = registryAdapter->addWithUUID(regTopic)->ice_collocationOptimization(false); - obj = obj->ice_locator(_communicator->getDefaultLocator()); - _registryObserver = RegistryObserverPrx::uncheckedCast(obj); _database->setObservers(_registryObserver, _nodeObserver); // - // Create the session manager. + // Create the query, admin, session manager interfaces; // + Identity queryId = stringToIdentity(instanceName + "/Query"); + clientAdapter->add(new QueryI(_communicator, _database), queryId); + + Identity adminId = stringToIdentity(instanceName + "/Admin"); + adminAdapter->add(new AdminI(_database, this, traceLevels), adminId); + + Identity sessionManagerId = stringToIdentity(instanceName + "/SessionManager"); ReapThreadPtr reaper = _adminReaper ? _adminReaper : _reaper; ObjectPtr sessionManager = new SessionManagerI(*regTopic, *nodeTopic, _database, reaper, adminSessionTimeout); - Identity sessionManagerId = stringToIdentity(instanceName + "/SessionManager"); adminAdapter->add(sessionManager, sessionManagerId); - ObjectPrx sessionManagerPrx = adminAdapter->createDirectProxy(sessionManagerId); - try - { - _database->removeObject(sessionManagerPrx->ice_getIdentity()); - } - catch(const IceGrid::ObjectNotRegisteredException&) - { - } - info.proxy = sessionManagerPrx; - info.type = "::IceGrid::SessionManager"; - _database->addObject(info); + + // + // Register well known objects with the object registry. + // + addWellKnownObject(registryAdapter->createProxy(registryId), Registry::ice_staticId()); + addWellKnownObject(clientAdapter->createProxy(queryId), Query::ice_staticId()); + addWellKnownObject(adminAdapter->createProxy(adminId), Admin::ice_staticId()); + addWellKnownObject(adminAdapter->createProxy(sessionManagerId), SessionManager::ice_staticId()); // // We are ready to go! // serverAdapter->activate(); clientAdapter->activate(); - if(adminAdapter) - { - adminAdapter->activate(); - } + adminAdapter->activate(); return true; } @@ -420,8 +258,7 @@ NodeSessionPrx RegistryI::registerNode(const std::string& name, const NodePrx& node, const NodeInfo& info, NodeObserverPrx& obs, const Ice::Current& c) { - NodePrx n = - NodePrx::uncheckedCast(node->ice_timeout(_nodeSessionTimeout * 1000)->ice_collocationOptimization(false)); + NodePrx n = NodePrx::uncheckedCast(node->ice_timeout(_nodeSessionTimeout * 1000)); NodeSessionIPtr session = new NodeSessionI(_database, name, n, info); NodeSessionPrx proxy = NodeSessionPrx::uncheckedCast(c.adapter->addWithUUID(session)); _reaper->add(proxy, session); @@ -446,3 +283,47 @@ RegistryI::getTopicManager() { return _iceStorm->getTopicManager(); } + +void +RegistryI::addWellKnownObject(const Ice::ObjectPrx& proxy, const string& type) +{ + assert(_database); + try + { + _database->removeObject(proxy->ice_getIdentity()); + } + catch(const IceGrid::ObjectNotRegisteredException&) + { + } + ObjectInfo info; + info.proxy = proxy; + info.type = type; + _database->addObject(info); +} + +void +RegistryI::setupThreadPool(const Ice::PropertiesPtr& properties, const string& name, int size, int sizeMax) +{ + if(properties->getPropertyAsIntWithDefault(name + ".Size", 0) < size) + { + ostringstream os; + os << size; + properties->setProperty(name + ".Size", os.str()); + } + else + { + size = properties->getPropertyAsInt(name + ".Size"); + } + + if(sizeMax > 0 && properties->getPropertyAsIntWithDefault(name + ".SizeMax", 0) < sizeMax) + { + if(size >= sizeMax) + { + sizeMax = size * 10; + } + + ostringstream os; + os << sizeMax; + properties->setProperty(name + ".SizeMax", os.str()); + } +} diff --git a/cpp/src/IceGrid/RegistryI.h b/cpp/src/IceGrid/RegistryI.h index 528a97c1982..38efe4b1a20 100644 --- a/cpp/src/IceGrid/RegistryI.h +++ b/cpp/src/IceGrid/RegistryI.h @@ -41,6 +41,9 @@ public: private: + void addWellKnownObject(const Ice::ObjectPrx&, const std::string&); + void setupThreadPool(const Ice::PropertiesPtr&, const std::string&, int, int = 0); + Ice::CommunicatorPtr _communicator; DatabasePtr _database; ReapThreadPtr _reaper; diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index f85e074122a..3fdff40ea35 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -77,9 +77,25 @@ private: }; -NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicPrx& topic, const NodeObserverPrx& publisher) : - _topic(topic), _publisher(publisher), _serial(0) +NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : _serial(0) { + IceStorm::TopicPrx t; + try + { + t = topicManager->create("NodeObserver"); + } + catch(const IceStorm::TopicExists&) + { + t = topicManager->retrieve("NodeObserver"); + } + + // + // NOTE: collocation optimization needs to be turned on for the + // topic because the subscribe() method is given a fixed proxy + // which can't be marshalled. + // + const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimization(true)); + const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_topic->getPublisher()); } void @@ -229,22 +245,52 @@ NodeObserverTopic::unsubscribe(const NodeObserverPrx& observer) _topic->unsubscribe(observer); } -RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic, - const RegistryObserverPrx& publisher, - NodeObserverTopic& nodeObserver) : - _topic(topic), _publisher(publisher), _nodeObserver(nodeObserver), _serial(0) +RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : _serial(0) { + IceStorm::TopicPrx t; + try + { + t = topicManager->create("RegistryObserver"); + } + catch(const IceStorm::TopicExists&) + { + t = topicManager->retrieve("RegistryObserver"); + } + + // + // NOTE: collocation optimization needs to be turned on for the + // topic because the subscribe() method is given a fixed proxy + // which can't be marshalled. + // + const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimization(true)); + const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_topic->getPublisher()); } void -RegistryObserverTopic::init(int serial, const ApplicationDescriptorSeq& apps, const Ice::Current&) +RegistryObserverTopic::init(int serial, + const ApplicationDescriptorSeq& apps, + const AdapterInfoSeq& adpts, + const ObjectInfoSeq& objects, + const Ice::Current&) { Lock sync(*this); _serial = serial; - _applications = apps; - _publisher->init(serial, apps); + for(ApplicationDescriptorSeq::const_iterator p = apps.begin(); p != apps.end(); ++p) + { + _applications.insert(make_pair(p->name, *p)); + } + for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q) + { + _adapters.insert(make_pair(q->id, *q)); + } + for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r) + { + _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r)); + } + + _publisher->init(serial, apps, adpts, objects); } void @@ -254,7 +300,7 @@ RegistryObserverTopic::applicationAdded(int serial, const ApplicationDescriptor& updateSerial(serial); - _applications.push_back(desc); + _applications.insert(make_pair(desc.name, desc)); _publisher->applicationAdded(serial, desc); } @@ -265,14 +311,8 @@ RegistryObserverTopic::applicationRemoved(int serial, const string& name, const Lock sync(*this); updateSerial(serial); - for(ApplicationDescriptorSeq::iterator p = _applications.begin(); p != _applications.end(); ++p) - { - if(p->name == name) - { - _applications.erase(p); - break; - } - } + + _applications.erase(name); _publisher->applicationRemoved(serial, name); } @@ -285,15 +325,12 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes updateSerial(serial); try { - for(ApplicationDescriptorSeq::iterator p = _applications.begin(); p != _applications.end(); ++p) + map<string, ApplicationDescriptor>::iterator p = _applications.find(desc.name); + if(p != _applications.end()) { - if(p->name == desc.name) - { - ApplicationHelper helper(*p); - helper.update(desc); - *p = helper.getDescriptor(); - break; - } + ApplicationHelper helper(p->second); + helper.update(desc); + p->second = helper.getDescriptor(); } } catch(const DeploymentException& ex) @@ -320,6 +357,78 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes } void +RegistryObserverTopic::adapterAdded(int serial, const AdapterInfo& info, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _adapters.insert(make_pair(info.id, info)); + + _publisher->adapterAdded(serial, info); +} + +void +RegistryObserverTopic::adapterUpdated(int serial, const AdapterInfo& info, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _adapters[info.id] = info; + + _publisher->adapterUpdated(serial, info); +} + +void +RegistryObserverTopic::adapterRemoved(int serial, const string& id, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _adapters.erase(id); + + _publisher->adapterRemoved(serial, id); +} + +void +RegistryObserverTopic::objectAdded(int serial, const ObjectInfo& info, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _objects.insert(make_pair(info.proxy->ice_getIdentity(), info)); + + _publisher->objectAdded(serial, info); +} + +void +RegistryObserverTopic::objectUpdated(int serial, const ObjectInfo& info, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _objects[info.proxy->ice_getIdentity()] = info; + + _publisher->objectUpdated(serial, info); +} + +void +RegistryObserverTopic::objectRemoved(int serial, const Ice::Identity& id, const Ice::Current&) +{ + Lock sync(*this); + + updateSerial(serial); + + _objects.erase(id); + + _publisher->objectRemoved(serial, id); +} + +void RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial) { while(true) @@ -327,13 +436,32 @@ RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial if(serial == -1) { ApplicationDescriptorSeq applications; + AdapterInfoSeq adapters; + ObjectInfoSeq objects; { Lock sync(*this); assert(_serial != -1); serial = _serial; - applications = _applications; + + map<string, ApplicationDescriptor>::const_iterator p; + for(p = _applications.begin(); p != _applications.end(); ++p) + { + applications.push_back(p->second); + } + + map<string, AdapterInfo>::const_iterator q; + for(q = _adapters.begin(); q != _adapters.end(); ++q) + { + adapters.push_back(q->second); + } + + map<Ice::Identity, ObjectInfo>::const_iterator r; + for(r = _objects.begin(); r != _objects.end(); ++r) + { + objects.push_back(r->second); + } } - observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications); + observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications, adapters, objects); return; } diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h index 300000006eb..385ca475be3 100644 --- a/cpp/src/IceGrid/Topics.h +++ b/cpp/src/IceGrid/Topics.h @@ -23,7 +23,7 @@ class NodeObserverTopic : public NodeObserver, public IceUtil::Mutex { public: - NodeObserverTopic(const IceStorm::TopicPrx&, const NodeObserverPrx&); + NodeObserverTopic(const IceStorm::TopicManagerPrx&); virtual void init(const NodeDynamicInfoSeq&, const Ice::Current&); virtual void nodeUp(const NodeDynamicInfo&, const Ice::Current&); @@ -50,13 +50,22 @@ class RegistryObserverTopic : public RegistryObserver, public IceUtil::Monitor<I { public: - RegistryObserverTopic(const IceStorm::TopicPrx&, const RegistryObserverPrx&, NodeObserverTopic&); - virtual void init(int, const ApplicationDescriptorSeq&, const Ice::Current&); + RegistryObserverTopic(const IceStorm::TopicManagerPrx&); + virtual void init(int, const ApplicationDescriptorSeq&, const AdapterInfoSeq&, const ObjectInfoSeq&, + const Ice::Current&); virtual void applicationAdded(int, const ApplicationDescriptor&, const Ice::Current&); virtual void applicationRemoved(int, const std::string&, const Ice::Current&); virtual void applicationUpdated(int, const ApplicationUpdateDescriptor&, const Ice::Current&); + virtual void adapterAdded(int, const AdapterInfo&, const Ice::Current&); + virtual void adapterUpdated(int, const AdapterInfo&, const Ice::Current&); + virtual void adapterRemoved(int, const std::string&, const Ice::Current&); + + virtual void objectAdded(int, const ObjectInfo&, const Ice::Current&); + virtual void objectUpdated(int, const ObjectInfo&, const Ice::Current&); + virtual void objectRemoved(int, const Ice::Identity&, const Ice::Current&); + void subscribe(const RegistryObserverPrx&, int = -1); void unsubscribe(const RegistryObserverPrx&); @@ -66,10 +75,11 @@ private: const IceStorm::TopicPrx _topic; const RegistryObserverPrx _publisher; - NodeObserverTopic& _nodeObserver; int _serial; - ApplicationDescriptorSeq _applications; + std::map<std::string, ApplicationDescriptor> _applications; + std::map<std::string, AdapterInfo> _adapters; + std::map<Ice::Identity, ObjectInfo> _objects; }; typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr; |