diff options
-rw-r--r-- | cpp/CHANGES | 9 | ||||
-rw-r--r-- | cpp/demo/IceStorm/clock/Publisher.cpp | 42 | ||||
-rw-r--r-- | cpp/demo/IceStorm/clock/Subscriber.cpp | 71 | ||||
-rw-r--r-- | cpp/demo/IceStorm/clock/config.service | 2 | ||||
-rw-r--r-- | cpp/demo/IceStorm/clock/config.sub | 2 | ||||
-rw-r--r-- | cpp/demo/IceStorm/counter/CounterI.cpp | 6 | ||||
-rw-r--r-- | cpp/slice/IceStorm/IceStorm.ice | 24 | ||||
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 67 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 50 | ||||
-rw-r--r-- | cpp/test/IceStorm/federation/.depend | 6 | ||||
-rw-r--r-- | cpp/test/IceStorm/federation/Subscriber.cpp | 24 | ||||
-rw-r--r-- | cpp/test/IceStorm/federation2/Subscriber.cpp | 12 | ||||
-rw-r--r-- | cpp/test/IceStorm/single/Subscriber.cpp | 58 | ||||
-rw-r--r-- | cpp/test/IceStorm/stress/Subscriber.cpp | 52 | ||||
-rwxr-xr-x | cpp/test/IceStorm/stress/run.py | 4 |
17 files changed, 304 insertions, 133 deletions
diff --git a/cpp/CHANGES b/cpp/CHANGES index 2f5ec41aaf3..bf083e5abc1 100644 --- a/cpp/CHANGES +++ b/cpp/CHANGES @@ -249,6 +249,15 @@ Changes since version 3.1.1 is used to send an event to only that particular subscriber. See demo/IceStorm/counter for a demo of this feature. + The possible values of the reliability QoS passed to + subscribeAndGetPublisher are now different. The only allowable value + is either "ordered" or the empty string. The delivery reliability + quality of service used by IceStorm is now derived from the the + proxy mode. For example, if oneway delivery is required then a + oneway proxy must be passed. The "ordered" QoS requires a twoway + proxy otherwise a BadQoS exception is raised. Topic::subscribe has + been deprecated and retains the old subscription QoS semantics. + The property IceStorm.InstanceName is now used to produce unique identities for each IceStorm topic. The identities used are now: diff --git a/cpp/demo/IceStorm/clock/Publisher.cpp b/cpp/demo/IceStorm/clock/Publisher.cpp index ec61a31d7da..51ef4e54c2e 100644 --- a/cpp/demo/IceStorm/clock/Publisher.cpp +++ b/cpp/demo/IceStorm/clock/Publisher.cpp @@ -9,6 +9,7 @@ #include <Ice/Application.h> #include <IceStorm/IceStorm.h> +#include <IceUtil/Options.h> #include <Clock.h> @@ -29,9 +30,31 @@ main(int argc, char* argv[]) return app.main(argc, argv, "config.pub"); } +void +usage(const string& n) +{ + cerr << "Usage: " << n << " [--datagram|--twoway|--oneway] [topic]\n" << endl; +} + int Publisher::run(int argc, char* argv[]) { + IceUtil::Options opts; + opts.addOpt("", "datagram"); + opts.addOpt("", "twoway"); + opts.addOpt("", "oneway"); + + IceUtil::Options::StringVector remaining; + try + { + remaining = opts.parse(argc, argv); + } + catch(const IceUtil::BadOptException& e) + { + cerr << argv[0] << ": " << e.reason << endl; + return EXIT_FAILURE; + } + IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast( communicator()->propertyToProxy("IceStorm.TopicManager.Proxy")); if(!manager) @@ -41,9 +64,9 @@ Publisher::run(int argc, char* argv[]) } string topicName = "time"; - if(argc != 1) + if(!remaining.empty()) { - topicName = argv[1]; + topicName = remaining.front(); } // @@ -71,7 +94,20 @@ Publisher::run(int argc, char* argv[]) // Get the topic's publisher object, the Clock type, and create a // oneway Clock proxy (for efficiency reasons). // - ClockPrx clock = ClockPrx::uncheckedCast(topic->getPublisher()->ice_oneway()); + Ice::ObjectPrx publisher = topic->getPublisher(); + if(opts.isSet("datagram")) + { + publisher = publisher->ice_datagram(); + } + else if(opts.isSet("twoway")) + { + // Do nothing. + } + else + { + publisher = publisher->ice_oneway(); + } + ClockPrx clock = ClockPrx::uncheckedCast(publisher); cout << "publishing tick events. Press ^C to terminate the application." << endl; try diff --git a/cpp/demo/IceStorm/clock/Subscriber.cpp b/cpp/demo/IceStorm/clock/Subscriber.cpp index c08173003ee..01b27870fd4 100644 --- a/cpp/demo/IceStorm/clock/Subscriber.cpp +++ b/cpp/demo/IceStorm/clock/Subscriber.cpp @@ -9,6 +9,7 @@ #include <Ice/Application.h> #include <IceStorm/IceStorm.h> +#include <IceUtil/Options.h> #include <Clock.h> @@ -42,9 +43,34 @@ main(int argc, char* argv[]) return app.main(argc, argv, "config.sub"); } +void +usage(const string& n) +{ + cerr << "Usage: " << n << " [--batch] [--datagram|--twoway|--ordered|--oneway] [topic]\n" << endl; +} + int Subscriber::run(int argc, char* argv[]) { + IceUtil::Options opts; + opts.addOpt("", "datagram"); + opts.addOpt("", "twoway"); + opts.addOpt("", "ordered"); + opts.addOpt("", "oneway"); + opts.addOpt("", "batch"); + + IceUtil::Options::StringVector remaining; + try + { + remaining = opts.parse(argc, argv); + } + catch(const IceUtil::BadOptException& e) + { + cerr << argv[0] << ": " << e.reason << endl; + usage(appName()); + return EXIT_FAILURE; + } + IceStorm::TopicManagerPrx manager = IceStorm::TopicManagerPrx::checkedCast( communicator()->propertyToProxy("IceStorm.TopicManager.Proxy")); if(!manager) @@ -54,9 +80,9 @@ Subscriber::run(int argc, char* argv[]) } string topicName = "time"; - if(argc != 1) + if(!remaining.empty()) { - topicName = argv[1]; + topicName = remaining.front(); } IceStorm::TopicPrx topic; @@ -82,15 +108,46 @@ Subscriber::run(int argc, char* argv[]) // // Add a Servant for the Ice Object. // + IceStorm::QoS qos; Ice::ObjectPrx subscriber = adapter->addWithUUID(new ClockI); - // - // This demo requires no quality of service, so it will use - // the defaults. + // Set up the proxy. // - IceStorm::QoS qos; + if(opts.isSet("datagram")) + { + subscriber = subscriber->ice_datagram(); + } + else if(opts.isSet("twoway")) + { + // Do nothing. + } + else if(opts.isSet("ordered")) + { + qos["reliability"] = "ordered"; + subscriber = subscriber->ice_datagram(); + } + else // default. + { + subscriber = subscriber->ice_oneway(); + } + if(opts.isSet("batch")) + { + if(opts.isSet("twoway") || opts.isSet("ordered")) + { + cerr << appName() << ": batch can only be set with oneway or datagram" << endl; + return EXIT_FAILURE; + } + if(opts.isSet("datagram")) + { + subscriber = subscriber->ice_batchDatagram(); + } + else + { + subscriber = subscriber->ice_batchOneway(); + } + } - topic->subscribe(qos, subscriber); + topic->subscribeAndGetPublisher(qos, subscriber); adapter->activate(); shutdownOnInterrupt(); diff --git a/cpp/demo/IceStorm/clock/config.service b/cpp/demo/IceStorm/clock/config.service index 4f1edc9ad3f..0ed60e4cb34 100644 --- a/cpp/demo/IceStorm/clock/config.service +++ b/cpp/demo/IceStorm/clock/config.service @@ -20,7 +20,7 @@ IceStorm.InstanceName=DemoIceStorm # IceStorm instances this must run on a fixed port (or use # IceGrid). # -Ice.OA.IceStorm.Publish.Endpoints=default -p 10001 +Ice.OA.IceStorm.Publish.Endpoints=tcp -p 10001:udp -p 10001 # # TopicManager Tracing diff --git a/cpp/demo/IceStorm/clock/config.sub b/cpp/demo/IceStorm/clock/config.sub index cb6b50d94f0..4f60a8ca7c1 100644 --- a/cpp/demo/IceStorm/clock/config.sub +++ b/cpp/demo/IceStorm/clock/config.sub @@ -2,7 +2,7 @@ # This property is used to configure the endpoints of the clock # subscriber adapter. # -Ice.OA.Clock.Subscriber.Endpoints=tcp +Ice.OA.Clock.Subscriber.Endpoints=tcp:udp # # This property is used by the clients to connect to IceStorm. diff --git a/cpp/demo/IceStorm/counter/CounterI.cpp b/cpp/demo/IceStorm/counter/CounterI.cpp index 2b59ac5f460..9391cec441f 100644 --- a/cpp/demo/IceStorm/counter/CounterI.cpp +++ b/cpp/demo/IceStorm/counter/CounterI.cpp @@ -26,15 +26,13 @@ CounterI::subscribe(const CounterObserverPrx& observer, const Ice::Current&) { Lock sync(*this); - IceStorm::QoS qos; - qos["reliability"] = "twoway"; - // // Subscribe to the IceStorm topic. This returns a per-subscriber // object which is then used to send the initialize event to just // the given subscriber. // - CounterObserverPrx o = CounterObserverPrx::uncheckedCast(_topic->subscribeAndGetPublisher(qos, observer)); + CounterObserverPrx o = CounterObserverPrx::uncheckedCast( + _topic->subscribeAndGetPublisher(IceStorm::QoS(), observer)); o->init(_value); } diff --git a/cpp/slice/IceStorm/IceStorm.ice b/cpp/slice/IceStorm/IceStorm.ice index f136f4a578f..f9a543f770a 100644 --- a/cpp/slice/IceStorm/IceStorm.ice +++ b/cpp/slice/IceStorm/IceStorm.ice @@ -115,6 +115,22 @@ exception AlreadySubscribed /** * + * This exception indicates that a subscription failed due to an + * invalid QoS. + * + **/ +exception BadQoS +{ + /* + * + * The reason for the failed. + * + */ + string reason; +}; + +/** + * * Publishers publish information on a particular topic. A topic * logically represents a type. * @@ -153,6 +169,8 @@ interface Topic * replaced. Note that this can cause a loss of events to the * subscribed object. * + * <p class="Deprecated">This operation is deprecated as of version 3.2. + * * @param qos The quality of service parameters for this * subscription. * @@ -163,6 +181,7 @@ interface Topic * @see unsubscribe * **/ + ["deprecate:subscribe is deprecated, use subscribeAndGetPublisher instead"] void subscribe(QoS theQoS, Object* subscriber); /** @@ -181,11 +200,14 @@ interface Topic * @throws AlreadySubscribed Raised if the subscriber object is * already subscribed. * + * @throws BadQoS Raised if the requested quality of service + * is unavailable or invalid. + * * @see unsubscribe * **/ Object* subscribeAndGetPublisher(QoS theQoS, Object* subscriber) - throws AlreadySubscribed; + throws AlreadySubscribed, BadQoS; /** * diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index 19f32d5b1c5..fa5102a322c 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -167,8 +167,8 @@ ReplicaCache::subscribe(const ReplicaObserverPrx& observer) } IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; - Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer); + qos["reliability"] = "ordered"; + Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer->ice_twoway()); ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas); } catch(const Ice::ConnectionRefusedException&) diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index ff87fae0d72..5e5aa3dfd6d 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -63,8 +63,8 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) } IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; - initObserver(_topic->subscribeAndGetPublisher(qos, obsv)); + qos["reliability"] = "ordered"; + initObserver(_topic->subscribeAndGetPublisher(qos, obsv->ice_twoway())); _subscribers.insert(obsv->ice_getIdentity()); diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 435c9bfdf63..9174eed5949 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -96,10 +96,7 @@ class SubscriberOneway : public Subscriber { public: - SubscriberOneway(const InstancePtr&, - const Ice::ObjectPrx&, - const Ice::ObjectPrx&, - bool); + SubscriberOneway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&); // // Oneway // @@ -117,9 +114,7 @@ class SubscriberTwoway : public Subscriber { public: - SubscriberTwoway(const InstancePtr&, - const Ice::ObjectPrx&, - const Ice::ObjectPrx&); + SubscriberTwoway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&); virtual bool flush(); @@ -135,9 +130,7 @@ class SubscriberTwowayOrdered : public Subscriber { public: - SubscriberTwowayOrdered(const InstancePtr&, - const Ice::ObjectPrx&, - const Ice::ObjectPrx&); + SubscriberTwowayOrdered(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&); virtual bool flush(); void response(); @@ -152,9 +145,7 @@ class SubscriberLink : public Subscriber { public: - SubscriberLink(const InstancePtr&, - const TopicLinkPrx&, - int); + SubscriberLink(const InstancePtr&, const TopicLinkPrx&, int); virtual QueueState queue(bool, const std::vector<EventDataPtr>&); virtual bool flush(); @@ -178,10 +169,9 @@ typedef IceUtil::Handle<SubscriberLink> SubscriberLinkPtr; SubscriberOneway::SubscriberOneway( const InstancePtr& instance, const Ice::ObjectPrx& proxy, - const Ice::ObjectPrx& obj, - bool batch) : + const Ice::ObjectPrx& obj) : Subscriber(instance, proxy, false, obj->ice_getIdentity()), - _batch(batch), + _batch(obj->ice_isBatchDatagram() || obj->ice_isBatchOneway()), _obj(obj) { // @@ -196,7 +186,7 @@ SubscriberOneway::SubscriberOneway( _objBatch = obj->ice_batchOneway(); } - if(batch) + if(_batch) { _instance->batchFlusher()->add(_obj); } @@ -685,12 +675,16 @@ Subscriber::create( try { - string reliability = "oneway"; + string reliability; QoS::const_iterator p = qos.find("reliability"); if(p != qos.end()) { reliability = p->second; } + if(!reliability.empty() && reliability != "ordered") + { + throw BadQoS("invalid reliability: " + reliability); + } // // Override the timeout. @@ -709,41 +703,22 @@ Subscriber::create( // newObj = obj; } - - if(reliability == "batch") + if(reliability == "ordered") { - if(newObj->ice_isDatagram()) - { - newObj = newObj->ice_batchDatagram(); - } - else + if(!newObj->ice_isTwoway()) { - newObj = newObj->ice_batchOneway(); + throw BadQoS("ordered reliability requires a twoway proxy"); } - subscriber = new SubscriberOneway(instance, proxy, newObj, true); - } - else if(reliability == "twoway") - { - newObj = newObj->ice_twoway(); - subscriber = new SubscriberTwoway(instance, proxy, newObj); + subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj); } - else if(reliability == "twoway ordered") + else if(newObj->ice_isOneway() || newObj->ice_isDatagram() || + newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram()) { - newObj = newObj->ice_twoway(); - subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj); + subscriber = new SubscriberOneway(instance, proxy, newObj); } - else // reliability == "oneway" + else if(newObj->ice_isTwoway()) { - if(reliability != "oneway" && traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << reliability <<" mode not understood."; - } - if(!newObj->ice_isDatagram()) - { - newObj = newObj->ice_oneway(); - } - subscriber = new SubscriberOneway(instance, proxy, newObj, false); + subscriber = new SubscriberTwoway(instance, proxy, newObj); } per->setSubscriber(subscriber); } diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index dbd80819f11..44fc11b32db 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -203,10 +203,11 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, #endif void -TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&) +TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&) { Ice::Identity id = obj->ice_getIdentity(); TraceLevelsPtr traceLevels = _instance->traceLevels(); + QoS qos = origQoS; if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -225,6 +226,51 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& } } + + string reliability = "oneway"; + { + QoS::iterator p = qos.find("reliability"); + if(p != qos.end()) + { + reliability = p->second; + qos.erase(p); + } + } + + Ice::ObjectPrx newObj = obj; + if(reliability == "batch") + { + if(newObj->ice_isDatagram()) + { + newObj = newObj->ice_batchDatagram(); + } + else + { + newObj = newObj->ice_batchOneway(); + } + } + else if(reliability == "twoway") + { + newObj = newObj->ice_twoway(); + } + else if(reliability == "twoway ordered") + { + qos["reliability"] = "ordered"; + newObj = newObj->ice_twoway(); + } + else // reliability == "oneway" + { + if(reliability != "oneway" && traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << reliability <<" mode not understood."; + } + if(!newObj->ice_isDatagram()) + { + newObj = newObj->ice_oneway(); + } + } + IceUtil::Mutex::Lock sync(_subscribersMutex); vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) @@ -234,7 +280,7 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& _subscribers.erase(p); } - SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); + SubscriberPtr subscriber = Subscriber::create(_instance, newObj, qos); _subscribers.push_back(subscriber); _instance->subscriberPool()->add(subscriber); } diff --git a/cpp/test/IceStorm/federation/.depend b/cpp/test/IceStorm/federation/.depend index 6478eb332d0..e72f0fe7a33 100644 --- a/cpp/test/IceStorm/federation/.depend +++ b/cpp/test/IceStorm/federation/.depend @@ -1,5 +1,5 @@ -Event$(OBJEXT): Event.cpp ./Event.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/Ice/Proxy.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/StreamF.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/InstanceF.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/Ice/StringConverter.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/Direct.h ../../../include/Ice/LocalException.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/ObjectFactory.h ../../../include/IceUtil/Iterator.h ../../../include/IceUtil/ScopedArray.h -Publisher$(OBJEXT): Publisher.cpp ../../../include/Ice/Ice.h ../../../include/Ice/Initialize.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/PropertiesF.h ../../../include/Ice/InstanceF.h ../../../include/Ice/LoggerF.h ../../../include/Ice/StreamF.h ../../../include/Ice/StatsF.h ../../../include/Ice/StringConverter.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/Proxy.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/LocalException.h ../../../include/Ice/Properties.h ../../../include/Ice/Logger.h ../../../include/Ice/LoggerUtil.h ../../../include/Ice/Stats.h ../../../include/Ice/Communicator.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/RouterF.h ../../../include/Ice/LocatorF.h ../../../include/Ice/PluginF.h ../../../include/Ice/ImplicitContextF.h ../../../include/Ice/ObjectFactory.h ../../../include/Ice/ObjectAdapter.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/OutgoingAsync.h ../../../include/IceUtil/RecMutex.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/IncomingAsync.h ../../../include/Ice/Direct.h ../../../include/Ice/UserExceptionFactory.h ../../../include/Ice/FactoryTable.h ../../../include/Ice/FactoryTableDef.h ../../../include/IceUtil/StaticMutex.h ../../../include/Ice/UserExceptionFactoryF.h ../../../include/Ice/FacetMap.h ../../../include/Ice/Locator.h ../../../include/Ice/ProcessF.h ../../../include/Ice/ServantLocator.h ../../../include/Ice/Process.h ../../../include/Ice/Application.h ../../../include/Ice/Connection.h ../../../include/Ice/Functional.h ../../../include/IceUtil/Functional.h ../../../include/Ice/Stream.h ../../../include/Ice/ImplicitContext.h ../../../include/IceStorm/IceStorm.h ../../../include/Ice/SliceChecksumDict.h ./Event.h -Subscriber$(OBJEXT): Subscriber.cpp ../../../include/IceUtil/DisableWarnings.h ../../../include/Ice/Ice.h ../../../include/Ice/Initialize.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/PropertiesF.h ../../../include/Ice/InstanceF.h ../../../include/Ice/LoggerF.h ../../../include/Ice/StreamF.h ../../../include/Ice/StatsF.h ../../../include/Ice/StringConverter.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/Proxy.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/LocalException.h ../../../include/Ice/Properties.h ../../../include/Ice/Logger.h ../../../include/Ice/LoggerUtil.h ../../../include/Ice/Stats.h ../../../include/Ice/Communicator.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/RouterF.h ../../../include/Ice/LocatorF.h ../../../include/Ice/PluginF.h ../../../include/Ice/ImplicitContextF.h ../../../include/Ice/ObjectFactory.h ../../../include/Ice/ObjectAdapter.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/OutgoingAsync.h ../../../include/IceUtil/RecMutex.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/IncomingAsync.h ../../../include/Ice/Direct.h ../../../include/Ice/UserExceptionFactory.h ../../../include/Ice/FactoryTable.h ../../../include/Ice/FactoryTableDef.h ../../../include/IceUtil/StaticMutex.h ../../../include/Ice/UserExceptionFactoryF.h ../../../include/Ice/FacetMap.h ../../../include/Ice/Locator.h ../../../include/Ice/ProcessF.h ../../../include/Ice/ServantLocator.h ../../../include/Ice/Process.h ../../../include/Ice/Application.h ../../../include/Ice/Connection.h ../../../include/Ice/Functional.h ../../../include/IceUtil/Functional.h ../../../include/Ice/Stream.h ../../../include/Ice/ImplicitContext.h ../../../include/IceStorm/IceStorm.h ../../../include/Ice/SliceChecksumDict.h ./Event.h ../../include/TestCommon.h +Event$(OBJEXT): Event.cpp Event.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/Proxy.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/StreamF.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/InstanceF.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/Ice/StringConverter.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/Direct.h ../../../include/Ice/LocalException.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/ObjectFactory.h ../../../include/IceUtil/Iterator.h ../../../include/IceUtil/ScopedArray.h +Publisher$(OBJEXT): Publisher.cpp ../../../include/Ice/Ice.h ../../../include/Ice/Initialize.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/PropertiesF.h ../../../include/Ice/InstanceF.h ../../../include/Ice/LoggerF.h ../../../include/Ice/StreamF.h ../../../include/Ice/StatsF.h ../../../include/Ice/StringConverter.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/Proxy.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/LocalException.h ../../../include/Ice/Properties.h ../../../include/Ice/Logger.h ../../../include/Ice/LoggerUtil.h ../../../include/Ice/Stats.h ../../../include/Ice/Communicator.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/RouterF.h ../../../include/Ice/LocatorF.h ../../../include/Ice/PluginF.h ../../../include/Ice/ImplicitContextF.h ../../../include/Ice/ObjectFactory.h ../../../include/Ice/ObjectAdapter.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/OutgoingAsync.h ../../../include/IceUtil/RecMutex.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/IncomingAsync.h ../../../include/Ice/Direct.h ../../../include/Ice/UserExceptionFactory.h ../../../include/Ice/FactoryTable.h ../../../include/Ice/FactoryTableDef.h ../../../include/IceUtil/StaticMutex.h ../../../include/Ice/UserExceptionFactoryF.h ../../../include/Ice/FacetMap.h ../../../include/Ice/Locator.h ../../../include/Ice/ProcessF.h ../../../include/Ice/ServantLocator.h ../../../include/Ice/Process.h ../../../include/Ice/Application.h ../../../include/Ice/Connection.h ../../../include/Ice/Functional.h ../../../include/IceUtil/Functional.h ../../../include/Ice/Stream.h ../../../include/Ice/ImplicitContext.h ../../../include/IceStorm/IceStorm.h ../../../include/Ice/SliceChecksumDict.h Event.h +Subscriber$(OBJEXT): Subscriber.cpp ../../../include/IceUtil/DisableWarnings.h ../../../include/Ice/Ice.h ../../../include/Ice/Initialize.h ../../../include/Ice/CommunicatorF.h ../../../include/Ice/LocalObjectF.h ../../../include/Ice/Handle.h ../../../include/IceUtil/Handle.h ../../../include/IceUtil/Exception.h ../../../include/IceUtil/Config.h ../../../include/Ice/Config.h ../../../include/Ice/ProxyHandle.h ../../../include/Ice/ProxyF.h ../../../include/Ice/ObjectF.h ../../../include/Ice/GCCountMap.h ../../../include/Ice/Exception.h ../../../include/Ice/LocalObject.h ../../../include/IceUtil/Shared.h ../../../include/IceUtil/Mutex.h ../../../include/IceUtil/Lock.h ../../../include/IceUtil/ThreadException.h ../../../include/Ice/UndefSysMacros.h ../../../include/Ice/PropertiesF.h ../../../include/Ice/InstanceF.h ../../../include/Ice/LoggerF.h ../../../include/Ice/StreamF.h ../../../include/Ice/StatsF.h ../../../include/Ice/StringConverter.h ../../../include/Ice/BuiltinSequences.h ../../../include/Ice/Proxy.h ../../../include/Ice/ProxyFactoryF.h ../../../include/Ice/ConnectionIF.h ../../../include/Ice/EndpointIF.h ../../../include/Ice/Endpoint.h ../../../include/Ice/ObjectAdapterF.h ../../../include/Ice/ReferenceF.h ../../../include/Ice/OutgoingAsyncF.h ../../../include/Ice/Current.h ../../../include/Ice/ConnectionF.h ../../../include/Ice/Identity.h ../../../include/Ice/LocalException.h ../../../include/Ice/Properties.h ../../../include/Ice/Logger.h ../../../include/Ice/LoggerUtil.h ../../../include/Ice/Stats.h ../../../include/Ice/Communicator.h ../../../include/Ice/ObjectFactoryF.h ../../../include/Ice/RouterF.h ../../../include/Ice/LocatorF.h ../../../include/Ice/PluginF.h ../../../include/Ice/ImplicitContextF.h ../../../include/Ice/ObjectFactory.h ../../../include/Ice/ObjectAdapter.h ../../../include/Ice/Object.h ../../../include/Ice/GCShared.h ../../../include/Ice/IncomingAsyncF.h ../../../include/Ice/Outgoing.h ../../../include/IceUtil/Monitor.h ../../../include/IceUtil/Cond.h ../../../include/IceUtil/Time.h ../../../include/Ice/BasicStream.h ../../../include/Ice/Buffer.h ../../../include/Ice/Protocol.h ../../../include/IceUtil/Unicode.h ../../../include/Ice/OutgoingAsync.h ../../../include/IceUtil/RecMutex.h ../../../include/Ice/Incoming.h ../../../include/Ice/ServantLocatorF.h ../../../include/Ice/ServantManagerF.h ../../../include/Ice/IncomingAsync.h ../../../include/Ice/Direct.h ../../../include/Ice/UserExceptionFactory.h ../../../include/Ice/FactoryTable.h ../../../include/Ice/FactoryTableDef.h ../../../include/IceUtil/StaticMutex.h ../../../include/Ice/UserExceptionFactoryF.h ../../../include/Ice/FacetMap.h ../../../include/Ice/Locator.h ../../../include/Ice/ProcessF.h ../../../include/Ice/ServantLocator.h ../../../include/Ice/Process.h ../../../include/Ice/Application.h ../../../include/Ice/Connection.h ../../../include/Ice/Functional.h ../../../include/IceUtil/Functional.h ../../../include/Ice/Stream.h ../../../include/Ice/ImplicitContext.h ../../../include/IceStorm/IceStorm.h ../../../include/Ice/SliceChecksumDict.h Event.h ../../include/TestCommon.h Event.cpp: Event.ice Event.ice: $(SLICE2CPP) $(SLICEPARSERLIB) diff --git a/cpp/test/IceStorm/federation/Subscriber.cpp b/cpp/test/IceStorm/federation/Subscriber.cpp index 181d1e839e1..c6c3efc8578 100644 --- a/cpp/test/IceStorm/federation/Subscriber.cpp +++ b/cpp/test/IceStorm/federation/Subscriber.cpp @@ -14,14 +14,6 @@ #include <TestCommon.h> -#include <fcntl.h> -#ifdef _WIN32 -# include <io.h> -#else -# include <sys/types.h> -# include <sys/stat.h> -#endif - using namespace std; using namespace Ice; using namespace IceStorm; @@ -136,7 +128,15 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) IceStorm::QoS qos; if(batch) { - qos["reliability"] = "batch"; + objFed1 = objFed1->ice_batchOneway(); + objFed2 = objFed1->ice_batchOneway(); + objFed3 = objFed1->ice_batchOneway(); + } + else + { + objFed1 = objFed1->ice_oneway(); + objFed2 = objFed1->ice_oneway(); + objFed3 = objFed1->ice_oneway(); } TopicPrx fed1; @@ -155,9 +155,9 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) return EXIT_FAILURE; } - fed1->subscribe(qos, objFed1); - fed2->subscribe(qos, objFed2); - fed3->subscribe(qos, objFed3); + fed1->subscribeAndGetPublisher(qos, objFed1); + fed2->subscribeAndGetPublisher(qos, objFed2); + fed3->subscribeAndGetPublisher(qos, objFed3); communicator->waitForShutdown(); diff --git a/cpp/test/IceStorm/federation2/Subscriber.cpp b/cpp/test/IceStorm/federation2/Subscriber.cpp index 0cbc16d779b..88a4408c54d 100644 --- a/cpp/test/IceStorm/federation2/Subscriber.cpp +++ b/cpp/test/IceStorm/federation2/Subscriber.cpp @@ -125,14 +125,18 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) // // Activate the servants. // - ObjectPrx objFed1 = adapter->addWithUUID(eventFed1); + ObjectPrx obj = adapter->addWithUUID(eventFed1); adapter->activate(); IceStorm::QoS qos; if(batch) { - qos["reliability"] = "batch"; + obj = obj->ice_batchOneway(); + } + else + { + obj = obj->ice_oneway(); } TopicPrx fed1; @@ -147,11 +151,11 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) return EXIT_FAILURE; } - fed1->subscribe(qos, objFed1); + fed1->subscribeAndGetPublisher(qos, obj); communicator->waitForShutdown(); - fed1->unsubscribe(objFed1); + fed1->unsubscribe(obj); return EXIT_SUCCESS; } diff --git a/cpp/test/IceStorm/single/Subscriber.cpp b/cpp/test/IceStorm/single/Subscriber.cpp index 2118f49b2b0..c0c8e1b4f93 100644 --- a/cpp/test/IceStorm/single/Subscriber.cpp +++ b/cpp/test/IceStorm/single/Subscriber.cpp @@ -13,14 +13,6 @@ #include <Single.h> #include <TestCommon.h> -#include <fcntl.h> -#ifdef _WIN32 -# include <io.h> -#else -# include <sys/types.h> -# include <sys/stat.h> -#endif - using namespace std; using namespace Ice; using namespace IceStorm; @@ -30,26 +22,34 @@ class SingleI : public Single, public IceUtil::Monitor<IceUtil::Mutex> { public: - SingleI(const CommunicatorPtr& communicator, const string& name, bool ordered = false) : + SingleI(const CommunicatorPtr& communicator, const string& name) : _communicator(communicator), _name(name), _count(0), - _ordered(ordered), _last(0) { } - virtual void event(int i, const Current&) + virtual void + event(int i, const Current& current) { - Lock sync(*this); - if(_ordered && i != _last) + if((_name == "default" || _name == "oneway" || _name == "batch") && current.requestId != 0) + { + cerr << endl << "expected oneway request"; + test(false); + } + else if((_name == "twoway" || _name == "twoway ordered") && current.requestId == 0) + { + cerr << endl << "expected twoway request"; + } + if(_name == "twoway ordered" && i != _last) { cerr << endl << "received unordered event for `" << _name << "': " << i << " " << _last; test(false); } + Lock sync(*this); ++_last; - if(++_count == 1000) { notify(); @@ -118,6 +118,9 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) // Create subscribers with different QoS. // vector<SingleIPtr> subscribers; + // + // First we use the old deprecated API. + // { subscribers.push_back(new SingleI(communicator, "default")); topic->subscribe(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())); @@ -141,11 +144,36 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) topic->subscribe(qos, adapter->addWithUUID(subscribers.back())); } { - subscribers.push_back(new SingleI(communicator, "twoway ordered", true)); // Ordered + subscribers.push_back(new SingleI(communicator, "twoway ordered")); // Ordered IceStorm::QoS qos; qos["reliability"] = "twoway ordered"; topic->subscribe(qos, adapter->addWithUUID(subscribers.back())); } + // + // Next we use the new API call with the new proxy semantics. + // + { + subscribers.push_back(new SingleI(communicator, "default")); + topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_oneway()); + } + { + subscribers.push_back(new SingleI(communicator, "oneway")); + topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_oneway()); + } + { + subscribers.push_back(new SingleI(communicator, "twoway")); + topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())); + } + { + subscribers.push_back(new SingleI(communicator, "batch")); + topic->subscribeAndGetPublisher(IceStorm::QoS(), adapter->addWithUUID(subscribers.back())->ice_batchOneway()); + } + { + subscribers.push_back(new SingleI(communicator, "twoway ordered")); // Ordered + IceStorm::QoS qos; + qos["reliability"] = "ordered"; + topic->subscribeAndGetPublisher(qos, adapter->addWithUUID(subscribers.back())); + } adapter->activate(); diff --git a/cpp/test/IceStorm/stress/Subscriber.cpp b/cpp/test/IceStorm/stress/Subscriber.cpp index 3bf45842cec..313fb47c485 100644 --- a/cpp/test/IceStorm/stress/Subscriber.cpp +++ b/cpp/test/IceStorm/stress/Subscriber.cpp @@ -18,14 +18,6 @@ #include <TestCommon.h> -#include <fcntl.h> -#ifdef _WIN32 -# include <io.h> -#else -# include <sys/types.h> -# include <sys/stat.h> -#endif - using namespace std; using namespace Ice; using namespace IceStorm; @@ -174,20 +166,6 @@ private: IceUtil::StaticMutex ErraticEventI::_remainingMutex = ICE_STATIC_MUTEX_INITIALIZER; int ErraticEventI::_remaining = 0; -void -usage(const char* appName) -{ - cerr << "Usage: " << appName << " [options]\n"; - cerr << - "Options:\n" - "-h, --help Show this message.\n" - "--events <e> Terminate after <e> are received.\n" - "--qos <key>,<value><e> Subscribe with this QoS.\n" - "--erratic <n> Add <n> erratic subscribers.\n" - "--slow The subscribers sleeps 3 seconds after each event.\n" - ; -} - struct Subscription { Ice::ObjectAdapterPtr adapter; @@ -277,6 +255,7 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) ObjectAdapterPtr adapter = communicator->createObjectAdapterWithEndpoints("SubscriberAdapter", "default"); + string reliability = ""; EventIPtr servant; if(erratic) { @@ -297,15 +276,20 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) } else { - map<string, string>::const_iterator reliability = qos.find("reliability"); - if(reliability != qos.end()) + map<string, string>::iterator p = qos.find("reliability"); + if(p != qos.end()) { - if(reliability->second == "twoway ordered") + reliability = p->second; + if(reliability != "ordered") { - servant = new OrderEventI(communicator, events); + qos.erase(p); } } - if(!servant) + if(reliability == "ordered") + { + servant = new OrderEventI(communicator, events); + } + else { servant = new CountEventI(communicator, events); } @@ -338,8 +322,20 @@ run(int argc, char* argv[], const CommunicatorPtr& communicator) for(vector<Subscription>::iterator p = subs.begin(); p != subs.end(); ++p) { p->obj = p->adapter->addWithUUID(p->servant); + if(reliability == "twoway" || reliability == "ordered") + { + // Do nothing. + } + else if(reliability == "batch") + { + p->obj = p->obj->ice_batchOneway(); + } + else //if(reliability == "oneway") + { + p->obj = p->obj->ice_oneway(); + } p->adapter->activate(); - topic->subscribe(qos, p->obj); + topic->subscribeAndGetPublisher(qos, p->obj); } } diff --git a/cpp/test/IceStorm/stress/run.py b/cpp/test/IceStorm/stress/run.py index 5da53f11c35..7fc3b952ac0 100755 --- a/cpp/test/IceStorm/stress/run.py +++ b/cpp/test/IceStorm/stress/run.py @@ -183,7 +183,7 @@ print "ok" print "Sending 5000 ordered events... ", sys.stdout.flush() -status = doTest('--events 5000 --qos "reliability,twoway ordered" ' + iceStormReference, '--events 5000') +status = doTest('--events 5000 --qos "reliability,ordered" ' + iceStormReference, '--events 5000') if status: print "failed!" TestUtil.killServers() @@ -192,7 +192,7 @@ print "ok" print "Sending 5000 ordered events across a link... ", sys.stdout.flush() -status = doTest('--events 5000 --qos "reliability,twoway ordered" ' + iceStormReference2, '--events 5000') +status = doTest('--events 5000 --qos "reliability,ordered" ' + iceStormReference2, '--events 5000') if status: TestUtil.killServers() sys.exit(1) |