diff options
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 693 |
1 files changed, 0 insertions, 693 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp deleted file mode 100644 index 592a9b2f9d4..00000000000 --- a/cpp/src/Ice/Outgoing.cpp +++ /dev/null @@ -1,693 +0,0 @@ -// ********************************************************************** -// -// Copyright (c) 2003-2016 ZeroC, Inc. All rights reserved. -// -// This copy of Ice is licensed to you under the terms described in the -// ICE_LICENSE file included in this distribution. -// -// ********************************************************************** - -#include <Ice/Outgoing.h> -#include <Ice/ConnectionI.h> -#include <Ice/CollocatedRequestHandler.h> -#include <Ice/Reference.h> -#include <Ice/Instance.h> -#include <Ice/LocalException.h> -#include <Ice/ReplyStatus.h> -#include <Ice/ImplicitContextI.h> - -using namespace std; -using namespace IceUtil; -using namespace Ice; -using namespace Ice::Instrumentation; -using namespace IceInternal; - -OutgoingBase::~OutgoingBase() -{ - // Out of line to avoid weak vtable -} - -OutgoingBase::OutgoingBase(Instance* instance) : _os(instance, Ice::currentProtocolEncoding), _sent(false) -{ -} - -ProxyOutgoingBase::ProxyOutgoingBase(const Ice::ObjectPrxPtr& proxy, OperationMode mode) : - OutgoingBase(proxy->__reference()->getInstance().get()), - _proxy(proxy), - _mode(mode), - _state(StateUnsent) -{ - int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); - if(invocationTimeout > 0) - { - _invocationTimeoutDeadline = Time::now(Time::Monotonic) + Time::milliSeconds(invocationTimeout); - } -} - -ProxyOutgoingBase::~ProxyOutgoingBase() -{ -} - -void -ProxyOutgoingBase::sent() -{ - Monitor<Mutex>::Lock sync(_monitor); - if(_proxy->__reference()->getMode() != Reference::ModeTwoway) - { - _childObserver.detach(); - _state = StateOK; - } - _sent = true; - _monitor.notify(); - - // - // NOTE: At this point the stack allocated ProxyOutgoingBase object can be destroyed - // since the notify() on the monitor will release the thread waiting on the - // synchronous Ice call. - // -} - -void -ProxyOutgoingBase::completed(const Ice::Exception& ex) -{ - Monitor<Mutex>::Lock sync(_monitor); - //assert(_state <= StateInProgress); - if(_state > StateInProgress) - { - // - // Response was already received but message - // didn't get removed first from the connection - // send message queue so it's possible we can be - // notified of failures. In this case, ignore the - // failure and assume the outgoing has been sent. - // - assert(_state != StateFailed); - _sent = true; - _monitor.notify(); - return; - } - - _childObserver.failed(ex.ice_id()); - _childObserver.detach(); - - _state = StateFailed; - ICE_SET_EXCEPTION_FROM_CLONE(_exception, ex.ice_clone()); - _monitor.notify(); -} - -void -ProxyOutgoingBase::completed(InputStream&) -{ - assert(false); // Must be overriden -} - -void -ProxyOutgoingBase::retryException(const Ice::Exception&) -{ - Monitor<Mutex>::Lock sync(_monitor); - assert(_state <= StateInProgress); - _state = StateRetry; - _monitor.notify(); -} - -bool -ProxyOutgoingBase::invokeImpl() -{ - assert(_state == StateUnsent); - - const int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); - int cnt = 0; - while(true) - { - try - { - if(invocationTimeout > 0 && _invocationTimeoutDeadline <= Time::now(Time::Monotonic)) - { - throw Ice::InvocationTimeoutException(__FILE__, __LINE__); - } - - _state = StateInProgress; - _exception.reset(); - _sent = false; - - _handler = _proxy->__getRequestHandler(); - - if(_handler->sendRequest(this)) // Request sent and no response expected, we're done. - { - return true; - } - - if(invocationTimeout == -2) // Use the connection timeout - { - try - { - _invocationTimeoutDeadline = Time(); // Reset any previously set value - - int timeout = _handler->waitForConnection()->timeout(); - if(timeout > 0) - { - _invocationTimeoutDeadline = Time::now(Time::Monotonic) + Time::milliSeconds(timeout); - } - } - catch(const Ice::LocalException&) - { - } - } - - bool timedOut = false; - { - Monitor<Mutex>::Lock sync(_monitor); - // - // If the handler says it's not finished, we wait until we're done. - // - if(_invocationTimeoutDeadline != Time()) - { - Time now = Time::now(Time::Monotonic); - timedOut = now >= _invocationTimeoutDeadline; - while((_state == StateInProgress || !_sent) && _state != StateFailed && _state != StateRetry) - { - if(timedOut) - { - break; - } - _monitor.timedWait(_invocationTimeoutDeadline - now); - - if((_state == StateInProgress || !_sent) && _state != StateFailed) - { - now = Time::now(Time::Monotonic); - timedOut = now >= _invocationTimeoutDeadline; - } - } - } - else - { - while((_state == StateInProgress || !_sent) && _state != StateFailed && _state != StateRetry) - { - _monitor.wait(); - } - } - } - - if(timedOut) - { - if(invocationTimeout == -2) - { - _handler->requestCanceled(this, ConnectionTimeoutException(__FILE__, __LINE__)); - } - else - { - _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__)); - } - - // - // Wait for the exception to propagate. It's possible the request handler ignores - // the timeout if there was a failure shortly before requestCanceled got called. - // In this case, the exception should be set on the ProxyOutgoingBase. - // - Monitor<Mutex>::Lock sync(_monitor); - while(_state == StateInProgress) - { - _monitor.wait(); - } - } - - if(_exception) - { - _exception->ice_throw(); - } - else if(_state == StateRetry) - { - _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and retry. - continue; - } - else - { - assert(_state != StateInProgress); - return _state == StateOK; - } - } - catch(const RetryException&) - { - _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and retry. - } - catch(const Ice::Exception& ex) - { - try - { - Time interval; - interval = Time::milliSeconds(_proxy->__handleException(ex, _handler, _mode, _sent, cnt)); - if(interval > Time()) - { - if(invocationTimeout > 0) - { - IceUtil::Time now = Time::now(Time::Monotonic); - IceUtil::Time retryDeadline = now + interval; - - // - // Wait until either the retry and invocation timeout deadline is reached. - // Note that we're using a loop here because sleep() precision isn't as - // good as the motonic clock and it can return few hundred micro-seconds - // earlier which breaks the check for the invocation timeout. - // - while(retryDeadline > now && _invocationTimeoutDeadline > now) - { - if(retryDeadline < _invocationTimeoutDeadline) - { - ThreadControl::sleep(retryDeadline - now); - } - else if(_invocationTimeoutDeadline > now) - { - ThreadControl::sleep(_invocationTimeoutDeadline - now); - } - now = Time::now(Time::Monotonic); - } - if(now >= _invocationTimeoutDeadline) - { - throw Ice::InvocationTimeoutException(__FILE__, __LINE__); - } - } - else - { - ThreadControl::sleep(interval); - } - } - _observer.retried(); - } - catch(const Ice::Exception& ex) - { - _observer.failed(ex.ice_id()); - throw; - } - } - } - - assert(false); - return false; -} - -Outgoing::Outgoing(const Ice::ObjectPrxPtr& proxy, const string& operation, OperationMode mode, const Context& context) : - ProxyOutgoingBase(proxy, mode), - _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())), - _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _operation(operation) -{ - checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); - _observer.attach(proxy, operation, context); - switch(_proxy->__reference()->getMode()) - { - case Reference::ModeTwoway: - case Reference::ModeOneway: - case Reference::ModeDatagram: - { - _os.writeBlob(requestHdr, sizeof(requestHdr)); - break; - } - - case Reference::ModeBatchOneway: - case Reference::ModeBatchDatagram: - { - _proxy->__getBatchRequestQueue()->prepareBatchRequest(&_os); - break; - } - } - - try - { - _os.write(_proxy->__reference()->getIdentity()); - - // - // For compatibility with the old FacetPath. - // - if(_proxy->__reference()->getFacet().empty()) - { - _os.write(static_cast<string*>(0), static_cast<string*>(0)); - } - else - { - string facet = _proxy->__reference()->getFacet(); - _os.write(&facet, &facet + 1); - } - - _os.write(operation, false); - - _os.write(static_cast<Ice::Byte>(mode)); - - if(&context != &Ice::noExplicitContext) - { - // - // Explicit context - // - _os.write(context); - } - else - { - // - // Implicit context - // - const ImplicitContextIPtr& implicitContext = _proxy->__reference()->getInstance()->getImplicitContext(); - const Context& prxContext = _proxy->__reference()->getContext()->getValue(); - if(implicitContext == 0) - { - _os.write(prxContext); - } - else - { - implicitContext->write(prxContext, &_os); - } - } - } - catch(const LocalException& ex) - { - abort(ex); - } -} - -Outgoing::~Outgoing() -{ -} - -bool -Outgoing::invokeRemote(const Ice::ConnectionIPtr& connection, bool compress, bool response) -{ - return connection->sendRequest(this, compress, response, 0); -} - -void -Outgoing::invokeCollocated(CollocatedRequestHandler* handler) -{ - handler->invokeRequest(this, 0); -} - -bool -Outgoing::invoke() -{ - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _state = StateInProgress; - _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, _operation); - return true; - } - return invokeImpl(); -} - -void -Outgoing::abort(const LocalException& ex) -{ - assert(_state == StateUnsent); - - // - // If we didn't finish a batch oneway or datagram request, we must - // notify the connection about that we give up ownership of the - // batch stream. - // - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os); - } - - ex.ice_throw(); -} - -void -Outgoing::completed(InputStream& is) -{ - Monitor<Mutex>::Lock sync(_monitor); - - assert(_proxy->__reference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways. - - assert(_state <= StateInProgress); - if(_childObserver) - { - _childObserver->reply(static_cast<Int>(is.b.size() - headerSize - 4)); - } - _childObserver.detach(); - - _is.swap(is); - - Ice::Byte replyStatus; - _is.read(replyStatus); - - switch(replyStatus) - { - case replyOK: - { - _state = StateOK; // The state must be set last, in case there is an exception. - break; - } - - case replyUserException: - { - _observer.userException(); - _state = StateUserException; // The state must be set last, in case there is an exception. - break; - } - - case replyObjectNotExist: - case replyFacetNotExist: - case replyOperationNotExist: - { - // - // Don't read the exception members directly into the - // exception. Otherwise if reading fails and raises an - // exception, you will have a memory leak. - // - Identity ident; - _is.read(ident); - - // - // For compatibility with the old FacetPath. - // - vector<string> facetPath; - _is.read(facetPath); - string facet; - if(!facetPath.empty()) - { - if(facetPath.size() > 1) - { - throw MarshalException(__FILE__, __LINE__); - } - facet.swap(facetPath[0]); - } - - string operation; - _is.read(operation, false); - - RequestFailedException* ex; - switch(replyStatus) - { - case replyObjectNotExist: - { - ex = new ObjectNotExistException(__FILE__, __LINE__); - break; - } - - case replyFacetNotExist: - { - ex = new FacetNotExistException(__FILE__, __LINE__); - break; - } - - case replyOperationNotExist: - { - ex = new OperationNotExistException(__FILE__, __LINE__); - break; - } - - default: - { - ex = 0; // To keep the compiler from complaining. - assert(false); - break; - } - } - - ex->id = ident; - ex->facet = facet; - ex->operation = operation; - _exception.reset(ex); // adopt - _state = StateLocalException; // The state must be set last, in case there is an exception. - break; - } - - case replyUnknownException: - case replyUnknownLocalException: - case replyUnknownUserException: - { - // - // Don't read the exception members directly into the - // exception. Otherwise if reading fails and raises an - // exception, you will have a memory leak. - // - string unknown; - _is.read(unknown, false); - - UnknownException* ex; - switch(replyStatus) - { - case replyUnknownException: - { - ex = new UnknownException(__FILE__, __LINE__); - break; - } - - case replyUnknownLocalException: - { - ex = new UnknownLocalException(__FILE__, __LINE__); - break; - } - - case replyUnknownUserException: - { - ex = new UnknownUserException(__FILE__, __LINE__); - break; - } - - default: - { - ex = 0; // To keep the compiler from complaining. - assert(false); - break; - } - } - - ex->unknown = unknown; - _exception.reset(ex); // adopt - _state = StateLocalException; // The state must be set last, in case there is an exception. - break; - } - - default: - { - _exception.reset(new UnknownReplyStatusException(__FILE__, __LINE__)); - _state = StateLocalException; - break; - } - } - - _monitor.notify(); -} - -void -Outgoing::throwUserException() -{ - try - { - _is.startEncapsulation(); - _is.throwException(); - } - catch(const Ice::UserException&) - { - _is.endEncapsulation(); - throw; - } -} - -ProxyFlushBatch::ProxyFlushBatch(const Ice::ObjectPrxPtr& proxy, const string& operation) : - ProxyOutgoingBase(proxy, ICE_ENUM(OperationMode, Normal)) -{ - checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); - _observer.attach(proxy, operation, ::Ice::noExplicitContext); - - _batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os); -} - -bool -ProxyFlushBatch::invokeRemote(const Ice::ConnectionIPtr& connection, bool compress, bool response) -{ - return connection->sendRequest(this, compress, response, _batchRequestNum); -} - -void -ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler) -{ - handler->invokeRequest(this, _batchRequestNum); -} - -void -ProxyFlushBatch::invoke() -{ - if(_batchRequestNum == 0) - { - sent(); - } - else - { - invokeImpl(); - } -} - -ConnectionFlushBatch::ConnectionFlushBatch(ConnectionI* connection, Instance* instance, const string& operation) : - OutgoingBase(instance), _connection(connection) -{ - _observer.attach(instance, operation); -} - -void -ConnectionFlushBatch::invoke() -{ - int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os); - - try - { - if(batchRequestNum == 0) - { - sent(); - } - else if(!_connection->sendRequest(this, false, false, batchRequestNum)) - { - Monitor<Mutex>::Lock sync(_monitor); - while(!_exception && !_sent) - { - _monitor.wait(); - } - if(_exception) - { - _exception->ice_throw(); - } - } - } - catch(const RetryException& ex) - { - ex.get()->ice_throw(); - } -} - -void -ConnectionFlushBatch::sent() -{ - Monitor<Mutex>::Lock sync(_monitor); - _childObserver.detach(); - - _sent = true; - _monitor.notify(); - - // - // NOTE: At this point the stack allocated ConnectionFlushBatch - // object can be destroyed since the notify() on the monitor will - // release the thread waiting on the synchronous Ice call. - // -} - -void -ConnectionFlushBatch::completed(const Ice::Exception& ex) -{ - Monitor<Mutex>::Lock sync(_monitor); - _childObserver.failed(ex.ice_id()); - _childObserver.detach(); - ICE_SET_EXCEPTION_FROM_CLONE(_exception, ex.ice_clone()); - _monitor.notify(); -} - -void -ConnectionFlushBatch::completed(InputStream&) -{ - assert(false); -} - -void -ConnectionFlushBatch::retryException(const Ice::Exception& ex) -{ - completed(ex); -} |