diff options
author | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2007-11-27 11:58:35 +0100 |
commit | 47f800495093fd7679a315e2d730fea22f6135b7 (patch) | |
tree | a7b8d3488f3841367dd03d10cae293f36fd10481 /java/src/IceInternal/TcpTransceiver.java | |
parent | Fixed SystemException to no longer derive from LocalException (diff) | |
download | ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.bz2 ice-47f800495093fd7679a315e2d730fea22f6135b7.tar.xz ice-47f800495093fd7679a315e2d730fea22f6135b7.zip |
- Added support for non-blocking AMI/batch requests, connection
creation.
- Added support for AMI oneway requests.
- Changed collocation optimization to not perform any DNS lookups.
Diffstat (limited to 'java/src/IceInternal/TcpTransceiver.java')
-rw-r--r-- | java/src/IceInternal/TcpTransceiver.java | 234 |
1 files changed, 149 insertions, 85 deletions
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index f07bff3a62c..533b91adfc7 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -18,6 +18,29 @@ final class TcpTransceiver implements Transceiver return _fd; } + public SocketStatus + initialize(int timeout) + { + if(_state == StateNeedConnect && timeout == 0) + { + _state = StateConnectPending; + return SocketStatus.NeedConnect; + } + else if(_state <= StateConnectPending) + { + Network.doFinishConnect(_fd, timeout); + _state = StateConnected; + _desc = Network.fdToString(_fd); + if(_traceLevels.network >= 1) + { + String s = "tcp connection established\n" + _desc; + _logger.trace(_traceLevels.networkCat, s); + } + } + assert(_state == StateConnected); + return SocketStatus.Finished; + } + public void close() { @@ -74,6 +97,11 @@ final class TcpTransceiver implements Transceiver public void shutdownWrite() { + if(_state < StateConnected) + { + return; + } + if(_traceLevels.network >= 2) { String s = "shutting down tcp connection for writing\n" + toString(); @@ -110,6 +138,11 @@ final class TcpTransceiver implements Transceiver public void shutdownReadWrite() { + if(_state < StateConnected) + { + return; + } + if(_traceLevels.network >= 2) { String s = "shutting down tcp connection for reading and writing\n" + toString(); @@ -144,125 +177,78 @@ final class TcpTransceiver implements Transceiver } } - public void - write(BasicStream stream, int timeout) + public boolean + write(Buffer buf, int timeout) throws LocalExceptionWrapper { - java.nio.ByteBuffer buf = stream.prepareWrite(); - int size = buf.limit(); - int packetSize = 0; - if(_maxPacketSize > 0 && size > _maxPacketSize) + while(writeBuffer(buf.b)) { - packetSize = _maxPacketSize; - buf.limit(buf.position() + packetSize); - } + // + // There is more data to write but the socket would block; now we + // must deal with timeouts. + // + assert(buf.b.hasRemaining()); - while(buf.hasRemaining()) - { + if(timeout == 0) + { + return false; + } + try { - assert(_fd != null); - int ret = _fd.write(buf); - - if(ret == -1) + if(_writeSelector == null) { - throw new Ice.ConnectionLostException(); + _writeSelector = java.nio.channels.Selector.open(); + _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null); } - if(ret == 0) + try { - if(timeout == 0) + if(timeout > 0) { - throw new Ice.TimeoutException(); - } - - if(_writeSelector == null) - { - _writeSelector = java.nio.channels.Selector.open(); - _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null); - } - - try - { - if(timeout > 0) - { - long start = IceInternal.Time.currentMonotonicTimeMillis(); - int n = _writeSelector.select(timeout); - if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout) - { - throw new Ice.TimeoutException(); - } - } - else + long start = IceInternal.Time.currentMonotonicTimeMillis(); + int n = _writeSelector.select(timeout); + if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout) { - _writeSelector.select(); + throw new Ice.TimeoutException(); } } - catch(java.io.InterruptedIOException ex) + else { - // Ignore. + _writeSelector.select(); } - - continue; } - - - if(_traceLevels.network >= 3) - { - String s = "sent " + ret + " of " + buf.limit() + " bytes via tcp\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); - } - - if(_stats != null) - { - _stats.bytesSent(type(), ret); - } - - if(packetSize > 0) + catch(java.io.InterruptedIOException ex) { - assert(buf.position() == buf.limit()); - int position = buf.position(); - if(size - position > packetSize) - { - buf.limit(position + packetSize); - } - else - { - packetSize = 0; - buf.limit(size); - } + // Ignore. } } - catch(java.io.InterruptedIOException ex) - { - continue; - } catch(java.io.IOException ex) { Ice.SocketException se = new Ice.SocketException(); se.initCause(ex); throw se; } - } + } + return true; } public boolean - read(BasicStream stream, int timeout) + read(Buffer buf, int timeout, Ice.BooleanHolder moreData) { - java.nio.ByteBuffer buf = stream.prepareRead(); - int remaining = 0; if(_traceLevels.network >= 3) { - remaining = buf.remaining(); + remaining = buf.b.remaining(); } + moreData.value = false; - while(buf.hasRemaining()) + while(buf.b.hasRemaining()) { try { assert(_fd != null); - int ret = _fd.read(buf); + int ret = _fd.read(buf.b); if(ret == -1) { @@ -273,7 +259,7 @@ final class TcpTransceiver implements Transceiver { if(timeout == 0) { - throw new Ice.TimeoutException(); + return false; } if(_readSelector == null) @@ -339,7 +325,7 @@ final class TcpTransceiver implements Transceiver } } - return false; + return true; } public String @@ -355,9 +341,9 @@ final class TcpTransceiver implements Transceiver } public void - checkSendSize(BasicStream stream, int messageSizeMax) + checkSendSize(Buffer buf, int messageSizeMax) { - if(stream.size() > messageSizeMax) + if(buf.size() > messageSizeMax) { throw new Ice.MemoryLimitException(); } @@ -366,12 +352,13 @@ final class TcpTransceiver implements Transceiver // // Only for use by TcpConnector, TcpAcceptor // - TcpTransceiver(Instance instance, java.nio.channels.SocketChannel fd) + TcpTransceiver(Instance instance, java.nio.channels.SocketChannel fd, boolean connected) { _fd = fd; _traceLevels = instance.traceLevels(); _logger = instance.initializationData().logger; _stats = instance.initializationData().stats; + _state = connected ? StateConnected : StateNeedConnect; _desc = Network.fdToString(_fd); _maxPacketSize = 0; @@ -399,12 +386,89 @@ final class TcpTransceiver implements Transceiver super.finalize(); } + private boolean + writeBuffer(java.nio.ByteBuffer buf) + { + final int size = buf.limit(); + int packetSize = size - buf.position(); + if(_maxPacketSize > 0 && packetSize > _maxPacketSize) + { + packetSize = _maxPacketSize; + buf.limit(buf.position() + packetSize); + } + + while(buf.hasRemaining()) + { + try + { + assert(_fd != null); + int ret = _fd.write(buf); + + if(ret == -1) + { + throw new Ice.ConnectionLostException(); + } + else if(ret == 0) + { + // + // Writing would block, so we reset the limit (if necessary) and return true to indicate + // that more data must be sent. + // + if(packetSize == _maxPacketSize) + { + buf.limit(size); + } + return true; + } + + if(_traceLevels.network >= 3) + { + String s = "sent " + ret + " of " + size + " bytes via tcp\n" + toString(); + _logger.trace(_traceLevels.networkCat, s); + } + + if(_stats != null) + { + _stats.bytesSent(type(), ret); + } + + if(packetSize == _maxPacketSize) + { + assert(buf.position() == buf.limit()); + packetSize = size - buf.position(); + if(packetSize > _maxPacketSize) + { + packetSize = _maxPacketSize; + } + buf.limit(buf.position() + packetSize); + } + } + catch(java.io.InterruptedIOException ex) + { + continue; + } + catch(java.io.IOException ex) + { + Ice.SocketException se = new Ice.SocketException(); + se.initCause(ex); + throw se; + } + } + + return false; // No more data to send. + } + private java.nio.channels.SocketChannel _fd; private TraceLevels _traceLevels; private Ice.Logger _logger; private Ice.Stats _stats; private String _desc; + private int _state; private java.nio.channels.Selector _readSelector; private java.nio.channels.Selector _writeSelector; private int _maxPacketSize; + + private static final int StateNeedConnect = 0; + private static final int StateConnectPending = 1; + private static final int StateConnected = 2; } |