diff options
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 914 |
1 files changed, 521 insertions, 393 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 2c6ad47d336..7fc5b8a080a 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -30,23 +30,26 @@ using namespace std; using namespace Ice; using namespace IceInternal; +IceUtil::Shared* IceInternal::upCast(AsyncResult* p) { return p; } + IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; } IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_invoke* p) { return p; } -IceUtil::Shared* IceInternal::upCast(AMI_Array_Object_ice_invoke* p) { return p; } -IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_flushBatchRequests* p) { return p; } +const unsigned char Ice::AsyncResult::OK = 0x1; +const unsigned char Ice::AsyncResult::Done = 0x2; +const unsigned char Ice::AsyncResult::Sent = 0x4; +const unsigned char Ice::AsyncResult::EndCalled = 0x8; namespace { -class CallException : public ThreadPoolWorkItem +class AsynchronousException : public ThreadPoolWorkItem { public: - CallException(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::LocalException& ex) : - _outAsync(outAsync), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone())) + AsynchronousException(const Ice::AsyncResultPtr& result, const Ice::Exception& ex) : + _result(result), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone())) { } @@ -54,142 +57,289 @@ public: execute(ThreadPoolCurrent& current) { current.ioCompleted(); - _outAsync->__exception(*_exception.get()); + _result->__exception(*_exception.get()); } private: - const OutgoingAsyncMessageCallbackPtr _outAsync; - const auto_ptr<Ice::LocalException> _exception; + const Ice::AsyncResultPtr _result; + const auto_ptr<Ice::Exception> _exception; }; }; -IceInternal::OutgoingAsyncMessageCallback::OutgoingAsyncMessageCallback() : - __is(0), - __os(0) +Ice::AsyncResult::AsyncResult(const IceInternal::InstancePtr& instance, + const string& op, + const CallbackBasePtr& del, + const LocalObjectPtr& cookie) : + _instance(instance), + _operation(op), + _callback(del), + _cookie(cookie), + _is(instance.get()), + _os(instance.get()), + _state(0), + _exception(0) +{ + if(!_callback) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__); + } + const_cast<CallbackBasePtr&>(_callback) = _callback->__verify(const_cast<LocalObjectPtr&>(_cookie)); +} + +Ice::AsyncResult::~AsyncResult() +{ +} + +bool +Ice::AsyncResult::operator==(const AsyncResult& r) const +{ + return this == &r; +} + +bool +Ice::AsyncResult::operator!=(const AsyncResult& r) const { + return !operator==(r); } -IceInternal::OutgoingAsyncMessageCallback::~OutgoingAsyncMessageCallback() +bool +Ice::AsyncResult::operator<(const AsyncResult& r) const { - assert(!__is); - assert(!__os); + return this < &r; +} + +Int +Ice::AsyncResult::getHash() const +{ + return static_cast<Int>(reinterpret_cast<Long>(this) >> 4); +} + +bool +Ice::AsyncResult::isCompleted() const +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + return _state & Done; +} + +void +Ice::AsyncResult::waitForCompleted() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(!(_state & Done)) + { + _monitor.wait(); + } +} + +bool +Ice::AsyncResult::isSent() const +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + return _state & (Sent | Done); } void -IceInternal::OutgoingAsyncMessageCallback::__sentCallback(const InstancePtr& instance) +Ice::AsyncResult::waitForSent() { - try + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(!(_state & (Sent | Done))) { - dynamic_cast<Ice::AMISentCallback*>(this)->ice_sent(); + _monitor.wait(); } - catch(const std::exception& ex) +} + +bool +Ice::AsyncResult::__wait() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(_state & EndCalled) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once"); + } + _state |= EndCalled; + while(!(_state & Done)) { - __warning(instance, ex); + _monitor.wait(); } - catch(...) + if(_exception.get()) { - __warning(instance); + _exception.get()->ice_throw(); } + return _state & OK; } void -IceInternal::OutgoingAsyncMessageCallback::__exception(const Ice::Exception& exc) -{ +Ice::AsyncResult::__throwUserException() +{ try { - ice_exception(exc); + _is.startReadEncaps(); + _is.throwException(); } - catch(const std::exception& ex) - { - __warning(ex); - } - catch(...) + catch(const Ice::UserException&) { - __warning(); + _is.endReadEncaps(); + throw; } - - __releaseCallback(); } void -IceInternal::OutgoingAsyncMessageCallback::__acquireCallback(const Ice::ObjectPrx& proxy) +Ice::AsyncResult::__sent() { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor); - // - // We must first wait for other requests to finish. + // Note: no need to change the _state here, specializations are responsible for + // changing the state. // - while(__os) + + if(_callback) + { + try + { + AsyncResultPtr self(this); + _callback->__sent(self); + } + catch(const std::exception& ex) + { + __warning(ex); + } + catch(...) + { + __warning(); + } + } +} + +void +Ice::AsyncResult::__exception(const Ice::Exception& ex) +{ { - __monitor.wait(); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _state |= Done; + _exception.reset(ex.ice_clone()); + _monitor.notifyAll(); } - Instance* instance = proxy->__reference()->getInstance().get(); - assert(!__os); - __os = new BasicStream(instance); + if(_callback) + { + try + { + AsyncResultPtr self(this); + _callback->__completed(self); + } + catch(const std::exception& ex) + { + __warning(ex); + } + catch(...) + { + __warning(); + } + } } void -IceInternal::OutgoingAsyncMessageCallback::__releaseCallback(const Ice::LocalException& exc) +Ice::AsyncResult::__exceptionAsync(const Ice::Exception& ex) { - assert(__os); - // - // This is called by the invoking thread to release the callback following a direct - // failure to marhsall/send the request. We call the ice_exception() callback with - // the thread pool to avoid potential deadlocks in case the invoking thread locked - // some mutexes/resources (which couldn't be re-acquired by the callback). + // This is called when it's not safe to call the exception callback synchronously + // from this thread. Instead the exception callback is called asynchronously from + // the client thread pool. // - try { - // - // COMPILERFIX: The following in done in two separate lines in order to work around - // bug in C++Builder 2009. - // - ThreadPoolPtr threadPool = __os->instance()->clientThreadPool(); - threadPool->execute(new CallException(this, exc)); + _instance->clientThreadPool()->execute(new AsynchronousException(this, ex)); } catch(const Ice::CommunicatorDestroyedException&) { - __releaseCallback(); throw; // CommunicatorDestroyedException is the only exception that can propagate directly. } } void -IceInternal::OutgoingAsyncMessageCallback::__releaseCallbackNoSync() +Ice::AsyncResult::__response() +{ + // + // Note: no need to change the _state here, specializations are responsible for + // changing the state. + // + + if(_callback) + { + try + { + AsyncResultPtr self(this); + _callback->__completed(self); + } + catch(const std::exception& ex) + { + __warning(ex); + } + catch(...) + { + __warning(); + } + } +} + +void +Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation) { - if(__is) + __check(r, operation); + if(r->getProxy().get() != prx) { - delete __is; - __is = 0; + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation + + " does not match proxy that was used to call corresponding begin_" + + operation + " method"); } +} - assert(__os); - delete __os; - __os = 0; +void +Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation) +{ + __check(r, operation); + if(r->getCommunicator().get() != com) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation + + " does not match communicator that was used to call corresponding " + + "begin_" + operation + " method"); + } +} - __monitor.notify(); +void +Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation) +{ + __check(r, operation); + if(r->getConnection().get() != con) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation + + " does not match connection that was used to call corresponding " + + "begin_" + operation + " method"); + } } void -IceInternal::OutgoingAsyncMessageCallback::__warning(const std::exception& exc) const +Ice::AsyncResult::__check(const AsyncResultPtr& r, const string& operation) { - if(__os) // Don't print anything if release() was already called. + if(!r) { - __warning(__os->instance(), exc); + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null"); + } + else if(&r->_operation != &operation) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation + + " method: " + r->_operation); } } + void -IceInternal::OutgoingAsyncMessageCallback::__warning(const InstancePtr& instance, const std::exception& exc) const +Ice::AsyncResult::__warning(const std::exception& exc) const { - if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) { - Warning out(instance->initializationData().logger); + Warning out(_instance->initializationData().logger); const Exception* ex = dynamic_cast<const Exception*>(&exc); if(ex) { @@ -203,44 +353,191 @@ IceInternal::OutgoingAsyncMessageCallback::__warning(const InstancePtr& instance } void -IceInternal::OutgoingAsyncMessageCallback::__warning() const +Ice::AsyncResult::__warning() const { - if(__os) // Don't print anything if release() was already called. + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) { - __warning(__os->instance()); + Warning out(_instance->initializationData().logger); + out << "unknown exception raised by AMI callback"; } } +IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, + const std::string& operation, + const CallbackBasePtr& delegate, + const Ice::LocalObjectPtr& cookie) : + AsyncResult(prx->__reference()->getInstance(), operation, delegate, cookie), + _proxy(prx) +{ +} + void -IceInternal::OutgoingAsyncMessageCallback::__warning(const InstancePtr& instance) const +IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context) { - if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + _delegate = 0; + _cnt = 0; + _mode = mode; + _sentSynchronously = false; + + // + // Can't call async via a batch proxy. + // + if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram()) { - Warning out(instance->initializationData().logger); - out << "unknown exception raised by AMI callback"; + throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI"); } + + _os.writeBlob(requestHdr, sizeof(requestHdr)); + + Reference* ref = _proxy->__reference().get(); + + ref->getIdentity().__write(&_os); + + // + // For compatibility with the old FacetPath. + // + if(ref->getFacet().empty()) + { + _os.write(static_cast<string*>(0), static_cast<string*>(0)); + } + else + { + string facet = ref->getFacet(); + _os.write(&facet, &facet + 1); + } + + _os.write(operation, false); + + _os.write(static_cast<Byte>(_mode)); + + if(context != 0) + { + // + // Explicit context + // + __writeContext(&_os, *context); + } + else + { + // + // Implicit context + // + const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext(); + const Context& prxContext = ref->getContext()->getValue(); + if(implicitContext == 0) + { + __writeContext(&_os, prxContext); + } + else + { + implicitContext->write(prxContext, &_os); + } + } + + _os.startWriteEncaps(); } -void +bool IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor); - _sent = true; - - if(!_proxy->ice_isTwoway()) + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + + bool alreadySent = _state & Sent; // Expected in case of a retry. + _state |= Sent; + + // + // It's possible for the request to be done already when using IOCP. This + // is the case for example if the send callback is dispatched after the + // read callback for the response/exception. + // + if(!(_state & Done)) { - __releaseCallbackNoSync(); // No response expected, we're done with the OutgoingAsync. + if(!_proxy->ice_isTwoway()) + { + _state |= Done | OK; + } + else if(connection->timeout() > 0) + { + assert(!_timerTaskConnection); + _timerTaskConnection = connection; + IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout()); + _instance->timer()->schedule(this, timeout); + } } - else if(_response) + _monitor.notifyAll(); + return !alreadySent && _callback && _callback->__hasSentCallback(); // Don't call the sent call is already sent. +} + +void +IceInternal::OutgoingAsync::__sent() +{ +#if defined(_MSC_VER) && (_MSC_VER < 1300) // VC++ 6 compiler bug + AsyncResult::__sent(); +#else + ::Ice::AsyncResult::__sent(); +#endif +} + +void +IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent) +{ { - __monitor.notifyAll(); // If the response was already received notify finished() which is waiting. + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + assert(!(_state & Done)); + if(_timerTaskConnection) + { + _instance->timer()->cancel(this); + _timerTaskConnection = 0; + } } - else if(connection->timeout() > 0) + + // + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. + // + + try + { + int interval = handleException(exc, sent); // This will throw if the invocation can't be retried. + if(interval > 0) + { + _instance->retryQueue()->add(this, interval); + } + else + { + __send(false); + } + } + catch(const Ice::LocalException& ex) + { + __exception(ex); + } +} + +void +IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc) +{ + // + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. The LocalExceptionWrapper exception is only called + // before the invocation is sent. + // + + try { - assert(!_timerTaskConnection && __os); - _timerTaskConnection = connection; - IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout()); - __os->instance()->timer()->schedule(this, timeout); + int interval = handleException(exc); // This will throw if the invocation can't be retried. + if(interval > 0) + { + _instance->retryQueue()->add(this, interval); + } + else + { + __send(false); + } + } + catch(const Ice::LocalException& ex) + { + __exception(ex); } } @@ -252,23 +549,17 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) Ice::Byte replyStatus; try { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor); - assert(__os); - _response = true; - - if(_timerTaskConnection && __os->instance()->timer()->cancel(this)) - { - _timerTaskConnection = 0; // Timer cancelled. - } + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + assert(!_exception.get() && !(_state & Done)); - while(!_sent || _timerTaskConnection) + if(_timerTaskConnection) { - __monitor.wait(); + _instance->timer()->cancel(this); + _timerTaskConnection = 0; } - __is = new BasicStream(__os->instance()); - __is->swap(is); - __is->read(replyStatus); + _is.swap(is); + _is.read(replyStatus); switch(replyStatus) { @@ -283,13 +574,13 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) case replyOperationNotExist: { Identity ident; - ident.__read(__is); + ident.__read(&_is); // // For compatibility with the old FacetPath. // vector<string> facetPath; - __is->read(facetPath); + _is.read(facetPath); string facet; if(!facetPath.empty()) { @@ -301,7 +592,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) } string operation; - __is->read(operation, false); + _is.read(operation, false); auto_ptr<RequestFailedException> ex; switch(replyStatus) @@ -342,7 +633,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) case replyUnknownUserException: { string unknown; - __is->read(unknown, false); + _is.read(unknown, false); auto_ptr<UnknownException> ex; switch(replyStatus) @@ -381,223 +672,73 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is) throw UnknownReplyStatusException(__FILE__, __LINE__); } } + + _state |= Done; + if(replyStatus == replyOK) + { + _state |= OK; + } + _monitor.notifyAll(); } catch(const LocalException& ex) { - __finished(ex); + __finished(ex, true); return; } assert(replyStatus == replyOK || replyStatus == replyUserException); - - try - { - __response(replyStatus == replyOK); - } - catch(const std::exception& ex) - { - __warning(ex); - __releaseCallback(); - } - catch(...) - { - __warning(); - __releaseCallback(); - } + __response(); } -void -IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc) +bool +IceInternal::OutgoingAsync::__send(bool synchronous) { + while(true) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor); - assert(__os); - - if(_timerTaskConnection && __os->instance()->timer()->cancel(this)) + int interval = 0; + try { - _timerTaskConnection = 0; // Timer cancelled. + _delegate = _proxy->__getDelegate(true); + bool sent = _delegate->__getRequestHandler()->sendAsyncRequest(this); + if(synchronous) + { + _sentSynchronously = sent; + } + break; } - - while(_timerTaskConnection) + catch(const LocalExceptionWrapper& ex) { - __monitor.wait(); + interval = handleException(ex); } - } - - // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. - // - - try - { - handleException(exc); // This will throw if the invocation can't be retried. - } - catch(const Ice::LocalException& ex) - { - __exception(ex); - } -} - -void -IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc) -{ - assert(__os && !_sent); - - // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. The LocalExceptionWrapper exception is only called - // before the invocation is sent. - // - - try - { - handleException(exc); // This will throw if the invocation can't be retried. - } - catch(const Ice::LocalException& ex) - { - __exception(ex); - } -} - -void -IceInternal::OutgoingAsync::__retry(int interval) -{ - // - // This method is called by the proxy to retry an invocation, no - // other threads can access this object. - // - if(interval > 0) - { - assert(__os); - __os->instance()->retryQueue()->add(this, interval); - } - else - { - __send(); - } -} - -bool -IceInternal::OutgoingAsync::__send() -{ - try - { - _sent = false; - _response = false; - _delegate = _proxy->__getDelegate(true); - _sentSynchronously = _delegate->__getRequestHandler()->sendAsyncRequest(this); - } - catch(const LocalExceptionWrapper& ex) - { - handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously - } - catch(const Ice::LocalException& ex) - { - handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously - } - return _sentSynchronously; -} - -void -IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operation, OperationMode mode, - const Context* context) -{ - _proxy = prx; - _delegate = 0; - _cnt = 0; - _mode = mode; - _sentSynchronously = false; - - // - // Can't call async via a batch proxy. - // - if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram()) - { - throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI"); - } - - __os->writeBlob(requestHdr, sizeof(requestHdr)); - - Reference* ref = _proxy->__reference().get(); - - ref->getIdentity().__write(__os); - - // - // For compatibility with the old FacetPath. - // - if(ref->getFacet().empty()) - { - __os->write(static_cast<string*>(0), static_cast<string*>(0)); - } - else - { - string facet = ref->getFacet(); - __os->write(&facet, &facet + 1); - } - - __os->write(operation, false); - - __os->write(static_cast<Byte>(_mode)); - - if(context != 0) - { - // - // Explicit context - // - __writeContext(__os, *context); - } - else - { - // - // Implicit context - // - const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext(); - const Context& prxContext = ref->getContext()->getValue(); - if(implicitContext == 0) + catch(const Ice::LocalException& ex) { - __writeContext(__os, prxContext); + interval = handleException(ex, false); } - else + + if(interval > 0) { - implicitContext->write(prxContext, __os); + _instance->retryQueue()->add(this, interval); + return false; } } - - __os->startWriteEncaps(); -} - -void -IceInternal::OutgoingAsync::__throwUserException() -{ - try - { - assert(__is); - __is->startReadEncaps(); - __is->throwException(); - } - catch(const Ice::UserException&) - { - __is->endReadEncaps(); - throw; - } + return _sentSynchronously; } -void +int IceInternal::OutgoingAsync::handleException(const LocalExceptionWrapper& ex) { if(_mode == Nonmutating || _mode == Idempotent) { - _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt); + return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt); } else { - _proxy->__handleExceptionWrapper(_delegate, ex, this); + return _proxy->__handleExceptionWrapper(_delegate, ex); } } -void -IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc) +int +IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc, bool sent) { try { @@ -611,7 +752,7 @@ IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc) // "at-most-once" (see the implementation of the checkRetryAfterException method of // the ProxyFactory class for the reasons why it can be useful). // - if(!_sent || + if(!sent || dynamic_cast<const CloseConnectionException*>(&exc) || dynamic_cast<const ObjectNotExistException*>(&exc)) { @@ -629,17 +770,18 @@ IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc) { if(_mode == Nonmutating || _mode == Idempotent) { - _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt); + return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt); } else { - _proxy->__handleExceptionWrapper(_delegate, ex, this); + return _proxy->__handleExceptionWrapper(_delegate, ex); } } catch(const Ice::LocalException& ex) { - _proxy->__handleException(_delegate, ex, this, _cnt); + return _proxy->__handleException(_delegate, ex, false, _cnt); } + return 0; // Keep the compiler happy. } void @@ -647,15 +789,9 @@ IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTi { Ice::ConnectionIPtr connection; { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor); - assert(_timerTaskConnection && _sent); // Can only be set once the request is sent. - - if(!_response) // If the response was just received, don't close the connection. - { - connection = _timerTaskConnection; - } + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + connection = _timerTaskConnection; _timerTaskConnection = 0; - __monitor.notifyAll(); } if(connection) @@ -664,122 +800,114 @@ IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTi } } -void +IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const InstancePtr& instance, + const std::string& operation, + const CallbackBasePtr& delegate, + const Ice::LocalObjectPtr& cookie) : + AsyncResult(instance, operation, delegate, cookie) +{ +} + +bool IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection) { - __releaseCallback(); + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + assert(!_exception.get()); + _state |= Done | OK | Sent; + _monitor.notifyAll(); + return _callback && _callback->__hasSentCallback(); } void -IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc) +IceInternal::BatchOutgoingAsync::__sent() +{ +#if defined(_MSC_VER) && (_MSC_VER < 1300) // VC++ 6 compiler bug + AsyncResult::__sent(); +#else + ::Ice::AsyncResult::__sent(); +#endif +} + +void +IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool) { __exception(exc); } -bool -Ice::AMI_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode, - const vector<Byte>& inParams, const Context* context) +IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy, + const std::string& operation, + const CallbackBasePtr& delegate, + const Ice::LocalObjectPtr& cookie) : + BatchOutgoingAsync(proxy->__reference()->getInstance(), operation, delegate, cookie), + _proxy(proxy) { - __acquireCallback(prx); - try - { - __prepare(prx, operation, mode, context); - __os->writeBlob(inParams); - __os->endWriteEncaps(); - return __send(); - } - catch(const Ice::LocalException& ex) - { - __releaseCallback(ex); - return false; - } } -void -Ice::AMI_Object_ice_invoke::__response(bool ok) // ok == true means no user exception. +namespace { - vector<Byte> outParams; - try + +// +// Dummy class derived from CallbackBase +// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn, +// this allows us to test whether the user supplied a null delegate instance to the +// generated begin_ method without having to generate a separate test to throw IllegalArgumentException +// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated +// object code. +// +class DummyCallback : public CallbackBase +{ +public: + + DummyCallback() { - __is->startReadEncaps(); - Int sz = __is->getReadEncapsSize(); - __is->readBlob(outParams, sz); - __is->endReadEncaps(); } - catch(const LocalException& ex) + + virtual void __completed(const Ice::AsyncResultPtr&) const { - __finished(ex); - return; + assert(false); } - ice_response(ok, outParams); - __releaseCallback(); -} -bool -Ice::AMI_Array_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode, - const pair<const Byte*, const Byte*>& inParams, const Context* context) -{ - __acquireCallback(prx); - try + virtual CallbackBasePtr __verify(Ice::LocalObjectPtr&) { - __prepare(prx, operation, mode, context); - __os->writeBlob(inParams.first, static_cast<Int>(inParams.second - inParams.first)); - __os->endWriteEncaps(); - return __send(); + // + // Called by the AsyncResult constructor to verify the delegate. The dummy + // delegate is passed when the user used a begin_ method without delegate. + // By returning 0 here, we tell the AsyncResult that no delegates was + // provided. + // + return 0; } - catch(const Ice::LocalException& ex) + + virtual void __sent(const AsyncResultPtr&) const { - __releaseCallback(ex); + assert(false); + } + + virtual bool __hasSentCallback() const + { + assert(false); return false; } +}; + } +// +// This gives a pointer value to compare against in the generated +// begin_ method to decide whether the caller passed a null pointer +// versus the generated inline version of the begin_ method having +// passed a pointer to the dummy delegate. +// +CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback; + void -Ice::AMI_Array_Object_ice_invoke::__response(bool ok) // ok == true means no user exception. +Ice::AMICallbackBase::__exception(const ::Ice::Exception& ex) { - pair<const Byte*, const Byte*> outParams; - try - { - __is->startReadEncaps(); - Int sz = __is->getReadEncapsSize(); - __is->readBlob(outParams.first, sz); - outParams.second = outParams.first + sz; - __is->endReadEncaps(); - } - catch(const LocalException& ex) - { - __finished(ex); - return; - } - ice_response(ok, outParams); - __releaseCallback(); + ice_exception(ex); } -bool -Ice::AMI_Object_ice_flushBatchRequests::__invoke(const ObjectPrx& prx) +void +Ice::AMICallbackBase::__sent() { - __acquireCallback(prx); - try - { - // - // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch - // requests were queued with the connection, they would be lost without being noticed. - // - Handle< ::IceDelegate::Ice::Object> delegate; - int cnt = -1; // Don't retry. - try - { - delegate = prx->__getDelegate(true); - return delegate->__getRequestHandler()->flushAsyncBatchRequests(this); - } - catch(const Ice::LocalException& ex) - { - prx->__handleException(delegate, ex, 0, cnt); - } - } - catch(const Ice::LocalException& ex) - { - __releaseCallback(ex); - } - return false; + dynamic_cast<AMISentCallback*>(this)->ice_sent(); } |