summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/TopicI.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/TopicI.cpp')
-rw-r--r--cpp/src/IceStorm/TopicI.cpp50
1 files changed, 48 insertions, 2 deletions
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index dbd80819f11..44fc11b32db 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -203,10 +203,11 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end,
#endif
void
-TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&)
+TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&)
{
Ice::Identity id = obj->ice_getIdentity();
TraceLevelsPtr traceLevels = _instance->traceLevels();
+ QoS qos = origQoS;
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -225,6 +226,51 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&
}
}
+
+ string reliability = "oneway";
+ {
+ QoS::iterator p = qos.find("reliability");
+ if(p != qos.end())
+ {
+ reliability = p->second;
+ qos.erase(p);
+ }
+ }
+
+ Ice::ObjectPrx newObj = obj;
+ if(reliability == "batch")
+ {
+ if(newObj->ice_isDatagram())
+ {
+ newObj = newObj->ice_batchDatagram();
+ }
+ else
+ {
+ newObj = newObj->ice_batchOneway();
+ }
+ }
+ else if(reliability == "twoway")
+ {
+ newObj = newObj->ice_twoway();
+ }
+ else if(reliability == "twoway ordered")
+ {
+ qos["reliability"] = "ordered";
+ newObj = newObj->ice_twoway();
+ }
+ else // reliability == "oneway"
+ {
+ if(reliability != "oneway" && traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << reliability <<" mode not understood.";
+ }
+ if(!newObj->ice_isDatagram())
+ {
+ newObj = newObj->ice_oneway();
+ }
+ }
+
IceUtil::Mutex::Lock sync(_subscribersMutex);
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
@@ -234,7 +280,7 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&
_subscribers.erase(p);
}
- SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos);
+ SubscriberPtr subscriber = Subscriber::create(_instance, newObj, qos);
_subscribers.push_back(subscriber);
_instance->subscriberPool()->add(subscriber);
}