//
// Copyright (c) ZeroC, Inc. All rights reserved.
//

#include <IceStorm/Subscriber.h>
#include <IceStorm/Instance.h>
#include <IceStorm/NodeI.h>
#include <IceStorm/TraceLevels.h>
#include <IceStorm/Util.h>
#include <Ice/LoggerUtil.h>
#include <Ice/ObserverHelper.h>
#include <IceUtil/StringUtil.h>

using namespace std;
using namespace IceStorm;
using namespace IceStormElection;

//
// Per Subscriber object.
//
namespace
{

class PerSubscriberPublisherI final : public Ice::BlobjectArray
{
public:

    PerSubscriberPublisherI(shared_ptr<Instance> instance) :
        _instance(move(instance))
    {
    }

    void setSubscriber(shared_ptr<Subscriber> subscriber)
    {
        _subscriber = move(subscriber);
    }

    bool ice_invoke(pair<const Ice::Byte*, const Ice::Byte*> inParams, vector<Ice::Byte>&,
                    const Ice::Current& current) override
    {
        // Use cached reads.
        CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);

        EventData event = { current.operation, current.mode, Ice::ByteSeq(), current.ctx };

        Ice::ByteSeq data(inParams.first, inParams.second);
        event.data.swap(data);

        EventDataSeq e;
        e.push_back(move(event));
        _subscriber->queue(false, move(e));
        return true;
    }

private:

    const shared_ptr<Instance> _instance;
    shared_ptr<Subscriber> _subscriber;
};

IceStorm::Instrumentation::SubscriberState
toSubscriberState(Subscriber::SubscriberState s)
{
    switch(s)
    {
        case Subscriber::SubscriberStateOnline:
            return IceStorm::Instrumentation::SubscriberState::SubscriberStateOnline;
        case Subscriber::SubscriberStateOffline:
            return IceStorm::Instrumentation::SubscriberState::SubscriberStateOffline;
        case Subscriber::SubscriberStateError:
        case Subscriber::SubscriberStateReaped:
            return IceStorm::Instrumentation::SubscriberState::SubscriberStateError;
        default:
            assert(false);
            return IceStorm::Instrumentation::SubscriberState::SubscriberStateError;
    }
}

}

// Each of the various Subscriber types.
namespace
{

class SubscriberOneway final : public Subscriber
{
public:

    SubscriberOneway(const shared_ptr<Instance>&, const SubscriberRecord&,
                     const shared_ptr<Ice::ObjectPrx>&, int, shared_ptr<Ice::ObjectPrx>);

    void flush() override;

    void sentAsynchronously();

private:

    const shared_ptr<Ice::ObjectPrx> _obj;
};

class SubscriberTwoway final : public Subscriber
{
public:

    SubscriberTwoway(const shared_ptr<Instance>&, const SubscriberRecord&,
                     const shared_ptr<Ice::ObjectPrx>&, int, int, shared_ptr<Ice::ObjectPrx>);

    void flush() override;

private:

    const shared_ptr<Ice::ObjectPrx> _obj;
};

class SubscriberLink final : public Subscriber
{
public:

    SubscriberLink(const shared_ptr<Instance>&, const SubscriberRecord&);

    void flush() override;

private:

    const shared_ptr<TopicLinkPrx> _obj;
};

}

SubscriberOneway::SubscriberOneway(const shared_ptr<Instance>& instance,
                                   const SubscriberRecord& rec,
                                   const shared_ptr<Ice::ObjectPrx>& proxy,
                                   int retryCount,
                                   shared_ptr<Ice::ObjectPrx> obj) :
    Subscriber(instance, rec, proxy, retryCount, 5),
    _obj(move(obj))
{
}

void
SubscriberOneway::flush()
{
    lock_guard<recursive_mutex> lg(_mutex);

    //
    // If the subscriber isn't online we're done.
    //
    if(_state != SubscriberStateOnline || _events.empty())
    {
        return;
    }

    // Send up to _maxOutstanding pending events.
    while(_outstanding < _maxOutstanding && !_events.empty())
    {
        //
        // Dequeue the head event and start an asynchronous invocation.
        //
        EventData e = move(_events.front());
        _events.pop_front();
        if(_observer)
        {
            _observer->outstanding(1);
        }

        try
        {
            auto self = static_pointer_cast<SubscriberOneway>(shared_from_this());
            auto isSent = make_shared<promise<bool>>();
            auto future = isSent->get_future();
            _obj->ice_invokeAsync(e.op, e.mode, e.data,
                                  nullptr,
                                  [self](exception_ptr ex)
                                  {
                                      self->error(true, ex);
                                  },
                                  [self, isSent](bool sentSynchronously)
                                  {
                                      isSent->set_value(sentSynchronously);
                                      if(!sentSynchronously)
                                      {
                                          self->sentAsynchronously();
                                      }
                                  },
                                  e.context);

            //
            // Check whether the request is (or potentially was) sent asynchronously.
            //
            // If the request was sent synchronously, the isSent promise was fulfilled
            // during the call to ice_invokeAsync (the sent callback runs from the
            // current thread, immediately after sending).
            //
            // Otherwise, if the request was sent asynchronously but quickly enough
            // that the isSent promise is already fulfilled, we still have to check
            // the sent callback's sentSynchronously value.
            //
            if(future.wait_for(0s) != future_status::ready || future.get() == false)
            {
                ++_outstanding;
            }
            else if(_observer)
            {
                _observer->delivered(1);
            }
        }
        catch(const std::exception&)
        {
            error(false, current_exception());
            return;
        }
    }

    if(_events.empty() && _outstanding == 0 && _shutdown)
    {
        _condVar.notify_one();
    }
}

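//
// Sent callback for the oneway subscriber. This runs only when an event
// was written to the transport asynchronously: it releases one
// outstanding slot and, if more events were queued in the meantime,
// flushes again. flush() re-locks _mutex, which is why _mutex is a
// recursive mutex.
//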
void
SubscriberOneway::sentAsynchronously()
{
    lock_guard<recursive_mutex> lg(_mutex);

    // Decrement the _outstanding count.
    --_outstanding;
    assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
    if(_observer)
    {
        _observer->delivered(1);
    }

    if(_events.empty() && _outstanding == 0 && _shutdown)
    {
        _condVar.notify_one();
    }
    else if(_outstanding <= 0 && !_events.empty())
    {
        flush();
    }
}

SubscriberTwoway::SubscriberTwoway(const shared_ptr<Instance>& instance,
                                   const SubscriberRecord& rec,
                                   const shared_ptr<Ice::ObjectPrx>& proxy,
                                   int retryCount,
                                   int maxOutstanding,
                                   shared_ptr<Ice::ObjectPrx> obj) :
    Subscriber(instance, rec, proxy, retryCount, maxOutstanding),
    _obj(move(obj))
{
}

void
SubscriberTwoway::flush()
{
    lock_guard<recursive_mutex> lg(_mutex);

    //
    // If the subscriber isn't online we're done.
    //
    if(_state != SubscriberStateOnline || _events.empty())
    {
        return;
    }

    // Send up to _maxOutstanding pending events.
    while(_outstanding < _maxOutstanding && !_events.empty())
    {
        //
        // Dequeue the head event and count one more outstanding AMI
        // request.
        //
        EventData e = move(_events.front());
        _events.pop_front();
        ++_outstanding;
        if(_observer)
        {
            _observer->outstanding(1);
        }

        try
        {
            auto self = static_pointer_cast<SubscriberTwoway>(shared_from_this());
            _obj->ice_invokeAsync(e.op, e.mode, e.data,
                                  [self](bool, vector<Ice::Byte>)
                                  {
                                      self->completed();
                                  },
                                  [self](exception_ptr ex)
                                  {
                                      self->error(true, ex);
                                  },
                                  nullptr,
                                  e.context);
        }
        catch(const std::exception&)
        {
            error(true, current_exception());
            return;
        }
    }
}

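//
// A link subscriber forwards events to a linked downstream topic rather
// than to an application object. Events whose "cost" context entry
// exceeds the link's configured cost are dropped, and events that were
// themselves forwarded from another IceStorm instance are never queued
// on a link (see Subscriber::queue), which keeps events from bouncing
// back and forth between linked topics.
//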
namespace
{

SubscriberLink::SubscriberLink(const shared_ptr<Instance>& instance,
                               const SubscriberRecord& rec) :
    Subscriber(instance, rec, nullptr, -1, 1),
    _obj(Ice::uncheckedCast<TopicLinkPrx>(
             rec.obj->ice_collocationOptimized(false)->ice_timeout(
                 static_cast<int>(instance->sendTimeout().count()))))
{
}

void
SubscriberLink::flush()
{
    lock_guard<recursive_mutex> lg(_mutex);

    if(_state != SubscriberStateOnline || _outstanding > 0)
    {
        return;
    }

    EventDataSeq v;
    v.swap(_events);

    //
    // Drop events whose "cost" context entry exceeds the link cost.
    //
    auto p = v.begin();
    while(p != v.end())
    {
        if(_rec.cost != 0)
        {
            int cost = 0;
            auto q = p->context.find("cost");
            if(q != p->context.end())
            {
                try
                {
                    cost = stoi(q->second);
                }
                catch(const std::exception&)
                {
                    // Treat an invalid or out-of-range cost as 0.
                    cost = 0;
                }
            }
            if(cost > _rec.cost)
            {
                p = v.erase(p);
                continue;
            }
        }
        ++p;
    }

    if(!v.empty())
    {
        try
        {
            ++_outstanding;
            if(_observer)
            {
                _outstandingCount = static_cast<int>(v.size());
                _observer->outstanding(_outstandingCount);
            }
            auto self = static_pointer_cast<SubscriberLink>(shared_from_this());
            _obj->forwardAsync(v,
                               [self]
                               {
                                   self->completed();
                               },
                               [self](exception_ptr ex)
                               {
                                   self->error(true, ex);
                               });
        }
        catch(const std::exception&)
        {
            error(true, current_exception());
        }
    }
}

}

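//
// Factory for the concrete subscriber types. The record and its QoS
// settings select the implementation:
//
//   - link record              -> SubscriberLink (retries forever, one
//                                 forward call in flight)
//   - reliability == "ordered" -> SubscriberTwoway with 1 outstanding request
//   - oneway/datagram proxy    -> SubscriberOneway with up to 5 outstanding requests
//   - twoway proxy (default)   -> SubscriberTwoway with up to 5 outstanding requests
//
// For example, a QoS map such as the following (hypothetical values):
//
//   IceStorm::QoS qos;
//   qos["reliability"] = "ordered";
//   qos["retryCount"] = "3";
//
// yields an ordered subscriber that keeps a single request in flight and
// errors out permanently after three failed delivery retries.
//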
shared_ptr<Subscriber>
Subscriber::create(const shared_ptr<Instance>& instance, const SubscriberRecord& rec)
{
    if(rec.link)
    {
        return make_shared<SubscriberLink>(instance, rec);
    }
    else
    {
        auto per = make_shared<PerSubscriberPublisherI>(instance);
        Ice::Identity perId;
        perId.category = instance->instanceName();
        perId.name = "topic." + rec.topicName + ".publish." +
            instance->communicator()->identityToString(rec.obj->ice_getIdentity());
        auto proxy = instance->publishAdapter()->add(per, perId);
        auto traceLevels = instance->traceLevels();
        shared_ptr<Subscriber> subscriber;

        try
        {
            int retryCount = 0;
            QoS::const_iterator p = rec.theQoS.find("retryCount");
            if(p != rec.theQoS.end())
            {
                retryCount = atoi(p->second.c_str());
            }

            string reliability;
            p = rec.theQoS.find("reliability");
            if(p != rec.theQoS.end())
            {
                reliability = p->second;
            }
            if(!reliability.empty() && reliability != "ordered")
            {
                throw BadQoS("invalid reliability: " + reliability);
            }

            //
            // Override the timeout.
            //
            shared_ptr<Ice::ObjectPrx> newObj;
            try
            {
                newObj = rec.obj->ice_timeout(static_cast<int>(instance->sendTimeout().count()));
            }
            catch(const Ice::FixedProxyException&)
            {
                //
                // In the event IceStorm is collocated this could be a
                // fixed proxy, in which case it's not possible to set
                // the timeout.
                //
                newObj = rec.obj;
            }

            p = rec.theQoS.find("locatorCacheTimeout");
            if(p != rec.theQoS.end())
            {
                istringstream is(IceUtilInternal::trim(p->second));
                int locatorCacheTimeout;
                if(!(is >> locatorCacheTimeout) || !is.eof())
                {
                    throw BadQoS("invalid locator cache timeout (numeric value required): " + p->second);
                }
                newObj = newObj->ice_locatorCacheTimeout(locatorCacheTimeout);
            }

            p = rec.theQoS.find("connectionCached");
            if(p != rec.theQoS.end())
            {
                istringstream is(IceUtilInternal::trim(p->second));
                int connectionCached;
                if(!(is >> connectionCached) || !is.eof())
                {
                    throw BadQoS("invalid connection cached setting (numeric value required): " + p->second);
                }
                newObj = newObj->ice_connectionCached(connectionCached > 0);
            }

            if(newObj->ice_isBatchOneway())
            {
                // Use oneway for a batch oneway proxy.
                newObj = newObj->ice_oneway();
            }
            else if(newObj->ice_isBatchDatagram())
            {
                // Use datagram for a batch datagram proxy.
                newObj = newObj->ice_datagram();
            }

            if(reliability == "ordered")
            {
                if(!newObj->ice_isTwoway())
                {
                    throw BadQoS("ordered reliability requires a twoway proxy");
                }
                subscriber = make_shared<SubscriberTwoway>(instance, rec, proxy, retryCount, 1, newObj);
            }
            else if(newObj->ice_isOneway() || newObj->ice_isDatagram())
            {
                subscriber = make_shared<SubscriberOneway>(instance, rec, proxy, retryCount, newObj);
            }
            else // twoway
            {
                assert(newObj->ice_isTwoway());
                subscriber = make_shared<SubscriberTwoway>(instance, rec, proxy, retryCount, 5, newObj);
            }
            per->setSubscriber(subscriber);
        }
        catch(const Ice::Exception&)
        {
            instance->publishAdapter()->remove(proxy->ice_getIdentity());
            throw;
        }

        return subscriber;
    }
}

shared_ptr<Ice::ObjectPrx>
Subscriber::proxy() const
{
    return _proxyReplica;
}

Ice::Identity
Subscriber::id() const
{
    return _rec.id;
}

SubscriberRecord
Subscriber::record() const
{
    return _rec;
}

bool
Subscriber::queue(bool forwarded, const EventDataSeq& events)
{
    lock_guard<recursive_mutex> lg(_mutex);

    //
    // If this is a link subscriber and the events were forwarded from
    // another IceStorm instance, do not queue them.
    //
    if(forwarded && _rec.link)
    {
        return true;
    }

    switch(_state)
    {
        case SubscriberStateOffline:
        {
            if(chrono::steady_clock::now() < _next)
            {
                break;
            }

            //
            // State transition to online.
            //
            setState(SubscriberStateOnline);
        }
        /* FALLTHROUGH */

        case SubscriberStateOnline:
        {
            for(EventDataSeq::const_iterator p = events.begin(); p != events.end(); ++p)
            {
                if(static_cast<int>(_events.size()) == _instance->sendQueueSizeMax())
                {
                    if(_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber)
                    {
                        error(false, make_exception_ptr(SendQueueSizeMaxReached(__FILE__, __LINE__)));
                        return false;
                    }
                    else // DropEvents
                    {
                        _events.pop_front();
                    }
                }
                _events.push_back(*p);
            }

            if(_observer)
            {
                _observer->queued(static_cast<int>(events.size()));
            }
            flush();
            break;
        }

        case SubscriberStateError:
            return false;

        case SubscriberStateReaped:
            break;
    }

    return true;
}

bool
Subscriber::reap()
{
    lock_guard<recursive_mutex> lg(_mutex);
    assert(_state >= SubscriberStateError);
    if(_state == SubscriberStateError)
    {
        setState(SubscriberStateReaped);
        return true;
    }
    return false;
}

void
Subscriber::resetIfReaped()
{
    lock_guard<recursive_mutex> lg(_mutex);
    if(_state == SubscriberStateReaped)
    {
        setState(SubscriberStateError);
    }
}

bool
Subscriber::errored() const
{
    lock_guard<recursive_mutex> lg(_mutex);
    return _state >= SubscriberStateError;
}

void
Subscriber::destroy()
{
    //
    // Clear the per-subscriber object if it exists.
    //
    if(_proxy)
    {
        try
        {
            _instance->publishAdapter()->remove(_proxy->ice_getIdentity());
        }
        catch(const Ice::NotRegisteredException&)
        {
            // Ignore
        }
        catch(const Ice::ObjectAdapterDeactivatedException&)
        {
            // Ignore
        }
    }

    lock_guard<recursive_mutex> lg(_mutex);
    _observer.detach();
}

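//
// AMI completion handling. completed() and error(true, ...) run from
// the invocation callbacks once a delivery is acknowledged or fails;
// both adjust the _outstanding count and then either flush more queued
// events or wake a pending shutdown(). completed() calls flush() while
// holding _mutex, which again relies on the mutex being recursive.
//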
void
Subscriber::completed()
{
    lock_guard<recursive_mutex> lg(_mutex);

    // Decrement the _outstanding count.
    --_outstanding;
    assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
    if(_observer)
    {
        _observer->delivered(_outstandingCount);
    }

    //
    // A successful response means we're no longer retrying; we're
    // back active.
    //
    _currentRetry = 0;

    if(_events.empty() && _outstanding == 0 && _shutdown)
    {
        _condVar.notify_one();
    }
    else
    {
        flush();
    }
}

void
Subscriber::error(bool dec, exception_ptr e)
{
    lock_guard<recursive_mutex> lg(_mutex);

    if(dec)
    {
        // Decrement the _outstanding count.
        --_outstanding;
        assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
    }

    //
    // It's possible to already be in the error state if the queue
    // maximum size has been reached or if an ObjectNotExistException
    // occurred before.
    //
    if(_state >= SubscriberStateError)
    {
        if(_shutdown)
        {
            _condVar.notify_one();
        }
        return;
    }

    //
    // A hard error is an ObjectNotExistException, NotRegisteredException
    // or SendQueueSizeMaxReached.
    //
    bool hardError;
    string what;
    try
    {
        rethrow_exception(e);
    }
    catch(const Ice::ObjectNotExistException& ex)
    {
        hardError = true;
        what = ex.what();
    }
    catch(const Ice::NotRegisteredException& ex)
    {
        hardError = true;
        what = ex.what();
    }
    catch(const SendQueueSizeMaxReached& ex)
    {
        hardError = true;
        what = ex.what();
    }
    catch(const std::exception& ex)
    {
        hardError = false;
        what = ex.what();
    }

    //
    // A twoway subscriber can queue multiple send events, so it's
    // possible to get multiple errored replies. Ignore replies if we're
    // retrying and it's not yet time to process the next request.
    //
    auto now = std::chrono::steady_clock::now();
    if(!hardError && _state == SubscriberStateOffline && now < _next)
    {
        return;
    }

    //
    // If we're within the retry limits and the error isn't a hard
    // failure, transition to the offline state.
    //
    if(!hardError && (_retryCount == -1 || _currentRetry < _retryCount))
    {
        assert(_state < SubscriberStateError);

        auto traceLevels = _instance->traceLevels();
        if(_currentRetry == 0)
        {
            Ice::Warning warn(traceLevels->logger);
            warn << traceLevels->subscriberCat << ":" << _instance->communicator()->identityToString(_rec.id);
            if(traceLevels->subscriber > 1)
            {
                warn << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
            }
            warn << " subscriber offline: " << what
                 << " discarding events: " << _instance->discardInterval().count() << "s"
                 << " retryCount: " << _retryCount;
        }
        else
        {
            if(traceLevels->subscriber > 0)
            {
                Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
                out << _instance->communicator()->identityToString(_rec.id);
                if(traceLevels->subscriber > 1)
                {
                    out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
                }
                out << " subscriber offline: " << what
                    << " discarding events: " << _instance->discardInterval().count() << "s"
                    << " retry: " << _currentRetry << "/" << _retryCount;
            }
        }

        // Transition to the offline state, increment the retry count
        // and clear all queued events.
        _next = now + _instance->discardInterval();
        ++_currentRetry;
        _events.clear();
        setState(SubscriberStateOffline);
    }
    // Errored out.
    else if(_state < SubscriberStateError)
    {
        _events.clear();
        setState(SubscriberStateError);

        auto traceLevels = _instance->traceLevels();
        if(traceLevels->subscriber > 0)
        {
            Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
            out << _instance->communicator()->identityToString(_rec.id);
            if(traceLevels->subscriber > 1)
            {
                out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
            }
            out << " subscriber errored out: " << what
                << " retry: " << _currentRetry << "/" << _retryCount;
        }
    }

    if(_shutdown && _events.empty())
    {
        _condVar.notify_one();
    }
}

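//
// Drain before shutdown: completed(), error() and the oneway sent
// callback notify _condVar once _shutdown is set and nothing remains
// queued or outstanding.
//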
void
Subscriber::shutdown()
{
    unique_lock<recursive_mutex> lock(_mutex);
    _shutdown = true;
    while(_outstanding > 0 && !_events.empty())
    {
        _condVar.wait(lock);
    }
    _observer.detach();
}

void
Subscriber::updateObserver()
{
    lock_guard<recursive_mutex> lg(_mutex);
    if(_instance->observer())
    {
        _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
                                                                      _rec.topicName,
                                                                      _rec.obj,
                                                                      _rec.theQoS,
                                                                      _rec.theTopic,
                                                                      toSubscriberState(_state),
                                                                      _observer.get()));
    }
}

Subscriber::Subscriber(shared_ptr<Instance> instance,
                       SubscriberRecord rec,
                       shared_ptr<Ice::ObjectPrx> proxy,
                       int retryCount,
                       int maxOutstanding) :
    _instance(move(instance)),
    _rec(move(rec)),
    _retryCount(retryCount),
    _maxOutstanding(maxOutstanding),
    _proxy(move(proxy)),
    _proxyReplica(_proxy),
    _shutdown(false),
    _state(SubscriberStateOnline),
    _outstanding(0),
    _outstandingCount(1),
    _currentRetry(0)
{
    if(_proxy && _instance->publisherReplicaProxy())
    {
        const_cast<shared_ptr<Ice::ObjectPrx>&>(_proxyReplica) =
            _instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity());
    }

    if(_instance->observer())
    {
        _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
                                                                      _rec.topicName,
                                                                      _rec.obj,
                                                                      _rec.theQoS,
                                                                      _rec.theTopic,
                                                                      toSubscriberState(_state),
                                                                      nullptr));
    }
}

namespace
{

string
stateToString(Subscriber::SubscriberState state)
{
    switch(state)
    {
        case Subscriber::SubscriberStateOnline:
            return "online";
        case Subscriber::SubscriberStateOffline:
            return "offline";
        case Subscriber::SubscriberStateError:
            return "error";
        case Subscriber::SubscriberStateReaped:
            return "reaped";
        default:
            return "???";
    }
}

}

void
Subscriber::setState(Subscriber::SubscriberState state)
{
    if(state != _state)
    {
        auto traceLevels = _instance->traceLevels();
        if(traceLevels->subscriber > 1)
        {
            Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
            out << "endpoints: " << IceStormInternal::describeEndpoints(_rec.obj)
                << " transition from: " << stateToString(_state)
                << " to: " << stateToString(state);
        }
        _state = state;

        if(_instance->observer())
        {
            _observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
                                                                          _rec.topicName,
                                                                          _rec.obj,
                                                                          _rec.theQoS,
                                                                          _rec.theTopic,
                                                                          toSubscriberState(_state),
                                                                          _observer.get()));
        }
    }
}

bool
IceStorm::operator==(const shared_ptr<Subscriber>& subscriber, const Ice::Identity& id)
{
    return subscriber->id() == id;
}

bool
IceStorm::operator==(const Subscriber& s1, const Subscriber& s2)
{
    return &s1 == &s2;
}

bool
IceStorm::operator!=(const Subscriber& s1, const Subscriber& s2)
{
    return &s1 != &s2;
}

bool
IceStorm::operator<(const Subscriber& s1, const Subscriber& s2)
{
    return &s1 < &s2;
}