summaryrefslogtreecommitdiff
path: root/java/src/IceInternal/TcpTransceiver.java
diff options
context:
space:
mode:
Diffstat (limited to 'java/src/IceInternal/TcpTransceiver.java')
-rw-r--r--java/src/IceInternal/TcpTransceiver.java234
1 files changed, 149 insertions, 85 deletions
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java
index f07bff3a62c..533b91adfc7 100644
--- a/java/src/IceInternal/TcpTransceiver.java
+++ b/java/src/IceInternal/TcpTransceiver.java
@@ -18,6 +18,29 @@ final class TcpTransceiver implements Transceiver
return _fd;
}
+ public SocketStatus
+ initialize(int timeout)
+ {
+ if(_state == StateNeedConnect && timeout == 0)
+ {
+ _state = StateConnectPending;
+ return SocketStatus.NeedConnect;
+ }
+ else if(_state <= StateConnectPending)
+ {
+ Network.doFinishConnect(_fd, timeout);
+ _state = StateConnected;
+ _desc = Network.fdToString(_fd);
+ if(_traceLevels.network >= 1)
+ {
+ String s = "tcp connection established\n" + _desc;
+ _logger.trace(_traceLevels.networkCat, s);
+ }
+ }
+ assert(_state == StateConnected);
+ return SocketStatus.Finished;
+ }
+
public void
close()
{
@@ -74,6 +97,11 @@ final class TcpTransceiver implements Transceiver
public void
shutdownWrite()
{
+ if(_state < StateConnected)
+ {
+ return;
+ }
+
if(_traceLevels.network >= 2)
{
String s = "shutting down tcp connection for writing\n" + toString();
@@ -110,6 +138,11 @@ final class TcpTransceiver implements Transceiver
public void
shutdownReadWrite()
{
+ if(_state < StateConnected)
+ {
+ return;
+ }
+
if(_traceLevels.network >= 2)
{
String s = "shutting down tcp connection for reading and writing\n" + toString();
@@ -144,125 +177,78 @@ final class TcpTransceiver implements Transceiver
}
}
- public void
- write(BasicStream stream, int timeout)
+ public boolean
+ write(Buffer buf, int timeout)
throws LocalExceptionWrapper
{
- java.nio.ByteBuffer buf = stream.prepareWrite();
- int size = buf.limit();
- int packetSize = 0;
- if(_maxPacketSize > 0 && size > _maxPacketSize)
+ while(writeBuffer(buf.b))
{
- packetSize = _maxPacketSize;
- buf.limit(buf.position() + packetSize);
- }
+ //
+ // There is more data to write but the socket would block; now we
+ // must deal with timeouts.
+ //
+ assert(buf.b.hasRemaining());
- while(buf.hasRemaining())
- {
+ if(timeout == 0)
+ {
+ return false;
+ }
+
try
{
- assert(_fd != null);
- int ret = _fd.write(buf);
-
- if(ret == -1)
+ if(_writeSelector == null)
{
- throw new Ice.ConnectionLostException();
+ _writeSelector = java.nio.channels.Selector.open();
+ _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null);
}
- if(ret == 0)
+ try
{
- if(timeout == 0)
+ if(timeout > 0)
{
- throw new Ice.TimeoutException();
- }
-
- if(_writeSelector == null)
- {
- _writeSelector = java.nio.channels.Selector.open();
- _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null);
- }
-
- try
- {
- if(timeout > 0)
- {
- long start = IceInternal.Time.currentMonotonicTimeMillis();
- int n = _writeSelector.select(timeout);
- if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout)
- {
- throw new Ice.TimeoutException();
- }
- }
- else
+ long start = IceInternal.Time.currentMonotonicTimeMillis();
+ int n = _writeSelector.select(timeout);
+ if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout)
{
- _writeSelector.select();
+ throw new Ice.TimeoutException();
}
}
- catch(java.io.InterruptedIOException ex)
+ else
{
- // Ignore.
+ _writeSelector.select();
}
-
- continue;
}
-
-
- if(_traceLevels.network >= 3)
- {
- String s = "sent " + ret + " of " + buf.limit() + " bytes via tcp\n" + toString();
- _logger.trace(_traceLevels.networkCat, s);
- }
-
- if(_stats != null)
- {
- _stats.bytesSent(type(), ret);
- }
-
- if(packetSize > 0)
+ catch(java.io.InterruptedIOException ex)
{
- assert(buf.position() == buf.limit());
- int position = buf.position();
- if(size - position > packetSize)
- {
- buf.limit(position + packetSize);
- }
- else
- {
- packetSize = 0;
- buf.limit(size);
- }
+ // Ignore.
}
}
- catch(java.io.InterruptedIOException ex)
- {
- continue;
- }
catch(java.io.IOException ex)
{
Ice.SocketException se = new Ice.SocketException();
se.initCause(ex);
throw se;
}
- }
+ }
+ return true;
}
public boolean
- read(BasicStream stream, int timeout)
+ read(Buffer buf, int timeout, Ice.BooleanHolder moreData)
{
- java.nio.ByteBuffer buf = stream.prepareRead();
-
int remaining = 0;
if(_traceLevels.network >= 3)
{
- remaining = buf.remaining();
+ remaining = buf.b.remaining();
}
+ moreData.value = false;
- while(buf.hasRemaining())
+ while(buf.b.hasRemaining())
{
try
{
assert(_fd != null);
- int ret = _fd.read(buf);
+ int ret = _fd.read(buf.b);
if(ret == -1)
{
@@ -273,7 +259,7 @@ final class TcpTransceiver implements Transceiver
{
if(timeout == 0)
{
- throw new Ice.TimeoutException();
+ return false;
}
if(_readSelector == null)
@@ -339,7 +325,7 @@ final class TcpTransceiver implements Transceiver
}
}
- return false;
+ return true;
}
public String
@@ -355,9 +341,9 @@ final class TcpTransceiver implements Transceiver
}
public void
- checkSendSize(BasicStream stream, int messageSizeMax)
+ checkSendSize(Buffer buf, int messageSizeMax)
{
- if(stream.size() > messageSizeMax)
+ if(buf.size() > messageSizeMax)
{
throw new Ice.MemoryLimitException();
}
@@ -366,12 +352,13 @@ final class TcpTransceiver implements Transceiver
//
// Only for use by TcpConnector, TcpAcceptor
//
- TcpTransceiver(Instance instance, java.nio.channels.SocketChannel fd)
+ TcpTransceiver(Instance instance, java.nio.channels.SocketChannel fd, boolean connected)
{
_fd = fd;
_traceLevels = instance.traceLevels();
_logger = instance.initializationData().logger;
_stats = instance.initializationData().stats;
+ _state = connected ? StateConnected : StateNeedConnect;
_desc = Network.fdToString(_fd);
_maxPacketSize = 0;
@@ -399,12 +386,89 @@ final class TcpTransceiver implements Transceiver
super.finalize();
}
+ private boolean
+ writeBuffer(java.nio.ByteBuffer buf)
+ {
+ final int size = buf.limit();
+ int packetSize = size - buf.position();
+ if(_maxPacketSize > 0 && packetSize > _maxPacketSize)
+ {
+ packetSize = _maxPacketSize;
+ buf.limit(buf.position() + packetSize);
+ }
+
+ while(buf.hasRemaining())
+ {
+ try
+ {
+ assert(_fd != null);
+ int ret = _fd.write(buf);
+
+ if(ret == -1)
+ {
+ throw new Ice.ConnectionLostException();
+ }
+ else if(ret == 0)
+ {
+ //
+ // Writing would block, so we reset the limit (if necessary) and return true to indicate
+ // that more data must be sent.
+ //
+ if(packetSize == _maxPacketSize)
+ {
+ buf.limit(size);
+ }
+ return true;
+ }
+
+ if(_traceLevels.network >= 3)
+ {
+ String s = "sent " + ret + " of " + size + " bytes via tcp\n" + toString();
+ _logger.trace(_traceLevels.networkCat, s);
+ }
+
+ if(_stats != null)
+ {
+ _stats.bytesSent(type(), ret);
+ }
+
+ if(packetSize == _maxPacketSize)
+ {
+ assert(buf.position() == buf.limit());
+ packetSize = size - buf.position();
+ if(packetSize > _maxPacketSize)
+ {
+ packetSize = _maxPacketSize;
+ }
+ buf.limit(buf.position() + packetSize);
+ }
+ }
+ catch(java.io.InterruptedIOException ex)
+ {
+ continue;
+ }
+ catch(java.io.IOException ex)
+ {
+ Ice.SocketException se = new Ice.SocketException();
+ se.initCause(ex);
+ throw se;
+ }
+ }
+
+ return false; // No more data to send.
+ }
+
private java.nio.channels.SocketChannel _fd;
private TraceLevels _traceLevels;
private Ice.Logger _logger;
private Ice.Stats _stats;
private String _desc;
+ private int _state;
private java.nio.channels.Selector _readSelector;
private java.nio.channels.Selector _writeSelector;
private int _maxPacketSize;
+
+ private static final int StateNeedConnect = 0;
+ private static final int StateConnectPending = 1;
+ private static final int StateConnected = 2;
}