diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r-- | cpp/src/IceStorm/Subscriber.cpp | 617 |
1 files changed, 254 insertions, 363 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp index 9df599eec3c..9edfe04ea21 100644 --- a/cpp/src/IceStorm/Subscriber.cpp +++ b/cpp/src/IceStorm/Subscriber.cpp @@ -21,54 +21,45 @@ using namespace IceStormElection; namespace { -class PerSubscriberPublisherI : public Ice::BlobjectArray +class PerSubscriberPublisherI final : public Ice::BlobjectArray { public: - PerSubscriberPublisherI(const InstancePtr& instance) : - _instance(instance) + PerSubscriberPublisherI(shared_ptr<Instance> instance) : + _instance(move(instance)) { } void - setSubscriber(const SubscriberPtr& subscriber) + setSubscriber(shared_ptr<Subscriber> subscriber) { - _subscriber = subscriber; + _subscriber = move(subscriber); } - virtual bool - ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams, + bool + ice_invoke(pair<const Ice::Byte*, const Ice::Byte*> inParams, vector<Ice::Byte>&, - const Ice::Current& current) + const Ice::Current& current) override { // Use cached reads. CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__); - EventDataPtr event = new EventData( - current.operation, - current.mode, - Ice::ByteSeq(), - current.ctx); + EventData event = { current.operation, current.mode, Ice::ByteSeq(), current.ctx }; - // - // COMPILERBUG: gcc 4.0.1 doesn't like this. - // - //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second)); Ice::ByteSeq data(inParams.first, inParams.second); - event->data.swap(data); + event.data.swap(data); EventDataSeq e; - e.push_back(event); - _subscriber->queue(false, e); + e.push_back(move(event)); + _subscriber->queue(false, move(e)); return true; } private: - const InstancePtr _instance; - /*const*/ SubscriberPtr _subscriber; + const shared_ptr<Instance> _instance; + shared_ptr<Subscriber> _subscriber; }; -typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr; IceStorm::Instrumentation::SubscriberState toSubscriberState(Subscriber::SubscriberState s) @@ -76,15 +67,15 @@ toSubscriberState(Subscriber::SubscriberState s) switch(s) { case Subscriber::SubscriberStateOnline: - return IceStorm::Instrumentation::SubscriberStateOnline; + return IceStorm::Instrumentation::SubscriberState::SubscriberStateOnline; case Subscriber::SubscriberStateOffline: - return IceStorm::Instrumentation::SubscriberStateOffline; + return IceStorm::Instrumentation::SubscriberState::SubscriberStateOffline; case Subscriber::SubscriberStateError: case Subscriber::SubscriberStateReaped: - return IceStorm::Instrumentation::SubscriberStateError; + return IceStorm::Instrumentation::SubscriberState::SubscriberStateError; default: assert(false); - return IceStorm::Instrumentation::SubscriberStateError; + return IceStorm::Instrumentation::SubscriberState::SubscriberStateError; } } @@ -94,231 +85,64 @@ toSubscriberState(Subscriber::SubscriberState s) namespace { -class SubscriberBatch : public Subscriber -{ -public: - - SubscriberBatch(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, const Ice::ObjectPrx&); - - virtual void flush(); - - void exception(const Ice::Exception& ex) - { - error(false, ex); - } - - void doFlush(); - void sent(bool); - -private: - - const Ice::ObjectPrx _obj; - const IceUtil::Time _interval; -}; -typedef IceUtil::Handle<SubscriberBatch> SubscriberBatchPtr; - -class SubscriberOneway : public Subscriber -{ -public: - - SubscriberOneway(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, const Ice::ObjectPrx&); - - virtual void flush(); - - void exception(const Ice::Exception& ex) - { - error(true, ex); - } - void sent(bool); - -private: - - const Ice::ObjectPrx _obj; -}; -typedef IceUtil::Handle<SubscriberOneway> SubscriberOnewayPtr; - -class SubscriberTwoway : public Subscriber +class SubscriberOneway final : public Subscriber { public: - SubscriberTwoway(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, int, - const Ice::ObjectPrx&); + SubscriberOneway(const shared_ptr<Instance>&, const SubscriberRecord&, + const shared_ptr<Ice::ObjectPrx>&, int, shared_ptr<Ice::ObjectPrx>); - virtual void flush(); + void flush() override; + void sentAsynchronously(); private: - const Ice::ObjectPrx _obj; + const shared_ptr<Ice::ObjectPrx> _obj; }; -class SubscriberLink : public Subscriber +class SubscriberTwoway final : public Subscriber { public: - SubscriberLink(const InstancePtr&, const SubscriberRecord&); + SubscriberTwoway(const shared_ptr<Instance>&, const SubscriberRecord&, const shared_ptr<Ice::ObjectPrx>&, int, int, + shared_ptr<Ice::ObjectPrx>); - virtual void flush(); + void flush() override; private: - const TopicLinkPrx _obj; + const shared_ptr<Ice::ObjectPrx> _obj; }; -class FlushTimerTask : public IceUtil::TimerTask +class SubscriberLink final : public Subscriber { public: - FlushTimerTask(const SubscriberBatchPtr& subscriber) : - _subscriber(subscriber) - { - } + SubscriberLink(const shared_ptr<Instance>&, const SubscriberRecord&); - virtual void - runTimerTask() - { - _subscriber->doFlush(); - } + void flush() override; private: - const SubscriberBatchPtr _subscriber; + const shared_ptr<TopicLinkPrx> _obj; }; } -SubscriberBatch::SubscriberBatch( - const InstancePtr& instance, - const SubscriberRecord& rec, - const Ice::ObjectPrx& proxy, - int retryCount, - const Ice::ObjectPrx& obj) : - Subscriber(instance, rec, proxy, retryCount, 1), - _obj(obj), - _interval(instance->flushInterval()) -{ -} - -void -SubscriberBatch::flush() -{ - if(_state != SubscriberStateOnline || _events.empty()) - { - return; - } - - if(_outstanding == 0) - { - ++_outstanding; - _instance->batchFlusher()->schedule(new FlushTimerTask(this), _interval); - } -} - -void -SubscriberBatch::doFlush() -{ - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - - // - // If the subscriber isn't online we're done. - // - if(_state != SubscriberStateOnline) - { - return; - } - - EventDataSeq v; - v.swap(_events); - assert(!v.empty()); - - if(_observer) - { - _outstandingCount = static_cast<Ice::Int>(v.size()); - _observer->outstanding(_outstandingCount); - } - - try - { - vector<Ice::Byte> dummy; - for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p) - { - _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context); - } - - Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests( - Ice::newCallback_Object_ice_flushBatchRequests(this, - &SubscriberBatch::exception, - &SubscriberBatch::sent)); - if(result->sentSynchronously()) - { - --_outstanding; - assert(_outstanding == 0); - if(_observer) - { - _observer->delivered(_outstandingCount); - } - } - } - catch(const Ice::Exception& ex) - { - error(false, ex); - return; - } - - if(_events.empty() && _outstanding == 0 && _shutdown) - { - _lock.notify(); - } - - // This is significantly faster than the async version, but it can - // block the calling thread. Bad news! - - //_obj->ice_flushBatchRequests(); -} - -void -SubscriberBatch::sent(bool sentSynchronously) -{ - if(sentSynchronously) - { - return; - } - - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - - // Decrement the _outstanding count. - --_outstanding; - assert(_outstanding == 0); - if(_observer) - { - _observer->delivered(_outstandingCount); - } - - if(_events.empty() && _outstanding == 0 && _shutdown) - { - _lock.notify(); - } - else if(!_events.empty()) - { - flush(); - } - -} - -SubscriberOneway::SubscriberOneway( - const InstancePtr& instance, - const SubscriberRecord& rec, - const Ice::ObjectPrx& proxy, - int retryCount, - const Ice::ObjectPrx& obj) : +SubscriberOneway::SubscriberOneway(const shared_ptr<Instance>& instance, + const SubscriberRecord& rec, + const shared_ptr<Ice::ObjectPrx>& proxy, + int retryCount, + shared_ptr<Ice::ObjectPrx> obj) : Subscriber(instance, rec, proxy, retryCount, 5), - _obj(obj) + _obj(move(obj)) { } void SubscriberOneway::flush() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); // // If the subscriber isn't online we're done. @@ -335,8 +159,8 @@ SubscriberOneway::flush() // Dequeue the head event, count one more outstanding AMI // request. // - EventDataPtr e = _events.front(); - _events.erase(_events.begin()); + EventData e = move(_events.front()); + _events.pop_front(); if(_observer) { _observer->outstanding(1); @@ -344,11 +168,35 @@ SubscriberOneway::flush() try { - Ice::AsyncResultPtr result = _obj->begin_ice_invoke( - e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this, - &SubscriberOneway::exception, - &SubscriberOneway::sent)); - if(!result->sentSynchronously()) + auto self = static_pointer_cast<SubscriberOneway>(shared_from_this()); + auto isSent = make_shared<promise<bool>>(); + auto future = isSent->get_future(); + + _obj->ice_invokeAsync(e.op, e.mode, e.data, nullptr, + [self](exception_ptr ex) + { + self->error(true, ex); + }, + [self, isSent](bool sentSynchronously) + { + isSent->set_value(sentSynchronously); + if(!sentSynchronously) + { + self->sentAsynchronously(); + } + }, + e.context); + + // + // Check if the request is (or potentially was) sent asynchronously + // + // If the request was sent synchronously then the isSent promise will have been set during the call + // to ice_invokeAsync (sent callback is called immediately after sending from the current thread). + // + // Otherwise if the request was sent asynchronously but quick enough so that the isSent promise is already + // fulfilled, we need to verify the sent callback's sentSynchronously value + // + if(future.wait_for(0s) != future_status::ready || future.get() == false) { ++_outstanding; } @@ -357,28 +205,23 @@ SubscriberOneway::flush() _observer->delivered(1); } } - catch(const Ice::Exception& ex) + catch(const std::exception&) { - error(true, ex); + error(false, current_exception()); return; } } if(_events.empty() && _outstanding == 0 && _shutdown) { - _lock.notify(); + _condVar.notify_one(); } } void -SubscriberOneway::sent(bool sentSynchronously) +SubscriberOneway::sentAsynchronously() { - if(sentSynchronously) - { - return; - } - - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); // Decrement the _outstanding count. --_outstanding; @@ -390,7 +233,7 @@ SubscriberOneway::sent(bool sentSynchronously) if(_events.empty() && _outstanding == 0 && _shutdown) { - _lock.notify(); + _condVar.notify_one(); } else if(_outstanding <= 0 && !_events.empty()) { @@ -398,22 +241,21 @@ SubscriberOneway::sent(bool sentSynchronously) } } -SubscriberTwoway::SubscriberTwoway( - const InstancePtr& instance, - const SubscriberRecord& rec, - const Ice::ObjectPrx& proxy, - int retryCount, - int maxOutstanding, - const Ice::ObjectPrx& obj) : +SubscriberTwoway::SubscriberTwoway(const shared_ptr<Instance>& instance, + const SubscriberRecord& rec, + const shared_ptr<Ice::ObjectPrx>& proxy, + int retryCount, + int maxOutstanding, + shared_ptr<Ice::ObjectPrx> obj) : Subscriber(instance, rec, proxy, retryCount, maxOutstanding), - _obj(obj) + _obj(move(obj)) { } void SubscriberTwoway::flush() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); // // If the subscriber isn't online we're done. @@ -430,9 +272,11 @@ SubscriberTwoway::flush() // Dequeue the head event, count one more outstanding AMI // request. // - EventDataPtr e = _events.front(); - _events.erase(_events.begin()); + EventData e = move(_events.front()); + _events.pop_front(); + ++_outstanding; + if(_observer) { _observer->outstanding(1); @@ -440,12 +284,21 @@ SubscriberTwoway::flush() try { - _obj->begin_ice_invoke(e->op, e->mode, e->data, e->context, - Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed)); + auto self = static_pointer_cast<SubscriberTwoway>(shared_from_this()); + _obj->ice_invokeAsync(e.op, e.mode, e.data, + [self](bool, vector<Ice::Byte>) + { + self->completed(); + }, + [self](exception_ptr ex) + { + self->error(true, ex); + }, + nullptr, e.context); } - catch(const Ice::Exception& ex) + catch(const std::exception&) { - error(true, ex); + error(true, current_exception()); return; } } @@ -454,18 +307,18 @@ SubscriberTwoway::flush() namespace { -SubscriberLink::SubscriberLink( - const InstancePtr& instance, - const SubscriberRecord& rec) : +SubscriberLink::SubscriberLink(const shared_ptr<Instance>& instance, + const SubscriberRecord& rec) : Subscriber(instance, rec, 0, -1, 1), - _obj(TopicLinkPrx::uncheckedCast(rec.obj->ice_collocationOptimized(false)->ice_timeout(instance->sendTimeout()))) + _obj(Ice::uncheckedCast<TopicLinkPrx>( + rec.obj->ice_collocationOptimized(false)->ice_timeout(static_cast<int>(instance->sendTimeout().count())))) { } void SubscriberLink::flush() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); if(_state != SubscriberStateOnline || _outstanding > 0) { @@ -475,16 +328,23 @@ SubscriberLink::flush() EventDataSeq v; v.swap(_events); - EventDataSeq::iterator p = v.begin(); + auto p = v.begin(); while(p != v.end()) { if(_rec.cost != 0) { int cost = 0; - Ice::Context::const_iterator q = (*p)->context.find("cost"); - if(q != (*p)->context.end()) + auto q = p->context.find("cost"); + if(q != p->context.end()) { - cost = atoi(q->second.c_str()); + try + { + cost = stoi(q->second); + } + catch(const std::invalid_argument&) + { + cost = 0; + } } if(cost > _rec.cost) { @@ -502,39 +362,48 @@ SubscriberLink::flush() ++_outstanding; if(_observer) { - _outstandingCount = static_cast<Ice::Int>(v.size()); + _outstandingCount = static_cast<int>(v.size()); _observer->outstanding(_outstandingCount); } - _obj->begin_forward(v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed)); + + auto self = static_pointer_cast<SubscriberLink>(shared_from_this()); + _obj->forwardAsync(v, + [self] + { + self->completed(); + }, + [self](exception_ptr ex) + { + self->error(true, ex); + }); } - catch(const Ice::Exception& ex) + catch(const std::exception&) { - error(true, ex); + error(true, current_exception()); } } } } -SubscriberPtr -Subscriber::create( - const InstancePtr& instance, - const SubscriberRecord& rec) +shared_ptr<Subscriber> +Subscriber::create(const shared_ptr<Instance>& instance, + const SubscriberRecord& rec) { if(rec.link) { - return new SubscriberLink(instance, rec); + return make_shared<SubscriberLink>(instance, rec); } else { - PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance); + auto per = make_shared<PerSubscriberPublisherI>(instance); Ice::Identity perId; perId.category = instance->instanceName(); perId.name = "topic." + rec.topicName + ".publish." + instance->communicator()->identityToString(rec.obj->ice_getIdentity()); - Ice::ObjectPrx proxy = instance->publishAdapter()->add(per, perId); - TraceLevelsPtr traceLevels = instance->traceLevels(); - SubscriberPtr subscriber; + auto proxy = instance->publishAdapter()->add(per, perId); + auto traceLevels = instance->traceLevels(); + shared_ptr<Subscriber> subscriber; try { @@ -559,10 +428,10 @@ Subscriber::create( // // Override the timeout. // - Ice::ObjectPrx newObj; + shared_ptr<Ice::ObjectPrx> newObj; try { - newObj = rec.obj->ice_timeout(instance->sendTimeout()); + newObj = rec.obj->ice_timeout(static_cast<int>(instance->sendTimeout().count())); } catch(const Ice::FixedProxyException&) { @@ -598,26 +467,33 @@ Subscriber::create( newObj = newObj->ice_connectionCached(connectionCached > 0); } + if(newObj->ice_isBatchOneway()) + { + // Use Oneway in case of Batch Oneway + newObj = newObj->ice_oneway(); + } + else if(newObj->ice_isBatchDatagram()) + { + // Use Datagram in case of Batch Datagram + newObj = newObj->ice_datagram(); + } + if(reliability == "ordered") { if(!newObj->ice_isTwoway()) { throw BadQoS("ordered reliability requires a twoway proxy"); } - subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 1, newObj); + subscriber = make_shared<SubscriberTwoway>(instance, rec, proxy, retryCount, 1, newObj); } else if(newObj->ice_isOneway() || newObj->ice_isDatagram()) { - subscriber = new SubscriberOneway(instance, rec, proxy, retryCount, newObj); - } - else if(newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram()) - { - subscriber = new SubscriberBatch(instance, rec, proxy, retryCount, newObj); + subscriber = make_shared<SubscriberOneway>(instance, rec, proxy, retryCount, newObj); } else //if(newObj->ice_isTwoway()) { assert(newObj->ice_isTwoway()); - subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 5, newObj); + subscriber = make_shared<SubscriberTwoway>(instance, rec, proxy, retryCount, 5, newObj); } per->setSubscriber(subscriber); } @@ -631,7 +507,7 @@ Subscriber::create( } } -Ice::ObjectPrx +shared_ptr<Ice::ObjectPrx> Subscriber::proxy() const { return _proxyReplica; @@ -652,7 +528,7 @@ Subscriber::record() const bool Subscriber::queue(bool forwarded, const EventDataSeq& events) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); // If this is a link subscriber if the set of events were // forwarded from another IceStorm instance then do not queue the @@ -666,7 +542,7 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events) { case SubscriberStateOffline: { - if(IceUtil::Time::now(IceUtil::Time::Monotonic) < _next) + if(chrono::steady_clock::now() < _next) { break; } @@ -686,7 +562,7 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events) { if(_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber) { - error(false, IceStorm::SendQueueSizeMaxReached(__FILE__, __LINE__)); + error(false, make_exception_ptr(SendQueueSizeMaxReached(__FILE__, __LINE__))); return false; } else // DropEvents @@ -717,7 +593,7 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events) bool Subscriber::reap() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); assert(_state >= SubscriberStateError); if(_state == SubscriberStateError) { @@ -730,7 +606,8 @@ Subscriber::reap() void Subscriber::resetIfReaped() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); + if(_state == SubscriberStateReaped) { setState(SubscriberStateError); @@ -740,7 +617,8 @@ Subscriber::resetIfReaped() bool Subscriber::errored() const { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); + return _state >= SubscriberStateError; } @@ -766,14 +644,43 @@ Subscriber::destroy() } } - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); _observer.detach(); } void -Subscriber::error(bool dec, const Ice::Exception& e) +Subscriber::completed() +{ + lock_guard<recursive_mutex> lg(_mutex); + + // Decrement the _outstanding count. + --_outstanding; + assert(_outstanding >= 0 && _outstanding < _maxOutstanding); + if(_observer) + { + _observer->delivered(_outstandingCount); + } + + // + // A successful response means we're no longer retrying, we're + // back active. + // + _currentRetry = 0; + + if(_events.empty() && _outstanding == 0 && _shutdown) + { + _condVar.notify_one(); + } + else + { + flush(); + } +} + +void +Subscriber::error(bool dec, exception_ptr e) { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); if(dec) { @@ -790,16 +697,38 @@ Subscriber::error(bool dec, const Ice::Exception& e) { if(_shutdown) { - _lock.notify(); + _condVar.notify_one(); } return; } - // A hard error is an ObjectNotExistException or - // NotRegisteredException. - bool hardError = dynamic_cast<const Ice::ObjectNotExistException*>(&e) || - dynamic_cast<const Ice::NotRegisteredException*>(&e) || - dynamic_cast<const IceStorm::SendQueueSizeMaxReached*>(&e); + // A hard error is an ObjectNotExistException, NotRegisteredException, or SendQueueSizeMaxReached + bool hardError; + string what; + try + { + rethrow_exception(e); + } + catch(const Ice::ObjectNotExistException& ex) + { + hardError = true; + what = ex.what(); + } + catch(const Ice::NotRegisteredException& ex) + { + hardError = true; + what = ex.what(); + } + catch(const SendQueueSizeMaxReached& ex) + { + hardError = true; + what = ex.what(); + } + catch(const std::exception& ex) + { + hardError = false; + what = ex.what(); + } // // A twoway subscriber can queue multiple send events and @@ -807,7 +736,7 @@ Subscriber::error(bool dec, const Ice::Exception& e) // replies if we're retrying and its not yet time to process the // next request. // - IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic); + auto now = std::chrono::steady_clock::now(); if(!hardError && _state == SubscriberStateOffline && now < _next) { return; @@ -822,7 +751,7 @@ Subscriber::error(bool dec, const Ice::Exception& e) { assert(_state < SubscriberStateError); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(_currentRetry == 0) { Ice::Warning warn(traceLevels->logger); @@ -831,8 +760,8 @@ Subscriber::error(bool dec, const Ice::Exception& e) { warn << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj); } - warn << " subscriber offline: " << e - << " discarding events: " << _instance->discardInterval() << "s retryCount: " << _retryCount; + warn << " subscriber offline: " << what + << " discarding events: " << _instance->discardInterval().count() << "s retryCount: " << _retryCount; } else { @@ -844,8 +773,8 @@ Subscriber::error(bool dec, const Ice::Exception& e) { out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj); } - out << " subscriber offline: " << e - << " discarding events: " << _instance->discardInterval() << "s retry: " + out << " subscriber offline: " << what + << " discarding events: " << _instance->discardInterval().count() << "s retry: " << _currentRetry << "/" << _retryCount; } } @@ -863,7 +792,7 @@ Subscriber::error(bool dec, const Ice::Exception& e) _events.clear(); setState(SubscriberStateError); - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->subscriber > 0) { Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); @@ -872,64 +801,26 @@ Subscriber::error(bool dec, const Ice::Exception& e) { out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj); } - out << " subscriber errored out: " << e + out << " subscriber errored out: " << what << " retry: " << _currentRetry << "/" << _retryCount; } } if(_shutdown && _events.empty()) { - _lock.notify(); - } -} - -void -Subscriber::completed(const Ice::AsyncResultPtr& result) -{ - try - { - result->throwLocalException(); - - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); - - // Decrement the _outstanding count. - --_outstanding; - assert(_outstanding >= 0 && _outstanding < _maxOutstanding); - if(_observer) - { - _observer->delivered(_outstandingCount); - } - - // - // A successful response means we're no longer retrying, we're - // back active. - // - _currentRetry = 0; - - if(_events.empty() && _outstanding == 0 && _shutdown) - { - _lock.notify(); - } - else - { - flush(); - } - } - catch(const Ice::LocalException& ex) - { - error(true, ex); + _condVar.notify_one(); } } void Subscriber::shutdown() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + unique_lock<recursive_mutex> lock(_mutex); _shutdown = true; while(_outstanding > 0 && !_events.empty()) { - _lock.wait(); + _condVar.wait(lock); } _observer.detach(); @@ -938,7 +829,8 @@ Subscriber::shutdown() void Subscriber::updateObserver() { - IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock); + lock_guard<recursive_mutex> lg(_mutex); + if(_instance->observer()) { _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(), @@ -951,18 +843,17 @@ Subscriber::updateObserver() } } -Subscriber::Subscriber( - const InstancePtr& instance, - const SubscriberRecord& rec, - const Ice::ObjectPrx& proxy, - int retryCount, - int maxOutstanding) : - _instance(instance), - _rec(rec), +Subscriber::Subscriber(shared_ptr<Instance> instance, + SubscriberRecord rec, + shared_ptr<Ice::ObjectPrx> proxy, + int retryCount, + int maxOutstanding) : + _instance(move(instance)), + _rec(move(rec)), _retryCount(retryCount), _maxOutstanding(maxOutstanding), - _proxy(proxy), - _proxyReplica(proxy), + _proxy(move(proxy)), + _proxyReplica(_proxy), _shutdown(false), _state(SubscriberStateOnline), _outstanding(0), @@ -971,17 +862,17 @@ Subscriber::Subscriber( { if(_proxy && _instance->publisherReplicaProxy()) { - const_cast<Ice::ObjectPrx&>(_proxyReplica) = + const_cast<shared_ptr<Ice::ObjectPrx>&>(_proxyReplica) = _instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity()); } if(_instance->observer()) { _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(), - rec.topicName, - rec.obj, - rec.theQoS, - rec.theTopic, + _rec.topicName, + _rec.obj, + _rec.theQoS, + _rec.theTopic, toSubscriberState(_state), 0)); } @@ -1015,7 +906,7 @@ Subscriber::setState(Subscriber::SubscriberState state) { if(state != _state) { - TraceLevelsPtr traceLevels = _instance->traceLevels(); + auto traceLevels = _instance->traceLevels(); if(traceLevels->subscriber > 1) { Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat); @@ -1038,7 +929,7 @@ Subscriber::setState(Subscriber::SubscriberState state) } bool -IceStorm::operator==(const SubscriberPtr& subscriber, const Ice::Identity& id) +IceStorm::operator==(const shared_ptr<Subscriber>& subscriber, const Ice::Identity& id) { return subscriber->id() == id; } |