diff options
author | Mark Spruiell <mes@zeroc.com> | 2002-05-02 03:27:07 +0000 |
---|---|---|
committer | Mark Spruiell <mes@zeroc.com> | 2002-05-02 03:27:07 +0000 |
commit | 5f342d668a30647b7d7182b4b296e65f7cfc2b07 (patch) | |
tree | 53f13aa7e79a77da4ca6133cf170812d3995d8b2 /java/src | |
parent | adding assertions (diff) | |
download | ice-5f342d668a30647b7d7182b4b296e65f7cfc2b07.tar.bz2 ice-5f342d668a30647b7d7182b4b296e65f7cfc2b07.tar.xz ice-5f342d668a30647b7d7182b4b296e65f7cfc2b07.zip |
align with C++ changes for thread pool, properties, plug-ins
Diffstat (limited to 'java/src')
24 files changed, 878 insertions, 496 deletions
diff --git a/java/src/Ice/Application.java b/java/src/Ice/Application.java index 2444e3a57f3..becb2c1a1da 100644 --- a/java/src/Ice/Application.java +++ b/java/src/Ice/Application.java @@ -50,9 +50,9 @@ public abstract class Application StringSeqHolder argHolder = new StringSeqHolder(args); if (configFile != null) { - Properties properties = - Util.createPropertiesFromFile(argHolder, configFile); - _communicator = Util.initializeWithProperties(properties); + Properties properties = Util.createProperties(argHolder); + properties.load(configFile); + _communicator = Util.initializeWithProperties(argHolder, properties); } else { diff --git a/java/src/Ice/CommunicatorI.java b/java/src/Ice/CommunicatorI.java index 1c5fde423b2..6e46032ffef 100644 --- a/java/src/Ice/CommunicatorI.java +++ b/java/src/Ice/CommunicatorI.java @@ -29,9 +29,9 @@ class CommunicatorI implements Communicator // // No mutex locking here! // - if (_threadPool != null) + if (_serverThreadPool != null) { - _threadPool.initiateServerShutdown(); + _serverThreadPool.initiateShutdown(); } } @@ -42,9 +42,9 @@ class CommunicatorI implements Communicator // No mutex locking here, otherwise the communicator is blocked // while waiting for shutdown. // - if (_threadPool != null) + if (_serverThreadPool != null) { - _threadPool.waitUntilServerFinished(); + _serverThreadPool.waitUntilFinished(); } } @@ -84,9 +84,9 @@ class CommunicatorI implements Communicator adapter.addRouter(RouterPrxHelper.uncheckedCast(_instance.proxyFactory().stringToProxy(router))); } - if (_threadPool == null) // Lazy initialization of _threadPool. + if (_serverThreadPool == null) // Lazy initialization of _serverThreadPool. { - _threadPool = _instance.threadPool(); + _serverThreadPool = _instance.serverThreadPool(); } return adapter; @@ -115,9 +115,9 @@ class CommunicatorI implements Communicator ObjectAdapter adapter = _instance.objectAdapterFactory().createObjectAdapter(name, endpts); - if (_threadPool == null) // Lazy initialization of _threadPool. + if (_serverThreadPool == null) // Lazy initialization of _serverThreadPool. { - _threadPool = _instance.threadPool(); + _serverThreadPool = _instance.serverThreadPool(); } return adapter; @@ -225,9 +225,9 @@ class CommunicatorI implements Communicator return null; } - CommunicatorI(Properties properties) + CommunicatorI(StringSeqHolder args, Properties properties) { - _instance = new IceInternal.Instance(this, properties); + _instance = new IceInternal.Instance(this, args, properties); } protected void @@ -243,6 +243,16 @@ class CommunicatorI implements Communicator } // + // Certain initialization tasks need to be completed after the + // constructor. + // + void + finishSetup(StringSeqHolder args) + { + _instance.finishSetup(args); + } + + // // For use by Util.getInstance() // IceInternal.Instance @@ -252,5 +262,13 @@ class CommunicatorI implements Communicator } private IceInternal.Instance _instance; - private IceInternal.ThreadPool _threadPool; + + // + // We need _serverThreadPool directly in CommunicatorI. That's + // because the shutdown() operation is signal-safe, and thus must + // not access any mutex locks or _instance. It may only access + // _serverThreadPool->initiateShutdown(), which is signal-safe as + // well. + // + private IceInternal.ThreadPool _serverThreadPool; } diff --git a/java/src/Ice/ObjectAdapterI.java b/java/src/Ice/ObjectAdapterI.java index f6ea71774fa..708ff795de1 100644 --- a/java/src/Ice/ObjectAdapterI.java +++ b/java/src/Ice/ObjectAdapterI.java @@ -340,7 +340,7 @@ public class ObjectAdapterI implements ObjectAdapter // might change it, for example, to fill in the real port // number if a zero port number is given. // - IceInternal.Endpoint endp = IceInternal.Endpoint.endpointFromString(instance, es); + IceInternal.Endpoint endp = instance.endpointFactoryManager().create(es); _incomingConnectionFactories.add(new IceInternal.IncomingConnectionFactory(instance, endp, this)); if (end == s.length()) diff --git a/java/src/Ice/PluginFactory.java b/java/src/Ice/PluginFactory.java new file mode 100644 index 00000000000..54715edd0f9 --- /dev/null +++ b/java/src/Ice/PluginFactory.java @@ -0,0 +1,16 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +package Ice; + +public interface PluginFactory +{ + Plugin create(Communicator communicator, String name, String[] args); +} diff --git a/java/src/Ice/PluginManagerI.java b/java/src/Ice/PluginManagerI.java new file mode 100644 index 00000000000..442f8f4e9b2 --- /dev/null +++ b/java/src/Ice/PluginManagerI.java @@ -0,0 +1,167 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +package Ice; + +public final class PluginManagerI implements PluginManager +{ + public synchronized Plugin + getPlugin(String name) + { + Plugin p = (Plugin)_plugins.get(name); + if (p != null) + { + return p; + } + throw new PluginNotFoundException(); + } + + public synchronized void + addPlugin(String name, Plugin plugin) + { + if (_plugins.containsKey(name)) + { + throw new PluginExistsException(); + } + _plugins.put(name, plugin); + } + + public synchronized void + destroy() + { + java.util.Iterator i = _plugins.entrySet().iterator(); + while (i.hasNext()) + { + Plugin p = (Plugin)i.next(); + p.destroy(); + } + } + + public + PluginManagerI(IceInternal.Instance instance) + { + _instance = instance; + } + + public void + loadPlugins(StringSeqHolder cmdArgs) + { + // + // Load and initialize the plug-ins defined in the property set + // with the prefix "Ice.Plugin.". These properties should + // have the following format: + // + // Ice.Plugin.name=entry_point [args] + // + final String prefix = "Ice.Plugin."; + Ice.Properties properties = _instance.properties(); + String[] plugins = properties.getProperties(prefix); + for (int i = 0; i < plugins.length; i += 2) + { + String name = plugins[i].substring(prefix.length()); + String value = plugins[i + 1]; + + // + // Separate the entry point from the arguments. + // + String className; + String[] args; + int pos = value.indexOf(' '); + if (pos == -1) + { + pos = value.indexOf('\t'); + } + if (pos == -1) + { + pos = value.indexOf('\n'); + } + if (pos == -1) + { + className = value; + args = new String[0]; + } + else + { + className = value.substring(0, pos); + args = value.substring(pos).trim().split("[ \t\n]+", pos); + } + + // + // Convert command-line options into properties. First we + // convert the options from the plug-in configuration, then + // we convert the options from the application command-line. + // + args = properties.parseCommandLineOptions(name, args); + cmdArgs.value = properties.parseCommandLineOptions(name, cmdArgs.value); + + loadPlugin(name, className, args); + } + } + + private void + loadPlugin(String name, String className, String[] args) + { + // + // Instantiate the class. + // + PluginFactory factory = null; + try + { + Class c = Class.forName(className); + java.lang.Object obj = c.newInstance(); + try + { + factory = (PluginFactory)obj; + } + catch (ClassCastException ex) + { + _instance.logger().error("PluginManager: class " + className + + " does not implement Ice.PluginFactory"); + throw new SystemException(); + } + } + catch (ClassNotFoundException ex) + { + _instance.logger().error("PluginManager: class " + className + " not found"); + throw new SystemException(); + } + catch (IllegalAccessException ex) + { + _instance.logger().error("PluginManager: unable to access default constructor in class " + className); + throw new SystemException(); + } + catch (InstantiationException ex) + { + _instance.logger().error("PluginManager: unable to instantiate class " + className); + throw new SystemException(); + } + + // + // Invoke the factory. + // + Plugin plugin = null; + try + { + plugin = factory.create(_instance.communicator(), name, args); + } + catch (Exception ex) + { + _instance.logger().error("PluginManager: exception in factory " + className); + SystemException e = new SystemException(); + e.initCause(ex); + throw e; + } + + _plugins.put(name, plugin); + } + + private IceInternal.Instance _instance; + private java.util.HashMap _plugins = new java.util.HashMap(); +} diff --git a/java/src/Ice/PropertiesI.java b/java/src/Ice/PropertiesI.java index b192803a815..83a004be14b 100644 --- a/java/src/Ice/PropertiesI.java +++ b/java/src/Ice/PropertiesI.java @@ -116,7 +116,27 @@ class PropertiesI implements Properties public synchronized String[] parseCommandLineOptions(String prefix, String[] options) { - return null; + java.util.ArrayList result = new java.util.ArrayList(); + for (int i = 0; i < options.length; i++) + { + String opt = options[i]; + if (opt.startsWith("--" + prefix + ".")) + { + if (opt.indexOf('=') == -1) + { + opt += "=1"; + } + + parseLine(opt.substring(2)); + } + else + { + result.add(opt); + } + } + String[] arr = new String[result.size()]; + result.toArray(arr); + return arr; } public synchronized void @@ -144,61 +164,28 @@ class PropertiesI implements Properties return p; } - static void - addArgumentPrefix(String prefix) + PropertiesI() { - _argumentPrefixes.add(prefix); } PropertiesI(String[] args) { - String file = getConfigFile(args); - - if (file.length() > 0) - { - load(file); - } - - StringSeqHolder argsH = new StringSeqHolder(); - argsH.value = args; - parseArgs(argsH); - setProperty("Ice.Config", file); - } - - PropertiesI(StringSeqHolder args) - { - String file = getConfigFile(args.value); - - if (file.length() > 0) - { - load(file); - } - - parseArgs(args); - setProperty("Ice.Config", file); - } - - PropertiesI(String[] args, String file) - { - if (file == null) - { - file = ""; - } - - if (file.length() > 0) + for (int i = 0; i < args.length; i++) { - load(file); + if (args[i].startsWith("--Ice.Config")) + { + String line = args[i]; + if (line.indexOf('=') == -1) + { + line += "=1"; + } + parseLine(line.substring(2)); + } } - StringSeqHolder argsH = new StringSeqHolder(); - argsH.value = args; - parseArgs(argsH); - setProperty("Ice.Config", file); - } + String file = getProperty("Ice.Config"); - PropertiesI(StringSeqHolder args, String file) - { - if (file == null) + if (file.equals("1")) { file = ""; } @@ -208,23 +195,25 @@ class PropertiesI implements Properties load(file); } - parseArgs(args); setProperty("Ice.Config", file); } - private String - getConfigFile(String[] args) + PropertiesI(StringSeqHolder args) { - for (int i = 0; i < args.length; i++) + for (int i = 0; i < args.value.length; i++) { - if (args[i].startsWith("--Ice.Config")) + if (args.value[i].startsWith("--Ice.Config")) { - String line = args[i]; + String line = args.value[i]; if (line.indexOf('=') == -1) { line += "=1"; } parseLine(line.substring(2)); + String[] arr = new String[args.value.length - 1]; + System.arraycopy(args.value, 0, arr, 0, i); + System.arraycopy(args.value, i + 1, arr, i, args.value.length - i); + args.value = arr; } } @@ -235,63 +224,12 @@ class PropertiesI implements Properties file = ""; } - return file; - } - - private void - parseArgs(StringSeqHolder args) - { - int idx = 0; - while (idx < args.value.length) + if (file.length() > 0) { - boolean match = false; - String arg = args.value[idx]; - int beg = arg.indexOf("--"); - if (beg == 0) - { - String rest = arg.substring(2); - if (rest.startsWith("Ice.")) - { - match = true; - } - else - { - java.util.Iterator p = _argumentPrefixes.iterator(); - while (p.hasNext()) - { - String prefix = (String)p.next(); - if (rest.startsWith(prefix + '.')) - { - match = true; - break; - } - } - } - - if (match) - { - String[] arr = new String[args.value.length - 1]; - System.arraycopy(args.value, 0, arr, 0, idx); - if (idx < args.value.length - 1) - { - System.arraycopy(args.value, idx + 1, arr, idx, args.value.length - idx - 1); - } - args.value = arr; - - if (arg.indexOf('=') == -1) - { - arg += "=1"; - } - - parseLine(arg.substring(2)); - } - } - - if (!match) - { - idx++; - } + load(file); } + + setProperty("Ice.Config", file); } private void diff --git a/java/src/Ice/Util.java b/java/src/Ice/Util.java index efae40e3486..c29e3061172 100644 --- a/java/src/Ice/Util.java +++ b/java/src/Ice/Util.java @@ -13,17 +13,17 @@ package Ice; public final class Util { public static Properties - getDefaultProperties(String[] args) + getDefaultProperties() { if (_defaultProperties == null) { - _defaultProperties = createProperties(args); + _defaultProperties = createProperties(); } return _defaultProperties; } public static Properties - getDefaultProperties(StringSeqHolder args) + getDefaultProperties(String[] args) { if (_defaultProperties == null) { @@ -33,51 +33,67 @@ public final class Util } public static Properties - createProperties(String[] args) + getDefaultProperties(StringSeqHolder args) { - return new PropertiesI(args); + if (_defaultProperties == null) + { + _defaultProperties = createProperties(args); + } + return _defaultProperties; } public static Properties - createProperties(StringSeqHolder args) + createProperties() { - return new PropertiesI(args); + return new PropertiesI(); } public static Properties - createPropertiesFromFile(String[] args, String file) + createProperties(String[] args) { - return new PropertiesI(args, file); + return new PropertiesI(args); } public static Properties - createPropertiesFromFile(StringSeqHolder args, String file) + createProperties(StringSeqHolder args) { - return new PropertiesI(args, file); + return new PropertiesI(args); } public static Communicator initialize(String[] args) { - return new CommunicatorI(getDefaultProperties(args)); + StringSeqHolder argsH = new StringSeqHolder(args); + Properties defaultProperties = getDefaultProperties(argsH); + CommunicatorI result = new CommunicatorI(argsH, defaultProperties); + result.finishSetup(argsH); + return result; } public static Communicator initialize(StringSeqHolder args) { - return new CommunicatorI(getDefaultProperties(args)); + Properties defaultProperties = getDefaultProperties(args); + CommunicatorI result = new CommunicatorI(args, defaultProperties); + result.finishSetup(args); + return result; } public static Communicator - initializeWithProperties(Properties properties) + initializeWithProperties(String[] args, Properties properties) { - return new CommunicatorI(properties); + StringSeqHolder argsH = new StringSeqHolder(args); + CommunicatorI result = new CommunicatorI(argsH, properties); + result.finishSetup(argsH); + return result; } - public static void - addArgumentPrefix(String prefix) + public static Communicator + initializeWithProperties(StringSeqHolder args, Properties properties) { - PropertiesI.addArgumentPrefix(prefix); + CommunicatorI result = new CommunicatorI(args, properties); + result.finishSetup(args); + return result; } public static IceInternal.Instance diff --git a/java/src/IceBox/Admin.java b/java/src/IceBox/Admin.java index 4baf40b750b..b9bddd5ecb0 100644 --- a/java/src/IceBox/Admin.java +++ b/java/src/IceBox/Admin.java @@ -29,6 +29,10 @@ public final class Admin public int run(String[] args) { + Ice.Properties properties = communicator().getProperties(); + + args = properties.parseCommandLineOptions("IceBox", args); + java.util.ArrayList commands = new java.util.ArrayList(); int idx = 0; @@ -58,7 +62,6 @@ public final class Admin return 0; } - Ice.Properties properties = communicator().getProperties(); final String managerEndpointsProperty = "IceBox.ServiceManager.Endpoints"; String managerEndpoints = properties.getProperty(managerEndpointsProperty); if (managerEndpoints.length() == 0) @@ -97,8 +100,6 @@ public final class Admin public static void main(String[] args) { - Ice.Util.addArgumentPrefix("IceBox"); - Client app = new Client(); int rc = app.main("IceBox.Admin", args); diff --git a/java/src/IceBox/Server.java b/java/src/IceBox/Server.java index 1958efdbf0e..bbe013e9c2e 100644 --- a/java/src/IceBox/Server.java +++ b/java/src/IceBox/Server.java @@ -12,18 +12,45 @@ package IceBox; public final class Server { + private static void + usage() + { + System.err.println("Usage: IceBox.Server [options]\n"); + System.err.println( + "Options:\n" + + "-h, --help Show this message.\n" + ); + } + public static void main(String[] args) { Ice.Communicator communicator = null; int status = 0; - Ice.Util.addArgumentPrefix("IceBox"); - try { Ice.StringSeqHolder argsH = new Ice.StringSeqHolder(args); communicator = Ice.Util.initialize(argsH); + + Ice.Properties properties = communicator.getProperties(); + argsH.value = properties.parseCommandLineOptions("IceBox", argsH.value); + + for (int i = 1; i < argsH.value.length; ++i) + { + if (argsH.value[i].equals("-h") || argsH.value[i].equals("--help")) + { + usage(); + System.exit(0); + } + else if (!argsH.value[i].startsWith("--")) + { + System.err.println("Server: unknown option `" + argsH.value[i] + "'"); + usage(); + System.exit(1); + } + } + ServiceManagerI serviceManagerImpl = new ServiceManagerI(communicator, argsH.value); status = serviceManagerImpl.run(); } diff --git a/java/src/IceBox/ServiceManagerI.java b/java/src/IceBox/ServiceManagerI.java index 9b72c679a93..0a09889b2c8 100644 --- a/java/src/IceBox/ServiceManagerI.java +++ b/java/src/IceBox/ServiceManagerI.java @@ -217,11 +217,11 @@ public final class ServiceManagerI extends _ServiceManagerDisp // // Create the service property set. // - Ice.Util.addArgumentPrefix(service); - Ice.StringSeqHolder argsH = new Ice.StringSeqHolder(); - argsH.value = new String[l.size()]; - l.toArray(argsH.value); - Ice.Properties serviceProperties = Ice.Util.createProperties(argsH); + Ice.Properties serviceProperties = Ice.Util.createProperties(); + String[] serviceArgs = new String[l.size()]; + l.toArray(serviceArgs); + serviceArgs = serviceProperties.parseCommandLineOptions("Ice", serviceArgs); + serviceArgs = serviceProperties.parseCommandLineOptions(service, serviceArgs); // // Instantiate the class. @@ -269,7 +269,7 @@ public final class ServiceManagerI extends _ServiceManagerDisp // try { - svc.init(service, _communicator, serviceProperties, argsH.value); + svc.init(service, _communicator, serviceProperties, serviceArgs); _services.put(service, svc); } catch (FailureException ex) diff --git a/java/src/IceInternal/Connection.java b/java/src/IceInternal/Connection.java index c757f9ce824..79f1e0d8eae 100644 --- a/java/src/IceInternal/Connection.java +++ b/java/src/IceInternal/Connection.java @@ -287,21 +287,26 @@ public final class Connection extends EventHandler try { // - // In closed and holding state, we are not registered with the - // thread pool. For all other states, we have to notify the thread - // pool in case this event handler changed from a client to a - // server or vice versa. + // We are registered with a thread pool in active and closing + // mode. However, we only change subscription if we're in active + // mode, and thus ignore closing mode here.k // - if (_state != StateHolding && _state != StateClosed) + if (_state == StateActive) { if (adapter != null && _adapter == null) { - _threadPool.clientIsNowServer(); + // + // Client is now server. + // + unregisterWithPool(); } if (adapter == null && _adapter != null) { - _threadPool.serverIsNowClient(); + // + // Server is now client. + // + unregisterWithPool(); } } @@ -331,20 +336,6 @@ public final class Connection extends EventHandler // Operations from EventHandler // public boolean - server() - { - _mutex.lock(); - try - { - return _adapter != null; - } - finally - { - _mutex.unlock(); - } - } - - public boolean readable() { return true; @@ -371,7 +362,7 @@ public final class Connection extends EventHandler }; public void - message(BasicStream stream) + message(BasicStream stream, ThreadPool threadPool) { Incoming in = null; boolean batch = false; @@ -379,7 +370,7 @@ public final class Connection extends EventHandler _mutex.lock(); try { - _threadPool.promoteFollower(); + threadPool.promoteFollower(); if (_state == StateClosed) { @@ -626,16 +617,18 @@ public final class Connection extends EventHandler } public void - finished() + finished(ThreadPool threadPool) { _mutex.lock(); try { - assert(_state == StateClosed || _state == StateHolding); + threadPool.promoteFollower(); - _threadPool.promoteFollower(); - - if (_state == StateClosed) + if (_state == StateActive || _state == StateClosing) + { + registerWithPool(); + } + else if (_state == StateClosed) { _transceiver.close(); } @@ -662,7 +655,7 @@ public final class Connection extends EventHandler /* public boolean - tryDestroy() + tryDestroy(ThreadPool threadPool) { boolean isLocked = _mutex.trylock(); if (!isLocked) @@ -670,7 +663,7 @@ public final class Connection extends EventHandler return false; } - _threadPool.promoteFollower(); + threadPool.promoteFollower(); try { @@ -684,14 +677,12 @@ public final class Connection extends EventHandler } */ - Connection(Instance instance, Transceiver transceiver, Endpoint endpoint, - Ice.ObjectAdapter adapter) + Connection(Instance instance, Transceiver transceiver, Endpoint endpoint, Ice.ObjectAdapter adapter) { super(instance); _transceiver = transceiver; _endpoint = endpoint; _adapter = adapter; - _threadPool = instance.threadPool(); _logger = instance.logger(); _traceLevels = instance.traceLevels(); _nextRequestId = 1; @@ -814,7 +805,7 @@ public final class Connection extends EventHandler { return; } - _threadPool._register(_transceiver.fd(), this); + registerWithPool(); break; } @@ -824,7 +815,7 @@ public final class Connection extends EventHandler { return; } - _threadPool.unregister(_transceiver.fd()); + unregisterWithPool(); break; } @@ -839,7 +830,7 @@ public final class Connection extends EventHandler // // We need to continue to read data in closing state. // - _threadPool._register(_transceiver.fd(), this); + registerWithPool(); } break; } @@ -853,9 +844,10 @@ public final class Connection extends EventHandler // register again before we unregister, so that // finished() is called correctly. // - _threadPool._register(_transceiver.fd(), this); + registerWithPool(); } - _threadPool.unregister(_transceiver.fd()); + unregisterWithPool(); + super._stream.destroy(); break; } } @@ -889,6 +881,44 @@ public final class Connection extends EventHandler } private void + registerWithPool() + { + if (_adapter != null) + { + if (_serverThreadPool == null) + { + _serverThreadPool = _instance.serverThreadPool(); + assert(_serverThreadPool != null); + } + _serverThreadPool._register(_transceiver.fd(), this); + } + else + { + if (_clientThreadPool == null) + { + _clientThreadPool = _instance.clientThreadPool(); + assert(_clientThreadPool != null); + } + _clientThreadPool._register(_transceiver.fd(), this); + } + } + + private void + unregisterWithPool() + { + if (_adapter != null) + { + assert(_serverThreadPool != null); + _serverThreadPool.unregister(_transceiver.fd()); + } + else + { + assert(_clientThreadPool != null); + _clientThreadPool.unregister(_transceiver.fd()); + } + } + + private void warning(String msg, Exception ex) { java.io.StringWriter sw = new java.io.StringWriter(); @@ -927,9 +957,10 @@ public final class Connection extends EventHandler private Transceiver _transceiver; private Endpoint _endpoint; private Ice.ObjectAdapter _adapter; - private ThreadPool _threadPool; private Ice.Logger _logger; private TraceLevels _traceLevels; + private ThreadPool _clientThreadPool; + private ThreadPool _serverThreadPool; private int _nextRequestId; private IntMap _requests = new IntMap(); private Ice.LocalException _exception; diff --git a/java/src/IceInternal/Endpoint.java b/java/src/IceInternal/Endpoint.java index 791807ac9c0..8d6973f1c12 100644 --- a/java/src/IceInternal/Endpoint.java +++ b/java/src/IceInternal/Endpoint.java @@ -10,151 +10,56 @@ package IceInternal; -public abstract class Endpoint implements java.lang.Comparable +public interface Endpoint extends java.lang.Comparable { - public static final short UnknownEndpointType = 0; - public static final short TcpEndpointType = 1; - public static final short SslEndpointType = 2; - public static final short UdpEndpointType = 3; - - public - Endpoint() - { - } - - // - // Create an endpoint from a string - // - public static Endpoint - endpointFromString(Instance instance, String str) - { - String s = str.trim(); - if (s.length() == 0) - { - throw new Ice.EndpointParseException(); - } - - java.util.regex.Pattern p = - java.util.regex.Pattern.compile("([ \t\n\r]+)|$"); - java.util.regex.Matcher m = p.matcher(s); - boolean b = m.find(); - assert(b); - - String protocol = s.substring(0, m.start()); - - if (protocol.equals("default")) - { - protocol = instance.defaultProtocol(); - } - - if (protocol.equals("tcp")) - { - return new TcpEndpoint(instance, s.substring(m.end())); - } - - if (protocol.equals("ssl")) - { - // TODO: SSL - //return new SslEndpoint(instance, s.substring(m.end())); - } - - if (protocol.equals("udp")) - { - return new UdpEndpoint(instance, s.substring(m.end())); - } - - throw new Ice.EndpointParseException(); - } - - // - // Unmarshal an endpoint - // - public static Endpoint - streamRead(BasicStream s) - { - Endpoint v; - short type = s.readShort(); - - switch (type) - { - case TcpEndpointType: - { - v = new TcpEndpoint(s); - break; - } - - /* TODO: SSL - case SslEndpointType: - { - v = new SslEndpoint(s); - break; - } - */ - - case UdpEndpointType: - { - v = new UdpEndpoint(s); - break; - } - - default: - { - v = new UnknownEndpoint(type, s); - break; - } - } - - return v; - } - // // Marshal the endpoint // - public abstract void streamWrite(BasicStream s); + void streamWrite(BasicStream s); // // Convert the endpoint to its string form // - public abstract String toString(); + String toString(); // // Return the endpoint type // - public abstract short type(); + short type(); // // Return the timeout for the endpoint in milliseconds. 0 means // non-blocking, -1 means no timeout. // - public abstract int timeout(); + int timeout(); // // Return a new endpoint with a different timeout value, provided // that timeouts are supported by the endpoint. Otherwise the same // endpoint is returned. // - public abstract Endpoint timeout(int t); + Endpoint timeout(int t); // // Return true if the endpoint is datagram-based. // - public abstract boolean datagram(); + boolean datagram(); // // Return true if the endpoint is secure. // - public abstract boolean secure(); + boolean secure(); // // Return true if the endpoint type is unknown. // - public abstract boolean unknown(); + boolean unknown(); // // Return a client side transceiver for this endpoint, or null if a // transceiver can only be created by a connector. // - public abstract Transceiver clientTransceiver(); + Transceiver clientTransceiver(); // // Return a server side transceiver for this endpoint, or null if a @@ -163,13 +68,13 @@ public abstract class Endpoint implements java.lang.Comparable // "effective" endpoint, which might differ from this endpoint, // for example, if a dynamic port number is assigned. // - public abstract Transceiver serverTransceiver(EndpointHolder endpoint); + Transceiver serverTransceiver(EndpointHolder endpoint); // // Return a connector for this endpoint, or null if no connector // is available. // - public abstract Connector connector(); + Connector connector(); // // Return an acceptor for this endpoint, or null if no acceptors @@ -178,18 +83,18 @@ public abstract class Endpoint implements java.lang.Comparable // from this endpoint, for example, if a dynamic port number is // assigned. // - public abstract Acceptor acceptor(EndpointHolder endpoint); + Acceptor acceptor(EndpointHolder endpoint); // // Check whether the endpoint is equivalent to a specific // Transceiver or Acceptor // - public abstract boolean equivalent(Transceiver transceiver); - public abstract boolean equivalent(Acceptor acceptor); + boolean equivalent(Transceiver transceiver); + boolean equivalent(Acceptor acceptor); // // Compare endpoints for sorting purposes // - public abstract boolean equals(java.lang.Object obj); - public abstract int compareTo(java.lang.Object obj); // From java.lang.Comparable + boolean equals(java.lang.Object obj); + int compareTo(java.lang.Object obj); // From java.lang.Comparable } diff --git a/java/src/IceInternal/EndpointFactory.java b/java/src/IceInternal/EndpointFactory.java new file mode 100644 index 00000000000..396e6f2a183 --- /dev/null +++ b/java/src/IceInternal/EndpointFactory.java @@ -0,0 +1,20 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +package IceInternal; + +public interface EndpointFactory +{ + short type(); + String protocol(); + Endpoint create(String str); + Endpoint read(BasicStream s); + void destroy(); +} diff --git a/java/src/IceInternal/EndpointFactoryManager.java b/java/src/IceInternal/EndpointFactoryManager.java new file mode 100644 index 00000000000..a56a600d6de --- /dev/null +++ b/java/src/IceInternal/EndpointFactoryManager.java @@ -0,0 +1,112 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +package IceInternal; + +public final class EndpointFactoryManager +{ + EndpointFactoryManager(Instance instance) + { + _instance = instance; + } + + public synchronized void + add(EndpointFactory factory) + { + for (int i = 0; i < _factories.size(); i++) + { + EndpointFactory f = (EndpointFactory)_factories.get(i); + if (f.type() == factory.type()) + { + assert(false); + } + } + _factories.add(factory); + } + + public synchronized EndpointFactory + get(short type) + { + for (int i = 0; i < _factories.size(); i++) + { + EndpointFactory f = (EndpointFactory)_factories.get(i); + if (f.type() == type) + { + return f; + } + } + return null; + } + + public synchronized Endpoint + create(String str) + { + String s = str.trim(); + if (s.length() == 0) + { + throw new Ice.EndpointParseException(); + } + + java.util.regex.Pattern p = java.util.regex.Pattern.compile("([ \t\n\r]+)|$"); + java.util.regex.Matcher m = p.matcher(s); + boolean b = m.find(); + assert(b); + + String protocol = s.substring(0, m.start()); + + if (protocol.equals("default")) + { + protocol = _instance.defaultProtocol(); + } + + for (int i = 0; i < _factories.size(); i++) + { + EndpointFactory f = (EndpointFactory)_factories.get(i); + if (f.protocol().equals(protocol)) + { + return f.create(s.substring(m.end())); + } + } + + throw new Ice.EndpointParseException(); + } + + public synchronized Endpoint + read(BasicStream s) + { + Endpoint v; + short type = s.readShort(); + + for (int i = 0; i < _factories.size(); i++) + { + EndpointFactory f = (EndpointFactory)_factories.get(i); + if (f.type() == type) + { + return f.read(s); + } + } + + return new UnknownEndpoint(type, s); + } + + void + destroy() + { + for (int i = 0; i < _factories.size(); i++) + { + EndpointFactory f = (EndpointFactory)_factories.get(i); + f.destroy(); + } + _factories.clear(); + } + + private Instance _instance; + private java.util.ArrayList _factories = new java.util.ArrayList(); +} diff --git a/java/src/IceInternal/EventHandler.java b/java/src/IceInternal/EventHandler.java index 1beac225842..6d8080caef1 100644 --- a/java/src/IceInternal/EventHandler.java +++ b/java/src/IceInternal/EventHandler.java @@ -13,12 +13,6 @@ package IceInternal; abstract class EventHandler { // - // Returns true if the event handler belongs to the server-side of - // an application. Client-side otherwise. - // - abstract boolean server(); - - // // Return true if read() must be called before calling message(). // abstract boolean readable(); @@ -39,14 +33,14 @@ abstract class EventHandler // // A complete message has been received. // - abstract void message(BasicStream stream); + abstract void message(BasicStream stream, ThreadPool threadPool); // // Will be called if the event handler is finally // unregistered. (Calling unregister() does not unregister // immediately.) // - abstract void finished(); + abstract void finished(ThreadPool threadPool); // // Propagate an exception to the event handler. @@ -58,7 +52,7 @@ abstract class EventHandler // handler cannot be destroyed because it is in use, or true // otherwise. // -// abstract boolean tryDestroy(); +// abstract boolean tryDestroy(ThreadPool threadPool); protected EventHandler(Instance instance) diff --git a/java/src/IceInternal/IncomingConnectionFactory.java b/java/src/IceInternal/IncomingConnectionFactory.java index dbbda2c744e..c3948d89497 100644 --- a/java/src/IceInternal/IncomingConnectionFactory.java +++ b/java/src/IceInternal/IncomingConnectionFactory.java @@ -68,12 +68,6 @@ public class IncomingConnectionFactory extends EventHandler // Operations from EventHandler. // public boolean - server() - { - return true; - } - - public boolean readable() { return false; @@ -93,9 +87,9 @@ public class IncomingConnectionFactory extends EventHandler } public synchronized void - message(BasicStream unused) + message(BasicStream unused, ThreadPool threadPool) { - _threadPool.promoteFollower(); + threadPool.promoteFollower(); if (_state != StateActive) { @@ -153,16 +147,16 @@ public class IncomingConnectionFactory extends EventHandler } public synchronized void - finished() + finished(ThreadPool threadPool) { - assert(_state == StateClosed || _state == StateHolding); - - _threadPool.promoteFollower(); + threadPool.promoteFollower(); - if (_state == StateClosed) + if (_state == StateActive) + { + registerWithPool(); + } + else if (_state == StateClosed) { - assert(_connections.isEmpty()); - try { // @@ -203,7 +197,7 @@ public class IncomingConnectionFactory extends EventHandler /* public boolean - tryDestroy() + tryDestroy(ThreadPool threadPool) { // // Do nothing. We don't want collector factories to be closed by @@ -241,7 +235,6 @@ public class IncomingConnectionFactory extends EventHandler _endpoint = h.value; assert(_acceptor != null); _acceptor.listen(); - _threadPool = _instance.threadPool(); } } catch (RuntimeException ex) @@ -285,11 +278,7 @@ public class IncomingConnectionFactory extends EventHandler { return; } - - if (_threadPool != null) - { - _threadPool._register(_acceptor.fd(), this); - } + registerWithPool(); java.util.ListIterator iter = _connections.listIterator(); while (iter.hasNext()) @@ -306,11 +295,7 @@ public class IncomingConnectionFactory extends EventHandler { return; } - - if (_threadPool != null) - { - _threadPool.unregister(_acceptor.fd()); - } + unregisterWithPool(); java.util.ListIterator iter = _connections.listIterator(); while (iter.hasNext()) @@ -323,18 +308,15 @@ public class IncomingConnectionFactory extends EventHandler case StateClosed: { - if (_threadPool != null) + // + // If we come from holding state, we first need to + // register again before we unregister. + // + if (_state == StateHolding) { - // - // If we come from holding state, we first need to - // register again before we unregister. - // - if (_state == StateHolding) - { - _threadPool._register(_acceptor.fd(), this); - } - _threadPool.unregister(_acceptor.fd()); + registerWithPool(); } + unregisterWithPool(); java.util.ListIterator iter = _connections.listIterator(); while (iter.hasNext()) @@ -344,6 +326,8 @@ public class IncomingConnectionFactory extends EventHandler } _connections.clear(); + super._stream.destroy(); + break; } } @@ -352,6 +336,30 @@ public class IncomingConnectionFactory extends EventHandler } private void + registerWithPool() + { + if (_acceptor != null) + { + if (_serverThreadPool == null) + { + _serverThreadPool = _instance.serverThreadPool(); + assert(_serverThreadPool != null); + } + _serverThreadPool._register(_acceptor.fd(), this); + } + } + + private void + unregisterWithPool() + { + if (_acceptor != null) + { + assert(_serverThreadPool != null); + _serverThreadPool.unregister(_acceptor.fd()); + } + } + + private void warning(Ice.LocalException ex) { java.io.StringWriter sw = new java.io.StringWriter(); @@ -364,9 +372,9 @@ public class IncomingConnectionFactory extends EventHandler private Endpoint _endpoint; private Ice.ObjectAdapter _adapter; - private ThreadPool _threadPool; private Acceptor _acceptor; private Transceiver _transceiver; + private ThreadPool _serverThreadPool; private java.util.LinkedList _connections = new java.util.LinkedList(); private int _state; private boolean _warn; diff --git a/java/src/IceInternal/Instance.java b/java/src/IceInternal/Instance.java index d2dd0014a88..97d70b4db8a 100644 --- a/java/src/IceInternal/Instance.java +++ b/java/src/IceInternal/Instance.java @@ -101,9 +101,43 @@ public class Instance } public synchronized ThreadPool - threadPool() + clientThreadPool() { - return _threadPool; + if (_communicator != null) // Not destroyed? + { + if (_clientThreadPool == null) // Lazy initialization. + { + _clientThreadPool = new ThreadPool(this, false); + } + } + + return _clientThreadPool; + } + + public synchronized ThreadPool + serverThreadPool() + { + if (_communicator != null) // Not destroyed? + { + if (_serverThreadPool == null) // Lazy initialization. + { + _serverThreadPool = new ThreadPool(this, false); + } + } + + return _serverThreadPool; + } + + public synchronized EndpointFactoryManager + endpointFactoryManager() + { + return _endpointFactoryManager; + } + + public synchronized Ice.PluginManager + pluginManager() + { + return _pluginManager; } public BufferManager @@ -117,11 +151,16 @@ public class Instance // Only for use by Ice.CommunicatorI // public - Instance(Ice.Communicator communicator, Ice.Properties properties) + Instance(Ice.Communicator communicator, Ice.StringSeqHolder args, Ice.Properties properties) { _communicator = communicator; _properties = properties; + // + // Convert command-line options beginning with --Ice. to properties. + // + args.value = properties.parseCommandLineOptions("Ice", args.value); + try { _logger = new Ice.LoggerI(); @@ -135,6 +174,18 @@ public class Instance _routerManager = new RouterManager(); _referenceFactory = new ReferenceFactory(this); _proxyFactory = new ProxyFactory(this); + + // + // Install TCP and UDP endpoint factories. + // + _endpointFactoryManager = new EndpointFactoryManager(this); + EndpointFactory tcpEndpointFactory = new TcpEndpointFactory(this); + _endpointFactoryManager.add(tcpEndpointFactory); + EndpointFactory udpEndpointFactory = new UdpEndpointFactory(this); + _endpointFactoryManager.add(udpEndpointFactory); + + _pluginManager = new Ice.PluginManagerI(this); + String router = _properties.getProperty("Ice.DefaultRouter"); if (router.length() > 0) { @@ -146,7 +197,6 @@ public class Instance _userExceptionFactoryManager = new UserExceptionFactoryManager(); _objectAdapterFactory = new ObjectAdapterFactory(this); _bufferManager = new BufferManager(); // Must be created before the ThreadPool - _threadPool = new ThreadPool(this); } catch (Ice.LocalException ex) { @@ -166,19 +216,39 @@ public class Instance assert(_servantFactoryManager == null); assert(_userExceptionFactoryManager == null); assert(_objectAdapterFactory == null); - assert(_threadPool == null); + assert(_clientThreadPool == null); + assert(_serverThreadPool == null); assert(_routerManager == null); + assert(_endpointFactoryManager == null); + assert(_pluginManager == null); super.finalize(); } + public void + finishSetup(Ice.StringSeqHolder args) + { + // + // Load plug-ins. + // + //pluginManagerImpl = (Ice.PluginManagerI)_pluginManager; + //pluginManagerImpl.loadPlugins(args); + + // + // Thread pool initialization is now lazy initialization in + // clientThreadPool() and serverThreadPool(). + // + } + // // Only for use by Ice.CommunicatorI // public void destroy() { - ThreadPool threadPool; + ThreadPool clientThreadPool; + ThreadPool serverThreadPool; + Ice.PluginManager pluginManager; synchronized(this) { @@ -231,26 +301,52 @@ public class Instance _outgoingConnectionFactory.destroy(); _outgoingConnectionFactory = null; } - + if (_routerManager != null) { _routerManager.destroy(); _routerManager = null; } - // - // We destroy the thread pool outside the thread - // synchronization. - // - threadPool = _threadPool; - _threadPool = null; - } - - if (threadPool != null) + if (_endpointFactoryManager != null) + { + _endpointFactoryManager.destroy(); + _endpointFactoryManager = null; + } + + // + // We destroy the thread pool outside the thread + // synchronization. + // + clientThreadPool = _clientThreadPool; + _clientThreadPool = null; + serverThreadPool = _serverThreadPool; + _serverThreadPool = null; + + // + // We destroy the plugin manager after the thread pools. + // + pluginManager = _pluginManager; + _pluginManager = null; + } + + if (clientThreadPool != null) + { + clientThreadPool.waitUntilFinished(); + clientThreadPool.destroy(); + clientThreadPool.joinWithAllThreads(); + } + + if (serverThreadPool != null) { - threadPool.waitUntilFinished(); - threadPool.destroy(); - threadPool.joinWithAllThreads(); + serverThreadPool.waitUntilFinished(); + serverThreadPool.destroy(); + serverThreadPool.joinWithAllThreads(); + } + + if (pluginManager != null) + { + pluginManager.destroy(); } } @@ -265,8 +361,11 @@ public class Instance private ObjectFactoryManager _servantFactoryManager; private UserExceptionFactoryManager _userExceptionFactoryManager; private ObjectAdapterFactory _objectAdapterFactory; - private ThreadPool _threadPool; + private ThreadPool _clientThreadPool; + private ThreadPool _serverThreadPool; private String _defaultProtocol; // Immutable, not reset by destroy(). private String _defaultHost; // Immutable, not reset by destroy(). + private EndpointFactoryManager _endpointFactoryManager; + private Ice.PluginManager _pluginManager; private BufferManager _bufferManager; // Immutable, not reset by destroy(). } diff --git a/java/src/IceInternal/ReferenceFactory.java b/java/src/IceInternal/ReferenceFactory.java index 7522ae752eb..ddf84364c79 100644 --- a/java/src/IceInternal/ReferenceFactory.java +++ b/java/src/IceInternal/ReferenceFactory.java @@ -225,7 +225,7 @@ public final class ReferenceFactory } String es = s.substring(beg, end); - Endpoint endp = Endpoint.endpointFromString(_instance, es); + Endpoint endp = _instance.endpointFactoryManager().create(es); if (orig) { @@ -281,7 +281,7 @@ public final class ReferenceFactory origEndpoints = new Endpoint[sz]; for (int i = 0; i < sz; i++) { - origEndpoints[i] = Endpoint.streamRead(s); + origEndpoints[i] = _instance.endpointFactoryManager().read(s); } boolean same = s.readBool(); @@ -295,7 +295,7 @@ public final class ReferenceFactory endpoints = new Endpoint[sz]; for (int i = 0; i < sz; i++) { - endpoints[i] = Endpoint.streamRead(s); + endpoints[i] = _instance.endpointFactoryManager().read(s); } } diff --git a/java/src/IceInternal/TcpEndpoint.java b/java/src/IceInternal/TcpEndpoint.java index 64cd872cdb2..ce848ee6627 100644 --- a/java/src/IceInternal/TcpEndpoint.java +++ b/java/src/IceInternal/TcpEndpoint.java @@ -10,8 +10,10 @@ package IceInternal; -public final class TcpEndpoint extends Endpoint +final class TcpEndpoint implements Endpoint { + final static short TYPE = 1; + public TcpEndpoint(Instance instance, String ho, int po, int ti) { @@ -137,7 +139,7 @@ public final class TcpEndpoint extends Endpoint public void streamWrite(BasicStream s) { - s.writeShort(TcpEndpointType); + s.writeShort(TYPE); s.startWriteEncaps(); s.writeString(_host); s.writeInt(_port); @@ -165,7 +167,7 @@ public final class TcpEndpoint extends Endpoint public short type() { - return TcpEndpointType; + return TYPE; } // diff --git a/java/src/IceInternal/TcpEndpointFactory.java b/java/src/IceInternal/TcpEndpointFactory.java new file mode 100644 index 00000000000..0d941af639d --- /dev/null +++ b/java/src/IceInternal/TcpEndpointFactory.java @@ -0,0 +1,51 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +package IceInternal; + +final class TcpEndpointFactory implements EndpointFactory +{ + TcpEndpointFactory(Instance instance) + { + _instance = instance; + } + + public short + type() + { + return TcpEndpoint.TYPE; + } + + public String + protocol() + { + return "tcp"; + } + + public Endpoint + create(String str) + { + return new TcpEndpoint(_instance, str); + } + + public Endpoint + read(BasicStream s) + { + return new TcpEndpoint(s); + } + + public void + destroy() + { + _instance = null; + } + + private Instance _instance; +} diff --git a/java/src/IceInternal/ThreadPool.java b/java/src/IceInternal/ThreadPool.java index 13a7a646930..94d71ada0f6 100644 --- a/java/src/IceInternal/ThreadPool.java +++ b/java/src/IceInternal/ThreadPool.java @@ -15,14 +15,7 @@ public final class ThreadPool public synchronized void _register(java.nio.channels.SelectableChannel fd, EventHandler handler) { - if (handler.server()) - { - ++_servers; - } - else - { - ++_clients; - } + ++_handlers; _changes.add(new FdHandlerPair(fd, handler)); setInterrupt(0); } @@ -34,62 +27,24 @@ public final class ThreadPool setInterrupt(0); } - public synchronized void - serverIsNowClient() - { - ++_clients; - assert(_servers > 0); - --_servers; - if (_servers == 0) - { - notifyAll(); // For waitUntil...Finished() methods. - } - } - - public synchronized void - clientIsNowServer() - { - ++_servers; - assert(_clients > 0); - --_clients; - } - public void promoteFollower() { + if (_multipleThreads) + { //System.out.println("ThreadPool - promote follower - lock count = " + _threadMutex.count()); - _threadMutex.unlock(); + _threadMutex.unlock(); + } } public void - initiateServerShutdown() + initiateShutdown() { //System.out.println("ThreadPool - initiate server shutdown"); setInterrupt(1); } public synchronized void - waitUntilServerFinished() - { - while (_servers != 0 && _threadNum != 0) - { - try - { - wait(); - } - catch (InterruptedException ex) - { - } - } - - if (_servers != 0) - { - _logger.error("can't wait for graceful server termination in thread pool\n" + - "since all threads have vanished"); - } - } - - public synchronized void waitUntilFinished() { while (_handlers != 0 && _threadNum != 0) @@ -103,14 +58,10 @@ public final class ThreadPool } } - if (_clients + _servers != 0) + if (_handlers != 0) { - _logger.error("can't wait for graceful application termination in thread pool\n" + - "since all threads have vanished"); - } - else - { - assert(_handlers == 0); + _instance.logger().error("can't wait for graceful server termination in thread pool\n" + + "since all threads have vanished"); } } @@ -139,38 +90,16 @@ public final class ThreadPool } } - public synchronized void - setMaxConnections(int maxConnections) - { - if (maxConnections < _threadNum + 1 && maxConnections != 0) - { - _maxConnections = _threadNum + 1; - } - else - { - _maxConnections = maxConnections; - } - } - - public synchronized int - getMaxConnections() - { - return _maxConnections; - } - // // Only for use by Instance // - ThreadPool(Instance instance) + ThreadPool(Instance instance, boolean server) { _instance = instance; - _logger = _instance.logger(); - _properties = _instance.properties(); _destroyed = false; _handlers = 0; - _clients = 0; - _servers = 0; _timeout = 0; + _multipleThreads = false; Network.SocketPair pair = Network.createPipe(); _fdIntrRead = (java.nio.channels.ReadableByteChannel)pair.source; @@ -206,13 +135,26 @@ public final class ThreadPool // _keysIter = null; - _timeout = _properties.getPropertyAsInt("Ice.ServerIdleTime"); - _threadNum = _properties.getPropertyAsIntWithDefault("Ice.ThreadPool.Size", 10); + if (server) + { + _timeout = _instance.properties().getPropertyAsInt("Ice.ServerIdleTime"); + _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ServerThreadPool.Size", 10); + } + else + { + _threadNum = _instance.properties().getPropertyAsIntWithDefault("Ice.ClientThreadPool.Size", 10); + } + if (_threadNum < 1) { _threadNum = 1; } + if (_threadNum > 1) + { + _multipleThreads = true; + } + try { _threads = new EventHandlerThread[_threadNum]; @@ -227,9 +169,6 @@ public final class ThreadPool destroy(); throw ex; } - - // Must be called after _threadNum is set. - setMaxConnections(_properties.getPropertyAsInt("Ice.ThreadPool.MaxConnections")); } protected void @@ -378,8 +317,11 @@ catch (RuntimeException ex) while (true) { - _threadMutex.lock(); + if (_multipleThreads) + { + _threadMutex.lock(); //System.out.println("ThreadPool - thread " + Thread.currentThread() + " has the lock"); + } repeatSelect: @@ -520,7 +462,7 @@ catch (RuntimeException ex) if (change.handler != null) // Addition if handler is set. { -//System.out.println("ThreadPool - adding handler"); +//System.err.println("ThreadPool - adding handler" + change.handler + ", stream = " + change.handler._stream); int op; if ((change.fd.validOps() & java.nio.channels.SelectionKey.OP_READ) > 0) { @@ -531,7 +473,6 @@ catch (RuntimeException ex) op = java.nio.channels.SelectionKey.OP_ACCEPT; } - _handlers++; try { change.fd.register(_selector, op, change.handler); @@ -549,7 +490,6 @@ catch (RuntimeException ex) assert(key != null); handler = (EventHandler)key.attachment(); finished = true; - --_handlers; key.cancel(); // Don't goto repeatSelect; we have to call // finished() on the event handler below, outside @@ -587,6 +527,7 @@ catch (RuntimeException ex) } handler = (EventHandler)key.attachment(); +//System.err.println("Selected handler with stream " + handler._stream + " - key is " + (key.isValid() ? "" : "NOT ") + "valid"); } } @@ -598,25 +539,14 @@ catch (RuntimeException ex) // Notify a handler about it's removal from the thread // pool. // - handler.finished(); - handler._stream.destroy(); + handler.finished(this); synchronized (this) { - if (handler.server()) - { - assert(_servers > 0); - --_servers; - } - else - { - assert(_clients > 0); - --_clients; - } -//System.out.println("ThreadPool - handler is finished - _handlers = " + _handlers + ", _clients = " + _clients + ", _servers = " + _servers); - if (_clients == 0 || _servers == 0) + assert(_handlers > 0); + if (--_handlers == 0) { - notifyAll(); // For waitUntil...Finished() methods. + notifyAll(); // For waitUntilFinished(). } } } @@ -656,7 +586,7 @@ catch (RuntimeException ex) assert(stream.pos() == stream.size()); } - handler.message(stream); + handler.message(stream, this); } finally { @@ -751,8 +681,6 @@ catch (RuntimeException ex) } private Instance _instance; - private Ice.Logger _logger; - private Ice.Properties _properties; private boolean _destroyed; private java.nio.channels.ReadableByteChannel _fdIntrRead; private java.nio.channels.SelectionKey _fdIntrReadKey; @@ -762,10 +690,9 @@ catch (RuntimeException ex) private java.util.Iterator _keysIter; private java.util.LinkedList _changes = new java.util.LinkedList(); private int _handlers; - private int _clients; - private int _servers; private int _timeout; private RecursiveMutex _threadMutex = new RecursiveMutex(); + private boolean _multipleThreads; private final static class EventHandlerThread extends Thread { @@ -790,7 +717,7 @@ catch (RuntimeException ex) ex.printStackTrace(pw); pw.flush(); String s = "exception in thread pool:\n" + sw.toString(); - _pool._logger.error(s); + _pool._instance.logger().error(s); } catch (RuntimeException ex) { @@ -799,7 +726,7 @@ catch (RuntimeException ex) ex.printStackTrace(pw); pw.flush(); String s = "unknown exception in thread pool:\n" + sw.toString(); - _pool._logger.error(s); + _pool._instance.logger().error(s); } synchronized(_pool) @@ -832,7 +759,4 @@ catch (RuntimeException ex) } private EventHandlerThread[] _threads; private int _threadNum; // Number of running threads - private int _maxConnections; // Maximum number of connections. If set to - // zero, the number of connections is not - // limited. } diff --git a/java/src/IceInternal/UdpEndpoint.java b/java/src/IceInternal/UdpEndpoint.java index 47eace060b6..c55beaf0e04 100644 --- a/java/src/IceInternal/UdpEndpoint.java +++ b/java/src/IceInternal/UdpEndpoint.java @@ -10,8 +10,10 @@ package IceInternal; -public final class UdpEndpoint extends Endpoint +final class UdpEndpoint implements Endpoint { + final static short TYPE = 3; + public UdpEndpoint(Instance instance, String ho, int po) { @@ -131,7 +133,7 @@ public final class UdpEndpoint extends Endpoint public void streamWrite(BasicStream s) { - s.writeShort(UdpEndpointType); + s.writeShort(TYPE); s.startWriteEncaps(); s.writeString(_host); s.writeInt(_port); @@ -160,7 +162,7 @@ public final class UdpEndpoint extends Endpoint public short type() { - return UdpEndpointType; + return TYPE; } // diff --git a/java/src/IceInternal/UdpEndpointFactory.java b/java/src/IceInternal/UdpEndpointFactory.java new file mode 100644 index 00000000000..45710224b71 --- /dev/null +++ b/java/src/IceInternal/UdpEndpointFactory.java @@ -0,0 +1,51 @@ +// ********************************************************************** +// +// Copyright (c) 2002 +// MutableRealms, Inc. +// Huntsville, AL, USA +// +// All Rights Reserved +// +// ********************************************************************** + +package IceInternal; + +final class UdpEndpointFactory implements EndpointFactory +{ + UdpEndpointFactory(Instance instance) + { + _instance = instance; + } + + public short + type() + { + return UdpEndpoint.TYPE; + } + + public String + protocol() + { + return "udp"; + } + + public Endpoint + create(String str) + { + return new UdpEndpoint(_instance, str); + } + + public Endpoint + read(BasicStream s) + { + return new UdpEndpoint(s); + } + + public void + destroy() + { + _instance = null; + } + + private Instance _instance; +} diff --git a/java/src/IceInternal/UnknownEndpoint.java b/java/src/IceInternal/UnknownEndpoint.java index 372cb9ed533..b8dcd180844 100644 --- a/java/src/IceInternal/UnknownEndpoint.java +++ b/java/src/IceInternal/UnknownEndpoint.java @@ -10,7 +10,7 @@ package IceInternal; -public final class UnknownEndpoint extends Endpoint +final class UnknownEndpoint implements Endpoint { public UnknownEndpoint(short type, BasicStream s) |