summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rw-r--r--cpp/src/IceStorm/Subscriber.cpp617
1 files changed, 254 insertions, 363 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index 9df599eec3c..9edfe04ea21 100644
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -21,54 +21,45 @@ using namespace IceStormElection;
namespace
{
-class PerSubscriberPublisherI : public Ice::BlobjectArray
+class PerSubscriberPublisherI final : public Ice::BlobjectArray
{
public:
- PerSubscriberPublisherI(const InstancePtr& instance) :
- _instance(instance)
+ PerSubscriberPublisherI(shared_ptr<Instance> instance) :
+ _instance(move(instance))
{
}
void
- setSubscriber(const SubscriberPtr& subscriber)
+ setSubscriber(shared_ptr<Subscriber> subscriber)
{
- _subscriber = subscriber;
+ _subscriber = move(subscriber);
}
- virtual bool
- ice_invoke(const pair<const Ice::Byte*, const Ice::Byte*>& inParams,
+ bool
+ ice_invoke(pair<const Ice::Byte*, const Ice::Byte*> inParams,
vector<Ice::Byte>&,
- const Ice::Current& current)
+ const Ice::Current& current) override
{
// Use cached reads.
CachedReadHelper unlock(_instance->node(), __FILE__, __LINE__);
- EventDataPtr event = new EventData(
- current.operation,
- current.mode,
- Ice::ByteSeq(),
- current.ctx);
+ EventData event = { current.operation, current.mode, Ice::ByteSeq(), current.ctx };
- //
- // COMPILERBUG: gcc 4.0.1 doesn't like this.
- //
- //event->data.swap(Ice::ByteSeq(inParams.first, inParams.second));
Ice::ByteSeq data(inParams.first, inParams.second);
- event->data.swap(data);
+ event.data.swap(data);
EventDataSeq e;
- e.push_back(event);
- _subscriber->queue(false, e);
+ e.push_back(move(event));
+ _subscriber->queue(false, move(e));
return true;
}
private:
- const InstancePtr _instance;
- /*const*/ SubscriberPtr _subscriber;
+ const shared_ptr<Instance> _instance;
+ shared_ptr<Subscriber> _subscriber;
};
-typedef IceUtil::Handle<PerSubscriberPublisherI> PerSubscriberPublisherIPtr;
IceStorm::Instrumentation::SubscriberState
toSubscriberState(Subscriber::SubscriberState s)
@@ -76,15 +67,15 @@ toSubscriberState(Subscriber::SubscriberState s)
switch(s)
{
case Subscriber::SubscriberStateOnline:
- return IceStorm::Instrumentation::SubscriberStateOnline;
+ return IceStorm::Instrumentation::SubscriberState::SubscriberStateOnline;
case Subscriber::SubscriberStateOffline:
- return IceStorm::Instrumentation::SubscriberStateOffline;
+ return IceStorm::Instrumentation::SubscriberState::SubscriberStateOffline;
case Subscriber::SubscriberStateError:
case Subscriber::SubscriberStateReaped:
- return IceStorm::Instrumentation::SubscriberStateError;
+ return IceStorm::Instrumentation::SubscriberState::SubscriberStateError;
default:
assert(false);
- return IceStorm::Instrumentation::SubscriberStateError;
+ return IceStorm::Instrumentation::SubscriberState::SubscriberStateError;
}
}
@@ -94,231 +85,64 @@ toSubscriberState(Subscriber::SubscriberState s)
namespace
{
-class SubscriberBatch : public Subscriber
-{
-public:
-
- SubscriberBatch(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, const Ice::ObjectPrx&);
-
- virtual void flush();
-
- void exception(const Ice::Exception& ex)
- {
- error(false, ex);
- }
-
- void doFlush();
- void sent(bool);
-
-private:
-
- const Ice::ObjectPrx _obj;
- const IceUtil::Time _interval;
-};
-typedef IceUtil::Handle<SubscriberBatch> SubscriberBatchPtr;
-
-class SubscriberOneway : public Subscriber
-{
-public:
-
- SubscriberOneway(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, const Ice::ObjectPrx&);
-
- virtual void flush();
-
- void exception(const Ice::Exception& ex)
- {
- error(true, ex);
- }
- void sent(bool);
-
-private:
-
- const Ice::ObjectPrx _obj;
-};
-typedef IceUtil::Handle<SubscriberOneway> SubscriberOnewayPtr;
-
-class SubscriberTwoway : public Subscriber
+class SubscriberOneway final : public Subscriber
{
public:
- SubscriberTwoway(const InstancePtr&, const SubscriberRecord&, const Ice::ObjectPrx&, int, int,
- const Ice::ObjectPrx&);
+ SubscriberOneway(const shared_ptr<Instance>&, const SubscriberRecord&,
+ const shared_ptr<Ice::ObjectPrx>&, int, shared_ptr<Ice::ObjectPrx>);
- virtual void flush();
+ void flush() override;
+ void sentAsynchronously();
private:
- const Ice::ObjectPrx _obj;
+ const shared_ptr<Ice::ObjectPrx> _obj;
};
-class SubscriberLink : public Subscriber
+class SubscriberTwoway final : public Subscriber
{
public:
- SubscriberLink(const InstancePtr&, const SubscriberRecord&);
+ SubscriberTwoway(const shared_ptr<Instance>&, const SubscriberRecord&, const shared_ptr<Ice::ObjectPrx>&, int, int,
+ shared_ptr<Ice::ObjectPrx>);
- virtual void flush();
+ void flush() override;
private:
- const TopicLinkPrx _obj;
+ const shared_ptr<Ice::ObjectPrx> _obj;
};
-class FlushTimerTask : public IceUtil::TimerTask
+class SubscriberLink final : public Subscriber
{
public:
- FlushTimerTask(const SubscriberBatchPtr& subscriber) :
- _subscriber(subscriber)
- {
- }
+ SubscriberLink(const shared_ptr<Instance>&, const SubscriberRecord&);
- virtual void
- runTimerTask()
- {
- _subscriber->doFlush();
- }
+ void flush() override;
private:
- const SubscriberBatchPtr _subscriber;
+ const shared_ptr<TopicLinkPrx> _obj;
};
}
-SubscriberBatch::SubscriberBatch(
- const InstancePtr& instance,
- const SubscriberRecord& rec,
- const Ice::ObjectPrx& proxy,
- int retryCount,
- const Ice::ObjectPrx& obj) :
- Subscriber(instance, rec, proxy, retryCount, 1),
- _obj(obj),
- _interval(instance->flushInterval())
-{
-}
-
-void
-SubscriberBatch::flush()
-{
- if(_state != SubscriberStateOnline || _events.empty())
- {
- return;
- }
-
- if(_outstanding == 0)
- {
- ++_outstanding;
- _instance->batchFlusher()->schedule(new FlushTimerTask(this), _interval);
- }
-}
-
-void
-SubscriberBatch::doFlush()
-{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
- //
- // If the subscriber isn't online we're done.
- //
- if(_state != SubscriberStateOnline)
- {
- return;
- }
-
- EventDataSeq v;
- v.swap(_events);
- assert(!v.empty());
-
- if(_observer)
- {
- _outstandingCount = static_cast<Ice::Int>(v.size());
- _observer->outstanding(_outstandingCount);
- }
-
- try
- {
- vector<Ice::Byte> dummy;
- for(EventDataSeq::const_iterator p = v.begin(); p != v.end(); ++p)
- {
- _obj->ice_invoke((*p)->op, (*p)->mode, (*p)->data, dummy, (*p)->context);
- }
-
- Ice::AsyncResultPtr result = _obj->begin_ice_flushBatchRequests(
- Ice::newCallback_Object_ice_flushBatchRequests(this,
- &SubscriberBatch::exception,
- &SubscriberBatch::sent));
- if(result->sentSynchronously())
- {
- --_outstanding;
- assert(_outstanding == 0);
- if(_observer)
- {
- _observer->delivered(_outstandingCount);
- }
- }
- }
- catch(const Ice::Exception& ex)
- {
- error(false, ex);
- return;
- }
-
- if(_events.empty() && _outstanding == 0 && _shutdown)
- {
- _lock.notify();
- }
-
- // This is significantly faster than the async version, but it can
- // block the calling thread. Bad news!
-
- //_obj->ice_flushBatchRequests();
-}
-
-void
-SubscriberBatch::sent(bool sentSynchronously)
-{
- if(sentSynchronously)
- {
- return;
- }
-
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
- // Decrement the _outstanding count.
- --_outstanding;
- assert(_outstanding == 0);
- if(_observer)
- {
- _observer->delivered(_outstandingCount);
- }
-
- if(_events.empty() && _outstanding == 0 && _shutdown)
- {
- _lock.notify();
- }
- else if(!_events.empty())
- {
- flush();
- }
-
-}
-
-SubscriberOneway::SubscriberOneway(
- const InstancePtr& instance,
- const SubscriberRecord& rec,
- const Ice::ObjectPrx& proxy,
- int retryCount,
- const Ice::ObjectPrx& obj) :
+SubscriberOneway::SubscriberOneway(const shared_ptr<Instance>& instance,
+ const SubscriberRecord& rec,
+ const shared_ptr<Ice::ObjectPrx>& proxy,
+ int retryCount,
+ shared_ptr<Ice::ObjectPrx> obj) :
Subscriber(instance, rec, proxy, retryCount, 5),
- _obj(obj)
+ _obj(move(obj))
{
}
void
SubscriberOneway::flush()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
//
// If the subscriber isn't online we're done.
@@ -335,8 +159,8 @@ SubscriberOneway::flush()
// Dequeue the head event, count one more outstanding AMI
// request.
//
- EventDataPtr e = _events.front();
- _events.erase(_events.begin());
+ EventData e = move(_events.front());
+ _events.pop_front();
if(_observer)
{
_observer->outstanding(1);
@@ -344,11 +168,35 @@ SubscriberOneway::flush()
try
{
- Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
- e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this,
- &SubscriberOneway::exception,
- &SubscriberOneway::sent));
- if(!result->sentSynchronously())
+ auto self = static_pointer_cast<SubscriberOneway>(shared_from_this());
+ auto isSent = make_shared<promise<bool>>();
+ auto future = isSent->get_future();
+
+ _obj->ice_invokeAsync(e.op, e.mode, e.data, nullptr,
+ [self](exception_ptr ex)
+ {
+ self->error(true, ex);
+ },
+ [self, isSent](bool sentSynchronously)
+ {
+ isSent->set_value(sentSynchronously);
+ if(!sentSynchronously)
+ {
+ self->sentAsynchronously();
+ }
+ },
+ e.context);
+
+ //
+ // Check if the request is (or potentially was) sent asynchronously
+ //
+ // If the request was sent synchronously then the isSent promise will have been set during the call
+ // to ice_invokeAsync (sent callback is called immediately after sending from the current thread).
+ //
+ // Otherwise if the request was sent asynchronously but quick enough so that the isSent promise is already
+ // fulfilled, we need to verify the sent callback's sentSynchronously value
+ //
+ if(future.wait_for(0s) != future_status::ready || future.get() == false)
{
++_outstanding;
}
@@ -357,28 +205,23 @@ SubscriberOneway::flush()
_observer->delivered(1);
}
}
- catch(const Ice::Exception& ex)
+ catch(const std::exception&)
{
- error(true, ex);
+ error(false, current_exception());
return;
}
}
if(_events.empty() && _outstanding == 0 && _shutdown)
{
- _lock.notify();
+ _condVar.notify_one();
}
}
void
-SubscriberOneway::sent(bool sentSynchronously)
+SubscriberOneway::sentAsynchronously()
{
- if(sentSynchronously)
- {
- return;
- }
-
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
// Decrement the _outstanding count.
--_outstanding;
@@ -390,7 +233,7 @@ SubscriberOneway::sent(bool sentSynchronously)
if(_events.empty() && _outstanding == 0 && _shutdown)
{
- _lock.notify();
+ _condVar.notify_one();
}
else if(_outstanding <= 0 && !_events.empty())
{
@@ -398,22 +241,21 @@ SubscriberOneway::sent(bool sentSynchronously)
}
}
-SubscriberTwoway::SubscriberTwoway(
- const InstancePtr& instance,
- const SubscriberRecord& rec,
- const Ice::ObjectPrx& proxy,
- int retryCount,
- int maxOutstanding,
- const Ice::ObjectPrx& obj) :
+SubscriberTwoway::SubscriberTwoway(const shared_ptr<Instance>& instance,
+ const SubscriberRecord& rec,
+ const shared_ptr<Ice::ObjectPrx>& proxy,
+ int retryCount,
+ int maxOutstanding,
+ shared_ptr<Ice::ObjectPrx> obj) :
Subscriber(instance, rec, proxy, retryCount, maxOutstanding),
- _obj(obj)
+ _obj(move(obj))
{
}
void
SubscriberTwoway::flush()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
//
// If the subscriber isn't online we're done.
@@ -430,9 +272,11 @@ SubscriberTwoway::flush()
// Dequeue the head event, count one more outstanding AMI
// request.
//
- EventDataPtr e = _events.front();
- _events.erase(_events.begin());
+ EventData e = move(_events.front());
+ _events.pop_front();
+
++_outstanding;
+
if(_observer)
{
_observer->outstanding(1);
@@ -440,12 +284,21 @@ SubscriberTwoway::flush()
try
{
- _obj->begin_ice_invoke(e->op, e->mode, e->data, e->context,
- Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
+ auto self = static_pointer_cast<SubscriberTwoway>(shared_from_this());
+ _obj->ice_invokeAsync(e.op, e.mode, e.data,
+ [self](bool, vector<Ice::Byte>)
+ {
+ self->completed();
+ },
+ [self](exception_ptr ex)
+ {
+ self->error(true, ex);
+ },
+ nullptr, e.context);
}
- catch(const Ice::Exception& ex)
+ catch(const std::exception&)
{
- error(true, ex);
+ error(true, current_exception());
return;
}
}
@@ -454,18 +307,18 @@ SubscriberTwoway::flush()
namespace
{
-SubscriberLink::SubscriberLink(
- const InstancePtr& instance,
- const SubscriberRecord& rec) :
+SubscriberLink::SubscriberLink(const shared_ptr<Instance>& instance,
+ const SubscriberRecord& rec) :
Subscriber(instance, rec, 0, -1, 1),
- _obj(TopicLinkPrx::uncheckedCast(rec.obj->ice_collocationOptimized(false)->ice_timeout(instance->sendTimeout())))
+ _obj(Ice::uncheckedCast<TopicLinkPrx>(
+ rec.obj->ice_collocationOptimized(false)->ice_timeout(static_cast<int>(instance->sendTimeout().count()))))
{
}
void
SubscriberLink::flush()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
if(_state != SubscriberStateOnline || _outstanding > 0)
{
@@ -475,16 +328,23 @@ SubscriberLink::flush()
EventDataSeq v;
v.swap(_events);
- EventDataSeq::iterator p = v.begin();
+ auto p = v.begin();
while(p != v.end())
{
if(_rec.cost != 0)
{
int cost = 0;
- Ice::Context::const_iterator q = (*p)->context.find("cost");
- if(q != (*p)->context.end())
+ auto q = p->context.find("cost");
+ if(q != p->context.end())
{
- cost = atoi(q->second.c_str());
+ try
+ {
+ cost = stoi(q->second);
+ }
+ catch(const std::invalid_argument&)
+ {
+ cost = 0;
+ }
}
if(cost > _rec.cost)
{
@@ -502,39 +362,48 @@ SubscriberLink::flush()
++_outstanding;
if(_observer)
{
- _outstandingCount = static_cast<Ice::Int>(v.size());
+ _outstandingCount = static_cast<int>(v.size());
_observer->outstanding(_outstandingCount);
}
- _obj->begin_forward(v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
+
+ auto self = static_pointer_cast<SubscriberLink>(shared_from_this());
+ _obj->forwardAsync(v,
+ [self]
+ {
+ self->completed();
+ },
+ [self](exception_ptr ex)
+ {
+ self->error(true, ex);
+ });
}
- catch(const Ice::Exception& ex)
+ catch(const std::exception&)
{
- error(true, ex);
+ error(true, current_exception());
}
}
}
}
-SubscriberPtr
-Subscriber::create(
- const InstancePtr& instance,
- const SubscriberRecord& rec)
+shared_ptr<Subscriber>
+Subscriber::create(const shared_ptr<Instance>& instance,
+ const SubscriberRecord& rec)
{
if(rec.link)
{
- return new SubscriberLink(instance, rec);
+ return make_shared<SubscriberLink>(instance, rec);
}
else
{
- PerSubscriberPublisherIPtr per = new PerSubscriberPublisherI(instance);
+ auto per = make_shared<PerSubscriberPublisherI>(instance);
Ice::Identity perId;
perId.category = instance->instanceName();
perId.name = "topic." + rec.topicName + ".publish." +
instance->communicator()->identityToString(rec.obj->ice_getIdentity());
- Ice::ObjectPrx proxy = instance->publishAdapter()->add(per, perId);
- TraceLevelsPtr traceLevels = instance->traceLevels();
- SubscriberPtr subscriber;
+ auto proxy = instance->publishAdapter()->add(per, perId);
+ auto traceLevels = instance->traceLevels();
+ shared_ptr<Subscriber> subscriber;
try
{
@@ -559,10 +428,10 @@ Subscriber::create(
//
// Override the timeout.
//
- Ice::ObjectPrx newObj;
+ shared_ptr<Ice::ObjectPrx> newObj;
try
{
- newObj = rec.obj->ice_timeout(instance->sendTimeout());
+ newObj = rec.obj->ice_timeout(static_cast<int>(instance->sendTimeout().count()));
}
catch(const Ice::FixedProxyException&)
{
@@ -598,26 +467,33 @@ Subscriber::create(
newObj = newObj->ice_connectionCached(connectionCached > 0);
}
+ if(newObj->ice_isBatchOneway())
+ {
+ // Use Oneway in case of Batch Oneway
+ newObj = newObj->ice_oneway();
+ }
+ else if(newObj->ice_isBatchDatagram())
+ {
+ // Use Datagram in case of Batch Datagram
+ newObj = newObj->ice_datagram();
+ }
+
if(reliability == "ordered")
{
if(!newObj->ice_isTwoway())
{
throw BadQoS("ordered reliability requires a twoway proxy");
}
- subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 1, newObj);
+ subscriber = make_shared<SubscriberTwoway>(instance, rec, proxy, retryCount, 1, newObj);
}
else if(newObj->ice_isOneway() || newObj->ice_isDatagram())
{
- subscriber = new SubscriberOneway(instance, rec, proxy, retryCount, newObj);
- }
- else if(newObj->ice_isBatchOneway() || newObj->ice_isBatchDatagram())
- {
- subscriber = new SubscriberBatch(instance, rec, proxy, retryCount, newObj);
+ subscriber = make_shared<SubscriberOneway>(instance, rec, proxy, retryCount, newObj);
}
else //if(newObj->ice_isTwoway())
{
assert(newObj->ice_isTwoway());
- subscriber = new SubscriberTwoway(instance, rec, proxy, retryCount, 5, newObj);
+ subscriber = make_shared<SubscriberTwoway>(instance, rec, proxy, retryCount, 5, newObj);
}
per->setSubscriber(subscriber);
}
@@ -631,7 +507,7 @@ Subscriber::create(
}
}
-Ice::ObjectPrx
+shared_ptr<Ice::ObjectPrx>
Subscriber::proxy() const
{
return _proxyReplica;
@@ -652,7 +528,7 @@ Subscriber::record() const
bool
Subscriber::queue(bool forwarded, const EventDataSeq& events)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
// If this is a link subscriber if the set of events were
// forwarded from another IceStorm instance then do not queue the
@@ -666,7 +542,7 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events)
{
case SubscriberStateOffline:
{
- if(IceUtil::Time::now(IceUtil::Time::Monotonic) < _next)
+ if(chrono::steady_clock::now() < _next)
{
break;
}
@@ -686,7 +562,7 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events)
{
if(_instance->sendQueueSizeMaxPolicy() == Instance::RemoveSubscriber)
{
- error(false, IceStorm::SendQueueSizeMaxReached(__FILE__, __LINE__));
+ error(false, make_exception_ptr(SendQueueSizeMaxReached(__FILE__, __LINE__)));
return false;
}
else // DropEvents
@@ -717,7 +593,7 @@ Subscriber::queue(bool forwarded, const EventDataSeq& events)
bool
Subscriber::reap()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
assert(_state >= SubscriberStateError);
if(_state == SubscriberStateError)
{
@@ -730,7 +606,8 @@ Subscriber::reap()
void
Subscriber::resetIfReaped()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
+
if(_state == SubscriberStateReaped)
{
setState(SubscriberStateError);
@@ -740,7 +617,8 @@ Subscriber::resetIfReaped()
bool
Subscriber::errored() const
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
+
return _state >= SubscriberStateError;
}
@@ -766,14 +644,43 @@ Subscriber::destroy()
}
}
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
_observer.detach();
}
void
-Subscriber::error(bool dec, const Ice::Exception& e)
+Subscriber::completed()
+{
+ lock_guard<recursive_mutex> lg(_mutex);
+
+ // Decrement the _outstanding count.
+ --_outstanding;
+ assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
+ if(_observer)
+ {
+ _observer->delivered(_outstandingCount);
+ }
+
+ //
+ // A successful response means we're no longer retrying, we're
+ // back active.
+ //
+ _currentRetry = 0;
+
+ if(_events.empty() && _outstanding == 0 && _shutdown)
+ {
+ _condVar.notify_one();
+ }
+ else
+ {
+ flush();
+ }
+}
+
+void
+Subscriber::error(bool dec, exception_ptr e)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
if(dec)
{
@@ -790,16 +697,38 @@ Subscriber::error(bool dec, const Ice::Exception& e)
{
if(_shutdown)
{
- _lock.notify();
+ _condVar.notify_one();
}
return;
}
- // A hard error is an ObjectNotExistException or
- // NotRegisteredException.
- bool hardError = dynamic_cast<const Ice::ObjectNotExistException*>(&e) ||
- dynamic_cast<const Ice::NotRegisteredException*>(&e) ||
- dynamic_cast<const IceStorm::SendQueueSizeMaxReached*>(&e);
+ // A hard error is an ObjectNotExistException, NotRegisteredException, or SendQueueSizeMaxReached
+ bool hardError;
+ string what;
+ try
+ {
+ rethrow_exception(e);
+ }
+ catch(const Ice::ObjectNotExistException& ex)
+ {
+ hardError = true;
+ what = ex.what();
+ }
+ catch(const Ice::NotRegisteredException& ex)
+ {
+ hardError = true;
+ what = ex.what();
+ }
+ catch(const SendQueueSizeMaxReached& ex)
+ {
+ hardError = true;
+ what = ex.what();
+ }
+ catch(const std::exception& ex)
+ {
+ hardError = false;
+ what = ex.what();
+ }
//
// A twoway subscriber can queue multiple send events and
@@ -807,7 +736,7 @@ Subscriber::error(bool dec, const Ice::Exception& e)
// replies if we're retrying and its not yet time to process the
// next request.
//
- IceUtil::Time now = IceUtil::Time::now(IceUtil::Time::Monotonic);
+ auto now = std::chrono::steady_clock::now();
if(!hardError && _state == SubscriberStateOffline && now < _next)
{
return;
@@ -822,7 +751,7 @@ Subscriber::error(bool dec, const Ice::Exception& e)
{
assert(_state < SubscriberStateError);
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(_currentRetry == 0)
{
Ice::Warning warn(traceLevels->logger);
@@ -831,8 +760,8 @@ Subscriber::error(bool dec, const Ice::Exception& e)
{
warn << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
}
- warn << " subscriber offline: " << e
- << " discarding events: " << _instance->discardInterval() << "s retryCount: " << _retryCount;
+ warn << " subscriber offline: " << what
+ << " discarding events: " << _instance->discardInterval().count() << "s retryCount: " << _retryCount;
}
else
{
@@ -844,8 +773,8 @@ Subscriber::error(bool dec, const Ice::Exception& e)
{
out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
}
- out << " subscriber offline: " << e
- << " discarding events: " << _instance->discardInterval() << "s retry: "
+ out << " subscriber offline: " << what
+ << " discarding events: " << _instance->discardInterval().count() << "s retry: "
<< _currentRetry << "/" << _retryCount;
}
}
@@ -863,7 +792,7 @@ Subscriber::error(bool dec, const Ice::Exception& e)
_events.clear();
setState(SubscriberStateError);
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->subscriber > 0)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
@@ -872,64 +801,26 @@ Subscriber::error(bool dec, const Ice::Exception& e)
{
out << " endpoints: " << IceStormInternal::describeEndpoints(_rec.obj);
}
- out << " subscriber errored out: " << e
+ out << " subscriber errored out: " << what
<< " retry: " << _currentRetry << "/" << _retryCount;
}
}
if(_shutdown && _events.empty())
{
- _lock.notify();
- }
-}
-
-void
-Subscriber::completed(const Ice::AsyncResultPtr& result)
-{
- try
- {
- result->throwLocalException();
-
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
- // Decrement the _outstanding count.
- --_outstanding;
- assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
- if(_observer)
- {
- _observer->delivered(_outstandingCount);
- }
-
- //
- // A successful response means we're no longer retrying, we're
- // back active.
- //
- _currentRetry = 0;
-
- if(_events.empty() && _outstanding == 0 && _shutdown)
- {
- _lock.notify();
- }
- else
- {
- flush();
- }
- }
- catch(const Ice::LocalException& ex)
- {
- error(true, ex);
+ _condVar.notify_one();
}
}
void
Subscriber::shutdown()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ unique_lock<recursive_mutex> lock(_mutex);
_shutdown = true;
while(_outstanding > 0 && !_events.empty())
{
- _lock.wait();
+ _condVar.wait(lock);
}
_observer.detach();
@@ -938,7 +829,8 @@ Subscriber::shutdown()
void
Subscriber::updateObserver()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+ lock_guard<recursive_mutex> lg(_mutex);
+
if(_instance->observer())
{
_observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
@@ -951,18 +843,17 @@ Subscriber::updateObserver()
}
}
-Subscriber::Subscriber(
- const InstancePtr& instance,
- const SubscriberRecord& rec,
- const Ice::ObjectPrx& proxy,
- int retryCount,
- int maxOutstanding) :
- _instance(instance),
- _rec(rec),
+Subscriber::Subscriber(shared_ptr<Instance> instance,
+ SubscriberRecord rec,
+ shared_ptr<Ice::ObjectPrx> proxy,
+ int retryCount,
+ int maxOutstanding) :
+ _instance(move(instance)),
+ _rec(move(rec)),
_retryCount(retryCount),
_maxOutstanding(maxOutstanding),
- _proxy(proxy),
- _proxyReplica(proxy),
+ _proxy(move(proxy)),
+ _proxyReplica(_proxy),
_shutdown(false),
_state(SubscriberStateOnline),
_outstanding(0),
@@ -971,17 +862,17 @@ Subscriber::Subscriber(
{
if(_proxy && _instance->publisherReplicaProxy())
{
- const_cast<Ice::ObjectPrx&>(_proxyReplica) =
+ const_cast<shared_ptr<Ice::ObjectPrx>&>(_proxyReplica) =
_instance->publisherReplicaProxy()->ice_identity(_proxy->ice_getIdentity());
}
if(_instance->observer())
{
_observer.attach(_instance->observer()->getSubscriberObserver(_instance->serviceName(),
- rec.topicName,
- rec.obj,
- rec.theQoS,
- rec.theTopic,
+ _rec.topicName,
+ _rec.obj,
+ _rec.theQoS,
+ _rec.theTopic,
toSubscriberState(_state),
0));
}
@@ -1015,7 +906,7 @@ Subscriber::setState(Subscriber::SubscriberState state)
{
if(state != _state)
{
- TraceLevelsPtr traceLevels = _instance->traceLevels();
+ auto traceLevels = _instance->traceLevels();
if(traceLevels->subscriber > 1)
{
Ice::Trace out(traceLevels->logger, traceLevels->subscriberCat);
@@ -1038,7 +929,7 @@ Subscriber::setState(Subscriber::SubscriberState state)
}
bool
-IceStorm::operator==(const SubscriberPtr& subscriber, const Ice::Identity& id)
+IceStorm::operator==(const shared_ptr<Subscriber>& subscriber, const Ice::Identity& id)
{
return subscriber->id() == id;
}