diff options
author | ZeroC Staff <git@zeroc.com> | 2007-12-12 21:01:52 -0500 |
---|---|---|
committer | ZeroC Staff <git@zeroc.com> | 2007-12-12 21:01:52 -0500 |
commit | 883a047970693d63716aade9bd94f38c75012c7c (patch) | |
tree | 28f414393cc199bd852d618bcaf4702dd380759b /cpp/src/Ice/OutgoingAsync.cpp | |
parent | Fixed VC build (diff) | |
parent | Fixed bug 2592 (diff) | |
download | ice-883a047970693d63716aade9bd94f38c75012c7c.tar.bz2 ice-883a047970693d63716aade9bd94f38c75012c7c.tar.xz ice-883a047970693d63716aade9bd94f38c75012c7c.zip |
Merge branch 'master' of bernard@cvs.zeroc.com:/home/git/ice
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 645 |
1 files changed, 324 insertions, 321 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 4286cc1949c..1a1fe2beda5 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -23,6 +23,7 @@ #include <Ice/Protocol.h> #include <Ice/ReplyStatus.h> #include <Ice/ImplicitContextI.h> +#include <Ice/ThreadPool.h> using namespace std; using namespace Ice; @@ -35,38 +36,178 @@ IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_invoke* p) { return p; } IceUtil::Shared* IceInternal::upCast(AMI_Array_Object_ice_invoke* p) { return p; } IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_flushBatchRequests* p) { return p; } -IceInternal::OutgoingAsync::OutgoingAsync() : + +namespace +{ + +class CallException : public ThreadPoolWorkItem +{ +public: + + CallException(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::LocalException& ex) : + _outAsync(outAsync), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone())) + { + } + + virtual void + execute(const ThreadPoolPtr& threadPool) + { + threadPool->promoteFollower(); + _outAsync->__exception(*_exception.get()); + } + +private: + + const OutgoingAsyncMessageCallbackPtr _outAsync; + const auto_ptr<Ice::LocalException> _exception; +}; + +}; + +IceInternal::OutgoingAsyncMessageCallback::OutgoingAsyncMessageCallback() : __is(0), __os(0) { } -IceInternal::OutgoingAsync::~OutgoingAsync() +IceInternal::OutgoingAsyncMessageCallback::~OutgoingAsyncMessageCallback() { assert(!__is); assert(!__os); } void +IceInternal::OutgoingAsyncMessageCallback::__exception(const Ice::Exception& exc) +{ + try + { + ice_exception(exc); + } + catch(const std::exception& ex) + { + __warning(ex); + } + catch(...) + { + __warning(); + } + + __release(); +} + +void +IceInternal::OutgoingAsyncMessageCallback::__acquire(const Ice::ObjectPrx& proxy) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor); + + // + // We must first wait for other requests to finish. + // + while(__os) + { + __monitor.wait(); + } + + Instance* instance = proxy->__reference()->getInstance().get(); + assert(!__is); + __is = new BasicStream(instance); + assert(!__os); + __os = new BasicStream(instance); +} + +void +IceInternal::OutgoingAsyncMessageCallback::__release(const Ice::LocalException& exc) +{ + assert(__os); + + // + // This is called by the invoking thread to release the callback following a direct + // failure to marhsall/send the request. We call the ice_exception() callback with + // the thread pool to avoid potential deadlocks in case the invoking thread locked + // some mutexes/resources (which couldn't be re-acquired by the callback). + // + + try + { + __os->instance()->clientThreadPool()->execute(new CallException(this, exc)); + } + catch(const Ice::CommunicatorDestroyedException&) + { + __release(); + throw; // CommunicatorDestroyedException is the only exception that can propagate directly. + } +} + +void +IceInternal::OutgoingAsyncMessageCallback::__releaseNoSync() +{ + assert(__is); + delete __is; + __is = 0; + + assert(__os); + delete __os; + __os = 0; + + __monitor.notify(); +} + +void +IceInternal::OutgoingAsyncMessageCallback::__warning(const std::exception& exc) const +{ + if(__os) // Don't print anything if release() was already called. + { + InstancePtr instance = __os->instance(); + if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + Warning out(instance->initializationData().logger); + const Exception* ex = dynamic_cast<const ObjectNotExistException*>(&exc); + if(ex) + { + out << "Ice::Exception raised by AMI callback:\n" << ex; + } + else + { + out << "std::exception raised by AMI callback:\n" << exc.what(); + } + } + } +} + +void +IceInternal::OutgoingAsyncMessageCallback::__warning() const +{ + if(__os) // Don't print anything if release() was already called. + { + InstancePtr instance = __os->instance(); + if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + Warning out(instance->initializationData().logger); + out << "unknown exception raised by AMI callback"; + } + } +} + +void IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor); _sent = true; if(!_proxy->ice_isTwoway()) { - cleanup(); // No response expected, we're done with the OutgoingAsync. + __releaseNoSync(); // No response expected, we're done with the OutgoingAsync. } else if(_response) { - _monitor.notifyAll(); // If the response was already received notify finished() which is waiting. + __monitor.notifyAll(); // If the response was already received notify finished() which is waiting. } else if(connection->timeout() > 0) { - assert(!_timerTaskConnection); + assert(!_timerTaskConnection && __os); _timerTaskConnection = connection; IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout()); - _proxy->__reference()->getInstance()->timer()->schedule(this, timeout); + __os->instance()->timer()->schedule(this, timeout); } } @@ -78,18 +219,18 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) Ice::Byte replyStatus; try { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor); assert(__os); _response = true; - if(_timerTaskConnection && _proxy->__reference()->getInstance()->timer()->cancel(this)) + if(_timerTaskConnection && __os->instance()->timer()->cancel(this)) { _timerTaskConnection = 0; // Timer cancelled. } while(!_sent || _timerTaskConnection) { - _monitor.wait(); + __monitor.wait(); } __is->swap(is); @@ -222,419 +363,271 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) } catch(const std::exception& ex) { - warning(ex); + __warning(ex); + __release(); } catch(...) { - warning(); + __warning(); + __release(); } - - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - cleanup(); } void -IceInternal::OutgoingAsync::__finished(const LocalException& exc) +IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc) { - bool retry = false; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - - if(__os) // Might be called from __prepare or before __prepare - { - if(_timerTaskConnection && _proxy->__reference()->getInstance()->timer()->cancel(this)) - { - _timerTaskConnection = 0; // Timer cancelled. - } - - while(_timerTaskConnection) - { - _monitor.wait(); - } - - // - // A CloseConnectionException indicates graceful server - // shutdown, and is therefore always repeatable without - // violating "at-most-once". That's because by sending a close - // connection message, the server guarantees that all - // outstanding requests can safely be repeated. Otherwise, we - // can also retry if the operation mode is Nonmutating or - // Idempotent. - // - // An ObjectNotExistException can always be retried as - // well without violating "at-most-once". - // - if(!_sent || - _mode == Nonmutating || _mode == Idempotent || - dynamic_cast<const CloseConnectionException*>(&exc) || - dynamic_cast<const ObjectNotExistException*>(&exc)) - { - retry = true; - } - } - } - - if(retry) { - try + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor); + assert(__os); + + if(_timerTaskConnection && __os->instance()->timer()->cancel(this)) { - _proxy->__handleException(_delegate, exc, _cnt); - __send(); - return; + _timerTaskConnection = 0; // Timer cancelled. } - catch(const LocalException&) + + while(_timerTaskConnection) { + __monitor.wait(); } } + + // + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. + // try { - ice_exception(exc); - } - catch(const std::exception& ex) - { - warning(ex); + handleException(exc); // This will throw if the invocation can't be retried. + __send(); } - catch(...) + catch(const Ice::LocalException& ex) { - warning(); + __exception(ex); } - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - cleanup(); } void IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& ex) { + assert(__os && !_sent); + // - // NOTE: This is called if sendRequest/sendAsyncRequest fails with - // a LocalExceptionWrapper exception. It's not possible for the - // timer to be set at this point because the request couldn't be - // sent. + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. The LocalExceptionWrapper exception is only called + // before the invocation is sent. // - assert(!_sent && !_timerTaskConnection); try { - if(_mode == Nonmutating || _mode == Idempotent) - { - _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, _cnt); - } - else - { - _proxy->__handleExceptionWrapper(_delegate, ex); - } + handleException(ex); // This will throw if the invocation can't be retried. __send(); } - catch(const Ice::LocalException& exc) + catch(const Ice::LocalException& ex) { - try - { - ice_exception(exc); - } - catch(const std::exception& ex) - { - warning(ex); - } - catch(...) - { - warning(); - } - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - cleanup(); + __exception(ex); } } void -IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operation, OperationMode mode, +IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operation, OperationMode mode, const Context* context) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _proxy = prx; + _delegate = 0; + _cnt = 0; + _mode = mode; - try + // + // Can't call async via a batch proxy. + // + if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram()) { - // - // We must first wait for other requests to finish. - // - while(__os) - { - _monitor.wait(); - } - - // - // Can't call async via a batch proxy. - // - _proxy = prx; - if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram()) - { - throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI"); - } - _delegate = 0; - _cnt = 0; - _mode = mode; - _sent = false; - _response = false; - - ReferencePtr ref = _proxy->__reference(); - assert(!__is); - __is = new BasicStream(ref->getInstance().get()); - assert(!__os); - __os = new BasicStream(ref->getInstance().get()); - - __os->writeBlob(requestHdr, sizeof(requestHdr)); - - ref->getIdentity().__write(__os); - - // - // For compatibility with the old FacetPath. - // - if(ref->getFacet().empty()) - { - __os->write(static_cast<string*>(0), static_cast<string*>(0)); - } - else - { - string facet = ref->getFacet(); - __os->write(&facet, &facet + 1); - } + throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI"); + } - __os->write(operation, false); + __os->writeBlob(requestHdr, sizeof(requestHdr)); - __os->write(static_cast<Byte>(_mode)); + Reference* ref = _proxy->__reference().get(); - if(context != 0) - { - // - // Explicit context - // - __writeContext(__os, *context); - } - else - { - // - // Implicit context - // - const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext(); - const Context& prxContext = ref->getContext()->getValue(); - if(implicitContext == 0) - { - __writeContext(__os, prxContext); - } - else - { - implicitContext->write(prxContext, __os); - } - } - - __os->startWriteEncaps(); - } - catch(const LocalException& ex) - { - cleanup(); - ex.ice_throw(); - } -} + ref->getIdentity().__write(__os); -void -IceInternal::OutgoingAsync::__send() -{ // - // NOTE: no synchronization needed. At this point, no other threads can be calling on this object. + // For compatibility with the old FacetPath. // - RequestHandler* handler; - try + if(ref->getFacet().empty()) { - _delegate = _proxy->__getDelegate(true); - handler = _delegate->__getRequestHandler().get(); + __os->write(static_cast<string*>(0), static_cast<string*>(0)); } - catch(const Ice::LocalException& ex) + else { - __finished(ex); - return; + string facet = ref->getFacet(); + __os->write(&facet, &facet + 1); } - _sent = false; - _response = false; - handler->sendAsyncRequest(this); -} - -void -IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTimerTask() -{ - Ice::ConnectionIPtr connection; - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(_timerTaskConnection && _sent); // Can only be set once the request is sent. + __os->write(operation, false); - if(!_response) // If the response was just received, don't close the connection. - { - connection = _timerTaskConnection; - } - _timerTaskConnection = 0; - _monitor.notifyAll(); - } + __os->write(static_cast<Byte>(_mode)); - if(connection) + if(context != 0) { - connection->exception(Ice::TimeoutException(__FILE__, __LINE__)); + // + // Explicit context + // + __writeContext(__os, *context); } -} - -void -IceInternal::OutgoingAsync::warning(const std::exception& exc) const -{ - if(__os) // Don't print anything if cleanup() was already called. + else { - ReferencePtr ref = _proxy->__reference(); - if(ref->getInstance()->initializationData().properties-> - getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + // + // Implicit context + // + const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext(); + const Context& prxContext = ref->getContext()->getValue(); + if(implicitContext == 0) { - Warning out(ref->getInstance()->initializationData().logger); - const Exception* ex = dynamic_cast<const ObjectNotExistException*>(&exc); - if(ex) - { - out << "Ice::Exception raised by AMI callback:\n" << ex; - } - else - { - out << "std::exception raised by AMI callback:\n" << exc.what(); - } + __writeContext(__os, prxContext); + } + else + { + implicitContext->write(prxContext, __os); } } + + __os->startWriteEncaps(); } void -IceInternal::OutgoingAsync::warning() const +IceInternal::OutgoingAsync::__send() { - if(__os) // Don't print anything if cleanup() was already called. + while(true) { - ReferencePtr ref = _proxy->__reference(); - if(ref->getInstance()->initializationData().properties-> - getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + try { - Warning out(ref->getInstance()->initializationData().logger); - out << "unknown exception raised by AMI callback"; + _sent = false; + _response = false; + _delegate = _proxy->__getDelegate(true); + _delegate->__getRequestHandler()->sendAsyncRequest(this); + return; + } + catch(const LocalExceptionWrapper& ex) + { + handleException(ex); + } + catch(const Ice::LocalException& ex) + { + handleException(ex); } } } void -IceInternal::OutgoingAsync::cleanup() -{ - assert(!_timerTaskConnection); - - delete __is; - __is = 0; - delete __os; - __os = 0; - - _monitor.notify(); -} - -IceInternal::BatchOutgoingAsync::BatchOutgoingAsync() : _os(0) -{ -} - -void -IceInternal::BatchOutgoingAsync::__prepare(const InstancePtr& instance) +IceInternal::OutgoingAsync::handleException(const LocalExceptionWrapper& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(_os) + if(_mode == Nonmutating || _mode == Idempotent) { - _monitor.wait(); + _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, _cnt); + } + else + { + _proxy->__handleExceptionWrapper(_delegate, ex); } - _os = new BasicStream(instance.get()); -} - -void -IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - cleanup(); } void -IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc) +IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc) { try { - ice_exception(exc); + // + // A CloseConnectionException indicates graceful server shutdown, and is therefore + // always repeatable without violating "at-most-once". That's because by sending a + // close connection message, the server guarantees that all outstanding requests + // can safely be repeated. + // + // An ObjectNotExistException can always be retried as well without violating + // "at-most-once". + // + if(!_sent || + dynamic_cast<const CloseConnectionException*>(&exc) || + dynamic_cast<const ObjectNotExistException*>(&exc)) + { + exc.ice_throw(); + } + + // + // Throw the exception wrapped in a LocalExceptionWrapper, to indicate that the + // request cannot be resent without potentially violating the "at-most-once" + // principle. + // + throw LocalExceptionWrapper(exc, false); } - catch(const std::exception& ex) + catch(const LocalExceptionWrapper& ex) { - warning(ex); + if(_mode == Nonmutating || _mode == Idempotent) + { + _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, _cnt); + } + else + { + _proxy->__handleExceptionWrapper(_delegate, ex); + } } - catch(...) + catch(const Ice::LocalException& ex) { - warning(); + _proxy->__handleException(_delegate, ex, _cnt); } - - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - cleanup(); } void -IceInternal::BatchOutgoingAsync::warning(const std::exception& exc) const +IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTimerTask() { - if(_os) // Don't print anything if cleanup() was already called. + Ice::ConnectionIPtr connection; { - if(_os->instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor); + assert(_timerTaskConnection && _sent); // Can only be set once the request is sent. + + if(!_response) // If the response was just received, don't close the connection. { - Warning out(_os->instance()->initializationData().logger); - const Exception* ex = dynamic_cast<const ObjectNotExistException*>(&exc); - if(ex) - { - out << "Ice::Exception raised by AMI callback:\n" << ex; - } - else - { - out << "std::exception raised by AMI callback:\n" << exc.what(); - } + connection = _timerTaskConnection; } + _timerTaskConnection = 0; + __monitor.notifyAll(); } -} -void -IceInternal::BatchOutgoingAsync::warning() const -{ - if(_os) // Don't print anything if cleanup() was already called. + if(connection) { - if(_os->instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) - { - Warning out(_os->instance()->initializationData().logger); - out << "unknown exception raised by AMI callback"; - } + connection->exception(Ice::TimeoutException(__FILE__, __LINE__)); } } void -IceInternal::BatchOutgoingAsync::cleanup() +IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection) { - delete _os; - _os = 0; + __release(); +} - _monitor.notify(); +void +IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc) +{ + __exception(exc); } void Ice::AMI_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode, const vector<Byte>& inParams, const Context* context) { + __acquire(prx); try { __prepare(prx, operation, mode, context); __os->writeBlob(inParams); __os->endWriteEncaps(); + __send(); } - catch(const LocalException& ex) + catch(const Ice::LocalException& ex) { - __finished(ex); - return; + __release(ex); } - __send(); } void @@ -652,24 +645,25 @@ Ice::AMI_Object_ice_invoke::__response(bool ok) // ok == true means no user exce return; } ice_response(ok, outParams); + __release(); } void Ice::AMI_Array_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode, const pair<const Byte*, const Byte*>& inParams, const Context* context) { + __acquire(prx); try { __prepare(prx, operation, mode, context); __os->writeBlob(inParams.first, static_cast<Int>(inParams.second - inParams.first)); __os->endWriteEncaps(); + __send(); } - catch(const LocalException& ex) + catch(const Ice::LocalException& ex) { - __finished(ex); - return; + __release(ex); } - __send(); } void @@ -688,25 +682,34 @@ Ice::AMI_Array_Object_ice_invoke::__response(bool ok) // ok == true means no use return; } ice_response(ok, outParams); + __release(); } void Ice::AMI_Object_ice_flushBatchRequests::__invoke(const ObjectPrx& prx) { - Handle< ::IceDelegate::Ice::Object> delegate; - RequestHandler* handler; + __acquire(prx); try { - __prepare(prx->__reference()->getInstance()); - delegate = prx->__getDelegate(true); - handler = delegate->__getRequestHandler().get(); + // + // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch + // requests were queued with the connection, they would be lost without being noticed. + // + Handle< ::IceDelegate::Ice::Object> delegate; + int cnt = -1; // Don't retry. + try + { + delegate = prx->__getDelegate(true); + delegate->__getRequestHandler()->flushAsyncBatchRequests(this); + } + catch(const Ice::LocalException& ex) + { + prx->__handleException(delegate, ex, cnt); + } } catch(const Ice::LocalException& ex) { - __finished(ex); - return; + __release(ex); } - - handler->flushAsyncBatchRequests(this); } |