diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.h')
-rw-r--r-- | cpp/src/IceStorm/Subscriber.h | 32 |
1 files changed, 15 insertions, 17 deletions
diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h index af5cc721f69..a192d11e0e8 100644 --- a/cpp/src/IceStorm/Subscriber.h +++ b/cpp/src/IceStorm/Subscriber.h @@ -9,24 +9,21 @@ #include <IceStorm/SubscriberRecord.h> #include <IceStorm/Instrumentation.h> #include <Ice/ObserverHelper.h> -#include <IceUtil/RecMutex.h> + +#include<condition_variable> namespace IceStorm { class Instance; -typedef IceUtil::Handle<Instance> InstancePtr; - -class Subscriber; -typedef IceUtil::Handle<Subscriber> SubscriberPtr; -class Subscriber : public IceUtil::Shared +class Subscriber : public std::enable_shared_from_this<Subscriber> { public: - static SubscriberPtr create(const InstancePtr&, const IceStorm::SubscriberRecord&); + static std::shared_ptr<Subscriber> create(const std::shared_ptr<Instance>&, const IceStorm::SubscriberRecord&); - Ice::ObjectPrx proxy() const; // Get the per subscriber object. + std::shared_ptr<Ice::ObjectPrx> proxy() const; // Get the per subscriber object. Ice::Identity id() const; // Return the id of the subscriber. IceStorm::SubscriberRecord record() const; // Get the subscriber record. @@ -39,8 +36,8 @@ public: void destroy(); // To be called by the AMI callbacks only. - void completed(const Ice::AsyncResultPtr&); - void error(bool, const Ice::Exception&); + void completed(); + void error(bool, std::exception_ptr); void shutdown(); @@ -60,17 +57,18 @@ protected: void setState(SubscriberState); - Subscriber(const InstancePtr&, const IceStorm::SubscriberRecord&, const Ice::ObjectPrx&, int, int); + Subscriber(std::shared_ptr<Instance>, IceStorm::SubscriberRecord, std::shared_ptr<Ice::ObjectPrx>, int, int); // Immutable - const InstancePtr _instance; + const std::shared_ptr<Instance> _instance; const IceStorm::SubscriberRecord _rec; // The subscriber record. const int _retryCount; // The retryCount. const int _maxOutstanding; // The maximum number of oustanding events. - const Ice::ObjectPrx _proxy; // The per subscriber object proxy, if any. - const Ice::ObjectPrx _proxyReplica; // The replicated per subscriber object proxy, if any. + const std::shared_ptr<Ice::ObjectPrx> _proxy; // The per subscriber object proxy, if any. + const std::shared_ptr<Ice::ObjectPrx> _proxyReplica; // The replicated per subscriber object proxy, if any. - IceUtil::Monitor<IceUtil::RecMutex> _lock; + mutable std::recursive_mutex _mutex; + std::condition_variable_any _condVar; bool _shutdown; @@ -81,13 +79,13 @@ protected: EventDataSeq _events; // The queue of events to send. // The next time to try sending a new event if we're offline. - IceUtil::Time _next; + std::chrono::steady_clock::time_point _next; int _currentRetry; IceInternal::ObserverHelperT<IceStorm::Instrumentation::SubscriberObserver> _observer; }; -bool operator==(const IceStorm::SubscriberPtr&, const Ice::Identity&); +bool operator==(const std::shared_ptr<IceStorm::Subscriber>&, const Ice::Identity&); bool operator==(const IceStorm::Subscriber&, const IceStorm::Subscriber&); bool operator!=(const IceStorm::Subscriber&, const IceStorm::Subscriber&); bool operator<(const IceStorm::Subscriber&, const IceStorm::Subscriber&); |