summaryrefslogtreecommitdiff
path: root/java/src/Ice/ConnectionI.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r--java/src/Ice/ConnectionI.java96
1 files changed, 52 insertions, 44 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 73880042340..9ee6ac10121 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -9,7 +9,8 @@
package Ice;
-public final class ConnectionI extends IceInternal.EventHandler implements Connection, IceInternal.ResponseHandler
+public final class ConnectionI extends IceInternal.EventHandler
+ implements Connection, IceInternal.ResponseHandler, IceInternal.CancellationHandler
{
public interface StartCallback
{
@@ -373,8 +374,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
os.writeInt(requestId);
}
- out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId, os.size() -
- IceInternal.Protocol.headerSize - 4);
+ out.attachRemoteObserver(initConnectionInfo(), _endpoint, requestId);
int status;
try
@@ -388,6 +388,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
throw (Ice.LocalException) _exception.fillInStackTrace();
}
+ if(response || (status & IceInternal.AsyncStatus.Queued) > 0)
+ {
+ out.cancelable(this); // Notify the request that it's cancelable
+ }
+
if(response)
{
//
@@ -640,29 +645,21 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private Ice.AsyncResult begin_flushBatchRequestsInternal(IceInternal.CallbackBase cb)
{
- IceInternal.ConnectionBatchOutgoingAsync result = new IceInternal.ConnectionBatchOutgoingAsync(this,
- _communicator, _instance, __flushBatchRequests_name, cb);
- try
- {
- result.__invoke();
- }
- catch(LocalException __ex)
- {
- result.invokeExceptionAsync(__ex);
- }
-
+ IceInternal.ConnectionFlushBatch result =
+ new IceInternal.ConnectionFlushBatch(this, _communicator, _instance, __flushBatchRequests_name, cb);
+ result.invoke();
return result;
}
@Override
public void end_flushBatchRequests(AsyncResult ir)
{
- IceInternal.OutgoingAsyncBase r = (IceInternal.OutgoingAsyncBase) ir;
- IceInternal.OutgoingAsyncBase.check(r, this, __flushBatchRequests_name);
+ IceInternal.ConnectionFlushBatch r =
+ IceInternal.ConnectionFlushBatch.check(ir, this, __flushBatchRequests_name);
r.__wait();
}
- synchronized public int flushAsyncBatchRequests(IceInternal.BatchOutgoingAsync outAsync)
+ synchronized public int flushAsyncBatchRequests(IceInternal.OutgoingAsyncBase outAsync)
{
waitBatchStreamInUse();
@@ -687,11 +684,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_batchStream.pos(IceInternal.Protocol.headerSize);
_batchStream.writeInt(_batchRequestNum);
- outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0, _batchStream.size() -
- IceInternal.Protocol.headerSize - 4);
-
_batchStream.swap(outAsync.getOs());
+ outAsync.attachRemoteObserver(initConnectionInfo(), _endpoint, 0);
+
//
// Send the batch stream.
//
@@ -708,6 +704,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
throw (Ice.LocalException) _exception.fillInStackTrace();
}
+ if((status & IceInternal.AsyncStatus.Queued) > 0)
+ {
+ outAsync.cancelable(this); // Notify the request that it's cancelable.
+ }
+
//
// Reset the batch stream.
//
@@ -728,12 +729,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(callback != null)
{
- class CallbackWorkItem extends IceInternal.DispatchWorkItem
+ _threadPool.dispatch(new IceInternal.DispatchWorkItem(this)
{
- public CallbackWorkItem()
- {
- }
-
@Override
public void run()
{
@@ -746,8 +743,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_logger.error("connection callback exception:\n" + ex + '\n' + _desc);
}
}
- };
- _threadPool.dispatch(new CallbackWorkItem());
+ });
}
}
else
@@ -793,9 +789,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return _monitor != null ? _monitor.getACM() : new ACM(0, ACMClose.CloseOff, ACMHeartbeat.HeartbeatOff);
}
- synchronized public boolean asyncRequestCanceled(IceInternal.OutgoingAsyncMessageCallback outAsync,
- Ice.LocalException ex)
+ @Override
+ synchronized public void asyncRequestCanceled(IceInternal.OutgoingAsyncBase outAsync, Ice.LocalException ex)
{
+ if(_state >= StateClosed)
+ {
+ return; // The request has already been or will be shortly notified of the failure.
+ }
+
java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
while(it.hasNext())
{
@@ -815,13 +816,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// it's fine if the OutgoingAsync output stream is released (and
// as long as canceled requests cannot be retried).
//
- o.timedOut();
+ o.canceled();
if(o != _sendStreams.getFirst())
{
it.remove();
}
- outAsync.dispatchInvocationCancel(ex, _threadPool, this);
- return true; // We're done
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
+ return;
}
}
@@ -834,12 +838,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if(it2.next() == o)
{
it2.remove();
- outAsync.dispatchInvocationCancel(ex, _threadPool, this);
- return true; // We're done.
+ if(outAsync.completed(ex))
+ {
+ outAsync.invokeCompletedAsync();
+ }
}
}
}
- return false;
}
@Override
@@ -1469,7 +1474,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
for(OutgoingMessage p : _sendStreams)
{
- p.finished(_exception);
+ p.completed(_exception);
if(p.requestId > 0) // Make sure finished isn't called twice.
{
_asyncRequests.remove(p.requestId);
@@ -1480,7 +1485,10 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
for(IceInternal.OutgoingAsync p : _asyncRequests.values())
{
- p.finished(_exception);
+ if(p.completed(_exception))
+ {
+ p.invokeCompleted();
+ }
}
_asyncRequests.clear();
@@ -2580,7 +2588,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.TraceUtil.traceRecv(info.stream, _logger, _traceLevels);
info.requestId = info.stream.readInt();
IceInternal.OutgoingAsync outAsync = _asyncRequests.remove(info.requestId);
- if(outAsync != null && outAsync.finished(info.stream))
+ if(outAsync != null && outAsync.completed(info.stream))
{
info.outAsync = outAsync;
++info.messageDispatchCount;
@@ -2999,7 +3007,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.requestId = 0;
}
- OutgoingMessage(IceInternal.OutgoingAsyncMessageCallback out, IceInternal.BasicStream stream, boolean compress,
+ OutgoingMessage(IceInternal.OutgoingAsyncBase out, IceInternal.BasicStream stream, boolean compress,
int requestId)
{
this.stream = stream;
@@ -3008,7 +3016,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.requestId = requestId;
}
- public void timedOut()
+ public void canceled()
{
assert (outAsync != null);
outAsync = null;
@@ -3035,16 +3043,16 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return false;
}
- public void finished(Ice.LocalException ex)
+ public void completed(Ice.LocalException ex)
{
- if(outAsync != null)
+ if(outAsync != null && outAsync.completed(ex))
{
- outAsync.finished(ex);
+ outAsync.invokeCompleted();
}
}
public IceInternal.BasicStream stream;
- public IceInternal.OutgoingAsyncMessageCallback outAsync;
+ public IceInternal.OutgoingAsyncBase outAsync;
public boolean compress;
public int requestId;
boolean adopt;