summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2006-04-05 12:54:15 +0000
committerBenoit Foucher <benoit@zeroc.com>2006-04-05 12:54:15 +0000
commit872fd9e5dfb743a648c64bf774f6c9a76a45b651 (patch)
treec071c5dc5bd1177c0913e82cb8a3233f0da50ea6 /cpp/src
parentadding timeout test (diff)
downloadice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.bz2
ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.tar.xz
ice-872fd9e5dfb743a648c64bf774f6c9a76a45b651.zip
- Added support for observing adapters and objects.
- Lots of cleanup in the IceGrid registry initilization method.
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/AdminI.cpp20
-rw-r--r--cpp/src/IceGrid/AdminI.h2
-rw-r--r--cpp/src/IceGrid/Database.cpp330
-rw-r--r--cpp/src/IceGrid/IceGridNode.cpp48
-rw-r--r--cpp/src/IceGrid/IceGridRegistry.cpp16
-rw-r--r--cpp/src/IceGrid/LocatorI.cpp4
-rw-r--r--cpp/src/IceGrid/NodeI.cpp2
-rw-r--r--cpp/src/IceGrid/Parser.cpp12
-rw-r--r--cpp/src/IceGrid/RegistryI.cpp293
-rw-r--r--cpp/src/IceGrid/RegistryI.h3
-rw-r--r--cpp/src/IceGrid/Topics.cpp184
-rw-r--r--cpp/src/IceGrid/Topics.h20
12 files changed, 506 insertions, 428 deletions
diff --git a/cpp/src/IceGrid/AdminI.cpp b/cpp/src/IceGrid/AdminI.cpp
index 30319133b76..25f76afaf03 100644
--- a/cpp/src/IceGrid/AdminI.cpp
+++ b/cpp/src/IceGrid/AdminI.cpp
@@ -574,32 +574,32 @@ AdminI::isServerEnabled(const ::std::string& id, const Ice::Current&) const
}
}
-StringObjectProxyDict
-AdminI::getAdapterEndpoints(const string& adapterId, const Current&) const
+AdapterInfoSeq
+AdminI::getAdapterInfo(const string& id, const Current&) const
{
int count;
- vector<pair<string, AdapterPrx> > adapters = _database->getAdapters(adapterId, true, count);
- StringObjectProxyDict adpts;
+ vector<pair<string, AdapterPrx> > adapters = _database->getAdapters(id, true, count);
+ AdapterInfoSeq adpts;
for(vector<pair<string, AdapterPrx> >::const_iterator p = adapters.begin(); p != adapters.end(); ++p)
{
+ AdapterInfo info;
+ info.id = p->first;
+ info.replicaGroupId = p->first != id ? id : "";
if(p->second)
{
try
{
- adpts[p->first] = p->second->getDirectProxy();
+ info.proxy = p->second->getDirectProxy();
}
catch(const Ice::ObjectNotExistException&)
{
+ continue;
}
catch(const Ice::Exception&)
{
- adpts[p->first] = 0;
}
}
- else
- {
- adpts[p->first] = 0;
- }
+ adpts.push_back(info);
}
return adpts;
}
diff --git a/cpp/src/IceGrid/AdminI.h b/cpp/src/IceGrid/AdminI.h
index 8c0b8e0d57f..095476f3cc0 100644
--- a/cpp/src/IceGrid/AdminI.h
+++ b/cpp/src/IceGrid/AdminI.h
@@ -52,7 +52,7 @@ public:
virtual void enableServer(const ::std::string&, bool, const Ice::Current&);
virtual bool isServerEnabled(const ::std::string&, const Ice::Current&) const;
- virtual StringObjectProxyDict getAdapterEndpoints(const ::std::string&, const ::Ice::Current&) const;
+ virtual AdapterInfoSeq getAdapterInfo(const ::std::string&, const ::Ice::Current&) const;
virtual void removeAdapter(const std::string&, const Ice::Current&);
virtual Ice::StringSeq getAllAdapterIds(const ::Ice::Current&) const;
diff --git a/cpp/src/IceGrid/Database.cpp b/cpp/src/IceGrid/Database.cpp
index 8a62f3c0ef0..65f963b25f8 100644
--- a/cpp/src/IceGrid/Database.cpp
+++ b/cpp/src/IceGrid/Database.cpp
@@ -182,6 +182,8 @@ Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeOb
{
int serial;
ApplicationDescriptorSeq applications;
+ AdapterInfoSeq adapters;
+ ObjectInfoSeq objects;
{
Lock sync(*this);
_registryObserver = registryObserver;
@@ -192,12 +194,26 @@ Database::setObservers(const RegistryObserverPrx& registryObserver, const NodeOb
{
applications.push_back(p->second);
}
+
+ for(StringAdapterInfoDict::const_iterator p = _adapters.begin(); p != _adapters.end(); ++p)
+ {
+ adapters.push_back(p->second);
+ if(adapters.back().id.empty())
+ {
+ adapters.back().id = p->first;
+ }
+ }
+
+ for(IdentityObjectInfoDict::const_iterator p = _objects.begin(); p != _objects.end(); ++p)
+ {
+ objects.push_back(p->second);
+ }
}
//
// Notify the observers.
//
- _registryObserver->init(serial, applications);
+ _registryObserver->init(serial, applications, adapters, objects);
}
void
@@ -563,9 +579,9 @@ Database::getAllNodeServers(const string& node)
bool
Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGroupId, const Ice::ObjectPrx& proxy)
{
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- StringAdapterInfoDict adapters(connection, _adapterDbName);
- if(proxy)
+ AdapterInfo info;
+ int serial;
+ bool updated = false;
{
Lock sync(*this);
if(_adapterCache.has(adapterId))
@@ -573,65 +589,64 @@ Database::setAdapterDirectProxy(const string& adapterId, const string& replicaGr
return false;
}
- StringAdapterInfoDict::iterator p = adapters.find(adapterId);
- if(p != adapters.end())
+ StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
+ if(proxy)
{
- AdapterInfo info = p->second;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
- p.set(info);
- if(_traceLevels->adapter > 0)
+ if(p != _adapters.end())
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "updated adapter `" << adapterId << "'";
- if(!replicaGroupId.empty())
- {
- out << " with replica group `" << replicaGroupId << "'";
- }
+ info = p->second;
+ info.proxy = proxy;
+ info.replicaGroupId = replicaGroupId;
+ p.set(info);
+ updated = true;
}
+ else
+ {
+ info.id = adapterId;
+ info.proxy = proxy;
+ info.replicaGroupId = replicaGroupId;
+ _adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
+ }
}
else
{
- AdapterInfo info;
- info.proxy = proxy;
- info.replicaGroupId = replicaGroupId;
- adapters.put(StringAdapterInfoDict::value_type(adapterId, info));
- if(_traceLevels->adapter > 0)
+ if(p == _adapters.end())
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "added adapter `" << adapterId << "'";
- if(!replicaGroupId.empty())
- {
- out << " with replica group `" << replicaGroupId << "'";
- }
+ return true;
}
- }
- return true;
+ _adapters.erase(p);
+ }
+
+ serial = ++_serial;
}
- else
+
+
+ if(_traceLevels->adapter > 0)
{
- Lock sync(*this);
- if(_adapterCache.has(adapterId))
+ Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
+ out << (proxy ? (updated ? "updated" : "added") : "removed") << " adapter `" << adapterId << "'";
+ if(!replicaGroupId.empty())
{
- return false;
- }
+ out << " with replica group `" << replicaGroupId << "'";
+ }
+ }
- StringAdapterInfoDict::iterator p = adapters.find(adapterId);
- if(p == adapters.end())
+ if(proxy)
+ {
+ if(updated)
{
- return true;
+ _registryObserver->adapterUpdated(serial, info);
}
- AdapterInfo info = p->second;
- adapters.erase(p);
-
- if(_traceLevels->adapter > 0)
+ else
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "removed adapter `" << adapterId << "'";
+ _registryObserver->adapterAdded(serial, info);
}
-
- return true;
}
+ else
+ {
+ _registryObserver->adapterRemoved(serial, adapterId);
+ }
+ return true;
}
Ice::ObjectPrx
@@ -650,62 +665,73 @@ Database::getAdapterDirectProxy(const string& adapterId)
void
Database::removeAdapter(const string& adapterId)
{
- try
- {
- AdapterEntryPtr adpt = _adapterCache.get(adapterId);
- DeploymentException ex;
- ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n";
- ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
- throw ex;
- }
- catch(const AdapterNotExistException&)
+ AdapterInfoSeq infos;
+ int serial;
{
- }
+ Lock sync(*this);
+ if(_adapterCache.has(adapterId))
+ {
+ AdapterEntryPtr adpt = _adapterCache.get(adapterId);
+ DeploymentException ex;
+ ex.reason = "removing adapter `" + adapterId + "' is not allowed:\n";
+ ex.reason += "the adapter was added with the application descriptor `" + adpt->getApplication() + "'";
+ throw ex;
+ }
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
+ Freeze::TransactionHolder txHolder(_connection);
- {
- StringAdapterInfoDict adapters(connection, _adapterDbName);
- StringAdapterInfoDict::iterator p = adapters.find(adapterId);
- if(p != adapters.end())
+ StringAdapterInfoDict::iterator p = _adapters.find(adapterId);
+ if(p != _adapters.end())
{
- AdapterInfo info = p->second;
- adapters.erase(p);
-
- if(_traceLevels->adapter > 0)
+ _adapters.erase(p);
+ }
+ else
+ {
+ p = _adapters.findByReplicaGroupId(adapterId, true);
+ if(p == _adapters.end())
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "removed adapter `" << adapterId << "'";
+ throw AdapterNotExistException(adapterId);
+ }
+
+ while(p != _adapters.end())
+ {
+ AdapterInfo info = p->second;
+ info.replicaGroupId = "";
+ infos.push_back(info);
+ _adapters.put(StringAdapterInfoDict::value_type(p->first, info));
+ ++p;
}
- return;
}
- }
- {
- Freeze::TransactionHolder txHolder(connection);
- StringAdapterInfoDict adapters(connection, _adapterDbName);
+ txHolder.commit();
- StringAdapterInfoDict::iterator p = adapters.findByReplicaGroupId(adapterId, true);
- if(p == adapters.end())
+ if(infos.empty())
{
- throw AdapterNotExistException(adapterId);
+ serial = ++_serial;
}
-
- while(p != adapters.end())
+ else
{
- AdapterInfo info = p->second;
- info.replicaGroupId = "";
- adapters.put(StringAdapterInfoDict::value_type(p->first, info));
- ++p;
+ serial = _serial;
+ _serial += infos.size();
}
-
- txHolder.commit();
}
if(_traceLevels->adapter > 0)
{
Ice::Trace out(_traceLevels->logger, _traceLevels->adapterCat);
- out << "removed replica group `" << adapterId << "'";
+ out << "removed " << (infos.empty() ? "adapter" : "replica group") << " `" << adapterId << "'";
+ }
+
+ if(infos.empty())
+ {
+ _registryObserver->adapterRemoved(serial, adapterId);
+ }
+ else
+ {
+ for(AdapterInfoSeq::const_iterator p = infos.begin(); p != infos.end(); ++p)
+ {
+ _registryObserver->adapterUpdated(++serial, *p);
+ }
}
}
@@ -744,7 +770,7 @@ Database::getAdapters(const string& id, bool allRegistered, int& endpointCount)
Ice::Identity identity;
identity.category = "IceGridAdapter";
identity.name = id;
- Ice::ObjectPrx adpt = _internalAdapter->createDirectProxy(identity)->ice_collocationOptimization(false);
+ Ice::ObjectPrx adpt = _internalAdapter->createDirectProxy(identity);
adpts.push_back(make_pair(id, AdapterPrx::uncheckedCast(adpt)));
endpointCount = 1;
return adpts;
@@ -809,21 +835,28 @@ Database::getAllAdapters(const string& expression)
void
Database::addObject(const ObjectInfo& info)
{
- Lock sync(*this);
-
+ int serial;
const Ice::Identity id = info.proxy->ice_getIdentity();
- if(_objectCache.has(id))
{
- throw ObjectExistsException(id);
- }
-
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
- if(objects.find(id) != objects.end())
- {
- throw ObjectExistsException(id);
+ Lock sync(*this);
+ if(_objectCache.has(id))
+ {
+ throw ObjectExistsException(id);
+ }
+
+ if(_objects.find(id) != _objects.end())
+ {
+ throw ObjectExistsException(id);
+ }
+ _objects.put(IdentityObjectInfoDict::value_type(id, info));
+
+ serial = ++_serial;
}
- objects.put(IdentityObjectInfoDict::value_type(id, info));
+
+ //
+ // Notify the observers.
+ //
+ _registryObserver->objectAdded(serial, info);
if(_traceLevels->object > 0)
{
@@ -835,33 +868,40 @@ Database::addObject(const ObjectInfo& info)
void
Database::removeObject(const Ice::Identity& id)
{
- try
- {
- ObjectEntryPtr obj = _objectCache.get(id);
- DeploymentException ex;
- ex.reason = "removing object `" + Ice::identityToString(id) + "' is not allowed:\n";
- ex.reason += "the object was added with the application descriptor `" + obj->getApplication() + "'";
- throw ex;
- }
- catch(const ObjectNotRegisteredException&)
+ int serial;
{
- }
+ Lock sync(*this);
+ if(_objectCache.has(id))
+ {
+ DeploymentException ex;
+ ex.reason = "removing object `" + Ice::identityToString(id) + "' is not allowed:\n";
+ ex.reason += "the object was added with the application descriptor `";
+ ex.reason += _objectCache.get(id)->getApplication();
+ ex.reason += "'";
+ throw ex;
+ }
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
- if(objects.find(id) == objects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
- }
- if(objects.erase(id) > 0)
- {
- if(_traceLevels->object > 0)
+ IdentityObjectInfoDict::iterator p = _objects.find(id);
+ if(p == _objects.end())
{
- Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
- out << "removed object `" << Ice::identityToString(id) << "'";
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
}
+ _objects.erase(p);
+
+ serial = ++_serial;
+ }
+
+ //
+ // Notify the observers.
+ //
+ _registryObserver->objectRemoved(serial, id);
+
+ if(_traceLevels->object > 0)
+ {
+ Ice::Trace out(_traceLevels->logger, _traceLevels->objectCat);
+ out << "removed object `" << Ice::identityToString(id) << "'";
}
}
@@ -869,31 +909,39 @@ void
Database::updateObject(const Ice::ObjectPrx& proxy)
{
const Ice::Identity id = proxy->ice_getIdentity();
- try
- {
- ObjectEntryPtr obj = _objectCache.get(id);
- DeploymentException ex;
- ex.reason = "updating object `" + Ice::identityToString(id) + "' is not allowed:\n";
- ex.reason += "the object was added with the application descriptor `" + obj->getApplication() + "'";
- throw ex;
- }
- catch(const ObjectNotRegisteredException&)
+ int serial;
+ ObjectInfo info;
{
- }
+ Lock sync(*this);
+ if(_objectCache.has(id))
+ {
+ DeploymentException ex;
+ ex.reason = "updating object `" + Ice::identityToString(id) + "' is not allowed:\n";
+ ex.reason += "the object was added with the application descriptor `";
+ ex.reason += _objectCache.get(id)->getApplication();
+ ex.reason += "'";
+ throw ex;
+ }
- Freeze::ConnectionPtr connection = Freeze::createConnection(_communicator, _envName);
- IdentityObjectInfoDict objects(connection, _objectDbName);
- IdentityObjectInfoDict::iterator p = objects.find(id);
- if(p == objects.end())
- {
- ObjectNotRegisteredException ex;
- ex.id = id;
- throw ex;
+ IdentityObjectInfoDict::iterator p = _objects.find(id);
+ if(p == _objects.end())
+ {
+ ObjectNotRegisteredException ex;
+ ex.id = id;
+ throw ex;
+ }
+
+ info = p->second;
+ info.proxy = proxy;
+ p.set(info);
+
+ serial = ++_serial;
}
- ObjectInfo info = p->second;
- info.proxy = proxy;
- p.set(info);
+ //
+ // Notify the observers.
+ //
+ _registryObserver->objectUpdated(serial, info);
if(_traceLevels->object > 0)
{
diff --git a/cpp/src/IceGrid/IceGridNode.cpp b/cpp/src/IceGrid/IceGridNode.cpp
index 9d84cc3899f..72aa45694f0 100644
--- a/cpp/src/IceGrid/IceGridNode.cpp
+++ b/cpp/src/IceGrid/IceGridNode.cpp
@@ -246,17 +246,33 @@ NodeService::start(int argc, char* argv[])
// termination listener instead?
//
properties->setProperty("Ice.ServerIdleTime", "0");
- if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10) <= 10)
+
+ //
+ // Warn the user that setting Ice.ThreadPool.Server isn't useful.
+ //
+ if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 0) > 0)
{
- properties->setProperty("Ice.ThreadPool.Server.Size", "10");
+ Warning out(communicator()->getLogger());
+ out << "setting `Ice.ThreadPool.Server.Size' is not useful,\n";
+ out << "you should set individual adapter thread pools instead.";
}
- if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.SizeWarn", 80) <= 80)
+
+ int size = properties->getPropertyAsIntWithDefault("IceGrid.Node.ThreadPool.Size", 0);
+ if(size <= 0)
{
- properties->setProperty("Ice.ThreadPool.Server.SizeWarn", "80");
+ properties->setProperty("IceGrid.Node.ThreadPool.Size", "1");
}
- if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.SizeMax", 100) <= 100)
+ int sizeMax = properties->getPropertyAsIntWithDefault("IceGrid.Node.ThreadPool.SizeMax", 0);
+ if(sizeMax <= 0)
{
- properties->setProperty("Ice.ThreadPool.Server.SizeMax", "100");
+ if(size >= sizeMax)
+ {
+ sizeMax = size * 10;
+ }
+
+ ostringstream os;
+ os << sizeMax;
+ properties->setProperty("IceGrid.Node.ThreadPool.SizeMax", os.str());
}
@@ -271,22 +287,6 @@ NodeService::start(int argc, char* argv[])
//
if(properties->getPropertyAsInt("IceGrid.Node.CollocateRegistry") > 0)
{
- //
- // The node needs a different thread pool.
- //
- if(properties->getPropertyAsIntWithDefault("IceGrid.Node.ThreadPool.Size", 5) <= 0)
- {
- properties->setProperty("IceGrid.Node.ThreadPool.Size", "5");
- }
- if(properties->getPropertyAsIntWithDefault("IceGrid.Node.ThreadPool.SizeWarn", 80) <= 80)
- {
- properties->setProperty("IceGrid.Node.ThreadPool.SizeWarn", "80");
- }
- if(properties->getPropertyAsIntWithDefault("IceGrid.Node.ThreadPool.SizeMax", 100) <= 100)
- {
- properties->setProperty("IceGrid.Node.ThreadPool.SizeMax", "100");
- }
-
_registry = new CollocatedRegistry(communicator(), _activator);
if(!_registry->start(nowarn))
{
@@ -586,9 +586,11 @@ NodeService::initializeCommunicator(int& argc, char* argv[])
PropertiesPtr defaultProperties = getDefaultProperties(argc, argv);
//
- // Make sure that IceGridNode doesn't use thread-per-connection.
+ // Make sure that IceGridNode doesn't use thread-per-connection or
+ // collocation optimization
//
defaultProperties->setProperty("Ice.ThreadPerConnection", "");
+ defaultProperties->setProperty("Ice.Default.CollocationOptimization", "0");
return Service::initializeCommunicator(argc, argv);
}
diff --git a/cpp/src/IceGrid/IceGridRegistry.cpp b/cpp/src/IceGrid/IceGridRegistry.cpp
index 80ec6a4180b..a44c2faac6b 100644
--- a/cpp/src/IceGrid/IceGridRegistry.cpp
+++ b/cpp/src/IceGrid/IceGridRegistry.cpp
@@ -84,12 +84,16 @@ RegistryService::start(int argc, char* argv[])
return false;
}
- PropertiesPtr properties = communicator()->getProperties();
- if(properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 5) <= 5)
+ //
+ // Warn the user that setting Ice.ThreadPool.Server isn't useful.
+ //
+ if(communicator()->getProperties()->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 0) > 0)
{
- properties->setProperty("Ice.ThreadPool.Server.Size", "5");
+ Warning out(communicator()->getLogger());
+ out << "setting `Ice.ThreadPool.Server.Size' is not useful,\n";
+ out << "you should set individual adapter thread pools instead.";
}
-
+
_registry = new RegistryI(communicator());
if(!_registry->start(nowarn))
{
@@ -112,9 +116,11 @@ RegistryService::initializeCommunicator(int& argc, char* argv[])
PropertiesPtr defaultProperties = getDefaultProperties(argc, argv);
//
- // Make sure that IceGridRegistry doesn't use thread-per-connection.
+ // Make sure that IceGridRegistry doesn't use
+ // thread-per-connection or collocation optimization.
//
defaultProperties->setProperty("Ice.ThreadPerConnection", "");
+ defaultProperties->setProperty("Ice.Default.CollocationOptimization", "0");
return Service::initializeCommunicator(argc, argv);
}
diff --git a/cpp/src/IceGrid/LocatorI.cpp b/cpp/src/IceGrid/LocatorI.cpp
index 0284e0a95e7..84f7c002803 100644
--- a/cpp/src/IceGrid/LocatorI.cpp
+++ b/cpp/src/IceGrid/LocatorI.cpp
@@ -286,7 +286,7 @@ LocatorI::LocatorI(const Ice::CommunicatorPtr& communicator,
const Ice::LocatorRegistryPrx& locatorRegistry) :
_communicator(communicator),
_database(database),
- _locatorRegistry(locatorRegistry)
+ _locatorRegistry(Ice::LocatorRegistryPrx::uncheckedCast(locatorRegistry->ice_collocationOptimization(false)))
{
}
@@ -396,7 +396,7 @@ LocatorI::getDirectProxyException(const AdapterPrx& adapter, const string& id, c
return;
}
}
- catch(const Ice::Exception&)
+ catch(const Ice::Exception& ex)
{
//
// TODO: Add a warning!!!
diff --git a/cpp/src/IceGrid/NodeI.cpp b/cpp/src/IceGrid/NodeI.cpp
index 1c612f32ed1..fe1b4122de4 100644
--- a/cpp/src/IceGrid/NodeI.cpp
+++ b/cpp/src/IceGrid/NodeI.cpp
@@ -550,7 +550,7 @@ NodeI::keepAlive()
{
try
{
- Ice::ObjectPrx obj = getCommunicator()->stringToProxy(_instName + "/Registry@IceGrid.Registry.Internal");
+ Ice::ObjectPrx obj = getCommunicator()->stringToProxy(_instName + "/Registry");
RegistryPrx registry = RegistryPrx::uncheckedCast(obj);
NodeObserverPrx observer;
setSession(registry->registerNode(_name, _proxy, _platform.getNodeInfo(), observer), observer);
diff --git a/cpp/src/IceGrid/Parser.cpp b/cpp/src/IceGrid/Parser.cpp
index 0224e3659a5..faeaa4fb9da 100644
--- a/cpp/src/IceGrid/Parser.cpp
+++ b/cpp/src/IceGrid/Parser.cpp
@@ -939,18 +939,18 @@ Parser::endpointsAdapter(const list<string>& args)
try
{
string adapterId = args.front();
- StringObjectProxyDict proxies = _admin->getAdapterEndpoints(adapterId);
- if(proxies.size() == 1 && proxies.begin()->first == adapterId)
+ AdapterInfoSeq adpts = _admin->getAdapterInfo(adapterId);
+ if(adpts.size() == 1 && adpts.begin()->id == adapterId)
{
- string endpoints = _communicator->proxyToString(proxies.begin()->second);
+ string endpoints = _communicator->proxyToString(adpts.begin()->proxy);
cout << (endpoints.empty() ? "<inactive>" : endpoints) << endl;
}
else
{
- for(StringObjectProxyDict::const_iterator p = proxies.begin(); p != proxies.end(); ++p)
+ for(AdapterInfoSeq::const_iterator p = adpts.begin(); p != adpts.end(); ++p)
{
- cout << (p->first.empty() ? "<empty>" : p->first) << ": ";
- string endpoints = _communicator->proxyToString(p->second);
+ cout << (p->id.empty() ? "<empty>" : p->id) << ": ";
+ string endpoints = _communicator->proxyToString(p->proxy);
cout << (endpoints.empty() ? "<inactive>" : endpoints) << endl;
}
}
diff --git a/cpp/src/IceGrid/RegistryI.cpp b/cpp/src/IceGrid/RegistryI.cpp
index 57b0efd85ca..d30c09e1af0 100644
--- a/cpp/src/IceGrid/RegistryI.cpp
+++ b/cpp/src/IceGrid/RegistryI.cpp
@@ -39,25 +39,7 @@ using namespace std;
using namespace Ice;
using namespace IceGrid;
-namespace IceStorm
-{
-};
-
-namespace IceGrid
-{
-
-string
-intToString(int v)
-{
- ostringstream os;
- os << v;
- return os.str();
-}
-
-};
-
-RegistryI::RegistryI(const CommunicatorPtr& communicator) :
- _communicator(communicator)
+RegistryI::RegistryI(const CommunicatorPtr& communicator) : _communicator(communicator)
{
}
@@ -132,87 +114,12 @@ RegistryI::start(bool nowarn)
properties->setProperty("IceGrid.Registry.Client.AdapterId", "");
properties->setProperty("IceGrid.Registry.Server.AdapterId", "");
properties->setProperty("IceGrid.Registry.Admin.AdapterId", "");
- properties->setProperty("IceGrid.Registry.Internal.AdapterId", "IceGrid.Registry.Internal");
+ properties->setProperty("IceGrid.Registry.Internal.AdapterId", "");
- //
- // Setup thread pool size for each thread pool.
- //
- int nThreadPool = 0;
- const string clientThreadPool("IceGrid.Registry.Client.ThreadPool");
- if(properties->getPropertyAsInt(clientThreadPool + ".Size") == 0 &&
- properties->getPropertyAsInt(clientThreadPool + ".SizeMax") == 0)
- {
- ++nThreadPool;
- }
- const string serverThreadPool("IceGrid.Registry.Server.ThreadPool");
- if(properties->getPropertyAsInt(serverThreadPool + ".Size") == 0 &&
- properties->getPropertyAsInt(serverThreadPool + ".SizeMax") == 0)
- {
- ++nThreadPool;
- }
- const string adminThreadPool("IceGrid.Registry.Admin.ThreadPool");
- if(properties->getPropertyAsInt(adminThreadPool + ".Size") == 0 &&
- properties->getPropertyAsInt(adminThreadPool + ".SizeMax") == 0)
- {
- ++nThreadPool;
- }
-
- int size = properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.Size", 10);
- if(size < nThreadPool)
- {
- size = nThreadPool;
- }
- int sizeMax = properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.SizeMax", size * 10);
- if(sizeMax < size)
- {
- sizeMax = size;
- }
- int sizeWarn = properties->getPropertyAsIntWithDefault("Ice.ThreadPool.Server.SizeWarn", sizeMax * 80 / 100);
-
- if(properties->getPropertyAsInt(clientThreadPool + ".Size") == 0 &&
- properties->getPropertyAsInt(clientThreadPool + ".SizeMax") == 0)
- {
- properties->setProperty(clientThreadPool + ".Size", IceGrid::intToString(size / nThreadPool));
- properties->setProperty(clientThreadPool + ".SizeMax", IceGrid::intToString(sizeMax / nThreadPool));
- properties->setProperty(clientThreadPool + ".SizeWarn", IceGrid::intToString(sizeWarn / nThreadPool));
- }
- if(properties->getPropertyAsInt(serverThreadPool + ".Size") == 0 &&
- properties->getPropertyAsInt(serverThreadPool + ".SizeMax") == 0)
- {
- properties->setProperty(serverThreadPool + ".Size", IceGrid::intToString(size / nThreadPool));
- properties->setProperty(serverThreadPool + ".SizeMax", IceGrid::intToString(sizeMax / nThreadPool));
- properties->setProperty(serverThreadPool + ".SizeWarn", IceGrid::intToString(sizeWarn / nThreadPool));
- }
- if(properties->getPropertyAsInt(adminThreadPool + ".Size") == 0 &&
- properties->getPropertyAsInt(adminThreadPool + ".SizeMax") == 0)
- {
- properties->setProperty(adminThreadPool + ".Size", IceGrid::intToString(size / nThreadPool));
- properties->setProperty(adminThreadPool + ".SizeMax", IceGrid::intToString(sizeMax / nThreadPool));
- properties->setProperty(adminThreadPool + ".SizeWarn", IceGrid::intToString(sizeWarn / nThreadPool));
- }
-
- int clientSize = properties->getPropertyAsInt(clientThreadPool + ".Size") * 2;
- int serverSize = properties->getPropertyAsInt(serverThreadPool + ".Size") * 2;
- const string internalThreadPool("IceGrid.Registry.Internal.ThreadPool");
- properties->setProperty(internalThreadPool + ".Size", IceGrid::intToString(clientSize + serverSize));
-
- int clientSizeMax = properties->getPropertyAsInt(clientThreadPool + ".SizeMax") * 2;
- if(clientSizeMax < clientSize)
- {
- clientSizeMax = clientSize;
- }
- int serverSizeMax = properties->getPropertyAsInt(serverThreadPool + ".SizeMax") * 2;
- if(serverSizeMax < serverSize)
- {
- serverSizeMax = serverSize;
- }
- properties->setProperty(internalThreadPool + ".SizeMax", IceGrid::intToString(clientSizeMax + serverSizeMax));
-
- int clientSizeWarn =
- properties->getPropertyAsIntWithDefault(clientThreadPool + ".SizeWarn", clientSizeMax * 80 / 100) * 2;
- int serverSizeWarn =
- properties->getPropertyAsIntWithDefault(serverThreadPool + ".SizeWarn", serverSizeMax * 80 / 100) * 2;
- properties->setProperty(internalThreadPool + ".SizeWarn", IceGrid::intToString(clientSizeWarn + serverSizeWarn));
+ setupThreadPool(properties, "IceGrid.Registry.Client.ThreadPool", 1, 10);
+ setupThreadPool(properties, "IceGrid.Registry.Server.ThreadPool", 1, 10);
+ setupThreadPool(properties, "IceGrid.Registry.Admin.ThreadPool", 1, 10);
+ setupThreadPool(properties, "IceGrid.Registry.Internal.ThreadPool", 1, 100);
TraceLevelsPtr traceLevels = new TraceLevels(properties, _communicator->getLogger(), false);
@@ -259,138 +166,69 @@ RegistryI::start(bool nowarn)
bool dynamicReg = properties->getPropertyAsInt("IceGrid.Registry.DynamicRegistration") > 0;
ObjectPtr locatorRegistry = new LocatorRegistryI(_database, dynamicReg);
ObjectPrx obj = serverAdapter->add(locatorRegistry, stringToIdentity(instanceName + "/"+ IceUtil::generateUUID()));
- LocatorRegistryPrx locatorRegistryPrx = LocatorRegistryPrx::uncheckedCast(obj->ice_collocationOptimization(false));
+ LocatorRegistryPrx locatorRegistryPrx = LocatorRegistryPrx::uncheckedCast(obj);
ObjectPtr locator = new LocatorI(_communicator, _database, locatorRegistryPrx);
Identity locatorId = stringToIdentity(instanceName + "/Locator");
clientAdapter->add(locator, locatorId);
- //
- // Create the query interface and register it with the object registry.
- //
- QueryPtr query = new QueryI(_communicator, _database);
- Identity queryId = stringToIdentity(instanceName + "/Query");
- clientAdapter->add(query, queryId);
- ObjectPrx queryPrx = clientAdapter->createDirectProxy(queryId);
- try
- {
- _database->removeObject(queryPrx->ice_getIdentity());
- }
- catch(const IceGrid::ObjectNotRegisteredException&)
- {
- }
- ObjectInfo info;
- info.proxy = queryPrx;
- info.type = "::IceGrid::Query";
- _database->addObject(info);
-
- //
- // Create the admin interface and register it with the object registry.
- //
- ObjectPtr admin = new AdminI(_database, this, traceLevels);
- Identity adminId = stringToIdentity(instanceName + "/Admin");
- adminAdapter->add(admin, adminId);
- ObjectPrx adminPrx = adminAdapter->createDirectProxy(adminId);
- try
- {
- _database->removeObject(adminPrx->ice_getIdentity());
- }
- catch(const IceGrid::ObjectNotRegisteredException&)
- {
- }
- info.proxy = adminPrx;
- info.type = "::IceGrid::Admin";
- _database->addObject(info);
- //
- // Set the IceGrid.Registry.Internal adapter direct proxy directly in the database.
- //
- registryAdapter->add(this, stringToIdentity(instanceName + "/Registry"));
+ Ice::Identity registryId = stringToIdentity(instanceName + "/Registry");
+ registryAdapter->add(this, registryId);
registryAdapter->activate();
- Ice::Identity dummy = Ice::stringToIdentity("dummy");
- _database->setAdapterDirectProxy("IceGrid.Registry.Internal", "", registryAdapter->createDirectProxy(dummy));
-
+
//
// Setup a internal locator to be used by the IceGrid registry itself. This locator is
- // registered with the registry object adapter which is using an independant threadpool.
+ // registered with the registry object adapter which is using its own threadpool.
//
- locator = new LocatorI(_communicator, _database, locatorRegistryPrx);
- registryAdapter->add(locator, locatorId);
- obj = registryAdapter->createDirectProxy(locatorId);
- _communicator->setDefaultLocator(LocatorPrx::uncheckedCast(obj->ice_collocationOptimization(false)));
+// obj = registryAdapter->addWithUUID(new LocatorI(_communicator, _database, locatorRegistryPrx));
+// _communicator->setDefaultLocator(LocatorPrx::uncheckedCast(obj->ice_collocationOptimization(false)));
//
- // Create the internal IceStorm service.
+ // Create the internal IceStorm service and the registry and node topics.
//
- Identity topicMgrId = stringToIdentity(instanceName + "/TopicManager");
- _iceStorm = IceStorm::Service::create(_communicator, registryAdapter, registryAdapter, "IceGrid.Registry",
- topicMgrId, "Registry");
+ _iceStorm = IceStorm::Service::create(_communicator,
+ registryAdapter,
+ registryAdapter,
+ "IceGrid.Registry",
+ stringToIdentity(instanceName + "/TopicManager"),
+ "Registry");
- //
- // Create the registry and node observer topic.
- //
- IceStorm::TopicPrx nodeObserverTopic;
- IceStorm::TopicPrx registryObserverTopic;
- try
- {
- registryObserverTopic = _iceStorm->getTopicManager()->create("RegistryObserver");
- }
- catch(const IceStorm::TopicExists&)
- {
- registryObserverTopic = _iceStorm->getTopicManager()->retrieve("RegistryObserver");
- }
- try
- {
- nodeObserverTopic = _iceStorm->getTopicManager()->create("NodeObserver");
- }
- catch(const IceStorm::TopicExists&)
- {
- nodeObserverTopic = _iceStorm->getTopicManager()->retrieve("NodeObserver");
- }
+ NodeObserverTopic* nodeTopic = new NodeObserverTopic(_iceStorm->getTopicManager());
+ _nodeObserver = NodeObserverPrx::uncheckedCast(registryAdapter->addWithUUID(nodeTopic));
+
+ RegistryObserverTopic* regTopic = new RegistryObserverTopic(_iceStorm->getTopicManager());
+ _registryObserver = RegistryObserverPrx::uncheckedCast(registryAdapter->addWithUUID(regTopic));
- obj = nodeObserverTopic->getPublisher()->ice_collocationOptimization(false);
- obj = obj->ice_locator(_communicator->getDefaultLocator()); // TODO: Why is this necessary?
- NodeObserverTopic* nodeTopic = new NodeObserverTopic(nodeObserverTopic, NodeObserverPrx::uncheckedCast(obj));
- obj = registryAdapter->addWithUUID(nodeTopic)->ice_collocationOptimization(false);
- obj = obj->ice_locator(_communicator->getDefaultLocator());
- _nodeObserver = NodeObserverPrx::uncheckedCast(obj);
-
- obj = registryObserverTopic->getPublisher()->ice_collocationOptimization(false);
- obj = obj->ice_locator(_communicator->getDefaultLocator()); // TODO: Why is this necessary?
- RegistryObserverTopic* regTopic;
- regTopic = new RegistryObserverTopic(registryObserverTopic, RegistryObserverPrx::uncheckedCast(obj), *nodeTopic);
- obj = registryAdapter->addWithUUID(regTopic)->ice_collocationOptimization(false);
- obj = obj->ice_locator(_communicator->getDefaultLocator());
- _registryObserver = RegistryObserverPrx::uncheckedCast(obj);
_database->setObservers(_registryObserver, _nodeObserver);
//
- // Create the session manager.
+ // Create the query, admin, session manager interfaces;
//
+ Identity queryId = stringToIdentity(instanceName + "/Query");
+ clientAdapter->add(new QueryI(_communicator, _database), queryId);
+
+ Identity adminId = stringToIdentity(instanceName + "/Admin");
+ adminAdapter->add(new AdminI(_database, this, traceLevels), adminId);
+
+ Identity sessionManagerId = stringToIdentity(instanceName + "/SessionManager");
ReapThreadPtr reaper = _adminReaper ? _adminReaper : _reaper;
ObjectPtr sessionManager = new SessionManagerI(*regTopic, *nodeTopic, _database, reaper, adminSessionTimeout);
- Identity sessionManagerId = stringToIdentity(instanceName + "/SessionManager");
adminAdapter->add(sessionManager, sessionManagerId);
- ObjectPrx sessionManagerPrx = adminAdapter->createDirectProxy(sessionManagerId);
- try
- {
- _database->removeObject(sessionManagerPrx->ice_getIdentity());
- }
- catch(const IceGrid::ObjectNotRegisteredException&)
- {
- }
- info.proxy = sessionManagerPrx;
- info.type = "::IceGrid::SessionManager";
- _database->addObject(info);
+
+ //
+ // Register well known objects with the object registry.
+ //
+ addWellKnownObject(registryAdapter->createProxy(registryId), Registry::ice_staticId());
+ addWellKnownObject(clientAdapter->createProxy(queryId), Query::ice_staticId());
+ addWellKnownObject(adminAdapter->createProxy(adminId), Admin::ice_staticId());
+ addWellKnownObject(adminAdapter->createProxy(sessionManagerId), SessionManager::ice_staticId());
//
// We are ready to go!
//
serverAdapter->activate();
clientAdapter->activate();
- if(adminAdapter)
- {
- adminAdapter->activate();
- }
+ adminAdapter->activate();
return true;
}
@@ -420,8 +258,7 @@ NodeSessionPrx
RegistryI::registerNode(const std::string& name, const NodePrx& node, const NodeInfo& info, NodeObserverPrx& obs,
const Ice::Current& c)
{
- NodePrx n =
- NodePrx::uncheckedCast(node->ice_timeout(_nodeSessionTimeout * 1000)->ice_collocationOptimization(false));
+ NodePrx n = NodePrx::uncheckedCast(node->ice_timeout(_nodeSessionTimeout * 1000));
NodeSessionIPtr session = new NodeSessionI(_database, name, n, info);
NodeSessionPrx proxy = NodeSessionPrx::uncheckedCast(c.adapter->addWithUUID(session));
_reaper->add(proxy, session);
@@ -446,3 +283,47 @@ RegistryI::getTopicManager()
{
return _iceStorm->getTopicManager();
}
+
+void
+RegistryI::addWellKnownObject(const Ice::ObjectPrx& proxy, const string& type)
+{
+ assert(_database);
+ try
+ {
+ _database->removeObject(proxy->ice_getIdentity());
+ }
+ catch(const IceGrid::ObjectNotRegisteredException&)
+ {
+ }
+ ObjectInfo info;
+ info.proxy = proxy;
+ info.type = type;
+ _database->addObject(info);
+}
+
+void
+RegistryI::setupThreadPool(const Ice::PropertiesPtr& properties, const string& name, int size, int sizeMax)
+{
+ if(properties->getPropertyAsIntWithDefault(name + ".Size", 0) < size)
+ {
+ ostringstream os;
+ os << size;
+ properties->setProperty(name + ".Size", os.str());
+ }
+ else
+ {
+ size = properties->getPropertyAsInt(name + ".Size");
+ }
+
+ if(sizeMax > 0 && properties->getPropertyAsIntWithDefault(name + ".SizeMax", 0) < sizeMax)
+ {
+ if(size >= sizeMax)
+ {
+ sizeMax = size * 10;
+ }
+
+ ostringstream os;
+ os << sizeMax;
+ properties->setProperty(name + ".SizeMax", os.str());
+ }
+}
diff --git a/cpp/src/IceGrid/RegistryI.h b/cpp/src/IceGrid/RegistryI.h
index 528a97c1982..38efe4b1a20 100644
--- a/cpp/src/IceGrid/RegistryI.h
+++ b/cpp/src/IceGrid/RegistryI.h
@@ -41,6 +41,9 @@ public:
private:
+ void addWellKnownObject(const Ice::ObjectPrx&, const std::string&);
+ void setupThreadPool(const Ice::PropertiesPtr&, const std::string&, int, int = 0);
+
Ice::CommunicatorPtr _communicator;
DatabasePtr _database;
ReapThreadPtr _reaper;
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index f85e074122a..3fdff40ea35 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -77,9 +77,25 @@ private:
};
-NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicPrx& topic, const NodeObserverPrx& publisher) :
- _topic(topic), _publisher(publisher), _serial(0)
+NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : _serial(0)
{
+ IceStorm::TopicPrx t;
+ try
+ {
+ t = topicManager->create("NodeObserver");
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ t = topicManager->retrieve("NodeObserver");
+ }
+
+ //
+ // NOTE: collocation optimization needs to be turned on for the
+ // topic because the subscribe() method is given a fixed proxy
+ // which can't be marshalled.
+ //
+ const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimization(true));
+ const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_topic->getPublisher());
}
void
@@ -229,22 +245,52 @@ NodeObserverTopic::unsubscribe(const NodeObserverPrx& observer)
_topic->unsubscribe(observer);
}
-RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicPrx& topic,
- const RegistryObserverPrx& publisher,
- NodeObserverTopic& nodeObserver) :
- _topic(topic), _publisher(publisher), _nodeObserver(nodeObserver), _serial(0)
+RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) : _serial(0)
{
+ IceStorm::TopicPrx t;
+ try
+ {
+ t = topicManager->create("RegistryObserver");
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ t = topicManager->retrieve("RegistryObserver");
+ }
+
+ //
+ // NOTE: collocation optimization needs to be turned on for the
+ // topic because the subscribe() method is given a fixed proxy
+ // which can't be marshalled.
+ //
+ const_cast<IceStorm::TopicPrx&>(_topic) = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimization(true));
+ const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_topic->getPublisher());
}
void
-RegistryObserverTopic::init(int serial, const ApplicationDescriptorSeq& apps, const Ice::Current&)
+RegistryObserverTopic::init(int serial,
+ const ApplicationDescriptorSeq& apps,
+ const AdapterInfoSeq& adpts,
+ const ObjectInfoSeq& objects,
+ const Ice::Current&)
{
Lock sync(*this);
_serial = serial;
- _applications = apps;
- _publisher->init(serial, apps);
+ for(ApplicationDescriptorSeq::const_iterator p = apps.begin(); p != apps.end(); ++p)
+ {
+ _applications.insert(make_pair(p->name, *p));
+ }
+ for(AdapterInfoSeq::const_iterator q = adpts.begin(); q != adpts.end(); ++q)
+ {
+ _adapters.insert(make_pair(q->id, *q));
+ }
+ for(ObjectInfoSeq::const_iterator r = objects.begin(); r != objects.end(); ++r)
+ {
+ _objects.insert(make_pair(r->proxy->ice_getIdentity(), *r));
+ }
+
+ _publisher->init(serial, apps, adpts, objects);
}
void
@@ -254,7 +300,7 @@ RegistryObserverTopic::applicationAdded(int serial, const ApplicationDescriptor&
updateSerial(serial);
- _applications.push_back(desc);
+ _applications.insert(make_pair(desc.name, desc));
_publisher->applicationAdded(serial, desc);
}
@@ -265,14 +311,8 @@ RegistryObserverTopic::applicationRemoved(int serial, const string& name, const
Lock sync(*this);
updateSerial(serial);
- for(ApplicationDescriptorSeq::iterator p = _applications.begin(); p != _applications.end(); ++p)
- {
- if(p->name == name)
- {
- _applications.erase(p);
- break;
- }
- }
+
+ _applications.erase(name);
_publisher->applicationRemoved(serial, name);
}
@@ -285,15 +325,12 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes
updateSerial(serial);
try
{
- for(ApplicationDescriptorSeq::iterator p = _applications.begin(); p != _applications.end(); ++p)
+ map<string, ApplicationDescriptor>::iterator p = _applications.find(desc.name);
+ if(p != _applications.end())
{
- if(p->name == desc.name)
- {
- ApplicationHelper helper(*p);
- helper.update(desc);
- *p = helper.getDescriptor();
- break;
- }
+ ApplicationHelper helper(p->second);
+ helper.update(desc);
+ p->second = helper.getDescriptor();
}
}
catch(const DeploymentException& ex)
@@ -320,6 +357,78 @@ RegistryObserverTopic::applicationUpdated(int serial, const ApplicationUpdateDes
}
void
+RegistryObserverTopic::adapterAdded(int serial, const AdapterInfo& info, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _adapters.insert(make_pair(info.id, info));
+
+ _publisher->adapterAdded(serial, info);
+}
+
+void
+RegistryObserverTopic::adapterUpdated(int serial, const AdapterInfo& info, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _adapters[info.id] = info;
+
+ _publisher->adapterUpdated(serial, info);
+}
+
+void
+RegistryObserverTopic::adapterRemoved(int serial, const string& id, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _adapters.erase(id);
+
+ _publisher->adapterRemoved(serial, id);
+}
+
+void
+RegistryObserverTopic::objectAdded(int serial, const ObjectInfo& info, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
+
+ _publisher->objectAdded(serial, info);
+}
+
+void
+RegistryObserverTopic::objectUpdated(int serial, const ObjectInfo& info, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _objects[info.proxy->ice_getIdentity()] = info;
+
+ _publisher->objectUpdated(serial, info);
+}
+
+void
+RegistryObserverTopic::objectRemoved(int serial, const Ice::Identity& id, const Ice::Current&)
+{
+ Lock sync(*this);
+
+ updateSerial(serial);
+
+ _objects.erase(id);
+
+ _publisher->objectRemoved(serial, id);
+}
+
+void
RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial)
{
while(true)
@@ -327,13 +436,32 @@ RegistryObserverTopic::subscribe(const RegistryObserverPrx& observer, int serial
if(serial == -1)
{
ApplicationDescriptorSeq applications;
+ AdapterInfoSeq adapters;
+ ObjectInfoSeq objects;
{
Lock sync(*this);
assert(_serial != -1);
serial = _serial;
- applications = _applications;
+
+ map<string, ApplicationDescriptor>::const_iterator p;
+ for(p = _applications.begin(); p != _applications.end(); ++p)
+ {
+ applications.push_back(p->second);
+ }
+
+ map<string, AdapterInfo>::const_iterator q;
+ for(q = _adapters.begin(); q != _adapters.end(); ++q)
+ {
+ adapters.push_back(q->second);
+ }
+
+ map<Ice::Identity, ObjectInfo>::const_iterator r;
+ for(r = _objects.begin(); r != _objects.end(); ++r)
+ {
+ objects.push_back(r->second);
+ }
}
- observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications);
+ observer->init_async(new RegistryInitCB(this, observer, serial), serial, applications, adapters, objects);
return;
}
diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h
index 300000006eb..385ca475be3 100644
--- a/cpp/src/IceGrid/Topics.h
+++ b/cpp/src/IceGrid/Topics.h
@@ -23,7 +23,7 @@ class NodeObserverTopic : public NodeObserver, public IceUtil::Mutex
{
public:
- NodeObserverTopic(const IceStorm::TopicPrx&, const NodeObserverPrx&);
+ NodeObserverTopic(const IceStorm::TopicManagerPrx&);
virtual void init(const NodeDynamicInfoSeq&, const Ice::Current&);
virtual void nodeUp(const NodeDynamicInfo&, const Ice::Current&);
@@ -50,13 +50,22 @@ class RegistryObserverTopic : public RegistryObserver, public IceUtil::Monitor<I
{
public:
- RegistryObserverTopic(const IceStorm::TopicPrx&, const RegistryObserverPrx&, NodeObserverTopic&);
- virtual void init(int, const ApplicationDescriptorSeq&, const Ice::Current&);
+ RegistryObserverTopic(const IceStorm::TopicManagerPrx&);
+ virtual void init(int, const ApplicationDescriptorSeq&, const AdapterInfoSeq&, const ObjectInfoSeq&,
+ const Ice::Current&);
virtual void applicationAdded(int, const ApplicationDescriptor&, const Ice::Current&);
virtual void applicationRemoved(int, const std::string&, const Ice::Current&);
virtual void applicationUpdated(int, const ApplicationUpdateDescriptor&, const Ice::Current&);
+ virtual void adapterAdded(int, const AdapterInfo&, const Ice::Current&);
+ virtual void adapterUpdated(int, const AdapterInfo&, const Ice::Current&);
+ virtual void adapterRemoved(int, const std::string&, const Ice::Current&);
+
+ virtual void objectAdded(int, const ObjectInfo&, const Ice::Current&);
+ virtual void objectUpdated(int, const ObjectInfo&, const Ice::Current&);
+ virtual void objectRemoved(int, const Ice::Identity&, const Ice::Current&);
+
void subscribe(const RegistryObserverPrx&, int = -1);
void unsubscribe(const RegistryObserverPrx&);
@@ -66,10 +75,11 @@ private:
const IceStorm::TopicPrx _topic;
const RegistryObserverPrx _publisher;
- NodeObserverTopic& _nodeObserver;
int _serial;
- ApplicationDescriptorSeq _applications;
+ std::map<std::string, ApplicationDescriptor> _applications;
+ std::map<std::string, AdapterInfo> _adapters;
+ std::map<Ice::Identity, ObjectInfo> _objects;
};
typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr;