diff options
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/AsyncResult.java | 13 | ||||
-rw-r--r-- | java/src/Ice/ConnectionI.java | 113 | ||||
-rw-r--r-- | java/src/Ice/Dispatcher.java | 15 | ||||
-rw-r--r-- | java/src/Ice/InitializationData.java | 5 | ||||
-rw-r--r-- | java/src/IceInternal/ConnectRequestHandler.java | 20 | ||||
-rw-r--r-- | java/src/IceInternal/DispatchWorkItem.java | 54 |
6 files changed, 185 insertions, 35 deletions
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java index 67b3ecbe0b2..24b17fe10db 100644 --- a/java/src/Ice/AsyncResult.java +++ b/java/src/Ice/AsyncResult.java @@ -128,7 +128,8 @@ public class AsyncResult } if(_exception != null) { - throw (LocalException)_exception.fillInStackTrace(); // TODO: Correct? + //throw (LocalException)_exception.fillInStackTrace(); + throw _exception; } return (_state & OK) > 0; } @@ -158,12 +159,11 @@ public class AsyncResult // try { - _instance.clientThreadPool().execute(new IceInternal.ThreadPoolWorkItem() + _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_instance) { public void - execute(IceInternal.ThreadPoolCurrent current) + run() { - current.ioCompleted(); __exception(ex); } }); @@ -225,12 +225,11 @@ public class AsyncResult // try { - _instance.clientThreadPool().execute(new IceInternal.ThreadPoolWorkItem() + _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_instance) { public void - execute(IceInternal.ThreadPoolCurrent current) + run() { - current.ioCompleted(); __sentInternal(); } }); diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index eac4cb11e3b..9153385567e 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -1022,7 +1022,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if((current.operation & IceInternal.SocketOperation.Read) != 0) { - info = parseMessage(current.stream); + info = parseMessage(current.stream); // Optimization: use the thread's stream. } } } @@ -1066,10 +1066,55 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; } - + current.ioCompleted(); } + + if(_dispatcher != null) + { + if(info != null) + { + // + // Create a new stream for the dispatch instead of using the thread + // pool's thread stream. + // + assert(info.stream == current.stream); + IceInternal.BasicStream stream = info.stream; + info.stream = new IceInternal.BasicStream(_instance); + info.stream.swap(stream); + } + final StartCallback finalStartCB = startCB; + final java.util.List<OutgoingMessage> finalSentCBs = sentCBs; + final MessageInfo finalInfo = info; + try + { + _dispatcher.dispatch(new Runnable() + { + public void + run() + { + dispatch(finalStartCB, finalSentCBs, finalInfo); + } + }, this); + } + catch(java.lang.Exception ex) + { + if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + warning("dispatch exception", ex); + } + } + } + else + { + dispatch(startCB, sentCBs, info); + } + } + + protected void + dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info) + { // // Notify the factory that the connection establishment and // validation has completed. @@ -1117,6 +1162,50 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void finished(IceInternal.ThreadPoolCurrent current) { + // + // Check if the connection needs to call user callbacks. If it doesn't, we + // can safely run finish() from this "IO thread". Otherwise, we either run + // finish() with the dispatcher if one is set, or we promote another IO + // thread first before calling finish(). + // + if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty()) + { + finish(); + return; + } + + if(_dispatcher == null) + { + current.ioCompleted(); + finish(); + } + else + { + try + { + _dispatcher.dispatch(new Runnable() + { + public void + run() + { + finish(); + } + }, + this); + } + catch(java.lang.Exception ex) + { + if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + warning("dispatch exception", ex); + } + } + } + } + + public void + finish() + { synchronized(this) { assert(_state == StateClosed); @@ -1125,29 +1214,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.SocketOperation.Connect); } - // - // If there are no callbacks to call, we don't call ioCompleted() since we're not going - // to call code that will potentially block (this avoids promoting a new leader and - // unecessary thread creation, especially if this is called on shutdown). - // - if(_startCallback != null || !_sendStreams.isEmpty() || !_asyncRequests.isEmpty()) - { - current.ioCompleted(); - } - if(_startCallback != null) { _startCallback.connectionStartFailed(this, _exception); _startCallback = null; } - + // // NOTE: for twoway requests which are not sent, finished can be called twice: the // first time because the outgoing is in the _sendStreams set and the second time // because it's either in the _requests/_asyncRequests set. This is fine, only the // first call should be taken into account by the implementation of finished. // - + for(OutgoingMessage p : _sendStreams) { if(p.requestId > 0) @@ -1164,7 +1243,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne p.finished(_exception); } _sendStreams.clear(); - + for(IceInternal.Outgoing p : _requests.values()) { p.finished(_exception, true); @@ -1176,7 +1255,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne p.__finished(_exception, true); } _asyncRequests.clear(); - + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -1302,6 +1381,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _endpoint = endpoint; _adapter = adapter; final Ice.InitializationData initData = instance.initializationData(); + _dispatcher = initData.dispatcher; // Cached for better performance. _logger = initData.logger; // Cached for better performance. _traceLevels = instance.traceLevels(); // Cached for better performance. _timer = instance.timer(); @@ -2504,6 +2584,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private ObjectAdapter _adapter; private IceInternal.ServantManager _servantManager; + private final Dispatcher _dispatcher; private final Logger _logger; private final IceInternal.TraceLevels _traceLevels; private final IceInternal.ThreadPool _threadPool; diff --git a/java/src/Ice/Dispatcher.java b/java/src/Ice/Dispatcher.java new file mode 100644 index 00000000000..d5ab29d8ebd --- /dev/null +++ b/java/src/Ice/Dispatcher.java @@ -0,0 +1,15 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package Ice; + +public interface Dispatcher +{ + void dispatch(Runnable runnable, Ice.Connection con); +} diff --git a/java/src/Ice/InitializationData.java b/java/src/Ice/InitializationData.java index 7c89724bac1..8e13fc2bf22 100644 --- a/java/src/Ice/InitializationData.java +++ b/java/src/Ice/InitializationData.java @@ -72,4 +72,9 @@ public final class InitializationData implements Cloneable * The custom class loader for the communicator. **/ public ClassLoader classLoader; + + /** + * The call dispatcher for the communicator. + **/ + public Dispatcher dispatcher; } diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java index 8bd1ca9f085..044e41c7008 100644 --- a/java/src/IceInternal/ConnectRequestHandler.java +++ b/java/src/IceInternal/ConnectRequestHandler.java @@ -295,12 +295,11 @@ public class ConnectRequestHandler // if(!_requests.isEmpty()) { - _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() + _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance()) { public void - execute(ThreadPoolCurrent current) + run() { - current.ioCompleted(); flushRequestsWithException(ex); }; }); @@ -448,12 +447,11 @@ public class ConnectRequestHandler { assert(_exception == null && !_requests.isEmpty()); _exception = ex.get(); - _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() + _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance()) { public void - execute(ThreadPoolCurrent current) + run() { - current.ioCompleted(); flushRequestsWithException(ex); }; }); @@ -465,12 +463,11 @@ public class ConnectRequestHandler { assert(_exception == null && !_requests.isEmpty()); _exception = ex; - _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem() + _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance()) { public void - execute(ThreadPoolCurrent current) + run() { - current.ioCompleted(); flushRequestsWithException(ex); }; }); @@ -480,12 +477,11 @@ public class ConnectRequestHandler if(!sentCallbacks.isEmpty()) { final Instance instance = _reference.getInstance(); - instance.clientThreadPool().execute(new ThreadPoolWorkItem() + instance.clientThreadPool().execute(new DispatchWorkItem(instance) { public void - execute(ThreadPoolCurrent current) + run() { - current.ioCompleted(); for(OutgoingAsyncMessageCallback callback : sentCallbacks) { callback.__sent(); diff --git a/java/src/IceInternal/DispatchWorkItem.java b/java/src/IceInternal/DispatchWorkItem.java new file mode 100644 index 00000000000..bc71eff7c6a --- /dev/null +++ b/java/src/IceInternal/DispatchWorkItem.java @@ -0,0 +1,54 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +// +// A helper class for thread pool work items that only need to call user +// callbacks. If a dispatcher is installed with the communicator, the +// thread pool work item is executed with the dispatcher, otherwise it's +// executed by a thread pool thread (after promoting a follower thread). +// +abstract public class DispatchWorkItem implements ThreadPoolWorkItem, Runnable +{ + public DispatchWorkItem(Instance instance) + { + _instance = instance; + } + + final public void execute(ThreadPoolCurrent current) + { + Ice.Dispatcher dispatcher = _instance.initializationData().dispatcher; + if(dispatcher != null) + { + try + { + dispatcher.dispatch(this, null); + } + catch(java.lang.Exception ex) + { + if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + java.io.StringWriter sw = new java.io.StringWriter(); + java.io.PrintWriter pw = new java.io.PrintWriter(sw); + ex.printStackTrace(pw); + pw.flush(); + _instance.initializationData().logger.warning("dispatch exception:\n" + sw.toString()); + } + } + } + else + { + current.ioCompleted(); // Promote a follower. + this.run(); + } + } + + private Instance _instance; +} |