summaryrefslogtreecommitdiff
path: root/java/src/Ice/ConnectionI.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/Ice/ConnectionI.java')
-rw-r--r--java/src/Ice/ConnectionI.java113
1 files changed, 97 insertions, 16 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index eac4cb11e3b..9153385567e 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -1022,7 +1022,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if((current.operation & IceInternal.SocketOperation.Read) != 0)
{
- info = parseMessage(current.stream);
+ info = parseMessage(current.stream); // Optimization: use the thread's stream.
}
}
}
@@ -1066,10 +1066,55 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
_acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
}
-
+
current.ioCompleted();
}
+
+ if(_dispatcher != null)
+ {
+ if(info != null)
+ {
+ //
+ // Create a new stream for the dispatch instead of using the thread
+ // pool's thread stream.
+ //
+ assert(info.stream == current.stream);
+ IceInternal.BasicStream stream = info.stream;
+ info.stream = new IceInternal.BasicStream(_instance);
+ info.stream.swap(stream);
+ }
+ final StartCallback finalStartCB = startCB;
+ final java.util.List<OutgoingMessage> finalSentCBs = sentCBs;
+ final MessageInfo finalInfo = info;
+ try
+ {
+ _dispatcher.dispatch(new Runnable()
+ {
+ public void
+ run()
+ {
+ dispatch(finalStartCB, finalSentCBs, finalInfo);
+ }
+ }, this);
+ }
+ catch(java.lang.Exception ex)
+ {
+ if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ warning("dispatch exception", ex);
+ }
+ }
+ }
+ else
+ {
+ dispatch(startCB, sentCBs, info);
+ }
+ }
+
+ protected void
+ dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info)
+ {
//
// Notify the factory that the connection establishment and
// validation has completed.
@@ -1117,6 +1162,50 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public void
finished(IceInternal.ThreadPoolCurrent current)
{
+ //
+ // Check if the connection needs to call user callbacks. If it doesn't, we
+ // can safely run finish() from this "IO thread". Otherwise, we either run
+ // finish() with the dispatcher if one is set, or we promote another IO
+ // thread first before calling finish().
+ //
+ if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty())
+ {
+ finish();
+ return;
+ }
+
+ if(_dispatcher == null)
+ {
+ current.ioCompleted();
+ finish();
+ }
+ else
+ {
+ try
+ {
+ _dispatcher.dispatch(new Runnable()
+ {
+ public void
+ run()
+ {
+ finish();
+ }
+ },
+ this);
+ }
+ catch(java.lang.Exception ex)
+ {
+ if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ warning("dispatch exception", ex);
+ }
+ }
+ }
+ }
+
+ public void
+ finish()
+ {
synchronized(this)
{
assert(_state == StateClosed);
@@ -1125,29 +1214,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.SocketOperation.Connect);
}
- //
- // If there are no callbacks to call, we don't call ioCompleted() since we're not going
- // to call code that will potentially block (this avoids promoting a new leader and
- // unecessary thread creation, especially if this is called on shutdown).
- //
- if(_startCallback != null || !_sendStreams.isEmpty() || !_asyncRequests.isEmpty())
- {
- current.ioCompleted();
- }
-
if(_startCallback != null)
{
_startCallback.connectionStartFailed(this, _exception);
_startCallback = null;
}
-
+
//
// NOTE: for twoway requests which are not sent, finished can be called twice: the
// first time because the outgoing is in the _sendStreams set and the second time
// because it's either in the _requests/_asyncRequests set. This is fine, only the
// first call should be taken into account by the implementation of finished.
//
-
+
for(OutgoingMessage p : _sendStreams)
{
if(p.requestId > 0)
@@ -1164,7 +1243,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
p.finished(_exception);
}
_sendStreams.clear();
-
+
for(IceInternal.Outgoing p : _requests.values())
{
p.finished(_exception, true);
@@ -1176,7 +1255,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
p.__finished(_exception, true);
}
_asyncRequests.clear();
-
+
//
// This must be done last as this will cause waitUntilFinished() to return (and communicator
// objects such as the timer might be destroyed too).
@@ -1302,6 +1381,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_endpoint = endpoint;
_adapter = adapter;
final Ice.InitializationData initData = instance.initializationData();
+ _dispatcher = initData.dispatcher; // Cached for better performance.
_logger = initData.logger; // Cached for better performance.
_traceLevels = instance.traceLevels(); // Cached for better performance.
_timer = instance.timer();
@@ -2504,6 +2584,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private ObjectAdapter _adapter;
private IceInternal.ServantManager _servantManager;
+ private final Dispatcher _dispatcher;
private final Logger _logger;
private final IceInternal.TraceLevels _traceLevels;
private final IceInternal.ThreadPool _threadPool;