summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2013-01-17 15:48:26 +0100
committerBenoit Foucher <benoit@zeroc.com>2013-01-17 15:48:26 +0100
commitcade94b03f44c09a65542d58746e111a997477c1 (patch)
tree47c62bd60c8fa62587c271d5e16de711bf973243 /cpp
parentAdded OS X Frameworks (diff)
downloadice-cade94b03f44c09a65542d58746e111a997477c1.tar.bz2
ice-cade94b03f44c09a65542d58746e111a997477c1.tar.xz
ice-cade94b03f44c09a65542d58746e111a997477c1.zip
Fixed ICE-4968 - Support for 1.0 observers with IceGrid
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/IceGrid/AdminSessionI.cpp14
-rw-r--r--cpp/src/IceGrid/AdminSessionI.h2
-rw-r--r--cpp/src/IceGrid/Topics.cpp230
-rw-r--r--cpp/src/IceGrid/Topics.h24
-rw-r--r--cpp/test/IceGrid/replication/AllTests.cpp104
-rw-r--r--cpp/test/IceGrid/replication/application.xml6
-rw-r--r--cpp/test/IceGrid/session/AllTests.cpp27
-rwxr-xr-xcpp/test/IceGrid/session/run.py14
8 files changed, 318 insertions, 103 deletions
diff --git a/cpp/src/IceGrid/AdminSessionI.cpp b/cpp/src/IceGrid/AdminSessionI.cpp
index 8cabcba8fee..f201d595148 100644
--- a/cpp/src/IceGrid/AdminSessionI.cpp
+++ b/cpp/src/IceGrid/AdminSessionI.cpp
@@ -185,11 +185,11 @@ AdminSessionI::setObserversByIdentity(const Ice::Identity& registryObserver,
throw ex;
}
- setupObserverSubscription(RegistryObserverTopicName, toProxy(registryObserver, current.con));
- setupObserverSubscription(NodeObserverTopicName, toProxy(nodeObserver, current.con));
- setupObserverSubscription(ApplicationObserverTopicName, toProxy(appObserver, current.con));
- setupObserverSubscription(AdapterObserverTopicName, toProxy(adapterObserver, current.con));
- setupObserverSubscription(ObjectObserverTopicName, toProxy(objectObserver, current.con));
+ setupObserverSubscription(RegistryObserverTopicName, toProxy(registryObserver, current.con, current.encoding));
+ setupObserverSubscription(NodeObserverTopicName, toProxy(nodeObserver, current.con, current.encoding));
+ setupObserverSubscription(ApplicationObserverTopicName, toProxy(appObserver, current.con, current.encoding));
+ setupObserverSubscription(AdapterObserverTopicName, toProxy(adapterObserver, current.con, current.encoding));
+ setupObserverSubscription(ObjectObserverTopicName, toProxy(objectObserver, current.con, current.encoding));
}
@@ -335,9 +335,9 @@ AdminSessionI::setupObserverSubscription(TopicName name, const Ice::ObjectPrx& o
}
Ice::ObjectPrx
-AdminSessionI::toProxy(const Ice::Identity& id, const Ice::ConnectionPtr& connection)
+AdminSessionI::toProxy(const Ice::Identity& id, const Ice::ConnectionPtr& connection, const Ice::EncodingVersion& v)
{
- return id.name.empty() ? Ice::ObjectPrx() : connection->createProxy(id);
+ return id.name.empty() ? Ice::ObjectPrx() : connection->createProxy(id)->ice_encodingVersion(v);
}
FileIteratorPrx
diff --git a/cpp/src/IceGrid/AdminSessionI.h b/cpp/src/IceGrid/AdminSessionI.h
index ff234953ab3..bc63acce993 100644
--- a/cpp/src/IceGrid/AdminSessionI.h
+++ b/cpp/src/IceGrid/AdminSessionI.h
@@ -66,7 +66,7 @@ public:
private:
void setupObserverSubscription(TopicName, const Ice::ObjectPrx&);
- Ice::ObjectPrx toProxy(const Ice::Identity&, const Ice::ConnectionPtr&);
+ Ice::ObjectPrx toProxy(const Ice::Identity&, const Ice::ConnectionPtr&, const Ice::EncodingVersion&);
FileIteratorPrx addFileIterator(const FileReaderPrx&, const std::string&, int, const Ice::Current&);
virtual void destroyImpl(bool);
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index 62c582858ac..d2b976c7452 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -14,28 +14,48 @@
using namespace std;
using namespace IceGrid;
+namespace
+{
+
+//
+// Encodings supported by the observers. We create one topic per
+// encoding version and subscribe the observer to the appropriate
+// topic depending on its encoding.
+//
+Ice::EncodingVersion encodings[] = {
+ { 1, 0 },
+ { 1, 1 }
+};
+
+}
ObserverTopic::ObserverTopic(const IceStorm::TopicManagerPrx& topicManager, const string& name) :
_logger(topicManager->ice_getCommunicator()->getLogger()),
_serial(0)
{
- IceStorm::TopicPrx t;
- try
- {
- t = topicManager->create(name);
- }
- catch(const IceStorm::TopicExists&)
+ for(int i = 0; i < sizeof(encodings) / sizeof(Ice::EncodingVersion); ++i)
{
- t = topicManager->retrieve(name);
- }
+ ostringstream os;
+ os << name << "-" << Ice::encodingVersionToString(encodings[i]);
+ IceStorm::TopicPrx t;
+ try
+ {
+ t = topicManager->create(os.str());
+ }
+ catch(const IceStorm::TopicExists&)
+ {
+ t = topicManager->retrieve(os.str());
+ }
- //
- // NOTE: collocation optimization needs to be turned on for the
- // topic because the subscribe() method is given a fixed proxy
- // which can't be marshalled.
- //
- _topic = IceStorm::TopicPrx::uncheckedCast(t->ice_collocationOptimized(true));
- _basePublisher = _topic->getPublisher()->ice_collocationOptimized(false);
+ //
+ // NOTE: collocation optimization needs to be turned on for the
+ // topic because the subscribe() method is given a fixed proxy
+ // which can't be marshalled.
+ //
+ _topics[encodings[i]] = t->ice_collocationOptimized(true);
+ _basePublishers.push_back(
+ t->getPublisher()->ice_collocationOptimized(false)->ice_encodingVersion(encodings[i]));
+ }
}
ObserverTopic::~ObserverTopic()
@@ -46,7 +66,7 @@ int
ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -56,7 +76,15 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
{
IceStorm::QoS qos;
qos["reliability"] = "ordered";
- initObserver(_topic->subscribeAndGetPublisher(qos, obsv->ice_twoway()));
+ Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(obsv->ice_getEncodingVersion());
+ map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator p = _topics.find(v);
+ if(p == _topics.end())
+ {
+ Ice::Warning out(_logger);
+ out << "unsupported encoding version for observer `" << obsv << "'";
+ return -1;
+ }
+ initObserver(p->second->subscribeAndGetPublisher(qos, obsv->ice_twoway()));
}
catch(const IceStorm::AlreadySubscribed&)
{
@@ -77,10 +105,13 @@ void
ObserverTopic::unsubscribe(const Ice::ObjectPrx& observer, const string& name)
{
Lock sync(*this);
- if(_topic)
+ Ice::EncodingVersion v = IceInternal::getCompatibleEncoding(observer->ice_getEncodingVersion());
+ map<Ice::EncodingVersion, IceStorm::TopicPrx>::const_iterator p = _topics.find(v);
+ if(p == _topics.end())
{
- _topic->unsubscribe(observer);
+ return;
}
+ p->second->unsubscribe(observer);
assert(observer);
@@ -116,7 +147,7 @@ void
ObserverTopic::destroy()
{
Lock sync(*this);
- _topic = 0;
+ _topics.clear();
notifyAll();
}
@@ -186,7 +217,7 @@ ObserverTopic::waitForSyncedSubscribersNoSync(int serial, const string& name)
//
// Wait until all the updates are received or the service shutdown.
//
- while(_topic)
+ while(!_topics.empty())
{
map<int, set<string> >::const_iterator p = _waitForUpdates.find(serial);
if(p == _waitForUpdates.end())
@@ -239,14 +270,14 @@ ObserverTopic::getContext(int serial) const
RegistryObserverTopic::RegistryObserverTopic(const IceStorm::TopicManagerPrx& topicManager) :
ObserverTopic(topicManager, "RegistryObserver")
{
- const_cast<RegistryObserverPrx&>(_publisher) = RegistryObserverPrx::uncheckedCast(_basePublisher);
+ _publishers = getPublishers<RegistryObserverPrx>();
}
void
RegistryObserverTopic::registryUp(const RegistryInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -254,7 +285,10 @@ RegistryObserverTopic::registryUp(const RegistryInfo& info)
_registries.insert(make_pair(info.name, info));
try
{
- _publisher->registryUp(info);
+ for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->registryUp(info);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -267,7 +301,7 @@ void
RegistryObserverTopic::registryDown(const string& name)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -281,7 +315,10 @@ RegistryObserverTopic::registryDown(const string& name)
_registries.erase(name);
try
{
- _publisher->registryDown(name);
+ for(vector<RegistryObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->registryDown(name);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -307,7 +344,7 @@ NodeObserverTopic::NodeObserverTopic(const IceStorm::TopicManagerPrx& topicManag
const Ice::ObjectAdapterPtr& adapter) :
ObserverTopic(topicManager, "NodeObserver")
{
- const_cast<NodeObserverPrx&>(_publisher) = NodeObserverPrx::uncheckedCast(_basePublisher);
+ _publishers = getPublishers<NodeObserverPrx>();
try
{
const_cast<NodeObserverPrx&>(_externalPublisher) = NodeObserverPrx::uncheckedCast(adapter->addWithUUID(this));
@@ -327,7 +364,7 @@ void
NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -335,7 +372,10 @@ NodeObserverTopic::nodeUp(const NodeDynamicInfo& info, const Ice::Current&)
_nodes.insert(make_pair(info.info.name, info));
try
{
- _publisher->nodeUp(info);
+ for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->nodeUp(info);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -354,7 +394,7 @@ void
NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& server, const Ice::Current&)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -394,7 +434,10 @@ NodeObserverTopic::updateServer(const string& node, const ServerDynamicInfo& ser
try
{
- _publisher->updateServer(node, server);
+ for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->updateServer(node, server);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -407,7 +450,7 @@ void
NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& adapter, const Ice::Current&)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -447,7 +490,10 @@ NodeObserverTopic::updateAdapter(const string& node, const AdapterDynamicInfo& a
try
{
- _publisher->updateAdapter(node, adapter);
+ for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->updateAdapter(node, adapter);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -460,7 +506,7 @@ void
NodeObserverTopic::nodeDown(const string& name)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return;
}
@@ -472,7 +518,10 @@ NodeObserverTopic::nodeDown(const string& name)
_nodes.erase(name);
try
{
- _publisher->nodeDown(name);
+ for(vector<NodeObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->nodeDown(name);
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -500,14 +549,14 @@ ApplicationObserverTopic::ApplicationObserverTopic(const IceStorm::TopicManagerP
ObserverTopic(topicManager, "ApplicationObserver"),
_applications(applications)
{
- const_cast<ApplicationObserverPrx&>(_publisher) = ApplicationObserverPrx::uncheckedCast(_basePublisher);
+ _publishers = getPublishers<ApplicationObserverPrx>();
}
int
ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq& apps)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -519,7 +568,10 @@ ApplicationObserverTopic::applicationInit(int serial, const ApplicationInfoSeq&
}
try
{
- _publisher->applicationInit(serial, apps, getContext(serial));
+ for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->applicationInit(serial, apps, getContext(serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -534,7 +586,7 @@ int
ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -543,7 +595,10 @@ ApplicationObserverTopic::applicationAdded(int serial, const ApplicationInfo& in
_applications.insert(make_pair(info.descriptor.name, info));
try
{
- _publisher->applicationAdded(serial, info, getContext(serial));
+ for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->applicationAdded(serial, info, getContext(serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -558,7 +613,7 @@ int
ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -566,7 +621,10 @@ ApplicationObserverTopic::applicationRemoved(int serial, const string& name)
_applications.erase(name);
try
{
- _publisher->applicationRemoved(serial, name, getContext(serial));
+ for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->applicationRemoved(serial, name, getContext(serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -581,7 +639,7 @@ int
ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdateInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -592,7 +650,7 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
map<string, ApplicationInfo>::iterator p = _applications.find(info.descriptor.name);
if(p != _applications.end())
{
- ApplicationHelper helper(_publisher->ice_getCommunicator(), p->second.descriptor);
+ ApplicationHelper helper(_publishers[0]->ice_getCommunicator(), p->second.descriptor);
p->second.descriptor = helper.update(info.descriptor);
p->second.updateTime = info.updateTime;
p->second.updateUser = info.updateUser;
@@ -625,7 +683,10 @@ ApplicationObserverTopic::applicationUpdated(int serial, const ApplicationUpdate
}
try
{
- _publisher->applicationUpdated(serial, info, getContext(serial));
+ for(vector<ApplicationObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->applicationUpdated(serial, info, getContext(serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -653,14 +714,14 @@ AdapterObserverTopic::AdapterObserverTopic(const IceStorm::TopicManagerPrx& topi
ObserverTopic(topicManager, "AdapterObserver"),
_adapters(adapters)
{
- const_cast<AdapterObserverPrx&>(_publisher) = AdapterObserverPrx::uncheckedCast(_basePublisher);
+ _publishers = getPublishers<AdapterObserverPrx>();
}
int
AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -672,7 +733,10 @@ AdapterObserverTopic::adapterInit(const AdapterInfoSeq& adpts)
}
try
{
- _publisher->adapterInit(adpts, getContext(_serial));
+ for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->adapterInit(adpts, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -687,7 +751,7 @@ int
AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -695,7 +759,10 @@ AdapterObserverTopic::adapterAdded(const AdapterInfo& info)
_adapters.insert(make_pair(info.id, info));
try
{
- _publisher->adapterAdded(info, getContext(_serial));
+ for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->adapterAdded(info, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -710,7 +777,7 @@ int
AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -718,7 +785,10 @@ AdapterObserverTopic::adapterUpdated(const AdapterInfo& info)
_adapters[info.id] = info;
try
{
- _publisher->adapterUpdated(info, getContext(_serial));
+ for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->adapterUpdated(info, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -733,7 +803,7 @@ int
AdapterObserverTopic::adapterRemoved(const string& id)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -741,7 +811,10 @@ AdapterObserverTopic::adapterRemoved(const string& id)
_adapters.erase(id);
try
{
- _publisher->adapterRemoved(id, getContext(_serial));
+ for(vector<AdapterObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->adapterRemoved(id, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -769,14 +842,14 @@ ObjectObserverTopic::ObjectObserverTopic(const IceStorm::TopicManagerPrx& topicM
ObserverTopic(topicManager, "ObjectObserver"),
_objects(objects)
{
- const_cast<ObjectObserverPrx&>(_publisher) = ObjectObserverPrx::uncheckedCast(_basePublisher);
+ _publishers = getPublishers<ObjectObserverPrx>();
}
int
ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -788,7 +861,10 @@ ObjectObserverTopic::objectInit(const ObjectInfoSeq& objects)
}
try
{
- _publisher->objectInit(objects, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->objectInit(objects, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -803,7 +879,7 @@ int
ObjectObserverTopic::objectAdded(const ObjectInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -811,7 +887,10 @@ ObjectObserverTopic::objectAdded(const ObjectInfo& info)
_objects.insert(make_pair(info.proxy->ice_getIdentity(), info));
try
{
- _publisher->objectAdded(info, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->objectAdded(info, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -826,7 +905,7 @@ int
ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -834,7 +913,10 @@ ObjectObserverTopic::objectUpdated(const ObjectInfo& info)
_objects[info.proxy->ice_getIdentity()] = info;
try
{
- _publisher->objectUpdated(info, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->objectUpdated(info, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -849,7 +931,7 @@ int
ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -857,7 +939,10 @@ ObjectObserverTopic::objectRemoved(const Ice::Identity& id)
_objects.erase(id);
try
{
- _publisher->objectRemoved(id, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator p = _publishers.begin(); p != _publishers.end(); ++p)
+ {
+ (*p)->objectRemoved(id, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -872,7 +957,7 @@ int
ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -886,7 +971,10 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
q->second = *p;
try
{
- _publisher->objectUpdated(*p, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
+ {
+ (*q)->objectUpdated(*p, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -899,7 +987,10 @@ ObjectObserverTopic::objectsAddedOrUpdated(const ObjectInfoSeq& infos)
_objects.insert(make_pair(p->proxy->ice_getIdentity(), *p));
try
{
- _publisher->objectAdded(*p, getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
+ {
+ (*q)->objectAdded(*p, getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
@@ -922,7 +1013,7 @@ int
ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos)
{
Lock sync(*this);
- if(!_topic)
+ if(_topics.empty())
{
return -1;
}
@@ -933,7 +1024,10 @@ ObjectObserverTopic::objectsRemoved(const ObjectInfoSeq& infos)
_objects.erase(p->proxy->ice_getIdentity());
try
{
- _publisher->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial));
+ for(vector<ObjectObserverPrx>::const_iterator q = _publishers.begin(); q != _publishers.end(); ++q)
+ {
+ (*q)->objectRemoved(p->proxy->ice_getIdentity(), getContext(_serial));
+ }
}
catch(const Ice::LocalException& ex)
{
diff --git a/cpp/src/IceGrid/Topics.h b/cpp/src/IceGrid/Topics.h
index 7399457d35e..1a27af89ae6 100644
--- a/cpp/src/IceGrid/Topics.h
+++ b/cpp/src/IceGrid/Topics.h
@@ -43,9 +43,19 @@ protected:
void updateSerial(int);
Ice::Context getContext(int) const;
+ template<typename T> std::vector<T> getPublishers() const
+ {
+ std::vector<T> publishers;
+ for(std::vector<Ice::ObjectPrx>::const_iterator p = _basePublishers.begin(); p != _basePublishers.end(); ++p)
+ {
+ publishers.push_back(T::uncheckedCast(*p));
+ }
+ return publishers;
+ }
+
Ice::LoggerPtr _logger;
- IceStorm::TopicPrx _topic;
- Ice::ObjectPrx _basePublisher;
+ std::map<Ice::EncodingVersion, IceStorm::TopicPrx> _topics;
+ std::vector<Ice::ObjectPrx> _basePublishers;
int _serial;
std::set<std::string> _syncSubscribers;
@@ -67,7 +77,7 @@ public:
private:
- const RegistryObserverPrx _publisher;
+ std::vector<RegistryObserverPrx> _publishers;
std::map<std::string, RegistryInfo> _registries;
};
typedef IceUtil::Handle<RegistryObserverTopic> RegistryObserverTopicPtr;
@@ -92,7 +102,7 @@ public:
private:
const NodeObserverPrx _externalPublisher;
- const NodeObserverPrx _publisher;
+ std::vector<NodeObserverPrx> _publishers;
std::map<std::string, NodeDynamicInfo> _nodes;
};
typedef IceUtil::Handle<NodeObserverTopic> NodeObserverTopicPtr;
@@ -112,7 +122,7 @@ public:
private:
- const ApplicationObserverPrx _publisher;
+ std::vector<ApplicationObserverPrx> _publishers;
std::map<std::string, ApplicationInfo> _applications;
};
typedef IceUtil::Handle<ApplicationObserverTopic> ApplicationObserverTopicPtr;
@@ -132,7 +142,7 @@ public:
private:
- const AdapterObserverPrx _publisher;
+ std::vector<AdapterObserverPrx> _publishers;
std::map<std::string, AdapterInfo> _adapters;
};
typedef IceUtil::Handle<AdapterObserverTopic> AdapterObserverTopicPtr;
@@ -155,7 +165,7 @@ public:
private:
- const ObjectObserverPrx _publisher;
+ std::vector<ObjectObserverPrx> _publishers;
std::map<Ice::Identity, ObjectInfo> _objects;
};
typedef IceUtil::Handle<ObjectObserverTopic> ObjectObserverTopicPtr;
diff --git a/cpp/test/IceGrid/replication/AllTests.cpp b/cpp/test/IceGrid/replication/AllTests.cpp
index 0d181db7773..17619fab729 100644
--- a/cpp/test/IceGrid/replication/AllTests.cpp
+++ b/cpp/test/IceGrid/replication/AllTests.cpp
@@ -91,7 +91,7 @@ waitForServerState(const IceGrid::AdminPrx& admin, const std::string& server, bo
return;
}
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(500));
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(100));
++nRetry;
}
test(false);
@@ -118,7 +118,7 @@ waitForNodeState(const IceGrid::AdminPrx& admin, const std::string& node, bool u
}
}
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(500));
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(100));
++nRetry;
}
try
@@ -218,7 +218,7 @@ waitAndPing(const Ice::ObjectPrx& obj)
}
catch(const Ice::LocalException&)
{
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(500));
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(100));
++nRetry;
}
}
@@ -336,7 +336,7 @@ allTests(const Ice::CommunicatorPtr& comm)
int nRetry = 0;
while(slave1Admin->getObjectInfo(comm->stringToIdentity("TestIceGrid/Locator")) != info && nRetry < 30)
{
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(500));
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(100));
++nRetry;
}
test(slave2Admin->getObjectInfo(comm->stringToIdentity("TestIceGrid/Locator")) == info);
@@ -352,7 +352,7 @@ allTests(const Ice::CommunicatorPtr& comm)
nRetry = 0;
while(slave1Admin->getObjectInfo(comm->stringToIdentity("TestIceGrid/Query")) != info && nRetry < 30)
{
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(500));
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(100));
++nRetry;
}
test(slave2Admin->getObjectInfo(comm->stringToIdentity("TestIceGrid/Query")) == info);
@@ -371,7 +371,7 @@ allTests(const Ice::CommunicatorPtr& comm)
nRetry = 0;
while(slave1Admin->getObjectInfo(comm->stringToIdentity("TestIceGrid/Locator")) != info && nRetry < 30)
{
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(500));
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(100));
++nRetry;
}
test(slave1Admin->getObjectInfo(comm->stringToIdentity("TestIceGrid/Locator")) == info);
@@ -385,7 +385,7 @@ allTests(const Ice::CommunicatorPtr& comm)
nRetry = 0;
while(slave1Admin->getObjectInfo(comm->stringToIdentity("TestIceGrid/Query")) != info && nRetry < 30)
{
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(500));
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(100));
++nRetry;
}
test(slave1Admin->getObjectInfo(comm->stringToIdentity("TestIceGrid/Query")) == info);
@@ -1236,20 +1236,94 @@ allTests(const Ice::CommunicatorPtr& comm)
masterAdmin->removeApplication("TestApp");
}
cout << "ok" << endl;
+
+ cout << "testing interop with registry and node using the 1.0 encoding... " << flush;
+ {
+ params.clear();
+ params["id"] = "Slave3";
+ params["replicaName"] = "Slave3";
+ params["port"] = "12053";
+ params["encoding"] = "1.0";
+ instantiateServer(admin, "IceGridRegistry", params);
+
+ params.clear();
+ params["id"] = "Node2";
+ params["encoding"] = "1.0";
+ instantiateServer(admin, "IceGridNode", params);
+
+ admin->startServer("Slave3");
+ waitForServerState(admin, "Slave3", true);
+ int nRetry = 0;
+ while(nRetry < 30)
+ {
+ try
+ {
+ test(masterAdmin->pingRegistry("Slave3"));
+ break;
+ }
+ catch(const IceGrid::RegistryNotExistException&)
+ {
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(100));
+ ++nRetry;
+ }
+ }
+
+ admin->startServer("Node2");
+ waitForNodeState(masterAdmin, "Node2", true);
+
+ Ice::LocatorPrx slave3Locator =
+ Ice::LocatorPrx::uncheckedCast(comm->stringToProxy("TestIceGrid/Locator-Slave3 -e 1.0:default -p 12053"));
+ IceGrid::AdminPrx slave3Admin = createAdminSession(slave3Locator, "Slave3");
+ test(slave3Admin->pingNode("Node2"));
+
+ ApplicationDescriptor app;
+ app.name = "TestApp";
+ app.description = "added application";
+
+ ServerDescriptorPtr server = new ServerDescriptor();
+ server->id = "Server";
+ server->exe = comm->getProperties()->getProperty("TestDir") + "/server";
+ server->pwd = ".";
+ server->applicationDistrib = false;
+ server->allocatable = false;
+ addProperty(server, "Ice.Admin.Endpoints", "tcp -h 127.0.0.1");
+ server->activation = "on-demand";
+ AdapterDescriptor adapter;
+ adapter.name = "TestAdapter";
+ adapter.id = "TestAdapter.Server";
+ adapter.serverLifetime = true;
+ adapter.registerProcess = false;
+ PropertyDescriptor property;
+ property.name = "TestAdapter.Endpoints";
+ property.value = "default";
+ server->propertySet.properties.push_back(property);
+ property.name = "Identity";
+ property.value = "test";
+ server->propertySet.properties.push_back(property);
+ ObjectDescriptor object;
+ object.id = comm->stringToIdentity("test");
+ object.type = "::Test::TestIntf";
+ adapter.objects.push_back(object);
+ server->adapters.push_back(adapter);
+ app.nodes["Node2"].servers.push_back(server);
+
+ masterAdmin->addApplication(app);
+
+ comm->stringToProxy("test -e 1.0")->ice_locator(
+ masterLocator->ice_encodingVersion(Ice::Encoding_1_0))->ice_locatorCacheTimeout(0)->ice_ping();
+ comm->stringToProxy("test -e 1.0")->ice_locator(
+ slave1Locator->ice_encodingVersion(Ice::Encoding_1_0))->ice_locatorCacheTimeout(0)->ice_ping();
+ comm->stringToProxy("test -e 1.0")->ice_locator(slave3Locator)->ice_locatorCacheTimeout(0)->ice_ping();
+ masterAdmin->stopServer("Server");
+
+ }
+ cout << "ok" << endl;
- cout << "shutting down Node1... " << flush;
slave1Admin->shutdownNode("Node1");
- cout << "ok" << endl;
-
- cout << "removing Node1 server... " << flush;
removeServer(admin, "Node1");
- cout << "ok" << endl;
- cout << "removing Slave2 server..." << flush;
removeServer(admin, "Slave2");
- cout << "ok" << endl;
-
slave1Admin->shutdown();
removeServer(admin, "Slave1");
diff --git a/cpp/test/IceGrid/replication/application.xml b/cpp/test/IceGrid/replication/application.xml
index 40c5846bf47..74d758e9687 100644
--- a/cpp/test/IceGrid/replication/application.xml
+++ b/cpp/test/IceGrid/replication/application.xml
@@ -4,6 +4,7 @@
<server-template id="IceGridNode">
<parameter name="id"/>
+ <parameter name="encoding" default=""/>
<server id="${id}" exe="${ice.bindir}/icegridnode" activation="manual">
<option>--nowarn</option>
<dbenv name="data"/>
@@ -19,6 +20,8 @@
<property name="Ice.Trace.Network" value="0"/>
<property name="Ice.Warn.Connections" value="0"/>
<property name="Ice.Admin.Endpoints" value=""/>
+
+ <property name="Ice.Default.EncodingVersion" value="${encoding}"/>
</server>
</server-template>
@@ -26,6 +29,7 @@
<parameter name="id"/>
<parameter name="port"/>
<parameter name="replicaName"/>
+ <parameter name="encoding" default=""/>
<server id="${id}" exe="${ice.bindir}/icegridregistry" activation="manual">
<option>--nowarn</option>
<dbenv name="data">
@@ -55,6 +59,8 @@
<property name="Ice.Admin.Endpoints" value=""/>
<property name="Ice.Plugin.DB" value="${db-plugin}"/>
<property name="IceGrid.SQL.DatabaseType" value="QSQLITE"/>
+
+ <property name="Ice.Default.EncodingVersion" value="${encoding}"/>
</server>
</server-template>
diff --git a/cpp/test/IceGrid/session/AllTests.cpp b/cpp/test/IceGrid/session/AllTests.cpp
index c18dea86d86..0fa3b197aac 100644
--- a/cpp/test/IceGrid/session/AllTests.cpp
+++ b/cpp/test/IceGrid/session/AllTests.cpp
@@ -556,6 +556,8 @@ allTests(const Ice::CommunicatorPtr& communicator)
communicator->getLogger(), IceUtil::Time::seconds(5));
keepAlive->start();
+ bool encoding10 = communicator->getProperties()->getProperty("Ice.Default.EncodingVersion") == "1.0";
+
RegistryPrx registry = RegistryPrx::checkedCast(communicator->stringToProxy("IceGrid/Registry"));
test(registry);
@@ -905,7 +907,11 @@ allTests(const Ice::CommunicatorPtr& communicator)
}
catch(const Test::ExtendedPermissionDeniedException& ex)
{
- test(ex.reason == "reason");
+ test(!encoding10 && ex.reason == "reason");
+ }
+ catch(const Glacier2::PermissionDeniedException& ex)
+ {
+ test(encoding10 && ex.reason == "reason");
}
session1->ice_ping();
@@ -978,7 +984,11 @@ allTests(const Ice::CommunicatorPtr& communicator)
}
catch(const Test::ExtendedPermissionDeniedException& ex)
{
- test(ex.reason == "reason");
+ test(!encoding10 && ex.reason == "reason");
+ }
+ catch(const Glacier2::PermissionDeniedException& ex)
+ {
+ test(encoding10 && ex.reason == "reason");
}
admSession1->ice_ping();
@@ -1066,7 +1076,11 @@ allTests(const Ice::CommunicatorPtr& communicator)
}
catch(const Test::ExtendedPermissionDeniedException& ex)
{
- test(ex.reason == "reason");
+ test(!encoding10 && ex.reason == "reason");
+ }
+ catch(const Glacier2::PermissionDeniedException& ex)
+ {
+ test(encoding10 && ex.reason == "reason");
}
try
@@ -1137,7 +1151,11 @@ allTests(const Ice::CommunicatorPtr& communicator)
}
catch(const Test::ExtendedPermissionDeniedException& ex)
{
- test(ex.reason == "reason");
+ test(!encoding10 && ex.reason == "reason");
+ }
+ catch(const Glacier2::PermissionDeniedException& ex)
+ {
+ test(encoding10 && ex.reason == "reason");
}
Ice::ObjectPrx admin1 = admSession1->getAdmin()->ice_router(adminRouter1)->ice_connectionId("admRouter11");
@@ -1966,7 +1984,6 @@ allTests(const Ice::CommunicatorPtr& communicator)
cout << "ok" << endl;
}
-
admin->stopServer("PermissionsVerifierServer");
cout << "shutting down admin router... " << flush;
diff --git a/cpp/test/IceGrid/session/run.py b/cpp/test/IceGrid/session/run.py
index 00c9d18d088..8f45d842295 100755
--- a/cpp/test/IceGrid/session/run.py
+++ b/cpp/test/IceGrid/session/run.py
@@ -34,6 +34,8 @@ if not os.path.exists(node1Dir):
else:
IceGridAdmin.cleanDbDir(node1Dir)
+print("Running test with default encoding...")
+
sys.stdout.write("starting admin permissions verifier... ")
verifierProc = TestUtil.startServer(os.path.join(os.getcwd(), "verifier"), config=TestUtil.DriverConfig("server"))
print("ok")
@@ -52,3 +54,15 @@ IceGridAdmin.iceGridTest("application.xml",
'properties-override=\'%s\'' % IceGridAdmin.iceGridNodePropertiesOverride())
verifierProc.waitTestSuccess()
+
+print("Running test with 1.0 encoding...")
+
+sys.stdout.write("starting admin permissions verifier... ")
+verifierProc = TestUtil.startServer(os.path.join(os.getcwd(), "verifier"), config=TestUtil.DriverConfig("server"))
+print("ok")
+
+IceGridAdmin.iceGridTest("application.xml",
+ '--Ice.Default.EncodingVersion=1.0 --IceBinDir="%s" --TestDir="%s"' % (TestUtil.getCppBinDir(), os.getcwd()),
+ 'properties-override=\'%s\'' % IceGridAdmin.iceGridNodePropertiesOverride())
+
+verifierProc.waitTestSuccess()