summaryrefslogtreecommitdiff
path: root/java/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/src')
-rw-r--r--java/src/IceInternal/Connection.java6
-rw-r--r--java/src/IceInternal/EventHandler.java7
-rw-r--r--java/src/IceInternal/IncomingConnectionFactory.java7
-rw-r--r--java/src/IceInternal/TcpTransceiver.java99
-rw-r--r--java/src/IceInternal/Transceiver.java1
-rw-r--r--java/src/IceInternal/UdpTransceiver.java7
6 files changed, 124 insertions, 3 deletions
diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java
index 0dc37aeb7d4..dba7087a0f8 100644
--- a/java/src/IceInternal/Connection.java
+++ b/java/src/IceInternal/Connection.java
@@ -288,6 +288,12 @@ public final class Connection extends EventHandler
return true;
}
+ public boolean
+ tryRead(BasicStream stream)
+ {
+ return _transceiver.tryRead(stream);
+ }
+
public void
read(BasicStream stream)
{
diff --git a/java/src/IceInternal/EventHandler.java b/java/src/IceInternal/EventHandler.java
index 1ba30d7430b..26449c88450 100644
--- a/java/src/IceInternal/EventHandler.java
+++ b/java/src/IceInternal/EventHandler.java
@@ -24,6 +24,13 @@ abstract class EventHandler
abstract boolean readable();
//
+ // Try to read data (non-blocking) via the event handler. Returns
+ // true if a subsequent call to read() is necessary, false otherwise.
+ // May only be called if readable() returns true.
+ //
+ abstract boolean tryRead(BasicStream is);
+
+ //
// Read data via the event handler. May only be called if
// readable() returns true.
//
diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java
index 79a4900df86..ba893366c20 100644
--- a/java/src/IceInternal/IncomingConnectionFactory.java
+++ b/java/src/IceInternal/IncomingConnectionFactory.java
@@ -79,6 +79,13 @@ public class IncomingConnectionFactory extends EventHandler
return false;
}
+ public boolean
+ tryRead(BasicStream unused)
+ {
+ assert(false); // Must not be called
+ return false;
+ }
+
public void
read(BasicStream unused)
{
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java
index 5689a5247ec..c90147072a1 100644
--- a/java/src/IceInternal/TcpTransceiver.java
+++ b/java/src/IceInternal/TcpTransceiver.java
@@ -109,6 +109,44 @@ final class TcpTransceiver implements Transceiver
}
}
+ public boolean
+ tryRead(BasicStream stream)
+ {
+ java.nio.ByteBuffer buf = stream.prepareRead();
+ while (true)
+ {
+ try
+ {
+ int ret = _fd.read(buf);
+
+ if (ret == -1)
+ {
+ throw new Ice.ConnectionLostException();
+ }
+
+ if (ret > 0 && _traceLevels.network >= 3)
+ {
+ String s = "received " + ret + " of " + buf.limit() + " bytes via tcp\n" + toString();
+ _logger.trace(_traceLevels.networkCat, s);
+ }
+
+ break;
+ }
+ catch (java.io.InterruptedIOException ex)
+ {
+ continue;
+ }
+ catch (java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+ }
+
+ return buf.hasRemaining();
+ }
+
public void
read(BasicStream stream, int timeout)
{
@@ -124,10 +162,52 @@ final class TcpTransceiver implements Transceiver
throw new Ice.ConnectionLostException();
}
- if (_traceLevels.network >= 3)
+ if (ret == 0)
{
- String s = "received " + ret + " of " + buf.limit() +
- " bytes via tcp\n" + toString();
+ // Copy fd, in case another thread calls close()
+ java.nio.channels.SocketChannel fd = _fd;
+
+ if (_selector == null)
+ {
+ _selector = java.nio.channels.Selector.open();
+ fd.register(_selector, java.nio.channels.SelectionKey.OP_READ, null);
+ }
+
+ while (true)
+ {
+ try
+ {
+ int n;
+ if (timeout == 0)
+ {
+ n = _selector.selectNow();
+ }
+ else if (timeout > 0)
+ {
+ n = _selector.select(timeout);
+ }
+ else
+ {
+ n = _selector.select();
+ }
+
+ if (n == 0 && timeout > 0)
+ {
+ throw new Ice.TimeoutException();
+ }
+
+ break;
+ }
+ catch (java.io.InterruptedIOException ex)
+ {
+ continue;
+ }
+ }
+ }
+
+ if (ret > 0 && _traceLevels.network >= 3)
+ {
+ String s = "received " + ret + " of " + buf.limit() + " bytes via tcp\n" + toString();
_logger.trace(_traceLevels.networkCat, s);
}
}
@@ -166,11 +246,24 @@ final class TcpTransceiver implements Transceiver
throws Throwable
{
assert(_fd == null);
+
+ if (_selector != null)
+ {
+ try
+ {
+ _selector.close();
+ }
+ catch (java.io.IOException ex)
+ {
+ }
+ }
+
super.finalize();
}
private Instance _instance;
private java.nio.channels.SocketChannel _fd;
+ private java.nio.channels.Selector _selector;
private TraceLevels _traceLevels;
private Ice.Logger _logger;
}
diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java
index 38d444ffa9f..45c3928cda3 100644
--- a/java/src/IceInternal/Transceiver.java
+++ b/java/src/IceInternal/Transceiver.java
@@ -16,6 +16,7 @@ public interface Transceiver
void close();
void shutdown();
void write(BasicStream stream, int timeout);
+ boolean tryRead(BasicStream stream);
void read(BasicStream stream, int timeout);
String toString();
}
diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java
index 7956b7eae1a..b68a3a9e1ef 100644
--- a/java/src/IceInternal/UdpTransceiver.java
+++ b/java/src/IceInternal/UdpTransceiver.java
@@ -81,6 +81,13 @@ final class UdpTransceiver implements Transceiver
}
}
+ public boolean
+ tryRead(BasicStream stream)
+ {
+ read(stream, 0);
+ return false; // Do not call read().
+ }
+
public void
read(BasicStream stream, int timeout)
{