diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 1128 |
1 files changed, 558 insertions, 570 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 546f98b624c..0b7338c321f 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -10,14 +10,9 @@ #include <IceStorm/Subscriber.h> #include <IceStorm/Instance.h> #include <IceStorm/TraceLevels.h> -#include <IceStorm/BatchFlusher.h> -#include <IceStorm/SubscriberPool.h> +#include <IceStorm/NodeI.h> -#include <Ice/ObjectAdapter.h> #include <Ice/LoggerUtil.h> -#include <Ice/Communicator.h> -#include <Ice/LocalException.h> -#include <Ice/Connection.h> #ifdef __BCPLUSPLUS__ #include <iterator> @@ -25,6 +20,7 @@ using namespace std; using namespace IceStorm; +using namespace IceStormElection; // // Per Subscriber object. @@ -56,6 +52,9 @@ public: vector<Ice::Byte>&, const Ice::Current& current) { + // Use cached reads. + CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); + EventDataPtr event = new EventData( current.operation, current.mode, @@ -71,12 +70,7 @@ public: EventDataSeq e; e.push_back(event); - Subscriber::QueueState state = _subscriber->queue(false, e); - - if(state == Subscriber::QueueStateFlush) - { - _instance->subscriberPool()->flush(_subscriber); - } + _subscriber->queue(false, e); return true; } @@ -88,441 +82,368 @@ private: typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr; } + // Each of the various Subscriber types. namespace { -class SubscriberOneway : public Subscriber +class SubscriberBatch : public Subscriber { public: - SubscriberOneway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&); - // - // Oneway - // - virtual bool flush(); - virtual void destroy(); + SubscriberBatch(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, const Ice::ObjectPrx&); + ~SubscriberBatch(); + + virtual void flush(); + + void doFlush(); private: - const bool _batch; const Ice::ObjectPrx _obj; - /*const*/ Ice::ObjectPrx _objBatch; + const IceUtil::Time _interval; }; +typedef IceUtil::Handle<SubscriberBatch> SubscriberBatchPtr; -class SubscriberTwoway : public Subscriber +class SubscriberOneway : public Subscriber { public: - SubscriberTwoway(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&); + SubscriberOneway(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, const Ice::ObjectPrx&); + ~SubscriberOneway(); - virtual bool flush(); - void response(); + virtual void flush(); + + void sent(); private: const Ice::ObjectPrx _obj; - const int _maxOutstanding; - int _outstanding; }; -typedef IceUtil::Handle<SubscriberTwoway> SubscriberTwowayPtr; +typedef IceUtil::Handle<SubscriberOneway> SubscriberOnewayPtr; -// -// Twoway Ordered -// -class SubscriberTwowayOrdered : public Subscriber +class SubscriberTwoway : public Subscriber { public: - SubscriberTwowayOrdered(const InstancePtr&, const Ice::ObjectPrx&, const Ice::ObjectPrx&); + SubscriberTwoway(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, int, + const Ice::ObjectPrx&); - virtual bool flush(); - void response(); + virtual void flush(); private: const Ice::ObjectPrx _obj; }; -typedef IceUtil::Handle<SubscriberTwowayOrdered> SubscriberTwowayOrderedPtr; class SubscriberLink : public Subscriber { public: - SubscriberLink(const InstancePtr&, const TopicLinkPrx&, int); + SubscriberLink(const InstancePtr&, const SubscriberRecord&); - virtual QueueState queue(bool, const std::vector<EventDataPtr>&); - virtual bool flush(); - void response(); - - void offline(const Ice::Exception&); + virtual void flush(); private: const TopicLinkPrx _obj; - const int _cost; - - // The next to try sending a new event if we're offline. - IceUtil::Time _next; - bool _warn; }; -typedef IceUtil::Handle<SubscriberLink> SubscriberLinkPtr; - -} -SubscriberOneway::SubscriberOneway( - const InstancePtr& instance, - const Ice::ObjectPrx& proxy, - const Ice::ObjectPrx& obj) : - Subscriber(instance, proxy, false, obj->ice_getIdentity()), - _batch(obj->ice_isBatchDatagram() || obj->ice_isBatchOneway()), - _obj(obj) +class ResponseTimerTask : public IceUtil::TimerTask { - // - // COMPILERFIX: Initialized this way for Borland to compile. - // - if(obj->ice_isDatagram()) +public: + ResponseTimerTask(const SubscriberPtr& subscriber) : + _subscriber(subscriber) { - _objBatch = obj->ice_batchDatagram(); } - else + + virtual void + runTimerTask() { - _objBatch = obj->ice_batchOneway(); + _subscriber->flush(); } - if(_batch) +private: + + const SubscriberPtr _subscriber; +}; + +class OnewayIceInvokeI : public Ice::AMI_Object_ice_invoke +{ +public: + + OnewayIceInvokeI(const SubscriberOnewayPtr& subscriber) : + _subscriber(subscriber) { - _instance->batchFlusher()->add(_obj); } -} -bool -SubscriberOneway::flush() -{ - IceUtil::Mutex::Lock sync(_mutex); - - // - // If the subscriber errored out then we're done. - // - if(_state == SubscriberStateError) + virtual void + ice_response(bool, const std::vector<Ice::Byte>&) { - return false; + assert(false); } - assert(_state == SubscriberStateFlushPending); - assert(!_events.empty()); - try + virtual void __sent(Ice::ConnectionI* c) { - // - // Get the current set of events, but release the lock before - // attempting to deliver the events. This allows other threads - // to add events in case we block (such as during connection - // establishment). - // - EventDataSeq v; - v.swap(_events); - sync.release(); + AMI_Object_ice_invoke::__sent(c); + _subscriber->sent(); + } - // - // Deliver the events without holding the lock. - // - // If there are more than one event queued and we are not in - // batch sending mode then send the events as a batch and then - // flush immediately, otherwise send one at a time. - // - vector<Ice::Byte> dummy; - if(v.size() > 1 && !_batch) - { - Ice::ConnectionPtr conn = _objBatch->ice_getConnection(); - for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) - { - _objBatch->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); - } - conn->flushBatchRequests(); - } - else - { - for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) - { - _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); - } - } - - // - // Reacquire the lock before we check the queue again. - // - sync.acquire(); + virtual void + ice_exception(const Ice::Exception& e) + { + _subscriber->error(true, e); } - catch(const Ice::LocalException& ex) + +private: + + const SubscriberOnewayPtr _subscriber; +}; + +class IceInvokeI : public Ice::AMI_Object_ice_invoke +{ +public: + + IceInvokeI(const SubscriberPtr& subscriber) : + _subscriber(subscriber) { - assert(!sync.acquired()); - // error will re-acquire and release the lock. - error(ex); - return false; } - if(!_events.empty()) + virtual void + ice_response(bool, const std::vector<Ice::Byte>&) { - assert(_state == SubscriberStateFlushPending); - return true; + _subscriber->response(); } - _state = SubscriberStateOnline; - return false; -} -void -SubscriberOneway::destroy() -{ - if(_batch) + virtual void + ice_exception(const Ice::Exception& e) { - _instance->batchFlusher()->remove(_obj); + _subscriber->error(true, e); } - Subscriber::destroy(); -} -namespace -{ +private: -class TwowayInvokeI : public Ice::AMI_Object_ice_invoke + const SubscriberPtr _subscriber; +}; + +class FlushBatchI : public Ice::AMI_Object_ice_flushBatchRequests { public: - TwowayInvokeI(const SubscriberTwowayPtr& subscriber) : + FlushBatchI(const SubscriberPtr& subscriber) : _subscriber(subscriber) { } virtual void - ice_response(bool, const std::vector<Ice::Byte>&) + ice_exception(const Ice::Exception& e) + { + _subscriber->error(false, e); + } + +private: + + const SubscriberPtr _subscriber; +}; + +class FlushTimerTask : public IceUtil::TimerTask +{ +public: + + FlushTimerTask(const SubscriberBatchPtr& subscriber) : + _subscriber(subscriber) { - _subscriber->response(); } virtual void - ice_exception(const Ice::Exception& e) + runTimerTask() { - _subscriber->error(e); + _subscriber->doFlush(); } private: - const SubscriberTwowayPtr _subscriber; + const SubscriberBatchPtr _subscriber; }; } -SubscriberTwoway::SubscriberTwoway( +SubscriberBatch::SubscriberBatch( const InstancePtr& instance, + const SubscriberRecord& rec, const Ice::ObjectPrx& proxy, + int retryCount, const Ice::ObjectPrx& obj) : - Subscriber(instance, proxy, false, obj->ice_getIdentity()), + Subscriber(instance, rec, proxy, retryCount, 1), _obj(obj), - _maxOutstanding(10), - _outstanding(0) + _interval(instance->flushInterval()) { + assert(retryCount == 0); } -bool -SubscriberTwoway::flush() +SubscriberBatch::~SubscriberBatch() { - EventDataPtr e; - { - IceUtil::Mutex::Lock sync(_mutex); - - // - // If the subscriber errored out then we're done. - // - if(_state == SubscriberStateError) - { - return false; - } - assert(_state == SubscriberStateFlushPending); - assert(!_events.empty()); +} - // - // If there are more than _maxOutstanding unanswered AMI - // events we're also done. In this case the response to the - // pending AMI requests will trigger another event to be sent. - // - if(_outstanding >= _maxOutstanding) - { - _state = SubscriberStateSending; - return false; - } - - // - // Dequeue the head event, count one more outstanding AMI - // request. - // - e = _events.front(); - _events.erase(_events.begin()); - _state = SubscriberStateSending; +void +SubscriberBatch::flush() +{ + if(_outstanding == 0) + { ++_outstanding; + _instance->batchFlusher()->schedule(new FlushTimerTask(this), _interval); } +} - _obj->ice_invoke_async(new TwowayInvokeI(this), e->op, e->mode, e->data, e->context); - - // - // We process the subscriber state after the event send and not - // before to prevent the subscriber from being requeued - // concurrently. - // +void +SubscriberBatch::doFlush() +{ + EventDataSeq v; { - IceUtil::Mutex::Lock sync(_mutex); - // - // If the subscriber has already been requeued for a flush or - // the subscriber errored out then we're done. - // - if(_state == SubscriberStateFlushPending || _state == SubscriberStateError) + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + --_outstanding; + assert(_outstanding == 0); + + if(_shutdown) { - return false; + _lock.notify(); } + v.swap(_events); + assert(!v.empty()); + } - // - // If there are no events left in the queue transition back to - // the online state, and return false to indicate to the - // worker not to requeue. - // - if(_events.empty()) - { - _state = SubscriberStateOnline; - return false; - } + vector<Ice::Byte> dummy; + for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) + { + _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); + } - // - // We must still be in sending state. - // - assert(_state == SubscriberStateSending); + _obj->ice_flushBatchRequests_async(new FlushBatchI(this)); - // - // If we're below the outstanding limit then requeue, - // otherwise the response callback will do so. - // - if(_outstanding < _maxOutstanding) - { - _state = SubscriberStateFlushPending; - } + // This is significantly faster than the async version, but it can + // block the calling thread. Bad news! - return _state == SubscriberStateFlushPending; - } + //_obj->ice_flushBatchRequests(); } -void -SubscriberTwoway::response() +SubscriberOneway::SubscriberOneway( + const InstancePtr& instance, + const SubscriberRecord& rec, + const Ice::ObjectPrx& proxy, + int retryCount, + const Ice::ObjectPrx& obj) : + Subscriber(instance, rec, proxy, retryCount, 5), + _obj(obj) { - IceUtil::Mutex::Lock sync(_mutex); + assert(retryCount == 0); +} - --_outstanding; +SubscriberOneway::~SubscriberOneway() +{ +} +void +SubscriberOneway::flush() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + // - // Note that its possible for the _state to be error if there are - // mutliple threads in the client side thread pool and response - // and exception are called out of order. + // If the subscriber isn't online we're done. // - assert(_outstanding >= 0 && _outstanding < _maxOutstanding); + if(_state != SubscriberStateOnline || _events.empty()) + { + return; + } - // - // Unless we're in the sending state we do nothing. - // - if(_state == SubscriberStateSending) + // Send up to _maxOutstanding pending events. + while(_outstanding < _maxOutstanding && !_events.empty()) { // - // If there are no more events then we transition back to - // online. + // Dequeue the head event, count one more outstanding AMI + // request. // - if(_events.empty()) + EventDataPtr e = _events.front(); + _events.erase(_events.begin()); + ++_outstanding; + + try { - _state = SubscriberStateOnline; + _obj->ice_invoke_async(new OnewayIceInvokeI(this), e->op, e->mode, e->data, e->context); } - // - // Otherwise we re-add for a flush. - // - else + catch(const Ice::Exception& ex) { - _state = SubscriberStateFlushPending; - _instance->subscriberPool()->flush(this); + error(true, ex); + return; } } } -namespace -{ - -class TwowayOrderedInvokeI : public Ice::AMI_Object_ice_invoke +void +SubscriberOneway::sent() { -public: - - TwowayOrderedInvokeI(const SubscriberTwowayOrderedPtr& subscriber) : - _subscriber(subscriber) - { - } + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + // Decrement the _outstanding count. + --_outstanding; + assert(_outstanding >= 0 && _outstanding < _maxOutstanding); - virtual void - ice_response(bool, const std::vector<Ice::Byte>&) + if(_events.empty() && _outstanding == 0 && _shutdown) { - _subscriber->response(); + _lock.notify(); } - - virtual void - ice_exception(const Ice::Exception& ex) + else if(_outstanding == 0 && !_events.empty()) { - _subscriber->error(ex); + _instance->batchFlusher()->schedule(new ResponseTimerTask(this), IceUtil::Time::seconds(0)); } - -private: - - const SubscriberTwowayOrderedPtr _subscriber; -}; - } -SubscriberTwowayOrdered::SubscriberTwowayOrdered( +SubscriberTwoway::SubscriberTwoway( const InstancePtr& instance, + const SubscriberRecord& rec, const Ice::ObjectPrx& proxy, + int retryCount, + int maxOutstanding, const Ice::ObjectPrx& obj) : - Subscriber(instance, proxy, false, obj->ice_getIdentity()), + Subscriber(instance, rec, proxy, retryCount, maxOutstanding), _obj(obj) { } -bool -SubscriberTwowayOrdered::flush() +void +SubscriberTwoway::flush() { - EventDataPtr e; + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + // + // If the subscriber isn't online we're done. + // + if(_state != SubscriberStateOnline || _events.empty()) + { + return; + } + + // Send up to _maxOutstanding pending events. + while(_outstanding < _maxOutstanding && !_events.empty()) { - IceUtil::Mutex::Lock sync(_mutex); - // - // If the subscriber errored out then we're done. + // Dequeue the head event, count one more outstanding AMI + // request. // - if(_state == SubscriberStateError) + EventDataPtr e = _events.front(); + _events.erase(_events.begin()); + ++_outstanding; + + try { - return false; + _obj->ice_invoke_async(new IceInvokeI(this), e->op, e->mode, e->data, e->context); + } + catch(const Ice::Exception& ex) + { + error(true, ex); + return; } - assert(_state == SubscriberStateFlushPending); - assert(!_events.empty()); - - e = _events.front(); - _events.erase(_events.begin()); - } - - _obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context); - - return false; -} - -void -SubscriberTwowayOrdered::response() -{ - IceUtil::Mutex::Lock sync(_mutex); - - assert(_state != SubscriberStateError); - if(_events.empty()) - { - _state = SubscriberStateOnline; - return; } - _instance->subscriberPool()->flush(this); } namespace @@ -532,7 +453,7 @@ class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward { public: - Topiclink_forwardI(const SubscriberLinkPtr& subscriber) : + Topiclink_forwardI(const SubscriberPtr& subscriber) : _subscriber(subscriber) { } @@ -544,328 +465,266 @@ public: } virtual void - ice_exception(const Ice::Exception& ex) + ice_exception(const Ice::Exception& e) { - try - { - ex.ice_throw(); - } - catch(const Ice::ObjectNotExistException& ex) - { - _subscriber->error(ex); - } - catch(const Ice::LocalException& ex) - { - _subscriber->offline(ex); - } + _subscriber->error(true, e); } private: - const SubscriberLinkPtr _subscriber; + const SubscriberPtr _subscriber; }; } SubscriberLink::SubscriberLink( const InstancePtr& instance, - const TopicLinkPrx& obj, - int cost) : - Subscriber(instance, 0, true, obj->ice_getIdentity()), - _obj(TopicLinkPrx::uncheckedCast(obj->ice_collocationOptimized(false))), - _cost(cost), - _warn(true) + const SubscriberRecord& rec) : + Subscriber(instance, rec, 0, -1, 1), + _obj(TopicLinkPrx::uncheckedCast(rec.obj->ice_collocationOptimized(false)->ice_timeout(instance->sendTimeout()))) { } -Subscriber::QueueState -SubscriberLink::queue(bool forwarded, const EventDataSeq& events) +void +SubscriberLink::flush() { - if(forwarded) + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + if(_state != SubscriberStateOnline || _outstanding > 0) { - return QueueStateNoFlush; + return; } - // - // Don't propagate a message that has already been forwarded. - // Also, if this link has a non-zero cost, then don't propagate a - // message whose cost exceeds the link cost. - // - - IceUtil::Mutex::Lock sync(_mutex); - - if(_state == SubscriberStateError) - { - return QueueStateError; - } + EventDataSeq v; + v.swap(_events); - // - // If the proxy is offline and its time to send another event then - // put us into retry state. - // - if(_state == SubscriberStateOffline) + EventDataSeq::iterator p = v.begin(); + while(p != v.end()) { - // - // If there are alot of subscribers offline then we will call - // Time::now() alot, which could be costly. This could be - // optimized to only one per event-batch by making the - // forwarded argument an EventInfo thing where the queue-time - // is lazy initialized. - // - if(IceUtil::Time::now(IceUtil::Time::Monotonic) < _next) + if(_rec.cost != 0) { - return QueueStateNoFlush; - } - - // - // State transition to online. - // - _state = SubscriberStateOnline; - } - - int queued = 0; - for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p) - { - if(_cost != 0) - { - // - // Note that we could calculate this cost once and cache - // it in a private form of the event to avoid this if this - // really is a performance problem (this could use the - // EventInfo thing discussed above). - // int cost = 0; Ice::Context::const_iterator q = (*p)->context.find("cost"); if(q != (*p)->context.end()) { cost = atoi(q->second.c_str()); } - if(cost > _cost) + if(cost > _rec.cost) { + p = v.erase(p); continue; } } - ++queued; - _events.push_back(*p); + ++p; } - if(_state == SubscriberStateFlushPending || queued == 0) + if(!v.empty()) { - return QueueStateNoFlush; - } - _state = SubscriberStateFlushPending; - return QueueStateFlush; -} - -bool -SubscriberLink::flush() -{ - EventDataSeq v; - { - IceUtil::Mutex::Lock sync(_mutex); - - // - // If the subscriber errored out then we're done. - // - if(_state == SubscriberStateError) + try { - return false; + ++_outstanding; + _obj->forward_async(new Topiclink_forwardI(this), v); } - - assert(_state == SubscriberStateFlushPending); - assert(!_events.empty()); - - v.swap(_events); - } - - _obj->forward_async(new Topiclink_forwardI(this), v); - return false; -} - -void -SubscriberLink::response() -{ - IceUtil::Mutex::Lock sync(_mutex); - - assert(_state != SubscriberStateError); - - // - // A successful response means we're no longer retrying, we're - // back active. - // - _warn = true; - - // - // No more events, no need to requeue this subscriber. - // - if(_events.empty()) - { - _state = SubscriberStateOnline; - return; - } - _instance->subscriberPool()->flush(this); -} - -void -SubscriberLink::offline(const Ice::Exception& e) -{ - IceUtil::Mutex::Lock sync(_mutex); - assert(_state != SubscriberStateOffline); - - _next = IceUtil::Time::now(IceUtil::Time::Monotonic) + _instance->discardInterval(); - - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(_warn) - { - Ice::Warning warn(traceLevels->logger); - warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id) - << ": link offline: " << e; - } - else - { - if(traceLevels->subscriber > 0) + catch(const Ice::Exception& ex) { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << _instance->communicator()->identityToString(_id) << ": link offline: " << e - << " discarding events: " << _instance->discardInterval() << "s"; + error(true, ex); } } - - _state = SubscriberStateOffline; - _warn = false; - - // - // Clear all queued events. - // - _events.clear(); } SubscriberPtr Subscriber::create( const InstancePtr& instance, - const string& topicName, - const Ice::ObjectPrx& obj, - const IceStorm::QoS& qos) -{ - PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance); - Ice::Identity perId; - perId.category = instance->instanceName(); - perId.name = "topic." + topicName + ".publish." + - instance->communicator()->identityToString(obj->ice_getIdentity()); - Ice::ObjectPrx proxy = instance->objectAdapter()->add(per, perId); - TraceLevelsPtr traceLevels = instance->traceLevels(); - SubscriberPtr subscriber; - - try - { - string reliability; - QoS::const_iterator p = qos.find("reliability"); - if(p != qos.end()) - { - reliability = p->second; - } - if(!reliability.empty() && reliability != "ordered") - { - throw BadQoS("invalid reliability: " + reliability); - } + const SubscriberRecord& rec) +{ + if(rec.link) + { + return new SubscriberLink(instance, rec); + } + else + { + PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance); + Ice::Identity perId; + perId.category = instance->instanceName(); + perId.name = "topic." + rec.topicName + ".publish." + + instance->communicator()->identityToString(rec.obj->ice_getIdentity()); + Ice::ObjectPrx proxy = instance->publishAdapter()->add(per, perId); + TraceLevelsPtr traceLevels = instance->traceLevels(); + SubscriberPtr subscriber; - // - // Override the timeout. - // - Ice::ObjectPrx newObj; try { - newObj = obj->ice_timeout(instance->sendTimeout()); - } - catch(const Ice::FixedProxyException&) - { + int retryCount = 0; + QoS::const_iterator p = rec.theQoS.find("retryCount"); + if(p != rec.theQoS.end()) + { + retryCount = atoi(p->second.c_str()); + } + + string reliability; + p = rec.theQoS.find("reliability"); + if(p != rec.theQoS.end()) + { + reliability = p->second; + } + if(!reliability.empty() && reliability != "ordered") + { + throw BadQoS("invalid reliability: " + reliability); + } + // - // In the event IceStorm is collocated this could be a - // fixed proxy in which case its not possible to set the - // timeout. + // Override the timeout. // - newObj = obj; - } - if(reliability == "ordered") - { - if(!newObj->ice_isTwoway()) + Ice::ObjectPrx newObj; + try { - throw BadQoS("ordered reliability requires a twoway proxy"); + newObj = rec.obj->ice_timeout(instance->sendTimeout()); } - subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj); - } - else if(newObj->ice_isOneway() || newObj->ice_isDatagram() || - newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram()) - { - subscriber = new SubscriberOneway(instance, proxy, newObj); + catch(const Ice::FixedProxyException&) + { + // + // In the event IceStorm is collocated this could be a + // fixed proxy in which case its not possible to set the + // timeout. + // + newObj = rec.obj; + } + if(reliability == "ordered") + { + if(!newObj->ice_isTwoway()) + { + throw BadQoS("ordered reliability requires a twoway proxy"); + } + subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 1, newObj); + } + else if(newObj->ice_isOneway() || newObj->ice_isDatagram()) + { + if(retryCount > 0) + { + throw BadQoS("non-zero retryCount QoS requires a twoway proxy"); + } + subscriber = new SubscriberOneway(instance, rec, proxy, retryCount, newObj); + } + else if(newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram()) + { + if(retryCount > 0) + { + throw BadQoS("non-zero retryCount QoS requires a twoway proxy"); + } + subscriber = new SubscriberBatch(instance, rec, proxy, retryCount, newObj); + } + else //if(newObj->ice_isTwoway()) + { + assert(newObj->ice_isTwoway()); + subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 5, newObj); + } + per->setSubscriber(subscriber); } - else if(newObj->ice_isTwoway()) + catch(const Ice::Exception&) { - subscriber = new SubscriberTwoway(instance, proxy, newObj); + instance->publishAdapter()->remove(proxy->ice_getIdentity()); + throw; } - per->setSubscriber(subscriber); - } - catch(const Ice::Exception&) - { - instance->objectAdapter()->remove(proxy->ice_getIdentity()); - throw; - } - - return subscriber; -} -SubscriberPtr -Subscriber::create( - const InstancePtr& instance, - const TopicLinkPrx& link, - int cost) -{ - return new SubscriberLink( - instance, - TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())), - cost); + return subscriber; + } } Subscriber::~Subscriber() { + //cout << "~Subscriber" << endl; } Ice::ObjectPrx Subscriber::proxy() const { - return _proxy; + return _proxyReplica; } Ice::Identity Subscriber::id() const { - return _id; + return _rec.id; } -bool -Subscriber::persistent() const +SubscriberRecord +Subscriber::record() const { - return _persistent; + return _rec; } -Subscriber::QueueState -Subscriber::queue(bool, const EventDataSeq& events) +bool +Subscriber::queue(bool forwarded, const EventDataSeq& events) { - IceUtil::Mutex::Lock sync(_mutex); + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + // If this is a link subscriber if the set of events were + // forwarded from another IceStorm instance then do not queue the + // events. + if(forwarded && _rec.link) + { + return true; + } + + switch(_state) + { + case SubscriberStateOffline: + { + if(IceUtil::Time::now(IceUtil::Time::Monotonic) < _next) + { + break; + } + + // + // State transition to online. + // + setState(SubscriberStateOnline); + // fall through + } + case SubscriberStateOnline: + copy(events.begin(), events.end(), back_inserter(_events)); + flush(); + break; + + case SubscriberStateError: + return false; + + case SubscriberStateReaped: + break; + } + + return true; +} + +bool +Subscriber::reap() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + assert(_state >= SubscriberStateError); if(_state == SubscriberStateError) { - return QueueStateError; + setState(SubscriberStateReaped); + return true; } + return false; +} - copy(events.begin(), events.end(), back_inserter(_events)); - if(_state == SubscriberStateSending || _state == SubscriberStateFlushPending) +void +Subscriber::resetIfReaped() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + if(_state == SubscriberStateReaped) { - return QueueStateNoFlush; + setState(SubscriberStateError); } +} - _state = SubscriberStateFlushPending; - return QueueStateFlush; +bool +Subscriber::errored() const +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + return _state >= SubscriberStateError; } void @@ -878,7 +737,7 @@ Subscriber::destroy() { try { - _instance->objectAdapter()->remove(_proxy->ice_getIdentity()); + _instance->publishAdapter()->remove(_proxy->ice_getIdentity()); } catch(const Ice::NotRegisteredException&) { @@ -892,60 +751,189 @@ Subscriber::destroy() } void -Subscriber::flushTime(const IceUtil::Time& interval) +Subscriber::error(bool dec, const Ice::Exception& e) { - if(_resetMax || interval > _maxSend) + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + if(dec) { - // - // If is possible for the flush interval to be zero if the - // timer resolution is sufficiently big. See - // http://bugzilla.zeroc.com/bugzilla/show_bug.cgi?id=1739 - // - //assert(interval != IceUtil::Time()); - _resetMax = false; - _maxSend = interval; + // Decrement the _outstanding count. + --_outstanding; + assert(_outstanding >= 0 && _outstanding < _maxOutstanding); } -} -IceUtil::Time -Subscriber::pollMaxFlushTime(const IceUtil::Time& now) -{ - // The next call to flushTime can reset the max time. - _resetMax = true; - return _maxSend; -} + // A hard error is an ObjectNotExistException or + // NotRegisteredException. + bool hardError = dynamic_cast<const Ice::ObjectNotExistException*>(&e) || + dynamic_cast<const Ice::NotRegisteredException*>(&e); -void -Subscriber::error(const Ice::Exception& e) -{ - IceUtil::Mutex::Lock sync(_mutex); - if(_state != SubscriberStateError) + // + // A twoway subscriber can queue multiple send events and + // therefore its possible to get multiple error'd replies. Ignore + // replies if we're retrying and its not yet time to process the + // next request. + // + IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic); + if(!hardError && _state == SubscriberStateOffline && now < _next) + { + return; + } + + // + // If we're in our retry limits and the error isn't a hard failure + // (that is ObjectNotExistException or NotRegisteredException) + // then we transition to an offline state. + // + if(!hardError && (_retryCount == -1 || _currentRetry < _retryCount)) + { + assert(_state < SubscriberStateError); + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(_currentRetry == 0) + { + Ice::Warning warn(traceLevels->logger); + warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_rec.id) + << ": subscriber offline: " << e + << " discarding events: " << _instance->discardInterval() << "s retryCount: " << _retryCount; + } + else + { + if(traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << this << " "; + out << _instance->communicator()->identityToString(_rec.id) << ": subscriber offline: " << e + << " discarding events: " << _instance->discardInterval() << "s retry: " + << _currentRetry << "/" << _retryCount; + } + } + + // Transition to offline state, increment the retry count and + // clear all queued events. + _next = now + _instance->discardInterval(); + ++_currentRetry; + _events.clear(); + setState(SubscriberStateOffline); + } + // Errored out. + else if(_state < SubscriberStateError) { - _state = SubscriberStateError; _events.clear(); + setState(SubscriberStateError); TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->subscriber > 0) { Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << _instance->communicator()->identityToString(_id) << ": topic publish failed: " << e; + out << this << " "; + out << _instance->communicator()->identityToString(_rec.id) << ": subscriber errored out: " << e + << " retry: " << _currentRetry << "/" << _retryCount; } } + + if(_shutdown && _events.empty()) + { + _lock.notify(); + } +} + +void +Subscriber::response() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + // Decrement the _outstanding count. + --_outstanding; + assert(_outstanding >= 0 && _outstanding < _maxOutstanding); + + // + // A successful response means we're no longer retrying, we're + // back active. + // + _currentRetry = 0; + + if(_events.empty() && _outstanding == 0 && _shutdown) + { + _lock.notify(); + } + else + { + flush(); + } +} + +void +Subscriber::shutdown() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + _shutdown = true; + while(_outstanding > 0 && !_events.empty()) + { + _lock.wait(); + } } Subscriber::Subscriber( const InstancePtr& instance, + const SubscriberRecord& rec, const Ice::ObjectPrx& proxy, - bool persistent, - const Ice::Identity& id) : + int retryCount, + int maxOutstanding) : _instance(instance), - _id(id), - _persistent(persistent), + _rec(rec), + _retryCount(retryCount), + _maxOutstanding(maxOutstanding), _proxy(proxy), + _proxyReplica(proxy), + _shutdown(false), _state(SubscriberStateOnline), - _resetMax(true), - _maxSend(IceUtil::Time::seconds(60*24)) // A long time + _outstanding(0), + _currentRetry(0) +{ + if(_proxy && _instance->publisherReplicaProxy()) + { + const_cast<Ice::ObjectPrx&>(_proxyReplica) = + _instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity()); + } +} + +namespace +{ + +string +stateToString(Subscriber::SubscriberState state) { + switch(state) + { + case Subscriber::SubscriberStateOnline: + return "online"; + case Subscriber::SubscriberStateOffline: + return "offline"; + case Subscriber::SubscriberStateError: + return "error"; + case Subscriber::SubscriberStateReaped: + return "reaped"; + default: + return "???"; + } +} + +} + +void +Subscriber::setState(Subscriber::SubscriberState state) +{ + if(state != _state) + { + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(traceLevels->subscriber > 1) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << this << " transition from " << stateToString(_state) << " to " << stateToString(state); + } + _state = state; + } } bool |