diff options
author | Benoit Foucher <benoit@zeroc.com> | 2009-10-27 12:00:32 +0100 |
---|---|---|
committer | Benoit Foucher <benoit@zeroc.com> | 2009-10-27 12:00:32 +0100 |
commit | 1fdb973182e589b0d20e987360bd694ae783b0a2 (patch) | |
tree | a94cbbcc0d078bcf7aa4427951799e09feb80826 /java/src | |
parent | add support for number protocol in Python (diff) | |
download | ice-1fdb973182e589b0d20e987360bd694ae783b0a2.tar.bz2 ice-1fdb973182e589b0d20e987360bd694ae783b0a2.tar.xz ice-1fdb973182e589b0d20e987360bd694ae783b0a2.zip |
Cleaned up UDP transceivers, fixes for bug 4223 and 4320
Diffstat (limited to 'java/src')
-rw-r--r-- | java/src/Ice/ConnectionI.java | 36 | ||||
-rw-r--r-- | java/src/IceInternal/ReferenceFactory.java | 15 | ||||
-rw-r--r-- | java/src/IceInternal/UdpTransceiver.java | 196 |
3 files changed, 155 insertions, 92 deletions
diff --git a/java/src/Ice/ConnectionI.java b/java/src/Ice/ConnectionI.java index 7fb693abf3c..5f107e2da6c 100644 --- a/java/src/Ice/ConnectionI.java +++ b/java/src/Ice/ConnectionI.java @@ -291,6 +291,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_state > StateNotValidated); assert(_state < StateClosing); + // + // Ensure the message isn't bigger than what we can send with the + // transport. + // + _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax()); + if(response) { // @@ -358,6 +364,12 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne assert(_state > StateNotValidated); assert(_state < StateClosing); + // + // Ensure the message isn't bigger than what we can send with the + // transport. + // + _transceiver.checkSendSize(os.getBuffer(), _instance.messageSizeMax()); + if(response) { // @@ -880,13 +892,14 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } if((current.operation & IceInternal.SocketOperation.Read) != 0 && !_readStream.isEmpty()) { - if(_readStream.size() == IceInternal.Protocol.headerSize) // Read header. + if(_readHeader) // Read header if necessary. { if(!_transceiver.read(_readStream.getBuffer(), _hasMoreData)) { return; } assert(!_readStream.getBuffer().b.hasRemaining()); + _readHeader = false; int pos = _readStream.pos(); if(pos < IceInternal.Protocol.headerSize) @@ -903,8 +916,8 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne m[1] = _readStream.readByte(); m[2] = _readStream.readByte(); m[3] = _readStream.readByte(); - if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] - || m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) + if(m[0] != IceInternal.Protocol.magic[0] || m[1] != IceInternal.Protocol.magic[1] || + m[2] != IceInternal.Protocol.magic[2] || m[3] != IceInternal.Protocol.magic[3]) { Ice.BadMagicException ex = new Ice.BadMagicException(); ex.badMagic = m; @@ -957,12 +970,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne { if(_endpoint.datagram()) { - if(_warnUdp) - { - _logger.warning("DatagramLimitException: maximum size of " + _readStream.pos() + - " exceeded"); - } - throw new Ice.DatagramLimitException(); + throw new Ice.DatagramLimitException(); // The message was truncated. } else { @@ -1015,8 +1023,13 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } catch(DatagramLimitException ex) // Expected. { + if(_warnUdp) + { + _logger.warning("maximum datagram size of " + _readStream.pos() + " exceeded"); + } _readStream.resize(IceInternal.Protocol.headerSize, true); _readStream.pos(0); + _readHeader = true; return; } catch(SocketException ex) @@ -1035,6 +1048,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne } _readStream.resize(IceInternal.Protocol.headerSize, true); _readStream.pos(0); + _readHeader = true; } else { @@ -1284,6 +1298,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _batchRequestCompress = false; _batchMarker = 0; _readStream = new IceInternal.BasicStream(instance); + _readHeader = false; _writeStream = new IceInternal.BasicStream(instance); _dispatchCount = 0; _state = StateNotInitialized; @@ -1732,6 +1747,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _writeStream.pos(0); _readStream.resize(IceInternal.Protocol.headerSize, true); + _readHeader = true; _readStream.pos(0); return true; @@ -1950,6 +1966,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne _readStream.swap(info.stream); _readStream.resize(IceInternal.Protocol.headerSize, true); _readStream.pos(0); + _readHeader = true; assert(info.stream.pos() == info.stream.size()); @@ -2490,6 +2507,7 @@ public final class ConnectionI extends IceInternal.EventHandler implements Conne private java.util.LinkedList<OutgoingMessage> _sendStreams = new java.util.LinkedList<OutgoingMessage>(); private IceInternal.BasicStream _readStream; + private boolean _readHeader; private IceInternal.BasicStream _writeStream; private int _dispatchCount; diff --git a/java/src/IceInternal/ReferenceFactory.java b/java/src/IceInternal/ReferenceFactory.java index d329b6249a0..30a236394ec 100644 --- a/java/src/IceInternal/ReferenceFactory.java +++ b/java/src/IceInternal/ReferenceFactory.java @@ -44,13 +44,14 @@ public final class ReferenceFactory // // Create new reference // - FixedReference ref = new FixedReference(_instance, - _communicator, - ident, - "", // Facet - Reference.ModeTwoway, - false, - fixedConnection); + FixedReference ref = new FixedReference( + _instance, + _communicator, + ident, + "", // Facet + fixedConnection.endpoint().datagram() ? Reference.ModeDatagram : Reference.ModeTwoway, + fixedConnection.endpoint().secure(), + fixedConnection); return updateCache(ref); } diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index bd18ad77678..2001f8e0e67 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -32,7 +32,7 @@ final class UdpTransceiver implements Transceiver { assert(_fd != null); - if(!_connect && _traceLevels.network >= 1) + if(_state >= StateConnected && _traceLevels.network >= 1) { String s = "closing udp connection\n" + toString(); _logger.trace(_traceLevels.networkCat, s); @@ -52,40 +52,28 @@ final class UdpTransceiver implements Transceiver write(Buffer buf) { assert(buf.b.position() == 0); - final int packetSize = java.lang.Math.min(_maxPacketSize, _sndSize - _udpOverhead); - if(packetSize < buf.size()) - { - // - // We don't log a warning here because the client gets an exception anyway. - // - throw new Ice.DatagramLimitException(); - } + assert(_fd != null && _state >= StateConnected); + + // The caller is supposed to check the send size before by calling checkSendSize + assert(java.lang.Math.min(_maxPacketSize, _sndSize - _udpOverhead) >= buf.size()); - while(buf.b.hasRemaining()) + int ret = 0; + while(true) { try { - assert(_fd != null); - - int ret = _fd.write(buf.b); - - if(ret == 0) + if(_state == StateConnected) { - return false; - } - - if(_traceLevels.network >= 3) - { - String s = "sent " + ret + " bytes via udp\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); + ret = _fd.write(buf.b); } - - if(_stats != null) + else { - _stats.bytesSent(type(), ret); + if(_peerAddr == null) + { + throw new Ice.SocketException(); // No peer has sent a datagram yet. + } + ret = _fd.send(buf.b, _peerAddr); } - - assert(ret == buf.b.limit()); break; } catch(java.nio.channels.AsynchronousCloseException ex) @@ -112,6 +100,23 @@ final class UdpTransceiver implements Transceiver } } + if(ret == 0) + { + return false; + } + + if(_traceLevels.network >= 3) + { + String s = "sent " + ret + " bytes via udp\n" + toString(); + _logger.trace(_traceLevels.networkCat, s); + } + + if(_stats != null) + { + _stats.bytesSent(type(), ret); + } + + assert(ret == buf.b.limit()); return true; } @@ -122,18 +127,6 @@ final class UdpTransceiver implements Transceiver moreData.value = false; final int packetSize = java.lang.Math.min(_maxPacketSize, _rcvSize - _udpOverhead); - if(packetSize < buf.size()) - { - // - // We log a warning here because this is the server side -- without the - // the warning, there would only be silence. - // - if(_warn) - { - _logger.warning("DatagramLimitException: maximum size of " + packetSize + " exceeded"); - } - throw new Ice.DatagramLimitException(); - } buf.resize(packetSize, true); buf.b.position(0); @@ -142,31 +135,14 @@ final class UdpTransceiver implements Transceiver { try { - java.net.InetSocketAddress sender = (java.net.InetSocketAddress)_fd.receive(buf.b); - - if(sender == null || buf.b.position() == 0) + java.net.SocketAddress peerAddr = _fd.receive(buf.b); + if(peerAddr == null || buf.b.position() == 0) { return false; } + _peerAddr = (java.net.InetSocketAddress)peerAddr; ret = buf.b.position(); - - if(_connect) - { - // - // If we must connect, then we connect to the first peer that - // sends us a packet. - // - Network.doConnect(_fd, sender); - _connect = false; // We're connected now - - if(_traceLevels.network >= 1) - { - String s = "connected udp socket\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); - } - } - break; } catch(java.nio.channels.AsynchronousCloseException ex) @@ -193,6 +169,21 @@ final class UdpTransceiver implements Transceiver } } + if(_state == StateNeedConnect) + { + // + // If we must connect, we connect to the first peer that sends us a packet. + // + Network.doConnect(_fd, _peerAddr); + _state = StateConnected; + + if(_traceLevels.network >= 1) + { + String s = "connected udp socket\n" + toString(); + _logger.trace(_traceLevels.networkCat, s); + } + } + if(_traceLevels.network >= 3) { String s = "received " + ret + " bytes via udp\n" + toString(); @@ -219,14 +210,31 @@ final class UdpTransceiver implements Transceiver public String toString() { - if(_mcastAddr != null && _fd != null) + if(_fd == null) { - return Network.fdToString(_fd) + "\nmulticast address = " + Network.addrToString(_mcastAddr); + return "<closed>"; + } + + String s; + if(_state == StateNotConnected) + { + java.net.DatagramSocket socket = ((java.nio.channels.DatagramChannel)_fd).socket(); + s = "local address = " + Network.addrToString((java.net.InetSocketAddress)socket.getLocalSocketAddress()); + if(_peerAddr != null) + { + s += "\nremote address = " + Network.addrToString(_peerAddr); + } } else { - return Network.fdToString(_fd); + s = Network.fdToString(_fd); + } + + if(_mcastAddr != null) + { + s += "\nmulticast address = " + Network.addrToString(_mcastAddr); } + return s; } public Ice.ConnectionInfo @@ -238,15 +246,31 @@ final class UdpTransceiver implements Transceiver java.net.DatagramSocket socket = _fd.socket(); info.localAddress = socket.getLocalAddress().getHostAddress(); info.localPort = socket.getLocalPort(); - if(socket.getInetAddress() != null) + if(_state == StateNotConnected) { - info.remoteAddress = socket.getInetAddress().getHostAddress(); - info.remotePort = socket.getPort(); + if(_peerAddr != null) + { + info.remoteAddress = _peerAddr.getAddress().getHostAddress(); + info.remotePort = _peerAddr.getPort(); + } + else + { + info.remoteAddress = ""; + info.remotePort = -1; + } } else { - info.remoteAddress = ""; - info.remotePort = -1; + if(socket.getInetAddress() != null) + { + info.remoteAddress = socket.getInetAddress().getHostAddress(); + info.remotePort = socket.getPort(); + } + else + { + info.remoteAddress = ""; + info.remotePort = -1; + } } if(_mcastAddr != null) { @@ -268,6 +292,11 @@ final class UdpTransceiver implements Transceiver { Ex.throwMemoryLimitException(buf.size(), messageSizeMax); } + + // + // The maximum packetSize is either the maximum allowable UDP packet size, or + // the UDP send buffer size (which ever is smaller). + // final int packetSize = java.lang.Math.min(_maxPacketSize, _sndSize - _udpOverhead); if(packetSize < buf.size()) { @@ -289,8 +318,7 @@ final class UdpTransceiver implements Transceiver _traceLevels = instance.traceLevels(); _logger = instance.initializationData().logger; _stats = instance.initializationData().stats; - _connect = true; - _warn = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; + _state = StateNeedConnect; _addr = addr; try @@ -299,7 +327,7 @@ final class UdpTransceiver implements Transceiver setBufSize(instance); Network.setBlock(_fd, false); Network.doConnect(_fd, _addr); - _connect = false; // We're connected now + _state = StateConnected; // We're connected now if(_addr.getAddress().isMulticastAddress()) { configureMulticast(null, mcastInterface, mcastTtl); @@ -326,8 +354,7 @@ final class UdpTransceiver implements Transceiver _traceLevels = instance.traceLevels(); _logger = instance.initializationData().logger; _stats = instance.initializationData().stats; - _connect = connect; - _warn = instance.initializationData().properties.getPropertyAsInt("Ice.Warn.Datagrams") > 0; + _state = connect ? StateNeedConnect : StateNotConnected; try { @@ -344,7 +371,20 @@ final class UdpTransceiver implements Transceiver { Network.setReuseAddress(_fd, true); _mcastAddr = _addr; - _addr = Network.doBind(_fd, Network.getAddress("0.0.0.0", port, Network.EnableIPv4)); + if(System.getProperty("os.name").startsWith("Windows")) + { + // + // Windows does not allow binding to the mcast address itself + // so we bind to INADDR_ANY (0.0.0.0) instead. As a result, + // bi-directional connection won't work because the source + // address won't be the multicast address and the client will + // therefore reject the datagram. + // + int protocol = + _mcastAddr.getAddress().getAddress().length == 4 ? Network.EnableIPv4 : Network.EnableIPv6; + _addr = Network.getAddressForServer("", port, protocol); + } + _addr = Network.doBind(_fd, _addr); if(port == 0) { _mcastAddr = new java.net.InetSocketAddress(_mcastAddr.getAddress(), _addr.getPort()); @@ -425,7 +465,7 @@ final class UdpTransceiver implements Transceiver // Get property for buffer size and check for sanity. // int sizeRequested = instance.initializationData().properties.getPropertyAsIntWithDefault(prop, dfltSize); - if(sizeRequested < _udpOverhead) + if(sizeRequested < (_udpOverhead + IceInternal.Protocol.headerSize)) { _logger.warning("Invalid " + prop + " value of " + sizeRequested + " adjusted to " + dfltSize); sizeRequested = dfltSize; @@ -570,13 +610,13 @@ final class UdpTransceiver implements Transceiver private TraceLevels _traceLevels; private Ice.Logger _logger; private Ice.Stats _stats; - private boolean _connect; - private final boolean _warn; + private int _state; private int _rcvSize; private int _sndSize; private java.nio.channels.DatagramChannel _fd; private java.net.InetSocketAddress _addr; private java.net.InetSocketAddress _mcastAddr = null; + private java.net.InetSocketAddress _peerAddr = null; // // The maximum IP datagram size is 65535. Subtract 20 bytes for the IP header and 8 bytes for the UDP header @@ -584,4 +624,8 @@ final class UdpTransceiver implements Transceiver // private final static int _udpOverhead = 20 + 8; private final static int _maxPacketSize = 65535 - _udpOverhead; + + private static final int StateNeedConnect = 0; + private static final int StateConnected = 1; + private static final int StateNotConnected = 2; } |