summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.h
diff options
context:
space:
mode:
authorMatthew Newhook <matthew@zeroc.com>2008-02-29 15:51:11 +0800
committerMatthew Newhook <matthew@zeroc.com>2008-02-29 16:39:54 +0800
commitfb4132881dde7c9b135d713a06a3b64db1f706db (patch)
tree8a037e9d4cae7ed15360ab0878d14b32ac3150a4 /cpp/src/IceStorm/Subscriber.h
parentfixing mode on php/config/Make.rules.mak (diff)
downloadice-fb4132881dde7c9b135d713a06a3b64db1f706db.tar.bz2
ice-fb4132881dde7c9b135d713a06a3b64db1f706db.tar.xz
ice-fb4132881dde7c9b135d713a06a3b64db1f706db.zip
Merge HA IceStorm branch.
- http://bugzilla/bugzilla/show_bug.cgi?id=2706 - http://bugzilla/bugzilla/show_bug.cgi?id=2705
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.h')
-rw-r--r--cpp/src/IceStorm/Subscriber.h94
1 files changed, 47 insertions, 47 deletions
diff --git a/cpp/src/IceStorm/Subscriber.h b/cpp/src/IceStorm/Subscriber.h
index 787ec81ce84..14e4a8efd4d 100644
--- a/cpp/src/IceStorm/Subscriber.h
+++ b/cpp/src/IceStorm/Subscriber.h
@@ -7,10 +7,12 @@
//
// **********************************************************************
-#ifndef SUBSCRIBERS_H
-#define SUBSCRIBERS_H
+#ifndef SUBSCRIBER_H
+#define SUBSCRIBER_H
-#include <IceStorm/IceStormInternal.h> // F
+#include <IceStorm/IceStormInternal.h>
+#include <IceStorm/SubscriberRecord.h>
+#include <IceUtil/RecMutex.h>
namespace IceStorm
{
@@ -25,66 +27,64 @@ class Subscriber : public IceUtil::Shared
{
public:
- static SubscriberPtr create(const InstancePtr&, const std::string&, const Ice::ObjectPrx&, const IceStorm::QoS&);
- static SubscriberPtr create(const InstancePtr&, const TopicLinkPrx&, int);
+ static SubscriberPtr create(const InstancePtr&, const IceStorm::SubscriberRecord&);
~Subscriber();
- Ice::ObjectPrx proxy() const;
- Ice::Identity id() const;
- bool persistent() const;
+ Ice::ObjectPrx proxy() const; // Get the per subscriber object.
+ Ice::Identity id() const; // Return the id of the subscriber.
+ IceStorm::SubscriberRecord record() const; // Get the subscriber record.
- enum QueueState
+ // Returns false if the subscriber should be reaped.
+ bool queue(bool, const EventDataSeq&);
+ bool reap();
+ void resetIfReaped();
+ bool errored() const;
+
+ void destroy();
+
+ // To be called by the AMI callbacks only.
+ void error(bool, const Ice::Exception&);
+ void response();
+
+ void shutdown();
+
+ enum SubscriberState
{
- QueueStateError,
- QueueStateFlush,
- QueueStateNoFlush
+ SubscriberStateOnline, // Online waiting to send events.
+ SubscriberStateOffline, // Offline, retrying.
+ SubscriberStateError, // Error state, awaiting reaping.
+ SubscriberStateReaped // Reaped.
};
- virtual QueueState queue(bool, const std::vector<EventDataPtr>&);
- //
- // Return true if flush() must be called again, false otherwise.
- //
- virtual bool flush() = 0;
- virtual void destroy();
-
- //
- // These methods must only be called by the SubscriberPool they
- // are not internally mutex protected.
- //
- void flushTime(const IceUtil::Time&);
- IceUtil::Time pollMaxFlushTime(const IceUtil::Time&);
-
- void error(const Ice::Exception&);
+
+ virtual void flush() = 0;
protected:
- Subscriber(const InstancePtr&, const Ice::ObjectPrx&, bool, const Ice::Identity&);
+ void setState(SubscriberState);
+
+ Subscriber(const InstancePtr&, const IceStorm::SubscriberRecord&, const Ice::ObjectPrx&, int, int);
// Immutable
const InstancePtr _instance;
- const Ice::Identity _id;
- const bool _persistent;
- const Ice::ObjectPrx _proxy;
+ const IceStorm::SubscriberRecord _rec; // The subscriber record.
+ const int _retryCount; // The retryCount.
+ const int _maxOutstanding; // The maximum number of oustanding events.
+ const Ice::ObjectPrx _proxy; // The per subscriber object proxy, if any.
+ const Ice::ObjectPrx _proxyReplica; // The replicated per subscriber object proxy, if any.
- IceUtil::Mutex _mutex;
+ IceUtil::Monitor<IceUtil::RecMutex> _lock;
+
+ bool _shutdown;
- enum SubscriberState
- {
- SubscriberStateOnline,
- SubscriberStateFlushPending,
- SubscriberStateSending,
- SubscriberStateOffline,
- SubscriberStateError
- };
SubscriberState _state; // The subscriber state.
+
+ int _outstanding; // The current number of outstanding responses.
EventDataSeq _events; // The queue of events to send.
- //
- // Not protected by _mutex. These members are protected by the
- // SubscriberPool mutex.
- //
- bool _resetMax;
- IceUtil::Time _maxSend;
+ // The next to try sending a new event if we're offline.
+ IceUtil::Time _next;
+ int _currentRetry;
};
bool operator==(const IceStorm::SubscriberPtr&, const Ice::Identity&);
@@ -94,4 +94,4 @@ bool operator<(const IceStorm::Subscriber&, const IceStorm::Subscriber&);
}
-#endif // SUBSCRIBERS_H
+#endif // SUBSCRIBER_H