summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp1128
1 files changed, 558 insertions, 570 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 546f98b624c..0b7338c321f 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -10,14 +10,9 @@
#include <IceStorm/Subscriber.h>
#include <IceStorm/Instance.h>
#include <IceStorm/TraceLevels.h>
-#include <IceStorm/BatchFlusher.h>
-#include <IceStorm/SubscriberPool.h>
+#include <IceStorm/NodeI.h>
-#include <Ice/ObjectAdapter.h>
#include <Ice/LoggerUtil.h>
-#include <Ice/Communicator.h>
-#include <Ice/LocalException.h>
-#include <Ice/Connection.h>
#ifdef __BCPLUSPLUS__
#include <iterator>
@@ -25,6 +20,7 @@
using namespace std;
using namespace IceStorm;
+using namespace IceStormElection;
//
// Per Subscriber object.
@@ -56,6 +52,9 @@ public:
vector<Ice::Byte>&,
const Ice::Current& current)
{
+ // Use cached reads.
+ CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
+
EventDataPtr event = new EventData(
current.operation,
current.mode,
@@ -71,12 +70,7 @@ public:
EventDataSeq e;
e.push_back(event);
- Subscriber::QueueState state = _subscriber->queue(false, e);
-
- if(state == Subscriber::QueueStateFlush)
- {
- _instance->subscriberPool()->flush(_subscriber);
- }
+ _subscriber->queue(false, e);
return true;
}
@@ -88,441 +82,368 @@ private:
typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr;
}
+
// Each of the various Subscriber types.
namespace
{
-class SubscriberOneway : public Subscriber
+class SubscriberBatch : public Subscriber
{
public:
- SubscriberOneway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
- //
- // Oneway
- //
- virtual bool flush();
- virtual void destroy();
+ SubscriberBatch(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, const Ice::ObjectPrx&);
+ ~SubscriberBatch();
+
+ virtual void flush();
+
+ void doFlush();
private:
- const bool _batch;
const Ice::ObjectPrx _obj;
- /*const*/ Ice::ObjectPrx _objBatch;
+ const IceUtil::Time _interval;
};
+typedef IceUtil::Handle<SubscriberBatch> SubscriberBatchPtr;
-class SubscriberTwoway : public Subscriber
+class SubscriberOneway : public Subscriber
{
public:
- SubscriberTwoway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
+ SubscriberOneway(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, const Ice::ObjectPrx&);
+ ~SubscriberOneway();
- virtual bool flush();
- void response();
+ virtual void flush();
+
+ void sent();
private:
const Ice::ObjectPrx _obj;
- const int _maxOutstanding;
- int _outstanding;
};
-typedef IceUtil::Handle<SubscriberTwoway> SubscriberTwowayPtr;
+typedef IceUtil::Handle<SubscriberOneway> SubscriberOnewayPtr;
-//
-// Twoway Ordered
-//
-class SubscriberTwowayOrdered : public Subscriber
+class SubscriberTwoway : public Subscriber
{
public:
- SubscriberTwowayOrdered(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&);
+ SubscriberTwoway(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, int,
+ const Ice::ObjectPrx&);
- virtual bool flush();
- void response();
+ virtual void flush();
private:
const Ice::ObjectPrx _obj;
};
-typedef IceUtil::Handle<SubscriberTwowayOrdered> SubscriberTwowayOrderedPtr;
class SubscriberLink : public Subscriber
{
public:
- SubscriberLink(const InstancePtr&, const TopicLinkPrx&, int);
+ SubscriberLink(const InstancePtr&, const SubscriberRecord&);
- virtual QueueState queue(bool, const std::vector<EventDataPtr>&);
- virtual bool flush();
- void response();
-
- void offline(const Ice::Exception&);
+ virtual void flush();
private:
const TopicLinkPrx _obj;
- const int _cost;
-
- // The next to try sending a new event if we're offline.
- IceUtil::Time _next;
- bool _warn;
};
-typedef IceUtil::Handle<SubscriberLink> SubscriberLinkPtr;
-
-}
-SubscriberOneway::SubscriberOneway(
- const InstancePtr& instance,
- const Ice::ObjectPrx& proxy,
- const Ice::ObjectPrx& obj) :
- Subscriber(instance, proxy, false, obj->ice_getIdentity()),
- _batch(obj->ice_isBatchDatagram() || obj->ice_isBatchOneway()),
- _obj(obj)
+class ResponseTimerTask : public IceUtil::TimerTask
{
- //
- // COMPILERFIX: Initialized this way for Borland to compile.
- //
- if(obj->ice_isDatagram())
+public:
+ ResponseTimerTask(const SubscriberPtr& subscriber) :
+ _subscriber(subscriber)
{
- _objBatch = obj->ice_batchDatagram();
}
- else
+
+ virtual void
+ runTimerTask()
{
- _objBatch = obj->ice_batchOneway();
+ _subscriber->flush();
}
- if(_batch)
+private:
+
+ const SubscriberPtr _subscriber;
+};
+
+class OnewayIceInvokeI : public Ice::AMI_Object_ice_invoke
+{
+public:
+
+ OnewayIceInvokeI(const SubscriberOnewayPtr& subscriber) :
+ _subscriber(subscriber)
{
- _instance->batchFlusher()->add(_obj);
}
-}
-bool
-SubscriberOneway::flush()
-{
- IceUtil::Mutex::Lock sync(_mutex);
-
- //
- // If the subscriber errored out then we're done.
- //
- if(_state == SubscriberStateError)
+ virtual void
+ ice_response(bool, const std::vector<Ice::Byte>&)
{
- return false;
+ assert(false);
}
- assert(_state == SubscriberStateFlushPending);
- assert(!_events.empty());
- try
+ virtual void __sent(Ice::ConnectionI* c)
{
- //
- // Get the current set of events, but release the lock before
- // attempting to deliver the events. This allows other threads
- // to add events in case we block (such as during connection
- // establishment).
- //
- EventDataSeq v;
- v.swap(_events);
- sync.release();
+ AMI_Object_ice_invoke::__sent(c);
+ _subscriber->sent();
+ }
- //
- // Deliver the events without holding the lock.
- //
- // If there are more than one event queued and we are not in
- // batch sending mode then send the events as a batch and then
- // flush immediately, otherwise send one at a time.
- //
- vector<Ice::Byte> dummy;
- if(v.size() > 1 && !_batch)
- {
- Ice::ConnectionPtr conn = _objBatch->ice_getConnection();
- for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
- {
- _objBatch->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
- }
- conn->flushBatchRequests();
- }
- else
- {
- for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
- {
- _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
- }
- }
-
- //
- // Reacquire the lock before we check the queue again.
- //
- sync.acquire();
+ virtual void
+ ice_exception(const Ice::Exception& e)
+ {
+ _subscriber->error(true, e);
}
- catch(const Ice::LocalException& ex)
+
+private:
+
+ const SubscriberOnewayPtr _subscriber;
+};
+
+class IceInvokeI : public Ice::AMI_Object_ice_invoke
+{
+public:
+
+ IceInvokeI(const SubscriberPtr& subscriber) :
+ _subscriber(subscriber)
{
- assert(!sync.acquired());
- // error will re-acquire and release the lock.
- error(ex);
- return false;
}
- if(!_events.empty())
+ virtual void
+ ice_response(bool, const std::vector<Ice::Byte>&)
{
- assert(_state == SubscriberStateFlushPending);
- return true;
+ _subscriber->response();
}
- _state = SubscriberStateOnline;
- return false;
-}
-void
-SubscriberOneway::destroy()
-{
- if(_batch)
+ virtual void
+ ice_exception(const Ice::Exception& e)
{
- _instance->batchFlusher()->remove(_obj);
+ _subscriber->error(true, e);
}
- Subscriber::destroy();
-}
-namespace
-{
+private:
-class TwowayInvokeI : public Ice::AMI_Object_ice_invoke
+ const SubscriberPtr _subscriber;
+};
+
+class FlushBatchI : public Ice::AMI_Object_ice_flushBatchRequests
{
public:
- TwowayInvokeI(const SubscriberTwowayPtr& subscriber) :
+ FlushBatchI(const SubscriberPtr& subscriber) :
_subscriber(subscriber)
{
}
virtual void
- ice_response(bool, const std::vector<Ice::Byte>&)
+ ice_exception(const Ice::Exception& e)
+ {
+ _subscriber->error(false, e);
+ }
+
+private:
+
+ const SubscriberPtr _subscriber;
+};
+
+class FlushTimerTask : public IceUtil::TimerTask
+{
+public:
+
+ FlushTimerTask(const SubscriberBatchPtr& subscriber) :
+ _subscriber(subscriber)
{
- _subscriber->response();
}
virtual void
- ice_exception(const Ice::Exception& e)
+ runTimerTask()
{
- _subscriber->error(e);
+ _subscriber->doFlush();
}
private:
- const SubscriberTwowayPtr _subscriber;
+ const SubscriberBatchPtr _subscriber;
};
}
-SubscriberTwoway::SubscriberTwoway(
+SubscriberBatch::SubscriberBatch(
const InstancePtr& instance,
+ const SubscriberRecord& rec,
const Ice::ObjectPrx& proxy,
+ int retryCount,
const Ice::ObjectPrx& obj) :
- Subscriber(instance, proxy, false, obj->ice_getIdentity()),
+ Subscriber(instance, rec, proxy, retryCount, 1),
_obj(obj),
- _maxOutstanding(10),
- _outstanding(0)
+ _interval(instance->flushInterval())
{
+ assert(retryCount == 0);
}
-bool
-SubscriberTwoway::flush()
+SubscriberBatch::~SubscriberBatch()
{
- EventDataPtr e;
- {
- IceUtil::Mutex::Lock sync(_mutex);
-
- //
- // If the subscriber errored out then we're done.
- //
- if(_state == SubscriberStateError)
- {
- return false;
- }
- assert(_state == SubscriberStateFlushPending);
- assert(!_events.empty());
+}
- //
- // If there are more than _maxOutstanding unanswered AMI
- // events we're also done. In this case the response to the
- // pending AMI requests will trigger another event to be sent.
- //
- if(_outstanding >= _maxOutstanding)
- {
- _state = SubscriberStateSending;
- return false;
- }
-
- //
- // Dequeue the head event, count one more outstanding AMI
- // request.
- //
- e = _events.front();
- _events.erase(_events.begin());
- _state = SubscriberStateSending;
+void
+SubscriberBatch::flush()
+{
+ if(_outstanding == 0)
+ {
++_outstanding;
+ _instance->batchFlusher()->schedule(new FlushTimerTask(this), _interval);
}
+}
- _obj->ice_invoke_async(new TwowayInvokeI(this), e->op, e->mode, e->data, e->context);
-
- //
- // We process the subscriber state after the event send and not
- // before to prevent the subscriber from being requeued
- // concurrently.
- //
+void
+SubscriberBatch::doFlush()
+{
+ EventDataSeq v;
{
- IceUtil::Mutex::Lock sync(_mutex);
- //
- // If the subscriber has already been requeued for a flush or
- // the subscriber errored out then we're done.
- //
- if(_state == SubscriberStateFlushPending || _state == SubscriberStateError)
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ --_outstanding;
+ assert(_outstanding == 0);
+
+ if(_shutdown)
{
- return false;
+ _lock.notify();
}
+ v.swap(_events);
+ assert(!v.empty());
+ }
- //
- // If there are no events left in the queue transition back to
- // the online state, and return false to indicate to the
- // worker not to requeue.
- //
- if(_events.empty())
- {
- _state = SubscriberStateOnline;
- return false;
- }
+ vector<Ice::Byte> dummy;
+ for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
+ {
+ _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
+ }
- //
- // We must still be in sending state.
- //
- assert(_state == SubscriberStateSending);
+ _obj->ice_flushBatchRequests_async(new FlushBatchI(this));
- //
- // If we're below the outstanding limit then requeue,
- // otherwise the response callback will do so.
- //
- if(_outstanding < _maxOutstanding)
- {
- _state = SubscriberStateFlushPending;
- }
+ // This is significantly faster than the async version, but it can
+ // block the calling thread. Bad news!
- return _state == SubscriberStateFlushPending;
- }
+ //_obj->ice_flushBatchRequests();
}
-void
-SubscriberTwoway::response()
+SubscriberOneway::SubscriberOneway(
+ const InstancePtr& instance,
+ const SubscriberRecord& rec,
+ const Ice::ObjectPrx& proxy,
+ int retryCount,
+ const Ice::ObjectPrx& obj) :
+ Subscriber(instance, rec, proxy, retryCount, 5),
+ _obj(obj)
{
- IceUtil::Mutex::Lock sync(_mutex);
+ assert(retryCount == 0);
+}
- --_outstanding;
+SubscriberOneway::~SubscriberOneway()
+{
+}
+void
+SubscriberOneway::flush()
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
//
- // Note that its possible for the _state to be error if there are
- // mutliple threads in the client side thread pool and response
- // and exception are called out of order.
+ // If the subscriber isn't online we're done.
//
- assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
+ if(_state != SubscriberStateOnline || _events.empty())
+ {
+ return;
+ }
- //
- // Unless we're in the sending state we do nothing.
- //
- if(_state == SubscriberStateSending)
+ // Send up to _maxOutstanding pending events.
+ while(_outstanding < _maxOutstanding && !_events.empty())
{
//
- // If there are no more events then we transition back to
- // online.
+ // Dequeue the head event, count one more outstanding AMI
+ // request.
//
- if(_events.empty())
+ EventDataPtr e = _events.front();
+ _events.erase(_events.begin());
+ ++_outstanding;
+
+ try
{
- _state = SubscriberStateOnline;
+ _obj->ice_invoke_async(new OnewayIceInvokeI(this), e->op, e->mode, e->data, e->context);
}
- //
- // Otherwise we re-add for a flush.
- //
- else
+ catch(const Ice::Exception& ex)
{
- _state = SubscriberStateFlushPending;
- _instance->subscriberPool()->flush(this);
+ error(true, ex);
+ return;
}
}
}
-namespace
-{
-
-class TwowayOrderedInvokeI : public Ice::AMI_Object_ice_invoke
+void
+SubscriberOneway::sent()
{
-public:
-
- TwowayOrderedInvokeI(const SubscriberTwowayOrderedPtr& subscriber) :
- _subscriber(subscriber)
- {
- }
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ // Decrement the _outstanding count.
+ --_outstanding;
+ assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
- virtual void
- ice_response(bool, const std::vector<Ice::Byte>&)
+ if(_events.empty() && _outstanding == 0 && _shutdown)
{
- _subscriber->response();
+ _lock.notify();
}
-
- virtual void
- ice_exception(const Ice::Exception& ex)
+ else if(_outstanding == 0 && !_events.empty())
{
- _subscriber->error(ex);
+ _instance->batchFlusher()->schedule(new ResponseTimerTask(this), IceUtil::Time::seconds(0));
}
-
-private:
-
- const SubscriberTwowayOrderedPtr _subscriber;
-};
-
}
-SubscriberTwowayOrdered::SubscriberTwowayOrdered(
+SubscriberTwoway::SubscriberTwoway(
const InstancePtr& instance,
+ const SubscriberRecord& rec,
const Ice::ObjectPrx& proxy,
+ int retryCount,
+ int maxOutstanding,
const Ice::ObjectPrx& obj) :
- Subscriber(instance, proxy, false, obj->ice_getIdentity()),
+ Subscriber(instance, rec, proxy, retryCount, maxOutstanding),
_obj(obj)
{
}
-bool
-SubscriberTwowayOrdered::flush()
+void
+SubscriberTwoway::flush()
{
- EventDataPtr e;
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ //
+ // If the subscriber isn't online we're done.
+ //
+ if(_state != SubscriberStateOnline || _events.empty())
+ {
+ return;
+ }
+
+ // Send up to _maxOutstanding pending events.
+ while(_outstanding < _maxOutstanding && !_events.empty())
{
- IceUtil::Mutex::Lock sync(_mutex);
-
//
- // If the subscriber errored out then we're done.
+ // Dequeue the head event, count one more outstanding AMI
+ // request.
//
- if(_state == SubscriberStateError)
+ EventDataPtr e = _events.front();
+ _events.erase(_events.begin());
+ ++_outstanding;
+
+ try
{
- return false;
+ _obj->ice_invoke_async(new IceInvokeI(this), e->op, e->mode, e->data, e->context);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ error(true, ex);
+ return;
}
- assert(_state == SubscriberStateFlushPending);
- assert(!_events.empty());
-
- e = _events.front();
- _events.erase(_events.begin());
- }
-
- _obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context);
-
- return false;
-}
-
-void
-SubscriberTwowayOrdered::response()
-{
- IceUtil::Mutex::Lock sync(_mutex);
-
- assert(_state != SubscriberStateError);
- if(_events.empty())
- {
- _state = SubscriberStateOnline;
- return;
}
- _instance->subscriberPool()->flush(this);
}
namespace
@@ -532,7 +453,7 @@ class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward
{
public:
- Topiclink_forwardI(const SubscriberLinkPtr& subscriber) :
+ Topiclink_forwardI(const SubscriberPtr& subscriber) :
_subscriber(subscriber)
{
}
@@ -544,328 +465,266 @@ public:
}
virtual void
- ice_exception(const Ice::Exception& ex)
+ ice_exception(const Ice::Exception& e)
{
- try
- {
- ex.ice_throw();
- }
- catch(const Ice::ObjectNotExistException& ex)
- {
- _subscriber->error(ex);
- }
- catch(const Ice::LocalException& ex)
- {
- _subscriber->offline(ex);
- }
+ _subscriber->error(true, e);
}
private:
- const SubscriberLinkPtr _subscriber;
+ const SubscriberPtr _subscriber;
};
}
SubscriberLink::SubscriberLink(
const InstancePtr& instance,
- const TopicLinkPrx& obj,
- int cost) :
- Subscriber(instance, 0, true, obj->ice_getIdentity()),
- _obj(TopicLinkPrx::uncheckedCast(obj->ice_collocationOptimized(false))),
- _cost(cost),
- _warn(true)
+ const SubscriberRecord& rec) :
+ Subscriber(instance, rec, 0, -1, 1),
+ _obj(TopicLinkPrx::uncheckedCast(rec.obj->ice_collocationOptimized(false)->ice_timeout(instance->sendTimeout())))
{
}
-Subscriber::QueueState
-SubscriberLink::queue(bool forwarded, const EventDataSeq& events)
+void
+SubscriberLink::flush()
{
- if(forwarded)
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ if(_state != SubscriberStateOnline || _outstanding > 0)
{
- return QueueStateNoFlush;
+ return;
}
- //
- // Don't propagate a message that has already been forwarded.
- // Also, if this link has a non-zero cost, then don't propagate a
- // message whose cost exceeds the link cost.
- //
-
- IceUtil::Mutex::Lock sync(_mutex);
-
- if(_state == SubscriberStateError)
- {
- return QueueStateError;
- }
+ EventDataSeq v;
+ v.swap(_events);
- //
- // If the proxy is offline and its time to send another event then
- // put us into retry state.
- //
- if(_state == SubscriberStateOffline)
+ EventDataSeq::iterator p = v.begin();
+ while(p != v.end())
{
- //
- // If there are alot of subscribers offline then we will call
- // Time::now() alot, which could be costly. This could be
- // optimized to only one per event-batch by making the
- // forwarded argument an EventInfo thing where the queue-time
- // is lazy initialized.
- //
- if(IceUtil::Time::now(IceUtil::Time::Monotonic) < _next)
+ if(_rec.cost != 0)
{
- return QueueStateNoFlush;
- }
-
- //
- // State transition to online.
- //
- _state = SubscriberStateOnline;
- }
-
- int queued = 0;
- for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
- {
- if(_cost != 0)
- {
- //
- // Note that we could calculate this cost once and cache
- // it in a private form of the event to avoid this if this
- // really is a performance problem (this could use the
- // EventInfo thing discussed above).
- //
int cost = 0;
Ice::Context::const_iterator q = (*p)->context.find("cost");
if(q != (*p)->context.end())
{
cost = atoi(q->second.c_str());
}
- if(cost > _cost)
+ if(cost > _rec.cost)
{
+ p = v.erase(p);
continue;
}
}
- ++queued;
- _events.push_back(*p);
+ ++p;
}
- if(_state == SubscriberStateFlushPending || queued == 0)
+ if(!v.empty())
{
- return QueueStateNoFlush;
- }
- _state = SubscriberStateFlushPending;
- return QueueStateFlush;
-}
-
-bool
-SubscriberLink::flush()
-{
- EventDataSeq v;
- {
- IceUtil::Mutex::Lock sync(_mutex);
-
- //
- // If the subscriber errored out then we're done.
- //
- if(_state == SubscriberStateError)
+ try
{
- return false;
+ ++_outstanding;
+ _obj->forward_async(new Topiclink_forwardI(this), v);
}
-
- assert(_state == SubscriberStateFlushPending);
- assert(!_events.empty());
-
- v.swap(_events);
- }
-
- _obj->forward_async(new Topiclink_forwardI(this), v);
- return false;
-}
-
-void
-SubscriberLink::response()
-{
- IceUtil::Mutex::Lock sync(_mutex);
-
- assert(_state != SubscriberStateError);
-
- //
- // A successful response means we're no longer retrying, we're
- // back active.
- //
- _warn = true;
-
- //
- // No more events, no need to requeue this subscriber.
- //
- if(_events.empty())
- {
- _state = SubscriberStateOnline;
- return;
- }
- _instance->subscriberPool()->flush(this);
-}
-
-void
-SubscriberLink::offline(const Ice::Exception& e)
-{
- IceUtil::Mutex::Lock sync(_mutex);
- assert(_state != SubscriberStateOffline);
-
- _next = IceUtil::Time::now(IceUtil::Time::Monotonic) + _instance->discardInterval();
-
- TraceLevelsPtr traceLevels = _instance->traceLevels();
- if(_warn)
- {
- Ice::Warning warn(traceLevels->logger);
- warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id)
- << ": link offline: " << e;
- }
- else
- {
- if(traceLevels->subscriber > 0)
+ catch(const Ice::Exception& ex)
{
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << _instance->communicator()->identityToString(_id) << ": link offline: " << e
- << " discarding events: " << _instance->discardInterval() << "s";
+ error(true, ex);
}
}
-
- _state = SubscriberStateOffline;
- _warn = false;
-
- //
- // Clear all queued events.
- //
- _events.clear();
}
SubscriberPtr
Subscriber::create(
const InstancePtr& instance,
- const string& topicName,
- const Ice::ObjectPrx& obj,
- const IceStorm::QoS& qos)
-{
- PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance);
- Ice::Identity perId;
- perId.category = instance->instanceName();
- perId.name = "topic." + topicName + ".publish." +
- instance->communicator()->identityToString(obj->ice_getIdentity());
- Ice::ObjectPrx proxy = instance->objectAdapter()->add(per, perId);
- TraceLevelsPtr traceLevels = instance->traceLevels();
- SubscriberPtr subscriber;
-
- try
- {
- string reliability;
- QoS::const_iterator p = qos.find("reliability");
- if(p != qos.end())
- {
- reliability = p->second;
- }
- if(!reliability.empty() && reliability != "ordered")
- {
- throw BadQoS("invalid reliability: " + reliability);
- }
+ const SubscriberRecord& rec)
+{
+ if(rec.link)
+ {
+ return new SubscriberLink(instance, rec);
+ }
+ else
+ {
+ PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance);
+ Ice::Identity perId;
+ perId.category = instance->instanceName();
+ perId.name = "topic." + rec.topicName + ".publish." +
+ instance->communicator()->identityToString(rec.obj->ice_getIdentity());
+ Ice::ObjectPrx proxy = instance->publishAdapter()->add(per, perId);
+ TraceLevelsPtr traceLevels = instance->traceLevels();
+ SubscriberPtr subscriber;
- //
- // Override the timeout.
- //
- Ice::ObjectPrx newObj;
try
{
- newObj = obj->ice_timeout(instance->sendTimeout());
- }
- catch(const Ice::FixedProxyException&)
- {
+ int retryCount = 0;
+ QoS::const_iterator p = rec.theQoS.find("retryCount");
+ if(p != rec.theQoS.end())
+ {
+ retryCount = atoi(p->second.c_str());
+ }
+
+ string reliability;
+ p = rec.theQoS.find("reliability");
+ if(p != rec.theQoS.end())
+ {
+ reliability = p->second;
+ }
+ if(!reliability.empty() && reliability != "ordered")
+ {
+ throw BadQoS("invalid reliability: " + reliability);
+ }
+
//
- // In the event IceStorm is collocated this could be a
- // fixed proxy in which case its not possible to set the
- // timeout.
+ // Override the timeout.
//
- newObj = obj;
- }
- if(reliability == "ordered")
- {
- if(!newObj->ice_isTwoway())
+ Ice::ObjectPrx newObj;
+ try
{
- throw BadQoS("ordered reliability requires a twoway proxy");
+ newObj = rec.obj->ice_timeout(instance->sendTimeout());
}
- subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj);
- }
- else if(newObj->ice_isOneway() || newObj->ice_isDatagram() ||
- newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
- {
- subscriber = new SubscriberOneway(instance, proxy, newObj);
+ catch(const Ice::FixedProxyException&)
+ {
+ //
+ // In the event IceStorm is collocated this could be a
+ // fixed proxy in which case its not possible to set the
+ // timeout.
+ //
+ newObj = rec.obj;
+ }
+ if(reliability == "ordered")
+ {
+ if(!newObj->ice_isTwoway())
+ {
+ throw BadQoS("ordered reliability requires a twoway proxy");
+ }
+ subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 1, newObj);
+ }
+ else if(newObj->ice_isOneway() || newObj->ice_isDatagram())
+ {
+ if(retryCount > 0)
+ {
+ throw BadQoS("non-zero retryCount QoS requires a twoway proxy");
+ }
+ subscriber = new SubscriberOneway(instance, rec, proxy, retryCount, newObj);
+ }
+ else if(newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
+ {
+ if(retryCount > 0)
+ {
+ throw BadQoS("non-zero retryCount QoS requires a twoway proxy");
+ }
+ subscriber = new SubscriberBatch(instance, rec, proxy, retryCount, newObj);
+ }
+ else //if(newObj->ice_isTwoway())
+ {
+ assert(newObj->ice_isTwoway());
+ subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 5, newObj);
+ }
+ per->setSubscriber(subscriber);
}
- else if(newObj->ice_isTwoway())
+ catch(const Ice::Exception&)
{
- subscriber = new SubscriberTwoway(instance, proxy, newObj);
+ instance->publishAdapter()->remove(proxy->ice_getIdentity());
+ throw;
}
- per->setSubscriber(subscriber);
- }
- catch(const Ice::Exception&)
- {
- instance->objectAdapter()->remove(proxy->ice_getIdentity());
- throw;
- }
-
- return subscriber;
-}
-SubscriberPtr
-Subscriber::create(
- const InstancePtr& instance,
- const TopicLinkPrx& link,
- int cost)
-{
- return new SubscriberLink(
- instance,
- TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())),
- cost);
+ return subscriber;
+ }
}
Subscriber::~Subscriber()
{
+ //cout << "~Subscriber" << endl;
}
Ice::ObjectPrx
Subscriber::proxy() const
{
- return _proxy;
+ return _proxyReplica;
}
Ice::Identity
Subscriber::id() const
{
- return _id;
+ return _rec.id;
}
-bool
-Subscriber::persistent() const
+SubscriberRecord
+Subscriber::record() const
{
- return _persistent;
+ return _rec;
}
-Subscriber::QueueState
-Subscriber::queue(bool, const EventDataSeq& events)
+bool
+Subscriber::queue(bool forwarded, const EventDataSeq& events)
{
- IceUtil::Mutex::Lock sync(_mutex);
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ // If this is a link subscriber if the set of events were
+ // forwarded from another IceStorm instance then do not queue the
+ // events.
+ if(forwarded && _rec.link)
+ {
+ return true;
+ }
+
+ switch(_state)
+ {
+ case SubscriberStateOffline:
+ {
+ if(IceUtil::Time::now(IceUtil::Time::Monotonic) < _next)
+ {
+ break;
+ }
+
+ //
+ // State transition to online.
+ //
+ setState(SubscriberStateOnline);
+ // fall through
+ }
+ case SubscriberStateOnline:
+ copy(events.begin(), events.end(), back_inserter(_events));
+ flush();
+ break;
+
+ case SubscriberStateError:
+ return false;
+
+ case SubscriberStateReaped:
+ break;
+ }
+
+ return true;
+}
+
+bool
+Subscriber::reap()
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ assert(_state >= SubscriberStateError);
if(_state == SubscriberStateError)
{
- return QueueStateError;
+ setState(SubscriberStateReaped);
+ return true;
}
+ return false;
+}
- copy(events.begin(), events.end(), back_inserter(_events));
- if(_state == SubscriberStateSending || _state == SubscriberStateFlushPending)
+void
+Subscriber::resetIfReaped()
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ if(_state == SubscriberStateReaped)
{
- return QueueStateNoFlush;
+ setState(SubscriberStateError);
}
+}
- _state = SubscriberStateFlushPending;
- return QueueStateFlush;
+bool
+Subscriber::errored() const
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ return _state >= SubscriberStateError;
}
void
@@ -878,7 +737,7 @@ Subscriber::destroy()
{
try
{
- _instance->objectAdapter()->remove(_proxy->ice_getIdentity());
+ _instance->publishAdapter()->remove(_proxy->ice_getIdentity());
}
catch(const Ice::NotRegisteredException&)
{
@@ -892,60 +751,189 @@ Subscriber::destroy()
}
void
-Subscriber::flushTime(const IceUtil::Time& interval)
+Subscriber::error(bool dec, const Ice::Exception& e)
{
- if(_resetMax || interval > _maxSend)
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ if(dec)
{
- //
- // If is possible for the flush interval to be zero if the
- // timer resolution is sufficiently big. See
- // http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=1739
- //
- //assert(interval != IceUtil::Time());
- _resetMax = false;
- _maxSend = interval;
+ // Decrement the _outstanding count.
+ --_outstanding;
+ assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
}
-}
-IceUtil::Time
-Subscriber::pollMaxFlushTime(const IceUtil::Time& now)
-{
- // The next call to flushTime can reset the max time.
- _resetMax = true;
- return _maxSend;
-}
+ // A hard error is an ObjectNotExistException or
+ // NotRegisteredException.
+ bool hardError = dynamic_cast<const Ice::ObjectNotExistException*>(&e) ||
+ dynamic_cast<const Ice::NotRegisteredException*>(&e);
-void
-Subscriber::error(const Ice::Exception& e)
-{
- IceUtil::Mutex::Lock sync(_mutex);
- if(_state != SubscriberStateError)
+ //
+ // A twoway subscriber can queue multiple send events and
+ // therefore its possible to get multiple error'd replies. Ignore
+ // replies if we're retrying and its not yet time to process the
+ // next request.
+ //
+ IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic);
+ if(!hardError && _state == SubscriberStateOffline && now < _next)
+ {
+ return;
+ }
+
+ //
+ // If we're in our retry limits and the error isn't a hard failure
+ // (that is ObjectNotExistException or NotRegisteredException)
+ // then we transition to an offline state.
+ //
+ if(!hardError && (_retryCount == -1 || _currentRetry < _retryCount))
+ {
+ assert(_state < SubscriberStateError);
+
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(_currentRetry == 0)
+ {
+ Ice::Warning warn(traceLevels->logger);
+ warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_rec.id)
+ << ": subscriber offline: " << e
+ << " discarding events: " << _instance->discardInterval() << "s retryCount: " << _retryCount;
+ }
+ else
+ {
+ if(traceLevels->subscriber > 0)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << this << " ";
+ out << _instance->communicator()->identityToString(_rec.id) << ": subscriber offline: " << e
+ << " discarding events: " << _instance->discardInterval() << "s retry: "
+ << _currentRetry << "/" << _retryCount;
+ }
+ }
+
+ // Transition to offline state, increment the retry count and
+ // clear all queued events.
+ _next = now + _instance->discardInterval();
+ ++_currentRetry;
+ _events.clear();
+ setState(SubscriberStateOffline);
+ }
+ // Errored out.
+ else if(_state < SubscriberStateError)
{
- _state = SubscriberStateError;
_events.clear();
+ setState(SubscriberStateError);
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(traceLevels->subscriber > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << _instance->communicator()->identityToString(_id) << ": topic publish failed: " << e;
+ out << this << " ";
+ out << _instance->communicator()->identityToString(_rec.id) << ": subscriber errored out: " << e
+ << " retry: " << _currentRetry << "/" << _retryCount;
}
}
+
+ if(_shutdown && _events.empty())
+ {
+ _lock.notify();
+ }
+}
+
+void
+Subscriber::response()
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ // Decrement the _outstanding count.
+ --_outstanding;
+ assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
+
+ //
+ // A successful response means we're no longer retrying, we're
+ // back active.
+ //
+ _currentRetry = 0;
+
+ if(_events.empty() && _outstanding == 0 && _shutdown)
+ {
+ _lock.notify();
+ }
+ else
+ {
+ flush();
+ }
+}
+
+void
+Subscriber::shutdown()
+{
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ _shutdown = true;
+ while(_outstanding > 0 && !_events.empty())
+ {
+ _lock.wait();
+ }
}
Subscriber::Subscriber(
const InstancePtr& instance,
+ const SubscriberRecord& rec,
const Ice::ObjectPrx& proxy,
- bool persistent,
- const Ice::Identity& id) :
+ int retryCount,
+ int maxOutstanding) :
_instance(instance),
- _id(id),
- _persistent(persistent),
+ _rec(rec),
+ _retryCount(retryCount),
+ _maxOutstanding(maxOutstanding),
_proxy(proxy),
+ _proxyReplica(proxy),
+ _shutdown(false),
_state(SubscriberStateOnline),
- _resetMax(true),
- _maxSend(IceUtil::Time::seconds(60*24)) // A long time
+ _outstanding(0),
+ _currentRetry(0)
+{
+ if(_proxy && _instance->publisherReplicaProxy())
+ {
+ const_cast<Ice::ObjectPrx&>(_proxyReplica) =
+ _instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity());
+ }
+}
+
+namespace
+{
+
+string
+stateToString(Subscriber::SubscriberState state)
{
+ switch(state)
+ {
+ case Subscriber::SubscriberStateOnline:
+ return "online";
+ case Subscriber::SubscriberStateOffline:
+ return "offline";
+ case Subscriber::SubscriberStateError:
+ return "error";
+ case Subscriber::SubscriberStateReaped:
+ return "reaped";
+ default:
+ return "???";
+ }
+}
+
+}
+
+void
+Subscriber::setState(Subscriber::SubscriberState state)
+{
+ if(state != _state)
+ {
+ TraceLevelsPtr traceLevels = _instance->traceLevels();
+ if(traceLevels->subscriber > 1)
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << this << " transition from " << stateToString(_state) << " to " << stateToString(state);
+ }
+ _state = state;
+ }
}
bool