summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Outgoing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r--cpp/src/Ice/Outgoing.cpp211
1 files changed, 143 insertions, 68 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index 8eedeffe6a1..14fb41c7afd 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -9,6 +9,7 @@
#include <Ice/Outgoing.h>
#include <Ice/Object.h>
+#include <Ice/RequestHandler.h>
#include <Ice/ConnectionI.h>
#include <Ice/Reference.h>
#include <Ice/Endpoint.h>
@@ -78,16 +79,15 @@ IceInternal::LocalExceptionWrapper::retry() const
return _retry;
}
-IceInternal::Outgoing::Outgoing(ConnectionI* connection, Reference* ref, const string& operation,
- OperationMode mode, const Context* context, bool compress) :
- _connection(connection),
- _reference(ref),
+IceInternal::Outgoing::Outgoing(RequestHandler* handler, const string& operation, OperationMode mode,
+ const Context* context) :
+ _handler(handler),
_state(StateUnsent),
- _is(ref->getInstance().get()),
- _os(ref->getInstance().get()),
- _compress(compress)
+ _is(handler->getReference()->getInstance().get()),
+ _os(handler->getReference()->getInstance().get()),
+ _sent(false)
{
- switch(_reference->getMode())
+ switch(_handler->getReference()->getMode())
{
case Reference::ModeTwoway:
case Reference::ModeOneway:
@@ -100,25 +100,25 @@ IceInternal::Outgoing::Outgoing(ConnectionI* connection, Reference* ref, const s
case Reference::ModeBatchOneway:
case Reference::ModeBatchDatagram:
{
- _connection->prepareBatchRequest(&_os);
+ _handler->prepareBatchRequest(&_os);
break;
}
}
try
{
- _reference->getIdentity().__write(&_os);
+ _handler->getReference()->getIdentity().__write(&_os);
//
// For compatibility with the old FacetPath.
//
- if(_reference->getFacet().empty())
+ if(_handler->getReference()->getFacet().empty())
{
_os.write(static_cast<string*>(0), static_cast<string*>(0));
}
else
{
- string facet = _reference->getFacet();
+ string facet = _handler->getReference()->getFacet();
_os.write(&facet, &facet + 1);
}
@@ -138,11 +138,8 @@ IceInternal::Outgoing::Outgoing(ConnectionI* connection, Reference* ref, const s
//
// Implicit context
//
- const ImplicitContextIPtr& implicitContext =
- _reference->getInstance()->getImplicitContext();
-
- const Context& prxContext = _reference->getContext()->getValue();
-
+ const ImplicitContextIPtr& implicitContext = _handler->getReference()->getInstance()->getImplicitContext();
+ const Context& prxContext = _handler->getReference()->getContext()->getValue();
if(implicitContext == 0)
{
__writeContext(&_os, prxContext);
@@ -173,42 +170,33 @@ IceInternal::Outgoing::invoke()
_os.endWriteEncaps();
- switch(_reference->getMode())
+ switch(_handler->getReference()->getMode())
{
case Reference::ModeTwoway:
{
- //
- // We let all exceptions raised by sending directly
- // propagate to the caller, because they can be retried
- // without violating "at-most-once". In case of such
- // exceptions, the connection object does not call back on
- // this object, so we don't need to lock the mutex, keep
- // track of state, or save exceptions.
- //
- _connection->sendRequest(&_os, this, _compress);
-
- //
- // Wait until the request has completed, or until the
- // request times out.
- //
+ _state = StateInProgress;
- bool timedOut = false;
+ Ice::ConnectionI* connection = _handler->sendRequest(this);
+ bool timedOut = false;
+
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
//
- // It's possible that the request has already
- // completed, due to a regular response, or because of
- // an exception. So we only change the state to "in
- // progress" if it is still "unsent".
+ // If the request is being sent in the background we first wait for the
+ // sent notification.
//
- if(_state == StateUnsent)
+ while(_state != StateFailed && !_sent)
{
- _state = StateInProgress;
+ _monitor.wait();
}
-
- Int timeout = _connection->timeout();
+
+ //
+ // Wait until the request has completed, or until the request times out.
+ //
+
+ Int timeout = connection->timeout();
while(_state == StateInProgress && !timedOut)
{
if(timeout >= 0)
@@ -233,7 +221,7 @@ IceInternal::Outgoing::invoke()
// Must be called outside the synchronization of this
// object.
//
- _connection->exception(TimeoutException(__FILE__, __LINE__));
+ connection->exception(TimeoutException(__FILE__, __LINE__));
//
// We must wait until the exception set above has
@@ -241,7 +229,6 @@ IceInternal::Outgoing::invoke()
//
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
-
while(_state == StateInProgress)
{
_monitor.wait();
@@ -262,7 +249,8 @@ IceInternal::Outgoing::invoke()
// An ObjectNotExistException can always be retried as
// well without violating "at-most-once".
//
- if(dynamic_cast<CloseConnectionException*>(_exception.get()) ||
+ if(!_sent ||
+ dynamic_cast<CloseConnectionException*>(_exception.get()) ||
dynamic_cast<ObjectNotExistException*>(_exception.get()))
{
_exception->ice_throw();
@@ -281,25 +269,35 @@ IceInternal::Outgoing::invoke()
{
return false;
}
-
- assert(_state == StateOK);
- break;
+ else
+ {
+ assert(_state == StateOK);
+ return true;
+ }
}
-
+
case Reference::ModeOneway:
case Reference::ModeDatagram:
{
- //
- // For oneway and datagram requests, the connection object
- // never calls back on this object. Therefore we don't
- // need to lock the mutex or save exceptions. We simply
- // let all exceptions from sending propagate to the
- // caller, because such exceptions can be retried without
- // violating "at-most-once".
- //
_state = StateInProgress;
- _connection->sendRequest(&_os, 0, _compress);
- break;
+ if(_handler->sendRequest(this))
+ {
+ //
+ // If the handler returns the connection, we must wait for the sent callback.
+ //
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(_state != StateFailed && !_sent)
+ {
+ _monitor.wait();
+ }
+
+ if(_exception.get())
+ {
+ assert(!_sent);
+ _exception->ice_throw();
+ }
+ }
+ return true;
}
case Reference::ModeBatchOneway:
@@ -311,12 +309,13 @@ IceInternal::Outgoing::invoke()
// apply.
//
_state = StateInProgress;
- _connection->finishBatchRequest(&_os, _compress);
- break;
+ _handler->finishBatchRequest(&_os);
+ return true;
}
}
- return true;
+ assert(false);
+ return false;
}
void
@@ -329,9 +328,10 @@ IceInternal::Outgoing::abort(const LocalException& ex)
// notify the connection about that we give up ownership of the
// batch stream.
//
- if(_reference->getMode() == Reference::ModeBatchOneway || _reference->getMode() == Reference::ModeBatchDatagram)
+ if(_handler->getReference()->getMode() == Reference::ModeBatchOneway ||
+ _handler->getReference()->getMode() == Reference::ModeBatchDatagram)
{
- _connection->abortBatchRequest();
+ _handler->abortBatchRequest();
//
// If we abort a batch requests, we cannot retry, because not
@@ -345,11 +345,30 @@ IceInternal::Outgoing::abort(const LocalException& ex)
}
void
+IceInternal::Outgoing::sent(bool notify)
+{
+ if(notify)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _sent = true;
+ _monitor.notify();
+ }
+ else
+ {
+ //
+ // No synchronization is necessary if called from sendRequest() because the connection
+ // send mutex is locked and no other threads can call on Outgoing until it's released.
+ //
+ _sent = true;
+ }
+}
+
+void
IceInternal::Outgoing::finished(BasicStream& is)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
- assert(_reference->getMode() == Reference::ModeTwoway); // Can only be called for twoways.
+ assert(_handler->getReference()->getMode() == Reference::ModeTwoway); // Can only be called for twoways.
assert(_state <= StateInProgress);
@@ -514,12 +533,68 @@ void
IceInternal::Outgoing::finished(const LocalException& ex)
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
-
- assert(_reference->getMode() == Reference::ModeTwoway); // Can only be called for twoways.
-
assert(_state <= StateInProgress);
- _state = StateLocalException;
+ _state = StateFailed;
+ _exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
+ _monitor.notify();
+}
+
+IceInternal::BatchOutgoing::BatchOutgoing(RequestHandler* handler) :
+ _handler(handler),
+ _connection(0),
+ _sent(false),
+ _os(handler->getReference()->getInstance().get())
+{
+}
+
+IceInternal::BatchOutgoing::BatchOutgoing(ConnectionI* connection, Instance* instance) :
+ _handler(0),
+ _connection(connection),
+ _sent(false),
+ _os(instance)
+{
+}
+
+void
+IceInternal::BatchOutgoing::invoke()
+{
+ assert(_handler || _connection);
+ if(_handler && !_handler->flushBatchRequests(this) || _connection && !_connection->flushBatchRequests(this))
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!_exception.get() && !_sent)
+ {
+ _monitor.wait();
+ }
+
+ if(_exception.get())
+ {
+ assert(!_sent);
+ _exception->ice_throw();
+ }
+ }
+}
+
+void
+IceInternal::BatchOutgoing::sent(bool notify)
+{
+ if(notify)
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _sent = true;
+ _monitor.notify();
+ }
+ else
+ {
+ _sent = true;
+ }
+}
+
+void
+IceInternal::BatchOutgoing::finished(const Ice::LocalException& ex)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
_exception.reset(dynamic_cast<LocalException*>(ex.ice_clone()));
_monitor.notify();
}