diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rwxr-xr-x | cpp/src/IceStorm/Subscriber.cpp | 179 |
1 files changed, 155 insertions, 24 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 74b15583cc1..4733d93d296 100755 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -78,6 +78,21 @@ private: }; typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr; +IceStorm::Instrumentation::SubscriberState +toSubscriberState(Subscriber::SubscriberState s) +{ + switch(s) + { + case Subscriber::SubscriberStateOnline: + return IceStorm::Instrumentation::SubscriberStateOnline; + case Subscriber::SubscriberStateOffline: + return IceStorm::Instrumentation::SubscriberStateOffline; + case Subscriber::SubscriberStateError: + case Subscriber::SubscriberStateReaped: + return IceStorm::Instrumentation::SubscriberStateError; + } +} + } // Each of the various Subscriber types. @@ -99,6 +114,7 @@ public: } void doFlush(); + void sent(bool); private: @@ -207,27 +223,24 @@ SubscriberBatch::flush() void SubscriberBatch::doFlush() { - EventDataSeq v; + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + // + // If the subscriber isn't online we're done. + // + if(_state != SubscriberStateOnline) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - - --_outstanding; - assert(_outstanding == 0); - - if(_shutdown) - { - _lock.notify(); - } + return; + } - // - // If the subscriber isn't online we're done. - // - if(_state != SubscriberStateOnline) - { - return; - } - v.swap(_events); - assert(!v.empty()); + EventDataSeq v; + v.swap(_events); + assert(!v.empty()); + + if(_observer) + { + _outstandingCount = v.size(); + _observer->outstanding(v.size()); } try @@ -237,6 +250,20 @@ SubscriberBatch::doFlush() { _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); } + + Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests( + Ice::newCallback_Object_ice_flushBatchRequests(this, + &SubscriberBatch::exception, + &SubscriberBatch::sent)); + if(result->sentSynchronously()) + { + --_outstanding; + assert(_outstanding == 0); + if(_observer) + { + _observer->delivered(_outstandingCount); + } + } } catch(const Ice::Exception& ex) { @@ -244,15 +271,46 @@ SubscriberBatch::doFlush() return; } - _obj->begin_ice_flushBatchRequests(Ice::newCallback_Object_ice_flushBatchRequests(this, - &SubscriberBatch::exception)); - + if(_events.empty() && _outstanding == 0 && _shutdown) + { + _lock.notify(); + } + // This is significantly faster than the async version, but it can // block the calling thread. Bad news! //_obj->ice_flushBatchRequests(); } +void +SubscriberBatch::sent(bool sentSynchronously) +{ + if(sentSynchronously) + { + return; + } + + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + // Decrement the _outstanding count. + --_outstanding; + assert(_outstanding == 0); + if(_observer) + { + _observer->delivered(_outstandingCount); + } + + if(_events.empty() && _outstanding == 0 && _shutdown) + { + _lock.notify(); + } + else if(!_events.empty()) + { + flush(); + } + +} + SubscriberOneway::SubscriberOneway( const InstancePtr& instance, const SubscriberRecord& rec, @@ -291,6 +349,11 @@ SubscriberOneway::flush() // EventDataPtr e = _events.front(); _events.erase(_events.begin()); + if(_observer) + { + _observer->outstanding(1); + } + try { Ice::AsyncResultPtr result = _obj->begin_ice_invoke( @@ -301,6 +364,10 @@ SubscriberOneway::flush() { ++_outstanding; } + else if(_observer) + { + _observer->delivered(1); + } } catch(const Ice::Exception& ex) { @@ -328,6 +395,10 @@ SubscriberOneway::sent(bool sentSynchronously) // Decrement the _outstanding count. --_outstanding; assert(_outstanding >= 0 && _outstanding < _maxOutstanding); + if(_observer) + { + _observer->delivered(1); + } if(_events.empty() && _outstanding == 0 && _shutdown) { @@ -374,7 +445,11 @@ SubscriberTwoway::flush() EventDataPtr e = _events.front(); _events.erase(_events.begin()); ++_outstanding; - + if(_observer) + { + _observer->outstanding(1); + } + try { _obj->begin_ice_invoke(e->op, e->mode, e->data, e->context, @@ -437,6 +512,11 @@ SubscriberLink::flush() try { ++_outstanding; + if(_observer) + { + _outstandingCount = v.size(); + _observer->outstanding(_outstandingCount); + } _obj->begin_forward(v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed)); } catch(const Ice::Exception& ex) @@ -600,6 +680,10 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events) case SubscriberStateOnline: copy(events.begin(), events.end(), back_inserter(_events)); + if(_observer) + { + _observer->queued(events.size()); + } flush(); break; @@ -664,6 +748,8 @@ Subscriber::destroy() // Ignore } } + + _observer.detach(); } void @@ -773,10 +859,14 @@ Subscriber::completed(const Ice::AsyncResultPtr& result) result->throwLocalException(); IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - + // Decrement the _outstanding count. --_outstanding; assert(_outstanding >= 0 && _outstanding < _maxOutstanding); + if(_observer) + { + _observer->delivered(_outstandingCount); + } // // A successful response means we're no longer retrying, we're @@ -810,6 +900,24 @@ Subscriber::shutdown() { _lock.wait(); } + + _observer.detach(); +} + +void +Subscriber::updateObserver() +{ + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + if(_instance->observer()) + { + _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(), + _rec.topicName, + _rec.obj, + _rec.theQoS, + _rec.theTopic, + toSubscriberState(_state), + _observer.get())); + } } Subscriber::Subscriber( @@ -827,6 +935,7 @@ Subscriber::Subscriber( _shutdown(false), _state(SubscriberStateOnline), _outstanding(0), + _outstandingCount(1), _currentRetry(0) { if(_proxy && _instance->publisherReplicaProxy()) @@ -834,6 +943,17 @@ Subscriber::Subscriber( const_cast<Ice::ObjectPrx&>(_proxyReplica) = _instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity()); } + + if(_instance->observer()) + { + _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(), + rec.topicName, + rec.obj, + rec.theQoS, + rec.theTopic, + toSubscriberState(_state), + 0)); + } } namespace @@ -872,6 +992,17 @@ Subscriber::setState(Subscriber::SubscriberState state) << " transition from: " << stateToString(_state) << " to: " << stateToString(state); } _state = state; + + if(_instance->observer()) + { + _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(), + _rec.topicName, + _rec.obj, + _rec.theQoS, + _rec.theTopic, + toSubscriberState(_state), + _observer.get())); + } } } |