summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Outgoing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r--cpp/src/Ice/Outgoing.cpp232
1 files changed, 126 insertions, 106 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index 66509a2bedc..4815b9796fb 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -7,36 +7,44 @@
//
// **********************************************************************
+#include <IceUtil/DisableWarnings.h>
#include <Ice/Outgoing.h>
-#include <Ice/Object.h>
-#include <Ice/CollocatedRequestHandler.h>
#include <Ice/ConnectionI.h>
+#include <Ice/CollocatedRequestHandler.h>
#include <Ice/Reference.h>
-#include <Ice/Endpoint.h>
-#include <Ice/LocalException.h>
-#include <Ice/Protocol.h>
#include <Ice/Instance.h>
+#include <Ice/LocalException.h>
#include <Ice/ReplyStatus.h>
-#include <Ice/ProxyFactory.h>
+#include <Ice/ImplicitContextI.h>
using namespace std;
using namespace Ice;
using namespace Ice::Instrumentation;
using namespace IceInternal;
-IceInternal::Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode,
- const Context* context) :
+OutgoingBase::OutgoingBase(Instance* instance, const string& operation) :
+ _os(instance, Ice::currentProtocolEncoding), _sent(false)
+{
+}
+
+Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, const Context* context) :
+ OutgoingBase(proxy->__reference()->getInstance().get(), operation),
_proxy(proxy),
_mode(mode),
- _observer(proxy, operation, context),
_state(StateUnsent),
_encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())),
- _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding),
- _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding),
- _sent(false)
+ _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding)
{
checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol()));
+ _observer.attach(proxy, operation, context);
+
+ int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
+ if(invocationTimeout > 0)
+ {
+ _invocationTimeoutDeadline = IceUtil::Time::now() + IceUtil::Time::milliSeconds(invocationTimeout);
+ }
+
switch(_proxy->__reference()->getMode())
{
case Reference::ModeTwoway:
@@ -129,7 +137,66 @@ Outgoing::~Outgoing()
}
bool
-IceInternal::Outgoing::invoke()
+Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response)
+{
+ return connection->sendRequest(this, compress, response);
+}
+
+void
+Outgoing::invokeCollocated(CollocatedRequestHandler* handler)
+{
+ handler->invokeRequest(this);
+}
+
+void
+Outgoing::sent()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
+ {
+ _childObserver.detach();
+ _state = StateOK;
+ }
+ _sent = true;
+ _monitor.notify();
+
+ //
+ // NOTE: At this point the stack allocated Outgoing object can be destroyed
+ // since the notify() on the monitor will release the thread waiting on the
+ // synchronous Ice call.
+ //
+}
+
+void
+Outgoing::completed(const Exception& ex)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ //assert(_state <= StateInProgress);
+ if(_state > StateInProgress)
+ {
+ //
+ // Response was already received but message
+ // didn't get removed first from the connection
+ // send message queue so it's possible we can be
+ // notified of failures. In this case, ignore the
+ // failure and assume the outgoing has been sent.
+ //
+ assert(_state != StateFailed);
+ _sent = true;
+ _monitor.notify();
+ return;
+ }
+
+ _childObserver.failed(ex.ice_name());
+ _childObserver.detach();
+
+ _state = StateFailed;
+ _exception.reset(ex.ice_clone());
+ _monitor.notify();
+}
+
+bool
+Outgoing::invoke()
{
assert(_state == StateUnsent);
@@ -146,6 +213,11 @@ IceInternal::Outgoing::invoke()
{
try
{
+ if(_invocationTimeoutDeadline != IceUtil::Time() && _invocationTimeoutDeadline <= IceUtil::Time::now())
+ {
+ throw Ice::InvocationTimeoutException(__FILE__, __LINE__);
+ }
+
_state = StateInProgress;
_exception.reset(0);
_sent = false;
@@ -164,19 +236,18 @@ IceInternal::Outgoing::invoke()
//
// If the handler says it's not finished, we wait until we're done.
//
- int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
- if(invocationTimeout > 0)
+ if(_invocationTimeoutDeadline != IceUtil::Time())
{
IceUtil::Time now = IceUtil::Time::now();
- IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(invocationTimeout);
+ timedOut = now >= _invocationTimeoutDeadline;
while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut)
{
- _monitor.timedWait(deadline - now);
+ _monitor.timedWait(_invocationTimeoutDeadline - now);
if((_state == StateInProgress || !_sent) && _state != StateFailed)
{
now = IceUtil::Time::now();
- timedOut = now >= deadline;
+ timedOut = now >= _invocationTimeoutDeadline;
}
}
}
@@ -191,15 +262,15 @@ IceInternal::Outgoing::invoke()
if(timedOut)
{
- _handler->requestTimedOut(this);
+ _handler->requestCanceled(this, InvocationTimeoutException(__FILE__, __LINE__));
//
// Wait for the exception to propagate. It's possible the request handler ignores
- // the timeout if there was a failure shortly before requestTimedOut got called.
+ // the timeout if there was a failure shortly before requestCanceled got called.
// In this case, the exception should be set on the Outgoing.
//
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(!_exception.get())
+ while(_state == StateInProgress)
{
_monitor.wait();
}
@@ -223,11 +294,23 @@ IceInternal::Outgoing::invoke()
{
try
{
- int interval = _proxy->__handleException(ex, _handler, _mode, _sent, cnt);
- _observer.retried(); // Invocation is being retried.
- if(interval > 0)
+ IceUtil::Time interval;
+ interval = IceUtil::Time::milliSeconds(_proxy->__handleException(ex, _handler, _mode, _sent, cnt));
+ if(interval > IceUtil::Time())
+ {
+ if(_invocationTimeoutDeadline != IceUtil::Time())
+ {
+ IceUtil::Time deadline = _invocationTimeoutDeadline - IceUtil::Time::now();
+ if(deadline < interval)
+ {
+ interval = deadline;
+ }
+ }
+ IceUtil::ThreadControl::sleep(interval);
+ }
+ if(_invocationTimeoutDeadline == IceUtil::Time() || _invocationTimeoutDeadline > IceUtil::Time::now())
{
- IceUtil::ThreadControl::sleep(IceUtil::Time::milliSeconds(interval));
+ _observer.retried();
}
}
catch(const Ice::Exception& ex)
@@ -243,7 +326,7 @@ IceInternal::Outgoing::invoke()
}
void
-IceInternal::Outgoing::abort(const LocalException& ex)
+Outgoing::abort(const LocalException& ex)
{
assert(_state == StateUnsent);
@@ -261,67 +344,8 @@ IceInternal::Outgoing::abort(const LocalException& ex)
ex.ice_throw();
}
-bool
-IceInternal::Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response)
-{
- return connection->sendRequest(this, compress, response);
-}
-
-void
-IceInternal::Outgoing::invokeCollocated(CollocatedRequestHandler* handler)
-{
- handler->invokeRequest(this);
-}
-
-void
-IceInternal::Outgoing::sent()
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
- {
- _childObserver.detach();
- _state = StateOK;
- }
- _sent = true;
- _monitor.notify();
-
- //
- // NOTE: At this point the stack allocated Outgoing object can be destroyed
- // since the notify() on the monitor will release the thread waiting on the
- // synchronous Ice call.
- //
-}
-
void
-IceInternal::Outgoing::finished(const Exception& ex)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- //assert(_state <= StateInProgress);
- if(_state > StateInProgress)
- {
- //
- // Response was already received but message
- // didn't get removed first from the connection
- // send message queue so it's possible we can be
- // notified of failures. In this case, ignore the
- // failure and assume the outgoing has been sent.
- //
- assert(_state != StateFailed);
- _sent = true;
- _monitor.notify();
- return;
- }
-
- _childObserver.failed(ex.ice_name());
- _childObserver.detach();
-
- _state = StateFailed;
- _exception.reset(ex.ice_clone());
- _monitor.notify();
-}
-
-void
-IceInternal::Outgoing::finished(BasicStream& is)
+Outgoing::completed(BasicStream& is)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
@@ -482,7 +506,7 @@ IceInternal::Outgoing::finished(BasicStream& is)
}
void
-IceInternal::Outgoing::throwUserException()
+Outgoing::throwUserException()
{
try
{
@@ -496,27 +520,22 @@ IceInternal::Outgoing::throwUserException()
}
}
-IceInternal::BatchOutgoing::BatchOutgoing(IceProxy::Ice::Object* proxy, const string& name) :
- _proxy(proxy),
- _connection(0),
- _sent(false),
- _os(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding),
- _observer(proxy, name, 0)
+FlushBatch::FlushBatch(IceProxy::Ice::Object* proxy, const string& operation) :
+ OutgoingBase(proxy->__reference()->getInstance().get(), operation), _proxy(proxy), _connection(0)
{
checkSupportedProtocol(proxy->__reference()->getProtocol());
+
+ _observer.attach(proxy->__reference()->getInstance().get(), operation);
}
-IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance, const string& name) :
- _proxy(0),
- _connection(connection),
- _sent(false),
- _os(instance, Ice::currentProtocolEncoding),
- _observer(instance, name)
+FlushBatch::FlushBatch(ConnectionI* connection, Instance* instance, const string& operation) :
+ OutgoingBase(instance, operation), _proxy(0), _connection(connection)
{
+ _observer.attach(instance, operation);
}
void
-IceInternal::BatchOutgoing::invoke()
+FlushBatch::invoke()
{
assert(_proxy || _connection);
@@ -577,7 +596,8 @@ IceInternal::BatchOutgoing::invoke()
if(timedOut)
{
- handler->requestTimedOut(this);
+ Ice::InvocationTimeoutException ex(__FILE__, __LINE__);
+ handler->requestCanceled(this, ex);
//
// Wait for the exception to propagate. It's possible the request handler ignores
@@ -614,19 +634,19 @@ IceInternal::BatchOutgoing::invoke()
}
bool
-IceInternal::BatchOutgoing::send(const Ice::ConnectionIPtr& connection, bool, bool)
+FlushBatch::send(const Ice::ConnectionIPtr& connection, bool, bool)
{
return connection->flushBatchRequests(this);
}
void
-IceInternal::BatchOutgoing::invokeCollocated(CollocatedRequestHandler* handler)
+FlushBatch::invokeCollocated(CollocatedRequestHandler* handler)
{
handler->invokeBatchRequests(this);
}
void
-IceInternal::BatchOutgoing::sent()
+FlushBatch::sent()
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
_childObserver.detach();
@@ -635,14 +655,14 @@ IceInternal::BatchOutgoing::sent()
_monitor.notify();
//
- // NOTE: At this point the stack allocated BatchOutgoing object
+ // NOTE: At this point the stack allocated FlushBatch object
// can be destroyed since the notify() on the monitor will release
// the thread waiting on the synchronous Ice call.
//
}
void
-IceInternal::BatchOutgoing::finished(const Ice::Exception& ex)
+FlushBatch::completed(const Ice::Exception& ex)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
_childObserver.failed(ex.ice_name());