summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Outgoing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r--cpp/src/Ice/Outgoing.cpp251
1 files changed, 132 insertions, 119 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index 5601a705641..cb0a9ff53fb 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -93,6 +93,8 @@ IceInternal::LocalExceptionWrapper::retry() const
IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation, OperationMode mode,
const Context* context, InvocationObserver& observer) :
_handler(handler),
+ _exceptionWrapper(false),
+ _exceptionWrapperRetry(false),
_observer(observer),
_state(StateUnsent),
_encoding(getCompatibleEncoding(handler->getReference()->getEncoding())),
@@ -183,72 +185,61 @@ IceInternal::Outgoing::invoke()
switch(_handler->getReference()->getMode())
{
case Reference::ModeTwoway:
+ case Reference::ModeOneway:
+ case Reference::ModeDatagram:
{
_state = StateInProgress;
- Ice::ConnectionI* connection = _handler->sendRequest(this);
- assert(connection);
+ if(_handler->sendRequest(this)) // Request sent and no response expected, we're done.
+ {
+ return true;
+ }
bool timedOut = false;
-
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
//
- // If the request is being sent in the background we first wait for the
- // sent notification.
+ // If the handler says it's not finished, we wait until we're done.
//
- while(_state != StateFailed && !_sent)
+ int invocationTimeout = _handler->getReference()->getInvocationTimeout();
+ if(invocationTimeout > 0)
{
- _monitor.wait();
- }
+ IceUtil::Time now = IceUtil::Time::now();
+ IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(invocationTimeout);
+ while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut)
+ {
+ _monitor.timedWait(deadline - now);
- //
- // Wait until the request has completed, or until the request times out.
- //
-
- Int timeout = connection->timeout();
- while(_state == StateInProgress && !timedOut)
- {
- if(timeout >= 0)
- {
- _monitor.timedWait(IceUtil::Time::milliSeconds(timeout));
-
- if(_state == StateInProgress)
+ if((_state == StateInProgress || !_sent) && _state != StateFailed)
{
- timedOut = true;
+ now = IceUtil::Time::now();
+ timedOut = now >= deadline;
}
}
- else
+ }
+ else
+ {
+ while((_state == StateInProgress || !_sent) && _state != StateFailed)
{
_monitor.wait();
}
}
}
-
+
if(timedOut)
{
- //
- // Must be called outside the synchronization of this
- // object.
- //
- connection->exception(TimeoutException(__FILE__, __LINE__));
-
- //
- // We must wait until the exception set above has
- // propagated to this Outgoing object.
- //
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(_state == StateInProgress)
- {
- _monitor.wait();
- }
- }
+ _handler->requestTimedOut(this);
+ assert(_exception.get());
}
if(_exception.get())
{
+ if(_exceptionWrapper)
+ {
+ throw LocalExceptionWrapper(*_exception.get(), _exceptionWrapperRetry);
+ }
+
//
// A CloseConnectionException indicates graceful
// server shutdown, and is therefore always repeatable
@@ -277,39 +268,9 @@ IceInternal::Outgoing::invoke()
//
throw LocalExceptionWrapper(*_exception.get(), false);
}
-
- if(_state == StateUserException)
- {
- return false;
- }
- else
- {
- assert(_state == StateOK);
- return true;
- }
- }
-
- case Reference::ModeOneway:
- case Reference::ModeDatagram:
- {
- _state = StateInProgress;
- if(_handler->sendRequest(this))
- {
- //
- // If the handler returns the connection, we must wait for the sent callback.
- //
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(_state != StateFailed && !_sent)
- {
- _monitor.wait();
- }
- if(_exception.get())
- {
- _exception->ice_throw();
- }
- }
- return true;
+ assert(_state != StateInProgress);
+ return _state == StateOK;
}
case Reference::ModeBatchOneway:
@@ -349,28 +310,23 @@ IceInternal::Outgoing::abort(const LocalException& ex)
ex.ice_throw();
}
+bool
+IceInternal::Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response)
+{
+ return connection->sendRequest(this, compress, response);
+}
+
void
-IceInternal::Outgoing::sent(bool notify)
+IceInternal::Outgoing::sent()
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
if(_handler->getReference()->getMode() != Reference::ModeTwoway)
{
_remoteObserver.detach();
+ _state = StateOK;
}
-
- if(notify)
- {
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _sent = true;
- _monitor.notify();
- }
- else
- {
- //
- // No synchronization is necessary if called from sendRequest() because the connection
- // send mutex is locked and no other threads can call on Outgoing until it's released.
- //
- _sent = true;
- }
+ _sent = true;
+ _monitor.notify();
//
// NOTE: At this point the stack allocated Outgoing object can be destroyed
@@ -380,6 +336,35 @@ IceInternal::Outgoing::sent(bool notify)
}
void
+IceInternal::Outgoing::finished(const LocalException& ex, bool sent)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ assert(_state <= StateInProgress);
+ _remoteObserver.failed(ex.ice_name());
+ _remoteObserver.detach();
+
+ _state = StateFailed;
+ _exception.reset(ex.ice_clone());
+ _sent = sent;
+ _monitor.notify();
+}
+
+void
+IceInternal::Outgoing::finished(const LocalExceptionWrapper& ex)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _remoteObserver.failed(ex.get()->ice_name());
+ _remoteObserver.detach();
+
+ _state = StateFailed;
+ _exceptionWrapper = true;
+ _exceptionWrapperRetry = ex.retry();
+ _exception.reset(ex.get()->ice_clone());
+ _sent = false;
+ _monitor.notify();
+}
+
+void
IceInternal::Outgoing::finished(BasicStream& is)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
@@ -541,20 +526,6 @@ IceInternal::Outgoing::finished(BasicStream& is)
}
void
-IceInternal::Outgoing::finished(const LocalException& ex, bool sent)
-{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(_state <= StateInProgress);
- _remoteObserver.failed(ex.ice_name());
- _remoteObserver.detach();
-
- _state = StateFailed;
- _exception.reset(ex.ice_clone());
- _sent = sent;
- _monitor.notify();
-}
-
-void
IceInternal::Outgoing::throwUserException()
{
try
@@ -592,35 +563,77 @@ void
IceInternal::BatchOutgoing::invoke()
{
assert(_handler || _connection);
- if((_handler && !_handler->flushBatchRequests(this)) || (_connection && !_connection->flushBatchRequests(this)))
+
+ int timeout;
+ if(_connection)
+ {
+ if(_connection->flushBatchRequests(this))
+ {
+ return;
+ }
+ timeout = -1;
+ }
+ else
+ {
+ if(_handler->sendRequest(this))
+ {
+ return;
+ }
+ timeout = _handler->getReference()->getInvocationTimeout();
+ }
+
+ bool timedOut = false;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- while(!_exception.get() && !_sent)
+ if(timeout > 0)
{
- _monitor.wait();
+ IceUtil::Time now = IceUtil::Time::now();
+ IceUtil::Time deadline = now + IceUtil::Time::milliSeconds(timeout);
+ while(!_exception.get() && !_sent && !timedOut)
+ {
+ _monitor.timedWait(deadline - now);
+ if(!_exception.get() && !_sent)
+ {
+ now = IceUtil::Time::now();
+ timedOut = now >= deadline;
+ }
+ }
}
- if(_exception.get())
+ else
{
- _exception->ice_throw();
+ while(!_exception.get() && !_sent)
+ {
+ _monitor.wait();
+ }
}
}
-}
-void
-IceInternal::BatchOutgoing::sent(bool notify)
-{
- _remoteObserver.detach();
-
- if(notify)
+ if(timedOut)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- _sent = true;
- _monitor.notify();
+ _handler->requestTimedOut(this);
+ assert(_exception.get());
}
- else
+
+ if(_exception.get())
{
- _sent = true;
+ _exception->ice_throw();
}
+}
+
+bool
+IceInternal::BatchOutgoing::send(const Ice::ConnectionIPtr& connection, bool, bool)
+{
+ return connection->flushBatchRequests(this);
+}
+
+void
+IceInternal::BatchOutgoing::sent()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _remoteObserver.detach();
+
+ _sent = true;
+ _monitor.notify();
//
// NOTE: At this point the stack allocated BatchOutgoing object