diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rwxr-xr-x | cpp/src/IceStorm/Subscriber.cpp | 184 |
1 files changed, 55 insertions, 129 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index b854ff2a835..74b15583cc1 100755 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -93,6 +93,11 @@ public: virtual void flush(); + void exception(const Ice::Exception& ex) + { + error(false, ex); + } + void doFlush(); private: @@ -111,7 +116,11 @@ public: virtual void flush(); - void sent(); + void exception(const Ice::Exception& ex) + { + error(true, ex); + } + void sent(bool); private: @@ -146,84 +155,6 @@ private: const TopicLinkPrx _obj; }; -class OnewayIceInvokeI : public Ice::AMI_Object_ice_invoke, public Ice::AMISentCallback -{ -public: - - OnewayIceInvokeI(const SubscriberOnewayPtr& subscriber) : - _subscriber(subscriber) - { - } - - virtual void - ice_response(bool, const std::vector<Ice::Byte>&) - { - assert(false); - } - - virtual void - ice_sent() - { - _subscriber->sent(); - } - - virtual void - ice_exception(const Ice::Exception& e) - { - _subscriber->error(true, e); - } - -private: - - const SubscriberOnewayPtr _subscriber; -}; - -class IceInvokeI : public Ice::AMI_Object_ice_invoke -{ -public: - - IceInvokeI(const SubscriberPtr& subscriber) : - _subscriber(subscriber) - { - } - - virtual void - ice_response(bool, const std::vector<Ice::Byte>&) - { - _subscriber->response(); - } - - virtual void - ice_exception(const Ice::Exception& e) - { - _subscriber->error(true, e); - } - -private: - - const SubscriberPtr _subscriber; -}; - -class FlushBatchI : public Ice::AMI_Object_ice_flushBatchRequests -{ -public: - - FlushBatchI(const SubscriberPtr& subscriber) : - _subscriber(subscriber) - { - } - - virtual void - ice_exception(const Ice::Exception& e) - { - _subscriber->error(false, e); - } - -private: - - const SubscriberPtr _subscriber; -}; - class FlushTimerTask : public IceUtil::TimerTask { public: @@ -313,7 +244,8 @@ SubscriberBatch::doFlush() return; } - _obj->ice_flushBatchRequests_async(new FlushBatchI(this)); + _obj->begin_ice_flushBatchRequests(Ice::newCallback_Object_ice_flushBatchRequests(this, + &SubscriberBatch::exception)); // This is significantly faster than the async version, but it can // block the calling thread. Bad news! @@ -361,7 +293,11 @@ SubscriberOneway::flush() _events.erase(_events.begin()); try { - if(!_obj->ice_invoke_async(new OnewayIceInvokeI(this), e->op, e->mode, e->data, e->context)) + Ice::AsyncResultPtr result = _obj->begin_ice_invoke( + e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this, + &SubscriberOneway::exception, + &SubscriberOneway::sent)); + if(!result->sentSynchronously()) { ++_outstanding; } @@ -380,8 +316,13 @@ SubscriberOneway::flush() } void -SubscriberOneway::sent() +SubscriberOneway::sent(bool sentSynchronously) { + if(sentSynchronously) + { + return; + } + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); // Decrement the _outstanding count. @@ -436,7 +377,8 @@ SubscriberTwoway::flush() try { - _obj->ice_invoke_async(new IceInvokeI(this), e->op, e->mode, e->data, e->context); + _obj->begin_ice_invoke(e->op, e->mode, e->data, e->context, + Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed)); } catch(const Ice::Exception& ex) { @@ -449,34 +391,6 @@ SubscriberTwoway::flush() namespace { -class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward -{ -public: - - Topiclink_forwardI(const SubscriberPtr& subscriber) : - _subscriber(subscriber) - { - } - - virtual void - ice_response() - { - _subscriber->response(); - } - - virtual void - ice_exception(const Ice::Exception& e) - { - _subscriber->error(true, e); - } - -private: - - const SubscriberPtr _subscriber; -}; - -} - SubscriberLink::SubscriberLink( const InstancePtr& instance, const SubscriberRecord& rec) : @@ -523,7 +437,7 @@ SubscriberLink::flush() try { ++_outstanding; - _obj->forward_async(new Topiclink_forwardI(this), v); + _obj->begin_forward(v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed)); } catch(const Ice::Exception& ex) { @@ -532,6 +446,8 @@ SubscriberLink::flush() } } +} + SubscriberPtr Subscriber::create( const InstancePtr& instance, @@ -850,30 +766,40 @@ Subscriber::error(bool dec, const Ice::Exception& e) } void -Subscriber::response() +Subscriber::completed(const Ice::AsyncResultPtr& result) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - - // Decrement the _outstanding count. - --_outstanding; - assert(_outstanding >= 0 && _outstanding < _maxOutstanding); - - // - // A successful response means we're no longer retrying, we're - // back active. - // - _currentRetry = 0; - - if(_events.empty() && _outstanding == 0 && _shutdown) + try { - _lock.notify(); + result->throwLocalException(); + + IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + + // Decrement the _outstanding count. + --_outstanding; + assert(_outstanding >= 0 && _outstanding < _maxOutstanding); + + // + // A successful response means we're no longer retrying, we're + // back active. + // + _currentRetry = 0; + + if(_events.empty() && _outstanding == 0 && _shutdown) + { + _lock.notify(); + } + else + { + flush(); + } } - else + catch(const Ice::LocalException& ex) { - flush(); + error(true, ex); } } + void Subscriber::shutdown() { |