summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp67
1 files changed, 21 insertions, 46 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 435c9bfdf63..9174eed5949 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -96,10 +96,7 @@ class SubscriberOneway : public Subscriber
{
public:
- SubscriberOneway(const InstancePtr&,
- const Ice::ObjectPrx&,
- const Ice::ObjectPrx&,
- bool);
+ SubscriberOneway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
//
// Oneway
//
@@ -117,9 +114,7 @@ class SubscriberTwoway : public Subscriber
{
public:
- SubscriberTwoway(const InstancePtr&,
- const Ice::ObjectPrx&,
- const Ice::ObjectPrx&);
+ SubscriberTwoway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
virtual bool flush();
@@ -135,9 +130,7 @@ class SubscriberTwowayOrdered : public Subscriber
{
public:
- SubscriberTwowayOrdered(const InstancePtr&,
- const Ice::ObjectPrx&,
- const Ice::ObjectPrx&);
+ SubscriberTwowayOrdered(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
virtual bool flush();
void response();
@@ -152,9 +145,7 @@ class SubscriberLink : public Subscriber
{
public:
- SubscriberLink(const InstancePtr&,
- const TopicLinkPrx&,
- int);
+ SubscriberLink(const InstancePtr&, const TopicLinkPrx&, int);
virtual QueueState queue(bool, const std::vector<EventDataPtr>&);
virtual bool flush();
@@ -178,10 +169,9 @@ typedef IceUtil::Handle<SubscriberLink> SubscriberLinkPtr;
SubscriberOneway::SubscriberOneway(
const InstancePtr& instance,
const Ice::ObjectPrx& proxy,
- const Ice::ObjectPrx& obj,
- bool batch) :
+ const Ice::ObjectPrx& obj) :
Subscriber(instance, proxy, false, obj->ice_getIdentity()),
- _batch(batch),
+ _batch(obj->ice_isBatchDatagram() || obj->ice_isBatchOneway()),
_obj(obj)
{
//
@@ -196,7 +186,7 @@ SubscriberOneway::SubscriberOneway(
_objBatch = obj->ice_batchOneway();
}
- if(batch)
+ if(_batch)
{
_instance->batchFlusher()->add(_obj);
}
@@ -685,12 +675,16 @@ Subscriber::create(
try
{
- string reliability = "oneway";
+ string reliability;
QoS::const_iterator p = qos.find("reliability");
if(p != qos.end())
{
reliability = p->second;
}
+ if(!reliability.empty() && reliability != "ordered")
+ {
+ throw BadQoS("invalid reliability: " + reliability);
+ }
//
// Override the timeout.
@@ -709,41 +703,22 @@ Subscriber::create(
//
newObj = obj;
}
-
- if(reliability == "batch")
+ if(reliability == "ordered")
{
- if(newObj->ice_isDatagram())
- {
- newObj = newObj->ice_batchDatagram();
- }
- else
+ if(!newObj->ice_isTwoway())
{
- newObj = newObj->ice_batchOneway();
+ throw BadQoS("ordered reliability requires a twoway proxy");
}
- subscriber = new SubscriberOneway(instance, proxy, newObj, true);
- }
- else if(reliability == "twoway")
- {
- newObj = newObj->ice_twoway();
- subscriber = new SubscriberTwoway(instance, proxy, newObj);
+ subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
}
- else if(reliability == "twoway ordered")
+ else if(newObj->ice_isOneway() || newObj->ice_isDatagram() ||
+ newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
{
- newObj = newObj->ice_twoway();
- subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
+ subscriber = new SubscriberOneway(instance, proxy, newObj);
}
- else // reliability == "oneway"
+ else if(newObj->ice_isTwoway())
{
- if(reliability != "oneway" && traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << reliability <<" mode not understood.";
- }
- if(!newObj->ice_isDatagram())
- {
- newObj = newObj->ice_oneway();
- }
- subscriber = new SubscriberOneway(instance, proxy, newObj, false);
+ subscriber = new SubscriberTwoway(instance, proxy, newObj);
}
per->setSubscriber(subscriber);
}