diff options
author | Mark Spruiell <mes@zeroc.com> | 2014-05-29 11:06:44 -0700 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2014-05-29 11:06:44 -0700 |
commit | 3cfd324cdcc65d2acbc7536f1652d44f66a0e896 (patch) | |
tree | 44613394c5b9c6c6eb0ec8b41e110002a58d60ea /java/src/IceInternal | |
parent | Fixed Python throughput demo config (diff) | |
download | ice-3cfd324cdcc65d2acbc7536f1652d44f66a0e896.tar.bz2 ice-3cfd324cdcc65d2acbc7536f1652d44f66a0e896.tar.xz ice-3cfd324cdcc65d2acbc7536f1652d44f66a0e896.zip |
porting C++ transport changes to Java
Diffstat (limited to 'java/src/IceInternal')
29 files changed, 1646 insertions, 1880 deletions
diff --git a/java/src/IceInternal/Acceptor.java b/java/src/IceInternal/Acceptor.java index fb1f9f24b3d..0d883d6ef8b 100644 --- a/java/src/IceInternal/Acceptor.java +++ b/java/src/IceInternal/Acceptor.java @@ -15,5 +15,6 @@ public interface Acceptor void close(); void listen(); Transceiver accept(); + String protocol(); String toString(); } diff --git a/java/src/IceInternal/EndpointFactory.java b/java/src/IceInternal/EndpointFactory.java index aabdec78581..61425287f5c 100644 --- a/java/src/IceInternal/EndpointFactory.java +++ b/java/src/IceInternal/EndpointFactory.java @@ -13,7 +13,9 @@ public interface EndpointFactory { short type(); String protocol(); - EndpointI create(String str, boolean oaEndpoint); + EndpointI create(java.util.ArrayList<String> args, boolean oaEndpoint); EndpointI read(BasicStream s); void destroy(); + + EndpointFactory clone(ProtocolInstance instance); } diff --git a/java/src/IceInternal/EndpointFactoryManager.java b/java/src/IceInternal/EndpointFactoryManager.java index 94f61fbb873..64fd927c771 100644 --- a/java/src/IceInternal/EndpointFactoryManager.java +++ b/java/src/IceInternal/EndpointFactoryManager.java @@ -16,8 +16,7 @@ public final class EndpointFactoryManager _instance = instance; } - public synchronized void - add(EndpointFactory factory) + public synchronized void add(EndpointFactory factory) { for(int i = 0; i < _factories.size(); i++) { @@ -30,8 +29,7 @@ public final class EndpointFactoryManager _factories.add(factory); } - public synchronized EndpointFactory - get(short type) + public synchronized EndpointFactory get(short type) { for(int i = 0; i < _factories.size(); i++) { @@ -44,77 +42,101 @@ public final class EndpointFactoryManager return null; } - public synchronized EndpointI - create(String str, boolean oaEndpoint) + public synchronized EndpointI create(String str, boolean oaEndpoint) { - String s = str.trim(); - if(s.length() == 0) + String[] arr = IceUtilInternal.StringUtil.splitString(str, " \t\r\n"); + if(arr == null) { Ice.EndpointParseException e = new Ice.EndpointParseException(); - e.str = "value has no non-whitespace characters"; + e.str = "mismatched quote"; throw e; } - java.util.regex.Pattern p = java.util.regex.Pattern.compile("([ \t\n\r]+)|$"); - java.util.regex.Matcher m = p.matcher(s); - boolean b = m.find(); - assert(b); + if(arr.length == 0) + { + Ice.EndpointParseException e = new Ice.EndpointParseException(); + e.str = "value has no non-whitespace characters"; + throw e; + } - String protocol = s.substring(0, m.start()); + java.util.ArrayList<String> v = new java.util.ArrayList<String>(java.util.Arrays.asList(arr)); + String protocol = v.get(0); + v.remove(0); if(protocol.equals("default")) { protocol = _instance.defaultsAndOverrides().defaultProtocol; } + EndpointFactory factory = null; + for(int i = 0; i < _factories.size(); i++) { EndpointFactory f = _factories.get(i); if(f.protocol().equals(protocol)) { - return f.create(s.substring(m.end()), oaEndpoint); - - // Code below left in place for debugging. - - /* - EndpointI e = f.create(s.substring(m.end()), oaEndpoint); - BasicStream bs = new BasicStream(_instance, true, false); - e.streamWrite(bs); - java.nio.ByteBuffer buf = bs.getBuffer(); - buf.position(0); - short type = bs.readShort(); - EndpointI ue = new IceInternal.OpaqueEndpointI(type, bs); - System.err.println("Normal: " + e); - System.err.println("Opaque: " + ue); - return e; - */ + factory = f; } } + if(factory != null) + { + EndpointI e = factory.create(v, oaEndpoint); + if(!v.isEmpty()) + { + Ice.EndpointParseException ex = new Ice.EndpointParseException(); + ex.str = "unrecognized argument `" + v.get(0) + "' in endpoint `" + str + "'"; + throw ex; + } + return e; + + // Code below left in place for debugging. + + /* + EndpointI e = f.create(s.substring(m.end()), oaEndpoint); + BasicStream bs = new BasicStream(_instance, true, false); + e.streamWrite(bs); + java.nio.ByteBuffer buf = bs.getBuffer(); + buf.position(0); + short type = bs.readShort(); + EndpointI ue = new IceInternal.OpaqueEndpointI(type, bs); + System.err.println("Normal: " + e); + System.err.println("Opaque: " + ue); + return e; + */ + } + // // If the stringified endpoint is opaque, create an unknown endpoint, // then see whether the type matches one of the known endpoints. // if(protocol.equals("opaque")) { - EndpointI ue = new OpaqueEndpointI(s.substring(m.end())); - for(int i = 0; i < _factories.size(); i++) + EndpointI ue = new OpaqueEndpointI(v); + if(!v.isEmpty()) + { + Ice.EndpointParseException ex = new Ice.EndpointParseException(); + ex.str = "unrecognized argument `" + v.get(0) + "' in endpoint `" + str + "'"; + throw ex; + } + factory = get(ue.type()); + if(factory != null) { - EndpointFactory f = _factories.get(i); - if(f.type() == ue.type()) - { - // - // Make a temporary stream, write the opaque endpoint data into the stream, - // and ask the factory to read the endpoint data from that stream to create - // the actual endpoint. - // - BasicStream bs = new BasicStream(_instance, Protocol.currentProtocolEncoding, true, false); - ue.streamWrite(bs); - Buffer buf = bs.getBuffer(); - buf.b.position(0); - bs.readShort(); // type - return f.read(bs); - } + // + // Make a temporary stream, write the opaque endpoint data into the stream, + // and ask the factory to read the endpoint data from that stream to create + // the actual endpoint. + // + BasicStream bs = new BasicStream(_instance, Protocol.currentProtocolEncoding, true, false); + bs.writeShort(ue.type()); + ue.streamWrite(bs); + Buffer buf = bs.getBuffer(); + buf.b.position(0); + bs.readShort(); // type + bs.startReadEncaps(); + EndpointI e = factory.read(bs); + bs.endReadEncaps(); + return e; } return ue; // Endpoint is opaque, but we don't have a factory for its type. } @@ -122,23 +144,30 @@ public final class EndpointFactoryManager return null; } - public synchronized EndpointI - read(BasicStream s) + public synchronized EndpointI read(BasicStream s) { short type = s.readShort(); - for(int i = 0; i < _factories.size(); i++) + + EndpointFactory factory = get(type); + EndpointI e = null; + + s.startReadEncaps(); + + if(factory != null) { - EndpointFactory f = _factories.get(i); - if(f.type() == type) - { - return f.read(s); - } + e = factory.read(s); } - return new OpaqueEndpointI(type, s); + else + { + e = new OpaqueEndpointI(type, s); + } + + s.endReadEncaps(); + + return e; } - void - destroy() + void destroy() { for(int i = 0; i < _factories.size(); i++) { diff --git a/java/src/IceInternal/EndpointHostResolver.java b/java/src/IceInternal/EndpointHostResolver.java index 43d646c6fb9..898055c102b 100644 --- a/java/src/IceInternal/EndpointHostResolver.java +++ b/java/src/IceInternal/EndpointHostResolver.java @@ -34,8 +34,8 @@ public class EndpointHostResolver } } - public java.util.List<Connector> - resolve(String host, int port, Ice.EndpointSelectionType selType, EndpointI endpoint) + public java.util.List<Connector> resolve(String host, int port, Ice.EndpointSelectionType selType, + IPEndpointI endpoint) { // // Try to get the addresses without DNS lookup. If this doesn't @@ -92,8 +92,8 @@ public class EndpointHostResolver return connectors; } - synchronized public void - resolve(String host, int port, Ice.EndpointSelectionType selType, EndpointI endpoint, EndpointI_connectors callback) + synchronized public void resolve(String host, int port, Ice.EndpointSelectionType selType, IPEndpointI endpoint, + EndpointI_connectors callback) { // // TODO: Optimize to avoid the lookup if the given host is a textual IPv4 or IPv6 @@ -124,16 +124,14 @@ public class EndpointHostResolver notify(); } - synchronized public void - destroy() + synchronized public void destroy() { assert(!_destroyed); _destroyed = true; notify(); } - public void - joinWithThread() + public void joinWithThread() { if(_thread != null) { @@ -151,8 +149,7 @@ public class EndpointHostResolver } } - public void - run() + public void run() { while(true) { @@ -236,8 +233,7 @@ public class EndpointHostResolver _queue.clear(); } - synchronized public void - updateObserver() + synchronized public void updateObserver() { Ice.Instrumentation.CommunicatorObserver obsv = _instance.getObserver(); if(obsv != null) @@ -258,7 +254,7 @@ public class EndpointHostResolver String host; int port; Ice.EndpointSelectionType selType; - EndpointI endpoint; + IPEndpointI endpoint; EndpointI_connectors callback; Ice.Instrumentation.Observer observer; } @@ -282,8 +278,7 @@ public class EndpointHostResolver setName(threadName + "Ice.HostResolver"); } - public void - run() + public void run() { try { diff --git a/java/src/IceInternal/EndpointI.java b/java/src/IceInternal/EndpointI.java index 99888b7ec85..6d91760b570 100644 --- a/java/src/IceInternal/EndpointI.java +++ b/java/src/IceInternal/EndpointI.java @@ -11,19 +11,21 @@ package IceInternal; abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<EndpointI> { - public EndpointI(String connectionId) - { - _connectionId = connectionId; - } - - public EndpointI() + public String toString() { + return _toString(); } - public String - toString() + public String _toString() { - return _toString(); + // + // WARNING: Certain features, such as proxy validation in Glacier2, + // depend on the format of proxy strings. Changes to toString() and + // methods called to generate parts of the reference string could break + // these features. Please review for all features that depend on the + // format of proxyToString() before changing this and related code. + // + return protocol() + options(); } // @@ -35,12 +37,12 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<En // Return the endpoint type. // public abstract short type(); - + // // Return the protocol name. // public abstract String protocol(); - + // // Return the timeout for the endpoint in milliseconds. 0 means // non-blocking, -1 means no timeout. @@ -55,6 +57,11 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<En public abstract EndpointI timeout(int t); // + // Return the connection ID + // + public abstract String connectionId(); + + // // Return a new endpoint with a different connection id. // public abstract EndpointI connectionId(String connectionId); @@ -64,7 +71,7 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<En // otherwise. // public abstract boolean compress(); - + // // Return a new endpoint with a different compression value, // provided that compression is supported by the @@ -83,14 +90,6 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<En public abstract boolean secure(); // - // Return the connection ID - // - public String connectionId() - { - return _connectionId; - } - - // // Return a server side transceiver for this endpoint, or null if a // transceiver can only be created by an acceptor. In case a // transceiver is created, this operation also returns a new @@ -126,15 +125,53 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<En // public abstract boolean equivalent(EndpointI endpoint); - public java.util.List<Connector> - connectors(java.util.List<java.net.InetSocketAddress> addresses, NetworkProxy proxy) + public abstract String options(); + + public void initWithOptions(java.util.ArrayList<String> args) { - // - // This method must be extended by endpoints which use the EndpointHostResolver to create - // connectors from IP addresses. - // - assert(false); - return null; + java.util.ArrayList<String> unknown = new java.util.ArrayList<String>(); + + String str = "`" + protocol() + " "; + for(String p : args) + { + if(IceUtilInternal.StringUtil.findFirstOf(p, " \t\n\r") != -1) + { + str += " \"" + p + "\""; + } + else + { + str += " " + p; + } + } + str += "'"; + + for(int n = 0; n < args.size(); ++n) + { + String option = args.get(n); + if(option.length() < 2 || option.charAt(0) != '-') + { + unknown.add(option); + continue; + } + + String argument = null; + if(n + 1 < args.size() && args.get(n + 1).charAt(0) != '-') + { + argument = args.get(++n); + } + + if(!checkOption(option, argument, str)) + { + unknown.add(option); + if(argument != null) + { + unknown.add(argument); + } + } + } + + args.clear(); + args.addAll(unknown); } // @@ -149,15 +186,9 @@ abstract public class EndpointI implements Ice.Endpoint, java.lang.Comparable<En return compareTo((EndpointI)obj) == 0; } - public int compareTo(EndpointI p) // From java.lang.Comparable. + protected boolean checkOption(String option, String argument, String endpoint) { - if(!_connectionId.equals(p._connectionId)) - { - return _connectionId.compareTo(p._connectionId); - } - - return 0; + // Must be overridden to check for options. + return false; } - - protected String _connectionId = ""; } diff --git a/java/src/IceInternal/EventHandler.java b/java/src/IceInternal/EventHandler.java index 0b8b0efaef8..5dc747b2118 100644 --- a/java/src/IceInternal/EventHandler.java +++ b/java/src/IceInternal/EventHandler.java @@ -31,17 +31,8 @@ public abstract class EventHandler // abstract public java.nio.channels.SelectableChannel fd(); - // - // In Java, it's possible that the transceiver reads more data than what was - // really asked. If this is the case, hasMoreData() returns true and the handler - // read() method should be called again (without doing a select()). This is - // handled by the Selector class (it adds the handler to a separate list of - // handlers if this method returns true.) - // - abstract public boolean hasMoreData(); - - int _disabled = 0; - int _registered = 0; - int _ready = 0; - java.nio.channels.SelectionKey _key = null; + public int _disabled = 0; + public Ice.BooleanHolder _hasMoreData = new Ice.BooleanHolder(false); + public int _registered = 0; + public java.nio.channels.SelectionKey _key = null; } diff --git a/java/src/IceInternal/EventHandlerOpPair.java b/java/src/IceInternal/EventHandlerOpPair.java new file mode 100644 index 00000000000..43325ecb4f5 --- /dev/null +++ b/java/src/IceInternal/EventHandlerOpPair.java @@ -0,0 +1,22 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +class EventHandlerOpPair +{ + EventHandlerOpPair(EventHandler handler, int op) + { + this.handler = handler; + this.op = op; + } + + EventHandler handler; + int op; +} diff --git a/java/src/IceInternal/IPEndpointI.java b/java/src/IceInternal/IPEndpointI.java new file mode 100644 index 00000000000..7c860106754 --- /dev/null +++ b/java/src/IceInternal/IPEndpointI.java @@ -0,0 +1,327 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public abstract class IPEndpointI extends EndpointI +{ + protected IPEndpointI(ProtocolInstance instance, String host, int port, String connectionId) + { + _instance = instance; + _host = host; + _port = port; + _connectionId = connectionId; + _hashInitialized = false; + } + + protected IPEndpointI(ProtocolInstance instance) + { + _instance = instance; + _host = null; + _port = 0; + _connectionId = ""; + _hashInitialized = false; + } + + protected IPEndpointI(ProtocolInstance instance, BasicStream s) + { + _instance = instance; + _host = s.readString(); + _port = s.readInt(); + _connectionId = ""; + _hashInitialized = false; + } + + public void streamWrite(BasicStream s) + { + s.startWriteEncaps(); + streamWriteImpl(s); + s.endWriteEncaps(); + } + + public Ice.EndpointInfo getInfo() + { + Ice.IPEndpointInfo info = new Ice.IPEndpointInfo() + { + public short type() + { + return IPEndpointI.this.type(); + } + + public boolean datagram() + { + return IPEndpointI.this.datagram(); + } + + public boolean secure() + { + return IPEndpointI.this.secure(); + } + }; + fillEndpointInfo(info); + return info; + } + + public short type() + { + return _instance.type(); + } + + public String protocol() + { + return _instance.protocol(); + } + + public String connectionId() + { + return _connectionId; + } + + public EndpointI connectionId(String connectionId) + { + if(connectionId.equals(_connectionId)) + { + return this; + } + else + { + return createEndpoint(_host, _port, connectionId); + } + } + + public java.util.List<Connector> connectors(Ice.EndpointSelectionType selType) + { + return _instance.resolve(_host, _port, selType, this); + } + + public void connectors_async(Ice.EndpointSelectionType selType, EndpointI_connectors callback) + { + _instance.resolve(_host, _port, selType, this, callback); + } + + public java.util.List<EndpointI> expand() + { + java.util.List<EndpointI> endps = new java.util.ArrayList<EndpointI>(); + java.util.List<String> hosts = Network.getHostsForEndpointExpand(_host, _instance.protocolSupport(), false); + if(hosts == null || hosts.isEmpty()) + { + endps.add(this); + } + else + { + for(String h : hosts) + { + endps.add(createEndpoint(h, _port, _connectionId)); + } + } + return endps; + } + + public boolean equivalent(EndpointI endpoint) + { + if(!(endpoint instanceof IPEndpointI)) + { + return false; + } + IPEndpointI ipEndpointI = (IPEndpointI)endpoint; + return ipEndpointI.type() == type() && ipEndpointI._host.equals(_host) && ipEndpointI._port == _port; + } + + public java.util.List<Connector> connectors(java.util.List<java.net.InetSocketAddress> addresses, + NetworkProxy proxy) + { + java.util.List<Connector> connectors = new java.util.ArrayList<Connector>(); + for(java.net.InetSocketAddress p : addresses) + { + connectors.add(createConnector(p, proxy)); + } + return connectors; + } + + synchronized public int hashCode() + { + if(!_hashInitialized) + { + _hashValue = 5381; + _hashValue = HashUtil.hashAdd(_hashValue, type()); + _hashValue = hashInit(_hashValue); + } + return _hashValue; + } + + public String options() + { + // + // WARNING: Certain features, such as proxy validation in Glacier2, + // depend on the format of proxy strings. Changes to toString() and + // methods called to generate parts of the reference string could break + // these features. Please review for all features that depend on the + // format of proxyToString() before changing this and related code. + // + String s = ""; + + if(_host != null && _host.length() > 0) + { + s += " -h "; + boolean addQuote = _host.indexOf(':') != -1; + if(addQuote) + { + s += "\""; + } + s += _host; + if(addQuote) + { + s += "\""; + } + } + + s += " -p " + _port; + + return s; + } + + public int compareTo(EndpointI obj) // From java.lang.Comparable + { + if(!(obj instanceof IPEndpointI)) + { + return type() < obj.type() ? -1 : 1; + } + + IPEndpointI p = (IPEndpointI)obj; + if(this == p) + { + return 0; + } + + int v = _host.compareTo(p._host); + if(v != 0) + { + return v; + } + + if(_port < p._port) + { + return -1; + } + else if(p._port < _port) + { + return 1; + } + + return _connectionId.compareTo(p._connectionId); + } + + public String host() + { + return _host; + } + + public int port() + { + return _port; + } + + protected void streamWriteImpl(BasicStream s) + { + s.writeString(_host); + s.writeInt(_port); + } + + protected int hashInit(int h) + { + h = HashUtil.hashAdd(h, _host); + h = HashUtil.hashAdd(h, _port); + h = HashUtil.hashAdd(h, _connectionId); + return h; + } + + protected void fillEndpointInfo(Ice.IPEndpointInfo info) + { + info.host = _host; + info.port = _port; + } + + public void initWithOptions(java.util.ArrayList<String> args, boolean oaEndpoint) + { + super.initWithOptions(args); + + if(_host == null || _host.length() == 0) + { + _host = _instance.defaultHost(); + } + else if(_host.equals("*")) + { + if(oaEndpoint) + { + _host = ""; + } + else + { + throw new Ice.EndpointParseException("`-h *' not valid for proxy endpoint `" + toString() + "'"); + } + } + } + + protected boolean checkOption(String option, String argument, String endpoint) + { + switch(option.charAt(1)) + { + case 'h': + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -h option in endpoint " + endpoint); + } + _host = argument; + return true; + } + + case 'p': + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -p option in endpoint " + endpoint); + } + + try + { + _port = Integer.parseInt(argument); + } + catch(NumberFormatException ex) + { + throw new Ice.EndpointParseException("invalid port value `" + argument + + "' in endpoint " + endpoint); + } + + if(_port < 0 || _port > 65535) + { + throw new Ice.EndpointParseException("port value `" + argument + + "' out of range in endpoint " + endpoint); + } + + return true; + } + + default: + { + return false; + } + } + } + + protected abstract Connector createConnector(java.net.InetSocketAddress addr, NetworkProxy proxy); + protected abstract IPEndpointI createEndpoint(String host, int port, String connectionId); + + protected ProtocolInstance _instance; + protected String _host; + protected int _port; + protected String _connectionId; + private boolean _hashInitialized; + private int _hashValue; +} diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index 54a9076fde7..59c63fb3b2a 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -308,13 +308,6 @@ public final class IncomingConnectionFactory extends EventHandler implements Ice return _acceptor.fd(); } - public boolean - hasMoreData() - { - assert(_acceptor != null); - return false; - } - // // Operations from ConnectionI.StartCallback // diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index be091ba60e7..0fa8d7e4422 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -782,9 +782,11 @@ public final class Instance } _endpointFactoryManager = new EndpointFactoryManager(this); - EndpointFactory tcpEndpointFactory = new TcpEndpointFactory(this); + ProtocolInstance tcpProtocolInstance = new ProtocolInstance(this, Ice.TCPEndpointType.value, "tcp"); + EndpointFactory tcpEndpointFactory = new TcpEndpointFactory(tcpProtocolInstance); _endpointFactoryManager.add(tcpEndpointFactory); - EndpointFactory udpEndpointFactory = new UdpEndpointFactory(this); + ProtocolInstance udpProtocolInstance = new ProtocolInstance(this, Ice.UDPEndpointType.value, "udp"); + EndpointFactory udpEndpointFactory = new UdpEndpointFactory(udpProtocolInstance); _endpointFactoryManager.add(udpEndpointFactory); _pluginManager = new Ice.PluginManagerI(communicator, this); diff --git a/java/src/IceInternal/OpaqueEndpointI.java b/java/src/IceInternal/OpaqueEndpointI.java index 89c4305cbd0..70e3559a82b 100644 --- a/java/src/IceInternal/OpaqueEndpointI.java +++ b/java/src/IceInternal/OpaqueEndpointI.java @@ -11,179 +11,50 @@ package IceInternal; final class OpaqueEndpointI extends EndpointI { - public - OpaqueEndpointI(String str) + public OpaqueEndpointI(java.util.ArrayList<String> args) { - super(""); - + _type = -1; _rawEncoding = Ice.Util.Encoding_1_0; + _rawBytes = new byte[0]; - int topt = 0; - int vopt = 0; + initWithOptions(args); - String[] arr = str.split("[ \t\n\r]+"); - int i = 0; - while(i < arr.length) + if(_type < 0) { - if(arr[i].length() == 0) - { - i++; - continue; - } - - String option = arr[i++]; - if(option.length() != 2 || option.charAt(0) != '-') - { - throw new Ice.EndpointParseException("expected an endpoint option but found `" + option + - "' in endpoint `opaque " + str + "'"); - } - - String argument = null; - if(i < arr.length && arr[i].charAt(0) != '-') - { - argument = arr[i++]; - } - - switch(option.charAt(1)) - { - case 't': - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for -t option in endpoint `opaque " - + str + "'"); - } - - int t; - try - { - t = Integer.parseInt(argument); - } - catch(NumberFormatException ex) - { - throw new Ice.EndpointParseException("invalid type value `" + argument + - "' in endpoint `opaque " + str + "'"); - } - - if(t < 0 || t > 65535) - { - throw new Ice.EndpointParseException("type value `" + argument + - "' out of range in endpoint `opaque " + str + "'"); - } - - _type = (short)t; - ++topt; - if(topt > 1) - { - throw new Ice.EndpointParseException("multiple -t options in endpoint `opaque " + str + "'"); - } - break; - } - - case 'e': - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for -e option in endpoint `opaque " - + str + "'"); - } - - try - { - _rawEncoding = Ice.Util.stringToEncodingVersion(argument); - } - catch(Ice.VersionParseException e) - { - throw new Ice.EndpointParseException("invalid encoding version `" + argument + - "' in endpoint `opaque " + str + "':\n" + e.str); - } - break; - } - - case 'v': - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for -v option in endpoint `opaque " - + str + "'"); - } - - for(int j = 0; j < argument.length(); ++j) - { - if(!IceUtilInternal.Base64.isBase64(argument.charAt(j))) - { - throw new Ice.EndpointParseException("invalid base64 character `" + argument.charAt(j) + - "' (ordinal " + ((int)argument.charAt(j)) + - ") in endpoint `opaque " + str + "'"); - } - } - _rawBytes = IceUtilInternal.Base64.decode(argument); - ++vopt; - if(vopt > 1) - { - throw new Ice.EndpointParseException("multiple -v options in endpoint `opaque " + str + "'"); - } - break; - } - - default: - { - throw new Ice.EndpointParseException("invalid option `" + option + "' in endpoint `opaque " + - str + "'"); - } - } + throw new Ice.EndpointParseException("no -t option in endpoint " + toString()); } - - if(topt != 1) + if(_rawBytes.length == 0) { - throw new Ice.EndpointParseException("no -t option in endpoint `opaque " + str + "'"); - } - if(vopt != 1) - { - throw new Ice.EndpointParseException("no -v option in endpoint `opaque " + str + "'"); + throw new Ice.EndpointParseException("no -v option in endpoint " + toString()); } + calcHashValue(); } - public - OpaqueEndpointI(short type, BasicStream s) + public OpaqueEndpointI(short type, BasicStream s) { - super(""); _type = type; - _rawEncoding = s.startReadEncaps(); + _rawEncoding = s.getReadEncoding(); int sz = s.getReadEncapsSize(); _rawBytes = s.readBlob(sz); - s.endReadEncaps(); + calcHashValue(); } // // Marshal the endpoint // - public void - streamWrite(BasicStream s) + public void streamWrite(BasicStream s) { - s.writeShort(_type); s.startWriteEncaps(_rawEncoding, Ice.FormatType.DefaultFormat); s.writeBlob(_rawBytes); s.endWriteEncaps(); } // - // Convert the endpoint to its string form - // - public String - _toString() - { - String val = IceUtilInternal.Base64.encode(_rawBytes); - return "opaque -t " + _type + " -e " + Ice.Util.encodingVersionToString(_rawEncoding) + " -v " + val; - } - - // // Return the endpoint information. // - public Ice.EndpointInfo - getInfo() + public Ice.EndpointInfo getInfo() { return new Ice.OpaqueEndpointInfo(-1, false, _rawEncoding, _rawBytes) { @@ -191,12 +62,12 @@ final class OpaqueEndpointI extends EndpointI { return _type; } - + public boolean datagram() { return false; } - + public boolean secure() { return false; @@ -207,8 +78,7 @@ final class OpaqueEndpointI extends EndpointI // // Return the endpoint type // - public short - type() + public short type() { return _type; } @@ -216,8 +86,7 @@ final class OpaqueEndpointI extends EndpointI // // Return the protocol name // - public String - protocol() + public String protocol() { return "opaque"; } @@ -226,38 +95,39 @@ final class OpaqueEndpointI extends EndpointI // Return the timeout for the endpoint in milliseconds. 0 means // non-blocking, -1 means no timeout. // - public int - timeout() + public int timeout() { return -1; } - + // // Return a new endpoint with a different timeout value, provided // that timeouts are supported by the endpoint. Otherwise the same // endpoint is returned. // - public EndpointI - timeout(int t) + public EndpointI timeout(int t) { return this; } + public String connectionId() + { + return ""; + } + // // Return a new endpoint with a different connection id. // - public EndpointI - connectionId(String connectionId) + public EndpointI connectionId(String connectionId) { return this; } - + // // Return true if the endpoints support bzip2 compress, or false // otherwise. // - public boolean - compress() + public boolean compress() { return false; } @@ -267,8 +137,7 @@ final class OpaqueEndpointI extends EndpointI // provided that compression is supported by the // endpoint. Otherwise the same endpoint is returned. // - public EndpointI - compress(boolean compress) + public EndpointI compress(boolean compress) { return this; } @@ -276,17 +145,15 @@ final class OpaqueEndpointI extends EndpointI // // Return true if the endpoint is datagram-based. // - public boolean - datagram() + public boolean datagram() { return false; } - + // // Return true if the endpoint is secure. // - public boolean - secure() + public boolean secure() { return false; } @@ -294,8 +161,7 @@ final class OpaqueEndpointI extends EndpointI // // Get the encoded endpoint. // - public byte[] - rawBytes() + public byte[] rawBytes() { return _rawBytes; } @@ -307,8 +173,7 @@ final class OpaqueEndpointI extends EndpointI // "effective" endpoint, which might differ from this endpoint, // for example, if a dynamic port number is assigned. // - public Transceiver - transceiver(EndpointIHolder endpoint) + public Transceiver transceiver(EndpointIHolder endpoint) { endpoint.value = null; return null; @@ -318,14 +183,12 @@ final class OpaqueEndpointI extends EndpointI // Return connectors for this endpoint, or empty list if no connector // is available. // - public java.util.List<Connector> - connectors(Ice.EndpointSelectionType selType) + public java.util.List<Connector> connectors(Ice.EndpointSelectionType selType) { return new java.util.ArrayList<Connector>(); } - public void - connectors_async(Ice.EndpointSelectionType selType, EndpointI_connectors callback) + public void connectors_async(Ice.EndpointSelectionType selType, EndpointI_connectors callback) { callback.connectors(new java.util.ArrayList<Connector>()); } @@ -337,8 +200,7 @@ final class OpaqueEndpointI extends EndpointI // from this endpoint, for example, if a dynamic port number is // assigned. // - public Acceptor - acceptor(EndpointIHolder endpoint, String adapterName) + public Acceptor acceptor(EndpointIHolder endpoint, String adapterName) { endpoint.value = null; return null; @@ -349,8 +211,7 @@ final class OpaqueEndpointI extends EndpointI // host if listening on INADDR_ANY on server side or if no host // was specified on client side. // - public java.util.List<EndpointI> - expand() + public java.util.List<EndpointI> expand() { java.util.List<EndpointI> endps = new java.util.ArrayList<EndpointI>(); endps.add(this); @@ -360,23 +221,35 @@ final class OpaqueEndpointI extends EndpointI // // Check whether the endpoint is equivalent to another one. // - public boolean - equivalent(EndpointI endpoint) + public boolean equivalent(EndpointI endpoint) { return false; } - public int - hashCode() + public int hashCode() { return _hashCode; } - + + public String options() + { + String s = ""; + if(_type > -1) + { + s += " -t " + _type; + } + s += " -e " + Ice.Util.encodingVersionToString(_rawEncoding); + if(_rawBytes.length > 0) + { + s += " -v " + IceUtilInternal.Base64.encode(_rawBytes); + } + return s; + } + // // Compare endpoints for sorting purposes // - public int - compareTo(EndpointI obj) // From java.lang.Comparable + public int compareTo(EndpointI obj) // From java.lang.Comparable { if(!(obj instanceof OpaqueEndpointI)) { @@ -439,8 +312,92 @@ final class OpaqueEndpointI extends EndpointI return 0; } - private void - calcHashValue() + protected boolean checkOption(String option, String argument, String endpoint) + { + switch(option.charAt(1)) + { + case 't': + { + if(_type > -1) + { + throw new Ice.EndpointParseException("multiple -t options in endpoint " + endpoint); + } + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -t option in endpoint " + endpoint); + } + + int t; + try + { + t = Integer.parseInt(argument); + } + catch(NumberFormatException ex) + { + throw new Ice.EndpointParseException("invalid type value `" + argument + "' in endpoint " + endpoint); + } + + if(t < 0 || t > 65535) + { + throw new Ice.EndpointParseException("type value `" + argument + "' out of range in endpoint " + + endpoint); + } + + _type = (short)t; + return true; + } + + case 'v': + { + if(_rawBytes.length > 0) + { + throw new Ice.EndpointParseException("multiple -v options in endpoint " + endpoint); + } + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -v option in endpoint " + endpoint); + } + + for(int j = 0; j < argument.length(); ++j) + { + if(!IceUtilInternal.Base64.isBase64(argument.charAt(j))) + { + throw new Ice.EndpointParseException("invalid base64 character `" + argument.charAt(j) + + "' (ordinal " + ((int)argument.charAt(j)) + + ") in endpoint " + endpoint); + } + } + _rawBytes = IceUtilInternal.Base64.decode(argument); + return true; + } + + case 'e': + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -e option in endpoint " + endpoint); + } + + try + { + _rawEncoding = Ice.Util.stringToEncodingVersion(argument); + } + catch(Ice.VersionParseException e) + { + throw new Ice.EndpointParseException("invalid encoding version `" + argument + + "' in endpoint " + endpoint + ":\n" + e.str); + } + return true; + } + + default: + { + return false; + } + } + } + + private void calcHashValue() { int h = 5381; h = IceInternal.HashUtil.hashAdd(h, _type); diff --git a/java/src/IceInternal/OutgoingConnectionFactory.java b/java/src/IceInternal/OutgoingConnectionFactory.java index f4822753730..53a1abbfc90 100644 --- a/java/src/IceInternal/OutgoingConnectionFactory.java +++ b/java/src/IceInternal/OutgoingConnectionFactory.java @@ -28,6 +28,7 @@ public final class OutgoingConnectionFactory list.add(value); } + /* public void removeElementWithValue(K key, V value) { @@ -39,6 +40,20 @@ public final class OutgoingConnectionFactory this.remove(key); } } + */ + + public boolean + removeElementWithValue(K key, V value) + { + java.util.List<V> list = this.get(key); + assert(list != null); + boolean v = list.remove(value); + if(list.isEmpty()) + { + this.remove(key); + } + return v; + } }; interface CreateConnectionCallback diff --git a/java/src/IceInternal/ProtocolInstance.java b/java/src/IceInternal/ProtocolInstance.java new file mode 100644 index 00000000000..ee0b9074277 --- /dev/null +++ b/java/src/IceInternal/ProtocolInstance.java @@ -0,0 +1,114 @@ +// ********************************************************************** +// +// Copyright (c) 2003-2014 ZeroC, Inc. All rights reserved. +// +// This copy of Ice is licensed to you under the terms described in the +// ICE_LICENSE file included in this distribution. +// +// ********************************************************************** + +package IceInternal; + +public class ProtocolInstance +{ + public ProtocolInstance(Ice.Communicator communicator, short type, String protocol) + { + _instance = Util.getInstance(communicator); + _traceLevel = _instance.traceLevels().network; + _traceCategory = _instance.traceLevels().networkCat; + _logger = _instance.initializationData().logger; + _properties = _instance.initializationData().properties; + _type = type; + _protocol = protocol; + } + + public int traceLevel() + { + return _traceLevel; + } + + public String traceCategory() + { + return _traceCategory; + } + + public Ice.Logger logger() + { + return _logger; + } + + public String protocol() + { + return _protocol; + } + + public short type() + { + return _type; + } + + public Ice.Properties properties() + { + return _properties; + } + + public boolean preferIPv6() + { + return _instance.preferIPv6(); + } + + public int protocolSupport() + { + return _instance.protocolSupport(); + } + + public String defaultHost() + { + return _instance.defaultsAndOverrides().defaultHost; + } + + public Ice.EncodingVersion defaultEncoding() + { + return _instance.defaultsAndOverrides().defaultEncoding; + } + + public NetworkProxy networkProxy() + { + return _instance.networkProxy(); + } + + public int messageSizeMax() + { + return _instance.messageSizeMax(); + } + + public java.util.List<Connector> resolve(String host, int port, Ice.EndpointSelectionType type, IPEndpointI endpt) + { + return _instance.endpointHostResolver().resolve(host, port, type, endpt); + } + + public void resolve(String host, int port, Ice.EndpointSelectionType type, IPEndpointI endpt, + EndpointI_connectors callback) + { + _instance.endpointHostResolver().resolve(host, port, type, endpt, callback); + } + + ProtocolInstance(Instance instance, short type, String protocol) + { + _instance = instance; + _traceLevel = _instance.traceLevels().network; + _traceCategory = _instance.traceLevels().networkCat; + _logger = _instance.initializationData().logger; + _properties = _instance.initializationData().properties; + _type = type; + _protocol = protocol; + } + + protected Instance _instance; + protected int _traceLevel; + protected String _traceCategory; + protected Ice.Logger _logger; + protected Ice.Properties _properties; + protected String _protocol; + protected short _type; +} diff --git a/java/src/IceInternal/ProtocolPluginFacade.java b/java/src/IceInternal/ProtocolPluginFacade.java index c1834c1ae92..07080ea383e 100644 --- a/java/src/IceInternal/ProtocolPluginFacade.java +++ b/java/src/IceInternal/ProtocolPluginFacade.java @@ -18,42 +18,6 @@ public interface ProtocolPluginFacade Ice.Communicator getCommunicator(); // - // Get the endpoint host resolver. - // - IceInternal.EndpointHostResolver getEndpointHostResolver(); - - // - // Get the protocol support. - // - int getProtocolSupport(); - - // - // Get the protocol support. - // - boolean getPreferIPv6(); - - // - // Get the network proxy. - // - NetworkProxy getNetworkProxy(); - - // - // Get the default encoding to be used in endpoints. - // - Ice.EncodingVersion getDefaultEncoding(); - - // - // Get the default hostname to be used in endpoints. - // - String getDefaultHost(); - - // - // Get the network trace level and category name. - // - int getNetworkTraceLevel(); - String getNetworkTraceCategory(); - - // // Register an EndpointFactory. // void addEndpointFactory(EndpointFactory factory); diff --git a/java/src/IceInternal/ProtocolPluginFacadeI.java b/java/src/IceInternal/ProtocolPluginFacadeI.java index 0c51675b1e0..d4213bba8b8 100644 --- a/java/src/IceInternal/ProtocolPluginFacadeI.java +++ b/java/src/IceInternal/ProtocolPluginFacadeI.java @@ -11,8 +11,7 @@ package IceInternal; public class ProtocolPluginFacadeI implements ProtocolPluginFacade { - public - ProtocolPluginFacadeI(Ice.Communicator communicator) + public ProtocolPluginFacadeI(Ice.Communicator communicator) { _communicator = communicator; _instance = Util.getInstance(communicator); @@ -22,82 +21,15 @@ public class ProtocolPluginFacadeI implements ProtocolPluginFacade // Get the Communicator instance with which this facade is // associated. // - public Ice.Communicator - getCommunicator() + public Ice.Communicator getCommunicator() { return _communicator; } // - // Get the endpoint host resolver. - // - public EndpointHostResolver - getEndpointHostResolver() - { - return _instance.endpointHostResolver(); - } - - // - // Get the protocol support. - // - public int - getProtocolSupport() - { - return _instance.protocolSupport(); - } - - public boolean - getPreferIPv6() - { - return _instance.preferIPv6(); - } - - // - // Get the network proxy. - // - public NetworkProxy getNetworkProxy() - { - return _instance.networkProxy(); - } - - // - // Get the default encoding to be used in endpoints. - // - public Ice.EncodingVersion - getDefaultEncoding() - { - return _instance.defaultsAndOverrides().defaultEncoding; - } - - // - // Get the default hostname to be used in endpoints. - // - public String - getDefaultHost() - { - return _instance.defaultsAndOverrides().defaultHost; - } - - // - // Get the network trace level and category name. - // - public int - getNetworkTraceLevel() - { - return _instance.traceLevels().network; - } - - public String - getNetworkTraceCategory() - { - return _instance.traceLevels().networkCat; - } - - // // Register an EndpointFactory. // - public void - addEndpointFactory(EndpointFactory factory) + public void addEndpointFactory(EndpointFactory factory) { _instance.endpointFactoryManager().add(factory); } @@ -105,8 +37,7 @@ public class ProtocolPluginFacadeI implements ProtocolPluginFacade // // Register an EndpointFactory. // - public EndpointFactory - getEndpointFactory(short type) + public EndpointFactory getEndpointFactory(short type) { return _instance.endpointFactoryManager().get(type); } @@ -114,8 +45,7 @@ public class ProtocolPluginFacadeI implements ProtocolPluginFacade // // Look up a Java class by name. // - public Class<?> - findClass(String className) + public Class<?> findClass(String className) { return _instance.findClass(className); } diff --git a/java/src/IceInternal/RoutableReference.java b/java/src/IceInternal/RoutableReference.java index 563cbb90297..de9d002f8b4 100644 --- a/java/src/IceInternal/RoutableReference.java +++ b/java/src/IceInternal/RoutableReference.java @@ -283,6 +283,7 @@ public class RoutableReference extends Reference assert(_adapterId.length() == 0); for(EndpointI endpoint : _endpoints) { + s.writeShort(endpoint.type()); endpoint.streamWrite(s); } } diff --git a/java/src/IceInternal/Selector.java b/java/src/IceInternal/Selector.java index 2ab11238b66..b23fbacb1b1 100644 --- a/java/src/IceInternal/Selector.java +++ b/java/src/IceInternal/Selector.java @@ -15,10 +15,11 @@ public final class Selector { } - public - Selector(Instance instance) + public Selector(Instance instance) { _instance = instance; + _selecting = false; + _interrupted = false; try { @@ -36,8 +37,7 @@ public final class Selector _keys = _selector.selectedKeys(); } - public void - destroy() + public void destroy() { try { @@ -49,14 +49,12 @@ public final class Selector _selector = null; } - public void - initialize(EventHandler handler) + public void initialize(EventHandler handler) { updateImpl(handler); } - public void - update(EventHandler handler, int remove, int add) + public void update(EventHandler handler, int remove, int add) { int previous = handler._registered; handler._registered = handler._registered & ~remove; @@ -65,23 +63,11 @@ public final class Selector { return; } - updateImpl(handler); - if(handler.hasMoreData() && (handler._disabled & SocketOperation.Read) == 0) - { - if((add & SocketOperation.Read) != 0) - { - _pendingHandlers.add(handler); - } - if((remove & SocketOperation.Read) != 0) - { - _pendingHandlers.remove(handler); - } - } + updateImpl(handler); } - public void - enable(EventHandler handler, int status) + public void enable(EventHandler handler, int status) { if((handler._disabled & status) == 0) { @@ -92,180 +78,85 @@ public final class Selector if((handler._registered & status) != 0) { updateImpl(handler); - - if((status & SocketOperation.Read) != 0 && handler.hasMoreData()) - { - // Add back the pending handler if reads are enabled. - _pendingHandlers.add(handler); - } } } - public void - disable(EventHandler handler, int status) + public void disable(EventHandler handler, int status) { if((handler._disabled & status) != 0) { return; } handler._disabled = handler._disabled | status; - + if((handler._registered & status) != 0) { updateImpl(handler); - - if((status & SocketOperation.Read) != 0 && handler.hasMoreData()) - { - // Remove the pending handler if reads are disabled. - _pendingHandlers.remove(handler); - } } } - public void - finish(EventHandler handler) + public void finish(EventHandler handler) { - handler._registered = 0; - - if(handler._key != null) + if(handler._registered != 0) { - handler._key.cancel(); - handler._key = null; - } - - _changes.remove(handler); - _pendingHandlers.remove(handler); - } + if(handler._key != null) + { + handler._key.cancel(); + handler._key = null; + } - public void - startSelect() - { - assert(_changes.isEmpty()); + _changes.remove(handler); - // - // Don't set _selecting = true if there are pending handlers, select() won't block - // and will just call selectNow(). - // - if(_pendingHandlers.isEmpty()) - { - _selecting = true; + update(handler, handler._registered, SocketOperation.None); } } - public void - finishSelect(java.util.List<EventHandler> handlers, long timeout) + public void startSelect() { - _selecting = false; - handlers.clear(); - - if(!_changes.isEmpty()) - { - for(EventHandler h : _changes) - { - updateImpl(h); - } - _changes.clear(); - } - else if(_keys.isEmpty() && _pendingHandlers.isEmpty() && timeout <= 0) + if(_interrupted) { - // - // This is necessary to prevent a busy loop in case of a spurious wake-up which - // sometime occurs in the client thread pool when the communicator is destroyed. - // If there are too many successive spurious wake-ups, we log an error. - // - try - { - Thread.sleep(1); - } - catch(java.lang.InterruptedException ex) - { - } - - if(++_spuriousWakeUp > 100) - { - _spuriousWakeUp = 0; - _instance.initializationData().logger.warning("spurious selector wake up"); - } - return; - } - - _spuriousWakeUp = 0; + _interrupted = false; - for(java.nio.channels.SelectionKey key : _keys) - { - EventHandler handler = (EventHandler)key.attachment(); - try - { - // - // It's important to check for interestOps here because the event handler - // registration might have changed above when _changes was processed. We - // don't want to return event handlers which aren't interested anymore in - // a given operation. - // - handler._ready = fromJavaOps(key.readyOps() & key.interestOps()); - if(handler.hasMoreData() && _pendingHandlers.remove(handler)) - { - handler._ready |= SocketOperation.Read; - } - handlers.add(handler); - } - catch(java.nio.channels.CancelledKeyException ex) + if(!_changes.isEmpty()) { - assert(handler._registered == 0); + updateSelector(); } } - _keys.clear(); + _selecting = true; + } - for(EventHandler handler : _pendingHandlers) - { - if(handler.hasMoreData()) - { - handler._ready = SocketOperation.Read; - handlers.add(handler); - } - } - _pendingHandlers.clear(); + public void finishSelect() + { + _selecting = false; } - public void - select(long timeout) + public void select(java.util.List<EventHandlerOpPair> handlers, long timeout) throws TimeoutException { while(true) { try { - // - // Only block if _selecting = true, otherwise we call selectNow() to retrieve new - // ready handlers and process handlers from _pendingHandlers. - // - if(_selecting) + if(timeout > 0) { - if(timeout > 0) + // + // NOTE: On some platforms, select() sometime returns slightly before + // the timeout (at least according to our monotonic time). To make sure + // timeouts are correctly detected, we wait for a little longer than + // the configured timeout (10ms). + // + long before = IceInternal.Time.currentMonotonicTimeMillis(); + if(_selector.select(timeout * 1000 + 10) == 0) { - // - // NOTE: On some platforms, select() sometime returns slightly before - // the timeout (at least according to our monotonic time). To make sure - // timeouts are correctly detected, we wait for a little longer than - // the configured timeout (10ms). - // - long before = IceInternal.Time.currentMonotonicTimeMillis(); - if(_selector.select(timeout * 1000 + 10) == 0) + if(IceInternal.Time.currentMonotonicTimeMillis() - before >= timeout * 1000) { - if(IceInternal.Time.currentMonotonicTimeMillis() - before >= timeout * 1000) - { - throw new TimeoutException(); - } + throw new TimeoutException(); } } - else - { - _selector.select(); - } } else { - _selector.selectNow(); + _selector.select(); } } catch(java.nio.channels.CancelledKeyException ex) @@ -299,63 +190,111 @@ public final class Selector break; } - } - public void - hasMoreData(EventHandler handler) - { - assert(!_selecting && handler.hasMoreData()); + handlers.clear(); - // - // Only add the handler if read is still registered and enabled. - // - if((handler._registered & ~handler._disabled & SocketOperation.Read) != 0) + if(_interrupted) // Interrupted, we have to process the interrupt before returning any handlers + { + return; + } + + if(_keys.isEmpty() && timeout <= 0) + { + // + // This is necessary to prevent a busy loop in case of a spurious wake-up which + // sometime occurs in the client thread pool when the communicator is destroyed. + // If there are too many successive spurious wake-ups, we log an error. + // + try + { + Thread.sleep(1); + } + catch(java.lang.InterruptedException ex) + { + } + + if(++_spuriousWakeUp > 100) + { + _spuriousWakeUp = 0; + _instance.initializationData().logger.warning("spurious selector wake up"); + } + return; + } + + _spuriousWakeUp = 0; + + for(java.nio.channels.SelectionKey key : _keys) { - _pendingHandlers.add(handler); + EventHandler handler = (EventHandler)key.attachment(); + try + { + // + // Use the intersection of readyOps and interestOps because we only want to + // report the operations in which the handler is still interested. + // + final int op = fromJavaOps(key.readyOps() & key.interestOps()); + handlers.add(new EventHandlerOpPair(handler, op)); + } + catch(java.nio.channels.CancelledKeyException ex) + { + assert(handler._registered == 0); + } } + _keys.clear(); } - private void - updateImpl(EventHandler handler) + private void updateImpl(EventHandler handler) { + _changes.add(handler); if(_selecting) { - // - // Queue the change since we can't change the selection key interest ops while a select - // operation is in progress (it could block depending on the underlying implementaiton - // of the Java selector). - // - if(_changes.isEmpty()) + if(!_interrupted) { + // + // We can't change the selection key interest ops while a select operation is in progress + // (it could block depending on the underlying implementation of the Java selector). + // + // Wake up the selector if necessary. + // _selector.wakeup(); + _interrupted = true; } - _changes.add(handler); - return; } + else + { + updateSelector(); + } + } - int ops = toJavaOps(handler, handler._registered & ~handler._disabled); - if(handler._key == null) + private void updateSelector() + { + for(EventHandler handler : _changes) { - if(handler._registered != 0) + int status = handler._registered & ~handler._disabled; + int ops = toJavaOps(handler, status); + if(handler._key == null) { - try + if(handler._registered != 0) { - handler._key = handler.fd().register(_selector, ops, handler); - } - catch(java.nio.channels.ClosedChannelException ex) - { - assert(false); + try + { + handler._key = handler.fd().register(_selector, ops, handler); + } + catch(java.nio.channels.ClosedChannelException ex) + { + assert(false); + } } } + else + { + handler._key.interestOps(ops); + } } - else - { - handler._key.interestOps(ops); - } + _changes.clear(); } - int - toJavaOps(EventHandler handler, int o) + int toJavaOps(EventHandler handler, int o) { int op = 0; if((o & SocketOperation.Read) != 0) @@ -380,8 +319,7 @@ public final class Selector return op; } - int - fromJavaOps(int o) + int fromJavaOps(int o) { int op = 0; if((o & (java.nio.channels.SelectionKey.OP_READ | java.nio.channels.SelectionKey.OP_ACCEPT)) != 0) @@ -399,7 +337,6 @@ public final class Selector return op; } - final private Instance _instance; private java.nio.channels.Selector _selector; @@ -408,5 +345,6 @@ public final class Selector private java.util.HashSet<EventHandler> _changes = new java.util.HashSet<EventHandler>(); private java.util.HashSet<EventHandler> _pendingHandlers = new java.util.HashSet<EventHandler>(); private boolean _selecting; + private boolean _interrupted; private int _spuriousWakeUp; } diff --git a/java/src/IceInternal/TcpAcceptor.java b/java/src/IceInternal/TcpAcceptor.java index 301577f3286..d8f35bb625a 100644 --- a/java/src/IceInternal/TcpAcceptor.java +++ b/java/src/IceInternal/TcpAcceptor.java @@ -20,10 +20,10 @@ class TcpAcceptor implements Acceptor public void close() { - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { String s = "stopping to accept tcp connections at " + toString(); - _logger.trace(_traceLevels.networkCat, s); + _instance.logger().trace(_instance.traceCategory(), s); } assert(_fd != null); @@ -36,7 +36,7 @@ class TcpAcceptor implements Acceptor { // Nothing to do. - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { StringBuffer s = new StringBuffer("listening for tcp connections at "); s.append(toString()); @@ -49,7 +49,7 @@ class TcpAcceptor implements Acceptor s.append("\nlocal interfaces: "); s.append(IceUtilInternal.StringUtil.joinString(interfaces, ", ")); } - _logger.trace(_traceLevels.networkCat, s.toString()); + _instance.logger().trace(_instance.traceCategory(), s.toString()); } } @@ -58,18 +58,24 @@ class TcpAcceptor implements Acceptor { java.nio.channels.SocketChannel fd = Network.doAccept(_fd); Network.setBlock(fd, false); - Network.setTcpBufSize(fd, _instance.initializationData().properties, _logger); + Network.setTcpBufSize(fd, _instance.properties(), _instance.logger()); - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { String s = "accepted tcp connection\n" + Network.fdToString(fd); - _logger.trace(_traceLevels.networkCat, s); + _instance.logger().trace(_instance.traceCategory(), s); } return new TcpTransceiver(_instance, fd); } public String + protocol() + { + return _instance.protocol(); + } + + public String toString() { return Network.addrToString(_addr); @@ -81,18 +87,16 @@ class TcpAcceptor implements Acceptor return _addr.getPort(); } - TcpAcceptor(Instance instance, String host, int port) + TcpAcceptor(ProtocolInstance instance, String host, int port) { _instance = instance; - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; - _backlog = instance.initializationData().properties.getPropertyAsIntWithDefault("Ice.TCP.Backlog", 511); + _backlog = instance.properties().getPropertyAsIntWithDefault("Ice.TCP.Backlog", 511); try { _fd = Network.createTcpServerSocket(); Network.setBlock(_fd, false); - Network.setTcpBufSize(_fd, _instance.initializationData().properties, _logger); + Network.setTcpBufSize(_fd, instance.properties(), _instance.logger()); if(!System.getProperty("os.name").startsWith("Windows")) { // @@ -110,11 +114,11 @@ class TcpAcceptor implements Acceptor // Network.setReuseAddress(_fd, true); } - _addr = Network.getAddressForServer(host, port, _instance.protocolSupport(), _instance.preferIPv6()); - if(_traceLevels.network >= 2) + _addr = Network.getAddressForServer(host, port, instance.protocolSupport(), instance.preferIPv6()); + if(instance.traceLevel() >= 2) { String s = "attempting to bind to tcp socket " + toString(); - _logger.trace(_traceLevels.networkCat, s); + instance.logger().trace(instance.traceCategory(), s); } _addr = Network.doBind(_fd, _addr, _backlog); } @@ -142,9 +146,7 @@ class TcpAcceptor implements Acceptor } } - private Instance _instance; - private TraceLevels _traceLevels; - private Ice.Logger _logger; + private ProtocolInstance _instance; private java.nio.channels.ServerSocketChannel _fd; private int _backlog; private java.net.InetSocketAddress _addr; diff --git a/java/src/IceInternal/TcpConnector.java b/java/src/IceInternal/TcpConnector.java index e4289f27a17..4e1ff9ff5e1 100644 --- a/java/src/IceInternal/TcpConnector.java +++ b/java/src/IceInternal/TcpConnector.java @@ -11,49 +11,45 @@ package IceInternal; final class TcpConnector implements Connector { - public Transceiver - connect() + public Transceiver connect() { - if(_traceLevels.network >= 2) + if(_instance.traceLevel() >= 2) { - String s = "trying to establish tcp connection to " + toString(); - _logger.trace(_traceLevels.networkCat, s); + String s = "trying to establish " + _instance.protocol() + " connection to " + toString(); + _instance.logger().trace(_instance.traceCategory(), s); } try { java.nio.channels.SocketChannel fd = Network.createTcpSocket(); Network.setBlock(fd, false); - Network.setTcpBufSize(fd, _instance.initializationData().properties, _logger); + Network.setTcpBufSize(fd, _instance.properties(), _instance.logger()); final java.net.InetSocketAddress addr = _proxy != null ? _proxy.getAddress() : _addr; Network.doConnect(fd, addr); return new TcpTransceiver(_instance, fd, _proxy, _addr); } catch(Ice.LocalException ex) { - if(_traceLevels.network >= 2) + if(_instance.traceLevel() >= 2) { - String s = "failed to establish tcp connection to " + toString() + "\n" + ex; - _logger.trace(_traceLevels.networkCat, s); + String s = "failed to establish " + _instance.protocol() + " connection to " + toString() + "\n" + ex; + _instance.logger().trace(_instance.traceCategory(), s); } throw ex; } } - public short - type() + public short type() { - return Ice.TCPEndpointType.value; + return _instance.type(); } - public String - toString() + public String toString() { return Network.addrToString(_proxy == null ? _addr : _proxy.getAddress()); } - public int - hashCode() + public int hashCode() { return _hashCode; } @@ -61,12 +57,10 @@ final class TcpConnector implements Connector // // Only for use by TcpEndpoint // - TcpConnector(Instance instance, java.net.InetSocketAddress addr, NetworkProxy proxy, int timeout, + TcpConnector(ProtocolInstance instance, java.net.InetSocketAddress addr, NetworkProxy proxy, int timeout, String connectionId) { _instance = instance; - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; _addr = addr; _proxy = proxy; _timeout = timeout; @@ -79,8 +73,7 @@ final class TcpConnector implements Connector _hashCode = IceInternal.HashUtil.hashAdd(_hashCode , _connectionId); } - public boolean - equals(java.lang.Object obj) + public boolean equals(java.lang.Object obj) { if(!(obj instanceof TcpConnector)) { @@ -106,9 +99,7 @@ final class TcpConnector implements Connector return Network.compareAddress(_addr, p._addr) == 0; } - private Instance _instance; - private TraceLevels _traceLevels; - private Ice.Logger _logger; + private ProtocolInstance _instance; private java.net.InetSocketAddress _addr; private NetworkProxy _proxy; private int _timeout; diff --git a/java/src/IceInternal/TcpEndpointFactory.java b/java/src/IceInternal/TcpEndpointFactory.java index 6d0df0e77c3..d50fb5c37c7 100644 --- a/java/src/IceInternal/TcpEndpointFactory.java +++ b/java/src/IceInternal/TcpEndpointFactory.java @@ -11,40 +11,42 @@ package IceInternal; final class TcpEndpointFactory implements EndpointFactory { - TcpEndpointFactory(Instance instance) + TcpEndpointFactory(ProtocolInstance instance) { _instance = instance; } - public short - type() + public short type() { - return Ice.TCPEndpointType.value; + return _instance.type(); } - public String - protocol() + public String protocol() { - return "tcp"; + return _instance.protocol(); } - public EndpointI - create(String str, boolean oaEndpoint) + public EndpointI create(java.util.ArrayList<String> args, boolean oaEndpoint) { - return new TcpEndpointI(_instance, str, oaEndpoint); + IPEndpointI endpt = new TcpEndpointI(_instance); + endpt.initWithOptions(args, oaEndpoint); + return endpt; } - public EndpointI - read(BasicStream s) + public EndpointI read(BasicStream s) { - return new TcpEndpointI(s); + return new TcpEndpointI(_instance, s); } - public void - destroy() + public void destroy() { _instance = null; } - private Instance _instance; + public EndpointFactory clone(ProtocolInstance instance) + { + return new TcpEndpointFactory(instance); + } + + private ProtocolInstance _instance; } diff --git a/java/src/IceInternal/TcpEndpointI.java b/java/src/IceInternal/TcpEndpointI.java index 9e6594b20ad..6fb7a7e9ce1 100644 --- a/java/src/IceInternal/TcpEndpointI.java +++ b/java/src/IceInternal/TcpEndpointI.java @@ -9,284 +9,61 @@ package IceInternal; -final class TcpEndpointI extends EndpointI +final class TcpEndpointI extends IPEndpointI { - public - TcpEndpointI(Instance instance, String ho, int po, int ti, String conId, boolean co) + public TcpEndpointI(ProtocolInstance instance, String ho, int po, int ti, String conId, boolean co) { - super(conId); - _instance = instance; - _host = ho; - _port = po; + super(instance, ho, po, conId); _timeout = ti; _compress = co; - calcHashValue(); } - public - TcpEndpointI(Instance instance, String str, boolean oaEndpoint) + public TcpEndpointI(ProtocolInstance instance) { - super(""); - _instance = instance; - _host = null; - _port = 0; + super(instance); _timeout = -1; _compress = false; - - String[] arr = str.split("[ \t\n\r]+"); - - int i = 0; - while(i < arr.length) - { - if(arr[i].length() == 0) - { - i++; - continue; - } - - String option = arr[i++]; - if(option.length() != 2 || option.charAt(0) != '-') - { - throw new Ice.EndpointParseException("expected an endpoint option but found `" + option + - "' in endpoint `tcp " + str + "'"); - } - - String argument = null; - if(i < arr.length && arr[i].charAt(0) != '-') - { - argument = arr[i++]; - if(argument.charAt(0) == '\"' && argument.charAt(argument.length() - 1) == '\"') - { - argument = argument.substring(1, argument.length() - 1); - } - } - - switch(option.charAt(1)) - { - case 'h': - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for -h option in endpoint `tcp " - + str + "'"); - } - - _host = argument; - break; - } - - case 'p': - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for -p option in endpoint `tcp " - + str + "'"); - } - - try - { - _port = Integer.parseInt(argument); - } - catch(NumberFormatException ex) - { - throw new Ice.EndpointParseException("invalid port value `" + argument + - "' in endpoint `tcp " + str + "'"); - } - - if(_port < 0 || _port > 65535) - { - throw new Ice.EndpointParseException("port value `" + argument + - "' out of range in endpoint `tcp " + str + "'"); - } - - break; - } - - case 't': - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for -t option in endpoint `tcp " - + str + "'"); - } - - try - { - _timeout = Integer.parseInt(argument); - } - catch(NumberFormatException ex) - { - throw new Ice.EndpointParseException("invalid timeout value `" + argument + - "' in endpoint `tcp " + str + "'"); - } - - break; - } - - case 'z': - { - if(argument != null) - { - throw new Ice.EndpointParseException("unexpected argument `" + argument + - "' provided for -z option in `tcp " + str + "'"); - } - - _compress = true; - break; - } - - default: - { - throw new Ice.EndpointParseException("unknown option `" + option + "' in `tcp " + str + "'"); - } - } - } - - if(_host == null) - { - _host = _instance.defaultsAndOverrides().defaultHost; - } - else if(_host.equals("*")) - { - if(oaEndpoint) - { - _host = null; - } - else - { - throw new Ice.EndpointParseException("`-h *' not valid for proxy endpoint `tcp " + str + "'"); - } - } - - if(_host == null) - { - _host = ""; - } - - calcHashValue(); } - public - TcpEndpointI(BasicStream s) + public TcpEndpointI(ProtocolInstance instance, BasicStream s) { - super(""); - _instance = s.instance(); - s.startReadEncaps(); - _host = s.readString(); - _port = s.readInt(); + super(instance, s); _timeout = s.readInt(); _compress = s.readBool(); - s.endReadEncaps(); - calcHashValue(); - } - - // - // Marshal the endpoint - // - public void - streamWrite(BasicStream s) - { - s.writeShort(Ice.TCPEndpointType.value); - s.startWriteEncaps(); - s.writeString(_host); - s.writeInt(_port); - s.writeInt(_timeout); - s.writeBool(_compress); - s.endWriteEncaps(); - } - - // - // Convert the endpoint to its string form - // - public String - _toString() - { - // - // WARNING: Certain features, such as proxy validation in Glacier2, - // depend on the format of proxy strings. Changes to toString() and - // methods called to generate parts of the reference string could break - // these features. Please review for all features that depend on the - // format of proxyToString() before changing this and related code. - // - String s = "tcp"; - - if(_host != null && _host.length() > 0) - { - s += " -h "; - boolean addQuote = _host.indexOf(':') != -1; - if(addQuote) - { - s += "\""; - } - s += _host; - if(addQuote) - { - s += "\""; - } - } - - s += " -p " + _port; - - if(_timeout != -1) - { - s += " -t " + _timeout; - } - if(_compress) - { - s += " -z"; - } - return s; } // // Return the endpoint information. // - public Ice.EndpointInfo - getInfo() + public Ice.EndpointInfo getInfo() { - return new Ice.TCPEndpointInfo(_timeout, _compress, _host, _port) + Ice.TCPEndpointInfo info = new Ice.TCPEndpointInfo() { public short type() { - return Ice.TCPEndpointType.value; + return TcpEndpointI.this.type(); } - + public boolean datagram() { - return false; + return TcpEndpointI.this.datagram(); } - + public boolean secure() { - return false; + return TcpEndpointI.this.secure(); } - }; - } - - // - // Return the endpoint type - // - public short - type() - { - return Ice.TCPEndpointType.value; - } + }; - // - // Return the protocol name - // - public String - protocol() - { - return "tcp"; + fillEndpointInfo(info); + return info; } // // Return the timeout for the endpoint in milliseconds. 0 means // non-blocking, -1 means no timeout. // - public int - timeout() + public int timeout() { return _timeout; } @@ -296,8 +73,7 @@ final class TcpEndpointI extends EndpointI // that timeouts are supported by the endpoint. Otherwise the same // endpoint is returned. // - public EndpointI - timeout(int timeout) + public EndpointI timeout(int timeout) { if(timeout == _timeout) { @@ -310,27 +86,10 @@ final class TcpEndpointI extends EndpointI } // - // Return a new endpoint with a different connection id. - // - public EndpointI - connectionId(String connectionId) - { - if(connectionId.equals(_connectionId)) - { - return this; - } - else - { - return new TcpEndpointI(_instance, _host, _port, _timeout, connectionId, _compress); - } - } - - // // Return true if the endpoints support bzip2 compress, or false // otherwise. // - public boolean - compress() + public boolean compress() { return _compress; } @@ -340,8 +99,7 @@ final class TcpEndpointI extends EndpointI // provided that compression is supported by the // endpoint. Otherwise the same endpoint is returned. // - public EndpointI - compress(boolean compress) + public EndpointI compress(boolean compress) { if(compress == _compress) { @@ -356,8 +114,7 @@ final class TcpEndpointI extends EndpointI // // Return true if the endpoint is datagram-based. // - public boolean - datagram() + public boolean datagram() { return false; } @@ -365,8 +122,7 @@ final class TcpEndpointI extends EndpointI // // Return true if the endpoint is secure. // - public boolean - secure() + public boolean secure() { return false; } @@ -378,103 +134,54 @@ final class TcpEndpointI extends EndpointI // "effective" endpoint, which might differ from this endpoint, // for example, if a dynamic port number is assigned. // - public Transceiver - transceiver(EndpointIHolder endpoint) + public Transceiver transceiver(EndpointIHolder endpoint) { endpoint.value = this; return null; } // - // Return connectors for this endpoint, or empty list if no connector - // is available. - // - public java.util.List<Connector> - connectors(Ice.EndpointSelectionType selType) - { - return _instance.endpointHostResolver().resolve(_host, _port, selType, this); - } - - public void - connectors_async(Ice.EndpointSelectionType selType, EndpointI_connectors callback) - { - _instance.endpointHostResolver().resolve(_host, _port, selType, this, callback); - } - - // // Return an acceptor for this endpoint, or null if no acceptors // is available. In case an acceptor is created, this operation // also returns a new "effective" endpoint, which might differ // from this endpoint, for example, if a dynamic port number is // assigned. // - public Acceptor - acceptor(EndpointIHolder endpoint, String adapterName) + public Acceptor acceptor(EndpointIHolder endpoint, String adapterName) { TcpAcceptor p = new TcpAcceptor(_instance, _host, _port); - endpoint.value = new TcpEndpointI(_instance, _host, p.effectivePort(), _timeout, _connectionId, _compress); + endpoint.value = createEndpoint(_host, p.effectivePort(), _connectionId); return p; } - // - // Expand endpoint out in to separate endpoints for each local - // host if listening on INADDR_ANY. - // - public java.util.List<EndpointI> - expand() + public String options() { - java.util.List<EndpointI> endps = new java.util.ArrayList<EndpointI>(); - java.util.List<String> hosts = Network.getHostsForEndpointExpand(_host, _instance.protocolSupport(), false); - if(hosts == null || hosts.isEmpty()) - { - endps.add(this); - } - else - { - for(String h : hosts) - { - endps.add(new TcpEndpointI(_instance, h, _port, _timeout, _connectionId, _compress)); - } - } - return endps; - } + // + // WARNING: Certain features, such as proxy validation in Glacier2, + // depend on the format of proxy strings. Changes to toString() and + // methods called to generate parts of the reference string could break + // these features. Please review for all features that depend on the + // format of proxyToString() before changing this and related code. + // + String s = super.options(); - // - // Check whether the endpoint is equivalent to another one. - // - public boolean - equivalent(EndpointI endpoint) - { - if(!(endpoint instanceof TcpEndpointI)) + if(_timeout != -1) { - return false; + s += " -t " + _timeout; } - TcpEndpointI tcpEndpointI = (TcpEndpointI)endpoint; - return tcpEndpointI._host.equals(_host) && tcpEndpointI._port == _port; - } - public java.util.List<Connector> - connectors(java.util.List<java.net.InetSocketAddress> addresses, NetworkProxy proxy) - { - java.util.List<Connector> connectors = new java.util.ArrayList<Connector>(); - for(java.net.InetSocketAddress p : addresses) + if(_compress) { - connectors.add(new TcpConnector(_instance, p, proxy, _timeout, _connectionId)); + s += " -z"; } - return connectors; - } - public int - hashCode() - { - return _hashCode; + return s; } // // Compare endpoints for sorting purposes // - public int - compareTo(EndpointI obj) // From java.lang.Comparable + public int compareTo(EndpointI obj) // From java.lang.Comparable { if(!(obj instanceof TcpEndpointI)) { @@ -486,23 +193,6 @@ final class TcpEndpointI extends EndpointI { return 0; } - else - { - int r = super.compareTo(p); - if(r != 0) - { - return r; - } - } - - if(_port < p._port) - { - return -1; - } - else if(p._port < _port) - { - return 1; - } if(_timeout < p._timeout) { @@ -522,26 +212,94 @@ final class TcpEndpointI extends EndpointI return 1; } - return _host.compareTo(p._host); + return super.compareTo(obj); } - private void - calcHashValue() + protected void streamWriteImpl(BasicStream s) { - int h = 5381; - h = IceInternal.HashUtil.hashAdd(h, Ice.TCPEndpointType.value); - h = IceInternal.HashUtil.hashAdd(h, _host); - h = IceInternal.HashUtil.hashAdd(h, _port); + super.streamWriteImpl(s); + s.writeInt(_timeout); + s.writeBool(_compress); + } + + protected int hashInit(int h) + { + h = super.hashInit(h); h = IceInternal.HashUtil.hashAdd(h, _timeout); - h = IceInternal.HashUtil.hashAdd(h, _connectionId); h = IceInternal.HashUtil.hashAdd(h, _compress); - _hashCode = h; + return h; + } + + protected void fillEndpointInfo(Ice.IPEndpointInfo info) + { + super.fillEndpointInfo(info); + if(info instanceof Ice.TCPEndpointInfo) + { + Ice.TCPEndpointInfo tcpInfo = (Ice.TCPEndpointInfo)info; + tcpInfo.timeout = _timeout; + tcpInfo.compress = _compress; + } + } + + protected boolean checkOption(String option, String argument, String endpoint) + { + if(super.checkOption(option, argument, endpoint)) + { + return true; + } + + switch(option.charAt(1)) + { + case 't': + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for -t option in endpoint " + endpoint); + } + + try + { + _timeout = Integer.parseInt(argument); + } + catch(NumberFormatException ex) + { + throw new Ice.EndpointParseException("invalid timeout value `" + argument + + "' in endpoint " + endpoint); + } + + return true; + } + + case 'z': + { + if(argument != null) + { + throw new Ice.EndpointParseException("unexpected argument `" + argument + + "' provided for -z option in " + endpoint); + } + + _compress = true; + + return true; + } + + default: + { + return false; + } + } + } + + protected Connector createConnector(java.net.InetSocketAddress addr, NetworkProxy proxy) + { + return new TcpConnector(_instance, addr, proxy, _timeout, _connectionId); + } + + protected IPEndpointI createEndpoint(String host, int port, String connectionId) + { + return new TcpEndpointI(_instance, host, port, _timeout, connectionId, _compress); } - private Instance _instance; - private String _host; - private int _port; private int _timeout; private boolean _compress; - private int _hashCode; } diff --git a/java/src/IceInternal/TcpTransceiver.java b/java/src/IceInternal/TcpTransceiver.java index 7122f8ff7cf..fd8bcc9ba54 100644 --- a/java/src/IceInternal/TcpTransceiver.java +++ b/java/src/IceInternal/TcpTransceiver.java @@ -11,15 +11,13 @@ package IceInternal; final class TcpTransceiver implements Transceiver { - public java.nio.channels.SelectableChannel - fd() + public java.nio.channels.SelectableChannel fd() { assert(_fd != null); return _fd; } - public int - initialize(Buffer readBuffer, Buffer writeBuffer) + public int initialize(Buffer readBuffer, Buffer writeBuffer, Ice.BooleanHolder moreData) { try { @@ -28,7 +26,7 @@ final class TcpTransceiver implements Transceiver _state = StateConnectPending; return SocketOperation.Connect; } - else if(_state == StateConnectPending) + else if(_state <= StateConnectPending) { Network.doFinishConnect(_fd); _desc = Network.fdToString(_fd, _proxy, _addr); @@ -44,7 +42,7 @@ final class TcpTransceiver implements Transceiver // // Write the proxy connection message. // - if(write(writeBuffer)) + if(write(writeBuffer) == SocketOperation.None) { // // Write completed without blocking. @@ -54,8 +52,7 @@ final class TcpTransceiver implements Transceiver // // Try to read the response. // - Ice.BooleanHolder dummy = new Ice.BooleanHolder(); - if(read(readBuffer, dummy)) + if(read(readBuffer, moreData) == SocketOperation.None) { // // Read completed without blocking - fall through. @@ -103,32 +100,38 @@ final class TcpTransceiver implements Transceiver } catch(Ice.LocalException ex) { - if(_traceLevels.network >= 2) + if(_instance.traceLevel() >= 2) { StringBuilder s = new StringBuilder(128); - s.append("failed to establish tcp connection\n"); + s.append("failed to establish " + _instance.protocol() + " connection\n"); s.append(Network.fdToString(_fd, _proxy, _addr)); - _logger.trace(_traceLevels.networkCat, s.toString()); + _instance.logger().trace(_instance.traceCategory(), s.toString()); } throw ex; } assert(_state == StateConnected); - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - String s = "tcp connection established\n" + _desc; - _logger.trace(_traceLevels.networkCat, s); + String s = _instance.protocol() + " connection established\n" + _desc; + _instance.logger().trace(_instance.traceCategory(), s); } return SocketOperation.None; } - public void - close() + public int closing(boolean initiator, Ice.LocalException ex) { - if(_state == StateConnected && _traceLevels.network >= 1) + // If we are initiating the connection closure, wait for the peer + // to close the TCP/IP connection. Otherwise, close immediately. + return initiator ? SocketOperation.Read : SocketOperation.None; + } + + public void close() + { + if(_state == StateConnected && _instance.traceLevel() >= 1) { - String s = "closing tcp connection\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); + String s = "closing " + _instance.protocol() + " connection\n" + toString(); + _instance.logger().trace(_instance.traceCategory(), s); } assert(_fd != null); @@ -147,22 +150,26 @@ final class TcpTransceiver implements Transceiver } @SuppressWarnings("deprecation") - public boolean - write(Buffer buf) + public int write(Buffer buf) { + final int size = buf.b.limit(); + int packetSize = size - buf.b.position(); + + if(packetSize == 0) + { + return SocketOperation.None; + } + // - // We don't want write to be called on android main thread as this will cause - // NetworkOnMainThreadException to be thrown. If that is the android main thread - // we return false and this method will be later called from the thread pool. + // We don't want write to be called on Android's main thread as this will cause + // NetworkOnMainThreadException to be thrown. If this is the Android main thread + // we return false and this method will be called later from the thread pool. // if(Util.isAndroidMainThread(Thread.currentThread())) { - return false; + return SocketOperation.Write; } - final int size = buf.b.limit(); - int packetSize = size - buf.b.position(); - // // Limit packet size to avoid performance problems on WIN32 // @@ -177,8 +184,8 @@ final class TcpTransceiver implements Transceiver try { assert(_fd != null); - int ret = _fd.write(buf.b); + int ret = _fd.write(buf.b); if(ret == -1) { throw new Ice.ConnectionLostException(); @@ -186,20 +193,21 @@ final class TcpTransceiver implements Transceiver else if(ret == 0) { // - // Writing would block, so we reset the limit (if necessary) and return false to indicate + // Writing would block, so we reset the limit (if necessary) and indicate // that more data must be sent. // if(packetSize == _maxSendPacketSize) { buf.b.limit(size); } - return false; + return SocketOperation.Write; } - if(_traceLevels.network >= 3) + if(_instance.traceLevel() >= 3) { - String s = "sent " + ret + " of " + packetSize + " bytes via tcp\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); + String s = "sent " + ret + " of " + packetSize + " bytes via " + _instance.protocol() + "\n" + + toString(); + _instance.logger().trace(_instance.traceCategory(), s); } if(packetSize == _maxSendPacketSize) @@ -222,15 +230,18 @@ final class TcpTransceiver implements Transceiver throw new Ice.SocketException(ex); } } - return true; + + return SocketOperation.None; } @SuppressWarnings("deprecation") - public boolean - read(Buffer buf, Ice.BooleanHolder moreData) + public int read(Buffer buf, Ice.BooleanHolder moreData) { int packetSize = buf.b.remaining(); - moreData.value = false; + if(packetSize == 0) + { + return SocketOperation.None; + } while(buf.b.hasRemaining()) { @@ -246,15 +257,16 @@ final class TcpTransceiver implements Transceiver if(ret == 0) { - return false; + return SocketOperation.Read; } if(ret > 0) { - if(_traceLevels.network >= 3) + if(_instance.traceLevel() >= 3) { - String s = "received " + ret + " of " + packetSize + " bytes via tcp\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); + String s = "received " + ret + " of " + packetSize + " bytes via " + _instance.protocol() + + "\n" + toString(); + _instance.logger().trace(_instance.traceCategory(), s); } } @@ -270,23 +282,20 @@ final class TcpTransceiver implements Transceiver } } - return true; + return SocketOperation.None; } - public String - type() + public String protocol() { - return "tcp"; + return _instance.protocol(); } - public String - toString() + public String toString() { return _desc; } - public Ice.ConnectionInfo - getInfo() + public Ice.ConnectionInfo getInfo() { Ice.TCPConnectionInfo info = new Ice.TCPConnectionInfo(); if(_fd != null) @@ -303,8 +312,7 @@ final class TcpTransceiver implements Transceiver return info; } - public void - checkSendSize(Buffer buf, int messageSizeMax) + public void checkSendSize(Buffer buf, int messageSizeMax) { if(buf.size() > messageSizeMax) { @@ -313,14 +321,13 @@ final class TcpTransceiver implements Transceiver } @SuppressWarnings("deprecation") - TcpTransceiver(Instance instance, java.nio.channels.SocketChannel fd, NetworkProxy proxy, + TcpTransceiver(ProtocolInstance instance, java.nio.channels.SocketChannel fd, NetworkProxy proxy, java.net.InetSocketAddress addr) { + _instance = instance; _fd = fd; _proxy = proxy; _addr = addr; - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; _state = StateNeedConnect; _desc = ""; @@ -341,11 +348,10 @@ final class TcpTransceiver implements Transceiver } @SuppressWarnings("deprecation") - TcpTransceiver(Instance instance, java.nio.channels.SocketChannel fd) + TcpTransceiver(ProtocolInstance instance, java.nio.channels.SocketChannel fd) { + _instance = instance; _fd = fd; - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; _state = StateConnected; _desc = Network.fdToString(_fd); @@ -382,14 +388,13 @@ final class TcpTransceiver implements Transceiver } } + private ProtocolInstance _instance; private java.nio.channels.SocketChannel _fd; private NetworkProxy _proxy; private java.net.InetSocketAddress _addr; - private TraceLevels _traceLevels; - private Ice.Logger _logger; - private String _desc; private int _state; + private String _desc; private int _maxSendPacketSize; private static final int StateNeedConnect = 0; diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 03e3713fe93..8ab584e9623 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -25,7 +25,7 @@ public final class ThreadPool } } } - + static final class FinishedWorkItem implements ThreadPoolWorkItem { public @@ -60,6 +60,16 @@ public final class ThreadPool private final EventHandlerThread _thread; } + static final class InterruptWorkItem implements ThreadPoolWorkItem + { + public void execute(ThreadPoolCurrent current) + { + // Nothing to do, this is just used to interrupt the thread pool selector. + } + } + + private static ThreadPoolWorkItem _interruptWorkItem = new InterruptWorkItem(); + // // Exception raised by the thread pool work queue when the thread pool // is destroyed. @@ -257,7 +267,33 @@ public final class ThreadPool update(EventHandler handler, int remove, int add) { assert(!_destroyed); + + // Don't remove what needs to be added + remove &= ~add; + + // Don't remove/add if already un-registered or registered + remove = handler._registered & remove; + add = ~handler._registered & add; + if(remove == add) + { + return; + } + _selector.update(handler, remove, add); + + if((add & SocketOperation.Read) != 0 && handler._hasMoreData.value && + (handler._disabled & SocketOperation.Read) == 0) + { + if(_pendingHandlers.isEmpty()) + { + _workQueue.queue(_interruptWorkItem); // Interrupt select() + } + _pendingHandlers.add(handler); + } + else if((remove & SocketOperation.Read) != 0) + { + _pendingHandlers.remove(handler); + } } public void @@ -312,6 +348,8 @@ public final class ThreadPool { ThreadPoolCurrent current = new ThreadPoolCurrent(_instance, this, thread); boolean select = false; + java.util.List<EventHandlerOpPair> handlers = new java.util.ArrayList<EventHandlerOpPair>(); + while(true) { if(current._handler != null) @@ -335,7 +373,7 @@ public final class ThreadPool { try { - _selector.select(_serverIdleTime); + _selector.select(handlers, _serverIdleTime); } catch(Selector.TimeoutException ex) { @@ -356,8 +394,25 @@ public final class ThreadPool { if(select) { - _selector.finishSelect(_handlers, _serverIdleTime); + java.util.List<EventHandlerOpPair> tmp = _handlers; + _handlers = handlers; + handlers = tmp; + + if(!_pendingHandlers.isEmpty()) + { + for(EventHandlerOpPair pair : _handlers) + { + _pendingHandlers.remove(pair.handler); + } + for(EventHandler p : _pendingHandlers) + { + _handlers.add(new EventHandlerOpPair(p, SocketOperation.Read)); + } + _pendingHandlers.clear(); + } + _nextHandler = _handlers.iterator(); + _selector.finishSelect(); select = false; } else if(!current._leader && followerWait(current)) @@ -375,9 +430,14 @@ public final class ThreadPool // --_inUseIO; - if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) + if(current._handler._hasMoreData.value && + (current._handler._registered & SocketOperation.Read) != 0) { - _selector.hasMoreData(current._handler); + if(_pendingHandlers.isEmpty()) + { + _workQueue.queue(_interruptWorkItem); + } + _pendingHandlers.add(current._handler); } } else @@ -386,7 +446,19 @@ public final class ThreadPool // If the handler called ioCompleted(), we re-enable the handler in // case it was disabled and we decrease the number of thread in use. // - _selector.enable(current._handler, current.operation); + if(_serialize) + { + _selector.enable(current._handler, current.operation); + if(current._handler._hasMoreData.value && + (current._handler._registered & SocketOperation.Read) != 0) + { + if(_pendingHandlers.isEmpty()) + { + _workQueue.queue(_interruptWorkItem); // Interrupt select() + } + _pendingHandlers.add(current._handler); + } + } assert(_inUse > 0); --_inUse; } @@ -396,20 +468,34 @@ public final class ThreadPool return; // Wait timed-out. } } - else if(!current._ioCompleted && - (current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) + else if(current._handler._hasMoreData.value && + (current._handler._registered & SocketOperation.Read) != 0) { - _selector.hasMoreData(current._handler); + if(_pendingHandlers.isEmpty()) + { + _workQueue.queue(_interruptWorkItem); // Interrupt select() + } + _pendingHandlers.add(current._handler); } // // Get the next ready handler. // - if(_nextHandler.hasNext()) + EventHandlerOpPair next = null; + while(_nextHandler.hasNext()) + { + EventHandlerOpPair n = _nextHandler.next(); + if((n.op & n.handler._registered) != 0) + { + next = n; + break; + } + } + if(next != null) { current._ioCompleted = false; - current._handler = _nextHandler.next(); - current.operation = current._handler._ready; + current._handler = next.handler; + current.operation = next.op; thread.setState(Ice.Instrumentation.ThreadState.ThreadStateInUseForIO); } else @@ -431,6 +517,7 @@ public final class ThreadPool } else { + _handlers.clear(); _selector.startSelect(); select = true; thread.setState(Ice.Instrumentation.ThreadState.ThreadStateIdle); @@ -462,17 +549,29 @@ public final class ThreadPool if(_sizeMax > 1) { --_inUseIO; - - if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) + + if(!_destroyed) { - _selector.hasMoreData(current._handler); + if(_serialize) + { + _selector.disable(current._handler, current.operation); + + // Make sure the handler isn't in the set of pending handlers (this can + // for example occur if the handler is has more data and its added by + // ThreadPool::update while we were processing IO). + _pendingHandlers.remove(current._handler); + } + else if(current._handler._hasMoreData.value && + (current._handler._registered & SocketOperation.Read) != 0) + { + if(_pendingHandlers.isEmpty()) + { + _workQueue.queue(_interruptWorkItem); // Interrupt select() + } + _pendingHandlers.add(current._handler); + } } - - if(_serialize && !_destroyed) - { - _selector.disable(current._handler, current.operation); - } - + if(current._leader) { // @@ -527,10 +626,6 @@ public final class ThreadPool } } } - else if((current.operation & SocketOperation.Read) != 0 && current._handler.hasMoreData()) - { - _selector.hasMoreData(current._handler); - } } private synchronized void @@ -563,7 +658,7 @@ public final class ThreadPool // // Wait to be promoted and for all the IO threads to be done. // - while(!_promote || _inUseIO == _sizeIO || !_nextHandler.hasNext() && _inUseIO > 0) + while(!_promote || _inUseIO == _sizeIO || (!_nextHandler.hasNext() && _inUseIO > 0)) { try { @@ -740,8 +835,9 @@ public final class ThreadPool private int _inUse; // Number of threads that are currently in use. private int _inUseIO; // Number of threads that are currently performing IO. - private java.util.List<EventHandler> _handlers = new java.util.ArrayList<EventHandler>(); - private java.util.Iterator<EventHandler> _nextHandler; + private java.util.List<EventHandlerOpPair> _handlers = new java.util.ArrayList<EventHandlerOpPair>(); + private java.util.Iterator<EventHandlerOpPair> _nextHandler; + private java.util.HashSet<EventHandler> _pendingHandlers = new java.util.HashSet<EventHandler>(); private boolean _promote; } diff --git a/java/src/IceInternal/ThreadPoolWorkQueue.java b/java/src/IceInternal/ThreadPoolWorkQueue.java index eab2b4d2b88..ba3c19eaea8 100644 --- a/java/src/IceInternal/ThreadPoolWorkQueue.java +++ b/java/src/IceInternal/ThreadPoolWorkQueue.java @@ -149,12 +149,6 @@ final class ThreadPoolWorkQueue extends EventHandler { return (java.nio.channels.SelectableChannel)_fdIntrRead; } - - public boolean - hasMoreData() - { - return false; - } public void postMessage() diff --git a/java/src/IceInternal/Transceiver.java b/java/src/IceInternal/Transceiver.java index 329ff3e1386..ad372615208 100644 --- a/java/src/IceInternal/Transceiver.java +++ b/java/src/IceInternal/Transceiver.java @@ -13,35 +13,14 @@ public interface Transceiver { java.nio.channels.SelectableChannel fd(); - // - // Initialize the transceiver. - // - // Returns the status if the initialize operation. - // - int initialize(Buffer readBuffer, Buffer writeBuffer); - + int initialize(Buffer readBuffer, Buffer writeBuffer, Ice.BooleanHolder moreData); + int closing(boolean initiator, Ice.LocalException ex); void close(); - // - // Write data. - // - // Returns true if all the data was written, false otherwise. - // - boolean write(Buffer buf); - - // - // Read data. - // - // Returns true if all the requested data was read, false otherwise. - // - // NOTE: In Java, read() returns a boolean in moreData to indicate - // whether the transceiver has read more data than requested. - // If moreData is true, read should be called again without - // calling select on the FD. - // - boolean read(Buffer buf, Ice.BooleanHolder moreData); + int write(Buffer buf); + int read(Buffer buf, Ice.BooleanHolder moreData); - String type(); + String protocol(); String toString(); Ice.ConnectionInfo getInfo(); void checkSendSize(Buffer buf, int messageSizeMax); diff --git a/java/src/IceInternal/UdpConnector.java b/java/src/IceInternal/UdpConnector.java index b70e1b4c8ec..054014efc2c 100644 --- a/java/src/IceInternal/UdpConnector.java +++ b/java/src/IceInternal/UdpConnector.java @@ -11,33 +11,28 @@ package IceInternal; final class UdpConnector implements Connector { - public Transceiver - connect() + public Transceiver connect() { return new UdpTransceiver(_instance, _addr, _mcastInterface, _mcastTtl); } - public java.nio.channels.SelectableChannel - fd() + public java.nio.channels.SelectableChannel fd() { assert(false); // Shouldn't be called, startConnect always completes immediately. return null; } - public short - type() + public short type() { - return Ice.UDPEndpointType.value; + return _instance.type(); } - public String - toString() + public String toString() { return Network.addrToString(_addr); } - public int - hashCode() + public int hashCode() { return _hashCode; } @@ -45,7 +40,7 @@ final class UdpConnector implements Connector // // Only for use by TcpEndpoint // - UdpConnector(Instance instance, java.net.InetSocketAddress addr, String mcastInterface, int mcastTtl, + UdpConnector(ProtocolInstance instance, java.net.InetSocketAddress addr, String mcastInterface, int mcastTtl, String connectionId) { _instance = instance; @@ -62,8 +57,7 @@ final class UdpConnector implements Connector _hashCode = IceInternal.HashUtil.hashAdd(_hashCode , _connectionId); } - public boolean - equals(java.lang.Object obj) + public boolean equals(java.lang.Object obj) { if(!(obj instanceof UdpConnector)) { @@ -92,9 +86,9 @@ final class UdpConnector implements Connector } return Network.compareAddress(_addr, p._addr) == 0; - } + } - private Instance _instance; + private ProtocolInstance _instance; private java.net.InetSocketAddress _addr; private String _mcastInterface; private int _mcastTtl; diff --git a/java/src/IceInternal/UdpEndpointFactory.java b/java/src/IceInternal/UdpEndpointFactory.java index e45e12d2ea6..8e4301d358b 100644 --- a/java/src/IceInternal/UdpEndpointFactory.java +++ b/java/src/IceInternal/UdpEndpointFactory.java @@ -11,40 +11,42 @@ package IceInternal; final class UdpEndpointFactory implements EndpointFactory { - UdpEndpointFactory(Instance instance) + UdpEndpointFactory(ProtocolInstance instance) { _instance = instance; } - public short - type() + public short type() { - return Ice.UDPEndpointType.value; + return _instance.type(); } - public String - protocol() + public String protocol() { - return "udp"; + return _instance.protocol(); } - public EndpointI - create(String str, boolean oaEndpoint) + public EndpointI create(java.util.ArrayList<String> args, boolean oaEndpoint) { - return new UdpEndpointI(_instance, str, oaEndpoint); + IPEndpointI endpt = new UdpEndpointI(_instance); + endpt.initWithOptions(args, oaEndpoint); + return endpt; } - public EndpointI - read(BasicStream s) + public EndpointI read(BasicStream s) { - return new UdpEndpointI(s); + return new UdpEndpointI(_instance, s); } - public void - destroy() + public void destroy() { _instance = null; } - private Instance _instance; + public EndpointFactory clone(ProtocolInstance instance) + { + return new UdpEndpointFactory(instance); + } + + private ProtocolInstance _instance; } diff --git a/java/src/IceInternal/UdpEndpointI.java b/java/src/IceInternal/UdpEndpointI.java index 24a0fcaf29f..43b5ba48d50 100644 --- a/java/src/IceInternal/UdpEndpointI.java +++ b/java/src/IceInternal/UdpEndpointI.java @@ -9,208 +9,28 @@ package IceInternal; -final class UdpEndpointI extends EndpointI +final class UdpEndpointI extends IPEndpointI { - public - UdpEndpointI(Instance instance, String ho, int po, String mif, int mttl, boolean conn, String conId, boolean co) + public UdpEndpointI(ProtocolInstance instance, String ho, int po, String mif, int mttl, boolean conn, String conId, + boolean co) { - super(conId); - _instance = instance; - _host = ho; - _port = po; + super(instance, ho, po, conId); _mcastInterface = mif; _mcastTtl = mttl; _connect = conn; _compress = co; - calcHashValue(); } - public - UdpEndpointI(Instance instance, String str, boolean oaEndpoint) + public UdpEndpointI(ProtocolInstance instance) { - super(""); - _instance = instance; - _host = null; - _port = 0; + super(instance); _connect = false; _compress = false; - - String[] arr = str.split("[ \t\n\r]+"); - - int i = 0; - while(i < arr.length) - { - if(arr[i].length() == 0) - { - i++; - continue; - } - - String option = arr[i++]; - if(option.charAt(0) != '-') - { - throw new Ice.EndpointParseException("expected an endpoint option but found `" + option + - "' in endpoint `udp " + str + "'"); - } - - String argument = null; - if(i < arr.length && arr[i].charAt(0) != '-') - { - argument = arr[i++]; - if(argument.charAt(0) == '\"' && argument.charAt(argument.length() - 1) == '\"') - { - argument = argument.substring(1, argument.length() - 1); - } - } - - if(option.equals("-h")) - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for -h option in endpoint `udp " - + str + "'"); - } - - _host = argument; - } - else if(option.equals("-p")) - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for -p option in endpoint `udp " - + str + "'"); - } - - try - { - _port = Integer.parseInt(argument); - } - catch(NumberFormatException ex) - { - throw new Ice.EndpointParseException("invalid port value `" + argument + "' in endpoint `udp " + - str + "'"); - } - - if(_port < 0 || _port > 65535) - { - throw new Ice.EndpointParseException("port value `" + argument + - "' out of range in endpoint `udp " + str + "'"); - } - } - else if(option.equals("-c")) - { - if(argument != null) - { - throw new Ice.EndpointParseException("unexpected argument `" + argument + - "' provided for -c option in `udp " + str + "'"); - } - - _connect = true; - } - else if(option.equals("-z")) - { - if(argument != null) - { - throw new Ice.EndpointParseException("unexpected argument `" + argument + - "' provided for -z option in `udp " + str + "'"); - } - - _compress = true; - } - else if(option.equals("-v") || option.equals("-e")) - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for " + option + " option in endpoint " + - "`udp " + str + "'"); - } - - try - { - Ice.EncodingVersion v = Ice.Util.stringToEncodingVersion(argument); - if(v.major != 1 || v.minor != 0) - { - _instance.initializationData().logger.warning("deprecated udp endpoint option: " + option); - } - } - catch(Ice.VersionParseException e) - { - throw new Ice.EndpointParseException("invalid version `" + argument + "' in endpoint `udp " + - str + "':\n" + e.str); - } - } - else if(option.equals("--interface")) - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for --interface option in endpoint `udp " - + str + "'"); - } - - _mcastInterface = argument; - } - else if(option.equals("--ttl")) - { - if(argument == null) - { - throw new Ice.EndpointParseException("no argument provided for --ttl option in endpoint `udp " - + str + "'"); - } - - try - { - _mcastTtl = Integer.parseInt(argument); - } - catch(NumberFormatException ex) - { - throw new Ice.EndpointParseException("invalid TTL value `" + argument + "' in endpoint `udp " + - str + "'"); - } - - if(_mcastTtl < 0) - { - throw new Ice.EndpointParseException("TTL value `" + argument + - "' out of range in endpoint `udp " + str + "'"); - } - } - else - { - throw new Ice.EndpointParseException("unknown option `" + option + "' in `udp " + str + "'"); - } - } - - if(_host == null) - { - _host = _instance.defaultsAndOverrides().defaultHost; - } - else if(_host.equals("*")) - { - if(oaEndpoint) - { - _host = null; - } - else - { - throw new Ice.EndpointParseException("`-h *' not valid for proxy endpoint `udp " + str + "'"); - } - } - - if(_host == null) - { - _host = ""; - } - - calcHashValue(); } - public - UdpEndpointI(BasicStream s) + public UdpEndpointI(ProtocolInstance instance, BasicStream s) { - super(""); - _instance = s.instance(); - s.startReadEncaps(); - _host = s.readString(); - _port = s.readInt(); + super(instance, s); if(s.getReadEncoding().equals(Ice.Util.Encoding_1_0)) { s.readByte(); @@ -222,145 +42,59 @@ final class UdpEndpointI extends EndpointI //_connect = s.readBool(); _connect = false; _compress = s.readBool(); - s.endReadEncaps(); - calcHashValue(); - } - - // - // Marshal the endpoint - // - public void - streamWrite(BasicStream s) - { - s.writeShort(Ice.UDPEndpointType.value); - s.startWriteEncaps(); - s.writeString(_host); - s.writeInt(_port); - if(s.getWriteEncoding().equals(Ice.Util.Encoding_1_0)) - { - Ice.Util.Protocol_1_0.__write(s); - Ice.Util.Encoding_1_0.__write(s); - } - // Not transmitted. - //s.writeBool(_connect); - s.writeBool(_compress); - s.endWriteEncaps(); - } - - // - // Convert the endpoint to its string form - // - public String - _toString() - { - // - // WARNING: Certain features, such as proxy validation in Glacier2, - // depend on the format of proxy strings. Changes to toString() and - // methods called to generate parts of the reference string could break - // these features. Please review for all features that depend on the - // format of proxyToString() before changing this and related code. - // - String s = "udp"; - - if(_host != null && _host.length() > 0) - { - s += " -h "; - boolean addQuote = _host.indexOf(':') != -1; - if(addQuote) - { - s += "\""; - } - s += _host; - if(addQuote) - { - s += "\""; - } - } - - s += " -p " + _port; - - if(_mcastInterface.length() != 0) - { - s += " --interface " + _mcastInterface; - } - - if(_mcastTtl != -1) - { - s += " --ttl " + _mcastTtl; - } - - if(_connect) - { - s += " -c"; - } - - if(_compress) - { - s += " -z"; - } - - return s; } // // Return the endpoint information. // - public Ice.EndpointInfo - getInfo() + public Ice.EndpointInfo getInfo() { - return new Ice.UDPEndpointInfo(-1, _compress, _host, _port, _mcastInterface, _mcastTtl) + Ice.UDPEndpointInfo info = new Ice.UDPEndpointInfo() { public short type() { - return Ice.UDPEndpointType.value; + return UdpEndpointI.this.type(); } - + public boolean datagram() { - return true; + return UdpEndpointI.this.datagram(); } - + public boolean secure() { - return false; + return UdpEndpointI.this.secure(); } }; - } - // - // Return the endpoint type - // - public short - type() - { - return Ice.UDPEndpointType.value; + fillEndpointInfo(info); + return info; } // - // Return the protocol name + // Return the timeout for the endpoint in milliseconds. 0 means + // non-blocking, -1 means no timeout. // - public String - protocol() + public int timeout() { - return "udp"; + return -1; } // - // Return the timeout for the endpoint in milliseconds. 0 means - // non-blocking, -1 means no timeout. + // Return a new endpoint with a different timeout value, provided + // that timeouts are supported by the endpoint. Otherwise the same + // endpoint is returned. // - public int - timeout() + public EndpointI timeout(int timeout) { - return -1; + return this; } // // Return true if the endpoints support bzip2 compress, or false // otherwise. // - public boolean - compress() + public boolean compress() { return _compress; } @@ -370,8 +104,7 @@ final class UdpEndpointI extends EndpointI // provided that compression is supported by the // endpoint. Otherwise the same endpoint is returned. // - public EndpointI - compress(boolean compress) + public EndpointI compress(boolean compress) { if(compress == _compress) { @@ -385,38 +118,9 @@ final class UdpEndpointI extends EndpointI } // - // Return a new endpoint with a different connection id. - // - public EndpointI - connectionId(String connectionId) - { - if(connectionId.equals(_connectionId)) - { - return this; - } - else - { - return new UdpEndpointI(_instance, _host, _port, _mcastInterface, _mcastTtl, _connect, connectionId, - _compress); - } - } - - // - // Return a new endpoint with a different timeout value, provided - // that timeouts are supported by the endpoint. Otherwise the same - // endpoint is returned. - // - public EndpointI - timeout(int timeout) - { - return this; - } - - // // Return true if the endpoint is datagram-based. // - public boolean - datagram() + public boolean datagram() { return true; } @@ -424,8 +128,7 @@ final class UdpEndpointI extends EndpointI // // Return true if the endpoint is secure. // - public boolean - secure() + public boolean secure() { return false; } @@ -437,103 +140,64 @@ final class UdpEndpointI extends EndpointI // "effective" endpoint, which might differ from this endpoint, // for example, if a dynamic port number is assigned. // - public Transceiver - transceiver(EndpointIHolder endpoint) + public Transceiver transceiver(EndpointIHolder endpoint) { UdpTransceiver p = new UdpTransceiver(_instance, _host, _port, _mcastInterface, _connect); - endpoint.value = new UdpEndpointI(_instance, _host, p.effectivePort(), _mcastInterface, _mcastTtl, - _connect, _connectionId, _compress); + endpoint.value = createEndpoint(_host, p.effectivePort(), _connectionId); return p; } // - // Return connectors for this endpoint, or empty list if no connector - // is available. - // - public java.util.List<Connector> - connectors(Ice.EndpointSelectionType selType) - { - return _instance.endpointHostResolver().resolve(_host, _port, selType, this); - } - - public void - connectors_async(Ice.EndpointSelectionType selType, EndpointI_connectors callback) - { - _instance.endpointHostResolver().resolve(_host, _port, selType, this, callback); - } - - // // Return an acceptor for this endpoint, or null if no acceptors // is available. In case an acceptor is created, this operation // also returns a new "effective" endpoint, which might differ // from this endpoint, for example, if a dynamic port number is // assigned. // - public Acceptor - acceptor(EndpointIHolder endpoint, String adapterName) + public Acceptor acceptor(EndpointIHolder endpoint, String adapterName) { endpoint.value = this; return null; } // - // Expand endpoint out in to separate endpoints for each local - // host if listening on INADDR_ANY. + // Convert the endpoint to its string form // - public java.util.List<EndpointI> - expand() + public String options() { - java.util.ArrayList<EndpointI> endps = new java.util.ArrayList<EndpointI>(); - java.util.ArrayList<String> hosts = - Network.getHostsForEndpointExpand(_host, _instance.protocolSupport(), false); - if(hosts == null || hosts.isEmpty()) + // + // WARNING: Certain features, such as proxy validation in Glacier2, + // depend on the format of proxy strings. Changes to toString() and + // methods called to generate parts of the reference string could break + // these features. Please review for all features that depend on the + // format of proxyToString() before changing this and related code. + // + String s = super.options(); + + if(_mcastInterface.length() != 0) { - endps.add(this); + s += " --interface " + _mcastInterface; } - else + + if(_mcastTtl != -1) { - for(String host : hosts) - { - endps.add(new UdpEndpointI(_instance, host, _port, _mcastInterface, _mcastTtl, _connect, _connectionId, - _compress)); - } + s += " --ttl " + _mcastTtl; } - return endps; - } - // - // Check whether the endpoint is equivalent to another one. - // - public boolean - equivalent(EndpointI endpoint) - { - if(!(endpoint instanceof UdpEndpointI)) + if(_connect) { - return false; + s += " -c"; } - UdpEndpointI udpEndpointI = (UdpEndpointI)endpoint; - return udpEndpointI._host.equals(_host) && udpEndpointI._port == _port; - } - public java.util.List<Connector> - connectors(java.util.List<java.net.InetSocketAddress> addresses, NetworkProxy proxy) - { - java.util.ArrayList<Connector> connectors = new java.util.ArrayList<Connector>(); - for(java.net.InetSocketAddress p : addresses) + if(_compress) { - connectors.add(new UdpConnector(_instance, p, _mcastInterface, _mcastTtl, _connectionId)); + s += " -z"; } - return connectors; - } - public int - hashCode() - { - return _hashCode; + return s; } - public int - compareTo(EndpointI obj) // From java.lang.Comparable + public int compareTo(EndpointI obj) // From java.lang.Comparable { if(!(obj instanceof UdpEndpointI)) { @@ -545,23 +209,6 @@ final class UdpEndpointI extends EndpointI { return 0; } - else - { - int r = super.compareTo(p); - if(r != 0) - { - return r; - } - } - - if(_port < p._port) - { - return -1; - } - else if(p._port < _port) - { - return 1; - } if(!_connect && p._connect) { @@ -596,30 +243,148 @@ final class UdpEndpointI extends EndpointI return rc; } - return _host.compareTo(p._host); + return super.compareTo(obj); } - private void - calcHashValue() + // + // Marshal the endpoint + // + protected void streamWriteImpl(BasicStream s) { - int h = 5381; - h = IceInternal.HashUtil.hashAdd(h, Ice.UDPEndpointType.value); - h = IceInternal.HashUtil.hashAdd(h, _host); - h = IceInternal.HashUtil.hashAdd(h, _port); + super.streamWriteImpl(s); + if(s.getWriteEncoding().equals(Ice.Util.Encoding_1_0)) + { + Ice.Util.Protocol_1_0.__write(s); + Ice.Util.Encoding_1_0.__write(s); + } + // Not transmitted. + //s.writeBool(_connect); + s.writeBool(_compress); + } + + protected int hashInit(int h) + { + h = super.hashInit(h); h = IceInternal.HashUtil.hashAdd(h, _mcastInterface); h = IceInternal.HashUtil.hashAdd(h, _mcastTtl); h = IceInternal.HashUtil.hashAdd(h, _connect); - h = IceInternal.HashUtil.hashAdd(h, _connectionId); h = IceInternal.HashUtil.hashAdd(h, _compress); - _hashCode = h; + return h; + } + + protected void fillEndpointInfo(Ice.IPEndpointInfo info) + { + super.fillEndpointInfo(info); + if(info instanceof Ice.UDPEndpointInfo) + { + Ice.UDPEndpointInfo udpInfo = (Ice.UDPEndpointInfo)info; + udpInfo.timeout = -1; + udpInfo.compress = _compress; + udpInfo.mcastInterface = _mcastInterface; + udpInfo.mcastTtl = _mcastTtl; + } + } + + protected boolean checkOption(String option, String argument, String endpoint) + { + if(super.checkOption(option, argument, endpoint)) + { + return true; + } + + if(option.equals("-c")) + { + if(argument != null) + { + throw new Ice.EndpointParseException("unexpected argument `" + argument + + "' provided for -c option in " + endpoint); + } + + _connect = true; + } + else if(option.equals("-z")) + { + if(argument != null) + { + throw new Ice.EndpointParseException("unexpected argument `" + argument + + "' provided for -z option in " + endpoint); + } + + _compress = true; + } + else if(option.equals("-v") || option.equals("-e")) + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for " + option + " option in endpoint " + + endpoint); + } + + try + { + Ice.EncodingVersion v = Ice.Util.stringToEncodingVersion(argument); + if(v.major != 1 || v.minor != 0) + { + _instance.logger().warning("deprecated udp endpoint option: " + option); + } + } + catch(Ice.VersionParseException e) + { + throw new Ice.EndpointParseException("invalid version `" + argument + "' in endpoint " + + endpoint + ":\n" + e.str); + } + } + else if(option.equals("--interface")) + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for --interface option in endpoint " + + endpoint); + } + + _mcastInterface = argument; + } + else if(option.equals("--ttl")) + { + if(argument == null) + { + throw new Ice.EndpointParseException("no argument provided for --ttl option in endpoint " + endpoint); + } + + try + { + _mcastTtl = Integer.parseInt(argument); + } + catch(NumberFormatException ex) + { + throw new Ice.EndpointParseException("invalid TTL value `" + argument + "' in endpoint " + endpoint); + } + + if(_mcastTtl < 0) + { + throw new Ice.EndpointParseException("TTL value `" + argument + "' out of range in endpoint " + + endpoint); + } + } + else + { + return false; + } + return true; + } + + protected Connector createConnector(java.net.InetSocketAddress addr, NetworkProxy proxy) + { + return new UdpConnector(_instance, addr, _mcastInterface, _mcastTtl, _connectionId); + } + + protected IPEndpointI createEndpoint(String host, int port, String connectionId) + { + return new UdpEndpointI(_instance, host, port, _mcastInterface, _mcastTtl, _connect, connectionId, _compress); } - private Instance _instance; - private String _host; - private int _port; private String _mcastInterface = ""; private int _mcastTtl = -1; private boolean _connect; private boolean _compress; - private int _hashCode; } diff --git a/java/src/IceInternal/UdpTransceiver.java b/java/src/IceInternal/UdpTransceiver.java index 96174b95c26..15f1683cf4e 100644 --- a/java/src/IceInternal/UdpTransceiver.java +++ b/java/src/IceInternal/UdpTransceiver.java @@ -11,15 +11,13 @@ package IceInternal; final class UdpTransceiver implements Transceiver { - public java.nio.channels.SelectableChannel - fd() + public java.nio.channels.SelectableChannel fd() { assert(_fd != null); return _fd; } - public int - initialize(Buffer readBuffer, Buffer writeBuffer) + public int initialize(Buffer readBuffer, Buffer writeBuffer, Ice.BooleanHolder moreData) { // // Nothing to do. @@ -27,17 +25,24 @@ final class UdpTransceiver implements Transceiver return SocketOperation.None; } - public void - close() + public int closing(boolean initiator, Ice.LocalException ex) + { + // + // Nothing to do. + // + return SocketOperation.None; + } + + public void close() { assert(_fd != null); - - if(_state >= StateConnected && _traceLevels.network >= 1) + + if(_state >= StateConnected && _instance.traceLevel() >= 1) { - String s = "closing udp connection\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); + String s = "closing " + _instance.protocol() + " connection\n" + toString(); + _instance.logger().trace(_instance.traceCategory(), s); } - + try { _fd.close(); @@ -48,9 +53,8 @@ final class UdpTransceiver implements Transceiver _fd = null; } - @SuppressWarnings("deprecation") - public boolean - write(Buffer buf) + @SuppressWarnings("deprecation") + public int write(Buffer buf) { // // We don't want write or send to be called on android main thread as this will cause @@ -59,12 +63,12 @@ final class UdpTransceiver implements Transceiver // if(Util.isAndroidMainThread(Thread.currentThread())) { - return false; + return SocketOperation.Write; } assert(buf.b.position() == 0); assert(_fd != null && _state >= StateConnected); - + // The caller is supposed to check the send size before by calling checkSendSize assert(java.lang.Math.min(_maxPacketSize, _sndSize - _udpOverhead) >= buf.size()); @@ -107,25 +111,23 @@ final class UdpTransceiver implements Transceiver if(ret == 0) { - return false; + return SocketOperation.Write; } - if(_traceLevels.network >= 3) + if(_instance.traceLevel() >= 3) { - String s = "sent " + ret + " bytes via udp\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); + String s = "sent " + ret + " bytes via " + _instance.protocol() + "\n" + toString(); + _instance.logger().trace(_instance.traceCategory(), s); } assert(ret == buf.b.limit()); - return true; + return SocketOperation.None; } - + @SuppressWarnings("deprecation") - public boolean - read(Buffer buf, Ice.BooleanHolder moreData) + public int read(Buffer buf, Ice.BooleanHolder moreData) { assert(buf.b.position() == 0); - moreData.value = false; final int packetSize = java.lang.Math.min(_maxPacketSize, _rcvSize - _udpOverhead); buf.resize(packetSize, true); @@ -139,7 +141,7 @@ final class UdpTransceiver implements Transceiver java.net.SocketAddress peerAddr = _fd.receive(buf.b); if(peerAddr == null || buf.b.position() == 0) { - return false; + return SocketOperation.Read; } _peerAddr = (java.net.InetSocketAddress)peerAddr; @@ -172,33 +174,31 @@ final class UdpTransceiver implements Transceiver Network.doConnect(_fd, _peerAddr); _state = StateConnected; - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - String s = "connected udp socket\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); + String s = "connected " + _instance.protocol() + " socket\n" + toString(); + _instance.logger().trace(_instance.traceCategory(), s); } } - if(_traceLevels.network >= 3) + if(_instance.traceLevel() >= 3) { - String s = "received " + ret + " bytes via udp\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); + String s = "received " + ret + " bytes via " + _instance.protocol() + "\n" + toString(); + _instance.logger().trace(_instance.traceCategory(), s); } buf.resize(ret, true); buf.b.position(ret); - return true; + return SocketOperation.None; } - public String - type() + public String protocol() { - return "udp"; + return _instance.protocol(); } - public String - toString() + public String toString() { if(_fd == null) { @@ -227,8 +227,7 @@ final class UdpTransceiver implements Transceiver return s; } - public Ice.ConnectionInfo - getInfo() + public Ice.ConnectionInfo getInfo() { Ice.UDPConnectionInfo info = new Ice.UDPConnectionInfo(); if(_fd != null) @@ -261,16 +260,15 @@ final class UdpTransceiver implements Transceiver return info; } - public void - checkSendSize(Buffer buf, int messageSizeMax) + public void checkSendSize(Buffer buf, int messageSizeMax) { if(buf.size() > messageSizeMax) { Ex.throwMemoryLimitException(buf.size(), messageSizeMax); } - + // - // The maximum packetSize is either the maximum allowable UDP packet size, or + // The maximum packetSize is either the maximum allowable UDP packet size, or // the UDP send buffer size (which ever is smaller). // final int packetSize = java.lang.Math.min(_maxPacketSize, _sndSize - _udpOverhead); @@ -280,8 +278,7 @@ final class UdpTransceiver implements Transceiver } } - public final int - effectivePort() + public final int effectivePort() { return _addr.getPort(); } @@ -290,17 +287,16 @@ final class UdpTransceiver implements Transceiver // Only for use by UdpEndpoint // @SuppressWarnings("deprecation") - UdpTransceiver(Instance instance, java.net.InetSocketAddress addr, String mcastInterface, int mcastTtl) + UdpTransceiver(ProtocolInstance instance, java.net.InetSocketAddress addr, String mcastInterface, int mcastTtl) { - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; + _instance = instance; _state = StateNeedConnect; _addr = addr; try { _fd = Network.createUdpSocket(_addr); - setBufSize(instance); + setBufSize(instance.properties()); Network.setBlock(_fd, false); // // NOTE: setting the multicast interface before performing the @@ -313,10 +309,10 @@ final class UdpTransceiver implements Transceiver Network.doConnect(_fd, _addr); _state = StateConnected; // We're connected now - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - String s = "starting to send udp packets\n" + toString(); - _logger.trace(_traceLevels.networkCat, s); + String s = "starting to send " + _instance.protocol() + " packets\n" + toString(); + _instance.logger().trace(_instance.traceCategory(), s); } } catch(Ice.LocalException ex) @@ -330,22 +326,21 @@ final class UdpTransceiver implements Transceiver // Only for use by UdpEndpoint // @SuppressWarnings("deprecation") - UdpTransceiver(Instance instance, String host, int port, String mcastInterface, boolean connect) + UdpTransceiver(ProtocolInstance instance, String host, int port, String mcastInterface, boolean connect) { - _traceLevels = instance.traceLevels(); - _logger = instance.initializationData().logger; + _instance = instance; _state = connect ? StateNeedConnect : StateNotConnected; try { _addr = Network.getAddressForServer(host, port, instance.protocolSupport(), instance.preferIPv6()); _fd = Network.createUdpSocket(_addr); - setBufSize(instance); + setBufSize(instance.properties()); Network.setBlock(_fd, false); - if(_traceLevels.network >= 2) + if(_instance.traceLevel() >= 2) { - String s = "attempting to bind to udp socket " + Network.addrToString(_addr); - _logger.trace(_traceLevels.networkCat, s); + String s = "attempting to bind to " + _instance.protocol() + " socket " + Network.addrToString(_addr); + _instance.logger().trace(_instance.traceCategory(), s); } if(_addr.getAddress().isMulticastAddress()) { @@ -357,11 +352,11 @@ final class UdpTransceiver implements Transceiver // // Windows does not allow binding to the mcast address itself // so we bind to INADDR_ANY (0.0.0.0) instead. As a result, - // bi-directional connection won't work because the source + // bi-directional connection won't work because the source // address won't be the multicast address and the client will // therefore reject the datagram. // - int protocol = + int protocol = _mcastAddr.getAddress().getAddress().length == 4 ? Network.EnableIPv4 : Network.EnableIPv6; _addr = Network.getAddressForServer("", port, protocol, instance.preferIPv6()); } @@ -395,12 +390,12 @@ final class UdpTransceiver implements Transceiver _addr = Network.doBind(_fd, _addr); } - if(_traceLevels.network >= 1) + if(_instance.traceLevel() >= 1) { - StringBuffer s = new StringBuffer("starting to receive udp packets\n"); + StringBuffer s = new StringBuffer("starting to receive " + _instance.protocol() + " packets\n"); s.append(toString()); - java.util.List<String> interfaces = + java.util.List<String> interfaces = Network.getHostsForEndpointExpand(_addr.getAddress().getHostAddress(), instance.protocolSupport(), true); if(!interfaces.isEmpty()) @@ -408,7 +403,7 @@ final class UdpTransceiver implements Transceiver s.append("\nlocal interfaces: "); s.append(IceUtilInternal.StringUtil.joinString(interfaces, ", ")); } - _logger.trace(_traceLevels.networkCat, s.toString()); + _instance.logger().trace(_instance.traceCategory(), s.toString()); } } catch(Ice.LocalException ex) @@ -418,8 +413,7 @@ final class UdpTransceiver implements Transceiver } } - private synchronized void - setBufSize(Instance instance) + private synchronized void setBufSize(Ice.Properties properties) { assert(_fd != null); @@ -446,13 +440,14 @@ final class UdpTransceiver implements Transceiver // // Get property for buffer size and check for sanity. // - int sizeRequested = instance.initializationData().properties.getPropertyAsIntWithDefault(prop, dfltSize); + int sizeRequested = properties.getPropertyAsIntWithDefault(prop, dfltSize); if(sizeRequested < (_udpOverhead + IceInternal.Protocol.headerSize)) { - _logger.warning("Invalid " + prop + " value of " + sizeRequested + " adjusted to " + dfltSize); + _instance.logger().warning("Invalid " + prop + " value of " + sizeRequested + " adjusted to " + + dfltSize); sizeRequested = dfltSize; } - + if(sizeRequested != dfltSize) { // @@ -479,233 +474,106 @@ final class UdpTransceiver implements Transceiver // if(sizeSet < sizeRequested) { - _logger.warning("UDP " + direction + " buffer size: requested size of " + _instance.logger().warning("UDP " + direction + " buffer size: requested size of " + sizeRequested + " adjusted to " + sizeSet); } } } } - // - // The NIO classes before JDK 1.7 do not support multicast, at least not directly. - // This method works around that limitation by using reflection to configure the - // file descriptor of a DatagramChannel for multicast operation. Specifically, an - // instance of java.net.PlainDatagramSocketImpl is use to (temporarily) wrap the - // channel's file descriptor. - // - // If using JDK >= 1.7 we use the new added MulticastChannel via reflection to allow - // compilation with older JDK versions. - // - private void - configureMulticast(java.net.InetSocketAddress group, String interfaceAddr, int ttl) + private void configureMulticast(java.net.InetSocketAddress group, String interfaceAddr, int ttl) { try { - Class<?> cls = Util.findClass("java.nio.channels.MulticastChannel", null); - java.lang.reflect.Method m = null; - java.net.DatagramSocketImpl socketImpl = null; - java.lang.reflect.Field socketFd = null; java.net.NetworkInterface intf = null; - if(cls == null || !cls.isAssignableFrom(_fd.getClass())) + + if(interfaceAddr.length() != 0) { - cls = Util.findClass("java.net.PlainDatagramSocketImpl", null); - if(cls == null) + intf = java.net.NetworkInterface.getByName(interfaceAddr); + if(intf == null) { - throw new Ice.SocketException(); + try + { + intf = java.net.NetworkInterface.getByInetAddress( + java.net.InetAddress.getByName(interfaceAddr)); + } + catch(Exception ex) + { + } } - java.lang.reflect.Constructor<?> c = cls.getDeclaredConstructor((Class<?>[])null); - c.setAccessible(true); - socketImpl = (java.net.DatagramSocketImpl)c.newInstance((Object[])null); + } + if(group != null) + { // - // We have to invoke the protected create() method on the PlainDatagramSocketImpl object so - // that this hack works properly when IPv6 is enabled on Windows. + // Join multicast group. // - try - { - m = cls.getDeclaredMethod("create", (Class<?>[])null); - m.setAccessible(true); - m.invoke(socketImpl); - } - catch(java.lang.NoSuchMethodException ex) // OpenJDK + boolean join = false; + if(intf != null) { + _fd.join(group.getAddress(), intf); + join = true; } - - cls = Util.findClass("sun.nio.ch.DatagramChannelImpl", null); - if(cls == null) + else { - throw new Ice.SocketException(); - } - java.lang.reflect.Field channelFd = cls.getDeclaredField("fd"); - channelFd.setAccessible(true); - - socketFd = java.net.DatagramSocketImpl.class.getDeclaredField("fd"); - socketFd.setAccessible(true); - socketFd.set(socketImpl, channelFd.get(_fd)); - } + // + // If the user doesn't specify an interface, we join to the multicast group with every + // interface that supports multicast and has a configured address with the same protocol + // as the group address protocol. + // + int protocol = group.getAddress().getAddress().length == 4 ? Network.EnableIPv4 : + Network.EnableIPv6; - try - { - if(interfaceAddr.length() != 0) - { - intf = java.net.NetworkInterface.getByName(interfaceAddr); - if(intf == null) + java.util.List<java.net.NetworkInterface> interfaces = + java.util.Collections.list(java.net.NetworkInterface.getNetworkInterfaces()); + for(java.net.NetworkInterface iface : interfaces) { - try - { - intf = java.net.NetworkInterface.getByInetAddress( - java.net.InetAddress.getByName(interfaceAddr)); - } - catch(Exception ex) + if(!iface.supportsMulticast()) { + continue; } - } - } - if(group != null) - { - // - // Join multicast group. - // - Class<?>[] types; - Object[] args; - if(socketImpl == null) - { - types = new Class<?>[]{ java.net.InetAddress.class, java.net.NetworkInterface.class }; - m = _fd.getClass().getDeclaredMethod("join", types); - m.setAccessible(true); - boolean join = false; - if(intf != null) + boolean hasProtocolAddress = false; + java.util.List<java.net.InetAddress> addresses = + java.util.Collections.list(iface.getInetAddresses()); + for(java.net.InetAddress address : addresses) { - m.invoke(_fd, new Object[] { group.getAddress(), intf }); - join = true; - } - else - { - // - // If the user doesn't specify an interface, we join to the multicast group with all the - // interfaces that support multicast and has a configured address with the same protocol - // as the group address protocol. - // - int protocol = group.getAddress().getAddress().length == 4 ? Network.EnableIPv4 : - Network.EnableIPv6; - - java.util.List<java.net.NetworkInterface> interfaces = - java.util.Collections.list(java.net.NetworkInterface.getNetworkInterfaces()); - for(java.net.NetworkInterface iface : interfaces) + if(address.getAddress().length == 4 && protocol == Network.EnableIPv4 || + address.getAddress().length != 4 && protocol == Network.EnableIPv6) { - if(!iface.supportsMulticast()) - { - continue; - } - boolean hasProtocolAddress = false; - java.util.List<java.net.InetAddress> addresses = - java.util.Collections.list(iface.getInetAddresses()); - for(java.net.InetAddress address : addresses) - { - if(address.getAddress().length == 4 && protocol == Network.EnableIPv4 || - address.getAddress().length != 4 && protocol == Network.EnableIPv6) - { - hasProtocolAddress = true; - break; - } - } - - if(hasProtocolAddress) - { - m.invoke(_fd, new Object[] { group.getAddress(), iface }); - join = true; - } + hasProtocolAddress = true; + break; } - - if(!join) - { - throw new Ice.SocketException(new IllegalArgumentException( - "There aren't any interfaces that support multicast, " + - "or the interfaces that support it\n" + - "are not configured for the group protocol. " + - "Cannot join the mulitcast group.")); - } - } - } - else - { - try - { - types = new Class<?>[]{ java.net.SocketAddress.class, java.net.NetworkInterface.class }; - m = socketImpl.getClass().getDeclaredMethod("joinGroup", types); - args = new Object[]{ group, intf }; } - catch(java.lang.NoSuchMethodException ex) // OpenJDK - { - types = new Class<?>[]{ java.net.InetAddress.class, java.net.NetworkInterface.class }; - m = socketImpl.getClass().getDeclaredMethod("join", types); - args = new Object[]{ group.getAddress(), intf }; - } - m.setAccessible(true); - m.invoke(socketImpl, args); - } - } - else if(intf != null) - { - // - // Otherwise, set the multicast interface if specified. - // - Class<?>[] types = new Class<?>[]{ Integer.TYPE, Object.class }; - if(socketImpl == null) - { - Class<?> socketOption = Util.findClass("java.net.SocketOption", null); - Class<?> standardSocketOptions = Util.findClass("java.net.StandardSocketOptions", null); - m = _fd.getClass().getDeclaredMethod("setOption", new Class<?>[]{socketOption, Object.class}); - m.setAccessible(true); - java.lang.reflect.Field ipMcastIf = standardSocketOptions.getDeclaredField("IP_MULTICAST_IF"); - ipMcastIf.setAccessible(true); - m.invoke(_fd, new Object[]{ ipMcastIf.get(null), intf }); - } - else - { - try + if(hasProtocolAddress) { - m = socketImpl.getClass().getDeclaredMethod("setOption", types); - } - catch(java.lang.NoSuchMethodException ex) // OpenJDK - { - m = socketImpl.getClass().getDeclaredMethod("socketSetOption", types); + _fd.join(group.getAddress(), iface); + join = true; } - m.setAccessible(true); - Object[] args = new Object[]{ Integer.valueOf(java.net.SocketOptions.IP_MULTICAST_IF2), intf }; - m.invoke(socketImpl, args); } - } - if(ttl != -1) - { - if(socketImpl == null) + if(!join) { - Class<?> socketOption = Util.findClass("java.net.SocketOption", null); - Class<?> standardSocketOptions = Util.findClass("java.net.StandardSocketOptions", null); - m = _fd.getClass().getDeclaredMethod("setOption", new Class<?>[]{socketOption, Object.class}); - m.setAccessible(true); - java.lang.reflect.Field ipMcastTtl = standardSocketOptions.getDeclaredField("IP_MULTICAST_TTL"); - ipMcastTtl.setAccessible(true); - m.invoke(_fd, new Object[]{ ipMcastTtl.get(null), ttl }); - } - else - { - Class<?>[] types = new Class<?>[]{ Integer.TYPE }; - m = java.net.DatagramSocketImpl.class.getDeclaredMethod("setTimeToLive", types); - m.setAccessible(true); - m.invoke(socketImpl, new Object[]{ Integer.valueOf(ttl) }); + throw new Ice.SocketException(new IllegalArgumentException( + "There aren't any interfaces that support multicast, " + + "or the interfaces that support it\n" + + "are not configured for the group protocol. " + + "Cannot join the mulitcast group.")); } } } - finally + else if(intf != null) { - if(socketFd != null && socketImpl != null) - { - socketFd.set(socketImpl, null); - } + // + // Otherwise, set the multicast interface if specified. + // + _fd.setOption(java.net.StandardSocketOptions.IP_MULTICAST_IF, intf); + } + + if(ttl != -1) + { + _fd.setOption(java.net.StandardSocketOptions.IP_MULTICAST_TTL, ttl); } } catch(Exception ex) @@ -714,8 +582,7 @@ final class UdpTransceiver implements Transceiver } } - protected synchronized void - finalize() + protected synchronized void finalize() throws Throwable { try @@ -731,8 +598,7 @@ final class UdpTransceiver implements Transceiver } } - private TraceLevels _traceLevels; - private Ice.Logger _logger; + private ProtocolInstance _instance; private int _state; private int _rcvSize; |