diff options
Diffstat (limited to 'cpp/src')
34 files changed, 1443 insertions, 1347 deletions
diff --git a/cpp/src/Ice/AsyncResult.cpp b/cpp/src/Ice/AsyncResult.cpp new file mode 100644 index 00000000000..03abc55e020 --- /dev/null +++ b/cpp/src/Ice/AsyncResult.cpp @@ -0,0 +1,602 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +#include <IceUtil/DisableWarnings.h> +#include <Ice/AsyncResult.h> +#include <Ice/ThreadPool.h> +#include <Ice/Instance.h> +#include <Ice/LoggerUtil.h> +#include <Ice/Properties.h> +#include <Ice/RequestHandler.h> +#include <Ice/OutgoingAsync.h> + +using namespace std; +using namespace Ice; +using namespace IceInternal; + +IceUtil::Shared* Ice::upCast(AsyncResult* p) { return p; } + +const unsigned char Ice::AsyncResult::OK = 0x1; +const unsigned char Ice::AsyncResult::Done = 0x2; +const unsigned char Ice::AsyncResult::Sent = 0x4; +const unsigned char Ice::AsyncResult::EndCalled = 0x8; + +void +AsyncResult::cancel() +{ + cancel(InvocationCanceledException(__FILE__, __LINE__)); +} + +Int +AsyncResult::getHash() const +{ + return static_cast<Int>(reinterpret_cast<Long>(this) >> 4); +} + +CommunicatorPtr +AsyncResult::getCommunicator() const +{ + return _communicator; +} + +ConnectionPtr +AsyncResult::getConnection() const +{ + return 0; +} + +ObjectPrx +AsyncResult::getProxy() const +{ + return 0; +} + +bool +AsyncResult::isCompleted() const +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + return _state & Done; +} + +void +AsyncResult::waitForCompleted() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(!(_state & Done)) + { + _monitor.wait(); + } +} + +bool +AsyncResult::isSent() const +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + return _state & Sent; +} + +void +AsyncResult::waitForSent() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + while(!(_state & Sent) && !_exception.get()) + { + _monitor.wait(); + } +} + +void +AsyncResult::throwLocalException() const +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(_exception.get()) + { + _exception.get()->ice_throw(); + } +} + +bool +AsyncResult::sentSynchronously() const +{ + return _sentSynchronously; +} + +LocalObjectPtr +AsyncResult::getCookie() const +{ + return _cookie; +} + +const std::string& +AsyncResult::getOperation() const +{ + return _operation; +} + +void +AsyncResult::__throwUserException() +{ + try + { + _is.startReadEncaps(); + _is.throwException(); + } + catch(const Ice::UserException&) + { + _is.endReadEncaps(); + throw; + } +} + +bool +AsyncResult::__wait() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(_state & EndCalled) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once"); + } + _state |= EndCalled; + while(!(_state & Done)) + { + _monitor.wait(); + } + if(_exception.get()) + { + _exception.get()->ice_throw(); + } + return _state & OK; +} + +void +AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation) +{ + __check(r, operation); + if(r->getProxy().get() != prx) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation + + " does not match proxy that was used to call corresponding begin_" + + operation + " method"); + } +} + +void +AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation) +{ + __check(r, operation); + if(r->getCommunicator().get() != com) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation + + " does not match communicator that was used to call corresponding " + + "begin_" + operation + " method"); + } +} + +void +AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation) +{ + __check(r, operation); + if(r->getConnection().get() != con) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation + + " does not match connection that was used to call corresponding " + + "begin_" + operation + " method"); + } +} + +void +AsyncResult::__check(const AsyncResultPtr& r, const string& operation) +{ + if(!r) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null"); + } + else if(&r->_operation != &operation) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation + + " method: " + r->_operation); + } +} + +AsyncResult::AsyncResult(const CommunicatorPtr& communicator, + const IceInternal::InstancePtr& instance, + const string& op, + const CallbackBasePtr& del, + const LocalObjectPtr& cookie) : + _instance(instance), + _sentSynchronously(false), + _is(instance.get(), Ice::currentProtocolEncoding), + _communicator(communicator), + _operation(op), + _callback(del), + _cookie(cookie), + _state(0) +{ + if(!_callback) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__); + } + const_cast<CallbackBasePtr&>(_callback) = _callback->verify(_cookie); +} + +AsyncResult::~AsyncResult() +{ +} + +bool +AsyncResult::sent(bool done) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + assert(!_exception.get()); + + bool alreadySent = _state & Sent; + _state |= Sent; + if(done) + { + _state |= Done | OK; + _cancellationHandler = 0; + if(!_callback || !_callback->hasSentCallback()) + { + _observer.detach(); + } + } + + _monitor.notifyAll(); + return !alreadySent && _callback && _callback->hasSentCallback(); +} + +bool +AsyncResult::finished(bool ok) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _state |= Done; + if(ok) + { + _state |= OK; + } + _cancellationHandler = 0; + if(!_callback) + { + _observer.detach(); + } + _monitor.notifyAll(); + return _callback; +} + +bool +AsyncResult::finished(const Ice::Exception& ex) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _state |= Done; + _exception.reset(ex.ice_clone()); + _cancellationHandler = 0; + _observer.failed(ex.ice_name()); + if(!_callback) + { + _observer.detach(); + } + _monitor.notifyAll(); + return _callback; +} + +void +AsyncResult::invokeSentAsync() +{ + class AsynchronousSent : public DispatchWorkItem + { + public: + + AsynchronousSent(const ConnectionPtr& connection, const AsyncResultPtr& result) : + DispatchWorkItem(connection), _result(result) + { + } + + virtual void + run() + { + _result->invokeSent(); + } + + private: + + const AsyncResultPtr _result; + }; + + // + // This is called when it's not safe to call the sent callback + // synchronously from this thread. Instead the exception callback + // is called asynchronously from the client thread pool. + // + try + { + _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this)); + } + catch(const Ice::CommunicatorDestroyedException&) + { + } +} + +void +AsyncResult::invokeCompletedAsync() +{ + class AsynchronousCompleted : public DispatchWorkItem + { + public: + + AsynchronousCompleted(const ConnectionPtr& connection, const AsyncResultPtr& result) : + DispatchWorkItem(connection), _result(result) + { + } + + virtual void + run() + { + _result->invokeCompleted(); + } + + private: + + const AsyncResultPtr _result; + }; + + // + // CommunicatorDestroyedCompleted is the only exception that can propagate directly + // from this method. + // + _instance->clientThreadPool()->dispatch(new AsynchronousCompleted(_cachedConnection, this)); +} + +void +AsyncResult::invokeSent() +{ + assert(_callback); + + try + { + AsyncResultPtr self(this); + _callback->sent(self); + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + if(_observer) + { + ObjectPrx proxy = getProxy(); + if(!proxy || !proxy->ice_isTwoway()) + { + _observer.detach(); + } + } +} + +void +AsyncResult::invokeCompleted() +{ + assert(_callback); + + try + { + AsyncResultPtr self(this); + _callback->completed(self); + } + catch(const std::exception& ex) + { + warning(ex); + } + catch(...) + { + warning(); + } + + _observer.detach(); +} + +void +AsyncResult::cancel(const Ice::LocalException& ex) +{ + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _cancellationException.reset(ex.ice_clone()); + if(!_cancellationHandler) + { + return; + } + } + _cancellationHandler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), ex); +} + +void +AsyncResult::cancelable(const CancellationHandlerPtr& handler) +{ + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(!_cancellationException.get()) + { + _cancellationHandler = handler; + return; + } + } + handler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), *_cancellationException.get()); +} + +void +AsyncResult::checkCanceled() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(_cancellationException.get()) + { + _cancellationException->ice_throw(); + } +} + +void +AsyncResult::warning(const std::exception& exc) const +{ + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + Warning out(_instance->initializationData().logger); + const Exception* ex = dynamic_cast<const Exception*>(&exc); + if(ex) + { + out << "Ice::Exception raised by AMI callback:\n" << *ex; + } + else + { + out << "std::exception raised by AMI callback:\n" << exc.what(); + } + } +} + +void +AsyncResult::warning() const +{ + if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + { + Warning out(_instance->initializationData().logger); + out << "unknown exception raised by AMI callback"; + } +} + + +namespace +{ + +// +// Dummy class derived from CallbackBase +// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn, +// this allows us to test whether the user supplied a null delegate instance to the +// generated begin_ method without having to generate a separate test to throw IllegalArgumentException +// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated +// object code. +// +class DummyCallback : public CallbackBase +{ +public: + + DummyCallback() + { + } + + virtual void + completed(const Ice::AsyncResultPtr&) const + { + assert(false); + } + + virtual CallbackBasePtr + verify(const Ice::LocalObjectPtr&) + { + // + // Called by the AsyncResult constructor to verify the delegate. The dummy + // delegate is passed when the user used a begin_ method without delegate. + // By returning 0 here, we tell the AsyncResult that no delegates was + // provided. + // + return 0; + } + + virtual void + sent(const AsyncResultPtr&) const + { + assert(false); + } + + virtual bool + hasSentCallback() const + { + assert(false); + return false; + } +}; + +} + +// +// This gives a pointer value to compare against in the generated +// begin_ method to decide whether the caller passed a null pointer +// versus the generated inline version of the begin_ method having +// passed a pointer to the dummy delegate. +// +CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback; + +#ifdef ICE_CPP11 + +Ice::CallbackPtr +Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& completed, + const ::IceInternal::Function<void (const AsyncResultPtr&)>& sent) +{ + class Cpp11CB : public GenericCallbackBase + { + public: + + Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed, + const ::std::function<void (const AsyncResultPtr&)>& sent) : + _completed(completed), + _sent(sent) + { + checkCallback(true, completed != nullptr); + } + + virtual void + completed(const AsyncResultPtr& result) const + { + _completed(result); + } + + virtual CallbackBasePtr + verify(const LocalObjectPtr&) + { + return this; // Nothing to do, the cookie is not type-safe. + } + + virtual void + sent(const AsyncResultPtr& result) const + { + if(_sent != nullptr) + { + _sent(result); + } + } + + virtual bool + hasSentCallback() const + { + return _sent != nullptr; + } + + private: + + ::std::function< void (const AsyncResultPtr&)> _completed; + ::std::function< void (const AsyncResultPtr&)> _sent; + }; + + return new Cpp11CB(completed, sent); +} +#endif + +void +IceInternal::CallbackBase::checkCallback(bool obj, bool cb) +{ + if(!obj) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null"); + } + if(!cb) + { + throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null"); + } +} + + diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp index 0b08198726a..3fc321735cb 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.cpp +++ b/cpp/src/Ice/CollocatedRequestHandler.cpp @@ -14,6 +14,8 @@ #include <Ice/Reference.h> #include <Ice/Instance.h> #include <Ice/TraceLevels.h> +#include <Ice/Outgoing.h> +#include <Ice/OutgoingAsync.h> #include <Ice/TraceUtil.h> @@ -28,7 +30,7 @@ class InvokeAll : public DispatchWorkItem { public: - InvokeAll(OutgoingMessageCallback* out, + InvokeAll(OutgoingBase* out, BasicStream* os, CollocatedRequestHandler* handler, Int requestId, @@ -49,7 +51,7 @@ public: private: - OutgoingMessageCallback* _out; + OutgoingBase* _out; BasicStream* _os; CollocatedRequestHandlerPtr _handler; Int _requestId; @@ -61,7 +63,7 @@ class InvokeAllAsync : public DispatchWorkItem { public: - InvokeAllAsync(const OutgoingAsyncMessageCallbackPtr& outAsync, + InvokeAllAsync(const OutgoingAsyncBasePtr& outAsync, BasicStream* os, CollocatedRequestHandler* handler, Int requestId, @@ -82,7 +84,7 @@ public: private: - OutgoingAsyncMessageCallbackPtr _outAsync; + OutgoingAsyncBasePtr _outAsync; BasicStream* _os; CollocatedRequestHandlerPtr _handler; Int _requestId; @@ -113,7 +115,7 @@ public: private: const CollocatedRequestHandlerPtr _handler; - const OutgoingAsyncMessageCallbackPtr _outAsync; + const OutgoingAsyncBasePtr _outAsync; BasicStream _stream; Int _invokeNum; }; @@ -261,24 +263,24 @@ CollocatedRequestHandler::abortBatchRequest() } bool -CollocatedRequestHandler::sendRequest(OutgoingMessageCallback* out) +CollocatedRequestHandler::sendRequest(OutgoingBase* out) { out->invokeCollocated(this); return !_response && _reference->getInvocationTimeout() == 0; } AsyncStatus -CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& outAsync) +CollocatedRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& outAsync) { - return outAsync->__invokeCollocated(this); + return outAsync->invokeCollocated(this); } void -CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) +CollocatedRequestHandler::requestCanceled(OutgoingBase* out, const LocalException& ex) { Lock sync(*this); - map<OutgoingMessageCallback*, Int>::iterator p = _sendRequests.find(out); + map<OutgoingBase*, Int>::iterator p = _sendRequests.find(out); if(p != _sendRequests.end()) { if(p->second > 0) @@ -286,7 +288,7 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) _requests.erase(p->second); } InvocationTimeoutException ex(__FILE__, __LINE__); - out->finished(ex); + out->completed(ex); _sendRequests.erase(p); return; } @@ -299,7 +301,7 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) if(q->second == o) { InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); + o->completed(ex); _requests.erase(q); return; // We're done. } @@ -307,12 +309,12 @@ CollocatedRequestHandler::requestTimedOut(OutgoingMessageCallback* out) } } -void -CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +void +CollocatedRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { Lock sync(*this); - - map<OutgoingAsyncMessageCallbackPtr, Int>::iterator p = _sendAsyncRequests.find(outAsync); + + map<OutgoingAsyncBasePtr, Int>::iterator p = _sendAsyncRequests.find(outAsync); if(p != _sendAsyncRequests.end()) { if(p->second > 0) @@ -320,7 +322,10 @@ CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbac _asyncRequests.erase(p->second); } _sendAsyncRequests.erase(p); - outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } return; } @@ -332,7 +337,10 @@ CollocatedRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbac if(q->second.get() == o.get()) { _asyncRequests.erase(q); - outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } return; } } @@ -391,16 +399,17 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync) { _sendAsyncRequests.insert(make_pair(outAsync, requestId)); } + outAsync->cancelable(this); } - outAsync->__attachCollocatedObserver(_adapter, requestId); + outAsync->attachCollocatedObserver(_adapter, requestId); - _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, requestId, 1, false)); + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, requestId, 1, false)); return AsyncStatusQueued; } void -CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out) +CollocatedRequestHandler::invokeBatchRequests(OutgoingBase* out) { Int invokeNum; { @@ -457,7 +466,7 @@ CollocatedRequestHandler::invokeBatchRequests(BatchOutgoing* out) } AsyncStatus -CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) +CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync) { Int invokeNum; { @@ -473,10 +482,12 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) if(_reference->getInvocationTimeout() > 0) { _sendAsyncRequests.insert(make_pair(outAsync, 0)); + + outAsync->cancelable(this); } assert(!_batchStream.b.empty()); - _batchStream.swap(*outAsync->__getOs()); + _batchStream.swap(*outAsync->getOs()); // // Reset the batch stream. @@ -488,14 +499,14 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(BatchOutgoingAsync* outAsync) } } - outAsync->__attachCollocatedObserver(_adapter, 0); + outAsync->attachCollocatedObserver(_adapter, 0); if(invokeNum > 0) { - _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->__getOs(), this, 0, invokeNum,true)); + _adapter->getThreadPool()->dispatch(new InvokeAllAsync(outAsync, outAsync->getOs(), this, 0, invokeNum,true)); return AsyncStatusQueued; } - else if(outAsync->__sent()) + else if(outAsync->sent()) { return static_cast<AsyncStatus>(AsyncStatusSent | AsyncStatusInvokeSentCallback); } @@ -512,7 +523,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) { Lock sync(*this); assert(_response); - + os->i = os->b.begin() + sizeof(replyHdr) + 4; if(_traceLevels->protocol >= 1) @@ -524,7 +535,7 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) map<int, Outgoing*>::iterator p = _requests.find(requestId); if(p != _requests.end()) { - p->second->finished(*os); + p->second->completed(*os); _requests.erase(p); } else @@ -532,17 +543,21 @@ CollocatedRequestHandler::sendResponse(Int requestId, BasicStream* os, Byte) map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId); if(q != _asyncRequests.end()) { - os->swap(*q->second->__getIs()); - outAsync = q->second; + os->swap(*q->second->getIs()); + if(q->second->completed()) + { + outAsync = q->second; + } _asyncRequests.erase(q); } } } - if(outAsync && outAsync->__finished()) + if(outAsync) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); } + _adapter->decDirectCount(); } @@ -563,12 +578,7 @@ CollocatedRequestHandler::systemException(Int requestId, const SystemException& void CollocatedRequestHandler::invokeException(Int requestId, const LocalException& ex, int invokeNum) { - if(requestId > 0) - { - Lock sync(*this); - _requests.erase(requestId); - _asyncRequests.erase(requestId); - } + handleException(requestId, ex); _adapter->decDirectCount(); } @@ -585,7 +595,7 @@ CollocatedRequestHandler::waitForConnection() } bool -CollocatedRequestHandler::sent(OutgoingMessageCallback* out) +CollocatedRequestHandler::sent(OutgoingBase* out) { if(_reference->getInvocationTimeout() > 0) { @@ -600,7 +610,7 @@ CollocatedRequestHandler::sent(OutgoingMessageCallback* out) } bool -CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync) +CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync) { if(_reference->getInvocationTimeout() > 0) { @@ -610,9 +620,9 @@ CollocatedRequestHandler::sentAsync(OutgoingAsyncMessageCallback* outAsync) return false; // The request timed-out. } } - if(outAsync->__sent()) + if(outAsync->sent()) { - outAsync->__invokeSent(); + outAsync->invokeSent(); } return true; } @@ -684,7 +694,7 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex) map<int, Outgoing*>::iterator p = _requests.find(requestId); if(p != _requests.end()) { - p->second->finished(ex); + p->second->completed(ex); _requests.erase(p); } else @@ -692,13 +702,17 @@ CollocatedRequestHandler::handleException(int requestId, const Exception& ex) map<int, OutgoingAsyncPtr>::iterator q = _asyncRequests.find(requestId); if(q != _asyncRequests.end()) { - outAsync = q->second; + if(q->second->completed(ex)) + { + outAsync = q->second; + } _asyncRequests.erase(q); } } } + if(outAsync) { - outAsync->__finished(ex); + outAsync->invokeCompleted(); } } diff --git a/cpp/src/Ice/CollocatedRequestHandler.h b/cpp/src/Ice/CollocatedRequestHandler.h index a3ac1387045..3930c12ce1c 100644 --- a/cpp/src/Ice/CollocatedRequestHandler.h +++ b/cpp/src/Ice/CollocatedRequestHandler.h @@ -31,10 +31,10 @@ typedef IceUtil::Handle<ObjectAdapterI> ObjectAdapterIPtr; namespace IceInternal { +class OutgoingBase; class Outgoing; -class BatchOutgoing; +class OutgoingAsyncBase; class OutgoingAsync; -class BatchOutgoingAsync; class CollocatedRequestHandler : public RequestHandler, public ResponseHandler, private IceUtil::Monitor<IceUtil::Mutex> { @@ -50,11 +50,11 @@ public: virtual void finishBatchRequest(BasicStream*); virtual void abortBatchRequest(); - virtual bool sendRequest(OutgoingMessageCallback*); - virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&); + virtual bool sendRequest(OutgoingBase*); + virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&); - virtual void requestTimedOut(OutgoingMessageCallback*); - virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&); + virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); + virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); virtual void sendResponse(Ice::Int, BasicStream*, Ice::Byte); virtual void sendNoResponse(); @@ -68,11 +68,11 @@ public: void invokeRequest(Outgoing*); AsyncStatus invokeAsyncRequest(OutgoingAsync*); - void invokeBatchRequests(BatchOutgoing*); - AsyncStatus invokeAsyncBatchRequests(BatchOutgoingAsync*); + void invokeBatchRequests(OutgoingBase*); + AsyncStatus invokeAsyncBatchRequests(OutgoingAsyncBase*); - bool sent(OutgoingMessageCallback*); - bool sentAsync(OutgoingAsyncMessageCallback*); + bool sent(OutgoingBase*); + bool sentAsync(OutgoingAsyncBase*); void invokeAll(BasicStream*, Ice::Int, Ice::Int, bool); @@ -88,8 +88,8 @@ private: int _requestId; - std::map<OutgoingMessageCallback*, Ice::Int> _sendRequests; - std::map<OutgoingAsyncMessageCallbackPtr, Ice::Int> _sendAsyncRequests; + std::map<OutgoingBase*, Ice::Int> _sendRequests; + std::map<OutgoingAsyncBasePtr, Ice::Int> _sendAsyncRequests; std::map<Ice::Int, Outgoing*> _requests; std::map<Ice::Int, OutgoingAsyncPtr> _asyncRequests; diff --git a/cpp/src/Ice/CommunicatorI.cpp b/cpp/src/Ice/CommunicatorI.cpp index 9b7ce8b8cd8..a7c2aee35d2 100644 --- a/cpp/src/Ice/CommunicatorI.cpp +++ b/cpp/src/Ice/CommunicatorI.cpp @@ -21,6 +21,7 @@ #include <Ice/DefaultsAndOverrides.h> #include <Ice/TraceLevels.h> #include <Ice/Router.h> +#include <Ice/OutgoingAsync.h> #include <IceUtil/Mutex.h> #include <IceUtil/MutexPtrLock.h> #include <IceUtil/UUID.h> @@ -198,8 +199,7 @@ Ice::CommunicatorI::getPluginManager() const void Ice::CommunicatorI::flushBatchRequests() { - AsyncResultPtr r = begin_flushBatchRequests(); - end_flushBatchRequests(r); + end_flushBatchRequests(begin_flushBatchRequests()); } AsyncResultPtr @@ -223,9 +223,8 @@ Ice::CommunicatorI::begin_flushBatchRequests(const Callback_Communicator_flushBa #ifdef ICE_CPP11 AsyncResultPtr -Ice::CommunicatorI::begin_flushBatchRequests( - const IceInternal::Function<void (const Exception&)>& exception, - const IceInternal::Function<void (bool)>& sent) +Ice::CommunicatorI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception, + const IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public IceInternal::Cpp11FnCallbackNC { @@ -268,8 +267,7 @@ const ::std::string __flushBatchRequests_name = "flushBatchRequests"; } AsyncResultPtr -Ice::CommunicatorI::__begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb, - const LocalObjectPtr& cookie) +Ice::CommunicatorI::__begin_flushBatchRequests(const IceInternal::CallbackBasePtr& cb, const LocalObjectPtr& cookie) { OutgoingConnectionFactoryPtr connectionFactory = _instance->outgoingConnectionFactory(); ObjectAdapterFactoryPtr adapterFactory = _instance->objectAdapterFactory(); @@ -278,8 +276,11 @@ Ice::CommunicatorI::__begin_flushBatchRequests(const IceInternal::CallbackBasePt // This callback object receives the results of all invocations // of Connection::begin_flushBatchRequests. // - CommunicatorBatchOutgoingAsyncPtr result = - new CommunicatorBatchOutgoingAsync(this, _instance, __flushBatchRequests_name, cb, cookie); + CommunicatorFlushBatchPtr result = new CommunicatorFlushBatch(this, + _instance, + __flushBatchRequests_name, + cb, + cookie); connectionFactory->flushAsyncBatchRequests(result); adapterFactory->flushAsyncBatchRequests(result); diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp index 5a9a84c83b1..7d7f4a9291c 100644 --- a/cpp/src/Ice/ConnectRequestHandler.cpp +++ b/cpp/src/Ice/ConnectRequestHandler.cpp @@ -49,7 +49,7 @@ class FlushSentRequests : public DispatchWorkItem { public: - FlushSentRequests(const Ice::ConnectionPtr& connection, const vector<OutgoingAsyncMessageCallbackPtr>& callbacks) : + FlushSentRequests(const Ice::ConnectionPtr& connection, const vector<OutgoingAsyncBasePtr>& callbacks) : DispatchWorkItem(connection), _callbacks(callbacks) { } @@ -57,15 +57,15 @@ public: virtual void run() { - for(vector<OutgoingAsyncMessageCallbackPtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p) + for(vector<OutgoingAsyncBasePtr>::const_iterator p = _callbacks.begin(); p != _callbacks.end(); ++p) { - (*p)->__invokeSent(); + (*p)->invokeSent(); } } private: - vector<OutgoingAsyncMessageCallbackPtr> _callbacks; + vector<OutgoingAsyncBasePtr> _callbacks; }; }; @@ -202,7 +202,7 @@ ConnectRequestHandler::abortBatchRequest() } bool -ConnectRequestHandler::sendRequest(OutgoingMessageCallback* out) +ConnectRequestHandler::sendRequest(OutgoingBase* out) { { Lock sync(*this); @@ -225,7 +225,7 @@ ConnectRequestHandler::sendRequest(OutgoingMessageCallback* out) } AsyncStatus -ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& out) +ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out) { { Lock sync(*this); @@ -236,6 +236,7 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& o Request req; req.outAsync = out; _requests.push_back(req); + out->cancelable(this); return AsyncStatusQueued; } } @@ -244,11 +245,11 @@ ConnectRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& o throw RetryException(ex); } } - return out->__send(_connection, _compress, _response); + return out->send(_connection, _compress, _response); } void -ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) +ConnectRequestHandler::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) { { Lock sync(*this); @@ -263,8 +264,7 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) { if(p->out == out) { - Ice::InvocationTimeoutException ex(__FILE__, __LINE__); - out->finished(ex); + out->completed(ex); _requests.erase(p); return; } @@ -272,11 +272,11 @@ ConnectRequestHandler::requestTimedOut(OutgoingMessageCallback* out) assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - _connection->requestTimedOut(out); + _connection->requestCanceled(out, ex); } void -ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +ConnectRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex) { { Lock sync(*this); @@ -292,14 +292,16 @@ ConnectRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPt if(p->outAsync.get() == outAsync.get()) { _requests.erase(p); - outAsync->__dispatchInvocationTimeout(_reference->getInstance()->clientThreadPool(), 0); + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } return; } } - assert(false); // The request has to be queued if it timed out and we're not initialized yet. } } - _connection->asyncRequestTimedOut(outAsync); + _connection->asyncRequestCanceled(outAsync, ex); } Ice::ConnectionIPtr @@ -451,7 +453,7 @@ ConnectRequestHandler::flushRequests() _flushing = true; } - vector<OutgoingAsyncMessageCallbackPtr> sentCallbacks; + vector<OutgoingAsyncBasePtr> sentCallbacks; try { while(!_requests.empty()) // _requests is immutable when _flushing = true @@ -463,7 +465,7 @@ ConnectRequestHandler::flushRequests() } else if(req.outAsync) { - if(req.outAsync->__send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) + if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback) { sentCallbacks.push_back(req.outAsync); } @@ -551,11 +553,14 @@ ConnectRequestHandler::flushRequestsWithException() { if(p->out) { - p->out->finished(*_exception.get()); + p->out->completed(*_exception.get()); } else if(p->outAsync) { - p->outAsync->__finished(*_exception.get()); + if(p->outAsync->completed(*_exception.get())) + { + p->outAsync->invokeCompleted(); + } } else { diff --git a/cpp/src/Ice/ConnectRequestHandler.h b/cpp/src/Ice/ConnectRequestHandler.h index 095edda6123..c5bc6602766 100644 --- a/cpp/src/Ice/ConnectRequestHandler.h +++ b/cpp/src/Ice/ConnectRequestHandler.h @@ -42,11 +42,11 @@ public: virtual void finishBatchRequest(BasicStream*); virtual void abortBatchRequest(); - virtual bool sendRequest(OutgoingMessageCallback*); - virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&); + virtual bool sendRequest(OutgoingBase*); + virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&); - virtual void requestTimedOut(OutgoingMessageCallback*); - virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&); + virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); + virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); virtual Ice::ConnectionIPtr getConnection(); virtual Ice::ConnectionIPtr waitForConnection(); @@ -69,8 +69,8 @@ private: { } - OutgoingMessageCallback* out; - OutgoingAsyncMessageCallbackPtr outAsync; + OutgoingBase* out; + OutgoingAsyncBasePtr outAsync; BasicStream* os; }; diff --git a/cpp/src/Ice/ConnectionFactory.cpp b/cpp/src/Ice/ConnectionFactory.cpp index f859ac3fa58..f1fc0380727 100644 --- a/cpp/src/Ice/ConnectionFactory.cpp +++ b/cpp/src/Ice/ConnectionFactory.cpp @@ -24,6 +24,7 @@ #include <Ice/RouterInfo.h> #include <Ice/LocalException.h> #include <Ice/Functional.h> +#include <Ice/OutgoingAsync.h> #include <IceUtil/Random.h> #include <iterator> @@ -432,7 +433,7 @@ IceInternal::OutgoingConnectionFactory::removeAdapter(const ObjectAdapterPtr& ad } void -IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) +IceInternal::OutgoingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync) { list<ConnectionIPtr> c; @@ -1357,7 +1358,7 @@ IceInternal::IncomingConnectionFactory::connections() const } void -IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) +IceInternal::IncomingConnectionFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync) { list<ConnectionIPtr> c = connections(); // connections() is synchronized, so no need to synchronize here. diff --git a/cpp/src/Ice/ConnectionFactory.h b/cpp/src/Ice/ConnectionFactory.h index bd0bbe30804..92603a55b04 100644 --- a/cpp/src/Ice/ConnectionFactory.h +++ b/cpp/src/Ice/ConnectionFactory.h @@ -66,7 +66,7 @@ public: const CreateConnectionCallbackPtr&); void setRouterInfo(const RouterInfoPtr&); void removeAdapter(const Ice::ObjectAdapterPtr&); - void flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr&); + void flushAsyncBatchRequests(const CommunicatorFlushBatchPtr&); private: @@ -178,7 +178,7 @@ public: EndpointIPtr endpoint() const; std::list<Ice::ConnectionIPtr> connections() const; - void flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr&); + void flushAsyncBatchRequests(const CommunicatorFlushBatchPtr&); // // Operations from EventHandler diff --git a/cpp/src/Ice/ConnectionI.cpp b/cpp/src/Ice/ConnectionI.cpp index 1bdbeb12448..92dc4a1693d 100644 --- a/cpp/src/Ice/ConnectionI.cpp +++ b/cpp/src/Ice/ConnectionI.cpp @@ -242,7 +242,7 @@ Ice::ConnectionI::OutgoingMessage::adopt(BasicStream* str) } void -Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream) +Ice::ConnectionI::OutgoingMessage::canceled(bool adoptStream) { assert((out || outAsync)); // Only requests can timeout. out = 0; @@ -253,7 +253,7 @@ Ice::ConnectionI::OutgoingMessage::timedOut(bool adoptStream) } else { - assert(!adopted && !stream); + assert(!adopted); } } @@ -273,25 +273,28 @@ Ice::ConnectionI::OutgoingMessage::sent() else if(outAsync) { #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) - invokeSent = outAsync->__sent(); + invokeSent = outAsync->sent(); return invokeSent || receivedReply; #else - return outAsync->__sent(); + return outAsync->sent(); #endif } return false; } void -Ice::ConnectionI::OutgoingMessage::finished(const Ice::LocalException& ex) +Ice::ConnectionI::OutgoingMessage::completed(const Ice::LocalException& ex) { if(out) { - out->finished(ex); + out->completed(ex); } else if(outAsync) { - outAsync->__finished(ex); + if(outAsync->completed(ex)) + { + outAsync->invokeCompleted(); + } } if(adopted) @@ -651,8 +654,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) #endif } - out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, - static_cast<Int>(os->b.size() - headerSize - 4)); + out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); // // Send the message. If it can't be sent without blocking the message is added @@ -685,7 +687,7 @@ Ice::ConnectionI::sendRequest(Outgoing* out, bool compress, bool response) AsyncStatus Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, bool response) { - BasicStream* os = out->__getOs(); + BasicStream* os = out->getOs(); IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); if(_exception.get()) @@ -731,8 +733,7 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b #endif } - out->__attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, - static_cast<Int>(os->b.size() - headerSize - 4)); + out->attachRemoteObserver(initConnectionInfo(), _endpoint, requestId); AsyncStatus status = AsyncStatusQueued; try @@ -747,6 +748,11 @@ Ice::ConnectionI::sendAsyncRequest(const OutgoingAsyncPtr& out, bool compress, b _exception->ice_throw(); } + if(response || status & AsyncStatusQueued) + { + out->cancelable(this); // Notify the request that it's cancelable + } + if(response) { // @@ -961,7 +967,7 @@ Ice::ConnectionI::abortBatchRequest() void Ice::ConnectionI::flushBatchRequests() { - BatchOutgoing out(this, _instance.get(), __flushBatchRequests_name); + FlushBatch out(this, _instance.get(), __flushBatchRequests_name); out.invoke(); } @@ -986,9 +992,8 @@ Ice::ConnectionI::begin_flushBatchRequests(const Callback_Connection_flushBatchR #ifdef ICE_CPP11 AsyncResultPtr -Ice::ConnectionI::begin_flushBatchRequests( - const IceInternal::Function<void (const Exception&)>& exception, - const IceInternal::Function<void (bool)>& sent) +Ice::ConnectionI::begin_flushBatchRequests(const IceInternal::Function<void (const Exception&)>& exception, + const IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public IceInternal::Cpp11FnCallbackNC @@ -1026,16 +1031,13 @@ Ice::ConnectionI::begin_flushBatchRequests( AsyncResultPtr Ice::ConnectionI::__begin_flushBatchRequests(const CallbackBasePtr& cb, const LocalObjectPtr& cookie) { - ConnectionBatchOutgoingAsyncPtr result = - new ConnectionBatchOutgoingAsync(this, _communicator, _instance, __flushBatchRequests_name, cb, cookie); - try - { - result->__invoke(); - } - catch(const LocalException& __ex) - { - result->__invokeExceptionAsync(__ex); - } + ConnectionFlushBatchPtr result = new ConnectionFlushBatch(this, + _communicator, + _instance, + __flushBatchRequests_name, + cb, + cookie); + result->invoke(); return result; } @@ -1047,7 +1049,7 @@ Ice::ConnectionI::end_flushBatchRequests(const AsyncResultPtr& r) } bool -Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) +Ice::ConnectionI::flushBatchRequests(OutgoingBase* out) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); while(_batchStreamInUse && !_exception.get()) @@ -1075,12 +1077,10 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) #else copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif - - out->attachRemoteObserver(initConnectionInfo(), _endpoint, - static_cast<Int>(_batchStream.b.size() - headerSize - 4)); - _batchStream.swap(*out->os()); + out->attachRemoteObserver(initConnectionInfo(), _endpoint, 0); + // // Send the batch stream. // @@ -1109,7 +1109,7 @@ Ice::ConnectionI::flushBatchRequests(BatchOutgoing* out) } AsyncStatus -Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) +Ice::ConnectionI::flushAsyncBatchRequests(const OutgoingAsyncBasePtr& outAsync) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); while(_batchStreamInUse && !_exception.get()) @@ -1125,7 +1125,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) if(_batchRequestNum == 0) { AsyncStatus status = AsyncStatusSent; - if(outAsync->__sent()) + if(outAsync->sent()) { status = static_cast<AsyncStatus>(status | AsyncStatusInvokeSentCallback); } @@ -1141,11 +1141,9 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) #else copy(p, p + sizeof(Int), _batchStream.b.begin() + headerSize); #endif + _batchStream.swap(*outAsync->getOs()); - outAsync->__attachRemoteObserver(initConnectionInfo(), _endpoint, 0, - static_cast<Int>(_batchStream.b.size() - headerSize - 4)); - - _batchStream.swap(*outAsync->__getOs()); + outAsync->attachRemoteObserver(initConnectionInfo(), _endpoint, 0); // // Send the batch stream. @@ -1153,7 +1151,7 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) AsyncStatus status = AsyncStatusQueued; try { - OutgoingMessage message(outAsync, outAsync->__getOs(), _batchRequestCompress, 0); + OutgoingMessage message(outAsync, outAsync->getOs(), _batchRequestCompress, 0); status = sendMessage(message); } catch(const Ice::LocalException& ex) @@ -1163,6 +1161,11 @@ Ice::ConnectionI::flushAsyncBatchRequests(const BatchOutgoingAsyncPtr& outAsync) _exception->ice_throw(); } + if(status & AsyncStatusQueued) + { + outAsync->cancelable(this); // Notify the request that it's cancelable. + } + // // Reset the batch stream. // @@ -1276,9 +1279,14 @@ Ice::ConnectionI::getACM() } void -Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) +Ice::ConnectionI::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) + { + return; // The request has already been or will be shortly notified of the failure. + } + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { if(o->out == out) @@ -1302,16 +1310,15 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) // if(o == _sendStreams.begin()) { - o->timedOut(true); // true = adopt the stream. + o->canceled(true); // true = adopt the stream. } else { - o->timedOut(false); + o->canceled(false); _sendStreams.erase(o); } - InvocationTimeoutException ex(__FILE__, __LINE__); - out->finished(ex); + out->completed(ex); return; } } @@ -1321,8 +1328,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) { if(_requestsHint != _requests.end() && _requestsHint->second == o) { - InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); + o->completed(ex); _requests.erase(_requestsHint); _requestsHint = _requests.end(); } @@ -1332,8 +1338,7 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) { if(p->second == o) { - InvocationTimeoutException ex(__FILE__, __LINE__); - o->finished(ex); + o->completed(ex); assert(p != _requestsHint); _requests.erase(p); return; // We're done. @@ -1344,10 +1349,18 @@ Ice::ConnectionI::requestTimedOut(OutgoingMessageCallback* out) } void -Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +Ice::ConnectionI::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const LocalException& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + // + // NOTE: This isn't called from a thread pool thread. + // + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(*this); + if(_state >= StateClosed) + { + return; // The request has already been or will be shortly notified of the failure. + } + for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { if(o->outAsync.get() == outAsync.get()) @@ -1365,25 +1378,29 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou _asyncRequests.erase(o->requestId); } } - + // // If the request is being sent, don't remove it from the send streams, // it will be removed once the sending is finished. // if(o == _sendStreams.begin()) { - o->timedOut(true); // true = adopt the stream + o->canceled(true); // true = adopt the stream } else { - o->timedOut(false); + o->canceled(false); _sendStreams.erase(o); } - outAsync->__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + if(outAsync->completed(ex)) + { + sync.release(); + outAsync->invokeCompleted(); + } + return; } } - + OutgoingAsyncPtr o = OutgoingAsyncPtr::dynamicCast(outAsync); if(o) { @@ -1393,19 +1410,25 @@ Ice::ConnectionI::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& ou { _asyncRequests.erase(_asyncRequestsHint); _asyncRequestsHint = _asyncRequests.end(); - outAsync->__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } + return; } } - + for(map<Int, OutgoingAsyncPtr>::iterator p = _asyncRequests.begin(); p != _asyncRequests.end(); ++p) { if(p->second.get() == o.get()) { assert(p != _asyncRequestsHint); _asyncRequests.erase(p); - outAsync->__dispatchInvocationTimeout(_threadPool, this); - return; // We're done + if(outAsync->completed(ex)) + { + outAsync->invokeCompletedAsync(); + } + return; } } } @@ -1972,18 +1995,18 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) if(p->invokeSent) { - p->outAsync->__invokeSent(); + p->outAsync->invokeSent(); } if(p->receivedReply) { OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(p->outAsync); - if(outAsync->__finished()) + if(outAsync->completed()) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); } } #else - p->outAsync->__invokeSent(); + p->outAsync->invokeSent(); #endif } ++dispatchedCount; @@ -1995,7 +2018,7 @@ ConnectionI::dispatch(const StartCallbackPtr& startCB, const vector<OutgoingMess // if(outAsync) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); ++dispatchedCount; } @@ -2147,14 +2170,14 @@ Ice::ConnectionI::finish() { if(message->sent() && message->invokeSent) { - message->outAsync->__invokeSent(); + message->outAsync->invokeSent(); } if(message->receivedReply) { OutgoingAsyncPtr outAsync = OutgoingAsyncPtr::dynamicCast(message->outAsync); - if(outAsync->__finished()) + if(outAsync->completed()) { - outAsync->__invokeCompleted(); + outAsync->invokeCompleted(); } } _sendStreams.pop_front(); @@ -2164,7 +2187,7 @@ Ice::ConnectionI::finish() for(deque<OutgoingMessage>::iterator o = _sendStreams.begin(); o != _sendStreams.end(); ++o) { - o->finished(*_exception.get()); + o->completed(*_exception.get()); if(o->requestId) // Make sure finished isn't called twice. { if(o->out) @@ -2182,13 +2205,16 @@ Ice::ConnectionI::finish() for(map<Int, Outgoing*>::const_iterator p = _requests.begin(); p != _requests.end(); ++p) { - p->second->finished(*_exception.get()); + p->second->completed(*_exception.get()); } _requests.clear(); for(map<Int, OutgoingAsyncPtr>::const_iterator q = _asyncRequests.begin(); q != _asyncRequests.end(); ++q) { - q->second->__finished(*_exception.get()); + if(q->second->completed(*_exception.get())) + { + q->second->invokeCompleted(); + } } _asyncRequests.clear(); @@ -3481,7 +3507,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request if(p != _requests.end()) { - p->second->finished(stream); + p->second->completed(stream); if(p == _requestsHint) { @@ -3508,7 +3534,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request _asyncRequests.erase(q); } - stream.swap(*outAsync->__getIs()); + stream.swap(*outAsync->getIs()); #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) // @@ -3522,7 +3548,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request message->receivedReply = true; outAsync = 0; } - else if(outAsync->__finished()) + else if(outAsync->completed()) { ++dispatchCount; } @@ -3531,7 +3557,7 @@ Ice::ConnectionI::parseMessage(BasicStream& stream, Int& invokeNum, Int& request outAsync = 0; } #else - if(outAsync->__finished()) + if(outAsync->completed()) { ++dispatchCount; } diff --git a/cpp/src/Ice/ConnectionI.h b/cpp/src/Ice/ConnectionI.h index 32c474d20da..3ecec79a247 100644 --- a/cpp/src/Ice/ConnectionI.h +++ b/cpp/src/Ice/ConnectionI.h @@ -30,6 +30,7 @@ #include <Ice/TraceLevelsF.h> #include <Ice/OutgoingAsyncF.h> #include <Ice/EventHandler.h> +#include <Ice/RequestHandler.h> #include <Ice/ResponseHandler.h> #include <Ice/Dispatcher.h> #include <Ice/ObserverHelper.h> @@ -43,8 +44,7 @@ namespace IceInternal { class Outgoing; -class BatchOutgoing; -class OutgoingMessageCallback; +class OutgoingBase; } @@ -56,6 +56,7 @@ class LocalException; class ConnectionI : public Connection, public IceInternal::EventHandler, public IceInternal::ResponseHandler, + public IceInternal::CancellationHandler, public IceUtil::Monitor<IceUtil::Mutex> { class Observer : public IceInternal::ObserverHelperT<Ice::Instrumentation::ConnectionObserver> @@ -89,7 +90,7 @@ public: { } - OutgoingMessage(IceInternal::OutgoingMessageCallback* o, IceInternal::BasicStream* str, bool comp, int rid) : + OutgoingMessage(IceInternal::OutgoingBase* o, IceInternal::BasicStream* str, bool comp, int rid) : stream(str), out(o), compress(comp), requestId(rid), adopted(false) #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) , isSent(false), invokeSent(false), receivedReply(false) @@ -97,7 +98,7 @@ public: { } - OutgoingMessage(const IceInternal::OutgoingAsyncMessageCallbackPtr& o, IceInternal::BasicStream* str, + OutgoingMessage(const IceInternal::OutgoingAsyncBasePtr& o, IceInternal::BasicStream* str, bool comp, int rid) : stream(str), out(0), outAsync(o), compress(comp), requestId(rid), adopted(false) #if defined(ICE_USE_IOCP) || defined(ICE_OS_WINRT) @@ -107,13 +108,13 @@ public: } void adopt(IceInternal::BasicStream*); - void timedOut(bool); + void canceled(bool); bool sent(); - void finished(const Ice::LocalException&); + void completed(const Ice::LocalException&); IceInternal::BasicStream* stream; - IceInternal::OutgoingMessageCallback* out; - IceInternal::OutgoingAsyncMessageCallbackPtr outAsync; + IceInternal::OutgoingBase* out; + IceInternal::OutgoingAsyncBasePtr outAsync; bool compress; int requestId; bool adopted; @@ -178,8 +179,8 @@ public: virtual void end_flushBatchRequests(const AsyncResultPtr&); - bool flushBatchRequests(IceInternal::BatchOutgoing*); - IceInternal::AsyncStatus flushAsyncBatchRequests(const IceInternal::BatchOutgoingAsyncPtr&); + bool flushBatchRequests(IceInternal::OutgoingBase*); + IceInternal::AsyncStatus flushAsyncBatchRequests(const IceInternal::OutgoingAsyncBasePtr&); virtual void setCallback(const ConnectionCallbackPtr&); virtual void setACM(const IceUtil::Optional<int>&, @@ -187,8 +188,8 @@ public: const IceUtil::Optional<ACMHeartbeat>&); virtual ACM getACM(); - void requestTimedOut(IceInternal::OutgoingMessageCallback*); - void asyncRequestTimedOut(const IceInternal::OutgoingAsyncMessageCallbackPtr&); + virtual void requestCanceled(IceInternal::OutgoingBase*, const LocalException&); + virtual void asyncRequestCanceled(const IceInternal::OutgoingAsyncBasePtr&, const LocalException&); virtual void sendResponse(Int, IceInternal::BasicStream*, Byte); virtual void sendNoResponse(); diff --git a/cpp/src/Ice/ConnectionRequestHandler.cpp b/cpp/src/Ice/ConnectionRequestHandler.cpp index 43dda1b88ed..a94d3e7180a 100644 --- a/cpp/src/Ice/ConnectionRequestHandler.cpp +++ b/cpp/src/Ice/ConnectionRequestHandler.cpp @@ -91,27 +91,27 @@ ConnectionRequestHandler::abortBatchRequest() } bool -ConnectionRequestHandler::sendRequest(OutgoingMessageCallback* out) +ConnectionRequestHandler::sendRequest(OutgoingBase* out) { return out->send(_connection, _compress, _response) && !_response; // Finished if sent and no response } AsyncStatus -ConnectionRequestHandler::sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr& out) +ConnectionRequestHandler::sendAsyncRequest(const OutgoingAsyncBasePtr& out) { - return out->__send(_connection, _compress, _response); + return out->send(_connection, _compress, _response); } void -ConnectionRequestHandler::requestTimedOut(OutgoingMessageCallback* out) +ConnectionRequestHandler::requestCanceled(OutgoingBase* out, const Ice::LocalException& ex) { - _connection->requestTimedOut(out); + _connection->requestCanceled(out, ex); } void -ConnectionRequestHandler::asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr& outAsync) +ConnectionRequestHandler::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException& ex) { - _connection->asyncRequestTimedOut(outAsync); + _connection->asyncRequestCanceled(outAsync, ex); } Ice::ConnectionIPtr diff --git a/cpp/src/Ice/ConnectionRequestHandler.h b/cpp/src/Ice/ConnectionRequestHandler.h index 5ab5a4c9ea7..211e8f02819 100644 --- a/cpp/src/Ice/ConnectionRequestHandler.h +++ b/cpp/src/Ice/ConnectionRequestHandler.h @@ -31,11 +31,11 @@ public: virtual void finishBatchRequest(BasicStream*); virtual void abortBatchRequest(); - virtual bool sendRequest(OutgoingMessageCallback*); - virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&); + virtual bool sendRequest(OutgoingBase*); + virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&); - virtual void requestTimedOut(OutgoingMessageCallback*); - virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&); + virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); + virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); virtual Ice::ConnectionIPtr getConnection(); virtual Ice::ConnectionIPtr waitForConnection(); diff --git a/cpp/src/Ice/Exception.cpp b/cpp/src/Ice/Exception.cpp index a8bb8c6f81a..e5a3aed5b9f 100644 --- a/cpp/src/Ice/Exception.cpp +++ b/cpp/src/Ice/Exception.cpp @@ -502,6 +502,13 @@ Ice::InvocationTimeoutException::ice_print(ostream& out) const } void +Ice::InvocationCanceledException::ice_print(ostream& out) const +{ + Exception::ice_print(out); + out << ":\ninvocation canceled"; +} + +void Ice::ProtocolException::ice_print(ostream& out) const { Exception::ice_print(out); diff --git a/cpp/src/Ice/Makefile b/cpp/src/Ice/Makefile index 1398ab5d07a..f44e0c977a8 100644 --- a/cpp/src/Ice/Makefile +++ b/cpp/src/Ice/Makefile @@ -57,6 +57,7 @@ SLICE_OBJS = BuiltinSequences.o \ OBJS = Acceptor.o \ ACM.o \ Application.o \ + AsyncResult.o \ Base64.o \ BasicStream.o \ Buffer.o \ diff --git a/cpp/src/Ice/Makefile.mak b/cpp/src/Ice/Makefile.mak index 75cf8c26a68..fcc4d3eb3e9 100644 --- a/cpp/src/Ice/Makefile.mak +++ b/cpp/src/Ice/Makefile.mak @@ -59,6 +59,7 @@ WINDOWS_OBJS = .\DLLMain.obj OBJS = .\Acceptor.obj \ .\ACM.obj \ .\Application.obj \ + .\AsyncResult.obj \ .\Base64.obj \ .\BasicStream.obj \ .\Buffer.obj \ diff --git a/cpp/src/Ice/ObjectAdapterFactory.cpp b/cpp/src/Ice/ObjectAdapterFactory.cpp index ca7938bcc55..35ca16ea03e 100644 --- a/cpp/src/Ice/ObjectAdapterFactory.cpp +++ b/cpp/src/Ice/ObjectAdapterFactory.cpp @@ -211,7 +211,7 @@ IceInternal::ObjectAdapterFactory::removeObjectAdapter(const ObjectAdapterPtr& a } void -IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) const +IceInternal::ObjectAdapterFactory::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync) const { list<ObjectAdapterIPtr> adapters; { diff --git a/cpp/src/Ice/ObjectAdapterFactory.h b/cpp/src/Ice/ObjectAdapterFactory.h index 7faa8ef73f4..37b3853497c 100644 --- a/cpp/src/Ice/ObjectAdapterFactory.h +++ b/cpp/src/Ice/ObjectAdapterFactory.h @@ -33,7 +33,7 @@ public: ::Ice::ObjectAdapterPtr createObjectAdapter(const std::string&, const Ice::RouterPrx&); ::Ice::ObjectAdapterPtr findObjectAdapter(const ::Ice::ObjectPrx&); void removeObjectAdapter(const ::Ice::ObjectAdapterPtr&); - void flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr&) const; + void flushAsyncBatchRequests(const CommunicatorFlushBatchPtr&) const; private: diff --git a/cpp/src/Ice/ObjectAdapterI.cpp b/cpp/src/Ice/ObjectAdapterI.cpp index 422cdb8fd01..c9a05c091ca 100644 --- a/cpp/src/Ice/ObjectAdapterI.cpp +++ b/cpp/src/Ice/ObjectAdapterI.cpp @@ -743,7 +743,7 @@ Ice::ObjectAdapterI::isLocal(const ObjectPrx& proxy) const } void -Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorBatchOutgoingAsyncPtr& outAsync) +Ice::ObjectAdapterI::flushAsyncBatchRequests(const CommunicatorFlushBatchPtr& outAsync) { vector<IncomingConnectionFactoryPtr> f; { diff --git a/cpp/src/Ice/ObjectAdapterI.h b/cpp/src/Ice/ObjectAdapterI.h index 3897c0ef557..30f499d96c0 100644 --- a/cpp/src/Ice/ObjectAdapterI.h +++ b/cpp/src/Ice/ObjectAdapterI.h @@ -87,7 +87,7 @@ public: bool isLocal(const ObjectPrx&) const; - void flushAsyncBatchRequests(const IceInternal::CommunicatorBatchOutgoingAsyncPtr&); + void flushAsyncBatchRequests(const IceInternal::CommunicatorFlushBatchPtr&); void updateConnectionObservers(); void updateThreadObservers(); diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp index 66509a2bedc..4815b9796fb 100644 --- a/cpp/src/Ice/Outgoing.cpp +++ b/cpp/src/Ice/Outgoing.cpp @@ -7,36 +7,44 @@ // // ********************************************************************** +#include <IceUtil/DisableWarnings.h> #include <Ice/Outgoing.h> -#include <Ice/Object.h> -#include <Ice/CollocatedRequestHandler.h> #include <Ice/ConnectionI.h> +#include <Ice/CollocatedRequestHandler.h> #include <Ice/Reference.h> -#include <Ice/Endpoint.h> -#include <Ice/LocalException.h> -#include <Ice/Protocol.h> #include <Ice/Instance.h> +#include <Ice/LocalException.h> #include <Ice/ReplyStatus.h> -#include <Ice/ProxyFactory.h> +#include <Ice/ImplicitContextI.h> using namespace std; using namespace Ice; using namespace Ice::Instrumentation; using namespace IceInternal; -IceInternal::Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, - const Context* context) : +OutgoingBase::OutgoingBase(Instance* instance, const string& operation) : + _os(instance, Ice::currentProtocolEncoding), _sent(false) +{ +} + +Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, const Context* context) : + OutgoingBase(proxy->__reference()->getInstance().get(), operation), _proxy(proxy), _mode(mode), - _observer(proxy, operation, context), _state(StateUnsent), _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())), - _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _sent(false) + _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding) { checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol())); + _observer.attach(proxy, operation, context); + + int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); + if(invocationTimeout > 0) + { + _invocationTimeoutDeadline = IceUtil::Time::now() + IceUtil::Time::milliSeconds(invocationTimeout); + } + switch(_proxy->__reference()->getMode()) { case Reference::ModeTwoway: @@ -129,7 +137,66 @@ Outgoing::~Outgoing() } bool -IceInternal::Outgoing::invoke() +Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response) +{ + return connection->sendRequest(this, compress, response); +} + +void +Outgoing::invokeCollocated(CollocatedRequestHandler* handler) +{ + handler->invokeRequest(this); +} + +void +Outgoing::sent() +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + if(_proxy->__reference()->getMode() != Reference::ModeTwoway) + { + _childObserver.detach(); + _state = StateOK; + } + _sent = true; + _monitor.notify(); + + // + // NOTE: At this point the stack allocated Outgoing object can be destroyed + // since the notify() on the monitor will release the thread waiting on the + // synchronous Ice call. + // +} + +void +Outgoing::completed(const Exception& ex) +{ + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + //assert(_state <= StateInProgress); + if(_state > StateInProgress) + { + // + // Response was already received but message + // didn't get removed first from the connection + // send message queue so it's possible we can be + // notified of failures. In this case, ignore the + // failure and assume the outgoing has been sent. + // + assert(_state != StateFailed); + _sent = true; + _monitor.notify(); + return; + } + + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + + _state = StateFailed; + _exception.reset(ex.ice_clone()); + _monitor.notify(); +} + +bool +Outgoing::invoke() { assert(_state == StateUnsent); @@ -146,6 +213,11 @@ IceInternal::Outgoing::invoke() { try { + if(_invocationTimeoutDeadline != IceUtil::Time() && _invocationTimeoutDeadline <= IceUtil::Time::now()) + { + throw Ice::InvocationTimeoutException(__FILE__, __LINE__); + } + _state = StateInProgress; _exception.reset(0); _sent = false; @@ -164,19 +236,18 @@ IceInternal::Outgoing::invoke() // // If the handler says it's not finished, we wait until we're done. // - int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); - if(invocationTimeout > 0) + if(_invocationTimeoutDeadline != IceUtil::Time()) { IceUtil::Time now = IceUtil::Time::now(); - IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(invocationTimeout); + timedOut = now >= _invocationTimeoutDeadline; while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut) { - _monitor.timedWait(deadline - now); + _monitor.timedWait(_invocationTimeoutDeadline - now); if((_state == StateInProgress || !_sent) && _state != StateFailed) { now = IceUtil::Time::now(); - timedOut = now >= deadline; + timedOut = now >= _invocationTimeoutDeadline; } } } @@ -191,15 +262,15 @@ IceInternal::Outgoing::invoke() if(timedOut) { - _handler->requestTimedOut(this); + _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__)); // // Wait for the exception to propagate. It's possible the request handler ignores - // the timeout if there was a failure shortly before requestTimedOut got called. + // the timeout if there was a failure shortly before requestCanceled got called. // In this case, the exception should be set on the Outgoing. // IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(!_exception.get()) + while(_state == StateInProgress) { _monitor.wait(); } @@ -223,11 +294,23 @@ IceInternal::Outgoing::invoke() { try { - int interval = _proxy->__handleException(ex, _handler, _mode, _sent, cnt); - _observer.retried(); // Invocation is being retried. - if(interval > 0) + IceUtil::Time interval; + interval = IceUtil::Time::milliSeconds(_proxy->__handleException(ex, _handler, _mode, _sent, cnt)); + if(interval > IceUtil::Time()) + { + if(_invocationTimeoutDeadline != IceUtil::Time()) + { + IceUtil::Time deadline = _invocationTimeoutDeadline - IceUtil::Time::now(); + if(deadline < interval) + { + interval = deadline; + } + } + IceUtil::ThreadControl::sleep(interval); + } + if(_invocationTimeoutDeadline == IceUtil::Time() || _invocationTimeoutDeadline > IceUtil::Time::now()) { - IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval)); + _observer.retried(); } } catch(const Ice::Exception& ex) @@ -243,7 +326,7 @@ IceInternal::Outgoing::invoke() } void -IceInternal::Outgoing::abort(const LocalException& ex) +Outgoing::abort(const LocalException& ex) { assert(_state == StateUnsent); @@ -261,67 +344,8 @@ IceInternal::Outgoing::abort(const LocalException& ex) ex.ice_throw(); } -bool -IceInternal::Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response) -{ - return connection->sendRequest(this, compress, response); -} - -void -IceInternal::Outgoing::invokeCollocated(CollocatedRequestHandler* handler) -{ - handler->invokeRequest(this); -} - -void -IceInternal::Outgoing::sent() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(_proxy->__reference()->getMode() != Reference::ModeTwoway) - { - _childObserver.detach(); - _state = StateOK; - } - _sent = true; - _monitor.notify(); - - // - // NOTE: At this point the stack allocated Outgoing object can be destroyed - // since the notify() on the monitor will release the thread waiting on the - // synchronous Ice call. - // -} - void -IceInternal::Outgoing::finished(const Exception& ex) -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - //assert(_state <= StateInProgress); - if(_state > StateInProgress) - { - // - // Response was already received but message - // didn't get removed first from the connection - // send message queue so it's possible we can be - // notified of failures. In this case, ignore the - // failure and assume the outgoing has been sent. - // - assert(_state != StateFailed); - _sent = true; - _monitor.notify(); - return; - } - - _childObserver.failed(ex.ice_name()); - _childObserver.detach(); - - _state = StateFailed; - _exception.reset(ex.ice_clone()); - _monitor.notify(); -} - -void -IceInternal::Outgoing::finished(BasicStream& is) +Outgoing::completed(BasicStream& is) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -482,7 +506,7 @@ IceInternal::Outgoing::finished(BasicStream& is) } void -IceInternal::Outgoing::throwUserException() +Outgoing::throwUserException() { try { @@ -496,27 +520,22 @@ IceInternal::Outgoing::throwUserException() } } -IceInternal::BatchOutgoing::BatchOutgoing(IceProxy::Ice::Object* proxy, const string& name) : - _proxy(proxy), - _connection(0), - _sent(false), - _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding), - _observer(proxy, name, 0) +FlushBatch::FlushBatch(IceProxy::Ice::Object* proxy, const string& operation) : + OutgoingBase(proxy->__reference()->getInstance().get(), operation), _proxy(proxy), _connection(0) { checkSupportedProtocol(proxy->__reference()->getProtocol()); + + _observer.attach(proxy->__reference()->getInstance().get(), operation); } -IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, const string& name) : - _proxy(0), - _connection(connection), - _sent(false), - _os(instance, Ice::currentProtocolEncoding), - _observer(instance, name) +FlushBatch::FlushBatch(ConnectionI* connection, Instance* instance, const string& operation) : + OutgoingBase(instance, operation), _proxy(0), _connection(connection) { + _observer.attach(instance, operation); } void -IceInternal::BatchOutgoing::invoke() +FlushBatch::invoke() { assert(_proxy || _connection); @@ -577,7 +596,8 @@ IceInternal::BatchOutgoing::invoke() if(timedOut) { - handler->requestTimedOut(this); + Ice::InvocationTimeoutException ex(__FILE__, __LINE__); + handler->requestCanceled(this, ex); // // Wait for the exception to propagate. It's possible the request handler ignores @@ -614,19 +634,19 @@ IceInternal::BatchOutgoing::invoke() } bool -IceInternal::BatchOutgoing::send(const Ice::ConnectionIPtr& connection, bool, bool) +FlushBatch::send(const Ice::ConnectionIPtr& connection, bool, bool) { return connection->flushBatchRequests(this); } void -IceInternal::BatchOutgoing::invokeCollocated(CollocatedRequestHandler* handler) +FlushBatch::invokeCollocated(CollocatedRequestHandler* handler) { handler->invokeBatchRequests(this); } void -IceInternal::BatchOutgoing::sent() +FlushBatch::sent() { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); _childObserver.detach(); @@ -635,14 +655,14 @@ IceInternal::BatchOutgoing::sent() _monitor.notify(); // - // NOTE: At this point the stack allocated BatchOutgoing object + // NOTE: At this point the stack allocated FlushBatch object // can be destroyed since the notify() on the monitor will release // the thread waiting on the synchronous Ice call. // } void -IceInternal::BatchOutgoing::finished(const Ice::Exception& ex) +FlushBatch::completed(const Ice::Exception& ex) { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); _childObserver.failed(ex.ice_name()); diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index fb57965b60f..b348b1c8cc8 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -9,18 +9,11 @@ #include <IceUtil/DisableWarnings.h> #include <Ice/OutgoingAsync.h> -#include <Ice/Object.h> #include <Ice/ConnectionI.h> #include <Ice/CollocatedRequestHandler.h> #include <Ice/Reference.h> #include <Ice/Instance.h> #include <Ice/LocalException.h> -#include <Ice/Properties.h> -#include <Ice/LoggerUtil.h> -#include <Ice/LocatorInfo.h> -#include <Ice/ProxyFactory.h> -#include <Ice/RouterInfo.h> -#include <Ice/Protocol.h> #include <Ice/ReplyStatus.h> #include <Ice/ImplicitContextI.h> #include <Ice/ThreadPool.h> @@ -30,431 +23,289 @@ using namespace std; using namespace Ice; using namespace IceInternal; -IceUtil::Shared* Ice::upCast(AsyncResult* p) { return p; } - -IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; } +IceUtil::Shared* IceInternal::upCast(OutgoingAsyncBase* p) { return p; } +IceUtil::Shared* IceInternal::upCast(ProxyOutgoingAsyncBase* p) { return p; } IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; } -IceUtil::Shared* IceInternal::upCast(GetConnectionOutgoingAsync* p) { return p; } - -const unsigned char Ice::AsyncResult::OK = 0x1; -const unsigned char Ice::AsyncResult::Done = 0x2; -const unsigned char Ice::AsyncResult::Sent = 0x4; -const unsigned char Ice::AsyncResult::EndCalled = 0x8; - -namespace -{ - -class AsynchronousException : public DispatchWorkItem -{ -public: - - AsynchronousException(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result, - const Ice::Exception& ex) : - DispatchWorkItem(connection), _result(result), _exception(ex.ice_clone()) - { - } - - virtual void - run() - { - _result->__invokeException(*_exception.get()); - } - -private: - - const Ice::AsyncResultPtr _result; - const IceUtil::UniquePtr<Ice::Exception> _exception; -}; - -class AsynchronousSent : public DispatchWorkItem -{ -public: - - AsynchronousSent(const Ice::ConnectionPtr& connection, const Ice::AsyncResultPtr& result) : - DispatchWorkItem(connection), _result(result) - { - } - - virtual void - run() - { - _result->__invokeSent(); - } +IceUtil::Shared* IceInternal::upCast(CommunicatorFlushBatch* p) { return p; } -private: - - const Ice::AsyncResultPtr _result; -}; - -}; - -Ice::AsyncResult::AsyncResult(const CommunicatorPtr& communicator, - const IceInternal::InstancePtr& instance, - const string& op, - const CallbackBasePtr& del, - const LocalObjectPtr& cookie) : - _communicator(communicator), - _instance(instance), - _operation(op), - _callback(del), - _cookie(cookie), - _is(instance.get(), Ice::currentProtocolEncoding), - _os(instance.get(), Ice::currentProtocolEncoding), - _state(0), - _sentSynchronously(false), - _exception(0) +bool +OutgoingAsyncBase::sent() { - if(!_callback) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__); - } - const_cast<CallbackBasePtr&>(_callback) = _callback->verify(_cookie); + return sent(true); } -Ice::AsyncResult::~AsyncResult() +bool +OutgoingAsyncBase::completed(const Exception& ex) { + return finished(ex); } -Int -Ice::AsyncResult::getHash() const +OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + AsyncResult(communicator, instance, operation, delegate, cookie), + _os(instance.get(), Ice::currentProtocolEncoding) { - return static_cast<Int>(reinterpret_cast<Long>(this) >> 4); } bool -Ice::AsyncResult::isCompleted() const +OutgoingAsyncBase::sent(bool done) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - return _state & Done; -} - -void -Ice::AsyncResult::waitForCompleted() -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(!(_state & Done)) + if(done) { - _monitor.wait(); + _childObserver.detach(); } + return AsyncResult::sent(done); } bool -Ice::AsyncResult::isSent() const -{ - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - return _state & Sent; -} - -void -Ice::AsyncResult::waitForSent() +OutgoingAsyncBase::finished(const Exception& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - while(!(_state & Sent) && !_exception.get()) + if(_childObserver) { - _monitor.wait(); + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); } + return AsyncResult::finished(ex); } -void -Ice::AsyncResult::throwLocalException() const +Ice::ObjectPrx +ProxyOutgoingAsyncBase::getProxy() const { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(_exception.get()) - { - _exception.get()->ice_throw(); - } + return _proxy; } bool -Ice::AsyncResult::__wait() +ProxyOutgoingAsyncBase::sent() { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(_state & EndCalled) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once"); - } - _state |= EndCalled; - while(!(_state & Done)) - { - _monitor.wait(); - } - if(_exception.get()) - { - _exception.get()->ice_throw(); - } - return _state & OK; + return sent(!_proxy->ice_isTwoway()); // Done if it's not a two-way proxy (no response expected). } -void -Ice::AsyncResult::__throwUserException() +bool +ProxyOutgoingAsyncBase::completed(const Exception& exc) { - try - { - _is.startReadEncaps(); - _is.throwException(); - } - catch(const Ice::UserException&) + if(_childObserver) { - _is.endReadEncaps(); - throw; + _childObserver.failed(exc.ice_name()); + _childObserver.detach(); } -} -void -Ice::AsyncResult::__invokeSent() -{ // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. + // NOTE: at this point, synchronization isn't needed, no other threads should be + // calling on the callback. // - - if(_callback) + try { - try - { - AsyncResultPtr self(this); - _callback->sent(self); - } - catch(const std::exception& ex) - { - __warning(ex); - } - catch(...) - { - __warning(); - } + _instance->retryQueue()->add(this, handleException(exc)); + return false; } - - if(_observer) + catch(const Exception& ex) { - Ice::ObjectPrx proxy = getProxy(); - if(!proxy || !proxy->ice_isTwoway()) - { - _observer.detach(); - } + return finished(ex); // No retries, we're done } } void -Ice::AsyncResult::__invokeSentAsync() +ProxyOutgoingAsyncBase::retry() { - // - // This is called when it's not safe to call the sent callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - try - { - _instance->clientThreadPool()->dispatch(new AsynchronousSent(_cachedConnection, this)); - } - catch(const Ice::CommunicatorDestroyedException&) - { - } + invokeImpl(false); } void -Ice::AsyncResult::__invokeException(const Ice::Exception& ex) +ProxyOutgoingAsyncBase::abort(const Ice::Exception& ex) { + assert(!_childObserver); + + if(finished(ex)) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _state |= Done; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - _exception.reset(ex.ice_clone()); - _monitor.notifyAll(); + invokeCompletedAsync(); + } + else if(dynamic_cast<const Ice::CommunicatorDestroyedException*>(&ex)) + { + // + // If it's a communicator destroyed exception, don't swallow + // it but instead notify the user thread. Even if no callback + // was provided. + // + ex.ice_throw(); } - - __invokeCompleted(); } -void -Ice::AsyncResult::__invokeExceptionAsync(const Ice::Exception& ex) +ProxyOutgoingAsyncBase::ProxyOutgoingAsyncBase(const ObjectPrx& prx, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + OutgoingAsyncBase(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), + _proxy(prx), + _mode(Normal), + _cnt(0), + _sent(false) { - // - // This is called when it's not safe to call the exception callback synchronously - // from this thread. Instead the exception callback is called asynchronously from - // the client thread pool. - // - // CommunicatorDestroyedException is the only exception that can propagate directly - // from this method. - // - _instance->clientThreadPool()->dispatch(new AsynchronousException(_cachedConnection, this, ex)); } void -Ice::AsyncResult::__invokeCompleted() +ProxyOutgoingAsyncBase::invokeImpl(bool userThread) { - // - // Note: no need to change the _state here, specializations are responsible for - // changing the state. - // - - if(_callback) + try { - try + if(userThread) { - AsyncResultPtr self(this); - _callback->completed(self); + int invocationTimeout = _proxy->__reference()->getInvocationTimeout(); + if(invocationTimeout > 0) + { + _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); + } } - catch(const std::exception& ex) + else { - __warning(ex); + checkCanceled(); // Cancellation exception aren't retriable + _observer.retried(); } - catch(...) + + while(true) { - __warning(); + try + { + _sent = false; + _handler = _proxy->__getRequestHandler(); + AsyncStatus status = _handler->sendAsyncRequest(this); + if(status & AsyncStatusSent) + { + if(userThread) + { + _sentSynchronously = true; + if(status & AsyncStatusInvokeSentCallback) + { + invokeSent(); // Call the sent callback from the user thread. + } + } + else + { + if(status & AsyncStatusInvokeSentCallback) + { + invokeSentAsync(); // Call the sent callback from a client thread pool thread. + } + } + } + return; // We're done! + } + catch(const RetryException& ex) + { + handleRetryException(ex); + } + catch(const Exception& ex) + { + if(_childObserver) + { + _childObserver.failed(ex.ice_name()); + _childObserver.detach(); + } + int interval = handleException(ex); + if(interval > 0) + { + _instance->retryQueue()->add(this, interval); + return; + } + else + { + checkCanceled(); // Cancellation exception aren't retriable + _observer.retried(); + } + } } } - - _observer.detach(); -} - -void -Ice::AsyncResult::runTimerTask() // Implementation of TimerTask::runTimerTask() -{ - RequestHandlerPtr handler; + catch(const Exception& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - swap(handler, _timeoutRequestHandler); - } - - if(handler) - { - handler->asyncRequestTimedOut(OutgoingAsyncMessageCallbackPtr::dynamicCast(this)); + // + // If called from the user thread we re-throw, the exception + // will be catch by the caller and abort() will be called. + // + if(userThread) + { + throw; + } + else if(finished(ex)) // No retries, we're done + { + invokeCompletedAsync(); + } } } - -void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation) +bool +ProxyOutgoingAsyncBase::sent(bool done) { - __check(r, operation); - if(r->getProxy().get() != prx) + _sent = true; + if(done) { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation + - " does not match proxy that was used to call corresponding begin_" + - operation + " method"); + if(_proxy->__reference()->getInvocationTimeout() > 0) + { + _instance->timer()->cancel(this); + } } + return OutgoingAsyncBase::sent(done); } -void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation) +bool +ProxyOutgoingAsyncBase::finished(const Exception& ex) { - __check(r, operation); - if(r->getCommunicator().get() != com) + if(_proxy->__reference()->getInvocationTimeout() > 0) { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation + - " does not match communicator that was used to call corresponding " + - "begin_" + operation + " method"); + _instance->timer()->cancel(this); } + return OutgoingAsyncBase::finished(ex); } -void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation) +bool +ProxyOutgoingAsyncBase::finished(bool ok) { - __check(r, operation); - if(r->getConnection().get() != con) + if(_proxy->__reference()->getInvocationTimeout() > 0) { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation + - " does not match connection that was used to call corresponding " + - "begin_" + operation + " method"); + _instance->timer()->cancel(this); } + return AsyncResult::finished(ok); } void -Ice::AsyncResult::__check(const AsyncResultPtr& r, const string& operation) +ProxyOutgoingAsyncBase::handleRetryException(const RetryException& exc) { - if(!r) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null"); - } - else if(&r->_operation != &operation) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation + - " method: " + r->_operation); - } + _proxy->__setRequestHandler(_handler, 0); // Clear request handler and always retry. } - -void -Ice::AsyncResult::__warning(const std::exception& exc) const +int +ProxyOutgoingAsyncBase::handleException(const Exception& exc) { - if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) - { - Warning out(_instance->initializationData().logger); - const Exception* ex = dynamic_cast<const Exception*>(&exc); - if(ex) - { - out << "Ice::Exception raised by AMI callback:\n" << *ex; - } - else - { - out << "std::exception raised by AMI callback:\n" << exc.what(); - } - } + return _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); } void -Ice::AsyncResult::__warning() const +ProxyOutgoingAsyncBase::runTimerTask() { - if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0) + try { - Warning out(_instance->initializationData().logger); - out << "unknown exception raised by AMI callback"; + cancel(InvocationTimeoutException(__FILE__, __LINE__)); } -} - -void -IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const ThreadPoolPtr& threadPool, - const Ice::ConnectionPtr& connection) -{ - class InvocationTimeoutCall : public DispatchWorkItem + catch(const CommunicatorDestroyedException&) { - public: - - InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) : - DispatchWorkItem(connection), _outAsync(outAsync) - { - } - - virtual void - run() - { - InvocationTimeoutException ex(__FILE__, __LINE__); - _outAsync->__finished(ex); - } - - private: - - const OutgoingAsyncMessageCallbackPtr _outAsync; - }; - threadPool->dispatch(new InvocationTimeoutCall(this, connection)); + } } -IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), - _proxy(prx), +OutgoingAsync::OutgoingAsync(const ObjectPrx& prx, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + ProxyOutgoingAsyncBase(prx, operation, delegate, cookie), _encoding(getCompatibleEncoding(prx->__reference()->getEncoding())) { } void -IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context) +OutgoingAsync::prepare(const string& operation, OperationMode mode, const Context* context) { - _handler = 0; - _cnt = 0; - _sent = false; - _mode = mode; - _sentSynchronously = false; - checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); + _mode = mode; _observer.attach(_proxy.get(), operation, context); switch(_proxy->__reference()->getMode()) @@ -482,7 +333,7 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod { _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. } - catch(const Ice::LocalException& ex) + catch(const LocalException& ex) { _observer.failed(ex.ice_name()); _proxy->__setRequestHandler(_handler, 0); // Clear request handler @@ -541,109 +392,63 @@ IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMod } AsyncStatus -IceInternal::OutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool compress, bool response) +OutgoingAsync::send(const ConnectionIPtr& connection, bool compress, bool response) { _cachedConnection = connection; return connection->sendAsyncRequest(this, compress, response); } AsyncStatus -IceInternal::OutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler) +OutgoingAsync::invokeCollocated(CollocatedRequestHandler* handler) { return handler->invokeAsyncRequest(this); } -bool -IceInternal::OutgoingAsync::__sent() +void +OutgoingAsync::abort(const Exception& ex) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - - bool alreadySent = _state & Sent; // Expected in case of a retry. - _state |= Sent; - _sent = true; - - assert(!(_state & Done)); - if(!_proxy->ice_isTwoway()) + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) { - _childObserver.detach(); - if(!_callback || !_callback->hasSentCallback()) + if(_handler) { - _observer.detach(); - } - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; + // + // If we didn't finish a batch oneway or datagram request, we + // must notify the connection about that we give up ownership + // of the batch stream. + // + _handler->abortBatchRequest(); } - _state |= Done | OK; - //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization. } - _monitor.notifyAll(); - return !alreadySent && _callback && _callback->hasSentCallback(); -} - -void -IceInternal::OutgoingAsync::__invokeSent() -{ - ::Ice::AsyncResult::__invokeSent(); + + ProxyOutgoingAsyncBase::abort(ex); } void -IceInternal::OutgoingAsync::__finished(const Ice::Exception& exc) +OutgoingAsync::invoke() { + const Reference::Mode mode = _proxy->__reference()->getMode(); + if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(!(_state & Done)); - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - if(_timeoutRequestHandler) + if(_handler) { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; + _sentSynchronously = true; + _handler->finishBatchRequest(&_os); + finished(true); } + return; // Don't call sent/completed callback for batch AMI requests } // - // NOTE: at this point, synchronization isn't needed, no other threads should be - // calling on the callback. + // NOTE: invokeImpl doesn't throw so this can be called from the + // try block with the catch block calling abort() in case of an + // exception. // - try - { - handleException(exc); - } - catch(const Ice::Exception& ex) - { - __invokeException(ex); - } -} - -void -IceInternal::OutgoingAsync::__invokeExceptionAsync(const Ice::Exception& ex) -{ - if((_state & Done) == 0 && _handler) - { - // - // If we didn't finish a batch oneway or datagram request, we - // must notify the connection about that we give up ownership - // of the batch stream. - // - int mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _handler->abortBatchRequest(); - } - } - AsyncResult::__invokeExceptionAsync(ex); -} - -void -IceInternal::OutgoingAsync::__processRetry() -{ - __invoke(false); + invokeImpl(true); // userThread = true } bool -IceInternal::OutgoingAsync::__finished() +OutgoingAsync::completed() { // // NOTE: this method is called from ConnectionI.parseMessage @@ -652,25 +457,15 @@ IceInternal::OutgoingAsync::__finished() // assert(_proxy->ice_isTwoway()); // Can only be called for twoways. - Ice::Byte replyStatus; - try + if(_childObserver) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(!_exception.get() && !(_state & Done)); - assert(!_is.b.empty()); - - if(_childObserver) - { - _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4)); - } + _childObserver->reply(static_cast<Int>(_is.b.size() - headerSize - 4)); _childObserver.detach(); + } - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; - } - + Byte replyStatus; + try + { _is.read(replyStatus); switch(replyStatus) @@ -789,373 +584,197 @@ IceInternal::OutgoingAsync::__finished() } } - _state |= Done; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - if(replyStatus == replyOK) - { - _state |= OK; - } - _monitor.notifyAll(); - - if(!_callback) - { - _observer.detach(); - return false; - } - return true; + return finished(replyStatus == replyOK); } - catch(const LocalException& exc) + catch(const Exception& ex) { - // - // We don't call finished(exc) here because we don't want - // to invoke the completion callback. The completion - // callback is invoked by the connection is this method - // returns true. - // - try - { - handleException(exc); - return false; // Invocation will be retried. - } - catch(const Ice::Exception& ex) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _state |= Done; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - _exception.reset(ex.ice_clone()); - _monitor.notifyAll(); - - if(!_callback) - { - _observer.detach(); - return false; - } - return true; - } + return completed(ex); } } +ProxyFlushBatch::ProxyFlushBatch(const ObjectPrx& proxy, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + ProxyOutgoingAsyncBase(proxy, operation, delegate, cookie) +{ + _observer.attach(proxy.get(), operation, 0); +} + bool -IceInternal::OutgoingAsync::__invoke(bool userThread) +ProxyFlushBatch::sent() { - const Reference::Mode mode = _proxy->__reference()->getMode(); - if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram) - { - _state |= Done | OK; - _handler->finishBatchRequest(&_os); - _observer.detach(); - return true; - } + return ProxyOutgoingAsyncBase::sent(true); // Overriden because the flush is done even if using a two-way proxy. +} - while(true) - { - try - { - _sent = false; - _handler = _proxy->__getRequestHandler(); - AsyncStatus status = _handler->sendAsyncRequest(this); - if(status & AsyncStatusSent) - { - if(userThread) - { - _sentSynchronously = true; - if(status & AsyncStatusInvokeSentCallback) - { - __invokeSent(); // Call the sent callback from the user thread. - } - } - else - { - if(status & AsyncStatusInvokeSentCallback) - { - __invokeSentAsync(); // Call the sent callback from a client thread pool thread. - } - } - } +AsyncStatus +ProxyFlushBatch::send(const ConnectionIPtr& connection, bool, bool) +{ + _cachedConnection = connection; + return connection->flushAsyncBatchRequests(this); +} - if(mode == Reference::ModeTwoway || !(status & AsyncStatusSent)) - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(!(_state & Done)) - { - int invocationTimeout = _handler->getReference()->getInvocationTimeout(); - if(invocationTimeout > 0) - { - _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); - _timeoutRequestHandler = _handler; - } - } - } - } - catch(const RetryException&) - { - _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry. - continue; - } - catch(const Ice::Exception& ex) - { - handleException(ex); - } - break; - } - return _sentSynchronously; +AsyncStatus +ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler) +{ + return handler->invokeAsyncBatchRequests(this); } void -IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc) +ProxyFlushBatch::invoke() { - try - { - int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); - _observer.retried(); // Invocation is being retried. - - // - // Schedule the retry. Note that we always schedule the retry - // on the retry queue even if the invocation can be retried - // immediately. This is required because it might not be safe - // to retry from this thread (this is for instance called by - // finished(BasicStream) which is called with the connection - // locked. - // - _instance->retryQueue()->add(this, interval); - } - catch(const Ice::Exception& ex) - { - _observer.failed(ex.ice_name()); - throw; - } + checkSupportedProtocol(getCompatibleProtocol(_proxy->__reference()->getProtocol())); + invokeImpl(true); // userThread = true } -IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(communicator, instance, operation, delegate, cookie) +void +ProxyFlushBatch::handleRetryException(const RetryException& ex) { + _proxy->__setRequestHandler(_handler, 0); // Clear request handler + ex.get()->ice_throw(); // No retries, we want to notify the user of potentially lost batch requests } -AsyncStatus -IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool) +int +ProxyFlushBatch::handleException(const Exception& ex) { - _cachedConnection = connection; - return connection->flushAsyncBatchRequests(this); + _proxy->__setRequestHandler(_handler, 0); // Clear request handler + ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests + return 0; } -AsyncStatus -IceInternal::BatchOutgoingAsync::__invokeCollocated(CollocatedRequestHandler* handler) +ProxyGetConnection::ProxyGetConnection(const ObjectPrx& prx, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + ProxyOutgoingAsyncBase(prx, operation, delegate, cookie) { - return handler->invokeAsyncBatchRequests(this); + _observer.attach(prx.get(), operation, 0); } -bool -IceInternal::BatchOutgoingAsync::__sent() +AsyncStatus +ProxyGetConnection::send(const ConnectionIPtr& connection, bool, bool) { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - assert(!_exception.get()); - _state |= Done | OK | Sent; - //_os.resize(0); // Don't clear the buffer now, it's needed for collocation optimization. - _childObserver.detach(); - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; - } - _monitor.notifyAll(); - if(!_callback || !_callback->hasSentCallback()) + _cachedConnection = connection; + if(finished(true)) { - _observer.detach(); - return false; + invokeCompletedAsync(); } - return true; + return AsyncStatusSent; } -void -IceInternal::BatchOutgoingAsync::__invokeSent() +AsyncStatus +ProxyGetConnection::invokeCollocated(CollocatedRequestHandler*) { - ::Ice::AsyncResult::__invokeSent(); + if(finished(true)) + { + invokeCompletedAsync(); + } + return AsyncStatusSent; } void -IceInternal::BatchOutgoingAsync::__finished(const Ice::Exception& exc) +ProxyGetConnection::invoke() { - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _childObserver.failed(exc.ice_name()); - _childObserver.detach(); - if(_timeoutRequestHandler) - { - _instance->timer()->cancel(this); - _timeoutRequestHandler = 0; - } - } - __invokeException(exc); + invokeImpl(true); // userThread = true } -void -IceInternal::BatchOutgoingAsync::__processRetry() +ConnectionFlushBatch::ConnectionFlushBatch(const ConnectionIPtr& connection, + const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& delegate, + const LocalObjectPtr& cookie) : + OutgoingAsyncBase(communicator, instance, operation, delegate, cookie), _connection(connection) { - assert(false); // Retries are never scheduled + _observer.attach(instance.get(), operation); } -IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - BatchOutgoingAsync(proxy->ice_getCommunicator(), proxy->__reference()->getInstance(), operation, delegate, cookie), - _proxy(proxy) +ConnectionPtr +ConnectionFlushBatch::getConnection() const { - _observer.attach(proxy.get(), operation, 0); + return _connection; } void -IceInternal::ProxyBatchOutgoingAsync::__invoke() +ConnectionFlushBatch::invoke() { - checkSupportedProtocol(_proxy->__reference()->getProtocol()); - - RequestHandlerPtr handler; try { - handler = _proxy->__getRequestHandler(); - AsyncStatus status = handler->sendAsyncRequest(this); + AsyncStatus status = _connection->flushAsyncBatchRequests(this); if(status & AsyncStatusSent) { _sentSynchronously = true; if(status & AsyncStatusInvokeSentCallback) { - __invokeSent(); - } - } - else - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - if(!(_state & Done)) - { - int invocationTimeout = handler->getReference()->getInvocationTimeout(); - if(invocationTimeout > 0) - { - _instance->timer()->schedule(this, IceUtil::Time::milliSeconds(invocationTimeout)); - _timeoutRequestHandler = handler; - } + invokeSent(); } } } - catch(const RetryException&) - { - // - // Clear request handler but don't retry or throw. Retrying - // isn't useful, there were no batch requests associated with - // the proxy's request handler. - // - _proxy->__setRequestHandler(handler, 0); - } - catch(const Ice::Exception& ex) + catch(const Exception& ex) { - _observer.failed(ex.ice_name()); - _proxy->__setRequestHandler(handler, 0); // Clear request handler - throw; // Throw to notify the user that batch requests were potentially lost. - } -} - -IceInternal::ConnectionBatchOutgoingAsync::ConnectionBatchOutgoingAsync(const ConnectionIPtr& con, - const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - BatchOutgoingAsync(communicator, instance, operation, delegate, cookie), - _connection(con) -{ - _observer.attach(instance.get(), operation); -} - -void -IceInternal::ConnectionBatchOutgoingAsync::__invoke() -{ - AsyncStatus status = _connection->flushAsyncBatchRequests(this); - if(status & AsyncStatusSent) - { - _sentSynchronously = true; - if(status & AsyncStatusInvokeSentCallback) + if(completed(ex)) { - __invokeSent(); + invokeCompletedAsync(); } } } -Ice::ConnectionPtr -IceInternal::ConnectionBatchOutgoingAsync::getConnection() const +CommunicatorFlushBatch::CommunicatorFlushBatch(const CommunicatorPtr& communicator, + const InstancePtr& instance, + const string& operation, + const CallbackBasePtr& cb, + const LocalObjectPtr& cookie) : + AsyncResult(communicator, instance, operation, cb, cookie) { - return _connection; -} + _observer.attach(instance.get(), operation); -IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(const CommunicatorPtr& communicator, - const InstancePtr& instance, - const string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(communicator, instance, operation, delegate, cookie) -{ // // _useCount is initialized to 1 to prevent premature callbacks. // The caller must invoke ready() after all flush requests have // been initiated. // _useCount = 1; - - // - // Assume all connections are flushed synchronously. - // - _sentSynchronously = true; - - // - // Attach observer - // - _observer.attach(instance.get(), operation); } void -IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPtr& con) +CommunicatorFlushBatch::flushConnection(const ConnectionIPtr& con) { - class BatchOutgoingAsyncI : public BatchOutgoingAsync + class FlushBatch : public OutgoingAsyncBase { public: - - BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync, - const InstancePtr& instance, - InvocationObserver& observer) : - BatchOutgoingAsync(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), - _outAsync(outAsync), _observer(observer) + + FlushBatch(const CommunicatorFlushBatchPtr& outAsync, + const InstancePtr& instance, + InvocationObserver& observer) : + OutgoingAsyncBase(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), + _outAsync(outAsync), + _observer(observer) { } - virtual bool __sent() + virtual bool sent() { _childObserver.detach(); _outAsync->check(false); return false; } -#ifdef __SUNPRO_CC - using BatchOutgoingAsync::__sent; -#endif - - virtual void __finished(const Ice::Exception& ex) + virtual bool completed(const Exception& ex) { _childObserver.failed(ex.ice_name()); _childObserver.detach(); _outAsync->check(false); + return false; } - virtual void __attachRemoteObserver(const Ice::ConnectionInfoPtr& connection, const Ice::EndpointPtr& endpt, - Ice::Int requestId, Ice::Int sz) + private: + + virtual InvocationObserver& getObserver() { - _childObserver.attach(_observer.getRemoteObserver(connection, endpt, requestId, sz)); + return _observer; } - private: - - const CommunicatorBatchOutgoingAsyncPtr _outAsync; + const CommunicatorFlushBatchPtr _outAsync; InvocationObserver& _observer; }; @@ -1166,13 +785,9 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt try { - AsyncStatus status = con->flushAsyncBatchRequests(new BatchOutgoingAsyncI(this, _instance, _observer)); - if(!(status & AsyncStatusSent)) - { - _sentSynchronously = false; - } + con->flushAsyncBatchRequests(new FlushBatch(this, _instance, _observer)); } - catch(const Ice::LocalException&) + catch(const LocalException&) { check(false); throw; @@ -1180,19 +795,13 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt } void -IceInternal::CommunicatorBatchOutgoingAsync::ready() +CommunicatorFlushBatch::ready() { check(true); } void -IceInternal::CommunicatorBatchOutgoingAsync::__processRetry() -{ - assert(false); // Retries are never scheduled -} - -void -IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) +CommunicatorFlushBatch::check(bool userThread) { { IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); @@ -1201,255 +810,18 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) { return; } - _state |= Done | OK | Sent; - _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation - _monitor.notifyAll(); } - if(!_callback || !_callback->hasSentCallback()) + if(sent(true)) { - _observer.detach(); - } - else - { - // - // _sentSynchronously is immutable here. - // - if(!_sentSynchronously || !userThread) + if(userThread) { - __invokeSentAsync(); + _sentSynchronously = true; + invokeSent(); } else { - AsyncResult::__invokeSent(); - } - } -} - -IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& prx, - const std::string& operation, - const CallbackBasePtr& delegate, - const Ice::LocalObjectPtr& cookie) : - AsyncResult(prx->ice_getCommunicator(), prx->__reference()->getInstance(), operation, delegate, cookie), - _proxy(prx), - _cnt(0) -{ - _observer.attach(prx.get(), operation, 0); -} - -void -IceInternal::GetConnectionOutgoingAsync::__invoke() -{ - while(true) - { - try - { - _handler = _proxy->__getRequestHandler(); - _handler->sendAsyncRequest(this); - } - catch(const RetryException&) - { - _proxy->__setRequestHandler(_handler, 0); - } - catch(const Ice::Exception& ex) - { - handleException(ex); + invokeSentAsync(); } - break; - } -} - -AsyncStatus -IceInternal::GetConnectionOutgoingAsync::__send(const Ice::ConnectionIPtr&, bool, bool) -{ - __sent(); - return AsyncStatusSent; -} - -AsyncStatus -IceInternal::GetConnectionOutgoingAsync::__invokeCollocated(CollocatedRequestHandler*) -{ - __sent(); - return AsyncStatusSent; -} - -bool -IceInternal::GetConnectionOutgoingAsync::__sent() -{ - { - IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); - _state |= Done; - _monitor.notifyAll(); - } - __invokeCompleted(); - return false; -} - -void -IceInternal::GetConnectionOutgoingAsync::__invokeSent() -{ - // No sent callback -} - -void -IceInternal::GetConnectionOutgoingAsync::__finished(const Ice::Exception& exc) -{ - try - { - handleException(exc); - } - catch(const Ice::Exception& ex) - { - __invokeException(ex); - } -} - -void -IceInternal::GetConnectionOutgoingAsync::__processRetry() -{ - __invoke(); -} - -void -IceInternal::GetConnectionOutgoingAsync::handleException(const Ice::Exception& exc) -{ - try - { - _instance->retryQueue()->add(this, _proxy->__handleException(exc, _handler, Ice::Idempotent, false, _cnt)); - _observer.retried(); // Invocation is being retried. - } - catch(const Ice::Exception& ex) - { - _observer.failed(ex.ice_name()); - throw; - } -} - -namespace -{ - -// -// Dummy class derived from CallbackBase -// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn, -// this allows us to test whether the user supplied a null delegate instance to the -// generated begin_ method without having to generate a separate test to throw IllegalArgumentException -// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated -// object code. -// -class DummyCallback : public CallbackBase -{ -public: - - DummyCallback() - { - } - - virtual void - completed(const Ice::AsyncResultPtr&) const - { - assert(false); - } - - virtual CallbackBasePtr - verify(const Ice::LocalObjectPtr&) - { - // - // Called by the AsyncResult constructor to verify the delegate. The dummy - // delegate is passed when the user used a begin_ method without delegate. - // By returning 0 here, we tell the AsyncResult that no delegates was - // provided. - // - return 0; - } - - virtual void - sent(const AsyncResultPtr&) const - { - assert(false); - } - - virtual bool - hasSentCallback() const - { - assert(false); - return false; - } -}; - -} - -// -// This gives a pointer value to compare against in the generated -// begin_ method to decide whether the caller passed a null pointer -// versus the generated inline version of the begin_ method having -// passed a pointer to the dummy delegate. -// -CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback; - -#ifdef ICE_CPP11 - -Ice::CallbackPtr -Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& completed, - const ::IceInternal::Function<void (const AsyncResultPtr&)>& sent) -{ - class Cpp11CB : public GenericCallbackBase - { - public: - - Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed, - const ::std::function<void (const AsyncResultPtr&)>& sent) : - _completed(completed), - _sent(sent) - { - checkCallback(true, completed != nullptr); - } - - virtual void - completed(const AsyncResultPtr& result) const - { - _completed(result); - } - - virtual CallbackBasePtr - verify(const LocalObjectPtr&) - { - return this; // Nothing to do, the cookie is not type-safe. - } - - virtual void - sent(const AsyncResultPtr& result) const - { - if(_sent != nullptr) - { - _sent(result); - } - } - - virtual bool - hasSentCallback() const - { - return _sent != nullptr; - } - - private: - - ::std::function< void (const AsyncResultPtr&)> _completed; - ::std::function< void (const AsyncResultPtr&)> _sent; - }; - - return new Cpp11CB(completed, sent); -} -#endif - -void -IceInternal::CallbackBase::checkCallback(bool obj, bool cb) -{ - if(!obj) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback object cannot be null"); - } - if(!cb) - { - throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "callback cannot be null"); } } diff --git a/cpp/src/Ice/Proxy.cpp b/cpp/src/Ice/Proxy.cpp index f455f5e6361..13bd0120a8f 100644 --- a/cpp/src/Ice/Proxy.cpp +++ b/cpp/src/Ice/Proxy.cpp @@ -217,15 +217,15 @@ IceProxy::Ice::Object::begin_ice_isA(const string& typeId, OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_isA_name, del, cookie); try { - __result->__prepare(ice_isA_name, Nonmutating, ctx); - IceInternal::BasicStream* __os = __result->__startWriteParams(DefaultFormat); + __result->prepare(ice_isA_name, Nonmutating, ctx); + IceInternal::BasicStream* __os = __result->startWriteParams(DefaultFormat); __os->write(typeId); - __result->__endWriteParams(); - __result->__invoke(true); + __result->endWriteParams(); + __result->invoke(); } catch(const Exception& __ex) { - __result->__invokeExceptionAsync(__ex); + __result->abort(__ex); } return __result; } @@ -233,12 +233,11 @@ IceProxy::Ice::Object::begin_ice_isA(const string& typeId, #ifdef ICE_CPP11 Ice::AsyncResultPtr -IceProxy::Ice::Object::__begin_ice_isA( - const ::std::string& typeId, - const ::Ice::Context* ctx, - const ::IceInternal::Function<void (bool)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception, - const ::IceInternal::Function<void (bool)>& sent) +IceProxy::Ice::Object::__begin_ice_isA(const ::std::string& typeId, + const ::Ice::Context* ctx, + const ::IceInternal::Function<void (bool)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception, + const ::IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public ::IceInternal::Cpp11FnCallbackNC { @@ -281,11 +280,10 @@ IceProxy::Ice::Object::__begin_ice_isA( } Ice::AsyncResultPtr -IceProxy::Ice::Object::__begin_ice_id( - const ::Ice::Context* ctx, - const ::IceInternal::Function<void (const ::std::string&)>& response, - const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception, - const ::IceInternal::Function<void (bool)>& sent) +IceProxy::Ice::Object::__begin_ice_id(const ::Ice::Context* ctx, + const ::IceInternal::Function<void (const ::std::string&)>& response, + const ::IceInternal::Function<void (const ::Ice::Exception&)>& exception, + const ::IceInternal::Function<void (bool)>& sent) { class Cpp11CB : public ::IceInternal::Cpp11FnCallbackNC { @@ -573,13 +571,13 @@ IceProxy::Ice::Object::begin_ice_ping(const Context* ctx, OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_ping_name, del, cookie); try { - __result->__prepare(ice_ping_name, Nonmutating, ctx); - __result->__writeEmptyParams(); - __result->__invoke(true); + __result->prepare(ice_ping_name, Nonmutating, ctx); + __result->writeEmptyParams(); + __result->invoke(); } catch(const Exception& __ex) { - __result->__invokeExceptionAsync(__ex); + __result->abort(__ex); } return __result; } @@ -647,13 +645,13 @@ IceProxy::Ice::Object::begin_ice_ids(const Context* ctx, OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_ids_name, del, cookie); try { - __result->__prepare(ice_ids_name, Nonmutating, ctx); - __result->__writeEmptyParams(); - __result->__invoke(true); + __result->prepare(ice_ids_name, Nonmutating, ctx); + __result->writeEmptyParams(); + __result->invoke(); } catch(const Exception& __ex) { - __result->__invokeExceptionAsync(__ex); + __result->abort(__ex); } return __result; } @@ -690,13 +688,13 @@ IceProxy::Ice::Object::begin_ice_id(const Context* ctx, OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_id_name, del, cookie); try { - __result->__prepare(ice_id_name, Nonmutating, ctx); - __result->__writeEmptyParams(); - __result->__invoke(true); + __result->prepare(ice_id_name, Nonmutating, ctx); + __result->writeEmptyParams(); + __result->invoke(); } catch(const Exception& __ex) { - __result->__invokeExceptionAsync(__ex); + __result->abort(__ex); } return __result; } @@ -818,13 +816,13 @@ IceProxy::Ice::Object::begin_ice_invoke(const string& operation, OutgoingAsyncPtr __result = new OutgoingAsync(this, ice_invoke_name, del, cookie); try { - __result->__prepare(operation, mode, ctx); - __result->__writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first)); - __result->__invoke(true); + __result->prepare(operation, mode, ctx); + __result->writeParamEncaps(inEncaps.first, static_cast<Int>(inEncaps.second - inEncaps.first)); + __result->invoke(); } catch(const Exception& __ex) { - __result->__invokeExceptionAsync(__ex); + __result->abort(__ex); } return __result; } @@ -1389,17 +1387,16 @@ AsyncResultPtr IceProxy::Ice::Object::begin_ice_getConnectionInternal(const ::IceInternal::CallbackBasePtr& del, const ::Ice::LocalObjectPtr& cookie) { - ::IceInternal::GetConnectionOutgoingAsyncPtr __result = - new ::IceInternal::GetConnectionOutgoingAsync(this, ice_getConnection_name, del, cookie); + ProxyGetConnectionPtr result = new ProxyGetConnection(this, ice_getConnection_name, del, cookie); try { - __result->__invoke(); + result->invoke(); } - catch(const Exception& __ex) + catch(const Exception& ex) { - __result->__invokeExceptionAsync(__ex); + result->abort(ex); } - return __result; + return result; } ConnectionPtr @@ -1435,32 +1432,31 @@ IceProxy::Ice::Object::ice_getCachedConnection() const void IceProxy::Ice::Object::ice_flushBatchRequests() { - BatchOutgoing __og(this, ice_flushBatchRequests_name); - __og.invoke(); + FlushBatch og(this, ice_flushBatchRequests_name); + og.invoke(); } ::Ice::AsyncResultPtr IceProxy::Ice::Object::begin_ice_flushBatchRequestsInternal(const ::IceInternal::CallbackBasePtr& del, const ::Ice::LocalObjectPtr& cookie) { - ::IceInternal::ProxyBatchOutgoingAsyncPtr __result = - new ::IceInternal::ProxyBatchOutgoingAsync(this, ice_flushBatchRequests_name, del, cookie); + ProxyFlushBatchPtr result = new ProxyFlushBatch(this, ice_flushBatchRequests_name, del, cookie); try { - __result->__invoke(); + result->invoke(); } - catch(const Exception& __ex) + catch(const Exception& ex) { - __result->__invokeExceptionAsync(__ex); + result->abort(ex); } - return __result; + return result; } void -IceProxy::Ice::Object::end_ice_flushBatchRequests(const AsyncResultPtr& __result) +IceProxy::Ice::Object::end_ice_flushBatchRequests(const AsyncResultPtr& result) { - AsyncResult::__check(__result, this, ice_flushBatchRequests_name); - __result->__wait(); + AsyncResult::__check(result, this, ice_flushBatchRequests_name); + result->__wait(); } Int diff --git a/cpp/src/Ice/ProxyFactory.cpp b/cpp/src/Ice/ProxyFactory.cpp index 68ca8b02598..fb2aea3e337 100644 --- a/cpp/src/Ice/ProxyFactory.cpp +++ b/cpp/src/Ice/ProxyFactory.cpp @@ -223,7 +223,7 @@ IceInternal::ProxyFactory::checkRetryAfterException(const LocalException& ex, co // // Don't retry invocation timeouts. // - if(dynamic_cast<const InvocationTimeoutException*>(&ex)) + if(dynamic_cast<const InvocationTimeoutException*>(&ex) || dynamic_cast<const InvocationCanceledException*>(&ex)) { ex.ice_throw(); } diff --git a/cpp/src/Ice/RequestHandler.cpp b/cpp/src/Ice/RequestHandler.cpp index 2cbf7826213..07ec753912d 100644 --- a/cpp/src/Ice/RequestHandler.cpp +++ b/cpp/src/Ice/RequestHandler.cpp @@ -14,6 +14,7 @@ using namespace std; using namespace IceInternal; IceUtil::Shared* IceInternal::upCast(RequestHandler* p) { return p; } +IceUtil::Shared* IceInternal::upCast(CancellationHandler* p) { return p; } RetryException::RetryException(const Ice::LocalException& ex) { @@ -32,10 +33,6 @@ RetryException::get() const return _ex.get(); } -RequestHandler::~RequestHandler() -{ -} - RequestHandler::RequestHandler(const ReferencePtr& reference) : _reference(reference), _response(reference->getMode() == Reference::ModeTwoway) diff --git a/cpp/src/Ice/RequestHandler.h b/cpp/src/Ice/RequestHandler.h index 73c3e818b56..68ff00d647d 100644 --- a/cpp/src/Ice/RequestHandler.h +++ b/cpp/src/Ice/RequestHandler.h @@ -31,7 +31,7 @@ namespace IceInternal class BasicStream; -class OutgoingMessageCallback; +class OutgoingBase; // // An exception wrapper, which is used to notify that the request @@ -51,11 +51,17 @@ private: IceUtil::UniquePtr<Ice::LocalException> _ex; }; -class RequestHandler : virtual public ::IceUtil::Shared +class CancellationHandler : virtual public IceUtil::Shared { public: - virtual ~RequestHandler(); + virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&) = 0; + virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&) = 0; +}; + +class RequestHandler : public CancellationHandler +{ +public: virtual RequestHandlerPtr connect() = 0; virtual RequestHandlerPtr update(const RequestHandlerPtr&, const RequestHandlerPtr&) = 0; @@ -64,11 +70,8 @@ public: virtual void finishBatchRequest(BasicStream*) = 0; virtual void abortBatchRequest() = 0; - virtual bool sendRequest(OutgoingMessageCallback*) = 0; - virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncMessageCallbackPtr&) = 0; - - virtual void requestTimedOut(OutgoingMessageCallback*) = 0; - virtual void asyncRequestTimedOut(const OutgoingAsyncMessageCallbackPtr&) = 0; + virtual bool sendRequest(OutgoingBase*) = 0; + virtual AsyncStatus sendAsyncRequest(const OutgoingAsyncBasePtr&) = 0; const ReferencePtr& getReference() const { return _reference; } // Inlined for performances. diff --git a/cpp/src/Ice/RetryQueue.cpp b/cpp/src/Ice/RetryQueue.cpp index adfb6bf9440..c70ed379edd 100644 --- a/cpp/src/Ice/RetryQueue.cpp +++ b/cpp/src/Ice/RetryQueue.cpp @@ -18,7 +18,7 @@ using namespace IceInternal; IceUtil::Shared* IceInternal::upCast(RetryQueue* p) { return p; } -IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const AsyncResultPtr& outAsync) : +IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const ProxyOutgoingAsyncBasePtr& outAsync) : _queue(queue), _outAsync(outAsync) { } @@ -26,14 +26,7 @@ IceInternal::RetryTask::RetryTask(const RetryQueuePtr& queue, const AsyncResultP void IceInternal::RetryTask::runTimerTask() { - try - { - _outAsync->__processRetry(); - } - catch(const Ice::LocalException& ex) - { - _outAsync->__invokeExceptionAsync(ex); - } + _outAsync->retry(); // Retry again the invocation. // // NOTE: this must be called last, destroy() blocks until all task @@ -44,10 +37,37 @@ IceInternal::RetryTask::runTimerTask() _queue->remove(this); } +void +IceInternal::RetryTask::requestCanceled(OutgoingBase*, const Ice::LocalException&) +{ + assert(false); +} + +void +IceInternal::RetryTask::asyncRequestCanceled(const OutgoingAsyncBasePtr& outAsync, const Ice::LocalException&) +{ + if(_queue->cancel(this)) + { + // + // We just retry the outgoing async now rather than marking it + // as finished. The retry will check for the cancellation + // exception and terminate appropriately the request. + // + _outAsync->retry(); + } +} + void IceInternal::RetryTask::destroy() { - _outAsync->__invokeExceptionAsync(CommunicatorDestroyedException(__FILE__, __LINE__)); + try + { + _outAsync->abort(CommunicatorDestroyedException(__FILE__, __LINE__)); + } + catch(const CommunicatorDestroyedException&) + { + // Abort shouldn't throw if there's no callback, ignore. + } } bool @@ -61,7 +81,7 @@ IceInternal::RetryQueue::RetryQueue(const InstancePtr& instance) : _instance(ins } void -IceInternal::RetryQueue::add(const AsyncResultPtr& out, int interval) +IceInternal::RetryQueue::add(const ProxyOutgoingAsyncBasePtr& out, int interval) { Lock sync(*this); if(!_instance) @@ -78,6 +98,7 @@ IceInternal::RetryQueue::add(const AsyncResultPtr& out, int interval) throw CommunicatorDestroyedException(__FILE__, __LINE__); } _requests.insert(task); + out->cancelable(task); } void @@ -119,4 +140,17 @@ IceInternal::RetryQueue::remove(const RetryTaskPtr& task) } } - +bool +IceInternal::RetryQueue::cancel(const RetryTaskPtr& task) +{ + Lock sync(*this); + if(_requests.erase(task) > 0) + { + if(!_instance && _requests.empty()) + { + notify(); // If we are destroying the queue, destroy is probably waiting on the queue to be empty. + } + return _instance->timer()->cancel(task); + } + return false; +} diff --git a/cpp/src/Ice/RetryQueue.h b/cpp/src/Ice/RetryQueue.h index 5a0b7cf52a2..71c9c946d8d 100644 --- a/cpp/src/Ice/RetryQueue.h +++ b/cpp/src/Ice/RetryQueue.h @@ -16,25 +16,31 @@ #include <Ice/RetryQueueF.h> #include <Ice/OutgoingAsyncF.h> #include <Ice/InstanceF.h> +#include <Ice/RequestHandler.h> // For CancellationHandler namespace IceInternal { -class RetryTask : public IceUtil::TimerTask +class RetryTask : public IceUtil::TimerTask, public CancellationHandler { public: - RetryTask(const RetryQueuePtr&, const Ice::AsyncResultPtr&); + RetryTask(const RetryQueuePtr&, const ProxyOutgoingAsyncBasePtr&); virtual void runTimerTask(); + + virtual void requestCanceled(OutgoingBase*, const Ice::LocalException&); + virtual void asyncRequestCanceled(const OutgoingAsyncBasePtr&, const Ice::LocalException&); + void destroy(); bool operator<(const RetryTask&) const; + private: const RetryQueuePtr _queue; - const Ice::AsyncResultPtr _outAsync; + const ProxyOutgoingAsyncBasePtr _outAsync; }; typedef IceUtil::Handle<RetryTask> RetryTaskPtr; @@ -44,12 +50,13 @@ public: RetryQueue(const InstancePtr&); - void add(const Ice::AsyncResultPtr&, int); + void add(const ProxyOutgoingAsyncBasePtr&, int); void destroy(); private: void remove(const RetryTaskPtr&); + bool cancel(const RetryTaskPtr&); friend class RetryTask; InstancePtr _instance; diff --git a/cpp/src/Ice/winrt/Makefile.mak b/cpp/src/Ice/winrt/Makefile.mak index 7eda814e8f6..c05ba0bc7e0 100644 --- a/cpp/src/Ice/winrt/Makefile.mak +++ b/cpp/src/Ice/winrt/Makefile.mak @@ -7,13 +7,14 @@ # # ********************************************************************** -top_srcdir = ..\..\.. +top_srcdir = ..\..\.. LIBNAME = $(SDK_LIBRARY_PATH)\ice.lib TARGETS = $(LIBNAME) SOURCE_DIR = .. OBJS = $(ARCH)\$(CONFIG)\Acceptor.obj \ $(ARCH)\$(CONFIG)\ACM.obj \ + $(ARCH)\$(CONFIG)\AsyncResult.obj \ $(ARCH)\$(CONFIG)\Base64.obj \ $(ARCH)\$(CONFIG)\Buffer.obj \ $(ARCH)\$(CONFIG)\BasicStream.obj \ diff --git a/cpp/src/IceDiscovery/LocatorI.h b/cpp/src/IceDiscovery/LocatorI.h index 700f4305129..bb48d8cb509 100644 --- a/cpp/src/IceDiscovery/LocatorI.h +++ b/cpp/src/IceDiscovery/LocatorI.h @@ -13,6 +13,8 @@ #include <Ice/Locator.h> #include <Ice/ProxyF.h> +#include <set> + namespace IceDiscovery { diff --git a/cpp/src/IceDiscovery/LookupI.h b/cpp/src/IceDiscovery/LookupI.h index 1a249b4e0da..58e1e7ca044 100644 --- a/cpp/src/IceDiscovery/LookupI.h +++ b/cpp/src/IceDiscovery/LookupI.h @@ -13,6 +13,7 @@ #include <IceDiscovery/IceDiscovery.h> #include <IceDiscovery/LocatorI.h> +#include <IceUtil/Timer.h> #include <Ice/Properties.h> namespace IceDiscovery diff --git a/cpp/src/IceGrid/AdapterCache.h b/cpp/src/IceGrid/AdapterCache.h index 8c37a623def..78258c2ff54 100644 --- a/cpp/src/IceGrid/AdapterCache.h +++ b/cpp/src/IceGrid/AdapterCache.h @@ -16,6 +16,8 @@ #include <IceGrid/Query.h> #include <IceGrid/Internal.h> +#include <set> + namespace IceGrid { diff --git a/cpp/src/slice2cpp/Gen.cpp b/cpp/src/slice2cpp/Gen.cpp index 5bf6c662656..57f3337c06f 100644 --- a/cpp/src/slice2cpp/Gen.cpp +++ b/cpp/src/slice2cpp/Gen.cpp @@ -373,8 +373,7 @@ Slice::Gen::generate(const UnitPtr& p) { H << "\n#include <Ice/Proxy.h>"; H << "\n#include <Ice/GCObject.h>"; - H << "\n#include <Ice/Outgoing.h>"; - H << "\n#include <Ice/OutgoingAsync.h>"; + H << "\n#include <Ice/AsyncResult.h>"; H << "\n#include <Ice/Incoming.h>"; if(p->hasContentsWithMetaData("amd")) { @@ -382,11 +381,14 @@ Slice::Gen::generate(const UnitPtr& p) } C << "\n#include <Ice/LocalException.h>"; C << "\n#include <Ice/ObjectFactory.h>"; + C << "\n#include <Ice/Outgoing.h>"; + C << "\n#include <Ice/OutgoingAsync.h>"; } else if(p->hasLocalClassDefsWithAsync()) { H << "\n#include <Ice/Proxy.h>"; - H << "\n#include <Ice/OutgoingAsync.h>"; + H << "\n#include <Ice/AsyncResult.h>"; + C << "\n#include <Ice/OutgoingAsync.h>"; } else if(p->hasNonLocalClassDecls()) { @@ -2184,26 +2186,26 @@ Slice::Gen::ProxyVisitor::visitOperation(const OperationPtr& p) C << flatName << ", __del, __cookie);"; C << nl << "try"; C << sb; - C << nl << "__result->__prepare(" << flatName << ", " << operationModeToString(p->sendMode()) << ", __ctx);"; + C << nl << "__result->prepare(" << flatName << ", " << operationModeToString(p->sendMode()) << ", __ctx);"; if(inParams.empty()) { - C << nl << "__result->__writeEmptyParams();"; + C << nl << "__result->writeEmptyParams();"; } else { - C << nl << "::IceInternal::BasicStream* __os = __result->__startWriteParams(" << opFormatTypeToString(p) <<");"; + C << nl << "::IceInternal::BasicStream* __os = __result->startWriteParams(" << opFormatTypeToString(p) <<");"; writeMarshalCode(C, inParams, 0, TypeContextInParam); if(p->sendsClasses(false)) { C << nl << "__os->writePendingObjects();"; } - C << nl << "__result->__endWriteParams();"; + C << nl << "__result->endWriteParams();"; } - C << nl << "__result->__invoke(true);"; + C << nl << "__result->invoke();"; C << eb; C << nl << "catch(const ::Ice::Exception& __ex)"; C << sb; - C << nl << "__result->__invokeExceptionAsync(__ex);"; + C << nl << "__result->abort(__ex);"; C << eb; C << nl << "return __result;"; C << eb; diff --git a/cpp/src/slice2cs/Gen.cpp b/cpp/src/slice2cs/Gen.cpp index b877d04ce69..8ba2d479750 100644 --- a/cpp/src/slice2cs/Gen.cpp +++ b/cpp/src/slice2cs/Gen.cpp @@ -5312,8 +5312,8 @@ Slice::Gen::HelperVisitor::visitClassDefStart(const ClassDefPtr& p) _out << sb; if(op->returnsData()) { - _out << nl << "IceInternal.OutgoingAsync outAsync__ = (IceInternal.OutgoingAsync)r__;"; - _out << nl << "IceInternal.OutgoingAsync.check(outAsync__, this, " << flatName << ");"; + _out << nl << "IceInternal.OutgoingAsync outAsync__ = IceInternal.OutgoingAsync.check(r__, this, " + << flatName << ");"; _out << nl << "try"; _out << sb; @@ -5480,11 +5480,11 @@ Slice::Gen::HelperVisitor::visitClassDefStart(const ClassDefPtr& p) _out << nl << "result__.writeEmptyParams();"; } - _out << nl << "result__.invoke(true);"; + _out << nl << "result__.invoke();"; _out << eb; _out << nl << "catch(Ice.Exception ex__)"; _out << sb; - _out << nl << "result__.invokeExceptionAsync(ex__);"; + _out << nl << "result__.abort(ex__);"; _out << eb; _out << nl << "return result__;"; _out << eb; diff --git a/cpp/src/slice2java/Gen.cpp b/cpp/src/slice2java/Gen.cpp index a288cfd8290..8eaccb5650e 100644 --- a/cpp/src/slice2java/Gen.cpp +++ b/cpp/src/slice2java/Gen.cpp @@ -4752,8 +4752,8 @@ Slice::Gen::HelperVisitor::visitClassDefStart(const ClassDefPtr& p) out << sb; if(op->returnsData()) { - out << nl << "IceInternal.OutgoingAsyncBase __result = (IceInternal.OutgoingAsyncBase)__iresult;"; - out << nl << "IceInternal.OutgoingAsyncBase.check(__result, this, __" << op->name() << "_name);"; + out << nl << "IceInternal.OutgoingAsync __result = IceInternal.OutgoingAsync.check(__iresult, this, __" + << op->name() << "_name);"; out << nl << "try"; out << sb; @@ -5737,11 +5737,11 @@ Slice::Gen::HelperVisitor::writeOperation(const ClassDefPtr& p, const string& pa out << nl << "__result.writeEmptyParams();"; } - out << nl << "__result.invoke(true);"; + out << nl << "__result.invoke();"; out << eb; out << nl << "catch(Ice.Exception __ex)"; out << sb; - out << nl << "__result.invokeExceptionAsync(__ex);"; + out << nl << "__result.abort(__ex);"; out << eb; out << nl << "return __result;"; out << eb; |