summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Flusher.cpp
blob: 3fb95ec071515bf49c2ab37b500d7e87039c7735 (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
// **********************************************************************
//
// Copyright (c) 2001
// MutableRealms, Inc.
// Huntsville, AL, USA
//
// All Rights Reserved
//
// **********************************************************************

#include <Ice/Ice.h>
#include <Ice/Functional.h>

#include <IceStorm/Flusher.h>
#include <IceStorm/Subscriber.h>
#include <IceStorm/TraceLevels.h>

#include <algorithm>

using namespace IceStorm;
using namespace std;

//
// Handle support: forwards to Flusher::__incRef() on the given object.
//
void
IceStorm::incRef(Flusher* p)
{
    p->__incRef();
}
//
// Handle support: forwards to Flusher::__decRef() on the given object.
//
void
IceStorm::decRef(Flusher* p)
{
    p->__decRef();
}

namespace IceStorm
{

class FlusherThread : public JTCThread, public JTCMonitor
{
public:

    FlusherThread(const Ice::CommunicatorPtr& communicator, const TraceLevelsPtr& traceLevels) :
	_traceLevels(traceLevels),
	_logger(communicator->getLogger())
    {
	Ice::PropertiesPtr properties = communicator->getProperties();
	string value;
	value = properties->getProperty("IceStorm.Flush.Timeout");
	if (!value.empty())
	{
	    _flushTime = atoi(value.c_str());
	    if (_flushTime < 100)
	    {
		_flushTime = 100; // Minimum of 100 ms
	    }
	}
	else
	{
	    _flushTime = 1000; // Default of 1 second
	}
    }
    
    ~FlusherThread()
    {
    }

    virtual void
    run()
    {
	JTCSyncT<JTCMonitor> sync(*this);
	while (!_destroy)
	{
	    long tout = calcTimeout();
	    try
	    {
		if (tout == 0)
		{
		    wait();
		}
		else
		{
		    wait(tout);
		}
	    }
	    catch(const JTCInterruptedException&)
	    {
	    }
	    if (_destroy)
	    {
		continue;
	    }
	    flushAll();
	}
    }

    void
    destroy()
    {
	JTCSyncT<JTCMonitor> sync(*this);
	_destroy = true;
	notify();
    }

    //
    // It would be possible to write add/remove in such a way as to
    // avoid blocking while flushing by having a queue of actions
    // which are only performed before flushing. For now, however,
    // this issue is ignored.
    //
    void
    add(const SubscriberPtr& subscriber)
    {
	JTCSyncT<JTCMonitor> sync(*this);
	bool isEmpty = _subscribers.empty();
	_subscribers.push_back(subscriber);

	//
	// If the set of subscribers was previously empty then wake up
	// the flushing thread since it will be waiting indefinitely
	//
	if (isEmpty)
	{
	    notify();
	}
    }

    void
    remove(const SubscriberPtr& subscriber)
    {
	JTCSyncT<JTCMonitor> sync(*this);
	_subscribers.remove(subscriber);
    }


private:

    void
    flushAll()
    {
	// This is always called with the monitor locked
	//JTCSyncT<JTCMonitor> sync(*this);

	//
	// Using standard algorithms I don't think there is a way to
	// do this in one pass. For instance, I thought about using
	// remove_if - but the predicate needs to be a pure function
	// (see Meyers for details). If this is fixed then fix TopicI
	// also.
	//
	_subscribers.remove_if(::Ice::constMemFun(&Subscriber::invalid));
	for_each(_subscribers.begin(), _subscribers.end(), Ice::voidMemFun(&Subscriber::flush));

	//
	// Trace after the flush so that the correct number of objects
	// are displayed
	//
	if (_traceLevels->flush > 0)
	{
	    ostringstream s;
	    s << _subscribers.size() << " object(s)";
	    
	    _logger->trace(_traceLevels->flushCat, s.str());
	}
    }

    long
    calcTimeout()
    {
	return (_subscribers.empty()) ? 0 : _flushTime;
    }

    TraceLevelsPtr _traceLevels;
    Ice::LoggerPtr _logger;

    SubscriberList _subscribers;
    bool _destroy;
    long _flushTime;
};

} // End namespace IceStorm

//
// Create the flusher thread for the given communicator and trace
// levels, and start it right away.
//
Flusher::Flusher(const Ice::CommunicatorPtr& communicator,
		 const TraceLevelsPtr& traceLevels)
{
    _thread = new FlusherThread(communicator, traceLevels);

    _thread->start();
}

//
// Ask the flusher thread to stop, then block until it is no longer
// alive. A join that is cut short by an interruption is retried.
//
Flusher::~Flusher()
{
    _thread->destroy();

    for (;;)
    {
	if (!_thread->isAlive())
	{
	    break;
	}

	try
	{
	    _thread->join();
	}
	catch(const JTCInterruptedException&)
	{
	    // Interrupted while joining: loop and check isAlive() again
	}
    }
}

//
// Hand the subscriber to the flusher thread, which adds it to the
// set of subscribers it flushes.
//
void
Flusher::add(const SubscriberPtr& subscriber)
{
    _thread->add(subscriber);
}

//
// Ask the flusher thread to drop the subscriber from the set of
// subscribers it flushes.
//
void
Flusher::remove(const SubscriberPtr& subscriber)
{
    _thread->remove(subscriber);
}

//
// Tell the flusher thread to terminate by calling its destroy()
// method. This only issues the request; the thread is joined in the
// destructor.
//
void
Flusher::stopFlushing()
{
    _thread->destroy();
}