diff options
author | Benoit Foucher <benoit@zeroc.com> | 2008-03-06 10:13:42 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2008-03-06 10:13:42 +0100 |
commit | c6dbd090d9691cc0116a2967b2827b858b184dfe (patch) | |
tree | 6d2ad80c98665c9090b16f97c400ab4b33c7ab73 /java/src/IceInternal/TcpTransceiver.java | |
parent | Merge branch 'master' of ssh://cvs.zeroc.com/home/git/ice (diff) | |
download | ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.bz2 ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.tar.xz ice-c6dbd090d9691cc0116a2967b2827b858b184dfe.zip |
Removed thread-per-connection and added serialize mode
Diffstat (limited to 'java/src/IceInternal/TcpTransceiver.java')
-rw-r--r-- | java/src/IceInternal/TcpTransceiver.java | 312 |
1 files changed, 56 insertions, 256 deletions
diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index 80730ce6db0..548f2c3a94b 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -19,9 +19,9 @@ final class TcpTransceiver implements Transceiver } public SocketStatus - initialize(int timeout) + initialize() { - if(_state == StateNeedConnect && timeout == 0) + if(_state == StateNeedConnect) { _state = StateConnectPending; return SocketStatus.NeedConnect; @@ -30,7 +30,7 @@ final class TcpTransceiver implements Transceiver { try { - Network.doFinishConnect(_fd, timeout); + Network.doFinishConnect(_fd); _state = StateConnected; _desc = Network.fdToString(_fd); } @@ -63,82 +63,10 @@ final class TcpTransceiver implements Transceiver _logger.trace(_traceLevels.networkCat, s); } - synchronized(this) - { - assert(_fd != null); - if(_readSelector != null) - { - try - { - _readSelector.close(); - } - catch(java.io.IOException ex) - { - // Ignore. - } - _readSelector = null; - } - if(_writeSelector != null) - { - try - { - _writeSelector.close(); - } - catch(java.io.IOException ex) - { - // Ignore. - } - _writeSelector = null; - } - try - { - _fd.close(); - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - finally - { - _fd = null; - } - } - } - - public void - shutdownWrite() - { - if(_state < StateConnected) - { - return; - } - - if(_traceLevels.network >= 2) - { - String s = "shutting down tcp connection for writing\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); - } - assert(_fd != null); - java.net.Socket socket = _fd.socket(); try { - socket.shutdownOutput(); // Shutdown socket for writing - } - catch(java.net.SocketException ex) - { - // - // Ignore. We can't reliably figure out if the socket - // exception is because the socket is not connected. - // - // if(!Network.notConnected(ex)) - // { - // Ice.SocketException se = new Ice.SocketException(); - // se.initCause(ex); - // throw se; - // } + _fd.close(); } catch(java.io.IOException ex) { @@ -146,107 +74,85 @@ final class TcpTransceiver implements Transceiver se.initCause(ex); throw se; } - } - - public void - shutdownReadWrite() - { - if(_state < StateConnected) - { - return; - } - - if(_traceLevels.network >= 2) - { - String s = "shutting down tcp connection for reading and writing\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); - } - - assert(_fd != null); - java.net.Socket socket = _fd.socket(); - try + finally { - socket.shutdownInput(); // Shutdown socket for reading - socket.shutdownOutput(); // Shutdown socket for writing - } - catch(java.net.SocketException ex) - { - // - // Ignore. We can't reliably figure out if the socket - // exception is because the socket is not connected. - // - // if(!Network.notConnected(ex)) - // { - // Ice.SocketException se = new Ice.SocketException(); - // se.initCause(ex); - // throw se; - // } - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; + _fd = null; } } - + public boolean - write(Buffer buf, int timeout) + write(Buffer buf) { - while(writeBuffer(buf.b)) + final int size = buf.b.limit(); + int packetSize = size - buf.b.position(); + if(_maxPacketSize > 0 && packetSize > _maxPacketSize) { - // - // There is more data to write but the socket would block; now we - // must deal with timeouts. - // - assert(buf.b.hasRemaining()); + packetSize = _maxPacketSize; + buf.b.limit(buf.b.position() + packetSize); + } - if(timeout == 0) - { - return false; - } - + while(buf.b.hasRemaining()) + { try { - if(_writeSelector == null) + assert(_fd != null); + int ret = _fd.write(buf.b); + + if(ret == -1) { - _writeSelector = java.nio.channels.Selector.open(); - _fd.register(_writeSelector, java.nio.channels.SelectionKey.OP_WRITE, null); + throw new Ice.ConnectionLostException(); } - - try + else if(ret == 0) { - if(timeout > 0) - { - long start = IceInternal.Time.currentMonotonicTimeMillis(); - int n = _writeSelector.select(timeout); - if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout) - { - throw new Ice.TimeoutException(); - } - } - else + // + // Writing would block, so we reset the limit (if necessary) and return true to indicate + // that more data must be sent. + // + if(packetSize == _maxPacketSize) { - _writeSelector.select(); + buf.b.limit(size); } + return false; + } + + if(_traceLevels.network >= 3) + { + String s = "sent " + ret + " of " + size + " bytes via tcp\n" + toString(); + _logger.trace(_traceLevels.networkCat, s); } - catch(java.io.InterruptedIOException ex) + + if(_stats != null) { - // Ignore. + _stats.bytesSent(type(), ret); + } + + if(packetSize == _maxPacketSize) + { + assert(buf.b.position() == buf.b.limit()); + packetSize = size - buf.b.position(); + if(packetSize > _maxPacketSize) + { + packetSize = _maxPacketSize; + } + buf.b.limit(buf.b.position() + packetSize); } } + catch(java.io.InterruptedIOException ex) + { + continue; + } catch(java.io.IOException ex) { Ice.SocketException se = new Ice.SocketException(); se.initCause(ex); throw se; } - } + } return true; } public boolean - read(Buffer buf, int timeout, Ice.BooleanHolder moreData) + read(Buffer buf, Ice.BooleanHolder moreData) { int remaining = 0; if(_traceLevels.network >= 3) @@ -269,39 +175,7 @@ final class TcpTransceiver implements Transceiver if(ret == 0) { - if(timeout == 0) - { - return false; - } - - if(_readSelector == null) - { - _readSelector = java.nio.channels.Selector.open(); - _fd.register(_readSelector, java.nio.channels.SelectionKey.OP_READ, null); - } - - try - { - if(timeout > 0) - { - long start = IceInternal.Time.currentMonotonicTimeMillis(); - int n = _readSelector.select(timeout); - if(n == 0 && IceInternal.Time.currentMonotonicTimeMillis() >= start + timeout) - { - throw new Ice.TimeoutException(); - } - } - else - { - _readSelector.select(); - } - } - catch(java.io.InterruptedIOException ex) - { - // Ignore. - } - - continue; + return false; } if(ret > 0) @@ -398,86 +272,12 @@ final class TcpTransceiver implements Transceiver super.finalize(); } - private boolean - writeBuffer(java.nio.ByteBuffer buf) - { - final int size = buf.limit(); - int packetSize = size - buf.position(); - if(_maxPacketSize > 0 && packetSize > _maxPacketSize) - { - packetSize = _maxPacketSize; - buf.limit(buf.position() + packetSize); - } - - while(buf.hasRemaining()) - { - try - { - assert(_fd != null); - int ret = _fd.write(buf); - - if(ret == -1) - { - throw new Ice.ConnectionLostException(); - } - else if(ret == 0) - { - // - // Writing would block, so we reset the limit (if necessary) and return true to indicate - // that more data must be sent. - // - if(packetSize == _maxPacketSize) - { - buf.limit(size); - } - return true; - } - - if(_traceLevels.network >= 3) - { - String s = "sent " + ret + " of " + size + " bytes via tcp\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); - } - - if(_stats != null) - { - _stats.bytesSent(type(), ret); - } - - if(packetSize == _maxPacketSize) - { - assert(buf.position() == buf.limit()); - packetSize = size - buf.position(); - if(packetSize > _maxPacketSize) - { - packetSize = _maxPacketSize; - } - buf.limit(buf.position() + packetSize); - } - } - catch(java.io.InterruptedIOException ex) - { - continue; - } - catch(java.io.IOException ex) - { - Ice.SocketException se = new Ice.SocketException(); - se.initCause(ex); - throw se; - } - } - - return false; // No more data to send. - } - private java.nio.channels.SocketChannel _fd; private TraceLevels _traceLevels; private Ice.Logger _logger; private Ice.Stats _stats; private String _desc; private int _state; - private java.nio.channels.Selector _readSelector; - private java.nio.channels.Selector _writeSelector; private int _maxPacketSize; private static final int StateNeedConnect = 0; |