author | Matthew Newhook <matthew@zeroc.com> | 2006-11-20 02:09:09 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2006-11-20 02:09:09 +0000 |
commit | d994d0d5549aa90a50559888a2c29f6b709ce70d (patch) | |
tree | 09a78e7f6bc71af7d2d99d3d9f0abaed0f991853 /cpp/src/IceStorm/Subscriber.cpp | |
parent | minor fix (diff) | |
Added shutdown state.
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 213 |
1 file changed, 179 insertions, 34 deletions
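The change introduces a SubscriberStateShutdown value that every subscriber state transition now checks: queue() refuses new events, flush() re-checks the state each time it re-acquires the mutex, and the new Subscriber::shutdown() flips the state under the lock. A minimal standalone sketch of the pattern, assuming nothing Ice-specific (MiniSubscriber, the State enum, and std::mutex are illustrative stand-ins, not names from the Ice sources):

    #include <cassert>
    #include <deque>
    #include <mutex>
    #include <string>

    class MiniSubscriber
    {
    public:
        enum State { Online, FlushPending, Offline, Error, Shutdown };

        // Refuse new work once shutdown has begun, mirroring
        // Subscriber::queue() returning QueueStateNoFlush.
        bool queue(const std::string& event)
        {
            std::lock_guard<std::mutex> sync(_mutex);
            if(_state == Error || _state == Shutdown)
            {
                return false;
            }
            _events.push_back(event);
            _state = FlushPending;
            return true;
        }

        // The lock is dropped while sending, so the state must be
        // re-checked on re-acquisition: shutdown() may have run meanwhile.
        bool flush()
        {
            std::deque<std::string> v;
            {
                std::lock_guard<std::mutex> sync(_mutex);
                // This sketch tolerates shutdown here; the commit instead
                // asserts a flush-pending state because its pool joins the
                // workers before flushing.
                if(_state == Shutdown)
                {
                    return false;
                }
                assert(_state == FlushPending);
                v.swap(_events);
            }
            // ... send v without holding the lock ...
            std::lock_guard<std::mutex> sync(_mutex);
            if(_state == Shutdown)
            {
                return false;
            }
            _state = _events.empty() ? Online : FlushPending;
            return !_events.empty();
        }

        // Mirrors the new Subscriber::shutdown(): a one-way transition
        // taken under the lock so concurrent callers observe it.
        void shutdown()
        {
            std::lock_guard<std::mutex> sync(_mutex);
            _state = Shutdown;
        }

    private:
        std::mutex _mutex;
        State _state = Online;
        std::deque<std::string> _events;
    };

The detail that matters in the diff below is exactly this unlock-and-re-check: a shutdown observed after re-acquiring the mutex is a normal early return, not an error.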
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 844f9902b10..1089b4f3e01 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -167,6 +167,11 @@ private:
     const TopicLinkPrx _obj;
     const int _cost;
 
+    // Before the first event is sent we check whether the object
+    // supports the "V2" facet.
+    bool _checkV2;
+    TopicLinkV2Prx _objV2;
+
     // The next time to try sending a new event if we're offline.
     IceUtil::Time _next;
     bool _warn;
@@ -204,6 +209,10 @@ SubscriberOneway::flush()
         {
             return false;
         }
 
+        //
+        // Flush cannot be called with SubscriberStateShutdown because
+        // the pool workers are joined with before flushing.
+        //
        assert(_state == SubscriberStateFlushPending);
        assert(!_events.empty());
@@ -281,6 +290,10 @@ SubscriberOneway::flush()
         return false;
     }
 
+    if(_state == SubscriberStateShutdown)
+    {
+        return false;
+    }
     if(!_events.empty())
     {
         assert(_state == SubscriberStateFlushPending);
@@ -351,6 +364,10 @@ SubscriberTwoway::flush()
         {
             return false;
         }
 
+        //
+        // Flush cannot be called with SubscriberStateShutdown because
+        // the pool workers are joined with before flushing.
+        //
        assert(_state == SubscriberStateFlushPending);
        assert(!_events.empty());
@@ -377,6 +394,11 @@ SubscriberTwoway::flush()
     //
     sync.acquire();
 
+    if(_state == SubscriberStateShutdown)
+    {
+        return false;
+    }
+
     //
     // If there have been more events queued in the meantime then
     // we have a pending flush.
@@ -444,6 +466,10 @@ SubscriberTwowayOrdered::flush()
         {
             return false;
         }
 
+        //
+        // Flush cannot be called with SubscriberStateShutdown because
+        // the pool workers are joined with before flushing.
+        //
        assert(_state == SubscriberStateFlushPending);
        assert(!_events.empty());
@@ -459,15 +485,17 @@ SubscriberTwowayOrdered::flush()
 void
 SubscriberTwowayOrdered::response()
 {
+    IceUtil::Mutex::Lock sync(_mutex);
+
+    assert(_state != SubscriberStateError);
+    if(_state == SubscriberStateShutdown)
     {
-        IceUtil::Mutex::Lock sync(_mutex);
-
-        assert(_state != SubscriberStateError);
-        if(_events.empty())
-        {
-            _state = SubscriberStateOnline;
-            return;
-        }
+        return;
+    }
+    if(_events.empty())
+    {
+        _state = SubscriberStateOnline;
+        return;
     }
     _instance->subscriberPool()->flush(this);
 }
@@ -479,7 +507,45 @@
 class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward
 {
 public:
 
-    Topiclink_forwardI(const SubscriberLinkPtr& subscriber) : _subscriber(subscriber)
+    Topiclink_forwardI(const SubscriberLinkPtr& subscriber) :
+        _subscriber(subscriber)
+    {
+    }
+
+    virtual void
+    ice_response()
+    {
+        _subscriber->response();
+    }
+
+    virtual void
+    ice_exception(const Ice::Exception& ex)
+    {
+        try
+        {
+            ex.ice_throw();
+        }
+        catch(const Ice::ObjectNotExistException& ex)
+        {
+            _subscriber->error(ex);
+        }
+        catch(const Ice::LocalException& ex)
+        {
+            _subscriber->offline(ex);
+        }
+    }
+
+private:
+
+    const SubscriberLinkPtr _subscriber;
+};
+
+class TopiclinkV2_forwardI : public IceStorm::AMI_TopicLinkV2_forward
+{
+public:
+
+    TopiclinkV2_forwardI(const SubscriberLinkPtr& subscriber) :
+        _subscriber(subscriber)
     {
     }
@@ -520,6 +586,7 @@ SubscriberLink::SubscriberLink(
     Subscriber(instance, 0, true, obj->ice_getIdentity()),
     _obj(TopicLinkPrx::uncheckedCast(obj->ice_collocationOptimized(false))),
     _cost(cost),
+    _checkV2(false),
     _warn(true)
 {
 }
@@ -544,6 +611,10 @@ SubscriberLink::queue(bool forwarded, const EventSeq& events)
     {
         return QueueStateError;
     }
+    if(_state == SubscriberStateShutdown)
+    {
+        return QueueStateNoFlush;
+    }
 
     //
     // If the proxy is offline and it's time to send another event then
@@ -618,13 +689,66 @@ SubscriberLink::flush()
         {
             return false;
         }
+        //
+        // Flush cannot be called with SubscriberStateShutdown because
+        // the pool workers are joined with before flushing.
+        //
        assert(_state == SubscriberStateFlushPending);
        assert(!_events.empty());
 
        v.swap(_events);
     }
 
-    _obj->forward_async(new Topiclink_forwardI(this), v);
+    //
+    // Only one thread at a time can be active at this point.
+    //
+    if(!_checkV2)
+    {
+        try
+        {
+            _objV2 = TopicLinkV2Prx::checkedCast(_obj, "V2");
+        }
+        catch(const Ice::ObjectNotExistException& ex)
+        {
+            error(ex);
+            return false;
+        }
+        catch(const Ice::LocalException& ex)
+        {
+            offline(ex);
+            return false;
+        }
+        TraceLevelsPtr traceLevels = _instance->traceLevels();
+        if(!_objV2 && traceLevels->subscriber > 0)
+        {
+            Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+            out << _instance->communicator()->identityToString(_id) << ": link is v1 IceStorm.";
+        }
+        _checkV2 = true;
+    }
+
+    if(_objV2)
+    {
+        _objV2->forward_async(new TopiclinkV2_forwardI(this), v);
+    }
+    else
+    {
+        //
+        // Transform the event sequence into an EventDataSeq.
+        //
+        EventDataSeq events;
+        events.reserve(v.size());
+        for(EventSeq::const_iterator p = v.begin(); p != v.end(); ++p)
+        {
+            EventData data;
+            data.op = (*p)->op;
+            data.mode = (*p)->mode;
+            data.data = (*p)->data;
+            data.context = (*p)->context;
+            events.push_back(data);
+        }
+        _obj->forward_async(new Topiclink_forwardI(this), events);
+    }
 
     return false;
 }
@@ -632,25 +756,27 @@
 void
 SubscriberLink::response()
 {
+    IceUtil::Mutex::Lock sync(_mutex);
+
+    assert(_state != SubscriberStateError);
+    if(_state == SubscriberStateShutdown)
     {
-        IceUtil::Mutex::Lock sync(_mutex);
-
-        assert(_state == SubscriberStateFlushPending);
-
-        //
-        // A successful response means we're no longer retrying, we're
-        // back active.
-        //
-        _warn = true;
-
-        //
-        // No more events, no need to requeue this subscriber.
-        //
-        if(_events.empty())
-        {
-            _state = SubscriberStateOnline;
-            return;
-        }
+        return;
+    }
+
+    //
+    // A successful response means we're no longer retrying, we're
+    // back active.
+    //
+    _warn = true;
+
+    //
+    // No more events, no need to requeue this subscriber.
+    //
+    if(_events.empty())
+    {
+        _state = SubscriberStateOnline;
+        return;
     }
     _instance->subscriberPool()->flush(this);
 }
@@ -659,6 +785,10 @@ void
 SubscriberLink::offline(const Ice::Exception& e)
 {
     IceUtil::Mutex::Lock sync(_mutex);
+    if(_state == SubscriberStateShutdown)
+    {
+        return;
+    }
     assert(_state != SubscriberStateOffline);
 
     _next = IceUtil::Time::now() + _instance->discardInterval();
@@ -780,7 +910,10 @@ Subscriber::create(
     const TopicLinkPrx& link,
     int cost)
 {
-    return new SubscriberLink(instance, link, cost);
+    return new SubscriberLink(
+        instance,
+        TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())),
+        cost);
 }
 
 Subscriber::~Subscriber()
@@ -814,6 +947,11 @@ Subscriber::queue(bool, const EventSeq& events)
     {
         return QueueStateError;
     }
+    if(_state == SubscriberStateShutdown)
+    {
+        return QueueStateNoFlush;
+    }
+
     copy(events.begin(), events.end(), back_inserter(_events));
     if(_state == SubscriberStateFlushPending)
     {
@@ -840,10 +978,21 @@ Subscriber::destroy()
         {
             // Ignore
         }
+        catch(const Ice::ObjectAdapterDeactivatedException&)
+        {
+            // Ignore
+        }
     }
 }
 
 void
+Subscriber::shutdown()
+{
+    IceUtil::Mutex::Lock sync(_mutex);
+    _state = SubscriberStateShutdown;
+}
+
+void
 Subscriber::flushTime(const IceUtil::Time& interval)
 {
     if(_resetMax || interval > _maxSend)
@@ -857,10 +1006,6 @@ Subscriber::flushTime(const IceUtil::Time& interval)
 IceUtil::Time
 Subscriber::pollMaxFlushTime(const IceUtil::Time& now)
 {
-    //IceUtil::Time max = _maxSend;
-    //_maxSend = _maxSend * 0.95;
-    //return max;
-
     // The next call to flushTime can reset the max time.
     _resetMax = true;
     return _maxSend;
@@ -870,7 +1015,7 @@ void
 Subscriber::error(const Ice::Exception& e)
 {
     IceUtil::Mutex::Lock sync(_mutex);
-    if(_state != SubscriberStateError)
+    if(_state != SubscriberStateError && _state != SubscriberStateShutdown)
     {
         _state = SubscriberStateError;
         _events.clear();
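SubscriberLink::flush() above also adds a one-time capability probe: the first send performs TopicLinkV2Prx::checkedCast(_obj, "V2") (a remote check on the named facet), caches the result in _checkV2/_objV2, and on a v1 peer downgrades each event before calling the old forward(). A sketch of that check-once negotiation, assuming a single active sender as the commit's comment states; LinkSender and its std::function parameters are hypothetical stand-ins for the Ice proxies, not part of the Ice sources:

    #include <functional>
    #include <string>
    #include <vector>

    // Illustrative stand-ins for the Slice-generated event types.
    struct Event     { int op; std::string data; };
    struct EventData { int op; std::string data; };

    class LinkSender
    {
    public:
        LinkSender(std::function<bool()> probeV2,
                   std::function<void(const std::vector<Event>&)> forwardV2,
                   std::function<void(const std::vector<EventData>&)> forwardV1)
            : _probeV2(probeV2), _forwardV2(forwardV2), _forwardV1(forwardV1)
        {
        }

        // Mirrors SubscriberLink::flush(): only one sender thread is active
        // here, so the lazy capability check needs no extra locking.
        void forward(const std::vector<Event>& v)
        {
            if(!_checked)
            {
                _haveV2 = _probeV2(); // stands in for checkedCast(_obj, "V2")
                _checked = true;
            }
            if(_haveV2)
            {
                _forwardV2(v); // v2 peer: forward the events unchanged
            }
            else
            {
                // v1 peer: downgrade each Event to the older EventData
                // shape, as the commit does with EventDataSeq.
                std::vector<EventData> events;
                events.reserve(v.size());
                for(const Event& e : v)
                {
                    events.push_back(EventData{e.op, e.data});
                }
                _forwardV1(events);
            }
        }

    private:
        std::function<bool()> _probeV2;
        std::function<void(const std::vector<Event>&)> _forwardV2;
        std::function<void(const std::vector<EventData>&)> _forwardV1;
        bool _checked = false;
        bool _haveV2 = false;
    };

Caching the probe means the extra round trip is paid once per link rather than once per event batch, at the cost of not noticing a peer upgrade until the subscriber is recreated.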
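Finally, the repeated comment "the pool workers are joined with before flushing" implies a specific teardown order in the owning pool, which this commit does not show. A hedged sketch of that order; shutdownPool is hypothetical, and Sub is any type with a shutdown() method, such as the MiniSubscriber sketch above:

    #include <thread>
    #include <vector>

    template<typename Sub>
    void shutdownPool(std::vector<Sub*>& subscribers,
                      std::vector<std::thread>& workers)
    {
        // 1. Flip every subscriber into the shutdown state; queue() and
        //    the re-lock checks in flush() start returning early at once.
        for(Sub* s : subscribers)
        {
            s->shutdown();
        }
        // 2. Join the worker threads. Once this returns no flush() can be
        //    running, which is presumably why the first lock scope in
        //    flush() may assert a flush-pending state instead of checking
        //    for shutdown.
        for(std::thread& t : workers)
        {
            t.join();
        }
        // 3. Only now is it safe to destroy() the subscribers.
    }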