summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--java/src/Ice/ConnectionI.java6
-rw-r--r--java/src/IceInternal/EventHandler.java5
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java3
-rw-r--r--java/src/IceInternal/TcpTransceiver.java4
-rw-r--r--java/src/IceInternal/ThreadPool.java296
-rw-r--r--java/src/IceInternal/Transceiver.java6
-rw-r--r--java/src/IceInternal/UdpTransceiver.java4
7 files changed, 185 insertions, 139 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java
index e2f92ca7845..53521ad6404 100644
--- a/java/src/Ice/ConnectionI.java
+++ b/java/src/Ice/ConnectionI.java
@@ -1070,15 +1070,15 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne
return true;
}
- public void
+ public boolean
read(IceInternal.BasicStream stream)
{
assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
- _transceiver.read(stream, 0);
+ return _transceiver.read(stream, 0);
//
- // Updating _acmAbsoluteTimeoutMillis is to expensive here,
+ // Updating _acmAbsoluteTimeoutMillis is too expensive here,
// because we would have to acquire a lock just for this
// purpose. Instead, we update _acmAbsoluteTimeoutMillis in
// message().
diff --git a/java/src/IceInternal/EventHandler.java b/java/src/IceInternal/EventHandler.java
index 7775685f433..a942e79d7d9 100644
--- a/java/src/IceInternal/EventHandler.java
+++ b/java/src/IceInternal/EventHandler.java
@@ -25,7 +25,10 @@ public abstract class EventHandler
// Read data via the event handler. May only be called if
// readable() returns true.
//
- abstract public void read(BasicStream is);
+ // NOTE: In Java, read returns true if the handler has more data
+ // data available, and therefore read should be called again.
+ //
+ abstract public boolean read(BasicStream is);
//
// A complete message has been received.
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index df032a220da..86d67671eef 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -207,11 +207,12 @@ public final class IncomingConnectionFactory extends EventHandler
return false;
}
- public void
+ public boolean
read(BasicStream unused)
{
assert(!_instance.threadPerConnection()); // Only for use with a thread pool.
assert(false); // Must not be called.
+ return false;
}
public void
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java
index f435be5f76f..e2e7418d22f 100644
--- a/java/src/IceInternal/TcpTransceiver.java
+++ b/java/src/IceInternal/TcpTransceiver.java
@@ -215,7 +215,7 @@ final class TcpTransceiver implements Transceiver
}
}
- public void
+ public boolean
read(BasicStream stream, int timeout)
{
java.nio.ByteBuffer buf = stream.prepareRead();
@@ -307,6 +307,8 @@ final class TcpTransceiver implements Transceiver
throw se;
}
}
+
+ return false;
}
public String
diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java
index e0fb807d01a..5538bc9af22 100644
--- a/java/src/IceInternal/ThreadPool.java
+++ b/java/src/IceInternal/ThreadPool.java
@@ -449,7 +449,7 @@ public final class ThreadPool
}
}
- while(true)
+ while(true)
{
if(TRACE_REGISTRATION)
{
@@ -462,160 +462,175 @@ public final class ThreadPool
trace(" " + key.channel());
}
}
-
- select();
EventHandler handler = null;
+
+ //
+ // Only call select() if there are no pending handlers with additional data
+ // for us to read.
+ //
+ if(!_pendingHandlers.isEmpty())
+ {
+ handler = (EventHandler)_pendingHandlers.removeFirst();
+ }
+ else
+ {
+ select();
+ }
+
boolean finished = false;
boolean shutdown = false;
- synchronized(this)
+ if(handler == null)
{
- if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
- {
- if(TRACE_SELECT)
- {
- trace("timeout");
- }
-
- assert(_timeout > 0);
- _timeout = 0;
- shutdown = true;
- }
- else
+ synchronized(this)
{
- if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
+ if(_keys.size() == 0) // We initiate a shutdown if there is a thread pool timeout.
{
- if(TRACE_SELECT || TRACE_INTERRUPT)
+ if(TRACE_SELECT)
{
- trace("detected interrupt");
+ trace("timeout");
}
- //
- // There are two possiblities for an interrupt:
- //
- // 1. The thread pool has been destroyed.
- //
- // 2. An event handler was registered or unregistered.
- //
-
- //
- // Thread pool destroyed?
- //
- if(_destroyed)
+ assert(_timeout > 0);
+ _timeout = 0;
+ shutdown = true;
+ }
+ else
+ {
+ if(_keys.contains(_fdIntrReadKey) && _fdIntrReadKey.isReadable())
{
- if(TRACE_SHUTDOWN)
+ if(TRACE_SELECT || TRACE_INTERRUPT)
{
- trace("destroyed, thread id = " + Thread.currentThread());
+ trace("detected interrupt");
}
//
- // Don't clear the interrupt fd if
- // destroyed, so that the other threads
- // exit as well.
+ // There are two possiblities for an interrupt:
//
- return true;
- }
-
- //
- // Remove the interrupt channel from the
- // selected key set.
- //
- _keys.remove(_fdIntrReadKey);
-
- clearInterrupt();
-
- //
- // An event handler must have been registered
- // or unregistered.
- //
- assert(!_changes.isEmpty());
- FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
-
- if(change.handler != null) // Addition if handler is set.
- {
- int op;
- if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
- {
- op = java.nio.channels.SelectionKey.OP_READ;
- }
- else
+ // 1. The thread pool has been destroyed.
+ //
+ // 2. An event handler was registered or unregistered.
+ //
+
+ //
+ // Thread pool destroyed?
+ //
+ if(_destroyed)
{
- op = java.nio.channels.SelectionKey.OP_ACCEPT;
- }
+ if(TRACE_SHUTDOWN)
+ {
+ trace("destroyed, thread id = " + Thread.currentThread());
+ }
- java.nio.channels.SelectionKey key = null;
- try
- {
- key = change.fd.register(_selector, op, change.handler);
+ //
+ // Don't clear the interrupt fd if
+ // destroyed, so that the other threads
+ // exit as well.
+ //
+ return true;
}
- catch(java.nio.channels.ClosedChannelException ex)
+
+ //
+ // Remove the interrupt channel from the
+ // selected key set.
+ //
+ _keys.remove(_fdIntrReadKey);
+
+ clearInterrupt();
+
+ //
+ // An event handler must have been registered
+ // or unregistered.
+ //
+ assert(!_changes.isEmpty());
+ FdHandlerPair change = (FdHandlerPair)_changes.removeFirst();
+
+ if(change.handler != null) // Addition if handler is set.
{
- assert(false);
+ int op;
+ if((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0)
+ {
+ op = java.nio.channels.SelectionKey.OP_READ;
+ }
+ else
+ {
+ op = java.nio.channels.SelectionKey.OP_ACCEPT;
+ }
+
+ java.nio.channels.SelectionKey key = null;
+ try
+ {
+ key = change.fd.register(_selector, op, change.handler);
+ }
+ catch(java.nio.channels.ClosedChannelException ex)
+ {
+ assert(false);
+ }
+ _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
+
+ continue;
}
- _handlerMap.put(change.fd, new HandlerKeyPair(change.handler, key));
-
- if(TRACE_REGISTRATION)
+ else // Removal if handler is not set.
{
- trace("added handler (" + change.handler.getClass().getName() + ") for fd " +
- change.fd);
+ HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
+ assert(pair != null);
+ handler = pair.handler;
+ finished = true;
+ pair.key.cancel();
+
+ if(TRACE_REGISTRATION)
+ {
+ trace("removed handler (" + handler.getClass().getName() + ") for fd " +
+ change.fd);
+ }
+
+ // Don't continue; we have to call
+ // finished() on the event handler below,
+ // outside the thread synchronization.
}
-
- continue;
}
- else // Removal if handler is not set.
+ else
{
- HandlerKeyPair pair = (HandlerKeyPair)_handlerMap.remove(change.fd);
- assert(pair != null);
- handler = pair.handler;
- finished = true;
- pair.key.cancel();
-
- if(TRACE_REGISTRATION)
+ java.nio.channels.SelectionKey key = null;
+ java.util.Iterator iter = _keys.iterator();
+ while(iter.hasNext())
{
- trace("removed handler (" + handler.getClass().getName() + ") for fd " +
- change.fd);
+ //
+ // Ignore selection keys that have been cancelled
+ //
+ java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next();
+ iter.remove();
+ if(k.isValid() && key != _fdIntrReadKey)
+ {
+ if(TRACE_SELECT)
+ {
+ trace("found a key: " + keyToString(k));
+ }
+
+ key = k;
+ break;
+ }
}
-
- // Don't continue; we have to call
- // finished() on the event handler below,
- // outside the thread synchronization.
- }
- }
- else
- {
- java.nio.channels.SelectionKey key = null;
- java.util.Iterator iter = _keys.iterator();
- while(iter.hasNext())
- {
- //
- // Ignore selection keys that have been cancelled
- //
- java.nio.channels.SelectionKey k = (java.nio.channels.SelectionKey)iter.next();
- iter.remove();
- if(k.isValid() && key != _fdIntrReadKey)
+
+ if(key == null)
{
if(TRACE_SELECT)
{
- trace("found a key: " + keyToString(k));
+ trace("didn't find a valid key");
}
- key = k;
- break;
- }
- }
-
- if(key == null)
- {
- if(TRACE_SELECT)
- {
- trace("didn't find a valid key");
+ continue;
}
- continue;
+ handler = (EventHandler)key.attachment();
}
-
- handler = (EventHandler)key.attachment();
}
}
}
@@ -660,7 +675,7 @@ public final class ThreadPool
if(finished)
{
//
- // Notify a handler about it's removal from
+ // Notify a handler about its removal from
// the thread pool.
//
try
@@ -697,7 +712,14 @@ public final class ThreadPool
{
try
{
- read(handler);
+ //
+ // If read returns true, the handler has more data for the thread pool
+ // to process.
+ //
+ if(read(handler))
+ {
+ _pendingHandlers.add(handler);
+ }
}
catch(Ice.TimeoutException ex) // Expected.
{
@@ -705,7 +727,7 @@ public final class ThreadPool
}
catch(Ice.DatagramLimitException ex) // Expected.
{
- continue;
+ continue;
}
catch(Ice.SocketException ex)
{
@@ -721,12 +743,12 @@ public final class ThreadPool
}
catch(Ice.LocalException ex)
{
- if(handler.datagram())
+ if(handler.datagram())
{
if(_instance.initializationData().properties.getPropertyAsInt(
- "Ice.Warn.Connections") > 0)
+ "Ice.Warn.Connections") > 0)
{
- _instance.initializationData().logger.warning(
+ _instance.initializationData().logger.warning(
"datagram connection exception:\n" + ex + "\n" + handler.toString());
}
}
@@ -734,9 +756,9 @@ public final class ThreadPool
{
if(TRACE_EXCEPTION)
{
- trace("informing handler (" + handler.getClass().getName() +
+ trace("informing handler (" + handler.getClass().getName() +
") about exception " + ex);
- ex.printStackTrace();
+ ex.printStackTrace();
}
handler.exception(ex);
@@ -749,7 +771,7 @@ public final class ThreadPool
}
//
- // Provide a new mesage to the handler.
+ // Provide a new message to the handler.
//
try
{
@@ -885,9 +907,11 @@ public final class ThreadPool
}
}
- private void
+ private boolean
read(EventHandler handler)
{
+ boolean moreData = false;
+
BasicStream stream = handler._stream;
if(stream.size() == 0)
@@ -898,7 +922,7 @@ public final class ThreadPool
if(stream.pos() != stream.size())
{
- handler.read(stream);
+ moreData = handler.read(stream);
assert(stream.pos() == stream.size());
}
@@ -980,10 +1004,12 @@ public final class ThreadPool
}
else
{
- handler.read(stream);
+ moreData = handler.read(stream);
assert(stream.pos() == stream.size());
}
}
+
+ return moreData;
}
/*
@@ -1184,6 +1210,14 @@ public final class ThreadPool
private int _timeout;
+ //
+ // Since the Java5 SSL transceiver can read more data from the socket than is
+ // actually requested, we have to keep a separate list of handlers that need
+ // the thread pool to read more data before it re-enters a blocking call to
+ // select().
+ //
+ private java.util.LinkedList _pendingHandlers = new java.util.LinkedList();
+
private final class EventHandlerThread extends Thread
{
EventHandlerThread(String name)
diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java
index b23b6e24240..4934b6cdfe0 100644
--- a/java/src/IceInternal/Transceiver.java
+++ b/java/src/IceInternal/Transceiver.java
@@ -16,7 +16,11 @@ public interface Transceiver
void shutdownWrite();
void shutdownReadWrite();
void write(BasicStream stream, int timeout);
- void read(BasicStream stream, int timeout);
+ //
+ // NOTE: In Java, read() returns a boolean to indicate whether the transceiver
+ // has read more data than requested.
+ //
+ boolean read(BasicStream stream, int timeout);
String type();
String toString();
}
diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java
index 9c324fc8c90..335a23329ec 100644
--- a/java/src/IceInternal/UdpTransceiver.java
+++ b/java/src/IceInternal/UdpTransceiver.java
@@ -118,7 +118,7 @@ final class UdpTransceiver implements Transceiver
}
}
- public void
+ public boolean
read(BasicStream stream, int timeout) // NOTE: timeout is not used
{
assert(stream.pos() == 0);
@@ -218,6 +218,8 @@ final class UdpTransceiver implements Transceiver
stream.resize(ret, true);
stream.pos(ret);
+
+ return false;
}
public String