summaryrefslogtreecommitdiff
path: root/cpp/src
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src')
-rw-r--r--cpp/src/IceGrid/ReplicaCache.cpp4
-rw-r--r--cpp/src/IceGrid/Topics.cpp4
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp67
-rw-r--r--cpp/src/IceStorm/TopicI.cpp50
4 files changed, 73 insertions, 52 deletions
diff --git a/cpp/src/IceGrid/ReplicaCache.cpp b/cpp/src/IceGrid/ReplicaCache.cpp
index 19f32d5b1c5..fa5102a322c 100644
--- a/cpp/src/IceGrid/ReplicaCache.cpp
+++ b/cpp/src/IceGrid/ReplicaCache.cpp
@@ -167,8 +167,8 @@ ReplicaCache::subscribe(const ReplicaObserverPrx& observer)
}
IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
- Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer);
+ qos["reliability"] = "ordered";
+ Ice::ObjectPrx publisher = _topic->subscribeAndGetPublisher(qos, observer->ice_twoway());
ReplicaObserverPrx::uncheckedCast(publisher)->replicaInit(replicas);
}
catch(const Ice::ConnectionRefusedException&)
diff --git a/cpp/src/IceGrid/Topics.cpp b/cpp/src/IceGrid/Topics.cpp
index ff87fae0d72..5e5aa3dfd6d 100644
--- a/cpp/src/IceGrid/Topics.cpp
+++ b/cpp/src/IceGrid/Topics.cpp
@@ -63,8 +63,8 @@ ObserverTopic::subscribe(const Ice::ObjectPrx& obsv, const string& name)
}
IceStorm::QoS qos;
- qos["reliability"] = "twoway ordered";
- initObserver(_topic->subscribeAndGetPublisher(qos, obsv));
+ qos["reliability"] = "ordered";
+ initObserver(_topic->subscribeAndGetPublisher(qos, obsv->ice_twoway()));
_subscribers.insert(obsv->ice_getIdentity());
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 435c9bfdf63..9174eed5949 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -96,10 +96,7 @@ class SubscriberOneway : public Subscriber
{
public:
- SubscriberOneway(const InstancePtr&,
- const Ice::ObjectPrx&,
- const Ice::ObjectPrx&,
- bool);
+ SubscriberOneway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
//
// Oneway
//
@@ -117,9 +114,7 @@ class SubscriberTwoway : public Subscriber
{
public:
- SubscriberTwoway(const InstancePtr&,
- const Ice::ObjectPrx&,
- const Ice::ObjectPrx&);
+ SubscriberTwoway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
virtual bool flush();
@@ -135,9 +130,7 @@ class SubscriberTwowayOrdered : public Subscriber
{
public:
- SubscriberTwowayOrdered(const InstancePtr&,
- const Ice::ObjectPrx&,
- const Ice::ObjectPrx&);
+ SubscriberTwowayOrdered(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
virtual bool flush();
void response();
@@ -152,9 +145,7 @@ class SubscriberLink : public Subscriber
{
public:
- SubscriberLink(const InstancePtr&,
- const TopicLinkPrx&,
- int);
+ SubscriberLink(const InstancePtr&, const TopicLinkPrx&, int);
virtual QueueState queue(bool, const std::vector<EventDataPtr>&);
virtual bool flush();
@@ -178,10 +169,9 @@ typedef IceUtil::Handle<SubscriberLink> SubscriberLinkPtr;
SubscriberOneway::SubscriberOneway(
const InstancePtr& instance,
const Ice::ObjectPrx& proxy,
- const Ice::ObjectPrx& obj,
- bool batch) :
+ const Ice::ObjectPrx& obj) :
Subscriber(instance, proxy, false, obj->ice_getIdentity()),
- _batch(batch),
+ _batch(obj->ice_isBatchDatagram() || obj->ice_isBatchOneway()),
_obj(obj)
{
//
@@ -196,7 +186,7 @@ SubscriberOneway::SubscriberOneway(
_objBatch = obj->ice_batchOneway();
}
- if(batch)
+ if(_batch)
{
_instance->batchFlusher()->add(_obj);
}
@@ -685,12 +675,16 @@ Subscriber::create(
try
{
- string reliability = "oneway";
+ string reliability;
QoS::const_iterator p = qos.find("reliability");
if(p != qos.end())
{
reliability = p->second;
}
+ if(!reliability.empty() && reliability != "ordered")
+ {
+ throw BadQoS("invalid reliability: " + reliability);
+ }
//
// Override the timeout.
@@ -709,41 +703,22 @@ Subscriber::create(
//
newObj = obj;
}
-
- if(reliability == "batch")
+ if(reliability == "ordered")
{
- if(newObj->ice_isDatagram())
- {
- newObj = newObj->ice_batchDatagram();
- }
- else
+ if(!newObj->ice_isTwoway())
{
- newObj = newObj->ice_batchOneway();
+ throw BadQoS("ordered reliability requires a twoway proxy");
}
- subscriber = new SubscriberOneway(instance, proxy, newObj, true);
- }
- else if(reliability == "twoway")
- {
- newObj = newObj->ice_twoway();
- subscriber = new SubscriberTwoway(instance, proxy, newObj);
+ subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
}
- else if(reliability == "twoway ordered")
+ else if(newObj->ice_isOneway() || newObj->ice_isDatagram() ||
+ newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
{
- newObj = newObj->ice_twoway();
- subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
+ subscriber = new SubscriberOneway(instance, proxy, newObj);
}
- else // reliability == "oneway"
+ else if(newObj->ice_isTwoway())
{
- if(reliability != "oneway" && traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << reliability <<" mode not understood.";
- }
- if(!newObj->ice_isDatagram())
- {
- newObj = newObj->ice_oneway();
- }
- subscriber = new SubscriberOneway(instance, proxy, newObj, false);
+ subscriber = new SubscriberTwoway(instance, proxy, newObj);
}
per->setSubscriber(subscriber);
}
diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp
index dbd80819f11..44fc11b32db 100644
--- a/cpp/src/IceStorm/TopicI.cpp
+++ b/cpp/src/IceStorm/TopicI.cpp
@@ -203,10 +203,11 @@ find(vector<SubscriberPtr>::iterator start, vector<SubscriberPtr>::iterator end,
#endif
void
-TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&)
+TopicI::subscribe(const QoS& origQoS, const Ice::ObjectPrx& obj, const Ice::Current&)
{
Ice::Identity id = obj->ice_getIdentity();
TraceLevelsPtr traceLevels = _instance->traceLevels();
+ QoS qos = origQoS;
if(traceLevels->topic > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->topicCat);
@@ -225,6 +226,51 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&
}
}
+
+ string reliability = "oneway";
+ {
+ QoS::iterator p = qos.find("reliability");
+ if(p != qos.end())
+ {
+ reliability = p->second;
+ qos.erase(p);
+ }
+ }
+
+ Ice::ObjectPrx newObj = obj;
+ if(reliability == "batch")
+ {
+ if(newObj->ice_isDatagram())
+ {
+ newObj = newObj->ice_batchDatagram();
+ }
+ else
+ {
+ newObj = newObj->ice_batchOneway();
+ }
+ }
+ else if(reliability == "twoway")
+ {
+ newObj = newObj->ice_twoway();
+ }
+ else if(reliability == "twoway ordered")
+ {
+ qos["reliability"] = "ordered";
+ newObj = newObj->ice_twoway();
+ }
+ else // reliability == "oneway"
+ {
+ if(reliability != "oneway" && traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << reliability <<" mode not understood.";
+ }
+ if(!newObj->ice_isDatagram())
+ {
+ newObj = newObj->ice_oneway();
+ }
+ }
+
IceUtil::Mutex::Lock sync(_subscribersMutex);
vector<SubscriberPtr>::iterator p = find(_subscribers.begin(), _subscribers.end(), id);
if(p != _subscribers.end())
@@ -234,7 +280,7 @@ TopicI::subscribe(const QoS& qos, const Ice::ObjectPrx& obj, const Ice::Current&
_subscribers.erase(p);
}
- SubscriberPtr subscriber = Subscriber::create(_instance, obj, qos);
+ SubscriberPtr subscriber = Subscriber::create(_instance, newObj, qos);
_subscribers.push_back(subscriber);
_instance->subscriberPool()->add(subscriber);
}