summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-07-15 10:22:40 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-07-15 10:22:40 +0200
commit710a9221852d6c92b1727a429a33b38f1f949352 (patch)
tree6bc9ac9ed04a6b1858d8fc30282d4f18ef04abbb /cpp/src/Ice/OutgoingAsync.cpp
parent- Fix for ICE-5578 - Python build failure (diff)
downloadice-710a9221852d6c92b1727a429a33b38f1f949352.tar.bz2
ice-710a9221852d6c92b1727a429a33b38f1f949352.tar.xz
ice-710a9221852d6c92b1727a429a33b38f1f949352.zip
Fixed collocation optimization to use the dispatcher, minor test fixes
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp99
1 files changed, 51 insertions, 48 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 8b12805ba4f..e8aadb6fb38 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -51,7 +51,8 @@ class AsynchronousException : public DispatchWorkItem
{
public:
- AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result, const Ice::Exception& ex) :
+ AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result,
+ const Ice::Exception& ex) :
DispatchWorkItem(connection), _result(result), _exception(ex.ice_clone())
{
}
@@ -88,28 +89,6 @@ private:
const Ice::AsyncResultPtr _result;
};
-class AsynchronousTimeout : public DispatchWorkItem
-{
-public:
-
- AsynchronousTimeout(const Ice::ConnectionPtr& connection, const IceInternal::RequestHandlerPtr& handler, const Ice::AsyncResultPtr& result) :
- DispatchWorkItem(connection), _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result))
- {
- assert(_outAsync);
- }
-
- virtual void
- run()
- {
- _handler->asyncRequestTimedOut(_outAsync);
- }
-
-private:
-
- IceInternal::RequestHandlerPtr _handler;
- IceInternal::OutgoingAsyncMessageCallbackPtr _outAsync;
-};
-
};
Ice::AsyncResult::AsyncResult(const CommunicatorPtr& communicator,
@@ -269,7 +248,7 @@ Ice::AsyncResult::__invokeSentAsync()
//
try
{
- _instance->clientThreadPool()->execute(new AsynchronousSent(_cachedConnection, this));
+ _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this));
}
catch(const Ice::CommunicatorDestroyedException&)
{
@@ -301,7 +280,7 @@ Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex)
// CommunicatorDestroyedException is the only exception that can propagate directly
// from this method.
//
- _instance->clientThreadPool()->execute(new AsynchronousException(_cachedConnection, this, ex));
+ _instance->clientThreadPool()->dispatch(new AsynchronousException(_cachedConnection, this, ex));
}
void
@@ -343,16 +322,7 @@ Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask()
if(handler)
{
- Ice::ConnectionPtr connection;
- try
- {
- connection = handler->getConnection(false);
- }
- catch(const Ice::LocalException&)
- {
- // Ignore.
- }
- _instance->clientThreadPool()->execute(new AsynchronousTimeout(connection, handler, this));
+ handler->asyncRequestTimedOut(OutgoingAsyncMessageCallbackPtr::dynamicCast(this));
}
}
@@ -436,6 +406,33 @@ Ice::AsyncResult::__warning() const
}
}
+void
+IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const ThreadPoolPtr& threadPool,
+ const Ice::ConnectionPtr& connection)
+{
+ class InvocationTimeoutCall : public DispatchWorkItem
+ {
+ public:
+
+ InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) :
+ DispatchWorkItem(connection), _outAsync(outAsync)
+ {
+ }
+
+ virtual void
+ run()
+ {
+ InvocationTimeoutException ex(__FILE__, __LINE__);
+ _outAsync->__finished(ex);
+ }
+
+ private:
+
+ const OutgoingAsyncMessageCallbackPtr _outAsync;
+ };
+ threadPool->dispatch(new InvocationTimeoutCall(this, connection));
+}
+
IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
const std::string& operation,
const CallbackBasePtr& delegate,
@@ -451,6 +448,7 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod
{
_handler = 0;
_cnt = 0;
+ _sent = false;
_mode = mode;
_sentSynchronously = false;
@@ -534,6 +532,7 @@ IceInternal::OutgoingAsync::__sent()
bool alreadySent = _state & Sent; // Expected in case of a retry.
_state |= Sent;
+ _sent = true;
assert(!(_state & Done));
if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
@@ -562,7 +561,7 @@ IceInternal::OutgoingAsync::__invokeSent()
}
void
-IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc, bool sent)
+IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc)
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
@@ -582,7 +581,7 @@ IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc, bool sent)
//
try
{
- if(!handleException(exc, sent)) // This will throw if the invocation can't be retried.
+ if(!handleException(exc)) // This will throw if the invocation can't be retried.
{
return; // Can't be retried immediately.
}
@@ -747,7 +746,7 @@ IceInternal::OutgoingAsync::__finished()
}
catch(const LocalException& ex)
{
- __finished(ex, true);
+ __finished(ex);
return;
}
@@ -762,6 +761,7 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous)
{
try
{
+ _sent = false;
_handler = _proxy->__getRequestHandler(true);
AsyncStatus status = _handler->sendAsyncRequest(this);
if(status & AsyncStatusSent)
@@ -804,7 +804,7 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous)
}
catch(const Ice::Exception& ex)
{
- if(!handleException(ex, false)) // This will throw if the invocation can't be retried.
+ if(!handleException(ex)) // This will throw if the invocation can't be retried.
{
break; // Can't be retried immediately.
}
@@ -814,11 +814,11 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous)
}
bool
-IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc, bool sent)
+IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc)
{
try
{
- int interval = _proxy->__handleException(exc, _handler, _mode, sent, _cnt);
+ int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt);
_observer.retried(); // Invocation is being retried.
if(interval > 0)
{
@@ -888,14 +888,17 @@ IceInternal::BatchOutgoingAsync::__invokeSent()
}
void
-IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc, bool)
+IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc)
{
- _childObserver.failed(exc.ice_name());
- _childObserver.detach();
- if(_timeoutRequestHandler)
{
- _instance->timer()->cancel(this);
- _timeoutRequestHandler = 0;
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _childObserver.failed(exc.ice_name());
+ _childObserver.detach();
+ if(_timeoutRequestHandler)
+ {
+ _instance->timer()->cancel(this);
+ _timeoutRequestHandler = 0;
+ }
}
__invokeException(exc);
}
@@ -1042,7 +1045,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
using BatchOutgoingAsync::__sent;
#endif
- virtual void __finished(const Ice::Exception& ex, bool)
+ virtual void __finished(const Ice::Exception& ex)
{
_childObserver.failed(ex.ice_name());
_childObserver.detach();