diff options
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.h')
-rw-r--r-- | cpp/src/IceStorm/Subscriber.h | 94 |
1 files changed, 47 insertions, 47 deletions
diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h index 787ec81ce84..14e4a8efd4d 100644 --- a/cpp/src/IceStorm/Subscriber.h +++ b/cpp/src/IceStorm/Subscriber.h @@ -7,10 +7,12 @@ // // ********************************************************************** -#ifndef SUBSCRIBERS_H -#define SUBSCRIBERS_H +#ifndef SUBSCRIBER_H +#define SUBSCRIBER_H -#include <IceStorm/IceStormInternal.h> // F +#include <IceStorm/IceStormInternal.h> +#include <IceStorm/SubscriberRecord.h> +#include <IceUtil/RecMutex.h> namespace IceStorm { @@ -25,66 +27,64 @@ class Subscriber : public IceUtil::Shared { public: - static SubscriberPtr create(const InstancePtr&, const std::string&, const Ice::ObjectPrx&, const IceStorm::QoS&); - static SubscriberPtr create(const InstancePtr&, const TopicLinkPrx&, int); + static SubscriberPtr create(const InstancePtr&, const IceStorm::SubscriberRecord&); ~Subscriber(); - Ice::ObjectPrx proxy() const; - Ice::Identity id() const; - bool persistent() const; + Ice::ObjectPrx proxy() const; // Get the per subscriber object. + Ice::Identity id() const; // Return the id of the subscriber. + IceStorm::SubscriberRecord record() const; // Get the subscriber record. - enum QueueState + // Returns false if the subscriber should be reaped. + bool queue(bool, const EventDataSeq&); + bool reap(); + void resetIfReaped(); + bool errored() const; + + void destroy(); + + // To be called by the AMI callbacks only. + void error(bool, const Ice::Exception&); + void response(); + + void shutdown(); + + enum SubscriberState { - QueueStateError, - QueueStateFlush, - QueueStateNoFlush + SubscriberStateOnline, // Online waiting to send events. + SubscriberStateOffline, // Offline, retrying. + SubscriberStateError, // Error state, awaiting reaping. + SubscriberStateReaped // Reaped. }; - virtual QueueState queue(bool, const std::vector<EventDataPtr>&); - // - // Return true if flush() must be called again, false otherwise. - // - virtual bool flush() = 0; - virtual void destroy(); - - // - // These methods must only be called by the SubscriberPool they - // are not internally mutex protected. - // - void flushTime(const IceUtil::Time&); - IceUtil::Time pollMaxFlushTime(const IceUtil::Time&); - - void error(const Ice::Exception&); + + virtual void flush() = 0; protected: - Subscriber(const InstancePtr&, const Ice::ObjectPrx&, bool, const Ice::Identity&); + void setState(SubscriberState); + + Subscriber(const InstancePtr&, const IceStorm::SubscriberRecord&, const Ice::ObjectPrx&, int, int); // Immutable const InstancePtr _instance; - const Ice::Identity _id; - const bool _persistent; - const Ice::ObjectPrx _proxy; + const IceStorm::SubscriberRecord _rec; // The subscriber record. + const int _retryCount; // The retryCount. + const int _maxOutstanding; // The maximum number of oustanding events. + const Ice::ObjectPrx _proxy; // The per subscriber object proxy, if any. + const Ice::ObjectPrx _proxyReplica; // The replicated per subscriber object proxy, if any. - IceUtil::Mutex _mutex; + IceUtil::Monitor<IceUtil::RecMutex> _lock; + + bool _shutdown; - enum SubscriberState - { - SubscriberStateOnline, - SubscriberStateFlushPending, - SubscriberStateSending, - SubscriberStateOffline, - SubscriberStateError - }; SubscriberState _state; // The subscriber state. + + int _outstanding; // The current number of outstanding responses. EventDataSeq _events; // The queue of events to send. - // - // Not protected by _mutex. These members are protected by the - // SubscriberPool mutex. - // - bool _resetMax; - IceUtil::Time _maxSend; + // The next to try sending a new event if we're offline. + IceUtil::Time _next; + int _currentRetry; }; bool operator==(const IceStorm::SubscriberPtr&, const Ice::Identity&); @@ -94,4 +94,4 @@ bool operator<(const IceStorm::Subscriber&, const IceStorm::Subscriber&); } -#endif // SUBSCRIBERS_H +#endif // SUBSCRIBER_H |