summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--cpp/src/Ice/AsyncResult.cpp4
-rw-r--r--cpp/src/Ice/CollocatedRequestHandler.cpp42
-rw-r--r--cs/src/Ice/AsyncResult.cs4
-rw-r--r--cs/src/Ice/CollocatedRequestHandler.cs20
-rw-r--r--java/src/Ice/src/main/java/IceInternal/AsyncResultI.java4
-rw-r--r--java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java52
6 files changed, 50 insertions, 76 deletions
diff --git a/cpp/src/Ice/AsyncResult.cpp b/cpp/src/Ice/AsyncResult.cpp
index dc1b5689339..f1829b06ef3 100644
--- a/cpp/src/Ice/AsyncResult.cpp
+++ b/cpp/src/Ice/AsyncResult.cpp
@@ -406,6 +406,7 @@ AsyncResult::invokeCompleted()
void
AsyncResult::cancel(const Ice::LocalException& ex)
{
+ CancellationHandlerPtr handler;
{
IceUtil::Monitor<IceUtil::Mutex>::Lock sync(_monitor);
_cancellationException.reset(ex.ice_clone());
@@ -413,8 +414,9 @@ AsyncResult::cancel(const Ice::LocalException& ex)
{
return;
}
+ handler = _cancellationHandler;
}
- _cancellationHandler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), ex);
+ handler->asyncRequestCanceled(OutgoingAsyncBasePtr::dynamicCast(this), ex);
}
void
diff --git a/cpp/src/Ice/CollocatedRequestHandler.cpp b/cpp/src/Ice/CollocatedRequestHandler.cpp
index 99555c76998..8ec91cdd620 100644
--- a/cpp/src/Ice/CollocatedRequestHandler.cpp
+++ b/cpp/src/Ice/CollocatedRequestHandler.cpp
@@ -341,7 +341,6 @@ void
CollocatedRequestHandler::invokeRequest(Outgoing* out)
{
int requestId = 0;
- if(_reference->getInvocationTimeout() > 0 || _response)
{
Lock sync(*this);
if(_response)
@@ -349,10 +348,7 @@ CollocatedRequestHandler::invokeRequest(Outgoing* out)
requestId = ++_requestId;
_requests.insert(make_pair(requestId, out));
}
- if(_reference->getInvocationTimeout() > 0)
- {
- _sendRequests.insert(make_pair(out, requestId));
- }
+ _sendRequests.insert(make_pair(out, requestId));
}
out->attachCollocatedObserver(_adapter, requestId);
@@ -377,7 +373,6 @@ AsyncStatus
CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
{
int requestId = 0;
- if(_reference->getInvocationTimeout() > 0 || _response)
{
Lock sync(*this);
@@ -388,10 +383,7 @@ CollocatedRequestHandler::invokeAsyncRequest(OutgoingAsync* outAsync)
requestId = ++_requestId;
_asyncRequests.insert(make_pair(requestId, outAsync));
}
- if(_reference->getInvocationTimeout() > 0)
- {
- _sendAsyncRequests.insert(make_pair(outAsync, requestId));
- }
+ _sendAsyncRequests.insert(make_pair(outAsync, requestId));
}
outAsync->attachCollocatedObserver(_adapter, requestId);
@@ -415,10 +407,7 @@ CollocatedRequestHandler::invokeBatchRequests(OutgoingBase* out)
if(_batchRequestNum > 0)
{
- if(_reference->getInvocationTimeout() > 0)
- {
- _sendRequests.insert(make_pair(out, 0));
- }
+ _sendRequests.insert(make_pair(out, 0));
assert(!_batchStream.b.empty());
_batchStream.swap(*out->os());
@@ -473,10 +462,7 @@ CollocatedRequestHandler::invokeAsyncBatchRequests(OutgoingAsyncBase* outAsync)
{
outAsync->cancelable(this); // This will throw if the request is canceled
- if(_reference->getInvocationTimeout() > 0)
- {
- _sendAsyncRequests.insert(make_pair(outAsync, 0));
- }
+ _sendAsyncRequests.insert(make_pair(outAsync, 0));
assert(!_batchStream.b.empty());
_batchStream.swap(*outAsync->getOs());
@@ -589,13 +575,10 @@ CollocatedRequestHandler::waitForConnection()
bool
CollocatedRequestHandler::sent(OutgoingBase* out)
{
- if(_reference->getInvocationTimeout() > 0)
+ Lock sync(*this);
+ if(_sendRequests.erase(out) == 0)
{
- Lock sync(*this);
- if(_sendRequests.erase(out) == 0)
- {
- return false; // The request timed-out.
- }
+ return false; // The request timed-out.
}
out->sent();
return true;
@@ -604,18 +587,19 @@ CollocatedRequestHandler::sent(OutgoingBase* out)
bool
CollocatedRequestHandler::sentAsync(OutgoingAsyncBase* outAsync)
{
- if(_reference->getInvocationTimeout() > 0)
{
Lock sync(*this);
if(_sendAsyncRequests.erase(outAsync) == 0)
{
return false; // The request timed-out.
}
+
+ if(!outAsync->sent())
+ {
+ return true;
+ }
}
- if(outAsync->sent())
- {
- outAsync->invokeSent();
- }
+ outAsync->invokeSent();
return true;
}
diff --git a/cs/src/Ice/AsyncResult.cs b/cs/src/Ice/AsyncResult.cs
index 39944f6871e..01ed3d7e60a 100644
--- a/cs/src/Ice/AsyncResult.cs
+++ b/cs/src/Ice/AsyncResult.cs
@@ -626,6 +626,7 @@ namespace IceInternal
protected void cancel(Ice.LocalException ex)
{
+ CancellationHandler handler;
lock(this)
{
_cancellationException = ex;
@@ -633,8 +634,9 @@ namespace IceInternal
{
return;
}
+ handler = _cancellationHandler;
}
- _cancellationHandler.asyncRequestCanceled((OutgoingAsyncBase)this, ex);
+ handler.asyncRequestCanceled((OutgoingAsyncBase)this, ex);
}
protected virtual Ice.Instrumentation.InvocationObserver getObserver()
diff --git a/cs/src/Ice/CollocatedRequestHandler.cs b/cs/src/Ice/CollocatedRequestHandler.cs
index d49635f415e..4b41167b177 100644
--- a/cs/src/Ice/CollocatedRequestHandler.cs
+++ b/cs/src/Ice/CollocatedRequestHandler.cs
@@ -270,7 +270,6 @@ namespace IceInternal
public bool invokeAsyncRequest(OutgoingAsync outAsync, bool synchronous, out Ice.AsyncCallback sentCallback)
{
int requestId = 0;
- if(_reference.getInvocationTimeout() > 0 || _response)
{
lock(this)
{
@@ -281,10 +280,7 @@ namespace IceInternal
requestId = ++_requestId;
_asyncRequests.Add(requestId, outAsync);
}
- if(_reference.getInvocationTimeout() > 0)
- {
- _sendAsyncRequests.Add(outAsync, requestId);
- }
+ _sendAsyncRequests.Add(outAsync, requestId);
}
}
@@ -354,10 +350,7 @@ namespace IceInternal
{
outAsync.cancelable(this); // This will throw if the request is canceled
- if(_reference.getInvocationTimeout() > 0)
- {
- _sendAsyncRequests.Add(outAsync, 0);
- }
+ _sendAsyncRequests.Add(outAsync, 0);
Debug.Assert(!_batchStream.isEmpty());
_batchStream.swap(outAsync.getOs());
@@ -396,14 +389,11 @@ namespace IceInternal
private bool sentAsync(OutgoingAsyncBase outAsync)
{
- if(_reference.getInvocationTimeout() > 0)
+ lock(this)
{
- lock(this)
+ if(!_sendAsyncRequests.Remove(outAsync))
{
- if(!_sendAsyncRequests.Remove(outAsync))
- {
- return false; // The request timed-out.
- }
+ return false; // The request timed-out.
}
}
diff --git a/java/src/Ice/src/main/java/IceInternal/AsyncResultI.java b/java/src/Ice/src/main/java/IceInternal/AsyncResultI.java
index d4b7b8621a4..a309ca4a2b9 100644
--- a/java/src/Ice/src/main/java/IceInternal/AsyncResultI.java
+++ b/java/src/Ice/src/main/java/IceInternal/AsyncResultI.java
@@ -400,6 +400,7 @@ public class AsyncResultI implements AsyncResult
protected void cancel(Ice.LocalException ex)
{
+ CancellationHandler handler;
synchronized(this)
{
_cancellationException = ex;
@@ -407,8 +408,9 @@ public class AsyncResultI implements AsyncResult
{
return;
}
+ handler = _cancellationHandler;
}
- _cancellationHandler.asyncRequestCanceled((OutgoingAsyncBase)this, ex);
+ handler.asyncRequestCanceled((OutgoingAsyncBase)this, ex);
}
protected Ice.Instrumentation.InvocationObserver getObserver()
diff --git a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
index 4bffef02b63..609cc1b40c6 100644
--- a/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/Ice/src/main/java/IceInternal/CollocatedRequestHandler.java
@@ -182,14 +182,13 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
synchronized public void
asyncRequestCanceled(OutgoingAsyncBase outAsync, Ice.LocalException ex)
{
- Integer requestId = _sendAsyncRequests.get(outAsync);
+ Integer requestId = _sendAsyncRequests.remove(outAsync);
if(requestId != null)
{
if(requestId > 0)
{
_asyncRequests.remove(requestId);
}
- _sendAsyncRequests.remove(outAsync);
if(outAsync.completed(ex))
{
outAsync.invokeCompletedAsync();
@@ -295,22 +294,16 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
int invokeAsyncRequest(OutgoingAsync outAsync, boolean synchronous)
{
int requestId = 0;
- if((_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0) || _response)
+ synchronized(this)
{
- synchronized(this)
- {
- outAsync.cancelable(this); // This will throw if the request is canceled
+ outAsync.cancelable(this); // This will throw if the request is canceled
- if(_response)
- {
- requestId = ++_requestId;
- _asyncRequests.put(requestId, outAsync);
- }
- if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
- {
- _sendAsyncRequests.put(outAsync, requestId);
- }
+ if(_response)
+ {
+ requestId = ++_requestId;
+ _asyncRequests.put(requestId, outAsync);
}
+ _sendAsyncRequests.put(outAsync, requestId);
}
outAsync.attachCollocatedObserver(_adapter, requestId);
@@ -357,10 +350,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
outAsync.cancelable(this); // This will throw if the request is canceled
- if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
- {
- _sendAsyncRequests.put(outAsync, 0);
- }
+ _sendAsyncRequests.put(outAsync, 0);
assert(!_batchStream.isEmpty());
_batchStream.swap(outAsync.getOs());
@@ -395,21 +385,25 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
private boolean
sentAsync(final OutgoingAsyncBase outAsync)
{
- if(_reference.getInstance().queueRequests() || _reference.getInvocationTimeout() > 0)
+ synchronized(this)
{
- synchronized(this)
+ if(_sendAsyncRequests.remove(outAsync) == null)
{
- if(_sendAsyncRequests.remove(outAsync) == null)
- {
- return false; // The request timed-out.
- }
+ return false; // The request timed-out.
}
- }
- if(outAsync.sent())
- {
- outAsync.invokeSent();
+ //
+ // This must be called within the synchronization to
+ // ensure completed(ex) can't be called concurrently if
+ // the request is canceled.
+ //
+ if(!outAsync.sent())
+ {
+ return true;
+ }
}
+
+ outAsync.invokeSent();
return true;
}