diff options
author | Matthew Newhook <matthew@zeroc.com> | 2006-11-20 02:09:09 +0000 |
---|---|---|
committer | Matthew Newhook <matthew@zeroc.com> | 2006-11-20 02:09:09 +0000 |
commit | d994d0d5549aa90a50559888a2c29f6b709ce70d (patch) | |
tree | 09a78e7f6bc71af7d2d99d3d9f0abaed0f991853 | |
parent | minor fix (diff) | |
download | ice-d994d0d5549aa90a50559888a2c29f6b709ce70d.tar.bz2 ice-d994d0d5549aa90a50559888a2c29f6b709ce70d.tar.xz ice-d994d0d5549aa90a50559888a2c29f6b709ce70d.zip |
Added shutdown state.
-rw-r--r-- | cpp/src/IceStorm/IceStormInternal.ice | 44 | ||||
-rw-r--r-- | cpp/src/IceStorm/Service.cpp | 12 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 213 | ||||
-rw-r--r-- | cpp/src/IceStorm/Subscriber.h | 4 | ||||
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.cpp | 52 | ||||
-rw-r--r-- | cpp/src/IceStorm/SubscriberPool.h | 4 | ||||
-rw-r--r-- | cpp/src/IceStorm/TopicI.cpp | 27 |
7 files changed, 299 insertions, 57 deletions
diff --git a/cpp/src/IceStorm/IceStormInternal.ice b/cpp/src/IceStorm/IceStormInternal.ice index 96d1f0e3b00..eb889338e61 100644 --- a/cpp/src/IceStorm/IceStormInternal.ice +++ b/cpp/src/IceStorm/IceStormInternal.ice @@ -18,6 +18,26 @@ module IceStorm /** * + * The event data. + * + **/ +struct EventData +{ + /** The operation name. */ + string op; + /** The operation mode. */ + Ice::OperationMode mode; + /** The encoded data for the operation's input parameters. */ + Ice::ByteSeq data; + /** The Ice::Current::Context data from the originating request. */ + Ice::Context context; +}; + +/** A sequence of EventData. */ +sequence<EventData> EventDataSeq; + +/** + * * The TopicLink interface. This is used to forward events between * federated Topic instances. * @@ -33,7 +53,7 @@ interface TopicLink * @param events The events to forward. * **/ - ["ami"] void forward(EventSeq events); + ["ami"] void forward(EventDataSeq events); }; /** @@ -55,6 +75,28 @@ interface TopicInternal extends Topic idempotent TopicLink* getLinkProxy(); }; +/** + * + * This is version 2 of the TopicLink interface. This is used to + * forward events between federated Topic instances for Ice 3.2 and + * later. + * + * @see TopicInternal + * + **/ +interface TopicLinkV2 +{ + /** + * + * Forward a sequence of events. + * + * @param events The events to forward. + * + **/ + ["ami"] void forward(EventSeq events); +}; + + }; // End module IceStorm #endif diff --git a/cpp/src/IceStorm/Service.cpp b/cpp/src/IceStorm/Service.cpp index f17ab500ace..e81e0370a4c 100644 --- a/cpp/src/IceStorm/Service.cpp +++ b/cpp/src/IceStorm/Service.cpp @@ -123,13 +123,6 @@ IceStorm::ServiceI::start( _topicAdapter->activate(); _publishAdapter->activate(); - - // - // The keep alive thread must be started after all topics are - // installed so that any upstream topics are notified immediately - // after startup. - // - //_instance->keepAlive()->startPinging(); } void @@ -181,9 +174,8 @@ IceStorm::ServiceI::stop() // does not clear the references. This is because the shutdown has // to be in two stages. First we destroy & join with the threads // so that no further activity can take place. Then we reap() - // which has to call on various instance objects (such as the keep - // alive thread), then we clear the instance which breaks any - // cycles. + // which has to call on various instance objects, then we clear + // the instance which breaks any cycles. // // diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 844f9902b10..1089b4f3e01 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -167,6 +167,11 @@ private: const TopicLinkPrx _obj; const int _cost; + // The first event that is sent we check the whether the object + // supports the "V2" facet. + bool _checkV2; + TopicLinkV2Prx _objV2; + // The next to try sending a new event if we're offline. IceUtil::Time _next; bool _warn; @@ -204,6 +209,10 @@ SubscriberOneway::flush() { return false; } + // + // Flush cannot be called with SubscriberStateShutdown because + // the pool workers are joined with before flushing. + // assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); @@ -281,6 +290,10 @@ SubscriberOneway::flush() return false; } + if(_state == SubscriberStateShutdown) + { + return false; + } if(!_events.empty()) { assert(_state == SubscriberStateFlushPending); @@ -351,6 +364,10 @@ SubscriberTwoway::flush() { return false; } + // + // Flush cannot be called with SubscriberStateShutdown because + // the pool workers are joined with before flushing. + // assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); @@ -377,6 +394,11 @@ SubscriberTwoway::flush() // sync.acquire(); + if(_state == SubscriberStateShutdown) + { + return false; + } + // // If there have been more events queued in the meantime then // we have a pending flush. @@ -444,6 +466,10 @@ SubscriberTwowayOrdered::flush() { return false; } + // + // Flush cannot be called with SubscriberStateShutdown because + // the pool workers are joined with before flushing. + // assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); @@ -459,15 +485,17 @@ SubscriberTwowayOrdered::flush() void SubscriberTwowayOrdered::response() { + IceUtil::Mutex::Lock sync(_mutex); + + assert(_state != SubscriberStateError); + if(_state == SubscriberStateShutdown) { - IceUtil::Mutex::Lock sync(_mutex); - - assert(_state != SubscriberStateError); - if(_events.empty()) - { - _state = SubscriberStateOnline; - return; - } + return; + } + if(_events.empty()) + { + _state = SubscriberStateOnline; + return; } _instance->subscriberPool()->flush(this); } @@ -479,7 +507,45 @@ class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward { public: - Topiclink_forwardI(const SubscriberLinkPtr& subscriber) : _subscriber(subscriber) + Topiclink_forwardI(const SubscriberLinkPtr& subscriber) : + _subscriber(subscriber) + { + } + + virtual void + ice_response() + { + _subscriber->response(); + } + + virtual void + ice_exception(const Ice::Exception& ex) + { + try + { + ex.ice_throw(); + } + catch(const Ice::ObjectNotExistException& ex) + { + _subscriber->error(ex); + } + catch(const Ice::LocalException& ex) + { + _subscriber->offline(ex); + } + } + +private: + + const SubscriberLinkPtr _subscriber; +}; + +class TopiclinkV2_forwardI : public IceStorm::AMI_TopicLinkV2_forward +{ +public: + + TopiclinkV2_forwardI(const SubscriberLinkPtr& subscriber) : + _subscriber(subscriber) { } @@ -520,6 +586,7 @@ SubscriberLink::SubscriberLink( Subscriber(instance, 0, true, obj->ice_getIdentity()), _obj(TopicLinkPrx::uncheckedCast(obj->ice_collocationOptimized(false))), _cost(cost), + _checkV2(false), _warn(true) { } @@ -544,6 +611,10 @@ SubscriberLink::queue(bool forwarded, const EventSeq& events) { return QueueStateError; } + if(_state == SubscriberStateShutdown) + { + return QueueStateNoFlush; + } // // If the proxy is offline and its time to send another event then @@ -618,13 +689,66 @@ SubscriberLink::flush() return false; } + // + // Flush cannot be called with SubscriberStateShutdown because + // the pool workers are joined with before flushing. + // assert(_state == SubscriberStateFlushPending); assert(!_events.empty()); v.swap(_events); } - _obj->forward_async(new Topiclink_forwardI(this), v); + // + // Only one thread at a time can be active at this point. + // + if(!_checkV2) + { + try + { + _objV2 = TopicLinkV2Prx::checkedCast(_obj, "V2"); + } + catch(const Ice::ObjectNotExistException& ex) + { + error(ex); + return false; + } + catch(const Ice::LocalException& ex) + { + offline(ex); + return false; + } + TraceLevelsPtr traceLevels = _instance->traceLevels(); + if(!_objV2 && traceLevels->subscriber > 0) + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << _instance->communicator()->identityToString(_id) << ": link is v1 IceStorm."; + } + _checkV2 = true; + } + + if(_objV2) + { + _objV2->forward_async(new TopiclinkV2_forwardI(this), v); + } + else + { + // + // Transform the event sequence into a EventData. + // + EventDataSeq events; + events.reserve(v.size()); + for(EventSeq::const_iterator p = v.begin(); p != v.end(); ++p) + { + EventData data; + data.op = (*p)->op; + data.mode = (*p)->mode; + data.data = (*p)->data; + data.context = (*p)->context; + events.push_back(data); + } + _obj->forward_async(new Topiclink_forwardI(this), events); + } return false; } @@ -632,25 +756,27 @@ SubscriberLink::flush() void SubscriberLink::response() { + IceUtil::Mutex::Lock sync(_mutex); + + assert(_state != SubscriberStateError); + if(_state == SubscriberStateShutdown) { - IceUtil::Mutex::Lock sync(_mutex); - - assert(_state == SubscriberStateFlushPending); - - // - // A successful response means we're no longer retrying, we're - // back active. - // - _warn = true; - - // - // No more events, no need to requeue this subscriber. - // - if(_events.empty()) - { - _state = SubscriberStateOnline; - return; - } + return; + } + + // + // A successful response means we're no longer retrying, we're + // back active. + // + _warn = true; + + // + // No more events, no need to requeue this subscriber. + // + if(_events.empty()) + { + _state = SubscriberStateOnline; + return; } _instance->subscriberPool()->flush(this); } @@ -659,6 +785,10 @@ void SubscriberLink::offline(const Ice::Exception& e) { IceUtil::Mutex::Lock sync(_mutex); + if(_state == SubscriberStateShutdown) + { + return; + } assert(_state != SubscriberStateOffline); _next = IceUtil::Time::now() + _instance->discardInterval(); @@ -780,7 +910,10 @@ Subscriber::create( const TopicLinkPrx& link, int cost) { - return new SubscriberLink(instance, link, cost); + return new SubscriberLink( + instance, + TopicLinkPrx::uncheckedCast(link->ice_timeout(instance->sendTimeout())), + cost); } Subscriber::~Subscriber() @@ -814,6 +947,11 @@ Subscriber::queue(bool, const EventSeq& events) { return QueueStateError; } + if(_state == SubscriberStateShutdown) + { + return QueueStateNoFlush; + } + copy(events.begin(), events.end(), back_inserter(_events)); if(_state == SubscriberStateFlushPending) { @@ -840,10 +978,21 @@ Subscriber::destroy() { // Ignore } + catch(const Ice::ObjectAdapterDeactivatedException&) + { + // Ignore + } } } void +Subscriber::shutdown() +{ + IceUtil::Mutex::Lock sync(_mutex); + _state = SubscriberStateShutdown; +} + +void Subscriber::flushTime(const IceUtil::Time& interval) { if(_resetMax || interval > _maxSend) @@ -857,10 +1006,6 @@ Subscriber::flushTime(const IceUtil::Time& interval) IceUtil::Time Subscriber::pollMaxFlushTime(const IceUtil::Time& now) { - //IceUtil::Time max = _maxSend; - //_maxSend = _maxSend * 0.95; - //return max; - // The next call to flushTime can reset the max time. _resetMax = true; return _maxSend; @@ -870,7 +1015,7 @@ void Subscriber::error(const Ice::Exception& e) { IceUtil::Mutex::Lock sync(_mutex); - if(_state != SubscriberStateError) + if(_state != SubscriberStateError && _state != SubscriberStateShutdown) { _state = SubscriberStateError; _events.clear(); diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h index 580bef03cbb..ba0502b43dc 100644 --- a/cpp/src/IceStorm/Subscriber.h +++ b/cpp/src/IceStorm/Subscriber.h @@ -47,6 +47,7 @@ public: // virtual bool flush() = 0; virtual void destroy(); + void shutdown(); // // These methods must only be called by the SubscriberPool they @@ -74,7 +75,8 @@ protected: SubscriberStateOnline, SubscriberStateFlushPending, SubscriberStateOffline, - SubscriberStateError + SubscriberStateError, + SubscriberStateShutdown }; SubscriberState _state; // The subscriber state. EventSeq _events; // The queue of events to send. diff --git a/cpp/src/IceStorm/SubscriberPool.cpp b/cpp/src/IceStorm/SubscriberPool.cpp index ee3865bc74d..f118874295f 100644 --- a/cpp/src/IceStorm/SubscriberPool.cpp +++ b/cpp/src/IceStorm/SubscriberPool.cpp @@ -81,7 +81,7 @@ SubscriberPoolMonitor::SubscriberPoolMonitor(const SubscriberPoolPtr& manager, c _manager(manager), _timeout(timeout), _needCheck(false), - _destroy(false) + _destroyed(false) { start(); } @@ -97,7 +97,7 @@ SubscriberPoolMonitor::run() { { Lock sync(*this); - if(_destroy) + if(_destroyed) { return; } @@ -148,7 +148,7 @@ void SubscriberPoolMonitor::destroy() { Lock sync(*this); - _destroy = true; + _destroyed = true; notify(); } @@ -162,7 +162,7 @@ SubscriberPool::SubscriberPool(const InstancePtr& instance) : "IceStorm.SubscriberPool.Timeout", 1000), 50))), // 10 * the stall timeout. _stallCheck(_timeout * 10), - _destroy(false), + _destroyed(false), _reap(0) { try @@ -197,6 +197,10 @@ void SubscriberPool::flush(list<SubscriberPtr>& subscribers) { Lock sync(*this); + if(_destroyed) + { + return; + } // // Splice on the new set of subscribers to SubscriberPool. // @@ -209,6 +213,10 @@ void SubscriberPool::flush(const SubscriberPtr& subscriber) { Lock sync(*this); + if(_destroyed) + { + return; + } _pending.push_back(subscriber); assert(invariants()); notify(); @@ -218,6 +226,10 @@ void SubscriberPool::add(const SubscriberPtr& subscriber) { Lock sync(*this); + if(_destroyed) + { + return; + } _subscribers.push_back(subscriber); assert(invariants()); } @@ -226,6 +238,10 @@ void SubscriberPool::remove(const SubscriberPtr& subscriber) { Lock sync(*this); + if(_destroyed) + { + return; + } // // Note that this cannot remove based on the subscriber id because // the pool is TopicManager scoped and not topic scoped therefore @@ -245,6 +261,12 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: { Lock sync(*this); + if(_destroyed) + { + subscriber = 0; + return; + } + if(subscriber) { if(requeue) @@ -353,7 +375,7 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: } } - while(_pending.empty() && !_destroy) + while(_pending.empty() && !_destroyed) { // // If we wait then there is no need to monitor anymore. @@ -362,7 +384,7 @@ SubscriberPool::dequeue(SubscriberPtr& subscriber, bool requeue, const IceUtil:: wait(); } - if(_destroy) + if(_destroyed) { return; } @@ -398,18 +420,22 @@ SubscriberPool::destroy() { // // First mark the pool as destroyed. This causes all of the worker - // threads to unblock and terminate. + // threads to unblock and terminate. We also copy the list of subscribers + // for shutdown. No new subscribers can be added once _destroyed is set. // + std::list<SubscriberPtr> subscribers; + { Lock sync(*this); - _destroy = true; + _destroyed = true; notifyAll(); if(_subscriberPoolMonitor) { _subscriberPoolMonitor->destroy(); } + subscribers = _subscribers; + _subscribers.clear(); } - // // Next join with each worker. // @@ -420,6 +446,14 @@ SubscriberPool::destroy() _workers.clear(); // + // Shutdown each subscriber. + // + for(list<SubscriberPtr>::const_iterator q = subscribers.begin(); q != subscribers.end(); ++q) + { + (*q)->shutdown(); + } + + // // Once all of the workers have gone then we'll no longer have // concurrent access to the pool monitor, so we can join with it // and then clear to remove the circular reference count. diff --git a/cpp/src/IceStorm/SubscriberPool.h b/cpp/src/IceStorm/SubscriberPool.h index 0fd2855d548..76289fe3429 100644 --- a/cpp/src/IceStorm/SubscriberPool.h +++ b/cpp/src/IceStorm/SubscriberPool.h @@ -50,7 +50,7 @@ private: const SubscriberPoolPtr _manager; const IceUtil::Time _timeout; bool _needCheck; - bool _destroy; + bool _destroyed; }; typedef IceUtil::Handle<SubscriberPoolMonitor> SubscriberPoolMonitorPtr; @@ -91,7 +91,7 @@ private: std::list<SubscriberPtr> _pending; std::list<SubscriberPtr> _subscribers; - bool _destroy; + bool _destroyed; std::list<IceUtil::ThreadPtr> _workers; int _reap; diff --git a/cpp/src/IceStorm/TopicI.cpp b/cpp/src/IceStorm/TopicI.cpp index b00bbc2ab45..98b11165cd6 100644 --- a/cpp/src/IceStorm/TopicI.cpp +++ b/cpp/src/IceStorm/TopicI.cpp @@ -83,6 +83,32 @@ public: } virtual void + forward(const EventDataSeq& v, const Ice::Current& current) + { + EventSeq events; + events.reserve(v.size()); + for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) + { + events.push_back(new Event(p->op, p->mode, p->data, p->context)); + } + _topic->publish(true, events); + } + +private: + + const TopicIPtr _topic; +}; + +class TopicLinkV2I : public TopicLinkV2 +{ +public: + + TopicLinkV2I(const TopicIPtr& topic) : + _topic(topic) + { + } + + virtual void forward(const EventSeq& v, const Ice::Current& current) { _topic->publish(true, v); @@ -125,6 +151,7 @@ TopicI::TopicI( // id.name = "link"; _linkPrx = TopicLinkPrx::uncheckedCast(_instance->objectAdapter()->add(new TopicLinkI(this), id)); + _instance->objectAdapter()->addFacet(new TopicLinkV2I(this), id, "V2"); // // Re-establish linked subscribers. |