diff options
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 461 |
1 files changed, 208 insertions, 253 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 3699c3ee29a..23cccd2708a 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -23,134 +23,29 @@ using namespace Ice; using namespace Ice::Instrumentation; using namespace IceInternal; -OutgoingBase::OutgoingBase(Instance* instance, const string& operation) : - _os(instance, Ice::currentProtocolEncoding), _sent(false) +OutgoingBase::OutgoingBase(Instance* instance) : _os(instance, Ice::currentProtocolEncoding), _sent(false) { } -Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, const Context* context) : - OutgoingBase(proxy->__reference()->getInstance().get(), operation), +ProxyOutgoingBase::ProxyOutgoingBase(IceProxy::Ice::Object* proxy, OperationMode mode) : + OutgoingBase(proxy->__reference()->getInstance().get()), _proxy(proxy), _mode(mode), - _state(StateUnsent), - _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())), - _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding) -{ - checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); - - _observer.attach(proxy, operation, context); - + _state(StateUnsent) +{ int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); if(invocationTimeout > 0) { _invocationTimeoutDeadline = Time::now(Time::Monotonic) + Time::milliSeconds(invocationTimeout); } - - switch(_proxy->__reference()->getMode()) - { - case Reference::ModeTwoway: - case Reference::ModeOneway: - case Reference::ModeDatagram: - { - _os.writeBlob(requestHdr, sizeof(requestHdr)); - break; - } - - case Reference::ModeBatchOneway: - case Reference::ModeBatchDatagram: - { - while(true) - { - try - { - _handler = proxy->__getRequestHandler(); - _handler->prepareBatchRequest(&_os); - break; - } - catch(const RetryException&) - { - _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. - } - catch(const Ice::LocalException& ex) - { - _observer.failed(ex.ice_name()); - _proxy->__setRequestHandler(_handler, 0); // Clear request handler - throw; - } - } - break; - } - } - - try - { - _os.write(_proxy->__reference()->getIdentity()); - - // - // For compatibility with the old FacetPath. - // - if(_proxy->__reference()->getFacet().empty()) - { - _os.write(static_cast<string*>(0), static_cast<string*>(0)); - } - else - { - string facet = _proxy->__reference()->getFacet(); - _os.write(&facet, &facet + 1); - } - - _os.write(operation, false); - - _os.write(static_cast<Ice::Byte>(mode)); - - if(context != 0) - { - // - // Explicit context - // - _os.write(*context); - } - else - { - // - // Implicit context - // - const ImplicitContextIPtr& implicitContext = _proxy->__reference()->getInstance()->getImplicitContext(); - const Context& prxContext = _proxy->__reference()->getContext()->getValue(); - if(implicitContext == 0) - { - _os.write(prxContext); - } - else - { - implicitContext->write(prxContext, &_os); - } - } - } - catch(const LocalException& ex) - { - abort(ex); - } } -Outgoing::~Outgoing() -{ -} - -bool -Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response) +ProxyOutgoingBase::~ProxyOutgoingBase() { - return connection->sendRequest(this, compress, response); } void -Outgoing::invokeCollocated(CollocatedRequestHandler* handler) -{ - handler->invokeRequest(this); -} - -void -Outgoing::sent() +ProxyOutgoingBase::sent() { Monitor<Mutex>::Lock sync(_monitor); if(_proxy->__reference()->getMode() != Reference::ModeTwoway) @@ -162,14 +57,14 @@ Outgoing::sent() _monitor.notify(); // - // NOTE: At this point the stack allocated Outgoing object can be destroyed + // NOTE: At this point the stack allocated ProxyOutgoingBase object can be destroyed // since the notify() on the monitor will release the thread waiting on the // synchronous Ice call. // } void -Outgoing::completed(const Ice::Exception& ex) +ProxyOutgoingBase::completed(const Ice::Exception& ex) { Monitor<Mutex>::Lock sync(_monitor); //assert(_state <= StateInProgress); @@ -197,7 +92,13 @@ Outgoing::completed(const Ice::Exception& ex) } void -Outgoing::retryException(const Ice::Exception&) +ProxyOutgoingBase::completed(BasicStream& is) +{ + assert(false); // Must be overriden +} + +void +ProxyOutgoingBase::retryException(const Ice::Exception&) { Monitor<Mutex>::Lock sync(_monitor); assert(_state <= StateInProgress); @@ -206,22 +107,14 @@ Outgoing::retryException(const Ice::Exception&) } bool -Outgoing::invoke() +ProxyOutgoingBase::invokeImpl() { assert(_state == StateUnsent); - - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _state = StateInProgress; - _handler->finishBatchRequest(&_os); - return true; - } const int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); int cnt = 0; while(true) - { + { try { if(invocationTimeout > 0 && _invocationTimeoutDeadline <= Time::now(Time::Monotonic)) @@ -256,7 +149,7 @@ Outgoing::invoke() { } } - + bool timedOut = false; { Monitor<Mutex>::Lock sync(_monitor); @@ -274,7 +167,7 @@ Outgoing::invoke() break; } _monitor.timedWait(_invocationTimeoutDeadline - now); - + if((_state == StateInProgress || !_sent) && _state != StateFailed) { now = Time::now(Time::Monotonic); @@ -290,7 +183,7 @@ Outgoing::invoke() } } } - + if(timedOut) { if(invocationTimeout == -2) @@ -304,8 +197,8 @@ Outgoing::invoke() // // Wait for the exception to propagate. It's possible the request handler ignores - // the timeout if there was a failure shortly before requestCanceled got called. - // In this case, the exception should be set on the Outgoing. + // the timeout if there was a failure shortly before requestCanceled got called. + // In this case, the exception should be set on the ProxyOutgoingBase. // Monitor<Mutex>::Lock sync(_monitor); while(_state == StateInProgress) @@ -313,14 +206,14 @@ Outgoing::invoke() _monitor.wait(); } } - + if(_exception.get()) { _exception->ice_throw(); } else if(_state == StateRetry) { - _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. + _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and retry. continue; } else @@ -331,7 +224,7 @@ Outgoing::invoke() } catch(const RetryException&) { - _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. + _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and retry. } catch(const Ice::Exception& ex) { @@ -348,8 +241,8 @@ Outgoing::invoke() // // Wait until either the retry and invocation timeout deadline is reached. - // Note that we're using a loop here because sleep() precision isn't as - // good as the motonic clock and it can return few hundred micro-seconds + // Note that we're using a loop here because sleep() precision isn't as + // good as the motonic clock and it can return few hundred micro-seconds // earlier which breaks the check for the invocation timeout. // while(retryDeadline > now && _invocationTimeoutDeadline > now) @@ -388,22 +281,129 @@ Outgoing::invoke() return false; } +Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, const Context* context) : + ProxyOutgoingBase(proxy, mode), + _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())), + _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), + _operation(operation) +{ + checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); + _observer.attach(proxy, operation, context); + + switch(_proxy->__reference()->getMode()) + { + case Reference::ModeTwoway: + case Reference::ModeOneway: + case Reference::ModeDatagram: + { + _os.writeBlob(requestHdr, sizeof(requestHdr)); + break; + } + + case Reference::ModeBatchOneway: + case Reference::ModeBatchDatagram: + { + _proxy->__getBatchRequestQueue()->prepareBatchRequest(&_os); + break; + } + } + + try + { + _os.write(_proxy->__reference()->getIdentity()); + + // + // For compatibility with the old FacetPath. + // + if(_proxy->__reference()->getFacet().empty()) + { + _os.write(static_cast<string*>(0), static_cast<string*>(0)); + } + else + { + string facet = _proxy->__reference()->getFacet(); + _os.write(&facet, &facet + 1); + } + + _os.write(operation, false); + + _os.write(static_cast<Ice::Byte>(mode)); + + if(context != 0) + { + // + // Explicit context + // + _os.write(*context); + } + else + { + // + // Implicit context + // + const ImplicitContextIPtr& implicitContext = _proxy->__reference()->getInstance()->getImplicitContext(); + const Context& prxContext = _proxy->__reference()->getContext()->getValue(); + if(implicitContext == 0) + { + _os.write(prxContext); + } + else + { + implicitContext->write(prxContext, &_os); + } + } + } + catch(const LocalException& ex) + { + abort(ex); + } +} + +Outgoing::~Outgoing() +{ +} + +bool +Outgoing::invokeRemote(const Ice::ConnectionIPtr& connection, bool compress, bool response) +{ + return connection->sendRequest(this, compress, response, 0); +} + +void +Outgoing::invokeCollocated(CollocatedRequestHandler* handler) +{ + handler->invokeRequest(this, 0); +} + +bool +Outgoing::invoke() +{ + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) + { + _state = StateInProgress; + _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, _operation); + return true; + } + return invokeImpl(); +} + void Outgoing::abort(const LocalException& ex) { assert(_state == StateUnsent); - + // // If we didn't finish a batch oneway or datagram request, we must // notify the connection about that we give up ownership of the // batch stream. // - if(_proxy->__reference()->getMode() == Reference::ModeBatchOneway || - _proxy->__reference()->getMode() == Reference::ModeBatchDatagram) + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) { - _handler->abortBatchRequest(); + _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os); } - + ex.ice_throw(); } @@ -425,7 +425,7 @@ Outgoing::completed(BasicStream& is) Ice::Byte replyStatus; _is.read(replyStatus); - + switch(replyStatus) { case replyOK: @@ -433,14 +433,14 @@ Outgoing::completed(BasicStream& is) _state = StateOK; // The state must be set last, in case there is an exception. break; } - + case replyUserException: { _observer.userException(); _state = StateUserException; // The state must be set last, in case there is an exception. break; } - + case replyObjectNotExist: case replyFacetNotExist: case replyOperationNotExist: @@ -470,7 +470,7 @@ Outgoing::completed(BasicStream& is) string operation; _is.read(operation, false); - + RequestFailedException* ex; switch(replyStatus) { @@ -479,19 +479,19 @@ Outgoing::completed(BasicStream& is) ex = new ObjectNotExistException(__FILE__, __LINE__); break; } - + case replyFacetNotExist: { ex = new FacetNotExistException(__FILE__, __LINE__); break; } - + case replyOperationNotExist: { ex = new OperationNotExistException(__FILE__, __LINE__); break; } - + default: { ex = 0; // To keep the compiler from complaining. @@ -499,7 +499,7 @@ Outgoing::completed(BasicStream& is) break; } } - + ex->id = ident; ex->facet = facet; ex->operation = operation; @@ -508,7 +508,7 @@ Outgoing::completed(BasicStream& is) _state = StateLocalException; // The state must be set last, in case there is an exception. break; } - + case replyUnknownException: case replyUnknownLocalException: case replyUnknownUserException: @@ -520,7 +520,7 @@ Outgoing::completed(BasicStream& is) // string unknown; _is.read(unknown, false); - + UnknownException* ex; switch(replyStatus) { @@ -529,19 +529,19 @@ Outgoing::completed(BasicStream& is) ex = new UnknownException(__FILE__, __LINE__); break; } - + case replyUnknownLocalException: { ex = new UnknownLocalException(__FILE__, __LINE__); break; } - + case replyUnknownUserException: { ex = new UnknownUserException(__FILE__, __LINE__); break; } - + default: { ex = 0; // To keep the compiler from complaining. @@ -549,14 +549,14 @@ Outgoing::completed(BasicStream& is) break; } } - + ex->unknown = unknown; _exception.reset(ex); _state = StateLocalException; // The state must be set last, in case there is an exception. break; } - + default: { _exception.reset(new UnknownReplyStatusException(__FILE__, __LINE__)); @@ -583,145 +583,94 @@ Outgoing::throwUserException() } } -FlushBatch::FlushBatch(IceProxy::Ice::Object* proxy, const string& operation) : - OutgoingBase(proxy->__reference()->getInstance().get(), operation), _proxy(proxy), _connection(0) +ProxyFlushBatch::ProxyFlushBatch(IceProxy::Ice::Object* proxy, const string& operation) : + ProxyOutgoingBase(proxy, Ice::Normal) { - checkSupportedProtocol(proxy->__reference()->getProtocol()); + checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); + _observer.attach(proxy, operation, 0); - _observer.attach(proxy->__reference()->getInstance().get(), operation); + _batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os); } -FlushBatch::FlushBatch(ConnectionI* connection, Instance* instance, const string& operation) : - OutgoingBase(instance, operation), _proxy(0), _connection(connection) +bool +ProxyFlushBatch::invokeRemote(const Ice::ConnectionIPtr& connection, bool compress, bool response) { - _observer.attach(instance, operation); + return connection->sendRequest(this, compress, response, _batchRequestNum); } void -FlushBatch::invoke() +ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler) { - assert(_proxy || _connection); + handler->invokeRequest(this, _batchRequestNum); +} - if(_connection) +void +ProxyFlushBatch::invoke() +{ + if(_batchRequestNum == 0) { - if(_connection->flushBatchRequests(this)) - { - return; - } - - Monitor<Mutex>::Lock sync(_monitor); - while(!_exception.get() && !_sent) - { - _monitor.wait(); - } - if(_exception.get()) - { - _exception->ice_throw(); - } - return; + sent(); } + else + { + invokeImpl(); + } +} + +ConnectionFlushBatch::ConnectionFlushBatch(ConnectionI* connection, Instance* instance, const string& operation) : + OutgoingBase(instance), _connection(connection) +{ + _observer.attach(instance, operation); +} + +void +ConnectionFlushBatch::invoke() +{ + int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os); - RequestHandlerPtr handler; try { - handler = _proxy->__getRequestHandler(); - if(handler->sendRequest(this)) + if(batchRequestNum == 0) { - return; + sent(); } - - bool timedOut = false; + else if(!_connection->sendRequest(this, false, false, batchRequestNum)) { Monitor<Mutex>::Lock sync(_monitor); - int timeout = _proxy->__reference()->getInvocationTimeout(); - if(timeout > 0) - { - Time now = Time::now(Time::Monotonic); - Time deadline = now + Time::milliSeconds(timeout); - while(!_exception.get() && !_sent && !timedOut) - { - _monitor.timedWait(deadline - now); - if(!_exception.get() && !_sent) - { - now = Time::now(Time::Monotonic); - timedOut = now >= deadline; - } - } - } - else - { - while(!_exception.get() && !_sent) - { - _monitor.wait(); - } - } - } - - if(timedOut) - { - Ice::InvocationTimeoutException ex(__FILE__, __LINE__); - handler->requestCanceled(this, ex); - - // - // Wait for the exception to propagate. It's possible the request handler ignores - // the timeout if there was a failure shortly before requestTimedOut got called. - // In this case, the exception should be set on the Outgoing. - // - Monitor<Mutex>::Lock sync(_monitor); while(!_exception.get() && !_sent) { _monitor.wait(); } - } - - if(_exception.get()) - { - _exception->ice_throw(); + if(_exception.get()) + { + _exception->ice_throw(); + } } } catch(const RetryException& ex) { - _proxy->__setRequestHandler(handler, 0); // Clear request handler - ex.get()->ice_throw(); // Throw to notify the user that batch requests were potentially lost. + ex.get()->ice_throw(); } - catch(const Ice::Exception& ex) - { - _proxy->__setRequestHandler(handler, 0); // Clear request handler - _observer.failed(ex.ice_name()); - throw; // Throw to notify the user that batch requests were potentially lost. - } -} - -bool -FlushBatch::send(const Ice::ConnectionIPtr& connection, bool, bool) -{ - return connection->flushBatchRequests(this); -} - -void -FlushBatch::invokeCollocated(CollocatedRequestHandler* handler) -{ - handler->invokeBatchRequests(this); } void -FlushBatch::sent() +ConnectionFlushBatch::sent() { Monitor<Mutex>::Lock sync(_monitor); _childObserver.detach(); - + _sent = true; _monitor.notify(); // - // NOTE: At this point the stack allocated FlushBatch object - // can be destroyed since the notify() on the monitor will release - // the thread waiting on the synchronous Ice call. + // NOTE: At this point the stack allocated ConnectionFlushBatch + // object can be destroyed since the notify() on the monitor will + // release the thread waiting on the synchronous Ice call. // } void -FlushBatch::completed(const Ice::Exception& ex) +ConnectionFlushBatch::completed(const Ice::Exception& ex) { Monitor<Mutex>::Lock sync(_monitor); _childObserver.failed(ex.ice_name()); @@ -731,7 +680,13 @@ FlushBatch::completed(const Ice::Exception& ex) } void -FlushBatch::retryException(const Ice::Exception& ex) +ConnectionFlushBatch::completed(BasicStream& is) +{ + assert(false); +} + +void +ConnectionFlushBatch::retryException(const Ice::Exception& ex) { completed(ex); } |