summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
authorBenoit Foucher <benoit@zeroc.com>2009-12-01 14:02:05 +0100
committerBenoit Foucher <benoit@zeroc.com>2009-12-01 14:02:05 +0100
commitcd63c3e37dde04051fb2f4631f788bed7b48937b (patch)
treeae2c581bd228ec42c6eb46bf83a88f1174623bac /java/src
parentSLES RPM fixes (diff)
downloadice-cd63c3e37dde04051fb2f4631f788bed7b48937b.tar.bz2
ice-cd63c3e37dde04051fb2f4631f788bed7b48937b.tar.xz
ice-cd63c3e37dde04051fb2f4631f788bed7b48937b.zip
Added support for Ice::Dispatcher
Diffstat (limited to 'java/src')
-rw-r--r--java/src/Ice/AsyncResult.java13
-rw-r--r--java/src/Ice/ConnectionI.java113
-rw-r--r--java/src/Ice/Dispatcher.java15
-rw-r--r--java/src/Ice/InitializationData.java5
-rw-r--r--java/src/IceInternal/ConnectRequestHandler.java20
-rw-r--r--java/src/IceInternal/DispatchWorkItem.java54
6 files changed, 185 insertions, 35 deletions
diff --git a/java/src/Ice/AsyncResult.java b/java/src/Ice/AsyncResult.java
index 67b3ecbe0b2..24b17fe10db 100644
--- a/java/src/Ice/AsyncResult.java
+++ b/java/src/Ice/AsyncResult.java
@@ -128,7 +128,8 @@ public class AsyncResult
}
if(_exception != null)
{
- throw (LocalException)_exception.fillInStackTrace(); // TODO: Correct?
+ //throw (LocalException)_exception.fillInStackTrace();
+ throw _exception;
}
return (_state & OK) > 0;
}
@@ -158,12 +159,11 @@ public class AsyncResult
//
try
{
- _instance.clientThreadPool().execute(new IceInternal.ThreadPoolWorkItem()
+ _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_instance)
{
public void
- execute(IceInternal.ThreadPoolCurrent current)
+ run()
{
- current.ioCompleted();
__exception(ex);
}
});
@@ -225,12 +225,11 @@ public class AsyncResult
//
try
{
- _instance.clientThreadPool().execute(new IceInternal.ThreadPoolWorkItem()
+ _instance.clientThreadPool().execute(new IceInternal.DispatchWorkItem(_instance)
{
public void
- execute(IceInternal.ThreadPoolCurrent current)
+ run()
{
- current.ioCompleted();
__sentInternal();
}
});
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index eac4cb11e3b..9153385567e 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -1022,7 +1022,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
if((current.operation & IceInternal.SocketOperation.Read) != 0)
{
- info = parseMessage(current.stream);
+ info = parseMessage(current.stream); // Optimization: use the thread's stream.
}
}
}
@@ -1066,10 +1066,55 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
{
_acmAbsoluteTimeoutMillis = IceInternal.Time.currentMonotonicTimeMillis() + _acmTimeout * 1000;
}
-
+
current.ioCompleted();
}
+
+ if(_dispatcher != null)
+ {
+ if(info != null)
+ {
+ //
+ // Create a new stream for the dispatch instead of using the thread
+ // pool's thread stream.
+ //
+ assert(info.stream == current.stream);
+ IceInternal.BasicStream stream = info.stream;
+ info.stream = new IceInternal.BasicStream(_instance);
+ info.stream.swap(stream);
+ }
+ final StartCallback finalStartCB = startCB;
+ final java.util.List<OutgoingMessage> finalSentCBs = sentCBs;
+ final MessageInfo finalInfo = info;
+ try
+ {
+ _dispatcher.dispatch(new Runnable()
+ {
+ public void
+ run()
+ {
+ dispatch(finalStartCB, finalSentCBs, finalInfo);
+ }
+ }, this);
+ }
+ catch(java.lang.Exception ex)
+ {
+ if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ warning("dispatch exception", ex);
+ }
+ }
+ }
+ else
+ {
+ dispatch(startCB, sentCBs, info);
+ }
+ }
+
+ protected void
+ dispatch(StartCallback startCB, java.util.List<OutgoingMessage> sentCBs, MessageInfo info)
+ {
//
// Notify the factory that the connection establishment and
// validation has completed.
@@ -1117,6 +1162,50 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
public void
finished(IceInternal.ThreadPoolCurrent current)
{
+ //
+ // Check if the connection needs to call user callbacks. If it doesn't, we
+ // can safely run finish() from this "IO thread". Otherwise, we either run
+ // finish() with the dispatcher if one is set, or we promote another IO
+ // thread first before calling finish().
+ //
+ if(_startCallback == null && _sendStreams.isEmpty() && _asyncRequests.isEmpty())
+ {
+ finish();
+ return;
+ }
+
+ if(_dispatcher == null)
+ {
+ current.ioCompleted();
+ finish();
+ }
+ else
+ {
+ try
+ {
+ _dispatcher.dispatch(new Runnable()
+ {
+ public void
+ run()
+ {
+ finish();
+ }
+ },
+ this);
+ }
+ catch(java.lang.Exception ex)
+ {
+ if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ warning("dispatch exception", ex);
+ }
+ }
+ }
+ }
+
+ public void
+ finish()
+ {
synchronized(this)
{
assert(_state == StateClosed);
@@ -1125,29 +1214,19 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
IceInternal.SocketOperation.Connect);
}
- //
- // If there are no callbacks to call, we don't call ioCompleted() since we're not going
- // to call code that will potentially block (this avoids promoting a new leader and
- // unecessary thread creation, especially if this is called on shutdown).
- //
- if(_startCallback != null || !_sendStreams.isEmpty() || !_asyncRequests.isEmpty())
- {
- current.ioCompleted();
- }
-
if(_startCallback != null)
{
_startCallback.connectionStartFailed(this, _exception);
_startCallback = null;
}
-
+
//
// NOTE: for twoway requests which are not sent, finished can be called twice: the
// first time because the outgoing is in the _sendStreams set and the second time
// because it's either in the _requests/_asyncRequests set. This is fine, only the
// first call should be taken into account by the implementation of finished.
//
-
+
for(OutgoingMessage p : _sendStreams)
{
if(p.requestId > 0)
@@ -1164,7 +1243,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
p.finished(_exception);
}
_sendStreams.clear();
-
+
for(IceInternal.Outgoing p : _requests.values())
{
p.finished(_exception, true);
@@ -1176,7 +1255,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
p.__finished(_exception, true);
}
_asyncRequests.clear();
-
+
//
// This must be done last as this will cause waitUntilFinished() to return (and communicator
// objects such as the timer might be destroyed too).
@@ -1302,6 +1381,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
_endpoint = endpoint;
_adapter = adapter;
final Ice.InitializationData initData = instance.initializationData();
+ _dispatcher = initData.dispatcher; // Cached for better performance.
_logger = initData.logger; // Cached for better performance.
_traceLevels = instance.traceLevels(); // Cached for better performance.
_timer = instance.timer();
@@ -2504,6 +2584,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
private ObjectAdapter _adapter;
private IceInternal.ServantManager _servantManager;
+ private final Dispatcher _dispatcher;
private final Logger _logger;
private final IceInternal.TraceLevels _traceLevels;
private final IceInternal.ThreadPool _threadPool;
diff --git a/java/src/Ice/Dispatcher.java b/java/src/Ice/Dispatcher.java
new file mode 100644
index 00000000000..d5ab29d8ebd
--- /dev/null
+++ b/java/src/Ice/Dispatcher.java
@@ -0,0 +1,15 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package Ice;
+
+public interface Dispatcher
+{
+ void dispatch(Runnable runnable, Ice.Connection con);
+}
diff --git a/java/src/Ice/InitializationData.java b/java/src/Ice/InitializationData.java
index 7c89724bac1..8e13fc2bf22 100644
--- a/java/src/Ice/InitializationData.java
+++ b/java/src/Ice/InitializationData.java
@@ -72,4 +72,9 @@ public final class InitializationData implements Cloneable
* The custom class loader for the communicator.
**/
public ClassLoader classLoader;
+
+ /**
+ * The call dispatcher for the communicator.
+ **/
+ public Dispatcher dispatcher;
}
diff --git a/java/src/IceInternal/ConnectRequestHandler.java b/java/src/IceInternal/ConnectRequestHandler.java
index 8bd1ca9f085..044e41c7008 100644
--- a/java/src/IceInternal/ConnectRequestHandler.java
+++ b/java/src/IceInternal/ConnectRequestHandler.java
@@ -295,12 +295,11 @@ public class ConnectRequestHandler
//
if(!_requests.isEmpty())
{
- _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
+ _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance())
{
public void
- execute(ThreadPoolCurrent current)
+ run()
{
- current.ioCompleted();
flushRequestsWithException(ex);
};
});
@@ -448,12 +447,11 @@ public class ConnectRequestHandler
{
assert(_exception == null && !_requests.isEmpty());
_exception = ex.get();
- _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
+ _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance())
{
public void
- execute(ThreadPoolCurrent current)
+ run()
{
- current.ioCompleted();
flushRequestsWithException(ex);
};
});
@@ -465,12 +463,11 @@ public class ConnectRequestHandler
{
assert(_exception == null && !_requests.isEmpty());
_exception = ex;
- _reference.getInstance().clientThreadPool().execute(new ThreadPoolWorkItem()
+ _reference.getInstance().clientThreadPool().execute(new DispatchWorkItem(_reference.getInstance())
{
public void
- execute(ThreadPoolCurrent current)
+ run()
{
- current.ioCompleted();
flushRequestsWithException(ex);
};
});
@@ -480,12 +477,11 @@ public class ConnectRequestHandler
if(!sentCallbacks.isEmpty())
{
final Instance instance = _reference.getInstance();
- instance.clientThreadPool().execute(new ThreadPoolWorkItem()
+ instance.clientThreadPool().execute(new DispatchWorkItem(instance)
{
public void
- execute(ThreadPoolCurrent current)
+ run()
{
- current.ioCompleted();
for(OutgoingAsyncMessageCallback callback : sentCallbacks)
{
callback.__sent();
diff --git a/java/src/IceInternal/DispatchWorkItem.java b/java/src/IceInternal/DispatchWorkItem.java
new file mode 100644
index 00000000000..bc71eff7c6a
--- /dev/null
+++ b/java/src/IceInternal/DispatchWorkItem.java
@@ -0,0 +1,54 @@
+// **********************************************************************
+//
+// Copyright (c) 2003-2009 ZeroC, Inc. All rights reserved.
+//
+// This copy of Ice is licensed to you under the terms described in the
+// ICE_LICENSE file included in this distribution.
+//
+// **********************************************************************
+
+package IceInternal;
+
+//
+// A helper class for thread pool work items that only need to call user
+// callbacks. If a dispatcher is installed with the communicator, the
+// thread pool work item is executed with the dispatcher, otherwise it's
+// executed by a thread pool thread (after promoting a follower thread).
+//
+abstract public class DispatchWorkItem implements ThreadPoolWorkItem, Runnable
+{
+ public DispatchWorkItem(Instance instance)
+ {
+ _instance = instance;
+ }
+
+ final public void execute(ThreadPoolCurrent current)
+ {
+ Ice.Dispatcher dispatcher = _instance.initializationData().dispatcher;
+ if(dispatcher != null)
+ {
+ try
+ {
+ dispatcher.dispatch(this, null);
+ }
+ catch(java.lang.Exception ex)
+ {
+ if(_instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.Warn.Dispatch", 1) > 1)
+ {
+ java.io.StringWriter sw = new java.io.StringWriter();
+ java.io.PrintWriter pw = new java.io.PrintWriter(sw);
+ ex.printStackTrace(pw);
+ pw.flush();
+ _instance.initializationData().logger.warning("dispatch exception:\n" + sw.toString());
+ }
+ }
+ }
+ else
+ {
+ current.ioCompleted(); // Promote a follower.
+ this.run();
+ }
+ }
+
+ private Instance _instance;
+}