Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 344 |
1 file changed, 281 insertions, 63 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 40cb5336c23..4286cc1949c 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -10,6 +10,7 @@
 #include <Ice/OutgoingAsync.h>
 #include <Ice/Object.h>
 #include <Ice/ConnectionI.h>
+#include <Ice/RequestHandler.h>
 #include <Ice/Reference.h>
 #include <Ice/Instance.h>
 #include <Ice/LocalException.h>
@@ -27,9 +28,12 @@ using namespace std;
 using namespace Ice;
 using namespace IceInternal;
 
+IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; }
 IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; }
 IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_invoke* p) { return p; }
 IceUtil::Shared* IceInternal::upCast(AMI_Array_Object_ice_invoke* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_flushBatchRequests* p) { return p; }
 
 IceInternal::OutgoingAsync::OutgoingAsync() :
     __is(0),
@@ -44,17 +48,52 @@ IceInternal::OutgoingAsync::~OutgoingAsync()
 }
 
 void
+IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
+{
+    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+    _sent = true;
+
+    if(!_proxy->ice_isTwoway())
+    {
+        cleanup(); // No response expected, we're done with the OutgoingAsync.
+    }
+    else if(_response)
+    {
+        _monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
+    }
+    else if(connection->timeout() > 0)
+    {
+        assert(!_timerTaskConnection);
+        _timerTaskConnection = connection;
+        IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
+        _proxy->__reference()->getInstance()->timer()->schedule(this, timeout);
+    }
+}
+
+void
 IceInternal::OutgoingAsync::__finished(BasicStream& is)
 {
-    IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
+    assert(_proxy->ice_isTwoway()); // Can only be called for twoways.
 
     Ice::Byte replyStatus;
-
     try
     {
+        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+        assert(__os);
+        _response = true;
+
+        if(_timerTaskConnection && _proxy->__reference()->getInstance()->timer()->cancel(this))
+        {
+            _timerTaskConnection = 0; // Timer cancelled.
+        }
+
+        while(!_sent || _timerTaskConnection)
+        {
+            _monitor.wait();
+        }
+
         __is->swap(is);
         __is->read(replyStatus);
-
         switch(replyStatus)
         {
@@ -190,43 +229,65 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
         warning();
     }
+
+    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
     cleanup();
 }
 
 void
 IceInternal::OutgoingAsync::__finished(const LocalException& exc)
 {
-    IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
-
-    if(__os) // Don't retry if cleanup() was already called.
+    bool retry = false;
     {
-        //
-        // A CloseConnectionException indicates graceful server
-        // shutdown, and is therefore always repeatable without
-        // violating "at-most-once". That's because by sending a close
-        // connection message, the server guarantees that all
-        // outstanding requests can safely be repeated. Otherwise, we
-        // can also retry if the operation mode is Nonmutating or
-        // Idempotent.
-        //
-        // An ObjectNotExistException can always be retried as
-        // well without violating "at-most-once".
-        //
-        if(_mode == Nonmutating || _mode == Idempotent || dynamic_cast<const CloseConnectionException*>(&exc) ||
-           dynamic_cast<const ObjectNotExistException*>(&exc))
+        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+
+        if(__os) // Might be called from __prepare or before __prepare
        {
-            try
+            if(_timerTaskConnection && _proxy->__reference()->getInstance()->timer()->cancel(this))
             {
-                _proxy->__handleException(_delegate, exc, _cnt);
-                __send();
-                return;
+                _timerTaskConnection = 0; // Timer cancelled.
             }
-            catch(const LocalException&)
+
+            while(_timerTaskConnection)
+            {
+                _monitor.wait();
+            }
+
+            //
+            // A CloseConnectionException indicates graceful server
+            // shutdown, and is therefore always repeatable without
+            // violating "at-most-once". That's because by sending a close
+            // connection message, the server guarantees that all
+            // outstanding requests can safely be repeated. Otherwise, we
+            // can also retry if the operation mode is Nonmutating or
+            // Idempotent.
+            //
+            // An ObjectNotExistException can always be retried as
+            // well without violating "at-most-once".
+            //
+            if(!_sent ||
+               _mode == Nonmutating || _mode == Idempotent ||
+               dynamic_cast<const CloseConnectionException*>(&exc) ||
+               dynamic_cast<const ObjectNotExistException*>(&exc))
             {
+                retry = true;
             }
         }
     }
 
+    if(retry)
+    {
+        try
+        {
+            _proxy->__handleException(_delegate, exc, _cnt);
+            __send();
+            return;
+        }
+        catch(const LocalException&)
+        {
+        }
+    }
+
     try
     {
         ice_exception(exc);
@@ -240,14 +301,58 @@ IceInternal::OutgoingAsync::__finished(const LocalException& exc)
         warning();
     }
 
+    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
     cleanup();
 }
 
 void
+IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& ex)
+{
+    //
+    // NOTE: This is called if sendRequest/sendAsyncRequest fails with
+    // a LocalExceptionWrapper exception. It's not possible for the
+    // timer to be set at this point because the request couldn't be
+    // sent.
+    //
+    assert(!_sent && !_timerTaskConnection);
+
+    try
+    {
+        if(_mode == Nonmutating || _mode == Idempotent)
+        {
+            _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
+        }
+        else
+        {
+            _proxy->__handleExceptionWrapper(_delegate, ex);
+        }
+        __send();
+    }
+    catch(const Ice::LocalException& exc)
+    {
+        try
+        {
+            ice_exception(exc);
+        }
+        catch(const std::exception& ex)
+        {
+            warning(ex);
+        }
+        catch(...)
+        {
+            warning();
+        }
+
+        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+        cleanup();
+    }
+}
+
+void
 IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operation, OperationMode mode,
                                       const Context* context)
 {
-    IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
+    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
 
     try
     {
@@ -258,16 +363,20 @@ IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operat
         {
             _monitor.wait();
         }
-
+
         //
-        // Can't call async via a oneway proxy.
+        // Can't call async via a batch proxy.
         //
-        prx->__checkTwowayOnly(operation);
-        _proxy = prx;
+        if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
+        {
+            throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
+        }
         _delegate = 0;
         _cnt = 0;
         _mode = mode;
+        _sent = false;
+        _response = false;
 
         ReferencePtr ref = _proxy->__reference();
         assert(!__is);
@@ -308,11 +417,8 @@ IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operat
         //
         // Implicit context
        //
-        const ImplicitContextIPtr& implicitContext =
-            ref->getInstance()->getImplicitContext();
-
+        const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext();
         const Context& prxContext = ref->getContext()->getValue();
-
         if(implicitContext == 0)
         {
             __writeContext(__os, prxContext);
         }
@@ -335,41 +441,45 @@
 void
 IceInternal::OutgoingAsync::__send()
 {
-    IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
-
+    //
+    // NOTE: no synchronization needed. At this point, no other threads can be calling on this object.
+    //
+    RequestHandler* handler;
     try
     {
-        while(true)
+        _delegate = _proxy->__getDelegate(true);
+        handler = _delegate->__getRequestHandler().get();
+    }
+    catch(const Ice::LocalException& ex)
+    {
+        __finished(ex);
+        return;
+    }
+
+    _sent = false;
+    _response = false;
+    handler->sendAsyncRequest(this);
+}
+
+void
+IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTimerTask()
+{
+    Ice::ConnectionIPtr connection;
+    {
+        IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+        assert(_timerTaskConnection && _sent); // Can only be set once the request is sent.
+
+        if(!_response) // If the response was just received, don't close the connection.
         {
-            bool compress;
-            _delegate = _proxy->__getDelegate();
-            Ice::ConnectionIPtr connection = _delegate->__getConnection(compress);
-            try
-            {
-                connection->sendAsyncRequest(__os, this, compress);
-
-                //
-                // Don't do anything after sendAsyncRequest() returned
-                // without an exception. I such case, there will be
-                // callbacks, i.e., calls to the __finished()
-                // functions. Since there is no mutex protection, we
-                // cannot modify state here and in such callbacks.
-                //
-                return;
-            }
-            catch(const LocalExceptionWrapper& ex)
-            {
-                _proxy->__handleExceptionWrapper(_delegate, ex);
-            }
-            catch(const LocalException& ex)
-            {
-                _proxy->__handleException(_delegate, ex, _cnt);
-            }
+            connection = _timerTaskConnection;
         }
+        _timerTaskConnection = 0;
+        _monitor.notifyAll();
     }
-    catch(const LocalException& ex)
+
+    if(connection)
     {
-        __finished(ex);
+        connection->exception(Ice::TimeoutException(__FILE__, __LINE__));
     }
 }
@@ -414,6 +524,8 @@ IceInternal::OutgoingAsync::warning() const
 void
 IceInternal::OutgoingAsync::cleanup()
 {
+    assert(!_timerTaskConnection);
+
     delete __is;
     __is = 0;
     delete __os;
@@ -422,6 +534,91 @@ IceInternal::OutgoingAsync::cleanup()
     _monitor.notify();
 }
 
+IceInternal::BatchOutgoingAsync::BatchOutgoingAsync() : _os(0)
+{
+}
+
+void
+IceInternal::BatchOutgoingAsync::__prepare(const InstancePtr& instance)
+{
+    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+    while(_os)
+    {
+        _monitor.wait();
+    }
+    _os = new BasicStream(instance.get());
+}
+
+void
+IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection)
+{
+    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+    cleanup();
+}
+
+void
+IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc)
+{
+    try
+    {
+        ice_exception(exc);
+    }
+    catch(const std::exception& ex)
+    {
+        warning(ex);
+    }
+    catch(...)
+    {
+        warning();
+    }
+
+    IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+    cleanup();
+}
+
+void
+IceInternal::BatchOutgoingAsync::warning(const std::exception& exc) const
+{
+    if(_os) // Don't print anything if cleanup() was already called.
+    {
+        if(_os->instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+        {
+            Warning out(_os->instance()->initializationData().logger);
+            const Exception* ex = dynamic_cast<const ObjectNotExistException*>(&exc);
+            if(ex)
+            {
+                out << "Ice::Exception raised by AMI callback:\n" << ex;
+            }
+            else
+            {
+                out << "std::exception raised by AMI callback:\n" << exc.what();
+            }
+        }
+    }
+}
+
+void
+IceInternal::BatchOutgoingAsync::warning() const
+{
+    if(_os) // Don't print anything if cleanup() was already called.
+    {
+        if(_os->instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+        {
+            Warning out(_os->instance()->initializationData().logger);
+            out << "unknown exception raised by AMI callback";
+        }
+    }
+}
+
+void
+IceInternal::BatchOutgoingAsync::cleanup()
+{
+    delete _os;
+    _os = 0;
+
+    _monitor.notify();
+}
+
 void
 Ice::AMI_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode,
                                      const vector<Byte>& inParams, const Context* context)
@@ -492,3 +689,24 @@ Ice::AMI_Array_Object_ice_invoke::__response(bool ok) // ok == true means no use
     }
     ice_response(ok, outParams);
 }
+
+void
+Ice::AMI_Object_ice_flushBatchRequests::__invoke(const ObjectPrx& prx)
+{
+    Handle< ::IceDelegate::Ice::Object> delegate;
+    RequestHandler* handler;
+    try
+    {
+        __prepare(prx->__reference()->getInstance());
+        delegate = prx->__getDelegate(true);
+        handler = delegate->__getRequestHandler().get();
+    }
+    catch(const Ice::LocalException& ex)
+    {
+        __finished(ex);
+        return;
+    }
+
+    handler->flushAsyncBatchRequests(this);
+}
+
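Editor's note on the concurrency pattern in this change: __sent(), __finished() and runTimerTask() coordinate through one monitor so that the connection timeout is only armed after the request bytes are actually written, is cancelled as soon as the reply arrives, and the reply dispatch waits for the send notification instead of racing with it. The following is a minimal, self-contained sketch of that handshake using standard C++ primitives instead of the IceUtil ones; the names Request, onSent(), onReply() and onTimeout() are illustrative only and do not come from the Ice sources.

// Sketch of the sent/response/timeout handshake, assuming a separate timer
// facility invokes onTimeout(). Not Ice code; standard C++11 only.
#include <condition_variable>
#include <functional>
#include <iostream>
#include <mutex>
#include <thread>

class Request
{
public:
    // Called once the request bytes are on the wire.
    void onSent(bool expectReply, std::function<void()> scheduleTimeout)
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _sent = true;
        if(!expectReply)
        {
            return;                 // Oneway: nothing more to wait for.
        }
        if(_response)
        {
            _cond.notify_all();     // Reply raced ahead of the send notification.
        }
        else
        {
            _timerArmed = true;
            scheduleTimeout();      // Arm the per-connection timeout.
        }
    }

    // Called when the reply arrives.
    void onReply()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        _response = true;
        _timerArmed = false;        // "Cancel" the timer; a late onTimeout() sees _response == true.
        // Wait until the sender thread has recorded the send, mirroring the
        // while(!_sent || _timerTaskConnection) loop in the patch.
        _cond.wait(lock, [this]{ return _sent; });
        std::cout << "reply dispatched to callback\n";
    }

    // Called by the timer thread if the timeout expires first.
    void onTimeout()
    {
        std::unique_lock<std::mutex> lock(_mutex);
        if(_timerArmed && !_response)
        {
            std::cout << "timeout: would close the connection here\n";
        }
        _timerArmed = false;
        _cond.notify_all();
    }

private:
    std::mutex _mutex;
    std::condition_variable _cond;
    bool _sent = false;
    bool _response = false;
    bool _timerArmed = false;
};

int main()
{
    Request r;
    std::thread sender([&]{ r.onSent(true, []{ /* schedule onTimeout() elsewhere */ }); });
    std::thread receiver([&]{ r.onReply(); });
    sender.join();
    receiver.join();
}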
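For the new Ice::AMI_Object_ice_flushBatchRequests path: based only on what this diff shows, __invoke() resolves the proxy's delegate and request handler and then queues the flush through RequestHandler::flushAsyncBatchRequests(), with failures reported via ice_exception(). A hypothetical caller-side sketch follows; the subclass name, the ice_exception() signature and the way the callback is held are assumptions rather than the documented Ice API, and the public proxy-level entry point for asynchronous flushing is not part of this file.

// Hypothetical usage sketch; names and signatures outside this diff are assumed.
#include <Ice/Ice.h>
#include <iostream>

class FlushCallback : public Ice::AMI_Object_ice_flushBatchRequests
{
public:
    // BatchOutgoingAsync::__finished() reports failures through ice_exception().
    virtual void ice_exception(const Ice::Exception& ex)
    {
        std::cerr << "asynchronous batch flush failed: " << ex << std::endl;
    }
};

void
flushQueuedBatchRequests(const Ice::ObjectPrx& batchProxy)
{
    // __invoke() resolves the delegate and request handler, then hands this
    // callback to RequestHandler::flushAsyncBatchRequests(); errors surface
    // via ice_exception() above.
    IceUtil::Handle<FlushCallback> cb = new FlushCallback;
    cb->__invoke(batchProxy);
}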