summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp210
1 files changed, 134 insertions, 76 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 891278f7041..fb470bceaf2 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -62,7 +62,7 @@ public:
virtual void
run()
{
- _result->__exception(*_exception.get());
+ _result->__invokeException(*_exception.get());
}
private:
@@ -83,7 +83,7 @@ public:
virtual void
run()
{
- _result->__sent();
+ _result->__invokeSent();
}
private:
@@ -91,6 +91,30 @@ private:
const Ice::AsyncResultPtr _result;
};
+class AsynchronousTimeout : public DispatchWorkItem
+{
+public:
+
+ AsynchronousTimeout(const IceInternal::InstancePtr& instance,
+ const IceInternal::RequestHandlerPtr& handler,
+ const Ice::AsyncResultPtr& result) :
+ DispatchWorkItem(instance), _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result))
+ {
+ assert(_outAsync);
+ }
+
+ virtual void
+ run()
+ {
+ _handler->asyncRequestTimedOut(_outAsync);
+ }
+
+private:
+
+ IceInternal::RequestHandlerPtr _handler;
+ IceInternal::OutgoingAsyncMessageCallbackPtr _outAsync;
+};
+
};
Ice::AsyncResult::AsyncResult(const CommunicatorPtr& communicator,
@@ -218,7 +242,7 @@ Ice::AsyncResult::__throwUserException()
}
void
-Ice::AsyncResult::__sent()
+Ice::AsyncResult::__invokeSent()
{
//
// Note: no need to change the _state here, specializations are responsible for
@@ -253,7 +277,7 @@ Ice::AsyncResult::__sent()
}
void
-Ice::AsyncResult::__sentAsync()
+Ice::AsyncResult::__invokeSentAsync()
{
//
// This is called when it's not safe to call the sent callback synchronously
@@ -270,7 +294,7 @@ Ice::AsyncResult::__sentAsync()
}
void
-Ice::AsyncResult::__exception(const Ice::Exception& ex)
+Ice::AsyncResult::__invokeException(const Ice::Exception& ex)
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
@@ -280,28 +304,11 @@ Ice::AsyncResult::__exception(const Ice::Exception& ex)
_monitor.notifyAll();
}
- if(_callback)
- {
- try
- {
- AsyncResultPtr self(this);
- _callback->__completed(self);
- }
- catch(const std::exception& ex)
- {
- __warning(ex);
- }
- catch(...)
- {
- __warning();
- }
- }
-
- _observer.detach();
+ __invokeCompleted();
}
void
-Ice::AsyncResult::__exceptionAsync(const Ice::Exception& ex)
+Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex)
{
//
// This is called when it's not safe to call the exception callback synchronously
@@ -315,7 +322,7 @@ Ice::AsyncResult::__exceptionAsync(const Ice::Exception& ex)
}
void
-Ice::AsyncResult::__response()
+Ice::AsyncResult::__invokeCompleted()
{
//
// Note: no need to change the _state here, specializations are responsible for
@@ -343,6 +350,22 @@ Ice::AsyncResult::__response()
}
void
+Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask()
+{
+ RequestHandlerPtr handler;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ swap(handler, _timeoutRequestHandler);
+ }
+
+ if(handler)
+ {
+ _instance->clientThreadPool()->execute(new AsynchronousTimeout(_instance, handler, this));
+ }
+}
+
+
+void
Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation)
{
__check(r, operation);
@@ -499,8 +522,14 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod
}
}
+AsyncStatus
+IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool compress, bool response)
+{
+ return connection->sendAsyncRequest(this, compress, response);
+}
+
bool
-IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
+IceInternal::OutgoingAsync::__sent()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
@@ -515,24 +544,22 @@ IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
{
_observer.detach();
}
+ if(_timeoutRequestHandler)
+ {
+ _instance->timer()->cancel(this);
+ _timeoutRequestHandler = 0;
+ }
_state |= Done | OK;
_os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
}
- else if(connection->timeout() > 0)
- {
- assert(!_timerTaskConnection);
- _timerTaskConnection = connection;
- IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
- _instance->timer()->schedule(this, timeout);
- }
_monitor.notifyAll();
return !alreadySent && _callback && _callback->__hasSentCallback();
}
void
-IceInternal::OutgoingAsync::__sent()
+IceInternal::OutgoingAsync::__invokeSent()
{
- ::Ice::AsyncResult::__sent();
+ ::Ice::AsyncResult::__invokeSent();
}
void
@@ -543,10 +570,10 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent
assert(!(_state & Done));
_remoteObserver.failed(exc.ice_name());
_remoteObserver.detach();
- if(_timerTaskConnection)
+ if(_timeoutRequestHandler)
{
_instance->timer()->cancel(this);
- _timerTaskConnection = 0;
+ _timeoutRequestHandler = 0;
}
}
@@ -564,12 +591,12 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent
}
else
{
- __send(false);
+ __invoke(false);
}
}
catch(const Ice::LocalException& ex)
{
- __exception(ex);
+ __invokeException(ex);
}
}
@@ -584,6 +611,8 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
_remoteObserver.failed(exc.get()->ice_name());
_remoteObserver.detach();
+ assert(!_timeoutRequestHandler);
+
try
{
int interval = handleException(exc); // This will throw if the invocation can't be retried.
@@ -593,12 +622,12 @@ IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
}
else
{
- __send(false);
+ __invoke(false);
}
}
catch(const Ice::LocalException& ex)
{
- __exception(ex);
+ __invokeException(ex);
}
}
@@ -620,10 +649,10 @@ IceInternal::OutgoingAsync::__finished()
}
_remoteObserver.detach();
- if(_timerTaskConnection)
+ if(_timeoutRequestHandler)
{
_instance->timer()->cancel(this);
- _timerTaskConnection = 0;
+ _timeoutRequestHandler = 0;
}
_is.read(replyStatus);
@@ -759,11 +788,11 @@ IceInternal::OutgoingAsync::__finished()
}
assert(replyStatus == replyOK || replyStatus == replyUserException);
- __response();
+ __invokeCompleted();
}
bool
-IceInternal::OutgoingAsync::__send(bool synchronous)
+IceInternal::OutgoingAsync::__invoke(bool synchronous)
{
while(true)
{
@@ -771,7 +800,8 @@ IceInternal::OutgoingAsync::__send(bool synchronous)
try
{
_delegate = _proxy->__getDelegate(true);
- AsyncStatus status = _delegate->__getRequestHandler()->sendAsyncRequest(this);
+ RequestHandlerPtr handler = _delegate->__getRequestHandler();
+ AsyncStatus status = handler->sendAsyncRequest(this);
if(status & AsyncStatusSent)
{
if(synchronous)
@@ -779,14 +809,28 @@ IceInternal::OutgoingAsync::__send(bool synchronous)
_sentSynchronously = true;
if(status & AsyncStatusInvokeSentCallback)
{
- __sent(); // Call the sent callback from the user thread.
+ __invokeSent(); // Call the sent callback from the user thread.
}
}
else
{
if(status & AsyncStatusInvokeSentCallback)
{
- __sentAsync(); // Call the sent callback from a client thread pool thread.
+ __invokeSentAsync(); // Call the sent callback from a client thread pool thread.
+ }
+ }
+ }
+
+ if(_proxy->ice_isTwoway() || !(status & AsyncStatusSent))
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(!(_state & Done))
+ {
+ int invocationTimeout = handler->getReference()->getInvocationTimeout();
+ if(invocationTimeout > 0)
+ {
+ _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
+ _timeoutRequestHandler = handler;
}
}
}
@@ -870,22 +914,6 @@ IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc, bool
return 0; // Keep the compiler happy.
}
-void
-IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTimerTask()
-{
- Ice::ConnectionIPtr connection;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- connection = _timerTaskConnection;
- _timerTaskConnection = 0;
- }
-
- if(connection)
- {
- connection->exception(Ice::TimeoutException(__FILE__, __LINE__));
- }
-}
-
IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& communicator,
const InstancePtr& instance,
const std::string& operation,
@@ -895,14 +923,25 @@ IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& commu
{
}
+AsyncStatus
+IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool)
+{
+ return connection->flushAsyncBatchRequests(this);
+}
+
bool
-IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* /*connection*/)
+IceInternal::BatchOutgoingAsync::__sent()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!_exception.get());
_state |= Done | OK | Sent;
_os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
_remoteObserver.detach();
+ if(_timeoutRequestHandler)
+ {
+ _instance->timer()->cancel(this);
+ _timeoutRequestHandler = 0;
+ }
_monitor.notifyAll();
if(!_callback || !_callback->__hasSentCallback())
{
@@ -913,9 +952,9 @@ IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* /*connection*/)
}
void
-IceInternal::BatchOutgoingAsync::__sent()
+IceInternal::BatchOutgoingAsync::__invokeSent()
{
- ::Ice::AsyncResult::__sent();
+ ::Ice::AsyncResult::__invokeSent();
}
void
@@ -923,7 +962,12 @@ IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool
{
_remoteObserver.failed(exc.ice_name());
_remoteObserver.detach();
- __exception(exc);
+ if(_timeoutRequestHandler)
+ {
+ _instance->timer()->cancel(this);
+ _timeoutRequestHandler = 0;
+ }
+ __invokeException(exc);
}
IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy,
@@ -937,7 +981,7 @@ IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectP
}
void
-IceInternal::ProxyBatchOutgoingAsync::__send()
+IceInternal::ProxyBatchOutgoingAsync::__invoke()
{
checkSupportedProtocol(_proxy->__reference()->getProtocol());
@@ -950,13 +994,27 @@ IceInternal::ProxyBatchOutgoingAsync::__send()
try
{
delegate = _proxy->__getDelegate(true);
- AsyncStatus status = delegate->__getRequestHandler()->flushAsyncBatchRequests(this);
+ RequestHandlerPtr handler = delegate->__getRequestHandler();
+ AsyncStatus status = handler->sendAsyncRequest(this);
if(status & AsyncStatusSent)
{
_sentSynchronously = true;
if(status & AsyncStatusInvokeSentCallback)
{
- __sent();
+ __invokeSent();
+ }
+ }
+ else
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(!(_state & Done))
+ {
+ int invocationTimeout = handler->getReference()->getInvocationTimeout();
+ if(invocationTimeout > 0)
+ {
+ _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
+ _timeoutRequestHandler = handler;
+ }
}
}
}
@@ -979,7 +1037,7 @@ IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const Co
}
void
-IceInternal::ConnectionBatchOutgoingAsync::__send()
+IceInternal::ConnectionBatchOutgoingAsync::__invoke()
{
AsyncStatus status = _connection->flushAsyncBatchRequests(this);
if(status & AsyncStatusSent)
@@ -987,7 +1045,7 @@ IceInternal::ConnectionBatchOutgoingAsync::__send()
_sentSynchronously = true;
if(status & AsyncStatusInvokeSentCallback)
{
- __sent();
+ __invokeSent();
}
}
}
@@ -1038,7 +1096,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
{
}
- virtual bool __sent(Ice::ConnectionI*)
+ virtual bool __sent()
{
_remoteObserver.detach();
_outAsync->check(false);
@@ -1120,11 +1178,11 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
//
if(!_sentSynchronously || !userThread)
{
- __sentAsync();
+ __invokeSentAsync();
}
else
{
- AsyncResult::__sent();
+ AsyncResult::__invokeSent();
}
}
}