diff options
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r-- | cpp/src/Ice/OutgoingAsync.cpp | 135 |
1 files changed, 113 insertions, 22 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp index 509ba18ad06..cd3442113f8 100644 --- a/cpp/src/Ice/OutgoingAsync.cpp +++ b/cpp/src/Ice/OutgoingAsync.cpp @@ -38,6 +38,7 @@ IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; } IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; } +IceUtil::Shared* IceInternal::upCast(GetConnectionOutgoingAsync* p) { return p; } const unsigned char Ice::AsyncResult::OK = 0x1; const unsigned char Ice::AsyncResult::Done = 0x2; @@ -414,7 +415,7 @@ IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const Thr { public: - InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) : + InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) : DispatchWorkItem(connection), _outAsync(outAsync) { } @@ -901,7 +902,7 @@ IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc) { int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt); _observer.retried(); // Invocation is being retried. - + // // Schedule the retry. Note that we always schedule the retry // on the retry queue even if the invocation can be retried @@ -928,7 +929,7 @@ IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& commu { } -AsyncStatus +AsyncStatus IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool) { _cachedConnection = connection; @@ -1094,7 +1095,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons // Assume all connections are flushed synchronously. // _sentSynchronously = true; - + // // Attach observer // @@ -1109,7 +1110,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt public: BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync, - const InstancePtr& instance, + const InstancePtr& instance, InvocationObserver& observer) : BatchOutgoingAsync(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0), _outAsync(outAsync), _observer(observer) @@ -1141,7 +1142,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt } private: - + const CommunicatorBatchOutgoingAsyncPtr _outAsync; InvocationObserver& _observer; }; @@ -1181,7 +1182,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) if(--_useCount > 0) { return; - } + } _state |= Done | OK | Sent; _os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation _monitor.notifyAll(); @@ -1207,6 +1208,96 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread) } } +IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& proxy, + const std::string& operation, + const CallbackBasePtr& delegate, + const Ice::LocalObjectPtr& cookie) : + OutgoingAsync(proxy, operation, delegate, cookie) +{ + _observer.attach(proxy.get(), operation, 0); +} + +void +IceInternal::GetConnectionOutgoingAsync::__invoke() +{ + while(true) + { + try + { + _handler = _proxy->__getRequestHandler(); + _handler->sendAsyncRequest(this); + } + catch(const RetryException&) + { + _proxy->__setRequestHandler(_handler, 0); + } + catch(const Ice::Exception& ex) + { + handleException(ex); + } + break; + } +} + +AsyncStatus +IceInternal::GetConnectionOutgoingAsync::__send(const Ice::ConnectionIPtr&, bool, bool) +{ + __sent(); + return AsyncStatusSent; +} + +AsyncStatus +IceInternal::GetConnectionOutgoingAsync::__invokeCollocated(CollocatedRequestHandler*) +{ + __sent(); + return AsyncStatusSent; +} + +bool +IceInternal::GetConnectionOutgoingAsync::__sent() +{ + { + IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor); + _state |= Done; + _monitor.notifyAll(); + } + __invokeCompleted(); + return false; +} + +void +IceInternal::GetConnectionOutgoingAsync::__invokeSent() +{ + // No sent callback +} + +void +IceInternal::GetConnectionOutgoingAsync::__finished(const Ice::Exception& exc) +{ + try + { + handleException(exc); + } + catch(const Ice::Exception& ex) + { + __invokeException(ex); + } +} + +void +IceInternal::GetConnectionOutgoingAsync::handleException(const Ice::Exception& exc) +{ + try + { + _instance->retryQueue()->add(this, _proxy->__handleException(exc, _handler, Ice::Idempotent, false, _cnt)); + _observer.retried(); // Invocation is being retried. + } + catch(const Ice::Exception& ex) + { + _observer.failed(ex.ice_name()); + throw; + } +} namespace { @@ -1227,13 +1318,13 @@ public: { } - virtual void + virtual void completed(const Ice::AsyncResultPtr&) const { assert(false); } - virtual CallbackBasePtr + virtual CallbackBasePtr verify(const Ice::LocalObjectPtr&) { // @@ -1245,13 +1336,13 @@ public: return 0; } - virtual void + virtual void sent(const AsyncResultPtr&) const { assert(false); } - virtual bool + virtual bool hasSentCallback() const { assert(false); @@ -1281,25 +1372,25 @@ Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& co Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed, const ::std::function<void (const AsyncResultPtr&)>& sent) : - _completed(completed), + _completed(completed), _sent(sent) { checkCallback(true, completed != nullptr); } - - virtual void + + virtual void completed(const AsyncResultPtr& result) const { _completed(result); } - - virtual CallbackBasePtr + + virtual CallbackBasePtr verify(const LocalObjectPtr&) { return this; // Nothing to do, the cookie is not type-safe. } - - virtual void + + virtual void sent(const AsyncResultPtr& result) const { if(_sent != nullptr) @@ -1307,19 +1398,19 @@ Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& co _sent(result); } } - - virtual bool + + virtual bool hasSentCallback() const { return _sent != nullptr; } - + private: ::std::function< void (const AsyncResultPtr&)> _completed; ::std::function< void (const AsyncResultPtr&)> _sent; }; - + return new Cpp11CB(completed, sent); } #endif |