diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-12-01 14:02:05 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-12-01 14:02:05 +0100 |
commit | cd63c3e37dde04051fb2f4631f788bed7b48937b (patch) | |
tree | ae2c581bd228ec42c6eb46bf83a88f1174623bac /java/src/Ice/ConnectionI.java | |
parent | SLES RPM fixes (diff) | |
download | ice-cd63c3e37dde04051fb2f4631f788bed7b48937b.tar.bz2 ice-cd63c3e37dde04051fb2f4631f788bed7b48937b.tar.xz ice-cd63c3e37dde04051fb2f4631f788bed7b48937b.zip |
Added support for Ice::Dispatcher
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 113 |
1 files changed, 97 insertions, 16 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index eac4cb11e3b..9153385567e 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -1022,7 +1022,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne if((current.operation & IceInternal.SocketOperation.Read) != 0) { - info = parseMessage(current.stream); + info = parseMessage(current.stream); // Optimization: use the thread's stream. } } } @@ -1066,10 +1066,55 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { _acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000; } - + current.ioCompleted(); } + + if(_dispatcher != null) + { + if(info != null) + { + // + // Create a new stream for the dispatch instead of using the thread + // pool's thread stream. + // + assert(info.stream == current.stream); + IceInternal.BasicStream stream = info.stream; + info.stream = new IceInternal.BasicStream(_instance); + info.stream.swap(stream); + } + final StartCallback finalStartCB = startCB; + final java.util.List<OutgoingMessage> finalSentCBs = sentCBs; + final MessageInfo finalInfo = info; + try + { + _dispatcher.dispatch(new Runnable() + { + public void + run() + { + dispatch(finalStartCB, finalSentCBs, finalInfo); + } + }, this); + } + catch(java.lang.Exception ex) + { + if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + warning("dispatch exception", ex); + } + } + } + else + { + dispatch(startCB, sentCBs, info); + } + } + + protected void + dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info) + { // // Notify the factory that the connection establishment and // validation has completed. @@ -1117,6 +1162,50 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne public void finished(IceInternal.ThreadPoolCurrent current) { + // + // Check if the connection needs to call user callbacks. If it doesn't, we + // can safely run finish() from this "IO thread". Otherwise, we either run + // finish() with the dispatcher if one is set, or we promote another IO + // thread first before calling finish(). + // + if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty()) + { + finish(); + return; + } + + if(_dispatcher == null) + { + current.ioCompleted(); + finish(); + } + else + { + try + { + _dispatcher.dispatch(new Runnable() + { + public void + run() + { + finish(); + } + }, + this); + } + catch(java.lang.Exception ex) + { + if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1) + { + warning("dispatch exception", ex); + } + } + } + } + + public void + finish() + { synchronized(this) { assert(_state == StateClosed); @@ -1125,29 +1214,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne IceInternal.SocketOperation.Connect); } - // - // If there are no callbacks to call, we don't call ioCompleted() since we're not going - // to call code that will potentially block (this avoids promoting a new leader and - // unecessary thread creation, especially if this is called on shutdown). - // - if(_startCallback != null || !_sendStreams.isEmpty() || !_asyncRequests.isEmpty()) - { - current.ioCompleted(); - } - if(_startCallback != null) { _startCallback.connectionStartFailed(this, _exception); _startCallback = null; } - + // // NOTE: for twoway requests which are not sent, finished can be called twice: the // first time because the outgoing is in the _sendStreams set and the second time // because it's either in the _requests/_asyncRequests set. This is fine, only the // first call should be taken into account by the implementation of finished. // - + for(OutgoingMessage p : _sendStreams) { if(p.requestId > 0) @@ -1164,7 +1243,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne p.finished(_exception); } _sendStreams.clear(); - + for(IceInternal.Outgoing p : _requests.values()) { p.finished(_exception, true); @@ -1176,7 +1255,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne p.__finished(_exception, true); } _asyncRequests.clear(); - + // // This must be done last as this will cause waitUntilFinished() to return (and communicator // objects such as the timer might be destroyed too). @@ -1302,6 +1381,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _endpoint = endpoint; _adapter = adapter; final Ice.InitializationData initData = instance.initializationData(); + _dispatcher = initData.dispatcher; // Cached for better performance. _logger = initData.logger; // Cached for better performance. _traceLevels = instance.traceLevels(); // Cached for better performance. _timer = instance.timer(); @@ -2504,6 +2584,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private ObjectAdapter _adapter; private IceInternal.ServantManager _servantManager; + private final Dispatcher _dispatcher; private final Logger _logger; private final IceInternal.TraceLevels _traceLevels; private final IceInternal.ThreadPool _threadPool; |