summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.h
blob: 8eb0c18bc6191b050cafe5f9d34405205d7a1f33 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
// **********************************************************************
//
// Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved.
//
// This copy of Ice is licensed to you under the terms described in the
// ICE_LICENSE file included in this distribution.
//
// **********************************************************************

#ifndef SUBSCRIBER_H
#define SUBSCRIBER_H

#include <IceStorm/IceStormInternal.h>
#include <IceStorm/SubscriberRecord.h>
#include <IceStorm/Instrumentation.h>
#include <Ice/ObserverHelper.h>
#include <IceUtil/RecMutex.h>

namespace IceStorm
{

class Instance;
typedef IceUtil::Handle<Instance> InstancePtr;

class Subscriber;
typedef IceUtil::Handle<Subscriber> SubscriberPtr;

class Subscriber : public IceUtil::Shared
{
public:

    static SubscriberPtr create(const InstancePtr&, const IceStorm::SubscriberRecord&);

    ~Subscriber();

    Ice::ObjectPrx proxy() const; // Get the per subscriber object.
    Ice::Identity id() const; // Return the id of the subscriber.
    IceStorm::SubscriberRecord record() const; // Get the subscriber record.

    // Returns false if the subscriber should be reaped.
    bool queue(bool, const EventDataSeq&);
    bool reap();
    void resetIfReaped();
    bool errored() const;

    void destroy();

    // To be called by the AMI callbacks only.
    void completed(const Ice::AsyncResultPtr&);
    void error(bool, const Ice::Exception&);

    void shutdown();

    void updateObserver();

    enum SubscriberState
    {
        SubscriberStateOnline, // Online waiting to send events.
        SubscriberStateOffline, // Offline, retrying.
        SubscriberStateError, // Error state, awaiting reaping.
        SubscriberStateReaped // Reaped.
    };

    virtual void flush() = 0;

protected:

    void setState(SubscriberState);

    Subscriber(const InstancePtr&, const IceStorm::SubscriberRecord&, const Ice::ObjectPrx&, int, int);

    // Immutable
    const InstancePtr _instance;
    const IceStorm::SubscriberRecord _rec; // The subscriber record.
    const int _retryCount; // The retryCount.
    const int _maxOutstanding; // The maximum number of oustanding events.
    const Ice::ObjectPrx _proxy; // The per subscriber object proxy, if any.
    const Ice::ObjectPrx _proxyReplica; // The replicated per subscriber object proxy, if any.

    IceUtil::Monitor<IceUtil::RecMutex> _lock;

    bool _shutdown;

    SubscriberState _state; // The subscriber state.

    int _outstanding; // The current number of outstanding responses.
    int _outstandingCount; // The current number of outstanding events when batching events (only used for metrics).
    EventDataSeq _events; // The queue of events to send.

    // The next time to try sending a new event if we're offline.
    IceUtil::Time _next;
    int _currentRetry;

    IceInternal::ObserverHelperT<IceStorm::Instrumentation::SubscriberObserver> _observer;
};

bool operator==(const IceStorm::SubscriberPtr&, const Ice::Identity&);
bool operator==(const IceStorm::Subscriber&, const IceStorm::Subscriber&);
bool operator!=(const IceStorm::Subscriber&, const IceStorm::Subscriber&);
bool operator<(const IceStorm::Subscriber&, const IceStorm::Subscriber&);

}

#endif // SUBSCRIBER_H