summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
commita4f93259dc3494d98addf38e69b87eb557d432b3 (patch)
treed2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/Ice/OutgoingAsync.cpp
parentFix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff)
downloadice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp215
1 files changed, 69 insertions, 146 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index fb470bceaf2..5a600e8fce2 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -11,7 +11,7 @@
#include <Ice/OutgoingAsync.h>
#include <Ice/Object.h>
#include <Ice/ConnectionI.h>
-#include <Ice/RequestHandler.h>
+#include <Ice/CollocatedRequestHandler.h>
#include <Ice/Reference.h>
#include <Ice/Instance.h>
#include <Ice/LocalException.h>
@@ -20,7 +20,6 @@
#include <Ice/LocatorInfo.h>
#include <Ice/ProxyFactory.h>
#include <Ice/RouterInfo.h>
-#include <Ice/Outgoing.h> // For LocalExceptionWrapper.
#include <Ice/Protocol.h>
#include <Ice/ReplyStatus.h>
#include <Ice/ImplicitContextI.h>
@@ -52,10 +51,8 @@ class AsynchronousException : public DispatchWorkItem
{
public:
- AsynchronousException(const IceInternal::InstancePtr& instance,
- const Ice::AsyncResultPtr& result,
- const Ice::Exception& ex) :
- DispatchWorkItem(instance), _result(result), _exception(ex.ice_clone())
+ AsynchronousException(const Ice::AsyncResultPtr& result, const Ice::Exception& ex) :
+ _result(result), _exception(ex.ice_clone())
{
}
@@ -75,8 +72,7 @@ class AsynchronousSent : public DispatchWorkItem
{
public:
- AsynchronousSent(const IceInternal::InstancePtr& instance, const Ice::AsyncResultPtr& result) :
- DispatchWorkItem(instance), _result(result)
+ AsynchronousSent(const Ice::AsyncResultPtr& result) : _result(result)
{
}
@@ -95,10 +91,8 @@ class AsynchronousTimeout : public DispatchWorkItem
{
public:
- AsynchronousTimeout(const IceInternal::InstancePtr& instance,
- const IceInternal::RequestHandlerPtr& handler,
- const Ice::AsyncResultPtr& result) :
- DispatchWorkItem(instance), _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result))
+ AsynchronousTimeout(const IceInternal::RequestHandlerPtr& handler, const Ice::AsyncResultPtr& result) :
+ _handler(handler), _outAsync(OutgoingAsyncMessageCallbackPtr::dynamicCast(result))
{
assert(_outAsync);
}
@@ -144,18 +138,6 @@ Ice::AsyncResult::~AsyncResult()
{
}
-bool
-Ice::AsyncResult::operator==(const AsyncResult& r) const
-{
- return this == &r;
-}
-
-bool
-Ice::AsyncResult::operator<(const AsyncResult& r) const
-{
- return this < &r;
-}
-
Int
Ice::AsyncResult::getHash() const
{
@@ -286,7 +268,7 @@ Ice::AsyncResult::__invokeSentAsync()
//
try
{
- _instance->clientThreadPool()->execute(new AsynchronousSent(_instance, this));
+ _instance->clientThreadPool()->execute(new AsynchronousSent(this));
}
catch(const Ice::CommunicatorDestroyedException&)
{
@@ -318,7 +300,7 @@ Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex)
// CommunicatorDestroyedException is the only exception that can propagate directly
// from this method.
//
- _instance->clientThreadPool()->execute(new AsynchronousException(_instance, this, ex));
+ _instance->clientThreadPool()->execute(new AsynchronousException(this, ex));
}
void
@@ -360,7 +342,7 @@ Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask()
if(handler)
{
- _instance->clientThreadPool()->execute(new AsynchronousTimeout(_instance, handler, this));
+ _instance->clientThreadPool()->execute(new AsynchronousTimeout(handler, this));
}
}
@@ -457,7 +439,7 @@ IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
void
IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context)
{
- _delegate = 0;
+ _handler = 0;
_cnt = 0;
_mode = mode;
_sentSynchronously = false;
@@ -528,6 +510,12 @@ IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool c
return connection->sendAsyncRequest(this, compress, response);
}
+AsyncStatus
+IceInternal::OutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler)
+{
+ return handler->invokeAsyncRequest(this);
+}
+
bool
IceInternal::OutgoingAsync::__sent()
{
@@ -537,7 +525,7 @@ IceInternal::OutgoingAsync::__sent()
_state |= Sent;
assert(!(_state & Done));
- if(!_proxy->ice_isTwoway())
+ if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
{
_remoteObserver.detach();
if(!_callback || !_callback->__hasSentCallback())
@@ -550,7 +538,7 @@ IceInternal::OutgoingAsync::__sent()
_timeoutRequestHandler = 0;
}
_state |= Done | OK;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
+ //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization.
}
_monitor.notifyAll();
return !alreadySent && _callback && _callback->__hasSentCallback();
@@ -563,7 +551,7 @@ IceInternal::OutgoingAsync::__invokeSent()
}
void
-IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent)
+IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc, bool sent)
{
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
@@ -581,51 +569,16 @@ IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent
// NOTE: at this point, synchronization isn't needed, no other threads should be
// calling on the callback.
//
-
try
{
- int interval = handleException(exc, sent); // This will throw if the invocation can't be retried.
- if(interval > 0)
- {
- _instance->retryQueue()->add(this, interval);
- }
- else
+ if(!handleException(exc, sent)) // This will throw if the invocation can't be retried.
{
- __invoke(false);
+ return; // Can't be retried immediately.
}
- }
- catch(const Ice::LocalException& ex)
- {
- __invokeException(ex);
- }
-}
-
-void
-IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
-{
- //
- // NOTE: at this point, synchronization isn't needed, no other threads should be
- // calling on the callback. The LocalExceptionWrapper exception is only called
- // before the invocation is sent.
- //
- _remoteObserver.failed(exc.get()->ice_name());
- _remoteObserver.detach();
- assert(!_timeoutRequestHandler);
-
- try
- {
- int interval = handleException(exc); // This will throw if the invocation can't be retried.
- if(interval > 0)
- {
- _instance->retryQueue()->add(this, interval);
- }
- else
- {
- __invoke(false);
- }
+ __invoke(false); // Retry the invocation
}
- catch(const Ice::LocalException& ex)
+ catch(const Ice::Exception& ex)
{
__invokeException(ex);
}
@@ -796,12 +749,10 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous)
{
while(true)
{
- int interval = 0;
try
{
- _delegate = _proxy->__getDelegate(true);
- RequestHandlerPtr handler = _delegate->__getRequestHandler();
- AsyncStatus status = handler->sendAsyncRequest(this);
+ _handler = _proxy->__getRequestHandler(true);
+ AsyncStatus status = _handler->sendAsyncRequest(this);
if(status & AsyncStatusSent)
{
if(synchronous)
@@ -826,92 +777,53 @@ IceInternal::OutgoingAsync::__invoke(bool synchronous)
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
if(!(_state & Done))
{
- int invocationTimeout = handler->getReference()->getInvocationTimeout();
+ int invocationTimeout = _handler->getReference()->getInvocationTimeout();
if(invocationTimeout > 0)
{
_instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout));
- _timeoutRequestHandler = handler;
+ _timeoutRequestHandler = _handler;
}
}
}
break;
}
- catch(const LocalExceptionWrapper& ex)
+ catch(const RetryException&)
{
- interval = handleException(ex);
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
}
- catch(const Ice::LocalException& ex)
+ catch(const Ice::Exception& ex)
{
- interval = handleException(ex, false);
- }
-
- if(interval > 0)
- {
- _instance->retryQueue()->add(this, interval);
- return false;
+ if(!handleException(ex, false)) // This will throw if the invocation can't be retried.
+ {
+ break; // Can't be retried immediately.
+ }
}
}
return _sentSynchronously;
}
-int
-IceInternal::OutgoingAsync::handleException(const LocalExceptionWrapper& ex)
-{
- if(_mode == Nonmutating || _mode == Idempotent)
- {
- return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt, _observer);
- }
- else
- {
- return _proxy->__handleExceptionWrapper(_delegate, ex, _observer);
- }
-}
-
-int
-IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc, bool sent)
+bool
+IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc, bool sent)
{
try
{
- //
- // A CloseConnectionException indicates graceful server shutdown, and is therefore
- // always repeatable without violating "at-most-once". That's because by sending a
- // close connection message, the server guarantees that all outstanding requests
- // can safely be repeated.
- //
- // An ObjectNotExistException can always be retried as well without violating
- // "at-most-once" (see the implementation of the checkRetryAfterException method of
- // the ProxyFactory class for the reasons why it can be useful).
- //
- if(!sent ||
- dynamic_cast<const CloseConnectionException*>(&exc) ||
- dynamic_cast<const ObjectNotExistException*>(&exc))
- {
- exc.ice_throw();
- }
-
- //
- // Throw the exception wrapped in a LocalExceptionWrapper, to indicate that the
- // request cannot be resent without potentially violating the "at-most-once"
- // principle.
- //
- throw LocalExceptionWrapper(exc, false);
- }
- catch(const LocalExceptionWrapper& ex)
- {
- if(_mode == Nonmutating || _mode == Idempotent)
+ int interval = _proxy->__handleException(exc, _handler, _mode, sent, _cnt);
+ _observer.retried(); // Invocation is being retried.
+ if(interval > 0)
{
- return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt, _observer);
+ _instance->retryQueue()->add(this, interval);
+ return false; // Don't retry immediately, the retry queue will take care of the retry.
}
else
{
- return _proxy->__handleExceptionWrapper(_delegate, ex, _observer);
+ return true; // Retry immediately.
}
}
- catch(const Ice::LocalException& ex)
+ catch(const Ice::Exception& ex)
{
- return _proxy->__handleException(_delegate, ex, false, _cnt, _observer);
+ _observer.failed(ex.ice_name());
+ throw;
}
- return 0; // Keep the compiler happy.
}
IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& communicator,
@@ -929,13 +841,19 @@ IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, b
return connection->flushAsyncBatchRequests(this);
}
+AsyncStatus
+IceInternal::BatchOutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler)
+{
+ return handler->invokeAsyncBatchRequests(this);
+}
+
bool
IceInternal::BatchOutgoingAsync::__sent()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(!_exception.get());
_state |= Done | OK | Sent;
- _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
+ //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization.
_remoteObserver.detach();
if(_timeoutRequestHandler)
{
@@ -958,7 +876,7 @@ IceInternal::BatchOutgoingAsync::__invokeSent()
}
void
-IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool)
+IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc, bool)
{
_remoteObserver.failed(exc.ice_name());
_remoteObserver.detach();
@@ -985,16 +903,10 @@ IceInternal::ProxyBatchOutgoingAsync::__invoke()
{
checkSupportedProtocol(_proxy->__reference()->getProtocol());
- //
- // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
- // requests were queued with the connection, they would be lost without being noticed.
- //
- Handle<IceDelegate::Ice::Object> delegate;
- int cnt = -1; // Don't retry.
+ RequestHandlerPtr handler;
try
{
- delegate = _proxy->__getDelegate(true);
- RequestHandlerPtr handler = delegate->__getRequestHandler();
+ handler = _proxy->__getRequestHandler(true);
AsyncStatus status = handler->sendAsyncRequest(this);
if(status & AsyncStatusSent)
{
@@ -1018,9 +930,20 @@ IceInternal::ProxyBatchOutgoingAsync::__invoke()
}
}
}
- catch(const ::Ice::LocalException& ex)
+ catch(const RetryException&)
+ {
+ //
+ // Clear request handler but don't retry or throw. Retrying
+ // isn't useful, there were no batch requests associated with
+ // the proxy's request handler.
+ //
+ _proxy->__setRequestHandler(handler, 0);
+ }
+ catch(const Ice::Exception& ex)
{
- _proxy->__handleException(delegate, ex, 0, cnt, _observer);
+ _observer.failed(ex.ice_name());
+ _proxy->__setRequestHandler(handler, 0); // Clear request handler
+ throw; // Throw to notify the user that batch requests were potentially lost.
}
}
@@ -1107,7 +1030,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
using BatchOutgoingAsync::__sent;
#endif
- virtual void __finished(const Ice::LocalException& ex, bool)
+ virtual void __finished(const Ice::Exception& ex, bool)
{
_remoteObserver.failed(ex.ice_name());
_remoteObserver.detach();