diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 304 |
1 files changed, 182 insertions, 122 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 1e93b7d250e..000f7cbfd5e 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -107,9 +107,10 @@ public: virtual void destroy(); private: + const bool _batch; const Ice::ObjectPrx _obj; - /*const*/ Ice::ObjectPrx _objBatch; + const Ice::ObjectPrx _objBatch; }; class SubscriberTwoway : public Subscriber @@ -159,16 +160,21 @@ public: virtual bool flush(); void response(); + void offline(const Ice::Exception&); + private: const TopicLinkPrx _obj; const int _cost; + + // The next to try sending a new event if we're offline. + IceUtil::Time _next; + bool _warn; }; typedef IceUtil::Handle<SubscriberLink> SubscriberLinkPtr; } - SubscriberOneway::SubscriberOneway( const InstancePtr& instance, const Ice::ObjectPrx& proxy, @@ -176,9 +182,9 @@ SubscriberOneway::SubscriberOneway( bool batch) : Subscriber(instance, proxy, false, obj->ice_getIdentity()), _batch(batch), - _obj(obj) + _obj(obj), + _objBatch(obj->ice_isDatagram() ? obj->ice_batchDatagram() : obj->ice_batchOneway()) { - _objBatch = obj->ice_isDatagram() ? _obj->ice_batchDatagram() : _obj->ice_batchOneway(); if(batch) { @@ -194,13 +200,12 @@ SubscriberOneway::flush() // // If the subscriber errored out then we're done. // - if(_state != StateActive) + if(_state == SubscriberStateError) { - _busy = false; return false; } + assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); - assert(_busy); try { @@ -215,12 +220,12 @@ SubscriberOneway::flush() sync.release(); // XXX: - /* +/* TraceLevelsPtr traceLevels = _instance->traceLevels(); if(_obj->ice_getIdentity().name.substr(0, 4) == "slow") { - //Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - //out << "deliberately stalling"; + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << "deliberately stalling"; sleep(2); } if(_obj->ice_getIdentity().name.substr(0, 5) == "block") @@ -235,7 +240,7 @@ SubscriberOneway::flush() out << "<- stall for 100s"; } } - */ +*/ // // Deliver the events without holding the lock. @@ -267,22 +272,22 @@ SubscriberOneway::flush() // Reacquire the lock before we check the queue again. // sync.acquire(); - - // - // If there have been more events queued in the meantime then - // we are still busy. - // - _busy = !_events.empty(); } catch(const Ice::LocalException& ex) { assert(!sync.acquired()); - // setError will re-acquire and release the lock. - setError(ex); + // error will re-acquire and release the lock. + error(ex); return false; } - return _busy; + if(!_events.empty()) + { + _state = SubscriberStateFlushPending; + return true; + } + _state = SubscriberStateOnline; + return false; } void @@ -315,7 +320,7 @@ public: virtual void ice_exception(const Ice::Exception& e) { - _subscriber->setError(e); + _subscriber->error(e); } private: @@ -342,13 +347,12 @@ SubscriberTwoway::flush() // // If the subscriber errored out then we're done. // - if(_state != StateActive) + if(_state == SubscriberStateError) { - _busy = false; return false; } + assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); - assert(_busy); // // Get the current set of events, but release the lock before @@ -375,11 +379,15 @@ SubscriberTwoway::flush() // // If there have been more events queued in the meantime then - // we are still busy. + // we have a pending flush. // - _busy = !_events.empty(); - - return _busy; + if(!_events.empty()) + { + _state = SubscriberStateFlushPending; + return true; + } + _state = SubscriberStateOnline; + return false; } namespace @@ -403,7 +411,7 @@ public: virtual void ice_exception(const Ice::Exception& ex) { - _subscriber->setError(ex); + _subscriber->error(ex); } private: @@ -432,13 +440,12 @@ SubscriberTwowayOrdered::flush() // // If the subscriber errored out then we're done. // - if(_state != StateActive) + if(_state == SubscriberStateError) { - _busy = false; return false; } + assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); - assert(_busy); e = _events.front(); _events.erase(_events.begin()); @@ -455,11 +462,10 @@ SubscriberTwowayOrdered::response() { IceUtil::Mutex::Lock sync(_mutex); - assert(_state == StateActive && _busy); - + assert(_state != SubscriberStateError); if(_events.empty()) { - _busy = false; + _state = SubscriberStateOnline; return; } } @@ -492,11 +498,11 @@ public: } catch(const Ice::ObjectNotExistException& ex) { - _subscriber->setError(ex); + _subscriber->error(ex); } catch(const Ice::LocalException& ex) { - _subscriber->setUnreachable(ex); + _subscriber->offline(ex); } } @@ -513,7 +519,8 @@ SubscriberLink::SubscriberLink( int cost) : Subscriber(instance, 0, true, obj->ice_getIdentity()), _obj(TopicLinkPrx::uncheckedCast(obj->ice_collocationOptimized(false))), - _cost(cost) + _cost(cost), + _warn(true) { } @@ -527,29 +534,51 @@ SubscriberLink::queue(bool forwarded, const EventSeq& events) // // Don't propagate a message that has already been forwarded. - // Also, if this link has a non-zero cost, then don't propagate - // a message whose cost exceeds the link cost. + // Also, if this link has a non-zero cost, then don't propagate a + // message whose cost exceeds the link cost. // IceUtil::Mutex::Lock sync(_mutex); - - if(_state != StateActive) + + if(_state == SubscriberStateError) { + return QueueStateError; + } + + // + // If the proxy is offline and its time to send another event then + // put us into retry state. + // + if(_state == SubscriberStateOffline) + { + // + // If there are alot of subscribers offline then we will call + // Time::now() alot, which could be costly. This could be + // optimized to only one per event-batch by making the + // forwarded argument an EventInfo thing where the queue-time + // is lazy initialized. // - // Either the state is error here or the link is inactive. + if(IceUtil::Time::now() < _next) + { + return QueueStateNoFlush; + } + // - return (_state == StateError) ? QueueStateError : QueueStateNoFlush; + // State transition to online. + // + _state = SubscriberStateOnline; } - size_t s = _events.size(); + int queued = 0; for(EventSeq::const_iterator p = events.begin(); p != events.end(); ++p) { if(_cost != 0) { // - // Note that we could calculate this cost once and - // cache it in a private form of the event to avoid - // this if this really is a performance problem. + // Note that we could calculate this cost once and cache + // it in a private form of the event to avoid this if this + // really is a performance problem (this could use the + // EventInfo thing discussed above). // int cost = 0; Ice::Context::const_iterator q = (*p)->context.find("cost"); @@ -562,19 +591,15 @@ SubscriberLink::queue(bool forwarded, const EventSeq& events) continue; } } + ++queued; _events.push_back(*p); } - // - // If no event was queued, or we're busy then the subscriber - // doesn't need to be flushed, otherwise it must be. - // - if(_busy || s == _events.size()) + if(_state == SubscriberStateFlushPending || queued == 0) { return QueueStateNoFlush; } - - _busy = true; + _state = SubscriberStateFlushPending; return QueueStateFlush; } @@ -588,13 +613,13 @@ SubscriberLink::flush() // // If the subscriber errored out then we're done. // - if(_state != StateActive) + if(_state == SubscriberStateError) { - _busy = false; return false; } + + assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); - assert(_busy); v.swap(_events); } @@ -609,18 +634,62 @@ SubscriberLink::response() { { IceUtil::Mutex::Lock sync(_mutex); + + assert(_state == SubscriberStateFlushPending); - assert(_state == StateActive && _busy); - + // + // A successful response means we're no longer retrying, we're + // back active. + // + _warn = true; + + // + // No more events, no need to requeue this subscriber. + // if(_events.empty()) { - _busy = false; + _state = SubscriberStateOnline; return; } } _instance->subscriberPool()->flush(this); } +void +SubscriberLink::offline(const Ice::Exception& e) +{ + IceUtil::Mutex::Lock sync(_mutex); + if(_state != SubscriberStateOffline) + { + _next = IceUtil::Time::now() + _instance->discardInterval(); + + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(_warn) + { + Ice::Warning warn(traceLevels->logger); + warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_id) + << ": link offline: " << e; + } + else + { + if(traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << _instance->communicator()->identityToString(_id) << ": link offline: " << e + << " discarding events: " << _instance->discardInterval() << "s"; + } + } + + _state = SubscriberStateOffline; + _warn = false; + + // + // Clear all queued events. + // + _events.clear(); + } +} + SubscriberPtr Subscriber::create( const InstancePtr& instance, @@ -632,6 +701,7 @@ Subscriber::create( TraceLevelsPtr traceLevels = instance->traceLevels(); SubscriberPtr subscriber; + try { string reliability = "oneway"; @@ -640,28 +710,45 @@ Subscriber::create( { reliability = p->second; } + + // + // Override the timeout. + // Ice::ObjectPrx newObj; + try + { + newObj = obj->ice_timeout(instance->sendTimeout()); + } + catch(const Ice::FixedProxyException&) + { + // + // In the event IceStorm is collocated this could be a + // fixed proxy in which case its not possible to set the + // timeout. + // + newObj = obj; + } if(reliability == "batch") { - if(obj->ice_isDatagram()) + if(newObj->ice_isDatagram()) { - newObj = obj->ice_batchDatagram(); + newObj = newObj->ice_batchDatagram(); } else { - newObj = obj->ice_batchOneway(); + newObj = newObj->ice_batchOneway(); } subscriber = new SubscriberOneway(instance, proxy, newObj, true); } else if(reliability == "twoway") { - newObj = obj->ice_twoway(); + newObj = newObj->ice_twoway(); subscriber = new SubscriberTwoway(instance, proxy, newObj); } else if(reliability == "twoway ordered") { - newObj = obj->ice_twoway(); + newObj = newObj->ice_twoway(); subscriber = new SubscriberTwowayOrdered(instance, proxy, newObj); } else // reliability == "oneway" @@ -671,13 +758,9 @@ Subscriber::create( Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); out << reliability <<" mode not understood."; } - if(obj->ice_isDatagram()) - { - newObj = obj; - } - else + if(!newObj->ice_isDatagram()) { - newObj = obj->ice_oneway(); + newObj = newObj->ice_oneway(); } subscriber = new SubscriberOneway(instance, proxy, newObj, false); } @@ -705,23 +788,6 @@ Subscriber::~Subscriber() { } -void -Subscriber::reachable() -{ - IceUtil::Mutex::Lock sync(_mutex); - if(_state == StateUnreachable) - { - _state = StateActive; - - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << "Reachable " << _instance->communicator()->identityToString(id()); - } - } -} - Ice::ObjectPrx Subscriber::proxy() const { @@ -745,22 +811,17 @@ Subscriber::queue(bool, const EventSeq& events) { IceUtil::Mutex::Lock sync(_mutex); - if(_state != StateActive) + if(_state == SubscriberStateError) { return QueueStateError; } - copy(events.begin(), events.end(), back_inserter(_events)); - - // - // If another thread is busy delivering events then the subscriber - // does not need to be flushed. - // - if(_busy) + if(_state == SubscriberStateFlushPending) { return QueueStateNoFlush; } - _busy = true; + + _state = SubscriberStateFlushPending; return QueueStateFlush; } @@ -807,13 +868,13 @@ Subscriber::pollMaxFlushTime(const IceUtil::Time& now) } void -Subscriber::setError(const Ice::Exception& e) +Subscriber::error(const Ice::Exception& e) { IceUtil::Mutex::Lock sync(_mutex); - if(_state != StateError) + if(_state != SubscriberStateError) { - _state = StateError; - _busy = false; + _state = SubscriberStateError; + _events.clear(); TraceLevelsPtr traceLevels = _instance->traceLevels(); if(traceLevels->subscriber > 0) @@ -824,24 +885,6 @@ Subscriber::setError(const Ice::Exception& e) } } -void -Subscriber::setUnreachable(const Ice::Exception& e) -{ - IceUtil::Mutex::Lock sync(_mutex); - if(_state != StateUnreachable) - { - _state = StateUnreachable; - _busy = false; - - TraceLevelsPtr traceLevels = _instance->traceLevels(); - if(traceLevels->subscriber > 0) - { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << _instance->communicator()->identityToString(_id) << ": link publish unreachable: " << e; - } - } -} - Subscriber::Subscriber( const InstancePtr& instance, const Ice::ObjectPrx& proxy, @@ -851,8 +894,7 @@ Subscriber::Subscriber( _id(id), _persistent(persistent), _proxy(proxy), - _state(StateActive), - _busy(false), + _state(SubscriberStateOnline), _resetMax(true), _maxSend(IceUtil::Time::seconds(60*24)) // A long time { @@ -864,3 +906,21 @@ IceStorm::operator==(const SubscriberPtr& subscriber, const Ice::Identity& id) return subscriber->id() == id; } +bool +IceStorm::operator==(const Subscriber& s1, const Subscriber& s2) +{ + return &s1 == &s2; +} + +bool +IceStorm::operator!=(const Subscriber& s1, const Subscriber& s2) +{ + return &s1 != &s2; +} + +bool +IceStorm::operator<(const Subscriber& s1, const Subscriber& s2) +{ + return &s1 < &s2; +} + |