summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/Outgoing.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/Outgoing.cpp')
-rw-r--r--cpp/src/Ice/Outgoing.cpp461
1 files changed, 208 insertions, 253 deletions
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index 3699c3ee29a..23cccd2708a 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -23,134 +23,29 @@ using namespace Ice;
using namespace Ice::Instrumentation;
using namespace IceInternal;
-OutgoingBase::OutgoingBase(Instance* instance, const string& operation) :
- _os(instance, Ice::currentProtocolEncoding), _sent(false)
+OutgoingBase::OutgoingBase(Instance* instance) : _os(instance, Ice::currentProtocolEncoding), _sent(false)
{
}
-Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, const Context* context) :
- OutgoingBase(proxy->__reference()->getInstance().get(), operation),
+ProxyOutgoingBase::ProxyOutgoingBase(IceProxy::Ice::Object* proxy, OperationMode mode) :
+ OutgoingBase(proxy->__reference()->getInstance().get()),
_proxy(proxy),
_mode(mode),
- _state(StateUnsent),
- _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())),
- _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding)
-{
- checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol()));
-
- _observer.attach(proxy, operation, context);
-
+ _state(StateUnsent)
+{
int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
if(invocationTimeout > 0)
{
_invocationTimeoutDeadline = Time::now(Time::Monotonic) + Time::milliSeconds(invocationTimeout);
}
-
- switch(_proxy->__reference()->getMode())
- {
- case Reference::ModeTwoway:
- case Reference::ModeOneway:
- case Reference::ModeDatagram:
- {
- _os.writeBlob(requestHdr, sizeof(requestHdr));
- break;
- }
-
- case Reference::ModeBatchOneway:
- case Reference::ModeBatchDatagram:
- {
- while(true)
- {
- try
- {
- _handler = proxy->__getRequestHandler();
- _handler->prepareBatchRequest(&_os);
- break;
- }
- catch(const RetryException&)
- {
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
- }
- catch(const Ice::LocalException& ex)
- {
- _observer.failed(ex.ice_name());
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler
- throw;
- }
- }
- break;
- }
- }
-
- try
- {
- _os.write(_proxy->__reference()->getIdentity());
-
- //
- // For compatibility with the old FacetPath.
- //
- if(_proxy->__reference()->getFacet().empty())
- {
- _os.write(static_cast<string*>(0), static_cast<string*>(0));
- }
- else
- {
- string facet = _proxy->__reference()->getFacet();
- _os.write(&facet, &facet + 1);
- }
-
- _os.write(operation, false);
-
- _os.write(static_cast<Ice::Byte>(mode));
-
- if(context != 0)
- {
- //
- // Explicit context
- //
- _os.write(*context);
- }
- else
- {
- //
- // Implicit context
- //
- const ImplicitContextIPtr& implicitContext = _proxy->__reference()->getInstance()->getImplicitContext();
- const Context& prxContext = _proxy->__reference()->getContext()->getValue();
- if(implicitContext == 0)
- {
- _os.write(prxContext);
- }
- else
- {
- implicitContext->write(prxContext, &_os);
- }
- }
- }
- catch(const LocalException& ex)
- {
- abort(ex);
- }
}
-Outgoing::~Outgoing()
-{
-}
-
-bool
-Outgoing::send(const Ice::ConnectionIPtr& connection, bool compress, bool response)
+ProxyOutgoingBase::~ProxyOutgoingBase()
{
- return connection->sendRequest(this, compress, response);
}
void
-Outgoing::invokeCollocated(CollocatedRequestHandler* handler)
-{
- handler->invokeRequest(this);
-}
-
-void
-Outgoing::sent()
+ProxyOutgoingBase::sent()
{
Monitor<Mutex>::Lock sync(_monitor);
if(_proxy->__reference()->getMode() != Reference::ModeTwoway)
@@ -162,14 +57,14 @@ Outgoing::sent()
_monitor.notify();
//
- // NOTE: At this point the stack allocated Outgoing object can be destroyed
+ // NOTE: At this point the stack allocated ProxyOutgoingBase object can be destroyed
// since the notify() on the monitor will release the thread waiting on the
// synchronous Ice call.
//
}
void
-Outgoing::completed(const Ice::Exception& ex)
+ProxyOutgoingBase::completed(const Ice::Exception& ex)
{
Monitor<Mutex>::Lock sync(_monitor);
//assert(_state <= StateInProgress);
@@ -197,7 +92,13 @@ Outgoing::completed(const Ice::Exception& ex)
}
void
-Outgoing::retryException(const Ice::Exception&)
+ProxyOutgoingBase::completed(BasicStream& is)
+{
+ assert(false); // Must be overriden
+}
+
+void
+ProxyOutgoingBase::retryException(const Ice::Exception&)
{
Monitor<Mutex>::Lock sync(_monitor);
assert(_state <= StateInProgress);
@@ -206,22 +107,14 @@ Outgoing::retryException(const Ice::Exception&)
}
bool
-Outgoing::invoke()
+ProxyOutgoingBase::invokeImpl()
{
assert(_state == StateUnsent);
-
- const Reference::Mode mode = _proxy->__reference()->getMode();
- if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
- {
- _state = StateInProgress;
- _handler->finishBatchRequest(&_os);
- return true;
- }
const int invocationTimeout = _proxy->__reference()->getInvocationTimeout();
int cnt = 0;
while(true)
- {
+ {
try
{
if(invocationTimeout > 0 && _invocationTimeoutDeadline <= Time::now(Time::Monotonic))
@@ -256,7 +149,7 @@ Outgoing::invoke()
{
}
}
-
+
bool timedOut = false;
{
Monitor<Mutex>::Lock sync(_monitor);
@@ -274,7 +167,7 @@ Outgoing::invoke()
break;
}
_monitor.timedWait(_invocationTimeoutDeadline - now);
-
+
if((_state == StateInProgress || !_sent) && _state != StateFailed)
{
now = Time::now(Time::Monotonic);
@@ -290,7 +183,7 @@ Outgoing::invoke()
}
}
}
-
+
if(timedOut)
{
if(invocationTimeout == -2)
@@ -304,8 +197,8 @@ Outgoing::invoke()
//
// Wait for the exception to propagate. It's possible the request handler ignores
- // the timeout if there was a failure shortly before requestCanceled got called.
- // In this case, the exception should be set on the Outgoing.
+ // the timeout if there was a failure shortly before requestCanceled got called.
+ // In this case, the exception should be set on the ProxyOutgoingBase.
//
Monitor<Mutex>::Lock sync(_monitor);
while(_state == StateInProgress)
@@ -313,14 +206,14 @@ Outgoing::invoke()
_monitor.wait();
}
}
-
+
if(_exception.get())
{
_exception->ice_throw();
}
else if(_state == StateRetry)
{
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
+ _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and retry.
continue;
}
else
@@ -331,7 +224,7 @@ Outgoing::invoke()
}
catch(const RetryException&)
{
- _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
+ _proxy->__updateRequestHandler(_handler, 0); // Clear request handler and retry.
}
catch(const Ice::Exception& ex)
{
@@ -348,8 +241,8 @@ Outgoing::invoke()
//
// Wait until either the retry and invocation timeout deadline is reached.
- // Note that we're using a loop here because sleep() precision isn't as
- // good as the motonic clock and it can return few hundred micro-seconds
+ // Note that we're using a loop here because sleep() precision isn't as
+ // good as the motonic clock and it can return few hundred micro-seconds
// earlier which breaks the check for the invocation timeout.
//
while(retryDeadline > now && _invocationTimeoutDeadline > now)
@@ -388,22 +281,129 @@ Outgoing::invoke()
return false;
}
+Outgoing::Outgoing(IceProxy::Ice::Object* proxy, const string& operation, OperationMode mode, const Context* context) :
+ ProxyOutgoingBase(proxy, mode),
+ _encoding(getCompatibleEncoding(proxy->__reference()->getEncoding())),
+ _is(proxy->__reference()->getInstance().get(), Ice::currentProtocolEncoding),
+ _operation(operation)
+{
+ checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol()));
+ _observer.attach(proxy, operation, context);
+
+ switch(_proxy->__reference()->getMode())
+ {
+ case Reference::ModeTwoway:
+ case Reference::ModeOneway:
+ case Reference::ModeDatagram:
+ {
+ _os.writeBlob(requestHdr, sizeof(requestHdr));
+ break;
+ }
+
+ case Reference::ModeBatchOneway:
+ case Reference::ModeBatchDatagram:
+ {
+ _proxy->__getBatchRequestQueue()->prepareBatchRequest(&_os);
+ break;
+ }
+ }
+
+ try
+ {
+ _os.write(_proxy->__reference()->getIdentity());
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ if(_proxy->__reference()->getFacet().empty())
+ {
+ _os.write(static_cast<string*>(0), static_cast<string*>(0));
+ }
+ else
+ {
+ string facet = _proxy->__reference()->getFacet();
+ _os.write(&facet, &facet + 1);
+ }
+
+ _os.write(operation, false);
+
+ _os.write(static_cast<Ice::Byte>(mode));
+
+ if(context != 0)
+ {
+ //
+ // Explicit context
+ //
+ _os.write(*context);
+ }
+ else
+ {
+ //
+ // Implicit context
+ //
+ const ImplicitContextIPtr& implicitContext = _proxy->__reference()->getInstance()->getImplicitContext();
+ const Context& prxContext = _proxy->__reference()->getContext()->getValue();
+ if(implicitContext == 0)
+ {
+ _os.write(prxContext);
+ }
+ else
+ {
+ implicitContext->write(prxContext, &_os);
+ }
+ }
+ }
+ catch(const LocalException& ex)
+ {
+ abort(ex);
+ }
+}
+
+Outgoing::~Outgoing()
+{
+}
+
+bool
+Outgoing::invokeRemote(const Ice::ConnectionIPtr& connection, bool compress, bool response)
+{
+ return connection->sendRequest(this, compress, response, 0);
+}
+
+void
+Outgoing::invokeCollocated(CollocatedRequestHandler* handler)
+{
+ handler->invokeRequest(this, 0);
+}
+
+bool
+Outgoing::invoke()
+{
+ const Reference::Mode mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
+ {
+ _state = StateInProgress;
+ _proxy->__getBatchRequestQueue()->finishBatchRequest(&_os, _proxy, _operation);
+ return true;
+ }
+ return invokeImpl();
+}
+
void
Outgoing::abort(const LocalException& ex)
{
assert(_state == StateUnsent);
-
+
//
// If we didn't finish a batch oneway or datagram request, we must
// notify the connection about that we give up ownership of the
// batch stream.
//
- if(_proxy->__reference()->getMode() == Reference::ModeBatchOneway ||
- _proxy->__reference()->getMode() == Reference::ModeBatchDatagram)
+ const Reference::Mode mode = _proxy->__reference()->getMode();
+ if(mode == Reference::ModeBatchOneway || mode == Reference::ModeBatchDatagram)
{
- _handler->abortBatchRequest();
+ _proxy->__getBatchRequestQueue()->abortBatchRequest(&_os);
}
-
+
ex.ice_throw();
}
@@ -425,7 +425,7 @@ Outgoing::completed(BasicStream& is)
Ice::Byte replyStatus;
_is.read(replyStatus);
-
+
switch(replyStatus)
{
case replyOK:
@@ -433,14 +433,14 @@ Outgoing::completed(BasicStream& is)
_state = StateOK; // The state must be set last, in case there is an exception.
break;
}
-
+
case replyUserException:
{
_observer.userException();
_state = StateUserException; // The state must be set last, in case there is an exception.
break;
}
-
+
case replyObjectNotExist:
case replyFacetNotExist:
case replyOperationNotExist:
@@ -470,7 +470,7 @@ Outgoing::completed(BasicStream& is)
string operation;
_is.read(operation, false);
-
+
RequestFailedException* ex;
switch(replyStatus)
{
@@ -479,19 +479,19 @@ Outgoing::completed(BasicStream& is)
ex = new ObjectNotExistException(__FILE__, __LINE__);
break;
}
-
+
case replyFacetNotExist:
{
ex = new FacetNotExistException(__FILE__, __LINE__);
break;
}
-
+
case replyOperationNotExist:
{
ex = new OperationNotExistException(__FILE__, __LINE__);
break;
}
-
+
default:
{
ex = 0; // To keep the compiler from complaining.
@@ -499,7 +499,7 @@ Outgoing::completed(BasicStream& is)
break;
}
}
-
+
ex->id = ident;
ex->facet = facet;
ex->operation = operation;
@@ -508,7 +508,7 @@ Outgoing::completed(BasicStream& is)
_state = StateLocalException; // The state must be set last, in case there is an exception.
break;
}
-
+
case replyUnknownException:
case replyUnknownLocalException:
case replyUnknownUserException:
@@ -520,7 +520,7 @@ Outgoing::completed(BasicStream& is)
//
string unknown;
_is.read(unknown, false);
-
+
UnknownException* ex;
switch(replyStatus)
{
@@ -529,19 +529,19 @@ Outgoing::completed(BasicStream& is)
ex = new UnknownException(__FILE__, __LINE__);
break;
}
-
+
case replyUnknownLocalException:
{
ex = new UnknownLocalException(__FILE__, __LINE__);
break;
}
-
+
case replyUnknownUserException:
{
ex = new UnknownUserException(__FILE__, __LINE__);
break;
}
-
+
default:
{
ex = 0; // To keep the compiler from complaining.
@@ -549,14 +549,14 @@ Outgoing::completed(BasicStream& is)
break;
}
}
-
+
ex->unknown = unknown;
_exception.reset(ex);
_state = StateLocalException; // The state must be set last, in case there is an exception.
break;
}
-
+
default:
{
_exception.reset(new UnknownReplyStatusException(__FILE__, __LINE__));
@@ -583,145 +583,94 @@ Outgoing::throwUserException()
}
}
-FlushBatch::FlushBatch(IceProxy::Ice::Object* proxy, const string& operation) :
- OutgoingBase(proxy->__reference()->getInstance().get(), operation), _proxy(proxy), _connection(0)
+ProxyFlushBatch::ProxyFlushBatch(IceProxy::Ice::Object* proxy, const string& operation) :
+ ProxyOutgoingBase(proxy, Ice::Normal)
{
- checkSupportedProtocol(proxy->__reference()->getProtocol());
+ checkSupportedProtocol(getCompatibleProtocol(proxy->__reference()->getProtocol()));
+ _observer.attach(proxy, operation, 0);
- _observer.attach(proxy->__reference()->getInstance().get(), operation);
+ _batchRequestNum = proxy->__getBatchRequestQueue()->swap(&_os);
}
-FlushBatch::FlushBatch(ConnectionI* connection, Instance* instance, const string& operation) :
- OutgoingBase(instance, operation), _proxy(0), _connection(connection)
+bool
+ProxyFlushBatch::invokeRemote(const Ice::ConnectionIPtr& connection, bool compress, bool response)
{
- _observer.attach(instance, operation);
+ return connection->sendRequest(this, compress, response, _batchRequestNum);
}
void
-FlushBatch::invoke()
+ProxyFlushBatch::invokeCollocated(CollocatedRequestHandler* handler)
{
- assert(_proxy || _connection);
+ handler->invokeRequest(this, _batchRequestNum);
+}
- if(_connection)
+void
+ProxyFlushBatch::invoke()
+{
+ if(_batchRequestNum == 0)
{
- if(_connection->flushBatchRequests(this))
- {
- return;
- }
-
- Monitor<Mutex>::Lock sync(_monitor);
- while(!_exception.get() && !_sent)
- {
- _monitor.wait();
- }
- if(_exception.get())
- {
- _exception->ice_throw();
- }
- return;
+ sent();
}
+ else
+ {
+ invokeImpl();
+ }
+}
+
+ConnectionFlushBatch::ConnectionFlushBatch(ConnectionI* connection, Instance* instance, const string& operation) :
+ OutgoingBase(instance), _connection(connection)
+{
+ _observer.attach(instance, operation);
+}
+
+void
+ConnectionFlushBatch::invoke()
+{
+ int batchRequestNum = _connection->getBatchRequestQueue()->swap(&_os);
- RequestHandlerPtr handler;
try
{
- handler = _proxy->__getRequestHandler();
- if(handler->sendRequest(this))
+ if(batchRequestNum == 0)
{
- return;
+ sent();
}
-
- bool timedOut = false;
+ else if(!_connection->sendRequest(this, false, false, batchRequestNum))
{
Monitor<Mutex>::Lock sync(_monitor);
- int timeout = _proxy->__reference()->getInvocationTimeout();
- if(timeout > 0)
- {
- Time now = Time::now(Time::Monotonic);
- Time deadline = now + Time::milliSeconds(timeout);
- while(!_exception.get() && !_sent && !timedOut)
- {
- _monitor.timedWait(deadline - now);
- if(!_exception.get() && !_sent)
- {
- now = Time::now(Time::Monotonic);
- timedOut = now >= deadline;
- }
- }
- }
- else
- {
- while(!_exception.get() && !_sent)
- {
- _monitor.wait();
- }
- }
- }
-
- if(timedOut)
- {
- Ice::InvocationTimeoutException ex(__FILE__, __LINE__);
- handler->requestCanceled(this, ex);
-
- //
- // Wait for the exception to propagate. It's possible the request handler ignores
- // the timeout if there was a failure shortly before requestTimedOut got called.
- // In this case, the exception should be set on the Outgoing.
- //
- Monitor<Mutex>::Lock sync(_monitor);
while(!_exception.get() && !_sent)
{
_monitor.wait();
}
- }
-
- if(_exception.get())
- {
- _exception->ice_throw();
+ if(_exception.get())
+ {
+ _exception->ice_throw();
+ }
}
}
catch(const RetryException& ex)
{
- _proxy->__setRequestHandler(handler, 0); // Clear request handler
- ex.get()->ice_throw(); // Throw to notify the user that batch requests were potentially lost.
+ ex.get()->ice_throw();
}
- catch(const Ice::Exception& ex)
- {
- _proxy->__setRequestHandler(handler, 0); // Clear request handler
- _observer.failed(ex.ice_name());
- throw; // Throw to notify the user that batch requests were potentially lost.
- }
-}
-
-bool
-FlushBatch::send(const Ice::ConnectionIPtr& connection, bool, bool)
-{
- return connection->flushBatchRequests(this);
-}
-
-void
-FlushBatch::invokeCollocated(CollocatedRequestHandler* handler)
-{
- handler->invokeBatchRequests(this);
}
void
-FlushBatch::sent()
+ConnectionFlushBatch::sent()
{
Monitor<Mutex>::Lock sync(_monitor);
_childObserver.detach();
-
+
_sent = true;
_monitor.notify();
//
- // NOTE: At this point the stack allocated FlushBatch object
- // can be destroyed since the notify() on the monitor will release
- // the thread waiting on the synchronous Ice call.
+ // NOTE: At this point the stack allocated ConnectionFlushBatch
+ // object can be destroyed since the notify() on the monitor will
+ // release the thread waiting on the synchronous Ice call.
//
}
void
-FlushBatch::completed(const Ice::Exception& ex)
+ConnectionFlushBatch::completed(const Ice::Exception& ex)
{
Monitor<Mutex>::Lock sync(_monitor);
_childObserver.failed(ex.ice_name());
@@ -731,7 +680,13 @@ FlushBatch::completed(const Ice::Exception& ex)
}
void
-FlushBatch::retryException(const Ice::Exception& ex)
+ConnectionFlushBatch::completed(BasicStream& is)
+{
+ assert(false);
+}
+
+void
+ConnectionFlushBatch::retryException(const Ice::Exception& ex)
{
completed(ex);
}