diff options
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 50 |
1 files changed, 48 insertions, 2 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index dbd80819f11..44fc11b32db 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -203,10 +203,11 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end, #endif void -TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&) +TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&) { Ice::Identity id = obj->ice_getIdentity(); TraceLevelsPtr traceLevels = _instance->traceLevels(); + QoS qos = origQoS; if(traceLevels->topic > 0) { Ice::Trace out(traceLevels->logger, traceLevels->topicCat); @@ -225,6 +226,51 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& } } + + string reliability = "oneway"; + { + QoS::iterator p = qos.find("reliability"); + if(p != qos.end()) + { + reliability = p->second; + qos.erase(p); + } + } + + Ice::ObjectPrx newObj = obj; + if(reliability == "batch") + { + if(newObj->ice_isDatagram()) + { + newObj = newObj->ice_batchDatagram(); + } + else + { + newObj = newObj->ice_batchOneway(); + } + } + else if(reliability == "twoway") + { + newObj = newObj->ice_twoway(); + } + else if(reliability == "twoway ordered") + { + qos["reliability"] = "ordered"; + newObj = newObj->ice_twoway(); + } + else // reliability == "oneway" + { + if(reliability != "oneway" && traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << reliability <<" mode not understood."; + } + if(!newObj->ice_isDatagram()) + { + newObj = newObj->ice_oneway(); + } + } + IceUtil::Mutex::Lock sync(_subscribersMutex); vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id); if(p != _subscribers.end()) @@ -234,7 +280,7 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current& _subscribers.erase(p); } - SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos); + SubscriberPtr subscriber = Subscriber::create(_instance, newObj, qos); _subscribers.push_back(subscriber); _instance->subscriberPool()->add(subscriber); } |