summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/CHANGES9
-rw-r--r--cpp/demo/IceStorm/clock/Publisher.cpp42
-rw-r--r--cpp/demo/IceStorm/clock/Subscriber.cpp71
-rw-r--r--cpp/demo/IceStorm/clock/config.service2
-rw-r--r--cpp/demo/IceStorm/clock/config.sub2
-rw-r--r--cpp/demo/IceStorm/counter/CounterI.cpp6
-rw-r--r--cpp/slice/IceStorm/IceStorm.ice24
-rw-r--r--cpp/src/IceGrid/ReplicaCache.cpp4
-rw-r--r--cpp/src/IceGrid/Topics.cpp4
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp67
-rw-r--r--cpp/src/IceStorm/TopicI.cpp50
-rw-r--r--cpp/test/IceStorm/federation/.depend6
-rw-r--r--cpp/test/IceStorm/federation/Subscriber.cpp24
-rw-r--r--cpp/test/IceStorm/federation2/Subscriber.cpp12
-rw-r--r--cpp/test/IceStorm/single/Subscriber.cpp58
-rw-r--r--cpp/test/IceStorm/stress/Subscriber.cpp52
-rwxr-xr-xcpp/test/IceStorm/stress/run.py4
17 files changed, 304 insertions, 133 deletions
diff --git a/cpp/CHANGES b/cpp/CHANGES
index 2f5ec41aaf3..bf083e5abc1 100644
--- a/cpp/CHANGES
+++ b/cpp/CHANGES
@@ -249,6 +249,15 @@ Changes since version 3.1.1
is used to send an event to only that particular subscriber. See
demo/IceStorm/counter for a demo of this feature.
+ The possible values of the reliability QoS passed to
+ subscribeAndGetPublisher are now different. The only allowable value
+ is either "ordered" or the empty string. The delivery reliability
+ quality of service used by IceStorm is now derived from the the
+ proxy mode. For example, if oneway delivery is required then a
+ oneway proxy must be passed. The "ordered" QoS requires a twoway
+ proxy otherwise a BadQoS exception is raised. Topic::subscribe has
+ been deprecated and retains the old subscription QoS semantics.
+
The property IceStorm.InstanceName is now used to produce unique identities
for each IceStorm topic. The identities used are now:
diff --git a/cpp/demo/IceStorm/clock/Publisher.cpp b/cpp/demo/IceStorm/clock/Publisher.cpp
index ec61a31d7da..51ef4e54c2e 100644
--- a/cpp/demo/IceStorm/clock/Publisher.cpp
+++ b/cpp/demo/IceStorm/clock/Publisher.cpp
@@ -9,6 +9,7 @@
#include <Ice/Application.h>
#include <IceStorm/IceStorm.h>
+#include <IceUtil/Options.h>
#include <Clock.h>
@@ -29,9 +30,31 @@ main(int argc, char* argv[])
return app.main(argc, argv, "config.pub");
}
+void
+usage(const string& n)
+{
+ cerr << "Usage: " << n << " [--datagram|--twoway|--oneway] [topic]\n" << endl;
+}
+
int
Publisher::run(int argc, char* argv[])
{
+ IceUtil::Options opts;
+ opts.addOpt("", "datagram");
+ opts.addOpt("", "twoway");
+ opts.addOpt("", "oneway");
+
+ IceUtil::Options::StringVector remaining;
+ try
+ {
+ remaining = opts.parse(argc, argv);
+ }
+ catch(const IceUtil::BadOptException& e)
+ {
+ cerr << argv[0] << ": " << e.reason << endl;
+ return EXIT_FAILURE;
+ }
+
IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(
communicator()->propertyToProxy("IceStorm.TopicManager.Proxy"));
if(!manager)
@@ -41,9 +64,9 @@ Publisher::run(int argc, char* argv[])
}
string topicName = "time";
- if(argc != 1)
+ if(!remaining.empty())
{
- topicName = argv[1];
+ topicName = remaining.front();
}
//
@@ -71,7 +94,20 @@ Publisher::run(int argc, char* argv[])
// Get the topic's publisher object, the Clock type, and create a
// oneway Clock proxy (for efficiency reasons).
//
- ClockPrx clock = ClockPrx::uncheckedCast(topic->getPublisher()->ice_oneway());
+ Ice::ObjectPrx publisher = topic->getPublisher();
+ if(opts.isSet("datagram"))
+ {
+ publisher = publisher->ice_datagram();
+ }
+ else if(opts.isSet("twoway"))
+ {
+ // Do nothing.
+ }
+ else
+ {
+ publisher = publisher->ice_oneway();
+ }
+ ClockPrx clock = ClockPrx::uncheckedCast(publisher);
cout << "publishing tick events. Press ^C to terminate the application." << endl;
try
diff --git a/cpp/demo/IceStorm/clock/Subscriber.cpp b/cpp/demo/IceStorm/clock/Subscriber.cpp
index c08173003ee..01b27870fd4 100644
--- a/cpp/demo/IceStorm/clock/Subscriber.cpp
+++ b/cpp/demo/IceStorm/clock/Subscriber.cpp
@@ -9,6 +9,7 @@
#include <Ice/Application.h>
#include <IceStorm/IceStorm.h>
+#include <IceUtil/Options.h>
#include <Clock.h>
@@ -42,9 +43,34 @@ main(int argc, char* argv[])
return app.main(argc, argv, "config.sub");
}
+void
+usage(const string& n)
+{
+ cerr << "Usage: " << n << " [--batch] [--datagram|--twoway|--ordered|--oneway] [topic]\n" << endl;
+}
+
int
Subscriber::run(int argc, char* argv[])
{
+ IceUtil::Options opts;
+ opts.addOpt("", "datagram");
+ opts.addOpt("", "twoway");
+ opts.addOpt("", "ordered");
+ opts.addOpt("", "oneway");
+ opts.addOpt("", "batch");
+
+ IceUtil::Options::StringVector remaining;
+ try
+ {
+ remaining = opts.parse(argc, argv);
+ }
+ catch(const IceUtil::BadOptException& e)
+ {
+ cerr << argv[0] << ": " << e.reason << endl;
+ usage(appName());
+ return EXIT_FAILURE;
+ }
+
IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast(
communicator()->propertyToProxy("IceStorm.TopicManager.Proxy"));
if(!manager)
@@ -54,9 +80,9 @@ Subscriber::run(int argc, char* argv[])
}
string topicName = "time";
- if(argc != 1)
+ if(!remaining.empty())
{
- topicName = argv[1];
+ topicName = remaining.front();
}
IceStorm::TopicPrx topic;
@@ -82,15 +108,46 @@ Subscriber::run(int argc, char* argv[])
//
// Add a Servant for the Ice Object.
//
+ IceStorm::QoS qos;
Ice::ObjectPrx subscriber = adapter->addWithUUID(new ClockI);
-
//
- // This demo requires no quality of service, so it will use
- // the defaults.
+ // Set up the proxy.
//
- IceStorm::QoS qos;
+ if(opts.isSet("datagram"))
+ {
+ subscriber = subscriber->ice_datagram();
+ }
+ else if(opts.isSet("twoway"))
+ {
+ // Do nothing.
+ }
+ else if(opts.isSet("ordered"))
+ {
+ qos["reliability"] = "ordered";
+ subscriber = subscriber->ice_datagram();
+ }
+ else // default.
+ {
+ subscriber = subscriber->ice_oneway();
+ }
+ if(opts.isSet("batch"))
+ {
+ if(opts.isSet("twoway") || opts.isSet("ordered"))
+ {
+ cerr << appName() << ": batch can only be set with oneway or datagram" << endl;
+ return EXIT_FAILURE;
+ }
+ if(opts.isSet("datagram"))
+ {
+ subscriber = subscriber->ice_batchDatagram();
+ }
+ else
+ {
+ subscriber = subscriber->ice_batchOneway();
+ }
+ }
- topic->subscribe(qos, subscriber);
+ topic->subscribeAndGetPublisher(qos, subscriber);
adapter->activate();
shutdownOnInterrupt();
diff --git a/cpp/demo/IceStorm/clock/config.service b/cpp/demo/IceStorm/clock/config.service
index 4f1edc9ad3f..0ed60e4cb34 100644
--- a/cpp/demo/IceStorm/clock/config.service
+++ b/cpp/demo/IceStorm/clock/config.service
@@ -20,7 +20,7 @@ IceStorm.InstanceName=DemoIceStorm
# IceStorm instances this must run on a fixed port (or use
# IceGrid).
#
-Ice.OA.IceStorm.Publish.Endpoints=default -p 10001
+Ice.OA.IceStorm.Publish.Endpoints=tcp -p 10001:udp -p 10001
#
# TopicManager Tracing
diff --git a/cpp/demo/IceStorm/clock/config.sub b/cpp/demo/IceStorm/clock/config.sub
index cb6b50d94f0..4f60a8ca7c1 100644
--- a/cpp/demo/IceStorm/clock/config.sub
+++ b/cpp/demo/IceStorm/clock/config.sub
@@ -2,7 +2,7 @@
# This property is used to configure the endpoints of the clock
# subscriber adapter.
#
-Ice.OA.Clock.Subscriber.Endpoints=tcp
+Ice.OA.Clock.Subscriber.Endpoints=tcp:udp
#
# This property is used by the clients to connect to IceStorm.
diff --git a/cpp/demo/IceStorm/counter/CounterI.cpp b/cpp/demo/IceStorm/counter/CounterI.cpp
index 2b59ac5f460..9391cec441f 100644
--- a/cpp/demo/IceStorm/counter/CounterI.cpp
+++ b/cpp/demo/IceStorm/counter/CounterI.cpp
@@ -26,15 +26,13 @@ CounterI::subscribe(const CounterObserverPrx& observer, const Ice::Current&)
{
Lock sync(*this);
- IceStorm::QoS qos;
- qos["reliability"] = "twoway";
-
//
// Subscribe to the IceStorm topic. This returns a per-subscriber
// object which is then used to send the initialize event to just
// the given subscriber.
//
- CounterObserverPrx o = CounterObserverPrx::uncheckedCast(_topic->subscribeAndGetPublisher(qos, observer));
+ CounterObserverPrx o = CounterObserverPrx::uncheckedCast(
+ _topic->subscribeAndGetPublisher(IceStorm::QoS(), observer));
o->init(_value);
}
diff --git a/cpp/slice/IceStorm/IceStorm.ice b/cpp/slice/IceStorm/IceStorm.ice
index f136f4a578f..f9a543f770a 100644
--- a/cpp/slice/IceStorm/IceStorm.ice
+++ b/cpp/slice/IceStorm/IceStorm.ice
@@ -115,6 +115,22 @@ exception AlreadySubscribed
/**
*
+ * This exception indicates that a subscription failed due to an
+ * invalid QoS.
+ *
+ **/
+exception BadQoS
+{
+ /*
+ *
+ * The reason for the failed.
+ *
+ */
+ string reason;
+};
+
+/**
+ *
* Publishers publish information on a particular topic. A topic
* logically represents a type.
*
@@ -153,6 +169,8 @@ interface Topic
* replaced. Note that this can cause a loss of events to the
* subscribed object.
*
+ * <p class="Deprecated">This operation is deprecated as of version 3.2.
+ *
* @param qos The quality of service parameters for this
* subscription.
*
@@ -163,6 +181,7 @@ interface Topic
* @see unsubscribe
*
**/
+ ["deprecate:subscribe is deprecated, use subscribeAndGetPublisher instead"]
void subscribe(QoS theQoS, Object* subscriber);
/**
@@ -181,11 +200,14 @@ interface Topic
* @throws AlreadySubscribed Raised if the subscriber object is
* already subscribed.
*
+ * @throws BadQoS Raised if the requested quality of service
+ * is unavailable or invalid.
+ *
* @see unsubscribe
*
**/
Object* subscribeAndGetPublisher(QoS theQoS, Object* subscriber)
- throws AlreadySubscribed;
+ throws AlreadySubscribed, BadQoS;
/**
*
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp
index 19f32d5b1c5..fa5102a322c 100644
--- a/cpp/src/IceGrid/ReplicaCache.cpp
+++ b/cpp/src/IceGrid/ReplicaCache.cpp
@@ -167,8 +167,8 @@ ReplicaCache::subscribe(const ReplicaObserverPrx& observer)
}
IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
- Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer);
+ qos["reliability"] = "ordered";
+ Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer->ice_twoway());
ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas);
}
catch(const Ice::ConnectionRefusedException&)
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index ff87fae0d72..5e5aa3dfd6d 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -63,8 +63,8 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
}
IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
- initObserver(_topic->subscribeAndGetPublisher(qos, obsv));
+ qos["reliability"] = "ordered";
+ initObserver(_topic->subscribeAndGetPublisher(qos, obsv->ice_twoway()));
_subscribers.insert(obsv->ice_getIdentity());
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 435c9bfdf63..9174eed5949 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -96,10 +96,7 @@ class SubscriberOneway : public Subscriber
{
public:
- SubscriberOneway(const InstancePtr&,
- const Ice::ObjectPrx&,
- const Ice::ObjectPrx&,
- bool);
+ SubscriberOneway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
//
// Oneway
//
@@ -117,9 +114,7 @@ class SubscriberTwoway : public Subscriber
{
public:
- SubscriberTwoway(const InstancePtr&,
- const Ice::ObjectPrx&,
- const Ice::ObjectPrx&);
+ SubscriberTwoway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
virtual bool flush();
@@ -135,9 +130,7 @@ class SubscriberTwowayOrdered : public Subscriber
{
public:
- SubscriberTwowayOrdered(const InstancePtr&,
- const Ice::ObjectPrx&,
- const Ice::ObjectPrx&);
+ SubscriberTwowayOrdered(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
virtual bool flush();
void response();
@@ -152,9 +145,7 @@ class SubscriberLink : public Subscriber
{
public:
- SubscriberLink(const InstancePtr&,
- const TopicLinkPrx&,
- int);
+ SubscriberLink(const InstancePtr&, const TopicLinkPrx&, int);
virtual QueueState queue(bool, const std::vector<EventDataPtr>&);
virtual bool flush();
@@ -178,10 +169,9 @@ typedef IceUtil::Handle<SubscriberLink> SubscriberLinkPtr;
SubscriberOneway::SubscriberOneway(
const InstancePtr& instance,
const Ice::ObjectPrx& proxy,
- const Ice::ObjectPrx& obj,
- bool batch) :
+ const Ice::ObjectPrx& obj) :
Subscriber(instance, proxy, false, obj->ice_getIdentity()),
- _batch(batch),
+ _batch(obj->ice_isBatchDatagram() || obj->ice_isBatchOneway()),
_obj(obj)
{
//
@@ -196,7 +186,7 @@ SubscriberOneway::SubscriberOneway(
_objBatch = obj->ice_batchOneway();
}
- if(batch)
+ if(_batch)
{
_instance->batchFlusher()->add(_obj);
}
@@ -685,12 +675,16 @@ Subscriber::create(
try
{
- string reliability = "oneway";
+ string reliability;
QoS::const_iterator p = qos.find("reliability");
if(p != qos.end())
{
reliability = p->second;
}
+ if(!reliability.empty() && reliability != "ordered")
+ {
+ throw BadQoS("invalid reliability: " + reliability);
+ }
//
// Override the timeout.
@@ -709,41 +703,22 @@ Subscriber::create(
//
newObj = obj;
}
-
- if(reliability == "batch")
+ if(reliability == "ordered")
{
- if(newObj->ice_isDatagram())
- {
- newObj = newObj->ice_batchDatagram();
- }
- else
+ if(!newObj->ice_isTwoway())
{
- newObj = newObj->ice_batchOneway();
+ throw BadQoS("ordered reliability requires a twoway proxy");
}
- subscriber = new SubscriberOneway(instance, proxy, newObj, true);
- }
- else if(reliability == "twoway")
- {
- newObj = newObj->ice_twoway();
- subscriber = new SubscriberTwoway(instance, proxy, newObj);
+ subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
}
- else if(reliability == "twoway ordered")
+ else if(newObj->ice_isOneway() || newObj->ice_isDatagram() ||
+ newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
{
- newObj = newObj->ice_twoway();
- subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
+ subscriber = new SubscriberOneway(instance, proxy, newObj);
}
- else // reliability == "oneway"
+ else if(newObj->ice_isTwoway())
{
- if(reliability != "oneway" && traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << reliability <<" mode not understood.";
- }
- if(!newObj->ice_isDatagram())
- {
- newObj = newObj->ice_oneway();
- }
- subscriber = new SubscriberOneway(instance, proxy, newObj, false);
+ subscriber = new SubscriberTwoway(instance, proxy, newObj);
}
per->setSubscriber(subscriber);
}
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index dbd80819f11..44fc11b32db 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -203,10 +203,11 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end,
#endif
void
-TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&)
+TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&)
{
Ice::Identity id = obj->ice_getIdentity();
TraceLevelsPtr traceLevels = _instance->traceLevels();
+ QoS qos = origQoS;
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -225,6 +226,51 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&
}
}
+
+ string reliability = "oneway";
+ {
+ QoS::iterator p = qos.find("reliability");
+ if(p != qos.end())
+ {
+ reliability = p->second;
+ qos.erase(p);
+ }
+ }
+
+ Ice::ObjectPrx newObj = obj;
+ if(reliability == "batch")
+ {
+ if(newObj->ice_isDatagram())
+ {
+ newObj = newObj->ice_batchDatagram();
+ }
+ else
+ {
+ newObj = newObj->ice_batchOneway();
+ }
+ }
+ else if(reliability == "twoway")
+ {
+ newObj = newObj->ice_twoway();
+ }
+ else if(reliability == "twoway ordered")
+ {
+ qos["reliability"] = "ordered";
+ newObj = newObj->ice_twoway();
+ }
+ else // reliability == "oneway"
+ {
+ if(reliability != "oneway" && traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << reliability <<" mode not understood.";
+ }
+ if(!newObj->ice_isDatagram())
+ {
+ newObj = newObj->ice_oneway();
+ }
+ }
+
IceUtil::Mutex::Lock sync(_subscribersMutex);
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
@@ -234,7 +280,7 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&
_subscribers.erase(p);
}
- SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos);
+ SubscriberPtr subscriber = Subscriber::create(_instance, newObj, qos);
_subscribers.push_back(subscriber);
_instance->subscriberPool()->add(subscriber);
}
diff --git a/cpp/test/IceStorm/federation/.depend b/cpp/test/IceStorm/federation/.depend
index 6478eb332d0..e72f0fe7a33 100644
--- a/cpp/test/IceStorm/federation/.depend
+++ b/cpp/test/IceStorm/federation/.depend
@@ -1,5 +1,5 @@
-Event$(OBJEXT): Event.cpp ./Event.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/Ice/Proxy.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/StreamF.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/InstanceF.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/Ice/StringConverter.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/Direct.h ../../../include/Ice/LocalException.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/ObjectFactory.h ../../../include/IceUtil/Iterator.h ../../../include/IceUtil/ScopedArray.h
-Publisher$(OBJEXT): Publisher.cpp ../../../include/Ice/Ice.h ../../../include/Ice/Initialize.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/PropertiesF.h ../../../include/Ice/InstanceF.h ../../../include/Ice/LoggerF.h ../../../include/Ice/StreamF.h ../../../include/Ice/StatsF.h ../../../include/Ice/StringConverter.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/Proxy.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/LocalException.h ../../../include/Ice/Properties.h ../../../include/Ice/Logger.h ../../../include/Ice/LoggerUtil.h ../../../include/Ice/Stats.h ../../../include/Ice/Communicator.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/RouterF.h ../../../include/Ice/LocatorF.h ../../../include/Ice/PluginF.h ../../../include/Ice/ImplicitContextF.h ../../../include/Ice/ObjectFactory.h ../../../include/Ice/ObjectAdapter.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/OutgoingAsync.h ../../../include/IceUtil/RecMutex.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/IncomingAsync.h ../../../include/Ice/Direct.h ../../../include/Ice/UserExceptionFactory.h ../../../include/Ice/FactoryTable.h ../../../include/Ice/FactoryTableDef.h ../../../include/IceUtil/StaticMutex.h ../../../include/Ice/UserExceptionFactoryF.h ../../../include/Ice/FacetMap.h ../../../include/Ice/Locator.h ../../../include/Ice/ProcessF.h ../../../include/Ice/ServantLocator.h ../../../include/Ice/Process.h ../../../include/Ice/Application.h ../../../include/Ice/Connection.h ../../../include/Ice/Functional.h ../../../include/IceUtil/Functional.h ../../../include/Ice/Stream.h ../../../include/Ice/ImplicitContext.h ../../../include/IceStorm/IceStorm.h ../../../include/Ice/SliceChecksumDict.h ./Event.h
-Subscriber$(OBJEXT): Subscriber.cpp ../../../include/IceUtil/DisableWarnings.h ../../../include/Ice/Ice.h ../../../include/Ice/Initialize.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/PropertiesF.h ../../../include/Ice/InstanceF.h ../../../include/Ice/LoggerF.h ../../../include/Ice/StreamF.h ../../../include/Ice/StatsF.h ../../../include/Ice/StringConverter.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/Proxy.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/LocalException.h ../../../include/Ice/Properties.h ../../../include/Ice/Logger.h ../../../include/Ice/LoggerUtil.h ../../../include/Ice/Stats.h ../../../include/Ice/Communicator.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/RouterF.h ../../../include/Ice/LocatorF.h ../../../include/Ice/PluginF.h ../../../include/Ice/ImplicitContextF.h ../../../include/Ice/ObjectFactory.h ../../../include/Ice/ObjectAdapter.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/OutgoingAsync.h ../../../include/IceUtil/RecMutex.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/IncomingAsync.h ../../../include/Ice/Direct.h ../../../include/Ice/UserExceptionFactory.h ../../../include/Ice/FactoryTable.h ../../../include/Ice/FactoryTableDef.h ../../../include/IceUtil/StaticMutex.h ../../../include/Ice/UserExceptionFactoryF.h ../../../include/Ice/FacetMap.h ../../../include/Ice/Locator.h ../../../include/Ice/ProcessF.h ../../../include/Ice/ServantLocator.h ../../../include/Ice/Process.h ../../../include/Ice/Application.h ../../../include/Ice/Connection.h ../../../include/Ice/Functional.h ../../../include/IceUtil/Functional.h ../../../include/Ice/Stream.h ../../../include/Ice/ImplicitContext.h ../../../include/IceStorm/IceStorm.h ../../../include/Ice/SliceChecksumDict.h ./Event.h ../../include/TestCommon.h
+Event$(OBJEXT): Event.cpp Event.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/Proxy.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/StreamF.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/InstanceF.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/Ice/StringConverter.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/Direct.h ../../../include/Ice/LocalException.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/ObjectFactory.h ../../../include/IceUtil/Iterator.h ../../../include/IceUtil/ScopedArray.h
+Publisher$(OBJEXT): Publisher.cpp ../../../include/Ice/Ice.h ../../../include/Ice/Initialize.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/PropertiesF.h ../../../include/Ice/InstanceF.h ../../../include/Ice/LoggerF.h ../../../include/Ice/StreamF.h ../../../include/Ice/StatsF.h ../../../include/Ice/StringConverter.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/Proxy.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/LocalException.h ../../../include/Ice/Properties.h ../../../include/Ice/Logger.h ../../../include/Ice/LoggerUtil.h ../../../include/Ice/Stats.h ../../../include/Ice/Communicator.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/RouterF.h ../../../include/Ice/LocatorF.h ../../../include/Ice/PluginF.h ../../../include/Ice/ImplicitContextF.h ../../../include/Ice/ObjectFactory.h ../../../include/Ice/ObjectAdapter.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/OutgoingAsync.h ../../../include/IceUtil/RecMutex.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/IncomingAsync.h ../../../include/Ice/Direct.h ../../../include/Ice/UserExceptionFactory.h ../../../include/Ice/FactoryTable.h ../../../include/Ice/FactoryTableDef.h ../../../include/IceUtil/StaticMutex.h ../../../include/Ice/UserExceptionFactoryF.h ../../../include/Ice/FacetMap.h ../../../include/Ice/Locator.h ../../../include/Ice/ProcessF.h ../../../include/Ice/ServantLocator.h ../../../include/Ice/Process.h ../../../include/Ice/Application.h ../../../include/Ice/Connection.h ../../../include/Ice/Functional.h ../../../include/IceUtil/Functional.h ../../../include/Ice/Stream.h ../../../include/Ice/ImplicitContext.h ../../../include/IceStorm/IceStorm.h ../../../include/Ice/SliceChecksumDict.h Event.h
+Subscriber$(OBJEXT): Subscriber.cpp ../../../include/IceUtil/DisableWarnings.h ../../../include/Ice/Ice.h ../../../include/Ice/Initialize.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/PropertiesF.h ../../../include/Ice/InstanceF.h ../../../include/Ice/LoggerF.h ../../../include/Ice/StreamF.h ../../../include/Ice/StatsF.h ../../../include/Ice/StringConverter.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/Proxy.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/LocalException.h ../../../include/Ice/Properties.h ../../../include/Ice/Logger.h ../../../include/Ice/LoggerUtil.h ../../../include/Ice/Stats.h ../../../include/Ice/Communicator.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/RouterF.h ../../../include/Ice/LocatorF.h ../../../include/Ice/PluginF.h ../../../include/Ice/ImplicitContextF.h ../../../include/Ice/ObjectFactory.h ../../../include/Ice/ObjectAdapter.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/OutgoingAsync.h ../../../include/IceUtil/RecMutex.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/IncomingAsync.h ../../../include/Ice/Direct.h ../../../include/Ice/UserExceptionFactory.h ../../../include/Ice/FactoryTable.h ../../../include/Ice/FactoryTableDef.h ../../../include/IceUtil/StaticMutex.h ../../../include/Ice/UserExceptionFactoryF.h ../../../include/Ice/FacetMap.h ../../../include/Ice/Locator.h ../../../include/Ice/ProcessF.h ../../../include/Ice/ServantLocator.h ../../../include/Ice/Process.h ../../../include/Ice/Application.h ../../../include/Ice/Connection.h ../../../include/Ice/Functional.h ../../../include/IceUtil/Functional.h ../../../include/Ice/Stream.h ../../../include/Ice/ImplicitContext.h ../../../include/IceStorm/IceStorm.h ../../../include/Ice/SliceChecksumDict.h Event.h ../../include/TestCommon.h
Event.cpp: Event.ice
Event.ice: $(SLICE2CPP) $(SLICEPARSERLIB)
diff --git a/cpp/test/IceStorm/federation/Subscriber.cpp b/cpp/test/IceStorm/federation/Subscriber.cpp
index 181d1e839e1..c6c3efc8578 100644
--- a/cpp/test/IceStorm/federation/Subscriber.cpp
+++ b/cpp/test/IceStorm/federation/Subscriber.cpp
@@ -14,14 +14,6 @@
#include <TestCommon.h>
-#include <fcntl.h>
-#ifdef _WIN32
-# include <io.h>
-#else
-# include <sys/types.h>
-# include <sys/stat.h>
-#endif
-
using namespace std;
using namespace Ice;
using namespace IceStorm;
@@ -136,7 +128,15 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
IceStorm::QoS qos;
if(batch)
{
- qos["reliability"] = "batch";
+ objFed1 = objFed1->ice_batchOneway();
+ objFed2 = objFed1->ice_batchOneway();
+ objFed3 = objFed1->ice_batchOneway();
+ }
+ else
+ {
+ objFed1 = objFed1->ice_oneway();
+ objFed2 = objFed1->ice_oneway();
+ objFed3 = objFed1->ice_oneway();
}
TopicPrx fed1;
@@ -155,9 +155,9 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
return EXIT_FAILURE;
}
- fed1->subscribe(qos, objFed1);
- fed2->subscribe(qos, objFed2);
- fed3->subscribe(qos, objFed3);
+ fed1->subscribeAndGetPublisher(qos, objFed1);
+ fed2->subscribeAndGetPublisher(qos, objFed2);
+ fed3->subscribeAndGetPublisher(qos, objFed3);
communicator->waitForShutdown();
diff --git a/cpp/test/IceStorm/federation2/Subscriber.cpp b/cpp/test/IceStorm/federation2/Subscriber.cpp
index 0cbc16d779b..88a4408c54d 100644
--- a/cpp/test/IceStorm/federation2/Subscriber.cpp
+++ b/cpp/test/IceStorm/federation2/Subscriber.cpp
@@ -125,14 +125,18 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
//
// Activate the servants.
//
- ObjectPrx objFed1 = adapter->addWithUUID(eventFed1);
+ ObjectPrx obj = adapter->addWithUUID(eventFed1);
adapter->activate();
IceStorm::QoS qos;
if(batch)
{
- qos["reliability"] = "batch";
+ obj = obj->ice_batchOneway();
+ }
+ else
+ {
+ obj = obj->ice_oneway();
}
TopicPrx fed1;
@@ -147,11 +151,11 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
return EXIT_FAILURE;
}
- fed1->subscribe(qos, objFed1);
+ fed1->subscribeAndGetPublisher(qos, obj);
communicator->waitForShutdown();
- fed1->unsubscribe(objFed1);
+ fed1->unsubscribe(obj);
return EXIT_SUCCESS;
}
diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp
index 2118f49b2b0..c0c8e1b4f93 100644
--- a/cpp/test/IceStorm/single/Subscriber.cpp
+++ b/cpp/test/IceStorm/single/Subscriber.cpp
@@ -13,14 +13,6 @@
#include <Single.h>
#include <TestCommon.h>
-#include <fcntl.h>
-#ifdef _WIN32
-# include <io.h>
-#else
-# include <sys/types.h>
-# include <sys/stat.h>
-#endif
-
using namespace std;
using namespace Ice;
using namespace IceStorm;
@@ -30,26 +22,34 @@ class SingleI : public Single, public IceUtil::Monitor<IceUtil::Mutex>
{
public:
- SingleI(const CommunicatorPtr& communicator, const string& name, bool ordered = false) :
+ SingleI(const CommunicatorPtr& communicator, const string& name) :
_communicator(communicator),
_name(name),
_count(0),
- _ordered(ordered),
_last(0)
{
}
- virtual void event(int i, const Current&)
+ virtual void
+ event(int i, const Current& current)
{
- Lock sync(*this);
- if(_ordered && i != _last)
+ if((_name == "default" || _name == "oneway" || _name == "batch") && current.requestId != 0)
+ {
+ cerr << endl << "expected oneway request";
+ test(false);
+ }
+ else if((_name == "twoway" || _name == "twoway ordered") && current.requestId == 0)
+ {
+ cerr << endl << "expected twoway request";
+ }
+ if(_name == "twoway ordered" && i != _last)
{
cerr << endl << "received unordered event for `" << _name << "': " << i << " " << _last;
test(false);
}
+ Lock sync(*this);
++_last;
-
if(++_count == 1000)
{
notify();
@@ -118,6 +118,9 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
// Create subscribers with different QoS.
//
vector<SingleIPtr> subscribers;
+ //
+ // First we use the old deprecated API.
+ //
{
subscribers.push_back(new SingleI(communicator, "default"));
topic->subscribe(IceStorm::QoS(), adapter->addWithUUID(subscribers.back()));
@@ -141,11 +144,36 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
topic->subscribe(qos, adapter->addWithUUID(subscribers.back()));
}
{
- subscribers.push_back(new SingleI(communicator, "twoway ordered", true)); // Ordered
+ subscribers.push_back(new SingleI(communicator, "twoway ordered")); // Ordered
IceStorm::QoS qos;
qos["reliability"] = "twoway ordered";
topic->subscribe(qos, adapter->addWithUUID(subscribers.back()));
}
+ //
+ // Next we use the new API call with the new proxy semantics.
+ //
+ {
+ subscribers.push_back(new SingleI(communicator, "default"));
+ topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_oneway());
+ }
+ {
+ subscribers.push_back(new SingleI(communicator, "oneway"));
+ topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_oneway());
+ }
+ {
+ subscribers.push_back(new SingleI(communicator, "twoway"));
+ topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back()));
+ }
+ {
+ subscribers.push_back(new SingleI(communicator, "batch"));
+ topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_batchOneway());
+ }
+ {
+ subscribers.push_back(new SingleI(communicator, "twoway ordered")); // Ordered
+ IceStorm::QoS qos;
+ qos["reliability"] = "ordered";
+ topic->subscribeAndGetPublisher(qos, adapter->addWithUUID(subscribers.back()));
+ }
adapter->activate();
diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp
index 3bf45842cec..313fb47c485 100644
--- a/cpp/test/IceStorm/stress/Subscriber.cpp
+++ b/cpp/test/IceStorm/stress/Subscriber.cpp
@@ -18,14 +18,6 @@
#include <TestCommon.h>
-#include <fcntl.h>
-#ifdef _WIN32
-# include <io.h>
-#else
-# include <sys/types.h>
-# include <sys/stat.h>
-#endif
-
using namespace std;
using namespace Ice;
using namespace IceStorm;
@@ -174,20 +166,6 @@ private:
IceUtil::StaticMutex ErraticEventI::_remainingMutex = ICE_STATIC_MUTEX_INITIALIZER;
int ErraticEventI::_remaining = 0;
-void
-usage(const char* appName)
-{
- cerr << "Usage: " << appName << " [options]\n";
- cerr <<
- "Options:\n"
- "-h, --help Show this message.\n"
- "--events <e> Terminate after <e> are received.\n"
- "--qos <key>,<value><e> Subscribe with this QoS.\n"
- "--erratic <n> Add <n> erratic subscribers.\n"
- "--slow The subscribers sleeps 3 seconds after each event.\n"
- ;
-}
-
struct Subscription
{
Ice::ObjectAdapterPtr adapter;
@@ -277,6 +255,7 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SubscriberAdapter", "default");
+ string reliability = "";
EventIPtr servant;
if(erratic)
{
@@ -297,15 +276,20 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
}
else
{
- map<string, string>::const_iterator reliability = qos.find("reliability");
- if(reliability != qos.end())
+ map<string, string>::iterator p = qos.find("reliability");
+ if(p != qos.end())
{
- if(reliability->second == "twoway ordered")
+ reliability = p->second;
+ if(reliability != "ordered")
{
- servant = new OrderEventI(communicator, events);
+ qos.erase(p);
}
}
- if(!servant)
+ if(reliability == "ordered")
+ {
+ servant = new OrderEventI(communicator, events);
+ }
+ else
{
servant = new CountEventI(communicator, events);
}
@@ -338,8 +322,20 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator)
for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p)
{
p->obj = p->adapter->addWithUUID(p->servant);
+ if(reliability == "twoway" || reliability == "ordered")
+ {
+ // Do nothing.
+ }
+ else if(reliability == "batch")
+ {
+ p->obj = p->obj->ice_batchOneway();
+ }
+ else //if(reliability == "oneway")
+ {
+ p->obj = p->obj->ice_oneway();
+ }
p->adapter->activate();
- topic->subscribe(qos, p->obj);
+ topic->subscribeAndGetPublisher(qos, p->obj);
}
}
diff --git a/cpp/test/IceStorm/stress/run.py b/cpp/test/IceStorm/stress/run.py
index 5da53f11c35..7fc3b952ac0 100755
--- a/cpp/test/IceStorm/stress/run.py
+++ b/cpp/test/IceStorm/stress/run.py
@@ -183,7 +183,7 @@ print "ok"
print "Sending 5000 ordered events... ",
sys.stdout.flush()
-status = doTest('--events 5000 --qos "reliability,twoway ordered" ' + iceStormReference, '--events 5000')
+status = doTest('--events 5000 --qos "reliability,ordered" ' + iceStormReference, '--events 5000')
if status:
print "failed!"
TestUtil.killServers()
@@ -192,7 +192,7 @@ print "ok"
print "Sending 5000 ordered events across a link... ",
sys.stdout.flush()
-status = doTest('--events 5000 --qos "reliability,twoway ordered" ' + iceStormReference2, '--events 5000')
+status = doTest('--events 5000 --qos "reliability,ordered" ' + iceStormReference2, '--events 5000')
if status:
TestUtil.killServers()
sys.exit(1)