diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscribers.cpp')
-rw-r--r-- | cpp/src/IceStorm/Subscribers.cpp | 153 |
1 files changed, 90 insertions, 63 deletions
diff --git a/cpp/src/IceStorm/Subscribers.cpp b/cpp/src/IceStorm/Subscribers.cpp index 2aca83c3ca9..17b42bdc605 100644 --- a/cpp/src/IceStorm/Subscribers.cpp +++ b/cpp/src/IceStorm/Subscribers.cpp @@ -29,19 +29,19 @@ using namespace IceStorm; // // Per Subscriber object. // -namespace IceStorm +namespace { -class PerSubscriberPublisherProxyI : public Ice::BlobjectArray +class PerSubscriberPublisherI : public Ice::BlobjectArray { public: - PerSubscriberPublisherProxyI(const InstancePtr& instance) : + PerSubscriberPublisherI(const InstancePtr& instance) : _instance(instance) { } - ~PerSubscriberPublisherProxyI() + ~PerSubscriberPublisherI() { } @@ -56,11 +56,18 @@ public: vector<Ice::Byte>&, const Ice::Current& current) { - EventPtr event = new Event; - event->op = current.operation; - event->mode = current.mode; - vector<Ice::Byte>(inParams.first, inParams.second).swap(event->data); - event->context = current.ctx; + EventPtr event = new Event( + current.operation, + current.mode, + Ice::ByteSeq(), + current.ctx); + + // + // COMPILERBUG: gcc 4.0.1 doesn't like this. + // + //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); + Ice::ByteSeq data(inParams.first, inParams.second); + event->data.swap(data); EventSeq e; e.push_back(event); @@ -70,7 +77,7 @@ public: { list<SubscriberPtr> l; l.push_back(_subscriber); - _instance->subscriberPool()->add(l); + _instance->subscriberPool()->flush(l); } return true; } @@ -80,11 +87,11 @@ private: const InstancePtr _instance; /*const*/ SubscriberPtr _subscriber; }; -typedef IceUtil::Handle<PerSubscriberPublisherProxyI> PerSubscriberPublisherProxyIPtr; +typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr; } // Each of the various Subscriber types. -namespace IceStorm +namespace { class SubscriberOneway : public Subscriber @@ -187,24 +194,18 @@ SubscriberOneway::flush() IceUtil::Mutex::Lock sync(_mutex); // - // If another thread is busy delivering events or we're no longer - // active then we have nothing left to do. + // If the subscriber errored out then we're done. // - if(_state != StateActive || _events.empty()) + if(_state != StateActive) { _busy = false; return false; } + assert(!_events.empty()); assert(_busy); try { - if(_state != StateActive) - { - assert(!_busy); - return false; - } - // // Get the current set of events, but release the lock before // attempting to deliver the events. This allows other threads @@ -216,15 +217,25 @@ SubscriberOneway::flush() sync.release(); // XXX: -/* TraceLevelsPtr traceLevels = _instance->traceLevels(); if(_obj->ice_getIdentity().name.substr(0, 4) == "slow") { - Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); - out << "deliberately stalling"; - sleep(1); + //Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + //out << "deliberately stalling"; + sleep(2); + } + if(_obj->ice_getIdentity().name.substr(0, 5) == "block") + { + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << "-> stall for 100s"; + } + sleep(100); + { + Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); + out << "<- stall for 100s"; + } } -*/ // // Deliver the events without holding the lock. @@ -256,15 +267,12 @@ SubscriberOneway::flush() // Reacquire the lock before we check the queue again. // sync.acquire(); - + // // If there have been more events queued in the meantime then // we are still busy. // - if(_events.empty()) - { - _busy = false; - } + _busy = !_events.empty(); } catch(const Ice::LocalException& ex) { @@ -287,7 +295,7 @@ SubscriberOneway::destroy() Subscriber::destroy(); } -namespace IceStorm +namespace { class TwowayInvokeI : public Ice::AMI_Object_ice_invoke @@ -332,22 +340,16 @@ SubscriberTwoway::flush() IceUtil::Mutex::Lock sync(_mutex); // - // If another thread is busy delivering events or we're no longer - // active then we have nothing left to do. + // If the subscriber errored out then we're done. // - if(_state != StateActive || _events.empty()) + if(_state != StateActive) { _busy = false; return false; } + assert(!_events.empty()); assert(_busy); - if(_state != StateActive) - { - assert(!_busy); - return false; - } - // // Get the current set of events, but release the lock before // attempting to deliver the events. This allows other threads @@ -372,17 +374,15 @@ SubscriberTwoway::flush() sync.acquire(); // - // If there have been more events queued in the meantime then we - // are still busy. + // If there have been more events queued in the meantime then + // we are still busy. // - if(_events.empty()) - { - _busy = false; - } + _busy = !_events.empty(); + return _busy; } -namespace IceStorm +namespace { class TwowayOrderedInvokeI : public Ice::AMI_Object_ice_invoke @@ -428,23 +428,24 @@ SubscriberTwowayOrdered::flush() EventPtr e; { IceUtil::Mutex::Lock sync(_mutex); - + // - // If another thread is busy delivering events or we're no longer - // active then we have nothing left to do. + // If the subscriber errored out then we're done. // - if(_state != StateActive || _events.empty()) + if(_state != StateActive) { _busy = false; return false; } + assert(!_events.empty()); assert(_busy); e = _events.front(); _events.erase(_events.begin()); } - + _obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context); + return false; } @@ -454,15 +455,15 @@ SubscriberTwowayOrdered::response() EventPtr e; { IceUtil::Mutex::Lock sync(_mutex); - + assert(_state == StateActive && _busy); - + if(_events.empty()) { _busy = false; return; } - + e = _events.front(); _events.erase(_events.begin()); } @@ -470,7 +471,7 @@ SubscriberTwowayOrdered::response() _obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context); } -namespace IceStorm +namespace { class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward @@ -590,20 +591,21 @@ SubscriberLink::flush() IceUtil::Mutex::Lock sync(_mutex); // - // If another thread is busy delivering events or we're no longer - // active then we have nothing left to do. + // If the subscriber errored out then we're done. // - if(_state != StateActive || _events.empty()) + if(_state != StateActive) { _busy = false; return false; } + assert(!_events.empty()); assert(_busy); - + v.swap(_events); } _obj->forward_async(new Topiclink_forwardI(this), v); + return false; } @@ -634,7 +636,7 @@ Subscriber::create( const Ice::ObjectPrx& obj, const IceStorm::QoS& qos) { - PerSubscriberPublisherProxyIPtr per = new PerSubscriberPublisherProxyI(instance); + PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance); Ice::ObjectPrx proxy = instance->objectAdapter()->addWithUUID(per); TraceLevelsPtr traceLevels = instance->traceLevels(); SubscriberPtr subscriber; @@ -791,6 +793,29 @@ Subscriber::destroy() } void +Subscriber::flushTime(const IceUtil::Time& interval) +{ + if(_resetMax || interval > _maxSend) + { + assert(interval != IceUtil::Time()); + _resetMax = false; + _maxSend = interval; + } +} + +IceUtil::Time +Subscriber::pollMaxFlushTime(const IceUtil::Time& now) +{ + //IceUtil::Time max = _maxSend; + //_maxSend = _maxSend * 0.95; + //return max; + + // The next call to flushTime can reset the max time. + _resetMax = true; + return _maxSend; +} + +void Subscriber::setError(const Ice::Exception& e) { IceUtil::Mutex::Lock sync(_mutex); @@ -836,7 +861,9 @@ Subscriber::Subscriber( _persistent(persistent), _proxy(proxy), _state(StateActive), - _busy(false) + _busy(false), + _resetMax(true), + _maxSend(IceUtil::Time::seconds(60*24)) // A long time { } |