summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2014-11-27 14:34:15 +0100
committerBenoit Foucher <benoit@zeroc.com>2014-11-27 14:34:15 +0100
commit08c354c1cb70485c1f0ed83f814b04383a24e233 (patch)
tree0229ebe7cf0cb38b8a00a34df7cfd8a90ddf8d3a /cpp
parentICE-5995 Use variable GRADLE not gradlew in makefile (diff)
downloadice-08c354c1cb70485c1f0ed83f814b04383a24e233.tar.bz2
ice-08c354c1cb70485c1f0ed83f814b04383a24e233.tar.xz
ice-08c354c1cb70485c1f0ed83f814b04383a24e233.zip
Fixed ICE-5985: Java background test failures
Diffstat (limited to 'cpp')
-rw-r--r--cpp/include/Ice/Outgoing.h4
-rw-r--r--cpp/include/Ice/OutgoingAsync.h6
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.cpp195
-rw-r--r--cpp/src/Ice/ConnectRequestHandler.h2
-rw-r--r--cpp/src/Ice/Outgoing.cpp28
-rw-r--r--cpp/src/Ice/OutgoingAsync.cpp30
-rw-r--r--cpp/src/Ice/RequestHandlerFactory.cpp3
-rw-r--r--cpp/test/Ice/background/AllTests.cpp14
8 files changed, 162 insertions, 120 deletions
diff --git a/cpp/include/Ice/Outgoing.h b/cpp/include/Ice/Outgoing.h
index c50b0a28aae..75380bae036 100644
--- a/cpp/include/Ice/Outgoing.h
+++ b/cpp/include/Ice/Outgoing.h
@@ -46,6 +46,7 @@ public:
virtual void sent() = 0;
virtual void completed(const Ice::Exception&) = 0;
+ virtual void retryException(const Ice::Exception&) = 0;
BasicStream* os() { return &_os; }
@@ -86,6 +87,7 @@ public:
virtual void sent();
virtual void completed(const Ice::Exception&);
+ virtual void retryException(const Ice::Exception&);
bool invoke(); // Returns true if ok, false if user exception.
void abort(const Ice::LocalException&);
@@ -157,6 +159,7 @@ private:
{
StateUnsent,
StateInProgress,
+ StateRetry,
StateOK,
StateUserException,
StateLocalException,
@@ -181,6 +184,7 @@ public:
virtual void sent();
virtual void completed(const Ice::Exception&);
+ virtual void retryException(const Ice::Exception&);
private:
diff --git a/cpp/include/Ice/OutgoingAsync.h b/cpp/include/Ice/OutgoingAsync.h
index 7aedb026821..00378b6ea76 100644
--- a/cpp/include/Ice/OutgoingAsync.h
+++ b/cpp/include/Ice/OutgoingAsync.h
@@ -41,6 +41,7 @@ public:
virtual bool sent();
virtual bool completed(const Ice::Exception&);
+ virtual void retryException(const Ice::Exception&);
// Those methods are public when called from an OutgoingAsyncBase reference.
using Ice::AsyncResult::cancelable;
@@ -93,6 +94,7 @@ public:
using OutgoingAsyncBase::sent;
virtual bool completed(const Ice::Exception&);
+ virtual void retryException(const Ice::Exception&);
virtual void cancelable(const CancellationHandlerPtr&);
void retry();
@@ -109,7 +111,7 @@ protected:
bool finished(const Ice::Exception&);
bool finished(bool);
- virtual void handleRetryException(const RetryException&);
+ virtual void handleRetryException(const Ice::Exception&);
virtual int handleException(const Ice::Exception&);
virtual void runTimerTask();
@@ -196,7 +198,7 @@ public:
private:
- virtual void handleRetryException(const RetryException&);
+ virtual void handleRetryException(const Ice::Exception&);
virtual int handleException(const Ice::Exception&);
};
typedef IceUtil::Handle<ProxyFlushBatch> ProxyFlushBatchPtr;
diff --git a/cpp/src/Ice/ConnectRequestHandler.cpp b/cpp/src/Ice/ConnectRequestHandler.cpp
index 8c3b37d0cb5..cab236aa9e5 100644
--- a/cpp/src/Ice/ConnectRequestHandler.cpp
+++ b/cpp/src/Ice/ConnectRequestHandler.cpp
@@ -336,7 +336,41 @@ ConnectRequestHandler::setException(const Ice::LocalException& ex)
_proxies.clear();
_proxy = 0; // Break cyclic reference count.
- flushRequestsWithException();
+ //
+ // NOTE: remove the request handler *before* notifying the
+ // requests that the connection failed. It's important to ensure
+ // that future invocations will obtain a new connect request
+ // handler once invocations are notified.
+ //
+ try
+ {
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ // Ignore
+ }
+
+ for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
+ {
+ if(p->out)
+ {
+ p->out->completed(*_exception.get());
+ }
+ else if(p->outAsync)
+ {
+ if(p->outAsync->completed(*_exception.get()))
+ {
+ p->outAsync->invokeCompletedAsync();
+ }
+ }
+ else
+ {
+ assert(p->os);
+ delete p->os;
+ }
+ }
+ _requests.clear();
notifyAll();
}
@@ -399,40 +433,66 @@ ConnectRequestHandler::flushRequests()
_flushing = true;
}
- try
+ while(!_requests.empty()) // _requests is immutable when _flushing = true
{
- while(!_requests.empty()) // _requests is immutable when _flushing = true
+ Request& req = _requests.front();
+ if(req.out)
{
- Request& req = _requests.front();
- if(req.out)
+ try
+ {
+ req.out->send(_connection, _compress, _response);
+ }
+ catch(const RetryException& ex)
{
try
{
- req.out->send(_connection, _compress, _response);
+ // Remove the request handler before retrying.
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
}
- catch(const Ice::DatagramLimitException& ex)
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ // Ignore
+ }
+ req.out->retryException(*ex.get());
+ }
+ catch(const Ice::Exception& ex)
+ {
+ req.out->completed(ex);
+ }
+ }
+ else if(req.outAsync)
+ {
+ try
+ {
+ if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
{
- req.out->completed(ex);
+ req.outAsync->invokeSentAsync();
}
}
- else if(req.outAsync)
+ catch(const RetryException& ex)
{
try
{
- if(req.outAsync->send(_connection, _compress, _response) & AsyncStatusInvokeSentCallback)
- {
- req.outAsync->invokeSentAsync();
- }
+ // Remove the request handler before retrying.
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
}
- catch(const Ice::DatagramLimitException& ex)
+ catch(const Ice::CommunicatorDestroyedException&)
{
- if(req.outAsync->completed(ex))
- {
- req.outAsync->invokeCompletedAsync();
- }
+ // Ignore
}
+ req.outAsync->retryException(*ex.get());
}
- else
+ catch(const Ice::Exception& ex)
+ {
+ if(req.outAsync->completed(ex))
+ {
+ req.outAsync->invokeCompletedAsync();
+ }
+ }
+ }
+ else
+ {
+ try
{
BasicStream os(req.os->instance(), Ice::currentProtocolEncoding);
_connection->prepareBatchRequest(&os);
@@ -452,29 +512,16 @@ ConnectRequestHandler::flushRequests()
_connection->finishBatchRequest(&os, _compress);
delete req.os;
}
- _requests.pop_front();
+ catch(const RetryException&)
+ {
+ delete req.os;
+ }
+ catch(const Ice::Exception&)
+ {
+ delete req.os;
+ }
}
- }
- catch(const RetryException& ex)
- {
- //
- // If the connection dies shortly after connection
- // establishment, we don't systematically retry on
- // RetryException. We handle the exception like it
- // was an exception that occured while sending the
- // request.
- //
- Lock sync(*this);
- assert(!_exception.get() && !_requests.empty());
- _exception.reset(ex.get()->ice_clone());
- flushRequestsWithException();
- }
- catch(const Ice::LocalException& ex)
- {
- Lock sync(*this);
- assert(!_exception.get() && !_requests.empty());
- _exception.reset(ex.ice_clone());
- flushRequestsWithException();
+ _requests.pop_front();
}
//
@@ -483,7 +530,7 @@ ConnectRequestHandler::flushRequests()
// request handler to use the more efficient connection request
// handler.
//
- if(_reference->getCacheConnection() && !_exception.get())
+ if(_reference->getCacheConnection())
{
_connectionRequestHandler = new ConnectionRequestHandler(_reference, _connection, _compress);
for(set<Ice::ObjectPrx>::const_iterator p = _proxies.begin(); p != _proxies.end(); ++p)
@@ -495,18 +542,19 @@ ConnectRequestHandler::flushRequests()
{
Lock sync(*this);
assert(!_initialized);
- if(!_exception.get())
+ _initialized = true;
+ _flushing = false;
+ try
{
- _initialized = true;
- _flushing = false;
- try
- {
- _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- // Ignore
- }
+ //
+ // Only remove once all the requests are flushed to
+ // guarantee serialization.
+ //
+ _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
+ }
+ catch(const Ice::CommunicatorDestroyedException&)
+ {
+ // Ignore
}
_proxies.clear();
_proxy = 0; // Break cyclic reference count.
@@ -514,44 +562,3 @@ ConnectRequestHandler::flushRequests()
}
}
-void
-ConnectRequestHandler::flushRequestsWithException()
-{
- assert(_exception.get());
-
- //
- // NOTE: remove the request handler *before* notifying the
- // requests that the connection failed. It's important to ensure
- // that future invocations will obtain a new connect request
- // handler once invocations are notified.
- //
- try
- {
- _reference->getInstance()->requestHandlerFactory()->removeRequestHandler(_reference, this);
- }
- catch(const Ice::CommunicatorDestroyedException&)
- {
- // Ignore
- }
-
- for(deque<Request>::const_iterator p = _requests.begin(); p != _requests.end(); ++p)
- {
- if(p->out)
- {
- p->out->completed(*_exception.get());
- }
- else if(p->outAsync)
- {
- if(p->outAsync->completed(*_exception.get()))
- {
- p->outAsync->invokeCompletedAsync();
- }
- }
- else
- {
- assert(p->os);
- delete p->os;
- }
- }
- _requests.clear();
-}
diff --git a/cpp/src/Ice/ConnectRequestHandler.h b/cpp/src/Ice/ConnectRequestHandler.h
index 76f93803325..9bfb41a12f4 100644
--- a/cpp/src/Ice/ConnectRequestHandler.h
+++ b/cpp/src/Ice/ConnectRequestHandler.h
@@ -58,8 +58,6 @@ public:
virtual void addedProxy();
- void flushRequestsWithException();
-
private:
bool initialized();
diff --git a/cpp/src/Ice/Outgoing.cpp b/cpp/src/Ice/Outgoing.cpp
index 5f0361ba1ba..ee9464c28a8 100644
--- a/cpp/src/Ice/Outgoing.cpp
+++ b/cpp/src/Ice/Outgoing.cpp
@@ -196,6 +196,15 @@ Outgoing::completed(const Ice::Exception& ex)
_monitor.notify();
}
+void
+Outgoing::retryException(const Ice::Exception&)
+{
+ Monitor<Mutex>::Lock sync(_monitor);
+ assert(_state <= StateInProgress);
+ _state = StateRetry;
+ _monitor.notify();
+}
+
bool
Outgoing::invoke()
{
@@ -258,8 +267,12 @@ Outgoing::invoke()
{
Time now = Time::now(Time::Monotonic);
timedOut = now >= _invocationTimeoutDeadline;
- while((_state == StateInProgress || !_sent) && _state != StateFailed && !timedOut)
+ while((_state == StateInProgress || !_sent) && _state != StateFailed && _state != StateRetry)
{
+ if(timedOut)
+ {
+ break;
+ }
_monitor.timedWait(_invocationTimeoutDeadline - now);
if((_state == StateInProgress || !_sent) && _state != StateFailed)
@@ -271,7 +284,7 @@ Outgoing::invoke()
}
else
{
- while((_state == StateInProgress || !_sent) && _state != StateFailed)
+ while((_state == StateInProgress || !_sent) && _state != StateFailed && _state != StateRetry)
{
_monitor.wait();
}
@@ -305,6 +318,11 @@ Outgoing::invoke()
{
_exception->ice_throw();
}
+ else if(_state == StateRetry)
+ {
+ _proxy->__setRequestHandler(_handler, 0); // Clear request handler and retry.
+ continue;
+ }
else
{
assert(_state != StateInProgress);
@@ -715,3 +733,9 @@ FlushBatch::completed(const Ice::Exception& ex)
_exception.reset(ex.ice_clone());
_monitor.notify();
}
+
+void
+FlushBatch::retryException(const Ice::Exception& ex)
+{
+ completed(ex);
+}
diff --git a/cpp/src/Ice/OutgoingAsync.cpp b/cpp/src/Ice/OutgoingAsync.cpp
index 1719adc6b35..518867e06f2 100644
--- a/cpp/src/Ice/OutgoingAsync.cpp
+++ b/cpp/src/Ice/OutgoingAsync.cpp
@@ -40,6 +40,12 @@ OutgoingAsyncBase::completed(const Exception& ex)
return finished(ex);
}
+void
+OutgoingAsyncBase::retryException(const Exception& ex)
+{
+ assert(false);
+}
+
OutgoingAsyncBase::OutgoingAsyncBase(const CommunicatorPtr& communicator,
const InstancePtr& instance,
const string& operation,
@@ -112,6 +118,22 @@ ProxyOutgoingAsyncBase::completed(const Exception& exc)
}
}
+void
+ProxyOutgoingAsyncBase::retryException(const Exception& ex)
+{
+ try
+ {
+ handleRetryException(ex);
+ _instance->retryQueue()->add(this, 0);
+ }
+ catch(const Ice::Exception& exc)
+ {
+ if(completed(exc))
+ {
+ invokeCompletedAsync();
+ }
+ }
+}
void
ProxyOutgoingAsyncBase::cancelable(const CancellationHandlerPtr& handler)
@@ -212,7 +234,7 @@ ProxyOutgoingAsyncBase::invokeImpl(bool userThread)
}
catch(const RetryException& ex)
{
- handleRetryException(ex);
+ handleRetryException(*ex.get());
}
catch(const Exception& ex)
{
@@ -286,7 +308,7 @@ ProxyOutgoingAsyncBase::finished(bool ok)
}
void
-ProxyOutgoingAsyncBase::handleRetryException(const RetryException& exc)
+ProxyOutgoingAsyncBase::handleRetryException(const Ice::Exception&)
{
_proxy->__setRequestHandler(_handler, 0); // Clear request handler and always retry.
}
@@ -647,10 +669,10 @@ ProxyFlushBatch::invoke()
}
void
-ProxyFlushBatch::handleRetryException(const RetryException& ex)
+ProxyFlushBatch::handleRetryException(const Ice::Exception& ex)
{
_proxy->__setRequestHandler(_handler, 0); // Clear request handler
- ex.get()->ice_throw(); // No retries, we want to notify the user of potentially lost batch requests
+ ex.ice_throw(); // No retries, we want to notify the user of potentially lost batch requests
}
int
diff --git a/cpp/src/Ice/RequestHandlerFactory.cpp b/cpp/src/Ice/RequestHandlerFactory.cpp
index e21344c98d9..3a30cc8b182 100644
--- a/cpp/src/Ice/RequestHandlerFactory.cpp
+++ b/cpp/src/Ice/RequestHandlerFactory.cpp
@@ -61,8 +61,7 @@ IceInternal::RequestHandlerFactory::removeRequestHandler(const ReferencePtr& ref
{
Lock sync(*this);
map<ReferencePtr, RequestHandlerPtr>::iterator p = _handlers.find(ref);
- assert(p != _handlers.end() && p->second.get() == handler.get());
- if(p != _handlers.end())
+ if(p != _handlers.end() && p->second.get() == handler.get())
{
_handlers.erase(p);
}
diff --git a/cpp/test/Ice/background/AllTests.cpp b/cpp/test/Ice/background/AllTests.cpp
index e0aecf86f3e..c853c2485d6 100644
--- a/cpp/test/Ice/background/AllTests.cpp
+++ b/cpp/test/Ice/background/AllTests.cpp
@@ -1047,13 +1047,6 @@ readWriteTests(const ConfigurationPtr& configuration,
configuration->readReady(false); // Required in C# to make sure beginRead() doesn't throw too soon.
configuration->readException(new Ice::SocketException(__FILE__, __LINE__));
Ice::AsyncResultPtr r = background->begin_op();
- if(!r->sentSynchronously())
- {
- // The read exception might propagate before the message send is seen as completed on IOCP.
-#ifndef ICE_USE_IOCP
- test(r->isCompleted());
-#endif
- }
try
{
background->end_op(r);
@@ -1151,13 +1144,6 @@ readWriteTests(const ConfigurationPtr& configuration,
configuration->readReady(false);
configuration->readException(new Ice::SocketException(__FILE__, __LINE__));
Ice::AsyncResultPtr r = background->begin_op();
- if(!r->sentSynchronously())
- {
- // The read exception might propagate before the message send is seen as completed on IOCP.
-#ifndef ICE_USE_IOCP
- test(r->isCompleted());
-#endif
- }
try
{
background->end_op(r);