summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp914
1 files changed, 521 insertions, 393 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 2c6ad47d336..7fc5b8a080a 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -30,23 +30,26 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
+IceUtil::Shared* IceInternal::upCast(AsyncResult* p) { return p; }
+
IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; }
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_invoke* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(AMI_Array_Object_ice_invoke* p) { return p; }
-IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_flushBatchRequests* p) { return p; }
+const unsigned char Ice::AsyncResult::OK = 0x1;
+const unsigned char Ice::AsyncResult::Done = 0x2;
+const unsigned char Ice::AsyncResult::Sent = 0x4;
+const unsigned char Ice::AsyncResult::EndCalled = 0x8;
namespace
{
-class CallException : public ThreadPoolWorkItem
+class AsynchronousException : public ThreadPoolWorkItem
{
public:
- CallException(const OutgoingAsyncMessageCallbackPtr& outAsync, const Ice::LocalException& ex) :
- _outAsync(outAsync), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone()))
+ AsynchronousException(const Ice::AsyncResultPtr& result, const Ice::Exception& ex) :
+ _result(result), _exception(dynamic_cast<Ice::LocalException*>(ex.ice_clone()))
{
}
@@ -54,142 +57,289 @@ public:
execute(ThreadPoolCurrent& current)
{
current.ioCompleted();
- _outAsync->__exception(*_exception.get());
+ _result->__exception(*_exception.get());
}
private:
- const OutgoingAsyncMessageCallbackPtr _outAsync;
- const auto_ptr<Ice::LocalException> _exception;
+ const Ice::AsyncResultPtr _result;
+ const auto_ptr<Ice::Exception> _exception;
};
};
-IceInternal::OutgoingAsyncMessageCallback::OutgoingAsyncMessageCallback() :
- __is(0),
- __os(0)
+Ice::AsyncResult::AsyncResult(const IceInternal::InstancePtr& instance,
+ const string& op,
+ const CallbackBasePtr& del,
+ const LocalObjectPtr& cookie) :
+ _instance(instance),
+ _operation(op),
+ _callback(del),
+ _cookie(cookie),
+ _is(instance.get()),
+ _os(instance.get()),
+ _state(0),
+ _exception(0)
+{
+ if(!_callback)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__);
+ }
+ const_cast<CallbackBasePtr&>(_callback) = _callback->__verify(const_cast<LocalObjectPtr&>(_cookie));
+}
+
+Ice::AsyncResult::~AsyncResult()
+{
+}
+
+bool
+Ice::AsyncResult::operator==(const AsyncResult& r) const
+{
+ return this == &r;
+}
+
+bool
+Ice::AsyncResult::operator!=(const AsyncResult& r) const
{
+ return !operator==(r);
}
-IceInternal::OutgoingAsyncMessageCallback::~OutgoingAsyncMessageCallback()
+bool
+Ice::AsyncResult::operator<(const AsyncResult& r) const
{
- assert(!__is);
- assert(!__os);
+ return this < &r;
+}
+
+Int
+Ice::AsyncResult::getHash() const
+{
+ return static_cast<Int>(reinterpret_cast<Long>(this) >> 4);
+}
+
+bool
+Ice::AsyncResult::isCompleted() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ return _state & Done;
+}
+
+void
+Ice::AsyncResult::waitForCompleted()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!(_state & Done))
+ {
+ _monitor.wait();
+ }
+}
+
+bool
+Ice::AsyncResult::isSent() const
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ return _state & (Sent | Done);
}
void
-IceInternal::OutgoingAsyncMessageCallback::__sentCallback(const InstancePtr& instance)
+Ice::AsyncResult::waitForSent()
{
- try
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(!(_state & (Sent | Done)))
{
- dynamic_cast<Ice::AMISentCallback*>(this)->ice_sent();
+ _monitor.wait();
}
- catch(const std::exception& ex)
+}
+
+bool
+Ice::AsyncResult::__wait()
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ if(_state & EndCalled)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "end_ method called more than once");
+ }
+ _state |= EndCalled;
+ while(!(_state & Done))
{
- __warning(instance, ex);
+ _monitor.wait();
}
- catch(...)
+ if(_exception.get())
{
- __warning(instance);
+ _exception.get()->ice_throw();
}
+ return _state & OK;
}
void
-IceInternal::OutgoingAsyncMessageCallback::__exception(const Ice::Exception& exc)
-{
+Ice::AsyncResult::__throwUserException()
+{
try
{
- ice_exception(exc);
+ _is.startReadEncaps();
+ _is.throwException();
}
- catch(const std::exception& ex)
- {
- __warning(ex);
- }
- catch(...)
+ catch(const Ice::UserException&)
{
- __warning();
+ _is.endReadEncaps();
+ throw;
}
-
- __releaseCallback();
}
void
-IceInternal::OutgoingAsyncMessageCallback::__acquireCallback(const Ice::ObjectPrx& proxy)
+Ice::AsyncResult::__sent()
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
-
//
- // We must first wait for other requests to finish.
+ // Note: no need to change the _state here, specializations are responsible for
+ // changing the state.
//
- while(__os)
+
+ if(_callback)
+ {
+ try
+ {
+ AsyncResultPtr self(this);
+ _callback->__sent(self);
+ }
+ catch(const std::exception& ex)
+ {
+ __warning(ex);
+ }
+ catch(...)
+ {
+ __warning();
+ }
+ }
+}
+
+void
+Ice::AsyncResult::__exception(const Ice::Exception& ex)
+{
{
- __monitor.wait();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _state |= Done;
+ _exception.reset(ex.ice_clone());
+ _monitor.notifyAll();
}
- Instance* instance = proxy->__reference()->getInstance().get();
- assert(!__os);
- __os = new BasicStream(instance);
+ if(_callback)
+ {
+ try
+ {
+ AsyncResultPtr self(this);
+ _callback->__completed(self);
+ }
+ catch(const std::exception& ex)
+ {
+ __warning(ex);
+ }
+ catch(...)
+ {
+ __warning();
+ }
+ }
}
void
-IceInternal::OutgoingAsyncMessageCallback::__releaseCallback(const Ice::LocalException& exc)
+Ice::AsyncResult::__exceptionAsync(const Ice::Exception& ex)
{
- assert(__os);
-
//
- // This is called by the invoking thread to release the callback following a direct
- // failure to marhsall/send the request. We call the ice_exception() callback with
- // the thread pool to avoid potential deadlocks in case the invoking thread locked
- // some mutexes/resources (which couldn't be re-acquired by the callback).
+ // This is called when it's not safe to call the exception callback synchronously
+ // from this thread. Instead the exception callback is called asynchronously from
+ // the client thread pool.
//
-
try
{
- //
- // COMPILERFIX: The following in done in two separate lines in order to work around
- // bug in C++Builder 2009.
- //
- ThreadPoolPtr threadPool = __os->instance()->clientThreadPool();
- threadPool->execute(new CallException(this, exc));
+ _instance->clientThreadPool()->execute(new AsynchronousException(this, ex));
}
catch(const Ice::CommunicatorDestroyedException&)
{
- __releaseCallback();
throw; // CommunicatorDestroyedException is the only exception that can propagate directly.
}
}
void
-IceInternal::OutgoingAsyncMessageCallback::__releaseCallbackNoSync()
+Ice::AsyncResult::__response()
+{
+ //
+ // Note: no need to change the _state here, specializations are responsible for
+ // changing the state.
+ //
+
+ if(_callback)
+ {
+ try
+ {
+ AsyncResultPtr self(this);
+ _callback->__completed(self);
+ }
+ catch(const std::exception& ex)
+ {
+ __warning(ex);
+ }
+ catch(...)
+ {
+ __warning();
+ }
+ }
+}
+
+void
+Ice::AsyncResult::__check(const AsyncResultPtr& r, const IceProxy::Ice::Object* prx, const string& operation)
{
- if(__is)
+ __check(r, operation);
+ if(r->getProxy().get() != prx)
{
- delete __is;
- __is = 0;
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Proxy for call to end_" + operation +
+ " does not match proxy that was used to call corresponding begin_" +
+ operation + " method");
}
+}
- assert(__os);
- delete __os;
- __os = 0;
+void
+Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Communicator* com, const string& operation)
+{
+ __check(r, operation);
+ if(r->getCommunicator().get() != com)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Communicator for call to end_" + operation +
+ " does not match communicator that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
+}
- __monitor.notify();
+void
+Ice::AsyncResult::__check(const AsyncResultPtr& r, const Ice::Connection* con, const string& operation)
+{
+ __check(r, operation);
+ if(r->getConnection().get() != con)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Connection for call to end_" + operation +
+ " does not match connection that was used to call corresponding " +
+ "begin_" + operation + " method");
+ }
}
void
-IceInternal::OutgoingAsyncMessageCallback::__warning(const std::exception& exc) const
+Ice::AsyncResult::__check(const AsyncResultPtr& r, const string& operation)
{
- if(__os) // Don't print anything if release() was already called.
+ if(!r)
{
- __warning(__os->instance(), exc);
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "AsyncResult == null");
+ }
+ else if(&r->_operation != &operation)
+ {
+ throw IceUtil::IllegalArgumentException(__FILE__, __LINE__, "Incorrect operation for end_" + operation +
+ " method: " + r->_operation);
}
}
+
void
-IceInternal::OutgoingAsyncMessageCallback::__warning(const InstancePtr& instance, const std::exception& exc) const
+Ice::AsyncResult::__warning(const std::exception& exc) const
{
- if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
{
- Warning out(instance->initializationData().logger);
+ Warning out(_instance->initializationData().logger);
const Exception* ex = dynamic_cast<const Exception*>(&exc);
if(ex)
{
@@ -203,44 +353,191 @@ IceInternal::OutgoingAsyncMessageCallback::__warning(const InstancePtr& instance
}
void
-IceInternal::OutgoingAsyncMessageCallback::__warning() const
+Ice::AsyncResult::__warning() const
{
- if(__os) // Don't print anything if release() was already called.
+ if(_instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
{
- __warning(__os->instance());
+ Warning out(_instance->initializationData().logger);
+ out << "unknown exception raised by AMI callback";
}
}
+IceInternal::OutgoingAsync::OutgoingAsync(const ObjectPrx& prx,
+ const std::string& operation,
+ const CallbackBasePtr& delegate,
+ const Ice::LocalObjectPtr& cookie) :
+ AsyncResult(prx->__reference()->getInstance(), operation, delegate, cookie),
+ _proxy(prx)
+{
+}
+
void
-IceInternal::OutgoingAsyncMessageCallback::__warning(const InstancePtr& instance) const
+IceInternal::OutgoingAsync::__prepare(const std::string& operation, OperationMode mode, const Context* context)
{
- if(instance->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ _delegate = 0;
+ _cnt = 0;
+ _mode = mode;
+ _sentSynchronously = false;
+
+ //
+ // Can't call async via a batch proxy.
+ //
+ if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
{
- Warning out(instance->initializationData().logger);
- out << "unknown exception raised by AMI callback";
+ throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
}
+
+ _os.writeBlob(requestHdr, sizeof(requestHdr));
+
+ Reference* ref = _proxy->__reference().get();
+
+ ref->getIdentity().__write(&_os);
+
+ //
+ // For compatibility with the old FacetPath.
+ //
+ if(ref->getFacet().empty())
+ {
+ _os.write(static_cast<string*>(0), static_cast<string*>(0));
+ }
+ else
+ {
+ string facet = ref->getFacet();
+ _os.write(&facet, &facet + 1);
+ }
+
+ _os.write(operation, false);
+
+ _os.write(static_cast<Byte>(_mode));
+
+ if(context != 0)
+ {
+ //
+ // Explicit context
+ //
+ __writeContext(&_os, *context);
+ }
+ else
+ {
+ //
+ // Implicit context
+ //
+ const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext();
+ const Context& prxContext = ref->getContext()->getValue();
+ if(implicitContext == 0)
+ {
+ __writeContext(&_os, prxContext);
+ }
+ else
+ {
+ implicitContext->write(prxContext, &_os);
+ }
+ }
+
+ _os.startWriteEncaps();
}
-void
+bool
IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
- _sent = true;
-
- if(!_proxy->ice_isTwoway())
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+
+ bool alreadySent = _state & Sent; // Expected in case of a retry.
+ _state |= Sent;
+
+ //
+ // It's possible for the request to be done already when using IOCP. This
+ // is the case for example if the send callback is dispatched after the
+ // read callback for the response/exception.
+ //
+ if(!(_state & Done))
{
- __releaseCallbackNoSync(); // No response expected, we're done with the OutgoingAsync.
+ if(!_proxy->ice_isTwoway())
+ {
+ _state |= Done | OK;
+ }
+ else if(connection->timeout() > 0)
+ {
+ assert(!_timerTaskConnection);
+ _timerTaskConnection = connection;
+ IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
+ _instance->timer()->schedule(this, timeout);
+ }
}
- else if(_response)
+ _monitor.notifyAll();
+ return !alreadySent && _callback && _callback->__hasSentCallback(); // Don't call the sent call is already sent.
+}
+
+void
+IceInternal::OutgoingAsync::__sent()
+{
+#if defined(_MSC_VER) && (_MSC_VER < 1300) // VC++ 6 compiler bug
+ AsyncResult::__sent();
+#else
+ ::Ice::AsyncResult::__sent();
+#endif
+}
+
+void
+IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc, bool sent)
+{
{
- __monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ assert(!(_state & Done));
+ if(_timerTaskConnection)
+ {
+ _instance->timer()->cancel(this);
+ _timerTaskConnection = 0;
+ }
}
- else if(connection->timeout() > 0)
+
+ //
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback.
+ //
+
+ try
+ {
+ int interval = handleException(exc, sent); // This will throw if the invocation can't be retried.
+ if(interval > 0)
+ {
+ _instance->retryQueue()->add(this, interval);
+ }
+ else
+ {
+ __send(false);
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ __exception(ex);
+ }
+}
+
+void
+IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
+{
+ //
+ // NOTE: at this point, synchronization isn't needed, no other threads should be
+ // calling on the callback. The LocalExceptionWrapper exception is only called
+ // before the invocation is sent.
+ //
+
+ try
{
- assert(!_timerTaskConnection && __os);
- _timerTaskConnection = connection;
- IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
- __os->instance()->timer()->schedule(this, timeout);
+ int interval = handleException(exc); // This will throw if the invocation can't be retried.
+ if(interval > 0)
+ {
+ _instance->retryQueue()->add(this, interval);
+ }
+ else
+ {
+ __send(false);
+ }
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ __exception(ex);
}
}
@@ -252,23 +549,17 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
Ice::Byte replyStatus;
try
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
- assert(__os);
- _response = true;
-
- if(_timerTaskConnection && __os->instance()->timer()->cancel(this))
- {
- _timerTaskConnection = 0; // Timer cancelled.
- }
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ assert(!_exception.get() && !(_state & Done));
- while(!_sent || _timerTaskConnection)
+ if(_timerTaskConnection)
{
- __monitor.wait();
+ _instance->timer()->cancel(this);
+ _timerTaskConnection = 0;
}
- __is = new BasicStream(__os->instance());
- __is->swap(is);
- __is->read(replyStatus);
+ _is.swap(is);
+ _is.read(replyStatus);
switch(replyStatus)
{
@@ -283,13 +574,13 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
case replyOperationNotExist:
{
Identity ident;
- ident.__read(__is);
+ ident.__read(&_is);
//
// For compatibility with the old FacetPath.
//
vector<string> facetPath;
- __is->read(facetPath);
+ _is.read(facetPath);
string facet;
if(!facetPath.empty())
{
@@ -301,7 +592,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
}
string operation;
- __is->read(operation, false);
+ _is.read(operation, false);
auto_ptr<RequestFailedException> ex;
switch(replyStatus)
@@ -342,7 +633,7 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
case replyUnknownUserException:
{
string unknown;
- __is->read(unknown, false);
+ _is.read(unknown, false);
auto_ptr<UnknownException> ex;
switch(replyStatus)
@@ -381,223 +672,73 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
throw UnknownReplyStatusException(__FILE__, __LINE__);
}
}
+
+ _state |= Done;
+ if(replyStatus == replyOK)
+ {
+ _state |= OK;
+ }
+ _monitor.notifyAll();
}
catch(const LocalException& ex)
{
- __finished(ex);
+ __finished(ex, true);
return;
}
assert(replyStatus == replyOK || replyStatus == replyUserException);
-
- try
- {
- __response(replyStatus == replyOK);
- }
- catch(const std::exception& ex)
- {
- __warning(ex);
- __releaseCallback();
- }
- catch(...)
- {
- __warning();
- __releaseCallback();
- }
+ __response();
}
-void
-IceInternal::OutgoingAsync::__finished(const Ice::LocalException& exc)
+bool
+IceInternal::OutgoingAsync::__send(bool synchronous)
{
+ while(true)
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
- assert(__os);
-
- if(_timerTaskConnection && __os->instance()->timer()->cancel(this))
+ int interval = 0;
+ try
{
- _timerTaskConnection = 0; // Timer cancelled.
+ _delegate = _proxy->__getDelegate(true);
+ bool sent = _delegate->__getRequestHandler()->sendAsyncRequest(this);
+ if(synchronous)
+ {
+ _sentSynchronously = sent;
+ }
+ break;
}
-
- while(_timerTaskConnection)
+ catch(const LocalExceptionWrapper& ex)
{
- __monitor.wait();
+ interval = handleException(ex);
}
- }
-
- //
- // NOTE: at this point, synchronization isn't needed, no other threads should be
- // calling on the callback.
- //
-
- try
- {
- handleException(exc); // This will throw if the invocation can't be retried.
- }
- catch(const Ice::LocalException& ex)
- {
- __exception(ex);
- }
-}
-
-void
-IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& exc)
-{
- assert(__os && !_sent);
-
- //
- // NOTE: at this point, synchronization isn't needed, no other threads should be
- // calling on the callback. The LocalExceptionWrapper exception is only called
- // before the invocation is sent.
- //
-
- try
- {
- handleException(exc); // This will throw if the invocation can't be retried.
- }
- catch(const Ice::LocalException& ex)
- {
- __exception(ex);
- }
-}
-
-void
-IceInternal::OutgoingAsync::__retry(int interval)
-{
- //
- // This method is called by the proxy to retry an invocation, no
- // other threads can access this object.
- //
- if(interval > 0)
- {
- assert(__os);
- __os->instance()->retryQueue()->add(this, interval);
- }
- else
- {
- __send();
- }
-}
-
-bool
-IceInternal::OutgoingAsync::__send()
-{
- try
- {
- _sent = false;
- _response = false;
- _delegate = _proxy->__getDelegate(true);
- _sentSynchronously = _delegate->__getRequestHandler()->sendAsyncRequest(this);
- }
- catch(const LocalExceptionWrapper& ex)
- {
- handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
- }
- catch(const Ice::LocalException& ex)
- {
- handleException(ex); // Might call __send() again upon retry and assign _sentSynchronously
- }
- return _sentSynchronously;
-}
-
-void
-IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operation, OperationMode mode,
- const Context* context)
-{
- _proxy = prx;
- _delegate = 0;
- _cnt = 0;
- _mode = mode;
- _sentSynchronously = false;
-
- //
- // Can't call async via a batch proxy.
- //
- if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
- {
- throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
- }
-
- __os->writeBlob(requestHdr, sizeof(requestHdr));
-
- Reference* ref = _proxy->__reference().get();
-
- ref->getIdentity().__write(__os);
-
- //
- // For compatibility with the old FacetPath.
- //
- if(ref->getFacet().empty())
- {
- __os->write(static_cast<string*>(0), static_cast<string*>(0));
- }
- else
- {
- string facet = ref->getFacet();
- __os->write(&facet, &facet + 1);
- }
-
- __os->write(operation, false);
-
- __os->write(static_cast<Byte>(_mode));
-
- if(context != 0)
- {
- //
- // Explicit context
- //
- __writeContext(__os, *context);
- }
- else
- {
- //
- // Implicit context
- //
- const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext();
- const Context& prxContext = ref->getContext()->getValue();
- if(implicitContext == 0)
+ catch(const Ice::LocalException& ex)
{
- __writeContext(__os, prxContext);
+ interval = handleException(ex, false);
}
- else
+
+ if(interval > 0)
{
- implicitContext->write(prxContext, __os);
+ _instance->retryQueue()->add(this, interval);
+ return false;
}
}
-
- __os->startWriteEncaps();
-}
-
-void
-IceInternal::OutgoingAsync::__throwUserException()
-{
- try
- {
- assert(__is);
- __is->startReadEncaps();
- __is->throwException();
- }
- catch(const Ice::UserException&)
- {
- __is->endReadEncaps();
- throw;
- }
+ return _sentSynchronously;
}
-void
+int
IceInternal::OutgoingAsync::handleException(const LocalExceptionWrapper& ex)
{
if(_mode == Nonmutating || _mode == Idempotent)
{
- _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt);
+ return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt);
}
else
{
- _proxy->__handleExceptionWrapper(_delegate, ex, this);
+ return _proxy->__handleExceptionWrapper(_delegate, ex);
}
}
-void
-IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc)
+int
+IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc, bool sent)
{
try
{
@@ -611,7 +752,7 @@ IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc)
// "at-most-once" (see the implementation of the checkRetryAfterException method of
// the ProxyFactory class for the reasons why it can be useful).
//
- if(!_sent ||
+ if(!sent ||
dynamic_cast<const CloseConnectionException*>(&exc) ||
dynamic_cast<const ObjectNotExistException*>(&exc))
{
@@ -629,17 +770,18 @@ IceInternal::OutgoingAsync::handleException(const Ice::LocalException& exc)
{
if(_mode == Nonmutating || _mode == Idempotent)
{
- _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, this, _cnt);
+ return _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, false, _cnt);
}
else
{
- _proxy->__handleExceptionWrapper(_delegate, ex, this);
+ return _proxy->__handleExceptionWrapper(_delegate, ex);
}
}
catch(const Ice::LocalException& ex)
{
- _proxy->__handleException(_delegate, ex, this, _cnt);
+ return _proxy->__handleException(_delegate, ex, false, _cnt);
}
+ return 0; // Keep the compiler happy.
}
void
@@ -647,15 +789,9 @@ IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTi
{
Ice::ConnectionIPtr connection;
{
- IceUtil::Monitor<IceUtil::Mutex>::Lock sync(__monitor);
- assert(_timerTaskConnection && _sent); // Can only be set once the request is sent.
-
- if(!_response) // If the response was just received, don't close the connection.
- {
- connection = _timerTaskConnection;
- }
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ connection = _timerTaskConnection;
_timerTaskConnection = 0;
- __monitor.notifyAll();
}
if(connection)
@@ -664,122 +800,114 @@ IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTi
}
}
-void
+IceInternal::BatchOutgoingAsync::BatchOutgoingAsync(const InstancePtr& instance,
+ const std::string& operation,
+ const CallbackBasePtr& delegate,
+ const Ice::LocalObjectPtr& cookie) :
+ AsyncResult(instance, operation, delegate, cookie)
+{
+}
+
+bool
IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection)
{
- __releaseCallback();
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ assert(!_exception.get());
+ _state |= Done | OK | Sent;
+ _monitor.notifyAll();
+ return _callback && _callback->__hasSentCallback();
}
void
-IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc)
+IceInternal::BatchOutgoingAsync::__sent()
+{
+#if defined(_MSC_VER) && (_MSC_VER < 1300) // VC++ 6 compiler bug
+ AsyncResult::__sent();
+#else
+ ::Ice::AsyncResult::__sent();
+#endif
+}
+
+void
+IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc, bool)
{
__exception(exc);
}
-bool
-Ice::AMI_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode,
- const vector<Byte>& inParams, const Context* context)
+IceInternal::ProxyBatchOutgoingAsync::ProxyBatchOutgoingAsync(const Ice::ObjectPrx& proxy,
+ const std::string& operation,
+ const CallbackBasePtr& delegate,
+ const Ice::LocalObjectPtr& cookie) :
+ BatchOutgoingAsync(proxy->__reference()->getInstance(), operation, delegate, cookie),
+ _proxy(proxy)
{
- __acquireCallback(prx);
- try
- {
- __prepare(prx, operation, mode, context);
- __os->writeBlob(inParams);
- __os->endWriteEncaps();
- return __send();
- }
- catch(const Ice::LocalException& ex)
- {
- __releaseCallback(ex);
- return false;
- }
}
-void
-Ice::AMI_Object_ice_invoke::__response(bool ok) // ok == true means no user exception.
+namespace
{
- vector<Byte> outParams;
- try
+
+//
+// Dummy class derived from CallbackBase
+// We use this class for the __dummyCallback extern pointer in OutgoingAsync. In turn,
+// this allows us to test whether the user supplied a null delegate instance to the
+// generated begin_ method without having to generate a separate test to throw IllegalArgumentException
+// in the inlined versions of the begin_ method. In other words, this reduces the amount of generated
+// object code.
+//
+class DummyCallback : public CallbackBase
+{
+public:
+
+ DummyCallback()
{
- __is->startReadEncaps();
- Int sz = __is->getReadEncapsSize();
- __is->readBlob(outParams, sz);
- __is->endReadEncaps();
}
- catch(const LocalException& ex)
+
+ virtual void __completed(const Ice::AsyncResultPtr&) const
{
- __finished(ex);
- return;
+ assert(false);
}
- ice_response(ok, outParams);
- __releaseCallback();
-}
-bool
-Ice::AMI_Array_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode,
- const pair<const Byte*, const Byte*>& inParams, const Context* context)
-{
- __acquireCallback(prx);
- try
+ virtual CallbackBasePtr __verify(Ice::LocalObjectPtr&)
{
- __prepare(prx, operation, mode, context);
- __os->writeBlob(inParams.first, static_cast<Int>(inParams.second - inParams.first));
- __os->endWriteEncaps();
- return __send();
+ //
+ // Called by the AsyncResult constructor to verify the delegate. The dummy
+ // delegate is passed when the user used a begin_ method without delegate.
+ // By returning 0 here, we tell the AsyncResult that no delegates was
+ // provided.
+ //
+ return 0;
}
- catch(const Ice::LocalException& ex)
+
+ virtual void __sent(const AsyncResultPtr&) const
{
- __releaseCallback(ex);
+ assert(false);
+ }
+
+ virtual bool __hasSentCallback() const
+ {
+ assert(false);
return false;
}
+};
+
}
+//
+// This gives a pointer value to compare against in the generated
+// begin_ method to decide whether the caller passed a null pointer
+// versus the generated inline version of the begin_ method having
+// passed a pointer to the dummy delegate.
+//
+CallbackBasePtr IceInternal::__dummyCallback = new DummyCallback;
+
void
-Ice::AMI_Array_Object_ice_invoke::__response(bool ok) // ok == true means no user exception.
+Ice::AMICallbackBase::__exception(const ::Ice::Exception& ex)
{
- pair<const Byte*, const Byte*> outParams;
- try
- {
- __is->startReadEncaps();
- Int sz = __is->getReadEncapsSize();
- __is->readBlob(outParams.first, sz);
- outParams.second = outParams.first + sz;
- __is->endReadEncaps();
- }
- catch(const LocalException& ex)
- {
- __finished(ex);
- return;
- }
- ice_response(ok, outParams);
- __releaseCallback();
+ ice_exception(ex);
}
-bool
-Ice::AMI_Object_ice_flushBatchRequests::__invoke(const ObjectPrx& prx)
+void
+Ice::AMICallbackBase::__sent()
{
- __acquireCallback(prx);
- try
- {
- //
- // We don't automatically retry if ice_flushBatchRequests fails. Otherwise, if some batch
- // requests were queued with the connection, they would be lost without being noticed.
- //
- Handle< ::IceDelegate::Ice::Object> delegate;
- int cnt = -1; // Don't retry.
- try
- {
- delegate = prx->__getDelegate(true);
- return delegate->__getRequestHandler()->flushAsyncBatchRequests(this);
- }
- catch(const Ice::LocalException& ex)
- {
- prx->__handleException(delegate, ex, 0, cnt);
- }
- }
- catch(const Ice::LocalException& ex)
- {
- __releaseCallback(ex);
- }
- return false;
+ dynamic_cast<AMISentCallback*>(this)->ice_sent();
}