diff options
author | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2014-06-27 10:31:41 +0200 |
commit | a4f93259dc3494d98addf38e69b87eb557d432b3 (patch) | |
tree | d2b78bb5cea24e33dc1b46be22dba6167e96c9ed /cpp/src/Ice/Outgoing.cpp | |
parent | Fix for ICE-5515 (ice_staticId on proxies) in Java, C#, Python, Ruby and PHP ... (diff) | |
download | ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.bz2 ice-a4f93259dc3494d98addf38e69b87eb557d432b3.tar.xz ice-a4f93259dc3494d98addf38e69b87eb557d432b3.zip |
Better collocation optimization, fix for ICE-5489, ICE-5484
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 377 |
1 files changed, 181 insertions, 196 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index cb0a9ff53fb..73778c9dafd 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -9,7 +9,7 @@ #include <Ice/Outgoing.h> #include <Ice/Object.h> -#include <Ice/RequestHandler.h> +#include <Ice/CollocatedRequestHandler.h> #include <Ice/ConnectionI.h> #include <Ice/Reference.h> #include <Ice/Endpoint.h> @@ -24,87 +24,20 @@ using namespace Ice; using namespace Ice::Instrumentation; using namespace IceInternal; -namespace IceUtilInternal -{ - -extern bool ICE_DECLSPEC_IMPORT printStackTraces; - -} - -IceInternal::LocalExceptionWrapper::LocalExceptionWrapper(const LocalException& ex, bool r) : - _retry(r) -{ - _ex.reset(ex.ice_clone()); -} - -IceInternal::LocalExceptionWrapper::LocalExceptionWrapper(const LocalExceptionWrapper& ex) : - _retry(ex._retry) -{ - _ex.reset(ex.get()->ice_clone()); -} - -void -IceInternal::LocalExceptionWrapper::throwWrapper(const std::exception& ex) -{ - - const UserException* ue = dynamic_cast<const UserException*>(&ex); - if(ue) - { - stringstream s; - s << *ue; - throw LocalExceptionWrapper(UnknownUserException(__FILE__, __LINE__, s.str()), false); - } - - const LocalException* le = dynamic_cast<const LocalException*>(&ex); - if(le) - { - if(dynamic_cast<const UnknownException*>(le) || - dynamic_cast<const ObjectNotExistException*>(le) || - dynamic_cast<const OperationNotExistException*>(le) || - dynamic_cast<const FacetNotExistException*>(le)) - { - throw LocalExceptionWrapper(*le, false); - } - stringstream s; - s << *le; - if(IceUtilInternal::printStackTraces) - { - s << "\n" << le->ice_stackTrace(); - } - throw LocalExceptionWrapper(UnknownLocalException(__FILE__, __LINE__, s.str()), false); - } - string msg = "std::exception: "; - throw LocalExceptionWrapper(UnknownException(__FILE__, __LINE__, msg + ex.what()), false); -} - -const LocalException* -IceInternal::LocalExceptionWrapper::get() const -{ - assert(_ex.get()); - return _ex.get(); -} - -bool -IceInternal::LocalExceptionWrapper::retry() const -{ - return _retry; -} - -IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation, OperationMode mode, - const Context* context, InvocationObserver& observer) : - _handler(handler), - _exceptionWrapper(false), - _exceptionWrapperRetry(false), - _observer(observer), +IceInternal::Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, + const Context* context) : + _proxy(proxy), + _mode(mode), + _observer(proxy, operation, context), _state(StateUnsent), - _encoding(getCompatibleEncoding(handler->getReference()->getEncoding())), - _is(handler->getReference()->getInstance().get(), Ice::currentProtocolEncoding), - _os(handler->getReference()->getInstance().get(), Ice::currentProtocolEncoding), + _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())), + _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), + _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), _sent(false) { - checkSupportedProtocol(getCompatibleProtocol(handler->getReference()->getProtocol())); + checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); - switch(_handler->getReference()->getMode()) + switch(_proxy->__reference()->getMode()) { case Reference::ModeTwoway: case Reference::ModeOneway: @@ -117,25 +50,43 @@ IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation case Reference::ModeBatchOneway: case Reference::ModeBatchDatagram: { - _handler->prepareBatchRequest(&_os); + while(true) + { + try + { + _handler = proxy->__getRequestHandler(true); + _handler->prepareBatchRequest(&_os); + break; + } + catch(const RetryException&) + { + _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. + } + catch(const Ice::LocalException& ex) + { + _observer.failed(ex.ice_name()); + _proxy->__setRequestHandler(_handler, 0); // Clear request handler + throw; + } + } break; } } try { - _os.write(_handler->getReference()->getIdentity()); + _os.write(_proxy->__reference()->getIdentity()); // // For compatibility with the old FacetPath. // - if(_handler->getReference()->getFacet().empty()) + if(_proxy->__reference()->getFacet().empty()) { _os.write(static_cast<string*>(0), static_cast<string*>(0)); } else { - string facet = _handler->getReference()->getFacet(); + string facet = _proxy->__reference()->getFacet(); _os.write(&facet, &facet + 1); } @@ -155,8 +106,8 @@ IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation // // Implicit context // - const ImplicitContextIPtr& implicitContext = _handler->getReference()->getInstance()->getImplicitContext(); - const Context& prxContext = _handler->getReference()->getContext()->getValue(); + const ImplicitContextIPtr& implicitContext = _proxy->__reference()->getInstance()->getImplicitContext(); + const Context& prxContext = _proxy->__reference()->getContext()->getValue(); if(implicitContext == 0) { _os.write(prxContext); @@ -181,28 +132,39 @@ bool IceInternal::Outgoing::invoke() { assert(_state == StateUnsent); - - switch(_handler->getReference()->getMode()) + + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) { - case Reference::ModeTwoway: - case Reference::ModeOneway: - case Reference::ModeDatagram: + _state = StateInProgress; + _handler->finishBatchRequest(&_os); + return true; + } + + int cnt = 0; + while(true) + { + try { _state = StateInProgress; + _exception.reset(0); + _sent = false; + + _handler = _proxy->__getRequestHandler(false); if(_handler->sendRequest(this)) // Request sent and no response expected, we're done. { return true; } - + bool timedOut = false; { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - + // // If the handler says it's not finished, we wait until we're done. // - int invocationTimeout = _handler->getReference()->getInvocationTimeout(); + int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); if(invocationTimeout > 0) { IceUtil::Time now = IceUtil::Time::now(); @@ -210,7 +172,7 @@ IceInternal::Outgoing::invoke() while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut) { _monitor.timedWait(deadline - now); - + if((_state == StateInProgress || !_sent) && _state != StateFailed) { now = IceUtil::Time::now(); @@ -226,64 +188,53 @@ IceInternal::Outgoing::invoke() } } } - + if(timedOut) { _handler->requestTimedOut(this); - assert(_exception.get()); - } - - if(_exception.get()) - { - if(_exceptionWrapper) - { - throw LocalExceptionWrapper(*_exception.get(), _exceptionWrapperRetry); - } // - // A CloseConnectionException indicates graceful - // server shutdown, and is therefore always repeatable - // without violating "at-most-once". That's because by - // sending a close connection message, the server - // guarantees that all outstanding requests can safely - // be repeated. - // - // An ObjectNotExistException can always be retried as - // well without violating "at-most-once" (see the - // implementation of the checkRetryAfterException - // method of the ProxyFactory class for the reasons - // why it can be useful). + // Wait for the exception to propagate. It's possible the request handler ignores + // the timeout if there was a failure shortly before requestTimedOut got called. + // In this case, the exception should be set on the Outgoing. // - if(!_sent || - dynamic_cast<CloseConnectionException*>(_exception.get()) || - dynamic_cast<ObjectNotExistException*>(_exception.get())) + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(!_exception.get()) { - _exception->ice_throw(); + _monitor.wait(); } + } - // - // Throw the exception wrapped in a LocalExceptionWrapper, - // to indicate that the request cannot be resent without - // potentially violating the "at-most-once" principle. - // - throw LocalExceptionWrapper(*_exception.get(), false); + if(_exception.get()) + { + _exception->ice_throw(); + } + else + { + assert(_state != StateInProgress); + return _state == StateOK; } - - assert(_state != StateInProgress); - return _state == StateOK; } - - case Reference::ModeBatchOneway: - case Reference::ModeBatchDatagram: + catch(const RetryException&) { - // - // For batch oneways and datagrams, the same rules as for - // regular oneways and datagrams (see comment above) - // apply. - // - _state = StateInProgress; - _handler->finishBatchRequest(&_os); - return true; + _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. + } + catch(const Ice::Exception& ex) + { + try + { + int interval = _proxy->__handleException(ex, _handler, _mode, _sent, cnt); + _observer.retried(); // Invocation is being retried. + if(interval > 0) + { + IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval)); + } + } + catch(const Ice::Exception& ex) + { + _observer.failed(ex.ice_name()); + throw; + } } } @@ -301,8 +252,8 @@ IceInternal::Outgoing::abort(const LocalException& ex) // notify the connection about that we give up ownership of the // batch stream. // - if(_handler->getReference()->getMode() == Reference::ModeBatchOneway || - _handler->getReference()->getMode() == Reference::ModeBatchDatagram) + if(_proxy->__reference()->getMode() == Reference::ModeBatchOneway || + _proxy->__reference()->getMode() == Reference::ModeBatchDatagram) { _handler->abortBatchRequest(); } @@ -317,10 +268,16 @@ IceInternal::Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress } void +IceInternal::Outgoing::invokeCollocated(CollocatedRequestHandler* handler) +{ + handler->invokeRequest(this); +} + +void IceInternal::Outgoing::sent() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(_handler->getReference()->getMode() != Reference::ModeTwoway) + if(_proxy->__reference()->getMode() != Reference::ModeTwoway) { _remoteObserver.detach(); _state = StateOK; @@ -336,7 +293,7 @@ IceInternal::Outgoing::sent() } void -IceInternal::Outgoing::finished(const LocalException& ex, bool sent) +IceInternal::Outgoing::finished(const Exception& ex, bool sent) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); assert(_state <= StateInProgress); @@ -350,26 +307,11 @@ IceInternal::Outgoing::finished(const LocalException& ex, bool sent) } void -IceInternal::Outgoing::finished(const LocalExceptionWrapper& ex) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _remoteObserver.failed(ex.get()->ice_name()); - _remoteObserver.detach(); - - _state = StateFailed; - _exceptionWrapper = true; - _exceptionWrapperRetry = ex.retry(); - _exception.reset(ex.get()->ice_clone()); - _sent = false; - _monitor.notify(); -} - -void IceInternal::Outgoing::finished(BasicStream& is) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(_handler->getReference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways. + assert(_proxy->__reference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways. assert(_state <= StateInProgress); if(_remoteObserver) @@ -540,83 +482,120 @@ IceInternal::Outgoing::throwUserException() } } -IceInternal::BatchOutgoing::BatchOutgoing(RequestHandler* handler, InvocationObserver& observer) : - _handler(handler), +IceInternal::BatchOutgoing::BatchOutgoing(IceProxy::Ice::Object* proxy, const string& name) : + _proxy(proxy), _connection(0), _sent(false), - _os(handler->getReference()->getInstance().get(), Ice::currentProtocolEncoding), - _observer(observer) + _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), + _observer(proxy, name, 0) { - checkSupportedProtocol(handler->getReference()->getProtocol()); + checkSupportedProtocol(proxy->__reference()->getProtocol()); } -IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, InvocationObserver& observer) : - _handler(0), +IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, const string& name) : + _proxy(0), _connection(connection), _sent(false), _os(instance, Ice::currentProtocolEncoding), - _observer(observer) + _observer(instance, name) { } void IceInternal::BatchOutgoing::invoke() { - assert(_handler || _connection); + assert(_proxy || _connection); - int timeout; if(_connection) { if(_connection->flushBatchRequests(this)) { return; } - timeout = -1; + + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(!_exception.get() && !_sent) + { + _monitor.wait(); + } + if(_exception.get()) + { + _exception->ice_throw(); + } + return; } - else + + RequestHandlerPtr handler; + try { - if(_handler->sendRequest(this)) + handler = _proxy->__getRequestHandler(false); + if(handler->sendRequest(this)) { return; } - timeout = _handler->getReference()->getInvocationTimeout(); - } - - bool timedOut = false; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(timeout > 0) + + bool timedOut = false; { - IceUtil::Time now = IceUtil::Time::now(); - IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(timeout); - while(!_exception.get() && !_sent && !timedOut) + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + int timeout = _proxy->__reference()->getInvocationTimeout(); + if(timeout > 0) + { + IceUtil::Time now = IceUtil::Time::now(); + IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(timeout); + while(!_exception.get() && !_sent && !timedOut) + { + _monitor.timedWait(deadline - now); + if(!_exception.get() && !_sent) + { + now = IceUtil::Time::now(); + timedOut = now >= deadline; + } + } + } + else { - _monitor.timedWait(deadline - now); - if(!_exception.get() && !_sent) + while(!_exception.get() && !_sent) { - now = IceUtil::Time::now(); - timedOut = now >= deadline; + _monitor.wait(); } } } - else + + if(timedOut) { - while(!_exception.get() && !_sent) + handler->requestTimedOut(this); + + // + // Wait for the exception to propagate. It's possible the request handler ignores + // the timeout if there was a failure shortly before requestTimedOut got called. + // In this case, the exception should be set on the Outgoing. + // + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(!_exception.get()) { _monitor.wait(); } } + + if(_exception.get()) + { + _exception->ice_throw(); + } } - - if(timedOut) + catch(const RetryException&) { - _handler->requestTimedOut(this); - assert(_exception.get()); + // + // Clear request handler but don't retry or throw. Retrying + // isn't useful, there were no batch requests associated with + // the proxy's request handler. + // + _proxy->__setRequestHandler(handler, 0); } - - if(_exception.get()) + catch(const Ice::Exception& ex) { - _exception->ice_throw(); + _proxy->__setRequestHandler(handler, 0); // Clear request handler + _observer.failed(ex.ice_name()); + throw; // Throw to notify the user that batch requests were potentially lost. } } @@ -627,6 +606,12 @@ IceInternal::BatchOutgoing::send(const Ice::ConnectionIPtr& connection, bool, bo } void +IceInternal::BatchOutgoing::invokeCollocated(CollocatedRequestHandler* handler) +{ + handler->invokeBatchRequests(this); +} + +void IceInternal::BatchOutgoing::sent() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -643,7 +628,7 @@ IceInternal::BatchOutgoing::sent() } void -IceInternal::BatchOutgoing::finished(const Ice::LocalException& ex, bool) +IceInternal::BatchOutgoing::finished(const Ice::Exception& ex, bool) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); _remoteObserver.failed(ex.ice_name()); |