summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Outgoing.cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
committerBenoit Foucher <benoit@zeroc.com>2014-06-27 10:31:41 +0200
commita4f93259dc3494d98addf38e69b87eb557d432b3 (patch)
treed2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/Ice/Outgoing.cpp
parentFix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff)
downloadice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz
ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r--cpp/src/Ice/Outgoing.cpp377
1 files changed, 181 insertions, 196 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index cb0a9ff53fb..73778c9dafd 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -9,7 +9,7 @@
#include <Ice/Outgoing.h>
#include <Ice/Object.h>
-#include <Ice/RequestHandler.h>
+#include <Ice/CollocatedRequestHandler.h>
#include <Ice/ConnectionI.h>
#include <Ice/Reference.h>
#include <Ice/Endpoint.h>
@@ -24,87 +24,20 @@ using namespace Ice;
using namespace Ice::Instrumentation;
using namespace IceInternal;
-namespace IceUtilInternal
-{
-
-extern bool ICE_DECLSPEC_IMPORT printStackTraces;
-
-}
-
-IceInternal::LocalExceptionWrapper::LocalExceptionWrapper(const LocalException& ex, bool r) :
- _retry(r)
-{
- _ex.reset(ex.ice_clone());
-}
-
-IceInternal::LocalExceptionWrapper::LocalExceptionWrapper(const LocalExceptionWrapper& ex) :
- _retry(ex._retry)
-{
- _ex.reset(ex.get()->ice_clone());
-}
-
-void
-IceInternal::LocalExceptionWrapper::throwWrapper(const std::exception& ex)
-{
-
- const UserException* ue = dynamic_cast<const UserException*>(&ex);
- if(ue)
- {
- stringstream s;
- s << *ue;
- throw LocalExceptionWrapper(UnknownUserException(__FILE__, __LINE__, s.str()), false);
- }
-
- const LocalException* le = dynamic_cast<const LocalException*>(&ex);
- if(le)
- {
- if(dynamic_cast<const UnknownException*>(le) ||
- dynamic_cast<const ObjectNotExistException*>(le) ||
- dynamic_cast<const OperationNotExistException*>(le) ||
- dynamic_cast<const FacetNotExistException*>(le))
- {
- throw LocalExceptionWrapper(*le, false);
- }
- stringstream s;
- s << *le;
- if(IceUtilInternal::printStackTraces)
- {
- s << "\n" << le->ice_stackTrace();
- }
- throw LocalExceptionWrapper(UnknownLocalException(__FILE__, __LINE__, s.str()), false);
- }
- string msg = "std::exception: ";
- throw LocalExceptionWrapper(UnknownException(__FILE__, __LINE__, msg + ex.what()), false);
-}
-
-const LocalException*
-IceInternal::LocalExceptionWrapper::get() const
-{
- assert(_ex.get());
- return _ex.get();
-}
-
-bool
-IceInternal::LocalExceptionWrapper::retry() const
-{
- return _retry;
-}
-
-IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation, OperationMode mode,
- const Context* context, InvocationObserver& observer) :
- _handler(handler),
- _exceptionWrapper(false),
- _exceptionWrapperRetry(false),
- _observer(observer),
+IceInternal::Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode,
+ const Context* context) :
+ _proxy(proxy),
+ _mode(mode),
+ _observer(proxy, operation, context),
_state(StateUnsent),
- _encoding(getCompatibleEncoding(handler->getReference()->getEncoding())),
- _is(handler->getReference()->getInstance().get(), Ice::currentProtocolEncoding),
- _os(handler->getReference()->getInstance().get(), Ice::currentProtocolEncoding),
+ _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())),
+ _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding),
+ _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding),
_sent(false)
{
- checkSupportedProtocol(getCompatibleProtocol(handler->getReference()->getProtocol()));
+ checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol()));
- switch(_handler->getReference()->getMode())
+ switch(_proxy->__reference()->getMode())
{
case Reference::ModeTwoway:
case Reference::ModeOneway:
@@ -117,25 +50,43 @@ IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation
case Reference::ModeBatchOneway:
case Reference::ModeBatchDatagram:
{
- _handler->prepareBatchRequest(&_os);
+ while(true)
+ {
+ try
+ {
+ _handler = proxy->__getRequestHandler(true);
+ _handler->prepareBatchRequest(&_os);
+ break;
+ }
+ catch(const RetryException&)
+ {
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ _observer.failed(ex.ice_name());
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler
+ throw;
+ }
+ }
break;
}
}
try
{
- _os.write(_handler->getReference()->getIdentity());
+ _os.write(_proxy->__reference()->getIdentity());
//
// For compatibility with the old FacetPath.
//
- if(_handler->getReference()->getFacet().empty())
+ if(_proxy->__reference()->getFacet().empty())
{
_os.write(static_cast<string*>(0), static_cast<string*>(0));
}
else
{
- string facet = _handler->getReference()->getFacet();
+ string facet = _proxy->__reference()->getFacet();
_os.write(&facet, &facet + 1);
}
@@ -155,8 +106,8 @@ IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation
//
// Implicit context
//
- const ImplicitContextIPtr& implicitContext = _handler->getReference()->getInstance()->getImplicitContext();
- const Context& prxContext = _handler->getReference()->getContext()->getValue();
+ const ImplicitContextIPtr& implicitContext = _proxy->__reference()->getInstance()->getImplicitContext();
+ const Context& prxContext = _proxy->__reference()->getContext()->getValue();
if(implicitContext == 0)
{
_os.write(prxContext);
@@ -181,28 +132,39 @@ bool
IceInternal::Outgoing::invoke()
{
assert(_state == StateUnsent);
-
- switch(_handler->getReference()->getMode())
+
+ const Reference::Mode mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
{
- case Reference::ModeTwoway:
- case Reference::ModeOneway:
- case Reference::ModeDatagram:
+ _state = StateInProgress;
+ _handler->finishBatchRequest(&_os);
+ return true;
+ }
+
+ int cnt = 0;
+ while(true)
+ {
+ try
{
_state = StateInProgress;
+ _exception.reset(0);
+ _sent = false;
+
+ _handler = _proxy->__getRequestHandler(false);
if(_handler->sendRequest(this)) // Request sent and no response expected, we're done.
{
return true;
}
-
+
bool timedOut = false;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
-
+
//
// If the handler says it's not finished, we wait until we're done.
//
- int invocationTimeout = _handler->getReference()->getInvocationTimeout();
+ int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
if(invocationTimeout > 0)
{
IceUtil::Time now = IceUtil::Time::now();
@@ -210,7 +172,7 @@ IceInternal::Outgoing::invoke()
while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut)
{
_monitor.timedWait(deadline - now);
-
+
if((_state == StateInProgress || !_sent) && _state != StateFailed)
{
now = IceUtil::Time::now();
@@ -226,64 +188,53 @@ IceInternal::Outgoing::invoke()
}
}
}
-
+
if(timedOut)
{
_handler->requestTimedOut(this);
- assert(_exception.get());
- }
-
- if(_exception.get())
- {
- if(_exceptionWrapper)
- {
- throw LocalExceptionWrapper(*_exception.get(), _exceptionWrapperRetry);
- }
//
- // A CloseConnectionException indicates graceful
- // server shutdown, and is therefore always repeatable
- // without violating "at-most-once". That's because by
- // sending a close connection message, the server
- // guarantees that all outstanding requests can safely
- // be repeated.
- //
- // An ObjectNotExistException can always be retried as
- // well without violating "at-most-once" (see the
- // implementation of the checkRetryAfterException
- // method of the ProxyFactory class for the reasons
- // why it can be useful).
+ // Wait for the exception to propagate. It's possible the request handler ignores
+ // the timeout if there was a failure shortly before requestTimedOut got called.
+ // In this case, the exception should be set on the Outgoing.
//
- if(!_sent ||
- dynamic_cast<CloseConnectionException*>(_exception.get()) ||
- dynamic_cast<ObjectNotExistException*>(_exception.get()))
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!_exception.get())
{
- _exception->ice_throw();
+ _monitor.wait();
}
+ }
- //
- // Throw the exception wrapped in a LocalExceptionWrapper,
- // to indicate that the request cannot be resent without
- // potentially violating the "at-most-once" principle.
- //
- throw LocalExceptionWrapper(*_exception.get(), false);
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+ else
+ {
+ assert(_state != StateInProgress);
+ return _state == StateOK;
}
-
- assert(_state != StateInProgress);
- return _state == StateOK;
}
-
- case Reference::ModeBatchOneway:
- case Reference::ModeBatchDatagram:
+ catch(const RetryException&)
{
- //
- // For batch oneways and datagrams, the same rules as for
- // regular oneways and datagrams (see comment above)
- // apply.
- //
- _state = StateInProgress;
- _handler->finishBatchRequest(&_os);
- return true;
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
+ }
+ catch(const Ice::Exception& ex)
+ {
+ try
+ {
+ int interval = _proxy->__handleException(ex, _handler, _mode, _sent, cnt);
+ _observer.retried(); // Invocation is being retried.
+ if(interval > 0)
+ {
+ IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval));
+ }
+ }
+ catch(const Ice::Exception& ex)
+ {
+ _observer.failed(ex.ice_name());
+ throw;
+ }
}
}
@@ -301,8 +252,8 @@ IceInternal::Outgoing::abort(const LocalException& ex)
// notify the connection about that we give up ownership of the
// batch stream.
//
- if(_handler->getReference()->getMode() == Reference::ModeBatchOneway ||
- _handler->getReference()->getMode() == Reference::ModeBatchDatagram)
+ if(_proxy->__reference()->getMode() == Reference::ModeBatchOneway ||
+ _proxy->__reference()->getMode() == Reference::ModeBatchDatagram)
{
_handler->abortBatchRequest();
}
@@ -317,10 +268,16 @@ IceInternal::Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress
}
void
+IceInternal::Outgoing::invokeCollocated(CollocatedRequestHandler* handler)
+{
+ handler->invokeRequest(this);
+}
+
+void
IceInternal::Outgoing::sent()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(_handler->getReference()->getMode() != Reference::ModeTwoway)
+ if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
{
_remoteObserver.detach();
_state = StateOK;
@@ -336,7 +293,7 @@ IceInternal::Outgoing::sent()
}
void
-IceInternal::Outgoing::finished(const LocalException& ex, bool sent)
+IceInternal::Outgoing::finished(const Exception& ex, bool sent)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
assert(_state <= StateInProgress);
@@ -350,26 +307,11 @@ IceInternal::Outgoing::finished(const LocalException& ex, bool sent)
}
void
-IceInternal::Outgoing::finished(const LocalExceptionWrapper& ex)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _remoteObserver.failed(ex.get()->ice_name());
- _remoteObserver.detach();
-
- _state = StateFailed;
- _exceptionWrapper = true;
- _exceptionWrapperRetry = ex.retry();
- _exception.reset(ex.get()->ice_clone());
- _sent = false;
- _monitor.notify();
-}
-
-void
IceInternal::Outgoing::finished(BasicStream& is)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(_handler->getReference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways.
+ assert(_proxy->__reference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways.
assert(_state <= StateInProgress);
if(_remoteObserver)
@@ -540,83 +482,120 @@ IceInternal::Outgoing::throwUserException()
}
}
-IceInternal::BatchOutgoing::BatchOutgoing(RequestHandler* handler, InvocationObserver& observer) :
- _handler(handler),
+IceInternal::BatchOutgoing::BatchOutgoing(IceProxy::Ice::Object* proxy, const string& name) :
+ _proxy(proxy),
_connection(0),
_sent(false),
- _os(handler->getReference()->getInstance().get(), Ice::currentProtocolEncoding),
- _observer(observer)
+ _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding),
+ _observer(proxy, name, 0)
{
- checkSupportedProtocol(handler->getReference()->getProtocol());
+ checkSupportedProtocol(proxy->__reference()->getProtocol());
}
-IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, InvocationObserver& observer) :
- _handler(0),
+IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, const string& name) :
+ _proxy(0),
_connection(connection),
_sent(false),
_os(instance, Ice::currentProtocolEncoding),
- _observer(observer)
+ _observer(instance, name)
{
}
void
IceInternal::BatchOutgoing::invoke()
{
- assert(_handler || _connection);
+ assert(_proxy || _connection);
- int timeout;
if(_connection)
{
if(_connection->flushBatchRequests(this))
{
return;
}
- timeout = -1;
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!_exception.get() && !_sent)
+ {
+ _monitor.wait();
+ }
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
+ return;
}
- else
+
+ RequestHandlerPtr handler;
+ try
{
- if(_handler->sendRequest(this))
+ handler = _proxy->__getRequestHandler(false);
+ if(handler->sendRequest(this))
{
return;
}
- timeout = _handler->getReference()->getInvocationTimeout();
- }
-
- bool timedOut = false;
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(timeout > 0)
+
+ bool timedOut = false;
{
- IceUtil::Time now = IceUtil::Time::now();
- IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(timeout);
- while(!_exception.get() && !_sent && !timedOut)
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ int timeout = _proxy->__reference()->getInvocationTimeout();
+ if(timeout > 0)
+ {
+ IceUtil::Time now = IceUtil::Time::now();
+ IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(timeout);
+ while(!_exception.get() && !_sent && !timedOut)
+ {
+ _monitor.timedWait(deadline - now);
+ if(!_exception.get() && !_sent)
+ {
+ now = IceUtil::Time::now();
+ timedOut = now >= deadline;
+ }
+ }
+ }
+ else
{
- _monitor.timedWait(deadline - now);
- if(!_exception.get() && !_sent)
+ while(!_exception.get() && !_sent)
{
- now = IceUtil::Time::now();
- timedOut = now >= deadline;
+ _monitor.wait();
}
}
}
- else
+
+ if(timedOut)
{
- while(!_exception.get() && !_sent)
+ handler->requestTimedOut(this);
+
+ //
+ // Wait for the exception to propagate. It's possible the request handler ignores
+ // the timeout if there was a failure shortly before requestTimedOut got called.
+ // In this case, the exception should be set on the Outgoing.
+ //
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!_exception.get())
{
_monitor.wait();
}
}
+
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
}
-
- if(timedOut)
+ catch(const RetryException&)
{
- _handler->requestTimedOut(this);
- assert(_exception.get());
+ //
+ // Clear request handler but don't retry or throw. Retrying
+ // isn't useful, there were no batch requests associated with
+ // the proxy's request handler.
+ //
+ _proxy->__setRequestHandler(handler, 0);
}
-
- if(_exception.get())
+ catch(const Ice::Exception& ex)
{
- _exception->ice_throw();
+ _proxy->__setRequestHandler(handler, 0); // Clear request handler
+ _observer.failed(ex.ice_name());
+ throw; // Throw to notify the user that batch requests were potentially lost.
}
}
@@ -627,6 +606,12 @@ IceInternal::BatchOutgoing::send(const Ice::ConnectionIPtr& connection, bool, bo
}
void
+IceInternal::BatchOutgoing::invokeCollocated(CollocatedRequestHandler* handler)
+{
+ handler->invokeBatchRequests(this);
+}
+
+void
IceInternal::BatchOutgoing::sent()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
@@ -643,7 +628,7 @@ IceInternal::BatchOutgoing::sent()
}
void
-IceInternal::BatchOutgoing::finished(const Ice::LocalException& ex, bool)
+IceInternal::BatchOutgoing::finished(const Ice::Exception& ex, bool)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
_remoteObserver.failed(ex.ice_name());