diff options
author | Matthew Newhook <matthew@zeroc.com> | 2007-01-30 11:04:51 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2007-01-30 11:04:51 +0000 |
commit | fd675fdaff4dfa73678effd765a52209967567c7 (patch) | |
tree | 2943032021c6bf8329da6330b02af24ba6cad6bc /cpp/src | |
parent | cleanup IceStorm tests. (diff) | |
download | ice-fd675fdaff4dfa73678effd765a52209967567c7.tar.bz2 ice-fd675fdaff4dfa73678effd765a52209967567c7.tar.xz ice-fd675fdaff4dfa73678effd765a52209967567c7.zip |
http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=1711
Diffstat (limited to 'cpp/src')
-rw-r--r-- | cpp/src/IceGrid/ReplicaCache.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceGrid/Topics.cpp | 4 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 67 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 50 |
4 files changed, 73 insertions, 52 deletions
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp index 19f32d5b1c5..fa5102a322c 100644 --- a/cpp/src/IceGrid/ReplicaCache.cpp +++ b/cpp/src/IceGrid/ReplicaCache.cpp @@ -167,8 +167,8 @@ ReplicaCache::subscribe(const ReplicaObserverPrx& observer) } IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; - Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer); + qos["reliability"] = "ordered"; + Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer->ice_twoway()); ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas); } catch(const Ice::ConnectionRefusedException&) diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp index ff87fae0d72..5e5aa3dfd6d 100644 --- a/cpp/src/IceGrid/Topics.cpp +++ b/cpp/src/IceGrid/Topics.cpp @@ -63,8 +63,8 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name) } IceStorm::QoS qos; - qos["reliability"] = "twoway ordered"; - initObserver(_topic->subscribeAndGetPublisher(qos, obsv)); + qos["reliability"] = "ordered"; + initObserver(_topic->subscribeAndGetPublisher(qos, obsv->ice_twoway())); _subscribers.insert(obsv->ice_getIdentity()); diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 435c9bfdf63..9174eed5949 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -96,10 +96,7 @@ class SubscriberOneway : public Subscriber { public: - SubscriberOneway(const InstancePtr&, - const Ice::ObjectPrx&, - const Ice::ObjectPrx&, - bool); + SubscriberOneway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&); // // Oneway // @@ -117,9 +114,7 @@ class SubscriberTwoway : public Subscriber { public: - SubscriberTwoway(const InstancePtr&, - const Ice::ObjectPrx&, - const Ice::ObjectPrx&); + SubscriberTwoway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&); virtual bool flush(); @@ -135,9 +130,7 @@ class SubscriberTwowayOrdered : public Subscriber { public: - SubscriberTwowayOrdered(const InstancePtr&, - const Ice::ObjectPrx&, - const Ice::ObjectPrx&); + SubscriberTwowayOrdered(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&); virtual bool flush(); void response(); @@ -152,9 +145,7 @@ class SubscriberLink : public Subscriber { public: - SubscriberLink(const InstancePtr&, - const TopicLinkPrx&, - int); + SubscriberLink(const InstancePtr&, const TopicLinkPrx&, int); virtual QueueState queue(bool, const std::vector<EventDataPtr>&); virtual bool flush(); @@ -178,10 +169,9 @@ typedef IceUtil::Handle<SubscriberLink> SubscriberLinkPtr; SubscriberOneway::SubscriberOneway( const InstancePtr& instance, const Ice::ObjectPrx& proxy, - const Ice::ObjectPrx& obj, - bool batch) : + const Ice::ObjectPrx& obj) : Subscriber(instance, proxy, false, obj->ice_getIdentity()), - _batch(batch), + _batch(obj->ice_isBatchDatagram() || obj->ice_isBatchOneway()), _obj(obj) { // @@ -196,7 +186,7 @@ SubscriberOneway::SubscriberOneway( _objBatch = obj->ice_batchOneway(); } - if(batch) + if(_batch) { _instance->batchFlusher()->add(_obj); } @@ -685,12 +675,16 @@ Subscriber::create( try { - string reliability = "oneway"; + string reliability; QoS::const_iterator p = qos.find("reliability"); if(p != qos.end()) { reliability = p->second; } + if(!reliability.empty() && reliability != "ordered") + { + throw BadQoS("invalid reliability: " + reliability); + } // // Override the timeout. @@ -709,41 +703,22 @@ Subscriber::create( // newObj = obj; } - - if(reliability == "batch") + if(reliability == "ordered") { - if(newObj->ice_isDatagram()) - { - newObj = newObj->ice_batchDatagram(); - } - else + if(!newObj->ice_isTwoway()) { - newObj = newObj->ice_batchOneway(); + throw BadQoS("ordered reliability requires a twoway proxy"); } - subscriber = new SubscriberOneway(instance, proxy, newObj, true); - } - else if(reliability == "twoway") - { - newObj = newObj->ice_twoway(); - subscriber = new SubscriberTwoway(instance, proxy, newObj); + subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj); } - else if(reliability == "twoway ordered") + else if(newObj->ice_isOneway() || newObj->ice_isDatagram() || + newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram()) { - newObj = newObj->ice_twoway(); - subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj); + subscriber = new SubscriberOneway(instance, proxy, newObj); } - else // reliability == "oneway" + else if(newObj->ice_isTwoway()) { - if(reliability != "oneway" && traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << reliability <<" mode not understood."; - } - if(!newObj->ice_isDatagram()) - { - newObj = newObj->ice_oneway(); - } - subscriber = new SubscriberOneway(instance, proxy, newObj, false); + subscriber = new SubscriberTwoway(instance, proxy, newObj); } per->setSubscriber(subscriber); } diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index dbd80819f11..44fc11b32db 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -203,10 +203,11 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, #endif void -TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&) +TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&) { Ice::Identity id = obj->ice_getIdentity(); TraceLevelsPtr traceLevels = _instance->traceLevels(); + QoS qos = origQoS; if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -225,6 +226,51 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& } } + + string reliability = "oneway"; + { + QoS::iterator p = qos.find("reliability"); + if(p != qos.end()) + { + reliability = p->second; + qos.erase(p); + } + } + + Ice::ObjectPrx newObj = obj; + if(reliability == "batch") + { + if(newObj->ice_isDatagram()) + { + newObj = newObj->ice_batchDatagram(); + } + else + { + newObj = newObj->ice_batchOneway(); + } + } + else if(reliability == "twoway") + { + newObj = newObj->ice_twoway(); + } + else if(reliability == "twoway ordered") + { + qos["reliability"] = "ordered"; + newObj = newObj->ice_twoway(); + } + else // reliability == "oneway" + { + if(reliability != "oneway" && traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << reliability <<" mode not understood."; + } + if(!newObj->ice_isDatagram()) + { + newObj = newObj->ice_oneway(); + } + } + IceUtil::Mutex::Lock sync(_subscribersMutex); vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) @@ -234,7 +280,7 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& _subscribers.erase(p); } - SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); + SubscriberPtr subscriber = Subscriber::create(_instance, newObj, qos); _subscribers.push_back(subscriber); _instance->subscriberPool()->add(subscriber); } |