summaryrefslogtreecommitdiff
path: root/cpp/src/Ice/OutgoingAsync.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/Ice/OutgoingAsync.cpp')
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp344
1 files changed, 281 insertions, 63 deletions
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 40cb5336c23..4286cc1949c 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -10,6 +10,7 @@
#include <Ice/OutgoingAsync.h>
#include <Ice/Object.h>
#include <Ice/ConnectionI.h>
+#include <Ice/RequestHandler.h>
#include <Ice/Reference.h>
#include <Ice/Instance.h>
#include <Ice/LocalException.h>
@@ -27,9 +28,12 @@ using namespace std;
using namespace Ice;
using namespace IceInternal;
+IceUtil::Shared* IceInternal::upCast(OutgoingAsyncMessageCallback* p) { return p; }
IceUtil::Shared* IceInternal::upCast(OutgoingAsync* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(BatchOutgoingAsync* p) { return p; }
IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_invoke* p) { return p; }
IceUtil::Shared* IceInternal::upCast(AMI_Array_Object_ice_invoke* p) { return p; }
+IceUtil::Shared* IceInternal::upCast(AMI_Object_ice_flushBatchRequests* p) { return p; }
IceInternal::OutgoingAsync::OutgoingAsync() :
__is(0),
@@ -44,17 +48,52 @@ IceInternal::OutgoingAsync::~OutgoingAsync()
}
void
+IceInternal::OutgoingAsync::__sent(Ice::ConnectionI* connection)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ _sent = true;
+
+ if(!_proxy->ice_isTwoway())
+ {
+ cleanup(); // No response expected, we're done with the OutgoingAsync.
+ }
+ else if(_response)
+ {
+ _monitor.notifyAll(); // If the response was already received notify finished() which is waiting.
+ }
+ else if(connection->timeout() > 0)
+ {
+ assert(!_timerTaskConnection);
+ _timerTaskConnection = connection;
+ IceUtil::Time timeout = IceUtil::Time::milliSeconds(connection->timeout());
+ _proxy->__reference()->getInstance()->timer()->schedule(this, timeout);
+ }
+}
+
+void
IceInternal::OutgoingAsync::__finished(BasicStream& is)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
+ assert(_proxy->ice_isTwoway()); // Can only be called for twoways.
Ice::Byte replyStatus;
-
try
{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ assert(__os);
+ _response = true;
+
+ if(_timerTaskConnection && _proxy->__reference()->getInstance()->timer()->cancel(this))
+ {
+ _timerTaskConnection = 0; // Timer cancelled.
+ }
+
+ while(!_sent || _timerTaskConnection)
+ {
+ _monitor.wait();
+ }
+
__is->swap(is);
__is->read(replyStatus);
-
switch(replyStatus)
{
@@ -190,43 +229,65 @@ IceInternal::OutgoingAsync::__finished(BasicStream& is)
warning();
}
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
cleanup();
}
void
IceInternal::OutgoingAsync::__finished(const LocalException& exc)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
-
- if(__os) // Don't retry if cleanup() was already called.
+ bool retry = false;
{
- //
- // A CloseConnectionException indicates graceful server
- // shutdown, and is therefore always repeatable without
- // violating "at-most-once". That's because by sending a close
- // connection message, the server guarantees that all
- // outstanding requests can safely be repeated. Otherwise, we
- // can also retry if the operation mode is Nonmutating or
- // Idempotent.
- //
- // An ObjectNotExistException can always be retried as
- // well without violating "at-most-once".
- //
- if(_mode == Nonmutating || _mode == Idempotent || dynamic_cast<const CloseConnectionException*>(&exc) ||
- dynamic_cast<const ObjectNotExistException*>(&exc))
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+
+ if(__os) // Might be called from __prepare or before __prepare
{
- try
+ if(_timerTaskConnection && _proxy->__reference()->getInstance()->timer()->cancel(this))
{
- _proxy->__handleException(_delegate, exc, _cnt);
- __send();
- return;
+ _timerTaskConnection = 0; // Timer cancelled.
}
- catch(const LocalException&)
+
+ while(_timerTaskConnection)
+ {
+ _monitor.wait();
+ }
+
+ //
+ // A CloseConnectionException indicates graceful server
+ // shutdown, and is therefore always repeatable without
+ // violating "at-most-once". That's because by sending a close
+ // connection message, the server guarantees that all
+ // outstanding requests can safely be repeated. Otherwise, we
+ // can also retry if the operation mode is Nonmutating or
+ // Idempotent.
+ //
+ // An ObjectNotExistException can always be retried as
+ // well without violating "at-most-once".
+ //
+ if(!_sent ||
+ _mode == Nonmutating || _mode == Idempotent ||
+ dynamic_cast<const CloseConnectionException*>(&exc) ||
+ dynamic_cast<const ObjectNotExistException*>(&exc))
{
+ retry = true;
}
}
}
+ if(retry)
+ {
+ try
+ {
+ _proxy->__handleException(_delegate, exc, _cnt);
+ __send();
+ return;
+ }
+ catch(const LocalException&)
+ {
+ }
+ }
+
try
{
ice_exception(exc);
@@ -240,14 +301,58 @@ IceInternal::OutgoingAsync::__finished(const LocalException& exc)
warning();
}
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
cleanup();
}
void
+IceInternal::OutgoingAsync::__finished(const LocalExceptionWrapper& ex)
+{
+ //
+ // NOTE: This is called if sendRequest/sendAsyncRequest fails with
+ // a LocalExceptionWrapper exception. It's not possible for the
+ // timer to be set at this point because the request couldn't be
+ // sent.
+ //
+ assert(!_sent && !_timerTaskConnection);
+
+ try
+ {
+ if(_mode == Nonmutating || _mode == Idempotent)
+ {
+ _proxy->__handleExceptionWrapperRelaxed(_delegate, ex, _cnt);
+ }
+ else
+ {
+ _proxy->__handleExceptionWrapper(_delegate, ex);
+ }
+ __send();
+ }
+ catch(const Ice::LocalException& exc)
+ {
+ try
+ {
+ ice_exception(exc);
+ }
+ catch(const std::exception& ex)
+ {
+ warning(ex);
+ }
+ catch(...)
+ {
+ warning();
+ }
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ cleanup();
+ }
+}
+
+void
IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operation, OperationMode mode,
const Context* context)
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
try
{
@@ -258,16 +363,20 @@ IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operat
{
_monitor.wait();
}
-
+
//
- // Can't call async via a oneway proxy.
+ // Can't call async via a batch proxy.
//
- prx->__checkTwowayOnly(operation);
-
_proxy = prx;
+ if(_proxy->ice_isBatchOneway() || _proxy->ice_isBatchDatagram())
+ {
+ throw Ice::FeatureNotSupportedException(__FILE__, __LINE__, "can't send batch requests with AMI");
+ }
_delegate = 0;
_cnt = 0;
_mode = mode;
+ _sent = false;
+ _response = false;
ReferencePtr ref = _proxy->__reference();
assert(!__is);
@@ -308,11 +417,8 @@ IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operat
//
// Implicit context
//
- const ImplicitContextIPtr& implicitContext =
- ref->getInstance()->getImplicitContext();
-
+ const ImplicitContextIPtr& implicitContext = ref->getInstance()->getImplicitContext();
const Context& prxContext = ref->getContext()->getValue();
-
if(implicitContext == 0)
{
__writeContext(__os, prxContext);
@@ -335,41 +441,45 @@ IceInternal::OutgoingAsync::__prepare(const ObjectPrx& prx, const string& operat
void
IceInternal::OutgoingAsync::__send()
{
- IceUtil::Monitor<IceUtil::RecMutex>::Lock sync(_monitor);
-
+ //
+ // NOTE: no synchronization needed. At this point, no other threads can be calling on this object.
+ //
+ RequestHandler* handler;
try
{
- while(true)
+ _delegate = _proxy->__getDelegate(true);
+ handler = _delegate->__getRequestHandler().get();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ __finished(ex);
+ return;
+ }
+
+ _sent = false;
+ _response = false;
+ handler->sendAsyncRequest(this);
+}
+
+void
+IceInternal::OutgoingAsync::runTimerTask() // Implementation of TimerTask::runTimerTask()
+{
+ Ice::ConnectionIPtr connection;
+ {
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ assert(_timerTaskConnection && _sent); // Can only be set once the request is sent.
+
+ if(!_response) // If the response was just received, don't close the connection.
{
- bool compress;
- _delegate = _proxy->__getDelegate();
- Ice::ConnectionIPtr connection = _delegate->__getConnection(compress);
- try
- {
- connection->sendAsyncRequest(__os, this, compress);
-
- //
- // Don't do anything after sendAsyncRequest() returned
- // without an exception. I such case, there will be
- // callbacks, i.e., calls to the __finished()
- // functions. Since there is no mutex protection, we
- // cannot modify state here and in such callbacks.
- //
- return;
- }
- catch(const LocalExceptionWrapper& ex)
- {
- _proxy->__handleExceptionWrapper(_delegate, ex);
- }
- catch(const LocalException& ex)
- {
- _proxy->__handleException(_delegate, ex, _cnt);
- }
+ connection = _timerTaskConnection;
}
+ _timerTaskConnection = 0;
+ _monitor.notifyAll();
}
- catch(const LocalException& ex)
+
+ if(connection)
{
- __finished(ex);
+ connection->exception(Ice::TimeoutException(__FILE__, __LINE__));
}
}
@@ -414,6 +524,8 @@ IceInternal::OutgoingAsync::warning() const
void
IceInternal::OutgoingAsync::cleanup()
{
+ assert(!_timerTaskConnection);
+
delete __is;
__is = 0;
delete __os;
@@ -422,6 +534,91 @@ IceInternal::OutgoingAsync::cleanup()
_monitor.notify();
}
+IceInternal::BatchOutgoingAsync::BatchOutgoingAsync() : _os(0)
+{
+}
+
+void
+IceInternal::BatchOutgoingAsync::__prepare(const InstancePtr& instance)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ while(_os)
+ {
+ _monitor.wait();
+ }
+ _os = new BasicStream(instance.get());
+}
+
+void
+IceInternal::BatchOutgoingAsync::__sent(Ice::ConnectionI* connection)
+{
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ cleanup();
+}
+
+void
+IceInternal::BatchOutgoingAsync::__finished(const Ice::LocalException& exc)
+{
+ try
+ {
+ ice_exception(exc);
+ }
+ catch(const std::exception& ex)
+ {
+ warning(ex);
+ }
+ catch(...)
+ {
+ warning();
+ }
+
+ IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
+ cleanup();
+}
+
+void
+IceInternal::BatchOutgoingAsync::warning(const std::exception& exc) const
+{
+ if(_os) // Don't print anything if cleanup() was already called.
+ {
+ if(_os->instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ Warning out(_os->instance()->initializationData().logger);
+ const Exception* ex = dynamic_cast<const ObjectNotExistException*>(&exc);
+ if(ex)
+ {
+ out << "Ice::Exception raised by AMI callback:\n" << ex;
+ }
+ else
+ {
+ out << "std::exception raised by AMI callback:\n" << exc.what();
+ }
+ }
+ }
+}
+
+void
+IceInternal::BatchOutgoingAsync::warning() const
+{
+ if(_os) // Don't print anything if cleanup() was already called.
+ {
+ if(_os->instance()->initializationData().properties->getPropertyAsIntWithDefault("Ice.Warn.AMICallback", 1) > 0)
+ {
+ Warning out(_os->instance()->initializationData().logger);
+ out << "unknown exception raised by AMI callback";
+ }
+ }
+}
+
+void
+IceInternal::BatchOutgoingAsync::cleanup()
+{
+ delete _os;
+ _os = 0;
+
+ _monitor.notify();
+}
+
void
Ice::AMI_Object_ice_invoke::__invoke(const ObjectPrx& prx, const string& operation, OperationMode mode,
const vector<Byte>& inParams, const Context* context)
@@ -492,3 +689,24 @@ Ice::AMI_Array_Object_ice_invoke::__response(bool ok) // ok == true means no use
}
ice_response(ok, outParams);
}
+
+void
+Ice::AMI_Object_ice_flushBatchRequests::__invoke(const ObjectPrx& prx)
+{
+ Handle< ::IceDelegate::Ice::Object> delegate;
+ RequestHandler* handler;
+ try
+ {
+ __prepare(prx->__reference()->getInstance());
+ delegate = prx->__getDelegate(true);
+ handler = delegate->__getRequestHandler().get();
+ }
+ catch(const Ice::LocalException& ex)
+ {
+ __finished(ex);
+ return;
+ }
+
+ handler->flushAsyncBatchRequests(this);
+}
+