summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp135
1 files changed, 113 insertions, 22 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 509ba18ad06..cd3442113f8 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -38,6 +38,7 @@ IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; }
IceUtil::Shared* IceInternal::upCast(ProxyBatchOutgoingAsync* p) { return p; }
IceUtil::Shared* IceInternal::upCast(ConnectionBatchOutgoingAsync* p) { return p; }
IceUtil::Shared* IceInternal::upCast(CommunicatorBatchOutgoingAsync* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(GetConnectionOutgoingAsync* p) { return p; }
const unsigned char Ice::AsyncResult::OK = 0x1;
const unsigned char Ice::AsyncResult::Done = 0x2;
@@ -414,7 +415,7 @@ IceInternal::OutgoingAsyncMessageCallback::__dispatchInvocationTimeout(const Thr
{
public:
- InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) :
+ InvocationTimeoutCall(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::ConnectionPtr& connection) :
DispatchWorkItem(connection), _outAsync(outAsync)
{
}
@@ -901,7 +902,7 @@ IceInternal::OutgoingAsync::handleException(const Ice::Exception& exc)
{
int interval = _proxy->__handleException(exc, _handler, _mode, _sent, _cnt);
_observer.retried(); // Invocation is being retried.
-
+
//
// Schedule the retry. Note that we always schedule the retry
// on the retry queue even if the invocation can be retried
@@ -928,7 +929,7 @@ IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const CommunicatorPtr& commu
{
}
-AsyncStatus
+AsyncStatus
IceInternal::BatchOutgoingAsync::__send(const Ice::ConnectionIPtr& connection, bool, bool)
{
_cachedConnection = connection;
@@ -1094,7 +1095,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::CommunicatorBatchOutgoingAsync(cons
// Assume all connections are flushed synchronously.
//
_sentSynchronously = true;
-
+
//
// Attach observer
//
@@ -1109,7 +1110,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
public:
BatchOutgoingAsyncI(const CommunicatorBatchOutgoingAsyncPtr& outAsync,
- const InstancePtr& instance,
+ const InstancePtr& instance,
InvocationObserver& observer) :
BatchOutgoingAsync(outAsync->getCommunicator(), instance, outAsync->getOperation(), __dummyCallback, 0),
_outAsync(outAsync), _observer(observer)
@@ -1141,7 +1142,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::flushConnection(const ConnectionIPt
}
private:
-
+
const CommunicatorBatchOutgoingAsyncPtr _outAsync;
InvocationObserver& _observer;
};
@@ -1181,7 +1182,7 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
if(--_useCount > 0)
{
return;
- }
+ }
_state |= Done | OK | Sent;
_os.resize(0); // Clear buffer now, instead of waiting for AsyncResult deallocation
_monitor.notifyAll();
@@ -1207,6 +1208,96 @@ IceInternal::CommunicatorBatchOutgoingAsync::check(bool userThread)
}
}
+IceInternal::GetConnectionOutgoingAsync::GetConnectionOutgoingAsync(const Ice::ObjectPrx& proxy,
+ const std::string& operation,
+ const CallbackBasePtr& delegate,
+ const Ice::LocalObjectPtr& cookie) :
+ OutgoingAsync(proxy, operation, delegate, cookie)
+{
+ _observer.attach(proxy.get(), operation, 0);
+}
+
+void
+IceInternal::GetConnectionOutgoingAsync::__invoke()
+{
+ while(true)
+ {
+ try
+ {
+ _handler = _proxy->__getRequestHandler();
+ _handler->sendAsyncRequest(this);
+ }
+ catch(const RetryException&)
+ {
+ _proxy->__setRequestHandler(_handler, 0);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ handleException(ex);
+ }
+ break;
+ }
+}
+
+AsyncStatus
+IceInternal::GetConnectionOutgoingAsync::__send(const Ice::ConnectionIPtr&, bool, bool)
+{
+ __sent();
+ return AsyncStatusSent;
+}
+
+AsyncStatus
+IceInternal::GetConnectionOutgoingAsync::__invokeCollocated(CollocatedRequestHandler*)
+{
+ __sent();
+ return AsyncStatusSent;
+}
+
+bool
+IceInternal::GetConnectionOutgoingAsync::__sent()
+{
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _state |= Done;
+ _monitor.notifyAll();
+ }
+ __invokeCompleted();
+ return false;
+}
+
+void
+IceInternal::GetConnectionOutgoingAsync::__invokeSent()
+{
+ // No sent callback
+}
+
+void
+IceInternal::GetConnectionOutgoingAsync::__finished(const Ice::Exception& exc)
+{
+ try
+ {
+ handleException(exc);
+ }
+ catch(const Ice::Exception& ex)
+ {
+ __invokeException(ex);
+ }
+}
+
+void
+IceInternal::GetConnectionOutgoingAsync::handleException(const Ice::Exception& exc)
+{
+ try
+ {
+ _instance->retryQueue()->add(this, _proxy->__handleException(exc, _handler, Ice::Idempotent, false, _cnt));
+ _observer.retried(); // Invocation is being retried.
+ }
+ catch(const Ice::Exception& ex)
+ {
+ _observer.failed(ex.ice_name());
+ throw;
+ }
+}
namespace
{
@@ -1227,13 +1318,13 @@ public:
{
}
- virtual void
+ virtual void
completed(const Ice::AsyncResultPtr&) const
{
assert(false);
}
- virtual CallbackBasePtr
+ virtual CallbackBasePtr
verify(const Ice::LocalObjectPtr&)
{
//
@@ -1245,13 +1336,13 @@ public:
return 0;
}
- virtual void
+ virtual void
sent(const AsyncResultPtr&) const
{
assert(false);
}
- virtual bool
+ virtual bool
hasSentCallback() const
{
assert(false);
@@ -1281,25 +1372,25 @@ Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& co
Cpp11CB(const ::std::function<void (const AsyncResultPtr&)>& completed,
const ::std::function<void (const AsyncResultPtr&)>& sent) :
- _completed(completed),
+ _completed(completed),
_sent(sent)
{
checkCallback(true, completed != nullptr);
}
-
- virtual void
+
+ virtual void
completed(const AsyncResultPtr& result) const
{
_completed(result);
}
-
- virtual CallbackBasePtr
+
+ virtual CallbackBasePtr
verify(const LocalObjectPtr&)
{
return this; // Nothing to do, the cookie is not type-safe.
}
-
- virtual void
+
+ virtual void
sent(const AsyncResultPtr& result) const
{
if(_sent != nullptr)
@@ -1307,19 +1398,19 @@ Ice::newCallback(const ::IceInternal::Function<void (const AsyncResultPtr&)>& co
_sent(result);
}
}
-
- virtual bool
+
+ virtual bool
hasSentCallback() const
{
return _sent != nullptr;
}
-
+
private:
::std::function< void (const AsyncResultPtr&)> _completed;
::std::function< void (const AsyncResultPtr&)> _sent;
};
-
+
return new Cpp11CB(completed, sent);
}
#endif