diff options
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/IceInternal/Connection.java | 6 | ||||
-rw-r--r-- | java/src/IceInternal/EventHandler.java | 7 | ||||
-rw-r--r-- | java/src/IceInternal/IncomingConnectionFactory.java | 7 | ||||
-rw-r--r-- | java/src/IceInternal/TcpTransceiver.java | 99 | ||||
-rw-r--r-- | java/src/IceInternal/Transceiver.java | 1 | ||||
-rw-r--r-- | java/src/IceInternal/UdpTransceiver.java | 7 |
6 files changed, 124 insertions, 3 deletions
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index 0dc37aeb7d4..dba7087a0f8 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -288,6 +288,12 @@ public final class Connection extends EventHandler return true; } + public boolean + tryRead(BasicStream stream) + { + return _transceiver.tryRead(stream); + } + public void read(BasicStream stream) { diff --git a/java/src/IceInternal/EventHandler.java b/java/src/IceInternal/EventHandler.java index 1ba30d7430b..26449c88450 100644 --- a/java/src/IceInternal/EventHandler.java +++ b/java/src/IceInternal/EventHandler.java @@ -24,6 +24,13 @@ abstract class EventHandler abstract boolean readable(); // + // Try to read data (non-blocking) via the event handler. Returns + // true if a subsequent call to read() is necessary, false otherwise. + // May only be called if readable() returns true. + // + abstract boolean tryRead(BasicStream is); + + // // Read data via the event handler. May only be called if // readable() returns true. // diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 79a4900df86..ba893366c20 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -79,6 +79,13 @@ public class IncomingConnectionFactory extends EventHandler return false; } + public boolean + tryRead(BasicStream unused) + { + assert(false); // Must not be called + return false; + } + public void read(BasicStream unused) { diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index 5689a5247ec..c90147072a1 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -109,6 +109,44 @@ final class TcpTransceiver implements Transceiver } } + public boolean + tryRead(BasicStream stream) + { + java.nio.ByteBuffer buf = stream.prepareRead(); + while (true) + { + try + { + int ret = _fd.read(buf); + + if (ret == -1) + { + throw new Ice.ConnectionLostException(); + } + + if (ret > 0 && _traceLevels.network >= 3) + { + String s = "received " + ret + " of " + buf.limit() + " bytes via tcp\n" + toString(); + _logger.trace(_traceLevels.networkCat, s); + } + + break; + } + catch (java.io.InterruptedIOException ex) + { + continue; + } + catch (java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + } + + return buf.hasRemaining(); + } + public void read(BasicStream stream, int timeout) { @@ -124,10 +162,52 @@ final class TcpTransceiver implements Transceiver throw new Ice.ConnectionLostException(); } - if (_traceLevels.network >= 3) + if (ret == 0) { - String s = "received " + ret + " of " + buf.limit() + - " bytes via tcp\n" + toString(); + // Copy fd, in case another thread calls close() + java.nio.channels.SocketChannel fd = _fd; + + if (_selector == null) + { + _selector = java.nio.channels.Selector.open(); + fd.register(_selector, java.nio.channels.SelectionKey.OP_READ, null); + } + + while (true) + { + try + { + int n; + if (timeout == 0) + { + n = _selector.selectNow(); + } + else if (timeout > 0) + { + n = _selector.select(timeout); + } + else + { + n = _selector.select(); + } + + if (n == 0 && timeout > 0) + { + throw new Ice.TimeoutException(); + } + + break; + } + catch (java.io.InterruptedIOException ex) + { + continue; + } + } + } + + if (ret > 0 && _traceLevels.network >= 3) + { + String s = "received " + ret + " of " + buf.limit() + " bytes via tcp\n" + toString(); _logger.trace(_traceLevels.networkCat, s); } } @@ -166,11 +246,24 @@ final class TcpTransceiver implements Transceiver throws Throwable { assert(_fd == null); + + if (_selector != null) + { + try + { + _selector.close(); + } + catch (java.io.IOException ex) + { + } + } + super.finalize(); } private Instance _instance; private java.nio.channels.SocketChannel _fd; + private java.nio.channels.Selector _selector; private TraceLevels _traceLevels; private Ice.Logger _logger; } diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java index 38d444ffa9f..45c3928cda3 100644 --- a/java/src/IceInternal/Transceiver.java +++ b/java/src/IceInternal/Transceiver.java @@ -16,6 +16,7 @@ public interface Transceiver void close(); void shutdown(); void write(BasicStream stream, int timeout); + boolean tryRead(BasicStream stream); void read(BasicStream stream, int timeout); String toString(); } diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index 7956b7eae1a..b68a3a9e1ef 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -81,6 +81,13 @@ final class UdpTransceiver implements Transceiver } } + public boolean + tryRead(BasicStream stream) + { + read(stream, 0); + return false; // Do not call read(). + } + public void read(BasicStream stream, int timeout) { |