summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/AsyncResult.java24
-rw-r--r--java/src/Ice/ConnectionI.java169
-rw-r--r--java/src/IceInternal/BatchOutgoing.java2
-rw-r--r--java/src/IceInternal/BatchOutgoingAsync.java38
-rw-r--r--java/src/IceInternal/CollocatedRequestHandler.java97
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java25
-rw-r--r--java/src/IceInternal/DispatchWorkItem.java36
-rw-r--r--java/src/IceInternal/Outgoing.java20
-rw-r--r--java/src/IceInternal/OutgoingAsync.java30
-rw-r--r--java/src/IceInternal/OutgoingAsyncMessageCallback.java15
-rw-r--r--java/src/IceInternal/OutgoingMessageCallback.java2
-rw-r--r--java/src/IceInternal/ThreadPool.java32
-rw-r--r--java/src/IceInternal/ThreadPoolCurrent.java6
13 files changed, 247 insertions, 249 deletions
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java
index d3146d1533f..8667a1afb88 100644
--- a/java/src/Ice/AsyncResult.java
+++ b/java/src/Ice/AsyncResult.java
@@ -252,7 +252,7 @@ public class AsyncResult
//
try
{
- _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_cachedConnection)
+ _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
{
public void
run()
@@ -370,7 +370,7 @@ public class AsyncResult
//
try
{
- _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_cachedConnection)
+ _instance.clientThreadPool().dispatch(new IceInternal.DispatchWorkItem(_cachedConnection)
{
public void
run()
@@ -488,25 +488,7 @@ public class AsyncResult
if(handler != null)
{
- final IceInternal.RequestHandler h = handler;
- Ice.Connection connection = null;
- try
- {
- connection = handler.getConnection(false);
- }
- catch(Ice.LocalException e)
- {
- // Ignore.
- }
- _instance.clientThreadPool().execute(
- new IceInternal.DispatchWorkItem(connection)
- {
- public void
- run()
- {
- h.asyncRequestTimedOut((IceInternal.OutgoingAsyncMessageCallback)AsyncResult.this);
- }
- });
+ handler.asyncRequestTimedOut((IceInternal.OutgoingAsyncMessageCallback)this);
}
}
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index 15290246234..70fed8c7ec7 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -949,12 +949,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
// If the request is being sent, don't remove it from the send streams,
// it will be removed once the sending is finished.
//
- boolean isSent = o.timedOut();
+ o.timedOut();
if(o != _sendStreams.getFirst())
{
it.remove();
}
- out.finished(new InvocationTimeoutException(), isSent);
+ out.finished(new InvocationTimeoutException());
return; // We're done.
}
}
@@ -967,7 +967,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(it2.next() == o)
{
- o.finished(new InvocationTimeoutException(), true);
+ o.finished(new InvocationTimeoutException());
it2.remove();
return; // We're done.
}
@@ -978,60 +978,45 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public void
asyncRequestTimedOut(IceInternal.OutgoingAsyncMessageCallback outAsync)
{
- boolean isSent = false;
- boolean finished = false;
-
- synchronized(this)
+ java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
+ while(it.hasNext())
{
- java.util.Iterator<OutgoingMessage> it = _sendStreams.iterator();
- while(it.hasNext())
+ OutgoingMessage o = it.next();
+ if(o.outAsync == outAsync)
{
- OutgoingMessage o = it.next();
- if(o.outAsync == outAsync)
+ if(o.requestId > 0)
{
- if(o.requestId > 0)
- {
- _asyncRequests.remove(o.requestId);
- }
+ _asyncRequests.remove(o.requestId);
+ }
- //
- // If the request is being sent, don't remove it from the send streams,
- // it will be removed once the sending is finished.
- //
- isSent = o.timedOut();
- if(o != _sendStreams.getFirst())
- {
- it.remove();
- }
- finished = true;
- break; // We're done.
+ //
+ // If the request is being sent, don't remove it from the send streams,
+ // it will be removed once the sending is finished.
+ //
+ o.timedOut();
+ if(o != _sendStreams.getFirst())
+ {
+ it.remove();
}
+ outAsync.__dispatchInvocationTimeout(_threadPool, this);
+ return; // We're done
}
+ }
- if(!finished)
+ if(outAsync instanceof IceInternal.OutgoingAsync)
+ {
+ IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync;
+ java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator();
+ while(it2.hasNext())
{
- if(outAsync instanceof IceInternal.OutgoingAsync)
+ if(it2.next() == o)
{
- IceInternal.OutgoingAsync o = (IceInternal.OutgoingAsync)outAsync;
- java.util.Iterator<IceInternal.OutgoingAsync> it2 = _asyncRequests.values().iterator();
- while(it2.hasNext())
- {
- if(it2.next() == o)
- {
- it2.remove();
- finished = true;
- isSent = true;
- break; // We're done.
- }
- }
+ it2.remove();
+ outAsync.__dispatchInvocationTimeout(_threadPool, this);
+ return; // We're done.
}
}
}
-
- if(finished)
- {
- outAsync.__finished(new InvocationTimeoutException(), isSent);
- }
}
@@ -1418,7 +1403,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
current.ioCompleted();
}
- if(_dispatcher != null)
+ if(!_dispatcher) // Optimization, call dispatch() directly if there's no dispatcher.
+ {
+ dispatch(startCB, sentCBs, info);
+ }
+ else
{
if(info != null && info.heartbeatCallback == null) // No need for the stream if heartbeat callback
{
@@ -1435,28 +1424,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
final StartCallback finalStartCB = startCB;
final java.util.List<OutgoingMessage> finalSentCBs = sentCBs;
final MessageInfo finalInfo = info;
- try
- {
- _dispatcher.dispatch(new Runnable()
- {
- public void
- run()
- {
- dispatch(finalStartCB, finalSentCBs, finalInfo);
- }
- }, this);
- }
- catch(java.lang.Exception ex)
- {
- if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ _threadPool.dispatchFromThisThread(
+ new IceInternal.DispatchWorkItem(this)
{
- warning("dispatch exception", ex);
- }
- }
- }
- else
- {
- dispatch(startCB, sentCBs, info);
+ public void
+ run()
+ {
+ dispatch(finalStartCB, finalSentCBs, finalInfo);
+ }
+ });
}
}
@@ -1586,32 +1562,22 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return;
}
- if(_dispatcher == null)
+ current.ioCompleted();
+ if(!_dispatcher) // Optimization, call finish() directly if there's no dispatcher.
{
- current.ioCompleted();
finish();
}
else
{
- try
- {
- _dispatcher.dispatch(new Runnable()
- {
- public void
- run()
- {
- finish();
- }
- },
- this);
- }
- catch(java.lang.Exception ex)
- {
- if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ _threadPool.dispatchFromThisThread(
+ new IceInternal.DispatchWorkItem(this)
{
- warning("dispatch exception", ex);
- }
- }
+ public void
+ run()
+ {
+ finish();
+ }
+ });
}
}
@@ -1656,13 +1622,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
for(IceInternal.Outgoing p : _requests.values())
{
- p.finished(_exception, true);
+ p.finished(_exception);
}
_requests.clear();
for(IceInternal.OutgoingAsync p : _asyncRequests.values())
{
- p.__finished(_exception, true);
+ p.__finished(_exception);
}
_asyncRequests.clear();
@@ -1796,7 +1762,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_endpoint = endpoint;
_adapter = adapter;
final Ice.InitializationData initData = instance.initializationData();
- _dispatcher = initData.dispatcher; // Cached for better performance.
+ _dispatcher = initData.dispatcher != null; // Cached for better performance.
_logger = initData.logger; // Cached for better performance.
_traceLevels = instance.traceLevels(); // Cached for better performance.
_timer = instance.timer();
@@ -3093,7 +3059,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.stream = stream;
this.compress = compress;
this.adopt = adopt;
- this.isSent = false;
this.requestId = 0;
}
@@ -3104,7 +3069,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.compress = compress;
this.out = out;
this.requestId = requestId;
- this.isSent = false;
}
OutgoingMessage(IceInternal.OutgoingAsyncMessageCallback out, IceInternal.BasicStream stream, boolean compress,
@@ -3114,16 +3078,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
this.compress = compress;
this.outAsync = out;
this.requestId = requestId;
- this.isSent = false;
}
- public boolean
+ public void
timedOut()
{
- assert((out != null || outAsync != null) && !isSent);
+ assert((out != null || outAsync != null));
out = null;
outAsync = null;
- return isSent;
}
public void
@@ -3142,21 +3104,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public boolean
sent()
{
- isSent = true; // The message is sent.
-
if(out != null)
{
out.sent();
- return false;
}
else if(outAsync != null)
{
return outAsync.__sent();
}
- else
- {
- return false;
- }
+ return false;
}
public void
@@ -3164,11 +3120,11 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
if(out != null)
{
- out.finished(ex, isSent);
+ out.finished(ex);
}
else if(outAsync != null)
{
- outAsync.__finished(ex, isSent);
+ outAsync.__finished(ex);
}
}
@@ -3179,7 +3135,6 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public int requestId;
boolean adopt;
boolean prepared;
- boolean isSent;
}
private Communicator _communicator;
@@ -3194,7 +3149,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private ObjectAdapter _adapter;
private IceInternal.ServantManager _servantManager;
- private final Dispatcher _dispatcher;
+ private final boolean _dispatcher;
private final Logger _logger;
private final IceInternal.TraceLevels _traceLevels;
private final IceInternal.ThreadPool _threadPool;
diff --git a/java/src/IceInternal/BatchOutgoing.java b/java/src/IceInternal/BatchOutgoing.java
index 8647e89ccde..acee3cdd6de 100644
--- a/java/src/IceInternal/BatchOutgoing.java
+++ b/java/src/IceInternal/BatchOutgoing.java
@@ -182,7 +182,7 @@ public final class BatchOutgoing implements OutgoingMessageCallback
}
public synchronized void
- finished(Ice.Exception ex, boolean sent)
+ finished(Ice.Exception ex)
{
if(_childObserver != null)
{
diff --git a/java/src/IceInternal/BatchOutgoingAsync.java b/java/src/IceInternal/BatchOutgoingAsync.java
index 2b08cb9aa04..81dfe8a5a4c 100644
--- a/java/src/IceInternal/BatchOutgoingAsync.java
+++ b/java/src/IceInternal/BatchOutgoingAsync.java
@@ -58,26 +58,42 @@ public class BatchOutgoingAsync extends Ice.AsyncResult implements OutgoingAsync
}
public void
- __finished(Ice.Exception exc, boolean sent)
+ __finished(Ice.Exception exc)
{
- if(_childObserver != null)
- {
- _childObserver.failed(exc.ice_name());
- _childObserver.detach();
- _childObserver = null;
- }
- if(_timeoutRequestHandler != null)
+ synchronized(_monitor)
{
- _instance.timer().cancel(this);
- _timeoutRequestHandler = null;
+ if(_childObserver != null)
+ {
+ _childObserver.failed(exc.ice_name());
+ _childObserver.detach();
+ _childObserver = null;
+ }
+ if(_timeoutRequestHandler != null)
+ {
+ _instance.timer().cancel(this);
+ _timeoutRequestHandler = null;
+ }
}
__invokeException(exc);
}
public void
+ __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection)
+ {
+ threadPool.dispatch(
+ new DispatchWorkItem(connection)
+ {
+ public void
+ run()
+ {
+ BatchOutgoingAsync.this.__finished(new Ice.InvocationTimeoutException());
+ }
+ });
+ }
+
+ public void
runTimerTask()
{
__runTimerTask();
}
-
}
diff --git a/java/src/IceInternal/CollocatedRequestHandler.java b/java/src/IceInternal/CollocatedRequestHandler.java
index 245d714a64b..d857b443475 100644
--- a/java/src/IceInternal/CollocatedRequestHandler.java
+++ b/java/src/IceInternal/CollocatedRequestHandler.java
@@ -80,6 +80,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
CollocatedRequestHandler(Reference ref, Ice.ObjectAdapter adapter)
{
_reference = ref;
+ _dispatcher = ref.getInstance().initializationData().dispatcher != null;
_response = _reference.getMode() == Reference.ModeTwoway;
_adapter = (Ice.ObjectAdapterI)adapter;
@@ -150,7 +151,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
_batchAutoFlush);
stream.swap(_batchStream);
- _adapter.getThreadPool().execute(
+ _adapter.getThreadPool().dispatch(
new DispatchWorkItem()
{
public void
@@ -235,7 +236,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
_requests.remove(requestId);
}
- out.finished(new Ice.InvocationTimeoutException(), false);
+ out.finished(new Ice.InvocationTimeoutException());
_sendRequests.remove(out);
}
else if(out instanceof Outgoing)
@@ -246,7 +247,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
if(e.getValue() == o)
{
- o.finished(new Ice.InvocationTimeoutException(), true);
+ out.finished(new Ice.InvocationTimeoutException());
_requests.remove(e.getKey());
return; // We're done.
}
@@ -254,45 +255,35 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
}
- public void
+ synchronized public void
asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync)
{
- OutgoingAsyncMessageCallback out = null;
- boolean sent = false;
- synchronized(this)
+ Integer requestId = _sendAsyncRequests.get(outAsync);
+ if(requestId != null)
{
- Integer requestId = _sendAsyncRequests.get(outAsync);
- if(requestId != null)
+ if(requestId > 0)
{
- if(requestId > 0)
- {
- _asyncRequests.remove(requestId);
- }
- out = outAsync;
- sent = false;
- _sendAsyncRequests.remove(outAsync);
+ _asyncRequests.remove(requestId);
}
- else if(outAsync instanceof OutgoingAsync)
+ _sendAsyncRequests.remove(outAsync);
+ outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null);
+ return; // We're done
+ }
+
+ if(outAsync instanceof OutgoingAsync)
+ {
+ OutgoingAsync o = (OutgoingAsync)outAsync;
+ assert(o != null);
+ for(java.util.Map.Entry<Integer, OutgoingAsync> e : _asyncRequests.entrySet())
{
- OutgoingAsync o = (OutgoingAsync)outAsync;
- assert(o != null);
- for(java.util.Map.Entry<Integer, OutgoingAsync> e : _asyncRequests.entrySet())
+ if(e.getValue() == o)
{
- if(e.getValue() == o)
- {
- out = o;
- sent = true;
- _asyncRequests.remove(e.getKey());
- break;
- }
+ _asyncRequests.remove(e.getKey());
+ outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null);
+ return; // We're done
}
}
}
-
- if(out != null)
- {
- out.__finished(new Ice.InvocationTimeoutException(), sent);
- }
}
public void
@@ -319,9 +310,14 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(_reference.getInvocationTimeout() > 0)
{
- _adapter.getThreadPool().execute(new InvokeAll(out, out.os(), requestId, 1, false));
+ // Don't invoke from the user thread, invocation timeouts wouldn't work otherwise.
+ _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), requestId, 1, false));
}
- else
+ else if(_dispatcher)
+ {
+ _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), requestId, 1, false));
+ }
+ else // Optimization: directly call invokeAll if there's no dispatcher.
{
out.sent();
invokeAll(out.os(), requestId, 1, false);
@@ -350,7 +346,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
outAsync.__attachCollocatedObserver(_adapter, requestId);
- _adapter.getThreadPool().execute(new InvokeAllAsync(outAsync, outAsync.__getOs(), requestId, 1, false));
+ _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), requestId, 1, false));
return AsyncStatus.Queued;
}
@@ -401,9 +397,13 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
if(_reference.getInvocationTimeout() > 0)
{
- _adapter.getThreadPool().execute(new InvokeAll(out, out.os(), 0, invokeNum, true));
+ _adapter.getThreadPool().dispatch(new InvokeAll(out, out.os(), 0, invokeNum, true));
}
- else
+ else if(_dispatcher)
+ {
+ _adapter.getThreadPool().dispatchFromThisThread(new InvokeAll(out, out.os(), 0, invokeNum, true));
+ }
+ else // Optimization: directly call invokeAll if there's no dispatcher.
{
out.sent();
invokeAll(out.os(), 0, invokeNum, true);
@@ -458,7 +458,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(invokeNum > 0)
{
- _adapter.getThreadPool().execute(new InvokeAllAsync(outAsync, outAsync.__getOs(), 0, invokeNum, true));
+ _adapter.getThreadPool().dispatch(new InvokeAllAsync(outAsync, outAsync.__getOs(), 0, invokeNum, true));
return AsyncStatus.Queued;
}
else if(outAsync.__sent())
@@ -527,7 +527,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
Outgoing out = _requests.remove(requestId);
if(out != null)
{
- out.finished(ex, true);
+ out.finished(ex);
}
else
{
@@ -536,7 +536,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
if(outAsync != null)
{
- outAsync.__finished(ex, true);
+ outAsync.__finished(ex);
}
}
_adapter.decDirectCount();
@@ -565,13 +565,9 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
{
return false; // The request timed-out.
}
- out.sent();
}
}
- else
- {
- out.sent();
- }
+ out.sent();
return true;
}
@@ -632,7 +628,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
catch(Ice.ObjectAdapterDeactivatedException ex)
{
- handleException(requestId, ex, false);
+ handleException(requestId, ex);
return;
}
@@ -644,7 +640,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
catch(Ice.SystemException ex)
{
- handleException(requestId, ex, true);
+ handleException(requestId, ex);
_adapter.decDirectCount();
}
--invokeNum;
@@ -679,7 +675,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
}
void
- handleException(int requestId, Ice.Exception ex, boolean sent)
+ handleException(int requestId, Ice.Exception ex)
{
if(requestId == 0)
{
@@ -692,7 +688,7 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
Outgoing out = _requests.get(requestId);
if(out != null)
{
- out.finished(ex, sent);
+ out.finished(ex);
_requests.remove(requestId);
}
else
@@ -707,11 +703,12 @@ public class CollocatedRequestHandler implements RequestHandler, ResponseHandler
if(outAsync != null)
{
- outAsync.__finished(ex, sent);
+ outAsync.__finished(ex);
}
}
private final Reference _reference;
+ private final boolean _dispatcher;
private final boolean _response;
private final Ice.ObjectAdapterI _adapter;
private final Ice.Logger _logger;
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 0505688cdda..6c08f7dc261 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -203,7 +203,7 @@ public class ConnectRequestHandler
Request request = it.next();
if(request.out == out)
{
- out.finished(new Ice.InvocationTimeoutException(), false);
+ out.finished(new Ice.InvocationTimeoutException());
it.remove();
return;
}
@@ -217,7 +217,6 @@ public class ConnectRequestHandler
public void
asyncRequestTimedOut(OutgoingAsyncMessageCallback outAsync)
{
- boolean timedOut = false;
synchronized(this)
{
if(_exception != null)
@@ -234,17 +233,13 @@ public class ConnectRequestHandler
if(request.outAsync == outAsync)
{
it.remove();
- timedOut = true;
- break;
+ outAsync.__dispatchInvocationTimeout(_reference.getInstance().clientThreadPool(), null);
+ return; // We're done
}
}
+ assert(false); // The request has to be queued if it timed out and we're not initialized yet.
}
}
- if(timedOut)
- {
- outAsync.__finished(new Ice.InvocationTimeoutException(), false);
- return;
- }
_connection.asyncRequestTimedOut(outAsync);
}
@@ -333,7 +328,7 @@ public class ConnectRequestHandler
//
if(!_requests.isEmpty())
{
- _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection)
+ _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection)
{
public void
run()
@@ -487,7 +482,7 @@ public class ConnectRequestHandler
{
assert(_exception == null && !_requests.isEmpty());
_exception = ex.get();
- _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection)
+ _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection)
{
public void
run()
@@ -503,7 +498,7 @@ public class ConnectRequestHandler
{
assert(_exception == null && !_requests.isEmpty());
_exception = ex;
- _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_connection)
+ _reference.getInstance().clientThreadPool().dispatch(new DispatchWorkItem(_connection)
{
public void
run()
@@ -516,7 +511,7 @@ public class ConnectRequestHandler
if(!sentCallbacks.isEmpty())
{
- _reference.getInstance().clientThreadPool().execute(
+ _reference.getInstance().clientThreadPool().dispatch(
new DispatchWorkItem(_connection)
{
public void
@@ -564,11 +559,11 @@ public class ConnectRequestHandler
{
if(request.out != null)
{
- request.out.finished(_exception, false);
+ request.out.finished(_exception);
}
else if(request.outAsync != null)
{
- request.outAsync.__finished(_exception, false);
+ request.outAsync.__finished(_exception);
}
}
_requests.clear();
diff --git a/java/src/IceInternal/DispatchWorkItem.java b/java/src/IceInternal/DispatchWorkItem.java
index a7c5ef7fa57..94fea26ada5 100644
--- a/java/src/IceInternal/DispatchWorkItem.java
+++ b/java/src/IceInternal/DispatchWorkItem.java
@@ -26,33 +26,17 @@ abstract public class DispatchWorkItem implements ThreadPoolWorkItem, Runnable
_connection = connection;
}
- final public void execute(ThreadPoolCurrent current)
+ final public void
+ execute(ThreadPoolCurrent current)
{
- Instance instance = current.stream.instance();
- Ice.Dispatcher dispatcher = instance.initializationData().dispatcher;
- if(dispatcher != null)
- {
- try
- {
- dispatcher.dispatch(this, _connection);
- }
- catch(java.lang.Exception ex)
- {
- if(instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
- {
- java.io.StringWriter sw = new java.io.StringWriter();
- java.io.PrintWriter pw = new java.io.PrintWriter(sw);
- ex.printStackTrace(pw);
- pw.flush();
- instance.initializationData().logger.warning("dispatch exception:\n" + sw.toString());
- }
- }
- }
- else
- {
- current.ioCompleted(); // Promote a follower.
- this.run();
- }
+ current.ioCompleted(); // Promote a follower
+ current.dispatchFromThisThread(this);
+ }
+
+ public Ice.Connection
+ getConnection()
+ {
+ return _connection;
}
private Ice.Connection _connection;
diff --git a/java/src/IceInternal/Outgoing.java b/java/src/IceInternal/Outgoing.java
index 3a14e835110..69d7b3d6cad 100644
--- a/java/src/IceInternal/Outgoing.java
+++ b/java/src/IceInternal/Outgoing.java
@@ -415,9 +415,24 @@ public final class Outgoing implements OutgoingMessageCallback
}
public synchronized void
- finished(Ice.Exception ex, boolean sent)
+ finished(Ice.Exception ex)
{
- assert(_state <= StateInProgress);
+ //assert(_state <= StateInProgress);
+ if(_state > StateInProgress)
+ {
+ //
+ // Response was already received but message
+ // didn't get removed first from the connection
+ // send message queue so it's possible we can be
+ // notified of failures. In this case, ignore the
+ // failure and assume the outgoing has been sent.
+ //
+ assert(_state != StateFailed);
+ _sent = true;
+ notify();
+ return;
+ }
+
if(_childObserver != null)
{
_childObserver.failed(ex.ice_name());
@@ -426,7 +441,6 @@ public final class Outgoing implements OutgoingMessageCallback
}
_state = StateFailed;
_exception = ex;
- _sent = sent;
notify();
}
diff --git a/java/src/IceInternal/OutgoingAsync.java b/java/src/IceInternal/OutgoingAsync.java
index 38e2eea29d7..e5e143a7e71 100644
--- a/java/src/IceInternal/OutgoingAsync.java
+++ b/java/src/IceInternal/OutgoingAsync.java
@@ -23,6 +23,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
{
_handler = null;
_cnt = 0;
+ _sent = false;
_mode = mode;
_sentSynchronously = false;
@@ -120,6 +121,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
{
boolean alreadySent = (_state & Sent) != 0;
_state |= Sent;
+ _sent = true;
assert((_state & Done) == 0);
@@ -150,7 +152,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
public void
- __finished(Ice.Exception exc, boolean sent)
+ __finished(Ice.Exception exc)
{
synchronized(_monitor)
{
@@ -174,7 +176,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
//
try
{
- if(!handleException(exc, sent))
+ if(!handleException(exc))
{
return; // Can't be retried immediately.
}
@@ -187,6 +189,20 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
}
+ public void
+ __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection)
+ {
+ threadPool.dispatch(
+ new DispatchWorkItem(connection)
+ {
+ public void
+ run()
+ {
+ OutgoingAsync.this.__finished(new Ice.InvocationTimeoutException());
+ }
+ });
+ }
+
public final void
__finished(BasicStream is)
{
@@ -350,7 +366,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
catch(Ice.LocalException ex)
{
- __finished(ex, true);
+ __finished(ex);
return;
}
@@ -365,6 +381,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
{
try
{
+ _sent = false;
_handler = _proxy.__getRequestHandler(true);
int status = _handler.sendAsyncRequest(this);
if((status & AsyncStatus.Sent) > 0)
@@ -409,7 +426,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
catch(Ice.Exception ex)
{
- if(!handleException(ex, false)) // This will throw if the invocation can't be retried.
+ if(!handleException(ex)) // This will throw if the invocation can't be retried.
{
break; // Can't be retried immediately.
}
@@ -463,12 +480,12 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
}
private boolean
- handleException(Ice.Exception exc, boolean sent)
+ handleException(Ice.Exception exc)
{
try
{
Ice.IntHolder interval = new Ice.IntHolder();
- _cnt = _proxy.__handleException(exc, _handler, _mode, sent, interval, _cnt);
+ _cnt = _proxy.__handleException(exc, _handler, _mode, _sent, interval, _cnt);
if(_observer != null)
{
_observer.retried(); // Invocation is being retried.
@@ -499,6 +516,7 @@ public class OutgoingAsync extends Ice.AsyncResult implements OutgoingAsyncMessa
private Ice.EncodingVersion _encoding;
private int _cnt;
private Ice.OperationMode _mode;
+ private boolean _sent;
private static final java.util.Map<String, String> _emptyContext = new java.util.HashMap<String, String>();
}
diff --git a/java/src/IceInternal/OutgoingAsyncMessageCallback.java b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
index 8d830987428..4b6b73a8b06 100644
--- a/java/src/IceInternal/OutgoingAsyncMessageCallback.java
+++ b/java/src/IceInternal/OutgoingAsyncMessageCallback.java
@@ -30,17 +30,20 @@ public interface OutgoingAsyncMessageCallback
// connection will call the __invokeSent() method bellow (which in turn should call the
// sent callback).
//
- public abstract boolean __sent();
+ boolean __sent();
//
// Called by the connection to call the user sent callback.
//
- public abstract void __invokeSent();
+ void __invokeSent();
//
- // Called by the connection when the request failed. The boolean indicates whether or
- // not the message was possibly sent (this is useful for retry to figure out whether
- // or not the request can't be retried without breaking at-most-once semantics.)
+ // Called by the connection when the request failed.
//
- public abstract void __finished(Ice.Exception ex, boolean sent);
+ void __finished(Ice.Exception ex);
+
+ //
+ // Helper to dispatch invocation timeout.
+ //
+ void __dispatchInvocationTimeout(ThreadPool threadPool, Ice.Connection connection);
}
diff --git a/java/src/IceInternal/OutgoingMessageCallback.java b/java/src/IceInternal/OutgoingMessageCallback.java
index 18b18a09575..1a8426b2e19 100644
--- a/java/src/IceInternal/OutgoingMessageCallback.java
+++ b/java/src/IceInternal/OutgoingMessageCallback.java
@@ -18,5 +18,5 @@ public interface OutgoingMessageCallback
void sent();
- void finished(Ice.Exception ex, boolean sent);
+ void finished(Ice.Exception ex);
}
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index 8ab584e9623..8d350b1e19c 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -82,6 +82,7 @@ public final class ThreadPool
ThreadPool(Instance instance, String prefix, int timeout)
{
_instance = instance;
+ _dispatcher = instance.initializationData().dispatcher;
_destroyed = false;
_prefix = prefix;
_selector = new Selector(instance);
@@ -278,7 +279,6 @@ public final class ThreadPool
{
return;
}
-
_selector.update(handler, remove, add);
if((add & SocketOperation.Read) != 0 && handler._hasMoreData.value &&
@@ -311,7 +311,34 @@ public final class ThreadPool
}
public void
- execute(ThreadPoolWorkItem workItem)
+ dispatchFromThisThread(DispatchWorkItem workItem)
+ {
+ if(_dispatcher != null)
+ {
+ try
+ {
+ _dispatcher.dispatch(workItem, workItem.getConnection());
+ }
+ catch(java.lang.Exception ex)
+ {
+ if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ _instance.initializationData().logger.warning("dispatch exception:\n" + sw.toString());
+ }
+ }
+ }
+ else
+ {
+ workItem.run();
+ }
+ }
+
+ public void
+ dispatch(DispatchWorkItem workItem)
{
_workQueue.queue(workItem);
}
@@ -698,6 +725,7 @@ public final class ThreadPool
}
private final Instance _instance;
+ private final Ice.Dispatcher _dispatcher;
private final ThreadPoolWorkQueue _workQueue;
private boolean _destroyed;
private final String _prefix;
diff --git a/java/src/IceInternal/ThreadPoolCurrent.java b/java/src/IceInternal/ThreadPoolCurrent.java
index e278fcb917b..7b81127016c 100644
--- a/java/src/IceInternal/ThreadPoolCurrent.java
+++ b/java/src/IceInternal/ThreadPoolCurrent.java
@@ -31,6 +31,12 @@ public final class ThreadPoolCurrent
_threadPool.ioCompleted(this);
}
+ public void
+ dispatchFromThisThread(DispatchWorkItem workItem)
+ {
+ _threadPool.dispatchFromThisThread(workItem);
+ }
+
final ThreadPool _threadPool;
final ThreadPool.EventHandlerThread _thread;
EventHandler _handler;