diff options
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r-- | cpp/src/Ice/Outgoing.cpp | 211 |
1 files changed, 143 insertions, 68 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 8eedeffe6a1..14fb41c7afd 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -9,6 +9,7 @@ #include <Ice/Outgoing.h> #include <Ice/Object.h> +#include <Ice/RequestHandler.h> #include <Ice/ConnectionI.h> #include <Ice/Reference.h> #include <Ice/Endpoint.h> @@ -78,16 +79,15 @@ IceInternal::LocalExceptionWrapper::retry() const return _retry; } -IceInternal::Outgoing::Outgoing(ConnectionI* connection, Reference* ref, const string& operation, - OperationMode mode, const Context* context, bool compress) : - _connection(connection), - _reference(ref), +IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation, OperationMode mode, + const Context* context) : + _handler(handler), _state(StateUnsent), - _is(ref->getInstance().get()), - _os(ref->getInstance().get()), - _compress(compress) + _is(handler->getReference()->getInstance().get()), + _os(handler->getReference()->getInstance().get()), + _sent(false) { - switch(_reference->getMode()) + switch(_handler->getReference()->getMode()) { case Reference::ModeTwoway: case Reference::ModeOneway: @@ -100,25 +100,25 @@ IceInternal::Outgoing::Outgoing(ConnectionI* connection, Reference* ref, const s case Reference::ModeBatchOneway: case Reference::ModeBatchDatagram: { - _connection->prepareBatchRequest(&_os); + _handler->prepareBatchRequest(&_os); break; } } try { - _reference->getIdentity().__write(&_os); + _handler->getReference()->getIdentity().__write(&_os); // // For compatibility with the old FacetPath. // - if(_reference->getFacet().empty()) + if(_handler->getReference()->getFacet().empty()) { _os.write(static_cast<string*>(0), static_cast<string*>(0)); } else { - string facet = _reference->getFacet(); + string facet = _handler->getReference()->getFacet(); _os.write(&facet, &facet + 1); } @@ -138,11 +138,8 @@ IceInternal::Outgoing::Outgoing(ConnectionI* connection, Reference* ref, const s // // Implicit context // - const ImplicitContextIPtr& implicitContext = - _reference->getInstance()->getImplicitContext(); - - const Context& prxContext = _reference->getContext()->getValue(); - + const ImplicitContextIPtr& implicitContext = _handler->getReference()->getInstance()->getImplicitContext(); + const Context& prxContext = _handler->getReference()->getContext()->getValue(); if(implicitContext == 0) { __writeContext(&_os, prxContext); @@ -173,42 +170,33 @@ IceInternal::Outgoing::invoke() _os.endWriteEncaps(); - switch(_reference->getMode()) + switch(_handler->getReference()->getMode()) { case Reference::ModeTwoway: { - // - // We let all exceptions raised by sending directly - // propagate to the caller, because they can be retried - // without violating "at-most-once". In case of such - // exceptions, the connection object does not call back on - // this object, so we don't need to lock the mutex, keep - // track of state, or save exceptions. - // - _connection->sendRequest(&_os, this, _compress); - - // - // Wait until the request has completed, or until the - // request times out. - // + _state = StateInProgress; - bool timedOut = false; + Ice::ConnectionI* connection = _handler->sendRequest(this); + bool timedOut = false; + { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); // - // It's possible that the request has already - // completed, due to a regular response, or because of - // an exception. So we only change the state to "in - // progress" if it is still "unsent". + // If the request is being sent in the background we first wait for the + // sent notification. // - if(_state == StateUnsent) + while(_state != StateFailed && !_sent) { - _state = StateInProgress; + _monitor.wait(); } - - Int timeout = _connection->timeout(); + + // + // Wait until the request has completed, or until the request times out. + // + + Int timeout = connection->timeout(); while(_state == StateInProgress && !timedOut) { if(timeout >= 0) @@ -233,7 +221,7 @@ IceInternal::Outgoing::invoke() // Must be called outside the synchronization of this // object. // - _connection->exception(TimeoutException(__FILE__, __LINE__)); + connection->exception(TimeoutException(__FILE__, __LINE__)); // // We must wait until the exception set above has @@ -241,7 +229,6 @@ IceInternal::Outgoing::invoke() // { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(_state == StateInProgress) { _monitor.wait(); @@ -262,7 +249,8 @@ IceInternal::Outgoing::invoke() // An ObjectNotExistException can always be retried as // well without violating "at-most-once". // - if(dynamic_cast<CloseConnectionException*>(_exception.get()) || + if(!_sent || + dynamic_cast<CloseConnectionException*>(_exception.get()) || dynamic_cast<ObjectNotExistException*>(_exception.get())) { _exception->ice_throw(); @@ -281,25 +269,35 @@ IceInternal::Outgoing::invoke() { return false; } - - assert(_state == StateOK); - break; + else + { + assert(_state == StateOK); + return true; + } } - + case Reference::ModeOneway: case Reference::ModeDatagram: { - // - // For oneway and datagram requests, the connection object - // never calls back on this object. Therefore we don't - // need to lock the mutex or save exceptions. We simply - // let all exceptions from sending propagate to the - // caller, because such exceptions can be retried without - // violating "at-most-once". - // _state = StateInProgress; - _connection->sendRequest(&_os, 0, _compress); - break; + if(_handler->sendRequest(this)) + { + // + // If the handler returns the connection, we must wait for the sent callback. + // + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(_state != StateFailed && !_sent) + { + _monitor.wait(); + } + + if(_exception.get()) + { + assert(!_sent); + _exception->ice_throw(); + } + } + return true; } case Reference::ModeBatchOneway: @@ -311,12 +309,13 @@ IceInternal::Outgoing::invoke() // apply. // _state = StateInProgress; - _connection->finishBatchRequest(&_os, _compress); - break; + _handler->finishBatchRequest(&_os); + return true; } } - return true; + assert(false); + return false; } void @@ -329,9 +328,10 @@ IceInternal::Outgoing::abort(const LocalException& ex) // notify the connection about that we give up ownership of the // batch stream. // - if(_reference->getMode() == Reference::ModeBatchOneway || _reference->getMode() == Reference::ModeBatchDatagram) + if(_handler->getReference()->getMode() == Reference::ModeBatchOneway || + _handler->getReference()->getMode() == Reference::ModeBatchDatagram) { - _connection->abortBatchRequest(); + _handler->abortBatchRequest(); // // If we abort a batch requests, we cannot retry, because not @@ -345,11 +345,30 @@ IceInternal::Outgoing::abort(const LocalException& ex) } void +IceInternal::Outgoing::sent(bool notify) +{ + if(notify) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _sent = true; + _monitor.notify(); + } + else + { + // + // No synchronization is necessary if called from sendRequest() because the connection + // send mutex is locked and no other threads can call on Outgoing until it's released. + // + _sent = true; + } +} + +void IceInternal::Outgoing::finished(BasicStream& is) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(_reference->getMode() == Reference::ModeTwoway); // Can only be called for twoways. + assert(_handler->getReference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways. assert(_state <= StateInProgress); @@ -514,12 +533,68 @@ void IceInternal::Outgoing::finished(const LocalException& ex) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - - assert(_reference->getMode() == Reference::ModeTwoway); // Can only be called for twoways. - assert(_state <= StateInProgress); - _state = StateLocalException; + _state = StateFailed; + _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); + _monitor.notify(); +} + +IceInternal::BatchOutgoing::BatchOutgoing(RequestHandler* handler) : + _handler(handler), + _connection(0), + _sent(false), + _os(handler->getReference()->getInstance().get()) +{ +} + +IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance) : + _handler(0), + _connection(connection), + _sent(false), + _os(instance) +{ +} + +void +IceInternal::BatchOutgoing::invoke() +{ + assert(_handler || _connection); + if(_handler && !_handler->flushBatchRequests(this) || _connection && !_connection->flushBatchRequests(this)) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(!_exception.get() && !_sent) + { + _monitor.wait(); + } + + if(_exception.get()) + { + assert(!_sent); + _exception->ice_throw(); + } + } +} + +void +IceInternal::BatchOutgoing::sent(bool notify) +{ + if(notify) + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _sent = true; + _monitor.notify(); + } + else + { + _sent = true; + } +} + +void +IceInternal::BatchOutgoing::finished(const Ice::LocalException& ex) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone())); _monitor.notify(); } |