summaryrefslogtreecommitdiff
path: root/cpp/src/IceStorm/Subscriber.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/IceStorm/Subscriber.cpp')
-rwxr-xr-xcpp/src/IceStorm/Subscriber.cpp184
1 files changed, 55 insertions, 129 deletions
diff --git a/cpp/src/IceStorm/Subscriber.cpp b/cpp/src/IceStorm/Subscriber.cpp
index b854ff2a835..74b15583cc1 100755
--- a/cpp/src/IceStorm/Subscriber.cpp
+++ b/cpp/src/IceStorm/Subscriber.cpp
@@ -93,6 +93,11 @@ public:
virtual void flush();
+ void exception(const Ice::Exception& ex)
+ {
+ error(false, ex);
+ }
+
void doFlush();
private:
@@ -111,7 +116,11 @@ public:
virtual void flush();
- void sent();
+ void exception(const Ice::Exception& ex)
+ {
+ error(true, ex);
+ }
+ void sent(bool);
private:
@@ -146,84 +155,6 @@ private:
const TopicLinkPrx _obj;
};
-class OnewayIceInvokeI : public Ice::AMI_Object_ice_invoke, public Ice::AMISentCallback
-{
-public:
-
- OnewayIceInvokeI(const SubscriberOnewayPtr& subscriber) :
- _subscriber(subscriber)
- {
- }
-
- virtual void
- ice_response(bool, const std::vector<Ice::Byte>&)
- {
- assert(false);
- }
-
- virtual void
- ice_sent()
- {
- _subscriber->sent();
- }
-
- virtual void
- ice_exception(const Ice::Exception& e)
- {
- _subscriber->error(true, e);
- }
-
-private:
-
- const SubscriberOnewayPtr _subscriber;
-};
-
-class IceInvokeI : public Ice::AMI_Object_ice_invoke
-{
-public:
-
- IceInvokeI(const SubscriberPtr& subscriber) :
- _subscriber(subscriber)
- {
- }
-
- virtual void
- ice_response(bool, const std::vector<Ice::Byte>&)
- {
- _subscriber->response();
- }
-
- virtual void
- ice_exception(const Ice::Exception& e)
- {
- _subscriber->error(true, e);
- }
-
-private:
-
- const SubscriberPtr _subscriber;
-};
-
-class FlushBatchI : public Ice::AMI_Object_ice_flushBatchRequests
-{
-public:
-
- FlushBatchI(const SubscriberPtr& subscriber) :
- _subscriber(subscriber)
- {
- }
-
- virtual void
- ice_exception(const Ice::Exception& e)
- {
- _subscriber->error(false, e);
- }
-
-private:
-
- const SubscriberPtr _subscriber;
-};
-
class FlushTimerTask : public IceUtil::TimerTask
{
public:
@@ -313,7 +244,8 @@ SubscriberBatch::doFlush()
return;
}
- _obj->ice_flushBatchRequests_async(new FlushBatchI(this));
+ _obj->begin_ice_flushBatchRequests(Ice::newCallback_Object_ice_flushBatchRequests(this,
+ &SubscriberBatch::exception));
// This is significantly faster than the async version, but it can
// block the calling thread. Bad news!
@@ -361,7 +293,11 @@ SubscriberOneway::flush()
_events.erase(_events.begin());
try
{
- if(!_obj->ice_invoke_async(new OnewayIceInvokeI(this), e->op, e->mode, e->data, e->context))
+ Ice::AsyncResultPtr result = _obj->begin_ice_invoke(
+ e->op, e->mode, e->data, e->context, Ice::newCallback_Object_ice_invoke(this,
+ &SubscriberOneway::exception,
+ &SubscriberOneway::sent));
+ if(!result->sentSynchronously())
{
++_outstanding;
}
@@ -380,8 +316,13 @@ SubscriberOneway::flush()
}
void
-SubscriberOneway::sent()
+SubscriberOneway::sent(bool sentSynchronously)
{
+ if(sentSynchronously)
+ {
+ return;
+ }
+
IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
// Decrement the _outstanding count.
@@ -436,7 +377,8 @@ SubscriberTwoway::flush()
try
{
- _obj->ice_invoke_async(new IceInvokeI(this), e->op, e->mode, e->data, e->context);
+ _obj->begin_ice_invoke(e->op, e->mode, e->data, e->context,
+ Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
}
catch(const Ice::Exception& ex)
{
@@ -449,34 +391,6 @@ SubscriberTwoway::flush()
namespace
{
-class Topiclink_forwardI : public IceStorm::AMI_TopicLink_forward
-{
-public:
-
- Topiclink_forwardI(const SubscriberPtr& subscriber) :
- _subscriber(subscriber)
- {
- }
-
- virtual void
- ice_response()
- {
- _subscriber->response();
- }
-
- virtual void
- ice_exception(const Ice::Exception& e)
- {
- _subscriber->error(true, e);
- }
-
-private:
-
- const SubscriberPtr _subscriber;
-};
-
-}
-
SubscriberLink::SubscriberLink(
const InstancePtr& instance,
const SubscriberRecord& rec) :
@@ -523,7 +437,7 @@ SubscriberLink::flush()
try
{
++_outstanding;
- _obj->forward_async(new Topiclink_forwardI(this), v);
+ _obj->begin_forward(v, Ice::newCallback(static_cast<Subscriber*>(this), &Subscriber::completed));
}
catch(const Ice::Exception& ex)
{
@@ -532,6 +446,8 @@ SubscriberLink::flush()
}
}
+}
+
SubscriberPtr
Subscriber::create(
const InstancePtr& instance,
@@ -850,30 +766,40 @@ Subscriber::error(bool dec, const Ice::Exception& e)
}
void
-Subscriber::response()
+Subscriber::completed(const Ice::AsyncResultPtr& result)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
-
- // Decrement the _outstanding count.
- --_outstanding;
- assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
-
- //
- // A successful response means we're no longer retrying, we're
- // back active.
- //
- _currentRetry = 0;
-
- if(_events.empty() && _outstanding == 0 && _shutdown)
+ try
{
- _lock.notify();
+ result->throwLocalException();
+
+ IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_lock);
+
+ // Decrement the _outstanding count.
+ --_outstanding;
+ assert(_outstanding >= 0 && _outstanding < _maxOutstanding);
+
+ //
+ // A successful response means we're no longer retrying, we're
+ // back active.
+ //
+ _currentRetry = 0;
+
+ if(_events.empty() && _outstanding == 0 && _shutdown)
+ {
+ _lock.notify();
+ }
+ else
+ {
+ flush();
+ }
}
- else
+ catch(const Ice::LocalException& ex)
{
- flush();
+ error(true, ex);
}
}
+
void
Subscriber::shutdown()
{