summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp504
1 files changed, 252 insertions, 252 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 9174eed5949..3162116566a 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -37,7 +37,7 @@ class PerSubscriberPublisherI : public Ice::BlobjectArray
public:
PerSubscriberPublisherI(const InstancePtr& instance) :
- _instance(instance)
+ _instance(instance)
{
}
@@ -48,36 +48,36 @@ public:
void
setSubscriber(const SubscriberPtr& subscriber)
{
- _subscriber = subscriber;
+ _subscriber = subscriber;
}
virtual bool
ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
- vector<Ice::Byte>&,
- const Ice::Current& current)
+ vector<Ice::Byte>&,
+ const Ice::Current& current)
{
- EventDataPtr event = new EventData(
- current.operation,
- current.mode,
- Ice::ByteSeq(),
- current.ctx);
+ EventDataPtr event = new EventData(
+ current.operation,
+ current.mode,
+ Ice::ByteSeq(),
+ current.ctx);
- //
- // COMPILERBUG: gcc 4.0.1 doesn't like this.
- //
- //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
- Ice::ByteSeq data(inParams.first, inParams.second);
- event->data.swap(data);
+ //
+ // COMPILERBUG: gcc 4.0.1 doesn't like this.
+ //
+ //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ Ice::ByteSeq data(inParams.first, inParams.second);
+ event->data.swap(data);
- EventDataSeq e;
- e.push_back(event);
- Subscriber::QueueState state = _subscriber->queue(false, e);
+ EventDataSeq e;
+ e.push_back(event);
+ Subscriber::QueueState state = _subscriber->queue(false, e);
- if(state == Subscriber::QueueStateFlush)
- {
- _instance->subscriberPool()->flush(_subscriber);
- }
- return true;
+ if(state == Subscriber::QueueStateFlush)
+ {
+ _instance->subscriberPool()->flush(_subscriber);
+ }
+ return true;
}
private:
@@ -188,7 +188,7 @@ SubscriberOneway::SubscriberOneway(
if(_batch)
{
- _instance->batchFlusher()->add(_obj);
+ _instance->batchFlusher()->add(_obj);
}
}
@@ -202,66 +202,66 @@ SubscriberOneway::flush()
//
if(_state == SubscriberStateError)
{
- return false;
+ return false;
}
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
try
{
- //
- // Get the current set of events, but release the lock before
- // attempting to deliver the events. This allows other threads
- // to add events in case we block (such as during connection
- // establishment).
- //
- EventDataSeq v;
- v.swap(_events);
- sync.release();
-
- //
- // Deliver the events without holding the lock.
- //
- // If there are more than one event queued and we are not in
- // batch sending mode then send the events as a batch and then
- // flush immediately, otherwise send one at a time.
- //
- vector<Ice::Byte> dummy;
- if(v.size() > 1 && !_batch)
- {
- for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
- {
- _objBatch->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
- }
- Ice::ConnectionPtr conn = _objBatch->ice_getCachedConnection();
- assert(conn);
- conn->flushBatchRequests();
- }
- else
- {
- for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
- {
- _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
- }
- }
-
- //
- // Reacquire the lock before we check the queue again.
- //
- sync.acquire();
+ //
+ // Get the current set of events, but release the lock before
+ // attempting to deliver the events. This allows other threads
+ // to add events in case we block (such as during connection
+ // establishment).
+ //
+ EventDataSeq v;
+ v.swap(_events);
+ sync.release();
+
+ //
+ // Deliver the events without holding the lock.
+ //
+ // If there are more than one event queued and we are not in
+ // batch sending mode then send the events as a batch and then
+ // flush immediately, otherwise send one at a time.
+ //
+ vector<Ice::Byte> dummy;
+ if(v.size() > 1 && !_batch)
+ {
+ for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ _objBatch->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
+ }
+ Ice::ConnectionPtr conn = _objBatch->ice_getCachedConnection();
+ assert(conn);
+ conn->flushBatchRequests();
+ }
+ else
+ {
+ for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
+ }
+ }
+
+ //
+ // Reacquire the lock before we check the queue again.
+ //
+ sync.acquire();
}
catch(const Ice::LocalException& ex)
{
- assert(!sync.acquired());
- // error will re-acquire and release the lock.
- error(ex);
- return false;
+ assert(!sync.acquired());
+ // error will re-acquire and release the lock.
+ error(ex);
+ return false;
}
if(!_events.empty())
{
- assert(_state == SubscriberStateFlushPending);
- return true;
+ assert(_state == SubscriberStateFlushPending);
+ return true;
}
_state = SubscriberStateOnline;
return false;
@@ -272,7 +272,7 @@ SubscriberOneway::destroy()
{
if(_batch)
{
- _instance->batchFlusher()->remove(_obj);
+ _instance->batchFlusher()->remove(_obj);
}
Subscriber::destroy();
}
@@ -285,7 +285,7 @@ class TwowayInvokeI : public Ice::AMI_Object_ice_invoke
public:
TwowayInvokeI(const SubscriberPtr& subscriber) :
- _subscriber(subscriber)
+ _subscriber(subscriber)
{
}
@@ -297,7 +297,7 @@ public:
virtual void
ice_exception(const Ice::Exception& e)
{
- _subscriber->error(e);
+ _subscriber->error(e);
}
private:
@@ -326,7 +326,7 @@ SubscriberTwoway::flush()
//
if(_state == SubscriberStateError)
{
- return false;
+ return false;
}
assert(_state == SubscriberStateFlushPending);
assert(!_events.empty());
@@ -346,7 +346,7 @@ SubscriberTwoway::flush()
//
for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
{
- _obj->ice_invoke_async(new TwowayInvokeI(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context);
+ _obj->ice_invoke_async(new TwowayInvokeI(this), (*p)->op, (*p)->mode, (*p)->data, (*p)->context);
}
//
@@ -360,8 +360,8 @@ SubscriberTwoway::flush()
//
if(!_events.empty())
{
- assert(_state == SubscriberStateFlushPending);
- return true;
+ assert(_state == SubscriberStateFlushPending);
+ return true;
}
_state = SubscriberStateOnline;
return false;
@@ -375,20 +375,20 @@ class TwowayOrderedInvokeI : public Ice::AMI_Object_ice_invoke
public:
TwowayOrderedInvokeI(const SubscriberTwowayOrderedPtr& subscriber) :
- _subscriber(subscriber)
+ _subscriber(subscriber)
{
}
virtual void
ice_response(bool, const std::vector<Ice::Byte>&)
{
- _subscriber->response();
+ _subscriber->response();
}
virtual void
ice_exception(const Ice::Exception& ex)
{
- _subscriber->error(ex);
+ _subscriber->error(ex);
}
private:
@@ -412,20 +412,20 @@ SubscriberTwowayOrdered::flush()
{
EventDataPtr e;
{
- IceUtil::Mutex::Lock sync(_mutex);
-
- //
- // If the subscriber errored out then we're done.
- //
- if(_state == SubscriberStateError)
- {
- return false;
- }
- assert(_state == SubscriberStateFlushPending);
- assert(!_events.empty());
-
- e = _events.front();
- _events.erase(_events.begin());
+ IceUtil::Mutex::Lock sync(_mutex);
+
+ //
+ // If the subscriber errored out then we're done.
+ //
+ if(_state == SubscriberStateError)
+ {
+ return false;
+ }
+ assert(_state == SubscriberStateFlushPending);
+ assert(!_events.empty());
+
+ e = _events.front();
+ _events.erase(_events.begin());
}
_obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context);
@@ -441,8 +441,8 @@ SubscriberTwowayOrdered::response()
assert(_state != SubscriberStateError);
if(_events.empty())
{
- _state = SubscriberStateOnline;
- return;
+ _state = SubscriberStateOnline;
+ return;
}
_instance->subscriberPool()->flush(this);
}
@@ -455,31 +455,31 @@ class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward
public:
Topiclink_forwardI(const SubscriberLinkPtr& subscriber) :
- _subscriber(subscriber)
+ _subscriber(subscriber)
{
}
virtual void
ice_response()
{
- _subscriber->response();
+ _subscriber->response();
}
virtual void
ice_exception(const Ice::Exception& ex)
{
- try
- {
- ex.ice_throw();
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- _subscriber->error(ex);
- }
- catch(const Ice::LocalException& ex)
- {
- _subscriber->offline(ex);
- }
+ try
+ {
+ ex.ice_throw();
+ }
+ catch(const Ice::ObjectNotExistException& ex)
+ {
+ _subscriber->error(ex);
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _subscriber->offline(ex);
+ }
}
private:
@@ -505,7 +505,7 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events)
{
if(forwarded)
{
- return QueueStateNoFlush;
+ return QueueStateNoFlush;
}
//
@@ -518,7 +518,7 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events)
if(_state == SubscriberStateError)
{
- return QueueStateError;
+ return QueueStateError;
}
//
@@ -527,53 +527,53 @@ SubscriberLink::queue(bool forwarded, const EventDataSeq& events)
//
if(_state == SubscriberStateOffline)
{
- //
- // If there are alot of subscribers offline then we will call
- // Time::now() alot, which could be costly. This could be
- // optimized to only one per event-batch by making the
- // forwarded argument an EventInfo thing where the queue-time
- // is lazy initialized.
- //
- if(IceUtil::Time::now() < _next)
- {
- return QueueStateNoFlush;
- }
-
- //
- // State transition to online.
- //
- _state = SubscriberStateOnline;
+ //
+ // If there are alot of subscribers offline then we will call
+ // Time::now() alot, which could be costly. This could be
+ // optimized to only one per event-batch by making the
+ // forwarded argument an EventInfo thing where the queue-time
+ // is lazy initialized.
+ //
+ if(IceUtil::Time::now() < _next)
+ {
+ return QueueStateNoFlush;
+ }
+
+ //
+ // State transition to online.
+ //
+ _state = SubscriberStateOnline;
}
int queued = 0;
for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
{
- if(_cost != 0)
- {
- //
- // Note that we could calculate this cost once and cache
- // it in a private form of the event to avoid this if this
- // really is a performance problem (this could use the
- // EventInfo thing discussed above).
- //
- int cost = 0;
- Ice::Context::const_iterator q = (*p)->context.find("cost");
- if(q != (*p)->context.end())
- {
- cost = atoi(q->second.c_str());
- }
- if(cost > _cost)
- {
- continue;
- }
- }
- ++queued;
- _events.push_back(*p);
+ if(_cost != 0)
+ {
+ //
+ // Note that we could calculate this cost once and cache
+ // it in a private form of the event to avoid this if this
+ // really is a performance problem (this could use the
+ // EventInfo thing discussed above).
+ //
+ int cost = 0;
+ Ice::Context::const_iterator q = (*p)->context.find("cost");
+ if(q != (*p)->context.end())
+ {
+ cost = atoi(q->second.c_str());
+ }
+ if(cost > _cost)
+ {
+ continue;
+ }
+ }
+ ++queued;
+ _events.push_back(*p);
}
if(_state == SubscriberStateFlushPending || queued == 0)
{
- return QueueStateNoFlush;
+ return QueueStateNoFlush;
}
_state = SubscriberStateFlushPending;
return QueueStateFlush;
@@ -584,20 +584,20 @@ SubscriberLink::flush()
{
EventDataSeq v;
{
- IceUtil::Mutex::Lock sync(_mutex);
-
- //
- // If the subscriber errored out then we're done.
- //
- if(_state == SubscriberStateError)
- {
- return false;
- }
+ IceUtil::Mutex::Lock sync(_mutex);
+
+ //
+ // If the subscriber errored out then we're done.
+ //
+ if(_state == SubscriberStateError)
+ {
+ return false;
+ }
- assert(_state == SubscriberStateFlushPending);
- assert(!_events.empty());
-
- v.swap(_events);
+ assert(_state == SubscriberStateFlushPending);
+ assert(!_events.empty());
+
+ v.swap(_events);
}
_obj->forward_async(new Topiclink_forwardI(this), v);
@@ -622,8 +622,8 @@ SubscriberLink::response()
//
if(_events.empty())
{
- _state = SubscriberStateOnline;
- return;
+ _state = SubscriberStateOnline;
+ return;
}
_instance->subscriberPool()->flush(this);
}
@@ -639,18 +639,18 @@ SubscriberLink::offline(const Ice::Exception& e)
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(_warn)
{
- Ice::Warning warn(traceLevels->logger);
- warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id)
- << ": link offline: " << e;
+ Ice::Warning warn(traceLevels->logger);
+ warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id)
+ << ": link offline: " << e;
}
else
{
- if(traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << _instance->communicator()->identityToString(_id) << ": link offline: " << e
- << " discarding events: " << _instance->discardInterval() << "s";
- }
+ if(traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << _instance->communicator()->identityToString(_id) << ": link offline: " << e
+ << " discarding events: " << _instance->discardInterval() << "s";
+ }
}
_state = SubscriberStateOffline;
@@ -675,57 +675,57 @@ Subscriber::create(
try
{
- string reliability;
- QoS::const_iterator p = qos.find("reliability");
- if(p != qos.end())
- {
- reliability = p->second;
- }
- if(!reliability.empty() && reliability != "ordered")
- {
- throw BadQoS("invalid reliability: " + reliability);
- }
-
- //
- // Override the timeout.
- //
- Ice::ObjectPrx newObj;
- try
- {
- newObj = obj->ice_timeout(instance->sendTimeout());
- }
- catch(const Ice::FixedProxyException&)
- {
- //
- // In the event IceStorm is collocated this could be a
- // fixed proxy in which case its not possible to set the
- // timeout.
- //
- newObj = obj;
- }
- if(reliability == "ordered")
- {
- if(!newObj->ice_isTwoway())
- {
- throw BadQoS("ordered reliability requires a twoway proxy");
- }
- subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
- }
- else if(newObj->ice_isOneway() || newObj->ice_isDatagram() ||
- newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
- {
- subscriber = new SubscriberOneway(instance, proxy, newObj);
- }
- else if(newObj->ice_isTwoway())
- {
- subscriber = new SubscriberTwoway(instance, proxy, newObj);
- }
- per->setSubscriber(subscriber);
+ string reliability;
+ QoS::const_iterator p = qos.find("reliability");
+ if(p != qos.end())
+ {
+ reliability = p->second;
+ }
+ if(!reliability.empty() && reliability != "ordered")
+ {
+ throw BadQoS("invalid reliability: " + reliability);
+ }
+
+ //
+ // Override the timeout.
+ //
+ Ice::ObjectPrx newObj;
+ try
+ {
+ newObj = obj->ice_timeout(instance->sendTimeout());
+ }
+ catch(const Ice::FixedProxyException&)
+ {
+ //
+ // In the event IceStorm is collocated this could be a
+ // fixed proxy in which case its not possible to set the
+ // timeout.
+ //
+ newObj = obj;
+ }
+ if(reliability == "ordered")
+ {
+ if(!newObj->ice_isTwoway())
+ {
+ throw BadQoS("ordered reliability requires a twoway proxy");
+ }
+ subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
+ }
+ else if(newObj->ice_isOneway() || newObj->ice_isDatagram() ||
+ newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
+ {
+ subscriber = new SubscriberOneway(instance, proxy, newObj);
+ }
+ else if(newObj->ice_isTwoway())
+ {
+ subscriber = new SubscriberTwoway(instance, proxy, newObj);
+ }
+ per->setSubscriber(subscriber);
}
catch(const Ice::Exception&)
{
- instance->objectAdapter()->remove(proxy->ice_getIdentity());
- throw;
+ instance->objectAdapter()->remove(proxy->ice_getIdentity());
+ throw;
}
return subscriber;
@@ -738,9 +738,9 @@ Subscriber::create(
int cost)
{
return new SubscriberLink(
- instance,
- TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())),
- cost);
+ instance,
+ TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())),
+ cost);
}
Subscriber::~Subscriber()
@@ -772,13 +772,13 @@ Subscriber::queue(bool, const EventDataSeq& events)
if(_state == SubscriberStateError)
{
- return QueueStateError;
+ return QueueStateError;
}
copy(events.begin(), events.end(), back_inserter(_events));
if(_state == SubscriberStateFlushPending)
{
- return QueueStateNoFlush;
+ return QueueStateNoFlush;
}
_state = SubscriberStateFlushPending;
@@ -793,18 +793,18 @@ Subscriber::destroy()
//
if(_proxy)
{
- try
- {
- _instance->objectAdapter()->remove(_proxy->ice_getIdentity());
- }
- catch(const Ice::NotRegisteredException&)
- {
- // Ignore
- }
- catch(const Ice::ObjectAdapterDeactivatedException&)
- {
- // Ignore
- }
+ try
+ {
+ _instance->objectAdapter()->remove(_proxy->ice_getIdentity());
+ }
+ catch(const Ice::NotRegisteredException&)
+ {
+ // Ignore
+ }
+ catch(const Ice::ObjectAdapterDeactivatedException&)
+ {
+ // Ignore
+ }
}
}
@@ -813,9 +813,9 @@ Subscriber::flushTime(const IceUtil::Time& interval)
{
if(_resetMax || interval > _maxSend)
{
- assert(interval != IceUtil::Time());
- _resetMax = false;
- _maxSend = interval;
+ assert(interval != IceUtil::Time());
+ _resetMax = false;
+ _maxSend = interval;
}
}
@@ -833,15 +833,15 @@ Subscriber::error(const Ice::Exception& e)
IceUtil::Mutex::Lock sync(_mutex);
if(_state != SubscriberStateError)
{
- _state = SubscriberStateError;
- _events.clear();
-
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(traceLevels->subscriber > 0)
- {
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << _instance->communicator()->identityToString(_id) << ": topic publish failed: " << e;
- }
+ _state = SubscriberStateError;
+ _events.clear();
+
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << _instance->communicator()->identityToString(_id) << ": topic publish failed: " << e;
+ }
}
}