summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscribers.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscribers.cpp')
-rw-r--r--cpp/src/IceStorm/Subscribers.cpp153
1 files changed, 90 insertions, 63 deletions
diff --git a/cpp/src/IceStorm/Subscribers.cpp b/cpp/src/IceStorm/Subscribers.cpp
index 2aca83c3ca9..17b42bdc605 100644
--- a/cpp/src/IceStorm/Subscribers.cpp
+++ b/cpp/src/IceStorm/Subscribers.cpp
@@ -29,19 +29,19 @@ using namespace IceStorm;
//
// Per Subscriber object.
//
-namespace IceStorm
+namespace
{
-class PerSubscriberPublisherProxyI : public Ice::BlobjectArray
+class PerSubscriberPublisherI : public Ice::BlobjectArray
{
public:
- PerSubscriberPublisherProxyI(const InstancePtr& instance) :
+ PerSubscriberPublisherI(const InstancePtr& instance) :
_instance(instance)
{
}
- ~PerSubscriberPublisherProxyI()
+ ~PerSubscriberPublisherI()
{
}
@@ -56,11 +56,18 @@ public:
vector<Ice::Byte>&,
const Ice::Current& current)
{
- EventPtr event = new Event;
- event->op = current.operation;
- event->mode = current.mode;
- vector<Ice::Byte>(inParams.first, inParams.second).swap(event->data);
- event->context = current.ctx;
+ EventPtr event = new Event(
+ current.operation,
+ current.mode,
+ Ice::ByteSeq(),
+ current.ctx);
+
+ //
+ // COMPILERBUG: gcc 4.0.1 doesn't like this.
+ //
+ //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
+ Ice::ByteSeq data(inParams.first, inParams.second);
+ event->data.swap(data);
EventSeq e;
e.push_back(event);
@@ -70,7 +77,7 @@ public:
{
list<SubscriberPtr> l;
l.push_back(_subscriber);
- _instance->subscriberPool()->add(l);
+ _instance->subscriberPool()->flush(l);
}
return true;
}
@@ -80,11 +87,11 @@ private:
const InstancePtr _instance;
/*const*/ SubscriberPtr _subscriber;
};
-typedef IceUtil::Handle<PerSubscriberPublisherProxyI> PerSubscriberPublisherProxyIPtr;
+typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr;
}
// Each of the various Subscriber types.
-namespace IceStorm
+namespace
{
class SubscriberOneway : public Subscriber
@@ -187,24 +194,18 @@ SubscriberOneway::flush()
IceUtil::Mutex::Lock sync(_mutex);
//
- // If another thread is busy delivering events or we're no longer
- // active then we have nothing left to do.
+ // If the subscriber errored out then we're done.
//
- if(_state != StateActive || _events.empty())
+ if(_state != StateActive)
{
_busy = false;
return false;
}
+ assert(!_events.empty());
assert(_busy);
try
{
- if(_state != StateActive)
- {
- assert(!_busy);
- return false;
- }
-
//
// Get the current set of events, but release the lock before
// attempting to deliver the events. This allows other threads
@@ -216,15 +217,25 @@ SubscriberOneway::flush()
sync.release();
// XXX:
-/*
TraceLevelsPtr traceLevels = _instance->traceLevels();
if(_obj->ice_getIdentity().name.substr(0, 4) == "slow")
{
- Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
- out << "deliberately stalling";
- sleep(1);
+ //Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ //out << "deliberately stalling";
+ sleep(2);
+ }
+ if(_obj->ice_getIdentity().name.substr(0, 5) == "block")
+ {
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << "-> stall for 100s";
+ }
+ sleep(100);
+ {
+ Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
+ out << "<- stall for 100s";
+ }
}
-*/
//
// Deliver the events without holding the lock.
@@ -256,15 +267,12 @@ SubscriberOneway::flush()
// Reacquire the lock before we check the queue again.
//
sync.acquire();
-
+
//
// If there have been more events queued in the meantime then
// we are still busy.
//
- if(_events.empty())
- {
- _busy = false;
- }
+ _busy = !_events.empty();
}
catch(const Ice::LocalException& ex)
{
@@ -287,7 +295,7 @@ SubscriberOneway::destroy()
Subscriber::destroy();
}
-namespace IceStorm
+namespace
{
class TwowayInvokeI : public Ice::AMI_Object_ice_invoke
@@ -332,22 +340,16 @@ SubscriberTwoway::flush()
IceUtil::Mutex::Lock sync(_mutex);
//
- // If another thread is busy delivering events or we're no longer
- // active then we have nothing left to do.
+ // If the subscriber errored out then we're done.
//
- if(_state != StateActive || _events.empty())
+ if(_state != StateActive)
{
_busy = false;
return false;
}
+ assert(!_events.empty());
assert(_busy);
- if(_state != StateActive)
- {
- assert(!_busy);
- return false;
- }
-
//
// Get the current set of events, but release the lock before
// attempting to deliver the events. This allows other threads
@@ -372,17 +374,15 @@ SubscriberTwoway::flush()
sync.acquire();
//
- // If there have been more events queued in the meantime then we
- // are still busy.
+ // If there have been more events queued in the meantime then
+ // we are still busy.
//
- if(_events.empty())
- {
- _busy = false;
- }
+ _busy = !_events.empty();
+
return _busy;
}
-namespace IceStorm
+namespace
{
class TwowayOrderedInvokeI : public Ice::AMI_Object_ice_invoke
@@ -428,23 +428,24 @@ SubscriberTwowayOrdered::flush()
EventPtr e;
{
IceUtil::Mutex::Lock sync(_mutex);
-
+
//
- // If another thread is busy delivering events or we're no longer
- // active then we have nothing left to do.
+ // If the subscriber errored out then we're done.
//
- if(_state != StateActive || _events.empty())
+ if(_state != StateActive)
{
_busy = false;
return false;
}
+ assert(!_events.empty());
assert(_busy);
e = _events.front();
_events.erase(_events.begin());
}
-
+
_obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context);
+
return false;
}
@@ -454,15 +455,15 @@ SubscriberTwowayOrdered::response()
EventPtr e;
{
IceUtil::Mutex::Lock sync(_mutex);
-
+
assert(_state == StateActive && _busy);
-
+
if(_events.empty())
{
_busy = false;
return;
}
-
+
e = _events.front();
_events.erase(_events.begin());
}
@@ -470,7 +471,7 @@ SubscriberTwowayOrdered::response()
_obj->ice_invoke_async(new TwowayOrderedInvokeI(this), e->op, e->mode, e->data, e->context);
}
-namespace IceStorm
+namespace
{
class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward
@@ -590,20 +591,21 @@ SubscriberLink::flush()
IceUtil::Mutex::Lock sync(_mutex);
//
- // If another thread is busy delivering events or we're no longer
- // active then we have nothing left to do.
+ // If the subscriber errored out then we're done.
//
- if(_state != StateActive || _events.empty())
+ if(_state != StateActive)
{
_busy = false;
return false;
}
+ assert(!_events.empty());
assert(_busy);
-
+
v.swap(_events);
}
_obj->forward_async(new Topiclink_forwardI(this), v);
+
return false;
}
@@ -634,7 +636,7 @@ Subscriber::create(
const Ice::ObjectPrx& obj,
const IceStorm::QoS& qos)
{
- PerSubscriberPublisherProxyIPtr per = new PerSubscriberPublisherProxyI(instance);
+ PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance);
Ice::ObjectPrx proxy = instance->objectAdapter()->addWithUUID(per);
TraceLevelsPtr traceLevels = instance->traceLevels();
SubscriberPtr subscriber;
@@ -791,6 +793,29 @@ Subscriber::destroy()
}
void
+Subscriber::flushTime(const IceUtil::Time& interval)
+{
+ if(_resetMax || interval > _maxSend)
+ {
+ assert(interval != IceUtil::Time());
+ _resetMax = false;
+ _maxSend = interval;
+ }
+}
+
+IceUtil::Time
+Subscriber::pollMaxFlushTime(const IceUtil::Time& now)
+{
+ //IceUtil::Time max = _maxSend;
+ //_maxSend = _maxSend * 0.95;
+ //return max;
+
+ // The next call to flushTime can reset the max time.
+ _resetMax = true;
+ return _maxSend;
+}
+
+void
Subscriber::setError(const Ice::Exception& e)
{
IceUtil::Mutex::Lock sync(_mutex);
@@ -836,7 +861,9 @@ Subscriber::Subscriber(
_persistent(persistent),
_proxy(proxy),
_state(StateActive),
- _busy(false)
+ _busy(false),
+ _resetMax(true),
+ _maxSend(IceUtil::Time::seconds(60*24)) // A long time
{
}